##### Copyright 2019 The TensorFlow Authors.

In [None]:
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Novelty Detection with an Autoencoder

In [None]:
from __future__ import absolute_import, division, print_function, unicode_literals

import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_datasets as tfds

from tensorflow.keras.datasets import mnist

In [None]:
class Encoder(tf.keras.layers.Layer):
    """Two stacked ReLU dense layers that turn input features into a code.

    Both layers have ``intermediate_dim`` units, so the returned code has
    ``intermediate_dim`` features.
    """

    def __init__(self, intermediate_dim):
        super(Encoder, self).__init__()
        relu = tf.nn.relu
        self.hidden_layer = tf.keras.layers.Dense(intermediate_dim, activation=relu)
        self.output_layer = tf.keras.layers.Dense(intermediate_dim, activation=relu)

    def call(self, input_features):
        """Pass ``input_features`` through the hidden layer, then the output layer."""
        return self.output_layer(self.hidden_layer(input_features))

In [None]:
class Decoder(tf.keras.layers.Layer):
    """Two stacked ReLU dense layers that expand a code back to the input size.

    The hidden layer has ``intermediate_dim`` units and the output layer has
    ``original_dim`` units.
    """

    def __init__(self, intermediate_dim, original_dim):
        super(Decoder, self).__init__()
        relu = tf.nn.relu
        self.hidden_layer = tf.keras.layers.Dense(intermediate_dim, activation=relu)
        self.output_layer = tf.keras.layers.Dense(original_dim, activation=relu)

    def call(self, code):
        """Pass ``code`` through the hidden layer, then the output layer."""
        return self.output_layer(self.hidden_layer(code))

In [None]:
class Autoencoder(tf.keras.Model):
  """An ``Encoder`` followed by a ``Decoder``, plus a loss history list.

  ``self.loss`` starts out as an empty list; ``train_loop`` appends one entry
  to it per epoch.
  """

  def __init__(self, intermediate_dim, original_dim):
    super(Autoencoder, self).__init__()
    self.loss = []
    self.encoder = Encoder(intermediate_dim=intermediate_dim)
    self.decoder = Decoder(original_dim=original_dim, intermediate_dim=intermediate_dim)

  def call(self, input_features):
    """Encode ``input_features`` and return the decoder's reconstruction."""
    return self.decoder(self.encoder(input_features))

In [None]:
def loss(preds, real):
  """Return the mean of the squared element-wise difference of ``preds`` and ``real``."""
  difference = tf.subtract(preds, real)
  return tf.reduce_mean(tf.square(difference))

In [None]:
def train(loss, model, opt, original):
  """Apply one gradient step that makes ``model`` reconstruct ``original``.

  The forward pass and ``loss(predictions, original)`` are recorded on a
  gradient tape; the resulting gradients for ``model.trainable_variables``
  are handed to ``opt.apply_gradients``.

  Returns the loss value computed before the update.
  """
  with tf.GradientTape() as tape:
    reconstruction_error = loss(model(original), original)

  variables = model.trainable_variables
  gradients = tape.gradient(reconstruction_error, variables)
  opt.apply_gradients(zip(gradients, variables))
  return reconstruction_error

In [None]:
def train_loop(model, opt, loss, dataset, epochs):
  """Train ``model`` for ``epochs`` full passes over ``dataset``.

  For every batch yielded by ``dataset`` this calls
  ``train(loss, model, opt, batch)`` and adds the returned value to a running
  total. After each epoch that total (the SUM of the per-batch losses, not
  their mean) is appended to ``model.loss`` and printed.

  Args:
    model: object passed to ``train``; must expose a list-like ``loss``
      attribute that the per-epoch totals are appended to.
    opt: optimizer passed through to ``train``.
    loss: loss callable passed through to ``train``.
    dataset: iterable of batches; it is iterated once per epoch.
    epochs: number of passes over ``dataset``.
  """
  for epoch in range(epochs):
    epoch_loss = 0
    for batch_features in dataset:
      epoch_loss += train(loss, model, opt, batch_features)
    model.loss.append(epoch_loss)

    # If the dataset yielded no batches, epoch_loss is still the plain int 0,
    # which has no .numpy(); print it as-is instead of raising AttributeError.
    if hasattr(epoch_loss, 'numpy'):
      printable_loss = epoch_loss.numpy()
    else:
      printable_loss = epoch_loss
    print('Epoch {}/{}. Loss: {}'.format(epoch + 1, epochs, printable_loss))

## Process the dataset

In [None]:
# Load the MNIST train/test splits (images and labels).
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Cast to float32, divide by 255 and flatten every image into 784 values.
x_train = (x_train.astype(np.float32) / 255.).reshape(x_train.shape[0], 784)
x_test = (x_test.astype(np.float32) / 255.).reshape(x_test.shape[0], 784)

## Prepare the novelty dataset

In [None]:
# This takes several minutes to download and prepare

def transform(d):
    """Swap the first two image axes and rescale the image to float32 / 255.

    Takes a dict with 'image' and 'label' entries and returns a new dict with
    the converted image and the unchanged label.
    """
    transposed = tf.transpose(d['image'], perm=(1, 0, 2))
    return {'image': tf.cast(transposed, tf.float32) / 255.0, 'label': d['label']}

emnist_ds = tfds.load('emnist/letters:3.*.*', split='test').map(transform)

In [None]:
# Fashion-MNIST test split. As the notebook stands, novelty_ds is pointed at
# emnist_ds, so this dataset is loaded but not read by any later cell shown here.
# NOTE(review): unlike emnist_ds it is not passed through transform(); confirm
# the image dtype/scale before using it as the novelty source.
fashion_ds = tfds.load('fashion_mnist:3.*.*', split='test')

