In [None]:
from common import Trial
from typing import List
from tqdm import tqdm
import re
import traceback
import numpy as np
import os
import pandas as pd
import sys
from sklearn.model_selection import train_test_split
import keras
from keras.layers import Dense, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras.models import Sequential
import matplotlib.pylab as plt



In [None]:
# Electrode label -> channel index. The labels are listed in index order, so a
# label's position in the tuple is the index it maps to (0..63).
channel_map = {
    label: position
    for position, label in enumerate((
        'FP1', 'FP2', 'F7', 'F8', 'AF1', 'AF2', 'FZ', 'F4',      # 0-7
        'F3', 'FC6', 'FC5', 'FC2', 'FC1', 'T8', 'T7', 'CZ',      # 8-15
        'C3', 'C4', 'CP5', 'CP6', 'CP1', 'CP2', 'P3', 'P4',      # 16-23
        'PZ', 'P8', 'P7', 'PO2', 'PO1', 'O2', 'O1', 'X',         # 24-31
        'AF7', 'AF8', 'F5', 'F6', 'FT7', 'FT8', 'FPZ', 'FC4',    # 32-39
        'FC3', 'C6', 'C5', 'F2', 'F1', 'TP8', 'TP7', 'AFZ',      # 40-47
        'CP3', 'CP4', 'P5', 'P6', 'C1', 'C2', 'PO7', 'PO8',      # 48-55
        'FCZ', 'POZ', 'OZ', 'P2', 'P1', 'CPZ', 'nd', 'Y',        # 56-63
    ))
}

In [None]:
# Read one recording file in full, then preview its first 200 characters.
with open('eeg_full/co2a0000364.txt', 'r') as sample_handle:
    file_content = sample_handle.read()

print(file_content[:200])


In [None]:
# Split the file on its '# co<9 chars>.rd' marker lines and drop empty pieces;
# every remaining chunk is the text of one trial.
trial_chunks = re.split(r'^# co\w{9}.rd', file_content, flags=re.MULTILINE)
trials = [chunk for chunk in trial_chunks if len(chunk) > 0]

# Work on the first trial only in this exploratory cell.
trial = trials[0]

# Characters 2..12 of the file form the subject id; its 4th character selects
# the class label ('a' -> 1, 'c' -> 0).
subject_id = file_content[2:13]
class_marker = subject_id[3]
if class_marker not in ('a', 'c'):
    raise ValueError('Invalid subject class: ' + subject_id)
subject_class = 1 if class_marker == 'a' else 0

# The 4th line of the chunk reads '<trial type> , trial <number>'.
lines = trial.split('\n')
trial_type_str, trial_number = lines[3].split(' , trial')
trial_number = int(trial_number)

# Numeric code for each recognised trial type.
trial_type_codes = {
    '# S1 obj': 0,
    '# S2 match': 1,
    '# S2 nomatch': 2,
}
if trial_type_str not in trial_type_codes:
    raise ValueError('Invalid trial_type_str: ' + trial_type_str)
trial_type = trial_type_codes[trial_type_str]

# Keep only the lines that follow the trial-type header.
lines = lines[4:]


In [None]:
# Turn every non-empty, non-comment line into a
# (channel index, int of field 2, float of field 3) row.
measurements = []
for line in lines:
    if len(line) == 0 or line[0] == '#':
        continue
    fields = line.split()
    measurements.append((channel_map[fields[1]], int(fields[2]), float(fields[3])))
measurements = np.array(measurements)

print(measurements)

In [None]:
# Column 2 holds the measured values; lay them out as 64 rows of 256 values.
eeg = measurements[:, 2].reshape((64, 256))
# Average every 4 consecutive values of a row: 256 -> 64 values per row.
eeg = eeg.reshape(eeg.shape[0], -1, 4).mean(axis=2)
# Min-max scale over the whole array.
lowest = np.min(eeg)
eeg = (eeg - lowest) / np.ptp(eeg)

In [None]:
# Display the downsampled, min-max scaled 64x64 array built in the previous cell.
eeg

In [None]:
# Build a Trial from the values parsed in the cells above; as the last
# expression of the cell, the resulting object is displayed.
Trial(subject_id, subject_class, trial_number, trial_type, eeg)

In [None]:
# Running totals maintained by read_trials(): trials skipped because their
# header contains 'err', and trials skipped because the recording is flat.
errors = 0
zeros = 0

