In [1]:
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, sampler
from torchvision.datasets import VOCDetection

import albumentations as A
from albumentations.pytorch import ToTensorV2
from collections import Counter
from tqdm.notebook import tqdm

# Custom VOC Dataset

In [2]:
def convert_to_yolo_format(target, img_width, img_height, class_mapping):
    """
    Convert annotation data from VOC format to YOLO format.

    Parameters:
        target (dict): Annotation data from VOC dataset.
        img_width (int): Width of the loaded image. Kept for interface compatibility;
            normalization uses the size recorded in the annotation instead.
        img_height (int): Height of the loaded image. Kept for interface compatibility.
        class_mapping (dict): Mapping from class names to integer IDs.

    Returns:
        np.ndarray: Array of shape [N, 5] for N bounding boxes, each with
            [class_id, x_center, y_center, width, height], all coordinates normalized to [0, 1].
            The result is always two-dimensional: an image without objects yields shape [0, 5],
            so callers can slice columns unconditionally.
    """
    annotation = target['annotation']
    objects = annotation.get('object', [])

    # Normalize by the size stored in the annotation, which is what the pixel coordinates refer to.
    real_width = int(annotation['size']['width'])
    real_height = int(annotation['size']['height'])

    # A single object is parsed as a dict rather than a one-element list.
    if not isinstance(objects, list):
        objects = [objects]

    boxes = []
    for obj in objects:
        bndbox = obj['bndbox']
        # float() accepts both integer and decimal coordinate strings.
        xmin = float(bndbox['xmin']) / real_width
        xmax = float(bndbox['xmax']) / real_width
        ymin = float(bndbox['ymin']) / real_height
        ymax = float(bndbox['ymax']) / real_height

        x_center = (xmin + xmax) / 2
        y_center = (ymin + ymax) / 2
        width = xmax - xmin
        height = ymax - ymin

        # NOTE(review): unknown class names fall back to ID 0, as before — confirm this is intended.
        class_id = class_mapping.get(obj['name'], 0)
        boxes.append([class_id, x_center, y_center, width, height])

    # reshape keeps the [N, 5] contract even when N == 0.
    return np.array(boxes, dtype=np.float64).reshape(-1, 5)

In [3]:
class CustomVOCDataset(VOCDetection):
    """VOCDetection variant whose items are (image, YOLOv1 label grid) pairs.

    init_config_yolo() must be called after construction and before indexing,
    because __getitem__ reads the attributes it sets.
    """

    def init_config_yolo(self, class_mapping, S=7, B=2, C=20, custom_transforms=None):
        """Store the YOLO-specific configuration used by __getitem__.

        Parameters:
            class_mapping (dict): Mapping of class names to class indices.
            S (int): Grid size (the label grid is S x S).
            B (int): Number of bounding boxes per cell.
            C (int): Number of classes.
            custom_transforms (callable, optional): Called as
                custom_transforms(image=..., bboxes=..., labels=...) and expected to
                return a dict with the same three keys.
        """
        self.S = S
        self.B = B
        self.C = C
        self.class_mapping = class_mapping
        self.custom_transforms = custom_transforms

    def __getitem__(self, index):
        """Return (image, label_matrix) for one sample.

        label_matrix has shape (S, S, C + 5 * B): channels [0, C) are the one-hot class,
        channel C is the objectness flag and channels [C + 1, C + 5) hold
        (x_cell, y_cell, width_cell, height_cell). Only the first object that lands
        in a cell is recorded.
        """
        image, target = super().__getitem__(index)
        img_width, img_height = image.size

        # Convert target annotations to YOLO format: column 0 is the class, columns 1-4 the box.
        boxes = convert_to_yolo_format(target, img_width, img_height, self.class_mapping)
        just_boxes, labels = boxes[:, 1:], boxes[:, 0]

        # Work on an array from here on so the image converts cleanly with or without transforms.
        image = np.array(image)

        if self.custom_transforms:
            sample = self.custom_transforms(image=image, bboxes=just_boxes, labels=labels)
            image, just_boxes, labels = sample['image'], sample['bboxes'], sample['labels']

        label_matrix = torch.zeros((self.S, self.S, self.C + 5 * self.B))
        # Always iterate over 4-column boxes; the class column must never reach the unpacking below.
        just_boxes = torch.tensor(np.asarray(just_boxes), dtype=torch.float32).reshape(-1, 4)
        labels = torch.tensor(np.asarray(labels), dtype=torch.float32)
        # NOTE(review): without custom_transforms the image stays in (H, W, C) layout — confirm
        # whether that path is used before feeding it to the model.
        image = torch.as_tensor(image, dtype=torch.float32)

        objectness = self.C  # Channel holding the "object present" flag
        for box, class_label in zip(just_boxes, labels):
            x, y, width, height = box.tolist()
            class_label = int(class_label)

            # Grid cell (row i, column j) containing the box centre. A centre at exactly 1.0
            # would otherwise index cell S, which does not exist.
            i = min(int(self.S * y), self.S - 1)
            j = min(int(self.S * x), self.S - 1)
            x_cell, y_cell = self.S * x - j, self.S * y - i

            # Width and height expressed in units of one grid cell.
            width_cell, height_cell = width * self.S, height * self.S

            # Each cell stores a single object; later objects in the same cell are dropped.
            if label_matrix[i, j, objectness] == 0:
                label_matrix[i, j, objectness] = 1
                label_matrix[i, j, objectness + 1:objectness + 5] = torch.tensor(
                    [x_cell, y_cell, width_cell, height_cell]
                )
                label_matrix[i, j, class_label] = 1  # One-hot class encoding

        return image, label_matrix

