# Fast RCNN

**Fast R-CNN** es una mejora directa de la R-CNN clásica.  

En este caso, las regiones de interés (*ROIs*) ya no se recortan ni se procesan individualmente para extraer características.  
En su lugar, la imagen completa pasa una sola vez por la red convolucional, generando un **feature map**.  

Después, se obtienen las propuestas de **Selective Search** y se proyectan sus coordenadas sobre ese mapa de características para extraer las regiones correspondientes.  

Dado que cada ROI tiene un tamaño distinto, se aplica **ROI Pooling** para normalizarlas a una forma fija, de modo que puedan pasar por las capas fully-connected del *head*.  (En implementaciones modernas usamos ROI Align)

La red se entrena optimizando simultáneamente:
- la **pérdida de clasificación** (qué objeto hay en cada ROI), y  
- la **pérdida de regresión** (ajuste fino de los *bounding boxes*).  

Durante el entrenamiento **no se aplica NMS**, pero **sí se usa en la inferencia** para eliminar detecciones redundantes.  

Esta arquitectura mejora notablemente los tiempos respecto a R-CNN, aunque **Selective Search sigue siendo el cuello de botella principal**.  


Diferencias:

| Etapa     | R-CNN clásica                  | Fast-RCNN                   |
| --------- | ------------------------------ | ------------------------------------------- |
| Proposals | Selective Search               | Selective Search                 |
| ROIs      | No hay como tal                | Subconjunto de proposals usado por ROIPooling / Align |
| NMS       | Final (sobre predicciones SVM) | Final (sobre predicciones del head)         |


Conceptos:


| Concepto      | Qué es                                                    | Cómo se obtiene                                        | Cuántos hay              | Cómo se usa                                                |
| ------------- | --------------------------------------------------------- | ------------------------------------------------------ | ------------------------ | ---------------------------------------------------------- |
| **Proposals** | Candidatas “donde podría haber algo”                      | De un algoritmo como **Selective Search** o un **RPN** | Miles (≈2000 por imagen) | Entrada bruta del detector                                 |
| **ROIs**      | Subconjunto de proposals elegidas para entrenar o inferir | Se **muestrean** (pos/neg) de las proposals            | Pocas (≈128 por imagen)  | Alimentan **ROIAlign + head** para clasificación/regresión |


Vamos a ver su implementación

In [1]:
import os, random, math, time, pathlib, shutil, json
import numpy as np
import torch
import torchvision as tv
import selectivesearch
from torchvision.models import ResNet50_Weights
from torchsummary import summary
from torchvision.datasets import VOCDetection
from torchvision.transforms.functional import to_tensor, normalize
from pathlib import Path
import pickle

#Train Mode
trainFreeze = True
trainFineTune = False

#Checkpoint de modelo
CKPT_PATH_FROZEN = "./checkpoints_fast_rcnn/fast_rcnn_frozen_best_01.pth"

#Proposals cache, para no tener que calcularlos de nuevo
PROPOSALS_PATH = Path("./checkpoints_fast_rcnn/proposals_cache_01.pkl")
if PROPOSALS_PATH.exists():
    with open(PROPOSALS_PATH, "rb") as f:
        PROPOSALS_CACHE = pickle.load(f)
    print(f"[CACHE] Loaded proposals cache ({len(PROPOSALS_CACHE)} images)")
else:
    PROPOSALS_CACHE = {}
    print("[CACHE] Starting with empty proposals cache")
# ===============================

weights = ResNet50_Weights.IMAGENET1K_V2
resnet_transforms = weights.transforms()

torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print("CUDA disponible:", torch.cuda.is_available())
print("Device:", device)

if device.type == "cuda":
    torch.set_float32_matmul_precision("high")
    torch.backends.cuda.matmul.allow_tf32 = True
    torch.backends.cudnn.allow_tf32 = True
    torch.backends.cudnn.benchmark = True
    torch.backends.cudnn.deterministic = False

VOC_ROOT = os.environ.get("VOC_ROOT", "./data/02")
print("VOC_ROOT:", VOC_ROOT)


[CACHE] Loaded proposals cache (5159 images)
CUDA disponible: True
Device: cuda
VOC_ROOT: ./data/02


  _C._set_float32_matmul_precision(precision)


#### Nos saltamos la parte de descargar el dataset, lo vimos en el cuaderno anterior.

Pasamos directamente a su carga.

In [2]:


train_root = "./data/02/VOCtrainval_06-Nov-2007"
test_root  = "./data/02/VOCtest_06-Nov-2007"

train_ds = VOCDetection(train_root, year="2007", image_set="trainval", download=False)
test_ds  = VOCDetection(test_root,  year="2007", image_set="test",     download=False)

print("Train:", len(train_ds))
print("Test:", len(test_ds))


Train: 5011
Test: 4952


In [3]:
# === Extracción automática de clases del dataset VOC ===

def extract_voc_classes(dataset):
    classes = set()
    for i in range(min(500, len(dataset))):  # escanea solo 500 imágenes para acelerar
        ann = dataset[i][1]['annotation']
        objs = ann.get('object', [])
        if isinstance(objs, dict):  # si solo hay un objeto
            objs = [objs]
        for obj in objs:
            name = obj['name']
            classes.add(name)
    return sorted(list(classes))

# Obtener clases del dataset de entrenamiento
voc_classes = extract_voc_classes(train_ds)

#Añadimos background porque es la clase 0, que no está en voc_classes, lo necesitamos para el loss. 
#Tendremos 21 clases: 20 de VOC + 1 de background.
CLASSES = ['background'] + voc_classes

NUM_CLASSES = len(CLASSES)

print("Clases detectadas:", voc_classes)
print("Total:", NUM_CLASSES)


