In [1]:
# === Imports & Config ===
import os
import re
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from scipy.signal import welch
from sklearn.model_selection import StratifiedKFold, GridSearchCV, StratifiedShuffleSplit
from sklearn.preprocessing import StandardScaler, label_binarize
from sklearn.pipeline import Pipeline
from sklearn.metrics import (accuracy_score, confusion_matrix, classification_report,
                             roc_curve, auc)
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
import joblib

# ---- Paths & defaults (edit if needed) ----
INPUT_DIR  = r"C:\Users\Admin\Desktop\ALVIN\dataset"
OUTPUT_DIR = r"C:\Users\Admin\Desktop\ALVIN\outputs 1"

WINDOW_SEC   = 5.0     # window length in seconds
OVERLAP      = 0.5     # 50% overlap
DURATION_SEC = 60.0    # seconds per file (used to infer fs)
FS_KNOWN     = None    # set to e.g. 516.67 if you know exact fs; else leave None

# Channel set (19-ch 10/20)
CHANNELS = ['Fp1','Fp2','F3','F4','F7','F8','T3','T4','C3','C4',
            'T5','T6','P3','P4','O1','O2','Fz','Cz','Pz']

# EEG bands
BANDS = {
    "Delta": (0.5, 4.0),
    "Theta": (4.0, 7.0),
    "Alpha": (8.0, 13.0),
    "Beta":  (13.0, 30.0),
    "Gamma": (30.0, 45.0),
}

# Reproducibility
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

# Ensure output dir exists
os.makedirs(OUTPUT_DIR, exist_ok=True)

# NEW: split subfolders (just dirs, not logic)
TRAIN_DIR = os.path.join(OUTPUT_DIR, "train")
VAL_DIR   = os.path.join(OUTPUT_DIR, "val")
TEST_DIR  = os.path.join(OUTPUT_DIR, "test")
for d in (TRAIN_DIR, VAL_DIR, TEST_DIR):
    os.makedirs(d, exist_ok=True)


In [3]:
# === Helpers: I/O & labeling ===
def likely_unlabeled(df_head, expected):
    cols = list(df_head.columns)
    if cols == expected:
        return False
    num_like = 0
    for c in cols:
        try:
            float(str(c).strip()); num_like += 1
        except Exception:
            pass
    return num_like >= len(cols) // 2

def ensure_labeled(path, out_dir, expected_channels):
    """
    Ensure an EEG CSV has proper 19-channel headers. If not, assign them and
    save a corrected copy in OUTPUT_DIR. Returns (df, subject_id).
    """
    stem = os.path.splitext(os.path.basename(path))[0]
    try:
        df_try = pd.read_csv(path, nrows=3)
    except Exception:
        df = pd.read_csv(path, header=None)
        df.columns = expected_channels
        df.to_csv(os.path.join(out_dir, f"{stem}_corrected.csv"), index=False)
        return df, stem

    if likely_unlabeled(df_try, expected_channels):
        df = pd.read_csv(path, header=None)
        df.columns = expected_channels
        df.to_csv(os.path.join(out_dir, f"{stem}_corrected.csv"), index=False)
        return df, stem
    else:
        df = pd.read_csv(path)
        # Align if columns contain all expected channels
        if list(df.columns) != expected_channels:
            if set(expected_channels).issubset(set(df.columns)):
                df = df[expected_channels]
            else:
                raise ValueError(f"{path}: columns do not match expected 19 channels.")
        return df, stem

# === Helpers: PSD & bandpowers ===
def welch_psd(x, fs, nperseg, noverlap):
    return welch(x, fs=fs, nperseg=int(nperseg), noverlap=int(noverlap), detrend='constant')

def integrate_bandpower(f, Pxx, band):
    fmin, fmax = band
    idx = (f >= fmin) & (f < fmax)
    if not np.any(idx):
        return 0.0
    return float(np.trapz(Pxx[idx], f[idx]))

def compute_wide_summary(df, fs, bands, channels, out_csv=None):
    """
    Compute absolute & relative band powers over full recording (one row).
    """
    nperseg = int(4 * fs)
    noverlap = int(0.5 * nperseg)
    feats = {}
    for ch in channels:
        x = pd.to_numeric(df[ch], errors='coerce').fillna(0.0).values
        f, Pxx = welch_psd(x, fs, nperseg, noverlap)
        total = float(np.trapz(Pxx, f))
        for b, rng in bands.items():
            abs_p = integrate_bandpower(f, Pxx, rng)
            rel_p = abs_p / total if total > 0 else np.nan
            feats[f"{ch}_{b}"] = abs_p
            feats[f"{ch}_{b}_Rel"] = rel_p
    wide = pd.DataFrame([feats])
    if out_csv:
        wide.to_csv(out_csv, index=False)
    return wide

