# Autoencoder Latent Feature Representation

Idea: Train an autoencoder network to learn a latent feature representation
of the heartbeats, then use this representation as the input to a classifier.

In [None]:
from typing import Tuple

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.ensemble import GradientBoostingClassifier

import tensorflow as tf
from tensorflow.keras import layers, losses, optimizers
from tensorflow.keras.models import Model

In [None]:
# Columns 0..186 of every CSV hold the signal samples; column 187 holds the label.
_LABEL_COLUMN = 187


def _features_and_labels(df):
    """Split a heartbeat DataFrame into an (n, 187, 1) feature array and int8 labels."""
    labels = np.array(df[_LABEL_COLUMN].values).astype(np.int8)
    # The trailing axis of size 1 is appended so each signal is (length, 1).
    features = np.array(df[list(range(_LABEL_COLUMN))].values)[..., np.newaxis]
    return features, labels


def load_data(dataset):
    """Load one of the heartbeat datasets from ``../data`` as NumPy arrays.

    Args:
        dataset: ``'mitbih'`` (separate train/test CSV files) or ``'ptbdb'``
            (normal and abnormal CSV files, concatenated and split 80/20
            with stratification on the label column).

    Returns:
        Tuple ``(X, Y, X_test, Y_test)``. ``X``/``X_test`` contain CSV
        columns 0-186 with a trailing axis of size 1; ``Y``/``Y_test``
        contain column 187 cast to ``int8``.

    Raises:
        NotImplementedError: If ``dataset`` is not one of the two names above.
    """
    if dataset == 'mitbih':
        df_train = pd.read_csv("../data/mitbih_train.csv", header=None)
        # Shuffle the training rows; no random_state is passed here, so the
        # row order is not pinned by this function.
        df_train = df_train.sample(frac=1)
        df_test = pd.read_csv("../data/mitbih_test.csv", header=None)
    elif dataset == 'ptbdb':
        df_normal = pd.read_csv("../data/ptbdb_normal.csv", header=None)
        df_abnormal = pd.read_csv("../data/ptbdb_abnormal.csv", header=None)
        df = pd.concat([df_normal, df_abnormal])
        # Fixed seed + stratification keep the class ratio and the split stable.
        df_train, df_test = train_test_split(
            df, test_size=0.2, random_state=1337, stratify=df[_LABEL_COLUMN]
        )
    else:
        raise NotImplementedError('wrong dataset name')

    X, Y = _features_and_labels(df_train)
    X_test, Y_test = _features_and_labels(df_test)
    return X, Y, X_test, Y_test

In [None]:
# Load the MIT-BIH data: training features/labels and the separate test set.
X_train, y_train, X_test, y_test = load_data('mitbih')

## Representation Learning

In [None]:
def pad_signals(signals: np.ndarray, target_length: int) -> np.ndarray:
    """Zero-pad a batch of signals at the end of axis 1 up to ``target_length``.

    Args:
        signals: Array with at least two axes; axis 0 indexes the signals
            and axis 1 is the axis that gets padded. All other axes are
            left untouched.
        target_length: Desired size of axis 1 after padding.

    Returns:
        A new array equal to ``signals`` with zeros appended along axis 1 so
        that ``result.shape[1] == target_length``.

    Raises:
        ValueError: If ``target_length`` is smaller than ``signals.shape[1]``.
    """
    current_length = signals.shape[1]
    missing = target_length - current_length
    if missing < 0:
        # np.pad would also raise ValueError here, but with an opaque message.
        raise ValueError(
            f'target_length ({target_length}) is shorter than the signal '
            f'length ({current_length})'
        )
    # Pad only at the end of axis 1; every other axis gets (0, 0).
    pad_width = [(0, 0)] * signals.ndim
    pad_width[1] = (0, missing)
    return np.pad(signals, pad_width)

class Autoencoder(Model):
    """1-D convolutional autoencoder exposing its encoder and decoder halves.

    Encoder: three stages of ``Conv1D(stride 2) -> BatchNorm -> ReLU ->
    MaxPool(2)`` with 5, 15 and 30 filters (dropout 0.1 before the last
    pooling), then ``Flatten`` and a ``Dense(latent_dim)`` layer without
    activation. Each stage divides the temporal length by four.

    Decoder: ``Dense(90) -> ReLU -> Reshape((3, 30))`` followed by three
    stages of ``Conv1DTranspose(stride 2) -> BatchNorm -> ReLU ->
    UpSampling(2)`` with 30, 15 and 5 filters, and a final single-filter
    ``Conv1DTranspose`` without activation. Starting from length 3 and
    doubling six times, the decoder output always has 3 * 2**6 = 192 steps
    and one channel.
    """

    def __init__(self, input_shape: Tuple[int, int], latent_dim: int):
        """Build the encoder and decoder stacks.

        Args:
            input_shape: Per-sample input shape, forwarded to the first
                convolution as its ``input_shape``.
            latent_dim: Number of units of the latent (bottleneck) layer.
        """
        super().__init__()

        # --- Encoder: signal -> latent vector ---
        encoder_stack = [
            layers.Conv1D(filters=5, kernel_size=3, strides=2, padding='same', input_shape=input_shape),
            layers.BatchNormalization(),
            layers.ReLU(),
            layers.MaxPool1D(pool_size=2, padding='same'),
            layers.Conv1D(filters=15, kernel_size=3, strides=2, padding='same'),
            layers.BatchNormalization(),
            layers.ReLU(),
            layers.MaxPool1D(pool_size=2, padding='same'),
            layers.Conv1D(filters=30, kernel_size=3, strides=2, padding='same'),
            layers.BatchNormalization(),
            layers.ReLU(),
            layers.Dropout(rate=0.1),
            layers.MaxPool1D(pool_size=2, padding='same'),
            layers.Flatten(),
            layers.Dense(latent_dim),
        ]
        self.encoder = tf.keras.Sequential(encoder_stack)

        # --- Decoder: latent vector -> reconstructed signal ---
        decoder_stack = [
            # 90 = 3 time steps * 30 channels, matching the Reshape below.
            layers.Dense(90, input_shape=(latent_dim,)),
            layers.ReLU(),
            layers.Reshape((3, 30)),
            layers.Conv1DTranspose(filters=30, kernel_size=3, strides=2, padding='same'),
            layers.BatchNormalization(),
            layers.ReLU(),
            layers.UpSampling1D(size=2),
            layers.Conv1DTranspose(filters=15, kernel_size=3, strides=2, padding='same'),
            layers.BatchNormalization(),
            layers.ReLU(),
            layers.UpSampling1D(size=2),
            layers.Conv1DTranspose(filters=5, kernel_size=3, strides=2, padding='same'),
            layers.BatchNormalization(),
            layers.ReLU(),
            layers.UpSampling1D(size=2),
            # Stride-1 projection back to a single output channel.
            layers.Conv1DTranspose(filters=1, kernel_size=3, strides=1, padding='same'),
        ]
        self.decoder = tf.keras.Sequential(decoder_stack)

    def encode(self, x):
        """Return the encoder output (latent representation) for ``x``."""
        return self.encoder(x)

    def call(self, x):
        """Encode ``x`` and return the decoder's reconstruction."""
        return self.decoder(self.encoder(x))

