# Security Agents Demo

This notebook demonstrates the capabilities of the Security Agents framework for cybersecurity analysis and response.

## Features
- AI-powered security analysis using LangGraph
- Multiple AI provider support (OpenAI, Anthropic, Local)
- MITRE ATT&CK technique mapping
- Sigma rule generation
- Scenario-based testing
- RESTful API interface

In [None]:
# Import required libraries
import sys
import os
import json
import asyncio
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime

# Add the sec-agents package to path
sys.path.append('../')

from sec_agents.core.graph import SecurityGraph
from sec_agents.core.models import AnalysisRequest, AnalysisType, get_provider
from sec_agents.scenarios import run_scenario, get_available_scenarios
from sec_agents.tools.parsers import LogParser, NetworkParser, FileParser
from sec_agents.tools.sigma_builder import SigmaRuleBuilder
from sec_agents.tools.mitre_mapper import MITREMapper

print("Security Agents framework imported successfully!")

## 1. Basic Security Analysis

Let's start with a simple log analysis example.

In [None]:
# Sample suspicious log data
sample_log_data = {
    "raw_log": """
2024-01-15 14:32:15 [INFO] EventID=4688 ProcessName=powershell.exe CommandLine="powershell.exe -EncodedCommand SQBuAHYAbwBrAGUALQBXAGUAYgBSAGUAcQB1AGUAcwB0AA=="
2024-01-15 14:32:16 [INFO] EventID=4688 ProcessName=cmd.exe CommandLine="cmd.exe /c echo malware.exe > temp.exe"
2024-01-15 14:32:17 [WARN] EventID=5001 AntivirusDetection="Suspicious file detected: temp.exe"
2024-01-15 14:32:18 [ERROR] EventID=4625 LogonType=3 IpAddress=192.168.1.100 TargetUserName=Administrator FailureReason=BadPassword
"""
}

# Create analysis request
request = AnalysisRequest(
    analysis_type=AnalysisType.LOG_TRIAGE,
    data=sample_log_data,
    context={"source": "windows_security_log", "host": "DEMO-WORKSTATION"}
)

# Initialize security graph with local provider for demo
graph = SecurityGraph(provider_type="local")

# Run analysis
result = await graph.analyze(request)

print("Analysis Results:")
print(f"Threat Level: {result.threat_level}")
print(f"Confidence Score: {result.confidence_score:.2f}")
print(f"Number of Findings: {len(result.findings)}")
print(f"MITRE Techniques: {result.mitre_techniques}")
print(f"Sigma Rules: {result.sigma_rules}")

# Display findings
for i, finding in enumerate(result.findings, 1):
    print(f"\nFinding {i}:")
    print(f"  Title: {finding.title}")
    print(f"  Severity: {finding.severity}")
    print(f"  Confidence: {finding.confidence:.2f}")
    print(f"  Description: {finding.description}")

## 2. Data Parsing Capabilities

Demonstrate the parsing capabilities for different data types.

In [None]:
# Initialize parsers
log_parser = LogParser()
network_parser = NetworkParser()
file_parser = FileParser()

# Parse log data
parsed_logs = await log_parser.parse(sample_log_data)
print("Parsed Log Data:")
print(json.dumps(parsed_logs, indent=2)[:500] + "...")

# Sample network data
network_data = {
    "connections": [
        {
            "src_ip": "10.10.10.55",
            "dst_ip": "185.243.112.89",
            "dst_port": 443,
            "protocol": "TCP",
            "bytes": 2048576,
            "timestamp": "2024-01-15T10:30:00Z"
        }
    ]
}

# Parse network data
parsed_network = await network_parser.parse(network_data)
print("\nParsed Network Data:")
print(json.dumps(parsed_network, indent=2)[:500] + "...")

## 3. MITRE ATT&CK Mapping

Demonstrate automatic mapping to MITRE ATT&CK techniques.

In [None]:
from sec_agents.core.models import SecurityFinding, ThreatLevel

# Initialize MITRE mapper
mitre_mapper = MITREMapper()

# Create sample findings
sample_findings = [
    SecurityFinding(
        title="PowerShell Encoded Command Execution",
        description="Detected PowerShell executing encoded commands, potentially bypassing security controls",
        severity=ThreatLevel.HIGH,
        confidence=0.9,
        indicators=["powershell.exe", "-EncodedCommand", "base64"]
    ),
    SecurityFinding(
        title="Failed Authentication Attempts",
        description="Multiple failed login attempts detected from external IP address",
        severity=ThreatLevel.MEDIUM,
        confidence=0.8,
        indicators=["failed_login", "brute_force", "authentication"]
    )
]

