# Building a NN to Identify Individual Players #

## Motivation ##

Player performance analysis requires a strong tracking mechanism. Position-based trackers such as ByteTrack can confuse identities when multiple detections overlap. Pre-trained identification models like SigLIP also perform very poorly, in part due to the very low resolution of the detection crops (which can be as small as 10 x 30 pixels).

We will attempt to train a model specific to our purposes, leveraging tracklet-based self-supervision to create a triplet ([A]nchor, [P]ositive and [N]egative) dataset to be fed into a Siamese NN.

## Common Elements ##

In [10]:
import sys

# Set this to the absolute path of your project root
project_root = "/Users/fernandomousinho/Documents/Learning_to_Code/LaxAI"
if project_root not in sys.path:
    sys.path.insert(0, project_root)

from typing import Optional, List, Dict, Tuple
import torch
import supervision as sv
from tqdm import tqdm
from collections import deque, defaultdict
import numpy as np
import cv2
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from PIL import Image
import umap
import json

from modules.detection import DetectionModel
from modules.player import Player
from modules.team_identification import TeamIdentifier
from tools.store_driver import Store
from modules.custom_tracker import AffineAwareByteTrack
from modules.Siglip_reid import SiglipReID

In [3]:
# Source video to analyse. Machine-specific absolute path: adjust it for
# your own environment before running the notebook.
input_video = "/Users/fernandomousinho/Library/CloudStorage/GoogleDrive-fmousinho76@gmail.com/My Drive/Colab_Notebooks/FCA_Upstate_NY_003.mp4"
# Torch device handed to the detection model.
device = torch.device("cpu")
# Handed to the detection model as `store`.
# NOTE(review): presumably the storage handle used to fetch model weights —
# confirm against tools.store_driver.
store = Store()
# Upper bound on the number of frames the detection cell processes; a falsy
# value (0 / None) makes it fall back to the full length of the video.
debug_max_frames = 500

## Curate Training Data ##

In [6]:
# JSON file the detection/tracking cell writes through sv.JSONSink and that
# json_to_detections() later reads back (relative to the working directory).
RESULT_JSON_FILE_PATH = "detections.json"

In [None]:
# Run detection + tracking over the video and persist every resulting
# detection row to RESULT_JSON_FILE_PATH through supervision's JSONSink.
video_info = sv.VideoInfo.from_video_path(video_path=input_video)

# Number of frames to process: the debug cap when one is set (but never more
# than the video contains, so the progress-bar total stays accurate),
# otherwise the whole video.
if debug_max_frames:
    frame_target = min(debug_max_frames, video_info.total_frames)
else:
    frame_target = video_info.total_frames

generator_params = {
    "source_path": input_video,
    "end": frame_target,
}
frame_generator = sv.get_video_frames_generator(stride=1, **generator_params)

model = DetectionModel(store=store, device=device)
tracker = AffineAwareByteTrack(id_type='external', maintain_separate_track_obj=False)
json_sink = sv.JSONSink(RESULT_JSON_FILE_PATH)

# Frame seen on the previous iteration; None until the first frame is read.
previous_frame: Optional[np.ndarray] = None

with json_sink as sink:
    progress = tqdm(frame_generator, desc="Processing frames", total=frame_target)
    for frame_id, frame in enumerate(progress):
        all_detections = model.generate_detections(frame)
        all_detections = all_detections.with_nms(threshold=0.4, class_agnostic=False)

        # The tracker is given the transform between consecutive frames; the
        # very first frame has no predecessor, so it gets the identity.
        if previous_frame is None:
            affine_matrix = tracker.get_identity_affine_matrix()
        else:
            affine_matrix = tracker.calculate_affine_transform(previous_frame, frame)
        # NOTE(review): the copy is presumably defensive, so the stored frame
        # cannot be altered by later in-place processing — confirm it is needed.
        previous_frame = frame.copy()

        all_detections = tracker.update_with_transform(
            detections=all_detections,
            frame=frame,
            affine_matrix=affine_matrix
        )
        # frame_id is stored with every row so detections can be regrouped
        # per frame when the JSON file is read back.
        sink.append(all_detections, custom_data={"frame_id": frame_id})


Loading pretrain weights


Processing frames: 100%|██████████| 500/500 [01:28<00:00,  5.67it/s]


In [11]:
def _column_or_none(values: list, dtype) -> Optional[np.ndarray]:
    """Return *values* as an array of *dtype*, or None if every entry is ""."""
    if all(val == "" for val in values):
        return None
    return np.array(values, dtype=dtype)


