### SETUP

In [2]:
from abc import ABC, abstractmethod

class BaseDetector(ABC):
    """Common interface for detectors plugged into the Guided Pipeline.

    A Guided Pipeline normally chains two detectors:

      1. A guide (windshield) detector that locates the region of interest
         on the full image.
      2. A sticker detector that searches inside those regions.

    Concrete subclasses are free to pick their own argument signatures for
    training and inference (a YOLO data YAML path, for instance). What they
    may not change is the shape of the values returned by the prediction
    methods, because the pipeline consumes those structures directly.
    """

    @abstractmethod
    def train_model(self, data, **kwargs):
        """Fit the detector on the supplied training data.

        In the Guided Pipeline this is usually called for the guide or
        windshield detector, although a sticker detector can be trained
        through the same entry point.

        Args:
            data: Training input. Its concrete type is decided by the
                implementation; typical choices are:
                - a path to a YOLO data YAML file,
                - a dataset object or data loader,
                - a list of image paths or a dataset root directory.
            **kwargs: Optional training controls understood by the
                implementation, such as:
                - hyperparameters (epochs, batch size, learning rate),
                - device selection,
                - logging and checkpoint options.

        Returns:
            None. An implementation may hand back its own training results,
            but the Guided Pipeline never depends on the return value.
        """

    @abstractmethod
    def predict_windshield(self, data, **kwargs):
        """Detect the guide or windshield region on full images.

        Implementations run inference on whole images and report the guide
        or windshield bounding box(es) for every image in the standard
        format below.

        Args:
            data: Inference input. Its concrete type is decided by the
                implementation; typical choices are:
                - a path to a YOLO data YAML file from which the image
                  directory of a split is derived,
                - a list of image paths,
                - a dataset object or an iterable of images.
            **kwargs: Optional inference controls such as confidence
                threshold, IoU threshold, image size, device, batch size,
                or whether visualizations are saved.

        Returns:
            list[dict]: One dictionary per image, shaped like:

                [
                    {
                        "image_path": "<path/to/image.jpg>",
                        "guides": [
                            {
                                "xyxy": [x1, y1, x2, y2],
                                "conf": float_confidence,
                                "cls": class_id
                            }
                        ]
                    },
                    ...
                ]

            Notes:
                - "image_path" is the path of the image the entry refers to.
                - "guides" holds the predicted guide boxes. It may contain
                  only the best box (as YOLODetector does) or several, but
                  it is always a list.
                - An image without detections gets an empty "guides" list.
        """

    @abstractmethod
    def predict_sticker(self, data, **kwargs):
        """Detect stickers on cropped or full images.

        This is the second stage of the Guided Pipeline. The input is
        typically a set of cropped windshield images or one split of a
        dataset, and every sticker detection per image is returned.

        Args:
            data: Sticker inference input. Its concrete structure is decided
                by the implementation; typical choices are:
                - a path to a YOLO data YAML file of a sticker crop dataset,
                - a list of crop image paths,
                - an iterable of images or crop records.
            **kwargs: Optional inference controls such as confidence
                threshold, IoU threshold, image size, device, batch size,
                or output saving options.

        Returns:
            list[dict]: One dictionary per image or crop, shaped like (this
            is the structure produced by YOLODetector):

                [
                    {
                        "crop_path": "<path/to/crop_or_image.jpg>",
                        "boxes": [
                            {
                                "xyxy": [x1, y1, x2, y2],
                                "conf": float_confidence,
                                "cls": class_id
                            },
                            ...
                        ]
                    },
                    ...
                ]

            Notes:
                - "crop_path" points at the image or crop that was fed to
                  the sticker detector.
                - "boxes" lists every predicted sticker bounding box.
                - An image without detections gets an empty "boxes" list.
        """


### YOLOv8


In [3]:
from ultralytics import YOLO
import torch
import yaml, os, cv2, uuid, shutil
import matplotlib.pyplot as plt
from pathlib import Path
import numpy as np
from shutil import copy2

# from base_detector import BaseDetector


class YOLODetector(BaseDetector):
    """YOLOv8 based implementation of :class:`BaseDetector` for the Guided Pipeline.

    This class wraps an ``ultralytics.YOLO`` model and provides:

      * Training utilities that integrate with a Reduce-on-Plateau scheduler.
      * Windshield prediction on full images in the Guided Pipeline format.
      * Sticker prediction on cropped images.

    The class is intended to be used as the guide or sticker detector within a
    two stage Guided Pipeline.
    """

    def __init__(
        self,
        pretrained: str = "yolov8n.pt",
        lr0: float = 0.001,
        momentum: float = 0.95,
        weight_decay: float = 0.0005,
        freeze: int = 10,
        ReduceLRonPlateau: bool = False,
    ):
        """Load the YOLO weights and remember the training hyperparameters.

        Args:
            pretrained: Checkpoint path or model name accepted by
                ``ultralytics.YOLO``, e.g. ``"yolov8n.pt"``.
            lr0: Initial learning rate handed to ``YOLO.train``.
            momentum: Momentum handed to ``YOLO.train`` (the optimizer used
                by :meth:`train_model` is SGD).
            weight_decay: Weight decay handed to ``YOLO.train``.
            freeze: Value passed through as the ``freeze`` argument of
                ``YOLO.train``.
            ReduceLRonPlateau: When ``True``, :meth:`train_model` switches on
                the plateau logic of the
                ``ReduceOnPlateauMAP50_WithDetectorNNClone`` callback.
        """
        # The checkpoint name is kept so that fresh copies of the detector
        # can be built from the same weights later on.
        self.pretrained = pretrained
        self.model = YOLO(pretrained)

        # Optimizer and scheduler settings consumed by train_model().
        self.lr0, self.momentum, self.weight_decay = lr0, momentum, weight_decay
        self.freeze = freeze
        self.ReduceLRonPlateau = ReduceLRonPlateau

        print('pretrained:', pretrained)

    def get_split_dir(self, yaml_path, split: str = "train") -> str:
        """Return the ``images`` folder of a split, located next to the YAML.

        The YAML file itself is not opened; only its location is used. The
        dataset is expected to follow this layout:

            <dataset_root>/
                data.yaml
                train/
                    images/
                    labels/
                val/
                    images/
                    labels/
                test/
                    images/
                    labels/

        Args:
            yaml_path: Path to a YOLO data YAML file.
            split: Name of the split folder, e.g. ``"train"``, ``"val"`` or
                ``"test"``.

        Returns:
            ``<directory of yaml_path>/<split>/images`` as a string.
        """
        dataset_root = Path(yaml_path).parent
        return str(dataset_root.joinpath(split, "images"))

    def _ensure_overrides_model(self):
        """Ensure that the underlying YOLO model has a valid ``overrides['model']``.

        Some ultralytics workflows expect ``model.overrides['model']`` to contain
        the model configuration path or checkpoint. This method sets that field
        if it is missing or ``None``, using either ``model.cfg`` or the
        ``pretrained`` path passed at initialization.
        """
        m = self.model
        if not hasattr(m, "overrides"):
            m.overrides = {}
        if "model" not in m.overrides or m.overrides["model"] is None:
            m.overrides["model"] = getattr(m, "cfg", None) or self.pretrained

    ########################################################
    def train_model(
        self,
        data_yaml,
        classes=None,
        epochs: int = 300,
        imgsz: int = 1200,
        seed: int = 30,
        AP_split: str = 'train',
        yolo_names=None,
        scheduled_epochs=None,
        conf: float = 0.5,
        iou: float = 0.5,
        **kwargs,
    ):
        """Train the YOLO model as a guide or windshield detector.

        This method wraps ``ultralytics.YOLO.train`` and attaches a
        ``ReduceOnPlateauMAP50_WithDetectorNNClone`` callback that can adjust
        the learning rate during training.

        The callback, ``evaluate_fn`` and the constants ``DATA_YAML``,
        ``COCO_SPLIT`` and ``COCO_SPLIT_TEST`` are not defined in this class;
        they must exist at module level before this method is called.

        Args:
            data_yaml: Path to the YOLO data YAML passed to ``YOLO.train``.
            classes: Optional list of class indices to train on. If ``None``,
                YOLO uses all classes defined in the YAML.
            epochs: Number of training epochs.
            imgsz: Input image size. Passed to YOLO as ``imgsz``.
            seed: Random seed passed to ``YOLO.train``.
            AP_split: Dataset split forwarded to the callback's prediction
                arguments, for example ``"train"`` or ``"val"``.
            yolo_names: List of class names forwarded to the callback.
                Defaults to ``['car-sticker']`` when ``None``.
            scheduled_epochs: Optional list of epochs forwarded to the
                callback as fixed LR change points. ``None`` means an empty
                schedule.
            conf: Confidence threshold, passed both to ``YOLO.train`` and to
                the callback's prediction arguments.
            iou: IoU threshold, passed both to ``YOLO.train`` and to the
                callback's prediction arguments.
            **kwargs: Additional keyword arguments forwarded to
                ``YOLO.train``, such as augmentation options or logging flags.

        Returns:
            None. After training, the callback's ``history`` and
            ``test_ap_history`` are kept on the instance as
            ``self.last_train_history`` and ``self.last_test_ap_history``.
        """
        # Fresh lists per call: a list literal as a default argument would be
        # shared between every call of this method.
        if scheduled_epochs is None:
            scheduled_epochs = []
        if yolo_names is None:
            yolo_names = ['car-sticker']

        def detector_factory():
            # Builds an independent detector from the same checkpoint name.
            return YOLODetector(pretrained=self.pretrained)

        # NOTE(review): the callback is configured with the module-level
        # DATA_YAML / COCO_SPLIT / COCO_SPLIT_TEST rather than the `data_yaml`
        # argument of this method - confirm this is intentional when training
        # on a different dataset (e.g. sticker crops).
        plateau_cb = ReduceOnPlateauMAP50_WithDetectorNNClone(
            # Plateau-based reduction (active only when requested in __init__)
            use_plateau=self.ReduceLRonPlateau,
            factor=0.1,
            patience=10,                    # patience before the first LR drop
            patience_after_first=10,        # patience after the first LR drop
            cooldown=0,
            min_lr=0.000001, warmup_epochs=0, start_after_map=-1.0,

            # Fixed-epoch schedule
            scheduled_epochs=scheduled_epochs,
            scheduled_factors={},           # empty: no per-epoch factor overrides

            # AP evaluation inputs
            owner=self,
            detector_factory=detector_factory,
            evaluate_fn=evaluate_fn,
            coco_split_dir=COCO_SPLIT,
            yolo_names=yolo_names,
            predict_kwargs=dict(
                data_yaml=DATA_YAML, classes=classes,
                imgsz=(800, 1200), AP_split=AP_split,
                conf=conf, iou=iou, verbose=False, save=False
            ),
            eval_every=10,
            clone_device="cuda:0",
            verbose=True,

            # Test-split evaluation: the start epoch / interval are set so
            # large, and the epoch list left empty, so that it presumably
            # never triggers - TODO confirm against the callback.
            test_eval_start_epoch=999999999,
            test_eval_epochs=[],
            test_eval_every=99999,
            test_predict_kwargs=dict(
                split="test",
                data_yaml=DATA_YAML
            ),
            test_coco_split_dir=COCO_SPLIT_TEST
        )

        # Register hooks.
        # NOTE(review): add_callback is invoked on every call, so calling
        # train_model twice on the same instance likely registers two
        # callback sets - verify against ultralytics' callback handling.
        self.model.add_callback("on_train_epoch_start", plateau_cb.on_train_epoch_start)
        self.model.add_callback("on_fit_epoch_end", plateau_cb)
        self.model.add_callback("on_fit_end", plateau_cb.on_fit_end)
        self.model.add_callback("on_train_end", plateau_cb.on_train_end)

        # Train. The Results object returned by YOLO is intentionally not
        # forwarded (the pipeline does not rely on a return value).
        self.model.train(
            data=data_yaml, classes=classes,
            epochs=epochs, imgsz=imgsz,
            conf=conf, iou=iou,
            optimizer="SGD", lr0=self.lr0,
            momentum=self.momentum,
            weight_decay=self.weight_decay,
            seed=seed,
            batch=4,
            device=0,
            cos_lr=False,
            lrf=1.0,
            warmup_epochs=0,
            freeze=self.freeze,
            **kwargs
        )

        # Keep what the callback collected reachable after training instead
        # of dropping it with the local scope.
        self.last_train_history = plateau_cb.history
        self.last_test_ap_history = plateau_cb.test_ap_history

    ########################################################
    def predict_windshield(
        self,
        data_yaml,
        classes=None,
        imgsz=(800, 1200),
        split: str = 'test',
        conf: float = 0.5,
        iou: float = 0.5,
        **kwargs,
    ):
        """Run guide or windshield detection on full images.

        This method scans the image directory corresponding to the requested
        split in the given YOLO data YAML, runs YOLO inference, and returns the
        best windshield bounding box per image in the Guided Pipeline format.

        Args:
            data_yaml: Path to the YOLO data YAML file.
            classes: Optional list of class indices to detect. If ``None``, use
                all classes.
            imgsz: Image size (height, width) to use during inference.
            split: Dataset split key, for example ``"train"`` or ``"test"``.
            conf: Minimum confidence threshold for YOLO predictions.
            iou: IoU threshold used for internal non maximum suppression.
            **kwargs: Additional keyword arguments passed to
                ``YOLO.predict`` such as device, half precision, or visualization
                flags.

        Returns:
            list[dict]: A list with one entry per image, where each entry has
            the structure:

                {
                    "image_path": "<absolute or relative image path>",
                    "guides": [
                        {
                            "xyxy": [x1, y1, x2, y2],
                            "conf": float_confidence,
                            "cls": class_id
                        }
                    ]
                }

            If an image has no detections, the ``"guides"`` list is empty.
        """
        data_dir = self.get_split_dir(data_yaml, split)
        print('data_dir: ', data_dir)

        stream = self.model.predict(
            source=data_dir,
            conf=conf, iou=iou,
            classes=classes,
            imgsz=imgsz,
            stream=True,
            save=True,
            **kwargs
        )

        predicted_ws = []

        for r in stream:
            img_path = str(r.path)

            if r.boxes is None or len(r.boxes) == 0:
                predicted_ws.append({"image_path": img_path, "guides": []})
                continue

            # tensors -> numpy
            xyxy = r.boxes.xyxy.cpu().numpy()              # (N,4) [x1,y1,x2,y2]
            score = r.boxes.conf.cpu().numpy()             # (N,)
            cls = r.boxes.cls.cpu().numpy() if r.boxes.cls is not None else None

            # sort by confidence (desc)
            order = score.argsort()[::-1]
            xyxy = xyxy[order]
            score = score[order]
            if cls is not None:
                cls = cls[order]

            # index of best detection
            best_idx = int(score.argmax())

            b = xyxy[best_idx].tolist()
            g = {"xyxy": b, "conf": float(score[best_idx])}
            if cls is not None:
                g["cls"] = int(cls[best_idx])

            print(f"{img_path}: keeping best box -> conf={score[best_idx]:.3f}")

            predicted_ws.append({"image_path": img_path, "guides": [g]})

        return predicted_ws

    def predict_sticker(
        self,
        data_yaml,
        classes=None,
        imgsz=(800, 1200),
        split: str = 'train',
        conf: float = 0.5,
        iou: float = 0.5,
        save: bool = False,
        **kwargs,
    ):
        """Run sticker detection on cropped or full images.

        This method operates on all images in the specified split of the data
        YAML, runs YOLO inference, and returns all bounding box predictions for
        each image.

        It is intended for the second stage of the Guided Pipeline where the
        model predicts stickers inside previously cropped windshield regions or
        inside full images.

        Args:
            data_yaml: Path to the YOLO data YAML file.
            classes: Optional list of class indices to detect. If ``None``, use
                all classes.
            imgsz: Image size (height, width) to use during inference.
            split: Dataset split key, for example ``"train"`` or ``"test"``.
            conf: Minimum confidence threshold for YOLO predictions.
            iou: IoU threshold used for internal non maximum suppression.
            save: If ``True``, YOLO saves visualization outputs in its standard
                ``runs/detect`` directory.
            **kwargs: Additional keyword arguments forwarded to
                ``YOLO.predict``.

        Returns:
            list[dict]: A list with one entry per image, where each entry has
            the structure:

                {
                    "crop_path": "<image path>",
                    "boxes": [
                        {
                            "xyxy": [x1, y1, x2, y2],
                            "conf": float_confidence,
                            "cls": class_id
                        },
                        ...
                    ]
                }

            If an image has no detections, ``"boxes"`` is an empty list.
        """
        data_dir = self.get_split_dir(data_yaml, split)
        print('data_dir: ', data_dir)

        print("Prediction Start...\n\n")

        results = self.model.predict(
            source=data_dir,
            conf=conf, iou=iou,
            imgsz=imgsz,
            classes=classes,
            stream=True,
            save=save
        )

        preds = []
        for i, r in enumerate(results, 1):
            path = str(r.path)
            H, W = r.orig_shape  # (H, W)
            ms = float(r.speed['inference'])

            if r.boxes is None or len(r.boxes) == 0:
                preds.append({"crop_path": path, "boxes": []})
                continue

            xyxy = r.boxes.xyxy.cpu().numpy()
            confs = r.boxes.conf.cpu().numpy()
            clses = r.boxes.cls.cpu().numpy() if r.boxes.cls is not None else None

            # sort by confidence (desc), keep all
            order = confs.argsort()[::-1]
            xyxy = xyxy[order]
            confs = confs[order]
            if clses is not None:
                clses = clses[order]

            boxes = []
            for j in range(len(confs)):
                b = {
                    "xyxy": xyxy[j].tolist(),
                    "conf": float(confs[j])
                }
                if clses is not None:
                    b["cls"] = int(clses[j])
                boxes.append(b)

            preds.append({"crop_path": path, "boxes": boxes})

        return preds

