## **Project2 - Neural networks for object detection**

---



**Name:** Dana Haham

**ID:** 209278407

In [None]:
# Imports
import numpy as np
import cv2
import os
import sys
import json

import torchvision
import torchvision.transforms as transforms
import torchvision.models.detection.mask_rcnn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import DataLoader
from torch.utils.data import Dataset

from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
from pycocotools import mask as coco_mask

import albumentations as A
from albumentations.pytorch import ToTensorV2

from PIL import Image, ImageDraw

from google.colab import drive

!pip install roboflow
from roboflow import Roboflow

!pip install torchmetrics
from torchmetrics.detection import MeanAveragePrecision

In [None]:
# Report whether a GPU is visible, then pick the device used for the model and batches
has_cuda = torch.cuda.is_available()
print(f" is cuda available: {has_cuda}")
device = torch.device("cuda") if has_cuda else torch.device("cpu")

In [None]:
# Download dataset
# The Roboflow API key is a credential, so it is taken from the
# ROBOFLOW_API_KEY environment variable (or prompted for interactively)
# instead of being stored in the notebook.
import getpass

roboflow_api_key = os.environ.get("ROBOFLOW_API_KEY") or getpass.getpass("Roboflow API key: ")

rf = Roboflow(api_key=roboflow_api_key)
project = rf.workspace("drone-when8").project("drone_bird_aircraft")
version = project.version(3)

# rf_dataset.location is the local folder the later cells read the splits from
rf_dataset = version.download("coco-segmentation")

