### Dependencies Installation

In [1]:
#%%shell

# Download TorchVision repo to use some files from
# references/detection
#git clone https://github.com/pytorch/vision.git
#cd vision
#git checkout v0.3.0

#cp references/detection/utils.py ../
#cp references/detection/transforms.py ../
#cp references/detection/coco_eval.py ../
#cp references/detection/engine.py ../
#cp references/detection/coco_utils.py ../
#pip install pycocotools torchvision
#pip install Pillow pandas

In [None]:
import os
import torch
import torch.utils.data
import torchvision
from PIL import Image
import pandas as pd
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from engine import train_one_epoch, evaluate
import utils

### Read the training annotations and list the unique class ids

In [None]:
# Load the training annotations; `classes` keeps the distinct category ids
# in order of first appearance.
df = pd.read_csv('../train.csv')
classes = [category_id for category_id in df['category_id'].unique()]
classes

### Custom Dataset class that supplies images and annotations directly from a CSV file

In [None]:
class PersonCarDataset(torch.utils.data.Dataset):
    """Detection dataset pairing image files with box annotations read from a CSV.

    The CSV holds one row per annotated object. The columns read by this class
    are ``file_name``, ``category_id``, ``xmin``, ``ymin``, ``xmax``, ``ymax``,
    ``w`` and ``h``. One dataset item is one image plus all rows that share its
    ``file_name``.
    """

    # Column order matches the [xmin, ymin, xmax, ymax] layout stored in "boxes".
    _BOX_COLUMNS = ['xmin', 'ymin', 'xmax', 'ymax']

    def __init__(self, img_folder, csv_file, transforms=None):
        """Load the annotation table and index the distinct image files.

        Args:
            img_folder: directory that ``file_name`` entries are relative to.
            csv_file: path of the annotation CSV.
            transforms: optional callable applied to the PIL image only
                (the annotation dict is not passed through it).
        """
        self.img_folder = img_folder
        self.transforms = transforms
        self.dataframe = pd.read_csv(csv_file)
        # One dataset item per distinct image, in order of first appearance.
        self.ids = list(self.dataframe['file_name'].unique())
        print(len(self.ids))

    def __getitem__(self, index):
        """Return ``(image, target)`` for the ``index``-th image.

        ``target`` is a dict with the keys ``boxes`` (float32, N x 4),
        ``labels`` (int64, N), ``image_id`` (tensor holding ``index``),
        ``area`` (float32, N, computed as ``w * h``) and ``iscrowd``
        (int64 zeros, N).
        """
        img_id = self.ids[index]
        annos = self.dataframe[self.dataframe['file_name'] == img_id]
        # Force three channels so grayscale / palette / RGBA files produce the
        # same tensor layout as ordinary colour images.
        img = Image.open(os.path.join(self.img_folder, img_id)).convert('RGB')

        num_objs = annos.shape[0]

        # Pull whole columns at once instead of iterating over rows twice.
        boxes = torch.tensor(annos[self._BOX_COLUMNS].to_numpy(), dtype=torch.float32)
        labels = torch.tensor(annos['category_id'].to_numpy(), dtype=torch.int64)
        # Size of bbox (Rectangular)
        areas = torch.tensor((annos['w'] * annos['h']).to_numpy(), dtype=torch.float32)
        iscrowd = torch.zeros((num_objs,), dtype=torch.int64)

        # Annotation is in dictionary format
        my_annotation = {
            "boxes": boxes,
            "labels": labels,
            "image_id": torch.tensor([index]),
            "area": areas,
            "iscrowd": iscrowd,
        }

        if self.transforms is not None:
            img = self.transforms(img)

        return img, my_annotation

    def __len__(self):
        """Number of distinct images listed in the CSV."""
        return len(self.ids)

In [None]:
def get_transform():
    """Return the image preprocessing pipeline: a Compose holding a single ToTensor step."""
    to_tensor = torchvision.transforms.ToTensor()
    return torchvision.transforms.Compose([to_tensor])

In [None]:
# Train and test splits are two CSV files that index into one shared image folder.
train_dataset = PersonCarDataset(img_folder='../trainval/images',
                          csv_file='../train.csv',
                          transforms=get_transform())
# NOTE(review): this previously pointed at './trainval/images' (single dot) while
# every other path in the notebook is relative to the parent directory; treated
# as a typo and aligned with the training split — confirm against the folder layout.
test_dataset = PersonCarDataset(img_folder='../trainval/images',
                          csv_file='../test.csv',
                          transforms=get_transform())

### Build train and test loaders

In [None]:
def collate_fn(batch):
    """Transpose a batch of samples into one tuple per sample field.

    A list of ``(image, target)`` pairs becomes ``(images, targets)``, leaving
    the individual items untouched (no stacking), so images of different sizes
    can share a batch.
    """
    per_field = zip(*batch)
    return tuple(per_field)

