In [8]:
# === Ensemble: Weighted Boxes Fusion (WBF) and evaluation on test_original, test, test2, test3 ===
# Requires: pip install ensemble-boxes (only needed for WBF)

import os
import time
from pathlib import Path
import numpy as np
import pandas as pd
from ultralytics import YOLO
from lib.YOLO_lib import config

# Optional dependency: weighted_boxes_fusion comes from the third-party
# `ensemble_boxes` package. HAS_WBF records whether the import succeeded so
# that ensemble_wbf can check it instead of this cell failing on import.
try:
    from ensemble_boxes import weighted_boxes_fusion
    HAS_WBF = True
except ImportError:
    HAS_WBF = False
    print("Instala ensemble-boxes para WBF: pip install ensemble-boxes")

In [16]:
# Model files of the base detectors that make up the ensemble, keyed by model
# name and looked up in config.final_model_path.
ensemble_model_paths = {
    "yolov12s": config.final_model_path["yolov12s"],
    "yolov11s": config.final_model_path["yolov11s"],
    # NOTE(review): the key says "yolov10s" but the path is looked up under
    # "yolov8s" - confirm which model is intended and make key and path agree.
    "yolov10s":  config.final_model_path["yolov8s"],
}

# One loaded YOLO model per entry above, stored under the same keys.
base_models = {k: YOLO(p) for k,p in ensemble_model_paths.items()}

# Per-model weights passed to weighted_boxes_fusion. The keys must match the
# keys of base_models because ensemble_wbf looks each weight up by model name.
model_weights = {
    "yolov12s": 0.6,
    "yolov11s": 0.1,
    "yolov10s":  0.3
}

In [17]:
# Utilidades
def load_yolo_labels(label_file):
    """Read a YOLO-format label file into a float array.

    Each kept line contributes one row ``[cls, cx, cy, w, h]``. Lines that do
    not split into exactly five whitespace-separated fields are skipped.

    Args:
        label_file: Path of the ``.txt`` label file.

    Returns:
        Float array of shape (N, 5); shape (0, 5) when the file does not exist
        or contains no usable line.

    Raises:
        ValueError: If a five-field line contains a non-numeric token.
    """
    no_labels = np.zeros((0, 5), dtype=float)
    if not os.path.exists(label_file):
        return no_labels

    with open(label_file, "r") as handle:
        tokenized = (raw.strip().split() for raw in handle)
        # The list is built while the file is still open, so the lazy
        # generator above is fully consumed inside the `with` block.
        rows = [
            [float(token) for token in fields]
            for fields in tokenized
            if len(fields) == 5
        ]

    if not rows:
        return no_labels
    return np.array(rows, dtype=float)


def yolo_to_xyxy_norm(boxes):
    """Convert YOLO rows ``(cls, cx, cy, w, h)`` to corner boxes ``(x1, y1, x2, y2)``.

    The coordinates are only shifted by half the width/height, so the output is
    in the same units as the input (normalized, per the label format).

    Args:
        boxes: Array-like of shape (N, 5). The class column is ignored.

    Returns:
        Float array of shape (N, 4). Empty input yields shape (0, 4), so the
        result can always be passed on as a set of boxes without special
        handling by the caller.
    """
    boxes = np.asarray(boxes, dtype=float)
    if boxes.size == 0:
        # Previously this returned `boxes[:, :0]`, i.e. shape (N, 0), which has
        # the wrong number of columns and fails outright on a 1-D empty array.
        return np.zeros((0, 4), dtype=float)

    centers = boxes[:, 1:3]
    half_extent = boxes[:, 3:5] / 2
    return np.concatenate([centers - half_extent, centers + half_extent], axis=1)


