## Imports

In [25]:
import json
import os
from tqdm import tqdm
import numpy as np
import math
import torch
import torch.nn as nn
from torchvision import transforms
from torch.utils.data import Dataset
from PIL import Image
from rich.console import Console
import torchvision
from torchvision import transforms
import matplotlib.pyplot as plt

## Colab block for loading data folder

In [26]:
# Detect whether the notebook is running inside Google Colab: only query the
# IPython shell when the interpreter advertises itself as IPython.
if hasattr(__builtins__, '__IPYTHON__'):
    IN_COLAB = 'google.colab' in str(get_ipython())
else:
    IN_COLAB = False

if IN_COLAB:
    from google.colab import drive
    drive.mount('/content/drive')
    from zipfile import ZipFile

    # (archive stored on Drive, directory it is extracted into), processed in order
    colab_archives = (
        ("drive/MyDrive/train_annotations.zip", "data/assignment_1/train/"),
        ("drive/MyDrive/test.zip", "data/assignment_1/"),
        ("drive/MyDrive/images_pt1.zip", "data/assignment_1/train/images/"),
        ("drive/MyDrive/images_pt2.zip", "data/assignment_1/train/images/"),
    )
    for archive_path, extract_dir in colab_archives:
        with ZipFile(archive_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)

## Initial configuration

In [27]:
# Rich console instance (created here; not used by the code visible in this notebook)
console = Console()

# Set torch seed so that torch's random number generation is repeatable across runs
torch.manual_seed(3407)

# Initialize training variables
# BATCH: number of samples per DataLoader batch
BATCH = 16
# LR: learning rate handed to the optimizer
LR = 0.01
# MOMENTUM: NOTE(review): not referenced by the Adam optimizer configured further down — confirm whether it is still needed
MOMENTUM = 0.9

## Defining some custom utilities

