# Tiny-DETR for PennFudanPed (Assignment 4)
Lightweight DETR implementation faithful to the paper (Carion et al., 2020) with MobileNetV2 and ResNet18 backbones. The notebook is Colab-friendly and includes data prep, model definition, Hungarian matching loss, training, evaluation (mAP@0.5), and visualization hooks. All explanations are in English; code cells have short helper comments for clarity.


## How to run in Colab
- Runtime → Change runtime type → GPU (A100/T4/V100 all work).
- Execute cells in order. The download cell grabs PennFudanPed and reorganizes it into `PennFudanPed/Pedestrian/{train,val,test}` with images + annotations together.
- Default epochs are small for smoke tests. Bump to ~20-30 for the actual assignment runs (baseline + at least two guided improvements).
- Save checkpoints to Drive if you want persistence.


In [23]:
import math, random, json, re, copy, time, shutil, platform
from pathlib import Path
import urllib.request, zipfile
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torchvision
from torchvision.transforms import functional as TF
from torchvision.transforms import ColorJitter
from PIL import Image
import matplotlib.pyplot as plt

def set_seed(seed: int = 42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)

set_seed(123)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print('PyTorch version:', torch.__version__)
print('CUDA available:', torch.cuda.is_available())
print('Python:', platform.python_version())
print('Using device:', device)


PyTorch version: 2.9.1
CUDA available: False
Python: 3.13.7
Using device: cpu


## Dataset (organized folders)
PennFudanPed has 170 pedestrian images. We place data in `PennFudanPed/Pedestrian/{train,val,test}`, each containing paired `.png` and `.txt` files. If the organized folders already exist, we just reuse them; otherwise we download and reorganize using the official split lists baked into the notebook.
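
Before building datasets it can help to confirm that every image really has its annotation next to it. The sketch below uses only the standard library; `find_unpaired` is a hypothetical helper name, not something defined elsewhere in this notebook.

```python
from pathlib import Path

def find_unpaired(split_dir):
    """Return names of .png files in split_dir with no matching .txt annotation."""
    split_dir = Path(split_dir)
    if not split_dir.is_dir():
        return []  # nothing to check if the folder has not been created yet
    return sorted(p.name for p in split_dir.glob('*.png')
                  if not p.with_suffix('.txt').exists())

# An empty list for each split means every image has an annotation file.
for split in ('train', 'val', 'test'):
    print(split, find_unpaired(Path('PennFudanPed/Pedestrian') / split))
```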


In [None]:
DATA_ROOT = Path('PennFudanPed')
ORG_ROOT = DATA_ROOT / 'Pedestrian'

organized_root = ORG_ROOT
print('Organized data root:', organized_root)

Organized data root: PennFudanPed/Pedestrian


## Data pipeline
- Files are read from `PennFudanPed/Pedestrian/<split>` (PNG + TXT side by side).
- Fixed 512×512 resolution (requirement). Images are resized directly to a square, so the aspect ratio is not preserved and no padding (or padding mask) is needed.
- Two augmentations: random horizontal flip and mild color jitter; both motivated by the paper's suggestion for basic robustness on small datasets.
- Targets are converted to normalized center-format boxes (`cx, cy, w, h` in [0,1]) as in DETR.
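
As a worked example of the target conversion, the sketch below follows a single box through the resize and normalization steps in plain Python. The helper name `to_detr_box` and the sample image size are illustrative assumptions; the notebook itself does this with tensors in `ResizeToSquare` and `ConvertToDetrTargets`.

```python
def to_detr_box(box_xyxy, orig_w, orig_h, size=512):
    """Resize an absolute (x0, y0, x1, y1) box to a size x size image,
    then convert it to normalized (cx, cy, w, h)."""
    sx, sy = size / orig_w, size / orig_h
    x0, y0, x1, y1 = box_xyxy
    # Scale each axis independently (aspect ratio is not preserved).
    x0, x1 = x0 * sx, x1 * sx
    y0, y1 = y0 * sy, y1 * sy
    # Center format, normalized by the resized image size.
    cx, cy = (x0 + x1) / 2 / size, (y0 + y1) / 2 / size
    w, h = (x1 - x0) / size, (y1 - y0) / size
    return cx, cy, w, h

# Hypothetical 256x128 image with one box in its left half.
print(to_detr_box((0, 32, 128, 96), orig_w=256, orig_h=128))  # (0.25, 0.5, 0.5, 0.5)
```

Because the resize stretches the whole image to the target square, the normalized result is the same as dividing the original box by the original width and height.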


In [24]:
def parse_annotation(txt_path: Path):
    """Extract pedestrian boxes from a PennFudanPed annotation file."""
    lines = txt_path.read_text().splitlines()
    boxes = []
    for ln in lines:
        if 'Bounding box for object' in ln:
            coords = re.findall(r'\((\d+), (\d+)\)', ln)
            if len(coords) == 2:
                (x0, y0), (x1, y1) = coords
                boxes.append([float(x0), float(y0), float(x1), float(y1)])
    return boxes

def xyxy_to_cxcywh(boxes: torch.Tensor):
    x0, y0, x1, y1 = boxes.unbind(-1)
    return torch.stack([(x0 + x1) / 2, (y0 + y1) / 2, (x1 - x0), (y1 - y0)], dim=-1)

def cxcywh_to_xyxy(boxes: torch.Tensor):
    cx, cy, w, h = boxes.unbind(-1)
    x0 = cx - 0.5 * w
    y0 = cy - 0.5 * h
    x1 = cx + 0.5 * w
    y1 = cy + 0.5 * h
    return torch.stack([x0, y0, x1, y1], dim=-1)

class Compose:
    def __init__(self, transforms):
        self.transforms = transforms
    def __call__(self, image, target):
        for t in self.transforms:
            image, target = t(image, target)
        return image, target

class RandomHorizontalFlip:
    def __init__(self, p=0.5):
        self.p = p
    def __call__(self, image, target):
        if random.random() < self.p:
            image = TF.hflip(image)
            w, _ = image.size
            if target is not None and len(target.get('boxes', [])):
                boxes = target['boxes'].clone()
                boxes[:, [0, 2]] = w - boxes[:, [2, 0]]
                target['boxes'] = boxes
        return image, target

class RandomColor:
    def __init__(self, brightness=0.2, contrast=0.2, saturation=0.2, hue=0.02):
        self.op = ColorJitter(brightness=brightness, contrast=contrast, saturation=saturation, hue=hue)
    def __call__(self, image, target):
        return self.op(image), target

class ResizeToSquare:
    def __init__(self, size=512):
        self.size = size
    def __call__(self, image, target):
        w, h = image.size
        image = TF.resize(image, (self.size, self.size))
        if target is not None and len(target.get('boxes', [])):
            scale_x, scale_y = self.size / w, self.size / h
            boxes = target['boxes'] * torch.tensor([scale_x, scale_y, scale_x, scale_y])
            target['boxes'] = boxes
        if target is not None:
            target['size'] = torch.tensor([self.size, self.size], dtype=torch.int64)
        return image, target

class ToTensorAndNormalize:
    def __init__(self, mean=None, std=None):
        self.mean = mean or [0.485, 0.456, 0.406]
        self.std = std or [0.229, 0.224, 0.225]
    def __call__(self, image, target):
        image = TF.to_tensor(image)
        image = TF.normalize(image, self.mean, self.std)
        return image, target

class ConvertToDetrTargets:
    def __call__(self, image, target):
        if target is None:
            return image, target
        boxes = target.get('boxes', torch.zeros((0, 4)))
        if boxes.numel() > 0:
            boxes = xyxy_to_cxcywh(boxes)
            w, h = target['size'][1].item(), target['size'][0].item()
            scale = torch.tensor([w, h, w, h], dtype=torch.float32)
            boxes = boxes / scale
        target['boxes'] = boxes
        target['area'] = (boxes[:, 2] * boxes[:, 3]) if boxes.numel() > 0 else torch.zeros((0,), dtype=torch.float32)
        target['iscrowd'] = torch.zeros((boxes.shape[0],), dtype=torch.int64)
        return image, target

def make_transforms(train: bool = True, image_size: int = 512):
    ops = []
    if train:
        ops.append(RandomColor())
        ops.append(RandomHorizontalFlip(p=0.5))
    ops.append(ResizeToSquare(size=image_size))
    ops.append(ConvertToDetrTargets())
    ops.append(ToTensorAndNormalize())
    return Compose(ops)

def list_split_files(split_dir: Path):
    return sorted([p for p in split_dir.glob('*.png')])

class PennFudanDataset(Dataset):
    def __init__(self, split_dir: Path, transforms=None, image_size: int = 512):
        self.split_dir = Path(split_dir)
        self.files = list_split_files(self.split_dir)
        self.transforms = transforms
        self.image_size = image_size
    def __len__(self):
        return len(self.files)
    def __getitem__(self, idx):
        img_path = self.files[idx]
        ann_path = img_path.with_suffix('.txt')
        image = Image.open(img_path).convert('RGB')
        orig_w, orig_h = image.size
        boxes = parse_annotation(ann_path)
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.zeros((boxes.shape[0],), dtype=torch.int64)  # single class index 0
        target = {
            'boxes': boxes,
            'labels': labels,
            'image_id': torch.tensor(idx),
            'orig_size': torch.tensor([orig_h, orig_w])
        }
        if self.transforms is not None:
            image, target = self.transforms(image, target)
        return image, target

def collate_fn(batch):
    images, targets = list(zip(*batch))
    images = torch.stack(images)
    return images, list(targets)

image_size = 512
train_ds = PennFudanDataset(organized_root / 'train', transforms=make_transforms(train=True, image_size=image_size), image_size=image_size)
val_ds = PennFudanDataset(organized_root / 'val', transforms=make_transforms(train=False, image_size=image_size), image_size=image_size)
test_ds = PennFudanDataset(organized_root / 'test', transforms=make_transforms(train=False, image_size=image_size), image_size=image_size)

print('Dataset sizes:', len(train_ds), len(val_ds), len(test_ds))


Dataset sizes: 118 25 27


## Model components (Tiny-DETR)
Key ideas from the paper retained in the lightweight version:
- CNN backbone (MobileNetV2 or ResNet18) to produce a feature map.
- 2D sinusoidal positional encoding added to the flattened feature map.
- Transformer encoder-decoder with learned object queries (set prediction, fixed query count).
- Shared FFN heads for class logits (+ no-object) and normalized box regression.
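
To make the positional encoding concrete, here is a minimal plain-Python sketch for a single feature-map cell. It assumes a simplified normalization (1-based index divided by the grid size, scaled to 2π) and a small feature count so the output stays readable; the function names are hypothetical. The 16×16 grid in the usage lines is what a stride-32 backbone would produce for a 512×512 input.

```python
import math

def sine_encoding_1d(position, num_feats=8, temperature=10000.0):
    """Sinusoidal features for one coordinate given in radians."""
    feats = []
    for i in range(num_feats):
        # Feature pairs (2k, 2k+1) share one frequency.
        freq = temperature ** (2 * (i // 2) / num_feats)
        angle = position / freq
        # Even indices use sine, odd indices use cosine.
        feats.append(math.sin(angle) if i % 2 == 0 else math.cos(angle))
    return feats

def sine_encoding_2d(row, col, height, width, num_feats=8):
    """Concatenate y-features and x-features for one feature-map cell."""
    # Assumed normalization: 1-based index mapped into (0, 2*pi].
    y = (row + 1) / height * 2 * math.pi
    x = (col + 1) / width * 2 * math.pi
    return sine_encoding_1d(y, num_feats) + sine_encoding_1d(x, num_feats)

a = sine_encoding_2d(0, 0, height=16, width=16)
b = sine_encoding_2d(0, 1, height=16, width=16)
print(len(a))          # 16 values: 8 for y followed by 8 for x
print(a[:8] == b[:8])  # True: same row, so the y-half is identical
print(a[8:] == b[8:])  # False: different column, so the x-half differs
```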


In [None]:
# Box utilities

def box_area(boxes):
    return (boxes[:, 2] - boxes[:, 0]).clamp(min=0) * (boxes[:, 3] - boxes[:, 1]).clamp(min=0)

def box_iou(boxes1, boxes2):
    area1 = box_area(boxes1)
    area2 = box_area(boxes2)
    lt = torch.max(boxes1[:, None, :2], boxes2[:, :2])
    rb = torch.min(boxes1[:, None, 2:], boxes2[:, 2:])
    wh = (rb - lt).clamp(min=0)
    inter = wh[:, :, 0] * wh[:, :, 1]
    union = area1[:, None] + area2 - inter
    iou = inter / (union + 1e-6)
    return iou, union

def generalized_box_iou(boxes1, boxes2):
    iou, union = box_iou(boxes1, boxes2)
    lt = torch.min(boxes1[:, None, :2], boxes2[:, :2])
    rb = torch.max(boxes1[:, None, 2:], boxes2[:, 2:])
    wh = (rb - lt).clamp(min=0)
    area = wh[:, :, 0] * wh[:, :, 1]
    return iou - (area - union) / (area + 1e-6)

class PositionEncodingSine(nn.Module):
    def __init__(self, num_pos_feats=128, temperature=10000, normalize=True, scale=None):
        super().__init__()
        self.num_pos_feats = num_pos_feats
        self.temperature = temperature
        self.normalize = normalize
        self.scale = scale or 2 * math.pi
    def forward(self, x: torch.Tensor):
        # x: [B, C, H, W]
        b, c, h, w = x.shape
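        # Every position is valid (no padding), so count positions with a mask of ones.
        # Cumulative sums over an all-zero mask would give every cell the same encoding.
        not_mask = torch.ones((b, h, w), device=x.device, dtype=torch.bool)
        y_embed = not_mask.cumsum(1, dtype=torch.float32)
        x_embed = not_mask.cumsum(2, dtype=torch.float32)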
        if self.normalize:
            eps = 1e-6
            y_embed = (y_embed + 0.5) / (y_embed[:, -1:, :] + eps) * self.scale
            x_embed = (x_embed + 0.5) / (x_embed[:, :, -1:] + eps) * self.scale
        dim_t = self.temperature ** (2 * (torch.arange(self.num_pos_feats, device=x.device) // 2) / self.num_pos_feats)
        pos_x = x_embed[:, :, :, None] / dim_t
        pos_y = y_embed[:, :, :, None] / dim_t
        pos_x = torch.stack((pos_x[:, :, :, 0::2].sin(), pos_x[:, :, :, 1::2].cos()), dim=4).flatten(3)
        pos_y = torch.stack((pos_y[:, :, :, 0::2].sin(), pos_y[:, :, :, 1::2].cos()), dim=4).flatten(3)
        pos = torch.cat((pos_y, pos_x), dim=3).permute(0, 3, 1, 2)
        return pos

class TransformerEncoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1, activation='relu'):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout, batch_first=True)
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.activation = F.relu if activation == 'relu' else F.gelu
    def with_pos_embed(self, tensor, pos):
        return tensor if pos is None else tensor + pos
    def forward(self, src, src_mask=None, src_key_padding_mask=None, pos=None):
        q = k = self.with_pos_embed(src, pos)
        src2 = self.self_attn(q, k, value=src, attn_mask=src_mask, key_padding_mask=src_key_padding_mask)[0]
        src = self.norm1(src + self.dropout1(src2))
        src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
        src = self.norm2(src + self.dropout2(src2))
        return src

class TransformerDecoderLayer(nn.Module):
    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1, activation='relu'):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout, batch_first=True)
        self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout, batch_first=True)
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)
        self.activation = F.relu if activation == 'relu' else F.gelu
    def with_pos_embed(self, tensor, pos):
        return tensor if pos is None else tensor + pos
    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None, tgt_key_padding_mask=None,
                memory_key_padding_mask=None, pos=None, query_pos=None):
        q = k = self.with_pos_embed(tgt, query_pos)
        tgt2 = self.self_attn(q, k, value=tgt, attn_mask=tgt_mask, key_padding_mask=tgt_key_padding_mask)[0]
        tgt = self.norm1(tgt + self.dropout1(tgt2))
        tgt2 = self.multihead_attn(query=self.with_pos_embed(tgt, query_pos),
                                   key=self.with_pos_embed(memory, pos),
                                   value=memory, attn_mask=memory_mask,
                                   key_padding_mask=memory_key_padding_mask)[0]
        tgt = self.norm2(tgt + self.dropout2(tgt2))
        tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt))))
        tgt = self.norm3(tgt + self.dropout3(tgt2))
        return tgt

class TransformerEncoder(nn.Module):
    def __init__(self, encoder_layer, num_layers, norm=None):
        super().__init__()
        self.layers = nn.ModuleList([copy.deepcopy(encoder_layer) for _ in range(num_layers)])
        self.norm = norm
    def forward(self, src, mask=None, src_key_padding_mask=None, pos=None):
        output = src
        for layer in self.layers:
            output = layer(output, src_mask=mask, src_key_padding_mask=src_key_padding_mask, pos=pos)
        if self.norm is not None:
            output = self.norm(output)
        return output

class TransformerDecoder(nn.Module):
    def __init__(self, decoder_layer, num_layers, norm=None, return_intermediate=False):
        super().__init__()
        self.layers = nn.ModuleList([copy.deepcopy(decoder_layer) for _ in range(num_layers)])
        self.norm = norm
        self.return_intermediate = return_intermediate
    def forward(self, tgt, memory, tgt_mask=None, memory_mask=None, tgt_key_padding_mask=None,
                memory_key_padding_mask=None, pos=None, query_pos=None):
        output = tgt
        intermediate = []
        for layer in self.layers:
            output = layer(output, memory, tgt_mask=tgt_mask, memory_mask=memory_mask,
                           tgt_key_padding_mask=tgt_key_padding_mask, memory_key_padding_mask=memory_key_padding_mask,
                           pos=pos, query_pos=query_pos)
            if self.return_intermediate:
                intermediate.append(self.norm(output) if self.norm is not None else output)
        if self.norm is not None:
            output = self.norm(output)
            if self.return_intermediate:
                intermediate[-1] = output
        if self.return_intermediate:
            return torch.stack(intermediate)
        return output.unsqueeze(0)

class SimpleTransformer(nn.Module):
    def __init__(self, d_model=256, nhead=8, num_encoder_layers=3, num_decoder_layers=3,
                 dim_feedforward=1024, dropout=0.1, return_intermediate_dec=True):
        super().__init__()
        encoder_layer = TransformerEncoderLayer(d_model, nhead, dim_feedforward, dropout)
        decoder_layer = TransformerDecoderLayer(d_model, nhead, dim_feedforward, dropout)
        self.encoder = TransformerEncoder(encoder_layer, num_encoder_layers, norm=nn.LayerNorm(d_model))
        self.decoder = TransformerDecoder(decoder_layer, num_decoder_layers, norm=nn.LayerNorm(d_model),
                                          return_intermediate=return_intermediate_dec)
        self.d_model = d_model
    def forward(self, src, mask, query_embed, pos_embed):
        bs, c, h, w = src.shape
        src_flat = src.flatten(2).permute(0, 2, 1)
        pos_flat = pos_embed.flatten(2).permute(0, 2, 1)
        query_embed = query_embed.unsqueeze(0).expand(bs, -1, -1)
        tgt = torch.zeros_like(query_embed)
        memory = self.encoder(src_flat, mask=None, src_key_padding_mask=None, pos=pos_flat)
        hs = self.decoder(tgt, memory, memory_key_padding_mask=None, pos=pos_flat, query_pos=query_embed)
        return hs, memory

class MLP(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
        super().__init__()
        layers = []
        for i in range(num_layers):
            in_dim = input_dim if i == 0 else hidden_dim
            out_dim = output_dim if i == num_layers - 1 else hidden_dim
            layers.append(nn.Linear(in_dim, out_dim))
        self.layers = nn.ModuleList(layers)
    def forward(self, x):
        for layer in self.layers[:-1]:
            x = F.relu(layer(x))
        return self.layers[-1](x)

def build_backbone(name='resnet18', train_backbone=False):
    name = name.lower()
    if name == 'resnet18':
        weights = torchvision.models.ResNet18_Weights.IMAGENET1K_V1
        backbone = torchvision.models.resnet18(weights=weights)
        modules = list(backbone.children())[:-2]
        backbone = nn.Sequential(*modules)
        num_channels = 512
    elif name in ['mobilenet_v2', 'mobilenetv2']:
        weights = torchvision.models.MobileNet_V2_Weights.IMAGENET1K_V2
        backbone = torchvision.models.mobilenet_v2(weights=weights).features
        num_channels = 1280
    else:
        raise ValueError(f'Unknown backbone {name}')
    for param in backbone.parameters():
        param.requires_grad = train_backbone
    return backbone, num_channels

class TinyDETR(nn.Module):
    def __init__(self, backbone_name='resnet18', num_classes=1, num_queries=50, hidden_dim=256,
                 nheads=8, num_encoder_layers=3, num_decoder_layers=3, dim_feedforward=1024,
                 dropout=0.1, aux_loss=True, train_backbone=False):
        super().__init__()
        self.backbone, num_backbone_channels = build_backbone(backbone_name, train_backbone=train_backbone)
        self.input_proj = nn.Conv2d(num_backbone_channels, hidden_dim, kernel_size=1)
        self.position_embedding = PositionEncodingSine(hidden_dim // 2, normalize=True)
        self.query_embed = nn.Embedding(num_queries, hidden_dim)
        self.transformer = SimpleTransformer(d_model=hidden_dim, nhead=nheads,
                                             num_encoder_layers=num_encoder_layers,
                                             num_decoder_layers=num_decoder_layers,
                                             dim_feedforward=dim_feedforward,
                                             dropout=dropout, return_intermediate_dec=aux_loss)
        self.class_embed = nn.Linear(hidden_dim, num_classes + 1)  # +1 for no-object
        self.bbox_embed = MLP(hidden_dim, hidden_dim, 4, num_layers=3)
        self.aux_loss = aux_loss
    def forward(self, samples):
        if isinstance(samples, (list, tuple)):
            samples = torch.stack(samples)
        features = self.backbone(samples)
        src = self.input_proj(features)
        pos = self.position_embedding(src)
        hs, memory = self.transformer(src, mask=None, query_embed=self.query_embed.weight, pos_embed=pos)
        outputs_class = self.class_embed(hs)
        outputs_coord = self.bbox_embed(hs).sigmoid()
        out = {'pred_logits': outputs_class[-1], 'pred_boxes': outputs_coord[-1]}
        if self.aux_loss:
            out['aux_outputs'] = [{'pred_logits': c, 'pred_boxes': b} for c, b in zip(outputs_class[:-1], outputs_coord[:-1])]
        return out


## Hungarian matching + loss
- One-to-one assignment between predictions and ground truth with Hungarian matching (set prediction objective).
- Loss = classification (cross-entropy with no-object), L1 box loss, and GIoU loss. Weights mirror DETR’s recipe but can be tuned.
- Auxiliary decoder outputs receive the same loss to aid optimization on the shallow transformer.
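
The one-to-one assignment can be illustrated on a toy problem. The sketch below is a brute-force stand-in for the Hungarian solver, written in plain Python: it tries every assignment, which is only practical for a handful of boxes. It also simplifies the cost by omitting the class term and computing L1 on corner-format boxes; `giou` and `match` are hypothetical names, and the boxes are made up.

```python
from itertools import permutations

def giou(a, b):
    """Generalized IoU of two (x0, y0, x1, y1) boxes with positive area."""
    iw = max(0.0, min(a[2], b[2]) - max(a[0], b[0]))
    ih = max(0.0, min(a[3], b[3]) - max(a[1], b[1]))
    inter = iw * ih
    union = (a[2] - a[0]) * (a[3] - a[1]) + (b[2] - b[0]) * (b[3] - b[1]) - inter
    # Smallest box enclosing both inputs.
    hull = (max(a[2], b[2]) - min(a[0], b[0])) * (max(a[3], b[3]) - min(a[1], b[1]))
    return inter / union - (hull - union) / hull

def match(preds, gts, bbox_cost=5.0, giou_cost=2.0):
    """Exhaustive one-to-one assignment minimizing L1 + GIoU cost.
    Requires len(preds) >= len(gts); returns (pred_index, gt_index) pairs."""
    def cost(p, g):
        l1 = sum(abs(pi - gi) for pi, gi in zip(p, g))
        return bbox_cost * l1 - giou_cost * giou(p, g)
    # perm[t] is the prediction assigned to ground-truth box t.
    best = min(permutations(range(len(preds)), len(gts)),
               key=lambda perm: sum(cost(preds[q], gts[t]) for t, q in enumerate(perm)))
    return sorted((q, t) for t, q in enumerate(best))

preds = [(0.6, 0.6, 0.9, 0.9), (0.1, 0.1, 0.4, 0.5), (0.0, 0.0, 1.0, 1.0)]
gts = [(0.1, 0.1, 0.4, 0.4), (0.55, 0.6, 0.9, 0.9)]
print(match(preds, gts))  # [(0, 1), (1, 0)]
```

Prediction 2 is left unmatched, which is the case the no-object class handles in the classification loss.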


In [None]:
class HungarianMatcher(nn.Module):
    def __init__(self, class_cost=1.0, bbox_cost=5.0, giou_cost=2.0):
        super().__init__()
        self.class_cost = class_cost
        self.bbox_cost = bbox_cost
        self.giou_cost = giou_cost
        assert class_cost != 0 or bbox_cost != 0 or giou_cost != 0, 'All costs are zero!'
    @torch.no_grad()
    def forward(self, outputs, targets):
        bs, num_queries = outputs['pred_logits'].shape[:2]
        out_prob = outputs['pred_logits'].softmax(-1)  # [B, Q, K]
        out_bbox = outputs['pred_boxes']
        # Import once per call rather than once per image.
        from scipy.optimize import linear_sum_assignment
        indices = []
        for b in range(bs):
            tgt_ids = targets[b]['labels']
            tgt_bbox = targets[b]['boxes']
            if tgt_bbox.numel() == 0:
                # No ground truth in this image: every query stays unmatched.
                indices.append((torch.as_tensor([], dtype=torch.int64), torch.as_tensor([], dtype=torch.int64)))
                continue
            # Cost matrix [Q, T]: lower is better for each (query, target) pair.
            prob = out_prob[b][:, tgt_ids]
            cost_class = -prob
            cost_bbox = torch.cdist(out_bbox[b], tgt_bbox, p=1)
            cost_giou = -generalized_box_iou(cxcywh_to_xyxy(out_bbox[b]), cxcywh_to_xyxy(tgt_bbox))
            C = self.class_cost * cost_class + self.bbox_cost * cost_bbox + self.giou_cost * cost_giou
            i, j = linear_sum_assignment(C.cpu())
            indices.append((torch.as_tensor(i, dtype=torch.int64), torch.as_tensor(j, dtype=torch.int64)))
        return indices

def get_src_permutation_idx(indices):
    batch_idx = torch.cat([torch.full_like(src, i) for i, (src, _) in enumerate(indices)])
    src_idx = torch.cat([src for (src, _) in indices])
    return batch_idx, src_idx

class SetCriterion(nn.Module):
    def __init__(self, num_classes, matcher, weight_dict, eos_coef, losses):
        super().__init__()
        self.num_classes = num_classes
        self.matcher = matcher
        self.weight_dict = weight_dict
        self.eos_coef = eos_coef
        self.losses = losses
        empty_weight = torch.ones(self.num_classes + 1)
        empty_weight[-1] = self.eos_coef
        self.register_buffer('empty_weight', empty_weight)
    def loss_labels(self, outputs, targets, indices):
        src_logits = outputs['pred_logits']
        idx = get_src_permutation_idx(indices)
        target_classes_o = torch.cat([t['labels'][J] for t, (_, J) in zip(targets, indices)], dim=0)
        target_classes = torch.full(src_logits.shape[:2], self.num_classes, dtype=torch.int64, device=src_logits.device)
        target_classes[idx] = target_classes_o
        loss_ce = F.cross_entropy(src_logits.transpose(1, 2), target_classes, self.empty_weight)
        return {'loss_ce': loss_ce}
    def loss_boxes(self, outputs, targets, indices):
        idx = get_src_permutation_idx(indices)
        src_boxes = outputs['pred_boxes'][idx]
        target_boxes = torch.cat([t['boxes'][i] for t, (_, i) in zip(targets, indices)], dim=0)
        loss_bbox = F.l1_loss(src_boxes, target_boxes, reduction='none')
        losses = {'loss_bbox': loss_bbox.mean()}
        loss_giou = 1 - torch.diag(generalized_box_iou(cxcywh_to_xyxy(src_boxes), cxcywh_to_xyxy(target_boxes)))
        losses['loss_giou'] = loss_giou.mean()
        return losses
    def _get_loss(self, loss, outputs, targets, indices):
        loss_map = {'labels': self.loss_labels, 'boxes': self.loss_boxes}
        return loss_map[loss](outputs, targets, indices)
    def forward(self, outputs, targets):
        outputs_no_aux = {k: v for k, v in outputs.items() if k != 'aux_outputs'}
        indices = self.matcher(outputs_no_aux, targets)
        losses = {}
        for loss in self.losses:
            losses.update(self._get_loss(loss, outputs_no_aux, targets, indices))
        if 'aux_outputs' in outputs:
            for i, aux in enumerate(outputs['aux_outputs']):
                idxs = self.matcher(aux, targets)
                for loss in self.losses:
                    l_dict = self._get_loss(loss, aux, targets, idxs)
                    l_dict = {f'{k}_{i}': v for k, v in l_dict.items()}
                    losses.update(l_dict)
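        # Auxiliary keys carry a layer suffix (e.g. 'loss_bbox_0'); strip it so they
        # use the same weights as the final layer instead of falling back to 1.0.
        total = sum(self.weight_dict.get(re.sub(r'_\d+$', '', k), 1.0) * v for k, v in losses.items())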
        losses['loss_total'] = total
        return losses


## Training + evaluation utilities
- AdamW optimizer with a smaller LR for the backbone, as in the paper; the backbone parameter group is only created when `train_backbone=True`.
- Optional gradient clipping and AMP for speed on Colab.
- Simple mAP@0.5 implementation for the single-class setup.
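
The AP calculation can be checked by hand on a tiny example. The sketch below is a plain-Python version of all-point interpolated AP, given detections that have already been labeled as true or false positives; `average_precision` is a hypothetical name and the detections are made up.

```python
def average_precision(scores_and_hits, num_gt):
    """All-point interpolated AP from (score, is_true_positive) pairs."""
    ranked = sorted(scores_and_hits, key=lambda s: s[0], reverse=True)
    tp = fp = 0
    recalls, precisions = [], []
    for _, hit in ranked:
        tp += bool(hit)
        fp += not hit
        recalls.append(tp / num_gt)
        precisions.append(tp / (tp + fp))
    # Precision envelope: make precision non-increasing as recall grows.
    for i in range(len(precisions) - 2, -1, -1):
        precisions[i] = max(precisions[i], precisions[i + 1])
    # Sum rectangle areas wherever recall increases.
    ap, prev_recall = 0.0, 0.0
    for r, p in zip(recalls, precisions):
        ap += (r - prev_recall) * p
        prev_recall = r
    return ap

# Two ground-truth boxes; the second-ranked detection is a false positive.
dets = [(0.9, True), (0.8, False), (0.7, True)]
print(round(average_precision(dets, num_gt=2), 4))  # 0.8333
```

The first true positive contributes 0.5 × 1.0 and the second contributes 0.5 × 2/3, so the false positive in between lowers AP from 1.0 to about 0.83.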


In [None]:
def build_model(config):
    model = TinyDETR(
        backbone_name=config['backbone'],
        num_classes=1,
        num_queries=config['num_queries'],
        hidden_dim=config['hidden_dim'],
        nheads=config['nheads'],
        num_encoder_layers=config['enc_layers'],
        num_decoder_layers=config['dec_layers'],
        dim_feedforward=config['ff_dim'],
        dropout=config['dropout'],
        aux_loss=True,
        train_backbone=config['train_backbone']
    )
    matcher = HungarianMatcher(class_cost=config['cls_cost'], bbox_cost=config['bbox_cost'], giou_cost=config['giou_cost'])
    weight_dict = {'loss_ce': config['cls_loss_coef'], 'loss_bbox': config['bbox_loss_coef'], 'loss_giou': config['giou_loss_coef']}
    criterion = SetCriterion(num_classes=1, matcher=matcher, weight_dict=weight_dict, eos_coef=config['no_object_weight'], losses=['labels', 'boxes'])
    return model, criterion

def build_loaders(config):
    train_loader = DataLoader(train_ds, batch_size=config['batch_size'], shuffle=True,
                              collate_fn=collate_fn, num_workers=config['num_workers'])
    val_loader = DataLoader(val_ds, batch_size=config['batch_size'], shuffle=False,
                            collate_fn=collate_fn, num_workers=config['num_workers'])
    test_loader = DataLoader(test_ds, batch_size=config['batch_size'], shuffle=False,
                             collate_fn=collate_fn, num_workers=config['num_workers'])
    return train_loader, val_loader, test_loader

def targets_to_device(targets, device):
    new_targets = []
    for t in targets:
        new_t = {k: v.to(device) if torch.is_tensor(v) else v for k, v in t.items()}
        new_targets.append(new_t)
    return new_targets

def train_one_epoch(model, criterion, data_loader, optimizer, device, epoch, max_norm=0.1, scaler=None):
    model.train()
    criterion.train()
    losses_all = []
    ce_all, bbox_all, giou_all = [], [], []
    for images, targets in data_loader:
        images = images.to(device)
        targets = targets_to_device(targets, device)
        optimizer.zero_grad()
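        # Enable autocast only when the gradient scaler is actually active;
        # a scaler created with enabled=False should not switch on mixed precision.
        use_amp = scaler is not None and scaler.is_enabled()
        with torch.autocast(device_type=torch.device(device).type, enabled=use_amp):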
            outputs = model(images)
            loss_dict = criterion(outputs, targets)
            loss = loss_dict['loss_total']
        if scaler is not None:
            scaler.scale(loss).backward()
            if max_norm > 0:
                scaler.unscale_(optimizer)
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)
            scaler.step(optimizer)
            scaler.update()
        else:
            loss.backward()
            if max_norm > 0:
                torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm)
            optimizer.step()
        losses_all.append(loss.item())
        ce_all.append(loss_dict['loss_ce'].item())
        bbox_all.append(loss_dict['loss_bbox'].item())
        giou_all.append(loss_dict['loss_giou'].item())
    stats = {
        'loss': float(np.mean(losses_all)),
        'loss_ce': float(np.mean(ce_all)),
        'loss_bbox': float(np.mean(bbox_all)),
        'loss_giou': float(np.mean(giou_all))
    }
    print(f"Epoch {epoch}: train loss {stats['loss']:.4f} | ce {stats['loss_ce']:.4f} | bbox {stats['loss_bbox']:.4f} | giou {stats['loss_giou']:.4f}")
    return stats

def compute_ap(recall, precision):
    recall = np.concatenate(([0.0], recall, [1.0]))
    precision = np.concatenate(([0.0], precision, [0.0]))
    for i in range(precision.size - 1, 0, -1):
        precision[i - 1] = np.maximum(precision[i - 1], precision[i])
    indices = np.where(recall[1:] != recall[:-1])[0]
    ap = np.sum((recall[indices + 1] - recall[indices]) * precision[indices + 1])
    return ap

def evaluate_map(model, data_loader, device, score_thr=0.3, iou_thr=0.5):
    model.eval()
    preds = []
    gts = {}
    with torch.no_grad():
        for images, targets in data_loader:
            images = images.to(device)
            targets = targets_to_device(targets, device)
            outputs = model(images)
            probs = outputs['pred_logits'].softmax(-1)
            boxes = outputs['pred_boxes']
            for b in range(images.shape[0]):
                tgt = targets[b]
                img_id = int(tgt['image_id'])
                h, w = tgt['size']
                gt_boxes = cxcywh_to_xyxy(tgt['boxes']) * torch.tensor([w, h, w, h], device=device)
                gts[img_id] = gt_boxes.cpu()
                ped_scores = probs[b, :, 0]
                ped_boxes = cxcywh_to_xyxy(boxes[b]) * torch.tensor([w, h, w, h], device=device)
                keep = ped_scores > score_thr
                for score, box in zip(ped_scores[keep], ped_boxes[keep]):
                    preds.append({'image_id': img_id, 'score': float(score.cpu()), 'box': box.cpu()})
    preds = sorted(preds, key=lambda x: x['score'], reverse=True)
    tp, fp = [], []
    matched = {img_id: np.zeros(len(gts[img_id])) for img_id in gts}
    for pred in preds:
        img_id = pred['image_id']
        box = pred['box']
        if img_id not in gts or len(gts[img_id]) == 0:
            fp.append(1); tp.append(0); continue
        ious, _ = box_iou(box.unsqueeze(0), gts[img_id])
        ious = ious.squeeze(0).numpy()
        best_idx = np.argmax(ious)
        if ious[best_idx] >= iou_thr and matched[img_id][best_idx] == 0:
            tp.append(1); fp.append(0); matched[img_id][best_idx] = 1
        else:
            fp.append(1); tp.append(0)
    tp = np.array(tp)
    fp = np.array(fp)
    if len(tp) == 0:
        return {'map50': 0.0, 'precision': 0.0, 'recall': 0.0}
    tp_cum = np.cumsum(tp)
    fp_cum = np.cumsum(fp)
    recalls = tp_cum / (sum(len(v) for v in gts.values()) + 1e-6)
    precisions = tp_cum / np.maximum(tp_cum + fp_cum, 1e-6)
    ap = compute_ap(recalls, precisions)
    return {'map50': float(ap), 'precision': float(precisions[-1]), 'recall': float(recalls[-1])}

def run_training(config, train_loader, val_loader):
    model, criterion = build_model(config)
    model.to(device)
    criterion.to(device)
    backbone_params = []
    transformer_params = []
    for name, p in model.named_parameters():
        if not p.requires_grad:
            continue
        if 'backbone' in name:
            backbone_params.append(p)
        else:
            transformer_params.append(p)
    param_dicts = [{'params': transformer_params}]
    if backbone_params:
        param_dicts.append({'params': backbone_params, 'lr': config['lr_backbone']})
    optimizer = torch.optim.AdamW(param_dicts, lr=config['lr'], weight_decay=config['weight_decay'])
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=config['lr_drop'], gamma=0.1)
    scaler = torch.cuda.amp.GradScaler(enabled=config['use_amp'])
    history = []
    best_map = -1
    save_path = Path(config['checkpoint_path'])
    save_path.parent.mkdir(exist_ok=True, parents=True)
    for epoch in range(1, config['epochs'] + 1):
        train_stats = train_one_epoch(model, criterion, train_loader, optimizer, device, epoch,
                                      max_norm=config['clip_max_norm'], scaler=scaler)
        val_stats = evaluate_map(model, val_loader, device, score_thr=config['score_threshold'])
        # Record the LR this epoch actually trained with, before the scheduler advances it.
        record = {'epoch': epoch, **train_stats, **val_stats, 'lr': optimizer.param_groups[0]['lr']}
        scheduler.step()
        history.append(record)
        print(f"Val mAP@0.5: {val_stats['map50']:.3f} | precision {val_stats['precision']:.3f} | recall {val_stats['recall']:.3f}")
        if val_stats['map50'] > best_map:
            best_map = val_stats['map50']
            torch.save({'model': model.state_dict(), 'config': config, 'epoch': epoch}, save_path)
            print(f'✅ New best model saved to {save_path} (mAP@0.5={best_map:.3f})')
    return model, history
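
The precision/recall bookkeeping at the end of the evaluation code above is easier to follow with numbers. The sketch below replays it on four toy detections in plain Python. `pr_curve` and `all_point_ap` are illustrative names, not notebook functions, and the all-point interpolation is an assumption: the notebook's `compute_ap` is defined elsewhere and may integrate the curve differently.

```python
def pr_curve(scored_hits, num_gt):
    """scored_hits: list of (score, is_true_positive). Returns (recalls, precisions)."""
    ranked = sorted(scored_hits, key=lambda x: x[0], reverse=True)
    recalls, precisions = [], []
    tp = fp = 0
    for _, hit in ranked:
        if hit:
            tp += 1
        else:
            fp += 1
        recalls.append(tp / num_gt)
        precisions.append(tp / (tp + fp))
    return recalls, precisions


def all_point_ap(recalls, precisions):
    """Area under the precision envelope (assumed VOC-style all-point interpolation)."""
    r = [0.0] + list(recalls) + [1.0]
    p = [0.0] + list(precisions) + [0.0]
    # Replace each precision with the best precision reached at any higher recall.
    for i in range(len(p) - 2, -1, -1):
        p[i] = max(p[i], p[i + 1])
    # Sum rectangle areas wherever recall increases.
    return sum((r[i + 1] - r[i]) * p[i + 1] for i in range(len(r) - 1))


# Toy ranking: TP, FP, TP, FP against two ground-truth boxes.
detections = [(0.9, True), (0.8, False), (0.7, True), (0.6, False)]
recalls, precisions = pr_curve(detections, num_gt=2)
print(recalls)                                        # [0.5, 0.5, 1.0, 1.0]
print(round(all_point_ap(recalls, precisions), 4))    # 0.8333
```

Note that the `precision` and `recall` values returned by the evaluation code are the last points of these curves, i.e. the values over all detections that passed the score threshold.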


## Baseline configuration (ResNet18)
Paper-inspired defaults, shrunk for the small dataset. Increase epochs to ~25 for the full run.
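- Backbone: ResNet18 (ImageNet-pretrained). Backbone LR is 1e-5, but the config below sets `train_backbone` to `False`, and the optimizer only receives backbone parameters that require gradients.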
- Transformer: 3 encoder / 3 decoder layers, 8 heads, hidden dim 256, FFN dim 1024.
- Queries: 50 (tunable).
- Loss weights: cls=1, bbox=5, giou=2, no-object weight=0.1.
- Augmentations: color jitter + horizontal flip.
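
To see what the loss weights above do, here is a minimal plain-Python sketch. It assumes the class loss is a weighted-mean cross entropy (the reduction PyTorch's cross entropy applies when given class weights) and that the three terms are combined as a weighted sum. The notebook's criterion is defined earlier and may differ in detail. `weighted_cross_entropy`, `total_loss`, and all numbers are illustrative.

```python
import math

PEDESTRIAN, NO_OBJECT = 0, 1


def weighted_cross_entropy(probs, targets, class_weights):
    """Weighted-mean cross entropy over queries.

    probs[i][c] is the predicted probability of class c for query i;
    targets[i] is the class index assigned to query i by the matcher.
    """
    weighted_nll = sum(class_weights[t] * -math.log(p[t]) for p, t in zip(probs, targets))
    return weighted_nll / sum(class_weights[t] for t in targets)


def total_loss(losses, coefs):
    """Weighted sum of the classification, L1 box, and GIoU terms."""
    return sum(coefs[name] * losses[name] for name in coefs)


# One matched pedestrian query predicted poorly, three no-object queries predicted well.
probs = [[0.2, 0.8], [0.1, 0.9], [0.1, 0.9], [0.1, 0.9]]
targets = [PEDESTRIAN, NO_OBJECT, NO_OBJECT, NO_OBJECT]

uniform = weighted_cross_entropy(probs, targets, [1.0, 1.0])
downweighted = weighted_cross_entropy(probs, targets, [1.0, 0.1])
print(f'uniform weights: {uniform:.3f} | no-object weight 0.1: {downweighted:.3f}')

# Toy bbox/giou values, combined with the baseline coefficients.
coefs = {'cls': 1.0, 'bbox': 5.0, 'giou': 2.0}
print(total_loss({'cls': downweighted, 'bbox': 0.2, 'giou': 0.5}, coefs))
```

With most of the 50 queries assigned to no-object, the 0.1 weight keeps the easy background predictions from drowning out the error on the few matched queries.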


In [None]:
base_config = {
    'backbone': 'resnet18',
    'num_queries': 50,
    'hidden_dim': 256,
    'nheads': 8,
    'enc_layers': 3,
    'dec_layers': 3,
    'ff_dim': 1024,
    'dropout': 0.1,
    'train_backbone': False,
    'lr': 1e-4,
    'lr_backbone': 1e-5,
    'weight_decay': 1e-4,
    'lr_drop': 15,
    'batch_size': 4,
    'num_workers': 2,
    'epochs': 10,  # set to 20-30 for the full assignment run
    'clip_max_norm': 0.1,
    'score_threshold': 0.3,
    'cls_cost': 1.0,
    'bbox_cost': 5.0,
    'giou_cost': 2.0,
    'cls_loss_coef': 1.0,
    'bbox_loss_coef': 5.0,
    'giou_loss_coef': 2.0,
    'no_object_weight': 0.1,
    'use_amp': True,
    'checkpoint_path': 'checkpoints/tiny_detr_resnet18.pth'
}
train_loader, val_loader, test_loader = build_loaders(base_config)
print('Ready loaders. Baseline config:')
print(json.dumps({k: v for k, v in base_config.items() if k not in ['checkpoint_path']}, indent=2))


### Train baseline
Set `base_config['epochs']=20` (or 25) for the graded run. Keep the cell idempotent so you can re-run with new hyperparameters. Training time on Colab T4 for 20 epochs is typically ~20–30 minutes.
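
`run_training` uses `StepLR` with `gamma=0.1` and `step_size=config['lr_drop']`, stepping once per epoch, so the epoch count decides whether the drop at `lr_drop=15` is ever reached. A minimal sketch of that arithmetic in plain Python, not through the scheduler itself (`step_lr` is a hypothetical helper):

```python
def step_lr(base_lr, epoch, step_size, gamma=0.1):
    """LR in effect during 1-indexed `epoch`, with one scheduler step after each epoch."""
    return base_lr * gamma ** ((epoch - 1) // step_size)


for total_epochs in (10, 20, 25):
    final_lr = step_lr(1e-4, total_epochs, step_size=15)
    print(f'{total_epochs} epochs: last epoch trains at lr={final_lr:.0e}')
```

Under these assumptions a 10-epoch smoke test never leaves the initial rate, while a 20 or 25 epoch run spends epochs 16 onward at one tenth of it.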


In [None]:
RUN_BASELINE = False  # flip to True to launch training
if RUN_BASELINE:
    model, history = run_training(base_config, train_loader, val_loader)
    with open('history_resnet18.json', 'w') as f:
        json.dump(history, f, indent=2)
    print('Training complete. History saved to history_resnet18.json')
else:
    print('Skipping baseline training (set RUN_BASELINE=True to run).')


## Guided improvements
Try at least two changes (paper-guided): different backbone, query count, transformer depth, or augmentation tweaks. Two ready-made configs are below; feel free to add more.
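
When several variants are derived from one base config, it helps to log exactly which keys changed. A small sketch, assuming configs are flat dicts as in this notebook. `config_diff` is a hypothetical helper, and the dicts here are trimmed copies used only for illustration.

```python
def config_diff(base, variant):
    """Return {key: (base_value, variant_value)} for every key whose value differs."""
    keys = sorted(set(base) | set(variant))
    return {k: (base.get(k), variant.get(k)) for k in keys if base.get(k) != variant.get(k)}


base = {'backbone': 'resnet18', 'num_queries': 50, 'hidden_dim': 256, 'lr_backbone': 1e-5}
more_queries = {**base, 'num_queries': 100}  # later keys override; `base` is left unchanged

print(config_diff(base, more_queries))  # {'num_queries': (50, 100)}
```

Printing this diff next to each run makes the report's comparison tables easier to fill in.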


In [None]:
mobile_config = {**base_config, 'backbone': 'mobilenet_v2', 'lr_backbone': 5e-5,
                 'hidden_dim': 192, 'num_queries': 75, 'checkpoint_path': 'checkpoints/tiny_detr_mobilenetv2.pth'}
queries_config = {**base_config, 'num_queries': 100, 'checkpoint_path': 'checkpoints/tiny_detr_resnet18_q100.pth'}

print('Example configs prepared (mobilenet_v2 and resnet18 with more queries). Set RUN_EXPERIMENT = True to train them.')
RUN_EXPERIMENT = False
if RUN_EXPERIMENT:
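    for cfg in [mobile_config, queries_config]:
        print(f"Running config: {cfg['checkpoint_path']}")
        # build_loaders returns (train, val, test); run_training takes only train and val.
        exp_train_loader, exp_val_loader, _ = build_loaders(cfg)
        model, history = run_training(cfg, exp_train_loader, exp_val_loader)
        with open(Path(cfg['checkpoint_path']).with_suffix('.history.json'), 'w') as f:
            json.dump(history, f, indent=2)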


## Load best checkpoint and evaluate on test set
After training, load the saved weights and compute test mAP@0.5. This cell is safe to run multiple times with different checkpoints.


In [None]:
def load_model_from_checkpoint(path, config):
    ckpt = torch.load(path, map_location=device)
    cfg = ckpt.get('config', config)
    model, _ = build_model(cfg)
    model.load_state_dict(ckpt['model'])
    model.to(device)
    model.eval()
    return model

CHECKPOINT_TO_EVAL = base_config['checkpoint_path']
if Path(CHECKPOINT_TO_EVAL).exists():
    eval_model = load_model_from_checkpoint(CHECKPOINT_TO_EVAL, base_config)
    test_stats = evaluate_map(eval_model, test_loader, device, score_thr=base_config['score_threshold'])
    print('Test metrics (IoU 0.5):', test_stats)
else:
    print('No checkpoint found at', CHECKPOINT_TO_EVAL)


## Visualization (qualitative results)
Use this cell to inspect predictions on a few randomly sampled images. Include both good and failure cases in the report.


In [None]:
def denormalize(img_tensor):
    mean = torch.tensor([0.485, 0.456, 0.406], device=img_tensor.device).view(3, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225], device=img_tensor.device).view(3, 1, 1)
    return (img_tensor * std + mean).clamp(0, 1)

def visualize_predictions(model, dataset, num_images=3, score_thr=0.4):
    model.eval()
    idxs = random.sample(range(len(dataset)), k=num_images)
    fig, axes = plt.subplots(1, num_images, figsize=(6 * num_images, 6))
    if num_images == 1:
        axes = [axes]
    with torch.no_grad():
        for ax, idx in zip(axes, idxs):
            img, target = dataset[idx]
            h, w = target['size']
            inp = img.unsqueeze(0).to(device)
            out = model(inp)
            prob = out['pred_logits'].softmax(-1)[0, :, 0]
            boxes = cxcywh_to_xyxy(out['pred_boxes'][0]) * torch.tensor([w, h, w, h], device=device)
            keep = prob > score_thr
            img_np = denormalize(img).permute(1, 2, 0).cpu().numpy()
            ax.imshow(img_np)
            for score, box in zip(prob[keep], boxes[keep]):
                x0, y0, x1, y1 = box.cpu().tolist()  # plain floats for matplotlib
                rect = plt.Rectangle((x0, y0), x1 - x0, y1 - y0, fill=False, color='lime', linewidth=2)
                ax.add_patch(rect)
                ax.text(x0, y0, f'{score:.2f}', color='yellow', fontsize=9, bbox=dict(facecolor='black', alpha=0.5))
            gt_boxes = cxcywh_to_xyxy(target['boxes']) * torch.tensor([w, h, w, h])
            for box in gt_boxes:
                x0, y0, x1, y1 = box.tolist()
                rect = plt.Rectangle((x0, y0), x1 - x0, y1 - y0, fill=False, color='red', linestyle='--', linewidth=1.5)
                ax.add_patch(rect)
            ax.set_title(f'Image {idx} | green=pred, red=gt')
            ax.axis('off')
    plt.show()

# Example usage (requires a trained checkpoint loaded into eval_model)
# visualize_predictions(eval_model, val_ds, num_images=3, score_thr=0.5)
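
The box handling in `visualize_predictions` combines a format conversion with a rescale to pixels. The sketch below spells that out for a single box in plain Python, assuming boxes are stored as normalized `(cx, cy, w, h)` in `[0, 1]` as in the DETR paper. `cxcywh_to_xyxy_pixels` is an illustrative stand-in, not the notebook's `cxcywh_to_xyxy`.

```python
def cxcywh_to_xyxy_pixels(box, width, height):
    """Convert one normalized (cx, cy, w, h) box to pixel (x0, y0, x1, y1) corners."""
    cx, cy, w, h = box
    x0, y0 = (cx - w / 2) * width, (cy - h / 2) * height
    x1, y1 = (cx + w / 2) * width, (cy + h / 2) * height
    return x0, y0, x1, y1


# A centered box covering 20% of the width and 40% of the height of a 400x300 image.
x0, y0, x1, y1 = cxcywh_to_xyxy_pixels((0.5, 0.5, 0.2, 0.4), width=400, height=300)
print([round(v, 1) for v in (x0, y0, x1, y1)])  # [160.0, 90.0, 240.0, 210.0]
```

`plt.Rectangle` takes the top-left corner plus a width and height, which is why the plotting code passes `(x0, y0), x1 - x0, y1 - y0`.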


## Report template (fill after experiments)
- **Backbone comparison (MobileNetV2 vs ResNet18):** describe feature strength vs. speed, validation/test mAP@0.5, training time/epoch. Relate to DETR paper notes on backbone capacity.
- **Positional encoding:** summarize why DETR needs it and confirm sinusoidal implementation.
- **Transformer depth / queries:** report query counts tried (e.g., 50 vs 100) and impact on convergence.
- **Augmentation study:** effect of color jitter + flip (and any additional tweak) on mAP or overfitting.
- **Final model:** chosen config, epochs, LR schedule, best val mAP, test mAP, and qualitative examples (good + failure cases).
- **Training curves:** include loss + val mAP plots (load from history JSON files).
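
For the training-curves item, the history JSON files can be reduced to plottable series with a few lines. This is a sketch under two assumptions: each record carries the `epoch` and `map50` keys written by `run_training`, and the training loss is stored under a key named `loss`, which depends on what `train_one_epoch` returns. `load_history` and `summarize_history` are hypothetical helpers, and the records below are placeholder values, not results.

```python
import json


def load_history(path):
    """Read a history file written with json.dump(history, f)."""
    with open(path) as f:
        return json.load(f)


def summarize_history(records):
    """Return per-epoch series plus the epoch with the best validation mAP@0.5."""
    best = max(records, key=lambda r: r['map50'])
    return {
        'epochs': [r['epoch'] for r in records],
        'map50': [r['map50'] for r in records],
        # 'loss' is an assumed key name; missing values come back as None.
        'loss': [r.get('loss') for r in records],
        'best_epoch': best['epoch'],
        'best_map50': best['map50'],
    }


# Placeholder records for illustration only.
toy_history = [
    {'epoch': 1, 'loss': 2.0, 'map50': 0.10},
    {'epoch': 2, 'loss': 1.5, 'map50': 0.30},
    {'epoch': 3, 'loss': 1.4, 'map50': 0.25},
]
summary = summarize_history(toy_history)
print(summary['best_epoch'], summary['best_map50'])  # 2 0.3
```

The `epochs`, `loss`, and `map50` lists can be passed directly to `plt.plot` to produce the two curves.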