def json_to_detections(json_file: str) -> List[sv.Detections]:
    """Rebuild one ``sv.Detections`` object per frame from a JSON file.

    The file must hold a JSON list of row dicts. Every row needs the keys
    ``x_min``, ``y_min``, ``x_max``, ``y_max``, ``class_id``, ``confidence``,
    ``tracker_id`` and ``frame_id``. Any key other than the box coordinates,
    ``class_id``, ``confidence`` and ``tracker_id`` (so ``frame_id`` included)
    is collected, one value per row, into ``Detections.data``.

    Args:
        json_file: Path of the JSON file to read.

    Returns:
        One ``sv.Detections`` per distinct ``frame_id``, ordered by the first
        appearance of that frame id in the file.

    Raises:
        KeyError: If a row lacks one of the required keys.
        ValueError: If a ``class_id`` / ``confidence`` / ``tracker_id`` column
            mixes "" with real values for the same frame.
    """
    box_keys = ("x_min", "y_min", "x_max", "y_max")
    reserved_keys = {*box_keys, "class_id", "confidence", "tracker_id"}

    with open(json_file, "r") as f:
        data = json.load(f)

    # Group rows by frame; dict insertion order keeps the file's frame order.
    rows_by_frame_number = defaultdict(list)
    for row in data:
        rows_by_frame_number[int(row["frame_id"])].append(row)

    detections_list = []
    for rows in rows_by_frame_number.values():
        custom_data = defaultdict(list)
        for row in rows:
            for key, value in row.items():
                if key not in reserved_keys:
                    custom_data[key].append(value)

        detections_list.append(
            sv.Detections(
                xyxy=np.array(
                    [[row[key] for key in box_keys] for row in rows],
                    dtype=np.float32,
                ),
                # A column that is "" in every row of the frame is treated as
                # absent. It must be passed as None itself: np.array(None,
                # dtype=int) raises instead of producing an empty field.
                class_id=_column_or_none([row["class_id"] for row in rows], int),
                confidence=_column_or_none(
                    [row["confidence"] for row in rows], np.float32
                ),
                tracker_id=_column_or_none([row["tracker_id"] for row in rows], int),
                data=dict(custom_data),
            )
        )

    return detections_list

In [12]:
# Reload the per-frame detections from the JSON file (one sv.Detections per
# frame id found in it).
all_detections = json_to_detections(RESULT_JSON_FILE_PATH)

In [13]:
# Bare expression: shows the reloaded detections as the cell output.
all_detections

