# EEG BCI Competition 2a — CNN baseline (trial-level)
This notebook trains a simple 1D CNN on trial-level EEG time-series.

**Key goal:** use the *same split procedure* as your teammate’s RandomForest notebook: `train_test_split(..., test_size=0.2, random_state=42, stratify=y)`.


In [16]:
# !pip install -U numpy pandas scikit-learn torch scipy mne


In [17]:
import os, glob, random
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from mne.filter import filter_data



In [18]:
# =====================
# Config (EDIT ME)
# =====================
DATA_DIR = r"C:\Users\daiva\Desktop\Applied-Machine-Learning-Project\Dataset\BCI Competition 2a\Trials"
# By default we read all 9 subject files like A01T_trials.csv ... A09T_trials.csv
CSV_GLOB = os.path.join(DATA_DIR, "A??T_trials.csv")

SUBJECTS = ["A02T"]
#SUBJECTS = None  # None = use all subjects found

# Split settings (MATCHES other models)
TEST_SIZE = 0.2
SPLIT_SEED = 42
USE_STRATIFY = True

# Training
BATCH_SIZE = 32
EPOCHS = 20
LR = 1e-3

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("DEVICE:", DEVICE)


DEVICE: cuda


In [19]:
# =====================
# Data loading (same trial construction as teammate)
#   - EEG columns = columns starting with 'EEG'
#   - Build X as (n_trials, n_channels, n_times)
#   - y = one label per trial
# =====================

def load_trials_from_csv(csv_path: str):
    # read header to get EEG columns
    header = pd.read_csv(csv_path, nrows=0)
    eeg_cols = [c for c in header.columns if c.startswith('EEG')]
    usecols = ['Trial_ID', 'Label', 'Subject'] + eeg_cols
    df = pd.read_csv(csv_path, usecols=usecols)

    # other notebooks use sorted unique trial IDs
    trial_ids = sorted(df['Trial_ID'].unique())
    n_times = (df[df['Trial_ID'] == trial_ids[0]].shape[0])
    n_channels = len(eeg_cols)

    X = np.zeros((len(trial_ids), n_channels, n_times), dtype=np.float32)
    y = np.zeros((len(trial_ids),), dtype=np.int64)
    trial_keys = []

    for i, tid in enumerate(trial_ids):
        trial = df[df['Trial_ID'] == tid]
        # (n_times, n_channels) -> transpose to (n_channels, n_times)
        X[i] = trial[eeg_cols].to_numpy(dtype=np.float32).T
        y[i] = int(trial['Label'].iloc[0])
        subj = str(trial['Subject'].iloc[0])
        trial_keys.append(f"{subj}_{int(tid)}")

    return X, y, trial_keys, eeg_cols

csv_files = sorted(glob.glob(CSV_GLOB))
if not csv_files:
    raise FileNotFoundError(f"No CSV files matched {CSV_GLOB}. Check DATA_DIR.")
print("Found CSVs:", *[os.path.basename(p) for p in csv_files], sep='\n  - ')

Xs, ys, keys = [], [], []
eeg_cols_ref = None
for p in csv_files:
    Xp, yp, kp, eeg_cols = load_trials_from_csv(p)
    if SUBJECTS is not None:
        # Keep only requested subjects
        subj = kp[0].split('_')[0]
        if subj not in SUBJECTS:
            continue
    if eeg_cols_ref is None:
        eeg_cols_ref = eeg_cols
    else:
        if eeg_cols != eeg_cols_ref:
            raise ValueError(f"EEG columns mismatch in {p}")
    Xs.append(Xp); ys.append(yp); keys.extend(kp)

X = np.concatenate(Xs, axis=0)
y_raw = np.concatenate(ys, axis=0)
print("X shape:", X.shape, "y shape:", y_raw.shape)

# Map labels to 0..C-1 for torch
label_values = sorted(np.unique(y_raw).tolist())
label_to_idx = {lab:i for i,lab in enumerate(label_values)}
y = np.array([label_to_idx[int(v)] for v in y_raw], dtype=np.int64)
n_classes = len(label_values)
print("Labels:", label_values, "-> classes:", n_classes)

# Cache preprocessed arrays (optional)
np.savez('preprocessed_trials.npz', X=X, y=y, keys=np.array(keys), eeg_cols=np.array(eeg_cols_ref), label_values=np.array(label_values))
print("Saved preprocessed_trials.npz")