In [None]:
class CustomDataset(Dataset):
    """COCO-segmentation dataset that yields ``(image, targets)`` pairs.

    Images that have no annotations are skipped. For every remaining image the
    annotations are converted to the dictionary layout used by the detection
    model in this notebook:

    * ``boxes``    - float32 ``[x_min, y_min, x_max, y_max]`` per object
    * ``labels``   - int64 COCO ``category_id`` per object
    * ``masks``    - uint8 binary mask per object
    * ``image_id`` - the COCO image id

    Args:
        root_dir: Folder that contains the image files.
        annotation_file: Path to the COCO annotation JSON.
        transforms: Optional albumentations-style callable. It is called with
            the keywords ``image``, ``bboxes``, ``category_id`` and ``masks``
            and must return a dict with the same keys.
    """

    # Tiny placeholder box used when a box is missing or has no area
    _FALLBACK_BOX = [0, 0, 1e-5, 1e-5]

    def __init__(self, root_dir, annotation_file, transforms=None):
        self.root_dir = root_dir
        self.transforms = transforms
        self.coco = COCO(annotation_file)

        # Keep only the image IDs that have at least one annotation
        self.image_ids = [
            img_id
            for img_id in self.coco.getImgIds()
            if len(self.coco.getAnnIds(imgIds=img_id)) > 0
        ]

    @staticmethod
    def _coco_to_xyxy(bbox, width, height):
        """Convert a COCO ``[x, y, w, h]`` box to ``[x_min, y_min, x_max, y_max]``.

        The far corner is clipped to the image size.
        """
        x_min, y_min, box_w, box_h = bbox
        return [x_min, y_min, min(x_min + box_w, width), min(y_min + box_h, height)]

    @classmethod
    def _sanitize_boxes(cls, bboxes):
        """Replace missing or zero-area boxes with the placeholder box."""
        if len(bboxes) == 0:
            return [list(cls._FALLBACK_BOX)]
        return [
            list(box[:4]) if (box[2] > box[0] and box[3] > box[1]) else list(cls._FALLBACK_BOX)
            for box in bboxes
        ]

    def _annotation_mask(self, ann, height, width):
        """Rasterise one annotation into a ``(height, width)`` uint8 binary mask."""
        segmentation = ann['segmentation']

        # Non-polygon (RLE) segmentations are decoded by pycocotools
        if not isinstance(segmentation, list):
            return self.coco.annToMask(ann)

        mask = np.zeros((height, width), dtype=np.uint8)
        # An object may be described by several polygons; fill every one of them
        for flat_polygon in segmentation:
            if len(flat_polygon) == 0:
                continue
            polygon = np.array(flat_polygon).reshape((-1, 2))
            cv2.fillPoly(mask, [polygon.astype(np.int32)], 1)
        return mask

    @staticmethod
    def _mask_to_tensor(mask):
        """Return one mask as a uint8 tensor, whether it is an array or a tensor."""
        if not isinstance(mask, torch.Tensor):
            mask = np.ascontiguousarray(mask)
        return torch.as_tensor(mask, dtype=torch.uint8)

    def __getitem__(self, idx):
        """Load the image and targets at position ``idx``.

        Raises:
            FileNotFoundError: If the image file cannot be read.
        """
        # Loading image
        img_id = self.image_ids[idx]
        img_info = self.coco.loadImgs(img_id)[0]
        height, width = img_info['height'], img_info['width']
        img_path = os.path.join(self.root_dir, img_info['file_name'])

        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising
            raise FileNotFoundError(f"Could not read image: {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Loading annotation
        ann_ids = self.coco.getAnnIds(imgIds=img_id)
        coco_annotations = self.coco.loadAnns(ann_ids)

        # Create lists with compatible format for the model
        bboxes = []
        labels = []
        masks = []
        for ann in coco_annotations:
            bboxes.append(self._coco_to_xyxy(ann['bbox'], width, height))
            masks.append(self._annotation_mask(ann, height, width))
            labels.append(ann['category_id'])

        # Apply transformations. The masks go in under the ``masks`` key so the
        # geometric transforms are applied to them together with the image and
        # the boxes.
        if self.transforms:
            transformed = self.transforms(image=image, bboxes=bboxes, category_id=labels, masks=masks)
            image = transformed['image']
            bboxes = transformed['bboxes']
            labels = transformed['category_id']
            masks = transformed['masks']

        # NOTE(review): if a transform ever drops every box, labels is empty
        # while one placeholder box is inserted here - confirm that case cannot
        # occur with the configured transforms.
        bboxes = self._sanitize_boxes(bboxes)

        # Convert everything into torch tensors
        targets = {
            'boxes': torch.as_tensor(bboxes, dtype=torch.float32),
            'labels': torch.as_tensor(labels, dtype=torch.int64),
            'masks': torch.stack([self._mask_to_tensor(mask) for mask in masks]),
            'image_id': torch.tensor([img_id]),
        }

        return image, targets

    def __len__(self):
        """Return the number of images that have at least one annotation."""
        return len(self.image_ids)

In [None]:
# Pre-processing the datasets
def _make_pipeline(augmentations=()):
    """Build a pipeline: resize, optional augmentations, normalize, to tensor."""
    steps = [A.Resize(225, 225)]
    steps.extend(augmentations)
    steps.append(A.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)))
    steps.append(ToTensorV2())
    return A.Compose(
        steps,
        bbox_params=A.BboxParams(format='pascal_voc', label_fields=['category_id']),
    )


# Training: geometric transforms first, then color and noise transforms
train_transform = _make_pipeline([
    A.HorizontalFlip(p=0.5),
    A.RandomRotate90(p=0.5),
    A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2, p=0.3),
    A.GaussNoise(var_limit=(10.0, 50.0), p=0.5),
])

# Validation / test: no augmentation, only resize, normalization and tensor conversion
test_transform = _make_pipeline()

In [None]:
# Every split sits in its own sub-folder of the downloaded dataset and
# carries its COCO annotation file next to the images.
dataset_root = rf_dataset.location
annotation_name = "_annotations.coco.json"

# Image folders
train_path_to_images = os.path.join(dataset_root, "train")
valid_path_to_images = os.path.join(dataset_root, "valid")
test_path_to_images = os.path.join(dataset_root, "test")

