*This is the open source code of paper: Toward reliable signals decoding for electroencephalogram: A benchmark study to EEGNeX.*<br>
*@Date : 2022-05-29 18:31:45*<br>
*@Author : Xia CHEN (xia.chen@iek.uni-hannover.de), Xiangbin Teng, Han Chen, Yafeng Pan, Philipp Geyer*<br>
*@Link : http://arxiv.org/abs/2207.12369*<br>
*@Ver : v01*<br>
*For using the code or data, please cite:*<br>
*- Chen, X., Teng, X., Chen, H., Pan, Y., & Geyer, P.. (2022). Toward reliable signals decoding for electroencephalogram: A benchmark study to EEGNeX.*

In [None]:
import numpy as np
from numpy import mean, std, dstack
import pandas as pd
from pandas import read_csv

from matplotlib import pyplot

import tensorflow
import torch
import os, gc
from time import time

from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix

from keras.models import Sequential
from keras.layers import Dense, Flatten, Dropout, LSTM
from tensorflow.keras.utils import to_categorical

# Selfbuilded functions
from BenchmarkModels import *

In [None]:
'''
This notebook loads dataset in YourDataName.npy data format and run benchmarkmodels:
- x_{}.npy (Input, format as #Samples, Channels, Lengths) 
- y_{}.npy (Label, num)

'''

DATA_LIST = ['YourDataName']
channel_last=False

In [None]:
# load a dataset group, such as train or test
def load_dataset_group(data):
    print('Loading data', data)
    X = np.load('x_{}.npy'.format(data))
    y = np.load('y_{}.npy'.format(data))
    # load input data
    
    return X, y

# load the dataset, returns train and test X and y elements
def load_dataset(prefix='', channel_last=False):
    # X_SHAPE:  samples, channels, lengths
    X, y = load_dataset_group(DATA)
    # One hot encode y
    y = to_categorical(y)
    # TRAIN - 0.75. VALIDATION - 0.125, TEST - 0.125
    trainX, testX, trainy, testy = train_test_split(X, y, test_size=0.25)
    valX, testX, valy, testy = train_test_split(testX, testy, test_size=0.5)
    
    print(trainX.shape, trainy.shape, valX.shape, valy.shape, testX.shape, testy.shape)
    return trainX, trainy, valX, valy, testX, testy


# summarize scores
def summarize_results(scores):
    print(scores)
    m, s = mean(scores), std(scores)
    print('Accuracy: %.3f%% (+/-%.3f)' % (m, s))
    return  m, s 


