In [None]:
!pip install gdown
!pip install tqdm
!pip install ultralytics



import gdown          
import os
import shutil
import zipfile
from google.colab import files 
import random

import numpy as np
import pandas as pd
from sklearn.metrics.pairwise import cosine_similarity
from collections import defaultdict
from collections import deque

import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms

import cv2
from tqdm import tqdm 
from PIL import Image
from IPython.display import display, FileLink, Markdown
from collections import defaultdict
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score
from scipy.optimize import linear_sum_assignment

# Shared Google Drive folders for the project data.
# Only url_dataset_1 is downloaded in this cell; url_dataset_2 and url_dataset_3
# are defined here but not referenced anywhere in the visible part of the notebook.
url_dataset_1 = "https://drive.google.com/drive/folders/1nO_javaDEk0yFcglOX7WoqRNj_MtCEDd?usp=sharing"
url_dataset_2 = "https://drive.google.com/drive/folders/1VW5Y9-iUPNtz46uPFAVW__lu4WtJ1gAi?usp=sharing"
url_dataset_3 = "https://drive.google.com/drive/folders/1uwbs9kUTnRK9bE-B4QLdnEMVp_lgPRxM?usp=sharing"
# Download the whole folder; the captured output below lists the files under /content/train.
gdown.download_folder(url_dataset_1, quiet=True)



['/content/train/reid_checkpoints/phase1_reid_hybrid_ENDZONE_epoch_20.pth',
 '/content/train/reid_checkpoints/phase1_reid_hybrid_SIDELINE_epoch_20.pth',
 '/content/train/reid_checkpoints/reid_hybrid_ENDZONE_best.pth',
 '/content/train/reid_checkpoints/reid_hybrid_SIDELINE_best.pth',
 '/content/train/best_dualview_96.pth',
 '/content/train/best_siamese_matcher_endzone.pth',
 '/content/train/best_siamese_matcher.pth',
 '/content/train/best_weights_players_detection.pt',
 '/content/train/best.pt',
 '/content/train/ckpt.t7',
 '/content/train/csv_helmets.zip',
 '/content/train/embeddings.zip',
 '/content/train/helmet_dataset.yaml',
 '/content/train/helmets_context_extracted.zip',
 '/content/train/helmets_extracted.zip',
 '/content/train/reid_gt_metadata_ENDZONE.csv',
 '/content/train/reid_gt_metadata_SIDELINE.csv',
 '/content/train/reid_gt_metadata.csv',
 '/content/train/train_player_tracking_frames.csv',
 '/content/train/video_directions.csv',
 '/content/train/Videos_dataset.zip']

In [None]:

# Relocate the downloaded "train" folder to datasets/Dataset.
new_base, original_path = "datasets", "train"
new_path = os.path.join(new_base, "Dataset")

os.makedirs(new_base, exist_ok=True)

# A previous run may already have moved the folder; only move it once.
if os.path.exists(new_path) is False:
    shutil.move(original_path, new_path)
    print(" Folder '{}' moved in '{}'".format(original_path, new_path))
dataset_path = "datasets/Dataset"

# extract zip files and remove them
def extract_and_remove_zip(zip_path, extract_to):
    """Unpack ``zip_path`` into ``extract_to`` and then delete the archive.

    The archive is removed only after extraction finished without raising.
    """
    archive = zipfile.ZipFile(zip_path, 'r')
    try:
        archive.extractall(extract_to)
    finally:
        # Always close the handle, even when extraction fails.
        archive.close()
    os.remove(zip_path)
    print(f"File zip {zip_path} extracted and removed.")

# Unpack every archive found directly inside the dataset folder.
zip_files = list(filter(lambda f: f.endswith('.zip'), os.listdir(dataset_path)))
for zip_file in zip_files:
    zip_path = os.path.join(dataset_path, zip_file)
    extract_and_remove_zip(zip_path, dataset_path)

✅ Cartella 'train' spostata in 'datasets/Dataset'
File zip datasets/Dataset/Videos_dataset.zip estratto e rimosso.
File zip datasets/Dataset/csv_helmets.zip estratto e rimosso.
File zip datasets/Dataset/embeddings.zip estratto e rimosso.
File zip datasets/Dataset/helmets_context_extracted.zip estratto e rimosso.
File zip datasets/Dataset/helmets_extracted.zip estratto e rimosso.


In [None]:

# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
# Absolute dataset root (the earlier cell moves the download to the relative
# path "datasets/Dataset", so this assumes the working directory is /content).
ROOT_DIR = "/content/datasets/Dataset"
# Folder holding the Re-ID checkpoints loaded by get_feature_extractor().
CHECKPOINT_DIR = os.path.join(ROOT_DIR, "reid_checkpoints")

# Root under which get_metadata() resolves each row's relative 'image_path'.
CROPS_ROOT = os.path.join(ROOT_DIR, "helmets_extracted")

# NOTE(review): declared global in get_metadata() but never read there; the
# metadata paths are built from ROOT_DIR instead.
METADATA_PATH_BASE = os.path.join(ROOT_DIR, "reid_gt_metadata_")


# Destination of the GIFs written by visualize_and_save_video().
OUTPUT_VIDEOS_DIR = "/content/tracked_output_samples"
os.makedirs(OUTPUT_VIDEOS_DIR, exist_ok=True)
# Folder where visualize_and_save_video() looks for "<video_folder_name>.mp4".
VIDEOS_ROOT = os.path.join(ROOT_DIR, "Videos_dataset", "train")

# ---------------------------------------------------------------------------
# Frame / model geometry
# ---------------------------------------------------------------------------
# Frame size used by CustomFeatureExtractor to normalize box coordinates.
FRAME_W = 1280
FRAME_H = 720
# Not referenced in the visible code (the visualization cell passes 59.94 literally).
VIDEO_FPS = 59.94
# Constructor arguments of HybridFrameReIDModel.
EMBEDDING_DIM = 128
POSITIONAL_FEATURES_DIM = 4
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ---------------------------------------------------------------------------
# Tracker thresholds
# ---------------------------------------------------------------------------
# Both feed the primary-stage cost threshold in associate_detections_to_tracks().
MAX_COSINE_DISTANCE = 0.50
MAX_IOU_DISTANCE = 0.75
# A track is flagged deleted once it has been missed more than MAX_AGE times in a row.
MAX_AGE = 60
# Number of hits after which SimpleTrack.is_confirmed() returns True.
N_INIT = 3