Clases detectadas: ['aeroplane', 'bicycle', 'bird', 'boat', 'bottle', 'bus', 'car', 'cat', 'chair', 'cow', 'diningtable', 'dog', 'horse', 'motorbike', 'person', 'pottedplant', 'sheep', 'sofa', 'train', 'tvmonitor']
Total: 21


Vamos a setear algunas constantes.

Tenemos unas 2000 proposals por imagen
Estas salen de Selective Search.
La mayoría no contiene ningún objeto útil; algunas sí cubren bien GT.
El objetivo es quedarse con 128 ROIs representativas para esa imagen.

Para ello calcularemos el IoU entre cada proposal y todas las cajas GT. (en otra celda, mas adelante)

Luego clasificamos cada proposal en Pos y Neg en base a los thresholds del IoU y poderamos las imagenes que nos queremos quedar.


In [4]:


# IoU thresholds
POS_IOU_THRESH = 0.3
NEG_IOU_THRESH = 0.3

# Sampler config
# Tomamos 128 ROIs por imagen.
ROIS_PER_IMG = 256
# 25% de esas ROIs son positivas (foreground) y el resto 75% negativas (background).
# Usamos esta distribución porque es lo que da mejores resultados en la literatura.
FG_FRACTION = 0.5# ~32 pos + 96 neg 

# BBox delta normalization
# Normalizamos las coordenadas de los bounding boxes para que sean más estables.
#Son valores que usamos fijos para todo el entrenamiento y que se han obtenido empíricamente.
#Son un estándar en la literatura.
BBOX_MEANS = torch.tensor([0.0, 0.0, 0.0, 0.0], dtype=torch.float32)
BBOX_STDS  = torch.tensor([0.1, 0.1, 0.2, 0.2], dtype=torch.float32)

# Proposals
# Tomamos hasta 2000 propuestas por imagen.
MAX_PROPOSALS_PER_IMG = 2000
# Las propuestas mínimas tienen 16px de lado.
MIN_SIZE = 16

print(f"Num clases: {NUM_CLASSES}")
print(f"Sampler: {ROIS_PER_IMG} (FG {int(ROIS_PER_IMG*FG_FRACTION)}, BG {ROIS_PER_IMG-int(ROIS_PER_IMG*FG_FRACTION)})")
print(f"PosIoU≥{POS_IOU_THRESH}, NegIoU≤{NEG_IOU_THRESH}")

Num clases: 21
Sampler: 256 (FG 128, BG 128)
PosIoU≥0.3, NegIoU≤0.3


Ahora necesitamos redimensionar las imagenes, mantendremos un tamaño fijo para el lado más corto (600) y uno variable de maximo 1000 para el lado largo, de desta manera podemos mantener el factor de escala, este facto de escala tendremos que aplicarlo también a los BBs para que la proyección sea coherente.

In [5]:

from PIL import Image

#Definimos el tamaño fijo para el lado más corto y el máximo para el lado largo
short_side = 600
max_side = 1000

#Obtenemos la media y la desviación estándar de ResNet para la normalización
R50_MEAN = resnet_transforms.mean
R50_STD = resnet_transforms.std
print(R50_MEAN, R50_STD)

#Llamaremos a esta función para preprocesar las imágenes
def preprocess_image(img, boxes, device=device, short_side=short_side, max_side=max_side):
    
    #Objeto PIL de la imagen
    w, h = img.size #Ej (500, 333)
    #Obtenemos el tamaño más corto y el más largo de la imagen
    actual_short = min(w, h) #Ej 333
    actual_long  = max(w, h) #Ej 500

    scale_multiplier = short_side / actual_short
    
    target_long = actual_long * scale_multiplier

    if target_long > max_side:
        scale_multiplier = max_side / actual_long #porque max_side = actual_long * scale_multiplier

    #Calculamos el nuevo tamaño de la imagen
    w = int(w * scale_multiplier)
    h = int(h * scale_multiplier)

    #Redimensionamos la imagen
    img_resized = img.resize((w, h), Image.BILINEAR)

    #Ajustamos los BBs
    #Los BBs vienen en formato (xmin, ymin, xmax, ymax)
    #Es un tensor de 4 columnas, cada fila es un BB
    #Multiplicamos cada columna por el factor de escala
    boxes = boxes * scale_multiplier

    #Convertimos la imagen a tensor
    img_t = to_tensor(img_resized)

    #Normalizamos la imagen 
    img_t_norm = normalize(img_t, R50_MEAN, R50_STD)

    #Los boxes NO se normalizan, se mantienen en las coordenadas originales porque son relativas a la imagen.
    
    return img_t_norm, boxes, scale_multiplier, img_resized



[0.485, 0.456, 0.406] [0.229, 0.224, 0.225]


Bien, ahora, necesitamos una funcion que reciba una imagen  y extraiga las bbs de esa imagen en un tensor.

In [6]:
CLASS_TO_IDX = {c: i for i, c in enumerate(CLASSES)}

def extract_gt_boxes_and_labels(annotation):
    objs = annotation["annotation"]["object"]
    if isinstance(objs, dict):
        objs = [objs]

    boxes = []
    labels = []
    for o in objs:
        bb = o["bndbox"]
        xmin = float(bb["xmin"])
        ymin = float(bb["ymin"])
        xmax = float(bb["xmax"])
        ymax = float(bb["ymax"])
        boxes.append([xmin, ymin, xmax, ymax])

        cls_name = o["name"]
        labels.append(CLASS_TO_IDX[cls_name])

    return (
        torch.tensor(boxes, dtype=torch.float32),
        torch.tensor(labels, dtype=torch.int64)
)


Ahora tendremos que crear una funcion que calcule los proposals con selective search de una imagen ya preprocesada

In [7]:


