---
# <div align="center"><font color='blue'>  </font></div>
# <div align="center"><font color='blue'> COSC 2779 | Deep Learning  </font></div>
## <div align="center"> <font color='blue'> Week 11 Lab Exercises: **Autoencoder to detect anomalies on ECG**</font></div>
---

# Introduction
**COSC2779 - Deep Learning - 2020**

In this lab, you will train an autoencoder to detect anomalies on ECG signals. 

In this lab, you will:
- Learn how to build a convolutional autoencoder
- Train an autoencoder on normal data.
- Use the reconstruction error to detect anomalies

## Setting up the Notebook

Let's first load the packages we need.

In [None]:
import tensorflow as tf
AUTOTUNE = tf.data.experimental.AUTOTUNE
import numpy as np
import pandas as pd

import tensorflow_datasets as tfds
import pathlib
import shutil
import tempfile

from  IPython import display
from matplotlib import pyplot as plt

from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, losses
from tensorflow.keras.datasets import fashion_mnist
from tensorflow.keras.models import Model

We can use the tensor board to view the learning curves. Let's first set it up.

In [None]:
logdir = pathlib.Path(tempfile.mkdtemp())/"tensorboard_logs"
shutil.rmtree(logdir, ignore_errors=True)

# Load the TensorBoard notebook extension
%load_ext tensorboard

# Open an embedded TensorBoard viewer
%tensorboard --logdir {logdir}/models

We can also write our own function to plot the models training history ones training has completed.


In [None]:
from itertools import cycle
def plotter(history_hold, metric = 'binary_crossentropy', ylim=[0.0, 1.0]):
  cycol = cycle('bgrcmk')
  for name, item in history_hold.items():
    y_train = item.history[metric]
    y_val = item.history['val_' + metric]
    x_train = np.arange(0,len(y_val))

    c=next(cycol)

    plt.plot(x_train, y_train, c+'-', label=name+'_train')
    plt.plot(x_train, y_val, c+'--', label=name+'_val')

  plt.legend()
  plt.xlim([1, max(plt.xlim())])
  plt.ylim(ylim)
  plt.xlabel('Epoch')
  plt.ylabel(metric)
  plt.grid(True)

## Loading the dataset

