- This project serves to locate anomalies within images using a deep learning approach
- To identify anomalous regions, a pre-trained convolutional neural network (VGG16), heatmap generation, and peak detection within those heatmaps are leveraged
- The model first classifies the entire image and subsequently locates the anomalous regions by analyzing activations in the final convolutional layers. Peaks in the heatmap, generated from weighted contributions of the convolutional filters, pinpoint the exact areas of the anomaly

In [None]:
# Make Necessary Imports
import os
import cv2
import random
import scipy
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from PIL import Image
from matplotlib.patches import Rectangle 
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from skimage.feature.peak import peak_local_max 

import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense
from tensorflow.keras.applications import vgg16
from tensorflow.keras.optimizers import Adam, SGD
from tensorflow.keras.utils import to_categorical

In [None]:
# Locate the image folders and list the files they contain
outer_image_dir = f'{directory_1}/'

SIZE = 224      # side length (in pixels) that every image is resized to
dataset = []    # image arrays, appended to by the loading loops
label = []      # class labels, appended in step with `dataset`

anomaly_images = os.listdir(f'{outer_image_dir}{path_to_infected_images}/')
uninfected_images = os.listdir(f'{outer_image_dir}{path_to_uninfected_images}/')

1. Iterate over each file in the directory and check if the file extension is 'png'   
2. Read the image using OpenCV and convert the image to a PIL Image object               
3. Resize the image to the specified SIZE and append the image (as a numpy array) to the dataset list     
4. Append the label '1' to the label list (assuming '1' indicates an anomaly) 

In [None]:
# Load every PNG from the anomaly folder, resize it to SIZE x SIZE and label it 1.
anomaly_dir = outer_image_dir + f'{path_to_infected_images}/'
for image_name in anomaly_images:
    # splitext copes with names that have no dot or several dots, where
    # split('.')[1] would raise IndexError or pick the wrong fragment.
    if os.path.splitext(image_name)[1].lower() != '.png':
        continue

    image = cv2.imread(anomaly_dir + image_name)
    if image is None:
        # cv2.imread reports an unreadable/corrupt file by returning None
        print(f'Skipping unreadable image: {image_name}')
        continue

    # OpenCV decodes into BGR channel order; reorder so the data really is
    # RGB, which is what PIL is told below and what plt.imshow assumes later.
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = Image.fromarray(image, 'RGB')
    image = image.resize((SIZE, SIZE))
    dataset.append(np.array(image))
    label.append(1)

Run sanity checks to confirm that the loaded images and labels match expectations

In [None]:
# Print the first image in the dataset and its label
if dataset:
    print(f'First image shape: {dataset[0].shape}')
    print(f'First label: {label[0]}')
else:
    print('No images found or loaded')


# Check the length of dataset and label lists to ensure they match
print(f'Total images loaded: {len(dataset)}')
print(f'Total labels: {len(label)}')
assert len(dataset) == len(label), "Dataset and label lengths differ"


# Ensure the dataset contains only numpy arrays of the specified size.
# SIZE is a single int (images are square), so the expected shape is
# (SIZE, SIZE, 3); indexing SIZE as a sequence raised TypeError.
expected_shape = (SIZE, SIZE, 3)
for img in dataset:
    assert isinstance(img, np.ndarray), "Dataset contains non-numpy array elements"
    assert img.shape == expected_shape, "Image size mismatch"

print("All images are loaded and verified successfully.")


1. Iterate over each file in the directory and check if the file extension is 'png'   
2. Read the image using OpenCV and convert the image to a PIL Image object               
3. Resize the image to the specified SIZE and append the image (as a numpy array) to the dataset list     
4. Append the label '0' to the label list (assuming '0' indicates a non-anomaly)   

In [None]:
# Load every PNG from the uninfected folder, resize it to SIZE x SIZE and label it 0.
uninfected_dir = outer_image_dir + f'{path_to_uninfected_images}/'
for image_name in uninfected_images:
    # splitext copes with names that have no dot or several dots, where
    # split('.')[1] would raise IndexError or pick the wrong fragment.
    if os.path.splitext(image_name)[1].lower() != '.png':
        continue

    image = cv2.imread(uninfected_dir + image_name)
    if image is None:
        # cv2.imread reports an unreadable/corrupt file by returning None
        print(f'Skipping unreadable image: {image_name}')
        continue

    # OpenCV decodes into BGR channel order; reorder so the data really is
    # RGB, which is what PIL is told below and what plt.imshow assumes later.
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = Image.fromarray(image, 'RGB')
    image = image.resize((SIZE, SIZE))
    dataset.append(np.array(image))
    label.append(0)

