In [None]:
!pip install ultralytics supervision numpy opencv-python-headless
!pip install --upgrade supervision ultralytics



In [None]:
import cv2
import numpy as np
import supervision as sv
from ultralytics import YOLO
import os


# Paths and URLs used throughout the notebook.
VIDEO_URL = "https://media.roboflow.com/supervision/video-examples/people-walking.mp4"
VIDEO_FILE = "people_walking.mp4"
OUTPUT_VIDEO_FILE = "output_people_flow_final.mp4"
HEATMAP_FILE = "final_heatmap.png"

# Download the video
print(f"\nDownloading {VIDEO_FILE}...")
# `wget -O` creates the target file before any data arrives, so a failed
# download can still leave an empty file behind. Checking only for existence
# would then report success; the exit status and file size are checked too.
wget_status = os.system(f'wget -q "{VIDEO_URL}" -O "{VIDEO_FILE}"')
download_ok = (
    wget_status == 0
    and os.path.exists(VIDEO_FILE)
    and os.path.getsize(VIDEO_FILE) > 0
)
if not download_ok:
    # Remove the partial/empty file so later cells cannot mistake it for a video.
    if os.path.exists(VIDEO_FILE):
        os.remove(VIDEO_FILE)
    raise FileNotFoundError("Download failed.")
print("Download complete.")



Downloading people_walking.mp4...
Download complete.


In [None]:

# Detector and tracker shared by the frame-processing code below.
MODEL = YOLO("yolov8n.pt")
TRACKER = sv.ByteTrack()

# Use `sv.BoundingBoxAnnotator` when this supervision build exposes it,
# otherwise fall back to `sv.BoxAnnotator`.
if hasattr(sv, "BoundingBoxAnnotator"):
    BOX_ANNOTATOR = sv.BoundingBoxAnnotator(thickness=2)
else:
    BOX_ANNOTATOR = sv.BoxAnnotator(thickness=2)
LABEL_ANNOTATOR = sv.LabelAnnotator(text_thickness=2, text_scale=1)

# Heatmap state: ALL_CENTERS collects [x, y] centre points while frames are
# processed; HEATMAP_ANNOTATOR renders them once processing has finished.
ALL_CENTERS = list()
HEATMAP_ANNOTATOR = sv.HeatMapAnnotator(kernel_size=25, opacity=0.4)
# --- 3. Counting lines ---
# Read the frame size from the video instead of assuming 1280x720.
# NOTE(review): supervision's LineZone ignores detections whose anchors lie
# outside the extent of the line, so a line shorter than the frame would miss
# people crossing on the uncovered side — confirm against the installed version.
_video_info = sv.VideoInfo.from_video_path(VIDEO_FILE)
FRAME_WIDTH = _video_info.width
FRAME_HEIGHT = _video_info.height

# Vertical positions (in pixels) of the two horizontal counting lines.
LINE_1_Y = 250
LINE_2_Y = 550

# Each line spans the full frame width, stored as a (start, end) pair of points.
LINE_1 = (sv.Point(x=0, y=LINE_1_Y), sv.Point(x=FRAME_WIDTH, y=LINE_1_Y))
LINE_2 = (sv.Point(x=0, y=LINE_2_Y), sv.Point(x=FRAME_WIDTH, y=LINE_2_Y))

# Valid Position enum anchors (corners) for LineZone
anchors_all = [
    sv.Position.TOP_LEFT,
    sv.Position.TOP_RIGHT,
    sv.Position.BOTTOM_LEFT,
    sv.Position.BOTTOM_RIGHT
]

# line_zone_in supplies the "IN" counter (its in_count is displayed);
# line_zone_out supplies the "OUT" counter (its out_count is displayed).
line_zone_in = sv.LineZone(
    start=LINE_1[0],
    end=LINE_1[1],
    triggering_anchors=anchors_all
)
line_zone_out = sv.LineZone(
    start=LINE_2[0],
    end=LINE_2[1],
    triggering_anchors=anchors_all
)

print("Line Zones Initialized.")

