# In [1]:
"""
YOLOv1 (You Only Look Once) implementation from scratch in PyTorch
=================================================================
This script defines:
    • CNN backbone + detection head exactly as described in the 2016 paper.
    • Custom loss function that combines localization, confidence, and class
      prediction losses.
    • Minimal training loop skeleton that you can adapt to your own dataset.

Author  : ChatGPT (OpenAI o3)
Created : 2025‑06‑30
License : MIT

Usage
-----
$ python yolov1_scratch.py  # trains for 3 short epochs on random data as a sanity check

To train on a real dataset:
    1. Prepare an annotation file in PASCAL‑VOC format or convert it
       to YOLO‑style (``S=7, B=2``).
    2. Implement a ``YOLODataset`` that returns: img_tensor, target_tensor where
       ``target_tensor`` has shape ``(S, S, 30)``. (See ``RandomDataset.__getitem__`` below.)
    3. Replace the ``RandomDataset`` with your dataset in the DataLoader.
    4. Adjust hyper‑parameters in ``train()`` as needed.

Notes
-----
• This code is educational – it aims at clarity over raw speed.
• No funky helper libraries: only PyTorch + torchvision (for transforms).
• Tested with PyTorch 2.3.0 and Python 3.10.
"""
from __future__ import annotations

import math
from pathlib import Path
from typing import List, Tuple

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms as T

# ---------------------------
# 1. Building Blocks
# ---------------------------

class CNNBlock(nn.Module):
    """Convolution followed by batch normalisation and a LeakyReLU(0.1).

    Args:
        in_c: Number of input channels of the convolution.
        out_c: Number of output channels (also the BatchNorm feature count).
        **conv_kwargs: Forwarded unchanged to ``nn.Conv2d`` (for example
            ``kernel_size``, ``stride``, ``padding``).
    """

    def __init__(self, in_c: int, out_c: int, **conv_kwargs):
        super().__init__()
        # The convolution carries no bias term; BatchNorm2d has its own
        # learnable shift applied right after it.
        self.conv = nn.Conv2d(
            in_channels=in_c,
            out_channels=out_c,
            bias=False,
            **conv_kwargs,
        )
        self.batchnorm = nn.BatchNorm2d(num_features=out_c)
        self.act = nn.LeakyReLU(negative_slope=0.1, inplace=True)

    def forward(self, x: torch.Tensor) -> torch.Tensor:  # noqa: D401
        """Apply conv → batch norm → activation to ``x`` and return the result."""
        convolved = self.conv(x)
        normalised = self.batchnorm(convolved)
        return self.act(normalised)


# ---------------------------
# 2. YOLOv1 Backbone + Head
# ---------------------------

S = 7   # grid size: predictions are laid out on an S × S grid of cells
B = 2   # bounding boxes / grid cell
C = 20  # number of classes (VOC-20)

# Darknet-style architecture configuration: 24 convolutional layers and
# 4 max-pool layers, consumed in order by ``YOLOv1._create_conv_layers``.
# Tuples : (kernel_size, filters, stride, padding)
# "M"    : MaxPool 2×2 stride 2
# The two stride-2 convolutions and four max-pools downsample by 64 in total,
# so a 448×448 input yields a 7×7 feature map.
ARCH_CONFIG: List[Tuple | str] = [
    (7, 64, 2, 3),
    "M",
    (3, 192, 1, 1),
    "M",
    (1, 128, 1, 0),
    (3, 256, 1, 1),
    (1, 256, 1, 0),
    (3, 512, 1, 1),
    "M",
    *[(1, 256, 1, 0), (3, 512, 1, 1)] * 4,
    # 1×1 reduction layer that precedes the first 1024-filter convolution in
    # the paper's network; it was missing, leaving only 23 conv layers.
    (1, 512, 1, 0),
    (3, 1024, 1, 1),
    "M",
    *[(1, 512, 1, 0), (3, 1024, 1, 1)] * 2,
    (3, 1024, 1, 1),
    (3, 1024, 2, 1),
    (3, 1024, 1, 1),
    (3, 1024, 1, 1),
]


