## Tutorial #5: Applying Machine Learning Methods to EEG Data on Group Level

In this tutorial, the same classification tasks as in Tutorial 4 are examined, but this time the analysis is done at the group level.

##### Dataset: 
In the previous tutorial, data from only one participant in 'Emotion-Antecedent Appraisal Checks: EEG and EMG data sets for Novelty and Pleasantness' was used. In this tutorial, all participants are included.

In [1]:
import mne
from os.path import isfile, join
from os import listdir
import numpy as np
from mne.decoding import Vectorizer

from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.model_selection import cross_val_score, train_test_split, GridSearchCV, StratifiedKFold, cross_val_predict
from sklearn.metrics import precision_recall_fscore_support, accuracy_score

from scipy import stats

# Models
from sklearn import svm
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.linear_model import LogisticRegression

In [2]:
# Load dataset: one saved MNE epochs file per participant.
data_folder = '../../study1/study1_eeg/epochdata/'
# Sort the listing so participant order is reproducible; os.listdir returns
# entries in arbitrary order. Paths are built with join() so the same string
# is used for the isfile() check and for reading.
files = sorted(join(data_folder, f) for f in listdir(data_folder)
               if isfile(join(data_folder, f)))
# Participant id = the two characters just before the last four characters.
# NOTE(review): assumes names end in a two-digit id plus a 4-character
# extension (e.g. '.fif') - confirm against the data folder.
ids = [int(f[-6:-4]) for f in files]

# Per participant: number of epochs in the UN, UP and NP subsets (columns 0-2).
numberOfEpochs = np.zeros((len(ids), 3))

# One row per participant: column 0 holds the id, column 1 the Epochs subset.
# The object arrays are pre-allocated and filled element by element, because
# np.array() on a list of [id, Epochs] pairs is a ragged conversion, which
# current NumPy refuses without dtype=object.
epochs_all_UN = np.empty((len(files), 2), dtype=object)
epochs_all_UP = np.empty((len(files), 2), dtype=object)
epochs_all_NP = np.empty((len(files), 2), dtype=object)

# Read the EEG epochs:
for row, (participant_id, path) in enumerate(zip(ids, files)):
    epochs = mne.read_epochs(path, verbose=False)
    # Select the epochs belonging to each pair of event names.
    epochs_UN = epochs['FU', 'FN']
    epochs_UP = epochs['FU', 'FP']
    epochs_NP = epochs['FN', 'FP']
    numberOfEpochs[row, 0] = len(epochs_UN.events)
    numberOfEpochs[row, 1] = len(epochs_UP.events)
    numberOfEpochs[row, 2] = len(epochs_NP.events)
    epochs_all_UN[row, 0], epochs_all_UN[row, 1] = participant_id, epochs_UN
    epochs_all_UP[row, 0], epochs_all_UP[row, 1] = participant_id, epochs_UP
    epochs_all_NP[row, 0], epochs_all_NP[row, 1] = participant_id, epochs_NP

#print(numberOfEpochs)

  epochs = mne.read_epochs(files[f], verbose=False)
  epochs = mne.read_epochs(files[f], verbose=False)
  epochs = mne.read_epochs(files[f], verbose=False)
  epochs = mne.read_epochs(files[f], verbose=False)
  epochs = mne.read_epochs(files[f], verbose=False)
  epochs = mne.read_epochs(files[f], verbose=False)
  epochs = mne.read_epochs(files[f], verbose=False)
  epochs = mne.read_epochs(files[f], verbose=False)
  epochs = mne.read_epochs(files[f], verbose=False)
  epochs = mne.read_epochs(files[f], verbose=False)
  epochs = mne.read_epochs(files[f], verbose=False)
  epochs = mne.read_epochs(files[f], verbose=False)
  epochs = mne.read_epochs(files[f], verbose=False)
  epochs = mne.read_epochs(files[f], verbose=False)
  epochs = mne.read_epochs(files[f], verbose=False)
  epochs = mne.read_epochs(files[f], verbose=False)
  epochs = mne.read_epochs(files[f], verbose=False)
  epochs = mne.read_epochs(files[f], verbose=False)
  epochs = mne.read_epochs(files[f], verbose=False)
  epochs = m

In [3]:
# Print the shape of each per-task epochs array.
for message, task_epochs in (
    ('Shape of epochs_UN: {}', epochs_all_UN),
    ('Shape of epochs_UP: {}', epochs_all_UP),
    ('Shape of epochs_NP: {}', epochs_all_NP),
):
    print(message.format(task_epochs.shape))