Creating new Ultralytics Settings v0.0.6 file ✅
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


### PIPELINE

In [5]:
from tqdm import tqdm
import glob
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
import json, time


class GuidedPipeline:
    """Two stage guided detection pipeline for windshield and sticker detection.

    This pipeline coordinates two detectors that implement :class:`BaseDetector`:

      * ``guide`` - a guide or windshield detector that runs on full images and
        returns guide bounding boxes in the standard Guided Pipeline format:
        ``{"image_path": str, "guides": [{"xyxy": [...], "conf": float, "cls": int}, ...]}``.
      * ``detector`` - a sticker detector that runs on cropped windshield
        regions and returns sticker predictions in the standard format used by
        :meth:`BaseDetector.predict_sticker`.

    The typical flow is:

      1. Train the guide detector on full images for the windshield class.
      2. Use the guide detector to predict windshields and crop those regions.
      3. Build a YOLO dataset of sticker crops.
      4. Train the sticker detector on that crop dataset.
      5. At inference time, repeat steps 2 and 3, then run the sticker detector
         and remap crop coordinates back to the original image frame.

    The pipeline is designed to be compatible with :class:`YOLODetector` for
    both the guide and sticker stages.
    """

    def __init__(
        self,
        detector: BaseDetector,
        guide: BaseDetector,
        coco_split_dir,
        conf=[0.5, 0.5],
        iou=[0.5, 0.5],
        input_size=(960, 544),
        seed=30,
    ):
        """Initialize the GuidedPipeline.

        Args:
            detector: Sticker detector used in the second stage. Must implement
                :class:`BaseDetector` and return sticker predictions in the
                format used by :meth:`BaseDetector.predict_sticker`.
            guide: Guide or windshield detector used in the first stage. Must
                implement :class:`BaseDetector` and return windshields in the
                format used by :meth:`BaseDetector.predict_windshield`.
            coco_split_dir: Path to the COCO style split directory for this
                dataset. This is typically used by external evaluation utilities
                such as :func:`evaluate_fn`.
            conf: Two element list of confidence thresholds
                ``[sticker_conf, windshield_conf]`` used during prediction.
            iou: Two element list of IoU thresholds
                ``[sticker_iou, windshield_iou]`` used during prediction.
            input_size: Input resolution (width, height) that can be used by
                detectors. This value is stored but not enforced by the
                pipeline itself.
            seed: Seed for reproducibility. Forwarded to detector training
                calls where supported.
        """
        self.detector = detector
        self.guide = guide
        self.coco_split_dir = coco_split_dir
        self.conf = conf
        self.iou = iou
        self.input_size = input_size
        self.seed = seed

    ############################
    # Crop dataset utilities
    ############################

    def delete_folder_if_exists(self, folder_path):
        """Remove a directory tree when it is present; otherwise do nothing.

        Args:
            folder_path: Directory to remove, given as a string or a
                :class:`pathlib.Path`.

        Side effects:
            Prints a message and recursively deletes the directory. Paths
            that do not exist, or that are not directories, are left alone.
        """
        target = Path(folder_path)
        # is_dir() is already False for a missing path, so one check is enough.
        if not target.is_dir():
            return
        print(f"Deleting existing folder: {target}")
        shutil.rmtree(target)

    def _yolo_line_to_xyxy(self, line: str, W: int, H: int):
        """Convert a YOLO label line to pixel coordinates in xyxy format.

        YOLO label lines have the form:

            cls cx cy w h

        where all coordinates are normalized relative to image width and height.

        Args:
            line: A single label line from a YOLO ``.txt`` file.
            W: Image width in pixels.
            H: Image height in pixels.

        Returns:
            Tuple ``(cls, x1, y1, x2, y2)`` where ``cls`` is the integer class
            id and ``x1, y1, x2, y2`` are the bounding box coordinates in pixels.
        """
        # "cls cx cy w h" (normalized) -> pixel xyxy
        c, cx, cy, w, h = map(float, line.split())
        bw, bh = w * W, h * H
        x1 = (cx * W) - bw / 2
        y1 = (cy * H) - bh / 2
        x2 = x1 + bw
        y2 = y1 + bh
        return int(c), x1, y1, x2, y2

    def _xyxy_to_yolo_line(self, x1, y1, x2, y2, W, H, cls=0):
        """Convert a pixel xyxy bounding box to a YOLO label line.

        Args:
            x1: Left coordinate of the bounding box in pixels.
            y1: Top coordinate of the bounding box in pixels.
            x2: Right coordinate of the bounding box in pixels.
            y2: Bottom coordinate of the bounding box in pixels.
            W: Image width in pixels.
            H: Image height in pixels.
            cls: Integer class id to write.

        Returns:
            A YOLO label line as a string in the format:

                "cls cx cy w h\\n"

            where ``cx, cy, w, h`` are normalized to the range [0, 1]. If the
            box is degenerate or has non positive width or height, returns
            ``None``.
        """
        # pixel xyxy -> "cls cx cy w h" normalized to W,H
        bw = max(0.0, x2 - x1)
        bh = max(0.0, y2 - y1)
        if bw <= 0 or bh <= 0:
            return None
        cx = (x1 + x2) / 2.0 / W
        cy = (y1 + y2) / 2.0 / H
        w = bw / W
        h = bh / H
        # clip to [0,1] just in case
        cx = min(max(cx, 0.0), 1.0)
        cy = min(max(cy, 0.0), 1.0)
        w = min(max(w, 0.0), 1.0)
        h = min(max(h, 0.0), 1.0)
        if w <= 0 or h <= 0:
            return None
        return f"{int(cls)} {cx:.6f} {cy:.6f} {w:.6f} {h:.6f}\n"

    def _label_path_for_image(self, img_path: Path) -> Path:
        """Return the YOLO label path corresponding to an image path.

        This helper expects the standard YOLO layout where labels are stored
        in a sibling ``labels`` directory:

            .../<split>/images/<name>.jpg
            .../<split>/labels/<name>.txt

        Args:
            img_path: Path to an image in the ``images`` directory.

        Returns:
            Path to the corresponding label file in the ``labels`` directory.
        """
        return img_path.parent.parent / "labels" / (img_path.stem + ".txt")

    def _resolve_split_dir(self, data_yaml_path: Path, split_key: str) -> Path | None:
        """Resolve the directory path for a dataset split from a data YAML.

        This method reads the given data YAML and looks up the value under the
        specified split key (for example ``"train"``, ``"val"``, or ``"test"``).
        If the path in the YAML is relative, it is resolved relative to the
        directory containing the YAML file.

        Args:
            data_yaml_path: Path to the data YAML file.
            split_key: Key of the split to resolve, for example ``"test"``.

        Returns:
            A :class:`Path` to the split directory, or ``None`` if the split
            key is missing or empty, or if the YAML file is empty or does not
            contain a mapping at its top level.
        """
        y = yaml.safe_load(data_yaml_path.read_text())
        # safe_load yields None for an empty document and may yield a list or
        # scalar for other content; neither can hold a split entry, so report
        # "no such split" instead of failing on ``.get``.
        if not isinstance(y, dict):
            return None
        rel = y.get(split_key)
        if not rel:
            return None
        p = Path(rel)
        return p if p.is_absolute() else (data_yaml_path.parent / p)

    def build_sticker_crop_dataset(
        self,
        cropped_windshields,
        data_yaml,
        class_needed: str = 'car-sticker',
        out_root: str = "/content/sticker_crops",
    ):
        """Build a YOLO sticker crop dataset from cropped windshield regions.

        This method constructs a new YOLO dataset rooted at ``out_root`` with
        the following structure:

            out_root/
                train/
                    images/
                    labels/
                test/
                    images/
                    labels/
                data.yaml

        The train split is populated from the provided ``cropped_windshields``
        list, while the test split is copied from the original dataset defined
        in ``data_yaml`` (if a test split exists there).

        Bounding boxes from the original labels are intersected with each crop
        and remapped into crop coordinates, then written in YOLO format.

        Args:
            cropped_windshields: Iterable of crop records. Each record is
                expected to be a dictionary with keys:

                  * ``"crop"``: Crop image as a NumPy array (H, W, C).
                  * ``"box"``: Tuple ``(x1, y1, x2, y2)`` in original image
                    pixel coordinates describing the crop region.
                  * ``"image_path"``: Path to the original source image.

            data_yaml: Path to the original YOLO data YAML file. Class names
                and the original test split are read from this file.
            class_needed: Logical class name used to identify sticker classes,
                for example ``"car-sticker"``. The name is normalized to
                lowercase and underscores before matching against the YAML
                names.
            out_root: Root directory where the new crop dataset should be
                created. Existing contents at this path are deleted.

        Returns:
            str: Path to the newly created ``data.yaml`` file inside
            ``out_root``.

        Raises:
            ValueError: If ``cropped_windshields`` is empty.
        """
        import re  # local import is fine here

        def _norm(s: str) -> str:
            # normalize "Car-Sticker", "car sticker", etc. -> "car_sticker"
            return re.sub(r'[^a-z0-9]+', '_', s.strip().lower())

        self.delete_folder_if_exists(out_root)

        out_root = Path(out_root)
        (out_root / "train/images").mkdir(parents=True, exist_ok=True)
        (out_root / "train/labels").mkdir(parents=True, exist_ok=True)
        (out_root / "test/images").mkdir(parents=True, exist_ok=True)
        (out_root / "test/labels").mkdir(parents=True, exist_ok=True)

        if not cropped_windshields:
            raise ValueError("No cropped windshields provided.")

        data_yaml_path = Path(data_yaml)
        y = yaml.safe_load(data_yaml_path.read_text())

        # preserve class list/order
        names_raw = y.get("names")
        if isinstance(names_raw, dict):
            names = [names_raw[k] for k in sorted(map(int, names_raw.keys()))]
            write_as_dict = True
            id2name = {int(k): v for k, v in names_raw.items()}
        else:
            names = list(names_raw) if names_raw is not None else []
            write_as_dict = False
            id2name = {i: n for i, n in enumerate(names)}
        nc = len(names)

        # figure out which original class IDs correspond to "car_sticker"
        wanted_names = {class_needed}
        wanted_norm = {_norm(w) for w in wanted_names}
        sticker_ids = {i for i, n in id2name.items() if _norm(str(n)) in wanted_norm}
        if not sticker_ids:
            # fallback if names do not exist in YAML
            sticker_ids = {0}

        # original test dir (copy as-is)
        src_test_dir = self._resolve_split_dir(data_yaml_path, "test")

        # cache original image sizes
        shape_cache = {}
        EPS = 1e-6
        MIN_PX = 2

        # filename bookkeeping to preserve names and handle multiple crops per image
        # key: absolute original image path -> number of crops already written
        crop_counts = {}

        # write crops for TRAIN folder only
        for it in cropped_windshields:
            crop_img, (x1c, y1c, x2c, y2c), orig_img_path = it["crop"], it["box"], Path(it["image_path"])

            # base stem + extension from original
            base_stem = orig_img_path.stem
            ext = orig_img_path.suffix if orig_img_path.suffix else ".jpg"

            # pick filename: first crop keeps exact name, subsequent get suffix _c2, _c3, ...
            n = crop_counts.get(orig_img_path, 0) + 1
            crop_counts[orig_img_path] = n
            if n == 1:
                img_name = f"{base_stem}{ext}"
            else:
                img_name = f"{base_stem}_c{n}{ext}"

            img_out = out_root / "train/images" / img_name
            lbl_out = out_root / "train/labels" / (Path(img_name).stem + ".txt")

            # ensure uint8 3ch
            arr = crop_img
            if isinstance(arr, np.ndarray) and arr.dtype != np.uint8:
                arr = arr.astype(np.uint8)
            if arr.ndim == 2:
                arr = cv2.cvtColor(arr, cv2.COLOR_GRAY2BGR)
            cv2.imwrite(str(img_out), arr)

            # map GT labels that intersect crop
            orig_lbl_path = self._label_path_for_image(orig_img_path)
            crop_W = max(EPS, (x2c - x1c))
            crop_H = max(EPS, (y2c - y1c))
            lines_out = []

            if orig_lbl_path.exists():
                if orig_img_path not in shape_cache:
                    im = cv2.imread(str(orig_img_path))
                    shape_cache[orig_img_path] = None if im is None else (im.shape[1], im.shape[0])  # (W,H)
                shape = shape_cache.get(orig_img_path)
                if shape is not None:
                    W, H = shape
                    with open(orig_lbl_path, "r") as f:
                        for line in f:
                            line = line.strip()
                            if not line:
                                continue
                            cls, bx1, by1, bx2, by2 = self._yolo_line_to_xyxy(line, W, H)

                            # intersect with crop (original coords)
                            ix1 = max(bx1, x1c)
                            iy1 = max(by1, y1c)
                            ix2 = min(bx2, x2c)
                            iy2 = min(by2, y2c)
                            if ix2 - ix1 < MIN_PX or iy2 - iy1 < MIN_PX:
                                continue  # no overlap / too tiny

                            # map to crop-local
                            cx1 = ix1 - x1c
                            cy1 = iy1 - y1c
                            cx2 = ix2 - x1c
                            cy2 = iy2 - y1c

                            yline = self._xyxy_to_yolo_line(
                                cx1, cy1, cx2, cy2, crop_W, crop_H, cls=int(cls)
                            )
                            if yline:
                                lines_out.append(yline)

            with open(lbl_out, "w") as f:
                for l in lines_out:
                    f.write(l)

        # copy ORIGINAL TEST split as-is
        if src_test_dir and (src_test_dir / "images").exists():
            for img in (src_test_dir / "images").glob("*.*"):
                if img.suffix.lower().lstrip(".") in {
                    'bmp', 'mpo', 'jpg', 'pfm', 'tif', 'tiff',
                    'png', 'webp', 'jpeg', 'dng', 'heic'
                }:
                    dst_img = out_root / "test/images" / img.name
                    dst_lbl = out_root / "test/labels" / (img.stem + ".txt")
                    shutil.copy2(img, dst_img)
                    lbl = src_test_dir / "labels" / (img.stem + ".txt")
                    if lbl.exists():
                        shutil.copy2(lbl, dst_lbl)

        # YAML: train and val both use train/, test uses test/ (if any)
        yaml_lines = [
            f"path: {out_root}",
            "train: train/images",
            "val: train/images",
        ]
        if (out_root / "test/images").exists() and any((out_root / "test/images").iterdir()):
            yaml_lines.append("test: test/images")
        yaml_lines += [f"nc: {nc}", "names:"]
        if write_as_dict:
            for idx, name in enumerate(names):
                yaml_lines.append(f"  {idx}: {name}")
        else:
            yaml_lines.append("  " + str(names))
        (out_root / "data.yaml").write_text("\n".join(yaml_lines) + "\n")

        print(
            'Built crop dataset with original filenames (subsequent crops use _c2/_c3 suffixes). '
            f"Root: {out_root}"
        )
        return str(out_root / "data.yaml")

    ############################
    # Core pipeline flow
    ############################

    def crop_windshields(self, predicted_ws, pad=5):
        """Crop windshield regions from full images.

        This function consumes the guide predictions returned by
        :meth:`BaseDetector.predict_windshield` and extracts padded crops
        around each guide bounding box.

        Args:
            predicted_ws: List of guide prediction entries, where each entry
                has the form:

                    {
                        "image_path": "<path/to/image.jpg>",
                        "guides": [
                            {
                                "xyxy": [x1, y1, x2, y2],
                                "conf": float_confidence,
                                "cls": class_id
                            },
                            ...
                        ]
                    }

                If ``"guides"`` is empty, no crops are produced for that image.
            pad: Number of pixels added on every side of the guide box before
                cropping. The padded box is clipped to the image bounds.
                Defaults to 5.

        Returns:
            list[dict]: A list of crop records, each with the structure:

                {
                    "image_path": "<original/image/path.jpg>",
                    "crop": np.ndarray,              # cropped BGR image
                    "box": [x1c, y1c, x2c, y2c],     # crop box in original coords
                    "conf": float_confidence         # guide confidence
                }

            Images that cannot be read and entries whose crops are empty are
            skipped.
        """
        cropped_images = []
        for item in tqdm(predicted_ws, desc="Cropping windshields"):
            img_path = item["image_path"]
            img = cv2.imread(str(img_path))
            if img is None:
                # unreadable or missing file: nothing to crop from
                continue
            img_h, img_w = img.shape[:2]

            for g in item["guides"]:
                conf = g.get("conf", 0.0)
                x1, y1, x2, y2 = map(int, g["xyxy"][:4])

                # Pad, then clip to the image. The far edges are also clamped
                # at 0: a negative slice stop would be read by NumPy as a
                # from-the-end index and yield a large bogus crop for a box
                # that lies entirely left of / above the image.
                x1c = max(0, x1 - pad)
                y1c = max(0, y1 - pad)
                x2c = max(0, min(img_w, x2 + pad))
                y2c = max(0, min(img_h, y2 + pad))

                crop = img[y1c:y2c, x1c:x2c]
                if crop.size == 0:
                    # degenerate or fully out-of-frame box
                    continue

                cropped_images.append({
                    "image_path": img_path,
                    "crop": crop,
                    "box": [x1c, y1c, x2c, y2c],
                    "conf": conf
                })
        return cropped_images

    def _make_crop_index(self, cropped_windshields, out_root="/content/sticker_crops"):
        """Map every crop file path to its source image and crop box.

        The file names are recomputed with the same rules that
        :meth:`build_sticker_crop_dataset` applies to the train split, so the
        returned keys line up with the crop files written there:

          * Crops live in ``out_root/train/images``.
          * The first crop of an original image is named ``<stem><ext>``.
          * Later crops of the same original image are named
            ``<stem>_c2<ext>``, ``<stem>_c3<ext>``, and so on.
          * An original path without an extension falls back to ``.jpg``.

        Args:
            cropped_windshields: List of crop records produced by
                :meth:`crop_windshields`. Only ``"image_path"`` and ``"box"``
                are read.
            out_root: Root directory where the sticker crop dataset is or will
                be created.

        Returns:
            dict: A mapping from crop image path to metadata:

                {
                    "<out_root>/train/images/<name>.jpg": {
                        "orig_path": "<original/image/path.jpg>",
                        "crop_box": [x1c, y1c, x2c, y2c]
                    },
                    ...
                }

            Box coordinates are converted to ``int``.
        """
        train_images = Path(out_root) / "train" / "images"

        # original image path -> how many crops of it have been numbered so far
        seen_per_image = {}
        lookup = {}

        for record in cropped_windshields:
            source = Path(record["image_path"])
            left, top, right, bottom = record["box"]

            ordinal = seen_per_image.get(source, 0) + 1
            seen_per_image[source] = ordinal

            extension = source.suffix or ".jpg"
            # only the second and later crops carry a counter suffix
            counter_tag = "" if ordinal == 1 else f"_c{ordinal}"
            file_name = f"{source.stem}{counter_tag}{extension}"

            lookup[str(train_images / file_name)] = {
                "orig_path": str(source),
                "crop_box": [int(left), int(top), int(right), int(bottom)],
            }

        return lookup

    def remap_sticker_predictions(self, sticker_preds, crop_index):
        """Translate sticker boxes from crop coordinates to original image coordinates.

        Each prediction entry is looked up in ``crop_index`` by its crop path.
        The top-left corner of the stored crop box is then added to every
        predicted box so that the result is expressed in the coordinate frame
        of the original image. Boxes are grouped by original image path.

        Args:
            sticker_preds: Sticker predictions returned by
                :meth:`BaseDetector.predict_sticker`, typically with entries of
                the form:

                    {
                        "crop_path": "<path/to/crop.jpg>",
                        "boxes": [
                            {
                                "xyxy": [x1, y1, x2, y2],
                                "conf": float_confidence,
                                "cls": class_id
                            },
                            ...
                        ]
                    }

            crop_index: Mapping produced by :meth:`_make_crop_index` where each
                crop path is mapped to an original image path and crop box.

        Returns:
            dict: Mapping from original image path to a list of remapped boxes:

                {
                    "<original/image/path.jpg>": [
                        {
                            "xyxy": [X1, Y1, X2, Y2],   # original image coords
                            "conf": float_confidence,
                            "cls": class_id
                        },
                        ...
                    ],
                    ...
                }

            ``"conf"`` defaults to ``0.0`` when absent and ``"cls"`` is only
            copied when the source box has it. A known crop with no boxes
            still yields an (empty) list for its original image. Entries whose
            crop paths are not found in ``crop_index`` are silently skipped.
        """
        by_original = {}

        for prediction in sticker_preds:
            crop_path = prediction["crop_path"]
            crop_boxes = prediction["boxes"]

            # crops unknown to the index (e.g. images that were never cropped)
            # are dropped without complaint
            if crop_path in crop_index:
                meta = crop_index[crop_path]
            else:
                continue

            original = meta["orig_path"]
            # only the crop's top-left corner is needed for the translation
            offset_x, offset_y, _, _ = meta["crop_box"]

            bucket = by_original.setdefault(original, [])

            for box in crop_boxes:
                bx1, by1, bx2, by2 = box["xyxy"]
                shifted = {
                    "xyxy": [
                        float(bx1) + offset_x,
                        float(by1) + offset_y,
                        float(bx2) + offset_x,
                        float(by2) + offset_y,
                    ],
                    "conf": box.get("conf", 0.0),
                }
                if "cls" in box:
                    shifted["cls"] = box["cls"]
                bucket.append(shifted)

        return by_original

    def train(
        self,
        data_yaml,
        epochs=(100, 100),
        scheduled_epochs=(),
        skip_windshield_train=False,
        skip_sticker_train=False,
    ):
        """Train the guide and sticker detectors in a two stage procedure.

        Stage 1 - guide or windshield detector:

          1. Train ``self.guide`` on the full dataset for the windshield class
             (class index 1 by convention).
          2. Predict windshields on the train split using the trained guide.
          3. Crop predicted windshields with :meth:`crop_windshields`.
          4. Build a sticker crop dataset and YAML using
             :meth:`build_sticker_crop_dataset`.

        Stage 2 - sticker detector:

          1. Train ``self.detector`` on the sticker crop dataset for the
             sticker class (class index 0 by convention).

        Either stage can be skipped if a pretrained model or prebuilt dataset
        already exists.

        Args:
            data_yaml: Path to the original YOLO data YAML file describing the
                full dataset.
            epochs: Two element sequence ``[sticker_epochs, windshield_epochs]``
                controlling the number of training epochs for each stage.
                Any indexable sequence (list or tuple) is accepted.
            scheduled_epochs: Reserved for integration with schedulers such as
                :class:`ReduceOnPlateauMAP50_WithDetectorNNClone`. Not used
                in this method.
            skip_windshield_train: If ``True``, skip the whole of stage 1
                (guide training, windshield prediction, cropping and dataset
                building) and pass ``data_yaml`` straight to the sticker
                detector, i.e. it is assumed to already point to a sticker
                crop dataset.
            skip_sticker_train: If ``True``, skip training the sticker
                detector.

        Returns:
            None. Training side effects are handled by the detector
            implementations.
        """
        if not skip_windshield_train:
            print("Training Windshield Detector...")
            # the guide's return value is not needed by the pipeline
            self.guide.train_model(
                data_yaml=data_yaml,
                classes=[1],
                epochs=epochs[1],
                conf=self.conf[1], iou=self.iou[1],
                seed=self.seed,
                yolo_names=['windshield']
            )

            print("Predicting Windshields for training...\n\n")
            predicted_ws = self.guide.predict_windshield(
                data_yaml=data_yaml,
                conf=self.conf[1], iou=self.iou[1],
                classes=[1],
                split='train'
            )

            print("Cropping windshields for sticker training...")
            cropped_windshields = self.crop_windshields(predicted_ws)

            # show a single crop as a quick visual sanity check
            if len(cropped_windshields) > 0:
                first_crop = cropped_windshields[0]["crop"]
                plt.figure(figsize=(6, 6))
                plt.imshow(cv2.cvtColor(first_crop, cv2.COLOR_BGR2RGB))
                plt.axis("off")
                plt.title("First cropped windshield")
                plt.show()
            else:
                print("No windshields were cropped.")

            print("\n\nBuilding Dataset...")
            sticker_yaml = self.build_sticker_crop_dataset(cropped_windshields, data_yaml)
            print('\n\nDataset Built Successfully.\n')

        else:
            # caller asserts that data_yaml is already a crop dataset
            sticker_yaml = data_yaml
            print("Skipping windshield training.")

        if not skip_sticker_train:
            print("Training Sticker Detector...")
            self.detector.train_model(
                data_yaml=sticker_yaml,
                classes=[0],
                epochs=epochs[0],
                conf=self.conf[0], iou=self.iou[0],
                seed=self.seed,
                yolo_names=['car-sticker']
            )

    def predict(self, data_yaml, **kwargs):
        """Run the full two stage pipeline at inference time.

        The prediction flow is:

          1. Use ``self.guide.predict_windshield`` on the test split of
             ``data_yaml`` to obtain guide boxes for each full image.
          2. Crop the predicted windshields with :meth:`crop_windshields`.
          3. Build a temporary sticker crop dataset and YAML via
             :meth:`build_sticker_crop_dataset`.
          4. Run ``self.detector.predict_sticker`` on the train split of the
             crop dataset (which contains all sticker crops).
          5. Remap crop space sticker predictions back to original image
             coordinates with :meth:`remap_sticker_predictions`. The crop
             index used for this is rooted at the directory that holds the
             YAML returned in step 3, so it always refers to the dataset that
             was just written.
          6. Visualize a small subset of results with
             :meth:`visualize_first5`.

        Args:
            data_yaml: Path to the original YOLO data YAML file describing the
                full dataset.
            **kwargs: Reserved for future options. Currently not used but can
                be wired to detectors if needed.

        Returns:
            dict: Remapped sticker predictions in the format produced by
            :meth:`remap_sticker_predictions`, that is:

                {
                    "<original/image/path.jpg>": [
                        {
                            "xyxy": [X1, Y1, X2, Y2],
                            "conf": float_confidence,
                            "cls": class_id
                        },
                        ...
                    ],
                    ...
                }

        Raises:
            ValueError: Propagated from :meth:`build_sticker_crop_dataset`
                when no windshield crop was produced.
        """
        print("\n\nPredicting windshields...")
        predicted_ws = self.guide.predict_windshield(
            data_yaml=data_yaml,
            classes=[1],
            conf=self.conf[1], iou=self.iou[1],
            split='test'
        )

        print("\n\nCropping windshields...")
        windshield_results = self.crop_windshields(predicted_ws)

        # show a single crop as a quick visual sanity check
        if len(windshield_results) > 0:
            first_crop = windshield_results[0]["crop"]
            plt.figure(figsize=(6, 6))
            plt.imshow(cv2.cvtColor(first_crop, cv2.COLOR_BGR2RGB))
            plt.axis("off")
            plt.title("First cropped windshield")
            plt.show()

        else:
            print("No windshields were cropped.")

        print("\n\nBuilding Dataset...")
        sticker_yaml = self.build_sticker_crop_dataset(
            windshield_results,
            data_yaml
        )
        print('\n\nDataset Built Successfully.\n')

        print("\n\nPredicting Stickers...")
        # split = train because sticker_yaml has cropped images saved in train folder
        sticker_preds = self.detector.predict_sticker(
            data_yaml=sticker_yaml,
            conf=self.conf[0], iou=self.iou[0],
            classes=[0],
            split='train'
        )

        print("Sticker Detection Complete.")

        print("\n\nRemapping Sticker Predictions...")
        # The dataset builder returns "<out_root>/data.yaml", so the YAML's
        # parent is the root the crops were written under. Deriving it here
        # (instead of repeating a literal path) keeps the index keys aligned
        # with the crop files; a mismatch would silently drop every prediction.
        crop_root = Path(sticker_yaml).parent
        crop_index = self._make_crop_index(windshield_results, out_root=crop_root)

        # remap crop space predictions to original image coordinates
        remapped = self.remap_sticker_predictions(sticker_preds, crop_index)

        print("\n\nRemapping Complete...")

        # visualize first 5 originals with remapped predictions
        self.visualize_first5(remapped, max_show=5)

        return remapped

    def visualize_first5(self, remapped_preds, max_show=5, thickness=2):
        """Plot predictions and ground truth for the first few original images.

        Images are taken in the iteration order of ``remapped_preds``. For
        each one that can be loaded, until ``max_show`` images have been
        displayed, this method:

          * Reads the original image from disk.
          * Looks for a YOLO label file at ``<split>/labels/<stem>.txt``
            (sibling of the image's parent directory) and draws every class 0
            box from it in blue. Lines that do not have exactly five fields
            are ignored.
          * Draws every remapped prediction in green, captioned with its
            confidence and, when present, its class id.
          * Displays the annotated image with Matplotlib.

        Images that cannot be read are skipped and do not count towards
        ``max_show``.

        Args:
            remapped_preds: Dictionary of remapped predictions returned by
                :meth:`predict`.
            max_show: Maximum number of images to visualize.
            thickness: Line thickness in pixels for the drawn rectangles.

        Returns:
            None.
        """
        gt_color = (255, 0, 0)     # BGR -> blue
        pred_color = (0, 255, 0)   # BGR -> green

        def _caption(canvas, text, x, y, color):
            # text sits just above the box, kept inside the top image border
            cv2.putText(
                canvas, text,
                (x, max(0, y - 5)),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5,
                color, 1, cv2.LINE_AA
            )

        displayed = 0
        for orig_path, boxes in remapped_preds.items():
            if displayed >= max_show:
                break

            canvas = cv2.imread(orig_path)
            if canvas is None:
                continue
            img_h, img_w = canvas.shape[:2]

            # ground truth: .../<split>/images/<name>.* -> .../<split>/labels/<name>.txt
            source = Path(orig_path)
            label_file = source.parent.parent / "labels" / (source.stem + ".txt")

            n_gt = 0
            if label_file.exists():
                with open(label_file, "r") as handle:
                    for raw in handle:
                        fields = raw.strip().split()
                        if len(fields) != 5:
                            continue
                        # YOLO row: cls cx cy w h, all normalized to [0, 1]
                        cls_id, cx, cy, w_norm, h_norm = map(float, fields)

                        # only class 0 ground truth is shown
                        if int(cls_id) != 0:
                            continue

                        box_w = w_norm * img_w
                        box_h = h_norm * img_h
                        left = int(cx * img_w - box_w / 2.0)
                        top = int(cy * img_h - box_h / 2.0)
                        right = int(left + box_w)
                        bottom = int(top + box_h)
                        cv2.rectangle(canvas, (left, top), (right, bottom), gt_color, thickness)
                        _caption(canvas, f"GT c{int(cls_id)}", left, top, gt_color)
                        n_gt += 1

            n_pred = 0
            for det in boxes:
                left, top, right, bottom = map(int, det["xyxy"])
                cv2.rectangle(canvas, (left, top), (right, bottom), pred_color, thickness)
                score = f"{det.get('conf', 0):.2f}"
                text = f"c{det['cls']}:{score}" if "cls" in det else score
                _caption(canvas, text, left, top, pred_color)
                n_pred += 1

            plt.figure(figsize=(8, 6))
            plt.imshow(cv2.cvtColor(canvas, cv2.COLOR_BGR2RGB))
            plt.axis("off")
            plt.title(f"{source.name} | preds: {n_pred} | gt: {n_gt}")
            plt.show()

            displayed += 1


