# Player Tracking System Comparison

This notebook compares 3 different player tracking systems:
- Eagle
- Darkmyter (using Ultralytics YOLO)
- Ultralytics YOLO 11 + Botsort

**Important**: Run cells in order from top to bottom!

In [None]:
# Cell 1: Setup directories and utilities

from pathlib import Path
import os

BASE_DIR = Path("/content")
REPOS_DIR = BASE_DIR / "repositories"
VIDEOS_DIR = BASE_DIR / "videos"
CLIPS_DIR = BASE_DIR / "clips"
OUTPUT_DIR = BASE_DIR / "output"

for d in [REPOS_DIR, VIDEOS_DIR, CLIPS_DIR, OUTPUT_DIR]:
    d.mkdir(parents=True, exist_ok=True)

def print_status(msg, status="INFO"):
    "Print colored status messages"
    colors = {
        "INFO": "\033[94m",
        "SUCCESS": "\033[92m",
        "WARNING": "\033[93m",
        "ERROR": "\033[91m",
        "RESET": "\033[0m"
    }
    print(f"{colors.get(status, '')}[{status}] {msg}{colors['RESET']}")

print_status("Directory structure created", "SUCCESS")
print(f"Working directory: {BASE_DIR}")

In [None]:
# Cell 2: Clone all repositories

import subprocess

REPOSITORIES = {
    "eagle": "https://github.com/nreHieW/Eagle.git",
    "darkmyter": "https://github.com/Darkmyter/Football-Players-Tracking.git",
}

print_status("Cloning repositories...", "INFO")

for name, url in REPOSITORIES.items():
    repo_path = REPOS_DIR / name

    if repo_path.exists():
        print_status(f"{name}: Already exists, skipping", "WARNING")
        continue

    try:
        print_status(f"{name}: Cloning...", "INFO")
        result = subprocess.run(
            ["git", "clone", url, str(repo_path)],
            capture_output=True,
            text=True,
            timeout=300
        )

        if result.returncode == 0:
            print_status(f"{name}: Cloned successfully", "SUCCESS")
        else:
            print_status(f"{name}: Clone failed - {result.stderr[:100]}", "ERROR")

    except Exception as e:
        print_status(f"{name}: Clone failed - {str(e)}", "ERROR")

print_status("Repository cloning complete", "SUCCESS")

In [None]:
# Cell 3: Install dependencies

print_status("Installing dependencies...", "INFO")

!pip install -q torch torchvision torchaudio tracklab
!pip install -q opencv-python numpy scipy pandas scikit-learn matplotlib
!pip install -q ultralytics supervision
!pip install -q gdown Pillow tqdm requests
!pip install -q \
    loguru cython cython_bbox lap onemetric scikit-image tabulate tqdm numpy torch torchvision opencv-python pyyaml yolox
!pip install -q loguru
!pip install onemetric #THIS CELL IS IMPORTANT
!pip install psutil


print_status("Dependencies installed", "SUCCESS")

In [None]:
from ultralytics import YOLO
model = YOLO("yolo11m.pt")
print("Loaded weights from:", getattr(model, "ckpt_path", "unknown path"))

In [None]:
# Cell 4: Download videos from Google Drive

!pip install -q gdown

import gdown
from pathlib import Path

# Shared folder ID
FOLDER_ID = "1Cs4kTX6GYwfcpKyDZdqRKBezz49wT7_N"

print_status("Downloading videos from shared folder...", "INFO")

try:
    gdown.download_folder(
        id=FOLDER_ID,
        output=str(VIDEOS_DIR),
        quiet=False,
        use_cookies=False
    )

    # List downloaded videos
    video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.MP4', '.AVI', '.MOV', '.MKV']
    available_videos = []

    for ext in video_extensions:
        available_videos.extend(list(VIDEOS_DIR.glob(f"*{ext}")))

    if not available_videos:
        print_status("No video files found", "ERROR")
    else:
        print(f"\nDOWNLOADED {len(available_videos)} VIDEO(S)")
        print("="*50)

        for idx, video in enumerate(available_videos, 1):
            size_mb = video.stat().st_size / (1024 * 1024)
            print(f"{idx}. {video.name} ({size_mb:.1f} MB)")

        print("\n")
        print("VIDEO SELECTION")

        # Ask for number of videos
        print("\nHow many videos do you want to evaluate?")
        print(f"  - Enter a number between 1 and {len(available_videos)}")
        print(f"  - Enter 'all' or leave blank to process ALL {len(available_videos)} videos")

        num_selection = input("\nNumber of videos: ").strip().lower()

        VIDEO_PATHS = []

        if not num_selection or num_selection == 'all':
            VIDEO_PATHS = available_videos
            print_status(f"Selected ALL {len(VIDEO_PATHS)} videos", "SUCCESS")
        elif num_selection.isdigit():
            num_videos = int(num_selection)
            if 1 <= num_videos <= len(available_videos):
                if num_videos == len(available_videos):
                    VIDEO_PATHS = available_videos
                else:
                    print(f"\nSelect {num_videos} video(s) from the list above:")
                    print("  - Enter comma-separated numbers (e.g., '1,3,5')")
                    print(f"  - Or enter 'first' to select the first {num_videos} videos")

                    video_selection = input("\nYour selection: ").strip().lower()

                    if video_selection == 'first':
                        VIDEO_PATHS = available_videos[:num_videos]
                    else:
                        try:
                            indices = [int(x.strip()) for x in video_selection.split(',')]
                            if len(indices) != num_videos:
                                print_status(f"Warning: Selected {len(indices)} videos instead of {num_videos}", "WARNING")
                            for idx in indices[:num_videos]:
                                if 1 <= idx <= len(available_videos):
                                    VIDEO_PATHS.append(available_videos[idx - 1])
                        except ValueError:
                            print_status("Invalid input, selecting first videos", "WARNING")
                            VIDEO_PATHS = available_videos[:num_videos]

                print_status(f"Selected {len(VIDEO_PATHS)} video(s)", "SUCCESS")
                for video in VIDEO_PATHS:
                    print(f"  - {video.name}")
            else:
                print_status(f"Invalid number. Must be between 1 and {len(available_videos)}", "ERROR")
        else:
            print_status("Invalid input", "ERROR")

        if not VIDEO_PATHS:
            print_status("No videos selected", "ERROR")