def iou_matrix(a, b):
    """Pairwise intersection-over-union between two sets of corner boxes.

    Args:
        a: Array-like of shape (N, 4) in ``(x1, y1, x2, y2)`` order.
        b: Array-like of shape (M, 4) in the same order and units.

    Returns:
        Float array of shape (N, M) where entry ``[i, j]`` is the IoU of
        ``a[i]`` and ``b[j]``. Pairs whose union area is not positive get 0.0.
        If either input is empty the result is an all-zero array of shape
        ``(len(a), len(b))``.
    """
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    if a.size == 0 or b.size == 0:
        return np.zeros((len(a), len(b)), dtype=float)

    # Negative widths/heights (inverted boxes) count as zero area.
    area_a = np.clip(a[:, 2] - a[:, 0], 0, None) * np.clip(a[:, 3] - a[:, 1], 0, None)
    area_b = np.clip(b[:, 2] - b[:, 0], 0, None) * np.clip(b[:, 3] - b[:, 1], 0, None)

    # Broadcast (N, 1, 2) against (1, M, 2) to get every pair's overlap
    # rectangle at once instead of looping over pairs in Python.
    overlap_min = np.maximum(a[:, None, 0:2], b[None, :, 0:2])
    overlap_max = np.minimum(a[:, None, 2:4], b[None, :, 2:4])
    overlap_wh = np.clip(overlap_max - overlap_min, 0, None)
    inter = overlap_wh[..., 0] * overlap_wh[..., 1]

    union = area_a[:, None] + area_b[None, :] - inter

    # Divide only where the union is positive; everything else stays 0.0.
    ious = np.zeros_like(union)
    np.divide(inter, union, out=ious, where=union > 0)
    return ious


def predict_single_model(model, image_path, imgsz):
    """Run one model on one image and return its detections with normalized corners.

    ``model.predict`` is called with ``conf=0.001`` and ``verbose=False``; only
    the first returned result is used.

    Args:
        model: Object exposing ``predict(source, imgsz=..., conf=..., verbose=...)``.
        image_path: Image to run on, forwarded to ``model.predict``.
        imgsz: Inference size, forwarded to ``model.predict``.

    Returns:
        Dict with keys ``"boxes"`` (N, 4) corner boxes divided by the original
        image width/height, ``"conf"`` (N,) and ``"cls"`` (N,). All three are
        empty arrays when the result has no boxes.
    """
    result = model.predict(image_path, imgsz=imgsz, conf=0.001, verbose=False)[0]
    # orig_shape is unpacked as (height, width).
    img_h, img_w = result.orig_shape
    detections = result.boxes

    if detections.shape[0] == 0:
        return {"boxes": np.zeros((0,4)), "conf": np.zeros((0,)), "cls": np.zeros((0,))}

    # Work on a copy so the model's own result array is left untouched.
    scaled = detections.xyxy.cpu().numpy().copy()
    scaled[:, 0:3:2] /= img_w   # x1, x2
    scaled[:, 1:4:2] /= img_h   # y1, y2

    return {
        "boxes": scaled,
        "conf": detections.conf.cpu().numpy(),
        "cls": detections.cls.cpu().numpy(),
    }


