# Model Evaluation Notebook

- Date: 2025-10-09
- Purpose: Evaluate trained models (base / sectional), inspect feature names, feature importances, permutation importance, and SHAP (if installed).

> Modify the CONFIG cell to switch approach/paths.


In [None]:
# === CONFIG ===
APPROACH = "base"  # "base" or "sectional"

# Derived paths
DATA_DIR   = f"data/processed/{APPROACH}"
MODEL_DIR  = f"models/{APPROACH}/latest"

# Optional caps for permutation importance and SHAP sampling
PI_N_REPEATS   = 5
PI_MAX_SAMPLES = 20000  # larger -> slower
SHAP_MAX_SAMPLES = 2000  # if shap is available

print("APPROACH =", APPROACH)
print("DATA_DIR =", DATA_DIR)
print("MODEL_DIR =", MODEL_DIR)


In [None]:
import json
import numpy as np
import pandas as pd
from pathlib import Path
from scipy import sparse
from scipy.sparse import load_npz
import joblib

from sklearn.metrics import roc_auc_score, average_precision_score, log_loss, accuracy_score, matthews_corrcoef
from sklearn.inspection import permutation_importance

DATA_DIR = Path(DATA_DIR)
MODEL_DIR = Path(MODEL_DIR)

# Load X / y
if (DATA_DIR / "X.npz").exists():
    X = load_npz(DATA_DIR / "X.npz")
elif (DATA_DIR / "X_dense.npz").exists():
    arr = np.load(DATA_DIR / "X_dense.npz")["X"]
    X = sparse.csr_matrix(arr)
else:
    raise FileNotFoundError("X(.npz) not found in " + str(DATA_DIR))

y = pd.read_csv(DATA_DIR / "y.csv")["is_top2"].to_numpy().astype(int)

# Load pipeline and model
pipe_path = MODEL_DIR / "feature_pipeline.pkl"
pipe_obj = joblib.load(pipe_path)
ct = pipe_obj["column_transformer"] if isinstance(pipe_obj, dict) and "column_transformer" in pipe_obj else pipe_obj

def find_model(md: Path):
    for name in ["model.pkl","model.joblib","model.bin"]:
        p = md / name
        if p.exists():
            return p
    raise FileNotFoundError("model file not found in " + str(md))

model = joblib.load(find_model(MODEL_DIR))

X.shape, y.shape, type(ct), type(model)


In [None]:
from sklearn.compose import ColumnTransformer

def get_feature_names_from_ct(ct) -> list[str]:
    names = []
    try:
        for name, trans, cols in ct.transformers_:
            if name == "remainder" and trans == "drop":
                continue
            if hasattr(trans, "named_steps"):
                last = None
                for _, step in trans.named_steps.items():
                    last = step
                if hasattr(last, "get_feature_names_out"):
                    feats = last.get_feature_names_out(cols)
                    names.extend([str(x) for x in feats])
                    continue
            if hasattr(trans, "get_feature_names_out"):
                feats = trans.get_feature_names_out(cols)
                names.extend([str(x) for x in feats])
            else:
                names.extend([str(c) for c in cols])
        try:
            names2 = ct.get_feature_names_out()
            if len(names2) == len(names):
                names = [str(x) for x in names2]
        except Exception:
            pass
    except Exception as e:
        print("WARN: feature name recovery failed:", e)
        names = [f"feat_{i}" for i in range(X.shape[1])]
    return names

feature_names = get_feature_names_from_ct(ct)
len(feature_names), feature_names[:10]


In [None]:
import numpy as np

def safe_predict_proba(model, X):
    if hasattr(model, "predict_proba"):
        return model.predict_proba(X)[:, 1]
    elif hasattr(model, "decision_function"):
        z = model.decision_function(X)
        return 1.0 / (1.0 + np.exp(-z))
    else:
        yp = model.predict(X)
        return np.clip(yp.astype(float), 0.0, 1.0)

y_prob = safe_predict_proba(model, X)
y_pred = (y_prob >= 0.5).astype(int)

metrics = {
    "auc": float(roc_auc_score(y, y_prob)),
    "pr_auc": float(average_precision_score(y, y_prob)),
    "logloss": float(log_loss(y, y_prob, labels=[0,1])),
    "accuracy": float(accuracy_score(y, y_pred)),
    "mcc": float(matthews_corrcoef(y, y_pred)),
    "n_rows": int(X.shape[0]),
    "n_features": int(X.shape[1]),
}
metrics


## Native Feature Importances (if available)

In [None]:
import pandas as pd

if hasattr(model, "feature_importances_"):
    fi = pd.DataFrame({"feature": feature_names, "importance": model.feature_importances_})
    fi = fi.sort_values("importance", ascending=False).reset_index(drop=True)
    fi.head(20)
else:
    print("model has no feature_importances_")


In [None]:
import matplotlib.pyplot as plt

if hasattr(model, "feature_importances_"):
    topk = 30
    top = fi.head(topk).iloc[::-1]
    plt.figure(figsize=(8, max(4, topk*0.3)))
    plt.barh(top["feature"], top["importance"])
    plt.title(f"Top {topk} Feature Importances")
    plt.tight_layout()
    plt.show()


## Permutation Importance (model-agnostic)

In [None]:
from sklearn.inspection import permutation_importance
from scipy import sparse
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

rng = np.random.RandomState(42)
n = X.shape[0]
max_n = max(1000, min(PI_MAX_SAMPLES, n))
idx = rng.choice(n, size=max_n, replace=False)
Xs = X[idx] if sparse.issparse(X) else X[idx, :]
ys = y[idx]

pi = permutation_importance(model, Xs, ys, scoring="neg_log_loss",
                            n_repeats=PI_N_REPEATS, random_state=42, n_jobs=-1)
pi_df = pd.DataFrame({"feature": feature_names, "pi_mean": pi.importances_mean, "pi_std": pi.importances_std})
pi_df = pi_df.sort_values("pi_mean", ascending=False).reset_index(drop=True)
display(pi_df.head(20))

topk = 30
top = pi_df.head(topk).iloc[::-1]
plt.figure(figsize=(8, max(4, topk*0.3)))
plt.barh(top["feature"], top["pi_mean"])
plt.title(f"Top {topk} Permutation Importance (neg_log_loss)")
plt.tight_layout()
plt.show()


## SHAP (if installed)

In [None]:
try:
    import shap
    import numpy as np

    k = min(SHAP_MAX_SAMPLES, X.shape[0])
    rng = np.random.RandomState(0)
    idx = rng.choice(X.shape[0], size=k, replace=False)
    Xk = X[idx]

    if hasattr(model, "predict_proba"):
        model_fn = lambda data: model.predict_proba(data)[:, 1]
    else:
        model_fn = lambda data: model.decision_function(data)

    try:
        explainer = shap.Explainer(model, feature_names=feature_names)
        sh = explainer(Xk)
        shap_values = sh.values
        shap.plots.bar(shap_values, max_display=30, show=True)
        shap.plots.beeswarm(shap_values, max_display=30, show=True)
    except Exception as e:
        print("Auto Explainer failed:", e)
        print("Falling back to KernelExplainer (slow) ...")
        explainer = shap.KernelExplainer(model_fn, Xk[:200])
        shap_values = explainer.shap_values(Xk, nsamples=100)
        shap.summary_plot(shap_values, feature_names=feature_names, max_display=30, show=True)
except Exception as e:
    print("SHAP not available or failed:", e)