In [28]:
class CustomUtils:
    """
    Namespace for project-wide constants and stateless helpers (batch collation,
    drawing, IoU computation and network building blocks).
    All helpers are static methods; the class is never instantiated.
    """

    # Defining project root in order to avoid relative paths
    PROJECT_ROOT = "."

    # Initializing torch device according to hardware available.
    # The MPS backend is probed defensively because older torch builds do not ship it.
    DEVICE = torch.device(
        "cuda" if torch.cuda.is_available()
        else "mps" if getattr(torch.backends, "mps", None) is not None and torch.backends.mps.is_available()
        else "cpu"
    )

    # Side length (pixels) every image is resized to before entering the network
    IMG_SIZE = 256

    @staticmethod
    def collate_fn(batch):
        """
        Function to combine images, boxes and labels
        :param batch: an iterable of N (image, target) pairs from __getitem__() of CustomDataset
        :return: a tensor of stacked images and a tuple
                 (boxes, labels, mask_coords, objectness_mask, boxes_mask, labels_mask);
                 the first three are per-sample lists (their sizes vary with the number
                 of objects), the last three are stacked along a new batch dimension
        """
        targets = [sample[1] for sample in batch]

        images = torch.stack([sample[0] for sample in batch])
        boxes = [target["boxes"] for target in targets]
        labels = [target["labels"] for target in targets]
        mask_coords = [target["objectness"]["coords"] for target in targets]
        objectness_mask = torch.stack([target["objectness"]["matrix"] for target in targets])
        boxes_mask = torch.stack([target["boxes_mask"] for target in targets])
        labels_mask = torch.stack([target["labels_mask"] for target in targets])

        return images, (boxes, labels, mask_coords, objectness_mask, boxes_mask, labels_mask)

    @staticmethod
    def with_bounding_box(image, target):
        """
        Returns an image with bounding boxes and labels
        :param image: image as Tensor
        :param target: dict containing the bounding boxes under 'boxes' and their
                       text labels under 'categories'
        :return: PIL image with the boxes drawn in red
        """
        # Round-trip through PIL to obtain the uint8 tensor draw_bounding_boxes expects
        uint8_image = transforms.PILToTensor()(transforms.ToPILImage()(image))
        tensor_image = torchvision.utils.draw_bounding_boxes(
            uint8_image, target['boxes'], target['categories'], colors="red", width=2
        )
        return transforms.ToPILImage()(tensor_image)

    @staticmethod
    def plot_aspect_ratio_distribution(dataset):
        """
        Plots the aspect ratio (width / height) distribution of a dataset as a bar chart
        :param dataset: indexable dataset whose items are (image, target) pairs; the image
                        is either a PIL image (``size`` is ``(width, height)``) or a tensor
                        whose last two dimensions are ``(height, width)``
        :return: the matplotlib.pyplot module holding the bar chart
        """
        # Fill the pre-allocated array in place: appending to it would keep the
        # uninitialised np.empty values in the histogram.
        aspect_ratios = np.empty(len(dataset), dtype=float)
        for i in tqdm(range(len(dataset))):
            img, _ = dataset[i]
            if isinstance(img, torch.Tensor):
                # Tensor.size is a method, so read the spatial dims from the shape
                height, width = img.shape[-2:]
            else:
                width, height = img.size
            aspect_ratios[i] = width / height

        plt.bar(*np.unique(aspect_ratios, return_counts=True))
        return plt

    @staticmethod
    def to_center_coords(boxes):
        """
        Converts corner-format boxes to center-format boxes
        :param boxes: iterable of [x_min, y_min, x_max, y_max]
        :return: list of [x_center, y_center, width, height]; the center
                 coordinates are rounded up to the next integer
        """
        new_boxes = []
        for box in boxes:
            w = box[2] - box[0]
            h = box[3] - box[1]
            new_boxes.append([math.ceil(box[0] + w/2), math.ceil(box[1] + h/2), w, h])
        return new_boxes

    @staticmethod
    def i_over_u(batched_predicted_boxes, batched_target_boxes):
        """
        Compute intersection over union of batched Tensors
        :param batched_predicted_boxes: tensor whose last dimension is [x_center, y_center, width, height]
        :param batched_target_boxes: tensor with the same layout
        :return: tensor of IoU values with a trailing dimension of size 1
        """

        # Convert both sets of boxes from center format to corner format.
        # Slices (0:1, ...) are used instead of plain indices to keep the last dimension.
        pred_x1 = batched_predicted_boxes[..., 0:1] - batched_predicted_boxes[..., 2:3] / 2
        pred_y1 = batched_predicted_boxes[..., 1:2] - batched_predicted_boxes[..., 3:4] / 2
        pred_x2 = batched_predicted_boxes[..., 0:1] + batched_predicted_boxes[..., 2:3] / 2
        pred_y2 = batched_predicted_boxes[..., 1:2] + batched_predicted_boxes[..., 3:4] / 2

        target_x1 = batched_target_boxes[..., 0:1] - batched_target_boxes[..., 2:3] / 2
        target_y1 = batched_target_boxes[..., 1:2] - batched_target_boxes[..., 3:4] / 2
        target_x2 = batched_target_boxes[..., 0:1] + batched_target_boxes[..., 2:3] / 2
        target_y2 = batched_target_boxes[..., 1:2] + batched_target_boxes[..., 3:4] / 2

        # Overlap extents are clamped at 0 so disjoint boxes give an empty intersection
        overlap_w = (torch.min(pred_x2, target_x2) - torch.max(pred_x1, target_x1)).clamp(0)
        overlap_h = (torch.min(pred_y2, target_y2) - torch.max(pred_y1, target_y1)).clamp(0)
        intersection_area = overlap_w * overlap_h

        pred_area = torch.abs((pred_x2 - pred_x1) * (pred_y2 - pred_y1))
        target_area = torch.abs((target_x2 - target_x1) * (target_y2 - target_y1))

        union_area = pred_area + target_area - intersection_area

        # Epsilon avoids a division by zero when both boxes are empty
        return intersection_area/(union_area + 1e-8)

    @staticmethod
    def build_low_level_feat(in_channels, out_channels, conv_k_size, pool_k_size):
        """
        Builds a low level feature extraction block
        :param in_channels: input channels for the block
        :param out_channels: target output channels (there is no variation inside the block)
        :param conv_k_size: kernel size for convolution
        :param pool_k_size: kernel size for pooling | stride value
        :return Sequential object [Conv -> ReLU -> Conv -> ReLU -> BatchNorm -> ReLU -> MaxPool]
        """
        # Padding is fixed to 1, so kernels larger than 3 shrink the feature map
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=conv_k_size, padding=1),
            nn.ReLU(),
            nn.Conv2d(out_channels, out_channels, kernel_size=conv_k_size, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=pool_k_size, stride=pool_k_size),
        )

    @staticmethod
    def build_inception_components(in_channels, out_channels):
        """
        Builds the inception network components
        :param in_channels: input channels for the block
        :param out_channels: for the four components will be [out_channels, out_channels, out_channels*2, out_channels*2]
        :return the four components of an inception block (pool, conv1, conv2, conv3),
                each already moved to CustomUtils.DEVICE; all of them preserve the
                spatial size, so their outputs can be concatenated along the channel axis
        """
        wide_channels = out_channels * 2
        pool = nn.Sequential(
            nn.MaxPool2d(3, 1, padding=1),
            nn.Conv2d(in_channels, out_channels, kernel_size=1)
        ).to(CustomUtils.DEVICE)
        conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1).to(CustomUtils.DEVICE)
        conv2 = nn.Sequential(
            nn.Conv2d(in_channels, wide_channels, kernel_size=1),
            nn.Conv2d(wide_channels, wide_channels, kernel_size=3, padding=1)
        ).to(CustomUtils.DEVICE)
        conv3 = nn.Sequential(
            nn.Conv2d(in_channels, wide_channels, kernel_size=1),
            nn.Conv2d(wide_channels, wide_channels, kernel_size=5, padding=2)
        ).to(CustomUtils.DEVICE)
        return pool, conv1, conv2, conv3

    @staticmethod
    def build_output_components(in_channels, b=2, num_classes=13):
        """
        Builds the three output components of a YOLO-style network
        :param in_channels: input channels for the block
        :param b: number of boxes
        :param num_classes: number of class scores predicted per cell
        :return the three components (confidence, box, classes), each already moved
                to CustomUtils.DEVICE; they output b, b*4 and num_classes channels
        """
        total_boxes_layers = b * 4
        confidence = nn.Sequential(
            nn.Conv2d(in_channels, b, 1),
            nn.Sigmoid()
        ).to(CustomUtils.DEVICE)
        box = nn.Sequential(
            nn.Conv2d(in_channels, total_boxes_layers, 1),
            nn.Conv2d(total_boxes_layers, total_boxes_layers, 9, padding='same'),
            nn.Conv2d(total_boxes_layers, total_boxes_layers, 1),
            nn.ReLU()
        ).to(CustomUtils.DEVICE)
        classes = nn.Sequential(
            nn.Conv2d(in_channels, num_classes, 1),
            nn.Softmax(dim=1)
        ).to(CustomUtils.DEVICE)

        return confidence, box, classes



