# DATA UNDERSTANDING

In [None]:
import json
import pprint

# Load the COCO 2017 training annotation file into memory.
# JSON is UTF-8 by specification, so the encoding is pinned instead of
# depending on the platform default.
with open(
    '/kaggle/input/coco-2017-dataset/coco2017/annotations/instances_train2017.json',
    encoding='utf-8',
) as train_json:
    train_file = json.load(train_json)

In [None]:
# Report how many image records the annotation file holds and show the first one.
print(f'Total train images: {len(train_file["images"])}')
print('================================================', 'Eg:', sep='\n')
print(pprint.pformat(train_file['images'][0]))


In [None]:
# Report how many annotation records the file holds and show the first one.
print(f'Total train annotations: {len(train_file["annotations"])}')
print('================================================', 'Eg:', sep='\n')
print(pprint.pformat(train_file['annotations'][0]))

# DATA CLEANUP

In [None]:
from pandas import DataFrame as DF

In [None]:
# Look up the id of the category named 'person'; stays None when no such
# category is listed in the annotation file.
categories = train_file['categories']
category_person_id = next(
    (entry['id'] for entry in categories if entry['name'] == 'person'),
    None,
)

print(f'ID Label Person: {category_person_id}')

In [None]:
# SET UP THE TRAINING DATA

In [None]:
# Split the annotations in a single pass:
#   * person annotations are collected as-is;
#   * every other annotation only bumps a per-image counter keyed by image_id.
train_annotations = train_file['annotations']
people_train_annotations = []
not_people_train_annotations = {}

for annotation in train_annotations:
    if annotation['category_id'] != category_person_id:
        owner = annotation['image_id']
        not_people_train_annotations[owner] = not_people_train_annotations.get(owner, 0) + 1
        continue
    people_train_annotations.append(annotation)

print(f"People's annotations {len(people_train_annotations)}")
print("=====================")
print(f"Eg: {people_train_annotations[0]}")

print(not_people_train_annotations)


In [None]:
# image_id of every person annotation, in annotation order (ids repeat when an
# image carries more than one person annotation).
people_train_image_ids = [person['image_id'] for person in people_train_annotations]

sample_ids = people_train_image_ids[:5]
print(f"People's only images' id eg: {sample_ids}")

In [None]:
# Attach image metadata to every person annotation.
# Each annotation's image_id is matched against the id column of the images
# table; pandas performs the join instead of hand-written Python loops.

# One row per entry of train_file['images'].
train_images_df = DF(train_file['images'])

# One row per person annotation collected earlier.
people_train_annotations_df = DF(people_train_annotations)

# No `how` argument is given, so pandas' default join type applies.
people_train_images = DF.merge(people_train_annotations_df, train_images_df, left_on="image_id", right_on="id")

# Keep only the columns used downstream.
people_train_df = DF(people_train_images, columns=['image_id', 'bbox', 'file_name', 'coco_url'])
# Drop rows whose image_id is missing.
people_train_df = people_train_df[people_train_df['image_id'].notna()]
# Replace any remaining missing values with an empty string.
# NOTE(review): apply(list) turns each column into a plain list and lets pandas
# rebuild the frame from them - presumably to get a fresh index; confirm it is
# still needed.
people_train_df = people_train_df.fillna("").apply(list)
# The sorted frame is not assigned, so people_train_df itself keeps its order;
# as the last expression of the cell it is only shown as the cell output.
people_train_df.sort_values(by="bbox")

# DATASET PREPARATION

In [None]:
# Install pycocotools into the notebook environment.
# NOTE(review): presumably required by the torchvision reference scripts
# (coco_utils.py / coco_eval.py) fetched later in this notebook - confirm.
pip install pycocotools

In [None]:
import torch
from torchvision import tv_tensors
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import requests
import matplotlib.pyplot as plt

