# Install Dependencies

In [None]:
!pip install ultralytics

Collecting ultralytics
  Downloading ultralytics-8.3.181-py3-none-any.whl.metadata (37 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.15-py3-none-any.whl.metadata (14 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch>=1.8.0->ultralytics)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch>=1.8.0->ultralytics)
  Downloading nv

# Imports

In [None]:
import os
import subprocess
import time
from base64 import b64encode

import cv2
import numpy as np
from google.colab import drive
from IPython.display import HTML
from scipy.spatial.distance import euclidean
from ultralytics import YOLO
# Mount Google Drive at /content/drive; the input and output video paths used by this notebook live under it.
drive.mount('/content/drive')

Mounted at /content/drive


# YOLO MODEL




In [None]:
# Load the detector from the "yolo12n.pt" weights file; `model.track()` is called on it once per frame.
model = YOLO("yolo12n.pt")

Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolo12n.pt to 'yolo12n.pt': 100%|██████████| 5.34M/5.34M [00:00<00:00, 92.9MB/s]


# Input & Output

In [None]:
# Source video to analyse and destination for the annotated copy (both on the mounted Drive).
video_path = "/content/drive/MyDrive/Object Detection/Input.mp4"
output_path = "/content/drive/MyDrive/Object Detection/detection_output_yolov12.mp4"

In [None]:
# Open the input video and create a writer with matching frame size and rate.
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    # Fail here with a clear message instead of silently processing zero frames later.
    raise FileNotFoundError(f"Could not open input video: {video_path}")

w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

# Keep the frame rate as a float: truncating e.g. 29.97 to 29 changes the
# playback speed/duration of the written video.
fps = cap.get(cv2.CAP_PROP_FPS)
if not fps > 0:  # also catches NaN; some containers do not report a rate
    fps = 30.0

fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter(output_path, fourcc, fps, (w, h))
if not out.isOpened():
    cap.release()
    raise IOError(f"Could not open output video for writing: {output_path}")

# Tracking Parameters

In [None]:
# Minimum detection confidence; passed as `conf` to model.track().
CONFIDENCE_THRESHOLD = 0.55
# Soft cap on the number of consistent IDs: once reached, the loop first tries to
# reuse a nearby or long-unseen ID and only then creates one beyond the cap.
MAX_PERSON_COUNT = 500

In [None]:
# Complete track history (no length limit):
# consistent_id -> list of (x, y) bottom-center points, one appended per frame the ID is seen.
track_history = {}

In [None]:
# ID mapping and management
id_mapping = {}  # Maps tracker-assigned (original) IDs to consistent IDs
last_positions = {}  # Last known bottom-center point of each consistent ID
assigned_ids = set()  # Every consistent ID handed out so far
track_colors = {}  # Drawing color for each consistent ID, generated once and reused

# IoU (Intersection over Union)

In [None]:
def calculate_iou(box1, box2):
    """Return the intersection-over-union of two axis-aligned boxes.

    Args:
        box1: Indexable with at least four numbers ``(x1, y1, x2, y2)``.
        box2: Same layout as ``box1``.

    Returns:
        Intersection area divided by union area, or ``0`` when the union
        area is zero (both boxes are degenerate).
    """
    ax1, ay1, ax2, ay2 = box1[0], box1[1], box1[2], box1[3]
    bx1, by1, bx2, by2 = box2[0], box2[1], box2[2], box2[3]

    # Overlap rectangle; a negative extent means the boxes are disjoint on that axis.
    overlap_w = max(0, min(ax2, bx2) - max(ax1, bx1))
    overlap_h = max(0, min(ay2, by2) - max(ay1, by1))
    overlap = overlap_w * overlap_h

    union = (ax2 - ax1) * (ay2 - ay1) + (bx2 - bx1) * (by2 - by1) - overlap

    # Zero union: nothing to divide by, report no overlap.
    return overlap / union if union != 0 else 0

In [None]:
# Tracker configuration file name, passed as `tracker=` to model.track().
tracker_config = "bytetrack.yaml"

In [None]:
# Number of frames read so far; incremented once per frame in the processing loop.
frame_count = 0
# Per-frame wall-clock seconds, timed from just after a frame is read until after it
# has been annotated and written (so it covers more than the model call alone).
inference_times = []

