### App Evaluation Results Analysis

This notebook contains the code for evaluating app results, including preprocessing, metrics calculation, and visualization.

#### Step 1: Preprocess Data

This step involves:
1. Converting detection files to COCO format
2. Aligning IDs between detection and annotation COCO files

#### Step 2: Calculating metrics

#### Step 3: Plotting results

#### Step 4: Creating HTML report

In [None]:
import os
import json
import random
import base64
import cv2

from collections import defaultdict
from typing import Dict, List, Tuple, Set, Optional, Any
from pathlib import Path
from sklearn.cluster import DBSCAN

import numpy as np
import matplotlib.pyplot as plt


### Step 1: Preprocess Data

In [None]:
# Input and output locations used by the preprocessing steps.
detections_dir = "data/input/raw_detections"                        # Folder with the detection JSON files and their same-named .jpg images.
annotations_file = "data/input/raw_annotations/annotations.json"    # Raw Azure COCO annotations file (ground truth).
processed_detections_file = "data/output/processed_detections.json"
processed_annotations_file = "data/output/processed_annotations.json"

# Constants
# Maps the raw `object_class` value found in a detection file to the COCO
# category id used in DEFAULT_CATEGORIES (raw ids are 0-based, COCO ids 1-based).
DEFAULT_CATEGORY_MAPPING = {
    0: 1,  # 0 becomes person (1)
    1: 2,  # 1 becomes license plate (2)
    2: 3,  # 2 becomes container (3)
    3: 4,  # 3 becomes mobile toilet (4)
    4: 5   # 4 becomes scaffolding (5)
}
# COCO "categories" section written into the converted detections file.
DEFAULT_CATEGORIES = [
    {"id": 1, "name": "person"},
    {"id": 2, "name": "license plate"},
    {"id": 3, "name": "container"},
    {"id": 4, "name": "mobile toilet"},
    {"id": 5, "name": "scaffolding"}
]

# Image size (pixels) recorded for every converted image entry; the actual
# image files are not opened to read their dimensions.
DEFAULT_WIDTH = 1280
DEFAULT_HEIGHT = 720

In [None]:
# Preprocess helpers

def create_image_entry(image_id: int, width: int, height: int, 
                       detection_data: Dict) -> Dict:
    """
    Build the COCO ``images`` record for one detection file.

    Args:
        image_id (int): Identifier to store under 'id'.
        width (int): Image width in pixels.
        height (int): Image height in pixels.
        detection_data (Dict): Parsed detection file. The following keys are
            read and must be present (a missing key raises KeyError):
            - image_file_name
            - image_file_timestamp
            - gps_data, itself a dict with latitude, longitude, accuracy
              and coordinate_time_stamp
            - record_timestamp

    Returns:
        Dict: Image entry with the keys 'id', 'width', 'height', 'file_name',
            'date_captured', 'gps_data' (a fresh dict holding the four GPS
            fields) and 'record_timestamp'.
    """
    gps_fields = ("latitude", "longitude", "accuracy", "coordinate_time_stamp")

    entry = {"id": image_id, "width": width, "height": height}
    entry["file_name"] = detection_data['image_file_name']
    entry["date_captured"] = detection_data['image_file_timestamp']

    # Copy only the known GPS fields so unrelated keys do not leak through.
    source_gps = detection_data['gps_data']
    entry["gps_data"] = {field: source_gps[field] for field in gps_fields}

    entry["record_timestamp"] = detection_data['record_timestamp']
    return entry


def convert_prediction_box(box: Dict) -> List[float]:
    """
    Turn a predicted bounding box into a COCO-style [x, y, width, height] list.

    The x value, width and height are passed through unchanged; only the y
    value is flipped: ``y = 1 - (y_center + height)``.

    NOTE(review): despite the key names, 'x_center' is used directly as the
    box x and 'y_center' + height is treated as the far edge before flipping.
    Presumably the source reports a corner in a bottom-left-origin normalized
    coordinate system rather than a true center - confirm against the app.

    Args:
        box (Dict): Detection dictionary with a 'boundingBox' entry holding
            'x_center', 'y_center', 'width' and 'height'.

    Returns:
        List[float]: [x, y, width, height] with the vertical axis inverted.
    """
    bounding_box = box['boundingBox']
    x_value, y_value, box_width, box_height = (
        bounding_box[key] for key in ('x_center', 'y_center', 'width', 'height')
    )

    # Flip the vertical axis: the input measures y from the opposite edge.
    flipped_y = 1 - (y_value + box_height)

    return [x_value, flipped_y, box_width, box_height]


def group_annotations_by_image(annotations: Dict) -> Dict[int, List[Dict]]:
    """
    Bucket the annotations of a COCO dictionary by their image ID.

    Args:
        annotations (Dict): COCO-style dictionary whose 'annotations' entry is
            a list of annotation dicts, each carrying an 'image_id'.

    Returns:
        Dict[int, List[Dict]]: Plain dict mapping every image_id that occurs
            to the annotations of that image, in their original order.
    """
    grouped: Dict[int, List[Dict]] = {}
    for annotation in annotations['annotations']:
        grouped.setdefault(annotation['image_id'], []).append(annotation)
    return grouped


def add_gps_to_annotations(annotations: List[Dict], images: List[Dict]) -> List[Dict]:
    """
    Attach the GPS data of the owning image to every annotation.

    Args:
        annotations (List[Dict]): Annotation dicts, each with an 'image_id'.
        images (List[Dict]): Image entries with an 'id' and, optionally,
            a 'gps_data' value.

    Returns:
        List[Dict]: The very same list object; each annotation is mutated in
            place and gains a 'gps_data' key holding the image's GPS data, or
            None when the image is unknown or carries no GPS data.
    """
    # Lookup table: image id -> gps data (None when the image has none).
    gps_lookup = {}
    for image in images:
        gps_lookup[image['id']] = image.get('gps_data')

    for annotation in annotations:
        annotation['gps_data'] = gps_lookup.get(annotation['image_id'])

    return annotations


def get_base_filename(filename: str) -> str:
    """
    Extract the base filename from a path or URL, stripping query parameters.

    The query string is removed *before* the directory part is stripped, so a
    '/' inside the query (for example in a signed-URL signature) cannot be
    mistaken for a path separator.

    Args:
        filename (str): Full file path or URL potentially containing a query string.

    Returns:
        str: The base filename without any directory path or query suffix.
    """
    # Everything from the first '?' onwards is the query string.
    path_part = filename.split('?', 1)[0]
    return os.path.basename(path_part)


def create_filename_to_id_mapping(coco_data: Dict) -> Dict[str, int]:
    """
    Index the images of a COCO dictionary by their base filename.

    Args:
        coco_data (Dict): COCO dictionary; its optional 'images' list holds
            entries with 'file_name' and 'id'.

    Returns:
        Dict[str, int]: Base filename (as produced by ``get_base_filename``)
            mapped to the image ID. When several images share a base
            filename, the last one wins. Empty when there is no 'images' key.
    """
    filename_to_id: Dict[str, int] = {}
    for image in coco_data.get('images', []):
        filename_to_id[get_base_filename(image['file_name'])] = image['id']
    return filename_to_id

