# Object Detection with Faster RCNN

Code is for the following video: https://www.youtube.com/watch?v=Uc90rr5jbA4&t=71s

Do give this notebook a thumbs-up if you liked it. Thanks!

In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os

We require the latest version of torchvision

Here are all the necessary libraries

In [None]:
import torch
import torchvision
from torchvision import datasets, models
from torchvision.transforms import functional as FT
from torchvision import transforms as T
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import DataLoader, sampler, random_split, Dataset
import copy
import math
from PIL import Image
import cv2
import albumentations as A  # our data augmentation library

import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
# suppress warnings (optional)
import warnings
warnings.filterwarnings("ignore")
from collections import defaultdict, deque
import datetime
import time
from tqdm import tqdm # progress bar
from torchvision.utils import draw_bounding_boxes

In [None]:
# Show the installed torch / torchvision versions for reference
print(torch.__version__)
print(torchvision.__version__)

PyCOCOTools provides many utilities for dealing with datasets in the COCO format, and if you wanted, you could evaluate the model's performance on the dataset with some of the utilities provided with this library.

That is out of scope for this notebook, however.

In [None]:
# our dataset is in COCO format, so we need pycocotools
from pycocotools.coco import COCO

In [None]:
# Now, we will define our transforms
from albumentations.pytorch import ToTensorV2

We use albumentations as our data augmentation library due to its capability to deal with bounding boxes in multiple formats

In [None]:
def get_transforms(train=False):
    """Build the albumentations pipeline used by the dataset.

    Both variants resize to 640x640 and finish with ``ToTensorV2``; the training
    variant additionally applies random flips and colour perturbations in between.
    Bounding boxes are declared in COCO ``[x, y, width, height]`` format.

    Args:
        train: when truthy, include the random augmentation steps.

    Returns:
        An ``A.Compose`` pipeline.
    """
    steps = [A.Resize(640, 640)]  # fixed square input size
    if train:
        steps += [
            A.HorizontalFlip(p=0.3),
            A.VerticalFlip(p=0.3),
            A.RandomBrightnessContrast(p=0.1),
            A.ColorJitter(p=0.1),
        ]
    steps.append(ToTensorV2())
    return A.Compose(steps, bbox_params=A.BboxParams(format='coco'))

## Dataset

This is our dataset class. It loads all the necessary files and it processes the data so that it can be fed into the model.

In [None]:
import os
import cv2
import copy
import torch
from torchvision import datasets
from pycocotools.coco import COCO

class AquariumDetection(datasets.VisionDataset):
    """COCO-format object-detection dataset read from ``<root>/<split>/``.

    Annotations come from ``<root>/<split>/_annotations.coco.json``. Images that have
    no annotation at all are excluded when the dataset is constructed.

    Each item is ``(image, target)``. ``image`` is a tensor divided by 255 and
    ``target`` is a dict with ``boxes`` (``[xmin, ymin, xmax, ymax]``), ``labels``,
    ``image_id``, ``area`` and ``iscrowd`` tensors. Every per-box field is built from
    the same set of surviving annotations, so they always have matching lengths.

    ``transforms`` is called as ``transforms(image=..., bboxes=...)`` and must return a
    mapping with ``'image'`` and ``'bboxes'`` entries (albumentations calling
    convention). Extra values appended to each box are expected to be passed through.
    """

    def __init__(self, root, split='train', transform=None, target_transform=None, transforms=None):
        super().__init__(root, transforms, transform, target_transform)
        self.split = split  # train, valid, test
        self.coco = COCO(os.path.join(root, split, "_annotations.coco.json"))  # annotations stored here
        # Keep only images that have at least one annotation
        self.ids = [
            img_id for img_id in sorted(self.coco.imgs.keys())
            if len(self._load_target(img_id)) > 0
        ]

    def _load_image(self, id: int):
        """Read the image with the given COCO image id and return it as an RGB array.

        Raises:
            FileNotFoundError: if the file cannot be read by OpenCV.
        """
        file_name = self.coco.loadImgs(id)[0]['file_name']
        full_path = os.path.join(self.root, self.split, file_name)
        image = cv2.imread(full_path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising
            raise FileNotFoundError(f"Could not read image: {full_path}")
        return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    def _load_target(self, id):
        """Return the list of annotation dicts for the given COCO image id."""
        return self.coco.loadAnns(self.coco.getAnnIds(id))

    def get_image_name(self, index):
        """Retrieve the image name for the given index."""
        img_id = self.ids[index]
        return self.coco.loadImgs(img_id)[0]['file_name']

    def __getitem__(self, index):
        """Return ``(image, target)`` for the sample at ``index``."""
        img_id = self.ids[index]
        image = self._load_image(img_id)
        target = copy.deepcopy(self._load_target(img_id))

        height, width, _ = image.shape  # Get image dimensions

        # Box layout handed to the transform: [x, y, w, h, category_id, annotation_index].
        # The trailing annotation index lets us recover which annotations survived
        # filtering and augmentation, so labels/iscrowd stay aligned with the boxes.
        valid_boxes = []
        for ann_idx, ann in enumerate(target):
            x_min, y_min, w, h = ann['bbox']
            x_max, y_max = x_min + w, y_min + h
            inside_image = (
                0 <= x_min <= width and 0 <= y_min <= height
                and 0 <= x_max <= width and 0 <= y_max <= height
            )
            # Zero-size boxes carry no information and are rejected downstream
            if inside_image and w > 0 and h > 0:
                valid_boxes.append(list(ann['bbox']) + [ann['category_id'], ann_idx])

        boxes = valid_boxes  # used as-is when no transform is configured
        if self.transforms is not None:
            transformed = self.transforms(image=image, bboxes=valid_boxes)
            image = transformed['image']
            boxes = transformed['bboxes']

        # Annotations that are still present after filtering / augmentation
        kept = [target[int(box[5])] for box in boxes]

        # convert from xywh to xyxy
        new_boxes = [[box[0], box[1], box[0] + box[2], box[1] + box[3]] for box in boxes]

        # Initialize 'targ' dictionary with keys and empty tensors
        targ = {
            'boxes': torch.zeros((0, 4), dtype=torch.float32),
            'labels': torch.tensor([], dtype=torch.int64),
            'image_id': torch.tensor([], dtype=torch.int64),
            'area': torch.tensor([], dtype=torch.float32),
            'iscrowd': torch.tensor([], dtype=torch.int64)
        }

        # Update 'targ' dictionary if there are valid boxes
        if new_boxes:
            box_tensor = torch.tensor(new_boxes, dtype=torch.float32)
            targ['boxes'] = box_tensor
            targ['labels'] = torch.tensor([ann['category_id'] for ann in kept], dtype=torch.int64)
            targ['image_id'] = torch.tensor([ann['image_id'] for ann in kept])
            # Area of the (possibly resized) boxes, not the annotation's stored area
            targ['area'] = (box_tensor[:, 3] - box_tensor[:, 1]) * (box_tensor[:, 2] - box_tensor[:, 0])
            targ['iscrowd'] = torch.tensor([ann['iscrowd'] for ann in kept], dtype=torch.int64)

        if not isinstance(image, torch.Tensor):
            # No tensor conversion happened in the transform: HWC array -> CHW tensor
            image = torch.from_numpy(image).permute(2, 0, 1)

        return image.div(255), targ  # scale images

    def __len__(self):
        return len(self.ids)


In [None]:
# Dataset root; each split is read from <root>/<split>/_annotations.coco.json
dataset_path = "/rsrch5/home/plm/yshokrollahi/dataset_frcnn"

In [None]:
# Read the category table from the training annotations
coco = COCO(os.path.join(dataset_path, "train", "_annotations.coco.json"))
categories = coco.cats
# NOTE(review): this count is later used as the detector's number of output classes;
# confirm it accounts for the background class the detection head expects.
n_classes = len(categories)
categories

This code just gets a list of classes

In [None]:
# Category names, in the iteration order of `categories`
classes = [category['name'] for category in categories.values()]
classes

In [None]:
# Training split (the default split) with the augmenting transform pipeline
train_dataset = AquariumDetection(root=dataset_path, transforms=get_transforms(True))

This is a sample image with its ground-truth bounding boxes. This code does not use the model's output.

In [None]:
# Load a sample; one index is used throughout so the printed file name
# belongs to the image that is actually displayed.
sample_index = 23
sample = train_dataset[sample_index]
# The dataset returns images divided by 255; build a uint8 copy for drawing
img_int = (sample[0] * 255).to(torch.uint8)

# Retrieve and print the image name
image_name = train_dataset.get_image_name(sample_index)
print(f"Image Name: {image_name}")

# Check if there are bounding boxes
if len(sample[1]['boxes']) > 0:
    print("Bounding boxes are present.")
else:
    print("No bounding boxes.")

# Draw and display the image with bounding boxes (if any)
plt.imshow(draw_bounding_boxes(
    img_int, sample[1]['boxes'], [classes[i] for i in sample[1]['labels'].tolist()], width=4
).permute(1, 2, 0))
plt.show()


In [None]:
# Number of training images that have at least one annotation
len(train_dataset)

## Model

Our model is FasterRCNN with a backbone of `MobileNetV3-Large`. We need to change the output layers because we have just 7 classes but this model was trained on 90 classes.

In [None]:
# Load Faster R-CNN with the MobileNetV3-Large FPN backbone and pretrained weights
model = models.detection.fasterrcnn_mobilenet_v3_large_fpn(pretrained=True)
# Input width of the existing classification layer, needed to build the replacement head
in_features = model.roi_heads.box_predictor.cls_score.in_features
# Replace the box predictor with one that has n_classes outputs
model.roi_heads.box_predictor = models.detection.faster_rcnn.FastRCNNPredictor(in_features, n_classes)

This is our collating function for the train dataloader. It allows us to create batches of data that can easily be passed into the model.

In [None]:
def collate_fn(batch):
    """Regroup a list of samples into one tuple per field.

    For ``(image, target)`` samples this yields ``(images, targets)``, each a tuple,
    without stacking anything into a single tensor.
    """
    per_field = zip(*batch)
    return tuple(per_field)

In [None]:
# Define a new batch size
new_batch_size = 16  # adjust this based on your GPU memory

# Create the DataLoader with the new batch size
train_loader = DataLoader(train_dataset, 
                          batch_size=new_batch_size, 
                          shuffle=True, 
                          num_workers=4, 
                          pin_memory=True,  # set to True if using a GPU
                          collate_fn=collate_fn)  # batches arrive as (images, targets) tuples

# Now you can use the train_loader as before

The following blocks ensure that the model can take in the data and that it will not crash during training.

In [None]:
# Pull a single batch from the loader to test the model with
images,targets = next(iter(train_loader))


In [None]:
# Smoke test: one forward pass with targets on the batch fetched above
images = list(images)
targets = [dict(t) for t in targets]
output = model(images, targets)  # only checking that this runs without error

In [None]:
# Train on the GPU when one is available; otherwise fall back to the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Move the model to the selected device
model = model.to(device)

## Optimizer

Here, we define the optimizer. If you wish, you can also define the LR Scheduler, but it is not necessary for this notebook since our dataset is so small.

> Note, there are a few bugs with the current way `lr_scheduler` is implemented. If you wish to use the scheduler, you will have to fix those bugs

In [None]:
# Now, the optimizer: SGD with Nesterov momentum over the trainable parameters only
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.01, momentum=0.9, nesterov=True, weight_decay=1e-4)
# lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(optimizer, milestones=[16, 22], gamma=0.1) # lr scheduler

In [None]:
import sys

## Training

The following is a function that will train the model for one epoch. Torchvision object detection models have a loss function built in, and they will calculate the loss automatically if you pass in the `inputs` and `targets`.

In [None]:
def train_one_epoch(model, optimizer, loader, device, epoch):
    """Run one training pass over ``loader`` and print the epoch's mean losses.

    Args:
        model: detection model that, in train mode, returns a dict of loss tensors
            when called as ``model(images, targets)``. The printed summary reads the
            keys ``loss_classifier``, ``loss_box_reg``, ``loss_rpn_box_reg`` and
            ``loss_objectness``.
        optimizer: optimizer that updates the model's parameters.
        loader: iterable of ``(images, targets)`` batches; ``targets`` is a sequence
            of dicts whose values are tensors (or tensor-convertible values).
        device: device the model and every batch are moved to.
        epoch: epoch number, used only in the printed summary.

    Returns:
        None.

    Exits the process with ``sys.exit(1)`` when a batch produces a non-finite loss.
    No learning-rate scheduler is stepped here.
    """
    model.to(device)
    model.train()

    all_losses = []
    all_losses_dict = []

    for images, targets in tqdm(loader):
        images = [image.to(device) for image in images]
        # as_tensor passes existing tensors through untouched (no copy-construct
        # warning) while still accepting plain Python values
        targets = [{k: torch.as_tensor(v).to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)  # the model computes the loss automatically if we pass in targets
        losses = sum(loss_dict.values())
        loss_value = losses.item()

        all_losses.append(loss_value)
        all_losses_dict.append({k: v.item() for k, v in loss_dict.items()})

        if not math.isfinite(loss_value):
            print(f"Loss is {loss_value}, stopping trainig")  # stop training if the loss becomes inf/nan
            print(loss_dict)
            sys.exit(1)

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

    if not all_losses:
        # An empty loader would otherwise fail on the missing loss columns below
        print(f"Epoch {epoch}: loader produced no batches, nothing was trained")
        return

    mean_losses = pd.DataFrame(all_losses_dict).mean()  # per-component mean, for printing
    print(
        f"Epoch {epoch}, lr: {optimizer.param_groups[0]['lr']:.6f}, "
        f"loss: {np.mean(all_losses):.6f}, "
        f"loss_classifier: {mean_losses['loss_classifier']:.6f}, "
        f"loss_box: {mean_losses['loss_box_reg']:.6f}, "
        f"loss_rpn_box: {mean_losses['loss_rpn_box_reg']:.6f}, "
        f"loss_object: {mean_losses['loss_objectness']:.6f}"
    )

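About 10 epochs should be enough to train this model to a high accuracy. Note that the cell below is configured to run 250 epochs and writes a checkpoint every 10 epochs.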

In [None]:
num_epochs = 250
model_save_path = 'frcnn_models'  # Define the directory to save the models

# Create the directory if it does not exist (no-op when it already does)
os.makedirs(model_save_path, exist_ok=True)

for epoch in range(num_epochs):
    train_one_epoch(model, optimizer, train_loader, device, epoch)
    # lr_scheduler.step() # Uncomment if you are using a learning rate scheduler

    # Save a checkpoint every 10 epochs; it bundles the epoch number, the model
    # weights and the optimizer state in one dict
    if (epoch + 1) % 10 == 0:
        checkpoint = {
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict()
        }
        save_path = os.path.join(model_save_path, f'checkpoint_epoch_{epoch + 1}.pth')
        torch.save(checkpoint, save_path)
        print(f'Model saved at epoch {epoch + 1} in {save_path}')

# Save Model

In [None]:
# No LR scheduler is used for this task (an earlier scheduler bug made the learning rate too low).
# Save the final trained model as a bare state_dict.
# NOTE(review): with num_epochs = 250 the training loop has already written a full
# checkpoint dict to this same path; this call replaces it with the weights only.
torch.save(model.state_dict(), "frcnn_models/checkpoint_epoch_250.pth")

In [None]:
# Load the trained model (assuming the model architecture is already defined).
# The file may hold either a bare state_dict or a checkpoint dict written by the
# training loop (weights stored under 'model_state_dict'); accept both layouts.
loaded = torch.load("frcnn_models/checkpoint_epoch_250.pth")
if isinstance(loaded, dict) and 'model_state_dict' in loaded:
    loaded = loaded['model_state_dict']
model.load_state_dict(loaded)
model.eval()  # Set the model to evaluation mode if you are doing inference


## Trying on sample Images

This is the inference code for the model. First, we set the model to evaluation mode and clear the GPU Cache. We also load a test dataset, so that we can use fresh images that the model hasn't seen.

In [None]:
# Switch the model to evaluation mode and clear the CUDA cache
# before looking at the model's predictions
model.eval()
torch.cuda.empty_cache()

In [None]:
# Test split with the non-augmenting transform pipeline
test_dataset = AquariumDetection(root=dataset_path, split="test", transforms=get_transforms(False))

In [None]:
# Run the model on a single test image
img, _ = test_dataset[1]
# uint8 copy for drawing; .to() converts the existing tensor instead of
# copy-constructing a new one with torch.tensor()
img_int = (img * 255).to(torch.uint8)
with torch.no_grad():
    prediction = model([img.to(device)])
    pred = prediction[0]  # one result dict per input image

In [None]:
# it did learn

In [None]:
# Draw only the detections whose score is above 0.8, labelled with their class names
fig = plt.figure(figsize=(14, 10))
plt.imshow(draw_bounding_boxes(img_int,
    pred['boxes'][pred['scores'] > 0.8],
    [classes[i] for i in pred['labels'][pred['scores'] > 0.8].tolist()], width=4
).permute(1, 2, 0))

In [None]:
# Set the model to evaluation mode and clear the GPU Cache
model.eval()
torch.cuda.empty_cache()

# Load a test dataset image along with its annotations
test_dataset = AquariumDetection(root=dataset_path, split="test", transforms=get_transforms(False))
img, target = test_dataset[6]
# uint8 copy for drawing; .to() converts the existing tensor directly
img_int = (img * 255).to(torch.uint8)

# Perform prediction
with torch.no_grad():
    prediction = model([img.to(device)])
    pred = prediction[0]

# Draw actual (ground truth) bounding boxes
actual_boxes_img = draw_bounding_boxes(
    img_int.clone(),
    target['boxes'],
    [classes[i] for i in target['labels'].tolist()],
    colors='blue',
    width=4
)

# Set confidence threshold
# NOTE(review): 0.001 keeps almost every detection — confirm this is intended
confidence_threshold = 0.001

# Filter out predictions below the confidence threshold (mask computed once)
keep = pred['scores'] > confidence_threshold
pred_boxes = pred['boxes'][keep]
pred_labels = pred['labels'][keep]

# Draw predicted bounding boxes
predicted_boxes_img = draw_bounding_boxes(
    img_int.clone(),
    pred_boxes,
    [classes[i] for i in pred_labels.tolist()],
    colors='red',
    width=4
)

# Display the images side by side for comparison
fig, ax = plt.subplots(1, 2, figsize=(20, 10))
ax[0].imshow(actual_boxes_img.permute(1, 2, 0))
ax[0].set_title('Real Bounding Boxes')
ax[0].axis('off')

ax[1].imshow(predicted_boxes_img.permute(1, 2, 0))
ax[1].set_title('Predicted Bounding Boxes')
ax[1].axis('off')

plt.show()


In [None]:
def get_model_predictions(model, dataset, device):
    """Run the model on every sample of ``dataset`` and collect the outputs.

    The model is switched to evaluation mode and gradients are disabled. Each
    sample must unpack as ``(image, target)``; the target is ignored.

    Args:
        model: callable taking a list of images and returning one result per image.
        dataset: iterable of ``(image, target)`` pairs.
        device: device each image is moved to before inference.

    Returns:
        A list with the model's result for each sample, in dataset order.
    """
    model.eval()
    with torch.no_grad():
        # One image per forward pass; take the single result out of the returned list
        return [model([image.to(device)])[0] for image, _ in dataset]

# Assuming test_dataset is already created: collect the model's output for every test sample
predictions = get_model_predictions(model, test_dataset, device)


In [None]:
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
import numpy as np
import json

def prepare_for_coco_evaluation(predictions, dataset):
    """Flatten per-image predictions into a list of COCO detection records.

    Predictions are paired positionally with ``dataset.ids``. Each predicted box
    ``[x1, y1, x2, y2]`` becomes one record whose ``bbox`` is
    ``[x1, y1, x2 - x1, y2 - y1]``.

    NOTE(review): boxes are emitted in the coordinate frame the model produced them
    in; if the dataset resizes images, confirm the ground truth uses the same frame.

    Args:
        predictions: sequence of dicts with ``boxes``, ``scores`` and ``labels`` tensors.
        dataset: object exposing ``ids``, the image id for each prediction.

    Returns:
        A list of dicts with keys ``image_id``, ``category_id``, ``bbox``, ``score``.
    """
    records = []
    for image_id, pred in zip(dataset.ids, predictions):
        box_rows = pred['boxes'].cpu().numpy().tolist()
        score_vals = pred['scores'].cpu().numpy()
        label_vals = pred['labels'].cpu().numpy()

        records.extend(
            {
                "image_id": image_id,  # actual image id from the dataset
                "category_id": int(label_vals[i]),
                "bbox": [x1, y1, x2 - x1, y2 - y1],
                "score": float(score_vals[i]),
            }
            for i, (x1, y1, x2, y2) in enumerate(box_rows)
        )
    return records



# Convert model predictions to COCO format
coco_predictions = prepare_for_coco_evaluation(predictions, test_dataset)

# Write to a file (the detections are loaded from this JSON file below)
with open('predictions.json', 'w') as f:
    json.dump(coco_predictions, f)

# Load the ground truth annotations from the same dataset root the datasets use,
# instead of repeating the absolute path
cocoGt = COCO(os.path.join(dataset_path, "test", "_annotations.coco.json"))

# Load the predictions
cocoDt = cocoGt.loadRes('predictions.json')

# Create COCO Eval object
cocoEval = COCOeval(cocoGt, cocoDt, 'bbox')

# Evaluate on the dataset
cocoEval.evaluate()
cocoEval.accumulate()
cocoEval.summarize()


# Retrain

In [None]:
# Define your model architecture exactly as before
model = models.detection.fasterrcnn_mobilenet_v3_large_fpn(pretrained=False)
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = models.detection.faster_rcnn.FastRCNNPredictor(in_features, n_classes)

# Load the weights. The training loop stores its periodic checkpoints as dicts with
# the weights under 'model_state_dict', so unwrap that layout; a bare state_dict is
# still accepted. map_location="cpu" lets the file load on a machine without a GPU.
checkpoint = torch.load("frcnn_models/checkpoint_epoch_150.pth", map_location="cpu")
state_dict = checkpoint['model_state_dict'] if 'model_state_dict' in checkpoint else checkpoint
model.load_state_dict(state_dict)


In [None]:
# Use the GPU when available, otherwise the CPU, and move the reloaded model there
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)


In [None]:
# Collect the parameters of the *reloaded* model. Reusing the `params` list built for
# the earlier model object would make the optimizer update that old model and leave
# this one unchanged.
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.01, momentum=0.9, nesterov=True, weight_decay=1e-4)
# If you saved optimizer state, load it here
# optimizer.load_state_dict(torch.load("optimizer_state.pth"))

num_epochs = 250  # Set the number of epochs you want to train for

for epoch in range(num_epochs):
    train_one_epoch(model, optimizer, train_loader, device, epoch)
    # Optionally save the model and optimizer state at each epoch
    # Optionally save the model and optimizer state at each epoch


In [None]:
# No LR scheduler is used for this task (an earlier scheduler bug made the learning rate too low).
# Save the final retrained model as a bare state_dict.
# NOTE(review): the file name says 50 while the retraining cell sets num_epochs = 250 — confirm the intended name.
torch.save(model.state_dict(), "frcnn_models/trained_50.pth")

In [None]:
# Set the model to evaluation mode and clear the GPU Cache
model.eval()
torch.cuda.empty_cache()

# Load a test dataset image along with its annotations
test_dataset = AquariumDetection(root=dataset_path, split="test", transforms=get_transforms(False))
img, target = test_dataset[76]
# uint8 copy for drawing; .to() converts the existing tensor directly
img_int = (img * 255).to(torch.uint8)

# Perform prediction
with torch.no_grad():
    prediction = model([img.to(device)])
    pred = prediction[0]

# Draw actual (ground truth) bounding boxes
actual_boxes_img = draw_bounding_boxes(
    img_int.clone(),
    target['boxes'],
    [classes[i] for i in target['labels'].tolist()],
    colors='blue',
    width=4
)

# Set confidence threshold
# NOTE(review): 0.001 keeps almost every detection — confirm this is intended
confidence_threshold = 0.001

# Filter out predictions below the confidence threshold (mask computed once)
keep = pred['scores'] > confidence_threshold
pred_boxes = pred['boxes'][keep]
pred_labels = pred['labels'][keep]

# Draw predicted bounding boxes
predicted_boxes_img = draw_bounding_boxes(
    img_int.clone(),
    pred_boxes,
    [classes[i] for i in pred_labels.tolist()],
    colors='red',
    width=4
)

# Display the images side by side for comparison
fig, ax = plt.subplots(1, 2, figsize=(20, 10))
ax[0].imshow(actual_boxes_img.permute(1, 2, 0))
ax[0].set_title('Real Bounding Boxes')
ax[0].axis('off')

ax[1].imshow(predicted_boxes_img.permute(1, 2, 0))
ax[1].set_title('Predicted Bounding Boxes')
ax[1].axis('off')

plt.show()
