In [None]:
import tensorflow as tf
!pip install tensorflow-addons
!pip install mne
import tensorflow_addons as tfa
from tqdm.notebook import tqdm

from tensorflow.keras.models import Model, Sequential, load_model
from tensorflow.keras import regularizers
from tensorflow.keras.layers import Input, Dense, Activation, Dropout, SpatialDropout1D, SpatialDropout2D, BatchNormalization
from tensorflow.keras.layers import Flatten, InputSpec, Layer, Concatenate, AveragePooling2D, MaxPooling2D, Reshape, Permute
from tensorflow.keras.layers import Conv2D, SeparableConv2D, DepthwiseConv2D, LayerNormalization
from tensorflow.keras.layers import TimeDistributed, Lambda, AveragePooling1D, Add, Conv1D, Multiply
from tensorflow.keras.layers import Conv2D, DepthwiseConv2D, BatchNormalization, Activation, Concatenate, Input, AveragePooling2D, Dropout, Flatten, Dense, LSTM
from tensorflow.keras.models import Model
from tensorflow.keras.constraints import max_norm, unit_norm 
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.optimizers import Adam
from tensorflow_addons.layers import WeightNormalization
from tensorflow.keras.utils import plot_model

import random
import mne
import matplotlib.pyplot as plt
import sklearn
from sklearn.metrics import silhouette_score, confusion_matrix

import pandas as pd
import numpy as np
from glob import glob

# Model

In [None]:
# --- Signal / windowing configuration ---------------------------------------
Fs = 256                  # sampling frequency of the recordings
n_channels = 4            # number of EEG channels kept per recording
Wn = 1                    # duration of one sampling window
n_samples = Fs * Wn       # samples per window and per channel

# --- Filter counts (per the original notes these describe EEG-ITNet) --------
n_ff = [2 ** k for k in range(1, 5)]   # frequency filters per inception module: [2, 4, 8, 16]
n_sf = [1] * 4                         # spatial filters per frequency sub-band: [1, 1, 1, 1]

# --- Training configuration -------------------------------------------------
batch_size = 32           # samples per training batch
epochs = 500              # number of passes over the training set

In [None]:
def Hybrid_CNN_LSTM(Chans, Samples, out_class=3, drop_rate=0.2):
    """Build the multi-branch CNN + LSTM classifier.

    Four parallel convolutional branches read the same input, each with a
    different temporal kernel length, followed by a depthwise convolution
    spanning all ``Chans`` rows. The branch outputs are concatenated, pooled,
    flattened, projected by a dense layer, passed through an LSTM as a
    length-1 sequence and finally classified with a softmax layer.

    Args:
        Chans: size of the first input axis (rows spanned by the depthwise kernel).
        Samples: size of the second input axis (convolved by the temporal kernels).
        out_class: number of output classes of the softmax layer.
        drop_rate: dropout rate used before and after the dense projection.

    Returns:
        An uncompiled ``Model`` mapping inputs of shape (Chans, Samples, 1)
        to ``out_class`` softmax probabilities.
    """
    Input_block = Input(shape=(Chans, Samples, 1))

    # (number of filters, temporal kernel length) for each parallel branch.
    # Layers are created in the same order and with the same explicit names
    # ('Conv1'..'Conv4', 'DepthConv1'..'DepthConv4') as the unrolled version.
    branch_specs = [(32, 16), (64, 32), (128, 64), (256, 128)]
    dense_units = 128

    # CNN branches
    branches = []
    for idx, (n_filters, kernel_len) in enumerate(branch_specs, start=1):
        branch = Conv2D(n_filters, (1, kernel_len), activation='relu', padding='same',
                        name='Conv{}'.format(idx))(Input_block)
        branch = BatchNormalization()(branch)
        # 'valid' padding with a (Chans, 1) kernel collapses the first axis to size 1.
        branch = DepthwiseConv2D((Chans, 1), activation='relu', depth_multiplier=1,
                                 padding='valid', name='DepthConv{}'.format(idx))(branch)
        branch = BatchNormalization()(branch)
        branch = Activation('elu')(branch)
        branches.append(branch)

    # Concatenate the CNN branches along the feature axis
    block = Concatenate(axis=-1)(branches)

    # Dimensionality reduction and preparation for the LSTM
    lstm_input = AveragePooling2D((1, 4))(block)
    lstm_input = Dropout(drop_rate)(lstm_input)
    lstm_input = Flatten()(lstm_input)
    lstm_input = Dense(dense_units, activation='relu')(lstm_input)
    lstm_input = Dropout(drop_rate)(lstm_input)

    # LSTM over a length-1 sequence. A Reshape layer is used instead of calling
    # tf.expand_dims on the symbolic tensor, so the graph is built from Keras
    # layers only and does not rely on raw TF ops accepting Keras tensors.
    lstm_input = Reshape((1, dense_units))(lstm_input)
    lstm_output = LSTM(64, return_sequences=False)(lstm_input)

    # Classification head
    out = Dense(out_class, activation='softmax')(lstm_output)

    # Model creation
    model = Model(inputs=Input_block, outputs=out)
    return model