except Exception as e:
    print_status(f"Download failed: {str(e)}", "ERROR")
    print("\nNote: Make sure the folder is set to 'Anyone with the link can view'")

In [None]:
# Cell 5: Prepare videos and clips

import cv2
import subprocess

CLIP_DURATION = 60

# Prepare both full videos and clips
FULL_VIDEOS = {}
VIDEO_CLIPS = {}

for VIDEO_PATH in VIDEO_PATHS:
    VIDEO_NAME = VIDEO_PATH.stem

    print(f"\nPREPARING: {VIDEO_NAME}")

    # Store full video path
    FULL_VIDEOS[VIDEO_NAME] = {"full": VIDEO_PATH}

    # Get video info for clip extraction
    cap = cv2.VideoCapture(str(VIDEO_PATH))
    fps = cap.get(cv2.CAP_PROP_FPS)
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration = total_frames / fps
    cap.release()

    print(f"Duration: {duration:.1f}s | FPS: {fps:.1f} | Frames: {total_frames}")

    # Determine clip positions
    if duration < CLIP_DURATION * 3:
        if duration < CLIP_DURATION:
            CLIPS = [(0, duration, "full")]
            print_status(f"Video shorter than {CLIP_DURATION}s, will use full video", "INFO")
        else:
            CLIPS = [
                (0, CLIP_DURATION, "start"),
                (max(duration - CLIP_DURATION, 0), CLIP_DURATION, "end")
            ]
            print_status("Will extract start and end clips", "INFO")
    else:
        CLIPS = [
            (0, CLIP_DURATION, "start"),
            ((duration - CLIP_DURATION) / 2, CLIP_DURATION, "middle"),
            (duration - CLIP_DURATION, CLIP_DURATION, "end")
        ]
        print_status("Will extract start, middle, and end clips", "INFO")

    # Extract clips
    CLIP_PATHS = {}
    for start_time, clip_dur, position in CLIPS:
        clip_name = f"{VIDEO_NAME}_{position}.mp4"
        clip_path = CLIPS_DIR / clip_name

        cmd = [
            "ffmpeg", "-i", str(VIDEO_PATH),
            "-ss", str(start_time),
            "-t", str(clip_dur),
            "-c", "copy",
            str(clip_path),
            "-y",
            "-loglevel", "error"
        ]

        result = subprocess.run(cmd, capture_output=True)

        if result.returncode == 0 and clip_path.exists():
            CLIP_PATHS[position] = clip_path
            size_mb = clip_path.stat().st_size / (1024 * 1024)
            print_status(f"Clip '{position}' ready ({size_mb:.1f} MB)", "SUCCESS")
        else:
            print_status(f"Failed to extract '{position}' clip", "ERROR")

    VIDEO_CLIPS[VIDEO_NAME] = CLIP_PATHS

print("\n")
print("PREPARATION COMPLETE")
print(f"Prepared {len(VIDEO_PATHS)} video(s) with both full and clip options")

In [None]:
# Cell: Setup Darkmyter (Original ByteTrack + YOLO - Authentic Implementation)

print_status("Setting up Darkmyter tracking...", "INFO")

import os
import subprocess
from pathlib import Path

darkmyter_dir = REPOS_DIR / "darkmyter"

# Clone original ByteTrack repo (required for authentic Darkmyter)
bytetrack_dir = darkmyter_dir / "ByteTrack"
if not bytetrack_dir.exists():
    print_status("Cloning original ByteTrack repository...", "INFO")
    subprocess.run([
        "git", "clone", "--depth", "1",
        "https://github.com/ifzhang/ByteTrack.git",
        str(bytetrack_dir)
    ], check=True)

# Install ByteTrack dependencies
!pip install -q cython lap cython_bbox

# Download football-specific weights
weights_dir = darkmyter_dir / "yolov8-weights"
weights_dir.mkdir(parents=True, exist_ok=True)

custom_weights = weights_dir / "yolov8l-football-players.pt"
gdrive_id = "12dWRBsegmyGE3feTdy9LBf1eZ-hTZ9Sx"

def download_darkmyter_weights():
    print_status("Downloading Darkmyter football weights...", "INFO")
    try:
        import gdown
        url = f"https://drive.google.com/uc?id={gdrive_id}"
        gdown.download(url, str(custom_weights), quiet=False)
        print_status("Darkmyter weights downloaded", "SUCCESS")
    except Exception as e:
        print_status(f"Failed to download weights: {e}", "ERROR")

if custom_weights.exists():
    try:
        with open(custom_weights, "rb") as f:
            header = f.read(16)
        if header.startswith(b"<"):
            print_status("Weights file is HTML, re-downloading...", "ERROR")
            custom_weights.unlink(missing_ok=True)
            download_darkmyter_weights()
        else:
            print_status("Darkmyter weights already present", "SUCCESS")
    except Exception:
        custom_weights.unlink(missing_ok=True)
        download_darkmyter_weights()
else:
    download_darkmyter_weights()

