# Lab 5: Anomaly Detection Autoencoders

<a target="_blank" href="https://colab.research.google.com/github/andrew-nash/CS6421-labs-2025/blob/main/CS6421_Lab_05.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

Source: https://www.tensorflow.org/tutorials/generative/autoencoder#third_example_anomaly_detection

In this lab we will look at modelling non-image data with autoencoders. Specifically, we will take ECG (electrocardiogram) signals and use an autoencoder to identify potential abnormalities in them.

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf

from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split

# The Data

We will be using the [ECG5000 Dataset](http://www.timeseriesclassification.com/description.php?Dataset=ECG5000). This contains 5000 ECG signals, each of 140 samples. Each signal has a label indicating whether it contains normal or abnormal behaviour.

In [None]:
dataframe = pd.read_csv('http://storage.googleapis.com/download.tensorflow.org/data/ecg.csv', header=None)
raw_data = dataframe.values
dataframe.head()

In [None]:
# The last column contains the labels
labels = raw_data[:, -1]

# The other columns are the electrocardiogram samples
data = raw_data[:, 0:-1]

train_data, valid_data, train_labels, valid_labels = train_test_split(
    data, labels, test_size=0.2, random_state=21
)

In [None]:
labels

The only pre-processing needed is to scale the data to lie between 0 and 1, using the minimum and maximum of the training split.

In [None]:
min_val = tf.reduce_min(train_data)
max_val = tf.reduce_max(train_data)

train_data = (train_data - min_val) / (max_val - min_val)
valid_data = (valid_data - min_val) / (max_val - min_val)

train_data = tf.cast(train_data, tf.float32)
valid_data = tf.cast(valid_data, tf.float32)
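
As a small illustration of the scaling step, the sketch below applies the same min-max formula to a toy NumPy array. The values and the helper name `min_max_scale` are made up for illustration and are not part of the lab code. Because the minimum and maximum come from the training split only, validation values can fall slightly outside the range 0 to 1.

```python
import numpy as np

def min_max_scale(x, min_val, max_val):
    """Rescale x linearly so that min_val maps to 0 and max_val maps to 1."""
    return (x - min_val) / (max_val - min_val)

# Toy stand-ins for the training and validation splits (hypothetical values)
toy_train = np.array([[-2.0, 0.0], [1.0, 2.0]])
toy_valid = np.array([[-3.0, 1.0]])

# Statistics are taken from the training split only
lo, hi = toy_train.min(), toy_train.max()

scaled_train = min_max_scale(toy_train, lo, hi)
scaled_valid = min_max_scale(toy_valid, lo, hi)

print(scaled_train)  # every value lies in [0, 1]
print(scaled_valid)  # -3.0 is below the training minimum, so it maps below 0
```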

In [None]:
# convert the 1s and 0s to True/False
train_labels = train_labels.astype(bool)
valid_labels = valid_labels.astype(bool)

#    train_data[ [True,False,True,False,False,...] ] will return the values of train_data
#                                                    at indices that correspond to True
normal_train_data = train_data[train_labels]
normal_valid_data = valid_data[valid_labels]

anomalous_train_data = train_data[~train_labels]
anomalous_valid_data = valid_data[~valid_labels]
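
The boolean-mask indexing used above can be tried on a toy array. This is a minimal NumPy sketch with made-up values (`toy_data` and `toy_labels` are hypothetical names); it follows the notebook's convention that `True` marks a normal signal.

```python
import numpy as np

# Hypothetical toy data: four "signals" of three samples each
toy_data = np.array([[0.1, 0.2, 0.3],
                     [0.9, 0.8, 0.7],
                     [0.2, 0.2, 0.2],
                     [0.5, 0.6, 0.7]])

# Labels stored as 1.0 / 0.0, converted to True / False
toy_labels = np.array([1.0, 0.0, 1.0, 0.0]).astype(bool)

toy_normal = toy_data[toy_labels]      # keeps rows 0 and 2
toy_anomalous = toy_data[~toy_labels]  # ~ flips the mask, keeping rows 1 and 3

print(toy_normal.shape, toy_anomalous.shape)
```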

In [None]:
plt.grid()
plt.plot(np.arange(140), normal_train_data[0])
plt.title("A Normal ECG")
plt.show()

In [None]:
plt.grid()
plt.plot(np.arange(140), anomalous_train_data[0])
plt.title("An Anomalous ECG")
plt.show()

# Modelling

We are going to approach this problem in a way that may seem surprising.

We will **not** fit a regression/classification deep model onto the data directly.

Instead, we will fit an autoencoder to the normal signals only and train it to reconstruct its inputs.

The result can be seen as a model of *"normal"* ECG signal behaviour. We then compare each *actual* signal to its reconstruction, on the assumption that large deviations from the reconstruction are caused by anomalous behaviour.
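
To make the idea concrete without a neural network, the sketch below swaps the autoencoder for a one-dimensional linear projection (PCA computed with an SVD) fitted on synthetic "normal" signals. All data here are invented for illustration. The point is only that a signal unlike the fitting data is reconstructed poorly, so its reconstruction error is large.

```python
import numpy as np

rng = np.random.default_rng(0)

# Hypothetical "normal" signals: noisy multiples of one fixed pattern
t = np.linspace(0, 2 * np.pi, 50)
pattern = np.sin(t)
amplitudes = rng.uniform(0.5, 1.5, size=(200, 1))
normal = amplitudes * pattern + 0.01 * rng.normal(size=(200, 50))

# Fit a linear stand-in for the autoencoder: keep only the top principal direction
mean = normal.mean(axis=0)
_, _, vt = np.linalg.svd(normal - mean, full_matrices=False)
component = vt[:1]  # shape (1, 50)

def reconstruct(x):
    code = (x - mean) @ component.T   # encode: 50 values -> 1 value
    return code @ component + mean    # decode: 1 value -> 50 values

def reconstruction_error(x):
    # Mean absolute error per signal
    return np.mean(np.abs(x - reconstruct(x)), axis=1)

normal_test = 1.2 * pattern[None, :]    # same shape as the fitting data
anomaly_test = np.cos(3 * t)[None, :]   # a shape never seen during fitting

print("normal error: ", reconstruction_error(normal_test)[0])
print("anomaly error:", reconstruction_error(anomaly_test)[0])
```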

<strong>What is the benefit of this over a more traditional approach?</strong>

## Creating the reconstruction autoencoder model

In [None]:
%load_ext tensorboard
%tensorboard --logdir tboard

In [None]:
class AnomalyDetector(tf.keras.models.Model):
  def __init__(self):
    super(AnomalyDetector, self).__init__()
    self.encoder = tf.keras.Sequential([
      tf.keras.layers.Dense(32, activation="relu"),
      tf.keras.layers.Dense(16, activation="relu"),
      tf.keras.layers.Dense(8, activation="relu")])

    self.decoder = tf.keras.Sequential([
      tf.keras.layers.Dense(16, activation="relu"),
      tf.keras.layers.Dense(32, activation="relu"),
      tf.keras.layers.Dense(140, activation="sigmoid")])

  def call(self, x):
    encoded = self.encoder(x)
    decoded = self.decoder(encoded)
    return decoded

autoencoder = AnomalyDetector()

autoencoder.compile(optimizer='adam', loss='mse')

tensorboard_callback = tf.keras.callbacks.TensorBoard("./tboard/dense_basic", histogram_freq=1)
autoencoder.fit(normal_train_data, normal_train_data,
          epochs=20,
          batch_size=512,
          validation_data=(valid_data, valid_data),
          shuffle=True, callbacks=[tensorboard_callback])
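
As a rough size check on the architecture above, the following sketch counts the trainable parameters by hand from the layer widths. The helper `dense_params` is a hypothetical name; the arithmetic assumes standard dense layers with one bias per output unit.

```python
# Layer widths of the dense autoencoder: input, encoder, bottleneck, decoder, output
layer_sizes = [140, 32, 16, 8, 16, 32, 140]

def dense_params(n_in, n_out):
    """A dense layer has one weight per input-output pair plus one bias per output."""
    return n_in * n_out + n_out

per_layer = [dense_params(a, b) for a, b in zip(layer_sizes[:-1], layer_sizes[1:])]
total_params = sum(per_layer)

# Ratio between the input width and the narrowest layer
compression = layer_sizes[0] / min(layer_sizes)

print(per_layer)     # [4512, 528, 136, 144, 544, 4620]
print(total_params)  # 10484
print(compression)   # 17.5
```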

Let us now look at the reconstruction of a normal (non-anomalous) ECG signal

In [None]:
encoded_data = autoencoder.encoder(normal_valid_data).numpy()
decoded_data = autoencoder.decoder(encoded_data).numpy()

plt.plot(normal_valid_data[0], 'b')
plt.plot(decoded_data[0], 'r')
plt.fill_between(np.arange(140), decoded_data[0], normal_valid_data[0], color='lightcoral')
plt.legend(labels=["Input", "Reconstruction", "Error"])
plt.show()

... compared to an anomalous signal:

In [None]:
encoded_data = autoencoder.encoder(anomalous_valid_data).numpy()
decoded_data = autoencoder.decoder(encoded_data).numpy()

plt.plot(anomalous_valid_data[0], 'b')
plt.plot(decoded_data[0], 'r')
plt.fill_between(np.arange(140), decoded_data[0], anomalous_valid_data[0], color='lightcoral')
plt.legend(labels=["Input", "Reconstruction", "Error"])
plt.show()

## Detecting the anomalies

We detect anomalies by checking whether the reconstruction error exceeds a fixed threshold. Here, we calculate the mean absolute error for normal examples from the training set, then classify future examples as anomalous if their reconstruction error is more than one standard deviation above the training mean.

First, plot the reconstruction error on normal ECGs from the training set.

In [None]:
reconstructions = autoencoder.predict(normal_train_data)
train_loss = tf.keras.losses.mae(reconstructions, normal_train_data)

plt.hist(train_loss[None,:], bins=50)
plt.xlabel("Train loss")
plt.ylabel("No of examples")
plt.show()

In [None]:
threshold = np.mean(train_loss) + np.std(train_loss)
print("Threshold: ", threshold)

There are many heuristics for choosing this threshold.
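
Two common heuristics are sketched below on made-up reconstruction errors: the mean plus a multiple of the standard deviation, and a fixed percentile of the training errors. The function names and the synthetic `toy_train_loss` array are assumptions for illustration only; which rule suits the ECG data best is something to check experimentally.

```python
import numpy as np

rng = np.random.default_rng(0)

# Hypothetical per-example reconstruction errors on normal training data
toy_train_loss = rng.gamma(shape=2.0, scale=0.01, size=1000)

def std_threshold(losses, k=1.0):
    """Mean of the errors plus k standard deviations."""
    return np.mean(losses) + k * np.std(losses)

def percentile_threshold(losses, q=95.0):
    """Value below which q percent of the errors fall."""
    return np.percentile(losses, q)

candidates = {
    "mean + 1 std": std_threshold(toy_train_loss, k=1.0),
    "mean + 2 std": std_threshold(toy_train_loss, k=2.0),
    "95th percentile": percentile_threshold(toy_train_loss, q=95.0),
}

for name, value in candidates.items():
    # Fraction of normal training examples that this threshold would flag
    flagged = np.mean(toy_train_loss > value)
    print(f"{name}: threshold={value:.4f}, flagged={flagged:.1%}")
```

A higher threshold flags fewer normal signals by mistake, at the cost of missing more anomalies.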

Next, look at the distribution of reconstruction errors for the anomalous examples in the validation set

In [None]:
reconstructions = autoencoder.predict(anomalous_valid_data)
valid_loss = tf.keras.losses.mae(reconstructions, anomalous_valid_data)

plt.hist(valid_loss[None, :], bins=50)
plt.xlabel("Validation loss")
plt.ylabel("No of examples")
plt.show()

In [None]:
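def predict(model, data, threshold):
  # Returns True where the reconstruction error is below the threshold,
  # i.e. where the signal is predicted to be normal
  reconstructions = model(data)
  loss = tf.keras.losses.mae(reconstructions, data)
  return tf.math.less(loss, threshold)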

def print_stats(predictions, labels):
  print("Accuracy = {}".format(accuracy_score(labels, predictions)))
  print("Precision = {}".format(precision_score(labels, predictions)))
  print("Recall = {}".format(recall_score(labels, predictions)))

In [None]:
preds = predict(autoencoder, valid_data, threshold)
print_stats(preds, valid_labels)
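
When reading these numbers, note that `predict` returns `True` for signals it considers normal, so the positive class is "normal" and precision and recall describe how well normal signals are identified. The sketch below computes the three metrics by hand on made-up labels and predictions (all values hypothetical) to show what each one counts.

```python
import numpy as np

# Hypothetical labels and predictions; True means "normal", as in the notebook
toy_labels = np.array([True, True, True, False, False, True])
toy_preds = np.array([True, False, True, True, False, True])

tp = np.sum(toy_preds & toy_labels)    # normal, predicted normal
fp = np.sum(toy_preds & ~toy_labels)   # anomalous, predicted normal
fn = np.sum(~toy_preds & toy_labels)   # normal, predicted anomalous
tn = np.sum(~toy_preds & ~toy_labels)  # anomalous, predicted anomalous

accuracy = (tp + tn) / len(toy_labels)
precision = tp / (tp + fp)
recall = tp / (tp + fn)

print(f"Accuracy = {accuracy:.3f}")
print(f"Precision = {precision:.3f}")
print(f"Recall = {recall:.3f}")
```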

# Auto-encoder Variants - Contractive Autoencoder

The auto-encoder above is already an under-complete autoencoder: its 8-unit bottleneck is much smaller than the 140-sample input.

We can extend it to be a contractive autoencoder by adding a regularizing loss term:

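\begin{equation}
L_{\mathrm{CAE}} = L(x - \hat{x}) + \lambda\sum_i ||\nabla_x h_i(x)||^2
\end{equation}

Where $L(x - \hat{x})$ is the reconstruction loss, $h_i(x)$ is the $i$-th unit of the bottleneck $h(x)$, and $\lambda$ sets the strength of the penalty.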
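
The penalty measures how strongly the bottleneck reacts to small changes in the input, using the squared entries of the encoder's Jacobian (the code in this lab sums these squares). As a hedged sketch, the example below uses a single sigmoid layer with random weights as a stand-in encoder, which is not the lab's model, and checks the analytic Jacobian against finite differences.

```python
import numpy as np

rng = np.random.default_rng(0)

# Hypothetical one-layer encoder: 5 inputs -> 3 bottleneck units
W = rng.normal(size=(3, 5))
b = rng.normal(size=3)

def h(x):
    """Sigmoid encoder h(x) = sigmoid(W x + b)."""
    return 1.0 / (1.0 + np.exp(-(W @ x + b)))

def analytic_jacobian(x):
    # For a sigmoid layer, dh_i/dx_j = h_i * (1 - h_i) * W[i, j]
    s = h(x)
    return (s * (1.0 - s))[:, None] * W

def numeric_jacobian(x, eps=1e-6):
    # Central finite differences, one input dimension at a time
    J = np.zeros((W.shape[0], x.size))
    for j in range(x.size):
        step = np.zeros_like(x)
        step[j] = eps
        J[:, j] = (h(x + step) - h(x - step)) / (2 * eps)
    return J

x = rng.normal(size=5)
J = analytic_jacobian(x)

# Contractive penalty for this input: sum of squared Jacobian entries
penalty = np.sum(J ** 2)

print("max abs difference:", np.max(np.abs(J - numeric_jacobian(x))))
print("penalty:", penalty)
```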

## Contractive Auto-Encoder

In [None]:
class ContractiveAutoencoder(tf.keras.Model):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self.encoder = tf.keras.Sequential([
            tf.keras.layers.Dense(32,input_shape=(140,), activation="relu"),
            tf.keras.layers.Dense(16, activation="relu"),
            tf.keras.layers.Dense(8, activation="relu")
        ])
        self.decoder = tf.keras.Sequential([
            tf.keras.layers.Dense(16, activation="relu"),
            tf.keras.layers.Dense(32, activation="relu"),
            tf.keras.layers.Dense(140, activation="sigmoid")
        ])
    def call(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        self.add_loss(self.contractive_loss(x))

        return decoded

    def contractive_loss(self, x):
        with tf.GradientTape() as tape:
            tape.watch(x)
            encoded = self.encoder(x)
        jacobian = tape.batch_jacobian(encoded, x)
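        # Squared Frobenius norm of the encoder Jacobian for each example
        frobenius_sq = tf.reduce_sum(tf.square(jacobian), axis=(1, 2))
        # Average over the batch so that add_loss receives a scalar
        return 1e-4 * tf.reduce_mean(frobenius_sq)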

cae = ContractiveAutoencoder()
cae.compile(optimizer='adam', loss='mse')

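# Log to a separate directory (name chosen here) so the two runs stay distinct in TensorBoard
cae_tensorboard_callback = tf.keras.callbacks.TensorBoard("./tboard/contractive", histogram_freq=1)
cae.fit(normal_train_data, normal_train_data,
          epochs=20,
          batch_size=512,
          validation_data=(valid_data, valid_data),
          shuffle=True, callbacks=[cae_tensorboard_callback])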

In [None]:
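# Evaluate the contractive model, not the earlier dense autoencoder
reconstructions = cae.predict(normal_train_data)
train_loss = tf.keras.losses.mae(reconstructions, normal_train_data)

reconstructions = cae.predict(anomalous_valid_data)
valid_loss = tf.keras.losses.mae(reconstructions, anomalous_valid_data)

# Recompute the threshold from the contractive model's own training errors
threshold = np.mean(train_loss) + np.std(train_loss)

preds = predict(cae, valid_data, threshold)
print_stats(preds, valid_labels)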