# Train spontaneous  here and test with spontaneous

In [1]:
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import balanced_accuracy_score, f1_score

In [2]:
path = "/mnt/d/Roshidat_Msc_Project/Audio_parkinson/pd&Hc_multi/spontaneous_h_fusion/spontaneous_fused_readtext.npz"

In [3]:
data = np.load(path)
X, y, ids = data["X"], data["y"], data["ids"]

train_data, test_data, train_labels, test_labels = train_test_split(X, y, test_size=0.2, random_state=42)

svm = make_pipeline(StandardScaler(with_mean=True, with_std=True),
                    SVC(kernel="rbf", C=1.0, gamma="scale", class_weight="balanced"))

In [4]:
svm.fit(train_data, train_labels)
y_predict = svm.predict(test_data)

print("SVM  bal-acc:", balanced_accuracy_score(test_labels, y_predict), "macro-F1:", f1_score(test_labels, y_predict, average="macro"))

SVM  bal-acc: 0.9 macro-F1: 0.873015873015873


# RandomForest – scaling not necessary

In [5]:
rf = RandomForestClassifier(n_estimators=400, class_weight="balanced", max_depth=None, random_state=42)
rf.fit(train_data, train_labels)
yp = rf.predict(test_data)
print("RF   bal-acc:", balanced_accuracy_score(test_labels, y_predict), "macro-F1:", f1_score(test_labels, y_predict, average="macro"))

RF   bal-acc: 0.9 macro-F1: 0.873015873015873


# Calibrated Classifier CV
quick check on the accuracy

In [6]:
import re, numpy as np
from sklearn.model_selection import StratifiedGroupKFold, GridSearchCV
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.metrics import balanced_accuracy_score, f1_score, confusion_matrix, classification_report

data = np.load("/mnt/d/Roshidat_Msc_Project/Audio_parkinson/pd&Hc_multi/spontaneous_h_fusion/spontaneous_fused_readtext.npz", allow_pickle=False)
X, y, ids = data["X"], data["y"], data["ids"]
patients = np.array([re.match(r"^ID\d+", i).group(0) if re.match(r"^ID\d+", i) else i.split("_")[0] for i in ids])

# 1) patient-stratified CV score (5 folds or 3 if tiny)
cv = StratifiedGroupKFold(n_splits=5, shuffle=True, random_state=42)
pipe = make_pipeline(StandardScaler(with_mean=True, with_std=True),
                     SVC(kernel="rbf", class_weight="balanced", probability=True))

ba, f1 = [], []
for tr, te in cv.split(X, y, groups=patients):
    pipe.fit(X[tr], y[tr])
    yp = pipe.predict(X[te])
    ba.append(balanced_accuracy_score(y[te], yp))
    f1.append(f1_score(y[te], yp, average="macro"))
print(f"SVM (patient CV) BA {np.mean(ba):.3f}±{np.std(ba):.3f} | F1 {np.mean(f1):.3f}±{np.std(f1):.3f}")

# 2) quick hyperparam tuning on CV (still grouped)
param_grid = {"svc__C":[0.3,1,3,10], "svc__gamma":["scale", 0.01, 0.03, 0.1]}
gs = GridSearchCV(pipe, param_grid, scoring="balanced_accuracy", cv=cv, n_jobs=-1)
gs.fit(X, y, groups=patients)
print("Best params:", gs.best_params_, "CV BA:", gs.best_score_)

# 3) Train final model on ALL Spontaneous and inspect errors via CV last split
best = gs.best_estimator_
tr, te = list(cv.split(X, y, groups=patients))[-1]
best.fit(X[tr], y[tr])
yp = best.predict(X[te])
print("Fold confusion:\n", confusion_matrix(y[te], yp))
print(classification_report(y[te], yp, digits=3))


SVM (patient CV) BA 0.983±0.033 | F1 0.958±0.085
Best params: {'svc__C': 0.3, 'svc__gamma': 'scale'} CV BA: 0.9833333333333334
Fold confusion:
 [[3 0]
 [0 4]]
              precision    recall  f1-score   support

           0      1.000     1.000     1.000         3
           1      1.000     1.000     1.000         4

    accuracy                          1.000         7
   macro avg      1.000     1.000     1.000         7
weighted avg      1.000     1.000     1.000         7



In [7]:
import re, numpy as np
from sklearn.model_selection import LeaveOneGroupOut
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.metrics import balanced_accuracy_score, f1_score

# X,y,ids from fused_spontaneous.npz
patients = np.array([re.match(r"^ID\d+", i).group(0) if re.match(r"^ID\d+", i) else i.split("_")[0] for i in ids])

pipe = make_pipeline(StandardScaler(with_mean=True, with_std=True),
                     SVC(kernel="rbf", C=0.3, gamma="scale", class_weight="balanced"))
logo = LeaveOneGroupOut()
ba, f1 = [], []
for tr, te in logo.split(X, y, groups=patients):
    pipe.fit(X[tr], y[tr])
    yp = pipe.predict(X[te])
    ba.append(balanced_accuracy_score(y[te], yp))
    f1.append(f1_score(y[te], yp, average="macro"))