def ensemble_wbf(image_path, imgsz=704, iou_thr=0.9, skip_box_thr=0.5):
    """Run every base model on one image and fuse the detections with WBF.

    Each model in ``base_models`` is run through ``predict_single_model``; the
    per-model boxes, scores and labels are passed to ``weighted_boxes_fusion``
    together with the matching entry of ``model_weights``.

    Args:
        image_path: Image to run on.
        imgsz: Inference size forwarded to every model.
        iou_thr: ``iou_thr`` argument forwarded to ``weighted_boxes_fusion``.
        skip_box_thr: ``skip_box_thr`` argument forwarded to
            ``weighted_boxes_fusion``.

    Returns:
        Tuple ``(dets, elapsed_ms)``. ``dets`` has shape (N, 6) with columns
        ``x1, y1, x2, y2, score, label`` (boxes normalized); ``elapsed_ms`` is
        the time spent in this call, covering all model predictions plus the
        fusion. When ``HAS_WBF`` is false nothing is run and the result is an
        empty (0, 6) array with a time of 0.
    """
    if not HAS_WBF:
        return np.zeros((0, 6)), 0.0

    # perf_counter is monotonic and high-resolution; time.time() is wall-clock
    # time and can jump if the system clock is adjusted mid-measurement.
    start_time = time.perf_counter()
    boxes_list, scores_list, labels_list, weights = [], [], [], []

    for name, m in base_models.items():
        r = predict_single_model(m, image_path, imgsz)

        boxes_list.append(r["boxes"].tolist())
        scores_list.append(r["conf"].tolist())
        labels_list.append(r["cls"].astype(int).tolist())
        weights.append(model_weights[name])

    # No model produced a box: skip the fusion call altogether.
    if sum(len(b) for b in boxes_list) == 0:
        return np.zeros((0, 6)), (time.perf_counter() - start_time) * 1000

    fused_boxes, fused_scores, fused_labels = weighted_boxes_fusion(
        boxes_list, scores_list, labels_list,
        weights=weights, iou_thr=iou_thr, skip_box_thr=skip_box_thr
    )

    # column_stack turns the 1-D score/label vectors into single columns.
    dets = np.column_stack([
        np.array(fused_boxes),
        np.array(fused_scores),
        np.array(fused_labels),
    ])
    inference_time = (time.perf_counter() - start_time) * 1000  # ms

    return dets, inference_time

# Métricas (AP@0.5 y AP@[0.5:0.95], precisión y recall global)
def _rank_image_predictions(preds, gts):
    """Sort one image's predictions by score and find each one's best-overlapping GT.

    Returns ``(scores, best_gt, best_iou)``, all in descending-score order.
    ``best_gt`` is -1 (and ``best_iou`` 0) when the image has no ground truth.
    """
    order = np.argsort(-preds[:, 4])
    ranked = preds[order]
    n_preds = ranked.shape[0]
    if gts.shape[0] == 0:
        return ranked[:, 4], np.full(n_preds, -1, dtype=int), np.zeros(n_preds, dtype=float)
    ious = iou_matrix(ranked[:, :4], gts)
    best_gt = np.argmax(ious, axis=1)
    best_iou = ious[np.arange(n_preds), best_gt]
    return ranked[:, 4], best_gt, best_iou


def _greedy_tp_flags(best_gt, best_iou, n_gts, iou_thr):
    """Label score-ordered predictions of one image as TP (True) or FP (False)."""
    flags = np.zeros(len(best_gt), dtype=bool)
    claimed = np.zeros(n_gts, dtype=bool)
    for k, (gt_idx, overlap) in enumerate(zip(best_gt, best_iou)):
        if gt_idx >= 0 and overlap >= iou_thr and not claimed[gt_idx]:
            claimed[gt_idx] = True
            flags[k] = True
    return flags


def _average_precision(tp_flags, total_gts):
    """101-point interpolated AP from TP/FP flags ordered by descending score."""
    cum_tp = np.cumsum(tp_flags)
    precisions = cum_tp / np.arange(1, cum_tp.size + 1)
    recalls = cum_tp / total_gts
    # Precision envelope: each entry becomes the max of itself and all later ones.
    envelope = np.maximum.accumulate(precisions[::-1])[::-1]
    # recalls is non-decreasing and envelope non-increasing, so the best
    # precision with recall >= r sits at the first index whose recall reaches r.
    recall_grid = np.linspace(0, 1, 101)
    first_reaching = np.searchsorted(recalls, recall_grid, side="left")
    # Grid points no prediction reaches index the appended 0.0.
    sampled = np.append(envelope, 0.0)[first_reaching]
    return float(sampled.mean())


