In [2]:
import os
os.environ['CUDA_VISIBLE_DEVICES'] = '1'
import glob
import cv2
import numpy as np
from tqdm import tqdm
import time

from albumentations.augmentations import transforms
from albumentations.core.composition import Compose, OneOf
from albumentations import RandomRotate90,Resize

import torch
import torch.nn.functional as F

from ultralytics import YOLO

  check_for_updates()


In [3]:
# Root directories of the validation images and their YOLO-format label files
image_root = "/local_data/dataset/polyp/detection/patients_complete/images/val/"
label_root = "/local_data/dataset/polyp/detection/patients_complete/labels/val/"

# ---------------------- config ----------------------
# Run on the GPU when one is visible to torch, otherwise fall back to the CPU
device = "cuda" if torch.cuda.is_available() else "cpu"

NUM_CLASSES_DET = 2     # hyperplastic, adenoma
BG_INDEX_SEG = 2        # background channel index for segmentation
IOU_THRESH_EVAL = 0.5   # mAP50
CONF_THRESH_DET = 0.001 # required detection confidence threshold

In [4]:
# ---------------------- collect validation images ----------------------
# Gather every file under image_root matching one of the patterns, in sorted order.
img_exts = ("*.jpg", "*.jpeg", "*.png", "*.bmp", "*.tif")
img_paths = sorted(
    path
    for pattern in img_exts
    for path in glob.glob(os.path.join(image_root, pattern))
)

print(f"#images in val: {len(img_paths)}")

#images in val: 704


In [5]:
# ---------------------- helpers ----------------------
def load_yolo_gt(label_path, img_w, img_h, num_classes=NUM_CLASSES_DET):
    """
    Read a YOLO txt label file and group its boxes by class.

    Each usable line has exactly five whitespace-separated fields
    ``cls xc yc w h``. The centre/size values are multiplied by ``img_w`` /
    ``img_h`` and converted to corner format.

    Parameters
    ----------
    label_path : str
        Path to the label file. A missing file is treated as "no annotations".
    img_w, img_h : int or float
        Image width / height used to scale the coordinates.
    num_classes : int
        Number of classes; lines whose class id is outside
        ``[0, num_classes)`` are ignored.

    Returns
    -------
    dict
        ``{cls: [[x1, y1, x2, y2], ...]}`` with one (possibly empty) list for
        every class id in ``range(num_classes)``.

    Raises
    ------
    ValueError
        If a five-field line contains a non-numeric value.
    """
    gts_per_cls = {c: [] for c in range(num_classes)}

    if not os.path.exists(label_path):
        return gts_per_cls

    with open(label_path, "r") as f:
        for raw_line in f:
            parts = raw_line.split()
            # Blank lines and rows that are not plain 5-field boxes are skipped.
            if len(parts) != 5:
                continue

            cls = int(float(parts[0]))
            # Reject negative ids too: they are not keys of gts_per_cls and
            # would otherwise raise KeyError below.
            if not 0 <= cls < num_classes:
                continue

            xc, yc, w, h = map(float, parts[1:])
            xc *= img_w
            yc *= img_h
            w *= img_w
            h *= img_h
            gts_per_cls[cls].append([xc - w / 2, yc - h / 2, xc + w / 2, yc + h / 2])

    return gts_per_cls
    
def box_iou_np(box1, box2):
    """
    Pairwise IoU between two sets of boxes in xyxy format.

    box1: (N, 4) array, box2: (M, 4) array.
    Returns an (N, M) array; all zeros (of matching shape) when either set is empty.
    """
    if 0 in (box1.size, box2.size):
        return np.zeros((box1.shape[0], box2.shape[0]))

    first = box1.astype(np.float32)
    second = box2.astype(np.float32)

    def _areas(boxes):
        # Width/height are clamped at zero so inverted boxes get zero area.
        widths = np.clip(boxes[:, 2] - boxes[:, 0], 0, None)
        heights = np.clip(boxes[:, 3] - boxes[:, 1], 0, None)
        return widths * heights

    # Broadcast to (N, M, 2): top-left and bottom-right corners of each overlap.
    top_left = np.maximum(first[:, None, :2], second[None, :, :2])
    bottom_right = np.minimum(first[:, None, 2:4], second[None, :, 2:4])
    overlap_wh = np.clip(bottom_right - top_left, 0, None)
    inter = overlap_wh[..., 0] * overlap_wh[..., 1]

    # The small epsilon keeps the division defined for zero-area pairs.
    union = _areas(first)[:, None] + _areas(second)[None, :] - inter + 1e-16
    return inter / union

