# People Flow Detection using Object Tracking & Heatmap Visualization

In [None]:
# Install the notebook's third-party dependencies via an IPython shell escape.
# NOTE(review): the visible cells only import ultralytics and supervision (plus
# cv2/numpy); confirm whether roboflow, pytesseract, rfdetr and inference are
# still required before trimming this list.
!pip install supervision ultralytics roboflow pytesseract rfdetr inference

In [None]:
import cv2
import numpy as np
from ultralytics import YOLO
import sys
from IPython.display import Video

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------

# Input video, detector weights and where the annotated result is written.
VIDEO_PATH = "https://media.roboflow.com/supervision/video-examples/people-walking.mp4"
MODEL_PATH = "yolo11x.pt"  # alternative checkpoint: "yolov8n.pt"
OUTPUT_PATH = "people_count_output.mp4"

# Per-frame delay in milliseconds; not needed in Colab, kept for consistency.
SLOWDOWN_MS = 50

# Counting lines, expressed in the coordinate system of a reference frame of
# ANNOT_W x ANNOT_H pixels. They are rescaled to the real frame size at runtime.
ANNOT_W = 1920
ANNOT_H = 1080
ANNOT_LINE_IN_START, ANNOT_LINE_IN_END = (5, 380), (1915, 380)
ANNOT_LINE_OUT_START, ANNOT_LINE_OUT_END = (5, 720), (1915, 720)

# Class index of "person" in the COCO label set.
PERSON_CLASS_ID = 0

In [None]:
def _crossed_horizontal_line(prev_center, curr_center, line_start, line_end, downward):
    """Return True if a centroid moved across a horizontal counting line.

    Args:
        prev_center: (x, y) centroid on the previous frame.
        curr_center: (x, y) centroid on the current frame.
        line_start: (x, y) left endpoint of the line.
        line_end: (x, y) right endpoint of the line.
        downward: If True, detect motion from above the line to on/below it;
            if False, detect motion from below the line to on/above it.

    Both centroids must lie within the horizontal extent of the line.
    """
    x_min, x_max = line_start[0], line_end[0]
    if not (x_min <= prev_center[0] <= x_max and x_min <= curr_center[0] <= x_max):
        return False

    line_y = line_start[1]
    if downward:
        return prev_center[1] < line_y <= curr_center[1]
    return prev_center[1] > line_y >= curr_center[1]