# IoU, NMS, and mAP

In [4]:
def intersection_over_union(boxes_preds, boxes_labels, box_format="midpoint"):
    """
    Calculate the Intersection over Union (IoU) between bounding boxes.

    Parameters:
        boxes_preds (tensor): Predicted bounding boxes, shape (..., 4).
        boxes_labels (tensor): Ground truth bounding boxes, shape (..., 4).
        box_format (str): "midpoint" for (x, y, w, h) or "corners" for (x1, y1, x2, y2).

    Returns:
        tensor: IoU scores of shape (..., 1).

    Raises:
        ValueError: If box_format is neither "midpoint" nor "corners".
    """
    if box_format == "midpoint":
        # Convert centre/size to top-left (x1, y1) and bottom-right (x2, y2) corners.
        box1_x1 = boxes_preds[..., 0:1] - boxes_preds[..., 2:3] / 2
        box1_y1 = boxes_preds[..., 1:2] - boxes_preds[..., 3:4] / 2
        box1_x2 = boxes_preds[..., 0:1] + boxes_preds[..., 2:3] / 2
        box1_y2 = boxes_preds[..., 1:2] + boxes_preds[..., 3:4] / 2

        box2_x1 = boxes_labels[..., 0:1] - boxes_labels[..., 2:3] / 2
        box2_y1 = boxes_labels[..., 1:2] - boxes_labels[..., 3:4] / 2
        box2_x2 = boxes_labels[..., 0:1] + boxes_labels[..., 2:3] / 2
        box2_y2 = boxes_labels[..., 1:2] + boxes_labels[..., 3:4] / 2
    elif box_format == "corners":
        box1_x1 = boxes_preds[..., 0:1]
        box1_y1 = boxes_preds[..., 1:2]
        box1_x2 = boxes_preds[..., 2:3]
        box1_y2 = boxes_preds[..., 3:4]

        box2_x1 = boxes_labels[..., 0:1]
        box2_y1 = boxes_labels[..., 1:2]
        box2_x2 = boxes_labels[..., 2:3]
        box2_y2 = boxes_labels[..., 3:4]
    else:
        # Fail with a clear message instead of an UnboundLocalError further down.
        raise ValueError(f'box_format must be "midpoint" or "corners", got {box_format!r}')

    # Corners of the intersection rectangle.
    x1 = torch.max(box1_x1, box2_x1)
    y1 = torch.max(box1_y1, box2_y1)
    x2 = torch.min(box1_x2, box2_x2)
    y2 = torch.min(box1_y2, box2_y2)

    # clamp(0) turns the negative extents of non-overlapping boxes into a zero area.
    intersection = (x2 - x1).clamp(0) * (y2 - y1).clamp(0)

    # abs() guards against boxes whose corners are given in reverse order.
    box1_area = abs((box1_x2 - box1_x1) * (box1_y2 - box1_y1))
    box2_area = abs((box2_x2 - box2_x1) * (box2_y2 - box2_y1))

    # The small epsilon avoids division by zero for degenerate boxes.
    return intersection / (box1_area + box2_area - intersection + 1e-6)

In [5]:
def non_maximum_suppression(boxes, iou_threshold, threshold, box_format="corners"):
    """
    Greedy per-class Non-Maximum Suppression.

    Parameters:
        boxes (list): Candidate boxes, each [class_pred, prob_score, c1, c2, c3, c4] where the
            four coordinates are interpreted according to box_format.
        iou_threshold (float): A same-class box is dropped when its IoU with an already
            selected box is not below this value.
        threshold (float): Boxes whose prob_score is not above this value are discarded up front.
        box_format (str): "midpoint" or "corners", forwarded to intersection_over_union.

    Returns:
        list: The surviving boxes, ordered by descending prob_score.
    """
    assert type(boxes) is list  # Only plain lists are accepted

    # Confident boxes only, best first.
    candidates = sorted(
        (candidate for candidate in boxes if candidate[1] > threshold),
        key=lambda candidate: candidate[1],
        reverse=True,
    )
    kept = []

    while candidates:
        best, rest = candidates[0], candidates[1:]

        survivors = []
        for other in rest:
            # Boxes of a different class never suppress each other.
            if other[0] != best[0]:
                survivors.append(other)
                continue
            overlap = intersection_over_union(
                torch.tensor(best[2:]), torch.tensor(other[2:]), box_format=box_format
            )
            if overlap < iou_threshold:
                survivors.append(other)

        candidates = survivors
        kept.append(best)

    return kept

