In [None]:
# Install required packages
!pip install apachelogs dask pandas numpy scikit-learn tensorflow plotly seaborn joblib
!pip install --upgrade matplotlib  # Ensure latest matplotlib version

# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')

# Import standard libraries
import os
import glob
import json
import re
import gzip
import logging
from datetime import datetime
import numpy as np
import pandas as pd
from typing import Optional, Dict, List, Tuple

# Import data processing libraries
import dask.bag as db
import apachelogs

# Import ML libraries
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, VotingClassifier
from sklearn.svm import OneClassSVM
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import classification_report, roc_auc_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
import joblib

# Import visualization libraries
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.graph_objects as go
import plotly.express as px

# Import additional libraries
import urllib.parse
import base64
import html
from itertools import combinations

Collecting apachelogs
  Downloading apachelogs-0.6.1-py3-none-any.whl.metadata (4.7 kB)
Collecting pydicti~=1.1 (from apachelogs)
  Downloading pydicti-1.2.1-py2.py3-none-any.whl.metadata (6.6 kB)
Downloading apachelogs-0.6.1-py3-none-any.whl (17 kB)
Downloading pydicti-1.2.1-py2.py3-none-any.whl (9.0 kB)
Installing collected packages: pydicti, apachelogs
Successfully installed apachelogs-0.6.1 pydicti-1.2.1
Collecting matplotlib
  Downloading matplotlib-3.10.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (11 kB)
