# Analyse des Vulnérabilités ANSSI
## Extraction, Enrichissement et Machine Learning

Ce notebook couvre l'ensemble du pipeline d'analyse des vulnérabilités ANSSI:
- Extraction des flux RSS
- Enrichissement des CVE avec APIs externes
- Consolidation dans un DataFrame
- Analyses exploratoires et visualisations
- Modèles de Machine Learning (supervisé et non-supervisé)
- Génération d'alertes personnalisées

## 1. Import des Bibliothèques Requises

In [None]:
# Manipulations de données
import pandas as pd
import numpy as np
import os

# Visualisations
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px

# Machine Learning
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier, GradientBoostingRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, f1_score, mean_squared_error
from sklearn.metrics import silhouette_score

# Paramètres de visualisation
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette('husl')
%matplotlib inline

# Warnings
import warnings
warnings.filterwarnings('ignore')

print("✓ Toutes les bibliothèques importées avec succès")

## 2. Chargement du DataFrame Consolidé

In [None]:
# Chemin du fichier CSV
csv_path = "data/processed/cves_consolidated.csv"

# Vérification du fichier
if os.path.exists(csv_path):
    df = pd.read_csv(csv_path)
    print(f"✓ DataFrame chargé: {df.shape[0]} lignes, {df.shape[1]} colonnes")
else:
    print(f"✗ Fichier non trouvé: {csv_path}")
    print("Exécutez d'abord main.py pour générer le fichier")
    df = None

## 3. Exploration des Données

In [None]:
if df is not None:
    print("=== Informations Générales ===")
    print(f"Dimensions: {df.shape}")
    print(f"\nColonnes:")
    print(df.dtypes)
    print(f"\nValeurs manquantes:")
    print(df.isnull().sum())
    print(f"\nPremières lignes:")
    df.head()

In [None]:
# Statistiques descriptives
if df is not None:
    print("=== Statistiques Descriptives ===")
    print(df.describe())
    print(f"\n=== Statistiques par Catégorie ===")
    print(f"CVE uniques: {df['cve_id'].nunique()}")
    print(f"Bulletins uniques: {df['id_anssi'].nunique()}")
    print(f"Vendors uniques: {df['vendor'].nunique()}")
    print(f"Produits uniques: {df['produit'].nunique()}")

## 4. Visualisations Exploratoires - Scores CVSS

In [None]:
# Préparation des données
if df is not None:
    df_clean = df.dropna(subset=['cvss_score'])
    
    # Histogramme CVSS
    fig, axes = plt.subplots(2, 2, figsize=(14, 10))
    
    # 1. Distribution des scores CVSS
    axes[0, 0].hist(df_clean['cvss_score'], bins=20, color='steelblue', edgecolor='black')
    axes[0, 0].set_xlabel('Score CVSS')
    axes[0, 0].set_ylabel('Fréquence')
    axes[0, 0].set_title('Distribution des Scores CVSS')
    axes[0, 0].axvline(df_clean['cvss_score'].mean(), color='red', linestyle='--', label=f"Moyenne: {df_clean['cvss_score'].mean():.2f}")
    axes[0, 0].legend()
    
    # 2. Gravité par Score
    severity_counts = df_clean['base_severity'].value_counts()
    axes[0, 1].bar(severity_counts.index, severity_counts.values, color=['#d32f2f', '#f57c00', '#fbc02d', '#388e3c'])
    axes[0, 1].set_xlabel('Sévérité')
    axes[0, 1].set_ylabel('Nombre')
    axes[0, 1].set_title('Distribution de la Sévérité')
    
    # 3. Box plot CVSS par type de bulletin
    df_clean.boxplot(column='cvss_score', by='type_bulletin', ax=axes[1, 0])
    axes[1, 0].set_xlabel('Type de Bulletin')
    axes[1, 0].set_ylabel('Score CVSS')
    axes[1, 0].set_title('Distribution CVSS par Type de Bulletin')
    plt.sca(axes[1, 0])
    plt.xticks(rotation=0)
    
    # 4. Score EPSS
    df_clean_epss = df.dropna(subset=['epss_score'])
    axes[1, 1].hist(df_clean_epss['epss_score'], bins=20, color='coral', edgecolor='black')
    axes[1, 1].set_xlabel('Score EPSS')
    axes[1, 1].set_ylabel('Fréquence')
    axes[1, 1].set_title('Distribution des Scores EPSS')
    
    plt.tight_layout()
    plt.show()

