# Install necessary packages

##1-YOLO from Ultralytics
##2- Deep SORT for tracking

In [None]:
!pip install -q ultralytics
!pip install -q deep_sort_realtime

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m30.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m39.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m54.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m39.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m211.5/211.5 MB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.3/56.3 MB[0m [31m22.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

# Import necessary libraries


In [22]:
import warnings
warnings.filterwarnings(
    "ignore",
    message=r".*torch\.cuda\.amp\.autocast.*deprecated.*",
    category=FutureWarning
)

import cv2
import torch
import numpy as np
from ultralytics import YOLO
import math


# Define input and output video paths


In [18]:

# Paths to input and output videos
input_video_path = '/content/input.mp4'
output_video_path = 'output_video.mp4'

# Set detection and tracking parameters

In [19]:
confidence_threshold = 0.3
max_people = 3
distance_threshold = 50  # max distance to consider same person

# colors
available_colors = [
    (255, 0, 0),     # Blue
    (0, 255, 0),     # Green
    (0, 0, 255),     # Red
]


# Initialize YOLOv8 model

In [20]:
model = YOLO('yolov8x.pt')
model.fuse()
model.conf = confidence_threshold


YOLOv8x summary (fused): 112 layers, 68,200,608 parameters, 0 gradients, 257.8 GFLOPs


# Open the input video and prepare the output video writer

In [21]:
# open input video
cap = cv2.VideoCapture(input_video_path)
if not cap.isOpened():
    raise IOError(f"Couldn't open video file: {input_video_path}")
# get frame rate of input video
fps = cap.get(cv2.CAP_PROP_FPS)
# get width and height of video frames
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

out = cv2.VideoWriter(output_video_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (width, height))

# Prepare lists for tracking people and assigning IDs

In [None]:
assigned_ids = {}
paths = {}
yolo_to_custom_id = {}

# Define helper functions for distance calculation and ID matching

In [23]:
# Helper function

def euclidean_distance(p1, p2):
    return math.sqrt((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2)

# find the nearest available stable ID to the current center point (cx, cy)
# used to maintain consistent tracking IDs across frames
def get_nearest_assigned_id(cx, cy):
    min_dist = float('inf')
    best_id = None
    # iterate on currently assigned IDs and their positions
    for aid, pos in assigned_ids.items():
        dist = euclidean_distance((cx, cy), pos)
        # check if distance within threshold and ID is not already taken in this frame
        if dist < distance_threshold and aid not in yolo_to_custom_id.values():
          # if this the closest match so update best_id
            if dist < min_dist:
                min_dist = dist
                best_id = aid
    return best_id

# video processing and tracking

In [24]:
while True:
    ret, frame = cap.read()
    if not ret:
        break
    # perform tracking on the current frame
    results = model.track(source=frame, persist=True, verbose=False)[0]

    frame_seen_ids = set()  # track which stable IDs visible in this frame

    if results and results.boxes is not None:
        for box in results.boxes:
            class_id = int(box.cls[0])
            if class_id != 0:
                continue  # only persons

            x1, y1, x2, y2 = map(int, box.xyxy[0])
            # get YOLO's internal track ID
            track_id = int(box.id[0]) if box.id is not None else -1
            if track_id == -1:
                continue
            # Compute bottom center point of bounding box
            cx, cy = (x1 + x2) // 2, y2

            # If this YOLO track id already mapped, use it
            if track_id in yolo_to_custom_id:
                stable_id = yolo_to_custom_id[track_id]
                assigned_ids[stable_id] = (cx, cy)  # update position
            else:
                # try to match to previous stable IDs
                stable_id = get_nearest_assigned_id(cx, cy)

                if stable_id is not None:
                    yolo_to_custom_id[track_id] = stable_id
                    assigned_ids[stable_id] = (cx, cy)
                elif len(assigned_ids) < max_people:
                    new_id = len(assigned_ids)
                    assigned_ids[new_id] = (cx, cy)
                    yolo_to_custom_id[track_id] = new_id
                    stable_id = new_id
                else:
                    continue
            # mark this stable ID as seen in this frame
            frame_seen_ids.add(stable_id)
            # select color based on stable ID
            color = available_colors[stable_id]

            paths.setdefault(stable_id, []).append((cx, cy))
            # draw bounding box and label on frame
            cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
            cv2.putText(frame, f'Person {stable_id}', (x1, y1 - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
    # remove stale YOLO track IDs disappeared in the current frame
    current_track_ids = [int(box.id[0]) for box in results.boxes] if results.boxes is not None else []
    yolo_to_custom_id = {tid: sid for tid, sid in yolo_to_custom_id.items() if tid in current_track_ids}

    # draw motion paths for each tracked person
    for pid, pts in paths.items():
        if pid not in assigned_ids:
            continue
        color = available_colors[pid]
        for i in range(1, len(pts)):
            cv2.line(frame, pts[i - 1], pts[i], color, 3)

    out.write(frame)


cap.release()
out.release()
print(f"Tracked video saved to '{output_video_path}'")


Tracked video saved to 'output_video.mp4'