Downloading matplotlib-3.10.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (8.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.6/8.6 MB[0m [31m42.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: matplotlib
  Attempting uninstall: matplotlib
    Found existing installation: matplotlib 3.10.0
    Uninstalling matplotlib-3.10.0:
      Successfully uninstalled matplotlib-3.10.0
Successfully installed m

In [None]:
# Regex fallbacks for lines the apachelogs parser rejects, ordered from the most
# to the least specific layout (combined, common, then a loose match that only
# needs a bracketed timestamp, a quoted request and a status code).
_FALLBACK_LOG_PATTERNS = (
    re.compile(r'(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) ([^"]+) (\S+)" (\d+) (\d+|-) "([^"]*)" "([^"]*)"'),
    re.compile(r'(\S+) \S+ \S+ \[([^\]]+)\] "(\S+) ([^"]+) (\S+)" (\d+) (\d+|-)'),
    re.compile(r'(\S+).*?\[([^\]]+)\].*?"(\S+) ([^"]+) (\S+)" (\d+)'),
)


def _parse_clf_timestamp(raw: str):
    """Convert a ``10/Oct/2000:13:55:36 -0700`` style timestamp to a datetime.

    The raw string is returned unchanged when it does not follow that layout,
    so an unusual timestamp never causes the whole log line to be dropped.
    """
    try:
        return datetime.strptime(raw, '%d/%b/%Y:%H:%M:%S %z')
    except ValueError:
        return raw


def parse_component_focused_logs_robust(line: str) -> Optional[Dict]:
    """Parse one access-log line into a dict of request fields.

    The line is first offered to ``apachelogs`` using the combined, common and
    basic formats in turn. If none of them accepts it, a set of regular
    expressions is tried instead.

    Args:
        line: A single raw line from an access log.

    Returns:
        A dict with the keys ``ip``, ``timestamp``, ``method``, ``url``,
        ``protocol``, ``status``, ``bytes``, ``referer`` and ``user_agent``,
        or ``None`` when the line matches no supported layout.
    """
    logger = logging.getLogger(__name__)

    formats = (
        '%h %l %u %t "%r" %>s %b "%{Referer}i" "%{User-Agent}i"',  # Combined
        '%h %l %u %t "%r" %>s %b',  # Common
        '%h %l %u %t "%r" %>s',  # Basic
    )

    for fmt in formats:
        try:
            entry = apachelogs.LogParser(fmt).parse(line)

            # Pad so that a short request line still yields three fields.
            request_parts = entry.request_line.split(' ')
            method, url, protocol = (request_parts + ['', '', ''])[:3]

            return {
                'ip': entry.remote_host,
                'timestamp': entry.request_time,
                'method': method,
                'url': url,
                'protocol': protocol,
                'status': entry.final_status,
                'bytes': entry.bytes_sent,
                'referer': getattr(entry, 'headers_in', {}).get('Referer', ''),
                'user_agent': getattr(entry, 'headers_in', {}).get('User-Agent', '')
            }
        except Exception:
            # Deliberately best-effort: any failure for this format (bad line,
            # missing attribute, parser unavailable) just moves on to the next
            # format and finally to the regex fallback. Unlike a bare
            # ``except:`` this does not swallow KeyboardInterrupt/SystemExit.
            continue

    stripped = line.strip()
    for pattern in _FALLBACK_LOG_PATTERNS:
        match = pattern.match(stripped)
        if not match:
            continue

        # The looser patterns capture fewer groups; pad to the full nine.
        groups = match.groups()
        groups = groups + ('',) * (9 - len(groups))
        ip, raw_timestamp, method, url, protocol, status, size, referer, user_agent = groups

        return {
            'ip': ip,
            # Parsed so this path yields a datetime like the apachelogs path.
            'timestamp': _parse_clf_timestamp(raw_timestamp),
            'method': method,
            'url': url,
            'protocol': protocol,
            'status': int(status) if status.isdigit() else 0,
            'bytes': int(size) if size and size.isdigit() else 0,
            'referer': referer,
            'user_agent': user_agent
        }

    logger.debug(f"Failed to parse line: {line[:100]}...")
    return None

def process_logs_distributed(file_paths: List[str], sample_size: Optional[int] = None) -> pd.DataFrame:
    """Parse a set of access-log files through a Dask bag and return one DataFrame.

    Each file (plain text, or gzip when the path ends in ``.gz``) is read line
    by line. Every line that parses is enriched with the component fields
    derived from its URL plus the name of the file it came from.

    Args:
        file_paths: Paths of the log files to read.
        sample_size: When truthy, at most this many lines are read from each
            file; otherwise every line is read.

    Returns:
        A DataFrame with one row per successfully parsed line. A file that
        cannot be processed is logged and its rows are dropped.
    """
    logger = logging.getLogger(__name__)

    logger.info(f"🔍 Processing {len(file_paths)} files with Dask...")

    def process_file(file_path):
        parsed_rows = []
        failures = 0

        try:
            if file_path.endswith('.gz'):
                handle = gzip.open(file_path, 'rt', encoding='utf-8', errors='ignore')
            else:
                handle = open(file_path, 'r', encoding='utf-8', errors='ignore')

            with handle as f:
                for line_no, line in enumerate(f, start=1):
                    # line_no - 1 lines have been handled so far; stop once the
                    # per-file sample limit has been reached.
                    if sample_size and line_no > sample_size:
                        break

                    entry = parse_component_focused_logs_robust(line)
                    if not entry:
                        failures += 1
                    else:
                        name, kind, cleaned = extract_component_from_url_enhanced(entry['url'])
                        entry['clean_url'] = cleaned
                        entry['component_name'] = name
                        entry['component_type'] = kind
                        entry['source_file'] = os.path.basename(file_path)
                        parsed_rows.append(entry)

                    if line_no % 10000 == 0:
                        logger.debug(f"Processed {line_no} lines from {os.path.basename(file_path)}")

            logger.info(f"✅ Processed {os.path.basename(file_path)}: {len(parsed_rows)} valid entries, {failures} errors")
        except Exception as e:
            logger.error(f"❌ Error processing {file_path}: {e}")

        return parsed_rows

    per_file_rows = db.from_sequence(file_paths).map(process_file).compute()

    all_records = []
    for rows in per_file_rows:
        all_records.extend(rows)

    df = pd.DataFrame(all_records)
    logger.info(f"✅ Processed {len(df)} total log entries from {len(file_paths)} files")

    return df

In [None]:

# Classification rules, checked in this order. ``depth`` is the number of
# leading path segments inspected (and joined to form the component name);
# ``patterns`` are substrings looked for inside each of those segments.
_COMPONENT_RULES = {
    'api': {'patterns': ['api', 'rest', 'graphql', 'v1', 'v2', 'v3', 'endpoint'], 'depth': 2},
    'admin': {'patterns': ['admin', 'dashboard', 'manage', 'control', 'panel'], 'depth': 2},
    'authentication': {'patterns': ['auth', 'login', 'logout', 'register', 'oauth', 'sso', 'signin', 'signup'], 'depth': 1},
    'file_handler': {'patterns': ['upload', 'download', 'file', 'media', 'assets', 'static'], 'depth': 1},
    'search': {'patterns': ['search', 'query', 'filter', 'find', 'lookup'], 'depth': 1},
    'microservice': {'patterns': ['service', 'ms-', 'svc-'], 'depth': 1},
    'database': {'patterns': ['db', 'mysql', 'postgres', 'mongo', 'redis'], 'depth': 1}
}
_SCRIPT_EXTENSIONS = frozenset(['php', 'jsp', 'asp', 'py', 'rb', 'go'])
_STATIC_EXTENSIONS = frozenset(['css', 'js', 'jpg', 'png', 'gif', 'svg', 'ico'])
_MICROSERVICE_ROOTS = frozenset(['users', 'products', 'orders', 'payments', 'notifications'])


def extract_component_from_url_enhanced(url: str) -> Tuple[str, str, str]:
    """Derive a component name and type from a request URL.

    The query string and fragment are removed, then the path is classified:

    1. The first rule in ``_COMPONENT_RULES`` whose pattern occurs (as a
       substring, case-insensitively) in one of the leading path segments
       decides the type; the name is those leading segments joined by ``/``.
    2. A recognised file extension overrides the type (``script``), or both
       type and name (``static`` / ``static_resources``).
    3. A path of three or more segments that is still unclassified and starts
       with a known resource root is treated as a microservice.

    Args:
        url: The request URL or path as found in the log line.

    Returns:
        ``(component_name, component_type, clean_url)`` where ``clean_url`` is
        the URL without query string or fragment. An empty path yields
        ``('root', 'homepage', '/')``. If anything goes wrong (for example
        ``url`` is not a string) the error is logged and
        ``('unknown', 'unknown', url)`` is returned.
    """
    logger = logging.getLogger(__name__)

    try:
        base_url = url.split('?')[0].split('#')[0]
        path_parts = [part for part in base_url.split('/') if part]

        if not path_parts:
            return 'root', 'homepage', '/'

        component_type = 'application'
        component_name = path_parts[0]

        for comp_type, rules in _COMPONENT_RULES.items():
            leading = path_parts[:rules['depth']]
            if any(pattern in part.lower() for part in leading for pattern in rules['patterns']):
                component_type = comp_type
                component_name = '/'.join(leading)
                break

        if '.' in base_url:
            # Text after the last dot; for dots in directory names this holds a
            # slash and therefore matches neither extension set.
            extension = base_url.split('.')[-1].lower()
            if extension in _SCRIPT_EXTENSIONS:
                component_type = 'script'
            elif extension in _STATIC_EXTENSIONS:
                component_type = 'static'
                component_name = 'static_resources'

        if (
            component_type == 'application'
            and len(path_parts) >= 3
            and path_parts[0] in _MICROSERVICE_ROOTS
        ):
            component_type = 'microservice'
            component_name = f"{path_parts[0]}/{path_parts[1]}"

        return component_name, component_type, base_url

    except Exception as e:
        logger.error(f"Error extracting component from URL {url}: {e}")
        return 'unknown', 'unknown', url

# A path segment counts as an identifier when it is all digits or a
# lower-case hexadecimal UUID.
_PATH_ID_PATTERN = re.compile(r'^(\d+|[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12})$')
_PATH_ACTION_WORDS = ('create', 'edit', 'delete', 'update', 'list', 'view', 'get', 'post')


def analyze_component_hierarchy(path_parts: List[str]) -> Dict:
    """Summarise the structure of a URL path that has been split into segments.

    Args:
        path_parts: The non-empty segments of a URL path, in order.

    Returns:
        A dict with:

        * ``depth`` - number of segments.
        * ``primary`` / ``secondary`` - first and second segment, or ``None``.
        * ``resource`` - last segment, or ``None`` for an empty path.
        * ``has_id`` - whether any segment is numeric or a UUID.
        * ``has_action`` - whether the last segment contains an action word
          (substring match, case-insensitive).

        ``has_id`` and ``has_action`` are only evaluated for paths with at
        least two segments; for shorter paths they are ``False``, so the
        returned dict always has the same keys.
    """
    hierarchy = {
        'depth': len(path_parts),
        'primary': path_parts[0] if path_parts else None,
        'secondary': path_parts[1] if len(path_parts) > 1 else None,
        'resource': path_parts[-1] if path_parts else None,
        'has_id': False,
        'has_action': False
    }

    if len(path_parts) >= 2:
        hierarchy['has_id'] = any(_PATH_ID_PATTERN.match(part) for part in path_parts)
        last_segment = path_parts[-1].lower()
        hierarchy['has_action'] = any(action in last_segment for action in _PATH_ACTION_WORDS)

    return hierarchy

In [None]:

# Shape of a padded base64 string; only a pre-filter, the real check is the
# strict decode below.
_BASE64_PAYLOAD_RE = re.compile(r'[A-Za-z0-9+/]+={0,2}')


def decode_payload(payload: str) -> str:
    """Undo the common encodings applied to a request payload.

    Percent-encoding is removed first, then HTML entities are unescaped. If
    the result looks like base64 it is decoded as well, but only when the
    decoded bytes are valid UTF-8 consisting of printable or whitespace
    characters. Without that check ordinary paths such as ``/api/v1/user``
    (base64 alphabet, length divisible by four) would be turned into garbage.

    Args:
        payload: The raw text to decode.

    Returns:
        The decoded text. A value that is not a string (for example ``None``
        or NaN from a DataFrame column) is returned unchanged.
    """
    if not isinstance(payload, str):
        return payload

    decoded = html.unescape(urllib.parse.unquote(payload))

    if len(decoded) % 4 == 0 and _BASE64_PAYLOAD_RE.fullmatch(decoded):
        try:
            text = base64.b64decode(decoded, validate=True).decode('utf-8')
        except ValueError:
            # Covers binascii.Error and UnicodeDecodeError: not real base64
            # text, so keep what we had.
            return decoded
        if text and all(ch.isprintable() or ch.isspace() for ch in text):
            decoded = text

    return decoded

def detect_attack_patterns_enhanced(df: pd.DataFrame) -> pd.DataFrame:
    """Flag requests whose decoded URL matches known attack signatures.

    The frame is modified in place and also returned. Added columns:

    * ``decoded_url`` - ``url`` passed through ``decode_payload``.
    * ``attack_<type>_matches`` - number of signature patterns that matched.
    * ``attack_<type>_score`` - sum of the matched pattern weights divided by
      the sum of all weights for that attack type.
    * ``attack_<type>`` - ``True`` when that score exceeds 0.5.
    * ``attack_confidence`` - highest per-type score of the row.
    * ``is_attack`` - ``True`` when ``attack_confidence`` exceeds 0.5.
    * ``attack_severity`` - the most severe level among the flagged attack
      types (``critical`` > ``high`` > ``medium``), or ``low`` if none.

    Args:
        df: Parsed log entries; must contain a ``url`` column.

    Returns:
        The same DataFrame with the columns above added.
    """
    logger = logging.getLogger(__name__)

    logger.info("🎯 Enhanced attack pattern detection...")

    df['decoded_url'] = df['url'].apply(decode_payload)

    # Each pattern is paired with a weight; patterns are matched
    # case-insensitively against the decoded URL.
    attack_signatures = {
        'sql_injection': {
            'patterns': [
                (r'union.*select', 0.9),
                (r'(order|group).*by.*\d+', 0.7),
                (r'(and|or)\s*\d+\s*=\s*\d+', 0.8),
                (r'(\'|")\s*(and|or)\s*(\'|")?\s*\d+\s*=\s*\d+', 0.9),
                (r'information_schema', 0.8),
                (r'sysobjects|syscolumns', 0.9),
                (r'exec\s*\(|execute\s*\(', 0.8),
                (r'drop\s+(table|database)', 0.95)
            ],
            'severity': 'high'
        },
        'xss': {
            'patterns': [
                (r'<script[^>]*>', 0.95),
                (r'javascript:', 0.9),
                (r'on(error|load|click|mouse\w+)\s*=', 0.85),
                (r'(alert|prompt|confirm)\s*\(', 0.8),
                (r'eval\s*\(', 0.9),
                (r'document\.(cookie|write)', 0.85),
                (r'window\.(location|open)', 0.7)
            ],
            'severity': 'high'
        },
        'directory_traversal': {
            'patterns': [
                (r'\.\./', 0.7),
                (r'\.\.\\', 0.7),
                (r'\.{2,}[/\\]', 0.8),
                (r'etc/passwd', 0.95),
                (r'boot\.ini', 0.95),
                (r'windows/system32', 0.9),
                (r'/proc/self', 0.9)
            ],
            'severity': 'medium'
        },
        'command_injection': {
            'patterns': [
                (r';.*?(cat|ls|id|whoami|pwd)', 0.9),
                (r'\|.*?(cat|ls|id|whoami)', 0.9),
                (r'`.*?`', 0.7),
                (r'\$\(.*?\)', 0.8),
                (r'&&\s*\w+', 0.7),
                (r'\|\|\s*\w+', 0.7)
            ],
            'severity': 'critical'
        },
        'file_inclusion': {
            'patterns': [
                (r'(file|include|require|page)\s*=', 0.7),
                (r'\.php\?', 0.6),
                (r'(http|https|ftp)://', 0.7),
                (r'php://input', 0.9),
                (r'expect://', 0.9)
            ],
            'severity': 'high'
        }
    }

    for attack_type, config in attack_signatures.items():
        score_col = f'attack_{attack_type}_score'
        matches_col = f'attack_{attack_type}_matches'
        df[score_col] = 0.0
        df[matches_col] = 0

        for pattern, weight in config['patterns']:
            matches = df['decoded_url'].str.contains(pattern, case=False, na=False, regex=True)
            df[score_col] += matches * weight
            df[matches_col] += matches

        # NOTE(review): dividing by the sum of ALL weights means one strong
        # match scores low (e.g. `<script>` alone is 0.95 / 5.85 ~= 0.16), so
        # the 0.5 threshold needs roughly half of the patterns to match at
        # once. Kept as-is because the intended semantics are not clear from
        # this code - confirm whether a max-weight score was meant.
        max_score = sum(weight for _, weight in config['patterns'])
        df[score_col] = df[score_col] / max_score
        df[f'attack_{attack_type}'] = df[score_col] > 0.5

    attack_score_columns = [f'attack_{attack_type}_score' for attack_type in attack_signatures]
    df['attack_confidence'] = df[attack_score_columns].max(axis=1)
    df['is_attack'] = df['attack_confidence'] > 0.5

    # Assign severities from least to most severe so the most severe flagged
    # level wins. Done column-wise so an empty frame works too.
    df['attack_severity'] = 'low'
    for level in ('medium', 'high', 'critical'):
        flag_columns = [
            f'attack_{attack_type}'
            for attack_type, config in attack_signatures.items()
            if config['severity'] == level
        ]
        if flag_columns:
            df.loc[df[flag_columns].any(axis=1), 'attack_severity'] = level

    logger.info(f"🚨 Attack detection complete: Total attacks: {df['is_attack'].sum()}, Critical: {(df['attack_severity'] == 'critical').sum()}")

    return df

In [None]:

def calculate_component_vulnerability_scores_configurable(df: pd.DataFrame, weights: Optional[Dict[str, float]] = None) -> pd.DataFrame:
    """Aggregate log entries per component and compute vulnerability and risk scores.

    Rows are grouped by ``(component_name, component_type)``. For every group
    the function computes request, attack and error statistics, the number of
    requests per ``attack_severity`` level, and the number of distinct attack
    score columns (``attack_*_score``) with a positive value. These are
    combined into a weighted ``vulnerability_score``, which is scaled by a
    traffic factor to give ``risk_score``.

    Args:
        df: Log entries. Must contain ``component_name``, ``component_type``,
            ``ip``, ``url``, ``is_attack``, ``attack_confidence``,
            ``is_error``, ``is_server_error``, ``is_auth_error``, ``status``,
            ``bytes``, ``timestamp`` (datetime-like) and ``attack_severity``.
        weights: Optional overrides for the scoring weights; keys that are
            not supplied keep their default value.

    Returns:
        One row per component, sorted by ``risk_score`` in descending order.
    """
    logger = logging.getLogger(__name__)

    logger.info("📊 Calculating component vulnerability scores...")

    default_weights = {
        'attack_rate': 100.0, 'error_rate': 50.0, 'server_errors': 2.0, 'auth_errors': 3.0,
        'attack_severity_critical': 5.0, 'attack_severity_high': 3.0, 'attack_severity_medium': 1.0,
        'attack_confidence': 50.0, 'unique_attack_types': 10.0, 'failed_auth_rate': 75.0
    }
    weights = {**default_weights, **(weights or {})}

    group_keys = ['component_name', 'component_type']

    # Named aggregation yields flat, final column names. Aggregating with a
    # dict of lists produces MultiIndex columns, which cannot be merged with
    # the flat frames built below.
    component_stats = df.groupby(group_keys).agg(
        unique_ips=('ip', 'nunique'),
        total_requests=('url', 'count'),
        attack_count=('is_attack', 'sum'),
        attack_rate=('is_attack', 'mean'),
        avg_attack_confidence=('attack_confidence', 'mean'),
        error_count=('is_error', 'sum'),
        error_rate=('is_error', 'mean'),
        server_errors=('is_server_error', 'sum'),
        server_error_rate=('is_server_error', 'mean'),
        auth_errors=('is_auth_error', 'sum'),
        auth_error_rate=('is_auth_error', 'mean'),
        avg_status=('status', 'mean'),
        status_std=('status', 'std'),
        total_bytes=('bytes', 'sum'),
        avg_bytes=('bytes', 'mean'),
        bytes_std=('bytes', 'std'),
        first_access=('timestamp', 'min'),
        last_access=('timestamp', 'max'),
    ).reset_index()

    # Requests per severity level, one column per level that occurs.
    severity_stats = df.groupby(group_keys + ['attack_severity']).size().unstack(fill_value=0).reset_index()
    severity_stats.columns.name = None
    component_stats = component_stats.merge(severity_stats, on=group_keys, how='left')

    for severity in ['critical', 'high', 'medium', 'low']:
        if severity not in component_stats.columns:
            component_stats[severity] = 0
        else:
            component_stats[severity] = component_stats[severity].fillna(0)

    # Number of attack score columns with at least one positive value.
    score_columns = [col for col in df.columns if col.startswith('attack_') and col.endswith('_score')]
    if score_columns:
        unique_attacks = (
            (df[score_columns] > 0)
            .groupby([df[key] for key in group_keys])
            .any()
            .sum(axis=1)
            .reset_index(name='unique_attack_types')
        )
        component_stats = component_stats.merge(unique_attacks, on=group_keys, how='left')
    else:
        component_stats['unique_attack_types'] = 0

    component_stats['vulnerability_score'] = (
        weights['attack_rate'] * component_stats['attack_rate'] +
        weights['error_rate'] * component_stats['error_rate'] +
        weights['server_errors'] * component_stats['server_errors'] +
        weights['auth_errors'] * component_stats['auth_errors'] +
        weights['attack_severity_critical'] * component_stats['critical'] +
        weights['attack_severity_high'] * component_stats['high'] +
        weights['attack_severity_medium'] * component_stats['medium'] +
        weights['attack_confidence'] * component_stats['avg_attack_confidence'] +
        weights['unique_attack_types'] * component_stats['unique_attack_types'] +
        weights['failed_auth_rate'] * component_stats['auth_error_rate']
    )

    def sigmoid_scale(x, midpoint=100, steepness=0.01):
        return 1 / (1 + np.exp(-steepness * (x - midpoint)))

    # Ranges over (0.5, 1.0) and grows with the number of requests.
    component_stats['traffic_factor'] = 0.5 + 0.5 * sigmoid_scale(
        component_stats['total_requests'], midpoint=100, steepness=0.01
    )

    component_stats['risk_score'] = component_stats['vulnerability_score'] * component_stats['traffic_factor']

    # Authentication, admin and API components never score below 50.
    critical_components = ['authentication', 'admin', 'api']
    is_critical = component_stats['component_type'].isin(critical_components)
    component_stats.loc[is_critical, 'risk_score'] = component_stats.loc[is_critical, 'risk_score'].clip(lower=50)

    component_stats['exposure_score'] = component_stats['unique_ips'] / (component_stats['total_requests'] + 1)
    component_stats['activity_duration'] = (
        component_stats['last_access'] - component_stats['first_access']
    ).dt.total_seconds() / 3600
    # The 0.01 keeps the division finite for components seen at one instant.
    component_stats['requests_per_hour'] = component_stats['total_requests'] / (component_stats['activity_duration'] + 0.01)

    component_stats = component_stats.sort_values('risk_score', ascending=False)

    logger.info(f"📈 Vulnerability scoring completed for {len(component_stats)} components")

    return component_stats

In [None]:

def train_ensemble_vulnerability_model(
    component_stats: pd.DataFrame,
    use_synthetic_labels: bool = False,
    model_path: str = '/content/drive/My Drive/ensemble_vulnerability_model.pkl',
):
    """Train a soft-voting ensemble that predicts whether a component is vulnerable.

    Labels are either produced by ``create_synthetic_vulnerability_labels`` or
    derived from thresholds on the component statistics. A random forest
    (grid-searched), a gradient-boosting model and an MLP are trained and
    combined in a ``VotingClassifier``; a One-Class SVM is additionally fitted
    on the components labelled safe when there are more than ten of them.

    ``component_stats`` is modified in place: ``vulnerability_probability``
    (and ``anomaly_score`` when the One-Class SVM was fitted) are added.

    Args:
        component_stats: Per-component statistics including the feature
            columns listed below, ``component_type`` and ``risk_score``.
        use_synthetic_labels: Use the rule-based synthetic labels instead of
            the threshold-based ones.
        model_path: Where the trained model package is written with joblib.

    Returns:
        ``(model_package, component_stats)``; ``model_package`` is the dict
        that was saved to ``model_path``.

    Raises:
        ValueError: If either class has fewer than two components, because a
            stratified split and a classifier cannot be built from that.
    """
    logger = logging.getLogger(__name__)

    logger.info("🤖 Training ensemble vulnerability prediction model...")

    feature_columns = [
        'unique_ips', 'total_requests', 'attack_count', 'attack_rate',
        'error_count', 'error_rate', 'server_errors', 'auth_errors',
        'avg_status', 'status_std', 'exposure_score', 'requests_per_hour',
        'avg_attack_confidence', 'unique_attack_types', 'server_error_rate',
        'auth_error_rate', 'traffic_factor'
    ]

    for severity in ['critical', 'high', 'medium']:
        if severity in component_stats.columns:
            feature_columns.append(severity)

    X = component_stats[feature_columns].fillna(0)

    if use_synthetic_labels:
        labels = create_synthetic_vulnerability_labels(component_stats)
    else:
        labels = (
            (component_stats['attack_rate'] > 0.1) |
            (component_stats['server_error_rate'] > 0.05) |
            (component_stats['auth_error_rate'] > 0.1) |
            (component_stats.get('critical', 0) > 0) |
            (component_stats['risk_score'] > component_stats['risk_score'].quantile(0.8))
        )
    # Plain integer array so boolean masks and bincount behave the same for
    # both label sources.
    y = np.asarray(labels).astype(int)

    logger.info(f"🏷 Label distribution - Vulnerable: {y.sum()}, Safe: {len(y) - y.sum()}")

    class_counts = np.bincount(y, minlength=2)
    if class_counts.min() < 2:
        raise ValueError(
            "Need at least two vulnerable and two safe components to train "
            f"(got {class_counts[1]} vulnerable, {class_counts[0]} safe)"
        )

    le_component_type = LabelEncoder()
    component_type_encoded = le_component_type.fit_transform(component_stats['component_type'])

    X_with_type = np.column_stack([X.values, component_type_encoded])
    feature_columns_extended = feature_columns + ['component_type_encoded']

    X_train_raw, X_test_raw, y_train, y_test = train_test_split(
        X_with_type, y, test_size=0.3, random_state=42, stratify=y
    )

    # Fit the scaler on the training split only, so the test metrics are not
    # influenced by statistics of the held-out rows.
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train_raw)
    X_test = scaler.transform(X_test_raw)
    X_scaled = scaler.transform(X_with_type)

    # Cross-validation cannot use more folds than the smallest class has rows.
    cv_folds = max(2, min(5, int(np.bincount(y_train, minlength=2).min())))

    logger.info("🌲 Training Random Forest...")
    rf_params = {'n_estimators': [100, 200], 'max_depth': [10, 20, None], 'min_samples_split': [2, 5], 'class_weight': ['balanced']}
    rf_model = GridSearchCV(RandomForestClassifier(random_state=42), rf_params, cv=cv_folds, scoring='f1', n_jobs=-1)
    rf_model.fit(X_train, y_train)

    logger.info("🚀 Training Gradient Boosting...")
    gb_model = GradientBoostingClassifier(n_estimators=100, learning_rate=0.1, max_depth=5, random_state=42)
    gb_model.fit(X_train, y_train)

    logger.info("🧠 Training Neural Network...")
    mlp_model = MLPClassifier(hidden_layer_sizes=(100, 50, 25), activation='relu', solver='adam', alpha=0.001, max_iter=500, random_state=42)
    mlp_model.fit(X_train, y_train)

    logger.info("🔍 Training One-Class SVM...")
    X_normal = X_scaled[y == 0]
    oc_svm = None
    if len(X_normal) > 10:
        oc_svm = OneClassSVM(kernel='rbf', gamma='auto', nu=0.1)
        oc_svm.fit(X_normal)
    else:
        logger.warning("⚠ Insufficient normal samples for One-Class SVM")

    logger.info("🎭 Creating Ensemble Model...")
    ensemble_model = VotingClassifier(estimators=[('rf', rf_model.best_estimator_), ('gb', gb_model), ('mlp', mlp_model)], voting='soft')
    ensemble_model.fit(X_train, y_train)

    models = {'Random Forest': rf_model.best_estimator_, 'Gradient Boosting': gb_model, 'Neural Network': mlp_model, 'Ensemble': ensemble_model}
    results = {}

    for name, model in models.items():
        y_pred = model.predict(X_test)
        y_pred_proba = model.predict_proba(X_test)[:, 1]
        # Explicit labels guarantee the '1' entry even if the test split
        # happens to contain no vulnerable component.
        report = classification_report(y_test, y_pred, labels=[0, 1], output_dict=True, zero_division=0)
        try:
            auc_score = roc_auc_score(y_test, y_pred_proba)
        except ValueError:
            # AUC is undefined when the test split holds a single class.
            auc_score = float('nan')
        results[name] = {'precision': report['1']['precision'], 'recall': report['1']['recall'], 'f1': report['1']['f1-score'], 'auc': auc_score}
        logger.info(f"📊 {name} Results: Precision: {report['1']['precision']:.3f}, Recall: {report['1']['recall']:.3f}, F1: {report['1']['f1-score']:.3f}, AUC: {auc_score:.3f}")

    feature_importance = pd.DataFrame({
        'feature': feature_columns_extended,
        'rf_importance': rf_model.best_estimator_.feature_importances_,
        'gb_importance': gb_model.feature_importances_
    })
    feature_importance['avg_importance'] = (feature_importance['rf_importance'] + feature_importance['gb_importance']) / 2
    feature_importance = feature_importance.sort_values('avg_importance', ascending=False)

    logger.info("🎯 Top 10 Most Important Features:")
    for _, row in feature_importance.head(10).iterrows():
        logger.info(f"   {row['feature']}: {row['avg_importance']:.4f}")

    component_stats['vulnerability_probability'] = ensemble_model.predict_proba(X_scaled)[:, 1]

    if oc_svm is not None:
        # decision_function is negated so that larger means more anomalous.
        component_stats['anomaly_score'] = -oc_svm.decision_function(X_scaled)

    model_package = {
        'ensemble_model': ensemble_model, 'rf_model': rf_model.best_estimator_, 'gb_model': gb_model,
        'mlp_model': mlp_model, 'oc_svm': oc_svm, 'scaler': scaler, 'label_encoder': le_component_type,
        'feature_columns': feature_columns_extended, 'feature_importance': feature_importance, 'model_results': results
    }

    joblib.dump(model_package, model_path)
    logger.info(f"💾 Ensemble model saved to {model_path}")

    return model_package, component_stats

def create_synthetic_vulnerability_labels(component_stats: pd.DataFrame) -> np.ndarray:
    """Build rule-based 0/1 vulnerability labels from component statistics.

    A component is labelled vulnerable (1) when it satisfies at least one of
    the rules below, which combine its type with attack, error and
    authentication-failure rates.

    Args:
        component_stats: Per-component statistics. ``component_type``,
            ``attack_rate``, ``auth_error_rate``, ``error_rate``,
            ``unique_ips`` and ``server_error_rate`` are required; the count
            columns ``sql_injection``, ``critical`` and ``high`` are optional
            and treated as all zero when absent.

    Returns:
        An integer array with one 0/1 label per row of ``component_stats``.
    """
    def optional_column(name: str) -> pd.Series:
        # A missing optional column behaves like a column of zeros, which
        # keeps every rule a Series aligned with component_stats.
        if name in component_stats.columns:
            return component_stats[name]
        return pd.Series(0, index=component_stats.index)

    component_type = component_stats['component_type']

    vulnerability_patterns = [
        (component_type == 'api') & (component_stats['attack_rate'] > 0.05) & (optional_column('sql_injection') > 0),
        (component_type == 'authentication') & (component_stats['auth_error_rate'] > 0.1),
        (component_type == 'file_handler') & (component_stats['error_rate'] > 0.05),
        (component_type == 'admin') & (component_stats['unique_ips'] > 10),
        component_stats['attack_rate'] > 0.2,
        component_stats['server_error_rate'] > 0.1,
        (optional_column('critical') > 0) | (optional_column('high') > 5)
    ]

    # Boolean accumulator: OR-ing into a float array raises a TypeError.
    labels = np.zeros(len(component_stats), dtype=bool)
    for pattern in vulnerability_patterns:
        labels |= pattern.to_numpy(dtype=bool)

    return labels.astype(int)

In [None]:

def analyze_attack_component_correlation_enhanced(df: pd.DataFrame, component_stats: pd.DataFrame) -> Tuple[pd.DataFrame, Dict, pd.DataFrame]:
    """Relate each detected attack type to the components it was aimed at.

    For every boolean attack flag column in ``df`` the flagged requests are
    grouped per component and summarised (attempts, errors, status, confidence,
    distinct source IPs). The per-type summaries are concatenated, pivoted per
    metric, and turned into vulnerability profiles via
    ``create_vulnerability_profiles``.

    Args:
        df: Log entries with attack flag columns ``attack_<type>`` and their
            ``attack_<type>_score`` companions, plus ``component_name``,
            ``component_type``, ``url``, ``is_error``, ``is_server_error``,
            ``status``, ``attack_confidence`` and ``ip``.
        component_stats: Per-component statistics passed on to
            ``create_vulnerability_profiles``.

    Returns:
        ``(full_correlation_df, pivot_tables, vulnerability_profiles)``.
        ``pivot_tables`` maps ``attack_attempts``, ``attack_success_rate`` and
        ``avg_confidence`` to a component-by-attack-type table. When no
        request is flagged, an empty DataFrame, an empty dict and an empty
        DataFrame are returned.
    """
    logger = logging.getLogger(__name__)

    logger.info("🔗 Enhanced attack-component correlation analysis...")

    # Only the boolean flag columns: a flag ``attack_x`` always has a matching
    # ``attack_x_score``. This leaves out ``attack_x_matches``,
    # ``attack_confidence`` and ``attack_severity``, which also start with
    # ``attack_`` but are not flags.
    attack_columns = [
        col for col in df.columns
        if col.startswith('attack_') and not col.endswith('_score') and f'{col}_score' in df.columns
    ]
    correlation_data = []

    for attack_type in attack_columns:
        attack_name = attack_type.replace('attack_', '')
        attack_df = df[df[attack_type] == True]

        if len(attack_df) > 0:
            component_attack_stats = attack_df.groupby(['component_name', 'component_type']).agg(
                attack_attempts=('url', 'count'),
                error_count=('is_error', 'sum'),
                server_error_count=('is_server_error', 'sum'),
                avg_status=('status', 'mean'),
                status_std=('status', 'std'),
                avg_confidence=('attack_confidence', 'mean'),
                unique_attackers=('ip', 'nunique'),
            ).reset_index()

            # Share of attack requests that ended in an error response.
            component_attack_stats['attack_success_rate'] = component_attack_stats['error_count'] / component_attack_stats['attack_attempts']
            component_attack_stats['attack_type'] = attack_name
            correlation_data.append(component_attack_stats)

    if not correlation_data:
        return pd.DataFrame(), {}, pd.DataFrame()

    full_correlation_df = pd.concat(correlation_data, ignore_index=True)

    pivot_tables = {
        metric: full_correlation_df.pivot_table(
            index=['component_name', 'component_type'], columns='attack_type', values=metric, fill_value=0, aggfunc='mean'
        ).reset_index()
        for metric in ['attack_attempts', 'attack_success_rate', 'avg_confidence']
    }

    vulnerability_profiles = create_vulnerability_profiles(full_correlation_df, component_stats)

    logger.info("🎯 Attack-Component Correlation Insights:")
    for attack_type in attack_columns:
        attack_name = attack_type.replace('attack_', '')
        attack_data = full_correlation_df[full_correlation_df['attack_type'] == attack_name]
        if len(attack_data) > 0:
            top_target = attack_data.nlargest(1, 'attack_attempts').iloc[0]
            logger.info(f"   {attack_name.upper()} -> {top_target['component_name']} ({top_target['attack_attempts']} attempts, {top_target['attack_success_rate']:.2%} success rate)")

    return full_correlation_df, pivot_tables, vulnerability_profiles

def identify_attack_patterns(correlation_df: pd.DataFrame) -> pd.DataFrame:
    """Count how often two attack types hit the same component.

    For each component the distinct attack types are collected, every
    unordered pair of them is written as ``"<a>-<b>"`` with the two names in
    sorted order, and the pairs are tallied across all components.

    Args:
        correlation_df: Frame with ``component_name`` and ``attack_type``.

    Returns:
        A DataFrame with ``attack_pattern`` (the pair label) and ``frequency``
        (number of components showing that pair), most frequent first.
    """
    pair_labels = []
    for _, types_for_component in correlation_df.groupby('component_name')['attack_type']:
        distinct_types = list(types_for_component.unique())
        # combinations() yields nothing for fewer than two attack types.
        for first, second in combinations(distinct_types, 2):
            pair_labels.append('-'.join(sorted((first, second))))

    tally = pd.Series(pair_labels).value_counts()

    return pd.DataFrame({'attack_pattern': tally.index, 'frequency': tally.values})

def create_vulnerability_profiles(correlation_df: pd.DataFrame, component_stats: pd.DataFrame) -> pd.DataFrame:
    """Summarise per-attack-type statistics into one profile per component.

    Args:
        correlation_df: One row per (component, attack type) with
            ``component_name``, ``component_type``, ``attack_attempts``,
            ``error_count``, ``server_error_count``, ``attack_success_rate``,
            ``avg_confidence``, ``unique_attackers`` and ``attack_type``.
        component_stats: Per-component statistics. ``risk_score`` and
            ``vulnerability_probability`` are copied into the profiles when
            present; a column that is absent is filled with NaN.

    Returns:
        One row per ``(component_name, component_type)`` with the aggregated
        attack figures, ``profile_score`` and a ``vulnerability_profile``
        category, sorted by ``profile_score`` in descending order.
    """
    group_keys = ['component_name', 'component_type']

    component_profiles = correlation_df.groupby(group_keys).agg(
        total_attacks=('attack_attempts', 'sum'),
        total_errors=('error_count', 'sum'),
        total_server_errors=('server_error_count', 'sum'),
        avg_success_rate=('attack_success_rate', 'mean'),
        avg_attack_confidence=('avg_confidence', 'mean'),
        max_unique_attackers=('unique_attackers', 'max'),
        attack_diversity=('attack_type', 'nunique'),
    ).reset_index()

    # Join on the full component key when it is available: the same name can
    # occur with several component types, and joining on the name alone would
    # duplicate rows.
    merge_keys = group_keys if 'component_type' in component_stats.columns else ['component_name']
    score_columns = ['risk_score', 'vulnerability_probability']
    available = [col for col in score_columns if col in component_stats.columns]

    profiles = component_profiles.merge(
        component_stats[merge_keys + available].drop_duplicates(subset=merge_keys),
        on=merge_keys, how='left'
    )
    for col in score_columns:
        if col not in profiles.columns:
            profiles[col] = np.nan

    profiles['profile_score'] = (
        profiles['total_attacks'] * 0.3 + profiles['avg_success_rate'] * 100 * 0.3 +
        profiles['attack_diversity'] * 10 * 0.2 + profiles['max_unique_attackers'] * 0.2
    )

    # First matching rule wins, as in an if/elif chain. Evaluated column-wise
    # so that an empty frame is handled as well.
    diversity = profiles['attack_diversity']
    success = profiles['avg_success_rate']
    attacks = profiles['total_attacks']
    profiles['vulnerability_profile'] = np.select(
        [
            (diversity >= 3) & (success > 0.5),
            (attacks > 100) & (success > 0.3),
            diversity >= 2,
            success > 0.7,
            attacks > 50,
        ],
        [
            'critical_multi_vector',
            'high_volume_vulnerable',
            'multi_vector_target',
            'highly_vulnerable',
            'frequent_target',
        ],
        default='low_risk',
    )

    return profiles.sort_values('profile_score', ascending=False)

In [None]:

def parse_application_logs(app_log_files: List[str]) -> pd.DataFrame:
    """Parse application logs into one record per error/warning entry.

    A line containing a ``YYYY-MM-DD HH:MM:SS`` timestamp followed by a level
    keyword starts a new record. Later lines are attached to the record that
    is currently open: Java-style stack frames (``at pkg.Class.method(File.java:42)``)
    are collected into ``stack_trace``, and any line matching the SQL or
    auth-failure pattern re-labels the record's ``type``.

    Args:
        app_log_files: Paths of the application log files to read. Files that
            cannot be read are logged and skipped.

    Returns:
        A DataFrame with columns ``timestamp`` (datetime, ``NaT`` when
        unparsable), ``level``, ``exception`` (first ``*Exception``/``*Error``
        class name found in the message, else ``None``), ``message`` (text
        following the level keyword), ``type``, ``stack_trace`` (list of
        stripped frame lines) and ``component``. Empty when nothing matched.
    """
    logger = logging.getLogger(__name__)

    logger.info("📱 Parsing application logs for correlation...")

    timestamp_re = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})'
    # Checked in order; the first header pattern that matches a line wins so
    # a single log line never produces two records.
    header_patterns = {
        'error': re.compile(timestamp_re + r'.*?(ERROR|FATAL|CRITICAL)(.+)'),
        # WARNING must precede WARN: alternation is ordered, so the shorter
        # keyword would otherwise leave "ING ..." at the start of the message.
        'warning': re.compile(timestamp_re + r'.*?(WARNING|WARN)(.+)'),
    }
    # Searched inside the message; an optional group after a lazy ``.*?``
    # never captures, so the exception name needs its own pass.
    exception_pattern = re.compile(r'\w+(?:Exception|Error)\b')
    # Parentheses are escaped so the literal "(File.java:123)" suffix matches.
    stack_frame_pattern = re.compile(r'at\s+[\w\.$]+\([\w\.]+:\d+\)')
    # Later entries override earlier ones when a line matches both.
    refinement_patterns = {
        'sql_error': re.compile(r'(SQL|SQLException|Database).*?(error|exception)', re.IGNORECASE),
        'auth_failure': re.compile(r'(authentication|authorization|login|access).*?(failed|denied|invalid)', re.IGNORECASE),
    }

    app_records = []
    for file_path in app_log_files:
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                current_error = None
                for line in f:
                    for pattern_name, pattern in header_patterns.items():
                        match = pattern.search(line)
                        if not match:
                            continue
                        if current_error is not None:
                            app_records.append(current_error)
                        message = match.group(3)
                        exception_match = exception_pattern.search(message)
                        current_error = {
                            'timestamp': match.group(1),
                            'level': match.group(2),
                            'exception': exception_match.group(0) if exception_match else None,
                            'message': message,
                            'type': pattern_name,
                            'stack_trace': []
                        }
                        break

                    # Continuation details only make sense once a record is open.
                    if current_error is None:
                        continue
                    if stack_frame_pattern.search(line):
                        current_error['stack_trace'].append(line.strip())
                    for pattern_name, pattern in refinement_patterns.items():
                        if pattern.search(line):
                            current_error['type'] = pattern_name

                if current_error is not None:
                    app_records.append(current_error)
        except Exception as e:
            # Best effort: one unreadable file must not abort the whole parse.
            logger.error(f"Error parsing application log {file_path}: {e}")

    app_logs_df = pd.DataFrame(app_records)

    if not app_logs_df.empty:
        app_logs_df['timestamp'] = pd.to_datetime(app_logs_df['timestamp'], format='%Y-%m-%d %H:%M:%S', errors='coerce')
        app_logs_df['component'] = app_logs_df['stack_trace'].apply(extract_component_from_stacktrace)
        logger.info(f"✅ Parsed {len(app_logs_df)} application log entries")

    return app_logs_df

