In [None]:
!pip install --upgrade ultralytics



In [4]:
# Dashcam Dangerous Event Analyzer
# --------------------------------
# Runnable in Google Colab. Uses Ultralytics YOLO11 for object detection + tracking,
# a simple speed estimation approach inspired by:
# https://docs.ultralytics.com/guides/speed-estimation/
# and ego-speed estimation from global optical flow.
#
# HOW TO USE IN COLAB:
# 1. Upload your dashcam video as "video.mp4" to the working directory, or change VIDEO_PATH below.
# 2. Run this cell.
# 3. Outputs:
#       - Annotated video: ./output_annotated.mp4
#       - Dangerous events JSON: ./dangerous_events.json
#       - Time profiling JSON: ./time_profile.json
#       - Spatial heatmap PNG: ./heatmap.png

# =========================
# CONFIGURABLE CONSTANTS
# =========================

VIDEO_PATH = "./video.mp4"      # default video location
OUTPUT_DIR = "./"               # default output directory

OUTPUT_VIDEO_FILENAME = "output_annotated.mp4"
DANGEROUS_EVENTS_JSON = "dangerous_events.json"
TIME_PROFILE_JSON = "time_profile.json"
HEATMAP_IMAGE = "heatmap.png"

# Ultralytics YOLO model (detection + tracking)
# (Make sure weights exist in Ultralytics; "yolo11x.pt" is the large YOLO11 detect model)
YOLO_MODEL_NAME = "yolo11x.pt"

# Run YOLO only on every N-th frame for speed; intermediate frames use last detections
ANALYZE_EVERY_N_FRAMES = 3

# Detection / tracking thresholds
CONF_THRESHOLD = 0.2
IOU_THRESHOLD = 0.1

# Speed estimation configuration (approximate!)
# Meter-per-pixel strongly depends on camera calibration and setup.
METERS_PER_PIXEL = 0.05 * 0.5  # tweak for your dashcam

# Ego vehicle speed estimation (optical-flow based)
EGO_SPEED_SMOOTHING = 0.5          # EMA smoothing factor [0..1]; closer to 1 = more smoothing
EGO_SPEED_MAX_KMH = 200.0          # clamp unrealistic ego speeds
# ROI used for optical flow (normalized coords: x1, y1, x2, y2)
# Adjust to roughly cover the road region in your dashcam.
EGO_FLOW_ROI_NORMALIZED = (0.3, 0.1, 0.7, 0.8)

# Dangerous event configuration
REL_SPEED_THRESHOLD_KMH = 10.0      # relative speed threshold to trigger dangerous event
DANGER_COOLDOWN_SECONDS = 3.0       # minimum gap between two dangerous events
DANGER_OVERLAY_DURATION_SECONDS = 2.0  # how long to display "DANGER" overlay

MIN_EVENT_CONFIDENCE = 0.4          # minimum detection confidence to consider for danger

# Time profiling: number of seconds per bin
PROFILE_WINDOW_SECONDS = 5.0

# Heatmap grid (coarse occupancy grid, not per-pixel)
HEATMAP_GRID_W = 40
HEATMAP_GRID_H = 24

# Timestamp overlay configuration (DD-MM-YYYY hh:mm:ss)
VIDEO_START_DATETIME_STR = "20-11-2025 8:00:00"   # starting timestamp
TIMESTAMP_FORMAT = "%d-%m-%Y %H:%M:%S"

# Alarm area polygon in NORMALIZED coordinates (relative to image width/height)
# This is a trapezoid roughly in front of the car, center of the frame.
# You can tune these values for your specific camera.
ALARM_POLYGON_NORMALIZED = [
    (0.43, 1.0),  # bottom-left
    (0.63, 1.0),  # bottom-right
    (0.47, 0.5),  # upper-right
    (0.59, 0.5),  # upper-left
]

