In [1]:
pip install -r /content/requirements.txt

Collecting Counter (from -r /content/requirements.txt (line 4))
  Downloading Counter-1.0.0.tar.gz (5.2 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting typing (from -r /content/requirements.txt (line 7))
  Downloading typing-3.7.4.3.tar.gz (78 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m78.6/78.6 kB[0m [31m1.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[31mERROR: Ignored the following versions that require a different python version: 3.10.0.0 Requires-Python >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, <3.5; 3.7.4.2 Requires-Python >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, <3.5[0m[31m
[0m[31mERROR: Could not find a version that satisfies the requirement os (from versions: none)[0m[31m
[0m[31mERROR: No matching distribution found for os[0m[31m
[0m

In [1]:
##################################################             YOLO         ##########################################################################

# ------------------------------------------------       Defining the Model           ----------------------------------------------------------------

import torch
import torch.nn as nn

# Tuples are defined as (kernel_size, filters, stride, padding), all the layers are designed exactly as described in the original paper.

Yolo_configuration = [
    (7, 64, 2, 3),        # (7, 64, 2, 3) indicates 7*7 kernel, 64 output filter, stride of 2 and 3 padding
    "M",                  # maxpooling layer
    (3, 192, 1, 1),       # 3*3 kernel, 192 output filters, stride of 1 and 1 padding
    "M",                  # maxpooling layer with stride 2x2 and kernel 2x2
    (1, 128, 1, 0),       # 1*1 kernel, 128 output filters, stride of 1 and 0 padding
    (3, 256, 1, 1),       # 3*3 kernel, 192 output filters, stride of 1 and 1 padding
    (1, 256, 1, 0),       # 3*3 kernel, 192 output filters, stride of 1 and 1 padding
    (3, 512, 1, 1),       # 3*3 kernel, 192 output filters, stride of 1 and 1 padding
    "M",                  # maxpooling layer
    [(1, 256, 1, 0), (3, 512, 1, 1), 4],    # 4 layers of 1*1 kernels, 256 output filters, 1 stride and 3*3 kernels, 512 output filters, 1 stride and 1 padding
    (1, 512, 1, 0),                         # 1*1 kernels, 512 output filters, 1 stride
    (3, 1024, 1, 1),                        # 3*3 kernels, 1024 output filters, 1 stride and 1 padding
    "M",                                    # maxpooling layer
    [(1, 512, 1, 0), (3, 1024, 1, 1), 2],   # 2 layers of 1*1 kernels, 512 output filters, 1 stride and 3*3 kernels, 1024 output filters, 1 stride and 1 padding
    (3, 1024, 1, 1),                        # 3*3 kernels, 1024 output filters, 1 stride and 1 padding
    (3, 1024, 2, 1),                        # 3*3 kernels, 1024 output filters, 2 stride and 1 padding
    (3, 1024, 1, 1),                        # 3*3 kernels, 1024 output filters, 1 stride and 1 padding
    (3, 1024, 1, 1),                        # 3*3 kernels, 1024 output filters, 1 stride and 1 padding
]

# ---------------------------------------  Defining the CNN Blocks   -------------------------------------------------------------
class CNNLayer(nn.Module):
    def __init__(self, input_channels, output_channels, **kwargs):
        """
        Args:
            input_channels (int): Number of input channels.
            output_channels (int): Number of output channels (filters).
            kernel_size (int): Size of the convolutional kernel.
            stride (int): Stride for the convolution.
            padding (int): Padding for the convolution.
        """
        super(CNNLayer, self).__init__()
        # Convolutional layer with no bias (bias=False for BatchNorm compatibility)
        self.conv = nn.Conv2d(input_channels, output_channels, bias=False, **kwargs)
        # Batch normalization layer to stabilize training
        self.batchnorm = nn.BatchNorm2d(output_channels)
        # Leaky ReLU activation function
        self.leakyrelu = nn.LeakyReLU(0.1)

    def forward(self, x):
        """
        Forward pass.
        Args:
            x (torch.Tensor): Input tensor.
        Returns:
            torch.Tensor: Output after applying Conv2D, BatchNorm, and LeakyReLU.
        """
        x = self.conv(x)         # convolution
        x = self.batchnorm(x)    # batch normalization
        x = self.leakyrelu(x)    # LeakyReLU activation
        return x

# -----------------------------------------------------------------------------------------------------------------------------------------

# ---------------------------------------  Defining the YOLO architecture   -------------------------------------------------------------

class YoloVersion1(nn.Module):
    def __init__(self, input_channels=3, **kwargs):
        """
        Initializing the YOLO1 model.

        Args:
            input_channels: Number of input channels (3 - RGB images).
            **kwargs: Additional parameters for fully connected layer creation.
        """
        super(YoloVersion1, self).__init__()
        self.architecture = Yolo_configuration    # Yolo configuration
        self.input_channels = input_channels       # Initial input channels
        self.darknet = self.create_cnn_layers(self.architecture)   # Convolutional layers
        self.fcs = self.create_fully_connected_layers(**kwargs)      # Fully connected layers

    def forward(self, x):
        """
        Forward pass through the YOLO.

        Args:
            x (torch.Tensor): Input image tensor.

        Returns:
            torch.Tensor: Output after passing through the model.
        """
        x = self.darknet(x)  # Passing through convolutional layers
        x = torch.flatten(x, start_dim=1)  # Flattening for fully connected layers
        return self.fcs(x)  # Passing through fully connected layers

    def create_cnn_layers(self, architecture):
        """
        Creating the convolutional layers of the YOLO model.

        Args:
            architecture: Configuration of the convolutional layers.

        Returns:
            nn.Sequential: A sequential container of the convolutional layers.
        """
        layers = []
        input_channels = self.input_channels

        for layer in architecture:
            if isinstance(layer, tuple):  # Standard convolutional layer
                kernel_size, filters, stride, padding = layer
                layers.append(
                    CNNLayer(
                        input_channels, filters,
                        kernel_size=kernel_size, stride=stride, padding=padding
                    )
                )
                input_channels = filters  # Updating the input channels for the next layer

            elif layer == "M":  # Max pooling layer
                layers.append(nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2)))

            elif isinstance(layer, list):  # Repeated blocks of convolutional layers
                conv1, conv2, num_repeats = layer
                for _ in range(num_repeats):
                    # First convolution in the repeated block
                    layers.append(
                        CNNLayer(
                            input_channels, conv1[1],
                            kernel_size=conv1[0], stride=conv1[2], padding=conv1[3]
                        )
                    )
                    # Second convolution in the repeated block
                    layers.append(
                        CNNLayer(
                            conv1[1], conv2[1],
                            kernel_size=conv2[0], stride=conv2[2], padding=conv2[3]
                        )
                    )
                    input_channels = conv2[1]  # Updating input channels after each block

        return nn.Sequential(*layers)  # sequential container

    def create_fully_connected_layers(self, split_size, num_boxes, num_classes):
        """
        The fully connected layers of the YOLO model.

        Args:
            **kwargs: Parameters for the fully connected layers (e.g., output size).

        Returns:
            nn.Sequential: A sequential container of the fully connected layers.
        """
        S, B, C = split_size, num_boxes, num_classes

        # Below code is as defined in the original paper
        return nn.Sequential(
            nn.Flatten(),
            nn.Linear(1024 * S * S, 496),
            nn.Dropout(0.0),
            nn.LeakyReLU(0.1),
            nn.Linear(496, S * S * (C + B * 5)),
        )

In [2]:
# --------------------------      Intersection over Union (IoU),Non-Maximum Suppression (NMS),Mean Average Precision (mAP)    ----------------------
import torch
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from collections import Counter

def calculate_iou(predicted_boxes, target_boxes, box_format="midpoint"):
    """
    Intersection over Union (IoU) between predicted and target bounding boxes.

    predicted_boxes (torch.Tensor): Predicted bounding boxes (BATCH_SIZE, 4)
        target_boxes (torch.Tensor): Target bounding boxes (BATCH_SIZE, 4)
        box_format (str): Format of bounding boxes - "midpoint" or "corners"

    Returns:
        torch.Tensor: IoU values for all examples
    """

    if box_format == "midpoint":
        # Converting from (x, y, w, h) to (x1, y1, x2, y2)
        pred_x1 = predicted_boxes[..., 0:1] - predicted_boxes[..., 2:3] / 2
        pred_y1 = predicted_boxes[..., 1:2] - predicted_boxes[..., 3:4] / 2
        pred_x2 = predicted_boxes[..., 0:1] + predicted_boxes[..., 2:3] / 2
        pred_y2 = predicted_boxes[..., 1:2] + predicted_boxes[..., 3:4] / 2
        target_x1 = target_boxes[..., 0:1] - target_boxes[..., 2:3] / 2
        target_y1 = target_boxes[..., 1:2] - target_boxes[..., 3:4] / 2
        target_x2 = target_boxes[..., 0:1] + target_boxes[..., 2:3] / 2
        target_y2 = target_boxes[..., 1:2] + target_boxes[..., 3:4] / 2

    if box_format == "corners":
        # Boxes are already in (x1, y1, x2, y2) format
        pred_x1 = predicted_boxes[..., 0:1]
        pred_y1 = predicted_boxes[..., 1:2]
        pred_x2 = predicted_boxes[..., 2:3]
        pred_y2 = predicted_boxes[..., 3:4]  # (N, 1)
        target_x1 = target_boxes[..., 0:1]
        target_y1 = target_boxes[..., 1:2]
        target_x2 = target_boxes[..., 2:3]
        target_y2 = target_boxes[..., 3:4]

    # Calculating intersection coordinates
    intersection_x1 = torch.max(pred_x1, target_x1)
    intersection_y1 = torch.max(pred_y1, target_y1)
    intersection_x2 = torch.min(pred_x2, target_x2)
    intersection_y2 = torch.min(pred_y2, target_y2)

    # .clamp(0) is for the case when they do not intersect
    intersection_area = (intersection_x2 - intersection_x1).clamp(0) * (intersection_y2 - intersection_y1).clamp(0)

    # Calculating box areas
    pred_area = abs((pred_x2 - pred_x1) * (pred_y2 - pred_y1))
    target_area = abs((target_x2 - target_x1) * (target_y2 - target_y1))

    union_area = pred_area + target_area - intersection_area

    return intersection_area / (union_area + 1e-6) # Adding small epsilon to avoid division by zero


def non_max_suppression(bounding_boxes, iou_threshold, confidence_threshold, box_format="corners"):
    """
    Applies Non-Maximum Suppression (NMS) to a list of bounding boxes.

    Args:
        bounding_boxes (List[List[float]]): List of bounding boxes, each specified as
            [class_prediction, confidence_score, x1, y1, x2, y2]
        iou_threshold (float): IoU threshold for considering overlapping boxes
        confidence_threshold (float): Threshold to filter low-confidence predictions
        box_format (str): Format of bounding boxes - "midpoint" or "corners"

    Returns:
        List[List[float]]: Filtered bounding boxes after applying NMS
    """

    assert isinstance(bounding_boxes, list)

    # Filtering out low-confidence predictions
    filtered_boxes = [box for box in bounding_boxes if box[1] > confidence_threshold]
    # Sorting boxes by confidence score in descending order
    sorted_boxes = sorted(filtered_boxes, key=lambda x: x[1], reverse=True)
    nms_result = []

    while sorted_boxes:
        chosen_box = sorted_boxes.pop(0)
        # Filtering out boxes with high IoU overlap
        sorted_boxes = [
            box for box in sorted_boxes
            if box[0] != chosen_box[0]
            or calculate_iou(
                torch.tensor(chosen_box[2:]),
                torch.tensor(box[2:]),
                box_format=box_format,
            )
            < iou_threshold
        ]

        nms_result.append(chosen_box)

    return nms_result


def calculate_mean_average_precision(
    predicted_boxes, ground_truth_boxes, iou_threshold=0.5, box_format="midpoint", num_classes=20
):
    """
    Calculates Mean Average Precision (mAP) for object detection.

    Args:
        predicted_boxes (List[List[float]]): List of predicted bounding boxes, each specified as
            [image_id, class_prediction, confidence_score, x1, y1, x2, y2]
        ground_truth_boxes (List[List[float]]): List of ground truth bounding boxes, same format as predicted_boxes
        iou_threshold (float): IoU threshold for considering a prediction as correct
        box_format (str): Format of bounding boxes - "midpoint" or "corners"
        num_classes (int): Number of classes in the dataset

    Returns:
        float: mAP value across all classes given a specific IoU threshold
    """

    average_precisions = []

    #for numerical stability
    epsilon = 1e-6

    for class_id in range(num_classes):
        detections = []
        ground_truths = []

        for detection in predicted_boxes:
            if detection[1] == class_id:
                detections.append(detection)

        for true_box in ground_truth_boxes:
            if true_box[1] == class_id:
                ground_truths.append(true_box)

        if len(detections) > 0 and len(ground_truths) > 0:
            # print(f"\nClass {c}:")
            # print(f"Sample detection: {detections[0]}")
            # print(f"Sample ground truth: {ground_truths[0]}")

            # Debug IoU calculation
            sample_iou = calculate_iou(
                torch.tensor(detections[0][3:]),
                torch.tensor(ground_truths[0][3:]),
                box_format=box_format
            )

        if not detections or not ground_truths:
            continue

        # Counting the number of ground truth boxes for each image
        gt_counts = Counter([gt[0] for gt in ground_truths])

        for image_id, count in gt_counts.items():
            gt_counts[image_id] = torch.zeros(count)

        # Sorting detections by confidence score (descending)
        detections.sort(key=lambda x: x[2], reverse=True)
        true_positives = torch.zeros((len(detections)))
        false_positives = torch.zeros((len(detections)))
        total_true_bboxes = len(ground_truths)

        # If none exists for this class then we can safely skip
        if total_true_bboxes == 0:
            continue

        for detection_idx, detection in enumerate(detections):

            image_ground_truths = [
                gt for gt in ground_truths if gt[0] == detection[0]
            ]

            best_iou = 0
            best_gt_idx = None

            for idx, gt in enumerate(image_ground_truths):
                iou = calculate_iou(
                    torch.tensor(detection[3:]),
                    torch.tensor(gt[3:]),
                    box_format=box_format,
                )

                if iou > best_iou:
                    best_iou = iou
                    best_gt_idx = idx

            if best_iou > iou_threshold:
                # only detect ground truth detection once
                if gt_counts[detection[0]][best_gt_idx] == 0:
                    # true positive and add this bounding box to seen
                    true_positives[detection_idx] = 1
                    gt_counts[detection[0]][best_gt_idx] = 1
                else:
                    false_positives[detection_idx] = 1

            # if IOU is lower then the detection is a false positive
            else:
                false_positives[detection_idx] = 1

        # Calculating cumulative sums
        TP_cumsum = torch.cumsum(true_positives, dim=0)
        FP_cumsum = torch.cumsum(false_positives, dim=0)

        recalls = TP_cumsum / (total_true_bboxes + epsilon)
        precisions = torch.divide(TP_cumsum, (TP_cumsum + FP_cumsum + epsilon))

        # Appending 0 recall and 1 precision for AUC calculation
        precisions = torch.cat((torch.tensor([1]), precisions))
        recalls = torch.cat((torch.tensor([0]), recalls))

        # Calculating average precision using trapezoidal rule
        average_precisions.append(torch.trapz(precisions, recalls))

    if not average_precisions:
        print("Warning: No valid predictions were made for any class.")
        return 0.0  # Return 0 mAP if no valid predictions

    return sum(average_precisions) / len(average_precisions)


def plot_image(image, bounding_boxes, class_labels):
    # Plots predicted bounding boxes and their labels on the given image.
    image = np.array(image)
    height, width, _ = image.shape

    # Create figure and axes
    fig, ax = plt.subplots(1)

    # Display the image
    ax.imshow(image)

    for box, label in zip(bounding_boxes, class_labels):
        box = box[2:]
        assert len(box) == 4, "Got more values than in x, y, w, h, in a box!"

        # Extracting box coordinates
        x_center, y_center, box_width, box_height = box
        x_min = (x_center - box_width / 2) * width
        y_min = (y_center - box_height / 2) * height

        # Creating rectangle patch
        rect = patches.Rectangle(
            (x_min, y_min),
            box_width * width,
            box_height * height,
            linewidth=1,
            edgecolor="r",
            facecolor="none",
        )
        # Adding the rectangle to the plot
        ax.add_patch(rect)

        # Adding label above the box
        ax.text(x_min, y_min, label, color='red')

    plt.show()


def extract_bounding_boxes(
    loader,
    model,
    iou_threshold,
    threshold,
    pred_format="cells",
    box_format="midpoint",
    device="cpu",
):
    """
    Extracts predicted and true bounding boxes from a data loader using a given model.

    Args:
        data_loader: DataLoader providing batches of images and labels
        model: Trained model used for making predictions
        iou_threshold (float): IoU threshold for non-max suppression
        confidence_threshold (float): Confidence threshold to filter predictions
        prediction_format (str): Format of predictions - "cells" or other
        box_format (str): Format of bounding boxes - "midpoint" or "corners"
        device (str): Device to run the model on ("cpu" or "cuda")

    Returns:
        Tuple[List[List[float]], List[List[float]]]: Lists of predicted and true bounding boxes
    """
    all_pred_boxes = []
    all_true_boxes = []

    # make sure model is in eval before get bboxes
    model.eval()
    train_idx = 0

    for batch_idx, (x, labels) in enumerate(loader):
        x = x.to(device)
        labels = labels.to(device)

        with torch.no_grad():
            predictions = model(x)

        batch_size = x.shape[0]
        true_bboxes = convert_yolo_output_to_boxes(labels)
        bboxes = convert_yolo_output_to_boxes(predictions)

        for idx in range(batch_size):
            nms_boxes = non_max_suppression(
                bboxes[idx],
                iou_threshold=iou_threshold,
                confidence_threshold=threshold,
                box_format=box_format,
            )


            # if batch_idx == 0 and idx == 0:
            #     print("\nPrediction confidences before NMS:")
            #     for box in bboxes[idx]:
            #         print(f"Class: {box[0]}, Confidence: {box[1]:.4f}")

            #     print("\nPredictions after NMS:")
            #     for box in nms_boxes:
            #         print(f"Class: {box[0]}, Confidence: {box[1]:.4f}")

            for nms_box in nms_boxes:
                all_pred_boxes.append([train_idx] + nms_box)

            for box in true_bboxes[idx]:
                # many will get converted to 0 pred
                if box[1] > threshold:
                    all_true_boxes.append([train_idx] + box)

            train_idx += 1

    model.train()
    return all_pred_boxes, all_true_boxes



def convert_yolo_predictions(predictions, S=7):
    """
    Converts YOLO predictions from cell-relative coordinates to image-relative coordinates.

    Args:
        predictions (torch.Tensor): Raw predictions from YOLO model
        grid_size (int): Size of the grid used in YOLO (default: 7)

    Returns:
        torch.Tensor: Converted predictions in image-relative coordinates
    """

    predictions = predictions.to("cpu")
    batch_size = predictions.shape[0]
    predictions = predictions.reshape(batch_size, 7, 7, 30)

    # Extracting bounding box predictions
    box_1 = predictions[..., 21:25]
    box_2 = predictions[..., 26:30]

    # Determining which box has higher confidence score
    scores = torch.cat(
        (predictions[..., 20].unsqueeze(0), predictions[..., 25].unsqueeze(0)), dim=0
    )
    best_box = scores.argmax(0).unsqueeze(-1)
    best_boxes = box_1 * (1 - best_box) + best_box * box_2

    # Creating grid cell indices
    cell_indices = torch.arange(7).repeat(batch_size, 7, 1).unsqueeze(-1)

    # Converting x and y coordinates
    x_coord = 1 / S * (best_boxes[..., :1] + cell_indices)
    y_coord = 1 / S * (best_boxes[..., 1:2] + cell_indices.permute(0, 2, 1, 3))

    # Converting width and height
    width_height = 1 / S * best_boxes[..., 2:4]

    # Combining converted coordinates
    converted_boxes = torch.cat((x_coord, y_coord, width_height), dim=-1)

     # Getting predicted class and best confidence score
    predicted_class = predictions[..., :20].argmax(-1).unsqueeze(-1)
    best_confidence = torch.max(predictions[..., 20], predictions[..., 25]).unsqueeze(
        -1
    )

    # Combining all predictions
    converted_predictions = torch.cat(
        (predicted_class, best_confidence, converted_boxes), dim=-1
    )

    return converted_predictions

def convert_yolo_output_to_boxes(yolo_output, S=7):
    """
    Converts YOLO output from cell-based format to a list of bounding boxes for each image in the batch.

    Args:
        yolo_output (torch.Tensor): Raw output from YOLO model
        grid_size (int): Size of the grid used in YOLO (default: 7)

    Returns:
        List[List[List[float]]]: List of bounding boxes for each image in the batch
    """
    # Converting cell-based predictions to image-based predictions
    converted_predictions = convert_yolo_predictions(yolo_output).reshape(yolo_output.shape[0], S * S, -1)

    # Converting class predictions to long integers
    converted_predictions[..., 0] = converted_predictions[..., 0].long()
    all_bounding_boxes = []

    for image_index in range(yolo_output.shape[0]):
        image_bounding_boxes = []

        for cell_index in range(S * S):
            # Extracting and converting each prediction to a list of floats
            box = [x.item() for x in converted_predictions[image_index, cell_index, :]]
            image_bounding_boxes.append(box)
        all_bounding_boxes.append(image_bounding_boxes)

    return all_bounding_boxes

def save_checkpoint(state, filename="CS512_YOLO.pth.tar"):
    print("=> Saving checkpoint")
    torch.save(state, filename)


def load_checkpoint(checkpoint, model, optimizer):
    print("=> Loading checkpoint")
    model.load_state_dict(checkpoint["state_dict"])
    optimizer.load_state_dict(checkpoint["optimizer"])


In [3]:
# ----------------------------------   YOLO LOSS Function   -----------------------------------------------------------------

import torch
import torch.nn as nn


class YoloLoss(nn.Module):
    """
    Loss calculation for our YOLO Model
    """

    def __init__(self, S=7, B=2, C=20):
        super(YoloLoss, self).__init__()
        self.mse = nn.MSELoss(reduction="sum")

        """
        S is split size of image - 7 in original paper,
        B is number of boxes - 2 in original paper,
        C is number of classes - 20 ,
        """
        self.S = S
        self.B = B
        self.C = C

         # Loss weights as given in the paper
        self.lambda_noobj = 0.5  # Weight for no-object confidence loss
        self.lambda_coord = 5    # Weight for bounding box coordinate loss

    def forward(self, predictions, target):
        # predictions are shaped (BATCH_SIZE, S*S(C+B*5)
        predictions = predictions.reshape(-1, self.S, self.S, self.C + self.B * 5)

        # Calculating IoU for the two predicted bounding boxes with target bounding boxes
        iou_b1 = calculate_iou(predictions[..., 21:25], target[..., 21:25])
        iou_b2 = calculate_iou(predictions[..., 26:30], target[..., 21:25])
        ious = torch.cat([iou_b1.unsqueeze(0), iou_b2.unsqueeze(0)], dim=0)

        # Getting the box with highest IoU
        iou_maxes, bestbox = torch.max(ious, dim=0)
        exists_box = target[..., 20].unsqueeze(3)

        # === ===========  Bounding Box Coordinate Loss ==================

        box_predictions = exists_box * (
            (
                bestbox * predictions[..., 26:30]
                + (1 - bestbox) * predictions[..., 21:25]
            )
        )

        box_targets = exists_box * target[..., 21:25]

        # Applying sqrt to width and height
        box_predictions[..., 2:4] = torch.sign(box_predictions[..., 2:4]) * torch.sqrt(
            torch.abs(box_predictions[..., 2:4] + 1e-6)
        )
        box_targets[..., 2:4] = torch.sqrt(box_targets[..., 2:4])

        box_loss = self.mse(
            torch.flatten(box_predictions, end_dim=-2),
            torch.flatten(box_targets, end_dim=-2),
        )

        # ==============================   Object Confidence Loss   ==============
        pred_box = (
            bestbox * predictions[..., 25:26] + (1 - bestbox) * predictions[..., 20:21]
        )

        object_loss = self.mse(
            torch.flatten(exists_box * pred_box),
            torch.flatten(exists_box * target[..., 20:21]),
        )

          # ==============================  No Object Confidence Loss   ==============

        no_object_loss = self.mse(
            torch.flatten((1 - exists_box) * predictions[..., 20:21], start_dim=1),
            torch.flatten((1 - exists_box) * target[..., 20:21], start_dim=1),
        )

        no_object_loss += self.mse(
            torch.flatten((1 - exists_box) * predictions[..., 25:26], start_dim=1),
            torch.flatten((1 - exists_box) * target[..., 20:21], start_dim=1)
        )


         # =============================   Class Prediction Loss      ========================================

        class_loss = self.mse(
            torch.flatten(exists_box * predictions[..., :20], end_dim=-2,),
            torch.flatten(exists_box * target[..., :20], end_dim=-2,),
        )

        # combining all losses
        total_loss = (
            self.lambda_coord * box_loss  # first two losses component in total loss as given in paper
            + object_loss  # 3rd two losses component in total loss as given in paper
            + self.lambda_noobj * no_object_loss  # 4th two losses component in total loss as given in paper
            + class_loss  # 5th two losses component in total loss as given in paper
        )

        return total_loss


In [4]:
# -------------------------------------   Loading and processing the Pascal VOC dataset.  ---------------------------------------------------

import torch
import os
import pandas as pd
from PIL import Image

class VOCDataset(torch.utils.data.Dataset):
    def __init__(
        self,
        csv_file: str,
        image_dir: str,
        label_dir: str,
        grid_size: int = 7,
        num_boxes: int = 2,
        num_classes: int = 20,
        transform = None,
    ):
        self.annotations = pd.read_csv(csv_file)
        self.image_dir = image_dir
        self.label_dir = label_dir
        self.transform = transform
        self.grid_size = grid_size
        self.num_boxes = num_boxes
        self.num_classes = num_classes

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, index: int):
        # Loading image and bounding box data
        image, bounding_boxes = self._load_image_and_boxes(index)

        # Applying transformations if specified
        if self.transform:
            image, bounding_boxes = self.transform(image, bounding_boxes)

        # Converting bounding boxes to grid cell format
        label_matrix = self._convert_to_grid_format(bounding_boxes)

        return image, label_matrix

    def _load_image_and_boxes(self, index):
        # Loading bounding box data
        label_path = os.path.join(self.label_dir, self.annotations.iloc[index, 1])
        bounding_boxes = []
        with open(label_path) as f:
            for label in f.readlines():
                class_label, x, y, width, height = [
                    float(x) if float(x) != int(float(x)) else int(x)
                    for x in label.replace("\n", "").split()
                ]
                bounding_boxes.append([class_label, x, y, width, height])

        # Loading the image
        img_path = os.path.join(self.image_dir, self.annotations.iloc[index, 0])
        image = Image.open(img_path)

        return image, torch.tensor(bounding_boxes)

    def _convert_to_grid_format(self, bounding_boxes):
        label_matrix = torch.zeros((self.grid_size, self.grid_size, self.num_classes + 5 * self.num_boxes))

        for box in bounding_boxes:
            class_label, x, y, width, height = box.tolist()
            class_label = int(class_label)

            # Calculating grid cell indices
            i, j = int(self.grid_size * y), int(self.grid_size * x)

            # Calculating box coordinates relative to the cell
            x_cell, y_cell = self.grid_size * x - j, self.grid_size * y - i
            width_cell, height_cell = width * self.grid_size, height * self.grid_size

            # Assigning values to the label matrix if no object is already present in the cell
            if label_matrix[i, j, 20] == 0:
                label_matrix[i, j, 20] = 1  # Object presence flag
                label_matrix[i, j, 21:25] = torch.tensor([x_cell, y_cell, width_cell, height_cell])
                label_matrix[i, j, class_label] = 1  # One-hot encoding for class label

        return label_matrix


In [13]:
#######################  This code is taken from Perplexity AI ###########################
# Method 1: Using Google Drive Mount
from google.colab import drive
drive.mount('/content/drive')

# Create a directory for the dataset
!mkdir -p /content/pascal_voc

# Assuming PASCAL_VOC.zip is in your main Drive folder
!cp '/content/drive/My Drive/PASCAL_VOC.zip' /content/pascal_voc/
!cd /content/pascal_voc && unzip PASCAL_VOC.zip

# Method 2: Using gdown (if you have a shareable link)
# First, get the shareable link from Google Drive:
# 1. Right-click on PASCAL_VOC.zip
# 2. Click "Share" -> "Anyone with the link"
# 3. Copy the link

!pip install gdown

# Replace FILE_ID with your actual file ID from the shareable link
# Example: if link is https://drive.google.com/file/d/1234xyz/view
# then FILE_ID is 1234xyz
!gdown --id FILE_ID -O /content/pascal_voc/PASCAL_VOC.zip
!cd /content/pascal_voc && unzip PASCAL_VOC.zip

# Verify the extraction
!ls /content/pascal_voc

# Expected structure:
# /content/pascal_voc/
#   ├── Annotations/
#   ├── ImageSets/
#   │   └── Main/
#   └── JPEGImages/

# Clean up the zip file to save space
!rm /content/pascal_voc/PASCAL_VOC.zip

#######################  This code is taken from Perplexity AI ###########################

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
  inflating: labels/2009_004347.txt  
  inflating: labels/2009_004350.txt  
  inflating: labels/2009_004351.txt  
  inflating: labels/2009_004357.txt  
  inflating: labels/2009_004358.txt  
  inflating: labels/2009_004359.txt  
  inflating: labels/2009_004361.txt  
  inflating: labels/2009_004364.txt  
  inflating: labels/2009_004366.txt  
  inflating: labels/2009_004368.txt  
  inflating: labels/2009_004369.txt  
  inflating: labels/2009_004370.txt  
  inflating: labels/2009_004371.txt  
  inflating: labels/2009_004374.txt  
  inflating: labels/2009_004375.txt  
  inflating: labels/2009_004377.txt  
  inflating: labels/2009_004382.txt  
  inflating: labels/2009_004383.txt  
  inflating: labels/2009_004390.txt  
  inflating: labels/2009_004392.txt  
  inflating: labels/2009_004394.txt  
  inflating: labels/2009_004397.txt  
  inflating: labels/2009_004399.txt  
  inflating: labels/2009_004402.txt  
  inflating: labels/200

In [14]:
# ---------------------------       Main script for training YOLO model on Pascal VOC dataset    -----------------------------

import torch
import torchvision.transforms as transforms
import torch.optim as optim
import torchvision.transforms.functional as FT
from tqdm import tqdm
from torch.utils.data import DataLoader
from torch.utils.data import Subset
import random
from typing import List

#random seed for reproducibility
RANDOM_SEED = 123
torch.manual_seed(RANDOM_SEED)


# Pascal VOC dataset classes
VOC_CLASSES = [
    "aeroplane", "bicycle", "bird", "boat", "bottle", "bus", "car", "cat",
    "chair", "cow", "diningtable", "dog", "horse", "motorbike", "person",
    "pottedplant", "sheep", "sofa", "train", "tvmonitor"
]


# Training configurations
CONFIG = {
    "LEARNING_RATE": 2e-5,
    "DEVICE": "cpu",  # we have used "cpu" only
    "BATCH_SIZE": 16,
    "WEIGHT_DECAY": 0,
    "EPOCHS": 1000,  #5 - in the output panel the code run was with 10 epochs and only on 1000 data
    "NUM_WORKERS": 2,
    "PIN_MEMORY": True,
    "LOAD_MODEL": False,
    "LOAD_MODEL_FILE": "/content/CS512_YOLO.pth.tar",
    "IMG_DIR": "/root/.cache/kagglehub/datasets/aladdinpersson/pascal-voc-dataset-used-in-yolov3-video/versions/1/PASCAL_VOC/images",
    "LABEL_DIR": "/root/.cache/kagglehub/datasets/aladdinpersson/pascal-voc-dataset-used-in-yolov3-video/versions/1/PASCAL_VOC/labels",
}


class Compose(object):
    def __init__(self, transforms):
        self.transforms = transforms

    def __call__(self, img, bboxes):
        for t in self.transforms:
            img, bboxes = t(img), bboxes

        return img, bboxes

# Defining image transformations
transform = Compose([transforms.Resize((448, 448)), transforms.ToTensor(),])


def train_epoch(train_loader, model, optimizer, loss_fn):
    """
    Trains the model for one epoch
    """
    model.train()
    progress_bar = tqdm(train_loader, leave=True)
    epoch_losses = []

    for batch_idx, (images, targets) in enumerate(progress_bar):
        images, targets = images.to(CONFIG["DEVICE"]), targets.to(CONFIG["DEVICE"])

        predictions = model(images)
        loss = loss_fn(predictions, targets)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_losses.append(loss.item())
        progress_bar.set_postfix(loss=loss.item())

    mean_loss = sum(epoch_losses) / len(epoch_losses)
    print(f"Mean loss for this epoch: {mean_loss:.4f}")


def main():
    # Initializing model, optimizer, and loss function
    model = YoloVersion1(split_size=7, num_boxes=2, num_classes=20).to(CONFIG["DEVICE"])
    optimizer = optim.Adam(
        model.parameters(), lr=CONFIG["LEARNING_RATE"], weight_decay=CONFIG["WEIGHT_DECAY"]
    )
    loss_fn = YoloLoss()

    if CONFIG["LOAD_MODEL"]:
        load_checkpoint(torch.load(CONFIG["LOAD_MODEL_FILE"]), model, optimizer)

    # Loading and preparing datasets
    train_dataset = VOCDataset(
        "/root/.cache/kagglehub/datasets/aladdinpersson/pascal-voc-dataset-used-in-yolov3-video/versions/1/PASCAL_VOC/train.csv",
        transform=transform,
        image_dir=CONFIG["IMG_DIR"],
        label_dir=CONFIG["LABEL_DIR"],
    )

    # subset of the training data (1000 samples)
    #train_dataset = Subset(train_dataset, random.sample(range(len(train_dataset)), 1000))

    test_dataset = VOCDataset(
        "/root/.cache/kagglehub/datasets/aladdinpersson/pascal-voc-dataset-used-in-yolov3-video/versions/1/PASCAL_VOC/test.csv",
        transform=transform,
        image_dir=CONFIG["IMG_DIR"],
        label_dir=CONFIG["LABEL_DIR"],
    )

    # Data loaders
    train_loader = DataLoader(
        dataset=train_dataset,
        batch_size=CONFIG["BATCH_SIZE"],
        num_workers=CONFIG["NUM_WORKERS"],
        pin_memory=CONFIG["PIN_MEMORY"],
        shuffle=True,
        drop_last=False,
    )

    test_loader = DataLoader(
        dataset=test_dataset,
        batch_size=CONFIG["BATCH_SIZE"],
        num_workers=CONFIG["NUM_WORKERS"],
        pin_memory=CONFIG["PIN_MEMORY"],
        shuffle=True,
        drop_last=True,
    )

    # Training loop
    for epoch in range(CONFIG["EPOCHS"]):
        ##############################################################################
        # The code below test the model performance using 8 photos
        # LOAD_MODEL = true, the code will load the model
        if CONFIG["LOAD_MODEL"]:
            for x, y in train_loader:
                x = x.to(CONFIG["DEVICE"])
                for idx in range(8):
                    bboxes = convert_yolo_output_to_boxes(model(x))
                    bboxes = non_max_suppression(bboxes[idx], iou_threshold=0.5, confidence_threshold=0.4, box_format="midpoint")
                    #plot_image(x[idx].permute(1,2,0).to("cpu"), bboxes)

                    # Extract class predictions
                    class_preds = [box[0] for box in bboxes]

                    # Convert class indices to labels (assuming you have a list of class names)
                    class_labels = [VOC_CLASSES[int(pred)] for pred in class_preds]

                    # Plot image with bounding boxes and class labels
                    plot_image(x[idx].permute(1,2,0).to("cpu"), bboxes, class_labels)

            import sys
            sys.exit()
        ##########################################################################################

        # train the model
        train_epoch(train_loader, model, optimizer, loss_fn)

        # Evaluating the model
        pred_boxes, target_boxes = extract_bounding_boxes(
            train_loader, model, iou_threshold=0.1, threshold=0.05
        )

        # Calculation mAP
        mean_avg_prec = calculate_mean_average_precision(
            pred_boxes,
            target_boxes,
            iou_threshold=0.1,
            box_format="midpoint"
        )
        print(f"Epoch {epoch+1}/{CONFIG['EPOCHS']} - Train mAP: {mean_avg_prec:.4f}")


        # Save checkpoint after each epoch (optional)
        checkpoint = {
            "state_dict": model.state_dict(),
            "optimizer": optimizer.state_dict(),
        }
        save_checkpoint(checkpoint, filename=CONFIG["LOAD_MODEL_FILE"])


if __name__ == "__main__":
    main()


100%|██████████| 63/63 [01:29<00:00,  1.42s/it, loss=201]

Mean loss for this epoch: 549.2892





Epoch 1/10 - Train mAP: 0.0070
=> Saving checkpoint


100%|██████████| 63/63 [01:34<00:00,  1.50s/it, loss=196]

Mean loss for this epoch: 292.4213





Epoch 2/10 - Train mAP: 0.0153
=> Saving checkpoint


100%|██████████| 63/63 [01:34<00:00,  1.50s/it, loss=121]

Mean loss for this epoch: 210.4699





Epoch 3/10 - Train mAP: 0.0289
=> Saving checkpoint


100%|██████████| 63/63 [01:43<00:00,  1.64s/it, loss=89.2]

Mean loss for this epoch: 176.1152





Epoch 4/10 - Train mAP: 0.0759
=> Saving checkpoint


100%|██████████| 63/63 [01:44<00:00,  1.66s/it, loss=71.2]

Mean loss for this epoch: 156.5019





Epoch 5/10 - Train mAP: 0.1279
=> Saving checkpoint


100%|██████████| 63/63 [01:43<00:00,  1.65s/it, loss=85.7]

Mean loss for this epoch: 144.6685





Epoch 6/10 - Train mAP: 0.1874
=> Saving checkpoint


100%|██████████| 63/63 [01:39<00:00,  1.59s/it, loss=76.3]

Mean loss for this epoch: 133.3275





Epoch 7/10 - Train mAP: 0.2325
=> Saving checkpoint


100%|██████████| 63/63 [01:45<00:00,  1.67s/it, loss=71.1]

Mean loss for this epoch: 122.8571





Epoch 8/10 - Train mAP: 0.3880
=> Saving checkpoint


100%|██████████| 63/63 [01:41<00:00,  1.62s/it, loss=94.2]

Mean loss for this epoch: 115.4267





Epoch 9/10 - Train mAP: 0.4722
=> Saving checkpoint


100%|██████████| 63/63 [01:41<00:00,  1.61s/it, loss=85.7]

Mean loss for this epoch: 110.0101





Epoch 10/10 - Train mAP: 0.4974
=> Saving checkpoint
