# **Faster R-CNN – Detección de elementos arquitectónicos**

### **Faster R-CNN con Optuna + PreloadedDataset**

**Objetivo:** Detectar puertas, ventanas, balcones y railing  
**Dataset:** Dataset_Final (anotaciones YOLO)  
**Características:**
- Optimización automática de hiperparámetros con Optuna
- Carga de imágenes en RAM para acelerar
- Evaluación con mAP@0.5 y mAP@0.5:0.95
- Guardado de resultados y modelos en carpetas `trial_0`, `trial_1`, … (una por cada trial de Optuna)
- Dashboard de comparación de métricas y pérdidas


In [None]:
# Descomprimir dataset
import zipfile
import os

# Location of the uploaded archive and the folder it is unpacked into.
zip_path = "/kaggle/input/tu_dataset/Dataset_Final.zip"
extract_path = "/kaggle/working/Dataset_Final"

# Extract every member of the archive; the context manager closes the file.
with zipfile.ZipFile(zip_path, mode="r") as archive:
    archive.extractall(path=extract_path)


In [None]:
!pip install --upgrade pip --quiet

# PyTorch y Torchvision (GPU compatible)
!pip install torch torchvision torchaudio --quiet

# Optuna para HPO
!pip install optuna --quiet

# PyCOCOTools para métricas de detección de objetos
!pip install pycocotools --quiet

# OpenCV y Matplotlib
!pip install opencv-python matplotlib --quiet


In [None]:
import os
import json

import torch
from torch.utils.data import Dataset, DataLoader
import torchvision
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor  
from torchvision.ops import box_iou

import cv2
import matplotlib.pyplot as plt

import optuna

In [None]:
# Use the GPU when CUDA is usable, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", DEVICE)

# Si la salida es "Device: cuda", PyTorch + CUDA ya están funcionando.
# En ese caso no hace falta reinstalar PyTorch.

In [None]:
# Kaggle paths: everything lives under the unpacked dataset folder.
DATASET_ROOT = "/kaggle/working/Dataset_Final"

# Each split keeps its images and its label files in sibling sub-folders.
TRAIN_IMG_DIR, TRAIN_LBL_DIR = (
    os.path.join(DATASET_ROOT, sub) for sub in ("train/images", "train/labels")
)
VAL_IMG_DIR, VAL_LBL_DIR = (
    os.path.join(DATASET_ROOT, sub) for sub in ("valid/images", "valid/labels")
)
TEST_IMG_DIR, TEST_LBL_DIR = (
    os.path.join(DATASET_ROOT, sub) for sub in ("test/images", "test/labels")
)

# Models and logs are written here; create the folder up front.
EXPERIMENT_ROOT = "/kaggle/working/Faster_Experiments"
os.makedirs(EXPERIMENT_ROOT, exist_ok=True)

# Four object classes plus the background class.
NUM_CLASSES = 5


In [None]:
import os
import cv2
import torch
from torch.utils.data import Dataset, DataLoader

