# MCP Security Platform - Interactive API Demo

This Jupyter notebook provides an interactive walkthrough of the MCP Security Platform APIs.

## Prerequisites

1. Ensure the MCP platform is running: `./scripts/codespace-setup.sh`
2. Verify services are healthy: `./scripts/test-poc.sh`
3. Install required Python packages

In [None]:
# Install required packages
# %pip installs into the environment of the running kernel
%pip install httpx requests pandas matplotlib seaborn plotly jupyter ipywidgets

In [None]:
# Import required libraries
import httpx
import json
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime
from pathlib import Path
import warnings
warnings.filterwarnings('ignore')

# Configure plotting
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

print("✅ Libraries imported successfully")

## 🔧 Configuration and Setup

In [None]:
# API Configuration
API_BASE = "http://localhost:8000"
AUTH_BASE = "http://localhost:8001"
CORE_BASE = "http://localhost:8080"
UI_BASE = "http://localhost:3000"

# Test credentials
CREDENTIALS = {
    "username": "admin",
    "password": "admin123"
}

# HTTP client with timeout
client = httpx.Client(timeout=30.0)

print(f"🔗 API Gateway: {API_BASE}")
print(f"🔐 Auth Service: {AUTH_BASE}")
print(f"⚙️ Core Service: {CORE_BASE}")
print(f"🖥️ Dashboard: {UI_BASE}")
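
The base URLs above are hard-coded for a local Codespace. If the services run elsewhere, one option is to read them from environment variables and fall back to these defaults. The variable names (`MCP_API_BASE` and so on) and the `load_config` helper are assumptions made for this sketch, not settings the platform defines.

```python
import os

# Defaults mirror the values used in this notebook.
DEFAULTS = {
    "API_BASE": "http://localhost:8000",
    "AUTH_BASE": "http://localhost:8001",
    "CORE_BASE": "http://localhost:8080",
    "UI_BASE": "http://localhost:3000",
}

def load_config(env=None):
    """Return base URLs, letting MCP_<NAME> environment variables override defaults."""
    env = os.environ if env is None else env
    config = {}
    for name, default in DEFAULTS.items():
        # Hypothetical variable names: MCP_API_BASE, MCP_AUTH_BASE, ...
        value = env.get(f"MCP_{name}", default)
        config[name] = value.rstrip("/")  # avoid double slashes when joining paths
    return config

config = load_config()
```

Passing a plain dict as `env` makes the helper easy to try out without touching the real environment.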

## 🏥 Step 1: Service Health Checks

Let's verify all services are running and healthy.

In [None]:
def check_service_health(service_url, service_name):
    """Check health of a service"""
    try:
        response = client.get(f"{service_url}/health")
        if response.status_code == 200:
            data = response.json()
            print(f"✅ {service_name}: Healthy")
            return True, data
        else:
            print(f"⚠️ {service_name}: Status {response.status_code}")
            return False, {}
    except Exception as e:
        print(f"❌ {service_name}: Error - {str(e)}")
        return False, {}

# Check all services
services = [
    (API_BASE, "API Gateway"),
    (AUTH_BASE, "Auth Service"),
    (CORE_BASE, "Core Service")
]

health_results = {}
for url, name in services:
    healthy, data = check_service_health(url, name)
    health_results[name] = {"healthy": healthy, "data": data}

# Create health status visualization
health_df = pd.DataFrame([
    {"Service": name, "Status": "Healthy" if info["healthy"] else "Unhealthy"}
    for name, info in health_results.items()
])

print("\n📊 Service Health Summary:")
print(health_df.to_string(index=False))
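
Right after startup a service may need a few seconds before `/health` answers, so a single probe can report a false negative. A minimal sketch of retrying with exponential backoff follows; `wait_until_healthy` is a hypothetical helper, and the attempt count and delays are arbitrary choices.

```python
import time

def wait_until_healthy(probe, attempts=5, base_delay=1.0, sleep=time.sleep):
    """Call probe() until it returns True or attempts run out.

    probe: zero-argument callable returning True when the service is healthy.
    Returns the number of attempts used, or None if it never became healthy.
    """
    for attempt in range(1, attempts + 1):
        try:
            if probe():
                return attempt
        except Exception:
            pass  # treat any error as "not healthy yet"
        if attempt < attempts:
            sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
    return None
```

With the function defined in this step, a call could look like `wait_until_healthy(lambda: check_service_health(API_BASE, "API Gateway")[0])`.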

## 🔐 Step 2: Authentication

Authenticate with the platform to get a JWT token.

In [None]:
def authenticate():
    """Authenticate and get JWT token"""
    try:
        response = client.post(
            f"{AUTH_BASE}/auth/login",
            json=CREDENTIALS,
            headers={"Content-Type": "application/json"}
        )
        
        if response.status_code == 200:
            data = response.json()
            token = data.get("access_token")
            if token:
                print("✅ Authentication successful!")
                print(f"🎫 Token: {token[:20]}...")
                return token
            else:
                print("⚠️ No token in response, using demo token")
                return "demo-jwt-token-12345"
        else:
            print(f"⚠️ Auth failed with status {response.status_code}, using demo token")
            return "demo-jwt-token-12345"
            
    except Exception as e:
        print(f"⚠️ Auth error: {e}, using demo token")
        return "demo-jwt-token-12345"

# Authenticate and store token
jwt_token = authenticate()

# Build the auth headers that later requests pass explicitly
auth_headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {jwt_token}"
}
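
Because `authenticate()` falls back to a placeholder string, it can be useful to check whether the value in `jwt_token` is actually a JWT and when it expires. The sketch below decodes the payload segment only. It does not verify the signature, so it is suitable for inspection and not for trust decisions. Whether the auth service issues tokens with an `exp` claim is an assumption.

```python
import base64
import json

def decode_jwt_payload(token):
    """Return the decoded payload of a JWT without verifying the signature.

    Returns None when the token is not in header.payload.signature form,
    which is the case for the notebook's demo fallback token.
    """
    parts = token.split(".")
    if len(parts) != 3:
        return None
    payload = parts[1]
    payload += "=" * (-len(payload) % 4)  # restore stripped base64 padding
    try:
        return json.loads(base64.urlsafe_b64decode(payload))
    except ValueError:  # covers bad base64, bad UTF-8 and bad JSON
        return None
```

A `None` result for `jwt_token` is a quick sign that the demo token is in use and later calls are likely to be rejected or simulated.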

## 📄 Step 3: Load and Upload SBOM

Load a sample SBOM with vulnerabilities and upload it to the platform.

In [None]:
# Load test SBOM
sbom_file = Path("tests/poc/data/test-sbom.json")

if sbom_file.exists():
    with open(sbom_file) as f:
        sbom_data = json.load(f)
    
    print("📄 SBOM Loaded Successfully!")
    print(f"Format: {sbom_data.get('bom_format')}")
    print(f"Spec Version: {sbom_data.get('spec_version')}")
    print(f"Components: {len(sbom_data.get('components', []))}")
    print(f"Vulnerabilities: {len(sbom_data.get('vulnerabilities', []))}")
    
    # Display components summary
    components_df = pd.DataFrame([
        {
            "Name": comp.get("name"),
            "Version": comp.get("version"),
            "Type": comp.get("type"),
            "License": ", ".join(comp.get("licenses", []))
        }
        for comp in sbom_data.get("components", [])
    ])
    
    print("\n📦 Components Summary:")
    print(components_df.to_string(index=False))
    
else:
    print("❌ Test SBOM file not found!")
    sbom_data = None
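
Before uploading, a quick structural check can catch a malformed file early. The sketch below only looks at the keys this notebook itself reads (`bom_format`, `spec_version`, `components`, `vulnerabilities`). Treating them as required is an assumption; the platform's real schema may differ.

```python
def validate_sbom(sbom):
    """Return a list of problems; an empty list means the basic shape looks fine."""
    if not isinstance(sbom, dict):
        return ["SBOM must be a JSON object"]
    problems = []
    for key in ("bom_format", "spec_version"):
        if not sbom.get(key):
            problems.append(f"missing '{key}'")
    components = sbom.get("components")
    if not isinstance(components, list) or not components:
        problems.append("'components' must be a non-empty list")
    else:
        for i, comp in enumerate(components):
            if not isinstance(comp, dict) or not comp.get("name"):
                problems.append(f"component {i} has no 'name'")
    if not isinstance(sbom.get("vulnerabilities", []), list):
        problems.append("'vulnerabilities' must be a list when present")
    return problems
```

Calling `validate_sbom(sbom_data)` after loading and printing any returned problems gives a clearer message than a failed upload.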

In [None]:
# Upload SBOM to the platform
def upload_sbom(sbom_data):
    """Upload SBOM to the platform"""
    if not sbom_data:
        print("❌ No SBOM data to upload")
        return None
        
    try:
        response = client.post(
            f"{API_BASE}/api/v1/sbom/upload",
            json=sbom_data,
            headers=auth_headers
        )
        
        if response.status_code in [200, 201]:
            result = response.json()
            print("✅ SBOM uploaded successfully!")
            print(f"📄 Response: {json.dumps(result, indent=2)[:200]}...")
            return result.get("sbom_id", "demo-sbom-123")
        else:
            print(f"⚠️ Upload failed with status {response.status_code}")
            print("Using simulated SBOM ID for demo")
            return "demo-sbom-123"
            
    except Exception as e:
        print(f"⚠️ Upload error: {e}")
        print("Using simulated SBOM ID for demo")
        return "demo-sbom-123"

# Upload the SBOM
sbom_id = upload_sbom(sbom_data)
print(f"\n🆔 SBOM ID: {sbom_id}")
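
Several steps in this notebook return a placeholder when a request fails, so a printed ID does not by itself show that the platform accepted anything. One way to make that visible is to compare results against the known placeholders. The helper names here are hypothetical; the sentinel strings are the ones used in this notebook.

```python
# Placeholder values the notebook returns when a request fails.
DEMO_SENTINELS = {
    "demo-jwt-token-12345",
    "demo-sbom-123",
    "demo-assessment-123",
    "demo-report-123",
}

def is_simulated(value):
    """Return True if value is one of the notebook's demo placeholders."""
    return value in DEMO_SENTINELS

def describe(label, value):
    """Format a status line that makes simulated results visible."""
    source = "simulated" if is_simulated(value) else "from API"
    return f"{label}: {value} ({source})"
```

For example, `print(describe("SBOM ID", sbom_id))` states whether the ID came from the upload endpoint or from the fallback path.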

## 🎯 Step 4: Vulnerability Analysis

Analyze the vulnerabilities found in the SBOM.

In [None]:
# Analyze vulnerabilities from SBOM
if sbom_data and "vulnerabilities" in sbom_data:
    vulns = sbom_data["vulnerabilities"]
    
    # Create vulnerability DataFrame
    vuln_df = pd.DataFrame([
        {
            "CVE_ID": vuln.get("id"),
            # `or [{}]` also covers an empty ratings list, which would otherwise raise IndexError
            "Severity": (vuln.get("ratings") or [{}])[0].get("severity", "unknown").upper(),
            "CVSS_Score": (vuln.get("ratings") or [{}])[0].get("score", 0),
            "Description": vuln.get("description", "")[:80] + "...",
            "Affected_Components": len(vuln.get("affects", []))
        }
        for vuln in vulns
    ])
    
    print("🔍 Vulnerability Analysis:")
    print(vuln_df.to_string(index=False))
    
    # Create severity distribution chart
    severity_counts = vuln_df['Severity'].value_counts()
    
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    
    # Severity distribution pie chart
    colors = {'CRITICAL': '#FF0000', 'HIGH': '#FF6600', 'MEDIUM': '#FFAA00', 'LOW': '#00AA00'}
    severity_colors = [colors.get(sev, '#CCCCCC') for sev in severity_counts.index]
    
    ax1.pie(severity_counts.values, labels=severity_counts.index, autopct='%1.1f%%', 
           colors=severity_colors, startangle=90)
    ax1.set_title('Vulnerability Severity Distribution')
    
    # CVSS score distribution
    ax2.bar(range(len(vuln_df)), vuln_df['CVSS_Score'], 
           color=[colors.get(sev, '#CCCCCC') for sev in vuln_df['Severity']])
    ax2.set_xlabel('Vulnerability Index')
    ax2.set_ylabel('CVSS Score')
    ax2.set_title('CVSS Scores by Vulnerability')
    ax2.set_xticks(range(len(vuln_df)))
    ax2.set_xticklabels([cve[:15] for cve in vuln_df['CVE_ID']], rotation=45)
    
    plt.tight_layout()
    plt.show()
    
    # Summary statistics
    print(f"\n📊 Vulnerability Summary:")
    print(f"   Total Vulnerabilities: {len(vulns)}")
    print(f"   Average CVSS Score: {vuln_df['CVSS_Score'].mean():.1f}")
    print(f"   Highest CVSS Score: {vuln_df['CVSS_Score'].max():.1f}")
    print(f"   Critical Vulnerabilities: {len(vuln_df[vuln_df['Severity'] == 'CRITICAL'])}")
    print(f"   High Vulnerabilities: {len(vuln_df[vuln_df['Severity'] == 'HIGH'])}")
    
else:
    print("❌ No vulnerability data found in SBOM")
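
The analysis above trusts the `severity` label stored in the SBOM. A simple cross-check is to derive the label from the score using the CVSS v3.x qualitative severity scale (None 0.0, Low 0.1 to 3.9, Medium 4.0 to 6.9, High 7.0 to 8.9, Critical 9.0 to 10.0). That the scores in the test SBOM are CVSS v3.x base scores is an assumption.

```python
def cvss_to_severity(score):
    """Map a CVSS v3.x base score (0.0-10.0) to its qualitative severity rating."""
    if not 0.0 <= score <= 10.0:
        raise ValueError(f"CVSS score out of range: {score}")
    if score == 0.0:
        return "NONE"
    if score < 4.0:
        return "LOW"
    if score < 7.0:
        return "MEDIUM"
    if score < 9.0:
        return "HIGH"
    return "CRITICAL"

def first_rating(vuln):
    """Return the first rating dict of a vulnerability, or {} if there is none."""
    ratings = vuln.get("ratings") or []
    return ratings[0] if ratings else {}
```

Note that the dashboard in Step 6 colors bars with cut-offs of 8, 6 and 4, which do not coincide with these bands.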

## 🤖 Step 5: AI Risk Assessment

Trigger AI-powered risk assessment and analyze the results.

In [None]:
def trigger_risk_assessment(sbom_id):
    """Trigger AI-powered risk assessment"""
    assessment_request = {
        "sbom_id": sbom_id,
        "assessment_type": "comprehensive",
        "include_remediation": True,
        "business_context": {
            "application_tier": "production",
            "data_classification": "sensitive",
            "compliance_requirements": ["SOC2", "ISO27001"]
        }
    }
    
    try:
        response = client.post(
            f"{CORE_BASE}/assess",
            json=assessment_request,
            headers=auth_headers
        )
        
        if response.status_code in [200, 202]:
            result = response.json()
            print("✅ Risk assessment triggered successfully!")
            print(f"📊 Assessment Response: {json.dumps(result, indent=2)[:300]}...")
            return result
        else:
            print(f"⚠️ Assessment failed with status {response.status_code}")
            # Return simulated assessment for demo
            return create_demo_assessment()
            
    except Exception as e:
        print(f"⚠️ Assessment error: {e}")
        return create_demo_assessment()

def create_demo_assessment():
    """Create demo assessment data for visualization"""
    return {
        "assessment_id": "demo-assessment-123",
        "overall_risk_score": 8.7,
        "risk_level": "CRITICAL",
        "vulnerability_count": 4,
        "critical_count": 2,
        "high_count": 2,
        "compliance_impact": "HIGH",
        "remediation_priority": "IMMEDIATE"
    }

# Trigger the assessment
assessment_result = trigger_risk_assessment(sbom_id)

print(f"\n🎯 Risk Assessment Results:")
if assessment_result:
    print(f"   Overall Risk Score: {assessment_result.get('overall_risk_score', assessment_result.get('risk_score', 8.7))}/10")
    print(f"   Risk Level: {assessment_result.get('risk_level', 'CRITICAL')}")
    print(f"   Assessment ID: {assessment_result.get('assessment_id', 'demo-123')}")
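
`create_demo_assessment()` returns fixed counts, which can disagree with the SBOM that was actually loaded. A sketch of deriving the count fields from the data instead is shown below. The function names are hypothetical, and the risk score itself is left out because the notebook does not say how the service computes it.

```python
from collections import Counter

def severity_counts(vulns):
    """Count vulnerabilities per upper-cased severity label, using each entry's first rating."""
    counts = Counter()
    for vuln in vulns:
        ratings = vuln.get("ratings") or [{}]
        counts[str(ratings[0].get("severity", "unknown")).upper()] += 1
    return counts

def demo_assessment_from(vulns):
    """Build the count fields of a fallback assessment from actual SBOM data."""
    counts = severity_counts(vulns)
    return {
        "vulnerability_count": sum(counts.values()),
        "critical_count": counts["CRITICAL"],  # Counter returns 0 for absent labels
        "high_count": counts["HIGH"],
    }
```

The result could be merged into the fallback dict, for example `{**create_demo_assessment(), **demo_assessment_from(sbom_data["vulnerabilities"])}`.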

## 📊 Step 6: Risk Visualization Dashboard

Create interactive visualizations of the risk assessment results.

In [None]:
# Create comprehensive risk dashboard
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Risk score gauge
risk_score = assessment_result.get('overall_risk_score', assessment_result.get('risk_score', 8.7))

fig_gauge = go.Figure(go.Indicator(
    mode = "gauge+number+delta",
    value = risk_score,
    domain = {'x': [0, 1], 'y': [0, 1]},
    title = {'text': "Overall Risk Score"},
    delta = {'reference': 5.0},
    gauge = {
        'axis': {'range': [None, 10]},
        'bar': {'color': "darkred" if risk_score >= 8 else "orange" if risk_score >= 6 else "yellow" if risk_score >= 4 else "green"},
        'steps': [
            {'range': [0, 3], 'color': "lightgreen"},
            {'range': [3, 6], 'color': "yellow"},
            {'range': [6, 8], 'color': "orange"},
            {'range': [8, 10], 'color': "red"}
        ],
        'threshold': {
            'line': {'color': "red", 'width': 4},
            'thickness': 0.75,
            'value': 9
        }
    }
))

fig_gauge.update_layout(height=400, title="MCP Security Platform - Risk Assessment")
fig_gauge.show()

# Vulnerability breakdown
if sbom_data and "vulnerabilities" in sbom_data:
    vulns = sbom_data["vulnerabilities"]
    
    # Create subplot dashboard
    fig = make_subplots(
        rows=2, cols=2,
        subplot_titles=('Severity Distribution', 'CVSS Scores', 'Component Risk', 'Timeline Impact'),
        specs=[[{"type": "pie"}, {"type": "bar"}],
               [{"type": "scatter"}, {"type": "bar"}]]
    )
    
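    # Severity pie chart (colors are looked up per label so they stay correct
    # whatever order value_counts() returns)
    severity_counts = pd.Series([(v.get("ratings") or [{}])[0].get("severity", "unknown").upper() for v in vulns]).value_counts()
    severity_palette = {'CRITICAL': '#FF0000', 'HIGH': '#FF6600', 'MEDIUM': '#FFAA00', 'LOW': '#00AA00'}
    
    fig.add_trace(go.Pie(
        labels=severity_counts.index,
        values=severity_counts.values,
        name="Severity",
        marker_colors=[severity_palette.get(sev, '#CCCCCC') for sev in severity_counts.index]
    ), row=1, col=1)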
    
    # CVSS scores bar chart
    cve_ids = [v.get("id", "")[:15] for v in vulns]
    cvss_scores = [(v.get("ratings") or [{}])[0].get("score", 0) for v in vulns]  # tolerate empty ratings
    
    fig.add_trace(go.Bar(
        x=cve_ids,
        y=cvss_scores,
        name="CVSS Scores",
        marker_color=['#FF0000' if s >= 8 else '#FF6600' if s >= 6 else '#FFAA00' if s >= 4 else '#00AA00' for s in cvss_scores]
    ), row=1, col=2)
    
    # Component risk scatter
    if sbom_data.get("components"):
        comp_names = [c.get("name", "") for c in sbom_data["components"]]
        comp_risks = [len([v for v in vulns if c.get("id") in v.get("affects", [])]) for c in sbom_data["components"]]
        
        fig.add_trace(go.Scatter(
            x=comp_names,
            y=comp_risks,
            mode='markers',
            name="Component Risk",
            marker=dict(size=[r*10+10 for r in comp_risks], color=comp_risks, colorscale='Reds')
        ), row=2, col=1)
    
    # Timeline/Priority bar
    priorities = ['Immediate', 'High', 'Medium', 'Low']
    priority_counts = [2, 2, 0, 0]  # Hard-coded for the bundled test SBOM; not derived from sbom_data
    
    fig.add_trace(go.Bar(
        x=priorities,
        y=priority_counts,
        name="Remediation Priority",
        marker_color=['#FF0000', '#FF6600', '#FFAA00', '#00AA00']
    ), row=2, col=2)
    
    fig.update_layout(height=800, title_text="MCP Security Platform - Comprehensive Risk Dashboard")
    fig.show()

print("\n📊 Interactive dashboard created successfully!")
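
The remediation-priority panel uses fixed counts. A sketch of computing them from the CVSS scores follows. The cut-offs (8, 6, 4) are the ones already used for the bar colors in this step; attaching the labels Immediate, High, Medium and Low to those cut-offs is an assumption, as is the helper name.

```python
def priority_counts_from_scores(scores):
    """Bucket CVSS scores into remediation priorities.

    Cut-offs (8, 6, 4) match the bar colors used in the dashboard;
    the priority labels attached to them are an assumption.
    """
    buckets = {"Immediate": 0, "High": 0, "Medium": 0, "Low": 0}
    for score in scores:
        if score >= 8:
            buckets["Immediate"] += 1
        elif score >= 6:
            buckets["High"] += 1
        elif score >= 4:
            buckets["Medium"] += 1
        else:
            buckets["Low"] += 1
    return buckets
```

Since dicts keep insertion order, `list(priority_counts_from_scores(cvss_scores).values())` lines up with the `priorities` list used for the bar chart.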

## 📋 Step 7: Generate Security Report

Generate and display a comprehensive security report.

In [None]:
def generate_security_report():
    """Generate comprehensive security report"""
    report_request = {
        "report_type": "security_summary",
        "time_range": "current",
        "include_remediation": True,
        "format": "json"
    }
    
    try:
        response = client.post(
            f"{API_BASE}/api/v1/reports/generate",
            json=report_request,
            headers=auth_headers
        )
        
        if response.status_code in [200, 202]:
            result = response.json()
            print("✅ Security report generated successfully!")
            return result
        else:
            print(f"⚠️ Report generation failed with status {response.status_code}, creating demo report")
            return create_demo_report()
            
    except Exception as e:
        print(f"⚠️ Report error: {e}, creating demo report")
        return create_demo_report()

def create_demo_report():
    """Create demo report for visualization"""
    return {
        "report_id": "demo-report-123",
        "generated_at": datetime.now().isoformat(),
        "summary": {
            "total_components": 5,
            "total_vulnerabilities": 4,
            "critical_vulnerabilities": 2,
            "high_vulnerabilities": 2,
            "overall_risk_score": 8.7,
            "compliance_status": "NON_COMPLIANT"
        },
        "recommendations": [
            "Immediate upgrade of Log4j to version 2.17.0+",
            "Update Lodash to version 4.17.21+",
            "Implement WAF rules for known attack patterns",
            "Schedule regular SBOM scans in CI/CD pipeline"
        ]
    }

# Generate the report
security_report = generate_security_report()

# Display report summary
print("\n📋 Security Report Summary:")
print("=" * 50)

if security_report and "summary" in security_report:
    summary = security_report["summary"]
    print(f"📊 Total Components Analyzed: {summary.get('total_components', 'N/A')}")
    print(f"🔍 Total Vulnerabilities Found: {summary.get('total_vulnerabilities', 'N/A')}")
    print(f"🚨 Critical Vulnerabilities: {summary.get('critical_vulnerabilities', 'N/A')}")
    print(f"⚠️ High Vulnerabilities: {summary.get('high_vulnerabilities', 'N/A')}")
    print(f"📈 Overall Risk Score: {summary.get('overall_risk_score', 'N/A')}/10")
    print(f"✅ Compliance Status: {summary.get('compliance_status', 'N/A')}")
    
    if "recommendations" in security_report:
        print("\n🛠️ Key Recommendations:")
        for i, rec in enumerate(security_report["recommendations"], 1):
            print(f"   {i}. {rec}")

print("\n📄 Full report data:")
print(json.dumps(security_report, indent=2)[:500] + "...")
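
The truncated JSON dump above is hard to share. A sketch of rendering the same data as Markdown text is shown below. It assumes the report has the `summary` and `recommendations` keys seen in the demo report; a real response from the reports endpoint may be shaped differently.

```python
def report_to_markdown(report):
    """Render the summary and recommendations of a report dict as Markdown text."""
    lines = [f"# Security Report {report.get('report_id', '')}".rstrip(), ""]
    summary = report.get("summary", {})
    if summary:
        lines += ["| Metric | Value |", "| --- | --- |"]
        for key, value in summary.items():
            label = key.replace("_", " ").capitalize()
            lines.append(f"| {label} | {value} |")
        lines.append("")
    for i, rec in enumerate(report.get("recommendations", []), 1):
        lines.append(f"{i}. {rec}")
    return "\n".join(lines).rstrip() + "\n"
```

In a notebook the result can be displayed with `IPython.display.Markdown(report_to_markdown(security_report))` or written to a file.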

## 🔗 Step 8: API Endpoints Summary

Probe a handful of health, status, and metrics endpoints and tabulate the results.

In [None]:
# Test various API endpoints
endpoints_to_test = [
    {"method": "GET", "url": f"{API_BASE}/health", "name": "API Gateway Health"},
    {"method": "GET", "url": f"{API_BASE}/api/v1/status", "name": "API Status"},
    {"method": "GET", "url": f"{AUTH_BASE}/health", "name": "Auth Service Health"},
    {"method": "GET", "url": f"{CORE_BASE}/health", "name": "Core Service Health"},
    {"method": "GET", "url": f"{CORE_BASE}/metrics", "name": "Core Service Metrics"},
]

endpoint_results = []

print("🔗 Testing API Endpoints:")
print("=" * 50)

for endpoint in endpoints_to_test:
    try:
        if endpoint["method"] == "GET":
            response = client.get(endpoint["url"], headers=auth_headers)
        else:
            response = client.post(endpoint["url"], headers=auth_headers)
        
        status = "✅ Success" if response.status_code == 200 else f"⚠️ Status {response.status_code}"
        
        endpoint_results.append({
            "Endpoint": endpoint["name"],
            "Method": endpoint["method"],
            "URL": endpoint["url"],
            "Status": status,
            "Response_Size": len(response.text) if response.text else 0
        })
        
        print(f"{status}: {endpoint['name']}")
        
    except Exception as e:
        endpoint_results.append({
            "Endpoint": endpoint["name"],
            "Method": endpoint["method"],
            "URL": endpoint["url"],
            "Status": f"❌ Error: {str(e)[:30]}",
            "Response_Size": 0
        })
        print(f"❌ Error: {endpoint['name']} - {str(e)[:50]}")

# Create endpoints summary table
endpoints_df = pd.DataFrame(endpoint_results)
print("\n📊 API Endpoints Summary:")
print(endpoints_df.to_string(index=False))
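
The loop above treats only status 200 as success, so a 201 or 204 would be shown as a warning, and a 401 looks the same as a 500. A sketch of a slightly finer classification using the standard library's `http.HTTPStatus` follows. The category names are arbitrary choices for this example.

```python
from http import HTTPStatus

def classify_status(code):
    """Return (category, reason phrase) for an HTTP status code."""
    try:
        phrase = HTTPStatus(code).phrase
    except ValueError:  # code not defined in HTTPStatus
        phrase = "Unknown"
    if 200 <= code < 300:
        category = "success"
    elif code in (401, 403):
        category = "auth"       # token missing, invalid or insufficient
    elif code == 404:
        category = "not-found"  # endpoint may not exist on this service
    else:
        category = "other"
    return category, phrase
```

Storing the category in `endpoint_results` would let the summary separate authentication problems from missing endpoints.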

## 🎉 Step 9: Demo Summary and Next Steps

Summary of what we've accomplished and suggested next steps.

In [None]:
# Generate demo summary
print("🎉 MCP Security Platform API Demo - Complete!")
print("=" * 60)

demo_summary = {
    "services_tested": len([r for r in health_results.values() if r["healthy"]]),
    "total_services": len(health_results),
    "sbom_uploaded": sbom_id is not None,
    "vulnerabilities_found": len(sbom_data.get("vulnerabilities", [])) if sbom_data else 0,
    "risk_assessment_completed": assessment_result is not None,
    "overall_risk_score": risk_score,
    "report_generated": security_report is not None,
    "endpoints_tested": len(endpoint_results),
    "successful_endpoints": len([r for r in endpoint_results if "Success" in r["Status"]])
}

print(f"✅ Services Healthy: {demo_summary['services_tested']}/{demo_summary['total_services']}")
print(f"📄 SBOM Uploaded: {'Yes' if demo_summary['sbom_uploaded'] else 'No'}")
print(f"🔍 Vulnerabilities Analyzed: {demo_summary['vulnerabilities_found']}")
print(f"🤖 AI Risk Assessment: {'Completed' if demo_summary['risk_assessment_completed'] else 'Failed'}")
print(f"📊 Overall Risk Score: {demo_summary['overall_risk_score']}/10")
print(f"📋 Security Report: {'Generated' if demo_summary['report_generated'] else 'Failed'}")
print(f"🔗 API Endpoints: {demo_summary['successful_endpoints']}/{demo_summary['endpoints_tested']} working")

print("\n🎯 What We've Demonstrated:")
print("   • Complete service health monitoring")
print("   • JWT-based authentication flow")
print("   • SBOM upload and vulnerability analysis")
print("   • AI-powered risk assessment")
print("   • Interactive security dashboards")
print("   • Comprehensive security reporting")
print("   • API endpoint validation")

print("\n🚀 Next Steps:")
print("   1. Explore the dashboard UI at http://localhost:3000")
print("   2. Upload your own SBOM files for analysis")
print("   3. Test additional API endpoints")
print("   4. Integrate with your CI/CD pipeline")
print("   5. Customize risk assessment parameters")

print("\n🔗 Useful Links:")
print(f"   • API Gateway: {API_BASE}")
print(f"   • API Documentation: {API_BASE}/docs")
print(f"   • Dashboard: {UI_BASE}")
print(f"   • Auth Service: {AUTH_BASE}/docs")

# Cleanup
client.close()
print("\n✨ Demo completed successfully! Thank you for exploring MCP Security Platform.")

## 📚 Additional Resources

- **POC Guide**: `.github/codespace-poc.md`
- **Demo Script**: `./scripts/demo-poc.sh`
- **Test Suite**: `pytest tests/poc/test_flow.py`
- **API Documentation**: Visit the `/docs` endpoints when services are running

## 🛠️ Troubleshooting

If you encounter issues:

1. **Services not responding**: Run `./scripts/codespace-setup.sh` to restart
2. **Authentication fails**: Check that the auth service is running on port 8001
3. **SBOM upload fails**: Verify the JSON format matches the schema
4. **Visualizations not showing**: Ensure all required packages are installed
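
For the last item, a quick way to see which packages the current kernel cannot import is sketched below. The `REQUIRED` list reflects the libraries imported at the top of this notebook, and `missing_packages` is a hypothetical helper.

```python
import importlib.util

# Top-level import names used by this notebook.
REQUIRED = ["httpx", "pandas", "matplotlib", "seaborn", "plotly"]

def missing_packages(names=REQUIRED):
    """Return the subset of names that cannot be imported in the current kernel."""
    return [name for name in names if importlib.util.find_spec(name) is None]
```

An empty list from `missing_packages()` means the imports should succeed. Anything else names the packages to install.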

## 🎨 Customization

You can customize this notebook by:

- Modifying the test SBOM data in `tests/poc/data/`
- Adjusting visualization styles and colors
- Adding new API endpoint tests
- Creating custom risk assessment parameters
- Building additional dashboard widgets
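
As a starting point for custom risk assessment parameters, the request body from Step 5 can be wrapped in a small builder. The field names are the ones this notebook sends. Which values the service accepts beyond those shown is not documented here, so any other value (such as `"staging"`) is a hypothetical example.

```python
def build_assessment_request(sbom_id, tier="production", classification="sensitive",
                             compliance=("SOC2", "ISO27001"), include_remediation=True):
    """Build a request body with the same fields the notebook sends to /assess."""
    return {
        "sbom_id": sbom_id,
        "assessment_type": "comprehensive",
        "include_remediation": include_remediation,
        "business_context": {
            "application_tier": tier,
            "data_classification": classification,
            "compliance_requirements": list(compliance),
        },
    }
```

For instance, `build_assessment_request(sbom_id, tier="staging", compliance=["SOC2"])` produces a body that differs from the default only in the business context.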