[Detections(xyxy=array([[1411.8977  ,  416.3982  , 1440.1497  ,  485.7675  ],
        [ 173.87529 ,  435.13794 ,  199.1615  ,  499.52884 ],
        [ 932.08575 ,  375.50076 ,  950.6435  ,  423.36725 ],
        [1321.875   ,  389.51437 , 1346.2482  ,  445.90344 ],
        [ 109.979256,  394.32056 ,  136.8536  ,  447.85153 ],
        [1728.9248  ,  396.95142 , 1752.5337  ,  445.99194 ],
        [ 793.1556  ,  378.97797 ,  813.1447  ,  428.47665 ],
        [1500.3121  ,  367.59988 , 1523.2166  ,  405.5103  ],
        [ 518.413   ,  388.37735 ,  542.0374  ,  440.76462 ],
        [1479.376   ,  386.11887 , 1500.6593  ,  433.79218 ],
        [1703.1719  ,  377.07166 , 1729.9406  ,  418.62247 ],
        [ 894.08936 ,  362.389   ,  913.3689  ,  400.4388  ],
        [1165.175   ,  349.4763  , 1181.7063  ,  384.49448 ],
        [1611.0602  ,  370.54886 , 1628.468   ,  412.5948  ],
        [ 500.9124  ,  385.6018  ,  525.374   ,  439.30518 ]],
       dtype=float32), mask=None, confidence=array([0

In [None]:
# Split the per-track crops under data/<track_id>/ into train/<track_id>/
# and val/<track_id>/ with an 80/20 ratio per track. Files are copied, so
# the source directory is left untouched.
import os
import shutil
import random

random.seed(42)  # For reproducibility

src_data_dir = "data"
train_dir = "train"
val_dir = "val"

# Create train and val directories
os.makedirs(train_dir, exist_ok=True)
os.makedirs(val_dir, exist_ok=True)

# os.listdir returns entries in arbitrary order, so both listings are sorted:
# without a fixed starting order the seeded shuffle would not be reproducible.
for track_id in sorted(os.listdir(src_data_dir)):
    track_path = os.path.join(src_data_dir, track_id)
    if not os.path.isdir(track_path):
        continue

    # List all crop files for this track
    crop_files = sorted(f for f in os.listdir(track_path) if f.endswith('.jpg'))
    random.shuffle(crop_files)

    # First 80% of the shuffled crops go to train, the remainder to val.
    split_idx = int(0.8 * len(crop_files))
    train_files = crop_files[:split_idx]
    val_files = crop_files[split_idx:]

    # Create per-track folders in train/ and val/
    train_track_dir = os.path.join(train_dir, track_id)
    val_track_dir = os.path.join(val_dir, track_id)
    os.makedirs(train_track_dir, exist_ok=True)
    os.makedirs(val_track_dir, exist_ok=True)

    # Copy files (copy2 also preserves file metadata)
    for files, dst_dir in ((train_files, train_track_dir), (val_files, val_track_dir)):
        for fname in files:
            shutil.copy2(os.path.join(track_path, fname), os.path.join(dst_dir, fname))

print(f"Done! Crops split into '{train_dir}' and '{val_dir}' with per-track structure.")

In [14]:
# Re-read the video and save one crop per tracked detection, organised as
# data/<tracker_id>/<frame_id>_<confidence>.jpg.
import os
from tqdm import tqdm
import cv2
from collections import deque

# Create the main data directory
os.makedirs("data", exist_ok=True)

frame_generator = sv.get_video_frames_generator(stride=1, source_path=input_video)

# Per-frame detections still waiting for their frame to come up in the video;
# all_detections is expected to be ordered by ascending frame id.
all_detections_dq = deque(all_detections)

progress = tqdm(frame_generator, desc="Processing frames for crop extraction")
for frame_idx, frame in enumerate(progress):
    # Nothing (left) to extract: stop reading the video. This also covers an
    # empty all_detections, where popleft() would otherwise raise IndexError.
    if not all_detections_dq:
        break

    # Frames without any stored detection are simply skipped.
    next_detected_frame = all_detections_dq[0].data["frame_id"][0]
    if frame_idx != next_detected_frame:
        continue

    detections = all_detections_dq.popleft()

    # Crops are filed by tracker id, so an untracked frame yields nothing.
    if detections.tracker_id is None:
        continue

    frame_height, frame_width = frame.shape[:2]

    # Extract crops for each detection in this frame
    for frame_id, bbox, tracker_id, confidence in zip(
        detections.data["frame_id"],
        detections.xyxy,
        detections.tracker_id,
        detections.confidence,
    ):
        # Clamp the box ([x1, y1, x2, y2]) to the frame: a negative coordinate
        # would otherwise be read as a from-the-end index by the slice below.
        x1, y1, x2, y2 = map(int, bbox)
        x1, x2 = max(x1, 0), min(x2, frame_width)
        y1, y2 = max(y1, 0), min(y2, frame_height)
        if x2 <= x1 or y2 <= y1:
            # Empty crop (box degenerate or fully outside the frame);
            # cv2.imwrite cannot encode an empty image.
            continue

        # Create folder for this tracker_id if it doesn't exist
        track_folder = os.path.join("data", str(tracker_id))
        os.makedirs(track_folder, exist_ok=True)

        # Extract crop from frame
        crop = frame[y1:y2, x1:x2]

        # Save crop with filename: frame_id_confidence.jpg
        crop_filename = f"{frame_id}_{confidence:.3f}.jpg"
        crop_path = os.path.join(track_folder, crop_filename)
        cv2.imwrite(crop_path, crop)

print("Crop extraction complete! Check the 'data' directory for organized crops.")



Processing frames for crop extraction: 0it [00:00, ?it/s]

Processing frames for crop extraction: 499it [00:02, 177.17it/s]

Crop extraction complete! Check the 'data' directory for organized crops.





In [None]:
# Copy the crops of every track in 'data' into per-track folders under
# 'train' (first 80% after shuffling) and 'val' (remaining 20%).
import os
import shutil
import random

# Fixed seed so that re-running the cell produces the same split.
random.seed(42)

# Directories for train and val splits
train_dir = "train"
val_dir = "val"
os.makedirs(train_dir, exist_ok=True)
os.makedirs(val_dir, exist_ok=True)

# For each tracker_id directory in 'data', split crops into train/val.
# Listings are sorted because os.listdir order is arbitrary; the seeded
# shuffle is only reproducible from a fixed starting order.
for tracker_id in sorted(os.listdir("data")):
    track_folder = os.path.join("data", tracker_id)
    if not os.path.isdir(track_folder):
        continue
    crops = sorted(
        f for f in os.listdir(track_folder) if f.lower().endswith(('.jpg', '.png'))
    )
    # Tracks without any crop get no train/val folder at all.
    if not crops:
        continue
    random.shuffle(crops)
    split_idx = int(0.8 * len(crops))
    train_crops = crops[:split_idx]
    val_crops = crops[split_idx:]
    # Create per-track folders in train and val
    train_track_folder = os.path.join(train_dir, tracker_id)
    val_track_folder = os.path.join(val_dir, tracker_id)
    os.makedirs(train_track_folder, exist_ok=True)
    os.makedirs(val_track_folder, exist_ok=True)
    # Copy crops (copy2 also preserves file metadata)
    for crop_files, dst_folder in (
        (train_crops, train_track_folder),
        (val_crops, val_track_folder),
    ):
        for crop_file in crop_files:
            src = os.path.join(track_folder, crop_file)
            dst = os.path.join(dst_folder, crop_file)
            shutil.copy2(src, dst)

print("Train/val split complete! Check the 'train' and 'val' directories.")