In [6]:
def mean_average_precision(pred_boxes, true_boxes, iou_threshold=0.5, box_format="midpoint", num_classes=20):
    """
    Calculate the mean average precision (mAP).

    Parameters:
        pred_boxes (list): Predicted boxes, each [train_idx, class_pred, prob_score, c1, c2, c3, c4]
            with the four coordinates interpreted according to box_format.
        true_boxes (list): Ground truth boxes in the same layout as pred_boxes.
        iou_threshold (float): A prediction counts as correct when its best IoU exceeds this value.
        box_format (str): "midpoint" or "corners", forwarded to intersection_over_union.
        num_classes (int): Number of classes.

    Returns:
        tensor: Scalar tensor with the mean of the per-class average precisions, taken over the
            classes that have at least one ground truth box. Zero when no class has any.
    """
    average_precisions = []  # One AP value per class that has ground truth
    epsilon = 1e-6

    for c in range(num_classes):
        detections = [detection for detection in pred_boxes if detection[1] == c]
        ground_truths = [true_box for true_box in true_boxes if true_box[1] == c]

        total_true_bboxes = len(ground_truths)
        if total_true_bboxes == 0:
            continue  # Nothing to recall for this class

        # Group the ground truths by image once instead of re-filtering them for every detection.
        gts_by_image = {}
        for gt in ground_truths:
            gts_by_image.setdefault(gt[0], []).append(gt)

        # Per image, one flag per ground truth box: 0 = not matched yet, 1 = already matched.
        matched = {image_idx: torch.zeros(len(gts)) for image_idx, gts in gts_by_image.items()}

        # Highest confidence first; index 2 is the probability.
        detections.sort(key=lambda detection: detection[2], reverse=True)
        TP = torch.zeros(len(detections))
        FP = torch.zeros(len(detections))

        for detection_idx, detection in enumerate(detections):
            # Only ground truths from the same image can match this prediction.
            image_gts = gts_by_image.get(detection[0], [])
            detection_coords = torch.tensor(detection[3:])
            best_iou, best_gt_idx = 0.0, -1

            for idx, gt in enumerate(image_gts):
                iou = intersection_over_union(
                    detection_coords, torch.tensor(gt[3:]), box_format=box_format
                ).item()
                if iou > best_iou:
                    best_iou, best_gt_idx = iou, idx

            # A ground truth box may be claimed only once; duplicates count as false positives.
            if best_iou > iou_threshold and matched[detection[0]][best_gt_idx] == 0:
                TP[detection_idx] = 1
                matched[detection[0]][best_gt_idx] = 1
            else:
                FP[detection_idx] = 1

        TP_cumsum = torch.cumsum(TP, dim=0)
        FP_cumsum = torch.cumsum(FP, dim=0)
        recalls = TP_cumsum / (total_true_bboxes + epsilon)
        precisions = TP_cumsum / (TP_cumsum + FP_cumsum + epsilon)

        # Prepend the (recall=0, precision=1) point so the curve starts on the precision axis.
        precisions = torch.cat((torch.tensor([1.0]), precisions))
        recalls = torch.cat((torch.tensor([0.0]), recalls))
        average_precisions.append(torch.trapz(precisions, recalls))  # Area under the PR curve

    # Without any ground truth there is nothing to average; avoid dividing by zero.
    if not average_precisions:
        return torch.tensor(0.0)
    return sum(average_precisions) / len(average_precisions)

# Bounding Boxes Utilities

