In [1]:
# ============================================
# MER Benchmark: CLAP embeddings + gold labels
# ============================================

# ---------- Parameters ----------
# Input CSVs. They are joined on a `filename` column (the embeddings loader also
# accepts `file_name`), so only songs present in both files are used.
LABELS_CSV = "final_gold_combined.csv"      # gold top-3 labels per song
EMBEDS_CSV = "clap_embeddings.csv"    # your CLAP *audio* embeddings per song

OUT_DIR       = "mer_benchmark_outputs"   # all JSON / CSV artifacts are written under this directory
TEST_SIZE     = 0.20     # fraction of the full dataset held out for testing
VAL_SIZE      = 0.10     # fraction of the FULL dataset used for validation
                         # (the split code rescales it by 1/(1-TEST_SIZE) for the post-test remainder)
RANDOM_STATE  = 42       # seed passed to the splits and to the seeded estimators
CALIBRATE_THRESHOLDS = True   # set False to skip per-class threshold tuning

# Emotion tag vocabulary (order matters)
# Column j of every label / probability matrix corresponds to TAGS[j].
TAGS = [
    "Stimulating", "Playful", "Soothing", "Sensory-Calming",
    "Grounding", "Focusing", "Transitional", "Anxiety-Reduction"
]

In [None]:
# ---------- Imports ----------
import os, json, ast, warnings
from pathlib import Path
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (
    f1_score, jaccard_score, accuracy_score,
    precision_score, recall_score
)
from sklearn.multiclass import OneVsRestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.neighbors import KNeighborsClassifier

warnings.filterwarnings("ignore", category=UserWarning)

In [3]:
# ---------- Utilities ----------
def ensure_outdir(path):
    """Create directory `path`, including missing parents; do nothing if it already exists."""
    target = Path(path)
    target.mkdir(exist_ok=True, parents=True)

def load_labels(labels_csv, tags):
    """
    Read the gold-label CSV and build a multi-hot label matrix.

    Parameters
    ----------
    labels_csv : path of a CSV containing at least the columns
        'filename', 'final_top1', 'final_top2', 'final_top3'.
    tags : sequence of tag names; column j of the returned matrix is tags[j].

    Returns
    -------
    (df, Y) where `df` is the loaded frame with 'filename' cast to str and
    whitespace-stripped, and `Y` is an int array of shape (len(df), len(tags))
    with Y[i, j] == 1 iff tags[j] appears in any of the three label columns of row i.

    Raises
    ------
    ValueError if a required column is missing.
    """
    df = pd.read_csv(labels_csv)
    label_cols = ["final_top1", "final_top2", "final_top3"]
    missing = {"filename", *label_cols} - set(df.columns)
    if missing:
        raise ValueError(f"Label CSV missing columns: {missing}")

    # normalize filename
    df["filename"] = df["filename"].astype(str).str.strip()

    # Match on whitespace-stripped strings: filenames are stripped above, and a
    # padded label cell such as " Playful" would otherwise silently become 0.
    # The label columns stored in `df` are left exactly as read.
    chosen = df[label_cols].astype(str).apply(lambda col: col.str.strip())

    # multi-hot, one column per tag (pre-sized so empty inputs keep a 2-D shape)
    Y = np.zeros((len(df), len(tags)), dtype=int)
    for j, tag in enumerate(tags):
        Y[:, j] = chosen.eq(tag).any(axis=1).to_numpy()
    return df, Y

