In [None]:
# Method 1: reference official tutorial (https://github.com/ultralytics/notebooks/blob/main/notebooks/how-to-use-ultralytics-yolo-with-openai-for-number-plate-recognition.ipynb)
# using ultralytics pretrained car plate detection model
!pip install ultralytics opencv-python-headless

import base64

import cv2, json, math
from pathlib import Path
import ultralytics
from ultralytics import YOLO
from ultralytics.utils.downloads import safe_download
from ultralytics.utils.plotting import Annotator, colors

# Run Ultralytics' built-in environment check before doing any work
# (presumably reports installed versions / available hardware — see Ultralytics docs).
ultralytics.checks()

In [None]:
# Demo assets published by Ultralytics: a sample video and a sample plate-detection model.
SAMPLE_VIDEO_URL = "https://github.com/ultralytics/assets/releases/download/v0.0.0/anpr-demo-video.mp4"
SAMPLE_MODEL_URL = "https://github.com/ultralytics/assets/releases/download/v0.0.0/anpr-demo-model.pt"

# Fetch the video first, then the model (same order as before).
safe_download(SAMPLE_VIDEO_URL)
safe_download(SAMPLE_MODEL_URL)

In [None]:
# ---- Configuration (edit these) ----
video_path = str(Path("~/videos/gun_car.mp4").expanduser())
plate_model_path = "runs/detect/train4/weights/best.pt"  # put downloaded or self-trained model here (e.g., 'anpr-demo-model.pt', `runs/detect/train4/weights/best.pt`)
save_annotated = True            # Ultralytics will write annotated video to runs/track/...
device = 0                       # CUDA device index; set to "cpu" if no GPU is available

# Get fps for timestamp math. try/finally guarantees the capture handle is
# released even when the video cannot be opened.
cap = cv2.VideoCapture(video_path)
try:
    assert cap.isOpened(), f"Cannot open {video_path}"
    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0  # fall back to 30.0 when OpenCV reports 0
finally:
    cap.release()

# Load model
model = YOLO(plate_model_path)

# Run tracking and stream results to Python (persist=True keeps track IDs)
gen = model.track(
    source=video_path,
    tracker="bytetrack.yaml",
    imgsz=960,
    conf=0.4,
    iou=0.6,
    device=device,
    stream=True,
    persist=True,
    save=save_annotated,  # writes annotated video under runs/track/exp*/...
)

# Sidecar layout: {"fps": <float>, "frames": [{"frame", "ts_ms", "objects"}, ...]}
# with exactly one entry per frame yielded by the tracker (empty "objects" when
# nothing was detected), so frame indices stay aligned with the video.
sidecar = {"fps": fps, "frames": []}

for frame_idx, res in enumerate(gen):  # res is ultralytics.engine.results.Results
    boxes = res.boxes
    objs = []

    if boxes is not None and len(boxes) > 0:
        # XYWH, CLS, CONF, ID (IDs exist after tracker initializes; first few frames may be None)
        xywh = boxes.xywh.cpu().numpy()           # (N, 4) in pixels
        clss = boxes.cls.cpu().numpy()            # (N,)
        conf = boxes.conf.cpu().numpy()           # (N,)
        ids = boxes.id.cpu().numpy() if boxes.id is not None else [None] * len(xywh)

        for (x, y, w, h), c, p, tid in zip(xywh, clss, conf, ids):
            objs.append({
                "id": None if tid is None else int(tid),
                "cls": int(c),               # numeric class id
                "conf": float(p),
                "bbox_xywh": [float(x), float(y), float(w), float(h)],
            })

    # Single place where the per-frame record is built (previously duplicated
    # for the empty and non-empty cases).
    sidecar["frames"].append({
        "frame": frame_idx,
        "ts_ms": int(1000 * frame_idx / fps),
        "objects": objs,
    })

# Save sidecar JSON
out_json = "sidecar_tracks.json"
with open(out_json, "w") as f:
    json.dump(sidecar, f, indent=2)
out_json

In [None]:
# Method 2: use a pretrained Roboflow Universe model by calling the Roboflow hosted inference API
# https://universe.roboflow.com/roboflow-universe-projects/license-plate-recognition-rxg4e