# Re-acquisition stage (second pass of associate_detections_to_tracks()):
# center-distance gate in pixels, cosine-distance gate, and the two cost weights.
MAX_CENTER_DISTANCE_REACQ = 200 
MAX_COSINE_DISTANCE_REACQ = 0.60 
WEIGHT_REID_REACQ = 0.8  
WEIGHT_DIST_REACQ = 0.2 

# Not referenced in the visible part of this notebook.
N_SAMPLES_VIDEOS = 3 
MAX_SECONDS_PER_VIDEO = None 
# Drawing parameters used by visualize_and_save_video().
FONT_SCALE = 0.6
THICKNESS = 2
TEXT_COLOR = (255, 255, 255) 


# ---------------------------------------------------------------------------
# Module-level caches / result holders
# ---------------------------------------------------------------------------
# view -> CustomFeatureExtractor, filled by get_feature_extractor().
EXTRACTORS = {}
# view -> metadata DataFrame, filled by get_metadata().
METADATA_GT = {}
# Not referenced in the visible part of this notebook.
PROCESSED_VIDEOS = defaultdict(list)
TRACKING_RESULTS = defaultdict(dict)
# video folder -> frame number -> track id -> embedding, filled by run_tracking_logic().
TRACK_EMBEDDINGS_CACHE = defaultdict(lambda: defaultdict(dict))

def get_metadata(view_type):
    """Load (and memoize) the ground-truth Re-ID metadata CSV for one camera view.

    Args:
        view_type: 'ENDZONE' or 'SIDELINE' (case-insensitive).

    Returns:
        pandas.DataFrame read from the CSV, with two extra columns:
        'video_folder' (the 'video_file' value without its extension) and
        'crop_path' ('image_path' joined onto CROPS_ROOT). The same DataFrame
        object is returned on every later call for the same view.

    Raises:
        FileNotFoundError: for an unknown view or when the CSV is missing.
    """
    # Normalize before touching the cache so 'endzone' and 'ENDZONE' share one
    # entry instead of reading the CSV twice.
    view = view_type.upper()
    if view in METADATA_GT:
        return METADATA_GT[view]

    metadata_files = {
        'ENDZONE': "reid_gt_metadata_ENDZONE.csv",
        'SIDELINE': "reid_gt_metadata_SIDELINE.csv",
    }
    if view not in metadata_files:
        # Kept as FileNotFoundError because callers catch exactly this type.
        raise FileNotFoundError(f"CSV not found for {view}.")

    metadata_path = os.path.join(ROOT_DIR, metadata_files[view])
    if not os.path.exists(metadata_path):
        raise FileNotFoundError(f"Metadata GT not found for {view}: {metadata_path}")

    df = pd.read_csv(metadata_path)

    # Folder name used to match rows against the video being tracked.
    df['video_folder'] = df['video_file'].apply(lambda x: os.path.splitext(x)[0])
    # Absolute path of each helmet crop on disk.
    df['crop_path'] = df['image_path'].apply(lambda rel_path: os.path.join(CROPS_ROOT, rel_path))

    METADATA_GT[view] = df
    return df



def run_tracking_logic(video_folder_name, metadata_df, max_frames_limit=None):
    """Track the helmets of one video using the crops/boxes listed in ``metadata_df``.

    For every frame the detections are associated with the existing tracks
    (associate_detections_to_tracks), matched tracks are updated, unmatched
    tracks are aged and possibly dropped, and unmatched detections start new
    tracks. Every embedding used for an update or a new track is stored in
    TRACK_EMBEDDINGS_CACHE[video_folder_name][frame_num][track_id].

    Args:
        video_folder_name: video identifier; must contain 'Endzone' or
            'Sideline' to select the Re-ID model.
        metadata_df: DataFrame as returned by get_metadata() (columns used:
            'video_folder', 'frame_num', 'crop_path', 'x1', 'y1', 'x2', 'y2').
        max_frames_limit: optional cap on the number of frames processed.

    Returns:
        List of dicts with keys 'frame_num', 'track_id', 'x_min', 'y_min',
        'x_max', 'y_max' - one per track updated in a frame. Empty list when
        the view is unknown, the video has no metadata, or the checkpoint is
        missing.
    """
    global TRACK_EMBEDDINGS_CACHE

    # Start from an empty per-video cache so a re-run never mixes in stale embeddings.
    TRACK_EMBEDDINGS_CACHE[video_folder_name] = defaultdict(dict)
    video_cache = TRACK_EMBEDDINGS_CACHE[video_folder_name]

    if 'Endzone' in video_folder_name:
        view_type = 'ENDZONE'
    elif 'Sideline' in video_folder_name:
        view_type = 'SIDELINE'
    else:
        print(f"LOG: Ignoring {video_folder_name}. View label not existing.")
        return []

    video_metadata = metadata_df[metadata_df['video_folder'] == video_folder_name].copy().reset_index(drop=True)
    if video_metadata.empty:
        print(f" LOG: metadata not found for {video_folder_name}.")
        return []

    print(f"\nTracking: {video_folder_name} ({view_type})")
    all_frame_nums = sorted(video_metadata['frame_num'].unique())
    total_frames = len(all_frame_nums)

    if max_frames_limit and max_frames_limit < total_frames:
        print(f"⚠️ LOG: Limiting process to {max_frames_limit} frames on {total_frames}.")
        all_frame_nums = all_frame_nums[:max_frames_limit]
        total_frames = len(all_frame_nums)
    else:
        print(f"ℹ️ LOG: processed {total_frames}.")

    try:
        extractor = get_feature_extractor(view_type)
    except FileNotFoundError as e:
        print(f"❌ ERROR: {e}")
        return []

    # SimpleTrack._count is shared by every video processed in this kernel;
    # remember its value so the summary reports only this video's tracks.
    tracks_before_video = SimpleTrack._count

    all_tracks = []
    video_tracking_records = []

    with tqdm(total=total_frames, desc=f"Tracking {view_type}: {video_folder_name}") as pbar:
        for frame_num in all_frame_nums:

            current_frame_data = video_metadata[video_metadata['frame_num'] == frame_num]

            # Each detection is (bbox as top-left/width/height, BGR crop, bbox as center/width/height).
            new_detections_data = []
            for _, row in current_frame_data.iterrows():
                # Best effort: an unreadable or missing crop drops that detection only.
                try:
                    crop = cv2.imread(row['crop_path'])
                    if crop is None or crop.size == 0:
                        continue
                except Exception:
                    continue

                x1, y1, x2, y2 = int(row['x1']), int(row['y1']), int(row['x2']), int(row['y2'])
                w = x2 - x1
                h = y2 - y1
                x_center = x1 + w / 2
                y_center = y1 + h / 2

                bbox_tlwh = np.array([x1, y1, w, h], dtype=np.float32)
                bbox_abs = np.array([x_center, y_center, w, h], dtype=np.float32)

                new_detections_data.append((bbox_tlwh, crop, bbox_abs))

            # Nothing to associate and nothing to age: skip the frame.
            if not new_detections_data and not all_tracks:
                pbar.update(1)
                continue

            matches, _, unmatched_det_indices = associate_detections_to_tracks(
                all_tracks, new_detections_data, extractor)

            # 1) Update every matched track with the detection's embedding and box.
            matched_tracks = set()
            for det_idx, track in matches:
                _, crop, bbox_abs = new_detections_data[det_idx]
                current_embedding = extractor([crop], np.array([bbox_abs]))[0]

                track.update(current_embedding, bbox_abs)
                matched_tracks.add(track)
                video_cache[frame_num][track.track_id] = current_embedding

            # 2) Age the tracks that were not matched and drop the expired ones.
            tracks_to_keep = []
            for track in all_tracks:
                if track not in matched_tracks:
                    track.mark_missed()
                if not track.is_deleted():
                    tracks_to_keep.append(track)
            all_tracks = tracks_to_keep

            # 3) Every detection left unmatched starts a new track.
            for det_idx in unmatched_det_indices:
                _, crop, bbox_abs = new_detections_data[det_idx]
                new_embedding = extractor([crop], np.array([bbox_abs]))[0]
                new_track = SimpleTrack(new_embedding, bbox_abs, frame_num)
                all_tracks.append(new_track)
                video_cache[frame_num][new_track.track_id] = new_embedding

            # 4) Emit one record per track seen in this frame (updated or just created).
            for track in all_tracks:
                if track.time_since_update == 0:
                    x1, y1, x2, y2 = track.predicted_bbox_tlbr.tolist()
                    video_tracking_records.append({
                        'frame_num': frame_num,
                        'track_id': track.track_id,
                        'x_min': x1,
                        'y_min': y1,
                        'x_max': x2,
                        'y_max': y2,
                    })

            pbar.update(1)

    num_total_tracks = SimpleTrack._count - tracks_before_video
    # Count cached embeddings (one per frame/track pair), not their vector length.
    num_cached_embeddings = sum(len(frame_entries) for frame_entries in video_cache.values())
    print(f"\n Tracking complete: {video_folder_name}")
    print(f"  -> # ID Tracks Generated: {num_total_tracks}")
    print(f"  -> Cache Embedding: {num_cached_embeddings} embedding saved.")

    return video_tracking_records