def parse_embedding_row(row):
    """
    Extract one embedding vector from a DataFrame row (a pandas Series).

    Two storage layouts are supported:
    - a single 'embedding' entry holding a list / ndarray, or a string containing a
      Python-literal (JSON-like) list, or
    - one numeric value per column, in which case every entry that converts to
      float is collected in column order, skipping the ID columns
      ('filename', 'file_name', 'path', 'song', 'track'; case-insensitive).

    If an 'embedding' string cannot be parsed, the numeric-column layout is used
    as a fallback.

    Returns a float ndarray.
    """
    if "embedding" in row.index:
        val = row["embedding"]
        if isinstance(val, (list, np.ndarray)):
            return np.array(val, dtype=float)
        if isinstance(val, str):
            try:
                return np.array(ast.literal_eval(val), dtype=float)
            except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
                # Not a parseable numeric literal: fall back to the per-column layout.
                pass

    id_columns = {"filename", "file_name", "path", "song", "track"}
    numeric = []
    for col, v in row.items():
        # Column labels are not guaranteed to be strings (e.g. integer headers),
        # so stringify before lower-casing instead of calling col.lower() directly.
        if str(col).lower() in id_columns:
            continue
        try:
            numeric.append(float(v))
        except (TypeError, ValueError):
            # Non-numeric cell (text, None, nested object): not part of the vector.
            continue
    return np.array(numeric, dtype=float)

def load_embeddings(embeds_csv):
    """
    Read the embeddings CSV and return (filenames_df, X).

    The filename column may be called 'filename' or 'file_name'; failing that, the
    first column whose name contains both "file" and "name" is used. Filenames are
    cast to str and whitespace-stripped. Each row is turned into a vector by
    `parse_embedding_row` and the vectors are stacked row-wise.

    Returns
    -------
    (df, X): `df` is a one-column frame ['filename']; `X` is the stacked float matrix,
    row i belonging to df.iloc[i].

    Raises
    ------
    ValueError if no filename column can be found, if the CSV has no data rows,
    or if the parsed rows do not all have the same embedding width.
    """
    df = pd.read_csv(embeds_csv)

    if "filename" not in df.columns:
        if "file_name" in df.columns:
            df = df.rename(columns={"file_name": "filename"})
        else:
            # try to infer
            for c in df.columns:
                lowered = str(c).lower()
                if "file" in lowered and "name" in lowered:
                    df = df.rename(columns={c: "filename"})
                    break
    if "filename" not in df.columns:
        raise ValueError("Embeddings CSV must contain a 'filename' column (or 'file_name').")
    df["filename"] = df["filename"].astype(str).str.strip()

    X_rows = [parse_embedding_row(r) for _, r in df.iterrows()]
    if not X_rows:
        raise ValueError(f"Embeddings CSV contains no data rows: {embeds_csv}")

    # np.vstack needs identical trailing dimensions; check up front so the error
    # names the offending files instead of surfacing as a bare concatenate failure.
    expected = np.atleast_2d(X_rows[0]).shape[1:]
    bad = [
        fn for fn, vec in zip(df["filename"], X_rows)
        if np.atleast_2d(vec).shape[1:] != expected
    ]
    if bad:
        raise ValueError(
            f"Inconsistent embedding size: expected {expected} (taken from the first row) "
            f"but {len(bad)} row(s) differ, e.g. {bad[:5]}"
        )

    X = np.vstack(X_rows)
    return df[["filename"]].copy(), X

def match_and_merge(labels_df, Y, embeds_df, X):
    """
    Inner-join labels and embeddings on 'filename' and align Y / X to the merged rows.

    Parameters
    ----------
    labels_df, Y : label frame and its label matrix (Y[i] belongs to labels_df row i).
    embeds_df, X : embedding frame and its feature matrix (X[i] belongs to embeds_df row i).

    Returns
    -------
    (merged, x_sel, y_sel) where row i of x_sel and y_sel belongs to merged.iloc[i].

    Raises
    ------
    ValueError if the two frames share no filename.
    """
    # Carry the original row positions through the merge. A filename -> index dict
    # keeps only the LAST occurrence of a duplicated filename, so merged rows would
    # be paired with another row's labels/features; the positional columns make
    # every merged row point back to exactly the rows it was built from.
    label_pos, embed_pos = "__label_row__", "__embed_row__"
    left = labels_df.assign(**{label_pos: np.arange(len(labels_df))})
    right = embeds_df.assign(**{embed_pos: np.arange(len(embeds_df))})
    merged = left.merge(right, on="filename", how="inner")
    if merged.empty:
        raise ValueError("No filenames in common between the label and embedding tables.")

    y_sel = np.vstack([Y[i] for i in merged[label_pos].to_numpy()])
    x_sel = np.vstack([X[i] for i in merged[embed_pos].to_numpy()])
    merged = merged.drop(columns=[label_pos, embed_pos])
    return merged, x_sel, y_sel