# Data Preparation

Train Data

In [None]:
import pandas as pd


def read_train_recordings(pattern):
    """Load and concatenate every CSV recording matching ``pattern``.

    For each file the 'timestamps' and 'Right AUX' columns are dropped, the
    first 'RAW_' occurrence is stripped from each column name and missing
    values are replaced by that file's per-column mean.

    Args:
        pattern: glob pattern selecting the CSV files of one class.

    Returns:
        (paths, frame): the sorted list of matched paths and a single
        DataFrame with all recordings stacked row-wise.

    Raises:
        FileNotFoundError: if no file matches ``pattern``.
    """
    # glob returns files in arbitrary order; sort so the concatenation order
    # (and therefore every later index-based split) is reproducible.
    paths = sorted(glob(pattern))
    if not paths:
        raise FileNotFoundError("No recording files match pattern: {}".format(pattern))

    cols_remove = ['timestamps', 'Right AUX']
    frames = []
    for path in paths:
        frame = pd.read_csv(path)
        frame = frame.loc[:, ~frame.columns.isin(cols_remove)]
        frame.columns = frame.columns.str.replace('RAW_', '', 1)
        frame = frame.fillna(frame.mean())
        frames.append(frame)

    # NOTE(review): recordings are stacked into one continuous signal, so
    # fixed-length windows cut later may straddle a file boundary.
    return paths, pd.concat(frames, ignore_index=True)


# One DataFrame per class: 'gauche' (left), 'droite' (right), 'neutre' (neutral).
files1, data1 = read_train_recordings("main/record/Train/*gauche*")
files2, data2 = read_train_recordings("main/record/Train/*droite*")
files3, data3 = read_train_recordings("main/record/Train/*neutre*")

In [None]:
# Quick look at the concatenated 'gauche' training recordings and their (rows, columns) shape.
print(data1)
print(data1.shape)

MNE Epochs Function

