In [None]:
import h5py
import os
import pandas as pd
import numpy as np
%matplotlib inline
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_addons as tfa
from tensorflow import keras
from keras import Sequential, Input
from keras.layers import Dense, Dropout,LSTM,Conv1D,Flatten,MaxPooling1D,UpSampling1D,BatchNormalization,Bidirectional
from sklearn.metrics import accuracy_score, confusion_matrix
from keras.models import Model
from keras import layers
from keras.utils import plot_model
import scipy.stats as stats
from data_augmentation.augmentation import *
from data_augmentation.helper import *
import seaborn as sns
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score, f1_score, recall_score, precision_score


from joblib import Parallel, delayed
from sklearn.cluster import dbscan
import joblib
from tqdm.notebook import tqdm
import keras_tuner

import umap
import umap.plot

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.datasets import make_moons, make_circles, make_classification
from sklearn.neural_network import MLPClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.gaussian_process import GaussianProcessClassifier
from sklearn.gaussian_process.kernels import RBF
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis

## H5 Extraction functions

In [None]:
# NOTE(review): not referenced by name in the visible cells (the cache-building
# cell hard-codes range(32)) — presumably the electrode count of the array; confirm.
N_ELECTRODES = 32
# Length, in samples, of every waveform window sliced out by the extraction helpers.
CUT_OFF = 120
# NOTE(review): unused in the visible cells — presumably a stride for overlapping windows; confirm or remove.
STEP_CUT_OFF = 25
# NOTE(review): unused in the visible cells — presumably the sampling rate in samples/second; confirm.
CYCLE_PER_SEC = 30000

# HDF5 dataset read by get_raw_electrode_data and indexed by electrode number.
raw_stream = "Data/Recording_0/AnalogStream/Stream_1/ChannelData"
# HDF5 path prefix; get_raw_electrode_data appends "_<electrode index>" to it.
electrode_tpl = "Data/Recording_0/SegmentStream/Stream_0/SegmentData"


def find_sublist(sub, bigger):
    """Locate the first occurrence of the list ``sub`` inside the list ``bigger``.

    Returns:
        -1 when ``bigger`` is empty or ``sub`` does not occur in it;
        0 when ``sub`` is empty (and ``bigger`` is not);
        otherwise the index of the element *following* the first element of the
        match, i.e. ``match_start + 1``. Callers rely on this offset.
    """
    if not bigger:
        return -1
    if not sub:
        return 0

    head = sub[0]
    tail = sub[1:]
    tail_len = len(tail)
    after_head = 0
    try:
        while True:
            # list.index raises ValueError once no further candidate exists.
            after_head = bigger.index(head, after_head) + 1
            if tail and not (bigger[after_head:after_head + tail_len] == tail):
                continue
            return after_head
    except ValueError:
        return -1

def get_raw_electrode_data(path: str, electrode_number_start: int, electrode_number_stop: int,label: int) -> tuple:
    """Cut raw-trace windows around every spike segment found in the raw stream.

    For each electrode index in ``[electrode_number_start, electrode_number_stop)``
    the segments stored under ``f"{electrode_tpl}_{index}"`` are searched for in
    row ``index`` of the ``raw_stream`` dataset using ``find_sublist``. Every
    segment that is found contributes one slice of the raw trace.

    Args:
        path: HDF5 recording to read.
        electrode_number_start: first electrode index (inclusive).
        electrode_number_stop: last electrode index (exclusive).
        label: value appended to ``Y`` for every extracted window.

    Returns:
        ``(X, Y)`` where ``X`` is a list of raw-trace slices and ``Y`` is a list
        holding ``label`` once per slice. Slices taken near the end of the trace
        may be shorter than ``CUT_OFF`` because slicing is not bounds-padded.
    """
    X = []
    Y = []
    # Integer half-width: slice bounds must be ints (CUT_OFF/2 would be a float).
    half_window = CUT_OFF // 2

    # The context manager guarantees the HDF5 handle is released.
    with h5py.File(path, mode='r') as f:
        for index in range(electrode_number_start, electrode_number_stop):
            print(f'\nNum electrode : {index}')
            spike_windows = np.array(f[f'{electrode_tpl}_{index}'][()]).T
            dataRaw = f[f'{raw_stream}'][index]
            # Converted once per electrode instead of once per segment.
            raw_as_list = dataRaw.tolist()

            K = len(spike_windows)
            for indx, spke in enumerate(spike_windows):
                tmp = find_sublist(spke.tolist(), raw_as_list)
                if tmp != -1:
                    if tmp - half_window >= 0:
                        # Window centred on the position reported by find_sublist.
                        range_cut_off = dataRaw[tmp - half_window:tmp + half_window]
                    else:
                        # Too close to the start of the trace to centre the window.
                        range_cut_off = dataRaw[tmp:tmp + CUT_OFF]

                    X.append(range_cut_off)
                    Y.append(label)
                # With a single segment the progress ratio would divide by zero.
                if K > 1:
                    print(end="\r|%-80s|" % ("="*int(80*indx/(K-1))))

    return X, Y

def get_raw_data(path: str) -> tuple:
    """Split the raw traces of electrodes 10-12 into labelled, non-overlapping windows.

    For each electrode the spike windows (``SpikeWindow-0.<index>``) are located
    in column 1 of the raw dataset (``Raw-0.<index>``) with ``find_sublist``. The
    raw trace is then cut into consecutive ``CUT_OFF``-sample windows; a window is
    labelled 1 when it contains a located spike position and 0 otherwise.

    Args:
        path: HDF5 recording to read.

    Returns:
        ``(X, Y)``: list of raw windows and the list of matching 0/1 labels.
    """
    X = []
    Y = []

    with h5py.File(path, mode='r') as f:
        for index in range(10,13):
            print(f'\nNum electrode : {index}')
            spike_windows = f[f'SpikeWindow-0.{index}'][()]
            dataRaw = f[f'Raw-0.{index}'][:, 1]
            # Converted once per electrode instead of once per spike window.
            raw_as_list = dataRaw.tolist()
            # A set gives O(1) membership tests in the labelling pass below.
            sp = set()

            K = len(spike_windows)
            for indx, spke in enumerate(spike_windows):
                tmp = find_sublist(spke.tolist(), raw_as_list)
                if tmp != -1:
                    # NOTE(review): the +30 offset presumably targets the spike
                    # peak inside the stored window — confirm.
                    sp.add(tmp + 30)
                # With a single spike window the progress ratio would divide by zero.
                if K > 1:
                    print(end="\r|%-80s|" % ("="*int(80*indx/(K-1))))

            for i in range(0, len(dataRaw)-CUT_OFF, CUT_OFF):
                contains_spike = any(x in sp for x in range(i, i+CUT_OFF))
                Y.append(1 if contains_spike else 0)
                X.append(dataRaw[i:i+CUT_OFF])

    return X, Y