def summarize(merged_df, X, Y, tags):
    """
    Print a short overview of the merged dataset: row count, matrix shapes,
    the number of positives per tag and, when a 'source' column exists,
    its value counts.
    """
    header = (
        f"Total merged songs: {len(merged_df)}",
        f"Feature matrix: {X.shape}   (n_songs, emb_dim)",
        f"Label matrix:   {Y.shape}   (n_songs, n_tags={len(tags)})",
    )
    print(*header, sep="\n")

    positives_per_tag = Y.sum(axis=0)
    print("\nPer-tag positive counts:")
    for tag, n_pos in zip(tags, positives_per_tag):
        print(f"  {tag:18s}: {int(n_pos)}")

    if "source" not in merged_df.columns:
        return
    print("\nSource breakdown:")
    print(merged_df["source"].value_counts())

def sigmoid(x):
    """
    Numerically stable logistic function 1 / (1 + exp(-x)).

    Accepts a scalar or array-like; returns a NumPy float scalar for scalar input
    and an ndarray of the same shape otherwise.

    The naive form overflows in exp(-x) for large negative x (emitting a
    RuntimeWarning). Here the exponential is only ever taken of a non-positive
    number, so it stays in (0, 1]:
        x >= 0 : 1 / (1 + exp(-x))
        x <  0 : exp(x) / (1 + exp(x))
    """
    x = np.asarray(x, dtype=float)
    e = np.exp(-np.abs(x))
    out = np.where(x >= 0, 1.0 / (1.0 + e), e / (1.0 + e))
    # Indexing with () turns a 0-d result back into a scalar and is a no-op view otherwise.
    return out[()]

def multilabel_metrics(y_true, y_prob, threshold=0.5):
    """
    Binarize `y_prob` with `y_prob >= threshold` and score it against `y_true`.

    Returns (metrics, y_pred): `metrics` maps micro/macro F1, micro/macro Jaccard
    and subset accuracy to Python floats; `y_pred` is the int prediction matrix.
    """
    y_pred = (y_prob >= threshold).astype(int)

    # (output key, scorer, averaging mode) -- insertion order defines the dict order
    averaged_scores = (
        ("micro_f1", f1_score, "micro"),
        ("macro_f1", f1_score, "macro"),
        ("micro_jaccard", jaccard_score, "micro"),
        ("macro_jaccard", jaccard_score, "macro"),
    )
    metrics = {}
    for key, scorer, avg in averaged_scores:
        metrics[key] = float(scorer(y_true, y_pred, average=avg, zero_division=0))
    metrics["subset_accuracy"] = float(accuracy_score(y_true, y_pred))

    return metrics, y_pred

def per_class_report(y_true, y_pred, tags):
    """
    Build a per-tag table of precision, recall and F1 (zero_division=0).

    Column i of `y_true` / `y_pred` is scored as tag `tags[i]`. The returned
    DataFrame has columns tag / precision / recall / f1 and is sorted by tag name.
    """
    def _scores_for(col, tag):
        truth, pred = y_true[:, col], y_pred[:, col]
        return {
            "tag": tag,
            "precision": float(precision_score(truth, pred, zero_division=0)),
            "recall":    float(recall_score(truth, pred, zero_division=0)),
            "f1":        float(f1_score(truth, pred, zero_division=0)),
        }

    report = pd.DataFrame([_scores_for(col, tag) for col, tag in enumerate(tags)])
    return report.sort_values("tag")

