# Football Player Tracking System

In [6]:
# Environment setup
!pip install ultralytics torch torchvision opencv-python scipy lap numpy



In [7]:
# Import all necessary libraries
import os
import sys
import cv2
import torch
import torch.nn.functional as F
from ultralytics import YOLO
from ultralytics.nn.tasks import DetectionModel
import numpy as np
import torchvision.transforms as T
import torchvision.models as models
from scipy.optimize import linear_sum_assignment
import pickle
from collections import defaultdict

# Check GPU availability
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cuda:0


## Common Functions

In [8]:
def load_model(path):
    """Load an Ultralytics YOLO model from ``path`` and move it to ``device``.

    Args:
        path: Filesystem path of the weights file.

    Returns:
        The loaded ``YOLO`` model, or ``None`` when the file is missing, empty,
        or cannot be loaded (the reason is printed in every case).
    """
    # Cheap sanity checks so the most common mistakes get a clear message.
    if not os.path.isfile(path):
        print(f"Error: model file not found at {path}")
        return None
    if os.path.getsize(path) == 0:
        print(f"Error: model file at {path} is empty")
        return None

    try:
        # Allowlist DetectionModel for torch's restricted unpickler. The helper
        # is not available on every torch release, so skip it when absent
        # instead of crashing before the model is even opened.
        if hasattr(torch.serialization, "add_safe_globals"):
            torch.serialization.add_safe_globals([DetectionModel])

        # YOLO() deserializes the checkpoint itself. A separate
        # torch.load(..., weights_only=False) "probe" would read the whole file
        # a second time and perform an unrestricted unpickle, which makes the
        # allowlist above meaningless; a corrupt file still surfaces here.
        model = YOLO(path)
        model.to(device)
        print("Model classes:", model.names)
        return model
    except Exception as e:
        print(f"Error loading model: {e}")
        return None

def draw_player_annotations(frame, player_box, ball_centers, track_id=None, proximity_thresh=30):
    """Mark a player with a ground ellipse and, optionally, an ID label.

    The ellipse is centred on the bottom-middle of ``player_box`` and drawn
    directly onto ``frame``. It is green when any point in ``ball_centers``
    lies within ``proximity_thresh`` pixels of that anchor, cyan otherwise.
    When ``track_id`` is not ``None`` an ``ID:<track_id>`` label is written
    above the top edge of the box.
    """
    left, top, right, bottom = player_box
    anchor_x = (left + right) // 2
    anchor_y = bottom

    # Colours are BGR: green when the ball is near, cyan by default.
    ball_is_near = bool(ball_centers) and any(
        np.hypot(anchor_x - ball_x, anchor_y - ball_y) <= proximity_thresh
        for ball_x, ball_y in ball_centers
    )
    ring_color = (0, 255, 0) if ball_is_near else (255, 255, 0)

    # Flat, slightly tilted ellipse whose width scales with the box width.
    half_axes = ((right - left) // 3, 4)
    cv2.ellipse(frame, (anchor_x, anchor_y), half_axes, -3, 0, 360, ring_color, 2)

    if track_id is None:
        return

    # The label shares the horizontal centre of the box with the ellipse.
    label_origin = (anchor_x, top - 10)
    cv2.putText(frame, f"ID:{track_id}", label_origin,
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 165, 255), 2)

def draw_ball_tracker(frame, ball_center):
    """Fill a green downward-pointing triangle whose tip sits on ``ball_center``.

    The triangle is 16 px wide and 16 px tall and is drawn in place on ``frame``.
    """
    tip_x, tip_y = ball_center
    half_width, rise = 8, 16
    base_y = tip_y - rise

    # Vertex order: left base corner, right base corner, tip at the ball centre.
    triangle = np.array(
        [
            [tip_x - half_width, base_y],
            [tip_x + half_width, base_y],
            [tip_x, tip_y],
        ],
        np.int32,
    ).reshape((-1, 1, 2))

    cv2.fillPoly(frame, [triangle], (0, 255, 0))

## 1. Tactical Camera Processing

