In [None]:
# basic
import math
import os
import pickle
from PIL import Image
import random

# data
import numpy as np
import pandas as pd

# plot
import matplotlib.pyplot as plt
import matplotlib.patches as patches

# torch
import torch
import torchvision
from torchvision import transforms
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# vision reference. taken from https://github.com/pytorch/vision/tree/master/references/detection
from engine import train_one_epoch, evaluate
import utils

from dataset import BDDDataset

## Check GPU

In [None]:
# prefer the first CUDA GPU when one is visible, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')
print(device)

## Config vars

In [None]:
root = '.'

# dataset folders, all resolved relative to `root`
dataset_path = os.path.join(root, 'dataset')
train_dataset_path = os.path.join(dataset_path, 'train')
val_dataset_path = os.path.join(dataset_path, 'val')

# fraction of the samples held out for validation
val_split = 0.2

# categories the detector is trained on
target_labels = ['car', 'traffic sign', 'pedestrian']

# lookup tables: label -> id, id -> label, id -> plot colour
label_id_map = {name: idx for idx, name in enumerate(target_labels)}
id_label_map = {idx: name for idx, name in enumerate(target_labels)}
# one random RGB triple per class id (three draws per id, in id order)
id_color_map = {
    idx: (random.random(), random.random(), random.random())
    for idx in range(len(target_labels))
}

## Utility functions

In [None]:
def plot_image_with_boxes(img, target):
    """Show `img` with one rectangle outline per entry in `target['boxes']`.

    `img` is permuted with (1, 2, 0) before being handed to `imshow`.
    Every box is read as four values: indices 0/1 give the rectangle's
    anchor corner and indices 2/3 the opposite corner. The outline colour
    is looked up in `id_color_map` using the integer label at the same
    position in `target['labels']`.
    """
    _figure, axes = plt.subplots(1)
    axes.imshow(img.permute(1, 2, 0))

    for idx, box in enumerate(target['boxes']):
        anchor = (box[0], box[1])
        box_width = box[2] - box[0]
        box_height = box[3] - box[1]
        class_id = int(target['labels'][idx])

        # unfilled outline so the image stays visible underneath
        outline = patches.Rectangle(
            anchor,
            box_width,
            box_height,
            linewidth=1,
            edgecolor=id_color_map[class_id],
            facecolor="none",
        )
        axes.add_patch(outline)

    plt.show()

In [None]:
def get_transform(train):
    """Build the image preprocessing pipeline.

    `train` is accepted but currently unused: the same pipeline, consisting
    only of `ToTensor`, is returned for both modes.
    """
    steps = [transforms.ToTensor()]
    return transforms.Compose(steps)

In [None]:
def get_fastrcnn_model(num_classes):
    """Return a pretrained Faster R-CNN (ResNet-50 FPN) with a fresh box head.

    The existing box predictor is swapped for a new `FastRCNNPredictor`
    that outputs `num_classes` classes; the rest of the network keeps its
    pretrained weights.

    NOTE(review): torchvision detection models normally expect
    `num_classes` to include the background class — confirm at the caller.
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

    # the new head must accept the same feature width as the one it replaces
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
    new_head = FastRCNNPredictor(head_in_features, num_classes)
    detector.roi_heads.box_predictor = new_head

    return detector

## Defining our custom dataset

In [None]:
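# NOTE(review): commented-out inline copy of BDDDataset; the notebook imports the
# class via `from dataset import BDDDataset` — presumably kept for reference only,
# confirm before deleting.
# class BDDDataset(object):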
    
#     def __init__(self, root, transforms=None):
#         self.root = root
#         self.transforms = transforms
        
#         # load the train set calculated from prev notebook
#         with open('train_set.pkl', 'rb') as f:
#             self.train_set = pickle.load(f)
            
#         self.imgs = list(os.listdir(train_dataset_path))
        
#     def __len__(self):
#         return len(self.imgs)
    
#     def __getitem__(self, idx):
        
#         # load image
#         img_name = self.imgs[idx]
#         img_path = os.path.join(train_dataset_path, img_name)
#         img = Image.open(img_path).convert('RGB')
        
#         num_instances = len(self.train_set[img_name]['labels'])
#         labels = []
#         boxes = []
        
#         for instance in self.train_set[img_name]['labels']:
#             # making sure we dont have visually unclear instances
#             if instance['category'] in label_id_map and not instance['attributes']['occluded'] and not instance['attributes']['truncated']:
#                 labels.append(label_id_map[instance['category']])
#                 boxes.append([instance['box2d']['x1'], instance['box2d']['y1'], instance['box2d']['x2'], instance['box2d']['y2']])
        
#         # convert all variables to tensors
#         boxes = torch.as_tensor(boxes, dtype=torch.float32)
#         labels = torch.as_tensor(labels, dtype=torch.int64)
#         image_id = torch.tensor([idx])
        
#         area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        
#         # attach all info into a dict target
#         target = {}
#         target['boxes'] = boxes
#         target['labels'] = labels
#         target['image_id'] = image_id
#         target['area'] = area
                
#         if self.transforms is not None:
#             img = self.transforms(img)
        
#         return img, target

## Testing the dataset

In [None]:
# build the dataset once and inspect a single fixed sample as a smoke test
dataset = BDDDataset(root, transforms=get_transform(train=True))
random_idx = 142
sample = dataset[random_idx]
img, target = sample

print(f'img shape: {img.shape}')
print(f'target: {target}')
print(f'dataset length: {len(dataset)}')

# visual check that the boxes line up with the image
plot_image_with_boxes(img, target)

## Instantiate the train and val dataloaders

In [None]:
# Two independent dataset objects over the same images, so that train-specific
# transforms can later be added without leaking into validation.
train_dataset = BDDDataset(root, transforms=get_transform(train=True))
val_dataset = BDDDataset(root, transforms=get_transform(train=False))

# split dataset in train and val set
# A dedicated seeded generator keeps the split identical across re-runs of this
# cell and across kernel restarts, so validation numbers stay comparable.
split_seed = 0
split_generator = torch.Generator().manual_seed(split_seed)
indices = torch.randperm(len(train_dataset), generator=split_generator).tolist()
val_split_index = int(val_split * len(train_dataset))

# Slice with a positive boundary counted from the front: a negative-offset slice
# such as indices[:-val_split_index] degenerates to indices[:0] (empty train set,
# everything in val) whenever val_split_index rounds down to 0.
num_train = len(indices) - val_split_index
train_dataset = torch.utils.data.Subset(train_dataset, indices[:num_train])
val_dataset = torch.utils.data.Subset(val_dataset, indices[num_train:])

# define the dataloaders
# utils.collate_fn comes from the torchvision detection reference scripts
dataloader_train = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=2,
    shuffle=True,
    num_workers=0,
    collate_fn=utils.collate_fn
)

# validation order is kept fixed (no shuffling)
dataloader_val = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=2,
    shuffle=False,
    num_workers=0,
    collate_fn=utils.collate_fn
)

## Train model

In [None]:
def main(num_epochs=10):
    """Fine-tune the detection model and evaluate it after every epoch.

    Builds the model with `get_fastrcnn_model`, moves it to `device`, then
    alternates `train_one_epoch` on `dataloader_train` with `evaluate` on
    `dataloader_val`.

    Args:
        num_epochs: number of training epochs; defaults to 10.

    Returns:
        The trained model, so the caller can save it or run inference
        instead of losing the weights when the function exits.
    """
    # define the model
    # NOTE(review): torchvision's Faster R-CNN normally reserves class index 0
    # for background, so num_classes is usually len(labels) + 1 with label ids
    # starting at 1 — verify against the ids produced by BDDDataset.
    num_classes = len(target_labels)
    model = get_fastrcnn_model(num_classes)
    model.to(device)

    # construct an optimizer over the trainable parameters only
    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

    # decay the LR by 10x every 3 epochs (the scheduler is stepped once per epoch)
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

    for epoch in range(num_epochs):

        print(f'EPOCH {epoch}:')
        train_one_epoch(model, optimizer, dataloader_train, device, epoch, print_freq=10)
        lr_scheduler.step()
        evaluate(model, dataloader_val, device=device)

    return model

In [None]:
# Bind the result to a name so whatever main() returns stays reachable from
# later cells, and so the cell does not echo it as output.
trained_model = main()

# Fix the boxes area problem