# NIDS-ML: Sniffer Validation Pipeline

## Scopo
Questo notebook valida che lo sniffer funzioni correttamente testando i modelli trainati sui CSV CIC-IDS2017.

## Fasi
1. **Setup**: Clona repo, installa dipendenze
2. **Calibrazione**: Verifica feature alignment
3. **Evaluation CSV**: Testa modelli su tutti i giorni
4. **Confronto**: Ranking modelli per performance
5. **Test PCAP** (opzionale): Se disponibile

## Output
- Metriche per ogni modello/CSV
- Ranking modelli
- Report downloadabile

In [None]:
# ============================================================================
# CONFIGURATION
# ============================================================================

# Git repository (URL and branch) cloned on hosted runtimes
GITHUB_REPO = "https://github.com/tuouser/NIDS-ML-SSR2.git"
GITHUB_BRANCH = "main"

# Which models to test: 'all', 'xgboost', 'lightgbm', 'random_forest', 'best'
MODEL_TYPE = 'all'

# Task the models were trained for
TASK = 'binary'

# Rows sampled from each CSV (None = every row, a number = sample of that size).
# Suggested: None for complete runs, 100000 for quick runs.
SAMPLE_SIZE = None

# Also test on PCAP files when they are available
TEST_PCAP = False

# Echo the active settings; a falsy SAMPLE_SIZE is shown as 'tutto'
print(
    "Configurazione:",
    f"  Model type: {MODEL_TYPE}",
    f"  Task: {TASK}",
    f"  Sample: {SAMPLE_SIZE or 'tutto'}",
    sep="\n",
)

---
## 1. Setup Ambiente

In [None]:
import os
import sys
import json
from pathlib import Path
from datetime import datetime

# Detect the runtime from the environment variables set by each platform.
# Kaggle is checked first, then Colab; anything else counts as a local run.
ENV = next(
    (
        label
        for marker, label in (
            ('KAGGLE_KERNEL_RUN_TYPE', 'kaggle'),
            ('COLAB_GPU', 'colab'),
        )
        if marker in os.environ
    ),
    'local',
)

# Hosted runtimes use a fixed checkout location; locally the current
# working directory is taken as the project root.
if ENV == 'local':
    PROJECT_ROOT = Path.cwd()
else:
    PROJECT_ROOT = Path(
        {
            'kaggle': '/kaggle/working/NIDS-ML-SSR2',
            'colab': '/content/NIDS-ML-SSR2',
        }[ENV]
    )

print(f"Ambiente: {ENV}")
print(f"Project root: {PROJECT_ROOT}")

In [None]:
# Clone repo se necessario
if ENV in ['kaggle', 'colab'] and not PROJECT_ROOT.exists():
    !git clone --branch {GITHUB_BRANCH} {GITHUB_REPO} {PROJECT_ROOT}
    print(f"Repo clonato in {PROJECT_ROOT}")
else:
    print("Repo gi√† presente o ambiente locale")

In [None]:
# Make the project root both the working directory and the first entry of
# the import path (later cells import from the `src` package)
os.chdir(PROJECT_ROOT)
sys.path.insert(0, str(PROJECT_ROOT))

# Install the extra dependency on hosted runtimes only
if ENV in ['kaggle', 'colab']:
    !pip install -q scapy

print(f"Working directory: {os.getcwd()}")

In [None]:
# Locate the input data.
# DATA_RAW is the directory later cells scan for CSV files; PCAP_DIR is the
# directory with capture files, or None when the Kaggle PCAP dataset is absent.
DATA_RAW = PROJECT_ROOT / "data" / "raw"

if ENV == 'kaggle':
    DATA_RAW.mkdir(parents=True, exist_ok=True)

    # CIC-IDS2017 CSV dataset: symlink each file into the project tree
    kaggle_csv = Path("/kaggle/input/cicids2017")
    if kaggle_csv.exists():
        for f in kaggle_csv.glob("*.csv"):
            dest = DATA_RAW / f.name
            # Path.exists() follows symlinks, so a dangling link left by an
            # earlier run looks missing yet would make os.symlink raise
            # FileExistsError: drop it before re-linking.
            if dest.is_symlink() and not dest.exists():
                dest.unlink()
            if not dest.exists():
                os.symlink(f, dest)
        print(f"CSV linkati: {len(list(DATA_RAW.glob('*.csv')))}")
    else:
        # Say so explicitly instead of silently continuing with no CSV files
        print(f"Dataset CSV non trovato in {kaggle_csv}")

    # PCAP dataset (optional)
    kaggle_pcap = Path("/kaggle/input/cicids2017-pcap")
    if kaggle_pcap.exists():
        PCAP_DIR = kaggle_pcap
        print(f"PCAP disponibili: {len(list(PCAP_DIR.glob('*.pcap')))}")
    else:
        PCAP_DIR = None
        print("PCAP non disponibili")