print(f"LOPOCV | BA {np.mean(ba):.3f}±{np.std(ba):.3f} | F1 {np.mean(f1):.3f}±{np.std(f1):.3f}")


LOPOCV | BA 0.972±0.164 | F1 0.972±0.164




In [8]:
from sklearn.model_selection import StratifiedGroupKFold
from sklearn.metrics import balanced_accuracy_score, f1_score

cv = StratifiedGroupKFold(n_splits=5, shuffle=True, random_state=42)
y_true, y_pred = [], []
for tr, te in cv.split(X, y, groups=patients):
    pipe.fit(X[tr], y[tr])
    y_true.extend(y[te]); y_pred.extend(pipe.predict(X[te]))
y_true = np.array(y_true); y_pred = np.array(y_pred)

def boot_ci(metric_fn, y_true, y_pred, B=2000, rng=0):
    rng = np.random.default_rng(rng)
    stats = []
    n = len(y_true)
    for _ in range(B):
        idx = rng.integers(0, n, n)
        stats.append(metric_fn(y_true[idx], y_pred[idx]))
    stats = np.sort(stats)
    return stats[int(0.025*B)], stats[int(0.975*B)]

ba = balanced_accuracy_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred, average="macro")
ba_lo, ba_hi = boot_ci(balanced_accuracy_score, y_true, y_pred)
f1_lo, f1_hi = boot_ci(lambda a,b: f1_score(a,b,average="macro"), y_true, y_pred)
print(f"BA {ba:.3f} (95% CI {ba_lo:.3f}–{ba_hi:.3f}) | F1 {f1:.3f} (95% CI {f1_lo:.3f}–{f1_hi:.3f})")


BA 0.969 (95% CI 0.893–1.000) | F1 0.972 (95% CI 0.908–1.000)


 i want to do ablation study and external validation

In [13]:
import re, numpy as np
path = "/mnt/d/Roshidat_Msc_Project/Audio_parkinson/pd&Hc_multi/spontaneous_h_fusion/spontaneous_fused_readtext.npz"
data = np.load(path, allow_pickle=False)
X, y, ids = data["X"], data["y"], data["ids"]

D_TEXT, D_AUDIO, D_CLIP = 768, 768, 512
sl_text  = slice(0, D_TEXT)
sl_audio = slice(D_TEXT, D_TEXT + D_AUDIO)
sl_clip  = slice(D_TEXT + D_AUDIO, D_TEXT + D_AUDIO + D_CLIP)

# patient grouping: split by patient to avoid speaker leakage
patients = np.array([re.match(r"^ID\d+", i).group(0) if re.match(r"^ID\d+", i) else i.split("_")[0] for i in ids])

In [14]:
from sklearn.model_selection import StratifiedGroupKFold
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import balanced_accuracy_score, f1_score

def eval_cv(Xsub, y, groups, clf, n_splits=5, seed=42):
    cv = StratifiedGroupKFold(n_splits=n_splits, shuffle=True, random_state=seed)
    ba, f1 = [], []
    for tr, te in cv.split(Xsub, y, groups=groups):
        clf.fit(Xsub[tr], y[tr])
        yp = clf.predict(Xsub[te])
        ba.append(balanced_accuracy_score(y[te], yp))
        f1.append(f1_score(y[te], yp, average="macro"))
    return float(np.mean(ba)), float(np.std(ba)), float(np.mean(f1)), float(np.std(f1))



In [15]:
svm = make_pipeline(
    StandardScaler(with_mean=True, with_std=True),
    SVC(kernel="rbf", C=1.0, gamma="scale", class_weight="balanced")
)
rf = RandomForestClassifier(
    n_estimators=400, class_weight="balanced", max_depth=None, random_state=42
)


In [16]:
def make_view(X, slices):
    return np.concatenate([X[:, s] for s in slices], axis=1)

ablations = {
    "text_only":   [sl_text],
    "audio_only":  [sl_audio],
    "clip_only":   [sl_clip],
    "text+audio":  [sl_text, sl_audio],
    "text+clip":   [sl_text, sl_clip],
    "audio+clip":  [sl_audio, sl_clip],
    "all_three":   [sl_text, sl_audio, sl_clip],
}

print("=== SVM (RBF) ===")
for name, sls in ablations.items():
    Xsub = make_view(X, sls)
    ba_m, ba_s, f1_m, f1_s = eval_cv(Xsub, y, patients, svm)
    print(f"{name:12s} | BA {ba_m:.3f}±{ba_s:.3f} | F1 {f1_m:.3f}±{f1_s:.3f}")

print("\n=== RandomForest ===")
for name, sls in ablations.items():
    Xsub = make_view(X, sls)
    ba_m, ba_s, f1_m, f1_s = eval_cv(Xsub, y, patients, rf)
    print(f"{name:12s} | BA {ba_m:.3f}±{ba_s:.3f} | F1 {f1_m:.3f}±{f1_s:.3f}")


