In [1]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
from datetime import date
import numpy as np
import mne as mne
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import pandas as pd
from sklearn.metrics import confusion_matrix
import os
import scipy.io as sio
from sklearn.utils import shuffle

# Run tf.function-decorated code eagerly (step-by-step Python execution).
tf.config.run_functions_eagerly(True)

# Number of EEG electrodes used by the analysis.
ELECTRODES_NUM = 35

# Date-stamped folder name, e.g. "data/Jan-31-2024".
today = format(date.today(), "%b-%d-%Y")
root_folder = f"data/{today}"

In [2]:
def product(*args, repeat=1):
    """Yield the Cartesian product of the input iterables as tuples.

    Mirrors ``itertools.product``:
        product('ABCD', 'xy')       --> Ax Ay Bx By Cx Cy Dx Dy
        product(range(2), repeat=3) --> 000 001 010 011 100 101 110 111

    Args:
        *args: iterables to combine; the rightmost one varies fastest.
        repeat: how many times the whole sequence of iterables is repeated.

    Yields:
        tuple: one combination at a time, in lexicographic order.
    """
    sequences = [tuple(arg) for arg in args] * repeat

    # Grow every partial combination by one element per input sequence.
    combos = [()]
    for seq in sequences:
        extended = []
        for prefix in combos:
            for item in seq:
                extended.append(prefix + (item,))
        combos = extended

    yield from combos

def plot_confusion_matrix(cm, class_names,flag):
    """
    Returns a matplotlib figure containing the plotted confusion matrix.

    Every cell is annotated with its row-normalised percentage and its raw
    count. Cells are shaded by the raw counts in ``cm``.

    Args:
       cm (array, shape = [n, n]): a confusion matrix of integer classes
       class_names (array, shape = [n]): String names of the integer classes
       flag: unused; kept only so existing callers keep working

    Returns:
       matplotlib.figure.Figure: the figure the matrix was drawn on
    """
    cm = np.asarray(cm)

    figure = plt.figure(figsize=(8, 8))
    plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
    plt.title("Confusion matrix")
    plt.colorbar()
    tick_marks = np.arange(len(class_names))
    plt.xticks(tick_marks, class_names, rotation=45)
    plt.yticks(tick_marks, class_names)

    # Normalize each row by its total. Rows without any sample stay at 0
    # instead of producing a division by zero (NaN) in the annotations.
    row_totals = cm.sum(axis=1, keepdims=True)
    cm_norm = np.divide(cm, row_totals,
                        out=np.zeros(cm.shape, dtype=float),
                        where=row_totals != 0)
    cm_norm = np.around(cm_norm, decimals=2)

    # Use white text if squares are dark; otherwise black. The image is
    # coloured by the raw counts, so the light/dark boundary is the midpoint
    # of the displayed value range.
    threshold = (cm.min() + cm.max()) / 2. if cm.size else 0.

    for i, j in np.ndindex(cm.shape):
        color = "white" if cm[i, j] > threshold else "black"
        plt.text(j, i, '({:.0f}%)\n {}'.format(cm_norm[i, j]*100, cm[i,j]), horizontalalignment="center", color=color, linespacing=3, fontsize='large')

    # Set the axis labels before tight_layout so they are included in the
    # layout computation.
    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()
    return figure

def one_hot_encoder(labels, classes_num = 3):
    """One-hot encode 1-based class labels.

    Label ``k`` (with ``1 <= k <= classes_num``) sets column ``k - 1`` of its
    row to 1. Non-integer labels are truncated towards zero after the shift,
    as ``int()`` does.

    Args:
        labels: sequence or array holding one class label per epoch
            (shape ``(n,)`` or ``(n, 1)``).
        classes_num: number of classes, i.e. the width of the encoding.

    Returns:
        numpy.ndarray: float array of shape ``(n, classes_num)``.

    Raises:
        TypeError: if ``labels`` holds more than one value per epoch.
        IndexError: if a label falls outside ``1..classes_num``. Without this
            check a label of 0 would wrap around to the last class through
            negative indexing.
    """
    labels_arr = np.asarray(labels)
    epochs_num = labels_arr.shape[0]
    if labels_arr.size != epochs_num:
        raise TypeError("labels must hold exactly one class label per epoch")

    # Shift from 1-based labels to 0-based column indices.
    class_idx = (labels_arr.ravel() - 1).astype(int)
    out_of_range = (class_idx < 0) | (class_idx >= classes_num)
    if out_of_range.any():
        raise IndexError(
            "labels must lie in 1..{}; got {}".format(
                classes_num, labels_arr.ravel()[out_of_range].tolist()))

    labels_onehot = np.zeros((epochs_num, classes_num))
    labels_onehot[np.arange(epochs_num), class_idx] = 1
    return labels_onehot

