# Download SoccerNet

In [None]:
# Installazione librerie necessarie
!pip install SoccerNet

import os
from SoccerNet.Downloader import SoccerNetDownloader

# Impostiamo un percorso universale (una cartella 'dataset' locale al notebook)
base_dir = './dataset_soccernet'

if not os.path.exists(base_dir):
    os.makedirs(base_dir)
    print(f"Creata cartella: {base_dir}")
else:
    print(f"Cartella gi√† esistente: {base_dir}")

# Inizializza il downloader
mySoccerNetDownloader = SoccerNetDownloader(LocalDirectory=base_dir)

# --- 1. Scaricare i dati di TRACKING ---
print("Inizio download Tracking Data...")
mySoccerNetDownloader.downloadDataTask(task="tracking", split=["challenge"])

# --- 2. Scaricare i dati di RE-IDENTIFICATION ---
print("Inizio download Re-ID Data...")
mySoccerNetDownloader.downloadDataTask(task="reid", split=["train", "valid", "test"])


# Conversione training per YOLO



In [None]:
# ==========================================
# 1. SETUP ENVIRONMENT & LIBRARIES
# ==========================================
import os
import shutil
import configparser
import glob
from tqdm.notebook import tqdm
from IPython.display import clear_output

# Installa Ultralytics (YOLO) se non presente
try:
    import ultralytics
    print("‚úÖ Ultralytics gi√† installato.")
except ImportError:
    print("‚¨áÔ∏è Installazione Ultralytics in corso...")
    !pip install ultralytics
    clear_output()
    print("‚úÖ Installazione completata.")

import torch
if torch.cuda.is_available():
    print(f"üî• GPU Attiva: {torch.cuda.get_device_name(0)}")
else:
    print("‚ö†Ô∏è ATTENZIONE: Stai usando la CPU! Attiva un acceleratore hardware per il training.")

# ==========================================
# 2. CONFIGURAZIONE PERCORSI (Relative Paths)
# ==========================================
# Istruzione per chi clona la repo: Assicurati di avere il file zip del dataset
# nella stessa cartella di questo notebook, oppure modifica 'LOCAL_ZIP_PATH'.
LOCAL_ZIP_PATH = './test.zip'
RAW_DATA_DIR = './dataset_raw'
YOLO_DATA_DIR = './dataset_yolo'

# ==========================================
# 3. ESTRAZIONE DATASET
# ==========================================
if not os.path.exists(RAW_DATA_DIR):
    if os.path.exists(LOCAL_ZIP_PATH):
        print(f"‚è≥ Estrazione di {LOCAL_ZIP_PATH} in corso...")
        shutil.unpack_archive(LOCAL_ZIP_PATH, RAW_DATA_DIR)
        print(f"‚úÖ Dataset estratto in: {RAW_DATA_DIR}")
    else:
        print(f"‚ö†Ô∏è ATTENZIONE: File {LOCAL_ZIP_PATH} non trovato. "
              f"Se non hai i dati grezzi, assicurati di scaricarli prima di procedere.")
else:
    print(f"‚úÖ Dataset gi√† presente in: {RAW_DATA_DIR}. Salto l'estrazione.")

# ==========================================
# 4. PREPROCESSING & CUSTOM FILTERING
# ==========================================
# Questa sezione converte le annotazioni nel formato YOLO
# e implementa un filtro personalizzato per escludere la palla (ball tracklets).

def get_ball_ids(ini_path):
    """Legge il file INI e restituisce gli ID corrispondenti alla palla."""
    ball_ids = set()
    if not os.path.exists(ini_path): return ball_ids

    try:
        with open(ini_path, 'r', encoding='utf-8', errors='ignore') as f:
            for line in f:
                line = line.strip()
                if line.startswith('trackletID_') and 'ball' in line.lower():
                    try:
                        key_part = line.split('=')[0].strip()
                        track_id = int(key_part.split('_')[1])
                        ball_ids.add(track_id)
                    except (IndexError, ValueError):
                        continue
    except Exception as e:
        print(f"‚ö†Ô∏è Errore lettura INI {ini_path}: {e}")
    return ball_ids

def convert_soccernet_clean(source_dir, dest_dir):
    """Converte le annotazioni SoccerNet in formato YOLO filtrando la palla."""
    if os.path.exists(dest_dir):
        print(f"üßπ Pulizia cartella destinazione ({dest_dir})...")
        shutil.rmtree(dest_dir)

    img_dest_path = os.path.join(dest_dir, 'images', 'train')
    lbl_dest_path = os.path.join(dest_dir, 'labels', 'train')
    os.makedirs(img_dest_path, exist_ok=True)
    os.makedirs(lbl_dest_path, exist_ok=True)

    print("üîç Ricerca file gt.txt in corso...")
    gt_files = glob.glob(os.path.join(source_dir, '**', 'gt.txt'), recursive=True)

    if not gt_files:
        print("‚ùå ERRORE: Nessun file gt.txt trovato nella cartella sorgente!")
        return

    print(f"üìÇ Trovate {len(gt_files)} sequenze. Inizio conversione...")

    total_frames = 0
    total_boxes = 0
    removed_balls = 0

    for gt_path in gt_files:
        gt_folder = os.path.dirname(gt_path)
        seq_folder = os.path.dirname(gt_folder)
        seq_name = os.path.basename(seq_folder)
        img_folder = os.path.join(seq_folder, 'img1')

        ini_file = os.path.join(seq_folder, 'gameinfo.ini')
        if not os.path.exists(ini_file):
            ini_file = os.path.join(seq_folder, 'seqinfo.ini')

        if not os.path.exists(img_folder): continue

        images = []
        for ext in ('*.jpg', '*.jpeg', '*.png', '*.JPG'):
            images.extend(glob.glob(os.path.join(img_folder, ext)))

        if not images: continue

        ids_to_ignore = get_ball_ids(ini_file)

        W, H = 1920, 1080
        if os.path.exists(ini_file):
            cfg = configparser.ConfigParser()
            try:
                cfg.read(ini_file)
                if 'Sequence' in cfg:
                    W = int(cfg['Sequence'].get('imWidth', 1920))
                    H = int(cfg['Sequence'].get('imHeight', 1080))
            except: pass

        anns = {}
        with open(gt_path, 'r') as f:
            for line in f:
                parts = line.strip().split(',')
                if len(parts) < 6: continue
                try:
                    fid, obj_id = int(parts[0]), int(parts[1])

                    # Filtro palla
                    if obj_id in ids_to_ignore:
                        removed_balls += 1
                        continue

                    x, y, w, h = float(parts[2]), float(parts[3]), float(parts[4]), float(parts[5])
                    xc = max(0.0, min(1.0, (x + w/2) / W))
                    yc = max(0.0, min(1.0, (y + h/2) / H))
                    wn = max(0.0, min(1.0, w / W))
                    hn = max(0.0, min(1.0, h / H))

                    label_str = f"0 {xc:.6f} {yc:.6f} {wn:.6f} {hn:.6f}"
                    if fid not in anns: anns[fid] = []
                    anns[fid].append(label_str)
                    total_boxes += 1
                except ValueError: continue

        images = sorted([os.path.basename(x) for x in images])
        for fname in tqdm(images, desc=f"{seq_name}", leave=False):
            try: fid = int(fname.split('.')[0])
            except: continue

            new_name = f"{seq_name}_{fname}"
            shutil.copy(os.path.join(img_folder, fname), os.path.join(img_dest_path, new_name))

            txt_name = os.path.splitext(new_name)[0] + ".txt"
            with open(os.path.join(lbl_dest_path, txt_name), 'w') as f_out:
                if fid in anns:
                    for line in anns[fid]:
                        f_out.write(line + '\n')
            total_frames += 1

    print("\n" + "="*50)
    print("‚úÖ DATASET FORMATTATO CON SUCCESSO")
    print(f"üìÅ Immagini processate: {total_frames}")
    print(f"üì¶ Box salvati (Player/Ref/GK): {total_boxes}")
    print(f"‚öΩ Box PALLA filtrati e rimossi: {removed_balls}")
    print("="*50)

# Esegui la conversione solo se la cartella RAW esiste ed √® stata estratta
if os.path.exists(RAW_DATA_DIR) and os.path.isdir(RAW_DATA_DIR):
    # Controllo rapido per evitare di lanciare la funzione se la cartella √® vuota
    if any(os.scandir(RAW_DATA_DIR)):
        convert_soccernet_clean(RAW_DATA_DIR, YOLO_DATA_DIR)

#YOLOv11M Standard
Addestramento del backbone

Visualizzazione labels

In [None]:
# ==========================================
# 5. DATASET VISUALIZATION (Sanity Check)
# ==========================================
import cv2
import matplotlib.pyplot as plt
import os

def visualize_sample(image_filename, split='train', dataset_root='./dataset_yolo'):
    """Visualizza un'immagine e le sue bounding box di ground truth."""
    if not image_filename.endswith('.jpg'): image_filename += '.jpg'
    image_name_no_ext = image_filename.replace('.jpg', '')
    txt_filename = image_name_no_ext + '.txt'

    img_path = os.path.join(dataset_root, 'images', split, image_filename)
    lbl_path = os.path.join(dataset_root, 'labels', split, txt_filename)

    if not os.path.exists(img_path):
        print(f"‚ùå Immagine non trovata: {img_path}")
        return

    img = cv2.imread(img_path)
    img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    h_img, w_img, _ = img.shape

    if os.path.exists(lbl_path):
        with open(lbl_path, 'r') as f:
            for line in f:
                parts = line.strip().split()
                class_id, x_center_norm, y_center_norm, width_norm, height_norm = map(float, parts[:5])

                # Conversione YOLO -> Pixel
                x_center, y_center = x_center_norm * w_img, y_center_norm * h_img
                box_w, box_h = width_norm * w_img, height_norm * h_img

                x1, y1 = int(x_center - box_w / 2), int(y_center - box_h / 2)
                x2, y2 = int(x_center + box_w / 2), int(y_center + box_h / 2)

                cv2.rectangle(img_rgb, (x1, y1), (x2, y2), (0, 255, 0), 2)
                cv2.putText(img_rgb, "Player", (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    plt.figure(figsize=(10, 6))
    plt.imshow(img_rgb)
    plt.axis('off')
    plt.title(f"Sample: {image_name_no_ext} | Split: {split}")
    plt.show()

visualize_sample('SNMOT-060_000001.jpg', split='train')

Training


In [None]:
# ==========================================
# 6. YOLO CONFIGURATION & TRAINING
# ==========================================
import yaml
from ultralytics import YOLO

# 1. Creazione file YAML per il dataset
dataset_config = {
    'path': './dataset_yolo',  # Percorso relativo radice
    'train': 'images/train',
    'val': 'images/test',      # Usiamo test come validazione se non c'√® val
    'names': {0: 'player'}
}

yaml_filename = 'soccernet.yaml'
with open(yaml_filename, 'w') as f:
    yaml.dump(dataset_config, f, default_flow_style=False)
print(f"‚úÖ Configurazione {yaml_filename} creata!")

# 2. Avvio Addestramento
print("\nüöÄ Avvio Training YOLOv11m...")
model = YOLO('yolo11m.pt')

# Addestramento con salvataggio locale nella cartella './runs'
results = model.train(
    data=yaml_filename,
    imgsz=1088,                # Risoluzione alta per il calcio
    epochs=80,
    batch=8,                   # Abbassare se OOM
    rect=False,
    project='./runs',          # <--- Modificato: Salvataggio locale
    name='soccernet_train',    # <--- Modificato: Nome pulito
    cache=True,
    workers=4,
    optimizer='Adam',
    lr0=0.001,
    cos_lr=True,
    patience=20,
    # Augmentation strategy
    mosaic=1.0, hsv_h=0.015, hsv_s=0.7, hsv_v=0.4, fliplr=0.5,
    scale=0.5, blur=0.1, degrees=0.0, mixup=0.0, copy_paste=0.0, close_mosaic=10
)

print("üèÜ Training completato con successo!")

Validazione e metriche

In [None]:
# ==========================================
# 7. VALIDATION & INFERENCE
# ==========================================
import os
from ultralytics import YOLO

# Percorsi relativi aggiornati in base alla cella di training
best_weights = './runs/soccernet_train/weights/best.pt'
yaml_path = 'soccernet.yaml'
test_images_path = './dataset_yolo/images/test'

if not os.path.exists(best_weights):
    print(f"‚ùå Errore: Pesi non trovati in {best_weights}. Eseguire prima il training.")
else:
    print(f"‚úÖ Modello caricato da: {best_weights}")
    model = YOLO(best_weights)

    # --- VALIDAZIONE ---
    print("\nüìä Avvio Validazione sul set di Test...")
    metrics = model.val(
        data=yaml_path,
        split='val',
        imgsz=1088,
        batch=8,
        augment=True,
        conf=0.25,
        iou=0.6,
        plots=True,
        save_json=True,
        project='./runs',
        name='soccernet_val'
    )

    print("\nüèÜ REPORT METRICHE:")
    print(f"üéØ Precision:  {metrics.box.mp:.4f}")
    print(f"üì° Recall:     {metrics.box.mr:.4f}")
    print(f"üìè mAP@50:     {metrics.box.map50:.4f}")
    print(f"üìê mAP@50-95:  {metrics.box.map:.4f}")

    # --- INFERENZA VISIVA ---
    print("\nüé® Avvio generazione predizioni visive (Inference)...")
    # Non serve il ciclo for se usiamo il parametro stream=False.
    # YOLO salva tutto automaticamente grazie a save=True.
    predictions = model.predict(
        source=test_images_path,
        imgsz=1088,
        conf=0.25,
        iou=0.45,
        max_det=50,
        line_width=3,
        show_labels=True,
        show_conf=True,
        augment=True,
        save=True,
        project='./runs',
        name='soccernet_predictions'
    )

    print("\n‚úÖ Predizioni salvate! Puoi trovarle nella cartella './runs/soccernet_predictions'")

# Fine tuning YOLO11 per occlusioni e blur

Train


In [None]:
# ==========================================
# 7. PHASE 2: ADVANCED FINE-TUNING (Motion Blur & Occlusion Handling)
# ==========================================
# Dopo un primo addestramento, implementiamo una pipeline aggressiva per
# risolvere i casi critici: giocatori in corsa (motion blur), telecamere
# fuori fuoco e occlusioni tra giocatori (mixup ed erasing).

import albumentations as A
from ultralytics import YOLO
from ultralytics.data import augment

# --- 1. Patching Ultralytics con Albumentations Custom ---
def custom_albumentations(self, p=1.0, **kwargs):
    self.p = p
    self.transform = None
    self.contains_spatial = False
    try:
        # Pipeline aggressiva specifica per SoccerNet / Sports Tracking
        self.transform = A.Compose([
            # --- BLUR (Anti-Ghosting per movimenti rapidi) ---
            A.OneOf([
                A.MotionBlur(blur_limit=(15, 35), p=0.6),
                A.ZoomBlur(max_factor=1.3, step_factor=0.02, p=0.3),
            ], p=0.6),

            # --- LUCE E QUALIT√Ä (Riprese notturne o artefatti) ---
            A.CLAHE(clip_limit=3.0, tile_grid_size=(8, 8), p=0.3),
            A.GaussNoise(var_limit=(20.0, 80.0), p=0.2),
            A.ImageCompression(quality_lower=60, quality_upper=90, p=0.2),

            # --- COLORE E OCCLUSIONI PARZIALI ---
            A.RandomBrightnessContrast(brightness_limit=0.25, contrast_limit=0.25, p=0.4),
            A.CoarseDropout(max_holes=8, max_height=32, max_width=32, min_holes=2, p=0.2),
        ], bbox_params=A.BboxParams(format='yolo'))
    except ImportError:
        print("‚ö†Ô∏è Albumentations non installato o errore nella composizione.")

# Applichiamo la patch iniettandola nella classe di Ultralytics
augment.Albumentations.__init__ = custom_albumentations
print("‚úÖ Custom Albumentations Pipeline iniettata con successo!")

# --- 2. Avvio Fine-Tuning ---
# Partiamo dai pesi migliori ottenuti nel training precedente
previous_best_weights = './runs/soccernet_train/weights/best.pt'

print(f"\nüöÄ Avvio Fine-Tuning partendo da: {previous_best_weights}")
model_finetune = YOLO(previous_best_weights)

# Addestramento mirato (Fine-Tuning)
results_finetune = model_finetune.train(
    data='soccernet.yaml',
    epochs=100,
    imgsz=1088,
    batch=16,          # Se va in Out Of Memory, abbassa a 8
    patience=20,
    workers=4,
    cache=True,

    # Ottimizzazione specifica per Fine-Tuning
    optimizer='adamw', # AdamW gestisce meglio il weight decay
    lr0=0.002,         # Learning rate pi√π basso per non distruggere i pesi precedenti
    cos_lr=True,

    # --- AUGMENTATION GEOMETRICHE & COLLISIONI ---
    mosaic=1.0,
    mixup=0.25,        # CRUCIALE: Insegna che due giocatori possono sovrapporsi
    erasing=0.5,       # CRUCIALE: Simula occlusioni parziali (gambe tagliate, ecc.)
    scale=0.4,
    degrees=0.0,
    fliplr=0.5,
    flipud=0.0,
    close_mosaic=10,   # Stabilizza le ultime 10 epoche su immagini reali pure

    # Salvataggio
    project='./runs',
    name='soccernet_finetune_blur'
)

print("üèÜ Fine-Tuning completato! Il modello √® ora robusto a blur e occlusioni.")

# RT-DETR L

Configurazione dataset


In [None]:
# ==========================================
# 1. RT-DETR CONFIGURATION & SETUP
# ==========================================
import yaml
import os

# Creazione del file YAML per RT-DETR con percorsi relativi
dataset_config = {
    'path': './dataset_rtdetr',  # Assicurati di avere i dati in questa cartella
    'train': 'images/train',
    'val': 'images/test',        # Usiamo il test set come validazione
    'names': {0: 'player'}
}

yaml_filename = 'soccernet_rtdetr.yaml'
with open(yaml_filename, 'w') as f:
    yaml.dump(dataset_config, f, default_flow_style=False)

print(f"‚úÖ File {yaml_filename} creato con successo!")

definizione classi e augmentation custom



In [None]:
# ==========================================
# 2. ADVANCED CROWD AUGMENTATION ENGINE
# ==========================================
# Implementazione di una pipeline di Copy-Paste intelligente:
# Estrae giocatori esistenti e li incolla casualmente SOLO sull'erba (tramite maschera convessa),
# simulando situazioni di "Crowd" (affollamento) e occlusioni tipiche del calcio.

import cv2
import numpy as np
import random
import albumentations as A
import ultralytics
from ultralytics.data import augment
from ultralytics.utils.instance import Instances

def get_field_mask_fast(image_bgr):
    """Genera una maschera convessa dell'erba per incollare i giocatori in zone sicure."""
    h, w = image_bgr.shape[:2]
    hsv = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2HSV)
    mask = cv2.inRange(hsv, (35, 50, 50), (85, 255, 255))

    kernel_open = cv2.getStructuringElement(cv2.MORPH_RECT, (5, 5))
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel_open)

    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    if not contours: return np.zeros((h, w), dtype=np.uint8)

    largest_cnt = max(contours, key=cv2.contourArea)
    if cv2.contourArea(largest_cnt) < (h * w * 0.05):
        return np.zeros((h, w), dtype=np.uint8)

    hull = cv2.convexHull(largest_cnt)
    safe_mask = np.zeros((h, w), dtype=np.uint8)
    cv2.drawContours(safe_mask, [hull], -1, 255, thickness=cv2.FILLED)

    erosion_size = int(h * 0.05)
    kernel_erode = cv2.getStructuringElement(cv2.MORPH_RECT, (erosion_size, erosion_size))
    return cv2.erode(safe_mask, kernel_erode, iterations=1)

