# Task: Classification into 4 classes


## Idea is to use CWT images of the data and perform classification using CNNs

### Use preprocessing.ipynb to preprocess the data

In [3]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from keras.utils import to_categorical

import os
import tempfile

### Load in class representatives

In [16]:
ones = pd.read_csv("class_reps/ones.csv", header=None)
twos = pd.read_csv("class_reps/twos.csv", header=None)
threes = pd.read_csv("class_reps/threes.csv", header=None)
fours = pd.read_csv("class_reps/fours.csv", header=None)
print(len(ones))
print(len(twos))
print(len(threes))
print(len(fours))

3030
443
1474
170


# Classification

## Create Model

### Paper NN Structure:
Automatic ECG Classification Using Continuous Wavelet
Transform and Convolutional Neural Network

![Paper NN structure](Paper_pipeline.png "Paper NN Structure")

<!-- insert image -->
![Paper NN structure](Paper_NN_structure.png "Paper NN Structure")

## Load Data

check npz's

## Split dataset into validation, train and test:

In [None]:
y_train = pd.read_csv("y_train.csv").drop(columns='id').to_numpy()
print(y_train.shape)
npz_paths = []
for i in range(y_train.shape[0]):
    npz_path = "npz_files/input{}.npz".format(i)
    npz_paths.append(npz_path)
npz_paths

In [51]:
# shuffle npz paths
shuffled_paths = np.random.shuffle(npz_paths)
train_paths, val_paths, test_paths = np.split(npz_paths, [int(.8*len(npz_paths)), int(.9*len(npz_paths))])


## Check for sufficient representatives of class 4

In [None]:
num_y = 0
for i in range(len(val_paths)):
    npz = np.load(val_paths[i])
    y = npz["y"]
    if y == 3:
        num_y += 1
print(num_y)

## Create Dataset
Note: the three y datasets are called y_train_shuffled and so on to avoid naming conflicts with initial y_train

In [43]:
def create_ds(npz_paths):
    '''arguments: list of training, validation or test npz file paths
    returns: np.arrays'''
    images_out, peaks_out, y_out = [], [], []
    

    for name in npz_paths:
        npz = np.load(name)
        image = npz["img"]
        images_out.append(image)
        peaks = npz["peaks"]
        peaks_out.append(peaks)
        y = npz["y"]
        y_out.append(y)

    
    images_out = np.array(images_out)
    peaks_out = np.array(peaks_out)
    y_out = np.array(y_out)
    return images_out, peaks_out, y_out

In [52]:
train_images, train_peaks, y_train_shuffled = create_ds(train_paths)
val_images, val_peaks, y_val_shuffled = create_ds(val_paths)
test_images, test_peaks, y_test_shuffled = create_ds(test_paths)
y_train_shuffled = to_categorical(y_train_shuffled)
y_val_shuffled = to_categorical(y_val_shuffled)
y_test_shuffled = to_categorical(y_test_shuffled)

In [91]:
# # old load data
# def load_image(Class, index):
#     '''Helper Function to load the image'''
#     path = "cwt_images/" + str(Class) + "/cwtmatr_class" + str(Class) + "_index" + str(index) + ".png"
#     img = tf.image.decode_png(tf.io.read_file(path), channels=3)
#     img = tf.cast(img, tf.float32)
#     img = img / 127.5 - 1
#     img = tf.image.resize(img, (IMG_HEIGHT, IMG_WIDTH))
#     # Don't Think data augmentation makes too much sense
#     # if training:
#     #     img = tf.image.random_flip_left_right(img)
#     #     img = tf.image.random_flip_up_down(img)
#     return img

# def load_input(Class, index):
#     '''Helper Function to load the image - peak pair'''
#     img = load_image(Class, index)
#     pk = pd.read_csv("peaks/" + str(Class) + "/ecg_peaks_normalized_zeros.csv", header=None).to_numpy(dtype="float32")[:,index]
#     image = tf.stack([img], axis=0)
#     peak = tf.stack([pk], axis=0)
#     Input = [image, peak]
#     return Input

# def load_data():
#     '''Load the dataset'''
    
#     train_ds = tf.keras.utils.image_dataset_from_directory("cwt_images", labels="inferred", label_mode="categorical", class_names=None,)
    

#     return 0

## Model Design

In [53]:
METRICS = [
      keras.metrics.TruePositives(name='tp'),
      keras.metrics.FalsePositives(name='fp'),
      keras.metrics.TrueNegatives(name='tn'),
      keras.metrics.FalseNegatives(name='fn'), 
      keras.metrics.BinaryAccuracy(name='accuracy'),
      keras.metrics.Precision(name='precision'),
      keras.metrics.Recall(name='recall'),
      keras.metrics.AUC(name='auc'),
      keras.metrics.AUC(name='prc', curve='PR'), # precision-recall curve
]

