In [None]:
import os
import numpy as np
import tensorflow as tf
import keras
import nengo_dl
from tensorflow.python.keras import Input, Model
import nengo
from tensorflow.python.keras.callbacks import EarlyStopping
from tensorflow.python.keras.layers import Conv2D, Dropout, AveragePooling2D, Flatten, Dense, BatchNormalization, Conv3D
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
from keras import backend as K
import pandas as pd
from sklearn import metrics

In [None]:
# Paths of the per-participant sample archives, P01.npz through P18.npz
dataset_path = os.path.join('dataset_result')
files = [
    os.path.join(dataset_path, f'P{participant:02d}.npz')
    for participant in range(1, 19)
]

In [None]:
# Fix the random seeds of TensorFlow and NumPy so that repeated runs give consistent results;
# the same value is also handed to the Dropout layers of the model
seed = 2
tf.random.set_seed(seed)
np.random.seed(seed)

In [None]:
# Function to create the CNN model
def cnn_model():
    """
    Build the CNN: two convolution/dropout/average-pooling stages followed by
    two dense layers and a two-unit softmax output
    :return: Keras model mapping 'input_layer' to 'output_layer'
    """
    inp = Input(shape=(14, 360, 1), name='input_layer')

    # first convolutional stage (32 filters)
    x = Conv2D(filters=32, kernel_size=(3, 3), activation=tf.nn.relu)(inp)
    x = Dropout(0.2, seed=seed)(x)
    x = AveragePooling2D(pool_size=(2, 2))(x)

    # second convolutional stage (64 filters)
    x = Conv2D(filters=64, kernel_size=(3, 3), activation=tf.nn.relu)(x)
    x = Dropout(0.2, seed=seed)(x)
    x = AveragePooling2D(pool_size=(2, 2))(x)

    # fully connected part
    x = Flatten()(x)
    x = Dense(512, activation=tf.nn.relu)(x)
    x = Dropout(0.2, seed=seed)(x)
    x = Dense(256, activation=tf.nn.relu)(x)

    output = Dense(2, activation=tf.nn.softmax, name='output_layer')(x)

    return Model(inputs=inp, outputs=output)

In [None]:
def get_metrics(simulator, output_layer, x_test, y_test, minibatch_size, network_name):
    """
    Predict on the testing subset and compute classification metrics
    :param simulator: simulator instance used for the prediction
    :param output_layer: output layer reference, used as key into the prediction result
    :param x_test: features of the testing subset
    :param y_test: labels of the testing subset (time dimension on axis 1, class scores on the last axis)
    :param minibatch_size: only a whole number of minibatches of this size is evaluated
    :param network_name: name of the network, used in the printed report
    :return: accuracy, precision, recall, f1 and confusion matrix
    """

    # Drop the trailing samples that would not fill a complete minibatch
    usable = x_test.shape[0] - x_test.shape[0] % minibatch_size
    inputs, targets = x_test[:usable], y_test[:usable]

    # Predicted label: strongest output of the output layer at the last timestep
    last_step = simulator.predict(inputs)[output_layer][:, -1, :]
    predicted = np.argmax(last_step, axis=-1)

    # True label: position of the largest entry once the time dimension is removed
    expected = np.argmax(np.squeeze(targets, axis=1), axis=-1)

    label_pair = dict(y_true=expected, y_pred=predicted)
    precision = metrics.precision_score(average='binary', **label_pair)
    recall = metrics.recall_score(average='binary', **label_pair)
    f1 = metrics.f1_score(average='binary', **label_pair)
    accuracy = metrics.accuracy_score(**label_pair)
    confusion_matrix = metrics.confusion_matrix(**label_pair)

    # Log the statistics
    report = (f'{network_name}: accuracy = {accuracy * 100}%, precision = {precision}, '
              f'recall = {recall}, f1 = {f1}')
    print(report)
    print('Confusion matrix:')
    print(confusion_matrix)

    return accuracy, precision, recall, f1, confusion_matrix