In [None]:
# Hold out 20% of the training data as a validation split (fixed seed).
X_train_split, X_valid_split, y_train_split, y_valid_split = train_test_split(
    X_train,
    y_train,
    test_size = 0.2,
    random_state=42
)

# Zero-pad every signal from 187 to 192 samples. The encoder halves the
# length six times (three strided convolutions + three poolings) and the
# decoder doubles it six times starting from length 3, so the reconstruction
# only matches the input shape when the length is 3 * 2**6 = 192.
X_valid_padded = pad_signals(X_valid_split, 192)
X_train_padded = pad_signals(X_train_split, 192)

# 45-dimensional latent space; the input shape is (192, 1) per signal.
autoencoder = Autoencoder(input_shape=X_train_padded.shape[1:], latent_dim=45)

In [None]:
# Train the autoencoder to reproduce its own input under a mean-squared-error
# loss: the padded signals serve as both inputs and targets.
autoencoder.compile(
    optimizer=optimizers.Adam(learning_rate=0.01),
    loss=losses.MeanSquaredError()
)
autoencoder.fit(
    X_train_padded,
    X_train_padded,
    batch_size=128,
    epochs=10,
    validation_data=(X_valid_padded, X_valid_padded)
)
# Persist the trained weights so later cells can restore them without retraining.
autoencoder.save_weights('./checkpoints/autoencoder')

In [None]:
# Rebuild the model with the same configuration and restore the saved weights.
autoencoder = Autoencoder(input_shape=X_train_padded.shape[1:], latent_dim=45)
autoencoder.load_weights('./checkpoints/autoencoder')

# Take a single training example and add a leading batch axis.
signal = X_train_padded[1142][tf.newaxis, ...]
print(f'Signal shape: {signal.shape}')

encoded_signal = autoencoder.encode(signal)
print(f'Encoded signal shape: {encoded_signal.shape}')

reconstructed_signal = autoencoder(signal)
print(f'Reconstructed signal shape: {reconstructed_signal.shape}')

# Overlay the original and reconstructed signals for a visual sanity check.
fig, ax = plt.subplots(1, 1, figsize=(18,6))

# Drop the batch axis again before plotting.
ax.plot(tf.squeeze(signal, axis=0), label='Original Signal')
ax.plot(tf.squeeze(reconstructed_signal, axis=0), label='Reconstructed Signal')
ax.legend()

plt.show()

## Classification

Preliminary validation accuracy on the `mitbih` dataset (20% hold-out of the training set, classifiers with default hyperparameters):

| Latent Rep. | Classifier | Accuracy |
|---|---|---|
| Autoencoder (45D) | SVM (no fine-tune) | 0.9680201016503912 |
| Autoencoder (45D) | GBC (no fine-tune) | 0.9479755582205471 |

In [None]:
# Encode the padded signals into latent features. `predict` runs the encoder
# in inference mode batch by batch and returns NumPy arrays, instead of
# pushing the whole dataset through a single forward call and handing
# TensorFlow tensors to scikit-learn.
X_train_transformed = autoencoder.encoder.predict(X_train_padded, batch_size=1024, verbose=0)
X_valid_transformed = autoencoder.encoder.predict(X_valid_padded, batch_size=1024, verbose=0)

# Support vector classifier with default hyperparameters on the latent features.
svm = SVC(verbose=True)

svm.fit(X_train_transformed, y_train_split)

In [None]:
# Score the fitted SVM on the held-out validation split.
svm.score(X_valid_transformed, y_valid_split)

In [None]:
# Encode the padded signals into latent features. `predict` runs the encoder
# in inference mode batch by batch and returns NumPy arrays, instead of
# pushing the whole dataset through a single forward call and handing
# TensorFlow tensors to scikit-learn.
X_train_transformed = autoencoder.encoder.predict(X_train_padded, batch_size=1024, verbose=0)
X_valid_transformed = autoencoder.encoder.predict(X_valid_padded, batch_size=1024, verbose=0)

# Gradient boosting classifier with default hyperparameters on the latent features.
gbc = GradientBoostingClassifier(verbose=2)

gbc.fit(X_train_transformed, y_train_split)

In [None]:
# Score the fitted gradient boosting classifier on the held-out validation split.
gbc.score(X_valid_transformed, y_valid_split)