
<h1><div style="text-align: center;"> MSc Artificial Intelligence </div></h1>
<h2><div style="text-align: center;"> Object detection and Instance semantic segmentation -
Pascal VOC
</div></h2>

<h3><div style="text-align: center;"> A Ascencio-Cabral
</div></h3>



## Models

- Faster-RCNN-50-FPN,
- Mask-RCNN-50-FPN
- Mask-RCNN-101-FPN
- Mask-RCNN-101-FPN with customised anchors

### Evaluation - Coco style metrics
 - mean average precision (AP or mAP) averaged over IoU thresholds 0.50:0.05:0.95, and at IoU 0.75 and 0.50

This code is based on the following tutorials:

- https://learn-pytorch.oneoffcoder.com/object-detection.html
- https://pytorch.org/tutorials/intermediate/torchvision_tutorial.html

The `utility` folder contains code from the PyTorch detection references, with some modifications to track the loss and the average
precision per epoch during training and validation:

- https://github.com/pytorch/vision/tree/master/references/detection

In [None]:
# Mount Google Drive at /content/drive (only works inside Google Colab)
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Comment out if pycocotools is not install
!pip install cython
!pip install -U 'git+https://github.com/cocodataset/cocoapi.git#subdirectory=PythonAPI'

In [None]:
# Switch to the project folder on the mounted drive; later cells build their paths
# from the current working directory (os.getcwd() and relative 'data/...' paths).
%cd '/content/drive/MyDrive/INM705'

## 1. Libraries

In [None]:
import os
import pandas as pd
import numpy as np
import cv2
import random
import torch.utils.data
import torch.nn as nn
import torchvision

from torchvision.models.detection.mask_rcnn import MaskRCNN, MaskRCNNPredictor
from torchvision.models.detection import fasterrcnn_resnet50_fpn, FasterRCNN_ResNet50_FPN_Weights, \
    maskrcnn_resnet50_fpn, MaskRCNN_ResNet50_FPN_Weights
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models import resnet101, ResNet101_Weights
from torchvision.models.detection.backbone_utils import _resnet_fpn_extractor, \
    _validate_trainable_layers
from torchvision.ops import misc as misc_nn_ops
from torchvision.models.detection.rpn import AnchorGenerator, RPNHead

from pascal_dataset import PascalVoc
from utility.engine import train_one_epoch, evaluate
import utility.utils as utils
import utility.transforms as T

import matplotlib.pyplot as plt
%matplotlib inline

# Seed every random number generator used in this notebook: Python's `random`
# (used when picking mask colours), NumPy and PyTorch.
random.seed(37)
np.random.seed(37)
torch.manual_seed(37)

## 2. Pascal VOC Dataset

In [None]:
# uncomment to unzip
# !unzip './data' -d './'

In [None]:
# Number of files in each dataset folder (paths are relative to the working directory)
print('Images:', len(os.listdir('data/Images')))
print('Masks:', len(os.listdir('data/GT')))
print('Annotations:', len(os.listdir('data/annotations')))

In [None]:
# Example of a processed sample with boxes and masks.
# No transforms are applied here, so the sample is shown as the dataset class produces it.
root = os.path.join(os.getcwd(), 'data')
dataset = PascalVoc(root, transforms=None)
# The last expression of the cell is displayed as its output
dataset[5]

In [None]:
# Presumably the file name of the image at index 5 (the sample shown above) - confirm in PascalVoc
dataset.get_img_name(5)

## 3. Building the models

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
# Pascal VOC class names. The position in the tuple is used as the label id when
# predictions are decoded, with index 0 reserved for the background.
voc_classes = (
    '__background__',
    'aeroplane', 'bicycle', 'bird', 'boat', 'bottle',
    'bus', 'car', 'cat', 'chair', 'cow',
    'diningtable', 'dog', 'horse', 'motorbike', 'person',
    'pottedplant', 'sheep', 'sofa', 'train', 'tvmonitor',
)