def topk_overlap_metrics(y_true, y_prob, k=3):
    """
    Compare, per sample, the k highest-scoring classes with the gold label set.

    For each row the Jaccard index, precision@k (hits / k) and recall@k
    (hits / number of gold labels) are computed, with 0.0 used whenever a
    denominator is zero. Returns the three means plus `k` in a dict.
    """
    def _safe_ratio(num, den):
        return num / den if den > 0 else 0.0

    jaccards, precisions, recalls = [], [], []
    for i in range(len(y_true)):
        ranked = np.argsort(-y_prob[i])          # descending by score
        predicted = set(ranked[:k].tolist())
        gold = set(np.where(y_true[i] == 1)[0].tolist())
        hits = len(predicted & gold)

        jaccards.append(_safe_ratio(hits, len(predicted | gold)))
        precisions.append(_safe_ratio(hits, k))
        recalls.append(_safe_ratio(hits, len(gold)))

    summary = {}
    summary["mean_jaccard_at_k"] = float(np.mean(jaccards))
    summary["mean_precision_at_k"] = float(np.mean(precisions))
    summary["mean_recall_at_k"] = float(np.mean(recalls))
    summary["k"] = int(k)
    return summary

def calibrate_thresholds_per_class(y_true_val, y_prob_val, grid=None):
    """
    Pick, independently for every class column, the threshold from `grid` that
    gives the highest validation F1 (zero_division=0).

    `grid` defaults to 0.20, 0.25, ..., 0.80. On ties the earliest grid value wins;
    a class keeps 0.5 if the grid is empty. Returns a float array with one
    threshold per class.
    """
    candidates = np.linspace(0.2, 0.8, 13) if grid is None else grid
    n_classes = y_true_val.shape[1]
    thresholds = np.full(n_classes, 0.5, dtype=float)

    for cls in range(n_classes):
        truth_c = y_true_val[:, cls]
        prob_c = y_prob_val[:, cls]
        scored = [
            (f1_score(truth_c, (prob_c >= t).astype(int), zero_division=0), t)
            for t in candidates
        ]
        if scored:
            # max() keeps the first maximal element, i.e. the earliest threshold on ties
            thresholds[cls] = max(scored, key=lambda pair: pair[0])[1]
    return thresholds

def apply_thresholds(y_prob, thresholds):
    """Binarize `y_prob` column-wise: entry (i, j) is 1 iff y_prob[i, j] >= thresholds[j]."""
    per_column_cutoff = thresholds[np.newaxis, :]   # shape (1, n_classes) for row broadcasting
    is_positive = np.greater_equal(y_prob, per_column_cutoff)
    return is_positive.astype(int)

def evaluate_split(y_true, y_prob, tag_names, thresh=0.5, per_class=False):
    """
    Score one data split.

    Returns a dict with the thresholded multilabel metrics under "metrics" and the
    top-3 overlap metrics under "topk@3"; when `per_class` is true, the per-tag
    precision/recall/F1 table is added under "per_class".
    """
    metrics, y_pred = multilabel_metrics(y_true, y_prob, threshold=thresh)
    report = {
        "metrics": metrics,
        "topk@3": topk_overlap_metrics(y_true, y_prob, k=3),
    }
    if not per_class:
        return report
    report["per_class"] = per_class_report(y_true, y_pred, tag_names)
    return report

In [None]:
# ---------- Load & Merge ----------
# Reads both CSVs, inner-joins them on filename and prints a dataset summary.
ensure_outdir(OUT_DIR)

labels_df, Y_all = load_labels(LABELS_CSV, TAGS)
embeds_df, X_all = load_embeddings(EMBEDS_CSV)

# After this call row i of X and Y is meant to describe merged_df.iloc[i].
merged_df, X, Y = match_and_merge(labels_df, Y_all, embeds_df, X_all)
summarize(merged_df, X, Y, TAGS)

# ---------- Train/Val/Test split ----------
# Stratify by final_top1 for stability (simple proxy for label distribution)
y_primary = merged_df["final_top1"].astype(str)

# First cut: hold out TEST_SIZE of everything.
# idx_trv / idx_test are positional row indices into merged_df (and X / Y).
X_trv, X_test, Y_trv, Y_test, idx_trv, idx_test = train_test_split(
    X, Y, np.arange(len(Y)), test_size=TEST_SIZE, random_state=RANDOM_STATE, stratify=y_primary
)

