# Test Individual Crews

This notebook allows you to test and debug each crew independently.

**Setup:**
1. Make sure your `.env` file defines `OPENAI_API_KEY`
2. Run the cells in order (or jump straight to a specific crew)


In [None]:
# Setup and imports
import json
import sys
import os
from pathlib import Path
from dotenv import load_dotenv

# Add src to path. Prefer ./src, but fall back to ../src so the imports also
# resolve when the notebook is launched from a subdirectory (the helper cell
# below already supports that layout when locating test_outputs).
_src_candidates = (Path.cwd() / "src", Path.cwd().parent / "src")
_src_dir = next((p for p in _src_candidates if p.is_dir()), _src_candidates[0])
sys.path.insert(0, str(_src_dir))

# Load environment variables from the .env file
load_dotenv()

# Set OpenAI model for CrewAI. This is a plain assignment, so it overrides any
# OPENAI_MODEL_NAME that load_dotenv() may have just loaded.
os.environ["OPENAI_MODEL_NAME"] = "gpt-4o-mini"  # or gpt-4o, gpt-4-turbo

# Verify API key is loaded (warn only; later cells are left to fail on their own)
if not os.getenv("OPENAI_API_KEY"):
    print("‚ö†Ô∏è  WARNING: OPENAI_API_KEY not found in environment!")
    print("   Please add it to your .env file")
else:
    print("‚úì OpenAI API key loaded")
    print(f"‚úì Using model: {os.getenv('OPENAI_MODEL_NAME')}")

print("‚úì Setup complete")

In [None]:
# Weaviate Configuration and Health Check
import weaviate

# Set Weaviate connection parameters. setdefault keeps any value that is
# already present in the environment and only fills in the local defaults.
os.environ.setdefault("WEAVIATE_HOST", "localhost")
os.environ.setdefault("WEAVIATE_PORT", "8080")
os.environ.setdefault("WEAVIATE_GRPC_PORT", "50051")

print("Checking Weaviate connection...")
try:
    client = weaviate.connect_to_local(
        host=os.getenv("WEAVIATE_HOST"),
        port=int(os.getenv("WEAVIATE_PORT")),
        grpc_port=int(os.getenv("WEAVIATE_GRPC_PORT")),
    )

    try:
        # Check if connected
        if client.is_ready():
            print("‚úì Weaviate is connected and ready")

            # Check if SecurityControl collection exists
            if client.collections.exists("SecurityControl"):
                print("‚úì SecurityControl collection exists")
            else:
                print("‚ö†Ô∏è  WARNING: SecurityControl collection does not exist!")
                print("   Run: python -m security_requirements_system.tools.weaviate_setup")
        else:
            print("‚ö†Ô∏è  WARNING: Weaviate is not ready")
    finally:
        # Release the connection even when one of the checks above raises;
        # previously close() was skipped on any mid-check failure.
        client.close()

except Exception as e:
    # Best-effort diagnostic cell: report the failure and the effective
    # configuration instead of aborting the notebook.
    print(f"‚ùå ERROR: Cannot connect to Weaviate: {e}")
    print(f"   Make sure Weaviate is running on {os.getenv('WEAVIATE_HOST')}:{os.getenv('WEAVIATE_PORT')}")
    print("   Start with: docker-compose up -d")
    print("\nWeaviate Configuration:")
    print(f"  Host: {os.getenv('WEAVIATE_HOST')}")
    print(f"  Port: {os.getenv('WEAVIATE_PORT')}")
    print(f"  gRPC Port: {os.getenv('WEAVIATE_GRPC_PORT')}")

In [None]:
# Helper functions

# Locate the project root: the first of (cwd, cwd's parent) that contains a
# "test_outputs" directory. When neither does, fall back to cwd itself so the
# notebook still works on a fresh checkout where nothing has been cached yet.
_current_dir = Path.cwd()
PROJECT_ROOT = next(
    (
        candidate
        for candidate in (_current_dir, _current_dir.parent)
        if (candidate / "test_outputs").exists()
    ),
    _current_dir,
)

# Directory where every crew's JSON output is cached
TEST_OUTPUTS_DIR = PROJECT_ROOT / "test_outputs"


def display_crew_output(result, crew_name):
    """Print a human-readable summary of a crew result.

    Args:
        result: Object returned by a crew run. The attributes ``raw``,
            ``pydantic`` and ``tasks_output`` are each optional; only the
            ones present are shown.
        crew_name: Label printed in the banner above the output.

    The raw text is truncated to 500 characters (with a trailing "...") and
    the structured JSON dump to 1000 characters.
    """
    banner = "=" * 60
    print(f"\n{banner}")
    print(f"{crew_name} OUTPUT")
    print(f"{banner}\n")

    if hasattr(result, "raw"):
        print("RAW OUTPUT:")
        # A result may carry raw=None; show an empty body instead of crashing on len().
        raw_text = "" if result.raw is None else str(result.raw)
        print(raw_text[:500] + "..." if len(raw_text) > 500 else raw_text)

    structured = getattr(result, "pydantic", None)
    if structured:
        print("\nSTRUCTURED OUTPUT:")
        print(json.dumps(structured.model_dump(), indent=2, default=str)[:1000])

    if hasattr(result, "tasks_output"):
        tasks = result.tasks_output or []
        print(f"\nTASKS: {len(tasks)}")
        for position, task in enumerate(tasks, start=1):
            # Fall back to a 1-based label that matches the list numbering
            # when the task has no name (attribute missing or None).
            task_name = getattr(task, "name", None) or f"Task {position}"
            print(f"  {position}. {task_name}")
            task_model = getattr(task, "pydantic", None)
            if task_model:
                print(f"     Type: {type(task_model).__name__}")


def save_output(result, crew_name, output_dir=None):
    """Serialize a crew result to ``<crew_name>_output.json`` and return its path.

    The file is written into ``output_dir`` when given, otherwise into
    TEST_OUTPUTS_DIR; the directory is created if necessary. Only the
    attributes present on ``result`` (``raw``, ``pydantic``, ``tasks_output``)
    are stored. Values json cannot encode natively are stringified.
    """
    target_dir = TEST_OUTPUTS_DIR if output_dir is None else Path(output_dir)
    target_dir.mkdir(exist_ok=True, parents=True)
    destination = target_dir / f"{crew_name}_output.json"

    payload = {}
    if hasattr(result, "raw"):
        payload["raw"] = result.raw
    if hasattr(result, "pydantic") and result.pydantic:
        payload["pydantic"] = result.pydantic.model_dump()
    if hasattr(result, "tasks_output"):
        task_entries = []
        payload["tasks"] = task_entries
        for task in result.tasks_output:
            entry = {
                "name": getattr(task, "name", "unknown"),
                # Objects without a raw attribute are stored via their str() form
                "raw": task.raw if hasattr(task, "raw") else str(task),
            }
            if hasattr(task, "pydantic") and task.pydantic:
                entry["pydantic"] = task.pydantic.model_dump()
            task_entries.append(entry)

    with open(destination, "w") as handle:
        json.dump(payload, handle, indent=2, default=str)

    print(f"\nüíæ Saved to: {destination}")
    return destination


def load_cached_output(crew_name, output_dir=None):
    """Return the parsed ``<crew_name>_output.json`` cache, or None if absent.

    The file is looked up in ``output_dir`` when given, otherwise in
    TEST_OUTPUTS_DIR. On a miss, diagnostics about the default cache
    directory are printed to help locate the problem.
    """
    cache_dir = TEST_OUTPUTS_DIR if output_dir is None else Path(output_dir)
    cache_file = cache_dir / f"{crew_name}_output.json"

    # Cache hit: parse and return immediately
    if cache_file.exists():
        with open(cache_file, "r") as handle:
            cached_data = json.load(handle)
        print(f"üìÇ Loaded cached output from: {cache_file}")
        return cached_data

    # Cache miss: explain where we looked and what is available
    print(f"‚ö†Ô∏è  No cached output found for {crew_name}")
    print(f"   Looked in: {cache_file}")
    print(f"   Project root: {PROJECT_ROOT}")
    print(f"   Test outputs dir: {TEST_OUTPUTS_DIR}")
    print(f"   Test outputs exists: {TEST_OUTPUTS_DIR.exists()}")
    if TEST_OUTPUTS_DIR.exists():
        json_files = list(TEST_OUTPUTS_DIR.glob("*.json"))
        print(f"   Files in test_outputs: {len(json_files)} files")
        if json_files:
            print(f"   Sample files: {[f.name for f in json_files[:5]]}")
    return None


# Report the resolved locations so a wrong working directory is obvious right away
print("‚úì Helper functions loaded")
print(f"  Project root: {PROJECT_ROOT}")
print(f"  Test outputs dir: {TEST_OUTPUTS_DIR}")
print(f"  Test outputs exists: {TEST_OUTPUTS_DIR.exists()}")

## Loading Cached Outputs

Each crew cell below will automatically load required data from cached JSON files if the variables don't exist in memory. This allows you to:
- Jump to any crew without running previous cells
- Reuse outputs from previous runs
- Speed up testing

If a required cache file is missing, you'll see an error prompting you to run the prerequisite crew first.


In [None]:
# Sample requirements text: the shared "requirements_text" input passed to
# the crews in the cells below (an e-commerce platform description).

SAMPLE_REQUIREMENTS = """
E-commerce Platform Requirements:

1. User Management
   - User registration and login
   - Multi-factor authentication
   - Password reset functionality
   - Profile management

2. Product Catalog
   - Browse products by category
   - Search functionality
   - Product details and images
   - Inventory management

3. Shopping Cart & Checkout
   - Add/remove items from cart
   - Secure payment processing (credit cards, PayPal)
   - Order confirmation emails
   - Shipping address management

4. Order Management
   - Order history
   - Order tracking
   - Returns and refunds

5. Admin Panel
   - User management
   - Product management
   - Order processing
   - Sales analytics
"""

# Confirm the text is defined and show its size
print("‚úì Sample requirements loaded")
print(f"Length: {len(SAMPLE_REQUIREMENTS)} characters")

In [None]:
# Helper: Load all cached outputs (run this to load dependencies)


def _find_cached_task(cached, task_name):
    """Return the cached task dict named *task_name*, or None when absent."""
    return next((t for t in cached.get("tasks") or [] if t.get("name") == task_name), None)


def _first_task_pydantic(cached):
    """Return the structured payload of the first cached task, or None."""
    tasks = cached.get("tasks") or []
    return tasks[0].get("pydantic") if tasks else None


def load_all_dependencies():
    """Load all crew outputs from cache into memory.

    Each crew's cache file is read with load_cached_output() and, when it
    holds the expected data, the matching module-level variable is (re)bound.
    Variables whose cache is missing or incomplete are left untouched, so a
    partially populated cache never raises here.
    """
    global analysis_output, architecture_output, stakeholder_output
    global threat_output, domain_output, ai_security_output, compliance_output
    global security_architecture_output, roadmap_output, verification_output, validation_output
    global components_json, threats_json, security_controls_json

    from security_requirements_system.data_models import (
        AnalysisOutput,
        ArchitectureOutput,
        ThreatModelingOutput,
        DomainSecurityOutput,
    )

    # 1. Requirements Analysis (two named tasks in one cache file)
    if cached := load_cached_output("requirements_analysis"):
        analysis_task = _find_cached_task(cached, "analyze_requirements")
        arch_task = _find_cached_task(cached, "analyze_architecture")

        if analysis_task and analysis_task.get("pydantic"):
            analysis_output = AnalysisOutput(**analysis_task["pydantic"])
        if arch_task and arch_task.get("pydantic"):
            architecture_output = ArchitectureOutput(**arch_task["pydantic"])
            # Tolerate a missing/null component list instead of raising
            components_json = json.dumps(arch_task["pydantic"].get("components") or [])

    # 2. Stakeholder
    if cached := load_cached_output("stakeholder"):
        stakeholder_output = cached.get("raw", "")

    # 3. Threat Modeling (structured output stored at the top level)
    if cached := load_cached_output("threat_modeling"):
        if cached.get("pydantic"):
            threat_output = ThreatModelingOutput(**cached["pydantic"])
            threats_json = json.dumps(cached["pydantic"])

    # 4. Domain Security (structured output stored on the first task)
    if cached := load_cached_output("domain_security"):
        if domain_payload := _first_task_pydantic(cached):
            domain_output = DomainSecurityOutput(**domain_payload)
            security_controls_json = json.dumps(domain_payload)

    # 5. AI/LLM Security
    if cached := load_cached_output("llm_security"):
        ai_security_output = cached.get("raw", "")

    # 6. Compliance
    if cached := load_cached_output("compliance"):
        compliance_output = cached.get("raw", "")

    # 7. Security Architecture
    if cached := load_cached_output("security_architecture"):
        security_architecture_output = cached.get("raw", "")

    # 8. Roadmap
    if cached := load_cached_output("roadmap"):
        roadmap_output = cached.get("raw", "")

    # 9. Verification
    if cached := load_cached_output("verification"):
        verification_output = cached.get("raw", "")

    # 10. Validation (structured output stored on the first task)
    if cached := load_cached_output("validation"):
        if validation_payload := _first_task_pydantic(cached):
            from security_requirements_system.data_models import ValidationOutput

            validation_output = ValidationOutput(**validation_payload)

    print("\n‚úì All available dependencies loaded from cache")


# Auto-load all cached outputs when this cell runs (comment out the call to skip):
load_all_dependencies()

---

## 1. Requirements Analysis Crew

Analyzes product requirements and extracts security-relevant information.


In [None]:
from security_requirements_system.crews.requirements_analysis_crew import AnalysisOutputCrew

print("Testing Requirements Analysis Crew...\n")

# Run the crew on the sample requirements text
crew = AnalysisOutputCrew().crew()
result = crew.kickoff(inputs={"requirements_text": SAMPLE_REQUIREMENTS})

display_crew_output(result, "Requirements Analysis")
save_output(result, "requirements_analysis")

# Keep the two task results around for downstream crews.
# next() raises StopIteration if a task with the expected name is absent.
analysis_task = next(t for t in result.tasks_output if t.name == "analyze_requirements")
architecture_task = next(t for t in result.tasks_output if t.name == "analyze_architecture")

analysis_output = analysis_task.pydantic
architecture_output = architecture_task.pydantic

print(f"\n‚úì Application Summary: {analysis_output.application_summary[:150]}...")
print(f"‚úì High-level requirements: {len(analysis_output.high_level_requirements)}")
print(f"‚úì Architecture: {architecture_output.architecture_summary[:150]}...")

## 2. Stakeholder Crew

Identifies stakeholders and trust boundaries.


In [None]:
from security_requirements_system.crews.stakeholder_crew import StakeholderCrew

print("Testing Stakeholder Crew...\n")

# Load dependencies from cache if not in memory
if "architecture_output" not in globals():
    print("‚ö†Ô∏è  Loading dependencies from cache...")
    from security_requirements_system.data_models import ArchitectureOutput

    cached = load_cached_output("requirements_analysis")
    if cached and "tasks" in cached:
        arch_task = next((t for t in cached["tasks"] if t["name"] == "analyze_architecture"), None)
        if arch_task and "pydantic" in arch_task:
            architecture_output = ArchitectureOutput(**arch_task["pydantic"])

# Fail with an actionable message rather than a NameError at kickoff time
if "architecture_output" not in globals():
    raise RuntimeError(
        "architecture_output is not available in memory or cache; "
        "run the Requirements Analysis Crew cell (section 1) first."
    )

# Run the crew with the requirements text and the architecture summary
crew = StakeholderCrew().crew()
result = crew.kickoff(inputs={"requirements_text": SAMPLE_REQUIREMENTS, "architecture_summary": architecture_output.architecture_summary})

display_crew_output(result, "Stakeholder Analysis")
save_output(result, "stakeholder")

# Raw text output is what downstream cells consume
stakeholder_output = result.raw
print(f"\n‚úì Output length: {len(stakeholder_output)} characters")

## 3. Threat Modeling Crew

Performs STRIDE threat modeling.


In [None]:
from security_requirements_system.crews.threat_modeling_crew import ThreatModelingCrew

print("Testing Threat Modeling Crew...\n")

# Load dependencies from cache if not in memory
if "architecture_output" not in globals():
    print("‚ö†Ô∏è  Loading dependencies from cache...")
    from security_requirements_system.data_models import ArchitectureOutput

    cached = load_cached_output("requirements_analysis")
    if cached and "tasks" in cached:
        arch_task = next((t for t in cached["tasks"] if t["name"] == "analyze_architecture"), None)
        if arch_task and "pydantic" in arch_task:
            architecture_output = ArchitectureOutput(**arch_task["pydantic"])

# Fail with an actionable message rather than a NameError further down
if "architecture_output" not in globals():
    raise RuntimeError(
        "architecture_output is not available in memory or cache; "
        "run the Requirements Analysis Crew cell (section 1) first."
    )

# Prepare components JSON (an empty JSON list when no components were identified)
components_json = json.dumps([c.model_dump() for c in architecture_output.components]) if architecture_output.components else "[]"

crew = ThreatModelingCrew().crew()
result = crew.kickoff(
    inputs={
        "requirements_text": SAMPLE_REQUIREMENTS,
        "architecture_summary": architecture_output.architecture_summary,
        "components": components_json,
    }
)

display_crew_output(result, "Threat Modeling")
save_output(result, "threat_modeling")

# Keep both the model and its JSON form for downstream cells
threat_output = result.pydantic
threats_json = json.dumps(threat_output.model_dump())
print(f"\n‚úì Threats identified: {len(threat_output.threats)}")
print("\nTop 5 threats:")
for i, threat in enumerate(threat_output.threats[:5], 1):
    print(f"{i}. {threat.threat_id}: {threat.description[:80]}...")

## 4. Domain Security Crew (OWASP ASVS)

Maps requirements to OWASP ASVS controls.


In [None]:
from security_requirements_system.crews.domain_security_crew import DomainSecurityCrew

print("Testing Domain Security Crew...\n")

# Load dependencies from cache if not in memory
if "analysis_output" not in globals():
    print("‚ö†Ô∏è  Loading dependencies from cache...")
    from security_requirements_system.data_models import AnalysisOutput

    cached = load_cached_output("requirements_analysis")
    if cached and "tasks" in cached:
        analysis_task = next((t for t in cached["tasks"] if t["name"] == "analyze_requirements"), None)
        if analysis_task and "pydantic" in analysis_task:
            analysis_output = AnalysisOutput(**analysis_task["pydantic"])

# Fail with an actionable message rather than a NameError at kickoff time
if "analysis_output" not in globals():
    raise RuntimeError(
        "analysis_output is not available in memory or cache; "
        "run the Requirements Analysis Crew cell (section 1) first."
    )

# The crew receives the high-level requirements as a JSON string
crew = DomainSecurityCrew().crew()
result = crew.kickoff(inputs={"high_level_requirements": json.dumps(analysis_output.high_level_requirements)})

display_crew_output(result, "Domain Security (OWASP)")
save_output(result, "domain_security")

# The structured mapping is read from the first task's output
domain_output = result.tasks_output[0].pydantic
security_controls_json = json.dumps(domain_output.model_dump())
total_controls = sum(len(rm.owasp_controls) for rm in domain_output.requirements_mapping)

print(f"\n‚úì Requirements mapped: {len(domain_output.requirements_mapping)}")
print(f"‚úì Total OWASP controls: {total_controls}")
print(f"‚úì Recommended ASVS level: {domain_output.recommended_asvs_level}")

## 5. LLM Security Crew

Identifies AI/ML security requirements.


In [None]:
from security_requirements_system.crews.llm_security_crew import LLMSecurityCrew

print("Testing LLM Security Crew...\n")

# Load dependencies from cache if not in memory
if "analysis_output" not in globals():
    print("‚ö†Ô∏è  Loading dependencies from cache...")
    from security_requirements_system.data_models import AnalysisOutput

    cached = load_cached_output("requirements_analysis")
    if cached and "tasks" in cached:
        analysis_task = next((t for t in cached["tasks"] if t["name"] == "analyze_requirements"), None)
        if analysis_task and "pydantic" in analysis_task:
            analysis_output = AnalysisOutput(**analysis_task["pydantic"])

# Fail with an actionable message rather than a NameError at kickoff time
if "analysis_output" not in globals():
    raise RuntimeError(
        "analysis_output is not available in memory or cache; "
        "run the Requirements Analysis Crew cell (section 1) first."
    )

# The analyzed requirements are passed as one text blob (summary + requirement list)
crew = LLMSecurityCrew().crew()
result = crew.kickoff(
    inputs={
        "requirements_text": SAMPLE_REQUIREMENTS,
        "analyzed_requirements": f"Application Summary: {analysis_output.application_summary}\nHigh-Level Requirements: {analysis_output.high_level_requirements}",
    }
)

display_crew_output(result, "LLM Security")
save_output(result, "llm_security")

# Raw text output is what downstream cells consume
ai_security_output = result.raw
print(f"\n‚úì Output length: {len(ai_security_output)} characters")

## 6. Compliance Crew

Identifies compliance requirements (GDPR, PCI-DSS, etc.).


In [None]:
from security_requirements_system.crews.compliance_crew import ComplianceCrew

print("Testing Compliance Crew...\n")

# Load dependencies from cache if not in memory
if "analysis_output" not in globals():
    print("‚ö†Ô∏è  Loading dependencies from cache...")
    from security_requirements_system.data_models import AnalysisOutput

    cached = load_cached_output("requirements_analysis")
    if cached and "tasks" in cached:
        analysis_task = next((t for t in cached["tasks"] if t["name"] == "analyze_requirements"), None)
        if analysis_task and "pydantic" in analysis_task:
            analysis_output = AnalysisOutput(**analysis_task["pydantic"])

# Fail with an actionable message rather than a NameError at kickoff time
if "analysis_output" not in globals():
    raise RuntimeError(
        "analysis_output is not available in memory or cache; "
        "run the Requirements Analysis Crew cell (section 1) first."
    )

# The analyzed requirements are passed as one text blob (summary + requirement list)
crew = ComplianceCrew().crew()
result = crew.kickoff(
    inputs={
        "requirements_text": SAMPLE_REQUIREMENTS,
        "analyzed_requirements": f"Application Summary: {analysis_output.application_summary}\nHigh-Level Requirements: {analysis_output.high_level_requirements}",
    }
)

display_crew_output(result, "Compliance")
save_output(result, "compliance")

# Raw text output is what downstream cells consume
compliance_output = result.raw
print(f"\n‚úì Output length: {len(compliance_output)} characters")

## 7. Security Architecture Crew

Designs security architecture recommendations.


In [None]:
from security_requirements_system.crews.security_architecture_crew import SecurityArchitectureCrew

print("Testing Security Architecture Crew...\n")

# Load dependencies from cache if not in memory
if "architecture_output" not in globals() or "components_json" not in globals() or "security_controls_json" not in globals():
    print("‚ö†Ô∏è  Loading dependencies from cache...")
    from security_requirements_system.data_models import ArchitectureOutput, DomainSecurityOutput

    # Load architecture
    if "architecture_output" not in globals():
        cached = load_cached_output("requirements_analysis")
        if cached and "tasks" in cached:
            arch_task = next((t for t in cached["tasks"] if t["name"] == "analyze_architecture"), None)
            if arch_task and "pydantic" in arch_task:
                architecture_output = ArchitectureOutput(**arch_task["pydantic"])

    # Build the components JSON whenever it is missing, including the case
    # where architecture_output was already in memory (previously that case
    # left components_json undefined and kickoff failed with a NameError).
    if "components_json" not in globals() and "architecture_output" in globals():
        components_json = (
            json.dumps([c.model_dump() for c in architecture_output.components]) if architecture_output.components else "[]"
        )

    # Load security controls
    if "security_controls_json" not in globals():
        cached = load_cached_output("domain_security")
        if cached and "tasks" in cached and len(cached["tasks"]) > 0:
            if "pydantic" in cached["tasks"][0]:
                security_controls_json = json.dumps(cached["tasks"][0]["pydantic"])

# Fail with an actionable message rather than a NameError at kickoff time
_missing = [name for name in ("architecture_output", "components_json", "security_controls_json") if name not in globals()]
if _missing:
    raise RuntimeError(
        f"Missing dependencies: {', '.join(_missing)}. Run the Requirements Analysis Crew (section 1) "
        "and Domain Security Crew (section 4) cells first."
    )

crew = SecurityArchitectureCrew().crew()
result = crew.kickoff(
    inputs={
        "requirements_text": SAMPLE_REQUIREMENTS,
        "architecture_summary": architecture_output.architecture_summary,
        "components": components_json,
        "security_controls": security_controls_json,
    }
)

display_crew_output(result, "Security Architecture")
save_output(result, "security_architecture")

# Raw text output is what downstream cells consume
security_architecture_output = result.raw
print(f"\n‚úì Output length: {len(security_architecture_output)} characters")

## 8. Implementation Roadmap Crew

Creates phased implementation plan.


In [None]:
from security_requirements_system.crews.roadmap_crew import RoadmapCrew

print("Testing Roadmap Crew...\n")

# Load dependencies from cache if not in memory
if "security_controls_json" not in globals() or "threats_json" not in globals() or "compliance_output" not in globals():
    print("‚ö†Ô∏è  Loading dependencies from cache...")

    # Load security controls
    if "security_controls_json" not in globals():
        cached = load_cached_output("domain_security")
        if cached and "tasks" in cached and len(cached["tasks"]) > 0:
            if "pydantic" in cached["tasks"][0]:
                security_controls_json = json.dumps(cached["tasks"][0]["pydantic"])

    # Load threats
    if "threats_json" not in globals():
        cached = load_cached_output("threat_modeling")
        if cached and "pydantic" in cached:
            threats_json = json.dumps(cached["pydantic"])

    # Load compliance
    if "compliance_output" not in globals():
        cached = load_cached_output("compliance")
        if cached:
            compliance_output = cached.get("raw", "")

# Fail with an actionable message rather than a NameError at kickoff time
_missing = [name for name in ("security_controls_json", "threats_json", "compliance_output") if name not in globals()]
if _missing:
    raise RuntimeError(
        f"Missing dependencies: {', '.join(_missing)}. Run the Threat Modeling (section 3), "
        "Domain Security (section 4) and Compliance (section 6) crew cells first."
    )

crew = RoadmapCrew().crew()
result = crew.kickoff(
    inputs={
        "requirements_text": SAMPLE_REQUIREMENTS,
        "security_controls": security_controls_json,
        "threats": threats_json,
        "compliance_requirements": compliance_output,
    }
)

display_crew_output(result, "Implementation Roadmap")
save_output(result, "roadmap")

# Raw text output is what downstream cells consume
roadmap_output = result.raw
print(f"\n‚úì Output length: {len(roadmap_output)} characters")

## 9. Verification Crew

Defines verification and testing strategy.


In [None]:
from security_requirements_system.crews.verification_crew import VerificationCrew

print("Testing Verification Crew...\n")

# Load dependencies from cache if not in memory
if "security_controls_json" not in globals() or "compliance_output" not in globals():
    print("‚ö†Ô∏è  Loading dependencies from cache...")

    # Load security controls
    if "security_controls_json" not in globals():
        cached = load_cached_output("domain_security")
        if cached and "tasks" in cached and len(cached["tasks"]) > 0:
            if "pydantic" in cached["tasks"][0]:
                security_controls_json = json.dumps(cached["tasks"][0]["pydantic"])

    # Load compliance
    if "compliance_output" not in globals():
        cached = load_cached_output("compliance")
        if cached:
            compliance_output = cached.get("raw", "")

# Fail with an actionable message rather than a NameError at kickoff time
_missing = [name for name in ("security_controls_json", "compliance_output") if name not in globals()]
if _missing:
    raise RuntimeError(
        f"Missing dependencies: {', '.join(_missing)}. Run the Domain Security (section 4) "
        "and Compliance (section 6) crew cells first."
    )

# The same controls JSON feeds both the "security_controls" and "owasp_controls" inputs
crew = VerificationCrew().crew()
result = crew.kickoff(
    inputs={
        "security_controls": security_controls_json,
        "compliance_requirements": compliance_output,
        "owasp_controls": security_controls_json,
    }
)

display_crew_output(result, "Verification & Testing")
save_output(result, "verification")

# Raw text output is what downstream cells consume
verification_output = result.raw
print(f"\n‚úì Output length: {len(verification_output)} characters")

## 10. Validation Crew

Validates completeness and quality of generated requirements.


In [None]:
from security_requirements_system.crews.validation_crew import ValidationCrew

print("Testing Validation Crew...\n")

# Load dependencies from cache if not in memory
if "analysis_output" not in globals() or "domain_output" not in globals():
    print("‚ö†Ô∏è  Loading dependencies from cache...")
    from security_requirements_system.data_models import AnalysisOutput, DomainSecurityOutput

    # Load requirements analysis
    if "analysis_output" not in globals():
        cached = load_cached_output("requirements_analysis")
        if cached and "tasks" in cached:
            analysis_task = next((t for t in cached["tasks"] if t["name"] == "analyze_requirements"), None)
            if analysis_task and "pydantic" in analysis_task:
                analysis_output = AnalysisOutput(**analysis_task["pydantic"])

    # Load domain security
    if "domain_output" not in globals():
        cached = load_cached_output("domain_security")
        if cached and "tasks" in cached and len(cached["tasks"]) > 0:
            if "pydantic" in cached["tasks"][0]:
                domain_output = DomainSecurityOutput(**cached["tasks"][0]["pydantic"])

# Fail with an actionable message rather than a NameError at kickoff time
_missing = [name for name in ("analysis_output", "domain_output") if name not in globals()]
if _missing:
    raise RuntimeError(
        f"Missing dependencies: {', '.join(_missing)}. Run the Requirements Analysis (section 1) "
        "and Domain Security (section 4) crew cells first."
    )

crew = ValidationCrew().crew()
result = crew.kickoff(
    inputs={
        "requirements_text": SAMPLE_REQUIREMENTS,
        # A real newline separates the two parts (the former "\\n" sent a
        # literal backslash-n, unlike the LLM Security and Compliance cells).
        "analyzed_requirements": f"Application Summary: {analysis_output.application_summary}\nHigh-Level Requirements: {analysis_output.high_level_requirements}",
        "security_controls": domain_output.model_dump_json(indent=2),
        # NOTE(review): these two inputs are fixed placeholders rather than
        # the ai_security_output / compliance_output of earlier cells.
        "ai_security": "No AI components detected",
        "compliance_requirements": "PCI-DSS, GDPR",
    }
)

display_crew_output(result, "Validation")
save_output(result, "validation")

# The structured validation report is read from the first task's output
validation_output = result.tasks_output[0].pydantic
print(f"\n‚úì Validation score: {validation_output.overall_score:.2f}")
print(f"‚úì Validation passed: {validation_output.validation_passed}")
print("\nDimension scores:")
for dim, score in validation_output.dimension_scores.items():
    print(f"  - {dim}: {score:.2f}")

## Summary

All crew outputs saved to `test_outputs/` directory.

Run any cell above to test specific crews. Outputs are cached to JSON files for inspection.


---

## Testing Main Flow Internal Methods

Test the internal methods from `main.py` like `_export_dashboard_artifacts` and `_generate_markdown_summary` using cached crew outputs.



In [None]:
# Setup mock flow state for testing internal methods
from security_requirements_system.main import SecurityRequirementsFlow
from datetime import datetime

print("Setting up mock flow state with cached data...\n")

# Pull every cached crew output into module-level variables
load_all_dependencies()

# Flow instance whose state is filled in by hand below
flow = SecurityRequirementsFlow()

# --- Requirements analysis -------------------------------------------------
if "analysis_output" in globals():
    flow.state.application_summary = analysis_output.application_summary
    flow.state.high_level_requirements = analysis_output.high_level_requirements

    # Detailed requirements are read from the cache file's dict payload
    cached = load_cached_output("requirements_analysis")
    if cached and "tasks" in cached:
        analysis_task = next((t for t in cached["tasks"] if t["name"] == "analyze_requirements"), None)
        if analysis_task and "pydantic" in analysis_task:
            detailed_reqs = analysis_task["pydantic"].get("detailed_requirements", [])
            if detailed_reqs:
                flow.state.detailed_requirements = json.dumps(
                    [r.model_dump() if hasattr(r, "model_dump") else r for r in detailed_reqs], indent=2
                )

# --- Architecture ----------------------------------------------------------
if "architecture_output" in globals():
    flow.state.architecture_summary = architecture_output.architecture_summary
    flow.state.architecture_diagram = architecture_output.architecture_diagram
    if architecture_output.components:
        flow.state.components = json.dumps(
            [c.model_dump() if hasattr(c, "model_dump") else c for c in architecture_output.components], indent=2
        )

# --- Stakeholders, threats and controls ------------------------------------
if "stakeholder_output" in globals():
    flow.state.stakeholders = stakeholder_output

if "threat_output" in globals():
    flow.state.threats = json.dumps(threat_output.model_dump(), indent=2)

if "domain_output" in globals():
    flow.state.security_controls = json.dumps(domain_output.model_dump(), indent=2)

# --- Raw-text outputs copied verbatim onto the state -----------------------
for _var_name, _state_field in (
    ("ai_security_output", "ai_security"),
    ("compliance_output", "compliance_requirements"),
    ("security_architecture_output", "security_architecture"),
    ("roadmap_output", "implementation_roadmap"),
    ("verification_output", "verification_testing"),
):
    if _var_name in globals():
        setattr(flow.state, _state_field, globals()[_var_name])

# --- Validation ------------------------------------------------------------
if "validation_output" in globals():
    flow.state.validation_report = json.dumps(validation_output.model_dump(), indent=2)
    flow.state.validation_score = validation_output.overall_score
    flow.state.validation_passed = validation_output.validation_passed

# Inputs that do not come from any crew
flow.state.requirements_text = SAMPLE_REQUIREMENTS
flow.state.iteration_count = 1

# Build the traceability matrix only when the state does not carry one yet;
# a failure here is reported but does not stop the cell.
if not flow.state.traceability_matrix:
    try:
        flow.build_traceability_matrix()
    except Exception as e:
        print(f"‚ö†Ô∏è Could not build traceability matrix: {e}")

print("‚úì Mock flow state created")
print(f"  - Requirements: {len(flow.state.high_level_requirements)}")
print(f"  - Has threats: {bool(flow.state.threats)}")
print(f"  - Has controls: {bool(flow.state.security_controls)}")
print(f"  - Has traceability matrix: {bool(flow.state.traceability_matrix)}")

### Test: _export_dashboard_artifacts

Test the dashboard artifacts export function.


In [None]:
# Exercise flow._export_dashboard_artifacts against a scratch directory
from pathlib import Path

test_artifacts_dir = Path("test_outputs/test_artifacts")
test_artifacts_dir.mkdir(parents=True, exist_ok=True)

# Delete JSON files already in the directory so the listing below only
# shows what this run exported
for stale in test_artifacts_dir.glob("*.json"):
    stale.unlink()

timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

print("Testing _export_dashboard_artifacts...\n")
try:
    flow._export_dashboard_artifacts(test_artifacts_dir, timestamp)

    print("‚úì Dashboard artifacts exported successfully\n")

    # Report every JSON file now present in the directory
    artifact_files = list(test_artifacts_dir.glob("*.json"))
    print(f"Exported {len(artifact_files)} artifact files:\n")

    for artifact in sorted(artifact_files):
        size_kb = artifact.stat().st_size / 1024
        print(f"  - {artifact.name} ({size_kb:.2f} KB)")

        # Load the artifact and describe its top-level shape
        with artifact.open("r") as handle:
            content = json.load(handle)

            if isinstance(content, dict):
                print(f"    ‚Üí Keys: {list(content.keys())}")
            elif isinstance(content, list):
                print(f"    ‚Üí {len(content)} items")
                if content:
                    first_item = content[0]
                    sample_keys = list(first_item.keys()) if isinstance(first_item, dict) else "N/A"
                    print(f"    ‚Üí Sample keys: {sample_keys}")

except Exception as e:
    import traceback

    print(f"‚ùå Error: {e}")
    traceback.print_exc()

In [None]:
# Pretty-print one exported artifact file
inspect_file = "asvs_mapping.json"  # Change this to inspect different files

artifact_path = test_artifacts_dir / inspect_file
if not artifact_path.exists():
    # Point the user at the files that are actually available
    available = [candidate.name for candidate in test_artifacts_dir.glob("*.json")]
    print(f"File {inspect_file} not found")
    print(f"Available files: {available}")
else:
    print(f"Inspecting {inspect_file}:\n")
    with artifact_path.open("r") as handle:
        data = json.load(handle)

    if not isinstance(data, list):
        # Anything that is not a list is dumped in full
        print(json.dumps(data, indent=2))
    else:
        print(f"Total items: {len(data)}\n")
        if data:
            print("Sample entries:\n")
            for position, entry in enumerate(data[:3], start=1):
                print(f"{position}. {json.dumps(entry, indent=2)}\n")
            remaining = len(data) - 3
            if remaining > 0:
                print(f"... and {remaining} more entries")

### Test: _generate_markdown_summary

Test the markdown generation function and inspect the output.


In [None]:
# Test _generate_markdown_summary
#
# Calls flow._generate_markdown_summary, then reports size/line/character
# statistics for the generated file and echoes its first lines.
test_output_path = Path("test_security_requirements.qmd")
preview_limit = 100  # number of leading lines echoed in the preview

print("Testing _generate_markdown_summary...\n")
print(f"Output will be saved to: {test_output_path}\n")

try:
    flow._generate_markdown_summary(test_output_path, test_artifacts_dir)

    print("‚úì Markdown summary generated successfully\n")

    # Show file stats
    if test_output_path.exists():
        size = test_output_path.stat().st_size / 1024  # KB
        # Read the file once; the same lines feed both the stats and the preview
        with open(test_output_path, "r", encoding="utf-8") as f:
            lines = f.readlines()
        line_count = len(lines)
        char_count = sum(len(line) for line in lines)

        print("File statistics:")
        print(f"  - Size: {size:.2f} KB")
        print(f"  - Lines: {line_count:,}")
        print(f"  - Characters: {char_count:,}\n")

        # Show the first `preview_limit` lines
        print(f"First {preview_limit} lines preview:\n")
        print("=" * 80)
        for i, line in enumerate(lines[:preview_limit], 1):
            print(f"{i:4d}| {line.rstrip()}")
        print("=" * 80)

        # Only mention a continuation when lines were actually left out;
        # otherwise a short file would report a negative remainder.
        remaining = line_count - preview_limit
        if remaining > 0:
            print(f"\n... (file continues for {remaining} more lines)")

except Exception as e:
    print(f"‚ùå Error: {e}")
    import traceback

    traceback.print_exc()

In [None]:
# List the headings of the generated markdown and run a few regex probes on it
if test_output_path.exists():
    import re

    content = test_output_path.read_text(encoding="utf-8")

    # Capture the title of every line that starts with two or more '#'
    sections = re.findall(r"^##+\s+(.+)$", content, re.MULTILINE)
    shown_sections = sections[:30]  # Show first 30
    hidden_count = len(sections) - len(shown_sections)

    print("Found sections:\n")
    for number, heading in enumerate(shown_sections, start=1):
        print(f"{number:2d}. {heading}")

    if hidden_count > 0:
        print(f"\n... and {hidden_count} more sections")

    # (label, regex) pairs probed against the whole document
    search_terms = [
        ("Requirement IDs", r"#### 6\.2\.\d+\.\s+([^\n]+)"),
        ("OWASP Controls", r"Control\s+([A-Z]\d+\.\d+\.\d+)"),
        ("Threat IDs", r"Threat ID[^\n]+"),
    ]

    print("\n\nSearch results:\n")
    for term, pattern in search_terms:
        matches = re.findall(pattern, content)
        report = f"{term}: Found {len(matches)} matches"
        if matches:
            report += f"\n  Sample: {matches[:3]}"
        print(report)
        print()

In [None]:
# Validate requirement IDs in Section 6.2
#
# Scans the generated markdown for "#### 6.2.<n>. <ID>: <text>" headings and
# reports how many carry a usable requirement ID versus 'None' / blank.
if test_output_path.exists():
    with open(test_output_path, "r", encoding="utf-8") as f:
        content = f.read()

    import re

    # Find all section 6.2 subsections.
    # The ID group excludes ':' so it ends at the FIRST colon of the heading;
    # a greedy "[^\n]+" would run to the last colon and swallow part of any
    # requirement text that itself contains ": ".
    pattern = r"#### 6\.2\.(\d+)\.\s+([^:\n]+):\s+([^\n]+)"
    matches = re.findall(pattern, content)

    print("Section 6.2 Subsections:\n")
    print("Index | Requirement ID | Requirement Text")
    print("-" * 80)

    none_count = 0
    for idx, req_id, req_text in matches:
        # Strip first so an ID with trailing spaces before the colon is still
        # recognised as 'None'
        req_id = req_id.strip()
        if not req_id or req_id.lower() == "none":
            none_count += 1
            print(f"{idx:5s} | ‚ùå {req_id} | {req_text[:60]}...")
        else:
            print(f"{idx:5s} | ‚úì {req_id} | {req_text[:60]}...")

    print("-" * 80)
    print(f"\nTotal subsections: {len(matches)}")
    print(f"With valid IDs: {len(matches) - none_count}")
    print(f"With 'None' or empty IDs: {none_count}")

    if none_count > 0:
        print(f"\n‚ö†Ô∏è Warning: {none_count} subsections still have 'None' as requirement ID")

### Test: build_traceability_matrix

Test the traceability matrix building function.


In [None]:
# Exercise flow.build_traceability_matrix and summarise what it produced
print("Testing build_traceability_matrix...\n")

# Blank out the stored matrix so the call below has to populate it
flow.state.traceability_matrix = ""

try:
    flow.build_traceability_matrix()

    if not flow.state.traceability_matrix:
        print("‚ö†Ô∏è Traceability matrix is empty")
    else:
        # The state holds the matrix as a JSON string
        matrix_data = json.loads(flow.state.traceability_matrix)
        entries = matrix_data.get("entries", [])
        summary = matrix_data.get("summary", "")
        entry_total = len(entries)

        print("‚úì Traceability matrix built successfully\n")
        print(f"Summary: {summary}\n")
        print(f"Total entries: {entry_total}\n")

        if entries:
            print("Sample entries:\n")
            for position, entry in enumerate(entries[:3], start=1):
                print(f"{position}. {entry.get('req_id', 'N/A')}: {entry.get('high_level_requirement', 'N/A')[:60]}...")
                print(f"   Threats: {len(entry.get('threat_ids', []))}")
                print(f"   Controls: {len(entry.get('owasp_control_ids', []))}")
                print(f"   Priority: {entry.get('priority', 'N/A')}\n")

            # Count entries that link to at least one threat / control
            with_threats = sum(bool(item.get("threat_ids")) for item in entries)
            with_controls = sum(bool(item.get("owasp_control_ids")) for item in entries)

            print("Statistics:")
            print(f"  - Entries with threats: {with_threats} ({with_threats / entry_total * 100:.1f}%)")
            print(f"  - Entries with controls: {with_controls} ({with_controls / entry_total * 100:.1f}%)")

except Exception as e:
    import traceback

    print(f"‚ùå Error: {e}")
    traceback.print_exc()

### Helper: Compare Generated Outputs

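Cross-check requirement IDs across the generated outputs (detailed requirements, ASVS mapping artifact, and markdown) and list the saved output files.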


In [None]:
# Compare requirement ID mapping in artifacts vs markdown
#
# Collects requirement IDs from three places (flow.state.detailed_requirements,
# the asvs_mapping.json artifact, and the "6.2.x" headings of the generated
# markdown), prints a sample of each, and reports how much they overlap.
import re

print("Comparing requirement IDs across outputs:\n")

# Bind every collection up front: each source below is optional, and the
# report section reads all three unconditionally. Without these defaults a
# missing source raises NameError instead of simply reporting zero IDs.
req_ids_from_details = {}
req_ids_from_asvs = set()
req_ids_from_md = set()

# From detailed requirements (stored on the state as a JSON string)
if flow.state.detailed_requirements:
    detailed_reqs = json.loads(flow.state.detailed_requirements)
    for req in detailed_reqs:
        req_text = req.get("requirement_text", "").strip().lower()
        req_id = req.get("requirement_id", "")
        if req_text and req_id:
            req_ids_from_details[req_text] = req_id

# From ASVS mapping artifacts
asvs_file = test_artifacts_dir / "asvs_mapping.json"
if asvs_file.exists():
    with open(asvs_file, "r") as f:
        asvs_data = json.load(f)
    for item in asvs_data:
        req_id = item.get("requirement_id")
        if req_id:
            req_ids_from_asvs.add(req_id)

# From markdown: IDs of the form ABC-123 in "#### 6.2.<n>. <ID>:" headings
if test_output_path.exists():
    with open(test_output_path, "r", encoding="utf-8") as f:
        md_content = f.read()

    md_section_pattern = r"#### 6\.2\.\d+\.\s+([A-Z]+-\d+):"
    req_ids_from_md = set(re.findall(md_section_pattern, md_content))

print("Requirement IDs found:\n")
print(f"From detailed requirements: {len(req_ids_from_details)}")
print(f"  Sample: {list(req_ids_from_details.values())[:5]}\n")

print(f"From ASVS mapping artifacts: {len(req_ids_from_asvs)}")
print(f"  Sample: {list(req_ids_from_asvs)[:5]}\n")

print(f"From markdown section 6.2: {len(req_ids_from_md)}")
print(f"  Sample: {list(req_ids_from_md)[:5]}\n")

# Check for consistency (each comparison is skipped when either side is empty)
if req_ids_from_details and req_ids_from_asvs:
    overlap = req_ids_from_asvs.intersection(set(req_ids_from_details.values()))
    print(f"Overlap between detailed reqs and ASVS: {len(overlap)}/{len(req_ids_from_asvs)}")

if req_ids_from_asvs and req_ids_from_md:
    overlap_md = req_ids_from_asvs.intersection(req_ids_from_md)
    print(f"Overlap between ASVS and markdown: {len(overlap_md)}/{len(req_ids_from_md)}")

    missing_in_md = req_ids_from_asvs - req_ids_from_md
    if missing_in_md:
        print(f"\n‚ö†Ô∏è IDs in ASVS but not in markdown: {missing_in_md}")

In [None]:
# Print the name and size of every JSON file directly under test_outputs/
import os

output_dir = Path("test_outputs")
if not output_dir.exists():
    print("\nNo outputs saved yet. Run cells above to generate outputs.")
else:
    print("\nSaved outputs:")
    for saved in sorted(output_dir.glob("*.json")):
        size_kb = saved.stat().st_size / 1024
        print(f"  - {saved.name} ({size_kb:.1f} KB)")