In [None]:
import numpy as np
import glob
import os
import pandas as pd
import scipy.signal as signal
import mne
import random
import torchvision
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset, random_split
from torchvision.datasets import DatasetFolder
from torch.optim.lr_scheduler import LinearLR
from mne import preprocessing, Epochs
import utils
import matplotlib.pyplot as plt

### Data pre-processing

### Band pass filtering and events

In [None]:
# Directory that holds the .gdf recordings; both globs below are built from it
# so the location only has to be changed in one place.
eeg_file_path = 'data/eeg_data_A/'

# glob returns matches in filesystem-dependent order; sorting makes the order
# of recordings (and therefore of every downstream chunk) stable across runs.
eeg_training_files = sorted(glob.glob(os.path.join(eeg_file_path, 'A0*T.gdf')))
eeg_eval_files = sorted(glob.glob(os.path.join(eeg_file_path, 'A0*E.gdf')))

# utils.band_pass_filter returns two objects per file list; the second is
# presumably the epoched data (per the variable names) -- TODO confirm in utils.
eeg_train_obj, epoch_train_obj = utils.band_pass_filter(eeg_training_files)
eeg_eval_obj, epoch_eval_obj = utils.band_pass_filter(eeg_eval_files)


### Convert raw data to PyTorch tensor

In [None]:
# Convert the filtered training / evaluation objects into iterables of tensors.
eeg_data = utils.raw_to_tensor(eeg_train_obj)
eeg_test_data = utils.raw_to_tensor(eeg_eval_obj)

# Size passed to utils.split_tensor for every chunk.
split_size = 1000

# Flatten "one list of chunks per recording" into a single list of chunks,
# first for the training recordings and then for the evaluation recordings.
smaller_tensors = [
    chunk
    for recording in eeg_data
    for chunk in utils.split_tensor(recording, split_size)
]
test_tensors = [
    chunk
    for recording in eeg_test_data
    for chunk in utils.split_tensor(recording, split_size)
]

print(len(smaller_tensors))

### Dataset creation

#### User-defined EEGDataset class to work with the DataLoader

In [None]:

class EEGDataset(Dataset):
    """Thin ``Dataset`` adapter around an indexable collection of samples.

    The wrapped collection is stored as-is in ``self.data``; length and item
    lookups are delegated to it unchanged, so samples carry no labels.
    """

    def __init__(self, data):
        """Keep a reference to ``data`` (anything supporting ``len`` and ``[]``)."""
        self.data = data

    def __len__(self) -> int:
        """Return how many samples the wrapped collection holds."""
        n_samples = len(self.data)
        return n_samples

    def __getitem__(self, idx):
        """Return the sample stored at position ``idx``."""
        sample = self.data[idx]
        return sample
    
# Wrap the training and test chunk lists so a DataLoader can index them.
eeg_data, eeg_test_data = EEGDataset(smaller_tensors), EEGDataset(test_tensors)



#### Augment the data for better generalization and to enlarge the dataset

In [None]:

# Every source chunk contributes four entries, in this order:
# the untouched chunk, a time-shifted copy, a noisy copy and a time-warped copy.
aug_data = []

# Iterate over the un-augmented chunk list rather than `eeg_data`: this cell
# rebinds `eeg_data` to the augmented dataset further down, so reading from
# `eeg_data` here would re-augment already augmented samples whenever the cell
# is executed a second time. `smaller_tensors` holds exactly the samples that
# `eeg_data` wraps before augmentation.
for tensor in smaller_tensors:
    aug_data.append(tensor)

    # Each augmentation receives its own clone so the source chunk is never
    # modified, whatever the utils helpers do internally.
    shifted_tensor = utils.time_shift(tensor.clone(), shift=40)
    aug_data.append(shifted_tensor)

    noisy_tensor = utils.add_noise(tensor.clone(), noise_level=0.9)
    aug_data.append(noisy_tensor)

    warped_tensor = utils.time_warp(tensor.clone(), factor=0.5)
    aug_data.append(warped_tensor)

# Longest second dimension over all samples; shorter samples are padded up to
# this value later in the cell.
max_length = max(tensor.shape[1] for tensor in aug_data)

def pad_tensor(tensor, max_length):
    """Zero-pad ``tensor`` at the end of its last dimension up to ``max_length``.

    The current length is read from ``tensor.shape[1]`` while the padding is
    applied to the last dimension; for 2-D input these are the same axis.

    Args:
        tensor: Tensor with at least two dimensions.
        max_length: Target size.

    Returns:
        A new padded tensor when ``tensor.shape[1] < max_length``; otherwise
        the very same ``tensor`` object, unchanged (it is never truncated).
    """
    missing = max_length - tensor.shape[1]
    if missing <= 0:
        # Already long enough: hand the input back untouched.
        return tensor
    # (0, missing) -> nothing prepended, `missing` zeros appended.
    return torch.nn.functional.pad(tensor, (0, missing))

