In [1]:
# ============================================================
#  DYSARTHRIA DETECTION PIPELINE — KAGGLE SAFE VERSION
#  - No Parselmouth (all features from librosa / numpy)
#  - Works with train/val/test sheets in XLSX
# ============================================================

import os
import numpy as np
import pandas as pd
import librosa
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score, f1_score, classification_report
from xgboost import XGBClassifier

# ------------------------------------------------------------
# 1. Load metadata sheets
# ------------------------------------------------------------
# Workbook holding the Task 1 metadata sheets (hard-coded Kaggle input path).
meta_path = "/kaggle/input/train-sand/task1/sand_task_1.xlsx"

# One DataFrame per sheet. build_table() reads the "ID", "Age", "Sex" and
# "Class" columns from whichever sheet it is given.
train_df = pd.read_excel(meta_path, sheet_name="Training Baseline - Task 1")
val_df   = pd.read_excel(meta_path, sheet_name="Validation Baseline - Task 1")
# NOTE(review): extra_df is not referenced again in the visible notebook —
# confirm it is still needed.
extra_df = pd.read_excel(meta_path, sheet_name="SAND - TRAINING set - Task 1") 

# Root directory; recordings are looked up as BASE/<folder>/<ID>_<folder>.wav.
BASE = "/kaggle/input/train-sand/task1/training"

# Task name -> sub-folder name under BASE. Keys and values are identical here;
# only the values (folder names) are used when building file paths.
TASK_DIRS = {
    "phonationA": "phonationA",
    "phonationE": "phonationE",
    "phonationI": "phonationI",
    "phonationO": "phonationO",
    "phonationU": "phonationU",
    "rhythmPA":  "rhythmPA",
    "rhythmTA":  "rhythmTA",
    "rhythmKA":  "rhythmKA",
}

# ============================================================
#  Feature extraction functions (Kaggle-safe)
# ============================================================

def get_mfcc(y, sr):
    """Summarise the 13 MFCCs of a signal by their mean and standard deviation.

    Args:
        y: audio samples passed straight to ``librosa.feature.mfcc``.
        sr: sampling rate of ``y``.

    Returns:
        1-D array: the per-coefficient means followed by the per-coefficient
        standard deviations (both taken along axis 1 of the MFCC matrix).
    """
    coeffs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
    means = coeffs.mean(axis=1)
    stds = coeffs.std(axis=1)
    return np.concatenate([means, stds])


# ---------- LPC Formants (Kaggle-safe) ----------
def get_formants(y, sr, order=12):
    """Estimate the first three formant frequencies (Hz) from LPC roots.

    The signal is pre-emphasised, an LPC polynomial of the given order is
    fitted, and the angles of its roots with non-negative imaginary part are
    converted to frequencies. Only frequencies strictly between 90 and
    5000 Hz are kept; the three lowest are returned.

    Args:
        y: audio samples.
        sr: sampling rate of ``y``.
        order: LPC order.

    Returns:
        Float array of length 3. Slots that could not be filled are 0.0, and
        the whole array is zeros if the LPC fit fails (best-effort feature).
    """
    try:
        y_pre = librosa.effects.preemphasis(y)
        # `order` must be passed by keyword: recent librosa releases make it
        # keyword-only, and a positional call raises TypeError, which the
        # handler below would silently turn into all-zero formants.
        lpc_coeffs = librosa.lpc(y_pre, order=order)
        roots = np.roots(lpc_coeffs)
        roots = roots[np.imag(roots) >= 0]
        freqs = np.sort(np.angle(roots) * (sr / (2 * np.pi)))
        formants = [f for f in freqs if 90 < f < 5000]
        formants += [0.0] * max(0, 3 - len(formants))
        return np.array(formants[:3], dtype=float)
    except Exception:
        # Deliberate best-effort: a failed fit yields zeros instead of
        # aborting extraction. `Exception` (not a bare except) so that
        # KeyboardInterrupt / SystemExit still propagate.
        return np.zeros(3)


