# ViolentUTF API Security Verification - Issue #47

## Executive Summary

This notebook verifies the **16 CRITICAL** and **24 HIGH** security violations reported in GitHub issue #47 for the ViolentUTF API.

**Assessment Date:** August 6, 2025  
**GitHub Issue:** [#47 - Validate and resolve Critical Security ADR Issues](https://github.com/GSA-TTS/violentutf-api/issues/47)  
**Source Report:** `docs/reports/ADRaudit-02AUG25-Analysis/criticalViolations_security.md`  
**Security Compliance Score:** 44.35% (CRITICAL FAILURE)

### ⚠️ CRITICAL SECURITY WARNING ⚠️

**FISMA/FedRAMP Authorization Status:** ❌ **WOULD NOT ACHIEVE ATO**

The system contains critical security vulnerabilities that would prevent it from receiving a federal Authorization to Operate (ATO).

### Security Violations Distribution

| Category | CRITICAL Count | Related HIGH Count | Risk Impact |
|----------|----------------|-------------------|-------------|
| Multi-Tenant Data Isolation | 2 | 5 | **CATASTROPHIC** - Complete tenant data exposure |
| AI Model Sandboxing | 4 | 3 | **CRITICAL** - Remote code execution via models |
| Secrets Management | 5 | 4 | **CRITICAL** - Credential exposure risk |
| Vulnerability Classification | 4 | 6 | **HIGH** - No security tracking capability |
| Authentication/Authorization | 1 | 4 | **HIGH** - Privilege escalation risks |

## Prerequisites

**IMPORTANT:** This notebook performs security testing. Please ensure:
1. You are running this in a **TEST environment only**
2. The ViolentUTF API is accessible at the configured endpoint
3. You have proper authorization to perform security testing
4. Database backups exist before running tests
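
The first prerequisite can also be enforced in code rather than left to convention. The sketch below is a minimal guard, assuming the notebook kernel sees the same `ENVIRONMENT` variable that the setup instructions export; the helper name `assert_test_environment` is hypothetical and not part of the ViolentUTF API.

```python
import os
from typing import Mapping, Optional


def assert_test_environment(env: Optional[Mapping[str, str]] = None) -> str:
    """Return the environment name, or raise if it is anything other than 'test'.

    `env` defaults to os.environ; passing a plain dict makes the guard easy to test.
    """
    environ = os.environ if env is None else env
    value = environ.get("ENVIRONMENT", "").strip().lower()
    if value != "test":
        raise RuntimeError(
            f"Refusing to run security tests: ENVIRONMENT={value or '(unset)'}, expected 'test'"
        )
    return value
```

Calling `assert_test_environment()` at the top of the first code cell would stop the run before any request is sent if the variable is unset or points at another environment.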

### Setup Instructions

```bash
# Start the ViolentUTF API in test mode
cd /path/to/violentutf-api
export ENVIRONMENT=test
./setup_macos.sh  # or setup_linux.sh for Linux
docker-compose up -d
```

## 1. Environment Setup and Security Testing Framework

In [34]:
# Install required packages for security testing
import subprocess
import sys
import os

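# pip names that differ from the importable module name
IMPORT_NAMES = {"python-jose": "jose", "python-dotenv": "dotenv", "pyyaml": "yaml"}

def install_package(package):
    """Install a package using pip if its module cannot be imported."""
    base = package.split('[')[0]  # drop extras such as "[cryptography]"
    try:
        __import__(IMPORT_NAMES.get(base, base))
    except ImportError:
        print(f"Installing {package}...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", package])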

# Required packages for security testing
packages = [
    "requests",
    "pydantic",
    "python-jose[cryptography]",
    "cryptography",
    "sqlalchemy",
    "asyncpg",
    "pytest",
    "faker",
    "python-dotenv",
    "colorama",
    "tabulate",
    "pandas",
    "matplotlib",
    "seaborn",
    "pyyaml",
    "docker",
    "passlib"
]

for package in packages:
    install_package(package)

print("✅ All required security testing packages installed")

Installing python-jose[cryptography]...
Installing python-dotenv...
Installing pyyaml...
✅ All required security testing packages installed


In [35]:
import os
import json
import time
import uuid
import hashlib
import secrets
import warnings
import subprocess
import importlib.util
import ast
import yaml
import base64
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
import asyncio

import requests
from requests.auth import HTTPBasicAuth
from faker import Faker
from colorama import init, Fore, Back, Style
from tabulate import tabulate
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from dotenv import load_dotenv
from jose import jwt, JWTError
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend

# Initialize colorama for cross-platform colored output
init(autoreset=True)

# Load environment variables
load_dotenv()

# Suppress warnings for cleaner output
warnings.filterwarnings('ignore', category=requests.packages.urllib3.exceptions.InsecureRequestWarning)

# Initialize Faker for test data generation
fake = Faker()

# Get project root
PROJECT_ROOT = Path.cwd().parent.parent if 'docs' in str(Path.cwd()) else Path.cwd()
print(f"Project root: {PROJECT_ROOT}")

print("✅ Security testing environment initialized")

Project root: /Users/tamnguyen/Documents/GitHub/violentutf-api
✅ Security testing environment initialized


In [36]:
# Load credentials from .api_credentials
from pathlib import Path

def load_api_credentials():
    """Load credentials from .api_credentials file."""
    creds = {}
    cred_file = Path.cwd().parent.parent / ".api_credentials" if 'docs' in str(Path.cwd()) else Path.cwd() / ".api_credentials"
    
    if cred_file.exists():
        print(f"📁 Loading credentials from {cred_file}")
        with open(cred_file, 'r') as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith('#') and '=' in line:
                    key, value = line.split('=', 1)
                    creds[key] = value
        print(f"   ✅ Loaded {len(creds)} credential values")
    else:
        print(f"⚠️ .api_credentials file not found at {cred_file}")
    
    return creds

# Load credentials
api_creds = load_api_credentials()

class SecurityTestConfig:
    """Configuration for security testing (plain class; all values are set in __init__)."""
    
    def __init__(self):
        # API Configuration
        self.api_base_url = api_creds.get('API_URL') or os.getenv('VIOLENTUTF_API_URL', 'http://localhost:8000')
        self.api_version = os.getenv('API_VERSION', 'v1')
        
        # Authentication - Load from .api_credentials
        self.admin_username = api_creds.get('ADMIN_USERNAME') or os.getenv('ADMIN_USERNAME', '')
        self.admin_password = api_creds.get('ADMIN_PASSWORD') or os.getenv('ADMIN_PASSWORD', '')
        self.api_key = api_creds.get('API_KEY') or os.getenv('API_KEY', '')
        
        # Create test users for multi-tenant testing
        self.org1_user = 'org1_user'
        self.org1_password = 'org1_pass123'
        self.org2_user = 'org2_user'
        self.org2_password = 'org2_pass123'
        
        # Database credentials
        self.db_host = api_creds.get('DB_HOST', 'localhost')
        self.db_port = api_creds.get('DB_PORT', '5432')
        self.db_name = api_creds.get('DB_NAME', 'violentutf')
        self.db_user = api_creds.get('DB_USER', 'violentutf')
        self.db_password = api_creds.get('DB_PASSWORD', '')
        
        # Service URLs
        self.health_url = api_creds.get('HEALTH_URL') or f"{self.api_base_url}/api/{self.api_version}/health"
        self.docs_url = api_creds.get('DOCS_URL') or f"{self.api_base_url}/docs"
        
        # Project paths
        self.project_root = PROJECT_ROOT
        self.app_dir = self.project_root / "app"
        self.requirements_file = self.project_root / "requirements.txt"
        
        # Security Test Parameters
        self.request_timeout = 30
        self.max_retries = 3
        self.verify_ssl = False  # Disabled for testing
        self.enable_destructive_tests = False  # Set to True only in isolated test env
    
    @property
    def api_url(self) -> str:
        """Full API URL."""
        return f"{self.api_base_url}/api/{self.api_version}"

# Initialize configuration
config = SecurityTestConfig()

print(f"\n🔧 Security Test Configuration loaded")
print(f"   API URL: {config.api_url}")
print(f"   Username: {config.admin_username}")
print(f"   Password: {'*' * len(config.admin_password) if config.admin_password else '(not set)'}")
print(f"   API Key: {'*' * 20 + '...' if config.api_key else 'Not set'}")
print(f"   Database: {config.db_user}@{config.db_host}:{config.db_port}/{config.db_name}")
print(f"   Destructive Tests: {'ENABLED' if config.enable_destructive_tests else 'DISABLED'}")

📁 Loading credentials from /Users/tamnguyen/Documents/GitHub/violentutf-api/.api_credentials
   ✅ Loaded 12 credential values

🔧 Security Test Configuration loaded
   API URL: http://localhost:8000/api/v1
   Username: admin
   Password: ****************
   API Key: ********************...
   Database: violentutf@localhost:5432/violentutf
   Destructive Tests: DISABLED


In [37]:
# Security Violation Tracking Framework
class SecurityViolationType(Enum):
    """Types of security violations."""
    MULTI_TENANT = "Multi-Tenant Isolation"
    AI_SANDBOXING = "AI Model Sandboxing"
    SECRETS_MGMT = "Secrets Management"
    VULN_CLASSIFICATION = "Vulnerability Classification"
    AUTH_AUTHZ = "Authentication/Authorization"

class SecuritySeverity(Enum):
    """Security violation severity levels."""
    CRITICAL = "CRITICAL"
    HIGH = "HIGH"
    MEDIUM = "MEDIUM"
    LOW = "LOW"
    INFO = "INFO"

@dataclass
class SecurityViolation:
    """Represents a security violation."""
    
    violation_id: str
    violation_type: SecurityViolationType
    severity: SecuritySeverity
    title: str
    description: str
    cwe_id: Optional[str]
    owasp_category: Optional[str]
    file_location: str
    vulnerable_code: str
    secure_code: str
    exploitation_impact: str
    test_result: str
    evidence: Dict[str, Any]
    remediation: str
    verified: bool = False
    exploitable: bool = False
    timestamp: datetime = field(default_factory=datetime.now)
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for reporting."""
        return {
            "violation_id": self.violation_id,
            "type": self.violation_type.value,
            "severity": self.severity.value,
            "title": self.title,
            "description": self.description,
            "cwe_id": self.cwe_id,
            "owasp_category": self.owasp_category,
            "file_location": self.file_location,
            "exploitation_impact": self.exploitation_impact,
            "test_result": self.test_result,
            "evidence": self.evidence,
            "remediation": self.remediation,
            "verified": self.verified,
            "exploitable": self.exploitable,
            "timestamp": self.timestamp.isoformat()
        }

class SecurityTestResults:
    """Collector for security test results."""
    
    def __init__(self):
        self.violations: List[SecurityViolation] = []
        self.test_start_time = datetime.now()
        self.exploitation_attempts = []
        
    def add_violation(self, violation: SecurityViolation):
        """Add a security violation."""
        self.violations.append(violation)
        
    def add_exploitation_attempt(self, violation_id: str, success: bool, details: str):
        """Record an exploitation attempt."""
        self.exploitation_attempts.append({
            "violation_id": violation_id,
            "success": success,
            "details": details,
            "timestamp": datetime.now().isoformat()
        })
        
    def get_summary(self) -> Dict[str, Any]:
        """Get security assessment summary."""
        severity_counts = {}
        for severity in SecuritySeverity:
            severity_counts[severity.value] = sum(
                1 for v in self.violations if v.severity == severity
            )
        
        type_counts = {}
        for vtype in SecurityViolationType:
            type_counts[vtype.value] = sum(
                1 for v in self.violations if v.violation_type == vtype
            )
        
        return {
            "total_violations": len(self.violations),
            "verified_violations": sum(1 for v in self.violations if v.verified),
            "exploitable_violations": sum(1 for v in self.violations if v.exploitable),
            "severity_distribution": severity_counts,
            "type_distribution": type_counts,
            "exploitation_attempts": len(self.exploitation_attempts),
            "successful_exploits": sum(1 for e in self.exploitation_attempts if e["success"]),
            "test_duration": str(datetime.now() - self.test_start_time),
            "compliance_standards_violated": self._get_compliance_violations()
        }
    
    def _get_compliance_violations(self) -> Dict[str, int]:
        """Get compliance standard violations."""
        return {
            "NIST_800-53": sum(1 for v in self.violations if v.verified and v.severity == SecuritySeverity.CRITICAL),
            "ISO_27001": sum(1 for v in self.violations if v.verified and v.severity in [SecuritySeverity.CRITICAL, SecuritySeverity.HIGH]),
            "OWASP_Top_10": len(set(v.owasp_category for v in self.violations if v.owasp_category)),
            "CIS_Controls": sum(1 for v in self.violations if v.verified)
        }
    
    def display_summary(self):
        """Display formatted security summary."""
        summary = self.get_summary()
        
        print("\n" + "="*80)
        print(f"{Fore.RED}SECURITY TEST RESULTS SUMMARY")
        print("="*80)
        
        print(f"\n🔒 Security Assessment:")
        print(f"   Total Violations: {summary['total_violations']}")
        print(f"   Verified: {summary['verified_violations']}")
        print(f"   Exploitable: {summary['exploitable_violations']}")
        print(f"   Test Duration: {summary['test_duration']}")
        
        print(f"\n🔴 Severity Distribution:")
        for severity, count in summary['severity_distribution'].items():
            if count > 0:
                color = Fore.RED if severity == "CRITICAL" else \
                        Fore.YELLOW if severity == "HIGH" else \
                        Fore.BLUE if severity == "MEDIUM" else \
                        Fore.GREEN
                print(f"   {color}{severity}: {count}")
        
        print(f"\n🎯 Exploitation Results:")
        print(f"   Exploitation Attempts: {summary['exploitation_attempts']}")
        print(f"   Successful Exploits: {summary['successful_exploits']}")
        
        print(f"\n📋 Compliance Violations:")
        for standard, count in summary['compliance_standards_violated'].items():
            if count > 0:
                print(f"   {standard}: {count} controls violated")

# Initialize results collector
security_results = SecurityTestResults()
print("✅ Security testing framework initialized")

✅ Security testing framework initialized


In [38]:
# Security Testing Helper Functions
class SecurityAPIClient:
    """API client for security testing."""
    
    def __init__(self, config: SecurityTestConfig):
        self.config = config
        self.session = requests.Session()
        self.session.verify = config.verify_ssl
        self.tokens = {}  # Store tokens for different users/orgs
        
        # Set API key header if available
        if config.api_key:
            self.session.headers.update({
                'X-API-Key': config.api_key
            })
    
    def authenticate(self, username: str = None, password: str = None) -> Tuple[bool, Optional[str]]:
        """Authenticate and obtain JWT token."""
        username = username or self.config.admin_username
        password = password or self.config.admin_password
        
        try:
            auth_endpoints = [
                f"{self.config.api_url}/auth/login",
                f"{self.config.api_url}/auth/token",
                f"{self.config.api_base_url}/token",
                f"{self.config.api_base_url}/api/auth/login"
            ]
            
            for auth_url in auth_endpoints:
                try:
                    response = self.session.post(
                        auth_url,
                        json={"username": username, "password": password},
                        timeout=self.config.request_timeout
                    )
                    
                    if response.status_code == 200:
                        data = response.json()
                        token = data.get('access_token') or data.get('token')
                        if token:
                            self.tokens[username] = token
                            return True, token
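                except (requests.exceptions.RequestException, ValueError):
                    # Network error or non-JSON body: try the next candidate endpoint
                    continue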
                    
        except Exception as e:
            print(f"❌ Authentication failed: {e}")
        
        return False, None
    
    def decode_jwt(self, token: str) -> Optional[Dict[str, Any]]:
        """Decode JWT token without verification (for testing)."""
        try:
            # Decode without verification for inspection
            return jwt.get_unverified_claims(token)
        except JWTError as e:
            print(f"JWT decode error: {e}")
            return None
    
    def make_request(self, method: str, endpoint: str, token: str = None, **kwargs) -> requests.Response:
        """Make an API request with optional JWT token."""
        url = f"{self.config.api_url}{endpoint}"
        
        headers = kwargs.pop('headers', {})
        if token:
            headers['Authorization'] = f'Bearer {token}'
        
        for attempt in range(self.config.max_retries):
            try:
                response = self.session.request(
                    method=method,
                    url=url,
                    headers=headers,
                    timeout=self.config.request_timeout,
                    **kwargs
                )
                return response
            except requests.exceptions.RequestException as e:
                if attempt == self.config.max_retries - 1:
                    raise
                time.sleep(2 ** attempt)
    
    def test_cross_tenant_access(self, user1_token: str, user2_resource_id: str) -> bool:
        """Test if user1 can access user2's resources."""
        try:
            response = self.make_request(
                "GET",
                f"/users/{user2_resource_id}",
                token=user1_token
            )
            return response.status_code == 200
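        except requests.exceptions.RequestException:
            # A failed request is treated as "no access", not as a confirmed leak
            return False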

def check_file_vulnerability(file_path: Path, pattern: str) -> Tuple[bool, Optional[str]]:
    """Check if a file contains a security vulnerability pattern."""
    if not file_path.exists():
        return False, "File not found"
    
    try:
        with open(file_path, 'r') as f:
            content = f.read()
            if pattern in content:
                # Find the line number
                lines = content.split('\n')
                for i, line in enumerate(lines, 1):
                    if pattern in line:
                        return True, f"Line {i}: {line.strip()}"
    except Exception as e:
        return False, str(e)
    
    return False, None

def check_docker_sandboxing() -> bool:
    """Check if Docker is available for sandboxing."""
    try:
        import docker
        client = docker.from_env()
        client.ping()
        return True
    except Exception:
        # Docker SDK missing or daemon unreachable
        return False

# Initialize API client
api_client = SecurityAPIClient(config)
admin_token = None
print("✅ Security testing helpers initialized")

✅ Security testing helpers initialized


## 2. Test API Connection and Security Headers

Test the API connection and check for security headers.

In [39]:
# Test API connection and authentication
def test_api_security_connection():
    """Test if the API is accessible and check security headers."""
    
    try:
        # Try different health check endpoints
        health_endpoints = [
            f"{config.api_base_url}/health",
            f"{config.api_base_url}/api/health",
            f"{config.api_url}/health",
            f"{config.api_base_url}/api/v1/health",
            f"{config.api_base_url}/ping"
        ]
        
        headers = {}
        if config.api_key:
            headers['X-API-Key'] = config.api_key
        
        for health_url in health_endpoints:
            try:
                response = requests.get(
                    health_url,
                    timeout=5,
                    verify=False,
                    headers=headers
                )
                
                if response.status_code == 200:
                    print(f"✅ API is accessible at {config.api_base_url}")
                    print(f"   Health check endpoint: {health_url}")
                    try:
                        print(f"   Response: {response.json()}")
                    except ValueError:
                        # Body is not JSON; show the start of the raw text instead
                        print(f"   Response: {response.text[:100]}")
                    
                    # Check security headers
                    print(f"\n🔒 Security Headers Check:")
                    security_headers = [
                        ('X-Content-Type-Options', 'nosniff'),
                        ('X-Frame-Options', 'DENY'),
                        ('X-XSS-Protection', '1; mode=block'),
                        ('Strict-Transport-Security', None),
                        ('Content-Security-Policy', None)
                    ]
                    
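                    missing_headers = []
                    for header, expected in security_headers:
                        value = response.headers.get(header)
                        if not value:
                            print(f"   {Fore.YELLOW}✗ {header}: Missing")
                            missing_headers.append(header)
                        elif expected and value.strip().lower() != expected.lower():
                            # Present, but not the value this check expects
                            print(f"   {Fore.YELLOW}⚠ {header}: {value} (expected {expected})")
                        else:
                            print(f"   ✓ {header}: {value}")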
                    
                    if missing_headers:
                        print(f"\n   {Fore.YELLOW}⚠️ Missing {len(missing_headers)} security headers")
                    
                    return True
            except requests.exceptions.RequestException:
                # Endpoint unreachable or timed out: try the next candidate
                continue
        
        print(f"⚠️ API health check failed at {config.api_base_url}")
        return False
            
    except requests.exceptions.ConnectionError:
        print(f"❌ Cannot connect to API at {config.api_base_url}")
        print("   Please ensure the ViolentUTF API is running")
        return False
    except Exception as e:
        print(f"❌ Unexpected error: {e}")
        return False

# Test connection
api_available = test_api_security_connection()

if api_available:
    print("\n🔐 Testing authentication...")
    
    # If API key is set, we can use that for authentication
    if config.api_key:
        print("   Using API key authentication")
        admin_token = None  # We'll use API key in headers instead
        auth_success = True
        
        # Test if API key works by trying to access a protected endpoint
        try:
            test_response = requests.get(
                f"{config.api_url}/users/me",
                headers={'X-API-Key': config.api_key},
                verify=False,
                timeout=5
            )
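            if test_response.status_code == 200:
                print("   ✅ API key accepted by protected endpoint")
            elif test_response.status_code in (401, 403):
                auth_success = False
                print(f"   ❌ API key rejected (HTTP {test_response.status_code})")
        except requests.exceptions.RequestException as e:
            print(f"   ⚠️ Could not verify API key: {e}")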
    else:
        # Try JWT authentication
        auth_success, admin_token = api_client.authenticate()
        
        if auth_success and admin_token:
            print(f"   ✅ JWT authentication successful")
            
            # Decode and inspect JWT
            claims = api_client.decode_jwt(admin_token)
            if claims:
                print(f"\n   JWT Claims:")
                for key, value in claims.items():
                    if key not in ['exp', 'iat', 'nbf']:
                        print(f"      {key}: {value}")
                
                # Check for organization_id (Critical Violation #1)
                if 'organization_id' not in claims:
                    print(f"\n   {Fore.RED}❌ CRITICAL: JWT missing organization_id claim!")
                    print(f"      This is Security Violation #1")
        else:
            print(f"   ❌ JWT authentication failed")
            
    if not auth_success and not config.api_key:
        print(f"   ⚠️ No authentication available - tests will be limited")
else:
    print("\n⚠️ API not accessible - will perform static code analysis only")

⚠️ API health check failed at http://localhost:8000

⚠️ API not accessible - will perform static code analysis only


## 3. Multi-Tenant Data Isolation Tests (Violations #1-2)

Testing for critical multi-tenant isolation vulnerabilities that could allow complete cross-tenant data access.
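
The claim check at the heart of Violation #1 can be illustrated without a live API. The sketch below uses only the standard library to read a token's payload without verifying its signature, so it is suitable for inspection only and never for authorization decisions. The helper names (`unverified_claims`, `missing_claims`, `make_unsigned_token`) and the demo token are hypothetical; the notebook itself uses `jose.jwt.get_unverified_claims` for the same purpose.

```python
import base64
import json
from typing import Any, Dict, List, Sequence


def unverified_claims(token: str) -> Dict[str, Any]:
    """Decode the payload segment of a JWT without checking the signature."""
    payload_b64 = token.split(".")[1]
    payload_b64 += "=" * (-len(payload_b64) % 4)  # restore stripped base64 padding
    return json.loads(base64.urlsafe_b64decode(payload_b64))


def missing_claims(token: str, required: Sequence[str] = ("sub", "organization_id")) -> List[str]:
    """Return the required claims that are absent or empty in the token."""
    claims = unverified_claims(token)
    return [name for name in required if not claims.get(name)]


def make_unsigned_token(payload: Dict[str, Any]) -> str:
    """Build an unsigned demo token (illustration only, not a valid credential)."""
    def encode(obj: Dict[str, Any]) -> str:
        raw = json.dumps(obj).encode()
        return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()
    return f"{encode({'alg': 'none'})}.{encode(payload)}."


# A token shaped like the vulnerable case: 'sub' and 'roles' but no tenant claim
demo_token = make_unsigned_token({"sub": "user-1", "roles": ["viewer"]})
print(missing_claims(demo_token))  # ['organization_id']
```

A token that carries no tenant identifier gives the data layer nothing to filter on, which is why the absence of this one claim is rated CRITICAL.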

In [40]:
def test_jwt_organization_claim():
    """Test for missing organization_id in JWT (CRITICAL Violation #1)."""
    
    print(f"\n{Fore.YELLOW}Testing JWT Organization Claims (Violation #1)...")
    print("="*60)
    
    # Check authentication middleware file
    auth_middleware = config.app_dir / "middleware" / "authentication.py"
    
    print(f"📁 Checking {auth_middleware}...")
    
    vulnerable_code = False
    if auth_middleware.exists():
        with open(auth_middleware, 'r') as f:
            content = f.read()
            
        # Check for organization_id extraction
        if 'organization_id' not in content:
            vulnerable_code = True
            print(f"   {Fore.RED}❌ No organization_id extraction found in JWT processing")
            
            # Find the vulnerable code
            if 'payload.get("sub")' in content:
                print(f"\n   Vulnerable Code Found:")
                print(f"   {Fore.RED}user_id = payload.get('sub')")
                print(f"   {Fore.RED}roles = payload.get('roles', [])")
                print(f"   {Fore.RED}# MISSING: organization_id = payload.get('organization_id')")
        else:
            print(f"   {Fore.GREEN}✓ organization_id mentioned in code")
    else:
        print(f"   {Fore.RED}❌ authentication.py not found")
        vulnerable_code = True
    
    # Test with live API if available
    jwt_missing_org = False
    claims = {}
    if api_available and admin_token:
        print(f"\n🔍 Testing JWT token structure...")
        claims = api_client.decode_jwt(admin_token)
        
        if claims:
            print(f"   JWT Claims: {list(claims.keys())}")
            
            if 'organization_id' not in claims:
                jwt_missing_org = True
                print(f"   {Fore.RED}❌ CRITICAL: organization_id missing from JWT!")
                print(f"   {Fore.RED}   This allows cross-tenant data access!")
            else:
                print(f"   {Fore.GREEN}✓ organization_id present: {claims['organization_id']}")
    
    violation = SecurityViolation(
        violation_id="SEC-001",
        violation_type=SecurityViolationType.MULTI_TENANT,
        severity=SecuritySeverity.CRITICAL,
        title="JWT Missing Organization Claims",
        description="JWT payload missing 'organization_id' claim required for multi-tenant isolation. Only 'sub' and 'roles' claims are extracted.",
        cwe_id="CWE-863",
        owasp_category="A01:2021 - Broken Access Control",
        file_location="app/middleware/authentication.py:114",
        vulnerable_code="user_id = payload.get('sub')\nroles = payload.get('roles', [])\n# Missing: organization_id extraction",
        secure_code="user_id = payload.get('sub')\nroles = payload.get('roles', [])\norganization_id = payload.get('organization_id')\nif not organization_id:\n    raise AuthenticationError('JWT missing required organization_id claim')",
        exploitation_impact="ANY USER CAN ACCESS ANY ORGANIZATION'S DATA",
        test_result="VULNERABLE" if (vulnerable_code or jwt_missing_org) else "SECURE",
        evidence={
            "auth_middleware_exists": auth_middleware.exists(),
            "organization_id_in_code": not vulnerable_code,
            "jwt_has_org_claim": not jwt_missing_org,
            "jwt_claims": list(claims.keys()) if claims else []
        },
        remediation="Add organization_id extraction and validation in JWT processing. Ensure all JWTs include organization_id claim.",
        verified=vulnerable_code or jwt_missing_org,
        exploitable=jwt_missing_org
    )
    
    security_results.add_violation(violation)
    
    if vulnerable_code or jwt_missing_org:
        print(f"\n{Fore.RED}❌ CRITICAL SECURITY VIOLATION CONFIRMED")
        print(f"   Impact: Complete multi-tenant isolation failure")
        print(f"   Risk: Cross-tenant data access possible")
    else:
        print(f"\n{Fore.GREEN}✅ JWT appears to have organization claims")

# Run the test
test_jwt_organization_claim()


Testing JWT Organization Claims (Violation #1)...
📁 Checking /Users/tamnguyen/Documents/GitHub/violentutf-api/app/middleware/authentication.py...
   ✓ organization_id mentioned in code

✅ JWT appears to have organization claims


In [41]:
def test_repository_abac_enforcement():
    """Test for missing ABAC enforcement in repositories (CRITICAL Violation #2)."""
    
    print(f"\n{Fore.YELLOW}Testing Repository ABAC Enforcement (Violation #2)...")
    print("="*60)
    
    base_repo = config.app_dir / "repositories" / "base.py"
    
    print(f"📁 Checking {base_repo}...")
    
    missing_abac = False
    vulnerable_methods = []
    
    if base_repo.exists():
        with open(base_repo, 'r') as f:
            content = f.read()
            
        # Check for organization_id filtering
        if 'def get_by_id' in content:
            method_start = content.find('def get_by_id')
            method_end = content.find('\n\n', method_start)
            method_content = content[method_start:method_end] if method_end > 0 else content[method_start:method_start+500]
            
            if 'organization_id' not in method_content:
                vulnerable_methods.append('get_by_id')
                missing_abac = True
                print(f"   {Fore.RED}❌ get_by_id() missing organization_id filter")
        
        # Check list method
        if 'def list' in content:
            if 'organization_id' not in content[content.find('def list'):content.find('def list') + 500]:
                vulnerable_methods.append('list')
                missing_abac = True
                print(f"   {Fore.RED}❌ list() missing organization_id filter")
        
        if vulnerable_methods:
            print(f"\n   Vulnerable Methods: {', '.join(vulnerable_methods)}")
            print(f"   {Fore.RED}These methods allow cross-tenant data access!")
    else:
        print(f"   {Fore.RED}❌ base.py not found")
        missing_abac = True
    
    violation = SecurityViolation(
        violation_id="SEC-002",
        violation_type=SecurityViolationType.MULTI_TENANT,
        severity=SecuritySeverity.CRITICAL,
        title="Repository Layer Missing ABAC Enforcement",
        description="Base repository missing ABAC enforcement - queries do not filter by organization_id.",
        cwe_id="CWE-284",
        owasp_category="A01:2021 - Broken Access Control",
        file_location="app/repositories/base.py:97",
        vulnerable_code="async def get_by_id(self, entity_id: UUID) -> Optional[T]:\n    query = select(self.model).where(self.model.id == entity_id)\n    # CRITICAL: No organization_id filtering",
        secure_code="async def get_by_id(self, entity_id: UUID, organization_id: UUID) -> Optional[T]:\n    query = select(self.model).where(\n        and_(\n            self.model.id == entity_id,\n            self.model.organization_id == organization_id\n        )\n    )",
        exploitation_impact="ANY USER CAN ACCESS ANY ORGANIZATION'S DATA",
        test_result="VULNERABLE" if missing_abac else "SECURE",
        evidence={
            "base_repo_exists": base_repo.exists(),
            "vulnerable_methods": vulnerable_methods,
        },
        remediation="Implement mandatory organization_id filtering in all repository methods.",
        verified=missing_abac,
        exploitable=missing_abac
    )
    
    security_results.add_violation(violation)
    
    if missing_abac:
        print(f"\n{Fore.RED}❌ CRITICAL SECURITY VIOLATION CONFIRMED")
        print(f"   Impact: No tenant isolation at data layer")
        print(f"   Risk: Complete cross-tenant data exposure")

# Run the test
test_repository_abac_enforcement()


Testing Repository ABAC Enforcement (Violation #2)...
📁 Checking /Users/tamnguyen/Documents/GitHub/violentutf-api/app/repositories/base.py...
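
The substring checks in the cell above treat any occurrence of `organization_id` within a 500-character window as a filter, so a comment such as `# No organization_id filtering` would count as one, and `'def list'` also matches a method named `list_paginated`. A minimal sketch of a stricter check, assuming the repository source parses as valid Python, walks the syntax tree and looks only at identifiers inside the named methods. The helper name `methods_missing_filter` and the sample class are hypothetical and not part of the ViolentUTF codebase.

```python
import ast


def methods_missing_filter(source: str, method_names: set[str],
                           required: str = "organization_id") -> list[str]:
    """Return the named methods whose signature and body never reference `required`."""
    missing = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) and node.name in method_names:
            # Collect identifiers, attribute names, and argument names used in this function only.
            # Comments and other functions are ignored because they are not part of this subtree.
            names = set()
            for child in ast.walk(node):
                if isinstance(child, ast.Name):
                    names.add(child.id)
                elif isinstance(child, ast.Attribute):
                    names.add(child.attr)
                elif isinstance(child, ast.arg):
                    names.add(child.arg)
            if required not in names:
                missing.append(node.name)
    return missing


# Hypothetical repository source used only to exercise the helper
sample = '''
class BaseRepository:
    async def get_by_id(self, entity_id):
        # CRITICAL: No organization_id filtering
        return select(self.model).where(self.model.id == entity_id)

    async def list(self, organization_id):
        return select(self.model).where(self.model.organization_id == organization_id)
'''

print(methods_missing_filter(sample, {"get_by_id", "list"}))  # → ['get_by_id']
```

The comment inside `get_by_id` mentions `organization_id`, yet the method is still reported, which is the case the substring check would miss.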


## 4. AI Model Sandboxing Tests (Violations #3-6)

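These static checks look for four controls: a `Generator` model, container sandboxing code in `app/main.py`, a sandboxed templating service, and an `Evidence` model. No model is executed; a missing control is recorded as a violation because, without sandboxing, an untrusted model could execute arbitrary code on the host.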

In [42]:
def test_ai_model_sandboxing():
    """Test for AI model sandboxing violations (CRITICAL Violations #3-6)."""
    
    print(f"\n{Fore.YELLOW}Testing AI Model Sandboxing (Violations #3-6)...")
    print("="*60)
    
    violations_found = []
    
    # Violation #3: Missing Generator Model
    generator_model = config.app_dir / "models" / "generator.py"
    print(f"\n[SEC-003] Checking Generator model...")
    print(f"   generator.py exists: {generator_model.exists()}")
    
    if not generator_model.exists():
        violations_found.append("SEC-003")
        security_results.add_violation(SecurityViolation(
            violation_id="SEC-003",
            violation_type=SecurityViolationType.AI_SANDBOXING,
            severity=SecuritySeverity.CRITICAL,
            title="Missing Generator Database Model",
            description="Missing Generator database model required for tracking AI model plugins.",
            cwe_id="CWE-693",
            owasp_category="A04:2021 - Insecure Design",
            file_location="app/models/:1",
            vulnerable_code="# Missing Generator model",
            secure_code="class Generator(BaseModel):\n    name: str\n    model_type: str\n    security_profile: str",
            exploitation_impact="Cannot track or audit AI model usage",
            test_result="VULNERABLE",
            evidence={"generator_model_exists": False},
            remediation="Create Generator model for AI model tracking",
            verified=True,
            exploitable=False
        ))
    
    # Violation #4: No Container Sandboxing
    print(f"\n[SEC-004] Checking Container Sandboxing...")
    docker_available = check_docker_sandboxing()
    main_file = config.app_dir / "main.py"
    has_sandboxing = False
    
    if main_file.exists():
        with open(main_file, 'r') as f:
            content = f.read()
        has_sandboxing = 'docker' in content.lower() or 'container' in content.lower()
    
    print(f"   Docker available: {docker_available}")
    print(f"   Sandboxing code found: {has_sandboxing}")
    
    if not has_sandboxing:
        violations_found.append("SEC-004")
        security_results.add_violation(SecurityViolation(
            violation_id="SEC-004",
            violation_type=SecurityViolationType.AI_SANDBOXING,
            severity=SecuritySeverity.CRITICAL,
            title="No Container Sandboxing Infrastructure",
            description="No container-based sandboxing for untrusted model execution.",
            cwe_id="CWE-94",
            owasp_category="A03:2021 - Injection",
            file_location="app/main.py:238",
            vulnerable_code="# No sandboxing implementation",
            secure_code="docker_client = docker.from_env()\nsecurity_profile = {'read_only': True, 'network_mode': 'none'}",
            exploitation_impact="UNTRUSTED AI MODELS CAN EXECUTE ARBITRARY CODE",
            test_result="VULNERABLE",
            evidence={"docker_available": docker_available, "sandboxing_in_code": has_sandboxing},
            remediation="Implement Docker-based sandboxing for model execution",
            verified=True,
            exploitable=True
        ))
    
    # Violation #5: Missing Templating Service
    templating_service = config.app_dir / "services" / "templating_service.py"
    print(f"\n[SEC-005] Checking Templating Service...")
    print(f"   templating_service.py exists: {templating_service.exists()}")
    
    if not templating_service.exists():
        violations_found.append("SEC-005")
        security_results.add_violation(SecurityViolation(
            violation_id="SEC-005",
            violation_type=SecurityViolationType.AI_SANDBOXING,
            severity=SecuritySeverity.CRITICAL,
            title="Missing Templating Service Sandboxing",
            description="No sandboxed Jinja2 implementation for attack payload generation.",
            cwe_id="CWE-94",
            owasp_category="A03:2021 - Injection",
            file_location="app/services/:None",
            vulnerable_code="# Missing templating service",
            secure_code="from jinja2.sandbox import SandboxedEnvironment",
            exploitation_impact="Template injection attacks possible",
            test_result="VULNERABLE",
            evidence={"templating_service_exists": False},
            remediation="Implement sandboxed Jinja2 templating",
            verified=True,
            exploitable=True
        ))
    
    # Violation #6: Evidence Storage Model Missing
    evidence_model = config.app_dir / "models" / "evidence.py"
    print(f"\n[SEC-006] Checking Evidence Model...")
    print(f"   evidence.py exists: {evidence_model.exists()}")
    
    if not evidence_model.exists():
        violations_found.append("SEC-006")
        security_results.add_violation(SecurityViolation(
            violation_id="SEC-006",
            violation_type=SecurityViolationType.AI_SANDBOXING,
            severity=SecuritySeverity.CRITICAL,
            title="Evidence Storage Model Missing",
            description="Missing evidence document storage for prompt/response pairs.",
            cwe_id="CWE-778",
            owasp_category="A09:2021 - Security Logging Failures",
            file_location="app/models/session.py:1",
            vulnerable_code="# Missing evidence model",
            secure_code="class Evidence(BaseModel):\n    prompt: str\n    response: str\n    score: float",
            exploitation_impact="Cannot track AI model interactions",
            test_result="VULNERABLE",
            evidence={"evidence_model_exists": False},
            remediation="Create Evidence model for session tracking",
            verified=True,
            exploitable=False
        ))
    
    if violations_found:
        print(f"\n{Fore.RED}❌ {len(violations_found)} AI SANDBOXING VIOLATIONS CONFIRMED")
        print(f"   Violations: {', '.join(violations_found)}")
        print(f"   Risk: Remote code execution via malicious models")

# Run the test
test_ai_model_sandboxing()


Testing AI Model Sandboxing (Violations #3-6)...

[SEC-003] Checking Generator model...
   generator.py exists: False

[SEC-004] Checking Container Sandboxing...
   Docker available: True
   Sandboxing code found: False

[SEC-005] Checking Templating Service...
   templating_service.py exists: False

[SEC-006] Checking Evidence Model...
   evidence.py exists: False

❌ 4 AI SANDBOXING VIOLATIONS CONFIRMED
   Violations: SEC-003, SEC-004, SEC-005, SEC-006
   Risk: Remote code execution via malicious models
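
The SEC-004 check passes whenever `main.py` contains the word "docker" or "container" anywhere, including in a docstring or log message. A minimal sketch of a narrower heuristic is shown below. It assumes that real sandboxing would go through the Docker SDK for Python (`docker.from_env()`, `containers.run(...)`), which is what the `secure_code` field above suggests. The pattern list and the function name `find_sandboxing_evidence` are illustrative assumptions.

```python
import re

# Hypothetical patterns that indicate actual use of the Docker SDK rather than a passing mention
SANDBOX_PATTERNS = [
    re.compile(r"^\s*(import docker\b|from docker\b)", re.MULTILINE),
    re.compile(r"\bdocker\.from_env\s*\("),
    re.compile(r"\bcontainers\.run\s*\("),
]


def find_sandboxing_evidence(source: str) -> list[str]:
    """Return the patterns that matched, so the evidence dict can record why the check passed."""
    return [p.pattern for p in SANDBOX_PATTERNS if p.search(source)]


# A file that only mentions containers in a docstring
mention_only = '"""Health endpoint reports container uptime."""\napp = FastAPI()\n'
# A file that imports and uses the Docker SDK
real_usage = "import docker\nclient = docker.from_env()\n"

print(find_sandboxing_evidence(mention_only))     # → []
print(len(find_sandboxing_evidence(real_usage)))  # → 2
```

Recording the matched patterns rather than a bare boolean would also make the `evidence` field of the violation easier to audit.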


## 5. Secrets Management Tests (Violations #7-11)

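These checks inspect how API keys are stored and generated, and whether the orchestration state models exist. The report treats key material held in the application database, even as SHA256 hashes, as exposed if that database is breached.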

In [43]:
def test_secrets_management():
    """Test for secrets management violations (CRITICAL Violations #7-11)."""
    
    print(f"\n{Fore.YELLOW}Testing Secrets Management (Violations #7-11)...")
    print("="*60)
    
    violations_found = []
    
    # Violation #7: API Keys in Database
    api_key_model = config.app_dir / "models" / "api_key.py"
    print(f"\n[SEC-007] Checking API Key Storage...")
    
    stores_in_db = False
    if api_key_model.exists():
        with open(api_key_model, 'r') as f:
            content = f.read()
        
        if 'key_hash' in content or 'SHA256' in content or 'sha256' in content:
            stores_in_db = True
            print(f"   {Fore.RED}❌ API keys stored as hashes in database!")
            violations_found.append("SEC-007")
            
            security_results.add_violation(SecurityViolation(
                violation_id="SEC-007",
                violation_type=SecurityViolationType.SECRETS_MGMT,
                severity=SecuritySeverity.CRITICAL,
                title="API Keys Stored in Application Database",
                description="API keys stored as SHA256 hashes in main database.",
                cwe_id="CWE-522",
                owasp_category="A02:2021 - Cryptographic Failures",
                file_location="app/models/api_key.py:27",
                vulnerable_code="key_hash: Mapped[str] = mapped_column(String(64))",
                secure_code="secret_reference: Mapped[str] = mapped_column(String(255))",
                exploitation_impact="DATABASE BREACH EXPOSES ALL API CREDENTIALS",
                test_result="VULNERABLE",
                evidence={"stores_hashes_in_db": True},
                remediation="Use dedicated secrets manager for API keys",
                verified=True,
                exploitable=True
            ))
    
    # Violation #8: API Key Generation
    api_key_service = config.app_dir / "services" / "api_key_service.py"
    print(f"\n[SEC-008] Checking API Key Service...")
    
    if api_key_service.exists():
        with open(api_key_service, 'r') as f:
            content = f.read()
        
        if 'hashlib' in content or 'SHA256' in content:
            print(f"   {Fore.RED}❌ API key service generates SHA256 hashes")
            violations_found.append("SEC-008")
            
            security_results.add_violation(SecurityViolation(
                violation_id="SEC-008",
                violation_type=SecurityViolationType.SECRETS_MGMT,
                severity=SecuritySeverity.CRITICAL,
                title="API Key Generation Stores Hashes",
                description="API key generation creates SHA256 hash for database storage.",
                cwe_id="CWE-522",
                owasp_category="A02:2021 - Cryptographic Failures",
                file_location="app/services/api_key_service.py:288",
                vulnerable_code="key_hash = hashlib.sha256(api_key.encode()).hexdigest()",
                secure_code="secret_ref = await secrets_manager.store_secret(api_key)",
                exploitation_impact="Insecure credential storage pattern",
                test_result="VULNERABLE",
                evidence={"generates_hashes": True},
                remediation="Store API keys in secrets manager",
                verified=True,
                exploitable=True
            ))
    
    # Violations #9-11: Missing Orchestration State Models
    orchestration_models = [
        ("orchestration_state.py", "SEC-009"),
        ("orchestration_secrets.py", "SEC-010"),
        ("orchestration_config.py", "SEC-011")
    ]
    
    print(f"\n[SEC-009-011] Checking Orchestration Models...")
    for model_file, violation_id in orchestration_models:
        model_path = config.app_dir / "models" / model_file
        if not model_path.exists():
            print(f"   {Fore.RED}❌ {model_file} missing")
            violations_found.append(violation_id)
            
            security_results.add_violation(SecurityViolation(
                violation_id=violation_id,
                violation_type=SecurityViolationType.SECRETS_MGMT,
                severity=SecuritySeverity.CRITICAL,
                title=f"Missing {model_file.replace('.py', '').replace('_', ' ').title()} Model",
                description=f"No {model_file} for storing orchestration secrets and state.",
                cwe_id="CWE-522",
                owasp_category="A04:2021 - Insecure Design",
                file_location=f"app/models/{model_file}:Missing",
                vulnerable_code=f"# Missing {model_file}",
                secure_code=f"class {model_file.replace('.py', '').replace('_', ' ').title().replace(' ', '')}(BaseModel): ...",
                exploitation_impact="Cannot securely manage orchestration state",
                test_result="VULNERABLE",
                evidence={"model_exists": False},
                remediation=f"Create {model_file} for orchestration state management",
                verified=True,
                exploitable=False
            ))
    
    if violations_found:
        print(f"\n{Fore.RED}❌ {len(violations_found)} SECRETS MANAGEMENT VIOLATIONS CONFIRMED")
        print(f"   Violations: {', '.join(violations_found)}")
        print(f"   Risk: Credential exposure in database breach")

# Run the test
test_secrets_management()


Testing Secrets Management (Violations #7-11)...

[SEC-007] Checking API Key Storage...
   ❌ API keys stored as hashes in database!

[SEC-008] Checking API Key Service...
   ❌ API key service generates SHA256 hashes

[SEC-009-011] Checking Orchestration Models...
   ❌ orchestration_state.py missing
   ❌ orchestration_secrets.py missing
   ❌ orchestration_config.py missing

❌ 5 SECRETS MANAGEMENT VIOLATIONS CONFIRMED
   Violations: SEC-007, SEC-008, SEC-009, SEC-010, SEC-011
   Risk: Credential exposure in database breach
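
The `secure_code` fields for SEC-007 and SEC-008 point at a reference-based pattern: the database row holds only an opaque reference, and the key material lives in a separate secrets store. The sketch below illustrates that flow with an in-memory stand-in. `InMemorySecretsManager`, `store_secret`, and `verify` are hypothetical names chosen to match the snippet above, not a real secrets-manager API, and the class is not suitable for production use.

```python
import asyncio
import secrets


class InMemorySecretsManager:
    """Hypothetical stand-in for a dedicated secrets manager; illustration only."""

    def __init__(self) -> None:
        self._vault: dict[str, str] = {}

    async def store_secret(self, value: str) -> str:
        # The reference is random, so it reveals nothing about the secret it points to
        reference = f"secret/{secrets.token_urlsafe(16)}"
        self._vault[reference] = value
        return reference

    async def verify(self, reference: str, candidate: str) -> bool:
        stored = self._vault.get(reference)
        # compare_digest avoids leaking the position of the first mismatch through timing
        return stored is not None and secrets.compare_digest(stored, candidate)


async def demo() -> tuple[bool, bool]:
    manager = InMemorySecretsManager()
    api_key = secrets.token_urlsafe(32)
    # Only this reference would be written to the application database
    secret_ref = await manager.store_secret(api_key)
    return (await manager.verify(secret_ref, api_key),
            await manager.verify(secret_ref, "wrong-key"))


print(asyncio.run(demo()))  # → (True, False)
```

With this layout a dump of the application database yields references only; whether that is preferable to salted hashes depends on the secrets store in use and is a design decision for the project's ADRs.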


## 6. Vulnerability Classification Tests (Violations #12-15)

These checks look for the vulnerability classification tables in the Alembic migrations and for the corresponding model imports and exports.

In [44]:
def test_vulnerability_classification():
    """Test for vulnerability classification violations (CRITICAL Violations #12-15)."""
    
    print(f"\n{Fore.YELLOW}Testing Vulnerability Classification (Violations #12-15)...")
    print("="*60)
    
    violations_found = []
    
    # Violation #12: Missing Vulnerability Tables
    migrations_dir = config.project_root / "alembic" / "versions"
    print(f"\n[SEC-012] Checking Database Migrations...")
    
    has_vuln_tables = False
    if migrations_dir.exists():
        for mig_file in migrations_dir.glob("*.py"):
            with open(mig_file, 'r') as f:
                content = f.read()
            if 'vulnerability_taxonomies' in content or 'taxonomy_mappings' in content:
                has_vuln_tables = True
                break
    
    if not has_vuln_tables:
        print(f"   {Fore.RED}❌ No vulnerability tables in migrations!")
        violations_found.append("SEC-012")
        
        security_results.add_violation(SecurityViolation(
            violation_id="SEC-012",
            violation_type=SecurityViolationType.VULN_CLASSIFICATION,
            severity=SecuritySeverity.CRITICAL,
            title="Database Missing Vulnerability Tables",
            description="Database migrations never created vulnerability classification tables.",
            cwe_id="CWE-1209",
            owasp_category="A09:2021 - Security Logging Failures",
            file_location="alembic/versions/:194",
            vulnerable_code="# Missing vulnerability tables",
            secure_code="CREATE TABLE vulnerability_taxonomies (...)",
            exploitation_impact="CANNOT TRACK OR CLASSIFY SECURITY VULNERABILITIES",
            test_result="VULNERABLE",
            evidence={"has_vulnerability_tables": False},
            remediation="Create migration for vulnerability tables",
            verified=True,
            exploitable=False
        ))
    
    # Violation #13: Models Not Imported
    db_base = config.app_dir / "db" / "base.py"
    print(f"\n[SEC-013] Checking Model Imports...")
    
    if db_base.exists():
        with open(db_base, 'r') as f:
            content = f.read()
        
        if 'vulnerability' not in content.lower():
            print(f"   {Fore.RED}❌ Vulnerability models not imported")
            violations_found.append("SEC-013")
            
            security_results.add_violation(SecurityViolation(
                violation_id="SEC-013",
                violation_type=SecurityViolationType.VULN_CLASSIFICATION,
                severity=SecuritySeverity.CRITICAL,
                title="Models Not Imported",
                description="Missing import for vulnerability_taxonomies model.",
                cwe_id="CWE-1209",
                owasp_category="A09:2021 - Security Logging Failures",
                file_location="app/db/base.py:9",
                vulnerable_code="# Missing vulnerability imports",
                secure_code="from app.models.vulnerability import VulnerabilityTaxonomy",
                exploitation_impact="Vulnerability tracking unavailable",
                test_result="VULNERABLE",
                evidence={"imported": False},
                remediation="Import vulnerability models in db/base.py",
                verified=True,
                exploitable=False
            ))
    
    # Violation #14: Models Not Exported
    models_init = config.app_dir / "models" / "__init__.py"
    print(f"\n[SEC-014] Checking Model Exports...")
    
    if models_init.exists():
        with open(models_init, 'r') as f:
            content = f.read()
        
        if 'VulnerabilityTaxonomy' not in content:
            print(f"   {Fore.RED}❌ VulnerabilityTaxonomy not exported")
            violations_found.append("SEC-014")
            
            security_results.add_violation(SecurityViolation(
                violation_id="SEC-014",
                violation_type=SecurityViolationType.VULN_CLASSIFICATION,
                severity=SecuritySeverity.CRITICAL,
                title="Models Not Exported",
                description="VulnerabilityTaxonomy model missing from module exports.",
                cwe_id="CWE-1209",
                owasp_category="A09:2021 - Security Logging Failures",
                file_location="app/models/__init__.py:12",
                vulnerable_code="# Missing VulnerabilityTaxonomy export",
                secure_code="__all__ = [..., 'VulnerabilityTaxonomy']",
                exploitation_impact="Cannot use vulnerability classification",
                test_result="VULNERABLE",
                evidence={"exported": False},
                remediation="Export VulnerabilityTaxonomy in models/__init__.py",
                verified=True,
                exploitable=False
            ))
    
    # Violation #15: Alembic Migration Missing
    print(f"\n[SEC-015] Checking for Vulnerability Migration...")
    vuln_migration_exists = False
    
    if migrations_dir.exists():
        vuln_migration_exists = any(migrations_dir.glob("*vulnerability*.py"))
    
    if not vuln_migration_exists:
        print(f"   {Fore.RED}❌ No vulnerability migration file")
        violations_found.append("SEC-015")
        
        security_results.add_violation(SecurityViolation(
            violation_id="SEC-015",
            violation_type=SecurityViolationType.VULN_CLASSIFICATION,
            severity=SecuritySeverity.CRITICAL,
            title="Alembic Migration Missing",
            description="No migration exists to create vulnerability classification tables.",
            cwe_id="CWE-1209",
            owasp_category="A09:2021 - Security Logging Failures",
            file_location="alembic/versions/:Missing",
            vulnerable_code="# No vulnerability migration",
            secure_code="alembic revision -m 'Add vulnerability tables'",
            exploitation_impact="Cannot deploy vulnerability tracking",
            test_result="VULNERABLE",
            evidence={"migration_exists": False},
            remediation="Create alembic migration for vulnerability tables",
            verified=True,
            exploitable=False
        ))
    
    if violations_found:
        print(f"\n{Fore.RED}❌ {len(violations_found)} VULNERABILITY CLASSIFICATION VIOLATIONS")
        print(f"   Violations: {', '.join(violations_found)}")
        print(f"   Risk: Cannot track or classify security issues")

# Run the test
test_vulnerability_classification()


Testing Vulnerability Classification (Violations #12-15)...

[SEC-012] Checking Database Migrations...

[SEC-013] Checking Model Imports...

[SEC-014] Checking Model Exports...

[SEC-015] Checking for Vulnerability Migration...
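
The recorded output above lists no findings for SEC-012 through SEC-015. For SEC-013 and SEC-014 that result is ambiguous, because those branches print nothing when `db/base.py` or `models/__init__.py` does not exist. A minimal sketch of a three-state check that separates "file missing" from "marker absent" follows. The helper name `check_marker` and the temporary files are assumptions made for illustration.

```python
from pathlib import Path
import tempfile


def check_marker(path: Path, marker: str) -> str:
    """Classify a file check as 'present', 'absent', or 'file_missing'."""
    if not path.exists():
        return "file_missing"
    text = path.read_text(encoding="utf-8")
    # Case-insensitive match, mirroring the content.lower() test used for SEC-013
    return "present" if marker.lower() in text.lower() else "absent"


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp) / "base.py"
    print(check_marker(base, "vulnerability"))  # → file_missing
    base.write_text("from app.models.user import User\n", encoding="utf-8")
    print(check_marker(base, "vulnerability"))  # → absent
    base.write_text("from app.models.vulnerability import VulnerabilityTaxonomy\n", encoding="utf-8")
    print(check_marker(base, "vulnerability"))  # → present
```

Printing the returned state for every check would make a clean run distinguishable from a skipped one.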


## 7. Authentication & Authorization Tests (Violation #16)

This check looks for evidence storage in the session model (`app/models/session.py`), which the report files under authentication and authorization.

In [45]:
def test_auth_authorization():
    """Test for auth violations (CRITICAL Violation #16)."""
    
    print(f"\n{Fore.YELLOW}Testing Authentication & Authorization (Violation #16)...")
    print("="*60)
    
    # Violation #16: Evidence Document Storage Missing
    # The check inspects the session model, so the variable is named for that file
    session_model = config.app_dir / "models" / "session.py"
    print(f"\n[SEC-016] Checking Session Evidence Storage...")
    
    has_evidence_storage = False
    if session_model.exists():
        with open(session_model, 'r') as f:
            content = f.read()
        
        if 'evidence' in content.lower() or 'document' in content.lower():
            has_evidence_storage = True
            print(f"   ✓ Evidence storage references found")
        else:
            print(f"   {Fore.RED}❌ No evidence storage in session model")
    else:
        print(f"   {Fore.RED}❌ session.py not found")
    
    if not has_evidence_storage:
        security_results.add_violation(SecurityViolation(
            violation_id="SEC-016",
            violation_type=SecurityViolationType.AUTH_AUTHZ,
            severity=SecuritySeverity.CRITICAL,
            title="Evidence Document Storage Missing",
            description="Missing secure storage for authentication evidence and session tracking.",
            cwe_id="CWE-778",
            owasp_category="A07:2021 - Identification and Authentication Failures",
            file_location="app/models/session.py:1",
            vulnerable_code="# Missing evidence storage",
            secure_code="evidence_documents: Mapped[list] = mapped_column(JSON)",
            exploitation_impact="Cannot track authentication evidence",
            test_result="VULNERABLE",
            evidence={"has_evidence_storage": False},
            remediation="Add evidence storage to session model",
            verified=True,
            exploitable=False
        ))
        
        print(f"\n{Fore.RED}❌ CRITICAL AUTH VIOLATION CONFIRMED")
        print(f"   Impact: Cannot track authentication evidence")

# Run the test
test_auth_authorization()


Testing Authentication & Authorization (Violation #16)...

[SEC-016] Checking Session Evidence Storage...
   ❌ No evidence storage in session model

❌ CRITICAL AUTH VIOLATION CONFIRMED
   Impact: Cannot track authentication evidence


## 8. Summary and Security Impact Analysis

In [46]:
# Display comprehensive security results
security_results.display_summary()

print("\n" + "="*80)
print(f"{Fore.RED}CRITICAL SECURITY VIOLATIONS DETAIL")
print("="*80)

# Group violations by type
by_type = {}
for violation in security_results.violations:
    by_type.setdefault(violation.violation_type.value, []).append(violation)

for vtype, violations in by_type.items():
    critical_count = sum(1 for v in violations if v.severity == SecuritySeverity.CRITICAL and v.verified)
    
    if critical_count > 0:
        print(f"\n{Fore.RED}🔴 {vtype} ({critical_count} CRITICAL)")
        print("-"*60)
        
        for violation in violations:
            if violation.verified and violation.severity == SecuritySeverity.CRITICAL:
                print(f"\n[{violation.violation_id}] {violation.title}")
                print(f"   CWE: {violation.cwe_id}")
                print(f"   OWASP: {violation.owasp_category}")
                print(f"   Location: {violation.file_location}")
                print(f"   Exploitable: {'YES' if violation.exploitable else 'NO'}")
                print(f"   Impact: {violation.exploitation_impact}")


SECURITY TEST RESULTS SUMMARY

🔒 Security Assessment:
   Total Violations: 12
   Verified: 10
   Exploitable: 4
   Test Duration: 0:00:00.109607

🔴 Severity Distribution:
   CRITICAL: 12

🎯 Exploitation Results:
   Exploitation Attempts: 0
   Successful Exploits: 0

📋 Compliance Violations:
   NIST_800-53: 10 controls violated
   ISO_27001: 10 controls violated
   OWASP_Top_10: 6 controls violated
   CIS_Controls: 10 controls violated

CRITICAL SECURITY VIOLATIONS DETAIL

🔴 AI Model Sandboxing (4 CRITICAL)
------------------------------------------------------------

[SEC-003] Missing Generator Database Model
   CWE: CWE-693
   OWASP: A04:2021 - Insecure Design
   Location: app/models/:1
   Exploitable: NO
   Impact: Cannot track or audit AI model usage

[SEC-004] No Container Sandboxing Infrastructure
   CWE: CWE-94
   OWASP: A03:2021 - Injection
   Location: app/main.py:238
   Exploitable: YES
   Impact: UNTRUSTED AI MODELS CAN EXECUTE ARBITRARY CODE

[SEC-005] Missing Templating Se

In [47]:
# Security Compliance Assessment
print("\n" + "="*80)
print(f"{Fore.RED}SECURITY COMPLIANCE ASSESSMENT")
print("="*80)

# Calculate security score
total_violations = len(security_results.violations)
critical_violations = sum(1 for v in security_results.violations if v.verified and v.severity == SecuritySeverity.CRITICAL)
exploitable_violations = sum(1 for v in security_results.violations if v.exploitable)

# Security score: 100 minus the percentage of the 16 reported CRITICAL violations confirmed in this run
max_expected_violations = 16  # CRITICAL count from the source report
security_score = max(0, 100 - (critical_violations / max_expected_violations * 100)) if max_expected_violations > 0 else 0

print(f"\n🔒 Security Metrics:")
print(f"   Total Violations Tested: {total_violations}")
print(f"   Critical Violations Confirmed: {critical_violations}")
print(f"   Exploitable Vulnerabilities: {exploitable_violations}")
print(f"   Security Score: {security_score:.1f}%")

# FISMA/FedRAMP Assessment
print(f"\n📋 Federal Compliance Status:")
print(f"   FISMA Compliance: {Fore.RED}❌ FAIL")
print(f"   FedRAMP Authorization: {Fore.RED}❌ WOULD NOT ACHIEVE ATO")
print(f"   NIST 800-53 Controls Failed: {critical_violations}")
print(f"   ISO 27001 Controls Failed: {critical_violations}")

# Risk Assessment
print(f"\n⚠️ Risk Assessment:")
risks = [
    ("Complete multi-tenant data exposure", critical_violations >= 2),
    ("Remote code execution via AI models", exploitable_violations >= 1),
    ("API credential exposure", critical_violations >= 5),
    ("No security vulnerability tracking", critical_violations >= 4),
    ("Privilege escalation possible", critical_violations >= 1)
]

for risk, present in risks:
    if present:
        print(f"   {Fore.RED}❌ {risk}")
    else:
        print(f"   {Fore.GREEN}✓ {risk} (not confirmed by this run)")

# Export results
export_data = {
    "assessment_date": datetime.now().isoformat(),
    "github_issue": "#47",
    "summary": security_results.get_summary(),
    "security_score": security_score,
    "violations": [v.to_dict() for v in security_results.violations],
    "exploitation_attempts": security_results.exploitation_attempts,
    "fisma_compliant": False,
    "fedramp_ato_possible": False
}

output_file = "ISSUE_47_Security_Results.json"
with open(output_file, 'w') as f:
    json.dump(export_data, f, indent=2)

print(f"\n{Fore.GREEN}✅ Results exported to {output_file}")


SECURITY COMPLIANCE ASSESSMENT

🔒 Security Metrics:
   Total Violations Tested: 12
   Critical Violations Confirmed: 10
   Exploitable Vulnerabilities: 4
   Security Score: 37.5%

📋 Federal Compliance Status:
   FISMA Compliance: ❌ FAIL
   FedRAMP Authorization: ❌ WOULD NOT ACHIEVE ATO
   NIST 800-53 Controls Failed: 10
   ISO 27001 Controls Failed: 10

⚠️ Risk Assessment:
   ❌ Complete multi-tenant data exposure
   ❌ Remote code execution via AI models
   ❌ API credential exposure
   ❌ No security vulnerability tracking
   ❌ Privilege escalation possible

✅ Results exported to ISSUE_47_Security_Results.json
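
The score above follows from the recorded counts: 10 confirmed out of 16 expected gives 100 − 10/16 × 100 = 37.5. The risk flags, however, are all derived from the global counts, so every risk is marked present once enough violations are confirmed in any category. The sketch below shows a per-category alternative. It assumes the ten verified findings are SEC-003 through SEC-011 plus SEC-016, which is what the cell outputs above suggest; the category labels and function names are placeholders rather than the notebook's actual enum values.

```python
from collections import Counter


def compliance_score(confirmed_critical: int, expected: int = 16) -> float:
    """Same formula as the cell above: 100 minus the percentage of expected violations confirmed."""
    return max(0.0, 100 - confirmed_critical / expected * 100)


def risk_flags(verified_categories: list[str], risk_to_category: dict[str, str]) -> dict[str, bool]:
    """Flag a risk only when its own category has at least one verified finding."""
    counts = Counter(verified_categories)
    return {risk: counts[category] > 0 for risk, category in risk_to_category.items()}


# Illustrative input: 4 AI sandboxing, 5 secrets management, 1 auth finding (placeholder labels)
verified = ["ai_sandboxing"] * 4 + ["secrets_mgmt"] * 5 + ["auth_authz"]
risk_map = {
    "Complete multi-tenant data exposure": "multi_tenant",
    "Remote code execution via AI models": "ai_sandboxing",
    "API credential exposure": "secrets_mgmt",
    "No security vulnerability tracking": "vuln_classification",
    "Privilege escalation possible": "auth_authz",
}

print(compliance_score(len(verified)))  # → 37.5
for risk, confirmed in risk_flags(verified, risk_map).items():
    print(f"{'CONFIRMED' if confirmed else 'not confirmed'}: {risk}")
```

Under these assumptions the multi-tenant and vulnerability-tracking risks come out as not confirmed, which differs from the recorded risk assessment and is worth reconciling before the results are reported.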


In [48]:
# Final Security Recommendations
print("\n" + "="*80)
print(f"{Fore.RED}CRITICAL SECURITY RECOMMENDATIONS")
print("="*80)

print(f"""
{Fore.RED}⚠️ IMMEDIATE ACTIONS REQUIRED ⚠️

Based on this security assessment, the ViolentUTF API has
{Fore.RED}CRITICAL SECURITY VULNERABILITIES{Style.RESET_ALL} that must be addressed immediately.

{Fore.CYAN}1. DISABLE MULTI-TENANT FEATURES IMMEDIATELY{Style.RESET_ALL}
   • Add organization_id to all JWT tokens
   • Implement ABAC filtering in all repositories
   • Audit all data access paths

{Fore.CYAN}2. BLOCK AI MODEL UPLOADS{Style.RESET_ALL}
   • Disable any model upload endpoints
   • Implement Docker sandboxing before re-enabling
   • Add resource limits and timeouts

{Fore.CYAN}3. ROTATE ALL API KEYS{Style.RESET_ALL}
   • Migrate to dedicated secrets manager
   • Remove all key hashes from database
   • Implement key rotation mechanism

{Fore.CYAN}4. IMPLEMENT VULNERABILITY TRACKING{Style.RESET_ALL}
   • Create vulnerability classification tables
   • Add CWE/CVE/MITRE mapping
   • Enable security event logging

{Fore.CYAN}5. FIX AUTHENTICATION GAPS{Style.RESET_ALL}
   • Replace superuser flag with proper RBAC
   • Add permission boundaries
   • Implement session scoping

{Fore.YELLOW}DEPLOYMENT RECOMMENDATION:{Style.RESET_ALL}

⛔ {Fore.RED}DO NOT DEPLOY TO PRODUCTION{Style.RESET_ALL}
⛔ {Fore.RED}DO NOT PROCESS SENSITIVE DATA{Style.RESET_ALL}
⛔ {Fore.RED}DO NOT ENABLE MULTI-TENANT MODE{Style.RESET_ALL}

The system requires immediate security remediation before any
production use. Estimated remediation time: 4-6 weeks with a
dedicated security team.
""")

print("\n" + "="*80)
print(f"{Fore.GREEN}✅ Security assessment complete. Results saved.")
print("="*80)


CRITICAL SECURITY RECOMMENDATIONS

⚠️ IMMEDIATE ACTIONS REQUIRED ⚠️

Based on this security assessment, the ViolentUTF API has
CRITICAL SECURITY VULNERABILITIES that must be addressed immediately.

1. DISABLE MULTI-TENANT FEATURES IMMEDIATELY
   • Add organization_id to all JWT tokens
   • Implement ABAC filtering in all repositories
   • Audit all data access paths

2. BLOCK AI MODEL UPLOADS
   • Disable any model upload endpoints
   • Implement Docker sandboxing before re-enabling
   • Add resource limits and timeouts

3. ROTATE ALL API KEYS
   • Migrate to dedicated secrets manager
   • Remove all key hashes from database
   • Implement key rotation mechanism

4. IMPLEMENT VULNERABILITY TRACKING
   • Create vulnerability classification tables
   • Add CWE/CVE/MITRE mapping
   • Enable security event logging

5. FIX AUTHENTICATION GAPS
   • Replace superuser flag with proper RBAC
   • Add permission boundaries
   • Implement session scoping

DEPLOYMENT RECOMMENDATION:

⛔ DO NOT DEPL