In [9]:
def track_tactical_camera(model_path='best.pt', input_path='tacticam.mp4', output_path='tacticam_tracked_bytetrack.mp4'):
    """Track players and the ball in tactical-camera footage and write an annotated video.

    Args:
        model_path: Path of the YOLO weights passed to ``load_model``.
        input_path: Video to analyse.
        output_path: Destination of the annotated ``mp4v`` video.

    Returns:
        None. Problems with the model, the input video or the output writer
        are reported with ``print`` and abort the run early.
    """
    # Load model
    model = load_model(model_path)
    if model is None:
        return

    # The capture is only needed to read the stream properties (the frames
    # themselves come from model.track), so release it as soon as possible.
    cap = cv2.VideoCapture(input_path)
    try:
        if not cap.isOpened():
            print(f"Error: could not open video {input_path}")
            return
        fps = cap.get(cv2.CAP_PROP_FPS)
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    finally:
        cap.release()

    # Setup video writer
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(output_path, fourcc, fps, (w, h))
    if not writer.isOpened():
        print(f"Error: could not open video writer for {output_path}")
        writer.release()
        return

    try:
        # Track with ByteTrack
        results = model.track(
            source=input_path,
            stream=True,
            tracker='bytetrack.yaml',
            persist=True,
            hide_labels=False,
            hide_conf=False,
            device=device,
        )

        frame_idx = 0
        for result in results:
            frame = result.orig_img
            ball_centers = []
            player_boxes = []

            # Split detections into ball markers and player boxes. Balls are
            # drawn immediately; players wait until every ball centre in the
            # frame is known, because their colour depends on ball proximity.
            for box in result.boxes:
                cls_id = int(box.cls[0].cpu().numpy())
                class_name = model.names.get(cls_id, str(cls_id)).lower()
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int)
                if 'ball' in class_name:
                    cx = (x1 + x2) // 2
                    cy = (y1 + y2) // 2
                    ball_centers.append((cx, cy))
                    draw_ball_tracker(frame, (cx, cy))
                else:
                    # box.id is None when the tracker has not assigned an ID to
                    # this detection; draw the ellipse without a label then.
                    track_id = int(box.id[0].cpu().numpy()) if box.id is not None else None
                    player_boxes.append(((x1, y1, x2, y2), track_id))

            # Debug: report missing ball detections every 30 frames
            if not ball_centers and frame_idx % 30 == 0:
                detected_classes = [model.names[int(b.cls[0])] for b in result.boxes]
                print(f"Frame {frame_idx}: No ball detected (classes: {detected_classes})")

            for player_box, track_id in player_boxes:
                draw_player_annotations(frame, player_box, ball_centers, track_id)

            writer.write(frame)
            frame_idx += 1
    finally:
        # Finalize the output file even if detection or drawing raised.
        writer.release()

    print(f"Tactical camera tracking complete! Output saved to {output_path}")

# Run tactical camera tracking
track_tactical_camera()

Model classes: {0: 'ball', 1: 'goalkeeper', 2: 'player', 3: 'referee'}

video 1/1 (frame 1/201) /content/tacticam.mp4: 384x640 22 players, 2 referees, 68.6ms
Frame 0: No ball detected (classes: ['player', 'player', 'player', 'player', 'player', 'player', 'player', 'player', 'player', 'player', 'player', 'player', 'player', 'player', 'player', 'player', 'player', 'player', 'player', 'player', 'player', 'referee', 'player', 'referee'])
video 1/1 (frame 2/201) /content/tacticam.mp4: 384x640 22 players, 2 referees, 40.5ms
video 1/1 (frame 3/201) /content/tacticam.mp4: 384x640 22 players, 1 referee, 40.1ms
video 1/1 (frame 4/201) /content/tacticam.mp4: 384x640 22 players, 1 referee, 39.5ms
video 1/1 (frame 5/201) /content/tacticam.mp4: 384x640 22 players, 1 referee, 39.4ms
video 1/1 (frame 6/201) /content/tacticam.mp4: 384x640 22 players, 1 referee, 32.2ms
video 1/1 (frame 7/201) /content/tacticam.mp4: 384x640 22 players, 1 referee, 33.3ms
video 1/1 (frame 8/201) /content/tacticam.mp4: 384x

## 2. Broadcast Camera Processing

