# Anomaly Detection using CNN Autoencoder

### Loading Data from Google Drive

In [None]:
# Unzip dataset
!unzip -o Dataset/fruits_anomaly_detection.zip

### Import

In [None]:
import os
import keras
from keras.models import Sequential
from keras.layers import Dense, Activation, Flatten, Input, Conv2D, MaxPooling2D, UpSampling2D, Rescaling
import matplotlib.pyplot as plt
from keras import backend as K
import numpy as np
from tensorflow.keras.utils import array_to_img, img_to_array, load_img, image_dataset_from_directory
from PIL import Image, ImageChops
import random

### Create generators for training, validation and testing


In [None]:
# Data pipeline cell: creates the four image Datasets (train, validation,
# test, anomaly), normalizes them and finally rewrites every element so that
# the image itself is also the target, as an autoencoder requires.
# TODO: prepare data using a generator
# NOTE(review): `seed` is presumably meant for the train/validation split and
#               shuffling so that results are reproducible - confirm.
seed = 91166
batch_size = 85

# TODO: define the Normalization Layer
# NOTE(review): `Rescaling` is imported at the top of the notebook and is
#               presumably the layer intended here - confirm.
normalization_layer = ...

# Dataset definition
# TODO: use 'image_dataset_from_directory' to create a Dataset
#       Split training set into training/validation
# NOTE: the two-name unpacking below needs a right-hand side that yields
#       exactly two Datasets (e.g. `validation_split=...` together with
#       `subset="both"` - check the Keras documentation for your version).
train_dataset, validation_dataset = ...
test_dataset = ...
# TODO: choose eggplant as anomaly test set
anomaly_dataset = ...

# Normalization
# TODO: define the normalization function
# NOTE: use a lambda function λ
# NOTE(review): at this point the Datasets presumably still yield
#               (image, label) pairs, so the function should accept two
#               arguments - confirm against the Dataset definition.
_norm_function = ...

# TODO: apply the normalization function to Datasets
# NOTE: check the 'map' method
train_dataset_norm = ...
validation_dataset_norm = ...
test_dataset_norm = ...
anomaly_dataset_norm = ...

# X as label (X = Y)
# TODO: force the Dataset to return the original image (X) as the
#       label (Y)
# NOTE: use a lambda function λ
_replaceY_function = ...
# The *_norm names are deliberately re-assigned: after this step each of them
# refers to the (X, X) version of the corresponding normalized Dataset.
train_dataset_norm = ...
validation_dataset_norm = ...
test_dataset_norm = ...
anomaly_dataset_norm = ...

In [None]:
# Sanity check: print every prepared Dataset and inspect one batch of each.
for dataset in [train_dataset_norm, validation_dataset_norm, test_dataset_norm, anomaly_dataset_norm]:
    print(dataset)
    # TODO: use 'take' method to retrieve a single batch from Dataset
    #       and plot the images (X, Y) and some stats (like shapes)
    # NOTE(review): after the X-as-label step, X and Y of a batch are expected
    #               to be the same images - a quick equality check is a useful
    #               stat to print here.
    ...

## **Autoencoder Architecture**

As we have seen in the case of the MLP Autoencoder, we build a structure composed of an **Encoder**, which reduces the dimensions of our data (extracting latent features), and a **Decoder**, which restores the original dimensions.

The output must have the same structure as the input: the objective is to learn a model that reconstructs well (i.e. with a small reconstruction error) data coming from the same distribution as the training data.

Different data (for example anomalies) should produce higher reconstruction error.

In order to increase the data size in the Decoder part we can use the class
**`UpSampling2D`**  https://keras.io/api/layers/reshaping_layers/up_sampling2d/


In [None]:
# Define the convolutional autoencoder model
# The layers are stacked in a single Sequential model: first the encoder
# (convolution + pooling), then the decoder (convolution + upsampling).

# input shape must be the same size as the images that will be fed into it by the generators
# The output layer must be the same dimensions as the original image
model = Sequential()
#-------------------------

# Encoder
# TODO: define the encoder part as follows:
#       - use (3,3) kernel size
#       - use 3 conv layers (each layer followed by a MaxPooling layer)
#       - use the following filters number path: 16, 8, 3
#       - use padding='same'
#       - use relu activation
# NOTE(review): assuming 2x2 pooling, the image height/width should be
#               divisible by 8, otherwise three upsampling steps will not
#               give back the original size - confirm with model.summary().
...

