# Week 05 — Model Evaluation & Comparison

This notebook compares all models from Weeks 03–04 on the NSL-KDD dataset test split. It focuses on:

- Metric table (Precision, Recall, F1, ROC-AUC, PR-AUC)
- Family-wise deep-dive (especially **R2L** and **U2R**)
- Robustness checks (thresholds & seeds)
- Calibration of supervised models
- Runtime / complexity
- Final ranked shortlist with trade-offs


In [None]:
# Imports, project root, and paths.
import warnings
warnings.filterwarnings(
    "ignore",
    message=".*covariance matrix associated to your dataset is not full rank.*"
)

import os, sys, time
from pathlib import Path

this_dir = Path.cwd()
project_root = this_dir.parent if this_dir.name == "notebooks" else this_dir
sys.path.append(str(project_root))

import numpy as np
import pandas as pd

import matplotlib.pyplot as plt
import seaborn as sns

from collections import Counter, defaultdict

# Sklearn metrics & helpers
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    precision_recall_fscore_support,
    roc_auc_score,
    average_precision_score,
    RocCurveDisplay,
    PrecisionRecallDisplay,
    brier_score_loss,
)
from sklearn.calibration import CalibrationDisplay

from sklearn.model_selection import StratifiedKFold

# Project utilities (follow the same pattern as previous weeks)
from src.utils import Paths, set_global_seed

# Optional: custom helpers if they exist in your repo.
try:
    from src.eval import to_binary  # used in Week 04
except ImportError:
    to_binary = None

set_global_seed(42)
paths = Paths().ensure()

print("Project root:", project_root)
print("Using paths:", paths)


In [None]:
# Load processed train/test arrays from Week 01.
X_train_path = paths.data_proc / "X_train.npy"
X_test_path  = paths.data_proc / "X_test.npy"
y_train_path = paths.data_proc / "y_train.npy"
y_test_path  = paths.data_proc / "y_test.npy"

X_train = np.load(X_train_path)
X_test  = np.load(X_test_path)
y_train = np.load(y_train_path)
y_test  = np.load(y_test_path)

print("X_train:", X_train.shape, "X_test:", X_test.shape)
print("y_train classes:", Counter(y_train))
print("y_test  classes:", Counter(y_test))

# Derive a binary label vector (normal vs attack) if a helper exists.
# This is mainly for unsupervised anomaly detectors.
if to_binary is not None:
    # Adjust `normal_label` to match your encoding, e.g. 'normal' or 'Normal'.
    normal_label = "normal"
    y_train_bin = to_binary(y_train, normal_label)
    y_test_bin  = to_binary(y_test, normal_label)
    print("Binary train counts:", Counter(y_train_bin))
    print("Binary test  counts:", Counter(y_test_bin))
else:
    y_train_bin = None
    y_test_bin = None
    print("NOTE: src.eval.to_binary not found – binary labels not created automatically.")


In [None]:
# Helper functions for unified metric computation and plotting.

def multiclass_metrics(y_true, y_pred, y_proba=None, labels=None, average="macro"):
    """Compute macro metrics + per-class metrics for a multiclass classifier.
    
    Parameters
    ----------
    y_true, y_pred : array-like
        True and predicted labels (families).
    y_proba : array-like of shape (n_samples, n_classes), optional
        Predicted probabilities for each class (for ROC/PR).
    labels : list, optional
        Label order. If None, inferred from y_true.
    average : str
        Averaging scheme for global metrics.
    """
    if labels is None:
        labels = np.unique(y_true)
    
    prec, rec, f1, support = precision_recall_fscore_support(
        y_true, y_pred, labels=labels, average=None, zero_division=0
    )
    macro_prec, macro_rec, macro_f1, _ = precision_recall_fscore_support(
        y_true, y_pred, average=average, zero_division=0
    )
    
    per_class_df = pd.DataFrame({
        "family": labels,
        "precision": prec,
        "recall": rec,
        "f1": f1,
        "support": support,
    })
    
    metrics = {
        "precision_macro": macro_prec,
        "recall_macro": macro_rec,
        "f1_macro": macro_f1,
    }
    
    # Multi-class ROC-AUC (OvR) and PR-AUC if probabilities are available.
    if y_proba is not None:
        # Binarize labels for OvR using a simple label -> index mapping.
        label_to_idx = {lab: i for i, lab in enumerate(labels)}
        y_true_idx = np.array([label_to_idx[lab] for lab in y_true])
        Y_true_ovr = np.eye(len(labels))[y_true_idx]
        
        try:
            roc_macro_ovr = roc_auc_score(Y_true_ovr, y_proba, average="macro", multi_class="ovr")
        except Exception:
            roc_macro_ovr = np.nan
        
        try:
            pr_macro_ovr = average_precision_score(Y_true_ovr, y_proba, average="macro")
        except Exception:
            pr_macro_ovr = np.nan
        
        metrics["roc_auc_ovr_macro"] = roc_macro_ovr
        metrics["pr_auc_ovr_macro"] = pr_macro_ovr
    else:
        metrics["roc_auc_ovr_macro"] = np.nan
        metrics["pr_auc_ovr_macro"] = np.nan
    
    return metrics, per_class_df