class SimpleTrack:
    """State of one tracked object: latest box, recent embeddings and hit/miss counters.

    Track ids come from the class-level counter ``_count``, which is shared by
    every instance created in the process.
    """

    _count = 0

    def __init__(self, embedding, bbox_abs, frame_num):
        """Start a track from its first detection.

        ``bbox_abs`` is (x_center, y_center, width, height); a copy is stored.
        """
        SimpleTrack._count += 1
        self.track_id = SimpleTrack._count

        self.start_frame = frame_num
        self.hits = 1
        self.time_since_update = 0
        self.deleted = False

        self.last_bbox_abs = bbox_abs.copy()
        # Only the 10 most recent embeddings are kept.
        self.embeddings = deque([embedding], maxlen=10)

    def update(self, embedding, bbox_abs):
        """Record a new matched detection and reset the miss counter."""
        self.last_bbox_abs = bbox_abs.copy()
        self.embeddings.append(embedding)
        self.time_since_update = 0
        self.hits += 1

    def mark_missed(self):
        """Count one missed frame; flag the track deleted past MAX_AGE misses."""
        self.time_since_update += 1
        if self.time_since_update > MAX_AGE:
            self.deleted = True

    def is_missing_but_active(self):
        """True when the track missed at least one frame but is not deleted."""
        if self.time_since_update > 0:
            return not self.is_deleted()
        return False

    def is_confirmed(self):
        """True once the track has been matched at least N_INIT times."""
        return self.hits >= N_INIT

    def is_deleted(self):
        """True when the track has been flagged for removal."""
        return self.deleted

    @property
    def predicted_bbox_tlbr(self):
        """Last known box as [x_min, y_min, x_max, y_max]."""
        cx, cy, width, height = self.last_bbox_abs
        return np.array([cx - width/2, cy - height/2, cx + width/2, cy + height/2])

    @property
    def mean_embedding(self):
        """Mean of the stored embeddings, L2-normalized when non-zero, shaped (1, D)."""
        averaged = np.mean(self.embeddings, axis=0)
        magnitude = np.linalg.norm(averaged)
        if magnitude > 0:
            averaged = averaged / magnitude
        return averaged.reshape(1, -1)


def bbox_iou_simple(boxA, boxB):
    """Intersection-over-union of two boxes given as [x1, y1, x2, y2].

    Returns 0.0 when the union area is zero.
    """
    # Width/height of the overlap rectangle, clamped at zero for disjoint boxes.
    overlap_w = max(0, min(boxA[2], boxB[2]) - max(boxA[0], boxB[0]))
    overlap_h = max(0, min(boxA[3], boxB[3]) - max(boxA[1], boxB[1]))
    intersection = overlap_w * overlap_h

    area_a = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
    area_b = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])

    union = float(area_a + area_b - intersection)
    return 0.0 if union == 0 else intersection / union


class HybridFrameReIDModel(nn.Module):
    """Re-ID network that fuses ResNet-18 image features with box-position features.

    The attribute names ``features``, ``avgpool`` and ``embedding_layer`` are
    the keys of the state dict and must stay as they are so that saved
    checkpoints keep loading.
    """

    def __init__(self, embedding_dim=128, positional_features_dim=4):
        super().__init__()
        backbone = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
        # Keep everything except the last two child modules of the backbone
        # (presumably its pooling and classification head - confirm against torchvision).
        conv_stages = list(backbone.children())[:-2]
        self.features = nn.Sequential(*conv_stages)
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))

        visual_dim = 512
        self.embedding_layer = nn.Linear(visual_dim + positional_features_dim, embedding_dim)

    def forward(self, x_image, x_positional):
        """Return L2-normalized embeddings for a batch of crops and their positional features."""
        pooled = self.avgpool(self.features(x_image))
        visual = torch.flatten(pooled, 1)
        # Concatenate the positional features after the visual ones, per sample.
        fused = torch.cat((visual, x_positional), dim=1)
        return nn.functional.normalize(self.embedding_layer(fused), p=2, dim=1)