def compute_metrics(preds_all, gts_all, iou_thresholds=None):
    """Single-class detection metrics over a set of images.

    Matching rule: within an image, predictions are visited in descending
    score order and each one is compared only with its single highest-IoU
    ground-truth box. It is a true positive if that IoU reaches the threshold
    and the box has not been claimed yet; otherwise it is a false positive,
    even if it also overlaps another unclaimed box. The label column of the
    predictions is not used: everything is treated as one class.

    Args:
        preds_all: One array per image of shape (N, 6) or wider, whose first
            five columns are ``x1, y1, x2, y2, score``.
        gts_all: One array per image of shape (M, 4) with ``x1, y1, x2, y2``,
            in the same units as the predictions.
        iou_thresholds: IoU thresholds to average AP over. Defaults to the ten
            values 0.50, 0.55, ..., 0.95. The first entry is the threshold
            used for ``mp``, ``mr`` and ``map50``.

    Returns:
        Dict with:
            ``"mp"``: precision over all predictions at the first threshold.
            ``"mr"``: recall over all ground truths at the first threshold.
            ``"map50"``: AP at the first threshold.
            ``"map"``: mean AP over all thresholds.
        AP is 0 when there is no ground truth or no prediction.

    Raises:
        ValueError: If ``preds_all`` and ``gts_all`` differ in length or
            ``iou_thresholds`` is empty.
    """
    if iou_thresholds is None:
        # linspace fixes the number of thresholds; arange with a float step
        # depends on rounding to decide whether the last value is included.
        iou_thresholds = np.linspace(0.5, 0.95, 10)
    iou_thresholds = np.asarray(iou_thresholds, dtype=float).ravel()
    if iou_thresholds.size == 0:
        raise ValueError("iou_thresholds must contain at least one threshold")

    preds_all = list(preds_all)
    gts_all = list(gts_all)
    if len(preds_all) != len(gts_all):
        raise ValueError(
            f"preds_all and gts_all must have the same length "
            f"({len(preds_all)} != {len(gts_all)})"
        )

    # IoUs do not depend on the threshold, so rank and overlap every image
    # once and reuse the result for all thresholds.
    total_gts = 0
    per_image = []
    for preds, gts in zip(preds_all, gts_all):
        total_gts += gts.shape[0]
        if preds.shape[0] == 0:
            continue
        scores, best_gt, best_iou = _rank_image_predictions(preds, gts)
        per_image.append((scores, best_gt, best_iou, gts.shape[0]))

    if per_image:
        all_scores = np.concatenate([entry[0] for entry in per_image])
        # Stable sort keeps image order among equal scores.
        global_order = np.argsort(-all_scores, kind="stable")
    else:
        global_order = np.zeros(0, dtype=int)

    ap_per_threshold = np.zeros(iou_thresholds.size)
    tp_main = 0
    fp_main = 0
    for t_idx, thr in enumerate(iou_thresholds):
        if per_image:
            flags = np.concatenate([
                _greedy_tp_flags(best_gt, best_iou, n_gts, thr)
                for _, best_gt, best_iou, n_gts in per_image
            ])
        else:
            flags = np.zeros(0, dtype=bool)

        if t_idx == 0:
            # Global precision/recall reuse the labels of the first threshold.
            tp_main = int(flags.sum())
            fp_main = int(flags.size) - tp_main

        if total_gts > 0 and flags.size > 0:
            ap_per_threshold[t_idx] = _average_precision(flags[global_order], total_gts)

    mp = tp_main / (tp_main + fp_main) if (tp_main + fp_main) > 0 else 0.0
    mr = tp_main / total_gts if total_gts > 0 else 0.0

    return {
        "mp": mp,
        "mr": mr,
        "map50": float(ap_per_threshold[0]),
        "map": float(ap_per_threshold.mean())
    }


def get_split_image_label_dirs(split_name):
    """Return the ``(images, labels)`` directories of a dataset split.

    Both paths are relative: ``../03.Datasets/YOLO_Datasets/<split_name>/images``
    and ``.../labels``. Nothing is checked on disk.
    """
    split_root = Path("..", "03.Datasets", "YOLO_Datasets", split_name)
    return split_root / "images", split_root / "labels"