In [None]:
def convert_detections_to_coco(detections_dir: str, output_file: str) -> Dict:
    """
    Convert detection JSON files in a directory to COCO format and save to a file.

    Every ``*.json`` file in ``detections_dir`` (processed in sorted order)
    that has a same-named ``.jpg`` next to it becomes one image entry. Image
    IDs and annotation IDs are assigned sequentially starting at 1. All
    images are recorded with DEFAULT_WIDTH x DEFAULT_HEIGHT; the image files
    themselves are not opened.

    A file that cannot be read or converted is reported and skipped as a
    whole: neither its image entry nor any of its annotations are kept, so
    IDs stay unique and no annotation points at a half-processed image.
    Detections with a category not in DEFAULT_CATEGORY_MAPPING are reported
    and skipped individually.

    Args:
        detections_dir (str): Directory containing detection `.json` files.
        output_file (str): Path to the output file where COCO-formatted JSON will be saved.

    Returns:
        Dict: COCO-formatted dataset including 'images', 'annotations', and 'categories' lists.
    """
    print("Converting detections to COCO format...")

    # Path.mkdir also works for a bare filename (parent is "."), whereas
    # os.makedirs("") would raise FileNotFoundError.
    Path(output_file).parent.mkdir(parents=True, exist_ok=True)

    coco_data = {
        "images": [],
        "annotations": [],
        "categories": DEFAULT_CATEGORIES
    }

    image_id = 1
    annotation_id = 1

    for json_file in sorted(Path(detections_dir).glob('*.json')):
        try:
            with open(json_file, 'r') as f:
                detection_data = json.load(f)

            image_path = json_file.with_suffix('.jpg')
            if not image_path.exists():
                print(f"Warning: Image file not found for {json_file}")
                continue

            image_entry = create_image_entry(image_id, DEFAULT_WIDTH, DEFAULT_HEIGHT, detection_data)

            # Collect this file's annotations locally; they are only committed
            # once the whole file converted without error.
            image_annotations = []
            for detection in detection_data.get('detections', []):
                bbox = convert_prediction_box(detection)
                raw_category_id = detection['object_class']

                if raw_category_id not in DEFAULT_CATEGORY_MAPPING:
                    print(f"Warning: Unknown category ID {raw_category_id}")
                    continue

                image_annotations.append({
                    "id": annotation_id + len(image_annotations),
                    "image_id": image_id,
                    "category_id": DEFAULT_CATEGORY_MAPPING[raw_category_id],
                    "bbox": bbox
                })

        except Exception as e:
            # Deliberate best-effort: one malformed file must not abort the batch.
            print(f"Error processing {json_file}: {str(e)}")
            continue

        # Commit the fully converted file and advance the ID counters.
        coco_data["images"].append(image_entry)
        coco_data["annotations"].extend(image_annotations)
        annotation_id += len(image_annotations)
        image_id += 1

    # Save COCO format file
    with open(output_file, 'w') as f:
        json.dump(coco_data, f, indent=2)

    print(f"Processed {len(coco_data['images'])} images")
    print(f"Total detections: {len(coco_data['annotations'])}")

    return coco_data

In [None]:
# Convert the raw detection files in detections_dir and write the COCO file to processed_detections_file.
convert_detections_to_coco(detections_dir, processed_detections_file)

In [None]:
def align_coco_ids(detections_coco: str, annotations_coco: str, 
                    output_file: str) -> Tuple[Dict, Dict]:
    """
    Align image and annotation IDs between two COCO-format JSON files based on matching filenames.

    Annotation images whose base filename also occurs in the detections file
    take over the detection image's ID, GPS data and record timestamp, and
    their 'file_name' is reduced to the base filename. Annotation images
    without a matching detection image are kept, but are moved to fresh IDs
    above the highest detection image ID so they can never collide with an
    aligned ID. All annotations are then renumbered 1..N, ordered by their
    new image ID and previous annotation ID, so annotation IDs are unique.

    Args:
        detections_coco (str): Path to the COCO-format file containing detection data.
        annotations_coco (str): Path to the COCO-format file containing ground truth annotations.
        output_file (str): Path to the output file where the updated annotations will be saved.

    Returns:
        Tuple[Dict, Dict]: A tuple containing the original detections dictionary and the updated
                           annotations dictionary with aligned IDs and merged metadata.
    """
    print("Aligning IDs between COCO files...")

    # Path.mkdir also works for a bare filename (parent is "."), whereas
    # os.makedirs("") would raise FileNotFoundError.
    Path(output_file).parent.mkdir(parents=True, exist_ok=True)

    with open(detections_coco, 'r') as f:
        detections = json.load(f)
    with open(annotations_coco, 'r') as f:
        annotations = json.load(f)

    detections_filename_to_id = create_filename_to_id_mapping(detections)

    # Metadata to copy onto matched annotation images; both fields are
    # treated as optional and default to None.
    detections_data = {
        img['id']: {
            'gps_data': img.get('gps_data'),
            'record_timestamp': img.get('record_timestamp')
        }
        for img in detections.get('images', [])
    }

    # IDs handed to images that have no counterpart in the detections file.
    next_free_image_id = max(detections_data, default=0) + 1

    # Re-identify every annotation image, remembering old ID -> new ID.
    image_id_remap = {}
    matched_images = 0
    for img in annotations['images']:
        old_image_id = img['id']
        base_filename = get_base_filename(img['file_name'])
        new_image_id = detections_filename_to_id.get(base_filename)

        if new_image_id is None:
            new_image_id = next_free_image_id
            next_free_image_id += 1
        else:
            matched_images += 1
            img['gps_data'] = detections_data[new_image_id]['gps_data']
            img['record_timestamp'] = detections_data[new_image_id]['record_timestamp']
            img['file_name'] = base_filename

        img['id'] = new_image_id
        image_id_remap[old_image_id] = new_image_id

    # Move every annotation to the new ID of its image. Annotations that
    # reference an image missing from the 'images' list get a fresh ID too.
    for ann in annotations['annotations']:
        old_image_id = ann['image_id']
        if old_image_id not in image_id_remap:
            image_id_remap[old_image_id] = next_free_image_id
            next_free_image_id += 1
        ann['image_id'] = image_id_remap[old_image_id]

    # Renumber annotations so their IDs are unique and follow image order.
    ordered_annotations = sorted(annotations['annotations'],
                                 key=lambda ann: (ann['image_id'], ann['id']))
    for new_annotation_id, ann in enumerate(ordered_annotations, start=1):
        ann['id'] = new_annotation_id

    # Sort and save
    annotations['images'] = sorted(annotations['images'], key=lambda x: x['id'])
    annotations['annotations'] = ordered_annotations

    with open(output_file, 'w') as f:
        json.dump(annotations, f, indent=2)

    print(f"Matched {matched_images} images")
    print(f"Total annotations: {len(annotations['annotations'])}")

    return detections, annotations

