# Imports

In [None]:
import numpy as np
import pandas as pd
import json
import os
import glob
import random
import gc
import keras
import mcfly
from keras import backend as K
import tensorflow as tf
import tensorflow_addons as tfa
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report
from sklearn.metrics import multilabel_confusion_matrix
from sklearn.metrics import auc, roc_curve, precision_recall_curve
from sklearn.calibration import calibration_curve
from datetime import datetime 
from matplotlib import pyplot as plt
import matplotlib.lines as mlines
import seaborn as sns
import math
from lime import explanation
from lime import lime_base
from lime_timeseries import LimeTimeSeriesExplainer

In [None]:
#set to True if want to generate models, if already ran the script and have the models set to False
generate_models = False
#number of models to generate, if generate_models = True
n_models = 10

#set to True if want to train the model, False to load trained model
train_model = False
#set to True if want to save the plots
save_plots = False

#set to True if want to add weights to loss function
add_weights = True


#path to file with indexes of files split into training, val and test
#split_path = "400_dumped/Final_Data/split/train_val_test.json"
split_path = "400_to_val/Final_Data/split/train_val_test.json"

#paths to the labels and the data
#labels_path = "400_dumped/Final_Data/labels/labels.npy"
#samples_path = "400_dumped/Final_Data/samples/"

labels_path = "400_to_val/Final_Data/labels/labels.npy"
samples_path = "400_to_val/Final_Data/samples/"

#paths to store and retrieve model types, architectures and hyperparameters
#archi_path = "400_dumped/Models_Final_Data/architecture/architecture_"
#params_path = "400_dumped/Models_Final_Data/parameters/params_"
#type_path = "400_dumped/Models_Final_Data/type/type_"

archi_path = "400_to_val/Models_Final_Data/architecture/architecture_"
params_path = "400_to_val/Models_Final_Data/parameters/params_"
type_path = "400_to_val/Models_Final_Data/type/type_"

#choose model to laod and train
model_name = "Model0.json"

#path to store trained model
#trained_path = "400_dumped/Models_Final_Data/trained/" + model_name
trained_path = "400_to_val/Models_Final_Data/trained/" + model_name

#path to save plots
model_name_no_extension = model_name.split(".", 1)[0] + "/"
#plot_path = "400_dumped/Models_Final_Data/plots/" + model_name_no_extension
plot_path = "400_to_val/Models_Final_Data/plots/" + model_name_no_extension

#set the seed 
random.seed(0) #generation of train, val, test sets
np.random.seed(0) #mcfly models
tf.random.set_seed(0) #keras training

# Dictionary with sample id and label

In [None]:
labels_array = np.load(labels_path)
labels = dict()

for row in labels_array:
    labels[row[0]] = int(row[1])

    
del labels_array
gc.collect()

# Train, val, test split

In [None]:
#open train, val, test split dictionary
with open(split_path, "r") as fp:
    train_val_test_dict = json.load(fp)    

In [None]:
#function to create validation set and store in memory
def set_generation(val_or_test, train_val_test_dict, labels, dim = (2500, 8)):
    n_samples = len(train_val_test_dict[val_or_test])

    #Initialise
    X = np.empty((n_samples, dim[0], dim[1]))
    y = np.empty((n_samples), dtype = int)

    #Generate data
    for i, ID in enumerate(train_val_test_dict[val_or_test]):
        #store sample
        X[i,] = np.load(samples_path + ID +".npy")

        #store class
        y[i] = labels[ID]
    
    return X, y

In [None]:
X_val, y_val = set_generation("val", train_val_test_dict, labels, (2500, 8))
y_val = keras.utils.to_categorical(y_val, 2)

#X_test, y_test = set_generation("test", train_val_test_dict, labels, (2500, 8))
#y_test = keras.utils.to_categorical(y_test, 2)

# Checks

In [None]:
print(len(train_val_test_dict["train"]))
print(len(train_val_test_dict["val"]))
print(len(train_val_test_dict["test"]))
print(len(train_val_test_dict["train"]) + len(train_val_test_dict["val"]) + len(train_val_test_dict["test"]))

# Generate McFly Models