## 5. Visualisations - Types de Vulnérabilités (CWE)

In [None]:
if df is not None:
    # Top 10 CWE
    top_cwe = df['cwe_id'].value_counts().head(10)
    
    fig = px.pie(
        values=top_cwe.values,
        names=top_cwe.index,
        title='Top 10 Types de Vulnérabilités (CWE)',
        labels=top_cwe.index
    )
    fig.show()
    
    print(f"\nTop 10 CWE:")
    print(top_cwe)

## 6. Corrélation CVSS-EPSS

In [None]:
if df is not None:
    df_corr = df.dropna(subset=['cvss_score', 'epss_score'])
    
    # Scatter plot
    fig = px.scatter(
        df_corr,
        x='cvss_score',
        y='epss_score',
        color='base_severity',
        size='epss_score',
        hover_data=['cve_id', 'vendor', 'produit'],
        title='Corrélation entre CVSS et EPSS',
        labels={'cvss_score': 'Score CVSS', 'epss_score': 'Score EPSS'}
    )
    fig.show()
    
    # Matrice de corrélation
    print(f"\nCorrélation CVSS-EPSS: {df_corr['cvss_score'].corr(df_corr['epss_score']):.3f}")

## 7. Vendors et Produits les Plus Impactés

In [None]:
if df is not None:
    fig, axes = plt.subplots(1, 2, figsize=(15, 5))
    
    # Top vendors
    top_vendors = df['vendor'].value_counts().head(10)
    axes[0].barh(top_vendors.index, top_vendors.values, color='steelblue')
    axes[0].set_xlabel('Nombre de Vulnérabilités')
    axes[0].set_title('Top 10 Éditeurs Affectés')
    axes[0].invert_yaxis()
    
    # Top produits
    top_produits = df['produit'].value_counts().head(10)
    axes[1].barh(top_produits.index, top_produits.values, color='coral')
    axes[1].set_xlabel('Nombre de Vulnérabilités')
    axes[1].set_title('Top 10 Produits Affectés')
    axes[1].invert_yaxis()
    
    plt.tight_layout()
    plt.show()

## 8. Machine Learning - Préparation des Données

In [None]:
if df is not None:
    # Création d'une copie pour le ML
    df_ml = df.copy()
    
    # Remplissage des valeurs manquantes
    df_ml['cvss_score'].fillna(df_ml['cvss_score'].median(), inplace=True)
    df_ml['epss_score'].fillna(df_ml['epss_score'].median(), inplace=True)
    
    # Création de la variable cible: criticité (basée sur CVSS)
    def get_criticality(cvss):
        if pd.isna(cvss):
            return 'Unknown'
        elif cvss >= 9:
            return 'Critique'
        elif cvss >= 7:
            return 'Élevée'
        elif cvss >= 4:
            return 'Moyenne'
        else:
            return 'Faible'
    
    df_ml['criticality'] = df_ml['cvss_score'].apply(get_criticality)
    
    # Encodage des variables catégorielles
    le_vendor = LabelEncoder()
    le_product = LabelEncoder()
    le_severity = LabelEncoder()
    le_criticality = LabelEncoder()
    
    df_ml['vendor_encoded'] = le_vendor.fit_transform(df_ml['vendor'].fillna('Unknown'))
    df_ml['produit_encoded'] = le_product.fit_transform(df_ml['produit'].fillna('Unknown'))
    df_ml['severity_encoded'] = le_severity.fit_transform(df_ml['base_severity'].fillna('Unknown'))
    df_ml['criticality_encoded'] = le_criticality.fit_transform(df_ml['criticality'])
    
    print("✓ Données préparées pour ML")
    print(f"Criticité: {df_ml['criticality'].value_counts()}")