# ---------- Jitter (cycle variability without Praat) ----------
def get_jitter(y, sr):
    """Jitter proxy: coefficient of variation of zero-crossing spacing.

    Returns 0 when fewer than two zero crossings are found; otherwise the
    standard deviation of the time between consecutive crossings divided by
    their mean (plus 1e-6 to avoid division by zero).
    """
    crossings = np.nonzero(librosa.zero_crossings(y, pad=False))[0]
    if len(crossings) < 2:
        return 0
    gaps = np.diff(crossings) / sr
    spread = np.std(gaps)
    centre = np.mean(gaps) + 1e-6
    return spread / centre


# ---------- Shimmer (amplitude variability) ----------
def get_shimmer(y):
    """Shimmer proxy: coefficient of variation of the frame-wise RMS amplitude.

    Returns std(rms) / (mean(rms) + 1e-6) over the first row of
    ``librosa.feature.rms``.
    """
    rms_track = librosa.feature.rms(y=y)[0]
    spread = np.std(rms_track)
    level = np.mean(rms_track) + 1e-6
    return spread / level


# ---------- HNR (harmonic to noise ratio using HPSS) ----------
def get_hnr(y):
    """HNR proxy in dB from harmonic/percussive source separation.

    The signal is split with ``librosa.effects.hpss``; the ratio of the
    harmonic component's energy to the percussive component's energy
    (plus 1e-6) is returned as 10*log10(ratio).

    The harmonic energy is floored at 1e-12 so that a signal with no
    harmonic energy yields a large negative finite value rather than -inf,
    which downstream scaling and model fitting cannot handle.
    """
    y_harm, y_perc = librosa.effects.hpss(y)
    harm_energy = max(np.sum(y_harm ** 2), 1e-12)
    noise_energy = np.sum(y_perc ** 2) + 1e-6
    return 10 * np.log10(harm_energy / noise_energy)


# ---------- CPP (simple spectral peak prominence) ----------
def get_cpp(y, sr):
    """Simple peak-prominence measure on the dB magnitude spectrogram.

    Restricts the STFT magnitude (in dB) to bins whose frequency lies
    strictly between 80 and 4000 Hz and returns max minus mean over that
    region (all frames pooled).
    """
    magnitude = np.abs(librosa.stft(y))
    spec_db = librosa.amplitude_to_db(magnitude)
    bin_freqs = librosa.fft_frequencies(sr=sr)
    in_band = (bin_freqs > 80) & (bin_freqs < 4000)
    band = spec_db[in_band]
    return band.max() - band.mean()


# ---------- Energy ----------
def get_energy(y):
    """Return (mean, std) of the squared samples of ``y``."""
    power = y ** 2
    avg_power = np.mean(power)
    power_spread = np.std(power)
    return avg_power, power_spread


# ---------- VOT / rhythmic stability for PA / TA / KA ----------
def get_vot(y, sr):
    """Inter-onset interval statistics, used as a rhythm-stability proxy.

    Onsets are detected on the onset-strength envelope and the gaps between
    consecutive onsets are summarised.

    Args:
        y: audio samples.
        sr: sampling rate of ``y``.

    Returns:
        Float array ``[mean, std, count, max]`` of the inter-onset intervals
        in seconds, or four zeros when fewer than two onsets are found.
    """
    onset_env = librosa.onset.onset_strength(y=y, sr=sr)
    # Ask for onset positions in seconds. By default onset_detect returns
    # frame indices, and dividing frame differences by `sr` does not give
    # seconds.
    onset_times = librosa.onset.onset_detect(
        onset_envelope=onset_env, sr=sr, units="time"
    )

    if len(onset_times) < 2:
        return np.zeros(4)

    intervals = np.diff(onset_times)
    return np.array([intervals.mean(), intervals.std(), len(intervals), intervals.max()])