### EVAL FN

In [9]:
from pathlib import Path
import json, time
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval

def evaluate_fn(
    coco_split_dir,
    predictions,
    class_map=None,
    yolo_names=None,
    save_dir="./coco_eval_out",
    target_size=(1200, 800),
):
    """Evaluate detector predictions on a COCO style dataset using AP and AR.

    This function converts predictions from the Guided Pipeline format into
    COCO style detection results, rescales both ground truth and predictions
    to a fixed reference size, and runs COCOeval to compute standard metrics.

    The function is designed to work with outputs produced by
    :meth:`BaseDetector.predict_sticker` or remapped predictions from
    :meth:`GuidedPipeline.remap_sticker_predictions`.

    Predictions are matched to ground truth images by file basename; entries
    whose basename is not present in the annotation file are counted and
    reported, then ignored. Detections whose class index is missing from
    ``class_map`` are assigned to the smallest evaluated category id.

    Args:
        coco_split_dir: Path to a COCO style split directory containing
            ``_annotations.coco.json`` and the corresponding images.
        predictions: Detector predictions in one of two formats:

              dict:
                  {
                      "<image_path>": [
                          {
                              "xyxy": [x1, y1, x2, y2],
                              "conf": float_confidence,
                              "cls": class_id
                          },
                          ...
                      ],
                      ...
                  }

              list:
                  [
                      {
                          "image_path" or "orig_path" or "crop_path": "<path>",
                          "detections" or "boxes": [
                              {
                                  "xyxy": [x1, y1, x2, y2],
                                  "conf": float_confidence,
                                  "cls": class_id
                              },
                              ...
                          ]
                      },
                      ...
                  ]

        class_map: Optional mapping from YOLO class index to COCO category id,
            for example ``{0: 1}``. If ``None``, the mapping is inferred from
            ``yolo_names`` and the COCO categories where possible.
        yolo_names: Optional list of class names in YOLO order, used to infer
            ``class_map`` when it is not provided.
        save_dir: Output directory where the temporary COCO detection JSON
            will be written.
        target_size: Tuple ``(width, height)`` that specifies the reference
            resolution. Both GT boxes and prediction boxes are scaled into
            this size before evaluation. This is useful to evaluate models
            that resize images internally.

    Returns:
        float: AP@0.50 (COCO mAP50) from ``COCOeval.stats[1]``.

    Raises:
        ValueError: If a class mapping cannot be inferred for a multi class
            COCO dataset.
        TypeError: If ``predictions`` is neither a dict nor a list.
    """
    coco_split_dir = Path(coco_split_dir)
    ann_path = coco_split_dir / "_annotations.coco.json"

    # ---- load COCO GT ----
    cocoGt = COCO(str(ann_path))
    img_ids = cocoGt.getImgIds()
    imgs = cocoGt.loadImgs(img_ids)

    # lookups: file basename -> image id, image id -> image record
    basename_to_imgid = {Path(im["file_name"]).name: im["id"] for im in imgs}
    imgid_to_info = {im["id"]: im for im in imgs}

    # categories
    cat_ids = cocoGt.getCatIds()
    cats = cocoGt.loadCats(cat_ids)
    name_to_catid = {c["name"]: c["id"] for c in cats}

    # ---- build class_map ----
    if class_map is None:
        if yolo_names:
            inferred = {
                i: name_to_catid[nm]
                for i, nm in enumerate(yolo_names)
                if nm in name_to_catid
            }
            if inferred:
                class_map = inferred
        if class_map is None:
            # fallback: try to find a single sticker like category
            sticker_id = next(
                (c["id"] for c in cats if "sticker" in c["name"].lower()), None
            )
            # NOTE(review): a matching category whose id is 0 is falsy and is
            # therefore ignored by this check - confirm that this is intended.
            if sticker_id:
                class_map = {0: sticker_id}
    if class_map is None and len(cats) == 1:
        class_map = {0: cats[0]["id"]}
    if class_map is None:
        raise ValueError("[evaluate] Could not infer class_map on multi-class COCO.")

    eval_cat_ids = sorted(set(class_map.values()))

    def to_cat_id(yolo_cls):
        # unmapped YOLO classes fall back to the first evaluated category
        return class_map.get(int(yolo_cls), eval_cat_ids[0])

    target_w, target_h = target_size

    # ---- scale COCO GT to target size ----
    # The annotation dicts returned by loadAnns are the ones held by cocoGt,
    # so this rescales the ground truth in place.
    for img in imgs:
        scale_x = target_w / img["width"]
        scale_y = target_h / img["height"]
        for ann in cocoGt.loadAnns(cocoGt.getAnnIds(imgIds=[img["id"]])):
            x, y, w, h = ann["bbox"]
            ann["bbox"] = [x * scale_x, y * scale_y, w * scale_x, h * scale_y]
            # keep the area in the same frame as the box so that the
            # small/medium/large buckets of COCOeval use rescaled sizes
            if "area" in ann:
                ann["area"] = ann["area"] * scale_x * scale_y

    # ---- normalize predictions ----
    norm_preds = []
    if isinstance(predictions, dict):
        for image_path, dets in predictions.items():
            norm_preds.append({"image_path": image_path, "detections": dets})
    elif isinstance(predictions, list):
        for it in predictions:
            dets = it.get("detections", it.get("boxes", []))
            ip = it.get("orig_path") or it.get("image_path") or it.get("crop_path")
            norm_preds.append({"image_path": ip, "detections": dets})
    else:
        raise TypeError("predictions must be dict or list")

    # ---- build DT list (scale predictions to target_size) ----
    dt_list = []
    skipped = 0
    for it in norm_preds:
        ipath = it["image_path"]
        if not ipath:
            continue
        img_id = basename_to_imgid.get(Path(ipath).name)
        if img_id is None:
            skipped += 1
            continue

        img_info = imgid_to_info[img_id]
        scale_x = target_w / img_info["width"]
        scale_y = target_h / img_info["height"]

        for d in it.get("detections", []):
            x1, y1, x2, y2 = d["xyxy"]
            # scale predictions from original image to target_size
            x1, x2 = x1 * scale_x, x2 * scale_x
            y1, y2 = y1 * scale_y, y2 * scale_y
            w, h = max(0.0, x2 - x1), max(0.0, y2 - y1)
            if w <= 0 or h <= 0:
                # degenerate or inverted box
                continue

            dt_list.append({
                "image_id": int(img_id),
                "category_id": int(to_cat_id(d.get("cls", 0))),
                # plain floats so the record is always JSON serializable
                "bbox": [float(x1), float(y1), float(w), float(h)],
                "score": float(d.get("conf", 0.0))
            })

    if skipped:
        print(f"[evaluate] Skipped {skipped} prediction entries not found in COCO GT by basename.")

    # ---- debug counts per eval category ----
    for cid in eval_cat_ids:
        n_gt = len(cocoGt.getAnnIds(catIds=[cid]))
        n_dt = sum(1 for d in dt_list if d["category_id"] == cid)
        cname = next((c["name"] for c in cats if c["id"] == cid), str(cid))
        print(f"[evaluate] Category '{cname}' (id={cid}): GT={n_gt}, DT={n_dt}")

    # ---- run COCOeval ----
    save_dir = Path(save_dir)
    save_dir.mkdir(parents=True, exist_ok=True)
    dt_json = save_dir / f"coco_dt_{int(time.time())}.json"
    dt_json.write_text(json.dumps(dt_list))

    # an empty COCO object stands in for "no detections at all"
    cocoDt = cocoGt.loadRes(str(dt_json)) if len(dt_list) > 0 else COCO()
    E = COCOeval(cocoGt, cocoDt, iouType="bbox")
    E.params.imgIds = img_ids
    E.params.catIds = eval_cat_ids
    E.evaluate()
    E.accumulate()
    E.summarize()

    metric_names = [
        "AP@[.5:.95]", "AP@0.50", "AP@0.75", "AP_small", "AP_medium", "AP_large",
        "AR@1", "AR@10", "AR@100", "AR_small", "AR_medium", "AR_large"
    ]
    stats = {name: float(E.stats[i]) for i, name in enumerate(metric_names)}

    ap50 = float(E.stats[1])
    print("COCO stats:", stats)
    return ap50