def read_trials(eeg_file: str) -> List[Trial]:
    """Parse every usable trial from one EEG recording file.

    The file is split into per-trial chunks on its '# co<9 chars>.rd' marker
    lines. For each chunk the third non-empty line is read as
    '<trial type>, trial <number>', the data lines are parsed, the values are
    reshaped to 64 x 256, averaged in groups of 4 (-> 64 x 64) and min-max
    scaled over the whole array.

    Trials whose type contains 'err' are skipped and counted in the global
    ``errors``. Trials whose signal is flat (in practice all zeros) are skipped
    and counted in the global ``zeros``; this check happens *before* scaling,
    because scaling a flat signal divides by zero and produces NaN.

    Args:
        eeg_file: Path of the recording file to read.

    Returns:
        One ``Trial`` per trial that was not skipped, in file order.

    Raises:
        ValueError: If the subject class character or a trial type is not
            recognised.
        Exception: Any other parsing error is re-raised unchanged after
            diagnostics (traceback, file name, trial text, header line) have
            been printed.
    """
    global errors, zeros

    # Only the read needs the file handle; release it before parsing.
    with open(eeg_file, 'r') as f:
        file_content = f.read()

    # Characters 2..12 form the subject id; its 4th character selects the class.
    subject_id = file_content[2:13]
    if 'a' == subject_id[3]:
        subject_class = 1
    elif 'c' == subject_id[3]:
        subject_class = 0
    else:
        raise ValueError('Invalid subject class: ' + subject_id)

    trials = []
    trials_str = [chunk
                  for chunk in re.split(r'^# co\w{9}.rd', file_content, flags=re.MULTILINE)
                  if len(chunk) > 0]

    for trial in trials_str:
        # Tracked separately so the error handler can always report it safely.
        header_line = None
        try:
            lines = [line for line in (raw.strip() for raw in trial.split('\n')) if len(line) > 0]
            header_line = lines[2]
            trial_type_str, trial_number = header_line.split(', trial')
            trial_number = int(trial_number)

            if 'err' in trial_type_str:
                print('Skipping trial |{}| from file |{}| due to error type'.format(trial_number, eeg_file))
                errors = errors + 1
                continue
            elif trial_type_str.startswith('# S1 obj'):
                trial_type = 0
            elif trial_type_str.startswith('# S2 match'):
                trial_type = 1
            elif trial_type_str.startswith('# S2 nomatch'):
                trial_type = 2
            else:
                raise ValueError('Invalid trial_type_str: ' + trial_type_str)

            # Parse each data line once; '#' lines are per-channel headers.
            measurements = []
            for line in lines[4:]:
                if line[0] == '#':
                    continue
                fields = line.split()
                measurements.append((channel_map[fields[1]], int(fields[2]), float(fields[3])))
            measurements = np.array(measurements)

            # 64 rows of 256 values, then average every 4 consecutive values.
            eeg = measurements[:, 2].reshape((64, 256))
            eeg = eeg.reshape(64, -1, 4).mean(axis=2)

            # A flat signal has zero range: scaling it would divide by zero and
            # yield NaN, which a later non-zero count cannot detect. Skip first.
            value_range = np.ptp(eeg)
            if value_range == 0:
                print('Skipping trial |{}| from file |{}| due to only 0 values'.format(trial_number, eeg_file))
                zeros = zeros + 1
                continue

            eeg = (eeg - np.min(eeg)) / value_range

            trials.append(Trial(subject_id, subject_class, trial_number, trial_type, eeg))
        except Exception:
            print(traceback.format_exc())
            print('Error in file: |{}|'.format(eeg_file))
            print('Error for: |{}|'.format(trial))
            # The header may be missing when the chunk itself is malformed.
            if header_line is not None:
                print('Error for: |{}|'.format(header_line))
            raise

    return trials


In [None]:
# Reset the counters that read_trials() increments, so re-running this cell
# reports the totals of this pass instead of accumulating across runs.
errors = 0
zeros = 0

all_trials = []

# sorted(): os.listdir returns entries in arbitrary order; a fixed order keeps
# the collected list identical from run to run.
for file in tqdm(sorted(os.listdir('eeg_full'))):
    # extend() grows the list in place; `a = a + b` copied the whole list on
    # every file, which is quadratic in the number of trials.
    all_trials.extend(read_trials(os.path.join('eeg_full', file)))

print('Good trials: {}, Error trials: {}, Zeros trials: {}'.format(len(all_trials), errors, zeros))