# ============================================================
#  Extract features for a single audio file
# ============================================================
def extract_features(path):
    """Extract all acoustic features for one audio file.

    Args:
        path: path to the audio file.

    Returns:
        ``None`` if ``path`` does not exist. Otherwise a dict with keys
        ``mfcc_<i>``, ``F1``/``F2``/``F3``, ``jitter``, ``shimmer``, ``HNR``,
        ``CPP``, ``energy_mean``, ``energy_std`` and, for rhythm recordings
        only, ``vot_mean``, ``vot_std``, ``vot_count``, ``vot_max``.
    """
    if not os.path.exists(path):
        return None

    # Audio is resampled to 16 kHz on load.
    y, sr = librosa.load(path, sr=16000)
    feats = {}

    # MFCCs
    for i, v in enumerate(get_mfcc(y, sr)):
        feats[f"mfcc_{i}"] = v

    # Formants
    feats["F1"], feats["F2"], feats["F3"] = get_formants(y, sr)

    # Voice quality
    feats["jitter"] = get_jitter(y, sr)
    feats["shimmer"] = get_shimmer(y)
    feats["HNR"] = get_hnr(y)
    feats["CPP"] = get_cpp(y, sr)

    # Energy
    feats["energy_mean"], feats["energy_std"] = get_energy(y)

    # VOT only for rhythm tasks. Match the task marker against the file name
    # only: testing the whole path would also fire on directory names that
    # happen to contain "PA"/"TA"/"KA" (e.g. a folder called "DATA").
    filename = os.path.basename(path)
    if any(marker in filename for marker in ("PA", "TA", "KA")):
        vot = get_vot(y, sr)
        feats["vot_mean"] = vot[0]
        feats["vot_std"] = vot[1]
        feats["vot_count"] = vot[2]
        feats["vot_max"] = vot[3]

    return feats


# ============================================================
#  Extract features per subject
# ============================================================
def extract_subject(ID):
    """Collect task-prefixed features for every recording of one subject.

    NOTE: ``extract_subject`` is defined a second time further down this
    file; that later definition replaces this one when the cell runs.

    Args:
        ID: subject identifier used in the file name ``<ID>_<folder>.wav``.

    Returns:
        Dict mapping ``<folder>_<feature>`` to its value. Tasks whose audio
        file is missing contribute no keys.
    """
    feat_dict = {}

    # Only the folder names are needed to build the paths.
    for folder in TASK_DIRS.values():
        file_path = f"{BASE}/{folder}/{ID}_{folder}.wav"

        feats = extract_features(file_path)
        if feats is None:
            continue

        # Prefix each feature with its task folder to keep keys unique.
        for k, v in feats.items():
            feat_dict[f"{folder}_{k}"] = v

    return feat_dict


# ============================================================
#  Build complete feature table
# ============================================================
from tqdm import tqdm

def extract_subject(ID):
    """Collect task-prefixed features for every recording of one subject.

    Args:
        ID: subject identifier used in the file name ``<ID>_<folder>.wav``.

    Returns:
        Dict with ``"ID"``, one ``<folder>_missing`` flag per task (1 when the
        audio file is absent, 0 otherwise), the ``<folder>_<feature>`` values
        for every task that was found, and ``"found_audio"`` (1 if at least
        one recording exists, else 0).
    """
    feat_dict = {"ID": ID}
    found_any = False

    for folder in TASK_DIRS.values():
        file_path = f"{BASE}/{folder}/{ID}_{folder}.wav"

        # extract_features returns None when the file does not exist, so one
        # call covers both the existence check and the extraction.
        feats = extract_features(file_path)

        # Always write the flag so every row/table has the same columns and
        # present recordings get an explicit 0 instead of NaN.
        feat_dict[f"{folder}_missing"] = int(feats is None)
        if feats is None:
            continue

        found_any = True
        for k, v in feats.items():
            feat_dict[f"{folder}_{k}"] = v

    feat_dict["found_audio"] = int(found_any)
    return feat_dict


