In [None]:
# ============================================================
# Forbidden Zone Violation (RetinaNet, TorchVision, OpenCV)
# Goal:
#   - Detect persons & vehicles with RetinaNet (COCO).
#   - A 6-point polygon defines a pedestrian-only area (ROI).
#   - Count a violation when a vehicle enters the ROI.
#   - HUD: violation counter (top-left) + "Alert" bottom-center.
#   - Colors are provided directly in BGR (OpenCV order).
# ============================================================

In [1]:
import cv2
import numpy as np
import torch
from pathlib import Path

In [21]:
# --- calibrate_roi_click_6_points.py ---
# Click 6 polygon points on the first video frame, get normalized (0..1) coords to paste.


# Video used for calibration: `file_name` is the stem, ".mp4" is appended below.
file_name = "video_name"
VIDEO_PATH = f"/path_to_your_video/{file_name}.mp4"

# Clicked ROI vertices as (x, y) pixel tuples. Appended to by on_mouse and
# edited by the undo/reset keys in the display loop further down this cell.
pts = []  # list of (x,y) in pixels

def on_mouse(event, x, y, flags, userdata):
    """
    OpenCV mouse callback: store a left-click as the next ROI vertex.

    Only left-button-down events are handled. A click is appended to the
    module-level `pts` list (as an (x, y) pixel tuple) while fewer than 6
    points are stored; the stored point is then echoed to stdout.
    Clicks made after 6 points exist are NOT stored, and a hint is printed
    instead of a misleading "Point 6" line for a point that was discarded.

    `flags` and `userdata` are part of the callback signature and unused here.
    """
    if event != cv2.EVENT_LBUTTONDOWN:
        return
    if len(pts) >= 6:
        # Polygon is already complete; do not pretend the click was recorded.
        print("Already have 6 points; press 'u' to undo or 'r' to reset.")
        return
    pts.append((x, y))
    print(f"Point {len(pts)}: (px) {x}, {y}")

# Only the first frame is needed for calibration, so the capture is released
# as soon as it has been read (also when the read fails).
cap = cv2.VideoCapture(VIDEO_PATH)
try:
    ok, frame = cap.read()
finally:
    cap.release()
if not ok:
    raise RuntimeError("Cannot read first frame.")
H, W = frame.shape[:2]

# Denominators used to normalize pixel coords to 0..1; guarded so a frame that
# is a single pixel wide/high cannot cause a division by zero.
norm_w, norm_h = max(W - 1, 1), max(H - 1, 1)

win = "Click 6 ROI points (clockwise), keys: u=undo, r=reset, s=save, q=quit"
cv2.namedWindow(win, cv2.WINDOW_NORMAL)
cv2.resizeWindow(win, min(1200, W), min(750, H))
cv2.setMouseCallback(win, on_mouse)

# On-screen instructions: (text, origin, font scale, dark outline thickness).
# Each line is drawn twice (dark thick pass, then white pass) for readability.
hud_lines = (
    ("Left-click 6 points around the pedestrian area.", (10, 30), 0.8, 4),
    ("u=undo  r=reset  s=save/print  q=quit", (10, 60), 0.7, 3),
)

try:
    while True:
        vis = frame.copy()

        # draw already clicked points and poly lines
        for i, (x, y) in enumerate(pts):
            cv2.circle(vis, (x, y), 5, (0, 255, 255), -1)
            cv2.putText(vis, str(i+1), (x+6, y-6), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,255), 2, cv2.LINE_AA)
        if len(pts) >= 2:
            # the polygon is only closed once all 6 vertices exist
            cv2.polylines(vis, [np.array(pts, np.int32)], len(pts)==6, (0, 255, 255), 2)

        # instructions
        for text, org, scale, outline in hud_lines:
            cv2.putText(vis, text, org, cv2.FONT_HERSHEY_SIMPLEX, scale, (20,20,20), outline, cv2.LINE_AA)
            cv2.putText(vis, text, org, cv2.FONT_HERSHEY_SIMPLEX, scale, (255,255,255), 2, cv2.LINE_AA)

        cv2.imshow(win, vis)
        k = cv2.waitKey(10) & 0xFF
        if k == ord('u') and pts:
            pts.pop()
        elif k == ord('r'):
            pts.clear()
        elif k == ord('s') and len(pts) == 6:
            # normalize to 0..1 (rounded to 4 decimals) and print a paste-ready snippet
            norm = [(round(x/norm_w, 4), round(y/norm_h, 4)) for (x,y) in pts]
            print("\nPaste this into ROI_NORM_PTS in your main script:")
            print("ROI_NORM_PTS = [")
            for x, y in norm:
                print(f"    ({x}, {y}),")
            print("]\n")
        elif k in [27, ord('q')]:  # Esc or q
            break
