<a href="https://colab.research.google.com/github/Riccardo-Venturi/Tesi_Script_Colab/blob/main/Unet%2B%2BV2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

    Obiettivo del Pre-addestramento (le 25 epoche): L'obiettivo NON è la precisione. È insegnare al modello le caratteristiche di basso livello:

        Che aspetto ha la texture della fibra di carbonio?

        Qual è la forma generale, circolare, di un foro?

        Dove, approssimativamente, si trova il danno rispetto al foro?

        Come distinguere i bordi netti da quelli frastagliati?

  Poi si passa al fine-tune incrementale pre-trained_model.pth; modello conoscenza base dominio del danno :: **ENCODER CONGELATO**

Il Ciclo di Fine-Tuning (per ogni "ondata" di 10 maschere nuove):

  model.load_state_dict(torch.load('pre-trained_model.pth'))
   for param in model.parameters():
    param.requires_grad = True

<details>Learning Rate BASSISSIMO: Questa è la regola d'oro del fine-tuning.[2][3] I pesi sono già buoni, non vogliamo stravolgerli. Usa un learning rate di uno o due ordini di grandezza inferiore a quello del pre-training. Se prima avevi 1e-3, ora usa 1e-5.[4]

Pochissime Epoche per Sessione: NON devi fare 25 epoche su 10 immagini. Le imparerebbe a memoria in un attimo. Per ogni sessione di fine-tuning (dopo aver aggiunto 10 nuove maschere), addestra per 3-5 epoche al massimo.[5] L'obiettivo è dare un "nudge", una piccola spinta nella direzione giusta, non ri-addestrare da capo.[6]

Monitoraggio Ossessivo della Validation Loss: Ad ogni epoca, calcola la loss sul tuo "Validation Set d'Elite".

    Se la val_loss scende: Fantastico! Salva il modello. Stai andando nella direzione giusta.

    Se la val_loss smette di scendere o, peggio, inizia a salire mentre la train_loss continua a scendere: STOP IMMEDIATO. Quello è il segnale inequivocabile di overfitting.[7][8] Significa che il modello ha smesso di imparare e ha iniziato a memorizzare.

Data Augmentation Forte: Siccome il tuo Golden Dataset è piccolo, devi sfruttarlo al massimo. Usa albumentations per applicare rotazioni, flip, variazioni di luminosità/contrasto. Questo crea artificialmente nuovi esempi e costringe il modello a generalizzare.

In [None]:
# ===================================================================
# #@title CELLA 1: SETUP INIZIALE
# ===================================================================
print("--- [1/4] Installazione librerie ---")
!pip install -q segmentation-models-pytorch==0.3.3 albumentations==1.3.1 torchinfo==1.8.0

import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Dataset
import segmentation_models_pytorch as smp
import albumentations as A
from albumentations.pytorch import ToTensorV2
import cv2
import numpy as np
from pathlib import Path
from tqdm.notebook import tqdm
import shutil
import random
import re

print("✅ Librerie pronte.")

In [None]:
# =============================================================================
# CELLA 2: PREPARAZIONE E SPLIT DEL DATASET RADIOGRAFICO
# =============================================================================
print("--- [2/4] Preparazione e Split del Dataset ---")

# --- 1. CONFIGURA I TUOI PERCORSI ---
# Attenzione: .glob('**/*.jpg') legge ricorsivamente in tutte le sottocartelle
IMG_DIR = Path("/content/drive/MyDrive/Patches_test/Radio_Patches_Normalized")
MASK_DIR = Path("/content/drive/MyDrive/Patches_test/Patches_masks") # Le nuove maschere

# Directory di output per il dataset splittato
DATASET_DIR = Path("/content/dataset_radiografico_split")

# --- 2. TROVA E ACCOPPIA I FILE ---
print("Scansione file in corso...")
all_images = list(IMG_DIR.glob("**/*.jpg"))
all_masks = list(MASK_DIR.glob("*.png"))
print(f"Trovate {len(all_images)} immagini e {len(all_masks)} maschere.")

# Per accoppiarli, dobbiamo usare un ID comune. Usiamo il numero del foro.
# Funzione per estrarre H<numero> dal nome del file
def extract_hole_id(path):
    match = re.search(r'H(\d+)', path.stem)
    return f"H{match.group(1)}" if match else None

# Creiamo dei dizionari per un lookup veloce
img_map = {extract_hole_id(p): p for p in all_images if extract_hole_id(p) is not None}
mask_map = {extract_hole_id(p): p for p in all_masks if extract_hole_id(p) is not None}

# Trova gli ID in comune
common_ids = sorted(list(set(img_map.keys()) & set(mask_map.keys())))
print(f"\nTrovate {len(common_ids)} coppie (immagine-maschera) corrispondenti.")

# --- 3. CREAZIONE LISTA DI COPPIE E SPLIT ---
# Lista di tuple (percorso_immagine, percorso_maschera)
paired_files = [(img_map[id], mask_map[id]) for id in common_ids]

if DATASET_DIR.exists(): shutil.rmtree(DATASET_DIR)
for subset in ["train", "val", "test"]:
    (DATASET_DIR / subset / "images").mkdir(parents=True)
    (DATASET_DIR / subset / "masks").mkdir(parents=True)

random.seed(42)
random.shuffle(paired_files)
train_end = int(len(paired_files) * 0.8)
val_end = train_end + int(len(paired_files) * 0.1)

train_files = paired_files[:train_end]
val_files = paired_files[train_end:val_end]
test_files = paired_files[val_end:]

def copy_files(files, subset_name):
    for img_path, mask_path in tqdm(files, desc=f"Copia in {subset_name}"):
        # Usiamo lo stesso nome file per entrambi per coerenza
        shutil.copy(img_path, DATASET_DIR / subset_name / "images" / mask_path.name)
        shutil.copy(mask_path, DATASET_DIR / subset_name / "masks" / mask_path.name)

copy_files(train_files, "train")
copy_files(val_files, "val")
copy_files(test_files, "test")

print(f"\n✅ Dataset creato con successo in '{DATASET_DIR}':")
print(f"   - Train: {len(train_files)} campioni")
print(f"   - Val: {len(val_files)} campioni")
print(f"   - Test: {len(test_files)} campioni")

In [None]:
# ===================================================================
# CELLA 3: DATASET PYTORCH E DATALOADERS
# ===================================================================
print("--- [3/4] Creazione Dataset e DataLoaders ---")

# --- 1. PARAMETRI ---
IMG_SIZE = 512
BATCH_SIZE = 8 # Iniziamo con 8, si può aumentare se la VRAM lo permette

# --- 2. TRASFORMAZIONI CON ALBUMENTATIONS ---
# Queste sono immagini in scala di grigi (1 canale)
transform = A.Compose([
    A.Resize(IMG_SIZE, IMG_SIZE),
    A.Normalize(mean=(0.5,), std=(0.5,)), # Normalizzazione per 1 canale
    ToTensorV2(),
])

# --- 3. CLASSE DATASET PERSONALIZZATA ---
class RadioDataset(Dataset):
    def __init__(self, subset):
        self.img_dir = DATASET_DIR / subset / "images"
        self.mask_dir = DATASET_DIR / subset / "masks"
        # I nomi ora corrispondono perfettamente grazie alla cella precedente
        self.file_names = sorted([p.name for p in self.mask_dir.glob("*.png")])

    def __len__(self):
        return len(self.file_names)

    def __getitem__(self, idx):
        file_name = self.file_names[idx]
        img_path = self.img_dir / file_name
        mask_path = self.mask_dir / file_name

        # Carica immagine in SCALA DI GRIGI
        img = cv2.imread(str(img_path), cv2.IMREAD_GRAYSCALE)
        # Carica maschera
        mask = cv2.imread(str(mask_path), cv2.IMREAD_GRAYSCALE)

        # Applica le trasformazioni
        augmented = transform(image=img, mask=mask)
        # Ritorna il tensore immagine e il tensore maschera (.long() è cruciale per la loss)
        return augmented['image'], augmented['mask'].long()

# --- 4. CREA I DATALOADER ---
train_loader = DataLoader(RadioDataset("train"), batch_size=BATCH_SIZE, shuffle=True, num_workers=2)
val_loader = DataLoader(RadioDataset("val"), batch_size=BATCH_SIZE, shuffle=False, num_workers=2)

print(f"✅ DataLoaders pronti. Batch size: {BATCH_SIZE}.")

# Test veloce per verificare che tutto funzioni
try:
    img_batch, msk_batch = next(iter(train_loader))
    print(f"Dimensioni batch immagini: {img_batch.shape}") # Atteso: [BATCH_SIZE, 1, 512, 512]
    print(f"Dimensioni batch maschere: {msk_batch.shape}")   # Atteso: [BATCH_SIZE, 512, 512]