In [7]:
def cellboxes_to_boxes(predictions, S=7, B=2, C=20):
    """
    Convert model output from cell grid format to bounding boxes.

    Parameters:
        predictions (tensor): Model predictions, shape (batch_size, S*S*(C+B*5)) or already
            reshaped to (batch_size, S, S, C+B*5). Per cell the layout is C class scores
            followed by B groups of (confidence, x, y, width, height).
        S (int): Grid size.
        B (int): Number of boxes per cell.
        C (int): Number of classes.

    Returns:
        list: One list per image in the batch, holding every box whose class score times box
            confidence is positive, as [class_pred, prob_score, x, y, width, height].
            x and y are converted from cell offsets to image-relative coordinates; width and
            height are passed through unchanged.
    """
    batch_size = predictions.shape[0]
    # One transfer up front: the loops below read scalars, which is very slow on a GPU tensor.
    predictions = predictions.detach().cpu().reshape(batch_size, S, S, C + B * 5)
    all_bboxes = []

    for i in range(batch_size):
        bboxes = []
        for row in range(S):
            for col in range(S):
                cell = predictions[i, row, col]
                class_probs = cell[:C]

                # Decode each of the B boxes once, independently of the class loop.
                cell_boxes = []
                for b in range(B):
                    base = C + 5 * b
                    scores = (class_probs * cell[base]).tolist()  # Score of every class for this box
                    x = ((cell[base + 1] + col) / S).item()
                    y = ((cell[base + 2] + row) / S).item()
                    w = cell[base + 3].item()
                    h = cell[base + 4].item()
                    cell_boxes.append((scores, x, y, w, h))

                # Emit class by class, box by box, keeping only positive scores.
                for c in range(C):
                    for scores, x, y, w, h in cell_boxes:
                        if scores[c] > 0:
                            bboxes.append([c, scores[c], x, y, w, h])
        all_bboxes.append(bboxes)
    return all_bboxes

In [8]:
def get_bboxes_training(predictions, targets, iou_threshold, threshold):
    """
    Build the box lists that mean_average_precision expects from one batch.

    Parameters:
        predictions (tensor): Model predictions for the batch.
        targets (tensor): Ground truth targets for the batch.
        iou_threshold (float): IoU threshold used by NMS on the predictions.
        threshold (float): Minimum score a box must exceed to be kept; applied to the
            predictions (inside NMS) and to the target boxes.

    Returns:
        tuple: (pred_boxes, target_boxes); every box is prefixed with its index in the batch.
    """
    decoded_predictions = cellboxes_to_boxes(predictions)
    decoded_targets = cellboxes_to_boxes(targets)
    all_pred_boxes, all_target_boxes = [], []

    for idx in range(predictions.shape[0]):
        # Predictions go through NMS; the model output uses the midpoint box format.
        kept = non_maximum_suppression(
            decoded_predictions[idx],
            iou_threshold=iou_threshold,
            threshold=threshold,
            box_format="midpoint",
        )
        all_pred_boxes.extend([idx] + box for box in kept)

        # Targets need no NMS, only the score filter.
        all_target_boxes.extend(
            [idx] + box for box in decoded_targets[idx] if box[1] > threshold
        )

    return all_pred_boxes, all_target_boxes

# YOLOv1

In [9]:
"""
Information about the architectural configuration:
A Tuple is structured as (kernel_size, number of filters, stride, padding).
"M" represents a max-pooling layer with a 2x2 kernel and a stride of 2.
A list holds two convolution tuples followed by an integer giving the number of times that pair of blocks is repeated.
"""

# Layer-by-layer description of the convolutional backbone, read in order by YOLOv1._create_conv_layers:
#   tuple -> one convolutional block given as (kernel_size, out_channels, stride, padding)
#   "M"   -> one max-pooling layer
#   list  -> [conv tuple, conv tuple, n]: the two convolutional blocks repeated n times in a row
architecture_config = [
    (7, 64, 2, 3),   # Convolutional block 1
    "M", # Max-pooling layer 1
    (3, 192, 1, 1),  # Convolutional block 2
    "M", # Max-pooling layer 2
    (1, 128, 1, 0),  # Convolutional block 3
    (3, 256, 1, 1),  # Convolutional block 4
    (1, 256, 1, 0),  # Convolutional block 5
    (3, 512, 1, 1),  # Convolutional block 6
    "M", # Max-pooling layer 3
   [(1, 256, 1, 0), (3, 512, 1, 1), 4],  # Convolutional block 7 (pair repeated 4 times)
    (1, 512, 1, 0),  # Convolutional block 8
    (3, 1024, 1, 1), # Convolutional block 9
    "M", # Max-pooling layer 4
   [(1, 512, 1, 0), (3, 1024, 1, 1), 2], # Convolutional block 10 (pair repeated 2 times)
    (3, 1024, 1, 1), # Convolutional block 11
    (3, 1024, 2, 1), # Convolutional block 12 (stride 2 halves the spatial size)
    (3, 1024, 1, 1), # Convolutional block 13
    (3, 1024, 1, 1), # Convolutional block 14
]

In [10]:
class CNNBlock(nn.Module):
    """Convolution (without bias) followed by batch normalization and LeakyReLU(0.1)."""

    def __init__(self, in_channels, out_channels, **kwargs):
        """kwargs (kernel_size, stride, padding, ...) are forwarded to nn.Conv2d."""
        super().__init__()
        # The convolution has no bias term of its own; BatchNorm2d follows it directly.
        self.conv = nn.Conv2d(in_channels, out_channels, bias=False, **kwargs)
        self.batchnorm = nn.BatchNorm2d(out_channels)
        self.leakyrelu = nn.LeakyReLU(0.1)

    def forward(self, x):
        features = self.conv(x)
        normalized = self.batchnorm(features)
        return self.leakyrelu(normalized)