Shape of epochs_UN: (25, 2)
Shape of epochs_UP: (25, 2)
Shape of epochs_NP: (25, 2)


In [4]:
def getData_labels(epochs):
    """Flatten per-participant epochs into data, label and id arrays.

    Parameters
    ----------
    epochs : iterable
        Each element is indexed as ``p[0]`` (participant id) and ``p[1]``
        (an object exposing ``.events`` and ``.get_data()``).

    Returns
    -------
    data : numpy.ndarray
        All items returned by ``get_data()``, stacked in participant order.
    labels : numpy.ndarray
        Last column of ``.events`` for every epoch.
    ids : numpy.ndarray
        Participant id repeated once per epoch, aligned with ``labels``.
    """
    collected_data, collected_labels, collected_ids = [], [], []
    for participant in epochs:
        participant_id = participant[0]
        participant_epochs = participant[1]
        # The event code is stored in the last column of the events array.
        event_codes = participant_epochs.events[:, -1]
        collected_labels += list(event_codes)
        collected_ids += [participant_id] * len(event_codes)
        collected_data += list(participant_epochs.get_data())

    return (
        np.array(collected_data),
        np.array(collected_labels),
        np.array(collected_ids),
    )

### Example #1:  Classification between Unpleasant and Pleasant Events

In [5]:
# Flatten the unpleasant/pleasant epochs of all participants into arrays.
data_UP, labels_UP, ids_UP = getData_labels(epochs=epochs_all_UP)

In [6]:
def handleMissingValues(data, labels):
    """Return float64 copies of data and labels with non-finite values replaced.

    Every item ``data[d]`` that contains NaN or +/-inf is passed through
    ``np.nan_to_num`` (NaN -> 0, +/-inf -> largest/smallest finite float), and
    its index is printed so the affected epochs can be identified.

    The inputs are not modified; the cleaning is done on copies.

    Parameters
    ----------
    data : array-like
        Numeric data, iterated along its first axis.
    labels : array-like
        Numeric labels.

    Returns
    -------
    data : numpy.ndarray
        float64 copy of ``data`` with only finite values.
    labels : numpy.ndarray
        float64 copy of ``labels``.
    """
    # np.array(...) copies, so the caller's arrays are left untouched.
    data = np.array(data, dtype='float64')
    labels = np.array(labels, dtype='float64')
    for index, item in enumerate(data):
        # isfinite() is False for NaN as well as +/-inf, so this single check
        # covers every case nan_to_num repairs.
        if not np.all(np.isfinite(item)):
            print(index)
            data[index] = np.nan_to_num(item)
    return data, labels

In [10]:
def calculate_score_groupLevel(ids, predictions, labels):
    """Compute the fraction of correct predictions for each participant.

    Parameters
    ----------
    ids : sequence
        Participant id of every sample.
    predictions : sequence
        Predicted label of every sample, aligned with ``ids``.
    labels : sequence
        True label of every sample, aligned with ``ids``.

    Returns
    -------
    results : list of float
        Accuracy per participant, ordered like ``unique_ids``.
    unique_ids : list
        Sorted distinct participant ids.
    """
    print('ids:')
    print(ids)
    unique_ids = sorted(set(ids))
    print('Unique Ids: ')
    print(unique_ids)

    results = []
    for participant in unique_ids:
        # Positions of all samples that belong to this participant.
        positions = [pos for pos, owner in enumerate(ids) if owner == participant]
        print('lenght of indices: ', len(positions))
        correct = sum(1 for pos in positions if predictions[pos] == labels[pos])
        print('res: ', str(correct))
        results.append(correct / len(positions))

    return results, unique_ids

Create all models and then run cross-validation on each of them to compare their performance.

In [11]:
# Linear Discriminant Analysis
clf_lda_pip = make_pipeline(Vectorizer(), StandardScaler(), LinearDiscriminantAnalysis(solver='svd'))
# Logistic Regression with an L1 penalty. The solver is named explicitly
# because the default solver of current scikit-learn releases ('lbfgs') does
# not support penalty='l1' and raises at fit time; 'liblinear' does.
clf_lr_pip = make_pipeline(
    Vectorizer(),
    StandardScaler(),
    LogisticRegression(penalty='l1', solver='liblinear', random_state=42),
)

