In [None]:
import os


# Root directory of the RC-cars dataset.
data_root = "/data/shared/rccars/"
# Path to the mmdetection tree under avstack-core's third_party folder,
# relative to the notebook's working directory.
# NOTE(review): no later cell shown in this notebook reads mmdet_root — confirm it is still needed.
mmdet_root = "../third_party/avstack-core/third_party/mmdetection"

### Analyze the performance of the models vs epoch

In [None]:
from avapi.rccars import RcCarsScenesManager

# Reuse the dataset root defined in the config cell instead of repeating the
# path here. normpath drops the trailing slash of data_root, so the manager
# receives exactly the same "/data/shared/rccars" string as before.
scene_manager = RcCarsScenesManager(data_dir=os.path.normpath(data_root))
print(f"Available scenes: {scene_manager.scenes}")

In [None]:
from avapi.visualize.snapshot import show_image_with_boxes


# Open the "train" dataset of the one-class detection scene; later cells reuse
# `scene_dataset` for their single-frame checks.
scene_dataset = scene_manager.get_scene_dataset_by_name("oneclass-detection-2025", "train")

# Fetch one frame together with its camera boxes and draw them inline.
frame = 20
img = scene_dataset.get_image(frame=frame)
boxes = scene_dataset.get_boxes(frame=frame, sensor="camera")
show_image_with_boxes(
    img=img,
    boxes=boxes,
    inline=True,
    show=True,
)

In [None]:
import numpy as np
from avstack.modules.assignment import build_A_from_iou, gnn_single_frame_assign
from avstack.modules.perception.object2dfv import MMDetObjectDetector2D


def compute_metrics(objs_det, objs_tru, do_print=False):
    """Match detections to ground truth for one frame and summarise the errors.

    The matrix built by ``build_A_from_iou`` is solved with
    ``gnn_single_frame_assign``. Rows of the assignment index ``objs_det`` and
    columns index ``objs_tru``.

    Args:
        objs_det: Detected boxes; each element must provide ``IoU(other)``.
        objs_tru: Ground-truth boxes for the same frame.
        do_print: When True, print a human-readable summary of the counts and
            the IoU of every match.

    Returns:
        Tuple ``(assigns, n_fp, n_fn, ious)``: the assignment result, the
        number of unassigned detections (false positives), the number of
        unassigned truths (false negatives), and the list of IoU values of
        the matched pairs (empty when nothing matched).
    """
    # -- perform assignment
    A_assign = build_A_from_iou(objs_det, objs_tru)
    assigns = gnn_single_frame_assign(A_assign)

    # -- report fp/fn rate
    n_fp = len(assigns.unassigned_rows)
    n_fn = len(assigns.unassigned_cols)
    if do_print:
        print(f"{assigns.nrow} detected\n---------------------")
        print(f"{assigns.nrow - n_fp} assignment")
        if assigns.nrow > 0:
            print(f"{n_fp} false positives ({100*n_fp/assigns.nrow:.2f}% of detections)")
        else:
            print("No detections, so no FP rate")
        # Guard the rate the same way as for detections: a frame without any
        # ground-truth box would otherwise divide by zero.
        if assigns.ncol > 0:
            print(f"{n_fn} false negatives ({100*n_fn/assigns.ncol:.2f}% of truths)")
        else:
            print("No truths, so no FN rate")

    # -- report average IoU of matches
    ious = [objs_det[a[0]].IoU(objs_tru[a[1]]) for a in assigns.assignment_tuples]
    if do_print:
        print(f"Matches had IoU of: [{', '.join(map(str, ious))}] each")
        # np.mean([]) yields nan plus a RuntimeWarning, so only average real matches.
        if ious:
            print(f"Mean IoU: {np.mean(ious):.2f}")
        else:
            print("No matches, so no mean IoU")

    return assigns, n_fp, n_fn, ious



