In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib import patches
from collections import Counter
import cv2
from glob import glob
from tqdm import tqdm
from termcolor import colored

import torch
from torch import nn
from torch import optim
from torch.utils.data import DataLoader

import torchvision
from torchvision import transforms

import albumentations as A
from albumentations.pytorch import ToTensorV2

In [None]:
class CustomVOCDataset(torchvision.datasets.VOCDetection):
    def init_config_yolo(self, class_mapping, S=7, B=2, C=20, custom_transforms=None):
        self.S = S  # Grid size S x S
        self.B = B  # Number of bounding boxes
        self.C = C  # Number of classes
        self.class_mapping = class_mapping  # Mapping of class names to class indices
        self.custom_transforms = custom_transforms

    def __getitem__(self, index):
        image, target = super(CustomVOCDataset, self).__getitem__(index)
        img_width, img_height = image.size

        # Convert target annotations to YOLO format bounding boxes
        boxes = convert_to_yolo_format(target, img_width, img_height, self.class_mapping)

        # Transform
        if self.custom_transforms:
            sample = {
                'image': np.array(image),
                'bboxes': boxes[:, 1:],  # Exclude class_id
                'labels': boxes[:, 0]    # Class_id
            }
            sample = self.custom_transforms(**sample)
            image = sample['image']
            boxes = sample['bboxes']
            labels = sample['labels']

        # Create an empty label matrix for YOLO ground truth
        label_matrix = torch.zeros((self.S, self.S, self.C + 5 * self.B))

        boxes = torch.tensor(boxes, dtype=torch.float32)
        labels = torch.tensor(labels, dtype=torch.float32)
        image = torch.as_tensor(image, dtype=torch.float32)

        # Iterate through each bounding box in YOLO format
        for box, class_label in zip(boxes, labels):
            x, y, width, height = box.tolist()
            class_label = int(class_label)

            # Calculate the grid cell (i, j) that this box belongs to
            i, j = int(self.S * y), int(self.S * x)
            x_cell, y_cell = self.S * x - j, self.S * y - i

            # Calculate the width and height of the box relative to the grid cell
            width_cell, height_cell = width * self.S, height * self.S

            # If no object has been found in this specific cell (i, j) before:
            if label_matrix[i, j, 20] == 0:
                # Mark that an object exists in this cell
                label_matrix[i, j, 20] = 1

                # Store the box coordinates as an offset from the cell boundaries
                box_coordinates = torch.tensor([x_cell, y_cell, width_cell, height_cell])

                # Set the box coordinates in the label matrix
                label_matrix[i, j, 21:25] = box_coordinates

                # Set the one-hot encoding for the class label
                label_matrix[i, j, class_label] = 1

        return image, label_matrix

In [None]:
def convert_to_yolo_format(target, img_width, img_height, class_mapping):
    annotations = target['annotation']['object']
    real_width = int(target['annotation']['size']['width'])
    real_height = int(target['annotation']['size']['height'])

    if not isinstance(annotations, list):
        annotations = [annotations]

    boxes = []
    for anno in annotations:
        xmin = int(anno['bndbox']['xmin']) / real_width
        xmax = int(anno['bndbox']['xmax']) / real_width
        ymin = int(anno['bndbox']['ymin']) / real_height
        ymax = int(anno['bndbox']['ymax']) / real_height

        x_center = (xmin + xmax) / 2
        y_center = (ymin + ymax) / 2
        width = xmax - xmin
        height = ymax - ymin

        class_name = anno['name']
        class_id = class_mapping[class_name] if class_name in class_mapping else -1

        boxes.append([class_id, x_center, y_center, width, height])

    return np.array(boxes)