def binary_threshold_metrics(y_true_bin, scores, thresholds):
    """Sweep thresholds over anomaly scores for binary detectors.
    
    Assumes higher `scores` = more anomalous (1 = attack, 0 = normal).
    Returns a DataFrame with precision/recall/F1 for each threshold.
    """
    rows = []
    for thr in thresholds:
        y_pred_bin = (scores >= thr).astype(int)
        prec, rec, f1, _ = precision_recall_fscore_support(
            y_true_bin, y_pred_bin, average="binary", zero_division=0
        )
        rows.append({
            "threshold": thr,
            "precision": prec,
            "recall": rec,
            "f1": f1,
        })
    return pd.DataFrame(rows)


def plot_confusion(cm, labels, title):
    fig, ax = plt.subplots(figsize=(6, 5))
    sns.heatmap(cm, annot=True, fmt="d", cmap="YlGnBu",
                xticklabels=labels, yticklabels=labels, ax=ax)
    ax.set_xlabel("Predicted")
    ax.set_ylabel("Actual")
    ax.set_title(title)
    plt.tight_layout()
    return fig, ax


In [None]:
# Supervised models from Week 04.
# 
# Strategy:
# - Prefer factory functions from src.models (same as Week 04).
# - If not available, fall back to simple sklearn baselines.
#
# NOTE: If you already saved trained models in `paths.models`, you can
#       skip training here and just load them with joblib instead.

from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression

try:
    from src.models import (
        make_rf_classifier_grid,
        make_svm_rbf_grid,
    )
    HAVE_FACTORY = True
except ImportError:
    HAVE_FACTORY = False

supervised_models = {}
fit_times = {}
predict_times = {}
probas_cache = {}
preds_cache = {}

labels = np.unique(y_train)


if HAVE_FACTORY:
    from sklearn.model_selection import GridSearchCV
    
    print("Using src.models factory functions + GridSearchCV (similar to Week 04).")
    
    # Random Forest
    rf_spec = make_rf_classifier_grid()
    rf_gs = GridSearchCV(
        estimator=rf_spec.model,
        param_grid=rf_spec.param_grid,
        scoring="f1_macro",
        n_jobs=-1,
        cv=3,
        verbose=1,
    )
    t0 = time.perf_counter()
    rf_gs.fit(X_train, y_train)
    fit_times["RandomForest (GS)"] = time.perf_counter() - t0
    
    t0 = time.perf_counter()
    y_pred_rf = rf_gs.predict(X_test)
    predict_times["RandomForest (GS)"] = time.perf_counter() - t0
    y_proba_rf = rf_gs.predict_proba(X_test)
    
    supervised_models["RandomForest (GS)"] = rf_gs.best_estimator_
    preds_cache["RandomForest (GS)"] = y_pred_rf
    probas_cache["RandomForest (GS)"] = y_proba_rf
    
    # SVM (RBF)
    svm_spec = make_svm_rbf_grid()
    svm_gs = GridSearchCV(
        estimator=svm_spec.model,
        param_grid=svm_spec.param_grid,
        scoring="f1_macro",
        n_jobs=-1,
        cv=3,
        verbose=1,
    )
    t0 = time.perf_counter()
    svm_gs.fit(X_train, y_train)
    fit_times["SVM RBF (GS)"] = time.perf_counter() - t0
    
    t0 = time.perf_counter()
    y_pred_svm = svm_gs.predict(X_test)
    predict_times["SVM RBF (GS)"] = time.perf_counter() - t0
    
    # Some SVMs may be configured without probability; enable if needed.
    if hasattr(svm_gs.best_estimator_, "predict_proba"):
        y_proba_svm = svm_gs.best_estimator_.predict_proba(X_test)
    else:
        y_proba_svm = None
    
    supervised_models["SVM RBF (GS)"] = svm_gs.best_estimator_
    preds_cache["SVM RBF (GS)"] = y_pred_svm
    probas_cache["SVM RBF (GS)"] = y_proba_svm