### ReduceLROnPlateau

In [7]:
from dataclasses import dataclass, field
from typing import Any, Callable, Optional, Dict, List
import copy, math
import torch


@dataclass
class ReduceOnPlateauMAP50_WithDetectorNNClone:
    """External AP50 based learning rate scheduler for Ultralytics YOLO.

    This callback evaluates AP at IoU 0.50 (AP50) using an external detector
    clone and a COCO style evaluation function, and reduces the learning rate
    when performance stalls.

    Workflow at selected epochs:

    1. Copy the current network weights from `owner.model` into a new detector
       instance from `detector_factory`.
    2. Run `clone_det.predict_sticker(**predict_kwargs)` on a chosen split
       (usually the train split).
    3. Compute AP50 with `evaluate_fn` on a COCO split at `coco_split_dir`.
    4. If AP50 has not improved for `patience` evaluations (after activation),
       reduce learning rate by `factor` while respecting `min_lr`. After a
       plateau reduction the next `cooldown` evaluations are ignored: the
       cooldown counter ticks on every evaluation and the bad-epoch counter
       is held at zero until it expires.
    5. Optionally run a second evaluation on a test split and log results.

    This class is meant to be attached to Ultralytics callbacks:

    * `model.add_callback("on_fit_epoch_end", plateau_cb)` calls `__call__`.
    * `model.add_callback("on_train_epoch_start", plateau_cb.on_train_epoch_start)`
      enforces target learning rates at the start of each epoch.
    * `model.add_callback("on_fit_end", plateau_cb.on_fit_end)` and
      `model.add_callback("on_train_end", plateau_cb.on_train_end)` print
      an evaluation summary.

    It assumes `owner` follows the `YOLODetector` style interface and that the
    clone has a compatible `predict_sticker` method that returns the standard
    sticker prediction format for the Guided Pipeline.
    """

    # ---- plateau configuration ----
    factor: float = 0.5                          # LR multiplier applied on a reduction
    patience: int = 10                           # non-improving evaluations tolerated
    patience_after_first: Optional[int] = None   # patience used once any reduction happened
    cooldown: int = 0                            # evaluations ignored after a plateau reduction
    min_lr: float = 1e-6                         # lower bound for every param group LR

    warmup_epochs: int = 0                       # epochs below this index are skipped entirely
    start_after_map: float = 0.0                 # plateau tracking starts once AP50 exceeds this

    use_plateau: bool = True                     # False disables plateau logic (schedule still runs)

    # ---- fixed schedule (only checked on epochs that are evaluated) ----
    scheduled_epochs: List[int] = field(default_factory=list)         # reduce by `factor`
    scheduled_factors: Dict[int, float] = field(default_factory=dict)  # reduce by given factor
    scheduled_set_lrs: Dict[int, float] = field(default_factory=dict)  # set absolute LR

    # ---- external evaluation wiring ----
    owner: Any = None                                       # object whose `.model` is being trained
    detector_factory: Optional[Callable[[], Any]] = None    # builds the evaluation detector
    evaluate_fn: Optional[Callable[..., float]] = None      # returns AP50 for predictions
    coco_split_dir: str = ""
    yolo_names: Optional[List[str]] = None
    predict_kwargs: Optional[Dict[str, Any]] = None
    eval_every: int = 1                                     # evaluate when epoch % eval_every == 0
    clone_device: Optional[str] = "cpu"                     # device for the copied network
    verbose: bool = True

    # ---- optional periodic test evaluation ----
    test_eval_every: Optional[int] = 25
    test_eval_start_epoch: int = 0
    test_eval_epochs: Optional[List[int]] = None
    test_predict_kwargs: Optional[Dict[str, Any]] = None
    test_coco_split_dir: Optional[str] = None
    test_ap_history: List[Dict[str, Any]] = field(default_factory=list)

    # ---- plateau state ----
    _best: float = float("-inf")   # best AP50 seen since activation
    _bad: int = 0                  # consecutive non-improving evaluations
    _cool: int = 0                 # remaining cooldown evaluations
    _active: bool = False          # True once AP50 exceeded `start_after_map`

    # ---- logging / LR bookkeeping ----
    history: List[Dict[str, Any]] = field(default_factory=list)
    _reduced_flag: bool = False                  # an LR change happened during this call
    _lr_after: Optional[List[float]] = None      # LRs right after the change, if any
    _printed: bool = False                       # history summary already printed
    _target_lrs: Optional[List[float]] = None    # LRs re-applied at each epoch start
    _had_reduction: bool = False                 # any reduction / absolute set so far

    _scheduled_applied: set = field(default_factory=set)  # epochs whose schedule already ran

    def __call__(self, trainer):
        """Ultralytics callback entry that runs at the end of each epoch.

        At epochs that pass the warmup and `eval_every` conditions, this method:

        1. Clones the current YOLO network from `owner.model` into a new
           detector instance returned by `detector_factory`.
        2. Runs `clone_det.predict_sticker(**predict_kwargs)` with gradients
           disabled.
        3. Computes AP50 using `evaluate_fn` on `coco_split_dir`.
        4. Applies any scheduled LR events through `_maybe_scheduled_step`.
        5. If plateau logic is active and AP50 has not improved for `patience`
           evaluations, reduces learning rate and starts a cooldown.
        6. Optionally runs a test evaluation on a different split and logs
           the AP50 in `test_ap_history`.
        7. Logs epoch index, train loss snapshot, AP50 and LR values in
           `history`.

        Args:
            trainer: Ultralytics trainer object that owns the optimizer and
                training state.
        """
        epoch = int(getattr(trainer, "epoch", 0))
        if epoch < self.warmup_epochs:
            return
        if self.eval_every > 1 and (epoch % self.eval_every):
            return
        # Explicit None checks: truthiness would wrongly skip an owner object
        # that happens to define __len__ / __bool__.
        if self.owner is None or self.detector_factory is None or self.evaluate_fn is None:
            return

        self._reduced_flag = False
        self._lr_after = None

        train_loss = self._get_train_loss(trainer)

        clone_det = self._build_eval_clone()
        if clone_det is None:
            return

        m50 = self._evaluate_ap50(clone_det, self.predict_kwargs or {}, self.coco_split_dir)
        if self.verbose:
            print(f"[ExtEval:det-nn-clone] epoch {epoch} mAP50(main)={m50:.6f}")

        # NOTE: scheduled events are only reached on evaluated epochs, so an
        # event placed on an epoch skipped by warmup/eval_every never fires.
        self._maybe_scheduled_step(trainer, epoch)

        if self.use_plateau:
            self._plateau_step(trainer, epoch, m50)

        if self._should_run_test(epoch):
            test_kwargs = dict(self.predict_kwargs or {})
            test_kwargs["split"] = "test"
            if self.test_predict_kwargs:
                test_kwargs.update(self.test_predict_kwargs)
            test_dir = self.test_coco_split_dir or self.coco_split_dir
            m50_test = self._evaluate_ap50(clone_det, test_kwargs, test_dir)
            self.test_ap_history.append({"epoch": epoch, "ap50": m50_test})
            if self.verbose:
                print(f"[ExtEval:TEST] epoch {epoch} mAP50(test)={m50_test:.6f}")

        cur_lrs = self._lr_after if self._lr_after is not None else self._get_lrs(trainer)
        self.history.append({
            "epoch": epoch,
            "train_loss": train_loss,
            "ap50": m50,
            "lrs": cur_lrs,
            "reduced": self._reduced_flag,
        })

        total_epochs = self._get_total_epochs(trainer)
        if total_epochs is not None and (epoch + 1) >= int(total_epochs):
            self._print_history_once()

        if cur_lrs is not None:
            self._target_lrs = list(cur_lrs)

    def _build_eval_clone(self):
        """Create a detector from `detector_factory` holding a frozen copy of the trained net.

        Returns:
            The new detector, or None when `owner.model` or the new detector's
            `model` attribute is missing.
        """
        ultra_train = getattr(self.owner, "model", None)
        if ultra_train is None:
            return None
        # `owner.model` may be a wrapper exposing the network as `.model`;
        # otherwise it is treated as the network itself.
        nn_train = getattr(ultra_train, "model", ultra_train)

        clone_det = self.detector_factory()
        ultra_clone = getattr(clone_det, "model", None)
        if ultra_clone is None:
            return None

        nn_copied = copy.deepcopy(nn_train)
        nn_copied.eval()
        if self.clone_device:
            nn_copied.to(self.clone_device)
        for p in nn_copied.parameters():
            p.requires_grad_(False)

        if hasattr(ultra_clone, "model"):
            ultra_clone.model = nn_copied
        else:
            clone_det.model = nn_copied

        # Best-effort metadata sync; an attribute that cannot be copied or
        # assigned is left as the factory created it.
        for attr in ("names", "nc", "args"):
            if hasattr(ultra_train, attr) and hasattr(ultra_clone, attr):
                try:
                    setattr(ultra_clone, attr, copy.deepcopy(getattr(ultra_train, attr)))
                except Exception:
                    pass
        return clone_det

    def _evaluate_ap50(self, clone_det, predict_kwargs: Dict[str, Any], coco_split_dir) -> float:
        """Predict with the clone under inference mode and score with `evaluate_fn`.

        A falsy result from `evaluate_fn` (for example None) is reported as 0.0.
        """
        with torch.inference_mode():
            preds = clone_det.predict_sticker(**predict_kwargs)
        return float(self.evaluate_fn(
            coco_split_dir=coco_split_dir,
            predictions=preds,
            yolo_names=self.yolo_names
        ) or 0.0)

    def _effective_patience(self) -> int:
        """Return the patience in force, honouring `patience_after_first` (including 0)."""
        if self._had_reduction and self.patience_after_first is not None:
            return self.patience_after_first
        return self.patience

    def _plateau_step(self, trainer, epoch: int, m50: float):
        """Update plateau state with the latest AP50 and reduce the LR if stalled."""
        if not self._active:
            if m50 > self.start_after_map:
                self._active = True
                self._best = m50
                self._bad = 0
                if self.verbose:
                    print(f"[Plateau] activated at epoch {epoch}: mAP50={m50:.6f}")
            return

        improved = m50 > self._best + 1e-12
        if improved:
            self._best = m50
            self._bad = 0
            if self.verbose:
                print(f"[Plateau] new best mAP50={m50:.6f} (epoch {epoch})")
        else:
            self._bad += 1

        if self._cool > 0:
            # Cooldown ticks on every evaluation and bad epochs seen while
            # cooling are forgotten, so a full `patience` window is required
            # again once the cooldown has expired.
            self._cool -= 1
            self._bad = 0
            return

        if not improved and self._bad >= self._effective_patience():
            self._reduce_lr(trainer)
            self._bad = 0
            self._cool = self.cooldown

    def _should_run_test(self, epoch: int) -> bool:
        """Tell whether a test-split evaluation is due at `epoch`."""
        if self.test_eval_every and self.test_eval_every > 0:
            start = int(self.test_eval_start_epoch or 0)
            if epoch >= start and ((epoch - start) % self.test_eval_every == 0):
                return True
        return bool(self.test_eval_epochs) and epoch in self.test_eval_epochs

    def on_train_epoch_start(self, trainer):
        """Ultralytics callback to enforce target learning rates.

        This method should be registered as `on_train_epoch_start`. It applies
        the current target learning rates stored in `_target_lrs` to the
        trainer optimizer and scheduler before the new epoch begins.
        """
        self._apply_target_lrs(trainer)

    def _apply_target_lrs(self, trainer):
        """Write `_target_lrs` into the optimizer and mirror them on trainer/scheduler fields.

        Does nothing when no target is stored or the trainer has no optimizer.
        """
        if not self._target_lrs:
            return
        opt = getattr(trainer, "optimizer", None)
        if opt is None:
            return
        for pg, v in zip(opt.param_groups, self._target_lrs):
            pg["lr"] = float(v)
            # Presumably read by warmup/scheduler code to rebuild the LR;
            # kept in sync so the old value is not restored - TODO confirm.
            if "initial_lr" in pg:
                pg["initial_lr"] = float(v)
        args = getattr(trainer, "args", None)
        # The mirrors below are best-effort: read-only or oddly typed
        # attributes are skipped rather than aborting training.
        try:
            if args is not None and hasattr(args, "lr0"):
                setattr(args, "lr0", float(min(self._target_lrs)))
        except Exception:
            pass
        try:
            if hasattr(trainer, "lr0"):
                if isinstance(trainer.lr0, (list, tuple)):
                    trainer.lr0 = list(self._target_lrs)
                else:
                    trainer.lr0 = float(min(self._target_lrs))
        except Exception:
            pass
        sch = getattr(trainer, "scheduler", None)
        if sch is not None and hasattr(sch, "base_lrs"):
            try:
                sch.base_lrs = [float(v) for v in self._target_lrs]
            except Exception:
                pass

    def _maybe_scheduled_step(self, trainer, epoch: int):
        """Apply the scheduled LR event for `epoch`, at most once per epoch.

        An absolute LR from `scheduled_set_lrs` takes precedence over a
        reduction requested via `scheduled_epochs` / `scheduled_factors`.
        """
        if epoch in self._scheduled_applied:
            return
        if epoch in self.scheduled_set_lrs:
            target = float(self.scheduled_set_lrs[epoch])
            self._set_absolute_lr(trainer, target)
            self._scheduled_applied.add(epoch)
            if self.verbose:
                print(f"[Schedule] epoch {epoch}: set LR -> {target}")
            return
        if (epoch in self.scheduled_epochs) or (epoch in self.scheduled_factors):
            fac = float(self.scheduled_factors.get(epoch, self.factor))
            self._reduce_lr(trainer, factor=fac)
            self._scheduled_applied.add(epoch)
            if self.verbose:
                print(f"[Schedule] epoch {epoch}: reduce LR by factor {fac}")

    def _set_absolute_lr(self, trainer, target_lr: float):
        """Set every param group LR to `target_lr`, clamped from below by `min_lr`."""
        opt = getattr(trainer, "optimizer", None)
        if opt is None:
            return
        new_vals = []
        for pg in opt.param_groups:
            pg["lr"] = max(float(target_lr), self.min_lr)
            new_vals.append(float(pg["lr"]))
        self._reduced_flag = True
        self._lr_after = new_vals
        self._target_lrs = list(new_vals)
        self._had_reduction = True
        self._apply_target_lrs(trainer)

    def _reduce_lr(self, trainer, factor: Optional[float] = None):
        """Multiply every param group LR by `factor` (default `self.factor`), floored at `min_lr`.

        On the first reduction `patience` is replaced by `patience_after_first`
        when the latter is set.
        """
        if not hasattr(trainer, "optimizer") or trainer.optimizer is None:
            return
        fac = float(factor if factor is not None else self.factor)
        new_vals = []
        for pg in trainer.optimizer.param_groups:
            old = float(pg.get("lr", 0.0))
            new = max(old * fac, self.min_lr)
            # Only ever lower the LR; a group already at/below the floor is left alone.
            if new < old - 1e-12:
                pg["lr"] = new
            new_vals.append(pg.get("lr", old))
        self._reduced_flag = True
        self._lr_after = new_vals
        self._target_lrs = list(new_vals)
        if not self._had_reduction:
            self._had_reduction = True
            if self.patience_after_first is not None:
                self.patience = self.patience_after_first
        self._apply_target_lrs(trainer)
        if self.verbose:
            print(f"[Plateau] LR reduced -> {new_vals}")

    def _get_lrs(self, trainer):
        """Return the current LR of every optimizer param group, or None if unavailable."""
        opt = getattr(trainer, "optimizer", None)
        if opt is None:
            return None
        try:
            return [float(pg.get("lr", 0.0)) for pg in opt.param_groups]
        except Exception:
            return None

    def _get_train_loss(self, trainer) -> Optional[float]:
        """Return a scalar train-loss snapshot from the trainer, or None.

        Tries, in order: scalar attributes `tloss` / `train_loss` / `loss`,
        the sum of `loss_items`, then loss keys in a `metrics` dict. A
        candidate that cannot be converted to float is skipped.
        """
        for attr in ("tloss", "train_loss", "loss"):
            if hasattr(trainer, attr):
                v = getattr(trainer, attr)
                try:
                    if torch.is_tensor(v):
                        return float(v.detach().cpu().item())
                    return float(v)
                except Exception:
                    # Not convertible to a scalar: fall through to the next source.
                    pass
        if hasattr(trainer, "loss_items"):
            try:
                li = trainer.loss_items
                if torch.is_tensor(li):
                    li = li.detach().cpu().tolist()
                return float(sum(map(float, li)))
            except Exception:
                pass
        m = getattr(trainer, "metrics", None)
        if isinstance(m, dict):
            for k in ("train/loss", "loss", "metrics/loss"):
                if k in m:
                    try:
                        return float(m[k])
                    except Exception:
                        pass
        return None

    def _get_total_epochs(self, trainer) -> Optional[int]:
        """Return the planned epoch count from `trainer.epochs` or `trainer.args.epochs`."""
        if hasattr(trainer, "epochs"):
            try:
                return int(getattr(trainer, "epochs"))
            except Exception:
                pass
        args = getattr(trainer, "args", None)
        if args is not None and hasattr(args, "epochs"):
            try:
                return int(getattr(args, "epochs"))
            except Exception:
                pass
        return None

    def _print_history(self):
        """Print a summary of AP50 and LR history and periodic test AP50.

        This is called automatically at the end of training from `on_fit_end`
        or `on_train_end` if it has not been printed yet.
        """
        if not self.history:
            print("[History] No eval records.")
            return
        print("\n==== Eval History (epoch | loss | AP50(main) | LRs | LR reduced?) ====")
        for r in self.history:
            loss_s = "NA"
            if r["train_loss"] is not None and math.isfinite(r["train_loss"]):
                loss_s = f"{r['train_loss']:.6f}"
            ap_s = f"{r['ap50']:.6f}"
            lr_s = "NA" if r["lrs"] is None else "[" + ", ".join(f"{x:.6g}" for x in r["lrs"]) + "]"
            red_s = "Y" if r["reduced"] else "N"
            print(f"epoch {r['epoch']:03d} | loss={loss_s} | AP50={ap_s} | LRs={lr_s} | reduced={red_s}")
        print("================================================================")

        if self.test_ap_history:
            best = max(self.test_ap_history, key=lambda d: d["ap50"])
            print("\n==== Periodic TEST AP50 (every N epochs) ====")
            for t in self.test_ap_history:
                print(f"epoch {t['epoch']:03d} | AP50(test)={t['ap50']:.6f}")
            print(f"Best TEST AP50={best['ap50']:.6f} at epoch {best['epoch']}")
            print("=============================================\n")
        else:
            print("\n[TEST] No periodic test evaluations recorded.\n")

    def _print_history_once(self):
        """Print the history summary unless it was already printed."""
        if not self._printed:
            self._print_history()
            self._printed = True

    def on_fit_end(self, trainer):
        """Ultralytics callback that prints the eval history at the end of fit."""
        self._print_history_once()

    def on_train_end(self, trainer):
        """Ultralytics callback that prints the eval history at the end of train."""
        self._print_history_once()