In [None]:
%%bash
# Install a separate conda env (and Jupyter kernel) used to call the Roboflow inference API.
# NOTE: the %%bash cell magic must be the very first line of the cell; IPython
# rejects it when anything (even a comment) precedes it.
set -euxo pipefail

# Load conda into this shell
source ~/miniconda/etc/profile.d/conda.sh 2>/dev/null || \
eval "$(/home/ec2-user/miniconda/bin/conda shell.bash hook)"

# Make a 3.11 env using only conda-forge (bypasses ToS prompts)
conda create --prefix ~/envs/rf311 -c conda-forge --override-channels python=3.11 -y

# Install your deps into that env
conda run -p ~/envs/rf311 python -m pip install -U pip wheel setuptools
conda run -p ~/envs/rf311 python -m pip install inference-sdk supervision opencv-python-headless ipykernel

# Register this env as a Jupyter kernel
conda run -p ~/envs/rf311 python -m ipykernel install --user --name rf311 --display-name "Python 3.11 (rf311)"

#Now switch your notebook to that kernel:
#Kernel → Change kernel → “Python 3.11 (rf311)”


In [None]:
import os, time, json, cv2, numpy as np
from pathlib import Path
import supervision as sv
from inference_sdk import InferenceHTTPClient

# ---- Client ----
# The Roboflow API key is read from the environment so it never ends up in the
# notebook or in version control. Set it before starting Jupyter, e.g.
#   export ROBOFLOW_API_KEY="<your key>"
# NOTE: any key that was previously hard-coded in this notebook should be
# treated as leaked and rotated in the Roboflow dashboard.
ROBOFLOW_API_KEY = os.environ.get("ROBOFLOW_API_KEY")
if not ROBOFLOW_API_KEY:
    raise RuntimeError("Set the ROBOFLOW_API_KEY environment variable before running this cell.")

CLIENT = InferenceHTTPClient(
    api_url="https://serverless.roboflow.com",
    api_key=ROBOFLOW_API_KEY,
)
# Model id (with trailing version number) passed to CLIENT.infer for every frame.
MODEL_ID = "license-plate-recognition-rxg4e/11"

# ---- Video ----
# The capture is left open here; the frame loop further down reads from it and releases it.
video_path = str(Path("~/videos/anpr-demo-video.mp4").expanduser())
cap = cv2.VideoCapture(video_path)
assert cap.isOpened(), f"Cannot open {video_path}"
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0  # fall back to 30.0 when OpenCV reports 0

# Throttling: only every STRIDE-th frame is sent to the serverless endpoint.
# STRIDE = 1 sends every frame; raise it (e.g. max(1, round(fps / 3)) for ~3 FPS)
# to avoid hammering serverless.
STRIDE = 1
# Rate at which the tracker actually receives frames. Derived from STRIDE so the
# tracker settings cannot drift out of sync when STRIDE is changed.
proc_fps = fps / STRIDE

tracker = sv.ByteTrack(
    track_activation_threshold=0.15,     # was track_thresh
    minimum_matching_threshold=0.8,      # was match_thresh
    lost_track_buffer=int(proc_fps * 2), # was track_buffer; keep tracks ~2s
    frame_rate=proc_fps,                 # use processed FPS, not raw fps
)

CONF_MIN = 0.15  # keep low, can raise later

def rf_to_detections(rf_json):
    """Convert a Roboflow inference response into an ``sv.Detections`` object.

    Predictions whose ``confidence`` is below ``CONF_MIN`` (or missing) are
    dropped. Roboflow reports boxes as center x/y plus width/height in pixels;
    they are converted to corner format (x1, y1, x2, y2).

    Args:
        rf_json: Response dict; its ``"predictions"`` list is read if present.

    Returns:
        ``sv.Detections`` with ``xyxy``, ``confidence`` and ``class_id`` arrays,
        or ``sv.Detections.empty()`` when no prediction survives the filter.
    """
    kept = []
    for pred in rf_json.get("predictions", []):
        if pred.get("confidence", 0) >= CONF_MIN:
            kept.append(pred)
    if len(kept) == 0:
        return sv.Detections.empty()

    corners, scores, class_ids = [], [], []
    for pred in kept:
        # center/size -> top-left and bottom-right corners
        corners.append([
            pred["x"] - pred["width"]/2, pred["y"] - pred["height"]/2,
            pred["x"] + pred["width"]/2, pred["y"] + pred["height"]/2,
        ])
        scores.append(pred["confidence"])
        # Use class 0 when the class id is absent or not an integer.
        raw_class_id = pred.get("class_id", 0)
        if isinstance(raw_class_id, (int, np.integer)):
            class_ids.append(raw_class_id)
        else:
            class_ids.append(0)

    return sv.Detections(
        xyxy=np.array(corners, dtype=float),
        confidence=np.array(scores, dtype=float),
        class_id=np.array(class_ids, dtype=int),
    )