# Annotation files
train_path_to_annotations = os.path.join(train_path_to_images, annotation_name)
valid_path_to_annotations = os.path.join(valid_path_to_images, annotation_name)
test_path_to_annotations = os.path.join(test_path_to_images, annotation_name)

In [None]:
# Create datasets: the training split is augmented, validation and test
# share the plain resize/normalize pipeline.
train_dataset = CustomDataset(train_path_to_images, train_path_to_annotations, transforms=train_transform)
valid_dataset = CustomDataset(valid_path_to_images, valid_path_to_annotations, transforms=test_transform)
test_dataset = CustomDataset(test_path_to_images, test_path_to_annotations, transforms=test_transform)

In [None]:
# Load the datasets
def collate_detection_batch(batch):
    """Regroup ``[(image, targets), ...]`` into ``(images, targets)`` tuples.

    A named module-level function is used instead of a lambda so that the
    loader's worker processes can pickle it.
    """
    return tuple(zip(*batch))


# Options shared by all three loaders
_loader_options = dict(batch_size=160, collate_fn=collate_detection_batch, num_workers=2, pin_memory=True)

# Only the training data is shuffled
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_options)
valid_loader = DataLoader(valid_dataset, shuffle=False, **_loader_options)
test_loader = DataLoader(test_dataset, shuffle=False, **_loader_options)

In [None]:
# Bird, drone, aircraft and background
num_classes = 4

# Transfer learning on Mask R-CNN with fine-tuning

# Load the pretrained Mask R-CNN
# NOTE(review): recent torchvision releases prefer the `weights=` argument
# over `pretrained=True` - confirm against the installed version.
model = torchvision.models.detection.maskrcnn_resnet50_fpn(pretrained=True)

# Freeze the backbone only; every other part of the model keeps requires_grad as loaded
for param in model.backbone.parameters():
    param.requires_grad = False

# Replace the box predictor with a new head sized for our classes.
# The new head is created fresh here, so no explicit unfreezing is needed.
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

# Replace the mask predictor in the same way
in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
hidden_layer = 256
model.roi_heads.mask_predictor = MaskRCNNPredictor(in_features_mask, hidden_layer, num_classes)

model.to(device)

In [None]:
def valid_loss(data_loader=None):
    """Return the average per-batch loss of the global ``model`` on a loader.

    The model is switched to train mode for the forward passes because that is
    how the loss dictionary is obtained from it; gradients stay disabled. The
    mode the model had before the call is restored afterwards.

    Args:
        data_loader: Loader yielding ``(images, targets)`` batches. Defaults to
            the global ``valid_loader``.

    Returns:
        Sum of the summed batch losses divided by the number of batches.

    Raises:
        ValueError: If the loader has no batches.
    """
    if data_loader is None:
        data_loader = valid_loader

    num_batches = len(data_loader)
    if num_batches == 0:
        raise ValueError("valid_loss() needs a data loader with at least one batch")

    was_training = model.training

    # The way to run loss on validation
    model.train()

    val_loss_sum = 0.0

    try:
        with torch.no_grad():

            # Run mini-batch
            for images, targets in data_loader:

                # Move batch to device (same tensor guard as the training loop)
                images = [img.to(device) for img in images]
                targets = [
                    {k: v.to(device) if isinstance(v, torch.Tensor) else v for k, v in t.items()}
                    for t in targets
                ]

                # Sum all the losses in the dictionary
                loss_dict = model(images, targets)
                losses = sum(loss for loss in loss_dict.values())
                val_loss_sum += losses.item()
    finally:
        # Leave the model in the mode the caller had it in
        model.train(was_training)

    # Calculate the loss average
    return val_loss_sum / num_batches