def compute_confusion_matrix(predictions,
                             gt_boxes_per_image,
                             num_classes=2,
                             conf_th=0.25,
                             iou_th=0.5):
    """
    Build a detection confusion matrix of shape (num_classes+1, num_classes+1).

    rows:    predicted class (last row    = predicted background, i.e. missed GT)
    columns: ground-truth class (last col = GT background, i.e. false positive)

    Predictions with ``score < conf_th`` are discarded. Within each image,
    predictions and GT boxes are paired greedily one-to-one by descending IoU
    (regardless of class) until the best remaining IoU drops below ``iou_th``.

    Parameters
    ----------
    predictions : list of dict
        Each item has keys "image_id", "cls", "score" and "box" ([x1,y1,x2,y2]).
    gt_boxes_per_image : dict
        img_id -> {cls: [[x1,y1,x2,y2], ...]}. A missing class key is treated
        as "no boxes of that class".
    num_classes : int
    conf_th : float
    iou_th : float

    Returns
    -------
    np.ndarray of int64

    Notes
    -----
    Predictions whose image_id is absent from ``gt_boxes_per_image`` are
    counted as false positives, consistent with compute_ap_class_agnostic.
    """
    bg = num_classes
    cm = np.zeros((num_classes + 1, num_classes + 1), dtype=np.int64)

    # Group the confident predictions by image.
    preds_by_img = {}
    for p in predictions:
        if p["score"] < conf_th:
            continue
        preds_by_img.setdefault(p["image_id"], []).append(p)

    # Visit every image that has GT, plus images that only have predictions.
    img_ids = list(gt_boxes_per_image)
    img_ids += [i for i in preds_by_img if i not in gt_boxes_per_image]

    for img_id in img_ids:
        gt_dict = gt_boxes_per_image.get(img_id, {})

        # Flatten the per-class GT lists into parallel box / class arrays.
        gt_boxes = []
        gt_cls = []
        for c in range(num_classes):
            class_boxes = gt_dict.get(c) if isinstance(gt_dict, dict) else gt_dict[c]
            for b in class_boxes or ():
                gt_boxes.append(b)
                gt_cls.append(c)
        gt_boxes = np.array(gt_boxes, dtype=np.float32).reshape(-1, 4)
        gt_cls = np.array(gt_cls, dtype=np.int64)

        preds = preds_by_img.get(img_id, [])
        pred_boxes = np.array([p["box"] for p in preds], dtype=np.float32).reshape(-1, 4)
        pred_cls = np.array([p["cls"] for p in preds], dtype=np.int64)

        N, M = pred_boxes.shape[0], gt_boxes.shape[0]
        matched_pred = np.zeros(N, dtype=bool)
        matched_gt = np.zeros(M, dtype=bool)

        if N > 0 and M > 0:
            ious = box_iou_np(pred_boxes, gt_boxes)  # (N, M)

            # Greedy 1-1 matching: at most min(N, M) pairs can ever be formed,
            # which also bounds the loop for any iou_th.
            for _ in range(min(N, M)):
                pi, gj = np.unravel_index(np.argmax(ious), ious.shape)
                if ious[pi, gj] < iou_th:
                    break
                matched_pred[pi] = True
                matched_gt[gj] = True
                cm[int(pred_cls[pi]), int(gt_cls[gj])] += 1

                # Retire the matched prediction row and GT column.
                ious[pi, :] = -1.0
                ious[:, gj] = -1.0

        # Unmatched predictions -> predicted some class, GT background (FP).
        for pc in pred_cls[~matched_pred]:
            cm[int(pc), bg] += 1

        # Unmatched GT -> predicted background, GT some class (FN).
        for gc in gt_cls[~matched_gt]:
            cm[bg, int(gc)] += 1

    return cm

