In [None]:
import torch
import numpy as np
import cv2
from ultralytics import YOLO
from collections import defaultdict
from copy import deepcopy
from math import ceil
from scipy.spatial.distance import cdist
from scipy.optimize import linear_sum_assignment

In [None]:
def enable_dropout_mc(model):
    """Switch dropout layers to train mode and batch-norm layers to eval mode.

    This is the usual Monte-Carlo-dropout setup: dropout keeps sampling random
    masks at inference time while batch-norm keeps using its stored running
    statistics. Modules of any other type are left in whatever mode they
    already are.

    Args:
        model: A ``torch.nn.Module``; all of its sub-modules are visited via
            ``model.modules()``.

    Returns:
        None. The modules are modified in place.
    """
    # Resolve the dropout classes by name so that variants missing from an
    # older torch build (e.g. Dropout1d) are skipped instead of raising.
    dropout_names = (
        "Dropout", "Dropout1d", "Dropout2d", "Dropout3d",
        "AlphaDropout", "FeatureAlphaDropout",
    )
    dropout_types = tuple(
        layer_cls
        for layer_cls in (getattr(torch.nn, name, None) for name in dropout_names)
        if layer_cls is not None
    )
    batchnorm_types = (
        torch.nn.BatchNorm1d,
        torch.nn.BatchNorm2d,
        torch.nn.BatchNorm3d,
        torch.nn.SyncBatchNorm,
    )

    for m in model.modules():
        if isinstance(m, dropout_types):
            m.train()
        elif isinstance(m, batchnorm_types):
            m.eval()

In [None]:
def yolov5_predict_raw(model, img_tensor):
    """Run one prediction and return the first result's detections as plain lists.

    Args:
        model: Object exposing ``predict(source, verbose=...)`` that returns a
            sequence of results, each with a ``boxes`` attribute providing
            ``xyxy``, ``conf`` and ``cls`` tensors.
        img_tensor: Input forwarded unchanged to ``model.predict``.

    Returns:
        A list of ``[x1, y1, x2, y2, confidence, class_id]`` entries (five
        floats and one int). The list is empty when the model returns no
        results, when the first result has no ``boxes`` (missing or ``None``),
        or when it holds zero boxes.
    """
    results = model.predict(img_tensor, verbose=False)
    if not results:
        return []

    # `boxes` may be absent or None; treat both as "no detections" instead of
    # failing inside len().
    boxes = getattr(results[0], "boxes", None)
    if boxes is None or len(boxes) == 0:
        return []

    xyxy = boxes.xyxy.cpu().numpy()
    conf = boxes.conf.cpu().numpy()
    cls = boxes.cls.cpu().numpy().astype(int)
    return [
        [float(x1), float(y1), float(x2), float(y2), float(score), int(label)]
        for (x1, y1, x2, y2), score, label in zip(xyxy, conf, cls)
    ]

In [None]:
def iou(boxA, boxB):
    """Intersection-over-union of two boxes given as (x1, y1, x2, y2).

    Only the first four entries of each box are read. Negative widths and
    heights are clamped to zero, and a small epsilon in the denominator keeps
    the result finite (0.0) when both boxes are degenerate.
    """
    ax1, ay1, ax2, ay2 = boxA[0], boxA[1], boxA[2], boxA[3]
    bx1, by1, bx2, by2 = boxB[0], boxB[1], boxB[2], boxB[3]

    # Overlap rectangle: innermost edges on each axis, clamped at zero.
    overlap_w = max(0, min(ax2, bx2) - max(ax1, bx1))
    overlap_h = max(0, min(ay2, by2) - max(ay1, by1))
    overlap = overlap_w * overlap_h

    area_a = max(0, (ax2 - ax1)) * max(0, (ay2 - ay1))
    area_b = max(0, (bx2 - bx1)) * max(0, (by2 - by1))

    union = area_a + area_b - overlap + 1e-8
    return overlap / union

