# Dogs vs Cats

The goal is to classify whether an image contains a dog or a cat.

**Link:** [https://www.kaggle.com/datasets/salader/dogs-vs-cats](https://www.kaggle.com/datasets/salader/dogs-vs-cats)

## Initialization

In [10]:
# For tensor computation with strong GPU acceleration and deep learning
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from torchvision import datasets, models, transforms
from torch.utils.data import DataLoader, Dataset
from torchinfo import summary

# Using numpy
import numpy as np

# For data load or save
import pandas as pd

# For visualization
import matplotlib.pyplot as plt
import seaborn as sns

# For operating system dependent functionality
import os,shutil

# For k-folds cross-validation
from sklearn.model_selection import StratifiedKFold

# To unzip datasets
from zipfile import ZipFile

# To generate pseudo-random numbers
import random

# Python Imaging Library (PIL) for added image processing capabilities 
from PIL import Image

# To make loops show a progress meter
from tqdm.auto import tqdm

# To manage colorbar in subplots
from mpl_toolkits.axes_grid1 import make_axes_locatable

In [11]:
# Lists the entries of the current working directory (run with Shift+Enter).
# Useful to check that the csv/checkpoint files read further down are present.
print("Current directory: ",os.listdir())

Current directory:  ['.DS_Store', 'Lab9_TO_DO.ipynb', 'Lab9.pptx']


In [12]:
## Dataset preparation: unzips the archive and gathers every image into a
## single flat "dataset" folder. The cell can be re-run safely: it does nothing
## when the destination folder is already populated.

# Archive downloaded from Kaggle
path_archive = "archive.zip"

# Path of sources
path_train_cats = "train/cats/"
path_train_dogs = "train/dogs/"
path_test_cats = "test/cats/"
path_test_dogs = "test/dogs/"

# Path of destination
path_dst = "dataset"

if os.path.isdir(path_dst) and os.listdir(path_dst):
    # Already prepared by a previous run: nothing to unzip or move
    print("Dataset folder already prepared: ",path_dst)
elif not os.path.isfile(path_archive):
    raise FileNotFoundError(
        "'"+path_archive+"' not found: download the dataset archive into the current directory first")
else:
    # Unzipping the dataset
    with ZipFile(path_archive,'r') as zip_ref:
        zip_ref.extractall()

    # Destination folder creation (no error if it already exists)
    os.makedirs(path_dst,exist_ok=True)

    # Iterate on all files of every source folder to move them to destination folder
    for path_src in (path_train_cats,path_train_dogs,path_test_cats,path_test_dogs):
        for f in os.listdir(path_src):
            src = os.path.join(path_src,f)
            dst = os.path.join(path_dst,f)
            shutil.move(src,dst)

    # Removing all unnecessary files (a missing folder is not an error)
    for path_leftover in ("dogs_vs_cats","train","test"):
        shutil.rmtree(path_leftover,ignore_errors=True)

FileNotFoundError: [Errno 2] No such file or directory: 'archive.zip'

In [None]:
# Creating lists for dataframe construction

# Class prefix of the filename -> numeric label (0 for 'cat', 1 for 'dog')
LABEL_BY_PREFIX = {"cat":0,"dog":1}

# Extracting labels from filenames.
# Only files with a known class prefix are kept, so that a stray entry
# (e.g. '.DS_Store') cannot make filenames and labels differ in length.
# The directory order is deliberately left untouched: the saved fold indices
# are applied to this dataframe by position (df.iloc).
allfiles = []
labels = []
for filename in os.listdir("dataset"):
    prefix = filename.split(".")[0]
    if prefix in LABEL_BY_PREFIX:
        allfiles.append(filename)
        labels.append(LABEL_BY_PREFIX[prefix])

print("Images count = ",len(allfiles),"\nLabels count = ",len(labels))

df = pd.DataFrame({"filename":allfiles,"label":labels})

print("Some items from the dataframe just created:\n",df)

In [None]:
# Class balance check: one bar per label (0 = cat, 1 = dog)
fig = plt.figure(figsize=(5,4))
sns.set_style("white")
# The axes are created after the style is set, then handed to seaborn explicitly
label_axes = fig.add_subplot(111)
sns.countplot(data=df,x="label",ax=label_axes)

In [None]:
## Performing a stratified k-folds subdivision
#skf = StratifiedKFold(n_splits=10,shuffle=True,random_state=42)
#skf.get_n_splits(df.filename,df.label)
#
## Storing indices to recover the splits in a second time
#train_idx = []
#val_idx = []
#for i,(train,val) in enumerate(skf.split(df.filename,df.label)):
#    train_idx.append(train)
#    val_idx.append(val)
#    print("Fold: ",i)
#    print("\nTrain: index = ",train)
#    print("\nValidation:  index = ",val,"\n")
#
## Storing in csv files    
#train_splits = pd.DataFrame(columns=["train_0","train_1","train_2","train_3","train_4","train_5","train_6","train_7","train_8","train_9"])
#val_splits = pd.DataFrame(columns=["val_0","val_1","val_2","val_3","val_4","val_5","val_6","val_7","val_8","val_9"])
#train_splits["train_0"] = train_idx[0]
#val_splits["val_0"] = val_idx[0]
#train_splits["train_1"] = train_idx[1]
#val_splits["val_1"] = val_idx[1]
#train_splits["train_2"] = train_idx[2]
#val_splits["val_2"] = val_idx[2]
#train_splits["train_3"] = train_idx[3]
#val_splits["val_3"] = val_idx[3]
#train_splits["train_4"] = train_idx[4]
#val_splits["val_4"] = val_idx[4]
#train_splits["train_5"] = train_idx[5]
#val_splits["val_5"] = val_idx[5]
#train_splits["train_6"] = train_idx[6]
#val_splits["val_6"] = val_idx[6]
#train_splits["train_7"] = train_idx[7]
#val_splits["val_7"] = val_idx[7]
#train_splits["train_8"] = train_idx[8]
#val_splits["val_8"] = val_idx[8]
#train_splits["train_9"] = train_idx[9]
#val_splits["val_9"] = val_idx[9]
#train_splits.to_csv("train_splits.csv") # save the values in a csv
#val_splits.to_csv("val_splits.csv") # save the values in a csv

In [7]:
# Recovering train and validation splits
# Each csv is read into a dataframe; the columns "train_<k>" / "val_<k>" are
# used further down as positional row indices into df for fold k.
train_idx = pd.read_csv("train_splits.csv")
val_idx = pd.read_csv("val_splits.csv")

In [None]:
# Picking the computation device: the GPU when CUDA is usable, the CPU otherwise
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print("Device: ",device)

# Images inspection

In [None]:
# Fixed seed so that the same image is drawn at every run
random.seed(42) 

# Drawing one filename at random and opening the corresponding image
random_image_path = random.choice(df.filename)
img = Image.open("dataset/"+random_image_path)

# Label stored in the dataframe for the drawn filename
random_image_class = df.loc[df.filename == random_image_path,"label"].item()

# Printing metadata
print("Random image path: ",random_image_path)
print("\nImage class: ",random_image_class)
print("\nImage height: ",img.height) 
print("\nImage width: ",img.width)
# Last expression of the cell: the notebook renders the image itself
img

In [None]:
# Pixel data of the image as a numpy array
img_as_array = np.asarray(img)

# Title built from the stored label and the array shape
image_class = df[df.filename == random_image_path].label.item()
title_str = f"Image class: {image_class}\nImage shape: {img_as_array.shape}\n-> (height, width, channels)"

# Plotting the image with matplotlib, without axes
plt.figure(figsize=(8, 6))
plt.imshow(img_as_array)
plt.title(title_str)
plt.axis(False)

In [None]:
## This cell opens every image of the dataset, so it takes a while

# Collecting the (width, height) of every image listed in the dataframe
shape_arr = []

for filename in df.filename:
    # PIL exposes the size as a (width, height) tuple
    with Image.open("dataset/"+filename) as img:
        shape_arr.append(img.size)

print("The unique shapes of the images in the dataset are: ",len(np.unique(shape_arr,axis= 0)))

In [None]:
# np.max with axis=0 is taken column by column: the result is the largest width
# and the largest height found, which may come from two different images.
print("The biggest shape is: ",np.max(shape_arr,axis= 0))

In [None]:
# np.min with axis=0 is taken column by column: the result is the smallest width
# and the smallest height found, which may come from two different images.
print("The smallest shape is: ",np.min(shape_arr, axis= 0))

In [None]:
### The following lines of code take a lot of time; uncomment to test
#
## Retrieving the overall mean for subsequent normalization purposes
#r_mean_arr = []
#g_mean_arr = []
#b_mean_arr = []
#
#for i in range(0,df.shape[0]):
#    img_path = "dataset/"+df.iloc[i].filename
#    with Image.open(img_path) as img:
#        img_np = np.array(img.getdata()).reshape(img.size[0],img.size[1],3)
#        r_mean,g_mean,b_mean = np.mean(img_np,axis=(0,1))
#        r_mean_arr.append(r_mean)
#        g_mean_arr.append(g_mean)
#        b_mean_arr.append(b_mean)
#
#R_MEAN = np.mean(r_mean_arr) / 255
#G_MEAN = np.mean(g_mean_arr) / 255
#B_MEAN = np.mean(b_mean_arr) / 255
#
#RGB_df = pd.DataFrame(columns= ["R_MEAN","G_MEAN","B_MEAN"])
#RGB_df["R_MEAN"] = [R_MEAN]
#RGB_df["G_MEAN"] = [G_MEAN]
#RGB_df["B_MEAN"] = [B_MEAN]
#RGB_df.to_csv("RGB_df.csv") # save the values in a csv

In [None]:
# Reading back the per-channel means saved in the csv (single-row table)
RGB_df = pd.read_csv("RGB_df.csv")
mean_row = RGB_df.iloc[0]
print("Red ch mean = ",mean_row.R_MEAN.item(),
      "\nGreen ch mean = ",mean_row.G_MEAN.item(),
      "\nBlue ch mean = ",mean_row.B_MEAN.item())

In [None]:
### The following lines of code take a lot of time; uncomment to test
##
## Retrieving the overall standard deviation for subsequent normalization purposes
#r_std_arr = []
#g_std_arr = []
#b_std_arr = []
#
#for i in range(0,df.shape[0]):
#    img_path = "dataset/"+df.iloc[i].filename    
#    with Image.open(img_path) as img:
#        img_np = np.array(img.getdata()).reshape(img.size[0],img.size[1],3)
#        r_std,g_std,b_std = np.std(img_np,axis=(0,1))
#        r_std_arr.append(r_std)
#        g_std_arr.append(g_std)
#        b_std_arr.append(b_std)
#
#R_STD = np.mean(r_std_arr) / 255
#G_STD = np.mean(g_std_arr) / 255
#B_STD = np.mean(b_std_arr) / 255
#
#RGB_std_df = pd.DataFrame(columns= ["R_STD","G_STD","B_STD"])
#RGB_std_df["R_STD"] = [R_STD]
#RGB_std_df["G_STD"] = [G_STD]
#RGB_std_df["B_STD"] = [B_STD]
#RGB_std_df.to_csv("RGB_std_df.csv") # save the values in a csv

In [None]:
# Recovering the overall standard deviation from csv (single-row table)
RGB_std_df = pd.read_csv("RGB_std_df.csv")
# The values printed here are standard deviations, so they are labelled as such
print("Red ch std = ",RGB_std_df.iloc[0].R_STD.item(),"\nGreen ch std = ",RGB_std_df.iloc[0].G_STD.item(),"\nBlue ch std = ",RGB_std_df.iloc[0].B_STD.item())

# Dataset standardization

In [16]:
# Defining the transformations on images

IMAGE_WIDTH=224
IMAGE_HEIGHT=224
# torchvision's Resize reads a sequence as (height, width), hence this order
# (no numerical change while the target is square)
IMAGE_SIZE=(IMAGE_HEIGHT,IMAGE_WIDTH)

# Per-channel statistics recovered from the csv files, used for normalization
R_MEAN = RGB_df.iloc[0].R_MEAN.item()
G_MEAN = RGB_df.iloc[0].G_MEAN.item()
B_MEAN = RGB_df.iloc[0].B_MEAN.item()
R_STD = RGB_std_df.iloc[0].R_STD.item()
G_STD = RGB_std_df.iloc[0].G_STD.item()
B_STD = RGB_std_df.iloc[0].B_STD.item()

data_transform = transforms.Compose([
    # Resizing the images to IMAGE_HEIGHT x IMAGE_WIDTH
    transforms.Resize(size=IMAGE_SIZE),
    # Turning the image into a torch.Tensor
    transforms.ToTensor(), # this also converts all pixel values from 0 to 255 to be between 0.0 and 1.0 
    # Normalizing to the overall mean and std
    transforms.Normalize(mean=[R_MEAN,G_MEAN,B_MEAN],std=[R_STD,G_STD,B_STD])
])

In [None]:
# Side-by-side view of one example image before and after the transformation

# Fixed seed so that the same image is drawn at every run
random.seed(42) 

# Drawing one filename at random and opening the corresponding image
random_image_path = random.choice(df.filename)
img = Image.open("dataset/"+random_image_path)

# Left panel: the original image
fig,ax = plt.subplots(1,2)
original_axes,transformed_axes = ax
original_axes.imshow(img)
title_str = f"Original \nSize: {img.size}"
original_axes.set_title(title_str)
original_axes.axis("off")

# Right panel: the transformed image
# Note: PyTorch tensors are [C, H, W] while Matplotlib expects [H, W, C],
# hence the permute() before plotting
transformed_image = data_transform(img).permute(1, 2, 0) 
transformed_axes.imshow(transformed_image) 
title_str = f"Transformed \nSize: {transformed_image.shape}"
transformed_axes.set_title(title_str)
transformed_axes.axis("off")

In [18]:
# Creating custom Dataset
class DogsCatsData(Dataset):
    """Dataset of images read from a folder, paired with their labels.

    Args:
        x: pandas Series named "filename" holding the image file names.
        y: pandas Series named "label" holding the matching labels.
        transform: optional callable applied to the loaded PIL image.
        root: folder containing the image files (default "dataset/").

    Each item is a dict {"image": ..., "label": ...}.
    """
    def __init__(self,x,y,transform=None,root="dataset/"):
        # reset_index() turns each Series into a dataframe whose rows are
        # numbered 0..n-1, so items can be fetched by position
        self.x = x.reset_index()
        self.y = y.reset_index()
        self.transform = transform
        self.root = root
    def __len__(self):
        return self.x.shape[0]
    def load_image(self,path):
        """Opens the image `path` inside the root folder and returns it as RGB."""
        # The context manager releases the file handle; convert() returns a
        # fully loaded copy and guarantees the three channels that the
        # per-channel normalization relies on
        with Image.open(os.path.join(self.root,path)) as image:
            return image.convert("RGB")
    def __getitem__(self,index):
        image = self.load_image(self.x.iloc[index]["filename"])
        label = self.y.iloc[index]["label"]
        if self.transform:
            image = self.transform(image)
        sample = {"image":image,"label":label}
        return sample

In [19]:
## First split of the 10-folds cross-validation
## The saved indices select rows of df by position; x and y of each Dataset are
## the filenames and the labels of the selected rows, respectively
train_rows = df.iloc[train_idx["train_0"].values]
val_rows = df.iloc[val_idx["val_0"].values]
train_data = DogsCatsData(train_rows["filename"],train_rows["label"],transform=data_transform)
val_data = DogsCatsData(val_rows["filename"],val_rows["label"],transform=data_transform)

In [None]:
# Check the lengths: number of samples in the training set, then in the validation set
print("The lengths of the training and validation sets: ",len(train_data),len(val_data))

# Loading data

In [21]:
# Turn train and validation Datasets into DataLoaders
BATCH_SIZE = 32
# The training samples are reshuffled at every epoch so that the batches do not
# always follow the order of the dataframe; the validation order is kept fixed
trainloader = DataLoader(train_data,batch_size=BATCH_SIZE,shuffle=True)
validationloader = DataLoader(val_data,batch_size=BATCH_SIZE)

In [None]:
# Visualizing the content of the first training batch
# Only one batch is requested from the loader (no extra batch is loaded)
sample_batched = next(iter(trainloader))
fig,axs = plt.subplots(ncols=8,nrows=4,figsize=(15, 8))
axs = axs.flatten()
# One panel per image, titled with its label
for panel,image,label in zip(axs,sample_batched["image"],sample_batched["label"]):
    # permute() moves the channels last, as Matplotlib expects [H, W, C]
    panel.imshow(image.permute(1,2,0))
    panel.set_title(str(label))
    panel.axis("off")
# Panels left without an image (batch shorter than the grid) are blanked too
for panel in axs[len(sample_batched["image"]):]:
    panel.axis("off")

# Model building

In [23]:
# Creating a CNN-based image classifier.
class ImageClassifier(nn.Module):
    """Three convolutional blocks followed by a linear layer with 2 outputs.

    Every block is Conv2d(3x3, padding 1) -> ReLU -> BatchNorm2d -> MaxPool2d(2).
    In the forward pass the third block is applied four times in a row (same
    weights), so the input goes through six poolings before the classifier,
    whose linear layer expects 512*3*3 features.
    """
    @staticmethod
    def _conv_block(in_channels,out_channels):
        """Builds one Conv -> ReLU -> BatchNorm -> MaxPool block."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(out_channels),
            nn.MaxPool2d(2))
    def __init__(self):
        super().__init__()
        # Blocks are created in the same order as their attribute numbering
        self.conv_layer_1 = self._conv_block(3,64)
        self.conv_layer_2 = self._conv_block(64,512)
        self.conv_layer_3 = self._conv_block(512,512)
        self.classifier = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features=512*3*3, out_features=2))
    def forward(self, x: torch.Tensor):
        # conv_layer_3 appears four times: it is reused, not duplicated
        stages = (self.conv_layer_1,self.conv_layer_2) + (self.conv_layer_3,)*4 + (self.classifier,)
        for stage in stages:
            x = stage(x)
        return x

# Instantiating the classifier and moving its parameters to the selected device.
model = ImageClassifier().to(device)

In [None]:
# Getting a single image from the first batch
# Only one batch is requested from the loader (no extra batch is loaded)
sample_batched = next(iter(trainloader))
# unsqueeze() adds the batch dimension expected by the model
img_single = sample_batched["image"][0].unsqueeze(dim=0)
label_single = sample_batched["label"][0]
print("Single image shape: ",img_single.shape,"\n")

# Performing a forward pass on a single image
model.eval()
with torch.inference_mode():
    pred = model(img_single.to(device))
    
# Printing out what's happening and convert model logits -> pred probs -> pred label
print("Output logits:\n",pred,"\n")
print("Output prediction probabilities:\n",torch.softmax(pred, dim=1),"\n")
print("Output prediction label:\n",torch.argmax(torch.softmax(pred, dim=1), dim=1),"\n")
print("Actual label:\n",label_single)

In [None]:
# Doing a test pass through of an example input size
# Shape order is [batch, channels, height, width]
summary(model,input_size=[1,3,IMAGE_HEIGHT,IMAGE_WIDTH]) 

In [None]:
# Defining the operations to do in a training step
def train_step(model:torch.nn.Module, 
               dataloader:torch.utils.data.DataLoader, 
               loss_fn:torch.nn.Module, 
               optimizer:torch.optim.Optimizer,
               device=None):
    """Runs one training epoch and returns (mean loss, mean accuracy).

    Args:
        model: network to optimise; it is put in train mode.
        dataloader: iterable of batches, each a dict with "image" and "label".
        loss_fn: loss applied to (logits, labels).
        optimizer: optimizer stepping the model parameters.
        device: where the batches are sent; defaults to the device of the
            model parameters ("cpu" for a model without parameters).

    Returns:
        Tuple (train_loss, train_acc), both averaged over the batches (every
        batch weighs the same, whatever its size).

    Raises:
        ValueError: if the dataloader yields no batch.
    """
    if device is None:
        first_parameter = next(model.parameters(),None)
        device = first_parameter.device if first_parameter is not None else "cpu"
    # Put model in train mode
    model.train()
    # Setup train loss and train accuracy values
    train_loss,train_acc = 0,0
    # Batches are counted while iterating, so len(dataloader) is not required
    n_batches = 0
    # Loop through DataLoader batches
    for sample_batched in dataloader:
        # Send data to target device
        X = sample_batched["image"].to(device)
        y = sample_batched["label"].to(device)
        # Forward pass
        y_pred = model(X)
        # Calculate  and accumulate loss
        loss = loss_fn(y_pred,y)
        train_loss += loss.item() 
        # Optimizer zero grad
        optimizer.zero_grad()
        # Loss backward
        loss.backward()
        # Optimizer step
        optimizer.step()
        # Calculate and accumulate accuracy metric across all batches
        # (the largest logit is also the largest probability: no softmax needed)
        y_pred_class = y_pred.argmax(dim=1)
        train_acc += (y_pred_class == y).sum().item()/len(y_pred)
        n_batches += 1
    if n_batches == 0:
        raise ValueError("train_step: the dataloader yielded no batches")
    # Adjust metrics to get average loss and accuracy per batch 
    train_loss = train_loss / n_batches
    train_acc = train_acc / n_batches
    return train_loss,train_acc

In [None]:
# Defining the operations to do in a validation step
def val_step(model:torch.nn.Module, 
              dataloader:torch.utils.data.DataLoader, 
              loss_fn:torch.nn.Module,
              device=None):
    """Evaluates the model on every batch and returns (mean loss, mean accuracy).

    Args:
        model: network to evaluate; it is put in eval mode.
        dataloader: iterable of batches, each a dict with "image" and "label".
        loss_fn: loss applied to (logits, labels).
        device: where the batches are sent; defaults to the device of the
            model parameters ("cpu" for a model without parameters).

    Returns:
        Tuple (val_loss, val_acc), both averaged over the batches (every batch
        weighs the same, whatever its size).

    Raises:
        ValueError: if the dataloader yields no batch.
    """
    if device is None:
        first_parameter = next(model.parameters(),None)
        device = first_parameter.device if first_parameter is not None else "cpu"
    # Put model in eval mode
    model.eval() 
    # Setup validation loss and validation accuracy values
    val_loss,val_acc = 0,0
    # Batches are counted while iterating, so len(dataloader) is not required
    n_batches = 0
    # Turn on inference context manager
    with torch.inference_mode():
        # Loop through DataLoader batches
        for sample_batched in dataloader:
            # Send data to target device
            X = sample_batched["image"].to(device)
            y = sample_batched["label"].to(device)            
            # Forward pass
            val_pred_logits = model(X)
            # Calculate and accumulate loss
            loss = loss_fn(val_pred_logits, y)
            val_loss += loss.item()
            # Calculate and accumulate accuracy
            val_pred_labels = val_pred_logits.argmax(dim=1)
            val_acc += ((val_pred_labels == y).sum().item()/len(val_pred_labels))
            n_batches += 1
    if n_batches == 0:
        raise ValueError("val_step: the dataloader yielded no batches")
    # Adjust metrics to get average loss and accuracy per batch 
    val_loss = val_loss / n_batches
    val_acc = val_acc / n_batches
    return val_loss,val_acc

In [None]:
# Taking in various parameters required for training and validation steps
def train(model:torch.nn.Module, 
          train_dataloader:torch.utils.data.DataLoader, 
          val_dataloader:torch.utils.data.DataLoader, 
          optimizer:torch.optim.Optimizer,
          loss_fn:torch.nn.Module = nn.CrossEntropyLoss(),
          epochs:int = 5,
          split:int = 0):
    """Trains and validates the model for a number of epochs.

    After every epoch the metrics are printed and appended to the results.
    Whenever the validation accuracy beats the best value seen so far, a
    checkpoint is written to "checkpoint_<split>.pth".

    Args:
        model: network to train.
        train_dataloader: batches used by train_step.
        val_dataloader: batches used by val_step.
        optimizer: optimizer stepping the model parameters.
        loss_fn: loss applied to (logits, labels).
        epochs: number of epochs to run.
        split: fold identifier, used only in the checkpoint file name.

    Returns:
        Dict with the lists "train_loss", "train_acc", "val_loss", "val_acc",
        holding one value per epoch.
    """
    # Create empty results dictionary
    results = {"train_loss": [],
        "train_acc": [],
        "val_loss": [],
        "val_acc": []
    }
    # Instantiating the best validation accuracy
    best_val = 0
    # Loop through training and validation steps for a number of epochs
    for epoch in tqdm(range(epochs)):
        train_loss, train_acc = train_step(model=model,
                                           dataloader=train_dataloader,
                                           loss_fn=loss_fn,
                                           optimizer=optimizer)
        val_loss, val_acc = val_step(model=model,
            dataloader=val_dataloader,
            loss_fn=loss_fn) 
        # Saving the model obtaining the best validation accuracy through the epochs
        if val_acc > best_val:
            best_val = val_acc
            # "model" holds a fresh ImageClassifier used as a container when
            # the checkpoint is loaded; the trained weights are in "state_dict"
            checkpoint = {"model": ImageClassifier(),
                          "state_dict": model.state_dict(),
                          "optimizer": optimizer.state_dict()}
            checkpoint_name = "checkpoint_"+str(split)+".pth"
            torch.save(checkpoint, checkpoint_name)    
        # Every epoch is reported and recorded, improving or not, so that the
        # curves built from the results cover the whole training
        # Print out what's happening
        print(
            f"Epoch: {epoch+1} | "
            f"train_loss: {train_loss:.4f} | "
            f"train_acc: {train_acc:.4f} | "
            f"val_loss: {val_loss:.4f} | "
            f"val_acc: {val_acc:.4f}"
        )
        # Update results dictionary
        results["train_loss"].append(train_loss)
        results["train_acc"].append(train_acc)
        results["val_loss"].append(val_loss)
        results["val_acc"].append(val_acc)
    # Return the filled results at the end of the epochs
    return results

# Model training

In [None]:
### These lines take a lot of time; you will load the trained model in the following 
#
## Set random seeds
#torch.manual_seed(42) 
#torch.cuda.manual_seed(42)
#
## Set number of epochs
#NUM_EPOCHS = 50
#
## Setup loss function and optimizer
#loss_fn = nn.CrossEntropyLoss()
#optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)
#
## Start the timer
#from timeit import default_timer as timer 
#start_time = timer()
#
## Train model_0 
#model_results = train(model=model,
#                      train_dataloader=trainloader,
#                      val_dataloader=validationloader,
#                      optimizer=optimizer,
#                      loss_fn=loss_fn,
#                      epochs=NUM_EPOCHS,
#                      split=0)
#
## End the timer and print out how long it took
#end_time = timer()
#print(f"Total training time: {end_time-start_time:.3f} seconds")

# Model evaluation

In [None]:
### These are the results from the previous step; you will load the results in the following
#
## Extract train and validation loss and accuracy at each epoch 
#results = dict(list(model_results.items()))
#
## Get the loss values of the results dictionary (training and validation)
#train_loss = results["train_loss"]
#val_loss = results["val_loss"]
#
## Get the accuracy values of the results dictionary (training and validation)
#train_acc = results["train_acc"]
#val_acc = results["val_acc"]
#
## Figure out how many epochs there were
#epochs = range(len(results["train_loss"]))

In [None]:
## Save results in a csv
#results_df = pd.DataFrame(columns= ["train_loss","val_loss","train_acc","val_acc","epochs"])
#results_df["train_loss"] = train_loss
#results_df["val_loss"] = val_loss
#results_df["train_acc"] = train_acc
#results_df["val_acc"] = val_acc
#results_df["epochs"] = epochs
#results_df_name = "results_df_"+str(0)+".csv"
#results_df.to_csv(results_df_name)

In [26]:
# Loading training and validation results of fold 0 from csv
# (the columns epochs, train_loss, val_loss, train_acc, val_acc are plotted below)
results_from_csv = pd.read_csv("results_df_0.csv")

In [None]:
# One figure with two panels: loss on the left, accuracy on the right
fig_curves,(loss_axes,acc_axes) = plt.subplots(1,2,figsize=(15,7))
epochs_axis = results_from_csv["epochs"]

# Plot loss
loss_axes.plot(epochs_axis,results_from_csv["train_loss"],label="train_loss")
loss_axes.plot(epochs_axis,results_from_csv["val_loss"],label="val_loss")
loss_axes.set_title("Loss")
loss_axes.set_xlabel("Epochs")
loss_axes.legend()

# Plot accuracy
acc_axes.plot(epochs_axis,results_from_csv["train_acc"],label="train_accuracy")
acc_axes.plot(epochs_axis,results_from_csv["val_acc"],label="val_accuracy")
acc_axes.set_title("Accuracy")
acc_axes.set_xlabel("Epochs")
acc_axes.legend()

# Cross-validation

In [None]:
### You can find the model and result corresponding to each fold in the files inside the directory
#
## Continue the training and validation of the model for all the other folds
#for i in range(1,10):
#    # Keeping the split
#    train_str = "train_"+str(i)
#    val_str = "val_"+str(i)
#    train_data = DogsCatsData(df.iloc[train_idx[train_str].values]["filename"],df.iloc[train_idx[train_str].values]["label"],transform=data_transform)
#    val_data = DogsCatsData(df.iloc[val_idx[val_str].values]["filename"],df.iloc[val_idx[val_str].values]["label"],transform=data_transform)
#    # Turn train and validation Datasets into DataLoaders
#    trainloader = DataLoader(train_data,batch_size=32)
#    validationloader = DataLoader(val_data,batch_size=32)
#    # Start the timer
#    from timeit import default_timer as timer 
#    start_time = timer()
#    # Train model 
#    model_results = train(model=model,
#                          train_dataloader=trainloader,
#                          val_dataloader=validationloader,
#                          optimizer=optimizer,
#                          loss_fn=loss_fn,
#                          epochs=NUM_EPOCHS,
#                          split=i)
#    # End the timer and print out how long it took
#    end_time = timer()
#    print(f"Total training time for split {i}: {end_time-start_time:.3f} seconds")
#    # Extract train and validation loss and accuracy at each epoch 
#    results = dict(list(model_results.items()))
#    # Get the loss values of the results dictionary (training and validation)
#    train_loss = results["train_loss"]
#    val_loss = results["val_loss"]
#    # Get the accuracy values of the results dictionary (training and validation)
#    train_acc = results["train_acc"]
#    val_acc = results["val_acc"]
#    # Figure out how many epochs there were
#    epochs = range(len(results["train_loss"]))
#    # Save results in a csv
#    results_df = pd.DataFrame(columns= ["train_loss","val_loss","train_acc","val_acc","epochs"])
#    results_df["train_loss"] = train_loss
#    results_df["val_loss"] = val_loss
#    results_df["train_acc"] = train_acc
#    results_df["val_acc"] = val_acc
#    results_df["epochs"] = epochs
#    results_df_name = "results_df_"+str(i)+".csv"
#    results_df.to_csv(results_df_name)

In [28]:
## Keeping the best model

# Best validation accuracy of each of the ten folds, read from its results csv
# and stored as a column vector (one row per fold)
val_accuracies = np.array(
    [[pd.read_csv("results_df_"+str(i)+".csv")["val_acc"].max()] for i in range(10)],
    dtype=float)

# Fold holding the overall best validation accuracy
index = np.argmax(val_accuracies)

In [None]:
# Loading the checkpoint
def load_checkpoint(filepath):
    """Rebuilds a frozen, eval-mode model from a checkpoint file.

    The checkpoint is a dict whose "model" entry is a pickled model object and
    whose "state_dict" entry holds the weights loaded into it.

    Args:
        filepath: path of the checkpoint file.

    Returns:
        The model, on the CPU, in eval mode, with requires_grad disabled on
        every parameter.
    """
    # SECURITY NOTE: the file stores a whole pickled model object, so it must be
    # unpickled (weights_only=False). Unpickling can run arbitrary code: only
    # load checkpoints coming from a trusted source.
    try:
        checkpoint = torch.load(filepath,map_location=torch.device("cpu"),weights_only=False)
    except TypeError:
        # Older PyTorch releases do not know the weights_only argument
        checkpoint = torch.load(filepath,map_location=torch.device("cpu"))
    model = checkpoint["model"]
    model.load_state_dict(checkpoint["state_dict"])
    # Freezing the parameters: the model is only used for inference
    for parameter in model.parameters():
        parameter.requires_grad = False
    model.eval()
    return model

# Checkpoint file of the fold selected through `index`, loaded and printed
model_string = "checkpoint_"+str(index)+".pth"
model_cp = load_checkpoint(model_string)
print(model_cp)

# Explainability through Occlusion

In [None]:
# Column name of the validation indices of the best fold
val_string = "val_"+str(index)

# Image used for the explainability experiments (a fixed file, labelled 1)
test_image_path = "dog.11212.jpg" 
test_image_label = 1

# Opening the image and transforming it once; the batch dimension added for
# the model is dropped again for the plot
img = Image.open("dataset/"+test_image_path)
test_image = data_transform(img).unsqueeze(dim=0) 
plt.imshow(test_image[0].permute(1, 2, 0)) 
plt.axis("off")

In [None]:
# Performing a forward pass on a single image
model_cp.eval()
# The checkpoint is loaded on the CPU, so the input follows the device of the
# model itself rather than the global `device` (which may be "cuda")
model_cp_device = next(model_cp.parameters()).device
with torch.inference_mode():
    pred = model_cp(test_image.to(model_cp_device))
    
# Printing out what's happening and convert model logits -> pred probs -> pred label
print("Output logits:\n",pred,"\n")
print("Output prediction probabilities:\n",torch.softmax(pred, dim=1),"\n")
print("Output prediction label:\n",torch.argmax(torch.softmax(pred, dim=1), dim=1),"\n")
print("Actual label:\n",test_image_label)

In [None]:
# Occlude a part of the image
test_image_occ = data_transform(img)
# Overwriting rows 24-71 and columns 60-107 of every channel (a 48x48 patch).
# NOTE(review): the tensor is already normalized at this point, so 128 is not a
# mid-gray on the 0-255 scale -- confirm that this fill value is intended.
test_image_occ[:,24:72,60:108] = 128
plt.imshow(test_image_occ.permute(1,2,0)) 
plt.axis("off")
# Adding the batch dimension expected by the model
test_image_occ = test_image_occ.unsqueeze(dim=0)

In [None]:
# Test again on the occluded image
model_cp.eval()
# The checkpoint is loaded on the CPU, so the input follows the device of the
# model itself rather than the global `device` (which may be "cuda")
model_cp_device = next(model_cp.parameters()).device
with torch.inference_mode():
    pred = model_cp(test_image_occ.to(model_cp_device))
    
# Printing out what's happening and convert model logits -> pred probs -> pred label
print("Output logits:\n",pred,"\n")
print("Output prediction probabilities:\n",torch.softmax(pred,dim=1),"\n")
print("Output prediction label:\n",torch.argmax(torch.softmax(pred,dim=1),dim=1),"\n")
print("Actual label:\n",test_image_label)

Now the dog is classified as a cat!

In [34]:
# Systematic occlusion: one mask position per pixel of the image

# Size of the mask
MASK_WIDTH = 48
MASK_HEIGHT = 48

# Coordinates of every mask center on the image
X = np.arange(IMAGE_WIDTH)
Y = np.arange(IMAGE_HEIGHT)
center_x,center_y = np.meshgrid(X,Y,indexing="xy")

# Top-left corner of the mask for every center, flattened in column-major order
start_x = center_x.flatten("F") - MASK_WIDTH/2
start_y = center_y.flatten("F") - MASK_HEIGHT/2

# Starting (X1, Y1) and ending (X2, Y2) coordinates, as integers for indexing
X1 = start_x.astype(int)
X2 = (start_x + MASK_WIDTH).astype(int)
Y1 = start_y.astype(int)
Y2 = (start_y + MASK_HEIGHT).astype(int)

In [None]:
# Clamping the mask corners to the image: starts cannot go below 0,
# ends cannot go beyond the image width / height
X1 = np.clip(X1,0,None)
Y1 = np.clip(Y1,0,None)
X2 = np.clip(X2,None,IMAGE_WIDTH)
Y2 = np.clip(Y2,None,IMAGE_HEIGHT)

In [None]:
## These lines of code take a lot of time

# Performing predictions on all the possible occluded version of the image

# TO DO   ->   IMPLEMENT THE OCCLUSION ALGORITHM HERE









In [None]:
# Loading the predictions obtained on the occluded images from csv
pred_df = pd.read_csv("pred_df.csv")

# Two panels side by side: the image and the map of predicted probabilities
fig,(ax1,ax2) = plt.subplots(1,2,figsize=(16, 12))

# Left panel: the transformed image
im1 = ax1.imshow(data_transform(img).permute(1,2,0))
ax1.axis("off")

# Right panel: predicted probability of class 1 (where 1 is 'dog'), arranged
# on a grid of the same size as the image
pred_mask = pred_df["class_1"].to_numpy().reshape((IMAGE_HEIGHT,IMAGE_WIDTH))
im2 = ax2.imshow(pred_mask,cmap=plt.cm.hot)
ax2.axis("off")

# Colorbar placed in a thin axes appended to the right of the map
divider = make_axes_locatable(ax2)
cax = divider.append_axes('right', size='5%', pad=0.05)
fig.colorbar(im2, cax=cax, orientation='vertical')

The relevance is high where the predicted probability is low: when these parts of the image are masked, the image is erroneously classified as a cat.

In [None]:
# Visualize which parts of the image are more relevant for a correct classification
# Boolean map, True where the predicted probability in pred_mask is below 0.35
thresh_pred = pred_mask < 0.35  
# The map gets a trailing channel axis and multiplies the image, so the pixels
# outside the selected region are zeroed
plt.imshow(data_transform(img).permute(1,2,0) * thresh_pred[...,None])

It seems that the dog ear and body shape have a high relevance for the classification.

## Explainability through Saliency

In [None]:
# Transform image for model classification (a batch dimension is added)
test_image_sal = data_transform(img).unsqueeze(dim=0)

# The gradient of the highest score must be computed with respect to the image tensor
# TO DO -> set the gradients of the input image true


# Gradients are not needed for the model parameters
# TO DO -> set the model parameters in a state where the gradients are not computed


# Set model in eval mode
# TO DO -> set the model in evaluation mode


# Forward pass to calculate predictions
# TO DO -> compute the prediction from the model using the test_image_sal as input
# TO DO -> extract the max score and corresponding index from the prediction


# Backward pass to get the gradient of the predicted class score w.r.t. the input image
# TO DO -> compute the backward function of the output score

# NOTE: the lines below read test_image_sal.grad, which is only filled once the
# TO DO steps above (gradient tracking and backward pass) are implemented
# Get max along channel axis
saliency,_ = torch.max(torch.abs(test_image_sal.grad[0]),dim=0)
# Normalize to [0 ... 1]
saliency = (saliency - saliency.min())/(saliency.max()-saliency.min())

# Visualize the image 
fig = plt.figure(figsize=(16, 12))
ax1 = fig.add_subplot(121)
im1 = ax1.imshow(data_transform(img).permute(1,2,0))
ax1.axis("off")

# Visualize the saliency map, with its colorbar in an axes appended on the right
ax2 = fig.add_subplot(122)
im2 = ax2.imshow(saliency.numpy(),cmap=plt.cm.hot)
ax2.axis("off")
divider = make_axes_locatable(ax2)
cax = divider.append_axes('right', size='5%', pad=0.05)
fig.colorbar(im2, cax=cax, orientation='vertical')

## Captum

Python package for doing XAI

In [None]:
### These lines take a lot of time as when built from scratch
#
#from captum.attr import Occlusion
#
## Defining Occlusion interpreter
#ablator = Occlusion(model_cp)
## Computes occlusion attribution, ablating each patch,
## shifting in each direction by the default of 1.
#occ_attr = ablator.attribute(data_transform(img).unsqueeze(dim=0), target=1, sliding_window_shapes=(3,48,48))
#
## Save to file
#torch.save(occ_attr, "occ_attr.pt")

In [None]:
# Load the occlusion attributions saved in "occ_attr.pt", mapped to the CPU
occ_attr = torch.load("occ_attr.pt",map_location="cpu")

In [None]:
# TO DO -> import Saliency model from Captum

# Defining Saliency interpreter
# TO DO -> define Saliency

# Computes saliency maps for class 1
# TO DO -> compute Saliency w.r.t. class 1
sal_attr = ...

In [None]:
# Visualize the attribution maps 
def _plot_attribution(fig,position,attribution,title):
    """Draws one attribution map with its colorbar and returns the colorbar."""
    axes = fig.add_subplot(position)
    # Channels are moved last, as Matplotlib expects [H, W, C]
    attribution_map = np.transpose(attribution.squeeze().cpu().detach().numpy(), (1,2,0))
    image = axes.imshow(attribution_map,cmap=plt.cm.hot)
    axes.set_title(title)
    axes.axis("off")
    # Colorbar placed in a thin axes appended to the right of the map
    colorbar_axes = make_axes_locatable(axes).append_axes('right', size='5%', pad=0.05)
    return fig.colorbar(image, cax=colorbar_axes, orientation='vertical')

fig = plt.figure(figsize=(16, 12))
_plot_attribution(fig,121,occ_attr,"Occlusion")
_plot_attribution(fig,122,sal_attr,"Saliency")