def get_noise_data(path: str,shape:int,arr: np.ndarray) -> list:
    """Collect up to ``shape`` spike-free raw windows from the given electrodes.

    For each electrode in ``arr`` the spike windows (``SpikeWindow-0.<index>``)
    are located in column 1 of ``Raw-0.<index>`` with ``find_sublist``. The raw
    trace is cut into consecutive ``CUT_OFF``-sample windows and only windows
    containing no located spike position are kept.

    Args:
        path: HDF5 recording to read.
        shape: number of noise windows wanted.
        arr: iterable of electrode indices to scan, in order.

    Returns:
        The list of noise windows. It holds exactly ``shape`` windows when enough
        spike-free windows exist, otherwise every window that was found
        (previously the function fell through and returned ``None``).
    """
    X = []

    with h5py.File(path, mode='r') as f:
        for index in arr:
            print(f'\nNum electrode : {index}')
            spike_windows = f[f'SpikeWindow-0.{index}'][()]
            print(len(spike_windows))
            dataRaw = f[f'Raw-0.{index}'][:, 1]
            # Converted once per electrode instead of once per spike window.
            raw_as_list = dataRaw.tolist()
            # A set gives O(1) membership tests in the filtering pass below.
            sp = set()

            K = len(spike_windows)
            for indx, spke in enumerate(spike_windows):
                tmp = find_sublist(spke.tolist(), raw_as_list)
                if tmp != -1:
                    # NOTE(review): the +30 offset presumably targets the spike
                    # peak inside the stored window — confirm.
                    sp.add(tmp + 30)
                # With a single spike window the progress ratio would divide by zero.
                if K > 1:
                    print(end="\r|%-80s|" % ("="*int(80*indx/(K-1))))

            for i in range(0, len(dataRaw)-CUT_OFF, CUT_OFF):
                if len(X) == shape:
                    return X
                if not any(x in sp for x in range(i, i+CUT_OFF)):
                    X.append(dataRaw[i:i+CUT_OFF])

    # Reached when the electrodes ran out before (or exactly as) the quota was met.
    return X



def get_spike_data(path: str, arr: np.ndarray) -> list:
    """Read the stored spike windows of the given electrodes.

    Args:
        path: HDF5 recording to read.
        arr: iterable of electrode indices; dataset ``SpikeWindow-0.<index>`` is
            read for each of them.

    Returns:
        A flat list with the first ``CUT_OFF`` samples of every spike window,
        in electrode order.
    """
    sp = []

    # The context manager guarantees the HDF5 handle is released.
    with h5py.File(path, mode='r') as f:
        for index in arr:
            print(f'\nNum electrode : {index}')
            spike_windows = f[f'SpikeWindow-0.{index}'][()]

            K = len(spike_windows)
            print(K)
            for indx, spke in enumerate(spike_windows):
                sp.append(spke[0:CUT_OFF])
                # Skip the progress bar for a single window (ratio would divide by zero).
                if K != 1:
                    print(end="\r|%-80s|" % ("="*int(80*indx/(K-1))))
    return sp

def show_spike_data(path: str, number_by_fold:int) -> None:
    """Plot the first spike windows of every ``SpikeWindow-0.*`` dataset in a file.

    One figure is created per matching dataset, with ``number_by_fold`` stacked
    subplots; subplot ``i`` shows spike window ``i``. When a dataset holds fewer
    windows than ``number_by_fold`` the remaining subplots stay empty.

    Args:
        path: HDF5 recording to read.
        number_by_fold: number of subplots (and at most that many windows) per figure.
    """
    with h5py.File(path, mode='r') as f:
        for n in f.keys():
            if "SpikeWindow-0." not in n:
                continue
            spike_windows = f[n][()]
            # squeeze=False keeps axs two-dimensional, so indexing also works
            # when number_by_fold == 1 (plt.subplots would return a bare Axes).
            fig, axs = plt.subplots(number_by_fold, squeeze=False)
            fig.set_size_inches(10, 5)

            for i in range(min(number_by_fold, len(spike_windows))):
                axs[i, 0].plot(spike_windows[i])

def show_multiple_file_Spike(directory: str):
    """Print each entry name in ``directory`` and plot its first 5 spike windows per dataset.

    Every directory entry is handed to ``show_spike_data`` as-is.
    """
    entries = os.listdir(directory)
    for filename in entries:
        print(f"{filename}")
        recording_path = os.path.join(directory, filename)
        show_spike_data(recording_path, 5)
                
def get_number_spike_raw_data(path: str) -> int:
    """Count the spike events stored in a recording.

    Sums the first dimension of every top-level dataset whose name contains
    ``"SpikeTimestamp-0"``.

    Args:
        path: HDF5 recording to read.

    Returns:
        Total number of entries across the matching datasets (0 when none match).
    """
    # The context manager guarantees the HDF5 handle is released.
    with h5py.File(path, mode='r') as f:
        return sum(f[n].shape[0] for n in f.keys() if "SpikeTimestamp-0" in n)
    



In [None]:
# Set a flag to True to ignore the CSV cache and re-extract from the HDF5 recordings.
rebuild_spike = False
rebuild_noise = False

# Cache file names embed the window length so different CUT_OFF values do not collide.
spike_csv = "x_spike"+str(CUT_OFF)+".csv"
noise_csv = "x_noise"+str(CUT_OFF)+".csv"
tbi_csv = "x_tbi"+str(CUT_OFF)+".csv"

