## Imports

In [1]:
import os
import h5py
import numpy as np
from pathlib import Path

import matplotlib.pyplot as plt

import keras
from keras.models import Sequential
from keras.layers import Conv1D, LSTM, Dropout, BatchNormalization, Dense, GlobalAveragePooling1D, TimeDistributed, MaxPooling1D, Flatten
from keras.optimizers import Nadam
from keras.callbacks import EarlyStopping
from keras.utils import to_categorical
from keras.regularizers import l2
from keras.optimizers.schedules import ExponentialDecay

from sklearn.utils import shuffle

from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay

from parse_data import load_intra_train, load_intra_test, load_cross_train, load_cross_test

## Constants

In [2]:
DOWNSAMPLING_STEP = 8
NUM_CLASSES = 4
BATCH_SIZE = 3

ENCODE_MAP = {
        'rest': 0,
        'motor': 1,
        'memory': 2,
        'math': 3,
    }

INTRA_OR_CROSS_MODE = "cross"

if INTRA_OR_CROSS_MODE == "cross":
    load_train = load_cross_train
    load_test = load_cross_test
elif INTRA_OR_CROSS_MODE == "intra":
    load_train = load_intra_train
    load_test = load_intra_test


## Load data

In [3]:
filenamepath_train = ("C:/Users/guill/Desktop/Uni/DeepLearning/Final Project data/Cross/train")
filenamepath_test = ("C:/Users/guill/Desktop/Uni/DeepLearning/Final Project data/Cross")

X_train, y_train = load_cross_train(folder_path=filenamepath_train,batch_size=BATCH_SIZE,downsample_step=DOWNSAMPLING_STEP,logs=True)

X.shape from C:\Users\guill\Desktop\Uni\DeepLearning\Final Project data\Cross\train: (94976, 3, 248)


## Transformers

In [4]:
import torch.nn as nn
import numpy as np
import torch

print("Defining model")

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len).unsqueeze(1).float()
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.pe = pe.unsqueeze(0)

    def forward(self, x):
        return x + self.pe[:, :x.size(1), :].to(x.device)

class TransformerClassifier(nn.Module):
    def __init__(self, input_dim=248, model_dim=64, num_heads=4, num_layers=2, num_classes=4, dropout=0.2):
        super().__init__()
        self.input_fc = nn.Linear(input_dim, model_dim)
        self.pos_encoder = PositionalEncoding(model_dim)
        encoder_layer = nn.TransformerEncoderLayer(d_model=model_dim, nhead=num_heads, dropout=dropout, batch_first=True)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool1d(1),
            nn.Flatten(),
            nn.Linear(model_dim, num_classes)
        )

    def forward(self, x):
        x = self.input_fc(x)
        x = self.pos_encoder(x)
        x = self.transformer(x)
        x = x.transpose(1, 2)
        return self.classifier(x)


Defining model


In [5]:
def train_model(model, dataloader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    for xb, yb in dataloader:
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad()
        preds = model(xb)
        loss = criterion(preds, yb)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(dataloader)

def evaluate_model(model, dataloader, device):
    model.eval()
    all_preds = []
    all_targets = []
    with torch.no_grad():
        for xb, yb in dataloader:
            xb, yb = xb.to(device), yb.to(device)
            preds = model(xb)
            predicted = torch.argmax(preds, dim=1)
            all_preds.extend(predicted.cpu().numpy())
            all_targets.extend(yb.cpu().numpy())
    acc = np.mean(np.array(all_preds) == np.array(all_targets))
    return acc, all_preds, all_targets


In [None]:
import torch
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split

print("Enter training")

X_train_np, X_val_np, y_train_np, y_val_np = train_test_split(
    np.array(X_train), np.array(y_train), test_size=0.2, stratify=y_train, random_state=42
)


X_train_tensor = torch.tensor(X_train_np, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train_np, dtype=torch.long)
X_val_tensor = torch.tensor(X_val_np, dtype=torch.float32)
y_val_tensor = torch.tensor(y_val_np, dtype=torch.long)


train_loader = DataLoader(TensorDataset(X_train_tensor, y_train_tensor), batch_size=8, shuffle=True)
val_loader = DataLoader(TensorDataset(X_val_tensor, y_val_tensor), batch_size=8)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = TransformerClassifier(input_dim=X_train_tensor.shape[2]).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-4)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', patience=5, factor=0.5)