## 9. Modèle Non-Supervisé: K-Means Clustering

In [None]:
if df is not None:
    # Préparation des features
    features_clustering = df_ml[['cvss_score', 'epss_score', 'vendor_encoded', 'severity_encoded']].copy()
    
    # Normalisation
    scaler = StandardScaler()
    features_scaled = scaler.fit_transform(features_clustering)
    
    # Détermination du nombre optimal de clusters (Elbow method)
    inertias = []
    silhouette_scores = []
    K_range = range(2, 11)
    
    for k in K_range:
        kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)
        kmeans.fit(features_scaled)
        inertias.append(kmeans.inertia_)
        silhouette_scores.append(silhouette_score(features_scaled, kmeans.labels_))
    
    # Visualisation
    fig, axes = plt.subplots(1, 2, figsize=(14, 4))
    
    axes[0].plot(K_range, inertias, 'bo-')
    axes[0].set_xlabel('Nombre de Clusters')
    axes[0].set_ylabel('Inertie')
    axes[0].set_title('Elbow Method')
    axes[0].grid(True, alpha=0.3)
    
    axes[1].plot(K_range, silhouette_scores, 'ro-')
    axes[1].set_xlabel('Nombre de Clusters')
    axes[1].set_ylabel('Silhouette Score')
    axes[1].set_title('Silhouette Analysis')
    axes[1].grid(True, alpha=0.3)
    
    plt.tight_layout()
    plt.show()
    
    # Sélection du meilleur k
    best_k = K_range[np.argmax(silhouette_scores)]
    print(f"✓ Meilleur nombre de clusters: {best_k} (Silhouette: {max(silhouette_scores):.3f})")

In [None]:
if df is not None:
    # K-Means avec k optimal
    kmeans = KMeans(n_clusters=best_k, random_state=42, n_init=10)
    df_ml['cluster'] = kmeans.fit_predict(features_scaled)
    
    # Visualisation avec PCA
    pca = PCA(n_components=2)
    features_pca = pca.fit_transform(features_scaled)
    
    plt.figure(figsize=(10, 8))
    scatter = plt.scatter(features_pca[:, 0], features_pca[:, 1], 
                         c=df_ml['cluster'], cmap='viridis', s=50, alpha=0.6)
    plt.scatter(pca.transform(kmeans.cluster_centers_)[:, 0],
               pca.transform(kmeans.cluster_centers_)[:, 1],
               marker='+', s=300, c='red', edgecolors='black', linewidth=2,
               label='Centroïdes')
    plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.1%})')
    plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.1%})')
    plt.title('K-Means Clustering (PCA Visualisation)')
    plt.legend()
    plt.colorbar(scatter, label='Cluster')
    plt.grid(True, alpha=0.3)
    plt.show()
    
    # Analyse des clusters
    print(f"\nDistribution des clusters:")
    print(df_ml['cluster'].value_counts().sort_index())
    
    print(f"\nCaractéristiques des clusters:")
    cluster_analysis = df_ml.groupby('cluster')[['cvss_score', 'epss_score']].mean()
    print(cluster_analysis)

## 10. Modèle Supervisé: Classification de la Criticité