def run_ann(model, train, test, params_save_path, iteration, shuffle_training=True, num_epochs=30):
    """
    Train the analog (non-spiking) network, save its parameters and evaluate it on the testing data
    :param model: reference to the tensorflow model
    :param train: pair of training data (x_train, y_train)
    :param test: pair of testing data (x_test, y_test)
    :param params_save_path: output path to save weights of the network
    :param iteration: number of the iteration, used to label the printed statistics
    :param shuffle_training: shuffle samples
    :param num_epochs: number of epochs to train for
    :return: dictionary with accuracy, precision, recall, f1 and confusion matrix from the testing data
    """
    x_train, y_train = train[0], train[1]
    x_test, y_test = test[0], test[1]

    converter = nengo_dl.Converter(model)

    with nengo_dl.Simulator(converter.net) as simulator:
        simulator.compile(optimizer=keras.optimizers.Adam(),
                          loss=keras.losses.BinaryCrossentropy(),
                          metrics=['accuracy'])

        input_layer = converter.inputs[model.get_layer('input_layer')] # get the input layer reference
        output_layer = converter.outputs[model.get_layer('output_layer')] # get the output layer reference

        # fit the model with the training data
        simulator.fit(
            x={ input_layer: x_train }, y={ output_layer: y_train },
            epochs=num_epochs,
            shuffle=shuffle_training,
            # early stop to avoid overfitting; no validation data is passed to fit,
            # so the monitored 'probe_loss' is computed on the training data
            callbacks=[EarlyStopping(patience=20, verbose=1, monitor='probe_loss', restore_best_weights=True)]
        )

        simulator.save_params(params_save_path) # save weights to the file

        # Get statistics. The truncation inside get_metrics must follow the minibatch size the
        # simulator really uses; a hard-coded value silently drops test samples whenever the two
        # differ and makes the ANN be scored on a different subset than the SNN.
        accuracy, precision, recall, f1, confusion_matrix = get_metrics(simulator, output_layer, x_test, y_test,
                                                                        simulator.minibatch_size,
                                                                        f'{iteration}. CNN')
        return {
            'accuracy': accuracy,
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'confusion_matrix': confusion_matrix
        }


def run_snn(model, x_test, y_test, params_load_path, iteration, timesteps=50, scale_firing_rates=1000, synapse=0.01):
    """
    Evaluate the model as a spiking network using previously saved parameters
    :param model: model reference
    :param x_test: testing features
    :param y_test: testing labels
    :param params_load_path: path to load parameters
    :param iteration: number of current iteration, used to label the printed statistics
    :param timesteps: number of timesteps the input is repeated for
    :param scale_firing_rates: firing rate scaling
    :param synapse: synaptic smoothing
    :return: dictionary with accuracy, precision, recall, f1 and confusion matrix from the testing data
    """
    # Convert the model with every relu activation swapped for a spiking rectified-linear neuron
    spiking_converter = nengo_dl.Converter(
        model,
        swap_activations={tf.nn.relu: nengo.SpikingRectifiedLinear()},
        scale_firing_rates=scale_firing_rates,
        synapse=synapse,
    )

    with spiking_converter.net:
        nengo_dl.configure_settings(stateful=False)

    # reference to the output of the converted network
    output_probe = spiking_converter.outputs[model.get_layer('output_layer')]

    # repeat every test sample along axis 1 once per timestep
    repeated_inputs = np.tile(x_test, (1, timesteps, 1))

    with nengo_dl.Simulator(spiking_converter.net) as simulator:
        simulator.load_params(params_load_path)

        # Get statistics
        scores = get_metrics(simulator, output_probe, repeated_inputs, y_test, 1,
                             f'{iteration}. CNN (SNN conversion)')
        accuracy, precision, recall, f1, confusion_matrix = scores

        return dict(
            accuracy=accuracy,
            precision=precision,
            recall=recall,
            f1=f1,
            confusion_matrix=confusion_matrix,
        )

In [None]:
def reshape_dataset(features, labels):
    """
    Bring features and labels into the layout used with Nengo:
    one-hot encoded labels and flattened features, both with a time dimension of length 1 on axis 1
    :param features: numpy array containing features
    :param labels: numpy array containing labels
    :return: transformed features and labels
    """
    # one-hot encode the labels (one column per distinct label value), then add the time dimension
    label_column = labels.reshape((-1, 1))
    one_hot = OneHotEncoder().fit_transform(label_column).toarray()
    one_hot = one_hot.reshape((one_hot.shape[0], 1, -1))

    # arrange the features as 14 channels x data, then flatten and add the time dimension
    n_samples = features.shape[0]
    per_channel = features.reshape((n_samples, 14, -1))
    flattened = per_channel.reshape((per_channel.shape[0], 1, -1))

    return flattened, one_hot