else:
    print("src.models factories not found – using simple sklearn baselines.")
    
    # Random Forest baseline
    rf = RandomForestClassifier(
        n_estimators=200,
        max_depth=None,
        n_jobs=-1,
        random_state=42,
    )
    t0 = time.perf_counter()
    rf.fit(X_train, y_train)
    fit_times["RandomForest"] = time.perf_counter() - t0
    
    t0 = time.perf_counter()
    y_pred_rf = rf.predict(X_test)
    predict_times["RandomForest"] = time.perf_counter() - t0
    y_proba_rf = rf.predict_proba(X_test)
    
    supervised_models["RandomForest"] = rf
    preds_cache["RandomForest"] = y_pred_rf
    probas_cache["RandomForest"] = y_proba_rf
    
    # SVM RBF baseline
    svm = SVC(
        kernel="rbf",
        C=3.0,
        gamma="scale",
        probability=True,
        random_state=42,
    )
    t0 = time.perf_counter()
    svm.fit(X_train, y_train)
    fit_times["SVM RBF"] = time.perf_counter() - t0
    
    t0 = time.perf_counter()
    y_pred_svm = svm.predict(X_test)
    predict_times["SVM RBF"] = time.perf_counter() - t0
    y_proba_svm = svm.predict_proba(X_test)
    
    supervised_models["SVM RBF"] = svm
    preds_cache["SVM RBF"] = y_pred_svm
    probas_cache["SVM RBF"] = y_proba_svm
    
    # Multinomial Logistic Regression baseline
    lr = LogisticRegression(
        multi_class="multinomial",
        max_iter=500,
        n_jobs=-1,
        random_state=42,
    )
    t0 = time.perf_counter()
    lr.fit(X_train, y_train)
    fit_times["Logistic Regression"] = time.perf_counter() - t0
    
    t0 = time.perf_counter()
    y_pred_lr = lr.predict(X_test)
    predict_times["Logistic Regression"] = time.perf_counter() - t0
    y_proba_lr = lr.predict_proba(X_test)
    
    supervised_models["Logistic Regression"] = lr
    preds_cache["Logistic Regression"] = y_pred_lr
    probas_cache["Logistic Regression"] = y_proba_lr

print("Supervised models:", list(supervised_models.keys()))


In [None]:
# Unsupervised anomaly detectors from Week 03.
#
# We assume Week 03 defined helpers such as:
#   - run_zscore(df_bin, ...)
#   - run_elliptic_envelope(df_bin, ...)
#   - run_mahalanobis(df_bin, alpha=0.99, robust=True)
#
# that return a result object with attributes:
#   - name
#   - y_true (binary)
#   - y_pred (binary)
#   - scores (float anomaly scores, higher = more anomalous)
#   - confusion (2x2 confusion matrix)
#
# If your interface is different, adjust this cell accordingly.

unsup_results = {}

try:
    from src.unsupervised import run_zscore, run_elliptic_envelope, run_mahalanobis
    HAVE_UNSUP = True
except ImportError:
    HAVE_UNSUP = False