# Preprocessing applied to every crop before it enters the Re-ID model:
# array -> PIL image -> 64x64 resize -> tensor -> per-channel normalization.
# CustomFeatureExtractor converts the crop from BGR to RGB before calling this.
# The mean/std values are presumably the ImageNet statistics matching the
# pretrained ResNet-18 backbone - confirm against the training code.
transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((64, 64)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

class CustomFeatureExtractor:
    """Callable wrapper turning image crops plus their boxes into Re-ID embeddings."""

    def __init__(self, reid_model, transform, device, frame_w, frame_h):
        """Store the model (switched to eval mode), the crop transform and the frame size."""
        self.reid_model = reid_model
        self.transform = transform
        self.device = device
        self.FRAME_W = frame_w
        self.FRAME_H = frame_h
        self.reid_model.eval()

    def __call__(self, img_crops, bbox_xywh_abs):
        """Embed a batch of crops.

        Args:
            img_crops: sequence of BGR image arrays (``None`` or empty entries
                are replaced by an all-zero 3x64x64 tensor).
            bbox_xywh_abs: array indexed as ``[:, 0..3]`` holding
                (x_center, y_center, width, height) in pixels, one row per crop.

        Returns:
            numpy array with one embedding row per crop; shape (0, out_features)
            when ``img_crops`` is empty.
        """
        if len(img_crops) == 0:
            return np.empty((0, self.reid_model.embedding_layer.out_features), dtype=np.float32)

        def _prepare(crop):
            # Invalid crops keep their slot in the batch as a zero image.
            if crop is None or not crop.size > 0:
                return torch.zeros(3, 64, 64, dtype=torch.float32)
            return self.transform(cv2.cvtColor(crop, cv2.COLOR_BGR2RGB))

        image_batch = torch.stack([_prepare(crop) for crop in img_crops]).to(self.device)

        # Normalize x/width by the frame width and y/height by the frame height.
        extents = (self.FRAME_W, self.FRAME_H, self.FRAME_W, self.FRAME_H)
        normalized_columns = [
            torch.tensor(bbox_xywh_abs[:, col] / extent, dtype=torch.float32)
            for col, extent in enumerate(extents)
        ]
        positional_data_batch = torch.stack(normalized_columns, dim=1).to(self.device)

        with torch.no_grad():
            embeddings = self.reid_model(image_batch, positional_data_batch)

        return embeddings.cpu().numpy()


def visualize_and_save_video(video_folder_name, fps, tracking_data, data_key, suffix):
    """Draw track boxes on a video and save the annotated frames as an animated GIF.

    Args:
        video_folder_name: video name without extension; the file read is
            ``VIDEOS_ROOT/<video_folder_name>.mp4``.
        fps: frame rate used to derive the GIF frame duration.
        tracking_data: dict holding, under ``data_key``, a mapping
            ``frame number -> list of {'id': int, 'bbox': [x1, y1, x2, y2]}``.
            Frame numbers are 1-based: the first frame read is frame 1.
        data_key: key of ``tracking_data`` to visualize.
        suffix: text appended to the output file name.

    Returns:
        ``True`` when the video was read (whether or not a GIF was written),
        ``None`` when the tracks or the video are missing.

    Only frames that have an entry in the tracking mapping end up in the GIF.
    """
    global VIDEOS_ROOT, OUTPUT_VIDEOS_DIR, THICKNESS, FONT_SCALE, TEXT_COLOR

    print(f"\n Visualization: {video_folder_name} ({suffix.strip('_')})")

    if data_key not in tracking_data or not tracking_data[data_key]:
        print(f" Error: {video_folder_name}: Not found or empty '{data_key}'.")
        return

    tracked_data = tracking_data[data_key]

    video_file = video_folder_name + '.mp4'
    VIDEO_PATH = os.path.join(VIDEOS_ROOT, video_file)

    if not os.path.exists(VIDEO_PATH):
        print(f" Error: {VIDEO_PATH}.")
        return

    cap = cv2.VideoCapture(VIDEO_PATH)
    if not cap.isOpened():
        cap.release()
        print(f" Error: {VIDEO_PATH}.")
        return

    gif_size = (640, 360)
    # Frames are downscaled as soon as they are drawn, so only GIF-sized
    # images are kept in memory instead of every full-resolution frame.
    output_frames = []
    current_frame_num = 0
    ID_COLORS = {}
    num_frames_with_tracks = 0

    # try/finally guarantees the capture handle is released even when
    # reading or drawing raises.
    try:
        max_frame_to_read = max(tracked_data.keys()) if tracked_data else 0

        if max_frame_to_read == 0:
            print(f" LOG: {video_folder_name}: No tracking.")
            return

        with tqdm(desc=f"Visualization {video_folder_name} {suffix}", total=max_frame_to_read) as pbar:
            while cap.isOpened() and current_frame_num < max_frame_to_read:
                ret, frame = cap.read()
                if not ret:
                    break

                current_frame_num += 1

                if current_frame_num in tracked_data:

                    frame_tracks = tracked_data[current_frame_num]

                    if frame_tracks:
                        num_frames_with_tracks += 1

                        # Periodic sample of what is being drawn.
                        if current_frame_num % 100 == 1:
                            print(f"      [F{current_frame_num}]: Drawing {len(frame_tracks)} tracks. E.G. BBox: {frame_tracks[0]['bbox']}")

                        for track_info in frame_tracks:
                            track_id = track_info['id']
                            x1, y1, x2, y2 = track_info['bbox']

                            # Color derived from the id, so a track keeps its color across frames.
                            if track_id not in ID_COLORS:
                                r = (track_id * 85) % 255
                                g = (track_id * 15) % 255
                                b = (track_id * 50) % 255
                                ID_COLORS[track_id] = (int(b), int(g), int(r))

                            color = ID_COLORS[track_id]

                            cv2.rectangle(frame, (x1, y1), (x2, y2), color, THICKNESS)

                            label = f"ID: {track_id}"
                            (tw, th), _ = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, FONT_SCALE, THICKNESS)

                            # Filled black background behind the label, clamped to the top edge.
                            cv2.rectangle(frame, (x1, max(0,y1-th-4)), (x1+tw+4, y1), (0,0,0), -1)
                            cv2.putText(frame, label, (x1+2, y1-4), cv2.FONT_HERSHEY_SIMPLEX, FONT_SCALE, TEXT_COLOR, THICKNESS-1, cv2.LINE_AA)

                    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    output_frames.append(Image.fromarray(rgb_frame).resize(gif_size))
                pbar.update(1)
    finally:
        cap.release()

    if output_frames:
        output_path_gif = os.path.join(OUTPUT_VIDEOS_DIR, f"{video_folder_name}{suffix}.gif")

        output_frames[0].save(
            output_path_gif,
            save_all=True,
            append_images=output_frames[1:],
            duration=int(1000/fps),
            loop=0
        )
        print(f"\n Visualization Completed: {video_folder_name}")
        print(f"  -> Frame with tracks: {num_frames_with_tracks}")
        print(f"  -> Saved example: {output_path_gif}")

    return True