# --- spike windows -----------------------------------------------------------
spike_cache_usable = os.path.exists(spike_csv) and not rebuild_spike
if not spike_cache_usable:
    spike = get_spike_data('./RAW/2022-12-09T11-44-00_SpikeOnChip_SPOC1_Data.h5',[14,27,29])
    np.savetxt(spike_csv, spike, delimiter=",")
else:
    spike = np.genfromtxt(spike_csv, delimiter=',')

# --- noise windows and post-TBI windows (cached together) ---------------------
noise_cache_usable = os.path.exists(noise_csv) and os.path.exists(tbi_csv) and not rebuild_noise
if not noise_cache_usable:
    # As many noise windows as there are spike windows are requested.
    noise = get_noise_data('./RAW/2022-11-23T16-07-00_SpikeOnChip_SPOC1_Data.h5',len(spike),[1,3,5])

    # NOTE(review): the skipped electrode indices differ per recording and are
    # not explained in the notebook — presumably unusable channels; confirm.
    tbi = [
        get_spike_data('./Post TBI 1/2022-11-23T16-30-00_SpikeOnChip_SPOC1_Data.h5',[i])
        for i in range(32)
        if i not in (6, 7, 25)
    ]
    tbi += [
        get_spike_data('./Post TBI 2/2022-11-23T16-42-00_SpikeOnChip_SPOC1_Data.h5',[i])
        for i in range(32)
        if i not in (7, 14, 26)
    ]

    # One flat list of windows instead of one list per electrode.
    tbi_flat = [item for sublist in tbi for item in sublist]
    np.savetxt(noise_csv, noise, delimiter=",")
    np.savetxt(tbi_csv, tbi_flat, delimiter=",")
else:
    noise = np.genfromtxt(noise_csv, delimiter=',')
    tbi_flat = np.genfromtxt(tbi_csv, delimiter=',')

## Show some example spikes

In [None]:
# Draw the first four spike windows on a 2x2 grid, filled row by row.
fig, axs = plt.subplots(2, 2)
for ax, row in zip(axs.flat, spike):
    fig.set_size_inches(20, 5)
    ax.plot(row)
fig.show()

## Show noise sample

In [None]:
# Draw the first four noise windows on a 2x2 grid, filled row by row.
fig, axs = plt.subplots(2, 2)
for ax, row in zip(axs.flat, noise):
    fig.set_size_inches(20, 5)
    ax.plot(row)
fig.show()

In [None]:
def build_long_waves_df(waves, labels):
    """Reshape a 2-D array of waveforms into a long-format DataFrame.

    Args:
        waves: 2-D array, one waveform per row.
        labels: value (or per-row sequence) stored in the ``label`` column.

    Returns:
        DataFrame with columns ``label``, ``timepoint`` (integer sample index)
        and ``value`` — one row per (waveform, sample) pair.
    """
    n_points = waves.shape[1]
    column_names = ["time{}".format(t) for t in range(n_points)]

    wide = pd.DataFrame(waves, columns=column_names)
    wide['label'] = labels

    long_df = wide.melt(id_vars=['label'], value_vars=None, var_name='timepoint')
    # Strip the "time" prefix so the column holds the integer sample index.
    long_df['timepoint'] = long_df['timepoint'].apply(lambda name: int(name[4:]))
    return long_df

# Mean spike waveform with a standard-deviation band.
# NOTE(review): `ci` is deprecated in recent seaborn releases in favour of
# `errorbar` — confirm against the installed version.
spike_waves = np.array(spike)
spikes_df_long = build_long_waves_df(spike_waves, 'spike')
sns.lineplot(data=spikes_df_long, x='timepoint', y='value', hue='label', ci='sd', legend=False)

## Prepare dataset

In [None]:
# Stack the three sources row-wise: spikes first, then noise, then post-TBI windows.
# The row index is not reset, so index labels repeat across the three sources.
source_frames = [pd.DataFrame(windows) for windows in (spike, noise, tbi_flat)]
df = pd.concat(source_frames, axis=0)

# Spikes form the positive class (1); noise and post-TBI windows are both labelled 0.
positive_labels = np.ones(len(spike))
negative_labels = np.zeros(len(noise) + len(tbi_flat))
y = np.append(positive_labels, negative_labels)

In [None]:
# Spike-only dataset: every row is a spike, so every label is 1.
df_spike = pd.DataFrame(spike)
y_spike = np.ones(len(df_spike))

In [None]:
from sklearn import model_selection as ms

# Same 80/20 split and seed for the full dataset and for the spike-only dataset.
split_options = dict(test_size=0.20, random_state=1)

x_train, x_test, y_train, y_test = ms.train_test_split(df, y, **split_options)
x_train_spike, x_test_spike, y_train_spike, y_test_spike = ms.train_test_split(
    df_spike, y_spike, **split_options)

# Report the shapes of every split as a quick sanity check.
split_reports = (
    ("---------------- Dataset ------------------",
     (x_train, x_test, y_train, y_test)),
    ("------------- Dataset Spike ---------------",
     (x_train_spike, x_test_spike, y_train_spike, y_test_spike)),
)
for banner, parts in split_reports:
    print(banner)
    print(*(part.shape for part in parts))

## Define model

In [None]:
# Load the saved denoising autoencoder; the following cells build the encoder from it.
# Fail here with an explicit message instead of leaving `autoencoder` undefined
# (which would surface later as a NameError).
if not os.path.exists("denoising_Dense_1.h5"):
    raise FileNotFoundError(
        "denoising_Dense_1.h5 not found: the saved denoising autoencoder is required to build the encoder."
    )
autoencoder = tf.keras.models.load_model('denoising_Dense_1.h5')

In [None]:
# The encoder is the autoencoder truncated at its 11th layer counted from the end.
# NOTE(review): the -11 offset is tied to the architecture of the saved model —
# presumably the bottleneck layer; confirm if the autoencoder is retrained.
output_layer = autoencoder.layers[-11].output
encoder = Model(inputs=autoencoder.input, outputs=output_layer)

encoder.summary()

In [None]:
# Encode both splits (train first, then test) with the truncated autoencoder.
X_train_encode, X_test_encode = (encoder.predict(split) for split in (x_train, x_test))

## Classifier

In [None]:


# Baseline comparison: fit each off-the-shelf classifier on the encoded features
# and report accuracy / recall / precision / F1 plus a confusion matrix.
names = [
    "Nearest Neighbors",
    "Linear SVM",
    "RBF SVM",
    "Decision Tree",
    "Random Forest",
    "Neural Net",
    "AdaBoost",
    "Naive Bayes",
    "QDA",
]

# Must stay in the same order as `names`.
# NOTE(review): several estimators are built without random_state, so scores can
# vary between runs.
classifiers = [
    KNeighborsClassifier(3),
    SVC(kernel="linear", C=0.025),
    SVC(gamma=2, C=1),
    DecisionTreeClassifier(max_depth=5),
    RandomForestClassifier(max_depth=5, n_estimators=10, max_features=1),
    MLPClassifier(alpha=1, max_iter=1000),
    AdaBoostClassifier(),
    GaussianNB(),
    QuadraticDiscriminantAnalysis(),
]

# Not used in the visible cells; kept in case later cells rely on them.
cm = plt.cm.RdBu
cm_bright = ListedColormap(["#FF0000", "#0000FF"])

for name, clf in zip(names, classifiers):
    # Standardise the encoded features before every classifier.
    clf = make_pipeline(StandardScaler(), clf)
    clf.fit(X_train_encode, y_train)
    y_pred = clf.predict(X_test_encode)

    # Built-in round() works for NumPy scalars and plain Python floats alike;
    # `.round(4)` fails when a metric returns a built-in float.
    acc = round(accuracy_score(y_test, y_pred), 4)
    rec = round(recall_score(y_test, y_pred), 4)
    prec = round(precision_score(y_test, y_pred), 4)
    f1 = round(f1_score(y_test, y_pred), 4)
    # One LaTeX-table style row per classifier.
    print(f'{name} & {acc} & {rec} & {prec} & {f1}')

    conf_matrix = confusion_matrix(y_true=y_test, y_pred=y_pred)
    fig, ax = plt.subplots(figsize=(5, 5))
    ax.matshow(conf_matrix, cmap=plt.cm.Oranges, alpha=0.3)
    # Write each count in the centre of its cell.
    for i in range(conf_matrix.shape[0]):
        for j in range(conf_matrix.shape[1]):
            ax.text(x=j, y=i,s=conf_matrix[i, j], va='center', ha='center', size='xx-large')

    plt.xlabel('Predictions', fontsize=18)
    plt.ylabel('Actuals', fontsize=18)
    plt.title('Confusion Matrix', fontsize=18)
    plt.show()
    


In [None]:
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV, RepeatedStratifiedKFold

# Hyper-parameter search for k-nearest neighbours on the encoded training features.
# NOTE(review): the search runs on unscaled features while the final evaluation
# wraps the classifier in a StandardScaler pipeline — confirm this is intended.
model = KNeighborsClassifier()
grid = {
    'n_neighbors': [3, 5, 11, 19, 21],
    'weights': ['uniform', 'distance'],
    'metric': ['euclidean', 'manhattan'],
}

# 10-fold stratified CV repeated 3 times, scored on accuracy, on all cores.
cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
grid_search = GridSearchCV(estimator=model, param_grid=grid, n_jobs=-1, cv=cv, scoring='accuracy')
grid_result = grid_search.fit(X_train_encode, y_train)

# Best configuration first, then every evaluated configuration.
print("Best: %f using %s" % (grid_result.best_score_, grid_result.best_params_))
cv_results = grid_result.cv_results_
means = cv_results['mean_test_score']
stds = cv_results['std_test_score']
params = cv_results['params']
for mean, stdev, param in zip(means, stds, params):
    print("%f (%f) with: %r" % (mean, stdev, param))

In [None]:
# Fit k-NN with fixed hyper-parameters (presumably those picked by the grid
# search — confirm) and score it on the held-out encoded split.
clf = make_pipeline(StandardScaler(), KNeighborsClassifier(metric='manhattan',n_neighbors=5,weights='uniform'))
clf.fit(X_train_encode, y_train)
y_pred = clf.predict(X_test_encode)

# Built-in round() works for NumPy scalars and plain Python floats alike;
# `.round(4)` fails when a metric returns a built-in float.
print(f'Nearest Neighbors & {round(accuracy_score(y_test, y_pred), 4)} & {round(recall_score(y_test, y_pred), 4)} & {round(precision_score(y_test, y_pred), 4)} & {round(f1_score(y_test, y_pred), 4)}')

conf_matrix = confusion_matrix(y_true=y_test, y_pred=y_pred)
fig, ax = plt.subplots(figsize=(5, 5))
ax.matshow(conf_matrix, cmap=plt.cm.Oranges, alpha=0.3)
# Write each count in the centre of its cell.
for i in range(conf_matrix.shape[0]):
    for j in range(conf_matrix.shape[1]):
        ax.text(x=j, y=i,s=conf_matrix[i, j], va='center', ha='center', size='xx-large')

plt.xlabel('Predictions', fontsize=18)
plt.ylabel('Actuals', fontsize=18)
plt.title('Confusion Matrix', fontsize=18)
plt.show()
    

In [None]:
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV, RepeatedStratifiedKFold

# Hyper-parameter search for a linear-kernel SVM on the encoded training features.
# NOTE(review): `gamma` is not used by the linear kernel, so the five gamma values
# repeat identical fits — consider dropping it or adding non-linear kernels.
# NOTE(review): the search runs on unscaled features while the final evaluation
# wraps the classifier in a StandardScaler pipeline — confirm this is intended.
model = SVC()
grid = {
    'C': [0.1, 1, 10, 100, 1000],
    'gamma': [1, 0.1, 0.01, 0.001, 0.0001],
    'kernel': ['linear'],
}

# 10-fold stratified CV repeated 3 times, scored on accuracy, on all cores.
cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
grid_search = GridSearchCV(estimator=model, param_grid=grid, n_jobs=-1, cv=cv, scoring='accuracy')
grid_result = grid_search.fit(X_train_encode, y_train)

# Best configuration first, then every evaluated configuration.
print("Best: %f using %s" % (grid_result.best_score_, grid_result.best_params_))
cv_results = grid_result.cv_results_
means = cv_results['mean_test_score']
stds = cv_results['std_test_score']
params = cv_results['params']
for mean, stdev, param in zip(means, stds, params):
    print("%f (%f) with: %r" % (mean, stdev, param))