In [None]:
if generate_models:
    X_train_shape = (len(train_val_test_dict["train"]), 2500, 8)
    models = mcfly.modelgen.generate_models(X_train_shape, 
                                           number_of_classes = 2,
                                           number_of_models = n_models,
                                           metrics = ["accuracy"])
    
    models_to_print = range(len(models))
    for i, item in enumerate(models):
        if i in models_to_print:
            model, params, model_types = item
            print("--------------------------------------------------------------------")
            print("Model" + str(i))
            print("  ")
            print("Hyperparameters:")
            print(params)
            print("  ")
            print("Model description:")
            model.summary()
            print("  ")
            print("Model type:")
            print(model_types)
            print(" ") 

            for key, value in params.items():
                if isinstance(value, np.ndarray):
                    params[key] = value.tolist()

            name = "Model" + str(i)
            model_type = {"type": model_types}

            with open(archi_path + name + ".json", "w") as f:
                json.dump(model.to_json(), f)

            with open(params_path + name + ".json", "w") as f:
                json.dump(params, f)

            with open(type_path + name + ".json", "w") as f:
                json.dump(model_type, f)

# Data Loader

In [None]:
class DataGenerator(keras.utils.Sequence):    

    def __init__(self, list_IDs, labels, batch_size = 32, dim = (2500,), n_channels = 8, n_classes=2, shuffle = True):
        #"Initialization"
        self.dim = dim
        self.batch_size = batch_size
        self.labels = labels
        self.list_IDs = list_IDs
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.on_epoch_end()
        
    def __len__(self):
        #number of batches per epoch
        return int(np.floor(len(self.list_IDs)/self.batch_size))
    
    def __getitem__(self, index):
        #Generates indexes of one batch of data
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]
        
        #find list of IDs
        list_IDs_temp = [self.list_IDs[k] for k in indexes]
        
        #Generate data
        X, y = self.__data_generation(list_IDs_temp)        
        return X, y
    
    def on_epoch_end(self):
        #updates indexes after each epoch
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle == True:
            np.random.shuffle(self.indexes)
    
    def __data_generation(self, list_IDs_temp):
        #Generates data containing batch_size samples
        
        #Initialise
        X = np.empty((self.batch_size, *self.dim, self.n_channels))
        y = np.empty((self.batch_size), dtype = int)
        
        #Generate data
        for i, ID in enumerate(list_IDs_temp):
            #store sample
            X[i,] = np.load(samples_path + ID +".npy")
            
            #store class
            y[i] = self.labels[ID]
        
        return X, keras.utils.to_categorical(y, num_classes = self.n_classes)

In [None]:
if train_model: 
    #define parameters
    params = {"dim" : (2500,),
             "batch_size": 32,
             "n_classes": 2,
             "n_channels":8,
             "shuffle" :True}

    #Generators 
    training_generator = DataGenerator(train_val_test_dict["train"], labels, **params)

# Load Model

In [None]:
with open(archi_path + model_name, "r") as f:
    model_loaded = json.load(f)
    model = keras.models.model_from_json(model_loaded)

In [None]:
with open(params_path + model_name, "r") as f:
    mcfly_params = json.load(f)    
    lr = mcfly_params["learning_rate"]
    rr = mcfly_params["regularization_rate"]
    print(lr, rr)

In [None]:
with open(type_path + model_name, "r") as f:
    model_type = json.load(f)    
    print(model_type)

In [None]:
model.summary()

In [None]:
#set F1 as metric on val set since val test is still imbalanced
def recall_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true[:,1] * y_pred[:,1], 0, 1)))
    possible_positives = K.sum(K.round(K.clip(y_true[:,1], 0, 1)))
    recall = true_positives / (possible_positives + K.epsilon())
    return recall

def precision_m(y_true, y_pred):
    true_positives = K.sum(K.round(K.clip(y_true[:,1]* y_pred[:,1], 0, 1)))
    predicted_positives = K.sum(K.round(K.clip(y_pred[:,1], 0, 1)))
    precision = true_positives / (predicted_positives + K.epsilon())
    return precision

