In [None]:
# EEG Classification BCI Task
# Computational Intelligence Course Final Project
# Armin Panjehpour - 98101288

In [1]:
# Accessing google drive

from google.colab import drive
drive.mount('/content/gdrive')
root_path = 'gdrive/My Drive/Colab Notebooks/CI_Project_Dataset/'  


Mounted at /content/gdrive


In [120]:
import numpy as np
import matplotlib.pyplot as plt
import math
import torch
from scipy.io import loadmat
from scipy.fft import fft
from scipy import stats
import tensorflow as tf
from tensorflow import keras

In [3]:
# Load Dataset
Data = mat = loadmat('/content/gdrive/MyDrive/Colab Notebooks/CI_Project_Dataset/CI_Project_data.mat')
print(Data.keys())
print('Dataset loaded!')

dict_keys(['__header__', '__version__', '__globals__', 'TestData', 'TrainData', 'TrainLabel'])
Dataset loaded!


In [156]:
# Training and Test Dataset
Train_Data = Data['TrainData']
Test_Data = Data['TestData']

# Training Dataset Labels
Train_Label = np.squeeze(Data['TrainLabel'])-1

# Dataset Size
print('Train_Data: ',Train_Data.shape)
print('Test_Data: ',Test_Data.shape)
print('Train_Label: ',Train_Label.shape)


print('Train\Test Data Created!')

Train_Data:  (30, 384, 120)
Test_Data:  (30, 384, 40)
Train_Label:  (120,)
Train\Test Data Created!


In [5]:
# Sampling Rate

Trial_Time = 1.5 # in seconds
Fs = Train_Data.shape[1]/Trial_Time

print('Sampling Rate is: ',Fs)

Sampling Rate is:  256.0


In [None]:
############################################ Calculate Different Features for Each Channel ############################################

In [10]:
def Var_Feature(data):
  # variance of each channel
  # dim1: channel number, dim2: time, dim3: trial number
  var = np.var(data, axis=1)
  return var

In [11]:
def amp_hist_Feature(data, n_bins, min_amp, max_amp):
  # amplitude histogram
  # dim1: channel number, dim2: time, dim3: trial number
  n_channels = data.shape[0]
  n_trials = data.shape[-1]
  
  hist_vals = np.zeros((n_channels,n_trials,n_bins))
  for i in range(n_channels):
    for j in range(n_trials):
      in_range = np.asarray(np.where((np.logical_and(1 <= data[i,:,j], data[i,:,j] <= 2)) == True))
      selected_chan_data = data[i, in_range, j]
      hist_vals[i,j,:] = np.histogram(selected_chan_data, n_bins)[0]
      
  
  return np.asarray(hist_vals)

In [12]:
def AR_Coeffs(data, order):
  # autoregressive model coefficients
  # dim1: channel number, dim2: time, dim3: trial number
  n_trials = data.shape[-1]
  n_samples = data.shape[1]
  n_channels = data.shape[0]
  Y = np.zeros((n_channels, n_samples-order, n_trials))
  Y = data[:, order:n_samples, :]

  X = np.zeros((n_trials, n_channels, n_samples-order, order+1))
  X[:,:,:,0] = 1

  Coeffs = np.zeros((n_trials, n_channels, order+1))

  for i in range(n_trials):
    for j in range(n_channels):
      for k in range(n_samples-order):
        for z in range(order):
          if(k-z >= 0):
            X[i,j,k,order-z-1] = data[j,k-z,i]
          else:
            X[i,j,k,order-z-1] = 0

      a = ((X[i,j,:,:].T @ X[i,j,:,:]) + np.asarray(0.00001*np.random.random((order+1, order+1))))
      Coeffs[i,j,:] = np.linalg.inv(a) @ (X[i,j,:,:].T) @ Y[j,:,i]

  return Coeffs