# Classes considered "pedestrian" or "vehicle" (COCO indexes)
PEDESTRIAN_CLASS_IDS = {0}  # person
VEHICLE_CLASS_IDS = {1, 2, 3, 5, 6, 7}  # bicycle, car, motorcycle, bus, train, truck

# Whether to show OpenCV window (off for Colab)
SHOW_PREVIEW = False

# =========================
# INSTALL DEPENDENCIES
# =========================

import sys
import subprocess

def install_if_missing(pkg: str):
    try:
        __import__(pkg.split("[")[0].split(">=")[0])
    except ImportError:
        subprocess.check_call([sys.executable, "-m", "pip", "install", pkg, "-q"])

# ultralytics & OpenCV are needed
install_if_missing("ultralytics>=8.3.0")
install_if_missing("opencv-python")

# =========================
# IMPORTS
# =========================

import os
import json
import math
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, List, Tuple, Optional

import cv2
import numpy as np
import torch
from ultralytics import YOLO

# =========================
# DATA STRUCTURES
# =========================

@dataclass
class DetectionRecord:
    track_id: Optional[int]
    cls_id: int
    obj_type: str  # 'pedestrian', 'vehicle', or 'other'
    conf: float
    bbox_xyxy: Tuple[float, float, float, float]
    center_xy: Tuple[float, float]
    speed_kmh: Optional[float]
    rel_speed_kmh: Optional[float]
    is_in_alarm_area: bool
    is_dangerous: bool


# =========================
# HELPERS
# =========================

def get_device() -> str:
    """Return 'cuda' if available, otherwise 'cpu'."""
    if torch.cuda.is_available():
        return "cuda"
    return "cpu"


def build_alarm_polygon(image_width: int, image_height: int) -> np.ndarray:
    """Convert normalized polygon coordinates into pixel coordinates."""
    pts = []
    for x_norm, y_norm in ALARM_POLYGON_NORMALIZED:
        x = int(x_norm * image_width)
        y = int(y_norm * image_height)
        pts.append((x, y))
    polygon = np.array(pts, dtype=np.int32)
    return polygon


def map_class_to_type(cls_id: int) -> str:
    """Map COCO class ID to our simplified types."""
    if cls_id in PEDESTRIAN_CLASS_IDS:
        return "pedestrian"
    if cls_id in VEHICLE_CLASS_IDS:
        return "vehicle"
    return "other"


def point_inside_polygon(point: Tuple[float, float], polygon: np.ndarray) -> bool:
    """Return True if point is inside or on the polygon."""
    x, y = point
    res = cv2.pointPolygonTest(polygon, (float(x), float(y)), False)
    return res >= 0


def compute_speed_kmh(
    prev_center: Tuple[float, float],
    prev_frame_idx: int,
    cur_center: Tuple[float, float],
    cur_frame_idx: int,
    fps: float,
    meters_per_pixel: float,
) -> Optional[float]:
    """Estimate object speed in km/h from motion between two frames.

    Follows the principle described in Ultralytics' speed estimation guide:
    speed ~ (pixel_distance * meter_per_pixel) / delta_time * 3.6
    """
    dt_frames = cur_frame_idx - prev_frame_idx
    if dt_frames <= 0 or fps <= 0:
        return None
    dt_seconds = dt_frames / fps
    dx = cur_center[0] - prev_center[0]
    dy = cur_center[1] - prev_center[1]
    pixel_dist = math.hypot(dx, dy)
    meters = pixel_dist * meters_per_pixel
    speed_m_per_s = meters / dt_seconds
    speed_kmh = speed_m_per_s * 3.6
    # Filter out unrealistically high speeds (likely tracking glitches)
    if not math.isfinite(speed_kmh) or speed_kmh < 0 or speed_kmh > 3000:
        return None
    return speed_kmh


