Notebook is created just for learning and experimentation purpose. Examples and the implementation is based on various research available on the internet and Tensorflow documentation. For now, it just provides a beginner level understanding and in future will lead to a more advanced solution.

## Library Import 

In [42]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf

from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, losses
from tensorflow.keras.datasets import fashion_mnist
from tensorflow.keras.models import Model

## Load the dataset

We are going to use the MNIST dataset where image resolution is 28x28.

In [57]:
(x_train, y_train), (x_test, y_test) = fashion_mnist.load_data()
labels_dict = {
    '0': 'T-shirt/top',
    '1': 'Trouser',
    '2': 'Pullover',
    '3': 'Dress',
    '4': 'Coat',
    '5': 'Sandal',
    '6': 'Shirt',
    '7': 'Sneaker',
    '8': 'Bag',
    '9': 'Ankle boot'
}

x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.

print (x_train.shape)
print (x_test.shape)

# display original
n = 5
for i in range(n):
    ax = plt.subplot(1, n, i+1)
    select_img_index = np.random.randint(x_test.shape[0])
    plt.imshow(x_test[select_img_index])
    plt.title(labels_dict[str(y_test[select_img_index])])
    # plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.show()

# <font color='green'>**Image Compression**</font> 

Define an autoencoder with two Dense layers: an encoder, which compresses the images into a 64 dimensional latent vector, and a decoder, that reconstructs the original image from the latent space.

In [58]:
latent_dim = 64 

class Autoencoder(Model):
    def __init__(self, latent_dim):
        super(Autoencoder, self).__init__()
        self.latent_dim = latent_dim   
        self.encoder = tf.keras.Sequential([
          layers.Flatten(),
          layers.Dense(latent_dim, activation='relu'),
        ])
        self.decoder = tf.keras.Sequential([
          layers.Dense(784, activation='sigmoid'),
          layers.Reshape((28, 28))
        ])

    def call(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

autoencoder = Autoencoder(latent_dim)

In [45]:
autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())

Train the model using x_train as both the input and the target. The encoder will learn to compress the dataset from 784 dimensions to the latent space, and the decoder will learn to reconstruct the original images. .

In [46]:
autoencoder.fit(x_train, x_train,
                epochs=10,
                shuffle=True,
                validation_data=(x_test, x_test))

In [87]:
autoencoder.encoder.summary(), autoencoder.decoder.summary();

Now that the model is trained, let's test it by encoding and decoding images from the test set.

In [47]:
encoded_imgs = autoencoder.encoder(x_test).numpy()
decoded_imgs = autoencoder.decoder(encoded_imgs).numpy()

In [89]:
n = 10
plt.figure(figsize=(20, 4))
for i in range(n):
    # display original
    ax = plt.subplot(2, n, i + 1)
    select_img_index = np.random.randint(x_test.shape[0])
    plt.imshow(x_test[select_img_index])
    plt.title("original")
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)

    # display reconstruction
    ax = plt.subplot(2, n, i + 1 + n)
    plt.imshow(decoded_imgs[select_img_index])
    plt.title("reconstructed")
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.show()

# <font color='green'>**Image Denoising (using Convolution Autoencoder)**</font> 

We can use Autoencoders to clean images by removing the unwanted noise in the given input. In the below steps we'll introduce this noise using synthetic methods, and later will train an Autoencoder network to produce noise free images.

In [73]:
(x_train, y_train), (x_test, y_test) = fashion_mnist.load_data()

In [74]:
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.

x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

print(x_train.shape)

Adding random noise to the images

In [75]:
noise_factor = 0.3
x_train_noisy = x_train + noise_factor * tf.random.normal(shape=x_train.shape) 
x_test_noisy = x_test + noise_factor * tf.random.normal(shape=x_test.shape) 

x_train_noisy = tf.clip_by_value(x_train_noisy, clip_value_min=0., clip_value_max=1.)
x_test_noisy = tf.clip_by_value(x_test_noisy, clip_value_min=0., clip_value_max=1.)


# display original + noisy image
n = 5
for i in range(n):
    # original 
    ax = plt.subplot(2, n, i+1)
    select_img_index = np.random.randint(x_test.shape[0])
    plt.imshow(tf.squeeze(x_test[select_img_index]))
    plt.title(labels_dict[str(y_test[select_img_index])])
    # plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)

    # noisy
    ax = plt.subplot(2, n, i+1+n)
    plt.imshow(tf.squeeze(x_test_noisy[select_img_index]))
    plt.title('+ noise')
    # plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.show()