## Defining Custom Dataset

In [29]:
class CustomDataset(Dataset):
    """
    Class that represents a dataset object to use as input on a CNN.

    Every item is an (image, target) pair. Besides the raw boxes and labels,
    the target carries three grid-shaped masks (objectness, box regression and
    one-hot labels) laid out on a GRID_SIZE x GRID_SIZE grid over the image.
    """

    # Number of cells per side of the square target grid
    GRID_SIZE = 7
    # Number of categories encoded in the one-hot labels mask
    NUM_CLASSES = 13

    def __init__(self, root):
        """
        Default initializer
        :param root: path to dataset root; must contain an "images" and an
                     "annotations" directory
        """
        self.root = root
        self.size = CustomUtils.IMG_SIZE

        # Load images filelist
        self.images = list(sorted(os.listdir(os.path.join(root, "images"))))
        # Load annotations filelist
        # NOTE(review): images and annotations are paired purely by sorted position —
        # confirm that both directories use matching file names.
        self.annotations = list(sorted(os.listdir(os.path.join(root, "annotations"))))

    def __getitem__(self, index):
        """
        Default getter for dataset objects
        :param index: i of the wanted image + annotation
        :return: the image (a tensor resized to size x size when self.size is set,
                 otherwise the PIL image) and the target dictionary
        """
        img = self.__load_image(index)
        target = self.__generate_target(index)
        if self.size is not None:
            img, target = self.__apply_transform(img, target)

        target["objectness"] = self.__compute_objectness(target['boxes'])
        target["boxes_mask"] = self.__build_target_bb_mask(target['boxes'])
        target["labels_mask"] = self.__build_target_labels_mask(target['objectness']['coords'], target['labels'])

        return img, target

    def __apply_transform(self, img, target):
        """
        Apply a resize transformation to an image and its target
        :param img: image as PIL Image
        :param target: dict representing the bounding boxes
        :return: the resized image as a tensor and the target with rescaled boxes
        """
        # Boxes must be rescaled with the ORIGINAL image size, i.e. before resizing
        target["boxes"] = self.__resize_boxes(target["boxes"], img.size)
        transform = transforms.Compose([transforms.ToTensor(), transforms.Resize((self.size, self.size))])
        img = transform(img)
        return img, target

    def __resize_boxes(self, boxes, img_size):
        """
        Apply to bounding boxes the same resize as the corresponding image
        :param boxes: tensor containing the corner coordinates of the bounding boxes
        :param img_size: (width, height) of the original image
        :return: float tensor of shape (N, 4) with the rescaled, rounded coordinates
        """
        x_scale = self.size/img_size[0]
        y_scale = self.size/img_size[1]

        scaled_boxes = []
        for box in boxes:
            box = box.tolist()
            scaled_boxes.append([
                int(np.round(box[0] * x_scale)),
                int(np.round(box[1] * y_scale)),
                int(np.round(box[2] * x_scale)),
                int(np.round(box[3] * y_scale)),
            ])
        # reshape keeps a well-formed (0, 4) tensor when there are no boxes
        return torch.as_tensor(scaled_boxes, dtype=torch.float32, device=CustomUtils.DEVICE).reshape(-1, 4)

    def __load_image(self, index):
        """
        Load an image from the list of available images
        :param index: i of the wanted image
        :return: the image as a PIL.Image object
        """
        image_path = os.path.join(self.root, "images", self.images[index])
        return Image.open(image_path).convert("RGB")

    def __load_annotation(self, index):
        """
        Load image annotations from the list of available annotations files
        :param index: i of the wanted image
        :return: list with the values of every top-level JSON entry whose key contains "item"
        """
        annotation_path = os.path.join(self.root, "annotations", self.annotations[index])
        with open(annotation_path, "r") as fp:
            annotation_json = json.load(fp)
        return [value for key, value in annotation_json.items() if "item" in key]

    def __locate_cell(self, box):
        """
        Find the grid cell that contains the center of a box
        :param box: [x_min, y_min, x_max, y_max] as a list of numbers
        :return: (cell_x, cell_y, center_x, center_y, cell_length)
        """
        cell_length = np.round(self.size / self.GRID_SIZE, 1)
        center_x = np.round(box[0] + (box[2] - box[0]) / 2, 1)
        center_y = np.round(box[1] + (box[3] - box[1]) / 2, 1)
        # Clamp to the grid: a center lying on (or beyond) the image border would
        # otherwise index past the last cell, or wrap around for negative values.
        last_cell = self.GRID_SIZE - 1
        cell_x = min(max(math.floor(center_x / cell_length), 0), last_cell)
        cell_y = min(max(math.floor(center_y / cell_length), 0), last_cell)
        return cell_x, cell_y, center_x, center_y, cell_length

    def __compute_objectness(self, boxes):
        """
        Build the objectness mask of an image
        :param boxes: tensor of corner-format boxes
        :return: dict with "matrix", a (GRID_SIZE, GRID_SIZE) tensor holding 1.0 in
                 every cell containing a box center, and "coords", the list of
                 (cell_x, cell_y) pairs in box order
        """
        target_matrix = np.zeros((self.GRID_SIZE, self.GRID_SIZE), dtype=np.float32)
        coords = []

        for box in boxes:
            cell_x, cell_y, _, _, _ = self.__locate_cell(box.tolist())
            # The matrix is indexed [row, column], i.e. [y, x]
            target_matrix[cell_y, cell_x] = 1.0
            coords.append((cell_x, cell_y))

        return {"matrix": torch.as_tensor(target_matrix, dtype=torch.float32, device=CustomUtils.DEVICE), "coords": coords}

    def __build_target_bb_mask(self, boxes):
        """
        Build the box regression mask of an image
        :param boxes: tensor of corner-format boxes
        :return: (GRID_SIZE, GRID_SIZE, 4) tensor; the cell containing a box center
                 stores [x, y, w, h], where x and y are the center offsets from the
                 cell corner in units of one cell, and w and h are the box sizes
                 divided by the image size. When several boxes fall in one cell the
                 last one wins.
        """
        target_matrix = np.zeros((self.GRID_SIZE, self.GRID_SIZE, 4), dtype=np.float32)

        for box in boxes:
            box = box.tolist()
            cell_x, cell_y, center_x, center_y, cell_length = self.__locate_cell(box)
            target_matrix[cell_y, cell_x, 0] = (center_x - cell_x * cell_length) / cell_length
            target_matrix[cell_y, cell_x, 1] = (center_y - cell_y * cell_length) / cell_length
            target_matrix[cell_y, cell_x, 2] = (box[2] - box[0]) / self.size
            target_matrix[cell_y, cell_x, 3] = (box[3] - box[1]) / self.size
        return torch.as_tensor(target_matrix, dtype=torch.float32, device=CustomUtils.DEVICE)

    def __build_target_labels_mask(self, coords, labels):
        """
        Build the one-hot labels mask of an image
        :param coords: list of (cell_x, cell_y) pairs, one per object
        :param labels: tensor of category ids, one per object
        :return: (GRID_SIZE, GRID_SIZE, NUM_CLASSES) tensor with a 1.0 at index
                 label - 1 of every cell listed in coords
        """
        target_matrix = np.zeros((self.GRID_SIZE, self.GRID_SIZE, self.NUM_CLASSES), dtype=np.float32)

        # NOTE(review): "label - 1" presumes category ids start at 1 — confirm against the annotations
        for (cell_x, cell_y), label in zip(coords, labels.tolist()):
            target_matrix[cell_y, cell_x, label-1] = 1.

        return torch.as_tensor(target_matrix, dtype=torch.float32, device=CustomUtils.DEVICE)

    def __generate_target(self, index):
        """
        Generate the target dict according to Torch specification
        :param index: i of the wanted annotations
        :return: target dict with "boxes" (float tensor, (N, 4)), "labels" (int64
                 tensor), "categories" (list of names) and "image_id"
        """
        annotations = self.__load_annotation(index)
        boxes = [annotation["bounding_box"] for annotation in annotations]
        labels = [annotation["category_id"] for annotation in annotations]
        categories = [annotation['category_name'] for annotation in annotations]

        # reshape keeps a well-formed (0, 4) tensor when the image has no annotations
        boxes = torch.as_tensor(boxes, dtype=torch.float32, device=CustomUtils.DEVICE).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.int64, device=CustomUtils.DEVICE)

        return {
            "boxes": boxes,
            "labels": labels,
            "categories": categories,
            "image_id": torch.tensor([index], device=CustomUtils.DEVICE)
        }

    def __len__(self):
        """Number of images in the dataset."""
        return len(self.images)