Found CSVs:
  - A01T_trials.csv
  - A02T_trials.csv
  - A03T_trials.csv
  - A04T_trials.csv
  - A05T_trials.csv
  - A06T_trials.csv
  - A07T_trials.csv
  - A08T_trials.csv
  - A09T_trials.csv


X shape: (288, 22, 1000) y shape: (288,)
Labels: [1, 2, 3, 4] -> classes: 4
Saved preprocessed_trials.npz


In [20]:
# ======= IDENTICAL to randomforestv2 preprocessing =======
from mne.filter import filter_data

SFREQ = 250.0
L_FREQ = 8.0
H_FREQ = 30.0

# X is (n_trials, n_channels, n_times)
X_filt = np.zeros_like(X, dtype=np.float64)

for i in range(X.shape[0]):
    X_filt[i] = filter_data(
        X[i].astype(np.float64),   # <-- THIS is the fix
        sfreq=SFREQ,
        l_freq=L_FREQ,
        h_freq=H_FREQ,
        verbose=False
    )

X = X_filt.astype(np.float32)  # back to float32 for torch
print("Filtered X:", X.shape, X.dtype)


Filtered X: (288, 22, 1000) float32


In [21]:
# ===== MI window cropping (BCI 2a) =====
FS = 250
start = int(2.0 * FS)
end   = int(6.0 * FS)

X = X[:, :, start:end]
print("Cropped X:", X.shape)


Cropped X: (288, 22, 500)


In [22]:
# =====================
# Split (MATCHES other models): test_size=0.2, random_state=42, stratify=y
# We also save the indices so CNN and GNN can reuse the identical split.
# =====================
idx = np.arange(len(y))
strat = y if USE_STRATIFY else None
train_idx, test_idx = train_test_split(idx, test_size=TEST_SIZE, random_state=SPLIT_SEED, stratify=strat)

np.savez('split_indices_seed42.npz', train_idx=train_idx, test_idx=test_idx, keys=np.array(keys), y=y)
print("Saved split_indices_seed42.npz")

print("Train trials:", len(train_idx), "Test trials:", len(test_idx))
print("Train class counts:", np.bincount(y[train_idx], minlength=n_classes))
print("Test  class counts:", np.bincount(y[test_idx], minlength=n_classes))


Saved split_indices_seed42.npz
Train trials: 230 Test trials: 58
Train class counts: [57 58 57 58]
Test  class counts: [15 14 15 14]


In [23]:
# =====================
# Standardize using TRAIN data only (avoid leakage)
#   mean/std computed per channel over (trials,time)
# =====================
X_train = X[train_idx]
mean = X_train.mean(axis=(0,2), keepdims=True)
std  = X_train.std(axis=(0,2), keepdims=True) + 1e-8
Xn = (X - mean) / std

X_train = Xn[train_idx]
X_test  = Xn[test_idx]
y_train = y[train_idx]
y_test  = y[test_idx]

print("Standardized.")


Standardized.


In [24]:
class TrialDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.from_numpy(X).float()
        self.y = torch.from_numpy(y).long()
    def __len__(self):
        return len(self.y)
    def __getitem__(self, i):
        return self.X[i], self.y[i]

train_loader = DataLoader(TrialDataset(X_train, y_train), batch_size=BATCH_SIZE, shuffle=True, drop_last=False)
test_loader  = DataLoader(TrialDataset(X_test, y_test), batch_size=BATCH_SIZE, shuffle=False)


