In [1]:
import os
import re
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import mne
from torch.utils.data import Dataset, DataLoader, random_split

from sklearn.metrics import accuracy_score


In [2]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

task_mapping = {
    # 1: "baseline", 2: "baseline",
    3: "task1", 7: "task1", 11: "task1",
    4: "task2",  8: "task2",12: "task2",
    5: "task3",  9: "task3", 13: "task3",
    6: "task4", 10: "task4", 14: "task4"
}

In [3]:
def make_sliding_epochs_with_offset(raw, duration, overlap, offset_sec=0.0):
    raw_offset = raw.copy()
    raw_offset.crop(tmin=offset_sec, tmax=None)
    epochs = mne.make_fixed_length_epochs(
        raw_offset, duration=duration, overlap=overlap, preload=True, verbose=False
    )
    return epochs


import mne
import numpy as np

def load_eeg_data(edf_file_path, baseline_files=None):
    raw = mne.io.read_raw_edf(edf_file_path, preload=True, verbose=False)
    raw.pick(['Cz..', 'C3..', 'C4..'])
    raw.filter(1., 40., fir_design='firwin', verbose=False)

    event_times = raw.annotations.onset
    event_labels = raw.annotations.description
    label_mapping = {'T0': 0, 'T1': 1, 'T2': 2}
    event_ids = np.array([label_mapping[label] for label in event_labels])

    events = np.zeros((len(event_times), 3), dtype=int)
    events[:, 0] = (event_times * raw.info['sfreq']).astype(int)
    events[:, -1] = event_ids

    event_id = {key: value for key, value in label_mapping.items()}

    epochs = mne.Epochs(
        raw, events, event_id=event_id, tmin=-0.2, tmax=1.0,
        baseline=(None, 0), preload=True, verbose=False
    )

    # if baseline_files:
    #     baseline_data = []
    #     for baseline_file in baseline_files:

    #         baseline_raw = mne.io.read_raw_edf(baseline_file, preload=True, verbose=False)
    #         baseline_raw.pick(['Cz..', 'C3..', 'C4..'])
    #         baseline_data.append(baseline_raw.get_data())

    #     baseline_mean = np.mean(np.concatenate(baseline_data, axis=0), axis=0)
    #     epochs._data -= baseline_mean

    return epochs


In [4]:
root_dir = "./files/"

data_dict = {}


i = 0

for subject in sorted(os.listdir(root_dir)):
    subject_path = os.path.join(root_dir, subject)
    
    if os.path.isdir(subject_path) and re.match(r"S\d{3}", subject):
        # control the number of persons
        if(i < 30):
            i+=1
        else:
            break

        edf_files = sorted([f for f in os.listdir(subject_path) if f.endswith(".edf")])

        baseline_files = [os.path.join(subject_path, f) for f in edf_files if re.match(rf"{subject}R0[12]\.edf", f)]

        for edf_file in edf_files:
            match = re.match(r"(S\d{3})R(\d{2})\.edf", edf_file)
            if match:
                subject_id, session_id = match.groups()
                session_id = int(session_id)

                if session_id in task_mapping:
                    task = task_mapping[session_id]
                    full_path = os.path.join(subject_path, edf_file)

                    if subject_id not in data_dict:
                        data_dict[subject_id] = {task: []}
                    if task not in data_dict[subject_id]:
                        data_dict[subject_id][task] = []
                    
                    data_dict[subject_id][task].append((full_path, baseline_files))


In [5]:
train_files, test_files = [], []

for subject_id, tasks in data_dict.items():
    for task, file_list in tasks.items():
        if len(file_list) >= 3:
            train_files.extend(file_list[:2])
            test_files.append(file_list[2])

class EEGMotorImageryDataset(Dataset):
    def __init__(self, file_list):
        self.file_list = file_list
        self.data = []
        self.labels = []

        for file_path, baseline_files in self.file_list:
            epochs = load_eeg_data(file_path, baseline_files)

            self.data.append(epochs.get_data())
            self.labels.append(epochs.events[:, -1])

       
        self.data = np.concatenate(self.data, axis=0).astype(np.float32)
        self.labels = np.concatenate(self.labels, axis=0)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        label = self.labels[idx]
        sample = sample[np.newaxis, :, :]
        return torch.tensor(sample), torch.tensor(label)


In [6]:
train_dataset = EEGMotorImageryDataset(train_files)
test_dataset = EEGMotorImageryDataset(test_files)

batch_size = 1
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)


In [7]:
import numpy as np

def convert_dataloader_to_numpy(dataloader):
    X_list, y_list = [], []

    for X_batch, y_batch in dataloader:
        X_list.append(X_batch.numpy())
        y_list.append(y_batch.numpy())

    X_array = np.concatenate(X_list, axis=0)
    y_array = np.concatenate(y_list, axis=0)

    return X_array, y_array

X_train, y_train = convert_dataloader_to_numpy(train_loader)
X_test, y_test = convert_dataloader_to_numpy(test_loader)