### SCRIPT

In [None]:
"""End-to-end example: train, run, and score the GuidedPipeline.

What this script does, in order:

1. Defines the dataset locations for YOLO (`DATA_YAML`) and COCO
   (`COCO_SPLIT`, `COCO_SPLIT_TEST`).
2. Creates two `YOLODetector` instances:
   * `windshield_model` - the guide (windshield) detector.
   * `sticker_model` - the sticker detector.
3. Wraps them in a `GuidedPipeline` where:
   * `guide` detects windshields on full images.
   * `detector` detects stickers inside cropped windshields.
4. Trains the pipeline in two stages:
   * Stage 1: train the guide on windshields.
   * Stage 2: build a crop dataset from predicted windshields and train the
     sticker detector on it.
5. Runs inference on the test split:
   * Predict windshields.
   * Crop them.
   * Predict stickers on the crops.
   * Remap sticker predictions back to original image coordinates.
6. Scores the final sticker detections with COCO metrics via `evaluate_fn`.

Data formats:

* `GuidedPipeline.predict` returns a dict that maps each original image path
  to its list of remapped sticker detections:

    {
        "<original/image.jpg>": [
            {
                "xyxy": [x1, y1, x2, y2],
                "conf": float_confidence,
                "cls": int_class_id
            },
            ...
        ],
        ...
    }

  `evaluate_fn` accepts this dict directly as `predictions`.

* `evaluate_fn` computes COCO metrics on the given COCO split directory and
  returns AP at IoU 0.50 (AP50).

How to run:

1. Point `DATA_YAML`, `COCO_SPLIT`, and `COCO_SPLIT_TEST` at your own data.
2. Make sure `YOLODetector`, `GuidedPipeline`, and `evaluate_fn` are imported
   or defined in the same module.
3. Execute the script; training, prediction, and evaluation run in sequence.
"""

# ---- dataset locations ----
DATA_YAML = "/content/drive/MyDrive/Dataset/windshield/31shot_ws/data.yaml"
COCO_SPLIT = "/content/drive/MyDrive/Dataset_COCO/windshields/31shot_COCO/train"
COCO_SPLIT_TEST = "/content/drive/MyDrive/Dataset_COCO/windshields/31shot_COCO/test"

