<a href="https://colab.research.google.com/github/1948023/AI_Risk_Tool/blob/main/CVE_AI.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pandas numpy requests



In [6]:
import gzip
import json
import os
from xml.etree import ElementTree as ET

# 🔧 CONFIG
CPE_FILE = "official-cpe-dictionary_v2.3.xml.gz"
CVE_FILES = [
    "nvdcve-2.0-2025.json.gz",
    "nvdcve-2.0-2024.json.gz",
    "nvdcve-2.0-2023.json.gz",
    "nvdcve-2.0-2022.json.gz"
]
KEYWORDS = ["vxworks", "qnx", "rtems", "integrity", "nucleus", "threadx", "micrium",
    "freeRTOS", "zephyr", "ti-rtos", "embos", "ucos", "satellite-toolkit",
    "cosmos", "coreflightexec", "coreflight", "flightsoftware", "gnuradio",
    "uhd", "hackrf", "bladeRF", "ettus", "openbts", "srsLTE", "srsRAN", "ccsds",
    "libccsds", "dvb-s", "dvb-s2", "modcod", "aes", "ecc", "spacewire", "ssh",
    "tls", "http", "snmp", "ntp", "ftp", "l3harris", "thales", "airbus", "boeing",
    "cobham", "raytheon", "ball", "northrop", "sierra", "ohb", "maxar", "viasat",
    "spacex", "blueorigin", "rocketlab", "openssh", "openssl", "apache", "nginx",
    "postgres", "mysql", "ubuntu", "debian", "windows_server", "grafana",
    "kibana", "elasticsearch", "zabbix", "nagios", "docker", "kubernetes",
    "satellite-control", "groundstation", "egse", "telemetry", "tm-tc"]

OUTPUT_FILE = "output_cve.txt"