In [10]:
def init_reid_model(device):
    """Build the appearance-embedding network and its preprocessing pipeline.

    Args:
        device: Torch device the network is moved to.

    Returns:
        ``(reid, transform)``: an ImageNet-pretrained ResNet-50 in eval mode
        whose classification head is replaced by an identity layer, and the
        transform that turns an image array into a normalized 224x224 tensor.
    """
    # Newer torchvision replaced the boolean `pretrained` flag with explicit
    # weight enums; IMAGENET1K_V1 is the checkpoint `pretrained=True` selected.
    # Fall back to the legacy flag on versions without the enum.
    if hasattr(models, "ResNet50_Weights"):
        reid = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
    else:
        reid = models.resnet50(pretrained=True)
    reid.fc = torch.nn.Identity()  # Remove classification head
    reid.to(device).eval()

    # Preprocessing transform (ImageNet mean/std normalization)
    transform = T.Compose([
        T.ToPILImage(),
        T.Resize((224, 224)),
        T.ToTensor(),
        T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    return reid, transform

def extract_embedding(crop, reid, transform, device):
    """Return the L2-normalized ReID feature vector for one image crop.

    A 2048-element zero vector on ``device`` is returned instead when the crop
    is empty or when preprocessing / the forward pass raises.
    """
    def _blank():
        # Placeholder embedding used for unusable crops.
        return torch.zeros(2048).to(device)

    if crop.size == 0:
        return _blank()

    # NOTE(review): callers pass crops cut from OpenCV frames, which are
    # presumably BGR, while the ImageNet statistics assume RGB - confirm.
    try:
        batch = transform(crop).unsqueeze(0).to(device)
        with torch.no_grad():
            features = reid(batch)
        unit_features = F.normalize(features, p=2, dim=1)
        return unit_features.squeeze(0)
    except Exception:
        return _blank()

def track_and_embed(video_path, model, reid, transform, device):
    """Collect ReID embeddings for every tracked non-ball detection in a video.

    Args:
        video_path: Video handed to ``model.track``.
        model: Detector exposing ``track(...)`` and a ``names`` class mapping.
        reid: Embedding network forwarded to ``extract_embedding``.
        transform: Preprocessing forwarded to ``extract_embedding``.
        device: Torch device used for tracking and embedding.

    Returns:
        ``defaultdict(list)`` mapping each track ID to the list of embeddings
        gathered for it, one per frame in which the track was seen.
    """
    tact_embs = defaultdict(list)

    results = model.track(
        source=video_path,
        stream=True,
        tracker='bytetrack.yaml',
        persist=True,
        hide_labels=True,
        hide_conf=True,
        device=device
    )

    for res in results:
        frame = res.orig_img
        for box in res.boxes:
            cls_id = int(box.cls[0].cpu())
            name = model.names[cls_id].lower()
            if 'ball' in name:
                continue

            # box.id is None when the tracker has not assigned an ID to this
            # detection; without an ID there is no track to file it under.
            if box.id is None:
                continue
            track_id = int(box.id[0].cpu())

            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int)

            # Extract player crop; skip degenerate (zero-area) boxes
            crop = frame[y1:y2, x1:x2]
            if crop.shape[0] == 0 or crop.shape[1] == 0:
                continue

            # Get embedding
            emb = extract_embedding(crop, reid, transform, device)
            tact_embs[track_id].append(emb)

    return tact_embs

def compute_mean_embeddings(tact_embs):
    """Collapse each player's embedding list into one unit-length mean vector.

    Players whose list is empty are left out of the returned dict.
    """
    return {
        player_id: F.normalize(torch.stack(samples).mean(0), p=2, dim=0)
        for player_id, samples in tact_embs.items()
        if samples
    }

def match_embeddings_hungarian(curr_embs, mean_embs, min_similarity=0.3):
    """Assign current detections to known players with the Hungarian algorithm.

    Args:
        curr_embs: Sequence of 1-D embedding tensors, one per detection.
        mean_embs: Dict mapping player ID to its reference embedding tensor.
        min_similarity: Cosine similarity an assigned pair must exceed to be
            accepted as a match.

    Returns:
        Dict mapping detection index to the matched player ID, or to ``None``
        when the assigned pair is not similar enough. Detections left without
        an assignment (more detections than players) are absent from the dict.
        An empty dict is returned when either input is empty.
    """
    if not mean_embs or not curr_embs:
        return {}

    pids = list(mean_embs.keys())

    # One matrix product yields every pairwise cosine similarity, instead of a
    # Python loop that pulls a scalar off the device for each pair.
    queries = F.normalize(torch.stack(list(curr_embs)), p=2, dim=1)
    gallery = F.normalize(torch.stack([mean_embs[pid] for pid in pids]), p=2, dim=1)
    similarity = (queries @ gallery.T).detach().cpu().numpy().astype(np.float32)

    # linear_sum_assignment minimizes cost, so negate to maximize similarity.
    row_idx, col_idx = linear_sum_assignment(-similarity)
    return {
        int(i): (pids[j] if similarity[i, j] > min_similarity else None)
        for i, j in zip(row_idx, col_idx)
    }

def temporal_smoothing(assignments, max_gap=10):
    """Fill short gaps in per-frame ID lists with the most recent known ID.

    ``assignments`` maps frame index to a list of player IDs (or ``None``).
    Frames are visited in ascending order. For every list position the last
    non-``None`` ID is remembered together with its frame; a ``None`` at that
    position is replaced by the remembered ID when it was observed at most
    ``max_gap`` frames earlier. Filled values do not refresh the memory.
    Returns a new dict with the same keys and list lengths.
    """
    last_seen = {}   # list position -> (frame index, player id) of last real observation
    result = {}

    for frame_no in sorted(assignments):
        filled = []
        for slot, player in enumerate(assignments[frame_no]):
            if player is None:
                seen_frame, seen_player = last_seen.get(slot, (None, None))
                recent = seen_frame is not None and frame_no - seen_frame <= max_gap
                filled.append(seen_player if recent else None)
                continue

            last_seen[slot] = (frame_no, player)
            filled.append(player)

        result[frame_no] = filled

    return result

def save_embeddings(tact_embs, path='tactic_embeddings.pkl'):
    """Pickle the embedding mapping to ``path`` and print a confirmation."""
    with open(path, 'wb') as sink:
        # Copy into a plain dict so the pickle does not carry the mapping's own type.
        snapshot = dict(tact_embs)
        pickle.dump(snapshot, sink)
    print(f"Saved embeddings to {path}")

def load_embeddings(path='tactic_embeddings.pkl'):
    """Return the object pickled at ``path``, or ``None`` if nothing exists there."""
    if not os.path.exists(path):
        return None

    # Caution: unpickling can execute code stored in the file, so only point
    # this at files you created yourself.
    with open(path, 'rb') as source:
        return pickle.load(source)

In [11]:
def process_broadcast_tracking(model_path='best.pt',
                              tactical_video='tacticam.mp4',
                              broadcast_video='broadcast.mp4',
                              output_path='broadcast_tracked.mp4',
                              embedding_file='tactic_embeddings.pkl'):
    """Label players in the broadcast video with IDs learned from the tactical video.

    Args:
        model_path: Path of the YOLO weights passed to ``load_model``.
        tactical_video: Video used to build the reference embeddings when no
            cache file exists.
        broadcast_video: Video to annotate.
        output_path: Destination of the annotated ``mp4v`` video.
        embedding_file: Pickle cache of the tactical embeddings. An existing
            file is reused as-is, whichever video or model produced it; delete
            it (or pass another path) to force re-extraction.

    Returns:
        Dict mapping frame index to the temporally smoothed list of player IDs
        (``None`` for unmatched detections), or ``None`` when the model or the
        broadcast video could not be opened. The smoothing is computed after
        the video has been written, so the rendered labels are the raw
        per-frame matches.
    """
    # Load model and ReID
    model = load_model(model_path)
    if model is None:
        return

    reid, transform = init_reid_model(device)

    # Step 1: Extract embeddings from tactical camera (or load if cached)
    if os.path.exists(embedding_file):
        print("Loading existing embeddings...")
        tact_embs = load_embeddings(embedding_file)
    else:
        print("Extracting embeddings from tactical camera...")
        tact_embs = track_and_embed(tactical_video, model, reid, transform, device)
        save_embeddings(tact_embs, embedding_file)

    # Step 2: Compute mean embeddings
    mean_embs = compute_mean_embeddings(tact_embs)
    print(f"Computed mean embeddings for {len(mean_embs)} players")

    # Step 3: Process broadcast video
    print("Processing broadcast video...")

    # The capture is only needed for the stream properties (frames come from
    # model.track), so check it and release it right away.
    cap = cv2.VideoCapture(broadcast_video)
    try:
        if not cap.isOpened():
            print(f"Error: could not open video {broadcast_video}")
            return
        fps = cap.get(cv2.CAP_PROP_FPS)
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    finally:
        cap.release()

    writer = cv2.VideoWriter(output_path, cv2.VideoWriter_fourcc(*'mp4v'), fps, (w, h))

    assignments = {}
    try:
        results = model.track(
            source=broadcast_video,
            stream=True,
            tracker='bytetrack.yaml',
            persist=True,
            hide_labels=True,
            hide_conf=True,
            device=device
        )

        frame_idx = 0
        for res in results:
            frame = res.orig_img
            current_embs = []
            bboxes = []

            # Extract embeddings for every non-ball detection in this frame
            for box in res.boxes:
                cls_id = int(box.cls[0].cpu())
                if 'ball' in model.names[cls_id].lower():
                    continue

                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy().astype(int)
                crop = frame[y1:y2, x1:x2]

                if crop.shape[0] == 0 or crop.shape[1] == 0:
                    continue

                emb = extract_embedding(crop, reid, transform, device)
                current_embs.append(emb)
                bboxes.append((x1, y1, x2, y2))

            # Match embeddings to known players; unassigned detections get None
            matches = match_embeddings_hungarian(current_embs, mean_embs)
            assignments[frame_idx] = [matches.get(i) for i in range(len(current_embs))]

            # Draw annotations (bboxes and assignments share the same order)
            for (x1, y1, x2, y2), track_id in zip(bboxes, assignments[frame_idx]):
                draw_player_annotations(frame, (x1, y1, x2, y2), [], track_id)

            writer.write(frame)
            frame_idx += 1

            if frame_idx % 100 == 0:
                print(f"Processed {frame_idx} frames")
    finally:
        # Finalize the output file even if detection or matching raised.
        writer.release()

    # Step 4: Apply temporal smoothing
    print("Applying temporal smoothing...")
    smoothed = temporal_smoothing(assignments)

    print(f"Broadcast tracking complete! Output saved to {output_path}")
    return smoothed

# Run broadcast tracking
smoothed_results = process_broadcast_tracking()

Model classes: {0: 'ball', 1: 'goalkeeper', 2: 'player', 3: 'referee'}


Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 165MB/s]


