In [None]:
# Importing libraries
import torch
import argparse
import easydict
import numpy as np
import matplotlib.pyplot as plt
import math
import pywt
from skimage.transform import resize
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Bidirectional, Dense, BatchNormalization, Dropout, Lambda
from keras.utils import to_categorical
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.optimizers import Adam
import tensorflow.keras.backend as K
from tensorflow import keras
from keras.callbacks import Callback
from tensorflow.keras.layers import Input, Conv2D, Activation, Add, AveragePooling2D, Flatten
from tensorflow.keras.models import Model
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import sys
import os
import random
import time
import torch; torch.utils.backcompat.broadcast_warning.enabled = True
from torchvision import transforms, datasets
import torch.nn.functional as F
import torch.optim
import torch.backends.cudnn as cudnn; cudnn.benchmark = True
from scipy.fftpack import fft, rfft, fftfreq, irfft, ifft, rfftfreq
from scipy import signal
import importlib
from torch.autograd import Variable
import gc
import scipy


## Index
40 classes:
- wavelet + resnet (tf)
- data reduction + bilstm (tf)
- bilstm (torch spampinato)
- cnn (torch)
  
2 classes:
- bilstm (tf)
- cnn (torch)

Getting the dataset

In [None]:
# Mount Google Drive (Colab only) so that the data files referenced below,
# under drive/MyDrive/..., can be read. force_remount=True is passed so the
# mount is requested again even if a previous run already mounted the drive.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

In [4]:
# Importing the files from the data folder
# NOTE(review): torch.load unpickles its input, so only load .pth files from a trusted source.
file_path = 'data/eeg_5_95_std.pth'  # only referenced by the disabled load on the next line
#eeg5_95 = torch.load(file_path)
# Same splits file that opt.splits_path points to further down
splits_all = torch.load('drive/MyDrive/Tesi_Valeau/data/block_splits_by_image_all.pth')
#splits_single = torch.load('data/block_splits_by_image_single.pth')
# EEG data used by every model in this notebook (read below through its 'dataset'/'labels'/'images' entries)
eeg14_70 = torch.load('drive/MyDrive/Tesi_Valeau/data/eeg_14_70_std.pth')
#eeg55_95 = torch.load('drive/MyDrive/Tesi_Valeau/data/eeg_55_95_std.pth')
# Raw signals; not used by any other cell visible in this notebook section
eeg_raw = torch.load('drive/MyDrive/Tesi_Valeau/data/eeg_signals_raw_with_mean_std.pth')



In [5]:
# We focus on the dataset filtered with notch filter and 14-71 Hz
# The loaded object is indexed by the keys 'dataset', 'labels' and 'images'.
dataset14 = eeg14_70['dataset']  # samples; later cells read sample['eeg'] and sample['label']
labels14  = eeg14_70['labels']  # compared with the class codes of mapping.txt further down
images14 = eeg14_70['images']


In [None]:
# Further pre-processing for the 40-class classification:
# collect every EEG recording and keep only time samples 40..489 of each one.
datasettone = [dataset14[idx]['eeg'] for idx in range(len(dataset14))]
dataset = [recording[:, 40:490] for recording in datasettone]
print(len(dataset), datasettone[0].shape, dataset[0].shape)

Wavelet transform

In [None]:
import scipy

def create_wavelet_image(signal_tensor):
    """Turn a multi-channel signal into a stack of normalized wavelet images.

    The signal is decimated by a factor of 3 along the time axis, then a
    continuous wavelet transform ('gaus2' wavelet, scales 1..32) is computed
    for each channel. Each channel's coefficients are min-max normalized
    independently to the range [0, 1].

    Args:
        signal_tensor: 2-D array-like of shape (num_channels, time_length).
            It is handed directly to scipy, so it must be convertible to a
            numpy array (presumably a CPU tensor - confirm against callers).

    Returns:
        torch.Tensor with one wavelet image per channel, stacked along the
        first dimension.
    """
    num_channels, _ = signal_tensor.shape
    # Sample the length of the signal to one third of the original
    signal_array = scipy.signal.decimate(signal_tensor, 3, axis=1)

    # Linear scales 1..32. A dyadic alternative that was considered is
    # [2**(x/2) for x in range(33)].
    scales = np.arange(1, 33)

    wavelet_images = []
    for channel in range(num_channels):
        # Perform wavelet transform for the current channel
        coefficients, _ = pywt.cwt(signal_array[channel], scales, 'gaus2')
        lowest = np.min(coefficients)
        value_range = np.max(coefficients) - lowest
        if value_range == 0:
            # Constant coefficients (e.g. a flat channel): dividing would give
            # NaNs, so emit an all-zero image instead.
            normalized_coefficients = np.zeros_like(coefficients)
        else:
            normalized_coefficients = (coefficients - lowest) / value_range
        wavelet_images.append(normalized_coefficients)

    # Stacking wavelet images along a new dimension to create a multi-channel image
    multi_channel_image = np.stack(wavelet_images, axis=0)
    return torch.tensor(multi_channel_image)

# As an example, compute and display the wavelet image of the first signal
signal_tensor = dataset[0]
print(signal_tensor.shape, dataset[0].shape)
# Multi-channel wavelet image for the first sample
wavelet_image = create_wavelet_image(signal_tensor)
print(wavelet_image.size())
# Show channel 0 through the explicit figure/axes interface
fig, ax = plt.subplots()
heatmap = ax.imshow(wavelet_image[0])
fig.colorbar(heatmap, ax=ax, label='Normalized Amplitude')
ax.set_title('Wavelet Transform Image (Channel 0)')
ax.set_xlabel('Pixel')
ax.set_ylabel('Pixel')
plt.show()


In [None]:
# Creating the wavelet dataset
# Seeded generator: 70% train, then the remaining 30% is halved into val/test.
generator1 = torch.Generator().manual_seed(42)
train_id, prov_id = torch.utils.data.random_split(range(len(dataset)), [0.7, 0.3], generator=generator1)
val_id, test_id = torch.utils.data.random_split(prov_id, [0.5, 0.5], generator=generator1)

# Raw (cropped) signals of each split
data_train = [dataset[sample_id] for sample_id in train_id]
data_val = [dataset[sample_id] for sample_id in val_id]
data_test = [dataset[sample_id] for sample_id in test_id]

print('done')
# Wavelet images of each split (this is the expensive step)
wave_dataset_train = [create_wavelet_image(recording) for recording in data_train]
wave_dataset_val = [create_wavelet_image(recording) for recording in data_val]
wave_dataset_test = [create_wavelet_image(recording) for recording in data_test]
print(len(wave_dataset_train), wave_dataset_train[0].shape)