except Exception as e:
    print(f"❌ ERRORE NEL DATALOADER: {e}")

In [None]:
# =============================================================================
# CELLA 4: TRAINING LOOP (PRE-ADDESTRAMENTO CON ENCODER CONGELATO)
# =============================================================================
print("--- [4/4] Avvio Pre-addestramento ---")

# --- 1. PARAMETRI DI TRAINING ---
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
ENCODER_NAME = "efficientnet-b0" # Leggero ed efficace
NUM_EPOCHS = 25 # Come richiesto
LEARNING_RATE = 1e-3 # Partiamo un po' più aggressivi per il pre-training
CHECKPOINT_PATH = "/content/drive/MyDrive/PesiUNETPP/best_model_pre-training.pth"

# --- 2. DEFINIZIONE MODELLO ---
model = smp.UnetPlusPlus(
    encoder_name=ENCODER_NAME,
    encoder_weights="imagenet", # Usiamo i pesi pre-addestrati!
    in_channels=1,              # <-- CRUCIALE: le nostre immagini hanno 1 canale
    classes=3                   # 0: bg, 1: hole, 2: damage
).to(DEVICE)

# --- 3. STRATEGIA CECCHINO: CONGELA L'ENCODER ---
for param in model.encoder.parameters():
    param.requires_grad = False
print(f"✅ Modello UNet++ con encoder '{ENCODER_NAME}' (CONGELATO) pronto su {DEVICE}.")

# --- 4. LOSS, OPTIMIZER, SCHEDULER ---
# La DiceLoss è un'ottima scelta per la segmentazione sbilanciata
loss_fn = smp.losses.DiceLoss(mode='multiclass', from_logits=True)
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=NUM_EPOCHS)
scaler = torch.cuda.amp.GradScaler() # Per mixed precision

# --- 5. LOOP DI TRAINING ---
best_val_loss = float('inf')

for epoch in range(NUM_EPOCHS):
    model.train()
    train_loss = 0.0
    loop = tqdm(train_loader, desc=f"Epoch {epoch+1}/{NUM_EPOCHS}")
    for images, masks in loop:
        images, masks = images.to(DEVICE), masks.to(DEVICE)

        optimizer.zero_grad()

        with torch.cuda.amp.autocast():
            outputs = model(images)
            loss = loss_fn(outputs, masks)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        train_loss += loss.item()
        loop.set_postfix(loss=loss.item())

    model.eval()
    val_loss = 0.0
    with torch.no_grad():
        for images, masks in val_loader:
            images, masks = images.to(DEVICE), masks.to(DEVICE)
            outputs = model(images)
            loss = loss_fn(outputs, masks)
            val_loss += loss.item()

    avg_train_loss = train_loss / len(train_loader)
    avg_val_loss = val_loss / len(val_loader)

    print(f"Epoch {epoch+1}: Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f} | LR: {scheduler.get_last_lr()[0]:.6f}")

    if avg_val_loss < best_val_loss:
        print(f"🏆 Nuovo modello migliore! Salvo i pesi in '{CHECKPOINT_PATH}'")
        best_val_loss = avg_val_loss
        # Assicurati che la cartella esista
        Path(CHECKPOINT_PATH).parent.mkdir(parents=True, exist_ok=True)
        torch.save(model.state_dict(), CHECKPOINT_PATH)

    scheduler.step()

print("\n🎉 Pre-addestramento Finito.")

In [None]:
# =============================================================================
# CELLA 4.5: VALUTAZIONE DEL MODELLO PRE-ADDESTRATO (ROBUSTA SMP)
# =============================================================================
import torch
import segmentation_models_pytorch as smp
from tqdm import tqdm
import numpy as np
import cv2
from pathlib import Path

print("--- [FASE di Valutazione] Valutazione del modello pre-addestrato sul Test Set ---")

MODEL_TO_EVAL_PATH = "/content/drive/MyDrive/PesiUNETPP/best_model_pre-training.pth"
EVAL_DATASET_DIR = Path("/content/dataset_radiografico_split")
PREDICTIONS_DIR = Path("/content/pre-training_predictions"); PREDICTIONS_DIR.mkdir(exist_ok=True)

# --- Modello ---
model = smp.UnetPlusPlus(encoder_name=ENCODER_NAME, in_channels=1, classes=3).to(DEVICE)
model.load_state_dict(torch.load(MODEL_TO_EVAL_PATH, map_location=DEVICE))
model.eval()

# --- Dataloader test ---
test_dataset = RadioDataset("test")
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, drop_last=False)
test_file_names = test_dataset.file_names
print(f"Valutazione su {len(test_dataset)} campioni del test set.")

# --- Accumulatori per stats (liste di tensori [B,C]) ---
tps, fps, fns, tns = [], [], [], []

with torch.no_grad():
    for i, (images, masks) in enumerate(tqdm(test_loader, desc="Generando predizioni sul test set")):
        images = images.to(DEVICE, non_blocking=True)
        masks_gt = masks.to(DEVICE, non_blocking=True).long()   # dtype intero, shape [B,H,W]

        logits = model(images)           # [B,3,H,W]
        preds  = torch.argmax(logits, dim=1)  # [B,H,W] in {0,1,2}

        # 1) Stats per-batch (posizionali, NO keywords per compatibilità SMP)
        tp, fp, fn, tn = smp.metrics.get_stats(preds, masks_gt, mode='multiclass', num_classes=3)

        # 2) Accumula per concatenazione successiva
        tps.append(tp); fps.append(fp); fns.append(fn); tns.append(tn)

        # 3) Salva maschere predette per ispezione
        preds_np = preds.detach().cpu().numpy()
        for j, pred_mask in enumerate(preds_np):
            file_idx = i * BATCH_SIZE + j
            if file_idx < len(test_file_names):
                cv2.imwrite(str(PREDICTIONS_DIR / test_file_names[file_idx]),
                            pred_mask.astype(np.uint8))

# --- Concatena su dimensione "immagini" ---
tp = torch.cat(tps, dim=0)
fp = torch.cat(fps, dim=0)
fn = torch.cat(fns, dim=0)
tn = torch.cat(tns, dim=0)

# Sanity checks
assert tp.ndim == 2 and tp.shape[1] == 3, f"tp shape inattesa: {tp.shape}"
M, C = tp.shape

# --- Metriche ---
iou_micro_img  = smp.metrics.iou_score(tp, fp, fn, tn, reduction='micro-imagewise')
dice_micro_img = smp.metrics.f1_score (tp, fp, fn, tn, reduction='micro-imagewise')

# Per-classe (macro su immagini)
iou_per_image_per_class = smp.metrics.iou_score(tp, fp, fn, tn, reduction='none')  # [M,C]
dice_per_image_per_class = smp.metrics.f1_score(tp, fp, fn, tn, reduction='none')  # [M,C]
iou_per_class  = iou_per_image_per_class.mean(dim=0)   # [C]
dice_per_class = dice_per_image_per_class.mean(dim=0)  # [C]

# Micro globale (sommone)
iou_micro  = smp.metrics.iou_score(tp, fp, fn, tn, reduction='micro')
dice_micro = smp.metrics.f1_score (tp, fp, fn, tn, reduction='micro')

print("\n--- METRICHE SUL TEST SET (Modello Pre-Addestrato) ---")
print(f"-> IoU micro-imagewise: {iou_micro_img.item():.4f}")
print(f"-> Dice micro-imagewise:{dice_micro_img.item():.4f}")
print(f"-> IoU micro (globale): {iou_micro.item():.4f}")
print(f"-> Dice micro (globale):{dice_micro.item():.4f}")
for c, (ii, dd) in enumerate(zip(iou_per_class, dice_per_class)):
    print(f"   Classe {c}: IoU={ii.item():.4f}  Dice={dd.item():.4f}")
print("-----------------------------------------------------")


In [None]:
#@title visualizza Maschere e scarica zip
# QA + PACKAGER per CVAT "Segmentation mask 1.1"
# ------------------------------------------------
import os, glob, zipfile, cv2, numpy as np, shutil, matplotlib
from pathlib import Path

# CONFIG
MASKS_DIR = Path("/content/pre-training_predictions")   # original 0/1/2 masks
VIS_DIR   = Path("/content/mask_previews")                   # where colored previews will go
ZIP_PATH  = Path("/content/mask_previews.zip")

# Clean previous run
if VIS_DIR.exists():
    shutil.rmtree(VIS_DIR)
VIS_DIR.mkdir(parents=True, exist_ok=True)