def print_confusion_matrix(model, data, true_labels_onehot, class_names=None):
    """Evaluate ``model`` on ``data``, print its accuracy and plot the confusion matrix.

    Args:
        model: object with a ``predict(data)`` method that returns one row of
            class scores per sample.
        data: input passed unchanged to ``model.predict``.
        true_labels_onehot (array, shape = [n_samples, n_classes]): one-hot
            encoded ground-truth labels.
        class_names (list of str, optional): tick labels for the plot.
            Defaults to ``["Right", "Left", "No Movement"]``.

    Returns:
        None. The accuracy is printed and the confusion matrix is drawn with
        ``plot_confusion_matrix``.
    """
    if class_names is None:
        class_names = ["Right", "Left", "No Movement"]

    true_labels_onehot = np.asarray(true_labels_onehot)

    ###### Evaluation
    prediction = model.predict(data)
    y_pred = np.argmax(prediction, axis=1)
    y_true = np.argmax(true_labels_onehot, axis=1)

    # Share of samples whose predicted class equals the true class.
    accuracy = np.mean(y_pred == y_true) * 100 if len(y_true) else float("nan")
    print(f"Accuracy: {accuracy:.2f}%")

    # Calculating Confusion Matrix. Passing the full label set keeps the
    # matrix n_classes x n_classes even when a class is missing from both
    # y_true and y_pred, so rows and columns stay aligned with class_names.
    n_classes = true_labels_onehot.shape[1]
    cm = confusion_matrix(y_true, y_pred, labels=np.arange(n_classes))

    plot_confusion_matrix(cm, class_names=class_names, flag=1)

In [3]:
# Named constants for values that were previously inline magic numbers.
N_CLASSES = 3                # right / left / no movement
GRID_ROWS, GRID_COLS = 5, 7  # electrode grid that elec_order is reshaped into
EPOCH_START_SAMPLE = 150     # samples before this index are dropped from every trial
                             # NOTE(review): presumably a pre-cue/baseline window - confirm

# One .mat file per class; the trial data lives under EEG[0, 0]["data"].
path = 'data/igor/'
data_right = sio.loadmat(os.path.join(path,'0.mat'))["EEG"][0,0]["data"]
data_left = sio.loadmat(os.path.join(path,'1.mat'))["EEG"][0,0]["data"]
data_nomove = sio.loadmat(os.path.join(path,'2.mat'))["EEG"][0,0]["data"]

# Channel names, used below to look up the row index of each electrode.
# NOTE(review): assumed to match the channel order of the recordings - confirm.
elec_names = ['FP1','Fz','F3','F7','FT9','FC5','FC1','C3','T7','TP9','CP5','CP1','Pz','P3','P7','O1','Oz','O2','P4','P8','TP10','CP6','CP2','Cz','C4','T8','FT10','FC6','FC2','F4','F8','FP2','AF3','AFz','F1','F5','FT7','FC3','FCz','C1','C5','TP7','CP3','P1','P5','PO7','PO3','POz','PO4','PO8','P6','P2','CPz','CP4','TP8','C6','C2','FC4','FT8','F6','F2','AF4']

# Stack the three classes along the trial axis (axis 2), then move that axis
# to the front so that axis 0 indexes trials.
data_orig = np.concatenate([data_right, data_left, data_nomove], axis=2)
data_org = np.moveaxis(data_orig, 2, 0)

# Integer class per trial, in stacking order: 0 = right, 1 = left, 2 = no movement.
labels_org = np.repeat([0., 1., 2.], [data_right.shape[2], data_left.shape[2], data_nomove.shape[2]])

data, labels = shuffle(data_org, labels_org, random_state=42)

# Electrodes to keep, listed row by row (7 per row: F, FC, C, CP, P), so a
# plain reshape below turns the electrode axis into a GRID_ROWS x GRID_COLS grid.
elec_order = ['F5','F3','F1','Fz','F2','F4','F6','FC5','FC3','FC1','FCz','FC2','FC4','FC6','C5','C3','C1','Cz','C2','C4','C6','CP5','CP3','CP1','CPz','CP2','CP4','CP6','P5','P3','P1','Pz','P2','P4','P6']
assert len(elec_order) == ELECTRODES_NUM == GRID_ROWS * GRID_COLS, "electrode list does not fill the grid"
elec_idx = [elec_names.index(x) for x in elec_order]

# Keep only the selected electrodes and the samples from EPOCH_START_SAMPLE on.
data_relevant = data[:,elec_idx,EPOCH_START_SAMPLE:]

n_trials = np.shape(data_relevant)[0]
n_samples = np.shape(data_relevant)[2]

