In [None]:
import os
import math 
import numpy as np
import numpy.random as random
import pandas as pd
import matplotlib.pyplot as plt
from google.colab import drive
import time
import torch
import torch.nn as nn
import torchvision
from torch.utils.data.dataloader import Dataset
from torch.utils.data import DataLoader, SubsetRandomSampler, ConcatDataset
from torch.cuda.amp import autocast, GradScaler
import albumentations as A
from albumentations.pytorch import ToTensorV2
from sklearn.model_selection import KFold
from PIL import Image
import gc

# Try to get torchinfo, install it if it doesn't work
try:
    from torchinfo import summary
except:
    print("[INFO] Couldn't find torchinfo... installing it.")
    !pip install -q torchinfo
    from torchinfo import summary

# Try to get torchmetrics, install it if it doesn't work
try:
  import torchmetrics
except:
  print("[INFO] Couldn't find torchmetrics... installing it.")
  !pip install -q torchmetrics 
  import torchmetrics


class CreateDataset():
    '''
    Builds an in-memory list of (image, mask) PIL pairs from a data directory
    (the location of the training or testing folder).

    The directory listing is sorted and consumed two entries at a time: entry
    2*i is treated as the image and entry 2*i + 1 as its mask, so the naming
    scheme must make each image sort immediately before its mask.

    Args:
        data_dir: directory holding the image and mask files.
        n_of_augments: number of augmented pairs to append (default 0).
        aug_transform: callable invoked as aug_transform(image=..., mask=...)
            and returning a mapping with 'image' and 'mask' entries
            (e.g. an Albumentations Compose). Required when n_of_augments > 0.

    Attributes:
        dataset: list of (image, mask) PIL image tuples, originals first.
        size: number of original (non-augmented) pairs.

    Raises:
        ValueError: if augmentation is requested without a transform or
            without any original pair to augment.
    '''
    def __init__(self, data_dir, n_of_augments=0, aug_transform=None):
        self.data_dir = data_dir
        self.dataset = self.pair_image_with_mask()
        self.n_of_augments = n_of_augments
        self.aug_transform = aug_transform
        self.augment_img_and_msk()

    @staticmethod
    def _load_image(path):
        # Image.open is lazy and keeps the file handle open; copy the pixel
        # data inside a context manager so the handle is released immediately.
        with Image.open(path) as handle:
            return handle.copy()

    def pair_image_with_mask(self):
        '''Load every (image, mask) pair from data_dir into self.dataset and return it.'''
        filenames = sorted(os.listdir(self.data_dir))
        self.dataset = []
        # A trailing unpaired file (odd count) is ignored by the floor division.
        self.size = len(filenames) // 2

        for idx in range(self.size):
            img_path = os.path.join(self.data_dir, filenames[idx * 2])
            mask_path = os.path.join(self.data_dir, filenames[idx * 2 + 1])
            self.dataset.append((self._load_image(img_path), self._load_image(mask_path)))

        return self.dataset

    def augment_img_and_msk(self):
        '''Append n_of_augments augmented copies of randomly chosen original pairs.'''
        if self.n_of_augments <= 0:
            return self.dataset
        if self.aug_transform is None:
            raise ValueError("aug_transform must be provided when n_of_augments > 0.")
        if self.size == 0:
            raise ValueError(f"No image/mask pairs found in '{self.data_dir}' to augment.")

        for _ in range(self.n_of_augments):
            # Indices are drawn from [0, self.size), i.e. only the original
            # pairs, so augmentations are never stacked on augmentations.
            base_image, base_mask = self.dataset[random.randint(0, self.size)]
            aug_pair = self.aug_transform(image=np.array(base_image), mask=np.array(base_mask))

            # Convert back into PIL images so every dataset entry has the same type
            aug_image = Image.fromarray(aug_pair['image'].astype('uint8'))
            aug_mask = Image.fromarray(aug_pair['mask'].astype('uint8'))
            self.dataset.append((aug_image, aug_mask))

        return self.dataset

class MyDataset2(torch.utils.data.Dataset):
    '''
    Map-style dataset that serves (image, mask) pairs to the PyTorch DataLoader.

    Args:
        dataset: indexable collection whose items are (image, mask) pairs.
        transform: optional callable invoked as transform(image=..., mask=...)
            and returning a mapping with 'image' and 'mask' entries.

    When a transform is set, both items are converted to numpy arrays, passed
    through it, and the returned mask is reordered from channel-last to
    channel-first (axes (2, 0, 1)). The image is returned exactly as the
    transform produced it. Without a transform the stored pair is returned
    untouched.
    '''
    def __init__(self, dataset, transform=None):
        self.transform = transform
        self.dataset = dataset

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        img, mask = self.dataset[idx][0], self.dataset[idx][1]

        # No transform configured: hand back the raw pair.
        if not self.transform:
            return img, mask

        transformed = self.transform(image=np.array(img), mask=np.array(mask))
        image_out = transformed['image']
        # The mask comes back channel-last; move the channel axis to the front.
        mask_out = np.transpose(transformed['mask'], (2, 0, 1))
        return image_out, mask_out

# Augmentation pipeline (Albumentations). It is handed to CreateDataset below,
# which calls it with image=/mask= so both receive the same geometric changes.
# The OneOf group is always entered (p=1).
# NOTE(review): RandomCrop needs inputs of at least 960x720 - confirm the
# source image size.
aug_transform = A.Compose([
                A.RandomCrop(width=960, height=720),
                A.HorizontalFlip(p=0.8),
                A.RandomBrightnessContrast(p=0.8),
                A.RandomRotate90(p=0.8),
                A.OneOf([
                  A.CoarseDropout(max_holes=50,min_height=7,p=0.5),
                  A.RandomBrightnessContrast(brightness_limit=0.3, contrast_limit=0.3, p=0.5),
                  A.GridDistortion(num_steps=4,p=0.5),
                  A.HueSaturationValue(p=0.5)],
                  p=1)
                ])