## Training dataset and Dataloader

In [30]:
# Loading training dataset 

# Dataset rooted at <PROJECT_ROOT>/data/assignment_1/train (expects "images" and "annotations" sub-folders)
train_dataset = CustomDataset(os.path.join(CustomUtils.PROJECT_ROOT, "data", "assignment_1", "train"))
# Shuffled batches of BATCH samples; the custom collate_fn is needed because the
# per-image box/label lists have varying lengths and cannot be stacked by the default collation
train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=BATCH, shuffle=True, collate_fn=CustomUtils.collate_fn)

## Network

In [31]:
class ObjectDetectionModel(nn.Module):
    """
    YOLO-style detector: a stack of low level feature blocks, one inception
    block and three output heads (confidence, boxes, classes) whose results
    are concatenated along the channel dimension.
    """

    def __init__(self, num_convolutions: int, out_filter: int, conv_k_sizes: list, pool_k_sizes: list):
        """
        :param num_convolutions: number of low level feature blocks
        :param out_filter: output channels of the first block; doubled after every block
        :param conv_k_sizes: convolution kernel size of each block
        :param pool_k_sizes: pooling kernel size (and stride) of each block
        :raises RuntimeError: when the two lists do not both have num_convolutions entries
        """
        super(ObjectDetectionModel, self).__init__()
        if not (len(conv_k_sizes) == len(pool_k_sizes) == num_convolutions):
            raise RuntimeError("Mismatch in length of arguments")

        in_filter = 3
        self.conv_blocks = nn.Sequential()
        for conv_k_size, pool_k_size in zip(conv_k_sizes, pool_k_sizes):
            self.conv_blocks.append(
                CustomUtils.build_low_level_feat(in_filter, out_filter, conv_k_size, pool_k_size)
            )
            in_filter = out_filter
            out_filter *= 2

        # The builders return plain tuples of modules. They must be wrapped in a
        # ModuleList, otherwise their parameters are not registered on the model:
        # they would be missing from parameters() / state_dict() and therefore
        # never be updated by the optimizer nor follow .to() / .train() / .eval().
        self.inception1 = nn.ModuleList(CustomUtils.build_inception_components(in_filter, out_filter))

        # The four inception branches output out_filter * (1 + 1 + 2 + 2) channels in total
        inception_channels = out_filter * 6
        self.batch_after_inception = nn.BatchNorm2d(inception_channels)
        self.activation_after_inception = nn.ReLU()
        self.pool_after_inception = nn.MaxPool2d(2, 2)
        self.output = nn.ModuleList(CustomUtils.build_output_components(inception_channels))

    def forward(self, x):
        """
        :param x: batch of images, channel-first
        :return: channel-first tensor holding, along dim 1, the confidence scores,
                 then the box coordinates, then the class scores
        """
        x = self.conv_blocks(x)
        # Every inception branch sees the same input; results are joined on the channel axis
        x = torch.cat([branch(x) for branch in self.inception1], 1)
        x = self.activation_after_inception(x)
        x = self.pool_after_inception(x)
        x = self.batch_after_inception(x)
        return torch.cat([head(x) for head in self.output], 1)