def build_table(meta):
    """Build one feature row per subject listed in a metadata sheet.

    Args:
        meta: DataFrame with at least the columns "ID", "Age", "Sex", "Class".

    Returns:
        DataFrame with the output of ``extract_subject`` for each row plus
        the columns "age", "sex" and "label" copied from the sheet.
    """
    print("\nExtracting features for", len(meta), "subjects...\n")

    records = []
    for _, row in tqdm(meta.iterrows(), total=len(meta)):
        record = extract_subject(row["ID"])
        # Attach the demographic columns and the target label.
        record.update(age=row["Age"], sex=row["Sex"], label=row["Class"])
        records.append(record)

    return pd.DataFrame(records)



# Run the per-file feature extraction for every subject in the training sheet.
print("Extracting TRAIN features...")
train_feat = build_table(train_df)

# Same extraction for the validation sheet.
print("Extracting VAL features...")
val_feat = build_table(val_df)

# print("Extracting TEST features...")
# test_feat = build_table(test_df)

# train_df = train_df[["ID", "Class"]]
# val_df = val_df[["ID", "Class"]]
# # ============================================================
# #  10-fold Cross Validation on TRAIN ONLY
# # ============================================================
# # ============================================================
# X = train_feat.drop(columns=["ID", "label", "sex", "found_audio"])
# y = train_feat["label"]

# scaler = StandardScaler()
# X_scaled = scaler.fit_transform(X)

# skf = StratifiedKFold(n_splits=10, shuffle=True, random_state=42)

# accs, f1s = [], []

# for tr_idx, va_idx in skf.split(X_scaled, y):
#     X_tr, X_va = X_scaled[tr_idx], X_scaled[va_idx]
#     y_tr, y_va = y.iloc[tr_idx], y.iloc[va_idx]

#     model = XGBClassifier(
#         n_estimators=300,
#         learning_rate=0.05,
#         max_depth=6,
#         subsample=0.8,
#         colsample_bytree=0.8,
#         objective="multi:softprob",
#         num_class=5,
#         eval_metric="mlogloss"
#     )
#     model.fit(X_tr, y_tr)
#     preds = model.predict(X_va)

#     accs.append(accuracy_score(y_va, preds))
#     f1s.append(f1_score(y_va, preds, average="weighted"))

# print("=== 10-FOLD CV RESULTS ===")
# print("Accuracy:", np.mean(accs))
# print("F1 Score:", np.mean(f1s))


# # ============================================================
# #  Final evaluation on VAL & TEST splits
# # ============================================================
# final_model = model  # last trained model

# def evaluate(df):
#     X = df.drop(columns=["ID", "label"]).fillna(0)
#     X = scaler.transform(X)
#     preds = final_model.predict(X)
#     print(classification_report(df["label"], preds))

# print("\n=== VALIDATION SET ===")
# evaluate(val_feat)

Extracting TRAIN features...

Extracting features for 219 subjects...



  0%|          | 0/219 [00:17<?, ?it/s]


KeyboardInterrupt: 

In [None]:
# Persist both feature tables so the extraction does not have to be re-run.
# Requires train_feat and val_feat from the extraction cell to exist.
train_feat.to_feather("/kaggle/working/train_features.feather")
val_feat.to_feather("/kaggle/working/val_features.feather")

print("Saved feature tables!")



In [None]:
import os
# Show what is currently stored in the Kaggle working directory.
os.listdir("/kaggle/working")

In [None]:
import pandas as pd

# Reload the cached feature tables written by the save cell.
train_feat = pd.read_feather("/kaggle/working/train_features.feather")
val_feat   = pd.read_feather("/kaggle/working/val_features.feather")


In [None]:
# Feature matrix: every column except the identifier, the label, the
# demographics and the found_audio flag.
# NOTE(review): no missing-value handling is applied here; if any subject
# lacks a recording, X presumably contains NaN — confirm before resampling.
X = train_feat.drop(columns=["ID", "label","age", "sex", "found_audio"])
# Convert class labels 1–5 → 0–4
y = train_feat["label"]- 1