In [None]:
class COCODataset(Dataset):
    """Person-detection dataset backed by a DataFrame of COCO annotations.

    Every row of ``data`` describes one annotation and must provide the
    columns ``image_id``, ``bbox`` (``[x_min, y_min, width, height]``) and
    ``coco_url``. Indexing is per row: item ``idx`` is the image referenced by
    row ``idx`` together with the boxes of *every* kept row sharing its
    ``image_id``, so an image with several annotations is returned several
    times.

    Args:
        data: DataFrame with one annotation per row.
        transform: Optional callable invoked as ``transform(img, target)`` and
            returning the transformed ``(img, target)`` pair. Image and target
            are passed together so that geometric augmentations can update
            the boxes as well; for that to happen the geometric steps must
            run before any step that strips the tv_tensor types.
        count: Optional number of leading rows to keep; ``None`` keeps all.
    """

    # Seconds to wait for the image server before the request is abandoned.
    request_timeout = 30

    def __init__(self, data, transform=None, count=None):
        self.transform = transform
        # Slicing with None keeps every row, so no explicit limit is needed.
        self.data = data[:count]

    def __len__(self):
        """Return the number of annotation rows kept."""
        return len(self.data)

    def __getitem__(self, idx):
        """Download the image for row ``idx`` and build its detection target.

        Returns:
            ``(img, target)`` where ``target`` holds ``"boxes"`` in
            ``(x_min, y_min, x_max, y_max)`` form and ``"labels"`` filled with
            the module-level ``category_person_id``.

        Raises:
            requests.HTTPError: if the image server answers with an error status.
        """
        row = self.data.iloc[idx]
        image_id = row['image_id']

        # The context manager releases the connection; convert() fully decodes
        # the image, so nothing is read from the stream after the block.
        with requests.get(row['coco_url'], stream=True, timeout=self.request_timeout) as response:
            response.raise_for_status()
            img = Image.open(response.raw).convert("RGB")

        # Gather the boxes of every kept annotation that belongs to this image
        # and convert them from (x, y, w, h) to corner form.
        same_image = self.data.loc[self.data['image_id'] == image_id]
        corner_boxes = [
            [x_min, y_min, x_min + width, y_min + height]
            for x_min, y_min, width, height in same_image['bbox']
        ]

        # Wrapping the boxes as BoundingBoxes lets v2 transforms treat them as
        # boxes tied to this image's canvas rather than as opaque tensors.
        boxes = tv_tensors.BoundingBoxes(
            torch.as_tensor(corner_boxes, dtype=torch.float32).reshape(-1, 4),
            format="XYXY",
            canvas_size=(img.height, img.width),
        )
        labels = torch.as_tensor([category_person_id] * len(corner_boxes), dtype=torch.int64)

        img = tv_tensors.Image(img)
        target = {"boxes": boxes, "labels": labels}

        if self.transform is not None:
            img, target = self.transform(img, target)

        return img, target

In [None]:
from torchvision.transforms import v2 as T

def compose_transformation(train=True):
    """Build the torchvision v2 preprocessing pipeline.

    Args:
        train: When true, a random horizontal flip (p=0.5) is included.

    Returns:
        A ``T.Compose`` that optionally flips, converts images to float with
        values scaled to [0, 1], and finally unwraps tv_tensors into plain
        tensors.
    """
    steps = []

    # The flip must run while inputs still carry their tv_tensor types:
    # once ToPureTensor has unwrapped them, accompanying bounding boxes are
    # no longer recognised as boxes and would be left unflipped.
    if train:
        steps.append(T.RandomHorizontalFlip(0.5))

    steps.append(T.ToDtype(torch.float, scale=True))
    # Kept last so every earlier step sees the typed inputs.
    steps.append(T.ToPureTensor())

    return T.Compose(steps)

In [None]:
import os
import urllib.request

# Fetch the helper modules from torchvision's detection reference scripts.
# Files already present are kept (re-running the cell does not pile up
# duplicate copies), and a failed download raises instead of surfacing later
# as a confusing ImportError.
_REFERENCE_BASE_URL = "https://raw.githubusercontent.com/pytorch/vision/main/references/detection/"
for _script_name in ("engine.py", "utils.py", "coco_utils.py", "coco_eval.py", "transforms.py"):
    if not os.path.exists(_script_name):
        urllib.request.urlretrieve(_REFERENCE_BASE_URL + _script_name, _script_name)

import utils

In [None]:
# Train on the first 500 annotation rows after ordering by image_id.
train_rows = people_train_df.sort_values(by="image_id")[:500]
train_dataset = COCODataset(train_rows, transform=compose_transformation(True))