if HAVE_UNSUP and y_test_bin is not None:
    # Rebuild a binary DataFrame similar to Week 03 if needed.
    # Here we only have the processed X/y; adapt if Week 03 used raw features.
    df_test_bin = pd.DataFrame(X_test)
    df_test_bin["label_bin"] = y_test_bin
    
    # Z-score
    z_res = run_zscore(df_test_bin, contamination=0.1)
    unsup_results["Z-score"] = z_res
    
    # Elliptic Envelope
    ell_res = run_elliptic_envelope(df_test_bin, contamination=0.1, random_state=0)
    unsup_results["EllipticEnvelope"] = ell_res
    
    # Mahalanobis (robust covariance + chi-square threshold)
    mahal_res = run_mahalanobis(df_test_bin, alpha=0.99, robust=True)
    unsup_results["Mahalanobis (robust, alpha=0.99)"] = mahal_res
    
    print("Unsupervised results:", list(unsup_results.keys()))
else:
    print("Unsupervised helpers not available or no binary labels – skipping Week 03 models.")


In [None]:
# Global metric table: Precision, Recall, F1 (macro), ROC-AUC (OvR), PR-AUC.
rows = []

# Supervised models (multiclass)
labels = np.unique(y_test)

for name, model in supervised_models.items():
    y_pred = preds_cache[name]
    y_proba = probas_cache.get(name, None)
    metrics, _ = multiclass_metrics(y_test, y_pred, y_proba=y_proba, labels=labels)
    rows.append({
        "model": name,
        "type": "supervised",
        **metrics,
    })

# Unsupervised models (binary); treat `1` = anomaly/attack.
if unsup_results and y_test_bin is not None:
    for name, res in unsup_results.items():
        y_true_b = res.y_true if hasattr(res, "y_true") else y_test_bin
        y_pred_b = res.y_pred if hasattr(res, "y_pred") else None
        scores   = res.scores if hasattr(res, "scores") else None
        
        if y_pred_b is None and scores is not None:
            # Default threshold at 0 (or median score) if only scores exist.
            thr = np.median(scores)
            y_pred_b = (scores >= thr).astype(int)
        
        prec, rec, f1, _ = precision_recall_fscore_support(
            y_true_b, y_pred_b, average="binary", zero_division=0
        )
        
        # ROC-AUC / PR-AUC if scores exist.
        if scores is not None:
            try:
                roc_auc = roc_auc_score(y_true_b, scores)
            except Exception:
                roc_auc = np.nan
            try:
                pr_auc = average_precision_score(y_true_b, scores)
            except Exception:
                pr_auc = np.nan
        else:
            roc_auc = np.nan
            pr_auc = np.nan
        
        rows.append({
            "model": name,
            "type": "unsupervised",
            "precision_macro": prec,
            "recall_macro": rec,
            "f1_macro": f1,
            "roc_auc_ovr_macro": roc_auc,
            "pr_auc_ovr_macro": pr_auc,
        })

metric_table = pd.DataFrame(rows).sort_values("f1_macro", ascending=False).reset_index(drop=True)
metric_table


In [None]:
# Family-wise deep-dive for the best supervised model.
#
# Use the top model by macro F1 (among supervised models)
# and inspect per-family performance and confusion matrix.

sup_only = metric_table[metric_table["type"] == "supervised"]
best_name = sup_only.sort_values("f1_macro", ascending=False)["model"].iloc[0]
print("Best supervised model by macro F1:", best_name)

best_pred = preds_cache[best_name]
best_proba = probas_cache.get(best_name, None)

labels = np.unique(y_test)
metrics_best, per_class_best = multiclass_metrics(
    y_test, best_pred, y_proba=best_proba, labels=labels
)
display(per_class_best.sort_values("f1"))

cm_best = confusion_matrix(y_test, best_pred, labels=labels)
fig, ax = plot_confusion(cm_best, labels, f"Confusion matrix – {best_name}")
plt.show()


In [None]:
# R2L and U2R error analysis with example rows.
#
# This assumes your `y_test` uses family labels such as 'R2L' and 'U2R'.
# Adjust the family names below if your encoding is different.

r2l_label = "R2L"
u2r_label = "U2R"

mask_r2l = (y_test == r2l_label)
mask_u2r = (y_test == u2r_label)

print("R2L support:", mask_r2l.sum())
print("U2R support:", mask_u2r.sum())

# Indices where R2L / U2R were misclassified by the best model.
mis_r2l_idx = np.where(mask_r2l & (best_pred != r2l_label))[0]
mis_u2r_idx = np.where(mask_u2r & (best_pred != u2r_label))[0]