# === Helpers: windowing & window features ===
def segment_windows(df, fs, win_sec, overlap):
    win_len = int(win_sec * fs)
    hop = max(1, int(win_len * (1 - overlap)))
    return [(s, s + win_len) for s in range(0, len(df) - win_len + 1, hop)]

def compute_window_features(df, fs, window, bands, channels):
    s, e = window
    xw = df.iloc[s:e]
    nperseg = max(4, int(2 * fs))
    noverlap = int(0.5 * nperseg)
    feats = {}
    for ch in channels:
        x = pd.to_numeric(xw[ch], errors='coerce').fillna(0.0).values
        f, Pxx = welch_psd(x, fs, nperseg, noverlap)
        total = float(np.trapz(Pxx, f))
        for b, rng in bands.items():
            abs_p = integrate_bandpower(f, Pxx, rng)
            rel_p = abs_p / total if total > 0 else np.nan
            feats[f"{ch}_{b}"] = abs_p
            feats[f"{ch}_{b}_Rel"] = rel_p
    feats["StartSample"] = s
    feats["Fs"] = fs
    return feats

# === Optional plotting helpers ===
def plot_psd_per_subject(df, fs, channels, out_path, subj):
    nperseg = int(4 * fs)
    noverlap = int(0.5 * nperseg)
    plt.figure(figsize=(12, 8))
    for ch in channels:
        x = pd.to_numeric(df[ch], errors='coerce').fillna(0.0).values
        f, Pxx = welch_psd(x, fs, nperseg, noverlap)
        plt.plot(f, 10*np.log10(Pxx), label=ch)
    plt.xlim(0, 50)
    plt.xlabel("Frequency (Hz)")
    plt.ylabel("Power Spectral Density (dB/Hz)")
    plt.title(f"EEG Channel PSDs — {subj}")
    plt.legend(bbox_to_anchor=(1.05, 1), loc="upper left", fontsize=8)
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()

def plot_band_bars(wide_df, channels, bands, out_abs, out_rel, subj):
    xs = np.arange(len(channels))
    width = 0.15
    band_list = list(bands.keys())

    # Absolute
    plt.figure(figsize=(14, 6))
    for i, b in enumerate(band_list):
        vals = [wide_df[f"{ch}_{b}"].values[0] for ch in channels]
        plt.bar(xs + i*width, vals, width, label=b)
    plt.xticks(xs + width*2, channels, rotation=45)
    plt.ylabel("Absolute Band Power")
    plt.title(f"Band Powers per Channel (Absolute) — {subj}")
    plt.legend()
    plt.tight_layout()
    plt.savefig(out_abs, dpi=150)
    plt.close()

    # Relative
    plt.figure(figsize=(14, 6))
    for i, b in enumerate(band_list):
        vals = [wide_df[f"{ch}_{b}_Rel"].values[0] for ch in channels]
        plt.bar(xs + i*width, vals, width, label=b)
    plt.xticks(xs + width*2, channels, rotation=45)
    plt.ylabel("Relative Band Power")
    plt.title(f"Band Powers per Channel (Relative) — {subj}")
    plt.legend()
    plt.tight_layout()
    plt.savefig(out_rel, dpi=150)
    plt.close()

In [5]:
# === Process all subjects: per-subject wide summary + windowed master features ===
make_plots = True  # set False if you don't want per-subject plots

# Discover input CSVs (ignore previous outputs)
files = [f for f in os.listdir(INPUT_DIR) if f.lower().endswith(".csv")]
exclude = re.compile(r"(bandpowers|features|corrected|wide|master|model|report|roc|cm)", re.I)
candidates = [f for f in files if not exclude.search(f)]
if not candidates:
    raise FileNotFoundError(f"No CSVs found in {INPUT_DIR}")

print(f"Found {len(candidates)} file(s):", ", ".join(sorted(candidates)))

master_rows = []
subjects = []
wide_paths = []