In [None]:
def _pairwise_iou(boxes_a, boxes_b):
    """Return the (len(boxes_a), len(boxes_b)) float32 IoU matrix of xyxy boxes."""
    a = np.asarray(boxes_a, dtype=np.float64).reshape(-1, 4)
    b = np.asarray(boxes_b, dtype=np.float64).reshape(-1, 4)

    inter_w = np.clip(
        np.minimum(a[:, None, 2], b[None, :, 2]) - np.maximum(a[:, None, 0], b[None, :, 0]),
        0, None,
    )
    inter_h = np.clip(
        np.minimum(a[:, None, 3], b[None, :, 3]) - np.maximum(a[:, None, 1], b[None, :, 1]),
        0, None,
    )
    inter = inter_w * inter_h

    area_a = np.clip(a[:, 2] - a[:, 0], 0, None) * np.clip(a[:, 3] - a[:, 1], 0, None)
    area_b = np.clip(b[:, 2] - b[:, 0], 0, None) * np.clip(b[:, 3] - b[:, 1], 0, None)

    # Same epsilon-guarded formula as the scalar iou() helper.
    union = area_a[:, None] + area_b[None, :] - inter + 1e-8
    return (inter / union).astype(np.float32)


def cluster_boxes_by_hungarian(all_boxes, iou_thresh=0.5, n_samples=10):
    """Group detections from several runs into per-object clusters.

    Boxes are clustered separately per class. Runs are processed in ascending
    run-id order. Each run's boxes are assigned one-to-one (Hungarian
    algorithm on negated IoU) to the existing clusters of that class, and an
    assignment is kept only if its IoU is at least ``iou_thresh``. A box that
    finds no cluster starts a new one, so an object missed in the first run
    can still be grouped across the later runs. A cluster never receives two
    boxes from the same run.

    Args:
        all_boxes: Sequence of ``[x1, y1, x2, y2, conf, class_id, run_id]``.
        iou_thresh: Minimum IoU between a box and a cluster's first box for
            the box to join that cluster.
        n_samples: Unused; kept so existing callers keep working.

    Returns:
        A list of clusters, each a list of indices into ``all_boxes``.
        Clusters of one class are contiguous, in order of creation, and each
        cluster's first index is the box it is matched against.
    """
    by_class = defaultdict(list)
    for idx, box in enumerate(all_boxes):
        by_class[box[5]].append((idx, box))

    clusters = []
    for _cls, members in by_class.items():
        by_run = defaultdict(list)
        for idx, box in members:
            by_run[box[6]].append((idx, box))

        seed_boxes = []      # xyxy of the first box of each cluster
        class_clusters = []  # index lists, parallel to seed_boxes
        for run_id in sorted(by_run):
            run_boxes = by_run[run_id]
            unmatched = set(range(len(run_boxes)))

            if seed_boxes:
                iou_matrix = _pairwise_iou(seed_boxes, [box[:4] for _, box in run_boxes])
                # linear_sum_assignment minimizes cost, so negate to maximize IoU.
                row_ind, col_ind = linear_sum_assignment(-iou_matrix)
                for r, c in zip(row_ind, col_ind):
                    r, c = int(r), int(c)
                    if iou_matrix[r, c] >= iou_thresh:
                        class_clusters[r].append(run_boxes[c][0])
                        unmatched.discard(c)

            # Seeds are added only after matching, so two boxes of the same
            # run can never end up in the same cluster.
            for c in sorted(unmatched):
                idx, box = run_boxes[c]
                class_clusters.append([idx])
                seed_boxes.append(box[:4])

        clusters.extend(class_clusters)
    return clusters

