In [None]:
import cv2
import numpy as np
import torch
from pathlib import Path
from ultralytics import YOLO
from deep_sort_realtime.deepsort_tracker import DeepSort
from sklearn.cluster import AgglomerativeClustering
from sklearn.preprocessing import normalize

In [None]:
# --- Court dimensions for US Open Women's Singles (meters) ---
COURT_WIDTH_METERS, COURT_LENGTH_METERS = 8.23, 23.77

# Pixel positions of the four court corners in the video frame
# (placeholders: replace with accurately measured values).
pixel_court_pts = np.float32([
    [374, 171],   # top-left
    [838, 167],   # top-right
    [1078, 607],  # bottom-right
    [131, 619],   # bottom-left
])

# The same four corners expressed in court coordinates (meters), listed in the
# same order as pixel_court_pts so the two arrays correspond point by point.
real_court_pts = np.float32([
    [0, 0],
    [COURT_WIDTH_METERS, 0],
    [COURT_WIDTH_METERS, COURT_LENGTH_METERS],
    [0, COURT_LENGTH_METERS],
])

# Homography mapping frame pixels onto the court plane; the second return
# value of findHomography is not used.
homography_matrix, _ = cv2.findHomography(pixel_court_pts, real_court_pts)

# Pretrained YOLOv8 object detection model.
model = YOLO('yolov8n.pt')
# Detection settings kept as attributes on the model object. The detection
# loops in this notebook read `model.conf` as their confidence cut-off and keep
# class 0 (person) themselves.
# NOTE(review): whether ultralytics itself honors these attributes is not
# visible here; confirm, or pass conf=/classes= to the predict call instead.
model.conf = 0.4
model.classes = [0]

# DeepSORT tracker instance shared by every processed frame.
tracker = DeepSort(max_age=15)

# Global state, all keyed by tracker track ID unless noted otherwise.
player_tracks = dict()               # track ID -> list of court positions
player_appearance_features = dict()  # track ID -> list of appearance features
player_live_distances = dict()       # track ID -> accumulated distance
track_id_to_player_name = dict()     # track ID -> display name
cluster_to_player_name = dict()      # cluster label -> display name

# IDs of the first confirmed tracks (at most MAX_EARLY_IDS), kept to anchor
# appearance clustering.
MAX_EARLY_IDS = 2
early_player_ids = set()

In [None]:
def to_meters(coords):
    """Map pixel coordinates onto the court plane via `homography_matrix`.

    Args:
        coords: Sequence of (x, y) pixel pairs; anything `np.array` can turn
            into N pairs of float32 values.

    Returns:
        Array of shape (N, 2) holding the transformed (court, meters) points.
    """
    # perspectiveTransform is fed one point per row, hence the (N, 1, 2) layout.
    pixel_column = np.array(coords, dtype=np.float32).reshape(-1, 1, 2)
    return cv2.perspectiveTransform(pixel_column, homography_matrix).reshape(-1, 2)


