# Path

In [0]:
# mount Google Drive every time you restart instance
from google.colab import drive
drive.mount('/content/gdrive')

In [0]:
# set working path every time after mounting drive or set alternative path
root_path = 'gdrive/My Drive/Dataset/'
cityscapes_path = root_path

# Imports

In [0]:
from __future__ import print_function, division
from distutils.version import LooseVersion

import numpy as np
import torch
import torchvision
from torch.utils import data
from torchvision import transforms, utils
import cv2
import os, sys, time
from torch.utils.data import Dataset, DataLoader
from glob import glob
from pathlib import Path
from skimage import io, transform
from tqdm import tqdm
from tqdm import tqdm_notebook as tqdm
import torch.nn as nn
import torch.optim as optim
from torchvision import models
from torchvision.models.vgg import VGG
import argparse
from PIL import Image

from albumentations import (RandomContrast, RandomBrightness, Compose, RandomCrop, HorizontalFlip, Normalize)

# import files for labels and models
%run 'gdrive/My Drive/Colab Notebooks/labels.py'
%run 'gdrive/My Drive/Colab Notebooks/models.py'

# Dataset Preparation

In [0]:
# Set username and password for Datascapes account. Download images
# This code block runs only once to create dataset. Username and password have been removed. 

!wget --keep-session-cookies --save-cookies=cookies.txt --post-data 'username=XXXXXX&password=XXXXX&submit=Login' https://www.cityscapes-dataset.com/login/
!wget --load-cookies cookies.txt --content-disposition https://www.cityscapes-dataset.com/file-handling/?packageID=3
!wget --load-cookies cookies.txt --content-disposition https://www.cityscapes-dataset.com/file-handling/?packageID=1  

In [0]:
# Create folders and unzip dataset

!mkdir dataset && mkdir dataset/gtFine_trainvaltest && mkdir dataset/leftImg8bit_trainvaltest
!unzip gtFine_trainvaltest.zip -d dataset/gtFine_trainvaltest
!unzip leftImg8bit_trainvaltest.zip -d dataset/leftImg8bit_trainvaltest

In [0]:
# Resize images to half the size

images = list(Path("/content/dataset/leftImg8bit_trainvaltest/").glob('**/*.png'))

for image in images:
   oriimg = cv2.imread(str(image), cv2.IMREAD_COLOR)
   newimg = cv2.resize(oriimg, None, fx=0.5,fy=0.5, interpolation = cv2.INTER_AREA)
   cv2.imwrite(str(image), newimg)

In [0]:
# Resize annotations to half the size

# Annotations = list(Path("/content/dataset/gtFine_trainvaltest/").glob('**/*.png'))

for annotation in annotations:
   orianno = cv2.imread(str(annotation), cv2.IMREAD_GRAYSCALE)
   newanno = cv2.resize(orianno, None, fx=0.5,fy=0.5, interpolation = cv2.INTER_NEAREST)
   cv2.imwrite(str(annotation), newanno)

In [0]:
# copy resized images to Google Drive folder

!cp -r dataset/ gdrive/My\ Drive/Dataset

# Image Transformations

In [0]:
# image transformations

# crop should be changed to 1024 if original sized imaged are to be used 

trans_crop = RandomCrop(p=1, height=512, width=512)
trans_flip = HorizontalFlip(p=0.5)
trans_contrast = RandomContrast(p=0.1)
trans_brightness = RandomBrightness(p=0.1)
trans_norm = Normalize(p=1)

# augmentations for training
aug = Compose([trans_crop, trans_flip, trans_contrast, trans_norm])

# augmentations for testing - only normalisation
aug_test = Compose([trans_norm])

# Some Functions

In [0]:
# print coloured image prediction 
def labels2rgb(labels, lut):
    im = cv2.LUT(cv2.merge((labels, labels, labels)), lut)
    im = cv2.cvtColor(im, cv2.COLOR_RGB2BGR)
    return im

In [0]:
# one hot encoding
def one_hot(anot, class_num):
    eye_tb = np.eye(class_num)[np.array(anot).reshape(-1)]
    return eye_tb.reshape(list(anot.shape)+[class_num])