In [13]:
def FF_Feature(data):
  # form factor
  # dim1: channel number, dim2: time, dim3: trial number
  signal_std = np.std(data, axis=1)
  first_deriv_Std = np.std(np.diff(data, axis=1), axis=1)
  second_deriv_Std = np.std(np.diff(np.diff(data, axis=1), axis=1), axis=1)
  FF = (second_deriv_Std/first_deriv_Std)/(first_deriv_Std/signal_std)

  return FF

In [14]:
def cov_calculator(data1, data2):
  # covariancce calculator between two vector signals
  return np.sum(np.multiply(data1-np.mean(data1),data2-np.mean(data2)))/data1.shape[0]

In [80]:
def cov_Feature(data):
  # covariance between pairs of channels
  # dim1: channel number, dim2: time, dim3: trial number

  n_trials = data.shape[-1]
  n_samples = data.shape[1]
  n_channels = data.shape[0]

  cov_matrix = np.zeros((n_trials,n_channels,n_channels))
  cov_values = np.zeros((n_trials,int((n_channels-1)*n_channels/2)))

  for i in range(n_trials):
    for j in range(n_channels):
      for k in range(j+1):
        selected_data_ch1 = data[j,:,i]
        selected_data_ch2 = data[k,:,i]
        cov_values[i,j+k] = cov_calculator(selected_data_ch1,selected_data_ch2)

  
  return cov_values

In [16]:
def fft_calculator(data, Fs):
  # calculate single side band fft of a vector signal
  L = data.shape[0]
  fft_data = fft(data)
  P2 = np.abs(fft_data/L)
  P1 = P2[0:int(L/2)+1]
  P1[1:-1] = 2*P1[1:-1]

  f = Fs*np.arange(0,L/2+1)/L

  return f, P1

In [17]:
def max_freq_Feature(data, Fs):
  # find the frequency with the maximum amplitude for each channel
  # dim1: channel number, dim2: time, dim3: trial number
  
  n_trials = data.shape[-1]
  n_samples = data.shape[1]
  n_channels = data.shape[0]

  max_freq = np.zeros((n_trials,n_channels))

  for i in range(n_trials):
    for j in range(n_channels):
      selected_data = data[j,:,i]
      f, fft_selected_data = fft_calculator(selected_data, Fs)
      max_freq[i,j] = f[np.argmax(fft_selected_data)]

  return f, max_freq

In [19]:
def mean_freq_Feature(data, Fs):
  # find the normalized weighted mean of frequencies
  # dim1: channel number, dim2: time, dim3: trial number

  n_trials = data.shape[-1]
  n_samples = data.shape[1]
  n_channels = data.shape[0]

  mean_freq = np.zeros((n_trials,n_channels))

  for i in range(n_trials):
    for j in range(n_channels):
      selected_data = data[j,:,i]
      f, fft_selected_data = fft_calculator(selected_data, Fs)

      mean_freq[i,j] = np.sum(np.multiply(f, fft_selected_data))/np.sum(fft_selected_data)


  return f, mean_freq

In [None]:
############################################ Apply the Feature Functions on Training Data ############################################

In [20]:
# get trial numbers of two classes
class1_trialnum = np.squeeze(np.asarray(np.where(np.squeeze(Train_Label) == 1)))
class2_trialnum = np.squeeze(np.asarray(np.where(np.squeeze(Train_Label) == 2)))

print(class1_trialnum.shape, class2_trialnum.shape)

(60,) (60,)


In [54]:
# calculate variance of each trail and channel - one value for each channel and trial
var_feature = Var_Feature(Train_Data)

n_class = 2
var_feature_classes = np.zeros((n_class,var_feature.shape[0],int(var_feature.shape[1]/n_class)))

var_feature_classes[0,:,:] = var_feature[:,class1_trialnum]
var_feature_classes[1,:,:] = var_feature[:,class2_trialnum]


print(var_feature_classes.shape)

(2, 30, 60)


In [55]:
# calculate amplitude histogram - nbin values for each channel
n_bins = 5
min_amp = -10
max_amp = 10
hist_feature = amp_hist_Feature(Train_Data, n_bins, min_amp, max_amp)