# Transform applied by MyDataset2 on every item access: resize to 520x520 (the
# input size used for the models in this notebook) and convert to tensors.
transform = A.Compose([
                  A.Resize(520, 520, p=1),
                  ToTensorV2()
                  ])

# Define device to be cuda if available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Root folder of the dataset; expected to contain 'train' and 'test' subfolders
file_name = '/content/Cam101'

# If folder Cam101 is not available locally, mount the user's Google Drive and look for it there
if not os.path.exists(file_name):
      drive.mount('/content/drive')
      file_name = '/content/drive/MyDrive/Cam101'
    
# Define paths for the train and test datasets
train_path= os.path.join(file_name, 'train')
test_path= os.path.join(file_name, 'test')

# Build the training set: all original pairs plus 100 augmented pairs
train_dataset = CreateDataset(train_path, 100, aug_transform)
train_set = train_dataset.dataset
print(f"Train size: {len(train_dataset.dataset)}")

# Build the test set: original pairs only (no augmentation)
test_dataset = CreateDataset(test_path)
test_set = test_dataset.dataset
print(f"Test size: {len(test_dataset.dataset)}")  

# Wrap both sets so they can be passed to PyTorch's DataLoader.
# Despite their names, trainLoader/testLoader are MyDataset2 (Dataset) objects,
# not DataLoaders; the DataLoaders are created later from them.
trainLoader = MyDataset2(train_set, transform)
testLoader = MyDataset2(test_set, transform)