In [None]:
# Dictionary to store previous frames' detections for ID consistency.
# Layout: {consistent_id: [box, frame_last_seen]}. The processing loop uses it to
# re-identify reappearing people and drops entries not seen for more than 300 frames.
previous_detections = {}

In [None]:
# Boxes and tracker IDs of the previous frame; overwritten at the end of every loop iteration.
# NOTE(review): nothing in the visible notebook reads these — confirm whether they are still needed.
prev_boxes = []
prev_ids = []

# YOLO Detection

In [None]:
# Main processing loop: track people frame by frame, remap tracker IDs to
# stable "consistent" IDs, draw boxes/labels/trails, and write each frame.
# Wrapped in try/finally so the capture and the writer are always released,
# even when a frame raises part-way through.
try:
    while cap.isOpened():
        success, frame = cap.read()
        if not success:
            break

        start_time = time.time()
        frame_count += 1

        # Detection + tracking
        results = model.track(
            source=frame,
            conf=CONFIDENCE_THRESHOLD,
            iou=0.45,
            classes=[0],  # Only 'person'
            persist=True,
            verbose=False,
            stream=False,
            tracker=tracker_config
        )[0]

        # Current detections as plain Python values (index-aligned lists)
        current_boxes = []
        current_ids = []

        if results.boxes.id is not None:
            for box, original_id_tensor in zip(results.boxes.xyxy, results.boxes.id):
                current_boxes.append(box.tolist())
                current_ids.append(int(original_id_tensor.item()))

        # Per-frame bookkeeping:
        #   processed_ids - tracker IDs already handled in this frame
        #   claimed_ids   - consistent IDs already given to a detection in this frame,
        #                   so two detections in one frame never share a consistent ID
        processed_ids = set()
        claimed_ids = set()

        # First, handle detections with existing mappings
        for i, original_id in enumerate(current_ids):
            if original_id not in id_mapping:
                continue

            consistent_id = id_mapping[original_id]
            x1, y1, x2, y2 = map(int, current_boxes[i])

            # Update previous detections
            previous_detections[consistent_id] = [current_boxes[i], frame_count]
            processed_ids.add(original_id)
            claimed_ids.add(consistent_id)

            # Update or create track history
            root_point = (int((x1 + x2) / 2), y2)  # Bottom center
            track_history.setdefault(consistent_id, []).append(root_point)
            last_positions[consistent_id] = root_point

        # Then, handle new detections with no existing mappings
        for i, original_id in enumerate(current_ids):
            if original_id in processed_ids:
                continue

            x1, y1, x2, y2 = map(int, current_boxes[i])
            current_box = current_boxes[i]
            root_point = (int((x1 + x2) / 2), y2)  # Bottom center
            found_match = False

            # Check if this is a reappearance of a previously seen person
            for consistent_id, (prev_box, last_frame) in list(previous_detections.items()):
                # Only consider IDs not yet claimed in this frame and seen within the last 60 frames
                if consistent_id in claimed_ids or frame_count - last_frame >= 60:
                    continue

                # Calculate IoU and center-to-center distance
                iou = calculate_iou(current_box, prev_box)

                prev_center_x = (prev_box[0] + prev_box[2]) / 2
                prev_center_y = (prev_box[1] + prev_box[3]) / 2
                curr_center_x = (current_box[0] + current_box[2]) / 2
                curr_center_y = (current_box[1] + current_box[3]) / 2

                distance = np.sqrt((prev_center_x - curr_center_x)**2 + (prev_center_y - curr_center_y)**2)

                # If high IoU or close distance, this is likely the same person
                if iou > 0.3 or distance < 150:
                    id_mapping[original_id] = consistent_id
                    previous_detections[consistent_id] = [current_box, frame_count]
                    found_match = True

                    # Update track history
                    track_history[consistent_id].append(root_point)
                    last_positions[consistent_id] = root_point
                    processed_ids.add(original_id)
                    claimed_ids.add(consistent_id)
                    break

            if found_match:
                continue

            # No match found: decide which consistent ID this detection gets
            if not assigned_ids or len(assigned_ids) < MAX_PERSON_COUNT:
                # Generate a new consistent ID
                new_id = max(assigned_ids) + 1 if assigned_ids else 1
                assigned_ids.add(new_id)
                id_mapping[original_id] = new_id

                # Initialize track history
                track_history[new_id] = [root_point]
                last_positions[new_id] = root_point
                previous_detections[new_id] = [current_box, frame_count]
                resolved_id = new_id
            else:
                # Find the closest existing ID that has not been claimed in this frame
                min_dist = float('inf')
                closest_id = None

                for existing_id in assigned_ids:
                    if existing_id in claimed_ids or existing_id not in last_positions:
                        continue
                    dist = euclidean(root_point, last_positions[existing_id])
                    if dist < min_dist:
                        min_dist = dist
                        closest_id = existing_id

                if closest_id is not None and min_dist < 200:  # Threshold for assignment
                    id_mapping[original_id] = closest_id
                    track_history[closest_id].append(root_point)
                    last_positions[closest_id] = root_point
                    previous_detections[closest_id] = [current_box, frame_count]
                    resolved_id = closest_id
                else:
                    # Replace the oldest ID if necessary
                    oldest_id = min(previous_detections, key=lambda k: previous_detections[k][1]) if previous_detections else None
                    if oldest_id and frame_count - previous_detections[oldest_id][1] > 120:  # 4 seconds at 30fps
                        id_mapping[original_id] = oldest_id
                        track_history[oldest_id] = [root_point]  # Reset trail for clarity
                        last_positions[oldest_id] = root_point
                        previous_detections[oldest_id] = [current_box, frame_count]
                        resolved_id = oldest_id
                    else:
                        # As last resort, create a new ID even beyond MAX_PERSON_COUNT
                        new_id = max(assigned_ids) + 1 if assigned_ids else 1
                        assigned_ids.add(new_id)
                        id_mapping[original_id] = new_id
                        track_history[new_id] = [root_point]
                        last_positions[new_id] = root_point
                        previous_detections[new_id] = [current_box, frame_count]
                        resolved_id = new_id

            # Record the assignment so a later detection in this same frame
            # cannot be matched onto the ID that was just handed out.
            processed_ids.add(original_id)
            claimed_ids.add(resolved_id)

        # Draw all tracked objects with consistent IDs
        if results.boxes.id is not None:
            for box, original_id_tensor in zip(results.boxes.xyxy, results.boxes.id):
                original_id = int(original_id_tensor.item())
                if original_id not in id_mapping:
                    continue

                consistent_id = id_mapping[original_id]
                x1, y1, x2, y2 = map(int, box.tolist())

                # Generate a reproducible color per consistent ID. A private
                # RandomState is seeded instead of np.random.seed() so the
                # global NumPy RNG state is not reset as a side effect.
                if consistent_id not in track_colors:
                    color_rng = np.random.RandomState(consistent_id * 100)
                    track_colors[consistent_id] = tuple(map(int, color_rng.randint(0, 255, 3)))

                color = track_colors[consistent_id]

                # Draw bounding box and a thicker bottom edge
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)
                cv2.line(frame, (x1, y2), (x2, y2), color, thickness=5)

                # Draw label with consistent ID on a filled background
                label = f"ID {consistent_id}"
                label_size, _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.8, 2)
                cv2.rectangle(frame, (x1, y1 - label_size[1] - 5), (x1 + label_size[0] + 5, y1), color, -1)
                cv2.putText(frame, label, (x1 + 3, y1 - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)

                # Draw full trail as consecutive segments
                trail = track_history.get(consistent_id, [])
                for segment_start, segment_end in zip(trail, trail[1:]):
                    cv2.line(frame, segment_start, segment_end, color, thickness=3)

        # Clean up old entries from previous_detections.
        # The ID stays in assigned_ids; it is only removed from re-identification candidates.
        for consistent_id in list(previous_detections.keys()):
            if frame_count - previous_detections[consistent_id][1] > 300:  # 10 seconds at 30fps
                del previous_detections[consistent_id]

        # Draw frame counter
        cv2.putText(frame, f"Frame: {frame_count}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

        # Write the frame to output video
        out.write(frame)

        # Save previous state
        prev_boxes = current_boxes.copy()
        prev_ids = current_ids.copy()

        # Track per-frame processing time (tracking + drawing + writing)
        inference_time = time.time() - start_time
        inference_times.append(inference_time)

        if frame_count % 100 == 0:
            print(f"Processed {frame_count} frames...")
            print(f"Current active IDs: {sorted(list(assigned_ids))}")
finally:
    # Cleanup
    cap.release()
    out.release()

[31m[1mrequirements:[0m Ultralytics requirement ['lap>=0.5.12'] not found, attempting AutoUpdate...

[31m[1mrequirements:[0m AutoUpdate success ✅ 0.6s

Processed 100 frames...
Current active IDs: []
Processed 200 frames...
Current active IDs: [1]
Processed 300 frames...
Current active IDs: [1]
Processed 400 frames...
Current active IDs: [1]
Processed 500 frames...
Current active IDs: [1]
Processed 600 frames...
Current active IDs: [1, 2]
Processed 700 frames...
Current active IDs: [1, 2]
Processed 800 frames...
Current active IDs: [1, 2]
Processed 900 frames...
Current active IDs: [1, 2]
Processed 1000 frames...
Current active IDs: [1, 2]
Processed 1100 frames...
Current active IDs: [1, 2, 3]
Processed 1200 frames...
Current active IDs: [1, 2, 3]
Processed 1300 frames...
Current active IDs: [1, 2, 3]
Processed 1400 frames...
Current active IDs: [1, 2, 3]


# Result

In [None]:
# Statistics
# Guard the empty case: if no frame was processed, np.mean([]) would emit a
# RuntimeWarning and report nan, so fall back to 0.0 instead.
avg_inference_time = float(np.mean(inference_times)) if inference_times else 0.0
avg_fps = 1 / avg_inference_time if avg_inference_time > 0 else 0

print(f"✅ Output saved: {output_path}")
print(f"📈 Avg Inference Time/Frame: {avg_inference_time:.4f} sec")
print(f"🎞️ Avg FPS: {avg_fps:.2f}")
print(f"👥 Unique IDs tracked: {len(assigned_ids)}")
print(f"🔄 ID mapping summary: {id_mapping}")

✅ Output saved: /content/drive/MyDrive/Object Detection/detection_output_yolov12.mp4
📈 Avg Inference Time/Frame: 0.0374 sec
🎞️ Avg FPS: 26.72
👥 Unique IDs tracked: 3
🔄 ID mapping summary: {2: 1, 6: 2, 8: 2, 9: 3, 12: 1, 13: 1, 14: 2}


In [None]:
# Re-encode the annotated video with libx264 / yuv420p and embed it in the
# notebook as a base64 <video> element.
# NOTE(review): presumably the re-encode is needed because the mp4v stream
# written by OpenCV does not play in the browser — confirm.

# Read the file the processing loop actually wrote instead of a hard-coded
# directory, and place the re-encoded copy next to it.
original_output_path = output_path
reencoded_output_path = os.path.splitext(original_output_path)[0] + "_reencoded.mp4"

# Argument list (no shell) so paths containing spaces are passed through intact.
reencode_cmd = [
    "ffmpeg", "-y",
    "-i", original_output_path,
    "-c:v", "libx264",
    "-pix_fmt", "yuv420p",
    reencoded_output_path,
]
try:
    reencode_result = subprocess.run(reencode_cmd).returncode
except FileNotFoundError as exc:
    # ffmpeg itself is not installed / not on PATH.
    raise RuntimeError("Failed to re-encode output video: ffmpeg executable not found") from exc

if reencode_result != 0 or not os.path.exists(reencoded_output_path):
    raise RuntimeError(f"Failed to re-encode output video. ffmpeg return code: {reencode_result}")

with open(reencoded_output_path, 'rb') as f:
    mp4 = f.read()

data_url = "data:video/mp4;base64," + b64encode(mp4).decode()

# Last expression of the cell, so the player is rendered as the cell output.
HTML(f"""
<video width=600 controls>
    <source src="{data_url}" type="video/mp4">
</video>
""")