for fname in sorted(candidates):
    path = os.path.join(INPUT_DIR, fname)
    df, subj = ensure_labeled(path, OUTPUT_DIR, CHANNELS)
    subjects.append(subj)

    # fs: known or inferred
    fs = FS_KNOWN if FS_KNOWN else (df.shape[0] / float(DURATION_SEC))
    print(f"[{subj}] samples={len(df)}, fs={fs:.4f} Hz")

    # Wide summary (full 60 s)
    wide_csv = os.path.join(OUTPUT_DIR, f"{subj}_bandpowers_wide.csv")
    wide = compute_wide_summary(df, fs, BANDS, CHANNELS, out_csv=wide_csv)
    wide_paths.append(wide_csv)

    # Optional subject plots
    if make_plots:
        plot_psd_per_subject(df, fs, CHANNELS, os.path.join(OUTPUT_DIR, f"psd_{subj}.png"), subj)
        plot_band_bars(wide, CHANNELS, BANDS,
                       os.path.join(OUTPUT_DIR, f"band_abs_{subj}.png"),
                       os.path.join(OUTPUT_DIR, f"band_rel_{subj}.png"),
                       subj)

    # Windowed features
    windows = segment_windows(df, fs, WINDOW_SEC, OVERLAP)
    for w in windows:
        feats = compute_window_features(df, fs, w, BANDS, CHANNELS)
        feats["Subject"] = subj
        master_rows.append(feats)

# Save master features
if not master_rows:
    raise RuntimeError("No windowed features produced. Check WINDOW_SEC/OVERLAP vs. file length.")

master = pd.DataFrame(master_rows)
master_csv = os.path.join(OUTPUT_DIR, f"eeg_biometric_features_{int(WINDOW_SEC)}s_master.csv")
master.to_csv(master_csv, index=False)
print("Saved master features:", master_csv)
print("Subjects processed:", sorted(set(subjects)))


Found 36 file(s): s00.csv, s01.csv, s02.csv, s03.csv, s04.csv, s05.csv, s06.csv, s07.csv, s08.csv, s09.csv, s10.csv, s11.csv, s12.csv, s13.csv, s14.csv, s15.csv, s16.csv, s17.csv, s18.csv, s19.csv, s20.csv, s21.csv, s22.csv, s23.csv, s24.csv, s25.csv, s26.csv, s27.csv, s28.csv, s29.csv, s30.csv, s31.csv, s32.csv, s33.csv, s34.csv, s35.csv
[s00] samples=31000, fs=516.6667 Hz
[s01] samples=31000, fs=516.6667 Hz
[s02] samples=31000, fs=516.6667 Hz
[s03] samples=31000, fs=516.6667 Hz
[s04] samples=31000, fs=516.6667 Hz
[s05] samples=31000, fs=516.6667 Hz
[s06] samples=31000, fs=516.6667 Hz
[s07] samples=31000, fs=516.6667 Hz
[s08] samples=31000, fs=516.6667 Hz
[s09] samples=31000, fs=516.6667 Hz
[s10] samples=31000, fs=516.6667 Hz
[s11] samples=31000, fs=516.6667 Hz
[s12] samples=31000, fs=516.6667 Hz
[s13] samples=31000, fs=516.6667 Hz
[s14] samples=31000, fs=516.6667 Hz
[s15] samples=31000, fs=516.6667 Hz
[s16] samples=31000, fs=516.6667 Hz
[s17] samples=31000, fs=516.6667 Hz
[s18] sampl

In [6]:
# === NEW: Create stratified Train/Validation/Test splits (60/20/20) and save ===
dfm_full = pd.read_csv(master_csv)

# We stratify by "Subject"
y_all = dfm_full["Subject"].values

# 1) TEST = 20% (hidden)
sss1 = StratifiedShuffleSplit(n_splits=1, test_size=0.20, random_state=RANDOM_SEED)
trainval_idx, test_idx = next(sss1.split(dfm_full, y_all))

# 2) From remaining 80%, VAL = 25% of that -> 20% overall
y_trainval = y_all[trainval_idx]
sss2 = StratifiedShuffleSplit(n_splits=1, test_size=0.25, random_state=RANDOM_SEED)
train_idx_rel, val_idx_rel = next(sss2.split(dfm_full.iloc[trainval_idx], y_trainval))

train_idx = trainval_idx[train_idx_rel]
val_idx   = trainval_idx[val_idx_rel]

df_train = dfm_full.iloc[train_idx].reset_index(drop=True)
df_val   = dfm_full.iloc[val_idx].reset_index(drop=True)
df_test  = dfm_full.iloc[test_idx].reset_index(drop=True)

train_csv = os.path.join(TRAIN_DIR, "eeg_biometric_features_train.csv")
val_csv   = os.path.join(VAL_DIR,   "eeg_biometric_features_val.csv")
test_csv  = os.path.join(TEST_DIR,  "eeg_biometric_features_test.csv")

