Notebook based off of https://arxiv.org/pdf/1801.05412.pdf , refer to this for parameters.

# Import of necesary librairies

In [None]:
import pandas as pd
import glob
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from keras.models import Sequential, Model, Input, load_model
from keras.layers import Input, Dense, Dropout
from keras.layers import Embedding, Activation, Flatten
from keras.layers import Conv1D, GlobalAveragePooling1D, MaxPooling1D, BatchNormalization, LSTM
from keras.utils import to_categorical
from keras import optimizers
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import LabelEncoder
from keras.utils import np_utils
import time
from IPython.display import Image
from IPython.core.display import HTML
from keras.callbacks import ModelCheckpoint

# Utilitaries functions

In [None]:
def folder_to_df(letter): #import the .txt files
    full_path ="data/bonn_uni_datasets/"+ letter + "/*.*"
    files = files = glob.glob(full_path)
    df_list = []
    for file in files:
        df_list.append(pd.read_csv(file, header = None))
    big_df = pd.concat(df_list, ignore_index=True, axis= 1)#垂直方向进行拼接
    return big_df.T #每一列是一个20s序列

def norm(X): # zero mean and unit variance normalization
    X = X - np.mean(X)
    X = X / np.std(X)
    return X

def window(a, w = 512, o = 64, copy = False): #window sliding function
    #default for training, for testing data we will split each signal in four of 1024 and apply
    #a window size of 512 with a stride (o) of 256
    sh = (a.size - w + 1, w)
    st = a.strides * 2
    view = np.lib.stride_tricks.as_strided(a, strides = st, shape = sh)[0::o]
    if copy:
        return view.copy()
    else:
        return view

def enrich_train(df): #enrich data by splicing the 4097-long signals 
    #into 512 long ones with a stride of 64
    labels = df.iloc[:,-1]
    data = df.iloc[:, :-1]
    res = list()
    for i in range(len(data)):
        res += [window(data.iloc[i].values)]
    return res

def reshape_x(arr): #shape the input data into the correct form (x1,x2,1)
    nrows = arr.shape[0]
    ncols = arr.shape[1]
    return arr.reshape(nrows, ncols, 1)


# Load data into dataframes

In [10]:
def load_data_as_df():
    A = norm(folder_to_df('A'))
    B = norm(folder_to_df('B'))
    C = norm(folder_to_df('C'))
    D = norm(folder_to_df('D'))
    E = norm(folder_to_df('E'))
    
    normal = A.append(B).reset_index(drop = True)
    interictal = C.append(D).reset_index(drop = True)
    ictal = E

    return normal, interictal, ictal


# Split into 90%/10%, keeping the 10% for the testing of the majority voting later

In [4]:
normal, interictal, ictal = load_data_as_df()

In [5]:
normal_train, normal_vote = train_test_split(normal, test_size = 0.1)
interictal_train, interictal_vote = train_test_split(interictal, test_size = 0.1)
ictal_train, ictal_vote = train_test_split(ictal, test_size = 0.1)
print(normal_train.shape)

(180, 4097)


# Enriching the data as per Scheme 1 in the paper

### window sliding with a stride of 64 and length of 512, as well as adding labels and format into the correct shape for the model

In [6]:
def format_enrich_train(normal, interictal, ictal):
    
    #enrich data and reshape it to have a two dimensional array instead of three
    normal_train_enr = np.asarray(enrich_train(normal)).reshape(-1, np.asarray(enrich_train(normal)).shape[-1])
    interictal_train_enr = np.asarray(enrich_train(interictal)).reshape(-1, np.asarray(enrich_train(interictal)).shape[-1])
    ictal_train_enr = np.asarray(enrich_train(ictal)).reshape(-1, np.asarray(enrich_train(ictal)).shape[-1])

    #change into a dataframe to add labels easily
    normal_train_enr_df = pd.DataFrame(normal_train_enr)
    interictal_train_enr_df = pd.DataFrame(interictal_train_enr)
    ictal_train_enr_df = pd.DataFrame(ictal_train_enr)
    
    normal_train_enr_df['labels'] = 0 # normal
    interictal_train_enr_df['labels'] = 1 #interictal
    ictal_train_enr_df['labels'] = 2 #ictal

    #concat all
    data_labels = pd.concat([normal_train_enr_df, interictal_train_enr_df, ictal_train_enr_df], ignore_index = True)
    

    #separates data and labels into numpy arrays for keras
    data = data_labels.drop('labels', axis = 1).values
    labels = data_labels.labels.values
    
    #labels = np.expand_dims(labels, axis=1)
    
    return data, labels