# --- 4. Helper: wrap centre points as detections ---
def centers_to_detections(centers_xy: np.ndarray, eps: float = 1.0) -> sv.Detections:
    """Wrap centre points in small square boxes and return them as detections.

    Args:
        centers_xy: Array indexed as ``[:, 0]`` for x and ``[:, 1]`` for y.
            An array with no elements yields detections with zero boxes.
        eps: Half the side length of the box built around each centre.

    Returns:
        ``sv.Detections`` whose ``xyxy`` rows are
        ``[x - eps, y - eps, x + eps, y + eps]`` as floats.
    """
    if centers_xy.size == 0:
        boxes = np.zeros((0, 4), dtype=float)
    else:
        cx = centers_xy[:, 0]
        cy = centers_xy[:, 1]
        corners = (cx - eps, cy - eps, cx + eps, cy + eps)
        boxes = np.stack(corners, axis=1).astype(float)
    return sv.Detections(xyxy=boxes)


Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8n.pt to 'yolov8n.pt': 100% ━━━━━━━━━━━━ 6.2MB 111.7MB/s 0.1s
Line Zones Initialized.


In [None]:

# --- 5. Per-frame pipeline: detect, track, count, annotate ---
_PERSON_CLASS_ID = 0  # COCO class index for 'person'
_COUNTER_TEXT_SCALE = 0.5
_COUNTER_TEXT_THICKNESS = 1
_COUNTER_LEFT_MARGIN = 20  # pixels between the left frame edge and the counter text