def get_selective_proposals(img_pil, max_proposals=MAX_PROPOSALS_PER_IMG, min_size=MIN_SIZE):
    img_np = np.array(img_pil)

    #Llamamos a selective search
    _, regions = selectivesearch.selective_search(
        img_np,
        scale=500,
        sigma=0.9,
        min_size=10
    )

    #Evitamos duplicados
    seen = set()
    #Guardamos las propuestas
    proposals = []

    #Recorremos las regiones
    for r in regions:
        #Obtenemos las coordenadas de la región
        x, y, w, h = r["rect"]

        #Evitamos duplicados
        if (x, y, w, h) in seen:
            continue

        #Añadimos la región a las ya vistas
        seen.add((x, y, w, h))

        #Evitamos regiones demasiado pequeñas
        if w < min_size or h < min_size:
            continue

        #Añadimos la región a las propuestas
        proposals.append([x, y, x + w, y + h])

        #Si tenemos más propuestas que el máximo, salimos
        if len(proposals) >= max_proposals:
            break

    #Si no hay propuestas, devolvemos un tensor de 0 filas y 4 columnas
    # A veces ss no encuentra ninguna región, por lo que devolvemos un tensor de 0 filas y 4 columnas
    # Si no lo hicieramos, explotaría el pipeline.
    if len(proposals) == 0:
        return torch.zeros((0, 4), dtype=torch.float32)

    #Devolvemos las propuestas
    return torch.tensor(proposals, dtype=torch.float32)







Implementamos IoU

In [8]:
import torch
import numpy as np

def get_iou(box1, box2):

    inter_x1 = max(box1[0], box2[0]) 
    inter_y1 = max(box1[1], box2[1]) 
    inter_x2 = min(box1[2], box2[2]) 
    inter_y2 = min(box1[3], box2[3])     

    inter_w = max(0, inter_x2 - inter_x1)
    inter_h = max(0, inter_y2 - inter_y1)
    inter_area = inter_w * inter_h

    # áreas de las cajas
    area1 = max(0, box1[2] - box1[0]) * max(0, box1[3] - box1[1])
    area2 = max(0, box2[2] - box2[0]) * max(0, box2[3] - box2[1])
    
    union = area1 + area2 - inter_area
    if union == 0:
        return 0.0

    return inter_area / union #IoU


def iou_with_many(box, gt_boxes):
    ious = []

    for gt_box in gt_boxes:
        ious.append(get_iou(box, gt_box))
    return torch.tensor(ious)





Implementamos una funcion para poder computar la matriz de proposals y gt_boxes

In [9]:
# proposals: tensor [P,4] en coords de la imagen reescalada
# boxes_t:  tensor [K,4] GT reescaladas (lo que ya tenías)

def compute_ious_matrix(proposals, gt_boxes):
    if proposals.numel() == 0 or gt_boxes.numel() == 0:
        return torch.zeros((proposals.shape[0], gt_boxes.shape[0]), dtype=torch.float32, device=proposals.device)

    proposals = proposals.to(gt_boxes.device)

    p = proposals[:, None, :]   # [P,1,4]
    g = gt_boxes[None, :, :]    # [1,K,4]

    inter_x1 = torch.maximum(p[..., 0], g[..., 0])
    inter_y1 = torch.maximum(p[..., 1], g[..., 1])
    inter_x2 = torch.minimum(p[..., 2], g[..., 2])
    inter_y2 = torch.minimum(p[..., 3], g[..., 3])

    inter_w = (inter_x2 - inter_x1).clamp(min=0)
    inter_h = (inter_y2 - inter_y1).clamp(min=0)
    inter_area = inter_w * inter_h

    area_p = ((p[..., 2] - p[..., 0]).clamp(min=0) *
              (p[..., 3] - p[..., 1]).clamp(min=0))
    area_g = ((g[..., 2] - g[..., 0]).clamp(min=0) *
              (g[..., 3] - g[..., 1]).clamp(min=0))

    union = area_p + area_g - inter_area
    ious = inter_area / union.clamp(min=1e-6)

    return ious





Ahora toca: clasificar proposals como FG/BG en base al IoU y ver cuántas tienes de cada.


In [10]:

def get_fg_bg_inds(maxiou, proposals=None, verbose=False):
    #Esto compara cada proposal con cada GT y crea un tensor de booleanos.
    fg_mask = maxiou >= POS_IOU_THRESH
    bg_mask = maxiou <  NEG_IOU_THRESH
    ig_mask = (maxiou >= NEG_IOU_THRESH) & (maxiou < POS_IOU_THRESH)

    #Esto obtiene los índices de los proposals que cumplen la máscara, hacemos squeeze(1) para que sea un tensor 1D, ya que torch.nonzero() devuelve un tensor 2D.
    fg_inds = torch.nonzero(fg_mask).squeeze(1)
    bg_inds = torch.nonzero(bg_mask).squeeze(1)
    ig_inds = torch.nonzero(ig_mask).squeeze(1)


    return fg_inds, bg_inds, ig_inds


 Vamos a preparar nuestro backbone; cortaremos en layer3 para quedarnos con un stride de 16, lo elegimos porque es el valor estandar para Fast RCNN (mejor RoiPooling)

El stride lo podemos calcular dividiendo la dimensión espacial de la imagen de entrada entre la dimensión espacial del feature map resultante.

In [11]:
from torchvision import models, transforms
from torch import nn
from torchsummary import summary


#Importamos el modelo de ResNet50
backbone = nn.Sequential(*list(models.resnet50(weights=weights).children())).to(device)
#Creamos un tensor de prueba, usamos 
x = torch.randn(1, 3, 512, 512).to(device)
feat = x.clone()

#Calculamos el stride de cada capa
for name, module in backbone.named_children():
    feat = module(feat)
    current_stride = 512 / feat.shape[2]
    print(name, feat.shape, "stride acumulado =", current_stride)
    if ( current_stride == 16):
        break
BACKBONE_STRIDE = int(name) + 1 #Sumamos 1 por que empieza en 0
print(current_stride, BACKBONE_STRIDE)




