In [7]:
pip install -U imbalanced-learn

Note: you may need to restart the kernel to use updated packages.


In [14]:
import numpy as np
import pandas as pd
from sklearn.preprocessing import OneHotEncoder
from imblearn.over_sampling import RandomOverSampler

np.random.seed(42)

def load_data():

    df = pd.read_csv("mitbih_train.csv", header=None)
    df_test = pd.read_csv("mitbih_test.csv", header=None)

    df_val_train = df.values
    X = df_val_train[:, :-1]
    y = df_val_train[:, -1].astype(int)
    
    df_val_test = df_test.values
    X_test = df_val_test[:, :-1]
    Y_test = df_val_test[:, -1].astype(int)

    ros = RandomOverSampler(random_state=0)
    X_train, Y_train = ros.fit_resample(X, y)
    
    from sklearn.model_selection import train_test_split
    X_train, X_val, Y_train, Y_val = train_test_split(
                                        X_train, 
                                        Y_train, 
                                        test_size=0.25, 
                                        random_state=42)
    
    shuffle_idx = np.random.permutation(list(range(X_train.shape[0])))
    X_train = X_train[shuffle_idx]
    Y_train = Y_train[shuffle_idx]
    
    X_train = np.expand_dims(X_train, 2)
    X_val = np.expand_dims(X_val, 2)
    X_test = np.expand_dims(X_test, 2)
    
    ohe = OneHotEncoder()
    Y_test = ohe.fit_transform(Y_test.reshape(-1,1))
    
    return X_train, Y_train, X_val, Y_val, X_test, Y_test

In [15]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input,Conv1D,ReLU,Add,MaxPooling1D,Flatten,Dense
from tensorflow.keras.losses import SparseCategoricalCrossentropy
from tensorflow.keras.optimizers.schedules import ExponentialDecay
from tensorflow.keras.optimizers import Adam

def model_ecg():
    input = Input(shape=(360, 1))
    CONV1 = Conv1D(filters=32, kernel_size=5, strides=1)(input)
    CONV1_1 = Conv1D(filters=32, kernel_size=5, strides=1, padding='same')(CONV1)
    ACT1_1 = ReLU()(CONV1_1)
    CONV1_2 = Conv1D(filters=32, kernel_size=5, strides=1, padding='same')(ACT1_1)
    ADD1_1 = Add()([CONV1_2, CONV1])
    ACT1_2 = ReLU()(ADD1_1)
    MAX1_1 = MaxPooling1D(pool_size=5, strides=2)(ACT1_2)

    CONV2_1 = Conv1D(filters=32, kernel_size=5, strides=1, padding='same')(MAX1_1)
    ACT2_1 = ReLU()(CONV2_1)
    CONV2_2 = Conv1D(filters=32, kernel_size=5, strides=1, padding='same')(ACT2_1)
    ADD2_1 = Add()([CONV2_2, MAX1_1])
    ACT2_2 = ReLU()(ADD2_1)
    MAX2_1 = MaxPooling1D(pool_size=5, strides=2)(ACT2_2)

    CONV3_1 = Conv1D(filters=32, kernel_size=5, strides=1, padding='same')(MAX2_1)
    ACT3_1 = ReLU()(CONV3_1)
    CONV3_2 = Conv1D(filters=32, kernel_size=5, strides=1, padding='same')(ACT3_1)
    ADD3_1 = Add()([CONV3_2, MAX2_1])
    ACT3_2 = ReLU()(ADD3_1)
    MAX3_1 = MaxPooling1D(pool_size=5, strides=2)(ACT3_2)

    CONV4_1 = Conv1D(filters=32, kernel_size=5, strides=1, padding='same')(MAX3_1)
    ACT4_1 = ReLU()(CONV4_1)
    CONV4_2 = Conv1D(filters=32, kernel_size=5, strides=1, padding='same')(ACT4_1)
    ADD4_1 = Add()([CONV4_2, MAX3_1])
    ACT4_2 = ReLU()(ADD4_1)
    MAX4_1 = MaxPooling1D(pool_size=5, strides=2)(ACT4_2)

    CONV5_1 = Conv1D(filters=32, kernel_size=5, strides=1, padding='same')(MAX4_1)
    ACT5_1 = ReLU()(CONV5_1)
    CONV52 = Conv1D(filters=32, kernel_size=5, strides=1, padding='same')(ACT5_1)
    ADD5_1 = Add()([CONV52, MAX4_1])
    ACT5_2 = ReLU()(ADD5_1)
    MAX5_1 = MaxPooling1D(pool_size=5, strides=2)(ACT5_2)

    FLA1 = Flatten()(MAX5_1)
    DEN1 = Dense(32)(FLA1)
    ACT = ReLU()(DEN1)
    ADD1 = Dense(5, activation="softmax")(ACT)

    model = Model(inputs=input, outputs=ADD1)

    initial_learning_rate = 0.001
    lr_schedule = ExponentialDecay(
        initial_learning_rate,
        decay_steps=10000,
        decay_rate=0.75,
        staircase=True)

    model.compile(
        optimizer = Adam(learning_rate=lr_schedule),
        loss = SparseCategoricalCrossentropy(),
        metrics = ["sparse_categorical_accuracy"]) 
    
    return model_ecg

