In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
import keras
import pickle as pkl
from tensorflow import keras
import tensorflow.keras.backend as K
from keras.models import Sequential,Model
from keras.models import Sequential 
from keras.layers import Dense, Dropout, Flatten, Input, Reshape, BatchNormalization, Bidirectional, LSTM, Multiply, Activation, Normalization, TimeDistributed, Lambda
from keras.layers import Conv3D, Conv2D, Conv1D, MaxPool3D, MaxPool2D, MaxPool1D, AvgPool3D, AvgPool2D, AvgPool1D, GlobalMaxPool3D, Attention
from keras.layers import GlobalMaxPool2D, GlobalMaxPool1D, SpatialDropout3D, SpatialDropout2D, SpatialDropout1D, GlobalAvgPool3D, MultiHeadAttention
from keras.layers import GlobalAvgPool2D, GlobalAvgPool1D, SeparableConv2D, MaxPooling2D, SeparableConv1D, Add, Concatenate, LeakyReLU, ELU, Activation, PReLU
from tensorflow.keras.optimizers import Adam
from tensorflow.keras import regularizers
from keras.utils import np_utils
from sklearn.model_selection import train_test_split, StratifiedKFold, KFold
from sklearn.metrics import confusion_matrix, f1_score, accuracy_score, precision_recall_fscore_support
from sklearn.utils.class_weight import compute_class_weight
import gc

# **Data Loading**

In [2]:
from google.colab import drive
drive.mount('/content/drive/')
gc.collect()

Mounted at /content/drive/


52

In [3]:
input_path='/content/drive/MyDrive/data_preprocessed_python/'

In [14]:
eeg_signal = []
valence = []
arousal = []
window_size = 256
skip = 256

for person in range(7,8):
  print('Person No.' + str(person))
  
  
  address = input_path+'s'+str(person)+'.dat'

  with open(address, 'rb') as file:
    data = pkl.load(file, encoding = 'latin1')

  eeg = data['data']
  label = data['labels']
  
  # Assigning classes
  label[label<5] = 0
  label[label>=5] = 1

  val = label.T[0] # Valence label
  aro = label.T[1] # Arousal label

  del data, label
  # Iterating through 40 vidoes/trials
  for i in range(40):
    sig = eeg[i].T
    sig = sig[384:, :32]
    # Segmenting into 3 seconds (384 timesteps) windows without overlap
    start = 0
    while start + window_size <=sig.shape[0]:
      eeg_signal.append(sig[start:start+window_size,:])
      valence.append(val[i])
      arousal.append(aro[i])
      start += skip 
  del eeg, val, aro, sig
#eeg_signal = np.array(eeg_signal)
#print(eeg_signal.shape)
eeg_signal = np.reshape(eeg_signal,[-1,256,32])
data = np.asarray(eeg_signal, dtype = np.float32) # Using 32 bit floating point value to save memory
del eeg_signal
valence = np.asarray(valence, dtype = np.int8)
arousal = np.asarray(arousal, dtype = np.int8)

print(data.shape)
print(valence.shape, valence[valence == 0].shape, valence[valence == 1].shape)
print(arousal.shape, arousal[arousal == 0].shape, arousal[arousal == 1].shape)

valence = np_utils.to_categorical(valence)
arousal = np_utils.to_categorical(arousal)
gc.collect()

Person No.7
(1200, 256, 32)
(1200,) (360,) (840,)
(1200,) (450,) (750,)


0

In [15]:
 X_train, x_test, Y_train, y_test = train_test_split(data, valence, test_size=0.1, random_state=4)
 print(X_train.shape,x_test.shape,Y_train.shape,y_test.shape)

(1080, 256, 32) (120, 256, 32) (1080, 2) (120, 2)


## Previous 2D Model

In [19]:
def ReshapeLayer(x):
    
    shape = x.shape
    
    reshape = Reshape((shape[1],shape[2],1))(x)
    
    
    return reshape
def get_CNN_model(x):
  x = Bidirectional(LSTM(32, return_sequences=True))(x)
  x = Reshape((-1, x.shape[2],1), name='Reshape_1')(x)
  x = Conv2D(filters=16, kernel_size=(5,5), strides=(2,2), padding='same', name = 'Conv_1')(x)
  x = BatchNormalization()(x)
  x = MaxPooling2D(pool_size=(2,2))(x)
  x = Dropout(0.2)(x)
  x = Conv2D(filters=32, kernel_size=(7,7), strides=(2,2), padding='same', name = 'Conv_2')(x)
  x = BatchNormalization()(x)
  x = MaxPooling2D(pool_size=(2,2))(x)
  x = Dropout(0.2)(x)
  x = Conv2D(filters=64, kernel_size=(9,9), strides=(2,2), padding='same', name = 'Conv_3')(x)
  x = BatchNormalization()(x)
  x = MaxPooling2D(pool_size=(1,1))(x)
  x = Dropout(0.2)(x)
  x = Flatten()(x)
  x = Dense(256,activation='tanh')(x)
  x = Dropout(0.4)(x)
  x = Dense(32,activation='relu')(x)
  x = Dropout(0.4)(x)
  x = Dense(2,activation='softmax')(x)
  return x