# Second cut: carve validation out of the train+val remainder.
# VAL_SIZE/(1-TEST_SIZE) rescales VAL_SIZE so that validation is VAL_SIZE of the FULL dataset.
# NOTE: idx_train / idx_val index into X_trv / Y_trv, NOT into merged_df;
# use idx_trv[idx_train] to get back to merged_df rows.
y_primary_trv = y_primary.iloc[idx_trv]
X_train, X_val, Y_train, Y_val, idx_train, idx_val = train_test_split(
    X_trv, Y_trv, np.arange(len(Y_trv)),
    test_size=VAL_SIZE/(1-TEST_SIZE), random_state=RANDOM_STATE, stratify=y_primary_trv
)

print(f"\nSplit sizes → Train: {X_train.shape[0]} | Val: {X_val.shape[0]} | Test: {X_test.shape[0]}")


In [5]:
# ---------- Standardize features ----------
# The scaler is fitted on the training split only and then applied unchanged to
# validation and test, so no statistics from held-out data enter the features.
scaler = StandardScaler(with_mean=True, with_std=True)
X_train_s = scaler.fit_transform(X_train)
X_val_s   = scaler.transform(X_val)
X_test_s  = scaler.transform(X_test)

In [None]:
# ---------- Baselines ----------
# (name, estimator) pairs. Every estimator is wrapped in OneVsRestClassifier so one
# binary classifier is trained per tag. `name` is used as the key in `results` and
# as the filename prefix of the per-class CSVs.
# All estimators that accept a seed receive RANDOM_STATE so a re-run of the
# notebook reproduces the same numbers.
baselines = [
    ("logreg_ovr", OneVsRestClassifier(LogisticRegression(
        solver="liblinear", C=1.0, max_iter=200, random_state=RANDOM_STATE
    ))),

    ("linear_svm_ovr", OneVsRestClassifier(LinearSVC(
        C=1.0, random_state=RANDOM_STATE
    ))),  # uses decision_function; we'll map to probs via sigmoid

    ("rf_ovr", OneVsRestClassifier(RandomForestClassifier(
        n_estimators=400, max_depth=None, n_jobs=-1, random_state=RANDOM_STATE
    ))),

    ("mlp_ovr", OneVsRestClassifier(MLPClassifier(
        hidden_layer_sizes=(512,), activation="relu", alpha=1e-4,
        max_iter=200, early_stopping=True, n_iter_no_change=10, random_state=RANDOM_STATE
    ))),

    ("knn_ovr", OneVsRestClassifier(KNeighborsClassifier(
        n_neighbors=15, weights="distance"
    ))),
]

# Filled by the training loop: results[model_name] -> val / test / calibrated_test entries.
results = {}
# Directory that receives one per-class CSV per model and split.
ensure_outdir(os.path.join(OUT_DIR, "per_model"))

def model_predict_proba(ovr_model, Xs):
    """
    Return per-class scores in [0, 1] for a fitted multilabel model.

    Strategy, in order:
    1. `predict_proba`, if the model exposes it and the call succeeds. A list of
       per-class arrays is collapsed to one column per class (the second column
       of each 2-D array, or the array itself when it is 1-D).
    2. `decision_function` squashed through `sigmoid` (e.g. for LinearSVC).
    3. Hard `predict` output cast to float.

    Returns numpy array (n_samples, n_classes)
    """
    if hasattr(ovr_model, "predict_proba"):
        try:
            probs = ovr_model.predict_proba(Xs)
            if isinstance(probs, list):  # some OvR impls return list of arrays
                columns = []
                for per_class in probs:
                    columns.append(per_class[:, 1] if per_class.ndim == 2 else per_class)
                probs = np.column_stack(columns)
            return probs
        except Exception:
            # best effort: any failure here falls through to the next strategy
            pass

    if not hasattr(ovr_model, "decision_function"):
        # Worst-case: use predict and cast to {0,1} "probabilities"
        return ovr_model.predict(Xs).astype(float)

    margins = ovr_model.decision_function(Xs)
    if margins.ndim == 1:  # single class edge case
        margins = margins[:, None]
    return sigmoid(margins)