# Create Darkmyter wrapper
darkmyter_wrapper = darkmyter_dir / "run_darkmyter.py"
darkmyter_wrapper.write_text('''
#!/usr/bin/env python
"""
Darkmyter: YOLOv8 + original ByteTrack (football, notebook-authentic).

This script is a CLI version of the Roboflow "track players with ByteTrack + YOLOv8"
notebook, adapted to output JSON instead of an annotated video.

It:
- Uses yolov8l-football-players.pt if present, else falls back to yolov8x.pt
- Uses original ifzhang/ByteTrack
- Uses football-specific BYTETrackerArgs
- Uses the same format_predictions + match_detections_with_tracks pattern
"""

import argparse
import json
import sys
from dataclasses import dataclass
from pathlib import Path

import cv2
import numpy as np

try:
    from ultralytics import YOLO
    import torch
except ImportError:
    print("Error: ultralytics or torch not installed", file=sys.stderr)
    sys.exit(1)

# Original ByteTrack imports
BYTETRACK_PATH = Path(__file__).resolve().parent / "ByteTrack"
sys.path.insert(0, str(BYTETRACK_PATH))

try:
    from yolox.tracker.byte_tracker import BYTETracker, STrack
except ImportError:
    print("Error: ByteTrack repo not found; expected at ./ByteTrack", file=sys.stderr)
    sys.exit(1)

try:
    from onemetric.cv.utils.iou import box_iou_batch
except ImportError:
    print("Error: onemetric not installed (needed for IoU). "
          "Install with: pip install onemetric", file=sys.stderr)
    sys.exit(1)
try:
    from yolox.tracker.byte_tracker import BYTETracker, STrack
except ImportError as e:
    import traceback
    print("Error importing ByteTrack from ./ByteTrack:", e, file=sys.stderr)
    traceback.print_exc()
    sys.exit(1)


#BYTETrackerArgs: football-specific params (from notebook)
@dataclass(frozen=True)
class BYTETrackerArgs:
    track_thresh: float = 0.25
    track_buffer: int = 30
    match_thresh: float = 0.8
    aspect_ratio_thresh: float = 3.0
    min_box_area: float = 1.0
    mot20: bool = False


# Same mapping as in the notebook
IND_TO_CLS = {
    0: "ball",
    1: "goalkeeper",
    2: "player",
    3: "referee",
}


def format_predictions(predictions, with_conf: bool = True) -> np.ndarray:
    """
    Format YOLO detections to ByteTrack format: (x1, y1, x2, y2, conf).

    This mirrors the notebook's function exactly:
        bbox = pred.boxes.xyxy.int().tolist()[0]
        conf = pred.boxes.conf.item()
    """
    frame_detections = []
    for pred in predictions:
        # pred is a ultralytics Results object with a single box
        bbox = pred.boxes.xyxy.int().tolist()[0]  # [x1, y1, x2, y2]
        conf = float(pred.boxes.conf.item())
        if with_conf:
            detection = bbox + [conf]
        else:
            detection = bbox
        frame_detections.append(detection)

    if not frame_detections:
        # shape must be (0, 5) or (0, 4) depending on with_conf
        return np.zeros((0, 5 if with_conf else 4), dtype=float)

    return np.array(frame_detections, dtype=float)


def match_detections_with_tracks(detections, tracks):
    """
    Notebook-authentic matching:

    - Build detections_bboxes using format_predictions(with_conf=False)
    - Build tracks_bboxes from track.tlbr
    - Compute IoU matrix with box_iou_batch
    - For each track, assign its track_id to the best IoU detection if IoU != 0
    """
    if not detections or not tracks:
        return detections

    detections_bboxes = format_predictions(detections, with_conf=False)
    tracks_bboxes = np.array([track.tlbr for track in tracks], dtype=float)

    iou = box_iou_batch(tracks_bboxes, detections_bboxes)  # [num_tracks, num_dets]
    track2detection = np.argmax(iou, axis=1)

    for tracker_index, detection_index in enumerate(track2detection):
        if iou[tracker_index, detection_index] != 0:
            detections[detection_index].tracker_id = tracks[tracker_index].track_id

    return detections


def main():
    parser = argparse.ArgumentParser(
        description="Darkmyter: YOLOv8 + original ByteTrack (football)"
    )
    parser.add_argument("--video", required=True, help="Path to input video")
    parser.add_argument("--output", required=True, help="Path to output JSON file")

    args = parser.parse_args()
    video_path = Path(args.video)
    output_path = Path(args.output)

    if not video_path.exists():
        print(f"Error: video not found: {video_path}", file=sys.stderr)
        sys.exit(1)

    # Load YOLO model (football weights if available)
    repo_root = Path(__file__).resolve().parent
    custom_weights = repo_root / "yolov8-weights" / "yolov8l-football-players.pt"

    if custom_weights.exists():
        print(f"[Darkmyter] Using football-specific weights: {custom_weights}", file=sys.stderr)
        model = YOLO(str(custom_weights))
        model_name = "yolov8l-football"
        football_specific = True
    else:
        print("[Darkmyter] Football weights not found, using yolov8x.pt", file=sys.stderr)
        model = YOLO("yolov8x.pt")
        model_name = "yolov8x"
        football_specific = False

    device = "cuda" if torch.cuda.is_available() else "cpu"
    print(f"[Darkmyter] Device: {device}", file=sys.stderr)

    # --- Open video -----------------------------------------------------------
    cap = cv2.VideoCapture(str(video_path))
    if not cap.isOpened():
        print(f"Error: cannot open video: {video_path}", file=sys.stderr)
        sys.exit(1)

    fps = cap.get(cv2.CAP_PROP_FPS)
    if fps is None or fps <= 0:
        fps = 30.0
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) if cap.get(cv2.CAP_PROP_FRAME_COUNT) > 0 else -1

    print(f"[Darkmyter] FPS={fps:.1f}, total_frames={total_frames}", file=sys.stderr)

    # --- Initialize ByteTrack (with proper frame_rate) ------------------------
    tracker = BYTETracker(BYTETrackerArgs(), frame_rate=int(round(fps)))

    detections_json = []
    total_tracks = set()
    frame_idx = 0


    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # Notebook: detections = yolo_model(frame, verbose=0)[0]
        results = model(frame, verbose=False)[0]

        # "detections" in the notebook is iterable; each element is a single-box Results
        detections_with_tracker = []
        for detection in results:
            detection.tracker_id = ""  # will be filled in after tracking
            detections_with_tracker.append(detection)

        if detections_with_tracker:
            # get trackers with ByteTrack
            bt_input = format_predictions(detections_with_tracker, with_conf=True)

            tracks = tracker.update(
                output_results=bt_input,
                img_info=frame.shape,
                img_size=frame.shape,
            )

            # set tracker_id in yolo detections
            detections_with_tracker = match_detections_with_tracks(
                detections_with_tracker,
                tracks,
            )

            # Convert to JSON rows
            for det in detections_with_tracker:
                if det.tracker_id == "":
                    continue

                # Single box per det
                bbox = det.boxes.xyxy.tolist()[0]
                x1, y1, x2, y2 = map(float, bbox)
                conf = float(det.boxes.conf.item())
                cls_idx = int(det.boxes.cls.item()) if det.boxes.cls is not None else 0

                detections_json.append(
                    {
                        "frame_id": int(frame_idx),
                        "track_id": int(det.tracker_id),
                        "bbox": [x1, y1, x2, y2],
                        "score": conf,
                        "class_id": cls_idx,
                        "class_name": IND_TO_CLS.get(cls_idx, "unknown"),
                    }
                )
                total_tracks.add(int(det.tracker_id))

        if frame_idx % 100 == 0:
            if total_frames > 0:
                pct = 100.0 * frame_idx / total_frames
                print(f"[Darkmyter] Processed {frame_idx}/{total_frames} frames ({pct:.1f}%)",
                      file=sys.stderr)
            else:
                print(f"[Darkmyter] Processed {frame_idx} frames...", file=sys.stderr)

        frame_idx += 1

    cap.release()

    stats = {
        "total_tracks": len(total_tracks),
        "frames_processed": frame_idx,
    }

    full_output = {
        "framework": "Darkmyter",
        "model": model_name,
        "tracker": "ByteTrack (ifzhang/ByteTrack)",
        "tracker_params": {
            "track_thresh": BYTETrackerArgs.track_thresh,
            "track_buffer": BYTETrackerArgs.track_buffer,
            "match_thresh": BYTETrackerArgs.match_thresh,
            "aspect_ratio_thresh": BYTETrackerArgs.aspect_ratio_thresh,
            "min_box_area": BYTETrackerArgs.min_box_area,
        },
        "features": {
            "football_specific": football_specific,
            "original_bytetrack": True,
        },
        "detections": detections_json,
        "statistics": stats,
    }

    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w") as f:
        json.dump(full_output, f, indent=2)

    print(
        f"[Darkmyter] Complete: {len(detections_json)} detections, "
        f"{stats['total_tracks']} tracks. Output saved to: {output_path}",
        file=sys.stderr,
    )


if __name__ == "__main__":
    main()
''')