In [None]:
if df is not None:
    # Préparation des features pour la classification
    features_sup = df_ml[['cvss_score', 'epss_score', 'vendor_encoded', 'produit_encoded', 'severity_encoded']].copy()
    target = df_ml['criticality_encoded']
    
    # Split train/test
    X_train, X_test, y_train, y_test = train_test_split(
        features_sup, target, test_size=0.2, random_state=42, stratify=target
    )
    
    # Normalisation
    scaler_sup = StandardScaler()
    X_train_scaled = scaler_sup.fit_transform(X_train)
    X_test_scaled = scaler_sup.transform(X_test)
    
    # Entraînement du modèle
    clf = RandomForestClassifier(n_estimators=100, random_state=42, max_depth=10)
    clf.fit(X_train_scaled, y_train)
    
    # Prédictions
    y_pred = clf.predict(X_test_scaled)
    y_pred_proba = clf.predict_proba(X_test_scaled)
    
    # Évaluation
    accuracy = (y_pred == y_test).mean()
    f1 = f1_score(y_test, y_pred, average='weighted')
    
    print("=== Modèle de Classification - Random Forest ===")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"F1-Score: {f1:.4f}")
    print(f"\nRapport de Classification:")
    criticality_labels = le_criticality.classes_
    print(classification_report(y_test, y_pred, target_names=criticality_labels))

In [None]:
if df is not None:
    # Matrice de confusion
    cm = confusion_matrix(y_test, y_pred)
    
    plt.figure(figsize=(8, 6))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=list(criticality_labels),
                yticklabels=list(criticality_labels))
    plt.xlabel('Prédiction')
    plt.ylabel('Réalité')
    plt.title('Matrice de Confusion - Classification Criticité')
    plt.show()
    
    # Importance des features
    feature_importance = pd.DataFrame({
        'feature': features_sup.columns,
        'importance': clf.feature_importances_
    }).sort_values('importance', ascending=False)
    
    plt.figure(figsize=(10, 5))
    plt.barh(feature_importance['feature'], feature_importance['importance'])
    plt.xlabel('Importance')
    plt.title('Importance des Features - Classification')
    plt.gca().invert_yaxis()
    plt.show()
    
    print("\nImportance des Features:")
    print(feature_importance)

## 11. Modèle Supervisé: Prédiction du Score EPSS (Régression)

In [None]:
if df is not None:
    # Préparation pour la régression
    df_regression = df_ml.dropna(subset=['epss_score']).copy()
    
    features_reg = df_regression[['cvss_score', 'vendor_encoded', 'produit_encoded', 'severity_encoded']]
    target_reg = df_regression['epss_score']
    
    # Split
    X_train_reg, X_test_reg, y_train_reg, y_test_reg = train_test_split(
        features_reg, target_reg, test_size=0.2, random_state=42
    )
    
    # Normalisation
    scaler_reg = StandardScaler()
    X_train_reg_scaled = scaler_reg.fit_transform(X_train_reg)
    X_test_reg_scaled = scaler_reg.transform(X_test_reg)
    
    # Entraînement
    regressor = GradientBoostingRegressor(n_estimators=100, random_state=42, max_depth=5)
    regressor.fit(X_train_reg_scaled, y_train_reg)
    
    # Prédictions
    y_pred_reg = regressor.predict(X_test_reg_scaled)
    
    # Évaluation
    mse = mean_squared_error(y_test_reg, y_pred_reg)
    rmse = np.sqrt(mse)
    r2 = regressor.score(X_test_reg_scaled, y_test_reg)
    
    print("=== Modèle de Régression - Gradient Boosting ===")
    print(f"RMSE: {rmse:.4f}")
    print(f"R² Score: {r2:.4f}")
    
    # Visualisation
    plt.figure(figsize=(10, 6))
    plt.scatter(y_test_reg, y_pred_reg, alpha=0.6)
    plt.plot([y_test_reg.min(), y_test_reg.max()], 
            [y_test_reg.min(), y_test_reg.max()], 
            'r--', lw=2)
    plt.xlabel('EPSS Réel')
    plt.ylabel('EPSS Prédis')
    plt.title(f'Prédiction EPSS (R²={r2:.4f})')
    plt.grid(True, alpha=0.3)
    plt.show()