def get_feature_extractor(view_type):
    """Return the cached Re-ID feature extractor for a camera view, building it on first use.

    Args:
        view_type: 'SIDELINE' selects the sideline checkpoint (case-insensitive);
            any other value falls back to the endzone checkpoint.

    Returns:
        CustomFeatureExtractor wrapping the loaded model in eval mode.

    Raises:
        FileNotFoundError: when the checkpoint file does not exist.
    """
    global EXTRACTORS, CHECKPOINT_DIR

    # Normalized key: 'sideline' and 'SIDELINE' select the same checkpoint, so
    # they must also share one cache entry instead of loading the model twice.
    view = view_type.upper()
    if view in EXTRACTORS:
        return EXTRACTORS[view]

    if view == 'SIDELINE':
        checkpoint_name = "reid_hybrid_SIDELINE_best.pth"
    else:
        checkpoint_name = "reid_hybrid_ENDZONE_best.pth"

    checkpoint_path = os.path.join(CHECKPOINT_DIR, checkpoint_name)

    # Check the file before building the model: constructing the model requests
    # pretrained backbone weights, which is wasted work when the checkpoint is missing.
    if not os.path.exists(checkpoint_path):
        raise FileNotFoundError(f"Checkpoint not found for {view}: {checkpoint_path}")

    reid_model = HybridFrameReIDModel(EMBEDDING_DIM, POSITIONAL_FEATURES_DIM).to(device)

    print(f" Checkpoint weights: {checkpoint_path}")
    # NOTE(security): torch.load unpickles the file, and this checkpoint comes
    # from a downloaded folder. Consider weights_only=True once confirmed that
    # the checkpoints contain only tensors.
    checkpoint = torch.load(checkpoint_path, map_location=device)

    if isinstance(checkpoint, dict) and 'model_state_dict' in checkpoint:
        reid_model.load_state_dict(checkpoint['model_state_dict'])
    else:
        try:
            reid_model.load_state_dict(checkpoint)
        except RuntimeError as e:
            # Retry after stripping a leading "module." from every key.
            print(f" Error loading checkpoint: {e}")
            new_state_dict = {}
            for k, v in checkpoint.items():
                name = k.replace("module.", "") if k.startswith("module.") else k
                new_state_dict[name] = v
            reid_model.load_state_dict(new_state_dict)

    reid_model.eval()

    feature_extractor = CustomFeatureExtractor(reid_model, transform, device, FRAME_W, FRAME_H)

    EXTRACTORS[view] = feature_extractor
    print(f"ℹ LOG: Re-ID {view} loaded from {checkpoint_name}.")
    return feature_extractor