0 torch.Size([1, 64, 256, 256]) stride acumulado = 2.0
1 torch.Size([1, 64, 256, 256]) stride acumulado = 2.0
2 torch.Size([1, 64, 256, 256]) stride acumulado = 2.0
3 torch.Size([1, 64, 128, 128]) stride acumulado = 4.0
4 torch.Size([1, 256, 128, 128]) stride acumulado = 4.0
5 torch.Size([1, 512, 64, 64]) stride acumulado = 8.0
6 torch.Size([1, 1024, 32, 32]) stride acumulado = 16.0
16.0 7


Ahora vamos a cortar la red por donde vimos.

In [12]:

#Cortamos el backbone en el layer3 que es donde tenemos stride 16 que vimos antes.

backbone = nn.Sequential(*list(models.resnet50(weights=weights).children())[:BACKBONE_STRIDE]).to(device)


# Para una primera fase de entrenamiento congelamos los parametros del backbone
if trainFreeze:
    for p in backbone.parameters():
        p.requires_grad = False

    # Descongelamos solo los últimos 2 módulos del backbone truncado
    # Ajustar [-1:], [-3:], segun cuantas capas queremos descongelar
    for m in list(backbone.children())[-2:]:
        for p in m.parameters():
            p.requires_grad = True
#Cuando queramos reentrenar el modelo, tenemos que descongelar algunas capas del backbone
elif trainFineTune:
    # Primero congelamos todo
    for p in backbone.parameters():
        p.requires_grad = False

    # Descongelamos solo los últimos 2 módulos del backbone truncado
    # Ajustar [-1:], [-3:], segun cuantas capas queremos descongelar
    for m in list(backbone.children())[-2:]:
        for p in m.parameters():
            p.requires_grad = True


### ROIAlign: qué hace y por qué se usa

En Fast R-CNN original se usaba ROI Pooling, que tenía un problema importante: **redondeaba las coordenadas** de las ROIs al proyectarlas sobre el feature map. Ese redondeo generaba errores de alineamiento y hacía que las features no correspondieran exactamente con la región real del objeto.

**ROIAlign** elimina ese problema.

Dado un feature map de la CNN:
[C, Hf, Wf]

y una ROI en coordenadas de imagen:
(x1, y1, x2, y2)

ROIAlign sigue estos pasos:

1. Proyecta la ROI al feature map dividiendo por el stride (≈16 en ResNet50-C4):
   (x1/16, y1/16, x2/16, y2/16)

2. Recorta exactamente esa región del feature map **sin redondear**.

3. Remuestrea la región usando **interpolación bilineal** para mantener continuidad espacial.

4. Devuelve un tensor de tamaño fijo, por ejemplo:
   [C, 7, 7]

El resultado final de procesar N ROIs es:
[N, C, 7, 7]

Esto permite alimentar capas fully connected con bloques de tamaño constante, independientemente del tamaño original de cada ROI, y garantiza alineamiento preciso entre la imagen y las features.


In [13]:
# 
from torchvision.ops import roi_align

ROI_OUTPUT_SIZE = (7, 7)
SPATIAL_SCALE = 1.0 / BACKBONE_STRIDE
ROI_SAMPLING_RATIO = 2

def build_rois_from_proposals(proposals, batch_idx=0):
    if not torch.is_tensor(proposals):
        proposals = torch.as_tensor(proposals, dtype=torch.float32)
    batch_inds = torch.full((proposals.shape[0], 1), batch_idx, dtype=proposals.dtype, device=proposals.device)
    rois = torch.cat([batch_inds, proposals], dim=1)
    return rois

def extract_roi_features(feat_map, rois):
    return roi_align(
        feat_map,
        rois,
        output_size=ROI_OUTPUT_SIZE,
        spatial_scale=SPATIAL_SCALE,
        sampling_ratio=ROI_SAMPLING_RATIO,
        aligned=True,
    )

Vamos a crear la funcion que crea los bbox targets

Los bbtargets son la operación que hay que aplicar a la proposal para que se convierta en su GT correspondiente.

In [14]:
def compute_bbox_targets(proposals, gt_boxes, means, stds):
    p = proposals
    g = gt_boxes

    px = (p[:, 0] + p[:, 2]) / 2
    py = (p[:, 1] + p[:, 3]) / 2
    pw = (p[:, 2] - p[:, 0]).clamp(min=1e-6)
    ph = (p[:, 3] - p[:, 1]).clamp(min=1e-6)

    gx = (g[:, 0] + g[:, 2]) / 2
    gy = (g[:, 1] + g[:, 3]) / 2
    gw = (g[:, 2] - g[:, 0]).clamp(min=1e-6)
    gh = (g[:, 3] - g[:, 1]).clamp(min=1e-6)

    tx = (gx - px) / pw
    ty = (gy - py) / ph
    tw = torch.log(gw / pw)
    th = torch.log(gh / ph)

    deltas = torch.stack([tx, ty, tw, th], dim=1)

    if isinstance(means, torch.Tensor):
        means_t = means.to(deltas.device, deltas.dtype)
    else:
        means_t = torch.tensor(means, device=deltas.device, dtype=deltas.dtype)

    if isinstance(stds, torch.Tensor):
        stds_t = stds.to(deltas.device, deltas.dtype)
    else:
        stds_t = torch.tensor(stds, device=deltas.device, dtype=deltas.dtype)

    return (deltas - means_t) / stds_t


Vamos a construir nuestro pipeline usando los metodos definidos, procesará una imagen por vez
 antes de pasar a la RCNN como tal.