def process_video(video_path=VIDEO_PATH, output_path=OUTPUT_PATH):
    """Track people in a video and count crossings of two horizontal lines.

    Each frame is run through the YOLO tracker (BoT-SORT). A tracked person is
    counted "In" the first time their box centre moves downward across the IN
    line, and "Out" the first time it moves upward across the OUT line. The
    annotated frames (lines, boxes, IDs and running counters) are resized to
    960x540 and written to ``output_path``.

    Args:
        video_path: Path or URL of the input video.
        output_path: Destination of the annotated MP4.

    Returns:
        None. Prints a message and returns early if the video cannot be opened
        or read, or if the output writer cannot be opened.

    Raises:
        SystemExit: If the YOLO model at ``MODEL_PATH`` fails to load.
    """
    try:
        model = YOLO(MODEL_PATH)
    except Exception as e:
        print(f"\n[ERROR] Failed to load YOLO model from '{MODEL_PATH}'.")
        print("Error details:", e)
        sys.exit(1)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Failed to open video file.")
        return

    out = None
    try:
        # Read first frame to get size
        ret, frame = cap.read()
        if not ret:
            print("Failed to read first frame.")
            return
        orig_h, orig_w = frame.shape[:2]

        # Scale annotation coordinates from the reference size to this video
        def scale_point(pt):
            x, y = pt
            return (int(x * orig_w / ANNOT_W), int(y * orig_h / ANNOT_H))

        line_in_start = scale_point(ANNOT_LINE_IN_START)
        line_in_end = scale_point(ANNOT_LINE_IN_END)
        line_out_start = scale_point(ANNOT_LINE_OUT_START)
        line_out_end = scale_point(ANNOT_LINE_OUT_END)

        # Prepare output writer
        # NOTE(review): frame rate (20) and size (960x540) are fixed here and
        # do not follow the source video's FPS or aspect ratio.
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        out = cv2.VideoWriter(output_path, fourcc, 20, (960, 540))
        if not out.isOpened():
            print("Failed to open output video writer.")
            return

        # Reset video to start so the first frame is processed as well
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

        in_count, out_count = 0, 0
        already_counted_in, already_counted_out = set(), set()
        last_center = {}  # track id -> centroid seen on that track's previous detection

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            results = model.track(frame, persist=True, tracker="botsort.yaml", verbose=False)[0]
            boxes = results.boxes

            # Draw lines on every frame, including frames without detections
            cv2.line(frame, line_in_start, line_in_end, (255, 0, 0), 2)
            cv2.line(frame, line_out_start, line_out_end, (0, 0, 255), 2)

            if boxes is not None and boxes.xyxy is not None:
                class_ids = boxes.cls.cpu().numpy().astype(int)
                xyxy = boxes.xyxy.cpu().numpy()
                if boxes.id is not None:
                    track_ids = boxes.id.cpu().numpy().astype(int)
                else:
                    track_ids = [None] * len(xyxy)

                for class_id, box, track_id in zip(class_ids, xyxy, track_ids):
                    if class_id != PERSON_CLASS_ID:
                        continue

                    x1, y1, x2, y2 = map(int, box)
                    center = ((x1 + x2) // 2, (y1 + y2) // 2)

                    # Detections without a tracker ID cannot be linked across
                    # frames; they are drawn but never used for counting, so
                    # unrelated people are not compared with one another.
                    if track_id is not None:
                        track_id = int(track_id)
                        prev_center = last_center.get(track_id)
                        last_center[track_id] = center

                        if prev_center is not None:
                            # Line IN (downward crossing)
                            if track_id not in already_counted_in and _crossed_horizontal_line(
                                    prev_center, center, line_in_start, line_in_end, True):
                                in_count += 1
                                already_counted_in.add(track_id)

                            # Line OUT (upward crossing)
                            if track_id not in already_counted_out and _crossed_horizontal_line(
                                    prev_center, center, line_out_start, line_out_end, False):
                                out_count += 1
                                already_counted_out.add(track_id)

                    # Draw bbox + ID
                    cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 255), 2)
                    cv2.circle(frame, center, 5, (0, 255, 0), -1)
                    cv2.putText(frame, f'ID {track_id}', (center[0] - 10, center[1] - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

            # Counts
            cv2.putText(frame, f'In: {in_count}', (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 2.2, (0, 255, 0), 5)
            cv2.putText(frame, f'Out: {out_count}', (10, 120), cv2.FONT_HERSHEY_SIMPLEX, 2.2, (0, 0, 255), 5)

            display_frame = cv2.resize(frame, (960, 540))
            out.write(display_frame)
    finally:
        # Always release the capture/writer, including on early return or error
        cap.release()
        if out is not None:
            out.release()

    print(f"✅ Processing complete. Saved to {output_path}")

# Execute the counting pipeline on the sample video.
process_video(video_path=VIDEO_PATH, output_path="people_count_output.mp4")

In [None]:
import supervision as sv

# Settings
# Rebinds the module-level OUTPUT_PATH that the counting cell defined earlier;
# process_heatmap below uses this new value as its default output_path.
OUTPUT_PATH = 'heatmap_output.mp4'

def process_heatmap(video_path=VIDEO_PATH, output_path=OUTPUT_PATH):
    """Render a heatmap of person detections over a video.

    Every frame is run through the YOLO model, detections are filtered to the
    person class, and a supervision ``HeatMapAnnotator`` overlays the heatmap
    on the frame. Annotated frames are written to ``output_path`` at the
    source frame size.

    Args:
        video_path: Path or URL of the input video.
        output_path: Destination of the heatmap MP4.

    Returns:
        None. Prints a message and returns early if the video cannot be opened
        or read, or if the output writer cannot be opened.
    """
    # Load YOLO model
    model = YOLO(MODEL_PATH)
    cap = cv2.VideoCapture(video_path)

    if not cap.isOpened():
        print("❌ Failed to open video file.")
        return

    out = None
    try:
        # Read first frame to get frame size
        ret, frame = cap.read()
        if not ret:
            print("❌ Failed to read first frame.")
            return

        orig_h, orig_w = frame.shape[:2]

        # Initialize Supervision Heatmap Annotator
        heatmap_annotator = sv.HeatMapAnnotator(
            opacity=0.6,
            radius=40,
            kernel_size=25,
            top_hue=0,       # red (high density)
            low_hue=120      # blue (low density)
        )

        # Prepare output writer
        # NOTE(review): the frame rate is fixed at 20 and does not follow the
        # source video's FPS.
        out = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*"mp4v"), 20, (orig_w, orig_h))
        if not out.isOpened():
            print("❌ Failed to open output video writer.")
            return

        # Reset video to start so the first frame is processed as well
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
        frame_count = 0

        print("🚀 Generating heatmap...")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Detect people
            results = model(frame, verbose=False)[0]
            detections = sv.Detections.from_ultralytics(results)

            # Keep only the 'person' class
            detections = detections[detections.class_id == PERSON_CLASS_ID]

            # Annotate with heatmap
            frame_with_heatmap = heatmap_annotator.annotate(scene=frame.copy(), detections=detections)

            out.write(frame_with_heatmap)
            frame_count += 1

            # Progress logging
            if frame_count % 50 == 0:
                print(f"Processed {frame_count} frames...")
    finally:
        # Always release the capture/writer, including on early return or error
        cap.release()
        if out is not None:
            out.release()

    print(f"✅ Heatmap video saved to {output_path}")

# Run the heatmap pipeline with its default video_path and output_path.
process_heatmap()