def associate_detections_to_tracks(all_tracks, new_detections_data, extractor):
    """Match the detections of one frame to existing tracks in two greedy stages.

    Stage 1 (primary): cost = 0.7 * cosine distance + 0.3 * (1 - IoU); pairs
    whose IoU is below 0.25 are gated out. Pairs are accepted lowest cost
    first while the cost stays under MAX_COMBINED_COST_PRIM.

    Stage 2 (re-acquisition): the detections still unmatched are compared with
    the unmatched tracks that have ``time_since_update > 0``, using
    WEIGHT_REID_REACQ * cosine distance + WEIGHT_DIST_REACQ * normalized
    center distance, gated by MAX_COSINE_DISTANCE_REACQ and
    MAX_CENTER_DISTANCE_REACQ.

    Args:
        all_tracks: list of track objects (is_deleted(), mean_embedding,
            predicted_bbox_tlbr, last_bbox_abs, time_since_update are used).
        new_detections_data: list of tuples indexed as
            [0] box (x, y, w, h), [1] image crop, [2] box (x_center, y_center, w, h).
        extractor: callable ``extractor(crops, boxes)`` returning one embedding per crop.

    Returns:
        (matches, active_tracks, unmatched_detection_indices) where ``matches``
        is a list of ``(detection index, track)`` pairs and ``active_tracks``
        are the non-deleted input tracks.
    """
    global MAX_COSINE_DISTANCE, MAX_IOU_DISTANCE, MAX_CENTER_DISTANCE_REACQ
    global WEIGHT_REID_REACQ, WEIGHT_DIST_REACQ, MAX_COSINE_DISTANCE_REACQ

    WEIGHT_REID_PRIM = 0.7
    WEIGHT_IOU_PRIM = 0.3
    # NOTE(review): the IoU term of this threshold uses (1 - MAX_IOU_DISTANCE);
    # confirm this is the intended budget for the (1 - IoU) cost.
    MAX_COMBINED_COST_PRIM = MAX_COSINE_DISTANCE * WEIGHT_REID_PRIM + (1.0 - MAX_IOU_DISTANCE) * WEIGHT_IOU_PRIM

    active_tracks = [t for t in all_tracks if not t.is_deleted()]

    # Nothing to associate: every detection (if any) is unmatched.
    if not active_tracks or not new_detections_data:
        return [], active_tracks, list(range(len(new_detections_data)))

    track_embeddings = np.vstack([t.mean_embedding for t in active_tracks])
    track_bboxes_tlbr = np.array([t.predicted_bbox_tlbr for t in active_tracks]) 

    det_crops = [d[1] for d in new_detections_data]
    det_bboxes_abs = np.vstack([d[2] for d in new_detections_data])
    # Convert (x, y, w, h) to (x1, y1, x2, y2) for the IoU computation.
    det_bboxes_tlbr = np.array([[d[0][0], d[0][1], d[0][0] + d[0][2], d[0][1] + d[0][3]] for d in new_detections_data])

    det_embeddings = extractor(det_crops, det_bboxes_abs)

    # Rows are detections, columns are tracks (same layout for every cost matrix below).
    cosine_similarity_matrix = cosine_similarity(det_embeddings, track_embeddings)
    identity_cost = 1.0 - cosine_similarity_matrix 

    iou_cost = np.zeros((len(new_detections_data), len(active_tracks)))
    distance_cost = np.zeros((len(new_detections_data), len(active_tracks)))

    det_centers = det_bboxes_abs[:, :2]
    track_centers = np.array([t.last_bbox_abs[:2] for t in active_tracks])

    IOU_GATE_THRESHOLD_PRIM = 0.25
    # Sentinel cost marking a forbidden pair; far above both acceptance thresholds.
    GATE_VALUE = 100.0

    for i in range(len(new_detections_data)):
        for j, track in enumerate(active_tracks):
            iou = bbox_iou_simple(det_bboxes_tlbr[i], track_bboxes_tlbr[j])

            center_dist = np.linalg.norm(det_centers[i] - track_centers[j])

            if iou < IOU_GATE_THRESHOLD_PRIM:
                iou_cost[i, j] = GATE_VALUE
            else:
                iou_cost[i, j] = 1.0 - iou

            # Distance cost is scaled to [0, 1] inside the gate.
            if center_dist > MAX_CENTER_DISTANCE_REACQ:
                distance_cost[i, j] = GATE_VALUE
            else:
                distance_cost[i, j] = center_dist / MAX_CENTER_DISTANCE_REACQ

    COMBINED_COST_PRIM = identity_cost * WEIGHT_REID_PRIM + iou_cost * WEIGHT_IOU_PRIM

    matches = []
    unmatched_detections = list(range(len(new_detections_data)))
    unmatched_tracks = list(range(len(active_tracks)))

    cost_matrix_temp = COMBINED_COST_PRIM.copy()

    # Stage 1: repeatedly take the cheapest remaining (detection, track) pair.
    while unmatched_detections and unmatched_tracks:
        sub_matrix = cost_matrix_temp[np.ix_(unmatched_detections, unmatched_tracks)]
        if sub_matrix.size == 0: break

        # Map the position inside the sub-matrix back to the original indices.
        min_cost_idx = np.unravel_index(sub_matrix.argmin(), sub_matrix.shape)
        det_idx = unmatched_detections[min_cost_idx[0]]
        track_idx = unmatched_tracks[min_cost_idx[1]]
        cost = COMBINED_COST_PRIM[det_idx, track_idx]

        if cost < MAX_COMBINED_COST_PRIM:
            matches.append((det_idx, active_tracks[track_idx]))
            unmatched_detections.remove(det_idx)
            unmatched_tracks.remove(track_idx)
        else:
            # The cheapest pair is already too expensive, so every other pair is too.
            break

    # Stage 2 candidates: unmatched tracks that already missed at least one frame.
    missing_tracks_to_recover = [active_tracks[i] for i in unmatched_tracks if active_tracks[i].time_since_update > 0]
    missing_track_indices = [i for i in unmatched_tracks if active_tracks[i].time_since_update > 0]

    if not unmatched_detections or not missing_tracks_to_recover:
        return matches, active_tracks, unmatched_detections

    missing_identity_cost = identity_cost[np.ix_(unmatched_detections, missing_track_indices)]
    missing_distance_cost = distance_cost[np.ix_(unmatched_detections, missing_track_indices)]


    COMBINED_COST_REACQ = missing_identity_cost * WEIGHT_REID_REACQ + missing_distance_cost * WEIGHT_DIST_REACQ


    # Forbid pairs that are too dissimilar in appearance ...
    REID_GATE_MASK = missing_identity_cost > MAX_COSINE_DISTANCE_REACQ

    # ... or whose centers are farther apart than MAX_CENTER_DISTANCE_REACQ.
    DISTANCE_GATE_MASK = missing_distance_cost >= GATE_VALUE


    GATE_MASK = REID_GATE_MASK | DISTANCE_GATE_MASK

    COMBINED_COST_REACQ[GATE_MASK] = GATE_VALUE


    MAX_COMBINED_COST_REACQ = MAX_COSINE_DISTANCE_REACQ * WEIGHT_REID_REACQ + 1.0 * WEIGHT_DIST_REACQ

    reacquisition_matches = []

    # Greedy selection again; rows/columns index into unmatched_detections and
    # missing_tracks_to_recover respectively.
    while True:
        if COMBINED_COST_REACQ.size == 0 or np.all(COMBINED_COST_REACQ >= GATE_VALUE):
            break

        min_cost_idx = np.unravel_index(COMBINED_COST_REACQ.argmin(), COMBINED_COST_REACQ.shape)

        det_idx_in_unmatched = min_cost_idx[0]
        track_idx_in_missing = min_cost_idx[1]

        cost = COMBINED_COST_REACQ[det_idx_in_unmatched, track_idx_in_missing]

        if cost < MAX_COMBINED_COST_REACQ:

            original_det_idx = unmatched_detections[det_idx_in_unmatched]
            track_to_update = missing_tracks_to_recover[track_idx_in_missing]

            reacquisition_matches.append((original_det_idx, track_to_update))

            # Block the used row and column so neither can be matched again.
            COMBINED_COST_REACQ[det_idx_in_unmatched, :] = GATE_VALUE
            COMBINED_COST_REACQ[:, track_idx_in_missing] = GATE_VALUE
        else:
            break

    final_matches = matches + reacquisition_matches
    final_unmatched_detections = [idx for idx in unmatched_detections if idx not in [m[0] for m in reacquisition_matches]]

    return final_matches, active_tracks, final_unmatched_detections


# Post-processing thresholds read by _apply_post_processing().
# Tracks seen in fewer distinct frames than this are dropped.
MIN_TRACK_LENGTH = 10         
# Largest frame gap between two track segments that may still be merged.
MAX_FRAME_GAP_MERGE = 15      
# Largest cosine distance between the boundary embeddings of two merged segments.
MAX_COSINE_DISTANCE_PP = 0.55 


# Allowed center displacement in pixels per frame of gap, chosen by camera view.
MAX_DISPLACEMENT_ENDZONE = 113
MAX_DISPLACEMENT_SIDELINE = 123 

# Frame size and border width (pixels) used by is_at_edge().
# NOTE(review): FRAME_WIDTH/FRAME_HEIGHT repeat the values of FRAME_W/FRAME_H
# defined in the tracker configuration - keep them in sync.
FRAME_WIDTH = 1280
FRAME_HEIGHT = 720
EDGE_TOLERANCE = 50


def get_embedding_from_cache(video_id, frame_num, track_id):
    """Return the embedding cached for (video, frame, track), or ``None`` when absent.

    Uses ``.get`` at every level: indexing the nested defaultdicts with ``[]``
    would silently insert empty entries for every video/frame that is merely
    looked up, growing the cache and skewing any count taken over it.
    """
    frames_of_video = TRACK_EMBEDDINGS_CACHE.get(video_id)
    if frames_of_video is None:
        return None

    tracks_in_frame = frames_of_video.get(frame_num)
    if tracks_in_frame is None:
        return None

    return tracks_in_frame.get(track_id)