finally:
    # Always close the window, even if the loop raises or the kernel is interrupted.
    cv2.destroyAllWindows()


Point 1: (px) 2, 655
Point 2: (px) 2, 435
Point 3: (px) 180, 238
Point 4: (px) 1092, 221
Point 5: (px) 1277, 369
Point 6: (px) 1277, 655

Paste this into ROI_NORM_PTS in your main script:
ROI_NORM_PTS = [
    (0.0016, 0.997),
    (0.0016, 0.6621),
    (0.1407, 0.3623),
    (0.8538, 0.3364),
    (0.9984, 0.5616),
    (0.9984, 0.997),
]


Paste this into ROI_NORM_PTS in your main script:
ROI_NORM_PTS = [
    (0.0016, 0.997),
    (0.0016, 0.6621),
    (0.1407, 0.3623),
    (0.8538, 0.3364),
    (0.9984, 0.5616),
    (0.9984, 0.997),
]


Paste this into ROI_NORM_PTS in your main script:
ROI_NORM_PTS = [
    (0.0016, 0.997),
    (0.0016, 0.6621),
    (0.1407, 0.3623),
    (0.8538, 0.3364),
    (0.9984, 0.5616),
    (0.9984, 0.997),
]


Paste this into ROI_NORM_PTS in your main script:
ROI_NORM_PTS = [
    (0.0016, 0.997),
    (0.0016, 0.6621),
    (0.1407, 0.3623),
    (0.8538, 0.3364),
    (0.9984, 0.5616),
    (0.9984, 0.997),
]


Paste this into ROI_NORM_PTS in your main script:
ROI_NORM

In [51]:
# Settings #

# --- Input / output locations ---
file_name = "video_name"  # video file stem; ".mp4" is appended when building VIDEO_PATH
VIDEO_PATH = f"/path_to_your_video/{file_name}.mp4"  # video that gets processed
SAVE_PATH = "path_to_your_file/forbidden_zone_retinanet.mp4"  # annotated output; None disables saving

# --- Detection / tracking thresholds ---
DET_CONF = 0.5          # detections scoring below this are dropped (good range: 0.40-0.60)
IOU_MATCH_THR = 0.2     # IoU at or above this lets a detection match an existing track
TRACK_MAX_MISS = 12     # a track is pruned once it has been unmatched for more frames than this
CENTER_THR_PX = 160     # bottom-center distance (pixels) at or below which a detection keeps the same ID

# --- ROI polygon: 6 normalized (x, y) vertices in 0..1 ---
ROI_NORM_PTS = [
    (0.0, 0.997),     # bottom left
    (0.0, 0.6621),    # middle left
    (0.1407, 0.3623), # top left
    (0.8538, 0.3364), # top right
    (0.9999, 0.5616), # middle right
    (0.9999, 0.997),  # bottom right
]

# --- Colors, given directly in OpenCV's BGR order ---
ROI_OK_BGR = (255, 51, 51)      # ROI tint while no tracked vehicle is inside
ROI_ALERT_BGR = (0, 0, 255)     # ROI tint while a tracked vehicle is inside
HUD_BG_BGR = (0, 0, 204)        # NOTE(review): not referenced by the drawing code visible in this cell
PERSON_BGR = (255, 255, 204)    # person boxes / labels
VEHICLE_BGR = (153, 255, 153)   # vehicle boxes / labels
ALPHA_FILL = 0.3                # blend weight of the ROI fill overlay

# --- Detector label ids ---
PERSON_ID = 1
VEHICLE_IDS = {
    2,  # bicycle
    3,  # car
    4,  # motorcycle
    6,  # bus
    7,  # train
    8,  # truck
}

