Notebook based off of https://arxiv.org/pdf/1801.05412.pdf , refer to this for parameters.

# Import of necesary librairies

In [2]:
import pandas as pd
import glob
import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from keras.models import Sequential, Model, load_model
from tensorflow.keras.layers import Input
from keras.layers import Dense, Dropout
from keras.layers import Embedding, Activation, Flatten
from keras.layers import Conv1D, GlobalAveragePooling1D, MaxPooling1D, BatchNormalization
from keras.utils import to_categorical
from keras import optimizers
from sklearn.model_selection import StratifiedKFold
from sklearn.preprocessing import LabelEncoder

import time
from IPython.display import Image
from IPython.core.display import HTML
from keras.callbacks import ModelCheckpoint


# Utilitaries functions

In [3]:
!unzip data.zip

Archive:  data.zip
   creating: data/bonn_uni_datasets/
   creating: data/bonn_uni_datasets/A/
  inflating: data/bonn_uni_datasets/A/Z001.txt  
  inflating: data/bonn_uni_datasets/A/Z002.txt  
  inflating: data/bonn_uni_datasets/A/Z003.txt  
  inflating: data/bonn_uni_datasets/A/Z004.txt  
  inflating: data/bonn_uni_datasets/A/Z005.txt  
  inflating: data/bonn_uni_datasets/A/Z006.txt  
  inflating: data/bonn_uni_datasets/A/Z007.txt  
  inflating: data/bonn_uni_datasets/A/Z008.txt  
  inflating: data/bonn_uni_datasets/A/Z009.txt  
  inflating: data/bonn_uni_datasets/A/Z010.txt  
  inflating: data/bonn_uni_datasets/A/Z011.txt  
  inflating: data/bonn_uni_datasets/A/Z012.txt  
  inflating: data/bonn_uni_datasets/A/Z013.txt  
  inflating: data/bonn_uni_datasets/A/Z014.txt  
  inflating: data/bonn_uni_datasets/A/Z015.txt  
  inflating: data/bonn_uni_datasets/A/Z016.txt  
  inflating: data/bonn_uni_datasets/A/Z017.txt  
  inflating: data/bonn_uni_datasets/A/Z018.txt  
  inflating: data/bonn_

In [4]:
def folder_to_df(letter): #import the .txt files
    full_path ="data/bonn_uni_datasets/"+ letter + "/*.*"
    files = files = glob.glob(full_path)
    df_list = []
    for file in files:
        df_list.append(pd.read_csv(file, header = None))
    big_df = pd.concat(df_list, ignore_index=True, axis= 1)
    return big_df.T

def norm(X): # zero mean and unit variance normalization
    X = X - np.mean(X)
    X = X / np.std(X)
    return X

def window(a, w = 512, o = 64, copy = False): #window sliding function
    #default for training, for testing data we will split each signal in four of 1024 and apply
    #a window size of 512 with a stride (o) of 256
    sh = (a.size - w + 1, w)
    st = a.strides * 2
    view = np.lib.stride_tricks.as_strided(a, strides = st, shape = sh)[0::o]
    if copy:
        return view.copy()
    else:
        return view

def enrich_train(df): #enrich data by splicing the 4097-long signals
    #into 512 long ones with a stride of 64
    labels = df.iloc[:,-1]
    data = df.iloc[:, :-1]
    res = list()
    for i in range(len(data)):
        res += [window(data.iloc[i].values)]
    return res

def reshape_x(arr): #shape the input data into the correct form (x1,x2,1)
    nrows = arr.shape[0]
    ncols = arr.shape[1]
    return arr.reshape(nrows, ncols, 1)


# Load data into dataframes

In [5]:
def load_data_as_df():
    A = norm(folder_to_df('A'))
    B = norm(folder_to_df('B'))
    C = norm(folder_to_df('C'))
    D = norm(folder_to_df('D'))
    E = norm(folder_to_df('E'))

    normal = A.append(B).reset_index(drop = True)
    interictal = C.append(D).reset_index(drop = True)
    ictal = E

    return normal, interictal, ictal


# Split into 90%/10%, keeping the 10% for the testing of the majority voting later