# -------------------  Detection Metrics (AP) ------------------------------
def prepare_gt_class_agnostic(gt_boxes_per_image):
    """
    Merge per-class GT boxes into one (K, 4) float32 array per image.

    Each value of ``gt_boxes_per_image`` is either a ``{cls: [box, ...]}`` dict
    (``None`` lists are skipped) or already a flat list of boxes. Images
    without boxes map to an empty (0, 4) array.
    """
    merged = {}
    for img_id, entry in gt_boxes_per_image.items():
        if isinstance(entry, dict):
            # Class labels are dropped; only the boxes are kept.
            flat = [
                b
                for per_class in entry.values()
                if per_class is not None
                for b in per_class
            ]
        else:
            flat = entry

        merged[img_id] = (
            np.asarray(flat, dtype=np.float32).reshape(-1, 4)
            if len(flat) > 0
            else np.zeros((0, 4), dtype=np.float32)
        )
    return merged


# -------------------------------------------------
# IoU between a single box and multiple boxes
# box: shape (4,), boxes: shape (N, 4)
# -------------------------------------------------
def box_iou(box, boxes):
    """
    IoU of one xyxy box against each of N xyxy boxes.

    Returns an array of shape (N,); an empty float32 array when ``boxes`` is
    empty. Pairs whose union is not positive get an IoU of 0.
    """
    if boxes.size == 0:
        return np.zeros((0,), dtype=np.float32)

    left, top, right, bottom = box[0], box[1], box[2], box[3]

    # Overlap extent along each axis, clamped at zero for disjoint boxes.
    overlap_w = np.clip(
        np.minimum(right, boxes[:, 2]) - np.maximum(left, boxes[:, 0]),
        a_min=0, a_max=None,
    )
    overlap_h = np.clip(
        np.minimum(bottom, boxes[:, 3]) - np.maximum(top, boxes[:, 1]),
        a_min=0, a_max=None,
    )
    inter = overlap_w * overlap_h

    own_area = (right - left) * (bottom - top)
    other_areas = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
    union = own_area + other_areas - inter

    return np.where(union > 0, inter / union, 0.0)