def estimate_ego_speed_kmh(
    prev_gray: Optional[np.ndarray],
    cur_gray: np.ndarray,
    fps: float,
    meters_per_pixel: float,
    roi_norm: Optional[Tuple[float, float, float, float]] = None,
) -> Optional[float]:
    """
    Estimate ego-vehicle speed from global optical flow between two frames.
    Uses dense Farneback flow and returns speed in km/h (very approximate).
    """
    if prev_gray is None or cur_gray is None or fps <= 0:
        return None

    # Optional ROI focusing on the road area
    if roi_norm is not None:
        h, w = prev_gray.shape[:2]
        x1 = int(roi_norm[0] * w)
        y1 = int(roi_norm[1] * h)
        x2 = int(roi_norm[2] * w)
        y2 = int(roi_norm[3] * h)
        prev = prev_gray[y1:y2, x1:x2]
        cur = cur_gray[y1:y2, x1:x2]
    else:
        prev = prev_gray
        cur = cur_gray

    if prev.size == 0 or cur.size == 0:
        return None

    # Dense optical flow
    flow = cv2.calcOpticalFlowFarneback(
        prev, cur, None,
        0.5,   # pyr_scale
        3,     # levels
        15,    # winsize
        3,     # iterations
        5,     # poly_n
        1.2,   # poly_sigma
        0      # flags
    )

    mag, ang = cv2.cartToPolar(flow[..., 0], flow[..., 1])

    mag_flat = mag.reshape(-1)
    if mag_flat.size == 0:
        return None

    # Robust central value of motion (median)
    px_per_frame = float(np.median(mag_flat))
    px_per_s = px_per_frame * fps
    speed_m_s = px_per_s * meters_per_pixel
    speed_kmh = speed_m_s * 3.6

    if not math.isfinite(speed_kmh) or speed_kmh < 0:
        return None

    # Clamp unrealistic optical-flow spikes
    if speed_kmh > EGO_SPEED_MAX_KMH:
        speed_kmh = EGO_SPEED_MAX_KMH

    # Very small values are likely noise
    if speed_kmh < 1.0:
        return 0.0

    return speed_kmh


def draw_timestamp_and_speed(
    frame: np.ndarray,
    timestamp_str: str,
    ego_speed_kmh: float,
) -> None:
    """Draw timestamp and vehicle speed in the top-right corner."""
    h, w = frame.shape[:2]
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.6
    thickness = 2

    text_ts = timestamp_str
    text_speed = f"Speed: {ego_speed_kmh:.1f} km/h"

    # Compute text widths
    (ts_w, ts_h), _ = cv2.getTextSize(text_ts, font, font_scale, thickness)
    (sp_w, sp_h), _ = cv2.getTextSize(text_speed, font, font_scale, thickness)

    margin = 10
    x_ts = w - ts_w - margin
    y_ts = margin + ts_h

    x_sp = w - sp_w - margin
    y_sp = y_ts + sp_h + 5

    # Background rectangles
    pad = 5
    cv2.rectangle(
        frame,
        (min(x_ts, x_sp) - pad, margin),
        (w - margin + pad, y_sp + pad),
        (0, 0, 0),
        thickness=-1,
    )

    # Timestamp
    cv2.putText(
        frame,
        text_ts,
        (x_ts, y_ts),
        font,
        font_scale,
        (255, 255, 255),
        thickness,
        cv2.LINE_AA,
    )

    # Speed
    cv2.putText(
        frame,
        text_speed,
        (x_sp, y_sp),
        font,
        font_scale,
        (0, 255, 255),
        thickness,
        cv2.LINE_AA,
    )


def draw_alarm_polygon(frame: np.ndarray, polygon: np.ndarray, highlight: bool = False) -> None:
    """Draw the alarm area polygon on the frame."""
    color = (0, 255, 255) if not highlight else (0, 0, 255)  # yellow vs red
    cv2.polylines(frame, [polygon], isClosed=True, color=color, thickness=2)


