In [1]:
!pip install ultralytics

Collecting ultralytics
  Downloading ultralytics-8.3.130-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.14-py3-none-any.whl.metadata (9.4 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch>=1.8.0->ultralytics)
  Downloading n

In [13]:
# needed libraries
import cv2
import numpy as np
from ultralytics import YOLO
import os
from collections import defaultdict
import random

In [14]:
# download the model
model = YOLO('yolov8x-pose.pt')

In [15]:
# function of drawing the routes
def draw_routes(frame, routes, colors):
    for person_id, points in routes.items():
        color = colors[person_id % len(colors)]
        for i in range(1, len(points)):
            cv2.line(frame, points[i-1], points[i], color, 2)
    return frame

In [16]:
# function to make distinct colors
def get_distinct_colors(n):
    distinct_colors = [
        (255, 0, 0),
        (0, 255, 0),
        (0, 0, 255),
        (255, 255, 0),
        (255, 0, 255),
        (0, 255, 255),
        (128, 0, 0),
        (0, 128, 0),
        (0, 0, 128),
        (128, 128, 0)
    ]
    if n > len(distinct_colors):
        for i in range(len(distinct_colors), n):
            color = (random.randint(0, 255), random.randint(0, 255), random.randint(0, 255))
            distinct_colors.append(color)
    return distinct_colors

In [17]:
def predict_next_position(positions, velocities):

    # Return last known position if not enough history
    if len(positions) < 2:
        return positions[-1] if positions else None

    # Calculate average velocity from last 3 frames if available
    if len(velocities) >= 3:
        recent_vx = sum(v[0] for v in velocities[-3:]) / 3
        recent_vy = sum(v[1] for v in velocities[-3:]) / 3
    else:

        # Use most recent velocity if less than 3 frames of data
        recent_vx, recent_vy = velocities[-1]

    # Calculate predicted position using last position + velocity
    last_x, last_y = positions[-1]
    return (int(last_x + recent_vx), int(last_y + recent_vy))