class YOLOv1(nn.Module):
    """YOLOv1: a convolutional backbone built from architecture_config plus two fully connected layers.

    Parameters:
        in_channels (int): Number of channels of the input image.
        **kwargs: split_size, num_boxes and num_classes, forwarded to _create_fcs.
    """

    def __init__(self, in_channels=3, **kwargs):
        super().__init__()
        self.architecture = architecture_config
        self.in_channels = in_channels
        # The backbone is created first: _create_fcs reads the channel count it records.
        self.darknet = self._create_conv_layers(self.architecture)
        self.fcs = self._create_fcs(**kwargs)

    def forward(self, x):
        """Return the raw predictions, shape (batch_size, S * S * (C + B * 5))."""
        x = self.darknet(x)
        return self.fcs(torch.flatten(x, start_dim=1))

    def _create_conv_layers(self, architecture):
        """Translate the architecture description into an nn.Sequential.

        Raises:
            TypeError: If an entry is not a tuple, a string or a list.
        """
        layers = []
        in_channels = self.in_channels

        for entry in architecture:
            if isinstance(entry, tuple):
                # (kernel_size, out_channels, stride, padding)
                kernel_size, out_channels, stride, padding = entry
                layers.append(CNNBlock(in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=padding))
                in_channels = out_channels
            elif isinstance(entry, str):
                layers.append(nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2)))
            elif isinstance(entry, list):
                # [conv tuple, conv tuple, number of repetitions]
                conv1, conv2, num_repeats = entry
                for _ in range(num_repeats):
                    layers.append(CNNBlock(in_channels, conv1[1], kernel_size=conv1[0], stride=conv1[2], padding=conv1[3]))
                    layers.append(CNNBlock(conv1[1], conv2[1], kernel_size=conv2[0], stride=conv2[2], padding=conv2[3]))
                    in_channels = conv2[1]
            else:
                # Silently skipping an entry would build a different network than the one described.
                raise TypeError(f"Unsupported architecture entry: {entry!r}")

        # Remembered so the first linear layer matches whatever the last conv block produces.
        self._conv_out_channels = in_channels
        return nn.Sequential(*layers)

    def _create_fcs(self, split_size, num_boxes, num_classes):
        """Create the fully connected head.

        Parameters:
            split_size (int): Grid size S; the backbone output is assumed to be S x S spatially.
            num_boxes (int): Number of boxes B predicted per cell.
            num_classes (int): Number of classes C.
        """
        S, B, C = split_size, num_boxes, num_classes
        # Falls back to 1024 when called before the backbone has been built.
        feature_channels = getattr(self, "_conv_out_channels", 1024)
        return nn.Sequential(
            # Kept although forward() already flattens: removing it would renumber the layers
            # and change the state_dict keys of existing checkpoints.
            nn.Flatten(),
            nn.Linear(feature_channels * S * S, 4096),
            nn.LeakyReLU(0.1),
            nn.Linear(4096, S * S * (C + B * 5)),
        )