###  3.1 Build Mask-RCNN-ResNet-101-FPN

In [None]:
def maskrcnn_resnet101_fpn(*, progress=True, num_classes=None,
    weights_backbone=ResNet101_Weights.IMAGENET1K_V2, trainable_backbone_layers=None, **kwargs):
    """
    Build a Mask R-CNN model with a ResNet-101-FPN backbone.

    Adapted from pytorch maskrcnn_resnet50_fpn
    https://pytorch.org/vision/main/_modules/torchvision/models/detection/mask_rcnn.html#maskrcnn_resnet50_fpn

    :param progress: forwarded to ``resnet101`` (download progress bar)
    :param num_classes: number of output classes including the background; 91 when None
    :param weights_backbone: ResNet-101 weights for the backbone, or None for no pretrained weights
    :param trainable_backbone_layers: passed to ``_validate_trainable_layers`` together with
        the values 5 and 3 (presumably the maximum and the default number of trainable layers)
    :param kwargs: extra keyword arguments forwarded to ``MaskRCNN``
    :return: a ``MaskRCNN`` model
    """
    weights_backbone = ResNet101_Weights.verify(weights_backbone)
    num_classes = 91 if num_classes is None else num_classes

    has_pretrained_backbone = weights_backbone is not None
    trainable_backbone_layers = _validate_trainable_layers(
        has_pretrained_backbone, trainable_backbone_layers, 5, 3)

    # A pretrained backbone keeps its batch-norm layers frozen
    if has_pretrained_backbone:
        norm_layer = misc_nn_ops.FrozenBatchNorm2d
    else:
        norm_layer = nn.BatchNorm2d

    resnet = resnet101(weights=weights_backbone, progress=progress, norm_layer=norm_layer)
    fpn_backbone = _resnet_fpn_extractor(resnet, trainable_backbone_layers)
    return MaskRCNN(fpn_backbone, num_classes=num_classes, **kwargs)

### 3.2 Build the segmentation and detection models

Let's replace the box predictor (and, for Mask R-CNN, the mask predictor) of each model so that the outputs match our dataset.
Mask-RCNN-ResNet101-FPN first loads the pretrained parameters into the backbone and then gets new
box and mask predictors for the 21 VOC classes (20 + background).


In [None]:
def _replace_roi_heads(model, num_classes, with_mask):
    """Replace the box predictor (and optionally the mask predictor) with heads for num_classes."""
    # get number of input features for the classifier
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    # replace the pre-trained head with a new one
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    if with_mask:
        # get the number of input features for the mask classifier
        in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
        hidden_layer = 256
        # and replace the mask predictor with a new one
        model.roi_heads.mask_predictor = MaskRCNNPredictor(in_features_mask, hidden_layer,
                                                           num_classes)
    return model