We will use the [ECG5000](http://www.timeseriesclassification.com/description.php?Dataset=ECG5000) dataset. This dataset contains 5,000 Electrocardiograms, each with 140 data points. You will use a simplified version of the dataset, where each example has been labeled either 0 (corresponding to an abnormal rhythm), or 1 (corresponding to a normal rhythm). You are interested in identifying the abnormal rhythms.

*This is a labeled dataset, so you could phrase this as a supervised learning problem. The goal of this example is to illustrate anomaly detection concepts you can apply to larger datasets, where you do not have labels available (for example, in [How Airbus Detects Anomalies in ISS Telemetry Data](https://blog.tensorflow.org/2020/04/how-airbus-detects-anomalies-iss-telemetry-data-tfx.html), you have many thousands of normal examples, and only a small number of abnormal examples).*

In [None]:
# Download the dataset
dataframe = pd.read_csv('http://storage.googleapis.com/download.tensorflow.org/data/ecg.csv', header=None)
raw_data = dataframe.values
dataframe.head()

Extract labels and data. Split random train test sets.

In [None]:
# The last element contains the labels
labels = raw_data[:, -1]

# The other data points are the electrocadriogram data
data = raw_data[:, 0:-1]

train_data, test_data, train_labels, test_labels = train_test_split(
    data, labels, test_size=0.2, random_state=21
)

Normalise the data

In [None]:
min_val = tf.reduce_min(train_data)
max_val = tf.reduce_max(train_data)

train_data = (train_data - min_val) / (max_val - min_val)
test_data = (test_data - min_val) / (max_val - min_val)

train_data = tf.cast(train_data, tf.float32)
test_data = tf.cast(test_data, tf.float32)

Seperate data into normal and abnormal. We will be using only the normal training data to train the autoencoder.

Both normal and abnormal test data will be used to validate the model.

In [None]:
train_labels = train_labels.astype(bool)
test_labels = test_labels.astype(bool)

normal_train_data = train_data[train_labels]
normal_test_data = test_data[test_labels]

anomalous_train_data = train_data[~train_labels]
anomalous_test_data = test_data[~test_labels]

Plot some normal and abnormal data to get an understanding.

In [None]:
plt.figure(figsize=(10,5))
plt.subplot(1,2,1)
plt.grid()
plt.plot(np.arange(140), normal_train_data[0])
plt.title("A Normal ECG")

plt.subplot(1,2,2)
plt.grid()
plt.plot(np.arange(140), anomalous_train_data[0])
plt.title("An Anomalous ECG")

plt.show()

## Model definition & Training

Define an Autoencoder model. We will be using the subclassing method for this example. However, you can use any other technique to define the model including sequential API and functional API.

*Note that we are not defining a sequential model to make the example simple.* 


In [None]:
class AnomalyDetector(Model):
  def __init__(self):
    super(AnomalyDetector, self).__init__()
    self.encoder = tf.keras.Sequential([
      layers.Dense(32, activation="relu"),
      layers.Dense(16, activation="relu"),
      layers.Dense(8, activation="relu")])
    
    self.decoder = tf.keras.Sequential([
      layers.Dense(16, activation="relu"),
      layers.Dense(32, activation="relu"),
      layers.Dense(140, activation="sigmoid")])
    
  def call(self, x):
    encoded = self.encoder(x)
    decoded = self.decoder(encoded)
    return decoded

autoencoder = AnomalyDetector()

In [None]:
def get_callbacks(name):
  return [
    tf.keras.callbacks.TensorBoard(logdir/name, histogram_freq=1),
  ]

autoencoder.compile(optimizer='adam', loss='mae')

In [None]:
m_histories = {}
m_histories['autoencoder'] = autoencoder.fit(normal_train_data, normal_train_data, 
          epochs=20, 
          batch_size=512,
          validation_data=(test_data, test_data),
          callbacks=get_callbacks('models/autoencoder'),
          shuffle=True)

In [None]:
plotter(m_histories, ylim=[0.0, 0.1], metric = 'loss')

## Analysing the outputs

Lets first reconstruct some test data using the trained autoencoder and observe the reconstruction errors.

In [None]:
plt.figure(figsize=(10,5))
plt.subplot(1,2,1)
encoded_normal = autoencoder.encoder(normal_test_data).numpy()
decoded_normal = autoencoder.decoder(encoded_normal).numpy()

plt.plot(normal_test_data[0],'b')
plt.plot(decoded_normal[0],'r')
plt.fill_between(np.arange(140), decoded_normal[0], normal_test_data[0], color='lightcoral' )
plt.legend(labels=["Input", "Reconstruction", "Error"])

plt.subplot(1,2,2)
encoded_abnormal = autoencoder.encoder(anomalous_test_data).numpy()
decoded_abnormal = autoencoder.decoder(encoded_abnormal).numpy()

plt.plot(anomalous_test_data[0],'b')
plt.plot(decoded_abnormal[0],'r')
plt.fill_between(np.arange(140), decoded_abnormal[0], anomalous_test_data[0], color='lightcoral' )
plt.legend(labels=["Input", "Reconstruction", "Error"])

plt.show()

## Anomaly detection

Detect anomalies by calculating whether the reconstruction loss is greater than a fixed threshold. In this lab, you will calculate the mean average error for normal examples from the training set, then classify future examples as anomalous if the reconstruction error is higher than one standard deviation from the training set.

Plot the reconstruction error on normal ECGs from the training set

In [None]:
reconstructions = autoencoder.predict(normal_train_data)
train_loss = tf.keras.losses.mae(reconstructions, normal_train_data)

plt.hist(train_loss, bins=50)
plt.xlabel("Train loss")
plt.ylabel("No of examples")
plt.show()

Choose a threshold value that is one standard deviations above the mean.

In [None]:
threshold = np.mean(train_loss) + np.std(train_loss)
print("Threshold: ", threshold)

Lets plot the threshold on the train data.

In [None]:
plt.hist(train_loss, bins=50)
plt.vlines(np.mean(train_loss), 0, 300, colors='g')
plt.vlines(threshold, 0, 300, colors='r')
plt.xlabel("Train loss")
plt.ylabel("No of examples")
plt.show()

If you examine the reconstruction error for the anomalous examples in the test set, you'll notice most have greater reconstruction error than the threshold. By varying the threshold, you can adjust the precision and recall of your classifier.

In [None]:
reconstructions = autoencoder.predict(anomalous_test_data)
test_loss = tf.keras.losses.mae(reconstructions, anomalous_test_data)

plt.hist(test_loss, bins=50, alpha = 0.5, label='Anomalous')

reconstructions = autoencoder.predict(normal_test_data)
test_loss = tf.keras.losses.mae(reconstructions, normal_test_data)

plt.hist(test_loss, bins=50, alpha = 0.5, label='Normal')

plt.legend()

plt.vlines(threshold, 0, 80, colors='r')
plt.xlabel("Test loss")
plt.ylabel("No of examples")
plt.show()

Classify an ECG as an anomaly if the reconstruction error is greater than the threshold.

In [None]:
def predict(model, data, threshold):
  reconstructions = model(data)
  loss = tf.keras.losses.mae(reconstructions, data)
  return tf.math.less(loss, threshold)

def print_stats(predictions, labels):
  print("Accuracy = {}".format(accuracy_score(labels, preds)))
  print("Precision = {}".format(precision_score(labels, preds)))
  print("Recall = {}".format(recall_score(labels, preds)))

In [None]:
preds = predict(autoencoder, test_data, threshold)
print_stats(preds, test_labels)

Finally lets vizualize the hidden feature representation learned by the autoencoder.

In [None]:
from sklearn.manifold import TSNE
encoded_z = autoencoder.encoder(test_data).numpy()
X_embedded = TSNE(n_components=2).fit_transform(encoded_z)

plt.scatter(X_embedded[:, 0], X_embedded[:, 1], c=test_labels, cmap=plt.cm.Spectral)

plt.show()