# DREAMER

In [None]:
import os
import pickle as pkl

import numpy as np
import pandas as pd
import tensorflow as tf
from keras.models import Model
from keras.layers import Activation, AvgPool1D, Dense, Conv1D, Flatten, Dropout, Input, BatchNormalization, GlobalMaxPool1D, MaxPool1D, SpatialDropout1D, GlobalAvgPool1D
from keras.optimizers import Adam
from keras.utils import np_utils
from scipy import signal
from sklearn.metrics import confusion_matrix
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import StandardScaler

In [None]:
# Recording / windowing configuration
sampling_rate = 128 # Samples per second; also used to normalise the band edges below
window_size = 1280 # Timesteps per window fed to the network
overlap = 256 # Timesteps the window start advances between consecutive windows
channel_len = 14 # Second dimension of the network input (EEG channels)
classes = 2 # Units of the softmax output layer

# EEG bands as [low, high] edges divided by the Nyquist frequency (sampling_rate/2),
# which is the normalised form passed to signal.butter in load_data.
bands = {
    band: [low_hz/(sampling_rate/2), high_hz/(sampling_rate/2)]
    for band, low_hz, high_hz in (
        ('delta', 0.5, 4),
        ('theta', 4, 8),
        ('alpha', 8, 14),
        ('beta', 14, 30),
        ('gamma', 30, 50),
    )
}

## Preprocessing

In [None]:
def _load_binary_labels(path):
  # Read one headerless label CSV as a flat array and binarise it:
  # ratings <= 3 become class 0, ratings > 3 become class 1.
  # np.where builds a new array instead of writing into the array pandas returned.
  ratings = pd.read_csv(path, header = None).values.ravel()
  return np.where(ratings > 3, 1, 0)


def _segment(recording, size, step):
  # Cut a (timesteps, channels) array into every full window of `size` rows, moving
  # `step` rows forward each time. The window that ends exactly on the last row is kept.
  return [recording[start:start+size, :] for start in range(0, recording.shape[0]-size+1, step)]


