- This project serves to detect anomalies within images using Convolutional Autoencoders
- To determine whether an image is normal or an anomaly, 
reconstruction error and kernel density estimation (based on the vectors in the latent space) 
are used
- The bottleneck layer output
from the autoencoder is considered to be the latent space

In [None]:
# Make Necessary Imports
import tensorflow
import numpy as np
import random
import glob
import matplotlib.pyplot as plt
from tensorflow import keras
from tensorflow.keras.optimizers import Adam
from keras import layers, preprocessing, Sequential
from sklearn.neighbors import KernelDensity
from PIL import Image

Create a wrapper for TensorFlow's 'image_dataset_from_directory' for additional functionality and ease of use in managing images 

In [None]:
class CustomImageDataset:
    def __init__(self, directory, image_size, batch_size, labels):
        '''
        Parameters:
           - directory (str): The path to the directory containing the image dataset. The directory should be organized
             with subdirectories, each representing a class label and containing the images for that class.
           - image_size (tuple): The target size of the images after resizing, specified as (height, width).
           - batch_size (int): The number of images to include in each batch during training.
           - labels (str or list): Specifies whether to infer labels from the directory structure ('inferred') or 
             explicitly provide them. When set to 'inferred', subdirectory names are used as labels.
        '''
        self.dataset = tensorflow.keras.preprocessing.image_dataset_from_directory(
            directory,
            image_size=image_size,
            batch_size=batch_size,
            labels=labels
        )
        self.labels = labels

    def __iter__(self):
        return iter(self.dataset)

    def __len__(self):
        return len(self.dataset)

    def map(self, *args, **kwargs):
        return self.dataset.map(*args, **kwargs)

    def batch(self, *args, **kwargs):
        return self.dataset.batch(*args, **kwargs)

    def prefetch(self, *args, **kwargs):
        return self.dataset.prefetch(*args, **kwargs)

In [None]:
# Define a size and batch_size for later use
# The reasons for doing this are: 1) Neural Networks expect images of consistent dimensions and 2) The size of the batch determines the speed and stability of training. 
# A large batch size may smooth out differences between individual images, thereby mmaking the learning process too uniform. 
# This might cause the model to skim over important details. 
# A small batch size may mean the model updates its understanding based on just a few examples at a time. 
# This can make the learning process noisy, as small batches might not represent the overall data well
SIZE = 8 # Resizing an image to be 8 x 8 will result in significant data loss. Keeping this number small simply for speed of testing (will change to 128 X 128 later)
batch_size = 64 

- Define three generators for training, validation, and anomaly detection
- Resize the images to a specific size and group into batches for processing


*Additional Notes*:
- Labels for the images are automatically inferred based on the subdirectory names within each directory
- The train_generator is used for training a model, the validation_generator is used to evaluate the model's performance on unseen data, and the anomaly_generator is specifically set up to detect anomalies

In [None]:
train_generator = preprocessing.image_dataset_from_directory(
    r'C:\Users\awais\Downloads\archive (1)\noncloud_train',    
    image_size=(SIZE, SIZE),
    batch_size=batch_size,
    labels='inferred'
)

validation_generator = preprocessing.image_dataset_from_directory(
    r'C:\Users\awais\Downloads\archive (1)\noncloud_test',
    image_size=(SIZE, SIZE),
    batch_size=batch_size,
    labels='inferred'
)

anomaly_generator = preprocessing.image_dataset_from_directory(
    r'C:\Users\awais\Downloads\archive (1)\cloud',
    image_size=(SIZE, SIZE),
    batch_size=batch_size,
    labels='inferred'
)

**Define a rescaling layer to normalize image pixel values in a range of [0, 1]. Normalizing values ensures the gradient (partial derivative of the loss function) is not excessively large, leading to more stable and faster convergence (optimal state)**

In [None]:
rescaling_layer = layers.Rescaling(1./255)

*General Idea*:
 - 'change_inputs' function processes a batch of input images for use in an autoencoder by resizing and rescaling them to a specified size

*Intricacies*:
- Prints the shape of the original images, applies a rescaling transformation using rescaling_layer, and then resizes the images to [SIZE, SIZE] using nearest-neighbor interpolation
- The function returns a tuple (x, x) where both elements are the resized and rescaled images. In the context of an autoencoder, this means the same processed images are used as both the input and target output, aligning with the autoencoder's goal of reconstructing its input data

