# YOLOv1

Adapted from https://www.kaggle.com/code/vexxingbanana/yolov1-from-scratch-pytorch/notebook

In [None]:
import torch
import torch.nn as nn
import os
import torchvision.transforms as transforms
import torch.optim as optim
from torch.utils.data import DataLoader
from tqdm import tqdm
import cv2
import time
from torchinfo import summary

from utils import *

In [None]:
# Dataset locations (relative to the notebook's working directory).
train_dir = './fruit/train_zip/train'
test_dir = './fruit/test_zip/test'

# Seed PyTorch's RNG so that weight initialisation and shuffling are repeatable.
seed = 123
torch.manual_seed(seed)

LEARNING_RATE = 2e-5

# Pick the best available accelerator instead of hard-coding "mps", which
# fails on machines without Apple-Silicon support. MPS keeps priority so the
# behaviour on the original hardware is unchanged.
if torch.backends.mps.is_available():
    DEVICE = "mps"
elif torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

BATCH_SIZE = 16 # 64 in the original paper but can use too much resources
DROPOUT = 0.0 # Original paper uses 0.5
LAST_HIDDEN_SIZE = 496 # Original paper uses 4096
WEIGHT_DECAY = 0
EPOCHS = 10
NUM_WORKERS = 0
LOAD_MODEL = False  # when True, main() resumes from MODEL_FILE
MODEL_FILE = "model.pth"  # checkpoint path used for both saving and loading

# Class index -> label / drawing colour; both lists are passed to visualize_boxes.
class_names = ['apple', 'banana', 'orange']
class_colors = [(255,0,0), (0,255,0), (0,0,255)]

In [None]:
# Backbone description consumed by YoloV1._create_conv_layers.
#
# Entry formats:
#   (kernel_size, out_channels, stride, padding)  -> one conv block
#   "M"                                           -> 2x2 max pool, stride 2
#   [conv_a, conv_b, n]                           -> the pair (conv_a, conv_b) repeated n times
#
# The fully connected head is NOT described here.
architecture_config = [
    # --- stage 1 ---
    (7, 64, 2, 3),
    "M",
    # --- stage 2 ---
    (3, 192, 1, 1),
    "M",
    # --- stage 3 ---
    (1, 128, 1, 0),
    (3, 256, 1, 1),
    (1, 256, 1, 0),
    (3, 512, 1, 1),
    "M",
    # --- stage 4 ---
    [(1, 256, 1, 0), (3, 512, 1, 1), 4],
    (1, 512, 1, 0),
    (3, 1024, 1, 1),
    "M",
    # --- stage 5 ---
    [(1, 512, 1, 0), (3, 1024, 1, 1), 2],
    (3, 1024, 1, 1),
    (3, 1024, 2, 1),
    # --- stage 6 ---
    (3, 1024, 1, 1),
    (3, 1024, 1, 1),
]


class YoloV1(nn.Module):
    """
    YOLOv1 detector: a convolutional backbone built from ``architecture_config``
    followed by a two-layer fully connected head.

    Args:
        in_channels: number of channels of the input image (default 3).
        **kwargs: forwarded unchanged to ``_create_fcs``; must provide
            ``split_size`` (S), ``num_boxes`` (B) and ``num_classes`` (C).

    The forward pass returns a tensor with ``S * S * (C + B * 5)`` values
    per sample.
    """

    def __init__(self, in_channels=3, **kwargs):
        super().__init__()
        self.architecture = architecture_config
        self.in_channels = in_channels
        self.darknet = self._create_conv_layers(self.architecture)
        self.fcs = self._create_fcs(**kwargs)

    def forward(self, x):
        """Run the backbone, flatten everything but the batch axis, apply the head."""
        x = self.darknet(x)
        return self.fcs(torch.flatten(x, start_dim=1))

    def _create_conv_layers(self, architecture):
        """
        Translate an architecture description into an ``nn.Sequential``.

        Supported entries:
            tuple ``(kernel_size, out_channels, stride, padding)`` -> one ``CNNBlock``
            str (e.g. ``"M"``)                                     -> 2x2 max pool, stride 2
            list ``[conv_a, conv_b, repeats]``                     -> the two blocks, repeated

        Raises:
            TypeError: if an entry is none of the above (previously such
                entries were skipped silently, yielding a wrong network).
        """
        layers = []
        in_channels = self.in_channels

        def conv_block(channels_in, spec):
            # spec = (kernel_size, out_channels, stride, padding)
            return CNNBlock(channels_in, spec[1], kernel_size=spec[0], stride=spec[2], padding=spec[3])

        for x in architecture:
            if isinstance(x, tuple):
                layers.append(conv_block(in_channels, x))
                in_channels = x[1]
            elif isinstance(x, str):
                layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
            elif isinstance(x, list):
                conv1 = x[0]  # Tuple
                conv2 = x[1]  # Tuple
                repeats = x[2]  # Int

                for _ in range(repeats):
                    layers.append(conv_block(in_channels, conv1))
                    layers.append(conv_block(conv1[1], conv2))
                    in_channels = conv2[1]
            else:
                raise TypeError(
                    f"Unsupported architecture entry {x!r}; expected tuple, str or list"
                )

        return nn.Sequential(*layers)

    def _create_fcs(self, split_size, num_boxes, num_classes):
        """
        Build the fully connected head.

        The first linear layer expects ``1024 * S * S`` inputs, i.e. it assumes
        the backbone ends with 1024 channels on an S x S feature map
        (NOTE(review): with this config that presumably means 448x448 inputs
        for S=7 -- confirm). Hidden width and dropout come from the
        module-level ``LAST_HIDDEN_SIZE`` and ``DROPOUT`` constants.
        """
        S, B, C = split_size, num_boxes, num_classes
        # Layer order (including the leading Flatten, a no-op after the flatten
        # in forward()) is kept so the parameter keys of the head stay the same.
        return nn.Sequential(
            nn.Flatten(),
            nn.Linear(1024 * S * S, LAST_HIDDEN_SIZE),
            nn.Dropout(DROPOUT),
            nn.LeakyReLU(0.1),
            nn.Linear(LAST_HIDDEN_SIZE, S * S * (C + B * 5)),
        )
    