df_train.to_csv(train_csv, index=False)
df_val.to_csv(val_csv, index=False)
df_test.to_csv(test_csv, index=False)

print("Saved splits:")
print(" - TRAIN:", train_csv, df_train.shape)
print(" - VAL  :", val_csv,   df_val.shape)
print(" - TEST :", test_csv,  df_test.shape)

Saved splits:
 - TRAIN: C:\Users\Admin\Desktop\ALVIN\outputs 1\train\eeg_biometric_features_train.csv (496, 193)
 - VAL  : C:\Users\Admin\Desktop\ALVIN\outputs 1\val\eeg_biometric_features_val.csv (166, 193)
 - TEST : C:\Users\Admin\Desktop\ALVIN\outputs 1\test\eeg_biometric_features_test.csv (166, 193)


In [7]:
# === Modeling setup: load splits, define utilities (same helpers reused) ===
df_train = pd.read_csv(train_csv)
df_val   = pd.read_csv(val_csv)

FEATURE_DROP = ["Subject", "StartSample", "Fs"]

X_tr = df_train.drop(columns=FEATURE_DROP, errors="ignore").values
y_tr = df_train["Subject"].values
X_va = df_val.drop(columns=FEATURE_DROP, errors="ignore").values
y_va = df_val["Subject"].values

CLASSES = sorted(pd.concat([df_train["Subject"], df_val["Subject"]]).unique().tolist())

def plot_confusion_matrix(cm, classes, out_path, title):
    plt.figure(figsize=(6, 5))
    plt.imshow(cm, cmap="Blues")
    plt.title(title)
    plt.xticks(np.arange(len(classes)), classes, rotation=45)
    plt.yticks(np.arange(len(classes)), classes)
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            plt.text(j, i, str(cm[i, j]), ha='center', va='center', fontsize=8)
    plt.colorbar()
    plt.tight_layout()
    plt.savefig(out_path, dpi=150)
    plt.close()

def compute_ovr_roc_eer(proba, y_true, class_names, out_png, out_csv):
    Y = label_binarize(y_true, classes=class_names)
    aucs, eers = [], []
    plt.figure(figsize=(6, 5))
    for i, cname in enumerate(class_names):
        fpr, tpr, thr = roc_curve(Y[:, i], proba[:, i])
        roc_auc = auc(fpr, tpr)
        fnr = 1 - tpr
        idx = np.nanargmin(np.abs(fnr - fpr))
        eer = float((fnr[idx] + fpr[idx]) / 2.0)
        aucs.append(float(roc_auc)); eers.append(float(eer))
        plt.plot(fpr, tpr, label=f"{cname} AUC={roc_auc:.3f}, EER={eer:.3f}")
    plt.plot([0,1],[0,1],'--', lw=1)
    plt.xlabel("False Positive Rate"); plt.ylabel("True Positive Rate")
    plt.title("One-vs-Rest ROC")
    plt.legend(fontsize=7, loc="lower right")
    plt.tight_layout()
    plt.savefig(out_png, dpi=150)
    plt.close()

    df = pd.DataFrame({"Class": class_names, "AUC": aucs, "EER": eers})
    df.loc[len(df)] = ["macro_avg", float(np.mean(aucs)), float(np.mean(eers))]
    df.to_csv(out_csv, index=False)
    return df

# Pipelines & grids (unchanged from old code)
rf_pipe = Pipeline([
    ("scaler", StandardScaler(with_mean=True, with_std=True)),
    ("clf", RandomForestClassifier(random_state=RANDOM_SEED))
])
rf_grid = {
    "clf__n_estimators": [300, 500],
    "clf__max_depth": [None, 12, 18],
    "clf__min_samples_leaf": [1, 2],
}

svm_pipe = Pipeline([
    ("scaler", StandardScaler(with_mean=True, with_std=True)),
    ("clf", SVC(kernel="rbf", probability=True, random_state=RANDOM_SEED))
])
svm_grid = {
    "clf__C": [1, 5, 10],
    "clf__gamma": ["scale", 0.01, 0.001],
}

# CV on TRAIN only; choose by VAL accuracy
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=RANDOM_SEED)

candidates = []
for name, pipe, grid in [("RandomForest", rf_pipe, rf_grid), ("SVM_RBF", svm_pipe, svm_grid)]:
    gs = GridSearchCV(pipe, grid, cv=skf, n_jobs=-1, scoring="accuracy", verbose=0)
    gs.fit(X_tr, y_tr)
    best = gs.best_estimator_
    val_acc = accuracy_score(y_va, best.predict(X_va))
    candidates.append((name, best, gs.best_params_, gs.best_score_, val_acc))
    print(f"[{name}] CV acc={gs.best_score_:.4f} | VAL acc={val_acc:.4f} | best={gs.best_params_}")