class PreloadedDataset(Dataset):
    """Detection dataset pairing images with YOLO-format label files.

    Each item is ``(image, target)``:

    * ``image``: float tensor of shape ``(3, img_size, img_size)`` in RGB
      order, scaled to ``[0, 1]``.
    * ``target``: dict with ``"boxes"`` (``(N, 4)`` float32, absolute
      ``x1, y1, x2, y2`` pixels in the resized image) and ``"labels"``
      (``(N,)`` int64).

    Despite the class name, images are read from disk on every
    ``__getitem__`` call; nothing is cached in memory.

    Args:
        img_dir: Directory containing the image files.
        lbl_dir: Directory containing one ``<image stem>.txt`` per image.
        img_size: Side length every image is resized to (square).
        label_offset: Value added to each YOLO class id. The default of 1
            shifts the 0-based YOLO ids to 1..N, because torchvision
            detection models reserve label 0 for the background. Pass 0 to
            keep the raw YOLO ids.

    Raises:
        RuntimeError: If ``img_dir`` contains no supported image.
    """

    # File extensions (lower-case) recognised as images.
    IMAGE_EXTENSIONS = (".jpg", ".jpeg", ".png")

    def __init__(self, img_dir, lbl_dir, img_size=640, label_offset=1):
        self.img_dir = img_dir
        self.lbl_dir = lbl_dir
        self.img_size = img_size
        self.label_offset = label_offset

        # Sorted so that indices are stable across runs.
        self.img_files = sorted(
            f for f in os.listdir(img_dir)
            if f.lower().endswith(self.IMAGE_EXTENSIONS)
        )

        if not self.img_files:
            raise RuntimeError("No hay imágenes en el directorio")

    def __len__(self):
        return len(self.img_files)

    def __getitem__(self, idx):
        """Load, resize and normalise image ``idx`` together with its boxes.

        Raises:
            RuntimeError: If the image file cannot be decoded.
        """
        img_name = self.img_files[idx]

        img_path = os.path.join(self.img_dir, img_name)
        lbl_path = os.path.join(
            self.lbl_dir, os.path.splitext(img_name)[0] + ".txt"
        )

        # -------- Image --------
        img = cv2.imread(img_path)
        if img is None:
            # cv2.imread signals an unreadable/corrupt file by returning None.
            raise RuntimeError(f"No se pudo leer la imagen: {img_path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        img = cv2.resize(img, (self.img_size, self.img_size))
        h, w = self.img_size, self.img_size

        img_tensor = torch.from_numpy(img).float().permute(2, 0, 1) / 255.0

        # -------- Labels --------
        # YOLO coordinates are normalised, so they are scaled straight to the
        # resized image dimensions.
        boxes, labels = self.load_yolo_labels(lbl_path, w, h)

        return img_tensor, {"boxes": boxes, "labels": labels}

    def load_yolo_labels(self, path, w, h):
        """Parse a YOLO label file into absolute corner boxes and labels.

        Every non-blank line must be ``class cx cy bw bh`` with coordinates
        normalised to ``[0, 1]``. Boxes are converted to ``x1, y1, x2, y2``
        pixels, clipped to the ``w`` x ``h`` image, and dropped when they end
        up with no area (degenerate boxes are rejected by torchvision during
        training).

        Args:
            path: Label file path; a missing file means "no objects".
            w: Image width in pixels used to scale x coordinates.
            h: Image height in pixels used to scale y coordinates.

        Returns:
            Tuple ``(boxes, labels)``: float32 ``(N, 4)`` and int64 ``(N,)``
            tensors (``N`` may be 0).

        Raises:
            ValueError: If a non-blank line does not have exactly 5 fields or
                a field is not numeric.
        """
        boxes = []
        labels = []

        if os.path.exists(path):
            with open(path, encoding="utf-8") as f:
                for line_number, line in enumerate(f, start=1):
                    parts = line.split()
                    if not parts:
                        # Blank (often trailing) lines carry no annotation.
                        continue
                    if len(parts) != 5:
                        raise ValueError(
                            f"{path}:{line_number}: se esperaban 5 campos "
                            f"YOLO, se encontraron {len(parts)}"
                        )

                    cls = int(parts[0])
                    x, y, bw, bh = map(float, parts[1:])

                    # Centre/size -> corners, clipped to the image.
                    x1 = min(max((x - bw / 2) * w, 0.0), float(w))
                    y1 = min(max((y - bh / 2) * h, 0.0), float(h))
                    x2 = min(max((x + bw / 2) * w, 0.0), float(w))
                    y2 = min(max((y + bh / 2) * h, 0.0), float(h))

                    if x2 <= x1 or y2 <= y1:
                        continue

                    boxes.append([x1, y1, x2, y2])
                    labels.append(cls + self.label_offset)

        if not boxes:
            # Explicit shapes: torch.tensor([]) would not be (0, 4).
            return (
                torch.zeros((0, 4), dtype=torch.float32),
                torch.zeros((0,), dtype=torch.int64),
            )

        return (
            torch.tensor(boxes, dtype=torch.float32),
            torch.tensor(labels, dtype=torch.int64),
        )


In [None]:
# mAP@0.5 and mAP@0.5:0.95
_IOU_THRESHOLDS = (0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95)


def _pairwise_iou(boxes_a, boxes_b):
    """Return the (len(a), len(b)) IoU matrix for ``x1, y1, x2, y2`` boxes."""
    area_a = (boxes_a[:, 2] - boxes_a[:, 0]) * (boxes_a[:, 3] - boxes_a[:, 1])
    area_b = (boxes_b[:, 2] - boxes_b[:, 0]) * (boxes_b[:, 3] - boxes_b[:, 1])
    top_left = torch.max(boxes_a[:, None, :2], boxes_b[None, :, :2])
    bottom_right = torch.min(boxes_a[:, None, 2:], boxes_b[None, :, 2:])
    inter = (bottom_right - top_left).clamp(min=0).prod(dim=2)
    union = area_a[:, None] + area_b[None, :] - inter
    return inter / union.clamp(min=1e-12)


def _average_precision(hits, num_gt):
    """Area under the precision-recall curve (all-point interpolation).

    ``hits`` holds 1 (true positive) or 0 (false positive) for each detection
    in descending score order; ``num_gt`` is the number of ground-truth boxes.
    """
    if num_gt == 0 or not hits:
        return 0.0

    precisions, recalls = [], []
    true_positives = 0
    for rank, hit in enumerate(hits, start=1):
        true_positives += hit
        precisions.append(true_positives / rank)
        recalls.append(true_positives / num_gt)

    # Make precision monotonically non-increasing (precision envelope).
    for i in range(len(precisions) - 2, -1, -1):
        precisions[i] = max(precisions[i], precisions[i + 1])

    ap = 0.0
    previous_recall = 0.0
    for precision, recall in zip(precisions, recalls):
        ap += (recall - previous_recall) * precision
        previous_recall = recall
    return ap


def compute_map50_95(model, data_loader, device):
    """Compute mAP@0.5 and mAP@0.5:0.95 of ``model`` over ``data_loader``.

    For every class that has ground truth and every IoU threshold in
    0.50:0.05:0.95, detections of that class are ranked by score and greedily
    matched to still-unclaimed ground-truth boxes of the same class and
    image. AP is the area under the resulting precision-recall curve.

    The model is switched to eval mode for inference and its previous
    train/eval mode is restored before returning, so the function can be
    called in the middle of a training loop.

    Args:
        model: Detection model; called with a list of image tensors, it must
            return one dict per image with ``"boxes"``, ``"scores"`` and
            ``"labels"``.
        data_loader: Iterable of ``(images, targets)`` batches where each
            target is a dict with ``"boxes"`` and ``"labels"``.
        device: Device the images are moved to before inference.

    Returns:
        Tuple ``(map50, map50_95)`` of floats in ``[0, 1]``; ``(0.0, 0.0)``
        when there is no ground truth at all.
    """
    was_training = model.training
    model.eval()

    gt_by_class = {}    # class -> {image index: GT boxes of that class}
    dets_by_class = {}  # class -> [(image index, boxes, scores), ...]
    image_index = 0

    try:
        with torch.no_grad():
            for imgs, targets in data_loader:
                outputs = model([img.to(device) for img in imgs])
                for output, target in zip(outputs, targets):
                    gt_boxes = target["boxes"].cpu()
                    gt_labels = target["labels"].cpu()
                    for cls in gt_labels.unique().tolist():
                        gt_by_class.setdefault(cls, {})[image_index] = (
                            gt_boxes[gt_labels == cls]
                        )

                    pred_boxes = output["boxes"].cpu()
                    pred_scores = output["scores"].cpu()
                    pred_labels = output["labels"].cpu()
                    for cls in pred_labels.unique().tolist():
                        mask = pred_labels == cls
                        dets_by_class.setdefault(cls, []).append(
                            (image_index, pred_boxes[mask], pred_scores[mask])
                        )
                    image_index += 1
    finally:
        # Do not leave a model that was training stuck in eval mode.
        model.train(was_training)

    if not gt_by_class:
        return 0.0, 0.0

    ap_by_threshold = {t: [] for t in _IOU_THRESHOLDS}

    for cls, gt_images in gt_by_class.items():
        num_gt = sum(len(boxes) for boxes in gt_images.values())

        # One record per detection: (score, image index, IoUs vs. same-class GT).
        # IoUs do not depend on the threshold, so they are computed once.
        records = []
        for img_idx, boxes, scores in dets_by_class.get(cls, []):
            gt_boxes = gt_images.get(img_idx)
            if gt_boxes is None:
                iou_rows = [[] for _ in range(len(boxes))]
            else:
                iou_rows = _pairwise_iou(boxes, gt_boxes).tolist()
            for score, iou_row in zip(scores.tolist(), iou_rows):
                records.append((score, img_idx, iou_row))
        records.sort(key=lambda record: record[0], reverse=True)

        for threshold in _IOU_THRESHOLDS:
            claimed = set()  # (image index, GT index) pairs already matched
            hits = []
            for _, img_idx, iou_row in records:
                best_gt, best_iou = None, 0.0
                for gt_idx, iou in enumerate(iou_row):
                    if (
                        iou >= threshold
                        and iou > best_iou
                        and (img_idx, gt_idx) not in claimed
                    ):
                        best_gt, best_iou = gt_idx, iou
                if best_gt is None:
                    hits.append(0)
                else:
                    claimed.add((img_idx, best_gt))
                    hits.append(1)
            ap_by_threshold[threshold].append(_average_precision(hits, num_gt))

    num_classes = len(gt_by_class)
    map50 = sum(ap_by_threshold[0.5]) / num_classes
    map50_95 = sum(sum(aps) for aps in ap_by_threshold.values()) / (
        num_classes * len(_IOU_THRESHOLDS)
    )

    return map50, map50_95


In [None]:
# Build the training and validation datasets from their image/label folders.
train_dataset, val_dataset = (
    PreloadedDataset(images_dir, labels_dir, img_size=640)
    for images_dir, labels_dir in (
        (TRAIN_IMG_DIR, TRAIN_LBL_DIR),
        (VAL_IMG_DIR, VAL_LBL_DIR),
    )
)

# Minimal guard: training cannot proceed without data, validation only warns.
if not len(train_dataset):
    raise RuntimeError("Train dataset vacío")

if not len(val_dataset):
    print("Warning: Validation dataset vacío")


In [None]:
def _collate_batch(batch):
    """Turn a list of ``(image, target)`` pairs into ``(images, targets)``.

    Detection batches hold a different number of boxes per image, so the
    samples are kept as tuples instead of being stacked. A named function
    (rather than a lambda) stays picklable for DataLoader worker processes.
    """
    return tuple(zip(*batch))


def objective(trial):
    """Optuna objective: fine-tune Faster R-CNN and return ``1 - mAP@0.5``.

    Samples the learning rate, weight decay and optimizer, trains a
    COCO-pretrained Faster R-CNN (backbone frozen, new box predictor) for a
    fixed number of epochs, and evaluates on the validation set every two
    epochs. From epoch 4 on, the intermediate value is reported to Optuna so
    the pruner can stop the trial.

    Side effects: writes ``losses.json`` and ``metrics.json`` into
    ``EXPERIMENT_ROOT/trial_<n>`` (also for pruned trials, flagged with
    ``"pruned": true``) and ``model.pth`` for trials that finish.

    Args:
        trial: The ``optuna.trial.Trial`` being evaluated.

    Returns:
        ``1 - mAP@0.5`` on the validation set (lower is better).

    Raises:
        optuna.exceptions.TrialPruned: When the pruner stops the trial.
    """
    lr = trial.suggest_float("lr", 1e-5, 1e-3, log=True)
    weight_decay = trial.suggest_float("weight_decay", 1e-5, 1e-2, log=True)
    optimizer_name = trial.suggest_categorical("optimizer", ["Adam", "AdamW"])

    batch_size = 4
    epochs = 8

    # Folder that collects this trial's artefacts.
    trial_dir = os.path.join(EXPERIMENT_ROOT, f"trial_{trial.number}")
    os.makedirs(trial_dir, exist_ok=True)

    epoch_losses = []

    def save_trial_logs(map50, map50_95, pruned):
        """Write the loss history and the metrics of this trial as JSON."""
        with open(os.path.join(trial_dir, "losses.json"), "w") as f:
            json.dump(epoch_losses, f, indent=4)

        metrics = {
            "map50": map50,
            "map50_95": map50_95,
            "lr": lr,
            "weight_decay": weight_decay,
            "optimizer": optimizer_name,
            "pruned": pruned,
        }
        with open(os.path.join(trial_dir, "metrics.json"), "w") as f:
            json.dump(metrics, f, indent=4)

    # DataLoaders
    train_loader = DataLoader(
        train_dataset,
        batch_size=batch_size,
        shuffle=True,
        collate_fn=_collate_batch,
        num_workers=4,  # use num_workers=0 on Kaggle if workers are unstable
    )

    val_loader = DataLoader(
        val_dataset,
        batch_size=batch_size,
        shuffle=False,
        collate_fn=_collate_batch,
        num_workers=4,  # use num_workers=0 on Kaggle if workers are unstable
    )

    # Model: pretrained detector with a fresh head for NUM_CLASSES outputs.
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights="DEFAULT")

    # Freeze the backbone; only the remaining parameters are optimised.
    for param in model.backbone.parameters():
        param.requires_grad = False

    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, NUM_CLASSES)

    model.to(DEVICE)

    params = [p for p in model.parameters() if p.requires_grad]

    optimizer_cls = torch.optim.Adam if optimizer_name == "Adam" else torch.optim.AdamW
    optimizer = optimizer_cls(params, lr=lr, weight_decay=weight_decay)

    # Training
    last_eval = None  # (epoch, map50, map50_95) of the latest validation run

    for epoch in range(1, epochs + 1):
        # Re-enter training mode every epoch: the model only returns the loss
        # dict in training mode, and validation may have switched it to eval.
        model.train()
        total_loss = 0.0

        for imgs, targets in train_loader:
            imgs = [img.to(DEVICE) for img in imgs]
            targets = [{k: v.to(DEVICE) for k, v in t.items()} for t in targets]

            loss_dict = model(imgs, targets)
            losses = sum(loss for loss in loss_dict.values())

            optimizer.zero_grad()
            losses.backward()
            optimizer.step()

            total_loss += losses.item()

        epoch_losses.append(total_loss / len(train_loader))

        # Validate every 2 epochs; pruning is only enabled from epoch 4 on.
        if epoch % 2 == 0:
            map50, map50_95 = compute_map50_95(model, val_loader, DEVICE)
            last_eval = (epoch, map50, map50_95)

            if epoch >= 4:
                trial.report(1 - map50, epoch)
                if trial.should_prune():
                    # Keep logs for pruned trials so later analysis cells find
                    # the files they expect in every trial folder.
                    save_trial_logs(map50, map50_95, pruned=True)
                    raise optuna.exceptions.TrialPruned()

    # Final evaluation: reuse the last validation run when it already
    # measured the final weights.
    if last_eval is not None and last_eval[0] == epochs:
        _, map50, map50_95 = last_eval
    else:
        map50, map50_95 = compute_map50_95(model, val_loader, DEVICE)

    save_trial_logs(map50, map50_95, pruned=False)

    # Save the trained weights of this trial.
    torch.save(model.state_dict(), os.path.join(trial_dir, "model.pth"))

    return 1 - map50