# Small helper functions #
def scale_norm_points(norm_pts, W, H):
    """
    Map normalized (x, y) pairs onto integer pixel positions of a W x H frame.

    Each coordinate is clamped to [0, 1], multiplied by the last valid pixel
    index on its axis (W-1 or H-1) and rounded to the nearest integer.
    Returns a list of (x_px, y_px) tuples in the same order as `norm_pts`.
    """
    last_col, last_row = W - 1, H - 1

    def _to_pixel(fraction, last_index):
        # clamp first so out-of-range inputs land on the frame border
        clamped = float(np.clip(fraction, 0, 1))
        return int(round(clamped * last_index))

    return [(_to_pixel(nx, last_col), _to_pixel(ny, last_row)) for nx, ny in norm_pts]

def iou_xyxy(a, b):
    """
    Intersection-over-Union of two axis-aligned boxes given as [x1, y1, x2, y2].

    Returns 0.0 when the boxes do not overlap (touching edges count as no
    overlap); otherwise intersection area divided by union area, with the
    denominator floored at 1e-6.
    """
    left, top = max(a[0], b[0]), max(a[1], b[1])
    right, bottom = min(a[2], b[2]), min(a[3], b[3])

    overlap_w = max(0, right - left)
    overlap_h = max(0, bottom - top)
    overlap = overlap_w * overlap_h
    if overlap <= 0:
        return 0.0

    def _area(box):
        # negative extents (inverted boxes) are treated as zero
        return max(0, box[2] - box[0]) * max(0, box[3] - box[1])

    union = _area(a) + _area(b) - overlap
    return overlap / max(1e-6, union)

def nms_xyxy(boxes, scores, iou_thr=0.60):
    """
    Greedy non-maximum suppression used to drop duplicate vehicle boxes.

    Candidates are visited from highest to lowest score; each kept box removes
    every remaining candidate whose IoU with it is >= `iou_thr`.
    Returns the kept indices (into `boxes`) in descending-score order, or an
    empty list when there are no boxes.
    """
    if len(boxes) == 0:
        return []

    remaining = sorted(range(len(boxes)), key=lambda k: float(scores[k]), reverse=True)
    selected = []
    while remaining:
        best, rest = remaining[0], remaining[1:]
        selected.append(best)
        # survivors are the candidates that overlap the chosen box less than the threshold
        remaining = [k for k in rest if iou_xyxy(boxes[best], boxes[k]) < iou_thr]
    return selected

def make_writer(path, w, h, fps, fourccs=("mp4v", "avc1", "XVID", "MJPG")):
    """
    Create a cv2.VideoWriter, trying several FOURCC codes until one opens.

    Parameters:
        path    -- output file path; None disables writing (returns None).
        w, h    -- frame width and height in pixels.
        fps     -- output frame rate.
        fourccs -- 4-character codec codes to try, in order (defaults to the
                   original fallback list).

    Returns:
        An opened cv2.VideoWriter, or None when `path` is None.

    Raises:
        RuntimeError if none of the codecs could open the output.

    The parent directory of `path` is created if it does not exist.
    """
    if path is None:
        return None
    Path(path).parent.mkdir(parents=True, exist_ok=True)
    for fourcc in fourccs:
        vw = cv2.VideoWriter(str(path), cv2.VideoWriter_fourcc(*fourcc), fps, (w, h))
        if vw.isOpened():
            print(f"[INFO] Using FOURCC '{fourcc}' -> {path}")
            return vw
        # Release the failed writer explicitly before trying the next codec,
        # instead of leaving the handle for the garbage collector.
        vw.release()
    raise RuntimeError("No compatible codec opened; try .avi")

# Load RetinaNet (TorchVision) #
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
try:
    # Newer TorchVision exposes a weights enum for this model.
    from torchvision.models.detection import retinanet_resnet50_fpn, RetinaNet_ResNet50_FPN_Weights
except ImportError:
    # Older TorchVision without the weights enum: use the legacy `pretrained` flag.
    # Only an import failure triggers this path; errors raised while building the
    # model (e.g. a failed weight download) now surface directly instead of being
    # masked by a second attempt through the legacy API.
    from torchvision.models.detection import retinanet_resnet50_fpn
    model = retinanet_resnet50_fpn(pretrained=True)
else:
    model = retinanet_resnet50_fpn(weights=RetinaNet_ResNet50_FPN_Weights.DEFAULT)
model = model.to(device).eval()
# Inference only: disable autograd globally for the rest of the session.
torch.set_grad_enabled(False)