def get_model():
  input = Input(shape = (256,32))
  output = get_CNN_model(input)
  model_a = Model(input,output)
  opt = keras.optimizers.Adam(learning_rate=0.001)
  model_a.compile(loss=keras.losses.categorical_crossentropy, metrics=["accuracy"], optimizer=opt)
  return model_a

model = get_model()
model.summary()

Model: "model_2"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_3 (InputLayer)        [(None, 256, 32)]         0         
                                                                 
 bidirectional_2 (Bidirectio  (None, 256, 64)          16640     
 nal)                                                            
                                                                 
 Reshape_1 (Reshape)         (None, 256, 64, 1)        0         
                                                                 
 Conv_1 (Conv2D)             (None, 128, 32, 16)       416       
                                                                 
 batch_normalization_6 (Batc  (None, 128, 32, 16)      64        
 hNormalization)                                                 
                                                                 
 max_pooling2d_6 (MaxPooling  (None, 64, 16, 16)       0   

In [20]:
batch_size = 10
epochs = 200
kfold = KFold(n_splits=10, random_state=1, shuffle=True)
gc.collect()

9856

In [None]:
val_res = {'accuracy': [], 'f1_score': [], 'confusion_matrix': []}
foldNum=0
#model = get_model()
for train_index, val_index in kfold.split(X_train, Y_train):
  foldNum = foldNum + 1
  model_a = get_model()
  print("Results for fold",foldNum)
  x_train, x_val = X_train[train_index], X_train[val_index]
  y_train, y_val = Y_train[train_index], Y_train[val_index]
  print(x_train.shape)
  opt = keras.optimizers.Adam(learning_rate=0.001)
  model_a.compile(optimizer=opt, loss='categorical_crossentropy', metrics=["accuracy"])
  model_a.fit(x_train, y_train, epochs=50, batch_size=batch_size, verbose=1, validation_data=(x_val, y_val),shuffle=True)
  acc = model_a.evaluate(x_test, y_test)
  print(acc)
  val_res['accuracy'].append(acc)
  pred = model_a.predict(x_test)
  f1scr = f1_score(y_test.argmax(1), pred.argmax(1), average='macro')
  print(f1scr)
  val_res['f1_score'].append(f1scr)
  val_res['confusion_matrix'].append(confusion_matrix(y_test.argmax(1), pred.argmax(1)))
  gc.collect() # Garbage collecter
  del x_train, x_val, y_train, y_val, acc, f1scr
  gc.collect()

Results for fold 1
(972, 256, 32)
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50
[1.1701641082763672, 0.6833333373069763]
0.5052083333333333
Results for fold 2
(972, 256, 32)
Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Ep

In [None]:
a = []
f1 = []
for i in val_res['accuracy']:
  print(round(i[1]*100, 2))# Rounding off to two decimal places
  a.append(i[1]*100)
a = np.array(a)
print(np.mean(a))
for i in val_res['confusion_matrix']:
  print(i)
for i in val_res['f1_score']:
  print(i*100)
  f1.append(i)
f1 = np.array(f1)
print(np.mean(f1))


# **Arousal**

In [None]:
#valence
X_train, x_test, Y_train, y_test = train_test_split(data,arousal, test_size=0.1, random_state=4)
print(X_train.shape,x_test.shape,Y_train.shape,y_test.shape)

In [None]:
val_res = {'accuracy': [], 'f1_score': [], 'confusion_matrix': []}
foldNum=0
#model = get_model()
for train_index, val_index in kfold.split(X_train, Y_train):
  foldNum = foldNum + 1
  model = get_model()
  print("Results for fold",foldNum)
  x_train, x_val = X_train[train_index], X_train[val_index]
  y_train, y_val = Y_train[train_index], Y_train[val_index]
  print(x_train.shape)
  model_a.compile(optimizer=opt, loss='categorical_crossentropy', metrics=["accuracy"])
  model_a.fit(x_train, y_train, epochs=200, batch_size=batch_size, verbose=1, validation_data=(x_val, y_val),shuffle=True)
  acc = model_a.evaluate(x_test, y_test)
  print(acc)
  val_res['accuracy'].append(acc)
  pred = model_a.predict(x_test)
  f1scr = f1_score(y_test.argmax(1), pred.argmax(1), average='macro')
  print(f1scr)
  val_res['f1_score'].append(f1scr)
  val_res['confusion_matrix'].append(confusion_matrix(y_test.argmax(1), pred.argmax(1)))
  gc.collect() # Garbage collecter
  del x_train, x_val, y_train, y_val, acc, f1scr
  gc.collect()

In [None]:
a = []
f1 = []
for i in val_res['accuracy']:
  print(round(i[1]*100, 2))# Rounding off to two decimal places
  a.append(i[1]*100)
a = np.array(a)
print(np.mean(a))
for i in val_res['confusion_matrix']:
  print(i)
for i in val_res['f1_score']:
  print(i*100)
  f1.append(i)
f1 = np.array(f1)
print(np.mean(f1))