def f1_m(y_true, y_pred):
    precision = precision_m(y_true, y_pred)
    recall = recall_m(y_true, y_pred)
    return 2*(precision*recall)/(precision + recall + K.epsilon())


In [None]:
if train_model: 
    
    if add_weights:    
        #metrics = ["accuracy", f1_m, tfa.metrics.MatthewsCorrelationCoefficient(num_classes=2)]
        metric = ["accuracy"]
        model.compile(loss="categorical_crossentropy", optimizer = "adam", metrics = metric)  

        #calculate class imbalance
        zeroes = 0
        ones = 0
        for i, ID in enumerate(train_val_test_dict["train"]):
            if labels[ID] == 0:
                zeroes = zeroes + 1
            if labels[ID] == 1:
                ones = ones + 1

        if ones < zeroes:
            class_weights = {0: 1., 1: zeroes/ones}
        elif zeroes < ones:
            class_weights = {0: ones/zeroes, 1: 1.}
        else:
            class_weights = {0: 1., 1: 1.}
    else:
        class_weights = {0: 1., 1: 1.}
        
    #print time    
    now = datetime.now()
    current_time = now.strftime("%H:%M:%S")
    print(current_time)
    
    #train
    callback = tf.keras.callbacks.EarlyStopping(monitor = "val_loss", patience = 5, restore_best_weights = True)
    history = model.fit(training_generator, 
              validation_data = (X_val, y_val), 
              epochs = 20,
              class_weight = class_weights, 
              callbacks = callback,
              verbose = True)
    
    #print time
    now = datetime.now()
    current_time = now.strftime("%H:%M:%S")
    print(current_time)
    
    #save the model
    model.save(trained_path)
    
else:
    #load the model
    #model = keras.models.load_model(trained_path, custom_objects = {"f1_m": f1_m})
    model = keras.models.load_model(trained_path)

# Predictions

In [None]:
pred_probas = model.predict(X_val)

In [None]:
#no BrS would appear as 0, hence transformed to [1,0] => the first column returns 1 if no BrS, 0 otherwise
no_BrS = y_val[:, 0]

#BrS appears as 1, hence transformed to [0,1] => the second column returns 1 if BrS, 0 otherwise
BrS = y_val[:,1]

# Performance

In [None]:
BrS_probas = pred_probas[:,1]
BrS_predictions = pred_probas.argmax(axis = -1)
BrS_predictions

In [None]:
no_BrS_probas = pred_probas[:,0]
no_BrS_predictions = pred_probas.argmin(axis = -1)
no_BrS_predictions

In [None]:
def performance_metrics(y_true, y_pred, y_proba):
    conf_mat = confusion_matrix(y_true, y_pred)
    print("Confusion matrix: ")
    print(conf_mat)
    tn,fp,fn,tp = conf_mat.ravel()
    print("tn: ", tn," fp: ", fp," fn: ", fn," tp: ", tp)
    
    print("")
    matthews = ((tp*tn) - (fp*fn)) / math.sqrt(((tp+fp)*(tp+fn)*(tn+fp)*(tn+fn)))
    print("Matthews Correlation Coefficient: ", matthews)
    
    print("")
    print(classification_report(y_true, y_pred))
    
    print("")           
    precision_bis = tp/(tp+fp)
    recall_bis = tp/(tp+fn)
    f1 = 2*precision_bis*recall_bis/(precision_bis+recall_bis)
    print("precision: ", precision_bis)
    print("recall/sensitivity: ", recall_bis)
    print("specificity: ", tn/(tn+fp))
    print("accuracy: ", (tp+tn)/(tp+tn+fp+fn))    
    print("f1 score: ", f1) 

      
    print("")
    fpr, tpr, thresholds = roc_curve(y_true, y_proba)
    auc_coef = auc(fpr, tpr)
    precision, recall, thresholds = precision_recall_curve(y_true, y_proba)
    auprc = auc(recall, precision)
    print("auc: ", auc_coef)
    print("auprc: ", auprc)
    
    return

In [None]:
performance_metrics(BrS, BrS_predictions, BrS_probas)

In [None]:
#performance_metrics(no_BrS, no_BrS_predictions, no_BrS_probas)