In [6]:
normal, interictal, ictal = load_data_as_df()

  return mean(axis=axis, dtype=dtype, out=out, **kwargs)
  return mean(axis=axis, dtype=dtype, out=out, **kwargs)
  return mean(axis=axis, dtype=dtype, out=out, **kwargs)
  return mean(axis=axis, dtype=dtype, out=out, **kwargs)
  return mean(axis=axis, dtype=dtype, out=out, **kwargs)
  normal = A.append(B).reset_index(drop = True)
  interictal = C.append(D).reset_index(drop = True)


In [7]:
normal_train, normal_vote = train_test_split(normal, test_size = 0.1)
interictal_train, interictal_vote = train_test_split(interictal, test_size = 0.1)
ictal_train, ictal_vote = train_test_split(ictal, test_size = 0.1)

# Enriching the data as per Scheme 1 in the paper

### window sliding with a stride of 64 and length of 512, as well as adding labels and format into the correct shape for the model

In [8]:
def format_enrich_train(normal, interictal, ictal):

    #enrich data and reshape it to have a two dimensional array instead of three
    normal_train_enr = np.asarray(enrich_train(normal)).reshape(-1, np.asarray(enrich_train(normal)).shape[-1])
    interictal_train_enr = np.asarray(enrich_train(interictal)).reshape(-1, np.asarray(enrich_train(interictal)).shape[-1])
    ictal_train_enr = np.asarray(enrich_train(ictal)).reshape(-1, np.asarray(enrich_train(ictal)).shape[-1])

    #change into a dataframe to add labels easily
    normal_train_enr_df = pd.DataFrame(normal_train_enr)
    interictal_train_enr_df = pd.DataFrame(interictal_train_enr)
    ictal_train_enr_df = pd.DataFrame(ictal_train_enr)

    normal_train_enr_df['labels'] = 0 # normal
    interictal_train_enr_df['labels'] = 1 #interictal
    ictal_train_enr_df['labels'] = 2 #ictal

    #concat all
    data_labels = pd.concat([normal_train_enr_df, interictal_train_enr_df, ictal_train_enr_df], ignore_index = True)


    #separates data and labels into numpy arrays for keras
    data = data_labels.drop('labels', axis = 1).values
    labels = data_labels.labels.values

    #labels = np.expand_dims(labels, axis=1)

    return data, labels

# The model, as per :
![Schema of the model](images/model_schema.png)


### Parameters taken in the paper

In [9]:
def create_model():
    model = Sequential()
    #Conv - 1
    model.add(Conv1D(24, 5,strides =  3, input_shape=(512,1)))
    model.add(BatchNormalization())
    model.add(Activation('relu'))

    #Conv - 2
    model.add(Conv1D(16, 3,strides =  2))
    model.add(BatchNormalization())
    model.add(Activation('relu'))

    #Conv - 3
    model.add(Conv1D(8, 3,strides =  2))
    model.add(BatchNormalization())
    model.add(Activation('relu'))

    #FC -1
    model.add(Flatten())
    model.add(Dense(20))
    model.add(Activation('relu'))
    #Dropout
    model.add(Dropout(0.5))
    #FC -2
    model.add(Dense(3,activation = 'softmax'))
    #softmax
    #model.add(Activation('softmax'))

    adam = tf.keras.optimizers.legacy.Adam(lr=0.00002, beta_1=0.9, beta_2=0.999, epsilon=0.00000001, decay=0.0, amsgrad=False)

    model.compile(loss='categorical_crossentropy',
                  optimizer=adam,
                  metrics=['accuracy'])
    return model

# Training function as well as the stratified 10 fold cross validation for testing

In [10]:
def train_evaluate_model(model, xtrain, ytrain, xval, yval, fold):
    model_name = 'P-1D-CNN'
    metric = 'accuracy'
    checkpointer = ModelCheckpoint(filepath='checkpoints/'+'fold'+ str(fold)+'.'+model_name + '.{epoch:03d}-{accuracy:.3f}.h5',verbose=0, monitor=metric, save_best_only=True)
    history = model.fit(xtrain, ytrain, batch_size=32, callbacks = [checkpointer],epochs=200, verbose = 1)
    print(history)
    score = model.evaluate(xval, yval, batch_size=32)
    print('\n')
    print(score)
    return score, history