# Map findings to MITRE techniques
all_techniques = []
for finding in sample_findings:
    techniques = await mitre_mapper.map_finding(finding)
    all_techniques.extend(techniques)
    print(f"Finding: {finding.title}")
    print(f"  Mapped Techniques: {techniques}")
    
    # Get technique details
    for technique in techniques:
        details = await mitre_mapper.get_technique_details(technique)
        print(f"  {technique}: {details['name']}")
    print()

# Generate attack chain
attack_chain = await mitre_mapper.generate_attack_chain(all_techniques)
print("Attack Chain Analysis:")
print(json.dumps(attack_chain, indent=2))

## 4. Sigma Rule Generation

Demonstrate automatic Sigma rule generation from findings.

In [None]:
# Initialize Sigma rule builder
sigma_builder = SigmaRuleBuilder()

# Generate Sigma rules for findings
for finding in sample_findings:
    rules = await sigma_builder.generate_rules(finding, parsed_logs)
    
    print(f"Sigma Rules for: {finding.title}")
    print("=" * 50)
    
    for rule in rules:
        print(rule)
        print("\n" + "-" * 30 + "\n")
        
        # Validate the rule
        validation = await sigma_builder.validate_rule(rule)
        print(f"Validation: {validation}\n")

## 5. Scenario Execution

Run predefined security scenarios.

In [None]:
# Get available scenarios
scenarios = await get_available_scenarios()
print("Available Scenarios:")
for scenario in scenarios:
    print(f"  - {scenario}")

# Run a specific scenario
scenario_name = "log_triage"
print(f"\nRunning scenario: {scenario_name}")

scenario_result = await run_scenario(scenario_name)
print(f"Scenario Success: {scenario_result['success']}")
print(f"Number of Steps: {len(scenario_result['results'])}")

# Analyze scenario results
for i, step_result in enumerate(scenario_result['results'], 1):
    step_name = step_result['step']
    result_data = step_result['result']
    
    print(f"\nStep {i}: {step_name}")
    print(f"  Findings: {len(result_data['findings'])}")
    print(f"  Threat Level: {result_data['threat_level']}")
    print(f"  Confidence: {result_data['confidence_score']:.2f}")
    print(f"  MITRE Techniques: {result_data['mitre_techniques']}")

## 6. Data Visualization

Create visualizations of security analysis results.

In [None]:
# Prepare data for visualization
viz_data = []

for step_result in scenario_result['results']:
    step_name = step_result['step']
    result_data = step_result['result']
    
    viz_data.append({
        'step': step_name,
        'findings_count': len(result_data['findings']),
        'threat_level': result_data['threat_level'],
        'confidence_score': result_data['confidence_score'],
        'mitre_count': len(result_data['mitre_techniques'])
    })

df = pd.DataFrame(viz_data)

# Create visualizations
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('Security Analysis Results Dashboard', fontsize=16)

# Findings count by step
axes[0, 0].bar(df['step'], df['findings_count'], color='skyblue')
axes[0, 0].set_title('Number of Findings by Step')
axes[0, 0].set_xlabel('Analysis Step')
axes[0, 0].set_ylabel('Number of Findings')
axes[0, 0].tick_params(axis='x', rotation=45)

# Confidence scores
axes[0, 1].plot(df['step'], df['confidence_score'], marker='o', color='green')
axes[0, 1].set_title('Confidence Scores')
axes[0, 1].set_xlabel('Analysis Step')
axes[0, 1].set_ylabel('Confidence Score')
axes[0, 1].set_ylim(0, 1)
axes[0, 1].tick_params(axis='x', rotation=45)

# Threat level distribution
threat_counts = df['threat_level'].value_counts()
axes[1, 0].pie(threat_counts.values, labels=threat_counts.index, autopct='%1.1f%%')
axes[1, 0].set_title('Threat Level Distribution')

# MITRE technique coverage
axes[1, 1].bar(df['step'], df['mitre_count'], color='orange')
axes[1, 1].set_title('MITRE Techniques Identified')
axes[1, 1].set_xlabel('Analysis Step')
axes[1, 1].set_ylabel('Number of Techniques')
axes[1, 1].tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