In [None]:
# Study that minimises the objective; Successive Halving prunes weak trials.
pruner = optuna.pruners.SuccessiveHalvingPruner()
study = optuna.create_study(pruner=pruner, direction="minimize")

# Run the hyper-parameter search.
study.optimize(objective, n_trials=20)

print("Mejor configuración encontrada:", study.best_params, sep="\n")


In [None]:
# Dashboard comparing all trials
def _trial_sort_key(dirname):
    """Sort ``trial_<n>`` folders by their number (``trial_2`` before ``trial_10``)."""
    suffix = dirname.rsplit("_", 1)[-1]
    return (int(suffix) if suffix.isdigit() else float("inf"), dirname)


candidate_dirs = sorted(
    (d for d in os.listdir(EXPERIMENT_ROOT) if d.startswith("trial_")),
    key=_trial_sort_key,
)

exp_dirs = []
maps50, maps50_95 = [], []
losses_all = []

for d in candidate_dirs:
    metrics_path = os.path.join(EXPERIMENT_ROOT, d, "metrics.json")
    losses_path = os.path.join(EXPERIMENT_ROOT, d, "losses.json")

    # A trial folder may exist without logs (e.g. the trial was pruned or
    # interrupted before writing them); skip it instead of crashing.
    if not (os.path.isfile(metrics_path) and os.path.isfile(losses_path)):
        print(f"Omitiendo {d}: faltan metrics.json o losses.json")
        continue

    with open(metrics_path, "r") as f:
        log = json.load(f)
    with open(losses_path, "r") as f:
        loss_log = json.load(f)

    exp_dirs.append(d)
    maps50.append(log["map50"])
    maps50_95.append(log["map50_95"])
    losses_all.append(loss_log)


