# Task 3: Object Tracking

In [52]:
import cv2
import numpy as np
import time

# Video file read by the ROI-selection cell and by every tracking run below (relative path).
VIDEO_PATH = "data/nightcars.mp4"

In [53]:
# Tracker families to probe for; a constructor is looked up as `Tracker<NAME>_create`.
TRACKER_NAMES = ["CSRT", "KCF", "MIL", "MOSSE", "BOOSTING", "MEDIANFLOW", "TLD"]

# Legacy API: constructors live under `cv2.legacy`, which may be absent from this build.
available_trackers = [
    name
    for name in TRACKER_NAMES
    if hasattr(cv2, "legacy") and callable(getattr(cv2.legacy, f"Tracker{name}_create", None))
]
print("Available legacy trackers:", available_trackers)

# Non-legacy API: constructors are exposed directly on the `cv2` module.
available_trackers_non_legacy = [
    name for name in TRACKER_NAMES if callable(getattr(cv2, f"Tracker{name}_create", None))
]
print("Available non-legacy trackers:", available_trackers_non_legacy)

Available legacy trackers: ['CSRT', 'KCF', 'MIL', 'MOSSE', 'TLD']
Available non-legacy trackers: ['CSRT', 'KCF', 'MIL']


In [54]:
# Load the first frame and manually select the object to track.
cap = cv2.VideoCapture(VIDEO_PATH)
try:
    ret, frame = cap.read()
finally:
    # Only the first frame is needed, so close the capture right away.
    # Doing it here also releases it on the failure path below.
    cap.release()
if not ret:
    raise RuntimeError("Cannot read video file")

# Select ROI manually in a resizable window (the third argument disables the crosshair).
window_name = "Select Object"
cv2.namedWindow(window_name, cv2.WINDOW_NORMAL)
cv2.resizeWindow(window_name, 1600, 900)
try:
    bbox = cv2.selectROI(window_name, frame, False)
finally:
    # Always tear the window down, even if the selection call raises.
    cv2.destroyWindow(window_name)

# A cancelled selection comes back as an all-zero rectangle; fail here with a
# clear message instead of handing an empty box to the tracker later on.
if bbox[2] <= 0 or bbox[3] <= 0:
    raise RuntimeError("No region was selected; re-run this cell and draw a box around the object.")

print("Selected bounding box:", bbox)

Selected bounding box: (314, 534, 36, 19)


In [55]:
# Reuse TRACKER_NAMES if an earlier cell already defined it; otherwise fall back to the
# same default list so this cell also works when run on its own.
TRACKER_NAMES = globals().get("TRACKER_NAMES", ["CSRT", "KCF", "MIL", "MOSSE", "BOOSTING", "MEDIANFLOW", "TLD"])


def collect_tracker_factories():
    """Map each available tracker name to the constructor that builds it.

    For every name in ``TRACKER_NAMES`` the attribute ``Tracker<NAME>_create`` is
    looked up first on ``cv2.legacy`` (when that submodule exists) and then on
    ``cv2`` itself. Only callables are kept. Because ``cv2`` is scanned last, its
    constructor replaces the legacy one when both exist.

    Returns:
        dict: tracker name -> zero-argument constructor.
    """
    # Order matters: a later namespace overrides entries from an earlier one.
    namespaces = ([cv2.legacy] if hasattr(cv2, "legacy") else []) + [cv2]

    found = {}
    for namespace in namespaces:
        for tracker_name in TRACKER_NAMES:
            constructor = getattr(namespace, f"Tracker{tracker_name}_create", None)
            if callable(constructor):
                found[tracker_name] = constructor
    return found


def create_tracker(tracker_type):
    """Instantiate the tracker registered under ``tracker_type``.

    Args:
        tracker_type: tracker name; matched case-insensitively by upper-casing it.

    Returns:
        tuple: ``(tracker_instance, upper_cased_name)``.

    Raises:
        ValueError: if no constructor for that name was found; the message lists
            the names that are available.
    """
    key = tracker_type.upper()
    registry = collect_tracker_factories()
    constructor = registry.get(key)
    if constructor is None:
        raise ValueError(f"Tracker '{key}' is not available. Found: {list(registry.keys())}")
    return constructor(), key


