# Lab 7: Computer Vision Applications - Object Detection

In this session, we will learn how to train a Faster RCNN model to do object detection.
  
Here are all the necessary libraries.

In [None]:
# linear algebra
import numpy as np
# data processing, CSV file I/O (e.g. pd.read_csv)
import pandas as pd
import os
import sys
import torch
import torchvision
from torchvision import datasets, models
from torchvision.transforms import functional as FT
from torchvision import transforms as T
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import DataLoader, sampler, random_split, Dataset
import copy
import math
from PIL import Image
# opencv
import cv2
# our data augmentation library
import albumentations as A
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
import warnings
warnings.filterwarnings("ignore")
from collections import defaultdict, deque
import datetime
import time
# display progress bar
from tqdm import tqdm
# plot bounding box
from torchvision.utils import draw_bounding_boxes

## Dataset
First, you need to download the Aquarium dataset in Kaggle:
https://www.kaggle.com/datasets/sharansmenon/aquarium-dataset.
After that, you have two options: upload it into Colab or into your Google Drive, as before.

If needed, you can use the cell below to unzip the files. Adjust it if necessary.


In [None]:
!unzip archive.zip

PyCOCOTools provides many utilities for dealing with datasets in the COCO format, and if you wanted, you could evaluate the model's performance on the dataset with some of the utilities provided with this library.

You may try that by yourself.

In [None]:
from pycocotools.coco import COCO

We use albumentations as our data augmentation library due to its capability to deal with bounding boxes in multiple formats

In [None]:
from albumentations.pytorch import ToTensorV2

In [None]:
def get_transforms(train=False):
    if train:
        transform = A.Compose([
            A.Resize(600, 600), # our input size can be 600px
            A.HorizontalFlip(p=0.3),
            A.VerticalFlip(p=0.3),
            A.RandomBrightnessContrast(p=0.1),
            A.ColorJitter(p=0.1),
            ToTensorV2()
        ], bbox_params=A.BboxParams(format='coco'))
    else:
        transform = A.Compose([
            A.Resize(600, 600), # our input size can be 600px
            ToTensorV2()
        ], bbox_params=A.BboxParams(format='coco'))
    return transform

This is our dataset class.
It loads all the necessary files and it processes the data so that it can be fed into the model.

In [None]:
class AquariumDetection(datasets.VisionDataset):
    def __init__(self, root, split='train', transform=None, target_transform=None, transforms=None):
        # the 3 transform parameters are reuqired for datasets.VisionDataset
        super().__init__(root, transforms, transform, target_transform)
        self.split = split #train, valid, test
        self.coco = COCO(os.path.join(root, split, "_annotations.coco.json")) # annotatiosn stored here
        self.ids = list(sorted(self.coco.imgs.keys()))
        self.ids = [id for id in self.ids if (len(self._load_target(id)) > 0)]

    def _load_image(self, id: int):
        path = self.coco.loadImgs(id)[0]['file_name']
        image = cv2.imread(os.path.join(self.root, self.split, path))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        return image
    def _load_target(self, id):
        return self.coco.loadAnns(self.coco.getAnnIds(id))

    def __getitem__(self, index):
        id = self.ids[index]
        image = self._load_image(id)
        target = self._load_target(id)
        target = copy.deepcopy(self._load_target(id))

        boxes = [t['bbox'] + [t['category_id']] for t in target] # required annotation format for albumentations
        if self.transforms is not None:
            transformed = self.transforms(image=image, bboxes=boxes)

        image = transformed['image']
        boxes = transformed['bboxes']

        new_boxes = [] # convert from xywh to xyxy
        for box in boxes:
            xmin = box[0]
            xmax = xmin + box[2]
            ymin = box[1]
            ymax = ymin + box[3]
            new_boxes.append([xmin, ymin, xmax, ymax])

        boxes = torch.tensor(new_boxes, dtype=torch.float32)

        targ = {} # here is our transformed target
        targ['boxes'] = boxes
        targ['labels'] = torch.tensor([t['category_id'] for t in target], dtype=torch.int64)
        targ['image_id'] = torch.tensor([t['image_id'] for t in target])
        targ['area'] = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0]) # we have a different area
        targ['iscrowd'] = torch.tensor([t['iscrowd'] for t in target], dtype=torch.int64)
        return image.div(255), targ # scale images
    def __len__(self):
        return len(self.ids)

The cell below loads the annotations so we can access the classes.

In [None]:
# load classes
dataset_path="Aquarium Combined"
coco = COCO(os.path.join(dataset_path, "train", "_annotations.coco.json"))
categories = coco.cats
n_classes = len(categories.keys())
categories

This code just gets a list of classes.


In [None]:
classes = [i[1]['name'] for i in categories.items()]
classes

Create the datasets!