def load_data(eeg_band = None):
  """Load, band-filter, standardise and window the EEG recordings.

  For persons 1..23 the valence, arousal and dominance label files are read from
  ``./Dreamer/person<N>/eeg_labels/`` and binarised (rating <= 3 -> 0, > 3 -> 1).
  For recordings 1..18 of each person, ``./Dreamer/person<N>/eeg_samples/eeg<i>.csv``
  is read, optionally band-pass filtered, standardised column-wise with a
  ``StandardScaler`` fitted on that recording alone, and cut into windows of
  ``window_size`` timesteps. Every window inherits the labels of its recording.

  Note that the module-level ``overlap`` is used as the hop between window starts,
  so consecutive windows share ``window_size - overlap`` timesteps.

  Parameters
  ----------
  eeg_band : str or None
      Key of the module-level ``bands`` dict selecting the pass band of a 4th-order
      Butterworth filter applied forwards and backwards (``filtfilt``).
      ``None`` (the default) skips filtering and uses the recordings as stored.

  Returns
  -------
  tuple
      ``(data, val_label, aro_label, dom_label)``: ``data`` is a float32 array of
      stacked windows; the three label arrays are the one-hot encodings produced by
      ``np_utils.to_categorical``, one row per window.

  Raises
  ------
  KeyError
      If ``eeg_band`` is not ``None`` and not a key of ``bands``.
  """
  # Resolve the band and design the filter once, before any file is read: the
  # coefficients do not depend on the recording, and an unknown band fails immediately.
  if eeg_band is None:
    band_filter = None
  else:
    band_filter = signal.butter(4, bands[eeg_band], 'band') # Butterworth filter of order N = 4

  data = []
  val_label = []
  aro_label = []
  dom_label = []

  for person in range(1,24):

    person_dir = './Dreamer/person' + str(person)

    print('Person No. ' + str(person))

    # One binary label per recording for each affective dimension
    valence = _load_binary_labels(person_dir + '/eeg_labels/valence.csv')
    arousal = _load_binary_labels(person_dir + '/eeg_labels/arousal.csv')
    dominance = _load_binary_labels(person_dir + '/eeg_labels/dominance.csv')

    # Preprocessing
    for i in range(1,19):
      eeg = pd.read_csv(person_dir + '/eeg_samples/eeg' + str(i) + '.csv', header=None).values

      if band_filter is not None:
        num, den = band_filter
        eeg = signal.filtfilt(num, den, eeg, axis=0) # Filter along time (rows)

      # Standardise each column using statistics of this recording only
      scaled_eeg = StandardScaler().fit_transform(eeg)

      # Segmenting into windows of `window_size` timesteps whose starts are `overlap` timesteps apart
      windows = _segment(scaled_eeg, window_size, overlap)
      data.extend(windows)
      val_label.extend([valence[i-1]] * len(windows)) # Labels are indexed from 0, recordings from 1
      aro_label.extend([arousal[i-1]] * len(windows))
      dom_label.extend([dominance[i-1]] * len(windows))

  data = np.array(data, dtype = np.float32) # Using 32 bit floating point value to save memory
  val_label = np.array(val_label, dtype = np.int8)
  aro_label = np.array(aro_label, dtype = np.int8)
  dom_label = np.array(dom_label, dtype = np.int8)

  # Class balance: total number of windows, then the counts of class 0 and class 1
  print(val_label.shape, val_label[val_label == 0].shape, val_label[val_label == 1].shape)
  print(aro_label.shape, aro_label[aro_label == 0].shape, aro_label[aro_label == 1].shape)
  print(dom_label.shape, dom_label[dom_label == 0].shape, dom_label[dom_label == 1].shape)

  # One-hot encode for the softmax / categorical cross-entropy model
  val_label = np_utils.to_categorical(val_label)
  aro_label = np_utils.to_categorical(aro_label)
  dom_label = np_utils.to_categorical(dom_label)

  return (data, val_label, aro_label, dom_label)

In [None]:
FOLD = 10 # Number of folds
eeg_band = 'theta' # EEG band name
SEED = 42 # Fixed seed so the shuffle, and therefore the fold assignment, is identical on every run

data, valence, arousal, dominance = load_data(eeg_band) # Loading processed data

nb_samples = data.shape[0] # Number of samples
# Size of each test fold. The fold loops below test on [i*factor, (i+1)*factor), so the
# trailing nb_samples % FOLD samples are only ever used for training.
factor = nb_samples//FOLD

# A single permutation is applied to the data and all three label arrays so they stay aligned.
# A dedicated RandomState keeps the shuffle reproducible without touching NumPy's global RNG.
# NOTE(review): the shuffle is at window level and windows cut from the same recording share
# samples, so near-duplicate windows can land in both train and test folds — confirm this is intended.
shuffler = np.random.RandomState(SEED).permutation(nb_samples) # Shuffling data

data = data[shuffler]
valence = valence[shuffler]
arousal = arousal[shuffler]
dominance = dominance[shuffler]

## CNN Model

In [None]:
# Input tensor referenced by get_CNN and get_model: one window of `window_size` timesteps by `channel_len` channels
eeg_input = Input(name='eeg_input', shape=(window_size, channel_len))

