### Import necessary libraries

In [None]:
import os, time, cv2, torch, numpy as np, pandas as pd
import matplotlib
matplotlib.use("Agg")  # headless-safe
import matplotlib.pyplot as plt
from ultralytics import YOLO

### Data Paths

In [None]:
# Source video that every model is run on (raw string keeps the Windows backslashes literal).
INPUT_VIDEO = r"D:\sangita-mam\assignment-07\rayhan_video.mp4"
# Directory for every generated artifact (annotated videos, snapshots, CSV table, charts);
# created immediately if it does not exist yet.
OUTPUT_DIR = "outputs"; os.makedirs(OUTPUT_DIR, exist_ok=True)

### Provide local paths or model names resolvable by Ultralytics

In [None]:
# Maps a short model label (used in output file names and in the comparison table)
# to the weight name/path that is handed to YOLO().
MODEL_WEIGHTS = {
    "yolov8n": "yolov8n.pt",
    "yolo11n": "yolo11n.pt",
    "yolo12n": "yolo12n.pt",
}


### Configuring

In [None]:
# Passed as conf= to model.predict() in predict_on_frame.
CONF_THRESH = 0.25
# Passed as iou= to model.predict() in predict_on_frame.
IOU_THRESH  = 0.45
# Font used for the label text drawn by annotate().
FONT = cv2.FONT_HERSHEY_SIMPLEX
# Run on the GPU when PyTorch reports CUDA as available, otherwise on the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

### Plotting setup

In [None]:
# Nicer defaults: global matplotlib settings applied to the figures created below.
plt.rcParams.update({
    "figure.dpi": 140,
    "savefig.dpi": 240,  # saved files use a higher DPI than the figure DPI above
    "font.size": 10.5,
    "axes.grid": True,  # dashed, thin grid lines on every axes
    "grid.linestyle": "--",
    "grid.linewidth": 0.5,
    "axes.spines.top": False,  # hide the top and right axes borders
    "axes.spines.right": False,
})

def _fmt_val(v):
    """Format a value for a chart label.

    Whole numbers are rendered without decimals, other numbers with two
    decimals. Values that are neither float nor int are converted with
    ``float()`` first; if that (or the rounding) fails, ``str(v)`` is returned.
    """
    if isinstance(v, (float, np.floating)):
        # Non-finite floats skip the rounding test and fall through to "nan"/"inf".
        is_whole = np.isfinite(v) and abs(v - round(v)) < 1e-9
        return f"{int(round(v))}" if is_whole else f"{v:.2f}"

    if isinstance(v, (int, np.integer)):
        return str(int(v))

    # Anything else: try to treat it as a number, otherwise show it verbatim.
    try:
        as_float = float(v)
        is_whole = abs(as_float - round(as_float)) < 1e-9
        return f"{int(round(as_float))}" if is_whole else f"{as_float:.2f}"
    except Exception:
        return str(v)

### Load Model

In [None]:
def load_model(weight_path):
    """Create a YOLO model from ``weight_path`` and try to move it to ``DEVICE``.

    Moving the model is best-effort: if ``m.to(DEVICE)`` raises, a warning is
    printed and the model is returned as it was loaded.

    Args:
        weight_path: Weight file path or model name accepted by ``YOLO()``.

    Returns:
        The loaded YOLO model.
    """
    m = YOLO(weight_path)
    try:
        m.to(DEVICE)
    except Exception as exc:
        # Keep going (prediction passes device= explicitly), but do not hide the failure.
        print(f"[WARN] could not move {weight_path} to {DEVICE}: {exc}")
    return m

def safe_writer(out_stem, fps, size):
    """Open a video writer in OUTPUT_DIR, preferring AVI+XVID and falling back to MP4+mp4v.

    Args:
        out_stem: Output file name without extension.
        fps: Requested frame rate; a missing or non-positive value becomes 30.0.
        size: ``(width, height)`` of the frames that will be written.

    Returns:
        ``(writer, path)`` for the first container/codec pair that opens, or
        ``(None, None)`` when neither can be opened.
    """
    fps = float(fps if fps and fps > 0 else 30.0)
    W, H = size
    # Tried in order; the first writer that reports isOpened() wins.
    for ext, codec in ((".avi", "XVID"), (".mp4", "mp4v")):
        path = os.path.join(OUTPUT_DIR, f"{out_stem}{ext}")
        writer = cv2.VideoWriter(path, cv2.VideoWriter_fourcc(*codec), fps, (W, H))
        if writer.isOpened():
            return writer, path
        # Release the failed writer before trying the next candidate.
        writer.release()
    return None, None