def get_instance_segmentation_model(num_classes, backbone, custom_anchors=False, pretrained=True,
                                    task='detection'):
    """
    Build one of the detection / instance segmentation models used in this notebook.

    Supported combinations:
      - backbone='resnet-50-fpn',  task='detection'    -> Faster R-CNN ResNet-50-FPN
      - backbone='resnet-50-fpn',  task='segmentation' -> Mask R-CNN ResNet-50-FPN
      - backbone='resnet-101-fpn', task='segmentation' -> Mask R-CNN ResNet-101-FPN
        (optionally with customised RPN anchors)

    :param num_classes: an integer with the number of classes in the dataset including the
    background
    :param backbone: string with the name of the backbone to use, 'resnet-50-fpn' or
    'resnet-101-fpn'
    :param custom_anchors: a boolean indicating whether custom anchors should be built; only used
    with the resnet-101-fpn backbone
    :param pretrained: a boolean indicating whether the ResNet-50 models load their default
    pretrained detector weights; it has no effect on the resnet-101-fpn models, which are always
    built by maskrcnn_resnet101_fpn with its default backbone weights
    :param task: a string, 'detection' (Faster-RCNN) or 'segmentation' (Mask-RCNN)
    :return: a tuple (model, model name)
    :raises ValueError: if backbone or task is unknown, or if resnet-101-fpn is requested for
    the detection task
    """
    # Explicit exceptions instead of asserts: asserts are removed when Python runs with -O,
    # which previously let unsupported input reach the final return with no model defined.
    if backbone not in ('resnet-50-fpn', 'resnet-101-fpn'):
        raise ValueError('input one of resnet-50-fpn or resnet-101-fpn')
    if task not in ('segmentation', 'detection'):
        raise ValueError('input detection or segmentation')
    if backbone == 'resnet-101-fpn' and task == 'detection':
        raise ValueError('resnet-101-fpn is only available for the segmentation task')

    if task == 'detection':
        # only resnet-50-fpn reaches this branch
        weights = FasterRCNN_ResNet50_FPN_Weights.DEFAULT if pretrained else None
        model = fasterrcnn_resnet50_fpn(weights=weights)
        model = _replace_roi_heads(model, num_classes, with_mask=False)
        m_name = 'fasterrcnn-resnet50-fpn'

    elif backbone == 'resnet-50-fpn':
        weights = MaskRCNN_ResNet50_FPN_Weights.DEFAULT if pretrained else None
        model = maskrcnn_resnet50_fpn(weights=weights)
        model = _replace_roi_heads(model, num_classes, with_mask=True)
        m_name = 'maskrcnn-resnet50-fpn'

    else:
        model = maskrcnn_resnet101_fpn(num_classes=num_classes)
        m_name = 'maskrcnn-resnet101-fpn'

        if custom_anchors:
            # create customised anchors for the FPN which by default has 5 outputs
            anchor_generator = AnchorGenerator(
                sizes=tuple([(16, 32, 64, 128, 256, 512) for _ in range(5)]),
                aspect_ratios=tuple([(0.5, 1.0, 2.0) for _ in range(5)]))
            model.rpn.anchor_generator = anchor_generator
            # the RPN head must predict one score/box per anchor at each location
            model.rpn.head = RPNHead(256, anchor_generator.num_anchors_per_location()[0])
            m_name = 'maskrcnn-resnet101-fpn-ca'

        model = _replace_roi_heads(model, num_classes, with_mask=True)

    print(m_name)
    return model, m_name

In [None]:
def get_transform(train):
    """
    Build the transform pipeline applied to every sample.

    :param train: boolean, True to add the random horizontal flip used as data augmentation
    :return: a ``T.Compose`` of the selected transforms
    """
    # convert the PIL image into a PyTorch tensor and then to float32
    steps = [T.PILToTensor(), T.ConvertImageDtype(dtype=torch.float32)]
    if not train:
        return T.Compose(steps)

    # training only: randomly flip the images and their ground truth
    steps.append(T.RandomHorizontalFlip(0.5))
    return T.Compose(steps)

## 4. Training and Validation

The dataset will be randomly split into three subsets. First, a list of random indices is generated with a random permutation. The split ratios are 80:10:10 for the training, validation and test data respectively. The model is trained and then evaluated on the validation subset after each epoch.

