# INFO8010: A Yolo v3 object detector implementation

The weights can be found here [git@github.com:SkYF4Il/DP_YOLO_V3.git](https://github.com/SkYF4Il/DP_YOLO_V3.git)

# Step 1 : Import the necessary libraries and define the global variables needed in the notebook

In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from torchvision.transforms import ToPILImage
from torch.utils.data import DataLoader, Dataset
import random
import pickle as pkl
import os
import pandas as pd
import numpy as np
from PIL import Image
import torch
import torchvision
from torchvision import transforms
from sklearn.model_selection import train_test_split
import torch
from torch.nn.utils.rnn import pad_sequence
import torch.nn as nn
from rich.progress import Progress, BarColumn, TextColumn, TimeRemainingColumn, TimeElapsedColumn
from rich.console import Console
import time
import torch.optim as optim
import torch.backends.cudnn as cudnn
import warnings
from collections import defaultdict
import gc

## Useful global variables

In [None]:
# Select the compute device: CUDA when PyTorch reports it as available, CPU otherwise.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

# Suppress every warning for the remainder of the session.
warnings.filterwarnings("ignore")

# Turn on cuDNN benchmark mode so cuDNN may auto-tune its kernels for the
# hardware and model in use; this can speed up training.
torch.backends.cudnn.benchmark = True

# Side length, in pixels, of the square image fed to the model.
image_size = 416

# Number of training steps.
num_steps = 30

# Grid resolution of each detection scale: the image side divided by the
# strides 32, 16 and 8 (13, 26 and 52 cells for a 416-pixel image).
grid_size = [image_size // stride for stride in (32, 16, 8)]

# Anchor boxes (width, height) in pixels, as published in the YOLOv3 paper.
# One row per detection scale, ordered like `grid_size`.
anchors = [
    [(116, 90), (156, 198), (373, 326)],  # large scale
    [(30, 61), (62, 45), (59, 119)],      # medium scale
    [(10, 13), (16, 30), (33, 23)]        # small scale
]

# The same anchors expressed as fractions of the image side.
normalized_anchors = [
    [tuple(side / image_size for side in pair) for pair in scale]
    for scale in anchors
]

# Tensor copy of the normalized anchors, placed on the selected device.
normalized_anchors2 = torch.tensor(normalized_anchors, device=device)

# Anchors expressed in grid-cell units of their own scale: every normalized
# anchor is divided by the reciprocal of that scale's grid resolution.
scaled_anchors = normalized_anchors2 / (
    1 / torch.tensor(grid_size).reshape(-1, 1, 1).expand(-1, 3, 2)
).to(device)

# Number of images per batch.
batch_size = 32

# Optimizer step size.
learning_rate = 1e-4

# Weight-decay coefficient handed to the optimizer (regularization).
weight_decay = 1e-4

# Number of worker subprocesses used for data loading.
num_workers = 4

# Whether data loaders should pin host memory (can speed up host-to-GPU copies).
pin_memory = True

# Whether a snapshot of the model is written after training.
save_model = True

# Whether a previously trained model is loaded.
load_model = False

# Whether training resumes from a snapshot (True) or restarts from scratch (False).
resume_training = True

# File name used for the model snapshot.
snapshot_file = "snapshot2_VOC.pth.tar"

# IoU threshold used when evaluating detections (overlap between a predicted
# box and a ground-truth box).
iou_threshold = 0.5

# Minimum confidence a detection needs in order to be kept.
conf_threshold = 0.7

# IoU threshold above which a predicted box is left out of the training loss:
# once it overlaps some ground-truth box by more than this value it is treated
# as a good enough prediction. According to the authors the value comes from
# the YOLOv3 cfg file published on the official site.
ignore_thresh = 0.7

# Root directory of the dataset in use. Swap which of the two lines below is
# commented out to switch between PASCAL VOC and COCO.
root_dir = '/kaggle/input/pascal-voc/PASCAL_VOC' # To use PASCAL_VOC dataset
#root_dir = '/kaggle/input/coco-dataset/COCO'    # To use COCO dataset

# Pick the class-names file that matches the chosen dataset.
# Any other root is rejected right here: without the final branch `names_path`
# would simply stay undefined and only fail later, far from the real cause.
if root_dir == '/kaggle/input/coco-dataset/COCO':
    names_path = '/kaggle/input/data-dataset/data/coco.names'
elif root_dir == '/kaggle/input/pascal-voc/PASCAL_VOC':
    names_path = '/kaggle/input/data-dataset/data/voc.names'
else:
    raise ValueError(
        f"Unsupported dataset root {root_dir!r}: no class-names file is configured for it"
    )

# CSV files listing the samples of each split.
train_csv_path = "train.csv"
val_csv_path = "val.csv"
test_csv_path = "test.csv"

# Load the color palette used to draw the boxes around detected objects.
# The file is opened in a `with` block so the handle is closed as soon as the
# palette has been read.
# NOTE: unpickling can execute arbitrary code, so only point this at a trusted file.
with open("/kaggle/input/data-dataset/data/pallete", "rb") as palette_file:
    colors = pkl.load(palette_file)

# Step 2 : Define some utility functions

In [None]:
# Intersection over Union (IoU) for boxes given as (center_x, center_y, width, height)
def compute_iou_boxes(box1, box2):
    """Return the IoU between two sets of center-format boxes.

    The last axis of each argument is read as center x, center y, width and
    height (entries 0 to 3). Slices are used instead of integer indices, so
    the result keeps a trailing axis of size 1. The two inputs are combined
    with ordinary tensor broadcasting.

    Args:
        box1: Tensor whose last axis holds at least (cx, cy, w, h).
        box2: Tensor whose last axis holds at least (cx, cy, w, h).

    Returns:
        Tensor of IoU scores with a trailing axis of size 1.
    """

    def _to_corners(box):
        # Convert center/size form into (left, top, right, bottom) edges.
        cx, cy = box[..., 0:1], box[..., 1:2]
        w, h = box[..., 2:3], box[..., 3:4]
        return cx - w / 2, cy - h / 2, cx + w / 2, cy + h / 2

    left1, top1, right1, bottom1 = _to_corners(box1)
    left2, top2, right2, bottom2 = _to_corners(box2)

    # Extent of the overlap on each axis; negative extents (no overlap) become 0.
    overlap_w = (torch.min(right1, right2) - torch.max(left1, left2)).clamp(0)
    overlap_h = (torch.min(bottom1, bottom2) - torch.max(top1, top2)).clamp(0)
    intersection = overlap_w * overlap_h

    area1 = abs((right1 - left1) * (bottom1 - top1))
    area2 = abs((right2 - left2) * (bottom2 - top2))

    # The small constant keeps the division defined when the union is zero.
    return intersection / (area1 + area2 - intersection + 1e-6)

In [None]:
# Intersection over Union (IoU) for boxes described only by (width, height)
def compute_iou_sizes(box1, box2):
    """Return the IoU of boxes that are compared by size alone.

    Entry 0 of the last axis is read as the width and entry 1 as the height.
    The intersection is the product of the smaller width and the smaller
    height, i.e. the boxes are treated as if they shared a corner. Inputs are
    combined with ordinary tensor broadcasting.

    Args:
        box1: Tensor whose last axis holds (width, height).
        box2: Tensor whose last axis holds (width, height).

    Returns:
        Tensor of IoU scores (the last axis of the inputs is consumed).
    """
    # Tiny guard for the division: two degenerate (zero-area) boxes used to
    # produce 0 / 0 = NaN. The value is small enough to leave float32 IoUs of
    # regular boxes unchanged.
    eps = 1e-16

    inter_area = torch.min(box1[..., 0], box2[..., 0]) * torch.min(box1[..., 1], box2[..., 1])

    box1_area = box1[..., 0] * box1[..., 1]
    box2_area = box2[..., 0] * box2[..., 1]
    union_area = box1_area + box2_area - inter_area

    return inter_area / (union_area + eps)

In [None]:
# NB: this hand-written NMS is rarely used; it is kept in the notebook because
# it is our own working implementation. It relies on Python loops instead of
# tensor operations, so it is much slower than the torchvision-based version
# defined next.

# Non-maximum suppression (NMS) over a list of bounding boxes
def apply_nms_manual(bboxes, iou_thresh, conf_thresh):
    """Filter boxes by confidence, then suppress overlapping boxes per class.

    Each box is a sequence read as [class, confidence, box coordinates...],
    where the coordinates (entries 2 onward) are handed to
    `compute_iou_boxes`.

    Args:
        bboxes: Iterable of boxes.
        iou_thresh: A box of the same class as an already kept box survives
            only if their IoU is below this value.
        conf_thresh: Boxes whose confidence is not above this value are dropped.

    Returns:
        The kept boxes, ordered by decreasing confidence.
    """
    # Confident boxes only, best first.
    candidates = sorted(
        (box for box in bboxes if box[1] > conf_thresh),
        key=lambda box: box[1],
        reverse=True,
    )

    kept = []
    while candidates:
        best, rest = candidates[0], candidates[1:]
        kept.append(best)

        survivors = []
        for other in rest:
            # Boxes of another class are never suppressed by `best`.
            if other[0] != best[0]:
                survivors.append(other)
                continue
            overlap = compute_iou_boxes(torch.tensor(best[2:]), torch.tensor(other[2:]))
            if overlap < iou_thresh:
                survivors.append(other)
        candidates = survivors

    return kept

In [None]:
# torchvision-based NMS: not hand-made, but much faster than apply_nms_manual
def apply_nms(bboxes, iou_thresh, conf_thresh):
    """Filter boxes by confidence and run per-class NMS with torchvision.

    Each box is a sequence laid out as
    [class, confidence, x_center, y_center, width, height], the layout
    produced by `pred_to_bboxes`.

    Changes compared with the previous version:
      * torchvision's NMS expects (x1, y1, x2, y2) corners, so the
        center/size boxes are converted before the call instead of being
        passed as-is (which made the IoU meaningless).
      * `batched_nms` is used so that boxes only suppress boxes of the same
        class, matching `apply_nms_manual`.
      * Low-confidence boxes are removed before NMS. The result is the same
        as filtering afterwards (a box can only be suppressed by a
        higher-scoring one) but far fewer boxes are processed.
      * An empty input, or one with no confident box, returns [] instead of
        failing inside torchvision.

    Args:
        bboxes: List of boxes in the layout described above.
        iou_thresh: IoU threshold passed to torchvision's NMS.
        conf_thresh: Boxes whose confidence is not above this value are dropped.

    Returns:
        The kept boxes (the original list entries), ordered by decreasing
        confidence.
    """
    candidates = [box for box in bboxes if box[1] > conf_thresh]
    if not candidates:
        return []

    data = torch.tensor(candidates, dtype=torch.float32).to(device)
    class_ids = data[:, 0].long()
    scores = data[:, 1]

    # (x_center, y_center, w, h) -> (x1, y1, x2, y2)
    half_sizes = data[:, 4:6] / 2
    corners = torch.cat([data[:, 2:4] - half_sizes, data[:, 2:4] + half_sizes], dim=1)

    keep = torchvision.ops.batched_nms(corners, scores, class_ids, iou_thresh)
    return [candidates[i] for i in keep.tolist()]

In [None]:
def transform_boxes(tensor):
    """Drop column 1 of a 2-D box tensor and add a leading batch axis.

    Column 0 (the class label) is kept, column 1 is discarded and columns 2
    onward (the box coordinates) are kept, so an (N, C) input becomes a
    (1, N, C - 1) output.

    An input without any element short-circuits to an empty tensor of shape
    (0, 5); note that this result has no leading batch axis.
    """
    if tensor.numel() == 0:
        return torch.empty((0, 5))

    # Keep the label as a column so it can sit next to the coordinates.
    label_column = tensor[:, :1]
    box_columns = tensor[:, 2:]

    # Label first, then coordinates, wrapped in a batch axis of size 1.
    return torch.cat([label_column, box_columns], dim=1).unsqueeze(0)

In [None]:
def load_classes(namesfile):
    """Read a class-names file and return its lines as a list.

    Surrounding whitespace of the whole file is stripped before it is split
    on newline characters.
    """
    with open(namesfile, "r") as handle:
        content = handle.read()
    return content.strip().split("\n")

In [None]:
def display_image_with_boxes(data, class_labels):
    """Show the first image of a batch with its bounding boxes (Matplotlib).

    Args:
        data: Mapping with the keys 'image' and 'bboxes'; only entry 0 of
            each is drawn. Every box is read as
            [class_index, x_center, y_center, width, height], and the
            coordinates are multiplied by the image width/height before
            drawing.
        class_labels: Sequence of class names indexed by class_index.

    Returns:
        None. A message is printed and nothing is drawn when `data` is
        empty, lacks a key, or holds no image or no box.
    """
    # Guard: the container must be non-empty and expose both keys.
    if not (data and 'image' in data and 'bboxes' in data):
        print("data does not contain right format")
        return

    # Guard: both collections need at least one entry.
    if len(data['image']) == 0 or len(data['bboxes']) == 0:
        print("There is no image or box to display")
        return

    first_image = data['image'][0]
    first_boxes = data['bboxes'][0]

    # Guard: nothing to draw for an empty image tensor or an empty box list.
    if first_image.numel() == 0 or len(first_boxes) == 0:
        return

    # Tensor -> PIL image -> NumPy array, which is what imshow consumes.
    pixels = np.array(ToPILImage()(first_image))

    fig, ax = plt.subplots(1)
    ax.imshow(pixels)
    img_height, img_width, _ = pixels.shape

    # One random RGB color per class, drawn anew on every call.
    palette = [
        tuple(random.random() for _ in range(3))
        for _ in range(len(class_labels))
    ]

    padding = 3  # offset of the label text from the box corner
    for bbox in first_boxes:
        class_index = int(bbox[0])
        class_name = class_labels[class_index]
        color = palette[class_index]

        # Center/size form -> top-left corner plus pixel size.
        x_center, y_center, w, h = bbox[1:]
        box_w = w * img_width
        box_h = h * img_height
        x = x_center * img_width - box_w / 2
        y = y_center * img_height - box_h / 2

        ax.add_patch(
            patches.Rectangle(
                (x, y), box_w, box_h, linewidth=2, edgecolor=color, facecolor='none'
            )
        )

        # Font size grows with the box height but never drops below 8.
        font_size = np.clip(box_h / 25, 8, None)

        # Keep the label inside the picture when the box starts above it.
        x_text = x + padding
        y_text = y + padding
        if not y_text > 0:
            y_text = 0

        ax.text(
            x_text, y_text, class_name, color='white', fontsize=font_size,
            va='top', ha='left',
            bbox=dict(facecolor=color, edgecolor=color, boxstyle='round,pad=0.1')
        )

    plt.axis('off')
    plt.show()

In [None]:
# Converts grid-encoded labels into a flat list of boxes per image.
def grid_to_bboxes(output, anchors, grid_dim):
    """Turn a grid tensor into boxes relative to the whole image.

    The last axis of `output` is read as
    [confidence, x, y, w, h, class]: x and y are offsets inside a grid cell,
    w and h are expressed in grid cells. The function adds the column/row
    index of each cell to x/y and divides everything by `grid_dim`.

    The cell offsets are built with `torch.arange` and broadcasting. This
    avoids calling `torch.meshgrid` without its `indexing` argument (a
    deprecated form) and avoids materialising the repeated grids.

    Args:
        output: Tensor of shape (batch, anchors, grid_dim, grid_dim, >= 6).
        anchors: Anchors of this scale; only their count is used.
        grid_dim: Number of cells along each side of the grid.

    Returns:
        Nested list of shape (batch, anchors * grid_dim * grid_dim, 6) whose
        rows are [class, confidence, x, y, w, h].
    """
    batch_count = output.size(0)
    num_anchors = len(anchors)

    confidence_scores = output[..., 0:1]
    bbox_attributes = output[..., 1:5]
    class_predictions = output[..., 5:6]

    # Column index varies along the last spatial axis, row index along the
    # one before it; broadcasting expands them over batch and anchors.
    cell_index = torch.arange(grid_dim, device=output.device)
    col_offset = cell_index.view(1, 1, 1, grid_dim, 1)
    row_offset = cell_index.view(1, 1, grid_dim, 1, 1)

    x_adj = (bbox_attributes[..., 0:1] + col_offset) / grid_dim
    y_adj = (bbox_attributes[..., 1:2] + row_offset) / grid_dim
    box_wh = bbox_attributes[..., 2:4] / grid_dim

    final_bboxes = torch.cat(
        [class_predictions, confidence_scores, x_adj, y_adj, box_wh], dim=-1
    )
    final_bboxes = final_bboxes.reshape(batch_count, num_anchors * grid_dim * grid_dim, 6)

    return final_bboxes.tolist()

In [None]:
# Converts raw model predictions of one scale into a flat list of boxes per image.
def pred_to_bboxes(output, anchors, grid_dim):
    """Decode raw predictions into boxes relative to the whole image.

    The last axis of `output` is read as
    [objectness, x, y, w, h, class scores...]:
      * x and y go through a sigmoid, giving offsets in [0, 1] inside a cell;
      * w and h go through an exponential and are multiplied by the anchors;
      * objectness goes through a sigmoid;
      * the class is the argmax over the class scores.
    The cell column/row index is then added to x/y and everything is divided
    by `grid_dim`.

    `output` is left untouched. The previous version wrote the sigmoid/exp
    results through a view of `output`, silently altering the caller's
    predictions, so that a second call decoded already-decoded values. The
    cell offsets are built with `torch.arange` and broadcasting rather than
    `torch.meshgrid` without its `indexing` argument (a deprecated form).

    Args:
        output: Tensor of shape (batch, anchors, grid_dim, grid_dim, 5 + classes).
        anchors: Tensor of shape (anchors, 2), in the same unit as w and h
            (the caller in this file passes anchors multiplied by the grid size).
        grid_dim: Number of cells along each side of the grid.

    Returns:
        Nested list of shape (batch, anchors * grid_dim * grid_dim, 6) whose
        rows are [class, confidence, x, y, w, h].
    """
    batch_count = output.size(0)
    num_anchors = len(anchors)
    anchors = anchors.reshape(1, num_anchors, 1, 1, 2)

    # All decoding happens on fresh tensors, never in place on `output`.
    box_xy = torch.sigmoid(output[..., 1:3])
    box_wh = torch.exp(output[..., 3:5]) * anchors
    confidence_scores = torch.sigmoid(output[..., 0:1])
    class_predictions = torch.argmax(output[..., 5:], dim=-1, keepdim=True)

    # Column index varies along the last spatial axis, row index along the
    # one before it; broadcasting expands them over batch and anchors.
    cell_index = torch.arange(grid_dim, device=output.device)
    col_offset = cell_index.view(1, 1, 1, grid_dim, 1)
    row_offset = cell_index.view(1, 1, grid_dim, 1, 1)

    x_adj = (box_xy[..., 0:1] + col_offset) / grid_dim
    y_adj = (box_xy[..., 1:2] + row_offset) / grid_dim
    box_wh = box_wh / grid_dim

    # The class index is cast to the floating dtype of the other columns so
    # the concatenation does not depend on implicit type promotion.
    final_bboxes = torch.cat(
        [class_predictions.to(confidence_scores.dtype), confidence_scores, x_adj, y_adj, box_wh],
        dim=-1,
    )
    final_bboxes = final_bboxes.reshape(batch_count, num_anchors * grid_dim * grid_dim, 6)

    return final_bboxes.tolist()


In [None]:
def calculate_accuracy(predictions, targets, mask):
    """
    Count correct class predictions among the entries selected by `mask`.

    The predicted class is the argmax over the last axis of the selected
    predictions. Returns (number correct, number selected) as detached tensors.
    """
    predicted_classes = predictions[mask].argmax(dim=-1)
    hits = (predicted_classes == targets[mask]).sum()
    selected = mask.sum()
    return hits.detach(), selected.detach()

def calculate_objectness_accuracy(predictions, targets, mask, threshold):
    """
    Count correct objectness decisions among the entries selected by `mask`.

    A prediction counts as "object" when its sigmoid exceeds `threshold`;
    that decision is compared with the target value. Returns
    (number correct, number selected) as detached tensors.
    """
    is_object = torch.sigmoid(predictions) > threshold
    matches = is_object[mask] == targets[mask]
    return matches.sum().detach(), mask.sum().detach()

def calculate_coord_accuracy(predictions, targets, mask, iou_threshold):
    """
    Count the selected boxes whose IoU with their target exceeds a threshold.

    The first four entries of the last axis of the masked predictions and
    targets are passed to `compute_iou_boxes`. Returns
    (number above `iou_threshold`, number selected) as detached tensors.
    """
    selected_predictions = predictions[mask][..., :4]
    selected_targets = targets[mask][..., :4]
    overlaps = compute_iou_boxes(selected_predictions, selected_targets)

    good = (overlaps > iou_threshold).sum()
    selected = mask.sum()
    return good.detach(), selected.detach()

def evaluate_model_accuracy(model, data_loader, threshold, iou_threshold=0.5):
    """
    Print class, no-object, object and coordinate accuracy over a data loader.

    The model is switched to evaluation mode, every batch is run without
    gradients, and the counters are accumulated over all batches and over the
    three output scales before the percentages are printed. The model is put
    back in training mode at the end.

    Args:
        model: Called on a batch of images; must return one tensor per scale.
        data_loader: Iterable of dicts with the keys 'image' and 'outputs'.
            The entries of 'outputs' are moved to `device` in place.
        threshold: Objectness threshold applied to the sigmoid of the prediction.
        iou_threshold: IoU a box must exceed to count as correctly located.

    Returns:
        None; the four percentages are printed.
    """
    model.eval()

    # One "correct"/"total" counter pair per metric.
    accuracy_metrics = {}
    for kind in ("class", "object", "noobject", "coord"):
        accuracy_metrics[f"correct_{kind}_predictions"] = 0
        accuracy_metrics[f"total_{kind}_predictions"] = 0

    def _accumulate(kind, counts):
        # Add a (correct, total) pair to the counters of the given metric.
        correct_preds, total_preds = counts
        accuracy_metrics[f"correct_{kind}_predictions"] += correct_preds
        accuracy_metrics[f"total_{kind}_predictions"] += total_preds

    def _percentage(kind):
        # The tiny constant keeps the ratio defined when nothing was counted.
        correct = accuracy_metrics[f"correct_{kind}_predictions"]
        total = accuracy_metrics[f"total_{kind}_predictions"]
        return (correct / (total + 1e-16)) * 100

    for batch in data_loader:
        images = batch['image']
        labels = batch['outputs']
        images = images.to(device)

        with torch.no_grad():
            predictions = model(images)

            # One pass per output scale.
            for scale_index in range(3):
                labels[scale_index] = labels[scale_index].to(device)
                scale_predictions = predictions[scale_index]
                scale_labels = labels[scale_index]

                # Entry 0 of the labels flags cells with (1) or without (0) an object.
                object_mask = scale_labels[..., 0] == 1
                noobject_mask = scale_labels[..., 0] == 0

                # Class accuracy on cells holding an object.
                _accumulate("class", calculate_accuracy(
                    scale_predictions[..., 5:], scale_labels[..., 5], object_mask
                ))

                # Objectness accuracy on cells holding an object.
                _accumulate("object", calculate_objectness_accuracy(
                    scale_predictions[..., 0], scale_labels[..., 0], object_mask, threshold
                ))

                # Objectness accuracy on cells without an object.
                _accumulate("noobject", calculate_objectness_accuracy(
                    scale_predictions[..., 0], scale_labels[..., 0], noobject_mask, threshold
                ))

                # Box accuracy (IoU above the threshold) on cells holding an object.
                # NOTE(review): entries 1:5 of the model output are compared with
                # the labels as they are; confirm both use the same box encoding.
                _accumulate("coord", calculate_coord_accuracy(
                    scale_predictions[..., 1:5], scale_labels[..., 1:5], object_mask, iou_threshold
                ))

    # Percentages over everything the loader produced.
    class_accuracy = _percentage("class")
    noobject_accuracy = _percentage("noobject")
    object_accuracy = _percentage("object")
    coord_accuracy = _percentage("coord")

    print(f"Class accuracy: {class_accuracy:.2f}%")
    print(f"No-object accuracy: {noobject_accuracy:.2f}%")
    print(f"Object accuracy: {object_accuracy:.2f}%")
    print(f"Coordinates accuracy (IoU > {iou_threshold}): {coord_accuracy:.2f}%")

    # Back to training mode once the evaluation is over.
    model.train()

In [None]:
def save_snapshot(model, optimizer, filename, step=None, scheduler=None, extra_info=None):
    """
    Write the model and optimizer states, plus optional extras, to a file.

    Args:
        model (nn.Module): Model whose `state_dict()` is stored under "state_dict".
        optimizer (torch.optim.Optimizer): Optimizer whose `state_dict()` is
            stored under "optimizer_state_dict".
        filename (str): Destination passed to `torch.save`.
        step (int, optional): Stored under "step" when not None.
        scheduler (torch.optim.lr_scheduler, optional): Its `state_dict()` is
            stored under "scheduler_state_dict" when not None.
        extra_info (dict, optional): Stored under "extra_info" when not None.

    Returns:
        None
    """
    print("=> Saving model snapshot")

    # Mandatory entries first.
    snapshot = dict(
        state_dict=model.state_dict(),
        optimizer_state_dict=optimizer.state_dict(),
    )

    # Optional entries are only written when the caller supplied them.
    if step is not None:
        snapshot.update(step=step)
    if scheduler is not None:
        snapshot.update(scheduler_state_dict=scheduler.state_dict())
    if extra_info is not None:
        snapshot.update(extra_info=extra_info)

    torch.save(snapshot, filename)

In [None]:
def load_snapshot(snapshot_file, model, optimizer=None, scheduler=None, lr=None, device=torch.device('cpu')):
    """
    Restore model, optimizer and scheduler states from a snapshot file.

    Args:
        snapshot_file (str): Path handed to `torch.load`.
        model (nn.Module): Receives the "state_dict" entry and is moved to `device`.
        optimizer (torch.optim.Optimizer, optional): Receives
            "optimizer_state_dict" when given and when the snapshot has that entry.
        scheduler (torch.optim.lr_scheduler, optional): Receives
            "scheduler_state_dict" when given and when the snapshot has that entry.
        lr (float, optional): When not None, written into every parameter
            group of the optimizer after its state was restored. It has no
            effect if the optimizer state was not restored.
        device (torch.device, optional): Target of `map_location` and of the
            model. Defaults to the CPU.

    Returns:
        tuple: (step, extra_info) as stored in the snapshot; each is None
        when the snapshot has no such entry.
    """
    print(f"=> Loading model snapshot to {device}")
    snapshot = torch.load(snapshot_file, map_location=device)

    # Model weights are always restored.
    model.load_state_dict(snapshot["state_dict"])
    model.to(device)

    # Optimizer: restore the state, then optionally override the learning rate.
    restore_optimizer = optimizer is not None and "optimizer_state_dict" in snapshot
    if restore_optimizer:
        optimizer.load_state_dict(snapshot["optimizer_state_dict"])
        if lr is not None:
            for group in optimizer.param_groups:
                group["lr"] = lr

    # Scheduler: restored only when both sides provide it.
    restore_scheduler = scheduler is not None and "scheduler_state_dict" in snapshot
    if restore_scheduler:
        scheduler.load_state_dict(snapshot["scheduler_state_dict"])

    return snapshot.get("step"), snapshot.get("extra_info")

In [None]:
def extract_bboxes_from_predictions(predictions, anchors, num_scales, device):
    """
    Collect, per image, the boxes decoded from every prediction scale.

    Args:
        predictions (list): One 5-D tensor per scale; axis 0 is the batch and
            axis 2 is taken as the grid size of that scale.
        anchors (list): Per-scale anchors; they are multiplied by the grid
            size of their scale before decoding.
        num_scales (int): Number of scales to process.
        device (str): Device on which the anchor tensor is placed.

    Returns:
        list: One list per image holding the boxes of all scales.
    """
    image_count = predictions[0].shape[0]
    per_image = [[] for _ in range(image_count)]

    for scale_idx in range(num_scales):
        scale_predictions = predictions[scale_idx]
        _, _, cells, _, _ = scale_predictions.shape

        # Anchors expressed in grid cells of this scale.
        scale_anchors = torch.tensor(list(anchors[scale_idx])).to(device) * cells

        # The whole batch is decoded in one call.
        decoded = pred_to_bboxes(scale_predictions, scale_anchors, cells)
        for position, image_boxes in enumerate(decoded):
            per_image[position].extend(image_boxes)

    return per_image

def extract_ground_truth_bboxes(labels, anchors, scale_idx, grid_size, device):
    """
    Convert the labels of one scale into ground-truth boxes.

    Args:
        labels (list of Tensors): Labels from the DataLoader. The entry at
            `scale_idx` is replaced in place by its copy on `device`.
        anchors (list): Per-scale anchors; the entry at `scale_idx` is used.
        scale_idx (int): Index of the scale to process.
        grid_size (int): Grid size of that scale.
        device (str): Device on which the computation runs.

    Returns:
        list: Ground-truth boxes for each image, as produced by `grid_to_bboxes`.
    """
    # The labels of the requested scale are moved to the device and written
    # back into the caller's list.
    scale_labels = labels[scale_idx].to(device)
    labels[scale_idx] = scale_labels

    # Anchors of that scale, on the same device.
    scale_anchors = torch.tensor(anchors[scale_idx]).to(device)

    return grid_to_bboxes(scale_labels, scale_anchors, grid_size)

def gather_evaluation_bboxes(loader, model, iou_threshold, conf_threshold, anchors, device):
    """
    Run the model over a loader and collect predictions and ground truths.

    The model is switched to evaluation mode for the pass and is always put
    back in training mode before returning. Ground truths are decoded from
    the label grid at scale index 2 only.

    Args:
        loader (DataLoader): Yields batches with an 'image' tensor and a list
            of per-scale label tensors under 'outputs'.
        model (nn.Module): The object detection model.
        iou_threshold (float): IoU threshold passed to NMS.
        conf_threshold (float): Confidence threshold passed to NMS and used to
            keep ground truth entries.
        anchors (list): Anchor boxes, indexed by scale.
        device (str): Device on which to perform computations.

    Returns:
        tuple: (predicted_bboxes, ground_truth_bboxes). Every entry is a box
        list prefixed with a running image index that counts across batches.
    """
    model.eval()
    num_scales = len(anchors)
    predicted_bboxes, ground_truth_bboxes = [], []
    running_image_id = 0  # unique index of the current image across all batches

    for batch in loader:
        images = batch['image'].to(device)
        labels = batch['outputs']

        with torch.no_grad():
            predictions = model(images)

            # Decode the whole batch once per source (predictions / labels).
            batch_bboxes = extract_bboxes_from_predictions(predictions, anchors, num_scales, device)
            gt_bboxes = extract_ground_truth_bboxes(
                labels,
                anchors,
                scale_idx=2,
                grid_size=predictions[2].shape[2],
                device=device,
            )

            for position in range(len(images)):
                # Predictions go through NMS before being recorded.
                kept = apply_nms(batch_bboxes[position], iou_threshold, conf_threshold)
                predicted_bboxes += [[running_image_id] + bbox for bbox in kept]

                # Only label entries whose confidence exceeds the threshold are real objects.
                ground_truth_bboxes += [
                    [running_image_id] + gt_bbox
                    for gt_bbox in gt_bboxes[position]
                    if gt_bbox[1] > conf_threshold
                ]

                running_image_id += 1

    model.train()
    return predicted_bboxes, ground_truth_bboxes

In [None]:
def map_evaluation(predicted_boxes, actual_boxes, num_classes, iou_threshold=0.5):
    """
    Compute the mean Average Precision (mAP) of a set of detections.

    Parameters:
    - predicted_boxes: List of predicted boxes, each laid out as
      [image_index, class_index, confidence_score, *box_coordinates].
    - actual_boxes: List of ground truth boxes with the same layout,
      [image_index, class_index, confidence, *box_coordinates]. The third
      entry is ignored; only entries from index 3 on are used as the box.
      The list and its entries are not modified.
    - num_classes: Total number of object classes; classes 0..num_classes-1
      are evaluated.
    - iou_threshold: IoU a prediction must exceed with a ground truth of the
      same image and class to count as a true positive (default 0.5).

    Returns:
    - The mean of the per-class average precisions, taken over the classes
      that have at least one ground truth (a tensor), or 0 when no class has
      any ground truth.
    """
    # Small constant keeping the precision/recall divisions away from zero.
    epsilon = 1e-6

    # Group predictions and ground truths by class index.
    class_wise_predictions = defaultdict(list)
    for pred in predicted_boxes:
        class_wise_predictions[pred[1]].append(pred)

    class_wise_ground_truths = defaultdict(list)
    for gt in actual_boxes:
        class_wise_ground_truths[gt[1]].append(gt)

    class_avg_precisions = []

    for class_idx in range(num_classes):
        ground_truths = class_wise_ground_truths.get(class_idx, [])

        # A class without ground truths does not take part in the mean.
        if not ground_truths:
            continue

        predictions = class_wise_predictions.get(class_idx, [])

        # Ground truths of this class, grouped by image.
        gt_by_image = defaultdict(list)
        for gt in ground_truths:
            gt_by_image[gt[0]].append(gt)

        # (image_index, position within gt_by_image[image_index]) of every
        # ground truth already claimed by a prediction. Matches are tracked
        # here rather than in the ground truth entries: entry [2] holds the
        # ground truth confidence, so using it as a flag would mark every
        # ground truth as already matched and would alter the caller's data.
        matched = set()

        # 1.0 for a true positive, 0.0 for a false positive, in descending
        # confidence order.
        tp_flags = []

        for pred in sorted(predictions, key=lambda p: p[2], reverse=True):
            img_idx = pred[0]
            candidates = gt_by_image.get(img_idx, [])

            best_iou = 0
            best_gt_idx = -1

            if candidates:
                pred_box = torch.tensor(pred[3:])  # built once per prediction
                for gt_idx, gt in enumerate(candidates):
                    iou = compute_iou_boxes(pred_box, torch.tensor(gt[3:]))
                    if iou > best_iou:
                        best_iou = iou
                        best_gt_idx = gt_idx

            # True positive only when the overlap is sufficient AND the best
            # ground truth has not been claimed by a more confident prediction.
            is_true_positive = (
                bool(best_iou > iou_threshold)
                and (img_idx, best_gt_idx) not in matched
            )
            if is_true_positive:
                matched.add((img_idx, best_gt_idx))
            tp_flags.append(1.0 if is_true_positive else 0.0)

        true_positives = torch.tensor(tp_flags, dtype=torch.float32)
        false_positives = 1.0 - true_positives

        # Running counts along the confidence-sorted predictions.
        tp_cumsum = torch.cumsum(true_positives, dim=0)
        fp_cumsum = torch.cumsum(false_positives, dim=0)

        recalls = tp_cumsum / (len(ground_truths) + epsilon)
        precisions = tp_cumsum / (tp_cumsum + fp_cumsum + epsilon)

        # Start the curve at (recall=0, precision=1). Float literals keep the
        # concatenation free of mixed dtypes.
        precisions = torch.cat((torch.tensor([1.0]), precisions))
        recalls = torch.cat((torch.tensor([0.0]), recalls))

        # Average precision = area under the precision-recall curve.
        class_avg_precisions.append(torch.trapz(precisions, recalls))

    return sum(class_avg_precisions) / len(class_avg_precisions) if class_avg_precisions else 0

In [None]:
def validate_func(val_loader, loss_func, model, scaled_anchors):
    """
    Compute the mean validation loss of the model over a validation loader.

    The model is switched to evaluation mode and is left in that mode on
    return; callers must call ``model.train()`` before resuming training.
    Tensors are moved to the module-level ``device``.

    Parameters:
    - val_loader: Iterable of batches, each a dict with an 'image' tensor and
      a list of per-scale target tensors under 'outputs'.
    - loss_func: Callable invoked as ``loss_func(output, anchor, target)`` for
      every scale; the per-scale results are summed into the batch loss.
    - model: The YOLOv3 model being evaluated; it returns one output per scale.
    - scaled_anchors: Anchors for each scale, in the same order as the model
      outputs and the targets.

    Returns:
    - mean_val_loss: The average of the batch losses, or NaN when the loader
      yields no batch.
    """
    model.eval()
    val_loss_list = []  # one summed loss value per batch

    # Gradients are not needed for validation.
    with torch.no_grad():
        for batch in val_loader:
            img = batch['image'].to(device)
            # Any number of scales is accepted; targets are paired with the
            # model outputs and anchors by position.
            targets = [target.to(device) for target in batch['outputs']]

            model_outputs = model(img)

            total_loss = 0
            for output, target, anchor in zip(model_outputs, targets, scaled_anchors):
                total_loss += loss_func(output, anchor, target)

            val_loss_list.append(total_loss.item())

    # An empty loader has no meaningful mean; report NaN instead of dividing by zero.
    if not val_loss_list:
        return float("nan")

    mean_val_loss = sum(val_loss_list) / len(val_loss_list)
    return mean_val_loss

# Step 3: Data Loader

This section defines our custom dataset loader using the `torch.utils.data.Dataset` class. It handles the loading of images and their corresponding labels from the dataset. The labels are stored in `.txt` files, where each file has the same name as its associated image. Each line in a label file contains the class index (or class label) and the bounding box coordinates for objects in the image. These bounding box coordinates are normalized between 0 and 1. 

### Data Transformation

The images are first padded to maintain their aspect ratio and then resized to 416x416 pixels, which is the input size expected by the YOLO model. The bounding box coordinates are adjusted accordingly to match the resized image dimensions. During training, data augmentation techniques such as random color jittering are applied to enhance model robustness. For validation or testing, only resizing and normalization are performed to maintain consistency.

In [None]:
class YOLODataset(Dataset):
    """
    Dataset yielding square-resized images together with YOLO target grids.

    ``csv_file`` (read relative to ``root``) holds one sample per row: the
    image file name in column 0 (looked up in ``root/images``) and the label
    file name in column 1 (looked up in ``root/labels``). Every row of a label
    file is read as ``class x_center y_center width height``, separated by
    single spaces, with coordinates normalized to the original image.

    Each item is a dict with:
    - 'image': the padded, resized and transformed image tensor,
    - 'bboxes': float32 tensor of the label rows rescaled to the padded image,
    - 'outputs': one tensor per grid size, shaped
      (anchors_per_scale, grid, grid, 6) with the last axis laid out as
      [probability, x, y, width, height, class_label].
    """

    def __init__(self, root, csv_file, anchors, img_size=416, max_objects=100, ignore_thresh=0.7, grid_sizes=None, train=True):
        """
        Args:
            root: Dataset directory containing the CSV file and the ``images``
                and ``labels`` sub-directories.
            csv_file: Name of the CSV file listing image and label file names.
            anchors: Anchors grouped by scale; they are flattened into a
                single tensor in the order given.
            img_size: Side length the padded image is resized to.
            max_objects: Maximum number of label rows kept per image.
            ignore_thresh: IoU above which a non-selected anchor is marked
                as ignored (probability -1) instead of background.
            grid_sizes: Grid size of each scale; defaults to [13, 26, 52].
            train: When True, colour jitter augmentation is applied.
        """
        # A fresh list per instance avoids sharing one default list object.
        if grid_sizes is None:
            grid_sizes = [13, 26, 52]

        self.data = pd.read_csv(os.path.join(root, csv_file))
        self.img_dir = os.path.join(root, 'images')
        self.label_dir = os.path.join(root, 'labels')
        self.img_size = img_size
        self.max_objects = max_objects
        self.ignore_thresh = ignore_thresh
        self.grid_sizes = grid_sizes
        self.train = train

        flattened_anchors = [anchor for scale in anchors for anchor in scale]
        self.anchors = torch.tensor(flattened_anchors)
        self.total_anchors = self.anchors.shape[0]
        self.anchors_per_scale = self.total_anchors // len(grid_sizes)

        # Training and evaluation share the same pipeline; only training adds
        # the colour jitter step right after the resize.
        steps = [transforms.Resize((img_size, img_size))]
        if self.train:
            steps.append(transforms.ColorJitter(brightness=0.5, contrast=0.5, saturation=0.5, hue=0.5))
        steps += [
            transforms.ToTensor(),
            transforms.Normalize(mean=[0, 0, 0], std=[1, 1, 1]),
        ]
        self.transform = transforms.Compose(steps)

    def __len__(self):
        """Return the number of rows of the CSV file."""
        return len(self.data)

    @staticmethod
    def _grid_cell(coord, grid_size):
        """Return the grid cell index of a normalized coordinate, clamped to [0, grid_size - 1]."""
        # Without the clamp a coordinate of exactly 1.0 maps to `grid_size`,
        # which is out of range, and a negative one would wrap around.
        return max(0, min(int(coord * grid_size), grid_size - 1))

    def __getitem__(self, index):
        """Load sample ``index`` and build its target grids (see the class docstring)."""
        # ------------------------------- Image part -------------------------------
        img_path = os.path.join(self.img_dir, self.data.iloc[index, 0])
        # The context manager closes the file as soon as the RGB copy exists.
        with Image.open(img_path) as raw_image:
            image = raw_image.convert('RGB')
        original_width, original_height = image.size

        # Pad the shorter side (on the right or at the bottom) to make the image square.
        pad_width = 0
        pad_height = 0
        max_edge = max(original_width, original_height)
        if original_width < max_edge:
            pad_width = max_edge - original_width
        elif original_height < max_edge:
            pad_height = max_edge - original_height

        image = transforms.Pad((0, 0, pad_width, pad_height))(image)
        image = self.transform(image)

        # ------------------------------- Label part -------------------------------
        label_path = os.path.join(self.label_dir, self.data.iloc[index, 1])

        # A missing or empty label file means the image has no object.
        if os.path.exists(label_path) and os.path.getsize(label_path) > 0:
            bboxes = np.loadtxt(label_path, delimiter=" ", ndmin=2)
        else:
            bboxes = np.zeros((0, 5))

        # Keep at most max_objects label rows.
        bboxes = bboxes[:self.max_objects]

        # Re-express the coordinates relative to the padded image.
        scale_x = self.img_size / (original_width + pad_width)
        scale_y = self.img_size / (original_height + pad_height)

        bboxes[:, 1] = (bboxes[:, 1] * original_width) * scale_x / self.img_size   # x_center
        bboxes[:, 2] = (bboxes[:, 2] * original_height) * scale_y / self.img_size  # y_center
        bboxes[:, 3] = (bboxes[:, 3] * original_width) * scale_x / self.img_size   # width
        bboxes[:, 4] = (bboxes[:, 4] * original_height) * scale_y / self.img_size  # height

        # One zero-filled target grid per scale; the last axis is
        # [probability, x, y, width, height, class_label].
        output_tensors = [
            torch.zeros((self.anchors_per_scale, grid_size, grid_size, 6))
            for grid_size in self.grid_sizes
        ]

        for box in bboxes:
            class_label, x, y, width, height = box

            bbox_tensor = torch.tensor([1, x, y, width, height, class_label])

            # Rank all anchors by the IoU of their size with the box size.
            ious = compute_iou_sizes(bbox_tensor[3:5], self.anchors)
            ious_sorted_indx = ious.argsort(descending=True, dim=0)

            # Each scale receives the box at most once (its best free anchor).
            is_scale_handled = [False for _ in range(len(self.grid_sizes))]

            for iou_indx in ious_sorted_indx:
                # Plain Python ints keep the index arithmetic explicit.
                anchor_index = int(iou_indx)
                scale_indx, anchor_for_indx = divmod(anchor_index, self.anchors_per_scale)

                grid_size = self.grid_sizes[scale_indx]

                # Grid cell containing the box centre at this scale.
                grid_x = self._grid_cell(x, grid_size)
                grid_y = self._grid_cell(y, grid_size)

                # Slots that are already assigned or ignored are left untouched.
                if output_tensors[scale_indx][anchor_for_indx, grid_y, grid_x, 0] != 0:
                    continue

                if not is_scale_handled[scale_indx]:
                    # Store the box with its centre relative to the cell and
                    # its size expressed in grid units.
                    bbox_tensor[0] = 1
                    bbox_tensor[1] = x * grid_size - grid_x
                    bbox_tensor[2] = y * grid_size - grid_y
                    bbox_tensor[3] = width * grid_size
                    bbox_tensor[4] = height * grid_size
                    bbox_tensor[5] = int(class_label)  # for security cast to int

                    output_tensors[scale_indx][anchor_for_indx, grid_y, grid_x] = bbox_tensor
                    is_scale_handled[scale_indx] = True

                elif ious[anchor_index] > self.ignore_thresh:
                    # A well-matching anchor that was not selected is ignored.
                    output_tensors[scale_indx][anchor_for_indx, grid_y, grid_x, 0] = -1

        return {
            'image': image,
            'bboxes': torch.tensor(bboxes, dtype=torch.float32),
            'outputs': output_tensors,
        }


## Test the data loader
The following code tests the data loader. It loads the dataset and displays the first image with its bounding boxes in two ways: first using the bounding boxes read directly from the label file, and then using boxes decoded from the output tensors, which hold the offset information for each scale and each anchor and are the tensors that will be used for training.

In [None]:
if __name__ == '__main__':
    # Visual sanity check of the dataset: show one training sample with the
    # boxes read from its label file, then with the boxes decoded back from
    # the per-scale target tensors.

    dataset = YOLODataset(root=root_dir, csv_file='train.csv', anchors=normalized_anchors, train=True)
    loader = DataLoader(dataset, batch_size=1, shuffle=True)
    class_labels = load_classes(names_path)

    # Only the first sample drawn from the shuffled loader is displayed.
    for data in loader:
        targets = data['outputs']

        # Decode every scale with the anchors of that scale. Pairing targets
        # and anchors by scale avoids relying on the anchor count per scale
        # happening to equal the number of scales.
        boxes = []
        for scale_target, anchor in zip(targets, scaled_anchors):
            boxes += grid_to_bboxes(scale_target, anchors=anchor, grid_dim=scale_target.shape[2])[0]

        boxes = apply_nms_manual(boxes, 0.99, 0.99)
        boxes = transform_boxes(torch.tensor(boxes, dtype=torch.float32))

        display_image_with_boxes(data, class_labels) # display the boxes

        data['bboxes'] = boxes
        display_image_with_boxes(data, class_labels) # display the boxes retrieved from the offsets and anchors

        break


## Batch loader

Since training only ever works with batches, the following code checks that batches are assembled correctly and have the expected sizes. It sets up a PyTorch data loader for the training images of the YOLOv3 object detection model, with a batch size of 8 and an image dimension of 416x416. The loop iterates over the batches and prints the shapes of the image tensor and of the output tensors for the first four batches to test the loading and processing functionality.

In [None]:
# Collate function for the YOLO DataLoaders: it merges the per-sample
# dictionaries produced by the dataset into a single batch dictionary.

def custom_collate_fn(batch):
    """
    Stack the images and the per-scale targets of a list of samples.

    Args:
        batch (list): Samples, each a dict with an 'image' tensor and an
            'outputs' list holding one target tensor per scale. The number of
            scales is taken from the first sample.

    Returns:
        dict: 'image' is the images stacked along a new leading batch axis;
        'outputs' is a list with, for each scale, the targets of all samples
        stacked along a new leading batch axis.
    """
    sample_images = [sample['image'] for sample in batch]
    sample_outputs = [sample['outputs'] for sample in batch]

    image_batch = torch.stack(sample_images, dim=0)

    # Scale by scale, gather that scale's target from every sample and stack them.
    num_scales = len(sample_outputs[0])
    stacked_per_scale = [
        torch.stack([per_sample[scale] for per_sample in sample_outputs], dim=0)
        for scale in range(num_scales)
    ]

    return {'image': image_batch, 'outputs': stacked_per_scale}

In [None]:
# Smoke test of the batching pipeline: build a training DataLoader and print
# the tensor shapes of its first few batches.
batch_size2 = 8  # number of samples per batch in this check
inp_dim = 416    # image side length passed to the dataset as img_size

# Use custom collate function in DataLoader
# (it stacks the images and the per-scale output tensors of the samples).
dataloaders = {
    'train': DataLoader(YOLODataset(root=root_dir, csv_file='train.csv', img_size=inp_dim, anchors=normalized_anchors), batch_size=batch_size2, shuffle=True, collate_fn=custom_collate_fn),
}

for i_batch, sample_batched in enumerate(dataloaders["train"]):
    input_images_batch = sample_batched['image']
    outputs_batch = sample_batched['outputs']  # list with one tensor per scale

    print(f"Batch {i_batch}:")
    print(f"Image Tensor Shape: {input_images_batch.shape}")
    print(f"Outputs Shape: {[output.shape for output in outputs_batch]}")

    # Stop after the batch with index 3, i.e. once four batches have been printed.
    if i_batch == 3:
        break


In [None]:
# Build the data loaders for training, validation and testing from the
# corresponding CSV files.

def get_loaders(train_csv_path, test_csv_path, val_csv_path=None):
    """
    Create the training, validation and test data loaders.

    Datasets are read from the module-level ``root_dir`` with ``image_size``
    and ``normalized_anchors``; loaders use the module-level ``batch_size``,
    ``num_workers`` and ``pin_memory`` settings and ``custom_collate_fn``.
    Only the training dataset is built with ``train=True`` and only the
    training loader shuffles. No loader drops its last incomplete batch.

    Args:
        train_csv_path: CSV file listing the training samples.
        test_csv_path: CSV file listing the test samples.
        val_csv_path: CSV file listing the validation samples, or None when
            there is no validation set.

    Returns:
        tuple: (train_loader, val_loader, test_loader); ``val_loader`` is None
        when ``val_csv_path`` is None.
    """
    def make_dataset(csv_path, train):
        # All splits share the same root, image size and anchors.
        return YOLODataset(
            root=root_dir,
            csv_file=csv_path,
            img_size=image_size,
            anchors=normalized_anchors,
            train=train,
        )

    def make_loader(dataset, shuffle):
        # One place for the loader settings so the three splits cannot drift apart.
        return DataLoader(
            dataset=dataset,
            batch_size=batch_size,
            num_workers=num_workers,
            pin_memory=pin_memory,
            shuffle=shuffle,
            drop_last=False,
            collate_fn=custom_collate_fn,
        )

    has_validation = val_csv_path is not None

    # Datasets first (this is where the CSV files are read), then the loaders.
    train_dataset = make_dataset(train_csv_path, train=True)
    val_dataset = make_dataset(val_csv_path, train=False) if has_validation else None
    test_dataset = make_dataset(test_csv_path, train=False)

    train_loader = make_loader(train_dataset, shuffle=True)
    val_loader = make_loader(val_dataset, shuffle=False) if has_validation else None
    test_loader = make_loader(test_dataset, shuffle=False)

    return train_loader, val_loader, test_loader

# Step 4 : Build the model
Now that we have set up the data, we can tackle the model. The model is based on the Darknet-53 architecture, so the first step is to implement the building blocks of this architecture and then integrate them into the YOLOv3 architecture.

## Darknet 53

In [None]:
# Basic convolutional unit used throughout the network: a convolution
# followed by batch normalization and a Leaky ReLU activation.

class ConvBlock(nn.Module):
    """Conv2d (without bias) -> BatchNorm2d -> LeakyReLU(0.1)."""

    def __init__(self, input_channels, output_channels, **keyword_args):
        """
        Args:
            input_channels: Number of channels of the input tensor.
            output_channels: Number of channels produced by the convolution.
            **keyword_args: Extra arguments forwarded to ``nn.Conv2d``
                (for instance kernel_size, stride, padding).
        """
        super().__init__()
        # The convolution carries no bias of its own: it is immediately
        # followed by batch normalization.
        self.convolution = nn.Conv2d(input_channels, output_channels, bias=False, **keyword_args)
        self.bn = nn.BatchNorm2d(output_channels)
        # A negative slope of 0.1 keeps a small gradient for negative inputs.
        self.leaky = nn.LeakyReLU(negative_slope=0.1)

    def forward(self, x):
        """Apply convolution, batch normalization and Leaky ReLU, in that order."""
        features = self.convolution(x)
        normalized = self.bn(features)
        return self.leaky(normalized)


In [None]:
# Residual block of the Darknet 53 backbone: a stack of bottleneck units whose
# outputs are optionally added back to their inputs (skip connections), which
# helps gradients flow through very deep networks.

class ResBlock(nn.Module):
    """Sequence of ``num_repeats`` bottleneck units with optional skip connections."""

    def __init__(self, channels, skip_connection=True, num_repeats=1):
        """
        Args:
            channels: Number of channels at the input and output of every unit.
            skip_connection: When True, each unit's input is added to its output.
            num_repeats: Number of bottleneck units in the block.
        """
        super().__init__()

        half_channels = channels // 2

        # Each unit halves the channels with a 1x1 convolution and restores
        # them with a 3x3 convolution; both are followed by batch
        # normalization and a Leaky ReLU.
        self.layers = nn.ModuleList(
            [
                nn.Sequential(
                    nn.Conv2d(channels, half_channels, kernel_size=1),
                    nn.BatchNorm2d(half_channels),
                    nn.LeakyReLU(0.1),
                    nn.Conv2d(half_channels, channels, kernel_size=3, padding=1),
                    nn.BatchNorm2d(channels),
                    nn.LeakyReLU(0.1),
                )
                for _ in range(num_repeats)
            ]
        )

        # Kept as attributes: the forward pass reads skip_connection, and
        # num_repeats is inspected from outside the block.
        self.num_repeats = num_repeats
        self.skip_connection = skip_connection

    def forward(self, x):
        """Run the input through every unit, adding the residual when enabled."""
        for unit in self.layers:
            unit_output = unit(x)
            x = x + unit_output if self.skip_connection else unit_output
        return x

## YOLOv3 architecture

In [None]:
# Prediction head of the YOLO model for a single scale: it outputs, for each
# of 3 anchor slots and each grid cell, a vector of number_classes + 5 values.

class YOLOScaleHead(nn.Module):
    """Two-convolution head producing the per-anchor predictions of one scale."""

    def __init__(self, input_channels, number_classes):
        """
        Args:
            input_channels: Number of channels of the incoming feature map.
            number_classes: Number of object classes to predict.
        """
        super().__init__()

        hidden_channels = 2 * input_channels
        # 3 anchor slots, each with number_classes + 5 values.
        output_channels = (number_classes + 5) * 3

        # A 3x3 convolution doubles the channels (with batch normalization and
        # Leaky ReLU), then a 1x1 convolution maps them to the output size.
        self.prediction = nn.Sequential(
            nn.Conv2d(input_channels, hidden_channels, kernel_size=3, padding=1),
            nn.BatchNorm2d(hidden_channels),
            nn.LeakyReLU(0.1),
            nn.Conv2d(hidden_channels, output_channels, kernel_size=1),
        )
        self.number_classes = number_classes

    def forward(self, x):
        """
        Return predictions shaped
        (batch_size, 3, grid_size, grid_size, number_classes + 5).
        """
        raw = self.prediction(x)
        batch, height, width = x.size(0), x.size(2), x.size(3)

        # Split the channel axis into (anchor slot, values per anchor) ...
        per_anchor = raw.view(batch, 3, self.number_classes + 5, height, width)
        # ... and move the per-anchor values to the last axis.
        return per_anchor.permute(0, 1, 3, 4, 2)

In [None]:
# Define the YOLOv3 model configuration
# C = Convolution Block, R = Residual Block, S = Scale Head Block, U = Upsampling Block
#
# Every entry is a list whose first element is the block type letter.
# NOTE(review): the code that consumes this list is not shown next to it, so
# the meaning of the remaining fields is an assumption to confirm there:
#   ["C", a, b, c, d] -- presumably out_channels, kernel_size, stride, padding
#   ["R", n]          -- presumably the number of repeats of the residual block
#   ["S"], ["U"]      -- no extra field

cfg = [
    ["C", 32, 3, 1, 1],
    ["C", 64, 3, 2, 1],
    ["R", 1],
    ["C", 128, 3, 2, 1],
    ["R", 2],
    ["C", 256, 3, 2, 1],
    ["R", 8],  # presumably num_repeats=8; YOLO_v3.forward keeps the output of such ResBlocks as a skip connection
    ["C", 512, 3, 2, 1],
    ["R", 8],  # presumably num_repeats=8 as well (second skip connection)
    ["C", 1024, 3, 2, 1],
    ["R", 4],
    ["C", 512, 1, 1, 0],
    ["C", 1024, 3, 1, 1],
    ["S"],  # first scale head
    ["C", 256, 1, 1, 0],
    ["U"],
    ["C", 256, 1, 1, 0],
    ["C", 512, 3, 1, 1],
    ["S"],  # second scale head
    ["C", 128, 1, 1, 0],
    ["U"],
    ["C", 128, 1, 1, 0],
    ["C", 256, 3, 1, 1],
    ["S"],  # third scale head
]

In [None]:
# Define the YOLOv3 class, which is the main architecture for the YOLOv3 object detection model.
# This class encapsulates the entire model, including the layers and the forward pass logic.

class YOLO_v3(nn.Module):
    """YOLOv3 detector assembled from the module-level ``cfg`` list.

    Args:
        input_channels: number of channels of the input image (default 3).
        number_classes: number of object classes predicted by each scale head
            (default 80).
    """

    def __init__(self, input_channels=3, number_classes=80):
        super().__init__()
        self.number_classes = number_classes
        self.input_channels = input_channels

        # All sub-modules, in execution order.
        self.layers = self.create_layers()

    def forward(self, x):
        """Run the network and return one prediction tensor per scale head.

        Scale heads do not alter the running activation: their output is
        collected and the next module continues from the same ``x``.
        """
        scale_outputs = []
        pending_routes = []  # stack of activations awaiting concatenation

        for module in self.layers:
            if isinstance(module, YOLOScaleHead):
                scale_outputs.append(module(x))
                continue

            x = module(x)

            if isinstance(module, ResBlock) and module.num_repeats == 8:
                # Outputs of the 8-repeat residual stages are routed to the
                # upsampled branches later on.
                pending_routes.append(x)
            elif isinstance(module, nn.Upsample):
                # Consume the most recently stored route (last in, first out).
                x = torch.cat([x, pending_routes.pop()], dim=1)

        return scale_outputs

    def create_layers(self):
        """Build the ``nn.ModuleList`` described by ``cfg``.

        The running channel count is tracked so that every block is created
        with the number of input channels produced by its predecessor.
        """
        channels = self.input_channels
        modules = nn.ModuleList()

        for spec in cfg:
            kind = spec[0]

            if kind == "C":
                out_channels = spec[1]
                modules.append(
                    ConvBlock(
                        channels,
                        out_channels,
                        kernel_size=spec[2],
                        stride=spec[3],
                        padding=spec[4],
                    )
                )
                channels = out_channels

            elif kind == "R":
                modules.append(ResBlock(channels, num_repeats=spec[1]))

            elif kind == "S":
                # Detection branch: non-residual block, 1x1 channel-halving
                # convolution, then the prediction head itself.
                halved = channels // 2
                modules.append(ResBlock(channels, skip_connection=False, num_repeats=1))
                modules.append(ConvBlock(channels, halved, kernel_size=1, stride=1, padding=0))
                modules.append(YOLOScaleHead(halved, self.number_classes))
                channels = halved

            elif kind == "U":
                modules.append(nn.Upsample(scale_factor=2))
                # The upsampled map is concatenated in forward() with a route
                # carrying twice as many channels, hence the factor 3.
                channels *= 3

        return modules

# Step 5: Define the loss function
The YOLOv3 loss is divided into four key components. The **Coordinate Loss** measures the accuracy of predicted bounding box coordinates (x, y, width, height) using Mean Squared Error (MSE), focusing only on boxes that contain objects. The **Objectness Loss** also uses MSE to evaluate the model's confidence in the presence of objects within the predicted bounding boxes, comparing these predictions to Intersection over Union (IoU) scores. The **No Object Loss** applies Binary Cross Entropy (BCE) loss to penalize incorrect predictions of object presence in regions where no objects exist, helping to reduce false positives. Finally, the **Class Loss** uses Cross Entropy loss to assess how well the model predicts the correct class for detected objects, ensuring accurate classification. Together, these components guide the model's learning process to improve detection accuracy.

In [None]:
# Define the YOLOv3 loss class, which calculates the different components of the loss function 
# used to train the YOLOv3 model. The loss consists of coordinate loss, object confidence loss, 
# no object confidence loss, and class prediction loss.

class Yolov3Loss(nn.Module):
    """YOLOv3 loss for a single prediction scale.

    The loss is the weighted sum of four terms:

    * coordinate loss (MSE) on cells that contain an object,
    * object confidence loss (MSE against the IoU of the decoded box),
    * no-object confidence loss (BCE with logits) on empty cells,
    * class loss (cross entropy) on cells that contain an object.

    A term whose mask selects no cell contributes 0. Without that guard the
    mean reduction over an empty selection yields NaN, so a batch without any
    object at this scale would produce a NaN loss.
    """

    def __init__(self):
        super().__init__()
        # Elementary loss functions, all with their default mean reduction.
        self.mse = nn.MSELoss()
        self.bce = nn.BCEWithLogitsLoss()
        self.cross_entropy = nn.CrossEntropyLoss()
        self.sigmoid_function = nn.Sigmoid()

        # Weights of the individual loss terms.
        self.loss_class = 1
        self.loss_noobj = 1
        self.loss_coord = 1
        self.loss_obj = 1

    def forward(self, predictions, anchors, output):
        """Compute the loss of one scale.

        Args:
            predictions: raw network output whose last axis holds
                ``[objectness, x, y, w, h, class logits...]``.
            anchors: anchors of this scale; presumably shaped
                ``(num_anchors, 2)`` so that they broadcast against
                ``predictions`` once reshaped below (confirm against caller).
            output: target tensor whose last axis holds
                ``[objectness, x, y, w, h, class index]``. Objectness 1 marks
                an object cell and 0 an empty cell; any other value is ignored
                by both masks.

        Returns:
            Scalar tensor with the weighted sum of the four loss terms.
        """
        obj = torch.eq(output[..., 0], 1)
        noobj = torch.eq(output[..., 0], 0)

        # Neutral value used for any term whose mask is empty.
        zero = predictions.new_zeros(())

        # Insert singleton batch and grid axes so the anchors broadcast.
        anchors = anchors.unsqueeze(0).unsqueeze(2).unsqueeze(3)

        if obj.any():
            # The sigmoid-constrained centre offsets are shared by the
            # coordinate loss and the IoU computation.
            pred_xy = self.sigmoid_function(predictions[..., 1:3])

            # ------------------------- Coordinates loss ------------------------- #
            # Targets are expressed in the network's parameterisation:
            # log(w / anchor_w), log(h / anchor_h). Clamp to avoid log(0).
            scaled_wh = torch.log(
                torch.clamp(output[..., 3:5] / anchors, min=1e-16)
            )
            updated_predictions = torch.cat((pred_xy, predictions[..., 3:5]), dim=-1)
            updated_output = torch.cat((output[..., 1:3], scaled_wh), dim=-1)
            coordinates_loss = self.mse(updated_predictions[obj], updated_output[obj])

            # ---------------------------- Object loss --------------------------- #
            # Decode width/height back to box space before computing the IoU.
            box_wh = torch.exp(predictions[..., 3:5]) * anchors
            box_predictions = torch.cat((pred_xy, box_wh), dim=-1)

            # The IoU is only a regression target: no gradient flows through it.
            iou_values = compute_iou_boxes(
                box_predictions[obj], output[..., 1:5][obj]
            ).detach()

            pred_confidence = self.sigmoid_function(predictions[..., 0:1][obj])
            actual_confidence = iou_values * output[..., 0:1][obj]
            loss_obj = self.mse(pred_confidence, actual_confidence)

            # ---------------------------- Class loss ---------------------------- #
            predicted_classes = predictions[..., 5:][obj]
            ground_truth_classes = output[..., 5][obj].long()
            class_loss = self.cross_entropy(predicted_classes, ground_truth_classes)
        else:
            coordinates_loss = zero
            loss_obj = zero
            class_loss = zero

        # ----------------------------- No object loss ----------------------------- #
        if noobj.any():
            # Trailing singleton axis so the mask lines up with the
            # objectness slice [..., 0:1].
            noobj_expanded = noobj.unsqueeze(-1)
            pred_noobj = torch.masked_select(predictions[..., 0:1], noobj_expanded)
            output_noobj = torch.masked_select(output[..., 0:1], noobj_expanded)
            no_object_loss = self.bce(pred_noobj, output_noobj)
        else:
            no_object_loss = zero

        return (
            self.loss_coord * coordinates_loss
            + self.loss_obj * loss_obj
            + self.loss_noobj * no_object_loss
            + self.loss_class * class_loss
        )

# Step 6: Define the training function
The training function processes each batch of data through the model, computes the loss across the three prediction scales and their anchors, and updates the model's weights using backpropagation. It runs the forward pass under automatic mixed precision for efficiency, tracks progress with a real-time progress bar, and clears the CUDA cache after each batch to manage GPU memory.

In [None]:
# Define the training function that handles the training loop for the YOLOv3 model.
# This function processes batches of data, calculates the loss, and updates the model weights.
# It also utilizes a progress bar to visually track training progress and loss metrics.

def train_func(train_loader, loss_func, model, optimizer, scaler, scaled_anchors):
    """Train ``model`` for one pass over ``train_loader``.

    Args:
        train_loader: iterable of batches; each batch is a mapping with an
            ``'image'`` tensor and an ``'outputs'`` sequence holding one
            target tensor per prediction scale.
        loss_func: callable ``loss_func(prediction, anchors, target)``
            returning the scalar loss of one scale.
        model: network returning one prediction tensor per scale.
        optimizer: optimizer updating the parameters of ``model``.
        scaler: gradient scaler (e.g. ``torch.cuda.amp.GradScaler``) used to
            scale the loss before the backward pass. ``None`` disables loss
            scaling and falls back to a plain backward/step.
        scaled_anchors: per-scale anchors, iterated in step with the model
            outputs.
    """
    total_batches = len(train_loader)
    console = Console()

    # Progress bar showing batch count, timing and running loss statistics.
    with Progress(
        TextColumn("[bold blue]Training..."),
        BarColumn(bar_width=None),
        "[progress.percentage]{task.percentage:>3.0f}%",
        " | {task.completed}/{task.total}",
        TimeElapsedColumn(),
        TimeRemainingColumn(),
        " | Current Loss: {task.fields[current_loss]}",
        " | Mean Loss: {task.fields[mean_loss]}",
        console=console,
    ) as progress:
        task = progress.add_task("Training", total=total_batches, current_loss="N/A", mean_loss="N/A")
        loss_list = []  # per-batch losses, used for the running mean

        for batch in train_loader:
            img = batch['image'].to(device)
            targets = [target.to(device) for target in batch['outputs']]

            # Forward pass under automatic mixed precision.
            with torch.cuda.amp.autocast():
                model_outputs = model(img)
                total_loss = 0
                # Sum the loss of every scale, each with its own anchors.
                for output, target, anchor in zip(model_outputs, targets, scaled_anchors):
                    total_loss += loss_func(output, anchor, target)

            loss_list.append(total_loss.item())

            optimizer.zero_grad()
            if scaler is not None:
                # The forward pass runs under autocast, so the loss must be
                # scaled to keep small half-precision gradients from
                # underflowing. scaler.step() unscales the gradients and skips
                # the update when they contain inf/NaN.
                scaler.scale(total_loss).backward()
                scaler.step(optimizer)
                scaler.update()
            else:
                total_loss.backward()
                optimizer.step()

            mean_loss = sum(loss_list) / len(loss_list)
            progress.update(task, advance=1, current_loss=f"{loss_list[-1]:.4f}", mean_loss=f"{mean_loss:.4f}")

            # Release cached GPU memory after each batch.
            torch.cuda.empty_cache()

# Step 7 : Train the model

In [None]:
class_labels = load_classes(names_path)

# Free Python garbage and cached GPU memory before allocating the model.
gc.collect()
torch.cuda.empty_cache()
#print_memory_usage()

# Create the YOLOv3 model
model = YOLO_v3(number_classes=len(class_labels)).to(device)

# Define the optimizer
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

# Define the scaler for automatic mixed precision
scaler = torch.cuda.amp.GradScaler()

# Define the loss function
loss_func = Yolov3Loss()

# Define the train loader and test loader (no validation split is requested)
train_loader, _, test_loader = get_loaders(train_csv_path, test_csv_path, val_csv_path=None)

# A fresh run starts at step 0. When resuming, training continues with the
# step that follows the one stored in the snapshot.
first_step = 0
if resume_training:  # 'resume_training' is a boolean flag indicating whether to resume training
    start_step, _ = load_snapshot(snapshot_file, model, optimizer, scheduler=None, device=device)
    print(f"Resuming training from step {start_step+1}")
    first_step = start_step + 1

for step in range(first_step, num_steps):
    print(f"Step n°: {step + 1}\n----------------------------")
    train_func(train_loader, loss_func, model, optimizer, scaler, scaled_anchors)

    # Save a snapshot after every step so training can be resumed from it.
    if save_model:
        save_snapshot(model, optimizer, filename=f"{snapshot_file}", step=step)

    # Evaluate the model every 3 steps (never right after step 0).
    if step != 0 and step % 3 == 0:

        # Validation loss on the test loader
        val_loss = validate_func(test_loader, loss_func, model, scaled_anchors)
        print(f"Validation Loss step {step + 1}: {val_loss:.4f}")

        # Model accuracy on the test loader
        evaluate_model_accuracy(model, test_loader, conf_threshold, iou_threshold)

        # mAP evaluation (currently disabled):
        # gather predicted and ground truth bounding boxes for evaluation
        #predicted_bboxes, ground_truth_bboxes = gather_evaluation_bboxes(
        #    test_loader, model, iou_threshold, conf_threshold, anchors, device=device
        #)

        # then compute the mean Average Precision (mAP) at IoU 0.5 and 0.75
        #map_eval_50 = map_evaluation(predicted_bboxes, ground_truth_bboxes, len(class_labels), iou_threshold = 0.5)
        #map_eval_75 = map_evaluation(predicted_bboxes, ground_truth_bboxes, len(class_labels), iou_threshold = 0.75)

        # and print the computed mAP values
        #print(f"MAP_50: {map_eval_50.detach()}")  # Detach the tensor from the computation graph before printing

        #print(f"MAP_75: {map_eval_75.detach()}")  # Detach the tensor from the computation graph before printing

        # Return the model to training mode after evaluation
        model.train()


# Step 8 : Inference

In [None]:
# Load the class labels
labels_list = load_classes(names_path)

# Initialize the YOLOv3 model with the number of classes
yolo_model = YOLO_v3(number_classes=len(labels_list)).to(device)

# Set up the optimizer (load_snapshot is given it together with the model)
optimizer = optim.Adam(yolo_model.parameters(), lr=learning_rate, weight_decay=weight_decay)

# Set up the scaler for mixed precision training
amp_scaler = torch.cuda.amp.GradScaler()

# Initialize the loss function
yolo_loss_function = Yolov3Loss()

# Flag to determine whether to load a pre-trained model
resume_training = True

# Load a checkpoint if resuming from a previous state
if resume_training:
    load_snapshot(snapshot_file, yolo_model, optimizer, scheduler=None, lr=learning_rate, device=device)

# Create a dataset and data loader for testing
validation_dataset = YOLODataset(
    root=root_dir,
    csv_file=test_csv_path,
    img_size=image_size,
    anchors=normalized_anchors,
    train=False
)

validation_loader = DataLoader(
    dataset=validation_dataset,
    batch_size=1,  # Batch size for testing
    num_workers=num_workers,  # Number of data loading workers
    pin_memory=pin_memory,  # Use pinned memory for faster GPU transfer
    shuffle=True,  # Shuffle so that a different sample is shown on each run
    drop_last=False  # Include all batches, even if the last one is incomplete
)

# Process a single sample batch from the validation data loader
for idx, batch in enumerate(validation_loader):
    if idx == 1:
        break

    input_images = batch['image'].to(device)
    num_images = input_images.shape[0]

    yolo_model.eval()  # Set model to evaluation mode
    with torch.no_grad():
        # Generate predictions from the model (one tensor per scale)
        predictions = yolo_model(input_images)

        # One list of candidate boxes per image, filled across all scales
        predicted_boxes = [[] for _ in range(num_images)]

        # Extract bounding boxes for each prediction scale.
        # NOTE: local names are used on purpose. This cell runs at notebook top
        # level, so unpacking the shape into `batch_size` / `grid_size` would
        # overwrite the global configuration variables of the same name.
        for scale_idx in range(3):
            scale_prediction = predictions[scale_idx]
            grid_dim = scale_prediction.shape[2]
            anchors_at_scale = scaled_anchors[scale_idx]
            boxes_at_scale = pred_to_bboxes(scale_prediction, anchors_at_scale, grid_dim=grid_dim)

            for img_idx, box in enumerate(boxes_at_scale):
                predicted_boxes[img_idx] += box

    yolo_model.train()  # Switch back to training mode

    print("Expected result ====>")
    display_image_with_boxes(batch, labels_list)

    # Apply non-max suppression and plot the results for each image in the batch
    for img_idx in range(num_images):
        nms_filtered_boxes = apply_nms(predicted_boxes[img_idx], iou_thresh=0.5, conf_thresh=0.7)

        # Replace the ground-truth boxes with the predictions and display them
        batch['bboxes'] = transform_boxes(torch.tensor(nms_filtered_boxes, dtype=torch.float32))
        print("Our result ===>")
        display_image_with_boxes(batch, labels_list)