In [None]:
train_dataset = AquariumDetection(root=dataset_path, transforms=get_transforms(True))
test_dataset = AquariumDetection(root=dataset_path, split="test", transforms=get_transforms(False))

This is our collating function for the train dataloader, it allows us to create batches of data that can be easily pass into the model

In [None]:
def collate_fn(batch):
    return tuple(zip(*batch))

Creating the dataloader.

In [None]:
train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, num_workers=4, collate_fn=collate_fn)

This is a sample image and its bounding boxes, this code does not get the model's output

In [None]:
# Lets view a sample
sample = train_dataset[2]
img_int = torch.tensor(sample[0] * 255, dtype=torch.uint8)
plt.imshow(draw_bounding_boxes(
    img_int, sample[1]['boxes'], [classes[i] for i in sample[1]['labels']], width=4
).permute(1, 2, 0))

## Model
Our model is FasterRCNN with a backbone of `MobileNetV3-Large`.
We need to change the output layers because we have just `7` classes but this model was trained on `80` classes from the [COCO dataset](https://cocodataset.org/).

In [None]:
# Load the faster rcnn model
model = models.detection.fasterrcnn_mobilenet_v3_large_fpn(pretrained=True)

# we need to change the head, get the number of input features for the classifier
in_features = model.roi_heads.box_predictor.cls_score.in_features

# replace the pre-trained head with a new one
model.roi_heads.box_predictor = models.detection.faster_rcnn.FastRCNNPredictor(in_features, n_classes)

The following blocks ensures that the model can take in the data and that it will not crash during training.

In [None]:
images,targets = next(iter(train_loader))
images = list(image for image in images)
targets = [{k:v for k, v in t.items()} for t in targets]
output = model(images, targets) # just make sure this runs without error

In [None]:
device = torch.device("cuda") # use GPU to train
model = model.to(device)

## Optimizer
Here, we define the optimizer. If you wish, you can also define the `LR Scheduler`, but it is not necessary for this notebook since our dataset is so small.

In this case, we do not need to define a loss function because the model has its own built-in loss. We just need to pass the labels and the model will process and return the calculated loss.

In [None]:
# parameters and optimizer
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.01, momentum=0.9, nesterov=True, weight_decay=1e-4)

## Training
The following is a function that will train the model for one epoch. `Torchvision Object Detections` models have a loss function built in, and it will calculate the loss automatically if you pass in the **inputs** and **targets**.

In [None]:
def train_one_epoch(model, optimizer, loader, device, epoch):
    model.to(device)
    model.train()

    all_losses = []
    all_losses_dict = []

    for images, targets in tqdm(loader):
        images = list(image.to(device) for image in images)
        targets = [{k: torch.tensor(v).to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets) # the model computes the loss automatically if we pass in targets
        losses = sum(loss for loss in loss_dict.values())
        loss_dict_append = {k: v.item() for k, v in loss_dict.items()}
        loss_value = losses.item()

        all_losses.append(loss_value)
        all_losses_dict.append(loss_dict_append)

        if not math.isfinite(loss_value):
            print(f"Loss is {loss_value}, stopping trainig") # train if loss becomes infinity
            print(loss_dict)
            sys.exit(1)

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

    all_losses_dict = pd.DataFrame(all_losses_dict) # for printing
    print("Epoch {}, lr: {:.6f}, loss: {:.6f}, loss_classifier: {:.6f}, loss_box: {:.6f}, loss_rpn_box: {:.6f}, loss_object: {:.6f}".format(
        epoch, optimizer.param_groups[0]['lr'], np.mean(all_losses),
        all_losses_dict['loss_classifier'].mean(),
        all_losses_dict['loss_box_reg'].mean(),
        all_losses_dict['loss_rpn_box_reg'].mean(),
        all_losses_dict['loss_objectness'].mean()
    ))

20 Epochs should be enough to train this model for a high accuracy.

In [None]:
num_epochs=20

for epoch in range(num_epochs):
    train_one_epoch(model, optimizer, train_loader, device, epoch)

## Inference

This is the inference code for the model. First, we set the model to evaluation mode and clear the GPU Cache.

In [None]:
# we will watch first epoich to ensure no errrors
# while it is training, lets write code to see the models predictions. lets try again
model.eval()
torch.cuda.empty_cache()

Now, let's get some image (in this case, image at index 20) from the test set and generate predictions using the trained model.

Feel free to change the prediction image.

In [None]:
img, _ = test_dataset[20]  # change this index here to check other images
img_int = torch.tensor(img*255, dtype=torch.uint8)
with torch.no_grad():
    prediction = model([img.to(device)])
    pred = prediction[0]

fig = plt.figure(figsize=(14, 10))
plt.imshow(draw_bounding_boxes(img_int,
    pred['boxes'][pred['scores'] > 0.8],
    [classes[i] for i in pred['labels'][pred['scores'] > 0.8].tolist()], width=3
).permute(1, 2, 0))