## Loss Function

In [32]:
class Loss(nn.Module):
    """
    YOLOv1-style sum-of-squares loss over a GRID_SIZE x GRID_SIZE grid.

    Each grid cell of the prediction holds CELL_DEPTH values laid out as
    [conf_1, conf_2, box_1 (x, y, w, h), box_2 (x, y, w, h), class scores].
    """

    # Number of cells per side of the prediction grid
    GRID_SIZE = 7
    # Values predicted per cell: 2 confidences + 2 * 4 box coordinates + 13 class scores
    CELL_DEPTH = 23

    def __init__(self, l1, l2):
        """
        :param l1: weight of the box regression term
        :param l2: weight of the no-object confidence term
        """
        super(Loss, self).__init__()
        self.mse = nn.MSELoss(reduction="sum")
        self.l1 = l1
        self.l2 = l2

    def forward(self, predictions, targets):
        """
        :param predictions: network output, either channel-first
                            (batch, CELL_DEPTH, GRID_SIZE, GRID_SIZE) or already
                            cell-last (batch, GRID_SIZE, GRID_SIZE, CELL_DEPTH)
        :param targets: tuple as produced by the collate function; index 3 is the
                        objectness mask, index 4 the boxes mask and index 5 the labels mask
        :return: the total loss and a tuple with its four (already weighted) terms
                 (box, object, no-object, class)
        """
        predictions = self.__to_cell_last(predictions)
        target_boxes_mask = targets[4]
        objectness_mask = targets[3].unsqueeze(3)

        _, best_box = self.__find_best_bb(predictions[..., 2:10], target_boxes_mask)

        box_loss = self.__compute_box_loss(predictions, target_boxes_mask, objectness_mask, best_box)

        confidence_score = self.__compute_confidence_score(best_box, predictions)

        object_loss = self.mse(
            torch.flatten(objectness_mask * confidence_score),
            torch.flatten(targets[3])
        )

        no_object_loss = self.__compute_no_object_loss(objectness_mask, predictions, targets[3])

        class_loss = self.mse(
            torch.flatten(objectness_mask * predictions[..., 10:], end_dim=-2),
            torch.flatten(objectness_mask * targets[5], end_dim=-2)
        )

        weighted_box_loss = self.l1 * box_loss
        weighted_no_object_loss = self.l2 * no_object_loss
        total = weighted_box_loss + object_loss + weighted_no_object_loss + class_loss
        return total, (weighted_box_loss, object_loss, weighted_no_object_loss, class_loss)

    def __to_cell_last(self, predictions):
        """
        Bring the predictions to (batch, GRID_SIZE, GRID_SIZE, CELL_DEPTH).

        A channel-first tensor has to be permuted: reshaping it would keep the
        memory order and therefore mix values of different channels and cells.
        """
        if (predictions.dim() == 4
                and predictions.shape[1] == self.CELL_DEPTH
                and predictions.shape[-1] != self.CELL_DEPTH):
            return predictions.permute(0, 2, 3, 1)
        return predictions.reshape(-1, self.GRID_SIZE, self.GRID_SIZE, self.CELL_DEPTH)

    def __compute_box_loss(self, predictions, target_boxes_mask, objectness_mask, best_box):
        """
        Sum of squared errors between the responsible predicted box and the target
        box, restricted to cells that contain an object. Width and height are
        compared through their square roots.
        """
        box_predictions = self.__compute_valid_boxes(predictions[..., 2:10], objectness_mask, best_box)

        box_targets = objectness_mask * target_boxes_mask

        # Signed square root: predictions may be negative, and the epsilon keeps
        # the gradient of sqrt finite at zero.
        box_predictions[..., 2:4] = torch.sign(box_predictions[..., 2:4]) * torch.sqrt(
            torch.abs(box_predictions[..., 2:4] + 1e-6))
        box_targets[..., 2:4] = torch.sqrt(box_targets[..., 2:4])

        return self.mse(
            torch.flatten(box_predictions, end_dim=-2),
            torch.flatten(box_targets, end_dim=-2)
        )

    def __compute_no_object_loss(self, objectness_mask, predictions, targets):
        """
        Sum of squared errors of both confidence scores against the objectness
        target, restricted to cells that do NOT contain an object.
        """
        no_object_mask = 1 - objectness_mask
        masked_targets = torch.flatten(no_object_mask * targets.unsqueeze(3), start_dim=1)
        no_obj_loss_1 = self.mse(
            torch.flatten(no_object_mask * predictions[..., 0:1], start_dim=1),
            masked_targets
        )
        no_obj_loss_2 = self.mse(
            torch.flatten(no_object_mask * predictions[..., 1:2], start_dim=1),
            masked_targets
        )
        return no_obj_loss_1 + no_obj_loss_2

    def __compute_confidence_score(self, best_box, predictions):
        """
            Select, per cell, the confidence of the responsible box
            (best_box is 0 for the first box and 1 for the second one)
        """
        return best_box * predictions[..., 1:2] + (1 - best_box) * predictions[..., 0:1]

    def __compute_valid_boxes(self, predictions, objectness_mask, best_box):
        """
        Computes the valid predictions based on best bounding box and valid objectness
        """
        return objectness_mask * (best_box * predictions[..., 4:8] + (1 - best_box) * predictions[..., 0:4])

    def __find_best_bb(self, predictions, target_boxes_mask):
        """
        Computes the best predicted bounding box using Intersection Over Union
        :return: (maximum IoU, index of the box achieving it) for every cell
        """
        iou_bb_1 = CustomUtils.i_over_u(predictions[..., 0:4], target_boxes_mask)  # IOU between first predicted bounding box and target
        iou_bb_2 = CustomUtils.i_over_u(predictions[..., 4:8], target_boxes_mask)  # IOU between second predicted bounding box and target
        iou_bbs = torch.cat([iou_bb_1.unsqueeze(0), iou_bb_2.unsqueeze(0)], dim=0)  # Stack the two IOUs along a new leading dimension of size 2
        return torch.max(iou_bbs, dim=0)  # Best bounding box for each mask cell (maximum IOU and its index)