In [15]:
def decode_boxes(proposals, bbox_deltas, means=BBOX_MEANS, stds=BBOX_STDS):
    """
    Convierte las transformaciones predichas (bbox_deltas) en coordenadas absolutas de cajas.
    
    Es la operación INVERSA de compute_bbox_targets.
    
    Args:
        proposals: tensor [N, 4] con formato (x1, y1, x2, y2) - cajas originales
        bbox_deltas: tensor [N, 4] con formato (dx, dy, dw, dh) - transformaciones predichas por el modelo
        means: tensor [4] - medias usadas en la normalización
        stds: tensor [4] - desviaciones estándar usadas en la normalización
        
    Returns:
        tensor [N, 4] con formato (x1, y1, x2, y2) - cajas finales predichas
    """
    # Denormalizar los deltas
    if isinstance(means, torch.Tensor):
        means_t = means.to(bbox_deltas.device, bbox_deltas.dtype)
    else:
        means_t = torch.tensor(means, device=bbox_deltas.device, dtype=bbox_deltas.dtype)
    
    if isinstance(stds, torch.Tensor):
        stds_t = stds.to(bbox_deltas.device, bbox_deltas.dtype)
    else:
        stds_t = torch.tensor(stds, device=bbox_deltas.device, dtype=bbox_deltas.dtype)
    
    # Revertir la normalización: deltas_real = deltas_norm * std + mean
    bbox_deltas = bbox_deltas * stds_t + means_t
    
    # Extraer centro y dimensiones de las proposals
    px = (proposals[:, 0] + proposals[:, 2]) / 2
    py = (proposals[:, 1] + proposals[:, 3]) / 2
    pw = (proposals[:, 2] - proposals[:, 0]).clamp(min=1e-6)
    ph = (proposals[:, 3] - proposals[:, 1]).clamp(min=1e-6)
    
    # Extraer los deltas predichos
    dx = bbox_deltas[:, 0]
    dy = bbox_deltas[:, 1]
    dw = bbox_deltas[:, 2]
    dh = bbox_deltas[:, 3]
    
    # Aplicar las transformaciones inversas:
    # tx = (gx - px) / pw  -->  gx = px + tx * pw
    # ty = (gy - py) / ph  -->  gy = py + ty * ph
    # tw = log(gw / pw)    -->  gw = pw * exp(tw)
    # th = log(gh / ph)    -->  gh = ph * exp(th)
    
    pred_cx = px + dx * pw
    pred_cy = py + dy * ph
    pred_w = pw * torch.exp(dw)
    pred_h = ph * torch.exp(dh)
    
    # Convertir de (cx, cy, w, h) a (x1, y1, x2, y2)
    x1 = pred_cx - pred_w / 2
    y1 = pred_cy - pred_h / 2
    x2 = pred_cx + pred_w / 2
    y2 = pred_cy + pred_h / 2
    
    return torch.stack([x1, y1, x2, y2], dim=1)


In [16]:
def build_sample(img, ann):
    gt_boxes, gt_labels = extract_gt_boxes_and_labels(ann)

    img_t, gt_boxes_t, scale, img_resized = preprocess_image(img, gt_boxes)

    filename = ann["annotation"]["filename"]

    if filename in PROPOSALS_CACHE:
        proposals_t = PROPOSALS_CACHE[filename].to(device)
    else:
        proposals = get_selective_proposals(img_resized)
        proposals_t = torch.as_tensor(proposals, dtype=torch.float32)
        PROPOSALS_CACHE[filename] = proposals_t.cpu()
        proposals_t = proposals_t.to(device)

    ious = compute_ious_matrix(proposals_t, gt_boxes_t.to(device))
    max_iou, gt_idx = ious.max(dim=1)

    fg_inds, bg_inds, ig_inds = get_fg_bg_inds(max_iou, proposals_t, verbose=False)

    num_fg_total = fg_inds.numel()
    num_bg_total = bg_inds.numel()

    num_fg = min(int(ROIS_PER_IMG * FG_FRACTION), num_fg_total)
    num_bg = min(ROIS_PER_IMG - num_fg, num_bg_total)

    if num_fg > 0:
        fg_sampled = fg_inds[torch.randperm(num_fg_total, device=device)[:num_fg]]
    else:
        fg_sampled = torch.empty(0, dtype=torch.long, device=device)

    if num_bg > 0:
        bg_sampled = bg_inds[torch.randperm(num_bg_total, device=device)[:num_bg]]
    else:
        bg_sampled = torch.empty(0, dtype=torch.long, device=device)

    if fg_sampled.numel() == 0 and bg_sampled.numel() == 0:
        return None

    sampled_inds = torch.cat([fg_sampled, bg_sampled], dim=0)
    proposals_sampled = proposals_t[sampled_inds]

    rois = build_rois_from_proposals(proposals_sampled, batch_idx=0)

    img_t = img_t.unsqueeze(0).to(device)

    if trainFreeze:
        feat_map = backbone(img_t)
        roi_feats = extract_roi_features(feat_map, rois)
    elif trainFineTune:
        feat_map = backbone(img_t)
        roi_feats = extract_roi_features(feat_map, rois)

    gt_idx_sampled = gt_idx[sampled_inds]

    return roi_feats, proposals_sampled, gt_idx_sampled, gt_labels, gt_boxes_t, num_fg


Hagamos un resumen antes de definir el pipeline completo:

- **roi_feats**  
  Features de cada ROI después de ROIAlign.  
  Shape: `[N_rois, C, 7, 7]`.  
  Orden: primero todas las ROIs FG, luego todas las BG.

- **proposals_sampled**  
  Cajas de esas mismas ROIs, en coordenadas de la imagen preprocesada.  
  Shape: `[N_rois, 4]`.  
  Mismo orden que `roi_feats`.

- **gt_idx_sampled**  
  Para cada ROI, índice de la GT con mayor IoU.  
  Shape: `[N_rois]`.  
  Mismo orden que `roi_feats`.  
  Las ROIs BG también tienen un `gt_idx_sampled[i]`, aunque solo se usa realmente para FG.