learning_rate = 1e-3

IMG_HEIGHT = 369
IMG_WIDTH = 496


def create_model(metrics=METRICS, output_bias=None):
    '''Create feature extractor'''
    if output_bias is not None:
        output_bias = tf.keras.initializers.Constant(output_bias)

    image_input = tf.keras.Input(shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
    encoder = tf.keras.Sequential()
    encoder.add(keras.layers.Conv2D(filters=16,kernel_size=7,strides=1,input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)))
    encoder.add(keras.layers.BatchNormalization())
    encoder.add(keras.layers.ReLU())
    encoder.add(keras.layers.MaxPool2D(5,5))
    
    encoder.add(keras.layers.Conv2D(32,3,1))
    encoder.add(keras.layers.BatchNormalization())
    encoder.add(keras.layers.ReLU())
    encoder.add(keras.layers.MaxPool2D(3,3))

    encoder.add(keras.layers.Conv2D(64,3,1))
    encoder.add(keras.layers.BatchNormalization())
    encoder.add(keras.layers.ReLU())
    encoder.add(keras.layers.MaxPool2D(3))

    # fully connected layer for scalar output
    encoder.add(keras.layers.Flatten())
    encoder.add(keras.layers.Dense(64, activation='relu'))
    image_encoder = keras.Model(image_input, encoder(image_input))

    
    # peaks input
    peak_input = keras.Input(shape=(159))

    # conactenate the two submodels
    complete_encoder = keras.layers.concatenate([image_encoder.output, peak_input])


    classifier = keras.layers.Dense(32, activation='relu')(complete_encoder)
    # Set initial bias towards classes 1 and 3 to speed up convergence
    classifier = keras.layers.Dense(4, activation='softmax', bias_initializer=output_bias)(classifier)
    model = keras.Model([image_encoder.input, peak_input], classifier)

    model.compile(
      optimizer=keras.optimizers.Adam(learning_rate=learning_rate),
      loss=keras.losses.CategoricalCrossentropy(),
      metrics=metrics)

    return model


## Train Model

Fix seed:

In [14]:
from numpy.random import seed
seed(1)
print(np.random.rand(1))

[0.417022]


### Create Bias to speed up convergence
Note on the plotting of the model: If we want a more detailed overview we should define each layer separate and not in a sequential

In [54]:
data_size = 5117
class_1_size = ones.shape[0] #3030
class_2_size = twos.shape[0]    #443
class_3_size = threes.shape[0]  #1474
class_4_size = fours.shape[0]   #170
initial_bias = [class_1_size/data_size, class_2_size/data_size, class_3_size/data_size, class_4_size/data_size]

class_weights = {0: 1.0/(data_size - class_1_size) * data_size / 2.0, 1: 1.0/(data_size - class_2_size) * data_size / 2.0, 2: 1.0/(data_size - class_3_size) * data_size / 2.0, 3: 1.0/(data_size - class_4_size) * data_size / 2.0}

model = create_model(output_bias=initial_bias)
# model.summary()
# keras.utils.plot_model(model, show_shapes=True, dpi=64)

Prediction Test for the initialized bias

In [None]:
# image1 = load_image(1, 0)
# image2 = load_image(1, 1)

# peak1 = pd.read_csv("peaks/1/ecg_peaks_normalized_zeros.csv", header=None).to_numpy(dtype="float32")[:,0]
# peak2 = pd.read_csv("peaks/1/ecg_peaks_normalized_zeros.csv", header=None).to_numpy(dtype="float32")[:,1]

# # build images using tf.stack
# images = tf.stack([image1, image2], axis=0)
# peaks = tf.stack([peak1, peak2], axis=0)

# Input = [images,peaks]

# model1 = create_model()
# model2 = create_model(output_bias=initial_bias)
# print("Note the initial bias increases ikelihoods for class 1 and class 3")
# print(model1.predict(Input))
# print(model2.predict(Input))

Store the initial weights for later comparison

In [18]:
initial_weights = os.path.join(tempfile.mkdtemp(), 'initial_weights')
model.save_weights(initial_weights)

In [55]:
# Might want Larger Batch size to ensure that some examples of that class are in the batch
EPOCHS = 100
BATCH_SIZE = 1024 #2048
baseline_history = model.fit(
    [train_images, train_peaks],
    y_train_shuffled,
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
    validation_data=([val_images, val_peaks],y_val_shuffled),
    verbose=1,
    class_weight=class_weights
)

Epoch 1/100


: 

: 