In [None]:
n_folds = 10
X, y = format_enrich_train(normal, interictal, ictal)
#initialize 10 fold validation
skf = StratifiedKFold(n_splits=10, shuffle=True)


#10 fold cross validation loop
for i, (train, test) in enumerate(skf.split(X,y)):
    print("Running Fold", i+1, "/", n_folds)
    start_time = time.time()
    X = reshape_x(X)
    xtrain, xval = X[train], X[test]
    ytrain, yval = y[train], y[test]
    ytrain = to_categorical(ytrain, num_classes=3, dtype='float32')
    yval = to_categorical(yval, num_classes=3, dtype='float32')


    model = None # Clearing the NN.
    model = create_model()
    score, history = train_evaluate_model(model, xtrain, ytrain, xval, yval, i+1)
    print("Ran ", i+1, "/", n_folds, "Fold in %s seconds ---" % (time.time() - start_time))

Running Fold 1 / 10


  super().__init__(name, **kwargs)


Epoch 1/200
Epoch 2/200
 13/802 [..............................] - ETA: 6s - loss: 1.1367 - accuracy: 0.3918

  saving_api.save_model(


Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200
Epoch 34/200
Epoch 35/200
Epoch 36/200
Epoch 37/200
Epoch 38/200
Epoch 39/200
Epoch 40/200
Epoch 41/200
Epoch 42/200
Epoch 43/200
Epoch 44/200
Epoch 45/200
Epoch 46/200
Epoch 47/200
Epoch 48/200
Epoch 49/200
Epoch 50/200
Epoch 51/200
Epoch 52/200
Epoch 53/200
Epoch 54/200
Epoch 55/200
Epoch 56/200
Epoch 57/200
Epoch 58/200
Epoch 59/200
Epoch 60/200
Epoch 61/200
Epoch 62/200
Epoch 63/200
Epoch 64/200
Epoch 65/200
Epoch 66/200
Epoch 67/200
Epoch 68/200
Epoch 69/200
Epoch 70/200
Epoch 71/200
Epoch 72/200
Epoch 73/200
Epoch 74/200
Epoch 75/200
Epoch 76/200
Epoch 77/200
Epoch 78/200
Epoch 79/200
Epoch 

# Evaluation

In [11]:
best_model = load_model('fold7.P-1D-CNN.186-0.968.h5')

Additional necessary funtions

In [12]:
def split_vote(df):
    res = list()
    for i in range(len(df)):
        res += [window(df.iloc[i].values,w= 512, o = 256)]
    return np.asarray(res)

def count_votes(my_list):
    freq = {}
    for i in my_list:
        if (i in freq):
            freq[i] += 1
        else:
            freq[i] = 1
    return freq

def reshape_signal(signal):
    signal = np.expand_dims(signal, axis=1)
    signal = np.expand_dims(signal, axis=0)
    return np.asarray(signal)

def evaluate_subsignals(subsignals,model):
    vote_list = np.array([])
    for i in range(len(subsignals)):
        mini_signal = reshape_signal(subsignals[i])
        probabilities = model.predict(mini_signal)
        ynew = np.argmax(probabilities, axis=-1)
        # ynew = model.predict_classes(mini_signal)
        vote_list = np.append(vote_list, ynew)
    decision = count_votes(vote_list)
    return decision_to_str(decision), vote_list

def decision_to_str(dec):
    res = list()
    for key,val in dec.items():
        if key == 0:
            res += ['normal: ' + str(val) + ' votes' + '\n']
        if key == 1:
            res += ['ictal: ' + str(val) + ' votes' + '\n']
        if key == 2:
            res += ['interictal: ' + str(val) + ' votes' + '\n']
    return res

Exctracting 1st normal signal for testing

In [13]:
big_signal = split_vote(ictal_vote)
subsignals = big_signal[0]

It is divided into 15 subsignals of length 512, the model will "vote" on each subsignal and decide by majority

In [14]:
decision, vote_list = evaluate_subsignals(subsignals,best_model)
print(vote_list)
print(decision[0])

[2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2. 2.]
interictal: 15 votes