# Plots

In [None]:
if train_model:
    #plot train and validation loss
    training_loss = history.history["loss"]
    validation_loss = history.history["val_loss"]

    #number of epochs
    epoch_count = range(1, len(training_loss) +1)

    #visualise loss history
    f, ax = plt.subplots(figsize=(6,6))      
    ax.plot(epoch_count, training_loss, "r--", label="Training Loss")
    ax.plot(epoch_count, validation_loss, "b-", label="Validation Loss")
    ax.set_xlabel("Epoch")
    ax.set_ylabel("Loss")
    ax.set_title("Training and Validation Loss Over the Epochs")
    ax.legend()
    plt.savefig(plot_path + "Loss.png")


In [None]:
#ROC curve
fpr, tpr, thresholds = roc_curve(BrS, BrS_probas)
auc_coef = round(auc(fpr, tpr),3)
f, ax = plt.subplots(figsize=(6,6))
ax.plot(fpr, tpr, marker=".", label = model_type["type"] + " - AUC: " + str(auc_coef))
ax.plot([0,1], [0,1], transform = ax.transAxes, linestyle="--", label="Random Classifier")
ax.set_xlabel("False Positive Rate")
ax.set_ylabel("True Positive Rate")
ax.set_title("ROC")
ax.legend()
if save_plots:
    plt.savefig(plot_path + "ROC.png")

In [None]:
#Precision Recall curve
precision, recall, thresholds = precision_recall_curve(BrS, BrS_probas)
auprc = round(auc(recall, precision),3)
f, ax = plt.subplots(figsize=(6,6))
ax.plot(recall, precision, marker=".", label = model_type["type"] + " - AUPRC: " + str(auprc))
ax.set_xlabel("Recall (Positive label: Brugada)")
ax.set_ylabel("Precision (Positive label: Brugada)")
ax.set_title("AUPRC")
ax.legend()

if save_plots:
    plt.savefig(plot_path + "PrecisionRecallCurve.png")

In [None]:
#Calibration
# bin data and normalise counts
def counts_to_percentages(probabilities):
    bin0_01 = 0
    bin01_02=0
    bin02_03=0
    bin03_04=0
    bin04_05=0
    bin05_06=0
    bin06_07=0
    bin07_08=0
    bin08_09=0
    bin09_1=0 
    
    for val in probabilities:
    
        if val <0.1:
            bin0_01 = bin0_01 + 1
    
        elif val >= 0.1 and val <0.2:
            bin01_02= bin01_02 +1 
    
        elif val >= 0.2 and val <0.3:
            bin02_03= bin02_03 +1 
    
        elif val >= 0.3 and val <0.4:
                bin03_04= bin03_04 +1
    
        elif val >= 0.4 and val <0.5:
                bin04_05= bin04_05 +1 
    
        elif val >= 0.5 and val <0.6:
                bin05_06= bin05_06 +1 
    
        elif val >= 0.6 and val <0.7:
                    bin06_07= bin06_07 +1 
    
        elif val >= 0.7 and val <0.8:
                    bin07_08= bin07_08 +1 
    
        elif val >= 0.8 and val <0.9:
                    bin08_09= bin08_09 +1 
    
        elif val >= 0.9:
                    bin09_1= bin09_1 +1 
                
    counts = [bin0_01, bin01_02, bin02_03, bin03_04, bin04_05,
             bin05_06, bin06_07, bin07_08, bin08_09, bin09_1]    
    
    percentages = counts/np.sum(counts) *100
    
    return percentages
    
    