def load_model(epoch: int, threshold: float):
    """Construct the RTMDet detector for the one-class RC-cars dataset.

    Args:
        epoch: Checkpoint epoch forwarded to the detector as ``epoch``.
        threshold: Value forwarded to the detector as ``threshold``.

    Returns:
        The ``MMDetObjectDetector2D`` instance (non-deploy mode, GPU 0).
    """
    # Fixed settings first, then the two values that vary between runs.
    detector_kwargs = dict(
        model="rtmdet",
        dataset="rccars-oneclass",
        deploy=False,
        gpu=0,
    )
    detector_kwargs["threshold"] = threshold
    detector_kwargs["epoch"] = epoch
    return MMDetObjectDetector2D(**detector_kwargs)

In [None]:

# Load one checkpoint for a quick manual check before running the full sweep.
# The available epoch checkpoints are stored in:
#   /home/cpsl-interns/Documents/cpsl-label-studio/model-training/third_party/avstack-core/third_party/mmdetection/work_dirs/rtmdet_m_8xb32-300e_rccars-oneclass
# and also in:
#   /data/cpsl-interns/models/mmdet/work_dirs/rtmdet_m_8xb32-300e_rccars-oneclass
# NOTE(review): the sweep cell further down globs
# /data/shared/models/mmdet/work_dirs/... instead — confirm which location is current.
epoch = 10  # checkpoint epoch to load; sweep this from 1 - 100
threshold = 0.1  # threshold passed to the detector, in [0, 1]; sweep this from 0.1 to 0.9
detector = load_model(epoch=epoch, threshold=threshold)

In [None]:
# Sanity-check the loaded detector on a single frame of `scene_dataset`
# (only this one dataset is used here; no validation frame is evaluated).
frame = 0
img = scene_dataset.get_image(frame=frame)
objs_det = detector(img)

# metrics: compare the detections with the camera boxes of the same frame.
# The results stay in notebook scope so the next cell can colour the boxes.
objs_tru = scene_dataset.get_boxes(frame=frame, sensor="camera")
assigns, n_fp, n_fn, ious = compute_metrics(objs_det, objs_tru)

In [None]:
# Draw the detections: green when the detection has an assignment, red otherwise.
print("DETECTED")
match_colors = {True: "green", False: "red"}
det_colors = [
    match_colors[bool(assigns.has_assign(row=i))]
    for i in range(len(objs_det))
]
show_image_with_boxes(img=img, boxes=objs_det, box_colors=det_colors, inline=True, show=True)

# Draw the ground-truth boxes on the same image for comparison.
print("GROUND TRUTH")
show_image_with_boxes(img=img, boxes=objs_tru, box_colors="white", inline=True, show=True)

In [None]:
import glob
import os
import re

from tqdm import tqdm


# Load the "train" and "val" datasets of the one-class detection scene.
# NOTE(review): only train_dataset is evaluated below; val_dataset is loaded
# but never used — confirm whether a validation sweep is still intended.
train_dataset = scene_manager.get_scene_dataset_by_name("oneclass-detection-2025", "train")
val_dataset = scene_manager.get_scene_dataset_by_name("oneclass-detection-2025", "val")

# Alternative checkpoint location:
# checkpoint_dir = "/home/cpsl-interns/Documents/cpsl-label-studio/model-training/third_party/avstack-core/third_party/mmdetection/work_dirs/rtmdet_m_8xb32-300e_rccars-oneclass/"

checkpoint_dir = "/data/shared/models/mmdet/work_dirs/rtmdet_m_8xb32-300e_rccars-oneclass/"
checkpoint_files = glob.glob(os.path.join(checkpoint_dir, "epoch_*.pth"))

# Accept only files named exactly epoch_<number>.pth: the glob also matches
# names such as epoch_best.pth, which would crash a bare int() conversion.
epoch_pattern = re.compile(r"epoch_(\d+)\.pth")
epoch_matches = [epoch_pattern.fullmatch(os.path.basename(f)) for f in checkpoint_files]
epoch_list = sorted(int(m.group(1)) for m in epoch_matches if m is not None)

print("Available epochs:", epoch_list)

# epoch_list = list(range(10, 100))
threshold_list = [0.5, 0.7]

