# Introduction to Object Detection Notebook using PyTorch

This notebook showcases the implementation of object detection using the PyTorch library. 
`Object detection` is a fundamental computer vision task that involves identifying and localizing objects of interest within an image or video.

The primary focus of this notebook is to demonstrate the training process of an object detection model using PyTorch. The model was trained for a total of 60 hours, with each epoch taking about 2 hours. This longer training duration allows for more comprehensive learning and refinement of the model's parameters, enabling improved detection accuracy.

`PyTorch`, a popular deep learning framework, provides a flexible and efficient platform for building and training object detection models. It offers a wide range of pre-built modules and tools that streamline the development process and facilitate the integration of advanced techniques.

Throughout this notebook, we will explore the step-by-step implementation of the object detection pipeline, including data preprocessing, model architecture, loss functions, and optimization strategies. We will also utilize commonly used datasets and evaluation metrics to assess the performance of the trained model.

By the end of this notebook, you will have gained a practical understanding of object detection using PyTorch and will be able to apply this knowledge to your own computer vision projects. The comprehensive training process and utilization of the PyTorch library ensure that the resulting model will have a solid foundation for accurate object detection in various real-world scenarios.

## Install and load the `pycocotools` library

In [None]:
import pycocotools

## Import the required libraries

In [None]:
import PIL.Image
import random
import torch
import torch.utils.data
import numpy as np
from collections import defaultdict
import torchvision.datasets as dset

from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt

import torchvision
torchvision.disable_beta_transforms_warning()

from torchvision import models

import torchvision.transforms as original_transforms
import torchvision.transforms.v2 as transforms
from torchvision.transforms.v2 import functional as F
from torchvision.utils import draw_bounding_boxes
import multiprocessing as mp
from torch import nn
import torch.optim as optim
from tqdm import tqdm

## Set the Hyperparameters

In [None]:
# Number of CUDA devices visible to this process.
n_gpus = torch.cuda.device_count()
# True whenever CUDA cannot be used (the run then happens on MPS or the CPU).
USING_CPU = not torch.cuda.is_available()

# Pick the best available backend: CUDA first, then Apple MPS, then plain CPU.
# The MPS probe goes through getattr because older torch builds do not ship
# torch.backends.mps at all.
mps_backend = getattr(torch.backends, "mps", None)
if torch.cuda.is_available() and n_gpus > 0:
    DEVICE = torch.device("cuda:0")
elif mps_backend is not None and mps_backend.is_available():
    DEVICE = torch.device("mps")
else:
    DEVICE = torch.device("cpu")

# Extra keyword arguments forwarded to every DataLoader built in this notebook.
if DEVICE.type == 'cuda':
    kwargs = {'num_workers': mp.cpu_count(), 'pin_memory': True}
else:
    kwargs = {'num_workers': mp.cpu_count() // 2}
    # DataLoader only accepts prefetch_factor when worker processes are used,
    # so it is left out when the machine has a single CPU (0 workers).
    if kwargs['num_workers'] > 0:
        kwargs['prefetch_factor'] = 4

print(f'Num of CPUs: {mp.cpu_count()}')
print(f'Device in use: {DEVICE}')
print(f'Found {n_gpus} GPU Device/s.')

In [None]:
# Show the DataLoader keyword arguments and the "CUDA unavailable" flag.
kwargs, USING_CPU

## Create a dataset loader that returns a COCO dataset

In [None]:
TRAIN_IMG_DIR = 'coco2017/train2017'
TRAIN_ANN_FILE = 'coco2017/annotations/instances_train2017.json'
USE_PRETRAINED = False
SAVED_MODEL_PATH = '/kaggle/input/object-detection-using-pytorch/ssd300_vgg16_checkpoint_2'


def load_dataset(transform):
    """Build the COCO training detection dataset.

    Args:
        transform: Callable applied to every loaded image (or ``None`` to
            keep the raw image). It is handed to ``CocoDetection`` as its
            image-only ``transform`` argument.

    Returns:
        A ``torchvision.datasets.CocoDetection`` reading images from
        ``TRAIN_IMG_DIR`` and annotations from ``TRAIN_ANN_FILE``.
    """
    # The transform has to be forwarded explicitly; without it the argument
    # is silently ignored and no image transform is ever applied.
    return dset.CocoDetection(root=TRAIN_IMG_DIR,
                              annFile=TRAIN_ANN_FILE,
                              transform=transform)


coco_train = load_dataset(transform=original_transforms.ToTensor())
print("Number of samples: ", len(coco_train))

In [None]:
# Display the image (first element) of the first training sample.
coco_train[0][0]

## Create the required Data Augmentations

In [None]:
class RandomHorizontalFlip:
    """Horizontally flip an image and its boxes together with probability ``p``."""

    def __init__(self, p=0.5):
        # The wrapped transform always flips (probability 1); the random
        # decision is taken once in __call__ so image and boxes stay in sync.
        self.hf = transforms.RandomHorizontalFlip(1)
        self.p = p

    def __call__(self, img, bboxes):
        """Return ``(img, bboxes)``, both flipped or both untouched."""
        if torch.rand(1)[0] >= self.p:
            return img, bboxes

        flip = self.hf.forward
        return flip(img), flip(bboxes)
    
    
class RandomVerticalFlip:
    """Vertically flip an image and its boxes together with probability ``p``."""

    def __init__(self, p=0.5):
        # The wrapped transform always flips (probability 1); the random
        # decision is taken once in __call__ so image and boxes stay in sync.
        self.vf = transforms.RandomVerticalFlip(1)
        self.p = p

    def __call__(self, img, bboxes):
        """Return ``(img, bboxes)``, both flipped or both untouched."""
        if torch.rand(1)[0] >= self.p:
            return img, bboxes

        flip = self.vf.forward
        return flip(img), flip(bboxes)

class Resize:
    """Resize an image and its bounding boxes with the same transform."""

    def __init__(self, size):
        self.size = size
        self.resize = transforms.Resize(size, antialias=True)

    def __call__(self, img, bboxes):
        """Return the resized ``(img, bboxes)`` pair."""
        apply_resize = self.resize.forward
        return apply_resize(img), apply_resize(bboxes)


## Simple function to display the sample

In [None]:
def show(sample):
    """Plot one ``(image, target)`` sample with its boxes drawn in yellow.

    The sample is resized to 300x300 and passed through the random
    horizontal and vertical flips before drawing, so repeated calls on the
    same sample may show different orientations.
    """
    import matplotlib.pyplot as plt

    from torchvision.transforms.v2 import functional as F
    from torchvision.utils import draw_bounding_boxes

    image, target = sample
    bboxes = target["boxes"]

    # Applied in order: resize, horizontal flip, vertical flip.
    pipeline = (Resize((300, 300)), RandomHorizontalFlip(), RandomVerticalFlip())
    for step in pipeline:
        image, bboxes = step(image, bboxes)

    # draw_bounding_boxes is given a uint8 tensor, so PIL input is converted first.
    if isinstance(image, PIL.Image.Image):
        image = F.to_tensor(image)
    image = F.convert_image_dtype(image, torch.uint8)

    annotated = draw_bounding_boxes(image, bboxes, colors="yellow", width=3)

    fig, ax = plt.subplots()
    # Channels-first tensor -> channels-last array for matplotlib.
    ax.imshow(annotated.permute(1, 2, 0).numpy())
    ax.set(xticklabels=[], yticklabels=[], xticks=[], yticks=[])
    fig.tight_layout()

    fig.show()

In [None]:
# Inspect the first sample of the dataset before it is wrapped for transforms v2.
sample = coco_train[0]
image, target = sample
print(type(image))
# target is indexed with [0] and .keys() here, so it is presumably a list of
# per-object annotation dicts at this stage.
print(type(target), type(target[0]), list(target[0].keys()))

In [None]:
# Wrap the dataset so its targets are returned in the transforms-v2 format.
coco_train = dset.wrap_dataset_for_transforms_v2(coco_train)

In [None]:
# Inspect the same sample again after wrapping: the target is now accessed
# as a dict with "boxes" and "labels" entries.
sample = coco_train[0]
image, target = sample
print(type(image))
print(type(target), list(target.keys()))
print(type(target["boxes"]), type(target["labels"]))

In [None]:
# Plot the wrapped sample with its bounding boxes.
show(sample)

## Transformer that performs the extra data augmentations

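This pipeline is built from `torchvision.transforms.v2`. Some of these transforms (for example `RandomPhotometricDistort`) exist only in v2, so do not confuse them with the v1 transforms in `torchvision.transforms`.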

In [None]:
# Image pipeline built from transforms v2: augmentations first, then
# conversion of the PIL image to a float32 tensor.
transform = transforms.Compose(
    [
        # Augmentations.
        transforms.RandomPhotometricDistort(),        
        transforms.RandomAutocontrast(),
        transforms.RandomEqualize(),
        transforms.GaussianBlur(kernel_size=3),
        # transforms.ToImageTensor(),
        # Tensor conversion followed by a dtype change to float32.
        transforms.PILToTensor(),
        transforms.ConvertImageDtype(torch.float32),
    ]
)

## Create a dataset using wrapper function of transformer v2

In [None]:
# del coco_train
# Rebuild the training dataset with the pipeline above and wrap it again so
# targets come back in the transforms-v2 format.
coco_train = load_dataset(transform=transform)
coco_train = dset.wrap_dataset_for_transforms_v2(coco_train)

In [None]:
# Plot the second sample of the rebuilt dataset.
sample = coco_train[1]
show(sample)

## Create a Dataset class for getting single sample and apply transforms

In [None]:
class NewCocoDataset(Dataset):
    """Dataset that resizes, randomly flips and cleans wrapped COCO samples.

    Each item of the wrapped dataset is an ``(image, target)`` pair whose
    ``target`` dict holds ``"boxes"`` and ``"labels"`` when the image has
    annotations. Boxes are compared as ``(x1, y1, x2, y2)``.
    """

    def __init__(self, coco_dataset, image_size=(312, 312)):
        """
        Arguments:
            coco_dataset (dataset): The coco dataset containing all the expected transforms.
            image_size (tuple): Target image size. Default is (312, 312)
        """
        self.coco_dataset = coco_dataset
        self.resize = Resize(image_size)
        self.rhf = RandomHorizontalFlip()
        self.rvf = RandomVerticalFlip()
        self.transformer = transforms.Compose([
            transforms.PILToTensor(),
            transforms.ConvertImageDtype(torch.float32),
        ])

    def __len__(self):
        """Return the number of samples of the wrapped dataset."""
        return len(self.coco_dataset)

    def __getitem__(self, idx):
        """Return ``(image, {"boxes": ..., "labels": ...})`` for ``idx``.

        A sample without a ``"boxes"`` entry is replaced by the closest
        preceding sample that has one (indices wrap to the end of the
        dataset once they become negative). Degenerate boxes, i.e. boxes
        without positive width and height, are dropped together with their
        labels; if none remains, ``"boxes"`` has shape ``(0, 4)``.

        Raises:
            IndexError: propagated from the wrapped dataset if no sample
                with boxes is found while walking backwards.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        image, target = self.coco_dataset[idx]

        # Walk backwards until an annotated sample is found; the sample that
        # ends the search is reused directly instead of being loaded twice.
        new_idx = idx
        while 'boxes' not in target:
            new_idx -= 1
            image, target = self.coco_dataset[new_idx]

        image, bboxes = self.resize(image, target["boxes"])
        image, bboxes = self.rhf(image, bboxes)
        image, bboxes = self.rvf(image, bboxes)

        image = self.transformer(image)

        # View the boxes as a plain tensor so masking behaves the same for
        # tensor subclasses and regular tensors.
        boxes = bboxes.as_subclass(torch.Tensor)
        keep = (boxes[:, 0] < boxes[:, 2]) & (boxes[:, 1] < boxes[:, 3])

        # The same mask is applied to the labels so both stay aligned.
        new_target = {
            "boxes": boxes[keep],
            "labels": target["labels"][keep],
        }

        return (image, new_target)

## Custom Batching
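The batching strategy differs between `GPU` and `CPU`: when CUDA is available, the images of a batch are stacked into a single tensor that can be pinned in memory; otherwise the batch is returned as plain tuples.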

In [None]:
class CustomBatchs:
    """Batch holding stacked inputs and a tuple of per-sample targets.

    Attributes are ``inp`` (all sample inputs stacked along a new first
    dimension) and ``tgt`` (the tuple of per-sample targets). The object
    also behaves like the pair ``(inp, tgt)`` for indexing, unpacking and
    ``len``, so consumers can use ``batch[0]`` / ``batch[1]`` whether or not
    ``pin_memory`` has already replaced it by a real tuple.
    """

    def __init__(self, data):
        # data is a sequence of (input, target) samples; transpose it into
        # (all inputs, all targets).
        transposed_data = list(zip(*data))
        self.inp = torch.stack(transposed_data[0], 0)
        self.tgt = transposed_data[1]

    # custom memory pinning method on custom type
    def pin_memory(self):
        """Pin the stacked inputs and return the plain ``(inp, tgt)`` tuple."""
        self.inp = self.inp.pin_memory()
        return (self.inp, self.tgt)

    def __iter__(self):
        return iter((self.inp, self.tgt))

    def __getitem__(self, index):
        return (self.inp, self.tgt)[index]

    def __len__(self):
        return 2
    
def collate_wrapper(batch):
    """Collate a list of samples, choosing the batch type by CUDA availability.

    Returns a ``CustomBatchs`` when CUDA is available, otherwise the samples
    transposed into a tuple of tuples.
    """
    return CustomBatchs(batch) if torch.cuda.is_available() else tuple(zip(*batch))

## Create a dataset loader

In [None]:
new_coco_train = NewCocoDataset(coco_train)

# Shuffled training loader; batches hold 8 samples when CUDA is unavailable
# and 50 otherwise.
data_loader = DataLoader(
    new_coco_train,
    batch_size=8 if USING_CPU else 50,
    shuffle=True,
    collate_fn=collate_wrapper,
    **kwargs,
)


In [None]:
# Iterate once over the whole loader without using the batches; this only
# exercises loading and collation and shows a progress bar.
for img, tar in tqdm(data_loader):
    pass

## Get the names and their corresponding indices

In [None]:
import pycocotools.coco

# Map every COCO category id to its name, then release the annotation index.
coco_anns = pycocotools.coco.COCO(TRAIN_ANN_FILE)
name_idx = {
    category["id"]: category["name"]
    for category in coco_anns.loadCats(coco_anns.getCatIds())
}

del coco_anns

In [None]:
# Fetch one batch; without CUDA the images arrive as a tuple and are stacked
# here, otherwise they are already a single tensor.
data = next(iter(data_loader))
x = torch.stack(data[0]) if USING_CPU else data[0]
print(x.shape)

# Show the first image of the batch (channels moved last for matplotlib).
plt.imshow(data[0][0].permute(1, 2, 0).numpy())

In [None]:
# Boxes of the first sample in the batch.
data[1][0]['boxes']

In [None]:
# Shapes of the first two images in the batch.
data[0][0].shape, data[0][1].shape

## Load the base model

`ssd300_vgg16` is used for training.

In [None]:
# Build SSD300-VGG16 without pretrained weights (neither for the detector
# nor for its backbone) and put it in training mode.
base_model = models.get_model("ssd300_vgg16", weights=None, weights_backbone=None).train()

## Old Model uses VGG16

In [None]:
# base_model

## New Model uses VGG19 `Removed_for_now`

In [None]:
# new_feature_extractor = models.vgg19(weights=None).train()
# base_model.backbone.features = new_feature_extractor.features[:27]

In [None]:
# base_model

## Initialize the weights

In [None]:
def weights_init(m):
    """Re-draw the weights of convolution-like modules from N(0, 0.02).

    A module is selected when its class name contains ``"Conv"``; every
    other module is left untouched. Intended for ``model.apply``.
    """
    if 'Conv' not in m.__class__.__name__:
        return
    nn.init.normal_(m.weight.data, 0.0, 0.02)

In [None]:
# Re-initialise the model weights module by module, then report the device.
base_model.apply(weights_init)
print(DEVICE)

# Replicate the model across all GPUs when more than one is present.
if DEVICE.type == 'cuda' and n_gpus > 1:
    base_model = nn.DataParallel(base_model, device_ids=list(range(n_gpus)))

## Display the loaded model

In [None]:
# Move the model to the selected device; as the last expression of the cell
# this also displays the model.
base_model.to(DEVICE)

In [None]:
# Collect (element count, trainable flag) once per parameter tensor.
param_stats = [(p.numel(), p.requires_grad) for p in base_model.parameters()]

total_params = sum(count for count, _ in param_stats)
print(f'{total_params:,} total parameters.')

total_trainable_params = sum(count for count, trainable in param_stats if trainable)
print(f'{total_trainable_params:,} training parameters.')

## Model Hyper Parameters

In [None]:
# Step size used by the optimizer.
learning_rate = 1e-4

# Adam over every parameter of the model.
optimizer = optim.Adam(base_model.parameters(), lr=learning_rate)

In [None]:
# Optionally resume from a saved checkpoint (model + optimizer state).
if USE_PRETRAINED:
    new_LR = 1e-5 # change this value to set a new Learning Rate for the version of notebook

    # Map the stored tensors onto the device actually in use rather than a
    # hard-coded backend, so the checkpoint loads on any machine.
    # NOTE(security): torch.load unpickles the file; only load checkpoints
    # from a trusted source.
    checkpoint = torch.load(SAVED_MODEL_PATH, map_location=DEVICE)

    base_model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])

    # The restored optimizer state carries the old learning rate; override it.
    for g in optimizer.param_groups:
        g['lr'] = new_LR

## Model Training

In [None]:
# Number of passes over the training loader in this run.
EPOCHS = 5

In [None]:
import gc

In [None]:
# Number of batches between two running-loss reports.
LOG_EVERY = 500

for epoch in range(EPOCHS):
    running_loss = 0.0
    base_model.train()

    # counter is the 1-based number of batches processed in this epoch.
    for counter, data_point in enumerate(tqdm(data_loader), start=1):
        _i, _t = data_point[0], data_point[1]

        # Without CUDA the collate function returns a tuple of images.
        if USING_CPU:
            _i = torch.stack(_i)

        _i = _i.to(DEVICE)
        _t = [{k: v.to(DEVICE) for k, v in __t.items()} for __t in _t]

        optimizer.zero_grad()

        # In training mode the model returns a dict of losses.
        loss_dict = base_model(_i, _t)

        # .mean() leaves a scalar loss unchanged and reduces a loss that
        # comes back with one entry per replica (as under nn.DataParallel)
        # to a scalar, which backward() requires.
        losses = sum(loss.mean() for loss in loss_dict.values())

        losses.backward()
        optimizer.step()

        running_loss += losses.item()

        del loss_dict, losses

        # Report the mean loss of exactly the last LOG_EVERY batches.
        if counter % LOG_EVERY == 0:
            last_loss = running_loss / LOG_EVERY # loss per batch
            print(f'Epoch {epoch}, Batch {counter}, Running Loss: {last_loss}')
            running_loss = 0.0

        gc.collect()

In [None]:
# Run a garbage-collection pass after training.
gc.collect()

## Use an image from Validation Set and Display the Results

In [None]:
VAL_IMG_DIR = '/kaggle/input/coco-2017-dataset/coco2017/val2017'
VAL_ANN_FILE = '/kaggle/input/coco-2017-dataset/coco2017/annotations/instances_val2017.json'


def load_val_dataset(transform):
    """Build the COCO validation detection dataset.

    Args:
        transform: Callable applied to every loaded image (or ``None`` to
            keep the raw image). It is handed to ``CocoDetection`` as its
            image-only ``transform`` argument.

    Returns:
        A ``torchvision.datasets.CocoDetection`` reading images from
        ``VAL_IMG_DIR`` and annotations from ``VAL_ANN_FILE``.
    """
    # The transform has to be forwarded explicitly; without it the argument
    # is silently ignored and no image transform is ever applied.
    return dset.CocoDetection(root=VAL_IMG_DIR,
                              annFile=VAL_ANN_FILE,
                              transform=transform)


# Validation images are only converted to float32 image tensors.
val_transform = transforms.Compose(
    [
        transforms.ToImageTensor(),
        transforms.ConvertImageDtype(torch.float32),
    ]
)
coco_val = load_val_dataset(transform=val_transform)
coco_val = dset.wrap_dataset_for_transforms_v2(coco_val)

new_coco_val = NewCocoDataset(coco_val)
val_data_loader = torch.utils.data.DataLoader(
    new_coco_val,
    batch_size=50 if not USING_CPU else 8,
    shuffle=True,
    collate_fn=collate_wrapper,
    **kwargs
)


In [None]:
img_dtype_converter = transforms.ConvertImageDtype(torch.uint8)
data = next(iter(val_data_loader))

_i = data[0]

# Minimum score for a detection to be drawn, and which image of the batch to show.
threshold = 0.5
idx = 3

# Without CUDA the collate function returns a tuple of images.
if USING_CPU:
    _i = torch.stack(_i)

_i = _i.to(DEVICE)
base_model.eval()
# Inference only: no gradients are needed.
with torch.no_grad():
    p_t = base_model(_i)

prediction = p_t[idx]

# Select the confident detections with a boolean mask computed in torch, so
# this works on any device and when no detection passes the threshold.
keep = prediction['scores'] > threshold
confidence_length = int(keep.sum())

p_boxes = prediction['boxes'][keep].cpu()
# Label ids without an entry in name_idx are shown as their numeric id
# instead of raising KeyError.
p_labels = [name_idx.get(i, str(i)) for i in prediction['labels'][keep].tolist()]
i_img = img_dtype_converter(_i[idx]).cpu()

annotated_image = draw_bounding_boxes(i_img, p_boxes, p_labels, colors="yellow", width=3)
fig, ax = plt.subplots()
# Channels-first tensor -> channels-last array for matplotlib.
ax.imshow(annotated_image.permute(1, 2, 0).numpy())
ax.set(xticklabels=[], yticklabels=[], xticks=[], yticks=[])
fig.tight_layout()


fig.show()


## Save and Load Model along with its states

In [None]:
# Destination of the checkpoint file.
PATH = '/kaggle/working/ssd300_vgg16_checkpoint_2'

# Store the configured epoch count together with the model and optimizer
# state dicts in a single file.
torch.save({
            'epoch': EPOCHS,
            'model_state_dict': base_model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            }, PATH)

In [None]:
# Read the checkpoint back and restore both model and optimizer state.
# NOTE(review): torch.load unpickles the file — only load trusted checkpoints.
checkpoint = torch.load(PATH)
base_model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])