else:
    PCAP_DIR = PROJECT_ROOT / "data" / "pcap"

In [None]:
# List the CSV files found in DATA_RAW (sorted by path) with their size
csv_files = sorted(DATA_RAW.glob("*.csv"))
print(f"\nCSV disponibili ({len(csv_files)}):")
for csv_file in csv_files:
    megabytes = csv_file.stat().st_size / (1024**2)
    print(f"  {csv_file.name}: {megabytes:.1f} MB")

In [None]:
# Verifica modelli disponibili
from src.model_versioning import list_model_versions

print("\nModelli disponibili:")
for family in ('xgboost', 'lightgbm', 'random_forest'):
    found = list_model_versions(model_type=family, task=TASK)
    if not found:
        # Nothing registered for this model family: print no section
        continue
    print(f"\n  {family.upper()}:")
    for entry in found:
        print(f"    - {entry['version_id']}")

# Best model: reported only when both the model file and its metadata exist
best_path = PROJECT_ROOT / "models" / "best_model" / f"model_{TASK}.pkl"
if best_path.exists():
    meta_path = best_path.parent / "metadata.json"
    if meta_path.exists():
        with open(meta_path) as f:
            meta = json.load(f)
        best_label = meta.get('best_model', 'N/A')
        print(f"\n  BEST MODEL: {best_label}")

---
## 2. Calibrazione Feature

Verifica che le feature nel CSV siano quelle attese dal modello.

In [None]:
import pandas as pd
import numpy as np
from src.feature_engineering import load_artifacts

def analyze_csv_quick(csv_path, sample=5000):
    """Summarise the first rows of a labelled CSV without loading it fully.

    Only the first ``sample * 2`` rows are read; when more than ``sample``
    rows were read, a reproducible random subset of ``sample`` rows is kept.
    The counts therefore describe the beginning of the file, not all of it.

    Args:
        csv_path: ``pathlib.Path`` of the CSV file (its ``.name`` is reported).
        sample: Maximum number of rows to analyse.

    Returns:
        dict with ``file``, ``rows`` and ``cols``. When a column whose name
        contains "label" exists, it also holds ``benign`` and ``attacks``
        (plain ints) and ``attack_types`` (distinct non-benign labels, most
        frequent first). Rows with a missing label are counted in neither.
    """
    df = pd.read_csv(csv_path, low_memory=False, nrows=sample * 2)
    df.columns = df.columns.str.strip()  # header names may carry stray spaces

    if len(df) > sample:
        df = df.sample(n=sample, random_state=42)

    # First column whose name mentions "label", case-insensitively
    label_col = next((col for col in df.columns if 'label' in col.lower()), None)

    result = {
        'file': csv_path.name,
        'rows': len(df),
        'cols': len(df.columns),
    }

    if label_col is not None:
        # Normalise like test_model_on_csv does (strip + case-insensitive
        # BENIGN match) so both report the same benign/attack split, and
        # drop missing labels instead of counting them as attacks.
        labels = df[label_col].dropna().astype(str).str.strip()
        is_benign = labels.str.upper() == 'BENIGN'
        result['benign'] = int(is_benign.sum())
        result['attacks'] = int((~is_benign).sum())
        result['attack_types'] = labels[~is_benign].value_counts().index.tolist()

    return result

# Run the quick analysis on every CSV and print a short summary of each
banner = "=" * 70
print(banner)
print("ANALISI CSV")
print(banner)

csv_info = []
for csv_path in csv_files:
    summary = analyze_csv_quick(csv_path)
    csv_info.append(summary)

    # Label-derived fields are absent when the CSV has no label column
    n_attacks = summary.get('attacks', 0)
    shown_types = ', '.join(summary.get('attack_types', [])[:3])

    print(f"\n{summary['file'][:50]}")
    print(f"  Righe: {summary['rows']:,} | Attacchi: {n_attacks:,}")
    if shown_types:
        print(f"  Tipi: {shown_types}")

