In [158]:
# setup
import os, sys, json, itertools, time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
import shap

from scipy.stats import randint
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV, KFold
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    roc_auc_score, roc_curve, confusion_matrix, accuracy_score, recall_score, precision_score, f1_score, matthews_corrcoef, precision_recall_curve, average_precision_score
)
from sklearn.utils import check_random_state


# project root
sys.path.append("/Users/gilanorup/Desktop/Studium/MSc/MA/code/masters_thesis_gn/src")

from config.constants import GIT_DIRECTORY
from config.feature_sets import get_linguistic_features, get_acoustic_features
from regression.model_evaluation_helpers import confidence_intervals, bootstrap_index_generator, load_demographics, load_task_dataframe, get_model_feature_list, build_feature_sets

# config
scores_csv = os.path.join(GIT_DIRECTORY, "data", "language_scores_all_subjects.csv")
scores = pd.read_csv(scores_csv)
demographics_csv = os.path.join(GIT_DIRECTORY, "data", "demographics_data.csv")
demographics = load_demographics(demographics_csv)

task = "picnicScene"
target = "PhonemicFluencyScore"
model = "full"
demographic_variables = ["Age","Gender","Education_level","Country","Socioeconomic"]

random_state = 42
n_boot = 1000
ci = 0.95
baseline_threshold = 0.5
default_parameters = dict(n_estimators=500, random_state=random_state, n_jobs=-1, class_weight="balanced")

out_dir = os.path.join(GIT_DIRECTORY, "results", "classification")
os.makedirs(out_dir, exist_ok=True)

import seaborn as sns
import matplotlib

base_models = ["demographics", "acoustic", "linguistic", "acoustic+linguistic", "full"]
palette = sns.color_palette("Set2", n_colors=len(base_models))
colors = {
    m: matplotlib.colors.to_hex(c)
    for m, c in zip(base_models, palette[::-1])
}

# plot style
plt.style.use("default")
matplotlib.rcParams['font.family'] = 'Arial'
plt.rcParams.update({
    "figure.facecolor": "white",
    "axes.facecolor": "white",
    "savefig.facecolor": "white",
    "axes.edgecolor": "black",
    "axes.grid": False,
})
plt.rcParams.update({"savefig.dpi": 600, "savefig.bbox": "tight"})


In [159]:
# calculate norms

# compute norms for scores (M, SD & z-scores) -> define low (z-score <-1), middle (z-score -1;1), high performers (z-score >1)
def build_population_norms(scores_df, score_cols):
    rows = []
    for score in score_cols:
        s = scores_df[score].dropna().astype(float)
        n = int(s.shape[0])
        if n == 0:
            continue

        mean = float(s.mean())
        std  = float(s.std(ddof=1))

        z = (s - mean) / std
        low_mask = z < -1
        low_n = int(low_mask.sum()) # count how many people are in low category

        rows.append({
            "score": score,
            "mean": mean,
            "std": std,
            "low_threshold": mean - std,
            "high_threshold": mean + std,
            "n": n,
            "low_n": low_n,
            "not_low_n": n - low_n,
            "low_prop": (low_n / n),
            "not_low_prop": 1 - (low_n / n)
        })

    return pd.DataFrame(rows).sort_values("score").reset_index(drop=True)

# score_cols = ["PictureNamingScore", "SemanticFluencyScore", "PhonemicFluencyScore"]
# norms_df = build_population_norms(scores, score_cols)
# norms_df.to_csv(os.path.join(GIT_DIRECTORY, "results", "classification", "population_norms.csv"), index=False)
# print(norms_df)

# load precomputed population norms
def load_population_norms(path: str) -> dict:
    norms_df = pd.read_csv(path)
    norms = {}
    for _, r in norms_df.iterrows():
        norms[str(r["score"])] = {
            "mean": float(r["mean"]),
            "std": float(r["std"]),
            "low_threshold": float(r["low_threshold"]),
            "high_threshold": float(r["high_threshold"]),
            "n": int(r["n"]),
        }
    return norms

norms_path = os.path.join(GIT_DIRECTORY, "results", "classification", "population_norms.csv")
population_norms = load_population_norms(norms_path)


In [160]:
# helpers

# compare subjects to norms
def compute_norms_and_labels(df: pd.DataFrame, target_score: str):
    # compare each individual's score to saved population norms
    if target_score not in population_norms:
        raise ValueError(f"no population norm found for {target_score} in {norms_path}")
    mean = population_norms[target_score]["mean"]
    std  = population_norms[target_score]["std"]

    # z-score vs population mean/SD
    z = (df[target_score] - mean) / std
    df[f"{target_score}_z"] = z

    # binary label: low (< -1 SD) vs not_low
    df["is_low"] = (z < -1).astype(int)

    return df, float(mean), float(std)

# after randomized search and grid search: load best parameter set
def load_tuned_rf_params():
    tuned_path = os.path.join(out_dir, "grid_search", "best_params_final.csv")
    if os.path.exists(tuned_path):
        try:
            df = pd.read_csv(tuned_path)
            params = df.iloc[0].to_dict() # convert parameters from csv to dictionary
            params["class_weight"] = "balanced"
            params["random_state"] = 42
            params["n_jobs"] = -1
            return params
        except Exception as e:
            print("Failed to read tuned params, using defaults. Error:", e)
    return dict(default_parameters)

# calculate metrics to evaluate random forest classifier performance
def classification_metrics(y_true, y_prob, threshold=0.5):
    y_true = np.asarray(y_true).astype(int)
    y_prob = np.asarray(y_prob) # predicted probability for positive class (low)
    y_pred = (y_prob >= threshold).astype(int)

    auc  = roc_auc_score(y_true, y_prob) if len(np.unique(y_true)) == 2 else np.nan
    sens = recall_score(y_true, y_pred, pos_label=1, zero_division=0) # recall -> TPR (positive / low = class 1)
    spec = recall_score(y_true, y_pred, pos_label=0, zero_division=0) # recall -> TNR (negative / middle-high = class 0)
    acc  = accuracy_score(y_true, y_pred)
    f1   = f1_score(y_true, y_pred, zero_division=0)
    mcc  = matthews_corrcoef(y_true, y_pred)

    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    prec = (tp / (tp + fp)) if (tp + fp) > 0 else np.nan
    npv  = (tn / (tn + fn)) if (tn + fn) > 0 else np.nan
    bal_acc = 0.5 * (sens + spec)

    return {
        "auroc": auc,
        "sensitivity": sens,
        "specificity": spec,
        "accuracy": acc,
        "precision": prec,
        "npv": npv,
        "f1": f1,
        "balanced_accuracy": bal_acc,
        "mcc": mcc,
        "tp": tp, "fp": fp, "tn": tn, "fn": fn,
        "threshold": threshold
    }

# find best threshold for sensitivity-specificity using youden's index
def find_best_threshold_youden(y_true, y_prob):
    fpr, tpr, thresholds = roc_curve(y_true, y_prob) # false positive rate & true positive rate (sensitivity)
    specificity = 1 - fpr
    J = tpr + specificity - 1 # calculate youden's index
    idx = np.argmax(J) # find index of maximum J value -> where combination of sensitivity and specificity is best
    best_thr = thresholds[idx] # find corresponding threshold
    return {
        "best_threshold": float(best_thr),
        "sensitivity": float(tpr[idx]), # at that threshold
        "specificity": float(specificity[idx]), # at that threshold
        "J": float(J[idx]) # youden's index
    }