In [None]:
# Getting the labels and changing everything to be a numpy array
# Labels are read from dataset14 with the same train/val/test ids that were
# used to select the signals for the wavelet images.
y_train = np.array([dataset14[i]['label'] for i in train_id])
y_val = np.array([dataset14[i]['label'] for i in val_id])
y_test = np.array([dataset14[i]['label'] for i in test_id])
print(len(y_train))

# One numpy array per split, built from the list of per-sample torch tensors
X_train_w = np.array([tensor.numpy() for tensor in wave_dataset_train])
X_val_w = np.array([tensor.numpy() for tensor in wave_dataset_val])
X_test_w = np.array([tensor.numpy() for tensor in wave_dataset_test])


CNN with skip connections

In [None]:
# CNN with skip connection model
def convolutional_block(x, filters, strides=(1, 1)):
    """Build a two-convolution block with a projected skip connection.

    Args:
        x: input Keras tensor.
        filters: number of filters used by every convolution in the block.
        strides: strides of the first 3x3 convolution and of the 1x1 shortcut
            convolution (both branches receive the same value).

    Returns:
        The ReLU-activated sum of the main branch and the shortcut branch.
    """
    shortcut = x

    # first convolutional layer
    x = Conv2D(filters, (3, 3), strides=strides, padding='same')(x)
    x = BatchNormalization()(x)
    x = Activation('relu')(x)

    # second convolutional layer
    x = Conv2D(filters, (3, 3), padding='same')(x)
    x = BatchNormalization()(x)

    # skip connection: the input always goes through a 1x1 convolution + batch norm
    shortcut = Conv2D(filters, (1, 1), strides=strides, padding='same')(shortcut)
    shortcut = BatchNormalization()(shortcut)

    x = Add()([x, shortcut])
    x = Activation('relu')(x)
    return x

def residual_block(x, filters):
    """Chain six convolutional_block calls, all with the same filter count.

    Args:
        x: input Keras tensor.
        filters: number of filters passed to every convolutional_block.

    Returns:
        The output tensor of the sixth block.
    """
    for _ in range(6):
        x = convolutional_block(x, filters)
    return x

# Input shape mirrors the wavelet images: 128 EEG channels x 32 scales x 150
# time samples (450 cropped samples decimated by 3).
# NOTE(review): Conv2D is used with its default data format; if that is
# channels-last, the 150 time samples act as the convolution channels and the
# spatial axes are (EEG channel, scale) - confirm this is intended.
input_layer = Input(shape=(128, 32, 150))

# First convolutional block
x = Conv2D(64, (7, 7), strides=(2, 2), padding='same')(input_layer)
x = BatchNormalization()(x)
x = Activation('relu')(x)

# Residual blocks (only the 64-filter stage is enabled)
x = residual_block(x, 64)
#x = residual_block(x, 128)
#x = residual_block(x, 256)

# Final layers
x = BatchNormalization()(x)
x = Activation('relu')(x)
x = AveragePooling2D(pool_size=(2, 2))(x)
x = Flatten()(x)
# 40 output units, one per class
output_layer_w = Dense(40, activation='softmax')(x)

model_w = Model(inputs=input_layer, outputs=output_layer_w)
model_w.summary()


In [14]:
# Callback used to save the per-epoch metrics
class MetricsCallback(Callback):
    """Keras callback that records loss/accuracy at the end of every epoch.

    The values are read from the ``logs`` dict under the keys 'loss',
    'accuracy', 'val_loss' and 'val_accuracy'. A key that is absent (for
    example the validation keys when no validation data is given) is recorded
    as ``None`` instead of raising, so the four lists always have one entry
    per epoch.
    """

    def __init__(self):
        super().__init__()
        self.train_losses = []
        self.train_accuracies = []
        self.val_losses = []
        self.val_accuracies = []

    def on_epoch_end(self, epoch, logs=None):
        # `logs` defaults to None in the signature, so guard before reading it
        logs = logs or {}
        self.train_losses.append(logs.get('loss'))
        self.train_accuracies.append(logs.get('accuracy'))
        self.val_losses.append(logs.get('val_loss'))
        self.val_accuracies.append(logs.get('val_accuracy'))

metrics_callback = MetricsCallback()


In [None]:
# Here we compile and train the model
# Dedicated callback instance for this run; the metrics file below must be
# written from this same instance.
metrics2 = MetricsCallback()


model_w.compile(optimizer=Adam(learning_rate=0.001),
              loss=SparseCategoricalCrossentropy(),
              metrics=['accuracy'])

history = model_w.fit(X_train_w, y_train,
                    batch_size=64,
                    epochs=200,
                    validation_data=(X_val_w, y_val), callbacks=[metrics2], verbose = 1)


loss_w, accuracy_w = model_w.evaluate(X_test_w, y_test)
print("Test Loss:", loss_w)
print("Test Accuracy:", accuracy_w)

# save training and validation metrics to text file
# (read from metrics2, the callback that was actually passed to fit)
with open('metrics2.txt', 'w') as f:
    f.write("Epoch\tTrain Loss\tTrain Accuracy\tVal Loss\tVal Accuracy\n")
    for epoch, (train_loss, train_accuracy, val_loss, val_accuracy) in enumerate(zip(metrics2.train_losses,
                                                                                      metrics2.train_accuracies,
                                                                                      metrics2.val_losses,
                                                                                      metrics2.val_accuracies)):
        f.write(f"{epoch+1}\t{train_loss}\t{train_accuracy}\t{val_loss}\t{val_accuracy}\n")

# save test metrics in the same file as the per-epoch metrics of this model;
# 'metrics.txt' is opened in 'w' mode by the Bi-LSTM training cell, which
# would discard anything appended there
with open('metrics2.txt', 'a') as f:
    f.write(f"\nTest Loss: {loss_w}\n")
    f.write(f"Test Accuracy: {accuracy_w}\n")

#### Data dimension reduction

What we do here is change the shape of the input signals: from a 128x500 tensor, each input becomes a 60x8 tensor. We do so by dividing the electrodes into three sections: middle (8 electrodes), right and left (60 electrodes each). Then we combine the information in this way:
$$
 C = [c_i]_{i=1}^{128}  \\
 C^m = [c^m_k]_{k=1}^{m_{ch}}, \ m_{ch} = 8 \\
 d_j = c_j^l-c_j^r, \ j = 1 ... l_{ch}, \ l_{ch} = 60 \\
 D = [d_j]_{j = 1}^{l_{ch}} \\
 S = D \, (C^m)^T
$$

Below, we do what we just explained, with the correct indices and electrode configuration.