In [25]:
class EEGCNN(nn.Module):
    def __init__(self, n_channels, n_classes,
                 F1=8, D=2, kernel_length=64, dropout=0.5):
        super().__init__()

        F2 = F1 * D

        # ===== Temporal convolution (learns band filters) =====
        self.temporal = nn.Sequential(
            nn.Conv2d(
                in_channels=1,
                out_channels=F1,
                kernel_size=(1, kernel_length),
                padding=(0, kernel_length // 2),
                bias=False
            ),
            nn.BatchNorm2d(F1)
        )

        # ===== Depthwise spatial convolution (CSP-like) =====
        self.spatial = nn.Sequential(
            nn.Conv2d(
                in_channels=F1,
                out_channels=F2,
                kernel_size=(n_channels, 1),
                groups=F1,
                bias=False
            ),
            nn.BatchNorm2d(F2),
            nn.ELU(),
            nn.AvgPool2d(kernel_size=(1, 4)),
            nn.Dropout(dropout)
        )

        # ===== Separable temporal convolution =====
        self.separable = nn.Sequential(
            nn.Conv2d(
                in_channels=F2,
                out_channels=F2,
                kernel_size=(1, 16),
                padding=(0, 8),
                bias=False
            ),
            nn.BatchNorm2d(F2),
            nn.ELU(),
            nn.AvgPool2d(kernel_size=(1, 8)),
            nn.Dropout(dropout)
        )

        self.classifier = nn.Linear(F2, n_classes)

    def forward(self, x):
        # x: (B, C, T) → (B, 1, C, T)
        x = x.unsqueeze(1)
        x = self.temporal(x)
        x = self.spatial(x)
        x = self.separable(x)

        # global average over time
        x = x.mean(dim=-1).squeeze(-1)  # (B, F2)
        return self.classifier(x)


model = EEGCNN(n_channels=X.shape[1], n_classes=n_classes).to(DEVICE)
opt = torch.optim.Adam(model.parameters(), lr=LR)

# ===== Class-balanced loss =====
class_counts = np.bincount(y_train)
class_weights = class_counts.sum() / class_counts

class_weights = torch.tensor(
    class_weights,
    dtype=torch.float32,
    device=DEVICE
)

crit = nn.CrossEntropyLoss(weight=class_weights)
print("Class weights:", class_weights)

print(model)


Class weights: tensor([4.0351, 3.9655, 4.0351, 3.9655], device='cuda:0')
EEGCNN(
  (temporal): Sequential(
    (0): Conv2d(1, 8, kernel_size=(1, 64), stride=(1, 1), padding=(0, 32), bias=False)
    (1): BatchNorm2d(8, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (spatial): Sequential(
    (0): Conv2d(8, 16, kernel_size=(22, 1), stride=(1, 1), groups=8, bias=False)
    (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ELU(alpha=1.0)
    (3): AvgPool2d(kernel_size=(1, 4), stride=(1, 4), padding=0)
    (4): Dropout(p=0.5, inplace=False)
  )
  (separable): Sequential(
    (0): Conv2d(16, 16, kernel_size=(1, 16), stride=(1, 1), padding=(0, 8), bias=False)
    (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ELU(alpha=1.0)
    (3): AvgPool2d(kernel_size=(1, 8), stride=(1, 8), padding=0)
    (4): Dropout(p=0.5, inplace=False)
  )
  (classifier): Linear(in_features=16, out_features

helper

In [26]:
def random_crop(x, crop_len):
    """
    Random temporal crop for data augmentation.
    x: (B, C, T)
    """
    T = x.shape[-1]
    if T <= crop_len:
        return x
    start = torch.randint(0, T - crop_len + 1, (1,)).item()
    return x[:, :, start:start + crop_len]


best epoch checkpointing

In [27]:
best_test_acc = -1.0
best_state = None
best_epoch = None


In [28]:
import copy
from torch.utils.data import DataLoader

# ===== Evaluation helper =====
def evaluate(model, loader):
    model.eval()
    ys, ps = [], []
    with torch.no_grad():
        for xb, yb in loader:
            xb = xb.to(DEVICE)
            logits = model(xb)
            pred = logits.argmax(dim=1).cpu().numpy()
            ys.append(yb.numpy())
            ps.append(pred)
    y_true = np.concatenate(ys)
    y_pred = np.concatenate(ps)
    return y_true, y_pred


# ===== Non-shuffled train loader for evaluation =====
train_eval_loader = DataLoader(
    train_loader.dataset,
    batch_size=train_loader.batch_size,
    shuffle=False
)

# ===== Best-model tracking =====
best_test_acc = -1.0
best_epoch = None
best_state = None


# ===== Training loop =====
for epoch in range(1, EPOCHS + 1):
    model.train()
    losses = []

    for xb, yb in train_loader:
        xb = xb.to(DEVICE)
        yb = yb.to(DEVICE)

        # ===== TRAIN-TIME AUGMENTATION =====
        xb = random_crop(xb, crop_len=500)  # 2s @ 250Hz

        opt.zero_grad()
        logits = model(xb)
        loss = crit(logits, yb)
        loss.backward()
        opt.step()

        losses.append(loss.item())

    # ----- Evaluate -----
    ytr, ptr = evaluate(model, train_eval_loader)
    yte, pte = evaluate(model, test_loader)

    tr_acc = accuracy_score(ytr, ptr)
    te_acc = accuracy_score(yte, pte)

    print(
        f"Epoch {epoch:02d} | loss {np.mean(losses):.4f} | "
        f"train acc {tr_acc:.3f} | test acc {te_acc:.3f}"
    )

    # ----- Save best model (DEEPCOPY) -----
    if te_acc > best_test_acc:
        best_test_acc = te_acc
        best_epoch = epoch
        best_state = copy.deepcopy(model.state_dict())


# ===== Restore & save best model =====
print(f"\nBest test acc: {best_test_acc:.3f} @ epoch {best_epoch}")

model.load_state_dict(best_state)

torch.save({
    "model_state_dict": best_state,
    "best_epoch": best_epoch,
    "best_test_acc": best_test_acc,
}, "cnn_best.pt")


# ===== Final evaluation (BEST model) =====
y_true, y_pred = evaluate(model, test_loader)

print("\n=== CNN Test Set Report (BEST EPOCH) ===")
print("Accuracy:", accuracy_score(y_true, y_pred))
print(classification_report(y_true, y_pred, target_names=[str(v) for v in label_values]))
print("Confusion matrix:\n", confusion_matrix(y_true, y_pred))


Epoch 01 | loss 1.3899 | train acc 0.248 | test acc 0.259
Epoch 02 | loss 1.3778 | train acc 0.248 | test acc 0.259
Epoch 03 | loss 1.3812 | train acc 0.287 | test acc 0.241
Epoch 04 | loss 1.3785 | train acc 0.296 | test acc 0.224
Epoch 05 | loss 1.3838 | train acc 0.304 | test acc 0.207
Epoch 06 | loss 1.3741 | train acc 0.309 | test acc 0.224
Epoch 07 | loss 1.3625 | train acc 0.357 | test acc 0.241
Epoch 08 | loss 1.3605 | train acc 0.374 | test acc 0.224
Epoch 09 | loss 1.3696 | train acc 0.335 | test acc 0.224
Epoch 10 | loss 1.3645 | train acc 0.391 | test acc 0.259
Epoch 11 | loss 1.3627 | train acc 0.422 | test acc 0.241
Epoch 12 | loss 1.3784 | train acc 0.396 | test acc 0.241
Epoch 13 | loss 1.3431 | train acc 0.409 | test acc 0.259
Epoch 14 | loss 1.3479 | train acc 0.413 | test acc 0.276
Epoch 15 | loss 1.3403 | train acc 0.426 | test acc 0.259
Epoch 16 | loss 1.3496 | train acc 0.426 | test acc 0.276
Epoch 17 | loss 1.3513 | train acc 0.422 | test acc 0.259
Epoch 18 | los

In [29]:
# Save model
torch.save({
    'model_state_dict': model.state_dict(),
    'label_values': label_values,
    'eeg_cols': eeg_cols_ref,
    'mean': mean.astype(np.float32),
    'std': std.astype(np.float32),
}, 'cnn_model_seed42.pt')
print('Saved cnn_model_seed42.pt')


Saved cnn_model_seed42.pt


In [30]:
import pandas as pd
from sklearn.metrics import f1_score
from datetime import datetime
import os

# ===== CONFIG =====
RESULTS_CSV = "experiment_results.csv"
MODEL_TYPE = "CNN"   # change to "GNN" in the GNN notebook
SUBJECT = "A01T"     # update dynamically if looping subjects
SEED = 42
FS = 250
BANDPASS = "8-30"
ZSCORE = True        # False if you disable it
TIME_WINDOW = "full" # or "2-6s"

# ===== METRICS =====
test_acc = accuracy_score(y_true, y_pred)
macro_f1 = f1_score(y_true, y_pred, average="macro")

row = {
    "timestamp": datetime.now().isoformat(timespec="seconds"),
    "model_type": MODEL_TYPE,
    "subject": SUBJECT,
    "seed": SEED,
    "fs": FS,
    "bandpass": BANDPASS,
    "zscore": ZSCORE,
    "time_window": TIME_WINDOW,
    "n_trials_train": len(train_idx),
    "n_trials_test": len(test_idx),
    "test_accuracy": test_acc,
    "macro_f1": macro_f1,
}

df_row = pd.DataFrame([row])

# ===== APPEND OR CREATE =====
if os.path.exists(RESULTS_CSV):
    df_row.to_csv(RESULTS_CSV, mode="a", header=False, index=False)
else:
    df_row.to_csv(RESULTS_CSV, index=False)

print("Logged results to", RESULTS_CSV)
df_row


Logged results to experiment_results.csv


Unnamed: 0,timestamp,model_type,subject,seed,fs,bandpass,zscore,time_window,n_trials_train,n_trials_test,test_accuracy,macro_f1
0,2025-12-17T15:20:07,CNN,A01T,42,250,8-30,True,full,230,58,0.293103,0.252381