In [None]:
# Load the preprocessing artifacts and check that the model's features are
# present in the first CSV. Each failure is reported with its own message so
# that a CSV problem is not mistaken for an artifact-loading problem.
try:
    scaler, selected_features, _, scaler_columns = load_artifacts()
except Exception as e:
    print(f"\n⚠️  Errore caricamento artifacts: {e}")
else:
    print(f"\nFeature del modello: {len(selected_features)}")
    print(f"Colonne scaler: {len(scaler_columns)}")

    if not csv_files:
        # Nothing to compare against; avoid an IndexError on csv_files[0]
        print("\n⚠️  Nessun CSV disponibile: verifica feature saltata")
    else:
        test_csv = csv_files[0]
        try:
            # A few rows are enough: only the header is inspected
            df_test = pd.read_csv(test_csv, nrows=10)
        except Exception as e:
            print(f"\n⚠️  Errore lettura {test_csv.name}: {e}")
        else:
            df_test.columns = df_test.columns.str.strip()

            missing = [f for f in selected_features if f not in df_test.columns]
            if missing:
                print(f"\n⚠️  Feature mancanti: {missing[:5]}")
            else:
                print("\n✓ Tutte le feature presenti nei CSV")

---
## 3. Evaluation su CSV

Testa i modelli su ogni CSV e calcola metriche reali (F1, Recall, FPR).

### Cosa significano le metriche?

| Metrica | Significato | Valore ideale |
|---------|-------------|---------------|
| **F1** | Media armonica di Precision e Recall | > 0.90 |
| **Recall** | % di attacchi rilevati | > 0.95 |
| **Precision** | % di alert che sono veri attacchi | > 0.90 |
| **FPR** | % di traffico benigno classificato come attacco | < 0.02 |

### Interpretazione per CSV

| CSV | Attacchi | F1 atteso | Note |
|-----|----------|-----------|------|
| Monday | 0 | 0.00 | CORRETTO! Nessun attacco, F1=0 è giusto. Guarda FPR. |
| Tuesday | Brute Force | > 0.85 | Attacchi SSH/FTP |
| Wednesday | DoS | > 0.95 | Attacchi volumetrici, facili da rilevare |
| Thursday | Web Attack | > 0.80 | Più difficili |
| Friday | DDoS, Botnet | > 0.95 | Attacchi volumetrici |

In [None]:
import joblib
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
import time