data_3d = np.reshape(data_relevant, (n_trials,GRID_ROWS,GRID_COLS,n_samples))

# One-hot encode the 0-based labels (row k of the identity matrix is class k).
labels_onehot = np.eye(N_CLASSES)[labels.astype(int)]

# Hold out 10% as the test set, stratified by class.
X_train_val, X_test, y_train_val, y_test = train_test_split(data_3d, labels_onehot, test_size=0.1, random_state=42, stratify=labels_onehot)

# Append a trailing singleton axis (axis 4): the Conv3D input is declared as
# (rows, cols, samples, 1).
X_train_val = np.expand_dims(X_train_val,axis=-1)
X_test = np.expand_dims(X_test,axis=-1)

# Split the remainder 90/10 into training and validation sets.
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size=0.1, random_state=42, stratify=y_train_val)

In [4]:
# Carve a small stratified subset (4.5%) out of the training data; the
# remainder (X_big / y_big) is not used for fitting on the small subset.
X_big, X_small, y_big, y_small = train_test_split(
    X_train,
    y_train,
    test_size=0.045,
    random_state=42,
    stratify=y_train,
)

# Split the small subset 90/10 into its own train / validation parts.
X_small_train, X_small_val, y_small_train, y_small_val = train_test_split(
    X_small,
    y_small,
    test_size=0.1,
    random_state=42,
    stratify=y_small,
)

# Show both shapes, one per line.
print(X_small_train.shape, X_small_val.shape, sep="\n")

(324, 5, 7, 350, 1)
(36, 5, 7, 350, 1)


In [5]:
# 3-D CNN over inputs shaped (5, 7, n_samples, 1), built layer by layer.
model = keras.Sequential()

# Two valid-padded convolutions: 3x3x200 kernels, then 2x2x20 kernels.
model.add(layers.Conv3D(20, (3,3,200), padding='valid', input_shape=(5, 7, n_samples, 1), activation="relu"))
model.add(layers.Conv3D(10, (2,2,20), padding='valid', activation="relu"))

# Pool with a 2x2x50 window (stride 50 along the last pooled axis), then normalise.
model.add(layers.MaxPooling3D((2, 2, 50), strides=(1,1,50)))
model.add(layers.BatchNormalization())

# Classifier head: flatten and map to 3 softmax class probabilities.
model.add(layers.Flatten())
model.add(layers.Dense(3, activation="softmax"))

# Labels are one-hot encoded, hence categorical cross-entropy.
model.compile(
    optimizer='adam',
    loss=keras.losses.CategoricalCrossentropy(),
    metrics=['accuracy'],
)

model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv3d (Conv3D)             (None, 3, 5, 151, 20)     36020     
                                                                 
 conv3d_1 (Conv3D)           (None, 2, 4, 132, 10)     16010     
                                                                 
 max_pooling3d (MaxPooling3D  (None, 1, 3, 2, 10)      0         
 )                                                               
                                                                 
 batch_normalization (BatchN  (None, 1, 3, 2, 10)      40        
 ormalization)                                                   
                                                                 
 flatten (Flatten)           (None, 60)                0         
                                                                 
 dense (Dense)               (None, 3)                 183       

In [7]:
# Train on the small subset only. Alternative (full training split):
# history = model.fit(X_train, y_train, epochs=15, validation_data=(X_val, y_val))
history = model.fit(
    x=X_small_train,
    y=y_small_train,
    validation_data=(X_small_val, y_small_val),
    epochs=10,
)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


In [None]:
# Per-epoch metrics recorded by model.fit, one column per metric.
metrics_df = pd.DataFrame(history.history)

# Accuracy curves (train set and val set)
ax = metrics_df[["accuracy", "val_accuracy"]].plot(title="Accuracy")
ax.set(xlabel="Epochs", ylabel="Accuracy", title="Accuracy")
ax.legend(["Training accuracy", "Validation accuracy"])

# Loss curves (train set and val set)
axe = metrics_df[["loss", "val_loss"]].plot(title="Loss")
axe.set(xlabel="Epochs", ylabel="Loss Value", title="Loss")
axe.legend(["Training loss", "Validation loss"])

# Confusion matrix on the data the model was fit on. Other subsets to inspect:
# print_confusion_matrix(model, X_small_val , y_small_val)
# print_confusion_matrix(model, X_big, y_big)
print_confusion_matrix(model, X_small_train, y_small_train)
# print_confusion_matrix(model, X_big, y_big)

In [None]:
# Confusion matrix on X_big / y_big, the part of the training split that was
# set aside before fitting on the small subset.
print_confusion_matrix(model=model, data=X_big, true_labels_onehot=y_big)