In [None]:
def intersection_over_union(boxes_preds, boxes_labels, box_format="midpoint"):
    if box_format == "midpoint":
        box1_x1 = boxes_preds[..., 0:1] - boxes_preds[..., 2:3] / 2
        box1_y1 = boxes_preds[..., 1:2] - boxes_preds[..., 3:4] / 2
        box1_x2 = boxes_preds[..., 0:1] + boxes_preds[..., 2:3] / 2
        box1_y2 = boxes_preds[..., 1:2] + boxes_preds[..., 3:4] / 2

        box2_x1 = boxes_labels[..., 0:1] - boxes_labels[..., 2:3] / 2
        box2_y1 = boxes_labels[..., 1:2] - boxes_labels[..., 3:4] / 2
        box2_x2 = boxes_labels[..., 0:1] + boxes_labels[..., 2:3] / 2
        box2_y2 = boxes_labels[..., 1:2] + boxes_labels[..., 3:4] / 2

    elif box_format == "corners":
        box1_x1 = boxes_preds[..., 0:1]
        box1_y1 = boxes_preds[..., 1:2]
        box1_x2 = boxes_preds[..., 2:3]
        box1_y2 = boxes_preds[..., 3:4]

        box2_x1 = boxes_labels[..., 0:1]
        box2_y1 = boxes_labels[..., 1:2]
        box2_x2 = boxes_labels[..., 2:3]
        box2_y2 = boxes_labels[..., 3:4]

    x1 = torch.max(box1_x1, box2_x1)
    y1 = torch.max(box1_y1, box2_y1)
    x2 = torch.min(box1_x2, box2_x2)
    y2 = torch.min(box1_y2, box2_y2)

    intersection = (x2 - x1).clamp(0) * (y2 - y1).clamp(0)
    box1_area = abs((box1_x2 - box1_x1) * (box1_y2 - box1_y1))
    box2_area = abs((box2_x2 - box2_x1) * (box2_y2 - box2_y1))

    return intersection / (box1_area + box2_area - intersection + 1e-6)

def non_max_suppression(bboxes, iou_threshold, threshold, box_format="corners"):
    assert type(bboxes) == list
    bboxes = [box for box in bboxes if box[1] > threshold]
    bboxes = sorted(bboxes, key=lambda x: x[1], reverse=True)

    boxes_after_nms = []
    while bboxes:
        chosen_box = bboxes.pop(0)
        bboxes = [
            box
            for box in bboxes
            if box[0] != chosen_box[0]
            or intersection_over_union(
                torch.tensor(chosen_box[2:]),
                torch.tensor(box[2:]),
                box_format=box_format,
            ) < iou_threshold
        ]
        boxes_after_nms.append(chosen_box)

    return boxes_after_nms

In [None]:
architecture_config = [
    (7, 64, 2, 3),  # Convolutional block 1
    "M",            # Max-pooling layer 1
    (3, 192, 1, 1), # Convolutional block 2
    "M",            # Max-pooling layer 2
    (1, 128, 1, 0), # Convolutional block 3
    (3, 256, 1, 1), # Convolutional block 4
    (1, 256, 1, 0), # Convolutional block 5
    (3, 512, 1, 1), # Convolutional block 6
    "M",            # Max-pooling layer 3
    [(1, 256, 1, 0), (3, 512, 1, 1), 4], # Convolutional block 7 (repeated 4 times)
    (1, 512, 1, 0), # Convolutional block 8
    (3, 1024, 1, 1),# Convolutional block 9
    "M",            # Max-pooling layer 4
    [(1, 512, 1, 0), (3, 1024, 1, 1), 2], # Convolutional block 10 (repeated 2 times)
    (3, 1024, 1, 1),# Convolutional block 11
    (3, 1024, 2, 1),# Convolutional block 12
    (3, 1024, 1, 1),# Convolutional block 13
    (3, 1024, 1, 1),# Convolutional block 14
]

class CNNBlock(nn.Module):
    def __init__(self, in_channels, out_channels, **kwargs):
        super(CNNBlock, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, bias=False, **kwargs)
        self.batchnorm = nn.BatchNorm2d(out_channels)
        self.leakyrelu = nn.LeakyReLU(0.1)

    def forward(self, x):
        return self.leakyrelu(self.batchnorm(self.conv(x)))

class Yolov1(nn.Module):
    def __init__(self, in_channels=3, **kwargs):
        super(Yolov1, self).__init__()
        self.architecture = architecture_config
        self.in_channels = in_channels
        self.darknet = self._create_conv_layers(self.architecture)
        self.fcs = self._create_fcs(**kwargs)

    def forward(self, x):
        x = self.darknet(x)
        return self.fcs(torch.flatten(x, start_dim=1))

    def _create_conv_layers(self, architecture):
        layers = []
        in_channels = self.in_channels

        for x in architecture:
            if type(x) == tuple:
                layers += [
                    CNNBlock(
                        in_channels, x[1], kernel_size=x[0], stride=x[2], padding=x[3],
                    )
                ]
                in_channels = x[1]
            elif type(x) == str:
                layers += [nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))]
            elif type(x) == list:
                conv1 = x[0]
                conv2 = x[1]
                num_repeats = x[2]

                for _ in range(num_repeats):
                    layers += [
                        CNNBlock(
                            in_channels,
                            conv1[1],
                            kernel_size=conv1[0],
                            stride=conv1[2],
                            padding=conv1[3],
                        )
                    ]
                    layers += [
                        CNNBlock(
                            conv1[1],
                            conv2[1],
                            kernel_size=conv2[0],
                            stride=conv2[2],
                            padding=conv2[3],
                        )
                    ]
                    in_channels = conv2[1]

        return nn.Sequential(*layers)

    def _create_fcs(self, split_size, num_boxes, num_classes):
        S, B, C = split_size, num_boxes, num_classes
        return nn.Sequential(
            nn.Flatten(),
            nn.Linear(1024 * S * S, 4096),
            nn.Dropout(0.0),
            nn.LeakyReLU(0.1),
            nn.Linear(4096, S * S * (C + B * 5)),
        )