def extract_component_from_stacktrace(stack_trace: List[str]) -> str:
    """Derive a component name from the first usable stack frame.

    Only the first five frames are inspected. For a frame such as
    ``at com.example.auth.LoginService.authenticate(...)`` the dotted path
    before the method name is split, and the segment just before the class
    name (``auth``) is returned. When that segment is one of the generic
    names ``util``, ``common`` or ``core``, the segment before it is used.

    Args:
        stack_trace: Stack frame lines belonging to one log record.

    Returns:
        The component name, or ``'unknown'`` when the list is empty or no
        inspected frame has a dotted path with more than two segments.
    """
    if not stack_trace:
        return 'unknown'

    frame_pattern = re.compile(r'at\s+([\w\.]+)\.([\w]+)\(')
    generic_segments = ('util', 'common', 'core')

    for line in stack_trace[:5]:
        match = frame_pattern.search(line)
        if not match:
            continue
        parts = match.group(1).split('.')
        # Requiring more than two segments guarantees parts[-3] exists.
        if len(parts) > 2:
            return parts[-3] if parts[-2] in generic_segments else parts[-2]
    return 'unknown'

def correlate_access_and_app_logs(access_logs: pd.DataFrame, app_logs: pd.DataFrame, time_window: int = 5) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Pair application errors with access-log requests close to them in time.

    Every access-log row whose timestamp lies within ``time_window`` seconds
    (inclusive, before or after) of an application error is a candidate. For
    each (error time, component) pair only the candidate closest in time is
    kept.

    Side effect: ``access_logs['timestamp']`` is converted to datetime in
    place, as it always has been.

    Args:
        access_logs: Access-log rows with ``timestamp``, ``component_name``
            and ``is_attack``. ``url`` and ``status`` are copied when present
            and recorded as ``None`` otherwise.
        app_logs: Application-log records with ``timestamp``, ``type`` and
            ``level``; ``exception`` and ``message`` are optional. Rows with a
            missing timestamp are skipped.
        time_window: Half-width of the matching window, in seconds.

    Returns:
        ``(correlation_df, component_errors)``. ``correlation_df`` has one row
        per kept pairing; ``component_errors`` has the columns ``component``,
        ``error_count`` and ``attack_induced_errors``. Both are empty
        DataFrames when nothing correlates.
    """
    logger = logging.getLogger(__name__)

    logger.info("🔗 Correlating access and application logs...")

    access_logs['timestamp'] = pd.to_datetime(access_logs['timestamp'])
    access_times = access_logs['timestamp']
    window = pd.Timedelta(seconds=time_window)

    correlations = []
    for _, app_error in app_logs.iterrows():
        error_time = app_error['timestamp']
        # Unparsable timestamps arrive as NaT and can never fall in a window.
        if pd.isna(error_time):
            continue

        raw_message = app_error.get('message')
        has_message = isinstance(raw_message, str) or (raw_message is not None and not pd.isna(raw_message))
        error_message = str(raw_message)[:200] if has_message else ''

        in_window = access_times.between(error_time - window, error_time + window)
        for _, access in access_logs[in_window].iterrows():
            correlations.append({
                'app_error_time': error_time,
                'access_time': access['timestamp'],
                'time_diff': abs((error_time - access['timestamp']).total_seconds()),
                'component': access['component_name'],
                # Descriptive fields only: a missing column must not abort
                # the whole correlation pass.
                'url': access.get('url'),
                'status': access.get('status'),
                'is_attack': access['is_attack'],
                'error_type': app_error['type'],
                'error_level': app_error['level'],
                'exception': app_error.get('exception', ''),
                'error_message': error_message
            })

    correlation_df = pd.DataFrame(correlations)

    if correlation_df.empty:
        logger.warning("⚠ No correlations found between access and application logs")
        return pd.DataFrame(), pd.DataFrame()

    # Sorting first makes keep='first' retain the closest request per pair.
    correlation_df = correlation_df.sort_values('time_diff').drop_duplicates(subset=['app_error_time', 'component'], keep='first')
    logger.info(f"✅ Found {len(correlation_df)} access-application log correlations")

    attack_induced_errors = correlation_df[correlation_df['is_attack'].eq(True)]
    logger.info(f"🚨 {len(attack_induced_errors)} errors potentially caused by attacks")

    component_errors = correlation_df.groupby('component').agg(
        error_count=('error_type', 'count'),
        attack_induced_errors=('is_attack', 'sum'),
    ).reset_index()

    return correlation_df, component_errors

In [None]:

def generate_comprehensive_visualizations(component_df: pd.DataFrame, component_stats: pd.DataFrame, correlation_df: pd.DataFrame, model_results: Dict):
    """Render the 3x3 static dashboard and then the interactive charts.

    The figure is saved to
    ``/content/drive/My Drive/vulnerability_analysis_dashboard.png`` and
    closed; ``create_interactive_visualizations`` is called afterwards.

    Args:
        component_df: Request-level rows. Columns starting with ``attack_``
            (except ``*_score``) are summed for the attack-type panel;
            ``timestamp`` and ``is_attack`` drive the hourly panel when both
            are present. The frame is not modified.
        component_stats: Per-component statistics with ``risk_score``,
            ``component_name``, ``component_type``, ``attack_rate``,
            ``error_rate`` and ``server_error_rate``;
            ``vulnerability_probability`` is optional.
        correlation_df: Rows with ``component_type`` and
            ``attack_success_rate``; may be empty.
        model_results: Mapping of model name to a result holding
            ``precision``, ``recall``, ``f1`` and ``auc``. Entries without
            those metrics (for example ``feature_importance``) are left out
            of the comparison panel. ``None`` is treated as empty.
    """
    logger = logging.getLogger(__name__)

    logger.info("📊 Generating comprehensive visualizations...")

    model_results = model_results or {}

    plt.style.use('seaborn-v0_8-darkgrid')
    sns.set_palette("husl")

    fig = plt.figure(figsize=(20, 15))

    # Panel 1: distribution of component risk scores.
    ax1 = plt.subplot(3, 3, 1)
    component_stats['risk_score'].hist(bins=30, ax=ax1)
    ax1.set_title('Component Risk Score Distribution', fontsize=14)
    ax1.set_xlabel('Risk Score')
    ax1.set_ylabel('Number of Components')

    # Panel 2: first ten rows of component_stats.
    # NOTE(review): this is only the "top 10" if the caller passes the frame
    # sorted by risk_score descending - confirm upstream ordering.
    ax2 = plt.subplot(3, 3, 2)
    top_10 = component_stats.head(10)
    ax2.barh(range(len(top_10)), top_10['risk_score'])
    ax2.set_yticks(range(len(top_10)))
    ax2.set_yticklabels(top_10['component_name'])
    ax2.set_title('Top 10 Vulnerable Components', fontsize=14)
    ax2.set_xlabel('Risk Score')

    # Panel 3: totals per attack_* column.
    ax3 = plt.subplot(3, 3, 3)
    attack_columns = [col for col in component_df.columns if col.startswith('attack_') and not col.endswith('_score')]
    attack_counts = component_df[attack_columns].sum().sort_values(ascending=False)
    ax3.bar(range(len(attack_counts)), attack_counts.values)
    ax3.set_xticks(range(len(attack_counts)))
    ax3.set_xticklabels([col.replace('attack_', '') for col in attack_counts.index], rotation=45)
    ax3.set_title('Attack Type Distribution', fontsize=14)
    ax3.set_ylabel('Number of Attacks')

    # Panel 4: mean rates per component type.
    ax4 = plt.subplot(3, 3, 4)
    pivot_data = component_stats.pivot_table(index='component_type', values=['attack_rate', 'error_rate', 'server_error_rate'], aggfunc='mean')
    sns.heatmap(pivot_data, annot=True, fmt='.3f', cmap='YlOrRd', ax=ax4)
    ax4.set_title('Component Type Vulnerability Rates', fontsize=14)

    # Panel 5: mean attack success rate per component type.
    ax5 = plt.subplot(3, 3, 5)
    if not correlation_df.empty:
        success_rates = correlation_df.groupby('component_type')['attack_success_rate'].mean().sort_values(ascending=False)
        ax5.bar(range(len(success_rates)), success_rates.values)
        ax5.set_xticks(range(len(success_rates)))
        ax5.set_xticklabels(success_rates.index, rotation=45)
        ax5.set_title('Attack Success Rate by Component Type', fontsize=14)
        ax5.set_ylabel('Success Rate')

    # Panel 6: grouped bars comparing models on each metric.
    ax6 = plt.subplot(3, 3, 6)
    if model_results:
        metrics = ['precision', 'recall', 'f1', 'auc']
        # model_results can also carry non-model entries such as
        # 'feature_importance'; keep only entries exposing every metric.
        scored_models = {}
        for model_name, result in model_results.items():
            try:
                scored_models[model_name] = [result[metric] for metric in metrics]
            except (KeyError, TypeError, IndexError):
                continue

        x = np.arange(len(metrics))
        model_count = len(scored_models)
        # 0.2 is kept for up to four models; narrower bars beyond that so
        # neighbouring metric groups do not overlap.
        width = min(0.2, 0.8 / max(model_count, 1))
        for i, (model_name, values) in enumerate(scored_models.items()):
            ax6.bar(x + i * width, values, width, label=model_name)
        # Centre each tick under its group regardless of the model count.
        ax6.set_xticks(x + width * max(model_count - 1, 0) / 2)
        ax6.set_xticklabels(metrics)
        ax6.set_title('Model Performance Comparison', fontsize=14)
        ax6.set_ylabel('Score')
        if scored_models:
            ax6.legend()

    # Panel 7: attacks per hour of day.
    ax7 = plt.subplot(3, 3, 7)
    if 'timestamp' in component_df.columns and 'is_attack' in component_df.columns:
        # Group by a derived Series instead of writing an 'hour' column into
        # the caller's DataFrame.
        hours = pd.to_datetime(component_df['timestamp']).dt.hour
        hourly_attacks = component_df['is_attack'].groupby(hours).sum()
        ax7.plot(hourly_attacks.index, hourly_attacks.values, marker='o')
        ax7.set_title('Hourly Attack Distribution', fontsize=14)
        ax7.set_xlabel('Hour of Day')
        ax7.set_ylabel('Number of Attacks')
        ax7.set_xticks(range(0, 24, 4))

    # Panel 8: distribution of predicted vulnerability probabilities.
    ax8 = plt.subplot(3, 3, 8)
    if 'vulnerability_probability' in component_stats.columns:
        component_stats['vulnerability_probability'].hist(bins=30, ax=ax8)
        ax8.axvline(0.5, color='red', linestyle='--', label='Threshold')
        ax8.set_title('ML Vulnerability Probability Distribution', fontsize=14)
        ax8.set_xlabel('Probability')
        ax8.set_ylabel('Number of Components')
        ax8.legend()

    # Panel 9: first ten rows of the feature-importance table.
    ax9 = plt.subplot(3, 3, 9)
    if 'feature_importance' in model_results:
        top_features = model_results['feature_importance'].head(10)
        ax9.barh(range(len(top_features)), top_features['avg_importance'])
        ax9.set_yticks(range(len(top_features)))
        ax9.set_yticklabels(top_features['feature'])
        ax9.set_title('Top 10 Feature Importances', fontsize=14)
        ax9.set_xlabel('Importance')

    plt.tight_layout()
    plt.savefig('/content/drive/My Drive/vulnerability_analysis_dashboard.png', dpi=150, bbox_inches='tight')
    plt.close(fig)

    create_interactive_visualizations(component_stats, correlation_df)

    logger.info("✅ Visualizations generated and saved")

def create_interactive_visualizations(component_stats: pd.DataFrame, correlation_df: pd.DataFrame):
    """Write the interactive Plotly charts as standalone HTML files.

    Two files are produced under ``/content/drive/My Drive``:

    * ``component_vulnerability_3d.html`` - 3D scatter of attack rate, error
      rate and risk score per component, sized by ``total_requests / 100``
      and coloured by ``vulnerability_probability``.
    * ``attack_flow_sankey.html`` - Sankey diagram linking attack types to
      component types, written only when ``correlation_df`` is not empty.

    Args:
        component_stats: Per-component frame with ``attack_rate``,
            ``error_rate``, ``risk_score``, ``component_name``,
            ``total_requests`` and ``vulnerability_probability``.
        correlation_df: Rows with ``attack_type`` and ``component_type``.
    """
    logger = logging.getLogger(__name__)

    marker_style = dict(
        size=component_stats['total_requests'] / 100,
        color=component_stats['vulnerability_probability'],
        colorscale='Viridis',
        showscale=True,
        colorbar=dict(title="Vulnerability Probability"),
    )
    scatter = go.Scatter3d(
        x=component_stats['attack_rate'],
        y=component_stats['error_rate'],
        z=component_stats['risk_score'],
        mode='markers+text',
        text=component_stats['component_name'],
        textposition="top center",
        marker=marker_style,
    )
    fig1 = go.Figure(data=[scatter])

    axis_titles = dict(xaxis_title='Attack Rate', yaxis_title='Error Rate', zaxis_title='Risk Score')
    fig1.update_layout(title='3D Component Vulnerability Score', scene=axis_titles)
    fig1.write_html('/content/drive/My Drive/component_vulnerability_3d.html')

    if not correlation_df.empty:
        attack_flow = correlation_df.groupby(['attack_type', 'component_type']).size().reset_index(name='attack_count')

        attack_types = list(attack_flow['attack_type'].unique())
        component_types = list(attack_flow['component_type'].unique())

        # Sankey nodes share one index space: attack types first, then
        # component types offset by the number of attack types.
        attack_node = {name: position for position, name in enumerate(attack_types)}
        component_node = {
            name: len(attack_types) + position
            for position, name in enumerate(component_types)
        }

        node_spec = dict(
            pad=15,
            thickness=20,
            line=dict(color="black", width=0.5),
            label=attack_types + component_types,
            color="blue",
        )
        link_spec = dict(
            source=[attack_node[name] for name in attack_flow['attack_type']],
            target=[component_node[name] for name in attack_flow['component_type']],
            value=attack_flow['attack_count'].tolist(),
        )

        fig2 = go.Figure(data=[go.Sankey(node=node_spec, link=link_spec)])
        fig2.update_layout(title="Attack Type to Component Type Flow", font_size=12)
        fig2.write_html('/content/drive/My Drive/attack_flow_sankey.html')

    logger.info("✅ Interactive visualizations created")

In [None]:

class VulnerabilityDetectionPipeline:
    """Orchestrates the log-based vulnerability analysis end to end.

    The heavy lifting is done by module-level functions of this file; the
    class wires them together, applies the configuration and writes reports.

    Configuration keys read by the class (all optional): ``log_level``,
    ``log_file``, ``sample_size``, ``vulnerability_weights``,
    ``train_models``, ``use_synthetic_labels``, ``model_path``,
    ``vulnerability_threshold`` and ``output_dir`` (directory for the report
    files, default ``/content/drive/My Drive``).
    """

    def __init__(self, config: Dict):
        """Store the configuration and set up logging.

        Args:
            config: Settings mapping; see the class docstring for the keys.
        """
        self.config = config
        self.logger = self._setup_logging()
        self.models = {}
        self.scalers = {}
        self.encoders = {}

    def _setup_logging(self) -> logging.Logger:
        """Configure root logging (file + stream) and return this module's logger.

        ``force=True`` is required because ``logging.basicConfig`` silently
        does nothing when the root logger already has handlers (as it does in
        hosted notebooks), which would drop the file handler and the format.
        """
        log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        logging.basicConfig(
            level=self.config.get('log_level', logging.INFO),
            format=log_format,
            handlers=[
                logging.FileHandler(self.config.get('log_file', '/content/drive/My Drive/vulnerability_detection.log')),
                logging.StreamHandler()
            ],
            force=True
        )
        return logging.getLogger(__name__)

    def run_full_pipeline(self, log_files: List[str], app_log_files: Optional[List[str]] = None):
        """Run every stage and return the per-component predictions.

        Args:
            log_files: Access-log paths.
            app_log_files: Optional application-log paths; when given, the
                component statistics are enriched with correlated errors.

        Returns:
            The predictions DataFrame from training or from the loaded models.

        Raises:
            Exception: Any stage failure is logged with its traceback and
                re-raised unchanged.
        """
        try:
            self.logger.info("Starting vulnerability detection pipeline...")
            access_logs = self.process_logs(log_files)

            access_logs = self.detect_attacks(access_logs)

            component_stats = self.calculate_vulnerability_scores(access_logs)

            if app_log_files:
                app_logs = self.parse_application_logs(app_log_files)
                _, component_errors = self.correlate_logs(access_logs, app_logs)
                # The per-component summary carries the error counts that
                # are merged into component_stats.
                component_stats = self.enhance_with_app_correlations(component_stats, component_errors)

            if self.config.get('train_models', True):
                model_package, predictions = self.train_models(component_stats)
                model_results = model_package.get('model_results', {})
            else:
                self.load_models()
                predictions = self.predict_vulnerabilities(component_stats)
                # No training happened in this branch, so take whatever
                # results the loaded package stores (possibly none).
                model_results = self.models.get('model_results', {})

            self.generate_reports(predictions)
            self.create_visualizations(access_logs, predictions, model_results)

            return predictions

        except Exception as e:
            self.logger.exception("Pipeline failed: %s", e)
            raise

    def process_logs(self, log_files: List[str]) -> pd.DataFrame:
        """Parse access logs via ``process_logs_distributed`` using ``sample_size``."""
        return process_logs_distributed(log_files, sample_size=self.config.get('sample_size', None))

    def detect_attacks(self, df: pd.DataFrame) -> pd.DataFrame:
        """Delegate attack detection to ``detect_attack_patterns_enhanced``."""
        return detect_attack_patterns_enhanced(df)

    def calculate_vulnerability_scores(self, df: pd.DataFrame) -> pd.DataFrame:
        """Score components using the configured ``vulnerability_weights``."""
        weights = self.config.get('vulnerability_weights', None)
        return calculate_component_vulnerability_scores_configurable(df, weights)

    def parse_application_logs(self, app_log_files: List[str]) -> pd.DataFrame:
        """Delegate to the module-level ``parse_application_logs``."""
        return parse_application_logs(app_log_files)

    def correlate_logs(self, access_logs: pd.DataFrame, app_logs: pd.DataFrame):
        """Delegate to ``correlate_access_and_app_logs`` with its default window."""
        return correlate_access_and_app_logs(access_logs, app_logs)

    def enhance_with_app_correlations(self, component_stats: pd.DataFrame, correlations: pd.DataFrame) -> pd.DataFrame:
        """Add application-error counts to the component statistics.

        Args:
            component_stats: Per-component frame keyed by ``component_name``.
            correlations: Either a per-component summary (``component`` or
                ``component_name`` plus ``error_count`` and
                ``attack_induced_errors``) or raw correlation rows
                (``component`` plus ``is_attack``), which are aggregated
                here. An empty frame leaves ``component_stats`` untouched.

        Returns:
            ``component_stats`` left-joined with ``error_count`` and
            ``attack_induced_errors``. Components without correlated errors
            get 0. If ``component_stats`` already has a column of the same
            name, the added one carries an ``_app`` suffix.
        """
        if correlations.empty:
            return component_stats

        summary = correlations
        if 'component_name' not in summary.columns and 'component' in summary.columns:
            summary = summary.rename(columns={'component': 'component_name'})

        count_columns = ['error_count', 'attack_induced_errors']
        if all(col in summary.columns for col in count_columns):
            per_component = summary.groupby('component_name')[count_columns].sum().reset_index()
        else:
            # Raw correlation rows: one row per correlated error.
            per_component = summary.groupby('component_name').agg(
                error_count=('is_attack', 'size'),
                attack_induced_errors=('is_attack', 'sum'),
            ).reset_index()

        merged = component_stats.merge(per_component, on='component_name', how='left', suffixes=('', '_app'))
        # Zero-fill only the joined columns so gaps in unrelated columns
        # (names, types) are not overwritten with 0.
        added_columns = [col for col in merged.columns if col not in component_stats.columns]
        merged[added_columns] = merged[added_columns].fillna(0)
        return merged

    def train_models(self, component_stats: pd.DataFrame):
        """Train via ``train_ensemble_vulnerability_model``; returns its result unchanged."""
        return train_ensemble_vulnerability_model(component_stats, use_synthetic_labels=self.config.get('use_synthetic_labels', False))

    def load_models(self):
        """Load the persisted model package from ``model_path`` into ``self.models``."""
        model_path = self.config.get('model_path', '/content/drive/My Drive/ensemble_vulnerability_model.pkl')
        # SECURITY: joblib.load unpickles the file and can execute arbitrary
        # code - only point model_path at files you created yourself.
        self.models = joblib.load(model_path)
        self.logger.info(f"Loaded models from {model_path}")

    def predict_vulnerabilities(self, component_stats: pd.DataFrame) -> pd.DataFrame:
        """Score components with the loaded models.

        Adds ``vulnerability_probability``, ``is_vulnerable``,
        ``recommendations`` and, when the package has an ``oc_svm`` entry,
        ``anomaly_score`` to ``component_stats`` in place and returns it.

        Args:
            component_stats: Frame containing the model's feature columns
                and ``component_type``.
        """
        feature_columns = self.models['feature_columns']
        # NOTE(review): assumes the last feature column is the encoded
        # component type appended below - confirm against the training code.
        X = component_stats[feature_columns[:-1]].fillna(0)
        component_type_encoded = self.models['label_encoder'].transform(component_stats['component_type'])
        X_with_type = np.column_stack([X.values, component_type_encoded])
        X_scaled = self.models['scaler'].transform(X_with_type)

        vulnerability_probs = self.models['ensemble_model'].predict_proba(X_scaled)[:, 1]
        component_stats['vulnerability_probability'] = vulnerability_probs

        oc_svm = self.models.get('oc_svm')
        if oc_svm is not None:
            anomaly_scores = oc_svm.decision_function(X_scaled)
            # Negated so that larger values mean "more anomalous".
            component_stats['anomaly_score'] = -anomaly_scores

        threshold = self.config.get('vulnerability_threshold', 0.5)
        component_stats['is_vulnerable'] = component_stats['vulnerability_probability'] > threshold
        component_stats['recommendations'] = component_stats.apply(self._generate_recommendations, axis=1)

        return component_stats

    @staticmethod
    def _is_flagged(value) -> bool:
        """Return True for a present, truthy flag or count; None/NaN are absent."""
        if value is None:
            return False
        try:
            if pd.isna(value):
                return False
        except (TypeError, ValueError):
            # Containers have no scalar NaN answer; fall back to truthiness.
            pass
        return bool(value)

    def _generate_recommendations(self, component: pd.Series) -> List[str]:
        """List remediation advice for one component row.

        Returns an empty list unless ``is_vulnerable`` is set. Advice is
        driven by ``component_type``, the ``attack_sql_injection`` and
        ``attack_xss`` flags, ``server_error_rate`` above 0.1 and
        ``risk_score`` above 100.
        """
        recommendations = []
        if not component['is_vulnerable']:
            return recommendations

        type_advice = {
            'authentication': ['Implement rate limiting', 'Enable MFA', 'Review session management'],
            'api': ['Implement API rate limiting', 'Add input validation', 'Enable API authentication'],
            'file_handler': ['File type validation', 'Use sandboxed environments', 'Enable antivirus scanning'],
        }
        recommendations.extend(type_advice.get(component['component_type'], []))

        if self._is_flagged(component.get('attack_sql_injection', False)):
            recommendations.extend(['Use parameterized queries', 'Implement input validation'])
        if self._is_flagged(component.get('attack_xss', False)):
            recommendations.extend(['Implement CSP', 'Enable output encoding'])
        if component.get('server_error_rate', 0) > 0.1:
            recommendations.extend(['Review error handling', 'Implement exception handling'])
        if component.get('risk_score', 0) > 100:
            recommendations.extend(['Conduct immediate security review', 'Consider temporary disabling'])

        return recommendations

    def generate_reports(self, predictions: pd.DataFrame):
        """Write the Markdown and JSON reports for one run.

        Files are named ``vulnerability_report_<YYYYmmdd_HHMMSS>.md`` and
        ``.json`` and placed in ``config['output_dir']`` (default
        ``/content/drive/My Drive``).

        Args:
            predictions: Per-component frame with ``is_vulnerable`` and
                ``risk_score`` plus the columns the report sections read.
        """
        executive_summary = self._generate_executive_summary(predictions)
        component_report = self._generate_component_report(predictions)

        output_dir = self.config.get('output_dir', '/content/drive/My Drive')
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        markdown_path = os.path.join(output_dir, f'vulnerability_report_{timestamp}.md')
        json_path = os.path.join(output_dir, f'vulnerability_report_{timestamp}.json')

        with open(markdown_path, 'w', encoding='utf-8') as f:
            f.write(executive_summary + '\n\n' + component_report)

        vulnerable_mask = predictions['is_vulnerable'].astype(bool)
        # Cast NumPy scalars to built-ins so they serialise as JSON numbers
        # instead of being stringified by the default=str fallback.
        report_data = {
            'timestamp': timestamp,
            'summary': {
                'total_components': len(predictions),
                'vulnerable_components': int(vulnerable_mask.sum()),
                'critical_components': int((predictions['risk_score'] > 200).sum()),
                'average_risk_score': float(predictions['risk_score'].mean())
            },
            'vulnerable_components': predictions[vulnerable_mask].to_dict('records')
        }

        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(report_data, f, indent=2, default=str)

        self.logger.info(f"Reports generated: vulnerability_report_{timestamp}.[md|json]")

    def _generate_executive_summary(self, predictions: pd.DataFrame) -> str:
        """Build the Markdown executive summary.

        Covers the overall counts, the three component types with the most
        vulnerable components, and the three largest ``attack_*_sum`` totals
        when such columns exist. An empty ``predictions`` frame yields a
        0.0% vulnerability rate.
        """
        vulnerable_mask = predictions['is_vulnerable'].astype(bool)
        vulnerable_count = int(vulnerable_mask.sum())
        total_count = len(predictions)
        vulnerability_rate = (vulnerable_count / total_count) * 100 if total_count else 0.0
        critical_count = int((predictions['risk_score'] > 200).sum())

        summary = f"""# Vulnerability Detection Report - Executive Summary