# Select by validation accuracy (tie-breaker: CV acc)
candidates.sort(key=lambda t: (t[4], t[3]), reverse=True)
best_name, best_model, best_params, best_cv_acc, best_val_acc = candidates[0]
print(f"\nSelected model: {best_name} | VAL acc={best_val_acc:.4f} | CV={best_cv_acc:.4f} | params={best_params}")

# Save tuned (train-only) model
tmp_model_path = os.path.join(OUTPUT_DIR, f"chosen_{best_name}_train_tuned.joblib")
joblib.dump(best_model, tmp_model_path)
print("Saved tuned model:", tmp_model_path)

[RandomForest] CV acc=0.9919 | VAL acc=0.9880 | best={'clf__max_depth': None, 'clf__min_samples_leaf': 1, 'clf__n_estimators': 500}
[SVM_RBF] CV acc=0.9637 | VAL acc=0.9759 | best={'clf__C': 10, 'clf__gamma': 0.001}

Selected model: RandomForest | VAL acc=0.9880 | CV=0.9919 | params={'clf__max_depth': None, 'clf__min_samples_leaf': 1, 'clf__n_estimators': 500}
Saved tuned model: C:\Users\Admin\Desktop\ALVIN\outputs 1\chosen_RandomForest_train_tuned.joblib


In [8]:
# Load TEST and merge TRAIN+VAL
df_test = pd.read_csv(test_csv)
df_trainval = pd.concat([df_train, df_val], axis=0, ignore_index=True)

X_trva = df_trainval.drop(columns=FEATURE_DROP, errors="ignore").values
y_trva = df_trainval["Subject"].values

X_te = df_test.drop(columns=FEATURE_DROP, errors="ignore").values
y_te = df_test["Subject"].values

# Retrain chosen model on TRAIN+VAL
final_model = joblib.load(os.path.join(OUTPUT_DIR, f"chosen_{best_name}_train_tuned.joblib"))
final_model.fit(X_trva, y_trva)

# TEST evaluation
y_pred = final_model.predict(X_te)
test_acc = accuracy_score(y_te, y_pred)
print(f"[FINAL] {best_name} — TEST accuracy = {test_acc:.4f}")

# Confusion Matrix (TEST)
CLASSES_ALL = sorted(pd.concat([df_trainval["Subject"], df_test["Subject"]]).unique().tolist())
cm = confusion_matrix(y_te, y_pred, labels=CLASSES_ALL)
cm_png = os.path.join(OUTPUT_DIR, f"cm_TEST_{best_name}.png")
plot_confusion_matrix(cm, CLASSES_ALL, cm_png, f"Confusion Matrix — TEST — {best_name} (acc={test_acc:.3f})")
print("Saved:", cm_png)

# ROC / AUC / EER (TEST)
proba_te = final_model.predict_proba(X_te)
roc_png = os.path.join(OUTPUT_DIR, f"roc_TEST_{best_name}.png")
verif_csv = os.path.join(OUTPUT_DIR, f"verification_TEST_{best_name}.csv")
compute_ovr_roc_eer(proba_te, y_te, CLASSES_ALL, roc_png, verif_csv)
print("Saved:", roc_png)
print("Saved:", verif_csv)

# Classification report (TEST)
rep_csv = os.path.join(OUTPUT_DIR, f"class_report_TEST_{best_name}.csv")
pd.DataFrame(classification_report(y_te, y_pred, labels=CLASSES_ALL, output_dict=True)).transpose().to_csv(rep_csv)
print("Saved:", rep_csv)

# Save final model
final_model_path = os.path.join(OUTPUT_DIR, f"final_{best_name}_trainval.joblib")
joblib.dump(final_model, final_model_path)
print("Saved:", final_model_path)

[FINAL] RandomForest — TEST accuracy = 0.9819
Saved: C:\Users\Admin\Desktop\ALVIN\outputs 1\cm_TEST_RandomForest.png
Saved: C:\Users\Admin\Desktop\ALVIN\outputs 1\roc_TEST_RandomForest.png
Saved: C:\Users\Admin\Desktop\ALVIN\outputs 1\verification_TEST_RandomForest.csv
Saved: C:\Users\Admin\Desktop\ALVIN\outputs 1\class_report_TEST_RandomForest.csv
Saved: C:\Users\Admin\Desktop\ALVIN\outputs 1\final_RandomForest_trainval.joblib