darkmyter_wrapper.chmod(0o755)
print_status("Darkmyter wrapper created (original ByteTrack)", "SUCCESS")

In [None]:
# Patch ByteTrack for NumPy 1.24+ compatibility
import os

bytetrack_path = REPOS_DIR / "darkmyter" / "ByteTrack"

# Files that typically have np.float issues
for root, dirs, files in os.walk(bytetrack_path):
    for file in files:
        if file.endswith('.py'):
            filepath = os.path.join(root, file)
            try:
                with open(filepath, 'r') as f:
                    content = f.read()

                # Replace deprecated numpy aliases
                new_content = content
                new_content = new_content.replace('np.float,', 'float,')
                new_content = new_content.replace('np.float)', 'float)')
                new_content = new_content.replace('np.float]', 'float]')
                new_content = new_content.replace('np.int,', 'int,')
                new_content = new_content.replace('np.int)', 'int)')
                new_content = new_content.replace('np.int]', 'int]')

                if new_content != content:
                    with open(filepath, 'w') as f:
                        f.write(new_content)
                    print(f"Patched: {filepath}")
            except Exception as e:
                pass

print("ByteTrack patched for NumPy compatibility")

In [None]:
import textwrap
from pathlib import Path
import yaml
import ultralytics

REPOS_DIR = Path("/content/repositories")
ultra_dir = REPOS_DIR / "ultra_trackers"
ultra_dir.mkdir(parents=True, exist_ok=True)