def draw_detections(
    frame: np.ndarray,
    detections: List[DetectionRecord],
    class_names: Dict[int, str],
) -> None:
    """Draw detection bounding boxes, labels, and speed on the frame."""
    for det in detections:
        x1, y1, x2, y2 = map(int, det.bbox_xyxy)
        color = (0, 255, 0)  # default green
        if det.obj_type == "pedestrian":
            color = (255, 255, 0)  # cyan-like
        if det.is_dangerous:
            color = (0, 0, 255)  # red for danger

        # Draw bounding box
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

        # Label text
        name = class_names.get(det.cls_id, str(det.cls_id))
        label_bits = [name]
        if det.track_id is not None:
            label_bits.append(f"#{det.track_id}")
        if det.speed_kmh is not None:
            label_bits.append(f"{det.speed_kmh:.1f} km/h")
        label = " ".join(label_bits)

        font = cv2.FONT_HERSHEY_SIMPLEX
        font_scale = 0.5
        thickness = 1
        (text_w, text_h), _ = cv2.getTextSize(label, font, font_scale, thickness)
        cv2.rectangle(
            frame,
            (x1, y1 - text_h - 4),
            (x1 + text_w + 4, y1),
            (0, 0, 0),
            thickness=-1,
        )
        cv2.putText(
            frame,
            label,
            (x1 + 2, y1 - 4),
            font,
            font_scale,
            (255, 255, 255),
            thickness,
            cv2.LINE_AA,
        )


def draw_danger_overlay(frame: np.ndarray) -> None:
    """Draw a big 'DANGEROUS EVENT' overlay in the middle/top of the frame."""
    h, w = frame.shape[:2]
    text = "DANGEROUS EVENT"
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 1.2
    thickness = 3
    (text_w, text_h), _ = cv2.getTextSize(text, font, font_scale, thickness)
    x = (w - text_w) // 2
    y = int(0.15 * h)

    cv2.rectangle(
        frame,
        (x - 10, y - text_h - 10),
        (x + text_w + 10, y + 10),
        (0, 0, 255),
        thickness=-1,
    )
    cv2.putText(
        frame,
        text,
        (x, y),
        font,
        font_scale,
        (255, 255, 255),
        thickness,
        cv2.LINE_AA,
    )


# =========================
# MAIN ANALYSIS PIPELINE
# =========================