In [None]:
def annotate(frame, boxes, confs, clss, names):
    """Return a copy of ``frame`` with every detection drawn on it.

    Each detection gets a green rectangle plus a filled green tag containing
    ``"<class name> <confidence>"`` in black text. The input frame is not modified.

    Args:
        frame: Image to draw on (copied first).
        boxes: Iterable of ``(x1, y1, x2, y2)`` box corners.
        confs: Confidence per box, same order as ``boxes``.
        clss: Class id per box, used to index ``names``.
        names: Lookup from integer class id to display name.

    Returns:
        The annotated copy of ``frame``.
    """
    img = frame.copy()
    for (x1, y1, x2, y2), conf, cls_id in zip(boxes, confs, clss):
        x1, y1, x2, y2 = map(int, (x1, y1, x2, y2))
        label = f"{names[int(cls_id)]} {conf:.2f}"
        cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
        (tw, th), _ = cv2.getTextSize(label, FONT, 0.5, 1)
        # The tag normally sits on top of the box. When the box is too close to
        # the top edge the tag would land at negative y (off-image), so push it
        # down just enough to stay visible.
        tag_bottom = max(y1, th + 6)
        cv2.rectangle(img, (x1, tag_bottom - th - 6), (x1 + tw + 4, tag_bottom), (0, 255, 0), -1)
        cv2.putText(img, label, (x1 + 2, tag_bottom - 4), FONT, 0.5, (0, 0, 0), 1, cv2.LINE_AA)
    return img

def predict_on_frame(model, frame):
    """Run ``model`` on a single frame and return ``(boxes, confs, clss)`` as NumPy arrays.

    ``boxes`` comes from the result's ``xyxy`` field, ``confs`` from ``conf`` and
    ``clss`` from ``cls`` (cast to int). When the first result has no boxes,
    three empty arrays are returned (``boxes`` with shape ``(0, 4)``).
    """
    results = model.predict(
        source=frame,
        conf=CONF_THRESH,
        iou=IOU_THRESH,
        verbose=False,
        device=DEVICE,
    )
    detections = results[0].boxes
    if detections is None or len(detections) == 0:
        return np.empty((0,4)), np.array([]), np.array([], dtype=int)

    def _as_numpy(tensor):
        # Detach from the graph and bring to host memory before converting.
        return tensor.detach().cpu().numpy()

    return (
        _as_numpy(detections.xyxy),
        _as_numpy(detections.conf),
        _as_numpy(detections.cls).astype(int),
    )

def robust_read_frame(video_path, target_idx):
    """Return frame number ``target_idx`` (0-based) of a video by reading sequentially.

    Frames are consumed one by one from the start instead of seeking with
    CAP_PROP_POS_FRAMES.

    Args:
        video_path: Path of the video to open.
        target_idx: 0-based index of the wanted frame.

    Returns:
        The frame image, or ``None`` if the video cannot be opened, the index
        is negative, or the video ends before ``target_idx`` is reached.
    """
    if target_idx < 0:
        return None
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return None
        # Skip the frames before the target; grab() advances without retrieving
        # the image (read() is grab() followed by retrieve()).
        for _ in range(target_idx):
            if not cap.grab():
                return None
        ok, frame = cap.read()
        return frame if ok else None
    finally:
        # Always give the capture back, including on early returns and errors.
        cap.release()