# 1) Choose custom configs for bytetrack + botsort
runner_script = ultra_dir / "run_ultra_yolo_tracker.py"
runner_script.write_text(textwrap.dedent("""\
    #!/usr/bin/env python
    \"\"\"Run Ultralytics YOLO (v5 / v8 / v11 weights) with a chosen tracker and dump JSON tracks.

    Usage:
      python run_ultra_yolo_tracker.py \\
          --video input.mp4 \\
          --output output.json \\
          --weights yolo11m.pt \\
          --tracker botsort
    \"\"\"

    import argparse
    import json
    from pathlib import Path
    from ultralytics import YOLO
    import yaml

    def main():
        parser = argparse.ArgumentParser(description="YOLO + tracker to JSON")
        parser.add_argument("--video", required=True, help="Path to input video")
        parser.add_argument("--output", required=True, help="Path to output JSON")
        parser.add_argument(
            "--weights",
            default="yolo11m.pt",
            help="YOLO weights (e.g., yolov5s.pt, yolov8n.pt, yolo11m.pt, ...)",
        )
        parser.add_argument(
            "--tracker",
            default="botsort",
            choices=["botsort", "bytetrack", "deepsort"],
            help="Which tracker to use",
        )
        parser.add_argument("--conf", type=float, default=0.3,
                    help="Confidence threshold (detector)")
        parser.add_argument("--iou", type=float, default=0.4,
                    help="IOU threshold for NMS (lower = keep more boxes)")
        parser.add_argument("--imgsz", type=int, default=1280,
                    help="Image size for inference")
        parser.add_argument("--max-det", type=int, default=300,
                    help="Maximum detections per image")

        args = parser.parse_args()

        video_path = Path(args.video)
        out_path = Path(args.output)

        if not video_path.exists():
            raise SystemExit(f"Video not found: {video_path}")

        # Load YOLO model
        model = YOLO(args.weights)

        # Get the script's directory for saving custom configs
        ultra_root = Path(__file__).resolve().parent

        # Try to load Ultralytics default configs first
        import ultralytics
        ultra_path = Path(ultralytics.__file__).parent
        tracker_base_path = ultra_path / "cfg" / "trackers"

        # Select the default config path
        if args.tracker == "bytetrack":
            default_cfg_path = tracker_base_path / "bytetrack.yaml"
        elif args.tracker == "botsort":
            default_cfg_path = tracker_base_path / "botsort.yaml"
        else:  # deepsort
            default_cfg_path = tracker_base_path / "deepsort.yaml"

        # Path for our custom config
        custom_cfg_path = ultra_root / f"{args.tracker}_football.yaml"

        # Load and modify the config
        if default_cfg_path.exists():
            # Load the default config
            with open(default_cfg_path, 'r') as f:
                tracker_cfg = yaml.safe_load(f)

            # Modify with football-optimized values from tracklab

            if args.tracker == "botsort":
                tracker_cfg.update({
                    "track_high_thresh": 0.33824964456239337,
                    "new_track_thresh": 0.21144301345190655,
                    "track_buffer": 60,
                    "match_thresh": 0.22734550911325851,
                    "proximity_thresh": 0.5945380911899254,
                    "appearance_thresh": 0.4818211117541298,
                    "cmc_method": "sparseOptFlow",
                    "frame_rate": 30,
                    "lambda_": 0.9896143462366406,
                    "conf_thres": 0.3501265956918775,
                    "with_reid": True
                })

            # Save the modified config to a file
            with open(custom_cfg_path, 'w') as f:
                yaml.dump(tracker_cfg, f)

            # Use the custom config FILE PATH (not the dictionary!)
            tracker_cfg_path = str(custom_cfg_path)
        else:
            # Fallback: just use the default tracker name
            print(f"Warning: Could not find default config at {default_cfg_path}")
            print(f"Using default tracker: {args.tracker}.yaml")
            tracker_cfg_path = f"{args.tracker}.yaml"

        # Run tracking with the config FILE PATH
        results = model.track(
            source=str(video_path),
            tracker=tracker_cfg_path,  # Pass the FILE PATH, not dictionary!
            conf=args.conf,
            iou=args.iou,
            imgsz=args.imgsz,
            max_det=args.max_det,
            stream=True,
            device=0,
            save=False,
            verbose=False,
            persist=True,
            vid_stride=1,
        )

        print(f"Tracking with {args.tracker} on device: {model.device}")

        all_detections = []
        frame_idx = 0

        for r in results:
            boxes = r.boxes
            if boxes is None:
                frame_idx += 1
                continue

            ids = boxes.id
            if ids is None:
                frame_idx += 1
                continue

            xyxy = boxes.xyxy
            confs = boxes.conf
            clses = boxes.cls

            ids = ids.cpu().tolist()
            xyxy = xyxy.cpu().tolist()
            confs = confs.cpu().tolist()
            clses = clses.cpu().tolist()

            for tid, (x1, y1, x2, y2), score, c in zip(ids, xyxy, confs, clses):
                all_detections.append({
                    "frame_id": frame_idx,
                    "track_id": int(tid),
                    "bbox": [float(x1), float(y1), float(x2), float(y2)],
                    "score": float(score),
                    "class_id": int(c),
                })

            frame_idx += 1

        out_path.parent.mkdir(parents=True, exist_ok=True)
        with out_path.open("w") as f:
            json.dump(all_detections, f)

        print(f"Wrote {len(all_detections)} tracked detections to {out_path}")


    if __name__ == "__main__":
        main()
    """))

runner_script.chmod(0o755)
print_status("Created wrapper for Botsort and Bytetrack", "SUCCESS")


In [None]:

# Cell: Setup Eagle with Python 3.13

print_status("Setting up Eagle with Python 3.13...", "INFO")

eagle_dir = REPOS_DIR / "eagle"

# Install Python 3.13 (Eagle's required version)
print_status("Installing Python 3.13...", "INFO")
!apt-get update -qq
!apt-get install -y software-properties-common
!add-apt-repository -y ppa:deadsnakes/ppa
!apt-get update -qq
!apt-get install -y python3.13 python3.13-venv python3.13-dev python3.13-distutils

# Install pip for Python 3.13
!curl -sS https://bootstrap.pypa.io/get-pip.py | python3.13

# Install uv if not already installed
print_status("Installing uv...", "INFO")
!curl -LsSf https://astral.sh/uv/install.sh | sh

# Add uv to PATH
import os
os.environ['PATH'] = f"/root/.local/bin:{os.environ['PATH']}"

# Create Eagle environment with Python 3.13
os.chdir(eagle_dir)
print_status("Creating Eagle environment with Python 3.13...", "INFO")
!uv venv --python python3.13
!uv sync

# Download model weights
print_status("Downloading Eagle model weights...", "INFO")
models_dir = eagle_dir / "eagle" / "models"
if models_dir.exists():
    os.chdir(models_dir)
    !bash get_weights.sh
    os.chdir(eagle_dir)
    print_status("Eagle weights downloaded", "SUCCESS")
else:
    print_status("Eagle models directory not found", "ERROR")

# Create Eagle wrapper that uses Python 3.13

