In [14]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score, precision_score, recall_score, accuracy_score
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay, classification_report
from torchvision.models import resnet18, ResNet18_Weights
from torch.utils.data import Dataset
import glob
import matplotlib.pyplot as plt
import numpy as np
import scipy
import mne


In [9]:
data = r"/Users/josephthi/Desktop/UCI_Classes/cs184A/CS_184A_Final Project/data/ds004504/derivatives"

In [15]:
device = torch.device("mps" if torch.backends.mps.is_available() else "cpu")

In [30]:
class EEGNet(nn.Module):
    def __init__(self, num_classes=3, Ch=19, T=120):
        super().__init__()

        # Block 1 — temporal convolution
        self.conv1 = nn.Conv2d(1, 8, kernel_size=(1, 16), padding=(0, 8), bias=False)
        self.bn1 = nn.BatchNorm2d(8)

        # Block 2 — depthwise convolution across channels
        self.depthwise = nn.Conv2d(
            8,
            16,
            kernel_size=(Ch, 1),
            groups=8,
            bias=False
        )
        self.bn2 = nn.BatchNorm2d(16)
        self.pool2 = nn.AvgPool2d(kernel_size=(1, 4))
        self.drop2 = nn.Dropout(0.5)

        # Block 3 — separable convolution
        self.separable = nn.Conv2d(
            16, 16,
            kernel_size=(1, 8),
            padding=(0, 4),
            bias=False
        )
        self.bn3 = nn.BatchNorm2d(16)
        self.pool3 = nn.AvgPool2d(kernel_size=(1, 4))
        self.drop3 = nn.Dropout(0.5)

        # Compute final feature size automatically
        dummy = torch.zeros(1, 1, T, Ch)
        dummy = self.forward_features(dummy)
        self.flat_dim = dummy.shape[1]

        # Final classifier
        self.fc = nn.Linear(self.flat_dim, num_classes)

    def forward_features(self, x):
        x = F.elu(self.bn1(self.conv1(x)))
        x = F.elu(self.bn2(self.depthwise(x)))
        x = self.pool2(x)
        x = self.drop2(x)

        x = F.elu(self.bn3(self.separable(x)))
        x = self.pool3(x)
        x = self.drop3(x)

        x = x.reshape(x.size(0), -1)
        return x

    def forward(self, x):
        x = self.forward_features(x)
        return self.fc(x)

In [31]:
def evaluate(model, X, Y, params = ["acc"]):
    results = []
    batch_size = 100
    
    predicted = []
    
    for i in range(len(X)/batch_size):
        s = i*batch_size
        e = i*batch_size+batch_size
        
        inputs = Variable(torch.from_numpy(X[s:e]).cuda(0))
        pred = model(inputs)
        
        predicted.append(pred.data.cpu().numpy())
        
        
    inputs = Variable(torch.from_numpy(X).cuda(0))
    predicted = model(inputs)
    
    predicted = predicted.data.cpu().numpy()
    
    for param in params:
        if param == 'acc':
            results.append(accuracy_score(Y, np.round(predicted)))
        if param == "auc":
            results.append(roc_auc_score(Y, predicted))
        if param == "recall":
            results.append(recall_score(Y, np.round(predicted)))
        if param == "precision":
            results.append(precision_score(Y, np.round(predicted)))
        if param == "fmeasure":
            precision = precision_score(Y, np.round(predicted))
            recall = recall_score(Y, np.round(predicted))
            results.append(2*precision*recall/ (precision+recall))
    return results

In [21]:
def append_data(path):
    X_data = []
    y_data = []
    for data in sorted(os.listdir(path)):
        file_path = os.path.join(path, data)
        if 'X' in data:
            X = np.load(file_path)
            X_data.append(X)
        elif 'y' in data:
            y = np.load(file_path)
            y_data.append(y)
        
    return X_data, y_data

In [25]:
class EEGDataset(Dataset):
    def __init__(self, X, y):
        super(EEGDataset, self).__init__()
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)  # long is correct for CE loss

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

In [26]:
X_data, y_data = append_data(r"data/raw_eeg_data")
# X_data_n, y_data_n = append_data(r"data/noisy_spectrogram_data")

X_all_list = X_data 
y_all_list = y_data 

X = np.concatenate(X_all_list, axis = 0)
y = np.concatenate(y_all_list, axis = 0)

# --- Normalize inputs (global z-score) ---
# Spectrogram values are very small; normalize to zero-mean unit-variance
# X_mean = X.mean()
# X_std = X.std()
# X = (X - X_mean) / (X_std + 1e-8)

# print('After normalization: X mean {:.3e}, std {:.3e}'.format(X.mean(), X.std()))

In [27]:
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=0.2,
    random_state=42,
    shuffle=True,
    stratify=y  # keeps class balance
)


train_dataset = EEGDataset(X_train, y_train)
test_dataset  = EEGDataset(X_test, y_test)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader  = torch.utils.data.DataLoader(test_dataset,  batch_size=32, shuffle=False)

In [32]:
model = EEGNet(num_classes=3, Ch=19, T=120).to(device)

In [None]:
learning_rate = 1e-4
optimizer = torch.optim.Adam(model.parameters(), lr = learning_rate)
criterion = torch.nn.CrossEntropyLoss()


In [None]:
batch_size = 32
optimizer = optim.Adam()

for epoch in range(10):  # loop over the dataset multiple times
    print("\nEpoch ", epoch)
    
    running_loss = 0.0
    for i in range(len(X_train)/batch_size-1):
        s = i*batch_size
        e = i*batch_size+batch_size
        
        inputs = torch.from_numpy(X_train[s:e])
        labels = torch.FloatTensor(np.array([y_train[s:e]]).T*1.0)
        
        # wrap them in Variable
        inputs, labels = Variable(inputs.cuda(0)), Variable(labels.cuda(0))

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        
        
        optimizer.step()
        
        running_loss += loss.data[0]
    
    # Validation accuracy
    params = ["acc", "auc", "fmeasure"]
    print(params)
    print("Training Loss ", running_loss)
    print("Train - ", evaluate(net, X_train, y_train, params))
    print("Validation - ", evaluate(net, X_val, y_val, params))
    print("Test - ", evaluate(net, X_test, y_test, params))

RuntimeError: Calculated padded input size per channel: (241 x 19). Kernel size: (1 x 64). Kernel size can't be greater than actual input size