n_class = 2
hist_feature_classes = np.zeros((n_class,hist_feature.shape[0],int(hist_feature.shape[1]/n_class),hist_feature.shape[-1]))

hist_feature_classes[0,:,:,:] = hist_feature[:,class1_trialnum,:]
hist_feature_classes[1,:,:,:] = hist_feature[:,class2_trialnum,:]

print(hist_feature_classes.shape)

(2, 30, 60, 5)


In [56]:
# calculate Form Factor -  one value for each channel and trial
FF_feature = FF_Feature(Train_Data)

n_class = 2
FF_feature_classes = np.zeros((n_class,FF_feature.shape[0],int(FF_feature.shape[1]/n_class)))

FF_feature_classes[0,:,:] = FF_feature[:,class1_trialnum]
FF_feature_classes[1,:,:] = FF_feature[:,class2_trialnum]

print(FF_feature_classes.shape)

(2, 30, 60)


In [84]:
# calculate cov_Feature -  one normalized covariance matrix for each trial
cov_feature = cov_Feature(Train_Data)

n_class = 2
cov_feature_classes = np.zeros((n_class,int(cov_feature.shape[0]/n_class),cov_feature.shape[1]))

cov_feature_classes[0,:,:] = cov_feature[class1_trialnum,:]
cov_feature_classes[1,:,:] = cov_feature[class2_trialnum,:]
print(cov_feature_classes.shape)

(2, 60, 435)


In [87]:
# calculate max_freq_Feature -  one max freq for each trail and channel
f, max_freq_feature = max_freq_Feature(Train_Data, Fs)


n_class = 2
max_freq_feature_classes = np.zeros((n_class,int(max_freq_feature.shape[0]/n_class),max_freq_feature.shape[1]))

max_freq_feature_classes[0,:,:] = max_freq_feature[class1_trialnum,:]
max_freq_feature_classes[1,:,:] = max_freq_feature[class2_trialnum,:]


print(max_freq_feature_classes.shape)

(2, 60, 30)


In [63]:
# calculate mean_freq_Feature -  one mean freq for each trail and channel
f, mean_freq_feature = mean_freq_Feature(Train_Data, Fs)

n_class = 2
mean_freq_feature_classes = np.zeros((n_class,int(max_freq_feature.shape[0]/n_class),max_freq_feature.shape[1]))

mean_freq_feature_classes[0,:,:] = mean_freq_feature[class1_trialnum,:]
mean_freq_feature_classes[1,:,:] = mean_freq_feature[class2_trialnum,:]

print(mean_freq_feature_classes.shape)

(2, 60, 30)


In [67]:
def fisher_score_cal(bothclass_data,class1_data,class2_data):
  # calculate fisher score for each channel
  mean_bothclass = np.mean(bothclass_data)
  mean_class1 = np.mean(class1_data)
  mean_class2 = np.mean(class2_data)

  var_class1 = np.var(class1_data)
  var_class2 = np.var(class2_data)

  score = ((mean_bothclass-mean_class1)**2 + (mean_bothclass-mean_class2)**2)/(var_class1+var_class2)

  return score

In [85]:
selected_channel =  1

fisher_score_cal(cov_feature_classes,cov_feature_classes[0,:,:],cov_feature_classes[1,:,:])

0.0007767337301206576

In [None]:
# it seems that non of the features give us high fisher score just by them self

In [187]:
# lets check some features together
group_features_class1 = np.concatenate((var_feature_classes[0,:,:].T,max_freq_feature_classes[0,:,:],mean_freq_feature_classes[0,:,:]), axis=1)
group_features_class2 = np.concatenate((var_feature_classes[1,:,:].T,max_freq_feature_classes[1,:,:],mean_freq_feature_classes[1,:,:]), axis=1)
group_features_both = np.concatenate((group_features_class1,group_features_class2), axis=0)

# normalize feature matrices 
group_features_class1 = stats.zscore(group_features_class1, axis=1)
group_features_class2 = stats.zscore(group_features_class2, axis=1)

print(group_features_class1.shape, group_features_class2.shape, group_features_both.shape)

