# Setup

## Imports

In [None]:
import sys
sys.path.append("../")
from models import *

import pandas as pd
import numpy as np

from pathlib import Path
from typing import Tuple, Optional

import tensorflow as tf
from tensorflow.keras.optimizers import Adam
from keras.models import Model
from keras.callbacks import ModelCheckpoint, EarlyStopping, LearningRateScheduler, ReduceLROnPlateau
from keras.layers import Input, Dropout, Convolution1D, MaxPool1D, UpSampling1D, concatenate, GlobalMaxPool1D

from sklearn.model_selection import GridSearchCV
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score, average_precision_score
from sklearn.model_selection import train_test_split

## Setting up GPU

In [None]:
%tensorflow_version 2.x
device_name = tf.test.gpu_device_name()
if device_name != "/device:GPU:0":
  device_name = "/cpu:0"
print('Found device at: {}'.format(device_name))

## Setting up Folder Structure

In [None]:
# Directory layout, all relative to the notebook's working directory.
base_dir = Path("..")           # root under which the pretrained weight files are resolved
data_dir = base_dir / "input"   # folder the heartbeat CSV files are read from
model_dir = Path(".")           # the working directory itself

# Data Loading

format_data is a utility function for reliably loading and lightly formatting the heartbeat data signals. It can add padding to ensure that signals have a certain length.