# CNN model
def get_CNN():
  """Build the convolutional feature extractor on top of the module-level ``eeg_input``.

  Three stages of (two Conv1D -> average pooling -> batch norm -> spatial dropout)
  are followed by two more Conv1D layers, global average pooling, batch norm,
  dropout and two tanh-activated Dense layers (64 and 8 units) with a final dropout.

  Returns
  -------
  The output tensor of the last dropout layer (8 features per sample).
  """

  def conv_pair(tensor, first_index, filters, kernel_size, strides):
    # Two identical ReLU convolutions named conv<first_index> and conv<first_index + 1>
    for index in (first_index, first_index + 1):
      tensor = Conv1D(filters = filters, kernel_size = kernel_size, strides = strides,
                      padding = 'valid', activation='relu', name='conv' + str(index))(tensor)
    return tensor

  # (filters, kernel_size, strides, spatial dropout rate) for each pooled stage
  stages = [
      (32, 5, 2, 0.125),
      (64, 5, 2, 0.25),
      (128, 3, 1, 0.5),
  ]

  features = eeg_input
  for stage_no, (filters, kernel_size, strides, drop_rate) in enumerate(stages, start=1):
    features = conv_pair(features, 2*stage_no - 1, filters, kernel_size, strides)
    features = AvgPool1D(pool_size=2, name='avg_pool' + str(stage_no))(features)
    features = BatchNormalization(name='batch_norm' + str(stage_no))(features)
    features = SpatialDropout1D(rate=drop_rate, name = 'spatial_dropout' + str(stage_no))(features)

  # Final convolutions are collapsed over time instead of being pooled by a factor of two
  features = conv_pair(features, 7, 512, 3, 1)
  features = GlobalAvgPool1D(name='global_pool1')(features)
  features = BatchNormalization(name='batch_norm4')(features)
  features = Dropout(0.5)(features)

  # Fully connected head
  for units in (64, 8):
    features = Dense(units)(features)
    features = Activation('tanh')(features)
  features = Dropout(0.25)(features)
  return features

def get_model():
  """Assemble, compile and summarise a fresh classifier.

  The features from ``get_CNN()`` feed a softmax layer with ``classes`` units.
  The model is compiled with Adam (lr 1e-3, decay 1e-5), categorical
  cross-entropy loss and the categorical accuracy metric, and its summary is
  printed before it is returned.

  Returns
  -------
  The compiled, untrained Keras ``Model``.
  """
  cnn_features = get_CNN()

  # Output layer
  predictions = Dense(classes, activation='softmax', name = 'output')(cnn_features)

  classifier = Model(inputs=eeg_input, outputs=predictions)
  classifier.compile(
      optimizer=Adam(lr=1e-3,decay=1e-5),
      loss='categorical_crossentropy',
      metrics=['categorical_accuracy'],
  )
  classifier.summary()

  return classifier


## Training

### Valence 

In [None]:
val_res = {'accuracy': [], 'confusion_matrix': []}

# K-fold cross-validation: fold i is held out for testing, every other sample is used for training.
for i in range(FOLD):
  test_start, test_stop = i*factor, (i+1)*factor # Bounds of the held-out fold

  X_train = np.concatenate((data[:test_start], data[test_stop:nb_samples])) # Training data
  X_test = data[test_start:test_stop] # Testing data
  val_train = np.concatenate((valence[:test_start], valence[test_stop:nb_samples])) # Valence training labels
  val_test = valence[test_start:test_stop] # Valence testing labels

  model = get_model() # New, untrained model for every fold
  model.fit(X_train, val_train, epochs = 100, batch_size = 1024, shuffle = True)

  acc = model.evaluate(X_test, val_test) # Loss followed by the compiled metric
  print(acc)

  val_res['accuracy'].append(acc)

  # Labels and predictions are one-hot / probabilities; argmax(1) recovers the class index
  pred = model.predict(X_test)
  val_res['confusion_matrix'].append(confusion_matrix(val_test.argmax(1), pred.argmax(1)))


# Dumping valence results
file = './eeg_data/Dreamer_valence_' + eeg_band + '.pkl'
# Create the output directory if needed so a missing folder cannot discard the trained results
os.makedirs(os.path.dirname(file), exist_ok=True)

with open(file, 'wb') as f:
  pkl.dump(val_res, f)


In [None]:
# Per-fold valence accuracy in percent; each stored entry is a model.evaluate() result and index 1 is the metric
for fold_metrics in val_res['accuracy']:
  accuracy_pct = fold_metrics[1]*100
  print(round(accuracy_pct, 2)) # Rounding off to two decimal places

In [None]:
# One valence confusion matrix per fold (rows: true class, columns: predicted class)
for fold_matrix in val_res['confusion_matrix']:
  print(fold_matrix)

