# 5. Strategy: The "Logical" Ensemble (WBF) and Comparative Evaluation

This notebook implements a model-combination strategy based on **Weighted Boxes Fusion (WBF)** and compares its metrics (mAP, Precision, Recall) against those of the individual models.

### The Models
1. **The Specialist (Acuatico-Nano):** High Precision, low Recall. (Weight: 2)
2. **The Bloodhound (Acuatico-Medium):** Low Precision, high Recall. (Weight: 1)

### The Strategy (WBF)
*   **Weighted Boxes Fusion (WBF)** combines the models' predictions using per-model weights.
*   **Acuatico-Nano** gets priority (2:1) to preserve precision.
*   **Acuatico-Medium** improves Recall.
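
To make the 2:1 weighting concrete, here is a minimal sketch of how two overlapping boxes could be averaged. It is a simplified illustration, not the `ensemble_boxes` implementation: the real library also clusters boxes by IoU and label before fusing them. The helper name `fuse_pair` and the sample boxes are assumptions made for this example.

```python
import numpy as np

def fuse_pair(box_a, score_a, box_b, score_b, weights=(2, 1)):
    """Fuse two overlapping boxes with a weighted average (illustrative only)."""
    # Each box contributes in proportion to its confidence times its model weight
    w = np.array([score_a * weights[0], score_b * weights[1]], dtype=float)
    boxes = np.array([box_a, box_b], dtype=float)
    fused_box = (w[:, None] * boxes).sum(axis=0) / w.sum()
    # Normalize the fused confidence by the total model weight
    fused_score = w.sum() / sum(weights)
    return fused_box, fused_score

# Hypothetical detections: Nano (weight 2) and Medium (weight 1)
fused_box, fused_score = fuse_pair(
    [0.10, 0.10, 0.50, 0.50], 0.9,
    [0.16, 0.16, 0.56, 0.56], 0.6,
)
print(fused_box, fused_score)
```

With these inputs the fused box lands closer to the Nano box, because Nano carries both the higher confidence and the higher weight.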


In [1]:
import os
import glob
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
from ultralytics import YOLO
from ensemble_boxes import weighted_boxes_fusion
from tqdm.notebook import tqdm

%matplotlib inline

In [2]:
# --- CONFIGURATION ---
PATH_NANO = 'modelos_entrenados/modelo-acuatico-n.pt'
PATH_MEDIUM = 'modelos_entrenados/modelo-acuatico-m.pt'
DATASET_YAML = 'dataset_yolo/dataset.yml'
TEST_IMAGES_DIR = 'dataset_yolo/images/test'
TEST_LABELS_DIR = 'dataset_yolo/labels/test'

# Ensemble parameters
WEIGHTS = [2, 1]      # [Nano, Medium]
IOU_THR = 0.5         # IoU threshold used by WBF to match boxes
SKIP_BOX_THR = 0.001  # WBF ignores boxes scoring below this value
CONF_THR = 0.25       # Confidence threshold for YOLO inference

# Check that the paths exist
print(f"Modelo Nano: {os.path.exists(PATH_NANO)}")
print(f"Modelo Medium: {os.path.exists(PATH_MEDIUM)}")
print(f"Labels Test: {os.path.exists(TEST_LABELS_DIR)}")

Modelo Nano: True
Modelo Medium: True
Labels Test: True


## 1. Ensemble Implementation (WBF)

In [3]:
def prepare_boxes(results):
    """
    Extract normalized boxes from YOLO results.
    Returns lists of boxes, scores, and labels (one entry per image).
    """
    boxes_list = []
    scores_list = []
    labels_list = []

    for result in results:
        boxes = result.boxes.xyxyn.cpu().numpy()
        scores = result.boxes.conf.cpu().numpy()
        labels = result.boxes.cls.cpu().numpy()

        boxes_list.append(boxes)
        scores_list.append(scores)
        labels_list.append(labels)

    return boxes_list, scores_list, labels_list


def run_wbf(preds_nano, preds_medium, weights=(2, 1), iou_thr=0.5, skip_thr=0.001):
    """
    Combine the predictions of two YOLO models using Weighted Boxes Fusion (WBF).
    """
    # Prepare the predictions
    b1, s1, l1 = prepare_boxes(preds_nano)
    b2, s2, l2 = prepare_boxes(preds_medium)

    # WBF expects one array per model, so take the first (and only) image of each
    boxes_list = [b1[0], b2[0]]
    scores_list = [s1[0], s2[0]]
    labels_list = [l1[0], l2[0]]

    # Apply the weighted fusion
    boxes, scores, labels = weighted_boxes_fusion(
        boxes_list,
        scores_list,
        labels_list,
        weights=weights,
        iou_thr=iou_thr,
        skip_box_thr=skip_thr
    )

    return boxes, scores, labels
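
The fusion step takes one array of boxes, scores, and labels per model, with boxes in normalized `(x1, y1, x2, y2)` form, which is why `prepare_boxes` reads `xyxyn`. The following sketch shows a sanity check that could run before fusing. The helper `check_wbf_inputs` is hypothetical and not part of `ensemble_boxes` or this notebook.

```python
import numpy as np

def check_wbf_inputs(boxes_list, scores_list, labels_list):
    """Validate the per-model arrays before fusing (hypothetical helper)."""
    for boxes, scores, labels in zip(boxes_list, scores_list, labels_list):
        boxes = np.asarray(boxes, dtype=float).reshape(-1, 4)
        if not (len(boxes) == len(scores) == len(labels)):
            raise ValueError("boxes, scores and labels must have equal length")
        if boxes.size and (boxes.min() < 0.0 or boxes.max() > 1.0):
            raise ValueError("boxes must be normalized to [0, 1]")
        if np.any(boxes[:, 2] < boxes[:, 0]) or np.any(boxes[:, 3] < boxes[:, 1]):
            raise ValueError("boxes must be in (x1, y1, x2, y2) order")
    return True

# Model 1 found one box; model 2 found nothing (an empty array is valid input)
ok = check_wbf_inputs(
    [np.array([[0.1, 0.1, 0.5, 0.5]]), np.zeros((0, 4))],
    [np.array([0.9]), np.array([])],
    [np.array([0.0]), np.array([])],
)
print(ok)
```

A check like this catches the common mistake of passing pixel coordinates (`xyxy`) instead of normalized ones.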


## 2. Custom Evaluation Tools (mAP Calculator)

Because the ensemble is not a standard YOLO model, we implement a metrics calculator (mAP50, mAP50-95) to evaluate its predictions against the ground truth.
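
For reference, the quantities involved can be written as follows. This is a sketch that assumes all-point interpolation of the precision-recall curve and ten IoU thresholds from 0.50 to 0.95. The symbols $B_p$, $B_g$, $r_k$, $p$, and $\tilde{p}$ are notation introduced here, not taken from the notebook.

```latex
\mathrm{IoU}(B_p, B_g) = \frac{|B_p \cap B_g|}{|B_p \cup B_g|}
\qquad
\tilde{p}(r) = \max_{r' \ge r} p(r')
\qquad
\mathrm{AP}_t = \sum_k (r_{k+1} - r_k)\,\tilde{p}(r_{k+1})
\qquad
\mathrm{mAP}_{50\text{-}95} = \frac{1}{10} \sum_{t \in \{0.50,\,0.55,\,\dots,\,0.95\}} \mathrm{AP}_t
```

Here a prediction counts as a true positive at threshold $t$ when its IoU with a not-yet-matched ground-truth box is at least $t$, and mAP50 is simply $\mathrm{AP}_{0.50}$ for the single class.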

In [4]:
class MAPCalculator:
    """Simple mAP calculator for a single class (corrosion)."""

    def __init__(self):
        self.predictions = []
        self.targets = []

    def update(self, pred_boxes, pred_scores, target_boxes):
        """
        Accumulate predictions and targets for one image.
        pred_boxes: [[x1, y1, x2, y2], ...]
        pred_scores: one confidence value per predicted box
        target_boxes: [[x1, y1, x2, y2], ...]
        """
        self.predictions.append({'boxes': pred_boxes, 'scores': pred_scores})
        self.targets.append(target_boxes)

    def compute_iou(self, box1, box2):
        """Compute the IoU of two boxes (x1, y1, x2, y2 format)."""
        x1 = max(box1[0], box2[0])
        y1 = max(box1[1], box2[1])
        x2 = min(box1[2], box2[2])
        y2 = min(box1[3], box2[3])

        inter_area = max(0, x2 - x1) * max(0, y2 - y1)
        box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
        box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])

        union_area = box1_area + box2_area - inter_area
        return inter_area / union_area if union_area > 0 else 0

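    def compute_ap(self, iou_threshold):
        """Compute Average Precision at the given IoU threshold.

        Returns (ap, precision, recall), with precision and recall taken
        at the point of maximum F1.
        """
        all_scores = []
        all_tps = []
        total_gt = 0

        # 1. Match predictions to ground truth image by image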
        for preds, targets in zip(self.predictions, self.targets):
            total_gt += len(targets)
            if len(preds['boxes']) == 0:
                continue
            
            p_boxes = preds['boxes']
            p_scores = preds['scores']
            
            # Sort this image's predictions by descending score before matching
            sorted_idx = np.argsort(-p_scores)
            p_boxes = p_boxes[sorted_idx]
            p_scores = p_scores[sorted_idx]
            
            tp = np.zeros(len(p_boxes))
            detected_gt = np.zeros(len(targets))
            
            for i, p_box in enumerate(p_boxes):
                best_iou = 0
                best_gt_idx = -1
                for j, t_box in enumerate(targets):
                    iou = self.compute_iou(p_box, t_box)
                    if iou > best_iou:
                        best_iou = iou
                        best_gt_idx = j
                
                if best_iou >= iou_threshold and not detected_gt[best_gt_idx]:
                    tp[i] = 1
                    detected_gt[best_gt_idx] = 1
            
            all_scores.extend(p_scores)
            all_tps.extend(tp)

        # 2. Short-circuit when there is no ground truth or no predictions
        if total_gt == 0 or len(all_scores) == 0:
            return 0.0, 0.0, 0.0

        # 3. Explicit conversion to NumPy arrays
        all_scores = np.array(all_scores)
        all_tps = np.array(all_tps)

        # Sort globally by descending score
        sorted_indices = np.argsort(-all_scores)
        all_tps = all_tps[sorted_indices]

        # 4. Cumulative true and false positives
        cum_tp = np.cumsum(all_tps)
        cum_fp = np.cumsum(1 - all_tps)

        # Small epsilons avoid division by zero
        precision = cum_tp / (cum_tp + cum_fp + 1e-16)
        recall = cum_tp / (total_gt + 1e-16)

        # 5. Add sentinel points at both ends of the precision-recall curve
        mrec = np.concatenate(([0.0], recall, [1.0]))
        mpre = np.concatenate(([1.0], precision, [0.0]))

        # Compute the precision envelope (precision becomes non-increasing in recall)
        mpre = np.maximum.accumulate(mpre[::-1])[::-1]

        # Integrate the area under the curve:
        # find the points where recall (the x axis) changes
        i = np.where(mrec[1:] != mrec[:-1])[0]

        # (mrec[i + 1] - mrec[i]) is the step width
        # mpre[i + 1] is the step height (envelope precision)
        ap = np.sum((mrec[i + 1] - mrec[i]) * mpre[i + 1])

        # Pick the operating point with the highest F1
        f1 = 2 * precision * recall / (precision + recall + 1e-16)
        best_idx = np.argmax(f1) if len(f1) > 0 else 0

        # Return AP plus precision and recall at the best operating point
        return ap, precision[best_idx], recall[best_idx]

    def compute_metrics(self):
        """Compute mAP50 and mAP50-95."""
        print("Calculando métricas...")

        ap50, p50, r50 = self.compute_ap(0.5)

        aps = []
        for thr in np.arange(0.5, 0.96, 0.05):
            ap, _, _ = self.compute_ap(thr)
            aps.append(ap)

        map50_95 = np.mean(aps)
        f1 = 2 * p50 * r50 / (p50 + r50 + 1e-16)

        return {
            'mAP50-95': map50_95,
            'mAP50': ap50,
            'Precisión': p50,
            'Recall': r50,
            'F1-Score': f1
        }
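
As a worked example of the integration in `compute_ap`, the sketch below reproduces the same sentinel, envelope, and step-sum logic as a standalone function on a toy ranking. The function name `average_precision` and the toy inputs are assumptions for illustration, and the epsilon terms are omitted for readability.

```python
import numpy as np

def average_precision(tp_flags, total_gt):
    """All-point interpolated AP from TP flags sorted by descending score."""
    tp_flags = np.asarray(tp_flags, dtype=float)
    cum_tp = np.cumsum(tp_flags)
    cum_fp = np.cumsum(1 - tp_flags)
    precision = cum_tp / (cum_tp + cum_fp)
    recall = cum_tp / total_gt
    # Sentinel points at both ends of the curve
    mrec = np.concatenate(([0.0], recall, [1.0]))
    mpre = np.concatenate(([1.0], precision, [0.0]))
    # Precision envelope: running maximum taken from right to left
    mpre = np.maximum.accumulate(mpre[::-1])[::-1]
    # Sum the rectangle areas wherever recall changes
    idx = np.where(mrec[1:] != mrec[:-1])[0]
    return float(np.sum((mrec[idx + 1] - mrec[idx]) * mpre[idx + 1]))

# Three predictions ranked by score: hit, miss, hit; two ground-truth boxes
ap = average_precision([1, 0, 1], total_gt=2)
print(round(ap, 4))
```

In this toy case the first half of the recall range is covered at precision 1.0 and the second half at precision 2/3, so the AP is 5/6.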


## 3. Running the Evaluation

In [5]:
# Load the models
model_nano = YOLO(PATH_NANO)
model_medium = YOLO(PATH_MEDIUM)

# List that collects the results
resultados = []

In [6]:
def evaluar_modelo_individual(model, nombre):
    """Evaluate a single model with the ultralytics validator."""
    print(f"--- Evaluando {nombre} ---")

    metrics = model.val(data=DATASET_YAML, split='test', verbose=False)

    res = {
        'Modelo': nombre,
        'mAP50-95': metrics.box.map,
        'mAP50': metrics.box.map50,
        'Precisión': metrics.box.p[0] if len(metrics.box.p) > 0 else 0,
        'Recall': metrics.box.r[0] if len(metrics.box.r) > 0 else 0,
        'F1-Score': metrics.box.f1[0] if len(metrics.box.f1) > 0 else 0
    }

    return res


# 1. Evaluate the individual models
resultados.append(evaluar_modelo_individual(model_nano, 'Acuatico-Nano'))
resultados.append(evaluar_modelo_individual(model_medium, 'Acuatico-Medium'))

--- Evaluando Acuatico-Nano ---
Ultralytics 8.3.228 🚀 Python-3.9.13 torch-2.8.0+cu128 CPU (AMD Ryzen 7 5800X 8-Core Processor)
YOLOv12n summary (fused): 159 layers, 2,556,923 parameters, 0 gradients, 6.3 GFLOPs
val: Fast image access ✅ (ping: 0.0±0.0 ms, read: 2199.3±268.5 MB/s, size: 27.1 KB)
val: Scanning /home/user/work/EONSEA/Articulo-Corrosion/dataset_yolo/labels/test... 21 images, 12 backgrounds, 0 corrupt: 100% ━━━━━━━━━━━━ 33/33 2.8Kit/s 0.0s
                 Class     Images  Instances      Box(P          R      mAP50  mAP50-95): 100% ━━━━━━━━━━━━ 3/3 1.4it/s 2.2s
                   all         33         52      0.905      0.548      0.735      0.436
Speed: 1.3ms preprocess, 60.1ms inference, 0.0ms loss, 1.4ms postprocess per image
Results saved to /home/user/work/EONSEA/Articulo-Corrosion/runs/detect/val56
--- Evaluando Acuatico-Medium ---
Ultralytics 8.3.228 🚀 Python-3.9.13

In [8]:
def evaluar_ensemble(model1, model2, dataset_dir, labels_dir):
    """Evaluate the ensemble manually using WBF."""
    print("--- Evaluando Ensemble (WBF) ---")

    calc = MAPCalculator()

    # Collect the test images (both .jpg and .png)
    image_files = (
        glob.glob(os.path.join(dataset_dir, '*.jpg'))
        + glob.glob(os.path.join(dataset_dir, '*.png'))
    )

    for img_path in tqdm(image_files, desc="Procesando Ensemble"):

        # 1. Run inference with both models
        r1 = model1.predict(img_path, conf=CONF_THR, verbose=False)
        r2 = model2.predict(img_path, conf=CONF_THR, verbose=False)

        # 2. WBF fusion
        boxes, scores, labels = run_wbf(
            r1, r2,
            weights=WEIGHTS,
            iou_thr=IOU_THR,
            skip_thr=SKIP_BOX_THR
        )

        # 3. Load the ground truth
        label_name = os.path.basename(img_path).rsplit('.', 1)[0] + '.txt'
        label_path = os.path.join(labels_dir, label_name)

        gt_boxes = []
        if os.path.exists(label_path):
            with open(label_path, 'r') as f:
                for line in f:
                    parts = list(map(float, line.strip().split()))
                    cls, xc, yc, w, h = parts

                    x1 = xc - w / 2
                    y1 = yc - h / 2
                    x2 = xc + w / 2
                    y2 = yc + h / 2

                    gt_boxes.append([x1, y1, x2, y2])

            gt_boxes = np.array(gt_boxes)

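        # 4. Update the metrics only when there are predictions.
        # Note: images with no fused boxes are skipped here, so their
        # ground-truth boxes do not count toward recall.
        if len(boxes) > 0:
            calc.update(boxes, scores, gt_boxes)

    # Compute the final metrics after all images have been processed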
    metrics = calc.compute_metrics()
    metrics['Modelo'] = 'Ensemble (WBF 2:1)'

    return metrics


# 2. Evaluate the ensemble
try:
    res_ensemble = evaluar_ensemble(
        model_nano,
        model_medium,
        TEST_IMAGES_DIR,
        TEST_LABELS_DIR
    )
    resultados.append(res_ensemble)

except Exception as e:
    print(f"Error en evaluación del Ensemble: {e}")

--- Evaluando Ensemble (WBF) ---


Procesando Ensemble:   0%|          | 0/33 [00:00<?, ?it/s]

Calculando métricas...
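
The ground-truth loading step in `evaluar_ensemble` converts each YOLO label line (`class xc yc w h`, all normalized) into corner coordinates so it can be compared with the `xyxyn` predictions. A minimal standalone sketch of that conversion follows. The helper name `yolo_to_xyxy` and the sample label line are assumptions for illustration.

```python
import numpy as np

def yolo_to_xyxy(line):
    """Convert one YOLO label line 'cls xc yc w h' to [x1, y1, x2, y2]."""
    _cls, xc, yc, w, h = map(float, line.split())
    return [xc - w / 2, yc - h / 2, xc + w / 2, yc + h / 2]

# Hypothetical label: class 0, centered box 20% wide and 40% tall
box = yolo_to_xyxy("0 0.5 0.5 0.2 0.4")
print(np.round(box, 3))
```

Because both the labels and the fused boxes stay in normalized coordinates, no image size is needed to compute the IoU.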


## 5. Hyperparameter Optimization (Grid Search)
Exhaustive search for the combination of weights, IoU threshold, and confidence threshold that maximizes the F1-Score and mAP.

In [11]:
import itertools

# --- SEARCH CONFIGURATION ---
# Parameter ranges to try
search_space = {
    'weights': [[1, 1], [2, 1], [3, 1], [1, 2]],  # [Nano, Medium]
    'iou_thr': [0.45, 0.5, 0.55, 0.6],
    'conf_thr': [0.25, 0.30, 0.35]
}

# Generate every combination
keys, values = zip(*search_space.items())
combinations = [dict(zip(keys, v)) for v in itertools.product(*values)]

print(f"Probando {len(combinations)} combinaciones...")

results_grid = []

# List the test images once to avoid repeated I/O
image_files = (
    glob.glob(os.path.join(TEST_IMAGES_DIR, '*.jpg'))
    + glob.glob(os.path.join(TEST_IMAGES_DIR, '*.png'))
)

# Pre-compute the raw predictions of each model and keep them in memory,
# so the grid search never has to re-run inference (key for speed)
print("Pre-calculando inferencias...")
preds_cache = []
for img_path in tqdm(image_files, desc="Caching Inferences"):
    # Use a low conf (0.01) here so each configuration can filter dynamically later
    r1 = model_nano.predict(img_path, conf=0.01, verbose=False)
    r2 = model_medium.predict(img_path, conf=0.01, verbose=False)

    # Load the ground truth
    label_name = os.path.basename(img_path).rsplit('.', 1)[0] + '.txt'
    label_path = os.path.join(TEST_LABELS_DIR, label_name)
    gt_boxes = []
    if os.path.exists(label_path):
        with open(label_path, 'r') as f:
            for line in f:
                parts = list(map(float, line.strip().split()))
                cls, xc, yc, w, h = parts

                x1 = xc - w / 2
                y1 = yc - h / 2
                x2 = xc + w / 2
                y2 = yc + h / 2

                gt_boxes.append([x1, y1, x2, y2])

    preds_cache.append({
        'r1': r1,
        'r2': r2,
        'gt': np.array(gt_boxes)
    })

def filter_preds(results, conf_lim):
    # Helper that filters each result's detections by confidence
    boxes_list, scores_list, labels_list = [], [], []
    for res in results:
        b = res.boxes.xyxyn.cpu().numpy()
        s = res.boxes.conf.cpu().numpy()
        l = res.boxes.cls.cpu().numpy()

        mask = s >= conf_lim
        boxes_list.append(b[mask])
        scores_list.append(s[mask])
        labels_list.append(l[mask])
    return boxes_list, scores_list, labels_list

# --- OPTIMIZATION LOOP ---
for params in tqdm(combinations, desc="Grid Search"):
    calc = MAPCalculator()

    for item in preds_cache:
        # Prepare the filtered data
        b1, s1, l1 = filter_preds(item['r1'], params['conf_thr'])
        b2, s2, l2 = filter_preds(item['r2'], params['conf_thr'])

        # Run WBF
        if len(b1[0]) == 0 and len(b2[0]) == 0:
            boxes, scores = [], []
        else:
            boxes, scores, labels = weighted_boxes_fusion(
                [b1[0], b2[0]],
                [s1[0], s2[0]],
                [l1[0], l2[0]],
                weights=params['weights'],
                iou_thr=params['iou_thr'],
                skip_box_thr=0.0  # Already filtered by confidence
            )

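        # Note: images with no fused boxes are skipped, so their
        # ground-truth boxes do not count toward recall.
        if len(boxes) > 0:
            calc.update(boxes, scores, item['gt'])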

    # Compute the metrics for this combination, silencing the
    # 'Calculando métricas...' message so it does not flood the output
    try:
        import contextlib
        from io import StringIO

        with contextlib.redirect_stdout(StringIO()):
            m = calc.compute_metrics()
    except Exception:
        m = {}

    # Store the result
    res_entry = params.copy()
    res_entry.update(m)
    results_grid.append(res_entry)

# --- SHOW THE BEST RESULTS ---
df_grid = pd.DataFrame(results_grid)
df_sorted = df_grid.sort_values(by='F1-Score', ascending=False)
print("--- Top 5 Mejores Configuraciones (por F1-Score) ---")
best_f1_cfg = df_sorted.head(1)
print(df_sorted.head(5).to_string())

# Add the best configuration to the global results table
if not best_f1_cfg.empty:
    best_row = best_f1_cfg.iloc[0]
    best_res = {
        'Modelo': f"Ensemble Optimizado (W:{best_row['weights']} I:{best_row['iou_thr']} C:{best_row['conf_thr']})",
        'mAP50-95': best_row['mAP50-95'],
        'mAP50': best_row['mAP50'],
        'Precisión': best_row['Precisión'],
        'Recall': best_row['Recall'],
        'F1-Score': best_row['F1-Score']
    }
    resultados.append(best_res)

Probando 48 combinaciones...
Pre-calculando inferencias...


Caching Inferences:   0%|          | 0/33 [00:00<?, ?it/s]

Grid Search:   0%|          | 0/48 [00:00<?, ?it/s]

--- Top 5 Mejores Configuraciones (por F1-Score) ---
   weights  iou_thr  conf_thr  mAP50-95     mAP50  Precisión  Recall  F1-Score
1   [1, 1]     0.45       0.3  0.442033  0.698175   0.969697    0.64  0.771084
16  [2, 1]     0.50       0.3  0.434643  0.691674   0.916667    0.66  0.767442
34  [3, 1]     0.60       0.3  0.425463  0.665652   0.916667    0.66  0.767442
28  [3, 1]     0.50       0.3  0.429250  0.686438   0.916667    0.66  0.767442
25  [3, 1]     0.45       0.3  0.429250  0.686438   0.916667    0.66  0.767442


## 4. Final Results Table

In [13]:
print("--- TABLA DE RESULTADOS: DATASET ACUATICO ---")

df_res = pd.DataFrame(resultados)

if not df_res.empty:

    # Sort the rows and select the columns to display
    cols = ['Modelo', 'mAP50-95', 'mAP50', 'Precisión', 'Recall', 'F1-Score']
    df_res = (
        df_res[cols]
        .sort_values(by='mAP50', ascending=False)
        .reset_index(drop=True)
    )

    print(df_res.to_string(float_format='{:.4f}'.format))

else:
    print("No se generaron resultados.")

--- TABLA DE RESULTADOS: DATASET ACUATICO ---
                                        Modelo  mAP50-95  mAP50  Precisión  Recall  F1-Score
0                                Acuatico-Nano    0.4364 0.7348     0.9047  0.5479    0.6825
1  Ensemble Optimizado (W:[1, 1] I:0.45 C:0.3)    0.4420 0.6982     0.9697  0.6400    0.7711
2                           Ensemble (WBF 2:1)    0.4065 0.6825     0.8750  0.6731    0.7609
3                              Acuatico-Medium    0.3960 0.6724     0.7797  0.5769    0.6632
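
As a quick consistency check on the table, the F1-Score column can be recomputed from the Precision and Recall columns with F1 = 2PR / (P + R). The sketch below uses the rounded values printed above, so small differences in the last digit are expected. The helper `f1_score` and the `rows` dictionary are introduced here for illustration.

```python
def f1_score(precision, recall):
    """Harmonic mean of precision and recall (0.0 when both are zero)."""
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)

# (Precision, Recall, reported F1-Score), copied from the results table
rows = {
    'Acuatico-Nano': (0.9047, 0.5479, 0.6825),
    'Ensemble Optimizado': (0.9697, 0.6400, 0.7711),
    'Ensemble (WBF 2:1)': (0.8750, 0.6731, 0.7609),
    'Acuatico-Medium': (0.7797, 0.5769, 0.6632),
}

for name, (p, r, reported) in rows.items():
    print(f"{name}: recomputed F1 = {f1_score(p, r):.4f}, reported = {reported:.4f}")
```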