# Display summary statistics
print("\nSummary Statistics:")
print(f"Total Findings: {df['findings_count'].sum()}")
print(f"Average Confidence: {df['confidence_score'].mean():.2f}")
print(f"Total MITRE Techniques: {df['mitre_count'].sum()}")
print(f"Highest Threat Level: {df['threat_level'].max()}")

## 7. Threat Intelligence Integration

Demonstrate integration with threat intelligence data.

In [None]:
# Load threat intelligence data
import json
from pathlib import Path

ti_file = Path('../data/threat_intelligence.json')
if ti_file.exists():
    with open(ti_file, 'r') as f:
        threat_intel = json.load(f)
    
    print("Threat Intelligence Summary:")
    print(f"Malicious IPs: {len(threat_intel['malicious_ips'])}")
    print(f"Malicious Domains: {len(threat_intel['malicious_domains'])}")
    print(f"Malicious Hashes: {len(threat_intel['malicious_hashes'])}")
    print(f"Attack Patterns: {len(threat_intel['attack_patterns'])}")
    
    # Display some threat intelligence data
    print("\nSample Malicious IPs:")
    for ip_info in threat_intel['malicious_ips'][:2]:
        print(f"  IP: {ip_info['ip']}")
        print(f"  Threat Types: {ip_info['threat_types']}")
        print(f"  Confidence: {ip_info['confidence']}")
        print(f"  Source: {ip_info['source']}")
        print()
    
    print("Attack Patterns:")
    for pattern in threat_intel['attack_patterns']:
        print(f"  {pattern['name']}: {pattern['description'][:100]}...")
        print(f"    MITRE Techniques: {pattern['mitre_techniques']}")
        print(f"    Severity: {pattern['severity']}")
        print()
else:
    print("Threat intelligence data not found.")

## 8. API Integration Example

Demonstrate how to interact with the Security Agents API.

In [None]:
import httpx
from sec_agents.app.auth import generate_demo_token

# Generate authentication token
token = generate_demo_token()
print(f"Generated API token: {token[:20]}...")

# Example API request (assuming server is running)
api_example = """
# Example API usage with curl:

# 1. Health check
curl -X GET "http://localhost:8000/health"

# 2. List scenarios
curl -X GET "http://localhost:8000/scenarios" \
  -H "Authorization: Bearer {token}"

# 3. Run analysis
curl -X POST "http://localhost:8000/analyze" \
  -H "Authorization: Bearer {token}" \
  -H "Content-Type: application/json" \
  -d '{{
    "analysis_type": "log_triage",
    "data": {{
      "raw_log": "2024-01-15 14:32:15 [INFO] EventID=4688 ProcessName=powershell.exe"
    }}
  }}'

# 4. Run scenario
curl -X POST "http://localhost:8000/scenarios/log_triage/run" \
  -H "Authorization: Bearer {token}"
""".format(token=token)

print(api_example)

## 9. Custom Analysis Pipeline

Create a custom analysis pipeline combining multiple components.

In [None]:
async def custom_security_analysis(raw_data, analysis_type):
    """
    Custom security analysis pipeline that combines multiple components.
    """
    print(f"Starting custom analysis for {analysis_type}...")
    
    # Step 1: Parse the data
    print("Step 1: Parsing data...")
    parser = LogParser() if analysis_type == "log_triage" else NetworkParser()
    parsed_data = await parser.parse(raw_data)
    
    # Step 2: Run AI analysis
    print("Step 2: Running AI analysis...")
    request = AnalysisRequest(
        analysis_type=AnalysisType(analysis_type),
        data=raw_data
    )
    graph = SecurityGraph(provider_type="local")
    ai_result = await graph.analyze(request)
    
    # Step 3: Map to MITRE ATT&CK
    print("Step 3: Mapping to MITRE ATT&CK...")
    mitre_mapper = MITREMapper()
    all_techniques = []
    for finding in ai_result.findings:
        techniques = await mitre_mapper.map_finding(finding)
        all_techniques.extend(techniques)
    
    # Step 4: Generate Sigma rules
    print("Step 4: Generating Sigma rules...")
    sigma_builder = SigmaRuleBuilder()
    sigma_rules = []
    for finding in ai_result.findings:
        rules = await sigma_builder.generate_rules(finding, parsed_data)
        sigma_rules.extend(rules)
    
    # Step 5: Calculate risk score
    print("Step 5: Calculating risk score...")
    risk_analysis = await mitre_mapper.calculate_risk_score(all_techniques)
    
    # Compile final results
    custom_result = {
        "analysis_type": analysis_type,
        "timestamp": datetime.now().isoformat(),
        "parsed_data_summary": {
            "type": parsed_data.get("log_type", "unknown"),
            "indicators_found": len(parsed_data.get("indicators", {})),
            "events_count": len(parsed_data.get("events", []))
        },
        "ai_analysis": {
            "findings_count": len(ai_result.findings),
            "threat_level": ai_result.threat_level,
            "confidence_score": ai_result.confidence_score,
            "recommendations": ai_result.recommendations
        },
        "mitre_analysis": {
            "techniques_mapped": list(set(all_techniques)),
            "risk_score": risk_analysis["risk_score"],
            "risk_level": risk_analysis["risk_level"]
        },
        "sigma_rules": {
            "rules_generated": len(sigma_rules),
            "sample_rule": sigma_rules[0][:200] + "..." if sigma_rules else None
        }
    }
    
    print("Custom analysis complete!")
    return custom_result