# Instantiate the detector for a 7x7 grid with 2 boxes per cell and 3 classes
# (one per entry of class_names), then print torchinfo's summary so the layer
# and parameter counts can be checked.
model = YoloV1(split_size=7, num_boxes=2, num_classes=3)
summary(model)

In [None]:

def train_fn(train_loader, model, optimizer, loss_fn):
    """
    Run one optimisation pass over ``train_loader`` and print the mean batch loss.

    Args:
        train_loader: iterable yielding ``(images, targets)`` batches.
        model: network being trained; called as ``model(images)``.
        optimizer: optimizer stepping the model's parameters.
        loss_fn: callable ``loss_fn(predictions, targets)`` returning a scalar tensor.

    Returns:
        The mean of the per-batch loss values, or ``float("nan")`` when the
        loader produced no batches (previously this raised ZeroDivisionError).
    """
    loop = tqdm(train_loader, leave=True)
    batch_losses = []

    for images, targets in loop:
        images, targets = images.to(DEVICE), targets.to(DEVICE)
        out = model(images)
        loss = loss_fn(out, targets)

        # Read the scalar once; each .item() call forces a device sync.
        loss_value = loss.item()
        batch_losses.append(loss_value)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        loop.set_postfix(loss=loss_value)

    if not batch_losses:
        print("Mean loss was nan (the loader produced no batches)")
        return float("nan")

    mean_loss = sum(batch_losses) / len(batch_losses)
    print(f"Mean loss was {mean_loss}")
    return mean_loss