Train model

In [19]:
#from model_ecg import model_ecg
#from load import load_data
from tensorflow.keras.callbacks import ModelCheckpoint,EarlyStopping

def main() -> object:
    
    model = model_ecg()
    X_train, Y_train, X_val, Y_val, _, _ = load_data()

    callbacks = [EarlyStopping(monitor="val_loss", patience=10, verbose=1),
                ModelCheckpoint(filepath ='.{val_loss:.3f}-{val_sparse_categorical_accuracy:.3f}-{epoch:03d}-{loss:.3f}-{sparse_categorical_accuracy:.3f}.h5', 
                                monitor='val_loss', save_best_only=False, verbose=0, period=1)]
    
    model.fit( X_train, Y_train, 
            validation_data = (X_val, Y_val),
            batch_size = 256,
            epochs = 100, 
            callbacks = callbacks
            )

if __name__ == '__main__':
    main()



AttributeError: 'function' object has no attribute 'fit'

Test model

In [17]:
import numpy as np
import time
import argparse
from sklearn.metrics import cohen_kappa_score, f1_score, confusion_matrix
from sklearn.preprocessing import OneHotEncoder
from tensorflow.keras.models import load_model


def evaluate_metrics(confusion_matrix, y_test, y_pred, print_result=False, f1_avg='macro'):
    
    TP = np.diag(confusion_matrix)
    FP = confusion_matrix.sum(axis=0) - TP
    FN = confusion_matrix.sum(axis=1) - TP    
    TN = confusion_matrix.sum() - (FP + FN + TP)

    TPR = TP / (TP + FN)
    TNR = TN / (TN + FP)
    PPV = TP / (TP + FP)

    ACC = (TP + TN) / (TP + FP + FN + TN)
    ACC_macro = np.mean(ACC)  

    f1 = f1_score(y_test, y_pred, average=f1_avg)
    kappa = cohen_kappa_score(y_test, y_pred)
    
    if (print_result):
        print("\n")
        print("\n")
        print("============ METRICS ============")
        print(confusion_matrix)
        print("Accuracy (macro) : ", ACC_macro)        
        print("F1 score         : ", f1)
        print("Cohen Kappa score: ", kappa)
        print("======= Per class metrics =======")
        print("Accuracy         : ", ACC)
        print("Sensitivity (TPR): ", TPR)
        print("Specificity (TNR): ", TNR)
        print("Precision (+P)   : ", PPV)
    
    return ACC_macro, ACC, TPR, TNR, PPV, f1, kappa

def test_tf_model(X_test, y_test, model):
    predictions, exec_times = [], []
    segments = X_test.astype(np.float32)

    y_test = np.argmax(y_test, axis=1)

    for i in range(segments.shape[0]):
        segment = segments[[i],:,:]
        start_time = time.time()

        pred = model.predict(segment, verbose=0)
        
        end_time = time.time()
        exec_times.append(end_time-start_time)
        predictions.append(pred)

    predictions = np.vstack(predictions)
    y_pred = np.argmax(predictions,1)

    exec_times = np.array(exec_times) * 1000

    cm = confusion_matrix(y_test, y_pred)
    ACC_macro, ACC, TPR, TNR, PPV, f1, kappa = evaluate_metrics(cm, y_test, y_pred, True)

    return y_pred, exec_times, (cm, ACC_macro, ACC, TPR, TNR, PPV, f1, kappa)    

def load_test_data():
    _, _, _, _, X_test, Y_test = load_data()
    
    ohe = OneHotEncoder()
    Y_test = ohe.fit_transform(Y_test.reshape(-1,1))
    
    return X_test, Y_test