In [None]:
class YoloLoss(nn.Module):
    def __init__(self, S=7, B=2, C=20):
        super(YoloLoss, self).__init__()
        self.mse = nn.MSELoss(reduction="sum")
        self.S = S
        self.B = B
        self.C = C
        self.lambda_noobj = 0.5
        self.lambda_coord = 5

    def forward(self, predictions, target):
        predictions = predictions.reshape(-1, self.S, self.S, self.C + self.B * 5)
        iou_b1 = intersection_over_union(predictions[..., 21:25], target[..., 21:25])
        iou_b2 = intersection_over_union(predictions[..., 26:30], target[..., 21:25])
        ious = torch.cat([iou_b1.unsqueeze(0), iou_b2.unsqueeze(0)], dim=0)
        iou_maxes, bestbox = torch.max(ious, dim=0)
        exists_box = target[..., 20].unsqueeze(3)

        # Box coordinates loss
        box_predictions = exists_box * (
            bestbox * predictions[..., 26:30] + (1 - bestbox) * predictions[..., 21:25]
        )
        box_targets = exists_box * target[..., 21:25]
        box_predictions[..., 2:4] = torch.sign(box_predictions[..., 2:4]) * torch.sqrt(
            torch.abs(box_predictions[..., 2:4] + 1e-6)
        )
        box_targets[..., 2:4] = torch.sqrt(box_targets[..., 2:4])
        box_loss = self.mse(
            torch.flatten(box_predictions, end_dim=-2),
            torch.flatten(box_targets, end_dim=-2),
        )

        # Object loss
        pred_box = (
            bestbox * predictions[..., 25:26] + (1 - bestbox) * predictions[..., 20:21]
        )
        object_loss = self.mse(
            torch.flatten(exists_box * pred_box),
            torch.flatten(exists_box * target[..., 20:21]),
        )

        # No object loss
        no_object_loss = self.mse(
            torch.flatten((1 - exists_box) * predictions[..., 20:21], start_dim=1),
            torch.flatten((1 - exists_box) * target[..., 20:21], start_dim=1),
        )
        no_object_loss += self.mse(
            torch.flatten((1 - exists_box) * predictions[..., 25:26], start_dim=1),
            torch.flatten((1 - exists_box) * target[..., 20:21], start_dim=1),
        )

        # Class loss
        class_loss = self.mse(
            torch.flatten(exists_box * predictions[..., :20], end_dim=-2),
            torch.flatten(exists_box * target[..., :20], end_dim=-2),
        )

        loss = (
            self.lambda_coord * box_loss
            + object_loss
            + self.lambda_noobj * no_object_loss
            + class_loss
        )

        return loss

In [None]:
# Set the random seed for reproducibility
seed = 123
torch.manual_seed(seed)

# Hyperparameters and configurations
LEARNING_RATE = 2e-5
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
BATCH_SIZE = 16
EPOCHS = 300
NUM_WORKERS = 2
PIN_MEMORY = True
LOAD_MODEL = False
LOAD_MODEL_FILE = "yolov1.pth.tar"

# Define data transformations
WIDTH = 448
HEIGHT = 448

def get_train_transforms():
    return A.Compose([
        A.OneOf([
            A.HueSaturationValue(hue_shift_limit=0.2, sat_shift_limit=0.2, val_shift_limit=0.2, p=0.9),
            A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.9)
        ], p=0.9),
        A.ToGray(p=0.01),
        A.HorizontalFlip(p=0.2),
        A.VerticalFlip(p=0.2),
        A.Resize(height=WIDTH, width=WIDTH, p=1),
        ToTensorV2(p=1.0),
    ], bbox_params=A.BboxParams(format='yolo', min_area=0, min_visibility=0, label_fields=['labels']))