In [18]:
def track_and_draw(input_path, output_path, track_multiple=False, conf_threshold=0.7):

    # Initialize video capture and writer
    cap = cv2.VideoCapture(input_path)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    # Set up video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

    # Initialize tracking variables
    MAX_PERSONS = 3 # maximum number of persons in the video
    colors = get_distinct_colors(MAX_PERSONS) # Get unique colors for each person
    color_map = {} # Maps person ID to color
    routes = defaultdict(list) # Stores movement paths
    positions = defaultdict(list) # Stores position history
    velocities = defaultdict(list) # Stores velocity history
    lost_frames = defaultdict(int) # Counts frames where person is lost
    initial_ids = set() # Stores IDs of initially detected people

    MAX_LOST_FRAMES = 45 # to keep tracking lost person
    frame_count = 0
    active_tracks = set()

    while cap.isOpened():
        ret, frame = cap.read() # Read a frame from video
        if not ret: # End of video
            break

        frame_count += 1
        current_tracks = set() # Store currently detected people IDs

        # Run YOLO model
        results = model.track(frame,
                            persist=True, # Maintain tracking between frames
                            classes=0,  # Only detect persons (class 0)
                            conf=conf_threshold, # Minimum detection confidence
                            iou=0.7, # Intersection over Union threshold
                            max_det=MAX_PERSONS) # Maximum detections per frame

        # Process detection results
        if results[0].boxes.id is not None:
            # Extract detection information
            boxes = results[0].boxes.xywh.cpu() # bounding box
            track_ids = results[0].boxes.id.cpu().numpy().astype(int) #unique IDs
            confidences = results[0].boxes.conf.cpu().numpy() # Detection confidences

            # Sort detections by confidence (highest first)
            sorted_indices = np.argsort(confidences)[::-1]
            if len(sorted_indices) > MAX_PERSONS:
                sorted_indices = sorted_indices[:MAX_PERSONS]

            # Initialize tracking IDs for new people
            if len(initial_ids) < MAX_PERSONS:
                for idx in sorted_indices:
                    if len(initial_ids) < MAX_PERSONS and track_ids[idx] not in initial_ids:
                        initial_ids.add(track_ids[idx])

            # Process each detected person
            for idx in sorted_indices:
                box = boxes[idx]
                track_id = track_ids[idx]
                conf = confidences[idx]

                # Handle ID reassignment for lost tracks
                if track_id not in initial_ids and len(initial_ids) == MAX_PERSONS:
                    unused_ids = initial_ids - current_tracks
                    if unused_ids:
                        x, y, w, h = map(int, box)
                        min_dist = float('inf')
                        best_id = None

                        # Find closest matching previous track
                        for unused_id in unused_ids:
                            if positions[unused_id]:
                                prev_x, prev_y = positions[unused_id][-1]
                                dist = np.sqrt((x - prev_x)**2 + (y - prev_y)**2)
                                if dist < min_dist:
                                    min_dist = dist
                                    best_id = unused_id

                        if best_id is not None:
                            track_id = best_id

                # Update position tracking
                x, y, w, h = map(int, box)
                current_tracks.add(track_id)

                # Store position history (limited to 30 frames)
                positions[track_id].append((x, y))
                if len(positions[track_id]) > 30:
                    positions[track_id].pop(0)


                # Smooth position using 3-frame moving average
                if len(positions[track_id]) >= 3:
                    x = int(sum(p[0] for p in positions[track_id][-3:]) / 3)
                    y = int(sum(p[1] for p in positions[track_id][-3:]) / 3)

                # Assign colors to tracks
                if track_id not in color_map and track_id in initial_ids:
                    color_map[track_id] = colors[len(color_map)]
                color = color_map.get(track_id, colors[0])

                # Draw bounding box
                cv2.rectangle(frame,
                            (x-int(w/2), y-int(h/2)),
                            (x+int(w/2), y+int(h/2)),
                            color, 2)

                # Add person label with confidence score
                label = f"Person {track_id} ({conf:.2f})"
                cv2.putText(frame, label,
                           (x-int(w/2), y-int(h/2)-10),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,0,0), 3)
                cv2.putText(frame, label,
                           (x-int(w/2), y-int(h/2)-10),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

                # Update tracking data
                routes[track_id].append((x, y))
                lost_frames[track_id] = 0

        # Handle lost tracks
        lost_tracks = active_tracks - current_tracks
        for track_id in lost_tracks:
            if track_id in initial_ids:
                lost_frames[track_id] += 1

                # Predict position for temporarily lost tracks
                if lost_frames[track_id] <= MAX_LOST_FRAMES:
                    if track_id in velocities and velocities[track_id]:
                        predicted_pos = predict_next_position(positions[track_id], velocities[track_id])
                        if predicted_pos:
                            x, y = predicted_pos
                            color = color_map.get(track_id, colors[0])
                            cv2.circle(frame, (x, y), 4, color, -1)
                            routes[track_id].append((x, y))

        # Draw movement paths
        for track_id, points in routes.items():
            if track_id in initial_ids and len(points) > 1:
                color = color_map.get(track_id, colors[0])
                for i in range(1, len(points)):
                    cv2.line(frame, points[i-1], points[i], color, 2)

        # Add frame info overlay
        cv2.putText(frame,
                    f"Frame: {frame_count} | Active Tracks: {len(current_tracks & initial_ids)}",
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255,255,255), 2)

        # Add person status overlay
        y_offset = 60
        for track_id in sorted(initial_ids):
            if track_id in color_map:
                status = "Active" if track_id in current_tracks else f"Lost ({lost_frames[track_id]})"
                cv2.putText(frame, f"Person {track_id} - {status}",
                           (10, y_offset),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.5, color_map[track_id], 2)
                y_offset += 20

        # Write frame to output video
        out.write(frame)
        active_tracks = current_tracks

    cap.release()
    out.release()

In [19]:
track_and_draw('/content/20250510_161515.mp4', 'multi_person_trackingfinal.mp4', track_multiple=True)

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
0: 384x640 3 persons, 33.6ms
Speed: 2.8ms preprocess, 33.6ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 persons, 31.3ms
Speed: 2.7ms preprocess, 31.3ms inference, 2.1ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 persons, 31.3ms
Speed: 6.1ms preprocess, 31.3ms inference, 2.0ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 persons, 31.3ms
Speed: 4.2ms preprocess, 31.3ms inference, 2.1ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 persons, 30.7ms
Speed: 2.3ms preprocess, 30.7ms inference, 2.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 persons, 31.6ms
Speed: 2.3ms preprocess, 31.6ms inference, 2.1ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 persons, 32.8ms
Speed: 2.2ms preprocess, 32.8ms inference, 2.1ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 3 persons, 32.1ms
Speed: 2.4ms prep

In [20]:
# download the video
from google.colab import files
files.download('/content/multi_person_trackingfinal.mp4')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>