<a href="https://colab.research.google.com/github/JalalSayed1/DL_CW/blob/master/DL_CW_damages.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## GUID: 2571964S

## to describe and motivate your design choices: architecture, pre-processing, training regime

## to analyse, describe and comment on your results

## to provide some discussion on what you think are the limitations of your solution and what could be future work

## Can discuss:
Choice of Architecture: Explain why you chose a specific architecture over others. For instance, U-Net might be chosen for its efficiency and small dataset effectiveness, while DeepLab could be chosen for its state-of-the-art performance on segmentation tasks.

Preprocessing: Describe any preprocessing steps you take, such as resizing images, normalising pixel values, data augmentation, etc.

Postprocessing: If you apply any postprocessing to the segmentation maps, such as CRFs to sharpen the boundaries, explain why and how this improves the results.

Loss Functions: Discuss the choice of loss function, which in the case of segmentation could be cross-entropy, dice coefficient, or a combination of several loss functions.

Metrics: Describe the metrics you'll use to evaluate the performance of your model, such as pixel accuracy, mean Intersection over Union (IoU), etc.

Training Strategy: Detail the training process, including the choice of optimiser, learning rate, batch size, and any other hyperparameters.

Results and Analysis: Present the results and provide an analysis of what worked and what didn’t. Discuss any challenges you faced and how you addressed them.

Visualisation: Explain the importance of visualisation in understanding the performance of your model. For example, overlay images help to see where the model is performing well and where it is making mistakes.

---

# Damages - Deep Learning Coursework 2024

The aim of this coursework is for you to design, implement and test a deep learning architecture to detect and identify damage in images. Digitization makes historical pictures and art much more widely available to the public. Many such pictures have suffered some form of damage due to time, storage conditions and the fragility of the original medium. For example, image (A) below shows a digitized parchment that has suffered significant damage over time.

**The aim of this project is for you to design, implement and evaluate a deep learning model to detect and identify damage present in images.**

<table>
<tr>
<td>
<div>
<img src="damage_data/image_path/cljmrkz5n341f07clcujw105j.png" width="500"/>
</div>
</td>
<td>
<div>
<img src="damage_data/annotation_rgb_path/cljmrkz5n341f07clcujw105j.png" width="500"/>
</div>
</td>
</tr>
<td><center>(A) Image</center></td><td><center>(B) damage labels</center></td>
</table>
*(Note that the images will only show once you have downloaded the dataset)*


The damage labels in (B) identify a smattering of peeling paint, a large stained area in the bottom left and a missing part in the top left. Each colour in the label image corresponds to a different category of damage, such as `fold`, `writing` or `burn marks`. You are provided with a dataset of damaged images on a variety of materials, from parchment to ceramic or wood painting, with detailed annotations of a range of damage types.

You are free to use any architecture you prefer, from what we have seen in class. You can decide to use unsupervised pre-training or only supervised end-to-end training; the approach is your choice.

### Hand-in date: Friday 15th of March before 4:30pm (on Moodle)

### Steps & Hints
* First, look at the data. What are the different types of images (content), what types of material, what types of damage? How different are they? What type of transformations for your data augmentation do you think would be acceptable here?
* Second, check the provided helper functions for loading the data and separate into training and test set and cross-validation.
* Design a network for the task. What output? What layers? How many? Do you want to use an Autoencoder for unsupervised pre-training?
* Choose a loss function for your network
* Select optimiser and training parameters (batch size, learning rate)
* Optimise your model, and tune hyperparameters (especially learning rate, momentum etc)
* Analyse the results on the test data. How to measure success? Which classes are recognised well, which are not? Is there confusion between some classes? Look at failure cases.
* If time allows, go back to the drawing board and try a more complex, or better, model.
* Explain your thought process, justify your choices and discuss the results!
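
As a hedged illustration of the data augmentation hint above: geometric transformations such as flips have to be applied identically to an image and its annotation map, otherwise pixels and labels no longer line up. The helper name `paired_random_flip` is hypothetical and not part of the provided code, and whether flips are acceptable for every content type (for example, writing) is a judgement call.

```python
import numpy as np

def paired_random_flip(image, annotation, rng, p=0.5):
    """Flip an image (H, W, C) and its annotation (H, W) together.

    Hypothetical helper: each random decision is applied to both arrays,
    so every pixel keeps its label.
    """
    if rng.random() < p:  # horizontal flip
        image = image[:, ::-1, :]
        annotation = annotation[:, ::-1]
    if rng.random() < p:  # vertical flip
        image = image[::-1, :, :]
        annotation = annotation[::-1, :]
    return np.ascontiguousarray(image), np.ascontiguousarray(annotation)

rng = np.random.default_rng(0)
image = np.arange(2 * 3 * 3).reshape(2, 3, 3)   # toy H=2, W=3, C=3 image
annotation = image[..., 0].copy()               # label each pixel by its first channel
aug_image, aug_annotation = paired_random_flip(image, annotation, rng)
```

Because the toy annotation is a copy of the first image channel, the two stay equal after augmentation exactly when the flips were applied consistently.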

### Submission
* submit ONE zip file on Moodle containing:
  * **your notebook**: use `File -> download .ipynb` to download the notebook file locally from colab.
  * **a PDF file** of your notebook's output as you see it: use `File -> print` to generate a PDF.
* your notebook must clearly contain separate cells for:
  * setting up your model and data loader
  * training your model from data
  * loading your pretrained model from github/gitlab/any other online storage you like!
  * testing your model on test data.
* The training cells must be disabled by a flag, such that when running *run all* on your notebook it does
  * load the data
  * load your model
  * apply the model to the test data
  * analyse and display the results and accuracy
* In addition, provide markdown cells:
  * containing your student number at the top
  * to describe and motivate your design choices: architecture, pre-processing, training regime
  * to analyse, describe and comment on your results
  * to provide some discussion on what you think are the limitations of your solution and what could be future work

* **Note that you must put your trained model online so that your code can download it.**


### Assessment criteria
* In order to get a pass mark, you will need to demonstrate that you have designed and trained a deep NN to solve the problem, using a sensible approach and reasonable effort to tune hyper-parameters, and that you have analysed the results. It is NOT necessary to reach any particular level of accuracy (a network that predicts poorly will always yield a pass mark if it is designed, tuned and analysed sensibly).
* In order to get a good mark, you will show good understanding of the approach and provide a working solution.
* In order to get a high mark, you will demonstrate a working approach of gradual improvement between different versions of your solution.
* Bonus marks for attempting something original if well motivated, even if it does not yield increased performance.
* Bonus marks for getting high performance, and some more points are up for grabs for getting the best performance in the class.

### Notes
* You are provided with code to isolate the test set and set up cross-validation; make sure to keep the separation clean to ensure proper setting of all hyperparameters.
* I recommend starting with small models that are easier to train, to set a baseline performance before attempting more complex ones.
* Be mindful of the time!

In [None]:
using_colab = False
if using_colab:
    from google.colab import drive
    drive.mount('/content/drive')

## Housekeeping

In [None]:
!pip install gdown pytorch_lightning

In [None]:

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import pytorch_lightning as pl
from pytorch_lightning.loggers import WandbLogger
from pytorch_lightning.callbacks import LearningRateMonitor, ModelCheckpoint

import os
import pandas as pd
import PIL
PIL.Image.MAX_IMAGE_PIXELS = 243748701
from PIL import Image
import numpy as np
import matplotlib.pyplot as plt
import gdown
import shutil

DEVICE = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
DEVICE

# Load dataset

We then load the metadata in a dataframe for convenience

In [None]:
if using_colab:
    !pwd

In [None]:
if not os.path.exists("damage_data"):
    !gdown 1v8aUId0-tTW3ln3O2BE4XajQeCToOEiS -O damages.zip

In [None]:
# set this to wherever you want to store the data (e.g. your Google Drive); choose a persistent location!
root_dir = '.'
data_dir = os.path.join(root_dir, "damage_data")
csv_path = os.path.join(data_dir, 'metadata.csv')

try:
    df = pd.read_csv(csv_path)

except FileNotFoundError:  # if the dataset has not been downloaded yet, do it.
    zip_path = os.path.join(root_dir, 'damages.zip')
    gdown.download(id='1v8aUId0-tTW3ln3O2BE4XajQeCToOEiS', output=zip_path)
    shutil.unpack_archive(zip_path, root_dir)
    df = pd.read_csv(csv_path)

This dataframe has the paths of where the dataset images and annotation labels are stored, plus classification labels.

In [None]:
df

The images in the dataset are categorised by type of `material`, meaning the medium the original picture was on, e.g. Parchment, Glass or Textile.

In [None]:
df['material'].unique()

Moreover, images are also categorised by the `content` of the image, meaning what is depicted, e.g. line art, geometric patterns, etc.

In [None]:
df['content'].unique()

## Labels
Segmentation labels are saved as a PNG image, where each number from 1 to 15 corresponds to a damage class like Peel, Scratch etc; the Background class is set to 255, and the Clean class (no damage) is set to 0. We also provide code to convert these annotation values to RGB colours for nicer visualisation, but for training you should use the original annotations.
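
To make the id-to-colour conversion concrete, here is a minimal sketch that turns a tiny annotation array into an RGB image. The function name `annotation_to_rgb` and the three-entry `toy_id_to_rgb` mapping are illustrative assumptions; the notebook builds the full `id_to_rgb` dictionary in the next cell.

```python
import numpy as np

# Toy subset of the id -> RGB mapping: Clean (0), Peel (2) and Background (255).
toy_id_to_rgb = {0: (0, 0, 0), 2: (255, 52, 255), 255: (90, 0, 7)}

def annotation_to_rgb(annotation, id_to_rgb):
    """Convert an (H, W) array of class ids into an (H, W, 3) uint8 image."""
    rgb = np.zeros(annotation.shape + (3,), dtype=np.uint8)
    for class_id, color in id_to_rgb.items():
        rgb[annotation == class_id] = color  # boolean mask selects all pixels of this class
    return rgb

annotation = np.array([[0, 2], [255, 2]], dtype=np.uint8)
rgb = annotation_to_rgb(annotation, toy_id_to_rgb)
```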

In [None]:
name_color_mapping = {
    "Material loss": "#1CE6FF",
    "Peel": "#FF34FF",
    "Dust": "#FF4A46",
    "Scratch": "#008941",
    "Hair": "#006FA6",
    "Dirt": "#A30059",
    "Fold": "#FFA500",
    "Writing": "#7A4900",
    "Cracks": "#0000A6",
    "Staining": "#63FFAC",
    "Stamp": "#004D43",
    "Sticker": "#8FB0FF",
    "Puncture": "#997D87",
    "Background": "#5A0007",
    "Burn marks": "#809693",
    "Lightleak": "#f6ff1b",
}

class_names = [ 'Material loss', 'Peel', 'Dust', 'Scratch',
                'Hair', 'Dirt', 'Fold', 'Writing', 'Cracks', 'Staining', 'Stamp',
                'Sticker', 'Puncture', 'Burn marks', 'Lightleak', 'Background']

class_to_id = {class_name: idx+1 for idx, class_name in enumerate(class_names)}
class_to_id['Background'] = 255  # Set the Background ID to 255

def hex_to_rgb(hex_color: str) -> tuple:
    hex_color = hex_color.lstrip('#')
    return tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4))

id_to_rgb = {class_to_id[class_name]: hex_to_rgb(color) for class_name, color in name_color_mapping.items()}
id_to_rgb[0] = (0,0,0)

# Create id2label mapping: ID to class name
id2label = {idx: class_name for class_name, idx in class_to_id.items()}

# Create label2id mapping: class name to ID, which is the same as class_to_id
label2id = class_to_id

# Non-damaged pixels
id2label[0] = 'Clean'
label2id['Clean'] = 0

print(len(id_to_rgb))

In [None]:
from IPython.display import Markdown

legend='#### Colour labels for each damage type\n'
for damage in class_names:
    legend += '- <span style="color: {color}">{damage}</span>.\n'.format(color=name_color_mapping[damage], damage=damage)
display(Markdown(legend))

## Create dataset splits

Here is an example of how to split the dataset for Leave-one-out cross validation (LOOCV) based on material.
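
The idea can be sketched on a toy dataframe before looking at the helper itself: each material in turn becomes the validation set and all other materials form the training set (strictly speaking, this is leave-one-group-out rather than leave-one-sample-out). The dataframe `toy_df` and its values are made up for illustration.

```python
import pandas as pd

toy_df = pd.DataFrame({
    "id": [1, 2, 3, 4, 5],
    "material": ["Glass", "Glass", "Wood", "Canvas", "Wood"],
})

# One fold per material: that material is held out for validation,
# everything else is used for training.
toy_splits = {
    material: {
        "train_set": toy_df[toy_df["material"] != material],
        "val_set": toy_df[toy_df["material"] == material],
    }
    for material in toy_df["material"].unique()
}

for material, split in toy_splits.items():
    print(material, len(split["train_set"]), len(split["val_set"]))
```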

In [None]:
def create_leave_one_out_splits(df, criterion='material'):

    grouped = df.groupby(criterion)
    content_splits = {name: group for name, group in grouped}
    unique_val = df[criterion].unique()

    # Initialize a dictionary to hold the train and validation sets for each LOOCV iteration
    loocv_splits = {}

    for value in unique_val:
        # Create the validation set
        val_set = content_splits[value]

        # Create the training set
        train_set = pd.concat([content_splits[c] for c in unique_val if c != value])

        # Add these to the loocv_splits dictionary
        loocv_splits[value] = {'train_set': train_set, 'val_set': val_set}

    return loocv_splits


For this coursework, we want to assess how well the method generalises, so we keep one type of material (`Canvas`) as the test set and train only on the remaining ones.

In [None]:
# split the dataset according to material type
full_splits = create_leave_one_out_splits(df, 'material')

# use Canvas as test set
test_set = full_splits['Canvas']['val_set']

# use the rest as training set
train_set = full_splits['Canvas']['train_set']

# prepare a leave-one-out cross validation for the training set
loocv_splits = create_leave_one_out_splits(train_set, 'material')

# identify the different materials present in the training set
unique_material = train_set['material'].unique()

print("Training set materials:", unique_material)
print("Test set material:", test_set['material'].unique())


To help you, here are some helper functions to crop and process images.

In [None]:
import random  # used below for the random crop size and position

def random_square_crop_params(image, target_size):
    width, height = image.size
    min_edge = min(width, height)

    # Conditionally set the range for random crop size
    lower_bound = min(min_edge, target_size)
    upper_bound = max(min_edge, target_size)

    # Generate crop_size
    crop_size = random.randint(lower_bound, upper_bound)

    # Check and adjust if crop_size is larger than any dimension of the image
    if crop_size > width or crop_size > height:
        crop_size = min(width, height)

    # Generate random coordinates for the top-left corner of the crop
    x = random.randint(0, width - crop_size)
    y = random.randint(0, height - crop_size)

    return (x, y, x + crop_size, y + crop_size)

def apply_crop_and_resize(image, coords, target_size):
    image_crop = image.crop(coords)
    image_crop = image_crop.resize((target_size, target_size), Image.NEAREST)
    return image_crop

We also provide a simple class for holding the dataset

In [None]:
import torch
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
import random
import numpy as np
from PIL import Image

class CustomDataset(Dataset):
    def __init__(self, dataframe, target_size, is_train=True):
        self.dataframe = dataframe
        self.target_size = target_size
        self.is_train = is_train

        self.to_tensor = transforms.ToTensor()

        # Define the normalization transform
        self.normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                              std=[0.229, 0.224, 0.225])

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        row = self.dataframe.iloc[idx]
        image = Image.open(row['image_path']).convert('RGB')
        annotation = Image.open(row['annotation_path']).convert('L')
        annotation_rgb = Image.open(row['annotation_rgb_path']).convert('RGB')
        id = row['id']
        material = row['material']
        content = row['content']

        if self.is_train:
            # Generate random square cropping coordinates
            crop_coords = random_square_crop_params(image, self.target_size)

            # Apply the same cropping and resizing to all
            image = apply_crop_and_resize(image, crop_coords, self.target_size)
            annotation = apply_crop_and_resize(annotation, crop_coords, self.target_size)
            annotation_rgb = apply_crop_and_resize(annotation_rgb, crop_coords, self.target_size)
        else:  # Validation
            # Instead of cropping, downsize the images so that the longest edge is 1024 or less
            # max_edge = max(image.size)
            # if max_edge > 1024:
            #     downsample_ratio = 1024 / max_edge
            #     new_size = tuple([int(dim * downsample_ratio) for dim in image.size])

            #     image = image.resize(new_size, Image.BILINEAR)
            #     annotation = annotation.resize(new_size, Image.NEAREST)
            #     annotation_rgb = annotation_rgb.resize(new_size, Image.BILINEAR)

            # Generate random square cropping coordinates
            crop_coords = random_square_crop_params(image, self.target_size)

            # Apply the same cropping and resizing to all
            image = apply_crop_and_resize(image, crop_coords, self.target_size)
            annotation = apply_crop_and_resize(annotation, crop_coords, self.target_size)
            annotation_rgb = apply_crop_and_resize(annotation_rgb, crop_coords, self.target_size)

        # Convert PIL images to PyTorch tensors
        image = self.to_tensor(image)
        annotation = torch.tensor(np.array(annotation), dtype=torch.long)
        annotation_rgb = self.to_tensor(annotation_rgb)

        # Normalize the image
        image = self.normalize(image)

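        # Remap the Background id from 255 to 16 so that class ids are contiguous (0..16).
        # Losses such as cross-entropy use the target value as an index into the class
        # dimension of the network output, so 255 would be out of range for 17 classes.
        annotation[annotation == 255] = 16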

        return {
            'image': image,
            'annotation': annotation,
            'annotation_rgb': annotation_rgb,
            'id': id,
            'material': material,
            'content': content
        }
        

Here we create a DataModule which encapsulates our training and validation DataLoaders; you can also do this manually using only the PyTorch `DataLoader` class, as is done inside `train_dataloader` and `val_dataloader`.
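
For reference, the manual route might look like the sketch below. It uses small `TensorDataset` stand-ins (an assumption made for illustration) in place of the real dataset class, and mirrors the settings used in the DataModule: shuffled training batches and one validation image per batch.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

# Toy stand-ins for the real datasets: 8 RGB "images" and per-pixel labels.
toy_images = torch.zeros(8, 3, 16, 16)
toy_labels = torch.zeros(8, 16, 16, dtype=torch.long)
toy_train = TensorDataset(toy_images[:6], toy_labels[:6])
toy_val = TensorDataset(toy_images[6:], toy_labels[6:])

# Shuffle training batches; keep validation order fixed, one image per batch.
toy_train_loader = DataLoader(toy_train, batch_size=4, shuffle=True, num_workers=0)
toy_val_loader = DataLoader(toy_val, batch_size=1, shuffle=False, num_workers=0)

images, labels = next(iter(toy_train_loader))
```

The DataModule adds little beyond bundling these two loaders with the split selection, which is convenient when looping over folds.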

In [None]:
from torch.utils.data import DataLoader
from torchvision import transforms
import torch.nn.functional as F

class CustomDataModule(pl.LightningDataModule):
    def __init__(self, loocv_splits, current_material, target_size, batch_size=32, num_workers=4):
        super().__init__()
        self.loocv_splits = loocv_splits
        self.current_material = current_material
        self.target_size = target_size
        self.batch_size = batch_size
        self.num_workers = num_workers

    def prepare_data(self):
        pass

    def setup(self, stage=None):
        # Load current train and validation set based on LOOCV iteration
        train_df = self.loocv_splits[self.current_material]['train_set']
        val_df = self.loocv_splits[self.current_material]['val_set'].sample(frac=1).reset_index(drop=True)

        self.train_dataset = CustomDataset(dataframe=train_df, target_size=self.target_size, is_train=True)
        self.val_dataset = CustomDataset(dataframe=val_df, target_size=self.target_size, is_train=False) 

    def train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True, num_workers=self.num_workers)

    def val_dataloader(self):
        return DataLoader(self.val_dataset, batch_size=1, shuffle=False, num_workers=self.num_workers)

    def test_dataloader(self):
        pass


The following will create a data module for validating on the first material in the list (`Parchment`) and training on all the other types of material (you will want to do that for each fold).

In [None]:
num_workers = 4 if using_colab else 0
data_module = CustomDataModule(loocv_splits=loocv_splits,
                               current_material=unique_material[0],
                               target_size=512,
                               batch_size=4,
                               num_workers=num_workers)

Finally, we can get the train and validation data loaders from the data module.

In [None]:
data_module.setup()
train_loader = data_module.train_dataloader()
val_loader = data_module.val_dataloader()

print("Number of training batches:", len(train_loader))
print("Number of training samples:", len(train_loader.dataset))
# val dataset is set to have batch size of 1:
print("Number of validation batches:", len(val_loader))
print("Number of validation samples:", len(val_loader.dataset))
print("image size:", train_loader.dataset[-1]['image'].shape)
print("annotation size:", train_loader.dataset[-1]['annotation'].shape)
print("number of material in training set:", len(train_loader.dataset.dataframe['material'].unique()))


# Dataset visualisation

We need to denormalise the images so we can display them
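
Normalisation maps each channel to `(x - mean) / std`, so displaying an image requires the inverse, `x * std + mean`. The round trip below is a small NumPy sketch on a random array standing in for a real image; the final clipping step is an optional addition, not something the notebook's `denormalize` does.

```python
import numpy as np

mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])

rng = np.random.default_rng(0)
original = rng.random((4, 4, 3))          # H, W, C image with values in [0, 1)
normalized = (original - mean) / std      # per-channel normalisation
restored = normalized * std + mean        # inverse: x * std + mean
restored = np.clip(restored, 0.0, 1.0)    # guard against float round-off before imshow
```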

In [None]:
# Mean and std used for normalization
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

def denormalize(image, mean = [0.485, 0.456, 0.406], std = [0.229, 0.224, 0.225]):
    img_cpy = image.copy()
    for i in range(3):
        img_cpy[..., i] = img_cpy[..., i] * std[i] + mean[i]
    return img_cpy

## Visualise training samples
Random square crops of the images and corresponding RGB annotations, shown on their own and overlaid onto the image.

In [None]:
num_to_visualise = len(train_loader.dataset) # all images
print("Number of images that can be visualised:", num_to_visualise)

example_batch = next(iter(train_loader))
print("Shape of the image batch:", example_batch['image'].shape)

example_images = example_batch['image']
example_annotations = example_batch['annotation']
example_annotation_rgbs = example_batch['annotation_rgb']

# Number of examples to visualize
# N = min(4, len(example_images))
N = min(num_to_visualise, len(example_images))
print("Number of examples to visualize:", N)

fig, axes = plt.subplots(N, 3, figsize=(15, 5 * N))

for ax, col in zip(axes[0], ['Image', 'Annotation', 'Overlay']):
    ax.set_title(col, fontsize=24)

for i in range(N):
    example_image = denormalize(example_images[i].numpy().transpose((1, 2, 0)), mean, std)  # C, H, W -> H, W, C
    example_annotation = Image.fromarray(np.uint8(example_annotations[i].numpy()), 'L')
    example_annotation_rgb = example_annotation_rgbs[i].numpy().transpose((1, 2, 0))  # C, H, W -> H, W, C

    # Create an alpha (transparency) channel where black pixels in annotation_rgb are fully transparent
    alpha_channel = np.all(example_annotation_rgb == [0, 0, 0], axis=-1)
    example_annotation_rgba = np.dstack((example_annotation_rgb, np.where(alpha_channel, 0, 1)))

    axes[i, 0].imshow(example_image)
    axes[i, 0].axis('off')

    #axes[i, 1].imshow(example_annotation, cmap='gray', vmin=0, vmax=255)
    axes[i, 1].imshow(example_annotation_rgb)
    axes[i, 1].axis('off')

    axes[i, 2].imshow(example_image)
    axes[i, 2].imshow(example_annotation_rgba)
    axes[i, 2].axis('off')

plt.tight_layout()
plt.show()


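Visualising the validation set, which holds the left-out material. Note that the whole-image downsizing in `CustomDataset` is commented out, so validation samples are also random square crops resized to `target_size`.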

In [None]:
# Take up to N validation batches (fewer if the validation set is smaller than N)
example_batches = [batch for _, batch in zip(range(N), val_loader)]

# Initialize empty lists to collect different parts of each batch
example_images = []
example_annotations = []
example_annotation_rgbs = []
example_materials = []
example_contents = []

# Populate the lists with the data from the 4 batches
for batch in example_batches:
    example_images.append(batch['image'].squeeze())
    example_annotations.append(batch['annotation'].squeeze())
    example_annotation_rgbs.append(batch['annotation_rgb'].squeeze())
    example_materials.append(batch['material'][0])
    example_contents.append(batch['content'][0])
    
    print("batch image shape:", batch['image'].shape)
    print("batch annotation shape:", batch['annotation'].shape)
    print("Shape of the first image (batch dimension squeezed):", example_images[0].shape)
    print("Shape of the first annotation (batch dimension squeezed):", example_annotations[0].shape)

# Number of examples to visualize
# N = min(4, len(example_images))
N = min(num_to_visualise, len(example_images))

fig, axes = plt.subplots(N, 3, figsize=(15, 5 * N))

for ax, col in zip(axes[0], ['Image', 'Annotation', 'Overlay']):
    ax.set_title(col, fontsize=24)

for i in range(N):
    example_image = denormalize(example_images[i].numpy().transpose((1, 2, 0)), mean, std)  # C, H, W -> H, W, C
    example_annotation = example_annotations[i].numpy()
    example_annotation_rgb = example_annotation_rgbs[i].numpy().transpose((1, 2, 0))  # C, H, W -> H, W, C
    example_material = example_materials[i]
    example_content = example_contents[i]
    # Create an alpha (transparency) channel where black pixels in annotation_rgb are fully transparent
    alpha_channel = np.all(example_annotation_rgb == [0, 0, 0], axis=-1)
    example_annotation_rgba = np.dstack((example_annotation_rgb, np.where(alpha_channel, 0, 1)))
    axes[i, 0].imshow(example_image)
    axes[i, 0].axis('off')

    axes[i, 1].imshow(example_annotation_rgb)
    axes[i, 1].axis('off')

    axes[i, 2].imshow(example_image)
    axes[i, 2].imshow(example_annotation_rgba)
    axes[i, 2].axis('off')

plt.tight_layout()
plt.show()

# Evaluation

For the final evaluation of the model, make sure to test performance on the left-out category, `Canvas`, to get a fair idea of how well the model generalises.
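
One common way to quantify segmentation quality on the held-out material is per-class Intersection over Union (IoU). The sketch below is an illustrative NumPy version on toy arrays; the helper name `per_class_iou` is hypothetical, and classes absent from both prediction and target are reported as NaN so they do not distort the mean.

```python
import numpy as np

def per_class_iou(pred, target, num_classes):
    """Return an array of IoU values, NaN for classes absent from both maps."""
    ious = np.full(num_classes, np.nan)
    for c in range(num_classes):
        pred_c = pred == c
        target_c = target == c
        union = np.logical_or(pred_c, target_c).sum()
        if union > 0:
            ious[c] = np.logical_and(pred_c, target_c).sum() / union
    return ious

pred = np.array([[0, 1], [1, 2]])      # toy predicted class ids
target = np.array([[0, 1], [2, 2]])    # toy ground-truth class ids
ious = per_class_iou(pred, target, num_classes=4)
mean_iou = np.nanmean(ious)            # average over classes that actually occur
```

Looking at the per-class values rather than only the mean helps show which damage types are recognised well and which are not.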

In [None]:
test_module = CustomDataModule(loocv_splits=full_splits,
                               current_material='Canvas',
                               target_size=512,
                               batch_size=4,
                               num_workers=num_workers)

test_module.setup()

test_loader = test_module.val_dataloader()


---

# My Solution:

---

### Network Design

In [None]:
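# Shape of the image batch: torch.Size([4, 3, 512, 512])
# Note: despite its name, this class is a plain convolutional encoder-decoder;
# it has no skip connections between encoder and decoder stages.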
class UNet(nn.Module):
    def __init__(self, in_channels, out_channels, features=[64, 256, 512, 1024], latent_dims=16, verbose=False):
        super().__init__()
        self.verbose = verbose
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.features = features
        self.latent_dims = latent_dims

        # Define the encoder path (downsampling)
        self.encoder_cnn = nn.Sequential(
            nn.Conv2d(in_channels=in_channels,
                      out_channels=features[0], kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(
                in_channels=features[0], out_channels=features[1], kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(
                in_channels=features[1], out_channels=features[2], kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(
                in_channels=features[2], out_channels=features[3], kernel_size=3, stride=1, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        # self.encoder_flatten = nn.Flatten()
        # self.bottleneck = nn.Sequential(
        #     nn.Conv2d(features[3], features[3], kernel_size=1),
        #     nn.ReLU(inplace=True)
        # )

        self.decoder_cnn = nn.Sequential(
            nn.ConvTranspose2d(in_channels=features[3], out_channels=features[2],
                               kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(in_channels=features[2], out_channels=features[1],
                               kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(inplace=True),
            nn.ConvTranspose2d(in_channels=features[1], out_channels=features[0],
                               kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.ReLU(inplace=True),
            # No activation after the last layer: a ReLU here would clamp the class scores to be non-negative
            nn.ConvTranspose2d(in_channels=features[0], out_channels=out_channels,
                               kernel_size=3, stride=2, padding=1, output_padding=1),
        )
        # Define the final convolution to reduce the number of channels to the number of classes:
        # self.final_conv = nn.Conv2d(
        #     in_channels=in_channels, out_channels=out_channels, kernel_size=1)

    def forward(self, x):
        original_size = x.size()[-2:]
        
        # ' Encoder
        if self.verbose:
            print("Input shape:", x.shape)
        
        x = self.encoder_cnn(x)
        if self.verbose:
            print("Shape after encoder cnn:", x.shape)

        # x = self.bottleneck(x)
        # if self.verbose:
        #     print("Shape after bottleneck:", x.shape)

        # ' Decoder
        x = self.decoder_cnn(x)
        if self.verbose:
            print("Shape after decoder cnn:", x.shape)

        # Force the model to output the same spatial dimensions as the input image:
        # x = F.interpolate(x, size=original_size, mode='bilinear', align_corners=False)
        # x = self.final_conv(x)
        # if self.verbose:
        #     print("Shape after final conv:", x.shape)

        # Return raw class scores (logits) with shape (batch_size, num_classes, height, width).
        # No softmax is applied here: F.cross_entropy applies log-softmax internally and the
        # Dice term applies F.softmax itself, so a softmax in the model would be applied twice.
        # Class predictions via torch.argmax(x, dim=1) are unaffected by this change.

        if self.verbose:
            print("Output (logits) shape:", x.shape)

        # x = torch.argmax(x, dim=1)
        
        # if self.verbose:
        #     print("Shape after argmax:", x.shape)

        if self.verbose:
            print()

        return x

    def enable_verbose(self, enabled):
        self.verbose = enabled

#### Limit training dataset to only one material for simplicity

In [None]:
limit_dataset = False # set to True to limit the dataset to one material for faster training

if limit_dataset:
    # limit the training dataset to only one material:
    train_loader.dataset.dataframe = train_loader.dataset.dataframe[train_loader.dataset.dataframe['material'] == 'Glass']
    print("number of material in training set:", len(train_loader.dataset.dataframe['material'].unique()))

    image_path = train_loader.dataset.dataframe.iloc[-1]['image_path']
    image_annotation_path = train_loader.dataset.dataframe.iloc[-1]['annotation_rgb_path']

    # from IPython.display import display, Image
    # display(Image(filename=image_path))
    # display(Image(filename=image_annotation_path))


#### Helper Functions

In [None]:
def tensor_similarity(outputs, targets):
    # output's shape from network is (batch_size, num_classes, height, width), convert it to (batch_size, height, width) 
    outputs = torch.argmax(outputs, dim=1)
    if outputs.shape != targets.shape:
        raise ValueError(f"Both tensors must have the same shape to compare their similarity. Got output's shape {outputs.shape} and target's shape {targets.shape} instead.")
    # Calculate the number of elements that are the same in both tensors
    same_elements = torch.eq(outputs, targets)
    # Calculate the similarity percentage
    similarity_percentage = torch.mean(same_elements.float())
    return similarity_percentage

def train_epoch(model, train_loader, loss_fn, optimiser):
    model.train()
    train_loss = []
    # train_accuracy = []
    train_similarity = []
    for batch in train_loader:
        images = batch['image'].to(DEVICE)
        targets = batch['annotation'].to(DEVICE)

        outputs = model(images)
        loss = loss_fn(outputs, targets)
        # accuracy = calculate_accuracy(outputs, targets)
        similarity = tensor_similarity(outputs, targets)
        
        # Backward pass:
        optimiser.zero_grad()
        loss.backward()
        optimiser.step()
        
        train_loss.append(loss.detach().cpu().numpy())
        # train_accuracy.append(accuracy.detach().cpu().numpy())
        train_similarity.append(similarity.detach().cpu().numpy())
        
    return np.mean(train_loss), np.mean(train_similarity) # np.mean(train_accuracy)

def test_epoch(model, test_loader, loss_fn):
    model.eval()
    test_loss = []
    # test_accuracy = []
    test_similarity = []
    with torch.no_grad():
        for batch in test_loader:
            images = batch['image'].to(DEVICE)
            targets = batch['annotation'].to(DEVICE)

            outputs = model(images)
            loss = loss_fn(outputs, targets)
            # accuracy = calculate_accuracy(outputs, targets)
            similarity = tensor_similarity(outputs, targets)

            test_loss.append(loss.detach().cpu().numpy())
            # test_accuracy.append(accuracy.detach().cpu().numpy())
            test_similarity.append(similarity.detach().cpu().numpy())
    return np.mean(test_loss), np.mean(test_similarity) # np.mean(test_accuracy)
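
`tensor_similarity` is a pixel-accuracy measure: the fraction of pixels whose predicted class ID matches the annotation. When most pixels are background, this number can look high even if no damage is found. The following is a minimal sketch in plain Python (not PyTorch) that contrasts pixel accuracy with mean IoU on a toy example. The helper names `pixel_accuracy` and `mean_iou` and the 10-pixel data are hypothetical and are not part of this notebook.

```python
def pixel_accuracy(pred, target):
    """Fraction of positions where the predicted class ID equals the target."""
    if len(pred) != len(target):
        raise ValueError("pred and target must have the same length")
    return sum(p == t for p, t in zip(pred, target)) / len(target)


def mean_iou(pred, target, num_classes):
    """Average intersection-over-union over the classes present in pred or target."""
    ious = []
    for c in range(num_classes):
        intersection = sum(p == c and t == c for p, t in zip(pred, target))
        union = sum(p == c or t == c for p, t in zip(pred, target))
        if union > 0:  # skip classes that appear in neither sequence
            ious.append(intersection / union)
    return sum(ious) / len(ious)


# Flattened 10-pixel example: class 0 is background, class 1 is a damage class.
target = [0, 0, 0, 0, 0, 0, 0, 0, 1, 1]
all_background = [0] * 10  # a model that never predicts damage

print(pixel_accuracy(all_background, target))            # 8 of 10 pixels match
print(mean_iou(all_background, target, num_classes=2))   # background IoU 8/10, damage IoU 0/2
```

In this toy case the pixel accuracy is 0.8 while the mean IoU is only 0.4, which is why a per-class metric may be a useful complement when reading the similarity values reported during training.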


### Loss Function:

In [None]:
class WeightedCombinedLoss(nn.Module):
    '''
    Weighted Combined Loss:
    - Cross Entropy Loss
    - Dice Loss
    
    The class weights are used to balance the class distribution in the dataset. They are passed to the Cross Entropy Loss and also scale the per-class Dice Loss. If no class weights are given, all classes are weighted equally.
    
    The final loss is the sum of the Cross Entropy Loss and the weighted Dice Loss.
    
    The Dice Loss is calculated as follows:
    - Convert the inputs to probabilities using the softmax function.
    - Convert the targets to one-hot encoded format.
    - For each class, sum the element-wise product of the predictions and the targets over the spatial dimensions (the intersection).
    - Calculate the Dice Loss per class using the formula: 1 - (2 * intersection + smooth) / (pred.sum() + target.sum() + smooth), then average over the batch.
    
    The combined loss is calculated as follows:
    - CE = F.cross_entropy(inputs, targets, weight=class_weights, reduction='mean')
    - Dice = dice_loss(F.softmax(inputs, dim=1).float(), F.one_hot(targets, inputs.size(1)).permute(0, 3, 1, 2).float(), smooth)
    - Combined Loss = CE + mean(class_weights * Dice)
    '''
    def __init__(self, class_weights=None):
        super().__init__()
        self.class_weights = class_weights

    def dice_loss(self, pred, target, smooth=1.):
        # pred, target => NxCxHxW
        pred = pred.contiguous()
        target = target.contiguous()

        # Sum over the spatial dimensions (H, W) => NxC
        intersection = (pred * target).sum(dim=(2, 3))
        denominator = pred.sum(dim=(2, 3)) + target.sum(dim=(2, 3))

        loss = 1 - (2. * intersection + smooth) / (denominator + smooth)

        # Average over the batch only, keeping one Dice Loss value per class => C
        return loss.mean(dim=0)

    def forward(self, inputs, targets, smooth=1.):
        '''
        inputs => NxCxHxW (logits)
        
        targets => NxHxW (class indices)
        '''

        # Cross Entropy Loss:
        CE = F.cross_entropy(inputs, targets, weight=self.class_weights, reduction='mean')
        
        # Dice Loss (one value per class):
        dice_per_class = self.dice_loss(F.softmax(inputs, dim=1).float(),
                         F.one_hot(targets, inputs.size(1)).permute(0, 3, 1, 2).float(),
                         smooth)
        # Scale each class's Dice Loss by its weight; skip when no weights were provided
        if self.class_weights is not None:
            dice_per_class = dice_per_class * self.class_weights
        weighted_dice = dice_per_class.mean()
        
        # Combined Loss:
        combined_loss = CE + weighted_dice
        # print(f"CE: {CE}, Dice: {weighted_dice}, Combined: {combined_loss}")
        if combined_loss.isnan():
            print(f"CE: {CE}, Dice: {weighted_dice}, Combined: {combined_loss}")
        return combined_loss
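
The Dice formula quoted in the docstring, `1 - (2 * intersection + smooth) / (pred.sum() + target.sum() + smooth)`, can be checked by hand on a tiny example. The sketch below uses plain Python lists instead of tensors and a single class; the function name `soft_dice_loss` and the four-pixel inputs are hypothetical and only serve to illustrate the arithmetic.

```python
def soft_dice_loss(probs, onehot, smooth=1.0):
    """Dice loss for one class: 1 - (2*intersection + smooth) / (sum(probs) + sum(onehot) + smooth)."""
    intersection = sum(p * t for p, t in zip(probs, onehot))
    denominator = sum(probs) + sum(onehot) + smooth
    return 1.0 - (2.0 * intersection + smooth) / denominator


# Four pixels, one class: the target marks the first two pixels as belonging to the class.
onehot = [1.0, 1.0, 0.0, 0.0]
perfect = [1.0, 1.0, 0.0, 0.0]   # probabilities equal to the target
uniform = [0.5, 0.5, 0.5, 0.5]   # an undecided prediction
wrong = [0.0, 0.0, 1.0, 1.0]     # probabilities on the wrong pixels

for name, probs in [("perfect", perfect), ("uniform", uniform), ("wrong", wrong)]:
    print(f"{name}: {soft_dice_loss(probs, onehot):.2f}")
```

With `smooth = 1`, the perfect prediction gives (4 + 1) / (4 + 1) = 1 and therefore a loss of 0, the undecided prediction gives 1 - 3/5, and the fully wrong prediction gives 1 - 1/5. The smoothing term keeps the ratio defined when a class is absent from both the prediction and the target.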

### Hyperparameters

#### Hyperparameter values

In [None]:
torch.manual_seed(0)

in_channels = train_loader.dataset[-1]['image'].shape[-3]
# image_size = train_loader.dataset[-1]['image'].shape[-1] # square images so only one dimension is needed
num_classes = len(class_names) + 1  # 16 damage classes + 1 background class
out_channels = num_classes

class_weights = torch.ones(num_classes)
class_weights[0] = 0.1  # background class weight
class_weights = class_weights.to(DEVICE)
print(f"Class weights for {len(class_weights)} classes: {class_weights.tolist()}")
loss_fn = WeightedCombinedLoss(class_weights=class_weights).to(DEVICE)

lr = 1e-3
latent_dims = 16
momentum = 0.9
weight_decay = 0.0
dampening = 0

print(f"Initialising model with {in_channels} input channels and {out_channels} output channels with {latent_dims} latent dimensions.")
model = UNet(in_channels, out_channels, latent_dims=latent_dims).to(DEVICE)

#### Hyperparameters optimisation

In [None]:
from ax.service.managed_loop import optimize
from ax.plot.contour import plot_contour
from ax.plot.trace import optimization_trace_single_method
from ax.utils.notebook.plotting import render, init_notebook_plotting

optimise_parameters = True

if optimise_parameters:

    def train_evaluate(parameterisation):
        net = UNet(in_channels, out_channels, latent_dims=parameterisation.get('latent_dims')).to(DEVICE)
        optimiser = optim.SGD(net.parameters(), lr=parameterisation.get('learning_rate'), momentum=parameterisation.get('momentum'), weight_decay=parameterisation.get('weight_decay'), dampening=parameterisation.get('dampening'))
        # Train the candidate for a single epoch, then score it on the validation set
        train_epoch(net, train_loader, loss_fn, optimiser)
        net.eval()
        similarities = []
        with torch.no_grad():
            for batch in val_loader:
                images = batch['image'].to(DEVICE)
                targets = batch['annotation'].to(DEVICE)
                predictions = net(images).argmax(dim=1)  # NxCxHxW logits -> NxHxW class IDs
                similarities.append(tensor_similarity(predictions, targets).item())
        
        val_accuracy = float(np.mean(similarities))
        print(f"Parameters: {parameterisation}, Val Accuracy: {val_accuracy}")
        # Ax maximises the objective by default, so return the accuracy itself as (mean, standard error)
        return {"accuracy": (val_accuracy, 0.0)}
    
    print("Optimising hyperparameters..")
    parameters = [
        {"name": "learning_rate", "type": "range", "bounds": [1e-5, 1e-1], "log_scale": True},
        {"name": "momentum", "type": "range", "bounds": [0.0, 1.0]},
        {"name": "weight_decay", "type": "range", "bounds": [0.0, 2.0]},
        {"name": "dampening", "type": "range", "bounds": [0.0, 1.0]},
        {"name": "latent_dims", "type": "range", "bounds": [2, 17]},
    ]

    best_parameters, values, experiment, optimiser_model = optimize(
        parameters=parameters,
        evaluation_function=train_evaluate,
        objective_name="accuracy",
        total_trials=100,
    )
    
    print()
    print("best_parameters", best_parameters)
    print("values", values)
    print("experiment", experiment)
    print("model", optimiser_model)

    lr = round(best_parameters['learning_rate'], 5)
    momentum = round(best_parameters['momentum'], 2)
    weight_decay = round(best_parameters['weight_decay'], 5)
    dampening = round(best_parameters['dampening'], 2)
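    latent_dims = best_parameters['latent_dims']

    # Rebuild the model so that it uses the selected latent_dims before creating its optimiser
    model = UNet(in_channels, out_channels, latent_dims=latent_dims).to(DEVICE)
    optimiser = optim.SGD(model.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay, dampening=dampening)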

    print("\nFinished optimising hyperparameters.")
    print("Best parameters:")
    for key, value in best_parameters.items():
        print(f"\t{key}: {value}")

    init_notebook_plotting(offline=True)
    best_objectives = np.array([[trial.objective_mean*100 for trial in experiment.trials.values()]])
    data = optimization_trace_single_method(best_objectives, title="Optimization trace", ylabel="accuracy (%)")
    render(data)
    
else:
    optimiser = optim.SGD(model.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay, dampening=dampening)

    print("Using default hyperparameters..")
    for key, value in {'learning_rate': lr, 'latent_dims': latent_dims, 'weight_decay': weight_decay, 'momentum': momentum, 'dampening': dampening}.items():
        print(f"{key}: {value}")


print("########## Finished initialising model and training parameters. ##########")
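
The `"log_scale": True` flag on `learning_rate` asks for the range `[1e-5, 1e-1]` to be searched on a logarithmic scale, so that each order of magnitude receives similar attention. The sketch below only illustrates that idea with plain random sampling in the standard library; it does not reproduce how Ax generates its trials, and the helper `sample_log_uniform` is a hypothetical name.

```python
import math
import random


def sample_log_uniform(low, high, rng):
    """Draw a value whose base-10 logarithm is uniform between log10(low) and log10(high)."""
    exponent = rng.uniform(math.log10(low), math.log10(high))
    return 10 ** exponent


rng = random.Random(0)  # fixed seed so the sketch is repeatable
samples = [sample_log_uniform(1e-5, 1e-1, rng) for _ in range(1000)]

# On a log scale, 1e-3 is the midpoint of [1e-5, 1e-1], so about half the samples fall below it.
below_midpoint = sum(s < 1e-3 for s in samples) / len(samples)
print(f"fraction of samples below 1e-3: {below_midpoint:.2f}")
```

Sampling the same range uniformly on a linear scale would instead place roughly 99% of the candidates above `1e-3`, leaving the small learning rates almost unexplored.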

### Training

In [None]:
test_model = False # set to True to test the model correctness before training

if test_model:
    from torchsummary import summary
    model.enable_verbose(True)
    image_size = train_loader.dataset[-1]['image'].shape[-1] # square images so only one dimension is needed
    summary(model, (in_channels, image_size, image_size))
    # turn off verbose mode:
    model.enable_verbose(False)


#### Train model

In [None]:
train_model_again = True # set to True to train the model again.
save_model = True # set to True to save the model to github after training. train_model_again must be True if save_model is True.

training_performance = {'train_loss': [], 'train_accuracy': []}
validation_performance = {'val_loss': [], 'val_accuracy': []}

if train_model_again:
    num_epochs = 100
    # model.enable_verbose(True)
    for epoch in range(num_epochs):
        train_loss, train_accuracy = train_epoch(model, train_loader, loss_fn, optimiser)
        val_loss, val_accuracy = test_epoch(model, val_loader, loss_fn)
    
        if (epoch < 10) or ((epoch + 1) % 5 == 0):
            print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")

        training_performance['train_loss'].append(train_loss)
        training_performance['train_accuracy'].append(train_accuracy)
        validation_performance['val_loss'].append(val_loss)
        validation_performance['val_accuracy'].append(val_accuracy)
        print(f"training performance: {training_performance}")
        print(f"validation performance: {validation_performance}")
        
    print("Training complete!")
    # Plot the training and validation losses
    plt.figure(figsize=(10, 5))
    
    plt.subplot(1, 2, 1)
    plt.plot(training_performance['train_loss'], label='Train Loss')
    plt.plot(validation_performance['val_loss'], label='Val Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Training and Validation Losses')
    plt.legend()
    
    plt.subplot(1, 2, 2)
    plt.plot(training_performance['train_accuracy'], label='Train Accuracy')
    plt.plot(validation_performance['val_accuracy'], label='Val Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    
    plt.title('Training and Validation Accuracies')
    plt.legend()
    
    # save plot:
    plt.savefig(f'img/training_results/losses_lr={lr}.svg')
    
    plt.show()
    
    if save_model:
        # Save model locally:
        torch.save(model.state_dict(), 'unet.pth')
        # Save model to github:
        !git add unet.pth
        !git commit -m "Add trained UNet model"
        !git push


### Load model from github

In [None]:
load_model = False # set to True to load the model from github after training.

if load_model:

    import requests

    def download_model(url, model_path):
        r = requests.get(url, allow_redirects=True)
        if r.status_code == 200:
            # Overwrite the model file if it already exists:
            with open(model_path, 'wb') as f:
                f.write(r.content)
            print(f"Model downloaded to {model_path}")
            return model_path
        else:
            print(f"Failed to download model. Status code: {r.status_code}")
            return None

    model_url = 'https://raw.githubusercontent.com/JalalSayed1/Image-Damage-Classification/3baa846923307ae573f4fbecd7c8f00fc269fad4/unet.pth'
    
    model_path = download_model(model_url, 'unet.pth')
    if model_path:
        print("Model downloaded successfully!")
    else:
        print("Failed to download model.")


### Test Model performance

In [None]:
import torch
torch.cuda.empty_cache()

In [None]:
from torchvision.transforms.functional import to_pil_image

model.load_state_dict(torch.load('unet.pth', map_location=DEVICE))
model.eval()
print("Model loaded successfully!")

def map_class_ids_to_rgb(input_tensor, id_to_rgb):
    # Assuming input_tensor is of shape [1, H, W] and contains class IDs for each pixel
    H, W = input_tensor.shape[1], input_tensor.shape[2]
    
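    # Initialize the RGB image tensor on the same device as the input so the boolean masks below can index it
    rgb_image = torch.zeros((3, H, W), dtype=torch.uint8, device=input_tensor.device)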
    
    # Convert class ID tensor to a 2D array for easier processing
    class_ids_2d = input_tensor.squeeze(0)
    
    # Iterate over each class ID in id_to_rgb mapping
    for class_id, rgb in id_to_rgb.items():
        # Create a mask for the current class ID
        class_mask = class_ids_2d == class_id
        
        # Apply the mask to set the appropriate RGB values
        for channel, color_value in enumerate(rgb):
            rgb_image[channel][class_mask] = color_value

    return rgb_image

N = min(5, len(test_loader.dataset))
print(f"Number of examples to visualize: {N}")

# print(f"example image shape: {test_loader.dataset[-1]['image'].shape}")
test_batches = [test_loader.dataset[i] for i in range(N)]

for index, batch in enumerate(test_batches):
    test_image = batch['image'].squeeze().to(DEVICE)
    test_annotation = (batch['annotation'].squeeze()).to(DEVICE)
    test_annotation_rgb = batch['annotation_rgb'].squeeze()
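    # The model expects a batch dimension: add it for the forward pass and remove it from the output
    with torch.no_grad():
        predicted_annotation = model(test_image.unsqueeze(0)).squeeze(0)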
    
    # print(f"Shapes:")
    # print(f"\ttest_image: {test_image.shape}")
    # print(f"\ttest_annotation: {test_annotation.shape}")
    # print(f"\ttest_annotation_rgb: {test_annotation_rgb.shape}")
    # print(f"\tpredicted_annotation: {predicted_annotation.shape}")
    
    predicted_annotation = torch.argmax(predicted_annotation, dim=0).unsqueeze(0)
    # predicted_annotation_rgb = indices_to_rgb(predicted_annotation, id_to_rgb)
    predicted_annotation_rgb = map_class_ids_to_rgb(predicted_annotation, id_to_rgb)
    # print(f"\tpredicted_annotation_rgb: {predicted_annotation_rgb.shape}")
    
    fig, axes = plt.subplots(1, 4, figsize=(15, 5))
    
    # Compare HxW class IDs with HxW class IDs (predicted_annotation is 1xHxW at this point)
    similarity = round(tensor_similarity(predicted_annotation.squeeze(0), test_annotation).item() * 100, 2)
    plt.suptitle(f"Annotation Similarity: {similarity}%", fontsize=24, ha='center', va='top', y=1.1)

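    # Titles follow the plotting order below: axes[2] shows the predicted mask, axes[3] the predicted overlay
    titles = ['Image', 'Overlay', 'Predicted', 'Predicted Overlay']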
    for ax, title in zip(axes, titles):
        ax.set_title(title, fontsize=24)
        
    test_image = denormalize(test_image.cpu().numpy().transpose((1, 2, 0)), mean, std)  # C, H, W -> H, W, C
    test_annotation_rgb = test_annotation_rgb.numpy().transpose((1, 2, 0))  # C, H, W -> H, W, C
    
    predicted_annotation = predicted_annotation.cpu().numpy().squeeze() # remove singleton dimension
    predicted_annotation_rgb = predicted_annotation_rgb.cpu().numpy().transpose((1, 2, 0)) # C, H, W -> H, W, C
    
    if similarity >= 0:
        # Save the images to a file
        test_image_pil = to_pil_image(test_image)
        test_image_pil.save(f'img/predictions/test_results/{index}1.png')
        test_annotation_rgb_pil = to_pil_image(test_annotation_rgb)
        test_annotation_rgb_pil.save(f'img/predictions/test_results/{index}2.png')
        predicted_annotation_rgb_pil = to_pil_image(predicted_annotation_rgb)
        predicted_annotation_rgb_pil.save(f'img/predictions/test_results/{index}3.png')
    
    axes[0].imshow(test_image)
    axes[0].axis('off')
    
    alpha_channel = np.all(test_annotation_rgb == [0, 0, 0], axis=-1)
    test_annotation_rgba = np.dstack((test_annotation_rgb, np.where(alpha_channel, 0, 1)))
    axes[1].imshow(test_image)
    axes[1].imshow(test_annotation_rgba)
    axes[1].axis('off')

    axes[2].imshow(predicted_annotation_rgb)
    axes[2].axis('off')

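    # Build the alpha mask from the prediction itself (black pixels are treated as background).
    # The predicted RGB array is uint8, so fully opaque is 255 rather than 1.
    predicted_alpha = np.where(np.all(predicted_annotation_rgb == 0, axis=-1), 0, 255).astype(np.uint8)
    predicted_annotation_rgba = np.dstack((predicted_annotation_rgb, predicted_alpha))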
    axes[3].imshow(test_image)
    axes[3].imshow(predicted_annotation_rgba)
    axes[3].axis('off')


In [None]:
# !pip freeze > requirements.txt