# Cast every sample to an independent float32 tensor, then right-pad it to the
# common length. torch.as_tensor accepts both tensors and array-likes and,
# unlike torch.tensor(existing_tensor), does not emit the "copy construct from
# a tensor" UserWarning; .detach().clone() keeps the original guarantee that
# the stored sample shares neither memory nor autograd history with its source.
padded_tensors = [
    pad_tensor(torch.as_tensor(tensor, dtype=torch.float32).detach().clone(), max_length)
    for tensor in aug_data
]

aug_data_set = EEGDataset(padded_tensors)

# From here on `eeg_data` refers to the augmented, padded dataset. Wrapping
# `aug_data_set` in a second EEGDataset would only add a redundant level of
# indirection on every item lookup, so the same object is reused.
eeg_data = aug_data_set

#### Load data

In [None]:
# Share of samples used for training. The ratio reproduces the original
# 7500 / 3396 split exactly when the dataset holds 10896 samples, and keeps
# the same proportion for any other dataset size instead of raising.
TRAIN_FRACTION = 7500 / 10896
BATCH_SIZE = 70

n_train = round(TRAIN_FRACTION * len(eeg_data))
n_val = len(eeg_data) - n_train

# NOTE(review): the split is drawn after augmentation, so augmented variants of
# one source chunk can land in both the training and the validation subset --
# confirm this leakage is acceptable for the reported validation loss.
eeg_train_set, eeg_val_set = random_split(eeg_data, [n_train, n_val])