In [None]:
class YoloLoss(nn.Module):
    """
    Calculate the loss for yolo (v1) model

    Per grid cell the prediction is laid out as
    ``[class scores (C), conf1, x1, y1, w1, h1, conf2, x2, y2, w2, h2]``.
    The target uses the same indexing, but only its class scores, objectness
    flag (index ``C``) and first box (``C + 1`` .. ``C + 4``) are read.

    Args:
        S: split size of the image grid (in paper 7).
        C: number of classes (in paper 20, in dataset 3).
    """

    def __init__(self, S=7, C=3):
        super().__init__()
        self.mse = nn.MSELoss(reduction="sum")

        # S is split size of image (in paper 7),
        # B is number of boxes (in paper 2),
        # C is number of classes (in paper 20, in dataset 3).
        # forward() is written for exactly two boxes, hence B is fixed.
        self.S = S
        self.B = 2
        self.C = C

        # These are from Yolo paper, signifying how much we should
        # pay loss for no object (noobj) and the box coordinates (coord)
        self.lambda_noobj = 0.5
        self.lambda_coord = 5

    def forward(self, predictions, target):
        """
        Compute the summed YOLOv1 loss for a batch.

        Args:
            predictions: raw network output; reshaped here to
                ``(-1, S, S, C + B * 5)``.
            target: ground truth indexed like one prediction cell
                (NOTE(review): assumed shape ``(N, S, S, >= C + 5)`` -- confirm
                against the dataset class).

        Returns:
            Scalar tensor: ``lambda_coord * box + object + lambda_noobj * noobj + class``.
        """
        # predictions are shaped (BATCH_SIZE, S*S(C+B*5) when inputted
        predictions = predictions.reshape(-1, self.S, self.S, self.C + self.B * 5)

        #Output structure: [class scores, box1, box2]
        #Box structure: [confidence, x, y, w, h]

        # Calculate IoU for the two predicted bounding boxes with target bbox
        iou_b1 = intersection_over_union(predictions[..., self.C + 1:self.C + 5], target[..., self.C + 1:self.C + 5])
        iou_b2 = intersection_over_union(predictions[..., self.C + 6:self.C + 10], target[..., self.C + 1:self.C + 5])
        ious = torch.cat([iou_b1.unsqueeze(0), iou_b2.unsqueeze(0)], dim=0)

        # Take the box with highest IoU out of the two prediction
        # Note that bestbox will be indices of 0, 1 for which bbox was best
        iou_maxes, bestbox = torch.max(ious, dim=0)
        # The IoU is only a regression *target* for the confidence score, so it
        # must be a constant. Without detach() the object loss would also send
        # gradients into the box coordinates through the IoU computation.
        iou_maxes = iou_maxes.detach()
        i1obj_i = target[..., self.C].unsqueeze(3)  # in paper this is Iobj_i

        # ======================== #
        #       BOX COORDINATES    #
        # ======================== #

        # Set boxes with no object in them to 0. We only take out one of the two
        # predictions, which is the one with highest Iou calculated previously.
        box_predictions = i1obj_i * (
            (
                bestbox * predictions[..., self.C + 6:self.C + 10]
                + (1 - bestbox) * predictions[..., self.C + 1:self.C + 5]
            )
        )

        box_targets = i1obj_i * target[..., self.C + 1:self.C + 5]

        # Take sqrt of width, height of boxes. Predictions may be negative, so
        # take sqrt of the magnitude and restore the sign afterwards.
        box_predictions[..., 2:4] = torch.sign(box_predictions[..., 2:4]) * torch.sqrt(
            torch.abs(box_predictions[..., 2:4] + 1e-6)
        )
        box_targets[..., 2:4] = torch.sqrt(box_targets[..., 2:4])

        box_loss = self.mse(
            torch.flatten(box_predictions, end_dim=-2),
            torch.flatten(box_targets, end_dim=-2),
        )

        # ==================== #
        #       OBJECT LOSS    #
        # ==================== #

        # pred_box is the confidence score for the bbox with highest IoU
        pred_box = (
            bestbox * predictions[..., self.C + 5:self.C + 6] + (1 - bestbox) * predictions[..., self.C:self.C + 1]
        )

        object_loss = self.mse(
            torch.flatten(i1obj_i * pred_box),
            torch.flatten(i1obj_i * iou_maxes),
        )

        # ======================= #
        #       NO OBJECT LOSS    #
        # ======================= #

        # Both confidence slots are pushed towards the target confidence / zero
        # in cells that contain no object.
        no_object_loss = self.mse(
            torch.flatten((1 - i1obj_i) * predictions[..., self.C:self.C + 1], start_dim=1),
            torch.flatten((1 - i1obj_i) * target[..., self.C:self.C + 1], start_dim=1),
        )

        no_object_loss += self.mse(
            torch.flatten((1 - i1obj_i) * predictions[..., self.C + 5:self.C + 6], start_dim=1),
            torch.flatten((1 - i1obj_i) * torch.zeros_like(target[..., self.C:self.C + 1]), start_dim=1)
        )

        # ================== #
        #       CLASS LOSS   #
        # ================== #

        class_loss = self.mse(
            torch.flatten(i1obj_i * predictions[..., :self.C], end_dim=-2,),
            torch.flatten(i1obj_i * target[..., :self.C], end_dim=-2,),
        )

        loss = (
            self.lambda_coord * box_loss  # first two rows in paper
            + object_loss  # third row in paper
            + self.lambda_noobj * no_object_loss  # fourth row
            + class_loss  # fifth row
        )

        return loss


In [None]:
# Preprocessing shared by the train and test datasets: resize to 448x448 and
# convert to a tensor.
# NOTE(review): `Compose` is not imported explicitly, so it presumably comes
# from the `utils` star import rather than torchvision -- confirm.
transform = Compose([transforms.Resize((448, 448)), transforms.ToTensor()])

def _compute_map(loader, model):
    """Return the mAP of `model` over `loader`, evaluated in eval mode without gradients."""
    model.eval()
    with torch.no_grad():
        pred_boxes, target_boxes, _ = get_bboxes(
            loader, model, iou_threshold=0.5, threshold=0.4
        )
        return mean_average_precision(
            pred_boxes, target_boxes, iou_threshold=0.5, box_format="midpoint"
        )