In [None]:
def format_data(
    df: pd.DataFrame,
    padded_size: Optional[int] = None
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Split a heartbeat dataframe into a signal array and a label array.

    Columns 0-186 are read as the signal samples and column 187 as the
    label, so `df` must carry those integer column labels.

    :param df: Dataframe containing signal and labels
    :param padded_size: Total signal length after appending zeros at the
                        end of the time axis; None leaves the signal
                        unpadded
    :return: Signal of shape (rows, length, 1), Labels cast to int8
    :raises ValueError: If padded_size is shorter than the signal length
    """

    # Load signal and labels from the dataframe
    Y = np.array(df[187].values).astype(np.int8)
    X = np.array(df[list(range(187))].values)[..., np.newaxis]

    # Add padding if padded_size is specified
    if padded_size is not None:
        missing = padded_size - X.shape[1]
        if missing < 0:
            # Fail with an actionable message instead of numpy's generic
            # "negative dimensions are not allowed".
            raise ValueError(
                f"padded_size ({padded_size}) must be at least the signal "
                f"length ({X.shape[1]})"
            )
        X = np.concatenate([X, np.zeros((X.shape[0], missing, 1))], axis=1)

    return X, Y

# Construct Stacking Ensembles

construct_ensamble is a utility function that trains and tests three kinds of stacking ensemble: one that averages the predicted probabilities, one that takes a vote, and one that fits a logistic regression on top of the predicted probabilities.

In [None]:
def construct_ensamble(pretrined_models, X_train, Y_train, X_test, Y_test, multiclass):
    """
    Fit every stacking ensamble type and print its metrics on the test set.

    MeanEnsamble, VotingEnsamble and LogRegEnsamble are each built from
    `pretrined_models`, fitted on the training data and evaluated on the
    test data. Nothing is returned; the results are only printed.

    :param pretrined_models: Pretrained models to stack
    :param X_train: Training signals
    :param Y_train: Training labels
    :param X_test: Test signals
    :param Y_test: Test labels
    :param multiclass: If truthy, report macro-averaged f1 and accuracy;
                       otherwise report f1 (default averaging), accuracy,
                       AUROC and AUPRC
    """

    # Ensamble types to test
    ensamble_types = [
        ("Mean Ensamble", MeanEnsamble),
        ("Voting Ensamble", VotingEnsamble),
        ("Logistic Regression Ensamble", LogRegEnsamble)
    ]

    # Macro averaging is only requested for the multiclass case; the binary
    # case relies on f1_score's default.
    f1_options = {"average": "macro"} if multiclass else {}

    # Iterate over all ensamble types
    for name, ensamble_type in ensamble_types:

        print(f"Fitting a {name}")

        model = ensamble_type(pretrined_models)
        model.fit(X_train, Y_train)

        pred_test = model.predict(X_test)

        # f1 and accuracy are reported for both problem types
        f1 = f1_score(Y_test, pred_test, **f1_options)
        print("Test f1 score : %s "% f1)

        acc = accuracy_score(Y_test, pred_test)
        print("Test accuracy score : %s "% acc)

        if not multiclass:
            # NOTE(review): AUROC/AUPRC are computed from the output of
            # predict(); if that is hard labels rather than scores, both
            # metrics only reflect a single threshold - confirm against the
            # ensamble classes.
            auroc = roc_auc_score(Y_test, pred_test)
            print("Test AUROC : %s "% auroc)

            auprc = average_precision_score(Y_test, pred_test)
            print("Test AUPRC : %s "% auprc)

        print("\n")


# PTB Dataset

## Load Data

We load the data using the previously defined utility functions

In [None]:
# Problem parameters
unpadded_size = 187  # column label of the class label, used to stratify the split
padded_size = 256    # signal length handed to the U-Net and the latent classifier

# Load data PTB: normal and abnormal recordings are stacked into one frame
ptb_files = ("ptbdb_normal.csv", "ptbdb_abnormal.csv")
df_1, df_2 = (pd.read_csv(data_dir / file_name, header=None) for file_name in ptb_files)
df = pd.concat([df_1, df_2])

# 80/20 split, stratified on the label column, with a fixed seed
df_train, df_test = train_test_split(
    df,
    test_size=0.2,
    random_state=1337,
    stratify=df[unpadded_size],
)

# Format data
X_train, Y_train = format_data(df_train)
X_test, Y_test = format_data(df_test)


## Set up pretrained models

In [None]:
from tensorflow.keras.losses  import mean_squared_error

# Load the U-Net separately before loading the other pre-trained models:
# the LatentClassifier in the model list below receives it as its encoder.
encoder = Unet((padded_size,1), loss = mean_squared_error)
# initialize is called with all-zero arrays of shape (1, padded_size, 1) before
# the weights are loaded (presumably to build the model's variables - confirm
# against the Unet class).
encoder.initialize(np.zeros([1, padded_size, 1]), np.zeros([1, padded_size, 1]))
# NOTE(review): hard-coded absolute path, unlike the other weight files, which
# are resolved relative to base_dir - confirm it exists in the target environment.
encoder.load_weights("/content/ml4healthproject1/FinishedModels/unet.h5")

In [None]:
# Load pretrained models
# Each entry pairs a freshly constructed model with the file that holds its
# trained weights for the PTB dataset.
# NOTE(review): RMSprop is not imported explicitly in this notebook; presumably
# it comes in through `from models import *` - confirm.
pretrained_models = [
    (
        # NOTE(review): a DoubleConvCNN is paired with a weight file named
        # VanillaCNN_PTBDB.h5 - confirm the architecture matches the weights.
        DoubleConvCNN(classes=1, dropout=0.1, optimizer=Adam(learning_rate = 0.001)),
        base_dir.joinpath("CNNs/Results/VanillaCNN_PTBDB.h5")
    ),

    (
        ResNetStandard(classes=1, filters=32, dropout=0.1, optimizer=RMSprop(1e-3)),
        base_dir.joinpath("CNNs/Results/ResNet_ptbdb.h5")
    ),
    (
        LatentClassifier(classes = 1, width=128, depth=2, encoder=encoder, padded_size=padded_size),
        base_dir.joinpath("Unet/latent_ptbdb.h5")
    ),
    (
        VanillaRNN(input_length=X_train.shape[1], num_units=200, classes=2, num_cells = 1, dropout=0.2, optimizer="adam", lr=0.0001),
        base_dir.joinpath("RNNs/Results/final_vanilla_rnn_ptbdb.h5")
    ),
    (
        BiDirLSTM(input_length=X_train.shape[1], num_units=100, classes=2, num_cells = 2, num_dense = 2, dropout=0, optimizer="adam", lr=0.0001),
        base_dir.joinpath("RNNs/Results/final_bdlstm_ptbdb.h5")
    ),
    (
        # NOTE(review): lr=0.00 is a zero learning rate and looks like a typo;
        # it is left unchanged because the intended value is not visible here.
        ConvLSTM(input_length=X_train.shape[1], num_units=150, num_conv=2, num_dense = 2, classes=2, dropout=0.5, optimizer="adam", lr=0.00),
        # No leading slash: an absolute segment makes joinpath discard base_dir
        # and resolve against the filesystem root instead.
        base_dir.joinpath("RNNs/Results/final_cnn_lstm_ptbdb.h5")
    )
]

## Construct ensembles based on all previously trained models and also on just the first three models (the two CNNs and the latent classifier)

In [None]:
# First run: stack every pretrained model (binary metrics).
print("Testing all models\n")
construct_ensamble(pretrained_models, X_train, Y_train, X_test, Y_test, multiclass=False)

# Second run: stack only the first three entries of the list, which were
# the top performers according to their CV-scores during training.
print("Testing using only top three models\n")
top_three_models = pretrained_models[:3]
construct_ensamble(top_three_models, X_train, Y_train, X_test, Y_test, multiclass=False)


# MIT-BIH Dataset

## Load Data

We load the data using the previously defined utility functions

In [None]:
# Problem parameters
unpadded_size = 187
padded_size = 256  # signal length handed to the U-Net and the latent classifier

# Load data MIT
df_train = pd.read_csv(data_dir.joinpath("mitbih_train.csv"), header=None)
# Shuffle the training rows with a fixed seed so that a fresh
# "Restart & Run All" reproduces the same row order.
df_train = df_train.sample(frac=1, random_state=1337)
df_test = pd.read_csv(data_dir.joinpath("mitbih_test.csv"), header=None)

# Format data
X_test, Y_test   = format_data(df_test)
X_train, Y_train = format_data(df_train)


## Set up pretrained models

In [None]:
from tensorflow.keras.losses  import mean_squared_error

# Load the U-Net separately before loading the other pre-trained models:
# the LatentClassifier in the model list below receives it as its encoder.
encoder = Unet((padded_size,1), loss = mean_squared_error)
# initialize is called with all-zero arrays of shape (1, padded_size, 1) before
# the weights are loaded (presumably to build the model's variables - confirm
# against the Unet class).
encoder.initialize(np.zeros([1, padded_size, 1]), np.zeros([1, padded_size, 1]))
# NOTE(review): hard-coded absolute path, unlike the other weight files, which
# are resolved relative to base_dir - confirm it exists in the target environment.
encoder.load_weights("/content/ml4healthproject1/FinishedModels/unet.h5")

In [None]:
# Pretrained models for MIT-BIH (five classes): each tuple holds a freshly
# constructed model followed by the path of its trained weights.
pretrained_models = [
    (
        VanillaCNN(
            classes=5,
            dropout=0.1,
            optimizer=Adam(learning_rate=0.001),
        ),
        base_dir / "CNNs/Results/VanillaCNN_MITBIH.h5",
    ),
    (
        ResNetStandard(
            classes=5,
            filters=32,
            dropout=0.1,
            optimizer=RMSprop(1e-3),
        ),
        base_dir / "CNNs/Results/ResNet_MITBIH.h5",
    ),
    (
        LatentClassifier(
            classes=5,
            width=128,
            depth=2,
            encoder=encoder,
            padded_size=padded_size,
        ),
        base_dir / "Unet/latent_mitbih.h5",
    ),
    (
        VanillaRNN(
            input_length=X_train.shape[1],
            num_units=150,
            classes=5,
            num_cells=1,
            dropout=0.2,
            optimizer="adam",
            lr=5e-05,
        ),
        base_dir / "RNNs/Results/final_vanilla_rnn_mitbih.h5",
    ),
    (
        BiDirLSTM(
            input_length=X_train.shape[1],
            num_units=100,
            classes=5,
            num_cells=2,
            num_dense=2,
            dropout=0,
            optimizer="adam",
            lr=0.0001,
        ),
        base_dir / "RNNs/Results/final_bdlstm_mitbih.h5",
    ),
    (
        ConvLSTM(
            input_length=X_train.shape[1],
            num_units=150,
            num_conv=2,
            num_dense=2,
            classes=5,
            dropout=0.5,
            optimizer="adam",
            lr=0.001,
        ),
        base_dir / "RNNs/Results/final_cnn_lstm_mitbih.h5",
    ),
]

## Construct ensembles based on all previously trained models and also on just the first three models (the two CNNs and the latent classifier)

In [None]:
# First run: stack every pretrained model (multiclass metrics).
print("Testing all models\n")
construct_ensamble(pretrained_models, X_train, Y_train, X_test, Y_test, multiclass=True)

# Second run: stack only the first three entries of the list, which were
# the top performers according to their CV-scores during training.
print("Testing using only top three models\n")
top_three_models = pretrained_models[:3]
construct_ensamble(top_three_models, X_train, Y_train, X_test, Y_test, multiclass=True)