In [11]:
class YoloLoss(nn.Module):
    """Loss for the YOLO (v1) model: summed squared errors over box, objectness and class terms.

    Parameters:
        S (int): Grid size of the image (7).
        B (int): Number of bounding boxes per cell (2). The implementation compares exactly
            two predicted boxes per cell.
        C (int): Number of classes (20 for PASCAL VOC).
    """

    def __init__(self, S=7, B=2, C=20):
        super().__init__()
        self.mse = nn.MSELoss(reduction="sum")
        self.S = S
        self.B = B
        self.C = C

        # YOLO-specific weights for the no-object term and the box coordinate term.
        self.lambda_noobj = 0.5
        self.lambda_coord = 5

    def forward(self, predictions, target):
        """Return the scalar loss for a batch.

        Parameters:
            predictions (tensor): Model output, reshaped here to (batch, S, S, C + B * 5).
            target (tensor): Label grid of shape (batch, S, S, C + B * 5); only the first
                box slot (channels C .. C + 4) is read.
        """
        C = self.C
        predictions = predictions.reshape(-1, self.S, self.S, C + self.B * 5)

        # Channel layout after the C class scores: [conf1, box1 (4), conf2, box2 (4)].
        conf1, box1 = slice(C, C + 1), slice(C + 1, C + 5)
        conf2, box2 = slice(C + 5, C + 6), slice(C + 6, C + 10)

        # IoU of each of the two predicted boxes with the target box.
        iou_b1 = intersection_over_union(predictions[..., box1], target[..., box1])
        iou_b2 = intersection_over_union(predictions[..., box2], target[..., box1])
        ious = torch.cat([iou_b1.unsqueeze(0), iou_b2.unsqueeze(0)], dim=0)

        # bestbox is 0 or 1 per cell: the index of the predicted box with the higher IoU.
        _, bestbox = torch.max(ious, dim=0)
        exists_box = target[..., C].unsqueeze(3)  # Iobj_i in the paper

        # BOX COORDINATES
        # Keep the responsible box only, and zero out cells without an object.
        box_predictions = exists_box * (bestbox * predictions[..., box2] + (1 - bestbox) * predictions[..., box1])
        box_targets = exists_box * target[..., box1]

        # Width and height are compared through their square roots. Predictions may be negative,
        # hence sign(.) * sqrt(|.|); the epsilon keeps the argument away from exactly zero.
        box_predictions[..., 2:4] = torch.sign(box_predictions[..., 2:4]) * torch.sqrt(torch.abs(box_predictions[..., 2:4] + 1e-6))
        box_targets[..., 2:4] = torch.sqrt(box_targets[..., 2:4])
        box_loss = self.mse(torch.flatten(box_predictions, end_dim=-2), torch.flatten(box_targets, end_dim=-2))

        # OBJECT LOSS
        # Confidence of the responsible box against the target objectness.
        pred_box = bestbox * predictions[..., conf2] + (1 - bestbox) * predictions[..., conf1]
        object_loss = self.mse(torch.flatten(exists_box * pred_box), torch.flatten(exists_box * target[..., conf1]))

        # NO OBJECT LOSS
        # Both confidences are pushed towards the target in cells without an object.
        no_object_loss = self.mse(
            torch.flatten((1 - exists_box) * predictions[..., conf1], start_dim=1),
            torch.flatten((1 - exists_box) * target[..., conf1], start_dim=1),
        )
        no_object_loss += self.mse(
            torch.flatten((1 - exists_box) * predictions[..., conf2], start_dim=1),
            torch.flatten((1 - exists_box) * target[..., conf1], start_dim=1),
        )

        # CLASS LOSS
        class_loss = self.mse(
            torch.flatten(exists_box * predictions[..., :C], end_dim=-2),
            torch.flatten(exists_box * target[..., :C], end_dim=-2),
        )

        return (
            self.lambda_coord * box_loss
            + object_loss
            + self.lambda_noobj * no_object_loss
            + class_loss
        )

# Training Constants

In [12]:
torch.manual_seed(123)
LEARNING_RATE = 2e-5

# Train on the GPU when one is available and fall back to the CPU otherwise,
# so the notebook also runs on machines without CUDA.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
BATCH_SIZE = 32 # Kept small because of GPU memory limits.
EPOCHS = 10
NUM_WORKERS = 2 # Number of worker processes for the data loader.
PIN_MEMORY = DEVICE == "cuda" # Pinned memory only helps host-to-GPU transfers.
LOAD_MODEL = False # If False, training starts from scratch instead of resuming from a checkpoint.
LOAD_MODEL_FILE = "yolov1.pth.tar" # Checkpoint file, both written during training and read when resuming.
WIDTH = 448
HEIGHT = 448

In [13]:
# Class name -> integer ID; the ID is the position of the name in the tuple below.
class_mapping = {
    name: index
    for index, name in enumerate((
        "aeroplane", "bicycle", "bird", "boat", "bottle",
        "bus", "car", "cat", "chair", "cow",
        "diningtable", "dog", "horse", "motorbike", "person",
        "pottedplant", "sheep", "sofa", "train", "tvmonitor",
    ))
}

# Prepare the Data for Training

In [14]:
def get_train_transforms():
    """Build the training augmentation pipeline (colour jitter, flips, resize, tensor conversion).

    Boxes are passed in YOLO format together with a "labels" field.
    """
    # NOTE(review): the HueSaturationValue limits are 0.2 here; if the library expects
    # integer-scale shifts these may be close to a no-op — confirm against the albumentations docs.
    colour_jitter = A.OneOf([
        A.HueSaturationValue(hue_shift_limit=0.2, sat_shift_limit=0.2, val_shift_limit=0.2, p=0.9),
        A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.9),
    ], p=0.9)

    steps = [
        colour_jitter,
        A.ToGray(p=0.01),
        A.HorizontalFlip(p=0.2),
        A.VerticalFlip(p=0.2),
        A.Resize(height=HEIGHT, width=WIDTH, p=1),
        ToTensorV2(p=1.0),
    ]
    box_settings = A.BboxParams(format="yolo", min_area=0, min_visibility=0, label_fields=["labels"])
    return A.Compose(steps, p=1.0, bbox_params=box_settings)


def get_valid_transforms():
    """Build the evaluation pipeline: resize to HEIGHT x WIDTH and convert to a tensor, no augmentation."""
    steps = [
        A.Resize(height=HEIGHT, width=WIDTH, p=1.0),
        ToTensorV2(p=1.0),
    ]
    box_settings = A.BboxParams(format="yolo", min_area=0, min_visibility=0, label_fields=["labels"])
    return A.Compose(steps, p=1.0, bbox_params=box_settings)