In [None]:
def convertDF2MNE(sub, sfreq=None, duration=None, overlap=None):
    """Cut a DataFrame of EEG samples into fixed-length, overlapping epochs.

    Every column of ``sub`` is treated as one EEG channel. The data are
    wrapped in an MNE ``RawArray`` with the 'standard_1020' montage,
    re-referenced with ``set_eeg_reference()`` defaults, and split by
    ``mne.make_fixed_length_epochs``.

    Args:
        sub: DataFrame with one column per channel and one row per sample.
            Column names must be valid 'standard_1020' channel names.
        sfreq: sampling frequency; defaults to the notebook constant ``Fs``
            so windows stay consistent with ``n_samples = Wn*Fs``.
        duration: epoch duration; defaults to the notebook constant ``Wn``.
        overlap: overlap between consecutive epochs, in the same unit as
            ``duration``; defaults to 20% of ``duration``.

    Returns:
        The array returned by ``Epochs.get_data()``; callers in this notebook
        concatenate it with arrays of shape (0, n_channels, n_samples).
    """
    # Resolve defaults at call time so later edits of Fs / Wn are honoured.
    sfreq = Fs if sfreq is None else sfreq
    duration = Wn if duration is None else duration
    overlap = 0.2 * duration if overlap is None else overlap

    channel_names = list(sub.columns)
    info = mne.create_info(channel_names, ch_types=['eeg'] * len(channel_names), sfreq=sfreq)
    info.set_montage('standard_1020')

    # RawArray expects channels on the first axis, hence the transpose.
    raw = mne.io.RawArray(sub.T, info)
    raw.set_eeg_reference()

    epochs = mne.make_fixed_length_epochs(raw, duration=duration, overlap=overlap)
    return epochs.get_data()

In [None]:
# Epoch each class and attach its label: 0 = left (data1), 1 = right (data2),
# 2 = neutral (data3).
# Concatenating onto an empty (0, n_channels, n_samples) array keeps the shape
# check of the original code: epochs of any other shape raise an error.
# Labels are created in one shot with np.full (float64, like the original
# np.append-on-empty result) instead of growing an array one element at a time.
epoch_template = np.empty((0, n_channels, n_samples))

data = convertDF2MNE(data1)
x_left = np.concatenate((epoch_template, data), axis=0)
y_left = np.full(len(data), 0.0)

data = convertDF2MNE(data2)
x_right = np.concatenate((epoch_template, data), axis=0)
y_right = np.full(len(data), 1.0)

# `data` is left holding the neutral epochs; the next cell prints its shape.
data = convertDF2MNE(data3)
x_neutral = np.concatenate((epoch_template, data), axis=0)
y_neutral = np.full(len(data), 2.0)

In [None]:
# Sanity check: `data` holds the most recently epoched class; each x_* / y_*
# pair should have the same length along its first axis.
print(data.shape)
print(x_right.shape, y_right.shape)
print(x_left.shape, y_left.shape)
print(x_neutral.shape, y_neutral.shape)

Train, Validation Split

In [None]:
def _with_channel_axis(arr):
    """Return ``arr`` with a trailing singleton axis, adding it only if ``arr`` is 3-D."""
    # Guarding on ndim makes the cell idempotent: re-running it no longer
    # stacks an extra axis on top of the one added by a previous run.
    return arr[..., np.newaxis] if arr.ndim == 3 else arr


# The model input is (Chans, Samples, 1), so add the trailing axis.
x_left = _with_channel_axis(x_left)
x_right = _with_channel_axis(x_right)
x_neutral = _with_channel_axis(x_neutral)

# The first `split` epochs of every class are held out for validation and the
# remaining epochs are used for training.
split = 64

x_train = np.concatenate((x_left[split:], x_right[split:], x_neutral[split:]), axis=0)
y_train = np.concatenate((y_left[split:], y_right[split:], y_neutral[split:]))

# Fix: the neutral class previously used [split:] here as well, so the neutral
# validation samples were exactly the neutral training samples.
# NOTE(review): epochs overlap by 20%, so the last validation epoch of each
# class still shares a few samples with the first training epoch.
x_val = np.concatenate((x_left[:split], x_right[:split], x_neutral[:split]), axis=0)
y_val = np.concatenate((y_left[:split], y_right[:split], y_neutral[:split]))


In [None]:
# Indices of any NaN entries left in the training tensor (an empty result means none).
np.argwhere(np.isnan(x_train))

Test Data

In [None]:
import pandas as pd