def center_distance(boxA, boxB):
    """Euclidean distance between the centers of two [x1, y1, x2, y2] boxes."""
    ax, ay = (boxA[0] + boxA[2]) / 2, (boxA[1] + boxA[3]) / 2
    bx, by = (boxB[0] + boxB[2]) / 2, (boxB[1] + boxB[3]) / 2
    return np.sqrt((ax - bx)**2 + (ay - by)**2)

def is_at_edge(bbox_record):
    """Tell whether a box lies within EDGE_TOLERANCE pixels of any frame border.

    ``bbox_record`` must provide 'x_min', 'x_max', 'y_min' and 'y_max'.
    """
    left, right = bbox_record['x_min'], bbox_record['x_max']
    top, bottom = bbox_record['y_min'], bbox_record['y_max']

    # One flag per border, in the order left, right, top, bottom.
    border_flags = (
        left <= EDGE_TOLERANCE,
        right >= FRAME_WIDTH - EDGE_TOLERANCE,
        top <= EDGE_TOLERANCE,
        bottom >= FRAME_HEIGHT - EDGE_TOLERANCE,
    )

    # Return the first truthy flag, otherwise the last one (same as an `or` chain).
    for flag in border_flags[:-1]:
        if flag:
            return flag
    return border_flags[-1]


def _apply_post_processing(df):
    """Filter short tracks and merge fragmented tracks of a single video.

    Steps:
      1. Drop tracks seen in fewer than MIN_TRACK_LENGTH distinct frames.
      2. Walk the remaining tracks ordered by first frame; a track is merged
         into the current chain when it starts 1..MAX_FRAME_GAP_MERGE frames
         after the chain's last frame, its first embedding is within
         MAX_COSINE_DISTANCE_PP (cosine distance) of the chain's last
         embedding, its center moved at most ``gap * max displacement``
         pixels, and it does not start at the frame border.
      3. Renumber the resulting tracks 1..K.

    Args:
        df: tracking records of ONE video with columns 'video_id',
            'frame_num', 'track_id', 'x_min', 'y_min', 'x_max', 'y_max'.
            The 'track_id' values must be the ids assigned by the tracker,
            because they are the keys of the embedding cache.

    Returns:
        New DataFrame with merged, renumbered tracks. An empty DataFrame is
        returned when the input is empty or every track is filtered out.
    """
    if df.empty:
        return df.copy()

    df = df.copy().sort_values(by=['track_id', 'frame_num']).reset_index(drop=True)
    video_id = df['video_id'].iloc[0]

    if 'Endzone' in video_id:
        max_displacement_per_frame = MAX_DISPLACEMENT_ENDZONE
    else:
        # Sideline videos and videos without a view label share this threshold.
        max_displacement_per_frame = MAX_DISPLACEMENT_SIDELINE

    print(f"ℹ️ LOG: Soglia Max Spostamento per {video_id}: {max_displacement_per_frame} pixel/frame.")

    track_lengths = df.groupby('track_id')['frame_num'].nunique()
    valid_track_ids = track_lengths[track_lengths >= MIN_TRACK_LENGTH].index
    df_filtered = df[df['track_id'].isin(valid_track_ids)].copy()

    if df_filtered.empty:
        return pd.DataFrame()

    grouped = df_filtered.groupby('track_id')
    all_tracks_data = [track_df.sort_values(by='frame_num') for _, track_df in grouped]
    all_tracks_data.sort(key=lambda x: x['frame_num'].iloc[0])

    merged_tracks = []
    current_track_list = []
    current_new_id = 1
    # Tracker-assigned id of the segment at the tail of the open chain. It has
    # to be remembered separately: the 'track_id' column of a chained segment
    # is overwritten with the merged id, while the embedding cache is keyed by
    # the original tracker id.
    last_original_id = None

    for track_df_next in all_tracks_data:
        # Capture the tracker id before the column is relabelled below.
        next_original_id = track_df_next['track_id'].iloc[0]

        if not current_track_list:
            track_df_next['track_id'] = current_new_id
            current_track_list = [track_df_next]
            last_original_id = next_original_id
            continue

        last_track_df = current_track_list[-1]
        last_frame = last_track_df['frame_num'].iloc[-1]
        start_frame_next = track_df_next['frame_num'].iloc[0]
        gap_size = start_frame_next - last_frame

        if 0 < gap_size <= MAX_FRAME_GAP_MERGE:

            last_record = last_track_df.iloc[-1]
            next_record = track_df_next.iloc[0]

            # Appearance check on the embeddings at the two boundary frames.
            emb_last = get_embedding_from_cache(video_id, last_frame, last_original_id)
            emb_next = get_embedding_from_cache(video_id, start_frame_next, next_original_id)

            is_reid_match = False
            if emb_last is not None and emb_next is not None:
                similarity = cosine_similarity(emb_last.reshape(1, -1), emb_next.reshape(1, -1))[0][0]
                cost_reid = 1.0 - similarity
                is_reid_match = cost_reid <= MAX_COSINE_DISTANCE_PP

            last_bbox = [
                last_record['x_min'], last_record['y_min'],
                last_record['x_max'], last_record['y_max']
            ]
            next_bbox = [
                next_record['x_min'], next_record['y_min'],
                next_record['x_max'], next_record['y_max']
            ]

            # Motion check: the allowed displacement grows with the gap length.
            distance = center_distance(last_bbox, next_bbox)
            max_allowed_distance = gap_size * max_displacement_per_frame
            is_spatially_close = distance <= max_allowed_distance

            # A segment that starts at the frame border is never merged.
            is_edge_entry = is_at_edge(next_record)

            if is_reid_match and is_spatially_close and (not is_edge_entry):
                track_df_next['track_id'] = current_new_id
                current_track_list.append(track_df_next)
                last_original_id = next_original_id
                continue

        # No merge: close the current chain and start a new one with this track.
        merged_tracks.append(pd.concat(current_track_list, ignore_index=True))

        current_new_id += 1
        track_df_next['track_id'] = current_new_id
        current_track_list = [track_df_next]
        last_original_id = next_original_id

    if current_track_list:
        merged_tracks.append(pd.concat(current_track_list, ignore_index=True))

    if not merged_tracks:
        return pd.DataFrame()

    final_df = pd.concat(merged_tracks, ignore_index=True).sort_values(by=['track_id', 'frame_num']).reset_index(drop=True)

    # Renumber the ids contiguously starting at 1.
    unique_ids = final_df['track_id'].unique()
    id_map = {old_id: new_id + 1 for new_id, old_id in enumerate(unique_ids)}
    final_df['track_id'] = final_df['track_id'].map(id_map)

    print(f" Post-Processing completed for {video_id}. Final Tracks: {len(unique_ids)}")
    return final_df

