In [1]:
import math
import torch
import torch.nn as nn
import snntorch as snn
from snntorch import surrogate
from data import EEGDataset
from torch import nn, save, load
from torch.utils.data import DataLoader, TensorDataset
import numpy as np

import matplotlib.pyplot as plt

In [2]:
import numpy as np
import pandas as pd
from scipy.io import loadmat
import torch
from torch.utils.data import DataLoader, Dataset

class EEGDataset(Dataset):
    
    def __init__(self, path):
        
        ####################
        ### Loading data ###
        ####################
        
        # '/home/ensismoebius/Documentos/UNESP/doutorado/databases/Base de Datos Habla Imaginada/S01/S01_EEG.mat',
        mat = loadmat(path, struct_as_record=True, squeeze_me=True, mat_dtype=False)
        
        #################################
        ### Setting up the properties ###
        #################################
        
        self.estimuli = {
                1 : "A",
                2 : "E",
                3 : "I",
                4 : "O",
                5 : "U",
                6 : "Arriba",
                7 : "Abajo",
                8 : "Adelante",
                9 : "Atrás",
                10 : "Derecha",
                11 : "Izquierda"
            }
        
        # Modalities
        self.modalities = {
            1 : "Imaginada",
            2 : "Falada" 
        }
        
        # Artfacts
        self.artfacts = {
            -1 : "Indiferente",  
            1 : "Com artefato", 
            2 : "Sem artefato" 
        }

        self.dataframe = pd.DataFrame(mat['EEG'])

        self._joinIntoArray(0, 4096, 'F3', self.dataframe)
        self._joinIntoArray(0, 4096, 'F4', self.dataframe)
        self._joinIntoArray(0, 4096, 'C3', self.dataframe)
        self._joinIntoArray(0, 4096, 'C4', self.dataframe)
        self._joinIntoArray(0, 4096, 'P3', self.dataframe)
        self._joinIntoArray(0, 4096, 'P4', self.dataframe)
        
        self._joinIntoValue(0, 1, 'Modalidade', self.dataframe)
        self._joinIntoValue(0, 1, 'Estímulo', self.dataframe)
        self._joinIntoValue(0, 1, 'Artefatos', self.dataframe)
        
        self.filteredData = self.dataframe[(self.dataframe['Modalidade'] == 1) & (self.dataframe['Artefatos'] == 1)]
        
        self.labels = self.filteredData['Estímulo'].values
        self.data = self.filteredData[['F3','F4','C3','C4','P3','P4']].values
        
    def __len__(self):
        return len(self.labels)
        
    def __getitem__(self, idx):
        sample_data =  torch.tensor(self.data[idx].tolist(),dtype=torch.float) # Convert numpy arrays to tensors
        sample_label = torch.tensor(self.labels[idx].tolist(),dtype=torch.float)  # Ensure labels are also tensors
        return sample_data, sample_label

    # Auxiliary methods            
    def _joinIntoArray(self, start_col, end_col, newColumn, dataframe):
        cols_to_join = dataframe.iloc[:, start_col:end_col].columns
        dataframe[newColumn] = dataframe[cols_to_join].apply(lambda x: np.array(pd.to_numeric(x, errors='coerce')), axis=1)
        dataframe.drop(cols_to_join, axis=1, inplace=True)
    
    def _joinIntoValue(self, start_col, end_col, newColumn, dataframe):
        cols_to_join = dataframe.iloc[:, start_col:end_col].columns
        dataframe[newColumn] = dataframe[cols_to_join].apply(lambda x: pd.to_numeric(x, errors='coerce'), axis=1)
        dataframe.drop(cols_to_join, axis=1, inplace=True)

In [3]:
def get_tensor_size_after_conv(input_size, convolution_kernel_size):
    return 1 +(input_size - convolution_kernel_size)

def get_tensor_size_after_maxpool(input_size, poll_kernel_size):
    return math.floor(input_size / poll_kernel_size)

In [4]:
batch_size = 4
data_path='/home/ensismoebius/Documentos/UNESP/doutorado/databases/Base de Datos Habla Imaginada/S01/S01_EEG.mat'
device = torch.device("cpu")
input_channels = 6  # number of EEG channels
input_timepoints = 4096  # number of time points in each EEG sample

beta = 0.9  # neuron decay rate 
spike_grad = surrogate.fast_sigmoid() # fast sigmoid surrogate gradient