def read_test_recordings(pattern):
    """Load and concatenate every CSV recording matching ``pattern``.

    For each file the 'timestamps' and 'Right AUX' columns are dropped, the
    first 'RAW_' occurrence is stripped from each column name and missing
    values are replaced by that file's per-column mean.

    Args:
        pattern: glob pattern selecting the CSV files of one class.

    Returns:
        (paths, frame): the sorted list of matched paths and a single
        DataFrame with all recordings stacked row-wise.

    Raises:
        FileNotFoundError: if no file matches ``pattern``.
    """
    # glob returns files in arbitrary order; sort for a reproducible result.
    paths = sorted(glob(pattern))
    if not paths:
        raise FileNotFoundError("No recording files match pattern: {}".format(pattern))

    cols_remove = ['timestamps', 'Right AUX']
    frames = []
    for path in paths:
        frame = pd.read_csv(path)
        frame = frame.loc[:, ~frame.columns.isin(cols_remove)]
        frame.columns = frame.columns.str.replace('RAW_', '', 1)
        frame = frame.fillna(frame.mean())
        frames.append(frame)

    return paths, pd.concat(frames, ignore_index=True)


# One DataFrame per class: 'gauche' (left), 'droite' (right), 'neutre' (neutral).
files1, data1_test = read_test_recordings("main/record/Test/*gauche*")
files2, data2_test = read_test_recordings("main/record/Test/*droite*")
files3, data3_test = read_test_recordings("main/record/Test/*neutre*")

In [None]:
# Inspect the freshly loaded 'gauche' *test* recordings. This cell previously
# printed the training frame `data1`, which says nothing about the test data.
print(data1_test)
print(data1_test.shape)

In [None]:
# Build the test tensors. Labels follow the training convention:
# 0 = left ('gauche'), 1 = right ('droite'), 2 = neutral ('neutre').
# Labels are created with np.full (float64, like the original
# np.append-on-empty result) instead of appending one element per epoch.
x_test = np.empty((0, n_channels, n_samples))
y_test = np.empty(0)

# Left
data = convertDF2MNE(data1_test)
x_test = np.concatenate((x_test, data), axis=0)
y_test = np.concatenate((y_test, np.full(len(data), 0.0)))

# Right
data = convertDF2MNE(data2_test)
x_test = np.concatenate((x_test, data), axis=0)
y_test = np.concatenate((y_test, np.full(len(data), 1.0)))

# Neutral
data = convertDF2MNE(data3_test)
x_test = np.concatenate((x_test, data), axis=0)
y_test = np.concatenate((y_test, np.full(len(data), 2.0)))

# Trailing singleton axis to match the model input (Chans, Samples, 1).
x_test = x_test[:, :, :, np.newaxis]


In [None]:
# NOTE(review): this cell repeats the epoching of the *training* frames done
# earlier in the notebook. It resets x_left / x_right / x_neutral to 3-D arrays
# (without the trailing axis) but does not change x_train / x_val, which were
# already built. Kept for backward compatibility; consider removing it.
def _epochs_and_labels(frame, label):
    """Epoch ``frame`` and return (epochs, labels) with every label equal to ``label``."""
    epochs_arr = convertDF2MNE(frame)
    # Concatenating onto an empty template enforces the expected epoch shape.
    x = np.concatenate((np.empty((0, n_channels, n_samples)), epochs_arr), axis=0)
    # float64 labels, matching what np.append on an empty array produced.
    y = np.full(len(epochs_arr), float(label))
    return x, y


x_left, y_left = _epochs_and_labels(data1, 0)
x_right, y_right = _epochs_and_labels(data2, 1)
x_neutral, y_neutral = _epochs_and_labels(data3, 2)

# The next cell prints data.shape; as before it refers to the neutral epochs.
data = x_neutral

In [None]:
# Shapes after re-epoching the training frames; x_* and y_* lengths should match per class.
print(data.shape)
print(x_right.shape, y_right.shape)
print(x_left.shape, y_left.shape)
print(x_neutral.shape, y_neutral.shape)