# bootstrap classification metrics (mean + CI)
def bootstrap_classification_metrics(oof_df, threshold=baseline_threshold, n_boot=n_boot, ci=ci, random_state=random_state):
    y = oof_df["y_true"].to_numpy() # true labels
    p = oof_df["y_prob"].to_numpy() # predicted probabilities
    n = len(y)

    idxs = bootstrap_index_generator(n, n_boot, random_state)
    rows = []
    auprc_list = []

    for b in range(n_boot): # create bootstrap sample of y & p
        idx = idxs[b]
        yb, pb = y[idx], p[idx]
        rows.append(classification_metrics(yb, pb, threshold=threshold))
        auprc_list.append(average_precision_score(yb, pb) if np.unique(yb).size == 2 else np.nan)

    boot = pd.DataFrame(rows)
    boot["auprc"] = auprc_list

    out = {}
    for m in ["auroc","auprc","sensitivity","specificity","accuracy","precision","npv","f1","balanced_accuracy", "mcc"]: # compute metrics on bootstrapped sample
        lo, hi = confidence_intervals(boot[m].to_numpy(), ci=ci)
        out[f"{m}_mean"] = float(np.nanmean(boot[m]))
        out[f"{m}_ci_low"] = float(lo)
        out[f"{m}_ci_high"] = float(hi)

    out["threshold"] = threshold
    return boot, pd.DataFrame([out])

# plot ROC-curve (bootstrapped, with CI) with mean AUROC
def plot_roc_curve(oof_df, save_path, n_boot=n_boot, random_state=random_state):
    y = oof_df["y_true"].to_numpy()
    p = oof_df["y_prob"].to_numpy()
    n = len(y)
    fpr_grid = np.linspace(0, 1, 101)

    # bootstrap sampling
    idxs = bootstrap_index_generator(n, n_boot, random_state)
    tprs = np.empty((n_boot, fpr_grid.size), dtype=float)
    aucs = np.empty(n_boot, dtype=float)
    for b in range(n_boot):
        idx = idxs[b]
        fpr_b, tpr_b, _ = roc_curve(y[idx], p[idx])
        aucs[b] = roc_auc_score(y[idx], p[idx]) if len(np.unique(y[idx])) == 2 else np.nan
        tprs[b] = np.interp(fpr_grid, fpr_b, tpr_b, left=0.0, right=1.0)

    # mean TPR (ROC curve) with CI
    tpr_mean = np.nanmean(tprs, axis=0)
    tpr_lo   = np.nanquantile(tprs, 0.025, axis=0)
    tpr_hi   = np.nanquantile(tprs, 0.975, axis=0)

    # mean AUC
    auc_mean = np.nanmean(aucs)

    # plot
    plt.figure(figsize=(6,5))
    plt.plot(fpr_grid, tpr_mean, label=f"Mean ROC (bootstrapped)", color="C0")
    plt.fill_between(fpr_grid, tpr_lo, tpr_hi, alpha=0.2, label="95% CI", color="C0")
    plt.plot([0,1],[0,1], linestyle="--", linewidth=1, color="orange", label="Chance")
    plt.plot([], [], ' ', label=f"AUROC = {auc_mean:.3f}")
    plt.xlabel("False Positive Rate", fontsize=12)
    plt.ylabel("True Positive Rate (Sensitivity)", fontsize=12)
    plt.legend(loc="lower right", frameon=True, fancybox=True, framealpha=0.9)
    plt.gca().spines['top'].set_visible(False)
    plt.gca().spines['right'].set_visible(False)
    plt.tight_layout()
    plt.savefig(save_path, dpi=600, bbox_inches="tight")
    plt.close()


# plot confusion matrix for classifier's predictions
def plot_confusion_matrix(oof_df, save_path):
    # use labels produced by clf.predict
    y_true = oof_df["y_true"].to_numpy().astype(int)
    y_pred = oof_df["y_pred"].to_numpy().astype(int)

    cm = confusion_matrix(y_true, y_pred)
    cm_display = cm  # raw counts instead of percentages

    fig, ax = plt.subplots(figsize=(4.8, 4.2))
    im = ax.imshow(cm_display, cmap="Blues", interpolation="nearest")

    # colorbar
    cbar = plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
    cbar.ax.set_ylabel("Count", rotation=270, labelpad=12)

    ax.set_xticks([0, 1]); ax.set_yticks([0, 1])
    ax.set_xticklabels(["Middle-High (0)", "Low (1)"])
    ax.set_yticklabels(["Middle-High (0)", "Low (1)"])
    ax.set_ylabel("True", fontsize=12); ax.set_xlabel("Predicted", fontsize=12)

    fig.patch.set_facecolor("white")
    ax.set_facecolor("white")

    # annotations on top of matrix (white when too dark)
    vmax = cm_display.max() if cm_display.size else 0
    thresh = vmax / 2.0 if vmax > 0 else 0
    for i in range(2):
        for j in range(2):
            color = "white" if cm_display[i, j] > thresh else "black"
            ax.text(j, i, f"{cm_display[i, j]:.0f}", ha="center", va="center", color=color, fontsize=10)

    fig.tight_layout()
    plt.savefig(save_path, dpi=600, bbox_inches="tight", facecolor="white")
    plt.close()

# find threshold that maximizes F1 on precision-recall-curve
def find_best_threshold_f1(y_true, y_prob):
    # compute precision-recall curve from all possible thresholds, calculate f1 score
    p, r, t = precision_recall_curve(y_true, y_prob, pos_label=1)
    t = np.append(t, 1.0)
    f1 = np.where((p + r) > 0, 2 * p * r / (p + r), 0.0)
    # find threshold that maximizes F1
    idx = int(np.nanargmax(f1))
    return {
        "best_threshold": float(t[idx]),
        "precision": float(p[idx]),
        "recall": float(r[idx]),
        "f1": float(f1[idx]),
    }

# find threshold to achieve certain target recall
def find_threshold_for_target_recall(y_true, y_prob, target_recall=0.80):
    # compute precision-recall curve from all possible thresholds, calculate f1 score
    p, r, t = precision_recall_curve(y_true, y_prob, pos_label=1)
    t = np.append(t, 1.0)
    # indices where recall >= target
    idxs = np.where(r >= target_recall)[0]
    if len(idxs) == 0:
        # pick closest recall if none reach target
        idx = int(np.argmin(np.abs(r - target_recall)))
    else:
        idx = int(idxs[-1])  # highest threshold that still meets recall
    return {
        "threshold": float(t[idx]),
        "precision": float(p[idx]),
        "recall": float(r[idx]),
    }

# plot precision-recall curve
def plot_pr_curve(oof_df, save_path):
    y_true = oof_df["y_true"].to_numpy().astype(int)
    y_prob = oof_df["y_prob"].to_numpy().astype(float)

    precision, recall, _ = precision_recall_curve(y_true, y_prob, pos_label=1)
    ap = average_precision_score(y_true, y_prob)
    prev = y_true.mean()

    plt.figure(figsize=(6,5))
    plt.step(recall, precision, where="post", label="Precision-Recall curve")
    plt.hlines(prev, 0, 1, linestyles="--", linewidth=1,
               label=f"Baseline (prevalence={prev:.2f})")
    plt.plot([], [], ' ', label=f"AUPRC = {ap:.3f}")
    plt.xlabel("Recall", fontsize=12)
    plt.ylabel("Precision", fontsize=12)
    plt.legend(loc="upper right")
    plt.gca().spines['top'].set_visible(False)
    plt.gca().spines['right'].set_visible(False)
    plt.tight_layout()
    plt.savefig(save_path, dpi=600, bbox_inches="tight")
    plt.close()