# Loss curve of every trial
plt.figure(figsize=(10,5))
for trial_name, loss in zip(exp_dirs, losses_all):
    plt.plot(loss, label=trial_name)
plt.title("Pérdida por epoch - todos los experiments")
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.legend()
plt.show()

# mAP@0.5 and mAP@0.5:0.95 per trial (second series drawn semi-transparent
# on top of the first)
plt.figure(figsize=(6,4))
plt.bar(range(len(exp_dirs)), maps50, label="mAP@0.5")
plt.bar(range(len(exp_dirs)), maps50_95, alpha=0.5, label="mAP@0.5:0.95")
plt.xticks(range(len(exp_dirs)), exp_dirs, rotation=45, ha="right")
plt.ylabel("mAP")
plt.title("Comparativa mAP por experiment")
plt.legend()
plt.show()


In [None]:
# Locate the folder that holds the artefacts of the best trial.
best_trial_number = study.best_trial.number
best_model_dir = os.path.join(EXPERIMENT_ROOT, "trial_" + str(best_trial_number))

print(
    f"El mejor trial es trial_{best_trial_number} con los siguientes hiperparámetros:",
    study.best_params,
    sep="\n",
)


In [None]:
# Cargar el modelo entrenado del mejor trial
from torchvision.models.detection import fasterrcnn_resnet50_fpn
import torch