# Training

In [None]:
class DataLoader(tf.keras.utils.Sequence):
    """Batch provider over an (images, labels) pair with optional reshuffling.

    Samples are addressed through ``key_array``, an index permutation that is
    regenerated by ``on_epoch_end`` when ``shuffle`` is true. Only full
    batches are served: a trailing partial batch is not part of ``__len__``.
    """

    def __init__(self, images, labels, batch_size=32, shuffle=True):
        """Store the data and draw the initial sample order.

        Args:
            images: array of inputs, indexed along its first axis by an index array.
            labels: array of targets aligned with ``images``.
            batch_size: number of samples per batch.
            shuffle: whether to permute the sample order in ``on_epoch_end``.
        """
        super().__init__()
        self.images, self.labels = images, labels
        self.batch_size, self.shuffle = batch_size, shuffle
        n_items = self.images.shape[0]
        self.key_array = np.arange(n_items, dtype=np.uint32)
        self.on_epoch_end()

    def __len__(self):
        """Number of full batches available."""
        return len(self.key_array) // self.batch_size

    def __getitem__(self, index):
        """Return batch ``index`` as a pair of float32 arrays ``(x, y)``."""
        start = index * self.batch_size
        stop = (index + 1) * self.batch_size
        batch_keys = self.key_array[start:stop]
        batch_x = np.asarray(self.images[batch_keys], dtype=np.float32)
        batch_y = np.asarray(self.labels[batch_keys], dtype=np.float32)
        return batch_x, batch_y

    def on_epoch_end(self):
        """Reshuffle the sample order when shuffling is enabled."""
        if not self.shuffle:
            return
        self.key_array = np.random.permutation(self.key_array)

In [None]:
# Use the `batch_size` constant from the configuration cell rather than a
# second hard-coded 32, so changing the config actually changes the batches.
dataloader = DataLoader(images=x_train, labels=y_train, batch_size=batch_size, shuffle=True)
n_batches = len(dataloader)  # number of full batches per epoch

In [None]:
# Optimiser and loss shared by the custom training loop. Labels are integer
# class ids and the model already outputs softmax probabilities, hence the
# sparse cross-entropy with from_logits=False.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
ce_loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)

# Per-epoch history buffers: float32 for training, default float64 for validation.
loss_train, acc_train = (np.zeros(epochs, dtype=np.float32) for _ in range(2))
loss_val, acc_val = (np.zeros(epochs) for _ in range(2))

In [None]:
# Instantiate the network for (n_channels, n_samples) windows with the default 3 classes,
# attach the optimiser and loss, and print the layer summary.
model = Hybrid_CNN_LSTM(Chans=n_channels,Samples=n_samples)
model.compile(optimizer=optimizer,loss=ce_loss)
model.summary()

In [None]:
# Display the training tensor and label shapes; their first dimensions should match.
x_train.shape, y_train.shape