In [None]:
def change_inputs(images, labels):
    '''
    Parameters:
        - images: A batch of input images.
        - labels: Corresponding labels (currently not used in this function).

    Returns:
        - A tuple (x, x) where both elements are the processed images:
        - x: The resized and rescaled images, which serve as both the input and the target output for the autoencoder.
    '''
    print(f"Original images shape: {images.shape}")
    x = tensorflow.image.resize(rescaling_layer(images),[SIZE, SIZE], method=tensorflow.image.ResizeMethod.NEAREST_NEIGHBOR)
    print(f"Resized images shape: {x.shape}")
    return x, x

**Use map to apply the change_inputs function to each batch of images and labels in the datasets. This function resizes and rescales the images, so now each dataset—train_dataset, validation_dataset, and anomaly_dataset—has images that are prepared and ready for their respective tasks**

In [None]:
train_dataset = train_generator.map(change_inputs)
validation_dataset = validation_generator.map(change_inputs)
anomaly_dataset = anomaly_generator.map(change_inputs)

check_none_in_dataset checks if any images or labels in a dataset are None. It loops through each batch and prints a message if it finds any None values, returning True if it does. If everything looks good and there are no None values, it prints a confirmation message and returns False

In [None]:
def check_none_in_dataset(dataset):
    '''
    Parameters:
        dataset (tf.data.Dataset): A TensorFlow dataset object containing batches of images and labels.

    Returns:
        bool: True if any `None` values are found in the dataset; False otherwise. Prints a message indicating whether `None` values were found.
    '''
    for batch in dataset:
        images, labels = batch
        if images is None or labels is None:
            print("Found None in dataset")
            return True
    print("No None values in dataset")
    return False

# Check validation dataset
print("Checking validation dataset for None values:")
c = check_none_in_dataset(validation_dataset)
print(c)

print_labels_from_dataset iterates through a specified number of batches from a given dataset, printing the labels associated with the images in each batch

In [None]:
def print_labels_from_dataset(dataset, num_batches=1):
    '''
    Parameters:
        dataset (tf.data.Dataset): A TensorFlow dataset object containing batches of images and labels.
        num_batches (int, optional): The number of batches to process from the dataset. Defaults to 1.

    Returns:
        None: The function prints the labels and a comparison between labels and images but does not return any values.
    '''
    for images, labels in dataset.take(num_batches):
        print("Labels (should be the same as images):")
        print(labels.numpy())  # Print the labels to check if they are the expected values (not None)s
        print(labels.numpy() == images.numpy())

print("Validation Dataset Labels:")
bat = print_labels_from_dataset(validation_dataset)

print("Anomaly Dataset Labels:")
cow = print_labels_from_dataset(anomaly_dataset)

- Define a sequential model 
- Deconstruct the images by using convolutional layers and max pooling to progressively downsample the input images, capturing essential features while reducing spatial dimensions
- Reconstruct the images from the encoded features by using convolutional layers and upsampling layers to reverse the downsampling process
- Use a sigmoid activation to output images with the same shape as the input

*Extra Notes*:
Try to minimize the dimensions of the latent space/bottleneck layer
to retain as much information as possible.
The size of the latent space should be small enough to force the model 
to learn an efficient compression but large enough to preserve the essential features of the data


In [None]:
model = Sequential()
# Encoder
model.add(layers.Conv2D(64, (3, 3), activation='relu', padding='same', input_shape=(SIZE, SIZE, 3)))
model.add(layers.MaxPooling2D((2, 2), padding='same')) # reduce the spatial dimensions of the feature maps produced by layers.Conv2D by taking max value (this highlights the most important features) of every 2 x 2 window
model.add(layers.Conv2D(32, (3, 3), activation='relu', padding='same'))
model.add(layers.MaxPooling2D((2, 2), padding='same'))
model.add(layers.Conv2D(16, (3, 3), activation='relu', padding='same'))
model.add(layers.MaxPooling2D((2, 2), padding='same'))