# plot confusion matrix at other threshold
def plot_confusion_matrix_at_threshold(oof_df, threshold, save_path, normalize=False):
    y_true = oof_df["y_true"].to_numpy().astype(int)
    y_pred = (oof_df["y_prob"].to_numpy().astype(float) >= float(threshold)).astype(int)

    cm = confusion_matrix(y_true, y_pred, labels=[0,1])
    if normalize:
        with np.errstate(invalid="ignore", divide="ignore"):
            cm = cm.astype(float) / cm.sum(axis=1, keepdims=True)

    fig, ax = plt.subplots(figsize=(4.8, 4.2))
    im = ax.imshow(cm, cmap="Blues", interpolation="nearest")

    cbar = plt.colorbar(im, ax=ax, fraction=0.046, pad=0.04)
    cbar.ax.set_ylabel("Count" if not normalize else "Proportion", rotation=270, labelpad=12)

    ax.set_xticks([0,1]); ax.set_yticks([0,1])
    ax.set_xticklabels(["Middle-High (0)", "Low (1)"])
    ax.set_yticklabels(["Middle-High (0)", "Low (1)"])
    ax.set_ylabel("True", fontsize=12); ax.set_xlabel("Predicted", fontsize=12)

    vmax = np.nanmax(cm) if cm.size else 0
    thresh = vmax/2.0 if vmax>0 else 0
    for i in range(2):
        for j in range(2):
            val = cm[i, j]
            txt = f"{val:.2f}" if normalize else f"{int(val)}"
            ax.text(j, i, txt, ha="center", va="center",
                    color=("white" if val > thresh else "black"), fontsize=10)

    fig.tight_layout()
    plt.savefig(save_path, dpi=600, bbox_inches="tight", facecolor="white")
    plt.close()

# bootstrap probability-threshold at specific recall target & bootstrap metrics at that threshold
def bootstrap_recall_target(oof_df, target_recall=0.80, n_boot=n_boot, ci=ci, random_state=random_state):
    y = oof_df["y_true"].to_numpy().astype(int)
    p = oof_df["y_prob"].to_numpy().astype(float)
    n = len(y)

    idxs = bootstrap_index_generator(n, n_boot, random_state)
    rows = []

    for b in range(n_boot):
        idx = idxs[b]
        yb, pb = y[idx], p[idx]

        # skip resamples with only one class (cannot compute PR)
        if np.unique(yb).size < 2:
            continue

        prec, rec, thr = precision_recall_curve(yb, pb, pos_label=1)
        thr = np.append(thr, 1.0)  # len(thr) = len(rec) - 1

        meets = np.where(rec >= target_recall)[0]
        j = int(meets[-1]) if len(meets) else int(np.argmin(np.abs(rec - target_recall)))

        thr_b = float(thr[j])
        met = classification_metrics(yb, pb, threshold=thr_b)
        met.update({
            "threshold": thr_b,
            "precision_at_target": float(prec[j]),
            "recall_at_target": float(rec[j])
        })
        rows.append(met)

    boot_df = pd.DataFrame(rows)

    # bootstrapped mean + CI
    summary = {}
    for col in [
        "threshold", "precision_at_target", "recall_at_target",
        "accuracy", "sensitivity", "specificity", "precision",
        "f1", "balanced_accuracy", "mcc", "auroc"
    ]:
        if col in boot_df:
            arr = boot_df[col].to_numpy()
            mean = np.nanmean(arr)
            lo, hi = confidence_intervals(arr, ci=ci)
        else:
            mean, lo, hi = np.nan, np.nan, np.nan

        summary[f"{col}_mean"] = mean
        summary[f"{col}_ci_low"] = lo
        summary[f"{col}_ci_high"] = hi

    summary["target_recall"] = float(target_recall)
    summary_df = pd.DataFrame([summary])
    return boot_df, summary_df

# apply bootstrapping to finding the best thresholds
def bootstrap_all_thresholds(oof_df, youden_thr, f1_thr, rec80_thr, n_boot=n_boot, ci=ci, random_state=random_state, recall_target=0.80, bootstrap_threshold="reselect"  # "fixed" or "reselect"
):
    # fixed: bootstrap metrics at threshold found on full predictions
    # reselect: re-pick threshold inside each bootstrap

    pieces_raw   = []
    pieces_summ  = []

    # fixed thresholds (baseline, Youden-J, max-F1, recall80-fixed)
    thr_list = [
        ("baseline_0.50", 0.50),
        ("youdenJ", youden_thr),
        ("max_f1", f1_thr),
        ("recall80_fixed", rec80_thr)
    ]

    for name, thr in thr_list:
        boot_raw, boot_summ = bootstrap_classification_metrics(
            oof_df, threshold=thr, n_boot=n_boot, ci=ci, random_state=random_state
        )
        boot_raw = boot_raw.assign(threshold_type=name)
        boot_summ = boot_summ.assign(threshold_type=name)
        pieces_raw.append(boot_raw)
        pieces_summ.append(boot_summ)

    # recall target with reselection per bootstrap
    if bootstrap_threshold == "reselect":
        boot_rec_raw, boot_rec_summ = bootstrap_recall_target(
            oof_df, target_recall=recall_target, n_boot=n_boot, ci=ci, random_state=random_state
        )
        boot_rec_raw  = boot_rec_raw.assign(threshold_type=f"recall>={recall_target:.2f}_reselect")
        boot_rec_summ = boot_rec_summ.assign(threshold_type=f"recall>={recall_target:.2f}_reselect")
        pieces_raw.append(boot_rec_raw)
        pieces_summ.append(boot_rec_summ)

    all_raw  = pd.concat(pieces_raw, ignore_index=True)
    all_summ = pd.concat(pieces_summ, ignore_index=True)

    # order columns
    metric_cols = ["auroc","auprc","sensitivity","specificity","accuracy","precision","npv","f1","balanced_accuracy","mcc","threshold"]
    summ_cols = [c for c in all_summ.columns if c.endswith("_mean") or c.endswith("_ci_low") or c.endswith("_ci_high")]
    ordered = ["threshold_type"] + sorted(set(all_summ.columns) - set(summ_cols) - {"threshold_type"}) + sorted(summ_cols)
    all_summ = all_summ[ordered]
    return all_raw, all_summ


In [161]:
# SHAP helpers