def process_frame_with_tracking(frame):
    """Detect people, update the tracker, accumulate distances and annotate `frame`.

    For every confirmed track this stores an appearance feature, converts the
    bottom-centre of the track box to court coordinates with `to_meters`, adds
    the step from the previous position to the track's running distance, and
    draws the position marker, display name and distance onto the frame.

    Mutates the module-level dictionaries `player_tracks`,
    `player_appearance_features`, `player_live_distances`,
    `track_id_to_player_name` and the set `early_player_ids`.

    Args:
        frame: Image passed to the detector and tracker; it is drawn on in place.

    Returns:
        The same `frame` object, annotated.
    """
    # deep_sort_realtime issues track IDs as strings ("1", "2", ...), so the
    # known anchor IDs are keyed by string and compared through str(); this
    # keeps working if a tracker ever hands out integer IDs instead.
    known_player_names = {"1": "Player A", "3": "Player B"}
    # Per-frame steps at or above this many meters are treated as tracking
    # glitches and not added to the running distance.
    max_step_meters = 10

    results = model(frame, verbose=False)
    person_detections = []

    for box in results[0].boxes:
        x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
        conf = box.conf[0].cpu().item()
        cls = int(box.cls[0].cpu().item())
        if cls == 0 and conf >= model.conf:
            # Detection format: ([left, top, width, height], confidence, class name)
            person_detections.append([[x1, y1, x2 - x1, y2 - y1], conf, 'person'])

    tracks = tracker.update_tracks(person_detections, frame=frame)

    for track in tracks:
        if not track.is_confirmed():
            continue

        track_id = track.track_id

        # Remember the first confirmed tracks (set.add ignores repeats).
        if len(early_player_ids) < MAX_EARLY_IDS:
            early_player_ids.add(track_id)

        features = track.features
        # Guard against both a missing and an empty feature list.
        if features is not None and len(features) > 0:
            feature = features[0]
            if isinstance(feature, torch.Tensor):
                feature = feature.cpu().numpy()
            player_appearance_features.setdefault(track_id, []).append(feature)

            # Name the known anchor tracks once both of them have been seen.
            anchor_name = known_player_names.get(str(track_id))
            if anchor_name is not None:
                seen_ids = {str(tid) for tid in player_appearance_features}
                if seen_ids.issuperset(known_player_names):
                    track_id_to_player_name[track_id] = anchor_name

        left, top, right, bottom = track.to_ltrb()
        # Bottom-centre of the box is used as the track's position in the frame.
        cx, cy = int((left + right) / 2), int(bottom)
        world_coord = to_meters([(cx, cy)])[0]

        path = player_tracks.setdefault(track_id, [])
        player_live_distances.setdefault(track_id, 0.0)

        if path:
            step = np.linalg.norm(world_coord - path[-1])
            if step < max_step_meters:
                player_live_distances[track_id] += step

        # The position is recorded even when the step was rejected, so the
        # next step is measured from the latest observation.
        path.append(world_coord)

        # Use mapped name if available
        display_name = track_id_to_player_name.get(track_id, f"ID {track_id}")

        cv2.circle(frame, (cx, cy), 5, (0, 255, 0), -1)
        cv2.putText(frame, display_name, (int(left), int(top) - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        cv2.putText(frame, f"{player_live_distances[track_id]:.1f} m", (int(left), int(top) + 20), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)

    return frame


def merge_tracks_by_appearance():
    """Group track IDs whose averaged appearance features are close together.

    Reads `player_appearance_features`, averages the stored features of every
    track that has at least one, and clusters the normalized averages with
    average-linkage agglomerative clustering (cosine metric, distance
    threshold 0.5). With two or fewer tracks no clustering is run and every
    track forms its own group, keyed by its own ID.

    Returns:
        Dict mapping a cluster label to the list of track IDs in that cluster.
    """
    mean_embedding = {}
    for tid, samples in player_appearance_features.items():
        if samples:
            mean_embedding[tid] = np.mean(np.vstack(samples), axis=0)

    ids = list(mean_embedding)
    if len(ids) <= 2:
        return {tid: [tid] for tid in ids}

    stacked = np.vstack([mean_embedding[tid] for tid in ids])
    clusterer = AgglomerativeClustering(
        n_clusters=None,
        distance_threshold=0.5,
        metric='cosine',
        linkage='average',
    )
    assignments = clusterer.fit_predict(normalize(stacked))

    grouped = {}
    for tid, label in zip(ids, assignments):
        if label not in grouped:
            grouped[label] = []
        grouped[label].append(tid)

    return grouped


def assign_cluster_names(clusters):
    """Give every cluster a unique player name and propagate it to its tracks.

    Clusters are ranked by the summed `player_live_distances` of their tracks
    (largest first). A cluster containing a known anchor track ("1" ->
    "Player A", otherwise "3" -> "Player B") receives that name. Every other
    cluster, in ranked order, receives the first default name that is still
    free: "Player A" .. "Player D", then "Player 4", "Player 5", ...

    Track IDs are compared through str(), so the anchors match whether the
    tracker issues string IDs (as deep_sort_realtime does) or integers.

    Args:
        clusters: Mapping of cluster label -> list of track IDs.

    Returns:
        None. Updates `cluster_to_player_name` and `track_id_to_player_name`.
    """
    anchor_names = {"1": "Player A", "3": "Player B"}

    def default_name(index):
        return f"Player {'ABCD'[index] if index < 4 else index}"

    ranked = sorted(
        clusters.items(),
        key=lambda item: -sum(player_live_distances.get(tid, 0.0) for tid in item[1]),
    )

    names = [None] * len(ranked)
    used_names = set()

    # Pass 1: reserve the anchor names so a default can never steal them.
    for position, (_, tids) in enumerate(ranked):
        member_ids = {str(tid) for tid in tids}
        for anchor_id, anchor_name in anchor_names.items():
            if anchor_id in member_ids:
                if anchor_name not in used_names:
                    names[position] = anchor_name
                    used_names.add(anchor_name)
                # Only the first matching anchor applies to a cluster.
                break

    # Pass 2: hand the remaining clusters the next unused default name, which
    # guarantees that no two clusters ever share a name.
    next_default = 0
    for position in range(len(ranked)):
        if names[position] is not None:
            continue
        while default_name(next_default) in used_names:
            next_default += 1
        names[position] = default_name(next_default)
        used_names.add(names[position])

    for (cluster_label, tids), name in zip(ranked, names):
        cluster_to_player_name[cluster_label] = name
        for tid in tids:
            track_id_to_player_name[tid] = name

def is_wide_angle_frame(frame, min_players=2, spread_threshold=200, max_bbox_height=300):
    """Heuristically decide whether `frame` is a wide shot rather than a close-up.

    Runs the detector on the frame and keeps class-0 detections whose
    confidence is at least `model.conf`. The frame counts as wide only when
    all of the following hold:

    * at least `min_players` detections were kept,
    * none of the kept boxes is taller than `max_bbox_height` pixels,
    * the vertical distance between the highest and lowest box centre is
      greater than `spread_threshold` pixels.

    Args:
        frame: Image handed to the detector.
        min_players: Minimum number of kept detections.
        spread_threshold: Required vertical spread of the box centres.
        max_bbox_height: Tallest box still accepted for a wide shot.

    Returns:
        Truthy when the frame looks like a wide-angle shot, falsy otherwise.
    """
    detections = model(frame, verbose=False)[0].boxes

    # One (center_y, box_height) pair per kept person detection.
    people = []
    for det in detections:
        _, top, _, bottom = det.xyxy[0].cpu().numpy()
        score = det.conf[0].cpu().item()
        label = int(det.cls[0].cpu().item())
        if label == 0 and score >= model.conf:
            people.append(((top + bottom) / 2, bottom - top))

    if len(people) < min_players:
        return False

    # A single oversized box is taken as a sign of a zoomed-in shot.
    if any(height > max_bbox_height for _, height in people):
        return False

    center_ys = [center for center, _ in people]
    return max(center_ys) - min(center_ys) > spread_threshold

In [None]:
if __name__ == '__main__':
    # Pipeline: open the video, show a one-frame calibration preview, process
    # and write every frame, then merge tracks by appearance and report the
    # total distance per player.
    video_path = "tennis_video_assignment.mp4"
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        # Fail early with a clear message instead of crashing later on a None frame.
        raise IOError(f"Could not open input video: {video_path}")

    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    if not fps or fps <= 0:
        # A non-positive frame rate cannot be used for the writer; fall back
        # to a conventional default.
        fps = 30.0
    out = cv2.VideoWriter("output_processed_video.mp4", cv2.VideoWriter_fourcc(*'mp4v'), fps, (frame_width, frame_height))
    frame_count = 0

    try:
        # Visual debug: draw the configured court corners on the first frame.
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
        ret, test_frame = cap.read()
        if ret:
            for pt in pixel_court_pts:
                cv2.circle(test_frame, (int(pt[0]), int(pt[1])), 5, (0, 0, 255), -1)
            cv2.imshow("Court Calibration Check", test_frame)
            cv2.waitKey(0)
        # The preview consumed frame 0; rewind so the main loop processes it too.
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            wide = is_wide_angle_frame(frame)
            # Tracking only runs on wide shots; zoomed frames pass through unchanged.
            processed = process_frame_with_tracking(frame.copy()) if wide else frame.copy()
            label = "WIDE ANGLE" if wide else "ZOOMED"
            cv2.putText(processed, label, (30, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 0) if wide else (0, 0, 255), 3)

            out.write(processed)
            cv2.imshow("Tennis Tracking", processed)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Always release the capture, the writer and any windows, even when a
        # frame fails to process.
        cap.release()
        out.release()
        cv2.destroyAllWindows()

    print("\nFinal distances after merging appearances:")
    clusters = merge_tracks_by_appearance()
    assign_cluster_names(clusters)

    for cluster_label, tids in clusters.items():
        point_count = sum(len(player_tracks.get(tid, [])) for tid in tids)
        if point_count < 2:
            continue

        # Sum the per-track distances accumulated during tracking. Joining the
        # raw paths instead would add a spurious jump between the end of one
        # track and the start of the next (in no particular time order) and
        # would skip the outlier cap applied to the live distances.
        total_distance = float(sum(player_live_distances.get(tid, 0.0) for tid in tids))
        name = cluster_to_player_name.get(cluster_label, f"Cluster {cluster_label}")
        print(f"{name}: {total_distance:.2f} meters")