def test_model_on_csv(csv_path, model_path, scaler, selected_features, scaler_columns, 
                      task='binary', sample_size=None):
    """Evaluate a saved model on one labelled CSV and return its metrics.

    Args:
        csv_path: ``pathlib.Path`` of the CSV to evaluate (``.name`` is reported).
        model_path: Path of the serialized model, loaded with ``joblib.load``
            on every call.
        scaler: Fitted transformer exposing ``transform``; applied to the
            ``scaler_columns`` features.
        selected_features: Names of the scaled columns passed to the model.
        scaler_columns: Column names, in order, that ``scaler`` is applied to.
            Columns missing from the CSV are filled with 0.
        task: ``'binary'`` maps every label other than BENIGN (stripped,
            case-insensitive) to 1 and BENIGN to 0; any other value uses the
            raw label column as the target.
        sample_size: If truthy and smaller than the number of labelled rows,
            evaluate on a reproducible random sample of that size.

    Returns:
        For ``task='binary'``: dict with sample counts, accuracy, precision,
        recall, F1, FPR, FNR, confusion-matrix counts and prediction time.
        Otherwise: dict with ``csv``, ``accuracy`` and ``f1_weighted``.
        ``{'error': message}`` when the CSV has no label column or no
        labelled rows.
    """
    # joblib.load unpickles the file: only load model files you trust
    model = joblib.load(model_path)

    df = pd.read_csv(csv_path, low_memory=False)
    df.columns = df.columns.str.strip()  # header names may carry stray spaces

    original_size = len(df)

    # First column whose name mentions "label", case-insensitively
    label_col = next((col for col in df.columns if 'label' in col.lower()), None)
    if label_col is None:
        return {'error': 'Label column not found'}

    # Rows without a label cannot be scored; left in place they would be
    # compared as "not BENIGN" and silently counted as attacks.
    df = df.dropna(subset=[label_col])
    if df.empty:
        return {'error': 'No labelled rows'}

    if sample_size and len(df) > sample_size:
        df = df.sample(n=sample_size, random_state=42)

    # Target
    if task == 'binary':
        y_true = (df[label_col].str.strip().str.upper() != 'BENIGN').astype(int)
    else:
        # NOTE(review): y_true holds the raw CSV labels here; the metrics are
        # only meaningful if the model predicts those same label values —
        # confirm against the training pipeline.
        y_true = df[label_col]

    # Features: exactly the scaler's columns, in its order, missing ones as 0;
    # infinities and NaN are replaced by 0 before scaling
    X = (
        df.reindex(columns=scaler_columns, fill_value=0)
        .replace([np.inf, -np.inf], np.nan)
        .fillna(0)
    )

    # Scale, then keep only the columns the model is fed with
    X_scaled = pd.DataFrame(scaler.transform(X), columns=scaler_columns)
    X_selected = pd.DataFrame(
        X_scaled[selected_features].values,
        columns=list(selected_features)
    )

    # Predict (timed)
    start = time.time()
    y_pred = model.predict(X_selected)
    pred_time = time.time() - start

    if task != 'binary':
        return {
            'csv': csv_path.name,
            'accuracy': float(accuracy_score(y_true, y_pred)),
            'f1_weighted': float(f1_score(y_true, y_pred, average='weighted', zero_division=0))
        }

    # Fixing labels=[0, 1] always yields a 2x2 matrix, even when the data or
    # the predictions contain a single class (e.g. a day with no attacks).
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()

    return {
        'csv': csv_path.name,
        'total_samples': original_size,
        'tested_samples': len(df),
        'attacks_in_data': int((y_true == 1).sum()),
        'benign_in_data': int((y_true == 0).sum()),
        'accuracy': float(accuracy_score(y_true, y_pred)),
        'precision': float(precision_score(y_true, y_pred, zero_division=0)),
        'recall': float(recall_score(y_true, y_pred, zero_division=0)),
        'f1': float(f1_score(y_true, y_pred, zero_division=0)),
        # Rates are reported as 0 when their denominator is empty
        'fpr': float(fp / (fp + tn)) if (fp + tn) > 0 else 0.0,
        'fnr': float(fn / (fn + tp)) if (fn + tp) > 0 else 0.0,
        'tp': int(tp), 'fp': int(fp), 'tn': int(tn), 'fn': int(fn),
        'pred_time_sec': pred_time
    }

In [None]:
# Build the list of (display name, model path) pairs to evaluate
models_to_test = []

if MODEL_TYPE == 'best':
    # Single entry: the promoted best model, if its file exists
    best_path = PROJECT_ROOT / "models" / "best_model" / f"model_{TASK}.pkl"
    if best_path.exists():
        models_to_test.append(('best_model', best_path))
else:
    # 'all' expands to every known model family; any other value is taken
    # as the name of one family
    if MODEL_TYPE == 'all':
        families = ['xgboost', 'lightgbm', 'random_forest']
    else:
        families = [MODEL_TYPE]

    for family in families:
        for entry in list_model_versions(model_type=family, task=TASK):
            label = f"{family}/{entry['version_id']}"
            models_to_test.append((label, entry['model_path']))

print(f"Modelli da testare: {len(models_to_test)}")
for candidate in models_to_test:
    print(f"  - {candidate[0]}")

In [None]:
%%time
# Evaluate every selected model on every CSV.
# all_results maps model name -> {csv file name -> metrics dict}; a CSV
# whose evaluation failed gets no entry.
all_results = {}

wide_rule = "=" * 80
model_rule = "#" * 60

print(wide_rule)
print("EVALUATION SU CSV")
print(wide_rule)

for model_name, model_path in models_to_test:
    print(f"\n{model_rule}")
    print(f"# MODELLO: {model_name}")
    print(f"{model_rule}")

    per_csv = {}

    for csv_path in csv_files:
        # Progress prefix; the outcome is appended on the same line
        print(f"\n  Testing: {csv_path.name[:40]}...", end=" ")

        # Any failure, including a result that lacks the binary metric
        # keys, is reported and the run moves on to the next CSV
        try:
            outcome = test_model_on_csv(
                csv_path,
                model_path,
                scaler,
                selected_features,
                scaler_columns,
                task=TASK,
                sample_size=SAMPLE_SIZE,
            )

            if 'error' in outcome:
                print(f"ERRORE: {outcome['error']}")
            else:
                print(
                    f"F1={outcome['f1']:.4f}"
                    f" | Recall={outcome['recall']:.4f}"
                    f" | FPR={outcome['fpr']:.4f}"
                    f" | Attacks={outcome['attacks_in_data']:,}"
                )
                per_csv[csv_path.name] = outcome

        except Exception as e:
            print(f"ERRORE: {e}")

    all_results[model_name] = per_csv