# Decoder
model.add(layers.Conv2D(16, (3, 3), activation='relu', padding='same'))
model.add(layers.UpSampling2D((2, 2)))
model.add(layers.Conv2D(32, (3, 3), activation='relu', padding='same'))
model.add(layers.UpSampling2D((2, 2)))
model.add(layers.Conv2D(64, (3, 3), activation='relu', padding='same'))
model.add(layers.UpSampling2D((2, 2)))

model.add(layers.Conv2D(3, (3, 3), activation='sigmoid', padding='same'))

model.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error', metrics=['mse'])
model.summary()

print_shapes_and_check function examines and compares the shapes of images, labels, and predictions from a dataset using a specified model

*Additional Information*:
- Processes one batch of data, prints the shapes of the images and labels, makes predictions with the model, and prints the shape of those predictions

- Checks if the shape of the predictions matches the shape of the labels, printing a boolean result to indicate whether they are the same 

In [None]:
def print_shapes_and_check(dataset, model):
    '''
    Parameters:
        dataset (tf.data.Dataset): A TensorFlow dataset object containing batches of images and labels.
        model (tf.keras.Model): A Keras model used for making predictions.

    Returns:
        None: The function prints the shapes of images, labels, and predictions, and checks for shape mismatches, but does not return any values.
    '''
    for images, labels in dataset.take(1):  # Take one batch for checking
        print(f"Images shape: {images.shape}")
        print(f"Labels shape: {labels.shape}")

        # Make predictions
        predictions = model(images, training=False)

        # Print prediction and label shapes
        print(f"Predictions shape: {predictions.shape}")

        # Check if shapes match
        label_shape_correct = labels.shape == predictions.shape
        print(f"Labels shape matches predictions shape: {label_shape_correct}")

        # If shapes are not matching, print additional debugging info
        if not label_shape_correct:
            print(f"Shape mismatch: Labels shape {labels.shape} vs. Predictions shape {predictions.shape}")

# Check training dataset
print("Checking training dataset shapes:")
print_shapes_and_check(train_dataset, model)

# Check validation dataset
print("Checking validation dataset shapes:")
print_shapes_and_check(validation_dataset, model)

# Check anomaly dataset
print("Checking anomaly dataset shapes:")
print_shapes_and_check(anomaly_dataset, model)

- evaluate_and_print_shapes function is designed to assess and report on the shapes and performance of a model on a given dataset

*Additional Information*:
- Processes one batch from the dataset, printing the shapes of the images and labels
- Checks if the shape of the predictions matches the shape of the labels
- Calculates the Mean Squared Error (MSE) between the labels and predictions, printing both the shape and value of the MSE

In [None]:
def evaluate_and_print_shapes(dataset, model):
    '''
    Paramters:
        dataset (tf.data.Dataset): A TensorFlow dataset object containing batches of images and labels.
        model (tf.keras.Model): A Keras model used for making predictions.

    Returns:
        None: The function prints the shapes of images, labels, and predictions, checks for shape mismatches, and computes the MSE, but does not return any values.
    '''
    for batch in dataset.take(1):  # Take one batch for checking
        images, labels = batch
        print(f"Batch images shape: {images.shape}")
        print(f"Batch labels shape: {labels.shape}")

        # Make predictions
        predictions = model(images, training=False)

        # Print prediction and label shapes
        print(f"Predictions shape: {predictions.shape}")

        # Check if shapes match
        label_shape_correct = labels.shape == predictions.shape
        print(f"Labels shape matches predictions shape: {label_shape_correct}")

        # If shapes are not matching, print additional debugging info
        if not label_shape_correct:
            print(f"Shape mismatch: Labels shape {labels.shape} vs. Predictions shape {predictions.shape}")

        # Compute and print the Mean Squared Error (MSE)
        mse = tensorflow.keras.losses.MeanSquaredError()
        error = mse(labels, predictions)
        print(f"Mean Squared Error shape: {error.shape}")
        print(f"Mean Squared Error value: {error.numpy()}")

# Evaluate and print shapes for validation dataset
print("Evaluating validation dataset:")
evaluate_and_print_shapes(validation_dataset, model)

# Evaluate and print shapes for anomaly dataset
print("Evaluating anomaly dataset:")
evaluate_and_print_shapes(anomaly_dataset, model)