# Train every baseline on the standardized training split, score it on validation
# and test at a fixed 0.5 threshold, optionally re-score the test split with
# per-class thresholds tuned on validation, then store / save / print the results.
for name, model in baselines:
    print(f"\n=== Training: {name} ===")
    model.fit(X_train_s, Y_train)

    # Probabilities on val/test
    Y_val_prob  = model_predict_proba(model, X_val_s)
    Y_test_prob = model_predict_proba(model, X_test_s)

    # --- Default threshold 0.5 ---
    val_eval  = evaluate_split(Y_val,  Y_val_prob, TAGS, thresh=0.5, per_class=True)
    test_eval = evaluate_split(Y_test, Y_test_prob, TAGS, thresh=0.5, per_class=True)

    # --- Optional: per-class threshold calibration on VAL ---
    calib = None
    test_eval_cal = None
    if CALIBRATE_THRESHOLDS:
        ths = calibrate_thresholds_per_class(Y_val, Y_val_prob)
        # Reuse the shared metric helper instead of repeating its five score calls:
        # `y_prob >= ths` broadcasts the (n_tags,) vector across rows, which yields
        # the same predictions as apply_thresholds(Y_test_prob, ths).
        metrics_cal, Y_test_pred_cal = multilabel_metrics(Y_test, Y_test_prob, threshold=ths)
        test_eval_cal = {
            "metrics": metrics_cal,
            "per_class": per_class_report(Y_test, Y_test_pred_cal, TAGS),
            "thresholds": ths.tolist()
        }
        calib = ths.tolist()

    results[name] = {
        "val": {
            "metrics": val_eval["metrics"],
            "topk@3":  val_eval["topk@3"],
        },
        "test": {
            "metrics": test_eval["metrics"],
            "topk@3":  test_eval["topk@3"],
        },
        "calibrated_test" : test_eval_cal,   # may be None
    }

    # Save per-class csvs
    per_model_dir = os.path.join(OUT_DIR, "per_model")
    val_eval["per_class"].to_csv(os.path.join(per_model_dir, f"{name}_per_class_val.csv"), index=False)
    test_eval["per_class"].to_csv(os.path.join(per_model_dir, f"{name}_per_class_test.csv"), index=False)
    if test_eval_cal is not None:
        test_eval_cal["per_class"].to_csv(os.path.join(per_model_dir, f"{name}_per_class_test_calibrated.csv"), index=False)

    # Print quick summary (one line per evaluated setting, same format for all)
    summary_rows = [
        ("[VAL]", results[name]["val"]["metrics"]),
        ("[TEST]", results[name]["test"]["metrics"]),
    ]
    if test_eval_cal is not None:
        summary_rows.append(("[TEST-Calib]", test_eval_cal["metrics"]))
    for split_label, split_metrics in summary_rows:
        print("  {} micro F1: {:.3f} | macro F1: {:.3f} | micro J: {:.3f}".format(
            split_label,
            split_metrics["micro_f1"],
            split_metrics["macro_f1"],
            split_metrics["micro_jaccard"],
        ))

In [None]:
# # ---------- Save all metrics ----------
# ensure_outdir(OUT_DIR)
# with open(os.path.join(OUT_DIR, "all_models_metrics.json"), "w") as f:
#     json.dump(results, f, indent=2)

# print("\nSaved all model metrics to:", os.path.join(OUT_DIR, "all_models_metrics.json"))
# ---------- Save all metrics (JSON-safe summary) ----------
from copy import deepcopy
import numpy as np
import json, os

def to_builtin(x):
    """
    Convert a NumPy value to its JSON-serializable Python equivalent.

    np.bool_ -> bool, NumPy integers -> int, NumPy floats -> float,
    ndarray -> (nested) list. Anything else is returned unchanged.
    """
    # np.bool_ is neither np.integer nor np.floating and json.dump rejects it,
    # so it needs its own branch.
    if isinstance(x, np.bool_):
        return bool(x)
    if isinstance(x, np.integer):
        return int(x)
    if isinstance(x, np.floating):
        return float(x)
    if isinstance(x, np.ndarray):
        return x.tolist()
    return x

