In [None]:
import os
import numpy as np
from google.colab import drive
from scipy.io import loadmat
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.model_selection import cross_val_score
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from sklearn.preprocessing import OneHotEncoder

In [None]:
drive.mount('/content/drive')
os.chdir('/content/drive/MyDrive/BCI')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!pip install mne



In [None]:
from src.erp import get_erp_features
from src.mi import MI
from src.ssvep import get_ssvep_features

In [None]:
def load_data(ses, sub, par):
    # example: 'data/session1/s1/sess01_subj01_EEG_ERP.mat'
    if len(str(sub))>1:
        filepath = f'data/session{ses}/s{sub}/sess0{ses}_subj{sub}_EEG_{par}.mat'
    else:  
        filepath = f'data/session{ses}/s{sub}/sess0{ses}_subj0{sub}_EEG_{par}.mat'

    train_data = loadmat(filepath)[f'EEG_{par}_train'][0][0]
    test_data = loadmat(filepath)[f'EEG_{par}_test'][0][0]
    return train_data, test_data

In [None]:
def select_trials(data_t, data_y, num_trials=100):
    sampled_indices = np.random.choice(len(data_t[0]), num_trials, replace=False)
    return data_t[0][sampled_indices], data_y[0][sampled_indices]

def get_data_xyt(train_data, test_data):
    train_t, train_y = select_trials(train_data['t'], train_data['y_dec'], 100)
    test_t, test_y = select_trials(test_data['t'], test_data['y_dec'], 100)
    test_t += train_data['x'].shape[0]  # due to concatenation

    data_x = np.concatenate((train_data['x'], test_data['x']))
    data_y = np.concatenate((train_y, test_y))
    data_t = np.concatenate((train_t, test_t))
    return data_x, data_y, data_t

In [None]:
class Features():
    def __init__(self):
        self.mi = None

    def get_features(self, data_x, data_t, data_y=None):
        if data_y is not None:    
            assert(len(data_y)==len(data_t))
            self.mi = MI()
            mi_features = self.mi.get_mi_features(data_x, data_t, data_y)
        else:
            if self.mi is None:
                raise Exception("Give MI data first to train CSP")
            else:
                mi_features = self.mi.get_mi_features(data_x, data_t)
        erp_features = get_erp_features(data_x, data_t)
        ssvep_features = get_ssvep_features(data_x, data_t)
        features = np.concatenate((mi_features, erp_features, ssvep_features), axis=1)
        return features

In [None]:
def classify_LDA(train_x, train_y, test_x, test_y):
    clf = LDA()
    clf.fit(train_x, train_y)
    acc = clf.score(test_x, test_y)
    return acc

In [None]:
def classify_DNN(train_x, train_y, test_x, test_y):
    onehot_encoder = OneHotEncoder(sparse=False)
    train_y_hot = onehot_encoder.fit_transform(train_y.reshape(len(train_y), 1))
    test_y_hot = onehot_encoder.fit_transform(test_y.reshape(len(test_y), 1))

    model = tf.keras.models.Sequential()
    input_size = train_x.shape[1]
    output_size = train_y_hot.shape[1]
    model.add(Dense(input_size, activation='tanh'))
    model.add(Dense(50, activation='tanh'))
    model.add(Dense(output_size, activation='softmax'))
    model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

    history = model.fit(train_x, train_y_hot, batch_size=8, epochs=50, verbose=0)
    scores = model.evaluate(test_x, test_y_hot, verbose=0)
    loss, accuracy = scores[0], scores[1]
    return accuracy