In [None]:
# One row per trial, built from each Trial's dict representation.
df = pd.DataFrame.from_records([trial.to_dict() for trial in all_trials])
# Shuffle the rows; the fixed seed makes the row order reproducible
# after a kernel restart.
df = df.sample(frac=1, random_state=42).reset_index(drop=True)

In [None]:
# Size of the DataFrame object in bytes, as reported by sys.getsizeof.
sys.getsizeof(df)

In [None]:
# Preview the first rows of the shuffled trial table.
df.head()

In [None]:
# Summary statistics of the trial table's columns.
df.describe()

In [None]:
eeg_per_trial = df['eeg'].values
y = df['subject_class'].values

# keras required format
# Stack the per-trial arrays along a new last axis, move that axis to the
# front so trials come first, then add a trailing channel axis of size 1.
stacked_depthwise = np.dstack(eeg_per_trial)
trials_first = np.rollaxis(stacked_depthwise, -1)
X = trials_first.reshape(trials_first.shape[0], 64, 64, 1)

# One-hot encode the two class labels.
y = keras.utils.to_categorical(y, 2)

In [None]:
# Sanity check: expected to be (number of trials, 64, 64, 1) after the reshape above.
X.shape

In [None]:
# Hold out 10% of the trials. The fixed seed keeps the split identical across
# runs, so reported scores are comparable.
# NOTE(review): rows are individual trials, so trials of the same subject can
# end up in both splits — consider a group-wise split if the goal is to
# generalise to unseen subjects.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)

In [None]:
# Hyper-parameters used by the model definition and by training.
input_shape = (64, 64, 1)
num_classes = 2
batch_size = 128
epochs = 50

# Two convolution + max-pooling stages, then a dense classifier with a
# softmax output over the classes.
model = Sequential([
    Conv2D(32, kernel_size=(5, 5), strides=(1, 1),
           activation='relu',
           input_shape=input_shape),
    MaxPooling2D(pool_size=(2, 2), strides=(2, 2)),
    Conv2D(64, (5, 5), activation='relu'),
    MaxPooling2D(pool_size=(2, 2)),
    Flatten(),
    Dense(1000, activation='relu'),
    Dense(num_classes, activation='softmax'),
])


In [None]:
# Categorical cross-entropy matches the one-hot targets; Adam is used with its
# default settings, and accuracy is tracked during training.
loss_function = keras.losses.categorical_crossentropy
adam_optimizer = keras.optimizers.Adam()
model.compile(
    optimizer=adam_optimizer,
    loss=loss_function,
    metrics=['accuracy'],
)

model.summary()

In [None]:
class AccuracyHistory(keras.callbacks.Callback):
    """Keras callback that records the validation accuracy after every epoch.

    After training, ``acc`` holds one entry per completed epoch (``None`` for
    an epoch whose logs contain no validation accuracy).
    """

    def on_train_begin(self, logs=None):
        """Start a fresh history at the beginning of each training run."""
        self.acc = []

    def on_epoch_end(self, batch, logs=None):
        """Append this epoch's validation accuracy.

        Keras passes the epoch index as the first positional argument; the
        parameter keeps its original name ``batch`` for compatibility.
        """
        logs = logs or {}
        # The metric key depends on the Keras version: newer releases report
        # 'val_accuracy', older ones 'val_acc'. Accept either.
        self.acc.append(logs.get('val_accuracy', logs.get('val_acc')))

history = AccuracyHistory()


In [None]:
# Train; the held-out split doubles as validation data so the callback can
# record the validation accuracy of every epoch.
model.fit(
    X_train, y_train,
    batch_size=batch_size,
    epochs=epochs,
    verbose=1,
    validation_data=(X_test, y_test),
    callbacks=[history],
)

# evaluate() returns the loss followed by the compiled metrics.
score = model.evaluate(X_test, y_test, verbose=0)
test_loss, test_accuracy = score[0], score[1]
print('Test loss:', test_loss)
print('Test accuracy:', test_accuracy)

# Recorded validation accuracy per epoch, with epochs numbered from 1.
epoch_numbers = range(1, len(history.acc) + 1)
plt.plot(epoch_numbers, history.acc)
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.show()

In [None]:

# Re-draw the per-epoch accuracy curve recorded by the `history` callback.
recorded_accuracy = history.acc
plt.plot(range(1, len(recorded_accuracy) + 1), recorded_accuracy)
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.show()