In [None]:
def aggregate_cluster(all_boxes, cluster_indices):
    """Summarise one cluster of detections.

    Args:
        all_boxes: Sequence of ``[x1, y1, x2, y2, conf, class_id, run_id]``.
        cluster_indices: Indices into ``all_boxes`` forming the cluster.

    Returns:
        Dict with the per-coordinate mean and standard deviation of the boxes
        (``mean_box``, ``std_box``), the mean and standard deviation of the
        confidences (``mean_conf``, ``std_conf``), the member ``count``, the
        most frequent ``class`` and the list of member ``runs``. Statistics
        are computed in float32.
    """
    members = [all_boxes[i] for i in cluster_indices]

    xyxy = np.array([m[:4] for m in members], dtype=np.float32)
    scores = np.array([m[4] for m in members], dtype=np.float32)
    labels = [m[5] for m in members]
    run_ids = [m[6] for m in members]

    summary = {}
    summary["mean_box"] = xyxy.mean(axis=0).tolist()
    summary["std_box"] = xyxy.std(axis=0).tolist()
    summary["mean_conf"] = float(scores.mean())
    summary["std_conf"] = float(scores.std())
    summary["count"] = len(cluster_indices)
    # Majority vote over the member class ids.
    summary["class"] = int(max(set(labels), key=labels.count))
    summary["runs"] = run_ids
    return summary

In [None]:
def mc_dropout_detections(model, image_path, n_samples=10, iou_thresh=0.5, device='cpu', conf=0.1):
    """Run repeated predictions on one image and summarise their spread.

    The image is predicted ``n_samples`` times. Every detection is tagged with
    the index of the run that produced it, the detections are clustered across
    runs, and each cluster is aggregated and annotated with:

    * ``detection_frequency``: fraction of runs that contributed to the cluster.
    * ``uncertainty_score``: ``0.7 * tanh(std_conf) + 0.3 * box_std_norm +
      0.5 * (1 - detection_frequency)``, where ``box_std_norm`` is the mean
      box-coordinate standard deviation divided by the image diagonal. The
      score is not capped at 1.

    Args:
        model: Object with a ``model`` attribute (the torch module handed to
            ``enable_dropout_mc``) and a ``predict(source=..., conf=...,
            verbose=...)`` method.
        image_path: Path of the image to analyse.
        n_samples: Number of prediction runs.
        iou_thresh: IoU threshold used when clustering boxes across runs.
        device: Currently unused; accepted so existing callers keep working.
        conf: Confidence threshold forwarded to ``model.predict``.

    Returns:
        A tuple ``(aggregated, all_boxes)``. ``aggregated`` is a list of
        cluster summary dicts and ``all_boxes`` the raw
        ``[x1, y1, x2, y2, conf, class_id, run]`` detections. Both are empty
        lists when nothing was detected, so callers can always unpack two
        values.

    Raises:
        FileNotFoundError: If the image cannot be read.
    """
    img_bgr = cv2.imread(image_path)
    if img_bgr is None:
        raise FileNotFoundError(image_path)

    # NOTE(review): predict() may put the wrapped module back into eval mode
    # when it sets up its predictor, which would undo this call. Confirm that
    # the dropout layers are still in train mode during the runs below.
    enable_dropout_mc(model.model)

    all_boxes = []
    for run in range(n_samples):
        results = model.predict(source=image_path, conf=conf, verbose=False)
        r = results[0]
        if hasattr(r, "boxes") and len(r.boxes) > 0:
            boxes = r.boxes.xyxy.cpu().numpy()
            confs = r.boxes.conf.cpu().numpy()
            clss = r.boxes.cls.cpu().numpy().astype(int)
            for b, c, cl in zip(boxes, confs, clss):
                x1, y1, x2, y2 = map(float, b)
                all_boxes.append([x1, y1, x2, y2, float(c), int(cl), run])

    if len(all_boxes) == 0:
        # Same shape as the normal return value.
        return [], []

    clusters = cluster_boxes_by_hungarian(all_boxes, iou_thresh=iou_thresh, n_samples=n_samples)
    aggregated = [aggregate_cluster(all_boxes, cl) for cl in clusters]

    # The image diagonal is the same for every cluster, so compute it once.
    img_h, img_w = img_bgr.shape[:2]
    diag = np.sqrt(img_h**2 + img_w**2)
    total_runs = n_samples

    for ag in aggregated:
        unique_runs = len(set(ag["runs"]))
        ag["detection_frequency"] = unique_runs / total_runs
        box_std_norm = np.mean(ag["std_box"]) / (diag + 1e-8)
        ag["uncertainty_score"] = float(
            np.tanh(ag["std_conf"]) * 0.7
            + box_std_norm * 0.3
            + (1.0 - ag["detection_frequency"]) * 0.5
        )
    return aggregated, all_boxes