def get_valid_transforms():
    return A.Compose([
        A.Resize(height=WIDTH, width=WIDTH, p=1.0),
        ToTensorV2(p=1.0),
    ], bbox_params=A.BboxParams(format='yolo', min_area=0, min_visibility=0, label_fields=['labels']))

# Define class mapping
class_mapping = {
    "aeroplane": 0, "bicycle": 1, "bird": 2, "boat": 3, "bottle": 4,
    "bus": 5, "car": 6, "cat": 7, "chair": 8, "cow": 9,
    "diningtable": 10, "dog": 11, "horse": 12, "motorbike": 13, "person": 14,
    "pottedplant": 15, "sheep": 16, "sofa": 17, "train": 18, "tvmonitor": 19
}

# Define training and evaluation functions
def train_fn(train_loader, model, optimizer, loss_fn, epoch):
    model.train()
    mean_loss = []
    mean_mAP = []

    total_batches = len(train_loader)
    display_interval = total_batches // 5

    for batch_idx, (x, y) in enumerate(train_loader):
        x, y = x.to(DEVICE), y.to(DEVICE)
        out = model(x)
        loss = loss_fn(out, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        pred_boxes, true_boxes = get_boxes_training(out, y, iou_threshold=0.5, threshold=0.4)
        mAP = mean_average_precision(pred_boxes, true_boxes, iou_threshold=0.5, box_format="midpoint")

        mean_loss.append(loss.item())
        mean_mAP.append(mAP.item())

        if batch_idx % display_interval == 0 or batch_idx == total_batches - 1:
            print(f"Epoch: {epoch:3} \t Iter: {batch_idx:3}/{total_batches:3} \t Loss: {loss.item():3.10f} \t mAP: {mAP.item():3.10f}")

    avg_loss = sum(mean_loss) / len(mean_loss)
    avg_mAP = sum(mean_mAP) / len(mean_mAP)
    print(colored(f"Train \t loss: {avg_loss:3.10f} \t mAP: {avg_mAP:3.10f}", 'green'))
    return avg_mAP

def test_fn(test_loader, model, loss_fn, epoch):
    model.eval()
    mean_loss = []
    mean_mAP = []

    for batch_idx, (x, y) in enumerate(test_loader):
        x, y = x.to(DEVICE), y.to(DEVICE)
        out = model(x)
        loss = loss_fn(out, y)

        pred_boxes, true_boxes = get_boxes_training(out, y, iou_threshold=0.5, threshold=0.4)
        mAP = mean_average_precision(pred_boxes, true_boxes, iou_threshold=0.5, box_format="midpoint")

        mean_loss.append(loss.item())
        mean_mAP.append(mAP.item())

    avg_loss = sum(mean_loss) / len(mean_loss)
    avg_mAP = sum(mean_mAP) / len(mean_mAP)
    print(colored(f"Test \t loss: {avg_loss:3.10f} \t mAP: {avg_mAP:3.10f}", 'yellow'))
    return avg_mAP

# Main training function
def train():
    model = Yolov1(split_size=7, num_boxes=2, num_classes=20).to(DEVICE)
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    loss_fn = YoloLoss()

    if LOAD_MODEL:
        load_checkpoint(torch.load(LOAD_MODEL_FILE), model, optimizer)

    train_dataset = CustomVOCDataset(
        root='./data',
        year='2012',
        image_set='train',
        download=True,
    )
    train_dataset.init_config_yolo(class_mapping=class_mapping, custom_transforms=get_train_transforms())

    testval_dataset = CustomVOCDataset(
        root='./data',
        year='2012',
        image_set='val',
        download=True,
    )
    testval_dataset.init_config_yolo(class_mapping=class_mapping, custom_transforms=get_valid_transforms())

    dataset_size = len(testval_dataset)
    val_size = int(0.15 * dataset_size)
    test_size = dataset_size - val_size

    val_indices = list(range(val_size))
    test_indices = list(range(val_size, val_size + test_size))

    val_sampler = SubsetRandomSampler(val_indices)
    test_sampler = SubsetRandomSampler(test_indices)

    train_loader = DataLoader(
        dataset=train_dataset,
        batch_size=BATCH_SIZE,
        num_workers=NUM_WORKERS,
        pin_memory=PIN_MEMORY,
        drop_last=True,
    )

    val_loader = DataLoader(
        dataset=testval_dataset,
        batch_size=BATCH_SIZE,
        num_workers=NUM_WORKERS,
        pin_memory=PIN_MEMORY,
        sampler=val_sampler,
        drop_last=False,
    )

    test_loader = DataLoader(
        dataset=testval_dataset,
        batch_size=BATCH_SIZE,
        num_workers=NUM_WORKERS,
        pin_memory=PIN_MEMORY,
        sampler=test_sampler,
        drop_last=False,
    )

    best_mAP_train = 0
    best_mAP_val = 0
    best_mAP_test = 0

    for epoch in range(EPOCHS):
        train_mAP = train_fn(train_loader, model, optimizer, loss_fn, epoch)
        val_mAP = test_fn(val_loader, model, loss_fn, epoch)
        test_mAP = test_fn(test_loader, model, loss_fn, epoch)

        if train_mAP > best_mAP_train:
            best_mAP_train = train_mAP
        if val_mAP > best_mAP_val:
            best_mAP_val = val_mAP
            checkpoint = {
                "state_dict": model.state_dict(),
                "optimizer": optimizer.state_dict(),
            }
            save_checkpoint(checkpoint, filename=LOAD_MODEL_FILE)
        if test_mAP > best_mAP_test:
            best_mAP_test = test_mAP

        print(colored(f"Best Train mAP: {best_mAP_train:3.10f}", 'green'))
        print(colored(f"Best Val mAP: {best_mAP_val:3.10f}", 'blue'))
        print(colored(f"Best Test mAP: {best_mAP_test:3.10f}", 'yellow'))

# Run training
train()

In [None]:
def plot_image_with_labels(image, ground_truth_boxes, predicted_boxes, class_mapping):
    inverted_class_mapping = {v: k for k, v in class_mapping.items()}
    im = np.array(image)
    height, width, _ = im.shape

    fig, ax = plt.subplots(1)
    ax.imshow(im)

    for box in ground_truth_boxes:
        label_index, box = box[0], box[2:]
        upper_left_x = box[0] - box[2] / 2
        upper_left_y = box[1] - box[3] / 2
        rect = patches.Rectangle(
            (upper_left_x * width, upper_left_y * height),
            box[2] * width,
            box[3] * height,
            linewidth=1,
            edgecolor="green",
            facecolor="none",
        )
        ax.add_patch(rect)
        class_name = inverted_class_mapping.get(label_index, "Unknown")
        ax.text(upper_left_x * width, upper_left_y * height, class_name, color='white', fontsize=12, bbox=dict(facecolor='green', alpha=0.2))

    for box in predicted_boxes:
        label_index, box = box[0], box[2:]
        upper_left_x = box[0] - box[2] / 2
        upper_left_y = box[1] - box[3] / 2
        rect = patches.Rectangle(
            (upper_left_x * width, upper_left_y * height),
            box[2] * width,
            box[3] * height,
            linewidth=1,
            edgecolor="r",
            facecolor="none",
        )
        ax.add_patch(rect)
        class_name = inverted_class_mapping.get(label_index, "Unknown")
        ax.text(upper_left_x * width, upper_left_y * height, class_name, color='white', fontsize=12, bbox=dict(facecolor='red', alpha=0.2))

    plt.show()

def test():
    model = Yolov1(split_size=7, num_boxes=2, num_classes=20).to(DEVICE)

    if LOAD_MODEL:
        model.load_state_dict(torch.load(LOAD_MODEL_FILE)['state_dict'])

    test_dataset = CustomVOCDataset(root='./data', image_set='val', download=False)
    test_dataset.init_config_yolo(class_mapping=class_mapping, custom_transforms=get_valid_transforms())
    test_loader = DataLoader(dataset=test_dataset, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, pin_memory=PIN_MEMORY, shuffle=False, drop_last=False)

    model.eval()
    for x, y in test_loader:
        x = x.to(DEVICE)
        out = model(x)

        pred_bboxes = cellboxes_to_boxes(out)
        gt_bboxes = cellboxes_to_boxes(y)

        for idx in range(8):
            pred_box = non_max_suppression(pred_bboxes[idx], iou_threshold=0.5, threshold=0.4, box_format="midpoint")
            gt_box = non_max_suppression(gt_bboxes[idx], iou_threshold=0.5, threshold=0.4, box_format="midpoint")
            image = x[idx].permute(1,2,0).to("cpu")/255
            plot_image_with_labels(image, gt_box, pred_box, class_mapping)
        break

# Run test
test()