# The model, as per :
![Schema of the model](images/model_schema.png)


### Parameters taken in the paper

In [11]:
def create_model(,inception=True, res=True, strided=True, maxpool=False, avgpool=False, batchnorm=True):
# #     X = Sequential()
# #     X_input = Input(input_shape)
# #     X = ZeroPadding2D((3, 3))(X_input)
# #     X_shortcut = X
#     model = Sequential()
#     #Conv - 1
#     model.add(Conv1D(24, 5,strides =  3, input_shape=(512,1)))
#     model.add(BatchNormalization())
#     model.add(Activation('relu'))
# #     X = Conv1D(24, 5,strides =  3, input_shape=(512,1))
# #     X = BatchNormalization(X)
# #     X = Activation('relu')(X)
#     #Conv - 2
#     model.add(Conv1D(16, 3,strides =  2))
#     model.add(BatchNormalization())
#     model.add(Activation('relu'))
# #     X = Conv1D(16, 3,strides =  3, input_shape=(512,1))
# #     X = BatchNormalization(X)
# #     X = Activation('relu')(X)
#     #Conv - 3
#     model.add(Conv1D(8, 3,strides =  2))
#     model.add(BatchNormalization())
#     model.add(Activation('relu'))
# #     X = Conv1D(8, 3,strides =  3, input_shape=(512,1))
# #     X = BatchNormalization(X)
# #     X = Activation('relu')(X)
#     model.add(Conv1D(4, 3,strides =  2))
#     model.add(BatchNormalization())
#     model.add(Activation('relu'))
#     #FC -1
#     model.add(Flatten())
#     model.add(Dense(20))
#     model.add(Activation('relu'))
# #     X = Flatten(X)
# #     X = Dense(20)(X)
# #     X = Activation('relu')(X)
#     #Dropout
#     model.add(Dropout(0.5))
# #     X = Dropout(0.5)(X)
#     #FC -2
#     model.add(Dense(units=50, activation='relu',
#                 use_bias=True, kernel_initializer='glorot_uniform', bias_initializer='zeros',))
#     model.add(Dense(3,activation = 'softmax'))
# #     X = Dense(3,activation = 'softmax')(X)
#     #softmax
# #     model.add(Activation('softmax'))
        self.i = 0
        pad = 'same'
        padp = 'same'

        c_act = config['c_act']
        r_act = config['r_act']
        rk_act = config['rk_act']

        r = kr.regularizers.l2(config['reg'])

        c = self.input
        stride_size = config['strides'] if strided else 1

        if inception:
            c0 = layers.Conv1D(config['filters'], kernel_size=4, strides=stride_size, padding=pad,
                               activation=c_act)(c)
            c1 = layers.Conv1D(config['filters'], kernel_size=8, strides=stride_size, padding=pad,
                               activation=c_act)(c)
            c2 = layers.Conv1D(config['filters'], kernel_size=32, strides=stride_size, padding=pad,
                               activation=c_act)(c)

            c = layers.concatenate([c0, c1, c2])

            if maxpool:
                c = layers.MaxPooling1D(2, padding=padp)(c)
            elif avgpool:
                c = layers.AveragePooling1D(2, padding=padp)(c) 
            if batchnorm:
                c = layers.BatchNormalization()(c)
            c = layers.SpatialDropout1D(self.config['cnn_drop'])(c)



            c0 = layers.Conv1D(config['filters'], kernel_size=4, strides=stride_size, padding=pad,
                               activation=c_act)(c)
            c1 = layers.Conv1D(config['filters'], kernel_size=8, strides=stride_size, padding=pad,
                               activation=c_act)(c)
            c2 = layers.Conv1D(config['filters'], kernel_size=32, strides=stride_size, padding=pad,
                               activation=c_act)(c)

            c = layers.concatenate([c0, c1, c2])
            if maxpool:
                c = layers.MaxPooling1D(2, padding=padp)(c)
            elif avgpool:
                c = layers.AveragePooling1D(2, padding=padp)(c) 
            if batchnorm:
                c = layers.BatchNormalization()(c)
            c = layers.SpatialDropout1D(self.config['cnn_drop'])(c)

            c0 = layers.Conv1D(config['filters'], kernel_size=4, strides=stride_size, padding=pad,
                               activation=c_act)(c)
            c1 = layers.Conv1D(config['filters'], kernel_size=8, strides=stride_size, padding=pad,
                               activation=c_act)(c)
            c2 = layers.Conv1D(config['filters'], kernel_size=32, strides=stride_size, padding=pad,
                               activation=c_act)(c)

            c = layers.concatenate([c0, c1, c2])
            if maxpool:
                c = layers.MaxPooling1D(2, padding=padp)(c)
            elif avgpool:
                c = layers.AveragePooling1D(2, padding=padp)(c) 
            if batchnorm:
                c = layers.BatchNormalization()(c)
            c = layers.SpatialDropout1D(config['cnn_drop'])(c)

        else:   # No inception Modules
            c = layers.Conv1D(config['filters'], kernel_size=4, strides=stride_size, padding=pad, activation=c_act)(input)
            if maxpool:
                c = layers.MaxPooling1D(2, padding=padp)(c)
            elif avgpool:
                c = layers.AveragePooling1D(2, padding=padp)(c) 
            if batchnorm:
                c = layers.BatchNormalization()(c)
            c = layers.SpatialDropout1D(config['cnn_drop'])(c)

            c = layers.Conv1D(config['filters'], kernel_size=4, strides=stride_size, padding=pad, activation=c_act)(c)
            if maxpool:
                c = layers.MaxPooling1D(2, padding=padp)(c)
            elif avgpool:
                c = layers.AveragePooling1D(2, padding=padp)(c) 
            if batchnorm:
                c = layers.BatchNormalization()(c)
            c = layers.SpatialDropout1D(config['cnn_drop'])(c)

            c = layers.Conv1D(config['filters'], kernel_size=4, strides=stride_size, padding=pad, activation=c_act)(c)
            if maxpool:
                c = layers.MaxPooling1D(2, padding=padp)(c)
            elif avgpool:
                c = layers.AveragePooling1D(2, padding=padp)(c) 
            if batchnorm:
                c = layers.BatchNormalization()(c)
            c = layers.SpatialDropout1D(self.config['cnn_drop'])(c)

        if res: # Residual RNN
            g1 = layers.GRU(config['state_size'], return_sequences=True, activation=rk_act, recurrent_activation=r_act,
                            dropout=config['rec_drop'], recurrent_dropout=config['rec_drop'],
                            recurrent_regularizer=r, kernel_regularizer=r)(c)
            g2 = layers.GRU(config['state_size'], return_sequences=True,  activation=rk_act, recurrent_activation=r_act,
                            dropout=config['rec_drop'], recurrent_dropout=config['rec_drop'],
                            recurrent_regularizer=r, kernel_regularizer=r)(g1)
            g_concat1 = layers.concatenate([g1, g2])

            g3 = layers.GRU(self.config['state_size'], return_sequences=True, activation=rk_act, recurrent_activation=r_act,
                            dropout=self.config['rec_drop'], recurrent_dropout=self.config['rec_drop'],
                            recurrent_regularizer=r, kernel_regularizer=r)(g_concat1)
            g_concat2 = layers.concatenate([g1, g2, g3])

            g = layers.GRU(self.config['state_size'], return_sequences=False, activation=rk_act, recurrent_activation=r_act,
                           dropout=self.config['rec_drop'], recurrent_dropout=self.config['rec_drop'],
                            recurrent_regularizer=r, kernel_regularizer=r)(g_concat2)

        else:   # No Residual RNN
            g = layers.GRU(self.config['state_size'], return_sequences=True, activation=rk_act, recurrent_activation=r_act,
                           dropout=self.config['rec_drop'], recurrent_dropout=self.config['rec_drop'],
                            recurrent_regularizer=r, kernel_regularizer=r)(c)

            g = layers.GRU(self.config['state_size'], return_sequences=True, activation=rk_act, recurrent_activation=r_act,
                           dropout=self.config['rec_drop'], recurrent_dropout=self.config['rec_drop'],
                            recurrent_regularizer=r, kernel_regularizer=r)(g)
            g = layers.GRU(self.config['state_size'], return_sequences=True, activation=rk_act, recurrent_activation=r_act,
                           dropout=self.config['rec_drop'], recurrent_dropout=self.config['rec_drop'],
                            recurrent_regularizer=r, kernel_regularizer=r)(g)

            g = layers.GRU(self.config['state_size'], return_sequences=False, activation=rk_act, recurrent_activation=r_act,
                           dropout=self.config['rec_drop'], recurrent_dropout=self.config['rec_drop'],
                            recurrent_regularizer=r, kernel_regularizer=r)(g)


        d = layers.Dense(self.config['output_size'])(g)
        out = layers.Softmax()(d)

        self.model = Model(self.input, out)
        print("{} initialized.".format(self.model.name))


