In [4]:
import kagglehub
import torch
import torch.nn as nn
import numpy as np
import pandas as pd
import os
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
from torch.utils.data import TensorDataset, DataLoader
import optuna

# Run on the GPU when PyTorch can see one; otherwise stay on the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
device

  from .autonotebook import tqdm as notebook_tqdm


device(type='cpu')

In [5]:
# Download the dataset; kagglehub returns the local directory it was stored in.
# NOTE: "xuannguyenuet2004/12-class-ssvep-eeg-data" was tried first and proved unsuitable.
path_1 = kagglehub.dataset_download("girgismicheal/steadystate-visual-evoked-potential-signals")
# The recordings sit in a sub-folder of the download. Join with os.path.join
# rather than concatenating a hard-coded "/" so the path is built portably.
path_1 = os.path.join(path_1, "SSVEP (BrainWheel)")
print("Downloaded dataset files:", "\n", path_1)

Download datasetaset files: 
 /home/zeyadcode/.cache/kagglehub/datasets/girgismicheal/steadystate-visual-evoked-potential-signals/versions/1/SSVEP (BrainWheel)


In [6]:
# HYPER PARAMS
WINDOW_LENGTH = 128  # rows (time samples) per window cut out of a trial

# PARAMS RELATED TO DATASET ONLY
TRIAL_LENGTH = 640  # rows per trial, i.e. rows recorded before the stimulus frequency changes

In [7]:
class EEGDataset(Dataset):
    """Fixed-length EEG windows cut from per-subject CSV recordings.

    Expected layout: ``data_path/subject_*/<recording>.csv``. In every CSV the
    first row is skipped (treated as a header), the last column holds the
    label (stimulus frequency) of each row and all other columns are channels.

    Shape legend used in the comments below:
        N: number of windows (samples)
        C: channels (number of electrodes)
    """

    def __init__(self, data_path, trial_length, window_length=128, stride=None) -> None:
        """
        Load every recording under ``data_path`` and slice it into windows.

        Args:
            data_path: directory containing the ``subject_*`` folders.
            trial_length: number of consecutive rows that share one label,
                i.e. the number of rows before the frequency changes.
            window_length: number of rows per window; must divide ``trial_length``.
            stride: step in rows between the starts of consecutive windows
                inside a trial. Defaults to ``window_length`` (no overlap).

        Raises:
            AssertionError: if ``window_length`` does not divide ``trial_length``
                or a trial contains more than one label value.
            ValueError: if ``stride`` is not positive.

        After construction ``self.data`` is a float32 tensor of shape
        [N x window_length x C] and ``self.labels`` a float64 tensor of shape
        [N x window_length] in which every entry of a row is that window's label.
        Rows left over after the last complete trial of a file are ignored.
        """
        super().__init__()

        assert trial_length % window_length == 0, "Please choose window_length that divides by trial_length"

        if stride is None:
            stride = window_length
        if stride <= 0:
            # A negative step would make the window range empty and silently
            # produce a dataset without samples; fail loudly instead.
            raise ValueError(f"stride must be a positive integer, got {stride}")

        self.data_path = data_path
        windows = []
        labels = []

        # os.listdir returns entries in arbitrary order; sort so the sample
        # order (and any seeded split made from it) is reproducible.
        subject_dirs = sorted(d for d in os.listdir(data_path) if d.startswith("subject_"))

        for subject_dir in subject_dirs:
            subject_path = os.path.join(data_path, subject_dir)
            sample_files = sorted(f for f in os.listdir(subject_path) if f.endswith(".csv"))

            for sample_file in sample_files:
                sample_file_path = os.path.join(subject_path, sample_file)
                df = pd.read_csv(sample_file_path, header=None, skiprows=1)  # rows x (C + 1)

                freqs = df.iloc[:, -1].values     # per-row label
                signals = df.iloc[:, :-1].values  # rows x C

                n_trials = len(freqs) // trial_length
                for t in range(n_trials):
                    start = t * trial_length
                    end = start + trial_length
                    block_freqs = freqs[start:end]

                    assert np.all(block_freqs == block_freqs[0]), f"Mixed labels in trial {t} of {sample_file}"

                    # Label the windows with THIS trial's frequency. Using the
                    # file's first row (freqs[0]) would give every trial of the
                    # file the label of its first trial.
                    trial_label = np.full(window_length, block_freqs[0], dtype=np.float64)
                    trial_data = signals[start:end]  # trial_length x C

                    for i in range(0, trial_length - window_length + 1, stride):
                        win = trial_data[i: i + window_length, :]  # window_length x C
                        windows.append(win.astype(np.float32))
                        labels.append(trial_label)

        # Stack once into arrays before converting; N x window_length x C and N x window_length.
        self.data = torch.tensor(np.array(windows))
        self.labels = torch.tensor(np.array(labels))

    def __getitem__(self, idx):
        """Return the ``(window, label_row)`` pair stored at ``idx``."""
        return self.data[idx], self.labels[idx]

    def __len__(self):
        """Return the number of windows."""
        return len(self.data)


# Build the windowed dataset; the stride equals the window length here.
dataset = EEGDataset(
    data_path=path_1,
    trial_length=TRIAL_LENGTH,
    window_length=WINDOW_LENGTH,
    stride=WINDOW_LENGTH,
)
# Distinct label values present in the loaded data.
unique_freqs = dataset.labels.unique()
unique_freqs