In [None]:
# Channel mapping (second version)
# Keys are 1-based channel numbers, values are electrode names.
#1-32 green channels, 33-64 yellow channels, 65-96 red channels, 97-128 white channels
channel_map = {
    # Green channels
    1:'Fp1', 2:'Fp2', 3: 'F7', 4:'F3', 5:'Fz', 6:'F4', 7:'F8', 8:'FC5', 9:'FC1',
    10:'FC2', 11:'FC6', 12:'T7', 13:'C3', 14:'Cz', 15: 'C4', 16: 'T8', 17: 'TP9',
    18:'CP5', 19:'CP1', 20:'CP2', 21:'CP6', 22:'TP10', 23:'P7', 24:'P3', 25:'Pz',
    # 32 is 'PO10' (letter O), the right-hand partner of 28 'PO9'
    26:'P4', 27:'P8', 28:'PO9', 29:'O1', 30:'Oz', 31:'O2', 32:'PO10',
    # Yellow channels
    33:'AF7', 34:'AF3', 35:'AF4', 36:'AF8', 37:'F5', 38:'F1', 39:'F2', 40:'F6',
    41:'FT9', 42:'FT7', 43:'FC3', 44:'FC4', 45:'FT8', 46:'FT10', 47:'C5', 48:'C1',
    49:'C2', 50:'C6', 51:'TP7', 52:'CP3', 53:'CPz', 54:'CP4', 55:'TP8', 56:'P5',
    57:'P1', 58:'P2', 59:'P6', 60:'PO7', 61:'PO3', 62:'POz', 63:'PO4', 64:'PO8',
    # Red channels
    65: 'Fpz', 66: 'F9', 67: 'AFF5h', 68: 'AFF1h', 69: 'AFF2h', 70: 'AFF6h',
    71: 'F10', 72: 'FTT9h', 73: 'FTT7h', 74: 'FCC5h', 75: 'FCC3h', 76: 'FCC1h',
    77: 'FCC2h', 78: 'FCC4h', 79: 'FCC6h', 80: 'FTT8h', 81: 'FTT10h', 82: 'TPP9h',
    83: 'TPP7h', 84: 'CPP5h', 85: 'CPP3h', 86: 'CPP1h', 87: 'CPP2h', 88: 'CPP4h',
    89: 'CPP6h', 90: 'TPP8h', 91: 'TPP10h', 92: 'POO9h', 93: 'POO1', 94: 'POO2',
    95: 'POO10h', 96: 'Iz',
    # White channels
    97: 'AFp1', 98: 'AFp2', 99: 'FFT9h', 100: 'FFT7h', 101: 'FFC5h', 102: 'FFC3h',
    103: 'FFC1h', 104: 'FFC2h', 105: 'FFC4h', 106: 'FFC6h', 107: 'FFT8h', 108: 'FFT10h',
    109: 'TTP7h', 110: 'CCP5h', 111: 'CCP3h', 112: 'CCP1h', 113: 'CCP2h', 114:'CCP4h',
    115: 'CCP6h', 116: 'TTP8h', 117: 'P9', 118: 'PPO9h', 119: 'PPO5h', 120: 'PPO1h',
    121: 'PPO2h', 122: 'PPO6h', 123: 'PPO10h', 124: 'P10', 125: 'I1', 126: 'OI1h',
    127: 'OI2h', 128:'I2'
}

In [None]:
# middle indexes (0-based): Fpz, Fz, Cz, CPz, Pz, POz, Oz, Iz in channel_map
m = [64, 4, 13, 52,24,61,29,95]
# totality of green electrodes, then divided in right and left parts
# (1-based numbers within the colour group; position k of a left list is
# paired with position k of the matching right list)
lg = [1,3,4,8,9,12,13,17,18,19,23,24,28,29]
# The partner of CP1 (19) is CP2 (20); the previous value 10 (FC2) was already
# used as the partner of FC1 (9), so FC2 was counted twice and CP2 never.
rg = [2,7,6,11,10,16,15,22,21,20,27,26,32,31]
lg = [x-1 for x in lg]
rg = [x-1 for x in rg]
# Same for yellow electrodes (offset 31 = 32 channels of the green group, minus 1 for 0-based)
ly = [1,2,5,6,9,10,11,15,16,19,20,24,25,28,29]
ry = [4,3,8,7,14,13,12,18,17,23,22,27,26,32,31]
ly =[x +31 for x in ly ]
ry =[x +31 for x in ry ]
# red channels
lr= [2,3,4,8,9,10,11,12,18,19,20,21,22,28,29]
rr =[7,6,5,17,16,15,14,13,27,26,25,24,23,31,30]
lr =[x +63 for x in lr ]
rr =[x +63 for x in rr ]
# white channels
lw=[1,3,4,5,6,7,13,14,15,16,21,22,23,24,29,30]
rw=[2,12,11,10,9,8,20,19,18,17,28,27,26,25,32,31]
lw =[x +95 for x in lw ]
rw =[x +95 for x in rw ]
print(len(lw), len(rw))
# Ordered indexes of right and left electrodes (60 each; together with the
# 8 middle indexes they cover every channel 0..127 exactly once)
l = lg + ly + lr+lw
r = rg +ry + rr+rw
print(max(r), max(l))

# Creating a new dataset with reduced dimensionality: for every recording,
# multiply the left-minus-right differences by the transposed middle channels.
reduced_data = [torch.mm(el[l, :] - el[r, :], el[m, :].t()) for el in dataset]

print(len(reduced_data), reduced_data[0].shape)

In [26]:
# ID Split for reduced dataset
# Label of every sample, in dataset14 order
labels = [dataset14[i]['label'] for i in range(len(dataset14))]
# Seeded generator so that the 70/15/15 split is the same on every run
generator1 = torch.Generator().manual_seed(42)
train_id, val_id, test_id = torch.utils.data.random_split(range(len(dataset)), [0.7, 0.15, 0.15], generator=generator1)


In [None]:
# Dataset split for LSTM: every split is built directly as a numpy array
# from the per-sample torch tensors of the reduced dataset.
X_train = np.array([reduced_data[sample_id].numpy() for sample_id in train_id])
X_val = np.array([reduced_data[sample_id].numpy() for sample_id in val_id])
X_test = np.array([reduced_data[sample_id].numpy() for sample_id in test_id])

# Matching labels
y_train = np.array([labels[sample_id] for sample_id in train_id])
y_val = np.array([labels[sample_id] for sample_id in val_id])
y_test = np.array([labels[sample_id] for sample_id in test_id])

print(len(X_train))


In [None]:
# Bi-LSTM model for dimensionally reduced data
# Each input sample is a 60x8 matrix (the shape of the reduced data built above)
input_shape = (60,8)
input_layer = tf.keras.Input(shape=input_shape)

def bi_lstm_block(x, units, return_sequences=True):
    """Bidirectional LSTM followed by batch normalization and 0.5 dropout.

    Args:
        x: input Keras tensor.
        units: number of units of the wrapped LSTM.
        return_sequences: forwarded to the LSTM; pass False for the last
            recurrent block so that Dense layers can follow it.

    Returns:
        The output tensor of the dropout layer.
    """
    x = Bidirectional(LSTM(units, return_sequences = return_sequences))(x)
    x = BatchNormalization()(x)
    x = Dropout(0.5)(x)
    return x