# Run custom analysis
custom_result = await custom_security_analysis(sample_log_data, "log_triage")
print("\nCustom Analysis Results:")
print(json.dumps(custom_result, indent=2))

## 10. Performance and Benchmarking

Benchmark the performance of different components.

In [None]:
import time

async def benchmark_analysis():
    """
    Benchmark the performance of security analysis components.
    """
    results = {}
    
    # Benchmark log parsing
    print("Benchmarking log parsing...")
    parser = LogParser()
    start_time = time.time()
    for _ in range(10):
        await parser.parse(sample_log_data)
    parsing_time = (time.time() - start_time) / 10
    results["log_parsing_avg_time"] = parsing_time
    
    # Benchmark MITRE mapping
    print("Benchmarking MITRE mapping...")
    mitre_mapper = MITREMapper()
    test_finding = SecurityFinding(
        title="Test Finding",
        description="PowerShell command execution with encoded payload",
        severity=ThreatLevel.HIGH,
        confidence=0.8
    )
    start_time = time.time()
    for _ in range(10):
        await mitre_mapper.map_finding(test_finding)
    mitre_time = (time.time() - start_time) / 10
    results["mitre_mapping_avg_time"] = mitre_time
    
    # Benchmark Sigma rule generation
    print("Benchmarking Sigma rule generation...")
    sigma_builder = SigmaRuleBuilder()
    start_time = time.time()
    for _ in range(10):
        await sigma_builder.generate_rules(test_finding, {})
    sigma_time = (time.time() - start_time) / 10
    results["sigma_generation_avg_time"] = sigma_time
    
    return results

# Run benchmarks
benchmark_results = await benchmark_analysis()

print("\nPerformance Benchmark Results:")
for component, avg_time in benchmark_results.items():
    print(f"{component}: {avg_time:.4f} seconds")

# Create performance visualization
components = list(benchmark_results.keys())
times = list(benchmark_results.values())

plt.figure(figsize=(10, 6))
bars = plt.bar(components, times, color=['blue', 'green', 'orange'])
plt.title('Component Performance Benchmark')
plt.xlabel('Component')
plt.ylabel('Average Time (seconds)')
plt.xticks(rotation=45)

# Add value labels on bars
for bar, time_val in zip(bars, times):
    plt.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.001,
             f'{time_val:.4f}s', ha='center', va='bottom')

plt.tight_layout()
plt.show()

## Conclusion

This notebook has demonstrated the key capabilities of the Security Agents framework:

1. **AI-Powered Analysis**: Leveraging LangGraph and multiple AI providers for intelligent security analysis
2. **Data Parsing**: Comprehensive parsing capabilities for logs, network traffic, and file data
3. **MITRE ATT&CK Integration**: Automatic mapping of findings to MITRE ATT&CK techniques
4. **Sigma Rule Generation**: Automated creation of detection rules from security findings
5. **Scenario-Based Testing**: Predefined scenarios for comprehensive security testing
6. **Visualization**: Rich visualizations for analysis results and performance metrics
7. **API Integration**: RESTful API for integration with external systems
8. **Extensibility**: Modular architecture allowing for custom analysis pipelines

The framework provides a solid foundation for building sophisticated cybersecurity analysis and response systems, with the flexibility to adapt to various security use cases and environments.

### Next Steps

1. Integrate with real AI providers (OpenAI, Anthropic) by setting up API keys
2. Extend the framework with additional security tools and data sources
3. Implement real-time processing capabilities
4. Add support for additional output formats and integrations
5. Enhance the evaluation framework with more comprehensive test scenarios