(60, 90) (60, 90) (120, 90)


In [188]:
# score

# within class matrices
S1 = np.zeros((group_features_class1.shape[1],group_features_class1.shape[1]))
S2 = np.zeros((group_features_class2.shape[1],group_features_class2.shape[1]))

n_trials = 60
n_class = 2
for i in range(n_trials):
  S1 += (group_features_class1[i,:] - np.mean(group_features_class1[i,:])) @ (group_features_class1[i,:] - np.mean(group_features_class1[i,:])).T
  S2 += (group_features_class2[i,:] - np.mean(group_features_class2[i,:])) @ (group_features_class2[i,:] - np.mean(group_features_class2[i,:])).T

S1 /= n_trials
S2 /= n_trials  
print(S1.shape, S2.shape)

Sw = S1+S2

# between class matrix
Sb = np.zeros((group_features_class1.shape[1],group_features_class1.shape[1]))

mean_all = np.expand_dims(np.mean(group_features_both, axis=0), axis=1)
mean_class1 = np.expand_dims(np.mean(group_features_class1, axis=0), axis=1)
mean_class2 = np.expand_dims(np.mean(group_features_class2, axis=0), axis=1)

print(mean_all.shape, mean_class1.shape, mean_class2.shape)

Sb = ((mean_class1-mean_all) @ (mean_class1-mean_all).T) + ((mean_class2-mean_all) @ (mean_class2-mean_all).T)

print(Sb.shape)

# final score
J = np.trace(Sb)/np.trace(Sw)
print('final score is: ', J)

(90, 90) (90, 90)
(90, 1) (90, 1) (90, 1)
(90, 90)
final score is:  20.725104855812365


In [171]:
# CREATE MLP MODEL

# Setting up the layers
from tensorflow.keras.layers import Input, Dense, Activation, Softmax

input_layer_size = group_features_class1.shape[1]
n_class = 2

model = keras.Sequential([
                      Input(shape = (input_layer_size,)), # input layer
                      Dense(units = 128), # hidden layer one
                      Activation(activation = tf.math.tanh),
                      Dense(units = 32), # hidden layer two
                      Activation(activation = tf.math.tanh),
                      Dense(units = 2), # output layer
                      Softmax(axis = 1)
])

model.summary()

Model: "sequential_5"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_15 (Dense)            (None, 128)               11648     
                                                                 
 activation_10 (Activation)  (None, 128)               0         
                                                                 
 dense_16 (Dense)            (None, 32)                4128      
                                                                 
 activation_11 (Activation)  (None, 32)                0         
                                                                 
 dense_17 (Dense)            (None, 2)                 66        
                                                                 
 softmax_5 (Softmax)         (None, 2)                 0         
                                                                 
Total params: 15,842
Trainable params: 15,842
Non-trai

In [172]:
# Compling the model

# make our model ready for training
# 1. optimizer 2. loss function 3. metrics

model.compile(
    optimizer = keras.optimizers.SGD(learning_rate = 0.01),
    loss = keras.losses.CategoricalCrossentropy(),
    metrics = ['accuracy']
)

In [173]:
# change y to one hot labels
# we have 2 classes, so our MLP will have 2 output neurons which one neuron 
# will be one and the others zero

from tensorflow.keras.utils import to_categorical

y_tr_hot = to_categorical(Train_Label)

print(y_tr_hot.shape)

(120, 2)


In [175]:
# Training the model
# using .fit in keras
# inputs of .fit : 1. Train/Val Data 2. Batch Size(go over samples window size)
# 3. Number of Epochs(number of repeatation on all samples) 4. Callbacks


hist = model.fit(
    group_features_both,
    y_tr_hot,
    batch_size = 10,
    epochs = 200,
)


Epoch 164/200
Epoch 165/200
Epoch 166/200
Epoch 167/200
Epoch 168/200
Epoch 169/200
Epoch 170/200
Epoch 171/200
Epoch 172/200
Epoch 173/200
Epoch 174/200
Epoch 175/200

KeyboardInterrupt: ignored