# normalize to a single-class explanation
def to_single_output_explanation(expl, class_index=1):
    vals = np.array(expl.values)
    base = np.array(expl.base_values)
    data = np.array(expl.data)

    # (n_samples, n_features) -> already single-output (binary)
    if vals.ndim == 2:
        single = (vals.shape[0] == 1)
        return shap.Explanation(
            values=vals[0, :] if single else vals,
            # only flatten base when single sample; otherwise keep original shape
            base_values=float(np.ravel(base)[0]) if (single and base.size >= 1) else base,
            # only flatten data when single sample
            data=data[0, :] if (data.ndim >= 2 and data.shape[0] == 1) else data,
            feature_names=expl.feature_names
        )

    # (n_features,) -> single sample collapsed to 1D
    if vals.ndim == 1:
        b = float(np.ravel(base)[0]) if base.size >= 1 else base
        d = data if data.ndim == 1 else data[0, :]
        return shap.Explanation(values=vals, base_values=b, data=d, feature_names=expl.feature_names)

    # (n_samples, n_features, n_outputs) -> pick class_index
    if vals.ndim == 3:
        k = class_index if vals.shape[2] > class_index else 0
        vals_sel = vals[:, :, k]
        base_sel = base[:, k] if base.ndim == 2 else (base[k] if base.ndim == 1 else base)
        single = (vals_sel.shape[0] == 1)
        data_sel = data
        return shap.Explanation(
            values=vals_sel[0, :] if single else vals_sel,
            # only flatten base when single sample; otherwise keep original shape
            base_values=float(np.ravel(base_sel)[0]) if (single and np.size(base_sel) >= 1) else base_sel,
            # only flatten data when single sample
            data=data_sel[0, :] if (data_sel.ndim >= 2 and data_sel.shape[0] == 1) else data_sel,
            feature_names=expl.feature_names
        )

    raise ValueError(f"Unexpected SHAP values shape: {vals.shape}")

# get mean absolute SHAP values and beeswarm plot for top 20
def shap_beeswarm_and_importance(df_use, feature_columns, fold_col, rf_params, out_prefix, demographic_variables=demographic_variables):

    linguistic = get_linguistic_features()
    acoustic   = get_acoustic_features()
    def family_of(c):
        if c in linguistic: return "linguistic"
        if c in acoustic:   return "acoustic"
        if c in demographic_variables: return "demographics"
        return "other"
    superscript_for = {"linguistic":"¹", "acoustic":"²", "demographics":"³", "other":""}

    X_all = df_use[feature_columns].copy()
    y_all = df_use["is_low"].astype(int).to_numpy()
    folds = sorted(df_use[fold_col].unique())

    vals_list, data_list = [], []

    for f in folds:
        mask = (df_use[fold_col] == f)
        X_tr, y_tr = X_all.loc[~mask], y_all[~mask]
        X_te        = X_all.loc[mask]

        clf = RandomForestClassifier(**rf_params)
        clf.fit(X_tr, y_tr)

        # TreeExplainer for SHAP values
        explainer = shap.TreeExplainer(clf)  # default model_output="raw"
        ex_all = explainer(X_te)

        # ensure single-output (class 1)
        ex = to_single_output_explanation(ex_all, class_index=1)

        vals_list.append(np.array(ex.values))  # [n_te, n_feat]
        data_list.append(np.array(ex.data))    # [n_te, n_feat]

    V = np.vstack(vals_list)   # [N, F]
    D = np.vstack(data_list)   # [N, F]

    # label with superscripts for feature-categories
    names = list(feature_columns)
    labeled = [f"{n}{superscript_for[family_of(n)]}" for n in names]

    expl = shap.Explanation(values=V,
                            base_values=np.mean(V, axis=0),
                            data=D,
                            feature_names=labeled)

    # beeswarm plot for top 20
    shap.plots.beeswarm(expl, max_display=20, show=False)
    plt.figtext(0.0, 0.01, "¹ linguistic   ² acoustic   ³ demographics",
                ha="left", va="bottom", fontsize=9)
    plt.tight_layout(rect=[0, 0.03, 1, 1])
    fig = plt.gcf(); ax = plt.gca()
    fig.patch.set_facecolor("white"); ax.set_facecolor("white")
    plt.savefig(out_prefix + "_shap_beeswarm.png", dpi=600, bbox_inches="tight")
    plt.close()

    # save mean absolute SHAP values
    mean_abs = np.abs(V).mean(axis=0)
    feature_importances = (pd.DataFrame({"feature": names, "mean_abs_shap": mean_abs})
             .sort_values("mean_abs_shap", ascending=False))
    feature_importances.to_csv(out_prefix + "_shap_importance.csv", index=False)
    return feature_importances

# plot one individual (low performer) as SHAP waterfall plot
def plot_local_waterfall(task, target_score, model_name, subject_id=None,
                         outdir=out_dir, max_display=15, random_state=random_state):

    rf_params = load_tuned_rf_params()
    df = load_data_for(task, target_score)
    df, _, _ = compute_norms_and_labels(df, target_score)
    model_features = build_feature_sets(df.columns, demographic_variables)
    feature_columns = model_features[model_name]

    subset = [target_score] + feature_columns
    df_use = df.dropna(subset=subset).copy()
    X_all = df_use[feature_columns].copy()
    y_all = df_use["is_low"].astype(int).to_numpy()

    # builds predictions to pick low performer
    clf = RandomForestClassifier(**rf_params)
    oof_rows = []
    for f in sorted(df_use["fold"].unique()):
        mask = (df_use["fold"] == f)
        X_train, y_train = X_all.loc[~mask], y_all[~mask]
        X_test,  y_test  = X_all.loc[mask],  y_all[mask]
        clf.fit(X_train, y_train)
        y_prob = clf.predict_proba(X_test)[:, 1]
        oof_rows.append(pd.DataFrame({
            "Subject_ID": df_use.loc[mask, "Subject_ID"].values,
            "fold": f, "y_true": y_test, "y_prob": y_prob
        }))
    oof = pd.concat(oof_rows, ignore_index=True)

    # choose subject with highest predicted probability for being low performer
    if subject_id is None:
        lows = oof[oof["y_true"] == 1]
        if lows.empty:
            raise ValueError("No true low performers found in this subset.")
        subject_id = lows.sort_values("y_prob", ascending=False)["Subject_ID"].iloc[0]

    row = df_use[df_use["Subject_ID"] == subject_id]
    if row.empty:
        raise ValueError(f"Subject_ID {subject_id} not found.")
    s_fold = int(row["fold"].iloc[0])

    # trains model on folds except the subject's fold
    train_df = df_use[df_use["fold"] != s_fold].copy()
    test_row = row.copy()

    X_train = train_df[feature_columns]; y_train = train_df["is_low"].astype(int)
    X_test_one = test_row[feature_columns]; y_test_one = int(test_row["is_low"].iloc[0])

    # fit local model & compute SHAP
    clf_local = RandomForestClassifier(**rf_params)
    clf_local.fit(X_train, y_train)

    explainer = shap.TreeExplainer(clf_local)
    ex_all = explainer(X_test_one)

    ex = to_single_output_explanation(ex_all, class_index=1)

    # label features with superscripts
    superscript_for = {"linguistic": "¹", "acoustic": "²", "demographics": "³", "other": ""}
    linguistic = get_linguistic_features()
    acoustic   = get_acoustic_features()
    families = {
        c: ("linguistic" if c in linguistic else
            "acoustic"   if c in acoustic   else
            "demographics" if c in demographic_variables else "other")
        for c in feature_columns
    }

    # label with superscripts for feature-categories
    labeled = [f"{n}{superscript_for.get(families.get(n,'other'), '')}" for n in ex.feature_names]
    ex_labeled = shap.Explanation(values=ex.values, base_values=ex.base_values, data=ex.data, feature_names=labeled)

    prefix = f"{task}_{target_score}_{model_name}"
    save_prefix = os.path.join(outdir, prefix)

    # create waterfall plot
    pred_prob = float(clf_local.predict_proba(X_test_one)[:, 1][0])
    title = (f"Local SHAP Waterfall\n"
             f"Subject {subject_id} | True label: {y_test_one} | Predicted P(low)={pred_prob:.3f}")

    plt.figure(figsize=(10, 6))
    shap.plots.waterfall(ex_labeled, max_display=max_display, show=False)
    plt.title(title, fontsize=11)
    plt.figtext(0.00, 0.02, "¹ linguistic   ² acoustic   ³ demographics",
                ha="left", va="top", fontsize=10)
    plt.subplots_adjust(left=0.30, right=0.96, bottom=0.15, top=0.85)
    out_path = f"{save_prefix}_waterfall_subject-{subject_id}.png"
    fig = plt.gcf()
    ax  = plt.gca()
    fig.patch.set_facecolor("white")
    ax.set_facecolor("white")
    plt.savefig(out_path, dpi=600, bbox_inches="tight")
    plt.close()
    print("saved waterfall plot:", out_path)

    return {"subject_id": subject_id, "fold": s_fold, "pred_prob": pred_prob, "path": out_path}