# Model traning/evaluating
def model_evaluation(trainX, trainy, valX, valy, testX, testy, Model_name, Data_name, channel_last=False, plot_model=False):
    
    # Callbacks
    callback_es = tensorflow.keras.callbacks.EarlyStopping(monitor='val_loss', patience=20)
    callback_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5)
    
    checkpointDir = './model_weights/'
    os.makedirs(checkpointDir, exist_ok=True)
    checkpointPath = os.path.join(checkpointDir, "{}_best_{}_weights.h5".format(Data_name, Model_name))
    checkpointer = ModelCheckpoint(filepath=checkpointPath, verbose=1, save_best_only=True)

    # Params
    verbose, epochs, batch_size = 0, 200, 128
    n_timesteps, n_features, n_outputs = trainX.shape[2], trainX.shape[1], trainy.shape[1]

    trainX = trainX.reshape((trainX.shape[0], 1, n_features, n_timesteps))
    valX = valX.reshape((valX.shape[0], 1, n_features, n_timesteps))
    testX = testX.reshape((testX.shape[0], 1, n_features, n_timesteps))

    ### Model structure        
    if(Model_name == 'Single_LSTM'):
        model = Single_LSTM(n_timesteps, n_features, n_outputs)

    if(Model_name == 'Single_GRU'):
        model = Single_GRU(n_timesteps, n_features, n_outputs)
        
    if(Model_name == 'OneD_CNN'):
        model = OneD_CNN(n_timesteps, n_features, n_outputs)
        
    if(Model_name == 'OneD_CNN_Dilated'):
        model = OneD_CNN_Dilated(n_timesteps, n_features, n_outputs)
        
    if(Model_name == 'OneD_CNN_Causal'):
        model = OneD_CNN_Causal(n_timesteps, n_features, n_outputs)   
        
    if(Model_name == 'OneD_CNN_CausalDilated'):
        model = OneD_CNN_CausalDilated(n_timesteps, n_features, n_outputs)   
        
    if(Model_name == 'TwoD_CNN'):
        model = TwoD_CNN(n_timesteps, n_features, n_outputs)
        
    if(Model_name == 'TwoD_CNN_Separable'):
        model = TwoD_CNN_Separable(n_timesteps, n_features, n_outputs)
        
    if(Model_name == 'TwoD_CNN_Dilated'):
        model = TwoD_CNN_Dilated(n_timesteps, n_features, n_outputs)
        
    if(Model_name == 'TwoD_CNN_Depthwise'):
        model = TwoD_CNN_Depthwise(n_timesteps, n_features, n_outputs)
        
    ###  
    if(Model_name == 'Single_ConvLSTM2D'):
        # reshape into subsequences (samples, time steps, rows, cols, channels)
        trainX = trainX.reshape((trainX.shape[0], 1, 1, n_features, n_timesteps))
        valX = valX.reshape((valX.shape[0], 1, 1, n_features, n_timesteps))
        testX = testX.reshape((testX.shape[0], 1, 1, n_features, n_timesteps))
    
        model = Single_ConvLSTM2D(n_timesteps, n_features, n_outputs)
    
    if(Model_name == 'CNN_LSTM'):
        model = CNN_LSTM(n_timesteps, n_features, n_outputs)
        
    if(Model_name == 'CNN_GRU'):
        model = CNN_GRU(n_timesteps, n_features, n_outputs)
        
    ###

    if(Model_name == 'EEGNet_8_2'):
        model = EEGNet_8_2(n_timesteps, n_features, n_outputs)
        
    if(Model_name == 'test1'):
        model = test1(n_timesteps, n_features, n_outputs) 
    if(Model_name == 'test2'):
        model = test2(n_timesteps, n_features, n_outputs) 
    if(Model_name == 'test3'):
        model = test3(n_timesteps, n_features, n_outputs) 
    if(Model_name == 'test4'):
        model = test4(n_timesteps, n_features, n_outputs) 
        
    if(Model_name == 'EEGNeX_8_32'):
        model = EEGNeX_8_32(n_timesteps, n_features, n_outputs) 

    
    if(plot_model==True):
        print('The current running model is:', Model_name)
        print(model.summary())
    
    
    # Fit / Evaluate model
    # Fit network callback_es, callback_lr
    model.fit(trainX, trainy, epochs=epochs, batch_size=batch_size, verbose=verbose, 
              validation_data=(valX, valy), callbacks=[callback_es, callback_lr])

    # Load optimal weights
    # model.load_weights(checkpointPath)
    # Evaluate model
    _, accuracy = model.evaluate(testX, testy, batch_size=batch_size, verbose=0)
    pred_i = model.predict(testX, batch_size=batch_size)
    
    return accuracy, pred_i

In [None]:
for DATA in DATA_LIST:
    # Log
    path = 'ResultLog_{}.csv'.format(DATA)
    now = datetime.now()
    with open(path,'a', newline='') as f:
        csv_write = csv.writer(f)
        # Write a row of date dd/mm/YY H:M:S
        dt_string = now.strftime("%d/%m/%Y %H:%M:%S")
        csv_write.writerow([dt_string])


    Model_list = [
                  'Single_LSTM', 'Single_GRU',
                  'OneD_CNN', 'OneD_CNN_Dilated', 
                  'OneD_CNN_Causal', 'OneD_CNN_CausalDilated',
                  'TwoD_CNN' ,'TwoD_CNN_Dilated', 
                  'TwoD_CNN_Separable','TwoD_CNN_Depthwise', 
                  'CNN_LSTM', 'CNN_GRU', 
                  'Single_ConvLSTM2D',
                  'EEGNet_8_2',
                  'EEGNeX_8_32',
                  ]
    
    
    # Clear cache
    gc.collect()
    torch.cuda.empty_cache()
    print('='*20)        
    # Load data
    trainX, trainy, valX, valy, testX, testy = load_dataset(channel_last=channel_last)

    
    for Model_name in Model_list:
        
        scores = list()
        pred = list()
        true = list()
        run_time = list()
        
        # Repeat experiment
        for r in range(5):
            if(r == 0):
                score, pred_i = model_evaluation(trainX, trainy, valX, valy, testX, testy, Model_name, DATA, plot_model=True)
            else:
                score, pred_i = model_evaluation(trainX, trainy, valX, valy, testX, testy, Model_name, DATA)
            score = score * 100.0
            print('>#%d: %.3f' % (r+1, score))
            scores.append(score)
            pred.append(pred_i)
            true.append(testy)
        # Summarize results
        mean_, std_ = summarize_results(scores)
        print("Classification Report:\n", classification_report(np.argmax(testy, axis=1), np.argmax(pred_i, axis=1), digits=3))
        
        with open(path,'a', newline='') as f:
            csv_write = csv.writer(f)
            # Write data
            csv_head = [Model_name, mean_, std_]
            csv_write.writerow(csv_head)