In [7]:
"""
Real-time Object Detection + Tracking
------------------------------------
This script uses Ultralytics YOLO (v8+) for detection and ``deep-sort-realtime`` for tracking.
It supports webcam or video-file input, draws bounding boxes with class labels + tracking IDs
and displays the result in a real-time window. It also optionally writes an output video file.

Dependencies (install before running):
    pip install -U ultralytics opencv-python deep-sort-realtime numpy
    # if you need GPU support, install torch + torchvision appropriate for your system
"""

import time
from pathlib import Path

import cv2
import numpy as np
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort


# ---------------------- Config ----------------------
CONF_THR = 0.35            # detection confidence threshold
IOU_THR = 0.45             # NMS / IOU threshold (if used)
MODEL_NAME = "yolov8n.pt"  # small and fast model; change to yolov8s.pt/yolov8m.pt if you want
MAX_COSINE_DISTANCE = 0.2  # DeepSort appearance matching threshold (lower -> stricter)
EMBEDDER = "mobilenet"     # default embedder in deep-sort-realtime; optional


# ---------------------- Helpers ----------------------

def xyxy_to_xywh(box):
    """Convert [x1,y1,x2,y2] to [x_center,y_center,width,height]"""
    x1, y1, x2, y2 = box
    w = x2 - x1
    h = y2 - y1
    cx = x1 + w / 2.0
    cy = y1 + h / 2.0
    return [cx, cy, w, h]


# ---------------------- Main ----------------------