def _draw_counter(scene: np.ndarray, text: str, center_y: int, background_color) -> np.ndarray:
    """Draw a counter label near the left edge of ``scene`` and return the scene.

    NOTE(review): this assumes ``sv.draw_text`` centres the text on
    ``text_anchor`` (so an anchor at x=10 would clip the label at the frame
    edge) — confirm against the installed supervision version. The anchor is
    shifted right by half of the rendered text width to keep the label visible.
    """
    (text_width, _), _ = cv2.getTextSize(
        text, cv2.FONT_HERSHEY_SIMPLEX, _COUNTER_TEXT_SCALE, _COUNTER_TEXT_THICKNESS
    )
    anchor = sv.Point(x=_COUNTER_LEFT_MARGIN + text_width // 2, y=center_y)
    return sv.draw_text(
        scene=scene,
        text=text,
        text_anchor=anchor,
        text_scale=_COUNTER_TEXT_SCALE,
        text_thickness=_COUNTER_TEXT_THICKNESS,
        background_color=background_color,
    )


def process_frame(frame: np.ndarray, model: YOLO) -> np.ndarray:
    """Detect, track and count people in one frame; return an annotated copy.

    Side effects: updates ``TRACKER``, triggers ``line_zone_in`` and
    ``line_zone_out``, and appends the centre of every tracked detection to
    ``ALL_CENTERS``.

    Args:
        frame: Image passed to ``model``; drawing is done on a copy of it.
        model: Detector called as ``model(frame, verbose=False)``; the first
            result is converted with ``sv.Detections.from_ultralytics``.

    Returns:
        A copy of ``frame`` with boxes, ``#<tracker id>`` labels, both counting
        lines and the IN / OUT counters drawn on it.
    """
    # 1. Detection
    results = model(frame, verbose=False)[0]
    detections = sv.Detections.from_ultralytics(results)

    # Keep only 'person' detections; an empty result needs no filtering.
    if len(detections) > 0:
        detections = detections[detections.class_id == _PERSON_CLASS_ID]

    # 2. Tracking (public API)
    detections = TRACKER.update_with_detections(detections)

    # 3. Counting
    line_zone_in.trigger(detections=detections)
    line_zone_out.trigger(detections=detections)

    # 4. Heatmap centers via anchors (ALL_CENTERS is mutated in place, so no
    #    `global` statement is required).
    if len(detections) > 0:
        centers = detections.get_anchors_coordinates(anchor=sv.Position.CENTER)
        ALL_CENTERS.extend(centers.tolist())

    # 5. Draw boxes and labels. When no tracker ids are available, pass
    #    labels=None so the annotator picks its own labels instead of receiving
    #    a list whose length does not match the detections.
    tracker_ids = getattr(detections, "tracker_id", None)
    if tracker_ids is None:
        labels = None
    else:
        labels = [f"#{int(tid)}" for tid in np.asarray(tracker_ids).tolist()]

    annotated_frame = BOX_ANNOTATOR.annotate(scene=frame.copy(), detections=detections)
    annotated_frame = LABEL_ANNOTATOR.annotate(scene=annotated_frame, detections=detections, labels=labels)

    # 6. Draw lines and counters
    annotated_frame = sv.draw_line(scene=annotated_frame, start=LINE_1[0], end=LINE_1[1], color=sv.Color.BLUE, thickness=2)
    annotated_frame = _draw_counter(annotated_frame, f"IN: {line_zone_in.in_count}", LINE_1_Y - 30, sv.Color.BLUE)

    annotated_frame = sv.draw_line(scene=annotated_frame, start=LINE_2[0], end=LINE_2[1], color=sv.Color.RED, thickness=2)
    annotated_frame = _draw_counter(annotated_frame, f"OUT: {line_zone_out.out_count}", LINE_2_Y + 10, sv.Color.RED)

    return annotated_frame

# --- 6. Run the pipeline over the whole video ---
print(f"\nStarting video processing. Output: {OUTPUT_VIDEO_FILE}")
cap = cv2.VideoCapture(VIDEO_FILE)
if not cap.isOpened():
    raise FileNotFoundError(f"Could not open video file: {VIDEO_FILE}")

out = None
frame_count = 0
try:
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    # Fall back to 25 fps when the reported rate is missing, NaN or <= 1.
    if not fps or fps <= 1 or np.isnan(fps):
        fps = 25.0

    # (height, width, channels); used later as the heatmap canvas shape.
    frame_shape = (frame_height, frame_width, 3)

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(OUTPUT_VIDEO_FILE, fourcc, float(fps), (frame_width, frame_height))
    # Fail loudly if the writer could not be opened rather than continuing
    # with a writer that is not open.
    if not out.isOpened():
        raise RuntimeError(f"Could not open video writer for: {OUTPUT_VIDEO_FILE}")

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        processed_frame = process_frame(frame, MODEL)
        out.write(processed_frame)

        frame_count += 1
        if frame_count % 300 == 0:
            print(f"Processed {frame_count} frames...")
finally:
    # Always release the capture and writer, even if a frame fails mid-run,
    # so the handles are not leaked and the output file is closed.
    cap.release()
    if out is not None:
        out.release()
print(f"Video processing complete. Saved to {OUTPUT_VIDEO_FILE}")

# --- 7. Final heatmap and summary ---
def generate_final_heatmap(shape: tuple):
    """Render all collected centre points as a heatmap image and print totals.

    Reads ``ALL_CENTERS``; when it is empty, prints a notice and returns
    without writing a file or printing the totals. Otherwise the heatmap is
    drawn on a black canvas, written to ``HEATMAP_FILE``, and the IN / OUT
    totals of the two line zones are printed.

    Args:
        shape: Shape of the canvas passed to ``np.zeros``; the caller passes
            ``(height, width, 3)``.

    Raises:
        OSError: If ``cv2.imwrite`` reports that the image was not written.
    """
    if len(ALL_CENTERS) == 0:
        print("No movement data collected. Skipping heatmap.")
        return

    background = np.zeros(shape, dtype=np.uint8)
    all_centers_np = np.array(ALL_CENTERS, dtype=float)

    # HeatMapAnnotator.annotate takes (scene, detections), so the centre points
    # are wrapped in tiny boxes first. The previous call used a misspelled
    # keyword and only reached this form through a swallowed TypeError.
    # NOTE(review): every centre is passed in one annotate call — confirm that
    # the annotator accumulates overlapping points within a single call,
    # otherwise the heat intensity will be uniform.
    dummy_detections = centers_to_detections(all_centers_np, eps=1.0)
    heatmap_image = HEATMAP_ANNOTATOR.annotate(scene=background, detections=dummy_detections)

    # cv2.imwrite returns False on failure instead of raising.
    if not cv2.imwrite(HEATMAP_FILE, heatmap_image):
        raise OSError(f"Could not write heatmap image: {HEATMAP_FILE}")
    print(f"\nFinal Heatmap saved as {HEATMAP_FILE}")
    print(f"Total IN count: {line_zone_in.in_count}")
    print(f"Total OUT count: {line_zone_out.out_count}")

generate_final_heatmap(frame_shape)



Starting video processing. Output: output_people_flow_final.mp4
Processed 300 frames...
Video processing complete. Saved to output_people_flow_final.mp4

Final Heatmap saved as final_heatmap.png
Total IN count: 6
Total OUT count: 6