# Standardise the features; the same fitted scaler is reused on the
# validation features later in the notebook.
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from imblearn.combine import SMOTETomek
from imblearn.over_sampling import SMOTE

# Resample the scaled training set with SMOTETomek, using a SMOTE step that
# interpolates towards a single nearest neighbour (k_neighbors=1).
# NOTE(review): X_smt / y_smt are built from the whole training set, so any
# cross-validation that splits them will have synthetic points derived from
# held-out rows inside its training folds — scores on such splits are
# presumably optimistic.
smt = SMOTETomek(random_state = 42,smote=SMOTE(k_neighbors=1))
X_smt,y_smt = smt.fit_resample(X_scaled,y)

In [None]:
# 4-fold stratified CV on the ORIGINAL (un-resampled) training rows.
# Splitting already-resampled data would put synthetic points interpolated
# from held-out rows into the training folds and inflate the scores, so the
# resampling is done inside each fold, on the training part only.
skf = StratifiedKFold(n_splits=4, shuffle=True, random_state=42)

accs, f1s = [], []

for tr_idx, va_idx in skf.split(X_scaled, y):
    # Resample only the training part; the held-out part stays real data.
    X_tr, y_tr = smt.fit_resample(X_scaled[tr_idx], y.iloc[tr_idx])
    X_va, y_va = X_scaled[va_idx], y.iloc[va_idx]

    model = XGBClassifier(
        n_estimators=300,
        learning_rate=0.05,
        max_depth=6,
        subsample=0.8,
        colsample_bytree=0.8,
        objective="multi:softprob",
        num_class=5,
        eval_metric="mlogloss"
    )
    model.fit(X_tr, y_tr)
    preds = model.predict(X_va)

    accs.append(accuracy_score(y_va, preds))
    f1s.append(f1_score(y_va, preds, average="weighted"))

print("=== 4-FOLD CV RESULTS ===")
print("Accuracy:", np.mean(accs))
print("F1 Score:", np.mean(f1s))

In [None]:
from sklearn.metrics import confusion_matrix as cm
import seaborn as sns

In [None]:
# ============================================================
#  Final evaluation on the validation split
# ============================================================
final_model = model  # last trained model (from the final CV fold)
# Align the validation columns to exactly the set and order the scaler was
# fitted on. The two feature tables are built independently, so their columns
# can differ; columns absent from the validation table become NaN.
X_val = (
    val_feat
    .drop(columns=["ID", "label","age", "sex", "found_audio"])
    .reindex(columns=X.columns)
)
y_val = val_feat["label"]- 1
X_val = scaler.transform(X_val)
preds = final_model.predict(X_val)
print(classification_report(y_val, preds))
sns.heatmap(cm(y_val,preds),annot = True)

In [None]:
# Quick sanity check: matrix/label shapes and the per-class sample counts.
X.shape, y.shape, np.unique(y, return_counts=True)

# HYPERPARAMETER TUNING XGBOOST

In [None]:
import optuna
import numpy as np
from sklearn.metrics import make_scorer, f1_score
from sklearn.model_selection import cross_val_score

In [None]:
# NOTE(review): this pinned reinstall runs after numpy / scikit-learn were
# already imported by earlier cells; the kernel presumably has to be restarted
# for the pinned versions to take effect — confirm and consider moving this
# to the top of the notebook.
!pip install --force-reinstall numpy==1.26.4 scikit-learn==1.2.2 joblib==1.3.2