# Generate colored previews
for p in sorted(MASKS_DIR.glob("*.png")):
    m = cv2.imread(str(p), cv2.IMREAD_UNCHANGED)
    if m is None:
        print(f"Skip unreadable {p.name}")
        continue
    # ensure uint8 0/1/2
    m = np.clip(np.rint(m), 0, 2).astype(np.uint8)
    vis = cv2.applyColorMap((m*120).astype(np.uint8), cv2.COLORMAP_PARULA)
    out_path = VIS_DIR / p.name
    cv2.imwrite(str(out_path), vis)

# Zip the previews
if ZIP_PATH.exists():
    ZIP_PATH.unlink()
with zipfile.ZipFile(ZIP_PATH, "w", compression=zipfile.ZIP_DEFLATED) as zf:
    for img_path in VIS_DIR.glob("*.png"):
        zf.write(img_path, img_path.name)

ZIP_PATH



In [None]:
# =============================================================================
# CELLA 5.25: CONVERTITORE UNA TANTUM (COLORE -> 0,1,2)
# =============================================================================
import cv2
import numpy as np
from pathlib import Path
from tqdm.notebook import tqdm
import shutil

print("--- [TASK UNA TANTUM] Conversione delle maschere 'd'oro' da Colore a Numerico ---")

# --- 1. CONFIGURAZIONE ---
# La cartella con le maschere a colori esportate da CVAT
SOURCE_GOLDEN_MASKS_DIR = Path("/content/drive/MyDrive/GoldenDataset/masks")

# La nuova cartella dove salveremo le maschere corrette (formato 0,1,2)
# NON SOVRASCRIVIAMO NIENTE PER SICUREZZA
TARGET_CLEAN_MASKS_DIR = Path("/content/drive/MyDrive/GoldenDataset_Clean/masks")
TARGET_CLEAN_MASKS_DIR.mkdir(parents=True, exist_ok=True)

# Copia anche le immagini corrispondenti nella nuova struttura
SOURCE_GOLDEN_IMG_DIR = Path("/content/drive/MyDrive/GoldenDataset/images")
TARGET_CLEAN_IMG_DIR = Path("/content/drive/MyDrive/GoldenDataset_Clean/images")
TARGET_CLEAN_IMG_DIR.mkdir(parents=True, exist_ok=True)
print("Copia delle immagini sorgente...")
for img_file in SOURCE_GOLDEN_IMG_DIR.glob("*.png"):
    shutil.copy(img_file, TARGET_CLEAN_IMG_DIR / img_file.name)

# La mappa di conversione (BGR di OpenCV -> Indice di Classe)
COLOR_MAP = {
    (0, 0, 0): 0,          # background
    (100, 100, 255): 1,    # hole (Rosso/Rosa)
    (220, 220, 0): 2       # damage (Ciano)
}

# --- 2. CICLO DI CONVERSIONE ---
print(f"\nInizio conversione di {len(list(SOURCE_GOLDEN_MASKS_DIR.glob('*.png')))} maschere...")
for mask_color_path in tqdm(SOURCE_GOLDEN_MASKS_DIR.glob("*.png")):
    mask_color = cv2.imread(str(mask_color_path), cv2.IMREAD_COLOR)

    h, w, _ = mask_color.shape
    mask_indexed = np.zeros((h, w), dtype=np.uint8)

    for color_bgr, class_index in COLOR_MAP.items():
        mask_indexed[np.all(mask_color == color_bgr, axis=-1)] = class_index

    save_path = TARGET_CLEAN_MASKS_DIR / mask_color_path.name
    cv2.imwrite(str(save_path), mask_indexed)

print(f"\n✅ CONVERSIONE COMPLETATA. Le tue maschere numeriche sono pronte in:")
print(f"   -> {TARGET_CLEAN_MASKS_DIR}")
print("Ora puoi usare questa cartella per il fine-tuning.")

In [None]:
# =============================================================================
# CELLA 5.30: PREPARAZIONE DATI PER IL FINE-TUNING (robusta jpg/png)
# =============================================================================
import shutil
from pathlib import Path
from sklearn.model_selection import train_test_split

print("--- [FASE A] Preparazione dei Dati 'd'Oro' per il Fine-Tuning ---")

# --- 1. CONFIGURA I PERCORSI ---
GOLDEN_IMG_DIR = Path("/content/drive/MyDrive/GoldenDataset/images")
GOLDEN_MASK_DIR = Path("/content/drive/MyDrive/GoldenDataset_Clean/masks")
FINETUNE_DATASET_DIR = Path("/content/finetune_dataset")

GOLDEN_IMG_DIR.mkdir(parents=True, exist_ok=True)
GOLDEN_MASK_DIR.mkdir(parents=True, exist_ok=True)
print(f"ATTENZIONE: Assicurati di aver caricato le tue immagini e maschere corrette in:\n  - {GOLDEN_IMG_DIR}\n  - {GOLDEN_MASK_DIR}")

# --- 2. TROVA I CAMPIONI DALL'ELENCO MASCHERE (.png) ---
all_golden_masks = sorted(list(GOLDEN_MASK_DIR.glob("*.png")))
golden_stems = [p.stem for p in all_golden_masks]

if len(golden_stems) < 10:
    print(f"\nAVVISO: Trovati solo {len(golden_stems)} campioni. Userò tutti per il training.")
    train_stems = golden_stems
    val_stems = []
else:
    train_stems, val_stems = train_test_split(golden_stems, test_size=0.20, random_state=42)

print(f"\nSplit eseguito: {len(train_stems)} train, {len(val_stems)} val.")

# --- 3. PREPARA STRUTTURA CARTELLE PULITA ---
if FINETUNE_DATASET_DIR.exists():
    shutil.rmtree(FINETUNE_DATASET_DIR)
(FINETUNE_DATASET_DIR / "train" / "images").mkdir(parents=True)
(FINETUNE_DATASET_DIR / "train" / "masks").mkdir(parents=True)
(FINETUNE_DATASET_DIR / "val" / "images").mkdir(parents=True)
(FINETUNE_DATASET_DIR / "val" / "masks").mkdir(parents=True)

# --- 4. UTILS: copia rispettando estensioni (img .jpg o .png; mask .png) ---
def _first_existing(base: Path, exts):
    for ext in exts:
        p = base.with_suffix(ext)
        if p.exists():
            return p
    return None

def copy_golden_files(stems, subset):
    if not stems:
        return 0, 0, 0
    missing_img, missing_mask, copied = 0, 0, 0
    for stem in stems:
        # immagine: prova .jpg poi .png
        img_path = _first_existing(GOLDEN_IMG_DIR / stem, [".jpg", ".png", ".jpeg", ".JPG", ".PNG", ".JPEG"])
        if img_path is None:
            print(f"[WARN] Immagine mancante per '{stem}' (attese .jpg/.png). Salto.")
            missing_img += 1
            continue

        # maschera: deve essere .png
        mask_path = GOLDEN_MASK_DIR / f"{stem}.png"
        if not mask_path.exists():
            print(f"[WARN] Maschera .png mancante per '{stem}'. Salto.")
            missing_mask += 1
            continue

        # destinazioni (mantieni estensione originale immagine)
        dst_img = FINETUNE_DATASET_DIR / subset / "images" / img_path.name
        dst_mask = FINETUNE_DATASET_DIR / subset / "masks" / f"{stem}.png"

        shutil.copy(img_path, dst_img)
        shutil.copy(mask_path, dst_mask)
        copied += 1
    return copied, missing_img, missing_mask

copied_tr, miss_img_tr, miss_m_tr = copy_golden_files(train_stems, "train")
copied_va, miss_img_va, miss_m_va = copy_golden_files(val_stems, "val")

print(f"\n✅ Copiati: train={copied_tr}, val={copied_va}")
if miss_img_tr + miss_img_va > 0 or miss_m_tr + miss_m_va > 0:
    print(f"⚠️ Mancanti: immagini={miss_img_tr+miss_img_va}, maschere={miss_m_tr+miss_m_va}")

print(f"\n✅ Dati per il fine-tuning pronti in '{FINETUNE_DATASET_DIR}'")


In [None]:
# =============================================================================
#@title CELLA 5.5: CONTROLLO SANITÀ MASCHERE A COLORI
# =============================================================================
import cv2
import numpy as np
from pathlib import Path

print("--- Controllo Integrità Maschere a Colori nel Finetune Dataset ---")

FINETUNE_DATASET_DIR = Path("/content/content/drive/MyDrive/GoldenDataset_Clean/masks")
mask_dirs_to_check = [
    FINETUNE_DATASET_DIR / "train" / "masks",
    FINETUNE_DATASET_DIR / "val" / "masks"
]