def main():
    # Prepare paths
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    output_video_path = os.path.join(OUTPUT_DIR, OUTPUT_VIDEO_FILENAME)
    events_json_path = os.path.join(OUTPUT_DIR, DANGEROUS_EVENTS_JSON)
    time_profile_path = os.path.join(OUTPUT_DIR, TIME_PROFILE_JSON)
    heatmap_path = os.path.join(OUTPUT_DIR, HEATMAP_IMAGE)

    # Parse start datetime
    try:
        start_datetime = datetime.strptime(VIDEO_START_DATETIME_STR, TIMESTAMP_FORMAT)
    except ValueError:
        print(f"[WARN] Could not parse VIDEO_START_DATETIME_STR '{VIDEO_START_DATETIME_STR}'.")
        print("       Falling back to current time as start.")
        start_datetime = datetime.now()

    # Open video
    cap = cv2.VideoCapture(VIDEO_PATH)
    if not cap.isOpened():
        raise RuntimeError(f"Could not open video: {VIDEO_PATH}")

    frame_w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS) or 0.0
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    if fps <= 0:
        print("[WARN] Video FPS not detected; assuming 30 FPS.")
        fps = 30.0

    print(f"[INFO] Video: {VIDEO_PATH}")
    print(f"       Resolution: {frame_w}x{frame_h}, FPS: {fps:.2f}, Frames: {total_frames}")

    # Prepare writer
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    writer = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_w, frame_h))

    # Build alarm polygon
    alarm_polygon_px = build_alarm_polygon(frame_w, frame_h)

    # Load YOLO model
    device = get_device()
    print(f"[INFO] Using device: {device}")
    model = YOLO(YOLO_MODEL_NAME)
    # (model.to(device) is usually not required, but harmless)
    try:
        model.to(device)
    except Exception:
        # Some Ultralytics versions don't expose .to on YOLO wrapper; rely on device=... instead
        pass

    class_names = model.model.names if hasattr(model, "model") else model.names

    # Tracking state for speed estimation
    # track_id -> (last_frame_idx, (cx, cy))
    last_track_positions: Dict[int, Tuple[int, Tuple[float, float]]] = {}

    # Statistics
    dangerous_events: List[dict] = []
    next_event_id = 1
    last_event_time_sec: Optional[float] = None
    danger_overlay_until_sec: float = -1.0

    # Time profiling: window_index -> detection count
    time_profile_counts: Dict[int, int] = defaultdict(int)

    # Heatmap accumulator
    heatmap = np.zeros((HEATMAP_GRID_H, HEATMAP_GRID_W), dtype=np.float32)

    # Object counts (per-frame counts – not unique track-based)
    total_pedestrians = 0
    total_vehicles = 0

    # Cache latest detections for intermediate frames
    cached_detections: List[DetectionRecord] = []

    # Ego-speed estimation state
    prev_gray: Optional[np.ndarray] = None
    ego_speed_kmh: float = 0.0

    frame_idx = 0
    while True:
        success, frame = cap.read()
        if not success:
            break

        # --- Ego speed from optical flow ---
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        estimated_ego_speed = estimate_ego_speed_kmh(
            prev_gray,
            gray,
            fps,
            METERS_PER_PIXEL,
            roi_norm=EGO_FLOW_ROI_NORMALIZED,
        )
        if estimated_ego_speed is not None:
            # Exponential moving average smoothing
            ego_speed_kmh = (
                EGO_SPEED_SMOOTHING * ego_speed_kmh
                + (1.0 - EGO_SPEED_SMOOTHING) * estimated_ego_speed
            )
        prev_gray = gray
        # --- end ego speed ---

        current_time_sec = frame_idx / fps
        timestamp_str = (start_datetime + timedelta(seconds=current_time_sec)).strftime(TIMESTAMP_FORMAT)

        is_analysis_frame = (frame_idx % ANALYZE_EVERY_N_FRAMES == 0)

        # ========================
        # RUN VISION MODEL PERIODICALLY
        # ========================
        if is_analysis_frame:
            # YOLO tracking call (persist=True keeps track IDs across frames)
            results = model.track(
                frame,
                conf=CONF_THRESHOLD,
                iou=IOU_THRESHOLD,
                device=device,
                persist=True,
                verbose=False,
            )
            result = results[0]

            new_detections: List[DetectionRecord] = []

            if result.boxes is not None and len(result.boxes) > 0:
                boxes_xyxy = result.boxes.xyxy.cpu().numpy()
                cls_ids = result.boxes.cls.int().cpu().numpy()
                confs = result.boxes.conf.cpu().numpy()
                if result.boxes.id is not None:
                    track_ids = result.boxes.id.int().cpu().tolist()
                else:
                    track_ids = [None] * len(boxes_xyxy)

                for bbox, cls_id, conf, tid in zip(boxes_xyxy, cls_ids, confs, track_ids):
                    if conf < CONF_THRESHOLD:
                        continue

                    x1, y1, x2, y2 = bbox
                    cx = 0.5 * (x1 + x2)
                    cy = 0.5 * (y1 + y2)
                    center = (cx, cy)

                    obj_type = map_class_to_type(cls_id)

                    # Update counts (simple per-frame counts)
                    if obj_type == "pedestrian":
                        total_pedestrians += 1
                    elif obj_type == "vehicle":
                        total_vehicles += 1

                    # Speed estimation using previous track position
                    speed_kmh = None
                    if tid is not None:
                        if tid in last_track_positions:
                            prev_frame_idx, prev_center = last_track_positions[tid]
                            speed_kmh = compute_speed_kmh(
                                prev_center,
                                prev_frame_idx,
                                center,
                                frame_idx,
                                fps,
                                METERS_PER_PIXEL,
                            )
                        # Update last position
                        last_track_positions[tid] = (frame_idx, center)

                    # Relative speed vs ego vehicle (estimated from optical flow)
                    rel_speed_kmh = None
                    if speed_kmh is not None:
                        rel_speed_kmh = abs(speed_kmh - ego_speed_kmh)

                    # Check if inside alarm area
                    is_in_alarm_area = point_inside_polygon(center, alarm_polygon_px)

                    # Dangerous event?
                    is_dangerous = False
                    if (
                        obj_type in {"pedestrian", "vehicle"}
                        and is_in_alarm_area
                        and rel_speed_kmh is not None
                        and rel_speed_kmh >= REL_SPEED_THRESHOLD_KMH
                        and conf >= MIN_EVENT_CONFIDENCE
                    ):
                        # Enforce cooldown
                        if last_event_time_sec is None or (
                            current_time_sec - last_event_time_sec >= DANGER_COOLDOWN_SECONDS
                        ):
                            is_dangerous = True
                            # Log event
                            event_timestamp = (start_datetime + timedelta(seconds=current_time_sec)).strftime(
                                TIMESTAMP_FORMAT
                            )
                            events_entry = {
                                "id": next_event_id,
                                "timestamp": event_timestamp,
                                "type": obj_type,
                                "confidence": float(conf),
                                "ego_speed_kmh": float(ego_speed_kmh),
                                "object_speed_kmh": float(speed_kmh) if speed_kmh is not None else None,
                                "relative_speed_kmh": float(rel_speed_kmh),
                            }
                            dangerous_events.append(events_entry)
                            print(f"[EVENT] Dangerous event #{next_event_id} @ {event_timestamp} ({obj_type})")
                            next_event_id += 1
                            last_event_time_sec = current_time_sec
                            danger_overlay_until_sec = current_time_sec + DANGER_OVERLAY_DURATION_SECONDS

                    det_rec = DetectionRecord(
                        track_id=tid,
                        cls_id=int(cls_id),
                        obj_type=obj_type,
                        conf=float(conf),
                        bbox_xyxy=(float(x1), float(y1), float(x2), float(y2)),
                        center_xy=center,
                        speed_kmh=speed_kmh,
                        rel_speed_kmh=rel_speed_kmh,
                        is_in_alarm_area=is_in_alarm_area,
                        is_dangerous=is_dangerous,
                    )
                    new_detections.append(det_rec)

                # Update heatmap & time profile using new detections
                for det in new_detections:
                    cx, cy = det.center_xy
                    gx = int(np.clip(cx / frame_w * HEATMAP_GRID_W, 0, HEATMAP_GRID_W - 1))
                    gy = int(np.clip(cy / frame_h * HEATMAP_GRID_H, 0, HEATMAP_GRID_H - 1))
                    heatmap[gy, gx] += 1.0

                window_idx = int(current_time_sec / PROFILE_WINDOW_SECONDS)
                time_profile_counts[window_idx] += len(new_detections)

            cached_detections = new_detections

        # ========================
        # OVERLAY VISUALS (for every frame)
        # ========================

        # Copy frame for drawing
        annotated_frame = frame.copy()

        # Draw alarm polygon (highlighted if a danger has just been detected on this frame)
        highlight_polygon = any(det.is_dangerous for det in cached_detections)
        draw_alarm_polygon(annotated_frame, alarm_polygon_px, highlight=highlight_polygon)

        # Draw cached detections (latest analyzed frame)
        draw_detections(annotated_frame, cached_detections, class_names)

        # Timestamp and ego speed (estimated)
        draw_timestamp_and_speed(annotated_frame, timestamp_str, ego_speed_kmh)

        # Danger overlay (if within overlay duration)
        if current_time_sec <= danger_overlay_until_sec:
            draw_danger_overlay(annotated_frame)

        # Write frame
        writer.write(annotated_frame)

        # Optional preview window (typically off in Colab)
        if SHOW_PREVIEW:
            cv2.imshow("Annotated", annotated_frame)
            if cv2.waitKey(1) & 0xFF == ord("q"):
                break

        # Progress print
        if frame_idx % 100 == 0:
            if total_frames > 0:
                pct = 100.0 * frame_idx / total_frames
                print(f"[INFO] Processed {frame_idx}/{total_frames} frames ({pct:.1f}%)")
            else:
                print(f"[INFO] Processed {frame_idx} frames")

        frame_idx += 1

    # Cleanup
    cap.release()
    writer.release()
    if SHOW_PREVIEW:
        cv2.destroyAllWindows()

    # ========================
    # SAVE OUTPUTS
    # ========================

    # Dangerous events JSON
    with open(events_json_path, "w", encoding="utf-8") as f:
        json.dump(dangerous_events, f, indent=2)
    print(f"[INFO] Saved dangerous events JSON -> {events_json_path}")

    # Time profile JSON (objects per time window)
    profile_list = []
    for window_idx, count in sorted(time_profile_counts.items()):
        window_start_sec = window_idx * PROFILE_WINDOW_SECONDS
        window_end_sec = (window_idx + 1) * PROFILE_WINDOW_SECONDS
        window_start_ts = (start_datetime + timedelta(seconds=window_start_sec)).strftime(TIMESTAMP_FORMAT)
        window_end_ts = (start_datetime + timedelta(seconds=window_end_sec)).strftime(TIMESTAMP_FORMAT)
        profile_list.append(
            {
                "window_index": int(window_idx),
                "window_start_timestamp": window_start_ts,
                "window_end_timestamp": window_end_ts,
                "detections": int(count),
                "detections_per_second": float(count / PROFILE_WINDOW_SECONDS),
            }
        )

    with open(time_profile_path, "w", encoding="utf-8") as f:
        json.dump(profile_list, f, indent=2)
    print(f"[INFO] Saved time profile JSON -> {time_profile_path}")

    # Heatmap image
    if heatmap.max() > 0:
        heatmap_norm = (heatmap / heatmap.max() * 255.0).astype(np.uint8)
    else:
        heatmap_norm = heatmap.astype(np.uint8)

    heatmap_resized = cv2.resize(
        heatmap_norm, (frame_w, frame_h), interpolation=cv2.INTER_NEAREST
    )
    heatmap_color = cv2.applyColorMap(heatmap_resized, cv2.COLORMAP_JET)
    cv2.imwrite(heatmap_path, heatmap_color)
    print(f"[INFO] Saved spatial heatmap image -> {heatmap_path}")

    print(f"[INFO] Total pedestrians (frame-level): {total_pedestrians}")
    print(f"[INFO] Total vehicles (frame-level): {total_vehicles}")
    print(f"[INFO] Annotated video saved to -> {output_video_path}")


if __name__ == "__main__":
    main()


[INFO] Video: ./video.mp4
       Resolution: 1920x1080, FPS: 60.04, Frames: 596
[INFO] Using device: cuda
[INFO] Processed 0/596 frames (0.0%)
[INFO] Processed 100/596 frames (16.8%)
[INFO] Processed 200/596 frames (33.6%)
[INFO] Processed 300/596 frames (50.3%)
[INFO] Processed 400/596 frames (67.1%)
[INFO] Processed 500/596 frames (83.9%)
[EVENT] Dangerous event #1 @ 20-11-2025 08:00:08 (pedestrian)
[INFO] Saved dangerous events JSON -> ./dangerous_events.json
[INFO] Saved time profile JSON -> ./time_profile.json
[INFO] Saved spatial heatmap image -> ./heatmap.png
[INFO] Total pedestrians (frame-level): 59
[INFO] Total vehicles (frame-level): 1039
[INFO] Annotated video saved to -> ./output_annotated.mp4