X_train = X_train.reshape(X_train.shape[0], -1)
X_test = X_test.reshape(X_test.shape[0], -1)

print(f"Train_shape: {X_train.shape}, Test_shape: {X_test.shape}")


Train_shape: (6960, 579), Test_shape: (3480, 579)


In [8]:
np.unique(y_test, return_counts=True)

(array([0, 1, 2]), array([1680,  908,  892]))

## Using SVM

In [9]:
from sklearn.svm import SVC

model = SVC(kernel='rbf', C=1.0)
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
acc = accuracy_score(y_test, y_pred)
print(f"Accuracy: {acc:.2%}")

Accuracy: 53.68%


In [10]:
np.unique(y_pred, return_counts=True)

(array([0, 1, 2]), array([2669,  247,  564]))

## Using XGBoost

In [11]:
import xgboost as xgb

dtrain = xgb.DMatrix(X_train, label=y_train)
dtest = xgb.DMatrix(X_test, label=y_test)

params = {
    "objective": "multi:softmax",
    "num_class": len(np.unique(y_train)),
    "eval_metric": "mlogloss",
    "max_depth": 6,
    "eta": 0.1,
    "subsample": 0.8,
    "colsample_bytree": 0.8,
    "seed": 42
}

num_rounds = 100
bst = xgb.train(params, dtrain, num_rounds)

y_pred = bst.predict(dtest)
accuracy = accuracy_score(y_test, y_pred)
print(f"Accuracy: {accuracy:.2%}")

Accuracy: 54.22%


In [12]:
np.unique(y_pred, return_counts=True)

(array([0., 1., 2.], dtype=float32), array([2314,  503,  663]))

## Using EEGNet

In [13]:
class EEGNet(nn.Module):
    def __init__(self):
        super(EEGNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size=(1, 64), padding=(0, 32))
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=(2, 32), groups=16)
        self.bn2 = nn.BatchNorm2d(32)
        self.pool1 = nn.AvgPool2d((1, 8))
        self.dropout1 = nn.Dropout(0.5)
        self.conv3 = nn.Conv2d(32, 64, kernel_size=(1, 16), padding=(0, 8))
        self.bn3 = nn.BatchNorm2d(64)
        self.pool2 = nn.AvgPool2d((1, 4))
        self.flatten = nn.Flatten()
        self.fc = None

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.pool1(x)
        x = self.dropout1(x)
        x = self.conv3(x)
        x = self.bn3(x)
        x = self.pool2(x)
        x = self.flatten(x)

        if self.fc is None:
            feature_dim = x.shape[1]
            self.fc = nn.Linear(feature_dim, 4).to(x.device)

        x = self.fc(x)

        return x

In [14]:
model = EEGNet().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.CrossEntropyLoss()

num_epochs = 20
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for x, y in train_loader:
        y = y.long()
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        outputs = model(x)
        loss = loss_fn(outputs, y)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()

    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for x, y in test_loader:
            x, y = x.to(device), y.to(device)
            outputs = model(x)
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == y).sum().item()
            total += y.size(0)
    
    print(f"Epoch {epoch+1}/{num_epochs} | Loss: {running_loss / len(train_loader)} | Test Accuracy: {100 * correct / total:.2f}%")

model.eval()
correct = 0
total = 0
with torch.no_grad():
    for x, y in test_loader:
        x, y = x.to(device), y.to(device)
        outputs = model(x)
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == y).sum().item()
        total += y.size(0)

print(f"Test Accuracy: {100 * correct / total:.2f}%")

Epoch 1/20 | Loss: 1.0347491421287174 | Test Accuracy: 48.30%
Epoch 2/20 | Loss: 0.9626098852476169 | Test Accuracy: 26.47%
Epoch 3/20 | Loss: 0.9505818949990916 | Test Accuracy: 48.48%
Epoch 4/20 | Loss: 0.9416262277059428 | Test Accuracy: 48.48%
Epoch 5/20 | Loss: 0.9340976879707185 | Test Accuracy: 1.61%
Epoch 6/20 | Loss: 0.9311169998003063 | Test Accuracy: 48.30%
Epoch 7/20 | Loss: 0.9297325859948907 | Test Accuracy: 38.28%
Epoch 8/20 | Loss: 0.9293498419428609 | Test Accuracy: 48.56%
Epoch 9/20 | Loss: 0.9220344004103507 | Test Accuracy: 48.94%
Epoch 10/20 | Loss: 0.9225968111456296 | Test Accuracy: 48.25%
Epoch 11/20 | Loss: 0.9201991809125262 | Test Accuracy: 48.30%
Epoch 12/20 | Loss: 0.9213423543620384 | Test Accuracy: 48.88%
Epoch 13/20 | Loss: 0.9192601781862991 | Test Accuracy: 46.29%
Epoch 14/20 | Loss: 0.9181311266550302 | Test Accuracy: 26.70%
Epoch 15/20 | Loss: 0.9187802566964736 | Test Accuracy: 25.72%
Epoch 16/20 | Loss: 0.9181180224143739 | Test Accuracy: 31.70%
Ep