In [15]:
# Build the VOC 2012 "train" and "val" datasets (downloaded into ./data), in that order.
train_dataset, val_dataset = (
    CustomVOCDataset(root="./data", year="2012", image_set=image_set, download=True)
    for image_set in ("train", "val")
)

# Attach the YOLO configuration: augmentation for training, plain resizing for validation.
train_dataset.init_config_yolo(
    class_mapping=class_mapping,
    custom_transforms=get_train_transforms(),
)
val_dataset.init_config_yolo(
    class_mapping=class_mapping,
    custom_transforms=get_valid_transforms(),
)

In [16]:
# The VOC "val" split is divided by index: the first 15% is used for validation
# during training, the remaining 85% is held out as the test set.
dataset_size = len(val_dataset)
val_size = int(0.15 * dataset_size)
val_sampler = sampler.SubsetRandomSampler(range(val_size))
test_sampler = sampler.SubsetRandomSampler(range(val_size, dataset_size))

# Create data loaders for training, validation, and test sets
train_loader = DataLoader(
    dataset=train_dataset, batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY,
    shuffle=True, # Present the training samples in a new order every epoch.
    drop_last=True
)
val_loader = DataLoader(
    dataset=val_dataset, batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY,
    sampler=val_sampler, drop_last=False
)
test_loader = DataLoader(
    dataset=val_dataset, batch_size=BATCH_SIZE,
    num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY,
    sampler=test_sampler, drop_last=False
)

# Training Utilities

