In [5]:
# ---------- FIXED single cell: robust benchmark + tiny EEGNet ----------
# Requirements: numpy, torch, sklearn, mne, pandas, joblib, matplotlib
import os, random, numpy as np, torch
import torch.nn as nn, torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import StratifiedKFold, train_test_split
from sklearn.metrics import accuracy_score, f1_score, confusion_matrix
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from mne.decoding import CSP
import joblib, pandas as pd

# reproducibility
def seed_everything(seed=42):
    random.seed(seed); np.random.seed(seed); torch.manual_seed(seed)
    if torch.cuda.is_available(): torch.cuda.manual_seed_all(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
seed_everything(42)

# -------- load + sanitize preprocessed.npz --------
path = "preprocessed.npz"
if not os.path.exists(path):
    raise FileNotFoundError("Put preprocessed.npz (X,y, optional meta) in working dir.")

d = np.load(path, allow_pickle=True)
if 'X' not in d or 'y' not in d:
    raise KeyError("preprocessed.npz must contain arrays 'X' and 'y'")

X = d['X']  # expected shape (n_epochs, n_chans, n_times)
y = d['y'].astype(int)
# safe meta
meta = {}
if 'meta' in d:
    meta_raw = d['meta']
    try:
        meta = meta_raw.item() if meta_raw.shape == () else dict(meta_raw)
    except Exception:
        meta = {}

print("Loaded X.shape", X.shape, "y.shape", y.shape)
# fix labels to be 0..C-1
y = y - y.min()
print("Labels after fix:", np.unique(y), "counts:", dict(zip(*np.unique(y, return_counts=True))))

# -------- CSP + classical baseline (stratified k-fold) --------
use_csp = True
results_classical = {}
if use_csp:
    n_csp = min(6, X.shape[0])  # don't request more components than samples
    csp = CSP(n_components=n_csp, log=True, norm_trace=False)
    try:
        X_csp = csp.fit_transform(X, y)
    except Exception as e:
        print("CSP fit_transform failed:", e)
        X_csp = None
    if X_csp is not None:
        classifiers = {
            'LDA': Pipeline([('sc', StandardScaler()), ('clf', LinearDiscriminantAnalysis())]),
            'SVM-rbf': Pipeline([('sc', StandardScaler()), ('clf', SVC(kernel='rbf', C=1, probability=True))]),
            'RandomForest': RandomForestClassifier(n_estimators=100, random_state=42),
            'MLP': Pipeline([('sc', StandardScaler()), ('clf', MLPClassifier(hidden_layer_sizes=(50,), max_iter=300))])
        }
        skf = StratifiedKFold(n_splits=min(5, max(2, int(np.floor(X.shape[0]/2)))), shuffle=True, random_state=42)
        for name, clf in classifiers.items():
            accs, f1s = [], []
            for tr, te in skf.split(X_csp, y):
                clf.fit(X_csp[tr], y[tr])
                p = clf.predict(X_csp[te])
                accs.append(accuracy_score(y[te], p)); f1s.append(f1_score(y[te], p, average='weighted'))
            results_classical[name] = {'acc_mean': float(np.mean(accs)), 'acc_std': float(np.std(accs)),
                                       'f1_mean': float(np.mean(f1s)), 'f1_std': float(np.std(f1s))}
            print(f"[CSP] {name}: acc {results_classical[name]['acc_mean']:.3f} ± {results_classical[name]['acc_std']:.3f}, f1 {results_classical[name]['f1_mean']:.3f}")

# save CSP objects if present
if use_csp and 'csp' in locals():
    joblib.dump({'csp': csp, 'classical_results': results_classical}, 'classical_artifacts.pkl')

# -------- Tiny EEGNet model (only if enough samples) --------
n_samples = X.shape[0]
MIN_SAMPLES_FOR_CNN = 50  # safety threshold
device = 'cuda' if torch.cuda.is_available() else 'cpu'

class TorchEEGDataset(Dataset):
    def __init__(self, X, y, augment=False):
        self.X = X.astype(np.float32)
        self.y = y.astype(np.int64)
        self.augment = augment
    def __len__(self): return len(self.y)
    def __getitem__(self, idx):
        x = self.X[idx]
        # no heavy augment by default; could add augmentation here if desired
        return torch.tensor(x, dtype=torch.float32), torch.tensor(int(self.y[idx]), dtype=torch.long)

class EEGNet(nn.Module):
    def __init__(self, chans, samples, classes=2, kern_len=64, F1=8, D=2, F2=16, dropout=0.5):
        super().__init__()
        self.first = nn.Sequential(
            nn.Conv2d(1, F1, (1, kern_len), padding=(0, kern_len//2), bias=False),
            nn.BatchNorm2d(F1),
            nn.Conv2d(F1, F1*D, (chans, 1), bias=False),
            nn.BatchNorm2d(F1*D),
            nn.ELU(),
            nn.AvgPool2d((1,4)),
            nn.Dropout(dropout)
        )
        self.second = nn.Sequential(
            nn.Conv2d(F1*D, F2, (1, 16), bias=False),
            nn.BatchNorm2d(F2),
            nn.ELU(),
            nn.AvgPool2d((1,8)),
            nn.Flatten()
        )
        # compute flattened dim dynamically
        with torch.no_grad():
            dummy = torch.zeros(1,1,chans,samples)
            feat = self.first(dummy)
            feat = self.second(feat)
            hid_dim = feat.shape[1]
        self.classify = nn.Linear(hid_dim, classes)
    def forward(self, x):
        # x: B x chans x samples
        x = x.unsqueeze(1)   # B x 1 x chans x samples
        x = self.first(x)
        x = self.second(x)
        return self.classify(x)

# Train function (uses DataLoader that returns torch tensors)
def train_model(model, train_loader, val_loader, device='cpu', epochs=30, lr=1e-3):
    model = model.to(device)
    opt = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4)
    loss_fn = nn.CrossEntropyLoss()
    best_acc = 0.0; best_state = None
    for ep in range(1, epochs+1):
        model.train()
        running_losses = []
        for xb, yb in train_loader:
            xb = xb.to(device); yb = yb.to(device)
            opt.zero_grad()
            logits = model(xb)
            loss = loss_fn(logits, yb)
            loss.backward(); opt.step()
            running_losses.append(loss.item())
        # validation
        model.eval()
        ys=[]; preds=[]
        with torch.no_grad():
            for xb, yb in val_loader:
                xb = xb.to(device)
                logits = model(xb)
                preds.extend(logits.argmax(dim=1).cpu().numpy())
                ys.extend(yb.numpy())
        acc = accuracy_score(ys, preds); f1v = f1_score(ys, preds, average='weighted')
        if acc > best_acc:
            best_acc = acc; best_state = model.state_dict()
        if ep==1 or ep%5==0:
            print(f"[EEGNet] ep{ep}: tr_loss={np.mean(running_losses):.4f}, val_acc={acc:.4f}, f1={f1v:.4f}")
    if best_state is not None:
        model.load_state_dict(best_state)
    return model, best_acc

if n_samples < MIN_SAMPLES_FOR_CNN:
    print(f"Only {n_samples} samples available (<{MIN_SAMPLES_FOR_CNN}). Skipping CNN training. Use CSP/classical or gather more data.")
    # produce summary dataframe
    rows=[]
    for k,v in results_classical.items():
        rows.append({'model': k+' (CSP)', 'acc_mean': v['acc_mean'], 'acc_std': v['acc_std'], 'f1_mean': v['f1_mean']})
    df = pd.DataFrame(rows).sort_values('acc_mean', ascending=False).reset_index(drop=True)
    print("\nSummary (classical only):\n", df)
else:
    # split stratified train/test for CNN (keep a small holdout)
    tr_idx, te_idx = train_test_split(np.arange(n_samples), test_size=0.2, stratify=y, random_state=42)
    ds_tr = TorchEEGDataset(X[tr_idx], y[tr_idx], augment=False)
    ds_te = TorchEEGDataset(X[te_idx], y[te_idx], augment=False)
    loader_tr = DataLoader(ds_tr, batch_size=16, shuffle=True)
    loader_te = DataLoader(ds_te, batch_size=32, shuffle=False)
    chans, samples = X.shape[1], X.shape[2]
    num_classes = int(len(np.unique(y)))
    model = EEGNet(chans, samples, classes=num_classes)
    model, best_acc = train_model(model, loader_tr, loader_te, device=device, epochs=25, lr=1e-3)
    torch.save(model.state_dict(), "eegnet_best.pth")
    print("EEGNet best acc on holdout:", best_acc)
    # evaluation confusion
    ys, preds = [], []
    model.eval()
    with torch.no_grad():
        for xb, yb in loader_te:
            xb = xb.to(device)
            logits = model(xb)
            preds.extend(logits.argmax(dim=1).cpu().numpy())
            ys.extend(yb.numpy())
    print("Confusion (EEGNet):\n", confusion_matrix(ys, preds))
    # summary
    rows=[]
    for k,v in results_classical.items():
        rows.append({'model': k+' (CSP)', 'acc_mean': v['acc_mean'], 'acc_std': v['acc_std'], 'f1_mean': v['f1_mean']})
    rows.append({'model': 'EEGNet (raw)', 'acc_mean': float(best_acc), 'acc_std': 0.0, 'f1_mean': None})
    df = pd.DataFrame(rows).sort_values('acc_mean', ascending=False).reset_index(drop=True)
    print("\nSummary:\n", df)
    df.to_csv('benchmark_summary.csv', index=False)

# Save summary artifacts
try:
    df.to_csv('benchmark_summary.csv', index=False)
    print("Saved benchmark_summary.csv")
except Exception:
    pass

# ---------- End of cell ----------


Loaded X.shape (30, 64, 561) y.shape (30,)
Labels after fix: [0 1] counts: {np.int64(0): np.int64(14), np.int64(1): np.int64(16)}
Computing rank from data with rank=None
    Using tolerance 0.00082 (2.2e-16 eps * 64 dim * 5.8e+10  max singular value)
    Estimated rank (data): 64
    data: rank 64 computed from 64 data channels with 0 projectors
Reducing data rank from 64 -> 64
Estimating class=0 covariance using EMPIRICAL
Done.
Estimating class=1 covariance using EMPIRICAL
Done.
[CSP] LDA: acc 0.967 ± 0.067, f1 0.966
[CSP] SVM-rbf: acc 0.933 ± 0.082, f1 0.933
[CSP] RandomForest: acc 0.967 ± 0.067, f1 0.966




[CSP] MLP: acc 1.000 ± 0.000, f1 1.000
Only 30 samples available (<50). Skipping CNN training. Use CSP/classical or gather more data.

Summary (classical only):
                 model  acc_mean   acc_std   f1_mean
0           MLP (CSP)  1.000000  0.000000  1.000000
1           LDA (CSP)  0.966667  0.066667  0.965714
2  RandomForest (CSP)  0.966667  0.066667  0.965714
3       SVM-rbf (CSP)  0.933333  0.081650  0.933333
Saved benchmark_summary.csv


In [6]:
import numpy as np

def add_noise(epoch, sigma=0.01):
    return epoch + np.random.normal(0, sigma, epoch.shape)

def time_shift(epoch, shift_samples):
    return np.roll(epoch, shift_samples, axis=-1)

def channel_dropout(epoch, drop_prob=0.1):
    ep = epoch.copy()
    n_ch = ep.shape[0]
    for ch in range(n_ch):
        if np.random.rand() < drop_prob:
            ep[ch,:] = 0
    return ep

def random_augment(epoch):
    e = epoch.copy()
    if np.random.rand() < 0.5:
        e = add_noise(e, sigma=0.01)
    if np.random.rand() < 0.5:
        shift = np.random.randint(-10, 11)  # +/- up to 10 samples
        e = time_shift(e, shift)
    if np.random.rand() < 0.3:
        e = channel_dropout(e, drop_prob=0.15)
    return e


In [9]:
# Tiny EEGNet with safe auto-computed flatten + k-fold training

import numpy as np
import torch, torch.nn as nn, torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score

# ---- augment helpers (reuse from above if already defined) ----
def add_noise(epoch, sigma=0.01):
    return epoch + np.random.normal(0, sigma, epoch.shape)

def time_shift(epoch, shift_samples):
    return np.roll(epoch, shift_samples, axis=-1)

def channel_dropout(epoch, drop_prob=0.1):
    ep = epoch.copy()
    n_ch = ep.shape[0]
    for ch in range(n_ch):
        if np.random.rand() < drop_prob:
            ep[ch, :] = 0
    return ep

def random_augment(epoch):
    e = epoch.copy()
    if np.random.rand() < 0.5:
        e = add_noise(e, sigma=0.01)
    if np.random.rand() < 0.5:
        shift = np.random.randint(-10, 11)
        e = time_shift(e, shift)
    if np.random.rand() < 0.3:
        e = channel_dropout(e, drop_prob=0.15)
    return e


# ---- dataset ----
class SmallEEGDataset(Dataset):
    def __init__(self, X, y, augment=False):
        self.X = X.astype(np.float32)
        self.y = y.astype(int)
        self.augment = augment

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        x = self.X[idx]
        if self.augment:
            x = random_augment(x)
        return x, int(self.y[idx])


# ---- Tiny EEGNet (auto-compute flatten) ----
class TinyEEGNet(nn.Module):
    def __init__(self, chans, samples, classes=2):
        super().__init__()

        self.features = nn.Sequential(
            nn.Conv2d(1, 4, kernel_size=(1, 32), padding=(0, 16), bias=False),
            nn.BatchNorm2d(4),
            nn.Conv2d(4, 4, kernel_size=(chans, 1), bias=False),
            nn.ELU(),
            nn.AvgPool2d((1, 4)),
            nn.Dropout(0.5)
        )

        # compute flatten dim automatically
        with torch.no_grad():
            dummy = torch.zeros(1, 1, chans, samples)
            feat = self.features(dummy)
            flat_dim = feat.view(1, -1).shape[1]

        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(flat_dim, classes)
        )

    def forward(self, x):
        x = x.unsqueeze(1)
        x = self.features(x)
        x = self.classifier(x)
        return x


# ---- load data ----
d = np.load("preprocessed.npz", allow_pickle=True)
X = d["X"]
y = d["y"].astype(int)

n_epochs, n_chans, n_times = X.shape
device = "cuda" if torch.cuda.is_available() else "cpu"

skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
fold_accs = []

for fold, (tr, te) in enumerate(skf.split(X, y), 1):
    ds_tr = SmallEEGDataset(X[tr], y[tr], augment=True)
    ds_te = SmallEEGDataset(X[te], y[te], augment=False)

    loader_tr = DataLoader(ds_tr, batch_size=8, shuffle=True)
    loader_te = DataLoader(ds_te, batch_size=16, shuffle=False)

    model = TinyEEGNet(n_chans, n_times, classes=len(np.unique(y))).to(device)
    opt = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)
    loss_fn = nn.CrossEntropyLoss()

    for ep in range(1, 21):
        model.train()
        for xb, yb in loader_tr:
            xb = xb.to(device).float()

            yb = torch.tensor(yb, dtype=torch.long).to(device)

            opt.zero_grad()
            logits = model(xb)
            loss = loss_fn(logits, yb)
            loss.backward()
            opt.step()

    # validation
    model.eval()
    preds, gts = [], []
    with torch.no_grad():
        for xb, yb in loader_te:
            xb = xb.to(device)
            logits = model(xb)
            preds.extend(logits.argmax(dim=1).cpu().numpy())
            gts.extend(yb)

    acc = accuracy_score(gts, preds)
    print(f"Fold {fold} acc {acc:.3f}")
    fold_accs.append(acc)

print("Mean acc:", np.mean(fold_accs))


  yb = torch.tensor(yb, dtype=torch.long).to(device)


Fold 1 acc 0.333


  yb = torch.tensor(yb, dtype=torch.long).to(device)


Fold 2 acc 0.500


  yb = torch.tensor(yb, dtype=torch.long).to(device)


Fold 3 acc 0.500


  yb = torch.tensor(yb, dtype=torch.long).to(device)


Fold 4 acc 0.500


  yb = torch.tensor(yb, dtype=torch.long).to(device)


Fold 5 acc 0.500
Mean acc: 0.4666666666666666
