KUL H02A5a Computer Vision: Group Assignment 2
---------------------------------------------------------------
Student numbers: <span style="color:red">r1, r2, r3, r4, r5</span>. (fill in your student numbers!)

In this group assignment your team will delve into some deep learning applications for computer vision. The assignment will be delivered in the same groups from *Group assignment 1* and you start from this template notebook. The notebook you submit for grading is the last notebook pinned as default and submitted to the [Kaggle competition](https://www.kaggle.com/t/90a3b6380ecb4700857b9e07a44ca41b) prior to the deadline on **Tuesday 20 May 23:59**. Closely follow [these instructions](https://github.com/gourie/kaggle_inclass) for joining the competition, sharing your notebook with the TAs and making a valid notebook submission to the competition. A notebook submission not only produces a *submission.csv* file that is used to calculate your competition score, it also runs the entire notebook and saves its output as if it were a report. This way it becomes an all-in-one-place document for the TAs to review. As such, please make sure that your final submission notebook is self-contained and fully documented (e.g. provide strong arguments for the design choices that you make). Most likely, this notebook format is not appropriate to run all your experiments at submission time (e.g. the training of CNNs is a memory hungry and time consuming process; due to limited Kaggle resources). It can be a good idea to distribute your code otherwise and only summarize your findings, together with your final predictions, in the submission notebook. For example, you can substitute experiments with some text and figures that you have produced "offline" (e.g. learning curves and results on your internal validation set or even the test set for different architectures, pre-processing pipelines, etc). We advise you to first go through the PDF of this assignment entirely before you really start. Then, it can be a good idea to go through this notebook and use it as your first notebook submission to the competition. 
You can make use of the *Group assignment 2* forum/discussion board on Toledo if you have any questions. Good luck and have fun!

---------------------------------------------------------------
NOTES:
* This notebook is just a template. Please keep the five main sections, but feel free to adjust further in any way you please!
* Clearly indicate the improvements that you make! You can for instance use subsections like: *3.1. Improvement: applying loss function f instead of g*.


# 1. Overview
This assignment consists of *three main parts* for which we expect you to provide code and extensive documentation in the notebook:
* Image classification (Sect. 2)
* Semantic segmentation (Sect. 3)
* Adversarial attacks (Sect. 4)

In the first part, you will train an end-to-end neural network for image classification. In the second part, you will do the same for semantic segmentation. For these two tasks we expect you to put a significant effort into optimizing performance and as such competing with fellow students via the Kaggle competition. In the third part, you will try to find and exploit the weaknesses of your classification and/or segmentation network. For the latter there is no competition format, but we do expect you to put significant effort in achieving good performance on the self-posed goal for that part. Finally, we ask you to reflect and produce an overall discussion with links to the lectures and "real world" computer vision (Sect. 5). It is important to note that only a small part of the grade will reflect the actual performance of your networks. However, we do expect all things to work! In general, we will evaluate the correctness of your approach and your understanding of what you have done that you demonstrate in the descriptions and discussions in the final notebook.

## 1.1 Deep learning resources
If you did not yet explore this in *Group assignment 1 (Sect. 2)*, we recommend using the TensorFlow and/or Keras library for building deep learning models. You can find a nice crash course [here](https://colab.research.google.com/drive/1UCJt8EYjlzCs1H1d1X0iDGYJsHKwu-NO).

In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
import numpy as np
import pandas as pd
import tensorflow as tf
from matplotlib import pyplot as plt
import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
import torch.nn.functional as F
from torchvision.transforms.functional import resize
import albumentations as A
from albumentations.pytorch import ToTensorV2
from sklearn.model_selection import train_test_split

## 1.2 PASCAL VOC 2009
For this project you will be using the [PASCAL VOC 2009](http://host.robots.ox.ac.uk/pascal/VOC/voc2009/index.html) dataset. This dataset consists of colour images of various scenes with different object classes (e.g. animal: *bird, cat, ...*; vehicle: *aeroplane, bicycle, ...*), totalling 20 classes.

In [None]:
# Loading the training data
train_df = pd.read_csv('/kaggle/input/kul-computer-vision-ga-2-2025/train/train_set.csv', index_col="Id")
# train_df =pd.read_csv('train/train_set.csv', index_col="Id")
labels = train_df.columns
train_df["img"] = [np.load('/kaggle/input/kul-computer-vision-ga-2-2025/train/img/train_{}.npy'.format(idx)) for idx, _ in train_df.iterrows()]
train_df["seg"] = [np.load('/kaggle/input/kul-computer-vision-ga-2-2025/train/seg/train_{}.npy'.format(idx)) for idx, _ in train_df.iterrows()]
# train_df["img"] = [np.load('train/img/train_{}.npy'.format(idx)) for idx, _ in train_df.iterrows()]
# train_df["seg"] = [np.load('train/seg/train_{}.npy'.format(idx)) for idx, _ in train_df.iterrows()]
print("The training set contains {} examples.".format(len(train_df)))

# Show some examples
fig, axs = plt.subplots(2, 20, figsize=(10 * 20, 10 * 2))
for i, label in enumerate(labels):
    df = train_df.loc[train_df[label] == 1]
    axs[0, i].imshow(df.iloc[0]["img"], vmin=0, vmax=255)
    axs[0, i].set_title("\n".join(label for label in labels if df.iloc[0][label] == 1), fontsize=40)
    axs[0, i].axis("off")
    axs[1, i].imshow(df.iloc[0]["seg"], vmin=0, vmax=20)  # with the absolute color scale it will be clear that the arrays in the "seg" column are label maps (labels in [0, 20])
    axs[1, i].axis("off")
    
plt.show()

# For each image, the training dataframe contains 20 columns with the ground truth classification labels, plus an "img" column with the image and a "seg" column with the ground truth segmentation label map
train_df.head(1)

In [None]:
# Loading the test data
test_df = pd.read_csv('/kaggle/input/kul-computer-vision-ga-2-2025/test/test_set.csv', index_col="Id")
test_df["img"] = [np.load('/kaggle/input/kul-computer-vision-ga-2-2025/test/img/test_{}.npy'.format(idx)) for idx, _ in test_df.iterrows()]
# test_df = pd.read_csv('test/test_set.csv', index_col="Id")
# test_df["img"] = [np.load('test/img/test_{}.npy'.format(idx)) for idx, _ in test_df.iterrows()]
test_df["seg"] = [-1 * np.ones(img.shape[:2], dtype=np.int8) for img in test_df["img"]]
print("The test set contains {} examples.".format(len(test_df)))

# The test dataframe is similar to the training dataframe, but here the values are -1 --> your task is to fill these in as well as possible in Sect. 2 and Sect. 3; in Sect. 6 this dataframe is automatically transformed into the submission CSV!
test_df.head(1)

## 1.3 Your Kaggle submission
Your filled test dataframe (during Sect. 2 and Sect. 3) must be converted to a submission.csv with two rows per example (one for classification and one for segmentation) and with only a single prediction column (the multi-class/label predictions, run-length encoded). You don't need to edit this section. Just make sure to call `generate_submission` at the right position in this notebook.
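
As a rough illustration of the run-length format, the sketch below encodes and decodes a tiny binary mask in pure Python. `rle_encode_binary` and `rle_decode_binary` are hypothetical helper names introduced for this example only. The encoder is meant to mirror what the NumPy-based `_rle_encode` of this notebook yields for a flat binary array (pairs of 1-indexed start position and run length), but it is a simplified stand-in, not the function used for the submission.

```python
def rle_encode_binary(flat_mask):
    """Encode a flat binary sequence as 'start length' pairs (starts are 1-indexed)."""
    runs = []
    start = None
    for pos, value in enumerate(flat_mask, start=1):
        if value and start is None:
            start = pos                      # a run of ones begins here
        elif not value and start is not None:
            runs += [start, pos - start]     # the run ended just before pos
            start = None
    if start is not None:                    # run that reaches the end of the mask
        runs += [start, len(flat_mask) + 1 - start]
    return " ".join(str(x) for x in runs)


def rle_decode_binary(rle, length):
    """Rebuild the flat binary sequence from its run-length encoding."""
    values = [int(x) for x in rle.split()]
    mask = [0] * length
    for start, run in zip(values[::2], values[1::2]):
        for pos in range(start - 1, start - 1 + run):
            mask[pos] = 1
    return mask


mask = [0, 1, 1, 0, 1]
encoded = rle_encode_binary(mask)
print(encoded)  # 2 2 5 1
decoded = rle_decode_binary(encoded, len(mask))
print(decoded == mask)  # True
```

Read the encoded string in pairs: "2 2" is a run of two ones starting at position 2, and "5 1" is a single one at position 5.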

In [None]:
def _rle_encode(img):
    """
    Kaggle requires RLE encoded predictions for computation of the Dice score (https://www.kaggle.com/lifa08/run-length-encode-and-decode)

    Parameters
    ----------
    img: np.ndarray - binary img array
    
    Returns
    -------
    rle: String - running length encoded version of img
    """
    pixels = img.flatten()
    pixels = np.concatenate([[0], pixels, [0]])
    runs = np.where(pixels[1:] != pixels[:-1])[0] + 1
    runs[1::2] -= runs[::2]
    rle = ' '.join(str(x) for x in runs)
    return rle

def generate_submission(df):
    """
    Make sure to call this function once after you completed Sect. 2 and Sect. 3! It transforms and writes your test dataframe into a submission.csv file.
    
    Parameters
    ----------
    df: pd.DataFrame - filled dataframe that needs to be converted
    
    Returns
    -------
    submission_df: pd.DataFrame - df in submission format.
    """
    df_dict = {"Id": [], "Predicted": []}
    for idx, _ in df.iterrows():
        df_dict["Id"].append(f"{idx}_classification")
        df_dict["Predicted"].append(_rle_encode(np.array(df.loc[idx, labels])))
        df_dict["Id"].append(f"{idx}_segmentation")
        df_dict["Predicted"].append(_rle_encode(np.array([df.loc[idx, "seg"] == j + 1 for j in range(len(labels))])))
    
    submission_df = pd.DataFrame(data=df_dict, dtype=str).set_index("Id")
    submission_df.to_csv("submission.csv")
    return submission_df

# 2. Image classification
The goal here is simple: implement a classification CNN and train it to recognise all 20 classes (and/or background) using the training set and compete on the test set (by filling in the classification columns in the test dataframe).

In [None]:
class RandomClassificationModel:
    """
    Random classification model: 
        - generates random labels for the inputs based on the class distribution observed during training
        - assumes an input can have multiple labels
    """
    def fit(self, X, y):
        """
        Adjusts the class ratio variable to the one observed in y. 

        Parameters
        ----------
        X: list of arrays - n x (height x width x 3)
        y: list of arrays - n x (nb_classes)

        Returns
        -------
        self
        """
        self.distribution = np.mean(y, axis=0)
        print("Setting class distribution to:\n{}".format("\n".join(f"{label}: {p}" for label, p in zip(labels, self.distribution))))
        return self
        
    def predict(self, X):
        """
        Predicts for each input a label.
        
        Parameters
        ----------
        X: list of arrays - n x (height x width x 3)
            
        Returns
        -------
        y_pred: list of arrays - n x (nb_classes)
        """
        np.random.seed(0)
        return [np.array([int(np.random.rand() < p) for p in self.distribution]) for _ in X]
    
    def __call__(self, X):
        return self.predict(X)
    
model = RandomClassificationModel()
model.fit(train_df["img"], train_df[labels])
test_df.loc[:, labels] = model.predict(test_df["img"])
test_df.head(1)

# 3. Semantic segmentation
The goal here is to implement a segmentation CNN that labels every pixel in the image as belonging to one of the 20 classes (and/or background). Use the training set to train your CNN and compete on the test set (by filling in the segmentation column in the test dataframe).

In [None]:
class RandomSegmentationModel:
    """
    Random segmentation model: 
        - generates random label maps for the inputs based on the class distributions observed during training
        - every pixel in an input can only have one label
    """
    def fit(self, X, Y):
        """
        Adjusts the class ratio variable to the one observed in Y. 

        Parameters
        ----------
        X: list of arrays - n x (height x width x 3)
        Y: list of arrays - n x (height x width)

        Returns
        -------
        self
        """
        self.distribution = np.mean([[np.sum(Y_ == i) / Y_.size for i in range(len(labels) + 1)] for Y_ in Y], axis=0)
        print("Setting class distribution to:\nbackground: {}\n{}".format(self.distribution[0], "\n".join(f"{label}: {p}" for label, p in zip(labels, self.distribution[1:]))))
        return self
        
    def predict(self, X):
        """
        Predicts for each input a label map.
        
        Parameters
        ----------
        X: list of arrays - n x (height x width x 3)
            
        Returns
        -------
        Y_pred: list of arrays - n x (height x width)
        """
        np.random.seed(0)
        return [np.random.choice(np.arange(len(labels) + 1), size=X_.shape[:2], p=self.distribution) for X_ in X]
    
    def __call__(self, X):
        return self.predict(X)
    
model = RandomSegmentationModel()
model.fit(train_df["img"], train_df["seg"])
test_df.loc[:, "seg"] = model.predict(test_df["img"])
test_df.head(1)

## Semantic segmentation from scratch

Semantic segmentation involves classifying each pixel in an image into one of several predefined categories. It provides a dense, pixel-level understanding of the visual scene. 

The implemented network is a U-Net. Its skip connections concatenate feature maps from the contracting path (encoder) with those of the expanding path (decoder). The architecture is a symmetric encoder-decoder with matching levels, so every downsampling step has a corresponding upsampling step. A bottleneck layer between the two paths captures the highest-level features before upsampling. The double convolutional block at each level can give a better feature representation, because two stacked convolutions help the network learn more robust features.

The design combines a conventional convolutional neural network with skip connections, which counteract the loss of spatial resolution during feature extraction. The encoder has four levels, each a double 3x3 convolution with batch normalisation and ReLU activation. ReLU is a nonlinear activation function that lets the network learn more complex features. Batch normalisation standardises the activations within each mini-batch to roughly zero mean and unit variance, followed by a learnable scale and shift. The first encoder level applies 64 filters to the 128x128x3 input image; every subsequent level doubles the number of filters, while the max pooling between levels halves the spatial resolution. The bottleneck is the deepest part of the network and operates on the most abstract features. For a 128x128 input it receives an 8x8x512 feature map (the 16x16x512 output of the fourth encoder level after max pooling) and applies a double 3x3 convolution to produce 1024 channels. It links the encoder to the decoder. The decoder mirrors the encoder, but uses transposed convolutions for upsampling instead of max pooling. Each decoder block starts with an upsampling step (`up_trans`), concatenates the result with the encoder features of the same resolution, and then fuses the concatenated features with two 3x3 convolutions. The number of filters decreases as the resolution grows. The skip connections thus combine contextual information from the deeper layers with localisation information from the encoder.

The output layer is a 1x1 convolution that maps the 64 feature channels to the number of classes, producing a score (logit) per class for every pixel; a softmax over the class dimension turns these scores into a per-pixel probability map.
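
The feature-map sizes can be traced without building the network. The sketch below is a plain-Python bookkeeping helper (the name `unet_shape_trace` and its parameters are hypothetical, introduced only for this illustration) that assumes what the code in this section uses: padded 3x3 convolutions that keep the spatial size, 2x2 max pooling, transposed convolutions with stride 2, a 128x128 input, 64 base filters and 21 output classes.

```python
def unet_shape_trace(input_size=128, base_channels=64, depth=4, num_classes=21):
    """Return (stage name, height/width, channels) for each stage of a U-Net."""
    trace = []
    size, channels = input_size, base_channels
    # Encoder: a double convolution keeps the size; max pooling then halves it
    for level in range(1, depth + 1):
        trace.append((f"encoder {level}", size, channels))
        size //= 2
        channels *= 2
    # Bottleneck: lowest resolution, most channels
    trace.append(("bottleneck", size, channels))
    # Decoder: a transposed convolution doubles the size and halves the channels;
    # concatenating the skip connection doubles the channels again before the
    # double convolution reduces them to the value recorded here
    for level in range(1, depth + 1):
        size *= 2
        channels //= 2
        trace.append((f"decoder {level}", size, channels))
    # Output: a 1x1 convolution maps the features to one channel per class
    trace.append(("output", size, num_classes))
    return trace


for name, size, channels in unet_shape_trace():
    print(f"{name:<10} {size:>3} x {size:<3} x {channels}")
```

Under these assumptions the fourth encoder level outputs 16x16x512, the bottleneck works at 8x8 with 1024 channels, and the output has the input resolution with one channel per class.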

In [None]:
# Constants
# Number of classes for segmentation, including the background class
NUM_CLASSES = 21  # 20 classes + background
# Target size (height, width) to which all input images and masks are resized, for size consistency in the network
TARGET_SIZE = (128, 128)
# Dataset
# Define the dataset for semantic segmentation: loading and preprocessing of image and mask pairs
class SegmentationDataset(Dataset):
    """
    Dataset of image/mask pairs for semantic segmentation.
    Initialised with a dataframe that holds the image arrays ("img") and the label maps ("seg");
    also sets the target size for resizing and defines the image transformations.
    """
    def __init__(self, df, target_size=(128, 128)):
        #store the image data from dataframe
        self.images = df["img"].values
        #store the mask data from dataframe
        self.masks = df["seg"].values
        #store the target size for resizing
        self.target_size = target_size
        #Define image transformations: ToTensor converts the image to a PyTorch tensor
        # and scales pixel values from [0,255] to [0,1].
        # Normalize then standardizes each channel with the given mean and standard deviation (the commonly used ImageNet statistics)
        self.img_transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                               std=[0.229, 0.224, 0.225]),
        ])
        
    #return the total number of samples in the dataset
    def __len__(self):
        return len(self.images)
    #retrieve the single sample at the given index
    def __getitem__(self, idx):
        #Load the image and mask at the specified index and convert them to the uint8 data type
        img = self.images[idx].astype(np.uint8)
        mask = self.masks[idx].astype(np.uint8)
        #Apply the defined image transformations to the image
        img = self.img_transform(img)
        #Resize image to the target size
        img = resize(img, self.target_size)
        #Convert the mask numpy array to a PyTorch tensor and add a channel dimension using unsqueeze
        #Convert to float before resizing; the mask is cast back to long after the resize
        mask = torch.from_numpy(mask).unsqueeze(0).float()
        #Resize the mask tensor to the target size
        #Using InterpolationMode.NEAREST is crucial for masks to preserve discrete class labels
        mask = resize(mask, self.target_size, interpolation=transforms.InterpolationMode.NEAREST)
        #Remove the added channel dimension using squeeze
        mask = mask.squeeze(0).long()

        #Return the processed image and mask tensors
        return img, mask
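
Why nearest-neighbour interpolation matters for masks can be seen in a small, simplified example. The sketch below halves the resolution of a single row of class labels in two ways: by picking an existing label (nearest neighbour) and by averaging neighbouring values (a stand-in for bilinear-style interpolation). The helper names are hypothetical, and this 1-D toy is not how `torchvision` resamples internally; it only illustrates that averaging can invent labels that are not present in the original mask.

```python
def downsample_nearest(row):
    """Keep every second label: the result only contains labels that already exist."""
    return row[::2]


def downsample_average(row):
    """Average neighbouring pairs: label values get blended across class borders."""
    return [(row[i] + row[i + 1]) / 2 for i in range(0, len(row) - 1, 2)]


# One row of a label map: background (0) next to class 20
row = [0, 0, 0, 20, 20, 20]

print(downsample_nearest(row))  # [0, 0, 20]
print(downsample_average(row))  # [0.0, 10.0, 20.0]
```

The averaged row contains the value 10, a class label that does not occur anywhere in the original row, whereas the nearest-neighbour result only reuses labels 0 and 20.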

In [None]:
def split_dataframe(df, val_split=0.2, random_state=42):
    """
    Split a dataframe into training and validation sets for evaluation
    of the model performance on unseen data during training process

    Use train_test_split from scikit-learn to perform the split.
    df => The input dataframe
    test_size => The proportion of the dataset to include in the validation split
    random_state => Ensure reproducibility of the split
    shuffle => Shuffle the data before splitting, important for preventing ordered biases
    """
    train_df, val_df = train_test_split(df,test_size=val_split,
        random_state=random_state, shuffle=True)
    return train_df.reset_index(drop=True), val_df.reset_index(drop=True)

#Split the main training dataframe into training and validation sets which provides data for training and evaluating the model during the training process
train_df, val_df = split_dataframe(train_df)

"""
    Create dataset and dataloader for training and validation data
    DataLoader provides an iterable over the dataset, handling batching, shuffling, and multiprocessing
    batch_size => The number of samples per batch.
    shuffle => Shuffle the data at each epoch
    num_workers => Number of subprocesses to use for data loading(0 means the main process)
"""
train_dataset = SegmentationDataset(train_df, target_size=TARGET_SIZE)
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, num_workers=0)

val_dataset = SegmentationDataset(val_df, target_size=TARGET_SIZE)
val_loader = DataLoader(val_dataset, batch_size=8, shuffle=False, num_workers=0)


In [None]:
class UNet(nn.Module):
    """ 
        U-Net model definition.
        It consists of a contracting path (encoder) to capture context and an expanding path (decoder)
        to enable precise localization, with skip connections between the encoder and decoder

    """
    def __init__(self, num_classes):
        """
            Args: num_classes (int): number of output classes, including background
        """
        super().__init__() #construct the parent class nn.Module
        self.num_classes = num_classes #store the number of classes
        
        """
            Contracting Path - Encoder
            The network downsamples the input image and extracts features. 
            Each downsampling block consists of convolutional layers and a pooling layer and
            the number of channels increases with depth to capture more complex features
        """
        # First double convolution block: Input channels = 3 (for RGB images), Output channels = 64
        self.down_conv1 = self.double_conv(3, 64)
        # Second double convolution block: Input channels = 64, Output channels = 128
        self.down_conv2 = self.double_conv(64, 128)
        # Third double convolution block: Input channels = 128, Output channels = 256
        self.down_conv3 = self.double_conv(128, 256)
        # Fourth double convolution block: Input channels = 256, Output channels = 512
        self.down_conv4 = self.double_conv(256, 512)
        # Max pooling layer for downsampling => kernel size and stride of 2 reduce the spatial dimensions by half
        self.maxpool = nn.MaxPool2d(2)
        
        # Bottleneck - the layer with the lowest spatial resolution and the highest number of channels, connecting encoder and decoder
        self.bottleneck = self.double_conv(512, 1024)
        
        """
            Expanding Path - Decoder
            The network upsamples here the feature maps and reconstructs the segmentation mask.
            It uses transposed convolutions (or upsampling followed by convolution) and skip connections.
            The number of channels decreases with depth.
        """
        # First transposed convolution for upsampling from the bottleneck
        # Input channels = 1024, Output channels = 512. Kernel size and stride of 2 double the spatial dimensions.
        self.up_trans1 = nn.ConvTranspose2d(1024, 512, kernel_size=2, stride=2)
        # First up-convolution block after the skip connection
        # Input channels = 1024 (512 from transposed conv + 512 from skip connection), Output channels = 512
        self.up_conv1 = self.double_conv(1024, 512)
        # Second transposed convolution for upsampling
        # Input channels = 512, Output channels = 256

        self.up_trans2 = nn.ConvTranspose2d(512, 256, kernel_size=2, stride=2)
        # Second up-convolution block after the skip connection
        # Input channels = 512 (256 from transposed conv + 256 from skip connection), Output channels = 256
        self.up_conv2 = self.double_conv(512, 256)

        # Third transposed convolution for upsampling
        # Input channels = 256, Output channels = 128
        self.up_trans3 = nn.ConvTranspose2d(256, 128, kernel_size=2, stride=2)
        # Third up-convolution block after the skip connection
        # Input channels = 256 (128 from transposed conv + 128 from skip connection), Output channels = 128
        self.up_conv3 = self.double_conv(256, 128)

        # Fourth transposed convolution for upsampling
        # Input channels = 128, Output channels = 64
        self.up_trans4 = nn.ConvTranspose2d(128, 64, kernel_size=2, stride=2)
        # Fourth up-convolution block after the skip connection
        # Input channels = 128 (64 from transposed conv + 64 from skip connection), Output channels = 64
        self.up_conv4 = self.double_conv(128, 64)
        
        # Final output layer
        # A 1x1 convolution to map the final feature maps to the number of classes
        # Input channels = 64, Output channels = num_classes
        self.out_conv = nn.Conv2d(64, num_classes, kernel_size=1)
    
    # Double convolutional block: two convolutional layers, each followed by batch normalization and ReLU activation
    def double_conv(self, in_channels, out_channels):
        """
            Double convolution block: Conv -> BatchNorm -> ReLU -> Conv -> BatchNorm -> ReLU
        """
        return nn.Sequential(
            # First convolutional layer. Kernel size 3x3, padding 1 to maintain spatial dimensions
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1),
            # Batch normalization layer to normalize the activations, improving training stability
            nn.BatchNorm2d(out_channels),
            # ReLU activation function for non-linearity. inplace=True saves memory
            nn.ReLU(inplace=True),
            # Second convolutional layer
            nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1),
            # Second batch normalization layer
            nn.BatchNorm2d(out_channels),
            # Second ReLU activation function
            nn.ReLU(inplace=True)
        )
    
    def forward(self, x):
        """
            Defines the forward pass of the U-Net model
            Args:
                x: The input tensor (image batch)
        """
        # Forward pass through Encoder
        # Apply the first double convolution block. Store the output (x1) for the skip connection
        x1 = self.down_conv1(x)
        # Apply max pooling to reduce spatial dimensions
        x2 = self.maxpool(x1)
        
        # Apply the second double convolution block. Store the output (x3) for the skip connection
        x3 = self.down_conv2(x2) 
        # Apply max pooling
        x4 = self.maxpool(x3)
        
        # Apply the third double convolution block. Store the output (x5) for the skip connection
        x5 = self.down_conv3(x4)
        # Apply max pooling
        x6 = self.maxpool(x5)
        
        # Apply the fourth double convolution block. Store the output (x7) for the skip connection
        x7 = self.down_conv4(x6)
        # Apply max pooling => the input to the bottleneck
        x8 = self.maxpool(x7)
        
        # Bottleneck - Apply the bottleneck double convolution block
        x9 = self.bottleneck(x8)
        
        # Forward pass through the Decoder 
        # Apply the first transposed convolution to upsample from the bottleneck
        x = self.up_trans1(x9)
        # Concatenate the upsampled feature map with the corresponding feature map from the encoder (x7)
        # This is the skip connection, providing high-resolution features to the decoder.
        # dim=1 means concatenating along the channel dimension
        x = torch.cat([x, x7], dim=1)  # Skip connection
        # Apply the first up-convolution block
        x = self.up_conv1(x)
        
        # Apply the second transposed convolution
        x = self.up_trans2(x)
        # Concatenate with the feature map from the encoder (x5)
        x = torch.cat([x, x5], dim=1)  # Skip connection
        # Apply the second up-convolution block
        x = self.up_conv2(x)
        
        # Apply the third transposed convolution
        x = self.up_trans3(x)
        # Concatenate with the feature map from the encoder (x3)
        x = torch.cat([x, x3], dim=1)  # Skip connection
        # Apply the third up-convolution block
        x = self.up_conv3(x)
        
        # Apply the fourth transposed convolution
        x = self.up_trans4(x)
        # Concatenate with the feature map from the encoder (x1)
        x = torch.cat([x, x1], dim=1)  # Skip connection
        # Apply the fourth up-convolution block
        x = self.up_conv4(x)
        
        # Final output => Apply the 1x1 convolution to produce the final segmentation map
        out = self.out_conv(x)
        #Return the output tensor 
        return out

#Set device for training (GPU if available otherwise use CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
#Initialize U-Net model and move it to the selected device
model = UNet(NUM_CLASSES).to(device)

In [None]:
class DiceLoss(nn.Module):
    """
    Used as loss function especially when there is class imbalance
    It measures the similarity between the predicted segmentation and the ground truth mask
    Args:
        smooth => A small value added to the numerator and denominator to prevent division by zero
        ignore_index => Class index to ignore in the loss calculation for example invalid regions
    """
    def __init__(self, smooth=1, ignore_index=255):
        super(DiceLoss, self).__init__()

        self.smooth = smooth # Store the smoothing value
        self.ignore_index = ignore_index # Store the index to ignore

    """
        Forward pass for the Dice Loss calculation
        Args:
            pred => The predicted segmentation map
            target=> The ground truth segmentation mask
    """
    def forward(self, pred, target):
        # Apply softmax to the predicted logits to get probabilities for each class
        # dim=1 means applying softmax across the channel dimension
        pred = torch.softmax(pred, dim=1)
        # Get the number of classes from the prediction tensor
        num_classes = pred.shape[1]
        
        # Create a mask that is True for valid pixels and False for pixels labelled ignore_index
        mask = (target != self.ignore_index)
        # Map ignored pixels to class 0 so that one_hot receives valid indices;
        # their contribution is removed again below
        target = target * mask.long()
        
        # Convert the target mask to one-hot encoding:
        # for each pixel, only the channel of the ground-truth class is 1, all others are 0
        target_onehot = torch.nn.functional.one_hot(target, num_classes=num_classes).permute(0,3,1,2)
        
        # Zero out ignored pixels in both prediction and target so they do not affect the loss
        valid = mask.unsqueeze(1).float()
        pred = pred * valid
        target_onehot = target_onehot * valid
        
        # Calculate the intersection between the predicted probabilities and the one-hot target
        # Sum across the spatial dimensions (height and width) to get the intersection for each class in each batch
        intersection = (pred * target_onehot).sum(dim=(2,3))
        # Dice denominator: sum of predicted and ground-truth mass per class (not a set union, despite the variable name)
        union = pred.sum(dim=(2,3)) + target_onehot.sum(dim=(2,3))
        # Calculate the Dice coefficient for each class in each batch
        # Add 'smooth' to numerator and denominator to avoid division by zero
        dice = (2. * intersection + self.smooth) / (union + self.smooth)

        # Return the mean Dice loss (1 - Dice coefficient) averaged across all classes and batches.
        return 1 - dice.mean()

"""
Define a combined loss function that is a weighted sum of Cross-Entropy Loss and Dice Loss.
Using a combination of loss functions can often lead to better performance, especially
for segmentation tasks with class imbalance. Cross-Entropy focuses on individual pixel classification,
while Dice Loss focuses on the overall overlap of segmentation regions
"""
class CombinedLoss(nn.Module):
    """
    Args:
        weight => Class weights for Cross-Entropy Loss to handle class imbalance
        alpha => Weighting factor for the Cross-Entropy Loss (1 - alpha is the weight for Dice Loss)
    """
    def __init__(self, weight=None, alpha=0.5):
        super().__init__()
        self.alpha = alpha
        # Initialize the Cross-Entropy Loss
        self.ce_loss = nn.CrossEntropyLoss(weight=weight)
        # Initialize the Dice Loss
        self.dice_loss = DiceLoss()
        
    def forward(self, pred, target):
        """
        Define the forward pass for the Combined Loss calculation
        pred => The predicted segmentation map 
        target => The ground truth segmentation mask
        """
        # Calculate the Cross-Entropy Loss
        ce = self.ce_loss(pred, target)
        # Calculate the Dice Loss
        dice = self.dice_loss(pred, target)
        # Return the weighted sum of the two losses
        return self.alpha * ce + (1 - self.alpha) * dice
    
"""
    Class weighting for imbalanced datasets
    Class weighting assigns higher importance to less frequent classes during training,
    helping the model learn to segment them better
"""
# Calculate the count of pixels for each class in the training masks
# np.concatenate joins all mask arrays into a single array,
# np.bincount counts the occurrences of each non-negative integer value
class_counts = np.bincount(np.concatenate([m.flatten() for m in train_df["seg"]]))
# Calculate initial class weights as the inverse of class counts
class_weights = 1. / torch.tensor(class_counts, dtype=torch.float32)
# Normalize the class weights so they sum to 1, ensuring that the overall scale of the weighted loss is consistent
class_weights = class_weights / class_weights.sum()
# Initialize the combined loss function with calculated class weights and an alpha value
criterion = CombinedLoss(weight=class_weights.to(device), alpha=0.5)

"""
    The Adam optimization algorithm adapts the learning rate for each parameter
    Args:
        model.parameters()=> specifies which parameters of the model should be optimized
        lr => learning rate, controls the step size during optimization
        weight_decay => L2 regularization term, helps prevent overfitting by penalizing large weights
"""
# Select the optimizer
optimizer = optim.Adam(model.parameters(), lr=1e-4, weight_decay=1e-4)
# Choose a learning rate scheduler to adjust the learning rate during training
# 'min' =>  Monitor a metric that should be minimized (validation loss)
# patience => Number of epochs with no improvement after which the learning rate will be reduced
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=3)
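
As a quick sanity check of the class-weighting recipe used in this cell, here is a minimal NumPy sketch on toy data. The two small masks and the three-class setup are made up for illustration; only the "inverse frequency, then normalize" steps mirror the code above.

```python
import numpy as np

# Two tiny hypothetical label maps with 3 classes (0 = background, 1 and 2 = objects)
toy_masks = [
    np.array([[0, 0, 0, 0],
              [0, 0, 1, 1]]),
    np.array([[0, 0, 0, 0],
              [0, 0, 0, 2]]),
]

# Count how many pixels belong to each class across all masks
class_counts = np.bincount(np.concatenate([m.flatten() for m in toy_masks]), minlength=3)

# Inverse frequency: rare classes get large weights
class_weights = 1.0 / class_counts
# Normalize so that the weights sum to 1
class_weights = class_weights / class_weights.sum()

print(class_counts)
print(class_weights.round(3))
```

The rarest class ends up with the largest weight. Note that a class with zero pixels would cause a division by zero, so this recipe assumes every class occurs at least once in the training masks.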

In [None]:
# Train the model and record the losses for visualization
# Lists to store training and validation losses for plotting
train_losses = []
val_losses = []
# Number of epochs to train the model => An epoch is one full pass through the training dataset
num_epochs = 30
for epoch in range(num_epochs):
    # Set the model to training mode
    model.train()
    # Initialize running loss for the current epoch
    running_loss = 0.0
    # Iterate over batches in the training data loader
    for imgs, masks in train_loader:
        # Move images and masks to the selected device
        imgs, masks = imgs.to(device), masks.to(device)
        
        # Zero the gradients of the model parameters
        # Gradients are accumulated by default, so this is necessary to prevent
        # gradients from previous iterations affecting the current update
        optimizer.zero_grad()
        # Perform a forward pass to get model predictions for the current batch of images
        outputs = model(imgs) 
        # Calculate the loss using the defined criterion and the predictions and ground truth masks
        loss = criterion(outputs, masks)
        # Perform backpropagation - calculate gradients of the loss with respect to the model parameters
        loss.backward()
        # Update the model parameters using the calculated gradients and the optimizer
        optimizer.step()
        
        # Accumulate the loss for the current epoch
        running_loss += loss.item()
    epoch_loss = running_loss/len(train_loader)
    train_losses.append(epoch_loss)

    # Validation phase
    # Set the model to evaluation mode.
    model.eval()
    # Initialize validation loss for the current epoch
    val_loss = 0.0
    # Disable gradient calculation during validation
    with torch.no_grad():
        # Iterate over batches in the validation data loader
        for imgs, masks in val_loader:
            # Move images and masks to the selected device
            imgs, masks = imgs.to(device), masks.to(device)
            # Perform a forward pass to get model predictions
            outputs = model(imgs)
            # Calculate the loss on the validation data
            loss = criterion(outputs, masks)
            # Accumulate the validation loss
            val_loss += loss.item()

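    # Calculate the average validation loss for the epoch
    val_loss /= len(val_loader)
    val_losses.append(val_loss)
    # Pass the validation loss to the scheduler; without this call ReduceLROnPlateau never lowers the learning rate
    scheduler.step(val_loss)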
    print(f"Epoch {epoch+1}/{num_epochs} - Train Loss: {epoch_loss:.4f} - Val Loss: {val_loss:.4f}")
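
The reduce-on-plateau idea described in the scheduler comments can be illustrated with a simplified pure-Python sketch. This is not PyTorch's actual implementation (it ignores the improvement threshold and cooldown options); the function name `plateau_schedule` and the loss values are hypothetical. It assumes the learning rate is multiplied by `factor` once the monitored loss has failed to improve for more than `patience` consecutive epochs.

```python
def plateau_schedule(val_losses, lr=1e-4, factor=0.1, patience=3):
    """Return the learning rate in effect after each epoch (simplified sketch)."""
    best = float("inf")
    bad_epochs = 0
    history = []
    for loss in val_losses:
        if loss < best:
            # New best validation loss: reset the counter
            best = loss
            bad_epochs = 0
        else:
            bad_epochs += 1
        if bad_epochs > patience:
            # Plateau detected: shrink the learning rate and start counting again
            lr *= factor
            bad_epochs = 0
        history.append(lr)
    return history

losses = [1.0, 0.8, 0.7, 0.71, 0.72, 0.70, 0.73, 0.69]
lrs = plateau_schedule(losses)
print(lrs)
```

In this toy run the loss stops improving after the third epoch, and the learning rate is reduced after the fourth consecutive epoch without improvement.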


In [None]:
# Plotting losses
plt.figure(figsize=(10, 6))
# Plot the training loss over epochs
plt.plot(range(1, num_epochs+1), train_losses, 'b-o', label='Training Loss', linewidth=2, markersize=8)
# Plot the validation loss over epochs
plt.plot(range(1, num_epochs+1), val_losses, 'r-o', label='Validation Loss',linewidth=2, markersize=8)
plt.title('Training and Validation Loss over Epochs', fontsize=14)
plt.xlabel('Epoch', fontsize=12)
plt.ylabel('Loss', fontsize=12)
plt.xticks(range(1, num_epochs+1))
plt.grid(True, linestyle='--', alpha=0.7)
plt.show()

In [None]:
# Visualize predictions on a batch from the given dataloader
def show_predictions(model, dataloader, num_show):
    # Set the model to evaluation mode.
    model.eval()
    imgs, masks = next(iter(dataloader))
    imgs, masks = imgs.to(device), masks.to(device)

    # Disable gradient calculation for predictions
    with torch.no_grad():
        preds = model(imgs)
        preds = torch.argmax(preds, dim=1)

    #Convert to numpy for visualization
    imgs_np = imgs.cpu().numpy()
    masks_np = masks.cpu().numpy()
    preds_np = preds.cpu().numpy()

    # Reverse normalization applied during preprocessing to display the images correctly
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    imgs_np = imgs_np.transpose(0, 2, 3, 1)
    imgs_np = imgs_np * std + mean
    imgs_np = np.clip(imgs_np, 0, 1)

    #Plot results
    num_show = min(num_show, len(imgs))
    # squeeze=False keeps axs two-dimensional even when only one row is shown
    _, axs = plt.subplots(num_show, 3, figsize=(15, 5*num_show), squeeze=False)

    # Iterate through the selected number of samples
    for i in range(num_show):
        axs[i, 0].imshow(imgs_np[i])
        axs[i, 0].set_title("Input Image")
        axs[i, 0].axis('off')
        
        axs[i, 1].imshow(masks_np[i], vmin=0, vmax=NUM_CLASSES-1, cmap='jet')
        axs[i, 1].set_title("Ground Truth")
        axs[i, 1].axis('off')
        
        axs[i, 2].imshow(preds_np[i], vmin=0, vmax=NUM_CLASSES-1, cmap='jet')
        axs[i, 2].set_title("Prediction")
        axs[i, 2].axis('off')

    plt.tight_layout()
    plt.show()

show_predictions(model, val_loader, 3)


## Transfer Learning

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
from PIL import Image
from tqdm import tqdm
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, ConcatDataset
import torchvision.transforms as transforms
import torchvision.models.segmentation as models
import albumentations as A

import pydensecrf.densecrf as dcrf
from pydensecrf.utils import unary_from_softmax, create_pairwise_bilateral

# Release cached GPU memory left over from the previous section
torch.cuda.empty_cache()

In [None]:
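BATCH_SIZE = 16
NUM_WORKERS = 0
EPOCH = 70        # Maximum number of epochs (early stopping may end training sooner)
N_frozen = 3      # 0-based index of the last epoch trained with a frozen backbone
LR = 1e-5         # Learning rate once the backbone is unfrozen
LR_FROZEN = 1e-4  # Learning rate while only the head is trained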

In [None]:
def get_device():
    if torch.cuda.is_available():
        return torch.device('cuda')
    elif torch.backends.mps.is_available():
        return torch.device('mps')
    elif hasattr(torch, 'xla') and torch.xla.device_count() > 0:
        return torch.device('xla')
    else:
        return torch.device('cpu')

device = get_device()

In [None]:
# Loading the training data
train_df = pd.read_csv('data/train/train_set.csv', index_col="Id")
labels = train_df.columns
train_df["img"] = [np.load('data/train/img/train_{}.npy'.format(idx)) for idx, _ in train_df.iterrows()]
train_df["seg"] = [np.load('data/train/seg/train_{}.npy'.format(idx)) for idx, _ in train_df.iterrows()]
print("The training set contains {} examples.".format(len(train_df)))

# Show some examples
fig, axs = plt.subplots(2, 20, figsize=(10 * 20, 10 * 2))
for i, label in enumerate(labels):
    df = train_df.loc[train_df[label] == 1]
    axs[0, i].imshow(df.iloc[0]["img"], vmin=0, vmax=255)
    axs[0, i].set_title("\n".join(label for label in labels if df.iloc[0][label] == 1), fontsize=40)
    axs[0, i].axis("off")
    axs[1, i].imshow(df.iloc[0]["seg"], vmin=0, vmax=20)  # with the absolute color scale it will be clear that the arrays in the "seg" column are label maps (labels in [0, 20])
    axs[1, i].axis("off")
    
plt.show()

# The training dataframe contains for each image 20 columns with the ground truth classification labels and 20 columns with the ground truth segmentation maps for each class
train_df.head(1)

In [None]:
# Loading the test data
test_df = pd.read_csv('data/test/test_set.csv', index_col="Id")
test_df["img"] = [np.load('data/test/img/test_{}.npy'.format(idx)) for idx, _ in test_df.iterrows()]
test_df["seg"] = [-1 * np.ones(img.shape[:2], dtype=np.int8) for img in test_df["img"]]
print("The test set contains {} examples.".format(len(test_df)))

# The test dataframe is similar to the training dataframe, but here the values are -1 --> your task is to fill these in as well as possible in Sect. 2 and Sect. 3; in Sect. 6 this dataframe is automatically transformed into the submission CSV!
test_df.head(1)

In [None]:
class VOC2009Dataset(Dataset):
    def __init__(self, dataframe, transform=None, target_transform=None, paired_transform=None, ignore_label=21):
        self.df = dataframe.reset_index()
        self.transform = transform
        self.target_transform = target_transform
        self.paired_transform = paired_transform
        
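        # 'index' is excluded as well: reset_index() adds it when the dataframe was already re-indexed by the split
        self.label_columns = [col for col in self.df.columns if col not in ['img', 'seg', 'Id', 'index']]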
        self.ignore_label = ignore_label
        self.classes = 22  # 20 classes + background + void

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        image = self.df.iloc[idx]['img'] 
        mask = self.df.iloc[idx]['seg']   

        image = Image.fromarray(image.astype(np.uint8))  
        mask = Image.fromarray(mask.astype(np.uint8))    

        if self.paired_transform:
            image, mask = self.paired_transform(image, mask)
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            mask = self.target_transform(mask)

        return image, mask

In [None]:
paired_transform_aug = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.ShiftScaleRotate(
        shift_limit=0.05,     
        scale_limit=0.10,     
        rotate_limit=15,      
        interpolation=1,
        p=0.5
    ),

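    # interpolation applies to the image only (1 = cv2.INTER_LINEAR, matching the validation pipeline);
    # albumentations resizes masks with nearest-neighbour interpolation, so the labels stay valid
    A.Resize(256, 256, interpolation=1),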
],
additional_targets={'mask': 'mask'}
)

paired_transform = A.Compose([
    A.Resize(256, 256, interpolation=1),
],
additional_targets={'mask': 'mask'}
)

image_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

mask_transform = transforms.Compose([
    transforms.Lambda(lambda x: torch.tensor(np.array(x), dtype=torch.long)),
    transforms.Lambda(lambda x: torch.where(x == 255, 21, x))
])

def apply_paired_transform(image, mask):
    image_np = np.array(image)
    mask_np = np.array(mask)
    
    augmented = paired_transform(image=image_np, mask=mask_np)
    
    image_aug = Image.fromarray(augmented['image'])
    mask_aug = Image.fromarray(augmented['mask'])
    
    return image_aug, mask_aug

def apply_paired_transform_aug(image, mask):
    image_np = np.array(image)
    mask_np = np.array(mask)
    
    augmented = paired_transform_aug(image=image_np, mask=mask_np)
    
    image_aug = Image.fromarray(augmented['image'])
    mask_aug = Image.fromarray(augmented['mask'])
    
    return image_aug, mask_aug
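
Because segmentation masks store class indices rather than intensities, geometric transforms have to sample them with nearest-neighbour interpolation. The NumPy sketch below illustrates why on a toy 1-D "mask"; the data and variable names are illustrative only, and the sketch does not use albumentations.

```python
import numpy as np

# A 1-D toy "mask": background (0) on the left, class 20 on the right
mask = np.array([0, 0, 20, 20], dtype=np.float64)

# Positions of the original samples and of the upsampled grid
src = np.arange(len(mask))
dst = np.linspace(0, len(mask) - 1, 7)

# Linear interpolation blends neighbouring labels into a value (10) that belongs to a different class
linear = np.interp(dst, src, mask)

# Nearest-neighbour picks the closest original sample, so only existing labels appear
nearest = mask[np.rint(dst).astype(int)]

print(linear)
print(nearest)
```

The linearly interpolated mask contains the label 10 at the class boundary, even though no pixel of that class exists in the original, while the nearest-neighbour result only contains 0 and 20.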


In [None]:
def split_dataframe(df, val_split=0.2, random_state=42):
    df = df.reset_index()
    
    train_df, val_df = train_test_split(
        df,
        test_size=val_split,
        random_state=random_state,
        shuffle=True
    )
    
    train_df = train_df.reset_index(drop=True)
    val_df = val_df.reset_index(drop=True)
    
    return train_df, val_df

train_df, val_df = split_dataframe(train_df)

train_aug_dataset = VOC2009Dataset(
    dataframe=train_df,
    transform=image_transform,
    target_transform=mask_transform,
    paired_transform=apply_paired_transform_aug
    )
# train_ori_dataset =  VOC2009Dataset(
#     dataframe=train_df,
#     transform=image_transform,
#     target_transform=mask_transform,
#     paired_transform=apply_paired_transform
#     )

# train_dataset = ConcatDataset([train_aug_dataset, train_ori_dataset])
train_dataloader = DataLoader(train_aug_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)

val_dataset = VOC2009Dataset(
    dataframe=val_df,
    transform=image_transform,
    target_transform=mask_transform,
    paired_transform=apply_paired_transform
)
val_dataloader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

In [None]:
class EarlyStopping:
    def __init__(self, patience=5, delta=1e-4, verbose=False):
        self.patience = patience
        self.delta = delta # Minimum improvement
        self.verbose = verbose
        self.best_score = None
        self.early_stop = False
        self.counter = 0
        self.best_loss = float('inf')

    def __call__(self, val_loss, model):
        score = -val_loss  # Convert to negative if minimizing loss

        if self.best_score is None:
            self.best_score = score
            self.best_loss = val_loss
            self.save_checkpoint(val_loss, model)
        elif score < self.best_score + self.delta:
            self.counter += 1
            if self.verbose:
                print(f'EarlyStopping counter: {self.counter}/{self.patience}')
            if self.counter >= self.patience:
                self.early_stop = True
        else:
            self.best_score = score
            self.save_checkpoint(val_loss, model)
            self.counter = 0

    def save_checkpoint(self, val_loss, model):
        if self.verbose:
            print(f'Validation loss decreased ({self.best_loss:.4f} --> {val_loss:.4f}). Saving model...')
        torch.save(model.state_dict(), 'checkpoint.pt')
        self.best_loss = val_loss
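
To see how `patience` and `delta` interact, the stopping rule of the class above can be replayed on a list of validation losses. This is a minimal pure-Python sketch that mirrors the same comparison without saving checkpoints; the function name `early_stop_epoch` and the loss values are hypothetical.

```python
def early_stop_epoch(val_losses, patience=5, delta=1e-4):
    """Return the 1-based epoch at which training would stop, or None."""
    best_score = None
    counter = 0
    for epoch, val_loss in enumerate(val_losses, start=1):
        score = -val_loss  # higher score = lower loss
        if best_score is None:
            best_score = score
        elif score < best_score + delta:
            # The loss did not improve by at least delta
            counter += 1
            if counter >= patience:
                return epoch
        else:
            # Real improvement: remember it and reset the counter
            best_score = score
            counter = 0
    return None

losses = [0.90, 0.80, 0.75, 0.75, 0.76, 0.74995, 0.77, 0.78, 0.70]
print(early_stop_epoch(losses, patience=5))
```

The sixth loss (0.74995) is lower than the best value so far (0.75), but by less than `delta`, so it still counts as "no improvement". Training stops before the final value of 0.70 is ever reached, which is the usual trade-off of a small `patience`.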

In [None]:
class DiceLoss(nn.Module):
    def __init__(self, smooth=1, ignore_index=21, from_logits=True):
        super(DiceLoss, self).__init__()
        self.smooth = smooth
        self.ignore_index = ignore_index
        self.from_logits = from_logits

    def forward(self, pred, target):
        if self.from_logits:
            pred = torch.softmax(pred, dim=1) 
            
        num_classes = pred.size(1) + 1

        mask = (target != self.ignore_index).float()
        
        target = torch.nn.functional.one_hot(target.long(), num_classes=num_classes)  # [batch_size, height, width, num_classes]
        target = target.permute(0, 3, 1, 2).float()  # [batch_size, num_classes, height, width]
        
        # Apply mask to target
        mask_target = mask.unsqueeze(1).expand_as(target)  # [batch_size, 22, height, width]
        target = target * mask_target  # Zero out ignored pixels
        target = target[:, :-1] # [batch_size, 21, height, width]
        
        # Apply mask to predictions
        mask_pred = mask.unsqueeze(1).expand_as(pred) # [batch_size, 21, height, width]
        pred = pred * mask_pred 

        # Flatten predictions and targets to [batch_size * height * width, num_classes].
        # The class axis has to be moved last first: a plain view() on [B, C, H, W] would mix classes and pixels.
        pred = pred.permute(0, 2, 3, 1).reshape(-1, pred.size(1))
        target = target.permute(0, 2, 3, 1).reshape(-1, target.size(1))
        
        # Compute Dice coefficient for each class
        intersection = (pred * target).sum(dim=0)  # Sum over pixels for each class
        union = pred.sum(dim=0) + target.sum(dim=0)  # Sum of predicted and target mass per class (the intersection is not subtracted)
        dice = (2. * intersection + self.smooth) / (union + self.smooth + 1e-8)  # Dice score per class
        
        # Return 1 - mean Dice score as loss
        return 1 - dice.mean()
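
The per-class soft Dice score computed above is (2 * intersection + smooth) / (sum of predictions + sum of targets + smooth). The following NumPy sketch works through that formula on a toy 2x2 image with an ignored pixel. The helper name `soft_dice_per_class`, the toy labels and the ignore value 9 are assumptions made for illustration; it sums over the spatial axes directly instead of flattening.

```python
import numpy as np

def soft_dice_per_class(probs, target, num_classes, ignore_index, smooth=1.0):
    """probs: [C, H, W] class probabilities, target: [H, W] integer labels."""
    valid = target != ignore_index  # pixels that count towards the score
    # One-hot encode the target; ignored pixels are zero in every class channel
    onehot = np.stack([(target == c) & valid for c in range(num_classes)]).astype(float)
    probs = probs * valid  # zero out ignored pixels in the predictions
    # Sum over the spatial axes only, keeping one value per class
    intersection = (probs * onehot).sum(axis=(1, 2))
    denom = probs.sum(axis=(1, 2)) + onehot.sum(axis=(1, 2))
    return (2.0 * intersection + smooth) / (denom + smooth)

# Toy 2-class example; label 9 marks an ignored (void) pixel
target = np.array([[0, 1],
                   [1, 9]])
perfect = np.stack([target == 0, target == 1]).astype(float)
uniform = np.full((2, 2, 2), 0.5)

dice_perfect = soft_dice_per_class(perfect, target, num_classes=2, ignore_index=9)
dice_uniform = soft_dice_per_class(uniform, target, num_classes=2, ignore_index=9)
print(dice_perfect)
print(dice_uniform.round(3))
```

A perfect prediction scores 1 for every class, and an uninformative uniform prediction scores clearly lower, so `1 - dice.mean()` behaves as a loss.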

In [None]:
class WeightedCEDiceLoss(nn.Module):
    def __init__(self, smooth=1, ignore_index=21, alpha=0.5):
        super(WeightedCEDiceLoss, self).__init__()
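        # alpha weights the Dice term; (1 - alpha) weights the cross-entropy term
        self.alpha = alpha
        self.diceloss_fn = DiceLoss(smooth, ignore_index)
        # Despite the class name, no per-class weights are passed to the cross-entropy here
        self.wceloss_fn = nn.CrossEntropyLoss(ignore_index=ignore_index)
        
    def forward(self, pred, target):
        # Call the modules directly instead of .forward() so that PyTorch hooks are respected
        diceloss = self.diceloss_fn(pred, target)
        wceloss = self.wceloss_fn(pred, target)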
        return self.alpha * diceloss + (1 - self.alpha) * wceloss
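
To make the blend concrete, here is a small NumPy sketch that computes a mean cross-entropy over non-ignored pixels and combines it with a given Dice loss value. The helper names `cross_entropy` and `blended_loss`, the toy logits and the Dice value 0.5 are assumptions for illustration, not part of the training code.

```python
import numpy as np

def cross_entropy(logits, target, ignore_index):
    """Mean cross-entropy over non-ignored pixels. logits: [C, N], target: [N]."""
    valid = target != ignore_index
    # Log-softmax over the class axis, computed in a numerically stable way
    shifted = logits - logits.max(axis=0, keepdims=True)
    log_probs = shifted - np.log(np.exp(shifted).sum(axis=0, keepdims=True))
    # Pick the log-probability of the true class at every valid pixel
    picked = log_probs[target[valid], np.nonzero(valid)[0]]
    return -picked.mean()

def blended_loss(dice_loss, ce_loss, alpha):
    # alpha weights the Dice term, (1 - alpha) the cross-entropy term
    return alpha * dice_loss + (1 - alpha) * ce_loss

logits = np.zeros((2, 3))       # 2 classes, 3 pixels, all logits equal
target = np.array([0, 1, 21])   # the last pixel is void and is skipped
ce = cross_entropy(logits, target, ignore_index=21)
total = blended_loss(0.5, ce, alpha=0.8)
print(round(ce, 4))             # ln(2) for a uniform 2-class prediction
print(round(total, 4))
```

With `alpha=0.8` the Dice term dominates, so the cross-entropy mostly acts as a pixel-level regularizer on top of the region-overlap objective.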

In [None]:
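# DeepLabV3 with a ResNet-50 backbone and pre-trained weights; 21 outputs = 20 object classes + background.
# Note: recent torchvision releases deprecate `pretrained=True` in favour of the `weights=` argument.
model = models.deeplabv3_resnet50(pretrained=True, num_classes=21) 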
criterion = WeightedCEDiceLoss(alpha=0.8)

In [None]:
for param in model.backbone.parameters():
    param.requires_grad = False
for m in model.backbone.modules():
    if isinstance(m, nn.BatchNorm2d):
        m.eval()

head_params = filter(lambda p: p.requires_grad, model.parameters())
optimizer = torch.optim.Adam([
    {'params': head_params, 'lr': LR_FROZEN},
], lr=LR_FROZEN)

scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.8)
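
An exponential schedule multiplies the learning rate by `gamma` every time `scheduler.step()` is called, so after k steps the rate is `base_lr * gamma ** k`. The pure-Python sketch below only evaluates that formula for the values used in this cell; the function name `exponential_lr` is hypothetical.

```python
def exponential_lr(base_lr, gamma, num_steps):
    """Learning rate after 0, 1, ..., num_steps - 1 scheduler steps."""
    return [base_lr * gamma ** step for step in range(num_steps)]

lrs = exponential_lr(base_lr=1e-4, gamma=0.8, num_steps=11)
for step in (0, 1, 5, 10):
    print(step, f"{lrs[step]:.2e}")
```

With `gamma=0.8` the learning rate shrinks by roughly a factor of ten every ten steps (0.8 ** 10 is about 0.107), which is worth keeping in mind when combining the schedule with a maximum of 70 epochs.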

In [None]:
train_losses = []
val_losses = []

In [None]:
num_epochs = EPOCH
model = model.to(device)
early_stopping = EarlyStopping(patience=5, verbose=True)

for epoch in range(num_epochs):
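    model.train()
    # model.train() puts every module back in training mode, including the backbone BatchNorm layers,
    # so switch them to eval mode again for as long as the backbone is frozen
    if epoch <= N_frozen:
        for m in model.backbone.modules():
            if isinstance(m, nn.BatchNorm2d):
                m.eval()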
    running_loss = 0.0

    with tqdm(train_dataloader, desc=f"Training epoch [{epoch+1}/{num_epochs}]", unit="batch") as pbar:
        for images, masks in pbar:
            images = images.to(device)
            masks = masks.to(device)
            
            # Forward pass
            logits = model(images)['out']
            loss = criterion(logits, masks)
            
            # Backward pass
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
        
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(train_dataloader):.4f}")
        train_losses.append(running_loss/len(train_dataloader))
    
    # Validation phase
    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        with tqdm(val_dataloader, desc=f"Validating", unit="batch") as pbar:
            for images, masks in pbar:
                images = images.to(device)
                masks = masks.to(device)
                logits = model(images)['out']
                loss = criterion(logits, masks)
                val_loss += loss.item()
    
    val_loss = val_loss / len(val_dataloader)
    print(f"Epoch [{epoch+1}/{num_epochs}], Val Loss: {val_loss:.4f}")
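    val_losses.append(val_loss)
    # Note: scheduler.step() is not called in this loop, so the ExponentialLR schedule has no effect
    # and the learning rate stays at its initial value within each phase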

    early_stopping(val_loss, model)
    if early_stopping.early_stop:
        print("Early stopping triggered.")
        break
    
    if epoch == N_frozen:
        for param in model.backbone.parameters():
            param.requires_grad = True
        for m in model.backbone.modules():
            if isinstance(m, nn.BatchNorm2d):
                m.train()

        optimizer = torch.optim.Adam([
            {'params': model.backbone.parameters(), 'lr': LR},   
            {'params': model.classifier.parameters(), 'lr': LR},
        ])
        scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.8)

# Load the best model
model.load_state_dict(torch.load('checkpoint.pt'))

In [None]:
def crf_refine_logits(logits, img, n_iters: int = 5):
    probs = torch.softmax(logits, dim=0).cpu().numpy()
    C, H, W = probs.shape
    U = unary_from_softmax(probs)
    d = dcrf.DenseCRF2D(W, H, C)
    d.setUnaryEnergy(U)
    
    mean = torch.tensor([0.485, 0.456, 0.406], device=img.device)[:, None, None]
    std  = torch.tensor([0.229, 0.224, 0.225], device=img.device)[:, None, None]

    img_unnorm = img * std + mean  

    img_uint8 = (img_unnorm.clamp(0,1) * 255).byte()  
    img_np = img_uint8.permute(1, 2, 0).cpu().numpy()  

    feats = create_pairwise_bilateral(
        sdims=(20, 20), schan=(13,13,13),
        img=img_np, chdim=2
    )
    d.addPairwiseEnergy(feats, compat=21)

    Q = d.inference(n_iters)                     

    refined_probs = np.array(Q).reshape((C, H, W))
    return refined_probs
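
The unary term of the CRF encodes how strongly the network prefers each class at each pixel. A minimal NumPy sketch of that idea follows, assuming the unary energy is the negative log-probability per class and pixel, flattened to shape [C, H*W]. The helper `toy_unary` is hypothetical and does not call pydensecrf.

```python
import numpy as np

def toy_unary(probs, clip=1e-5):
    """Negative log-probabilities, flattened to [C, H*W] (illustrative helper)."""
    num_classes = probs.shape[0]
    probs = np.clip(probs, clip, 1.0)  # avoid log(0)
    return -np.log(probs).reshape(num_classes, -1).astype(np.float32)

# 2 classes on a 1x2 image: the first pixel is confidently class 0, the second is ambiguous
probs = np.array([[[0.9, 0.5]],
                  [[0.1, 0.5]]])
U = toy_unary(probs)
print(U.shape)
print(U.round(3))
```

Low energy means high confidence: at the first pixel class 0 is much cheaper than class 1, while at the ambiguous pixel both classes cost the same, so the pairwise (appearance) term decides the label there.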

In [None]:
# def binary_masks_from_logits(logits, thresholds, img, refine_logits=True):
#     if refine_logits:
#         probs = crf_refine_logits(logits, img)
#     else:
#         probs = torch.softmax(logits, dim=0).cpu().numpy() 

#     C, H, W = probs.shape

#     thr = np.array([thresholds.get(c, 0.5) for c in range(C)], dtype=np.float32)
#     thr = thr[:, None, None]                              

#     passed = probs > thr                                 
#     masked_probs = probs * passed.astype(np.float32)      
#     label_map = masked_probs.argmax(axis=0)             
#     masks = np.zeros_like(passed, dtype=np.uint8)
#     for c in range(C):
#         masks[c] = (label_map == c).astype(np.uint8)

#     return masks

In [None]:
# def post_process(logits_batch, imgs_batch, thresholds):
#     processed_predicts_batch = []
#     for (logits, img) in zip(logits_batch, imgs_batch):
#         processed_predicts = binary_masks_from_logits(logits, thresholds, img)
#         processed_predicts_batch.append(processed_predicts)

#     processed_predicts_batch = torch.tensor(processed_predicts_batch, dtype=torch.float32)
#     return processed_predicts_batch

In [None]:
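def post_process(logits_batch, imgs_batch):
    # Refine each sample with the dense CRF. Note that crf_refine_logits returns
    # class probabilities (not logits) of shape [C, H, W]
    processed_logits_batch = []
    for (logits, img) in zip(logits_batch, imgs_batch):
        processed_logits = crf_refine_logits(logits, img)
        processed_logits_batch.append(processed_logits)

    # Stack into a single array first; building a tensor from a list of arrays is slow
    processed_logits_batch = torch.from_numpy(np.stack(processed_logits_batch)).float()
    return processed_logits_batch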

In [None]:
dice_loss = DiceLoss(smooth=1, from_logits=False)
    
model.eval()
total_loss = 0.0
num_batches = 0

with torch.no_grad():
    with tqdm(val_dataloader, desc=f"Validating", unit="batch") as pbar:
        for images, masks in pbar:
            images = images.to(device)
            masks = masks.to(device)
            logits = model(images)['out']
            # processed_predicts = post_process(logits, images, thresholds)
            # processed_predicts = processed_predicts.to(device)
            logits = post_process(logits, images)
            logits = logits.to(device)
            loss = dice_loss(logits, masks)
            total_loss += loss.item()
            num_batches += 1
    
avg_loss = total_loss / num_batches
print(f'Final average DICE score: {1 - avg_loss}')

In [None]:
def visualize_segmentation(image, mask, pred):
    # Convert tensors to NumPy
    image = image.permute(1, 2, 0).cpu().numpy()  # Convert to HWC
    mask = mask.cpu().numpy()
    pred = pred.cpu().numpy()
    
    # Ensure mask and pred are 2D (H, W)
    if mask.ndim > 2:
        mask = mask.squeeze()
    if pred.ndim > 2:
        pred = pred.squeeze()
    
    # Initialize RGB images for masks
    height, width = mask.shape
    colored_mask = np.zeros((height, width, 3), dtype=np.uint8)
    colored_pred = np.zeros((height, width, 3), dtype=np.uint8)
    
    # Get the viridis colormap
    cmap = plt.get_cmap('viridis')
    norm = mcolors.Normalize(vmin=0, vmax=20)  # Scale for labels [0, 20]
    
    # Map class indices to colors for ground truth and prediction
    for class_idx in np.unique(np.concatenate([mask, pred])):
        if class_idx <= 20:
            # Convert normalized colormap value to RGB (0-255)
            color = cmap(norm(class_idx))[:3]  # Get RGB (ignore alpha)
            color = (np.array(color) * 255).astype(np.uint8)
            colored_mask[mask == class_idx] = color
            colored_pred[pred == class_idx] = color
        elif class_idx == 255:
            # Void label mapped to white, consistent with original visualize_segmentation
            colored_mask[mask == class_idx] = (255, 255, 255)
            colored_pred[pred == class_idx] = (255, 255, 255)
        else:
            print(f"Warning: Class index {class_idx} not in expected range [0, 20] or 255. Using black.")
            colored_mask[mask == class_idx] = (0, 0, 0)
            colored_pred[pred == class_idx] = (0, 0, 0)
    
    # Visualize
    plt.figure(figsize=(15, 5))
    
    plt.subplot(1, 3, 1)
    plt.title("Input Image")
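    # The image was normalized with the ImageNet statistics in image_transform; undo that for display
    image = np.clip(image * np.array([0.229, 0.224, 0.225]) + np.array([0.485, 0.456, 0.406]), 0, 1)
    plt.imshow(image)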
    plt.axis('off')
    
    plt.subplot(1, 3, 2)
    plt.title("Ground Truth")
    plt.imshow(colored_mask)
    plt.axis('off')
    
    plt.subplot(1, 3, 3)
    plt.title("Prediction")
    plt.imshow(colored_pred)
    plt.axis('off')
    
    plt.show()
    
    # Return colored masks as PIL Images
    return Image.fromarray(colored_mask), Image.fromarray(colored_pred)

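# Example visualization
images, masks = next(iter(val_dataloader))
images = images.to(device)
masks = masks.to(device)
with torch.no_grad():
    outputs = model(images)['out']
    # Refine the outputs of this batch with the CRF, then take the most likely class per pixel
    refined = post_process(outputs, images)
    preds = torch.argmax(refined, dim=1)

# mask_transform remapped the void label 255 to 21; map it back so it is drawn in white
masks = torch.where(masks == 21, 255, masks)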

visualize_segmentation(images[1], masks[1], preds[1])

In [None]:
def visualize_loss(train_loss, val_loss, save_path=None):
    epochs = list(range(1, len(train_loss) + 1))

    plt.figure(figsize=(10, 6))
    plt.plot(epochs, train_loss, label='Training Loss', marker='o', color='blue')
    plt.plot(epochs, val_loss, label='Validation Loss', marker='s', color='orange')
    
    plt.xlabel('Epoch')
    plt.ylabel('Loss (Dice + CE)')
    plt.title('Training and Validation Loss Over Epochs')
    plt.legend()
    plt.grid(True)
    
    if save_path:
        plt.savefig(save_path)
        print(f"Plot saved to {save_path}")

    plt.show()
    plt.close()

visualize_loss(train_losses, val_losses, 'loss.png')

## Submit to competition
You don't need to edit this section. Just use it at the right position in the notebook. See the definition of this function in Sect. 1.3 for more details.

In [None]:
generate_submission(test_df)

# 4. Adversarial attack
For this part, your goal is to fool your classification and/or segmentation CNN, using an *adversarial attack*. More specifically, the goal is to build a CNN to perturb test images in a way that (i) they look unperturbed to humans; but (ii) the CNN classifies/segments these images in line with the perturbations.
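
One common starting point for such perturbations is the fast gradient sign method (FGSM), which moves every input value a small step `epsilon` in the direction that increases the loss. The sketch below is a toy illustration on a hand-written logistic-regression "classifier" in NumPy, not the CNN-based attack described above; the weights, the input and `epsilon` are made up.

```python
import numpy as np

def sigmoid(z):
    return 1.0 / (1.0 + np.exp(-z))

def fgsm(x, y, w, b, epsilon):
    """One FGSM step against a logistic-regression model p = sigmoid(w.x + b)."""
    p = sigmoid(w @ x + b)
    # Gradient of the binary cross-entropy loss with respect to the input x
    grad_x = (p - y) * w
    # Step in the sign of the gradient, then keep values in the valid range [0, 1]
    return np.clip(x + epsilon * np.sign(grad_x), 0.0, 1.0)

w = np.array([2.0, -3.0, 1.0])   # hypothetical trained weights
b = 0.0
x = np.array([0.6, 0.2, 0.5])    # "clean" input with true label y = 1
y = 1.0

x_adv = fgsm(x, y, w, b, epsilon=0.2)
clean_pred = sigmoid(w @ x + b) > 0.5
adv_pred = sigmoid(w @ x_adv + b) > 0.5
print(clean_pred, adv_pred)
print(np.abs(x_adv - x).max())
```

Each input value changes by at most `epsilon`, yet the predicted class flips. For images, the same idea applies with the gradient obtained by backpropagating the loss to the input pixels.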

# 5. Discussion
Finally, take some time to reflect on what you have learned during this assignment. Reflect and produce an overall discussion with links to the lectures and "real world" computer vision.