# -------------------------------------------------
# Compute the class-agnostic AP@iou_thr
# all_predictions: list of dict
#   {"image_id": str, "cls": int, "score": float, "box": [x1,y1,x2,y2]}
# gt_boxes_per_image: img_id -> {cls: [[x1,y1,x2,y2], ...]}
# -------------------------------------------------
def compute_ap_class_agnostic(all_predictions, gt_boxes_per_image, iou_thr=0.5):
    """
    Class-agnostic Average Precision at a single IoU threshold.

    ``pred["cls"]`` is ignored; only ``image_id``, ``box`` and ``score`` are
    used. Predictions are visited from highest to lowest score, and each one is
    a true positive only if its highest-IoU GT box in the same image reaches
    ``iou_thr`` and has not been claimed by an earlier prediction.

    Returns
    -------
    ap : float
        Mean of the interpolated precision over the 101 recall points
        (NaN when there is no GT box at all).
    recalls : np.ndarray, shape (N,)
        Recall after each prediction is added, in descending score order.
    precisions : np.ndarray, shape (N,)
        Precision after each prediction is added, in descending score order.
    rec_points : np.ndarray, shape (101,)
        Fixed COCO-style recall samples: 0.00, 0.01, ..., 1.00.
    prec_interp : np.ndarray, shape (101,)
        Interpolated precision at each entry of ``rec_points``.
    thresholds : np.ndarray, shape (N,)
        Score of the i-th ranked prediction, i.e. point i keeps every
        prediction with score >= thresholds[i].

    When there is no GT box, everything after ``ap`` is returned as None.
    """
    # Collapse the GT to one box array per image, ignoring class labels.
    gt_nocls = prepare_gt_class_agnostic(gt_boxes_per_image)

    # Total number of GT boxes across all images and classes.
    npos = sum(len(b) for b in gt_nocls.values())
    if npos == 0:
        return float("nan"), None, None, None, None, None

    # Rank predictions by descending score.
    preds = sorted(all_predictions, key=lambda x: x["score"], reverse=True)
    thresholds = np.array([p["score"] for p in preds], dtype=np.float32)

    # True where the ranked prediction is a true positive; everything else is a FP.
    is_tp = np.zeros(len(preds), dtype=bool)

    # Each GT box may be claimed by at most one prediction.
    gt_used = {img_id: np.zeros(len(boxes), dtype=bool)
               for img_id, boxes in gt_nocls.items()}

    for rank, pred in enumerate(preds):
        image_key = pred["image_id"]
        pred_box = np.asarray(pred["box"], dtype=np.float32)

        candidates = gt_nocls.get(image_key, None)
        if candidates is None or len(candidates) == 0:
            # No GT in this image: the prediction stays a false positive.
            continue

        overlaps = box_iou(pred_box, candidates)
        best = int(np.argmax(overlaps))
        if float(overlaps[best]) >= iou_thr and not gt_used[image_key][best]:
            is_tp[rank] = True
            gt_used[image_key][best] = True

    # ------- raw PR curve (one point per added prediction) -------
    tp_cum = np.cumsum(is_tp.astype(np.float32))
    fp_cum = np.cumsum((~is_tp).astype(np.float32))

    recalls = tp_cum / npos
    precisions = tp_cum / np.maximum(tp_cum + fp_cum, 1e-8)

    # ------- COCO style: interpolate at 101 recall levels in [0, 1] -------
    rec_points = np.linspace(0.0, 1.0, 101)
    prec_interp = np.zeros_like(rec_points)
    for k, level in enumerate(rec_points):
        # Best precision among the points whose recall reaches this level;
        # stays 0 when the level is never reached.
        reachable = precisions[recalls >= level]
        if reachable.size:
            prec_interp[k] = np.max(reachable)

    ap = float(np.mean(prec_interp))

    return ap, recalls, precisions, rec_points, prec_interp, thresholds

def find_best_f1_threshold(precisions, recalls, thresholds):
    """
    Compute F1 at every point of the raw PR curve and return the best point.

    Parameters
    ----------
    precisions, recalls, thresholds : array-like or None
        Parallel arrays describing the PR curve; ``thresholds[i]`` is the
        score threshold of point i.

    Returns
    -------
    best_thresh : float
        Confidence (score) threshold that maximises F1.
    best_f1 : float
        The maximum F1 value.
    best_p : float
        Precision at that threshold.
    best_r : float
        Recall at that threshold.

    All four values are None when any input is None or the curve is empty.
    """
    if precisions is None or recalls is None or thresholds is None:
        return None, None, None, None

    precisions = np.asarray(precisions)
    recalls = np.asarray(recalls)
    thresholds = np.asarray(thresholds)
    if precisions.size == 0:
        return None, None, None, None

    # F1 = 2PR / (P + R). The division is only evaluated where P + R > 0, so
    # points with P = R = 0 get F1 = 0 without a divide-by-zero warning.
    denom = precisions + recalls
    f1 = np.zeros(denom.shape, dtype=np.result_type(denom.dtype, np.float32))
    np.divide(2 * precisions * recalls, denom, out=f1, where=denom > 0)

    best_idx = int(np.argmax(f1))
    best_thresh = float(thresholds[best_idx])
    best_f1 = float(f1[best_idx])
    best_p = float(precisions[best_idx])
    best_r = float(recalls[best_idx])
    return best_thresh, best_f1, best_p, best_r
    

# -------------------------------------------------
# Actual computation of AP@50, AP@75, AP@50:95
# all_predictions / gt_boxes_per_image are the two variables built by the main evaluation loop
# -------------------------------------------------
# AP@50:95