# Counter starts at -1 because the frame loop increments it before use,
# so the first frame read gets index 0.
frame_idx = -1
# Accumulator for per-frame results, written to JSON after the loop.
sidecar = dict(fps=fps, frames=[])

def empty_entry(i):
    """Build the sidecar record for frame ``i`` with no objects.

    Used for skipped or failed frames so the sidecar keeps one entry per frame.
    The timestamp is derived from the module-level ``fps``.
    """
    timestamp_ms = int(1000 * i / fps)
    return dict(frame=i, ts_ms=timestamp_ms, objects=[])

def infer_with_retry(frame, tries=3, base_sleep=0.5):
    """Run inference on one frame, retrying failures with exponential backoff.

    Args:
        frame: Image passed straight through to ``CLIENT.infer``.
        tries: Maximum number of attempts; must be at least 1.
        base_sleep: Seconds to sleep after the first failure; the delay doubles
            after each further failure.

    Returns:
        Whatever ``CLIENT.infer`` returns for ``MODEL_ID``.

    Raises:
        ValueError: If ``tries`` is less than 1 (previously this silently
            returned ``None``, which only failed later in the caller).
        Exception: The error raised by the final attempt, re-raised unchanged.
    """
    if tries < 1:
        raise ValueError(f"tries must be >= 1, got {tries}")
    for attempt in range(tries):
        try:
            return CLIENT.infer(frame, model_id=MODEL_ID)
        except Exception:
            # Out of attempts: surface the last error to the caller.
            if attempt == tries - 1:
                raise
            time.sleep(base_sleep * (2 ** attempt))

# Frame loop: read every frame, send every STRIDE-th one to Roboflow, feed the
# detections to the tracker and record one sidecar entry per frame.
# try/finally guarantees the capture is released even if the loop raises or is
# interrupted part-way through a long run.
try:
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        frame_idx += 1

        # throttle: only send every Nth frame; still append empty for others
        if frame_idx % STRIDE != 0:
            sidecar["frames"].append(empty_entry(frame_idx))
            continue

        try:
            rf_json = infer_with_retry(frame)
        except Exception as e:
            # transient server error — record empty and continue
            print(f"[warn] frame {frame_idx}: {e}")
            sidecar["frames"].append(empty_entry(frame_idx))
            continue

        det = tracker.update_with_detections(rf_to_detections(rf_json))

        objs = []
        for i in range(len(det)):
            # Corner format -> center x/y + width/height for the sidecar.
            x1, y1, x2, y2 = det.xyxy[i]
            w = float(x2 - x1); h = float(y2 - y1)
            cx = float((x1 + x2) / 2.0); cy = float((y1 + y2) / 2.0)
            cls = int(det.class_id[i]) if det.class_id is not None else 0
            conf = float(det.confidence[i]) if det.confidence is not None else 0.0
            tid = (int(det.tracker_id[i]) if det.tracker_id is not None and det.tracker_id[i] is not None else None)
            objs.append({"id": tid, "cls": cls, "conf": conf, "bbox_xywh": [cx, cy, w, h]})

        # Reuse empty_entry so the frame/timestamp layout is defined in one place.
        entry = empty_entry(frame_idx)
        entry["objects"] = objs
        sidecar["frames"].append(entry)
finally:
    cap.release()

with open("sidecar_tracks.json", "w") as f:
    json.dump(sidecar, f, indent=2)
print("Wrote sidecar_tracks.json")