def main(model_path) -> object:
    model = load_model(model_path)
    X_test, Y_test = load_test_data()
    
    y_pred_1, exec_times_1, cm = test_tf_model(X_test, Y_test, model)
    print("Mean of execution times: " , np.mean(exec_times_1))
    print("Standard diviation: ", np.std(exec_times_1))
    print("Maximum: ", np.max(exec_times_1))

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Load model')
    parser.add_argument('--model', metavar='path', required=True, help='path to model')
    args = parser.parse_args()    
    
    main(model_path = args.model)

ModuleNotFoundError: No module named 'load'

In [18]:
import numpy as np
import time
import tensorflow as tf
from sklearn.metrics import cohen_kappa_score, f1_score, confusion_matrix
from tensorflow.keras.models import load_model
import tensorflow.lite as tflite

def evaluate_metrics(confusion_matrix, y_test, y_pred, print_result=False, f1_avg='macro'):
    
    TP = np.diag(confusion_matrix)
    FP = confusion_matrix.sum(axis=0) - TP
    FN = confusion_matrix.sum(axis=1) - TP    
    TN = confusion_matrix.sum() - (FP + FN + TP)
    TPR = TP / (TP + FN)
    TNR = TN / (TN + FP)
    PPV = TP / (TP + FP)
    ACC = (TP + TN) / (TP + FP + FN + TN)

    ACC_macro = np.mean(ACC)

    f1 = f1_score(y_test, y_pred, average=f1_avg)
    kappa = cohen_kappa_score(y_test, y_pred)
    
    if (print_result):
        print("\n")
        print("\n")
        print("============ METRICS ============")
        print(confusion_matrix)
        print("Accuracy (macro) : ", ACC_macro)        
        print("F1 score         : ", f1)
        print("Cohen Kappa score: ", kappa)
        print("======= Per class metrics =======")
        print("Accuracy         : ", ACC)
        print("Sensitivity (TPR): ", TPR)
        print("Specificity (TNR): ", TNR)
        print("Precision (+P)   : ", PPV)
    
    return ACC_macro, ACC, TPR, TNR, PPV, f1, kappa


def load_lite_model(lite_model=None, tflite_load='Loaded', filename=None):
    if tflite_load == 'Loaded':
        interpreter = tflite.Interpreter(model_content=lite_model)
    elif tflite_load == 'File':
        interpreter = tflite.Interpreter(model_path=filename + ".tflite")

    interpreter.allocate_tensors()

    input_index = interpreter.get_input_details()[0]["index"]
    output_index = interpreter.get_output_details()[0]["index"]

    return interpreter, input_index, output_index


def load_test_data():
    _, _, _, _, X_test, Y_test = load_data()
    return X_test, Y_test


def test_lite_model(X_test, y_test, interpreter, input_index, output_index):
    predictions = []
    exec_times  = []
    segments = X_test.astype(np.float32)

    y_test = np.argmax(y_test, axis=1)

    for i in range(segments.shape[0]):
        segment = segments[[i],:,:]
        start_time = time.time()
        interpreter.set_tensor(input_index, segment)
        interpreter.invoke()
        pred = interpreter.get_tensor(output_index)
        end_time = time.time()
        exec_times.append(end_time-start_time)
        predictions.append(pred)

    predictions = np.vstack(predictions)
    y_pred = np.argmax(predictions,1)

    exec_times = np.array(exec_times) * 1000

    cm = confusion_matrix(y_test, y_pred)
    ACC_macro, ACC, TPR, TNR, PPV, f1, kappa = evaluate_metrics(cm, y_test, y_pred, True)

    return y_pred, exec_times, (cm, ACC_macro, ACC, TPR, TNR, PPV, f1, kappa) 

def main() -> object:
    X_test, Y_test = load_test_data()

    FILENAME = 'models/ecg_quant'
    interpreter_1, input_idx_1, output_idx_1 = load_lite_model(tflite_load='File', filename=FILENAME)
    y_pred, exec_times, cm = test_lite_model(X_test, Y_test, interpreter_1, input_idx_1, output_idx_1)

    print("Mean of execution times: " , np.mean(exec_times))
    print("Standard diviation: ", np.std(exec_times))
    print("Maximum: ", np.max(exec_times))

if __name__ == '__main__':
    main()

ModuleNotFoundError: No module named 'load'