In [None]:
# Custom training loop: one pass over all full batches per epoch, followed by
# an evaluation on the whole validation set. Results are stored per epoch in
# loss_train / acc_train / loss_val / acc_val.
for epoch in range(epochs):
  epoch_loss_avg = tf.keras.metrics.Mean() # Running mean of the per-batch training loss
  epoch_acc_avg = tf.keras.metrics.Mean() # Running mean of the per-batch training accuracy

  print('==== Epoch #{0:3d} ===='.format(epoch))

  for batch in tqdm(range(n_batches)):
    x, y = dataloader[batch]

    with tf.GradientTape() as tape: # Forward pass, recorded for differentiation
      y_ = model(x, training=True)
      loss = ce_loss(y_true=y, y_pred=y_)

    grad = tape.gradient(loss, model.trainable_variables) # Backpropagation
    optimizer.apply_gradients(zip(grad, model.trainable_variables)) # Update network weights

    epoch_loss_avg(loss)
    # Batch accuracy: predicted class = index of the largest softmax output
    epoch_acc_avg(sklearn.metrics.accuracy_score(y_true=y, y_pred=np.argmax(y_, axis=-1)))
    
  # The loop indexes the loader directly, so the reshuffle hook is called by hand.
  dataloader.on_epoch_end()

  loss_train[epoch] = epoch_loss_avg.result()
  acc_train[epoch] = epoch_acc_avg.result()

  print('---- Training ----')
  print('Loss  =  {0:.3f}'.format(loss_train[epoch]))
  print('Acc   =  {0:.3f}'.format(acc_train[epoch]))

  y_ = model.predict(x_val) # Validation predictions
  loss_val[epoch] = ce_loss(y_true=y_val, y_pred=y_).numpy()
  acc_val[epoch] = sklearn.metrics.accuracy_score(y_true=y_val, y_pred=np.argmax(y_, axis=-1))

  print('--- Validation ---')
  print('Loss  =  {0:.3f}'.format(loss_val[epoch]))
  print('Acc   =  {0:.3f}'.format(acc_val[epoch]))

In [None]:
# Persist the model as it stands after the last epoch (HDF5 file).
# NOTE(review): assumes the directory main/model/ already exists — confirm.
model.save("main/model/model1.h5")

In [None]:
# Training: mean of the per-epoch training accuracy over ALL epochs
# (an average of the learning curve, not the accuracy of the final model).
acc= sum(acc_train)/len(acc_train)
print(f'Training Accuracy : {acc}')

# Validation: same averaging over all epochs for the validation accuracy.
acval = sum(acc_val)/ len(acc_val)
print(f'Validation Accuracy : {acval}')

In [None]:
# Learning curves: loss on the left panel, accuracy on the right panel.
# Previously each curve got its own 12x4 figure with a 1x2 grid of which only
# the left half was used, leaving half of every figure blank.
fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(12, 4))

# Loss Plotting
ax_loss.plot(loss_train, label='Training')
ax_loss.plot(loss_val, label='Validation')
ax_loss.set_title('Model Loss')
ax_loss.set_ylabel('loss')
ax_loss.set_xlabel('epoch')
ax_loss.legend(loc='upper left')

# Accuracy Plotting
ax_acc.plot(acc_train, label='Training')
ax_acc.plot(acc_val, label='Validation')
ax_acc.set_title('Model Accuracy')
ax_acc.set_ylabel('accuracy')
ax_acc.set_xlabel('epoch')
ax_acc.legend(loc='upper left')

fig.tight_layout()
plt.show()

# Testing

In [None]:
# Reload the model from disk so the test below evaluates the saved file,
# not the in-memory training object.
test_model = tf.keras.models.load_model("main/model/model1.h5")

# Model architecture
test_model.summary()

In [None]:
# Evaluate the reloaded model on the held-out test set.
y_ = test_model.predict(x_test)

# Predicted class = index of the largest softmax output. np.argmax replaces
# the hand-written if/elif chain, which reported class 2 whenever classes 0
# and 1 tied for the maximum.
y_pred_test = np.argmax(y_, axis=-1)

# Test metrics get their own variables: storing them in loss_val[0] /
# acc_val[0] overwrote the epoch-0 entries of the validation history.
loss_test = ce_loss(y_true=y_test, y_pred=y_).numpy()
acc_test = sklearn.metrics.accuracy_score(y_true=y_test, y_pred=y_pred_test)

for i in range(x_test.shape[0]):
    predicted = y_pred_test[i]
    # The value printed as "accuracy" is the softmax probability of the predicted class.
    print('Predicted value : {0} with accuracy = {1:.3f}'.format(predicted, y_[i][predicted]))
    print('Actual value = {}\n'.format(y_test[i]))

print('--- Test ---')
print('Loss  =  {0:.3f}'.format(loss_test))
print('Acc   =  {0:.3f}'.format(acc_test))