In [None]:
# Fit the linear SVM with fixed hyper-parameters (presumably those picked by the
# grid search — confirm) and score it on the held-out encoded split.
clf = make_pipeline(StandardScaler(), SVC(kernel='linear',C=1,gamma=1))
clf.fit(X_train_encode, y_train)
y_pred = clf.predict(X_test_encode)

# Built-in round() works for NumPy scalars and plain Python floats alike;
# `.round(4)` fails when a metric returns a built-in float.
print(f'Linear SVM & {round(accuracy_score(y_test, y_pred), 4)} & {round(recall_score(y_test, y_pred), 4)} & {round(precision_score(y_test, y_pred), 4)} & {round(f1_score(y_test, y_pred), 4)}')

conf_matrix = confusion_matrix(y_true=y_test, y_pred=y_pred)
fig, ax = plt.subplots(figsize=(5, 5))
ax.matshow(conf_matrix, cmap=plt.cm.Oranges, alpha=0.3)
# Write each count in the centre of its cell.
for i in range(conf_matrix.shape[0]):
    for j in range(conf_matrix.shape[1]):
        ax.text(x=j, y=i,s=conf_matrix[i, j], va='center', ha='center', size='xx-large')

plt.xlabel('Predictions', fontsize=18)
plt.ylabel('Actuals', fontsize=18)
plt.title('Confusion Matrix', fontsize=18)
plt.show()
    

In [None]:
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV, RepeatedStratifiedKFold

# Hyper-parameter search for the scikit-learn MLP on the encoded training features.
# NOTE(review): the search runs on unscaled features while the final evaluation
# wraps the classifier in a StandardScaler pipeline — confirm this is intended.
model = MLPClassifier(max_iter=1000)
grid = dict(
    hidden_layer_sizes=[(10,30,10),(20,),(10,),(100,)],
    activation=['tanh', 'relu'],
    solver=['sgd', 'adam'],
    alpha=[0.0001, 0.05,1],
    learning_rate=['constant','adaptive'],
)

# 10-fold stratified CV repeated 3 times, scored on accuracy, on all cores.
cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
grid_search = GridSearchCV(estimator=model, param_grid=grid, n_jobs=-1, cv=cv, scoring='accuracy')
grid_result = grid_search.fit(X_train_encode, y_train)

# Best configuration first, then every evaluated configuration.
print("Best: %f using %s" % (grid_result.best_score_, grid_result.best_params_))
cv_results = grid_result.cv_results_
means = cv_results['mean_test_score']
stds = cv_results['std_test_score']
params = cv_results['params']
for mean, stdev, param in zip(means, stds, params):
    print("%f (%f) with: %r" % (mean, stdev, param))

In [None]:
# Fit the MLP with fixed hyper-parameters (presumably those picked by the grid
# search — confirm) and score it on the held-out encoded split.
clf = make_pipeline(StandardScaler(), MLPClassifier(max_iter=1000,activation='tanh',alpha=0.0001,hidden_layer_sizes=(100,),learning_rate='adaptive',solver='adam'))
clf.fit(X_train_encode, y_train)
y_pred = clf.predict(X_test_encode)

# Built-in round() works for NumPy scalars and plain Python floats alike;
# `.round(4)` fails when a metric returns a built-in float.
print(f'Neural Net & {round(accuracy_score(y_test, y_pred), 4)} & {round(recall_score(y_test, y_pred), 4)} & {round(precision_score(y_test, y_pred), 4)} & {round(f1_score(y_test, y_pred), 4)}')

conf_matrix = confusion_matrix(y_true=y_test, y_pred=y_pred)
fig, ax = plt.subplots(figsize=(5, 5))
ax.matshow(conf_matrix, cmap=plt.cm.Oranges, alpha=0.3)
# Write each count in the centre of its cell.
for i in range(conf_matrix.shape[0]):
    for j in range(conf_matrix.shape[1]):
        ax.text(x=j, y=i,s=conf_matrix[i, j], va='center', ha='center', size='xx-large')

plt.xlabel('Predictions', fontsize=18)
plt.ylabel('Actuals', fontsize=18)
plt.title('Confusion Matrix', fontsize=18)
plt.show()

In [None]:
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV, RepeatedStratifiedKFold

# Hyper-parameter search for AdaBoost on the encoded training features.
# NOTE(review): the search runs on unscaled features while the final evaluation
# wraps the classifier in a StandardScaler pipeline — confirm this is intended.
model = AdaBoostClassifier()
grid = {
    'n_estimators': [10, 50, 100, 500],
    'learning_rate': [0.0001, 0.001, 0.01, 0.1, 1.0],
}

# 10-fold stratified CV repeated 3 times, scored on accuracy, on all cores.
cv = RepeatedStratifiedKFold(n_splits=10, n_repeats=3, random_state=1)
grid_search = GridSearchCV(estimator=model, param_grid=grid, n_jobs=-1, cv=cv, scoring='accuracy')
grid_result = grid_search.fit(X_train_encode, y_train)

# Best configuration first, then every evaluated configuration.
print("Best: %f using %s" % (grid_result.best_score_, grid_result.best_params_))
cv_results = grid_result.cv_results_
means = cv_results['mean_test_score']
stds = cv_results['std_test_score']
params = cv_results['params']
for mean, stdev, param in zip(means, stds, params):
    print("%f (%f) with: %r" % (mean, stdev, param))

In [None]:
# Fit AdaBoost with fixed hyper-parameters (presumably those picked by the grid
# search — confirm) and score it on the held-out encoded split.
clf = make_pipeline(StandardScaler(), AdaBoostClassifier(learning_rate=1.0,n_estimators=500))
clf.fit(X_train_encode, y_train)
y_pred = clf.predict(X_test_encode)

# Built-in round() works for NumPy scalars and plain Python floats alike;
# `.round(4)` fails when a metric returns a built-in float.
print(f'AdaBoost & {round(accuracy_score(y_test, y_pred), 4)} & {round(recall_score(y_test, y_pred), 4)} & {round(precision_score(y_test, y_pred), 4)} & {round(f1_score(y_test, y_pred), 4)}')

