# Autoencoders
This exercise is a variation of the [TensorFlow Intro to Autoencoders tutorial](https://www.tensorflow.org/tutorials/generative/autoencoder). An autoencoder is a special type of neural network that is trained to copy its input to its output. For example, given an image of a handwritten digit, an autoencoder first encodes the image into a lower-dimensional latent representation, then decodes the latent representation back to an image. An autoencoder learns to compress the data while minimizing the reconstruction error.

To learn more about autoencoders, please consider reading chapter 14 from [Deep Learning](https://www.deeplearningbook.org/) by Ian Goodfellow, Yoshua Bengio, and Aaron Courville.


Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

<a target="_blank" href="https://colab.research.google.com/github/PrzemekSekula/DeepLearningClasses1/blob/master/Autoencoders/Autoencoders_done.ipynb">
    <img src="https://www.tensorflow.org/images/colab_logo_32px.png" />
    Run in Google Colab</a>

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow as tf

from sklearn.metrics import accuracy_score, precision_score, recall_score
from sklearn.model_selection import train_test_split
from tensorflow.keras import layers, losses
from tensorflow.keras.datasets import fashion_mnist
from tensorflow.keras.models import Model

Helper functions

In [None]:
def plot_mnist(images1, labels1=None, images2=None, labels2=None, n=10):
    """
    Plots n images in a row with their labels. Can plot two rows of images.
    Args:
        images1 (np.array): array of images
        labels1 (list, optional): labels for images1. Defaults to None.
        images2 (np.array, optional): array of images in the second row. Defaults to None.
        labels2 (list, optional): labels for images2. Defaults to None.
        n (int, optional): number of images to plot. Defaults to 10.
    """
    plt.figure(figsize=(2*n, 4))
    if images2 is None:
        nr_rows = 1
    else:
        nr_rows = 2
    for i in range(n):
        # display first row (e.g. original images)
        ax = plt.subplot(nr_rows, n, i + 1)
        img = images1[i]
        if len(img.shape) > 2:
            img = tf.squeeze(img)
        plt.imshow(img)
        if labels1 is not None:
            plt.title(labels1[i])
        plt.gray()
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

        if images2 is not None:
            # display second row (e.g. reconstructions)
            ax = plt.subplot(nr_rows, n, i + 1 + n)
            img = images2[i]
            if len(img.shape) > 2:
                img = tf.squeeze(img)
            plt.imshow(img)
            if labels2 is not None:
                plt.title(labels2[i])
            plt.gray()
            ax.get_xaxis().set_visible(False)
            ax.get_yaxis().set_visible(False)
    plt.show()

def plot_history(history):
    """
    Plots the training and validation loss of the model.
    Args:
        history (keras.callbacks.History): Model.fit output
    """
    plt.figure()
    plt.plot(history.history['loss'], label='loss')
    plt.plot(history.history['val_loss'], label='val_loss')
    plt.legend()
    plt.show()

## Part 1 - Basic autoencoder
Let's build a basic autoencoder based on the presented schema. To define the model, we will use the [Keras Model Subclassing API](https://www.tensorflow.org/guide/keras/custom_layers_and_models).

In [None]:
(x_train, _), (x_test, _) = fashion_mnist.load_data()

x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.

print (x_train.shape)
print (x_test.shape)

In [None]:
plot_mnist(x_train)

#### Task 1
Create a class `BasicAutoencoder` that inherits from `tf.keras.Model`. The class should have the following methods:
- `__init__(self, latent_dim)` - method where the encoder and decoder are built (both as `tf.keras.Sequential` stacks of layers).
    - Encoder should have the following layers:
        - `Flatten` - to flatten images to vectors
        - `Dense` - layer with `latent_dim` neurons, where the values are encoded. Use the `relu` activation function
    - Decoder should have the following layers:
        - `Dense` - layer with 784 neurons and the `sigmoid` activation function
        - `Reshape` - layer that changes the shape of the output to 28x28

- `call(self, x)` - implements the forward pass. The signal should go through the encoder and then through the decoder

*Note: Remember to use `super().__init__()`*
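
As a rough sketch of the data flow this task describes, the NumPy snippet below pushes a small batch of random 28x28 arrays through an untrained encoder and decoder. The weight matrices, the helper names `relu` and `sigmoid`, and the random inputs are illustrative assumptions, not part of the exercise solution; the only goal is to track the shapes at each step.

```python
import numpy as np

rng = np.random.default_rng(0)

def relu(z):
    return np.maximum(z, 0.0)

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

latent_dim = 64
batch = rng.random((5, 28, 28))  # stand-in for 5 grayscale images in [0, 1)

# Untrained, randomly initialised weights (hypothetical values)
w_enc = rng.normal(scale=0.05, size=(784, latent_dim))
b_enc = np.zeros(latent_dim)
w_dec = rng.normal(scale=0.05, size=(latent_dim, 784))
b_dec = np.zeros(784)

flat = batch.reshape(len(batch), -1)         # Flatten: (5, 28, 28) -> (5, 784)
encoded = relu(flat @ w_enc + b_enc)         # Dense + relu: (5, 784) -> (5, 64)
decoded = sigmoid(encoded @ w_dec + b_dec)   # Dense + sigmoid: (5, 64) -> (5, 784)
output = decoded.reshape(-1, 28, 28)         # Reshape: (5, 784) -> (5, 28, 28)

print(flat.shape, encoded.shape, output.shape)
```

The sigmoid in the last dense layer keeps every output pixel between 0 and 1, which matches the range of the normalized input images.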

In [None]:
class BasicAutoencoder(Model):
  def __init__(self, latent_dim):
    super().__init__()
    self.latent_dim = latent_dim   
    self.encoder = tf.keras.Sequential([
      layers.Flatten(),
      layers.Dense(latent_dim, activation='relu'),
    ])
    self.decoder = tf.keras.Sequential([
      layers.Dense(784, activation='sigmoid'),
      layers.Reshape((28, 28))
    ])

  def call(self, x):
    encoded = self.encoder(x)
    decoded = self.decoder(encoded)
    return decoded

#### Task 2 
Create and compile a `BasicAutoencoder` model with a latent dimension of 64. Use the Adam optimizer and MSE loss.
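
For reference, the MSE loss averages the squared pixel-wise difference between an input and its reconstruction. A minimal NumPy sketch on a made-up 2x2 "image" (the values are illustrative only):

```python
import numpy as np

original = np.array([[0.0, 0.5], [1.0, 0.25]])
reconstruction = np.array([[0.1, 0.5], [0.8, 0.25]])

# Mean of the squared differences over all pixels
mse = np.mean((original - reconstruction) ** 2)
print(mse)
```

A perfect reconstruction gives a loss of zero, and larger deviations are penalized quadratically.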

In [None]:
autoencoder = BasicAutoencoder(64) 
autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())

Let's train the model

In [None]:
history = autoencoder.fit(x_train, x_train,
                epochs=10,
                shuffle=True,
                validation_data=(x_test, x_test))
plot_history(history)

In [None]:
autoencoder.encoder.summary()

In [None]:
autoencoder.decoder.summary()
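
The parameter counts reported by `summary()` can be cross-checked by hand: a `Dense` layer with `n_in` inputs and `n_out` units has `n_in * n_out` weights plus `n_out` biases, while `Flatten` and `Reshape` have no parameters. A small sketch, assuming the latent dimension of 64 used above (`dense_params` is a helper introduced here for illustration):

```python
def dense_params(n_in, n_out):
    """Number of trainable parameters in a fully connected layer with bias."""
    return n_in * n_out + n_out

latent_dim = 64
encoder_params = dense_params(28 * 28, latent_dim)  # Flatten adds no parameters
decoder_params = dense_params(latent_dim, 28 * 28)  # Reshape adds no parameters

print(encoder_params, decoder_params)  # 50240 50960
```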

#### Task 3
Encode and then decode the test images. Display examples of raw and decoded images.

*Note: Remember to call .numpy() on autoencoder output*

In [None]:
encoded_imgs = autoencoder.encoder(x_test).numpy()
print (encoded_imgs[0].shape)

In [None]:
decoded_imgs = autoencoder.decoder(encoded_imgs).numpy()
print (decoded_imgs[0].shape)

In [None]:
plot_mnist(x_test, images2 = decoded_imgs)

## Part 2 - Dealing with noisy images
This time we will:
- add noise to the training and test images
- build a convolutional autoencoder
- use this autoencoder to denoise images.


In [None]:
(x_train, _), (x_test, _) = fashion_mnist.load_data()
x_train = x_train.astype('float32') / 255.
x_test = x_test.astype('float32') / 255.
print (x_train.shape)

In [None]:
x_train = x_train[..., tf.newaxis]
x_test = x_test[..., tf.newaxis]

print(x_train.shape)

#### Adding noise to images
Let's add Gaussian noise to our datasets.
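
The noise model used in the next cell can also be sketched in plain NumPy: zero-mean Gaussian noise, scaled by a noise factor, is added to each pixel, and the result is clipped back to the valid range [0, 1]. The seed and the random stand-in images below are assumptions made for reproducibility.

```python
import numpy as np

rng = np.random.default_rng(42)
images = rng.random((4, 28, 28, 1)).astype("float32")  # stand-in for normalized images

noise_factor = 0.2
noisy = images + noise_factor * rng.normal(size=images.shape)
noisy = np.clip(noisy, 0.0, 1.0)  # keep pixel values in [0, 1]

print(noisy.shape, noisy.min(), noisy.max())
```

Without the clipping step, some pixels would fall below 0 or above 1, outside the range a sigmoid output layer can reproduce.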

In [None]:
noise_factor = 0.2
x_train_noisy = x_train + noise_factor * tf.random.normal(shape=x_train.shape) 
x_test_noisy = x_test + noise_factor * tf.random.normal(shape=x_test.shape) 

x_train_noisy = tf.clip_by_value(x_train_noisy, clip_value_min=0., clip_value_max=1.)
x_test_noisy = tf.clip_by_value(x_test_noisy, clip_value_min=0., clip_value_max=1.)

In [None]:
n = 10
plt.figure(figsize=(20, 4))
for i in range(n):
    # display noisy image
    ax = plt.subplot(1, n, i + 1)
    plt.imshow(tf.squeeze(x_test_noisy[i]))
    plt.title("original + noise")
    plt.gray()
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)

#### Task 4 - Create an autoencoder for denoising
Create a `Denoise` class that inherits from `tensorflow.keras.models.Model`. This class should have the following layers:
- Encoder:
    - `Input` layer with shape = (28, 28, 1)
    - `Conv2D` layer with `16` `3x3` filters, `relu` activation function, `same` padding and `strides=2`
    - `Conv2D` layer with `8` `3x3` filters, `relu` activation function, `same` padding and `strides=2`
- Decoder:
    - `Conv2DTranspose` layer with `8` `3x3` filters, `relu` activation function, `same` padding and `strides=2`
    - `Conv2DTranspose` layer with `16` `3x3` filters, `relu` activation function, `same` padding and `strides=2`
    - `Conv2D` layer with `1` `3x3` filter, `sigmoid` activation function, and `same` padding

*Note 1: Remember to implement the `call` method.*

*Note 2: Try to understand the size of inputs and outputs for each layer. You may be asked about this.*

*Note 3: Make sure that you understand transposed convolution layers (`Conv2DTranspose`), sometimes called deconvolution layers. You can learn about them [here](https://www.tensorflow.org/api_docs/python/tf/keras/layers/Conv2DTranspose).*
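
To help with Note 2, the output shapes of the layers listed above can be worked out with simple arithmetic. With `same` padding, a `Conv2D` layer with stride `s` produces a spatial size of `ceil(size / s)`, and a `Conv2DTranspose` layer with stride `s` produces `size * s`. The helper functions and the `layer_specs` list below are names introduced for this sketch only.

```python
import math

def conv_same(size, stride):
    """Spatial output size of Conv2D with padding='same'."""
    return math.ceil(size / stride)

def conv_transpose_same(size, stride):
    """Spatial output size of Conv2DTranspose with padding='same'."""
    return size * stride

size, channels = 28, 1
shapes = [(size, size, channels)]  # input shape
layer_specs = [  # (layer kind, number of filters, stride)
    ("conv", 16, 2),
    ("conv", 8, 2),
    ("transpose", 8, 2),
    ("transpose", 16, 2),
    ("conv", 1, 1),
]
for kind, filters, stride in layer_specs:
    if kind == "conv":
        size = conv_same(size, stride)
    else:
        size = conv_transpose_same(size, stride)
    shapes.append((size, size, filters))

for shape in shapes:
    print(shape)
```

The encoder halves the spatial size twice (28 to 14 to 7), and the decoder doubles it twice to return to 28x28 before the final single-filter convolution.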




In [None]:
class Denoise(Model):
    def __init__(self):
        super().__init__()
        self.encoder = tf.keras.Sequential([
            layers.Input(shape=(28, 28, 1)),
            layers.Conv2D(16, (3, 3), activation='relu', padding='same', strides=2),
            layers.Conv2D(8, (3, 3), activation='relu', padding='same', strides=2)
        ])
        self.decoder = tf.keras.Sequential([
            layers.Conv2DTranspose(8, kernel_size=3, strides=2, activation='relu', padding='same'),
            layers.Conv2DTranspose(16, kernel_size=3, strides=2, activation='relu', padding='same'),
            layers.Conv2D(1, kernel_size=(3, 3), activation='sigmoid', padding='same')
        ])

    def call(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

In [None]:
autoencoder = Denoise()
autoencoder.compile(optimizer='adam', loss=losses.MeanSquaredError())
history = autoencoder.fit(x_train_noisy, x_train,
                epochs=10,
                shuffle=True,
                validation_data=(x_test_noisy, x_test))
plot_history(history)


In [None]:
encoded_imgs = autoencoder.encoder(x_test_noisy).numpy()
decoded_imgs = autoencoder.decoder(encoded_imgs).numpy()
plot_mnist(x_test_noisy, images2 = decoded_imgs)

## Part 3 - Anomaly detection
In this example, you will train an autoencoder to detect anomalies on the [ECG5000 dataset](http://www.timeseriesclassification.com/description.php?Dataset=ECG5000). This dataset contains 5,000 [Electrocardiograms](https://en.wikipedia.org/wiki/Electrocardiography), each with 140 data points. You will use a simplified version of the dataset, where each example has been labeled either `0` (corresponding to an abnormal rhythm), or `1` (corresponding to a normal rhythm). You are interested in identifying the abnormal rhythms.

Note: This is a labeled dataset, so you could phrase this as a supervised learning problem. The goal of this example is to illustrate anomaly detection concepts you can apply to larger datasets, where you do not have labels available (for example, if you had many thousands of normal rhythms, and only a small number of abnormal rhythms).

How will you detect anomalies using an autoencoder? Recall that an autoencoder is trained to minimize reconstruction error. You will train an autoencoder on the normal rhythms only, then use it to reconstruct all the data. Our hypothesis is that the abnormal rhythms will have higher reconstruction error. You will then classify a rhythm as an anomaly if the reconstruction error surpasses a fixed threshold.

In [None]:
df = pd.read_csv('http://storage.googleapis.com/download.tensorflow.org/data/ecg.csv', header=None)

raw_data = df.values
print (raw_data.shape)
df.head()

#### EDA with Sweetviz
Usually, the first step is to perform an exploratory data analysis (EDA). Luckily, there are packages (like [Sweetviz](https://pypi.org/project/sweetviz/)) that can perform a quick EDA for us. Here we simply demonstrate how Sweetviz works, so we restrict the analysis to the first 10 features and the label.


In [None]:
#!pip install sweetviz

In [None]:
def take_df_subset(df, nr_features = 10):
    res = df.copy()
    cols = list(res.columns[0:nr_features]) + [res.columns[-1]]
    res = res[cols]
    res.columns = [f'x_{i}' for i in range(nr_features)] + ['y']
    res.y = res.y.astype(bool)
    return res

take_df_subset(df).head()

In [None]:
import sweetviz as sv
orig_data_report = sv.analyze(take_df_subset(df), target_feat='y', pairwise_analysis="on")
orig_data_report.show_notebook()

#### Data split
- Train test split
- Features / labels selection
- Normalization
- Normal / anomalous data split

In [None]:
# The last column contains the labels
labels = raw_data[:, -1]

# The other data points are the electrocardiogram data
data = raw_data[:, 0:-1]

train_data, test_data, train_labels, test_labels = train_test_split(
    data, labels, test_size=0.2, random_state=21
)

EDA once again, but this time let's compare the training and test subsets.

In [None]:
traindf = pd.DataFrame(train_data)
traindf['y'] = train_labels
traindf = take_df_subset(traindf)

testdf = pd.DataFrame(test_data)
testdf['y'] = test_labels
testdf = take_df_subset(testdf)

traintest_data_report = sv.compare(
    source = traindf,
    compare = testdf, 
    target_feat='y', pairwise_analysis="on")
traintest_data_report.show_notebook()

Normalization
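
Min-max scaling maps the data to [0, 1] using the minimum and maximum of the training set only, so that no statistics from the test set enter the preprocessing. A sketch on toy arrays (the values are made up for illustration):

```python
import numpy as np

train = np.array([[2.0, 4.0], [6.0, 10.0]])
test = np.array([[1.0, 12.0]])

# Statistics come from the training set only
min_val, max_val = train.min(), train.max()
train_scaled = (train - min_val) / (max_val - min_val)
test_scaled = (test - min_val) / (max_val - min_val)

print(train_scaled)
print(test_scaled)  # can fall outside [0, 1] when test values exceed the training range
```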

In [None]:
min_val = tf.reduce_min(train_data)
max_val = tf.reduce_max(train_data)

train_data = (train_data - min_val) / (max_val - min_val)
test_data = (test_data - min_val) / (max_val - min_val)

train_data = tf.cast(train_data, tf.float32)
test_data = tf.cast(test_data, tf.float32)

Normal / anomalous data split
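
The split relies on boolean mask indexing: a boolean array selects the rows where it is `True`, and `~` inverts the mask. A minimal NumPy sketch with made-up data, using the same convention as the dataset (`1` = normal, `0` = abnormal):

```python
import numpy as np

data = np.array([[0.1, 0.2], [0.9, 0.8], [0.3, 0.4]])
labels = np.array([1, 0, 1]).astype(bool)  # 1 = normal, 0 = abnormal

normal = data[labels]       # rows where the label is True
anomalous = data[~labels]   # rows where the label is False

print(normal.shape, anomalous.shape)
```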

In [None]:
train_labels = train_labels.astype(bool)
test_labels = test_labels.astype(bool)

normal_train_data = train_data[train_labels]
normal_test_data = test_data[test_labels]

anomalous_train_data = train_data[~train_labels]
anomalous_test_data = test_data[~test_labels]

In [None]:
plt.grid()
plt.plot(np.arange(140), normal_train_data[0])
plt.title("A Normal ECG")
plt.show()

In [None]:
plt.grid()
plt.plot(np.arange(140), anomalous_train_data[0])
plt.title("An Anomalous ECG")
plt.show()

#### Task 5
Build the `AnomalyDetector` class with:
- Encoder that contains 3 dense layers
    - 32 neurons, relu activation
    - 16 neurons, relu activation
    - 8 neurons, relu activation
- Decoder that contains 2 dense layers and an output layer
    - 16 neurons, relu activation
    - 32 neurons, relu activation
    - For the output layer, choose the number of neurons and the activation function yourself.



In [None]:
class AnomalyDetector(Model):
    def __init__(self):
        super().__init__()
        self.encoder = tf.keras.Sequential([
            layers.Dense(32, activation="relu"),
            layers.Dense(16, activation="relu"),
            layers.Dense(8, activation="relu"),
        ])
        self.decoder = tf.keras.Sequential([
            layers.Dense(16, activation="relu"),
            layers.Dense(32, activation="relu"),
            layers.Dense(140, activation="sigmoid"),
        ])
    def call(self, x):
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

In [None]:
autoencoder = AnomalyDetector()
autoencoder.compile(optimizer='adam', loss='mae')

In [None]:
history = autoencoder.fit(normal_train_data, normal_train_data, 
          epochs=20, 
          batch_size=512,
          validation_data=(test_data, test_data),
          shuffle=True)
plot_history(history)

In [None]:
def encode_and_analyze(input_data, pos = 0):
    data = input_data.numpy()[pos, tf.newaxis]
    encoded_data = autoencoder.encoder(data).numpy()
    decoded_data = autoencoder.decoder(encoded_data).numpy()

    plt.figure()
    plt.plot(data[0], 'b')
    plt.plot(decoded_data[0], 'r')
    plt.fill_between(np.arange(140), data[0], decoded_data[0], color='lightcoral')
    plt.legend(['Input', 'Reconstruction', 'Error'])
    plt.show()

encode_and_analyze(normal_test_data, 0)

In [None]:
encode_and_analyze(anomalous_test_data, 0)

#### Anomaly detection using a threshold
Detect anomalies by checking whether the reconstruction loss is greater than a fixed threshold. In this notebook, you will calculate the mean absolute error (MAE) for normal examples from the training set, then classify future examples as anomalous if their reconstruction error is more than one standard deviation above the training-set mean.
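
The thresholding rule can be sketched on a handful of hypothetical per-example reconstruction errors. The numbers below are made up and do not come from the trained model.

```python
import numpy as np

# Hypothetical per-example reconstruction errors (MAE) on normal training data
train_loss = np.array([0.02, 0.03, 0.025, 0.04, 0.035])
threshold = train_loss.mean() + train_loss.std()

# Errors for new examples: below the threshold -> normal, above -> anomalous
new_loss = np.array([0.03, 0.09])
is_normal = new_loss < threshold

print(threshold, is_normal)
```

Choosing a larger multiple of the standard deviation would flag fewer examples as anomalous, at the cost of missing more true anomalies.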

In [None]:
reconstructions = autoencoder.predict(normal_train_data)
train_loss = tf.keras.losses.mae(reconstructions, normal_train_data)

plt.hist(train_loss[None,:], bins=50)
plt.xlabel("Train loss")
plt.ylabel("No of examples")
plt.show()

In [None]:
reconstructions = autoencoder.predict(anomalous_test_data)
test_loss = tf.keras.losses.mae(reconstructions, anomalous_test_data)

plt.hist(test_loss[None, :], bins=50)
plt.xlabel("Test loss")
plt.ylabel("No of examples")
plt.show()

In [None]:
threshold = np.mean(train_loss) + np.std(train_loss)
print("Threshold: ", threshold)

In [None]:
def predict(model, data, threshold):
  """Returns True for examples classified as normal (loss below threshold)."""
  reconstructions = model(data)
  loss = tf.keras.losses.mae(reconstructions, data)
  return tf.math.less(loss, threshold)

def print_stats(predictions, labels):
  print("Accuracy = {:.1f}%".format(100*accuracy_score(labels, predictions)))
  print("Precision = {:.3f}".format(precision_score(labels, predictions)))
  print("Recall = {:.3f}".format(recall_score(labels, predictions)))

In [None]:
preds = predict(autoencoder, test_data, threshold)
print_stats(preds, test_labels)
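
To make the reported numbers easier to interpret, the three metrics can be computed by hand from the confusion counts. Note that `predict` returns `True` for examples classified as normal, and the labels use `True` for normal rhythms, so precision and recall here refer to the normal class. The function `binary_metrics` and the toy data are introduced for illustration only.

```python
def binary_metrics(labels, predictions):
    """Accuracy, precision and recall with True as the positive class.

    Assumes at least one positive label and one positive prediction.
    """
    pairs = list(zip(labels, predictions))
    tp = sum(l and p for l, p in pairs)              # true positives
    fp = sum((not l) and p for l, p in pairs)        # false positives
    fn = sum(l and (not p) for l, p in pairs)        # false negatives
    tn = sum((not l) and (not p) for l, p in pairs)  # true negatives
    accuracy = (tp + tn) / len(pairs)
    precision = tp / (tp + fp)
    recall = tp / (tp + fn)
    return accuracy, precision, recall

# Toy labels and predictions (True = normal rhythm), made up for illustration
labels = [True, True, True, False, False]
predictions = [True, True, False, True, False]
accuracy, precision, recall = binary_metrics(labels, predictions)
print(accuracy, precision, recall)
```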