#LSTM blocks (three enabled; two more are kept commented out)
x = bi_lstm_block(input_layer, 128)
x = bi_lstm_block(x, 128)
#x = bi_lstm_block(x, 128)
#x = bi_lstm_block(x, 128)
x = bi_lstm_block(x, 128, return_sequences=False)


# Dense blocks
x = Dense(128, activation='relu')(x)
x = Dense(64, activation='relu')(x)
x = Dense(32, activation='relu')(x)

# 40 output units, one per class
output_layer = Dense(40, activation='softmax')(x)

model = keras.Model(inputs=input_layer, outputs = output_layer)
model.summary()



In [30]:
# Callback used to save the per-epoch metrics
class MetricsCallback(Callback):
    """Keras callback that records loss/accuracy at the end of every epoch.

    The values are read from the ``logs`` dict under the keys 'loss',
    'accuracy', 'val_loss' and 'val_accuracy'. A key that is absent (for
    example the validation keys when no validation data is given) is recorded
    as ``None`` instead of raising, so the four lists always have one entry
    per epoch.
    """

    def __init__(self):
        super().__init__()
        self.train_losses = []
        self.train_accuracies = []
        self.val_losses = []
        self.val_accuracies = []

    def on_epoch_end(self, epoch, logs=None):
        # `logs` defaults to None in the signature, so guard before reading it
        logs = logs or {}
        self.train_losses.append(logs.get('loss'))
        self.train_accuracies.append(logs.get('accuracy'))
        self.val_losses.append(logs.get('val_loss'))
        self.val_accuracies.append(logs.get('val_accuracy'))

metrics_callback = MetricsCallback()


In [None]:
# Training of the Bi-LSTM model
optimizer = Adam(learning_rate=0.0001, clipnorm=1.0)
model.compile(loss='sparse_categorical_crossentropy', optimizer=optimizer, metrics=['accuracy'])

print("Shape of X_train:", X_train.shape)
print("Shape of y_train:", y_train.shape)

history = model.fit(
    X_train, y_train,
    batch_size=64,
    epochs=250,
    validation_data=(X_val, y_val),
    callbacks=[metrics_callback],
    verbose=1,
)

test_loss, test_accuracy = model.evaluate(X_test, y_test)
print("Test Loss:", test_loss)
print("Test Accuracy:", test_accuracy)

# Write the per-epoch metrics followed by the test metrics to one text file
per_epoch_metrics = zip(metrics_callback.train_losses,
                        metrics_callback.train_accuracies,
                        metrics_callback.val_losses,
                        metrics_callback.val_accuracies)
with open('metrics.txt', 'w') as f:
    f.write("Epoch\tTrain Loss\tTrain Accuracy\tVal Loss\tVal Accuracy\n")
    for epoch, (train_loss, train_accuracy, val_loss, val_accuracy) in enumerate(per_epoch_metrics):
        f.write(f"{epoch+1}\t{train_loss}\t{train_accuracy}\t{val_loss}\t{val_accuracy}\n")
    # test metrics go at the end of the same file
    f.write(f"\nTest Loss: {test_loss}\n")
    f.write(f"Test Accuracy: {test_accuracy}\n")

Full dataset without dimension reduction. Bi-LSTM model implemented with PyTorch, using the PeRCeiVe Lab code base (still to be run)

In [33]:
# Defining all the parameters
opt = easydict.EasyDict({
    "eeg_dataset": r"drive/MyDrive/Tesi_Valeau/data/eeg_14_70_std.pth",
    "splits_path": r"drive/MyDrive/Tesi_Valeau/data/block_splits_by_image_all.pth",
    "split_num" : 0,
    # 0 keeps every sample; any other value makes EEGDataset keep only that subject
    "subject": 0,
    # EEGDataset crops every recording to the time samples [time_low, time_high)
    "time_low": 20,
    "time_high":460,
    "model_type":'bilstm',
    # iterated as "key=value" items by the training cell; empty means no overrides
    "model_params":'',
    # path of a saved model to load; empty means train from scratch
    "pretrained_net":'',
    "batch_size":512,
    # name of the torch.optim class to instantiate
    "optim":"Adam",
    "learning_rate":0.001,
    # the two decay settings are only applied by the training loop when optim == "SGD"
    "learning_rate_decay_by":0.5,
    "learning_rate_decay_every":10,
    # NOTE(review): not read by any cell visible here
    "data-workers":4,
    "epochs":1700,
    # a checkpoint is saved every `saveCheck` epochs
    "saveCheck":200,
    "no_cuda":False})


In [35]:
# Loading the dataset
class EEGDataset:
    """Indexable view over the EEG samples stored in a .pth file.

    Reads the module-level ``opt`` for the subject filter, the time crop and
    the model type.
    """

    def __init__(self, eeg_signals_path):
        """Load the file and keep either all samples or those of one subject."""
        loaded = torch.load(eeg_signals_path)
        all_samples = loaded['dataset']
        if opt.subject == 0:
            # subject 0 means "no filtering"
            self.data = all_samples
        else:
            self.data = [sample for sample in all_samples if sample['subject'] == opt.subject]
        self.labels = loaded["labels"]
        self.images = loaded["images"]

        # Number of samples kept
        self.size = len(self.data)

    def __len__(self):
        """Return the number of samples kept."""
        return self.size

    def __getitem__(self, i):
        """Return ``(eeg, label)`` for sample ``i``.

        The EEG is converted to float, transposed and cropped to rows
        ``opt.time_low:opt.time_high``. For model type "model10" it is
        transposed back and viewed as ``(1, 128, time_high - time_low)``.
        """
        sample = self.data[i]
        eeg = sample["eeg"].float().t()[opt.time_low:opt.time_high, :]

        if opt.model_type == "model10":
            eeg = eeg.t().view(1, 128, opt.time_high - opt.time_low)
        return eeg, sample["label"]

# Splitter class
class Splitter:
    """View over one named split ("train", "val" or "test") of a dataset."""

    def __init__(self, dataset, split_path, split_num=0, split_name="train"):
        """Load the split indices and drop samples with an unusual EEG length.

        Only indices whose EEG has between 450 and 600 entries (inclusive)
        along dimension 1 are kept.
        """
        self.dataset = dataset
        candidate_idx = torch.load(split_path)["splits"][split_num][split_name]
        self.split_idx = [
            idx for idx in candidate_idx
            if 450 <= self.dataset.data[idx]["eeg"].size(1) <= 600
        ]
        # Number of samples left in this split
        self.size = len(self.split_idx)

    def __len__(self):
        """Return the number of samples in the split."""
        return self.size

    def __getitem__(self, i):
        """Return the ``(eeg, label)`` pair of the i-th sample of the split."""
        sample_eeg, sample_label = self.dataset[self.split_idx[i]]
        return sample_eeg, sample_label