# Decoder
# TODO: replicate the Encoder structure but in the "opposite" way
#       Use the sigmoid for last layer
# NOTE(review): a sigmoid output lies in [0, 1], so the input images are
#               presumably expected to be normalized to the same range - confirm.
...
#-------------------------

# Print the layer stack to verify that the output shape equals the input shape
model.summary()

# Compile the model
# The loss is the mean squared error between the model output and the target,
# which for this notebook is the input image itself (reconstruction error).
model.compile(optimizer='adam', loss='mean_squared_error')

### **Training**

In [6]:
# Define model name
# NOTE(review): presumably the file the checkpoint callback writes to and the
#               file the best model is later reloaded from - confirm.
model_filepath = 'image_anomaly_ae.keras'

In [None]:
# Training the model
# Two callbacks are prepared (early stopping and best-model checkpoint) and
# the fit history is kept in `history` for the loss plot of the next cell.

# Early stopping (stops training when validation doesn't improve for {patience} epochs)
# TODO: implement EarlyStopping
# NOTE: see keras callbacks EarlyStopping
es = ...

# Saves the best version of the model to disk (as measured on the validation data set)
# TODO: implement SaveBest callback
# NOTE: see keras callbacks ModelCheckpoint
# NOTE(review): presumably this should write to `model_filepath` - confirm.
save_best = ...

# TODO: train model for 50 epochs using pre-defined callbacks
# NOTE(review): the next cell reads history.history['val_loss'], so validation
#               data has to be passed to fit() for that key to exist.
history = model.fit(...)

In [None]:
# Draw the training and validation loss curves recorded during fit()
for history_key, curve_label in (("loss", "Loss"), ("val_loss", "ValLoss")):
    plt.plot(history.history[history_key], label=curve_label)
plt.legend()
plt.title("Training Phase")
plt.tight_layout()
plt.show()

After the validation loss stops improving, training still continues for a number of epochs equal to the 'patience' hyper-parameter before early stopping ends it.

In [None]:
# To get back the model that performed best on the validation set we load the checkpointed model from disk.
# TODO: load model from file
# NOTE(review): presumably the file named by `model_filepath` - confirm.
# `model` is re-bound here, so every later cell uses the reloaded model.
model = ...
model.summary()

### Testing

In [None]:
# Plot original image VS model reconstructed image
def _plot_original_vs_predicted(dataset, title="", n_images=4):
    """Show input images next to their reconstructions.

    One batch is taken from ``dataset``; its first ``n_images`` images are
    fed to the notebook-level ``model`` and every original is drawn beside
    the corresponding model output.

    Args:
        dataset: Dataset yielding ``(x, y)`` batches; only ``x`` is used.
        title: Text used as the figure super-title.
        n_images: Maximum number of images to plot (default 4, as before).
    """
    # take 1 batch
    for x, _ in dataset.take(1):
        # use just the first images of the batch as model input
        originals = x[:n_images]
        predicted = model.predict(originals, verbose=0)
        print(predicted.shape)

        # A batch may hold fewer than `n_images` images: size the grid on
        # what is actually available instead of indexing out of range.
        n_rows = len(predicted)
        if n_rows == 0:
            continue

        # plot Original vs Predicted
        # squeeze=False keeps `axs` two-dimensional even with a single row
        fig, axs = plt.subplots(n_rows, 2, figsize=(5, 2 * n_rows), squeeze=False)
        for i in range(n_rows):
            # original image on the left ...
            axs[i][0].imshow(originals[i])
            # ... reconstructed image on the right
            axs[i][1].imshow(predicted[i])

        axs[0][0].set_title("Original")
        axs[0][1].set_title("Predicted")
        fig.suptitle(title)

        plt.tight_layout()
        plt.show()

In [None]:
# TODO: use '_plot_original_vs_predicted' function to check visually the reconstructed images
# NOTE: the function takes a Dataset and an optional `title`; calling it once
#       on a normal set and once on the anomaly set makes the two cases
#       directly comparable.
...

### **Evaluation**