# utils.collate_fn comes from the downloaded torchvision reference scripts.
train_dataloader = DataLoader(
    train_dataset,
    batch_size=2,
    shuffle=True,
    num_workers=4,
    collate_fn=utils.collate_fn,
)

print(f'Data Batch count: {len(train_dataloader)}')

In [None]:
##### SKIPPABLE ####
## PREVIEW DATASET ##

img, targets = train_dataset[5]

# Show the image itself.
img = T.ToPILImage()(img)
plt.figure(figsize=(8, 8))
plt.imshow(img)

# Overlay the boxes. The dataset returns them as (x_min, y_min, x_max, y_max),
# whereas plt.Rectangle expects an anchor point plus width and height, so the
# extent is derived from the two corners.
for bbox in targets['boxes']:
    print(bbox)
    x_min, y_min, x_max, y_max = bbox.tolist()
    rect = plt.Rectangle(
        (x_min, y_min),
        x_max - x_min,
        y_max - y_min,
        fill=False,
        color='red',
        linewidth=2,
    )
    plt.gca().add_patch(rect)
    plt.text(x_min + 3, y_min - 6, "Person", color='white', fontsize=10, bbox=dict(facecolor='red'))

plt.show()

# MODEL SETUP

In [None]:
import torchvision.models.detection as detection

In [None]:
# Detector: Faster R-CNN (ResNet-50 + FPN) built with the constructor defaults.
# Alternative configuration kept for reference (not active):
#   weights='DEFAULT', min_size=128, max_size=256,
#   image_mean=[0.485, 0.456, 0.406], image_std=[0.229, 0.224, 0.225]
model = detection.fasterrcnn_resnet50_fpn()

# Replace the box predictor head with one sized for our two classes.
num_classes = 2  # ['people', 'background']
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = detection.faster_rcnn.FastRCNNPredictor(in_features, num_classes)

# Run on the GPU when one is available, otherwise on the CPU.
torch.cuda.empty_cache()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# Optimise only the parameters that require gradients.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

# Multiply the learning rate by 0.1 every 3 scheduler steps.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# MODEL TRAINING

In [None]:
import time
from engine import train_one_epoch, evaluate

In [None]:
import copy

epochs_count = 2
# The metric is the negated average loss (higher is better), so -inf is a
# safe "nothing recorded yet" start value that any real epoch will beat.
best_metric = float('-inf')
best_weight = None

for epoch in range(epochs_count):
    result = train_one_epoch(model, optimizer, train_dataloader, device, epoch, print_freq=25)

    loss = result.meters['loss'].global_avg
    current_metric = -loss

    if current_metric > best_metric:
        best_metric = current_metric
        # state_dict() hands out references to the live parameters; without a
        # copy, best_weight would silently follow later training steps.
        best_weight = copy.deepcopy(model.state_dict())
        torch.save(best_weight, '/kaggle/working/best.pt')
        print(f"Saved best weights with metric: {best_metric}")

    print("Epoch loss: {}".format(loss))
    # Advance the learning-rate schedule once per epoch.
    scheduler.step()

In [None]:
from torchvision.utils import draw_bounding_boxes
from torchvision.io import read_image

# Run the detector on a single test image and draw what it predicts.
image = read_image("/kaggle/input/coco-2017-dataset/coco2017/test2017/000000000212.jpg")
transform = compose_transformation(train=False)

model.eval()
with torch.no_grad():
    # Keep the first three channels only (drops alpha for RGBA input), then
    # move the tensor to the model's device.
    x = transform(image)[:3, ...]
    x = x.to(device)
    predictions = model([x])
    pred = predictions[0]

# Min-max rescale the original image to the 0..255 range for drawing.
darkest, brightest = image.min(), image.max()
image = (255.0 * (image - darkest) / (brightest - darkest)).to(torch.uint8)
image = image[:3, ...]

# One caption per detection, showing its confidence score.
pred_labels = [f"score: {score:.3f}" for score in pred["scores"]]
pred_boxes = pred["boxes"].long()
output_image = draw_bounding_boxes(image, pred_boxes, pred_labels, colors="red")

plt.figure(figsize=(12, 12))
# draw_bounding_boxes returns channels-first; imshow wants channels-last.
plt.imshow(output_image.permute(1, 2, 0))