## Install packages and import libraries

In [1]:
# Install required Python packages
!pip install --upgrade pip
!pip install onnx onnxruntime git+https://github.com/espressif/esp-ppq.git
!pip install protobuf==3.20.2
!pip install torchvision==0.17.2
!git lfs install
!find imagenet-sample-images/ -mindepth 1 -name ".*" -exec rm -rf {} +
!pip install ultralytics


Collecting git+https://github.com/espressif/esp-ppq.git
  Cloning https://github.com/espressif/esp-ppq.git to /private/var/folders/mv/96db_j8j3ng3t9pc8jczncdh0000gn/T/pip-req-build-ftihjphm
  Running command git clone --filter=blob:none --quiet https://github.com/espressif/esp-ppq.git /private/var/folders/mv/96db_j8j3ng3t9pc8jczncdh0000gn/T/pip-req-build-ftihjphm
  Resolved https://github.com/espressif/esp-ppq.git to commit 2d66669c6d264d64d00b677b3c581f4350c3642c
  Preparing metadata (setup.py) ... [?25ldone
Updated Git hooks.
Git LFS initialized.
find: imagenet-sample-images/: No such file or directory


In [2]:
from pathlib import Path
import os
from typing import Final
from PIL import Image
from ppq.api import espdl_quantize_onnx
import torch
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
from ultralytics import YOLO
import sys
sys.path.append("./coco_detect/generate_onnx")
from export_onnx import ESP_YOLO, ESP_Attention, ESP_Detect
from ultralytics.nn.modules import Attention, Detect



    ___________ ____        ____  ____  ____ 
   / ____/ ___// __ \      / __ \/ __ \/ __ \
  / __/  \__ \/ /_/ /_____/ /_/ / /_/ / / / /
 / /___ ___/ / ____/_____/ ____/ ____/ /_/ / 
/_____//____/_/         /_/   /_/    \___\_\ 


Ultralytics 8.3.130 🚀 Python-3.10.11 torch-2.2.2 CPU (Apple M3)
YOLO11n summary (fused): 100 layers, 2,616,248 parameters, 0 gradients, 6.5 GFLOPs

[34m[1mPyTorch:[0m starting from '../models/yolo11n.pt' with input shape (1, 3, 640, 640) BCHW and output shape(s) ((1, 64, 80, 80), (1, 80, 80, 80), (1, 64, 40, 40), (1, 80, 40, 40), (1, 64, 20, 20), (1, 80, 20, 20)) (5.4 MB)

[34m[1mONNX:[0m starting export with onnx 1.17.0 opset 13...
[34m[1mONNX:[0m simplifying with onnxsim 0.4.36...
[34m[1mONNX:[0m export success ✅ 0.6s, saved as '../models/yolo11n.onnx' (10.1 MB)

Export complete (0.8s)
Results saved to [1m/Users/christophknaden/git/leezencounter/models[0m
Predict:         yolo predict task=detect model=../models/yolo11n.onnx imgsz=640  
Vali

## Make predictions with base YOLO model

In [3]:
from pathlib import Path
from typing import Final

model_path = "coco_detect/models/last.pt"  # Add your model path here

model = YOLO(model_path)

torch_model = model.model.eval()

IMAGE_PATH: Final[Path] = Path('./calib_images_compressed') # Path to your images directory
OUTPUT_DIR: Final[Path] = Path('./preds_calib_images') # Path to save predictions
OUTPUT_DIR.mkdir(exist_ok=True)
CONF_THRESHOLD = 0.10  # Minimum confidence for a detection to be considered
IOU_THRESHOLD = 0.70   # IoU threshold for Non-Maximum Suppression (NMS)
MAX_DETECTIONS = 20

for img_path in IMAGE_PATH.glob("*.jpg"):
    results = model(
        img_path,
        conf=CONF_THRESHOLD,
        iou=IOU_THRESHOLD,
        max_det=MAX_DETECTIONS,
        verbose=False
    )

    # Save the annotated image
    results[0].save(filename=OUTPUT_DIR / f"output_{img_path.name}")

    # Convert the results to a pandas DataFrame
    pred_df = results[0].to_df()

    # Save the DataFrame to CSV
    pred_df.to_csv(OUTPUT_DIR / f"output_{img_path.stem}.csv", index=False)


## Create .onnx file

In [4]:
model = ESP_YOLO(model_path)
for m in model.modules():
    if isinstance(m, Attention):
        m.forward = ESP_Attention.forward.__get__(m)
    if isinstance(m, Detect):
        m.forward = ESP_Detect.forward.__get__(m)

# Export to ONNX
model.export(format="onnx", simplify=True, opset=13, dynamic=True, imgsz=640)

Ultralytics 8.3.130 🚀 Python-3.10.11 torch-2.2.2 CPU (Apple M3)
YOLO11n summary (fused): 100 layers, 2,582,542 parameters, 0 gradients, 6.3 GFLOPs

[34m[1mPyTorch:[0m starting from 'coco_detect/models/last.pt' with input shape (1, 3, 640, 640) BCHW and output shape(s) ((1, 64, 80, 80), (1, 2, 80, 80), (1, 64, 40, 40), (1, 2, 40, 40), (1, 64, 20, 20), (1, 2, 20, 20)) (5.2 MB)

[34m[1mONNX:[0m starting export with onnx 1.17.0 opset 13...
[34m[1mONNX:[0m simplifying with onnxsim 0.4.36...
[34m[1mONNX:[0m export success ✅ 0.5s, saved as 'coco_detect/models/last.onnx' (9.9 MB)

Export complete (0.7s)
Results saved to [1m/Users/christophknaden/git/leezencounter/model-deployment/coco_detect/models[0m
Predict:         yolo predict task=detect model=coco_detect/models/last.onnx imgsz=640  
Validate:        yolo val task=detect model=coco_detect/models/last.onnx imgsz=640 data=./datasets/combined_rotate/combined_yolo_dataset.yaml  
Visualize:       https://netron.app


'coco_detect/models/last.onnx'

## Create calibration dataset

In [5]:
DEVICE: str = 'cpu'

BATCH_SIZE = 1
IMAGE_SIZE = 640

IMAGENET_PATH: Final[Path] = Path('./calib_images_compressed') # Path to your calibration images directory

class ImageFolderDataset(Dataset):
    def __init__(self, image_dir: Path, transform=None):
        self.image_dir = image_dir

        # Get the list of file names
        file_names = [
            f for f in os.listdir(image_dir)
            if f.lower().endswith((".jpg", ".jpeg", ".png"))
        ]

        self.image_files = sorted(file_names)

        self.transform = transform

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        # Now, idx reliably maps to the sorted list of file names
        image_path = self.image_dir / self.image_files[idx]

        image = Image.open(image_path).convert("RGB")
        if self.transform:
            transformed_image = self.transform(image)

        return transformed_image

# Transformation (resize only — no normalization!)
transform = transforms.Compose([
    transforms.Resize((IMAGE_SIZE, IMAGE_SIZE)),
    transforms.ToTensor(),
])


# Create dataset and dataloader
cal_dataset = ImageFolderDataset(IMAGENET_PATH, transform=transform)
cal_dataloader = DataLoader(
    cal_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=0,
    pin_memory=False,
)


print("--- Mapping of Index to Filename (First 10 Files) ---")
for i in range(len(cal_dataset)):
    # We don't need the tensor here, just the filename from the internal list
    filename = cal_dataset.image_files[i]
    print(f"Index {i}: {filename}")

--- Mapping of Index to Filename (First 10 Files) ---
Index 0: frame0.jpg
Index 1: frame1.jpg
Index 2: frame10.jpg
Index 3: frame11.jpg
Index 4: frame12.jpg
Index 5: frame13.jpg
Index 6: frame14.jpg
Index 7: frame15.jpg
Index 8: frame16.jpg
Index 9: frame17.jpg
Index 10: frame18.jpg
Index 11: frame19.jpg
Index 12: frame2.jpg
Index 13: frame20.jpg
Index 14: frame21.jpg
Index 15: frame22.jpg
Index 16: frame23.jpg
Index 17: frame24.jpg
Index 18: frame25.jpg
Index 19: frame26.jpg
Index 20: frame27.jpg
Index 21: frame28.jpg
Index 22: frame29.jpg
Index 23: frame3.jpg
Index 24: frame30.jpg
Index 25: frame31.jpg
Index 26: frame32.jpg
Index 27: frame33.jpg
Index 28: frame34.jpg
Index 29: frame35.jpg
Index 30: frame36.jpg
Index 31: frame37.jpg
Index 32: frame38.jpg
Index 33: frame39.jpg
Index 34: frame4.jpg
Index 35: frame40.jpg
Index 36: frame41.jpg
Index 37: frame42.jpg
Index 38: frame43.jpg
Index 39: frame44.jpg
Index 40: frame45.jpg
Index 41: frame49.jpg
Index 42: frame5.jpg
Index 43: frame5

## Create .espdl file

In [6]:
ONNX_YOLO_PATH: Final[Path] = Path('./coco_detect/generate_onnx/last.onnx') # Path to your ONNX model
ESPDL_YOLO_PATH: Final[Path] = Path('coco_detect/models/yolo11n.espdl') # Path to save the ESPDL model
TARGET_SOC: Final[str] = 'esp32s3'
NUM_OF_BITS: Final[int] = 8


x = cal_dataset[0]
if isinstance(x, (tuple, list)):
    x = x[0]
x = x.unsqueeze(0)

def collate_fn(batch: torch.Tensor) -> torch.Tensor:
    return batch.to(DEVICE)

# make use of ESP-PPQ to quantize the ONNX computation graph, optimize it for ESP32-S3 SoC, and convert it to .espdl
quantized_model = espdl_quantize_onnx(
    onnx_import_file=ONNX_YOLO_PATH.as_posix(),
    espdl_export_file=ESPDL_YOLO_PATH.as_posix(),
    calib_dataloader=cal_dataloader,
    calib_steps=8,
    input_shape=x.shape,
    inputs=None,
    target=TARGET_SOC,
    num_of_bits=NUM_OF_BITS,
    collate_fn=collate_fn,
    dispatching_override=None,
    device=DEVICE,
    error_report=True,
    skip_export=False,
    export_test_values=True,
    verbose=1,
)

[21:48:42] PPQ Quantization Fusion Pass Running ...       Finished.
[21:48:42] PPQ Quantize Simplify Pass Running ...         Finished.
[21:48:42] PPQ Parameter Quantization Pass Running ...    Finished.
[21:48:42] PPQ Runtime Calibration Pass Running ...       

Calibration Progress(Phase 1): 100%|██████████| 8/8 [00:00<00:00, 11.72it/s]
Calibration Progress(Phase 2): 100%|██████████| 8/8 [00:01<00:00,  6.42it/s]


Finished.
[21:48:44] PPQ Quantization Alignment Pass Running ...    Finished.
[21:48:44] PPQ Passive Parameter Quantization Running ... Finished.
--------- Network Snapshot ---------
Num of Op:                    [296]
Num of Quantized Op:          [296]
Num of Variable:              [499]
Num of Quantized Var:         [499]
------- Quantization Snapshot ------
Num of Quant Config:          [915]
ACTIVATED:                    [334]
OVERLAPPED:                   [369]
PASSIVE:                      [194]
FP32:                         [18]
Network Quantization Finished.


Analysing Graphwise Quantization Error(Phrase 1):: 100%|██████████| 8/8 [00:01<00:00,  6.58it/s]
Analysing Graphwise Quantization Error(Phrase 2):: 100%|██████████| 8/8 [00:02<00:00,  3.68it/s]


Layer                                        | NOISE:SIGNAL POWER RATIO 
/model.10/m/m.0/ffn/ffn.1/conv/Conv:         | ████████████████████ | 17.327%
/model.10/m/m.0/attn/proj/conv/Conv:         | ████████████████████ | 16.930%
/model.22/m.0/cv2/conv/Conv:                 | ███████████████      | 13.361%
/model.23/cv3.0/cv3.0.1/cv3.0.1.0/conv/Conv: | ███████████████      | 13.015%
/model.23/cv3.0/cv3.0.1/cv3.0.1.1/conv/Conv: | ███████████████      | 12.839%
/model.23/cv3.2/cv3.2.1/cv3.2.1.1/conv/Conv: | ██████████████       | 12.043%
/model.10/m/m.0/attn/MatMul_1:               | █████████████        | 10.916%
/model.23/cv3.2/cv3.2.1/cv3.2.1.0/conv/Conv: | ████████████         | 10.662%
/model.10/m/m.0/attn/MatMul:                 | ████████████         | 10.419%
/model.23/cv3.1/cv3.1.1/cv3.1.1.1/conv/Conv: | ████████████         | 10.208%
/model.23/cv3.1/cv3.1.1/cv3.1.1.0/conv/Conv: | ████████████         | 10.111%
/model.23/cv2.0/cv2.0.1/conv/Conv:           | ███████████          |

Analysing Layerwise quantization error:: 100%|██████████| 89/89 [01:44<00:00,  1.17s/it]


Layer                                        | NOISE:SIGNAL POWER RATIO 
/model.9/cv2/conv/Conv:                      | ████████████████████ | 0.117%
/model.4/cv1/conv/Conv:                      | ███████████          | 0.064%
/model.4/cv2/conv/Conv:                      | ███████████          | 0.063%
/model.23/cv3.0/cv3.0.2/Conv:                | ███████████          | 0.063%
/model.1/conv/Conv:                          | █████████            | 0.052%
/model.0/conv/Conv:                          | █████████            | 0.050%
/model.2/cv1/conv/Conv:                      | ██████               | 0.036%
/model.23/cv3.1/cv3.1.2/Conv:                | ████                 | 0.024%
/model.8/cv2/conv/Conv:                      | ███                  | 0.017%
/model.3/conv/Conv:                          | ███                  | 0.016%
/model.16/m.0/cv2/conv/Conv:                 | ███                  | 0.015%
/model.16/cv2/conv/Conv:                     | ██                   | 0.013%
/mo

# Image Preprocessing for inference

# Pre and Post-processing for ESP-DL

In [7]:
import cv2
import torch
import numpy as np
import torchvision
from ppq import TorchExecutor



def preprocess_for_esp_dl(image_path, model_input_shape, mean, std):
    """ Replicates the preprocessing logic from the ESP-DL C++ code. """
    img_bgr = cv2.imread(image_path)
    assert img_bgr is not None, f"Image not found at {image_path}"
    original_h, original_w = img_bgr.shape[:2]
    img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
    target_h, target_w = model_input_shape
    resized_img = cv2.resize(img_rgb, (target_w, target_h), interpolation=cv2.INTER_NEAREST)
    img_tensor = torch.from_numpy(resized_img).float()
    mean_tensor = torch.tensor(mean, dtype=torch.float32).reshape(1, 1, 3)
    std_tensor = torch.tensor(std, dtype=torch.float32).reshape(1, 1, 3)
    normalized_tensor = (img_tensor - mean_tensor) / std_tensor
    input_tensor = normalized_tensor.permute(2, 0, 1).unsqueeze(0)
    resize_scale_x = target_w / original_w
    resize_scale_y = target_h / original_h
    return input_tensor, (original_w, original_h), (resize_scale_x, resize_scale_y)


def postprocess_for_esp_dl(outputs, original_shape, resize_scales,
                           conf_threshold, iou_threshold, max_detections):
    """ Replicates the post-processing logic from the ESP-DL C++ code. """
    strides = [8, 16, 32]
    reg_max = 16
    bins = torch.arange(reg_max, device=outputs[0].device, dtype=torch.float32)
    all_boxes, all_scores, all_class_ids = [], [], []

    box_preds = [outputs[0], outputs[2], outputs[4]]
    cls_preds = [outputs[1], outputs[3], outputs[5]]

    for i in range(len(strides)):
        box_pred, cls_pred = box_preds[i], cls_preds[i]
        stride = strides[i]
        height, width = cls_pred.shape[2], cls_pred.shape[3]

        cls_pred = cls_pred.permute(0, 2, 3, 1).reshape(1, -1, cls_pred.shape[1])[0]
        box_pred = box_pred.permute(0, 2, 3, 1).reshape(1, -1, 4 * reg_max)[0]

        scores, class_ids = torch.sigmoid(cls_pred).max(1)
        confident_mask = scores > conf_threshold
        if not confident_mask.any():
            continue

        confident_boxes = box_pred[confident_mask]
        confident_scores = scores[confident_mask]
        confident_class_ids = class_ids[confident_mask]

        grid_y, grid_x = torch.meshgrid(torch.arange(height, device=box_pred.device, dtype=torch.float32),
                                        torch.arange(width, device=box_pred.device, dtype=torch.float32),
                                        indexing='ij')
        grid_coords_centered = torch.stack((grid_x.flatten() + 0.5, grid_y.flatten() + 0.5), dim=1)
        confident_grid_coords = grid_coords_centered[confident_mask]

        box_pred_probs = torch.softmax(confident_boxes.reshape(-1, reg_max), dim=-1)
        box_reg_dist = torch.matmul(box_pred_probs, bins).reshape(-1, 4)
        d_left, d_top, d_right, d_bottom = box_reg_dist.chunk(4, dim=1)

        x1 = (confident_grid_coords[:, 0] - d_left.squeeze(-1)) * stride
        y1 = (confident_grid_coords[:, 1] - d_top.squeeze(-1)) * stride
        x2 = (confident_grid_coords[:, 0] + d_right.squeeze(-1)) * stride
        y2 = (confident_grid_coords[:, 1] + d_bottom.squeeze(-1)) * stride
        decoded_boxes = torch.stack((x1, y1, x2, y2), dim=1)

        all_boxes.append(decoded_boxes)
        all_scores.append(confident_scores)
        all_class_ids.append(confident_class_ids)

    if not all_boxes:
        return []

    final_boxes = torch.cat(all_boxes, dim=0)
    final_scores = torch.cat(all_scores, dim=0)
    final_class_ids = torch.cat(all_class_ids, dim=0)

    inv_resize_scale_x = 1.0 / resize_scales[0]
    inv_resize_scale_y = 1.0 / resize_scales[1]
    final_boxes[:, [0, 2]] *= inv_resize_scale_x
    final_boxes[:, [1, 3]] *= inv_resize_scale_y

    orig_w, orig_h = original_shape
    final_boxes[:, 0].clamp_(0, orig_w)
    final_boxes[:, 1].clamp_(0, orig_h)
    final_boxes[:, 2].clamp_(0, orig_w)
    final_boxes[:, 3].clamp_(0, orig_h)

    nms_indices = torchvision.ops.nms(final_boxes, final_scores, iou_threshold)
    if len(nms_indices) > max_detections:
        nms_indices = nms_indices[:max_detections]

    detections = []
    for i in nms_indices:
        box = final_boxes[i].cpu().numpy().astype(int)
        score = final_scores[i].cpu().item()
        class_id = final_class_ids[i].cpu().item()
        detections.append( (class_id, score, box[0], box[1], box[2], box[3]) )

    # Sort by score descending to match the C++ list behavior
    detections.sort(key=lambda x: x[1], reverse=True)

    return detections


# Evaluation of predictions over all images compared to base model (without quantization)

In [8]:
import os
import glob
import torch
import torchvision
import cv2
import numpy as np
import pandas as pd
import ast
from collections import defaultdict
import re
from ppq import TorchExecutor
from pandas.errors import EmptyDataError




print(f"Using device: {DEVICE}")


def load_ground_truth_from_csv(csv_path):
    if not os.path.exists(csv_path):
        return [], []

    try:
        df = pd.read_csv(csv_path)
    except EmptyDataError:
        return [], []
    gt_boxes = []
    gt_class_ids = []

    for _, row in df.iterrows():
        box_dict = ast.literal_eval(row['box'])
        box = [box_dict['x1'], box_dict['y1'], box_dict['x2'], box_dict['y2']]
        gt_boxes.append(box)
        gt_class_ids.append(row['class'])

    return torch.tensor(gt_boxes, dtype=torch.float32), torch.tensor(gt_class_ids, dtype=torch.int64)

def calculate_iou(box1, box2):
    x_left = max(box1[0], box2[0])
    y_top = max(box1[1], box2[1])
    x_right = min(box1[2], box2[2])
    y_bottom = min(box1[3], box2[3])

    if x_right < x_left or y_bottom < y_top:
        return 0.0

    intersection_area = (x_right - x_left) * (y_bottom - y_top)
    box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
    box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union_area = box1_area + box2_area - intersection_area

    iou = intersection_area / union_area
    return iou.item()

def calculate_ap_for_class_across_images(predictions, ground_truths_by_image, total_gt_count, iou_threshold=0.5):
    """
    Calculates AP and also returns the IoU scores of all true positive matches.
    """
    # --- THIS IS THE FIX ---
    if total_gt_count == 0:
        # If there are no ground truths, AP is 1.0 if there were also no predictions,
        # and 0.0 otherwise. In either case, there are no True Positive IoUs.
        # We must return a tuple of (float, list) to match the other return path.
        fp_count = len(predictions)
        ap = 1.0 if not predictions else 0.0
        return ap, [], 0, fp_count, 0
    # --- END OF FIX ---

    # The rest of the function remains exactly the same...
    predictions.sort(key=lambda x: x[2], reverse=True)
    tp = np.zeros(len(predictions))
    fp = np.zeros(len(predictions))
    gt_used_map = {img_idx: [False] * len(boxes) for img_idx, boxes in ground_truths_by_image.items()}
    true_positive_ious = []

    for i, pred in enumerate(predictions):
        img_idx, pred_box, score = pred
        gt_boxes_for_image = ground_truths_by_image.get(img_idx, [])
        best_iou = 0
        best_gt_idx = -1
        for j, gt_box in enumerate(gt_boxes_for_image):
            iou = calculate_iou(torch.tensor(pred_box), torch.tensor(gt_box))
            if iou > best_iou:
                best_iou = iou
                best_gt_idx = j
        if best_iou >= iou_threshold and not gt_used_map[img_idx][best_gt_idx]:
            tp[i] = 1
            gt_used_map[img_idx][best_gt_idx] = True
            true_positive_ious.append(best_iou)
        else:
            fp[i] = 1

    tp_cumsum = np.cumsum(tp)
    fp_cumsum = np.cumsum(fp)
    recalls = tp_cumsum / total_gt_count
    precisions = tp_cumsum / (tp_cumsum + fp_cumsum)
    precisions = np.concatenate(([0.], precisions, [0.]))
    recalls = np.concatenate(([0.], recalls, [1.]))
    for i in range(len(precisions) - 2, -1, -1):
        precisions[i] = max(precisions[i], precisions[i+1])
    ap = 0
    for i in range(len(recalls) - 1):
        ap += (recalls[i+1] - recalls[i]) * precisions[i+1]


    tp_count = int(np.sum(tp))
    fp_count = int(np.sum(fp))
    fn_count = total_gt_count - tp_count
    return ap, true_positive_ious, tp_count, fp_count, fn_count

def save_predictions_to_csv(detections_tensor, class_names_map, output_csv_path):
    columns = ['name', 'class', 'confidence', 'box']

    # If there are no detections, create an empty CSV with the correct headers
    if detections_tensor is None or detections_tensor.numel() == 0:
        df = pd.DataFrame(columns=columns)
        df.to_csv(output_csv_path, index=False)
        return

    # Create a list of dictionaries, where each dict is a row
    rows = []
    for det in detections_tensor:
        x1, y1, x2, y2, score, class_id_tensor = det

        class_id = int(class_id_tensor.item())
        confidence = score.item()

        # Get the human-readable class name, provide a default if not found
        class_name = class_names_map.get(class_id, f"class_{class_id}")

        # Format the bounding box into the required string format
        box_string = f"{{'x1': {x1.item()}, 'y1': {y1.item()}, 'x2': {x2.item()}, 'y2': {y2.item()}}}"

        rows.append({
            'name': class_name,
            'class': class_id,
            'confidence': confidence,
            'box': box_string
        })

    # Create and save the DataFrame
    df = pd.DataFrame(rows, columns=columns)
    df.to_csv(output_csv_path, index=False)


# --- NEW: Function to evaluate pre-computed CSV predictions ---
def evaluate_csv_predictions(image_paths, gt_dir, prediction_dir, class_names_map):
    """
    Evaluates a model's performance based on pre-existing prediction CSV files.

    Args:
        image_paths: A list of paths to the input images.
        gt_dir: The directory containing the ground truth CSV files.
        prediction_dir: The directory containing the pre-computed prediction CSV files.
        class_names_map: A dictionary mapping class IDs to names.

    Returns:
        A dictionary containing the evaluation results.
    """
    all_predictions = defaultdict(list)
    all_ground_truths = defaultdict(list)

    print(f"\nEvaluating pre-computed CSVs from '{prediction_dir}'...")

    for i, image_path in enumerate(image_paths):
        image_name = os.path.basename(image_path)
        image_base_name = os.path.splitext(image_name)[0]

        # 1. Load Ground Truth
        gt_csv_name = f"output_{image_base_name}.csv"
        gt_csv_path = os.path.join(gt_dir, gt_csv_name)
        gt_boxes, gt_classes = load_ground_truth_from_csv(gt_csv_path)

        for box, cls_id in zip(gt_boxes, gt_classes):
            all_ground_truths[cls_id.item()].append([i, box.tolist()])

        # 2. Load Pre-computed Predictions
        pred_csv_name = f"output_{image_base_name}.csv"
        pred_csv_path = os.path.join(prediction_dir, pred_csv_name)
        pred_boxes, pred_classes, pred_scores = load_predictions_from_csv(pred_csv_path)

        for box, cls_id, score in zip(pred_boxes, pred_classes, pred_scores):
            all_predictions[cls_id.item()].append([i, box.tolist(), score.item()])

    # --- Calculate metrics (same logic as before) ---
    return calculate_metrics_from_collected_data(all_predictions, all_ground_truths)


# --- NEW: Function to load predictions from your specific CSV format ---
def load_predictions_from_csv(csv_path):
    """Parses prediction CSVs, returning boxes, classes, and scores."""
    if not os.path.exists(csv_path):
        return [], [], []
    try:
        df = pd.read_csv(csv_path)
    except EmptyDataError:
        return [], [], []

    pred_boxes = []
    pred_class_ids = []
    pred_scores = []

    for _, row in df.iterrows():
        box_dict = ast.literal_eval(row['box'])
        box = [box_dict['x1'], box_dict['y1'], box_dict['x2'], box_dict['y2']]
        pred_boxes.append(box)
        pred_class_ids.append(row['class'])
        pred_scores.append(row['confidence'])

    return torch.tensor(pred_boxes), torch.tensor(pred_class_ids), torch.tensor(pred_scores)


# --- NEW: Helper function to avoid code duplication in metric calculation ---
def calculate_metrics_from_collected_data(all_predictions, all_ground_truths):
    """Calculates AP, mAP, and Avg IoU from collected prediction and GT data."""
    results = {
        "ap_per_class": {},
        "avg_iou_per_class": {},
        "tps_per_class": {},
        "fps_per_class": {},
        "fns_per_class": {}
    }
    all_class_ids = sorted(list(set(all_predictions.keys()) | set(all_ground_truths.keys())))

    for class_id in all_class_ids:
        class_preds = all_predictions[class_id]
        class_gts_by_image = defaultdict(list)
        total_gt_count = 0
        for img_idx, box_list in all_ground_truths.get(class_id, []):
            class_gts_by_image[img_idx].append(box_list)
            total_gt_count += 1

        ap, tp_ious, tp_count, fp_count, fn_count = calculate_ap_for_class_across_images(
            class_preds, class_gts_by_image, total_gt_count, iou_threshold=0.5
        )
        results["ap_per_class"][class_id] = ap
        results["avg_iou_per_class"][class_id] = np.mean(tp_ious) if tp_ious else 0.0
        results["tps_per_class"][class_id] = tp_count
        results["fps_per_class"][class_id] = fp_count
        results["fns_per_class"][class_id] = fn_count

    results["mAP"] = np.mean(list(results["ap_per_class"].values())) if results["ap_per_class"] else 0.0

    return results


# --- Main controller script ---
def main():
    # --- Global Configuration ---
    DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f"Using device: {DEVICE}")

    IMAGE_DIR = "./calib_images_compressed"
    GT_DIR = "./new_ground_truth_labels"
    ORIGINAL_MODEL_PRED_DIR = "./preds_calib_images"
    QUANTIZED_MODEL_PRED_DIR = "./preds_quantized_model"
    CLASS_NAMES = {0: 'bicycle', 1: 'saddle'} # Update this!
    MODEL_MEAN = [0, 0, 0]
    MODEL_STD = [255, 255, 255]
    MODEL_INPUT_SHAPE = (640, 640)
    CONF_THRESHOLD = 0.10
    IOU_THRESHOLD = 0.70
    MAX_DETECTIONS = 20

    os.makedirs(QUANTIZED_MODEL_PRED_DIR, exist_ok=True)
    os.makedirs(ORIGINAL_MODEL_PRED_DIR, exist_ok=True) # Ensure it exists
    os.makedirs(GT_DIR, exist_ok=True)

    # A. Evaluate Pre-Computed "Original Model" Predictions
    image_paths = glob.glob(os.path.join(IMAGE_DIR, "*.jpg"))
    results_original = evaluate_csv_predictions(
        image_paths=image_paths,
        gt_dir=GT_DIR,
        prediction_dir=ORIGINAL_MODEL_PRED_DIR,
        class_names_map=CLASS_NAMES
    )

    # B. Evaluate Quantized Model (Live Inference)
    print("\n--- Evaluating QUANTIZED ESP-DL Model (Live Inference) ---")

    # Collect predictions from the live model
    live_predictions = defaultdict(list)
    live_ground_truths = defaultdict(list)

    executor = TorchExecutor(graph=quantized_model, device=DEVICE)

    for i, image_path in enumerate(image_paths):
        image_name = os.path.basename(image_path)
        image_base_name = os.path.splitext(image_name)[0]
        gt_csv_path = os.path.join(GT_DIR, f"output_{image_base_name}.csv")
        gt_boxes, gt_classes = load_ground_truth_from_csv(gt_csv_path)
        for box, cls_id in zip(gt_boxes, gt_classes):
            live_ground_truths[cls_id.item()].append([i, box.tolist()])


        input_tensor, orig_shape, scales = preprocess_for_esp_dl(image_path, MODEL_INPUT_SHAPE, MODEL_MEAN, MODEL_STD)
        input_tensor = input_tensor.to(DEVICE)



        outputs = executor(input_tensor)

        final_results = postprocess_for_esp_dl(
            outputs,
            orig_shape,
            scales,
            conf_threshold=CONF_THRESHOLD,
            iou_threshold=IOU_THRESHOLD,
            max_detections=MAX_DETECTIONS
        )

        detections_for_saving = []
        if final_results:
            for detection_tuple in final_results:
                class_id, score, x1, y1, x2, y2 = detection_tuple

                # 1. Collect for metric calculation
                box = [x1, y1, x2, y2]
                live_predictions[int(class_id)].append([i, box, score])

                # 2. Collect for saving, ensuring order is [x1, y1, x2, y2, score, class_id]
                detections_for_saving.append([x1, y1, x2, y2, score, class_id])

        # Convert to tensor for the save function
        detections_tensor = torch.tensor(detections_for_saving, dtype=torch.float32)

        # Define output path and save
        quantized_pred_csv_path = os.path.join(QUANTIZED_MODEL_PRED_DIR, f"output_{image_base_name}.csv")
        save_predictions_to_csv(detections_tensor, CLASS_NAMES, quantized_pred_csv_path)
        print(f"  - Saved {detections_tensor.shape[0]} predictions to '{quantized_pred_csv_path}'")

    results_quantized = calculate_metrics_from_collected_data(live_predictions, live_ground_truths)


    # --- 3. Present Comparison Table ---
    print("\n\n--- COMPREHENSIVE EVALUATION RESULTS ---\n")
    print("Note: True Negatives (TN) are not reported as they are ill-defined for object detection tasks.\n")
    header = f"{'CLASS':<15} | {'METRIC':<18} | {'ORIGINAL MODEL':<16} | {'QUANTIZED MODEL':<17} | {'CHANGE':<10}"
    print(header)
    print("-" * len(header))

    all_class_ids = sorted(list(set(results_original["ap_per_class"].keys()) | set(results_quantized["ap_per_class"].keys())))

    for class_id in all_class_ids:
        class_name = CLASS_NAMES.get(class_id, f"class_{class_id}")
        print(f"{class_name:<15} | {'-'*18} | {'-'*16} | {'-'*17} | {'-'*10}")

        # AP
        fp32_ap = results_original["ap_per_class"].get(class_id, 0)
        quant_ap = results_quantized["ap_per_class"].get(class_id, 0)
        ap_change = quant_ap - fp32_ap
        print(f"{'':<15} | {'AP @50':<18} | {fp32_ap:<16.4f} | {quant_ap:<17.4f} | {ap_change:<+10.4f}")

        # Avg IoU
        fp32_iou = results_original["avg_iou_per_class"].get(class_id, 0)
        quant_iou = results_quantized["avg_iou_per_class"].get(class_id, 0)
        iou_change = quant_iou - fp32_iou
        print(f"{'':<15} | {'Avg IoU (TPs)':<18} | {fp32_iou:<16.4f} | {quant_iou:<17.4f} | {iou_change:<+10.4f}")

        # TP Count
        fp32_tps = results_original["tps_per_class"].get(class_id, 0)
        quant_tps = results_quantized["tps_per_class"].get(class_id, 0)
        tps_change = quant_tps - fp32_tps
        print(f"{'':<15} | {'True Positives (TP)':<18} | {fp32_tps:<16} | {quant_tps:<17} | {tps_change:<+10}")

        # FP Count
        fp32_fps = results_original["fps_per_class"].get(class_id, 0)
        quant_fps = results_quantized["fps_per_class"].get(class_id, 0)
        fps_change = quant_fps - fp32_fps
        print(f"{'':<15} | {'False Positives (FP)':<18} | {fp32_fps:<16} | {quant_fps:<17} | {fps_change:<+10}")

        # FN Count
        fp32_fns = results_original["fns_per_class"].get(class_id, 0)
        quant_fns = results_quantized["fns_per_class"].get(class_id, 0)
        fns_change = quant_fns - fp32_fns
        print(f"{'':<15} | {'False Negatives (FN)':<18} | {fp32_fns:<16} | {quant_fns:<17} | {fns_change:<+10}")

    print("-" * len(header))

    # Overall Metrics
    print(f"{'OVERALL':<15} | {'-'*18} | {'-'*16} | {'-'*17} | {'-'*10}")
    fp32_map = results_original.get("mAP", 0)
    quant_map = results_quantized.get("mAP", 0)
    map_change = quant_map - fp32_map
    print(f"{'':<15} | {'mAP @50':<18} | {fp32_map:<16.4f} | {quant_map:<17.4f} | {map_change:<+10.4f}")

    # Total TP/FP/FN
    fp32_total_tps = sum(results_original.get("tps_per_class", {}).values())
    quant_total_tps = sum(results_quantized.get("tps_per_class", {}).values())
    total_tps_change = quant_total_tps - fp32_total_tps
    print(f"{'':<15} | {'Total TPs':<18} | {fp32_total_tps:<16} | {quant_total_tps:<17} | {total_tps_change:<+10}")

    fp32_total_fps = sum(results_original.get("fps_per_class", {}).values())
    quant_total_fps = sum(results_quantized.get("fps_per_class", {}).values())
    total_fps_change = quant_total_fps - fp32_total_fps
    print(f"{'':<15} | {'Total FPs':<18} | {fp32_total_fps:<16} | {quant_total_fps:<17} | {total_fps_change:<+10}")

    fp32_total_fns = sum(results_original.get("fns_per_class", {}).values())
    quant_total_fns = sum(results_quantized.get("fns_per_class", {}).values())
    total_fns_change = quant_total_fns - fp32_total_fns
    print(f"{'':<15} | {'Total FNs':<18} | {fp32_total_fns:<16} | {quant_total_fns:<17} | {total_fns_change:<+10}")
    print("-" * len(header))
    print("\n")

if __name__ == "__main__":
    main()

Using device: cpu
Using device: cpu

Evaluating pre-computed CSVs from './preds_calib_images'...

--- Evaluating QUANTIZED ESP-DL Model (Live Inference) ---
  - Saved 15 predictions to './preds_quantized_model/output_frame85.csv'
  - Saved 8 predictions to './preds_quantized_model/output_frame52.csv'
  - Saved 9 predictions to './preds_quantized_model/output_frame53.csv'
  - Saved 4 predictions to './preds_quantized_model/output_frame84.csv'
  - Saved 14 predictions to './preds_quantized_model/output_frame86.csv'
  - Saved 10 predictions to './preds_quantized_model/output_frame51.csv'
  - Saved 8 predictions to './preds_quantized_model/output_frame45.csv'
  - Saved 11 predictions to './preds_quantized_model/output_frame44.csv'
  - Saved 13 predictions to './preds_quantized_model/output_frame50.csv'
  - Saved 7 predictions to './preds_quantized_model/output_frame87.csv'
  - Saved 11 predictions to './preds_quantized_model/output_frame93.csv'
  - Saved 10 predictions to './preds_quantize

# Helper Functions

## Compress images to 640x640

In [34]:
import os
from PIL import Image

# Input and output directories
input_dir = "./calib_images"
output_dir = "./calib_images_compressed"

# Create output directory if it doesn't exist
os.makedirs(output_dir, exist_ok=True)

# Process all JPG files in the input directory
for filename in os.listdir(input_dir):
    if filename.lower().endswith('.jpg'):
        input_path = os.path.join(input_dir, filename)
        output_path = os.path.join(output_dir, filename)

        # Open, resize, and save the image
        with Image.open(input_path) as img:
            img_resized = img.resize((640, 640), Image.Resampling.LANCZOS)
            img_resized.save(output_path, quality=95)
            # Optional: use optimize flag for better compression
            # img_resized.save(output_path, optimize=True, quality=95)

print("Compression complete.")


Compression complete.


## Visualize predictions on image

In [18]:
import cv2

# Load image
image_path = "./yolo11_detect/main/bikes.jpg"
image = cv2.imread(image_path)

predictions = [
    (0, 0.6926, 135, 363, 193, 462),
    (0, 0.6225, 84,  356, 140, 454),
    (0, 0.5000, 165, 352, 232, 489),
    (0, 0.4533, 128, 362, 175, 460),
    (0, 0.4378, 42,  348, 82,  426),
    (0, 0.3923, 103, 361, 160, 458),
    (0, 0.2814, 407, 364, 468, 490),
    (0, 0.2689, 67,  354, 133, 444),
    (0, 0.2568, 517, 185, 572, 296),
    #(0, 0.2337, 112, 362, 172, 460), # This prediction is not on the esp
    (0, 0.1480, 57,  351, 111, 434),
    (0, 0.1480, 411, 357, 478, 479),
    #(0, 0.1403, 21,  347, 62,  415), # This prediction is not on the esp
    (0, 0.1128, 15,  345, 53,  417),

]
'''

predictions = [
    (0, 0.679179, 135, 362, 193, 462),
    (0, 0.622459, 84,  356, 138, 450),
    (0, 0.484380, 43,  348, 83,  426),
    (0, 0.468791, 165, 352, 232, 491),
    (0, 0.468791, 129, 362, 180, 460),
    (0, 0.348645, 103, 360, 160, 457),
    (0, 0.334589, 517, 185, 570, 296),
    (0, 0.294215, 67,  354, 131, 442),
    (0, 0.294215, 406, 365, 467, 492),
    (0, 0.182426, 58,  350, 112, 434),
    (0, 0.156105, 14,  346, 53,  416),
    (0, 0.156105, 410, 357, 478, 480),
]
'''
# Define colors for each class
colors = {
    0: (255, 0, 0),       # Blue for bicycles (BGR)
    1: (255, 255, 51),   # Light blue for saddles
}

# Class names
class_names = {
    0: "Bicycle",
    1: "Saddle",
}

# Loop through predictions and draw boxes
for category, score, x1, y1, x2, y2 in predictions:
    label = f"{class_names[category]}: {score:.2f}"
    color = colors[category]
    cv2.rectangle(image, (x1, y1), (x2, y2), color, 2)
    cv2.putText(image, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

# Save the image
cv2.imwrite("output_QAT_pc.jpg", image)


True

# Transform original labels to compressed labels

In [32]:
import os
import glob
import cv2
import numpy as np
import pandas as pd

# --- Helper Functions (from previous answers) ---

def yolo_to_xyxy(yolo_boxes, original_shape):
    """Converts YOLO format boxes to [x1, y1, x2, y2] format."""
    orig_h, orig_w = original_shape
    yolo_boxes[:, 0] *= orig_w
    yolo_boxes[:, 1] *= orig_h
    yolo_boxes[:, 2] *= orig_w
    yolo_boxes[:, 3] *= orig_h
    xyxy_boxes = np.zeros_like(yolo_boxes)
    xyxy_boxes[:, 0] = yolo_boxes[:, 0] - yolo_boxes[:, 2] / 2
    xyxy_boxes[:, 1] = yolo_boxes[:, 1] - yolo_boxes[:, 3] / 2
    xyxy_boxes[:, 2] = yolo_boxes[:, 0] + yolo_boxes[:, 2] / 2
    xyxy_boxes[:, 3] = yolo_boxes[:, 1] + yolo_boxes[:, 3] / 2
    return xyxy_boxes

def transform_boxes_to_letterbox(boxes_xyxy, original_shape, new_shape=(640, 640)):
    """Transforms [x1, y1, x2, y2] boxes to letterboxed coordinates."""
    orig_h, orig_w = original_shape
    new_h, new_w = new_shape
    r = min(new_h / orig_h, new_w / orig_w)
    new_unpad_w, new_unpad_h = int(round(orig_w * r)), int(round(orig_h * r))
    dw = (new_w - new_unpad_w) / 2
    dh = (new_h - new_unpad_h) / 2
    boxes = np.array(boxes_xyxy, dtype=np.float32)
    boxes[:, [0, 2]] = boxes[:, [0, 2]] * r + dw
    boxes[:, [1, 3]] = boxes[:, [1, 3]] * r + dh
    return boxes

def save_labels_to_csv(boxes, class_ids, class_names_map, output_csv_path):
    """Saves transformed labels to a CSV file."""
    columns = ['name', 'class', 'confidence', 'box']
    rows = []

    # Note: Ground truth has no 'confidence', so we can use a placeholder like 1.0
    for i, box in enumerate(boxes):
        class_id = class_ids[i]
        class_name = class_names_map.get(class_id, f"class_{class_id}")
        box_string = f"{{'x1': {box[0]}, 'y1': {box[1]}, 'x2': {box[2]}, 'y2': {box[3]}}}"
        rows.append({
            'name': class_name,
            'class': class_id,
            'confidence': 1.0,
            'box': box_string
        })

    df = pd.DataFrame(rows, columns=columns)
    df.to_csv(output_csv_path, index=False)

# --- Main Conversion Script ---

def convert_labels():
    # --- Configuration ---
    # Path to original, high-resolution images
    ORIGINAL_IMAGE_DIR = "/Users/christophknaden/git/leezencounter/model-training/datasets/security_camera/YOLO/images"
    # Path to original YOLO .txt labels
    ORIGINAL_LABEL_DIR = "/Users/christophknaden/git/leezencounter/model-training/datasets/security_camera/YOLO/labels"
    # Path where you want to save the new ground truth CSVs
    NEW_CSV_DIR = "./new_ground_truth_labels"

    # !!! IMPORTANT: Update this to match your classes.txt or data.yaml !!!
    CLASS_NAMES = {
        0: 'bicycle',
        1: 'saddle'
        # Add all your classes here
    }

    # Create the output directory if it doesn't exist
    os.makedirs(NEW_CSV_DIR, exist_ok=True)
    print(f"New CSV labels will be saved to: {os.path.abspath(NEW_CSV_DIR)}")

    # Find all original label files
    label_files = glob.glob(os.path.join(ORIGINAL_LABEL_DIR, "*.txt"))

    if not label_files:
        print(f"Error: No label files found in '{ORIGINAL_LABEL_DIR}'. Please check the path.")
        return

    print(f"Found {len(label_files)} label files to process.")

    for label_path in label_files:
        # --- 1. Load Original Data ---

        # Load the YOLO .txt file
        try:
            data = np.loadtxt(label_path, ndmin=2)
        except Exception as e:
            print(f"Warning: Could not read or empty file '{label_path}'. Skipping. Error: {e}")
            continue

        class_ids = data[:, 0].astype(int)
        yolo_boxes = data[:, 1:]

        # Find the corresponding original image to get its dimensions
        base_name = os.path.splitext(os.path.basename(label_path))[0]
        # Assume images can be .jpg, .png, etc.
        image_path_pattern = os.path.join(ORIGINAL_IMAGE_DIR, f"{base_name}.*")
        image_paths = glob.glob(image_path_pattern)

        if not image_paths:
            print(f"Warning: No matching image found for label '{label_path}'. Skipping.")
            continue

        original_image = cv2.imread(image_paths[0])
        original_shape = original_image.shape[:2] # (height, width)

        # --- 2. Perform Transformations ---

        # a. De-normalize YOLO boxes to pixel coordinates [x1, y1, x2, y2]
        pixel_boxes = yolo_to_xyxy(yolo_boxes, original_shape)

        # b. Transform pixel boxes to the 640x640 letterboxed space
        transformed_boxes = transform_boxes_to_letterbox(pixel_boxes, original_shape)

        # --- 3. Save the New CSV File ---

        # The output filename should match the format your evaluation script expects
        output_csv_name = f"output_{base_name}.csv"
        output_csv_path = os.path.join(NEW_CSV_DIR, output_csv_name)

        save_labels_to_csv(transformed_boxes, class_ids, CLASS_NAMES, output_csv_path)

        print(f"Successfully converted '{os.path.basename(label_path)}' -> '{output_csv_name}'")

# Run the conversion process
if __name__ == "__main__":
    convert_labels()

New CSV labels will be saved to: /Users/christophknaden/git/leezencounter/model-deployment/new_ground_truth_labels
Found 71 label files to process.
Successfully converted 'frame30.txt' -> 'output_frame30.csv'
Successfully converted 'frame24.txt' -> 'output_frame24.csv'
Successfully converted 'frame18.txt' -> 'output_frame18.csv'
Successfully converted 'frame19.txt' -> 'output_frame19.csv'
Successfully converted 'frame25.txt' -> 'output_frame25.csv'
Successfully converted 'frame31.txt' -> 'output_frame31.csv'
Successfully converted 'frame27.txt' -> 'output_frame27.csv'
Successfully converted 'frame33.txt' -> 'output_frame33.csv'
Successfully converted 'frame32.txt' -> 'output_frame32.csv'
Successfully converted 'frame26.txt' -> 'output_frame26.csv'
Successfully converted 'frame22.txt' -> 'output_frame22.csv'
Successfully converted 'frame36.txt' -> 'output_frame36.csv'
Successfully converted 'frame37.txt' -> 'output_frame37.csv'
Successfully converted 'frame23.txt' -> 'output_frame23.csv