# Rebuild the architecture used by the trials; weights=None skips the
# pretrained detection weights because the checkpoint replaces them below.
model = fasterrcnn_resnet50_fpn(weights=None)

# Same replacement head as in training so the state dict keys/shapes match.
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, NUM_CLASSES)

model.to(DEVICE)

# Load the weights of the best trial. map_location remaps the stored tensors
# onto DEVICE, so a checkpoint saved on GPU also loads in a CPU-only session.
state_dict = torch.load(
    os.path.join(best_model_dir, "model.pth"),
    map_location=DEVICE,
)
model.load_state_dict(state_dict)
model.eval()


In [None]:
# Score the best model on the validation split, one image per batch.
val_loader = DataLoader(
    val_dataset,
    batch_size=1,
    shuffle=False,
    collate_fn=lambda batch: tuple(zip(*batch)),
)

map50, map50_95 = compute_map50_95(model, val_loader, DEVICE)

print(
    f"Mejor modelo - mAP@0.5: {map50:.4f}\n"
    f"Mejor modelo - mAP@0.5:0.95: {map50_95:.4f}"
)


In [None]:
# Visualise the predictions of the best model
def show_predictions(model, dataset, device, class_names, num_images=5,
                     score_threshold=0.5, save_dir=None):
    """Plot ground-truth (green) and predicted (red) boxes for the first images.

    Args:
        model: Detection model; called with a list holding one image tensor,
            it must return a list with one dict containing ``"boxes"``,
            ``"scores"`` and ``"labels"``.
        dataset: Indexable dataset yielding ``(image, target)`` with
            ``target["boxes"]`` in ``x1, y1, x2, y2`` pixels.
        device: Device the image is moved to for inference.
        class_names: Class names where label ``k`` maps to
            ``class_names[k - 1]`` (label 0 is the background).
        num_images: Maximum number of images to show, starting at index 0;
            capped at ``len(dataset)``.
        score_threshold: Only predictions with a score strictly above this
            value are drawn.
        save_dir: Optional folder; when given, each figure is also written
            there as ``pred_<index>.png`` before being displayed.
    """
    model.eval()
    if save_dir is not None:
        os.makedirs(save_dir, exist_ok=True)

    for i in range(min(num_images, len(dataset))):
        img, target = dataset[i]
        with torch.no_grad():
            pred = model([img.to(device)])[0]

        fig, ax = plt.subplots(figsize=(6, 6))
        ax.imshow(img.permute(1, 2, 0).cpu().numpy())

        # Ground-truth boxes
        for x1, y1, x2, y2 in target["boxes"].tolist():
            ax.add_patch(plt.Rectangle(
                (x1, y1), x2 - x1, y2 - y1,
                edgecolor='green', facecolor='none', linewidth=2,
            ))

        # Predicted boxes above the score threshold
        detections = zip(
            pred["boxes"].tolist(),
            pred["scores"].tolist(),
            pred["labels"].tolist(),
        )
        for (x1, y1, x2, y2), score, label in detections:
            if score <= score_threshold:
                continue
            ax.add_patch(plt.Rectangle(
                (x1, y1), x2 - x1, y2 - y1,
                edgecolor='red', facecolor='none', linewidth=2,
            ))
            # Fall back to the raw label instead of silently wrapping around
            # the list when the label has no matching name.
            class_index = label - 1
            if 0 <= class_index < len(class_names):
                class_name = class_names[class_index]
            else:
                class_name = str(label)
            ax.text(
                x1, y1 - 5,
                f"{class_name} ({score:.2f})",
                color="red",
                fontsize=9,
                backgroundcolor="white"
            )

        ax.axis('off')
        if save_dir is not None:
            # Save before plt.show(): the figure may be gone afterwards.
            fig.savefig(os.path.join(save_dir, f"pred_{i}.png"), bbox_inches="tight")
        plt.show()
        plt.close(fig)