print("\n" + wide_rule)
print("Evaluation completata!")

---
## 4. Tabelle Riepilogative

In [None]:
# F1 table: one row per model, one column per CSV, plus the model's mean F1
print("\n" + "="*100)
print("TABELLA F1 SCORE")
print("="*100)

# Header (column titles are the first 10 characters of each CSV name)
csv_names = [f.name[:25] for f in csv_files]
header = f"{'Modello':<35}"
for name in csv_names:
    header += f" | {name[:10]:^10}"
header += " | MEDIA"
print(header)
print("-" * len(header))

# Rows. model_averages collects (model name, mean F1) for the ranking.
model_averages = []

for model_name, per_csv in all_results.items():
    cells = []
    scored_f1 = []

    for csv_path in csv_files:
        metrics = per_csv.get(csv_path.name)
        if metrics is None:
            cells.append(f"{'N/A':^10}")
            continue

        f1 = metrics.get('f1', 0)
        cells.append(f"{f1:^10.4f}")

        # Only CSVs that contain attacks enter the mean: F1 is 0 by
        # construction on an attack-free day, whereas F1 == 0 on a day with
        # attacks is a real failure and must lower the average.
        if metrics.get('attacks_in_data', 1) > 0:
            scored_f1.append(f1)

    avg = float(np.mean(scored_f1)) if scored_f1 else 0.0
    model_averages.append((model_name, avg))

    print(" | ".join([f"{model_name:<35}", *cells, f"{avg:.4f}"]))

print("-" * len(header))

In [None]:
# FPR (False Positive Rate) table: one row per model, one column per CSV
print("\n" + "="*100)
print("TABELLA FPR (False Positive Rate) - più basso è meglio")
print("="*100)

# Header, rebuilt from csv_files so this cell does not depend on names
# defined by the F1 table cell
csv_names = [f.name[:25] for f in csv_files]
header = f"{'Modello':<35}"
for name in csv_names:
    header += f" | {name[:10]:^10}"
header += " | MEDIA"
print(header)
print("-" * len(header))

for model_name, per_csv in all_results.items():
    row = f"{model_name:<35}"
    fpr_scores = []

    for csv_path in csv_files:
        metrics = per_csv.get(csv_path.name)
        if metrics is None:
            row += f" | {'N/A':^10}"
            continue

        fpr = metrics.get('fpr', 0)
        fpr_scores.append(fpr)
        # Values above 5% are wrapped in asterisks (3 decimals so the cell
        # still fits the 10-character column)
        fpr_str = f"*{fpr:.3f}*" if fpr > 0.05 else f"{fpr:.4f}"
        row += f" | {fpr_str:^10}"

    avg = np.mean(fpr_scores) if fpr_scores else 0
    row += f" | {avg:.4f}"
    print(row)

print("-" * len(header))
print("* = FPR > 5% (alto)")

In [None]:
# Final ranking: models ordered by mean F1, best first
print("\n" + "="*60)
print("RANKING MODELLI (per F1 medio)")
print("="*60)

# Sorted in place: model_averages stays ordered for the cells that follow
model_averages.sort(key=lambda x: x[1], reverse=True)

# Medal shown next to the first three positions
medals = {1: "🥇", 2: "🥈", 3: "🥉"}

for i, (name, avg) in enumerate(model_averages, 1):
    # Mean FPR over every CSV this model was evaluated on
    fpr_list = [metrics.get('fpr', 0) for metrics in all_results.get(name, {}).values()]
    avg_fpr = np.mean(fpr_list) if fpr_list else 0

    medal = medals.get(i, "  ")
    print(f"{medal} #{i:2} {name:<40} F1={avg:.4f}  FPR={avg_fpr:.4f}")

In [None]:
# Detail for CSVs with few or no attacks (e.g. Monday)
print("\n" + "="*60)
print("DETTAGLIO CSV CON POCHI/NESSUN ATTACCO")
print("="*60)
print("\nNOTA: F1=0 su Monday è CORRETTO perché non ci sono attacchi.")
print("      Quello che conta è il FPR (falsi positivi).")