#plot calibration plot and histogram together
def calibration_together (BrS, BrS_probas):        
    print("plot curves and save in one png file")
    #save three plots in one png file
    fig_index = 1
      
    #save three plots in one png file
    fig, (ax1, ax2) = plt.subplots(nrows=2, figsize=(7, 12))   
    
    # plot calibration curve LSTM
    y, x = calibration_curve(BrS, BrS_probas, n_bins=10)

    ax1.plot(x, y, 'C0',marker='o', linewidth=1, label= model_type["type"], color = "darkturquoise") 
    ax1.set(xlabel= 'Predicted probability', ylabel= 'True probability in each bin')
    
    line = mlines.Line2D([0, 1], [0, 1], color='black', linestyle='--', linewidth=0.9, label= "Perfectly calibrated")
    transform = ax1.transAxes
    line.set_transform(transform)
    ax1.add_line(line)     
    ax1.legend(loc="upper left")  
  
    #HISTOGRAMS    
    x = np.arange(0,1,0.1)
    
    #Before onset LSTM
    y = counts_to_percentages(BrS_probas)   #if instead of % want values in [0,1], do: y = counts_to_percentages(lstm_proba)/100 
    ax2.hist(x, range = [0,1], bins=10, weights = y, label= model_type["type"],
                 histtype="step", lw=3.5, color = "darkturquoise")
    
    ax2.set_xlabel("Mean predicted probability")
    ax2.set_ylabel("Percentage of counts")
    ax2.legend(loc="upper center", ncol=5)
    ax2.set_ylim([0,101]) #if instead of % want probabilities, change to [0,1]     

    #plt.tight_layout()
    if save_plots:
        plt.savefig(plot_path + "Calibration.png")
    plt.show()
    
    return

calibration_together(BrS, BrS_probas)


In [None]:
#Discrimination
def distribution(BrS, BrS_probas):
    #probabilities distributions graphs
    true_1 = pd.DataFrame(BrS_probas, columns=['Predicted probabilities'])
    true_1['labels'] = BrS.tolist()
    true_0 = true_1.copy(deep = True) 
    indexNames = true_1[true_1['labels'] == 0].index
    true_1.drop(indexNames , inplace=True)
    indexNames = true_0[ true_0['labels'] == 1 ].index
    true_0.drop(indexNames , inplace=True)
    true_1.drop(columns=['labels'], inplace = True)
    true_0.drop(columns=['labels'], inplace = True)
    
    sns.distplot(true_1['Predicted probabilities'], hist = False, kde = True,
                 kde_kws = {'shade': True, 'linewidth': 3,"color": "g"}, label = 'Class 1')
    plt.ylabel('Density')
    sns.distplot(true_0['Predicted probabilities'], hist = False, kde = True,
                     kde_kws = {'shade': True, 'linewidth': 3, "color": "r"}, label = 'Class 0')
    plt.legend(labels=["BrP","No BrP"])
    
    if save_plots:
        plt.savefig(plot_path + "Discrimination.png")
        
    plt.show()
    plt.clf()    
    return

distribution(BrS, BrS_probas)

In [None]:
skip this

# LIME

In [None]:
def custom_predict(trained_model):
    #puts sample in right format for keras prediction
    def func(sample):
        prediction = trained_model.predict(np.transpose(sample, axes=[0,2,1]))
        return prediction
    return func

In [None]:
def explain_sample_lime(explainer,series_ecg, num_features_ecg, n_samples, num_slices_ecg, replacement_method, predict_function = custom_predict(trained_model=model)):
    
    exp = explainer.explain_instance(series_ecg, predict_function, num_features=num_features_ecg, num_samples=n_samples, 
                                 num_slices=num_slices_ecg, replacement_method = "total_mean")
    
    fig = exp.as_pyplot_figure() 
    
    return exp, fig