# Stack the per-image arrays and the labels into numpy arrays
dataset = np.array(dataset)
label = np.array(label)

Convert lists to numpy arrays  

In [None]:
# Make sure both containers are numpy arrays before splitting; re-running
# this on data that is already an array simply yields an equal array.
dataset, label = np.array(dataset), np.array(label)

1. Split the data into train and test data sets
2. Normalize pixel values (divide by 255) so that training converges more reliably

In [None]:
# Hold out 20% of the samples for evaluation; the fixed seed keeps the
# split reproducible between runs.
X_train, X_test, y_train, y_test = train_test_split(
    dataset,
    label,
    test_size=0.20,
    random_state=0,
)

# Divide by 255 so 8-bit pixel intensities fall in the [0, 1] range
X_train, X_test = X_train / 255., X_test / 255.

Converting labels to categorical format allows the model to match the label structure to the output layer (e.g., one-hot encoded format), which is necessary for using certain loss functions like categorical_crossentropy and enables correct training and evaluation in classification tasks

In [None]:
# One-hot encode the labels. num_classes is pinned to 2 so both splits always
# get two columns, matching the 2-unit softmax output; without it the width
# is inferred from the largest label present, so a split that happens to
# contain only class 0 would come out with a single column.
y_train = to_categorical(y_train, num_classes=2)
y_test = to_categorical(y_test, num_classes=2)

In [None]:
# Classifier = pre-trained VGG16 base + global average pooling + 2-way softmax
def model_o(input_shape=(SIZE, SIZE, 3)):
    """Build and compile the VGG16-based two-class classifier.

    Args:
        input_shape: Shape of a single input image. Defaults to
            ``(SIZE, SIZE, 3)``.

    Returns:
        A compiled Keras ``Model`` whose output is a 2-unit softmax.
    """
    # ImageNet weights, without VGG16's own fully connected classifier
    backbone = vgg16.VGG16(
        include_top=False,
        weights='imagenet',
        input_shape=input_shape,
    )

    # Every VGG16 layer is left trainable, so the whole network is fine-tuned
    for vgg_layer in backbone.layers:
        vgg_layer.trainable = True

    # Collapse each feature map to its mean value, then map the resulting
    # vector to one probability per class
    pooled = GlobalAveragePooling2D()(backbone.output)
    class_probs = Dense(2, activation="softmax")(pooled)

    # Join the convolutional base and the new prediction head into one model
    classifier = Model(backbone.input, class_probs)
    classifier.compile(
        optimizer=SGD(learning_rate=0.0001, momentum=0.9),
        loss="categorical_crossentropy",
        metrics=["accuracy"],
    )
    return classifier



# The builder in this notebook is named `model_o`; calling `get_model`
# raised NameError.
model = model_o(input_shape=(SIZE, SIZE, 3))
# summary() prints the layer table itself and returns None, so wrapping it
# in print() only added a stray "None" line.
model.summary()

The model is trained on X_train and y_train, with X_test and y_test used for validation. After training, the code extracts the loss and accuracy metrics for both the training and validation sets from the history object, which stores the results of each epoch. It then generates two plots: one showing the training and validation loss over epochs, and another showing the training and validation accuracy over epochs

In [None]:
# Fit the model; the held-out split is passed as validation data so that
# per-epoch validation metrics are recorded in `history`.
history = model.fit(
    X_train,
    y_train,
    validation_data=(X_test, y_test),
    epochs=30,
    batch_size=16,
    verbose=1,
)


def _show_curves(x_values, train_values, val_values,
                 train_label, val_label, title, y_label):
    """Draw one training/validation metric pair against the epoch number."""
    plt.plot(x_values, train_values, 'y', label=train_label)
    plt.plot(x_values, val_values, 'r', label=val_label)
    plt.title(title)
    plt.xlabel('Epochs')
    plt.ylabel(y_label)
    plt.legend()
    plt.show()


# Loss at each epoch
loss, val_loss = history.history['loss'], history.history['val_loss']
epochs = range(1, len(loss) + 1)
_show_curves(
    epochs, loss, val_loss,
    train_label='Training loss',
    val_label='Validation loss',
    title='Training and validation loss',
    y_label='Loss',
)

# Accuracy at each epoch
acc, val_acc = history.history['accuracy'], history.history['val_accuracy']
_show_curves(
    epochs, acc, val_acc,
    train_label='Training acc',
    val_label='Validation acc',
    title='Training and validation accuracy',
    y_label='Accuracy',
)

