# 1D CNN on ECG data with transfer learning

This notebook demonstrates a 1D CNN trained on ECG data from the MIT-BIH dataset. The project uses transfer learning: the model first learns the general patterns of an ECG, then the final layers of this pre-trained model are re-trained on one specific patient using the first ten minutes of that patient's ECG trace. The model is then evaluated on the final 20 minutes of the same trace.
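
The per-patient split by recording time can be sketched as follows. This is a minimal illustration, assuming each beat carries a timestamp in seconds from the start of the recording; the `beat_times` array and the `split_by_time` helper are hypothetical and are not part of this notebook's data.

```python
import numpy as np

def split_by_time(beat_times, cutoff_seconds=600.0):
    """Return boolean masks for beats before and after the cutoff.

    beat_times: 1D array of beat positions in seconds (hypothetical input).
    cutoff_seconds: 600 s, i.e. the first ten minutes used for re-training.
    """
    beat_times = np.asarray(beat_times, dtype=float)
    fine_tune_mask = beat_times < cutoff_seconds
    return fine_tune_mask, ~fine_tune_mask

# Toy example: one beat per second over a 30-minute recording.
beat_times = np.arange(0, 30 * 60, 1.0)
fine_tune_mask, eval_mask = split_by_time(beat_times)
print(fine_tune_mask.sum(), eval_mask.sum())  # 600 1200
```

The two masks are complementary, so no beat can end up in both the re-training and the evaluation part.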

The patient to be evaluated is excluded from the initial learning step, which prevents data leakage. The process is repeated in a leave-one-out cross-validation fashion, holding out one patient at a time.
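
As a rough sketch of the leave-one-patient-out idea, the helper below yields index sets per held-out patient. The function name `leave_one_patient_out` and the toy `patient_ids` array are made up for illustration.

```python
import numpy as np

def leave_one_patient_out(patient_ids):
    """Yield (held_out_patient, train_idx, test_idx) for every patient."""
    patient_ids = np.asarray(patient_ids)
    for held_out in np.unique(patient_ids):
        test_idx = np.flatnonzero(patient_ids == held_out)
        train_idx = np.flatnonzero(patient_ids != held_out)
        yield held_out, train_idx, test_idx

# Toy example: six beats from three patients.
patient_ids = np.array([0, 0, 1, 1, 2, 2])
folds = list(leave_one_patient_out(patient_ids))
for held_out, train_idx, test_idx in folds:
    # The held-out patient never appears among the training beats.
    assert held_out not in patient_ids[train_idx]
    print(held_out, train_idx, test_idx)
```

Splitting on patient rather than on individual beats is what keeps beats of the evaluated patient out of the training set.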

In [1]:
import math

import datetime
import pandas as pd
import tensorflow as tf
import numpy as np
from tensorflow.keras import layers, models
import functools

## Final filtering and loading
The dataset needs its final preprocessing steps applied.
We factorize the labels (convert strings to integers) so they can be used by the model.
We also exclude any heartbeats whose window length is not 160; these are heartbeats that occur at the beginning and end of a recording.

In [2]:
data = pd.read_pickle("splitted_samples.data")

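# Keep only full-length windows; .copy() avoids a SettingWithCopyWarning when adding "label"
data = data[data['waves'].str.len() == 160].copy()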
# factorize labels
factorized_labels = data["beat_type"].factorize()
data["label"] = factorized_labels[0]
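
To make the two steps concrete, here is a small sketch on a toy frame. The column names mirror the ones used above, but the rows are invented and the windows are flat lists rather than real two-channel ECG windows.

```python
import pandas as pd

# Toy frame; the third row has a short window and should be dropped.
toy = pd.DataFrame({
    "beat_type": ["N", "V", "N", "A"],
    "waves": [[0.0] * 160, [0.0] * 160, [0.0] * 90, [0.0] * 160],
})

# .str.len() also works on list-valued columns, giving the window length.
toy = toy[toy["waves"].str.len() == 160].copy()

# factorize returns (integer codes, unique values in order of first appearance).
codes, uniques = toy["beat_type"].factorize()
toy["label"] = codes

# Mapping from integer label back to the beat type string.
label_to_beat = dict(enumerate(uniques))
print(toy[["beat_type", "label"]])
print(label_to_beat)
```

Keeping the `uniques` index around is useful later for translating predicted class indices back into beat types.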

In [3]:
summ = data["beat_type"].value_counts()
print("Percentage of normal heartbeats")
1 - (sum(summ) - summ["N"]) / sum(summ)

Percentage of normal heartbeats


0.6856484950382865
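
Because roughly 69% of the beats are normal, a classifier that always predicts "N" would already reach about that accuracy, which is a useful reference point for the results later on. A minimal sketch of such a majority-class baseline, using made-up labels and a hypothetical `majority_baseline` helper:

```python
from collections import Counter

def majority_baseline(labels):
    """Return the most common label and the accuracy of always predicting it."""
    counts = Counter(labels)
    label, count = counts.most_common(1)[0]
    return label, count / len(labels)

# Made-up labels, not taken from the dataset.
toy_labels = ["N", "N", "N", "V", "A", "N", "V", "N", "N", "N"]
label, accuracy = majority_baseline(toy_labels)
print(label, accuracy)  # N 0.7
```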

## Data splitting and normalization
The data needs to be split and transformed into the correct shape. The values are then normalized to the range [0, 1]. Note that the train and test data are normalized separately, and the two ECG channels are also normalized separately.

In [4]:
def train_test_split():
    """Yield leave-one-patient-out (train, test) splits, each min-max normalized."""
    # Iterate over the actual patient ids instead of assuming they are 0..n-1
    for patient_id in np.unique(data["patient"]):
        train = data[data["patient"] != patient_id]
        test = data[data["patient"] == patient_id]

        # Arrays of shape (beats, 160, 2); cast to float so in-place scaling is exact
        train_data = np.array(train["waves"].tolist(), dtype=float)
        test_data = np.array(test["waves"].tolist(), dtype=float)
        train_labels = np.expand_dims(train["label"].values, axis=1)
        test_labels = np.expand_dims(test["label"].values, axis=1)

        # Min-max normalization to [0, 1], separately per split and per channel
        for split in (train_data, test_data):
            for channel in range(split.shape[2]):
                values = split[:, :, channel]
                split[:, :, channel] = (values - values.min()) / (values.max() - values.min())

        yield (train_data, train_labels), (test_data, test_labels)
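
The per-channel scaling can also be written in vectorized form. The sketch below assumes an array shaped `(beats, samples, channels)`; the helper name `min_max_per_channel` and the `eps` guard against a constant channel are additions for illustration, not part of the cell above.

```python
import numpy as np

def min_max_per_channel(x, eps=1e-12):
    """Scale each channel of an array shaped (beats, samples, channels) to [0, 1].

    The minimum and maximum are taken over all beats and samples of a channel.
    `eps` avoids a division by zero for a constant channel (added assumption).
    """
    x = np.asarray(x, dtype=np.float64)
    lo = x.min(axis=(0, 1), keepdims=True)
    hi = x.max(axis=(0, 1), keepdims=True)
    return (x - lo) / np.maximum(hi - lo, eps)

# Random toy data with two channels on very different scales.
rng = np.random.default_rng(0)
toy = rng.normal(size=(5, 160, 2)) * [1.0, 50.0]
scaled = min_max_per_channel(toy)
print(scaled.min(axis=(0, 1)), scaled.max(axis=(0, 1)))
```

Scaling each channel on its own keeps a channel with a large amplitude from compressing the other one into a narrow band near zero.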

## Evaluating all models with cross-validation

In [5]:
evaluations = []
predictions_and_true_labels = []
run_name = input("Enter name of run")
for patient, ((train_data, train_labels), (test_data, test_labels)) in enumerate(train_test_split()):

    # Convolutional feature extractor
    base_model = models.Sequential()
    base_model.add(layers.Conv1D(16, 3, input_shape=(160, 2), activation="relu"))
    base_model.add(layers.MaxPool1D(2))
    base_model.add(layers.Conv1D(32, 3, activation="relu"))
    base_model.add(layers.MaxPool1D(2))
    base_model.add(layers.Conv1D(64, 3, activation="relu"))
    base_model.add(layers.MaxPool1D(2))

    # Classification head on top of the convolutional features
    final_layers = base_model.output
    final_layers = layers.Flatten()(final_layers)
    final_layers = layers.Dense(32, activation="relu")(final_layers)
    # No activation here: the loss below expects raw logits (from_logits=True)
    final_layers = layers.Dense(14)(final_layers)

    model = tf.keras.models.Model(inputs=base_model.input, outputs=final_layers)

    model.compile(optimizer="adam",
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
                  metrics=["accuracy"])

    # Create the TensorBoard callback before fitting so the run is actually logged
    log_dir = "logs/fit/" + run_name + "patient " + str(patient) + " | " + datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
    tensorboard_callback = tf.keras.callbacks.TensorBoard(log_dir=log_dir, histogram_freq=1)

    print("First training step")
    # The held-out patient is passed as validation data for monitoring only
    history = model.fit(train_data, train_labels, validation_data=(test_data, test_labels),
                        epochs=10, callbacks=[tensorboard_callback])

    # Store evaluations
    evaluations.append(model.evaluate(test_data, test_labels))
    predictions_and_true_labels.append((model.predict(test_data), test_labels))

    # tf.keras.backend.clear_session()
    print("PATIENT", patient)

Enter name of run FINALNOTRANSFER


First training step
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
PATIENT 0
First training step
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
PATIENT 1
First training step
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
PATIENT 2
First training step
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
PATIENT 3
First training step
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
PATIENT 4
First training step
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
PATIENT 5
First training step
Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
PATIENT 6
First trainin
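
The fine-tuning step described in the introduction, re-training only the final layers on one patient, could be sketched as follows. This is a minimal illustration under stated assumptions: the network is a shortened stand-in rather than the architecture used in this notebook, the pre-training step is omitted, and the patient data is random placeholder data.

```python
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models

# Shortened stand-in network (assumption, kept small for brevity).
model = models.Sequential([
    layers.Input(shape=(160, 2)),
    layers.Conv1D(16, 3, activation="relu"),
    layers.MaxPool1D(2),
    layers.Flatten(),
    layers.Dense(32, activation="relu"),
    layers.Dense(14),  # raw logits, one per beat class
])

# ... pre-training on all other patients would happen here ...

# Freeze everything except the two dense layers at the end.
for layer in model.layers[:-2]:
    layer.trainable = False

# Compile after changing `trainable` so the change takes effect.
model.compile(optimizer="adam",
              loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
              metrics=["accuracy"])

# Random placeholder data standing in for one patient's first ten minutes.
rng = np.random.default_rng(0)
x_patient = rng.random((32, 160, 2)).astype("float32")
y_patient = rng.integers(0, 14, size=(32, 1))

conv_before = model.layers[0].get_weights()[0].copy()
model.fit(x_patient, y_patient, epochs=1, verbose=0)
conv_after = model.layers[0].get_weights()[0]
print("conv weights unchanged:", np.array_equal(conv_before, conv_after))
```

Only the dense layers receive gradient updates here, so the convolutional filters learned from the other patients are kept as they are.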

In [6]:
import pickle
import os

# Create the output directory if it does not exist yet
os.makedirs("results", exist_ok=True)

with open("results/normal_learning_results.data", "wb") as out:
    pickle.dump(evaluations, out)

with open("results/normal_learning_predictions.data", "wb") as out:
    pickle.dump(predictions_and_true_labels, out)
    
print("Results have been stored to disk")

Results have been stored to disk
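
The stored predictions can later be turned into a per-fold accuracy by taking the arg max over the class dimension. The sketch below assumes each entry is a `(model outputs, true labels)` pair with shapes `(beats, classes)` and `(beats, 1)`, as appended in the training loop; the helper names and the toy numbers are invented for illustration.

```python
import numpy as np

def fold_accuracy(predictions, true_labels):
    """Accuracy for one fold from model outputs shaped (beats, classes)."""
    predicted = np.argmax(predictions, axis=1)
    return float(np.mean(predicted == np.ravel(true_labels)))

def summarise(predictions_and_true_labels):
    """Return per-fold accuracies and their unweighted mean."""
    accuracies = [fold_accuracy(p, t) for p, t in predictions_and_true_labels]
    return accuracies, float(np.mean(accuracies))

# Made-up outputs for two folds with three classes.
toy_results = [
    (np.array([[2.0, 0.1, 0.3], [0.2, 1.5, 0.1]]), np.array([[0], [1]])),
    (np.array([[0.1, 0.2, 3.0], [1.0, 0.5, 0.2]]), np.array([[2], [1]])),
]
accuracies, mean_accuracy = summarise(toy_results)
print(accuracies, mean_accuracy)  # [1.0, 0.5] 0.75
```

For the real run, the list loaded with `pickle.load` from `results/normal_learning_predictions.data` could be passed to `summarise` in place of `toy_results`. The mean is unweighted, so every patient counts equally regardless of how many beats the recording contains.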