In [6]:
import os
# NOTE(review): CUDA_VISIBLE_DEVICES was already set to '1' in the first cell, before
# torch was imported. Re-assigning it here may have no effect if CUDA has already been
# initialised in this kernel - confirm which GPU is actually used, and prefer setting
# it once at the top of the notebook.
os.environ['CUDA_VISIBLE_DEVICES']='0'
from ultralytics import YOLO
# Smaller model variants, currently disabled:
# model_dict = {
#     'v8':  ['n', 's', 'm'],
#     'v9':  ['t', 's', 'm'],
#     'v10': ['n', 's', 'm'],
#     'v11': ['n', 's', 'm'],
#     'v12': ['n', 's', 'm'],
#     'v13': ['n', 's'],
# }
# YOLO version -> model sizes to evaluate
model_dict = {
    'v11': ['l', 'x'],
    'v12': ['l', 'x'],
    'v13': ['l', 'x'],
}

# IoU thresholds for AP@50:95 (0.50, 0.55, ..., 0.95). linspace always yields exactly
# ten points, whereas arange with a fractional step can vary in length due to rounding.
iou_sweep = np.linspace(0.5, 0.95, 10)

for k, sizes in model_dict.items():
    for s in sizes:
        # Checkpoint of the detection model for this version/size; variants whose
        # checkpoint file is missing are skipped.
        model_path = f'/nfs/P111yhchen/code/detection/det_branch/{k}/yolo{k}{s}/weights/best.pt'
        if not os.path.exists(model_path):
            continue
        print(f'yolo{k}{s}')
        model_det = YOLO(model_path)

        all_predictions = []  # every predicted bbox across the whole validation set
        gt_boxes_per_image = {}  # img_id -> {cls: [[x1,y1,x2,y2], ...]}

        for img_path in tqdm(img_paths, desc="Evaluating dual-path"):
            img_id = os.path.splitext(os.path.basename(img_path))[0]
            img = cv2.imread(img_path)
            if img is None:
                continue
            h0, w0 = img.shape[:2]

            # ------ GT ------
            # Recorded before prediction so images without detections still count.
            label_path = os.path.join(label_root, img_id + ".txt")
            gt_boxes_per_image[img_id] = load_yolo_gt(label_path, w0, h0)

            # ------ detection (YOLO) ------
            # The confidence threshold is applied by Ultralytics here.
            # NOTE(review): `single_cls` looks like a train/val setting; the printed
            # confusion matrices still contain both classes, so it does not appear to
            # merge classes at predict time. If class-agnostic NMS was intended,
            # `agnostic_nms=True` is the predict-time option - confirm.
            results = model_det.predict(
                img,  # BGR numpy
                single_cls=True,
                conf=CONF_THRESH_DET,
                iou=0.7,     # IoU threshold used by NMS; tune as needed
                verbose=False,
                device=device,
            )

            r = results[0]
            if r.boxes is None or len(r.boxes) == 0:
                continue

            boxes = r.boxes.xyxy.cpu().numpy()
            scores = r.boxes.conf.cpu().numpy()
            clses = r.boxes.cls.cpu().numpy()
            for det_cls, box, score in zip(clses, boxes, scores):
                all_predictions.append({
                    "image_id": img_id,
                    "cls": int(det_cls),
                    "score": float(score),
                    "box": box.tolist(),
                })

        # One entry per IoU threshold of the sweep.
        aps = []
        thrs = []  # score thresholds of the raw PR points
        rs = []    # raw recalls
        ps = []    # raw precisions
        for thr in iou_sweep:
            ap_i, r_i, p_i, _, _, thr_i = compute_ap_class_agnostic(
                all_predictions, gt_boxes_per_image, iou_thr=thr
            )
            aps.append(ap_i)
            thrs.append(thr_i)
            rs.append(r_i)
            ps.append(p_i)
        ap_50_95 = float(np.mean(aps))

        # Best-F1 operating point at IoU=0.5 (index 0 of the sweep). The PR arrays are
        # None when there is no ground truth and empty when there are no predictions.
        if ps[0] is None or len(ps[0]) == 0:
            best_thr_50 = best_f1_50 = best_p_50 = best_r_50 = None
        else:
            best_thr_50, best_f1_50, best_p_50, best_r_50 = find_best_f1_threshold(ps[0], rs[0], thrs[0])

        cm = compute_confusion_matrix(
            all_predictions,
            gt_boxes_per_image,
            num_classes=NUM_CLASSES_DET,
            conf_th=0.25,   # use 0.25 to match Ultralytics
            iou_th=0.5
        )
        print("\nConfusion matrix @50 (rows=pred, cols=gt, last index = background):")
        print(cm)
        for ci in range(NUM_CLASSES_DET):
            # Row sum = everything predicted as ci; column sum = every GT of class ci.
            n_pred = cm[ci].sum()
            n_gt = cm[:, ci].sum()
            pp = cm[ci, ci] / n_pred if n_pred > 0 else float("nan")
            rr = cm[ci, ci] / n_gt if n_gt > 0 else float("nan")
            print(f"[class {ci}] precision: {pp:.4f}  recall: {rr:.4f} ")

        print(f"AP@50:95: {ap_50_95:.4f}")

        if best_thr_50 is None:
            print("[IoU=0.5] best-F1 operating point undefined (no predictions or no ground truth)")
            print(f"[IoU=0.5] AP = {aps[0]:.4f}")
        else:
            print(f"[IoU=0.5] best threshold = {best_thr_50:.4f}")
            print(f"[IoU=0.5] AP = {aps[0]:.4f}")
            print(f"[IoU=0.5] precision = {best_p_50:.4f}, recall = {best_r_50:.4f}")
            print(f"[IoU=0.5] best F1 = {best_f1_50:.4f}")

        

