In [None]:
!pip install ultralytics

Collecting ultralytics
  Downloading ultralytics-8.3.216-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.17-py3-none-any.whl.metadata (14 kB)
Downloading ultralytics-8.3.216-py3-none-any.whl (1.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m11.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.17-py3-none-any.whl (28 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.216 ultralytics-thop-2.0.17


In [None]:
import cv2
import numpy as np
from ultralytics import YOLO
import sys
from IPython.display import Video
from ultralytics.solutions.heatmap import Heatmap

# Settings
VIDEO_PATH = 'https://media.roboflow.com/supervision/video-examples/people-walking.mp4'   # Upload video
MODEL_PATH = 'yolo11n.pt'     # YOLO model

# Annotation coordinates (based on 1127x622 reference frame)
ANNOT_LINE_IN_START = (2, 529)
ANNOT_LINE_IN_END = (1125, 516)
ANNOT_LINE_OUT_START = (0, 570)
ANNOT_LINE_OUT_END = (1125, 557)
ANNOT_W, ANNOT_H = 1127, 622


# Person class in COCO dataset
PERSON_CLASS_ID = [0]

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [None]:
import cv2
import sys
import numpy as np
from ultralytics import YOLO
from collections import defaultdict

def process_video(video_path=VIDEO_PATH, output_path="output.mp4", output_image="last_frame.jpg"):
    try:
        model = YOLO(MODEL_PATH)
    except Exception as e:
        print(f"\n[ERROR] Failed to load YOLO model from '{MODEL_PATH}'.")
        print("Error details:", e)
        sys.exit(1)

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print("Failed to open video file.")
        return

    # Read first frame to get size
    ret, frame = cap.read()
    if not ret:
        print("Failed to read first frame.")
        return
    orig_h, orig_w = frame.shape[:2]

    # Scale annotation coordinates
    def scale_point(pt):
        x, y = pt
        return (int(x * orig_w / ANNOT_W), int(y * orig_h / ANNOT_H))

    LINE_IN_START = scale_point(ANNOT_LINE_IN_START)
    LINE_IN_END = scale_point(ANNOT_LINE_IN_END)
    LINE_OUT_START = scale_point(ANNOT_LINE_OUT_START)
    LINE_OUT_END = scale_point(ANNOT_LINE_OUT_END)

    # Initialize heatmap data
    heatmap_data = np.zeros((orig_h, orig_w), dtype=np.float32)
    decay_factor = 0.99

    # Prepare output writer
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, 20, (960, 540))

    # Reset video to start
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

    in_count, out_count = 0, 0
    already_counted_in, already_counted_out = set(), set()
    track_history = {}

    # Variable to store last processed frame
    last_display_frame = None

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Decay heatmap over time
        heatmap_data *= decay_factor

        results = model.track(frame, persist=True, tracker="botsort.yaml", verbose=False)[0]
        boxes = results.boxes

        if boxes is not None and boxes.xyxy is not None:
            class_ids = boxes.cls.cpu().numpy().astype(int)
            xyxy = boxes.xyxy.cpu().numpy()
            track_ids = boxes.id.cpu().numpy().astype(int) if boxes.id is not None else [None]*len(xyxy)
            vehicle_indices = [i for i, cid in enumerate(class_ids) if cid in PERSON_CLASS_ID]

            # Update heatmap with detections
            for idx in vehicle_indices:
                x1, y1, x2, y2 = map(int, xyxy[idx])
                track_id = track_ids[idx]
                center = ((x1 + x2) // 2, (y1 + y2) // 2)

                # Add heat to heatmap (gaussian-like spread)
                cv2.circle(heatmap_data, center, 30, 1.0, -1)

                if track_id not in track_history:
                    track_history[track_id] = []
                track_history[track_id].append(center)
                if len(track_history[track_id]) > 2:
                    track_history[track_id] = track_history[track_id][-2:]

                if len(track_history[track_id]) == 2:
                    prev_center, curr_center = track_history[track_id]

                    # Line IN
                    if (LINE_IN_START[0] <= prev_center[0] <= LINE_IN_END[0] and
                        LINE_IN_START[0] <= curr_center[0] <= LINE_IN_END[0]):
                        if prev_center[1] < LINE_IN_START[1] <= curr_center[1] and track_id not in already_counted_in:
                            in_count += 1
                            already_counted_in.add(track_id)

                    # Line OUT
                    if (LINE_OUT_START[0] <= prev_center[0] <= LINE_OUT_END[0] and
                        LINE_OUT_START[0] <= curr_center[0] <= LINE_OUT_END[0]):
                        if prev_center[1] > LINE_OUT_START[1] >= curr_center[1] and track_id not in already_counted_out:
                            out_count += 1
                            already_counted_out.add(track_id)

                # Draw bbox + ID
                cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 255), 2)
                cv2.circle(frame, center, 5, (0, 255, 0), -1)
                cv2.putText(frame, f'ID {track_id}', (center[0] - 10, center[1] - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 2)

        # Apply heatmap overlay
        heatmap_normalized = cv2.normalize(heatmap_data, None, 0, 255, cv2.NORM_MINMAX)
        heatmap_colored = cv2.applyColorMap(heatmap_normalized.astype(np.uint8), cv2.COLORMAP_JET)
        frame = cv2.addWeighted(frame, 0.7, heatmap_colored, 0.3, 0)

        # Draw lines
        cv2.line(frame, LINE_IN_START, LINE_IN_END, (255, 0, 0), 2)
        cv2.line(frame, LINE_OUT_START, LINE_OUT_END, (0, 0, 255), 2)

        # Counts
        cv2.putText(frame, f'In: {in_count}', (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 2.2, (0,255,0), 5)
        cv2.putText(frame, f'Out: {out_count}', (10, 120), cv2.FONT_HERSHEY_SIMPLEX, 2.2, (0,0,255), 5)

        display_frame = cv2.resize(frame, (960, 540))
        out.write(display_frame)

        # Store the current frame as the last frame
        last_display_frame = display_frame.copy()

    cap.release()
    out.release()

    # Save the last frame as an image
    if last_display_frame is not None:
        cv2.imwrite(output_image, last_display_frame)
        print(f"Last frame saved to {output_image}")

    print(f"Processing complete. Saved to {output_path}")

In [None]:
process_video(VIDEO_PATH, "output.mp4", "final_frame.png")

[KDownloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolo11n.pt to 'yolo11n.pt': 100% ━━━━━━━━━━━━ 5.4MB 19.7MB/s 0.3s
[31m[1mrequirements:[0m Ultralytics requirement ['lap>=0.5.12'] not found, attempting AutoUpdate...

[31m[1mrequirements:[0m AutoUpdate success ✅ 0.8s

Last frame saved to final_frame.png
Processing complete. Saved to output.mp4