In [None]:
def plot_lime(series_ecg, num_slices_ecg, X_val, y_val, BrS_predictions, idx_ecg, exp):

    values_per_slice_ecg = math.ceil(series_ecg.shape[1]/ num_slices_ecg)
    no_pattern = X_val[np.where(y_val[:,1]==0)]
    pattern = X_val[np.where(y_val[:,1]==1)]
    lead_to_index = {"I": 0, "II": 1, "V1": 2, "V2":3,
                    "V3": 4, "V4": 5, "V5": 6, "V6":7}
    leads = ["I", "II", "V1","V2","V3","V4","V5","V6"]

    labels = ["no Brugada Pattern", "Brugada Pattern"]
    true_label = labels[int(y_val[idx_ecg,1])]
    predicted_label = labels[BrS_predictions[idx_ecg]]

    fig, axes = plt.subplots(nrows = 4, ncols = 2, figsize = (30,30), sharex = True, sharey=True)

    for i, ax in enumerate(axes.flatten()):
        ax.plot(series_ecg[i], 'b', label='Explained instance')
        ax.plot(no_pattern[:,:,i].mean(axis=0), color='red',label='Mean of class no Brugada Pattern')
        ax.plot(pattern[:,:,i].mean(axis=0), color='green',label='Mean of class Brugada Pattern')
        ax.set_title("Lead "+ leads[i])

        for j in range(num_features_ecg):
            feature, weight = exp.as_list()[j]        
            feature_name_index = lead_to_index[feature.split(" ", 2)[2]] #split at second space in feature name to take lead name as key

            if feature_name_index == i:
                start = int(feature.split(" ", 1)[0]) * values_per_slice_ecg #int(feature.split(" ", 1)[0]): only keep int from feature name, eg feature name (23 - II), split at " " (space) and keep first part (23) and take int(23)
                end = start + values_per_slice_ecg
                color = 'red' if weight < 0 else 'green' 
                ax.axvspan(start , end, color=color, alpha=abs(weight*10))

    ax.legend(loc='lower left')
    title = "LIME explanation of single sample. True label: " + true_label + " . Predicted label: " + predicted_label + "."
    fig.suptitle(title, fontsize=20)
    
    if true_label == "no Brugada Pattern" and predicted_label == "no Brugada Pattern":
        saved_title = "True_negative.png"
    elif true_label == "no Brugada Pattern" and predicted_label == "Brugada Pattern":
        saved_title = "False_positive.png"
    elif true_label == "Brugada Pattern" and predicted_label == "no Brugada Pattern":
        saved_title = "False_negative.png"
    elif true_label == "Brugada Pattern" and predicted_label == "Brugada Pattern":
        saved_title = "True_positive.png"
    
    fig.savefig(plot_path + saved_title)
    plt.show()
              
    return fig

#interpretation: real label is no BrP but model predicts as Brugada (false positive). Green bands correspond to
# evidence that it's a positive sample, red bands correspond to evidence that it's a negative sample

In [None]:
num_features_ecg = 50 #number of lime weights
num_slices_ecg = 1000 #number of segments of a lead
n_samples = 50 #number of perturbated samples at a single time point
replacement_method = "total_mean" #possible replacement mathods: "noise" (fill in noise for perturbation), "mean" (fill in mean of segment), "total_mean" (fill in mean of lead)
explainer = LimeTimeSeriesExplainer(class_names = ["No BrP", "BrP"], signal_names= ["I", "II", "V1", "V2", "V3", "V4", "V5", "V6"])

In [None]:
#false positive
idx_ecg = 0 #0th sample
series_ecg = X_val[idx_ecg].T
exp, weights_fig = explain_sample_lime(explainer,series_ecg, num_features_ecg, n_samples, num_slices_ecg, replacement_method)

In [None]:
plot_lime(series_ecg, num_slices_ecg, X_val, y_val, BrS_predictions, idx_ecg, exp)

In [None]:
#false negative
idx_ecg = 361
series_ecg = X_val[idx_ecg].T
exp, weights_fig = explain_sample_lime(explainer,series_ecg, num_features_ecg, n_samples, num_slices_ecg, replacement_method)

In [None]:
plot_lime(series_ecg, num_slices_ecg, X_val, y_val, BrS_predictions, idx_ecg, exp)

In [None]:
#true positive
idx_ecg = 360
series_ecg = X_val[idx_ecg].T
exp, weights_fig = explain_sample_lime(explainer,series_ecg, num_features_ecg, n_samples, num_slices_ecg, replacement_method)

In [None]:
plot_lime(series_ecg, num_slices_ecg, X_val, y_val, BrS_predictions, idx_ecg, exp)

In [None]:
#true negative
idx_ecg = 10
series_ecg = X_val[idx_ecg].T
exp, weights_fig = explain_sample_lime(explainer,series_ecg, num_features_ecg, n_samples, num_slices_ecg, replacement_method)

In [None]:
plot_lime(series_ecg, num_slices_ecg, X_val, y_val, BrS_predictions, idx_ecg, exp)