def save_tracking_results_to_csv(df, video_id="consolidated"):
    """Write ``df`` to ``<video_id>_tracking_log.csv`` and offer it as a Colab download.

    Any failure of the download step (for example when not running in Colab)
    is printed and otherwise ignored; the CSV stays on disk.
    """
    output_csv_name = "{}_tracking_log.csv".format(video_id)
    df.to_csv(output_csv_name, na_rep='NaN', index=False)

    print(f" LOG: File CSV saved: {output_csv_name}")
    print("\n DOWNLOAD FILE:")

    try:
        from google.colab import files as colab_files
        colab_files.download(output_csv_name)
    except Exception as download_error:
        print(f"Error: {download_error}")



In [None]:
TRACKING_RESULTS_VISUAL = {}

# Sorted so the processing order does not depend on the directory listing order.
all_video_folders = sorted(
    d for d in os.listdir(CROPS_ROOT) if os.path.isdir(os.path.join(CROPS_ROOT, d))
)
endzone_folders = [f for f in all_video_folders if 'Endzone' in f]
sideline_folders = [f for f in all_video_folders if 'Sideline' in f]

videos_to_process_folders = endzone_folders + sideline_folders

print(f"Elaborating {len(videos_to_process_folders)}")

# Set to an integer to process only the first N frames of every video.
max_frames_limit = None
try:
    metadata_df_endzone = get_metadata('ENDZONE')
    metadata_df_sideline = get_metadata('SIDELINE')
except FileNotFoundError as e:
    print(f"ERROR: {e}")
    # Re-raise instead of calling exit(): in a notebook exit() shuts down the
    # kernel (losing all state), while an exception just stops this cell.
    raise

all_tracking_records_post = []

for video_folder in videos_to_process_folders:
    metadata_df = metadata_df_endzone if 'Endzone' in video_folder else metadata_df_sideline

    # Raw tracker output for this video.
    video_records_pre = run_tracking_logic(video_folder, metadata_df, max_frames_limit)
    video_records_pre_df = pd.DataFrame(video_records_pre)
    video_records_pre_df['video_id'] = video_folder

    if video_records_pre_df.empty:
        print(f" LOG: No record for {video_folder}.")
        continue

    # Short-track filtering and fragment merging.
    video_records_post_df = _apply_post_processing(video_records_pre_df)

    all_tracking_records_post.extend(video_records_post_df.to_dict('records'))

    print(f" LOG: Tracks POST-PP: {video_records_post_df['track_id'].nunique() if not video_records_post_df.empty else 0}.")


FINAL_TRACKING_COLUMNS = [
    'video_id', 'frame_num', 'track_id',
    'x_min', 'y_min', 'x_max', 'y_max'
]

if not all_tracking_records_post:
    print("ATTENTION: No data POST-PP found.")
    # Define an empty frame so a re-run never leaves a stale result from a
    # previous execution for the cells that follow.
    final_tracking_df = pd.DataFrame(columns=FINAL_TRACKING_COLUMNS)
else:
    final_tracking_df = pd.DataFrame(all_tracking_records_post)

    final_tracking_df = final_tracking_df[FINAL_TRACKING_COLUMNS].sort_values(
        by=['video_id', 'track_id', 'frame_num']
    ).reset_index(drop=True)

    print(f" LOG: DataFrame POST-PP created (Total {len(final_tracking_df)} record).")

    save_tracking_results_to_csv(final_tracking_df, video_id="consolidated_post_processed")



In [None]:
# Video to render with the final (post-processed) tracks.
TARGET_VIDEO_FILENAME = "58102_002798_Endzone.mp4"
TARGET_VIDEO_FOLDER = TARGET_VIDEO_FILENAME.replace('.mp4', '')

# Candidate folders that may contain the video file, checked in order.
possible_roots = [
    "/content/datasets/Dataset/Videos_dataset/train",
    "/content/datasets/Dataset/Videos_dataset/test",
    "/content/datasets/Dataset/Videos_dataset"
]

found_video_path = None
for root in possible_roots:
    path = os.path.join(root, TARGET_VIDEO_FILENAME)
    if os.path.exists(path):
        found_video_path = root
        break

if not found_video_path:
    print(f" ERROR: {TARGET_VIDEO_FILENAME}!")
    VIDEOS_DATA = "/content/datasets/Dataset/Videos_dataset"
else:
    VIDEOS_DATA = found_video_path
    # visualize_and_save_video() resolves the file against the module-level
    # VIDEOS_ROOT, so the located folder must be stored there; otherwise the
    # search above has no effect on which file is opened.
    VIDEOS_ROOT = found_video_path

if 'final_tracking_df' in globals():
    df_source = final_tracking_df
else:
    print("Error.")
    df_source = pd.DataFrame()

if not df_source.empty:
    # Accept a few alternative names for the video identifier column.
    col_video = next((c for c in ['video_id', 'video_folder', 'video'] if c in df_source.columns), None)

    if col_video:
        # regex=False: the folder name is a literal, not a pattern.
        is_target = df_source[col_video].astype(str).str.contains(TARGET_VIDEO_FOLDER, regex=False)
        df_target = df_source[is_target].copy()
    else:
        df_target = pd.DataFrame()

    if not df_target.empty:
        # frame number -> list of {'id', 'bbox'} in the layout expected by
        # visualize_and_save_video().
        vis_tracks = defaultdict(list)

        for _, row in df_target.iterrows():
            frame = int(row.get('frame_num', row.get('frame')))
            tid = int(row['track_id'])

            if 'x_min' in row:
                x1, y1 = int(row['x_min']), int(row['y_min'])
                x2, y2 = int(row['x_max']), int(row['y_max'])
            else:
                # Alternative schema: top-left corner plus width/height.
                x1, y1 = int(row['left']), int(row['top'])
                x2 = int(x1 + row['width'])
                y2 = int(y1 + row['height'])

            vis_tracks[frame].append({'id': tid, 'bbox': [x1, y1, x2, y2]})

        VIS_KEY = 'manual_check'
        vis_wrapper = {VIS_KEY: vis_tracks}

        visualize_and_save_video(
            video_folder_name=TARGET_VIDEO_FOLDER,
            fps=VIDEO_FPS,
            tracking_data=vis_wrapper,
            data_key=VIS_KEY,
            suffix='_FINAL_CHECK'
        )
    else:
        print(f" No tracking data for: {TARGET_VIDEO_FILENAME}.")
else:
    print(" DataFrame Empty.")