In [None]:
def accuracy(data_loader):
    """Evaluate the global ``model`` on ``data_loader`` with mean average precision.

    The metric is configured for ``xyxy`` boxes, a single IoU threshold of 0.5
    and per-class metrics. The full result dictionary is printed and its
    ``"map"`` entry is returned.
    """
    metric = MeanAveragePrecision(box_format='xyxy', iou_thresholds=[0.5], class_metrics=True)

    # Predictions are only produced in evaluation mode
    model.eval()

    with torch.no_grad():
        for batch_images, batch_targets in data_loader:

            # Only the images are needed on the device for inference
            device_images = [image.to(device) for image in batch_images]

            # Bring the predictions back to the CPU in the layout the metric expects
            predictions = [
                {
                    'boxes': out['boxes'].cpu(),
                    'scores': out['scores'].cpu(),
                    'labels': out['labels'].cpu(),
                }
                for out in model(device_images)
            ]

            # Ground truth is passed on as loaded, reduced to boxes and labels
            ground_truth = [
                {'boxes': target['boxes'], 'labels': target['labels']}
                for target in batch_targets
            ]

            # Update metric for current batch
            metric.update(predictions, ground_truth)

    # Compute final mAP over all batches
    final_map = metric.compute()
    print(f"Mean Average Precision: {final_map}")

    return final_map.get("map")

In [None]:
# Load the TensorBoard notebook extension and open the dashboard on the "logs" directory
%load_ext tensorboard
%tensorboard --logdir logs

# Writer used by the training loop; it records into the same "logs" directory
writer = SummaryWriter('logs')

In [None]:
# NOTE(review): the model was already moved to the device in an earlier cell,
# so setting this allocator option here may come too late to take effect -
# consider setting it before the first CUDA use.
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = 'expandable_segments:True'

# Training the model
optimizer = optim.Adam(model.parameters())
# NOTE(review): gamma=0.001 multiplies the learning rate by 0.001 after 10
# epochs - confirm this is intended (0.1 is the more common choice).
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.001)
best_val_loss = float('inf')

# Mount the drive once, up front, and make sure the checkpoint folder exists
drive.mount('/content/drive', force_remount=True)
model_save_path = '/content/drive/MyDrive/Project2/model.pth'
os.makedirs(os.path.dirname(model_save_path), exist_ok=True)

epochs = 13
try:
    for e in range(epochs):

        # Set the model to train mode
        model.train()
        epoch_sum_loss = 0

        # Run mini-batch
        for images, targets in train_loader:

            # Move batch to device
            images = [img.to(device) for img in images]
            targets = [
                {k: v.to(device) if isinstance(v, torch.Tensor) else v for k, v in t.items()}
                for t in targets
            ]

            # Training pass
            optimizer.zero_grad()

            # Sum all the losses in the dictionary
            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())

            # The model learns by backpropagating
            losses.backward()

            # Optimizes its weights
            optimizer.step()

            epoch_sum_loss += losses.item()

        # Read the learning rate used in this epoch before the scheduler changes it
        lr = optimizer.param_groups[0]['lr']
        scheduler.step()

        # Evaluation
        train_loss = epoch_sum_loss / len(train_loader)
        val_loss = valid_loss()
        val_acc = accuracy(valid_loader)

        # Keep the weights of the model with the best validation loss
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), model_save_path)

        # Visualizing with TensorBoard
        writer.add_scalar('Loss/train', train_loss, e)
        writer.add_scalar('Loss/val', val_loss, e)
        writer.add_scalar('Accuracy/val', val_acc, e)
        writer.add_scalar('Learning Rate', lr, e)

        print(f"Epoch {e}, train_loss {train_loss:.3f}, val_loss {val_loss:.3f}")
finally:
    # Make sure the logged scalars reach disk even if training stops early
    writer.flush()
    writer.close()

In [None]:
# Test model
drive.mount('/content/drive', force_remount=True)

# Load the trained weights. map_location places them on the current device,
# so a checkpoint saved on a GPU also loads on a CPU-only runtime.
model.load_state_dict(torch.load('/content/drive/MyDrive/Project2/model.pth', map_location=device))

accuracy(test_loader)