## 12. Validation des Modèles

In [None]:
if df is not None:
    print("=== Résumé de la Validation des Modèles ===")
    print(f"\n1. Clustering (K-Means):")
    print(f"   - Nombre de clusters: {best_k}")
    print(f"   - Silhouette Score: {max(silhouette_scores):.4f}")
    print(f"   - Variance expliquée (PCA): {sum(pca.explained_variance_ratio_):.4f}")
    
    print(f"\n2. Classification Criticité (Random Forest):")
    print(f"   - Accuracy: {accuracy:.4f}")
    print(f"   - F1-Score: {f1:.4f}")
    print(f"   - Samples: {len(X_test)}")
    
    print(f"\n3. Régression EPSS (Gradient Boosting):")
    print(f"   - R² Score: {r2:.4f}")
    print(f"   - RMSE: {rmse:.4f}")
    print(f"   - Samples: {len(X_test_reg)}")

## 13. Génération d'Alertes Personnalisées

In [None]:
if df is not None:
    # Définition des règles d'alerte
    def create_alert_level(row) -> str:
        cvss = row['cvss_score']
        epss = row['epss_score']
        
        if pd.isna(cvss):
            return ''
        
        # Critique
        if cvss >= 9:
            return 'CRITIQUE'
        if cvss >= 7 and pd.notna(epss) and epss >= 0.75:
            return 'CRITIQUE'
        
        # Élevée
        if cvss >= 7:
            return 'ÉLEVÉE'
        if epss and epss >= 0.75:
            return 'ÉLEVÉE'
        
        # Moyenne
        if cvss >= 4:
            return 'MOYENNE'
        
        return ''
    
    df['alert_level'] = df.apply(create_alert_level, axis=1)  # type: ignore[call-overload]
    
    # Statistiques d'alertes
    print("=== Statistiques d'Alertes ===")
    alert_counts = df['alert_level'].value_counts()
    print(alert_counts)
    
    # Visualisation
    fig = px.pie(
        values=alert_counts.values,
        names=alert_counts.index,
        color=alert_counts.index,
        color_discrete_map={'CRITIQUE': '#d32f2f', 'ÉLEVÉE': '#f57c00', 'MOYENNE': '#fbc02d'},
        title='Distribution des Niveaux d\'Alerte'
    )
    fig.show()
    
    # Top alertes
    print(f"\n=== Top 5 Alertes Critiques ===")
    critical_alerts = df[df['alert_level'] == 'CRITIQUE'].sort_values('cvss_score', ascending=False).head(5)
    for idx, row in critical_alerts.iterrows():
        print(f"\n{row['cve_id']} - {row['produit']} ({row['vendor']})")
        print(f"  CVSS: {row['cvss_score']}, EPSS: {row['epss_score']}")
        print(f"  Bulletin: {row['titre_anssi']}")

## 14. Résumé et Conclusions

In [None]:
if df is not None:
    print("=== RÉSUMÉ DU PROJET ===")
    print(f"\n✓ Données consolidées: {len(df)} lignes")
    print(f"✓ CVE uniques: {df['cve_id'].nunique()}")
    print(f"✓ Bulletins uniques: {df['id_anssi'].nunique()}")
    print(f"✓ Vendors couverts: {df['vendor'].nunique()}")
    
    print(f"\n✓ Modèles développés:")
    print(f"  1. K-Means Clustering ({best_k} clusters)")
    print(f"  2. Classification Criticité (Accuracy: {accuracy:.2%})")
    print(f"  3. Régression EPSS (R²: {r2:.4f})")
    
    print(f"\n✓ Alertes générées:")
    for level, count in alert_counts.items():
        print(f"  - {level}: {count}")
    
    print(f"\n✓ Livrables créés:")
    print(f"  - data/processed/cves_consolidated.csv")
    print(f"  - output/alerts/")
    print(f"  - notebooks/analysis.ipynb")