In [None]:
# We want the difference in error between the testing set (normal) images
# and anomalous images to be as high as possible
# TODO: evaluate the model on test set and anomaly set
# NOTE: the ':.3f' format specs below require both values to be plain numbers
#       (a single loss value each), not lists.
test_eval = ...
anomaly_eval = ...

print(f"Error on test set:{test_eval:.3f}")
print(f"Error on anomaly set:{anomaly_eval:.3f}")
# Absolute gap between the two errors: the larger it is, the easier the split
print(f"Difference: {abs(test_eval - anomaly_eval):.3f}")

#### **Analysis of the reconstruction errors**

In [None]:
# Compute (and optionally plot) the reconstruction error of each picture
def compute_plot_rec_error(dataset, plot=True, title="", color="tab:green", autoencoder=None):
    """Return the per-image reconstruction error (MSE) over a whole dataset.

    Args:
        dataset: Iterable of ``(batch_x, batch_y)`` pairs; only ``batch_x``
            is used, both as model input and as reconstruction target.
        plot: When true, draw a scatter plot of the errors sorted ascending.
        title: Title of the scatter plot.
        color: Matplotlib colour of the scatter points.
        autoencoder: Object exposing ``predict_on_batch``; when ``None`` the
            notebook-level ``model`` is used.

    Returns:
        A list of Python floats, one per image, in dataset order.
    """
    # Fall back to the notebook-level model only when none is passed in
    net = model if autoencoder is None else autoencoder

    def _mse(originals, reconstructed):
        # Average over every axis except the first (batch) one, so that each
        # image contributes exactly one error value.
        per_image_axes = tuple(range(1, originals.ndim))
        return np.mean(np.square(originals - reconstructed), axis=per_image_axes)

    r_errors = []
    for batch_x, _ in dataset:
        originals = np.asarray(batch_x)
        # get the model reconstruction for the current batch
        preds = np.asarray(net.predict_on_batch(originals))

        # one MSE per image; .tolist() yields plain floats so that the
        # returned lists can later be concatenated with `+`
        r_errors.extend(_mse(originals, preds).tolist())

    if plot:
        plt.scatter(x=range(len(r_errors)), y=sorted(r_errors), s=3.0, c=color)
        plt.title(title)
        plt.tight_layout()
        plt.show()

    return r_errors

In [None]:
# Reconstruction errors of the (normal) apple training images
apple_rec_errors = compute_plot_rec_error(
    dataset=train_dataset_norm,
    title="Apple Rec. Error",
    color="tab:green",
)

In [None]:
# Reconstruction errors of the anomalous eggplant images
eggplant_rec_errors = compute_plot_rec_error(
    dataset=anomaly_dataset_norm,
    title="Eggplant Rec. Error",
    color="tab:red",
)

In [None]:
# Merge both error lists into one plot: every error is paired with the colour
# of its class and the pairs are ordered by increasing error.
all_rec_errs = sorted(
    [(err, "tab:green") for err in apple_rec_errors]
    + [(err, "tab:red") for err in eggplant_rec_errors],
    key=lambda pair: pair[0],
)

plt.scatter(
    x=range(len(all_rec_errs)),
    y=[err for err, _ in all_rec_errs],
    c=[colour for _, colour in all_rec_errs],
    s=1.0,
)
plt.tight_layout()
plt.show()

In [None]:
# Reconstruction errors of the held-out (normal) apple test images
appletest_rec_errors = compute_plot_rec_error(
    dataset=test_dataset_norm,
    title="Apple(test) Rec. Error",
    color="tab:green",
)

#### **Count anomalies on datasets given a threshold**

In [None]:
def count_anomalies(rec_errors, threshold, name=""):
    """Count and report the reconstruction errors above a threshold.

    Args:
        rec_errors: Sized iterable of per-image reconstruction errors.
        threshold: An error strictly greater than this value is counted as
            an anomaly.
        name: Label of the dataset, used only in the printed report.

    Returns:
        The number of errors strictly greater than ``threshold``.
    """
    total = len(rec_errors)

    # count items over the selected threshold
    count_anomaly = sum(1 for err in rec_errors if err > threshold)

    # An empty input has no anomalies: report 0 % instead of dividing by zero
    percentage = (count_anomaly / total * 100) if total else 0.0

    print(f"{name} anomaly {count_anomaly} over a total of {total} {name}")
    print(f"{percentage:.2f} %")

    return count_anomaly

