# DETECTING CONFUSION LEVEL
Python code related to my thesis project where I had to conduct an experiment where participants took cognitive tests. During the cognitive tests, we recorded their EEG signals. At the end of each cognitive exercise, we asked the participants to indicate their level of confusion. I then trained models to detect the intensity of confusion (not confused, slightly, moderately and very confused) or the confusion state (no confusion, confusion)

PROGRAMMING AIM: We will use classes, exceptions and follow the design of the MVC model!

Check if we are using the GPU Alienware


In [None]:
import tensorflow as tf
tf.test.gpu_device_name()
import numpy as np
seed = 7
np.random.seed(seed)

Check if Cuda is installed

In [None]:
!nvcc --version

nvcc: NVIDIA (R) Cuda compiler driver
Copyright (c) 2005-2019 NVIDIA Corporation
Built on Sun_Jul_28_19:07:16_PDT_2019
Cuda compilation tools, release 10.1, V10.1.243


Our future methods will catch commun exceptions with this function

In [None]:
import inspect
def ExceptionMessage(exc):
  print("Exception of type:", exc.__class__)
  print("Called by fonction:", inspect.stack()[1][3])
  print("Message:", exc)

# DRIVE

##GoogleDrive Class (model)

We are going to create a class with the attributes of the drive, i.e., its URLs so later we can have access to EEG files. 

```
# Note: If we follow the MVC model, the GoogleDrive class is a model.
```

In [None]:
class GoogleDrive:
  """Class with the urls of the drive to load its files later on"""
  #Attributes: url of drive, and url of the EEG files
  def __init__(self, urlDrive='/content/drive', urlFiles= '/content/drive/My Drive/Colab Notebooks/Data/CognitiveTest/', urlLabels = '../content/drive/My Drive/Colab Notebooks/Data/CognitiveTest/All_exercices.csv'):
    self._urlDrive = urlDrive
    self._urlFiles = urlFiles
    self._urlLabels = urlLabels
    
  #Get the drive url
  def GetUrlDrive(self):
    return self._urlDrive
  
  #Get the url of the EEG signals files
  def GetUrlFiles(self):
    return self._urlFiles
  
  #Get the url of the EEG labels
  def GetUrlLabel(self):
    return self._urlLabels

##ViewDrive Class (view)

We are going to create a class to see the files present on the drive.


```
# Note: Following the MVC model, this class would be a view.
```



In [None]:
import os
class ViewDrive:
  """Class to get a view of the drive (print its files and so on)"""
  def __init__(self, GoogleDrive):
    self._GoogleDrive = GoogleDrive
    
  # See files in drive  
  def SeeDataDrive(self):
    url = self._GoogleDrive.GetUrlFiles()
    if os.path.exists(url):
      print(sorted(os.listdir(url)))
    else:
      print("Drive URL not found")

##DriveController Class (controller)

Then we are going to mount our drive using its URL. We will also display its content. To perform these manipulations we will create the controller class of the drive. The controller uses the GoogleDrive class and the ViewDrive class


```
# Note: Following the MVC model, this class would be a controller.
```



In [None]:
from google.colab import drive
class DriveController:
  """Class to perform action on the Drive e.g. mount it so that we can access its files"""
  def __init__(self, GoogleDrive=GoogleDrive()):
    self._GoogleDrive = GoogleDrive
    self._ViewDrive = ViewDrive(self._GoogleDrive)
    
  #Load the driver helper to access data later and mount.
  def MountDrive(self):
    try:
      drive.mount(self._GoogleDrive.GetUrlDrive()) #Catch any external errors related to the drive
      self._ViewDrive.SeeDataDrive()
    except Exception as exc:
      ExceptionMessage(exc)

In [None]:
DC = DriveController()
DC.MountDrive()