eagle_wrapper = eagle_dir / "run_eagle.py"
eagle_wrapper.write_text('''
#!/usr/bin/env python3
"""
Clean Eagle wrapper that produces a single output file
Outputs Eagle's native raw format directly
"""

import argparse
import json
import subprocess
import sys
import os
from pathlib import Path
import time


def get_eagle_output(eagle_output_dir):
    """
    Find and return Eagle's raw output data

    Args:
        eagle_output_dir: Path to Eagle's output directory

    Returns:
        Raw Eagle data dictionary
    """
    coords_dir = eagle_output_dir / "raw_coordinates"
    if not coords_dir.exists():
        coords_dir = eagle_output_dir

    raw_coords_file = coords_dir / "raw_coordinates.json"
    raw_data_file = coords_dir / "raw_data.json"
    processed_file = coords_dir / "processed_data.json"

    data = None
    source_file = None

    for file_path in [raw_coords_file, processed_file, raw_data_file]:
        if file_path.exists():
            with open(file_path, 'r') as f:
                data = json.load(f)
            source_file = file_path
            print(f"[Eagle] Using {file_path.name} as source", file=sys.stderr)
            break

    if data is None:
        json_files = list(coords_dir.glob("*.json"))
        if json_files:
            with open(json_files[0], 'r') as f:
                data = json.load(f)
            source_file = json_files[0]
            print(f"[Eagle] Using {json_files[0].name} as source", file=sys.stderr)

    if data is None:
        print(f"[Eagle] No output files found in {coords_dir}", file=sys.stderr)
        raise FileNotFoundError(f"No Eagle JSON outputs found in {coords_dir}")

    return data


def main():
    parser = argparse.ArgumentParser(description='Eagle wrapper for unified output')
    parser.add_argument('--video', required=True, help='Path to input video')
    parser.add_argument('--output', required=True, help='Path to output JSON file')
    parser.add_argument('--fps', default=24, type=int, help='FPS to process (default: 24)')
    args = parser.parse_args()

    video_path = Path(args.video)
    output_path = Path(args.output)

    if not video_path.exists():
        print(f"Error: Video not found: {video_path}", file=sys.stderr)
        sys.exit(1)

    output_path.parent.mkdir(parents=True, exist_ok=True)

    env = os.environ.copy()
    env["CUDA_VISIBLE_DEVICES"] = "0"

    cmd = [
        "uv", "run", "--python", "python3.13",
        "main.py",
        "--video_path", str(video_path),
        "--fps", str(args.fps),
    ]

    print(f"[Eagle] Processing {video_path.name} at {args.fps} FPS...", file=sys.stderr)
    start = time.time()

    eagle_dir = Path(__file__).parent
    result = subprocess.run(
        cmd,
        capture_output=True,
        text=True,
        cwd=eagle_dir,
        env=env,
    )

    elapsed = time.time() - start
    print(f"[Eagle] Processing took {elapsed:.1f}s", file=sys.stderr)

    if result.returncode != 0:
        print(f"[Eagle] Warning: Process returned {result.returncode}", file=sys.stderr)
        if result.stderr:
            print(f"[Eagle] Stderr: {result.stderr[:500]}", file=sys.stderr)

    video_stem = video_path.stem
    eagle_output_base = eagle_dir / "output"
    eagle_output_dir = eagle_output_base / video_stem

    if not eagle_output_dir.exists():
        for d in eagle_output_base.iterdir():
            if d.is_dir() and video_stem in d.name:
                eagle_output_dir = d
                break

    if not eagle_output_dir.exists():
        print(f"[Eagle] Error: Could not find output directory for {video_stem}", file=sys.stderr)
        with open(output_path, 'w') as f:
            json.dump([], f)
        sys.exit(1)

    try:
        print(f"[Eagle] Consolidating output from {eagle_output_dir}", file=sys.stderr)
        raw_data = get_eagle_output(eagle_output_dir)
    except Exception as e:
        print(f"[Eagle] Error while getting Eagle output: {e}", file=sys.stderr)
        with open(output_path, "w") as f:
            json.dump([], f)
        sys.exit(1)

    with open(output_path, 'w') as f:
        json.dump(raw_data, f, indent=2)

    if isinstance(raw_data, dict):
        print(f"[Eagle] Output: {len(raw_data)} frames (raw format)", file=sys.stderr)
    elif isinstance(raw_data, list):
        print(f"[Eagle] Output: {len(raw_data)} frames", file=sys.stderr)

    print(f"[Eagle] Saved to: {output_path}", file=sys.stderr)
    sys.exit(0)


if __name__ == "__main__":
    main()
''')

eagle_wrapper.chmod(0o755)
print_status("Eagle  wrapper created", "SUCCESS")

In [None]:
from google.colab import drive
from pathlib import Path

drive.mount("/content/drive")

# Base folder on Drive where all results will go
BASE_DIR = Path("/content/drive/MyDrive/tracklab_eval")  # change name if you like
OUTPUT_DIR = BASE_DIR / "results"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

print("Results will be saved under:", OUTPUT_DIR)


In [None]:
# Cell: Final System Evaluation

import time
import json
import subprocess
import os
import psutil
import threading
from pathlib import Path

# System configurations
SYSTEM_CONFIGS = {
    "eagle": {
        "path": REPOS_DIR / "eagle",
        "script": "run_eagle.py",
        "python": "python3.13",
    },
    "yolo11_botsort": {
        "path": REPOS_DIR / "ultra_trackers",
        "script": "run_ultra_yolo_tracker.py",
        "args": ["--weights", "yolo11m.pt", "--tracker", "botsort"],
    },
    "darkmyter": {
        "path": REPOS_DIR / "darkmyter",
        "script": "run_darkmyter.py",
    },
}

def get_gpu_memory():
    """Get current GPU memory usage in MB using nvidia-smi"""
    try:
        result = subprocess.run(
            ['nvidia-smi', '--query-gpu=memory.used', '--format=csv,nounits,noheader'],
            capture_output=True, text=True
        )
        return int(result.stdout.strip().split('\n')[0])
    except:
        return None

def monitor_memory(stop_event, memory_stats):
    """Background thread to sample memory usage"""
    memory_stats['peak_ram_mb'] = 0
    memory_stats['peak_gpu_mb'] = 0

    while not stop_event.is_set():
        ram_mb = psutil.virtual_memory().used / (1024 * 1024)
        memory_stats['peak_ram_mb'] = max(memory_stats['peak_ram_mb'], ram_mb)

        gpu_mb = get_gpu_memory()
        if gpu_mb:
            memory_stats['peak_gpu_mb'] = max(memory_stats['peak_gpu_mb'], gpu_mb)

        stop_event.wait(0.5)

# Ask user for processing mode
print("EVALUATION MODE SELECTION \n")
print("\nHow do you want to evaluate the videos?")
print("  1. Use clips (faster - 60s segments)")
print("  2. Use full videos (comprehensive but slower)")

mode_choice = input("\nEnter your choice (1 or 2): ").strip()
USE_CLIPS = mode_choice != '2'