In this example, we will train a convolutional autoencoder using Conv2D layers in the encoder, and Conv2DTranspose layers in the decoder.

In [76]:
class Denoise(Model):
    def __init__(self):
        super(Denoise, self).__init__()
        self.encoder = tf.keras.Sequential([
          layers.Input(shape=(28, 28, 1)),
          layers.Conv2D(16, (3, 3), activation='relu', padding='same', strides=2),
          layers.Conv2D(8, (3, 3), activation='relu', padding='same', strides=2)])
        
        # The decoder upsamples the images back from 7x7 to 28x28.
        self.decoder = tf.keras.Sequential([
          layers.Conv2DTranspose(8, kernel_size=3, strides=2, activation='relu', padding='same'),
          layers.Conv2DTranspose(16, kernel_size=3, strides=2, activation='relu', padding='same'),
          layers.Conv2D(1, kernel_size=(3, 3), activation='sigmoid', padding='same')])

    def call(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

autoencoder = Denoise()

In [77]:
autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())

In [78]:
autoencoder.fit(x_train_noisy, x_train,
                epochs=10,
                shuffle=True,
                validation_data=(x_test_noisy, x_test))

In [86]:
autoencoder.encoder.summary(), autoencoder.decoder.summary();

In [88]:
encoded_imgs = autoencoder.encoder(x_test_noisy).numpy()
decoded_imgs = autoencoder.decoder(encoded_imgs).numpy()

In [91]:
n = 10
plt.figure(figsize=(20, 4))
for i in range(n):
    # display original
    ax = plt.subplot(2, n, i + 1)
    select_img_index = np.random.randint(x_test.shape[0])
    plt.imshow(tf.squeeze(x_test_noisy[select_img_index]))
    plt.title("original")
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)

    # display reconstruction
    ax = plt.subplot(2, n, i + 1 + n)
    plt.imshow(tf.squeeze(decoded_imgs[select_img_index]))
    plt.title("reconstructed")
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
plt.show()

# <font color='green'>**Anomaly detection (on the ECG5000 dataset)**</font> 

About dataset: http://www.timeseriesclassification.com/description.php?Dataset=ECG5000
        
*         The original dataset for "ECG5000" is a 20-hour long ECG downloaded from Physionet. 
*         The data was pre-processed in two steps: (1) extract each heartbeat, (2) make each heartbeat equal length using interpolation.

This dataset contains 5,000 Electrocardiograms, each with 140 data points. 
Here, we'll use a simplified version of the dataset, where each example has been labeled either 0 (corresponding to an abnormal rhythm), or 1 (corresponding to a normal rhythm). You are interested in identifying the abnormal rhythms.

In [93]:
# Download the dataset
dataframe = pd.read_csv('http://storage.googleapis.com/download.tensorflow.org/data/ecg.csv', header=None)
raw_data = dataframe.values
print(dataframe.shape)
dataframe.head()

In [120]:
# The last element contains the labels
labels = raw_data[:, -1]

# The other data points are the electrocadriogram data
data = raw_data[:, 0:-1]

train_data, test_data, train_labels, test_labels = train_test_split(
    data, labels, test_size=0.2, random_state=21
)

In [121]:
# Normalize to 0 - 1
min_val = tf.reduce_min(train_data)
max_val = tf.reduce_max(train_data)

train_data = (train_data - min_val) / (max_val - min_val)
test_data = (test_data - min_val) / (max_val - min_val)

train_data = tf.cast(train_data, tf.float32)
test_data = tf.cast(test_data, tf.float32)

In [122]:
train_labels = train_labels.astype(bool)
test_labels = test_labels.astype(bool)

normal_train_data = train_data[train_labels]
normal_test_data = test_data[test_labels]

anomalous_train_data = train_data[~train_labels]
anomalous_test_data = test_data[~test_labels]

train_data, normal_test_data, anomalous_train_data

In [133]:
# Plot a normal ECG.


plt.grid()
selected_index = np.random.randint(normal_train_data.shape[0])
plt.plot(np.arange(140), normal_train_data[selected_index])
plt.title(f"A Normal ECG - Index:{selected_index}")
plt.show()

In [134]:
# Plot an anomalous ECG.

plt.grid()
plt.plot(np.arange(140), anomalous_train_data[0])
plt.title("An Anomalous ECG")
plt.show()

### Build the model