class_names = ["door", "window", "balcony", "railing"]
show_predictions(model, val_dataset, DEVICE, class_names, num_images=5,
                 save_dir=best_model_dir)

In [None]:
# Save every figure that is still open as pred_<i>.png in the best-trial
# folder. The previous single plt.savefig call used an `i` that is not
# defined at this point of the notebook.
# NOTE(review): figures already displayed with plt.show() may have been
# closed by the notebook backend, in which case nothing is written here.
for i, fig_number in enumerate(plt.get_fignums()):
    plt.figure(fig_number).savefig(os.path.join(best_model_dir, f"pred_{i}.png"))

Verde → ground-truth (cajas reales)

Rojo → predicciones del modelo

Solo se dibujan las predicciones con score > 0.5

In [None]:
import torch
from sklearn.metrics import precision_recall_fscore_support, confusion_matrix
import json
import matplotlib.pyplot as plt
import numpy as np
import os

# Per-class precision, recall, F1 and confusion matrix
def evaluate_model(model, dataset, device, score_threshold=0.5, class_names=None,
                   iou_threshold=0.5):
    """Compute per-class precision, recall, F1 and a confusion matrix.

    Classification metrics need one (true, predicted) label pair per sample,
    so detections are first paired with ground-truth boxes: in each image the
    kept predictions are visited in descending score order and each one is
    matched to the still-unmatched ground-truth box with the highest IoU,
    provided that IoU is at least ``iou_threshold``. The class is NOT used
    for matching, so a well-localised box with the wrong class shows up as a
    class confusion. Unmatched predictions count as false positives (true
    label = background) and unmatched ground-truth boxes as misses
    (predicted label = background).

    Labels follow the torchvision detection convention: 0 is the background
    and label ``k`` corresponds to ``class_names[k - 1]``.

    Args:
        model: Detection model; called with a list holding one image tensor,
            it must return a list with one dict containing ``"boxes"``,
            ``"scores"`` and ``"labels"``.
        dataset: Iterable of ``(image, target)`` with ``target["boxes"]`` and
            ``target["labels"]`` tensors.
        device: Device the image is moved to for inference.
        score_threshold: Predictions with a score not above this are ignored.
        class_names: Names of the object classes (background excluded).
        iou_threshold: Minimum IoU for a prediction to match a ground truth.

    Returns:
        Dict with ``"precision"``, ``"recall"`` and ``"f1"`` (class name ->
        float) and ``"confusion_matrix"`` (nested list,
        ``len(class_names)`` x ``len(class_names)``, rows = ground truth,
        columns = prediction; pairs involving the background are left out of
        the matrix but do count in precision/recall/F1).

    Raises:
        ValueError: If ``class_names`` is missing or empty.
    """
    if not class_names:
        raise ValueError("class_names es obligatorio y no puede estar vacío")

    background = 0
    class_labels = list(range(1, len(class_names) + 1))

    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for img, target in dataset:
            pred = model([img.to(device)])[0]
            scores = pred["scores"].cpu()
            keep = scores > score_threshold

            # Visit the kept predictions from most to least confident.
            order = scores[keep].argsort(descending=True)
            pred_boxes = pred["boxes"].cpu()[keep][order]
            pred_labels = pred["labels"].cpu()[keep][order].tolist()

            gt_boxes = target["boxes"].cpu()
            gt_labels = target["labels"].cpu().tolist()

            # Rows: predictions, columns: ground-truth boxes.
            iou_rows = box_iou(pred_boxes, gt_boxes).tolist()

            matched_gt = set()
            for iou_row, pred_label in zip(iou_rows, pred_labels):
                best_gt, best_iou = None, iou_threshold
                for gt_idx, iou in enumerate(iou_row):
                    if gt_idx not in matched_gt and iou >= best_iou:
                        best_gt, best_iou = gt_idx, iou

                if best_gt is None:
                    # False positive: nothing was there.
                    all_labels.append(background)
                else:
                    matched_gt.add(best_gt)
                    all_labels.append(gt_labels[best_gt])
                all_preds.append(pred_label)

            # Ground-truth boxes nobody matched are misses.
            for gt_idx, gt_label in enumerate(gt_labels):
                if gt_idx not in matched_gt:
                    all_labels.append(gt_label)
                    all_preds.append(background)

    # Restricting `labels` to the object classes reports only those classes,
    # while background pairs still contribute false positives/negatives.
    precision, recall, f1, _ = precision_recall_fscore_support(
        all_labels, all_preds, labels=class_labels, zero_division=0
    )

    cm = confusion_matrix(all_labels, all_preds, labels=class_labels)

    # Plain floats/lists so the result can be dumped to JSON.
    results = {
        "precision": {name: float(v) for name, v in zip(class_names, precision)},
        "recall": {name: float(v) for name, v in zip(class_names, recall)},
        "f1": {name: float(v) for name, v in zip(class_names, f1)},
        "confusion_matrix": cm.tolist()
    }

    return results