if USE_CLIPS:
    print_status("Mode: CLIP-BASED EVALUATION", "INFO")
    ALL_VIDEOS = VIDEO_CLIPS
    eval_type = "clips"
else:
    print_status("Mode: FULL VIDEO EVALUATION", "INFO")
    ALL_VIDEOS = FULL_VIDEOS
    eval_type = "full"

position_to_number = {"start": 1, "middle": 2, "end": 3, "full": 1}

def run_system_on_video(system_name, system_config, video_name, segment_name, video_path):
    """Run a tracking system on a video or clip"""

    if USE_CLIPS:
        segment_number = position_to_number.get(segment_name, 1)
        output_dir = OUTPUT_DIR / video_name / "clips" / str(segment_number) / system_name
    else:
        output_dir = OUTPUT_DIR / video_name / "full" / system_name

    output_dir.mkdir(parents=True, exist_ok=True)

    if USE_CLIPS:
        print_status(f"Running {system_name} on {video_name}/clip_{segment_number}...", "INFO")
    else:
        print_status(f"Running {system_name} on {video_name} (full video)...", "INFO")

    output_file = output_dir / f"{system_name}_output.json"
    system_path = system_config.get("path", REPOS_DIR)

    # Build command
    if system_name == "eagle":
        cmd = [
            "uv", "run", "--python", system_config.get("python", "python3.13"),
            "run_eagle.py",
            "--video", str(video_path),
            "--output", str(output_file),
        ]
    else:
        cmd = [
            "python", system_config["script"],
            "--video", str(video_path),
            "--output", str(output_file),
        ] + [str(extra) for extra in system_config.get("args", [])]

    # Get baseline memory before starting
    baseline_ram = psutil.virtual_memory().used / (1024 * 1024)
    baseline_gpu = get_gpu_memory() or 0

    # Start memory monitoring
    memory_stats = {}
    stop_event = threading.Event()
    monitor_thread = threading.Thread(target=monitor_memory, args=(stop_event, memory_stats))
    monitor_thread.start()

    start_time = time.time()

    try:
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            cwd=str(system_path),
        )

        elapsed = time.time() - start_time

        # Stop monitoring
        stop_event.set()
        monitor_thread.join()

        # Calculate peak usage above baseline
        peak_ram_delta = max(0, memory_stats.get('peak_ram_mb', 0) - baseline_ram)
        peak_gpu_delta = max(0, memory_stats.get('peak_gpu_mb', 0) - baseline_gpu)

        if result.returncode == 0 and output_file.exists():
            try:
                with open(output_file) as f:
                    data = json.load(f)

                if isinstance(data, list):
                    num_detections = len(data)
                elif isinstance(data, dict):
                    num_detections = sum(
                        len(dets) if isinstance(dets, list) else 0
                        for dets in data.values()
                    )
                else:
                    num_detections = 0

                print_status(
                    f"{system_name}: SUCCESS - {num_detections} detections in {elapsed:.1f}s "
                    f"(RAM: +{peak_ram_delta:.0f}MB, GPU: +{peak_gpu_delta:.0f}MB)",
                    "SUCCESS",
                )
                return {
                    "success": True,
                    "time": elapsed,
                    "output": str(output_file),
                    "detections": num_detections,
                    "peak_ram_mb": peak_ram_delta,
                    "peak_gpu_mb": peak_gpu_delta,
                }

            except json.JSONDecodeError as e:
                print_status(f"{system_name}: Invalid JSON", "ERROR")
                return {
                    "success": False,
                    "time": elapsed,
                    "error": f"Invalid JSON: {e}",
                    "peak_ram_mb": peak_ram_delta,
                    "peak_gpu_mb": peak_gpu_delta,
                }
        else:
            error_msg = result.stderr[-500:] if result.stderr else "Unknown error"
            print_status(f"{system_name}: FAILED", "ERROR")
            print(f"Error: {error_msg}")
            return {
                "success": False,
                "time": elapsed,
                "error": error_msg,
                "peak_ram_mb": peak_ram_delta,
                "peak_gpu_mb": peak_gpu_delta,
            }

    except subprocess.TimeoutExpired:
        stop_event.set()
        monitor_thread.join()
        peak_ram_delta = max(0, memory_stats.get('peak_ram_mb', 0) - baseline_ram)
        peak_gpu_delta = max(0, memory_stats.get('peak_gpu_mb', 0) - baseline_gpu)
        print_status(f"{system_name}: TIMEOUT", "ERROR")
        return {
            "success": False,
            "time": 10000,
            "error": "Timeout",
            "peak_ram_mb": peak_ram_delta,
            "peak_gpu_mb": peak_gpu_delta,
        }

    except Exception as e:
        stop_event.set()
        monitor_thread.join()
        peak_ram_delta = max(0, memory_stats.get('peak_ram_mb', 0) - baseline_ram)
        peak_gpu_delta = max(0, memory_stats.get('peak_gpu_mb', 0) - baseline_gpu)
        print_status(f"{system_name}: EXCEPTION - {str(e)}", "ERROR")
        return {
            "success": False,
            "time": time.time() - start_time,
            "error": str(e),
            "peak_ram_mb": peak_ram_delta,
            "peak_gpu_mb": peak_gpu_delta,
        }

def save_progress(all_results, eval_type):
    """Save current progress to disk (in Drive, via OUTPUT_DIR)"""
    progress_file = OUTPUT_DIR / f"progress_{eval_type}.json"
    with open(progress_file, "w") as f:
        json.dump(all_results, f, indent=2)
    return progress_file

def load_progress(eval_type):
    """Load existing progress if available"""
    progress_file = OUTPUT_DIR / f"progress_{eval_type}.json"
    if progress_file.exists():
        try:
            with open(progress_file) as f:
                return json.load(f)
        except Exception:
            return {}
    return {}

print(f"STARTING {eval_type.upper()} EVALUATION\n")