best_val_acc = 0
best_train_loss = 1000
best_model_state = None
train_losses = []
val_accuracies = []
num_epochs = 50

for epoch in range(num_epochs):
    train_loss = train_model(model, train_loader, criterion, optimizer, device)
    val_acc, _, _ = evaluate_model(model, val_loader, device)
    scheduler.step(val_acc)

    train_losses.append(train_loss)
    val_accuracies.append(val_acc)
    print(f"Epoch {epoch+1}: Train Loss = {train_loss:.4f}, Val Accuracy = {val_acc:.4f}")

    if val_acc > best_val_acc or (val_acc == best_val_acc and train_loss < best_train_loss):
        best_val_acc = val_acc
        best_train_loss = train_loss
        best_model_state = model.state_dict()

# Load best model
if best_model_state is not None:
    model.load_state_dict(best_model_state)
    print(f"\n✅ Restored best model with Val Accuracy = {best_val_acc:.4f}")


Enter training
Epoch 1: Train Loss = 1.3901, Val Accuracy = 0.2500


# Training Progress

In [None]:
# Plot accuracy
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(history.history['accuracy'], label='Train Acc')
plt.plot(history.history['val_accuracy'], label='Val Acc')
plt.title('Model Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.ylim((0,1))
plt.legend()

# Plot loss
plt.subplot(1, 2, 2)
plt.plot(history.history['loss'], label='Train Loss')
plt.plot(history.history['val_loss'], label='Val Loss')
plt.title('Model Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

plt.tight_layout()
plt.show()


# Testing & Evaluating

In [None]:
# Load and preprocess test data
X_test, y_test = load_cross_test(folder_path=filenamepath_test,batch_size=BATCH_SIZE,downsample_step=DOWNSAMPLING_STEP,logs=1)

# One-hot encode labels
y_test = to_categorical(y_test, num_classes=4)

In [None]:
[(0.715900182723999, [14, 100000, 0.00015, 0.98]), (0.7168548107147217, [12, 100000, 5e-05, 0.96]), (0.7189745903015137, [13, 100000, 5e-05, 0.92]), (0.7205188870429993, [14, 100000, 0.00015, 0.94]), (0.7219367623329163, [13, 100000, 5e-05, 0.98]), (0.7224982976913452, [14, 100000, 5e-05, 0.98]), (0.7247023582458496, [14, 100000, 0.0001, 0.94]), (0.7249410152435303, [13, 100000, 0.0001, 0.98]), (0.7268081903457642, [13, 100000, 5e-05, 0.94]), (0.7303038239479065, [14, 100000, 5e-05, 0.92]), (0.7378144860267639, [12, 100000, 0.00015, 0.98])]


In [None]:
# Evaluate
test_loss, test_acc = lstm_model.evaluate(X_test, y_test, verbose=1)
print("Test accuracy:", test_acc)

In [None]:
# This piece of code collects all the instances that were predicted as 'motor' by the model in this notebook.
# Purpose: Since this model could improve on distinguishing motor and math, we could feed all predictions of 'motor' to a new model, for a second opinion
# So this code determines on what instances to ask a second opinion
X_second_op = []

counter = 0
while counter < len(y_test):
    if (list(y_test[counter])) == [0.0, 1.0, 0.0, 0.0]:
        # feed to other model
        instance = (X_test[counter])
        
        X_second_op.append(instance)

    counter = counter + 1

X_second_op = np.asarray(X_second_op)


In [None]:

y_pred_probs = model.predict(X_test)
y_pred = np.argmax(y_pred_probs, axis=1)
y_true = np.argmax(y_test, axis=1)

cm = confusion_matrix(y_true, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["rest", "motor", "memory", "math"])
disp.plot(cmap=plt.cm.Blues)
plt.title("Confusion Matrix")
plt.show()


In [None]:
from sklearn.metrics import accuracy_score

refined_accuracy = accuracy_score(y_true, refined_preds)
print(f"Test accuracy after refinement: {refined_accuracy:.4f}")