In [None]:
# Check model accuracy on the test data
_, acc = model.evaluate(X_test, y_test)
print("Accuracy = ", (acc * 100.0), "%")


# Spot-check a single test image against its true label. The same index is
# used for the image and the label (the label lookup used an undefined `n`).
n = 5
img = X_test[n]
plt.imshow(img)
plt.show()  # finish this figure so the heatmap below is not drawn over it
input_img = np.expand_dims(img, axis=0)
print("The prediction for this image is: ", np.argmax(model.predict(input_img)))
print("The actual label for this image is: ", np.argmax(y_test[n]))


# Display the confusion matrix as a heatmap.
# Predictions come from the test images (not the labels); both predictions
# and one-hot labels are reduced to class indices before being compared.
y_pred = np.argmax(model.predict(X_test), axis=1)
y_true = np.argmax(y_test, axis=1)
cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
sns.heatmap(cm, annot=True, fmt='d')
plt.show()


# Indices (into X_test) of all images classified as anomalies (class 1)
anomaly_image_dataset = np.where(y_pred == 1)[0]


def plot_heatmap(img):
    """Overlay a class-activation heat map and peak boxes on ``img``.

    The activations of the ``block5_conv3`` layer are upsampled to the image
    size and combined using the prediction-layer weights of the predicted
    class. Up to five local maxima of the resulting map are outlined with
    50x50 red rectangles on the current matplotlib axes.

    Args:
        img: A single image array indexed as (row, column, channel) with
            three channels. Presumably scaled the same way as the training
            data -- confirm against the caller.

    Returns:
        The 2-D heat map, with the same height and width as ``img``.
    """
    # `import scipy` alone is not guaranteed to expose the ndimage submodule
    from scipy import ndimage

    batch = np.expand_dims(img, axis=0)
    pred_class = np.argmax(model.predict(batch))

    # Weights of the prediction layer, restricted to the predicted class
    last_layer_weights = model.layers[-1].get_weights()[0]
    last_layer_weights_for_pred = last_layer_weights[:, pred_class]

    # Output of the last convolutional layer for this image.
    # NOTE(review): the prediction layer is fed from the pooled final VGG16
    # output rather than directly from block5_conv3 -- confirm this layer
    # choice is intended.
    last_conv_model = Model(model.input, model.get_layer("block5_conv3").output)
    last_conv_output = np.squeeze(last_conv_model.predict(batch))

    # Upsample the activations to the image size. Exact (float) ratios are
    # used so the result matches the image even when the image size is not
    # an integer multiple of the feature-map size.
    height, width = img.shape[0], img.shape[1]
    zoom_h = height / last_conv_output.shape[0]
    zoom_w = width / last_conv_output.shape[1]
    upsampled_last_conv_output = ndimage.zoom(
        last_conv_output, (zoom_h, zoom_w, 1), order=1)

    # Weighted sum over the channel axis gives one value per pixel; this
    # works for any channel count instead of assuming 512.
    heat_map = np.dot(upsampled_last_conv_output, last_layer_weights_for_pred)

    # Optional, image-dependent step: zero the map wherever the first image
    # channel is exactly 0, so dark background pixels are not reported as
    # anomalies.
    heat_map[img[:, :, 0] == 0] = 0

    # Detect at most 5 peaks (hot spots), each at least half as strong as
    # the maximum and at least 10 pixels apart.
    peak_coords = peak_local_max(heat_map, num_peaks=5, threshold_rel=0.5, min_distance=10)

    plt.imshow(img.astype('float32').reshape(height, width, 3))
    plt.imshow(heat_map, cmap='jet', alpha=0.30)
    for y, x in peak_coords:
        # peak_local_max yields (row, col); Rectangle expects an (x, y) corner
        plt.gca().add_patch(
            Rectangle((x - 25, y - 25), 50, 50, linewidth=1, edgecolor='r', facecolor='none'))

    return heat_map
        
        

# `anomaly_image_dataset` holds indices into X_test; pull out the actual
# images so they can be passed to plot_heatmap (`predicted_as_para` was
# never defined before being used here).
predicted_as_para = X_test[anomaly_image_dataset]

if predicted_as_para.shape[0] == 0:
    # random.randint(0, -1) would raise ValueError on an empty selection
    print('No test images were classified as anomalies')
else:
    # Pick one predicted anomaly at random and show its heat-map overlay
    im = random.randint(0, predicted_as_para.shape[0] - 1)
    img = predicted_as_para[im]
    heat_map = plot_heatmap(img)
    plt.show()  # finish the overlay figure before drawing the raw image

    # Show the unannotated image for comparison
    plt.imshow(img)
    plt.show()