# Only LR is evaluated in this run; LDA is defined above but left out.
# Add clf_lda_pip / 'LDA' to the two lists to include it.
models = [clf_lr_pip]
model_names = ['LR']

In [None]:
# Replace NaN/inf values before fitting; the estimators reject non-finite input.
data_UP, labels_UP = handleMissingValues(data_UP, labels_UP)
results = []

# random_state is deliberately not passed: without shuffle=True it has no
# effect on the folds, and current scikit-learn raises a ValueError for that
# combination. The folds are therefore the same deterministic, unshuffled ones.
kfold = StratifiedKFold(n_splits=2)

# isfinite() is False for NaN as well as +/-inf, so one check covers both.
data_is_finite = bool(np.all(np.isfinite(data_UP)))

for model, model_name in zip(models, model_names):
    print(model_name)
    if not data_is_finite:
        print('Data has infinite or NaN value!')
        continue
    # Out-of-fold prediction for every epoch, then accuracy per participant.
    predictions_UP = cross_val_predict(model, data_UP, labels_UP, cv=kfold)
    print('Predictions: ')
    print(predictions_UP)
    print('True labels: ')
    print(labels_UP)
    cv_accuracy, unique_ids = calculate_score_groupLevel(ids_UP, predictions_UP, labels_UP)
    results.append(cv_accuracy)
    print('Model ' + model_name + ': ' + str(cv_accuracy))

LR


  return ufunc.reduce(obj, axis, dtype, out, **passkwargs)
  return ufunc.reduce(obj, axis, dtype, out, **passkwargs)
  sqr = np.multiply(arr, arr, out=arr)
  return ufunc.reduce(obj, axis, dtype, out, **passkwargs)
  return ufunc.reduce(obj, axis, dtype, out, **passkwargs)


In [None]:
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline

def plotModelComparison(results, model_names):
    """Show a box plot of the scores of each model.

    Parameters
    ----------
    results : sequence of sequences
        One collection of scores per model; each becomes one box.
    model_names : sequence of str
        Tick labels for the boxes, in the same order as ``results``.
    """
    figure = plt.figure()
    figure.suptitle('Model Comparison')
    axes = figure.add_subplot(111)
    axes.boxplot(results)
    axes.set_xticklabels(model_names)
    plt.show()
#plotModelComparison(results, model_names)

In [None]:
import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline
def plotCVScores_perParticipant(unique_ids, results, model_names):
    """Plot the cross-validation accuracy of every model per participant.

    Parameters
    ----------
    unique_ids : sequence
        Participant ids used as x values.
    results : sequence of sequences
        One sequence of accuracies per model, each aligned with ``unique_ids``.
    model_names : sequence of str
        Legend label for each entry of ``results``.
    """
    fig, ax = plt.subplots()
    fig.suptitle('CV accuracy Scores per Participant')
    for scores, name in zip(results, model_names):
        # Plot against the unique_ids argument. The previous code read the
        # global `ids` instead, which need not have the same order as the
        # scores.
        ax.plot(unique_ids, scores, label=name)
    plt.xlabel('Participant IDs')
    plt.ylabel('Accuracy')
    ax.legend()
    plt.show()


In [None]:
# One accuracy curve per model across participants.
plotCVScores_perParticipant(unique_ids=unique_ids, results=results, model_names=model_names)

An independent t-test is applied to show statistically whether there is a significant difference between the performance of two models. A small p-value means that the performances of the models are significantly different; a large p-value indicates that the models perform similarly. In general, p = 0.05 is preferred as the threshold for deciding whether two distributions are different.

In [None]:
def applyTTest(results, model_names):
    """Print the independent t-test p-value for every ordered pair of models.

    Parameters
    ----------
    results : sequence of sequences
        One sequence of scores per model.
    model_names : sequence of str
        Name of each model, aligned with ``results``.

    Prints a notice instead when fewer than two result sets are given.
    Returns nothing.
    """
    if len(results) < 2:
        print('Not enough values for t-test!')
        return

    for i, scores_i in enumerate(results):
        for j, scores_j in enumerate(results):
            if i == j:
                continue
            # Only the p-value is reported; the t statistic is not used.
            _, p = stats.ttest_ind(scores_i, scores_j)
            print("p = {0} for t-test between {1} and {2}".format(p, model_names[i], model_names[j]))
                    
   

In [None]:
# Pairwise significance test on the per-participant accuracies.
applyTTest(results=results, model_names=model_names)