# Video, ROI, writer #
cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    raise RuntimeError(f"Cannot open video: {VIDEO_PATH}")

# Peek at the first frame to learn the frame size. An explicit exception is
# used instead of `assert` so the check still runs under `python -O`, and the
# capture is released before failing.
ok, first = cap.read()
if not ok:
    cap.release()
    raise RuntimeError("Cannot read the first frame.")
H, W = first.shape[:2]

# Fall back to 30 FPS when the container reports no usable rate
# (0, negative or NaN; `fps > 0` is False for NaN).
fps = cap.get(cv2.CAP_PROP_FPS)
if not (fps and fps > 0):
    fps = 30.0

# Rewind so the processing loop starts again from frame 0.
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

ROI_PTS = scale_norm_points(ROI_NORM_PTS, W, H)            # pixel-space polygon vertices
ROI_CNT = np.array(ROI_PTS, np.int32).reshape(-1, 1, 2)    # contour layout used by pointPolygonTest
try:
    writer = make_writer(SAVE_PATH, W, H, fps) if SAVE_PATH else None
except Exception:
    # Do not leave the capture open if the output cannot be created.
    cap.release()
    raise

# Tiny vehicle tracker #
class Track:
    """
    Minimal per-vehicle track record.

    Attributes:
      id          -> identifier given at creation (`tid`)
      box         -> latest box for this vehicle, stored as passed in
      miss        -> counter starting at 0; the main loop increments it for
                     every frame without a match and resets it on a match
      inside_prev -> whether the track was inside the ROI on the previous
                     frame; the main loop uses it to count outside->inside
                     transitions
    """
    def __init__(self, tid, box):
        self.id, self.box = tid, box
        self.miss = 0
        # Starts as "outside", so a vehicle first seen inside the ROI is
        # counted on the frame its track is created.
        self.inside_prev = False

tracks = []      # live vehicle tracks (Track instances)
next_id = 1      # id handed to the next newly created track
violations = 0   # number of outside->inside ROI transitions counted so far

# The ROI polygon is fixed for the whole video, so the drawing array is built
# once here instead of twice per frame inside the loop.
roi_poly = np.array(ROI_PTS, np.int32)