conf_matrix = confusion_matrix(y_true=y_test, y_pred=y_pred)
fig, ax = plt.subplots(figsize=(5, 5))
ax.matshow(conf_matrix, cmap=plt.cm.Oranges, alpha=0.3)
# Write each count in the centre of its cell.
for i in range(conf_matrix.shape[0]):
    for j in range(conf_matrix.shape[1]):
        ax.text(x=j, y=i,s=conf_matrix[i, j], va='center', ha='center', size='xx-large')

plt.xlabel('Predictions', fontsize=18)
plt.ylabel('Actuals', fontsize=18)
plt.title('Confusion Matrix', fontsize=18)
plt.show()

## Dense classifier

In [None]:
def build_model_dense(hp):
    """Build a tunable fully connected binary classifier for keras_tuner.

    The input width is taken from ``X_train_encode``; four ReLU Dense layers with
    tunable widths are followed by a single sigmoid output unit.

    Args:
        hp: keras_tuner hyper-parameter container used to sample layer widths.

    Returns:
        The compiled Keras model.
    """
    n_inputs = X_train_encode.shape[1]

    model = keras.Sequential()
    model.add(Input(shape=(n_inputs,)))
    model.add(Dense(units=hp.Int("units0", min_value=240, max_value=960, step=120),
      activation='relu'))
    model.add(Dense(units=hp.Int("units1", min_value=360, max_value=960, step=120),
      activation='relu'))
    model.add(Dense(units=hp.Int("units2", min_value=200, max_value=960, step=100),
      activation='relu'))
    model.add(Dense(units=hp.Int("units3", min_value=60, max_value=240, step=60),
      activation='relu'))
    model.add(keras.layers.Dense(1, activation='sigmoid'))
    # NOTE(review): the CNN and RNN builders use binary_crossentropy with Adam;
    # mse with SGD here looks inconsistent for a sigmoid classifier — confirm.
    model.compile(optimizer='SGD',loss='mse')
    # summary() prints by itself and returns None; wrapping it in print() added a
    # stray "None" line to the output.
    model.summary()
    return model

In [None]:
# Either tune + train the dense classifier, or load a previously saved one.
# NOTE(review): nothing in the visible cells writes DENSE_Classifier.h5 after
# training — confirm where the saved model comes from.
train = False
# Training history of the final fit; stays None when a saved model is loaded so
# that later cells can tell there is nothing to plot.
H = None
if train:
    tuner = keras_tuner.RandomSearch(
        build_model_dense,
        objective='val_loss',
        max_trials=5,
        project_name="Classifier_Dense")
    tuner.search(X_train_encode, y_train, epochs=10, callbacks=[
            keras.callbacks.EarlyStopping(monitor="val_loss", patience=5, mode="min"),
            keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=0.0001),
        ], validation_split=0.15)
    bestHP = tuner.get_best_hyperparameters(num_trials=1)[0]
    # build the best model and train it
    print("[INFO] training the best model...")
    model = tuner.hypermodel.build(bestHP)
    H = model.fit(x=X_train_encode, y=y_train,
        validation_split=0.15,
        epochs=50, callbacks=[
            keras.callbacks.EarlyStopping(monitor="val_loss", patience=5, mode="min"),
            keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=0.0001),
        ], verbose=1)
elif os.path.exists('DENSE_Classifier.h5'):
    model = tf.keras.models.load_model('DENSE_Classifier.h5')
else:
    # Without this, `model` would silently keep whatever an earlier cell assigned.
    raise FileNotFoundError(
        "DENSE_Classifier.h5 not found: set train = True to train the dense classifier."
    )


In [None]:

# Score the dense classifier on the held-out encoded windows.
print("[INFO] evaluating network...")
raw_scores = model.predict(x=X_test_encode, batch_size=32)
# Sigmoid outputs rounded to hard 0/1 predictions.
predictions = raw_scores.round()
print(classification_report(y_test,predictions))

# Confusion matrix with the count written in the centre of each cell.
conf_matrix = confusion_matrix(y_true=y_test, y_pred=predictions)
fig, ax = plt.subplots(figsize=(5, 5))
ax.matshow(conf_matrix, cmap=plt.cm.Oranges, alpha=0.3)
for i, row_counts in enumerate(conf_matrix):
    for j, count in enumerate(row_counts):
        ax.text(x=j, y=i, s=count, va='center', ha='center', size='xx-large')

plt.xlabel('Predictions', fontsize=18)
plt.ylabel('Actuals', fontsize=18)
plt.title('Confusion Matrix', fontsize=18)
plt.show()

In [None]:
# Loss curves of the final fit. `H` only exists when the model was trained in
# this session; when a saved model was loaded there is no history to plot.
# The curve labelled 'test' is the validation loss (validation_split of the fit).
training_history = globals().get('H')
if training_history is None:
    print("No training history available (model was not trained in this session); skipping loss curves.")
else:
    plt.plot(training_history.history['loss'], label='train')
    plt.plot(training_history.history['val_loss'], label='test')
    plt.legend()
    plt.show()

In [None]:
# LaTeX-table style row: accuracy & recall & precision & F1.
# Built-in round() works for NumPy scalars and plain Python floats alike;
# `.round(4)` fails when a metric returns a built-in float.
print(f'Custom dense classifier & {round(accuracy_score(y_test, predictions), 4)} & {round(recall_score(y_test, predictions), 4)} & {round(precision_score(y_test, predictions), 4)} & {round(f1_score(y_test, predictions), 4)}')

In [None]:
# Plot up to four misclassified test windows: at most two missed spikes and at
# most two noise windows predicted as spikes, in test-set order.
n_plotted = 0
n_missed_spikes = 0
n_false_spikes = 0

fig, axs = plt.subplots(4)
# `position` is the row's position in x_test, matching predictions / y_test.
for position, (_, row) in enumerate(x_test.iterrows()):
    fig.set_size_inches(15, 20)
    if n_plotted == 4:
        break
    predicted = predictions[position]
    actual = y_test[position]
    if predicted == 0 and actual == 1 and n_missed_spikes < 2:
        axs[n_plotted].plot(row)
        axs[n_plotted].set_title("Predicted Noise | Actuals Spike")
        n_plotted += 1
        n_missed_spikes += 1
    elif predicted == 1 and actual == 0 and n_false_spikes < 2:
        axs[n_plotted].plot(row)
        axs[n_plotted].set_title("Predicted Spike | Actuals Noise")
        n_plotted += 1
        n_false_spikes += 1