Extracting embeddings from tactical camera...

video 1/1 (frame 1/201) /content/tacticam.mp4: 384x640 22 players, 2 referees, 57.6ms
video 1/1 (frame 2/201) /content/tacticam.mp4: 384x640 22 players, 2 referees, 42.9ms
video 1/1 (frame 3/201) /content/tacticam.mp4: 384x640 22 players, 1 referee, 32.4ms
video 1/1 (frame 4/201) /content/tacticam.mp4: 384x640 22 players, 1 referee, 30.8ms
video 1/1 (frame 5/201) /content/tacticam.mp4: 384x640 22 players, 1 referee, 32.6ms
video 1/1 (frame 6/201) /content/tacticam.mp4: 384x640 22 players, 1 referee, 32.0ms
video 1/1 (frame 7/201) /content/tacticam.mp4: 384x640 22 players, 1 referee, 31.8ms
video 1/1 (frame 8/201) /content/tacticam.mp4: 384x640 21 players, 1 referee, 31.5ms
video 1/1 (frame 9/201) /content/tacticam.mp4: 384x640 21 players, 1 referee, 31.6ms
video 1/1 (frame 10/201) /content/tacticam.mp4: 384x640 21 players, 2 referees, 32.3ms
video 1/1 (frame 11/201) /content/tacticam.mp4: 384x640 20 players, 3 referees, 32.7ms
video 1/1 (f

## Analyze Results

In [12]:
# View tracking statistics
def analyze_tracking_results(smoothed_results):
    """Print summary statistics for per-frame player-ID assignments.

    ``smoothed_results`` maps frame index to a list of player IDs in which
    ``None`` marks an unmatched detection. Nothing is printed when it is
    empty or ``None``.
    """
    if not smoothed_results:
        return

    per_frame = list(smoothed_results.values())
    n_frames = len(smoothed_results)

    # Every distinct ID that appears at least once, ignoring unmatched slots.
    seen_ids = {pid for ids in per_frame for pid in ids if pid is not None}

    print(f"Total frames processed: {n_frames}")
    print(f"Unique player IDs detected: {len(seen_ids)}")
    print(f"Player IDs: {sorted(seen_ids)}")

    # Detection rate: matched (non-None) entries averaged over all frames.
    matched_total = sum(1 for ids in per_frame for pid in ids if pid is not None)
    mean_matched = matched_total / n_frames
    print(f"Average detections per frame: {mean_matched:.2f}")

# Analyze results if you ran broadcast tracking
if 'smoothed_results' in locals():
    analyze_tracking_results(smoothed_results)

Total frames processed: 132
Unique player IDs detected: 36
Player IDs: [1, 2, 3, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 49, 54, 55, 66, 73, 76, 77, 87, 94, 101, 105, 106, 127, 144]
Average detections per frame: 11.30