# Top 3 models by mean F1. all_results is in evaluation order, not ranking
# order, so the names are taken from a locally sorted copy of model_averages.
top_models = [
    name
    for name, _ in sorted(model_averages, key=lambda item: item[1], reverse=True)[:3]
]

for model_name in top_models:
    print(f"\n{model_name}:")
    for csv_name, metrics in all_results.get(model_name, {}).items():
        if 'Monday' in csv_name or metrics.get('attacks_in_data', 0) == 0:
            fpr = metrics.get('fpr', 0)
            fp = metrics.get('fp', 0)
            tn = metrics.get('tn', 0)
            print(f"  {csv_name[:30]}: FPR={fpr:.4f} (FP={fp:,}, TN={tn:,})")

---
## 5. Salvataggio Risultati

In [None]:
# Write the full evaluation (run settings, ranking, per-CSV metrics) as JSON
reports_dir = PROJECT_ROOT / "reports"
reports_dir.mkdir(exist_ok=True)

results_data = dict(
    timestamp=datetime.now().isoformat(),
    task=TASK,
    sample_size=SAMPLE_SIZE,
    models_tested=len(models_to_test),
    csv_tested=len(csv_files),
    ranking=[{'model': label, 'f1_avg': score} for label, score in model_averages],
    detailed_results=all_results,
)

output_path = reports_dir / "sniffer_evaluation_results.json"
with output_path.open('w') as f:
    # default=str: values json cannot encode natively are written as strings
    json.dump(results_data, f, indent=2, default=str)

print(f"Risultati salvati in: {output_path}")

---
## 6. Download Output

In [None]:
import zipfile

if ENV not in ['kaggle', 'colab']:
    print("Ambiente locale - risultati nelle cartelle del progetto")
else:
    # Hosted runtime: bundle the JSON reports into one downloadable archive
    zip_path = PROJECT_ROOT / "sniffer_evaluation_output.zip"

    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as archive:
        for report in reports_dir.glob("*.json"):
            archive.write(report, arcname=f"reports/{report.name}")

    size_kb = zip_path.stat().st_size / 1024
    print(f"ZIP creato: {zip_path.name} ({size_kb:.1f} KB)")

---
## 7. Riepilogo Finale

In [None]:
print("="*70)
print("RIEPILOGO SNIFFER EVALUATION")
print("="*70)

print(f"\nModelli testati: {len(models_to_test)}")
print(f"CSV testati: {len(csv_files)}")

# Rank on a local sorted copy so the best model is right even when the
# ranking cell, which sorts model_averages in place, was not run.
ranked = sorted(model_averages, key=lambda item: item[1], reverse=True)
if ranked:
    best_model, best_f1 = ranked[0]
    print(f"\nMiglior modello: {best_model}")
    print(f"  F1 medio: {best_f1:.4f}")

# Validation checklist
print("\n" + "-"*40)
print("CHECKLIST VALIDAZIONE:")

# Monday FPR: every model must stay within 2% on every Monday CSV. With no
# Monday result at all the check fails rather than passing vacuously.
monday_fprs = [
    metrics.get('fpr', 1)
    for results in all_results.values()
    for csv_name, metrics in results.items()
    if 'Monday' in csv_name
]
monday_ok = bool(monday_fprs) and all(fpr <= 0.02 for fpr in monday_fprs)

print(f"  [{'✓' if monday_ok else '✗'}] FPR su Monday < 2%")

# Friday DDoS F1: at least one model must exceed 0.90
friday_ok = any(
    metrics.get('f1', 0) > 0.90
    for results in all_results.values()
    for csv_name, metrics in results.items()
    if 'Friday' in csv_name and 'DDos' in csv_name
)
print(f"  [{'✓' if friday_ok else '✗'}] F1 su Friday-DDos > 90%")

# At least one model with a good mean F1
good_model = bool(ranked) and ranked[0][1] > 0.85
print(f"  [{'✓' if good_model else '✗'}] Almeno un modello con F1 medio > 85%")

print("\n" + "="*70)
if monday_ok and friday_ok and good_model:
    print("✓ VALIDAZIONE PASSATA - I modelli sono pronti per l'uso!")
else:
    print("⚠️  VALIDAZIONE PARZIALE - Controlla i risultati sopra")
print("="*70)