def run(source=0, save_path=None, show=True, max_frames=100, time_limit=None):
    """
    Run object detection and tracking on a video source
    
    Args:
        source: Video source (0 for webcam, or path to video file)
        save_path: Optional path to save output video
        show: Whether to display the video in a window
        max_frames: Maximum number of frames to process (useful for testing)
        time_limit: Maximum time in seconds to run (None for no limit)
    """
    # Load YOLO model (will auto-download if not present)
    model = YOLO(MODEL_NAME)

    # Initialize DeepSort tracker
    tracker = DeepSort(max_age=30,
                       n_init=3,
                       max_cosine_distance=MAX_COSINE_DISTANCE,
                       embedder=EMBEDDER)

    # Open video source
    cap = cv2.VideoCapture(source)
    if not cap.isOpened():
        raise RuntimeError(f"Cannot open source {source}")

    # Prepare writer if saving
    writer = None
    if save_path is not None:
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        writer = cv2.VideoWriter(str(save_path), fourcc, fps, (width, height))

    frame_idx = 0
    start_time = time.time()

    try:
        while frame_idx < max_frames:  # Limit frames for testing
            # Check if time limit exceeded
            if time_limit is not None and time.time() - start_time > time_limit:
                print(f"Time limit of {time_limit} seconds reached. Stopping...")
                break
                
            ret, frame = cap.read()
            if not ret:
                break

            frame_idx += 1

            # Ultralytics 'stream' mode returns results generator which is efficient
            # But here we do a single-frame predict for clarity
            results = model.predict(frame, imgsz=640, stream=False, conf=CONF_THR)

            detections_for_tracker = []  # list of [x1,y1,x2,y2, confidence, class_name]

            # results can contain multiple items (batch); when passing a single image, take first
            for r in results:
                # r.boxes contains detected boxes; each box has xyxy, conf, cls
                if r.boxes is None:
                    continue

                for box in r.boxes:
                    xyxy = box.xyxy[0].cpu().numpy() if hasattr(box.xyxy[0], 'cpu') else np.array(box.xyxy[0])
                    conf = float(box.conf[0]) if hasattr(box.conf[0], 'cpu') else float(box.conf[0])
                    clsid = int(box.cls[0]) if hasattr(box.cls[0], 'cpu') else int(box.cls[0])

                    if conf < CONF_THR:
                        continue

                    x1, y1, x2, y2 = map(float, xyxy)
                    class_name = model.model.names.get(clsid, str(clsid)) if hasattr(model, 'model') else str(clsid)

                    detections_for_tracker.append(([x1, y1, x2, y2], conf, class_name))

            # Prepare detections in the format expected by deep-sort-realtime: list of tuples (tlbr, confidence, class)
            ds_dets = []
            for (xyxy, conf, clsname) in detections_for_tracker:
                tlbr = xyxy  # top-left bottom-right
                ds_dets.append((tlbr, conf, clsname))

            # Update tracker -- returns list of Track objects
            tracks = tracker.update_tracks(ds_dets, frame=frame)

            # Draw detections + tracks
            for track in tracks:
                if not track.is_confirmed():
                    continue
                
                # Make sure track_id is an integer
                try:
                    track_id = int(track.track_id)
                except (ValueError, TypeError):
                    track_id = hash(str(track.track_id)) % 10000  # Convert to a numeric hash if it's not already an integer
                
                ltrb = track.to_ltrb()  # left, top, right, bottom
                bbox = list(map(int, ltrb))
                x1, y1, x2, y2 = bbox

                # Generate a consistent color based on track ID
                color = (int((track_id * 37) % 255), int((track_id * 91) % 255), int((track_id * 53) % 255))
                cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

                # Label with ID and class (if available)
                class_name = track.det_class if hasattr(track, 'det_class') else ''
                label = f"ID {track_id} {class_name}".strip()
                (w_label, h_label), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)
                cv2.rectangle(frame, (x1, y1 - 18), (x1 + w_label + 6, y1), color, -1)
                cv2.putText(frame, label, (x1 + 3, y1 - 3), cv2.FONT_HERSHEY_SIMPLEX, 0.45, (255, 255, 255), 1)

            # FPS display
            elapsed = time.time() - start_time
            fps = frame_idx / elapsed if elapsed > 0 else 0.0
            cv2.putText(frame, f"FPS: {fps:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
            
            # Time display - remaining time if time_limit is set
            if time_limit is not None:
                remaining = time_limit - (time.time() - start_time)
                if remaining > 0:
                    cv2.putText(frame, f"Time: {remaining:.1f}s", (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)

            # show / write frame
            if show:
                cv2.imshow("YOLO + DeepSORT", frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

            if writer is not None:
                writer.write(frame)

        elapsed_time = time.time() - start_time
        print(f"Processed {frame_idx} frames in {elapsed_time:.2f} seconds at {fps:.2f} FPS")
        return True

    finally:
        cap.release()
        if writer is not None:
            writer.release()
        cv2.destroyAllWindows()

In [8]:
# Set up options for testing the object detection and tracking

# Run with a 30-second time limit
print("Running object detection and tracking with a 30-second time limit")
print("Note: The first run will download the YOLO model if not already available")
print("Press 'q' to quit early, otherwise it will stop after 30 seconds")
run(source=0, save_path=None, show=True, max_frames=10000, time_limit=30)

# Option 2: For a different time limit, uncomment and modify below
# run(source=0, save_path="output.mp4", show=True, time_limit=60)  # 60 seconds

# Option 3: To use a video file instead of webcam
# run(source="path/to/your/video.mp4", save_path="output.mp4", time_limit=30)

Running object detection and tracking with a 30-second time limit
Note: The first run will download the YOLO model if not already available
Press 'q' to quit early, otherwise it will stop after 30 seconds


0: 480x640 1 person, 83.7ms
Speed: 2.7ms preprocess, 83.7ms inference, 1.0ms postprocess per image at shape (1, 3, 480, 640)
0: 480x640 1 person, 83.7ms
Speed: 2.7ms preprocess, 83.7ms inference, 1.0ms postprocess per image at shape (1, 3, 480, 640)


0: 480x640 1 person, 107.5ms
Speed: 1.8ms preprocess, 107.5ms inference, 1.9ms postprocess per image at shape (1, 3, 480, 640)
0: 480x640 1 person, 107.5ms
Speed: 1.8ms preprocess, 107.5ms inference, 1.9ms postprocess per image at shape (1, 3, 480, 640)


0: 480x640 1 person, 90.0ms
Speed: 1.9ms preprocess, 90.0ms inference, 1.2ms postprocess per image at shape (1, 3, 480, 640)
0: 480x640 1 person, 90.0ms
Speed: 1.9ms preprocess, 90.0ms inference, 1.2ms postprocess per image at shape (1, 3, 480, 640)


0: 480x640 1 person, 80.9ms
Speed

True