def load_json_gz(filename):
    try:
        with gzip.open(filename, 'rt', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        print(f"❌ Errore nel caricamento {filename}: {e}")
        return None

def load_cpe_dictionary(filename):
    cpes = set()
    try:
        with gzip.open(filename, 'rb') as f:
            tree = ET.parse(f)
            root = tree.getroot()
            for item in root.findall('{http://cpe.mitre.org/dictionary/2.0}cpe-item'):
                name = item.get('name')
                if name and any(keyword.lower() in name.lower() for keyword in KEYWORDS):
                    cpes.add(name)
    except Exception as e:
        print(f"❌ Errore nel parsing CPE: {e}")
    return cpes

def extract_cpes_from_nodes(nodes):
    cpes = set()
    for node in nodes:
        matches = node.get("cpe_match", [])
        for match in matches:
            if match.get("vulnerable", False):
                cpe_uri = match.get("cpe23Uri", "")
                if cpe_uri:
                    cpes.add(cpe_uri)
        # Ricorsione per child nodes
        children = node.get("children", [])
        if children:
            cpes.update(extract_cpes_from_nodes(children))
    return cpes

def find_matching_cves(cve_data, cpe_set):
    found_cves = []
    if not cve_data:
        return found_cves

    for item in cve_data.get("CVE_Items", []):
        cve_id = item.get("cve", {}).get("CVE_data_meta", {}).get("ID", "")
        nodes = item.get("configurations", {}).get("nodes", [])
        cpes_found = extract_cpes_from_nodes(nodes)

        for cpe_uri in cpes_found:
            if cpe_uri in cpe_set:
                # CVSS v3.1 > v3.0 > v2
                score = "-"
                metrics = item.get("impact", {})
                for key in ["baseMetricV3", "baseMetricV2"]:
                    if key in metrics:
                        score = metrics[key].get("cvssV3", {}).get("baseScore") or metrics[key].get("cvssV2", {}).get("baseScore") or "-"
                        break
                found_cves.append((cve_id, cpe_uri, score))
    return found_cves

def main():
    print("📦 Caricamento dizionario CPE...")
    cpe_set = load_cpe_dictionary(CPE_FILE)
    print(f"✅ Trovati {len(cpe_set)} CPE rilevanti.")

    all_cves = []
    for cve_file in CVE_FILES:
        print(f"\n🔍 Analisi file CVE: {cve_file}")
        cve_data = load_json_gz(cve_file)
        cves = find_matching_cves(cve_data, cpe_set)
        print(f"✅ Trovate {len(cves)} CVE nel file {cve_file}")
        all_cves.extend(cves)

    with open(OUTPUT_FILE, "w", encoding='utf-8') as f:
        for cve_id, cpe_uri, score in all_cves:
            f.write(f"{cve_id} | {cpe_uri} | CVSS: {score}\n")

    print(f"\n📁 Output salvato in: {OUTPUT_FILE}")

if __name__ == "__main__":
    main()


📦 Caricamento CPE...
✅ Trovati 59930 CPE rilevanti.

🔍 Analisi file CVE: nvdcve-2.0-2025.json.gz
✅ Trovate 0 CVE nel file nvdcve-2.0-2025.json.gz

🔍 Analisi file CVE: nvdcve-2.0-2024.json.gz
✅ Trovate 0 CVE nel file nvdcve-2.0-2024.json.gz

🔍 Analisi file CVE: nvdcve-2.0-2023.json.gz
✅ Trovate 0 CVE nel file nvdcve-2.0-2023.json.gz

🔍 Analisi file CVE: nvdcve-2.0-2022.json.gz
✅ Trovate 0 CVE nel file nvdcve-2.0-2022.json.gz

📁 Output salvato in: output_cve.txt


In [None]:
import pandas as pd
import requests
import os
import io
import re
import json
from google.colab import files
from datetime import datetime

# 🔧 Config
EPSS_CSV_URL = "https://epss.cyentia.com/epss_scores-current.csv.gz"
LOCAL_EPSS_FILE = "/content/epss_scores-current.csv.gz"
INPUT_FILE = "/content/output_cve.txt"
OUTPUT_FILE = "/content/cve_with_epss.csv"
INTERMEDIATE_DIR = "/content/intermediate_files"

def get_current_time():
    """Restituisce la data e ora corrente in formato UTC"""
    return datetime.utcnow().strftime("%Y-%m-%d %H:%M:%S")

def ensure_directory(directory):
    """Crea una directory se non esiste"""
    if not os.path.exists(directory):
        os.makedirs(directory)
        print(f"[{get_current_time()}] 📁 Creata directory: {directory}")

def download_epss_file():
    """Scarica il database EPSS"""
    print(f"[{get_current_time()}] ⬇️ Scaricando EPSS database...")
    r = requests.get(EPSS_CSV_URL)
    if r.status_code == 200:
        ensure_directory(INTERMEDIATE_DIR)
        with open(LOCAL_EPSS_FILE, "wb") as f:
            f.write(r.content)
        print(f"[{get_current_time()}] ✅ EPSS database scaricato.")

        # Salva una copia nel directory intermedio
        intermediate_epss = os.path.join(INTERMEDIATE_DIR, "epss_database.csv.gz")
        with open(intermediate_epss, "wb") as f:
            f.write(r.content)
    else:
        raise Exception(f"Errore download EPSS database: {r.status_code}")

def load_cve_from_file():
    """Carica e analizza il file output_cve.txt"""
    try:
        if not os.path.exists(INPUT_FILE):
            print(f"[{get_current_time()}] ⚠️ File output_cve.txt non trovato.")
            print("Carica manualmente il file output_cve.txt...")
            uploaded = files.upload()
            if not uploaded:
                raise Exception("Nessun file caricato!")
            with open(INPUT_FILE, 'wb') as f:
                f.write(next(iter(uploaded.values())))
    except Exception as e:
        print(f"[{get_current_time()}] ❌ Errore durante il caricamento del file: {str(e)}")
        raise

    print(f"[{get_current_time()}] 📖 Leggendo CVE dal file...")
    cve_ids = set()

    with open(INPUT_FILE, 'r', encoding='utf-8') as f:
        content = f.read()

    cve_pattern = r'CVE-\d{4}-\d+|CVE-\d+-\d+'
    matches = re.findall(cve_pattern, content, re.IGNORECASE)
    cve_ids = {cve.upper() for cve in matches}

    # Salva i CVE trovati in un file intermedio
    ensure_directory(INTERMEDIATE_DIR)
    intermediate_cve = os.path.join(INTERMEDIATE_DIR, "extracted_cves.txt")
    with open(intermediate_cve, 'w', encoding='utf-8') as f:
        for cve in sorted(cve_ids):
            f.write(f"{cve}\n")

    print(f"[{get_current_time()}] 🔍 Trovati {len(cve_ids)} CVE unici.")
    return list(cve_ids)

def clean_epss_data(df):
    """Pulisce e formatta i dati EPSS in modo più robusto"""
    try:
        # Resetta l'indice per ottenere i CVE come colonna
        if df.index.name is None and any('CVE-' in str(idx) for idx in df.index):
            df = df.reset_index()

        # Identifica la colonna CVE
        cve_col = None
        for col in df.columns:
            if any('CVE-' in str(val) for val in df[col].head()):
                cve_col = col
                break

        if cve_col is None:
            raise ValueError("Nessuna colonna CVE trovata")

        # Identifica la colonna EPSS
        epss_col = None
        for col in df.columns:
            try:
                # Prova a convertire i valori in float
                test_vals = pd.to_numeric(df[col].str.split().str[0], errors='coerce')
                if test_vals.notna().any() and test_vals.max() <= 1.0:
                    epss_col = col
                    break
            except:
                continue

        if epss_col is None:
            raise ValueError("Nessuna colonna EPSS valida trovata")

        # Crea nuovo DataFrame pulito
        clean_df = pd.DataFrame()
        clean_df['cve'] = df[cve_col].astype(str).str.upper()
        clean_df['epss'] = pd.to_numeric(df[epss_col].str.split().str[0], errors='coerce')

        # Pulizia finale
        clean_df = clean_df[clean_df['cve'].str.contains('CVE-', na=False)]
        clean_df = clean_df.dropna(subset=['epss'])

        # Rimuovi eventuali duplicati
        clean_df = clean_df.drop_duplicates(subset=['cve'])

        return clean_df

    except Exception as e:
        print(f"\n❌ Errore durante la pulizia dei dati: {str(e)}")
        print("\nStruttura del DataFrame originale:")
        print(df.head())
        print("\nColonne disponibili:", df.columns.tolist())
        print("\nTipi di dati:")
        print(df.dtypes)
        raise

def match_epss(cve_ids):
    """Matcha i CVE con i loro EPSS scores"""
    print(f"[{get_current_time()}] 📊 Caricamento database EPSS...")

    try:
        # Carica il database EPSS
        epss_df = pd.read_csv(LOCAL_EPSS_FILE, compression='gzip', low_memory=False)
        print("\nDataFrame originale:")
        print(epss_df.head())
        print("\nTipi di dati:")
        print(epss_df.dtypes)

        # Pulisci i dati
        epss_df = clean_epss_data(epss_df)

        print("\nDataFrame dopo pulizia:")
        print(epss_df.head())

        # Match e tracking delle statistiche
        matched = []
        found = 0
        not_found = 0
        not_found_examples = []

        for cve_id in cve_ids:
            epss_score = None
            # Cerca il CVE nel database
            matching_rows = epss_df[epss_df['cve'] == cve_id]
            if not matching_rows.empty:
                epss_score = matching_rows['epss'].iloc[0]
                found += 1
            else:
                not_found += 1
                if len(not_found_examples) < 5:
                    not_found_examples.append(cve_id)
            matched.append((cve_id, epss_score))

        # Stampa statistiche
        print(f"\n[{get_current_time()}] 📈 Statistiche matching:")
        print(f"   ✅ CVE trovati: {found}")
        print(f"   ❌ CVE non trovati: {not_found}")
        if not_found_examples:
            print(f"   📝 Esempi di CVE non trovati: {', '.join(not_found_examples)}")

        # Salva debug info
        ensure_directory(INTERMEDIATE_DIR)
        debug_info = {
            'timestamp': get_current_time(),
            'total_epss_entries': len(epss_df),
            'matched_stats': {
                'found': found,
                'not_found': not_found,
                'not_found_examples': not_found_examples
            }
        }

        with open(os.path.join(INTERMEDIATE_DIR, 'matching_debug_info.json'), 'w') as f:
            json.dump(debug_info, f, indent=2)

        return matched

    except Exception as e:
        print(f"Errore durante il matching: {str(e)}")
        raise

def save_output(matched_list):
    """Salva i risultati"""
    # Crea DataFrame
    df = pd.DataFrame(matched_list, columns=["CVE_ID", "EPSS_Score"])

    # Aggiungi metadati
    metadata = {
        "Data_Generazione": get_current_time(),
        "Totale_CVE": len(matched_list),
        "CVE_Con_EPSS": len(df[df["EPSS_Score"].notna()]),
        "CVE_Senza_EPSS": len(df[df["EPSS_Score"].isna()])
    }

    # Salva CSV principale
    df.to_csv(OUTPUT_FILE, index=False)

    # Salva report
    report_file = "/content/cve_epss_report.txt"
    with open(report_file, "w", encoding="utf-8") as f:
        f.write("=== Report Analisi CVE-EPSS ===\n\n")
        for key, value in metadata.items():
            f.write(f"{key}: {value}\n")

        # Top 10 CVE per EPSS score
        if len(df[df["EPSS_Score"].notna()]) > 0:
            f.write("\nTop 10 CVE per EPSS Score:\n")
            top_10 = df.nlargest(10, "EPSS_Score")
            for _, row in top_10.iterrows():
                f.write(f"{row['CVE_ID']}: {row['EPSS_Score']:.4f}\n")

    # Salva copie nella directory intermedia
    ensure_directory(INTERMEDIATE_DIR)
    df.to_csv(os.path.join(INTERMEDIATE_DIR, "final_results.csv"), index=False)
    with open(os.path.join(INTERMEDIATE_DIR, "final_report.txt"), "w", encoding="utf-8") as f:
        with open(report_file, "r", encoding="utf-8") as source:
            f.write(source.read())

    print(f"[{get_current_time()}] ✅ File salvati:")
    print(f"   📊 CSV: {OUTPUT_FILE}")
    print(f"   📝 Report: {report_file}")
    print(f"   📁 File intermedi salvati in: {INTERMEDIATE_DIR}")

    # Download automatico
    #files.download(OUTPUT_FILE)
    #files.download(report_file)

def main():
    print(f"[{get_current_time()}] 🚀 Avvio analisi CVE-EPSS")

    try:
        # Crea directory per i file intermedi
        ensure_directory(INTERMEDIATE_DIR)

        # Download EPSS database
        # download_epss_file()

        # Carica CVE
        cve_ids = load_cve_from_file()

        if not cve_ids:
            print(f"[{get_current_time()}] ⚠️ Nessun CVE trovato nel file!")
            return

        # Match con EPSS
        matched = match_epss(cve_ids)

        # Salva risultati
        save_output(matched)

        print(f"[{get_current_time()}] ✨ Analisi completata con successo!")
        print(f"[{get_current_time()}] 📁 Tutti i file intermedi sono salvati in: {INTERMEDIATE_DIR}")

    except Exception as e:
        print(f"[{get_current_time()}] ❌ Errore durante l'esecuzione: {str(e)}")

if __name__ == "__main__":
    main()

[2025-04-28 14:42:05] 🚀 Avvio analisi CVE-EPSS
[2025-04-28 14:42:05] 📖 Leggendo CVE dal file...
[2025-04-28 14:42:05] 🔍 Trovati 5404 CVE unici.
[2025-04-28 14:42:05] 📊 Caricamento database EPSS...

DataFrame originale:
              #model_version:v2025.03.14 score_date:2025-04-28T12:55:00Z
cve                                 epss                      percentile
CVE-1999-0001                    0.01297                         0.78576
CVE-1999-0002                    0.16835                         0.94542
CVE-1999-0003                    0.90483                         0.99563
CVE-1999-0004                    0.04164                         0.88017

Tipi di dati:
#model_version:v2025.03.14         object
score_date:2025-04-28T12:55:00Z    object
dtype: object

DataFrame dopo pulizia:
             cve     epss
1  CVE-1999-0001  0.01297
2  CVE-1999-0002  0.16835
3  CVE-1999-0003  0.90483
4  CVE-1999-0004  0.04164
5  CVE-1999-0005  0.17478

[2025-04-28 14:44:33] 📈 Statistiche matching:
  

In [None]:
import pandas as pd
import numpy as np
import json
import os
from datetime import datetime
from google.colab import files
import re
from typing import Dict, List, Optional, Tuple

class CVEImpactAnalyzer:
    def __init__(self):
        # Configurazione
        self.BASE_DIR = "/content/impact_analysis"
        self.DATA_DIR = f"{self.BASE_DIR}/data"
        self.OUTPUT_DIR = f"{self.BASE_DIR}/output"

        # File di input
        self.CVE_FILE = "/content/output_cve.txt"
        self.EPSS_FILE = "/content/cve_with_epss.csv"

        # Pesi per il calcolo dell'impact score
        self.weights = {
            'epss_score': 0.6,           # EPSS ha un peso maggiore
            'time_factor': 0.2,          # Fattore temporale
            'prevalence_factor': 0.2     # Fattore di prevalenza
        }

        # Timestamp corrente
        self.current_time = datetime.utcnow()

        # Crea le directory necessarie
        self._create_directories()

    def _create_directories(self):
        """Crea le directory necessarie per l'analisi"""
        for directory in [self.BASE_DIR, self.DATA_DIR, self.OUTPUT_DIR]:
            if not os.path.exists(directory):
                os.makedirs(directory)
                print(f"Created directory: {directory}")

    def load_data(self) -> Tuple[List[str], pd.DataFrame]:
        """Carica i dati dai file esistenti"""
        # Carica lista CVE
        with open(self.CVE_FILE, 'r') as f:
            content = f.read()
            cve_list = re.findall(r'CVE-\d{4}-\d+', content)
            cve_list = list(set(cve_list))  # Rimuovi duplicati

        # Carica dati EPSS
        epss_df = pd.read_csv(self.EPSS_FILE)

        print(f"Loaded {len(cve_list)} unique CVEs")
        print(f"Loaded EPSS data with shape {epss_df.shape}")

        return cve_list, epss_df

    def calculate_time_factor(self, cve_id: str) -> float:
        """Calcola il fattore temporale basato sull'anno del CVE"""
        try:
            year = int(cve_id.split('-')[1])
            current_year = self.current_time.year

            # Calcola il decadimento temporale
            age = current_year - year
            time_factor = np.exp(-age / 5)  # Decadimento esponenziale su 5 anni

            return time_factor
        except:
            return 0.5  # Valore di default se c'è un errore

    def calculate_prevalence_factor(self, cve_id: str, all_cves: List[str]) -> float:
        """Calcola un fattore di prevalenza basato sull'anno del CVE"""
        try:
            year = cve_id.split('-')[1]
            same_year_cves = len([cve for cve in all_cves if year in cve])
            total_cves = len(all_cves)

            # Normalizza il fattore di prevalenza
            prevalence = same_year_cves / total_cves if total_cves > 0 else 0
            return 1 - prevalence  # Inverti il fattore (più raro = più importante)
        except:
            return 0.5

    def calculate_impact_scores(self, cve_list: List[str], epss_df: pd.DataFrame) -> pd.DataFrame:
        """Calcola gli impact score per tutti i CVE"""
        results = []

        for cve_id in cve_list:
            # Trova EPSS score
            epss_score = epss_df[epss_df['CVE_ID'] == cve_id]['EPSS_Score'].iloc[0] if len(epss_df[epss_df['CVE_ID'] == cve_id]) > 0 else None

            # Calcola altri fattori
            time_factor = self.calculate_time_factor(cve_id)
            prevalence_factor = self.calculate_prevalence_factor(cve_id, cve_list)

            # Calcola impact score
            if epss_score is not None:
                total_score = (
                    self.weights['epss_score'] * epss_score +
                    self.weights['time_factor'] * time_factor +
                    self.weights['prevalence_factor'] * prevalence_factor
                )
            else:
                total_score = None

            # Prepara il risultato
            result = {
                'cve_id': cve_id,
                'epss_score': epss_score,
                'time_factor': time_factor,
                'prevalence_factor': prevalence_factor,
                'total_impact_score': total_score,
                'year': int(cve_id.split('-')[1]),
                'has_epss': epss_score is not None
            }

            results.append(result)

        return pd.DataFrame(results)

    def analyze_and_save(self) -> pd.DataFrame:
        """Esegue l'analisi completa e salva i risultati"""
        # Carica i dati
        cve_list, epss_df = self.load_data()

        # Calcola gli impact score
        results_df = self.calculate_impact_scores(cve_list, epss_df)

        # Aggiungi statistiche per anno
        results_df['year_avg_impact'] = results_df.groupby('year')['total_impact_score'].transform('mean')
        results_df['year_count'] = results_df.groupby('year')['cve_id'].transform('count')

        # Salva i risultati
        timestamp = self.current_time.strftime('%Y%m%d_%H%M%S')

        # Salva il dataset completo
        output_file = f"{self.OUTPUT_DIR}/impact_analysis_{timestamp}.csv"
        results_df.to_csv(output_file, index=False)

        # Crea e salva il report
        report_file = f"{self.OUTPUT_DIR}/analysis_report_{timestamp}.txt"
        self.create_report(results_df, report_file)

        # Prepara il dataset per machine learning
        ml_ready_df = self.prepare_ml_dataset(results_df)
        ml_file = f"{self.OUTPUT_DIR}/ml_ready_dataset.csv"
        ml_ready_df.to_csv(ml_file, index=False)

        return results_df

    def create_report(self, df: pd.DataFrame, output_file: str):
        """Crea un report dettagliato dell'analisi"""
        with open(output_file, 'w') as f:
            f.write(f"CVE Impact Analysis Report\n")
            f.write(f"Generated on: {self.current_time.isoformat()}\n\n")

            f.write("General Statistics:\n")
            f.write(f"Total CVEs analyzed: {len(df)}\n")
            f.write(f"CVEs with EPSS scores: {df['has_epss'].sum()}\n")
            f.write(f"Average impact score: {df['total_impact_score'].mean():.4f}\n\n")

            f.write("Impact Score Distribution:\n")
            f.write(df['total_impact_score'].describe().to_string())
            f.write("\n\n")

            f.write("Top 10 Highest Impact CVEs:\n")
            top_10 = df.nlargest(10, 'total_impact_score')
            for _, row in top_10.iterrows():
                f.write(f"{row['cve_id']}: {row['total_impact_score']:.4f}\n")

            f.write("\nYearly Statistics:\n")
            yearly_stats = df.groupby('year').agg({
                'total_impact_score': ['count', 'mean', 'std'],
                'epss_score': 'mean'
            }).round(4)
            f.write(yearly_stats.to_string())

    def prepare_ml_dataset(self, df: pd.DataFrame) -> pd.DataFrame:
        """Prepara il dataset per machine learning"""
        ml_df = df.copy()

        # Aggiungi feature derivate
        ml_df['year_normalized'] = (ml_df['year'] - ml_df['year'].min()) / (ml_df['year'].max() - ml_df['year'].min())
        ml_df['has_epss_binary'] = ml_df['has_epss'].astype(int)
        ml_df['impact_score_binned'] = pd.qcut(ml_df['total_impact_score'].fillna(0), q=5, labels=['VL', 'L', 'M', 'H', 'VH'])

        # Calcola statistiche rolling per anno
        ml_df = ml_df.sort_values('year')
        ml_df['rolling_avg_impact'] = ml_df.groupby('year')['total_impact_score'].transform(
            lambda x: x.rolling(window=3, min_periods=1).mean()
        )

        return ml_df

def main():
    # Inizializza l'analizzatore
    analyzer = CVEImpactAnalyzer()

    # Esegui l'analisi
    print("Starting CVE impact analysis...")
    results_df = analyzer.analyze_and_save()

    # Stampa alcune statistiche
    print("\nAnalysis completed!")
    print(f"Total CVEs analyzed: {len(results_df)}")
    print(f"CVEs with impact scores: {results_df['total_impact_score'].notna().sum()}")
    print("\nImpact Score Statistics:")
    print(results_df['total_impact_score'].describe())

    # Download automatico dei file
    #files.download(f"{analyzer.OUTPUT_DIR}/impact_analysis_{analyzer.current_time.strftime('%Y%m%d_%H%M%S')}.csv")
    #files.download(f"{analyzer.OUTPUT_DIR}/analysis_report_{analyzer.current_time.strftime('%Y%m%d_%H%M%S')}.txt")
    #files.download(f"{analyzer.OUTPUT_DIR}/ml_ready_dataset_{analyzer.current_time.strftime('%Y%m%d_%H%M%S')}.csv")

if __name__ == "__main__":
    main()

Starting CVE impact analysis...
Loaded 5404 unique CVEs
Loaded EPSS data with shape (5404, 2)

Analysis completed!
Total CVEs analyzed: 5404
CVEs with impact scores: 5404

Impact Score Statistics:
count    5404.000000
mean        0.298035
std         0.094373
min         0.235514
25%         0.238942
50%         0.269679
75%         0.314736
max         0.947940
Name: total_impact_score, dtype: float64


In [None]:
!pip install pandas numpy scikit-learn seaborn matplotlib joblib



In [None]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.linear_model import ElasticNet
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import mean_squared_error, r2_score
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime
import joblib
import os
from google.colab import files

class ThreatRiskRegressor:
    def __init__(self):
        # Configurazione
        self.BASE_DIR = "/content/risk_assessment"
        self.MODELS_DIR = f"{self.BASE_DIR}/models"
        self.RESULTS_DIR = f"{self.BASE_DIR}/results"
        self.DATA_DIR = f"{self.BASE_DIR}/data"

        # File di input precedenti
        self.CVE_EPSS_FILE = "/content/cve_with_epss.csv"
        self.IMPACT_FILE = "/content/impact_analysis/output/ml_ready_dataset.csv"

        # Configurazione delle colonne
        self.numeric_features = ['asset_value', 'affected_users', 'business_impact', 'existing_controls']
        self.categorical_features = ['attack_vector', 'authentication_required']

        # Inizializzazione
        self.model = None
        self.preprocessor = None
        self._create_directories()

    def _create_directories(self):
        """Crea le directory necessarie"""
        for directory in [self.BASE_DIR, self.MODELS_DIR, self.RESULTS_DIR, self.DATA_DIR]:
            if not os.path.exists(directory):
                os.makedirs(directory)
                print(f"Created directory: {directory}")

    def create_sample_input(self):
        """Crea un file di input di esempio"""
        sample_data = {
            'threat_name': ['DataBreach2025'],
            'asset_value': [8],
            'affected_users': [5000],
            'business_impact': [9],
            'attack_vector': ['network'],
            'authentication_required': [False],
            'existing_controls': [2]
        }

        df = pd.DataFrame(sample_data)
        sample_file = f"{self.DATA_DIR}/sample_threat_input.csv"
        df.to_csv(sample_file, index=False)
        print(f"Sample input file created: {sample_file}")
        return sample_file

    def prepare_historical_data(self):
        """Prepara i dati storici dai file precedenti"""
        # Carica i dati EPSS e Impact
        epss_df = pd.read_csv(self.CVE_EPSS_FILE)
        impact_df = pd.read_csv(self.IMPACT_FILE)

        # Unisci i dataset
        historical_data = pd.merge(epss_df, impact_df, left_on='CVE_ID', right_on='cve_id', how='inner')

        # Crea feature sintetiche per il training
        historical_data['asset_value'] = np.random.randint(1, 11, size=len(historical_data))
        historical_data['affected_users'] = np.random.randint(100, 10000, size=len(historical_data))
        historical_data['business_impact'] = historical_data['total_impact_score'] * 10
        historical_data['attack_vector'] = np.random.choice(['network', 'local', 'physical', 'adjacent'], size=len(historical_data))
        historical_data['authentication_required'] = np.random.choice([True, False], size=len(historical_data))
        historical_data['existing_controls'] = np.random.randint(0, 5, size=len(historical_data))

        return historical_data

    def create_preprocessor(self):
        """Crea il preprocessor per le feature"""
        numeric_transformer = StandardScaler()
        categorical_transformer = OneHotEncoder(drop='first', sparse_output=False)

        self.preprocessor = ColumnTransformer(
            transformers=[
                ('num', numeric_transformer, self.numeric_features),
                ('cat', categorical_transformer, self.categorical_features)
            ])

        return self.preprocessor

    def train_model(self):
        """Addestra il modello usando i dati storici"""
        try:
            # Prepara i dati
            historical_data = self.prepare_historical_data()

            # Prepara X e y
            X = historical_data[self.numeric_features + self.categorical_features]
            y = historical_data['total_impact_score']

            # Crea pipeline
            self.model = Pipeline([
                ('preprocessor', self.create_preprocessor()),
                ('regressor', ElasticNet(random_state=42))
            ])

            # Parametri per GridSearchCV
            param_grid = {
                'regressor__alpha': [0.0001, 0.001, 0.01, 0.1, 1.0],
                'regressor__l1_ratio': [0.1, 0.5, 0.9],
            }

            # GridSearchCV
            grid_search = GridSearchCV(
                self.model,
                param_grid,
                cv=5,
                scoring='neg_mean_squared_error',
                n_jobs=-1
            )

            # Fit
            grid_search.fit(X, y)
            self.model = grid_search.best_estimator_

            return grid_search.best_params_

        except Exception as e:
            print(f"Errore durante il training del modello: {str(e)}")
            print("\nDati di debug:")
            print("Shape of X:", X.shape if 'X' in locals() else "X not created")
            print("Shape of y:", y.shape if 'y' in locals() else "y not created")
            print("Columns in historical_data:", historical_data.columns if 'historical_data' in locals() else "historical_data not created")
            raise

    def predict_risk(self, input_file: str) -> pd.DataFrame:
        """Predici il risk score per nuovi dati"""
        if self.model is None:
            raise ValueError("Model not trained!")

        try:
            # Carica input
            input_df = pd.read_csv(input_file)

            # Verifica colonne
            required_columns = self.numeric_features + self.categorical_features + ['threat_name']
            missing_columns = set(required_columns) - set(input_df.columns)
            if missing_columns:
                raise ValueError(f"Missing columns in input file: {missing_columns}")

            # Predici
            X = input_df[self.numeric_features + self.categorical_features]
            risk_scores = self.model.predict(X)

            # Prepara risultati
            results = input_df[['threat_name']].copy()
            results['risk_score'] = risk_scores
            results['risk_level'] = pd.qcut(risk_scores, q=5, labels=['Very Low', 'Low', 'Medium', 'High', 'Very High'])

            # Aggiungi dettagli addizionali
            results['timestamp'] = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
            results['input_features'] = X.apply(lambda row: dict(row), axis=1)

            return results

        except Exception as e:
            print(f"Errore durante la predizione: {str(e)}")
            print("\nDati di debug:")
            print("Input file columns:", input_df.columns if 'input_df' in locals() else "input_df not loaded")
            print("Required columns:", required_columns)
            raise

def main():
    try:
        # Inizializza
        risk_regressor = ThreatRiskRegressor()

        # Crea file di esempio se necessario
        sample_file = risk_regressor.create_sample_input()
        print("\nEsempio di file di input creato. Struttura:")
        print(pd.read_csv(sample_file).to_string())

        # Addestra il modello
        print("\nAddestrando il modello...")
        best_params = risk_regressor.train_model()
        print(f"Best parameters: {best_params}")

        # Carica il file di input custom
        print("\nPer favore, carica il tuo file CSV di input (o usa il file di esempio)...")
        try:
            uploaded = files.upload()
            input_file = next(iter(uploaded))
        except:
            print("Usando il file di esempio...")
            input_file = sample_file

        # Predici il rischio
        results = risk_regressor.predict_risk(input_file)

        # Stampa e salva risultati
        print("\nRisultati dell'analisi del rischio:")
        print(results.to_string())

        # Salva risultati
        timestamp = datetime.utcnow().strftime('%Y%m%d_%H%M%S')
        output_file = f"{risk_regressor.RESULTS_DIR}/risk_assessment_{timestamp}.csv"
        results.to_csv(output_file, index=False)
        print(f"\nRisultati salvati in: {output_file}")

    except Exception as e:
        print(f"Errore nell'esecuzione del programma: {str(e)}")

if __name__ == "__main__":
    main()

Sample input file created: /content/risk_assessment/data/sample_threat_input.csv

Esempio di file di input creato. Struttura:
      threat_name  asset_value  affected_users  business_impact attack_vector  authentication_required  existing_controls
0  DataBreach2025            8            5000                9       network                    False                  2

Addestrando il modello...
Best parameters: {'regressor__alpha': 0.0001, 'regressor__l1_ratio': 0.1}

Per favore, carica il tuo file CSV di input (o usa il file di esempio)...


Saving custom_threat_input.csv to custom_threat_input (3).csv

Risultati dell'analisi del rischio:
        threat_name  risk_score risk_level            timestamp                                                                                                                                           input_features
0    DataBreach2025    0.899882       High  2025-04-28 14:45:09   {'asset_value': 8, 'affected_users': 5000, 'business_impact': 9, 'existing_controls': 2, 'attack_vector': 'network', 'authentication_required': False}
1  RansomwareAttack    0.999862  Very High  2025-04-28 14:45:09  {'asset_value': 10, 'affected_users': 3000, 'business_impact': 10, 'existing_controls': 3, 'attack_vector': 'network', 'authentication_required': True}
2  PhishingCampaign    0.699921   Very Low  2025-04-28 14:45:09   {'asset_value': 6, 'affected_users': 1000, 'business_impact': 7, 'existing_controls': 4, 'attack_vector': 'network', 'authentication_required': False}
3    InternalThreat    0.799902  