def evaluate_ensemble(split, method="wbf", imgsz=config.IMGSZ, iou_thr=0.50, skip_box_thr=0.3):
    """Evaluate the WBF ensemble on every image of a dataset split.

    For each ``.jpg``/``.jpeg``/``.png`` file in the split's image directory,
    the label file with the same stem is loaded as ground truth and
    ``ensemble_wbf`` is run on the image. The collected predictions and ground
    truths are then scored with ``compute_metrics``.

    Args:
        split: Split name, resolved through ``get_split_image_label_dirs``.
        method: Accepted for call compatibility; it is not inspected and
            ``ensemble_wbf`` is always used.
        imgsz: Inference size forwarded to ``ensemble_wbf``.
        iou_thr: ``iou_thr`` forwarded to ``ensemble_wbf``.
        skip_box_thr: ``skip_box_thr`` forwarded to ``ensemble_wbf``.

    Returns:
        The dict returned by ``compute_metrics`` plus ``"inference_time"``, the
        mean per-image time reported by ``ensemble_wbf`` (NaN when the split
        contains no images).
    """
    img_dir, label_dir = get_split_image_label_dirs(split)
    # iterdir() yields entries in arbitrary order; sorting makes runs reproducible.
    image_paths = sorted(
        p for p in img_dir.iterdir() if p.suffix.lower() in {".jpg", ".png", ".jpeg"}
    )

    preds_all = []
    gts_all = []
    inference_times = []

    for img_path in image_paths:
        # Ground truth: rows of cls, cx, cy, w, h converted to corner boxes.
        gt_label = label_dir / (img_path.stem + ".txt")
        gts_raw = load_yolo_labels(str(gt_label))
        gts_xyxy = yolo_to_xyxy_norm(gts_raw) if gts_raw.size else np.zeros((0, 4))
        gts_all.append(gts_xyxy)

        dets, inf_time = ensemble_wbf(
            str(img_path), imgsz=imgsz, iou_thr=iou_thr, skip_box_thr=skip_box_thr
        )
        inference_times.append(inf_time)
        # dets: (N, 6) = corner box, score, label.
        preds_all.append(dets if dets.size else np.zeros((0, 6)))

    metrics = compute_metrics(preds_all, gts_all)
    # np.mean([]) would emit a RuntimeWarning; return NaN explicitly instead.
    metrics["inference_time"] = (
        float(np.mean(inference_times)) if inference_times else float("nan")
    )
    return metrics

In [18]:
# Evaluate the ensemble on each split and collect one table row per split.
test_splits = ["test_original","test","test2","test3"]

# Table column -> key in the dict returned by evaluate_ensemble.
metric_columns = {
    "Precisión": "mp",
    "Recall": "mr",
    "mAP@0.5": "map50",
    "mAP@0.5:0.95": "map",
    "Inferencia (ms)": "inference_time",
}

ensemble_results = []
for split in test_splits:
    m2 = evaluate_ensemble(split, method="wbf")
    row = {"Modelo": "ensemble_wbf", "Test": split}
    row.update({column: m2[key] for column, key in metric_columns.items()})
    ensemble_results.append(row)

df_ensemble = pd.DataFrame(ensemble_results)
display(df_ensemble)

Unnamed: 0,Modelo,Test,Precisión,Recall,mAP@0.5,mAP@0.5:0.95,Inferencia (ms)
0,ensemble_wbf,test_original,0.780227,0.917518,0.882009,0.716803,112.061919
1,ensemble_wbf,test,0.838343,0.88881,0.859278,0.684976,105.827329
2,ensemble_wbf,test2,0.895425,0.951389,0.944046,0.486713,75.143909
3,ensemble_wbf,test3,0.832524,0.906608,0.877287,0.70219,93.94653