# Cityscape Class

In [0]:
class CityscapeDateset(Dataset):
    """Cityscape dataset."""

    def __init__(self, root_dir, mode = "train",transform=None):
        """
        Args:
            root_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
        
        self.images_path = os.path.join(root_dir ,"leftImg8bit_trainvaltest")
        self.annotations_path = os.path.join(root_dir , "gtFine_trainvaltest")

        self.imgs_train = list(Path(self.images_path+'/leftImg8bit/train').glob('**/*.png'))
        self.imgs_test = list(Path(self.images_path+'/leftImg8bit/test').glob('**/*.png'))
        self.imgs_val = list(Path(self.images_path+'/leftImg8bit/val').glob('**/*.png'))
        self.imgs_train = sorted(self.imgs_train)
        self.imgs_test = sorted(self.imgs_test)
        self.imgs_val = sorted(self.imgs_val)
        
        self.annotations_train = list(Path(self.annotations_path+'/gtFine/train').glob('**/*gtFine_labelIds.png'))
        self.annotations_test = list(Path(self.annotations_path+'/gtFine/test').glob('**/*gtFine_labelIds.png'))
        self.annotations_val = list(Path(self.annotations_path+'/gtFine/val').glob('**/*gtFine_labelIds.png'))
        self.annotations_train = sorted(self.annotations_train)
        self.annotations_test = sorted(self.annotations_test)
        self.annotations_val = sorted(self.annotations_val)
        
        self.transform = transform
        self.mode = mode

    def __len__(self):
        if self.mode == "train":
            return len(self.imgs_train)
        elif self.mode == "test":
            return len(self.imgs_test)
        elif self.mode == "val":
            return len(self.imgs_val)

    def __getitem__(self, idx):
        if self.mode == "train":
            img_name = self.imgs_train[idx]
            annotation_name = self.annotations_train[idx]
  
        elif self.mode == "test":
            img_name = self.imgs_test[idx]
            annotation_name = self.annotations_test[idx]

        elif self.mode == "val":
            img_name = self.imgs_val[idx]
            annotation_name = self.annotations_val[idx]

        
        image = io.imread(img_name)
        annotations = io.imread(annotation_name)

        if self.mode == "train":
            augmented = aug(image=image, mask=annotations)
          
            image = torch.from_numpy(augmented['image']).float().permute(2, 0, 1)

            annotations = augmented['mask']
            w,h = annotations.shape
            annotations = annotations.reshape(w*h,)
            annotations = one_hot(annotations, 34)
            annotations = annotations.reshape(w,h,34)

            sample = {'image':image , 'annotation': annotations} 
        else:
            augmented = aug_test(image=image, mask=annotations)
            image = torch.from_numpy(augmented['image']).float().permute(2, 0, 1)
            sample = {'image':image , 'annotation': annotations, "name": img_name}

        return sample

In [0]:
# Defining training and test sets
# validation set used as test set 

train_db = CityscapeDateset(cityscapes_path)
test_db = CityscapeDateset(cityscapes_path, mode = "val")

# Model/Mode Selection

In [0]:
# model_selection function asks the user to select the model to be trained 

# 1. FCN32 with ResNet50 backbone
# 2. FCN32 with ResNet18 backbone
# 3. FCN16 with VGG backbone
# 4. FCN8 with VGG backbone
# 5. SegNet

while True:
  selected_model = int(input("""Please select model:

  1. FCN32 with ResNet50 backbone
  2. FCN32 with ResNet18 backbone
  3. FCN16 with VGG backbone
  4. FCN8 with VGG backbone
  5. SegNet \n
  """))

  if selected_model == 1:
      backbone = models.resnet50(pretrained=True)
      backbone = torch.nn.Sequential(*(list(backbone.children())[:-2]))
      model = FCN32s(backbone, 34, n_feats_backbone=2048).float().cuda()
      model_name = "fcn32_resnet50"
      print("\nFCN32 with ResNet50 backbone selected.")
      break
  elif selected_model == 2:
      backbone = models.resnet18(pretrained=True)
      backbone = torch.nn.Sequential(*(list(backbone.children())[:-2]))
      model = FCN32s(backbone, 34, n_feats_backbone=512).float().cuda()
      model_name = "fcn32_resnet18"
      print("\nFCN32 with ResNet18 backbone selected.")
      break
  elif selected_model == 3:
      vgg_model = VGGNet()
      model = FCN16s_VGG(pretrained_net=vgg_model, n_class=34).float().cuda()
      model_name = "fcn16_vgg"
      print("\nFCN16 with VGG backbone selected.")
      break
  elif selected_model == 4:
      vgg_model = VGGNet()
      model = FCN8s_VGG(pretrained_net=vgg_model, n_class=34).float().cuda()
      model_name = "fcn8_vgg"
      print("\nFCN8 with VGG backbone selected.")
      break
  elif selected_model == 5:
      model = segnet(3,34).float().cuda()
      model_name = "segnet"
      print("\nSegNet network selected.\n")
      break
  else:
      print("Please select a valid model number\n")

# mode selection

while True:
  mode_sel = int(input("""Please select mode:

  1. Training
  2. Testing/Evaluating\n
  """))
  if mode_sel == 1:
      print("\nProceed to training...")
      break

  elif mode_sel == 2:
    model_path = os.path.join(cityscapes_path, model_name + ".pt")

    if os.path.exists(model_path) == True:
      checkpoint = torch.load(model_path)
      model.load_state_dict(checkpoint['model_state_dict'])
      break
    else:
      print("Saved model \"",model_name,"\" not found. Please train model first.", sep='')
  else:
    print("Please select a valid mode number\n")


# Training

In [0]:
model_path = os.path.join(cityscapes_path, model_name + ".pt")

# define loss function and optimiser
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters())

# model trained for 20 epochs. modify as appropriate
max_epoch = 19

dataloader = DataLoader(train_db, batch_size=4, shuffle=True, num_workers=4)

if os.path.exists(model_path) == True:
  checkpoint = torch.load(model_path)
  start_epoch = int(checkpoint['epoch'])
  model.load_state_dict(checkpoint['model_state_dict'])
  optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
  loss = checkpoint['loss']
  
  if start_epoch < max_epoch:
    print("Loading epoch %d. Training resuming..." %start_epoch)
  else:
    pring("Network fully trained")
else:
  print("No checkpoint found. Training starting...")
  start_epoch = 0

for epoch in range(start_epoch, max_epoch):
  for i_batch, sample_batched in enumerate(tqdm(dataloader)):
      model.train()
      images = sample_batched['image'].float().cuda()
      gt = sample_batched['annotation'].permute(0,3,1,2).float().cuda()
      predictions = model(images)
      loss = criterion(predictions, gt)

      # Before the backward pass zero all gradients
      optimizer.zero_grad()

      # Backward pass: compute gradient of the loss with respect to model parameters
      loss.backward()

      # Calling the step function on an Optimizer makes an update to its parameters
      optimizer.step()

  print("Loss: %f" %loss.item())
  print("Epoch: %d" %epoch)

  torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': loss
}, model_path)


# Experiments

In [29]:
print("Selected model: ", model_name, "\n")

Selected model:  fcn32_resnet50 



In [0]:
# go into evaluation mode
model.eval()

# select a random image
sample = test_db[2]

img = sample["image"].unsqueeze(0)
prediction = model(img.cuda())

# checks tensor dimensions for original image and predictions
print("Image tensor:  ", img.size())
print("Prediction tensor: ", prediction.size())

prediction = torch.nn.functional.softmax(prediction,1)
y = torch.argmax(prediction, dim=1)

#now visualise all of them
labels2color = np.zeros((256, 3), dtype=np.uint8)

for lbl in labels:
    labels2color[lbl.id] = np.array(lbl.color)
labels2color = np.expand_dims(labels2color,0)

y = y.squeeze().cpu().numpy().astype(np.uint8)

# check if the size of the prediction is correct
print("Size of sample predicted image:", y.shape)

path = sample['name']
#print(path)

# read original image
original_img = cv2.imread(str(path))

# generate coloured prediction
cityscapes_rgb = labels2rgb(y, labels2color)

# write original and annotation images 
cv2.imwrite(os.path.join(cityscapes_path + model_name + "_original.png"), original_img)
cv2.imwrite(os.path.join(cityscapes_path + model_name + "_masks.png"), cityscapes_rgb)

# Testing / **Evaluation**

In [0]:
# intersectionAndUnion() function from MIT CSAIL Computer Vision github repo
# https://github.com/CSAILVision/semantic-segmentation-pytorch

def intersectionAndUnion(imPred, imLab, numClass):
    imPred = np.asarray(imPred).copy()
    imLab = np.asarray(imLab).copy()

    imPred += 1
    imLab += 1
    # Remove classes from unlabeled pixels in gt image.
    # We should not penalize detections in unlabeled portions of the image.
    imPred = imPred * (imLab > 0)

    # Compute area intersection:
    intersection = imPred * (imPred == imLab)
    (area_intersection, _) = np.histogram(
        intersection, bins=numClass, range=(1, numClass))

    # Compute area union:
    (area_pred, _) = np.histogram(imPred, bins=numClass, range=(1, numClass))
    (area_lab, _) = np.histogram(imLab, bins=numClass, range=(1, numClass))
    area_union = area_pred + area_lab - area_intersection

    return (area_intersection, area_union)

In [0]:
# go into evaluation mode
model.eval()

# create tables to store scores IoUs and IoUs per class
scores = []
ious = []
means = []

#initialize with NaNs. NaNs will be present when a class does not exist in a picture and then ignored in calculations
labels_array = np.empty((34,500)) * np.nan 

# counter for IoU per class array
j = 0 

start_time = time.time()

# predict all pictures
for sample in test_db:
  img = sample["image"].unsqueeze(0)
  gt_labels = sample["annotation"]
  prediction = model(img.cuda())
  prediction = torch.nn.functional.softmax(prediction,1)
  y = torch.argmax(prediction, dim=1)
  y = y.squeeze().cpu().numpy().astype(np.uint8)
  
  # populate accuracy table
  score = y == gt_labels
  scores.append(score)

  img_labels = np.unique(gt_labels)
  intersection, union = intersectionAndUnion(gt_labels, y, 35)
  intersection = intersection[img_labels]
  
  union = union[img_labels]

  # add a small number to union to avoid division by zero
  iou = intersection / (union + 1e-10)
  
  # append IoU values to calculate mean IoU 
  ious.append(iou)

  # populate array to calculate IoUs per class 
  for i in enumerate(img_labels):
       labels_array[i[1],j] = iou[i[0]]
  
  j = j + 1 

# convert to NumPy arrays 
scores = np.array(score)
ious = np.array(ious)

# calculate mean IoU
for res in ious:
  mean = np.mean(res)
  means.append(mean)

stop_time = time.time()
execution_time = stop_time - start_time

# print results
print("Selected model: ", model_name, "\n")
print("Mean Accuracy: %.2f\n" %np.mean(scores))
print("Mean IoU: %.2f\n" %np.mean(means))
print("Execution time: %.4f seconds\n" %execution_time)

# prints ids, labels and respective IoUs
print("IoUs per label\n")
for i in range(34):
  print("Id:", i, "Label:",id2label[i].name, "IoU:", np.around(np.nansum(labels_array[i,:], axis = 0, ) / np.count_nonzero(~np.isnan(labels_array[i,:])), decimals = 2))




# Evaluation Results

**FCN32 - ResNet50**

Mean Accuracy: 0.77

Mean IoU: 0.47

Execution time: 105 seconds

---
**FCN32 - ResNet18**

Mean Accuracy: 0.73

Mean IoU: 0.47

Execution time: 86 seconds


---

**FCN16 - VGG**

Mean Accuracy: 0.73

Mean IoU: 0.50

Execution time: 175 seconds


---


**FCN8 - VGG**

Mean Accuracy: 0.75

Mean IoU: 0.51

Execution time: 174 seconds


---
**SegNet**

Mean Accuracy: 0.73

Mean IoU: 0.40

Execution time: 298 seconds


---

*for detailed IoUs per class please refer to project report screenshots