In [None]:
def main(kwargs):
    """
    Train a model on the Pascal VOC data and evaluate it on the validation subset after
    every epoch.

    Reads the notebook-level variables ``data_path`` (dataset root) and ``output_dir``
    (checkpoint folder), which must be defined before this function is called.

    :param kwargs: dictionary of settings. Keys read here: 'device', 'num_classes', 'backbone',
    'anchors', 'task', 'opt', 'lr', 'w_decay', 'step', 'gamma', 'epochs' and, when 'opt' is
    "SGD", 'momentum'. Any value of 'opt' other than "SGD" selects Adam.
    :return: a tuple (test indices, model name); the indices are the last part of the random
    permutation (from the 90% mark onwards), which is used neither for training nor validation
    """

    print("........Starting........")
    device = kwargs['device']
    num_classes = kwargs['num_classes']

    # Same data twice: the training copy uses the augmenting transforms, the validation copy
    # does not
    train_dataset = PascalVoc(data_path, get_transform(train=True))
    val_dataset = PascalVoc(data_path, get_transform(train=False))

    # split the dataset into train (first 80%), validation (next 10%) and test (last 10%)
    # using one fixed-seed random permutation of the indices
    torch.manual_seed(1)
    indices = torch.randperm(len(train_dataset)).tolist()
    indx = round(len(train_dataset) * 0.80)
    test_idx = round(len(train_dataset) * 0.90)
    train_set = torch.utils.data.Subset(train_dataset, indices[0:indx])
    val_set = torch.utils.data.Subset(val_dataset, indices[indx:test_idx])

    # define training and validation data loaders
    train_loader = torch.utils.data.DataLoader(train_set, batch_size=2,
                    shuffle=True, num_workers=2, collate_fn=utils.collate_fn)

    val_loader = torch.utils.data.DataLoader(
      val_set, batch_size= 2, shuffle=False, num_workers=2,
      collate_fn=utils.collate_fn)

    # get the model using our helper function
    model, model_name = get_instance_segmentation_model(num_classes, backbone=kwargs['backbone'],
                                            custom_anchors=kwargs['anchors'],
                                            task=kwargs['task'])

    # move model to the right device
    model.to(device)

    # construct an optimizer over the parameters that are not frozen
    params = [p for p in model.parameters() if p.requires_grad]

    if kwargs['opt'] == "SGD":
        optimizer = torch.optim.SGD(params, lr=kwargs['lr'],
                                  momentum= kwargs['momentum'], weight_decay=kwargs['w_decay'])
    else:
        optimizer = torch.optim.Adam(params, lr=kwargs['lr'], weight_decay=kwargs['w_decay'])


    # learning rate scheduler: multiply the learning rate by gamma every 'step' epochs
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,
                                        step_size= kwargs['step'], gamma=kwargs['gamma'])

    #########################
    # Training and validation
    ##########################

    best_loss = np.inf

    train_loss =[]
    epochs = kwargs['epochs']
    lr = kwargs['lr']

    for epoch in range(epochs):

        # train for one epoch, printing every 100 iterations
        loss, _ = train_one_epoch(model, optimizer, train_loader, device, epoch, print_freq=100)

        train_loss.append(loss)

        lr_scheduler.step()

        # evaluate on the validation subset; the returned values are discarded here
        _, _ = evaluate(model, val_loader, device=device)

        # NOTE(review): the "best" checkpoint is selected on the training loss, not on a
        # validation metric
        if loss < best_loss:
            best_loss = loss

            check_point = {'epoch': epoch +1, 'model': model.state_dict(),
                           'optimizer_dict': optimizer.state_dict(),
                           'scheduler': lr_scheduler.state_dict()}

            # the file name uses the total number of epochs and the initial learning rate,
            # so each improvement overwrites the previous best checkpoint of this run
            torch.save(check_point, os.path.join(output_dir,
                                                 f'best-{model_name}-{epochs}-{lr}.pth'))

    ##############################
    #  Plot loss per epoch during training
    ############################
    plt.plot(train_loss, label='training')
    plt.ylabel('Loss')
    plt.xlabel('Epochs')
    plt.legend()
    plt.show()
    return indices[test_idx:], model_name


### 4.2 Hyper-parameters


In [None]:
# Paths: dataset root and the folder where the checkpoints are written
data_path = os.path.join(os.getcwd(), 'data')
output_dir = os.path.join(os.getcwd(), 'weigths')
try:
    os.makedirs(output_dir, exist_ok=False)
    print('Directory successfully created')