# ---- models: guide (windshield) first, then detector (sticker) ----
# Both are initialised from the same `yolov8n.pt` weights.
windshield_model = YOLODetector(pretrained="yolov8n.pt")
sticker_model = YOLODetector(pretrained="yolov8n.pt")

# ---- guided pipeline ----
# Two-element lists below are ordered [sticker, windshield].
pipeline_settings = {
    "detector": sticker_model,
    "guide": windshield_model,
    "coco_split_dir": COCO_SPLIT_TEST,
    "conf": [0.05, 0.7],    # [sticker_conf, windshield_conf]
    "iou": [0.5, 0.7],      # [sticker_iou, windshield_iou]
    "input_size": (800, 1200),
    "seed": 30,
}
pipeline = GuidedPipeline(**pipeline_settings)

# ---- two stage training ----
#   1) Train windshield detector, crop predictions, build sticker dataset.
#   2) Train sticker detector on cropped windshield patches.
# `epochs` is [sticker_epochs, windshield_epochs]; `scheduled_epochs` can be
# used together with the ReduceOnPlateau scheduler.
pipeline.train(data_yaml=DATA_YAML, epochs=[100, 100], scheduled_epochs=[])

# ---- full two stage inference on the test split ----
#   1) Predict windshields.
#   2) Crop windshields.
#   3) Build a temporary sticker crop dataset.
#   4) Predict stickers on the crops.
#   5) Remap sticker predictions to original image coordinates.
preds = pipeline.predict(data_yaml=DATA_YAML)

# ---- COCO evaluation of the remapped sticker predictions (test split) ----
# `preds` maps original image paths to detection lists, which is the input
# format `evaluate_fn` expects.
metrics = evaluate_fn(
    coco_split_dir=COCO_SPLIT_TEST,
    predictions=preds,
    yolo_names=["car-sticker"],
    target_size=(1280, 800),
)


### OPTIONAL FUNCTIONS

In [None]:
import json, time, os, random
from pathlib import Path
from collections import defaultdict
import numpy as np
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
from typing import Optional