#     adam = optimizers.Adam(lr=0.00002, beta_1=0.9, beta_2=0.999, epsilon=0.00000001, decay=0.0, amsgrad=False)

#     model.compile(loss='categorical_crossentropy',
#                   optimizer=adam,
#                   metrics=['accuracy'])
#     return model

# Training function as well as the stratified 10 fold cross validation for testing

In [12]:
def train_evaluate_model(model, xtrain, ytrain, xval, yval, fold):
    model_name = 'P-1D-CNN'
    checkpointer = ModelCheckpoint(filepath='checkpoints/'+'fold'+ str(fold)+'.'+model_name + '.{epoch:03d}-{accuracy:.3f}.h5',verbose=0,monitor ='acc', save_best_only=True)
    history = model.fit(xtrain, ytrain, batch_size=32, callbacks = [checkpointer],epochs=100, verbose = 1)
    print(history)
    score = model.evaluate(xval, yval, batch_size=32)
    print('\n')
    print(score)
    return score, history

In [13]:
n_folds = 2
X, y = format_enrich_train(normal_train, interictal_train, ictal_train)
#initialize 10 fold validation
skf = StratifiedKFold(n_splits=2, shuffle=True)


#10 fold cross validation loop
for i, (train, test) in enumerate(skf.split(X,y)):
    print("Running Fold", i+1, "/", n_folds)
    start_time = time.time()
    X = reshape_x(X)
    xtrain, xval = X[train], X[test]
    ytrain, yval = y[train], y[test]
    ytrain = to_categorical(ytrain, num_classes=3, dtype='float32')
    yval = to_categorical(yval, num_classes=3, dtype='float32')


    model = None # Clearing the NN.
    model = create_model(inception=True, res=True, strided=True, maxpool=False, avgpool=False, batchnorm=True)
    score, history = train_evaluate_model(model, xtrain, ytrain, xval, yval, i+1)
    print("Ran ", i+1, "/", n_folds, "Fold in %s seconds ---" % (time.time() - start_time))