# Batch size
batch_size = 4

# Training batches are reshuffled every epoch.
train_data_loader = torch.utils.data.DataLoader(train_dataset,
                                          batch_size=batch_size,
                                          shuffle=True,
                                          num_workers=4,
                                          collate_fn=collate_fn)
# Evaluation does not benefit from shuffling; a fixed order keeps runs repeatable
# and makes per-batch logs comparable between epochs.
test_data_loader = torch.utils.data.DataLoader(test_dataset,
                                          batch_size=batch_size,
                                          shuffle=False,
                                          num_workers=4,
                                          collate_fn=collate_fn)


In [None]:
# Use the GPU when one is visible to PyTorch, otherwise run on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

In [None]:
#for imgs, annotations in data_loader:
#    print(len(annotations),len(imgs))

### Import model architecture with pre-trained weights

In [None]:
# Start from the pre-trained Faster R-CNN (ResNet-50 FPN) detector and swap its
# box predictor for a fresh head sized for this dataset: one output per
# category id found in the training CSV plus one extra class.
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(
    in_features,
    len(classes) + 1,
)

In [None]:
# Move the model's parameters to the selected device; as the last expression of
# the cell, the returned module is also displayed.
model.to(device)

In [None]:
# Optimise only the parameters that are still trainable.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(
    params,
    lr=0.005,
    momentum=0.9,
    weight_decay=0.0005,
)
# Multiply the learning rate by 0.1 after every 3 scheduler steps.
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=3,
    gamma=0.1,
)

In [None]:
# Number of batches per pass over each loader.
len_train = len(train_data_loader)
len_test = len(test_data_loader)
print(len_train, len_test)

### Train the model and save a checkpoint after every epoch

In [None]:
# Epochs are numbered 1..14; the number also ends up in the checkpoint file name.
for epoch in range(1, 15):
    # One pass over the training loader, logging every 5 iterations.
    train_one_epoch(model, optimizer, train_data_loader, device, epoch, print_freq=5)
    # Advance the learning-rate schedule once per epoch.
    lr_scheduler.step()
    # Score the current weights on the test loader.
    evaluate(model, test_data_loader, device=device)
    # The whole model object is saved (not just a state_dict); the loading
    # cell further down relies on that.
    checkpoint_path = f'./stage2_epoch_{epoch}.model'
    torch.save(model, checkpoint_path)

In [None]:
def test_prediction(inp_model,pos):
  img, _ = test_dataset[pos]
  inp_model.eval()
  with torch.no_grad():
      prediction = model([img.to(device)])
  image = Image.fromarray(img.mul(255).permute(1, 2, 0).byte().numpy())
  return image,prediction

In [None]:
from PIL import Image, ImageDraw, ImageEnhance
# Outline / text colour per predicted label id.
color_map = {1:'green',2:'red'}
def visualize(img,prediction,threshold=0.6):
    """Draw every detection scoring at least ``threshold`` onto ``img`` and return it.

    Args:
        img: PIL image to annotate; it is drawn on in place.
        prediction: model output for one image — a list whose first element is
            a dict with ``'boxes'``, ``'labels'`` and ``'scores'`` tensors.
        threshold: minimum score a detection needs in order to be drawn.

    Returns:
        The same ``img`` object, now annotated.
    """
    detections = prediction[0]
    # .tolist() yields plain Python numbers (copying off the GPU if necessary),
    # so the drawing calls never receive tensors.
    boxes = detections['boxes'].tolist()
    labels = detections['labels'].tolist()
    scores = detections['scores'].tolist()

    draw = ImageDraw.Draw(img)
    for (xmin, ymin, xmax, ymax), label, score in zip(boxes, labels, scores):
        if score >= threshold:
            # Labels without a configured colour fall back to yellow instead of
            # raising KeyError.
            colour = color_map.get(label, 'yellow')
            draw.rectangle(((xmin, ymin), (xmax, ymax)), outline=colour)
            # ImageDraw.text takes its colour through `fill`; `color` is not a
            # parameter of that method.
            draw.text((xmin, ymin), str(score), fill=colour)
    return img

### Load a saved checkpoint (here: the model saved after epoch 7)

In [None]:
# The checkpoint is a fully pickled model object. map_location remaps its tensors
# onto the device chosen earlier, so a checkpoint written on a GPU machine can
# still be opened on a CPU-only one.
# NOTE: torch.load unpickles arbitrary Python objects — only open checkpoints
# produced by this notebook or another trusted source.
loaded_model = torch.load('./stage2_epoch_7.model', map_location=device)

### Visualise inference on test data

In [None]:
# Predict on test sample 42 with the loaded checkpoint, then draw the detections;
# the annotated image is the cell's displayed result.
image, prediction = test_prediction(inp_model=loaded_model, pos=42)
visualize(img=image, prediction=prediction)