In [None]:
def objective(trial):
    """Optuna objective: mean macro-F1 of an XGBoost model over 3-fold CV.

    Folds are drawn from the original scaled training data and SMOTETomek is
    applied to the training part of each fold only, so the held-out part
    never contains (or contributes to) synthetic samples.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        Mean macro-averaged F1 score across the three folds.
    """

    # XGBoost param space
    param = {
        'objective': 'multi:softprob',
        'num_class': 5,                           # <<== CRITICAL
        'eval_metric': 'mlogloss',
        'verbosity': 0,
        'booster': 'gbtree',
        'lambda': trial.suggest_float('lambda', 1e-3, 10.0, log=True),
        'alpha': trial.suggest_float('alpha', 1e-3, 10.0, log=True),
        'subsample': trial.suggest_float('subsample', 0.4, 1.0),
        'colsample_bytree': trial.suggest_float('colsample_bytree', 0.4, 1.0),
        'max_depth': trial.suggest_int('max_depth', 3, 10),
        'eta': trial.suggest_float('eta', 0.01, 0.3),
        'gamma': trial.suggest_float('gamma', 0, 10),
        'min_child_weight': trial.suggest_int('min_child_weight', 1, 10),
        'max_delta_step': trial.suggest_int('max_delta_step', 0, 10),
        'nthread': 1                               # <<== Optuna safe
    }

    model = XGBClassifier(**param)

    skf = StratifiedKFold(n_splits=3, shuffle=True, random_state=42)

    f1_scores = []

    # Fold-wise CV on the un-resampled data
    for train_idx, holdout_idx in skf.split(X_scaled, y):

        # Resample the training part only; score on real held-out rows.
        X_train, y_train = smt.fit_resample(X_scaled[train_idx], y.iloc[train_idx])
        X_holdout, y_holdout = X_scaled[holdout_idx], y.iloc[holdout_idx]

        model.fit(X_train, y_train)

        y_pred = model.predict(X_holdout)
        f1_scores.append(f1_score(y_holdout, y_pred, average='macro'))

    return np.mean(f1_scores)

study_xgb = optuna.create_study(direction='maximize')
study_xgb.optimize(objective, n_trials=50)

In [None]:
# Report the best Optuna trial and its sampled hyperparameters.
print('Best trial:')
trial = study_xgb.best_trial
print('  F1-score: {}'.format(trial.value))
print('  Params: ')
for key, value in trial.params.items():
    print('    {}: {}'.format(key, value))

# Refit on the full resampled training set with the best sampled parameters.
best_model_xgb = XGBClassifier(**study_xgb.best_params)
best_model_xgb.fit(X_smt, y_smt)

# Evaluate on the validation split (X_val / y_val come from the earlier
# validation-evaluation cell; there is no separate test set here).
y_pred = best_model_xgb.predict(X_val)

report = classification_report(y_val, y_pred)
# confusion_matrix takes (y_true, y_pred); passing them the other way round
# transposes the matrix relative to the other heatmaps in this notebook.
sns.heatmap(cm(y_val, y_pred), annot = True)
print(report)

In [None]:
import joblib
# Persist the tuned model together with the scaler it was trained behind;
# both are needed to score new feature rows.
joblib.dump(best_model_xgb, "xgb_model.pkl")
joblib.dump(scaler, "scaler.pkl")

print("Model and scaler saved successfully!")

In [None]:
from sklearn.calibration import CalibratedClassifierCV

# Sigmoid calibration of the tuned model with 3-fold internal CV, fitted on
# the resampled training data.
# NOTE(review): calibrated_model is not used by any later cell in the visible
# notebook (the probabilities further down come from best_model_xgb) —
# confirm whether it was meant to be.
calibrated_model = CalibratedClassifierCV(
    estimator=best_model_xgb,
    method='sigmoid',
    cv=3
)
calibrated_model.fit(X_smt, y_smt)

In [None]:
# Class probabilities for the validation rows.
# NOTE(review): these come from the uncalibrated best_model_xgb, not from
# calibrated_model — confirm this is intended.
proba = best_model_xgb.predict_proba(X_val)