# Definiamo i colori RGB attesi da CVAT
# NOTA: OpenCV legge in BGR, quindi li invertiamo per il confronto
EXPECTED_COLORS_BGR = {
    (0, 0, 0): 'background',
    (220, 220, 0): 'damage',      # Ciano in BGR
    (100, 100, 255): 'hole'       # Rosso/Rosa in BGR
}
EXPECTED_COLORS_BGR_TUPLES = list(EXPECTED_COLORS_BGR.keys())
print(f"Mi aspetto di trovare solo questi colori (in formato BGR): {EXPECTED_COLORS_BGR_TUPLES}")

errors_found = 0
for mask_dir in mask_dirs_to_check:
    if not mask_dir.exists(): continue
    print(f"\nAnalizzo la cartella: {mask_dir}...")

    for mask_path in mask_dir.glob("*.png"):
        # Carica come immagine a colori
        mask_color = cv2.imread(str(mask_path))

        # Trova i colori unici
        unique_colors = np.unique(mask_color.reshape(-1, mask_color.shape[2]), axis=0)

        # Controlla se ci sono colori inattesi
        unexpected_colors = []
        for color in unique_colors:
            if tuple(color) not in EXPECTED_COLORS_BGR_TUPLES:
                unexpected_colors.append(color)

        if unexpected_colors:
            print(f"  -> ❌ PROBLEMA: {mask_path.name} contiene colori INATTESI: {unexpected_colors}")
            errors_found += 1

if errors_found == 0:
    print("\n✅ OK! Tutte le maschere contengono i colori RGB attesi. Il problema è confermato.")
else:
    print(f"\n❗️ ATTENZIONE: {errors_found} file contengono colori strani oltre a quelli di CVAT.")

In [None]:
# =============================================================================
#@title INVESTIGATORE DI FILE MANCANTI
# =============================================================================
from pathlib import Path

print("--- [DIAGNOSI] Controllo corrispondenza tra immagini e maschere ---")

CLEAN_DATA_DIR = Path("/content/drive/MyDrive/GoldenDataset_Clean")
images_dir = CLEAN_DATA_DIR / "images"
masks_dir = CLEAN_DATA_DIR / "masks"

mask_stems = [p.stem for p in masks_dir.glob("*.png")]
image_stems = [p.stem for p in images_dir.glob("*.*")] # Prende sia .png che .jpg

missing_images = []
for m_stem in mask_stems:
    if m_stem not in image_stems:
        missing_images.append(m_stem)

if not missing_images:
    print("\n✅ OK! Tutte le maschere hanno un'immagine corrispondente.")
    print("   -> Il problema potrebbe essere un file corrotto.")
else:
    print(f"\n❌ ERRORE! Trovate {len(missing_images)} maschere senza un'immagine corrispondente:")
    for stem in missing_images:
        print(f"   -> Manca l'immagine per la maschera: {stem}.png")

print("\n--- Azione richiesta ---")
print("Vai nella cartella '/content/drive/MyDrive/GoldenDataset_Clean/images' e assicurati che esistano le immagini per le maschere elencate sopra. Controlla il nome e l'estensione del file.")

Avviamo un metodo di check zoom dei danni a ragnatela,ilmodello non riesce a prendere coraggio per segmentare il danno complesso, ritagliamo patch 128x128 che lo costringano a imparare mentre passano tutte dentro il train di fine tune insieme all'albumentation aggressivo

##CREAZIONESELEZIONEPATCHES128

In [None]:
# =============================================================================
# CELLA 6.1: GENERATORE DI PATCH v5.1 ("THE BLACKOUT ALGORITHM - CORRETTO")
# =============================================================================
import cv2
import numpy as np
from pathlib import Path
from tqdm.notebook import tqdm
import shutil
from sklearn.model_selection import train_test_split

print("--- [FASE di Pre-processing] Generazione Patch con Metodo 'Blackout' ---")

# --- 1. CONFIGURAZIONE ---
SOURCE_DATA_DIR = Path("/content/drive/MyDrive/GoldenDataset_Clean")
TEMP_SPLIT_DIR = Path("/content/temp_finetune_split")
PATCHES_OUTPUT_DIR = Path("/content/patches_dataset")
PATCH_SIZE = 128
DESIRED_PATCHES_PER_IMAGE = 50
BLACKOUT_MARGIN = 90

if TEMP_SPLIT_DIR.exists(): shutil.rmtree(TEMP_SPLIT_DIR)
if PATCHES_OUTPUT_DIR.exists(): shutil.rmtree(PATCHES_OUTPUT_DIR)

# --- 2. SPLIT (Non cambia) ---
all_stems = sorted([p.stem for p in (SOURCE_DATA_DIR / "masks").glob("*.png")])
train_stems, val_stems = train_test_split(all_stems, test_size=0.2, random_state=42)
def copy_files_for_split(stems, subset):
    (TEMP_SPLIT_DIR/subset/"images").mkdir(parents=True); (TEMP_SPLIT_DIR/subset/"masks").mkdir(parents=True)
    for stem in stems:
        img_path = next((SOURCE_DATA_DIR / "images").glob(f"{stem}.*")); shutil.copy(img_path, TEMP_SPLIT_DIR/subset/"images"/img_path.name)
        shutil.copy(SOURCE_DATA_DIR/"masks"/f"{stem}.png", TEMP_SPLIT_DIR/subset/"masks"/f"{stem}.png")
copy_files_for_split(train_stems, "train"); copy_files_for_split(val_stems, "val")
print(f"✅ Split temporaneo creato.")