# Run the evaluation on the validation split.
class_names = ["door", "window", "balcony", "railing"]
results = evaluate_model(
    model,
    val_dataset,
    DEVICE,
    class_names=class_names,
    score_threshold=0.5,
)

# Serialise once, then store the report next to the best model and echo it.
results_file = os.path.join(best_model_dir, "evaluation_results.json")
results_json = json.dumps(results, indent=4)
with open(results_file, "w") as f:
    f.write(results_json)

print("Resultados guardados en:", results_file)
print(results_json)

# Draw the confusion matrix as a heat map with matplotlib.
cm = np.array(results["confusion_matrix"])
n_classes = len(class_names)
class_ticks = np.arange(n_classes)

fig, ax = plt.subplots(figsize=(6,5))
im = ax.imshow(cm, cmap="Blues")

# Write each count in the centre of its cell (row i, column j).
for i, j in np.ndindex(n_classes, n_classes):
    ax.text(j, i, cm[i, j], ha="center", va="center", color="black")

# Axis ticks and labels: columns are predictions, rows are ground truth.
ax.set_xticks(class_ticks)
ax.set_xticklabels(class_names)
ax.set_xlabel("Predicciones")
ax.set_yticks(class_ticks)
ax.set_yticklabels(class_names)
ax.set_ylabel("Ground-truth")
ax.set_title("Matriz de Confusión del Mejor Modelo")
plt.show()