In [None]:
from sklearn.metrics import f1_score
def find_best_conf_threshold(y_true, proba):
    """Grid-search the confidence threshold that maximises macro-F1.

    For each threshold in 0.00, 0.01, ..., 1.00 a row is assigned its
    second-most-probable class when its top probability is below the
    threshold, and its most probable class otherwise.

    Args:
        y_true: true class labels, one per row of ``proba``.
        proba: per-row class probabilities.

    Returns:
        ``(best_threshold, best_macro_f1)``; both are 0 if no threshold
        scores above 0.
    """
    # These per-row quantities do not depend on the threshold.
    top_probs = [np.max(row) for row in proba]
    top_classes = [np.argmax(row) for row in proba]
    second_classes = [np.argsort(row)[-2] for row in proba]

    best_t, best_f1 = 0, 0
    for t in np.arange(0.00, 1.01, 0.01):
        # if confidence < threshold → choose second-best
        preds = [
            runner_up if p < t else winner
            for p, winner, runner_up in zip(top_probs, top_classes, second_classes)
        ]
        score = f1_score(y_true, preds, average="macro")
        if score > best_f1:
            best_t, best_f1 = t, score

    return best_t, best_f1

In [None]:
# Search the confidence threshold on the validation labels.
# NOTE(review): the threshold is selected on y_val, so any score later
# reported on the same validation rows with this threshold is presumably
# optimistic.
best_t, best_f1 = find_best_conf_threshold(y_val, proba)

print("Best threshold:", best_t)
print("Best macro-F1:", best_f1)

In [None]:
def threshold_predict(proba, threshold):
    """Predict classes, falling back to the runner-up on low confidence.

    Args:
        proba: per-row class probabilities.
        threshold: rows whose top probability is below this value are
            assigned their second-most-probable class.

    Returns:
        NumPy array with one predicted class index per row.
    """
    picks = []
    for scores in proba:
        runner_up = np.argsort(scores)[-2]
        winner = np.argmax(scores)
        picks.append(runner_up if np.max(scores) < threshold else winner)
    return np.array(picks)

In [None]:
# Apply the selected threshold to the validation probabilities and report
# the per-class metrics plus the confusion matrix (rows = true labels).
preds = threshold_predict(proba, best_t)
print(classification_report(y_val, preds))
sns.heatmap(cm(y_val, preds), annot=True)

In [None]:
%%bash
set -e

# Pushes everything in /kaggle/working to GitHub (force-overwriting `main`).
#
# SECURITY: an access token was previously hard-coded in this cell. Treat
# that token as compromised and revoke it on GitHub. The token is now read
# from the GITHUB_TOKEN environment variable only (for example, set
# os.environ["GITHUB_TOKEN"] from a Kaggle secret in an earlier cell) and
# must never be pasted into the notebook.

# ===== CONFIGURE THESE LINES =====
GITHUB_USER="Roxrite0509"
REPO_NAME="SAND_IEEE_COMPETTION"
# Abort with a clear message if the token was not provided.
: "${GITHUB_TOKEN:?Set GITHUB_TOKEN in the environment before running this cell}"
# =================================

# 1. Configure your Git identity
git config --global user.name "Roxrite0509"
git config --global user.email "roxrite0509@gmail.com"

# 2. Go to Kaggle working directory
cd /kaggle/working

echo "Working directory: $(pwd)"
echo "Files here:"
ls -A || true

# 3. Initialize repo if needed
if [ ! -d ".git" ]; then
    git init
fi

# 4. Ensure we are on 'main' branch
git branch -M main || true

# 5. Set GitHub remote with token (non-interactive auth).
#    The URL must be built from the variables defined above; using the
#    literal user/token/repo text as variable NAMES expands to empty strings.
git remote remove origin 2>/dev/null || true
git remote add origin "https://${GITHUB_USER}:${GITHUB_TOKEN}@github.com/${GITHUB_USER}/${REPO_NAME}"

#    Whatever happens below, strip the token from .git/config on exit so it
#    is not left behind in the working directory.
trap 'git remote set-url origin "https://github.com/${GITHUB_USER}/${REPO_NAME}" 2>/dev/null || true' EXIT

# 6. Add ALL files
git add -A

# 7. Commit (allow empty so push always works)
git commit -m "Push from Kaggle notebook" --allow-empty || true

# 8. Push to GitHub (force overwrite)
git push -u origin main --force