fig.show()

## CNN Classifier

In [None]:
def build_model_cnn(hp):
    """Build a tunable 1-D convolutional binary classifier for keras_tuner.

    The input length is taken from ``X_train_encode``; three Conv1D + MaxPooling1D
    stages with tunable filter counts and kernel sizes feed a 50-unit Dense layer
    and a single sigmoid output unit.

    Args:
        hp: keras_tuner hyper-parameter container used to sample the choices.

    Returns:
        The compiled Keras model.
    """
    n_inputs = X_train_encode.shape[1]

    model = keras.Sequential()
    # Each encoded vector is treated as a length-n_inputs sequence with one channel.
    model.add(Input(shape=(n_inputs,1)))
    model.add(Conv1D(filters = hp.Choice('cnn1', [32,64,128,256]), kernel_size = hp.Choice('kernel1', [3,5,7,9]), activation = 'relu', padding = 'same'))
    model.add(MaxPooling1D(pool_size=2))
    model.add(Conv1D(filters = hp.Choice('cnn2', [32,64,128,256]), kernel_size = hp.Choice('kernel2', [3,5,7,9]), activation = 'relu', padding = 'same'))
    model.add(MaxPooling1D(pool_size=2))
    model.add(Conv1D(filters = hp.Choice('cnn3', [32,64,128,256]), kernel_size = hp.Choice('kernel3', [3,5,7,9]), activation = 'relu', padding = 'same'))
    model.add(MaxPooling1D(pool_size=2))
    model.add(Flatten())
    model.add(Dense(units=50, activation='relu'))
    model.add(Dense(units=1, activation='sigmoid'))

    # Compile the model with Adam optimizer
    model.compile(loss='binary_crossentropy',
                metrics=['accuracy'],
                optimizer='Adam')
    # summary() prints by itself and returns None; wrapping it in print() added a
    # stray "None" line to the output.
    model.summary()
    return model


In [None]:
# Either tune + train the CNN classifier, or load a previously saved one.
# NOTE(review): nothing in the visible cells writes CNN_Classifier.h5 after
# training — confirm where the saved model comes from.
train = False
# Training history of the final fit; stays None when a saved model is loaded so
# that later cells can tell there is nothing to plot.
H = None
if train:
    tuner = keras_tuner.RandomSearch(
        build_model_cnn,
        objective='val_loss',
        max_trials=5,
        project_name="Classifier_CNN")
    tuner.search(X_train_encode, y_train, epochs=10, callbacks=[
            keras.callbacks.EarlyStopping(monitor="val_loss", patience=5, mode="min"),
            keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=0.0001),
        ], validation_split=0.15)
    bestHP = tuner.get_best_hyperparameters(num_trials=1)[0]
    # build the best model and train it
    print("[INFO] training the best model...")
    model = tuner.hypermodel.build(bestHP)
    H = model.fit(x=X_train_encode, y=y_train,
        validation_split=0.15,
        epochs=50, callbacks=[
            keras.callbacks.EarlyStopping(monitor="val_loss", patience=7, mode="min"),
            keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=0.0001),
        ], verbose=1)
elif os.path.exists('CNN_Classifier.h5'):
    model = tf.keras.models.load_model('CNN_Classifier.h5')
else:
    # Without this, `model` would silently keep the previous section's classifier.
    raise FileNotFoundError(
        "CNN_Classifier.h5 not found: set train = True to train the CNN classifier."
    )

In [None]:
# Score the CNN classifier on the held-out encoded windows.
print("[INFO] evaluating network...")
raw_scores = model.predict(x=X_test_encode, batch_size=32)
# Sigmoid outputs rounded to hard 0/1 predictions.
predictions = raw_scores.round()
print(classification_report(y_test,predictions))

# Confusion matrix with the count written in the centre of each cell.
conf_matrix = confusion_matrix(y_true=y_test, y_pred=predictions)
fig, ax = plt.subplots(figsize=(5, 5))
ax.matshow(conf_matrix, cmap=plt.cm.Oranges, alpha=0.3)
for i, row_counts in enumerate(conf_matrix):
    for j, count in enumerate(row_counts):
        ax.text(x=j, y=i, s=count, va='center', ha='center', size='xx-large')

plt.xlabel('Predictions', fontsize=18)
plt.ylabel('Actuals', fontsize=18)
plt.title('Confusion Matrix', fontsize=18)
plt.show()

In [None]:
# Loss curves of the final fit. `H` only exists when the model was trained in
# this session; when a saved model was loaded there is no history to plot.
# The curve labelled 'test' is the validation loss (validation_split of the fit).
training_history = globals().get('H')
if training_history is None:
    print("No training history available (model was not trained in this session); skipping loss curves.")
else:
    plt.plot(training_history.history['loss'], label='train')
    plt.plot(training_history.history['val_loss'], label='test')
    plt.legend()
    plt.show()

In [None]:
# LaTeX-table style row: accuracy & recall & precision & F1.
# Built-in round() works for NumPy scalars and plain Python floats alike;
# `.round(4)` fails when a metric returns a built-in float.
print(f'Custom CNN classifier & {round(accuracy_score(y_test, predictions), 4)} & {round(recall_score(y_test, predictions), 4)} & {round(precision_score(y_test, predictions), 4)} & {round(f1_score(y_test, predictions), 4)}')

In [None]:
# Plot up to four misclassified test windows: at most two missed spikes and at
# most two noise windows predicted as spikes, in test-set order.
n_plotted = 0
n_missed_spikes = 0
n_false_spikes = 0

fig, axs = plt.subplots(4)
# `position` is the row's position in x_test, matching predictions / y_test.
for position, (_, row) in enumerate(x_test.iterrows()):
    fig.set_size_inches(15, 20)
    if n_plotted == 4:
        break
    predicted = predictions[position]
    actual = y_test[position]
    if predicted == 0 and actual == 1 and n_missed_spikes < 2:
        axs[n_plotted].plot(row)
        axs[n_plotted].set_title("Predicted Noise | Actuals Spike")
        n_plotted += 1
        n_missed_spikes += 1
    elif predicted == 1 and actual == 0 and n_false_spikes < 2:
        axs[n_plotted].plot(row)
        axs[n_plotted].set_title("Predicted Spike | Actuals Noise")
        n_plotted += 1
        n_false_spikes += 1