In [None]:
def choose_common_idx_with_v8(video_path, v8_model):
    """Return the index of the frame on which ``v8_model`` produces the most detections.

    Every frame of the video is run through ``predict_on_frame``. On ties the
    earliest frame wins. If no frame can be read, 0 is returned.

    Args:
        video_path: Path of the video to scan.
        v8_model: Model passed to ``predict_on_frame``.

    Returns:
        0-based frame index.

    Raises:
        FileNotFoundError: If the video cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise FileNotFoundError(video_path)
    best_idx, best_cnt, idx = 0, -1, 0
    try:
        while True:
            ok, frame = cap.read()
            if not ok:
                break
            # Only the number of detections matters here.
            _, confs, _ = predict_on_frame(v8_model, frame)
            if len(confs) > best_cnt:
                best_cnt, best_idx = len(confs), idx
            idx += 1
    finally:
        # Release the capture even if prediction raises part-way through.
        cap.release()
    # With zero frames read, best_idx is still its initial 0.
    return best_idx

In [None]:
def run_model_full_video(model_name, weight, input_path, common_idx):
    """Run one model over the whole video, write an annotated copy, and collect statistics.

    Besides the annotated video, the frame at ``common_idx`` is re-read,
    annotated with this model's detections and saved as a JPEG in OUTPUT_DIR.

    Args:
        model_name: Label used in output file names and in the returned summary.
        weight: Weight path/name passed to ``load_model``.
        input_path: Path of the input video.
        common_idx: 0-based index of the snapshot frame.

    Returns:
        dict with keys ``model``, ``output_video``, ``frames_processed``,
        ``throughput_fps`` (frames divided by time spent in prediction only),
        ``total_detections``, ``avg_detections_per_frame``,
        ``selected_frame_index``, ``selected_frame_det_count`` and
        ``selected_frame_path``.

    Raises:
        FileNotFoundError: If the input video cannot be opened.
        RuntimeError: If no writer can be opened, no frame is processed, or the
            snapshot frame cannot be read.
    """
    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        raise FileNotFoundError(input_path)

    writer, out_path = None, None
    frames, tot_det, t_sum = 0, 0, 0.0
    try:
        # Fall back to defaults when the container reports 0 for a property.
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        W   = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 1280)
        H   = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 720)

        # Load the model before creating the output file, so a failed load
        # does not leave an empty video behind.
        model = load_model(weight)
        names = model.model.names if hasattr(model.model, "names") else model.names

        writer, out_path = safe_writer(f"{model_name}_annotated", fps, (W,H))
        if writer is None:
            raise RuntimeError(f"Failed to open writer for {model_name}")

        while True:
            ok, frame = cap.read()
            if not ok:
                break
            # Time only the prediction call; perf_counter is the monotonic,
            # high-resolution clock intended for measuring intervals.
            t0 = time.perf_counter()
            boxes, confs, clss = predict_on_frame(model, frame)
            t_sum += time.perf_counter() - t0
            tot_det += len(confs)
            frames += 1
            writer.write(annotate(frame, boxes, confs, clss, names))
    finally:
        # Release both handles on success and on any error above.
        cap.release()
        if writer is not None:
            writer.release()

    # ensure at least one frame written, else mark as failed
    if frames == 0:
        try:
            os.remove(out_path)
        except OSError:
            pass
        raise RuntimeError(f"No frames processed for {model_name}; output removed")

    # SAME snapshot for ALL models
    snap = robust_read_frame(input_path, common_idx)
    if snap is None:
        raise RuntimeError(f"Failed to read comparison frame idx={common_idx}")
    s_boxes, s_confs, s_clss = predict_on_frame(model, snap)
    snap_anno = annotate(snap, s_boxes, s_confs, s_clss, names)
    snap_path = os.path.join(OUTPUT_DIR, f"{model_name}_selected_frame_sameidx_{common_idx}.jpg")
    cv2.imwrite(snap_path, snap_anno)

    fps_eff = frames / t_sum if t_sum > 0 else 0.0
    return {
        "model": model_name,
        "output_video": out_path,
        "frames_processed": frames,
        "throughput_fps": fps_eff,
        "total_detections": tot_det,
        "avg_detections_per_frame": tot_det / max(frames, 1),
        "selected_frame_index": common_idx,
        "selected_frame_det_count": int(len(s_confs)),
        "selected_frame_path": snap_path,
    }

In [None]:
# ------------------------- PLOTTING -------------------------
def _label_bars(ax, bars, values, pad=0.01):
    """Write each bar's value just above the bar, formatted with ``_fmt_val``.

    Args:
        ax: Axes object whose ``annotate`` method is used.
        bars: Bar rectangles, e.g. the return value of ``ax.bar``.
        values: Values to print, paired with ``bars`` in order.
        pad: Unused; kept so callers that pass it keep working.
    """
    for rect, v in zip(bars, values):
        # Anchor at the top centre of the bar, then shift the text up 5 points.
        ax.annotate(_fmt_val(v),
                    xy=(rect.get_x() + rect.get_width()/2, rect.get_height()),
                    xytext=(0, 5), textcoords="offset points",
                    ha="center", va="bottom", fontsize=9)

In [None]:
def save_bar(df, col, ylabel, title, filename):
    """Plot column ``col`` as one bar per model and save the chart in OUTPUT_DIR.

    Bars are labelled on the x axis with ``df["model"]`` and annotated with
    their values. The saved path is printed.
    """
    heights = df[col].values
    model_names = df["model"].values
    positions = np.arange(len(heights))
    target = os.path.join(OUTPUT_DIR, filename)

    fig, ax = plt.subplots(figsize=(8.8, 4.8))
    rects = ax.bar(positions, heights)
    ax.set_title(title)
    ax.set_ylabel(ylabel)
    ax.set_xticks(positions, model_names)
    _label_bars(ax, rects, heights)

    fig.tight_layout()
    fig.savefig(target, bbox_inches="tight")
    plt.close(fig)
    print(f"Saved: {target}")

In [None]:
def save_scatter_fps_vs_avgdet(df, filename):
    """Scatter throughput (x) against average detections per frame (y), one point per model.

    Each point is tagged with its model name. The chart is saved in OUTPUT_DIR
    and the saved path is printed.
    """
    speed = df["throughput_fps"].values
    density = df["avg_detections_per_frame"].values
    model_names = df["model"].values
    target = os.path.join(OUTPUT_DIR, filename)

    fig, ax = plt.subplots(figsize=(7.2, 5.4))
    ax.scatter(speed, density, s=90)
    for point_x, point_y, tag in zip(speed, density, model_names):
        ax.annotate(
            tag,
            (point_x, point_y),
            textcoords="offset points",
            xytext=(6, 6),
            fontsize=9,
        )
    ax.set_xlabel("Throughput (FPS)")
    ax.set_ylabel("Avg detections per frame")
    ax.set_title("Speed vs Density (higher-right is better)")
    ax.grid(True, linestyle="--", linewidth=0.6)

    fig.tight_layout()
    fig.savefig(target, bbox_inches="tight")
    plt.close(fig)
    print(f"Saved: {target}")

In [None]:
def save_grouped_fps_avgdet(df, filename):
    """Draw FPS and average detections per frame as two side-by-side bars per model.

    Both bar groups are value-labelled. The chart is saved in OUTPUT_DIR and
    the saved path is printed.
    """
    model_names = df["model"].values
    fps_vals = df["throughput_fps"].values
    avg_vals = df["avg_detections_per_frame"].values

    bar_w = 0.38
    centers = np.arange(len(model_names))
    target = os.path.join(OUTPUT_DIR, filename)

    fig, ax = plt.subplots(figsize=(9.2, 5.0))
    # Left bar of each pair is FPS, right bar is the detection average.
    fps_bars = ax.bar(centers - bar_w/2, fps_vals, bar_w, label="FPS")
    avg_bars = ax.bar(centers + bar_w/2, avg_vals, bar_w, label="Avg det./frame")
    ax.set_xticks(centers, model_names)
    ax.set_ylabel("Value")
    ax.set_title("Grouped Comparison: FPS vs Avg detections")
    ax.legend()
    for group, numbers in ((fps_bars, fps_vals), (avg_bars, avg_vals)):
        _label_bars(ax, group, numbers)

    fig.tight_layout()
    fig.savefig(target, bbox_inches="tight")
    plt.close(fig)
    print(f"Saved: {target}")

In [None]:

# ------------------------- MAIN -------------------------
if __name__ == "__main__":
    if not os.path.exists(INPUT_VIDEO):
        raise FileNotFoundError(f"Missing: {INPUT_VIDEO}")

    # Step 1: YOLOv8 picks the single frame index every model is compared on.
    v8 = load_model(MODEL_WEIGHTS["yolov8n"])
    common_idx = choose_common_idx_with_v8(INPUT_VIDEO, v8)
    print(f"[Common frame] index = {common_idx}")

    # Step 2: run each model over the full video; a failing model is reported and skipped.
    summaries = []
    for name, weight in MODEL_WEIGHTS.items():
        print(f"\n--- Running {name} ---")
        try:
            s = run_model_full_video(name, weight, INPUT_VIDEO, common_idx)
            summaries.append(s)
            for line in (
                f"OK: {s['output_video']}",
                f"Snapshot: {s['selected_frame_path']} (idx={common_idx}, det={s['selected_frame_det_count']})",
                f"FPS={s['throughput_fps']:.2f}, total={s['total_detections']}, avg/frame={s['avg_detections_per_frame']:.2f}",
            ):
                print(line)
        except Exception as e:
            print(f"[WARN] {name} failed: {e}")

    if not summaries:
        print("No successful runs.")
    else:
        # Step 3: persist the raw numbers.
        df = pd.DataFrame(summaries)
        csv_path = os.path.join(OUTPUT_DIR, "comparison_table.csv")
        df.to_csv(csv_path, index=False)
        print("\nSaved table:", csv_path)

        # Step 4: console table, with the two float columns rounded to 2 decimals.
        df_view = df.copy()
        for float_col in ("throughput_fps", "avg_detections_per_frame"):
            df_view[float_col] = df_view[float_col].map(lambda v: float(f"{v:.2f}"))
        shown_cols = [
            "model", "throughput_fps", "total_detections", "avg_detections_per_frame",
            "selected_frame_index", "selected_frame_det_count", "selected_frame_path",
        ]
        print("\nComparison:")
        print(df_view[shown_cols].to_string(index=False))

        # Step 5: charts. Each tuple is (column, y label, title, file name).
        bar_charts = [
            ("throughput_fps", "FPS", "Model Throughput (FPS)", "chart_throughput_fps.png"),
            ("total_detections", "Count", "Total Detections (whole video)", "chart_total_detections.png"),
            ("avg_detections_per_frame", "Avg / frame", "Average Detections per Frame", "chart_avg_detections_per_frame.png"),
            (
                "selected_frame_det_count", "Count",
                f"Detections on Shared Frame (idx={int(df['selected_frame_index'].iloc[0])})",
                "chart_shared_frame_detections.png",
            ),
        ]
        for chart_col, chart_ylabel, chart_title, chart_file in bar_charts:
            save_bar(df, chart_col, chart_ylabel, chart_title, chart_file)
        save_scatter_fps_vs_avgdet(df, "chart_fps_vs_avgdet_scatter.png")
        save_grouped_fps_avgdet(df, "chart_grouped_fps_avgdet.png")


[Common frame] index = 103

--- Running yolov8n ---
OK: outputs\yolov8n_annotated.avi
Snapshot: outputs\yolov8n_selected_frame_sameidx_103.jpg (idx=103, det=8)
FPS=59.95, total=2050, avg/frame=6.21

--- Running yolo11n ---
OK: outputs\yolo11n_annotated.avi
Snapshot: outputs\yolo11n_selected_frame_sameidx_103.jpg (idx=103, det=8)
FPS=57.31, total=2665, avg/frame=8.08

--- Running yolo12n ---
OK: outputs\yolo12n_annotated.avi
Snapshot: outputs\yolo12n_selected_frame_sameidx_103.jpg (idx=103, det=9)
FPS=46.16, total=2942, avg/frame=8.92

Saved table: outputs\comparison_table.csv

Comparison:
  model  throughput_fps  total_detections  avg_detections_per_frame  selected_frame_index  selected_frame_det_count                            selected_frame_path
yolov8n           59.95              2050                      6.21                   103                         8 outputs\yolov8n_selected_frame_sameidx_103.jpg
yolo11n           57.31              2665                      8.08           