In [162]:
# helper to run classification

# load data
def load_data_for(task, target_score):
    demo_use = demographics[["Subject_ID"] + demographic_variables].copy()
    df = load_task_dataframe(task_name=task, target=target_score,
                             scores=scores, demographics=demo_use)
    return df

# run classification for one task-score-combination
def run_classifier(task, target_score, model_name, outdir=out_dir, baseline_threshold=baseline_threshold, n_boot=n_boot, random_state=random_state):
    # use tuned hyperparameters
    rf_params = load_tuned_rf_params()

    # prepare data
    df = load_data_for(task, target_score)
    df, mean, std = compute_norms_and_labels(df, target_score) # compute the norms and z-values
    model_features = build_feature_sets(df.columns, demographic_variables)
    feature_columns = model_features[model_name]

    subset = [target_score] + feature_columns
    df_use = df.dropna(subset=subset).copy()
    X = df_use[feature_columns].copy()
    y = df_use["is_low"].to_numpy()

    clf = RandomForestClassifier(**rf_params)

    # fit once per fold, get y_prob and y_pred from that fitted model
    oof_rows = []
    for f in sorted(df_use["fold"].unique()):
        mask = (df_use["fold"] == f)
        X_train, y_train = X.loc[~mask], y[~mask]
        X_test,  y_test  = X.loc[mask],  y[mask]
        clf.fit(X_train, y_train)
        y_prob = clf.predict_proba(X_test)[:, 1] # probabilities for ROC/Youden/bootstrap
        y_pred = clf.predict(X_test).astype(int) # labels for baseline metrics
        oof_rows.append(pd.DataFrame({
            "Subject_ID": df_use.loc[mask, "Subject_ID"].values,
            "fold": f,
            "y_true": y_test,
            "y_prob": y_prob,
            "y_pred": y_pred
        }))
    oof = pd.concat(oof_rows, ignore_index=True)

    # calculate baseline metrics from y_pred labels, AUROC from y_prob
    y_true = oof["y_true"].to_numpy()
    y_pred = oof["y_pred"].to_numpy()
    y_prob = oof["y_prob"].to_numpy()

    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    sens = recall_score(y_true, y_pred, pos_label=1, zero_division=0)
    spec = recall_score(y_true, y_pred, pos_label=0, zero_division=0)
    acc  = accuracy_score(y_true, y_pred)
    prec = precision_score(y_true, y_pred, zero_division=0)
    f1   = f1_score(y_true, y_pred, zero_division=0)
    bal_acc = 0.5 * (sens + spec)
    mcc = matthews_corrcoef(y_true, y_pred)
    auc  = roc_auc_score(y_true, y_prob) if len(np.unique(y_true)) == 2 else np.nan
    ap = average_precision_score(y_true, y_prob) if len(np.unique(y_true)) == 2 else np.nan

    base_metrics = {
        "auroc": auc,
        "auprc": ap,
        "sensitivity": sens,
        "specificity": spec,
        "accuracy": acc,
        "precision": prec,
        "npv": (tn / (tn + fn)) if (tn + fn) > 0 else np.nan,
        "f1": f1,
        "balanced_accuracy": bal_acc,
        "mcc": mcc,
        "tp": tp, "fp": fp, "tn": tn, "fn": fn,
        "threshold": np.nan
    }

    # bootstrap baseline metrics
    boot_raw_b, boot_summ_b = bootstrap_classification_metrics(
        oof, threshold=baseline_threshold, n_boot=n_boot, ci=0.95, random_state=random_state
    )

    # for saving
    os.makedirs(outdir, exist_ok=True)
    prefix = f"{task}_{target_score}_{model_name}"

    # compute different thresholds and add metrics there
    youden = find_best_threshold_youden(y_true, y_prob)
    f1_best = find_best_threshold_f1(y_true, y_prob)
    rec80 = find_threshold_for_target_recall(y_true, y_prob, target_recall=0.80)

    boot_all_raw, boot_all_summ = bootstrap_all_thresholds(
        oof, youden_thr=youden["best_threshold"], f1_thr=f1_best["best_threshold"],
        rec80_thr=rec80["threshold"], n_boot=n_boot, ci=ci, random_state=random_state,
        recall_target=0.80, bootstrap_threshold="reselect"
    )

    boot_all_summ.to_csv(os.path.join(outdir, f"{prefix}_bootstrap_metrics_summary.csv"), index=False)

    # save threshold table
    thr_df = pd.DataFrame([
        {"task": task, "target": target_score, "model": model_name,
         "criterion": "max_f1", **f1_best},
        {"task": task, "target": target_score, "model": model_name,
         "criterion": "recall>=0.80", **rec80}
    ])
    thr_df.to_csv(os.path.join(outdir, f"{prefix}_pr_thresholds.csv"), index=False)

    # evaluate full metrics at those thresholds
    f1_metrics = classification_metrics(y_true, y_prob, threshold=f1_best["best_threshold"])
    rec_metrics = classification_metrics(y_true, y_prob, threshold=rec80["threshold"])
    youden_metrics = classification_metrics(y_true, y_prob, threshold=youden["best_threshold"])

    # combine all metrics into one dataframe
    metrics = pd.DataFrame([
        {"task": task, "target": target_score, "model": model_name,
         "threshold_type": "baseline_predict", **base_metrics},
        {"task": task, "target": target_score, "model": model_name,
         "threshold_type": "youdenJ", **youden_metrics, **youden},
        {"task": task, "target": target_score, "model": model_name,
         "threshold_type": "max_f1", **f1_metrics, **f1_best},
        {"task": task, "target": target_score, "model": model_name,
         "threshold_type": "recall>=0.80", **rec_metrics, **rec80}
    ])

    # save metrics based on random forest classifier pred & based on proba with youden's index + precision-recall based ones
    metrics.to_csv(os.path.join(outdir, f"{prefix}_metrics_summary.csv"), index=False)

    # save predictions
    oof.to_csv(os.path.join(outdir, f"{prefix}_oof.csv"), index=False)

    # plots
    plot_roc_curve(
        oof, save_path=os.path.join(outdir, f"{prefix}_roc_bootstrapped.png"),
        n_boot=n_boot, random_state=random_state
    )
    plot_confusion_matrix(
        oof,
        save_path=os.path.join(outdir, f"{prefix}_confusion_matrix.png")
    )
    plot_pr_curve(
        oof,
        save_path=os.path.join(outdir, f"{prefix}_precision-recall.png")
    )
    # confusion matrix: recall >= 0.80
    plot_confusion_matrix_at_threshold(
        oof, rec80["threshold"],
        save_path=os.path.join(outdir, f"{prefix}_cm_recall80.png"),
        normalize=False
    )
    # confusion matrix: balanced reference (max-F1)
    plot_confusion_matrix_at_threshold(
        oof, f1_best["best_threshold"],
        save_path=os.path.join(outdir, f"{prefix}_cm_maxF1.png"),
        normalize=False
    )
    # confusion matrix: Youden-J
    plot_confusion_matrix_at_threshold(
        oof, youden["best_threshold"],
        save_path=os.path.join(outdir, f"{prefix}_cm_youdenJ.png"),
        normalize=False
    )

    # SHAP (global) for full model
    if model_name == "full":
        try:
            out_prefix = os.path.join(outdir, prefix)
            shap_beeswarm_and_importance(
                df_use=df_use,
                feature_columns=feature_columns,
                fold_col="fold",
                rf_params=rf_params,
                out_prefix=out_prefix,
                demographic_variables=demographic_variables
            )
        except Exception as e:
            print("SHAP skipped due to error:", e)

    return {
        "oof": oof,
        "metrics_summary": metrics,
        "boot_baseline_summary": boot_summ_b
    }