In [None]:
# Choose the dataset that supplies the novelty samples for the cells below.
novelty_ds = emnist_ds

In [None]:
# Pull every image out of the dataset into one NumPy array and flatten each
# image to a row of 784 values.
novelty_data = np.array(
    [example['image'].numpy() for example in novelty_ds]
).reshape(-1, 784)

In [None]:
# Notebook inspection: shape and dtype of the flattened novelty samples.
novelty_data.shape, novelty_data.dtype

In [None]:
# Notebook inspection: shape and dtype of the flattened MNIST test images,
# for comparison with novelty_data.
x_test.shape, x_test.dtype

## Train the model

In [None]:
# hyperparameters
batch_size = 256  # samples per batch of the training dataset
epochs = 10  # number of passes over the training dataset
intermediate_dim = 128  # units in the encoder layers and the decoder hidden layer

In [None]:
# Model and optimizer.
model = Autoencoder(original_dim=784, intermediate_dim=intermediate_dim)
opt = tf.keras.optimizers.Adam(learning_rate=1e-2)

# Input pipeline: one element per training image, grouped into batches.
training_dataset = tf.data.Dataset.from_tensor_slices(x_train)
training_dataset = training_dataset.batch(batch_size)

train_loop(model=model, opt=opt, loss=loss, dataset=training_dataset, epochs=epochs)

## Plot the in-training performance

In [None]:
# model.loss holds one entry per epoch (appended by train_loop): the sum of
# that epoch's per-batch losses. The x axis is the zero-based epoch index.
plt.plot(range(epochs), model.loss)
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.show()

## Visualization

In [None]:
def display(indices, data=x_test):
    """Plot selected samples (top row) above their reconstructions (bottom row).

    Reconstructions come from the module-level ``model``. Each reconstruction
    is labelled with its summed squared reconstruction error, rounded to
    three decimals.

    Args:
        indices: sequence of integer row indices into ``data``.
        data: NumPy array with one sample of 784 values per row (each row is
            reshaped to 28x28 for display). Defaults to ``x_test``.
    """
    number = len(indices)

    # Run the model once, and only on the rows that will be shown, instead of
    # re-running it on all of ``data`` for every displayed image.
    selected = data[np.asarray(indices, dtype=int)]
    reconstructions = model(selected).numpy()

    plt.figure(figsize=(20, 4))
    for i, (original, reconstructed) in enumerate(zip(selected, reconstructions)):
        original = original.reshape(28, 28)
        reconstructed = reconstructed.reshape(28, 28)

        # The error is computed on the data as given; only the images passed
        # to imshow are multiplied by 255.
        error = np.round(np.square(original - reconstructed).sum(), 3)

        # display original
        ax = plt.subplot(2, number, i + 1)
        plt.imshow(original * 255)
        plt.gray()
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)

        # display reconstruction
        ax = plt.subplot(2, number, i + 1 + number)
        plt.imshow(reconstructed * 255)
        plt.gray()
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
        ax.text(0, -1, error, fontdict={'size': 25})
    plt.show()

In [None]:
# Show 10 randomly drawn MNIST test samples with their reconstructions.
indices = np.random.choice(len(y_test), size=10)
display(indices)

In [None]:
# Show 10 randomly drawn novelty samples with their reconstructions.
indices = np.random.choice(len(novelty_data), size=10)
display(indices, data=novelty_data)

## Sort novelty data by reconstruction error

In [None]:
originals = novelty_data
reconstructeds = model(novelty_data).numpy()

# Summed squared reconstruction error per sample, at full float precision.
squared_errors = np.square(originals - reconstructeds).sum(axis=-1)
# Integer-truncated errors, kept under the original name and dtype.
errors = squared_errors.astype(int)

# Rank on the untruncated values. Ranking on the truncated integers made all
# samples within the same whole number tie, so which of them ended up in the
# lowest/highest 10 was arbitrary.
sorted_args = np.argsort(squared_errors)
in_indices = sorted_args[:10]    # 10 lowest reconstruction errors
out_indices = sorted_args[-10:]  # 10 highest reconstruction errors

In [None]:
# Novelty samples at the front of sorted_args (lowest reconstruction error).
display(in_indices, novelty_data)

In [None]:
# Novelty samples at the end of sorted_args (highest reconstruction error).
display(out_indices, novelty_data)

In [None]:
# Show a window of 10 novelty samples starting at rank `start` in the
# error-sorted order; change `start` to browse other parts of the ranking.
start = 2000
display(sorted_args[start:start+10], novelty_data)

## Analysis

In [None]:
# Reconstruct both evaluation sets, then score every sample by its summed
# squared reconstruction error, rounded to three decimals.
test_reconstructed = model(x_test).numpy()
novelty_reconstructed = model(novelty_data).numpy()

test_errors = np.square(x_test - test_reconstructed).sum(axis=-1).round(3)
novelty_errors = np.square(novelty_data - novelty_reconstructed).sum(axis=-1).round(3)

print(test_errors.shape, novelty_errors.shape)

In [None]:
# Overlay the two reconstruction-error distributions. Both are drawn
# semi-transparent (blue: novelty data, red: MNIST test data); the labels and
# legend make it possible to tell them apart in the figure itself.
plt.hist(novelty_errors, bins=30, fc=(0, 0, 1, 0.5), label='novelty data')
plt.hist(test_errors, bins=30, fc=(1, 0, 0, 0.5), label='MNIST test data')
plt.xlabel('Reconstruction error')
plt.ylabel('Number of samples')
plt.legend()

plt.show()