## Overview
- Total Components Analyzed: {total_count}
- Vulnerable Components: {vulnerable_count} ({vulnerability_rate:.1f}%)
- Critical Risk Components: {critical_count}
- Average Risk Score: {predictions['risk_score'].mean():.2f}

## Key Findings
1. **Most Vulnerable Component Types**:
"""
        parts = [summary]

        vuln_by_type = predictions[vulnerable_mask].groupby('component_type').size()
        for comp_type, count in vuln_by_type.nlargest(3).items():
            parts.append(f"   - {comp_type}: {count} vulnerable components\n")

        parts.append("2. **Top Attack Patterns**:\n")
        attack_columns = [col for col in predictions.columns if col.startswith('attack_') and col.endswith('_sum')]
        if attack_columns:
            attack_sums = predictions[attack_columns].sum().sort_values(ascending=False)
            for attack, count in attack_sums.head(3).items():
                attack_name = attack.replace('attack_', '').replace('_sum', '')
                parts.append(f"   - {attack_name}: {count} instances\n")

        parts.append(f"""
3. **Immediate Actions**:
   - Review {critical_count} critical components
   - Implement security measures for vulnerable components
   - Schedule security audits
""")
        return ''.join(parts)

    def _generate_component_report(self, predictions: pd.DataFrame) -> str:
        """Build the Markdown detail section for the 20 riskiest vulnerable components.

        For every listed component, the ``attack_*`` columns holding a
        non-zero, non-missing value are shown as detected attacks. Columns
        ending in ``_score``, ``_matches`` or ``_rate`` are metrics and are
        not listed.
        """
        parts = ["# Detailed Component Vulnerability Report\n\n"]
        vulnerable_mask = predictions['is_vulnerable'].astype(bool)
        vulnerable_components = predictions[vulnerable_mask].sort_values('risk_score', ascending=False)
        metric_suffixes = ('_score', '_matches', '_rate')

        for _, component in vulnerable_components.head(20).iterrows():
            parts.append(f"""## {component['component_name']} ({component['component_type']})