tensor([ 8.5700, 10.0000, 12.0000], dtype=torch.float64)

In [8]:
# Windows and their stimulus frequencies as NumPy arrays.
X = dataset.data.numpy()
Y_freq = dataset.labels.numpy()

# Encode every frequency as a class index: its position among the sorted
# unique frequencies. Casting the raw frequencies with .long() would truncate
# non-integer values (8.57 -> 8) and leave targets such as 8/10/12, which are
# not valid class indices for a classifier with len(unique_freqs) outputs.
class_freqs = np.unique(Y_freq)
Y = np.searchsorted(class_freqs, Y_freq)  # integer array, same shape as Y_freq

# Hold out 20% of the windows for testing, stratified by label.
X_train_val, X_test, Y_train_val, Y_test = train_test_split(
    X,
    Y,
    test_size=0.2,
    random_state=42,
    stratify=Y,
)

# Split the remainder again (80/20) into training and validation sets.
X_train, X_val, Y_train, Y_val = train_test_split(
    X_train_val,
    Y_train_val,
    test_size=0.2,
    random_state=42,
    stratify=Y_train_val,
)

# Turn to tensors
X_train_t = torch.from_numpy(X_train).float()
Y_train_t = torch.from_numpy(Y_train).long()

X_val_t   = torch.from_numpy(X_val).float()
Y_val_t   = torch.from_numpy(Y_val).long()

X_test_t  = torch.from_numpy(X_test).float()
Y_test_t  = torch.from_numpy(Y_test).long()

# Build dataset
train_ds = TensorDataset(X_train_t, Y_train_t)
val_ds   = TensorDataset(X_val_t,   Y_val_t)
test_ds  = TensorDataset(X_test_t,  Y_test_t)

print(f"train len: {len(train_ds)} - val len {len(val_ds)}, test len: {len(test_ds)}")
print(X.shape, Y.shape)

train len: 3328 - val len 832, test len: 1040
(5200, 128, 14) (5200, 128)


In [9]:
class SSVEPClassifier(nn.Module):
    """LSTM over a window of samples followed by a linear layer producing class scores.

    The module is created on PyTorch's default device; move it together with
    its inputs via ``model.to(...)``. (Previously only the LSTM was pinned to a
    global device while the linear head was not, which breaks on CUDA.)
    """

    def __init__(self, input_size: int,  out_size: int, hidden_size: int, num_layers: int, dropout: float, bidirectional: bool):
        """
        Args:
            input_size: number of features per time step.
            out_size: number of output scores (classes).
            hidden_size: LSTM hidden state size per direction.
            num_layers: number of stacked LSTM layers.
            dropout: dropout probability passed to ``nn.LSTM``.
            bidirectional: whether the LSTM also runs over the reversed sequence.
        """
        super().__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.dir_mult = 2 if bidirectional else 1

        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, dropout=dropout, bidirectional=bidirectional, batch_first=True)
        self.fc_out = nn.Sequential(
            # A bidirectional LSTM yields hidden_size features per direction, so
            # the head must accept hidden_size * dir_mult inputs.
            nn.Linear(hidden_size * self.dir_mult, out_size),
        )

    def forward(self, x: torch.Tensor):
        """Map a batch ``x`` of shape [B x T x input_size] to scores of shape [B x out_size]."""
        # Without explicit initial states nn.LSTM starts from zeros on x's
        # device and dtype, which is what the hand-built h0/c0 did.
        out, (hn, cn) = self.lstm(x)  # out: [B x T x hidden_size * dir_mult]

        if self.dir_mult == 2:
            # hn[-2] / hn[-1] are the last layer's final forward / backward
            # states. out[:, -1] would pair the forward summary with a backward
            # state that has only seen the last time step.
            features = torch.cat((hn[-2], hn[-1]), dim=1)
        else:
            features = out[:, -1]  # equals hn[-1] for a unidirectional LSTM
        return self.fc_out(features)



In [11]:
# Features per time step fed to the LSTM: windows are stored as
# window_length x C, so the last axis of the data tensor is the channel count C.
input_size = dataset.data.shape[2]
# One output score per distinct label value.
out_size = len(unique_freqs)

# tunable parameters
batch_size, lr = 32, 1e-3
hidden_size, num_layers = 128, 2
dropout, bidirectional = 0, False

In [None]:
# Instantiate the network from the settings chosen above.
model = SSVEPClassifier(
    input_size=input_size,
    out_size=out_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    dropout=dropout,
    bidirectional=bidirectional,
)
# Smoke test: push the first training window through as a batch of one.
first_window = train_ds[0][0]
model(first_window.unsqueeze(0))


# train_loader = DataLoader(train_ds,
#                           batch_size=batch_size,
#                           shuffle=True,
#                           drop_last=True)

# val_loader = DataLoader(val_ds,
#                         batch_size=batch_size,
#                         shuffle=False)

# test_loader = DataLoader(test_ds,
#                          batch_size=batch_size,
#                          shuffle=False)

tensor([[0.0515, 0.0717, 0.1270]], grad_fn=<AddmmBackward0>)