- Train the model for 1000 epochs using train_dataset
- The shuffle=True option ensures that the training data is shuffled before each epoch to improve generalization

In [None]:
# model fitting
history = model.fit(
    train_dataset,
    steps_per_epoch = 1500 // batch_size,
    epochs = 1000,
    validation_data = validation_dataset,
    validation_steps = 225 // batch_size,
    shuffle = True
)

Plots the training and validation loss over epochs, using yellow for training loss and red for validation loss, with appropriate labels and title

In [None]:
loss = history.history['loss']
val_loss = history.history['val_loss']
epochs = range(1, len(loss) + 1)
plt.plot(epochs, loss, 'y', label='Training loss')
plt.plot(epochs, val_loss, 'r', label='Validation loss')
plt.title('Training and validation loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [None]:
# Get all batches generated by the datagen and pick a batch for prediction
# Just to test the model.
data_batch = []  # Capture all training batches as a numpy array

# Iterate over the dataset
for images, _ in train_dataset:
    data_batch.append(images.numpy())

# Convert the list of numpy arrays to a single numpy array
data_batch = np.concatenate(data_batch, axis=0)

# Predict using the model
predicted = model.predict(data_batch)

# View a couple images and recon.

image_num = random.randint(0, data_batch[0].shape[0] - 1) 
plt.figure(figsize=(12, 6))
plt.subplot(121)
plt.imshow(data_batch[0][image_num])
plt.subplot(122)
plt.imshow(predicted[image_num])
plt.show()

- Print the shape of a batch of images from both the anomaly and validation generators, stopping after the first batch from each generator
- Examine the reconstruction error in the validation data and anomaly data

In [None]:
for images, _ in anomaly_generator:
    print(f"Anomaly batch shape: {images.shape}")
    break

for images, _ in validation_generator:
    print(f"Validation batch shape: {images.shape}")
    break


anomaly_error = model.evaluate(anomaly_dataset)
validation_error = model.evaluate(validation_dataset)


# Print out the results
print(f"Recon. error for the validation data is {validation_error}")
print(f"Recon. error for the anomaly data is {anomaly_error}")

- Construct an encoder model by sequentially adding convolutional and max pooling layers, replicating the structure of the encoder portion of the previous model

*Additional Information*:
Each convolutional layer in the encoder is initialized with weights taken from corresponding layers in the other model, ensuring that the encoder starts with the same learned features as the original model. This encoder processes input images to extract their latent space representations, which can then be used for further analysis, such as calculating Kernel Density Estimates (KDE) to understand the distribution of these representations

In [None]:
encoder = Sequential()
encoder.add(layers.Conv2D(64, (3, 3), activation='relu', padding='same', use_bias=True, input_shape=(SIZE, SIZE, 3)))
x = model.layers[0].get_weights()
encoder.layers[0].set_weights(x)
encoder.add(layers.MaxPooling2D((2, 2), padding='same'))
encoder.add(layers.Conv2D(32, (3, 3), activation='relu', padding='same', use_bias=True))
y = model.layers[2].get_weights()
encoder.layers[2].set_weights(y)
encoder.add(layers.MaxPooling2D((2, 2), padding='same'))
encoder.add(layers.Conv2D(16, (3, 3), activation='relu', padding='same', use_bias=True))
z = model.layers[4].get_weights()
encoder.layers[4].set_weights(z)
encoder.add(layers.MaxPooling2D((2, 2), padding='same'))
encoder.summary()

In [None]:
#Get encoded output of input images = Latent space
encoded_images = encoder.predict(train_dataset)

# Flatten the encoder output because KDE takes 1D vectors as input
encoder_output_shape = encoder.output_shape 
out_vector_shape = encoder_output_shape[1] * encoder_output_shape[2] * encoder_output_shape[3]

- Reshape each image in encoded_images to a specified vector shape and append the reshaped images to a list
- Convert this list into a NumPy array and fit a Kernel Density Estimate (KDE) model with a Gaussian kernel and a bandwidth of 0.2 to the reshaped image vectors

In [None]:
encoded_images_vector = []

# Reshape each image and append to the list
for img in encoded_images:
    reshaped_img = np.reshape(img, out_vector_shape)
    encoded_images_vector.append(reshaped_img)

# Convert the list to a NumPy array
encoded_images_vector = np.array(encoded_images_vector)

kde = KernelDensity(kernel='gaussian', bandwidth=0.2).fit(encoded_images_vector)

- Compute the density and reconstruction error for a batch of images, using the new encoder and KDE model

*Additional Information*:
It reshapes each image, predicts its encoded form, calculates its density using KDE, and evaluates the reconstruction error by comparing the reconstructed image to the original. The function returns the average and standard deviation of both density and reconstruction error. The function is then applied to batches of images from train_generator and anomaly_generator to obtain these statistics for both uninfected and anomaly images

In [None]:
def calc_density_and_recon_error(batch_images):
    '''
    Parameters:
        batch_images (np.ndarray or tensor): A batch of images, where each image is expected to be preprocessed to match the input shape required by the encoder and model.

    Returns:
        tuple: A tuple containing four values:
            - average_density (float): The mean density score of the images in the batch.
            - stdev_density (float): The standard deviation of the density scores.
            - average_recon_error (float): The mean reconstruction error of the images in the batch.
            - stdev_recon_error (float): The standard deviation of the reconstruction errors.
    '''
    density_list=[]
    recon_error_list=[]
    for im in range(0, batch_images.shape[0]-1):
        
        img = batch_images[im][np.newaxis, :,:,:]
        encoded_img = encoder.predict([[img]]) 
        for i in range(len(encoded_img)):
            encoded_img[i] = np.reshape(encoded_img[i], out_vector_shape)
        density = kde.score_samples(encoded_img)[0] 
        reconstruction_error = model.evaluate([model.predict([[img]])],[[img]], batch_size = 1)[0]
        density_list.append(density)
        recon_error_list.append(reconstruction_error)
        
    average_density, stdev_density = np.mean(np.array(density_list)), np.std(np.array(density_list)) 
   
    average_recon_error, stdev_recon_error = np.mean(np.array(recon_error_list)), np.std(np.array(recon_error_list)) 

    
    return average_density, stdev_density, average_recon_error, stdev_recon_error


# Get average and std dev. of density and recon. error for uninfected and anomaly images. 
train_batch = next(iter(train_generator))[0]
anomaly_batch = next(iter(anomaly_generator))[0]

uninfected_values = calc_density_and_recon_error(train_batch)
anomaly_values = calc_density_and_recon_error(anomaly_batch)

- Evaluate whether a given image is an anomaly based on density and reconstruction error thresholds

*Additional Information*:
It loads and preprocesses the image by resizing it to SIZExSIZE pixels and ensuring it has three color channels. The image is then normalized and reshaped for model input. The function uses the encoder to obtain the encoded representation and calculates its density with the KDE model. It also reconstructs the image using the model and computes the reconstruction error. If the density is below a specified threshold or the reconstruction error exceeds another threshold, the function classifies the image as an anomaly; otherwise, it considers it as not an anomaly

In [None]:
def check_anomaly(img_path):
    '''
    Parameters:
        img_path (str): The file path to the image to be checked.

    Returns:
        None: Prints whether the image is an anomaly or not based on predefined thresholds.
    
    '''
    density_threshold = 11 # Set this value based on the above exercise
    reconstruction_error_threshold = 0.00014 # Set this value based on the above exercise
    img  = Image.open(img_path)
    img = np.array(img.resize((SIZE,SIZE), Image.Resampling.LANCZOS))
    if img.shape[-1] != 3:
        img = np.stack((img,) * 3, axis=-1) 
    
    plt.imshow(img)
    img = img / 255.
    img = img[np.newaxis, :,:,:]
    encoded_img = encoder.predict(img) 
    encoded_img = [np.reshape(img, (out_vector_shape)) for img in encoded_img] 
    density = kde.score_samples(encoded_img)[0] 

    reconstruction = model.predict([[img]])
    reconstruction_error = model.evaluate([reconstruction],[[img]], batch_size = 1)[0]

    if density < density_threshold or reconstruction_error > reconstruction_error_threshold:
        print("The image is an anomaly")
        
    else:
        print("The image is not an anomaly")