# Build a JSON-safe copy of `results`: per model, the val/test metrics and top-k
# numbers, plus (when present) the calibrated-test metrics and thresholds.
# The per-class DataFrames are deliberately left out; they are saved as CSVs elsewhere.
serializable = {}
for name, res in results.items():
    entry = {
        "val":   {"metrics": {}, "topk@3": {}},
        "test":  {"metrics": {}, "topk@3": {}},
    }
    # metrics + topk@3 (already dicts of numbers)
    for split in ("val", "test"):
        for k, v in res[split]["metrics"].items():
            entry[split]["metrics"][k] = to_builtin(v)
        for k, v in res[split]["topk@3"].items():
            entry[split]["topk@3"][k] = to_builtin(v)

    # calibrated test (omit per-class DF; keep thresholds + metrics)
    # res["calibrated_test"] is None when threshold calibration was disabled,
    # in which case the key is simply absent from the JSON entry.
    if res.get("calibrated_test"):
        cal = res["calibrated_test"]
        entry["calibrated_test"] = {
            "metrics": {k: to_builtin(v) for k, v in cal["metrics"].items()},
            "thresholds": [float(t) for t in cal.get("thresholds", [])],
        }

    serializable[name] = entry

# Write the summary next to the per-model CSV directory.
ensure_outdir(OUT_DIR)
with open(os.path.join(OUT_DIR, "all_models_metrics.json"), "w") as f:
    json.dump(serializable, f, indent=2)

print("\nSaved summary JSON to:", os.path.join(OUT_DIR, "all_models_metrics.json"))
print("Per-class CSVs are already saved under:", os.path.join(OUT_DIR, "per_model"))


In [None]:
# ---------- (Optional) Evaluate human vs CLAP-only subsets on TEST ----------
# Runs only when the merged table has a 'source' column. Test rows are split into
# those whose source starts with "Human" and those whose source equals "CLAP_only",
# and each fitted baseline is scored on both subsets at a fixed 0.5 threshold.
if "source" in merged_df.columns:
    print("\n=== Extra: Human vs CLAP-only subsets (TEST) ===")

    test_idx_orig = idx_test  # indices into merged_df
    # NOTE(review): test_fns and fn_to_pos are not used anywhere in this cell.
    test_fns = merged_df.iloc[test_idx_orig]["filename"].tolist()
    fn_to_pos = {fn:i for i, fn in enumerate(merged_df["filename"].tolist())}

    # build a mask per subset over Y_test order
    # Missing sources become "" and therefore fall into neither subset.
    # NOTE(review): s.startswith assumes every non-missing source value is a string.
    test_sources = merged_df.iloc[test_idx_orig]["source"].fillna("").tolist()
    human_mask = np.array([s.startswith("Human") for s in test_sources])
    clap_only_mask = np.array([s == "CLAP_only" for s in test_sources])

    def subset_eval(mask, model_name, probs):
        # Returns the metrics dict for the masked test rows, or None for an empty subset.
        # NOTE(review): model_name is accepted but not used.
        if not mask.any():
            return None
        return evaluate_split(Y_test[mask], probs[mask], TAGS, thresh=0.5, per_class=False)["metrics"]

    subset_summary = {}
    for name, model in baselines:
        # reuse probabilities computed above by re-running predict (cheap)
        # (the models must already have been fitted by the training loop)
        probs = model_predict_proba(model, X_test_s)
        subset_summary[name] = {
            "human_subset"    : subset_eval(human_mask,    name, probs),
            "clap_only_subset": subset_eval(clap_only_mask, name, probs),
        }

    with open(os.path.join(OUT_DIR, "subset_test_metrics.json"), "w") as f:
        json.dump(subset_summary, f, indent=2)
    print("Saved subset metrics to:", os.path.join(OUT_DIR, "subset_test_metrics.json"))
else:
    print("\n(no 'source' column in labels — skipping human vs CLAP-only subset eval)")