# --- 3. CICLO DI ESTRAZIONE "BLACKOUT" (LOGICA CORRETTA) ---
for subset in ["train", "val"]:
    print(f"\nProcesso la subset: {subset}...")
    source_img_dir = TEMP_SPLIT_DIR / subset / "images"; source_mask_dir = TEMP_SPLIT_DIR / subset / "masks"
    target_img_dir = PATCHES_OUTPUT_DIR / subset / "images"; target_mask_dir = PATCHES_OUTPUT_DIR / subset / "masks"
    target_img_dir.mkdir(parents=True); target_mask_dir.mkdir(parents=True)
    stems = [p.stem for p in source_mask_dir.glob("*.png")]

    for stem in tqdm(stems, desc=f"Applicando blackout su '{subset}'"):
        img_path = next(source_img_dir.glob(f"{stem}.*")); mask_path = source_mask_dir / f"{stem}.png"
        img = cv2.imread(str(img_path), cv2.IMREAD_GRAYSCALE); mask = cv2.imread(str(mask_path), cv2.IMREAD_UNCHANGED)
        if img is None: continue

        #inversione valori maschere perchè Cvat ha sballato tutto

        hole_mask = np.uint8(mask == 2)

        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (BLACKOUT_MARGIN*2, BLACKOUT_MARGIN*2)) # Kernel circolare è meglio
        blackout_zone = cv2.dilate(hole_mask, kernel, iterations=1)

        damage_mask = np.uint8(mask == 1)

        # === LA CORREZIONE ===
        # Cancella fisicamente i pixel di danno che cadono nella zona di blackout.
        surviving_damage_mask = damage_mask.copy()
        surviving_damage_mask[blackout_zone > 0] = 0

        surviving_points_yx = np.argwhere(surviving_damage_mask > 0)

        if len(surviving_points_yx) == 0: continue

        num_to_sample = min(DESIRED_PATCHES_PER_IMAGE, len(surviving_points_yx))
        sampled_indices = np.random.choice(len(surviving_points_yx), size=num_to_sample, replace=False)

        for i, idx in enumerate(sampled_indices):
            y, x = surviving_points_yx[idx]
            # ... (il resto del codice da qui in poi è GIUSTO e non cambia) ...
            y_min = max(0, y - PATCH_SIZE // 2); x_min = max(0, x - PATCH_SIZE // 2)
            y_max = y_min + PATCH_SIZE; x_max = x_min + PATCH_SIZE

            if y_max > img.shape[0] or x_max > img.shape[1]: continue

            img_patch = img[y_min:y_max, x_min:x_max]
            mask_patch = mask[y_min:y_max, x_min:x_max]

            if img_patch.shape == (PATCH_SIZE, PATCH_SIZE):
                cv2.imwrite(str(target_img_dir / f"{stem}_patch_{i}.png"), img_patch)
                cv2.imwrite(str(target_mask_dir / f"{stem}_patch_{i}.png"), mask_patch)

print(f"\n✅ GENERAZIONE PATCH COMPLETATA.")

In [None]:
#Post selezione eliminando celle con foro bianco; vogliamo solo patches con danno ragnatela, sappiamo che il foro ha un valore di intensità elelvato si userà come scremaggio
#Valori di pixel unici nel campione del foro: [255]
#Valore di soglia per 'bianco accecante' identificato: 255

# usiamo 250 per scremare di più e essere sicuri
# =============================================================================
# CELLA 6.1.5: SCREMATURA AUTOMATICA PATCH TRAMITE ANALISI IMMAGINE
# =============================================================================
import cv2
import numpy as np
from pathlib import Path
from tqdm.notebook import tqdm
import os

print("--- [FASE di CURA AUTOMATICA] Scrematura delle patch contaminate dal foro ---")

# --- 1. PARAMETRI DI FILTRAGGIO ---
SOURCE_PATCHES_DIR = Path("/content/patches_dataset")

# Soglia di luminosità per considerare un pixel come "foro".
# Basato sull'analisi precedente, usiamo un valore leggermente inferiore a 255.
BRIGHTNESS_THRESHOLD = 250

# Percentuale massima di pixel "foro" consentita in una patch prima di essere scartata.
# L'1% è un buon punto di partenza: tollera piccoli artefatti ma scarta pezzi di foro.
MAX_HOLE_PIXEL_PERCENTAGE = 0.09

# --- 2. LOGICA DI SCANSIONE E FILTRAGGIO ---
patches_to_delete = []

for subset in ["train", "val"]:
    img_dir = SOURCE_PATCHES_DIR / subset / "images"
    if not img_dir.exists():
        print(f"La cartella {img_dir} non esiste. Salto.")
        continue

    for img_path in tqdm(list(img_dir.glob("*.png")), desc=f"Scansione '{subset}' images"):
        img = cv2.imread(str(img_path), cv2.IMREAD_GRAYSCALE)
        if img is None: continue

        # Conta i pixel che superano la soglia di luminosità
        bright_pixel_count = np.count_nonzero(img > BRIGHTNESS_THRESHOLD)
        total_pixels = img.size
        hole_percentage = (bright_pixel_count / total_pixels) * 100

        # Se la percentuale supera il limite, marca per l'eliminazione
        if hole_percentage > MAX_HOLE_PIXEL_PERCENTAGE:
            mask_path = img_path.parent.parent / "masks" / img_path.name
            patches_to_delete.append(img_path)
            patches_to_delete.append(mask_path)

# --- 3. REPORT E AZIONE ---
num_bad_patches = len(patches_to_delete) // 2
print(f"\n--- REPORT ---")
print(f"Identificate {num_bad_patches} patch da eliminare.")

if num_bad_patches > 0:
    print("Primi 10 file marcati per l'eliminazione:")
    for f in patches_to_delete[:10]: print(f"  - {f}")

    # === ESECUZIONE DELL'ELIMINAZIONE (ATTENZIONE: IRREVERSIBILE) ===
    # Rimuovi il commento dalla riga sottostante per eliminare i file

    for file_path in tqdm(patches_to_delete, desc="Eliminazione file"):
         if file_path.exists():
            os.remove(file_path)
            print("eliminazione patches con foro pixels bianchi")

    print("\nPer eseguire l'eliminazione, decommenta le righe nella sezione 'ESECUZIONE DELL'ELIMINAZIONE'.")
else:
    print("Nessuna patch ha superato la soglia di contaminazione. Il dataset è pulito.")


In [None]:
# =============================================================================
# CELLA 6.1.7: IL CURATORE UMANO E COLLETTORE DI "GEMME"
# =============================================================================
import shutil
from pathlib import Path
from tqdm.notebook import tqdm
#Serve a selezionare a mano le patches e prendere patch con corrispondent maschera dalla seleione fatta
#mettere il nome scelto nella lista per la selezione

print("--- [FASE di CURA MANUALE] Seleziono e impacchetto le patch migliori ---")

# --- 1. CONFIGURAZIONE ---
SOURCE_PATCHES_DIR = Path("/content/patches_dataset")
TEMP_CURATED_DIR = Path("/content/curated_patches_for_zip") # Directory temporanea
ZIP_OUTPUT_PATH = Path("/content/GoldenPatches_v1.zip") # Il nome del tuo file ZIP finale

if TEMP_CURATED_DIR.exists(): shutil.rmtree(TEMP_CURATED_DIR)

# --- 2. LA TUA LISTA DELLA SPESA ---
# Sfoglia le cartelle in /content/patches_dataset/train/images e /content/patches_dataset/val/images.
# Quando trovi una patch che ti piace, copia il suo nome (SENZA .png) e incollalo qui.
BEST_PATCH_STEMS = [
    "H546_h060_Scan5_patch_14","H549_h063_Scan5_patch_26","H551_h065_Scan5_patch_11","H554_h068_Scan5_patch_0","H554_h068_Scan5_patch_18","H554_h068_Scan5_patch_4",
    "H557_h071_Scan5_patch_0","H557_h071_Scan5_patch_4","H557_h071_Scan5_patch_5","H560_h074_Scan5_patch_17","H574_h002_Scan6_patch_11","H574_h002_Scan6_patch_20",
    "H574_h002_Scan6_patch_30","H580_h008_Scan6_patch_8","H583_h011_Scan6_patch_23","H584_h012_Scan6_patch_1","H585_h013_Scan6_patch_27","H587_h015_Scan6_patch_10",
    "H563_h077_Scan5_patch_45","H565_h079_Scan5_patch_47","H571_h085_Scan5_patch_0","H571_h085_Scan5_patch_1","H571_h085_Scan5_patch_10",
    "H592_h020_Scan6_patch_46"
]
print(f"Selezionati {len(BEST_PATCH_STEMS)} campioni d'oro per l'archiviazione.")

# --- 3. PESCA E COPIA I FILE SELEZIONATI ---
target_img_dir = TEMP_CURATED_DIR / "images"
target_mask_dir = TEMP_CURATED_DIR / "masks"
target_img_dir.mkdir(parents=True); target_mask_dir.mkdir(parents=True)

found_count = 0
for stem in tqdm(BEST_PATCH_STEMS, desc="Recuperando i file"):
    found = False
    for subset in ["train", "val"]:
        source_img_path = SOURCE_PATCHES_DIR / subset / "images" / f"{stem}.png"
        source_mask_path = SOURCE_PATCHES_DIR / subset / "masks" / f"{stem}.png"

        if source_img_path.exists():
            shutil.copy(source_img_path, target_img_dir / source_img_path.name)
            shutil.copy(source_mask_path, target_mask_dir / source_mask_path.name)
            found = True
            break # Trovato, passa al prossimo stelo
    if found:
        found_count += 1

print(f"\nRecuperati con successo {found_count} su {len(BEST_PATCH_STEMS)} file richiesti.")

# --- 4. CREA L'ARCHIVIO ZIP ---
if found_count > 0:
    shutil.make_archive(ZIP_OUTPUT_PATH.stem, 'zip', TEMP_CURATED_DIR)
    print(f"\n✅ ARCHIVIO CREATO: '{ZIP_OUTPUT_PATH}'")
    print("   Puoi scaricarlo dalla barra laterale o copiarlo su Google Drive con:")
    print(f'   !cp "{ZIP_OUTPUT_PATH}" "/content/drive/MyDrive/GoldenPatches/"')
else:
    print("\nNessun file recuperato. Controlla i nomi nella lista.")

##fineselezionecCELLEPATCHES128

##ADDESTRAMENTOIBRIDOCELLE+PATCHES FOVEALE

In [None]:
# =============================================================================
# CELLA 6.2: LA CELLA UNICA E DEFINITIVA DI TRAINING IBRIDO (v2 - ROBUSTA)
# =============================================================================
# --- [FASE 0] SETUP TOTALE ---
print("--- [FASE 0] Setup e import completi ---")
import torch, cv2, shutil, random, numpy as np, zipfile
import segmentation_models_pytorch as smp
from torch.utils.data import Dataset, DataLoader
from tqdm.notebook import tqdm
import albumentations as A
from albumentations.pytorch import ToTensorV2
from pathlib import Path
from sklearn.model_selection import train_test_split

# --- [FASE 1] CONFIGURAZIONE E PREPARAZIONE DATI ---
print("\n--- [FASE 1] Configurazione e preparazione dati ---")
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# <<<<<<< PARAMETRI DELLO SMOKETEST / RUN FINALE >>>>>>>>>
FINETUNE_EPOCHS = 20 # Inizia con 2 per lo smoketest, poi aumenta a 20
FINETUNE_LR = 1e-5; FINETUNE_BATCH_SIZE = 4
ENCODER_NAME = "efficientnet-b0"

CKPT_DIR = Path("/content/drive/MyDrive/PesiUNETPP")
PRE_TRAINED_CHECKPOINT = CKPT_DIR / "best_model_pre-training.pth"
FINETUNED_CHECKPOINT_GDRIVE = CKPT_DIR / "best_model_finetuned.pth"
FINETUNED_CHECKPOINT_LOCAL  = Path("/content/best_model_finetuned_local.pth")
SOURCE_FULL_DATA_DIR = Path("/content/drive/MyDrive/GoldenDataset_Clean")
GOLDEN_PATCH_ZIP = Path("/content/drive/MyDrive/GoldenPatches/GoldenPatches_v1.zip")
FINETUNE_DATASET_DIR = Path("/content/finetune_full_images_split")
CURATED_PATCHES_DIR = Path("/content/curated_patches_unzipped")

if FINETUNE_DATASET_DIR.exists(): shutil.rmtree(FINETUNE_DATASET_DIR)
if CURATED_PATCHES_DIR.exists(): shutil.rmtree(CURATED_PATCHES_DIR)
with zipfile.ZipFile(GOLDEN_PATCH_ZIP, 'r') as zf: zf.extractall(CURATED_PATCHES_DIR)
print(f"✅ Patch d'oro estratte in '{CURATED_PATCHES_DIR}'")

all_full_stems = sorted([p.stem for p in (SOURCE_FULL_DATA_DIR / "masks").glob("*.png")])
train_stems, val_stems = train_test_split(all_full_stems, test_size=0.2, random_state=42)
def copy_files_for_split(stems, subset):
    # <<< FIX JPG/PNG: Trova l'immagine con qualsiasi estensione usando glob e next
    (FINETUNE_DATASET_DIR/subset/"images").mkdir(parents=True); (FINETUNE_DATASET_DIR/subset/"masks").mkdir(parents=True)
    for stem in stems:
        try:
            img_path = next((SOURCE_FULL_DATA_DIR / "images").glob(f"{stem}.*"))
            shutil.copy(img_path, FINETUNE_DATASET_DIR/subset/"images"/img_path.name)
            shutil.copy(SOURCE_FULL_DATA_DIR/"masks"/f"{stem}.png", FINETUNE_DATASET_DIR/subset/"masks"/f"{stem}.png")
        except StopIteration:
            print(f"ATTENZIONE: Immagine non trovata per la maschera '{stem}.png'. Viene saltata.")
copy_files_for_split(train_stems, "train"); copy_files_for_split(val_stems, "val")
print(f"✅ Immagini intere splittate: {len(train_stems)} train, {len(val_stems)} val.")

# --- [FASE 2] DATASET IBRIDO, MODELLO E TRAINING ---
print("\n--- [FASE 2] Avvio ciclo di fine-tuning IBRIDO ---")
class HybridDataset(Dataset):
    def __init__(self, full_img_dir, full_mask_dir, patch_img_dir, patch_mask_dir, transform):
        self.transform = transform
        self.full_mask_paths = sorted(list(full_mask_dir.glob("*.png")))
        self.patch_mask_paths = sorted(list(patch_mask_dir.glob("*.png")))
        # <<< FIX JPG/PNG: Usa la lista di maschere per trovare dinamicamente le immagini
        self.full_img_paths = [next(full_img_dir.glob(f"{p.stem}.*")) for p in self.full_mask_paths]
        self.patch_img_paths = [next(patch_img_dir.glob(f"{p.stem}.*")) for p in self.patch_mask_paths]
        self.total_size = len(self.full_img_paths) + len(self.patch_img_paths)
        print(f"Dataset ibrido caricato: {len(self.full_img_paths)} immagini intere + {len(self.patch_img_paths)} patch.")
    def __len__(self): return self.total_size
    def __getitem__(self, idx):
        if idx < len(self.full_img_paths): img_path, mask_path = self.full_img_paths[idx], self.full_mask_paths[idx]
        else: patch_idx = idx - len(self.full_img_paths); img_path, mask_path = self.patch_img_paths[patch_idx], self.patch_mask_paths[patch_idx]
        image = cv2.imread(str(img_path), cv2.IMREAD_GRAYSCALE); mask = cv2.imread(str(mask_path), cv2.IMREAD_UNCHANGED)
        augmented = self.transform(image=image, mask=mask); return augmented["image"], augmented["mask"].long()

transform = A.Compose([A.Resize(512, 512), A.HorizontalFlip(), A.VerticalFlip(), A.Normalize(mean=(0.5,), std=(0.5,)), ToTensorV2()])
train_ds = HybridDataset(FINETUNE_DATASET_DIR/"train"/"images", FINETUNE_DATASET_DIR/"train"/"masks", CURATED_PATCHES_DIR/"images", CURATED_PATCHES_DIR/"masks", transform)
val_ds = HybridDataset(FINETUNE_DATASET_DIR/"val"/"images", FINETUNE_DATASET_DIR/"val"/"masks", CURATED_PATCHES_DIR/"images", CURATED_PATCHES_DIR/"masks", transform)
train_loader = DataLoader(train_ds, batch_size=FINETUNE_BATCH_SIZE, shuffle=True, num_workers=2)
val_loader = DataLoader(val_ds, batch_size=FINETUNE_BATCH_SIZE, shuffle=False, num_workers=2)

model = smp.UnetPlusPlus(encoder_name=ENCODER_NAME, in_channels=1, classes=3).to(DEVICE)
model_to_load = FINETUNED_CHECKPOINT_GDRIVE if FINETUNED_CHECKPOINT_GDRIVE.exists() else PRE_TRAINED_CHECKPOINT
print(f"Caricamento pesi da: {model_to_load}"); model.load_state_dict(torch.load(model_to_load, map_location=DEVICE))
for p in model.parameters(): p.requires_grad = True
print("✅ Modello sbloccato."); loss_fn = smp.losses.DiceLoss(mode='multiclass', from_logits=True); optimizer = torch.optim.Adam(model.parameters(), lr=FINETUNE_LR)
scaler = torch.cuda.amp.GradScaler(enabled=(DEVICE=="cuda")); best_val_loss = float('inf')

for epoch in range(FINETUNE_EPOCHS):
    model.train()
    for images, masks in tqdm(train_loader, desc=f"Epoch {epoch+1} Train"):
        images, masks = images.to(DEVICE), masks.to(DEVICE); optimizer.zero_grad(set_to_none=True)
        with torch.cuda.amp.autocast(enabled=(DEVICE=="cuda")): outputs = model(images); loss = loss_fn(outputs, masks)
        scaler.scale(loss).backward(); scaler.step(optimizer); scaler.update()
    model.eval(); val_loss = 0.0
    with torch.no_grad():
        for images, masks in tqdm(val_loader, desc=f"Epoch {epoch+1} Val"):
            images, masks = images.to(DEVICE), masks.to(DEVICE)
            with torch.cuda.amp.autocast(enabled=(DEVICE=="cuda")): val_loss += loss_fn(model(images), masks).item()
    avg_val_loss = val_loss / max(1, len(val_loader)); print(f"Fine-Tuning Epoch {epoch+1}: Val Loss: {avg_val_loss:.4f}")
    if avg_val_loss < best_val_loss:
        print(f"🏆 Salvo (Val Loss: {avg_val_loss:.4f})..."); best_val_loss = avg_val_loss; torch.save(model.state_dict(), FINETUNED_CHECKPOINT_GDRIVE); torch.save(model.state_dict(), FINETUNED_CHECKPOINT_LOCAL)

print("\n🎉 Fine-Tuning Ibrido Terminato.")

In [None]:
# =============================================================================
# CELLA 7: ISPEZIONE VISIVA COMPARATIVA (AUTOCONTENUTA E CORRETTA)
# =============================================================================
import matplotlib.pyplot as plt
import torch
import cv2
import numpy as np
import segmentation_models_pytorch as smp
from pathlib import Path
from torch.utils.data import Dataset
import albumentations as A
from albumentations.pytorch import ToTensorV2


print("--- [FASE 3] Ispezione Visiva Comparativa ---")

# --- 1. DEFINIZIONE DELLA CLASSE DATASET (NECESSARIA IN UN RUNTIME PULITO) ---
# Questa è la classe semplice per caricare le immagini INTERE per la valutazione
class CleanDataset(Dataset):
    def __init__(self, subset_dir, transform=None):
        self.img_dir = subset_dir / "images"
        self.mask_dir = subset_dir / "masks"
        self.transform = transform
        self.stems = sorted([p.stem for p in self.mask_dir.glob("*.png")])
    def __len__(self): return len(self.stems)
    def __getitem__(self, idx):
        stem = self.stems[idx]
        try:
            img_path = next((self.img_dir).glob(f"{stem}.*")) # Robusto per JPG/PNG
        except StopIteration:
            raise FileNotFoundError(f"Immagine non trovata per la maschera: {stem}.png in {self.img_dir}")
        mask_path = self.mask_dir / f"{stem}.png"
        image = cv2.imread(str(img_path), cv2.IMREAD_GRAYSCALE)
        mask = cv2.imread(str(mask_path), cv2.IMREAD_UNCHANGED)
        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image, mask = augmented["image"], augmented["mask"]
        return image, mask.long()

# --- 2. CONFIGURAZIONE DEI MODELLI DA CONFRONTARE ---
# Usiamo i percorsi esatti che hai fornito.
MODEL_BEFORE_PATH = "/content/drive/MyDrive/PesiUNETPP/best_model_pre-training.pth"  # O il finetuned precedente, se preferisci
MODEL_AFTER_PATH = "/content/drive/MyDrive/PesiUNETPP/best_model_finetuned.pth"    # Il modello appena addestrato

# Ricrea le variabili necessarie se il runtime è stato riavviato
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
ENCODER_NAME = "efficientnet-b0"
FINETUNE_DATASET_DIR = Path("/content/finetune_full_images_split") # La cartella con le immagini intere splittate
transform = A.Compose([A.Resize(512, 512), A.Normalize(mean=(0.5,), std=(0.5,)), ToTensorV2()])


# --- 3. CARICAMENTO MODELLI ---
model_before = smp.UnetPlusPlus(encoder_name=ENCODER_NAME, in_channels=1, classes=3).to(DEVICE)
model_after = smp.UnetPlusPlus(encoder_name=ENCODER_NAME, in_channels=1, classes=3).to(DEVICE)
try:
    model_before.load_state_dict(torch.load(MODEL_BEFORE_PATH, map_location=DEVICE))
    model_after.load_state_dict(torch.load(MODEL_AFTER_PATH, map_location=DEVICE))
    model_before.eval(); model_after.eval()
    print("✅ Entrambi i modelli caricati con successo.")
except Exception as e:
    print(f"❌ ERRORE NEL CARICAMENTO DI UN MODELLO: {e}")
    raise e

# --- 4. CICLO DI VISUALIZZAZIONE ---
val_dataset_full = CleanDataset(FINETUNE_DATASET_DIR / "val", transform=transform)
print(f"Visualizzazione di {len(val_dataset_full)} campioni dal set di validazione...")

with torch.no_grad():
    for i in range(len(val_dataset_full)):
        image_tensor, mask_gt_tensor = val_dataset_full[i]
        input_tensor = image_tensor.unsqueeze(0).to(DEVICE)

        # Inferenza con entrambi i modelli
        pred_before = torch.argmax(model_before(input_tensor).squeeze(), dim=0).cpu().numpy()
        pred_after = torch.argmax(model_after(input_tensor).squeeze(), dim=0).cpu().numpy()

        image_vis = image_tensor.permute(1, 2, 0).numpy() * 0.5 + 0.5
        stem = val_dataset_full.stems[i]

        fig, axes = plt.subplots(1, 4, figsize=(28, 7))
        fig.suptitle(f'Confronto per: {stem}', fontsize=16)
        axes[0].imshow(image_vis, cmap='gray'); axes[0].set_title("Immagine Originale")
        axes[1].imshow(mask_gt_tensor, cmap='jet', vmin=0, vmax=2); axes[1].set_title("Ground Truth")
        axes[2].imshow(pred_before, cmap='jet', vmin=0, vmax=2); axes[2].set_title("PRE-IBRIDO")
        axes[3].imshow(pred_after, cmap='jet', vmin=0, vmax=2); axes[3].set_title("POST-IBRIDO (2 epoche)")
        for ax in axes: ax.axis('off')
        plt.show()

In [None]:
# =============================================================================
# CELLA DI CHECK:10 INFERENZA + METRICHE + VISUALIZZAZIONE (10 sample random)
# =============================================================================
import random, torch, numpy as np, cv2, matplotlib.pyplot as plt
import segmentation_models_pytorch as smp
from pathlib import Path

# --- CONFIGURAZIONE ---
SAMPLE_N      = 10
IMG_DIR       = FINETUNE_DATASET_DIR / "val" / "images"   # Cambia se vuoi
MASK_DIR      = FINETUNE_DATASET_DIR / "val" / "masks"    # idem; può non esistere
DEVICE        = "cuda" if torch.cuda.is_available() else "cpu"
CLASSES       = 3                                         # == model output channels

# --- TRANSFORM SOLO RESIZE+NORM (niente flip) ---
import albumentations as A
from albumentations.pytorch import ToTensorV2
val_tf = A.Compose([
    A.Resize(512, 512, interpolation=cv2.INTER_LINEAR),
    A.Normalize(mean=(0.5,), std=(0.5,)),
    ToTensorV2(),
])

# --- CAMPIONA FILE ---
all_images = sorted(list(IMG_DIR.glob("*.*")))
sampled    = random.sample(all_images, min(SAMPLE_N, len(all_images)))

# --- FUNZIONI METRICHE (smp.metrics) ---
def _metrics(pred, gt):
    # pred, gt: torch tensors [H,W] long
    tp, fp, fn, tn = smp.metrics.get_stats(pred.unsqueeze(0), gt.unsqueeze(0), mode='multiclass', num_classes=CLASSES)
    iou  = smp.metrics.iou_score(tp, fp, fn, tn, reduction="micro-imagewise")
    dice = smp.metrics.f1_score(tp, fp, fn, tn, reduction="micro-imagewise")
    return iou.item(), dice.item()

# --- MODELL0 ---
model = smp.UnetPlusPlus(encoder_name="efficientnet-b0", in_channels=1, classes=CLASSES)
model.load_state_dict(torch.load(FINETUNED_CHECKPOINT_GDRIVE, map_location=DEVICE))
model.eval().to(DEVICE)

ious, dices = [], []

# --- VISUAL SETUP ---
ncols = 3
nrows = len(sampled)          # NON la divisione per 3
plt.figure(figsize=(4*ncols, 4*nrows))

for idx, img_path in enumerate(sampled):
    stem = img_path.stem
    mask_path = MASK_DIR / f"{stem}.png"

    # -- LOAD & TF --
    img = cv2.imread(str(img_path), cv2.IMREAD_GRAYSCALE)
    tf_out = val_tf(image=img, mask=np.zeros_like(img))
    tensor_img = tf_out['image'].unsqueeze(0).to(DEVICE)

    # -- INFERENCE --
    with torch.no_grad(), torch.amp.autocast(device_type="cuda", enabled=(DEVICE=="cuda")):
        logits = model(tensor_img)
    pred = logits.argmax(dim=1).squeeze().cpu()        # [H,W] long

    # -- METRICS (se mask disponibile) --
    if mask_path.exists():
        gt = cv2.imread(str(mask_path), cv2.IMREAD_GRAYSCALE)
        gt_tf = val_tf(image=gt, mask=gt)
        gt_t  = torch.as_tensor(gt_tf['mask'], dtype=torch.long)
        iou, dice = _metrics(pred, gt_t)
        ious.append(iou); dices.append(dice)
        metric_str = f'IoU={iou:.3f}  Dice={dice:.3f}'
    else:
        metric_str = 'GT assente'

    # -- PLOT: original | GT | pred
    plt.subplot(nrows, ncols, idx*3 + 1)
    plt.imshow(img, cmap='gray'); plt.axis('off'); plt.title(f'{stem} : img')

    plt.subplot(nrows, ncols, idx*3 + 2)
    if mask_path.exists():
        plt.imshow(gt, cmap='viridis'); plt.axis('off'); plt.title('mask GT')
    else:
        plt.text(0.5, 0.5, 'No GT', ha='center', va='center'); plt.axis('off')

    plt.subplot(nrows, ncols, idx*3 + 3)
    plt.imshow(pred.numpy(), cmap='viridis'); plt.axis('off'); plt.title(metric_str)

plt.tight_layout(); plt.show()

# --- METRICHE AGGREGATE ---
if ious:
    print(f"\n=== METRICHE SU {len(ious)} IMMAGINI CON GT ===")
    print(f"mIoU  : {np.mean(ious):.4f}")
    print(f"mDice : {np.mean(dices):.4f}")
else:
    print("\nNessuna maschera ground-truth trovata: solo visualizzazione.")

In [None]:
# =============================================================================
# CELLA 7: LA PROVA DEFINITIVA - CONFRONTO VISIVO PRE vs POST TRAINING IBRIDO
# =============================================================================
# --- [FASE 0] SETUP COMPLETO E AUTOCONTENUTO ---
print("--- [FASE 0] Setup completo ---")
import torch, cv2, shutil, numpy as np
import segmentation_models_pytorch as smp
from torch.utils.data import Dataset, DataLoader
import matplotlib.pyplot as plt
from pathlib import Path
import albumentations as A
from albumentations.pytorch import ToTensorV2

# --- [FASE 1] CONFIGURAZIONE ---
print("\n--- [FASE 1] Configurazione del confronto ---")
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
ENCODER_NAME = "efficientnet-b0"

# <<< I PERCORSI ESATTI CHE HAI FORNITO TU >>>
MODEL_PRE_FINETUNE_PATH = "/content/drive/MyDrive/PesiUNETPP/best_model_pre-training.pth"
MODEL_POST_FINETUNE_PATH = "/content/drive/MyDrive/PesiUNETPP/best_model_finetuned.pth" # Questo è l'ultimo salvato

# Le immagini INTERE "d'oro" su cui faremo il test
SOURCE_FULL_DATA_DIR = Path("/content/drive/MyDrive/GoldenDataset_Clean")
FINETUNE_DATASET_DIR = Path("/content/finetune_full_images_split") # Stessa cartella temporanea di prima

# --- [FASE 2] DEFINIZIONI NECESSARIE (per essere autocontenuti) ---
# Copiamo qui la classe Dataset e la funzione di split per non avere NameError
class CleanDataset(Dataset):
    def __init__(self, subset_dir, transform=None):
        self.img_dir = subset_dir / "images"; self.mask_dir = subset_dir / "masks"
        self.transform = transform
        self.mask_paths = sorted(list(self.mask_dir.glob("*.png")))
        self.stems = [p.stem for p in self.mask_paths]
        # Trova dinamicamente le immagini .jpg o .png
        self.img_paths = [next(self.img_dir.glob(f"{s}.*")) for s in self.stems]

    def __len__(self): return len(self.stems)
    def __getitem__(self, idx):
        image = cv2.imread(str(self.img_paths[idx]), cv2.IMREAD_GRAYSCALE)
        mask = cv2.imread(str(self.mask_paths[idx]), cv2.IMREAD_UNCHANGED)
        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image, mask = augmented["image"], augmented["mask"]
        return image, mask.long()

def copy_files_for_split(stems, subset, source_dir, target_dir):
    (target_dir/subset/"images").mkdir(parents=True, exist_ok=True); (target_dir/subset/"masks").mkdir(parents=True, exist_ok=True)
    for stem in stems:
        try:
            img_path = next((source_dir / "images").glob(f"{stem}.*"))
            shutil.copy(img_path, target_dir/subset/"images"/img_path.name)
            shutil.copy(source_dir/"masks"/f"{stem}.png", target_dir/subset/"masks"/f"{stem}.png")
        except StopIteration:
            print(f"ATTENZIONE: Immagine non trovata per la maschera '{stem}.png'. Viene saltata.")

# --- [FASE 3] PREPARAZIONE DATI E MODELLI ---
print("\n--- [FASE 3] Preparazione dati e caricamento modelli ---")
# Prepara il validation set su cui fare il confronto
all_stems = sorted([p.stem for p in (SOURCE_FULL_DATA_DIR / "masks").glob("*.png")])
_, val_stems = train_test_split(all_stems, test_size=0.2, random_state=42)
copy_files_for_split(val_stems, "val", SOURCE_FULL_DATA_DIR, FINETUNE_DATASET_DIR)
print(f"Creato validation set con {len(val_stems)} immagini.")

transform = A.Compose([A.Resize(512, 512), A.Normalize(mean=(0.5,), std=(0.5,)), ToTensorV2()])
val_dataset_full = CleanDataset(FINETUNE_DATASET_DIR / "val", transform=transform)

# Carica entrambi i modelli
model_before = smp.UnetPlusPlus(encoder_name=ENCODER_NAME, in_channels=1, classes=3).to(DEVICE)
model_after = smp.UnetPlusPlus(encoder_name=ENCODER_NAME, in_channels=1, classes=3).to(DEVICE)
model_before.load_state_dict(torch.load(MODEL_PRE_FINETUNE_PATH, map_location=DEVICE)); model_before.eval()
model_after.load_state_dict(torch.load(MODEL_POST_FINETUNE_PATH, map_location=DEVICE)); model_after.eval()
print("✅ Entrambi i modelli (PRE e POST fine-tuning) caricati.")

# --- [FASE 4] ESECUZIONE DEL CONFRONTO VISIVO ---
print("\n--- [FASE 4] Generazione del confronto visivo ---")
with torch.no_grad():
    for i in range(len(val_dataset_full)):
        image_tensor, mask_gt_tensor = val_dataset_full[i]
        input_tensor = image_tensor.unsqueeze(0).to(DEVICE)

        output_before = model_before(input_tensor); pred_before = torch.argmax(output_before.squeeze(), dim=0).cpu().numpy()
        output_after = model_after(input_tensor); pred_after = torch.argmax(output_after.squeeze(), dim=0).cpu().numpy()

        image_vis = image_tensor.permute(1, 2, 0).numpy() * 0.5 + 0.5; stem = val_dataset_full.stems[i]

        fig, axes = plt.subplots(1, 4, figsize=(28, 7))
        fig.suptitle(f'Confronto per: {stem}', fontsize=16)
        axes[0].imshow(image_vis, cmap='gray'); axes[0].set_title("Immagine Originale")
        axes[1].imshow(mask_gt_tensor, cmap='jet', vmin=0, vmax=2); axes[1].set_title("Ground Truth")
        axes[2].imshow(pred_before, cmap='jet', vmin=0, vmax=2); axes[2].set_title("PRE-TRAINING")
        axes[3].imshow(pred_after, cmap='jet', vmin=0, vmax=2); axes[3].set_title("POST-IBRIDO")
        for ax in axes: ax.axis('off')
        plt.show()

# =============================================================================
# CELLA A: SINCRONIZZAZIONE AUTOMATICA DA CVAT
# =============================================================================
import os
import zipfile
import shutil
from pathlib import Path

print("--- Sincronizzazione Annotazioni da CVAT ---")

# --- 1. CONFIGURAZIONE ---
CVAT_TASK_ID = "YOUR_TASK_ID"  # <<< METTI QUI L'ID NUMERICO DEL TUO TASK CVAT
ANNOTATIONS_ZIP_PATH = Path(f"/content/cvat_annotations_task_{CVAT_TASK_ID}.zip")
EXPORT_FORMAT = "Segmentation Mask 1.1"

# La cartella di GDrive dove le maschere corrette verranno copiate
GOLDEN_MASK_DIR_GDRIVE = Path("/content/drive/MyDrive/GoldenDataset/masks")
GOLDEN_MASK_DIR_GDRIVE.mkdir(parents=True, exist_ok=True)

# --- 2. INSTALLA E CONFIGURA CVAT-CLI (se non già fatto) ---
# !pip install -q cvat-cli
# Se è la prima volta, esegui questa riga e inserisci le credenziali:
# !cvat-cli core --auth

# --- 3. ESPORTA E SCARICA LE ANNOTAZIONI ---
print(f"Esportando e scaricando annotazioni dal task CVAT ID: {CVAT_TASK_ID}...")
# Il comando 'dump' esporta e scarica in un solo colpo
os.system(f'cvat-cli tasks dump --format "{EXPORT_FORMAT}" {CVAT_TASK_ID} "{ANNOTATIONS_ZIP_PATH}"')

if not ANNOTATIONS_ZIP_PATH.exists():
    print("❌ DOWNLOAD FALLITO. Controlla il tuo TASK_ID e le credenziali CVAT.")
else:
    print("✅ Annotazioni scaricate con successo.")

    # --- 4. ESTRAI E COPIA LE MASCHERE IN GDRIVE ---
    temp_extract_dir = Path("/content/temp_cvat_extract")
    if temp_extract_dir.exists(): shutil.rmtree(temp_extract_dir)
    
    with zipfile.ZipFile(ANNOTATIONS_ZIP_PATH, 'r') as zip_ref:
        zip_ref.extractall(temp_extract_dir)

    # Il formato Segmentation Mask 1.1 crea questa sottocartella
    source_masks_dir = temp_extract_dir / "SegmentationClass"
    if source_masks_dir.exists():
        new_masks_copied = 0
        for mask_file in source_masks_dir.glob("*.png"):
            # Copia il file in GDrive
            shutil.copy(mask_file, GOLDEN_MASK_DIR_GDRIVE / mask_file.name)
            new_masks_copied += 1
        print(f"✅ Copiate {new_masks_copied} nuove maschere in '{GOLDEN_MASK_DIR_GDRIVE}'")
    else:
        print("❌ Cartella 'SegmentationClass' non trovata nello ZIP. Controlla il formato di esportazione.")

    # Pulisci i file temporanei
    ANNOTATIONS_ZIP_PATH.unlink()
    shutil.rmtree(temp_extract_dir)