# run classificaiton for different models of one task-score combination
def run_all_models(task: str, target: str, models=("demographics","acoustic","linguistic","acoustic+linguistic","full")):
    results = []
    for m in models:
        print(f"{task} - {target} - {m}")
        out = run_classifier(task, target, m)
        results.append(out["metrics_summary"].assign(task=task, target=target, model=m))
    return pd.concat(results, ignore_index=True)


In [163]:
# helper for hyperparameter-tuning
def classification_hyperparameter_tuning(
    n_iter_random=100, refine_with_grid=True, test_half_label=1, random_state=random_state
):
    outdir = os.path.join(out_dir, "grid_search")
    os.makedirs(outdir, exist_ok=True)

    task_name = "picnicScene"
    target = "SemanticFluencyScore"
    model_name = "full"

    # features
    features_path = os.path.join(GIT_DIRECTORY, "results", "features", "filtered", f"{task_name}_filtered2.csv")
    X_task = pd.read_csv(features_path)

    # scores
    scores_all = pd.read_csv(scores_csv)
    scores = scores_all[["Subject_ID", target]].dropna(subset=[target]).copy()

    # demographics
    demo = demographics[["Subject_ID"] + demographic_variables].copy()

    # folds for half-split
    folds = pd.read_csv(os.path.join(GIT_DIRECTORY, "data", "stratified_folds_grid_search.csv"))

    # merge
    df = (X_task.merge(demo, on="Subject_ID", how="left")
               .merge(scores, on="Subject_ID", how="inner")
               .merge(folds[["Subject_ID","fold"]], on="Subject_ID", how="inner"))

    linguistic = get_linguistic_features(); acoustic = get_acoustic_features()
    feature_columns = sorted(list(linguistic | acoustic)) + demographic_variables
    feature_columns = [c for c in feature_columns if c in df.columns]

    df = df.dropna(subset=[target] + feature_columns).reset_index(drop=True)

    # split in half
    train_df = df[df["fold"] != test_half_label].reset_index(drop=True)
    test_df  = df[df["fold"] == test_half_label].reset_index(drop=True)

    X_train = train_df[feature_columns].copy()
    X_test  = test_df[feature_columns].copy()

    # train M & SD for both halves (avoid leakage)
    y_train = (((train_df[target] - train_df[target].mean()) / train_df[target].std()) < -1).astype(int).to_numpy()
    y_test  = (((test_df[target]  - train_df[target].mean()) / train_df[target].std()) < -1).astype(int).to_numpy()

    print(f"{task_name} | full | train N={len(X_train)}, test N={len(X_test)}, features={len(feature_columns)}")

    # inner CV on train half only
    inner_cv = list(KFold(n_splits=5, shuffle=True, random_state=random_state).split(np.arange(len(X_train))))

    param_dist = {
        "n_estimators": randint(100, 1001),
        "max_depth": [None] + list(range(10, 31)),
        "max_features": ["sqrt", "log2"],
        "min_samples_split": randint(2, 11),
        "min_samples_leaf": randint(1, 6),
        "bootstrap": [True, False],
        "class_weight": ["balanced"]
    }

    # randomized search first
    rs = RandomizedSearchCV(
        estimator=RandomForestClassifier(random_state=random_state, n_jobs=-1),
        param_distributions=param_dist,
        n_iter=n_iter_random,
        scoring="roc_auc",
        cv=inner_cv,
        n_jobs=-1,
        verbose=2,
        random_state=random_state,
        refit=True,
        return_train_score=True
    )
    rs.fit(X_train, y_train)
    best_params = rs.best_params_
    best_cv = float(rs.best_score_)
    os.makedirs(outdir, exist_ok=True)
    pd.DataFrame(rs.cv_results_).to_csv(os.path.join(outdir, "randomized_search_cv_results.csv"), index=False)
    print("RandomizedSearch best params:", best_params)
    print("RandomizedSearch best inner-CV AUROC:", f"{best_cv:.3f}")

    final_params = best_params

    # grid search after
    if refine_with_grid:
        bp = best_params
        def win(center, radius, low, high, step=1):
            if center is None:
                return [None]
            a = max(low, int(center) - radius)
            b = min(high, int(center) + radius)
            return list(range(a, b + 1, step)) if a <= b else [int(center)]
        n_estimators_step = 50
        grid = {
            "n_estimators": win(bp["n_estimators"], radius=200, low=100, high=1000, step=n_estimators_step),
            "max_depth": [None] if bp["max_depth"] is None else win(bp["max_depth"], radius=3, low=10, high=30, step=1),
            "max_features": ["sqrt", "log2"],
            "min_samples_split": win(bp["min_samples_split"], radius=1, low=2, high=10, step=1),
            "min_samples_leaf": win(bp["min_samples_leaf"], radius=1, low=1, high=5, step=1),
            "bootstrap": [True, False],
            "class_weight": ["balanced"]
        }
        gs = GridSearchCV(
            estimator=RandomForestClassifier(random_state=random_state, n_jobs=-1),
            param_grid=grid,
            scoring="roc_auc",
            cv=inner_cv,
            n_jobs=-1,
            verbose=2,
            refit=True,
            return_train_score=False
        )
        gs.fit(X_train, y_train)
        best_params = gs.best_params_
        best_cv = float(gs.best_score_)
        pd.DataFrame(gs.cv_results_).to_csv(os.path.join(outdir, "grid_search_cv_results.csv"), index=False)
        print("GridSearch best params:", best_params)
        print("GridSearch best inner-CV AUROC:", f"{best_cv:.3f}")
        final_params = best_params

    pd.DataFrame([final_params]).to_csv(os.path.join(outdir, "best_params_final.csv"), index=False)
    return final_params