- **gt_labels**  
  Clases de cada GT box.  
  Shape: `[num_gt]`.  
  Valores típicos: `1..NUM_CLASSES-1` (sin background).

- **gt_boxes_t**  
  Cajas GT reescaladas a la imagen preprocesada.  
  Shape: `[num_gt, 4]`.

- **num_fg**  
  Número de ROIs positivas (FG) dentro de esos `N_rois`.  
  Interpretación: índices `[0 .. num_fg-1]` → FG; índices `[num_fg .. N_rois-1]` → BG.


Ahora vamos as definir nuestra HEAD para clasificar las features que sacamos antes
Como tenemos dos salidas (regresion de las bboxes y clasificacion de los scores)
Necesitaremos dos tipos de calculo de error.

In [17]:
import torch
import torch.nn as nn
import torch.nn.functional as F



class FastRCNNHead(nn.Module):
    def __init__(self, in_channels, num_classes):
        super().__init__()
        
        # Parte común de la red
        self.fc1 = nn.Linear(in_channels * 7 * 7, 1024) 
        self.fc2 = nn.Linear(1024, 1024)

        # Parte de clasificación de los labels de las proposiciones
        self.cls_score = nn.Linear(1024, num_classes)

        # Parte de regresión de las bounding boxes
        self.bbox_pred = nn.Linear(1024, 4)

    def forward(self, x):
        x = x.flatten(1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        #Forward de la clasificación
        scores = self.cls_score(x)
        #Forward de la regresión
        bbox_deltas = self.bbox_pred(x)
        return scores, bbox_deltas


In [18]:
IN_CHANNELS = 1024

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

rcnn_head = FastRCNNHead(IN_CHANNELS, NUM_CLASSES).to(device)

#Cargamos el checkpoint si existe
if trainFineTune:
    ckpt = torch.load(CKPT_PATH_FROZEN, map_location=device)
    rcnn_head.load_state_dict(ckpt["model_state_dict"])
    print(f"[CKPT] Loaded frozen head from epoch {ckpt['epoch']} (best_val_loss={ckpt['best_val_loss']:.4f})")
else:
    print("[CKPT] No frozen checkpoint found, fine-tune will start from random head")


cls_criterion = nn.CrossEntropyLoss()
reg_criterion = nn.SmoothL1Loss()


if trainFreeze:
    params_backbone = [p for p in backbone.parameters() if p.requires_grad]
    params_head = list(rcnn_head.parameters())

    optimizer = torch.optim.Adam(
        [
            {"params": params_backbone, "lr": 1e-5},
            {"params": params_head, "lr": 1e-4},
        ]
    )
elif trainFineTune:
    params_backbone = [p for p in backbone.parameters() if p.requires_grad]
    params_head = list(rcnn_head.parameters())

    optimizer = torch.optim.Adam(
        [
            {"params": params_backbone, "lr": 1e-5},
            {"params": params_head, "lr": 1e-4},
        ]
    )

[CKPT] No frozen checkpoint found, fine-tune will start from random head


Ahora definimos el loop general para procesar todas las imagenes de train_ds

In [19]:


def voc_collate(batch):
    img, ann = batch[0]
    return img, ann

In [36]:
from torch.utils.data import DataLoader

RED = "\033[91m"
RESET = "\033[0m"

LOG_INTERVAL = 10

train_loader = DataLoader(
    train_ds,
    batch_size=1,
    shuffle=True,
    num_workers=4,
    collate_fn=voc_collate,
    pin_memory=True,
    persistent_workers=True
)

def TrainPipeline(max_iters=None):
    running_cls = 0.0
    running_reg = 0.0
    running_total = 0.0
    step = 0

    for i, (img, ann) in enumerate(train_loader, start=1):
        if max_iters is not None and i > max_iters:
            break

        out = build_sample(img, ann)
        if out is None:
            continue

        roi_feats, proposals_sampled, gt_idx_sampled, gt_labels, gt_boxes_t, num_fg = out

        device_ = proposals_sampled.device

        gt_labels = gt_labels.to(device_)
        gt_idx_sampled = gt_idx_sampled.to(device_)
        gt_boxes_t = gt_boxes_t.to(device_)
        proposals_sampled = proposals_sampled.to(device_)
        roi_feats = roi_feats.to(device_)

        roi_labels = torch.zeros_like(gt_idx_sampled, dtype=torch.long)
        if num_fg > 0:
            roi_labels[:num_fg] = gt_labels[gt_idx_sampled[:num_fg]]

        bbox_targets = torch.zeros_like(proposals_sampled, dtype=torch.float32)
        if num_fg > 0:
            fg_props = proposals_sampled[:num_fg]
            fg_gt = gt_boxes_t[gt_idx_sampled[:num_fg]]
            bbox_targets[:num_fg] = compute_bbox_targets(
                fg_props,
                fg_gt,
                BBOX_MEANS,
                BBOX_STDS
            )

        scores, bbox_deltas = rcnn_head(roi_feats)

        loss_cls = cls_criterion(scores, roi_labels)
        if num_fg > 0:
            loss_reg = reg_criterion(bbox_deltas[:num_fg], bbox_targets[:num_fg])
        else:
            loss_reg = torch.tensor(0.0, device=device_)

        loss_total = loss_cls + loss_reg

        optimizer.zero_grad()
        loss_total.backward()
        optimizer.step()

        running_cls += loss_cls.item()
        running_reg += loss_reg.item()
        running_total += loss_total.item()
        step += 1

        print(f"[TRAIN] iter={i} step={step} cls={loss_cls.item():.4f} reg={loss_reg.item():.4f} total={loss_total.item():.4f}")

        if step % LOG_INTERVAL == 0:
            mean_cls = running_cls / step
            mean_reg = running_reg / step
            mean_total = running_total / step

            print(
                f"{RED}[LOG] step={step} | mean_cls={mean_cls:.4f} | mean_reg={mean_reg:.4f} | mean_total={mean_total:.4f}{RESET}"
            )

    if step > 0:
        mean_cls = running_cls / step
        mean_reg = running_reg / step
        mean_total = running_total / step
        print(
            f"{RED}[EPOCH] steps={step} | mean_cls={mean_cls:.4f} | mean_reg={mean_reg:.4f} | mean_total={mean_total:.4f}{RESET}"
        )


Definimos un pipeline similar para validacion

In [34]:


test_loader = DataLoader(
    test_ds,
    batch_size=1,
    shuffle=True,
    num_workers=4,
    collate_fn=voc_collate,
    pin_memory=True,
    persistent_workers=True
)

BLUE = "\033[94m"
RESET = "\033[0m"

def EvalPipeline(dataloader=test_loader, max_iters=None):
    rcnn_head.eval()
    backbone.eval()

    running_cls = 0.0
    running_reg = 0.0
    running_total = 0.0
    step = 0

    with torch.no_grad():
        for i, (img, ann) in enumerate(dataloader, start=1):
            if max_iters is not None and i > max_iters:
                break

            out = build_sample(img, ann)
            if out is None:
                continue

            roi_feats, proposals_sampled, gt_idx_sampled, gt_labels, gt_boxes_t, num_fg = out

            device_ = proposals_sampled.device

            gt_labels = gt_labels.to(device_)
            gt_idx_sampled = gt_idx_sampled.to(device_)
            gt_boxes_t = gt_boxes_t.to(device_)
            proposals_sampled = proposals_sampled.to(device_)
            roi_feats = roi_feats.to(device_)

            roi_labels = torch.zeros_like(gt_idx_sampled, dtype=torch.long)
            if num_fg > 0:
                roi_labels[:num_fg] = gt_labels[gt_idx_sampled[:num_fg]]

            bbox_targets = torch.zeros_like(proposals_sampled, dtype=torch.float32)
            if num_fg > 0:
                fg_props = proposals_sampled[:num_fg]
                fg_gt = gt_boxes_t[gt_idx_sampled[:num_fg]]
                bbox_targets[:num_fg] = compute_bbox_targets(
                    fg_props,
                    fg_gt,
                    BBOX_MEANS,
                    BBOX_STDS
                )

            scores, bbox_deltas = rcnn_head(roi_feats)

            loss_cls = cls_criterion(scores, roi_labels)

            if num_fg > 0:
                bbox_deltas_fg = bbox_deltas[:num_fg]
                bbox_targets_fg = bbox_targets[:num_fg]
                loss_reg = reg_criterion(bbox_deltas_fg, bbox_targets_fg)
            else:
                loss_reg = torch.tensor(0.0, device=device_)

            loss_total = loss_cls + loss_reg

            running_cls += loss_cls.item()
            running_reg += loss_reg.item()
            running_total += loss_total.item()
            step += 1

            if step % 100 == 0:
                print(f"[VAL-STEP] iter={i} step={step}")

    rcnn_head.train()
    backbone.train()

    if step == 0:
        return {"val_loss_cls": None, "val_loss_reg": None, "val_loss_total": None}

    mean_cls = running_cls / step
    mean_reg = running_reg / step
    mean_total = running_total / step

    print(
        f"{BLUE}[VAL] steps={step} | val_cls={mean_cls:.4f} | val_reg={mean_reg:.4f} | val_total={mean_total:.4f}{RESET}"
    )

    return {
        "val_loss_cls": mean_cls,
        "val_loss_reg": mean_reg,
        "val_loss_total": mean_total
    }



In [22]:
YELLOW = "\033[93m"
RESET = "\033[0m"
import pickle
#Entrenamos el modelo congelado si trainFreeze es True
if trainFreeze:
    
    best_val_loss = float("inf")
    best_epoch = -1

    num_epochs = 10

    for epoch in range(num_epochs):
        print(f"{YELLOW}\n=== Epoch {epoch+1}/{num_epochs} ==={RESET}")

        TrainPipeline()  # sin max_iters → una época completa sobre train_ds

        val_metrics = EvalPipeline(max_iters=50)
        print(val_metrics)

        val_total = val_metrics["val_loss_total"]
        if val_total is not None and val_total < best_val_loss:
            best_val_loss = val_total
            best_epoch = epoch + 1
            torch.save(
                {
                    "epoch": best_epoch,
                    "backbone_state_dict": backbone.state_dict(),
                    "rcnn_head_state_dict": rcnn_head.state_dict(),
                    "optimizer_state_dict": optimizer.state_dict(),
                    "best_val_loss": best_val_loss,
                },
                CKPT_PATH_FROZEN,
            )
            print(f"{BLUE}[CKPT] Nuevo mejor modelo en epoch {best_epoch} con val_total={val_total:.4f}{RESET}")
        
        #Guardamos el cache de proposals para no tener que calcularlo de nuevo
        with open("./checkpoints_fast_rcnn/proposals_cache.pkl", "wb") as f:
            pickle.dump(PROPOSALS_CACHE, f)


[93m
=== Epoch 1/10 ===[0m
[TRAIN] iter=1 step=1 cls=3.0329 reg=1.2212 total=4.2541
[TRAIN] iter=2 step=2 cls=2.5231 reg=1.0220 total=3.5451
[TRAIN] iter=3 step=3 cls=2.0963 reg=0.8661 total=2.9624
[TRAIN] iter=4 step=4 cls=2.5383 reg=1.3941 total=3.9324
[TRAIN] iter=5 step=5 cls=2.6176 reg=1.4503 total=4.0678
[TRAIN] iter=6 step=6 cls=2.4868 reg=1.4463 total=3.9331
[TRAIN] iter=7 step=7 cls=2.4012 reg=1.0831 total=3.4843
[TRAIN] iter=8 step=8 cls=2.1938 reg=1.4332 total=3.6270
[TRAIN] iter=9 step=9 cls=1.8420 reg=1.5174 total=3.3594
[TRAIN] iter=10 step=10 cls=2.2048 reg=0.8060 total=3.0108
[91m[LOG] step=10 | mean_cls=2.3937 | mean_reg=1.2240 | mean_total=3.6176[0m
[TRAIN] iter=11 step=11 cls=2.6845 reg=1.1055 total=3.7900
[TRAIN] iter=12 step=12 cls=2.6904 reg=1.0026 total=3.6930
[TRAIN] iter=13 step=13 cls=2.4784 reg=0.5334 total=3.0118
[TRAIN] iter=14 step=14 cls=2.6776 reg=2.0569 total=4.7345
[TRAIN] iter=15 step=15 cls=2.1458 reg=0.5465 total=2.6924
[TRAIN] iter=16 step=16 c



[94m[VAL] steps=50 | val_cls=0.4575 | val_reg=1.1167 | val_total=1.5742[0m
{'val_loss_cls': 0.4574515423178673, 'val_loss_reg': 1.1167284306883811, 'val_loss_total': 1.5741799771785736}
[94m[CKPT] Nuevo mejor modelo en epoch 1 con val_total=1.5742[0m
[93m
=== Epoch 2/10 ===[0m
[TRAIN] iter=1 step=1 cls=0.4291 reg=1.6152 total=2.0443
[TRAIN] iter=2 step=2 cls=0.5199 reg=1.3807 total=1.9006
[TRAIN] iter=3 step=3 cls=0.4604 reg=1.0519 total=1.5123
[TRAIN] iter=4 step=4 cls=0.2247 reg=0.9486 total=1.1734
[TRAIN] iter=5 step=5 cls=0.3351 reg=1.1239 total=1.4590
[TRAIN] iter=6 step=6 cls=0.1010 reg=0.9045 total=1.0055
[TRAIN] iter=7 step=7 cls=0.5851 reg=0.8861 total=1.4712
[TRAIN] iter=8 step=8 cls=0.3805 reg=0.8952 total=1.2756
[TRAIN] iter=9 step=9 cls=0.8168 reg=1.0036 total=1.8204
[TRAIN] iter=10 step=10 cls=0.3674 reg=0.9541 total=1.3216
[91m[LOG] step=10 | mean_cls=0.4220 | mean_reg=1.0764 | mean_total=1.4984[0m
[TRAIN] iter=11 step=11 cls=0.4408 reg=0.6607 total=1.1014
[TRAIN

KeyboardInterrupt: 

In [37]:
ckpt = torch.load("./checkpoints_fast_rcnn/fast_rcnn_head_refined.pth", map_location=device)
backbone.load_state_dict(ckpt["backbone_state_dict"])
rcnn_head.load_state_dict(ckpt["rcnn_head_state_dict"])

# congelar backbone
for p in backbone.parameters():
    p.requires_grad = False

# solo entrenar head
for p in rcnn_head.parameters():
    p.requires_grad = True

optimizer = torch.optim.SGD(
    rcnn_head.parameters(),
    lr=1e-3,
    momentum=0.9,
    weight_decay=1e-4,
)

scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode="min",
    factor=0.1,
    patience=1,
)

best_val_loss = float("inf")
total_epochs = 8

for epoch in range(1, total_epochs + 1):
    TrainPipeline()

    val_loss_total = val_metrics["val_loss_total"]
    print(f"[HEAD-ONLY] epoch={epoch} val_loss_total={val_loss_total:.4f}")

    scheduler.step(val_loss_total)

    if val_loss_total < best_val_loss:
        best_val_loss = val_loss_total
        torch.save(
            {
                "backbone_state_dict": backbone.state_dict(),
                "rcnn_head_state_dict": rcnn_head.state_dict(),
                "best_val_loss": best_val_loss,
            },
            "./checkpoints_fast_rcnn/fast_rcnn_head_refined_2.pth",
        )
        print("[CKPT] nuevo mejor HEAD-ONLY")



[TRAIN] iter=1 step=1 cls=0.3865 reg=0.9601 total=1.3465
[TRAIN] iter=2 step=2 cls=0.0998 reg=0.0910 total=0.1908
[TRAIN] iter=3 step=3 cls=0.5868 reg=1.1548 total=1.7416
[TRAIN] iter=4 step=4 cls=0.6545 reg=1.1317 total=1.7862
[TRAIN] iter=5 step=5 cls=0.2651 reg=1.1661 total=1.4312
[TRAIN] iter=6 step=6 cls=0.4755 reg=0.7056 total=1.1811
[TRAIN] iter=7 step=7 cls=0.6665 reg=1.2268 total=1.8934
[TRAIN] iter=8 step=8 cls=0.5090 reg=1.0983 total=1.6073
[TRAIN] iter=9 step=9 cls=1.2667 reg=1.4840 total=2.7508
[TRAIN] iter=10 step=10 cls=0.4759 reg=1.2013 total=1.6772
[91m[LOG] step=10 | mean_cls=0.5386 | mean_reg=1.0220 | mean_total=1.5606[0m
[TRAIN] iter=11 step=11 cls=0.2542 reg=0.5248 total=0.7790
[TRAIN] iter=12 step=12 cls=0.7465 reg=1.2934 total=2.0399
[TRAIN] iter=13 step=13 cls=0.1365 reg=0.2996 total=0.4361
[TRAIN] iter=14 step=14 cls=0.4278 reg=1.3367 total=1.7644
[TRAIN] iter=15 step=15 cls=0.4238 reg=0.9094 total=1.3331
[TRAIN] iter=16 step=16 cls=0.1919 reg=1.8740 total=2.

KeyboardInterrupt: 