def track_video(tracker_type, video_path, bbox, output_path=None, display_width=720, drift_threshold=100, show=False):
    """Track one object through a video and collect speed/robustness metrics.

    The first frame is consumed to initialise the tracker; every following frame
    is passed to ``tracker.update``, annotated, and optionally written to
    ``output_path`` and/or shown in a window. The initialisation frame itself is
    not written to the output video.

    Args:
        tracker_type: tracker name understood by ``create_tracker`` (case-insensitive).
        video_path: path of the input video.
        bbox: initial ``(x, y, w, h)`` box on the first frame; values are cast to int.
        output_path: optional path of an annotated ``mp4v`` video to write.
        display_width: width in pixels of the preview window (only used when ``show``).
        drift_threshold: distance in pixels between the tracked box's top-left corner
            and the initial top-left corner above which a frame counts as a drift case.
        show: if True, display each annotated frame; pressing ``q`` stops early.

    Returns:
        dict with keys ``tracker`` (name), ``avg_fps`` (mean of the per-frame
        ``1 / update_time`` samples), ``success_rate`` (successful updates divided by
        frames processed), ``drift_cases``, ``frames_processed`` and ``fps_samples``.

    Raises:
        ValueError: if the tracker is unavailable, or ``bbox`` is not four values
            with a positive width and height.
        RuntimeError: if the first frame cannot be read.
    """
    import warnings  # local import: only needed for the writer-open check below

    tracker, tracker_name = create_tracker(tracker_type)

    # Ensure bounding box is integer type for OpenCV, and reject empty boxes
    # (e.g. a cancelled ROI selection) before any resource is opened.
    tracker_bbox = tuple(map(int, bbox))
    if len(tracker_bbox) != 4 or tracker_bbox[2] <= 0 or tracker_bbox[3] <= 0:
        raise ValueError(f"bbox must be (x, y, w, h) with positive width and height, got {tracker_bbox}")

    cap = cv2.VideoCapture(video_path)
    writer = None
    frame_idx = 0
    success_count = 0
    drift_count = 0
    fps_values = []

    # Everything that can raise runs inside try/finally so the capture, the writer
    # and any preview window are released on every exit path.
    try:
        ret, frame = cap.read()
        if not ret:
            raise RuntimeError("Cannot read video file for tracking.")

        tracker.init(frame, tracker_bbox)

        frame_height, frame_width = frame.shape[:2]
        if output_path:
            fourcc = cv2.VideoWriter_fourcc(*"mp4v")
            # Fall back to 30 FPS when the container reports 0.
            fps_out = cap.get(cv2.CAP_PROP_FPS) or 30.0
            writer = cv2.VideoWriter(output_path, fourcc, fps_out, (frame_width, frame_height))
            if not writer.isOpened():
                # Tracking metrics are still useful without the video, so warn
                # instead of failing; writing to a closed writer would be a silent no-op.
                warnings.warn(f"Could not open '{output_path}' for writing; no output video will be saved.")
                writer.release()
                writer = None

        display_dim = None
        if show:
            # Keep the aspect ratio of the source when scaling the preview.
            display_height = int(display_width * frame_height / frame_width)
            display_dim = (display_width, display_height)

        # Drift is measured against the top-left corner of the initial box.
        reference_point = np.array([tracker_bbox[0], tracker_bbox[1]], dtype=np.float32)

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_idx += 1
            # perf_counter is monotonic and high-resolution, which matters for
            # sub-millisecond tracker updates.
            start_time = time.perf_counter()
            ok, new_bbox = tracker.update(frame)
            elapsed = time.perf_counter() - start_time
            fps = 1.0 / elapsed if elapsed > 0 else 0.0
            fps_values.append(fps)

            draw_frame = frame.copy()
            if ok:
                success_count += 1
                p1 = (int(new_bbox[0]), int(new_bbox[1]))
                p2 = (int(new_bbox[0] + new_bbox[2]), int(new_bbox[1] + new_bbox[3]))
                cv2.rectangle(draw_frame, p1, p2, (0, 0, 255), 2, 1)
                cv2.putText(
                    draw_frame, tracker_name, (p1[0], max(20, p1[1] - 10)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 2
                )

                drift_distance = np.linalg.norm(np.array(p1, dtype=np.float32) - reference_point)
                if drift_distance > drift_threshold:
                    drift_count += 1
            else:
                cv2.putText(draw_frame, "Tracking failure", (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 0, 255), 2)

            cv2.putText(
                draw_frame, f"{tracker_name} FPS: {fps:.0f}", (20, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2
            )

            if writer is not None:
                writer.write(draw_frame)

            if show:
                display_frame = cv2.resize(draw_frame, display_dim, interpolation=cv2.INTER_AREA)
                cv2.imshow("Object Tracking", display_frame)
                if cv2.waitKey(1) & 0xFF == ord("q"):
                    break
    finally:
        cap.release()
        if writer is not None:
            writer.release()
        if show:
            cv2.destroyAllWindows()

    avg_fps = float(np.mean(fps_values)) if fps_values else 0.0
    success_rate = success_count / frame_idx if frame_idx else 0.0

    return {
        "tracker": tracker_name,
        "avg_fps": avg_fps,
        "success_rate": success_rate,
        "drift_cases": drift_count,
        "frames_processed": frame_idx,
        "fps_samples": fps_values,
    }

In [56]:
# Metrics dict returned by each tracking run, keyed by tracker name.
tracker_results = {}

In [57]:
# Run KCF over the whole clip, save the annotated video, and time the run end to end.
# perf_counter() is a monotonic, high-resolution clock, so the measured duration
# cannot be skewed by system clock adjustments the way time.time() can.
time_start = time.perf_counter()
tracker_results["KCF"] = track_video("KCF", VIDEO_PATH, bbox, output_path="kcf.mp4")
time_end = time.perf_counter()
print(f"KCF tracking completed in {time_end - time_start:.2f} seconds.")

KCF tracking completed in 2.99 seconds.


In [58]:
# Run CSRT over the whole clip, save the annotated video, and time the run end to end.
# perf_counter() is a monotonic, high-resolution clock, so the measured duration
# cannot be skewed by system clock adjustments the way time.time() can.
time_start = time.perf_counter()
tracker_results["CSRT"] = track_video("CSRT", VIDEO_PATH, bbox, output_path="csrt.mp4")
time_end = time.perf_counter()
print(f"CSRT tracking completed in {time_end - time_start:.2f} seconds.")

CSRT tracking completed in 8.15 seconds.


In [59]:
# Pull out each tracker's metrics dict for the summary table below.
kcf_metrics, csrt_metrics = (tracker_results[name] for name in ("KCF", "CSRT"))

In [60]:
from IPython.display import Markdown


def display_tracker_summary(*metrics):
    """Render one Markdown table row per metrics dict.

    Each positional argument must provide the keys ``tracker``, ``avg_fps``,
    ``success_rate`` and ``drift_cases``. The two float columns are shown with
    two decimals. The table is rendered with ``display(Markdown(...))``.
    """
    table_lines = [
        "| Tracker | Avg FPS | Success Rate | Drift Cases |",
        "|---------|---------|--------------|-------------|",
        *(
            f"| {entry['tracker']} | {entry['avg_fps']:.2f} | {entry['success_rate']:.2f} | {entry['drift_cases']} |"
            for entry in metrics
        ),
    ]
    table_text = "\n".join(table_lines)
    display(Markdown(table_text))


# Side-by-side comparison of the two runs (speed, success rate, drift cases).
display_tracker_summary(kcf_metrics, csrt_metrics)

| Tracker | Avg FPS | Success Rate | Drift Cases |
|---------|---------|--------------|-------------|
| KCF | 337.79 | 0.21 | 0 |
| CSRT | 29.84 | 1.00 | 0 |

- KCF is a lot faster, taking 2.99 seconds in total and achieving more than 10 times the average FPS of CSRT
- KCF loses the car after it goes under the bridge and the bounding box just disappears
- CSRT is a lot slower, taking 8.15 seconds in total
- CSRT also loses the car after it goes under the bridge, but its bounding box remains visible at the location where the car went missing