=== SVM (RBF) ===
text_only    | BA 0.983±0.033 | F1 0.958±0.085
audio_only   | BA 0.983±0.033 | F1 0.958±0.085
clip_only    | BA 0.983±0.033 | F1 0.958±0.085
text+audio   | BA 0.983±0.033 | F1 0.958±0.085
text+clip    | BA 0.983±0.033 | F1 0.958±0.085
audio+clip   | BA 0.983±0.033 | F1 0.958±0.085
all_three    | BA 0.983±0.033 | F1 0.958±0.085

=== RandomForest ===
text_only    | BA 0.983±0.033 | F1 0.958±0.085
audio_only   | BA 0.983±0.033 | F1 0.958±0.085
clip_only    | BA 0.983±0.033 | F1 0.958±0.085
text+audio   | BA 0.983±0.033 | F1 0.958±0.085
text+clip    | BA 0.983±0.033 | F1 0.958±0.085
audio+clip   | BA 0.983±0.033 | F1 0.958±0.085
all_three    | BA 0.983±0.033 | F1 0.958±0.085


In [17]:
rng = np.random.default_rng(123)
perm_bas = []
for i in range(50):  # 50 permutations
    y_perm = rng.permutation(y)
    Xsub = make_view(X, [sl_text, sl_audio, sl_clip])  # all features
    ba_m, _, _, _ = eval_cv(Xsub, y_perm, patients, svm, n_splits=5, seed=42+i)
    perm_bas.append(ba_m)

print("Permutation BA mean±sd over 50 runs:", np.mean(perm_bas), "±", np.std(perm_bas))




Permutation BA mean±sd over 50 runs: 0.49539523809523817 ± 0.11340795472545714


In [18]:
# A) Exact duplicates across samples?
import hashlib
hashes = [hashlib.md5(X[i].tobytes()).hexdigest() for i in range(X.shape[0])]
dups = [ids[i] for i in range(X.shape[0]) if hashes.count(hashes[i]) > 1]
print("Duplicate feature vectors:", sorted(set(dups))[:10])

# B) Do any patients appear in both train and test folds in a manual split?
# (If you are not using GroupKFold elsewhere, switch to it.)


Duplicate feature vectors: []


In [19]:
import re, numpy as np
from collections import Counter
from sklearn.model_selection import StratifiedKFold
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.metrics import balanced_accuracy_score, f1_score

# X, y, ids loaded from fused_readtext.npz
# patient id: "ID00_hc_0_0_0" -> "ID00"
patients = np.array([re.match(r"^ID\d+", i).group(0) if re.match(r"^ID\d+", i) else i.split("_")[0] for i in ids])

# one label per patient (majority vote)
pat2ys = {}
for pid, yy in zip(patients, y):
    pat2ys.setdefault(pid, []).append(int(yy))
patients_unique = np.array(sorted(pat2ys.keys()))
patient_labels = np.array([Counter(pat2ys[pid]).most_common(1)[0][0] for pid in patients_unique])

# choose K so every fold can contain both classes
min_per_class = min(np.bincount(patient_labels))
K = min(5, max(2, min_per_class))   # e.g., 3 if classes are small
skf = StratifiedKFold(n_splits=K, shuffle=True, random_state=42)

svm = make_pipeline(StandardScaler(with_mean=True, with_std=True),
                    SVC(kernel="rbf", C=1.0, gamma="scale", class_weight="balanced"))

def eval_patient_folds(X, y, patients, patients_unique, patient_labels, clf):
    ba, f1 = [], []
    # map sample->patient index
    pid_to_idx = {pid:i for i,pid in enumerate(patients_unique)}
    sample_pat_idx = np.array([pid_to_idx[p] for p in patients])

    for tr_pat, te_pat in skf.split(patients_unique, patient_labels):
        tr_mask = np.isin(sample_pat_idx, tr_pat)
        te_mask = np.isin(sample_pat_idx, te_pat)
        # sanity: both classes present in test
        if len(np.unique(y[te_mask])) < 2:
            # skip or merge with neighbor fold; easiest is to reduce K and rerun
            continue
        clf.fit(X[tr_mask], y[tr_mask])
        yp = clf.predict(X[te_mask])
        ba.append(balanced_accuracy_score(y[te_mask], yp))
        f1.append(f1_score(y[te_mask], yp, average="macro"))
    return np.mean(ba), np.std(ba), np.mean(f1), np.std(f1)

ba_m, ba_s, f1_m, f1_s = eval_patient_folds(X, y, patients, patients_unique, patient_labels, svm)
print(f"SVM (patient-stratified {K}-fold) | BA {ba_m:.3f}±{ba_s:.3f} | F1 {f1_m:.3f}±{f1_s:.3f}")


SVM (patient-stratified 5-fold) | BA 0.967±0.067 | F1 0.969±0.062


In [20]:
rng = np.random.default_rng(123)
perm = []
for i in range(50):
    y_perm = rng.permutation(y)
    ba_m, _, _, _ = eval_patient_folds(X, y_perm, patients, patients_unique, patient_labels, svm)
    perm.append(ba_m)
print("Permutation (patient-folds) BA mean±sd:", np.mean(perm), "±", np.std(perm))


Permutation (patient-folds) BA mean±sd: 0.49162142857142854 ± 0.11569581689276955