# Main processing loop #
# Wrapped in try/finally so the capture, the writer and the preview window are
# released even when a frame raises or the kernel is interrupted.
try:
    while True:
        ok, frame_bgr = cap.read()
        if not ok:
            break

        #  Detection (RetinaNet) #
        frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
        img_t = torch.from_numpy(frame_rgb).permute(2, 0, 1).float() / 255.0
        out = model([img_t.to(device)])[0]

        boxes  = out["boxes"].detach().to("cpu").numpy()
        labels = out["labels"].detach().to("cpu").numpy()
        scores = out["scores"].detach().to("cpu").numpy()
        keep   = scores >= DET_CONF
        boxes, labels, scores = boxes[keep], labels[keep], scores[keep]

        # split persons / vehicles #
        person_boxes   = [b for b, c in zip(boxes, labels) if c == PERSON_ID]
        veh_boxes_all  = [b for b, c in zip(boxes, labels) if c in VEHICLE_IDS]
        veh_scores_all = [s for s, c in zip(scores, labels) if c in VEHICLE_IDS]

        # extra NMS on vehicles (drops duplicate boxes around the vehicle) #
        keep_idx = nms_xyxy(veh_boxes_all, veh_scores_all, iou_thr=0.60)
        vehicle_boxes = [veh_boxes_all[i] for i in keep_idx]

        # global greedy matching (distance first, IoU fallback) #
        assigned_tracks = set()
        assigned_dets   = set()
        pairs = []

        # Build all track-det pairs with cost (smaller is better) #
        # The reference point of a box is its bottom-center.
        for ti, t in enumerate(tracks):
            tx1, ty1, tx2, ty2 = t.box
            tcx, tcy = 0.5 * (tx1 + tx2), ty2
            for dj, b in enumerate(vehicle_boxes):
                bx1, by1, bx2, by2 = b
                bcx, bcy = 0.5 * (bx1 + bx2), by2
                dist = np.hypot(bcx - tcx, bcy - tcy)
                iou  = iou_xyxy(t.box, b)
                cost = dist + (1.0 - iou) * CENTER_THR_PX  # unify scales
                pairs.append((cost, ti, dj, dist, iou))
        pairs.sort(key=lambda x: x[0])

        # Assign best non-conflicting pairs, with gate: IoU high OR distance close #
        for cost, ti, dj, dist, iou in pairs:
            if ti in assigned_tracks or dj in assigned_dets:
                continue
            if (iou >= IOU_MATCH_THR) or (dist <= CENTER_THR_PX):
                tracks[ti].box = vehicle_boxes[dj].tolist()
                tracks[ti].miss = 0
                assigned_tracks.add(ti)
                assigned_dets.add(dj)

        # Unmatched tracks: age #
        # (done before new tracks are appended, so fresh tracks are not aged)
        for idx, t in enumerate(tracks):
            if idx not in assigned_tracks:
                t.miss += 1

        # Create new tracks for unmatched dets #
        for dj, b in enumerate(vehicle_boxes):
            if dj in assigned_dets:
                continue
            tracks.append(Track(next_id, b.tolist()))
            next_id += 1

        # Prune old tracks #
        tracks = [t for t in tracks if t.miss <= TRACK_MAX_MISS]

        # Counting #
        # A violation is an outside->inside transition of a track's bottom-center.
        any_vehicle_inside = False
        for t in tracks:
            x1, y1, x2, y2 = t.box
            cx, cy = 0.5 * (x1 + x2), y2
            inside_now = cv2.pointPolygonTest(ROI_CNT, (cx, cy), False) >= 0
            if inside_now:
                any_vehicle_inside = True
            if (not t.inside_prev) and inside_now:
                violations += 1
            t.inside_prev = inside_now

        # Draw ROI #
        fill = ROI_ALERT_BGR if any_vehicle_inside else ROI_OK_BGR
        overlay = frame_bgr.copy()
        cv2.fillPoly(overlay, [roi_poly], fill)
        frame_bgr = cv2.addWeighted(overlay, ALPHA_FILL, frame_bgr, 1 - ALPHA_FILL, 0)
        cv2.polylines(frame_bgr, [roi_poly], True, fill, 1)

        # Draw detections #
        for (x1, y1, x2, y2) in person_boxes:
            x1, y1, x2, y2 = map(int, [x1, y1, x2, y2])
            cv2.rectangle(frame_bgr, (x1, y1), (x2, y2), PERSON_BGR, 1)
            cv2.putText(frame_bgr, "person", (x1, max(20, y1 - 6)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, PERSON_BGR, 1, cv2.LINE_AA)

        for t in tracks:
            x1, y1, x2, y2 = map(int, t.box)
            cv2.rectangle(frame_bgr, (x1, y1), (x2, y2), VEHICLE_BGR, 1)
            cv2.putText(frame_bgr, f"vehicle #{t.id}", (x1, max(20, y1 - 6)),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, VEHICLE_BGR, 1, cv2.LINE_AA)

        # HUD #
        text = f"Violations: {violations}"
        (tw, th), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 1.0, 1)
        cv2.rectangle(frame_bgr, (8, 8), (8 + tw + 14, 8 + th + 14), (0, 0, 0), -1)
        cv2.putText(frame_bgr, text, (15, 8 + th + 4),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 255, 255), 1, cv2.LINE_AA)

        if any_vehicle_inside:
            alert = "Alert"
            (aw, ah), _ = cv2.getTextSize(alert, cv2.FONT_HERSHEY_SIMPLEX, 1.2, 1)
            cxm, yb = W // 2, H - 20
            cv2.rectangle(frame_bgr, (cxm - aw // 2 - 12, yb - ah - 12), (cxm + aw // 2 + 12, yb + 8), (0, 0, 0), -1)
            cv2.putText(frame_bgr, alert, (cxm - aw // 2, yb),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 204), 1, cv2.LINE_AA)

        # Show & Save #
        if writer is not None:
            writer.write(frame_bgr)
        cv2.imshow("Forbidden Zone Violation (RetinaNet)", frame_bgr)
        if cv2.waitKey(1) & 0xFF in [27, ord('q')]:  # Esc or q stops early
            break
finally:
    # Cleanup #
    cap.release()
    if writer is not None:
        writer.release()
    cv2.destroyAllWindows()

print(f"[OK] Done. Total violations = {violations}")

[INFO] Using FOURCC 'mp4v' -> outputs/forbidden_zone_retinanet.mp4
[OK] Done. Total violations = 5