In [164]:
#_ = classification_hyperparameter_tuning(
#    n_iter_random=100,
#    refine_with_grid=True,
#    test_half_label=1,
#    random_state=random_state
#)

In [165]:
# run for all models and save results
results_df = run_all_models(task, target, models=("demographics","acoustic","linguistic","acoustic+linguistic","full"))
results_df.tail()

picnicScene - PhonemicFluencyScore - demographics


  f1 = np.where((p + r) > 0, 2 * p * r / (p + r), 0.0)


picnicScene - PhonemicFluencyScore - acoustic


  f1 = np.where((p + r) > 0, 2 * p * r / (p + r), 0.0)


picnicScene - PhonemicFluencyScore - linguistic


  f1 = np.where((p + r) > 0, 2 * p * r / (p + r), 0.0)


picnicScene - PhonemicFluencyScore - acoustic+linguistic


  f1 = np.where((p + r) > 0, 2 * p * r / (p + r), 0.0)


picnicScene - PhonemicFluencyScore - full


  f1 = np.where((p + r) > 0, 2 * p * r / (p + r), 0.0)


Unnamed: 0,task,target,model,threshold_type,auroc,auprc,sensitivity,specificity,accuracy,precision,...,balanced_accuracy,mcc,tp,fp,tn,fn,threshold,best_threshold,J,recall
15,picnicScene,PhonemicFluencyScore,acoustic+linguistic,recall>=0.80,0.581566,,0.805556,0.239001,0.321827,0.153439,...,0.522278,0.037266,116,640,201,28,0.138753,,,0.805556
16,picnicScene,PhonemicFluencyScore,full,baseline_predict,0.576637,0.208256,0.013889,0.996433,0.852792,0.4,...,0.505161,0.051314,2,3,838,142,,,,
17,picnicScene,PhonemicFluencyScore,full,youdenJ,0.576637,,0.423611,0.741974,0.695431,0.219424,...,0.582792,0.129977,61,217,624,83,0.236732,0.236732,0.165585,
18,picnicScene,PhonemicFluencyScore,full,max_f1,0.576637,,0.423611,0.741974,0.695431,0.219424,...,0.582792,0.129977,61,217,624,83,0.236732,0.236732,,0.423611
19,picnicScene,PhonemicFluencyScore,full,recall>=0.80,0.576637,,0.805556,0.24019,0.322843,0.153642,...,0.522873,0.038203,116,639,202,28,0.141571,,,0.805556


In [166]:
# add waterfall plot
waterfall_plot = plot_local_waterfall(task, target, model)
waterfall_plot

saved waterfall plot: /Users/gilanorup/Desktop/Studium/MSc/MA/code/masters_thesis_gn/results/classification/picnicScene_PhonemicFluencyScore_full_waterfall_subject-922.png


{'subject_id': np.int64(922),
 'fold': 4,
 'pred_prob': 0.5618482743255889,
 'path': '/Users/gilanorup/Desktop/Studium/MSc/MA/code/masters_thesis_gn/results/classification/picnicScene_PhonemicFluencyScore_full_waterfall_subject-922.png'}

In [167]:


def plot_multi_roc_overlay(
    task, target, models, outdir,
    n_boot=1000, random_state=42,
    use_saved_oof=True, oof_dict=None,
    file_prefix_template="{task}_{target}_{model}_oof.csv",
    save_name=None,
    show_ci="none", # "none" / "all" / "one"
    ci_model="full", # which model gets CI if show_ci=="one"
    legend_metric="none" # "none" / "mean"  -> "mean" = add AUROC values
):
    rng = np.random.RandomState(random_state)
    fpr_grid = np.linspace(0, 1, 101)

    # display model names for legend
    label_map = {
        "full": "full",
        "acoustic+linguistic": "acoustic + linguistic",
        "linguistic": "linguistic",
        "acoustic": "acoustic",
        "demographics": "demographic"
    }

    # load OOFs
    data = {}
    for m in models:
        if use_saved_oof:
            path = os.path.join(outdir, file_prefix_template.format(task=task, target=target, model=m))
            df = pd.read_csv(path)
        else:
            if oof_dict is None or m not in oof_dict:
                raise ValueError(f"Missing OOF data for model '{m}'.")
            df = oof_dict[m].copy()
        df = df[["Subject_ID","y_true","y_prob"]].dropna().copy()
        df["Subject_ID"] = df["Subject_ID"].astype(str)
        data[m] = df

    # align subjects (intersection)
    common = set.intersection(*(set(d["Subject_ID"]) for d in data.values()))
    for m in models:
        data[m] = (data[m][data[m]["Subject_ID"].isin(common)]
                   .sort_values("Subject_ID").reset_index(drop=True))
    N = len(next(iter(data.values())))
    if N == 0:
        raise ValueError("After alignment, there are 0 samples.")

    # bootstrap ROC per model
    results = {}
    idxs = rng.randint(0, N, size=(n_boot, N))
    for m in models:
        dfm = data[m]
        y = dfm["y_true"].to_numpy().astype(int)
        p = dfm["y_prob"].to_numpy().astype(float)

        tprs = np.empty((n_boot, fpr_grid.size))
        aucs = np.empty(n_boot)
        for b in range(n_boot):
            ii = idxs[b]
            yb, pb = y[ii], p[ii]
            if np.unique(yb).size < 2:
                tprs[b, :] = np.nan
                aucs[b] = np.nan
                continue
            fpr_b, tpr_b, _ = roc_curve(yb, pb)
            aucs[b] = roc_auc_score(yb, pb)
            tprs[b, :] = np.interp(fpr_grid, fpr_b, tpr_b, left=0.0, right=1.0)

        results[m] = {
            "tpr_mean": np.nanmean(tprs, axis=0),
            "tpr_lo":   np.nanquantile(tprs, 0.025, axis=0),
            "tpr_hi":   np.nanquantile(tprs, 0.975, axis=0),
            "auc_mean": float(np.nanmean(aucs)),
        }

    # plot
    plt.figure(figsize=(6.8, 5.6))
    for m in models:
        r = results[m]
        lw = 1.2
        z  = 2   if m != "full" else 3  # full line on top
        display_name = label_map.get(m, m)

        if legend_metric == "mean":
            label = f"{display_name} | AUROC = {r['auc_mean']:.3f}"
        else:
            label = display_name

        # CI (behind line)
        if show_ci == "all" or (show_ci == "one" and m == ci_model):
            plt.fill_between(
                fpr_grid, r["tpr_lo"], r["tpr_hi"],
                alpha=0.15, color=colors.get(m, None), zorder=1
            )

        # mean ROC line
        plt.plot(
            fpr_grid, r["tpr_mean"],
            label=label, linewidth=lw, color=colors.get(m, None), zorder=z
        )

    # chance
    plt.plot([0,1], [0,1], linestyle="--", linewidth=1,
             color="gray", label="Chance")

    plt.xlabel("False Positive Rate", fontsize=12)
    plt.ylabel("True Positive Rate (Sensitivity)", fontsize=12)
    plt.legend(loc="lower right", frameon=True, fancybox=True, framealpha=0.9)
    plt.grid(alpha=0.15, linestyle='--')
    ax = plt.gca()
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    plt.tight_layout()

    if save_name is None:
        save_name = f"{task}_{target}_all_ROC_curves.png"
    save_path = os.path.join(outdir, save_name)
    plt.savefig(save_path, dpi=600, bbox_inches="tight", facecolor="white")
    plt.close()
    print("Saved:", save_path)
    return results