In [None]:
# Align the ground-truth annotation IDs with the converted detections and write the result to processed_annotations_file.
align_coco_ids(processed_detections_file, annotations_file, processed_annotations_file)


### Step 2: Calculate metrics

In [None]:
# Generic helper functions

def boxes_overlap(box1: List[float], box2: List[float]) -> bool: 
    """Tell whether two boxes share a region of positive area.

    Boxes that merely touch along an edge or at a corner do not overlap.

    Args:
        box1 (List[float]): First box as [x, y, width, height]
        box2 (List[float]): Second box as [x, y, width, height]

    Returns:
        bool: True when the intersection area is greater than zero
    """
    # Edges of the intersection rectangle (may be inverted when disjoint).
    left = max(box1[0], box2[0])
    top = max(box1[1], box2[1])
    right = min(box1[0] + box1[2], box2[0] + box2[2])
    bottom = min(box1[1] + box1[3], box2[1] + box2[3])

    # Clamp negative extents to zero so disjoint boxes yield zero area.
    overlap_width = max(0, right - left)
    overlap_height = max(0, bottom - top)
    return overlap_width * overlap_height > 0

def filter_detections_to_annotated_images(annotations: List[Dict], 
                                          detections: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
    """Drop detections that belong to images without any ground truth annotation.

    Args:
        annotations (List[Dict]): Ground truth annotations
        detections (List[Dict]): Predicted detections

    Returns:
        Tuple[List[Dict], List[Dict]]: A tuple containing:
            - annotations: The list that was passed in, untouched
            - filtered_detections: New list with only the detections whose
              image_id occurs in the annotations
    """
    images_with_ground_truth = {annotation['image_id'] for annotation in annotations}

    kept_detections = []
    for detection in detections:
        if detection['image_id'] in images_with_ground_truth:
            kept_detections.append(detection)

    return annotations, kept_detections

def group_by_image(annotations: List[Dict], 
                   detections: List[Dict]) -> Tuple[Dict[int, List[Dict]], Dict[int, List[Dict]], Set[int]]:
    """Bucket annotations and detections per image.

    Detections of images that have no ground truth annotation are ignored.

    Args:
        annotations (List[Dict]): Ground truth annotations
        detections (List[Dict]): Predicted detections

    Returns:
        Tuple[Dict[int, List[Dict]], Dict[int, List[Dict]], Set[int]]: A tuple containing:
            - ground_truth_by_image: defaultdict(list) of image_id -> annotations
            - predictions_by_image: defaultdict(list) of image_id -> detections
              (looking up an image without detections yields an empty list)
            - annotated_image_ids: Set of image_ids that have ground truth annotations
    """
    gt_buckets = defaultdict(list)
    for gt in annotations:
        gt_buckets[gt['image_id']].append(gt)

    ids_with_ground_truth = set(gt_buckets)

    pred_buckets = defaultdict(list)
    for det in detections:
        image_id = det['image_id']
        if image_id not in ids_with_ground_truth:
            continue
        pred_buckets[image_id].append(det)

    return gt_buckets, pred_buckets, ids_with_ground_truth

def get_classes_in_image(image_objects: List[Dict]) -> Set[int]:
    """Collect the distinct category IDs among the objects of one image.

    Args:
        image_objects (List[Dict]): Annotations or detections, each with a
            'category_id'

    Returns:
        Set[int]: The category_ids that occur at least once
    """
    class_ids: Set[int] = set()
    for image_object in image_objects:
        class_ids.add(image_object['category_id'])
    return class_ids

def filter_by_bbox_size(annotations: List[Dict], detections: List[Dict], 
                        min_relative_size: float = 0.001) -> Tuple[List[Dict], List[Dict]]:
    """Keep only the annotations and detections whose box area reaches a minimum.

    The area is computed as ``bbox[2] * bbox[3]`` (width * height of an
    [x, y, width, height] box) and compared directly against
    ``min_relative_size``; boxes exactly at the threshold are kept.

    Args:
        annotations (List[Dict]): Ground truth annotations
        detections (List[Dict]): Predicted detections
        min_relative_size (float): Minimum box area to keep. Default is 0.001
            (0.1% of the image when boxes are expressed as image fractions).

    Returns:
        Tuple[List[Dict], List[Dict]]: A tuple containing:
            - filtered_annotations: Annotations at or above the size threshold
            - filtered_detections: Detections at or above the size threshold
    """
    def keep_large_boxes(items: List[Dict]) -> List[Dict]:
        """Return the items whose bbox area is at least min_relative_size."""
        kept = []
        for item in items:
            box = item['bbox']
            if box[2] * box[3] >= min_relative_size:
                kept.append(item)
        return kept

    return keep_large_boxes(annotations), keep_large_boxes(detections)

def cluster_images_by_gps_and_select_per_class(annotations: List[Dict], 
                                               detections: List[Dict], 
                                               eps_meters: float = 20, 
                                               min_samples: int = 1) -> Tuple[List[Dict], List[Dict]]:
    """Thin out near-duplicate images: one image per GPS cluster per class.

    Image locations are taken from the first annotation of each image that
    carries 'gps_data' with both 'latitude' and 'longitude'. The locations are
    clustered with DBSCAN using the haversine metric. Within each cluster,
    and for each detected class, the image holding the most detections of
    that class is selected (the first such image on a tie). Annotations and
    detections are then restricted to the selected images.

    When no annotation carries usable GPS data, both input lists are
    returned unchanged.

    Args:
        annotations (List[Dict]): Ground truth annotations (with gps_data)
        detections (List[Dict]): Predicted detections
        eps_meters (float): DBSCAN neighbourhood radius in meters. Default is 20.
        min_samples (int): DBSCAN min_samples. Default is 1.

    Returns:
        Tuple[List[Dict], List[Dict]]: A tuple containing:
            - filtered_annotations: Annotations from selected images
            - filtered_detections: Detections from selected images
    """
    # One (latitude, longitude) pair per image, taken from its annotations.
    location_by_image = {}
    for annotation in annotations:
        image_id = annotation['image_id']
        if image_id in location_by_image:
            continue
        gps = annotation.get('gps_data')
        if gps and 'latitude' in gps and 'longitude' in gps:
            location_by_image[image_id] = (gps['latitude'], gps['longitude'])

    located_image_ids = list(location_by_image.keys())
    if not located_image_ids:
        return annotations, detections

    # The haversine metric works on radians; eps is the radius expressed as
    # an angle (meters divided by the mean Earth radius in meters).
    locations_rad = np.radians(
        np.array([location_by_image[image_id] for image_id in located_image_ids])
    )
    clusterer = DBSCAN(eps=eps_meters/6371008.8, min_samples=min_samples, metric='haversine')
    cluster_labels = clusterer.fit_predict(locations_rad)
    cluster_of_image = dict(zip(located_image_ids, cluster_labels))

    # Per cluster and class: candidate images, plus per-image class counts.
    candidates = defaultdict(lambda: defaultdict(list))
    detections_per_image_class = defaultdict(lambda: defaultdict(int))
    for detection in detections:
        image_id = detection['image_id']
        class_id = detection['category_id']
        if image_id not in cluster_of_image:
            continue
        candidates[cluster_of_image[image_id]][class_id].append(image_id)
        detections_per_image_class[image_id][class_id] += 1

    # Pick the image with the most detections of the class in each cluster.
    chosen_image_ids = set()
    for images_by_class in candidates.values():
        for class_id, candidate_ids in images_by_class.items():
            winner = max(
                candidate_ids,
                key=lambda candidate: detections_per_image_class[candidate][class_id],
            )
            chosen_image_ids.add(winner)

    kept_annotations = [a for a in annotations if a['image_id'] in chosen_image_ids]
    kept_detections = [d for d in detections if d['image_id'] in chosen_image_ids]
    return kept_annotations, kept_detections

In [None]:
# Metric calculation helpers

def calculate_class_metrics(metrics: Dict[str, int], class_id: int) -> Dict[str, float]:
    """Turn raw tp/fp/fn counts of one class into precision and recall.

    Classes 1 and 2 are special: only recall is reported for them, no
    precision. A ratio whose denominator is zero is reported as 0.

    Args:
        metrics (Dict[str, int]): Counts under the keys:
            - tp: True positives
            - fp: False positives
            - fn: False negatives
        class_id (int): ID of the class the counts belong to

    Returns:
        Dict[str, float]: Dictionary containing:
            - For classes 1 and 2: recall, true_positives, false_positives, false_negatives
            - For other classes: precision, recall, true_positives, false_positives, false_negatives
    """
    tp = metrics['tp']
    fp = metrics['fp']
    fn = metrics['fn']

    def safe_ratio(numerator, denominator):
        """Divide, falling back to 0 when the denominator is not positive."""
        return numerator / denominator if denominator > 0 else 0

    raw_counts = {
        'true_positives': tp,
        'false_positives': fp,
        'false_negatives': fn
    }
    recall = safe_ratio(tp, tp + fn)

    # Recall-only classes: precision is left out of the result entirely.
    if class_id in [1, 2]:
        return {'recall': recall, **raw_counts}

    return {'precision': safe_ratio(tp, tp + fp), 'recall': recall, **raw_counts}

def process_image_predictions(image_ground_truth: List[Dict], 
                            image_predictions: List[Dict], 
                            class_metrics: Dict[int, Dict[str, int]]) -> None:
    """Update per-class tp/fp/fn counts for one image using binary overlap.

    Each prediction is matched greedily against the first not-yet-matched
    ground truth box of the same class that it overlaps:
    - a match counts as a true positive and consumes that ground truth box
    - a prediction without a match counts as a false positive
    - every ground truth box left unmatched counts as a false negative

    Classes 1 and 2 are handled differently:
    - every prediction counts as a true positive, without any box matching
    - every ground truth annotation counts as a false negative

    Args:
        image_ground_truth (List[Dict]): Ground truth annotations for an image
        image_predictions (List[Dict]): Predicted detections for an image
        class_metrics (Dict[int, Dict[str, int]]): Per-class counters with the
            keys 'tp', 'fp' and 'fn'; updated in place.
    """
    used_ground_truth = set()

    for prediction in image_predictions:
        predicted_class = prediction['category_id']
        predicted_box = prediction['bbox']

        # Classes 1 and 2 skip matching: each detection is a true positive.
        if predicted_class in [1, 2]:
            class_metrics[predicted_class]['tp'] += 1
            continue

        for index, candidate in enumerate(image_ground_truth):
            if index in used_ground_truth or candidate['category_id'] != predicted_class:
                continue
            if boxes_overlap(predicted_box, candidate['bbox']):
                class_metrics[predicted_class]['tp'] += 1
                used_ground_truth.add(index)
                break
        else:
            # Loop finished without a match.
            class_metrics[predicted_class]['fp'] += 1

    for index, ground_truth in enumerate(image_ground_truth):
        ground_truth_class = ground_truth['category_id']
        # Classes 1 and 2 always count as missed; others only when unmatched.
        if ground_truth_class in [1, 2] or index not in used_ground_truth:
            class_metrics[ground_truth_class]['fn'] += 1

def calculate_binary_metrics(annotations: List[Dict], detections: List[Dict]) -> Dict[int, Dict[str, float]]: 
    """Compute per-class precision and recall where any box overlap is a hit.

    Only images that have ground truth annotations are evaluated. Per image,
    the counting is delegated to ``process_image_predictions``; the summed
    counts are converted by ``calculate_class_metrics``.

    For each class:
    - True positive: Detection overlaps with ground truth of same class
    - False positive: Detection doesn't overlap with any ground truth of same class
    - False negative: Ground truth isn't matched by any detection of same class

    Special classes (1 and 2):
    - Only recall is calculated
    - Every detection counts as a true positive and every annotation as a
      false negative, so recall is (detections) / (detections + annotations)

    Args:
        annotations (List[Dict]): Ground truth annotations
        detections (List[Dict]): Predicted detections

    Returns:
        Dict[int, Dict[str, float]]: Metrics per class including:
            - For classes 1 and 2: recall, true_positives, false_positives, false_negatives
            - For other classes: precision, recall, true_positives, false_positives, false_negatives
    """
    gt_by_image, preds_by_image, image_ids = group_by_image(annotations, detections)

    def new_counters() -> Dict[str, int]:
        """Zeroed counters for a class seen for the first time."""
        return {'tp': 0, 'fp': 0, 'fn': 0, 'total_detections': 0, 'total_annotations': 0}

    counters_by_class = defaultdict(new_counters)

    for image_id in image_ids:
        process_image_predictions(gt_by_image[image_id], preds_by_image[image_id], counters_by_class)

    return {
        class_id: calculate_class_metrics(counters, class_id)
        for class_id, counters in counters_by_class.items()
    }

def process_image_level_predictions(image_ground_truth: List[Dict], 
                                  image_predictions: List[Dict], 
                                  class_metrics: Dict[int, Dict[str, int]]) -> None:
    """Update per-class image-level counts for one image.

    Box positions are ignored; only the presence of a class matters. The
    image contributes exactly one count per class involved:
    - True positive: class present in both ground truth and predictions
    - False positive: class present only in the predictions
    - False negative: class present only in the ground truth

    Args:
        image_ground_truth (List[Dict]): Ground truth annotations for an image
        image_predictions (List[Dict]): Predicted detections for an image
        class_metrics (Dict[int, Dict[str, int]]): Per-class counters with the
            keys 'tp', 'fp' and 'fn'; updated in place.
    """
    annotated = get_classes_in_image(image_ground_truth)
    predicted = get_classes_in_image(image_predictions)

    # (classes, counter to bump) in the order: hits, false alarms, misses.
    outcomes = (
        (annotated & predicted, 'tp'),
        (predicted - annotated, 'fp'),
        (annotated - predicted, 'fn'),
    )
    for class_ids, counter in outcomes:
        for class_id in class_ids:
            class_metrics[class_id][counter] += 1

def calculate_image_level_metrics(annotations: List[Dict], detections: List[Dict]) -> Dict[int, Dict[str, float]]:
    """Compute per-class precision and recall counted in whole images.

    Only images that have ground truth annotations are evaluated. Per image,
    the counting is delegated to ``process_image_level_predictions``; the
    summed counts are converted by ``calculate_class_metrics``.

    For each class:
    - True positives: Number of images where the class appears in both ground truth and predictions
    - False positives: Number of images where the class appears only in predictions
    - False negatives: Number of images where the class appears only in ground truth

    Precision = TP / (TP + FP)
    Recall = TP / (TP + FN)

    Args:
        annotations (List[Dict]): Ground truth annotations
        detections (List[Dict]): Predicted detections

    Returns:
        Dict[int, Dict[str, float]]: Metrics per class including:
            - For classes 1 and 2: recall, true_positives, false_positives, false_negatives
            - For other classes: precision, recall, true_positives, false_positives, false_negatives
    """
    gt_by_image, preds_by_image, image_ids = group_by_image(annotations, detections)

    def new_counters() -> Dict[str, int]:
        """Zeroed counters for a class seen for the first time."""
        return {'tp': 0, 'fp': 0, 'fn': 0}

    counters_by_class = defaultdict(new_counters)

    for image_id in image_ids:
        process_image_level_predictions(gt_by_image[image_id], preds_by_image[image_id], counters_by_class)

    return {
        class_id: calculate_class_metrics(counters, class_id)
        for class_id, counters in counters_by_class.items()
    }

def calculate_metrics(annotations: List[Dict], detections: List[Dict]) -> Dict[str, Dict[int, Dict[str, float]]]:
    """Run both evaluation flavours on the same data.

    Args:
        annotations (List[Dict]): Ground truth annotations
        detections (List[Dict]): Predicted detections

    Returns:
        Dict[str, Dict[int, Dict[str, float]]]: Dictionary containing:
            - binary_overlap: Result of ``calculate_binary_metrics``
            - image_level: Result of ``calculate_image_level_metrics``
    """
    combined = {}
    combined['binary_overlap'] = calculate_binary_metrics(annotations, detections)
    combined['image_level'] = calculate_image_level_metrics(annotations, detections)
    return combined

In [None]:
# Load and save helpers

def load_data(annotations_path: str = 'data/output/processed_annotations.json',
              detections_path: str = 'data/output/processed_detections.json'
              ) -> Tuple[List[Dict], List[Dict], List[Dict]]:
    """Load and prepare annotation and detection data for evaluation.

    Loads the processed annotations and detections from JSON files, extracts the
    relevant lists, and adds GPS information to both annotations and detections.
    The GPS data always comes from the image entries of the detections file,
    matched on image ID.

    Args:
        annotations_path (str): Path to the processed (ID-aligned) ground truth
            COCO file. Defaults to 'data/output/processed_annotations.json'.
        detections_path (str): Path to the processed detections COCO file.
            Defaults to 'data/output/processed_detections.json'.

    Returns:
        Tuple[List[Dict], List[Dict], List[Dict]]: A tuple containing:
            - annotations: List of ground truth annotations with GPS data
            - detections: List of predicted detections with GPS data
            - images: List of image metadata from the detections file
    """
    with open(annotations_path, 'r') as f:
        ground_truth_annotations = json.load(f)
    with open(detections_path, 'r') as f:
        predicted_detections = json.load(f)

    images = predicted_detections.get('images', [])
    detections = predicted_detections['annotations']
    annotations = ground_truth_annotations['annotations']

    # Both lists are enriched in place with the GPS data of their image.
    add_gps_to_annotations(annotations, images)
    add_gps_to_annotations(detections, images)
    return annotations, detections, images

def save_metrics(metrics: Dict[str, Dict[int, Dict[str, float]]], 
                threshold: float, 
                mode: str, 
                output_dir: str) -> None:
    """Write the metrics of one threshold/mode combination to disk as JSON.

    The files end up in '<output_dir>/<mode>/threshold_<threshold, 5 decimals>/',
    one file per metric family: 'binary_overlap.json' and 'image_level.json'.
    The directory is created when it does not exist yet.

    Args:
        metrics (Dict[str, Dict[int, Dict[str, float]]]): Dictionary containing:
            - binary_overlap: Metrics calculated using binary overlap
            - image_level: Metrics calculated at image level
        threshold (float): The bounding box size threshold used for evaluation
        mode (str): Either 'unclustered' or 'clustered' to indicate the evaluation mode
        output_dir (str): Base directory where metrics will be saved
    """
    target_dir = f'{output_dir}/{mode}/threshold_{threshold:.5f}'
    os.makedirs(target_dir, exist_ok=True)

    # One JSON file per metric family, named after its key in `metrics`.
    for metric_name in ('binary_overlap', 'image_level'):
        with open(f'{target_dir}/{metric_name}.json', 'w') as out_file:
            json.dump(metrics[metric_name], out_file, indent=2)

In [None]:
# Create the base output directory for the per-threshold metric JSON files
output_dir = 'data/output/metrics'
os.makedirs(output_dir, exist_ok=True)
    
# Define the bounding box size thresholds to sweep: 0 up to about 0.10 in steps of 0.00005
# (the stop value is padded by one step so that 0.10 itself is included)
thresholds = np.arange(0, 0.10 + 0.00005, 0.00005)

# Clustering parameter handed to cluster_images_by_gps_and_select_per_class
# (presumably a radius in meters, going by the name; confirm against that helper)
eps_meters = 5
# Per-threshold results, appended in the same order as `thresholds`; plotted in Step 3
unclustered_binary_metrics_data = []
unclustered_image_metrics_data = []
clustered_binary_metrics_data = []
clustered_image_metrics_data = []

# Load data once; every threshold iteration filters from these full lists
all_annotations, all_detections, all_images = load_data()

for threshold in thresholds:
    print(f"Processing threshold: {threshold:.5f}")
    # Filter by bbox size (always starts from the unfiltered data, not the previous iteration)
    annotations, detections = filter_by_bbox_size(all_annotations, all_detections, threshold)
    # Filter out detection images to only keep annotated images
    annotations, detections = filter_detections_to_annotated_images(annotations, detections)
    # Cluster: derive the clustered variant from the already filtered data
    clustered_annotations, clustered_detections = cluster_images_by_gps_and_select_per_class(annotations, detections, eps_meters=eps_meters)

    # Calculate metrics for the unclustered and the clustered variant
    metrics = calculate_metrics(annotations, detections)
    clustered_metrics = calculate_metrics(clustered_annotations, clustered_detections)
    # Save metrics under <output_dir>/<mode>/threshold_<value>/
    save_metrics(metrics, threshold, 'unclustered', output_dir)
    save_metrics(clustered_metrics, threshold, 'clustered', output_dir)

    # Collect metrics for plotting (one entry per threshold in each list)
    unclustered_binary_metrics_data.append(metrics['binary_overlap'])
    unclustered_image_metrics_data.append(metrics['image_level'])
    clustered_binary_metrics_data.append(clustered_metrics['binary_overlap'])
    clustered_image_metrics_data.append(clustered_metrics['image_level'])

### Step 3: Plot results

In [None]:
# Plot helpers

def plot_metrics(thresholds: List[float], 
                metrics_data: List[Dict[int, Dict[str, float]]], 
                metric_type: str, 
                output_dir: str) -> None:
    """Plot recall (and precision, when available) against the size threshold.

    One figure is produced per class ID that occurs anywhere in `metrics_data`.
    Recall is always drawn. Precision is drawn only if at least one threshold
    provides it for that class; thresholds lacking a precision value are then
    drawn as 0. A class that is absent at a threshold contributes a recall of 0
    there.

    Args:
        thresholds (List[float]): Bounding box size thresholds, used as the x axis
        metrics_data (List[Dict[int, Dict[str, float]]]): One metrics dictionary per
            threshold, mapping class IDs to a dictionary that may contain:
            - precision: Precision score (optional)
            - recall: Recall score
        metric_type (str): Label of the metric family; used in the plot title and file name
        output_dir (str): Directory where plot images will be saved

    Each figure is saved as '<output_dir>/class_<class_id>_<metric_type>.png'.
    """
    # Union of the class IDs seen at any threshold, in ascending order
    class_ids = sorted({cid for per_threshold in metrics_data for cid in per_threshold})

    for class_id in class_ids:
        recalls = []
        precisions = []

        for per_threshold in metrics_data:
            if class_id not in per_threshold:
                # Class missing at this threshold: zero recall, no precision
                recalls.append(0)
                precisions.append(None)
                continue
            class_scores = per_threshold[class_id]
            recalls.append(class_scores.get('recall', 0))
            precisions.append(class_scores['precision'] if 'precision' in class_scores else None)

        plt.figure(figsize=(10, 6))
        plt.plot(thresholds, recalls, 'r-', label='Recall')
        has_precision = any(value is not None for value in precisions)
        if has_precision:
            # Missing precision values are drawn as 0 to keep the series aligned
            filled_precisions = [0 if value is None else value for value in precisions]
            plt.plot(thresholds, filled_precisions, 'b-', label='Precision')

        plt.title(f'Class {class_id} - {metric_type} Metrics')
        plt.xlabel('Bounding Box Size Threshold')
        plt.ylabel('Score')
        plt.ylim(0, 1)
        plt.grid(True)
        plt.legend()
        plt.savefig(f'{output_dir}/class_{class_id}_{metric_type}.png')
        plt.close()

In [None]:
# Create output directory for the plot images
# (note: this rebinds `output_dir`, which pointed at the metrics directory before)
output_dir = 'data/output/plots'
os.makedirs(output_dir, exist_ok=True)

# Plot results: one call per (metric family, clustering mode) combination.
# The third argument ends up verbatim in the file names, e.g.
# 'class_3_binary_overlap (unclustered).png'; the HTML report cell reads the
# binary_overlap plots back under exactly these names.
plot_metrics(thresholds, unclustered_binary_metrics_data, 'binary_overlap (unclustered)', output_dir)
plot_metrics(thresholds, clustered_binary_metrics_data, 'binary_overlap (clustered)', output_dir)
plot_metrics(thresholds, unclustered_image_metrics_data, 'image_level (unclustered)', output_dir)
plot_metrics(thresholds, clustered_image_metrics_data, 'image_level (clustered)', output_dir)

### Step 4: Create HTML report

In [None]:
# Constants used by the HTML report cells

# Paths
METRICS_DIR = 'data/output/metrics'              # per-threshold metric JSON files written by save_metrics
PLOTS_DIR = 'data/output/plots'                  # PNG plots that get embedded in the report
IMAGES_DIR = 'data/input/raw_detections'         # source images, looked up by file_name
ANNOTATED_DIR = 'data/output/annotated_examples' # annotated FP/FN example images are written here
REPORT_PATH = 'data/output/report.html'          # final report file

# Classes
# Class IDs 1-5, the same IDs as in DEFAULT_CATEGORIES.
# NOTE(review): the report cells below use literal ID lists instead of this constant.
CLASSES = [1, 2, 3, 4, 5]

In [None]:
def draw_boxes(image_path: str, boxes: List[List[float]], color: Tuple[int, int, int], label: Optional[str] = None) -> np.ndarray:
    """Draw bounding boxes on an image.

    Box coordinates are multiplied by the image width/height before drawing,
    so they are expected as fractions of the image size.

    Args:
        image_path (str): Path to the input image
        boxes (List[List[float]]): List of bounding boxes in format [[x, y, width, height], ...]
        color (Tuple[int, int, int]): BGR color tuple for the boxes
        label (Optional[str]): Optional label to display above each box

    Returns:
        np.ndarray: Image with drawn boxes

    Raises:
        OSError: If the image cannot be read (missing file or unsupported/corrupt data).
    """
    img = cv2.imread(image_path)
    if img is None:
        # cv2.imread reports failure by returning None instead of raising.
        raise OSError(f'Could not read image: {image_path}')

    # Image size and label metrics are the same for every box; compute them once.
    h_img, w_img = img.shape[:2]
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.7
    thickness = 2
    min_text_y = 0
    if label:
        (_, text_height), _ = cv2.getTextSize(label, font, font_scale, thickness)
        # putText anchors at the text baseline; keep the whole label inside the image.
        min_text_y = text_height + 2

    for x, y, w, h in boxes:
        pt1 = (int(x * w_img), int(y * h_img))
        pt2 = (int((x + w) * w_img), int((y + h) * h_img))
        cv2.rectangle(img, pt1, pt2, color, 2)
        if label:
            # Place the label above the box, but never above the top image edge.
            text_y = max(pt1[1] - 10, min_text_y)
            cv2.putText(img, label, (pt1[0], text_y), font, font_scale, color, thickness)
    return img

def save_annotated_example(image_id: int, image_info: Dict[str, Any], class_id: int, 
                         fp_boxes: List[List[float]], fn_boxes: List[List[float]], 
                         prefix: str) -> Tuple[Optional[str], Optional[str]]:
    """Save annotated images showing false positives and false negatives.

    False positives are drawn in red with the label 'FP', false negatives in
    blue with the label 'FN', each into its own output image inside
    ANNOTATED_DIR. An image is only rendered and written for a non-empty box list.

    Args:
        image_id (int): ID of the image
        image_info (Dict[str, Any]): Dictionary containing image information including file_name
        class_id (int): ID of the class being evaluated
        fp_boxes (List[List[float]]): List of false positive bounding boxes
        fn_boxes (List[List[float]]): List of false negative bounding boxes
        prefix (str): Prefix for output filenames

    Returns:
        Tuple[Optional[str], Optional[str]]: Paths to the saved FP and FN images.
        An entry is None when there were no boxes of that kind, when the source
        image could not be read, or when writing the output image failed.
    """
    img_path = os.path.join(IMAGES_DIR, image_info['file_name'])
    # Best effort: an unreadable source image yields no examples instead of an error.
    if cv2.imread(img_path) is None:
        return None, None

    def _render(boxes: List[List[float]], color: Tuple[int, int, int],
                label: str, suffix: str) -> Optional[str]:
        # Draw one box list and write it out; None when nothing was written.
        if not boxes:
            return None
        out_path = f'{ANNOTATED_DIR}/{prefix}_class{class_id}_img{image_id}_{suffix}.jpg'
        annotated = draw_boxes(img_path, boxes, color, label)
        # cv2.imwrite returns False on failure; never hand back a path that does not exist.
        return out_path if cv2.imwrite(out_path, annotated) else None

    # Colors are BGR: (0, 0, 255) is red, (255, 0, 0) is blue.
    out_fp = _render(fp_boxes, (0, 0, 255), 'FP', 'fp')
    out_fn = _render(fn_boxes, (255, 0, 0), 'FN', 'fn')
    return out_fp, out_fn

def find_fp_fn(ann_by_img_cls: Dict[int, Dict[int, List[Dict[str, Any]]]], 
               det_by_img_cls: Dict[int, Dict[int, List[Dict[str, Any]]]], 
               class_id: int, 
               max_examples: int = 9) -> Tuple[List[Tuple[int, List[float]]], List[Tuple[int, List[float]]]]:
    """Find false positive and false negative examples for a given class.

    Only images that appear in `ann_by_img_cls` are inspected; detections on
    images without any annotation entry are ignored. A box counts as matched
    when `boxes_overlap` is true for at least one box of the other kind.

    The inputs are read without being modified. This also holds for nested
    defaultdicts, and plain dictionaries with missing image or class entries
    are handled as "no boxes".

    Args:
        ann_by_img_cls (Dict[int, Dict[int, List[Dict[str, Any]]]]): Ground truth annotations by image and class
        det_by_img_cls (Dict[int, Dict[int, List[Dict[str, Any]]]]): Detections by image and class
        class_id (int): ID of the class to evaluate
        max_examples (int): Maximum number of examples to return for each type

    Returns:
        Tuple[List[Tuple[int, List[float]]], List[Tuple[int, List[float]]]]: Lists of (image_id, box) tuples for FPs and FNs.
        When more than `max_examples` candidates exist, a random sample is returned.
    """
    fp_examples = []
    fn_examples = []
    for image_id, anns_by_class in ann_by_img_cls.items():
        # .get() instead of indexing: indexing a defaultdict would insert empty
        # entries into the caller's data, and fail with KeyError on a plain dict.
        gt_boxes = [a['bbox'] for a in anns_by_class.get(class_id, [])]
        det_boxes = [d['bbox'] for d in det_by_img_cls.get(image_id, {}).get(class_id, [])]

        # FNs: GT box not matched by any detection
        fn_examples.extend(
            (image_id, gt) for gt in gt_boxes
            if not any(boxes_overlap(gt, det) for det in det_boxes)
        )
        # FPs: Det box not matched by any GT
        fp_examples.extend(
            (image_id, det) for det in det_boxes
            if not any(boxes_overlap(det, gt) for gt in gt_boxes)
        )

    # Randomly sample up to max_examples
    if len(fp_examples) > max_examples:
        fp_examples = random.sample(fp_examples, max_examples)
    if len(fn_examples) > max_examples:
        fn_examples = random.sample(fn_examples, max_examples)
    return fp_examples, fn_examples

def img_to_base64(img_path: str) -> str:
    """Read an image file and return its raw bytes as base64 text.

    Args:
        img_path (str): Path to the image file

    Returns:
        str: Base64 encoded string of the file content
    """
    with open(img_path, 'rb') as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')

def plot_to_base64(plot_path: str) -> str:
    """Read a plot image file and return its raw bytes as base64 text.

    Args:
        plot_path (str): Path to the plot file

    Returns:
        str: Base64 encoded string of the file content
    """
    with open(plot_path, 'rb') as plot_file:
        payload = base64.b64encode(plot_file.read())
    return str(payload, 'utf-8')

In [None]:
# Make sure the directory for the annotated example images exists
os.makedirs(ANNOTATED_DIR, exist_ok=True)

# Load the ground truth; its 'categories' list provides the class id -> name mapping
with open('data/output/processed_annotations.json') as f:
    gt_data = json.load(f)
category_id_to_name = {cat['id']: cat['name'] for cat in gt_data['categories']}

# Load binary overlap metrics at threshold 0
# (the directory name follows the 'threshold_{threshold:.5f}' layout written by save_metrics;
# class IDs are string keys in these dictionaries because they come from JSON)
with open(f'{METRICS_DIR}/unclustered/threshold_0.00000/binary_overlap.json') as f:
    unclustered_metrics = json.load(f)
with open(f'{METRICS_DIR}/clustered/threshold_0.00000/binary_overlap.json') as f:
    clustered_metrics = json.load(f)

# Load detections
with open('data/output/processed_detections.json') as f:
    det_data = json.load(f)

detections = det_data['annotations']
# Image metadata is taken from the detections file and indexed by image id
images_info = {img['id']: img for img in det_data['images']}
annotations = gt_data['annotations']

# Group by image_id and class: <mapping>[image_id][category_id] -> list of entries
ann_by_img_cls = defaultdict(lambda: defaultdict(list))
det_by_img_cls = defaultdict(lambda: defaultdict(list))
for ann in annotations:
    ann_by_img_cls[ann['image_id']][ann['category_id']].append(ann)
for det in detections:
    det_by_img_cls[det['image_id']][det['category_id']].append(det)

In [None]:
# HTML header
# `html` collects the report as a list of fragments that are joined with newlines
# when the report is written. The first fragment is the document head with the
# inline stylesheet; the trailing backslashes continue the string literal, so the
# whole head is a single line in the output.
html = ['<html><head><title>Detection Metrics Report</title>\
<style>\
body { font-family: "Segoe UI", Arial, sans-serif; background: #f8f9fa; color: #222; margin: 0; padding: 0 0 40px 0; }\
h1 { background: #2d6cdf; color: #fff; margin: 0 0 24px 0; padding: 24px 0 16px 32px; font-size: 2.2em; letter-spacing: 1px; }\
h2 { color: #2d6cdf; border-bottom: 2px solid #e3e6ea; padding-bottom: 4px; margin-top: 36px; margin-bottom: 12px; }\
h3 { color: #1a3a6b; margin-top: 28px; margin-bottom: 8px; }\
p, b { font-size: 1.08em; }\
table { border-collapse: collapse; margin: 24px 0; background: #fff; box-shadow: 0 2px 8px #0001; }\
th, td { border: 1px solid #e3e6ea; padding: 10px 18px; text-align: center; font-size: 1.05em; }\
th { background: #e3e6ea; color: #1a3a6b; }\
tr:nth-child(even) { background: #f4f6fa; }\
tr:hover { background: #eaf1fb; }\
div[style*="display:flex"] { gap: 16px; flex-wrap: wrap; margin-bottom: 18px; }\
img { border-radius: 8px; box-shadow: 0 2px 8px #0002; transition: transform 0.2s, box-shadow 0.2s; background: #fff; }\
img:hover { transform: scale(1.04); box-shadow: 0 4px 16px #0003; }\
</style></head><body>']
# Title and glossary paragraphs explaining the terms used in the report
html.append('<h1>Detection Metrics Report</h1>')
html.append('<p><b>Bounding box size threshold</b>:  The minimum relative area (as a fraction of the image) that a bounding box must have to be included in the evaluation.</p>')
html.append('<p><b>Unclustered:</b> metrics are computed on the raw unclustered detections.</p>')
# NOTE(review): the "5m radius" in this text is hard-coded; keep it in sync with eps_meters
html.append('<p><b>Clustered:</b> metrics are computed after grouping images by GPS proximity, and only keeping the image with the most confident detection per object. This removes false negatives for the same object in different images. We use a 5m radius for clustering.</p>')
html.append('<p><b>Evaluation metric:</b> Binary overlap, which means that a detection is considered a true positive if its bounding box overlaps with a ground truth bounding box of the same class. Detections that do not overlap with any ground truth are counted as false positives, and ground truth boxes not overlapped by any detection are counted as false negatives.</p>')

# Section: Classes 1 and 2
# For these classes the report shows recall only, plus false negative examples.
for class_id in [1,2]:
    class_name = category_id_to_name[class_id]
    # The metrics were loaded from JSON, so the class ID is a string key
    recall = unclustered_metrics[str(class_id)]['recall']
    html.append(f'<h2>{class_name.title()}</h2>')
    html.append(f'<b>Recall (size threshold 0, unclustered):</b> {recall:.4f}<br>')
    # Annotated examples
    html.append('<h3>False Positives</h3>')
    html.append('<p>Not shown: For this class, only false negatives are annotated, so false positives are not available for visualization.</p>')
    # find_fp_fn returns at most max_examples (default 9) randomly sampled examples
    # per kind; the false positives are not used for these classes
    fp_examples, fn_examples = find_fp_fn(ann_by_img_cls, det_by_img_cls, class_id)
    html.append(f'<h3>False Negatives</h3><div style="display:flex;flex-wrap:wrap;">')
    for idx, (img_id, box) in enumerate(fn_examples):
        img_info = images_info[img_id]
        # One annotated image per missed ground truth box, embedded inline as base64
        _, fn_path = save_annotated_example(img_id, img_info, class_id, [], [box], f'class{class_id}_fn{idx}')
        if fn_path:
            img_b64 = img_to_base64(fn_path)
            html.append(f'<img src="data:image/jpeg;base64,{img_b64}" width="320" style="margin:5px;">')
    html.append('</div>')

# Section: Classes 3, 4, 5
def _format_metric(value: Any) -> str:
    """Format a metric for the report table: 4 decimals, or '-' when it is missing."""
    # bool is excluded explicitly because it is a subclass of int.
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return f'{value:.4f}'
    return '-'


# Precision/recall table at size threshold 0, unclustered next to clustered.
html.append('<h2>Precision and Recall (size threshold 0)</h2>')
html.append('<table border="1"><tr><th>Class</th><th>Precision (Unclustered)</th><th>Recall (Unclustered)</th><th>Precision (Clustered)</th><th>Recall (Clustered)</th></tr>')
for class_id in [3,4,5]:
    class_name = category_id_to_name[class_id]
    # The metrics were loaded from JSON, so the class ID is a string key
    u = unclustered_metrics[str(class_id)]
    c = clustered_metrics[str(class_id)]
    # Precision may be absent for a class. Applying ':.4f' directly to the '-'
    # placeholder would raise ValueError, so the formatting goes through the helper.
    html.append(
        f'<tr><td>{class_name.title()}</td>'
        f'<td>{_format_metric(u.get("precision"))}</td>'
        f'<td>{_format_metric(u["recall"])}</td>'
        f'<td>{_format_metric(c.get("precision"))}</td>'
        f'<td>{_format_metric(c["recall"])}</td></tr>'
    )
html.append('</table>')

# Plots
# Embed the binary overlap plots created in the plotting step. The file names
# must match what plot_metrics wrote: 'class_<id>_<metric_type>.png' with
# metric_type 'binary_overlap (unclustered)' / 'binary_overlap (clustered)'.
for class_id in [3,4,5]:
    class_name = category_id_to_name[class_id]
    html.append(f'<h3>{class_name.title()} Precision/Recall Plot (Unclustered)</h3>')
    plot_path = f'{PLOTS_DIR}/class_{class_id}_binary_overlap (unclustered).png'
    plot_b64 = plot_to_base64(plot_path)
    html.append(f'<img src="data:image/png;base64,{plot_b64}" width="600">')
    html.append(f'<h3>{class_name.title()} Precision/Recall Plot (Clustered)</h3>')
    plot_path = f'{PLOTS_DIR}/class_{class_id}_binary_overlap (clustered).png'
    plot_b64 = plot_to_base64(plot_path)
    html.append(f'<img src="data:image/png;base64,{plot_b64}" width="600">')

# Annotated examples for 3,4,5
# Unlike classes 1 and 2, both false positives and false negatives are shown.
for class_id in [3,4,5]:
    class_name = category_id_to_name[class_id]
    html.append(f'<h3>{class_name.title()} False Positives</h3><div style="display:flex;flex-wrap:wrap;">')
    # At most max_examples (default 9) randomly sampled examples per kind
    fp_examples, fn_examples = find_fp_fn(ann_by_img_cls, det_by_img_cls, class_id)
    for idx, (img_id, box) in enumerate(fp_examples):
        img_info = images_info[img_id]
        # One annotated image per unmatched detection, embedded inline as base64
        fp_path, _ = save_annotated_example(img_id, img_info, class_id, [box], [], f'class{class_id}_fp{idx}')
        if fp_path:
            img_b64 = img_to_base64(fp_path)
            html.append(f'<img src="data:image/jpeg;base64,{img_b64}" width="320" style="margin:5px;">')
    html.append('</div>')
    html.append(f'<h3>{class_name.title()} False Negatives</h3><div style="display:flex;flex-wrap:wrap;">')
    for idx, (img_id, box) in enumerate(fn_examples):
        img_info = images_info[img_id]
        # One annotated image per missed ground truth box
        _, fn_path = save_annotated_example(img_id, img_info, class_id, [], [box], f'class{class_id}_fn{idx}')
        if fn_path:
            img_b64 = img_to_base64(fn_path)
            html.append(f'<img src="data:image/jpeg;base64,{img_b64}" width="320" style="margin:5px;">')
    html.append('</div>')

# Close the document opened by the header fragment
html.append('</body></html>')

# Write all collected fragments, one per line, to the report file
with open(REPORT_PATH, 'w') as f:
    f.write('\n'.join(html))

print(f'Report generated: {REPORT_PATH}') 