fig.show()

## RNN Classifier

In [None]:
def build_model_rnn(hp):
    """Build a tunable stacked bidirectional-LSTM binary classifier for keras_tuner.

    The input length is taken from ``X_train_encode``; three Bidirectional LSTM
    layers with tunable widths feed a single sigmoid output unit.

    Args:
        hp: keras_tuner hyper-parameter container used to sample layer widths.

    Returns:
        The compiled Keras model, producing one probability per input sample.
    """
    n_inputs = X_train_encode.shape[1]

    model = keras.Sequential()
    # Each encoded vector is treated as a length-n_inputs sequence with one feature.
    model.add(Input(shape=(n_inputs,1)))
    model.add(Bidirectional(LSTM(hp.Int("units1", min_value=128, max_value=512, step=128), return_sequences=True)))
    model.add(Bidirectional(LSTM(hp.Int("units2", min_value=64, max_value=256, step=64), return_sequences=True)))
    # The last recurrent layer must return only its final state: with
    # return_sequences=True the Dense head emitted one value per timestep,
    # which does not match the per-sample labels used for training and scoring.
    model.add(Bidirectional(LSTM(hp.Int("units3", min_value=32, max_value=128, step=32), return_sequences=False)))
    model.add(Dense(1, activation='sigmoid'))

    # Compile the model with Adam optimizer
    model.compile(loss='binary_crossentropy',
                metrics=['accuracy'],
                optimizer='Adam')
    # summary() prints by itself and returns None; wrapping it in print() added a
    # stray "None" line to the output.
    model.summary()
    return model


In [None]:
# Either tune + train the RNN classifier, or load a previously saved one.
# NOTE(review): nothing in the visible cells writes RNN_Classifier.h5 after
# training — confirm where the saved model comes from.
train = False
# Training history of the final fit; stays None when a saved model is loaded so
# that later cells can tell there is nothing to plot.
H = None
if train:
    tuner = keras_tuner.RandomSearch(
        build_model_rnn,
        objective='val_loss',
        max_trials=5,
        project_name="Classifier_RNN")
    tuner.search(X_train_encode, y_train, epochs=10, callbacks=[
            keras.callbacks.EarlyStopping(monitor="val_loss", patience=5, mode="min"),
            keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=0.0001),
        ], validation_split=0.15)
    bestHP = tuner.get_best_hyperparameters(num_trials=1)[0]

    # build the best model and train it
    print("[INFO] training the best model...")
    model = tuner.hypermodel.build(bestHP)
    H = model.fit(x=X_train_encode, y=y_train,
        validation_split=0.15,
        epochs=20, callbacks=[
            keras.callbacks.EarlyStopping(monitor="val_loss", patience=5, mode="min"),
            keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=0.0001),
        ], verbose=1)
elif os.path.exists('RNN_Classifier.h5'):
    model = tf.keras.models.load_model('RNN_Classifier.h5')
else:
    # Without this, `model` would silently keep the previous section's classifier.
    raise FileNotFoundError(
        "RNN_Classifier.h5 not found: set train = True to train the RNN classifier."
    )

In [None]:

# Score the RNN classifier on the held-out encoded windows.
print("[INFO] evaluating network...")
raw_scores = model.predict(x=X_test_encode, batch_size=32)
# Sigmoid outputs rounded to hard 0/1 predictions.
predictions = raw_scores.round()
print(classification_report(y_test,predictions))

# Confusion matrix with the count written in the centre of each cell.
conf_matrix = confusion_matrix(y_true=y_test, y_pred=predictions)
fig, ax = plt.subplots(figsize=(5, 5))
ax.matshow(conf_matrix, cmap=plt.cm.Oranges, alpha=0.3)
for i, row_counts in enumerate(conf_matrix):
    for j, count in enumerate(row_counts):
        ax.text(x=j, y=i, s=count, va='center', ha='center', size='xx-large')

plt.xlabel('Predictions', fontsize=18)
plt.ylabel('Actuals', fontsize=18)
plt.title('Confusion Matrix', fontsize=18)
plt.show()

In [None]:
# Loss curves of the final fit. `H` only exists when the model was trained in
# this session; when a saved model was loaded there is no history to plot.
# The curve labelled 'test' is the validation loss (validation_split of the fit).
training_history = globals().get('H')
if training_history is None:
    print("No training history available (model was not trained in this session); skipping loss curves.")
else:
    plt.plot(training_history.history['loss'], label='train')
    plt.plot(training_history.history['val_loss'], label='test')
    plt.legend()
    plt.show()

In [None]:
# LaTeX-table style row: accuracy & recall & precision & F1.
# Built-in round() works for NumPy scalars and plain Python floats alike;
# `.round(4)` fails when a metric returns a built-in float.
print(f'Custom RNN classifier & {round(accuracy_score(y_test, predictions), 4)} & {round(recall_score(y_test, predictions), 4)} & {round(precision_score(y_test, predictions), 4)} & {round(f1_score(y_test, predictions), 4)}')

In [None]:
# Plot up to four misclassified test windows: at most two missed spikes and at
# most two noise windows predicted as spikes, in test-set order.
n_plotted = 0
n_missed_spikes = 0
n_false_spikes = 0

fig, axs = plt.subplots(4)
# `position` is the row's position in x_test, matching predictions / y_test.
for position, (_, row) in enumerate(x_test.iterrows()):
    fig.set_size_inches(15, 20)
    if n_plotted == 4:
        break
    predicted = predictions[position]
    actual = y_test[position]
    if predicted == 0 and actual == 1 and n_missed_spikes < 2:
        axs[n_plotted].plot(row)
        axs[n_plotted].set_title("Predicted Noise | Actuals Spike")
        n_plotted += 1
        n_missed_spikes += 1
    elif predicted == 1 and actual == 0 and n_false_spikes < 2:
        axs[n_plotted].plot(row)
        axs[n_plotted].set_title("Predicted Spike | Actuals Noise")
        n_plotted += 1
        n_false_spikes += 1
fig.show()