# Fast R-CNN in PyTorch (from scratch)

This notebook implements a minimal Fast R-CNN training pipeline in PyTorch:
- Dataset: PennFudan Pedestrian Detection (boxes derived from masks)
- Region proposals: Selective Search
- Model: ResNet-50 backbone (conv1..layer3), RoIAlign, 2-FC head, class-specific bbox regression
- Training: IoU-based RoI sampling, classification + Smooth L1 bbox loss

Notes:
- This is a teaching/reference implementation, optimized for clarity over performance.
- Expect modest accuracy with short training; selective search is slow. Proposals are cached per image to speed up subsequent epochs.
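- The `FastRCNN` class freezes the backbone by default (`train_backbone=False`) so that only the Fast R-CNN head is trained, which converges faster on CPU. The training cell in this notebook passes `train_backbone=True` to fine-tune the backbone as well, which gives better results if you have a GPU; switch it back to `False` for CPU-only runs.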


In [None]:
# Install dependencies (run once)
import importlib
import subprocess
import sys


def pip_install(pkgs):
    """Quietly pip-install ``pkgs`` (a list of requirement strings).

    pip is run through ``sys.executable`` so the packages are installed for
    the interpreter that is executing this cell.

    Raises:
        subprocess.CalledProcessError: if pip exits with a non-zero status.
    """
    subprocess.check_call([sys.executable, "-m", "pip", "install", "-q"] + pkgs)


def _ensure_importable(modules, pkgs, notice=None):
    """Install ``pkgs`` only when one of ``modules`` fails to import.

    ``notice`` (optional) is printed right before the installation starts.
    """
    try:
        for module_name in modules:
            importlib.import_module(module_name)
    except Exception:
        # Deliberately broad: a broken install can fail with more than ImportError.
        if notice:
            print(notice)
        pip_install(pkgs)
        # Make freshly installed packages visible to later imports in this process.
        importlib.invalidate_caches()


_ensure_importable(["selectivesearch"], ["selectivesearch"])  # pure-python selective search
_ensure_importable(["skimage"], ["scikit-image"])
_ensure_importable(["cv2"], ["opencv-python-headless"])
_ensure_importable(
    ["torch", "torchvision"],
    ["torch", "torchvision", "torchaudio"],
    notice="PyTorch not found. Attempting to install CPU wheels (you can skip if already available).",
)
# Previously these two ran pip on every execution; guard them like the others.
_ensure_importable(["tqdm"], ["tqdm"])  # progress bars
_ensure_importable(["matplotlib"], ["matplotlib"])  # visualization


In [None]:
import os
import math
import time
import zipfile
import random
import urllib.request
from pathlib import Path
from typing import List, Tuple, Dict

import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torchvision
from torchvision.ops import roi_align, nms
from torchvision import transforms as T
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import selectivesearch
from tqdm.auto import tqdm

# Prefer the GPU when one is visible to PyTorch, otherwise run on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device:", device)

# Per-channel statistics passed to T.Normalize when preparing input images.
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD  = [0.229, 0.224, 0.225]

# Seed every random number generator used in this notebook with the same value.
for _seed_fn in (torch.manual_seed, np.random.seed, random.seed):
    _seed_fn(42)


## Download PennFudanPed dataset and define Dataset class
We derive bounding boxes from the instance masks. Single class: person (label=1). Background=0.

In [None]:
DATA_ROOT = Path("/kaggle/working")
PENNFUDAN_URL = "https://www.cis.upenn.edu/~jshi/ped_html/PennFudanPed.zip"
PENNFUDAN_DIR = DATA_ROOT / "PennFudanPed"


def download_pennfudan():
    """Download and extract PennFudanPed into ``DATA_ROOT`` if it is not there yet.

    Nothing is done when ``PENNFUDAN_DIR`` already exists. The archive is
    downloaded to a temporary ``.part`` file and renamed only once the
    transfer has finished, so an interrupted download is never mistaken for a
    complete archive on the next run.

    Raises:
        zipfile.BadZipFile: if the archive on disk is corrupt. The bad file
            is deleted first so that a re-run downloads it again.
    """
    PENNFUDAN_DIR.parent.mkdir(parents=True, exist_ok=True)
    if PENNFUDAN_DIR.exists():
        print("PennFudan already present.")
        return
    zip_path = DATA_ROOT / "PennFudanPed.zip"
    if not zip_path.exists():
        print("Downloading PennFudanPed...")
        part_path = zip_path.with_name(zip_path.name + ".part")
        urllib.request.urlretrieve(PENNFUDAN_URL, part_path)
        part_path.replace(zip_path)
    print("Extracting...")
    try:
        with zipfile.ZipFile(zip_path, 'r') as zf:
            zf.extractall(DATA_ROOT)
    except zipfile.BadZipFile:
        # Drop the unusable archive so the next call re-downloads it.
        zip_path.unlink()
        raise
    print("Done.")


download_pennfudan()

class PennFudanDataset(Dataset):
    """PennFudan pedestrian images with bounding boxes derived from instance masks.

    Images are read from ``<root>/PNGImages`` and masks from
    ``<root>/PedMasks``; both listings are sorted and paired by position.

    Each item is a tuple ``(img_t, target, img)``:
      * ``img_t``  - the transformed image (``transforms`` if given, otherwise
        ``ToTensor`` followed by ImageNet normalization),
      * ``target`` - dict with ``'boxes'`` (float32 ``[G, 4]`` as x1, y1, x2, y2),
        ``'labels'`` (int64 ``[G]``, all ones: person) and ``'size'``
        (int64 ``[height, width]``),
      * ``img``    - the untransformed RGB PIL image (for proposals/visualization).
    """

    def __init__(self, root: Path, transforms=None):
        self.root = Path(root)
        self.imgs = sorted((self.root / "PNGImages").glob("*.png"))
        self.masks = sorted((self.root / "PedMasks").glob("*.png"))
        assert len(self.imgs) == len(self.masks)
        self.transforms = transforms
        # Built once instead of on every __getitem__ call.
        self._default_transforms = T.Compose([
            T.ToTensor(),
            T.Normalize(IMAGENET_MEAN, IMAGENET_STD)
        ])

    def __len__(self):
        return len(self.imgs)

    def __getitem__(self, idx):
        img = Image.open(self.imgs[idx]).convert("RGB")
        mask_np = np.array(Image.open(self.masks[idx]))

        # Every distinct non-zero mask value is one instance. Filter by value
        # rather than dropping the first entry, so a mask without any
        # background pixel does not lose a real instance.
        obj_ids = np.unique(mask_np)
        obj_ids = obj_ids[obj_ids != 0]

        boxes = []
        for oid in obj_ids:
            # oid comes from np.unique(mask_np), so it matches at least one pixel.
            pos = np.nonzero(mask_np == oid)
            rows, cols = pos[0], pos[1]
            x1, y1 = int(cols.min()), int(rows.min())
            x2, y2 = int(cols.max()), int(rows.max())
            # discard tiny boxes
            if (x2 - x1) >= 4 and (y2 - y1) >= 4:
                boxes.append([x1, y1, x2, y2])

        # reshape keeps the [G, 4] layout even when no box survived (G == 0).
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.ones((boxes.shape[0],), dtype=torch.int64)  # single class: person=1

        # default: to tensor + imagenet normalize
        transform = self.transforms if self.transforms else self._default_transforms
        img_t = transform(img)

        target = {
            'boxes': boxes,
            'labels': labels,
            'size': torch.tensor([img.height, img.width], dtype=torch.int64)
        }
        return img_t, target, img  # return PIL img for proposals/vis

# Load every annotated image once; the train/test views below index into it.
full_dataset = PennFudanDataset(PENNFUDAN_DIR)
print("Total images:", len(full_dataset))

# Shuffle the image indices and keep the first 80% for training.
indices = [*range(len(full_dataset))]
random.shuffle(indices)
split = int(0.8 * len(indices))
train_indices = indices[:split]
test_indices = indices[split:]

class SubsetWrap(Dataset):
    """Dataset view exposing only the items of ``base`` listed in ``indices``.

    Item ``i`` of the view is ``base[indices[i]]``.
    """

    def __init__(self, base, indices):
        self.base, self.indices = base, indices

    def __len__(self):
        return len(self.indices)

    def __getitem__(self, i):
        base_index = self.indices[i]
        return self.base[base_index]

# Wrap the shuffled index lists as the training and held-out datasets.
train_ds, test_ds = (
    SubsetWrap(full_dataset, subset) for subset in (train_indices, test_indices)
)
print("Train:", len(train_ds), " Test:", len(test_ds))


## Selective Search proposals
We use the Python `selectivesearch` package. Proposals are filtered by minimum size and aspect ratio, clipped to the image boundaries, deduplicated, sorted by region size (largest first), and limited to a maximum count for speed.

In [None]:
def generate_ss_proposals(np_img: np.ndarray,
                          scale: int = 450,
                          sigma: float = 0.8,
                          min_size: int = 30,
                          max_proposals: int = 2000,
                          min_box_size: int = 16,
                          max_aspect_ratio: float = 4.0) -> np.ndarray:
    """Run Selective Search on an image and return filtered box proposals.

    Args:
        np_img: HxWx3 RGB image with dtype uint8.
        scale, sigma, min_size: forwarded to ``selectivesearch.selective_search``.
        max_proposals: maximum number of boxes returned.
        min_box_size: minimum width and height, checked before and after clipping.
        max_aspect_ratio: maximum allowed long-side / short-side ratio.

    Returns:
        float32 array of (x1, y1, x2, y2) boxes ordered by region size,
        largest first. The array is empty when no region passes the filters.
    """
    assert np_img.dtype == np.uint8 and np_img.ndim == 3
    img_h, img_w = np_img.shape[:2]
    _, regions = selectivesearch.selective_search(np_img, scale=scale, sigma=sigma, min_size=min_size)

    # Maps each distinct clipped box to its region size. Dict insertion order
    # keeps the first occurrence, which also fixes the order of ties below.
    box_sizes = {}
    for region in regions:
        x, y, w, h = region['rect']
        degenerate = w <= 0 or h <= 0 or w < min_box_size or h < min_box_size
        # Short-circuit keeps the ratio from being computed for degenerate rects.
        if degenerate or max(w / h, h / w) > max_aspect_ratio:
            continue
        # clip to the image
        x1 = max(0, min(x, img_w - 1))
        y1 = max(0, min(y, img_h - 1))
        x2 = max(1, min(x + w, img_w))
        y2 = max(1, min(y + h, img_h))
        if x2 - x1 < min_box_size or y2 - y1 < min_box_size:
            continue
        box = (x1, y1, x2, y2)
        if box not in box_sizes:
            box_sizes[box] = region.get('size', (x2 - x1) * (y2 - y1))

    # Largest regions first, then keep the top-k.
    ranked = sorted(box_sizes.items(), key=lambda item: item[1], reverse=True)
    return np.array([list(box) for box, _ in ranked[:max_proposals]], dtype=np.float32)

def pil_to_uint8_rgb(img: Image.Image) -> np.ndarray:
    """Convert a PIL image to RGB and return it as a uint8 NumPy array."""
    rgb_img = img.convert('RGB')
    return np.array(rgb_img, dtype=np.uint8)

# In-memory store of Selective Search results, keyed by the caller-supplied id.
proposal_cache: Dict[int, torch.Tensor] = {}


def get_cached_proposals(idx: int, pil_img: Image.Image) -> torch.Tensor:
    """Return the proposals stored under ``idx``, computing them on first use.

    The cache is keyed by ``idx`` alone: on a hit ``pil_img`` is not looked
    at, so callers must pass a key that identifies the image uniquely.
    When Selective Search yields no usable box, a single box covering the
    whole image is stored instead.
    """
    try:
        return proposal_cache[idx]
    except KeyError:
        pass

    rgb = pil_to_uint8_rgb(pil_img)
    boxes = generate_ss_proposals(rgb)
    if boxes.shape[0] == 0:
        # Fallback: entire image
        height, width = rgb.shape[:2]
        boxes = np.array([[0, 0, width, height]], dtype=np.float32)

    proposal_cache[idx] = torch.from_numpy(boxes)
    return proposal_cache[idx]

# Quick smoke test: run proposal generation on the first training image
# (train_ds[0] is full_dataset[train_indices[0]]).
_first_train_idx = train_indices[0]
img_t, tgt, pil_img = full_dataset[_first_train_idx]
props0 = get_cached_proposals(_first_train_idx, pil_img)
print("Proposals example:", props0.shape)


## IoU, bbox encoding/decoding, and RoI sampling utilities
We sample a minibatch of RoIs per image, with a fraction of positives (IoU≥0.5) and the rest negatives (0.1≤IoU<0.5). Regression targets use the Fast R-CNN parameterization and are class-specific.

In [None]:
def box_area(boxes: torch.Tensor) -> torch.Tensor:
    """Area of each box in ``boxes`` ([N, 4] as x1, y1, x2, y2).

    A negative width or height is treated as zero, so inverted boxes have
    area 0.
    """
    extents = (boxes[:, 2:4] - boxes[:, 0:2]).clamp(min=0)
    return extents[:, 0] * extents[:, 1]

def box_iou(boxes1: torch.Tensor, boxes2: torch.Tensor) -> torch.Tensor:
    """Pairwise IoU between ``boxes1`` ([N, 4]) and ``boxes2`` ([M, 4]).

    Returns an [N, M] tensor. A small epsilon is added to the union so the
    division is defined even for zero-area boxes.
    """
    # Broadcast boxes1 against boxes2 to get the corners of each intersection.
    top_left = torch.max(boxes1[:, None, :2], boxes2[:, :2])
    bottom_right = torch.min(boxes1[:, None, 2:], boxes2[:, 2:])
    overlap = (bottom_right - top_left).clamp(min=0)
    inter = overlap[..., 0] * overlap[..., 1]

    union = box_area(boxes1)[:, None] + box_area(boxes2) - inter + 1e-6
    return inter / union

def to_center_wh(boxes: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
    """Convert (x1, y1, x2, y2) boxes to ``(cx, cy, w, h)`` component tensors.

    Width and height are clamped to at least 1e-6 so later divisions and
    logarithms stay finite.
    """
    left, top, right, bottom = boxes.unbind(-1)
    w = (right - left).clamp(min=1e-6)
    h = (bottom - top).clamp(min=1e-6)
    return left + 0.5 * w, top + 0.5 * h, w, h

def encode_boxes(gt: torch.Tensor, proposals: torch.Tensor) -> torch.Tensor:
    """Encode ground-truth boxes relative to proposals as (tx, ty, tw, th).

    ``gt`` and ``proposals`` are [N, 4] and matched row by row. Centre offsets
    are scaled by the proposal size; sizes are encoded as log-ratios.
    """
    g_cx, g_cy, g_w, g_h = to_center_wh(gt)
    p_cx, p_cy, p_w, p_h = to_center_wh(proposals)
    components = [
        (g_cx - p_cx) / p_w,
        (g_cy - p_cy) / p_h,
        torch.log(g_w / p_w),
        torch.log(g_h / p_h),
    ]
    return torch.stack(components, dim=-1)

def decode_boxes(deltas: torch.Tensor, proposals: torch.Tensor, img_size: Tuple[int,int]) -> torch.Tensor:
    """Apply (dx, dy, dw, dh) ``deltas`` to ``proposals`` and clip to the image.

    ``deltas`` and ``proposals`` are [N, 4]; ``img_size`` is ``(H, W)``.
    Returns [N, 4] boxes as (x1, y1, x2, y2) with x in [0, W] and y in [0, H].
    """
    img_h, img_w = img_size
    p_cx, p_cy, p_w, p_h = to_center_wh(proposals)
    dx, dy, dw, dh = deltas.unbind(-1)

    new_cx = dx * p_w + p_cx
    new_cy = dy * p_h + p_cy
    half_w = 0.5 * (p_w * torch.exp(dw))
    half_h = 0.5 * (p_h * torch.exp(dh))

    boxes = torch.stack(
        [new_cx - half_w, new_cy - half_h, new_cx + half_w, new_cy + half_h],
        dim=-1,
    )
    # Clip in place: columns 0 and 2 are x coordinates, 1 and 3 are y coordinates.
    boxes[:, 0::2].clamp_(0, img_w)
    boxes[:, 1::2].clamp_(0, img_h)
    return boxes

def sample_rois(proposals: torch.Tensor,
                gt_boxes: torch.Tensor,
                gt_labels: torch.Tensor,
                batch_size: int = 128,
                fg_fraction: float = 0.25,
                fg_thresh: float = 0.5,
                bg_thresh_hi: float = 0.5,
                bg_thresh_lo: float = 0.1,
                num_classes: int = 2):
    """Sample a minibatch of RoIs for one image and build their training targets.

    Ground-truth boxes are appended to ``proposals`` so positives always
    exist. Foreground RoIs have max IoU >= ``fg_thresh``; background RoIs have
    max IoU in [``bg_thresh_lo``, ``bg_thresh_hi``).

    Args:
        proposals: [R, 4] candidate boxes (x1, y1, x2, y2).
        gt_boxes: [G, 4] ground-truth boxes; may be empty.
        gt_labels: [G] class index (> 0) for each ground-truth box.
        batch_size: maximum number of RoIs returned.
        fg_fraction: target fraction of foreground RoIs in the batch.
        num_classes: number of classes including background.

    Returns:
        ``(rois, roi_labels, bbox_targets)``: sampled boxes [S, 4], labels [S]
        (0 = background) and class-specific regression targets
        [S, 4 * num_classes], which are zero except in the 4 columns of each
        positive RoI's class. All are created on ``proposals.device``.
    """
    device = proposals.device
    gt_boxes = gt_boxes.reshape(-1, 4).to(device)
    gt_labels = gt_labels.reshape(-1).to(device)

    if gt_boxes.size(0) == 0:
        # No annotated object: IoU matching is undefined, so every sampled
        # proposal is simply background with all-zero regression targets.
        chosen = torch.randperm(proposals.size(0))[:batch_size].to(device)
        rois = proposals[chosen]
        roi_labels = torch.zeros(rois.size(0), dtype=gt_labels.dtype, device=device)
        bbox_targets = torch.zeros((rois.size(0), 4 * num_classes),
                                   dtype=torch.float32, device=device)
        return rois, roi_labels, bbox_targets

    # Add GT boxes to proposals to ensure positives
    all_props = torch.cat([proposals, gt_boxes], dim=0)

    ious = box_iou(all_props, gt_boxes)  # [R, G]
    max_iou, gt_idx = ious.max(dim=1)

    fg_idxs = torch.nonzero(max_iou >= fg_thresh).squeeze(1)
    bg_idxs = torch.nonzero((max_iou < bg_thresh_hi) & (max_iou >= bg_thresh_lo)).squeeze(1)

    fg_rois_per_image = int(round(batch_size * fg_fraction))
    if fg_idxs.numel() > 0:
        pick = torch.randperm(fg_idxs.numel())[:fg_rois_per_image].to(device)
        fg_idxs = fg_idxs[pick]
    num_fg = fg_idxs.numel()

    # Background RoIs fill whatever the foreground sample left free.
    bg_rois_per_image = batch_size - num_fg
    if bg_idxs.numel() > 0:
        pick = torch.randperm(bg_idxs.numel())[:bg_rois_per_image].to(device)
        bg_idxs = bg_idxs[pick]

    keep = torch.cat([fg_idxs, bg_idxs], dim=0)
    if keep.numel() == 0:
        # fallback: take top IoUs (num_fg is 0 here, so all become background)
        keep = torch.topk(max_iou, k=min(batch_size, max_iou.numel())).indices
    rois = all_props[keep]

    # Foreground entries come first in `keep`; everything after is background (0).
    roi_labels = gt_labels[gt_idx[keep]].clone()
    roi_labels[num_fg:] = 0

    # bbox targets (class-specific); only positives get regression targets
    bbox_targets = torch.zeros((rois.size(0), 4 * num_classes),
                               dtype=torch.float32, device=device)
    pos_inds = torch.nonzero(roi_labels > 0).squeeze(1)
    if pos_inds.numel() > 0:
        gt_assigned = gt_boxes[gt_idx[keep[pos_inds]]]
        deltas = encode_boxes(gt_assigned, rois[pos_inds])
        # Scatter every positive's 4 deltas into columns [4c, 4c + 4) of its
        # row in one indexed assignment instead of a per-RoI Python loop.
        cols = (roi_labels[pos_inds] * 4).unsqueeze(1) + torch.arange(4, device=device)
        bbox_targets[pos_inds.unsqueeze(1), cols] = deltas.to(bbox_targets.dtype)

    return rois, roi_labels, bbox_targets


## Fast R-CNN model
- Backbone: ResNet-50 up to layer3 (stride=16)
- RoIAlign: 7x7 pooling, spatial_scale=1/16
- Two 1024-d FC layers + classification and bbox regression heads
- Bbox regression is class-specific (4 x num_classes)

We freeze the backbone by default for speed; you can set `train_backbone=True` to fine-tune it as well.

In [None]:
class FastRCNN(nn.Module):
    """Fast R-CNN detector: ResNet-50 trunk, RoIAlign, and a two-layer FC head.

    Args:
        num_classes: number of classes including background.
        pool_size: RoIAlign output resolution (``pool_size`` x ``pool_size``).
        train_backbone: when False, all backbone parameters are frozen
            (``requires_grad = False``) and the backbone is kept in eval mode
            during training.
    """

    def __init__(self, num_classes: int = 2, pool_size: int = 7, train_backbone: bool = False):
        super().__init__()
        self.num_classes = num_classes
        self.pool_size = pool_size
        # Load ResNet-50 backbone
        try:
            weights = torchvision.models.ResNet50_Weights.DEFAULT
            resnet = torchvision.models.resnet50(weights=weights)
        except Exception:
            # Fall back to the older `pretrained=` loading API.
            resnet = torchvision.models.resnet50(pretrained=True)

        # Use layers up to layer3 (output channels=1024, stride=16)
        self.backbone = nn.Sequential(
            resnet.conv1,
            resnet.bn1,
            resnet.relu,
            resnet.maxpool,
            resnet.layer1,
            resnet.layer2,
            resnet.layer3,
        )
        self.backbone_out_channels = 1024
        if not train_backbone:
            for p in self.backbone.parameters():
                p.requires_grad = False

        # Head: 2 FC layers of 1024 dims
        self.avgpool_out = self.backbone_out_channels * self.pool_size * self.pool_size
        self.fc1 = nn.Linear(self.avgpool_out, 1024)
        self.fc2 = nn.Linear(1024, 1024)
        self.dropout = nn.Dropout(0.5)
        self.cls_score = nn.Linear(1024, num_classes)
        self.bbox_pred = nn.Linear(1024, 4 * num_classes)

        # Initialize heads
        for layer in (self.fc1, self.fc2, self.cls_score, self.bbox_pred):
            nn.init.normal_(layer.weight, std=0.01)
            nn.init.constant_(layer.bias, 0)

        # stride=16 -> spatial_scale = 1/16
        self.spatial_scale = 1.0 / 16.0

    def train(self, mode: bool = True):
        """Set training mode, keeping a fully frozen backbone in eval mode.

        BatchNorm layers update their running statistics whenever they are in
        training mode, regardless of ``requires_grad``. Without this override
        a "frozen" backbone would still change during training.
        """
        super().train(mode)
        if mode and not any(p.requires_grad for p in self.backbone.parameters()):
            self.backbone.eval()
        return self

    def forward(self, images: torch.Tensor, rois: List[torch.Tensor]):
        """Classify and regress every RoI.

        Args:
            images: [N, 3, H, W] batch of images.
            rois: list of N tensors, each [Ri, 4] in image coordinates.

        Returns:
            ``(class_logits, bbox_deltas)`` with shapes [sum(Ri), num_classes]
            and [sum(Ri), 4 * num_classes].
        """
        feats = self.backbone(images)
        # RoIAlign returns [sumR, C, pool, pool]
        pooled = roi_align(feats, rois, output_size=(self.pool_size, self.pool_size),
                           spatial_scale=self.spatial_scale, sampling_ratio=2, aligned=True)
        x = pooled.flatten(start_dim=1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        class_logits = self.cls_score(x)
        bbox_deltas = self.bbox_pred(x)
        return class_logits, bbox_deltas

def fast_rcnn_losses(class_logits: torch.Tensor,
                     bbox_deltas: torch.Tensor,
                     labels: torch.Tensor,
                     bbox_targets: torch.Tensor,
                     num_classes: int) -> Tuple[torch.Tensor, torch.Tensor]:
    """Return ``(cls_loss, box_loss)`` for a batch of RoIs.

    ``cls_loss`` is the cross-entropy over all RoIs. ``box_loss`` is the mean
    Smooth L1 loss over positive RoIs only (``labels > 0``), comparing the
    4 columns that belong to each RoI's own class. It is 0 when the batch
    has no positive RoI. ``num_classes`` is accepted but not used here.
    """
    cls_loss = F.cross_entropy(class_logits, labels)

    fg_rows = torch.nonzero(labels > 0).squeeze(1)
    if fg_rows.numel() == 0:
        return cls_loss, torch.tensor(0.0, device=class_logits.device)

    # For a positive RoI of class c, the relevant columns are [4c, 4c + 4).
    offsets = torch.arange(4, device=bbox_deltas.device)
    class_cols = (labels[fg_rows] * 4)[:, None] + offsets[None, :]
    row_index = fg_rows[:, None]
    pred_sel = bbox_deltas[row_index, class_cols]
    tgt_sel = bbox_targets[row_index, class_cols]
    box_loss = F.smooth_l1_loss(pred_sel, tgt_sel, reduction='mean')
    return cls_loss, box_loss


## Train loop
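We cache proposals per image to avoid re-running Selective Search each epoch. The cell below fine-tunes the backbone together with the Fast R-CNN head (`train_backbone=True`) for `EPOCHS = 50`; on CPU, pass `train_backbone=False` to train only the head and lower `EPOCHS`. Increase epochs/batch size if you have a GPU for better results.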

In [None]:
def collate_fn(batch):
    """Unwrap the single sample of a batch-size-1 batch.

    The loaders use ``batch_size=1``, so the first element is returned
    unchanged instead of stacking samples.
    """
    only_sample = batch[0]
    return only_sample

# One image per step; collate_fn unwraps the single-sample batch.
_loader_kwargs = dict(batch_size=1, collate_fn=collate_fn)
train_loader = DataLoader(train_ds, shuffle=True, **_loader_kwargs)
test_loader  = DataLoader(test_ds, shuffle=False, **_loader_kwargs)

num_classes = 2  # background + person
model = FastRCNN(num_classes=num_classes, pool_size=7, train_backbone=True)
model = model.to(device)

# Optimize only the parameters that are not frozen.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(params, lr=0.003, momentum=0.9, weight_decay=1e-4)
# NOTE(review): with step_size=4 and gamma=0.1 the learning rate is already
# 3e-6 from epoch 13 on; confirm this decay matches the intended epoch count.
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=4, gamma=0.1)

def train_one_epoch(epoch: int, max_images: int = None):
    """Train the global ``model`` for one pass over ``train_loader``.

    For every image: fetch (or compute) its cached proposals, sample RoIs,
    run the model, and take one optimizer step on ``cls_loss + box_loss``.

    Args:
        epoch: epoch number, used only for the progress-bar text.
        max_images: optional cap on the number of images processed.

    Returns:
        ``(avg_cls_loss, avg_box_loss)`` averaged over the processed images.
    """
    model.train()
    total_steps = min(len(train_loader), max_images) if max_images else len(train_loader)
    pbar = tqdm(enumerate(train_loader), total=total_steps)
    total_cls, total_box = 0.0, 0.0
    count = 0
    for i, (img_t, target, pil_img) in pbar:
        if max_images and i >= max_images:
            break
        img_t = img_t.to(device).unsqueeze(0)  # [1,3,H,W]
        gt_boxes = target['boxes'].to(device)
        gt_labels = target['labels'].to(device)

        # The loader shuffles, so the loop position `i` does not identify the
        # image. Key the proposal cache on the image content instead;
        # otherwise later epochs would reuse proposals computed for whichever
        # image occupied position `i` first.
        cache_key = hash((pil_img.size, pil_img.tobytes()))
        props = get_cached_proposals(cache_key, pil_img).to(device)
        # sample RoIs
        rois, roi_labels, bbox_targets = sample_rois(
            props, gt_boxes, gt_labels,
            batch_size=128, fg_fraction=0.25,
            fg_thresh=0.5, bg_thresh_hi=0.5, bg_thresh_lo=0.1,
            num_classes=num_classes
        )
        # forward
        class_logits, bbox_deltas = model(img_t, [rois])
        cls_loss, box_loss = fast_rcnn_losses(class_logits, bbox_deltas, roi_labels.to(device), bbox_targets.to(device), num_classes)
        loss = cls_loss + box_loss

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_cls += cls_loss.item()
        total_box += box_loss.item()
        count += 1
        pbar.set_description(f"Epoch {epoch} | cls {cls_loss.item():.3f} box {box_loss.item():.3f}")
    return total_cls / max(count,1), total_box / max(count,1)

EPOCHS = 50  # increase for better results
for epoch in range(1, EPOCHS + 1):
    epoch_start = time.time()
    avg_cls, avg_box = train_one_epoch(epoch, max_images=None)
    # Advance the learning-rate schedule once per completed epoch.
    lr_scheduler.step()
    elapsed = time.time() - epoch_start
    print(f"Epoch {epoch} done in {elapsed:.1f}s | avg cls {avg_cls:.3f} box {avg_box:.3f}")


## Inference and visualization
For a given test image:
- Generate proposals (cached)
- Forward through Fast R-CNN head
- Decode class-specific bbox deltas
- Score with softmax, per-class NMS
- Visualize top detections for class `person`

In [None]:
@torch.inference_mode()
def fast_rcnn_infer_single(img_t: torch.Tensor, pil_img: Image.Image, conf_thresh=0.5, nms_thresh=0.5, max_dets=50):
    """Detect objects in one image with the global ``model``.

    Args:
        img_t: normalized image tensor [3, H, W].
        pil_img: the same image as a PIL image (used for proposals and size).
        conf_thresh: minimum softmax score for a proposal to be kept.
        nms_thresh: IoU threshold for per-class NMS.
        max_dets: maximum number of detections returned.

    Returns:
        List of ``(class_index, score, box)`` tuples sorted by score
        (highest first), where ``box`` is a NumPy array (x1, y1, x2, y2).
        ``model`` is switched to eval mode as a side effect.
    """
    model.eval()
    img_t = img_t.to(device).unsqueeze(0)
    H, W = pil_img.height, pil_img.width
    # Key the cache on the image content. A shared constant key would make
    # every image after the first reuse the first image's proposals.
    cache_key = hash((pil_img.size, pil_img.tobytes()))
    props = get_cached_proposals(cache_key, pil_img).to(device)
    class_logits, bbox_deltas = model(img_t, [props])
    probs = F.softmax(class_logits, dim=1)  # [R, C]

    detections = []
    # For each class > 0
    for c in range(1, model.num_classes):
        scores = probs[:, c]
        keep = scores >= conf_thresh
        if not keep.any():
            continue
        scores = scores[keep]
        prop_keep = props[keep]
        # Class-specific regression: columns [4c, 4c + 4) belong to class c.
        deltas_c = bbox_deltas[keep, 4*c:4*c+4]
        boxes_c = decode_boxes(deltas_c, prop_keep, (H, W))
        # NMS per class
        keep_idx = nms(boxes_c, scores, nms_thresh)
        boxes_c = boxes_c[keep_idx]
        scores = scores[keep_idx]
        for b, s in zip(boxes_c.cpu().numpy(), scores.cpu().numpy()):
            detections.append((int(c), float(s), b))
    # sort by score
    detections.sort(key=lambda x: x[1], reverse=True)
    return detections[:max_dets]

def visualize_detections(pil_img: Image.Image, detections, class_names=None):
    """Show ``pil_img`` with one labelled rectangle per detection.

    Args:
        pil_img: image to display.
        detections: iterable of ``(class_index, score, box)`` with ``box`` as
            (x1, y1, x2, y2).
        class_names: optional mapping from class index to display name;
            defaults to ``{1: 'person'}``. Classes missing from the mapping
            are labelled with their index.
    """
    if class_names is None:
        # Built per call so no mutable default is shared across calls.
        class_names = {1: 'person'}
    fig, ax = plt.subplots(1, 1, figsize=(8, 8))
    ax.imshow(pil_img)
    for cls, score, box in detections:
        x1, y1, x2, y2 = box
        rect = patches.Rectangle((x1, y1), x2-x1, y2-y1, linewidth=2, edgecolor='lime', facecolor='none')
        ax.add_patch(rect)
        ax.text(x1, y1-5, f"{class_names.get(cls, str(cls))}: {score:.2f}", color='yellow', fontsize=10,
                bbox=dict(facecolor='black', alpha=0.5, pad=1))
    ax.axis('off')
    plt.show()

# Run detection on (at most) the first three held-out images and plot the results.
num_preview = min(3, len(test_ds))
for i in range(num_preview):
    img_t, target, pil_img = test_ds[i]
    dets = fast_rcnn_infer_single(img_t, pil_img, conf_thresh=0.5, nms_thresh=0.5)
    print(f"Image {i}: {len(dets)} detections")
    visualize_detections(pil_img, dets)


## Tips and extensions
- Increase EPOCHS and lower `conf_thresh` initially to inspect detections early.
- Unfreeze backbone (`train_backbone=True`) for better performance on GPU.
- Add bbox target normalization (mean/std) for more stable training.
- Hard example mining: adjust bg thresholds or sample more RoIs.
- Multi-class datasets: change labels and `num_classes` accordingly.
- For speed and accuracy, consider Faster R-CNN (with an RPN) via `torchvision.models.detection.fasterrcnn_resnet50_fpn`.