In [136]:
class AnomalyDetector(Model):
    def __init__(self):
        super(AnomalyDetector, self).__init__()
        self.encoder = tf.keras.Sequential([
          layers.Dense(32, activation="relu"),
          layers.Dense(16, activation="relu"),
          layers.Dense(8, activation="relu")])

        self.decoder = tf.keras.Sequential([
          layers.Dense(16, activation="relu"),
          layers.Dense(32, activation="relu"),
          layers.Dense(140, activation="sigmoid")])

    def call(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

autoencoder = AnomalyDetector()

In [138]:
autoencoder.compile(optimizer='adam', loss='mae')

In [139]:
history = autoencoder.fit(normal_train_data,
                          normal_train_data,
                          epochs=20,
                          batch_size=512,
                          validation_data=(test_data, test_data),
                          shuffle=True)

Notice that the autoencoder is trained using only the normal ECGs, but is evaluated using the full test set.


In [140]:
plt.plot(history.history["loss"], label="Training Loss")
plt.plot(history.history["val_loss"], label="Validation Loss")
plt.legend()

In [150]:
encoded_data = autoencoder.encoder(normal_test_data).numpy()
decoded_data = autoencoder.decoder(encoded_data).numpy()

selected_index = np.random.randint(normal_test_data.shape[0])

plt.plot(normal_test_data[selected_index], 'b')
plt.plot(decoded_data[selected_index], 'r')
plt.title(f'Normal Test Example - {selected_index}')
plt.fill_between(np.arange(140), decoded_data[selected_index], normal_test_data[selected_index], color='lightcoral')
plt.legend(labels=["Input", "Reconstruction", "Error"])
plt.show()

In [152]:
# Create a similar plot, this time for an anomalous test example.

encoded_data = autoencoder.encoder(anomalous_test_data).numpy()
decoded_data = autoencoder.decoder(encoded_data).numpy()

selected_index = np.random.randint(anomalous_test_data.shape[0])

plt.plot(anomalous_test_data[selected_index], 'b')
plt.plot(decoded_data[selected_index], 'r')
plt.title(f'Normal Test Example - {selected_index}')
plt.fill_between(np.arange(140), decoded_data[selected_index], anomalous_test_data[selected_index], color='lightcoral')
plt.legend(labels=["Input", "Reconstruction", "Error"])
plt.show()

ECG can be considered as anomalous if the reconstruction error is greater than one standard deviation from the normal input observation.

We can calculate the MAE between reconstructions and the normalized ECG values. Mean and Std. of this MAE can be used to decide the MAE threshold value above which a datapoint should be classified as anomaly. 

In [157]:
reconstructions = autoencoder.predict(normal_train_data)
train_loss = tf.keras.losses.mae(reconstructions, normal_train_data)

plt.hist(train_loss, bins=50)
plt.xlabel("Train loss")
plt.ylabel("No of examples")
plt.show()

Note that, this train loss distribution is obtained for the normal ECG training examples, and here we can clearly see that most of the points give MAE < ~0.04. So our threshold can be decided nearby only.

In [160]:
threshold = np.mean(train_loss) + np.std(train_loss)
print("Threshold: ", threshold)

**Note:** There are other strategies you could use to select a threshold value above which test examples should be classified as anomalous, the correct approach will depend on your dataset. You can learn more with the links at the end of this tutorial.

In [161]:
reconstructions = autoencoder.predict(anomalous_test_data)
test_loss = tf.keras.losses.mae(reconstructions, anomalous_test_data)

plt.hist(test_loss[None, :], bins=50)
plt.xlabel("Test loss")
plt.ylabel("No of examples")
plt.show()

If we examine the reconstruction error for the anomalous examples in the test set, we notice most have greater reconstruction error than the threshold. By varing the threshold, we can adjust the precision and recall of our classifier.

#### Classify an ECG as an anomaly if the reconstruction error is greater than the threshold.

In [162]:
def predict(model, data, threshold):
    reconstructions = model(data)
    loss = tf.keras.losses.mae(reconstructions, data)
    return tf.math.less(loss, threshold)

def print_stats(predictions, labels):
    print("Accuracy = {}".format(accuracy_score(labels, predictions)))
    print("Precision = {}".format(precision_score(labels, predictions)))
    print("Recall = {}".format(recall_score(labels, predictions)))
    
preds = predict(autoencoder, test_data, threshold)
print_stats(preds, test_labels)

Later, let's look at few custom and advanced solutions. Don't forget to upvote if you find it useful. 