train_loader = DataLoader(eeg_train_set, batch_size=BATCH_SIZE, shuffle=True)
# Evaluation loaders are not shuffled: batch composition then stays the same
# from epoch to epoch, which keeps the per-epoch validation loss comparable.
val_loader = DataLoader(eeg_val_set, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(eeg_test_data, batch_size=BATCH_SIZE, shuffle=False)


#### Convolutional network class definition

In [None]:
# Model hyper-parameters shared by the cells below.
embed_size = 40   # size of the embedding produced by the conv net / used as d_model
nhead = 10        # attention heads per transformer layer
num_layers = 6    # stacked layers in both the encoder and the decoder

# Run on the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

class ConvNet(nn.Module):
    """Convolutional feature extractor mapping an EEG chunk to one embedding.

    Pipeline: temporal convolution (kernel 1x4) -> ELU -> dropout ->
    convolution spanning all input rows (kernel n_channels x 1) -> ELU ->
    dropout -> flatten -> dropout -> linear projection.

    Args:
        dropout_rate: Drop probability used by all three dropout layers.
        n_channels: Size of dimension 1 of the input (rows). The second
            convolution spans all of them, collapsing the height to 1.
        n_samples: Size of dimension 2 of the input (columns). Must be >= 4.
        embed_dim: Output embedding size. ``None`` (default) falls back to the
            notebook-level ``embed_size``.

    With the defaults the layer sizes are identical to the previously
    hard-coded 25 x 1000 configuration.

    Raises:
        ValueError: If ``n_samples`` is smaller than the temporal kernel width.
    """

    def __init__(self, dropout_rate=0.5, n_channels=25, n_samples=1000, embed_dim=None):
        super().__init__()
        if n_samples < 4:
            raise ValueError(f"n_samples must be >= 4, got {n_samples}")
        if embed_dim is None:
            embed_dim = embed_size

        # Remembered so forward() can report a shape mismatch clearly.
        self.n_channels = n_channels
        self.n_samples = n_samples

        self.conv1 = nn.Conv2d(1, 40, (1, 4), (1, 1))
        self.elu1 = nn.ELU()
        self.dropout1 = nn.Dropout(dropout_rate)

        self.conv2 = nn.Conv2d(40, 40, (n_channels, 1), (1, 1))
        self.elu2 = nn.ELU()
        self.dropout2 = nn.Dropout(dropout_rate)

        self.flatten = nn.Flatten()

        # conv1 (width 4, stride 1, no padding) shortens the width by 3 and
        # conv2 collapses the height to 1, hence 40 * 1 * (n_samples - 3)
        # features after flattening (40 * 997 for the default sizes).
        output_size = 40 * 1 * (n_samples - 4 + 1)
        self.fc1 = nn.Linear(output_size, embed_dim)
        self.dropout3 = nn.Dropout(dropout_rate)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Embed a batch shaped [N, n_channels, n_samples] into [N, embed_dim].

        Raises:
            ValueError: If ``x`` does not have the configured shape.
        """
        expected = (self.n_channels, self.n_samples)
        if x.dim() != 3 or tuple(x.shape[1:]) != expected:
            raise ValueError(
                f"ConvNet expects input of shape [N, {expected[0]}, {expected[1]}], "
                f"got {list(x.shape)}"
            )

        x = x.unsqueeze(1)  # [N, 1, n_channels, n_samples]
        x = self.conv1(x)
        x = self.elu1(x)
        x = self.dropout1(x)

        x = self.conv2(x)
        x = self.elu2(x)
        x = self.dropout2(x)

        x = self.flatten(x)
        # NOTE(review): dropout3 runs directly after dropout2 (only a flatten
        # in between), so the two dropouts compound before fc1 -- confirm this
        # is intended rather than dropout3 being meant to follow fc1.
        x = self.dropout3(x)
        x = self.fc1(x)
        return x


#### Encoder and Decoder

In [None]:
# Settings shared by every encoder and decoder layer.
layer_kwargs = dict(d_model=embed_size, nhead=nhead, dropout=0.5, dim_feedforward=4)

# Feature extractor that turns each EEG chunk into an embed_size vector.
conv_net = ConvNet().to(device)

# Prototype layers; the stacks below are built from them.
encoder_layer = nn.TransformerEncoderLayer(**layer_kwargs).to(device)
decoder_layer = nn.TransformerDecoderLayer(**layer_kwargs).to(device)

transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers).to(device)
transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers=num_layers).to(device)

# Parameters handed to the optimizer: encoder and decoder weights only.
# conv_net's parameters are not part of this list.
model_params = [*transformer_encoder.parameters(), *transformer_decoder.parameters()]


#### Training Loop

In [None]:

# Reconstruction objective: the decoder output is compared (mean absolute
# error) with the conv-net embedding it was asked to reproduce.
criterion = nn.L1Loss()
optimizer = optim.SGD(model_params,
                      lr=0.001,
                      momentum=0.9)

# Stepped once per epoch: the learning-rate factor goes linearly from 1.0 to
# 0.25 over the first 10 epochs and stays at 0.25 afterwards.
scheduler = LinearLR(optimizer, start_factor=1.0, end_factor=0.25, total_iters=10)

train_losses = []  # mean training loss, one entry per epoch
val_losses = []    # mean validation loss, one entry per epoch
losses = []        # not written to in this cell; kept for compatibility
n = 1              # 1 + number of optimisation steps taken so far


def _reconstruction_loss(batch):
    """Return the L1 reconstruction loss of the encoder/decoder for one batch."""
    batch = batch.to(device)  # follow `device` so CPU-only machines work too

    # conv_net's parameters are not in `model_params`, so it is never updated;
    # skipping its autograd graph saves memory without changing any gradient
    # that reaches the encoder/decoder. Drop this no_grad block if conv_net is
    # ever added to the optimizer.
    with torch.no_grad():
        features = conv_net(batch)

    # NOTE(review): `features` is 2-D (batch, embed_size); the nn.Transformer*
    # modules treat a 2-D tensor as ONE unbatched sequence, i.e. attention runs
    # across the samples of the batch -- confirm this is intended.
    memory = transformer_encoder(features)
    reconstruction = transformer_decoder(features, memory)
    return criterion(reconstruction, features)


epochs = 100
for epoch in range(epochs):
    # ---- training ----
    conv_net.train()
    transformer_encoder.train()
    transformer_decoder.train()

    train_loss = 0.0
    for batch in train_loader:
        optimizer.zero_grad()
        loss = _reconstruction_loss(batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        n += 1

    # Record the epoch mean once, after all batches have been accumulated.
    avg_train_loss = train_loss / len(train_loader)
    train_losses.append(avg_train_loss)

    # ---- validation ----
    # eval() switches dropout off and no_grad() skips graph construction, so
    # the validation loss is deterministic for fixed weights.
    conv_net.eval()
    transformer_encoder.eval()
    transformer_decoder.eval()

    val_loss = 0.0
    with torch.no_grad():
        for batch in val_loader:
            val_loss += _reconstruction_loss(batch).item()

    avg_val_loss = val_loss / len(val_loader)
    val_losses.append(avg_val_loss)

    scheduler.step()

    print("Epoch: {} Train Loss: {} Val Loss: {}".format(
                  epoch,
                  avg_train_loss,
                  avg_val_loss))


In [None]:

# Plot both loss histories on one set of axes. The x position of every point
# is its index in the list, so the 'Epochs' label is accurate only when the
# lists hold exactly one entry per epoch.
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(train_losses, label='Training Loss')
ax.plot(val_losses, label='Validation Loss')
ax.set_title('Training and Validation Loss')
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.legend()
plt.show()

In [None]:
# Evaluate the reconstruction loss on the held-out test chunks.
test_loss = 0.0

# Inference mode: without eval() the dropout layers stay active and the
# reported test loss is both noisy and pessimistic.
conv_net.eval()
transformer_encoder.eval()
transformer_decoder.eval()

with torch.no_grad():
    for batch in test_loader:
        # Follow `device` instead of forcing CUDA so CPU-only machines work.
        src_data = batch.to(device)

        src_data = conv_net(src_data)
        memory = transformer_encoder(src_data)
        out_batch = transformer_decoder(src_data, memory)

        loss = criterion(out_batch, src_data)
        test_loss += loss.item()

# Mean of the per-batch losses.
avg_test_loss = test_loss / len(test_loader)

print(f"Test Loss: {avg_test_loss}")