# Load dataset
# NOTE: this rebinds `dataset`, which earlier cells use for the list of cropped recordings.
dataset = EEGDataset(opt.eeg_dataset)
# Create one shuffled DataLoader per split; incomplete last batches are dropped
loaders = {}
for split in ["train", "val", "test"]:
    split_view = Splitter(dataset, split_path = opt.splits_path, split_num = opt.split_num, split_name = split)
    loaders[split] = DataLoader(split_view, batch_size = opt.batch_size, drop_last = True, shuffle = True)


In [None]:
# New implementation of Bi-LSTM network on pytorch

class Model(nn.Module):
    """Five stacked bidirectional LSTMs followed by dense layers and a 40-way classifier.

    NOTE: the name `Model` is also bound by the import cell to the Keras Model
    class; once this cell has run, `Model` refers to this PyTorch class.

    Args:
        input_size: number of features per time step.
        hidden_sizes: hidden size of each of the five LSTM layers.
        dense_sizes: output sizes of the two dense layers.
        output_size: size of the layer feeding the final classifier.
        dropout_probs: dropout probability after the LSTM blocks (index 0)
            and after the dense blocks (index 1).
    """

    def __init__(self, input_size=128, hidden_sizes=[128,128, 128,128,64], dense_sizes=[64,32], output_size=128, dropout_probs=[0.5,0.3]):
        super().__init__()

        # LSTM blocks; each BatchNorm1d normalizes the 2*hidden bidirectional features
        self.lstm1 = nn.LSTM(input_size, hidden_sizes[0], batch_first=True, bidirectional=True)
        self.batchnorm1 = nn.BatchNorm1d(hidden_sizes[0]*2)
        self.dropout1 = nn.Dropout(dropout_probs[0])

        self.lstm2 = nn.LSTM(hidden_sizes[0]*2, hidden_sizes[1], batch_first=True, bidirectional=True)
        self.batchnorm2 = nn.BatchNorm1d(hidden_sizes[1]*2)
        self.dropout2 = nn.Dropout(dropout_probs[0])

        self.lstm3 = nn.LSTM(hidden_sizes[1]*2, hidden_sizes[2], batch_first=True, bidirectional=True)
        self.batchnorm3 = nn.BatchNorm1d(hidden_sizes[2]*2)
        self.dropout3 = nn.Dropout(dropout_probs[0])

        self.lstm4 = nn.LSTM(hidden_sizes[2]*2 , hidden_sizes[3], batch_first=True, bidirectional=True)
        self.batchnorm4 = nn.BatchNorm1d(hidden_sizes[3] *2)
        self.dropout4 = nn.Dropout(dropout_probs[0])

        self.lstm5 = nn.LSTM(hidden_sizes[3]*2 , hidden_sizes[4], batch_first=True, bidirectional=True)

        # Dense blocks
        # To be noted: when changing the number of layers remember to change hidden_sizes index
        self.dense1 = nn.Linear(hidden_sizes[4]*2 , dense_sizes[0])
        self.batchnorm5 = nn.BatchNorm1d(dense_sizes[0])
        self.dropout5 = nn.Dropout(dropout_probs[1])

        self.dense2 = nn.Linear(dense_sizes[0], dense_sizes[1])
        self.batchnorm6 = nn.BatchNorm1d(dense_sizes[1])
        self.dropout6 = nn.Dropout(dropout_probs[1])

        self.output = nn.Linear(dense_sizes[1], output_size)
        self.classifier = nn.Linear(output_size, 40)

    def forward(self, x):
        """Return the 40 class scores for a batch shaped (batch, time, input_size)."""
        batch_size = x.size(0)

        def init_lstm_state(hidden_size):
            # Zero (h_0, c_0) created with the dtype and device of the input,
            # so the model works on any device and precision.
            # The leading 2 is the number of directions.
            h_0 = x.new_zeros(2, batch_size, hidden_size)
            c_0 = x.new_zeros(2, batch_size, hidden_size)
            return (h_0, c_0)

        recurrent_blocks = (
            (self.lstm1, self.batchnorm1, self.dropout1),
            (self.lstm2, self.batchnorm2, self.dropout2),
            (self.lstm3, self.batchnorm3, self.dropout3),
            (self.lstm4, self.batchnorm4, self.dropout4),
        )
        for lstm, batchnorm, dropout in recurrent_blocks:
            x, _ = lstm(x, init_lstm_state(lstm.hidden_size))
            # BatchNorm1d normalizes dimension 1, so move the features there and back
            x = batchnorm(x.permute(0, 2, 1)).permute(0, 2, 1)
            x = dropout(x)

        x, _ = self.lstm5(x, init_lstm_state(self.lstm5.hidden_size))
        # Keep only the output at the last time step
        x = x[:, -1, :]

        # Forward through Dense layers
        x = F.relu(self.dense1(x))
        x = self.batchnorm5(x)
        x = self.dropout5(x)

        x = F.relu(self.dense2(x))
        x = self.batchnorm6(x)
        x = self.dropout6(x)

        x = F.relu(self.output(x))
        x = self.classifier(x)

        return x

model = Model()
print(model)

In [None]:
# Load model

# Empty cache
gc.collect()
torch.cuda.empty_cache()
torch.cuda.memory_summary(device=None, abbreviated=False)

# importing extra parameters ("key=value" items; digits become int/float)
model_options = {key: int(value) if value.isdigit() else (float(value) if value[0].isdigit() else value) for (key, value) in [x.split("=") for x in opt.model_params]}

# Create the model, or replace it with a previously saved one.
# This is done BEFORE the optimizer is created and BEFORE the move to CUDA,
# so that the optimizer updates (and the GPU holds) the model actually trained.
model = Model(**model_options)
if opt.pretrained_net != '':
    # NOTE(review): torch.load unpickles a whole model object; only load trusted files.
    model = torch.load(opt.pretrained_net)
    print(model)

# Setup CUDA
if not opt.no_cuda:
    model.cuda()
    print("Copied to CUDA")

# Create optimizer for the final model
optimizer = getattr(torch.optim, opt.optim)(model.parameters(), lr = opt.learning_rate, weight_decay = 1e-5)

# initialize training,validation, test losses and accuracy list
losses_per_epoch={"train":[], "val":[],"test":[]}
accuracies_per_epoch={"train":[],"val":[],"test":[]}

best_accuracy = 0
best_accuracy_val = 0
best_epoch = 0
# Start training

predicted_labels = []
correct_labels = []