def apply_soccer_copy_paste_crowd(image, bboxes_xyxy, classes, p=0.5):
    """Incolla cloni di giocatori esistenti per aumentare la densit√† della folla."""
    if random.random() > p or len(bboxes_xyxy) == 0:
        return image, bboxes_xyxy, classes

    h_img, w_img = image.shape[:2]
    field_mask = get_field_mask_fast(image)

    if cv2.countNonZero(field_mask) < (h_img * w_img * 0.05):
        return image, bboxes_xyxy, classes

    out_bboxes, out_classes = list(bboxes_xyxy), list(classes)
    num_to_paste = random.randint(5, 15)
    valid_indices = list(range(len(bboxes_xyxy)))

    for _ in range(num_to_paste):
        if not valid_indices: break
        idx = random.choice(valid_indices)
        x1, y1, x2, y2 = map(int, bboxes_xyxy[idx])
        cls_id = classes[idx]

        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(w_img, x2), min(h_img, y2)
        w_obj, h_obj = x2 - x1, y2 - y1

        if w_obj <= 5 or h_obj <= 5: continue
        patch = image[y1:y2, x1:x2].copy()

        for _ in range(15): # Max 15 tentativi di posizionamento
            rx = random.randint(0, w_img - w_obj)
            ry = random.randint(0, h_img - h_obj)
            feet_x, feet_y = min(rx + w_obj // 2, w_img - 1), min(ry + h_obj, h_img - 1)

            if field_mask[feet_y, feet_x] > 0:
                image[ry:ry+h_obj, rx:rx+w_obj] = patch
                out_bboxes.append([rx, ry, rx+w_obj, ry+h_obj])
                out_classes.append(cls_id)
                break

    return image, np.array(out_bboxes, dtype=np.float32), np.array(out_classes, dtype=np.float32)

class CustomSoccerAugmentV5_Final:
    """Classe custom per iniettare Copy-Paste e trasformazioni Albumentations."""
    def __init__(self, p=1.0):
        self.p = p
        self.transform = A.Compose([
            A.OneOf([A.MotionBlur(blur_limit=(10, 25), p=0.6), A.ZoomBlur(max_factor=1.15, step_factor=0.02, p=0.3)], p=0.5),
            A.GaussNoise(var_limit=(20.0, 60.0), p=0.1),
            A.RandomBrightnessContrast(p=0.4),
            A.Affine(shear={'x': (-10, 10)}, p=0.3),
        ], bbox_params=A.BboxParams(format='pascal_voc', label_fields=['class_labels'], min_visibility=0.1))

    def __call__(self, labels):
        img = labels.get('img')
        if img is None or 'instances' not in labels: return labels

        h, w = img.shape[:2]
        labels['shape'] = (h, w)
        orig_instances = labels['instances']

        try:
            bboxes_xyxy, classes = labels['instances'].bboxes, labels['cls'].squeeze()
            if classes.ndim == 0: classes = classes.reshape(-1)

            img_aug, aug_bboxes, aug_classes = apply_soccer_copy_paste_crowd(img, bboxes_xyxy, classes, p=0.6)

            if not np.isfinite(aug_bboxes).all(): raise ValueError("NaN in CopyPaste boxes")

            safe_bboxes, safe_classes = [], []
            h_aug, w_aug = img_aug.shape[:2]
            for box, cls in zip(aug_bboxes, aug_classes):
                x1, y1, x2, y2 = np.clip(box, 0, [w_aug-1, h_aug-1, w_aug-1, h_aug-1])
                if (x2 > x1 + 2) and (y2 > y1 + 2):
                    safe_bboxes.append([x1, y1, x2, y2])
                    safe_classes.append(cls)

            if not safe_bboxes: raise ValueError("No boxes left after clipping")

            transformed = self.transform(image=img_aug, bboxes=safe_bboxes, class_labels=safe_classes)
            final_img = transformed['image']
            final_boxes = np.array(transformed['bboxes'], dtype=np.float32)

            if not np.isfinite(final_boxes).all(): raise ValueError("NaN after Albumentations")

            labels['img'] = final_img
            labels['cls'] = np.array(transformed['class_labels'], dtype=np.float32).reshape(-1, 1)
            labels['shape'] = final_img.shape[:2]

            new_inst = Instances(final_boxes, segments=np.zeros((0, 2), dtype=np.float32), bbox_format="xyxy", normalized=False)
            new_inst.shape = final_img.shape[:2]
            labels['instances'] = new_inst
            labels['bboxes'] = final_boxes
            return labels

        except Exception:
            labels['instances'] = orig_instances
            labels['shape'] = (h, w)
            return labels

class AlbumentationsHijack:
    def __init__(self, *args, **kwargs):
        self.crowd_transform = CustomSoccerAugmentV5_Final(p=0.7)
    def __call__(self, labels):
        return self.crowd_transform(labels)

# Iniezione della classe custom all'interno del motore di Ultralytics
ultralytics.data.augment.Albumentations = AlbumentationsHijack
print("‚úÖ HIJACK ATTIVO: Extreme Crowd Augmentation V2 inizializzata con successo!")

‚úÖ HIJACK ATTIVO: Usa CustomSoccerAugmentV4_Crowd_Final


train

In [None]:
# ==========================================
# 3. RT-DETR TRAINING
# ==========================================
from ultralytics import YOLO

print("üöÄ Avvio Training RT-DETR-Large...")

# Ultralytics supporta RT-DETR nativamente usando la stessa API di YOLO
model = YOLO('rtdetr-l.pt')

results = model.train(
    data='soccernet_rtdetr.yaml',
    epochs=100,
    imgsz=1088,
    batch=8,           # Assicurati di avere abbastanza VRAM per un Transformer
    patience=20,

    # --- AUGMENTATION STRUTTURALI ---
    mosaic=1.0,
    mixup=0.2,         # Fondamentale per gestire collisioni nel Crowd
    erasing=0.4,       # Simula occlusioni
    scale=0.5,
    fliplr=0.5,
    flipud=0.0,

    # --- PERFORMANCE & OTTIMIZZAZIONE ---
    workers=8,
    cache='disk',      # Usa 'ram' se hai molta memoria di sistema
    optimizer='adamW', # AdamW √® fortemente raccomandato per i Transformer (rispetto a SGD)
    lr0=0.001,
    lrf=0.01,
    close_mosaic=15,   # Stabilizzazione finale su dati reali

    # --- SALVATAGGIO LOCALE ---
    project='./runs/RtDetr',
    name='Crowd_Blur_Extreme_Tuned'
)

print("üèÜ Training RT-DETR completato!")

Validazione e metriche

In [None]:
# ==========================================
# 4. RT-DETR VALIDATION & METRICS
# ==========================================
from ultralytics import YOLO
import os

# Percorso relativo generato automaticamente dalla cella precedente
weights_path = './runs/RtDetr/Crowd_Blur_Extreme_Tuned/weights/best.pt'
yaml_path = 'soccernet_rtdetr.yaml'

if os.path.exists(weights_path):
    print(f"‚úÖ Caricamento modello RT-DETR da: {weights_path}")
    model = YOLO(weights_path)

    print("üìä Avvio Validazione sul Test Set...")

    metrics = model.val(
        data=yaml_path,
        split='val',
        imgsz=1088,
        batch=8,
        augment=True,
        conf=0.25,
        iou=0.6,
        plots=True,
        save_json=True,
        project='./runs/RtDetr',
        name='rtdetr_metrics',
        exist_ok=True
    )

    print("\n" + "="*40)
    print("üèÜ REPORT DI VALIDAZIONE RT-DETR")
    print("="*40)
    print(f"üéØ Precision (Media):  {metrics.box.mp:.4f}")
    print(f"üì° Recall (Media):     {metrics.box.mr:.4f}")
    print(f"üìè mAP @ 50%:          {metrics.box.map50:.4f}")
    print(f"üìê mAP @ 50-95%:       {metrics.box.map:.4f}")

    if metrics.box.nc > 1:
        print("\n--- Dettaglio per Classe ---")
        for i, c in enumerate(metrics.names.values()):
            print(f"   {c}: P={metrics.box.p[i]:.3f}, R={metrics.box.r[i]:.3f}")

    print("\nüìÇ Grafici e metriche salvati in: ./runs/RtDetr/rtdetr_metrics")
else:
    print(f"‚ùå Errore: Il file pesi non esiste in {weights_path}. Eseguire prima il training.")

# Tracking - Conversione del dataset reid

In [None]:
# ==========================================
# 1. RE-ID DATASET PREPARATION & CLEANING
# ==========================================
# In questa fase estraiamo i metadati dal JSON di SoccerNet, creiamo ID univoci (Hash MD5)
# per i giocatori attraverso diverse telecamere e filtriamo le identit√† con troppe poche
# immagini, un passaggio cruciale per generare batch validi per la Triplet Loss.

import os
import json
import shutil
import hashlib
from pathlib import Path
from tqdm import tqdm

# --- CONFIGURAZIONE PERCORSI RELATIVI ---
RAW_DATASET_ROOT = Path('./dataset_reid_raw')
CLEAN_DATASET_ROOT = Path('./dataset_reid_clean')

def get_unique_pid(relative_path, clazz, player_id):
    """Genera un ID univoco crittografico (MD5) basato su Partita + Squadra + PlayerID"""
    raw_string = f"{relative_path}_{clazz}_{player_id}"
    return hashlib.md5(raw_string.encode()).hexdigest()[:10]

def construct_filename(item):
    """Ricostruisce il filename originale dai metadati JSON"""
    return f"{item['bbox_idx']}-{item['action_idx']}-{item['person_uid']}-{item['frame_idx']}-{item['clazz']}-{item['id']}-{item['UAI']}-{item['height']}x{item['width']}.png"

def restructure_reid_data(split='train'):
    """Estrae le immagini usando il JSON e le raggruppa in cartelle per ID univoco."""
    source_split_dir = RAW_DATASET_ROOT / split
    target_split_dir = CLEAN_DATASET_ROOT / split
    json_path = source_split_dir / 'bbox_info.json' # Adatta il nome se necessario

    if not json_path.exists():
        print(f"‚ö†Ô∏è JSON non trovato per lo split '{split}' in {json_path}. Salto.")
        return

    if target_split_dir.exists():
        shutil.rmtree(target_split_dir)
    target_split_dir.mkdir(parents=True, exist_ok=True)

    with open(json_path, 'r') as f:
        data = json.load(f)

    # Adattamento per strutture JSON piatte o annidate
    items = list(data.values()) if isinstance(data, dict) else data

    unique_identities_map = {}
    pid_counter = 0
    moved = 0

    print(f"\nüöÄ Ristrutturazione {split.upper()} set in corso...")
    for item in tqdm(items, desc=f"Processing {split}"):
        try:
            player_id = str(item.get('id', 'None'))
            if player_id == "None": continue # Scartiamo le identit√† non note

            unique_hash = get_unique_pid(item['relative_path'], item['clazz'], player_id)
            if unique_hash not in unique_identities_map:
                unique_identities_map[unique_hash] = f"{pid_counter:05d}"
                pid_counter += 1

            pid_folder = unique_identities_map[unique_hash]
            filename = construct_filename(item)

            # Cerca il file sorgente (gestendo possibili discrepanze di path)
            src_path = source_split_dir / item['relative_path'] / filename
            if not src_path.exists():
                src_path = source_split_dir / split / item['relative_path'] / filename # Fallback nested

            if src_path.exists():
                dest_dir = target_split_dir / pid_folder
                dest_dir.mkdir(exist_ok=True)
                shutil.copy2(src_path, dest_dir / filename)
                moved += 1

        except Exception as e: continue

    print(f"‚úÖ {split.upper()}: Create {pid_counter} identit√†, {moved} immagini spostate.")

def clean_reid_dataset(target_dir, min_images=4):
    """
    Filtro vitale per Triplet Loss: scarta identit√† con meno di `min_images`.
    La Triplet Loss richiede Anchor, Positive e Negative, quindi servono pi√π scatti per ID.
    """
    if not target_dir.exists(): return

    trash_dir = CLEAN_DATASET_ROOT / 'trash_bin'
    trash_dir.mkdir(exist_ok=True)

    classes = [d for d in os.listdir(target_dir) if os.path.isdir(target_dir / d)]
    moved_count = 0

    print(f"\nüßπ Pulizia {target_dir.name}: Rimuovo classi con < {min_images} immagini...")
    for class_name in tqdm(classes, desc="Filtering IDs"):
        class_path = target_dir / class_name
        imgs = [f for f in os.listdir(class_path) if f.lower().endswith(('.png', '.jpg'))]

        if len(imgs) < min_images:
            shutil.move(str(class_path), str(trash_dir / f"{target_dir.name}_{class_name}"))
            moved_count += 1

    print(f"üóëÔ∏è Scartate {moved_count} classi. Rimaste: {len(classes) - moved_count} classi valide per il training.")

# --- ESECUZIONE DELLA PIPELINE ---
if RAW_DATASET_ROOT.exists():
    for current_split in ['train', 'valid', 'test']:
        restructure_reid_data(current_split)
        clean_reid_dataset(CLEAN_DATASET_ROOT / current_split, min_images=4)
else:
    print("‚ö†Ô∏è Cartella dati RAW non trovata. Inserire i dati in './dataset_reid_raw' per eseguire.")

# Organizzazione del dataset di tracking per Re-ID

In [None]:
# ==========================================
# 1. RE-ID DATASET GENERATION (Crops Extraction)
# ==========================================
import os
import cv2
from tqdm.notebook import tqdm
from collections import defaultdict
import shutil

# --- CONFIGURAZIONE PERCORSI RELATIVI ---
SOURCE_ROOT = './dataset_soccernet_yolo'
OUTPUT_ROOT = './soccernet_reid_robust'

TRAIN_SEQUENCES = ['SNMOT-060', 'SNMOT-065', 'SNMOT-070', 'SNMOT-097', 'SNMOT-107']
TEST_SEQUENCES = ['SNMOT-116', 'SNMOT-129', 'SNMOT-130', 'SNMOT-141']

# Variabili globali per tracciare le identit√† univoche attraverso le sequenze
global_identity_map = {}
next_global_pid = 0

def parse_gameinfo(ini_path):
    """Estrae GameID e Tempo di gioco per creare identit√† univoche."""
    game_id, half_period = None, "1"
    tracklet_map = {}
    with open(ini_path, 'r') as f:
        for line in f:
            line = line.strip()
            if 'gameID=' in line: game_id = line.split('=')[1].strip()
            if 'gameTimeStart=' in line:
                time_val = line.split('=')[1].strip()
                half_period = time_val.split(' - ')[0].strip() if ' - ' in time_val else time_val[0]
            if line.startswith('trackletID_'):
                parts = line.split('=')
                local_id = int(parts[0].split('_')[1])
                tracklet_map[local_id] = parts[1].strip()
    return game_id, half_period, tracklet_map

def extract_crops(sequences, split_type='train', gallery_freq=10):
    """Estrae i crop dei giocatori e li salva in Train o Query/Gallery."""
    global next_global_pid
    split_source = os.path.join(SOURCE_ROOT, 'train' if split_type == 'train' else 'test')

    print(f"\nüöÄ Inizio estrazione {split_type.upper()} SET...")

    for seq_name in sequences:
        seq_path = os.path.join(split_source, seq_name)
        ini_path, gt_path = os.path.join(seq_path, 'gameinfo.ini'), os.path.join(seq_path, 'gt', 'gt.txt')
        img_dir = os.path.join(seq_path, 'img1')

        if not os.path.exists(ini_path) or not os.path.exists(gt_path): continue

        game_id, half_period, tracklet_map = parse_gameinfo(ini_path)
        if not game_id: continue

        # Mapping Locale -> Globale
        local_to_global = {}
        for local_id, label in tracklet_map.items():
            if 'ball' in label.lower(): continue
            identity_key = (game_id, half_period, label)
            if identity_key not in global_identity_map:
                global_identity_map[identity_key] = next_global_pid
                next_global_pid += 1
            local_to_global[local_id] = global_identity_map[identity_key]

        # Lettura GT
        frame_data = defaultdict(list)
        with open(gt_path, 'r') as f:
            for line in f:
                p = line.strip().split(',')
                frame, obj_id, x, y, w, h = int(p[0]), int(p[1]), float(p[2]), float(p[3]), float(p[4]), float(p[5])
                if obj_id in local_to_global:
                    frame_data[frame].append((obj_id, int(x), int(y), int(w), int(h)))

        # Estrazione Immagini
        id_saved_count = defaultdict(int)
        for frame_idx in tqdm(sorted(frame_data.keys()), desc=f"Cropping {seq_name}", leave=False):
            img_path = os.path.join(img_dir, f"{frame_idx:06d}.jpg")
            if not os.path.exists(img_path): continue

            image = cv2.imread(img_path)
            if image is None: continue
            h_img, w_img, _ = image.shape

            for (local_id, x, y, w, h) in frame_data[frame_idx]:
                if w < 10 or h < 10: continue
                x1, y1, x2, y2 = max(0, x), max(0, y), min(w_img, int(x+w)), min(h_img, int(y+h))
                crop = image[y1:y2, x1:x2]

                pid = local_to_global[local_id]
                pid_str = f"{pid:05d}"
                save_name = f"{seq_name}_{frame_idx}_{local_id}.jpg"

                if split_type == 'train':
                    save_dir = os.path.join(OUTPUT_ROOT, 'train', pid_str)
                else: # Gestione Query/Gallery
                    if id_saved_count[pid] == 0:
                        save_dir = os.path.join(OUTPUT_ROOT, 'test', 'query', pid_str)
                    elif id_saved_count[pid] % gallery_freq == 0:
                        save_dir = os.path.join(OUTPUT_ROOT, 'test', 'gallery', pid_str)
                    else:
                        id_saved_count[pid] += 1
                        continue

                os.makedirs(save_dir, exist_ok=True)
                cv2.imwrite(os.path.join(save_dir, save_name), crop)
                id_saved_count[pid] += 1

# Esecuzione
if os.path.exists(OUTPUT_ROOT): shutil.rmtree(OUTPUT_ROOT)
extract_crops(TRAIN_SEQUENCES, split_type='train')
extract_crops(TEST_SEQUENCES, split_type='test', gallery_freq=10)
print(f"\n‚úÖ Generazione completata! Identit√† totali create: {next_global_pid}")

In [None]:
# ==========================================
# 2. SMART DATASET BALANCING & MIGRATION
# ==========================================
import os
import shutil
from tqdm.notebook import tqdm

TRAIN_DIR = './soccernet_reid_robust/train'
QUERY_DIR = './soccernet_reid_robust/test/query'
GALLERY_DIR = './soccernet_reid_robust/test/gallery'

def balance_dataset_smart(root_dir, keep_freq=5, min_threshold=50):
    """Decima le immagini delle classi sovrarappresentate salvaguardando la coda lunga."""
    print(f"\nüöÄ Inizio Bilanciamento in: {root_dir} (Keep 1 every {keep_freq} if total > {min_threshold})")
    stats = {"before": 0, "deleted": 0, "preserved_classes": 0}

    for pid in tqdm(sorted(os.listdir(root_dir)), desc="Balancing Classes"):
        pid_path = os.path.join(root_dir, pid)
        if not os.path.isdir(pid_path): continue

        imgs = sorted([f for f in os.listdir(pid_path) if f.endswith('.jpg')])
        stats["before"] += len(imgs)

        if len(imgs) <= min_threshold:
            stats["preserved_classes"] += 1
            continue

        for i, img_name in enumerate(imgs):
            if i % keep_freq != 0:
                os.remove(os.path.join(pid_path, img_name))
                stats["deleted"] += 1

    print(f"‚úÖ Bilanciamento completato! Rimosse {stats['deleted']} immagini superflue. "
          f"Classi povere salvaguardate: {stats['preserved_classes']}")

def migrate_test_to_train(num_classes_to_move=50):
    """Sposta classi dal Test al Train per aumentare la diversit√† del training set."""
    gallery_classes = sorted(os.listdir(GALLERY_DIR))
    if len(gallery_classes) < num_classes_to_move: return

    last_train_id = int(sorted(os.listdir(TRAIN_DIR))[-1]) if os.listdir(TRAIN_DIR) else -1
    next_id = last_train_id + 1

    print(f"\nüöÄ Migrazione di {num_classes_to_move} classi da Test a Train...")
    for old_class in tqdm(gallery_classes[:num_classes_to_move], desc="Migrating"):
        new_class_name = f"{next_id:05d}"
        dst_path = os.path.join(TRAIN_DIR, new_class_name)
        os.makedirs(dst_path, exist_ok=True)

        for src_dir in [QUERY_DIR, GALLERY_DIR]:
            src_class_path = os.path.join(src_dir, old_class)
            if os.path.exists(src_class_path):
                for img in os.listdir(src_class_path):
                    shutil.move(os.path.join(src_class_path, img), os.path.join(dst_path, img))
                os.rmdir(src_class_path)
        next_id += 1
    print(f"‚úÖ Migrazione completata! Nuovo range Train ID: fino a {next_id-1:05d}")

# Esecuzione Pipeline di Ottimizzazione
migrate_test_to_train(num_classes_to_move=30)
balance_dataset_smart(TRAIN_DIR, keep_freq=5, min_threshold=50)

In [None]:
# ==========================================
# 3. DATASET EXPLORATORY DATA ANALYSIS (EDA)
# ==========================================
import os
import numpy as np
import matplotlib.pyplot as plt

subsets = {
    'TRAIN': './soccernet_reid_robust/train',
    'QUERY': './soccernet_reid_robust/test/query',
    'GALLERY': './soccernet_reid_robust/test/gallery'
}

plt.figure(figsize=(18, 5))

for i, (name, path) in enumerate(subsets.items()):
    if not os.path.exists(path): continue

    pids = [p for p in sorted(os.listdir(path)) if os.path.isdir(os.path.join(path, p))]
    counts = np.array([len(os.listdir(os.path.join(path, pid))) for pid in pids])

    if len(counts) == 0: continue

    print(f"\nüìä {name} SET -> Classi: {len(pids)} | Img Totali: {np.sum(counts)} | "
          f"Media/Classe: {np.mean(counts):.1f} | Max Img: {np.max(counts)}")

    plt.subplot(1, 3, i+1)
    plt.hist(counts, bins=30, color='#4CAF50' if name=='TRAIN' else '#2196F3', edgecolor='black', alpha=0.7)
    plt.title(f'Distribuzione {name}\n(Tot: {len(counts)} classi)')
    plt.xlabel('Numero di Immagini per Classe')
    plt.ylabel('Frequenza (N. Classi)')
    plt.axvline(np.mean(counts), color='red', linestyle='dashed', linewidth=2, label=f'Media: {np.mean(counts):.1f}')
    plt.grid(axis='y', alpha=0.3)
    plt.legend()

plt.tight_layout()
plt.show()

# Re-Identification



In [None]:
# ==========================================
# 1. CUSTOM AUGMENTATIONS & TRANSFORMS
# ==========================================
import torch
import torchvision.transforms as T
import random
import cv2
import numpy as np
from PIL import Image

class RandomMotionBlur:
    """
    Applica un Motion Blur casuale (effetto scia).
    Simula movimenti rapidi dei giocatori o pan/tilt veloci della telecamera.
    """
    def __init__(self, p=0.5, kernel_size=(3, 10), angle_range=(-45, 45)):
        self.p = p
        self.kernel_size = kernel_size
        self.angle_range = angle_range

    def __call__(self, img):
        if random.random() > self.p: return img

        image = np.array(img)
        k = random.randint(self.kernel_size[0], self.kernel_size[1])
        if k % 2 == 0: k += 1

        # Generazione Kernel direzionale
        kernel = np.zeros((k, k))
        kernel[int((k-1)/2), :] = np.ones(k)
        kernel /= k

        # Rotazione casuale per la direzione del movimento
        angle = random.randint(self.angle_range[0], self.angle_range[1])
        if angle != 0:
            M = cv2.getRotationMatrix2D((k/2, k/2), angle, 1)
            kernel = cv2.warpAffine(kernel, M, (k, k))

        blurred = cv2.filter2D(image, -1, kernel)
        return Image.fromarray(blurred)

def build_custom_transforms(height=256, width=128, is_train=True):
    """Costruisce la pipeline di trasformazioni (PIL -> Tensor -> Normalization)."""
    if is_train:
        heavy_transforms_pil = T.Compose([
            RandomMotionBlur(p=0.5, kernel_size=(3, 9), angle_range=(-15, 15)),
            T.RandomAffine(degrees=0, translate=(0.05, 0.05), scale=(0.8, 1.2), fill=(85, 115, 85)),
            T.ColorJitter(brightness=0.2, contrast=0.15, saturation=0.15, hue=0.1),
            T.RandomApply([T.GaussianBlur(kernel_size=(5, 9), sigma=(0.1, 5))], p=0.3),
        ])

        return T.Compose([
            T.Resize((height, width)),
            T.Pad(10, padding_mode='edge'),
            T.RandomCrop((height, width)),
            T.RandomHorizontalFlip(p=0.5),
            T.RandomApply([heavy_transforms_pil], p=0.7),
            T.ToTensor(),
            T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            T.RandomErasing(p=0.5, scale=(0.02, 0.40), ratio=(0.3, 3.3), value=0) # Sul tensore!
        ])
    else:
        return T.Compose([
            T.Resize((height, width)),
            T.ToTensor(),
            T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

print("‚úÖ Trasformazioni Custom Inizializzate!")

In [None]:
# ==========================================
# 2. SOCCER-NET RE-ID DATASET CLASS
# ==========================================
import os
import torchreid
from torchreid.data import ImageDataset

class SoccerNetReID(ImageDataset):
    """
    Dataset Adapter per caricare i dati strutturati in Torchreid.
    """
    def __init__(self, root='', **kwargs):
        self.root = root
        self.train_dir = os.path.join(self.root, 'train')
        self.query_dir = os.path.join(self.root, 'test', 'query')
        self.gallery_dir = os.path.join(self.root, 'test', 'gallery')

        train = self.process_dir(self.train_dir, is_query=False)
        query = self.process_dir(self.query_dir, is_query=True)
        gallery = self.process_dir(self.gallery_dir, is_query=False)

        super().__init__(train, query, gallery, **kwargs)

    def process_dir(self, dir_path, is_query=False):
        data = []
        if not os.path.exists(dir_path): return data

        # Ordine alfabetico per garantire che gli ID (0, 1, 2...) siano sempre coerenti
        pid_dirs = sorted([p for p in os.listdir(dir_path) if os.path.isdir(os.path.join(dir_path, p))])

        for pid_idx, pid_folder in enumerate(pid_dirs):
            pid_path = os.path.join(dir_path, pid_folder)
            imgs = [f for f in os.listdir(pid_path) if f.endswith(('.png', '.jpg'))]

            if len(imgs) < 1: continue

            for img_name in imgs:
                img_path = os.path.join(pid_path, img_name)
                camid = 1 if is_query else 0 # Torchreid richiede ID camera fittizi se non noti
                data.append((img_path, pid_idx, camid))

        return data

# --- REGISTRAZIONE SICURA IN TORCHREID ---
dataset_name = 'soccernet-reid'
if dataset_name in torchreid.data.datasets.__image_datasets:
    del torchreid.data.datasets.__image_datasets[dataset_name]

torchreid.data.register_image_dataset(dataset_name, SoccerNetReID)
print(f"‚úÖ Dataset '{dataset_name}' registrato con successo nel framework Torchreid!")

In [None]:
# ==========================================
# 3. DATAMANAGER & PK-SAMPLER VISUALIZATION
# ==========================================
from torchreid.data import ImageDataManager
import matplotlib.pyplot as plt
from collections import defaultdict

print("üöÄ Inizializzazione DataManager (con RandomIdentitySampler per Triplet Loss)...")

# Setup DataManager
datamanager = ImageDataManager(
    root='./dataset_reid_clean',   # Punta direttamente ai dati puliti dello step precedente!
    sources='soccernet-reid',
    targets='soccernet-reid',
    height=256,
    width=128,
    batch_size_train=64,
    batch_size_test=100,
    transforms=['random_flip'],    # Override necessario interno, ma usiamo la nostra pipeline
    train_sampler='RandomIdentitySampler', # FONDAMENTALE per la Triplet Loss
    num_instances=4,               # K = 4 immagini per ogni P (persona)
    workers=2
)

# Applichiamo le nostre trasformazioni avanzate
datamanager.train_loader.dataset.transform = build_custom_transforms(is_train=True)
datamanager.test_loader['soccernet-reid']['query'].dataset.transform = build_custom_transforms(is_train=False)
datamanager.test_loader['soccernet-reid']['gallery'].dataset.transform = build_custom_transforms(is_train=False)

print(f"‚úÖ DataLoader pronto. Immagini di training caricate: {len(datamanager.train_loader.dataset)}")

# --- VISUALIZZAZIONE DEL BATCH ---
def denormalize(tensor, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]):
    for t, m, s in zip(tensor, mean, std): t.mul_(s).add_(m)
    return tensor

def visualize_pk_batch(dataloader, num_instances=4):
    """Estrazione e visualizzazione di un batch per validare il PxK Sampler."""
    batch = next(iter(dataloader))
    imgs, pids = batch['img'] if isinstance(batch, dict) else batch[0], batch['pid'] if isinstance(batch, dict) else batch[1]

    unique_pids, counts = torch.unique(pids, return_counts=True)

    print("\nüïµÔ∏è Analisi del Sampler (PK-Batch Check):")
    print(f"   ID unici nel batch: {len(unique_pids)}")
    print(f"   Shape Immagini: {imgs.shape}")

    if all(c == num_instances for c in counts):
        print(f"   ‚úÖ SUCCESS: Ogni ID ha esattamente {num_instances} istanze. Triplet Loss ottimizzata!")
    else:
        print(f"   ‚ö†Ô∏è WARNING: Il sampler non rispetta strettamente K={num_instances}.")

    # Plot (Primi 4 ID)
    grouped_images = defaultdict(list)
    for i, pid in enumerate(pids): grouped_images[pid.item()].append(imgs[i])
    selected_pids = list(grouped_images.keys())[:4]

    if not selected_pids: return

    fig, axes = plt.subplots(len(selected_pids), num_instances, figsize=(12, 3 * len(selected_pids)))
    if len(selected_pids) == 1: axes = np.expand_dims(axes, axis=0)

    for row_idx, pid in enumerate(selected_pids):
        for col_idx in range(num_instances):
            ax = axes[row_idx, col_idx]
            if col_idx < len(grouped_images[pid]):
                img_t = denormalize(grouped_images[pid][col_idx].clone())
                ax.imshow(np.clip(img_t.permute(1, 2, 0).cpu().numpy(), 0, 1))
                ax.set_title(f"ID: {pid}")
            else:
                ax.text(0.5, 0.5, 'Missing', ha='center')
            ax.axis('off')

    plt.suptitle("Validazione Batch: P identit√† x K istanze", fontsize=16)
    plt.tight_layout()
    plt.show()

# Esegui l'analisi
visualize_pk_batch(datamanager.train_loader)

# Avvio training Architettura Ibrida CNN-Transformer
OSNet + CBAM + MHSA + GeM

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchreid
from torchreid import models, metrics
from torchreid.losses import TripletLoss, CrossEntropyLoss

import os
import time
import datetime
import numpy as np
import shutil


# riporto la classe per mantenere early stopping, override dei metodi save e run.
class EarlyStoppingTripletEngine(torchreid.engine.ImageTripletEngine):

    def save_model(self, epoch, rank1, map_score, save_dir, is_best=False):
        """Override: Salva solo Last (sovrascritto) e Best."""
        state = {
            'state_dict': self.model.state_dict(),
            'epoch': epoch + 1,
            'rank1': rank1,
            'map': map_score,  # Ora possiamo salvarlo correttamente
            'optimizer': self.optimizer.state_dict(),
            'scheduler': self.scheduler.state_dict() if self.scheduler else None
        }

        # 1. Salva/Sovrascrive sempre model_last.pth.tar
        last_fpath = os.path.join(save_dir, 'model/model_last.pth.tar')
        torch.save(state, last_fpath)

        # 2. Se √® best, crea una copia chiamata model_best.pth.tar
        if is_best:
            best_fpath = os.path.join(save_dir, 'model/model_best.pth.tar')
            shutil.copy(last_fpath, best_fpath)

    def test(self, dist_metric='euclidean', normalize_feature=True,
             visrank=False, visrank_topk=10, save_dir='',
             use_metric_cuhk03=False, ranks=[1, 5, 10, 20], rerank=False):
        """
        Override semplificato: usa la logica interna _evaluate ma ritorna anche mAP.
        """
        self.set_model_mode('eval')
        targets = list(self.test_loader.keys())

        final_rank1 = 0
        final_mAP = 0

        for name in targets:
            domain = 'source' if name in self.datamanager.sources else 'target'
            print('##### Evaluating {} ({}) #####'.format(name, domain))

            # Recuperiamo i loader specifici
            query_loader = self.test_loader[name]['query']
            gallery_loader = self.test_loader[name]['gallery']

            # USIAMO IL METODO NATIVO DEL PADRE
            # _evaluate fa gi√†: extract_features -> compute_distmat -> evaluate_rank
            rank1, mAP = self._evaluate(
                dataset_name=name,
                query_loader=query_loader,
                gallery_loader=gallery_loader,
                dist_metric=dist_metric,
                normalize_feature=normalize_feature,
                visrank=visrank,
                visrank_topk=visrank_topk,
                save_dir=save_dir,
                use_metric_cuhk03=use_metric_cuhk03,
                ranks=ranks,
                rerank=rerank
            )

            # Logghiamo su Tensorboard anche qui per sicurezza
            if self.writer is not None:
                self.writer.add_scalar(f'Test/{name}/rank1', rank1, self.epoch)
                self.writer.add_scalar(f'Test/{name}/mAP', mAP, self.epoch)

            # Aggiorniamo i valori finali (nel caso multi-dataset prende l'ultimo)
            final_rank1 = rank1
            final_mAP = mAP

        # QUESTA √à LA MODIFICA CHIAVE: Ritorniamo la tupla
        return final_rank1, final_mAP


    def run(self, save_dir='log', max_epoch=60, start_epoch=0,
            print_freq=10, fixbase_epoch=0, open_layers=None,
            start_eval=0, eval_freq=-1, test_only=False,
            dist_metric='euclidean', normalize_feature=True,
            visrank=False, visrank_topk=10, use_metric_cuhk03=False,
            ranks=[1, 5, 10, 20], rerank=False,
            patience=10): # <--- Parametro aggiunto

        # Setup iniziale (copiato dalla logica base)
        if test_only:
            self.test(dist_metric, normalize_feature, visrank, visrank_topk,
                      save_dir, use_metric_cuhk03, ranks, rerank)
            return

        if self.writer is None:
            from torch.utils.tensorboard import SummaryWriter
            self.writer = SummaryWriter(log_dir=save_dir)

        time_start = time.time()
        self.start_epoch = start_epoch
        self.max_epoch = max_epoch
        print(f'=> Start training with Early Stopping (Patience: {patience})')

        # Variabili per Early Stopping
        best_rank1 = -np.inf
        patience_counter = 0

        for self.epoch in range(self.start_epoch, self.max_epoch):
            # 1. Fase di Training (usa il metodo della classe padre)
            self.train(print_freq=print_freq, fixbase_epoch=fixbase_epoch, open_layers=open_layers)

            # 2. Valutazione (se siamo nell'epoca giusta o se eval_freq √® settato)
            # Nota: eval_freq=-1 significa "valuta solo alla fine", qui forziamo
            # una valutazione se vogliamo usare l'early stopping in modo sensato,
            # tipicamente si valuta ogni epoca dopo start_eval.
            should_eval = (self.epoch + 1) >= start_eval and \
                          (eval_freq > 0 and (self.epoch+1) % eval_freq == 0)

            if should_eval:
                print(f"üîç Validazione Epoca {self.epoch + 1}...")

                # Esegue il test (usa il metodo della classe padre)
                rank1, mAP = self.test(
                    dist_metric=dist_metric,
                    normalize_feature=normalize_feature,
                    visrank=visrank,
                    visrank_topk=visrank_topk,
                    save_dir=save_dir,
                    use_metric_cuhk03=use_metric_cuhk03,
                    ranks=ranks
                )

                # Log su Tensorboard
                if self.writer is not None:
                    self.writer.add_scalar('Val/Rank1', rank1, self.epoch)

                # 3. Logica Early Stopping & Salvataggio
                is_best = False

                # Caso A: Miglior Rank-1
                if rank1 > best_rank1:
                    print(f"‚≠ê NUOVO BEST Rank-1: {rank1:.1%} (prev: {best_rank1:.1%}) | mAP: {mAP:.1%}")
                    best_rank1 = rank1
                    best_map = mAP
                    is_best = True

                # Caso B: Stesso Rank-1, Miglior mAP
                elif rank1 == best_rank1 and mAP > best_map:
                    print(f"‚≠ê Rank-1 Invariato ({rank1:.1%}), ma mAP MIGLIORATO: {mAP:.1%} > {best_map:.1%}")
                    best_map = mAP
                    is_best = True

                if is_best:
                    patience_counter = 0
                else:
                    patience_counter += 1
                    print(f"‚è≥ Nessun miglioramento. Patience: {patience_counter}/{patience}. Best R1: {best_rank1:.1%} (mAP {best_map:.1%})")

                self.save_model(self.epoch, rank1, mAP, save_dir, is_best=is_best)

                if patience_counter >= patience:
                    print(f"üõë EARLY STOPPING attivato all'epoca {self.epoch + 1}")
                    break
            else:
                self.save_model(self.epoch, 0, 0, save_dir, is_best=False)

        # Chiusura
        elapsed = round(time.time() - time_start)
        elapsed = str(datetime.timedelta(seconds=elapsed))
        print('Elapsed {}'.format(elapsed))
        if self.writer is not None:
            self.writer.close()


class VisionTransformerBlock(nn.Module):
    def __init__(self, dim, heads=4, height=16, width=8, dropout=0.1, debug_freq=20):
        super().__init__()
        self.dim = dim
        self.heads = heads
        self.height = height
        self.width = width

        # --- AGGIUNTA PER DEBUG ---
        self.debug_freq = debug_freq  # Ogni quanti batch stampare
        self.batch_counter = 0        # Contatore interno
        # --------------------------

        # Proiezioni Q, K, V usando Linear (come il prof)
        # Nota: Linear richiede input (Batch, Seq_Len, Dim)
        self.query_projection = nn.Linear(dim, dim)
        self.key_projection = nn.Linear(dim, dim)
        self.value_projection = nn.Linear(dim, dim)
        self.output_projection = nn.Linear(dim, dim)

        # Positional Embedding IMPARABILE (stile ViT)
        # Invece di quello sinusoidale del prof (ottimo per testo),
        # per le immagini fisse conviene lasciare che la rete impari la posizione.
        self.num_patches = height * width
        self.pos_embedding = nn.Parameter(torch.randn(1, self.num_patches, dim))

        self.norm = nn.LayerNorm(dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """
        Input x: (Batch, Channel, Height, Width) -> Es. (B, 512, 16, 8)
        """

        # --- LOGICA DI DEBUG PERIODICA ---
        if self.training:
            self.batch_counter += 1
            # Stampa solo se il contatore √® multiplo della frequenza scelta
            if self.batch_counter % self.debug_freq == 0:
                print(f"üîç [ViT Heartbeat] Batch {self.batch_counter} | Input Shape: {x.shape} (Atteso H={self.height}, W={self.width})")
        # ---------------------------------

        B, C, H, W = x.shape

        # 1. FLATTEN: Trasformiamo l'immagine 2D in una sequenza 1D
        # Da (B, C, H, W) -> (B, H*W, C)
        # .permute(0, 2, 3, 1) mette i canali alla fine
        # .flatten(1, 2) schiaccia H e W insieme
        x_flat = x.permute(0, 2, 3, 1).flatten(1, 2) # (B, SeqLen, Dim)

        # 2. Aggiunta Positional Embedding
        # Se le dimensioni cambiano dinamicamente, interpoliamo il pos embedding
        if x_flat.shape[1] != self.pos_embedding.shape[1]:
             # Gestione sicurezza ridimensionamento
             pos_emb = F.interpolate(
                 self.pos_embedding.permute(0, 2, 1).view(1, C, self.height, self.width),
                 size=(H, W), mode='bilinear').flatten(2).permute(0, 2, 1)
             x_flat = x_flat + pos_emb
        else:
             x_flat = x_flat + self.pos_embedding

        # 3. Layer Norm prima dell'attention (Pre-Norm architecture, pi√π stabile)
        residual = x_flat
        x_norm = self.norm(x_flat)

        # 4. Proiezioni Lineari
        Q = self.query_projection(x_norm)
        K = self.key_projection(x_norm)
        V = self.value_projection(x_norm)

        # 5. Split Heads (come nel codice del prof, ma ottimizzato con view)
        # (B, SeqLen, Heads, HeadDim) -> (B, Heads, SeqLen, HeadDim)
        Q = Q.view(B, -1, self.heads, C // self.heads).transpose(1, 2)
        K = K.view(B, -1, self.heads, C // self.heads).transpose(1, 2)
        V = V.view(B, -1, self.heads, C // self.heads).transpose(1, 2)

        # 6. SCALED DOT PRODUCT ATTENTION (Il cuore ottimizzato)
        # is_causal=False perch√© nell'immagine guardiamo tutto il contesto
        out = F.scaled_dot_product_attention(Q, K, V, dropout_p=0.1 if self.training else 0.0, is_causal=False)

        # 7. Join Heads
        out = out.transpose(1, 2).contiguous().view(B, -1, C)

        # 8. Output Projection + Residual
        out = self.output_projection(out)
        out = out + residual

        # 9. UNFLATTEN: Torniamo a immagine 2D
        # Da (B, SeqLen, C) -> (B, C, H, W)
        out = out.permute(0, 2, 1).view(B, C, H, W)

        return out

# --- 1. Moduli Base (CBAM, GeM) restano uguali ---
class ChannelAttention(nn.Module):
    def __init__(self, in_planes, ratio=16):
        super(ChannelAttention, self).__init__()

        # 1. Average Pooling
        # Riduce (H, W) a (1, 1) facendo la media.
        self.avg_pool = nn.AdaptiveAvgPool2d(1)

        # 2. Max Pooling
        # Riduce (H, W) a (1, 1) prendendo il valore massimo.
        # Serve a preservare le feature di texture pi√π forti.
        self.max_pool = nn.AdaptiveMaxPool2d(1)

        # 3. COMPRESSIONE (Shared MLP)
        # Questo √® il primo strato del "bottleneck" che riduce i canali.
        # Usa Conv2d con kernel 1x1 che equivale a un Fully Connected layer.
        self.fc1 = nn.Conv2d(in_planes, in_planes // ratio, 1, bias=False)
        self.relu1 = nn.ReLU()

        # 4. RIESPANSIONE (Shared MLP)
        # Ripristina il numero originale di canali.
        self.fc2 = nn.Conv2d(in_planes // ratio, in_planes, 1, bias=False)

        # 5. GATING (Sigmoide)
        # Prepara l'attivazione tra 0 e 1 per pesare i canali.
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):

        # Le stesse fc1 e fc2 vengono usate SIA per il vettore delle medie SIA per quello dei massimi.

        # Percorso A: Quello che descrivevi tu (Avg -> MLP)

        avg_out = self.fc2(self.relu1(self.fc1(self.avg_pool(x))))

        # Percorso B: L'aggiunta del CBAM (Max -> MLP)
        max_out = self.fc2(self.relu1(self.fc1(self.max_pool(x))))

        # a questo punto il tensore in ingresso √® stato trasformato in due vettori monodimensionali, lunghi la dimensione dei canali.

        # FUSIONE
        # I due vettori vengono sommati element-wise.
        out = avg_out + max_out

        # ATTENZIONE:
        # Questo blocco restituisce SOLO i pesi (la maschera di attenzione),
        # NON esegue la moltiplicazione finale per l'input 'x'.

        return self.sigmoid(out) #restituisce vettore con valori di attention per ogni feature map del tensore x.

class SpatialAttention(nn.Module):
    def __init__(self, kernel_size=7):
        super(SpatialAttention, self).__init__()
        # Per mantenere le dimensioni H x W inalterate dopo una conv 7x7,
        # serve un padding di 3. (formula: p = (k-1)/2)
        padding = 3 if kernel_size == 7 else 1

        # Questa convoluzione ridurr√† i 2 canali di input (Max+Avg) a 1 canale di output.
        self.conv1 = nn.Conv2d(2, 1, kernel_size, padding=padding, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # x √® il tuo tensore di input [Batch, Canali, Altezza, Larghezza]

        # 1. COMPRESSIONE DEI CANALI (Average Pooling)
        # Invece di fare la media spaziale (come nel blocco Channel), qui facciamo la media SUI CANALI.
        # "In media, quanto √® attiva questa posizione (h,w) su tutti i filtri?"
        # Output: [B, 1, H, W]
        avg_out = torch.mean(x, dim=1, keepdim=True)

        # 2. COMPRESSIONE DEI CANALI (Max Pooling)
        # "Qual √® la feature pi√π forte in assoluto in questa posizione?"
        # Questo √® utilissimo per trovare i bordi o dettagli unici del giocatore.
        # Output: [B, 1, H, W]
        max_out, _ = torch.max(x, dim=1, keepdim=True)

        # 3. CONCATENAZIONE
        # Uniamo le due mappe. Ora abbiamo una rappresentazione "grezza" di dove si trovano le informazioni.
        # Output: [B, 2, H, W]
        x = torch.cat([avg_out, max_out], dim=1)

        # 4. CONVOLUZIONE SPAZIALE
        # Qui sta l'intelligenza locale. Un kernel grande (7x7) scorre sull'immagine.
        # Impara a capire che se c'√® un picco di attivazione in un punto,
        # probabilmente √® importante anche l'area subito vicina.
        # Trasforma i 2 canali in 1 solo canale di "importanza spaziale".
        # Output: [B, 1, H, W]
        x = self.conv1(x)

        # 5. GENERAZIONE DELLA MASCHERA di dimensione [B, 1, H, W]
        # Schiaccia i valori tra 0 e 1.
        # 1 = "Questa zona √® importante (il giocatore)"
        # 0 = "Questa zona √® rumore (il campo)"
        return self.sigmoid(x)

class CBAM(nn.Module):
    def __init__(self, in_planes, ratio=16, kernel_size=7):
        super(CBAM, self).__init__()
        self.ca = ChannelAttention(in_planes, ratio)
        self.sa = SpatialAttention(kernel_size)

    def forward(self, x):
        # 1. CHANNEL ATTENTION
        out = x * self.ca(x) # Moltiplica l'input per i pesi dei canali

        # 2. SPATIAL ATTENTION (applicata sull'output del channel)
        result = out * self.sa(out) # Moltiplica il risultato raffinato per la maschera spaziale

        return result

class GeM(nn.Module):
    def __init__(self, p=3.0, eps=1e-6):
        super(GeM, self).__init__()

        # 1. PARAMETRO P ADDESTRABILE
        # Qui sta la magia. Non definiamo p come una costante (self.p = p),
        # ma come un nn.Parameter.
        # Questo dice a PyTorch: "Durante la backpropagation, aggiorna anche questo valore
        # per minimizzare la loss".
        # Inizializziamo a 3.0 perch√© √® un buon punto di partenza empirico per il ReID.
        self.p = nn.Parameter(torch.ones(1) * p)

        # 2. EPSILON
        # Un valore piccolissimo per evitare divisioni per zero o radici di numeri instabili.
        self.eps = eps

    def forward(self, x):
        # x: Tensore di input [Batch, Channels, Height, Width]

        # 3. CLAMPING (Sicurezza Numerica)
        # x.clamp(min=self.eps) forza tutti i valori nel tensore a essere almeno 1e-6.
        # Evita NaN (Not a Number) durante il training.

        # 4. ELEVAMENTO A POTENZA
        # .pow(self.p) eleva ogni singolo pixel alla potenza p (che la rete sta imparando).
        # Se p > 1, questo enfatizza i valori alti (i dettagli salienti del calciatore)
        # e schiaccia verso zero i valori bassi (lo sfondo).

        # 5. GLOBAL AVERAGE POOLING
        # F.avg_pool2d(...) calcola la media spaziale.
        # Il kernel size √® (x.size(-2), x.size(-1)), ovvero (Height, Width).
        # Questo significa: "Prendi tutta la feature map HxW e fanne una media unica".
        # Output parziale: [Batch, Channels, 1, 1]

        # 6. RADICE P-ESIMA (Inverse Power)
        # .pow(1. / self.p) applica la radice p-esima.
        # Serve a riportare i valori alla scala originale (dopo averli elevati alla p prima della media).
        return F.avg_pool2d(x.clamp(min=self.eps).pow(self.p), (x.size(-2), x.size(-1))).pow(1. / self.p)


# --- 2. Modello HPM con Output Multipli ---
class SoccerNetHybridModel(nn.Module):
    def __init__(self, num_classes, model_name='osnet_x1_0', loss='triplet', best_weights_path=None):
        super(SoccerNetHybridModel, self).__init__()
        self.loss = loss

        print(f"üèóÔ∏è HYBRID Model 2.0: Backbone={model_name} + CBAM + VisionTransformer + GeM")

        # Caricamento del backbone OSNet ed eliminazione del classificatore.
        # 1. Costruisci Backbone VUOTO (pretrained=False perch√© carichiamo i nostri)
        base_model = models.build_model(name=model_name, num_classes=num_classes, pretrained=False, loss='triplet')

        # --- CARICAMENTO PESI CUSTOM (WARM-UP) ---
        if best_weights_path and os.path.exists(best_weights_path):
            print(f"‚ôªÔ∏è Caricamento pesi Backbone da: {best_weights_path}")
            checkpoint = torch.load(best_weights_path, weights_only=False)
            state_dict = checkpoint['state_dict']

            # Pulizia delle chiavi (Rimuove 'module.' e layer non backbone)
            new_state_dict = {}
            for k, v in state_dict.items():
                k = k.replace("module.", "") # Gestione DataParallel
                # Carichiamo solo la parte feature extractor, ignoriamo classifier/fc vecchi
                if not k.startswith("classifier") and not k.startswith("fc"):
                    new_state_dict[k] = v

            # Carica con strict=False per ignorare le chiavi mancanti (classifier)
            base_model.load_state_dict(new_state_dict, strict=False)
            print("‚úÖ Pesi Backbone caricati con successo!")
        else:
            print("‚ö†Ô∏è Nessun peso custom fornito o file non trovato. Uso inizializzazione casuale.")
        # -----------------------------------------


        self.backbone = base_model
        if hasattr(self.backbone, 'classifier'): del self.backbone.classifier
        if hasattr(self.backbone, 'fc'): del self.backbone.fc

        # caricamento dei moduli CBAM e GeM
        self.in_channels = 512
        self.cbam = CBAM(self.in_channels)

        # TRANSFORMER BLOCK (Attention Globale - Relazioni a lungo raggio)
        # OSNet riduce le dimensioni di 16x.
        # Se input standard ReID (256x128) -> Feature map (16x8)
        self.trans = VisionTransformerBlock(self.in_channels, heads=4, height=16, width=8)

        self.gem = GeM()

        # 3. Head Singola
        self.bn = nn.BatchNorm1d(self.in_channels)
        self.bn.bias.requires_grad_(False)
        self.bn.apply(self._weights_init_kaiming)

        self.classifier = nn.Linear(self.in_channels, num_classes, bias=False)
        self.classifier.apply(self._weights_init_classifier)

    def forward(self, x):
        features = self.backbone(x, return_featuremaps=True)
        # Output atteso: (B, 512, H, W)

        features = self.cbam(features)
        # CBAM preserva le dimensioni

        # 3. TRANSFORMER (Collega le parti del corpo)
        features = self.trans(features)

        # Pooling (da H,W a 1,1) -> Flatten
        global_features = self.gem(features).view(features.size(0), -1)

        # Normalization
        feat_norm = self.bn(global_features)

        if self.training:
            logits = self.classifier(feat_norm)
            # Ritorna: (logits, features) per le due loss
            if self.loss == 'triplet':
                return logits, feat_norm
            return logits
        else:
            # In inferenza: solo il vettore normalizzato
            return feat_norm

    def _weights_init_kaiming(self, m):
        if isinstance(m, nn.BatchNorm1d):
            nn.init.constant_(m.weight, 1.0); nn.init.constant_(m.bias, 0.0)
    def _weights_init_classifier(self, m):
        if isinstance(m, nn.Linear):
            nn.init.normal_(m.weight, std=0.001);
            if m.bias is not None: nn.init.constant_(m.bias, 0.0)


# Configurazione
model_name = 'osnet_x1_0'
path_to_best_osnet = "./runs/OSNet_1/model/model_best.pth.tar"
output_dir = './runs/HybridOSNet-CNN_Transformer'

print("üöÄ Inizializzazione Deep Supervision Training...")

# --- CONFIGURAZIONE RIPRESA ADDESTRAMENTO ---
RESUME = True  # Imposta a False per partire da zero
model_path = os.path.join(output_dir, 'model/model_last.pth.tar')

# 1. Istanzia Modello
model = SoccerNetHybridModel(
    num_classes=datamanager.num_train_pids,
    model_name=model_name,
    loss='triplet',
    best_weights_path=path_to_best_osnet
).cuda()

# 2. Optimizer & Scheduler
optimizer = torchreid.optim.build_optimizer(model, optim='adam', lr=0.0003)
scheduler = torchreid.optim.build_lr_scheduler(optimizer, lr_scheduler='multi_step', stepsize=[25, 45])

# --- LOGICA DI CARICAMENTO CHECKPOINT (RESUME) ---
start_epoch = 0
if RESUME and os.path.exists(model_path):
    print(f"üîÑ Ripresa addestramento dal checkpoint: {model_path}")
    checkpoint = torch.load(model_path, map_location='cuda' if torch.cuda.is_available() else 'cpu', weights_only=False)

    # Carica i pesi del modello
    model.load_state_dict(checkpoint['state_dict'])

    # Carica lo stato dell'ottimizzatore e dello scheduler
    optimizer.load_state_dict(checkpoint['optimizer'])
    if scheduler and 'scheduler' in checkpoint and checkpoint['scheduler'] is not None:
        scheduler.load_state_dict(checkpoint['scheduler'])

    # Recupera l'epoca da cui ripartire
    start_epoch = checkpoint['epoch']
    print(f"‚úÖ Checkpoint caricato. Riprendo dall'epoca {start_epoch}")
elif RESUME:
    print("‚ö†Ô∏è Checkpoint non trovato, il training partir√† da zero.")

# 3. Istanzia NUOVO Engine
engine = EarlyStoppingTripletEngine(
    datamanager,
    model,
    optimizer,
    margin=0.3,
    weight_t=1.0,
    weight_x=1.0,
    scheduler=scheduler
)

my_open_layers = ['cbam', 'trans', 'gem', 'bn', 'classifier']

# 4. Run
engine.run(
    save_dir=output_dir,
    max_epoch=100,
    start_epoch=start_epoch,
    start_eval=25,
    eval_freq=1,
    patience=15,
    fixbase_epoch=20 if start_epoch == 0 else 0,
    open_layers=my_open_layers
)

Validazione

In [None]:
import torch
import os
import torchreid

# --- CONFIGURAZIONE ---
# Scegli quale checkpoint testare: 'model_best.pth.tar' (il record) o 'model_last.pth.tar' (l'ultimo salvato)
checkpoint_type = 'model_best.pth.tar'
# checkpoint_type = 'model_last.pth.tar'

output_dir = './runs/HybridOSNet-AIN'
model_path = os.path.join(output_dir, 'model', checkpoint_type)
model_name = 'osnet_ain_x1_0'

print(f"üîç Avvio Validazione Manuale su: {checkpoint_type}")

# 1. Istanziazione del Modello (Deve essere identica al Training per matchare i pesi)
model = SoccerNetDeepSupervisionModel(
    num_classes=datamanager.num_train_pids,
    model_name=model_name,
    num_stripes=4,
    loss='triplet'
).cuda()

# 2. Imposta il modello in modalit√† Valutazione
# (Disabilita Dropout e BatchNormalization in training mode)
model.eval()

# 3. Caricamento dei Pesi
if os.path.exists(model_path):
    print(f"üì• Caricamento pesi da: {model_path}")
    # Nota: weights_only=False per evitare l'errore di sicurezza su file fidati
    checkpoint = torch.load(model_path, map_location='cuda' if torch.cuda.is_available() else 'cpu', weights_only=False)

    # Carica lo stato
    try:
        model.load_state_dict(checkpoint['state_dict'])
        print("‚úÖ Pesi caricati con successo.")
    except RuntimeError as e:
        print(f"‚ùå Errore nel caricamento dei pesi (Mismatch architettura?): {e}")
else:
    raise FileNotFoundError(f"Il file {model_path} non esiste!")

# 4. Creazione dell'Optimizer Dummy
# L'engine richiede un optimizer per essere inizializzato, anche se in test non serve.
optimizer = torchreid.optim.build_optimizer(model, optim='adam', lr=0.0003)

# 5. Istanziazione dell'Engine
engine = MultiLossEngine(
    datamanager,
    model,
    optimizer,
    margin=0.3,
    weight_t=1.0,
    weight_x=1.0
)

# 6. Esecuzione del Test
print("üöÄ Inizio calcolo metriche (CMC & mAP)...")
engine.run(
    save_dir=output_dir,
    test_only=True,  # <--- Questo dice all'engine di saltare il training e fare solo test
    dist_metric='euclidean',
    normalize_feature=True, # Importante per la distanza euclidea
    visrank=False,          # Metti True se vuoi vedere le immagini dei risultati
    visrank_topk=10
)

# Inizio Pipeline


dipendenze

In [None]:
# Installa la versione corretta e completa da GitHub
!pip install git+https://github.com/KaiyangZhou/deep-person-reid.git

# Installa le dipendenze accessorie
! pip install gdown

! pip install ultralytics


import os
import sys
import shutil
import importlib
import re

# ==============================================================================
# 1. PULIZIA E PREPARAZIONE (Reset dell'ambiente)
# ==============================================================================
REPO_PATH = '/content/yolo_tracking'

print("üßπ 1. Pulizia vecchie installazioni...")
# Se la cartella esiste, la cancelliamo per scaricarla pulita (evita errori di file mancanti)
if os.path.exists(REPO_PATH):
    shutil.rmtree(REPO_PATH)

# ==============================================================================
# 2. CLONAZIONE E INSTALLAZIONE
# ==============================================================================
print("üì• 2. Clonazione repository fresco...")
!git clone https://github.com/mikel-brostrom/yolo_tracking.git {REPO_PATH}

print("üì¶ 3. Installazione dipendenze (requirements.txt)...")
# Questo installa lap, cython-bbox, etc.
!pip install -q -r {REPO_PATH}/requirements.txt

print("üîó 4. Installazione di boxmot in modalit√† editabile...")
# Questo collega la cartella a Python in modo permanente
!pip install -q -e {REPO_PATH}

# ==============================================================================
# 3. IL TUO SCRIPT DI IMPORTAZIONE (CORRETTO)
# ==============================================================================
print("\nüîç 5. Avvio ricerca classe e importazione...")

# Percorso del file target
target_file = os.path.join(REPO_PATH, 'boxmot', 'trackers', 'botsort', 'botsort.py')

if os.path.exists(target_file):
    print(f"üìÇ File trovato: {target_file}")

    # Cerchiamo il nome della classe con una Regex
    with open(target_file, 'r') as f:
        content = f.read()
        matches = re.findall(r'^class\s+(\w+)', content, re.MULTILINE)

    if matches:
        class_name = matches[0]
        print(f"‚úÖ Classe identificata nel codice: {class_name}")

        # Aggiungiamo il path per sicurezza
        if REPO_PATH not in sys.path:
            sys.path.append(REPO_PATH)

        try:
            # Importazione Dinamica
            module_path = "boxmot.trackers.botsort.botsort"
            module = importlib.import_module(module_path)

            # Recuperiamo la classe dal modulo
            BoTSORT_Class = getattr(module, class_name)

            # Creiamo l'alias BoTSORT (come ti serve per il tuo codice)
            BoTSORT = BoTSORT_Class

            print("\n" + "="*50)
            print(f"üöÄ {class_name} importata con successo!")
            print(f"üìå Alias creato: 'BoTSORT' √® pronto all'uso.")
            print("="*50)

        except Exception as e:
            print(f"‚ùå Errore durante l'importazione del modulo: {e}")
    else:
        print("‚ö†Ô∏è Nessuna classe trovata nel file.")
else:
    print(f"‚ùå Errore critico: Il file {target_file} non esiste nemmeno dopo la clonazione.")

Processing delle detections

In [None]:
# ==============================================================================
# PARTE B: FUNZIONI DI PULIZIA (Detection Cleaning)
# ==============================================================================
import cv2
import numpy as np
import json
import os
from tqdm import tqdm
import matplotlib.pyplot as plt
import os
import sys
import cv2
import torch
import torchreid
import numpy as np
import importlib
import re
import glob
from ultralytics import YOLO
from torchvision import transforms
from PIL import Image
from tqdm import tqdm
from types import SimpleNamespace
import sys
import torch.nn as nn
import torch.nn.functional as F
from torchreid import models, metrics
from torchreid.losses import TripletLoss, CrossEntropyLoss

import os
import time
import datetime
import shutil
from sklearn.linear_model import RANSACRegressor

import shutil # Import necessario per copiare il file

# --- 2. LOGICA RANSAC (Statistica sui Punti) ---

import numpy as np
import cv2

def boost_peaks_pixel_level(mask, roi_size=150, dilate_iter=25, peak_sensitivity=20):
    """
    Scansiona il contorno superiore della maschera pixel per pixel alla ricerca di "punte"
    (minimi locali di Y) e le espande.

    Args:
        peak_sensitivity: Quanti pixel a dx e sx devono essere 'pi√π bassi'
                          per considerare il punto attuale una vera punta.
    """
    h, w = mask.shape
    mask_out = mask.copy()

    # 1. Trova il contorno principale
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) # NONE = tutti i pixel
    if not contours: return mask

    cnt = max(contours, key=cv2.contourArea)
    points = cnt.reshape(-1, 2) # Array di punti (x, y)

    # 2. Identificazione dei "Picchi" (Punti visivamente alti = Y Bassa)
    # Un punto √® un picco se ha una Y minore (√® pi√π in alto) dei suoi vicini a +/- k

    peaks = []
    num_points = len(points)

    # Per evitare rumore, usiamo un passo di controllo (step)
    # Non controlliamo ogni singolo pixel vicino, ma a distanza di 'sensitivity'
    step = peak_sensitivity

    for i in range(0, num_points, 5): # Saltiamo di 5 in 5 per velocit√†
        pt_curr = points[i]
        x_curr, y_curr = pt_curr

        # Indici dei vicini (gestendo il wrap-around dell'array circolare)
        idx_prev = (i - step) % num_points
        idx_next = (i + step) % num_points

        pt_prev = points[idx_prev]
        pt_next = points[idx_next]

        # Logica: Sono un picco ALTO se la mia Y √® MINORE dei vicini
        # (Ricorda: Y=0 √® il bordo alto dell'immagine)
        is_higher_than_prev = y_curr < (pt_prev[1] - 5) # 5px di tolleranza rumore
        is_higher_than_next = y_curr < (pt_next[1] - 5)

        # Filtro extra: Il picco deve essere nella met√† superiore dell'immagine
        # per evitare di prendere i piedi del cameraman come "picchi"
        is_top_half = y_curr < (h * 0.75)

        if is_higher_than_prev and is_higher_than_next and is_top_half:
            # Abbiamo trovato una potenziale punta!
            peaks.append(pt_curr)

    # 3. Consolidamento (Clusterizzazione)
    # Spesso un angolo genera 10-20 punti "picco" vicini. Ne teniamo uno per gruppo.
    valid_corners = []
    if len(peaks) > 0:
        peaks = np.array(peaks)
        # Semplice logica: se due picchi distano meno di 50px, sono lo stesso angolo
        # Prendiamo solo il primo di ogni cluster
        valid_corners.append(peaks[0])
        for i in range(1, len(peaks)):
            # Distanza euclidea dall'ultimo aggiunto
            dist = np.linalg.norm(peaks[i] - valid_corners[-1])
            if dist > 100: # Se dista pi√π di 100px √® un NUOVO angolo
                valid_corners.append(peaks[i])

    # 4. Applicazione Dilatazione sui Vertici Trovati
    for pt in valid_corners:
        cx, cy = pt

        # Definiamo ROI
        x1 = max(0, cx - roi_size)
        y1 = max(0, cy - roi_size)
        x2 = min(w, cx + roi_size)
        y2 = min(h, cy + roi_size)

        roi = mask[y1:y2, x1:x2]

        if roi.size == 0: continue

        # Kernel Ellittico (per fare un 'bozzo' rotondo e naturale)
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (9, 9))

        # Dilatazione
        roi_dilated = cv2.dilate(roi, kernel, iterations=dilate_iter)

        # Merge
        mask_out[y1:y2, x1:x2] = cv2.bitwise_or(mask_out[y1:y2, x1:x2], roi_dilated)

        # [DEBUG VISIVO] Disegna un cerchio rosso nella maschera debug (opzionale)
        # cv2.circle(mask_out, (cx, cy), 10, 0, -1) # Buco nero per vederlo

    return mask_out

def fill_field_holes(mask, max_hole_area=10000):
    """
    Riempe i buchi neri all'interno della maschera bianca se sono pi√π piccoli di max_hole_area.
    Serve per coprire arbitri, giocatori o teste in primo piano che bucano la maschera del campo.
    """
    # 1. Copia per non modificare l'originale per riferimento
    mask_filled = mask.copy()

    # 2. Invertiamo la maschera:
    # Ora il CAMPO √® NERO (0) e lo SFONDO/BUCHI sono BIANCHI (255)
    inverted_mask = cv2.bitwise_not(mask_filled)

    # 3. Troviamo le componenti connesse dell'immagine invertita
    # num_labels: quanti oggetti distinti ci sono
    # stats: contiene le info su area, bounding box, ecc.
    num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(inverted_mask, connectivity=8)

    # 4. Identifichiamo lo "sfondo vero" (le tribune, l'esterno del campo)
    # √à quasi sempre la componente con l'AREA MAGGIORE (escluso lo sfondo 0 che ora √® il campo).
    # Se num_labels <= 1 significa che √® tutto campo o tutto sfondo, usciamo.
    if num_labels <= 1:
        return mask

    # Cerchiamo l'indice della label con area massima, escludendo la label 0 (il campo nero)
    # stats[1:, cv2.CC_STAT_AREA] prende tutte le aree tranne la prima (background)
    # np.argmax(...) ci d√† l'indice relativo, aggiungiamo +1 per tornare all'indice assoluto delle label
    largest_label_idx = 1 + np.argmax(stats[1:, cv2.CC_STAT_AREA])

    # 5. Iteriamo su tutte le componenti trovate
    for i in range(1, num_labels):
        # Se questa componente √® lo "sfondo vero" (es. le tribune), NON la riempiamo.
        if i == largest_label_idx:
            continue

        area = stats[i, cv2.CC_STAT_AREA]

        # Se l'area √® inferiore alla soglia, √® un "buco spurio" (persona, ostacolo).
        # Lo coloriamo di BIANCO (255) nella maschera ORIGINALE.
        # Nota: usiamo labels == i per trovare i pixel di quel buco.
        if area < max_hole_area:
            mask_filled[labels == i] = 255

    return mask_filled

def clean_side_with_ransac(points, image_shape, side_name):
    """
    Prende un array di punti (x, y), trova la linea dominante ignorando le sporgenze,
    e restituisce i due punti estremi della linea pulita.
    """
    if len(points) < 10: return None # Troppi pochi punti

    X = points[:, 0].reshape(-1, 1) # Coordinate x
    y = points[:, 1]                # Coordinate y

    # RANSAC: Cerca la linea che fitta meglio la maggioranza dei punti
    # residual_threshold=10: Se un punto dista pi√π di 10px dalla retta, √® OUTLIER (cartellone)
    ransac = RANSACRegressor(residual_threshold=10.0, random_state=42)

    try:
        ransac.fit(X, y)
    except:
        return None # Fallimento nel fit

    # Recuperiamo gli inlier (i punti che formano la linea "buona")
    inlier_mask = ransac.inlier_mask_

    # Se la linea √® quasi verticale (es. lati laterali), RANSAC su y=mx+q fatica.
    # Controllo di sicurezza: se il coefficiente angolare √® folle, gestiamo diversamente.
    # Ma per il lato ALTO (orizzontale), questo √® perfetto.

    # Calcoliamo i punti estremi della linea predetta estendendola a tutto il frame
    line_X = np.array([[0], [image_shape[1]]]) # Da x=0 a x=width
    line_y_pred = ransac.predict(line_X)

    p1 = (int(line_X[0][0]), int(line_y_pred[0]))
    p2 = (int(line_X[1][0]), int(line_y_pred[1]))

    return p1, p2

def get_field_mask_base(image_bgr):
    hsv = cv2.cvtColor(image_bgr, cv2.COLOR_BGR2HSV)
    mask = cv2.inRange(hsv, (35, 40, 40), (85, 255, 255))
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, cv2.getStructuringElement(cv2.MORPH_RECT, (15, 15)))
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, cv2.getStructuringElement(cv2.MORPH_RECT, (3, 3)))
    num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(mask, connectivity=8)
    if num_labels > 1:
        largest_label = 1 + np.argmax(stats[1:, cv2.CC_STAT_AREA])
        mask = (labels == largest_label).astype(np.uint8) * 255
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    cv2.drawContours(mask, contours, -1, 255, thickness=cv2.FILLED)
    return mask


def get_field_mask_ransac(image_bgr):
    # A. Ottieni Maschera Grezza e Contorno
    mask = get_field_mask_base(image_bgr)

    # --- NUOVO STEP: RIEMPIMENTO BUCHI ---
    # Una soglia di 5000-10000 px √® sicura per risoluzioni HD (1920x1080).
    # Un giocatore intero occupa meno pixel di uno stadio.
    mask = fill_field_holes(mask, max_hole_area=8000)
    # -------------------------------------

    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_NONE) # NONE per avere tutti i punti
    if not contours: return mask

    cnt = max(contours, key=cv2.contourArea)
    points = cnt.reshape(-1, 2) #(N, 2)

    # B. Dividi i punti in 4 lati (Nord, Sud, Est, Ovest)
    # Usiamo il centroide per determinare la direzione
    M = cv2.moments(cnt)
    if M["m00"] == 0: return mask
    cX = int(M["m10"] / M["m00"])
    cY = int(M["m01"] / M["m00"])

    top_points = []
    bottom_points = []
    left_points = []
    right_points = []

    h, w = mask.shape

    for pt in points:
        x, y = pt
        # Logica semplice basata sulle diagonali del rettangolo immagine
        # Dividiamo l'immagine in una "X" centrata nel baricentro
        # y < cY e distanza verticale > orizzontale -> TOP

        dx = x - cX
        dy = y - cY

        if abs(dx) > abs(dy): # Siamo sui lati Destro/Sinistro
            if dx > 0: right_points.append(pt)
            else: left_points.append(pt)
        else: # Siamo sui lati Alto/Basso
            if dy < 0: top_points.append(pt) # Y cresce verso il basso, quindi dy < 0 √® sopra
            else: bottom_points.append(pt)

    # C. Applichiamo RANSAC solo al lato "Top" (quello dei cartelloni)
    # Nota: Puoi abilitarlo anche per gli altri se vuoi
    top_points = np.array(top_points)

    final_mask = mask.copy()

    # Disegniamo una maschera di taglio nera sopra le irregolarit√†
    if len(top_points) > 50:
        line_pts = clean_side_with_ransac(top_points, mask.shape, "TOP")

        if line_pts:
            p1, p2 = line_pts

            # Creiamo un poligono che copre tutto ci√≤ che sta SOPRA la linea trovata
            # Punti: (0,0) -> (W,0) -> P2 -> P1
            cut_poly = np.array([
                [0, 0],
                [w, 0],
                [p2[0], p2[1]],
                [p1[0], p1[1]]
            ])
            cv2.fillPoly(final_mask, [cut_poly], 0) # Riempi di Nero (Taglia via)

    # Opzionale: Fallo anche per il lato BASSO per tagliare fotografi
    bottom_points = np.array(bottom_points)
    if len(bottom_points) > 50:
        line_pts_b = clean_side_with_ransac(bottom_points, mask.shape, "BOTTOM")
        if line_pts_b:
            p1, p2 = line_pts_b
            # Poligono sotto la linea: P1 -> P2 -> (W, H) -> (0, H)
            cut_poly_b = np.array([
                [p1[0], p1[1]],
                [p2[0], p2[1]],
                [w, h],
                [0, h]
            ])
            cv2.fillPoly(final_mask, [cut_poly_b], 0)

    final_mask = boost_peaks_pixel_level(final_mask, roi_size=100, dilate_iter=5, peak_sensitivity=200)
    final_mask = cv2.dilate(final_mask, None, iterations=2)

    return final_mask


# --- FUNZIONE DI VERIFICA (Semplice e Veloce) ---
def is_feet_in_field(bbox_xywh, field_mask, vertical_margin=5, threshold=0.98):
    """
    Valuta se i piedi sono nel campo calcolando la DENSIT√Ä di pixel bianchi
    in una striscia a cavallo del bordo inferiore del Bounding Box.

    Args:
        vertical_margin: Quanti pixel guardare sopra e sotto (es. 5 -> totale 10px altezza)
        threshold: Percentuale minima di bianco richiesta (0.4 = 40%)
    """
    x, y, w, h = map(int, bbox_xywh)
    h_img, w_img = field_mask.shape

    # --- 1. BORDER SAFETY (Giocatori mezzo busto) ---
    # Se il box tocca il fondo dell'immagine, lo salviamo a prescindere dalla maschera.
    # Usiamo un margine di tolleranza (es. 10 pixel dal fondo).
    bottom_margin = 10
    if (y + h) >= (h_img - bottom_margin):
        # √à attaccato al fondo: assumiamo sia valido (es. giocatore vicino camera)
        return True

    # 1. Definizione ROI (Region of Interest)
    # Non guardiamo tutta la larghezza, ma il 50% centrale per evitare il background ai lati
    roi_w_start = int(x + w * 0.25)
    roi_w_end = int(x + w * 0.75)

    # Altezza: dal tallone - margin al tallone + margin
    y_bottom = y + h
    roi_h_start = max(0, y_bottom - vertical_margin)
    roi_h_end = min(h_img, y_bottom + vertical_margin)

    # Safety check
    if roi_w_end <= roi_w_start or roi_h_end <= roi_h_start:
        return False

    # 2. Estrazione Patch dalla Maschera (Solo 0 o 255)
    mask_patch = field_mask[roi_h_start:roi_h_end, roi_w_start:roi_w_end]

    # 3. Calcolo Percentuale
    white_pixels = cv2.countNonZero(mask_patch)
    total_pixels = mask_patch.size # larghezza * altezza

    ratio = white_pixels / total_pixels

    # DEBUG (Opzionale: stampa per capire i valori)
    # print(f"Ratio: {ratio:.2f}")

    return ratio > threshold


def is_shadow_advanced(image_bgr, bbox_xywh):
    """
    Rilevamento Ombre basato su Inversione + Contrasto Hard.
    Replica l'effetto visivo: Campo Nero, Ombre Viola Luminose.
    """
    x, y, w, h = map(int, bbox_xywh)

    '''# Safety Checks
    h_img, w_img = image_bgr.shape[:2]
    if w <= 0 or h <= 0: return True
    # Margine di sicurezza bordi
    margin = 8
    if (x <= margin) or (x + w >= w_img - margin) or (y + h >= h_img - margin):
        return False'''

    # 1. CROP
    crop = image_bgr[y:y+h, x:x+w]
    if crop.size == 0: return True

    # 2. INVERSIONE COLORI
    # Il verde scuro (ombra) diventa Magenta Luminoso.
    inverted = cv2.bitwise_not(crop)

    # 3. CONTRASTO E LUMINOSIT√Ä (La tua richiesta specifica)
    # alpha = Contrasto (es. 2.0 raddoppia le differenze)
    # beta = Luminosit√† (es. -100 spegne tutto ci√≤ che non √® super luminoso)
    alpha = 2.0
    beta = -100

    # Formula: pixel_new = pixel_old * alpha + beta
    contrast_enhanced = cv2.convertScaleAbs(inverted, alpha=alpha, beta=beta)

    # 4. ANALISI DEL COLORE RISULTANTE (Viola vs Bianco)
    hsv = cv2.cvtColor(contrast_enhanced, cv2.COLOR_BGR2HSV)

    # Ora cerchiamo il VIOLA/ROSA che sopravvive al buio.
    # Hue del Magenta √® intorno a 150 (range 130-170)
    # Saturation deve essere alta (altrimenti √® il bianco della maglia nera invertita)
    # Value deve essere alto (deve essere luminoso, altrimenti √® il campo spento)

    lower_violet = np.array([130, 60, 60])
    upper_violet = np.array([170, 255, 255])

    # Maschera dell'ombra
    shadow_mask = cv2.inRange(hsv, lower_violet, upper_violet)

    # Maschera del "Giocatore Nero" (che invertito √® diventato Bianco/Grigio Chiaro)
    # Bassa saturazione, Alta luminosit√†
    lower_white = np.array([0, 0, 150])
    upper_white = np.array([180, 50, 255])
    player_white_mask = cv2.inRange(hsv, lower_white, upper_white)

    # Maschera "Altri Colori" (Giocatori Colorati -> Colori complementari luminosi)
    # Tutto ci√≤ che √® luminoso (>60) ma NON √® viola e NON √® bianco
    bright_mask = cv2.inRange(hsv, np.array([0, 0, 60]), np.array([180, 255, 255]))
    # Sottraiamo viola e bianco
    other_features_mask = cv2.bitwise_and(bright_mask, bright_mask, mask=cv2.bitwise_not(shadow_mask))
    other_features_mask = cv2.bitwise_and(other_features_mask, other_features_mask, mask=cv2.bitwise_not(player_white_mask))

    # --- CONTEGGI ---
    total_pixels = crop.shape[0] * crop.shape[1]
    count_shadow = cv2.countNonZero(shadow_mask)
    count_player = cv2.countNonZero(player_white_mask) + cv2.countNonZero(other_features_mask)

    # --- DECISIONE ---

    # Se vediamo pixel di "Giocatore" (Bianco o Colore acceso non viola)
    # Basta poco, perch√© il giocatore √® solido.
    if count_player > total_pixels * 0.10:
        return False # Trovato struttura di giocatore

    # Se la maggior parte di ci√≤ che brilla √® Viola
    # Nota: col contrasto alto, il campo "vuoto" diventa nero (0,0,0), quindi non conta.
    # Contiamo solo ci√≤ che √® emerso dal buio.
    valid_pixels = count_shadow + count_player
    if valid_pixels == 0:
        return True # Se √® tutto nero, era erba piatta -> Ombra/Scarto

    ratio_shadow = count_shadow / float(valid_pixels)

    if ratio_shadow > 0.80:
        return True # √à quasi solo viola -> Ombra

    return False

def batch_shadow_filtering(detections_xywh, image_bgr, anchor_tolerance=8):
    """
    Pipeline completa per rimozione sovrapposizioni e ombre.

    FASE 1: Filtro intrinseco (is_shadow_advanced) per rimuovere ombre scure/senza texture.
    FASE 2: Filtro geometrico 'NMS sui piedi'. Ordina per AREA.
            Se due box condividono i piedi (angoli inferiori), MANTIENE IL PI√ô GRANDE
            e rimuove il pi√π piccolo (che sia un'ombra o un pezzo di corpo).
    """
    if not detections_xywh:
        return []

    h_img, w_img = image_bgr.shape[:2]

    # --- FASE 1: FILTRO INTRINSECO (Analisi Singola) ---
    survivors_phase_1 = []

    for i, bbox in enumerate(detections_xywh):
        # Chiamiamo la funzione singola (filtro texture/colore)
        is_bad = is_shadow_advanced(image_bgr, bbox)

        if not is_bad:
            x, y, w, h = map(int, bbox)
            x1, y1 = max(0, x), max(0, y)
            x2, y2 = min(w_img, x + w), min(h_img, y + h)
            area = w * h

            survivors_phase_1.append({
                'box': bbox,
                'coords': (x1, y1, x2, y2), # x1, y1, x2, y2
                'area': area,
                'original_idx': i
            })

    # --- FASE 2: FILTRO RELAZIONALE (Priorit√† al Grande) ---

    # 1. ORDINAMENTO DECRESCENTE PER AREA
    # Fondamentale: mettiamo i box pi√π grandi all'inizio della lista.
    survivors_phase_1.sort(key=lambda x: x['area'], reverse=True)

    indices_to_remove_phase_2 = set()
    num_survivors = len(survivors_phase_1)

    for i in range(num_survivors):
        # Se i √® gi√† stato rimosso (perch√© era pi√π piccolo di uno precedente), saltalo
        if i in indices_to_remove_phase_2:
            continue

        cand_A = survivors_phase_1[i] # Box "Dominante" (Il pi√π grande attuale)
        xA_1, yA_1, xA_2, yA_2 = cand_A['coords']

        # Confrontiamo solo con i successivi (j > i), che sono sicuramente PI√ô PICCOLI o uguali
        for j in range(i + 1, num_survivors):
            if j in indices_to_remove_phase_2:
                continue

            cand_B = survivors_phase_1[j] # Box "Piccolo" (Candidato alla rimozione)
            xB_1, yB_1, xB_2, yB_2 = cand_B['coords']

            # LOGICA GEOMETRICA SUGLI ANGOLI INFERIORI (PIEDI)

            # 1. Controllo asse Y (Altezza piedi simile)
            if abs(yA_2 - yB_2) <= anchor_tolerance:

                # 2. Controllo asse X (Angolo SX o Angolo DX coincidenti)
                dist_left = abs(xA_1 - xB_1)
                dist_right = abs(xA_2 - xB_2)

                if dist_left <= anchor_tolerance or dist_right <= anchor_tolerance:
                    # Abbiamo trovato un box B pi√π piccolo che "nasce" dagli stessi piedi di A.
                    # Poich√© A √® pi√π grande (garantito dal sort), A √® il "Corpo Intero".
                    # B √® un frammento (busto) o un'ombra inclusa.
                    # RIMUOVIAMO B.
                    indices_to_remove_phase_2.add(j)

    # --- COSTRUZIONE OUTPUT ---
    final_detections = []
    for i in range(num_survivors):
        if i not in indices_to_remove_phase_2:
            final_detections.append(survivors_phase_1[i]['box'])

    return final_detections


def calculate_iou(box1, box2):
    """
    Calcola IoU tra due box in formato (x, y, w, h)
    """
    x1, y1, w1, h1 = box1
    x2, y2, w2, h2 = box2

    # Coordinate angoli
    xA = max(x1, x2)
    yA = max(y1, y2)
    xB = min(x1 + w1, x2 + w2)
    yB = min(y1 + h1, y2 + h2)

    interWidth = max(0, xB - xA)
    interHeight = max(0, yB - yA)
    interArea = interWidth * interHeight

    box1Area = w1 * h1
    box2Area = w2 * h2

    unionArea = box1Area + box2Area - interArea
    if unionArea == 0: return 0

    return interArea / unionArea

def clean_blur_artifacts(detections, confidences,
                         iou_thresh=0.4,
                         vertical_tol=3,
                         conf_target_thresh=0.40):
    """
    Rimuove le rilevazioni duplicate causate dal motion blur orizzontale.

    Logica:
    1. Ordina per confidenza.
    2. Se due box hanno IoU > 0.4 AND altezze identiche (Ymin e Ymax uguali entro tolleranza).
    3. Elimina quello con confidenza minore SE la sua confidenza √® sotto la soglia di rischio (40%).

    Args:
        detections: lista di list/array [x, y, w, h]
        confidences: lista di float [conf] corrispondente
        iou_thresh: IoU minimo per considerare la sovrapposizione (default 0.4)
        vertical_tol: Tolleranza in pixel per l'allineamento verticale (default 3px)
        conf_target_thresh: Agisce solo se la vittima ha confidenza < 0.40
    """

    if len(detections) == 0:
        return [], []

    # Creiamo una lista strutturata per ordinare mantenendo gli indici originali
    # Format: {'bbox': [x,y,w,h], 'conf': 0.xyz, 'keep': True}
    dets = []
    for i, (bbox, conf) in enumerate(zip(detections, confidences)):
        dets.append({
            'bbox': bbox,
            'conf': conf,
            'keep': True,
            'y_min': bbox[1],
            'y_max': bbox[1] + bbox[3]
        })

    # Ordina per confidenza decrescente (Il pi√π sicuro decide chi eliminare)
    dets.sort(key=lambda x: x['conf'], reverse=True)

    for i in range(len(dets)):
        if not dets[i]['keep']:
            continue

        base = dets[i]

        for j in range(i + 1, len(dets)):
            if not dets[j]['keep']:
                continue

            cand = dets[j]

            # 1. CONTROLLO RAPIDO VERTICALE (Il cuore della tua idea)
            # Verifica allineamento Y_MIN e Y_MAX
            diff_ymin = abs(base['y_min'] - cand['y_min'])
            diff_ymax = abs(base['y_max'] - cand['y_max'])

            # Se non sono allineati verticalmente quasi al pixel, non √® blur orizzontale
            if diff_ymin > vertical_tol or diff_ymax > vertical_tol:
                continue

            # 2. CONTROLLO SICUREZZA
            # Eliminiamo solo se il candidato (che ha confidenza minore di base)
            # √® effettivamente "incerto" (< 0.40).
            # Se entrambi hanno 0.80 e 0.75, probabilmente sono due giocatori veri vicini.
            if cand['conf'] >= conf_target_thresh:
                continue

            # 3. CONTROLLO IoU
            # Se sono allineati verticalmente e si sovrappongono abbastanza
            iou = calculate_iou(base['bbox'], cand['bbox'])

            if iou > iou_thresh:
                # √à un ghost da blur!
                dets[j]['keep'] = False

    # Ricostruisce le liste finali
    final_dets = [d['bbox'] for d in dets if d['keep']]
    final_confs = [d['conf'] for d in dets if d['keep']]

    return final_dets, final_confs

# YOLO/RT-DETR + HybridOSNet

esegui pipeline

In [None]:
if './yolo_tracking' not in sys.path:
    sys.path.insert(0, './yolo_tracking')

try:
    from boxmot.trackers.botsort.botsort import BotSort
    BoTSORT = BotSort  # Creiamo l'alias necessario
    print("‚úÖ BoTSORT importato e pronto.")
except ImportError:
    print("‚ö†Ô∏è Tentativo importazione fallito. Riprovo con path assoluto...")
    # Tentativo disperato se il primo fallisce
    sys.path.append('/content/yolo_tracking/boxmot')
    from boxmot.trackers.botsort.botsort import BotSort
    BoTSORT = BotSort


# --- 1. CONFIGURAZIONE UTENTE ---
VIDEO_SEQ = "1"   # <--- Nome cartella (es. "4", senza zeri iniziali se hai rinominato)
GROUP_ID = "13"   # <--- Il tuo numero di gruppo

# Percorsi aggiornati per essere relativi alla cartella del progetto
DATASET_ROOT = './dataset/videos'
OUTPUT_RESULTS_DIR = './Predictions_folder'
YOLO_WEIGHTS = './runs/RT-Detr/weights/best.pt'
REID_WEIGHTS = './runs/HybridOSNet-CNN_Transformer/model/model_best.pth.tar'



class VisionTransformerBlock(nn.Module):
    def __init__(self, dim, heads=4, height=16, width=8, dropout=0.1, debug_freq=20):
        super().__init__()
        self.dim = dim
        self.heads = heads
        self.height = height
        self.width = width

        # --- AGGIUNTA PER DEBUG ---
        self.debug_freq = debug_freq  # Ogni quanti batch stampare
        self.batch_counter = 0        # Contatore interno
        # --------------------------

        # Proiezioni Q, K, V usando Linear (come il prof)
        # Nota: Linear richiede input (Batch, Seq_Len, Dim)
        self.query_projection = nn.Linear(dim, dim)
        self.key_projection = nn.Linear(dim, dim)
        self.value_projection = nn.Linear(dim, dim)
        self.output_projection = nn.Linear(dim, dim)

        # Positional Embedding IMPARABILE (stile ViT)
        # Invece di quello sinusoidale del prof (ottimo per testo),
        # per le immagini fisse conviene lasciare che la rete impari la posizione.
        self.num_patches = height * width
        self.pos_embedding = nn.Parameter(torch.randn(1, self.num_patches, dim))

        self.norm = nn.LayerNorm(dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """
        Input x: (Batch, Channel, Height, Width) -> Es. (B, 512, 16, 8)
        """

        # --- LOGICA DI DEBUG PERIODICA ---
        if self.training:
            self.batch_counter += 1
            # Stampa solo se il contatore √® multiplo della frequenza scelta
            if self.batch_counter % self.debug_freq == 0:
                print(f"üîç [ViT Heartbeat] Batch {self.batch_counter} | Input Shape: {x.shape} (Atteso H={self.height}, W={self.width})")
        # ---------------------------------

        B, C, H, W = x.shape

        # 1. FLATTEN: Trasformiamo l'immagine 2D in una sequenza 1D
        # Da (B, C, H, W) -> (B, H*W, C)
        # .permute(0, 2, 3, 1) mette i canali alla fine
        # .flatten(1, 2) schiaccia H e W insieme
        x_flat = x.permute(0, 2, 3, 1).flatten(1, 2) # (B, SeqLen, Dim)

        # 2. Aggiunta Positional Embedding
        # Se le dimensioni cambiano dinamicamente, interpoliamo il pos embedding
        if x_flat.shape[1] != self.pos_embedding.shape[1]:
             # Gestione sicurezza ridimensionamento
             pos_emb = F.interpolate(
                 self.pos_embedding.permute(0, 2, 1).view(1, C, self.height, self.width),
                 size=(H, W), mode='bilinear').flatten(2).permute(0, 2, 1)
             x_flat = x_flat + pos_emb
        else:
             x_flat = x_flat + self.pos_embedding

        # 3. Layer Norm prima dell'attention (Pre-Norm architecture, pi√π stabile)
        residual = x_flat
        x_norm = self.norm(x_flat)

        # 4. Proiezioni Lineari
        Q = self.query_projection(x_norm)
        K = self.key_projection(x_norm)
        V = self.value_projection(x_norm)

        # 5. Split Heads (come nel codice del prof, ma ottimizzato con view)
        # (B, SeqLen, Heads, HeadDim) -> (B, Heads, SeqLen, HeadDim)
        Q = Q.view(B, -1, self.heads, C // self.heads).transpose(1, 2)
        K = K.view(B, -1, self.heads, C // self.heads).transpose(1, 2)
        V = V.view(B, -1, self.heads, C // self.heads).transpose(1, 2)

        # 6. SCALED DOT PRODUCT ATTENTION (Il cuore ottimizzato)
        # is_causal=False perch√© nell'immagine guardiamo tutto il contesto
        out = F.scaled_dot_product_attention(Q, K, V, dropout_p=0.1 if self.training else 0.0, is_causal=False)

        # 7. Join Heads
        out = out.transpose(1, 2).contiguous().view(B, -1, C)

        # 8. Output Projection + Residual
        out = self.output_projection(out)
        out = out + residual

        # 9. UNFLATTEN: Torniamo a immagine 2D
        # Da (B, SeqLen, C) -> (B, C, H, W)
        out = out.permute(0, 2, 1).view(B, C, H, W)

        return out

# --- 1. Moduli Base (CBAM, GeM) restano uguali ---
class ChannelAttention(nn.Module):
    def __init__(self, in_planes, ratio=16):
        super(ChannelAttention, self).__init__()

        # 1. Average Pooling
        # Riduce (H, W) a (1, 1) facendo la media.
        self.avg_pool = nn.AdaptiveAvgPool2d(1)

        # 2. Max Pooling
        # Riduce (H, W) a (1, 1) prendendo il valore massimo.
        # Serve a preservare le feature di texture pi√π forti.
        self.max_pool = nn.AdaptiveMaxPool2d(1)

        # 3. COMPRESSIONE (Shared MLP)
        # Questo √® il primo strato del "bottleneck" che riduce i canali.
        # Usa Conv2d con kernel 1x1 che equivale a un Fully Connected layer.
        self.fc1 = nn.Conv2d(in_planes, in_planes // ratio, 1, bias=False)
        self.relu1 = nn.ReLU()

        # 4. RIESPANSIONE (Shared MLP)
        # Ripristina il numero originale di canali.
        self.fc2 = nn.Conv2d(in_planes // ratio, in_planes, 1, bias=False)

        # 5. GATING (Sigmoide)
        # Prepara l'attivazione tra 0 e 1 per pesare i canali.
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):

        # Le stesse fc1 e fc2 vengono usate SIA per il vettore delle medie SIA per quello dei massimi.

        # Percorso A: Quello che descrivevi tu (Avg -> MLP)

        avg_out = self.fc2(self.relu1(self.fc1(self.avg_pool(x))))

        # Percorso B: L'aggiunta del CBAM (Max -> MLP)
        max_out = self.fc2(self.relu1(self.fc1(self.max_pool(x))))

        # a questo punto il tensore in ingresso √® stato trasformato in due vettori monodimensionali, lunghi la dimensione dei canali.

        # FUSIONE
        # I due vettori vengono sommati element-wise.
        out = avg_out + max_out

        # ATTENZIONE:
        # Questo blocco restituisce SOLO i pesi (la maschera di attenzione),
        # NON esegue la moltiplicazione finale per l'input 'x'.

        return self.sigmoid(out) #restituisce vettore con valori di attention per ogni feature map del tensore x.

class SpatialAttention(nn.Module):
    def __init__(self, kernel_size=7):
        super(SpatialAttention, self).__init__()
        # Per mantenere le dimensioni H x W inalterate dopo una conv 7x7,
        # serve un padding di 3. (formula: p = (k-1)/2)
        padding = 3 if kernel_size == 7 else 1

        # Questa convoluzione ridurr√† i 2 canali di input (Max+Avg) a 1 canale di output.
        self.conv1 = nn.Conv2d(2, 1, kernel_size, padding=padding, bias=False)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        # x √® il tuo tensore di input [Batch, Canali, Altezza, Larghezza]

        # 1. COMPRESSIONE DEI CANALI (Average Pooling)
        # Invece di fare la media spaziale (come nel blocco Channel), qui facciamo la media SUI CANALI.
        # "In media, quanto √® attiva questa posizione (h,w) su tutti i filtri?"
        # Output: [B, 1, H, W]
        avg_out = torch.mean(x, dim=1, keepdim=True)

        # 2. COMPRESSIONE DEI CANALI (Max Pooling)
        # "Qual √® la feature pi√π forte in assoluto in questa posizione?"
        # Questo √® utilissimo per trovare i bordi o dettagli unici del giocatore.
        # Output: [B, 1, H, W]
        max_out, _ = torch.max(x, dim=1, keepdim=True)

        # 3. CONCATENAZIONE
        # Uniamo le due mappe. Ora abbiamo una rappresentazione "grezza" di dove si trovano le informazioni.
        # Output: [B, 2, H, W]
        x = torch.cat([avg_out, max_out], dim=1)

        # 4. CONVOLUZIONE SPAZIALE
        # Qui sta l'intelligenza locale. Un kernel grande (7x7) scorre sull'immagine.
        # Impara a capire che se c'√® un picco di attivazione in un punto,
        # probabilmente √® importante anche l'area subito vicina.
        # Trasforma i 2 canali in 1 solo canale di "importanza spaziale".
        # Output: [B, 1, H, W]
        x = self.conv1(x)

        # 5. GENERAZIONE DELLA MASCHERA di dimensione [B, 1, H, W]
        # Schiaccia i valori tra 0 e 1.
        # 1 = "Questa zona √® importante (il giocatore)"
        # 0 = "Questa zona √® rumore (il campo)"
        return self.sigmoid(x)

class CBAM(nn.Module):
    def __init__(self, in_planes, ratio=16, kernel_size=7):
        super(CBAM, self).__init__()
        self.ca = ChannelAttention(in_planes, ratio)
        self.sa = SpatialAttention(kernel_size)

    def forward(self, x):
        # 1. CHANNEL ATTENTION
        out = x * self.ca(x) # Moltiplica l'input per i pesi dei canali

        # 2. SPATIAL ATTENTION (applicata sull'output del channel)
        result = out * self.sa(out) # Moltiplica il risultato raffinato per la maschera spaziale

        return result

class GeM(nn.Module):
    def __init__(self, p=3.0, eps=1e-6):
        super(GeM, self).__init__()

        # 1. PARAMETRO P ADDESTRABILE
        # Qui sta la magia. Non definiamo p come una costante (self.p = p),
        # ma come un nn.Parameter.
        # Questo dice a PyTorch: "Durante la backpropagation, aggiorna anche questo valore
        # per minimizzare la loss".
        # Inizializziamo a 3.0 perch√© √® un buon punto di partenza empirico per il ReID.
        self.p = nn.Parameter(torch.ones(1) * p)

        # 2. EPSILON
        # Un valore piccolissimo per evitare divisioni per zero o radici di numeri instabili.
        self.eps = eps

    def forward(self, x):
        # x: Tensore di input [Batch, Channels, Height, Width]

        # 3. CLAMPING (Sicurezza Numerica)
        # x.clamp(min=self.eps) forza tutti i valori nel tensore a essere almeno 1e-6.
        # Evita NaN (Not a Number) durante il training.

        # 4. ELEVAMENTO A POTENZA
        # .pow(self.p) eleva ogni singolo pixel alla potenza p (che la rete sta imparando).
        # Se p > 1, questo enfatizza i valori alti (i dettagli salienti del calciatore)
        # e schiaccia verso zero i valori bassi (lo sfondo).

        # 5. GLOBAL AVERAGE POOLING
        # F.avg_pool2d(...) calcola la media spaziale.
        # Il kernel size √® (x.size(-2), x.size(-1)), ovvero (Height, Width).
        # Questo significa: "Prendi tutta la feature map HxW e fanne una media unica".
        # Output parziale: [Batch, Channels, 1, 1]

        # 6. RADICE P-ESIMA (Inverse Power)
        # .pow(1. / self.p) applica la radice p-esima.
        # Serve a riportare i valori alla scala originale (dopo averli elevati alla p prima della media).
        return F.avg_pool2d(x.clamp(min=self.eps).pow(self.p), (x.size(-2), x.size(-1))).pow(1. / self.p)


# --- 2. Modello HPM con Output Multipli ---
class SoccerNetHybridModel(nn.Module):
    def __init__(self, num_classes, model_name='osnet_x1_0', loss='triplet', best_weights_path=None):
        super(SoccerNetHybridModel, self).__init__()
        self.loss = loss

        print(f"üèóÔ∏è HYBRID Model 2.0: Backbone={model_name} + CBAM + VisionTransformer + GeM")

        # Caricamento del backbone OSNet ed eliminazione del classificatore.
        # 1. Costruisci Backbone VUOTO (pretrained=False perch√© carichiamo i nostri)
        base_model = models.build_model(name=model_name, num_classes=num_classes, pretrained=False, loss='triplet')

        # --- CARICAMENTO PESI CUSTOM (WARM-UP) ---
        if best_weights_path and os.path.exists(best_weights_path):
            print(f"‚ôªÔ∏è Caricamento pesi Backbone da: {best_weights_path}")
            checkpoint = torch.load(best_weights_path, weights_only=False)
            state_dict = checkpoint['state_dict']

            # Pulizia delle chiavi (Rimuove 'module.' e layer non backbone)
            new_state_dict = {}
            for k, v in state_dict.items():
                k = k.replace("module.", "") # Gestione DataParallel
                # Carichiamo solo la parte feature extractor, ignoriamo classifier/fc vecchi
                if not k.startswith("classifier") and not k.startswith("fc"):
                    new_state_dict[k] = v

            # Carica con strict=False per ignorare le chiavi mancanti (classifier)
            base_model.load_state_dict(new_state_dict, strict=False)
            print("‚úÖ Pesi Backbone caricati con successo!")
        else:
            print("‚ö†Ô∏è Nessun peso custom fornito o file non trovato. Uso inizializzazione casuale.")
        # -----------------------------------------


        self.backbone = base_model
        if hasattr(self.backbone, 'classifier'): del self.backbone.classifier
        if hasattr(self.backbone, 'fc'): del self.backbone.fc

        # caricamento dei moduli CBAM e GeM
        self.in_channels = 512
        self.cbam = CBAM(self.in_channels)

        # TRANSFORMER BLOCK (Attention Globale - Relazioni a lungo raggio)
        # OSNet riduce le dimensioni di 16x.
        # Se input standard ReID (256x128) -> Feature map (16x8)
        self.trans = VisionTransformerBlock(self.in_channels, heads=4, height=16, width=8)

        self.gem = GeM()

        # 3. Head Singola
        self.bn = nn.BatchNorm1d(self.in_channels)
        self.bn.bias.requires_grad_(False)
        self.bn.apply(self._weights_init_kaiming)

        self.classifier = nn.Linear(self.in_channels, num_classes, bias=False)
        self.classifier.apply(self._weights_init_classifier)

    def forward(self, x):
        features = self.backbone(x, return_featuremaps=True)
        # Output atteso: (B, 512, H, W)

        features = self.cbam(features)
        # CBAM preserva le dimensioni

        # 3. TRANSFORMER (Collega le parti del corpo)
        features = self.trans(features)

        # Pooling (da H,W a 1,1) -> Flatten
        global_features = self.gem(features).view(features.size(0), -1)

        # Normalization
        feat_norm = self.bn(global_features)

        if self.training:
            logits = self.classifier(feat_norm)
            # Ritorna: (logits, features) per le due loss
            if self.loss == 'triplet':
                return logits, feat_norm
            return logits
        else:
            # In inferenza: solo il vettore normalizzato
            return feat_norm

    def _weights_init_kaiming(self, m):
        if isinstance(m, nn.BatchNorm1d):
            nn.init.constant_(m.weight, 1.0); nn.init.constant_(m.bias, 0.0)
    def _weights_init_classifier(self, m):
        if isinstance(m, nn.Linear):
            nn.init.normal_(m.weight, std=0.001);
            if m.bias is not None: nn.init.constant_(m.bias, 0.0)

# ==============================================================================
# PARTE A: CLASSE REID EXTRACTOR (Il tuo modello custom)
# ==============================================================================
class ReIDExtractor:
    def __init__(self, model_path, model_name='osnet_x1_0', device='0'):
        # Logica Device Robusta
        if device == '0' or device == 'cuda':
            self.device = torch.device('cuda')
        else:
            self.device = torch.device('cpu')
        print(f"üîÑ Init ReID: {model_name}...")
        # 1. ISTANZIAZIONE DEL MODELLO CUSTOM
        # Passiamo num_classes=1000 (fittizio) perch√© in inferenza non usiamo il classificatore.
        # Passiamo best_weights_path=None perch√© carichiamo i pesi COMPLETI subito dopo.
        self.model = SoccerNetHybridModel(
            num_classes=1000,
            model_name=model_name,
            loss='triplet',
            best_weights_path=None
        )

        # 2. CARICAMENTO DEI PESI (Full Hybrid Model)
        if model_path and os.path.exists(model_path):
            print(f"üì• Caricamento pesi modello ibrido da: {os.path.basename(model_path)}")
            try:
                # Carica il checkpoint
                checkpoint = torch.load(model_path, map_location=self.device, weights_only = False)

                # Gestione se i pesi sono dentro una chiave 'state_dict' (tipico di torchreid)
                state_dict = checkpoint['state_dict'] if 'state_dict' in checkpoint else checkpoint

                # --- FIX CRITICO: FILTRAGGIO MANUALE ---
                clean_state_dict = {}
                for k, v in state_dict.items():
                    name = k.replace('module.', '') # Rimuove prefisso DataParallel

                    # SE IL NOME CONTIENE 'classifier' o 'fc', LO BUTTIAMO VIA
                    if 'classifier' in name or 'fc' in name:
                        continue

                    clean_state_dict[name] = v

                # Carichiamo solo i pesi filtrati (Backbone + CBAM + ViT + GeM)
                # strict=False √® fondamentale perch√© mancher√† il classifier, ma a noi va bene cos√¨!
                self.model.load_state_dict(clean_state_dict, strict=False)

                print("‚úÖ SUCCESSO: Pesi caricati correttamente (Classifier rimosso, size mismatch evitato).")

            except Exception as e:
                print(f"‚ùå Errore critico nel caricamento pesi: {e}")
        else:
            print(f"‚ö†Ô∏è WARNING: File pesi non trovato: {model_path}. Il modello √® inizializzato a caso!")

        self.model.to(self.device)
        self.model.eval()
        self.transform = transforms.Compose([
            transforms.Resize((256, 128)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    def extract_features(self, crops):
        if len(crops) == 0: return np.empty((0, 512))
        batch = []
        for img in crops:
            if isinstance(img, np.ndarray):
                img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
                img = Image.fromarray(img)
            batch.append(self.transform(img))
        batch = torch.stack(batch).to(self.device)
        with torch.no_grad():
            features = self.model(batch)
        return features.cpu().numpy()




# ==============================================================================
# PARTE C: PIPELINE DI TRACKING
# ==============================================================================
def run_tracking_pipeline(video_seq, group_id):
    # 1. Carica Modelli
    print(f"üèóÔ∏è Caricamento YOLO: {os.path.basename(YOLO_WEIGHTS)}")
    yolo_model = YOLO(YOLO_WEIGHTS)
    reid_extractor = ReIDExtractor(model_path=REID_WEIGHTS, model_name='osnet_x1_0')

    device = '0' if torch.cuda.is_available() else 'cpu'

    # --- TRUCCO ANTI-CRASH (Il "Trojan Horse") ---
    # Il problema: BoTSORT esige che il file dei pesi abbia un nome tipo "modello_dataset.pt".
    # Se il tuo file si chiama "best.pt", la libreria crasha cercando di leggere il nome del dataset.
    # Soluzione: Copiamo temporaneamente i tuoi pesi con un nome che piace a BoTSORT ("..._market1501.pt").

    temp_weights = '/content/osnet_x1_0_market1501.pt' # Nome fittizio "corretto"

    # Usiamo i tuoi pesi se esistono, altrimenti quelli di YOLO come tappabuchi
    source = REID_WEIGHTS if os.path.exists(REID_WEIGHTS) else YOLO_WEIGHTS
    shutil.copy(source, temp_weights)
    print(f"üé≠ Bypass naming check: creato {temp_weights}")




    #risultati migliori con track_buffer basso
    tracker = BoTSORT(
      frame_rate=25,
      device=device,
      half=False,
      reid_weights=temp_weights,
      track_high_thresh=0.25,
      track_low_thresh=0.1,
      new_track_thresh=0.25,
      match_thresh=0.5,
      track_buffer=60,
      mot20=False,
      cmc_method="sof",
      name='botsort',
      ablation=False,
      with_reid=True,
      proximity_thresh=0.3,
      appearance_thresh=0.25
  )



    def get_features_bridge(xyxy, img):
        crops = []
        h, w, _ = img.shape
        for box in xyxy:
            x1, y1, x2, y2 = map(int, box)
            x1, y1, x2, y2 = max(0, x1), max(0, y1), min(w, x2), min(h, y2)
            crop = img[y1:y2, x1:x2] if x2 > x1 and y2 > y1 else np.zeros((256, 128, 3), dtype=np.uint8)
            crops.append(crop)
        return reid_extractor.extract_features(crops)

    reid_extractor.get_features = get_features_bridge


    # Pulizia: cancelliamo il file temporaneo
    if os.path.exists(temp_weights):
        os.remove(temp_weights)

    # --- 3. HOT-SWAP DEL MODELLO (La parte cruciale) ---
    # Ora che il tracker √® inizializzato, buttiamo via il modello che ha caricato lui
    # e ci mettiamo dentro il TUO 'reid_extractor' configurato con torchreid.
    tracker.model = reid_extractor

    # Adattatore: BoTSORT a volte chiama funzioni diverse, le mappiamo tutte al tuo extractor
    tracker.model.get_features = lambda x: reid_extractor.extract_features(x)
    tracker.model.forward = lambda x: reid_extractor.extract_features(x)

    # Colleghiamo il bridge al tracker
    tracker.model.get_features = get_features_bridge
    # Colleghiamo anche forward per sicurezza (anche se BoTSORT usa get_features)
    tracker.model.forward = get_features_bridge



    print("üîß ReID Custom integrato nel Tracker (Modello standard rimosso).")

    # [BEHAVIOR ADDON 1] CARICAMENTO ROI
    # Carichiamo le ROI prima di iniziare il loop sui frame
    roi_path = os.path.join(DATASET_ROOT, video_seq, 'roi.json')
    rois = {}
    roi_names = []
    if os.path.exists(roi_path):
        with open(roi_path) as f:
            rois = json.load(f)
        roi_names = sorted(rois.keys()) # Ordine alfabetico essenziale per region_id 1 e 2
        print(f"üìê ROI caricate per behavior: {roi_names}")
    else:
        print("‚ö†Ô∏è Nessun file roi.json trovato! Il file behavior sar√† vuoto.")

    # Struttura per accumulare i conteggi: {frame_id: {region_id: count}}
    behavior_accumulated = {}

    # 4. Preparazione Loop
    img_dir = os.path.join(DATASET_ROOT, video_seq, 'img1')
    if not os.path.exists(img_dir): raise FileNotFoundError(f"Manca cartella: {img_dir}")

    frames = sorted(glob.glob(os.path.join(img_dir, "*.jpg")) + glob.glob(os.path.join(img_dir, "*.png")))
    os.makedirs(OUTPUT_RESULTS_DIR, exist_ok=True)
    results_txt = []

    # --- NUOVO: CONFIGURAZIONE CARTELLE DEBUG ---
    debug_base_dir = os.path.join(OUTPUT_RESULTS_DIR, 'debug_frames_smart', video_seq)
    debug_raw_dir = os.path.join(debug_base_dir, 'raw')
    debug_clean_dir = os.path.join(debug_base_dir, 'clean')

    # Pulizia preventiva cartelle debug per evitare file vecchi
    if os.path.exists(debug_base_dir):
        shutil.rmtree(debug_base_dir)
    os.makedirs(debug_raw_dir, exist_ok=True)
    os.makedirs(debug_clean_dir, exist_ok=True)

    rejection_count = 0

    print(f"\nüöÄ START TRACKING: {video_seq} ({len(frames)} frames)")

    # 5. LOOP PRINCIPALE (Detection -> Field Filter -> Batch Shadow Filter -> Track)
    for i, img_path in enumerate(tqdm(frames)):
        frame = cv2.imread(img_path)
        if frame is None: continue
        fid = i + 1

        debug_filename = f"{fid:06d}.txt"


        # A. Pre-elaborazione
        mask = get_field_mask_ransac(frame) # Usa la NUOVA funzione con Ransac
        yolo_out = yolo_model.predict(frame, conf=0.25, iou=0.6, verbose=False, imgsz=1088, augment=True, half=True)[0]

        # Strutture dati per questo frame
        raw_lines = []         # Per debug: tutte le detection YOLO
        clean_lines = []       # Per debug: solo quelle finali

        # Lista temporanea per i candidati che passano il primo step (Campo)
        # Salviamo un dizionario per non perdere conf, cls e coordinate originali
        candidates_on_field = []

        # --- STEP 1: Estrazione YOLO e Filtro CAMPO ---
        for box in yolo_out.boxes:
            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
            conf = float(box.conf[0].cpu().numpy())
            cls = int(box.cls[0].cpu().numpy())

            # Calcolo xywh (Interi per le funzioni di filtro)
            w, h = int(x2 - x1), int(y2 - y1)
            x, y = int(x1), int(y1)
            bbox_xywh = [x, y, w, h]

            # Salviamo la stringa RAW per debug
            line_str = f"{x1:.2f},{y1:.2f},{x2:.2f},{y2:.2f},{conf:.4f},{cls}"
            raw_lines.append(line_str)

            # 1. Filtro Piedi in Campo
            if is_feet_in_field(bbox_xywh, mask):
                candidates_on_field.append({
                    'xywh': bbox_xywh,                 # [x, y, w, h] per i filtri
                    'tracker_data': [x1, y1, x2, y2, conf, cls], # [x1, y1, x2, y2, conf, cls] per BoTSORT
                    'line_str': line_str               # Per debug CLEAN
                })

        # --- STEP 2: Filtro OMBRE BATCH (Processa il gruppo) ---
        # Lista temporanea per chi sopravvive alle ombre
        survivors_shadow = []

        if candidates_on_field:
            # Estraiamo solo la lista [x,y,w,h] da passare alla funzione batch
            input_boxes = [c['xywh'] for c in candidates_on_field]

            # La funzione restituisce la lista dei box [x,y,w,h] SOPRAVVISSUTI
            final_valid_boxes_shadow = batch_shadow_filtering(input_boxes, frame)

            # Recuperiamo i dizionari completi dei sopravvissuti alle ombre
            for cand in candidates_on_field:
                if cand['xywh'] in final_valid_boxes_shadow:
                    survivors_shadow.append(cand)

        # --- [NUOVO] STEP 2.5: Filtro BLUR ARTIFACTS (Anti-Ghosting) ---
        clean_dets = []

        if survivors_shadow:
            # Prepariamo gli input specifici per la funzione:
            # 1. Lista box [x, y, w, h]
            blur_in_boxes = [s['xywh'] for s in survivors_shadow]
            # 2. Lista confidenze (l'indice 4 di tracker_data √® la confidenza)
            blur_in_confs = [s['tracker_data'][4] for s in survivors_shadow]

            # Chiamata alla funzione (con tolleranza verticale stretta = 2px)
            blur_out_boxes, _ = clean_blur_artifacts(
                blur_in_boxes,
                blur_in_confs,
                iou_thresh=0.4,
                vertical_tol=3,       # <--- Fondamentale per il blur orizzontale
                conf_target_thresh=0.40
            )

            # Ricostruzione finale per il Tracker
            # Manteniamo solo i candidati il cui box √® presente nell'output del filtro blur
            for s in survivors_shadow:
                if s['xywh'] in blur_out_boxes:
                    clean_dets.append(s['tracker_data'])
                    clean_lines.append(s['line_str'])

        # Gestione Debug (Salviamo se c'√® differenza tra Raw e Clean)
        has_rejections = len(raw_lines) != len(clean_lines)
        if has_rejections:
            rejection_count += 1
            with open(os.path.join(debug_raw_dir, debug_filename), 'w') as f:
                f.write('\n'.join(raw_lines))
            with open(os.path.join(debug_clean_dir, debug_filename), 'w') as f:
                f.write('\n'.join(clean_lines))

        # --- STEP 3: Tracking Update ---
        dets_array = np.array(clean_dets) if clean_dets else np.empty((0, 6))

        # 1. Otteniamo le tracce  dal Tracker BoTSORT
        online_targets = tracker.update(dets_array, frame)

        # [BEHAVIOR ADDON 2] INIZIALIZZA CONTEGGIO FRAME
        # Reset contatori per questo frame
        current_frame_counts = {1: 0, 2: 0} # Supporta fino a 2 ROI come da specifiche

        # C. Formattazione Output
        for t in online_targets:
            x1, y1, x2, y2 = t[0], t[1], t[2], t[3]
            tid = int(t[4])
            conf = t[5]

            w_box = x2 - x1
            h_box = y2 - y1

            # --- LOGICA BEHAVIOR (Center of Basis) ---
            # Calcolo il punto centrale dei PIEDI (non del rettangolo intero)
            foot_cx = x1 + (w_box / 2)
            foot_cy = y2

            # Controllo ROI
            img_h, img_w = frame.shape[:2] # Dovrebbe essere 1080, 1920

            for r_idx, r_name in enumerate(roi_names):
                r = rois[r_name]
                # Convertiamo ROI normalizzata in Pixel
                rx1 = r["x"] * img_w
                ry1 = r["y"] * img_h
                rx2 = (r["x"] + r["width"]) * img_w
                ry2 = (r["y"] + r["height"]) * img_h

                # Check inclusione
                if rx1 <= foot_cx <= rx2 and ry1 <= foot_cy <= ry2:
                    region_id = r_idx + 1 # 1-based index
                    if region_id in current_frame_counts:
                        current_frame_counts[region_id] += 1
                    break # IMPORTANTE: Se √® nella ROI 1, non contarlo nella ROI 2

            line = f"{fid},{tid},{x1:.2f},{y1:.2f},{w_box:.2f},{h_box:.2f},{conf:.2f},-1,-1,-1"
            results_txt.append(line)

        # Salviamo i conteggi di questo frame nel dizionario globale
        behavior_accumulated[fid] = current_frame_counts

    # --- SALVATAGGIO FILE ---

    # 1. FILE TRACKING: tracking_VID_GROUP.txt
    tracking_filename = f"tracking_{video_seq}_{group_id}.txt"
    out_file = os.path.join(OUTPUT_RESULTS_DIR, tracking_filename)

    with open(out_file, 'w') as f:
        f.write('\n'.join(results_txt))
    print(f"üíæ Risultati Tracking salvati in: {out_file}")

    # 2. FILE BEHAVIOR: behavior_VID_GROUP.txt
    behavior_filename = f"behavior_{video_seq}_{group_id}.txt"
    beh_out_file = os.path.join(OUTPUT_RESULTS_DIR, behavior_filename)

    with open(beh_out_file, 'w') as f:
        for f_id in sorted(behavior_accumulated.keys()):
            counts = behavior_accumulated[f_id]
            f.write(f"{f_id},1,{counts[1]}\n")
            if len(roi_names) > 1:
                f.write(f"{f_id},2,{counts[2]}\n")

    print(f"üìä Risultati Behavior salvati in: {beh_out_file}")
    print(f"üóëÔ∏è Frame con scarti salvati per debug: {rejection_count}")

    return out_file, beh_out_file

if __name__ == "__main__":
    if os.path.exists(DATASET_ROOT):
        try:
            # Passiamo sia la sequenza video che il gruppo
            track_file, beh_file = run_tracking_pipeline(VIDEO_SEQ, GROUP_ID)
            print(f"\n‚úÖ Pipeline completata per Video {VIDEO_SEQ} - Gruppo {GROUP_ID}")
        except Exception as e:
            print(f"\n‚ùå Errore durante l'esecuzione: {e}")
            import traceback
            traceback.print_exc()
    else:
        print(f"‚ùå Errore Critico: La cartella dataset non esiste: {DATASET_ROOT}")

# creazione behavior_gt

In [None]:
import os
import glob

# --- CONFIGURAZIONE ---
base_path = './dataset/test_set_videos'
image_width = 1920
image_height = 1080

# Definizione delle ROI (normalizzate)
roi_content = {
    1: {"x": 0.01, "y": 0.01, "width": 0.4, "height": 0.75}, # ROI 1
    2: {"x": 0.5, "y": 0.35, "width": 0.5, "height": 0.5}    # ROI 2
}

def is_point_in_roi(px, py, roi_def, img_w, img_h):
    """
    Verifica se il punto (px, py) cade nella ROI specificata.
    Le coordinate della ROI sono normalizzate (0-1), quelle del punto sono in pixel.
    """
    # Conversione ROI da normalizzato a pixel assoluti
    rx = roi_def["x"] * img_w
    ry = roi_def["y"] * img_h
    rw = roi_def["width"] * img_w
    rh = roi_def["height"] * img_h

    # Verifica inclusione
    return (rx <= px <= rx + rw) and (ry <= py <= ry + rh)

def generate_behavior_gt():
    # Cerca tutte le cartelle video. Poich√© sono numeri (001, 002...),
    # filtriamo per assicurarci di prendere solo le directory numeriche.
    all_items = sorted(os.listdir(base_path))
    video_folders = [os.path.join(base_path, item) for item in all_items if item.isdigit() and os.path.isdir(os.path.join(base_path, item))]

    print(f"Trovate {len(video_folders)} sequenze video in {base_path} (da {os.path.basename(video_folders[0])} a {os.path.basename(video_folders[-1])})")

    for video_folder in video_folders:
        video_name = os.path.basename(video_folder)

        # DEFINIZIONE PERCORSI AGGIORNATA
        gt_track_path = os.path.join(video_folder, 'gt', 'gt.txt')
        output_path = os.path.join(video_folder, 'gt', 'behavior_gt.txt') # Ora √® dentro la cartella gt

        if not os.path.exists(gt_track_path):
            print(f"ATTENZIONE: gt.txt non trovato per {video_name}, salto.")
            continue

        # Dizionario per accumulare i conteggi: frame_id -> {roi_id: count}
        frame_counts = {}

        # 1. Lettura e Processing del Tracking GT
        with open(gt_track_path, 'r') as f:
            for line in f:
                parts = line.strip().split(',')
                # Parsing: frame, id, left, top, width, height, ...
                try:
                    frame_id = int(parts[0])
                    # Coordinate Bbox
                    left = float(parts[2])
                    top = float(parts[3])
                    width = float(parts[4])
                    height = float(parts[5])
                except ValueError:
                    continue # Salta righe malformate se ce ne fossero

                # Calcolo del punto "piedi" (centro del lato inferiore)
                foot_x = left + (width / 2.0)
                foot_y = top + height

                # Inizializza il frame nel dizionario se non esiste
                if frame_id not in frame_counts:
                    frame_counts[frame_id] = {1: 0, 2: 0}

                # Verifica ROI 1
                if is_point_in_roi(foot_x, foot_y, roi_content[1], image_width, image_height):
                    frame_counts[frame_id][1] += 1

                # Verifica ROI 2
                if is_point_in_roi(foot_x, foot_y, roi_content[2], image_width, image_height):
                    frame_counts[frame_id][2] += 1

        # 2. Scrittura del file behavior_gt.txt
        sorted_frames = sorted(frame_counts.keys())

        if len(sorted_frames) == 0:
            print(f"Nessun dato valido trovato in {video_name}.")
            continue

        print(f"Scrittura behavior_gt.txt per {video_name} in {output_path} ({len(sorted_frames)} frame)...")

        with open(output_path, 'w') as out_f:
            for fid in sorted_frames:
                # Scrittura ROI 1: frame, region_id, count
                out_f.write(f"{fid},1,{frame_counts[fid][1]}\n")
                # Scrittura ROI 2: frame, region_id, count
                out_f.write(f"{fid},2,{frame_counts[fid][2]}\n")

    print("\nGenerazione completata per tutti i video.")

# Esegui la funzione
generate_behavior_gt()