Since p-value is larger than 0.05, we can conclude that there is no significant difference between performance of LDA and performance of LR on the task of classification between unpleasant and pleasant events.

### Example #2:  Classification between Unpleasant and Pleasant Events

In [None]:
# Dataset with unpleasant and pleasant events.
# getData_labels returns three arrays (data, labels, participant ids);
# unpacking only two raised a ValueError.
data_UP, labels_UP, ids_UP = getData_labels(epochs_all_UP)
print(data_UP.shape)
print(labels_UP.shape)

In [None]:
# Hold out 30% of the unpleasant/pleasant epochs as a test set (fixed seed).
split_UP = train_test_split(data_UP, labels_UP, test_size=0.3, random_state=42)
train_data_UP, test_data_UP, labels_train_UP, labels_test_UP = split_UP
for subset in (train_data_UP, test_data_UP):
    print(subset.shape)

In [None]:
# Linear Discriminant Analysis: flatten each epoch, standardise, then classify.
clf_lda_pip = make_pipeline(
    Vectorizer(),
    StandardScaler(),
    LinearDiscriminantAnalysis(solver='svd'),
)
clf_lda_pip.fit(train_data_UP, labels_train_UP)

# Evaluate on the held-out epochs.
predictions_lda = clf_lda_pip.predict(test_data_UP)

acc_lda = accuracy_score(labels_test_UP, predictions_lda)
print("Accuracy of LDA model: {}".format(acc_lda))

# Macro average: unweighted mean of the per-class metrics.
metrics_lda = precision_recall_fscore_support(labels_test_UP, predictions_lda, average='macro')
precision_lda, recall_lda, fscore_lda, support_lda = metrics_lda
print('Precision: {0}, Recall: {1}, f1-score:{2}'.format(precision_lda, recall_lda, fscore_lda))

In [None]:
# Logistic Regression with an L1 penalty. The solver is named explicitly
# because the default solver of current scikit-learn releases ('lbfgs') does
# not support penalty='l1' and raises at fit time; 'liblinear' does.
clf_lr_pip = make_pipeline(
    Vectorizer(),
    StandardScaler(),
    LogisticRegression(penalty='l1', solver='liblinear', random_state=42),
)
clf_lr_pip.fit(train_data_UP, labels_train_UP)

# Evaluate on the held-out epochs.
predictions_lr = clf_lr_pip.predict(test_data_UP)

acc_lr = accuracy_score(labels_test_UP, predictions_lr)
print("Accuracy of LR model: {}".format(acc_lr))

# Macro average: unweighted mean of the per-class metrics.
# (support_lr was misspelled support_lra; it now matches the naming used for
# the other models.)
precision_lr, recall_lr, fscore_lr, support_lr = precision_recall_fscore_support(
    labels_test_UP, predictions_lr, average='macro')
print('Precision: {0}, Recall: {1}, f1-score:{2}'.format(precision_lr, recall_lr, fscore_lr))

In [None]:
# This is the first place the score tables are used, so create them here;
# appending to undefined names raised a NameError.
# Each row is [LDA score, LR score] for one classification task.
accuracies, f1_scores = [], []
accuracies.append([acc_lda, acc_lr])
f1_scores.append([fscore_lda, fscore_lr])

### Example #3: Classification between Pleasant and Neutral Events

In [None]:
# Dataset with neutral and pleasant events.
# getData_labels returns three arrays (data, labels, participant ids);
# unpacking only two raised a ValueError.
data_NP, labels_NP, ids_NP = getData_labels(epochs_all_NP)
print(data_NP.shape)
print(labels_NP.shape)

In [None]:
# Hold out 30% of the neutral/pleasant epochs as a test set (fixed seed).
split_NP = train_test_split(data_NP, labels_NP, test_size=0.3, random_state=42)
train_data_NP, test_data_NP, labels_train_NP, labels_test_NP = split_NP
for subset in (train_data_NP, test_data_NP):
    print(subset.shape)

In [None]:
# Linear Discriminant Analysis: flatten each epoch, standardise, then classify.
clf_lda_pip = make_pipeline(
    Vectorizer(),
    StandardScaler(),
    LinearDiscriminantAnalysis(solver='svd'),
)
clf_lda_pip.fit(train_data_NP, labels_train_NP)

# Evaluate on the held-out epochs.
predictions_lda = clf_lda_pip.predict(test_data_NP)

acc_lda = accuracy_score(labels_test_NP, predictions_lda)
print("Accuracy of LDA model: {}".format(acc_lda))

