# Person Detection Training

In this notebook, we use transfer learning to update a pre-trained [Yolo-Fastest](https://github.com/dog-qiuqiu/Yolo-Fastest) model to identify people in an image. We retrain the model using an existing [dataset from Kaggle](https://www.kaggle.com/datasets/adilshamim8/people-detection/).

In Google Colab, select **File > Open notebook** then select the **Upload** tab. Select this file to open it in Colab.

Press **shift + enter** to execute each cell in order. Make sure you stop and read each text section, as there are some manual steps you will need to perform (e.g. upload dataset).

In [None]:
# Install specific versions of the packages
!python3 -m pip install \
    opencv-python=='4.13.0.90' \
    matplotlib=='3.10.0' \
    numpy=='2.0.2' \
    onnxscript=='0.5.7' \
    pandas=='2.2.2' \
    Pillow=='11.3.0' \
    torch=='2.9.0'

In [None]:
# Import standard libraries
import os
from pathlib import Path
import random
import shutil
import sys
import types
import urllib.request
import zipfile

# Import third-party libraries
import cv2
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import numpy as np
import pandas as pd
from PIL import Image
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

In [None]:
# Print out the versions of the libraries
print(f"OpenCV version: {cv2.__version__}")
print(f"Matplotlib version: {plt.matplotlib.__version__}")
print(f"NumPy version: {np.__version__}")
print(f"Pandas version: {pd.__version__}")
print(f"Pillow version: {Image.__version__}")
print(f"PyTorch version: {torch.__version__}")

In [None]:
# General settings
SEED = 42
DATASET_URL = "https://www.kaggle.com/api/v1/datasets/download/adilshamim8/people-detection"
DATASET_ZIP_PATH = Path("/content/dataset.zip")
DATASET_PATH = Path("/content/dataset")
CONFIG_PATH = Path("/content/yolo_config")

# Pre-trained model settings
REPO_URL = "https://github.com/dog-qiuqiu/Yolo-Fastest/archive/refs/tags/v.1.1.0.zip"
REPO_ZIP_PATH = Path("/content/Yolo-Fastest.zip")
REPO_PATH = Path("/content/Yolo-Fastest")
WEIGHTS_PATH = REPO_PATH
MODEL_CFG_PATH = REPO_PATH / "ModelZoo/yolo-fastest-1.1_coco/yolo-fastest-1.1.cfg"
MODEL_WEIGHTS_PATH = REPO_PATH / "ModelZoo/yolo-fastest-1.1_coco/yolo-fastest-1.1.weights"
MODIFIED_CFG_PATH = Path("/content/yolo-fastest-1.1_192_no_dropout.cfg")

# Darknet to PyTorch converter script
CONVERTER_SCRIPT_URL = "https://raw.githubusercontent.com/ShawnHymel/intro-to-edge-ai-and-cv-with-renesas/main/scripts/yolo_fastest_to_pytorch.py"
CONVERTER_SCRIPT_PATH = Path("/content/yolo_fastest_to_pytorch.py")

# Utilities script
UTILS_SCRIPT_URL = "https://raw.githubusercontent.com/ShawnHymel/intro-to-edge-ai-and-cv-with-renesas/main/scripts/yolo_fastest_utils.py"
UTILS_SCRIPT_PATH = Path("/content/yolo_fastest_utils.py")

# Loss calculation script
LOSS_SCRIPT_URL = "https://raw.githubusercontent.com/ShawnHymel/intro-to-edge-ai-and-cv-with-renesas/main/scripts/yolo_fastest_loss.py"
LOSS_SCRIPT_PATH = Path("/content/yolo_fastest_loss.py")

# Image preprocessing settings
IMG_WIDTH = 192
IMG_HEIGHT = 192

# Model settings
NUM_ANCHORS = 3
NUM_CLASSES = 1
NEW_FILTERS = NUM_ANCHORS * (5 + NUM_CLASSES)  # 3 * 6 = 18

# Data loader settings
BATCH_SIZE = 32
NUM_WORKERS = 2  # Number of parallel processes for loading data

# Freeze settings (set to True to freeze)
FREEZE_BACKBONE = False
FREEZE_NECK = False

# Training settings
NUM_EPOCHS = 20
LEARNING_RATE = 0.0001
BEST_MODEL_PATH = Path("/content/best_model.pth")

# ONNX export settings
ONNX_OPSET_VERSION = 18
ONNX_PATH = Path("/content/model.onnx")

# Calibration data settings
NUM_CALIB_SAMPLES = 20
CALIB_NPZ_PATH = Path("/content/calibration_data.npz")

In [None]:
# Set random seeds for reproducibility
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed(SEED)

In [None]:
# Define the target compute device (GPU or CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

## Prepare Dataset

Rather than create our own (which is very time consuming), we will use an existing dataset to retrain the model.

We first download and unzip the dataset. After that, we convert the annotations to YOLO style (normalized bounding box information in a .txt file for each image). Then, we create a few text files in *yolo_config/* that list the classes and give the location of each image file.
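
To make the YOLO-style format concrete, here is a minimal sketch of the conversion for a single box. The image size and box corners are made-up values for illustration, and `corners_to_yolo` is a hypothetical helper name rather than part of the dataset or any library.

```python
def corners_to_yolo(xmin, ymin, xmax, ymax, img_width, img_height):
    """Convert pixel corner coordinates to normalized (cx, cy, w, h)."""
    cx = ((xmin + xmax) / 2) / img_width
    cy = ((ymin + ymax) / 2) / img_height
    w = (xmax - xmin) / img_width
    h = (ymax - ymin) / img_height
    return cx, cy, w, h


# Hypothetical 640x480 image with one box from (160, 120) to (480, 360)
cx, cy, w, h = corners_to_yolo(160, 120, 480, 360, 640, 480)

# One line of a YOLO label file: class_id cx cy w h
yolo_line = f"0 {cx:.6f} {cy:.6f} {w:.6f} {h:.6f}"
print(yolo_line)  # 0 0.500000 0.500000 0.500000 0.500000
```

Because the values are normalized, the same label remains valid after the image is resized to the model's input size.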

In [None]:
# Download dataset
!curl -L -o {DATASET_ZIP_PATH} {DATASET_URL}

# Extract the zip file
with zipfile.ZipFile(DATASET_ZIP_PATH, 'r') as zip_ref:
    zip_ref.extractall(DATASET_PATH)

In [None]:
def convert_csv_to_yolo(csv_path, class_mapping={'person': 0}):
    """
    Convert CSV annotations to YOLO format.
    Writes .txt files alongside original images (in the same folder).
    Creates empty .txt files for images not in the CSV (negative samples).

    CSV format: filename, width, height, class, xmin, ymin, xmax, ymax
    YOLO format: class_id cx cy w h (normalized 0-1)

    Returns list of image paths for train.txt/val.txt generation.
    """
    df = pd.read_csv(csv_path)
    source_dir = csv_path.parent

    image_paths = []
    annotated_stems = set()  # Track which images we've processed

    # Group by filename since one image can have multiple annotations
    grouped = df.groupby('filename')

    for filename, group in grouped:
        # Get image dimensions (same for all rows of this image)
        img_width = group.iloc[0]['width']
        img_height = group.iloc[0]['height']

        # Convert each bounding box to YOLO format
        yolo_annotations = []
        for _, row in group.iterrows():
            class_id = class_mapping[row['class']]

            # Convert xmin,ymin,xmax,ymax to cx,cy,w,h (normalized)
            xmin, ymin, xmax, ymax = row['xmin'], row['ymin'], row['xmax'], row['ymax']

            cx = ((xmin + xmax) / 2) / img_width
            cy = ((ymin + ymax) / 2) / img_height
            w = (xmax - xmin) / img_width
            h = (ymax - ymin) / img_height

            # Clamp values to [0, 1] just in case
            cx = max(0, min(1, cx))
            cy = max(0, min(1, cy))
            w = max(0, min(1, w))
            h = max(0, min(1, h))

            yolo_annotations.append(f"{class_id} {cx:.6f} {cy:.6f} {w:.6f} {h:.6f}")

        # Write YOLO annotation file alongside the image
        txt_filename = Path(filename).stem + '.txt'
        txt_path = source_dir / txt_filename
        with open(txt_path, 'w') as f:
            f.write('\n'.join(yolo_annotations))

        # Store full image path and track stem
        image_paths.append(str(source_dir / filename))
        annotated_stems.add(Path(filename).stem)

    # Create empty .txt files for images not in CSV (negative samples)
    negative_count = 0
    for img_file in source_dir.iterdir():
        if img_file.suffix.lower() in ['.jpg', '.jpeg', '.png', '.bmp']:
            if img_file.stem not in annotated_stems:
                txt_path = img_file.with_suffix('.txt')
                txt_path.touch()  # Create empty file
                image_paths.append(str(img_file))
                negative_count += 1

    print(f"Positive samples: {len(annotated_stems)}, Negative samples: {negative_count}")

    return image_paths

In [None]:
# Define the dataset splits
splits = {
    'train': DATASET_PATH / 'train' / 'train' / '_annotations.csv',
    'val': DATASET_PATH / 'valid' / 'valid' / '_annotations.csv',
    'test': DATASET_PATH / 'test' / 'test' / '_annotations.csv',
}

# Read the CSV file for each split and add YOLO-style .txt files for each image
all_paths = {}
for split_name, csv_path in splits.items():
    print(f"Converting {split_name}...")
    paths = convert_csv_to_yolo(csv_path)
    all_paths[split_name] = paths
    print(f"{len(paths)} images with annotations created")

print(f"\nTotal: {sum(len(p) for p in all_paths.values())} images")

In [None]:
# Create directory for YOLO config files
CONFIG_PATH.mkdir(parents=True, exist_ok=True)

# Write train.txt, val.txt, test.txt
for split_name, paths in all_paths.items():
    txt_file = CONFIG_PATH / f"{split_name}.txt"
    with open(txt_file, 'w') as f:
        f.write('\n'.join(paths))
    print(f"Created: {txt_file} ({len(paths)} images)")

# Write classes.names file
names_file = CONFIG_PATH / "classes.names"
with open(names_file, 'w') as f:
    f.write("person\n")
print(f"Created: {names_file}")

## Prepare Model

The original Yolo-Fastest was built using the [Darknet](https://github.com/pjreddie/darknet) framework. We need to download the Yolo-Fastest repo, convert the model to PyTorch, and load the preexisting weights.
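
A Darknet model is described by a plain-text *.cfg* file made of `[section]` headers followed by `key=value` lines. The sketch below shows one way such a file could be parsed into a list of blocks. It is an illustration only, not the parser used by the converter script, and the config fragment is made up rather than copied from the Yolo-Fastest repo.

```python
def parse_darknet_cfg(text):
    """Parse Darknet .cfg text into a list of dicts, one per [section]."""
    blocks = []
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith('#'):
            continue  # Skip blank lines and comments
        if line.startswith('[') and line.endswith(']'):
            # A new section starts a new block
            blocks.append({'type': line[1:-1].strip()})
        elif '=' in line and blocks:
            # key=value pairs belong to the most recent section
            key, value = line.split('=', 1)
            blocks[-1][key.strip()] = value.strip()
    return blocks


# Made-up config fragment for illustration (not the real Yolo-Fastest file)
example_cfg = """
[net]
width=320
height=320

[convolutional]
filters=8
size=3
stride=2
activation=leaky

[yolo]
classes=80
"""

blocks = parse_darknet_cfg(example_cfg)
print([b['type'] for b in blocks])  # ['net', 'convolutional', 'yolo']
print(blocks[0]['width'])           # 320 (values stay as strings)
```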

In [None]:
# Download the model repo (we need the model architecture and weights)
!curl -L -o {REPO_ZIP_PATH} {REPO_URL}

# Extract and rename to consistent path
with zipfile.ZipFile(REPO_ZIP_PATH, 'r') as zip_ref:
    # Get top-level folder name from zip contents
    top_folder = zip_ref.namelist()[0].split('/')[0]
    zip_ref.extractall(REPO_PATH.parent)

# Rename extracted folder to desired name
extracted_path = REPO_PATH.parent / top_folder
extracted_path.rename(REPO_PATH)

In [None]:
# Read the original config
with open(MODEL_CFG_PATH, 'r') as f:
    content = f.read()

# Change input size strings in config
new_content = content.replace("width=320", f"width={IMG_WIDTH}")
new_content = new_content.replace("height=320", f"height={IMG_HEIGHT}")

# Save modified config
with open(MODIFIED_CFG_PATH, 'w') as f:
    f.write(new_content)

# Verify changes
with open(MODIFIED_CFG_PATH, 'r') as f:
    first_lines = ''.join(f.readlines()[:15])
print(f"First lines of modified config:\n{first_lines}")

In [None]:
# Download converter script
urllib.request.urlretrieve(CONVERTER_SCRIPT_URL, CONVERTER_SCRIPT_PATH)
print(f"Downloaded converter script to {CONVERTER_SCRIPT_PATH}")

# Import classes and functions from our converter script
from yolo_fastest_to_pytorch import YoloFastest, ConvNoBN


In [None]:
# Build model
model = YoloFastest(str(MODIFIED_CFG_PATH), input_size=IMG_HEIGHT)
print(f"Model built! Parameters: {sum(p.numel() for p in model.parameters()):,}")

# Load weights
model.load_darknet_weights(str(MODEL_WEIGHTS_PATH))

# Test forward pass
model.eval()
with torch.no_grad():
    dummy = torch.randn(1, 3, IMG_HEIGHT, IMG_WIDTH)
    outputs = model(dummy)
    print(f"Output shapes: {[o.shape for o in outputs]}")

## Replace Detection Heads

We need to replace the detection heads (final layers that predict one of 80 COCO classes by default) with custom layers that predict our 1 class ("person").

YOLO-Fastest has 2 detection heads that output predictions on two different grids (a coarse 6×6 grid and a finer 12×12 grid). Each grid cell has 3 anchors, which are predefined bounding boxes with different sizes and aspect ratios that serve as starting shapes for the predicted boxes. The output of each detection head is a tensor with shape `[batch, channels, grid_height, grid_width]` where:

* `batch`: number of images. For inference, this will be 1.
* `channels`: 18 values representing 3 anchors with 6 predictions per anchor:
  * `tx, ty`: box center offset (relative to grid cell)
  * `tw, th`: box width/height scaling (relative to anchor size)
  * `obj`: objectness score (confidence that an object exists)
  * `cls`: class probability ("person")
* `grid_height`: number of rows of cells
* `grid_width`: number of columns of cells
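
The following sketch shows how the six raw values for one anchor in one grid cell could be turned into a normalized box and a confidence score. It assumes the common YOLOv3-style decoding (sigmoid on the center offsets, exponential scaling of the anchor size); the `decode_predictions` helper used later in this notebook may differ in its details. The anchor size and the function names here are hypothetical.

```python
import math


def sigmoid(x):
    """Squash a raw value into the range (0, 1)."""
    return 1.0 / (1.0 + math.exp(-x))


def decode_cell(raw, col, row, grid_size, anchor_wh, img_size):
    """Decode one anchor's raw values (tx, ty, tw, th, obj, cls) for a cell."""
    tx, ty, tw, th, obj, cls = raw
    anchor_w, anchor_h = anchor_wh
    cx = (sigmoid(tx) + col) / grid_size      # Normalized box center x
    cy = (sigmoid(ty) + row) / grid_size      # Normalized box center y
    w = anchor_w * math.exp(tw) / img_size    # Normalized box width
    h = anchor_h * math.exp(th) / img_size    # Normalized box height
    score = sigmoid(obj) * sigmoid(cls)       # Objectness times class probability
    return cx, cy, w, h, score


# All-zero raw outputs in cell (col=2, row=3) of a 6x6 grid, with a
# hypothetical 48x96 pixel anchor on a 192x192 input
cx, cy, w, h, score = decode_cell(
    raw=(0.0, 0.0, 0.0, 0.0, 0.0, 0.0),
    col=2, row=3, grid_size=6, anchor_wh=(48, 96), img_size=192,
)
print(cx, cy, w, h, score)
```

With all-zero raw values, the box sits at the center of its cell, keeps the anchor's size, and has a score of 0.25 (0.5 objectness times 0.5 class probability).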

The original COCO detection heads output 255 channels (3 anchors × 85 values, where 85 = 4 box + 1 obj + 80 classes). We replace these with heads that output 18 channels (3 anchors × 6 values, where 6 = 4 box + 1 obj + 1 class).

We should expect the output of the coarse detection head to have the shape `[1, 18, 6, 6]` and the output of the finer detection head to have the shape `[1, 18, 12, 12]`.
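
The arithmetic behind these shapes can be sketched in a few lines. The strides of 32 and 16 are inferred from 192 / 6 and 192 / 12 rather than read from the model, and `head_output_shape` is a hypothetical helper.

```python
def head_output_shape(img_size, stride, num_anchors, num_classes, batch=1):
    """Return [batch, channels, grid_h, grid_w] for one detection head."""
    channels = num_anchors * (4 + 1 + num_classes)  # box + obj + classes
    grid = img_size // stride
    return [batch, channels, grid, grid]


# Our 1-class heads on a 192x192 input
print(head_output_shape(192, 32, 3, 1))   # [1, 18, 6, 6]
print(head_output_shape(192, 16, 3, 1))   # [1, 18, 12, 12]

# The original 80-class COCO head at the same input size, for comparison
print(head_output_shape(192, 32, 3, 80))  # [1, 255, 6, 6]
```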

In [None]:
def find_detection_heads(model):
    """Find indices of conv layers immediately before yolo layers."""
    # Skip the [net] block, which holds settings and is not counted as a layer
    layers = [b for b in model.blocks if b['type'] != 'net']

    head_indices = []
    for layer_idx, block in enumerate(layers):
        if block['type'] != 'yolo' or layer_idx == 0:
            continue

        # The layer directly before a yolo layer is its detection head
        if layers[layer_idx - 1]['type'] == 'convolutional':
            head_indices.append(layer_idx - 1)

    return head_indices

In [None]:
# Find detection head layers programmatically
head_indices = find_detection_heads(model)
print(f"Detection head layer indices: {head_indices}")

In [None]:
# Replace detection heads with our 1-class detection heads
for idx in head_indices:
    old_conv = model.module_list[idx].conv
    model.module_list[idx] = ConvNoBN(
        in_ch=old_conv.in_channels,
        out_ch=NEW_FILTERS,
        kernel=old_conv.kernel_size[0],
        stride=old_conv.stride[0],
        pad=old_conv.padding[0],
        activation='linear'
    )

# Verify new output shapes
model.eval()
with torch.no_grad():
    dummy = torch.randn(1, 3, IMG_HEIGHT, IMG_WIDTH)
    outputs = model(dummy)
    print(f"New output shapes: {[o.shape for o in outputs]}")

In [None]:
# Move model to device
model = model.to(device)

## Freeze Parts of the Model

You have the option to *freeze* sections of the model. Freezing preserves the pretrained feature extraction capabilities, preventing the model from "forgetting" what it learned on COCO. This is useful when your dataset is small (risk of overfitting) or very similar to the original training data (features already work well). Freezing also speeds up training since fewer gradients need to be computed. You can freeze either or both of the following:

* **Backbone:** Extracts basic visual features from the image (e.g., edges, textures, shapes, object parts). YOLO-Fastest uses depthwise separable convolutions and residual blocks for efficient feature extraction. These low-level features are generally universal across datasets.
* **Neck:** Fuses features from different scales to help detect both large and small objects. This includes:
  * *Spatial Pyramid Pooling (SPP)*: Captures multi-scale context using parallel pooling operations at different kernel sizes
  * *FPN-like layers*: Upsamples and concatenates features from earlier backbone layers, combining semantic (what) and spatial (where) information

Note that the **detection heads** are single convolutional layers that output predictions for each grid cell. Each prediction contains box coordinates, an objectness score, and a class probability, all combined into 18 channels (3 anchors × 6 values). We replaced these heads for our single class, and they will always remain trainable.

A good starting point is to freeze both backbone and neck, training only the detection heads. If performance is insufficient, try unfreezing the neck to allow the feature fusion layers to adapt to your data.
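
In PyTorch, freezing comes down to setting `requires_grad = False` on the parameters you want to keep fixed. The sketch below shows the idea on a toy three-layer model; it is a stand-in for illustration and does not reflect the real YOLO-Fastest layer layout.

```python
import torch.nn as nn

# Toy stand-in model: two "backbone" convs and one "head" conv (hypothetical)
toy_model = nn.Sequential(
    nn.Conv2d(3, 8, kernel_size=3, padding=1),
    nn.Conv2d(8, 8, kernel_size=3, padding=1),
    nn.Conv2d(8, 18, kernel_size=1),
)

# Freeze the first two layers so only the head receives gradient updates
for layer in list(toy_model)[:2]:
    for param in layer.parameters():
        param.requires_grad = False

# Count parameters to confirm what is still trainable
total = sum(p.numel() for p in toy_model.parameters())
trainable = sum(p.numel() for p in toy_model.parameters() if p.requires_grad)
print(f"{trainable:,} trainable / {total:,} total")  # 162 trainable / 970 total
```

Only the final 1×1 convolution (8 × 18 weights plus 18 biases) remains trainable in this toy case.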

In [None]:
def freeze_model_layers(model, freeze_backbone=True, freeze_neck=True):
    """
    Freeze portions of the YOLO-Fastest model. Note that the layers are hardcoded.

    Architecture breakdown:
    - Backbone: layers 0-108 (feature extraction)
    - Neck/SPP: layers 109-119, 122-128 (feature fusion)
    - Detection heads: layers 120, 129 (class-specific, always trainable)
    """
    # Find detection head indices (layers with output matching our NEW_FILTERS)
    head_indices = set()
    for i, module in enumerate(model.module_list):
        if hasattr(module, 'conv') and module.conv.out_channels == NEW_FILTERS:
            head_indices.add(i)

    # Define layer ranges
    backbone_range = range(0, 109)  # Layers 0-108
    neck_range = list(range(109, 120)) + list(range(122, 129))  # Exclude heads

    # Freeze backbone
    if freeze_backbone:
        for i in backbone_range:
            for param in model.module_list[i].parameters():
                param.requires_grad = False

    # Freeze neck
    if freeze_neck:
        for i in neck_range:
            for param in model.module_list[i].parameters():
                param.requires_grad = False

    # Ensure detection heads are always trainable
    for i in head_indices:
        for param in model.module_list[i].parameters():
            param.requires_grad = True

    return head_indices

In [None]:
# Apply freezing
head_indices = freeze_model_layers(model, FREEZE_BACKBONE, FREEZE_NECK)
print(f"Backbone frozen: {FREEZE_BACKBONE}")
print(f"Neck frozen: {FREEZE_NECK}")

# Count trainable parameters
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Parameters: {trainable_params:,} trainable / {total_params:,} total")

## Create DataLoaders

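We wrap each split in a PyTorch `Dataset` that loads an image together with its YOLO-style labels, then create a `DataLoader` for each split to batch the samples for training, validation, and testing.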
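
Images in a batch can contain different numbers of boxes, so their labels cannot simply be stacked into one rectangular tensor. The collate function in this section handles that by tagging every label row with the index of its image in the batch and concatenating the rows. Here is the same idea sketched with plain Python lists; the label values are made up and `collate_labels` is a hypothetical name.

```python
def collate_labels(per_image_labels):
    """Flatten per-image label lists into rows prefixed with a batch index."""
    rows = []
    for batch_idx, labels in enumerate(per_image_labels):
        for class_id, cx, cy, w, h in labels:
            rows.append([batch_idx, class_id, cx, cy, w, h])
    return rows


# Three hypothetical images: two boxes, no boxes (negative sample), one box
batch = [
    [[0, 0.5, 0.5, 0.2, 0.4], [0, 0.3, 0.6, 0.1, 0.3]],
    [],
    [[0, 0.7, 0.4, 0.2, 0.5]],
]
rows = collate_labels(batch)
print(len(rows))             # 3
print([r[0] for r in rows])  # [0, 0, 2]
```

The negative sample contributes no rows, and the batch index lets later code recover which boxes belong to which image.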

In [None]:
class YoloDataset(Dataset):
    """YOLO-format dataset loader using image list file."""

    def __init__(self, list_file, img_size=192, augment=False):
        self.img_size = img_size
        self.augment = augment

        # Read image paths from the list file
        with open(list_file, 'r') as f:
            self.img_files = [Path(line.strip()) for line in f if line.strip()]

        print(f"Loaded {len(self.img_files)} images from {list_file}")

    def __len__(self):
        return len(self.img_files)

    def __getitem__(self, idx):
        # Load image
        img_path = self.img_files[idx]
        img = cv2.imread(str(img_path))
        if img is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        orig_h, orig_w = img.shape[:2]

        # Load labels
        label_path = img_path.with_suffix('.txt')
        if label_path.exists() and label_path.stat().st_size > 0:
            labels = np.loadtxt(str(label_path)).reshape(-1, 5).copy()
        else:
            labels = np.zeros((0, 5))

        # Apply augmentation before resize
        if self.augment and len(labels) > 0:
            img, labels = self.apply_augmentation(img, labels)

        # Resize
        img = cv2.resize(img, (self.img_size, self.img_size))

        # Normalize to [0, 1] and convert to CHW
        img = img.astype(np.float32) / 255.0
        img = torch.from_numpy(img).permute(2, 0, 1)

        if len(labels) > 0:
            labels = torch.from_numpy(labels).float()
        else:
            labels = torch.zeros((0, 5))

        return img, labels

    def apply_augmentation(self, img, labels):
        """Apply random augmentations."""
        # Random horizontal flip (50% chance)
        if random.random() > 0.5:
            img = cv2.flip(img, 1)
            labels[:, 1] = 1.0 - labels[:, 1]  # Flip x_center

        # Random brightness adjustment
        if random.random() > 0.5:
            factor = 0.7 + random.random() * 0.6  # 0.7 to 1.3
            img = np.clip(img * factor, 0, 255).astype(np.uint8)

        # Random contrast adjustment
        if random.random() > 0.5:
            factor = 0.7 + random.random() * 0.6  # 0.7 to 1.3
            mean = img.mean()
            img = np.clip((img - mean) * factor + mean, 0, 255).astype(np.uint8)

        # Random saturation adjustment
        if random.random() > 0.5:
            img_hsv = cv2.cvtColor(img, cv2.COLOR_RGB2HSV).astype(np.float32)
            factor = 0.7 + random.random() * 0.6  # 0.7 to 1.3
            img_hsv[:, :, 1] = np.clip(img_hsv[:, :, 1] * factor, 0, 255)
            img = cv2.cvtColor(img_hsv.astype(np.uint8), cv2.COLOR_HSV2RGB)

        return img, labels

In [None]:
def collate_fn(batch):
    """Collate function for variable-length labels."""
    imgs, labels = zip(*batch)

    # Stack images
    imgs = torch.stack(imgs, dim=0)

    # Add batch index to labels and concatenate
    batch_labels = []
    for i, label in enumerate(labels):
        if len(label) > 0:
            # Add batch index as first column
            batch_idx = torch.full((len(label), 1), i, dtype=label.dtype)
            batch_labels.append(torch.cat([batch_idx, label], dim=1))

    if batch_labels:
        labels = torch.cat(batch_labels, dim=0)  # [N, 6]: batch_idx, class, x, y, w, h
    else:
        labels = torch.zeros((0, 6))

    return imgs, labels

In [None]:
# Create datasets
train_dataset = YoloDataset(CONFIG_PATH / 'train.txt', img_size=IMG_HEIGHT, augment=True)
val_dataset = YoloDataset(CONFIG_PATH / 'val.txt', img_size=IMG_HEIGHT, augment=False)
test_dataset = YoloDataset(CONFIG_PATH / 'test.txt', img_size=IMG_HEIGHT, augment=False)

In [None]:
# Create dataloaders
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS,
    pin_memory=device.type == 'cuda',
    drop_last=True,
    collate_fn=collate_fn
)
val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    pin_memory=device.type == 'cuda',
    collate_fn=collate_fn
)
test_loader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    pin_memory=True if device.type == 'cuda' else False,
    collate_fn=collate_fn
)

print(f"Train: {len(train_loader)} batches")
print(f"Val: {len(val_loader)} batches")
print(f"Test: {len(test_loader)} batches")

In [None]:
# Get one batch and verify details
imgs, labels = next(iter(train_loader))
print(f"Images: {imgs.shape}")
print(f"Labels: {labels.shape}")

In [None]:
# Pick an index in the batch
idx = 3

# Get an image and its labels
img = imgs[idx].permute(1, 2, 0).numpy()  # CHW -> HWC, range [0, 1]
img_labels = labels[labels[:, 0] == idx]

# Create plots
fig, ax = plt.subplots(1, figsize=(8, 8))

# Option 1: Display float image directly (matplotlib handles [0, 1] range)
ax.imshow(img)

# Option 2: Convert to uint8 properly
# ax.imshow((img * 255).astype('uint8'))

# Draw bounding boxes
for label in img_labels:
    # Calculate box coordinates
    _, cls, cx, cy, w, h = label.tolist()
    x = (cx - w/2) * IMG_WIDTH
    y = (cy - h/2) * IMG_HEIGHT
    width = w * IMG_WIDTH
    height = h * IMG_HEIGHT

    # Add green rectangle for bounding box
    rect = patches.Rectangle(
        (x, y),
        width,
        height,
        linewidth=2,
        edgecolor='lime',
        facecolor='none')
    ax.add_patch(rect)

# Show image info
ax.set_title(f"Sample image with {len(img_labels)} bounding boxes")
ax.axis('off')
plt.show()

## Define Loss Function

We will use some helper functions from the custom [yolo_fastest_utils.py](https://github.com/ShawnHymel/intro-to-edge-ai-and-cv-with-renesas/blob/main/scripts/yolo_fastest_utils.py) script along with the custom loss function defined in [yolo_fastest_loss.py](https://github.com/ShawnHymel/intro-to-edge-ai-and-cv-with-renesas/blob/main/scripts/yolo_fastest_loss.py).

For each ground truth box, we find which grid cell contains its center and which anchor best matches its shape (based on IoU of width/height). We then compute three losses: complete IoU (CIoU) loss for box coordinates (penalizes poor overlap, center distance, and aspect ratio mismatch), binary cross-entropy (BCE) loss for objectness (does this anchor contain an object?), and BCE loss for classification (is it a person?). The losses are weighted and summed across both output scales (6×6 and 12×12 grids).
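
The target assignment step described above can be sketched in plain Python. This is an illustration of the idea only, not the code in *yolo_fastest_loss.py* or `build_targets`; the anchor sizes are made up and the helper names are hypothetical.

```python
def wh_iou(wh_a, wh_b):
    """IoU of two boxes that share the same center (width/height only)."""
    inter = min(wh_a[0], wh_b[0]) * min(wh_a[1], wh_b[1])
    union = wh_a[0] * wh_a[1] + wh_b[0] * wh_b[1] - inter
    return inter / union


def assign_target(cx, cy, box_wh, anchors, grid_size):
    """Return (col, row, best_anchor_index) for one ground truth box."""
    # Grid cell that contains the normalized box center
    col = min(int(cx * grid_size), grid_size - 1)
    row = min(int(cy * grid_size), grid_size - 1)

    # Anchor whose shape overlaps the box shape the most
    ious = [wh_iou(box_wh, anchor) for anchor in anchors]
    best = max(range(len(anchors)), key=lambda i: ious[i])
    return col, row, best


# Hypothetical anchors in pixels (not the values from the model config)
anchors = [(20, 40), (50, 100), (120, 160)]

# A 60x110 pixel box centered at (0.40, 0.55) on a 6x6 grid
col, row, best = assign_target(0.40, 0.55, (60, 110), anchors, grid_size=6)
print(col, row, best)  # 2 3 1
```

The 50×100 anchor wins because its width/height IoU with the box (about 0.76) is higher than that of the smaller or larger anchor.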

In [None]:
# Download utilities script
urllib.request.urlretrieve(UTILS_SCRIPT_URL, UTILS_SCRIPT_PATH)
print(f"Downloaded utils script to {UTILS_SCRIPT_PATH}")

# Import utilities
from yolo_fastest_utils import (
    get_anchors_from_config,
    box_iou,
    bbox_iou_tensor,
    box_iou_wh,
    build_targets,
    decode_predictions,
    nms,
    compute_map
)

In [None]:
# Download loss calculation class
urllib.request.urlretrieve(LOSS_SCRIPT_URL, LOSS_SCRIPT_PATH)
print(f"Downloaded loss script to {LOSS_SCRIPT_PATH}")

# Import loss function
from yolo_fastest_loss import YoloLoss

In [None]:
# Get anchors from model config
anchors, anchor_masks = get_anchors_from_config(model)
print(f"Anchors: {anchors}")
print(f"Masks: {anchor_masks}")

# Create loss function
criterion = YoloLoss(
    anchors=anchors,
    anchor_masks=anchor_masks,
    num_classes=NUM_CLASSES,
    img_size=IMG_HEIGHT
)
print(f"Grid sizes: {criterion.grid_sizes}")

In [None]:
# Get a set of images and labels from the training dataset
model.train()
imgs, labels = next(iter(train_loader))
imgs = imgs.to(device)
labels = labels.to(device)

# Calculate losses using our custom loss function
outputs = model(imgs)
loss, box_loss, obj_loss, cls_loss = criterion(outputs, labels)

# Print losses
print(f"Box:   {box_loss.item():.4f}")
print(f"Obj:   {obj_loss.item():.4f}")
print(f"Cls:   {cls_loss.item():.4f}")
print(f"Total: {loss.item():.4f}")

## Model Training

We are now ready to train our model!

In [None]:
# Optimizer
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

In [None]:
def train_one_epoch(
    model,
    train_loader,
    optimizer,
    criterion,
    device,
    verbose=False
):
    """Train for one epoch."""
    total_loss = 0
    total_box_loss = 0
    total_obj_loss = 0
    total_cls_loss = 0
    num_batches = len(train_loader)

    # Enable training-specific behaviors (e.g. dropout)
    model.train()

    # Do one full training cycle on a batch of training data
    for i, (imgs, labels) in enumerate(train_loader):
        # Move data to the same device as the model
        imgs = imgs.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(imgs)
        loss, box_loss, obj_loss, cls_loss = criterion(outputs, labels)

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Accumulate losses
        total_loss += loss.item()
        total_box_loss += box_loss.item()
        total_obj_loss += obj_loss.item()
        total_cls_loss += cls_loss.item()

        # Optionally print progress every 50 batches
        if verbose and (i+1) % 50 == 0:
            print(f"    Batch {i+1}/{num_batches} - loss: {loss.item():.2f}")

    # Calculate averages
    return {
        'loss': total_loss / num_batches,
        'box': total_box_loss / num_batches,
        'obj': total_obj_loss / num_batches,
        'cls': total_cls_loss / num_batches
    }

In [None]:
def validate(model, val_loader, criterion, device):
    """Validate the model."""
    total_loss = 0
    total_box_loss = 0
    total_obj_loss = 0
    total_cls_loss = 0
    num_batches = len(val_loader)

    # Disable training-specific behaviors (e.g. dropout)
    model.eval()

    # Do not track gradients during validation
    with torch.no_grad():
        for imgs, labels in val_loader:
            # Move data to the same device as the model
            imgs = imgs.to(device)
            labels = labels.to(device)

            # Forward pass
            outputs = model(imgs)
            loss, box_loss, obj_loss, cls_loss = criterion(outputs, labels)

            # Get total loss for the batch
            total_loss += loss.item()
            total_box_loss += box_loss.item()
            total_obj_loss += obj_loss.item()
            total_cls_loss += cls_loss.item()

    # Calculate averages
    return {
        'loss': total_loss / num_batches,
        'box': total_box_loss / num_batches,
        'obj': total_obj_loss / num_batches,
        'cls': total_cls_loss / num_batches
    }

In [None]:
# Training loop
history = {
    'train_loss': [], 'train_box': [], 'train_obj': [], 'train_cls': [],
    'val_loss': [], 'val_box': [], 'val_obj': [], 'val_cls': []
}

# Train for a number of epochs, save best model (lowest val loss)
print(f"Starting training for {NUM_EPOCHS} epochs...")
best_val_loss = float('inf')
for epoch in range(NUM_EPOCHS):
    print(f"\nEpoch {epoch+1}/{NUM_EPOCHS}")

    # Train
    train_metrics = train_one_epoch(
        model,
        train_loader,
        optimizer,
        criterion,
        device,
        verbose=True
    )

    # Validate
    val_metrics = validate(model, val_loader, criterion, device)

    # Record history
    history['train_loss'].append(train_metrics['loss'])
    history['train_box'].append(train_metrics['box'])
    history['train_obj'].append(train_metrics['obj'])
    history['train_cls'].append(train_metrics['cls'])
    history['val_loss'].append(val_metrics['loss'])
    history['val_box'].append(val_metrics['box'])
    history['val_obj'].append(val_metrics['obj'])
    history['val_cls'].append(val_metrics['cls'])

    # Save best model
    if val_metrics['loss'] < best_val_loss:
        best_val_loss = val_metrics['loss']
        torch.save(model.state_dict(), BEST_MODEL_PATH)
        print(f"Epoch {epoch+1}/{NUM_EPOCHS} - "
              f"Train: {train_metrics['loss']:.4f} - "
              f"Val: {val_metrics['loss']:.4f} - "
              f"Saved best model")
    else:
        print(f"Epoch {epoch+1}/{NUM_EPOCHS} - "
              f"Train: {train_metrics['loss']:.4f} - "
              f"Val: {val_metrics['loss']:.4f}")

print("\n" + "="*60)
print("Training complete!")
print(f"Best validation loss: {best_val_loss:.4f}")

In [None]:
# Plot training and validation curves
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# Total loss
axes[0].plot(history['train_loss'], label='Train')
axes[0].plot(history['val_loss'], label='Validation')
axes[0].set_xlabel('Epoch')
axes[0].set_ylabel('Loss')
axes[0].set_title('Total Loss')
axes[0].legend()
axes[0].grid(True)

# Component losses
axes[1].plot(history['train_box'], label='Train Box')
axes[1].plot(history['val_box'], label='Val Box')
axes[1].plot(history['train_obj'], label='Train Obj')
axes[1].plot(history['val_obj'], label='Val Obj')
axes[1].plot(history['train_cls'], label='Train Cls')
axes[1].plot(history['val_cls'], label='Val Cls')
axes[1].set_xlabel('Epoch')
axes[1].set_ylabel('Loss')
axes[1].set_title('Loss Components')
axes[1].legend()
axes[1].grid(True)

plt.tight_layout()
plt.show()

## Evaluate Model

We will compute the mean average precision at 50% IoU (mAP@0.5) using our test set. To do that, we use the helper functions for non-maximum suppression (NMS) and intersection over union (IoU) that we imported earlier from *yolo_fastest_utils.py*.

We'll also see how the model performs on a single image by drawing the ground-truth bounding boxes and comparing them to the predicted bounding boxes.
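
To make IoU and NMS concrete, here is a minimal pure-Python sketch of both. It is illustrative only and is not the implementation in *yolo_fastest_utils.py*; the box format `(x1, y1, x2, y2, score)`, the function names, and the sample detections are assumptions made for this example.

```python
def box_iou_xyxy(a, b):
    """IoU of two boxes given as (x1, y1, x2, y2)."""
    inter_w = max(0.0, min(a[2], b[2]) - max(a[0], b[0]))
    inter_h = max(0.0, min(a[3], b[3]) - max(a[1], b[1]))
    inter = inter_w * inter_h
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    union = area_a + area_b - inter
    return inter / union if union > 0 else 0.0


def greedy_nms(detections, iou_thresh=0.45):
    """Keep the highest-scoring boxes, dropping overlapping lower-scoring ones.

    Each detection is (x1, y1, x2, y2, score).
    """
    remaining = sorted(detections, key=lambda d: d[4], reverse=True)
    kept = []
    while remaining:
        best = remaining.pop(0)
        kept.append(best)
        # Discard boxes that overlap the kept box too much
        remaining = [
            d for d in remaining
            if box_iou_xyxy(best[:4], d[:4]) <= iou_thresh
        ]
    return kept


# Hypothetical detections: two overlapping boxes and one separate box
detections = [
    (10, 10, 110, 110, 0.90),
    (20, 20, 120, 120, 0.60),
    (130, 30, 180, 150, 0.75),
]
kept = greedy_nms(detections)
print([d[4] for d in kept])  # [0.9, 0.75]
```

For mAP@0.5, a predicted box counts as a correct detection when its IoU with a ground-truth box is at least 0.5.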

In [None]:
# Load the best model weights and switch to evaluation mode
model.load_state_dict(torch.load(BEST_MODEL_PATH, map_location=device))
model.eval()
print(f"Loaded model from {BEST_MODEL_PATH}")

In [None]:
# Compute mAP on test set
test_map, total_preds, total_gt = compute_map(
    model=model,
    data_loader=test_loader,
    anchors=anchors,
    anchor_masks=anchor_masks,
    img_size=IMG_HEIGHT,
    device=device,
    conf_thresh=0.25,
    iou_thresh=0.5
)

print(f"Test mAP@0.5: {test_map:.4f}")
print(f"Total predictions: {total_preds}")
print(f"Total ground truth: {total_gt}")

In [None]:
# Choose an index into the test dataset
idx = 42

# Get a single test image directly from dataset
img, label = test_dataset[idx]

# Prepare image for display (already RGB, values in [0, 1])
img_np = img.permute(1, 2, 0).numpy()

# Run inference
model.eval()
with torch.no_grad():
    img_input = img.unsqueeze(0).float().to(device)  # Add batch dimension
    outputs = model(img_input)

# Decode predictions and apply NMS
predictions = decode_predictions(
    outputs,
    anchors=anchors,
    anchor_masks=anchor_masks,
    img_size=IMG_HEIGHT,
    conf_thresh=0.25
)
pred_boxes = nms(predictions[0], iou_thresh=0.45)

# Plot
fig, ax = plt.subplots(1, figsize=(10, 10))
ax.imshow(img_np)  # matplotlib handles [0, 1] float range

# Draw ground truth boxes (green)
# Note: label shape is [N, 5] with (class, cx, cy, w, h) - no batch_idx
for lbl in label:
    cx = lbl[1].item() * IMG_WIDTH
    cy = lbl[2].item() * IMG_HEIGHT
    w = lbl[3].item() * IMG_WIDTH
    h = lbl[4].item() * IMG_HEIGHT
    x = cx - w / 2
    y = cy - h / 2
    rect = patches.Rectangle((x, y), w, h, linewidth=2, edgecolor='lime', facecolor='none')
    ax.add_patch(rect)

# Draw predicted boxes (red)
for pred in pred_boxes:
    x1, y1, x2, y2, conf, cls = pred
    rect = patches.Rectangle((x1, y1), x2-x1, y2-y1, linewidth=2, edgecolor='red', facecolor='none')
    ax.add_patch(rect)
    ax.text(x1, y1-5, f'{conf:.2f}', color='red', fontsize=10, fontweight='bold')

ax.set_title(f"Green: Ground Truth, Red: Predictions ({len(pred_boxes)} detections)")
ax.axis('off')
plt.show()

print(f"Ground truth boxes: {len(label)}")
print(f"Predicted boxes: {len(pred_boxes)}")

## Export Model

Note that in most cases, the export process will produce 2 separate files:
* **.onnx** - Model architecture and metadata (with references to external weight data)
* **.onnx.data** - Model weights (external data file)

In [None]:
# Put the model into evaluation mode
model.eval()

# Create a dummy input tensor with the same shape as one sample (batch=1)
dummy_input = torch.randn(1, 3, IMG_HEIGHT, IMG_WIDTH).to(device)

# Export to ONNX
torch.onnx.export(
    model,                              # Model to export
    dummy_input,                        # Example input (for tracing)
    ONNX_PATH,                          # Output file path
    export_params=True,                 # Export with trained weights
    opset_version=ONNX_OPSET_VERSION,   # Which operations are supported
    do_constant_folding=True,           # Optimize constant operations
    input_names=['input'],              # Name for input layer
    output_names=['output'],            # Name for output layer
    dynamic_axes=None                   # Fixed batch size of 1
)
print(f"Model exported to: {ONNX_PATH}")

## Export Calibration Data

Export a few samples from the validation set to act as calibration data for post-training quantization.

In [None]:
# Don't exceed the total number of available samples
num_samples = min(NUM_CALIB_SAMPLES, len(val_dataset))

# Randomly choose from validation set
indices = random.sample(range(len(val_dataset)), num_samples)

# Get samples (ignore the labels) and convert to NumPy arrays (float32 format)
calib_samples = []
for i in indices:
    x, _ = val_dataset[i]
    calib_samples.append(x.float().numpy())

# Stack into a single array: shape (num_samples, 3, H, W)
calib_data = np.stack(calib_samples, axis=0)

# Save samples as NPZ
np.savez(CALIB_NPZ_PATH, input=calib_data)
print(f"Calibration data shape: {calib_data.shape}")
print(f"Calibration data dtype: {calib_data.dtype}")
print(f"Calibration data range: [{calib_data.min():.1f}, {calib_data.max():.1f}]")
print(f"Saved calibration data to: {CALIB_NPZ_PATH}")

## Deploy!

Download your *model.onnx* and *model.onnx.data* files along with the *calibration_data.npz* file. Use your vendor's toolset (e.g. RUHMI) to quantize, compress, and compile the model for your target device.