def _export_predictions(loader, model, out_dir, label):
    """Detect over `loader`, write annotated images to `out_dir`, print mAP and timing."""
    model.eval()
    with torch.no_grad():
        # Only the detection pass itself is timed.
        start = time.time()
        pred_boxes, target_boxes, images = get_bboxes(
            loader, model, iou_threshold=0.5, threshold=0.4
        )
        end = time.time()

        os.makedirs(out_dir, exist_ok=True)
        for image_i, image in enumerate(images):
            # Find all predicted boxes related to image_i (first field of each box)
            pred_boxes_image_i = [box for box in pred_boxes if box[0] == image_i]
            # CHW -> HWC, then reverse the channel axis
            # (presumably RGB -> BGR for OpenCV -- confirm).
            pixels = image.cpu().numpy().transpose(1, 2, 0)[:, :, ::-1]
            image_with_boxes = visualize_boxes(pixels, pred_boxes_image_i, class_names, class_colors)
            # Export the image
            cv2.imwrite(f"{out_dir}/{image_i}_pred.jpg", image_with_boxes*255)

        mean_avg_prec = mean_average_precision(
            pred_boxes, target_boxes, iou_threshold=0.5, box_format="midpoint"
        )
        print(f"{label} mAP: {mean_avg_prec}; Time: {end - start} for {len(images)} images")


def main(model):
    """
    Train `model` for EPOCHS epochs, checkpointing after each one, then export
    annotated predictions for the train and test sets.

    Per epoch: one training pass, train and test mAP (both in eval mode and
    without gradients), a scheduler step on the test mAP, and a checkpoint
    written to MODEL_FILE. Afterwards images are written to ./train_pred and
    ./test_pred.

    Args:
        model: the network to train; it is moved to DEVICE.
    """
    model = model.to(DEVICE)
    optimizer = optim.Adam(
        model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY
    )
    # mode='max' because the monitored quantity is mAP (higher is better).
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer, factor=0.1, patience=3, mode='max', verbose=True)
    loss_fn = YoloLoss()

    if LOAD_MODEL:
        load_checkpoint(torch.load(MODEL_FILE), model, optimizer)

    summary(model)

    train_dataset = FruitImagesDataset(
        transform=transform,
        files_dir=train_dir
    )

    test_dataset = FruitImagesDataset(
        transform=transform,
        files_dir=test_dir
    )

    train_loader = DataLoader(
        dataset=train_dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        drop_last=False,
        num_workers=NUM_WORKERS
    )

    # Fixed order, so exported file indices are stable between runs.
    train_loader_unshuffled = DataLoader(
        dataset=train_dataset,
        batch_size=BATCH_SIZE,
        shuffle=False,
        drop_last=False,
        num_workers=NUM_WORKERS
    )

    test_loader = DataLoader(
        dataset=test_dataset,
        batch_size=BATCH_SIZE,
        shuffle=False,
        drop_last=False,
        num_workers=NUM_WORKERS
    )

    for epoch in range(EPOCHS):
        print(f"Epoch {epoch+1}/{EPOCHS}")

        model.train()
        train_fn(train_loader, model, optimizer, loss_fn)

        # Evaluate both splits the same way (eval mode, no autograd graph);
        # previously only the test split was evaluated like this.
        train_map = _compute_map(train_loader, model)
        print(f"Train mAP: {train_map}")

        test_map = _compute_map(test_loader, model)
        print(f"Test mAP: {test_map}")

        scheduler.step(test_map)

        checkpoint = {
            "state_dict": model.state_dict(),
            "optimizer": optimizer.state_dict(),
        }
        save_checkpoint(checkpoint, filename=MODEL_FILE)

    _export_predictions(train_loader_unshuffled, model, "./train_pred", "Train")
    _export_predictions(test_loader, model, "./test_pred", "Test")

# Run training, per-epoch evaluation and the final image export on the model
# instantiated earlier in the notebook.
main(model)

In [None]:
def predictions():
    """
    Rebuild the network, restore it from MODEL_FILE and print the test-set mAP.

    Nothing is trained here. An optimizer is still created because
    ``load_checkpoint`` takes one as an argument.
    """
    model = YoloV1(split_size=7, num_boxes=2, num_classes=3).to(DEVICE)
    optimizer = optim.Adam(
        model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY
    )

    load_checkpoint(torch.load(MODEL_FILE), model, optimizer)

    test_dataset = FruitImagesDataset(
        transform=transform,
        files_dir=test_dir
    )

    # Same loader settings as the test loader in main(): shuffling is
    # pointless for evaluation.
    test_loader = DataLoader(
        dataset=test_dataset,
        batch_size=BATCH_SIZE,
        shuffle=False,
        drop_last=False,
        num_workers=NUM_WORKERS,
    )

    model.eval()
    with torch.no_grad():
        pred_boxes, target_boxes, _ = get_bboxes(
            test_loader, model, iou_threshold=0.5, threshold=0.4
        )

        mean_avg_prec = mean_average_precision(
            pred_boxes, target_boxes, iou_threshold=0.5, box_format="midpoint"
        )
        print(f"Test mAP: {mean_avg_prec}")

# Reload the checkpoint saved at MODEL_FILE and report mAP on the test set.
predictions()