def evaluate_fn_tags_visualize(
    coco_split_dir,
    predictions,
    class_map=None,
    yolo_names=None,
    save_dir="./coco_eval_out",
    target_size=(1280, 800),
    tags_of_interest=("left", "right", "far", "near"),
    tags_coco_json=None,
    # visualization params (matplotlib-based)
    visualize_img_indices=None,
    save_visuals=False,
    visual_save_dir=None,
    visual_conf_thresh=0.0,
    visual_box_alpha=0.45,
    visual_text_alpha=0.35,
    vis_limit=10,
    # toggle for side recall computation
    side_recall_mode="manual",  # "manual" or "coco"
    recall_conf_thresh=None,     # used in manual mode only. None = no filter
):
    """
    Evaluate detector outputs on a COCO style split with tag based breakdowns,
    side aware metrics, and optional visualizations.

    This function extends a standard COCO evaluation with:
      * Overall AP and AR on the full split.
      * Per tag AP and AR for tags such as "left", "right", "far", "near".
      * Side aware AP for left and right halves of the windshield, using the
        largest windshield box in ground truth to define the midline.
      * Per tag side recall at IoU 0.5 for left and right halves, plus an
        overall recall that combines both sides.
      * Optional matplotlib based visualizations that highlight:
          - True positives in green.
          - False positives in orange.
          - Missed ground truth boxes in red.
          - Windshield boxes in purple.
        Visualizations can be shown inline or saved to disk.

    The function expects predictions in one of these formats:

      1) Dict indexed by image path:

         {
             "<image_path>": [
                 {"xyxy": [x1, y1, x2, y2], "conf": float, "cls": int},
                 ...
             ],
             ...
         }

      2) List of entries with explicit detection lists:

         [
             {
                 "image_path": "<path>",
                 "detections": [
                     {"xyxy": [...], "conf": float, "cls": int},
                     ...
                 ]
             },
             ...
         ]

         or with the key "boxes" instead of "detections".

    Ground truth boxes and prediction boxes are rescaled to a fixed reference
    size given by ``target_size`` before running COCOeval. Tag information is
    loaded from a separate COCO style JSON file that stores per image
    ``extra.user_tags`` or ``extra.tags`` lists.

    Args:
        coco_split_dir (str or Path):
            Directory containing the COCO split with ``_annotations.coco.json``
            and the corresponding images.
        predictions (dict or list):
            Detector predictions in one of the formats described above.
        class_map (dict, optional):
            Mapping from YOLO class indices to COCO category ids. If None, the
            function tries to infer it from ``yolo_names`` or by searching for
            a category whose name contains "sticker". If the split only has a
            single category, that category is used by default.
        yolo_names (list of str, optional):
            YOLO class names in index order. Used to construct ``class_map``
            automatically when possible.
        save_dir (str or Path, optional):
            Directory where the COCO detection JSON and optional visualizations
            will be written. Created if it does not exist.
        target_size (tuple[int, int], optional):
            Target image size as ``(width, height)`` used to rescale both
            ground truth and predictions before COCOeval.
        tags_of_interest (iterable of str, optional):
            Tag names that define the tag specific subsets, for example
            ``("left", "right", "far", "near")``. Matching is done on the
            lowercased tag strings.
        tags_coco_json (str or Path, optional):
            Path to a COCO style JSON file that contains per image
            ``extra.user_tags`` or ``extra.tags`` fields. These tags are used
            to assign each image to the tag subsets. If None, only the main
            COCO split is used and tag based subsets will be empty.
        visualize_img_indices (list[int], optional):
            If given, attempt to visualize only images whose file names start
            with any of the integer prefixes in this list (for example
            ``[149, 164, 194]``). Indices are matched to file names by prefix.
            If None, the function selects up to ``vis_limit`` images that have
            at least one true positive, false positive, or false negative.
        save_visuals (bool, optional):
            If True, save visualization figures as PNG files. If False, show
            them interactively with ``plt.show()``.
        visual_save_dir (str or Path, optional):
            Target directory for saved visualizations. If None and
            ``save_visuals`` is True, the directory
            ``save_dir / "visualizations"`` is used.
        visual_conf_thresh (float, optional):
            Minimum detection confidence required for a prediction to appear in
            the visualization panes. Has no effect on COCO metrics, only on
            what is drawn.
        visual_box_alpha (float, optional):
            Alpha value for the bounding box outlines in the visualizations.
            Used for all true positive, false positive, and false negative
            rectangles.
        visual_text_alpha (float, optional):
            Alpha value for the text background in the label boxes drawn on
            top of each rectangle.
        vis_limit (int, optional):
            Maximum number of images to visualize when
            ``visualize_img_indices`` is None.
        side_recall_mode (str, optional):
            Strategy used for per tag side recall at IoU 0.5:

              * "manual" - Uses a simple greedy matching between ground truth
                and detections based on IoU, and counts true positives and
                false negatives directly.
              * "coco"   - Builds a side specific COCO subset and uses
                COCOeval internals at IoU 0.5 to derive recall.

        recall_conf_thresh (float or None, optional):
            Confidence threshold used in the "manual" side recall mode to
            filter detections before matching. If None, all detections are
            considered.

    Returns:
        dict:
            A nested result dictionary with keys:

            ``"overall"``:
                Summary for the full split with keys:

                * ``"stats"``: dict of COCO metrics, including
                  ``"AP@[.5:.95]"``, ``"AP@0.50"``, ``"AP@0.75"``, and AR
                  values.
                * ``"n_images"``: number of images in this subset.
                * ``"n_gt"``: number of ground truth instances for the
                  evaluated classes.
                * ``"n_dt"``: number of detections for the evaluated classes.

            One entry per tag in ``tags_of_interest`` (lowercased):
                Same structure as ``"overall"``, but computed only on images
                that contain that tag.

            ``"left_side"`` and ``"right_side"``:
                Side aware COCO metrics restricted to the appropriate half of
                the windshield, if a windshield category can be identified.
                Each entry has the same structure as ``"overall"``. If no
                windshield category is found, these entries contain zero
                counts and ``stats=None``.

            ``"per_tag_side_recall"``:
                Nested dict indexed first by tag (lowercased) and then by side:

                * ``results["per_tag_side_recall"][tag]["left"]``:
                    Manual or COCO based recall at IoU 0.5 for the left half.
                * ``results["per_tag_side_recall"][tag]["right"]``:
                    Same for the right half.
                * ``results["per_tag_side_recall"][tag]["overall"]``:
                    Combined recall across both halves.

                Each side dictionary contains:

                  * ``"recall@0.5"``: recall value at IoU 0.5.
                  * ``"TP"``: number of true positives.
                  * ``"FN"``: number of false negatives.
                  * ``"n_images"``: number of images that contributed ground
                    truth or detections for that tag and side.
                  * ``"n_gt"``: total number of ground truth boxes considered.

            ``"AP50_overall"``:
                Convenience shortcut for
                ``results["overall"]["stats"]["AP@0.50"]``.

    Notes:
        * The function modifies the COCO ground truth annotations in place by
          rescaling bounding boxes and updating image sizes to match
          ``target_size``.
        * Side aware computations rely on a "windshield" category being
          present in the COCO categories. If it is missing, side based AP and
          side based recall are skipped.
        * Visualization requires matplotlib and PIL. If these imports fail,
          evaluation still runs but visualizations are skipped.
    """

    # -------------- utils --------------
    def _ensure_dir(p: Path):
        p.mkdir(parents=True, exist_ok=True)
        return p

    def _largest_bbox(anns_list):
        if not anns_list:
            return None
        best, best_area = None, -1.0
        for a in anns_list:
            x, y, w, h = a["bbox"]
            area = w * h
            if area > best_area:
                best_area, best = area, a
        return best

    def _resolve_image_path(split_dir: Path, file_name: str) -> Optional[Path]:
        p = Path(file_name)
        cand = []
        if p.is_absolute():
            cand.append(p)
        cand.append(split_dir / p)
        cand.append(split_dir / "images" / p)
        cand.append(split_dir / "images" / p.name)
        if "images" in p.parts:
            cand.append(split_dir / "images" / p.name)
        for c in cand:
            if c.exists():
                return c
        return None

    # IoU helpers for manual recall
    def _xywh_to_xyxy(box):
        x, y, w, h = box
        return (x, y, x + w, y + h)

    def _iou(a_xywh, b_xywh):
        ax1, ay1, ax2, ay2 = _xywh_to_xyxy(a_xywh)
        bx1, by1, bx2, by2 = _xywh_to_xyxy(b_xywh)
        ix1, iy1 = max(ax1, bx1), max(ay1, by1)
        ix2, iy2 = min(ax2, bx2), min(ay2, by2)
        iw, ih = max(0.0, ix2 - ix1), max(0.0, iy2 - iy1)
        inter = iw * ih
        if inter <= 0.0:
            return 0.0
        a_area = (ax2 - ax1) * (ay2 - ay1)
        b_area = (bx2 - bx1) * (by2 - by1)
        denom = a_area + b_area - inter
        return inter / denom if denom > 0 else 0.0

    def _greedy_match_tp(gt_boxes, dt_boxes, iou_thr=0.5):
        # dt_boxes is list of (bbox, score)
        if not gt_boxes:
            return 0, 0
        dt_sorted = sorted(dt_boxes, key=lambda z: float(z[1]), reverse=True)
        gt_used = [False] * len(gt_boxes)
        tp = 0
        for db, _sc in dt_sorted:
            best_iou, best_idx = 0.0, -1
            for i, gb in enumerate(gt_boxes):
                if gt_used[i]:
                    continue
                iou = _iou(gb, db)
                if iou > best_iou:
                    best_iou, best_idx = iou, i
            if best_iou >= iou_thr and best_idx >= 0:
                gt_used[best_idx] = True
                tp += 1
        fn = len(gt_boxes) - tp
        return tp, fn

    try:
        import cv2  # optional
    except Exception:
        cv2 = None

    # -------------- load GT --------------
    coco_split_dir = Path(coco_split_dir)
    ann_path = coco_split_dir / "_annotations.coco.json"
    cocoGt = COCO(str(ann_path))
    img_ids = cocoGt.getImgIds()
    imgs = cocoGt.loadImgs(img_ids)

    basename_to_imgid = {Path(im["file_name"]).name: im["id"] for im in imgs}
    cats = cocoGt.loadCats(cocoGt.getCatIds())
    name_to_catid = {c["name"]: c["id"] for c in cats}

    # meta for sub-COCOs
    info_meta = cocoGt.dataset.get("info", {})
    licenses_meta = cocoGt.dataset.get("licenses", [])

    def _build_coco_subset(images_subset, anns_subset, use_eval_cats=True):
        """Assemble an indexed, in-memory COCO object from selected records.

        Args:
            images_subset: Image records for the ``"images"`` section.
            anns_subset: Annotation records for the ``"annotations"`` section.
            use_eval_cats: When true, keep only the categories whose id is in
                ``eval_cat_ids``; otherwise keep every entry of ``cats``.

        Returns:
            A ``COCO`` instance on which ``createIndex()`` has been called.
        """
        if use_eval_cats:
            categories = [cat for cat in cats if cat["id"] in eval_cat_ids]
        else:
            categories = cats

        subset = COCO()
        subset.dataset = {
            "info": info_meta,
            "licenses": licenses_meta,
            "images": images_subset,
            "annotations": anns_subset,
            "categories": categories,
        }
        subset.createIndex()
        return subset

    # -------------- class_map --------------
    if class_map is None:
        # Preferred: map each YOLO class index to the COCO category that has
        # the same name.
        if yolo_names:
            tmp = {i: name_to_catid[nm] for i, nm in enumerate(yolo_names) if nm in name_to_catid}
            if tmp:
                class_map = tmp
        # Fallback: map class 0 onto the first category whose name contains
        # "sticker".
        if class_map is None:
            sticker_id = next((c["id"] for c in cats if "sticker" in c["name"].lower()), None)
            # Compare against None explicitly: 0 is a legitimate category id
            # and must not be mistaken for "no sticker category found".
            if sticker_id is not None:
                class_map = {0: sticker_id}
    # Last resort: a single-category dataset leaves no ambiguity.
    if class_map is None and len(cats) == 1:
        class_map = {0: cats[0]["id"]}
    if class_map is None:
        raise ValueError("[evaluate] Could not infer class_map on multi-class COCO.")
    # Category ids that take part in evaluation, de-duplicated and ordered.
    eval_cat_ids = sorted(set(class_map.values()))

    def to_cat_id(yolo_cls):
        """Translate a YOLO class index into a COCO category id via ``class_map``.

        Indices that are missing from ``class_map`` fall back to the first
        entry of ``eval_cat_ids``.
        """
        yolo_idx = int(yolo_cls)
        return class_map.get(yolo_idx, eval_cat_ids[0])

    # windshield cat (optional)
    windshield_cat_id = None
    if yolo_names and "windshield" in yolo_names and "windshield" in name_to_catid:
        windshield_cat_id = name_to_catid["windshield"]
    if windshield_cat_id is None:
        for c in cats:
            if c["name"].lower() == "windshield":
                windshield_cat_id = c["id"]
                break

    # for viz color override
    windshield_cat_ids = {c["id"] for c in cats if "windshield" in c["name"].lower()}

    target_w, target_h = target_size

    # -------------- scale GT to target (in-place) --------------
    # Keep the original sizes: the image records are overwritten with the
    # target size at the end of each iteration below.
    imgid_to_origsz = {im["id"]: (im["width"], im["height"]) for im in imgs}
    for im in imgs:
        ow, oh = im["width"], im["height"]
        sx, sy = target_w / ow, target_h / oh
        ann_ids_img = cocoGt.getAnnIds(imgIds=[im["id"]])
        for ann in cocoGt.loadAnns(ann_ids_img):
            x, y, w, h = ann["bbox"]
            ann["bbox"] = [x * sx, y * sy, w * sx, h * sy]
            # COCOeval assigns ground truth to the small/medium/large ranges
            # using ann["area"], so the area has to be expressed in the same
            # (rescaled) coordinate space as the boxes.
            if ann.get("area") is not None:
                ann["area"] = ann["area"] * sx * sy
        im["width"], im["height"] = target_w, target_h

    # -------------- normalize predictions --------------
    norm_preds = []
    if isinstance(predictions, dict):
        for k, v in predictions.items():
            norm_preds.append({"image_path": k, "detections": v})
    elif isinstance(predictions, list):
        for it in predictions:
            dets = it.get("detections", it.get("boxes", []))
            ip = it.get("orig_path") or it.get("image_path") or it.get("crop_path")
            norm_preds.append({"image_path": ip, "detections": dets})
    else:
        raise TypeError("predictions must be dict or list")

    # -------------- build detection list (scaled) --------------
    dt_list, skipped = [], 0
    for it in norm_preds:
        ipath = it["image_path"]
        if not ipath:
            continue
        base = Path(ipath).name
        img_id = basename_to_imgid.get(base)
        if img_id is None:
            skipped += 1
            continue
        ow, oh = imgid_to_origsz[img_id]
        sx, sy = target_w / ow, target_h / oh
        for d in it.get("detections", []):
            x1, y1, x2, y2 = d["xyxy"]
            x1, x2 = x1 * sx, x2 * sx
            y1, y2 = y1 * sy, y2 * sy
            w, h = max(0.0, x2 - x1), max(0.0, y2 - y1)
            if w <= 0 or h <= 0:
                continue
            dt_list.append({
                "image_id": int(img_id),
                "category_id": int(to_cat_id(d.get("cls", 0))),
                "bbox": [float(x1), float(y1), w, h],
                "score": float(d.get("conf", 0.0))
            })
    if skipped:
        print(f"[evaluate] Skipped {skipped} prediction entries not found in COCO GT by basename.")

    for cid in eval_cat_ids:
        n_gt = len(cocoGt.getAnnIds(catIds=[cid]))
        n_dt = sum(1 for d in dt_list if d["category_id"] == cid)
        cname = next((c["name"] for c in cats if c["id"] == cid), str(cid))
        print(f"[evaluate] Category '{cname}' (id={cid}): GT={n_gt}, DT={n_dt}")

    # -------------- COCO results --------------
    save_dir = Path(save_dir)
    _ensure_dir(save_dir)
    dt_json = save_dir / f"coco_dt_{int(time.time())}.json"
    dt_json.write_text(json.dumps(dt_list))
    cocoDt = cocoGt.loadRes(str(dt_json)) if len(dt_list) > 0 else COCO()

    metric_names = [
        "AP@[.5:.95]", "AP@0.50", "AP@0.75", "AP_small", "AP_medium", "AP_large",
        "AR@1", "AR@10", "AR@100", "AR_small", "AR_medium", "AR_large"
    ]

    def run_eval(subset_img_ids, label="overall", coco_gt=None, coco_dt=None):
        """Run COCO bbox evaluation on a subset of images and collect the summary.

        Args:
            subset_img_ids: Image ids to evaluate (any sized iterable of ids).
            label: Name used in the log line printed when the subset is empty.
            coco_gt: Ground-truth COCO object; ``cocoGt`` is used when omitted.
            coco_dt: Detection COCO object; ``cocoDt`` is used when omitted.

        Returns:
            A dict with ``"stats"`` (metric name -> value, or ``None`` when the
            subset is empty), ``"n_images"``, ``"n_gt"`` and ``"n_dt"``. The
            counts cover only the categories in ``eval_cat_ids``.
        """
        if not subset_img_ids:
            print(f"[evaluate:{label}] No images in subset.")
            return {"stats": None, "n_images": 0, "n_gt": 0, "n_dt": 0}
        _coco_gt = coco_gt if coco_gt is not None else cocoGt
        _coco_dt = coco_dt if coco_dt is not None else cocoDt
        E = COCOeval(_coco_gt, _coco_dt, iouType="bbox")
        E.params.imgIds = list(subset_img_ids)
        E.params.catIds = eval_cat_ids
        E.evaluate()
        E.accumulate()
        E.summarize()
        n_gt = sum(len(_coco_gt.getAnnIds(imgIds=[iid], catIds=eval_cat_ids)) for iid in subset_img_ids)

        # Hash-based membership: the subset is often a plain list, and testing
        # every detection against it would cost images x detections.
        img_id_set = set(subset_img_ids)
        cat_id_set = set(eval_cat_ids)
        n_dt = 0
        # An empty COCO() placeholder carries no annotations to count.
        if hasattr(_coco_dt, "anns") and _coco_dt.anns:
            n_dt = sum(
                1
                for ann in _coco_dt.anns.values()
                if ann.get("image_id") in img_id_set and ann.get("category_id") in cat_id_set
            )
        return {
            "stats": {k: float(E.stats[i]) for i, k in enumerate(metric_names)},
            "n_images": len(subset_img_ids),
            "n_gt": int(n_gt),
            "n_dt": int(n_dt),
        }

    # -------------- tags ingestion --------------
    tags_map = {}
    if tags_coco_json:
        tdata = json.loads(Path(tags_coco_json).read_text())
        for im in tdata.get("images", []):
            keys = set()
            fn = im.get("file_name") or ""
            en = (im.get("extra") or {}).get("name") or ""
            for s in (fn, Path(fn).name, Path(fn).stem, en, Path(en).name, Path(en).stem):
                if s:
                    keys.add(str(s))
            raw = (im.get("extra") or {}).get("user_tags") or (im.get("extra") or {}).get("tags") or []
            if not isinstance(raw, list):
                raw = [raw]
            tags = [t.strip().lower() for t in raw if isinstance(t, str) and t.strip()]
            for k in keys:
                tags_map[k] = tags

    imgid_to_tags = {}
    for im in imgs:
        keys = [
            im.get("file_name") or "",
            Path(im.get("file_name") or "").name,
            Path(im.get("file_name") or "").stem,
        ]
        tags = []
        for k in keys:
            if k in tags_map:
                tags = tags_map[k]
                break
        imgid_to_tags[im["id"]] = tags

    # -------------- overall + per-tag AP and AR --------------
    results = {}
    results["overall"] = run_eval(img_ids, "overall")
    tags_lower = [t.lower() for t in tags_of_interest]
    for t in tags_lower:
        subset = [iid for iid, tl in imgid_to_tags.items() if t in tl]
        print(f"[evaluate] Tag '{t}': {len(subset)} images.")
        results[t] = run_eval(subset, t)
    results["AP50_overall"] = results["overall"]["stats"]["AP@0.50"] if results["overall"]["stats"] else None

    # -------------- side-aware AP for left and right images --------------
    if windshield_cat_id is not None:
        imgid_to_ws = {}
        for iid in img_ids:
            ws_ids = cocoGt.getAnnIds(imgIds=[iid], catIds=[windshield_cat_id])
            if not ws_ids:
                continue
            best = _largest_bbox(cocoGt.loadAnns(ws_ids))
            if not best:
                continue
            bx, by, bw, bh = best["bbox"]
            xmid = bx + bw * 0.5
            imgid_to_ws[iid] = {"bbox": [bx, by, bw, bh], "xmid": xmid}

        left_tag_imgs = {iid for iid, tags in imgid_to_tags.items() if "left" in tags}
        right_tag_imgs = {iid for iid, tags in imgid_to_tags.items() if "right" in tags}
        left_img_ids_ws = sorted(i for i in left_tag_imgs if i in imgid_to_ws)
        right_img_ids_ws = sorted(i for i in right_tag_imgs if i in imgid_to_ws)

        def _collect_side_gt(img_id_list, side: str):
            """Gather evaluated-category GT annotations on one half of the windshield.

            An annotation is kept when the x coordinate of its box centre lies
            within the horizontal span of that image's windshield box and on
            the requested side of the windshield midline. Centres exactly on
            the midline count as ``"left"``.

            Args:
                img_id_list: Image ids to scan; each must be a key of
                    ``imgid_to_ws``.
                side: ``"left"`` or ``"right"``; any other value selects nothing.

            Returns:
                The matching annotation dicts, in scan order.
            """
            selected = []
            for image_id in img_id_list:
                windshield = imgid_to_ws[image_id]
                midline = windshield["xmid"]
                ws_left, _ws_top, ws_width, _ws_height = windshield["bbox"]
                ann_ids = cocoGt.getAnnIds(imgIds=[image_id], catIds=eval_cat_ids)
                for ann in cocoGt.loadAnns(ann_ids):
                    box_x, _box_y, box_w, _box_h = ann["bbox"]
                    center_x = box_x + box_w * 0.5
                    if not (ws_left <= center_x <= ws_left + ws_width):
                        continue
                    if side == "left":
                        on_side = center_x <= midline
                    elif side == "right":
                        on_side = center_x > midline
                    else:
                        on_side = False
                    if on_side:
                        selected.append(ann)
            return selected

        def _collect_side_dt(img_id_list, side: str):
            """Gather evaluated-category detections on one half of the windshield.

            A detection is kept when the x coordinate of its box centre lies
            within the horizontal span of that image's windshield box and on
            the requested side of the windshield midline. Centres exactly on
            the midline count as ``"left"``. Images without an entry in
            ``imgid_to_ws`` are skipped.

            Args:
                img_id_list: Image ids to scan.
                side: ``"left"`` or ``"right"``; any other value selects nothing.

            Returns:
                The matching detection dicts from ``dt_list``, grouped by image
                in ``img_id_list`` order and in ``dt_list`` order within an image.
            """
            # Group the detections once so each image only visits its own,
            # instead of rescanning all of dt_list for every image.
            dets_by_image = {}
            for det in dt_list:
                if det["category_id"] in eval_cat_ids:
                    dets_by_image.setdefault(det["image_id"], []).append(det)

            out = []
            for iid in img_id_list:
                meta = imgid_to_ws.get(iid)
                if not meta:
                    continue
                xmid = meta["xmid"]
                wx, _wy, ww, _wh = meta["bbox"]
                for d in dets_by_image.get(iid, ()):
                    x, _y, w, _h = d["bbox"]
                    cx = x + w * 0.5
                    # Ignore detections centred outside the windshield span.
                    if not (wx <= cx <= wx + ww):
                        continue
                    if (side == "left" and cx <= xmid) or (side == "right" and cx > xmid):
                        out.append(d)
            return out

        left_gt_anns = _collect_side_gt(left_img_ids_ws, "left")
        left_imgs = [im for im in imgs if im["id"] in left_img_ids_ws]
        coco_left = _build_coco_subset(left_imgs, left_gt_anns)
        left_dt = _collect_side_dt(left_img_ids_ws, "left")
        cocoDt_left = coco_left.loadRes(left_dt) if left_dt else COCO()
        results["left_side"] = run_eval(set(left_img_ids_ws), "left_side", coco_gt=coco_left, coco_dt=cocoDt_left)

        right_gt_anns = _collect_side_gt(right_img_ids_ws, "right")
        right_imgs = [im for im in imgs if im["id"] in right_img_ids_ws]
        coco_right = _build_coco_subset(right_imgs, right_gt_anns)
        right_dt = _collect_side_dt(right_img_ids_ws, "right")
        cocoDt_right = coco_right.loadRes(right_dt) if right_dt else COCO()
        results["right_side"] = run_eval(set(right_img_ids_ws), "right_side", coco_gt=coco_right, coco_dt=cocoDt_right)
    else:
        print("[evaluate][side-aware] No 'windshield' category; skipping left and right side AP.")
        results["left_side"] = {"stats": None, "n_images": 0, "n_gt": 0, "n_dt": 0}
        results["right_side"] = {"stats": None, "n_images": 0, "n_gt": 0, "n_dt": 0}

    # -------------- per-tag side-wise recall@0.5 --------------
    results["per_tag_side_recall"] = {}
    if windshield_cat_id is not None:
        # build once
        imgid_to_ws = {}
        for iid in img_ids:
            ws_ids = cocoGt.getAnnIds(imgIds=[iid], catIds=[windshield_cat_id])
            if not ws_ids:
                continue
            best = _largest_bbox(cocoGt.loadAnns(ws_ids))
            if not best:
                continue
            bx, by, bw, bh = best["bbox"]
            xmid = bx + bw * 0.5
            imgid_to_ws[iid] = {"bbox": [bx, by, bw, bh], "xmid": xmid}

        # index GT and DT per image
        gt_by_img = {
            iid: [a for a in cocoGt.loadAnns(cocoGt.getAnnIds(imgIds=[iid], catIds=eval_cat_ids))]
            for iid in img_ids
        }
        dt_by_img = defaultdict(list)
        for d in dt_list:
            if d["category_id"] in eval_cat_ids:
                dt_by_img[d["image_id"]].append(d)

        def _side_boxes_from_anns(iid, side, anns, ws_meta):
            out = []
            xmid = ws_meta["xmid"]
            wx, wy, ww, wh = ws_meta["bbox"]
            for a in anns:
                x, y, w, h = a["bbox"]
                cx = x + w * 0.5
                if not (wx <= cx <= wx + ww):
                    continue
                if (side == "left" and cx <= xmid) or (side == "right" and cx > xmid):
                    out.append(a)
            return out

        def _collect_gt_dt_for_tag_side_manual(tag_str, side):
            """Pool GT boxes and scored detections for one tag and windshield side.

            Only images that carry ``tag_str`` and have an entry in
            ``imgid_to_ws`` are considered. Detections scoring below
            ``recall_conf_thresh`` are dropped unless that threshold is ``None``.

            Args:
                tag_str: Tag to select images by.
                side: ``"left"`` or ``"right"``.

            Returns:
                ``(gt_boxes, dt_boxes, contributing_img_ids)``: the GT bboxes,
                the detections as ``(bbox, score)`` pairs, and the set of image
                ids that contributed at least one GT box or detection.
            """
            pooled_gt = []
            pooled_dt = []
            used_images = set()
            for image_id, image_tags in imgid_to_tags.items():
                if tag_str not in image_tags or image_id not in imgid_to_ws:
                    continue
                windshield = imgid_to_ws[image_id]

                # Ground truth on the requested side.
                side_gt = _side_boxes_from_anns(image_id, side, gt_by_img[image_id], windshield)
                pooled_gt.extend(ann["bbox"] for ann in side_gt)

                # Detections on the requested side, after the optional score cut.
                candidates = dt_by_img.get(image_id, [])
                if recall_conf_thresh is not None:
                    candidates = [
                        det
                        for det in candidates
                        if float(det.get("score", 0.0)) >= float(recall_conf_thresh)
                    ]
                side_dt = _side_boxes_from_anns(image_id, side, candidates, windshield)
                pooled_dt.extend((det["bbox"], float(det.get("score", 0.0))) for det in side_dt)

                if side_gt or side_dt:
                    used_images.add(image_id)
            return pooled_gt, pooled_dt, used_images

        def _collect_gt_dt_for_tag_side_coco(tag_str, side):
            """Build per-tag, per-side COCO objects for GT and detections.

            Only images that carry ``tag_str`` and have an entry in
            ``imgid_to_ws`` are considered. An image is included in the subset
            when it has at least one GT annotation or detection on ``side``.

            Args:
                tag_str: Tag to select images by.
                side: ``"left"`` or ``"right"``.

            Returns:
                ``(coco_sub, cocoDt_sub, img_ids_sub, contributing_img_ids)``:
                the GT subset, the detection subset (an empty ``COCO()`` when
                there are no detections), the included image ids as a list, and
                the same ids as a set.
            """
            # Index the image records once (first record wins for a repeated
            # id) instead of searching `imgs` linearly per matching image.
            img_by_id = {}
            for im in imgs:
                img_by_id.setdefault(im["id"], im)

            imgs_sub = []
            gt_anns = []
            dt_anns = []
            contributing_img_ids = set()
            for iid, tags in imgid_to_tags.items():
                if tag_str not in tags or iid not in imgid_to_ws:
                    continue
                ws_meta = imgid_to_ws[iid]
                gt_side_anns = _side_boxes_from_anns(iid, side, gt_by_img[iid], ws_meta)
                dt_side = _side_boxes_from_anns(iid, side, dt_by_img.get(iid, []), ws_meta)
                if gt_side_anns or dt_side:
                    imgs_sub.append(img_by_id[iid])
                    contributing_img_ids.add(iid)
                gt_anns.extend(gt_side_anns)
                dt_anns.extend(dt_side)
            coco_sub = _build_coco_subset(imgs_sub, gt_anns)
            # loadRes is skipped for an empty list; an empty COCO() stands in.
            cocoDt_sub = coco_sub.loadRes(dt_anns) if dt_anns else COCO()
            return coco_sub, cocoDt_sub, [im["id"] for im in imgs_sub], contributing_img_ids

        def _recall_at_05_coco(coco_sub, cocoDt_sub, img_ids_sub):
            if not img_ids_sub:
                return 0.0, 0, 0
            E = COCOeval(coco_sub, cocoDt_sub, iouType="bbox")
            E.params.imgIds = list(img_ids_sub)
            E.params.catIds = eval_cat_ids
            E.evaluate()
            E.accumulate()
            iou_thrs = E.params.iouThrs
            t_idx = int(np.argmin(np.abs(iou_thrs - 0.5)))
            total_gt, matched_gt = 0, 0
            for ei in (E.evalImgs or []):
                if ei is None or ei.get("category_id") not in eval_cat_ids:
                    continue
                gtIds = ei.get("gtIds", [])
                gtMatches = ei.get("gtMatches")
                gtIgnore = ei.get("gtIgnore", np.zeros(len(gtIds), dtype=bool))
                if gtMatches is None:
                    continue
                gtMatches = np.array(gtMatches)
                gtIgnore = np.array(gtIgnore)
                if gtMatches.shape[1] != len(gtIds):
                    continue
                rowg = gtMatches[t_idx]
                for k, gid in enumerate(gtIds):
                    if gtIgnore[k]:
                        continue
                    total_gt += 1
                    if rowg[k] > 0:
                        matched_gt += 1
            recall = float(matched_gt) / float(total_gt) if total_gt > 0 else 0.0
            return recall, matched_gt, total_gt

        for tag in tags_lower:
            tag_entry = results["per_tag_side_recall"].setdefault(tag, {})

            # Any mode other than "manual" (compared case-insensitively) takes
            # the COCOeval-based path.
            if side_recall_mode.lower() == "manual":
                # left
                gt_l, dt_l, ids_l = _collect_gt_dt_for_tag_side_manual(tag, "left")
                tp_l, fn_l = _greedy_match_tp(gt_l, dt_l, iou_thr=0.5)
                tot_l = tp_l + fn_l
                r_l = (tp_l / tot_l) if tot_l > 0 else 0.0
                # right
                gt_r, dt_r, ids_r = _collect_gt_dt_for_tag_side_manual(tag, "right")
                tp_r, fn_r = _greedy_match_tp(gt_r, dt_r, iou_thr=0.5)
                tot_r = tp_r + fn_r
                r_r = (tp_r / tot_r) if tot_r > 0 else 0.0
            else:
                coco_l, cocoDt_l, img_ids_l, ids_l = _collect_gt_dt_for_tag_side_coco(tag, "left")
                r_l, tp_l, tot_l = _recall_at_05_coco(coco_l, cocoDt_l, img_ids_l)
                coco_r, cocoDt_r, img_ids_r, ids_r = _collect_gt_dt_for_tag_side_coco(tag, "right")
                r_r, tp_r, tot_r = _recall_at_05_coco(coco_r, cocoDt_r, img_ids_r)
                # The COCO helper reports totals rather than misses; derive FN
                # here so it is always defined for this branch, however the
                # mode string was spelled.
                fn_l = tot_l - tp_l
                fn_r = tot_r - tp_r

            # Images that contributed to either side for this tag.
            n_img_tag_ws = len(ids_l.union(ids_r))

            # store left and right
            tag_entry["left"] = {
                "recall@0.5": r_l,
                "TP": tp_l,
                "FN": fn_l,
                "n_images": len(ids_l),
                "n_gt": tot_l,
            }
            tag_entry["right"] = {
                "recall@0.5": r_r,
                "TP": tp_r,
                "FN": fn_r,
                "n_images": len(ids_r),
                "n_gt": tot_r,
            }

            # overall across left and right
            tp_all = tp_l + tp_r
            gt_all = tot_l + tot_r
            fn_all = gt_all - tp_all
            r_all = (tp_all / gt_all) if gt_all > 0 else 0.0
            tag_entry["overall"] = {
                "recall@0.5": r_all,
                "TP": tp_all,
                "FN": fn_all,
                "n_images": n_img_tag_ws,
                "n_gt": gt_all,
            }
    else:
        print("[evaluate][per-tag side recall] No 'windshield' category; skipping.")

    # -------------- summary print --------------
    print("\n=== EVAL SUMMARY ===")

    def _line(lbl, d):
        if not d["stats"]:
            return f"{lbl:>12}: n_images={d['n_images']} | n_gt={d['n_gt']} | n_dt={d['n_dt']} (no stats)"
        return (
            f"{lbl:>12}: n_images={d['n_images']} | n_gt={d['n_gt']} | n_dt={d['n_dt']} | "
            f"AP50={d['stats']['AP@0.50']:.4f} | AP={d['stats']['AP@[.5:.95]']:.4f}"
        )

    print(_line("overall", results["overall"]))
    for t in tags_lower:
        print(_line(t, results[t]))
    if "left_side" in results:
        print(_line("left_side", results["left_side"]))
    if "right_side" in results:
        print(_line("right_side", results["right_side"]))
    if "per_tag_side_recall" in results:
        print("\n=== PER-TAG SIDE RECALL @0.5 ===")
        for tag, sides in results["per_tag_side_recall"].items():
            l = sides.get("left", {"recall@0.5": 0.0, "n_gt": 0, "n_images": 0, "TP": 0})
            r = sides.get("right", {"recall@0.5": 0.0, "n_gt": 0, "n_images": 0, "TP": 0})
            o = sides.get("overall", {"recall@0.5": 0.0, "n_gt": 0, "n_images": 0, "TP": 0})
            print(
                f"{tag:>8} | left:  R={l['recall@0.5']:.4f} (TP={l['TP']}/GT={l['n_gt']}, imgs={l['n_images']})"
                f"   right: R={r['recall@0.5']:.4f} (TP={r['TP']}/GT={r['n_gt']}, imgs={r['n_images']})"
                f"   overall: R={o['recall@0.5']:.4f} (TP={o['TP']}/GT={o['n_gt']}, imgs={o['n_images']})"
            )

    # -------------- visualization (matplotlib) --------------
    try:
        import matplotlib.pyplot as plt
        import matplotlib.patches as patches
        from PIL import Image

        Eall = COCOeval(cocoGt, cocoDt, iouType="bbox")
        Eall.params.imgIds = list(img_ids)
        Eall.params.catIds = eval_cat_ids
        Eall.evaluate()
        Eall.accumulate()

        iou_thrs = Eall.params.iouThrs
        t_idx = int(np.argmin(np.abs(iou_thrs - 0.5)))

        tp_per_img = defaultdict(list)  # (bbox, cat_id, score)
        fp_per_img = defaultdict(list)  # (bbox, cat_id, score)
        fn_per_img = defaultdict(list)  # (bbox, cat_id)

        dt_anns = {
            a["id"]: a
            for a in (
                cocoDt.loadAnns(cocoDt.getAnnIds())
                if hasattr(cocoDt, "dataset") and cocoDt.dataset
                else []
            )
        }
        gt_anns = {a["id"]: a for a in cocoGt.loadAnns(cocoGt.getAnnIds())}

        for ev in (Eall.evalImgs or []):
            if ev is None:
                continue
            img_id = ev["image_id"]
            if ev.get("category_id") not in eval_cat_ids:
                continue

            dtIds = ev.get("dtIds", [])
            gtIds = ev.get("gtIds", [])
            if len(dtIds) == 0 and len(gtIds) == 0:
                continue

            dtMatches = np.array(ev.get("dtMatches", []))
            gtMatches = np.array(ev.get("gtMatches", []))
            dtIgnore = np.array(ev.get("dtIgnore", np.zeros(len(dtIds), dtype=bool)))
            gtIgnore = np.array(ev.get("gtIgnore", np.zeros(len(gtIds), dtype=bool)))

            # detections
            if dtIds is not None:
                is_2d = dtMatches.ndim == 2 and dtMatches.shape[0] == len(iou_thrs)
                for j, dt_id in enumerate(dtIds):
                    ann = dt_anns.get(dt_id)
                    if not ann:
                        continue
                    score = float(ann.get("score", 0.0))
                    if score < visual_conf_thresh:
                        continue
                    if is_2d:
                        matched_gt_id = int(dtMatches[t_idx, j]) if j < dtMatches.shape[1] else 0
                        is_ignored = (
                            bool(dtIgnore[t_idx, j])
                            if dtIgnore.ndim == 2
                            else bool(dtIgnore[j])
                            if dtIgnore.size > j
                            else False
                        )
                    else:
                        matched_gt_id = int(dtMatches[j]) if dtMatches.size > j else 0
                        is_ignored = bool(dtIgnore[j]) if dtIgnore.size > j else False
                    if is_ignored:
                        continue
                    bbox = ann["bbox"]
                    cat_id = ann["category_id"]
                    if matched_gt_id > 0:
                        tp_per_img[img_id].append((bbox, cat_id, score))
                    else:
                        fp_per_img[img_id].append((bbox, cat_id, score))

            # ground truths
            if gtIds is not None:
                is_2d = gtMatches.ndim == 2 and gtMatches.shape[0] == len(iou_thrs)
                for j, gt_id in enumerate(gtIds):
                    ann = gt_anns.get(gt_id)
                    if not ann:
                        continue
                    if is_2d:
                        matched_dt_id = int(gtMatches[t_idx, j]) if j < gtMatches.shape[1] else 0
                        is_ignored = (
                            bool(gtIgnore[t_idx, j])
                            if gtIgnore.ndim == 2
                            else bool(gtIgnore[j])
                            if gtIgnore.size > j
                            else False
                        )
                    else:
                        matched_dt_id = int(gtMatches[j]) if gtMatches.size > j else 0
                        is_ignored = bool(gtIgnore[j]) if gtIgnore.size > j else False
                    if is_ignored:
                        continue
                    if matched_dt_id == 0:
                        bbox = ann["bbox"]
                        cat_id = ann["category_id"]
                        fn_per_img[img_id].append((bbox, cat_id))

        # choose images
        if visualize_img_indices is not None:
            candidate_imgs = []
            for idx in visualize_img_indices:
                idx_str = str(idx)
                prefix1 = f"{idx_str}_jpg"
                prefix2 = f"{idx_str}_"
                matched_img_id = None
                for base, _img_id in basename_to_imgid.items():
                    if base.startswith(prefix1) or base.startswith(prefix2):
                        matched_img_id = _img_id
                        break
                if matched_img_id is not None and matched_img_id not in candidate_imgs:
                    candidate_imgs.append(matched_img_id)

            # keep only ones that have any TP, FP, or FN at the applied thresholds
            candidate_imgs = [
                img_id
                for img_id in candidate_imgs
                if tp_per_img[img_id] or fp_per_img[img_id] or fn_per_img[img_id]
            ]
            if not candidate_imgs:
                print(f"[viz] No images matched visualize_img_indices={visualize_img_indices} after thresholding.")
        else:
            # take first N in dataset order that have any TP, FP, or FN
            candidate_imgs = []
            for img_id in img_ids:
                if tp_per_img[img_id] or fp_per_img[img_id] or fn_per_img[img_id]:
                    candidate_imgs.append(img_id)
                if len(candidate_imgs) == vis_limit:
                    break

        if not candidate_imgs:
            print("[viz] No images to visualize.")
        else:
            if save_visuals:
                vis_dir = Path(visual_save_dir) if visual_save_dir else (save_dir / "visualizations")
                _ensure_dir(vis_dir)

            def get_color(base_color, cat_id):
                """Return ``"purple"`` for windshield categories, else ``base_color``."""
                if cat_id in windshield_cat_ids:
                    return "purple"
                return base_color

            def draw_label(ax, x, y, text):
                """Draw a small white-on-black caption on ``ax``.

                The caption's bottom-left corner is anchored at
                ``(x, max(y - 2, 0))``; text and background both use
                ``visual_text_alpha``.
                """
                # Clamp at zero so the anchor never goes above the top edge.
                anchor_y = max(y - 2, 0)
                backdrop = {
                    "facecolor": "black",
                    "edgecolor": "none",
                    "pad": 1.0,
                    "alpha": visual_text_alpha,
                }
                ax.text(
                    x,
                    anchor_y,
                    text,
                    fontsize=6,
                    color="white",
                    alpha=visual_text_alpha,
                    ha="left",
                    va="bottom",
                    bbox=backdrop,
                )

            for img_id in candidate_imgs:
                img_info = cocoGt.loadImgs([img_id])[0]
                img_path = _resolve_image_path(coco_split_dir, img_info["file_name"])
                if img_path is None:
                    print(f"[viz] Missing image file for {img_info['file_name']} under {coco_split_dir}, skipping.")
                    continue
                try:
                    from PIL import Image
                    img = Image.open(img_path).convert("RGB")
                except Exception as e:
                    print(f"[viz] Could not load image for visualization: {img_path} ({e})")
                    continue
                img_resized = img.resize((target_w, target_h))

                fig = plt.figure(figsize=(8, 6))
                plt.imshow(img_resized)
                ax = plt.gca()
                ax.set_title(f"image_id={img_id}")
                ax.axis("off")

                # TP
                for (bbox, cat_id, score) in tp_per_img.get(img_id, []):
                    x, y, w, h = bbox
                    rect = patches.Rectangle(
                        (x, y),
                        w,
                        h,
                        linewidth=1,
                        edgecolor=get_color("green", cat_id),
                        facecolor="none",
                        alpha=visual_box_alpha,
                    )
                    ax.add_patch(rect)
                    draw_label(ax, x, y, f"{score:.2f}")

                # FP
                for (bbox, cat_id, score) in fp_per_img.get(img_id, []):
                    x, y, w, h = bbox
                    rect = patches.Rectangle(
                        (x, y),
                        w,
                        h,
                        linewidth=1,
                        edgecolor=get_color("orange", cat_id),
                        facecolor="none",
                        alpha=visual_box_alpha,
                    )
                    ax.add_patch(rect)
                    draw_label(ax, x, y, f"{score:.2f}")

                # FN
                for (bbox, cat_id) in fn_per_img.get(img_id, []):
                    x, y, w, h = bbox
                    rect = patches.Rectangle(
                        (x, y),
                        w,
                        h,
                        linewidth=1,
                        edgecolor=get_color("red", cat_id),
                        facecolor="none",
                        alpha=visual_box_alpha,
                    )
                    ax.add_patch(rect)
                    draw_label(ax, x, y, "miss")

                if save_visuals:
                    out_name = f"vis_{img_id}.png"
                    out_path = (
                        Path(visual_save_dir)
                        if visual_save_dir
                        else (save_dir / "visualizations")
                    ) / out_name
                    fig.savefig(out_path, bbox_inches="tight", dpi=150)
                    plt.close(fig)
                    print(f"[viz] Wrote {out_path}")
                else:
                    plt.show()

    except ImportError as e:
        print(f"[viz] Visualization skipped (missing dependency): {e}")

    return results