print("Misclassified R2L:", len(mis_r2l_idx))
print("Misclassified U2R:", len(mis_u2r_idx))

# Reconstruct a feature DataFrame using the preprocessor if available,
# otherwise fall back to a numeric DataFrame.
try:
    import joblib
    pre_path = paths.data_proc / "preprocessor.joblib"
    pre = joblib.load(pre_path)
    feature_names = pre.get_feature_names_out()
    X_test_df = pd.DataFrame(X_test, columns=feature_names)
except Exception as e:
    print("Could not load preprocessor or feature names:", e)
    X_test_df = pd.DataFrame(X_test)
    X_test_df["family"] = y_test

# Show a few misclassified examples for each rare family.
def show_examples(indices, family_name, n=5):
    if len(indices) == 0:
        print(f"No misclassifications found for {family_name}.")
        return
    idx_sel = indices[:n]
    ex = X_test_df.iloc[idx_sel].copy()
    ex["y_true"] = y_test[idx_sel]
    ex["y_pred"] = best_pred[idx_sel]
    display(ex)

print("\nExample misclassified R2L samples:")
show_examples(mis_r2l_idx, "R2L")

print("\nExample misclassified U2R samples:")
show_examples(mis_u2r_idx, "U2R")


In [None]:
# Robustness: sensitivity to threshold for unsupervised models.
#
# For each unsupervised detector with a `scores` attribute, sweep
# thresholds and plot F1 vs threshold.

if unsup_results and y_test_bin is not None:
    thresholds = np.linspace(0.1, 0.9, 9)  # generic range; adjust to your score scale
    
    for name, res in unsup_results.items():
        if not hasattr(res, "scores"):
            print(f"[{name}] no scores attribute – skipping threshold sweep.")
            continue
        
        scores = res.scores
        # Normalize scores to [0, 1] for a more stable threshold range, if needed.
        s_min, s_max = np.min(scores), np.max(scores)
        if s_max > s_min:
            scores_norm = (scores - s_min) / (s_max - s_min)
        else:
            scores_norm = scores
        
        df_thr = binary_threshold_metrics(res.y_true, scores_norm, thresholds)
        display(df_thr)
        
        fig, ax = plt.subplots()
        ax.plot(df_thr["threshold"], df_thr["f1"], marker="o")
        ax.set_xlabel("Threshold (on normalized score)")
        ax.set_ylabel("F1 (binary)")
        ax.set_title(f"Threshold sensitivity – {name}")
        plt.tight_layout()
        plt.show()
else:
    print("No unsupervised results (or binary labels) available – skipping threshold robustness.")


In [None]:
# Robustness: stability across random seeds for supervised models.
#
# We will re-train each supervised model on a subset of seeds and track
# the variation in macro F1 on a validation fold (not the test set).

seed_results = []

# Simple single-fold split (train/val) using StratifiedKFold.
skf = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)
train_idx, val_idx = next(iter(skf.split(X_train, y_train)))
X_tr, X_val = X_train[train_idx], X_train[val_idx]
y_tr, y_val = y_train[train_idx], y_train[val_idx]

seed_list = [1, 2, 3, 4, 5]

for base_name, model in supervised_models.items():
    for seed in seed_list:
        # Rebuild a fresh instance with the new seed when possible.
        if hasattr(model, "get_params"):
            params = model.get_params()
            if "random_state" in params:
                params["random_state"] = seed
            cls = model.__class__
            model_seed = cls(**params)
        else:
            # Fallback: re-use the fitted model (not ideal, but avoids errors).
            model_seed = model
        
        model_seed.fit(X_tr, y_tr)
        y_val_pred = model_seed.predict(X_val)
        f1_macro, _, _, _ = precision_recall_fscore_support(
            y_val, y_val_pred, average="macro", zero_division=0
        )
        
        seed_results.append({
            "model": base_name,
            "seed": seed,
            "f1_macro_val": f1_macro,
        })

seed_df = pd.DataFrame(seed_results)
seed_df