### Arousal

In [None]:
aro_res = {'accuracy': [], 'confusion_matrix': []}

# K-fold cross-validation: fold i is held out for testing, every other sample is used for training.
for i in range(FOLD):
  test_start, test_stop = i*factor, (i+1)*factor # Bounds of the held-out fold

  X_train = np.concatenate((data[:test_start], data[test_stop:nb_samples])) # Training data
  X_test = data[test_start:test_stop] # Testing data
  aro_train = np.concatenate((arousal[:test_start], arousal[test_stop:nb_samples])) # Arousal training labels
  aro_test = arousal[test_start:test_stop] # Arousal testing labels

  model = get_model() # New, untrained model for every fold
  model.fit(X_train, aro_train, epochs = 100, batch_size = 1024, shuffle = True)

  acc = model.evaluate(X_test, aro_test) # Loss followed by the compiled metric
  print(acc)

  aro_res['accuracy'].append(acc)

  # Labels and predictions are one-hot / probabilities; argmax(1) recovers the class index
  pred = model.predict(X_test)
  aro_res['confusion_matrix'].append(confusion_matrix(aro_test.argmax(1), pred.argmax(1)))


# Dumping arousal results
file = './eeg_data/Dreamer_arousal_' + eeg_band + '.pkl'
# Create the output directory if needed so a missing folder cannot discard the trained results
os.makedirs(os.path.dirname(file), exist_ok=True)

with open(file, 'wb') as f:
  pkl.dump(aro_res, f)


In [None]:
# Per-fold arousal accuracy in percent; each stored entry is a model.evaluate() result and index 1 is the metric
for fold_metrics in aro_res['accuracy']:
  accuracy_pct = fold_metrics[1]*100
  print(round(accuracy_pct, 2)) # Rounding off to two decimal places

In [None]:
# One arousal confusion matrix per fold (rows: true class, columns: predicted class)
for fold_matrix in aro_res['confusion_matrix']:
  print(fold_matrix)

### Dominance

In [None]:
dom_res = {'accuracy': [], 'confusion_matrix': []}

# K-fold cross-validation: fold i is held out for testing, every other sample is used for training.
for i in range(FOLD):
  test_start, test_stop = i*factor, (i+1)*factor # Bounds of the held-out fold

  X_train = np.concatenate((data[:test_start], data[test_stop:nb_samples])) # Training data
  X_test = data[test_start:test_stop] # Testing data
  dom_train = np.concatenate((dominance[:test_start], dominance[test_stop:nb_samples])) # Dominance training labels
  dom_test = dominance[test_start:test_stop] # Dominance testing labels

  model = get_model() # New, untrained model for every fold
  model.fit(X_train, dom_train, epochs = 100, batch_size = 1024, shuffle = True)

  acc = model.evaluate(X_test, dom_test) # Loss followed by the compiled metric
  print(acc)

  dom_res['accuracy'].append(acc)

  # Labels and predictions are one-hot / probabilities; argmax(1) recovers the class index
  pred = model.predict(X_test)
  dom_res['confusion_matrix'].append(confusion_matrix(dom_test.argmax(1), pred.argmax(1)))


# Dumping dominance results
file = './eeg_data/Dreamer_dominance_' + eeg_band + '.pkl'
# Create the output directory if needed so a missing folder cannot discard the trained results
os.makedirs(os.path.dirname(file), exist_ok=True)

with open(file, 'wb') as f:
  pkl.dump(dom_res, f)


In [None]:
# Per-fold dominance accuracy in percent; each stored entry is a model.evaluate() result and index 1 is the metric
for fold_metrics in dom_res['accuracy']:
  accuracy_pct = fold_metrics[1]*100
  print(round(accuracy_pct, 2)) # Rounding off to two decimal places

In [None]:
# One dominance confusion matrix per fold (rows: true class, columns: predicted class)
for fold_matrix in dom_res['confusion_matrix']:
  print(fold_matrix)