# Anomaly Detection using CNN Autoencoder

### Loading Data from Google Drive

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!unzip -qo "/content/drive/MyDrive/Colab Notebooks/Corso_ML/fruits_anomaly_detection.zip"

### Import

In [None]:
import os
import keras
from keras.models import Sequential
from keras.layers import Dense, Activation, Flatten, Input
from keras.layers import Conv2D, MaxPooling2D, UpSampling2D
import matplotlib.pyplot as plt
from keras import backend as K
import numpy as np
from keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.utils import array_to_img, img_to_array, load_img
from PIL import Image, ImageChops
import random

### Create generators for training, validation and testing

- Read data from folders using ImageDataGenerator
- Generating data from the images in a folder allows the use of datasets that do not fit in main memory all at once
- Image resizing is done by the generator, so a folder with images of any size can be used

The named directory must contain one or more subfolders; a path should look like `apples_train/apple_class1/img1.jpg`


- `ImageDataGenerator` generates batches of tensor image data with real-time data augmentation.
- `flow_from_directory` returns a `DirectoryIterator` yielding tuples of `(x, y)`, where `x` is a NumPy array containing a batch of images with shape `(batch_size, *target_size, channels)` and `y` is a NumPy array of corresponding labels.

---
`flow_from_directory(directory)`

**Description**: Takes the path to a directory, and generates batches of augmented/normalized data (Yields batches indefinitely, in an infinite loop.)

In [None]:
batch_size = 85
train_datagen = ImageDataGenerator(rescale=1./255, data_format='channels_last') # 96x96x3
train_generator = train_datagen.flow_from_directory(
    '/content/fruits_anomaly_detection/apples_train',
    target_size=(96, 96),
    batch_size=batch_size,
    class_mode='input'   # the output is the input itself
    )

test_datagen = ImageDataGenerator(rescale=1./255, data_format='channels_last')
validation_generator = test_datagen.flow_from_directory(
    '/content/fruits_anomaly_detection/apples_test/',
    target_size=(96, 96),
    batch_size=batch_size,
    class_mode='input'
    )

anomaly_generator = test_datagen.flow_from_directory(
    '/content/fruits_anomaly_detection/eggplant',
    target_size=(96, 96),
    batch_size=batch_size,
    class_mode='input'
    )
# Each generator yields (X, y) batches; with class_mode='input', y is identical to X

Found 6416 images belonging to 13 classes.
Found 2138 images belonging to 13 classes.
Found 468 images belonging to 3 classes.


## **Autoencoder Architecture**

As we have seen in the case of the MLP Autoencoder, we build a structure composed of an **Encoder**, which reduces the dimensions of our data (extracting latent features), and a **Decoder**, which restores the original dimensions.

The output must have the same structure as the input. The objective is to learn a model that reconstructs well (with a small reconstruction error) data coming from the same distribution as the training data.

Different data (for example anomalies) should produce higher reconstruction error.

In order to increase the data size in the Decoder part we can use the class
**`UpSampling2D`**  https://keras.io/api/layers/reshaping_layers/up_sampling2d/
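
As a rough illustration of what `UpSampling2D` does with its default nearest-neighbour interpolation, the NumPy sketch below repeats each row and column of a small feature map. The helper name `upsample_nearest` is hypothetical and is not part of Keras; it only mimics the behaviour on a single `(height, width, channels)` array.

```python
import numpy as np

def upsample_nearest(feature_map, size=(2, 2)):
    """Repeat rows and columns of a (height, width, channels) array.

    Hypothetical helper that mimics nearest-neighbour upsampling.
    """
    rows_repeated = np.repeat(feature_map, size[0], axis=0)
    return np.repeat(rows_repeated, size[1], axis=1)

# A 2x2 feature map with one channel
small = np.arange(4, dtype="float32").reshape(2, 2, 1)
large = upsample_nearest(small)

print(small[..., 0])
print(large[..., 0])
print(small.shape, "->", large.shape)  # (2, 2, 1) -> (4, 4, 1)
```

Each value is copied into a 2x2 block, so the spatial size doubles while the number of channels stays the same.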


In [None]:
# Define the convolutional autoencoder model

# input shape must be the same size as the images that will be fed into it by the generators
# The output layer must be the same dimensions as the original image
model = Sequential()
#-------------------------

?
?
?

# Compile the model
model.compile(optimizer='adam', loss='')
# ... can compute the difference between the image in input and the one produced in output (the reconstructed one)
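
One possible way to fill in the architecture is sketched below. The number of layers, the filter counts, the `sigmoid` output and the mean squared error loss are all assumptions chosen for illustration, not the notebook's reference solution. The only hard constraints taken from the text are the `(96, 96, 3)` input shape and an output with the same dimensions.

```python
from keras.models import Sequential
from keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D

model = Sequential([
    Input(shape=(96, 96, 3)),
    # Encoder: each pooling step halves height and width
    Conv2D(32, (3, 3), activation="relu", padding="same"),
    MaxPooling2D((2, 2), padding="same"),   # 48x48
    Conv2D(16, (3, 3), activation="relu", padding="same"),
    MaxPooling2D((2, 2), padding="same"),   # 24x24
    Conv2D(8, (3, 3), activation="relu", padding="same"),
    MaxPooling2D((2, 2), padding="same"),   # 12x12 latent feature maps
    # Decoder: each upsampling step doubles height and width
    Conv2D(8, (3, 3), activation="relu", padding="same"),
    UpSampling2D((2, 2)),                   # 24x24
    Conv2D(16, (3, 3), activation="relu", padding="same"),
    UpSampling2D((2, 2)),                   # 48x48
    Conv2D(32, (3, 3), activation="relu", padding="same"),
    UpSampling2D((2, 2)),                   # 96x96
    # Three output channels in [0, 1], matching the rescaled input images
    Conv2D(3, (3, 3), activation="sigmoid", padding="same"),
])

# Assumed loss: pixel-wise mean squared error between input and reconstruction
model.compile(optimizer="adam", loss="mean_squared_error")
model.summary()
```

The `sigmoid` on the last layer is a design choice that pairs with the `rescale=1./255` used by the generators, since both keep pixel values in the range 0 to 1.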

### **Training**

Define `steps_per_epoch`:
- It is the total number of steps (batches of samples) to yield from the generator before declaring one epoch finished and starting the next epoch.
- It should typically be equal to `ceil(num_samples / batch_size)`.
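
As a small worked example of the formula above, the sketch below plugs in the image counts reported by the generators and the batch size of 85. The variable names `num_train_samples`, `num_validation_samples` and `validation_steps` are introduced here for illustration.

```python
import math

batch_size = 85
num_train_samples = 6416        # "Found 6416 images" for apples_train
num_validation_samples = 2138   # "Found 2138 images" for apples_test

# The last batch is smaller than batch_size, hence the ceiling
steps_per_epoch = math.ceil(num_train_samples / batch_size)
validation_steps = math.ceil(num_validation_samples / batch_size)

print(steps_per_epoch, validation_steps)  # 76 26
```

The value 26 agrees with the number of batches printed later for the apple test set.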

Use the `EarlyStopping` callback in combination with `ModelCheckpoint` (see https://keras.io/api/callbacks/).


In [None]:
# Training the model
?

# Early stopping (stops training when validation doesn't improve for {patience} epochs)
?
# Saves the best version of the model to disk (as measured on the validation data set)

# model.fit(X_train, y_train, etc..)
?
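
A minimal sketch of how `EarlyStopping` and `ModelCheckpoint` can be combined is given below. To keep it self-contained it trains a tiny model on random arrays instead of the notebook's generators; `toy_model`, `checkpoint_path`, the patience of 3 and the 5 epochs are all assumptions made for illustration.

```python
import os
import tempfile
import numpy as np
from keras.models import Sequential
from keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D
from keras.callbacks import EarlyStopping, ModelCheckpoint

# Random stand-in images (the notebook would use its generators instead)
rng = np.random.default_rng(0)
x_train = rng.random((64, 16, 16, 3)).astype("float32")
x_val = rng.random((16, 16, 16, 3)).astype("float32")

toy_model = Sequential([
    Input(shape=(16, 16, 3)),
    Conv2D(4, (3, 3), activation="relu", padding="same"),
    MaxPooling2D((2, 2), padding="same"),
    UpSampling2D((2, 2)),
    Conv2D(3, (3, 3), activation="sigmoid", padding="same"),
])
toy_model.compile(optimizer="adam", loss="mean_squared_error")

checkpoint_path = os.path.join(tempfile.mkdtemp(), "toy_ae.h5")
callbacks = [
    # Stop when val_loss has not improved for `patience` epochs
    EarlyStopping(monitor="val_loss", patience=3),
    # Keep only the best model (lowest val_loss) on disk
    ModelCheckpoint(checkpoint_path, monitor="val_loss", save_best_only=True),
]

# For an autoencoder the target is the input itself
history = toy_model.fit(
    x_train, x_train,
    validation_data=(x_val, x_val),
    epochs=5, batch_size=16,
    callbacks=callbacks, verbose=0,
)
print(len(history.history["val_loss"]), "epochs run")
```

With the notebook's data, the same callbacks would presumably be passed to `model.fit(train_generator, validation_data=validation_generator, ...)`, with the checkpoint written to `image_anomaly_ae.h5`.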

Because of the `patience` hyper-parameter, training continues for that many epochs after the validation loss stops improving, so the weights at the end of training are not necessarily the best ones.

In [None]:
# To get back the model that performed best on the validation set we load the checkpointed model from disk:
model_filepath = 'image_anomaly_ae.h5'
model = keras.models.load_model(model_filepath)
model.summary()


Test the model by viewing a sample of original and reconstructed images.

A `DirectoryIterator` yielding tuples of `(x, y)` where `x` is a numpy array containing a batch of images with shape `(batch_size, *target_size, channels)` and `y` is a numpy array of corresponding labels.

### Testing

In [None]:
# Extract one pass of batches from the generator
data_list = []
batch_index = 0
# The generator's batch_index wraps back to 0 after the last batch, which ends the loop
while batch_index <= train_generator.batch_index:
    data = next(train_generator)  # next(...) works whether or not the iterator exposes .next()
    data_list.append(data[0]) # just get the input batches
    ## each entry is a batch of shape (n_records,size,size,channels)=(85,96,96,3)
    batch_index = batch_index + 1

print(len(data_list))
print(data_list[0].shape)


In [None]:
# Plot some original samples vs reconstructed samples

predicted = model.predict(data_list[0])   #compute prediction for the first batch

no_of_samples = 4
_, axs = plt.subplots(no_of_samples, 2, figsize=(5, 8))
axs = axs.flatten()
imgs = []
for i in range(no_of_samples):
    imgs.append(data_list[0][i])
    imgs.append(predicted[i])

for img, ax in zip(imgs, axs):
    ax.imshow(img)
plt.show()

In [None]:
# Get images from eggplant (which we consider to be anomalous data)

# Test the model by viewing a sample of original and reconstructed images
eggplant_data_list = []
batch_index = 0
while batch_index <= anomaly_generator.batch_index:
    print("batch_index: ", batch_index, "anomaly_generator batch_index: ", anomaly_generator.batch_index)
    data = next(anomaly_generator)
    eggplant_data_list.append(data[0])
    batch_index = batch_index + 1

print(len(eggplant_data_list))
print(eggplant_data_list[0].shape)


In [None]:
# Plot some original eggplants vs reconstructed eggplants
?
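
One way to fill in the plotting cells is a small reusable helper, sketched below. The function name `plot_reconstructions` is hypothetical, and the example runs on random stand-in arrays rather than on real model output so that it is self-contained.

```python
import numpy as np
import matplotlib.pyplot as plt

def plot_reconstructions(originals, reconstructions, n_samples=4):
    """Show originals (left column) next to reconstructions (right column)."""
    fig, axs = plt.subplots(n_samples, 2, figsize=(5, 2 * n_samples), squeeze=False)
    for i in range(n_samples):
        axs[i, 0].imshow(originals[i])
        axs[i, 1].imshow(reconstructions[i])
        axs[i, 0].axis("off")
        axs[i, 1].axis("off")
    axs[0, 0].set_title("original")
    axs[0, 1].set_title("reconstructed")
    return fig

# Stand-in data: random images and a slightly noisy copy of them
rng = np.random.default_rng(0)
fake_batch = rng.random((4, 96, 96, 3))
fake_reconstruction = np.clip(fake_batch + rng.normal(0, 0.05, fake_batch.shape), 0, 1)

fig = plot_reconstructions(fake_batch, fake_reconstruction)
```

In the notebook this could be called as `plot_reconstructions(eggplant_data_list[0], model.predict(eggplant_data_list[0]))` followed by `plt.show()`, and likewise with `apple_test_list[0]` for the apple test samples.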

In [None]:
# Get images from apple test

# Test the model by viewing a sample of original and reconstructed images
apple_test_list = []
batch_index = 0
while batch_index <= validation_generator.batch_index:
    #print("batch_index: ", batch_index, "validation_generator batch_index: ", validation_generator.batch_index)
    data = next(validation_generator)
    apple_test_list.append(data[0]) #just get the input batches
    batch_index = batch_index + 1

print(len(apple_test_list))
print(apple_test_list[0].shape)


26
(85, 96, 96, 3)


In [None]:
# Apple test samples: original vs reconstructed
?


### **Evaluation**

In [None]:
# We want the difference in error between the validation (normal) images and the anomalous images to be as high as possible
?

#### **Analysis of the reconstruction errors**

In [None]:
error_list = []
for idx in range(len(apple_test_list)): #iterate over batches

  r = model.predict(apple_test_list[idx])   #get prediction of batch with index idx

  #MSE
  r_error  = [np.square(apple_test_list[idx][i] - r[i]).mean() for i in range(len(apple_test_list[idx]))]
  # MSE Mean squared (reconstruction) error between the original image and the reconstructed one
  error_list.append(r_error) # error_list is a list of lists; it is flattened below


error_flat_list = [item for sublist in error_list for item in sublist]

plt.scatter(x = range(len(error_flat_list)), y = sorted(error_flat_list))
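
The per-image mean squared error computed in the loop above can also be written as a single vectorised NumPy expression. The sketch below uses a hypothetical helper `per_image_mse` and random stand-in arrays, and checks that it gives the same values as the list comprehension.

```python
import numpy as np

def per_image_mse(originals, reconstructions):
    """Mean squared error per image for batches shaped (n, height, width, channels)."""
    # Average over height, width and channels, keeping one value per image
    return np.mean(np.square(originals - reconstructions), axis=(1, 2, 3))

# Random stand-ins for a batch of images and its reconstruction
rng = np.random.default_rng(0)
batch = rng.random((5, 96, 96, 3))
reconstructed = rng.random((5, 96, 96, 3))

vectorised = per_image_mse(batch, reconstructed)
looped = np.array([np.square(batch[i] - reconstructed[i]).mean() for i in range(len(batch))])

print(vectorised.shape)                 # (5,)
print(np.allclose(vectorised, looped))  # True
```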

In [None]:
# Get the reconstruction errors for eggplants
error_list_eggplant = []
for idx in range(len(eggplant_data_list)):
  r = model.predict(eggplant_data_list[idx])

  r_error  = [np.square(eggplant_data_list[idx][i] - r[i]).mean() for i in range(len(eggplant_data_list[idx]))]

  error_list_eggplant.append(r_error)

error_flat_list_eggplant = [item for sublist in error_list_eggplant for item in sublist]
# total_error = sum(error_flat_list_eggplant)

plt.scatter(x = range(len(error_flat_list_eggplant)), y = sorted(error_flat_list_eggplant))

#### **Count anomalies on apple test given a threshold**

In [None]:
# Anomaly detection on apple_test samples
?

apple test anomaly 20 over a total of 2138 apple test
0.009354536950420954


In [None]:
# Count anomalies on eggplants
#anomaly detection in the eggplant samples
?

eggplant anomaly 438 over a total of 468 eggplant test
0.9358974358974359
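
Counts like the ones above can be obtained by comparing each reconstruction error with a threshold. The sketch below shows the idea on a handful of made-up errors; the helper `count_anomalies`, the values in `toy_errors` and `toy_threshold` are all placeholders, since the threshold actually used in the notebook is not shown.

```python
import numpy as np

def count_anomalies(errors, threshold):
    """Return the number and the fraction of errors above the threshold."""
    errors = np.asarray(errors)
    n_anomalies = int(np.sum(errors > threshold))
    return n_anomalies, n_anomalies / len(errors)

# Made-up reconstruction errors and threshold, for illustration only
toy_errors = [0.010, 0.020, 0.030, 0.005]
toy_threshold = 0.025

n_anomalies, fraction = count_anomalies(toy_errors, toy_threshold)
print(f"anomalies: {n_anomalies} over a total of {len(toy_errors)}")  # anomalies: 1 over a total of 4
print(fraction)  # 0.25
```

In the notebook the same function could be applied to `error_flat_list` and `error_flat_list_eggplant` with a chosen threshold.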


#### **ROC Curve**

In [None]:
from sklearn.metrics import (confusion_matrix, precision_recall_curve, auc,
                             roc_curve, recall_score, classification_report, f1_score,
                             precision_recall_fscore_support)
import pandas as pd

# Assign labels
# Create labels for normal and anomaly samples
apple_test_labels =  np.zeros(len(error_flat_list))    #normal label = 0
eggplant_test_labels = np.ones(len(error_flat_list_eggplant)) #anomaly label = 1

# Put all the labels together
all_labels = np.concatenate((apple_test_labels, eggplant_test_labels))

# Put the reconstruction errors together (they serve as the anomaly scores)
all_errors  = error_flat_list + error_flat_list_eggplant

# Create a dataframe to store all the above information, to have everything together
# This way we can compute some statistics easily

error_df = pd.DataFrame({'reconstruction_error': all_errors,
                         "true_class": all_labels})
print(error_df.describe())  # a bare expression in the middle of a cell displays nothing
print(error_df.head())
print(error_df.tail())


In [None]:
fpr, tpr, thresholds = roc_curve(error_df.true_class, error_df.reconstruction_error)
# fpr, tpr, thresholds = roc_curve(all_labels, all_errors)

# AUC
roc_auc = auc(fpr, tpr)

plt.figure(figsize=(5,4))
plt.title('Receiver Operating Characteristic')

plt.plot(fpr, tpr, label='AUC = %0.4f'% roc_auc)
plt.legend(loc='lower right')
plt.plot([0,1],[0,1],'r--')

plt.ylabel('True Positive Rate')
plt.xlabel('False Positive Rate')

plt.show();

In [None]:
optimal_idx = np.argmax(tpr - fpr)
optimal_threshold = thresholds[optimal_idx]

print("Optimal threshold:", optimal_threshold)

Optimal threshold: 0.02499602
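
Choosing the threshold that maximises `tpr - fpr` is commonly known as using Youden's J statistic. The sketch below applies the same rule to four made-up scores that separate perfectly, so the result is easy to check by hand; the `toy_` names are introduced here and do not appear in the notebook.

```python
import numpy as np
from sklearn.metrics import roc_curve

# Made-up data: two normal samples (label 0) and two anomalies (label 1)
toy_labels = np.array([0, 0, 1, 1])
toy_errors = np.array([0.1, 0.2, 0.8, 0.9])

toy_fpr, toy_tpr, toy_thresholds = roc_curve(toy_labels, toy_errors)

# Youden's J = TPR - FPR; pick the threshold where it is largest
best_idx = np.argmax(toy_tpr - toy_fpr)
best_threshold = toy_thresholds[best_idx]

# roc_curve treats a sample as positive when score >= threshold
predicted = (toy_errors >= best_threshold).astype(int)
print(best_threshold, predicted)
```

Here the selected threshold is the smallest anomaly score, so both anomalies are flagged and neither normal sample is.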


## **OneClass SVM for anomaly detection**

SVMs use hyperplanes in a multi-dimensional space to separate one class of observations from another. They are most commonly used for classification problems with two or more classes.

However, SVMs are also increasingly used for one-class problems, where all training data belong to a single class. In this case, the algorithm learns what is “normal”, so that when a new data point is presented it can decide whether the point belongs to the group or not. If not, the new point is labeled as out of the ordinary, i.e. an anomaly.

In [None]:
from sklearn.svm import OneClassSVM
from sklearn.datasets import make_blobs
from numpy import quantile, where, random
import matplotlib.pyplot as plt

**Preparing the data**

We create a toy dataset for this tutorial by using the `make_blobs()` function. We can check the dataset by visualizing it in a plot.



In [None]:
random.seed(13)
x, _ = make_blobs(n_samples=200, centers=1, cluster_std=.3, center_box=(8, 8))

plt.scatter(x[:,0], x[:,1])
plt.show()

**Defining the model and prediction**

In [None]:
# Create the model
?

# Fit the model & predict
?

# Extract the negative outputs as outliers
?

#visualize the results in a plot, highlighting with red the anomalies/outliers
plt.scatter(x[:,0], x[:,1])
plt.scatter(values[:,0], values[:,1], color='r')
plt.show()
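
A possible way to fill in the blanks of the cell above is sketched here. The hyper-parameters (`kernel="rbf"`, `gamma="scale"`, `nu=0.03`) are assumptions picked for illustration, and the data is regenerated with `random_state=13` so that the block runs on its own.

```python
import numpy as np
from sklearn.datasets import make_blobs
from sklearn.svm import OneClassSVM

# Toy data: a single blob centred at (8, 8)
x, _ = make_blobs(n_samples=200, centers=1, cluster_std=0.3,
                  center_box=(8, 8), random_state=13)

# Create the model (nu bounds the fraction of training points treated as outliers)
svm = OneClassSVM(kernel="rbf", gamma="scale", nu=0.03)

# Fit the model & predict: +1 for inliers, -1 for outliers
pred = svm.fit_predict(x)

# Extract the negative outputs as outliers
anom_index = np.where(pred == -1)
values = x[anom_index]

print(len(values), "points flagged as outliers out of", len(x))
```

The array `values` has the shape expected by the scatter plot in the cell, with one row per flagged point.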

In [None]:
# anomaly detection with scores
# We can find anomalies by using their scores.
# In this method, we'll define the model, fit it on the x data by using the fit_predict() method.
# We'll calculate the outliers according to the score value of each element.

?

print(scores)

# Get the threshold values from the scores using the quantile function
#for example, get the lowest 3-percent of score values as the anomalies
?

# Extract the anomalies by comparing the threshold value and identify the values of elements
?

#visualize the data, anomalies are colored in red
plt.scatter(x[:,0], x[:,1])
plt.scatter(values[:,0], values[:,1], color='r')
plt.show()
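
The score-based variant described in the comments above could look like the sketch below. It uses `score_samples` for the scores and the 3% quantile as the cut-off; the hyper-parameters and the regenerated data are again assumptions made so that the block is self-contained.

```python
import numpy as np
from sklearn.datasets import make_blobs
from sklearn.svm import OneClassSVM

x, _ = make_blobs(n_samples=200, centers=1, cluster_std=0.3,
                  center_box=(8, 8), random_state=13)

svm = OneClassSVM(kernel="rbf", gamma="scale", nu=0.03)
svm.fit(x)

# Lower scores mean the point looks less like the training data
scores = svm.score_samples(x)

# Threshold: the value below which the lowest 3% of scores fall
thresh = np.quantile(scores, 0.03)

# Points whose score is at or below the threshold are treated as anomalies
index = np.where(scores <= thresh)
values = x[index]

print(thresh, len(values))
```

Unlike the `fit_predict` approach, the number of flagged points here is controlled directly by the chosen quantile.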