In [5]:
# Create DataLoaders
eegDataset = EEGDataset(data_path)
train_loader = DataLoader(eegDataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(eegDataset, batch_size=batch_size, shuffle=True)

In [6]:
############################################################
######## Calculates the size of the feature vector #########
############################################################

# Calculate the size of the features after the 1st conv and pool layers
feature_size = get_tensor_size_after_conv(input_timepoints,5)
feature_size = get_tensor_size_after_maxpool(feature_size,2)
# Calculate the size of the features after the 2nd conv and pool layers
feature_size = get_tensor_size_after_conv(feature_size,5)
feature_size = get_tensor_size_after_maxpool(feature_size,2)
# Calculate the size of the features after the flatten
feature_size = feature_size * 32

In [7]:
feature_size

32672

In [8]:
################################
###### Creates the model #######
################################

#  Initialize  SNN
net = nn.Sequential(
    nn.Conv1d(in_channels=input_channels, out_channels=16, kernel_size=5),
    nn.MaxPool1d(kernel_size=2),
    
    nn.Conv1d(in_channels=16, out_channels=32, kernel_size=5),
    nn.MaxPool1d(kernel_size=2),

    snn.Leaky(beta=beta, spike_grad=spike_grad, init_hidden=True),
    nn.Flatten(),
    nn.Linear(feature_size, 128),
    snn.Leaky(beta=beta, spike_grad=spike_grad, init_hidden=True, output=True)
).to(device)

In [9]:
from snntorch import utils 

def forward_pass(net, data, num_steps):  
  spk_rec = [] # record spikes over time
  utils.reset(net)  # reset/initialize hidden states for all LIF neurons in net

  for step in range(num_steps): # loop over time
      spk_out, mem_out = net(data) # one time step of the forward-pass
      spk_rec.append(spk_out) # record spikes
  
  return torch.stack(spk_rec)

In [10]:
import snntorch.functional as SF

optimizer = torch.optim.Adam(net.parameters(), lr=2e-3, betas=(0.9, 0.999))
loss_fn = SF.mse_count_loss(correct_rate=0.8, incorrect_rate=0.2)

In [11]:
num_epochs = 30 # run for 1 epoch - each data sample is seen only once
num_steps = 25  # run for 25 time steps 

loss_hist = [] # record loss over iterations 
acc_hist = [] # record accuracy over iterations

# training loop
for epoch in range(num_epochs):
    for i, (data, targets) in enumerate(iter(train_loader)):
        data = data.to(device)
        targets = targets.to(device)

        net.train() 
        spk_rec = forward_pass(net, data, num_steps) # forward-pass
        loss_val = loss_fn(spk_rec, targets) # loss calculation
        optimizer.zero_grad() # null gradients
        loss_val.backward() # calculate gradients
        optimizer.step() # update weights
        loss_hist.append(loss_val.item()) # store loss

        # print every 25 iterations
        if i % 25 == 0:
          print(f"Epoch {epoch}, Iteration {i} \nTrain Loss: {loss_val.item():.2f}")

          # check accuracy on a single batch
          acc = SF.accuracy_rate(spk_rec, targets) 
          acc_hist.append(acc)
          print(f"Accuracy: {acc * 100:.2f}%\n")
        
        # uncomment for faster termination
        # if i == 150:
        #     break


  sample_data =  torch.tensor(self.data[idx].tolist(),dtype=torch.float) # Convert numpy arrays to tensors


Epoch 0, Iteration 0 
Train Loss: 1.00
Accuracy: 0.00%

Epoch 0, Iteration 25 
Train Loss: 10.02
Accuracy: 25.00%

Epoch 0, Iteration 50 
Train Loss: 9.96
Accuracy: 25.00%

Epoch 1, Iteration 0 
Train Loss: 9.88
Accuracy: 25.00%

Epoch 1, Iteration 25 
Train Loss: 9.55
Accuracy: 25.00%

Epoch 1, Iteration 50 
Train Loss: 9.03
Accuracy: 0.00%

Epoch 2, Iteration 0 
Train Loss: 8.76
Accuracy: 25.00%

Epoch 2, Iteration 25 
Train Loss: 8.85
Accuracy: 0.00%

Epoch 2, Iteration 50 
Train Loss: 9.09
Accuracy: 0.00%

Epoch 3, Iteration 0 
Train Loss: 8.62
Accuracy: 0.00%

Epoch 3, Iteration 25 
Train Loss: 5.51
Accuracy: 0.00%

Epoch 3, Iteration 50 
Train Loss: 2.81
Accuracy: 0.00%

Epoch 4, Iteration 0 
Train Loss: 2.87
Accuracy: 0.00%

Epoch 4, Iteration 25 
Train Loss: 2.88
Accuracy: 0.00%

Epoch 4, Iteration 50 
Train Loss: 2.82
Accuracy: 25.00%

Epoch 5, Iteration 0 
Train Loss: 2.88
Accuracy: 0.00%

Epoch 5, Iteration 25 
Train Loss: 2.90
Accuracy: 0.00%

Epoch 5, Iteration 50 
Train L

KeyboardInterrupt: 