In [None]:
# TODO: try to define a threshold
# NOTE: the value is compared against the per-image reconstruction errors in
#       the cells below, so it should be chosen by looking at the error plots
#       above (somewhere between the apple and the eggplant errors).
threshold = ...

In [None]:
# Apples of the training set flagged as anomalies by the manual threshold
apple_count_anomaly = count_anomalies(
    rec_errors=apple_rec_errors,
    threshold=threshold,
    name="apple",
)

In [None]:
# Apples of the test set flagged as anomalies by the manual threshold
appletest_count_anomaly = count_anomalies(
    rec_errors=appletest_rec_errors,
    threshold=threshold,
    name="apple (test)",
)

In [None]:
# Eggplants flagged as anomalies by the manual threshold
eggplant_count_anomaly = count_anomalies(
    rec_errors=eggplant_rec_errors,
    threshold=threshold,
    name="eggplant",
)

#### **ROC Curve**

In [23]:
import pandas as pd
from sklearn.metrics import auc,roc_curve

In [None]:
# Ground-truth labels: 0 for every apple (normal), 1 for every eggplant (anomaly)
# NOTE(review): despite the `*_test_labels` names, `apple_rec_errors` was
#               computed on the training set; `appletest_rec_errors` holds the
#               test-set errors - confirm which one the ROC should use.
apple_test_labels = np.zeros(shape=len(apple_rec_errors))
eggplant_test_labels = np.ones(shape=len(eggplant_rec_errors))

# Labels and errors are concatenated in the same order (apples first)
all_labels = np.concatenate([apple_test_labels, eggplant_test_labels])
all_errors = apple_rec_errors + eggplant_rec_errors

# Keep score and label side by side so that statistics are easy to compute
error_df = pd.DataFrame(
    {
        "reconstruction_error": all_errors,
        "true_class": all_labels,
    }
)

# Summary statistics, then the first and the last rows
for summary in (error_df.describe(), error_df.head(), error_df.tail()):
    display(summary)

In [None]:
# ROC curve with the reconstruction error used as the anomaly score
fpr, tpr, thresholds = roc_curve(
    y_true=error_df["true_class"],
    y_score=error_df["reconstruction_error"],
)

# Area under the ROC curve
roc_auc = auc(fpr, tpr)

plt.figure(figsize=(5, 4))
plt.title('Receiver Operating Characteristic')

plt.plot(fpr, tpr, label=f'AUC = {roc_auc:.4f}')
plt.legend(loc='lower right')
# Dashed red diagonal, drawn as a reference line from (0, 0) to (1, 1)
plt.plot([0, 1], [0, 1], 'r--')

plt.ylabel('True Positive Rate')
plt.xlabel('False Positive Rate')

plt.tight_layout()
plt.show()

In [None]:
# Pick the ROC point where the gap between TPR and FPR is largest and use
# the threshold associated with it
optimal_idx = (tpr - fpr).argmax()
optimal_threshold = thresholds[optimal_idx]

print("Optimal Threshold:", optimal_threshold)

In [None]:
# Re-count the anomalies of every set using the threshold derived from the ROC
# curve. The test set carries its own label so that its report cannot be
# confused with the training-set one.
_ = count_anomalies(apple_rec_errors, optimal_threshold, name="apple (opt)")
_ = count_anomalies(appletest_rec_errors, optimal_threshold, name="apple (test) (opt)")
_ = count_anomalies(eggplant_rec_errors, optimal_threshold, name="eggplant (opt)")

In [None]:
# One panel per class: sorted reconstruction errors with the optimal
# threshold drawn as a dashed horizontal line
fig, axs = plt.subplots(nrows=1, ncols=2, figsize=(10, 6))

for ax, rec_err, title in zip(axs, [apple_rec_errors, eggplant_rec_errors], ["Apple", "Eggplant"]):
    ax.scatter(x=range(len(rec_err)), y=sorted(rec_err), s=3.0)
    ax.axhline(optimal_threshold, color='tab:red', linestyle="--", linewidth=.5)
    ax.set_title(title)
    ax.legend([f"{title} rec. error", "Threshold"])

plt.tight_layout()
plt.show()