**Risk Score**: {component['risk_score']:.2f}
**Vulnerability Probability**: {component['vulnerability_probability']:.3f}
**Attack Rate**: {component.get('attack_rate', 0):.3f}
**Error Rate**: {component.get('error_rate', 0):.3f}

### Detected Attacks:
""")
            attack_columns = [
                col for col in component.index
                if isinstance(col, str)
                and col.startswith('attack_')
                and not col.endswith(metric_suffixes)
                and self._is_flagged(component[col])
            ]
            for attack in attack_columns:
                attack_name = attack.replace('attack_', '').replace('_', ' ').title()
                parts.append(f"- {attack_name}\n")

            parts.append("\n### Recommendations:\n")
            if 'recommendations' in component and component['recommendations']:
                for rec in component['recommendations']:
                    parts.append(f"- {rec}\n")
            parts.append("\n---\n\n")

        return ''.join(parts)

    def create_visualizations(self, access_logs: pd.DataFrame, component_stats: pd.DataFrame, model_results: Dict):
        """Compute attack/component correlations and render all charts.

        Only the first of the three values returned by
        ``analyze_attack_component_correlation_enhanced`` is used.
        """
        full_correlation_df, _, _ = analyze_attack_component_correlation_enhanced(access_logs, component_stats)
        generate_comprehensive_visualizations(access_logs, component_stats, full_correlation_df, model_results)

# Pipeline settings consumed by VulnerabilityDetectionPipeline.
config = {
    # Logging
    'log_level': logging.INFO,
    'log_file': '/content/drive/My Drive/vulnerability_detection.log',
    # Data volume: None is passed through as the sample_size argument.
    'sample_size': None,
    # Modelling
    'train_models': True,
    'use_synthetic_labels': False,
    'vulnerability_threshold': 0.5,
    # Weights handed to the component vulnerability scoring step.
    'vulnerability_weights': {
        'attack_rate': 100.0,
        'error_rate': 50.0,
        'server_errors': 2.0,
        'auth_errors': 3.0,
        'attack_severity_critical': 5.0,
        'attack_severity_high': 3.0,
        'attack_severity_medium': 1.0,
        'attack_confidence': 50.0,
        'unique_attack_types': 10.0,
        'failed_auth_rate': 75.0,
    },
}

# Entry point: analyse every *.log file found in the two Drive folders.
if __name__ == "__main__":
    pipeline = VulnerabilityDetectionPipeline(config)
    log_files = glob.glob('/content/drive/My Drive/logs/*.log')
    app_log_files = glob.glob('/content/drive/My Drive/app_logs/*.log')
    results = pipeline.run_full_pipeline(log_files, app_log_files)
    print(f"✅ Pipeline completed. Found {results['is_vulnerable'].sum()} vulnerable components.")

ERROR:__main__:Pipeline failed: 'url'


KeyError: 'url'