Mounted at /content/drive
['All_exercices.csv', 'Graphs', 'Graphs0', 'Graphs1', 'P1_s0ex0.csv', 'P1_s0ex1.csv', 'P1_s0ex2.csv', 'P1_s0ex3.csv', 'P1_s1ex0.csv', 'P1_s1ex1.csv', 'P1_s1ex2.csv', 'P1_s1ex3.csv', 'P1_s2ex0.csv', 'P1_s2ex1.csv', 'P1_s2ex2.csv', 'P1_s2ex3.csv', 'P1_s3ex0.csv', 'P1_s3ex1.csv', 'P1_s3ex2.csv', 'P1_s3ex3.csv', 'P1_s4ex0.csv', 'P1_s4ex1.csv', 'P1_s4ex2.csv', 'P1_s4ex3.csv', 'P2_s0ex0.csv', 'P2_s0ex1.csv', 'P2_s0ex2.csv', 'P2_s0ex3.csv', 'P2_s1ex0.csv', 'P2_s1ex1.csv', 'P2_s1ex2.csv', 'P2_s1ex3.csv', 'P2_s2ex0.csv', 'P2_s2ex1.csv', 'P2_s2ex2.csv', 'P2_s2ex3.csv', 'P2_s3ex0.csv', 'P2_s3ex1.csv', 'P2_s3ex2.csv', 'P2_s3ex3.csv', 'P2_s4ex0.csv', 'P2_s4ex1.csv', 'P2_s4ex2.csv', 'P2_s4ex3.csv', 'P3_s0ex0.csv', 'P3_s0ex1.csv', 'P3_s0ex2.csv', 'P3_s0ex3.csv', 'P3_s1ex0.csv', 'P3_s1ex1.csv', 'P3_s1ex2.csv', 'P3_s1ex3.csv', 'P3_s2ex0.csv', 'P3_s2ex1.csv', 'P3_s2ex2.csv', 'P3_s2ex3.csv', 'P3_s3ex0.csv', 'P3_s3ex1.csv', 'P3_s3ex2.csv', 'P3_s3ex3.csv', 'P3_s4ex0.csv', 'P3_s4ex

# EXERCICES

##ExercicesList Class (model)

We are going going to create the class that contains our exercices data (participants' answers during cognitive tests, time they took to complete the exercise, labels of confusion...)

In [None]:
#Load Packages
import numpy as np
import pandas as pd

class ExercicesList:
  """Class containing the results of the cognitive exercises used in our experiment"""
  def __init__(self, GoogleDrive=GoogleDrive()):
    self._gd = GoogleDrive
    self._listExercices = pd.read_csv(self._gd.GetUrlLabel()).fillna(0) #list of cognitive exercices with participants' answers and confusion labels
    self._arrayExercices = np.array(self._listExercices)
    self._nbParticipants = 7 # number of participants of the experiment
    self._nbSeries = 5 # number of series of exercices
    self._nbExercices = 4 # number of exercices
    self._totalExercices = self._nbParticipants*self._nbSeries*self._nbExercices #10x5x4 = 200ex. performed
  
  # Get google drive
  def GetGD(self):
    return self._gd

  # Get list of exercices
  def GetListExercices(self):
    return self._listExercices

  # Get list of exercices
  def GetListExercices(self):
    return self._listExercices
  
  # Get array of exercices
  def GetArrayExercices(self):
    return self._arrayExercices

  # Get number of participants
  def GetNbParticipants(self):
    return self._nbParticipants
  
  # Get number of series
  def GetNbSeries(self):
    return self._nbSeries
  
  # Get number of exercices per series
  def GetNbExercices(self):
    return self._nbExercices

  # Get total number of exercices
  def GetTotalExercices(self):
    return self._totalExercices

# DATASET CREATION

## Dataset Class (model)
Dataset class that will contain our EEG signals, labels of confusion and possible extracted features

In [None]:
from numpy import savetxt, loadtxt

class Dataset:
  """Class containing our dataset to train our models (labels of confusion, eeg signals and possible extracted features)"""
  def __init__(self):
    self._inputList = [] # EEG signals would be saved under a list here
    self._inputArray = None # Numpy array of EEG signals
    self._normInputArray = None # Normalized numpy array of EEG signals
    self._bpArray = None # Bandpower features computed from EEG signals
    self._normBpArray = None # # Normalized Bandpower features computed from EEG signals
    self._labelList = None # Confusion level labels
    self._labelArray = None #Numpy array of confusion level

    self._X_train = None
    self._X_test = None
    self._y_train = None
    self._y_test = None

  # Set list that contains eeg signals
  def SetInputList(self,inputList):
    self._inputList = inputList

  # Get EEG signals list
  def GetInputList(self):
    return self._inputList

  def SetInputArray(self, arr):
    self._inputArray = arr

  #Reshape EEG signals array to 2D
  def Get2DInputArray(self):
    return self._inputArray.reshape(self._inputArray.shape[0], -1)
  
  #Reshape normalized input array to 2D
  def Get2DNormInputArray(self):
    return self._normInputArray.reshape(self._normInputArray.shape[0], -1)
  
  def GetInputArray(self):
    return self._inputArray
  
  #Set normalized input array to arr
  def SetNormInputArray(self,arr):
    self._normInputArray = arr

  #Get normalized input array
  def GetNormInputArray(self):
    return self._normInputArray
  
  def SetBpArray(self, arr):
    self._bpArray = arr
  
  def GetBpArray(self):
    return self._bpArray
  
  #Reshape bandpower array to 2D
  def Get2DBpArray(self):
    return self._bpArray.reshape(self._bpArray.shape[0], -1)
  
  #Reshape normalized bandpower array to 2D
  def Get2DNormBpArray(self):
    return self._normBpArray.reshape(self._normBpArray.shape[0], -1)
  
  #Set normalized bp array to arr
  def SetNormBpArray(self,arr):
    self._normBpArray = arr

  #Get normalized bp array
  def GetNormBpArray(self):
    return self._normBpArray
  
  def SetLabelList(self, arr):
    self._labelList = arr
  
  def GetLabelList(self):
    return self._labelList
  
  def SetLabelArray(self, arr):
    self._labelArray = arr

  def GetLabelArray(self):
    return self._labelArray
  
  #Set splitted dataset (eeg signals or bandpower features)
  def SetSplittedDataset(self,X_train, X_test, y_train, y_test):
    self._X_train = X_train
    self._X_test = X_test
    self._y_train = y_train
    self._y_test = y_test
  
  #Get splitted dataset
  def GetSplittedDataset(self):
    return self._X_train, self._X_test, self._y_train, self._y_test

  #Save bandpower features
  def SaveBpArray(self):
    savetxt('/content/drive/My Drive/Colab Notebooks/Data/CognitiveTest/bpData_conf_bin.csv', self.Get2DBpArray(), delimiter=',')
    print("Bandpower data saved")
  
  #Load bandpower features
  def LoadBpArray(self):
    data = loadtxt('/content/drive/My Drive/Colab Notebooks/Data/CognitiveTest/bpData_bin.csv', delimiter=',')
    data = data.reshape(data.shape[0],1,data.shape[1])
    self.SetBpArray(data)
    print("Bandpower loaded")

## DatasetView Class (view)
Tools to visualize the dataset

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(font_scale=1.2)
import plotly.express as px
from sklearn.decomposition import PCA

class DatasetView:
  """Class for dataset visualization"""
  def __init__(self, Dataset):
    self._dataset = Dataset
  
  # printing the list using loop 
  def PrintInputList(self):
    li = self._dataset.GetInputList()
    print(len(li))
    for x in range(len(li)): 
      print(li[x])

  # printing the list using loop 
  def PrintLabelList(self):
    li = self._dataset.GetLabelList()
    print(li)

  # printing the array
  def PrintInputArray(self):
    arr = self._dataset.GetInputArray()
    print("Array: ", arr)
    print("Shape: ", arr.shape)

  # printing the array
  def PrintLabelArray(self):
    arr = self._dataset.GetLabelArray()
    print("Array: ", arr)
    print("Shape: ", arr.shape)

  # Plot the amplitude of each channel for each seconds
  def PlotVolt(self,secs):
    # Define sampling frequency and time vector
    sf = 128.
    data = self._dataset.GetInputArray()

    #Retrieve data ranged by exercices instead of seconds. So we can later print the volts for the entire exercice
    d1 = int(data.shape[0]/secs)
    d2 = data.shape[1]*secs
    d3 = data.shape[2]
    data2 = np.reshape(data,(d1,d2,d3))
    print("data2 shape: ",data2.shape)

    part = 0 #participant number
    serie = 0 #serie number
    ex = 0 #exercice number
    interv = data2.shape[0]
    print(interv)
    for i in range(interv):
      if i%20==0 and i!=0:
        part = part+1
        serie = 0
      if ex==4:
        serie = serie + 1
        ex = 0
      time = np.arange(data2[i].shape[0]) / sf
      print(time)
      # Plot the signal
      plt.plot(time, data2[i], lw=1.5, color='k')
      plt.xlabel('Time (seconds)')
      plt.ylabel('Voltage')
      plt.title('EEG raw signal mV. Participant %i Serie %i Exercice %i ' % (part,serie,ex))
      sns.despine()
      plt.savefig('/content/drive/My Drive/Colab Notebooks/Data/CognitiveTest/Graphs/'+str(i)+'.png')  # write image to file
      plt.clf()

      ex = ex+1
  
  #Visualize dataset in 2D with PCA
  @staticmethod
  def Pca2D(X,y):
    pca = PCA(n_components=2)
    arr = X
    components = pca.fit_transform(arr)

    fig = px.scatter(components, x=0, y=1, color=y)
    fig.show()
  
  #Visualize dataset in 3D with PCA
  @staticmethod
  def Pca3D(X,y):
    pca = PCA(n_components=3)
    arr = X
    components = pca.fit_transform(arr)

    total_var = pca.explained_variance_ratio_.sum() * 100

    fig = px.scatter_3d(
        components, x=0, y=1, z=2, color=y,
        title=f'Total Explained Variance: {total_var:.2f}%',
        labels={'0': 'PC 1', '1': 'PC 2', '2': 'PC 3'}
    )
    fig.show()

##WorkerController Class (controller)
Class for extracting the results of the cognitive exercises (i.e., labels of confusion) and the EEG signals. This class works like a worker and retrieves the data from the Drive so that the manager (DatasetController) can then manage the data.

In [None]:
import pandas.io.common as pio

class WorkerController:
  """Class for extracting the results of the cognitive exercises (with labels of confusion) and the EEG signals. This class will set up ExercicesList and Dataset classes."""
  def __init__(self, ExercicesList=ExercicesList(), Dataset=Dataset()):
    self._exercicesList = ExercicesList
    self._dataset = Dataset
    self._datasetView = DatasetView(self._dataset)
  
  def GetExercicesList(self):
    return self._exercicesList
  
  def GetDataset(self):
    return self._exercicesList

  # Build our input list = load EEG signals from csv files
  def BuildInputList(self):
    inputList=[]
    for f in range(self._exercicesList.GetNbParticipants()):
      dataframe = []
      for i in range(self._exercicesList.GetNbSeries()):
        eegSerie = [] 
        for j in range(self._exercicesList.GetNbExercices()):
          ex = self.GetEEGExercice(f,i,j)
          eegSerie.append(ex)
        dataframe.append(eegSerie)
      inputList.append(dataframe)
    self._dataset.SetInputList(inputList)
    #self._datasetView.PrintInputList()

  # Get EEG signals from an exercice file
  def GetEEGExercice(self,participantNb,serieNb,exNb):
    participant = participantNb+1
    file = self._exercicesList.GetGD().GetUrlFiles()+'P'+str(participant)+'_s'+str(serieNb)+'ex'+str(exNb)+'.csv'
    print(file)
    try:
      test = pd.read_csv(file).fillna(0) #if we can't find the file on the drive (e.g., it has been erased)
    except NameError:
      print("File not Found on drive")
    except pio.EmptyDataError:
      print("Empty file on drive")
    return test
  
  # Make sure that each exercise lasts whole seconds (e.g., not half-seconds)
  @staticmethod
  def CutExercice(data):
    isNotReady = True
    while isNotReady:  
      if(data.shape[0] % 128 != 0):
        data = data[:-1]
      elif(data.shape[0] % 128 == 0):
        isNotReady = False
    return data

  # Divide each exercise into 14x128 eeg signals per second
  @staticmethod
  def DivideExercice(data):
    nb_secs = int(data.shape[0]/128)
    data = data.reshape(nb_secs, 128, 14)
    return data

  # Keep only X last seconds of the exercice
  @staticmethod
  def GetXsecs(data,size):
    if data.shape[0]>size:
      data = data[data.shape[0]-size:]
    #else:
      #data = np.array([])
      #length = size - data.shape[0]
      #data = np.pad(data, (0, length), mode='constant')
    print("shape ex: ",data.shape) 
    return data

  # Normalize exercice: Mean
  @staticmethod
  def MeanExercice(data):
    data = data.reshape(data.shape[1],data.shape[0])
    print(data.shape)
    data = np.mean(data,axis=1)
    data = data.reshape(1,data.shape[0])
    print(data)
    return data
  
  #Compute the bandpower for the entire exercice
  def ComputeBandpowerMean(data):
    data = data.reshape(data.shape[1],data.shape[0])
    bpArray = np.zeros((14, 5))
    for i in range(data.shape[0]):
      sf = data[i].shape[0]
      bpArray[i][0] = bandpower(data[i], sf, [1, 4], 'welch') #delta bandpower
      bpArray[i][1] = bandpower(data[i], sf, [4, 8], 'welch') #theta bandpower
      bpArray[i][2] = bandpower(data[i], sf, [8, 13], 'welch') #alpha bandpower
      bpArray[i][3] = bandpower(data[i], sf, [13, 30], 'welch') #beta bandpower
      bpArray[i][4] = bandpower(data[i], sf, [30, 43], 'welch') #gamma bandpower
    data = bpArray
    data = data.reshape(data.shape[1],data.shape[0])
    return data
  
  # Normalize exercice: Cut and divide each exercice in seconds
  @staticmethod
  def NormalizeExercice(dataframes,participantNb,serieNb,exNb,case):
    data_np = np.array(dataframes[participantNb][serieNb][exNb])
    data_np = WorkerController.CutExercice(data_np)
    if case == 0:
      data_np = WorkerController.DivideExercice(data_np)
      data_np = WorkerController.GetXsecs(data_np,180) 
    elif case == 1:
      data_np = ComputeBandpowerMean(data_np)
    else:
      data_np = WorkerController.MeanExercice(data_np)
    return data_np
  
  #Append each array (128x14) to the dataset
  @staticmethod
  def AppendDataset(data,dataset):
    for arr in data:
      dataset.append(arr)
    return dataset

 # Cut the EEG signals of the entire exercise into one-second signals and associate the level of confusion of the entire exercise to each second
  @staticmethod
  def GetExPerSecAndLabels(nbPart, nbSerie, nbEx, countExercices, dataframes, inputs, labels, arrEx, index, total_ex,case,type):
    countExercices = countExercices + 1
    
    # Cut the exercise data
    data_np = WorkerController.NormalizeExercice(dataframes,nbPart,nbSerie,nbEx,case=case)

    if data_np.size != 0:
      inputs = WorkerController.AppendDataset(data_np,inputs)
      # Get labels from reported self_confusion
      if type == 0: # Binary confusion
        end = 1 
      else: # 4 levels of confusion
        end = 2
      for t in range(data_np.shape[0]):
        label = arrEx[index][arrEx.shape[1]-end]
        labels.append(label)

    index = index+1
    print("We are at ",(countExercices/total_ex)*100, "%")
    return countExercices, inputs, labels, index
  

  #Use the input list of EEG signals and the labels extracted from the cognitive exercices results to build the dataset
  def BuildDataset(self):
    inputs = []
    labels = []
    index = 0
    
    dataframes = self._dataset.GetInputList()
    arrEx = self._exercicesList.GetArrayExercices()
    total_ex = self._exercicesList.GetTotalExercices()

    countExercices = 0
    for f in range(self._exercicesList.GetNbParticipants()):
      for i in range(self._exercicesList.GetNbSeries()):
        for j in range(self._exercicesList.GetNbExercices()):
          countExercices, inputs, labels, index = self.GetExPerSecAndLabels(f,i,j,countExercices, dataframes, inputs, labels, arrEx, index, total_ex,case=0,type=0)

    self._dataset.SetInputList(inputs)
    self._dataset.SetLabelList(labels)

##DatasetController Class (controller)
Now that our dataset has been successfully created by the worker, we can perform action on it

In [None]:
from collections import Counter
from sklearn.preprocessing import StandardScaler

class DatasetController:
  """Class that manages the dataset obtained by the WorkerController"""
  def __init__(self, Dataset):
    self._dataset = Dataset
    self._datasetView = DatasetView(self._dataset)

  def ListToNpArray(self):
    """Transform dataset lists into numpy arrays"""
    self._dataset.SetInputArray(np.array(self._dataset.GetInputList()))
    self._dataset.SetLabelArray(np.array(self._dataset.GetLabelList()))
    #self._datasetView.PrintInputArray()
    #self._datasetView.PrintLabelArray()

  def ComputeBandpower(self): 
    """From the raw EEG signals we compute the average bandpower"""
    sf = 128
    inputArr =  self._dataset.GetInputArray()
    inputArr = np.transpose(inputArr, (0, 2, 1))
    #print(inputArr.shape)
    bpArray = np.zeros((inputArr.shape[0], 14, 5))

    for i in range(inputArr.shape[0]):
      for j in range(inputArr.shape[1]):
        bpArray[i][j][0] = bandpower(inputArr[i][j], sf, [1, 4], 'welch') #delta bandpower
        bpArray[i][j][0] = bandpower(inputArr[i][j], sf, [4, 8], 'welch') #theta bandpower
        bpArray[i][j][1] = bandpower(inputArr[i][j], sf, [8, 13], 'welch') #alpha bandpower
        bpArray[i][j][2] = bandpower(inputArr[i][j], sf, [13, 30], 'welch') #beta bandpower
        bpArray[i][j][3] = bandpower(inputArr[i][j], sf, [30, 43], 'welch') #gamma bandpower
      print("We are at ",(i/inputArr.shape[0])*100, "%")
      self._dataset.SetBpArray(bpArray)

  # Perform normalized eeg signals array
  def PerformNormInput(self):
    arr = self._dataset.Get2DInputArray()
    arr = stats.zscore(arr)
    arr = np.float64(arr)
    arr[np.isnan(arr)] = 0
    self._dataset.SetNormInputArray(arr)
  
  # Perform normalized bp array
  def PerformNormBp(self):
    arr = self._dataset.Get2DBpArray()
    arr = stats.zscore(arr)
    arr = np.float64(arr)
    arr[np.isnan(arr)] = 0
    self._dataset.SetNormBpArray(arr)

  # Normalize dataset
  def NormDataset(self,case):
    if case == 0:
      self.PerformNormInput()
    else:
      self.PerformNormBp()

### Average bandpower

FEATURES EXTRACTION: from EEG signal to average bandpower: delta, alpha, beta, theta, gamma https://raphaelvallat.com/bandpower.html

In [None]:
!pip3 install mne
from scipy import stats
from scipy.signal import hilbert, welch
from scipy import fftpack
from scipy.integrate import simps
from mne.time_frequency import psd_array_multitaper

def bandpower(data, sf, band, method='welch', window_sec=None, relative=False):
    """Compute the average power of the signal x in a specific frequency band.

    Requires MNE-Python >= 0.14.

    Parameters
    ----------
    data : 1d-array
      Input signal in the time-domain.
    sf : float
      Sampling frequency of the data.
    band : list
      Lower and upper frequencies of the band of interest.
    method : string
      Periodogram method: 'welch' or 'multitaper'
    window_sec : float
      Length of each window in seconds. Useful only if method == 'welch'.
      If None, window_sec = (1 / min(band)) * 2.
    relative : boolean
      If True, return the relative power (= divided by the total power of the signal).
      If False (default), return the absolute power.

    Return
    ------
    bp : float
      Absolute or relative band power.
    """

    band = np.asarray(band)
    low, high = band

    # Compute the modified periodogram (Welch)
    if method == 'welch':
        if window_sec is not None:
            nperseg = window_sec * sf
        else:
            nperseg = (2 / low) * sf

        freqs, psd = welch(data, sf, nperseg=nperseg)

    elif method == 'multitaper':
        psd, freqs = psd_array_multitaper(data, sf, adaptive=True,
                                          normalization='full', verbose=0)

    # Frequency resolution
    freq_res = freqs[1] - freqs[0]

    # Find index of band in frequency vector
    idx_band = np.logical_and(freqs >= low, freqs <= high)

    # Integral approximation of the spectrum using parabola (Simpson's rule)
    bp = simps(psd[idx_band], dx=freq_res)

    if relative:
        bp /= simps(psd, dx=freq_res)
    return bp



In [None]:
dataset = Dataset()
#eL = ExercicesList()
#eL.GetListExercices()

In [None]:
wc = WorkerController(Dataset=dataset)
wc.BuildInputList()
wc.BuildDataset()

In [None]:
dc = DatasetController(Dataset=dataset)
dc.ListToNpArray()

dw = DatasetView(dataset)
dw.PrintInputArray()

In [None]:
dataset.GetLabelArray().shape
dataset.GetInputArray().shape
dv = DatasetView(dataset)

In [None]:
#dv.PlotVolt(30)

In [None]:
#dc.ComputeBandpower()
#dataset.SaveBpArray()
dataset.LoadBpArray()

In [None]:
dc.NormDataset(case=1)

In [None]:
#dv.Pca2D(dataset.GetNormBpArray(),dataset.GetLabelList())
#dv.Pca3D(dataset.GetNormBpArray(),dataset.GetLabelList())

# MACHINE LEARNING MODELS

## MLModel Class (model)

In [None]:
class MLModel:
  """Class containing the common parameters of all Machine learning models"""
  def __init__(self):
    self._model = None #will contain the trained model
    self._modelAccScore = [] #subset accuracy result of the model on the test set (filled by the controller)
    self._modelTrainAccScore = [] #subset accuracy result of the model on the training (filled by the controller)
    self._modelPreds = [] #will contain the predictions made by the model
    self._confMatrices = []
  
  #Get the current model
  def GetModel(self):
    return self._model

  #Set the current trained model
  def SetModel(self, model):
    self._model = model

  #Get the current model accuracy
  def GetModelAcc(self):
    return self._modelAccScore

  #Set the accuracy of the model
  def SetModelAcc(self,acc):
    self._modelAccScore = acc

  #Get the current model accuracy
  def GetModelTrainAcc(self):
    return self._modelTrainAccScore

  #Set the accuracy of the model
  def SetModelTrainAcc(self,acc):
    self._modelTrainAccScore = acc
  
  #Get the predictions made by the model
  def GetModelPreds(self):
    return self._modelPreds
 
  #Set the list of predictions made by the model
  def SetModelPreds(self,preds):
    self._modelPreds = preds

  #Set the list of confusions matrices given after the training
  def SetConfMatrices(self,confMatrices):
    self._confMatrices = confMatrices
  
  #Get the list of confusions matrices given after the training
  def GetConfMatrices(self):
    return self._confMatrices

##ViewMLModel Class (view)

In [None]:
!pip3 install pyriemann
from pyriemann.utils.viz import plot_confusion_matrix
from sklearn.metrics import classification_report
from sklearn.model_selection import learning_curve
from sklearn.metrics import confusion_matrix
import seaborn as sns

class ViewMLModel:
  """Class containing the methods for viewing Machine learning results"""
  def __init__(self, Dataset, MLModel):
    self._dataset = Dataset
    self._mLModel = MLModel

  #Get accuracy values for different values of k in KNN
  def kRangeAcc(self,k_range):
    # plot to see clearly
    plt.plot(k_range, self._mLModel.GetModelAcc())
    plt.xlabel('Value of K for KNN')
    plt.ylabel('Accuracy')
    plt.show()

  #Show classification report
  def ClassificationReport(self):
    Y = self._dataset.GetSplittedDataset()[3]
    print(self._mLModel.GetModelAcc())
    print(classification_report(Y,self._mLModel.GetModelPreds()))
  
  #Classification report matrix for lstm model
  def ClassificationReportLSTM(self):
    print(self._mLModel.GetModelAcc())
    Y = self._dataset.GetSplittedDataset()[3]
    Y = np_utils.to_categorical(Y,4)
    y_pred = self._mLModel.GetModelPreds()
    y_pred = np.argmax(y_pred, axis=1)
    Y = np.argmax(Y, axis=1)
    print(classification_report(Y,y_pred))
  
  #General confusion matrix that will be used for KNN and SVM
  def ConfMat(self):
    preds = self._mLModel.GetModelPreds()
    model = self._mLModel.GetModel()

    Y = self._dataset.GetSplittedDataset()[3]
    # make predictions on the testing data
    print("[INFO] predicting confusion level ...")
    acc         = np.mean(preds == Y)
    print("Classification accuracy: %f " % (acc))

    # plot the confusion matrices for both classifiers
    names        = ["Conf0", "Conf1", "Conf2", "Conf3"]
    #names        = possible_labels
    plt.figure(0)
    plot_confusion_matrix( Y, preds, names, title = str(type(model))+'\nAccuracy:{0:.3f}'.format(acc))
    plt.show()
  
  #Confusion matrix that will be used for LSTM
  def ConfMatLSTM(self):
    preds = self._mLModel.GetModelPreds()
    Y = self._dataset.GetSplittedDataset()[3]
    Y = np_utils.to_categorical(Y,4)
    model = self._mLModel.GetModel()

    # make predictions on the testing data
    print("[INFO] predicting confusion level ...")
    preds_       = preds.argmax(axis = -1)
    acc         = np.mean(preds_ == Y.argmax(axis=-1))
    print("Classification accuracy: %f " % (acc))

    # plot the confusion matrices for both classifiers
    #names        = ["Conf0", "Conf1", "Conf2", "Conf3"]
    names        = ["Conf0", "Conf1"]
    #names        = possible_labels
    plt.figure(0)
    plot_confusion_matrix(Y.argmax(axis = -1), preds_, names, title = str(type(model))+'\nAccuracy:{0:.3f}'.format(acc))
    plt.show()
  
  #Confusion matrix obtained after CV
  def ConfMatCV(self):
    acc = self._mLModel.GetModelAcc()
    acc = np.array(acc)
    std = np.std(acc)
    acc = np.mean(acc)
    confMatrices =  self._mLModel.GetConfMatrices()
    # Transform to df for easier plotting
    cm_df = pd.DataFrame(confMatrices,
                        index = ['Conf0','Conf1'], 
                        columns = ['Conf0','Conf1'])
                        #index = ['Conf0','Conf1','Conf2','Conf3'], 
                        #columns = ['Conf0','Conf1','Conf2','Conf3'])

    plt.figure(figsize=(5.5,4))
    sns.heatmap(cm_df, annot=True, cmap='Blues')
    plt.title(str(type(self._mLModel.GetModel()))+'\nAccuracy:{0:.3f}'.format(acc) +'\nStd:{0:.3f}'.format(std))
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.show()
  
  #Plot the validation curve for hyperparameters
  def ValidationCurve(self,params,paramName):
    plt.style.use('ggplot')
    scores = self._mLModel.GetModelAcc()
    scores = np.array(scores)
    scores_train = self._mLModel.GetModelTrainAcc()
    scores_train = np.array(scores_train)

    fig, ax = plt.subplots(figsize=(8, 4))
    ax.plot(params, np.mean(scores_train,axis=1), alpha=0.5, color='blue', label='train')
    ax.plot(params, np.mean(scores,axis=1), alpha=0.5, color='red', label='cv')
    ax.fill_between(params, np.mean(scores,axis=1) - np.std(scores,axis=1), 
                np.mean(scores,axis=1) + np.std(scores,axis=1), color='#888888', alpha=0.4)
    ax.fill_between(params, np.mean(scores,axis=1) - 2*np.std(scores,axis=1), 
                np.mean(scores,axis=1) + 2*np.std(scores,axis=1), color='#888888', alpha=0.2)
    ax.legend(loc='best')
    ax.set_ylabel("Accuracy")
    ax.set_xlabel(paramName);
  
  #Plot the validation curve of the LSTM
  @staticmethod
  def PlotLearningCurveLSTM(histo):
    #print(histo.history.keys())
    # summarize history for accuracy
    plt.plot(histo.history['accuracy'])
    plt.plot(histo.history['val_accuracy'])
    plt.plot(histo.history['loss'])
    plt.plot(histo.history['val_loss'])
    plt.title('model accuracy')
    #plt.ylabel('accuracy')
    plt.xlabel('epoch')
    plt.legend(['train accuracy', 'validation accuracy','train loss','validation loss'], loc='upper left')
    plt.show()

  #CV learning curve from https://scikit-learn.org/stable/auto_examples/model_selection/plot_learning_curve.html
  def plot_learning_curve(self, axes=None, ylim=None, cv=None, n_jobs=None, train_sizes=np.linspace(.1, 1.0, 5)):
    estimator = self._mLModel.GetModel()
    title = str(type(estimator))+ 'learning curve'
    X = self._dataset.GetSplittedDataset()[0]
    y = self._dataset.GetSplittedDataset()[3]

    if axes is None:
        _, axes = plt.subplots(1, 3, figsize=(20, 5))

    axes[0].set_title(title)
    if ylim is not None:
        axes[0].set_ylim(*ylim)
    axes[0].set_xlabel("Training examples")
    axes[0].set_ylabel("Score")

    train_sizes, train_scores, test_scores, fit_times, _ = \
        learning_curve(estimator, X, y, cv=cv, n_jobs=n_jobs,
                       train_sizes=train_sizes,
                       return_times=True)
    train_scores_mean = np.mean(train_scores, axis=1)
    train_scores_std = np.std(train_scores, axis=1)
    test_scores_mean = np.mean(test_scores, axis=1)
    test_scores_std = np.std(test_scores, axis=1)
    fit_times_mean = np.mean(fit_times, axis=1)
    fit_times_std = np.std(fit_times, axis=1)

    # Plot learning curve
    axes[0].grid()
    axes[0].fill_between(train_sizes, train_scores_mean - train_scores_std,
                         train_scores_mean + train_scores_std, alpha=0.1,
                         color="r")
    axes[0].fill_between(train_sizes, test_scores_mean - test_scores_std,
                         test_scores_mean + test_scores_std, alpha=0.1,
                         color="g")
    axes[0].plot(train_sizes, train_scores_mean, 'o-', color="r",
                 label="Training score")
    axes[0].plot(train_sizes, test_scores_mean, 'o-', color="g",
                 label="Cross-validation score")
    axes[0].legend(loc="best")

    return plt



##MLModelController Class (Controller)

In [None]:
from sklearn import metrics
from sklearn.metrics import precision_recall_fscore_support as score
from sklearn.neighbors import KNeighborsClassifier
from keras.optimizers import Adam
from keras.models import Sequential
from keras.layers import Dense
from keras.layers import LSTM, Dropout, GaussianNoise
from keras.layers import Activation
from keras.utils import np_utils
from keras.callbacks import EarlyStopping
from sklearn.svm import SVC
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import StratifiedKFold, TimeSeriesSplit
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler
from keras import callbacks 

class MLModelController:
  """Class that will perform actions on model """
  def __init__(self,Dataset,MLModel=MLModel()):
    self._dataset = Dataset
    self._mLModel = MLModel
    self._viewMlModel = ViewMLModel(Dataset,MLModel)

  @staticmethod
  def ComputeConfMatrices(confMatrices):
    """Method that will perform the mean of all saved matrices of confusion """
    confMatrices = np.array(confMatrices)
    confMatrices = np.mean(confMatrices,axis=0)
    confMatrices = confMatrices.astype('float') / confMatrices.sum(axis=1)[:, np.newaxis]
    return confMatrices
  
  def SaveInterModelResults(self,knn,y_pred,score,X_train, X_valid, y_train, y_valid):
    """Method that save the temporary results of the models """
    self._mLModel.SetModel(knn)
    self._mLModel.SetModelPreds(y_pred)
    self._mLModel.SetModelAcc(score)
    self._dataset.SetSplittedDataset(X_train, X_valid, y_train, y_valid)
  
  def SaveFinalModelResults(self,y_preds,scores,accScores,confMatrices):
    """Method that save the results of the models """
    self._mLModel.SetModelPreds(y_preds)
    self._mLModel.SetModelAcc(scores)
    self._mLModel.SetModelTrainAcc(accScores)
    confMatrices = MLModelController.ComputeConfMatrices(confMatrices)
    self._mLModel.SetConfMatrices(confMatrices)
    
  def KnnModel(self, case=1):
    """Perform KNN algorithm and evaluate the model on validation or testing set according to case value"""
    if case == 0:
      X = self._dataset.Get2DNormInputArray()
    else:
      X = self._dataset.Get2DNormBpArray()
    
    y = self._dataset.GetLabelArray()

    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state = 42)

    scores = []
    scores_train = []
    confMatrices = []
    y_preds = []

    final_scores_train = []
    final_scores = []

    ks = range(1,20)

    for k in ks:
      for train_idx, valid_idx, in cv.split(X, y):
        X_train, y_train = X[train_idx], y[train_idx]
        X_valid, y_valid = X[valid_idx], y[valid_idx]
        self.PerformKNN1CV(X,y,X_train,y_train,X_valid,y_valid,k,scores_train,scores,y_preds,confMatrices)

    
      self.SaveFinalModelResults(y_preds,scores,scores_train,confMatrices)
      self._viewMlModel.ConfMatCV()
      final_scores_train.append(scores_train)
      final_scores.append(scores)
      scores = []
      scores_train = []
    
    self.SaveFinalModelResults(y_preds,final_scores,final_scores_train,confMatrices)
    self._viewMlModel.ValidationCurve(ks,"k")
  
  def PerformKNN1CV(self,X,y,X_train,y_train,X_valid,y_valid,k,scores_train,scores,y_preds,confMatrices):
    """Perform KNN algorithm for 1 CV"""

    #Oversample data
    smt = SMOTE(random_state=0)
    X_train, y_train = smt.fit_resample(X_train, y_train)

    print(X_train.shape)
    print(sorted(Counter(y_train).items()))

    knn = KNeighborsClassifier(n_neighbors=k)
    knn.fit(X_train, y_train)

    score2 = knn.score(X_train, y_train)
    print("Training score: ",score2)
    scores_train.append(score2)

    y_pred=knn.predict(X_valid)
    score = metrics.accuracy_score(y_valid,y_pred)
    scores.append(score)

    print("done for k=",k)

    self.SaveInterModelResults(knn,y_pred,score,X_train,X_valid,y_train,y_valid)
    self._viewMlModel.ClassificationReport()
    #self._viewMlModel.ConfMat()

    y_preds.append(y_pred)

    conf_matrix = confusion_matrix(y_valid, knn.predict(X_valid))
    confMatrices.append(conf_matrix)
  
  def LstmModel(self):
    """Perform LSTM algorithm and evaluate the model on validation or testing set according to case value"""
    X_train = self._dataset.GetSplittedDataset()[0]
   
    y_train = self._dataset.GetSplittedDataset()[2]
    X_valid = self._dataset.GetSplittedDataset()[1]
    y_valid = self._dataset.GetSplittedDataset()[3]

    dv.Pca3D(X_train,y_train)

    X_train = X_train.reshape(X_train.shape[0],1,X_train.shape[1])

    print(X_train.shape)
    print(sorted(Counter(y_train).items()))
    X_valid = X_valid.reshape(X_valid.shape[0],1,X_valid.shape[1])

    batch_size= 128
    hidden_size= 47
    num_epochs = 1000
    num_classes = 5

    y_train = np_utils.to_categorical(y_train,num_classes)
    y_valid = np_utils.to_categorical(y_valid,num_classes)

    print(X_train.shape)
    model = Sequential()
    model.add(LSTM(units=hidden_size, input_shape=(X_train.shape[1], X_train.shape[2])))
    model.add(Dropout(0.2))
    model.add(Dense(units = num_classes))
    model.add(Activation('softmax'))

    opt = Adam(lr=0.001)

    model.compile(loss='categorical_crossentropy', optimizer=opt, metrics=['accuracy'])
    print(model.summary())

    #earlyStop=EarlyStopping(monitor="val_accuracy",verbose=1,mode='max',patience=10)
    earlystopping = callbacks.EarlyStopping(monitor ="val_loss",  
                                        mode ="min", patience = 5,  
                                        restore_best_weights = True) 
    history = model.fit(X_train, y_train, epochs=num_epochs, batch_size=batch_size, validation_data=(X_valid, y_valid), verbose=1)#, callbacks =[earlystopping])
    model2 = model
    y_pred = model2.predict(X_valid, batch_size=batch_size, verbose=1)

    score = model.evaluate(X_valid,y_valid, verbose=2,batch_size=batch_size)

    acc = np.mean(y_pred.argmax(axis = -1) == y_valid.argmax(axis=-1))

    self._mLModel.SetModel(model)
    self._mLModel.SetModelPreds(y_pred)
    self._mLModel.SetModelAcc(acc)
    self._viewMlModel.ClassificationReportLSTM()
    self._viewMlModel.ConfMatLSTM()

    self._viewMlModel.PlotLearningCurveLSTM(history)

  def SVMModel(self,case=1):
    """Perform SVM algorithm and evaluate the model on validation or testing set according to case value"""
    if case == 0:
      X = self._dataset.Get2DNormInputArray()
    else:
      X = self._dataset.Get2DNormBpArray()
    
    y = self._dataset.GetLabelArray()
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state = 42)

    scores = []
    scores_train = []
    confMatrices = []
    y_preds = []
    
    Cs = [0.01,0.1,1,10,100]
    final_scores_train = []
    final_scores = []
    gammas = 100

    for C in Cs:
      for train_idx, valid_idx, in cv.split(X, y):
        X_train, y_train = X[train_idx], y[train_idx]
        X_valid, y_valid = X[valid_idx], y[valid_idx]
        self.PerformSVM1CV(X,y,X_train,y_train,X_valid,y_valid,gammas,C,scores_train,scores,y_preds,confMatrices)
      
      self.SaveFinalModelResults(y_preds,scores,scores_train,confMatrices)
      self._viewMlModel.ConfMatCV()
      final_scores_train.append(scores_train)
      final_scores.append(scores)
      scores = []
      scores_train = []
    
    self.SaveFinalModelResults(y_preds,final_scores,final_scores_train,confMatrices)
    self._viewMlModel.ValidationCurve(Cs,"C")

  def PerformSVM1CV(self,X,y,X_train,y_train,X_valid,y_valid,gamma,C,scores_train,scores,y_preds,confMatrices):
    """Perform SVM algorithm for 1 CV"""

    #Oversample data
    smt = SMOTE(random_state=0)
    X_train, y_train = smt.fit_resample(X_train, y_train)

    print(X_train.shape)
    print(sorted(Counter(y_train).items()))

    svm = make_pipeline(SVC(gamma=gamma, C = C))
    svm.fit(X_train, y_train)

    score2 = svm.score(X_train, y_train)
    print("Training score: ",score2)
    scores_train.append(score2)

    y_pred=svm.predict(X_valid)
    score = metrics.accuracy_score(y_valid,y_pred)
    scores.append(score)

    self.SaveInterModelResults(svm,y_pred,score,X_train,X_valid,y_train,y_valid)
    self._viewMlModel.ClassificationReport()

    #self._viewMlModel.ConfMat()
    y_preds.append(y_pred)
    conf_matrix = confusion_matrix(y_valid, svm.predict(X_valid))
    confMatrices.append(conf_matrix)
    #self._viewMlModel.plot_learning_curve()

In [None]:
mld = MLModelController(Dataset=dataset)

In [None]:
mld.KnnModel(case=1)

In [None]:
mld.LstmModel()

In [None]:
mld.SVMModel(case=1)