# Deep Learning Project Work  
<a href="https://www.kaggle.com/pirimidin/deep-learning-hw-milestone-1">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>

### Transfer Learning solution

This solution to our project work is based on transfer learning. To find out more, please read the documentation.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
import torch
import torchvision
import math
from skimage.metrics import structural_similarity

import os
# Print the full path of every file found below the Kaggle input directory
for folder, _subdirs, files_in_folder in os.walk('/kaggle/input'):
    for file_name in files_in_folder:
        print(os.path.join(folder, file_name))

In [None]:
# Downloading data from the MVTecAD database - Carpet and Hazelnut categories were used

!wget https://www.mydrive.ch/shares/38536/3830184030e49fe74747669442f0f282/download/420937545-1629951845/hazelnut.tar.xz
!wget https://www.mydrive.ch/shares/38536/3830184030e49fe74747669442f0f282/download/420937484-1629951672/carpet.tar.xz
!tar -xf hazelnut.tar.xz 
!tar -xf carpet.tar.xz
!pip install piqa

In [None]:
def test_image_visualization():
    """Plot six hand-picked defective samples together with their ground-truth masks.

    Each image and its mask are read from disk, the image is converted from
    BGR to RGB, the mask is recoloured by ``mask_color_magenta`` and both are
    passed to ``plot_images``.

    Raises:
        FileNotFoundError: if an image or a mask cannot be read
            (``cv2.imread`` returns ``None`` in that case instead of raising).
    """
    image_paths = ['./hazelnut/test/crack/000.png', './hazelnut/test/cut/006.png',
                   './hazelnut/test/hole/013.png', './carpet/test/metal_contamination/004.png',
                   './carpet/test/color/005.png', './carpet/test/thread/008.png']
    mask_paths = ['./hazelnut/ground_truth/crack/000_mask.png', './hazelnut/ground_truth/cut/006_mask.png',
                  './hazelnut/ground_truth/hole/013_mask.png', './carpet/ground_truth/metal_contamination/004_mask.png',
                  './carpet/ground_truth/color/005_mask.png', './carpet/ground_truth/thread/008_mask.png']

    for image_path, mask_path in zip(image_paths, mask_paths):
        msk = cv2.imread(mask_path)
        img = cv2.imread(image_path)
        # Fail with a readable message here rather than with an obscure error
        # further down when one of the files is missing.
        if img is None:
            raise FileNotFoundError(f"Could not read image: {image_path}")
        if msk is None:
            raise FileNotFoundError(f"Could not read mask: {mask_path}")

        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        plot_images(img, mask_color_magenta(msk))
    
    
    
def plot_images(img, n_msk):
    """Show the image, the mask, and the mask blended over the image in one row."""
    plt.figure(figsize=(20, 20))

    # One entry per panel; each panel lists the layers to draw as
    # (array, extra imshow keyword arguments), bottom layer first.
    panels = (
        [(img, {})],
        [(n_msk, {})],
        [(img, {}), (n_msk, {'alpha': 0.3})],
    )
    for position, layers in enumerate(panels, start=131):
        plt.subplot(position)
        for layer, imshow_kwargs in layers:
            plt.imshow(layer, **imshow_kwargs)
        # Hide the axis ticks of this panel
        plt.xticks([])
        plt.yticks([])

    plt.show()



def mask_color_magenta(img):
    """Build a 4-channel overlay from a 3-channel ground-truth mask.

    Pure-white pixels are recoloured to magenta and an opacity channel is
    appended that is 255 wherever the recoloured mask has a non-zero
    grayscale value and 0 elsewhere. The input array is not modified.

    Args:
        img: 3-channel mask image (for example the result of ``cv2.imread``).

    Returns:
        A new 4-channel array: the three recoloured colour channels followed
        by the opacity channel.
    """
    # Work on a copy so the caller's array stays untouched
    recoloured = img.copy()

    # Select the pixels whose three channels are all 255 (white)
    white = (recoloured[:, :, 0] == 255) & (recoloured[:, :, 1] == 255) & (recoloured[:, :, 2] == 255)
    # (255, 0, 255) reads as magenta in both RGB and BGR channel order
    recoloured[white] = [255, 0, 255]

    # Every pixel with a non-zero grayscale value becomes fully opaque
    gray = cv2.cvtColor(recoloured, cv2.COLOR_BGR2GRAY)
    _, alpha = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY)

    # Append the opacity channel to the three colour channels
    b, g, r = cv2.split(recoloured)
    return cv2.merge([b, g, r, alpha])