In [None]:
def cross_val(mi_data, erp_data, ssvep_data, kfolds=5):
    # ERP and SSVEP don't need any correction due to CV, so they are already samples with features
    data_mi_x, data_mi_y, data_mi_t = mi_data                   # raw MI data, since we need to train CSP for each fold in CV
    data_erp_x, data_erp_y, data_erp_t = erp_data               # raw ERP data
    data_ssvep_x, data_ssvep_y, data_ssvep_t = ssvep_data       # raw SSVEP data

    assert(len(data_mi_t)==len(data_erp_t))
    assert(len(data_mi_t)==len(data_ssvep_t))

    fold_len = int(len(data_mi_t)/kfolds)                       # 200/5 = 40
    indices = np.split(np.random.choice(len(data_mi_t), size=len(data_mi_t), replace=False), kfolds)

    lda_cv_scores, dnn_cv_scores = [], []
    for k in range(0, kfolds):
        # data division part
        #####
        inds = indices.copy()
        test_inds = np.array(inds.pop(k)).flatten()
        train_inds = np.array(inds).flatten()
        
        ft = Features()

        mi_train_y, mi_train_t = data_mi_y[train_inds], data_mi_t[train_inds]
        mi_test_y, mi_test_t = data_mi_y[test_inds], data_mi_t[test_inds]
        mi_train_x = ft.get_features(data_mi_x, mi_train_t, mi_train_y)   # here we get features by training CSP
        mi_test_x = ft.get_features(data_mi_x, mi_test_t)                  # here we get features using trained CSP
        
        erp_x = ft.get_features(data_erp_x, data_erp_t)
        erp_train_x, erp_train_y = erp_x[train_inds], data_erp_y[train_inds]
        erp_test_x, erp_test_y = erp_x[test_inds], data_erp_y[test_inds]

        ssvep_x = ft.get_features(data_ssvep_x, data_ssvep_t)
        ssvep_train_x, ssvep_train_y = ssvep_x[train_inds], data_ssvep_y[train_inds]
        ssvep_test_x, ssvep_test_y = ssvep_x[test_inds], data_ssvep_y[test_inds]
        
        ##
        train_x = np.concatenate((erp_train_x, ssvep_train_x, mi_train_x))              # concatenated features
        train_y = np.concatenate((erp_train_y, ssvep_train_y + 4, mi_train_y + 2))      # concatenated labels

        test_x = np.concatenate((erp_test_x, ssvep_test_x, mi_test_x))                  # concatenated features
        test_y = np.concatenate((erp_test_y, ssvep_test_y + 4, mi_test_y + 2))          # concatenated labels
        #####
        
        # classifier part
        # can add another classifiers here
        #####
        lda_accuracy = classify_LDA(train_x, train_y, test_x, test_y)
        dnn_accuracy = classify_DNN(train_x, train_y, test_x, test_y)
        lda_cv_scores.append(lda_accuracy)
        dnn_cv_scores.append(dnn_accuracy)
        #####

    lda_average = np.average(lda_cv_scores)
    dnn_average = np.average(dnn_cv_scores)

    return lda_average, dnn_average

In [None]:
def main_loop(sessions, subjects):
    accuracies = {}
    for ses in sessions:
        accuracies[ses] = {}
        for sub in subjects:
            accuracies[ses][sub] = {}
            # MI
            train_mi_data, test_mi_data = load_data(ses, sub, 'MI')
            mi_data = get_data_xyt(train_mi_data, test_mi_data)
            
            # ERP
            train_erp_data, test_erp_data = load_data(ses, sub, 'ERP')
            erp_data = get_data_xyt(train_erp_data, test_erp_data)
            
            # SSVEP
            train_ssvep_data, test_ssvep_data = load_data(ses, sub, 'SSVEP')
            ssvep_data = get_data_xyt(train_ssvep_data, test_ssvep_data)

            # Cross Validation
            lda, dnn = cross_val(mi_data, erp_data, ssvep_data, kfolds=5)
            accuracies[ses][sub]['LDA average'] = lda
            accuracies[ses][sub]['DNN average'] = dnn
            print(f'Session {ses}, Subject {sub}, LDA {lda}, DNN {dnn}')
    return accuracies

In [None]:
sessions = ['1', '2']
subjects = ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10']

In [None]:
accs = main_loop(sessions, subjects)

Session 1, Subject 1, LDA 0.635




Session 1, Subject 2, LDA 0.7766666666666667
Session 1, Subject 3, LDA 0.745
Session 1, Subject 4, LDA 0.6950000000000001
Session 1, Subject 5, LDA 0.7683333333333333
Session 1, Subject 6, LDA 0.735
Session 1, Subject 7, LDA 0.76
Session 1, Subject 8, LDA 0.7
Session 1, Subject 9, LDA 0.6833333333333333