In [39]:
# Architecture hyper-parameters: three low level feature blocks, starting at
# 16 output channels (the model doubles the channel count after every block)
num_convolutions = 3
out_filter = 16
# Per-block convolution kernel sizes and pooling kernel sizes (one entry per block)
# NOTE(review): with 256x256 inputs these sizes appear to yield the 7x7 output grid the loss expects — confirm
conv_k_sizes = [5, 5, 3]
pool_k_sizes = [4, 2, 2]
network = ObjectDetectionModel(num_convolutions, out_filter, conv_k_sizes, pool_k_sizes)
# Loss weights: 5 scales the box regression term, 0.5 the no-object confidence term
loss_fn = Loss(5, 0.5)
# Adam over the parameters registered on the network, with learning rate LR
optimizer = torch.optim.Adam(network.parameters(), lr=LR)

In [40]:
def train(num_epochs):
    """
    Train the global ``network`` on ``train_dataloader`` with the global
    ``loss_fn`` and ``optimizer``.

    The average total loss of the last 10 batches is printed every 10 batches,
    together with the four loss terms of the most recent batch.

    :param num_epochs: number of passes over the training dataloader
    """
    network.to(CustomUtils.DEVICE)
    network.train()

    for epoch in range(num_epochs):
        running_loss = 0.

        for i, data in enumerate(tqdm(train_dataloader)):
            images, target = data
            images = images.to(CustomUtils.DEVICE)

            # Gradients accumulate across backward() calls, so they must be
            # cleared before every step; otherwise each update would use the sum
            # of the gradients of all previous batches.
            optimizer.zero_grad()

            outputs = network(images)

            loss_fn_return = loss_fn(outputs, target)
            loss = loss_fn_return[0]
            loss.backward()

            optimizer.step()

            running_loss += loss.item()
            if i % 10 == 9:
                bb, obj, no_obj, cla = (float(term) for term in loss_fn_return[1][:4])
                print(
                    '[%d, %5d] loss: %.3f - bb: %.3f | obj: %.3f | no_obj: %.3f | class: %.3f' %
                    (epoch + 1, i + 1, running_loss / 10, bb, obj, no_obj, cla)
                )
                running_loss = 0.0


In [41]:
# Run the training loop for 3 epochs
train(3)

  0%|          | 2/1141 [00:03<31:50,  1.68s/it]


KeyboardInterrupt: 