# Macro average: unweighted mean of the per-class metrics.
metrics_lda = precision_recall_fscore_support(labels_test_NP, predictions_lda, average='macro')
precision_lda, recall_lda, fscore_lda, support_lda = metrics_lda
print('Precision: {0}, Recall: {1}, f1-score:{2}'.format(precision_lda, recall_lda, fscore_lda))

In [None]:
# Logistic Regression with an L1 penalty. The solver is named explicitly
# because the default solver of current scikit-learn releases ('lbfgs') does
# not support penalty='l1' and raises at fit time; 'liblinear' does.
clf_lr_pip = make_pipeline(
    Vectorizer(),
    StandardScaler(),
    LogisticRegression(penalty='l1', solver='liblinear', random_state=42),
)
clf_lr_pip.fit(train_data_NP, labels_train_NP)

# Evaluate on the held-out epochs.
predictions_lr = clf_lr_pip.predict(test_data_NP)

acc_lr = accuracy_score(labels_test_NP, predictions_lr)
print("Accuracy of LR model: {}".format(acc_lr))

# Macro average: unweighted mean of the per-class metrics.
precision_lr, recall_lr, fscore_lr, support_lr = precision_recall_fscore_support(
    labels_test_NP, predictions_lr, average='macro')
print('Precision: {0}, Recall: {1}, f1-score:{2}'.format(precision_lr, recall_lr, fscore_lr))

In [None]:
# Record this task's [LDA, LR] scores in the accuracy and F1 tables.
for score_table, task_scores in (
    (accuracies, [acc_lda, acc_lr]),
    (f1_scores, [fscore_lda, fscore_lr]),
):
    score_table.append(task_scores)

In [None]:
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline

In [None]:
def plotEvalMetrics(tasks, labels, evalMetric, metricName):
    """Draw a grouped bar chart of one metric per task and model.

    Parameters
    ----------
    tasks : sequence of str
        Tick label for each group of bars; one per row of ``evalMetric``.
    labels : sequence of str
        Legend label for each model; one per column of ``evalMetric``.
    evalMetric : sequence of sequences
        ``evalMetric[t][m]`` is the score of model ``m`` on task ``t``.
    metricName : str
        Label of the y axis.
    """
    width = 0.2  # the width of the bars
    # The first two models keep the original colours; any further model uses
    # matplotlib's default colour (color=None).
    colors = ['#87CEFA', '#FFE4E1']

    # Left edge position of each task group on the x axis.
    group_positions = np.arange(len(evalMetric))
    # Transpose once: one tuple of per-task scores for each model.
    scores_per_model = list(zip(*evalMetric))
    n_models = min(len(scores_per_model), len(labels))

    for m in range(n_models):
        plt.bar(group_positions + m * width, scores_per_model[m],
                color=colors[m] if m < len(colors) else None,
                width=width, edgecolor='white', label=labels[m])

    plt.xlabel('Classification Tasks')
    # Centre each tick under its group of bars (width/2 for two models).
    tick_offset = width * max(n_models - 1, 0) / 2
    plt.xticks([r + tick_offset for r in range(len(evalMetric))], tasks)
    plt.ylabel(metricName)

    plt.legend(bbox_to_anchor=(1.01, 1), loc='upper left', )
    plt.show()

In [None]:
#Plot Accuracies
# One label per row of `accuracies`. The notebook appends a row after the
# unpleasant/pleasant example and one after the neutral/pleasant example;
# a third label without a matching row mismatches the x ticks.
tasks = ['UP', 'NP']
labels = ['LDA', 'LR']
plotEvalMetrics(tasks, labels, accuracies, 'Accuracy')

In [None]:
#Plot F1 Scores
# One label per row of `f1_scores`. The notebook appends a row after the
# unpleasant/pleasant example and one after the neutral/pleasant example;
# a third label without a matching row mismatches the x ticks.
tasks = ['UP', 'NP']
labels = ['LDA', 'LR']
plotEvalMetrics(tasks, labels, f1_scores, 'F1-Scores')

As part of the group-level analysis of EEG data, logistic regression (LR) and linear discriminant analysis (LDA) models are built as in the previous tutorial, but this time SVM is omitted because it takes hours to train on the amount of data we have.

As both the accuracy and F1-score plots demonstrate, logistic regression performs better than LDA on all tasks, unlike in the previous tutorial, in which we analyzed the data at the individual-participant level.