Running Fold 1 / 2


TypeError: create_model() missing 1 required positional argument: 'self'

# Evaluation

In [None]:
best_model = load_model('best_model.0.966.h5')

Additional necessary funtions

In [None]:
def split_vote(df):
    res = list()
    for i in range(len(df)):
        res += [window(df.iloc[i].values,w= 512, o = 64)]
    return np.asarray(res)

def count_votes(my_list): 
    freq = {} 
    for i in my_list: 
        if (i in freq): 
            freq[i] += 1
        else: 
            freq[i] = 1
    return freq

def reshape_signal(signal):
    signal = np.expand_dims(signal, axis=1)
    signal = np.expand_dims(signal, axis=0)
    return np.asarray(signal)

def evaluate_subsignals(subsignals,model):
    vote_list = np.array([])
    for i in range(len(subsignals)):
        mini_signal = reshape_signal(subsignals[i])
        ynew = model.predict_classes(mini_signal)
        vote_list = np.append(vote_list, ynew)
    decision = count_votes(vote_list)
    return decision_to_str(decision), vote_list

def decision_to_str(dec):
    res = list()
    for key,val in dec.items():
        if key == 0:
            res += ['normal: ' + str(val) + ' votes' + '\n']
        if key == 1:
            res += ['ictal: ' + str(val) + ' votes' + '\n']
        if key == 2:
            res += ['interictal: ' + str(val) + ' votes' + '\n']
    return res

Exctracting 1st normal signal for testing

In [None]:
big_signal = split_vote(ictal_vote)
subsignals = big_signal

It is divided into 15 subsignals of length 512, the model will "vote" on each subsignal and decide by majority

In [None]:
for subsignal in subsignals:
    decision, vote_list = evaluate_subsignals(subsignal,best_model)
    print(vote_list)
    print(decision[0])