In [17]:
def train_fn(train_loader, model, optimizer, loss_fn, epoch):
    """Train the model for one epoch.

    Parameters:
        train_loader: Iterable of (images, label grids) batches.
        model: Network to train.
        optimizer: Optimizer updating the model parameters.
        loss_fn: Callable returning the scalar loss for (predictions, targets).
        epoch (int): Zero-based epoch index, used only for the progress bar text.

    Returns:
        tuple: (average loss, average mAP) over the batches of this epoch.
    """
    model.train()  # Do not rely on the caller having left the model in training mode
    mean_loss, mean_mAP = [], []
    total_batches = len(train_loader)
    loop = tqdm(enumerate(train_loader), total=total_batches)

    for batch_idx, (x, y) in loop:
        x, y = x.to(DEVICE), y.to(DEVICE)
        out = model(x)
        loss = loss_fn(out, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # The metric is monitoring only: keep it out of the autograd graph.
        with torch.no_grad():
            pred_boxes, true_boxes = get_bboxes_training(out.detach(), y, iou_threshold=0.5, threshold=0.4)
            mAP = mean_average_precision(pred_boxes, true_boxes, iou_threshold=0.5, box_format="midpoint")
        mean_loss.append(loss.item())
        mean_mAP.append(mAP.item())

        # Update the progress bar
        loop.set_description(f"[EPOCH {epoch + 1}/{EPOCHS}] {batch_idx + 1}/{total_batches}")
        loop.set_postfix(loss=loss.item(), mAP=mAP.item())

    avg_loss = sum(mean_loss) / len(mean_loss)
    avg_mAP = sum(mean_mAP) / len(mean_mAP)
    return avg_loss, avg_mAP


def val_fn(test_loader, model, loss_fn):
    """Evaluate the model on a data loader without updating it.

    Parameters:
        test_loader: Iterable of (images, label grids) batches.
        model: Network to evaluate; it is switched to eval mode for the duration of the call
            and back to training mode afterwards.
        loss_fn: Callable returning the scalar loss for (predictions, targets).

    Returns:
        tuple: (average loss, average mAP) over the batches of the loader.
    """
    model.eval()
    mean_loss, mean_mAP = [], []

    try:
        # No gradients are needed for evaluation; skipping them saves memory and time.
        with torch.no_grad():
            for x, y in test_loader:
                x, y = x.to(DEVICE), y.to(DEVICE)
                out = model(x)
                loss = loss_fn(out, y)

                pred_boxes, true_boxes = get_bboxes_training(out, y, iou_threshold=0.5, threshold=0.4)
                mAP = mean_average_precision(pred_boxes, true_boxes, iou_threshold=0.5, box_format="midpoint")
                mean_loss.append(loss.item())
                mean_mAP.append(mAP.item())
    finally:
        # Back to training mode even when evaluation is interrupted.
        model.train()

    avg_loss = sum(mean_loss) / len(mean_loss)
    avg_mAP = sum(mean_mAP) / len(mean_mAP)
    return avg_loss, avg_mAP

# Experiments

In [18]:
model = YOLOv1(split_size=7, num_boxes=2, num_classes=20).to(DEVICE)
optimizer = optim.AdamW(model.parameters(), lr=LEARNING_RATE)
loss_fn = YoloLoss()

if LOAD_MODEL: # Resume from a checkpoint written by the training loop
    # map_location lets a checkpoint saved on the GPU be loaded on a CPU-only machine.
    checkpoint = torch.load(LOAD_MODEL_FILE, map_location=DEVICE)
    # The keys must match the ones used when the checkpoint is saved: "state_dict" and "optimizer".
    model.load_state_dict(checkpoint["state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer"])

In [None]:
%%time
# Train for EPOCHS epochs, keeping the checkpoint with the best validation mAP.
best_mAP_train = best_mAP_val = 0
for epoch in range(EPOCHS):
    train_loss, train_mAP = train_fn(train_loader, model, optimizer, loss_fn, epoch)
    val_loss, val_mAP = val_fn(val_loader, model, loss_fn)
    print(f"=> Loss: {train_loss:.4f} - mAP: {train_mAP:.4f} - Val Loss: {val_loss:.4f} - Val mAP: {val_mAP:.4f}")

    # Track the best training mAP seen so far.
    best_mAP_train = max(best_mAP_train, train_mAP)

    # A checkpoint is written only when the validation mAP improves.
    if val_mAP > best_mAP_val:
        best_mAP_val = val_mAP
        torch.save(
            {"state_dict": model.state_dict(), "optimizer": optimizer.state_dict()},
            LOAD_MODEL_FILE,
        )

  0%|          | 0/178 [00:00<?, ?it/s]

In [None]:
# Free memory left over from training before running inference.
import gc
gc.collect()  # Collect unreachable Python objects first
torch.cuda.empty_cache()  # Then release PyTorch's cached GPU memory

# Inference

In [None]:
def plot_image_with_labels(image, ground_truth_boxes, predicted_boxes, class_mapping):
    """Show an image with its ground truth boxes (green) and predicted boxes (red), each labelled.

    Parameters:
        image: Image convertible with np.array to an (H, W, channels) array.
        ground_truth_boxes (list): Boxes as [class, score, x_center, y_center, width, height]
            with coordinates relative to the image size.
        predicted_boxes (list): Boxes in the same layout as ground_truth_boxes.
        class_mapping (dict): Mapping from class names to integer IDs.
    """
    index_to_name = {index: name for name, index in class_mapping.items()}
    pixels = np.array(image)
    height, width, _ = pixels.shape
    fig, ax = plt.subplots(1)
    ax.imshow(pixels)

    # Ground truth is drawn first, predictions on top of it.
    for boxes, colour in ((ground_truth_boxes, "green"), (predicted_boxes, "red")):
        for entry in boxes:
            label_index = entry[0]
            x_mid, y_mid, box_w, box_h = entry[2], entry[3], entry[4], entry[5]

            # Matplotlib anchors rectangles at their upper left corner.
            left = x_mid - box_w / 2
            top = y_mid - box_h / 2

            outline = patches.Rectangle(
                (left * width, top * height),
                box_w * width, box_h * height,
                linewidth=1, edgecolor=colour, facecolor="none",
            )
            ax.add_patch(outline)

            class_name = index_to_name.get(label_index, "Unknown")
            ax.text(left * width, top * height, class_name, color="white", fontsize=12, bbox=dict(facecolor=colour, alpha=0.2))
    plt.show()

In [None]:
model = YOLOv1(split_size=7, num_boxes=2, num_classes=20).to(DEVICE)

# Always try to restore the checkpoint written by the training loop: a freshly built model
# has random weights, so plotting its predictions would say nothing about the training run.
try:
    checkpoint = torch.load(LOAD_MODEL_FILE, map_location=DEVICE)
except FileNotFoundError:
    if LOAD_MODEL:
        raise  # A checkpoint was explicitly requested
    print(f"No checkpoint found at {LOAD_MODEL_FILE}; showing predictions of an untrained model.")
else:
    model.load_state_dict(checkpoint["state_dict"])
model.eval()

# Plot up to 8 images of the first test batch with ground truth and predicted boxes.
with torch.no_grad():  # Inference only: no gradients needed
    for x, y in test_loader:
        x = x.to(DEVICE)
        out = model(x)

        # Convert model output and targets from the cell grid format to box lists.
        pred_boxes = cellboxes_to_boxes(out)
        gt_boxes = cellboxes_to_boxes(y)

        # The batch may hold fewer than 8 images.
        for idx in range(min(8, x.shape[0])):
            pred_box = non_maximum_suppression(pred_boxes[idx], iou_threshold=0.5, threshold=0.4, box_format="midpoint")
            gt_box = non_maximum_suppression(gt_boxes[idx], iou_threshold=0.5, threshold=0.4, box_format="midpoint")
            # Channels-last layout and values scaled to [0, 1] for imshow.
            image = x[idx].permute(1, 2, 0).to("cpu") / 255.0
            plot_image_with_labels(image, gt_box, pred_box, class_mapping)
        break # Stop after processing the first batch