In [None]:
def run_individual(file, particip_num, model, params_save_path, test_size=0.25, epochs=30, scale_firing_rates=1000,
                   synapse=0.01, timesteps=50):
    """
    Train and evaluate the ANN and its spiking conversion on the data of one participant
    :param file: path to the .npz file holding the 'features' and 'labels' arrays
    :param particip_num: participant number, used to label the printed statistics
    :param model: reference to the tensorflow model
    :param params_save_path: path where the trained parameters are saved and later loaded from
    :param test_size: fraction of the samples held out for testing
    :param epochs: number of epochs to train the ANN for
    :param scale_firing_rates: firing rate scaling for the spiking network
    :param synapse: synaptic smoothing for the spiking network
    :param timesteps: number of timesteps for the spiking network
    :return: pair (ann_stats, snn_stats) of statistics dictionaries
    """
    print(f'Running ANN and SNN for file: {file}')

    # Load the preprocessed data of the participant; the context manager closes the archive
    # as soon as both arrays have been read
    with np.load(file) as dataset:
        features, labels = dataset['features'], dataset['labels']

    # Transform numpy arrays to be usable with Nengo
    features, labels = reshape_dataset(features, labels)

    # Shuffle and split the data into training and testing subsets (test_size is the testing fraction)
    x_train, x_test, y_train, y_test = train_test_split(features, labels, test_size=test_size, random_state=seed,
                                                        shuffle=True)
    print('X (train) shape:', x_train.shape, 'Y (train) shape:', y_train.shape)
    print('X (test) shape:', x_test.shape, 'Y (test) shape:', y_test.shape)

    # run_ann names its epoch parameter num_epochs; passing epochs= raises a TypeError
    ann_stats = run_ann(model, (x_train, y_train), (x_test, y_test), params_save_path, particip_num,
                        num_epochs=epochs)
    # the spiking network loads the parameters that run_ann has just saved
    snn_stats = run_snn(model, x_test, y_test, params_save_path, particip_num,
                        timesteps=timesteps, synapse=synapse, scale_firing_rates=scale_firing_rates)

    return ann_stats, snn_stats

In [None]:
# Directory that receives the trained parameters of every participant
params_save_dir = 'cnn_individuals_nengo_params'
os.makedirs(params_save_dir, exist_ok=True)

# Statistics of the analog and the spiking network, one entry per participant
ann, snn = [], []
participant_no = 1

for file in files:
    # a fresh model per participant; its parameters are stored under the participant's name
    model = cnn_model()
    file_name = f'P{participant_no:02d}'
    params_save_path = os.path.join(params_save_dir, file_name)

    ann_stats, snn_stats = run_individual(file, participant_no, model, params_save_path)
    ann.append(ann_stats)
    snn.append(snn_stats)

    participant_no += 1

    # Clear the session and drop the model to prevent memory leaks
    K.clear_session()
    del model

In [None]:
# Per-participant metrics arranged as columns of the results table
data = {
    # participants are numbered from 1, matching the P01..P18 input files and the printed logs
    'participant': list(range(1, len(files) + 1)),
    # ann_accuracy, ann_precision, ann_recall, ann_f1, followed by the same four for snn
    **{
        f'{network}_{metric}': [stats[metric] for stats in results]
        for network, results in (('ann', ann), ('snn', snn))
        for metric in ('accuracy', 'precision', 'recall', 'f1')
    },
}

data

In [None]:
# Folder that receives the result spreadsheets
data_output_folder = 'results'
os.makedirs(data_output_folder, exist_ok=True)

# Turn the collected metrics into a dataframe and store it as an Excel file
df = pd.DataFrame(data)
results_file = os.path.join(data_output_folder, 'cnn_individuals.xlsx')
df.to_excel(results_file)

'Statistics for iterations successfully saved.'

In [None]:
# Summary statistics over all participants: average and maximum of every metric,
# plus the standard deviation of the accuracy, for the ANN and the SNN
def _per_model(metric, reducer):
    """Apply the named Series reduction to the ann_ and snn_ column of the given metric."""
    return [getattr(df[f'{name}_{metric}'], reducer)() for name in ('ann', 'snn')]


data_stats = {
    'models': ['ann', 'snn'],
    'average_accuracy': _per_model('accuracy', 'mean'),
    'max_accuracy': _per_model('accuracy', 'max'),
    'accuracy_std': _per_model('accuracy', 'std'),
    'average_precision': _per_model('precision', 'mean'),
    'max_precision': _per_model('precision', 'max'),
    'average_recall': _per_model('recall', 'mean'),
    'max_recall': _per_model('recall', 'max'),
    'average_f1': _per_model('f1', 'mean'),
    'max_f1': _per_model('f1', 'max'),
}

data_stats

In [None]:
# Store the summary statistics as a separate Excel file
df_stats = pd.DataFrame(data_stats)
stats_file = os.path.join(data_output_folder, 'cnn_individuals_stats.xlsx')
df_stats.to_excel(stats_file)

'File with statistics successfully saved.'

In [None]:
# Show the confusion matrix of every participant, first for the ANN ...
conf_matrices_ann = [stats['confusion_matrix'] for stats in ann]
print('Confusion matrices for the ANN:')
for matrix in conf_matrices_ann:
    print(matrix)

# ... and then for the SNN
conf_matrices_snn = [stats['confusion_matrix'] for stats in snn]
print('\nConfusion matrices for the SNN')
for matrix in conf_matrices_snn:
    print(matrix)