for epoch in range(1, opt.epochs+1):
    # Initialize loss/accuracy variables
    losses = {"train": 0, "val": 0, "test": 0}
    accuracies = {"train": 0, "val": 0, "test": 0}
    counts = {"train": 0, "val": 0, "test": 0}
    # Adjust learning rate for SGD
    if opt.optim == "SGD":
        lr = opt.learning_rate * (opt.learning_rate_decay_by ** (epoch // opt.learning_rate_decay_every))
        for param_group in optimizer.param_groups:
            param_group['lr'] = lr
    # Process each split
    for split in ("train", "val", "test"):
        is_train = split == "train"
        # Set network mode
        if is_train:
            model.train()
        else:
            model.eval()
        # Gradient tracking is scoped to this block: the previous global grad
        # mode is restored afterwards, so later cells can still call backward().
        with torch.set_grad_enabled(is_train):
            # Process all split batches
            for i, (inputs, target) in enumerate(loaders[split]):
                # Check CUDA
                if not opt.no_cuda:
                    inputs = inputs.to("cuda")
                    target = target.to("cuda")
                # Forward
                output = model(inputs)

                # Compute loss
                loss = F.cross_entropy(output, target)
                losses[split] += loss.item()
                # Compute accuracy
                _,pred = output.data.max(1)
                correct = pred.eq(target.data).sum().item()
                accuracy = correct/inputs.data.size(0)
                accuracies[split] += accuracy
                counts[split] += 1
                # Backward and optimize
                if is_train:
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()

    # Per-epoch averages over the batches of each split
    TrL,TrA,VL,VA,TeL,TeA=  losses["train"]/counts["train"],accuracies["train"]/counts["train"],losses["val"]/counts["val"],accuracies["val"]/counts["val"],losses["test"]/counts["test"],accuracies["test"]/counts["test"]

    # Track the test accuracy at the epoch with the best validation accuracy
    if VA >= best_accuracy_val:
        best_accuracy_val = VA
        best_accuracy = TeA
        best_epoch = epoch

    # Print info at the end of the epoch
    print("Model: {11} - Subject {12} - Time interval: [{9}-{10}]  [{9}-{10} Hz] - Epoch {0}: TrL={1:.4f}, TrA={2:.4f}, VL={3:.4f}, VA={4:.4f}, TeL={5:.4f}, TeA={6:.4f}, TeA at max VA = {7:.4f} at epoch {8:d}".format(epoch,
                                                                                                         TrL,
                                                                                                         TrA,
                                                                                                         VL,
                                                                                                         VA,
                                                                                                         TeL,
                                                                                                         TeA,
                                                                                                         best_accuracy, best_epoch, opt.time_low,opt.time_high, opt.model_type,opt.subject))

    losses_per_epoch['train'].append(TrL)
    losses_per_epoch['val'].append(VL)
    losses_per_epoch['test'].append(TeL)
    accuracies_per_epoch['train'].append(TrA)
    accuracies_per_epoch['val'].append(VA)
    accuracies_per_epoch['test'].append(TeA)

    # Periodic checkpoint of the whole model object
    if epoch%opt.saveCheck == 0:
        torch.save(model, '%s__subject%d_epoch_%d.pth' % (opt.model_type, opt.subject,epoch))

# Best value of every curve, printed as (value, 1-based epoch)
print(f"TrL: {min((val, ep+1) for ep, val in enumerate(losses_per_epoch['train']))}")
print(f"VL: {min((val, ep+1) for ep, val in enumerate(losses_per_epoch['val']))}")
print(f"TeL: {min((val, ep+1) for ep, val in enumerate(losses_per_epoch['test']))}")
print(f"TrA: {max((val, ep+1) for ep, val in enumerate(accuracies_per_epoch['train']))}")
print(f"VA: {max((val, ep+1) for ep, val in enumerate(accuracies_per_epoch['val']))}")
print(f"TeA: {max((val, ep+1) for ep, val in enumerate(accuracies_per_epoch['test']))}")


CNN with PyTorch for 40 classes

In [None]:
# Adjust the dataset for 40 class CNN:
# rebuild the list of recordings cropped to time samples 40..489.
datasettone = [dataset14[idx]['eeg'] for idx in range(len(dataset14))]
dataset = [recording[:, 40:490] for recording in datasettone]


In [None]:
# ID Split for 40 class CNN
# Label of every sample, in dataset14 order
labels = [dataset14[i]['label'] for i in range(len(dataset14))]
# Seeded generator so that the 80/20 train/validation split is the same on every run
generator1 = torch.Generator().manual_seed(42)
train_id, val_id = torch.utils.data.random_split(range(len(dataset)), [0.8, 0.2], generator=generator1)

# Dataset split for CNN: stack the per-sample tensors into one tensor per split
X_train = torch.stack([dataset[i] for i in train_id])
X_val = torch.stack([dataset[i] for i in val_id])
y_train = torch.tensor([labels[i] for i in train_id])
y_val = torch.tensor([labels[i] for i in val_id])




In [None]:

torch.cuda.empty_cache()

# CNN model
class SimpleCNN(nn.Module):
    """Four Conv1d + max-pool stages followed by two fully connected layers (40 outputs)."""

    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv1d(128, 64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(64, 128, kernel_size=3, padding=1)
        self.conv3 = nn.Conv1d(128, 256, kernel_size=3, padding=1)
        self.conv4 = nn.Conv1d(256, 512, kernel_size=3, padding=1)
        self.pool = nn.MaxPool1d(2)
        self.dropout = nn.Dropout(0.5)

        # Size of the flattened convolutional output, measured on a dummy input
        self.flatten_size = self._get_flatten_size()
        self.fc1 = nn.Linear(self.flatten_size, 1024, bias=True)
        self.fc2 = nn.Linear(1024, 40, bias=True)

    def _conv_stack(self, x):
        """Apply conv -> ReLU -> max-pool for each of the four convolutions."""
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            x = self.pool(torch.relu(conv(x)))
        return x

    def _get_flatten_size(self):
        """Return the element count of the conv stack output for one (128, 450) sample."""
        probe = torch.randn(1, 128, 450)
        return self._conv_stack(probe).numel()

    def forward(self, x):
        """Return the 40 class scores for a batch of inputs."""
        features = self._conv_stack(x)
        flat = features.view(features.size(0), -1)
        hidden = self.dropout(torch.relu(self.fc1(flat)))
        return self.fc2(hidden)


# Moving the dataset to the GPU
x_train = X_train.to(device="cuda")
x_val = X_val.to(device="cuda")
y_train = y_train.to(device='cuda')
y_val = y_val.to(device='cuda')


train_dataset = TensorDataset(x_train, y_train)
val_dataset = TensorDataset(x_val, y_val)

train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

# Initialize model, loss function, and optimizer
model = SimpleCNN().to(device='cuda')
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# Kept for compatibility; the training loop below does not use it
scaler = torch.cuda.amp.GradScaler()

# Training loop
num_epochs = 100
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    # The batch labels use their own name so that the notebook-level `labels`
    # list (label of every sample) is not overwritten by a batch tensor.
    for inputs, batch_labels in train_loader:
        inputs, batch_labels = inputs.to(device="cuda"), batch_labels.to(device="cuda")
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, batch_labels)
        loss.backward()
        optimizer.step()

        # Accumulate the summed loss and the number of correct predictions
        running_loss += loss.item() * inputs.size(0)
        _, predicted = torch.max(outputs, 1)
        total += batch_labels.size(0)
        correct += (predicted == batch_labels).sum().item()

    epoch_loss = running_loss / len(train_loader.dataset)
    epoch_accuracy = correct / total

    # Validation loop
    model.eval()
    val_loss = 0.0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for val_inputs, val_labels in val_loader:
            val_outputs = model(val_inputs)
            val_loss += criterion(val_outputs, val_labels).item() * val_inputs.size(0)
            _, val_predicted = torch.max(val_outputs, 1)
            val_total += val_labels.size(0)
            val_correct += (val_predicted == val_labels).sum().item()

    val_loss = val_loss / len(val_loader.dataset)
    val_accuracy = val_correct / val_total

    # Print results for the current epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], "
          f"Train Loss: {epoch_loss:.4f}, Train Accuracy: {epoch_accuracy:.4f}, "
          f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}")


Performing the binary split to create a new dataset

In [None]:
# Using the mapping from class codes to class names
# The file is opened in a `with` block so the handle is closed after reading.
with open("mapping.txt", "r") as file:
    content = file.readlines()
# One list of whitespace-separated fields per line of the file
book = [line.split() for line in content]



In [None]:
# Creating a dictionary for the classes: position in labels14 -> third field
# of the mapping row whose first field equals that label code.
dic = {}
for class_idx, class_code in enumerate(labels14):
    for row in book:
        if class_code == row[0]:
            dic[class_idx] = row[2]

print(dic)


In [None]:
# Now creating the new dataset containing the classes objects and animals
obj = [5,36, 18, 15, 6]
animal = [8, 12, 21, 35, 39]

# First objects
obj_id = [i for i in range(len(dataset14)) if dataset14[i]['label'] in obj]
data_obj = [dataset14[i]['eeg'] for i in obj_id]

print(len(data_obj))

# Then animals
an_id = [i for i in range(len(dataset14)) if dataset14[i]['label'] in animal]
data_an = [dataset14[i]['eeg'] for i in an_id]

print(len(data_obj), len(data_an))
# Combining the classes: objects first (label 0), then animals (label 1).
# The label counts are derived from the data, so labels and samples stay
# aligned whatever the number of recordings found for each group.
dataset_bin_prov = data_obj + data_an
labels_bin = [0] * len(data_obj) + [1] * len(data_an)
# Keep only time samples 40..489 of every recording
dataset_bin = [el[:,40:490] for el in dataset_bin_prov]
print(dataset_bin[0].shape)

In [19]:
# ID Split for binary classification
# Seeded generator so that the 70/15/15 split is the same on every run
generator1 = torch.Generator().manual_seed(42)
train_id, val_id, test_id = torch.utils.data.random_split(range(len(dataset_bin)), [0.7, 0.15, 0.15], generator=generator1)


In [None]:
# Dataset split for binary LSTM: every split is built directly as a numpy
# array from the per-sample torch tensors.
X_train = np.array([dataset_bin[sample_id].numpy() for sample_id in train_id])
X_val = np.array([dataset_bin[sample_id].numpy() for sample_id in val_id])
X_test = np.array([dataset_bin[sample_id].numpy() for sample_id in test_id])

# Matching binary labels
y_train = np.array([labels_bin[sample_id] for sample_id in train_id])
y_val = np.array([labels_bin[sample_id] for sample_id in val_id])
y_test = np.array([labels_bin[sample_id] for sample_id in test_id])

print(len(X_train))
print(X_train[0].shape)


In [None]:
# Bi-LSTM model for binary classification
input_shape = (128,450)
input_layer = tf.keras.Input(shape=input_shape)

def bi_lstm_block(x, units, return_sequences=True, dropout_rate=0.5):
    """Apply one Bidirectional-LSTM -> BatchNormalization -> Dropout stage.

    Args:
        x: Keras tensor to feed into the block.
        units: number of units of the wrapped LSTM layer.
        return_sequences: forwarded to the LSTM; pass False for the last
            recurrent block so that only the final output is returned.
        dropout_rate: rate passed to the Dropout layer (defaults to the
            previously hard-coded 0.5).

    Returns:
        The Keras tensor produced by the Dropout layer.
    """
    x = Bidirectional(LSTM(units, return_sequences=return_sequences))(x)
    x = BatchNormalization()(x)
    x = Dropout(dropout_rate)(x)
    return x

# LSTM blocks: four sequence-returning blocks, then one that collapses the sequence
x = input_layer
for lstm_block_idx in range(4):
    x = bi_lstm_block(x, 128)
x = bi_lstm_block(x, 128, return_sequences=False)

# Dense blocks: progressively narrower ReLU layers
for dense_units in (128, 64, 32):
    x = Dense(dense_units, activation='relu')(x)

# Single sigmoid unit -> probability of class 1
output_layer = Dense(1, activation='sigmoid')(x)

model = keras.Model(inputs=input_layer, outputs=output_layer)
model.summary()



In [23]:
# Callback for saving the per-epoch metrics
class MetricsCallback(Callback):
    """Keras callback that records loss/accuracy for training and validation.

    After each epoch the values found in ``logs`` under 'loss', 'accuracy',
    'val_loss' and 'val_accuracy' are appended to the matching list. A key
    that is absent (e.g. no validation data was passed to ``fit``) is stored
    as ``None`` so the four lists always keep the same length.
    """

    def __init__(self):
        super().__init__()
        self.train_losses = []
        self.train_accuracies = []
        self.val_losses = []
        self.val_accuracies = []

    def on_epoch_end(self, epoch, logs=None):
        """Append this epoch's metrics; tolerates ``logs=None`` and missing keys."""
        # The signature defaults logs to None, so guard before reading from it.
        logs = logs or {}
        self.train_losses.append(logs.get('loss'))
        self.train_accuracies.append(logs.get('accuracy'))
        self.val_losses.append(logs.get('val_loss'))
        self.val_accuracies.append(logs.get('val_accuracy'))

metrics_callback = MetricsCallback()


In [None]:
# Training of the Bi-LSTM model for 2 classes
optimizer = Adam(learning_rate=0.0001, clipnorm=1.0)
model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

print("Shape of X_train:", X_train.shape)
print("Shape of y_train:", y_train.shape)

history = model.fit(
    X_train,
    y_train,
    batch_size=64,
    epochs=250,
    validation_data=(X_val, y_val),
    callbacks=[metrics_callback],
    verbose=1,
)

# Evaluate once on the held-out test split
test_loss, test_accuracy = model.evaluate(X_test, y_test)
print("Test Loss:", test_loss)
print("Test Accuracy:", test_accuracy)

# Save the per-epoch training/validation metrics, followed by the test
# metrics, to a tab-separated text file.
epoch_rows = zip(
    metrics_callback.train_losses,
    metrics_callback.train_accuracies,
    metrics_callback.val_losses,
    metrics_callback.val_accuracies,
)
with open('metrics.txt', 'w') as f:
    f.write("Epoch\tTrain Loss\tTrain Accuracy\tVal Loss\tVal Accuracy\n")
    f.writelines(
        f"{epoch+1}\t{train_loss}\t{train_accuracy}\t{val_loss}\t{val_accuracy}\n"
        for epoch, (train_loss, train_accuracy, val_loss, val_accuracy) in enumerate(epoch_rows)
    )
    f.write(f"\nTest Loss: {test_loss}\n")
    f.write(f"Test Accuracy: {test_accuracy}\n")

CNN for binary classification

In [26]:
# Dataset split for binary CNN.
# Note: this rebinds train_id / val_id with an 80/20 split (no test part).
generator1 = torch.Generator().manual_seed(42)
train_id, val_id = torch.utils.data.random_split(
    range(len(dataset_bin)), [0.8, 0.2], generator=generator1
)

# Labels as integer tensors, samples stacked along a new leading batch axis;
# everything is moved to the GPU right away.
y_train = torch.tensor([labels_bin[i] for i in train_id]).to(device='cuda')
y_val = torch.tensor([labels_bin[i] for i in val_id]).to(device='cuda')

x_train = torch.stack([dataset_bin[i] for i in train_id], dim=0).to(device="cuda")
x_val = torch.stack([dataset_bin[i] for i in val_id], dim=0).to(device="cuda")

In [None]:
# Training CNN for binary classification.
# Release the unused memory cached by PyTorch's CUDA allocator before the
# model and data loaders below are created.
torch.cuda.empty_cache()


class SimpleCNN(nn.Module):
    """1-D CNN mapping a (batch, in_channels, seq_len) tensor to one probability per sample.

    Four Conv1d(kernel_size=3, padding=1) + ReLU + MaxPool1d(2) stages are
    followed by a 1024-unit hidden layer with dropout and a single sigmoid
    output, so the result can be passed directly to ``nn.BCELoss``.

    Args:
        in_channels: number of channels expected by the first convolution
            (defaults to the previously hard-coded 128).
        seq_len: length of the last input axis; only used to size the first
            fully connected layer (defaults to the previously hard-coded 450).
    """

    def __init__(self, in_channels=128, seq_len=450):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channels, 64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(64, 128, kernel_size=3, padding=1)
        self.conv3 = nn.Conv1d(128, 256, kernel_size=3, padding=1)
        self.conv4 = nn.Conv1d(256, 512, kernel_size=3, padding=1)
        self.pool = nn.MaxPool1d(2)
        self.dropout = nn.Dropout(0.5)

        self.flatten_size = self._get_flatten_size(in_channels, seq_len)
        self.fc1 = nn.Linear(self.flatten_size, 1024, bias=True)
        self.fc2 = nn.Linear(1024, 1, bias=True)

    def _features(self, x):
        """Run the four conv -> ReLU -> max-pool stages shared by forward() and the shape probe."""
        for conv in (self.conv1, self.conv2, self.conv3, self.conv4):
            x = self.pool(torch.relu(conv(x)))
        return x

    def _get_flatten_size(self, in_channels=128, seq_len=450):
        """Return the number of features the conv stack produces for one sample."""
        # Shape probe only: no autograd graph is needed. The probe is still
        # drawn with randn so the global RNG is consumed exactly as before.
        with torch.no_grad():
            probe = torch.randn(1, in_channels, seq_len)
            return self._features(probe).numel()

    def forward(self, x):
        """Return sigmoid probabilities of shape (batch, 1)."""
        x = self._features(x)
        x = x.view(x.size(0), -1)
        x = self.dropout(torch.relu(self.fc1(x)))
        x = torch.sigmoid(self.fc2(x))
        return x

device = "cuda"

# BCELoss needs float targets with the same (N, 1) shape as the model output.
# reshape(-1, 1) is idempotent, whereas unsqueeze(1) added another axis every
# time this cell was re-run and broke the loss with (N, 1, 1) targets.
y_train = y_train.float().reshape(-1, 1)
y_val = y_val.float().reshape(-1, 1)

train_dataset = TensorDataset(x_train, y_train)
val_dataset = TensorDataset(x_val, y_val)

# Shuffle only the training batches; validation order is kept fixed
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

# Initialize model, loss function, and optimizer
model = SimpleCNN().to(device=device)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

# NOTE(review): the training loop in this cell never uses the scaler; it is
# kept only in case other cells reference it.
scaler = torch.cuda.amp.GradScaler()

num_epochs = 100
for epoch in range(num_epochs):
    # ---- training pass ----
    model.train()
    running_loss, correct, total = 0.0, 0, 0

    for inputs, labels in train_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # criterion returns the batch mean, so weight it by the batch size
        running_loss += loss.item() * inputs.shape[0]
        # threshold the sigmoid outputs at 0.5 to get hard predictions
        predicted = outputs.gt(0.5).float()
        total += labels.shape[0]
        correct += predicted.eq(labels).sum().item()

    epoch_loss = running_loss / len(train_loader.dataset)
    epoch_accuracy = correct / total

    # ---- validation pass (dropout disabled, no gradients) ----
    model.eval()
    val_loss, val_correct, val_total = 0.0, 0, 0

    with torch.no_grad():
        for val_inputs, val_labels in val_loader:
            val_inputs = val_inputs.to(device)
            val_labels = val_labels.to(device)

            val_outputs = model(val_inputs)
            val_loss += criterion(val_outputs, val_labels).item() * val_inputs.shape[0]
            val_predicted = val_outputs.gt(0.5).float()
            val_total += val_labels.shape[0]
            val_correct += val_predicted.eq(val_labels).sum().item()

    val_loss = val_loss / len(val_loader.dataset)
    val_accuracy = val_correct / val_total

    # Print results for the current epoch
    print(f"Epoch [{epoch+1}/{num_epochs}], "
          f"Train Loss: {epoch_loss:.4f}, Train Accuracy: {epoch_accuracy:.4f}, "
          f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_accuracy:.4f}")