yolov11l


Evaluating dual-path: 100%|██████████| 704/704 [00:51<00:00, 13.77it/s]



Confusion matrix @50 (rows=pred, cols=gt, last index = background):
[[191  88  88]
 [ 81 252  93]
 [ 51  71   0]]
[class 0] precision: 0.5204  recall: 0.5913 
[class 1] precision: 0.5915  recall: 0.6131 
AP@50:95: 0.5651
[IoU=0.5] best threshold = 0.3104
[IoU=0.5] AP = 0.8727
[IoU=0.5] precision = 0.8207, recall = 0.8106
[IoU=0.5] best F1 = 0.8156
yolov11x


Evaluating dual-path: 100%|██████████| 704/704 [00:16<00:00, 42.52it/s]



Confusion matrix @50 (rows=pred, cols=gt, last index = background):
[[168  84  58]
 [ 87 272  71]
 [ 68  55   0]]
[class 0] precision: 0.5419  recall: 0.5201 
[class 1] precision: 0.6326  recall: 0.6618 
AP@50:95: 0.5696
[IoU=0.5] best threshold = 0.4148
[IoU=0.5] AP = 0.8870
[IoU=0.5] precision = 0.8797, recall = 0.7970
[IoU=0.5] best F1 = 0.8363
yolov12l


Evaluating dual-path: 100%|██████████| 704/704 [00:22<00:00, 31.29it/s]



Confusion matrix @50 (rows=pred, cols=gt, last index = background):
[[187  74  84]
 [ 93 276  99]
 [ 43  61   0]]
[class 0] precision: 0.5420  recall: 0.5789 
[class 1] precision: 0.5897  recall: 0.6715 
AP@50:95: 0.5566
[IoU=0.5] best threshold = 0.4495
[IoU=0.5] AP = 0.8741
[IoU=0.5] precision = 0.8782, recall = 0.7956
[IoU=0.5] best F1 = 0.8349
yolov12x


Evaluating dual-path: 100%|██████████| 704/704 [00:22<00:00, 31.09it/s]



Confusion matrix @50 (rows=pred, cols=gt, last index = background):
[[186  77  86]
 [ 86 275 100]
 [ 51  59   0]]
[class 0] precision: 0.5330  recall: 0.5759 
[class 1] precision: 0.5965  recall: 0.6691 
AP@50:95: 0.5685
[IoU=0.5] best threshold = 0.4412
[IoU=0.5] AP = 0.8799
[IoU=0.5] precision = 0.8737, recall = 0.7916
[IoU=0.5] best F1 = 0.8306
yolov13l


Evaluating dual-path:  88%|████████▊ | 622/704 [00:25<00:03, 24.07it/s]


KeyboardInterrupt: 