[INFO] Couldn't find torchinfo... installing it.
[INFO] Couldn't find torchmetrics... installing it.
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m519.2/519.2 kB[0m [31m8.9 MB/s[0m eta [36m0:00:00[0m
[?25hMounted at /content/drive
Train size: 190
Test size: 11


In [None]:
# Use pandas to read the RGB-Class Name text file and create a classes dictionary
classes_path = os.path.join(file_name, "label_colors.txt")
# sep=r"\s+" is the supported equivalent of the deprecated delim_whitespace=True
df = pd.read_csv(classes_path, sep=r"\s+", header=None)
classes_dic = {}

# Map class index (row order in the file) -> colour tuple. The last column of
# each row is dropped (presumably the class name - verify against the file).
for idx, line in enumerate(df.values):
    classes_dic[idx] = tuple(int(rgb) for rgb in line[:-1])

# Get the number of classes (one output unit for each class)
output_shape = len(classes_dic)

# Create class tensor of shape [output_shape, 3] for encode_segmap()
class_tensor = torch.tensor(list(classes_dic.values()), dtype=torch.uint8).to(device)

In [None]:
def encode_segmap(labels, class_tensor=class_tensor):
    """One-hot encode a batch of colour masks against the class colour table.

    Args:
        labels: mask batch indexed as [batch, channel, height, width]. It is
            rearranged to channel-last and moved to `device` before comparing.
        class_tensor: colour table with one row per class, compared against the
            channel axis of every pixel.

    Returns:
        Long tensor laid out as [batch, num_classes, height, width], holding 1
        where a pixel's colour equals the class colour in every channel and 0
        elsewhere. A pixel matching no class is 0 in every class channel.
    """
    channels_last = labels.permute(0, 2, 3, 1).to(device)
    # Add a class axis so each pixel broadcasts against every row of the table
    channel_matches = channels_last.unsqueeze(3).eq(class_tensor)
    # A pixel belongs to a class only when all of its channels match
    membership = channel_matches.all(dim=-1).long()
    # Move the class axis in front of the spatial axes
    return membership.permute(0, 3, 1, 2)

def one_hot_to_rgb(one_hot_tensor, classes_dic=classes_dic):
    """Render a per-class score map as an RGB PIL image.

    Args:
        one_hot_tensor: tensor whose first axis indexes classes; the class with
            the highest value along that axis is chosen for every position.
        classes_dic: mapping whose values are the class colours, in class-index
            order.

    Returns:
        PIL image in 'RGB' mode where each pixel carries its class colour.
    """
    # Colour lookup table: row i is the colour of class i
    palette = np.array(list(classes_dic.values()))

    # Collapse the class axis to the winning class index per pixel
    winning_class = one_hot_tensor.cpu().numpy().argmax(axis=0)

    # Index the palette with the class map to paint every pixel
    pixels = palette[winning_class].astype(np.uint8)
    return Image.fromarray(pixels, mode='RGB')

In [None]:
#Garbage collect and empty cache
gc.collect()
torch.cuda.empty_cache()

weight_dataset = CreateDataset(train_path) # Use the whole training dataset with no augmented images
weight_set = weight_dataset.dataset
weightLoader = MyDataset2(weight_set, transform)

def weights_map(dataloader):
    '''
    Compute per-class loss weights from the pixel counts of every mask in
    `dataloader`.

    Args:
        dataloader: dataset (e.g. a MyDataset2 instance) yielding (image, mask)
            pairs; all of its masks are loaded in a single batch.

    Returns:
        Float tensor of length output_shape on `device`. With c_i the pixel
        count of class i, T the total pixel count and M the largest count:
            weight_i = 100 * M / T - 50 * c_i / T    if c_i > 0
            weight_i = 100 * M / T                   otherwise
        so rarer classes get linearly larger weights (this is not a strict
        inverse-frequency weighting).

    Raises:
        ValueError: if `dataloader` is empty.
    '''
    # Size the batch from the argument itself, not from a module-level dataset
    num_samples = len(dataloader)
    if num_samples == 0:
        raise ValueError("weights_map() needs at least one sample to count class pixels.")

    # Sample order does not affect the counts, so no shuffling is needed
    weight_dataloader = DataLoader(dataloader, batch_size=num_samples, shuffle=False)
    _, masks = next(iter(weight_dataloader))

    # Turn all masks into a one-hot tensor of shape [num_samples, num_classes, height, width]
    one_hot_masks = encode_segmap(masks)
    # Class index per pixel, shape [num_samples, height, width].
    # NOTE(review): a pixel whose colour matches no class is all-zero in the
    # one-hot tensor, so argmax assigns it to class 0 - confirm that is intended.
    index_tensor = torch.argmax(one_hot_masks, dim=1)

    # Count how many pixels belong to each class across the whole index tensor
    counts = torch.bincount(index_tensor.flatten(), minlength=output_shape).cpu().double()

    total = counts.sum()
    ceiling = 100 * counts.max() / total
    class_weights = torch.where(counts > 0, ceiling - 50 * (counts / total), ceiling)

    return class_weights.float().to(device)

class_weights = weights_map(weightLoader)

In [None]:
class Model():
    """Bundle a pretrained torchvision segmentation network with its training setup.

    On construction the selected network is loaded with its default pretrained
    weights, most backbone parameters are frozen, the classification head(s)
    are replaced so they emit `output_shape` channels, and an optimiser, a
    StepLR scheduler, a cross-entropy loss and two torchmetrics metrics are
    created.

    Args:
        model_name: 'DeepLabV3', 'FCN' or 'LRASPP'.
        optim: 'SGD', 'Adam' or 'RMSprop'.
        loss_type: 'Standard_CEL' (unweighted) or 'Weighted_CEL' (uses the
            module-level `class_weights` tensor).
        output_shape: number of output channels (classes). Defaults to 32.
        verbose: print the torchinfo summary of the model. Defaults to False.

    Attributes:
        model, weights, auto_transform, optimiser, lr_scheduler, loss,
        iou_acc, pix_acc, model_dic.

    Raises:
        ValueError: if model_name, optim or loss_type is not a supported value.
    """

    _MODEL_NAMES = ('DeepLabV3', 'FCN', 'LRASPP')
    _OPTIMISER_NAMES = ('SGD', 'Adam', 'RMSprop')
    _LOSS_NAMES = ('Standard_CEL', 'Weighted_CEL')

    # Backbone parameters whose name contains one of these substrings stay trainable
    _TRAINABLE_BACKBONE_TAGS = {
        'DeepLabV3': ("layer3", "layer4"),
        'FCN': ("layer3", "layer4"),
        'LRASPP': ("14", "15", "16"),
    }

    # Per-architecture (initial learning rate, StepLR step_size)
    _LR_SCHEDULE = {
        'DeepLabV3': (0.0001, 4),
        'FCN': (0.0001, 4),
        'LRASPP': (0.001, 3),
    }

    def __init__(self, model_name, optim, loss_type, output_shape=32, verbose=False):
        # Initialise class parameters
        self.model_name = model_name
        self.optimiser_type = optim
        self.loss_type = loss_type

        # Model entries are [pretrained weights enum, constructor]. The 'IoU' and
        # 'PixAcc' entries are metric objects kept for backward compatibility;
        # training code uses self.iou_acc / self.pix_acc instead.
        self.model_dic = {
        'DeepLabV3': [torchvision.models.segmentation.DeepLabV3_ResNet101_Weights.DEFAULT,
                        torchvision.models.segmentation.deeplabv3_resnet101],
    
        'FCN' :      [torchvision.models.segmentation.FCN_ResNet101_Weights.DEFAULT,
                        torchvision.models.segmentation.fcn_resnet101],
    
        'LRASPP' :   [torchvision.models.segmentation.LRASPP_MobileNet_V3_Large_Weights.DEFAULT, 
                        torchvision.models.segmentation.lraspp_mobilenet_v3_large],

        'IoU' :       torchmetrics.classification.MultilabelJaccardIndex(num_labels=output_shape).to(device),

        'PixAcc':     torchmetrics.classification.MultilabelAccuracy(num_labels=output_shape).to(device),
        }

        # Call the createModel function to build the specified model, and initialise accuracy metrics
        self.createModel(output_shape, verbose)
        self.iou_acc = torchmetrics.classification.MultilabelJaccardIndex(num_labels=output_shape).to(device)
        self.pix_acc = torchmetrics.classification.MultilabelAccuracy(num_labels=output_shape).to(device)

    def _validate_configuration(self):
        # Fail early with a clear message instead of a late KeyError/AttributeError
        checks = (
            ("model_name", self.model_name, self._MODEL_NAMES),
            ("optim", self.optimiser_type, self._OPTIMISER_NAMES),
            ("loss_type", self.loss_type, self._LOSS_NAMES),
        )
        for label, value, allowed in checks:
            if value not in allowed:
                raise ValueError(f"Unsupported {label} {value!r}; expected one of {allowed}.")

    def _freeze_backbone(self):
        # Freeze every backbone parameter except the late stages listed for this architecture
        keep_trainable = self._TRAINABLE_BACKBONE_TAGS[self.model_name]
        for name, param in self.model.named_parameters():
            if "backbone" in name and not any(tag in name for tag in keep_trainable):
                param.requires_grad = False

    def _replace_heads(self, output_shape):
        # Swap the final classifier layer(s) for 1x1 convolutions with output_shape channels
        if self.model_name == 'DeepLabV3':
            self.model.classifier[-1] = nn.Conv2d(256, output_shape, kernel_size=1, stride=1)
        elif self.model_name == 'FCN':
            self.model.classifier[-1] = nn.Conv2d(512, output_shape, kernel_size=1, stride=1)
        else:  # 'LRASPP'
            self.model.classifier.high_classifier = nn.Conv2d(128, output_shape, kernel_size=1, stride=1)
            self.model.classifier.low_classifier = nn.Conv2d(40, output_shape, kernel_size=1, stride=1)

        # Only models that actually carry an auxiliary classifier get it replaced;
        # checking explicitly avoids hiding unrelated errors behind a bare except.
        aux_head = getattr(self.model, 'aux_classifier', None)
        if aux_head is not None:
            aux_head[-1] = nn.Conv2d(256, output_shape, kernel_size=1, stride=1)
            aux_head.add_module('softmax', nn.Softmax(dim=1))

    def _build_optimiser(self, params, lr):
        # Create the optimiser selected by self.optimiser_type at the given learning rate
        if self.optimiser_type == 'SGD':
            return torch.optim.SGD(params, lr=lr, momentum=0.9, weight_decay=0.0005)
        if self.optimiser_type == 'Adam':
            return torch.optim.Adam(params, lr=lr, betas=(0.9, 0.999), eps=1e-08, weight_decay=0.001, amsgrad=False)
        return torch.optim.RMSprop(params, lr=lr, alpha=0.99, eps=1e-08, weight_decay=0.001, momentum=0.9)

    def createModel(self, output_shape, verbose):
        """Pretrained semantic segmentation model with custom head
        Args:
            output_shape (int): The number of output channels
            in your dataset masks.

            verbose (bool): Print out the model architecture.

        Returns:
            model: Returns the desired model with either the ResNet101 (for DeepLabV3 and FCN), 
            or MobileNet (for LRASPP) backbone.

        Raises:
            ValueError: if the model, optimiser or loss name is not supported.
        """
        self._validate_configuration()

        self.weights = self.model_dic[self.model_name][0]
        self.model = self.model_dic[self.model_name][1](weights=self.weights)
        self.auto_transform = self.weights.transforms()

        self._freeze_backbone()
        self._replace_heads(output_shape)

        # Create optimiser and learning rate scheduler over the trainable parameters only
        params = [p for p in self.model.parameters() if p.requires_grad]
        lr, step_size = self._LR_SCHEDULE[self.model_name]
        self.optimiser = self._build_optimiser(params, lr)
        self.lr_scheduler = torch.optim.lr_scheduler.StepLR(self.optimiser, step_size=step_size, gamma=0.01)

        # Initialise either the weighted Cross Entropy Loss or unweighted
        if self.loss_type == 'Standard_CEL':
            self.loss = nn.CrossEntropyLoss()
        else:  # 'Weighted_CEL'
            self.loss = nn.CrossEntropyLoss(weight=class_weights)

        # Optionally print out the new model architecture
        if verbose:
            print(summary(model=self.model, 
                input_size=(10, 3, 520, 520),
                col_names=["input_size", "output_size", "num_params", "trainable"],
                col_width=20,
                row_settings=["var_names"]
            )) 

        return self.model

# Build one instance of each supported architecture with the Adam optimiser and
# the unweighted cross-entropy loss; verbose=True prints each model's summary.
m1 = Model('DeepLabV3', 'Adam', 'Standard_CEL', verbose=True)
m2 = Model('FCN', 'Adam', 'Standard_CEL', verbose=True)
m3 = Model('LRASPP', 'Adam', 'Standard_CEL', verbose=True)


Downloading: "https://download.pytorch.org/models/deeplabv3_resnet101_coco-586e9e4e.pth" to /root/.cache/torch/hub/checkpoints/deeplabv3_resnet101_coco-586e9e4e.pth
100%|██████████| 233M/233M [00:01<00:00, 207MB/s]
  action_fn=lambda data: sys.getsizeof(data.storage()),
  return super().__sizeof__() + self.nbytes()


Layer (type (var_name))                            Input Shape          Output Shape         Param #              Trainable
DeepLabV3 (DeepLabV3)                              [10, 3, 520, 520]    [10, 32, 520, 520]   --                   Partial
├─IntermediateLayerGetter (backbone)               [10, 3, 520, 520]    [10, 2048, 65, 65]   --                   Partial
│    └─Conv2d (conv1)                              [10, 3, 520, 520]    [10, 64, 260, 260]   (9,408)              False
│    └─BatchNorm2d (bn1)                           [10, 64, 260, 260]   [10, 64, 260, 260]   (128)                False
│    └─ReLU (relu)                                 [10, 64, 260, 260]   [10, 64, 260, 260]   --                   --
│    └─MaxPool2d (maxpool)                         [10, 64, 260, 260]   [10, 64, 130, 130]   --                   --
│    └─Sequential (layer1)                         [10, 64, 130, 130]   [10, 256, 130, 130]  --                   False
│    │    └─Bottleneck (0)            

Downloading: "https://download.pytorch.org/models/fcn_resnet101_coco-7ecb50ca.pth" to /root/.cache/torch/hub/checkpoints/fcn_resnet101_coco-7ecb50ca.pth
100%|██████████| 208M/208M [00:00<00:00, 227MB/s]


Layer (type (var_name))                       Input Shape          Output Shape         Param #              Trainable
FCN (FCN)                                     [10, 3, 520, 520]    [10, 32, 520, 520]   --                   Partial
├─IntermediateLayerGetter (backbone)          [10, 3, 520, 520]    [10, 2048, 65, 65]   --                   Partial
│    └─Conv2d (conv1)                         [10, 3, 520, 520]    [10, 64, 260, 260]   (9,408)              False
│    └─BatchNorm2d (bn1)                      [10, 64, 260, 260]   [10, 64, 260, 260]   (128)                False
│    └─ReLU (relu)                            [10, 64, 260, 260]   [10, 64, 260, 260]   --                   --
│    └─MaxPool2d (maxpool)                    [10, 64, 260, 260]   [10, 64, 130, 130]   --                   --
│    └─Sequential (layer1)                    [10, 64, 130, 130]   [10, 256, 130, 130]  --                   False
│    │    └─Bottleneck (0)                    [10, 64, 130, 130]   [10, 256, 1

Downloading: "https://download.pytorch.org/models/lraspp_mobilenet_v3_large-d234d4ea.pth" to /root/.cache/torch/hub/checkpoints/lraspp_mobilenet_v3_large-d234d4ea.pth
100%|██████████| 12.5M/12.5M [00:00<00:00, 117MB/s]


Layer (type (var_name))                                      Input Shape          Output Shape         Param #              Trainable
LRASPP (LRASPP)                                              [10, 3, 520, 520]    [10, 32, 520, 520]   --                   Partial
├─IntermediateLayerGetter (backbone)                         [10, 3, 520, 520]    [10, 960, 33, 33]    --                   Partial
│    └─Conv2dNormActivation (0)                              [10, 3, 520, 520]    [10, 16, 260, 260]   --                   False
│    │    └─Conv2d (0)                                       [10, 3, 520, 520]    [10, 16, 260, 260]   (432)                False
│    │    └─BatchNorm2d (1)                                  [10, 16, 260, 260]   [10, 16, 260, 260]   (32)                 False
│    │    └─Hardswish (2)                                    [10, 16, 260, 260]   [10, 16, 260, 260]   --                   --
│    └─InvertedResidual (1)                                  [10, 16, 260, 260]   [10

In [None]:
#Garbage collect and empty cache
gc.collect()
torch.cuda.empty_cache()

class TrainTest():
    """Run k-fold training, final testing and plotting for a `Model` instance.

    Args:
        model_class: object exposing `model`, `lr_scheduler`, `optimiser`,
            `loss`, `iou_acc` and `pix_acc` attributes (see `Model`).
        trainLoader: training dataset (indexable, yields (image, mask) pairs);
            DataLoaders are built from it for each fold.
        testLoader: test dataset of the same kind.
        n_epochs: epochs trained per fold.
        batch_size: batch size for all DataLoaders.
        k: number of cross-validation folds.

    NOTE(review): the model, optimiser and LR scheduler are created once and
    shared by all folds, so from the second fold on the validation subset
    contains samples the model has already been trained on, and the scheduler
    keeps decaying across folds. Re-initialise per fold if independent fold
    estimates are required.
    """

    def __init__(self, model_class, trainLoader, testLoader, n_epochs=3, batch_size=3, k=10):

        # Initialise model specific attributes
        self.model_class = model_class
        self.model = self.model_class.model.to(device)
        self.lr_scheduler = self.model_class.lr_scheduler
        self.optimiser = self.model_class.optimiser
        self.loss = self.model_class.loss
        self.iou_accuracy = self.model_class.iou_acc
        self.pixel_accuracy = self.model_class.pix_acc

        # Initialise remaining attributes
        self.trainLoader = trainLoader
        self.testLoader = testLoader
        self.n_epochs = n_epochs
        self.batch_size = batch_size
        self.k = k

        # A training batch holding a single sample breaks the model, so keep
        # asking for a different batch size until no fold produces one.
        while self._leaves_single_sample_batch():
            proceed = input(f"Invalid batch size. Proceed with batch_size = {self.batch_size + 1}?  [Y/N]\n")
            if proceed.lower() == 'y':
                self.batch_size += 1
                continue
            try:
                # input() returns a string; it must be converted before use
                requested = int(input("Enter batch size: "))
            except ValueError:
                continue
            if requested > 0:
                self.batch_size = requested

    def _train_fold_sizes(self):
        # KFold gives the first (n % k) validation folds n // k + 1 samples and
        # the rest n // k, so the training part has one of at most two sizes.
        n_samples = len(self.trainLoader)
        base, extra = divmod(n_samples, self.k)
        sizes = {n_samples - base}
        if extra:
            sizes.add(n_samples - base - 1)
        return sizes

    def _leaves_single_sample_batch(self):
        # True when any training fold would end with a batch of exactly one sample
        return any(size % self.batch_size == 1 for size in self._train_fold_sizes())

    def train_epoch(self, dataloader):
        """Train for one epoch; return summed (IoU, pixel accuracy, loss), each weighted by batch size."""
        self.model.train()
        iou_acc = 0.0
        pix_acc = 0.0
        train_loss = 0.0

        scaler = GradScaler()

        for images, labels in dataloader:
            images, labels = images.to(device).float(), labels.to(device)

            # Calculate output predictions
            self.optimiser.zero_grad()
            with autocast(dtype=torch.float16): # Use automatic mixed precision training to optimise memory usage
                outputs = self.model(images)['out']

                # Convert labels to one-hot encoding with one channel per class
                labels = encode_segmap(labels)

                # Calculate loss
                loss = self.loss(outputs, labels.float())

            # Perform scaled backward pass and update loss and accuracy
            scaler.scale(loss).backward()
            scaler.step(self.optimiser)
            scaler.update()
            train_loss += loss.item() * images.size(0)
            iou_acc += self.iou_accuracy(outputs, labels).item() * images.size(0)
            pix_acc += self.pixel_accuracy(outputs, labels).item() * images.size(0)

        # Update learning rate scheduler once per epoch
        self.lr_scheduler.step()

        return iou_acc, pix_acc, train_loss

    def validation_epoch(self, dataloader):
        """Evaluate without gradients; return summed (IoU, pixel accuracy, loss), each weighted by batch size."""
        self.model.eval()
        validation_iou_acc = 0.0
        validation_pix_acc = 0.0
        validation_loss = 0.0

        with torch.no_grad():
            for images, labels in dataloader:
                images, labels = images.to(device).float(), labels.to(device)

                # Calculate output predictions
                outputs = self.model(images)['out']

                # Convert labels to one-hot encoding with one channel per class
                labels = encode_segmap(labels)

                # Calculate accuracy and loss
                loss = self.loss(outputs, labels.float())
                validation_loss += loss.item() * images.size(0)
                validation_iou_acc += self.iou_accuracy(outputs, labels).item() * images.size(0)
                validation_pix_acc += self.pixel_accuracy(outputs, labels).item() * images.size(0)

        return validation_iou_acc, validation_pix_acc, validation_loss

    def k_fold_train(self, shuffle=True, verbose=True):
        """Train with k-fold cross-validation and average the per-epoch metrics over folds.

        Args:
            shuffle: passed to KFold; shuffle sample indices before splitting.
            verbose: print metrics after every epoch.

        Returns:
            Tuple of six arrays of length n_epochs, averaged over folds:
            (train IoU %, train pixel acc %, validation IoU %,
             validation pixel acc %, train loss, validation loss).
            The same arrays are stored on self as cv_average_* attributes.
        """
        kfold = KFold(n_splits=self.k, shuffle=shuffle)
        metric_names = ("train_iou", "train_pix", "val_iou", "val_pix", "train_loss", "val_loss")
        # One list per metric; each entry is the per-epoch history of one fold
        fold_histories = {name: [] for name in metric_names}
        start_time = time.time()

        for fold_idx, (train_idx, val_idx) in enumerate(kfold.split(self.trainLoader.dataset)):
            print("Current Fold:", fold_idx + 1)

            # Create data loaders for the training and validation subsets
            train_loader = DataLoader(self.trainLoader, batch_size=self.batch_size, sampler=SubsetRandomSampler(train_idx))
            val_loader = DataLoader(self.trainLoader, batch_size=self.batch_size, sampler=SubsetRandomSampler(val_idx))
            n_train, n_val = len(train_loader.sampler), len(val_loader.sampler)

            # Keep track of accuracy and loss at each epoch of this fold
            epoch_history = {name: [] for name in metric_names}

            for epoch in range(self.n_epochs):
                train_iou_sum, train_pix_sum, train_loss_sum = self.train_epoch(train_loader)
                val_iou_sum, val_pix_sum, val_loss_sum = self.validation_epoch(val_loader)

                # Turn the batch-weighted sums into per-sample averages (accuracies in %)
                train_iou_acc = train_iou_sum / n_train * 100
                train_pix_acc = train_pix_sum / n_train * 100
                validation_iou_acc = val_iou_sum / n_val * 100
                validation_pix_acc = val_pix_sum / n_val * 100
                train_loss = train_loss_sum / n_train
                validation_loss = val_loss_sum / n_val

                if verbose:
                    print("Epoch:{}/{} Training IoU Acc: {:.2f}%, Validation IoU Acc: {:.2f}%, Training Pixel Acc: {:.2f}%, Validation Pixel Acc: {:.2f}%, Training Loss: {:.3f}, Validation Loss: {:.3f}".format(
                              epoch + 1, self.n_epochs, train_iou_acc, validation_iou_acc, train_pix_acc, validation_pix_acc, train_loss, validation_loss))

                epoch_values = (train_iou_acc, train_pix_acc, validation_iou_acc,
                                validation_pix_acc, train_loss, validation_loss)
                for name, value in zip(metric_names, epoch_values):
                    epoch_history[name].append(value)

            for name in metric_names:
                fold_histories[name].append(epoch_history[name])

        # Calculate and print the training time
        train_time = time.time() - start_time
        print(f'The time taken to train the network was {int(train_time // 60)} mins {train_time % 60 :.0f} seconds')

        # Find the average accuracy and loss for each epoch (averaging across k folds)
        averaged = {name: np.mean(fold_histories[name], axis=0) for name in metric_names}

        self.cv_average_train_iou_accuracy = averaged["train_iou"]
        self.cv_average_train_pixel_accuracy = averaged["train_pix"]
        self.cv_average_validation_iou_accuracy = averaged["val_iou"]
        self.cv_average_validation_pixel_accuracy = averaged["val_pix"]
        self.cv_average_train_loss = averaged["train_loss"]
        self.cv_average_validation_loss = averaged["val_loss"]

        return (self.cv_average_train_iou_accuracy, self.cv_average_train_pixel_accuracy,
                self.cv_average_validation_iou_accuracy, self.cv_average_validation_pixel_accuracy,
                self.cv_average_train_loss, self.cv_average_validation_loss)

    def test(self):
        """Evaluate on the test set, print the metrics and show one prediction.

        Raises:
            ValueError: if the test dataset is empty.
        """
        if len(self.testLoader) == 0:
            raise ValueError("Cannot evaluate: the test dataset is empty.")

        self.test_loader = DataLoader(self.testLoader, batch_size=self.batch_size, shuffle=True)

        test_iou_acc, test_pix_acc = 0.0, 0.0
        test_loss = 0.0
        self.model.eval()

        with torch.no_grad():
            for images, labels in self.test_loader:
                # Remember the untouched batch on the CPU; after the loop these
                # hold the last batch regardless of how the sizes divide.
                images_copy, labels_copy = images.cpu(), labels.cpu()
                images, labels = images.to(device).float(), labels.to(device)

                # Calculate output predictions
                outputs = self.model(images)['out']

                # Convert labels to one-hot encoding with one channel per class
                labels = encode_segmap(labels)

                # Calculate accuracy and loss
                test_iou_acc += self.iou_accuracy(outputs, labels).item() * images.size(0)
                test_pix_acc += self.pixel_accuracy(outputs, labels).item() * images.size(0)
                loss = self.loss(outputs, labels.float())
                test_loss += loss.item() * images.size(0)

        # Calculate and print the final testing accuracy and loss
        test_iou_acc = test_iou_acc / len(self.testLoader) * 100
        test_pix_acc = test_pix_acc / len(self.testLoader) * 100
        test_loss = test_loss / len(self.testLoader)

        print(f"Final Test IoU Accuracy: {test_iou_acc:.2f}%, Final Test Pixel Accuracy: {test_pix_acc:.2f}%, Final Test Loss: {test_loss:.3f}")

        # Select random image from last batch and display predicted mask alongside original mask and image.
        # The last batch's outputs are reused rather than running the model again.
        idx = random.randint(0, images_copy.shape[0])
        original_image = images_copy[idx].permute(1, 2, 0).numpy()
        true_mask = labels_copy[idx].permute(1, 2, 0).numpy()
        predicted_mask = one_hot_to_rgb(outputs[idx])

        plt.figure(figsize=(6,6))

        ax = plt.subplot2grid((2,4),(0,0), colspan=2)
        ax.imshow(original_image)
        plt.title("Original Image")
        plt.axis("off")

        ax1 = plt.subplot2grid((2,4),(0,2), colspan=2)
        ax1.imshow(true_mask)
        plt.title("True Mask")
        plt.axis("off")

        # Predicted mask, already converted from class scores to an RGB image
        ax2 = plt.subplot2grid((2,4),(1,1), colspan=2)
        ax2.imshow(predicted_mask)
        plt.title("Predicted Mask")
        plt.axis("off")

        plt.subplots_adjust(left=0.1, bottom=0.1, right=0.9, top=0.9, wspace=0.2, hspace=0.2)
        plt.show()

    def plot(self):
        """Plot the cross-validated accuracy and loss curves stored by k_fold_train().

        Raises:
            AttributeError: if k_fold_train() has not been run yet.
        """
        if not hasattr(self, "cv_average_train_loss"):
            raise AttributeError("No training history found; call k_fold_train() before plot().")

        # Plot graph showing the IoU and Pixel accuracy for the training and validation sets across n epochs
        x = [str(i+1) for i in range(self.n_epochs)]
        plt.figure()
        plt.title(f"Cross-Validated Average Training Accuracy over {self.n_epochs} epochs")
        plt.plot(x, self.cv_average_train_iou_accuracy, label='Training IoU Accuracy')
        plt.plot(x, self.cv_average_train_pixel_accuracy, label='Training Pixel Accuracy')
        plt.plot(x, self.cv_average_validation_iou_accuracy, label='Validation IoU Accuracy')
        plt.plot(x, self.cv_average_validation_pixel_accuracy, label='Validation Pixel Accuracy')
        plt.xlabel("Epoch")
        plt.ylabel("Accuracy (%)")
        plt.legend()
        plt.show()

        # Plot graph showing the training and validation loss over n epochs
        plt.figure()
        plt.title(f"Cross-Validated Average Training Loss over {self.n_epochs} epochs")
        plt.plot(x, self.cv_average_train_loss, color='red', label='Training Loss')
        plt.plot(x, self.cv_average_validation_loss, color='blue', label='Validation Loss')
        plt.xlabel("Epoch")
        plt.ylabel("Loss")
        plt.legend()
        plt.show()

# run = TrainTest(deepLabV3Model, trainLoader, testLoader, n_epochs=2, batch_size=8, k=3)
# run.k_fold_train()
# run.test()
# run.plot()


### Hyperparameter tuning



In [None]:
#@title
# DeepLab-V3:

# deepLabV3Model_SGD_standard_loss = Model('DeepLabV3', 'SGD', 'Standard_CEL')

# run = TrainTest(deepLabV3Model_SGD_standard_loss, trainLoader, testLoader, n_epochs=10, batch_size=8, k=3)
# run.k_fold_train()

# result: 

# Epoch:10/10 Training Acc: 6.57%, Validation Acc: 6.61%, Training Loss: 3.330, Validation Loss: 3.344
# The time taken to train the network was 6 mins 2 seconds


In [None]:
#@title
# deepLabV3Model_Adam_standard_loss = Model('DeepLabV3', 'Adam', 'Standard_CEL')

# run = TrainTest(deepLabV3Model_Adam_standard_loss, trainLoader, testLoader, n_epochs=10, batch_size=8, k=3)
# run.k_fold_train()

# result: Epoch:10/10 Training Acc: 87.46%, Validation Acc: 87.90%, Training Loss: 1.255, Validation Loss: 1.230
# The time taken to train the network was 6 mins 5 seconds

In [None]:
#@title
# deepLabV3Model_RMSprop_standard_loss = Model('DeepLabV3', 'RMSprop', 'Standard_CEL')

# run = TrainTest(deepLabV3Model_RMSprop_standard_loss, trainLoader, testLoader, n_epochs=10, batch_size=8, k=3)
# run.k_fold_train()

# result: Epoch:10/10 Training Acc: 36.68%, Validation Acc: 37.01%, Training Loss: 0.400, Validation Loss: 0.355
# The time taken to train the network was 6 mins 5 seconds

In [None]:
#@title
# FCN

# FCNModel_SGD_Standard_loss = Model('FCN', 'SGD', 'Standard_CEL')
# run = TrainTest(FCNModel_SGD_Standard_loss, trainLoader, testLoader, n_epochs=10, batch_size=8, k=3)
# run.k_fold_train()

# Epoch:10/10 Training Acc: 33.52%, Validation Acc: 35.32%, Training Loss: 3.163, Validation Loss: 3.156
# The time taken to train the network was 5 mins 2 seconds

In [None]:
#@title
# FCN

# FCNModel_Adam_Standard_loss = Model('FCN', 'Adam', 'Standard_CEL')
# run = TrainTest(FCNModel_Adam_Standard_loss, trainLoader, testLoader, n_epochs=10, batch_size=8, k=3)
# run.k_fold_train()
# gc.collect()
# torch.cuda.empty_cache()

# Epoch:10/10 Training Acc: 87.31%, Validation Acc: 87.10%, Training Loss: 0.932, Validation Loss: 0.932
# The time taken to train the network was 5 mins 28 seconds

In [None]:
#@title
# FCN

# FCNModel_RMSprop_Standard_loss = Model('FCN', 'RMSprop', 'Standard_CEL')
# run = TrainTest(FCNModel_RMSprop_Standard_loss, trainLoader, testLoader, n_epochs=10, batch_size=8, k=3)
# run.k_fold_train()

# Epoch:10/10 Training Acc: 28.25%, Validation Acc: 28.07%, Training Loss: 0.396, Validation Loss: 0.351
# The time taken to train the network was 5 mins 4 seconds

In [None]:
#@title
# LRASPP

# LRASPP_SGD_Standard_loss = Model('LRASPP', 'SGD', 'Standard_CEL')
# run = TrainTest(LRASPP_SGD_Standard_loss, trainLoader, testLoader, n_epochs=10, batch_size=8, k=3)
# run.k_fold_train()


# Epoch:10/10 Training Acc: 26.12%, Validation Acc: 25.46%, Training Loss: 2.037, Validation Loss: 2.070
# The time taken to train the network was 1 mins 41 seconds

In [None]:
#@title
# LRASPP

# LRASPP_Adam_Standard_loss = Model('LRASPP', 'Adam', 'Standard_CEL')
# run = TrainTest(LRASPP_Adam_Standard_loss, trainLoader, testLoader, n_epochs=10, batch_size=8, k=3)
# run.k_fold_train()

# Epoch:10/10 Training Acc: 52.67%, Validation Acc: 52.45%, Training Loss: 0.367, Validation Loss: 0.365
# The time taken to train the network was 1 mins 42 seconds

In [None]:
#@title
# LRASPP

# LRASPP_RMSprop_Standard_loss = Model('LRASPP', 'RMSprop', 'Standard_CEL')
# run = TrainTest(LRASPP_RMSprop_Standard_loss, trainLoader, testLoader, n_epochs=10, batch_size=8, k=3)
# run.k_fold_train()

# Epoch:10/10 Training Acc: 23.82%, Validation Acc: 23.71%, Training Loss: 0.288, Validation Loss: 0.304
# The time taken to train the network was 1 mins 41 seconds

### Best model testing 

In [None]:
# Final DeepLabV3 experiment. Adam is the optimizer with the highest recorded
# accuracy among the DeepLabV3 trials noted in the tuning cells above
# (87.46% training / 87.90% validation after 10 epochs).
Best_deepLabV3Model = Model('DeepLabV3', 'Adam', 'Standard_CEL')
# NOTE(review): trainLoader/testLoader are built elsewhere; confirm how TrainTest
# uses batch_size=12 alongside loaders that already exist.
# k is presumably the number of cross-validation folds — confirm against TrainTest.
run = TrainTest(Best_deepLabV3Model, trainLoader, testLoader, n_epochs=5, batch_size=12, k=10)
run.k_fold_train()
run.test()
# Shows the cross-validated accuracy figure followed by the loss figure.
run.plot()

In [None]:
# Final FCN experiment. Adam is the optimizer with the highest recorded accuracy
# among the FCN trials noted in the tuning cells above
# (87.31% training / 87.10% validation after 10 epochs).
Best_FCNModel = Model('FCN', 'Adam', 'Standard_CEL')
# Rebinds the notebook-level name `run`; the object from any earlier cell is replaced.
# k is presumably the number of cross-validation folds — confirm against TrainTest.
run = TrainTest(Best_FCNModel, trainLoader, testLoader, n_epochs=5, batch_size=12, k=10)
run.k_fold_train()
run.test()
# Shows the cross-validated accuracy figure followed by the loss figure.
run.plot()

In [None]:
# Final LRASPP experiment. Adam is the optimizer with the highest recorded
# accuracy among the LRASPP trials noted in the tuning cells above
# (52.67% training / 52.45% validation after 10 epochs).
Best_LRASPPModel = Model('LRASPP', 'Adam', 'Standard_CEL')
# NOTE(review): this run uses n_epochs=10 whereas the DeepLabV3 and FCN runs use
# n_epochs=5 — confirm the difference is intentional before comparing results.
# Rebinds the notebook-level name `run`; the object from any earlier cell is replaced.
run = TrainTest(Best_LRASPPModel, trainLoader, testLoader, n_epochs=10, batch_size=12, k=10)
run.k_fold_train()
run.test()
# Shows the cross-validated accuracy figure followed by the loss figure.
run.plot()

### Weighted Loss Function Model

In [None]:
# LRASPP experiment with the 'Weighted_CEL' loss option instead of 'Standard_CEL'
# (presumably a class-weighted cross-entropy loss — confirm in Model).
# NOTE(review): RMSprop is used here, but the tuning cells above record Adam
# reaching higher accuracy than RMSprop for LRASPP with the standard loss
# (52.67% vs 23.82% training accuracy) — confirm RMSprop is intended.
LRASPPModel = Model('LRASPP', 'RMSprop', 'Weighted_CEL')
# Uses smaller settings than the standard-loss runs (batch_size=5, k=5, n_epochs=5).
# Rebinds the notebook-level name `run`; the object from any earlier cell is replaced.
run = TrainTest(LRASPPModel, trainLoader, testLoader, n_epochs=5, batch_size=5, k=5)
run.k_fold_train()
run.test()
# Shows the cross-validated accuracy figure followed by the loss figure.
run.plot()