except FileExistsError:
    # Only an existing path is expected here; other OS errors (e.g. permission denied)
    # are no longer reported as "already exist" and are allowed to surface.
    print('Directory already exist')

# Available choices; pick one entry of each list by index when building params_dict
optims = ['SGD', 'Adam']
tasks = ['detection', 'segmentation']
backbone_names = ['resnet-50-fpn', 'resnet-101-fpn']

# Settings of the experiment, passed to main() and reused by the evaluation cells
params_dict = dict(
    task=tasks[1],
    backbone=backbone_names[1],
    device=device,
    num_classes=21,
    anchors=True,
    opt=optims[1],
    step=5,
    gamma=0.5,
    epochs=20,
    lr=0.0001,
    w_decay=0.0001,
    momentum=0.9,
)

### 4.3 Training

In [None]:
# Run experiment: trains the configured model and returns the indices of the held-out
# test samples together with the name of the model
test_ind, net_name = main(params_dict)

## 5. Evaluation

### 5.1 Build the test dataset and test data loader

In [None]:
# Prepare test dataset: same data without augmentation, restricted to the test indices
# returned by main()
voc_dataset = PascalVoc(data_path, get_transform(train=False))
test_dataset = torch.utils.data.Subset(voc_dataset, test_ind)

In [None]:
# Prepare test dataloader (fixed order, batches of 4 built with the project collate function)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=4, shuffle=False, num_workers=2,
  collate_fn=utils.collate_fn)

In [None]:
# model_weights = 'fasterrcnn-resnet50-fpn'
# Rebuild the checkpoint file name used during training: best-<model>-<epochs>-<lr>.pth
ep = params_dict['epochs']
lr = params_dict['lr']
run_suffix = f'-{ep}-{lr}'
# Append the suffix only once so that re-running this cell does not keep extending net_name
if not net_name.endswith(run_suffix):
    net_name = f'{net_name}{run_suffix}'
checkpoint_dir = os.path.join(output_dir, f'best-{net_name}.pth')
# Load on the CPU; the model is moved to the right device later
checkpoint = torch.load(checkpoint_dir, map_location='cpu')

### 5.2 Load the trained weights onto the model

In [None]:
# Rebuild the same architecture that was trained and load the saved weights into it
model_, _ = get_instance_segmentation_model(num_classes=params_dict['num_classes'], backbone=params_dict['backbone'],
                                            custom_anchors=params_dict['anchors'], task=params_dict['task'])
# strict=False tolerates missing/unexpected keys; the value displayed by this cell lists them,
# so check that output before trusting the evaluation
model_.load_state_dict(checkpoint['model'], strict=False)

### 5.3 Build the per-class evaluation functions

In [None]:
def map_per_class(coco_evaluator, inst_seg=True):
    """
    Average precision per class, read from the precision arrays of a COCO evaluator.

    For every class the precision values at index 0 of the first axis, index 0 of the fourth
    axis and index 2 of the last axis are averaged over the second axis. In the default
    pycocotools layout [IoU threshold, recall, class, area range, max detections] this is
    presumably AP at IoU=0.50, all areas, 100 detections - confirm against the evaluator settings.

    Code adapted from
    https://github.com/kevalmorabia97/Object-and-Semantic-Part-Detection-pyTorch/blob/master/extra/per_class_AP.ipynb

    :param coco_evaluator: evaluator whose ``coco_eval['bbox']`` (and ``coco_eval['segm']``)
    entries expose ``eval['precision']``
    :param inst_seg: boolean, True to also return the values for the segmentation masks
    :return: the list of detection values, or a tuple (detection, segmentation) if inst_seg
    """
    def _ap_per_class(precision):
        # one value per entry of the class axis (all classes except `__background__`)
        values = []
        for class_pos in range(precision.shape[2]):
            values.append(np.mean(precision[0, :, class_pos, 0, 2]))
        return values

    d_ap_class = _ap_per_class(coco_evaluator.coco_eval['bbox'].eval['precision'])
    if not inst_seg:
        return d_ap_class

    s_ap_class = _ap_per_class(coco_evaluator.coco_eval['segm'].eval['precision'])
    return d_ap_class, s_ap_class