# One record per (epoch, threshold, frame); rebuilt from scratch on every run
# of this cell so that re-running it never duplicates rows.
results = []
for epoch in epoch_list:
    print(f"Trying to load model for epoch {epoch}...")
    for threshold in tqdm(threshold_list):
        try:
            detector = load_model(epoch=epoch, threshold=threshold)
        except FileNotFoundError:
            print(f"Model checkpoint for epoch {epoch} not found. Skipping...")
            # The missing checkpoint belongs to the epoch, so leave the
            # threshold loop instead of retrying (and re-reporting) the same
            # epoch once per threshold.
            break

        for frame in tqdm(train_dataset.frames):
            try:
                img = train_dataset.get_image(frame)
            except FileNotFoundError:
                print(f"Image for frame {frame} not found. Skipping...")
                continue  # Skip missing images

            # Deliberately best-effort: a failure on one frame is reported and
            # the sweep carries on with the remaining frames.
            try:
                objs_tru = train_dataset.get_boxes(frame, sensor="camera")
                objs_det = detector(img)
                assigns, n_fp, n_fn, ious = compute_metrics(objs_det, objs_tru)
                results.append({
                    "epoch": epoch,
                    "threshold": threshold,
                    "dataset": "train",
                    "frame": frame,
                    "false_positives": n_fp,
                    "false_negatives": n_fn,
                    # An empty match list is recorded as 0 rather than nan.
                    "mean_iou": np.mean(ious) if ious else 0,
                    "num_matches": len(ious),
                })
            except Exception as e:
                print(f"Error processing frame {frame}: {e}")

In [None]:
# Peek at the most recent record; shows nothing instead of raising IndexError
# when the sweep produced no results (e.g. no checkpoint was found).
results[-1] if results else None

In [None]:
import glob
import os

import pandas as pd
import matplotlib.pyplot as plt

# Fail early with a clear message: an empty result list would produce a frame
# without columns and an opaque KeyError on "epoch" further down.
if not results:
    raise ValueError("No results to analyze; run the evaluation sweep cell first.")

# Load results into a DataFrame in one chained, re-runnable expression:
# coerce the plotted columns to numeric, drop rows where that failed, and
# store epoch as int.
df = (
    pd.DataFrame(results)
    .assign(
        epoch=lambda d: pd.to_numeric(d["epoch"], errors="coerce"),
        false_positives=lambda d: pd.to_numeric(d["false_positives"], errors="coerce"),
    )
    .dropna(subset=["epoch", "false_positives"])  # remove rows missing either
    .astype({"epoch": int})
)

# Group and calculate average false positives per epoch (over all thresholds)
avg_fp_per_epoch = df.groupby("epoch")["false_positives"].mean().reset_index()

# Plot
plt.figure(figsize=(10, 6))
plt.plot(avg_fp_per_epoch["epoch"], avg_fp_per_epoch["false_positives"], marker='o', label="Avg False Positives")
# The overall mean mixes every threshold of the sweep, so also draw one curve
# per threshold to keep the effect of the threshold visible.
for threshold_value, threshold_rows in df.groupby("threshold"):
    fp_by_epoch = threshold_rows.groupby("epoch")["false_positives"].mean()
    plt.plot(
        fp_by_epoch.index,
        fp_by_epoch.to_numpy(),
        marker='.',
        linestyle="--",
        label=f"threshold={threshold_value}",
    )
plt.xlabel("Epoch")
plt.ylabel("Average False Positives")
plt.title("Average False Positives vs. Epoch")
plt.grid(True)
plt.legend()
plt.show()

# Debug: show what's going on under the hood
print("Unique epochs found:", df["epoch"].unique())
print("Number of total results:", len(df))
print("Average FP per epoch table:")
print(avg_fp_per_epoch)
print("Counts per epoch:")
print(df.groupby("epoch")["false_positives"].count())

# List the checkpoint epochs present on disk for comparison with the epochs
# found in the results. checkpoint_dir is set by the sweep cell that produced
# `results`; the previous literal ".../" placeholder path could never match.
checkpoints = glob.glob(os.path.join(checkpoint_dir, "epoch_*.pth"))
epochs_available = [int(f.split("_")[-1].split(".")[0]) for f in checkpoints]
print(sorted(epochs_available))