class YOLOv1(nn.Module):
    """YOLOv1 detector: convolutional backbone plus a fully-connected head.

    Args:
        split_size: Number of grid cells per side (stored as ``self.S``).
        num_boxes: Boxes predicted per grid cell (stored as ``self.B``).
        num_classes: Number of classes (stored as ``self.C``).

    The first linear layer is sized for a backbone output of
    ``1024 × S × S`` values per sample.
    """

    def __init__(self, split_size: int = S, num_boxes: int = B, num_classes: int = C):
        super().__init__()
        self.S = split_size
        self.B = num_boxes
        self.C = num_classes

        self.backbone = self._create_conv_layers(ARCH_CONFIG)

        # Head: flatten → FC(4096) → dropout → LeakyReLU → FC(S·S·(C + 5·B)).
        grid_cells = self.S * self.S
        values_per_cell = self.C + self.B * 5
        self.fcs = nn.Sequential(
            nn.Flatten(),
            nn.Linear(1024 * grid_cells, 4096),
            nn.Dropout(0.5),
            nn.LeakyReLU(0.1),
            nn.Linear(4096, grid_cells * values_per_cell),
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:  # noqa: D401
        """Run the backbone and head; return a ``(-1, S, S, C + 5·B)`` tensor."""
        feature_map = self.backbone(x)
        flat_predictions = self.fcs(feature_map)
        values_per_cell = self.C + self.B * 5
        return flat_predictions.view(-1, self.S, self.S, values_per_cell)

    @staticmethod
    def _create_conv_layers(config: List[Tuple | str]) -> nn.Sequential:  # noqa: D401
        """Translate an architecture config into an ``nn.Sequential``.

        Each tuple ``(kernel_size, filters, stride, padding)`` becomes a
        ``CNNBlock``; the string ``"M"`` becomes a 2×2 stride-2 max-pool.
        The first block takes 3 input channels.

        Raises:
            ValueError: If an entry is neither a tuple nor ``"M"``.
        """
        modules: List[nn.Module] = []
        channels = 3
        for layer in config:
            if not isinstance(layer, tuple):
                if layer != "M":
                    raise ValueError(f"Unexpected layer type: {layer}")
                modules.append(nn.MaxPool2d(kernel_size=2, stride=2))
                continue

            kernel, out_channels, step, padding = layer
            block = CNNBlock(
                in_c=channels,
                out_c=out_channels,
                kernel_size=kernel,
                stride=step,
                padding=padding,
            )
            modules.append(block)
            # The next block consumes what this one produces.
            channels = out_channels
        return nn.Sequential(*modules)


# ---------------------------
# 3. Loss function
# ---------------------------

class YOLOLoss(nn.Module):
    """Simplified YOLOv1 sum-squared-error loss (defaults λ_coord = 5, λ_noobj = 0.5).

    Simplifications relative to the loss in the YOLOv1 paper:
        • The first box slot (index 0) of every cell is always treated as the
          "responsible" predictor; no IoU-based selection is performed.
        • Width and height are compared directly (no square root).

    Args:
        S: Grid size per side.
        B: Number of boxes per grid cell.
        C: Number of classes.
        lambda_coord: Weight of the box-coordinate term.
        lambda_noobj: Weight of the no-object confidence term.

    Expected layout of the last dimension of ``preds`` and ``targets``:
    ``B`` consecutive groups of ``(x, y, w, h, conf)`` followed by ``C``
    class scores. A target cell contains an object when the confidence of its
    first box slot (index 4) is 1, and is empty when it is 0.
    """

    def __init__(self, S: int = S, B: int = B, C: int = C, lambda_coord: float = 5.0, lambda_noobj: float = 0.5):
        super().__init__()
        self.mse = nn.MSELoss(reduction="sum")
        self.S, self.B, self.C = S, B, C
        self.lambda_coord = lambda_coord
        self.lambda_noobj = lambda_noobj

    def forward(self, preds: torch.Tensor, targets: torch.Tensor) -> torch.Tensor:  # noqa: D401
        """Return the batch-averaged loss as a scalar tensor.

        Args:
            preds: Predictions of shape ``(N, S, S, B*5 + C)``.
            targets: Ground truth with the same shape as ``preds``.
        """
        N = preds.shape[0]
        box_values = self.B * 5

        # Split the last dimension into per-box fields and class scores.
        pred_boxes = preds[..., :box_values].view(N, self.S, self.S, self.B, 5)
        pred_classes = preds[..., box_values:]
        target_boxes = targets[..., :box_values].view(N, self.S, self.S, self.B, 5)
        target_classes = targets[..., box_values:]

        # Per-cell object indicator, taken from the first target box's confidence.
        cell_has_obj = target_boxes[..., 0, 4]   # (N, S, S)
        obj_mask = cell_has_obj.unsqueeze(-1)    # (N, S, S, 1)

        # Localization (x, y, w, h): only box slot 0, only in object cells.
        # Box 0 is indexed explicitly so the (N, S, S, 1) mask broadcasts over
        # the 4 coordinates; masking the full (N, S, S, B, 4) tensor with it
        # raises a broadcasting error.
        pred_xywh = pred_boxes[..., 0, 0:4]      # (N, S, S, 4)
        target_xywh = target_boxes[..., 0, 0:4]
        coord_loss = self.mse(obj_mask * pred_xywh, obj_mask * target_xywh)

        # Confidence of the responsible box (slot 0) in object cells.
        pred_conf = pred_boxes[..., 4]           # (N, S, S, B)
        target_conf = target_boxes[..., 4]
        conf_obj = self.mse(
            cell_has_obj * pred_conf[..., 0],
            cell_has_obj * target_conf[..., 0],
        )

        # No-object confidence: slot 0 in empty cells, plus every
        # non-responsible slot (1..B-1) in all cells.
        empty_cell = 1 - cell_has_obj
        conf_noobj = self.mse(
            empty_cell * pred_conf[..., 0],
            empty_cell * target_conf[..., 0],
        ) + self.mse(pred_conf[..., 1:], target_conf[..., 1:])

        # Classification loss (only where an object exists).
        class_loss = self.mse(obj_mask * pred_classes, obj_mask * target_classes)

        total = (
            self.lambda_coord * coord_loss
            + conf_obj
            + self.lambda_noobj * conf_noobj
            + class_loss
        ) / N  # normalize by batch
        return total


# ---------------------------
# 4. Dummy Dataset (replace with your own)
# ---------------------------

class RandomDataset(Dataset):
    """Synthetic dataset of random images with one fixed dummy object each.

    Intended only as a sanity check for the model and loss; replace it with a
    real dataset for training.

    Args:
        size: Number of samples reported by ``len()``.
    """

    def __init__(self, size: int = 256):
        self.size = size
        # Not applied to the random tensors produced in ``__getitem__``.
        self.transform = T.Compose([
            T.Resize((448, 448)),
            T.ToTensor(),
        ])

    def __len__(self) -> int:  # noqa: D401
        """Return the number of samples."""
        return self.size

    def __getitem__(self, idx: int):  # noqa: D401
        """Return ``(img, target)`` for sample ``idx``.

        ``img`` is a ``(3, 448, 448)`` tensor of uniform random values and
        ``target`` is an ``(S, S, C + B*5)`` tensor that is zero except for a
        single object in cell ``(3, 4)``. The content does not depend on ``idx``.

        Raises:
            IndexError: If ``idx`` is outside ``[-size, size)``.
        """
        # Plain iteration over an object that only defines ``__getitem__``
        # stops on IndexError; without this check ``for sample in dataset``
        # would never terminate.
        if not -self.size <= idx < self.size:
            raise IndexError(f"index {idx} is out of range for dataset of size {self.size}")

        img = torch.rand(3, 448, 448)  # Random RGB image
        target = torch.zeros(S, S, C + B * 5)
        # Put a dummy object in cell (3,4): (x, y, w, h, conf) in the first box slot.
        target[3, 4, 0:5] = torch.tensor([0.5, 0.5, 0.2, 0.3, 1.0])  # bbox1
        target[3, 4, B * 5 + 7] = 1.0  # one-hot class index 7
        return img, target


# ---------------------------
# 5. Training loop skeleton
# ---------------------------

def train():  # noqa: D401
    """Train a fresh YOLOv1 on ``RandomDataset`` for three epochs.

    Uses CUDA when available (otherwise CPU), Adam with ``lr=1e-4``, and
    batches of 8 drawn from 64 synthetic samples. Prints the mean batch loss
    after every epoch. Returns nothing.
    """
    use_cuda = torch.cuda.is_available()
    device = torch.device("cuda" if use_cuda else "cpu")

    net = YOLOv1().to(device)
    loss_fn = YOLOLoss().to(device)
    adam = optim.Adam(net.parameters(), lr=1e-4)

    dataset = RandomDataset(64)
    loader = DataLoader(dataset, batch_size=8, shuffle=True, num_workers=0)

    net.train()
    for epoch in range(3):  # quick sanity run
        running_loss = 0.0
        for batch_imgs, batch_targets in loader:
            batch_imgs = batch_imgs.to(device)
            batch_targets = batch_targets.to(device)

            # Clear gradients left over from the previous step before the
            # new backward pass accumulates into them.
            adam.zero_grad()
            batch_loss = loss_fn(net(batch_imgs), batch_targets)
            batch_loss.backward()
            adam.step()

            running_loss += batch_loss.item()
        print(f"Epoch {epoch+1} | Loss: {running_loss/len(loader):.4f}")


if __name__ == "__main__":
    # Script entry point: run the short training loop in ``train()`` as an
    # end-to-end sanity check of the forward pass, loss, and optimizer step.
    train()


# Notebook output: ModuleNotFoundError: No module named 'torch'