# Render the sample image / mask figures
test_image_visualization()

# Model section

Our model is a fairly simple autoencoder with additional convolutional layers, which help it deal better with image data. Because the decoder of an autoencoder mirrors its encoder, the convolutional section of the encoder needs a matching deconvolutional section in the decoder, which in practice means using transposed convolutional layers.

In [None]:

import torch.nn as nn

# Load the pre-trained VGG-19 and freeze it: none of its weights should
# receive gradients during training
model = torchvision.models.vgg19(pretrained=True)
for frozen_param in model.parameters():
    frozen_param.requires_grad = False

# Next, let's define our own autoencoder model, using the pre-trained model as the encoder
class Autoencoder(nn.Module):
    """Autoencoder that reuses the front of the pre-trained network as its encoder.

    The encoder consists of the first 15 modules of the module-level
    ``model.features``. The decoder is made of two stride-2 transposed
    convolutions (256 -> 64 -> 3 channels) followed by dropout and a sigmoid.
    """

    def __init__(self):
        super().__init__()
        # Encoder: leading slice of the pre-trained feature extractor
        pretrained_front = list(model.features.children())[:15]
        self.encoder = nn.Sequential(*pretrained_front)
        # Decoder: transposed convolutions that upsample back to 3 channels
        upsampling_layers = [
            nn.ConvTranspose2d(256, 64, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(True),
            nn.ConvTranspose2d(64, 3, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.Dropout(0.2),
            nn.Sigmoid(),
        ]
        self.decoder = nn.Sequential(*upsampling_layers)

    def forward(self, x):
        """Encode ``x`` and decode the result again."""
        return self.decoder(self.encoder(x))

In [None]:
# Use the GPU when PyTorch can see one, otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Selected device: {device}')

### Define SSIM loss function
from piqa import SSIM

class SSIMLoss(SSIM):
    """Loss built on the parent ``SSIM`` module: one minus its output."""

    def forward(self, x, y):
        similarity = super().forward(x, y)
        return 1. - similarity

### Define the loss functions: pixel-wise MSE and the SSIM-based loss defined above
loss_fn = torch.nn.MSELoss()
ssim_loss = SSIMLoss()

### Set the initial learning rate
lr= 0.008

### Set the random seed for reproducible results
### (done before the model is built so that the decoder initialisation is repeatable)
torch.manual_seed(0)

### Initialize the autoencoder
autoencoder = Autoencoder()

### Define the optimizer (Adam with weight decay)
optim = torch.optim.Adam(autoencoder.parameters(), lr=lr, weight_decay=1e-05)
### Define the scheduler: multiplies the learning rate by 0.5 after every 10 scheduler steps
scheduler = torch.optim.lr_scheduler.StepLR(optim, step_size=10, gamma=0.5)

# Move the autoencoder to the selected device
autoencoder.to(device)

# Dataset and Dataloader creation

In [None]:
# Define the transform applied to every image: resize, then convert to a tensor
transform = torchvision.transforms.Compose([torchvision.transforms.Resize(32),
                                torchvision.transforms.ToTensor(),
                                #torchvision.transforms.Normalize((127.5, 127.5, 127.5), (127.5, 127.5, 127.5))
                               ])
# Load the train and test image folders of both categories
hazelnut_train_dataset = torchvision.datasets.ImageFolder("./hazelnut/train/",transform=transform)
hazelnut_test_dataset = torchvision.datasets.ImageFolder("./hazelnut/test/",transform=transform)

carpet_train_dataset = torchvision.datasets.ImageFolder("./carpet/train/",transform=transform)
carpet_test_dataset = torchvision.datasets.ImageFolder("./carpet/test/",transform=transform)

# Split each *test* folder 80/20 into a test part and a validation part
# (the train folders are used as they are)
hazelnut_test_length = int(len(hazelnut_test_dataset) * 0.8)
hazelnut_val_length = len(hazelnut_test_dataset) - hazelnut_test_length

carpet_test_length = int(len(carpet_test_dataset) * 0.8)
carpet_val_length = len(carpet_test_dataset) - carpet_test_length

# A fixed generator seed keeps the split identical between runs
hazelnut_test_dataset, hazelnut_val_dataset = torch.utils.data.random_split(hazelnut_test_dataset,[hazelnut_test_length,hazelnut_val_length],generator=torch.Generator().manual_seed(42))
carpet_test_dataset, carpet_val_dataset = torch.utils.data.random_split(carpet_test_dataset,[carpet_test_length,carpet_val_length],generator=torch.Generator().manual_seed(42))

# Define our DataLoader objects (all of them shuffled, same batch size)
batch_size = 32
hazelnut_train_loader  = torch.utils.data.DataLoader(hazelnut_train_dataset,batch_size=batch_size,shuffle=True)
hazelnut_test_loader  = torch.utils.data.DataLoader(hazelnut_test_dataset,batch_size=batch_size,shuffle=True)
hazelnut_val_loader  = torch.utils.data.DataLoader(hazelnut_val_dataset,batch_size=batch_size,shuffle=True)
carpet_train_loader  = torch.utils.data.DataLoader(carpet_train_dataset,batch_size=batch_size,shuffle=True)
carpet_test_loader  = torch.utils.data.DataLoader(carpet_test_dataset,batch_size=batch_size,shuffle=True)
carpet_val_loader  = torch.utils.data.DataLoader(carpet_val_dataset,batch_size=batch_size,shuffle=True)

# DataLoaders for visualization: a single batch that holds the whole test split.
# Each loader is sized with the length of its own dataset.
hazelnut_vis_test_loader = torch.utils.data.DataLoader(hazelnut_test_dataset,batch_size=len(hazelnut_test_dataset))
carpet_vis_test_loader = torch.utils.data.DataLoader(carpet_test_dataset,batch_size=len(carpet_test_dataset))

# Train, Test, Evaluation, and Visualization functions

In [None]:
def train_epoch(autoencoder, device, dataloader, loss_fn, scheduler,optimizer):
    """Train the autoencoder for one epoch and return the mean batch loss.

    Args:
        autoencoder: model to train; it is called on every image batch.
        device: device the image batches (and the module-level ``ssim_loss``)
            are moved to.
        dataloader: iterable of ``(image_batch, labels)`` pairs; the labels
            are ignored because the training is unsupervised.
        loss_fn: reconstruction loss, called as ``loss_fn(reconstruction, input)``.
        scheduler: learning-rate scheduler, stepped once at the end of the epoch.
        optimizer: optimizer that updates the trainable parameters.

    Returns:
        The mean of the per-batch losses of this epoch.
    """
    # Set train mode for both the encoder and the decoder
    autoencoder.train()
    # Follow the selected device instead of a hard-coded 'cuda:0', which
    # fails on CPU-only machines
    ssim_loss.to(device)
    train_loss = []
    for image_batch, _ in dataloader:
        # Move tensor to the proper device
        image_batch = image_batch.to(device)
        # Reconstruct the batch
        decoded_data = autoencoder(image_batch)
        # Evaluate loss
        loss = loss_fn(decoded_data, image_batch)
        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        # Parameter update
        optimizer.step()

        # Print batch loss
        print('\t partial train loss (single batch): %f' % (loss.item()))
        train_loss.append(loss.detach().cpu().numpy())

    # Step the scheduler once per epoch. Stepping it after every batch made
    # the learning-rate decay depend on the number of batches per epoch.
    scheduler.step()

    return np.mean(train_loss)

In [None]:
def test_epoch(autoencoder, device, dataloader, loss_fn):
    """Evaluate the autoencoder on a whole dataloader and return one global loss.

    All reconstructions and all inputs are gathered on the CPU first; the loss
    is then computed once over the concatenated tensors. Labels are ignored.
    """
    # Evaluation mode for encoder and decoder
    autoencoder.eval()

    reconstructions, originals = [], []
    with torch.no_grad():  # gradients are not needed for evaluation
        for inputs, _labels in dataloader:
            inputs = inputs.to(device)
            reconstructions.append(autoencoder(inputs).cpu())
            originals.append(inputs.cpu())
        # One loss value over everything the dataloader produced
        global_loss = loss_fn(torch.cat(reconstructions), torch.cat(originals))

    return global_loss.data

In [None]:
def plot_ae_outputs(autoencoder,vis_test_loader, n=5): # n stands for the number of classes
    """Plot one original image per class (top row) above its reconstruction (bottom row).

    Args:
        autoencoder: model used to reconstruct the images; it is switched to
            evaluation mode.
        vis_test_loader: loader whose first batch supplies the images and
            their class labels.
        n: number of classes; class indices ``0 .. n-1`` are looked up in that
            batch. A class without a sample leaves its column empty.
    """
    plt.figure(figsize=(16,4.5))
    # Get the test images and labels from our loader
    images, targets = next(iter(vis_test_loader))
    autoencoder.eval()

    for i in range(n):
        # Positions of the samples that belong to class i
        class_indices = np.where(targets == i)[0]
        if len(class_indices) == 0:
            continue

        # First sample of the class, with a batch dimension added for the model
        img = images[int(class_indices[0])].unsqueeze(0).to(device)
        with torch.no_grad():
            rec_img = autoencoder(img)

        ax = plt.subplot(2, n, i + 1)
        # imshow expects (H, W, C): drop the batch dimension and move the
        # channels last. A plain transpose would also swap height and width.
        plt.imshow(img.squeeze(0).permute(1, 2, 0).cpu().numpy())
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
        if i == n//2:
            ax.set_title('Original images')

        ax = plt.subplot(2, n, i + 1 + n)
        plt.imshow(rec_img.squeeze(0).permute(1, 2, 0).cpu().numpy())
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
        if i == n//2:
            ax.set_title('Reconstructed images')
    plt.show()

In [None]:
def _to_gray_array(img):
    # Convert an image tensor to a 2-D NumPy array by averaging its channels.
    arr = img.detach().cpu().numpy()
    # Drop a leading batch dimension of size one: (1, C, H, W) -> (C, H, W)
    if arr.ndim == 4 and arr.shape[0] == 1:
        arr = arr[0]
    # The tensors handled in this file are channel-first, so the channels
    # are on axis 0
    if arr.ndim == 3:
        arr = arr.mean(axis=0)
    return arr


def compute_ssim(img1, img2):
    """Return the structural similarity (SSIM) of two image tensors.

    Args:
        img1: image tensor shaped ``(H, W)``, ``(C, H, W)`` or ``(1, C, H, W)``.
        img2: image tensor with the same shape as ``img1``.

    Returns:
        The SSIM of the two images after both have been reduced to grayscale
        by averaging their channels.
    """
    img1 = _to_gray_array(img1)
    img2 = _to_gray_array(img2)

    # Both inputs are 2-D now, which is the default (single-channel) mode of
    # structural_similarity; no channel argument is needed.
    ssim = structural_similarity(img1, img2, data_range=img1.max() - img1.min())

    return ssim

# Define a function to compute the SSIM between two batches of images
def compute_batch_ssim(batch1, batch2):
    """Return the mean SSIM over the corresponding image pairs of two batches.

    Raises:
        ValueError: if the two batches do not have the same shape.
    """
    if batch1.shape != batch2.shape:
        raise ValueError('Batch size does not match')

    # One SSIM value per pair of images, averaged afterwards
    pair_scores = [compute_ssim(first, second) for first, second in zip(batch1, batch2)]
    return np.mean(pair_scores)

In [None]:
def compute_threshold(autoencoder, dataloader, fpr = 0.05):
    """Compute the anomaly thresholds from the reconstruction errors of a dataset.

    Two error measures are collected: the per-pixel squared error (summed over
    the channels) and the per-image SSIM error, defined as ``1 - SSIM`` so that
    a larger value means a worse reconstruction for both measures.

    Args:
        autoencoder: trained model; it is switched to evaluation mode.
        dataloader: iterable of ``(image_batch, labels)`` pairs with batched
            images; the labels are ignored.
        fpr: subtracted from 100 to get the percentile, i.e. it is interpreted
            in percent (the default 0.05 selects the 99.95th percentile).

    Returns:
        ``(threshold, ssim_threshold)``: the percentile of the pixel errors and
        the percentile of the per-image SSIM errors.

    Raises:
        ValueError: if the dataloader yields no batches.
    """
    autoencoder.eval()

    errors = []
    ssim_errors = []
    with torch.no_grad():
        for image_batch, _ in dataloader:
            image_batch = image_batch.to(device)

            rec_img_batch = autoencoder(image_batch)

            error_batch = torch.sum((rec_img_batch - image_batch)**2, dim=1).cpu().numpy()
            errors.append(error_batch.ravel())
            # One SSIM error per image; a batch average cannot be thresholded
            # per sample
            ssim_errors.extend(
                1.0 - compute_ssim(rec_img, img)
                for rec_img, img in zip(rec_img_batch, image_batch)
            )

    if not errors:
        raise ValueError('dataloader yielded no batches')

    threshold = np.percentile(np.concatenate(errors), 100-fpr)
    ssim_threshold = np.percentile(ssim_errors, 100-fpr)
    return threshold, ssim_threshold

In [None]:
def predict(autoencoder, dataloader, threshold, ssim_threshold=None):
    """Flag the images of a dataset as anomalous or not.

    An image is flagged by the pixel criterion when any of its per-pixel
    squared errors (summed over the channels) exceeds ``threshold``, and by
    the SSIM criterion when its SSIM error (``1 - SSIM``) exceeds
    ``ssim_threshold``.

    Args:
        autoencoder: trained model; it is switched to evaluation mode.
        dataloader: iterable of ``(image_batch, labels)`` pairs with batched
            images; the labels are ignored.
        threshold: threshold for the per-pixel squared error.
        ssim_threshold: threshold for the SSIM error; when omitted,
            ``threshold`` is used, which keeps three-argument calls working.

    Returns:
        ``(predictions, ssim_predictions)``: two lists of booleans with one
        entry per image, true where an anomaly was detected.
    """
    if ssim_threshold is None:
        ssim_threshold = threshold

    autoencoder.eval()
    predictions = []
    ssim_predictions = []
    with torch.no_grad():
        for image_batch, _ in dataloader:
            image_batch = image_batch.to(device)

            rec_img_batch = autoencoder(image_batch)

            # (N, H, W) map of squared errors; an image is anomalous when any
            # of its pixels is above the threshold
            error_batch = torch.sum((rec_img_batch - image_batch)**2, dim=1).cpu().numpy()
            pred_batch = np.any(error_batch > threshold, axis=(1, 2))

            # One SSIM decision per image
            ssim_pred_batch = [
                bool(1.0 - compute_ssim(rec_img, img) > ssim_threshold)
                for rec_img, img in zip(rec_img_batch, image_batch)
            ]

            predictions.extend(pred_batch)
            ssim_predictions.extend(ssim_pred_batch)

    # Return the predictions of all batches, not only those of the last one
    return predictions, ssim_predictions

In [None]:
def evaluation(autoencoder, dataloader, threshold, ssim_threshold=None, good_label=2):
    """Check the anomaly predictions on a labelled dataset against the ground truth.

    An image is predicted anomalous by the pixel criterion when any of its
    per-pixel squared errors (summed over the channels) exceeds ``threshold``,
    and by the SSIM criterion when its SSIM error (``1 - SSIM``) exceeds
    ``ssim_threshold``. An image is truly anomalous when its label differs
    from ``good_label``.

    Args:
        autoencoder: trained model; it is switched to evaluation mode.
        dataloader: iterable of ``(image_batch, label_batch)`` pairs with
            batched images and label tensors.
        threshold: threshold for the per-pixel squared error.
        ssim_threshold: threshold for the SSIM error; when omitted,
            ``threshold`` is used.
        good_label: class index of the defect-free images.

    Returns:
        ``(results, ssim_results)``: two lists of booleans with one entry per
        image, true where the respective prediction equals the actual value.
    """
    if ssim_threshold is None:
        ssim_threshold = threshold

    autoencoder.eval()

    results = []
    ssim_results = []
    with torch.no_grad():
        for image_batch, label_batch in dataloader:
            image_batch = image_batch.to(device)

            rec_img_batch = autoencoder(image_batch)

            # Pixel criterion: any pixel of the (N, H, W) error map above the threshold
            error_batch = torch.sum((rec_img_batch - image_batch)**2, dim=1).cpu().numpy()
            pred_batch = np.any(error_batch > threshold, axis=(1, 2))

            # SSIM criterion: one error value per image
            ssim_error_batch = np.array([
                1.0 - compute_ssim(rec_img, img)
                for rec_img, img in zip(rec_img_batch, image_batch)
            ])
            ssim_pred_batch = ssim_error_batch > ssim_threshold

            is_anomaly = label_batch.cpu().numpy() != good_label
            results.extend(pred_batch == is_anomaly)
            ssim_results.extend(ssim_pred_batch == is_anomaly)

    return results, ssim_results

In [None]:
def experimental_evaluation(autoencoder, dataset, threshold,ssim_threshold, n=10):
    """Evaluate the first ``n`` samples of a dataset one by one and visualize them.

    For every sample a 2x2 figure is drawn: the image, the image with the
    pixels whose squared error exceeds ``threshold`` highlighted, the image
    again, and the image with an overlay for the SSIM decision. The SSIM
    error (``1 - SSIM``) is a single value per image, so its overlay covers
    the whole image when the error exceeds ``ssim_threshold``.

    Args:
        autoencoder: trained model; it is switched to evaluation mode.
        dataset: iterable of ``(image, label)`` pairs with unbatched images.
        threshold: threshold for the per-pixel squared error.
        ssim_threshold: threshold for the per-image SSIM error.
        n: number of samples to show.
    """
    autoencoder.eval()
    for i, (img, label) in enumerate(dataset):
        # Stop after exactly n samples
        if i >= n:
            break

        img = img.unsqueeze(0).to(device)
        with torch.no_grad():
            rec_img = autoencoder(img)
            print(rec_img.shape)

        # Squared error summed over the channels -> (H, W) error map
        error = torch.sum((rec_img - img)**2, dim=1).squeeze(0).cpu().numpy()
        ssim_error = 1.0 - compute_ssim(rec_img.squeeze(0), img.squeeze(0))

        prediction = np.any(error > threshold)
        ssim_pred = ssim_error > ssim_threshold
        mask = np.where(error > threshold, 255, 0)
        ssim_mask = np.full(error.shape, 255 if ssim_pred else 0)

        # imshow expects (H, W, C): drop the batch dimension, channels last
        shown = img.squeeze(0).permute(1, 2, 0).cpu().numpy()

        _, axarr = plt.subplots(2,2, figsize=(15,15))
        axarr[0][0].imshow(shown)
        axarr[0][1].imshow(shown)
        # A fixed colour range keeps an all-zero and an all-255 mask distinguishable
        axarr[0][1].imshow(mask, cmap='jet', alpha=0.5, interpolation='none', vmin=0, vmax=255)
        axarr[1][0].imshow(shown)
        axarr[1][1].imshow(shown)
        axarr[1][1].imshow(ssim_mask, cmap='jet', alpha=0.5, interpolation='none', vmin=0, vmax=255)
        plt.show()

        print(f"Threshold: {threshold}, SSIM_Threshold: {ssim_threshold} MaxValue: {np.max(error)} , SSIM MaxValue: {np.max(ssim_error)}")
        print(f"label: {label}, anomaly detected: {prediction}, ssim prediction: {ssim_pred}")

# Training loops

## Training on the hazelnut dataset

In [None]:
# Define the number of epochs to use for training
num_epochs = 80
# Unified loss dictionary
h_diz_loss = {'train_loss':[],'val_loss':[]}
for epoch in range(num_epochs):

    # Calculate training loss
    h_train_loss = train_epoch(autoencoder, device, hazelnut_train_loader, loss_fn, scheduler, optim)

    # Calculate validation loss on the validation split, so that the test
    # split stays untouched until the final evaluation
    h_val_loss = test_epoch(autoencoder, device, hazelnut_val_loader, loss_fn)
    print('\n EPOCH {}/{} \t train loss {} \t val loss {} '.format(epoch + 1, num_epochs,h_train_loss,h_val_loss))
    h_diz_loss['train_loss'].append(h_train_loss)
    h_diz_loss['val_loss'].append(h_val_loss)
    # Show the current reconstructions after every epoch
    plot_ae_outputs(autoencoder, hazelnut_vis_test_loader, n=5)

### Hazelnut losses

In [None]:
# Draw the train and validation loss curves for hazelnuts on a logarithmic y axis
plt.figure(figsize=(10, 8))
for loss_key, curve_label in (('train_loss', 'Train'), ('val_loss', 'Valid')):
    plt.semilogy(h_diz_loss[loss_key], label=curve_label)
plt.xlabel('Epoch')
plt.ylabel('Average Loss')
plt.legend()
plt.title('Loss on hazelnuts')
plt.show()

### Hazelnut experimental evaluation

In [None]:
# compute_threshold iterates over batches, so it needs the DataLoader rather
# than the raw dataset
threshold, ssim_threshold = compute_threshold(autoencoder, hazelnut_train_loader)
print(f"Treshhold: {threshold}")
print(f"SSIM Treshhold: {ssim_threshold}")

Calculate the accuracy of the anomaly prediction on the test dataset, and visualize some samples

In [None]:
# evaluation works on batches of images and label tensors, so it needs the
# DataLoader rather than the raw dataset
results, ssim_results = evaluation(autoencoder, hazelnut_test_loader, threshold, ssim_threshold)
accuracy = sum(results) / len(results)
ssim_acc = sum(ssim_results) / len(ssim_results)
print(f"ACC: {accuracy}, SSIM ACC: {ssim_acc}")

In [None]:
# Visualize the predictions for individual samples of the hazelnut test split
experimental_evaluation(autoencoder, hazelnut_test_dataset, threshold, ssim_threshold)