In [None]:
def metric_per_class(dataset, coco_evaluator, inst_seg=True, num_classes=21, fname=None,
                     metrics_dir=None):
    """
    Print, tabulate and optionally save the per-class average precision (in percent).

    :param dataset: object with an ``idx_to_class`` mapping from label id to class name
    :param coco_evaluator: evaluator passed on to ``map_per_class``
    :param inst_seg: boolean, True to report detection and segmentation, False for detection only
    :param num_classes: number of classes including the background; labels 1..num_classes-1
    are reported
    :param fname: model name, written to the 'Model' column and used as the csv file name
    :param metrics_dir: folder for the csv file; when None the table is not saved
    :return: a DataFrame with one row per task and one column per class
    """
    # NOTE(review): position i-1 of the per-class values is taken for label i, which assumes
    # the evaluator covers every label 1..num_classes-1 in order - confirm for the test split.
    if inst_seg:
        d_ap_class, s_ap_class = map_per_class(coco_evaluator, inst_seg=True)
        task_rows = ['detection', 'segmentation']
    else:
        d_ap_class = map_per_class(coco_evaluator, inst_seg=False)
        s_ap_class = None
        task_rows = ['detection']

    metrics = {}
    for i in range(1, num_classes):
        cl = dataset.idx_to_class[i]
        det = round(100 * d_ap_class[i-1], 2)
        if inst_seg:
            seg = round(100 * s_ap_class[i-1], 2)
            print(f'Detection {cl}: {det} | Segmentation {cl}: {seg}')
            metrics[cl] = [det, seg]
        else:
            print(f'Detection {cl}: {det}')
            metrics[cl] = [det]

    df = pd.DataFrame.from_dict(metrics)
    df.insert(0, 'Model', fname)
    df.insert(1, 'Task', task_rows)

    # metrics_dir defaults to None; skip saving instead of failing in os.path.join
    if metrics_dir is not None:
        df.to_csv(os.path.join(metrics_dir,  f'{fname}.csv'))
    return df

In [None]:
# Create results directory for the per-class metrics
results_dir = os.path.join(os.getcwd(), 'metrics')
try:
    os.makedirs(results_dir, exist_ok=False)
    print('Directory successfully created')
except FileExistsError:
    # Only an existing path is expected here; other OS errors are allowed to surface
    print('Directory already exist')

### 5.4 Compute mAP and mAP per class for the test dataset

In [None]:
# Evaluate the reloaded model on the test loader; coco_ev is the evaluator used below
# for the per-class metrics
coco_ev, test_stats = evaluate(model_.to(device), test_loader, device=device)

In [None]:
# True when the configured task produces masks as well as boxes
segmentation = params_dict['task'] == 'segmentation'

In [None]:
# Per-class table for the test split; also saved as <net_name>.csv in results_dir
metrics_df = metric_per_class(voc_dataset, coco_ev, inst_seg=segmentation, num_classes=21,
                              fname=net_name, metrics_dir=results_dir)

In [None]:
# Visualise the per-class results (one row per task, one column per class)
metrics_df

##  6. Inference

###  6.1 Color map decoding for masks

