<a href="https://colab.research.google.com/github/blonsbrough/First-Repository/blob/main/PHYS_555_02_DeepLearning_Assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Please copy the notebook and fill. Once filled, upload it to the course Brightspace.

---
If you worked with other students, provide name(s) here:

# ECG Anomaly Detection with Autoencoders

This homework will consists of a guided notebook where you will have to fill-in some blanks and comment. The notebook explores how we can detect abnormal electrocardiograms (ECG) with deep learning. It is likely not the most performant or best approach, but it will show the main concepts. You do not have to obtain a very performant model.



### Data
The dataset contains samples of 5,000 ECG with 140 timesteps. Each sequence corresponds to a single heartbeat from a single patient with congestive heart failure. If you are not familiar with ECG, you may read up a bit on it, e.g. on [wikipedia](https://en.wikipedia.org/wiki/Electrocardiography).


Here the dataset has simply been labelled with two classes: normal and abnormal.
We will use the labels to split the dataset.

Let us start with boiler plate code.

In [1]:
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.preprocessing import MinMaxScaler

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import matplotlib.pyplot as plt
%matplotlib inline

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

### Data preparation

The original data can be found on PhysioNet and is described [here](http://www.timeseriesclassification.com/description.php?Dataset=ECG5000). It was all prepared to be one single CSV file and can be found on [tensorflow tutorial](https://www.tensorflow.org/tutorials/generative/autoencoder#third_example_anomaly_detection).
We follow the same pre-processing steps and some of the evaluation procedures, with the slight differences, and with PyTorch.



In [2]:
# Download the dataset
dataframe = pd.read_csv('http://storage.googleapis.com/download.tensorflow.org/data/ecg.csv', header=None)
raw_data = dataframe.values
dataframe.head()

# The last element contains the labels
labels = raw_data[:, -1]

# The other data points are the ECG data
data = raw_data[:, 0:-1]

n_features = data.shape[1]

# split full data into training and test set
xtrain_data, test_data, ytrain_labels, test_labels = train_test_split(
    data, labels, test_size=0.2, random_state=21)

#  split a validation set from the training set
train_data, val_data, train_labels, val_labels = train_test_split(
    xtrain_data, ytrain_labels, test_size=0.2, random_state=22)


print(train_data.shape, val_data.shape, test_data.shape)

(3198, 140) (800, 140) (1000, 140)


The data are numpy arrays. Use scikit-learn `MinMaxScaling` to scale appropriately the training, validation and test sets.

In [3]:
scaler = # your code
train_data = # your code
val_data = # your code
test_data = # your code

train_labels = train_labels.astype(bool)
val_labels = val_labels.astype(bool)
test_labels = test_labels.astype(bool)

normal_train_data = train_data[train_labels]
normal_val_data = val_data[val_labels]
normal_test_data = test_data[test_labels]

anomalous_train_data = train_data[~train_labels]
anomalous_val_data = val_data[~val_labels]
anomalous_test_data = test_data[~test_labels]

SyntaxError: ignored

Plot a few samples or normal and abnormal ECGs.

In [None]:
plt.grid()
plt.plot(np.arange(n_features), normal_train_data[0])
plt.title("Normal")
plt.show()

In [None]:
plt.grid()
plt.plot(np.arange(n_features), anomalous_train_data[0])
plt.title("Anomalous")
plt.show()


Now create a `Dataloader`  for each of the normal training, validation, and test set. It could be as simple as you want to be.

In [None]:
# your code
# train_loader...
# val_loader...
# test_loader...

All good. Let now build a first simple fully connected autoencoder. You are free to use any number of hidden layers and neurons, and the type of activation functions.

You could for example use encoding with number of inputs as: 140->32->16->8 and symmetric decoding, e.g. 8->16->32->140.

Remember to be consistent in the last activation with the choice of the loss function later on.

In [None]:
class ECGAutoencoder(nn.Module):

  def __init__(self, n_features, z_dim):
    super(ECGAutoencoder, self).__init__()

    self.encoder = # your code
    #
    #
    self.decoder = # your code
    #
    #

  def forward(self, x):
    encoded = self.encoder(x)
    decoded = self.decoder(encoded)
    return decoded

### Training

Let's write a helper function for our training process, that will take an autoencoder neural network model, both a training and a validation `DataLoader` and will loop over `n_epochs`.

Make sure to return the trained model, and arrays of the losses at each epoch to allow post-training plots.

In [None]:
def train_model(model, train_loader, val_loader, n_epochs=150):

  optimizer = optim.Adam(model.parameters(), lr=1e-3)
  criterion = # choose a loss function

  train_losses = []
  val_losses = []

  for epoch in range(n_epochs):
    model = model.train()

    train_batch_losses = []

    for data in train_loader:
      # your code
      #
      #
      #

      train_batch_losses.append(loss.item())

    val_batch_losses = []
    model = model.eval()
    with torch.no_grad():
      for data in val_loader:
          # your code
          #
          #
          #

          val_losses.append(loss.item())

    train_loss = np.mean(train_batch_losses)
    val_loss = np.mean(val_batch_losses)

    train_losses.append(train_loss)
    val_losses.append(val_loss)

    if (epoch + 1) % 10 == 0:
        print(f"Epoch {epoch+1}: Losses Train = {train_loss:.4f} Valid = {val_loss:.4f}")

  # plot losses
  ax = plt.figure().gca()
  ax.plot(train_losses)
  ax.plot(val_losses)
  plt.ylabel('Loss')
  plt.xlabel('Epoch')
  plt.legend(['train', 'val'])
  plt.title('Loss monitoring')
  plt.show()

  # return trained model and loss arrays.
  return model.eval()

Now train an `ECGAutoencoder` with a latent dimension of 8, plot the loss history and save the model.

In [None]:
model = ECGAutoencoder(n_features, 8)
model = train_model(model, train_loader, val_loader)

torch.save(model, 'ecg_autoencoder.pt')


#### Comments

Add a couple lines of comments on the training procedure.


## Evaluating the outputs

The trained model can be used to reconstruct the ECGs.
Let's plot some of the reconstructed ECG segments, both from the normal and anomalous.

Create a prediction function that takes a trained model and a dataset, and returns arrays of the predictions and the losses.

We will use these to plot the reconstruction and histograms of the losses

In [None]:
def predict(model, data):

  preds, losses = [], []
  criterion = # your code. reuse the same loss as in your training procedure.

  with torch.no_grad():
    model = model.eval()
    #  your code

  # should return a tuple of an array of the predictions and an array the corresponding loss values
  return preds, losses

In [None]:
def plot_reconstruction(data, model, title, ax):
  recon, losses = predict(model, data)
  ax.plot(data, label='original')
  ax.plot(recon[0], label='reconstructed')
  ax.set_title(f'{title}: loss = {np.around(losses[0], 3)}')
  ax.legend()

In [None]:
n_ex = 4
fig, axs = plt.subplots(nrows=2, ncols=n_ex, sharex=True, sharey=True, figsize=(22, 8))

for col, data in enumerate(normal_test_data[:n_ex]):
  plot_reconstruction(data, model, title='Normal', ax=axs[0, col])

for col, data in enumerate(anomalous_test_data[:n_ex]):
  plot_reconstruction(data, model, title='Anomalous', ax=axs[1, col])

fig.tight_layout()

### Convolutional autoencoder

Now copy the`ECGAutoencoder` network definition, and modify it with a Convolutional Autoencoder. You are fairly free to build your network, here is a proposal:

- `self.encoder`: two blocks, each consisting of a one-dimensional convolutional layer, a ReLU activation, and a one-dimentional Batch normalisation. You are free to choose kernel size, strides and padding. You can add 2 fully-connected layers, to produce a compressed latent space.

- `self.decoder`: the symmetric of the encoder, by using `ConvTranspose1D` instead to expand to larger layers.

In [None]:
class ECGConvAutoencoder(nn.Module):

  def __init__(self, n_features, z_dim):
    super(ECGConvAutoencoder, self).__init__()
    self.encoder = # your code
    self.decoder = # your code

  def forward(self, x):
    encoded = self.encoder(x)
    decoded = self.decoder(encoded)
    return decoded

Let's train the new network

In [None]:
model = ECGConvAutoencoder(n_features, 8)
model = train_model(model, train_loader, val_loader)

torch.save(model, 'ecg_convautoencoder.pt')

In [None]:
fig, axs = plt.subplots(nrows=2, ncols=n_ex, sharex=True, sharey=True, figsize=(22, 8))

for col, data in enumerate(normal_test_data[:n_ex]):
  plot_reconstruction(data, model, title='Normal', ax=axs[0, col])

for col, data in enumerate(anomalous_test_data[:n_ex]):
  plot_reconstruction(data, model, title='Anomalous', ax=axs[1, col])

fig.tight_layout()

## Anomalous samples detection

We can detect anomalies by evaluating whether the reconstruction metric (i.e. the loss for a sample) is greater than a given threshold.

For example, we can take the mean average error for normal examples from the training set, then classify future examples as anomalous if the reconstruction error is higher than one standard deviation from the training set.

Choose a threshold value that is one standard deviation above the mean of all the training sample losses.

Our function goes through each example in the dataset and records the predictions and losses. Let's get the losses and have a look at them.

The model can either the the convolutional or the fully-connected autoencoder.

In [None]:
_, train_losses = predict(model, normal_train_data)

plt.hist(train_losses, bins=50)
plt.xlabel("Train loss")
plt.ylabel("# of examples")
plt.show()

In [None]:
threshold = # your code

The problem is framed as a detection for a sample to be above or below the selected threshold.


Let's see how well the model performs on normal samples of the test set:


In [None]:
preds, losses = predict(model, normal_test_data)
plt.hist(losses, bins=50)
plt.xlabel("Test loss")
plt.ylabel("# of examples")
plt.title("Normal sample")
plt.show()

Do the same with the anomalous sample.

In [None]:
preds, losses = predict(model, anomalous_test_data)
plt.hist(losses, bins=50)
plt.xlabel("Test loss")
plt.ylabel("# of examples")
plt.title("Anomalous sample")
plt.show()

Finally, we can count the number of examples above and below the threshold similar to a classification model. Use standar metrics such as accuracy, precision and recall to evaluate your model.


In [None]:
def is_normal(model, data, threshold):
  # your code
  # returns true if loss is less than threshold

def print_stats(predictions, labels):
  print("Accuracy = {}".format(accuracy_score(labels, predictions)))
  print("Precision = {}".format(precision_score(labels, predictions)))
  print("Recall = {}".format(recall_score(labels, predictions)))

In [None]:
preds = is_normal(model, test_data, threshold)
print_stats(preds, test_labels)

## Extra

(Optional == Bonus points).
If time permits, you can build, train and evaluate other type or architectures
- an LSTM Autoencoder: it will take into account some time-dependence of the samples
- a VAE.