all_results = load_progress(eval_type)
if all_results:
    print_status(f"Loaded existing progress with {len(all_results)} videos", "INFO")
    print("Videos already processed:")
    for video_name in all_results.keys():
        print(f"  - {video_name}")

    print("\nDo you want to:")
    print("  1. Continue from where you left off")
    print("  2. Start fresh (delete existing progress)")
    continue_choice = input("\nEnter your choice (1 or 2): ").strip()

    if continue_choice == '2':
        all_results = {}
        print_status("Starting fresh evaluation", "INFO")

for video_name, segments in ALL_VIDEOS.items():
    print(f"\nVIDEO: {video_name}")

    video_results = all_results.get(video_name, {})

    for segment_name, video_path in segments.items():
        if USE_CLIPS:
            segment_number = position_to_number.get(segment_name, 1)
            segment_key = f"clip_{segment_number}"
            print(f"\nProcessing clip {segment_number} ({segment_name})...")
        else:
            segment_key = "full"
            print(f"\nProcessing full video...")

        segment_results = video_results.get(segment_key, {})
        video_results[segment_key] = segment_results

        for system_name, system_config in SYSTEM_CONFIGS.items():
            if system_name in segment_results and segment_results[system_name].get("success"):
                print_status(f"{system_name}: Already completed successfully", "SKIP")
                continue
            elif system_name in segment_results:
                print_status(f"{system_name}: Retrying previous failure", "RETRY")

            result = run_system_on_video(
                system_name, system_config, video_name, segment_name, video_path
            )
            segment_results[system_name] = result

            all_results[video_name] = video_results
            progress_file = save_progress(all_results, eval_type)
            print_status(f"Progress saved to {progress_file.name}", "SAVE")

        successful = sum(1 for r in segment_results.values() if r.get("success", False))
        total = len(segment_results)

        if USE_CLIPS:
            print(f"\nClip summary: {successful}/{total} systems succeeded")
        else:
            print(f"\nVideo summary: {successful}/{total} systems succeeded")

    summary_file = OUTPUT_DIR / video_name / f"summary_{eval_type}.json"
    summary_file.parent.mkdir(parents=True, exist_ok=True)
    with open(summary_file, "w") as f:
        json.dump(video_results, f, indent=2)
    print_status(f"Video summary saved to {summary_file.name}", "SAVE")

# Save overall summary
overall_summary = OUTPUT_DIR / f"overall_summary_{eval_type}.json"
with open(overall_summary, "w") as f:
    json.dump(all_results, f, indent=2)

print("\n" + "=" * 60)
print(f"{eval_type.upper()} EVALUATION COMPLETE")
print("=" * 60)

system_stats = {
    sys: {
        "success": 0,
        "total": 0,
        "avg_time": [],
        "avg_detections": [],
        "avg_ram": [],
        "avg_gpu": [],
    }
    for sys in SYSTEM_CONFIGS.keys()
}

for video_results in all_results.values():
    for segment_results in video_results.values():
        for system_name, result in segment_results.items():
            if system_name in system_stats:
                system_stats[system_name]["total"] += 1
                if result.get("success", False):
                    system_stats[system_name]["success"] += 1
                    if "time" in result:
                        system_stats[system_name]["avg_time"].append(result["time"])
                    if "detections" in result:
                        system_stats[system_name]["avg_detections"].append(result["detections"])
                    if "peak_ram_mb" in result:
                        system_stats[system_name]["avg_ram"].append(result["peak_ram_mb"])
                    if "peak_gpu_mb" in result:
                        system_stats[system_name]["avg_gpu"].append(result["peak_gpu_mb"])

print("\nSystem Performance Summary:")
print("-" * 50)
for system_name, stats in system_stats.items():
    if stats["total"] > 0:
        success_rate = (stats["success"] / stats["total"]) * 100
        print(f"\n{system_name}:")
        print(f"  Success Rate: {stats['success']}/{stats['total']} ({success_rate:.1f}%)")
        if stats["avg_time"]:
            avg_time = sum(stats["avg_time"]) / len(stats["avg_time"])
            print(f"  Avg Time: {avg_time:.1f}s")
        if stats["avg_detections"]:
            avg_det = sum(stats["avg_detections"]) / len(stats["avg_detections"])
            print(f"  Avg Detections: {avg_det:.0f}")
        if stats["avg_ram"]:
            avg_ram = sum(stats["avg_ram"]) / len(stats["avg_ram"])
            max_ram = max(stats["avg_ram"])
            print(f"  Avg RAM: +{avg_ram:.0f}MB (peak: +{max_ram:.0f}MB)")
        if stats["avg_gpu"]:
            avg_gpu = sum(stats["avg_gpu"]) / len(stats["avg_gpu"])
            max_gpu = max(stats["avg_gpu"])
            print(f"  Avg GPU: +{avg_gpu:.0f}MB (peak: +{max_gpu:.0f}MB)")

print(f"\nResults Directory: {OUTPUT_DIR}")
print(f"Overall Summary: {overall_summary}")
print(f"Progress File: {OUTPUT_DIR / f'progress_{eval_type}.json'}")

total_expected = len(ALL_VIDEOS) * len(next(iter(ALL_VIDEOS.values())))
total_processed = sum(len(video_results) for video_results in all_results.values())

if total_processed == total_expected:
    print("\n")
    print("ALL VIDEOS PROCESSED SUCCESSFULLY!")
    print("Progress file kept for reference.")
else:
    print("\n")
    print(f"PARTIAL COMPLETION: {total_processed}/{total_expected} segments processed")
    print("Run the script again to continue from where you left off.")

In [None]:
#Cell 10: Download pre-run results from google.

import os

if not os.path.exists("/content/results"):
    !gdown 1-TR0bycui1zpL5TRZLzhdxcuJIv9qLUO -O results.zip
    !unzip results.zip -d /content
    print("Using cached results")


In [None]:
# Cell 9: Download results

from google.colab import files
import shutil

print_status("Creating archive...", "INFO")

archive_name = "tracking_results"
archive_path = BASE_DIR / archive_name

shutil.make_archive(str(archive_path), 'zip', OUTPUT_DIR)

print_status("Downloading...", "SUCCESS")
files.download(f"{archive_path}.zip")

print_status("Complete!", "SUCCESS")