In [None]:
# This function allows us to visualize one predicted instance mask by painting its
# pixels with a colour picked at random from a fixed palette.
# Adapted from https://learnopencv.com/pytorch-for-beginners-semantic-segmentation-using
# -torchvision/
def decode_segmap(image, n_classes=21):
    """
    Turn a binary mask into an RGB image with the mask painted in a random palette colour.

    :param image: 2-D array, pixels equal to 1 (or True) belong to the mask
    :param n_classes: not used; kept so that existing calls keep working
    :return: a tuple (rgb, label_colours) with the uint8 image of shape (H, W, 3) and the palette
    """
    # every channel value must fit in uint8 (0-255)
    label_colours = [(0, 255, 0), (0, 0, 255), (255, 0, 0), (0, 255, 255), (255, 255, 0),
               (0, 128, 128), (255, 0, 255), (80, 70, 180), (250, 80, 190), (64, 128, 0),
               (192, 128, 0), (64, 0, 128), (192, 0, 128), (64, 128, 128), (192, 128, 128),
               (0, 64, 0), (128, 64, 0), (0, 192, 0), (70, 150, 250), (50, 190, 190)]

    # one colour for the whole mask, drawn from the full palette
    colour = label_colours[random.randrange(0, len(label_colours))]

    mask = np.asarray(image) == 1
    channels = []
    for value in colour:
        channel = np.zeros_like(image, dtype=np.uint8)
        channel[mask] = value
        channels.append(channel)
    rgb = np.stack(channels, axis=2)

    return rgb, label_colours

### 6.2 Build prediction and inference functions

In [None]:
def get_prediction(pred, threshold, seg=True, msk_thres=0.5, class_names=None):
    """
    Keep the predictions up to (and including) the last one whose score exceeds the threshold.

    :param pred: dictionary with the prediction ('scores', 'labels', 'boxes' and, for
    segmentation models, 'masks')
    :param threshold: float, score threshold for the detections
    :param seg: boolean, whether the inference is for a segmentation model or detection only
    :param msk_thres: float, threshold used to binarise the predicted masks
    :param class_names: sequence mapping label ids to class names; the notebook-level
    ``voc_classes`` is used when None
    :return: (masks, boxes, classes, scores) if seg, otherwise (boxes, classes, scores); all of
    them are empty when no score exceeds the threshold
    """
    if class_names is None:
        class_names = voc_classes

    scores = pred['scores'].detach().cpu().numpy()
    # Position of the last score above the threshold. Looking positions up by value with
    # list.index() returned the first of several equal scores (dropping the tied detections)
    # and raised IndexError when nothing passed the threshold.
    last_kept = max((i for i, s in enumerate(scores) if s > threshold), default=-1)
    n_keep = last_kept + 1

    pred_score = list(scores[:n_keep])
    pred_class = [class_names[i] for i in pred['labels'].detach().cpu().numpy()[:n_keep]]
    # each box becomes [(x1, y1), (x2, y2)] with integer pixel coordinates
    pred_boxes = [[(int(b[0]), int(b[1])), (int(b[2]), int(b[3]))]
                  for b in pred['boxes'].detach().cpu().numpy()[:n_keep]]

    if seg:
        masks = (pred['masks'] > msk_thres).squeeze(1).detach().cpu().numpy()
        pred_masks = masks[:n_keep]
        return pred_masks, pred_boxes, pred_class, pred_score
    return pred_boxes, pred_class, pred_score


def _draw_detection(img, box, label, score, rect_th, text_size, text_th):
    """Draw one bounding box and its 'class: score' caption on img (in place)."""
    cv2.rectangle(img, box[0], box[1], color=(0, 204, 0), thickness=rect_th)
    s = str(round(score, 2))
    # the caption is placed just above the top-left corner of the box
    cv2.putText(img, f'{label}: {s}', (box[0][0], box[0][1]-3),
                cv2.FONT_HERSHEY_SIMPLEX, text_size, color=(0, 204, 0), thickness=text_th,
                lineType=cv2.LINE_AA)