In [168]:
overlay = plot_multi_roc_overlay(
    task,
    target,
    models=("demographics", "acoustic", "linguistic", "acoustic+linguistic", "full"),
    outdir=os.path.join(GIT_DIRECTORY, "results", "classification"),
    show_ci="one",
    ci_model="full",
    legend_metric="mean"   # <— key change
)


Saved: /Users/gilanorup/Desktop/Studium/MSc/MA/code/masters_thesis_gn/results/classification/picnicScene_PhonemicFluencyScore_all_ROC_curves.png


In [169]:
def plot_multi_pr_overlay(
    task,
    target,
    models,
    outdir,
    n_boot=1000,
    ci=0.95,
    random_state=42,
    use_saved_oof=True,
    oof_dict=None,
    file_prefix_template="{task}_{target}_{model}_oof.csv",
    save_name=None,
    legend_metric="mean", # "none" / "mean" (AUPRC)
    show_f1_for="full", # "all" / model name / None
):
    rng = np.random.RandomState(random_state)

    label_map = {
        "full": "full",
        "acoustic+linguistic": "acoustic + linguistic",
        "linguistic": "linguistic",
        "acoustic": "acoustic",
        "demographics": "demographic"
    }

    # load OOFs
    data = {}
    for m in models:
        if use_saved_oof:
            path = os.path.join(
                outdir,
                file_prefix_template.format(task=task, target=target, model=m)
            )
            df = pd.read_csv(path)
        else:
            if oof_dict is None or m not in oof_dict:
                raise ValueError(f"Missing OOF data for model '{m}'.")
            df = oof_dict[m].copy()

        df = df[["Subject_ID", "y_true", "y_prob"]].dropna().copy()
        df["Subject_ID"] = df["Subject_ID"].astype(str)
        data[m] = df

    # align on common subjects
    common = set.intersection(*(set(d["Subject_ID"]) for d in data.values()))
    if not common:
        raise ValueError("After alignment, there are 0 common samples across models.")

    for m in models:
        data[m] = (
            data[m][data[m]["Subject_ID"].isin(common)]
            .sort_values("Subject_ID")
            .reset_index(drop=True)
        )

    N = len(next(iter(data.values())))
    if N == 0:
        raise ValueError("No samples after alignment.")

    # shared bootstrap indices for AUPRC CIs
    idxs = bootstrap_index_generator(N, n_boot, random_state)
    alpha = 1.0 - ci

    results = {}

    # PR + bootstrapped AUPRC + max-F1 for each model
    for m in models:
        dfm = data[m]
        y = dfm["y_true"].to_numpy().astype(int)
        p = dfm["y_prob"].to_numpy().astype(float)

        # full-data PR curve
        pr_prec, pr_rec, _ = precision_recall_curve(y, p, pos_label=1)
        ap = average_precision_score(y, p)

        # F1-score
        f1_info = find_best_threshold_f1(y, p)

        # bootstrap AUPRC
        auprcs = np.empty(n_boot, dtype=float)
        for b in range(n_boot):
            ii = idxs[b]
            yb, pb = y[ii], p[ii]
            if np.unique(yb).size < 2:
                auprcs[b] = np.nan
            else:
                auprcs[b] = average_precision_score(yb, pb)

        ci_low = np.nanquantile(auprcs, alpha / 2.0)
        ci_high = np.nanquantile(auprcs, 1.0 - alpha / 2.0)

        results[m] = {
            "precision": pr_prec,
            "recall": pr_rec,
            "auprc": float(ap),
            "auprc_ci_low": float(ci_low),
            "auprc_ci_high": float(ci_high),
            "f1_info": f1_info,
        }

    # plot
    fig, ax = plt.subplots(figsize=(6.8, 5.6))
    fig.patch.set_facecolor("white")
    ax.set_facecolor("white")

    any_m = models[0]
    prevalence = data[any_m]["y_true"].mean()

    f1_value = None

    # draw PR curves + F1 marker
    for m in models:
        r = results[m]
        display_name = label_map.get(m, m)
        col = colors.get(m, None)
        lw = 1.2
        z = 2 if m != "full" else 3

        if legend_metric == "mean":
            label = f"{display_name} | AUPRC = {r['auprc']:.3f}"
        else:
            label = display_name

        ax.step(
            r["recall"],
            r["precision"],
            where="post",
            linewidth=lw,
            color=col,
            label=label,
            zorder=z,
        )

        # mark F1 point for chosen model(s), but no separate legend yet
        if show_f1_for in ("all", m):
            fi = r["f1_info"]
            ax.scatter(
                [fi["recall"]],
                [fi["precision"]],
                s=36,
                facecolors="white",
                edgecolors=col if col is not None else "black",
                zorder=z + 1,
            )
            if m == show_f1_for:
                f1_value = fi["f1"]

    # baseline
    baseline_label = f"Baseline (prevalence = {prevalence:.2f})"
    ax.hlines(
        prevalence,
        0.0,
        1.0,
        linestyles="--",
        linewidth=1,
        color="gray",
        label=baseline_label,
    )

    # base legend entries so far
    handles, labels = ax.get_legend_handles_labels()

    # add extra legend entry for F1 below baseline entry
    if show_f1_for in models and f1_value is not None:
        f1_marker = plt.Line2D(
            [0], [0],
            marker="o",
            linestyle="None",
            markerfacecolor="white",
            markeredgecolor=colors.get(show_f1_for, "black"),
            markersize=6,
        )
        f1_label = f"max F1 ({show_f1_for} model) = {f1_value:.3f}"
        handles.append(f1_marker)
        labels.append(f1_label)

    ax.legend(
        handles,
        labels,
        loc="upper right",
        frameon=True,
        fancybox=True,
        framealpha=0.9,
    )

    ax.set_xlabel("Recall", fontsize=12)
    ax.set_ylabel("Precision", fontsize=12)
    ax.grid(alpha=0.15, linestyle='--')
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)

    fig.tight_layout()

    if save_name is None:
        save_name = f"{task}_{target}_all_PR_curves.png"

    save_path = os.path.join(outdir, save_name)
    fig.savefig(save_path, dpi=600, bbox_inches="tight", facecolor="white")
    plt.close(fig)

    print("Saved:", save_path)
    return results


In [170]:
pr_overlay = plot_multi_pr_overlay(
    task,
    target,
    models=("demographics", "acoustic", "linguistic", "acoustic+linguistic", "full"),
    outdir=os.path.join(GIT_DIRECTORY, "results", "classification"),
    legend_metric="mean",
    show_f1_for="full"
)


  f1 = np.where((p + r) > 0, 2 * p * r / (p + r), 0.0)
  f1 = np.where((p + r) > 0, 2 * p * r / (p + r), 0.0)
  f1 = np.where((p + r) > 0, 2 * p * r / (p + r), 0.0)
  f1 = np.where((p + r) > 0, 2 * p * r / (p + r), 0.0)
  f1 = np.where((p + r) > 0, 2 * p * r / (p + r), 0.0)


Saved: /Users/gilanorup/Desktop/Studium/MSc/MA/code/masters_thesis_gn/results/classification/picnicScene_PhonemicFluencyScore_all_PR_curves.png