In [None]:
# Calibration: reliability curves for supervised probabilities.
#
# We will plot reliability curves for the top-k supervised models
# that have `predict_proba`. For multi-class, we focus on the
# "attack vs normal" binary view if `to_binary` exists; otherwise,
# we plot per-class curves for the most frequent family.

top_k = 2

sup_sorted = sup_only.sort_values("f1_macro", ascending=False)["model"].tolist()
top_models = [m for m in sup_sorted if hasattr(supervised_models[m], "predict_proba")][:top_k]

if not top_models:
    print("No probabilistic supervised models available for calibration.")
else:
    for name in top_models:
        model = supervised_models[name]
        y_proba = probas_cache[name]
        
        if to_binary is not None and y_test_bin is not None and y_proba is not None:
            # Collapse to binary attack vs normal by summing columns.
            # Assume column order matches labels; locate normal column.
            labels = model.classes_
            normal_label = "normal"
            if normal_label in labels:
                normal_idx = list(labels).index(normal_label)
                p_normal = y_proba[:, normal_idx]
                p_attack = 1.0 - p_normal
                prob_pos = p_attack
                y_true_bin = y_test_bin
                
                fig, ax = plt.subplots()
                CalibrationDisplay.from_predictions(
                    y_true_bin, prob_pos, n_bins=10, name=name, ax=ax
                )
                ax.set_title(f"Reliability curve (attack vs normal) – {name}")
                plt.tight_layout()
                plt.show()
            else:
                print(f"{name}: normal label not found in classes – skipping binary calibration.")
        else:
            # Fallback: pick the most common class and plot a one-vs-rest curve.
            if y_proba is None:
                print(f"{name}: no predict_proba – skipping calibration.")
                continue
            
            labels = supervised_models[name].classes_
            _, counts = np.unique(y_test, return_counts=True)
            majority_idx = int(np.argmax(counts))
            majority_label = labels[majority_idx]
            prob_pos = y_proba[:, majority_idx]
            y_true_bin = (y_test == majority_label).astype(int)
            
            fig, ax = plt.subplots()
            CalibrationDisplay.from_predictions(
                y_true_bin, prob_pos, n_bins=10, name=name, ax=ax
            )
            ax.set_title(f"Reliability curve (one-vs-rest: {majority_label}) – {name}")
            plt.tight_layout()
            plt.show()


In [None]:
# Runtime / complexity summary.
#
# We reported fit and predict times earlier; here we assemble them into
# a compact table. For a rough memory estimate, we approximate model size
# via pickle byte size.

import pickle

model_sizes = {}
for name, model in supervised_models.items():
    try:
        model_bytes = pickle.dumps(model)
        model_sizes[name] = len(model_bytes)
    except Exception:
        model_sizes[name] = np.nan

runtime_rows = []
for name in supervised_models.keys():
    runtime_rows.append({
        "model": name,
        "fit_time_sec": fit_times.get(name, np.nan),
        "predict_time_sec": predict_times.get(name, np.nan),
        "model_bytes": model_sizes.get(name, np.nan),
    })

runtime_table = pd.DataFrame(runtime_rows).sort_values("fit_time_sec")
runtime_table


## Final summary and ranked shortlist

Use the `metric_table` and `runtime_table` to summarize trade-offs:

- **Detection quality**: macro F1, per-family F1 (especially R2L and U2R)
- **Robustness**: stability across seeds and threshold sensitivity
- **Calibration**: how well probabilities match observed frequencies
- **Complexity**: training time, prediction time, and model size

In the cell below, we create a compact summary table and a ranked shortlist.
Feel free to add narrative comments for your report.


In [None]:
# Merge metric and runtime information into one summary table.
summary = metric_table.merge(
    runtime_table,
    how="left",
    left_on="model",
    right_on="model",
)

summary_sorted = summary.sort_values(["type", "f1_macro"], ascending=[True, False])
summary_sorted


### Ranked shortlist (example structure)

You can base your written discussion on something like:

1. **[Model A]** — best overall macro F1 and strong R2L/U2R detection; moderate training time.
2. **[Model B]** — slightly lower F1 but much faster and smaller; well-calibrated probabilities.
3. **[Model C]** — strong on majority classes but weak on rare families; useful as a baseline.

Replace with the actual models and observations from your `summary_sorted` table and plots.