def test_inference(img, pred, threshold=0.5, msk_thres=0.5, rect_th=2, text_size=0.5, text_th=1,
                 seg=False):
    """
    Draw the predictions that pass the score threshold on an image.

    :param img: a numpy image or a PIL image (converted to a numpy array)
    :param pred: dictionary with the predictions of the model for this image
    :param threshold: float boxes thresholds
    :param msk_thres: float mask threshold
    :param rect_th: int thickness of the rectangle
    :param text_size: float font size
    :param text_th: int the thickness of the text
    :param seg: boolean indicates segmentation or only a detection model
    :return: the image with the inference
    """
    # cv2 draws on numpy arrays; an array passed in is annotated in place when seg is False
    if not isinstance(img, np.ndarray):
        img = np.array(img)

    if seg:
        masks, boxes, pred_cls, scores = get_prediction(pred, threshold, True, msk_thres)
        for i in range(len(masks)):
            rgb_mask, _ = decode_segmap(masks[i])
            # blend the coloured mask over the image before drawing its box
            img = cv2.addWeighted(np.array(img), 1, rgb_mask, 0.5, 0)
            _draw_detection(img, boxes[i], pred_cls[i], scores[i], rect_th, text_size, text_th)
        return img

    # Detection only: the function's own `seg` flag decides this, not the notebook-level
    # `segmentation` variable, so exactly three values come back.
    boxes, pred_cls, scores = get_prediction(pred, threshold, False, msk_thres)
    for i in range(len(boxes)):
        _draw_detection(img, boxes[i], pred_cls[i], scores[i], rect_th, text_size, text_th)
    return img

In [None]:
def show_inference(model, loader, threshold=0.7, mask_thres=0.5, seg=True,
                     outdir=None, fname=None):
    """
    Run the model on the first batch of a loader and plot the annotated images.

    Image names are looked up in the notebook-level ``voc_dataset``.

    :param model: model to evaluate
    :param loader: data loader yielding (images, targets) batches of the test data
    :param threshold: float to select the boxes
    :param mask_thres: float to threshold the masks
    :param seg: boolean, True for a segmentation model (masks and boxes), False for boxes only
    :param outdir: str directory to save the inference; nothing is saved when None
    :param fname: str name to save the inference
    :return: None; the figure is shown and, if outdir is given, saved as predictions_<fname>.png
    """

    fig = plt.figure(figsize=(12, 6))
    model.eval()

    images, targets = next(iter(loader))
    # run the batch on whichever device the model lives on
    model_device = next(model.parameters()).device
    with torch.no_grad():
        predictions = model([image.to(model_device) for image in images])

    # two images per row; round up so that a batch of 1 or an odd batch still fits the grid
    n_cols = 2
    n_rows = max(1, (len(images) + n_cols - 1) // n_cols)
    for i, image in enumerate(images):
        fig.add_subplot(n_rows, n_cols, i + 1, xticks=[], yticks=[])
        img = torchvision.transforms.ToPILImage()(image)
        img = test_inference(np.array(img), predictions[i], threshold, msk_thres=mask_thres,
                           seg=seg)
        img_id = targets[i]['image_id'].item()
        name = voc_dataset.get_img_name(img_id)
        plt.imshow(img)
        plt.xlabel(name, color='blue', fontsize=12)
    plt.tight_layout()

    if outdir is not None:
        fname = f'predictions_{fname}.png'
        plt.savefig(os.path.join(outdir, fname), bbox_inches='tight',
                    format='png', dpi=300)

### 6.3 Inference on the test dataset

In [None]:
# Folder where the figures with the predictions are saved
inference_dir = os.path.join(os.getcwd(), 'inference')
try:
    os.makedirs(inference_dir, exist_ok=False)
    print('Directory successfully created')
except FileExistsError:
    # Only an existing path is expected here; other OS errors are allowed to surface
    print('Directory already exist')

In [None]:
# Inference on the first test batch; the model is moved to the CPU first, and the figure
# is saved as predictions_<net_name>.png in inference_dir
show_inference(model_.to('cpu'), test_loader, threshold=0.8, mask_thres=0.5,
               seg=segmentation, outdir=inference_dir, fname=f'{net_name}')