def draw_aggregated_boxes(img_path, aggregated, conf_threshold=0.0):
    """Draw the mean box and a text label of every aggregated cluster.

    Args:
        img_path: Path of the image to draw on.
        aggregated: Iterable of cluster dicts with the keys ``mean_box``,
            ``class``, ``uncertainty_score`` and ``mean_conf``.
        conf_threshold: Clusters whose ``mean_conf`` is below this value are
            not drawn. The default of 0.0 draws every cluster with a
            non-negative mean confidence.

    Returns:
        A copy of the image (BGR, as loaded by ``cv2.imread``) with the boxes
        and labels drawn on it.

    Raises:
        FileNotFoundError: If the image cannot be read.
    """
    img = cv2.imread(img_path)
    if img is None:
        raise FileNotFoundError(img_path)

    out = img.copy()
    for a in aggregated:
        mconf = a["mean_conf"]
        if mconf < conf_threshold:
            continue

        x1, y1, x2, y2 = map(int, a["mean_box"])
        cls = a["class"]
        uscore = a["uncertainty_score"]

        # The score can exceed 1, so clamp it once and derive both channels
        # from the clamped value; otherwise the green channel goes negative.
        level = min(1.0, max(0.0, uscore))
        # BGR: blue grows and green shrinks as the uncertainty increases.
        col = (int(255 * level), int(255 * (1.0 - level)), 0)

        cv2.rectangle(out, (x1, y1), (x2, y2), col, 2)
        label = f"cls:{cls} conf:{mconf:.2f} unc:{uscore:.2f}"
        # Keep the label baseline inside the image when the box touches the top.
        cv2.putText(out, label, (x1, max(0, y1 - 6)), cv2.FONT_HERSHEY_SIMPLEX, 0.45, col, 1, cv2.LINE_AA)
    return out

In [None]:
from matplotlib import pyplot as plt

# Placeholders: point these at the trained weights and the image to analyse.
model = YOLO("PATH_TO_BEST_PT")
image_path = "PATH_TO_IMAGE"

# Accept either return shape (a bare list or an (aggregated, all_boxes) tuple)
# so that an image without detections does not crash the unpacking.
mc_result = mc_dropout_detections(model, image_path, n_samples=20, iou_thresh=0.5)
aggregated, all_boxes = mc_result if isinstance(mc_result, tuple) else (mc_result, [])

print("Found clusters:", len(aggregated))
for a in aggregated:
    print(a)

vis = draw_aggregated_boxes(image_path, aggregated)
plt.figure(figsize=(8,8))
# OpenCV images are BGR; matplotlib expects RGB.
plt.imshow(cv2.cvtColor(vis, cv2.COLOR_BGR2RGB))
plt.axis("off")
plt.show()

In [None]:
import pandas as pd

# If `aggregated` still holds a tuple, keep only its first element.
if isinstance(aggregated, tuple):
    aggregated = aggregated[0]

if not aggregated:
    print('No detections found.')
else:
    # One row per cluster, restricted to the fields worth tabulating.
    df = pd.DataFrame([
        {
            column: entry[column]
            for column in (
                'class', 'mean_conf', 'std_conf', 'detection_frequency',
                'uncertainty_score', 'count', 'mean_box', 'std_box',
            )
        }
        for entry in aggregated
    ])
    display(df)