In [None]:
!pip install segmentation-models-pytorch timm

# Addestramento modello

In [None]:
import os
import cv2
import json
import numpy as np
import torch
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torch.utils.data import Dataset, DataLoader
import segmentation_models_pytorch as smp
from tqdm import tqdm
from torch.cuda.amp import GradScaler, autocast
from pycocotools.coco import COCO


# Settings for the initial training run; read by get_train_transforms() and train_model().
CONFIG = {
    "IMG_SIZE": 704,  # side length used for PadIfNeeded / RandomCrop in get_train_transforms()
    "BATCH_SIZE": 4,  # DataLoader batch size
    "EPOCHS": 20,  # number of training epochs; also used as T_max of the cosine LR schedule
    "LR": 3e-4,  # initial AdamW learning rate
    "DEVICE": "cuda" if torch.cuda.is_available() else "cpu",
    "BACKBONE": "timm-efficientnet-b4",  # passed as encoder_name to smp.UnetPlusPlus
    "ENCODER_WEIGHTS": "imagenet",  # passed as encoder_weights to smp.UnetPlusPlus
    "TRAIN_JSON": "/kaggle/input/computervisiondataset/progettoComputerVision/train/train.json",  # COCO annotation file
    "TRAIN_DIR": "/kaggle/input/computervisiondataset/progettoComputerVision/train"  # directory the image file names are joined to
}


class CableTrainDataset(Dataset):
    """Binary cable-segmentation dataset backed by a COCO annotation index.

    Every sample is an RGB image together with a single-channel float mask in
    which all annotated segmentations of that image are filled with 1.0.

    Args:
        coco: COCO index object exposing ``imgs``, ``loadImgs``, ``getAnnIds``
            and ``loadAnns`` (``annToMask`` is only needed for RLE annotations).
        img_dir: Directory that the ``file_name`` of each image is joined to.
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            that returns a mapping with ``'image'`` and ``'mask'`` entries.
    """

    def __init__(self, coco, img_dir, transform=None):
        self.coco = coco
        self.img_dir = img_dir
        # Sorted so that sample order does not depend on dict ordering.
        self.ids = list(sorted(coco.imgs.keys()))
        self.transform = transform

    def __len__(self):
        return len(self.ids)

    def _build_mask(self, img_info, anns):
        """Rasterise all segmentations of one image into a float32 {0, 1} mask."""
        mask = np.zeros((img_info['height'], img_info['width']), dtype=np.float32)
        for ann in anns:
            segmentation = ann.get('segmentation')
            if not segmentation:
                continue
            if isinstance(segmentation, dict):
                # RLE annotation: iterating the dict would yield its keys, so
                # decode it through the COCO helper instead of fillPoly.
                mask[self.coco.annToMask(ann) > 0] = 1.0
                continue
            for seg in segmentation:
                poly = np.array(seg).reshape((-1, 2)).astype(np.int32)
                if poly.size == 0:
                    continue
                cv2.fillPoly(mask, [poly], 1.0)
        return mask

    def __getitem__(self, index):
        """Return ``(image, mask)`` for the sample at ``index``.

        The mask gets a leading channel axis via ``unsqueeze(0)``.

        Raises:
            FileNotFoundError: If the image file cannot be read.
        """
        img_id = self.ids[index]
        img_info = self.coco.loadImgs(img_id)[0]
        path = os.path.join(self.img_dir, img_info['file_name'])

        image = cv2.imread(path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Impossibile leggere l'immagine: {path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        anns = self.coco.loadAnns(self.coco.getAnnIds(imgIds=img_id))
        mask = self._build_mask(img_info, anns)

        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask']

        # No-op for tensors; converts a NumPy mask when no tensor-producing
        # transform was applied, so unsqueeze() below always exists.
        mask = torch.as_tensor(mask)
        return image, mask.unsqueeze(0)

#AUGMENTATION
def get_train_transforms():
    """Build the albumentations pipeline used for the initial training run.

    The pipeline pads/crops to a ``CONFIG['IMG_SIZE']`` square, applies random
    geometric and colour augmentations, then normalises and converts to tensors.
    """
    side = CONFIG['IMG_SIZE']

    # Bring every sample to a fixed square size.
    framing = [
        A.PadIfNeeded(min_height=side, min_width=side, border_mode=cv2.BORDER_CONSTANT, value=0),
        A.RandomCrop(height=side, width=side),
    ]

    # Geometric augmentations.
    geometric = [
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        A.RandomRotate90(p=0.5),
        A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.05, rotate_limit=15, p=0.5),
    ]

    # Colour and lighting augmentations.
    photometric = [
        A.RandomBrightnessContrast(p=0.5),
        A.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1, p=0.3),
    ]

    finishing = [A.Normalize(), ToTensorV2()]

    return A.Compose(framing + geometric + photometric + finishing)

#loop
def train_model():
    """Train a UNet++ segmentation model using the settings in ``CONFIG``.

    Builds the COCO-backed training loader, optimises an equally weighted
    BCE + Dice loss with AdamW under mixed precision, steps a cosine LR
    schedule once per epoch, and writes ``best_cable_model.pth`` whenever the
    mean training loss of an epoch improves on the best value seen so far.
    """
    print(f"--- TRAINING START: {CONFIG['BACKBONE']} ---")

    annotations = COCO(CONFIG['TRAIN_JSON'])
    train_set = CableTrainDataset(annotations, CONFIG['TRAIN_DIR'], transform=get_train_transforms())
    train_loader = DataLoader(
        train_set,
        batch_size=CONFIG['BATCH_SIZE'],
        shuffle=True,
        num_workers=2,
        drop_last=True,
    )

    network = smp.UnetPlusPlus(
        encoder_name=CONFIG['BACKBONE'],
        encoder_weights=CONFIG['ENCODER_WEIGHTS'],
        in_channels=3,
        classes=1,
    )
    network = network.to(CONFIG['DEVICE'])

    # Combined loss: BCE + Dice, both computed on raw logits.
    bce_criterion = torch.nn.BCEWithLogitsLoss()
    dice_criterion = smp.losses.DiceLoss(mode='binary', from_logits=True)

    optimizer = torch.optim.AdamW(network.parameters(), lr=CONFIG['LR'], weight_decay=1e-4)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=CONFIG['EPOCHS'], eta_min=1e-6)
    scaler = GradScaler()

    lowest_loss = float('inf')

    for epoch in range(CONFIG['EPOCHS']):
        network.train()
        running_loss = 0
        progress = tqdm(train_loader, desc=f"Epoch {epoch+1}/{CONFIG['EPOCHS']}")

        for images, targets in progress:
            images = images.to(CONFIG['DEVICE'])
            targets = targets.to(CONFIG['DEVICE'])

            with autocast():
                logits = network(images)
                loss = 0.5 * bce_criterion(logits, targets) + 0.5 * dice_criterion(logits, targets)

            optimizer.zero_grad()
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            batch_loss = loss.item()
            running_loss += batch_loss
            progress.set_postfix(loss=batch_loss)

        scheduler.step()
        avg_loss = running_loss / len(train_loader)
        print(f"Epoch {epoch+1} Avg Loss: {avg_loss:.4f}")

        # Checkpoint only when the epoch's mean training loss improves.
        if avg_loss < lowest_loss:
            lowest_loss = avg_loss
            torch.save(network.state_dict(), "best_cable_model.pth")
            print(">>> Modello Salvato! <<<")

# Entry point: start the training run when this cell/script is executed as __main__.
if __name__ == "__main__":
    train_model()

# Inferenza

In [None]:
import os
import cv2
import json
import torch
import numpy as np
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torch.utils.data import DataLoader, Dataset
import segmentation_models_pytorch as smp
from pycocotools import mask as coco_mask
from tqdm import tqdm


# Settings for the first inference pass; read by CableTestDataset and make_submission().
CONFIG_INF = {
    "IMG_SIZE": 704,  # minimum side length enforced by PadIfNeeded before the forward pass
    "DEVICE": "cuda" if torch.cuda.is_available() else "cpu",
    "BACKBONE": "timm-efficientnet-b4",  # passed as encoder_name to smp.UnetPlusPlus
    "MODEL_PATH": "/kaggle/input/modello7g/tensorflow2/default/1/best_cable_model7Gennaio.pth",  # checkpoint to load
    "TEST_JSON": "/kaggle/input/computervisiondataset/progettoComputerVision/test/test.json",  # COCO file listing the test images
    "TEST_DIR": "/kaggle/input/computervisiondataset/progettoComputerVision/test",  # directory the image file names are joined to
    "OUTPUT_FILE": "matricola.json",  # JSON file the predictions are written to


    "CONF_THRESHOLD": 0.8,  # probability above which a pixel is kept in the binary mask
    "MIN_AREA": 150,      # components smaller than this (in pixels) are dropped as noise
    "MORPH_KERNEL": 5    # light 5x5 closing kernel used to join nearby pixels
}


def get_line_params(mask_component):
    """Fit a straight line to a mask component and return it in normal form.

    Args:
        mask_component: 2-D array; pixels with a value > 0 belong to the component.

    Returns:
        ``(rho, theta)`` as Python floats with ``rho >= 0`` and ``theta`` in
        ``[0, 2*pi)``, where the line is ``x*cos(theta) + y*sin(theta) = rho``.
        ``(None, None)`` when the component has fewer than 10 pixels.
    """
    y_idxs, x_idxs = np.nonzero(mask_component > 0)
    if len(x_idxs) < 10:
        return None, None
    pts = np.stack((x_idxs, y_idxs), axis=1).astype(np.float32)

    # cv2.fitLine returns 1-element arrays; flatten to plain floats, because
    # calling float() on a non-0-d array is deprecated in recent NumPy.
    vx, vy, x0, y0 = (float(v) for v in np.ravel(cv2.fitLine(pts, cv2.DIST_L2, 0, 0.01, 0.01)))

    # The normal of the line is its direction rotated by 90 degrees.
    theta = np.arctan2(vy, vx) + np.pi / 2
    rho = x0 * np.cos(theta) + y0 * np.sin(theta)

    # Canonical form: non-negative rho, theta wrapped into [0, 2*pi).
    if rho < 0:
        rho, theta = -rho, theta - np.pi
    theta = theta % (2 * np.pi)
    return float(rho), float(theta)

def rle_encode(mask):
    """Encode a binary mask as a COCO RLE dict whose ``counts`` is a UTF-8 string."""
    # The encoder is given a Fortran-ordered uint8 array.
    fortran_mask = mask.astype(np.uint8, order='F')
    rle = coco_mask.encode(fortran_mask)
    # ``counts`` comes back as bytes; decode it so the dict can be dumped as JSON.
    rle['counts'] = rle['counts'].decode('utf-8')
    return rle

class CableTestDataset(Dataset):
    """Test-time dataset: loads each image listed in a COCO index for inference.

    Args:
        coco: COCO index object exposing ``imgs`` and ``loadImgs``.
        img_dir: Directory that the ``file_name`` of each image is joined to.
    """

    def __init__(self, coco, img_dir):
        self.coco = coco
        self.img_dir = img_dir
        self.ids = list(sorted(coco.imgs.keys()))
        # Pad up to IMG_SIZE when smaller, then normalise and convert to a tensor.
        self.transform = A.Compose([
            A.PadIfNeeded(min_height=CONFIG_INF['IMG_SIZE'], min_width=CONFIG_INF['IMG_SIZE'], border_mode=cv2.BORDER_CONSTANT, value=0),
            A.Normalize(),
            ToTensorV2(),
        ])

    def __len__(self):
        return len(self.ids)

    def __getitem__(self, index):
        """Return ``(image_tensor, img_id, orig_h, orig_w)`` for the sample at ``index``.

        ``orig_h`` / ``orig_w`` are the image size before padding.

        Raises:
            FileNotFoundError: If the image file cannot be read.
        """
        img_id = self.ids[index]
        img_info = self.coco.loadImgs(img_id)[0]
        path = os.path.join(self.img_dir, img_info['file_name'])
        image = cv2.imread(path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Impossibile leggere l'immagine: {path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        orig_h, orig_w = image.shape[:2]
        augmented = self.transform(image=image)
        return augmented['image'], img_id, orig_h, orig_w

def make_submission():
    """Run inference on the test set and write instance predictions as JSON.

    For every test image the model is evaluated with horizontal-flip TTA, the
    averaged probability map is thresholded, cropped back to the original image
    size, lightly closed, and split into connected components. Each component
    of at least ``MIN_AREA`` pixels becomes one record (bbox, RLE mask, mean
    score, fitted line) in ``CONFIG_INF['OUTPUT_FILE']``.

    Returns early, after printing the reason, if the checkpoint cannot be loaded.
    """
    device = CONFIG_INF['DEVICE']
    img_size = CONFIG_INF['IMG_SIZE']

    # encoder_weights=None: the checkpoint supplies the weights, so there is no
    # reason to download the ImageNet encoder first.
    model = smp.UnetPlusPlus(encoder_name=CONFIG_INF['BACKBONE'], encoder_weights=None, in_channels=3, classes=1)

    try:
        ckpt = torch.load(CONFIG_INF['MODEL_PATH'], map_location=device)
        if 'state_dict' in ckpt:
            model.load_state_dict(ckpt['state_dict'])
        else:
            # Non-strict loading is kept, but mismatches are reported instead of
            # being silently ignored (they mean part of the model is untrained).
            load_report = model.load_state_dict(ckpt, strict=False)
            if load_report.missing_keys or load_report.unexpected_keys:
                print(f"ATTENZIONE: chiavi mancanti: {len(load_report.missing_keys)}, "
                      f"chiavi inattese: {len(load_report.unexpected_keys)}")
    except Exception as exc:
        print("ERRORE: Impossibile caricare il modello. Verifica il file.")
        print(f"Dettaglio: {type(exc).__name__}: {exc}")
        return

    model.to(device)
    model.eval()

    from pycocotools.coco import COCO
    coco_test = COCO(CONFIG_INF['TEST_JSON'])
    loader = DataLoader(CableTestDataset(coco_test, CONFIG_INF['TEST_DIR']), batch_size=1, shuffle=False, num_workers=2)

    results = []
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (CONFIG_INF['MORPH_KERNEL'], CONFIG_INF['MORPH_KERNEL']))

    with torch.no_grad():
        for img, img_id, h, w in tqdm(loader):
            img = img.to(device)
            img_id, orig_h, orig_w = int(img_id.item()), int(h.item()), int(w.item())

            # TTA: horizontal flip only.
            p1 = torch.sigmoid(model(img))
            p2 = torch.flip(torch.sigmoid(model(torch.flip(img, [3]))), [3])
            pred_mask = ((p1 + p2) / 2.0).squeeze().cpu().numpy()

            mask_bin = (pred_mask > CONFIG_INF['CONF_THRESHOLD']).astype(np.uint8)

            # Undo the padding. Offsets are clamped at 0 because a side that is
            # already >= IMG_SIZE is not padded; a negative offset would slice
            # from the end of the array and corrupt the crop.
            # Assumes PadIfNeeded pads symmetrically (extra pixel bottom/right) — TODO confirm.
            pad_h = max(0, (img_size - orig_h) // 2)
            pad_w = max(0, (img_size - orig_w) // 2)
            mask_bin = np.ascontiguousarray(mask_bin[pad_h:pad_h + orig_h, pad_w:pad_w + orig_w])
            pred_mask = pred_mask[pad_h:pad_h + orig_h, pad_w:pad_w + orig_w]

            # Light morphological closing.
            mask_bin = cv2.morphologyEx(mask_bin, cv2.MORPH_CLOSE, kernel)

            # Instance extraction via connected components (label 0 is background).
            num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(mask_bin, connectivity=8)

            for i in range(1, num_labels):
                area = stats[i, cv2.CC_STAT_AREA]
                if area < CONFIG_INF['MIN_AREA']:
                    continue

                inst_mask = (labels == i).astype(np.uint8)

                # Score = mean predicted probability over the instance pixels.
                vals = pred_mask[inst_mask > 0]
                score = float(np.mean(vals)) if len(vals) > 0 else 0.0

                # Bounding box
                x, y, bw, bh = stats[i, cv2.CC_STAT_LEFT], stats[i, cv2.CC_STAT_TOP], stats[i, cv2.CC_STAT_WIDTH], stats[i, cv2.CC_STAT_HEIGHT]

                # Fitted line
                rho, theta = get_line_params(inst_mask)
                if rho is None:
                    continue

                res = {
                    "image_id": img_id,
                    "category_id": 0,
                    "bbox": [float(x), float(y), float(bw), float(bh)],
                    "segmentation": rle_encode(inst_mask),
                    "score": score,
                    "lines": [rho, theta],
                    "area": float(area),
                    "height": orig_h,
                    "width": orig_w,
                    "id": len(results) + 1
                }
                results.append(res)

    with open(CONFIG_INF['OUTPUT_FILE'], 'w') as f:
        json.dump(results, f)

# Entry point: build the submission file when this cell/script is executed as __main__.
if __name__ == "__main__":
    make_submission()

# Fine Tuning 1

In [None]:
import os
import cv2
import json
import numpy as np
import torch
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torch.utils.data import Dataset, DataLoader
import segmentation_models_pytorch as smp
from tqdm import tqdm
from torch.cuda.amp import GradScaler, autocast
from pycocotools.coco import COCO


# Settings for the first fine-tuning run; read by get_train_transforms() and train_finetune().
CONFIG = {
    # Try a larger input size than the initial run.
    # NOTE(review): 732 is not a multiple of 32; encoder/decoder segmentation models
    # often require that — confirm UnetPlusPlus accepts it (736 is the nearest multiple).
    "IMG_SIZE": 732,
    "BATCH_SIZE": 2,             # kept small, presumably to avoid running out of memory — TODO confirm
    "ACCUMULATE_GRAD": 2,        # gradient-accumulation steps, to simulate a batch size of 4
    "EPOCHS": 15,
    "LR": 5e-5,                  # low, so the existing weights are not destroyed
    "DEVICE": "cuda" if torch.cuda.is_available() else "cpu",
    "BACKBONE": "timm-efficientnet-b4",

    "PRETRAINED_PATH": "/kaggle/input/modello7g/tensorflow2/default/1/best_cable_model.pth",  # checkpoint to fine-tune from
    "TRAIN_JSON": "/kaggle/input/computervisiondataset/progettoComputerVision/train/train.json",
    "TRAIN_DIR": "/kaggle/input/computervisiondataset/progettoComputerVision/train"
}


class CableTrainDataset(Dataset):
    """Binary cable-segmentation dataset backed by a COCO annotation index.

    Every sample is an RGB image together with a single-channel float mask in
    which all annotated segmentations of that image are filled with 1.0.

    Args:
        coco: COCO index object exposing ``imgs``, ``loadImgs``, ``getAnnIds``
            and ``loadAnns`` (``annToMask`` is only needed for RLE annotations).
        img_dir: Directory that the ``file_name`` of each image is joined to.
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            that returns a mapping with ``'image'`` and ``'mask'`` entries.
    """

    def __init__(self, coco, img_dir, transform=None):
        self.coco = coco
        self.img_dir = img_dir
        # Sorted so that sample order does not depend on dict ordering.
        self.ids = list(sorted(coco.imgs.keys()))
        self.transform = transform

    def __len__(self):
        return len(self.ids)

    def _build_mask(self, img_info, anns):
        """Rasterise all segmentations of one image into a float32 {0, 1} mask."""
        mask = np.zeros((img_info['height'], img_info['width']), dtype=np.float32)
        for ann in anns:
            segmentation = ann.get('segmentation')
            if not segmentation:
                continue
            if isinstance(segmentation, dict):
                # RLE annotation: iterating the dict would yield its keys, so
                # decode it through the COCO helper instead of fillPoly.
                mask[self.coco.annToMask(ann) > 0] = 1.0
                continue
            for seg in segmentation:
                poly = np.array(seg).reshape((-1, 2)).astype(np.int32)
                if poly.size == 0:
                    continue
                cv2.fillPoly(mask, [poly], 1.0)
        return mask

    def __getitem__(self, index):
        """Return ``(image, mask)`` for the sample at ``index``.

        The mask gets a leading channel axis via ``unsqueeze(0)``.

        Raises:
            FileNotFoundError: If the image file cannot be read.
        """
        img_id = self.ids[index]
        img_info = self.coco.loadImgs(img_id)[0]
        path = os.path.join(self.img_dir, img_info['file_name'])

        image = cv2.imread(path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Impossibile leggere l'immagine: {path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        anns = self.coco.loadAnns(self.coco.getAnnIds(imgIds=img_id))
        mask = self._build_mask(img_info, anns)

        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask']

        # No-op for tensors; converts a NumPy mask when no tensor-producing
        # transform was applied, so unsqueeze() below always exists.
        mask = torch.as_tensor(mask)
        return image, mask.unsqueeze(0)


#aggiungiamo distorsioni per aiutare il modello a generalizzare le forme curve
def get_train_transforms():
    """Build the albumentations pipeline used for the first fine-tuning run.

    Compared with plain flips/rotations it adds grid and optical distortions,
    then colour jitter, normalisation and tensor conversion.
    """
    side = CONFIG['IMG_SIZE']

    # Bring every sample to a fixed square size.
    framing = [
        A.PadIfNeeded(min_height=side, min_width=side, border_mode=cv2.BORDER_CONSTANT, value=0),
        A.RandomCrop(height=side, width=side),
    ]

    flips = [
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        A.RandomRotate90(p=0.5),
    ]

    # Geometric distortions (meant to simulate badly bent cables).
    distortions = [
        A.GridDistortion(num_steps=5, distort_limit=0.3, p=0.4),
        A.OpticalDistortion(distort_limit=0.1, shift_limit=0.1, p=0.3),
    ]

    photometric = [
        A.RandomBrightnessContrast(p=0.5),
        A.HueSaturationValue(hue_shift_limit=10, sat_shift_limit=20, val_shift_limit=10, p=0.3),
    ]

    finishing = [A.Normalize(), ToTensorV2()]

    return A.Compose(framing + flips + distortions + photometric + finishing)


def train_finetune():
    """Fine-tune a previously trained UNet++ model with gradient accumulation.

    Loads the weights at ``CONFIG['PRETRAINED_PATH']`` and trains with a
    0.7 * Tversky + 0.3 * Focal loss under mixed precision, stepping the
    optimizer every ``CONFIG['ACCUMULATE_GRAD']`` batches. After every epoch the
    current weights are written to ``best_cable_finetuned.pth``; the weights
    with the lowest mean training loss go to ``best_cable_finetuned_best.pth``.

    Returns early, after printing the reason, if the weights cannot be loaded.
    """
    print(f"FINE-TUNING START: {CONFIG['BACKBONE']} @ {CONFIG['IMG_SIZE']}px ---")

    coco = COCO(CONFIG['TRAIN_JSON'])
    ds = CableTrainDataset(coco, CONFIG['TRAIN_DIR'], transform=get_train_transforms())
    loader = DataLoader(ds, batch_size=CONFIG['BATCH_SIZE'], shuffle=True, num_workers=2, drop_last=True)

    # encoder_weights=None: the checkpoint below supplies the weights.
    model = smp.UnetPlusPlus(
        encoder_name=CONFIG['BACKBONE'],
        encoder_weights=None,
        in_channels=3,
        classes=1
    ).to(CONFIG['DEVICE'])

    print("Caricamento pesi pre-addestrati...")
    try:
        checkpoint = torch.load(CONFIG['PRETRAINED_PATH'], map_location=CONFIG['DEVICE'])
        if 'state_dict' in checkpoint:
            model.load_state_dict(checkpoint['state_dict'])
        else:
            model.load_state_dict(checkpoint)
        print("Pesi caricati con successo!")
    except Exception as e:
        print(f"ATTENZIONE: Impossibile caricare i pesi ({e}).")
        return

    loss_tversky = smp.losses.TverskyLoss(mode='binary', alpha=0.7, beta=0.3, log_loss=True)
    loss_focal = smp.losses.FocalLoss(mode='binary', alpha=0.25, gamma=2)

    optimizer = torch.optim.AdamW(model.parameters(), lr=CONFIG['LR'], weight_decay=1e-3)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=CONFIG['EPOCHS'], eta_min=1e-7)
    scaler = GradScaler()

    accum_steps = CONFIG['ACCUMULATE_GRAD']
    n_batches = len(loader)
    best_loss = float('inf')

    for epoch in range(CONFIG['EPOCHS']):
        model.train()
        epoch_loss = 0
        pbar = tqdm(loader, desc=f"Fine-Tune {epoch+1}/{CONFIG['EPOCHS']}")

        optimizer.zero_grad()

        for i, (img, mask) in enumerate(pbar):
            img, mask = img.to(CONFIG['DEVICE']), mask.to(CONFIG['DEVICE'])

            with autocast():
                out = model(img)
                l1 = loss_tversky(out, mask)
                l2 = loss_focal(out, mask)
                # Divide so the accumulated gradient is the mean over the group.
                loss = (0.7 * l1 + 0.3 * l2) / accum_steps

            scaler.scale(loss).backward()

            # Step at the end of each accumulation group, and also on the last
            # batch of the epoch: otherwise the gradients of a trailing
            # incomplete group are wiped by the next epoch's zero_grad().
            if (i + 1) % accum_steps == 0 or (i + 1) == n_batches:
                scaler.step(optimizer)
                scaler.update()
                optimizer.zero_grad()

            # Undo the accumulation scaling so the logged value is the batch loss.
            batch_loss = loss.item() * accum_steps
            epoch_loss += batch_loss
            pbar.set_postfix(loss=batch_loss)

        scheduler.step()
        avg_loss = epoch_loss / len(loader)
        print(f"Epoch {epoch+1} Avg Loss: {avg_loss:.4f}")

        # Always keep the most recent weights; keep a separate copy of the best ones.
        torch.save(model.state_dict(), "best_cable_finetuned.pth")
        if avg_loss < best_loss:
            best_loss = avg_loss
            torch.save(model.state_dict(), "best_cable_finetuned_best.pth")
            print(">>> Best Loss! Model Saved <<<")

# Entry point: start fine-tuning when this cell/script is executed as __main__.
if __name__ == "__main__":
    train_finetune()

# Post Processing

In [None]:
import os
import cv2
import json
import torch
import numpy as np
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torch.utils.data import DataLoader, Dataset
import segmentation_models_pytorch as smp
from pycocotools import mask as coco_mask
from tqdm import tqdm
from skimage.morphology import skeletonize


# Settings for the post-processing inference pass; read by CableTestDataset and run_inference().
CONFIG = {
    # NOTE(review): 732 is not a multiple of 32; images smaller than this are padded
    # to 732 before the forward pass — confirm UnetPlusPlus accepts that size.
    "IMG_SIZE": 732,
    "DEVICE": "cuda" if torch.cuda.is_available() else "cpu",
    "BACKBONE": "timm-efficientnet-b4",
    "MODEL_PATH": "/kaggle/working/best_cable_finetuned_best.pth",  # checkpoint to load
    "TEST_JSON": "/kaggle/input/computervisiondataset/progettoComputerVision/test/test.json",
    "TEST_DIR": "/kaggle/input/computervisiondataset/progettoComputerVision/test",
    "OUTPUT_FILE": "submission_final_tta.json",  # JSON file the predictions are written to


    "CONF_THRESHOLD": 0.8,  # probability above which a pixel is kept in the binary mask
    "MIN_AREA": 100,  # components smaller than this (in pixels) are skipped
    "MORPH_KERNEL_SIZE": 5  # side of the square kernel used for morphological closing
}

class CableTestDataset(Dataset):
    """Test-time dataset: loads each image listed in a COCO index for inference.

    Args:
        coco: COCO index object exposing ``imgs`` and ``loadImgs``.
        img_dir: Directory that the ``file_name`` of each image is joined to.
    """

    def __init__(self, coco, img_dir):
        self.coco = coco
        self.img_dir = img_dir
        self.ids = list(sorted(coco.imgs.keys()))
        # Pad up to IMG_SIZE when smaller, then normalise and convert to a tensor.
        self.transform = A.Compose([
            A.PadIfNeeded(min_height=CONFIG['IMG_SIZE'], min_width=CONFIG['IMG_SIZE'], border_mode=cv2.BORDER_CONSTANT, value=0),
            A.Normalize(),
            ToTensorV2(),
        ])

    def __len__(self):
        return len(self.ids)

    def __getitem__(self, index):
        """Return ``(image_tensor, img_id, orig_h, orig_w)`` for the sample at ``index``.

        ``orig_h`` / ``orig_w`` are the image size before padding.

        Raises:
            FileNotFoundError: If the image file cannot be read.
        """
        img_id = self.ids[index]
        img_info = self.coco.loadImgs(img_id)[0]
        path = os.path.join(self.img_dir, img_info['file_name'])
        image = cv2.imread(path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Impossibile leggere l'immagine: {path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        orig_h, orig_w = image.shape[:2]
        augmented = self.transform(image=image)
        return augmented['image'], img_id, orig_h, orig_w

def rle_encode(mask):
    """Encode a binary mask as a COCO RLE dict whose ``counts`` is a UTF-8 string."""
    # The encoder is given a Fortran-ordered uint8 array.
    fortran_mask = mask.astype(np.uint8, order='F')
    rle = coco_mask.encode(fortran_mask)
    # ``counts`` comes back as bytes; decode it so the dict can be dumped as JSON.
    rle['counts'] = rle['counts'].decode('utf-8')
    return rle

def get_line_params_skeleton(mask_component):
    """Fit a line to the skeleton of a mask component and return it in normal form.

    The line is fitted to the skeleton pixels; if the skeleton has fewer than
    10 pixels, all pixels of the component are used instead.

    Args:
        mask_component: 2-D array; pixels with a value > 0 belong to the component.

    Returns:
        ``(rho, theta)`` as Python floats with ``rho >= 0`` and ``theta`` in
        ``[0, 2*pi)``, where the line is ``x*cos(theta) + y*sin(theta) = rho``.
        ``(None, None)`` when fewer than 10 points are available.
    """
    foreground = mask_component > 0

    # The skeleton is a subset of the component, so a component with fewer than
    # 10 pixels can never provide 10 points: bail out before skeletonising.
    if np.count_nonzero(foreground) < 10:
        return None, None

    y_idxs, x_idxs = np.nonzero(skeletonize(foreground))
    if len(x_idxs) < 10:
        # Fall back to the whole component when the skeleton is too short.
        y_idxs, x_idxs = np.nonzero(foreground)
    pts = np.stack((x_idxs, y_idxs), axis=1).astype(np.float32)

    # cv2.fitLine returns 1-element arrays; flatten to plain floats, because
    # calling float() on a non-0-d array is deprecated in recent NumPy.
    vx, vy, x0, y0 = (float(v) for v in np.ravel(cv2.fitLine(pts, cv2.DIST_L2, 0, 0.01, 0.01)))

    # The normal of the line is its direction rotated by 90 degrees.
    theta = np.arctan2(vy, vx) + np.pi / 2
    rho = x0 * np.cos(theta) + y0 * np.sin(theta)

    # Canonical form: non-negative rho, theta wrapped into [0, 2*pi).
    if rho < 0:
        rho, theta = -rho, theta - np.pi
    theta = theta % (2 * np.pi)
    return float(rho), float(theta)

def run_inference():
    """Run 4-way flip TTA inference on the test set and write predictions as JSON.

    For every test image the sigmoid outputs for the original, horizontally
    flipped, vertically flipped and doubly flipped inputs are averaged. The
    probability map is cropped back to the original image size, thresholded,
    closed morphologically and split into connected components. Each component
    of at least ``MIN_AREA`` pixels becomes one record in
    ``CONFIG['OUTPUT_FILE']``.

    Returns early, after printing the reason, if the checkpoint cannot be loaded.
    """
    print(f"--- INFERENZA HEAVY TTA (Threshold: {CONFIG['CONF_THRESHOLD']}) ---")
    # encoder_weights=None: the checkpoint supplies the weights, so there is no
    # reason to download the ImageNet encoder first.
    model = smp.UnetPlusPlus(encoder_name=CONFIG['BACKBONE'], encoder_weights=None, in_channels=3, classes=1)

    try:
        ckpt = torch.load(CONFIG['MODEL_PATH'], map_location=CONFIG['DEVICE'])
        if 'state_dict' in ckpt:
            model.load_state_dict(ckpt['state_dict'])
        else:
            model.load_state_dict(ckpt)
    except Exception as exc:
        # Report the cause instead of hiding it behind a generic message.
        print("Errore caricamento modello.")
        print(f"Dettaglio: {type(exc).__name__}: {exc}")
        return

    model.to(CONFIG['DEVICE'])
    model.eval()

    from pycocotools.coco import COCO
    coco_test = COCO(CONFIG['TEST_JSON'])
    loader = DataLoader(CableTestDataset(coco_test, CONFIG['TEST_DIR']), batch_size=1, shuffle=False, num_workers=2)
    results = []

    morph_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (CONFIG['MORPH_KERNEL_SIZE'], CONFIG['MORPH_KERNEL_SIZE']))

    with torch.no_grad():
        for img, img_id, h, w in tqdm(loader):
            img = img.to(CONFIG['DEVICE'])
            img_id_val, orig_h, orig_w = int(img_id.item()), int(h.item()), int(w.item())

            # Heavy TTA: each prediction is flipped back before averaging.
            # 1. Original
            p1 = torch.sigmoid(model(img))
            # 2. Horizontal flip
            p2 = torch.flip(torch.sigmoid(model(torch.flip(img, [3]))), [3])
            # 3. Vertical flip
            p3 = torch.flip(torch.sigmoid(model(torch.flip(img, [2]))), [2])
            # 4. Horizontal + vertical flip (180 degree rotation)
            p4 = torch.flip(torch.sigmoid(model(torch.flip(img, [2, 3]))), [2, 3])
            # Mean of the four predictions
            pred_mask = (p1 + p2 + p3 + p4) / 4.0
            pred_mask = pred_mask.squeeze().cpu().numpy()

            # Undo the padding (only applied to sides smaller than IMG_SIZE).
            pad_h = (CONFIG['IMG_SIZE'] - orig_h) // 2
            pad_w = (CONFIG['IMG_SIZE'] - orig_w) // 2
            if pad_h > 0:
                pred_mask = pred_mask[pad_h:pad_h+orig_h, :]
            if pad_w > 0:
                pred_mask = pred_mask[:, pad_w:pad_w+orig_w]

            # Safety net: force the original size if the crop did not yield it.
            if pred_mask.shape != (orig_h, orig_w):
                pred_mask = cv2.resize(pred_mask, (orig_w, orig_h), interpolation=cv2.INTER_CUBIC)

            # Aggressive threshold
            mask_bin = (pred_mask > CONFIG['CONF_THRESHOLD']).astype(np.uint8)

            # Closing, to join broken segments
            mask_bin = cv2.morphologyEx(mask_bin, cv2.MORPH_CLOSE, morph_kernel)

            # Component extraction (label 0 is background)
            num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(mask_bin, connectivity=8)

            for i in range(1, num_labels):
                area = stats[i, cv2.CC_STAT_AREA]
                if area < CONFIG['MIN_AREA']:
                    continue

                inst_mask = (labels == i).astype(np.uint8)

                # Score = mean predicted probability over the instance pixels.
                score = float(np.mean(pred_mask[inst_mask > 0]))

                # Bounding box
                x, y, bw, bh = stats[i, cv2.CC_STAT_LEFT], stats[i, cv2.CC_STAT_TOP], stats[i, cv2.CC_STAT_WIDTH], stats[i, cv2.CC_STAT_HEIGHT]

                # Fitted line
                rho, theta = get_line_params_skeleton(inst_mask)
                if rho is None:
                    continue

                res = {
                    "image_id": img_id_val,
                    "category_id": 0,
                    "bbox": [float(x), float(y), float(bw), float(bh)],
                    "segmentation": rle_encode(inst_mask),
                    "score": score,
                    "lines": [rho, theta],
                    "area": float(area),
                    "height": orig_h,
                    "width": orig_w,
                    "id": len(results) + 1
                }
                results.append(res)

    with open(CONFIG['OUTPUT_FILE'], 'w') as f:
        json.dump(results, f)
    print("Salvataggio completato.")

# Entry point: run inference when this cell/script is executed as __main__.
if __name__ == "__main__":
    run_inference()

# Fine Tuning 2

In [None]:
import os
import cv2
import json
import numpy as np
import torch
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torch.utils.data import Dataset, DataLoader
import segmentation_models_pytorch as smp
from tqdm import tqdm
from torch.cuda.amp import GradScaler, autocast
from pycocotools.coco import COCO

# Settings for the second fine-tuning run; read by get_train_transforms(),
# train_finetune() and create_average_model().
CONFIG = {
    "IMG_SIZE": 704,  # side length used for PadIfNeeded / RandomCrop
    "BATCH_SIZE": 4,  # DataLoader batch size
    "EPOCHS": 15,  # number of epochs; one checkpoint is saved per epoch
    "LR": 5e-5,  # initial AdamW learning rate
    "DEVICE": "cuda" if torch.cuda.is_available() else "cpu",
    "BACKBONE": "timm-efficientnet-b4",

    "PRETRAINED_PATH": "/kaggle/working/best_cable_finetuned_best.pth",  # checkpoint to fine-tune from

    "TRAIN_JSON": "/kaggle/input/computervisiondataset/progettoComputerVision/train/train.json",
    "TRAIN_DIR": "/kaggle/input/computervisiondataset/progettoComputerVision/train",
    "SAVE_DIR": "./finetuned_checkpoints"  # directory receiving the per-epoch checkpoints
}

# Create the checkpoint directory. exist_ok=True makes this idempotent on
# re-runs and avoids the check-then-create race of a separate exists() test.
os.makedirs(CONFIG['SAVE_DIR'], exist_ok=True)

class CableTrainDataset(Dataset):
    """Binary cable-segmentation dataset backed by a COCO annotation index.

    Every sample is an RGB image together with a single-channel float mask in
    which all annotated segmentations of that image are filled with 1.0.

    Args:
        coco: COCO index object exposing ``imgs``, ``loadImgs``, ``getAnnIds``
            and ``loadAnns`` (``annToMask`` is only needed for RLE annotations).
        img_dir: Directory that the ``file_name`` of each image is joined to.
        transform: Optional callable invoked as ``transform(image=..., mask=...)``
            that returns a mapping with ``'image'`` and ``'mask'`` entries.
    """

    def __init__(self, coco, img_dir, transform=None):
        self.coco = coco
        self.img_dir = img_dir
        # Sorted so that sample order does not depend on dict ordering.
        self.ids = list(sorted(coco.imgs.keys()))
        self.transform = transform

    def __len__(self):
        return len(self.ids)

    def _build_mask(self, img_info, anns):
        """Rasterise all segmentations of one image into a float32 {0, 1} mask."""
        mask = np.zeros((img_info['height'], img_info['width']), dtype=np.float32)
        for ann in anns:
            segmentation = ann.get('segmentation')
            if not segmentation:
                continue
            if isinstance(segmentation, dict):
                # RLE annotation: iterating the dict would yield its keys, so
                # decode it through the COCO helper instead of fillPoly.
                mask[self.coco.annToMask(ann) > 0] = 1.0
                continue
            for seg in segmentation:
                poly = np.array(seg).reshape((-1, 2)).astype(np.int32)
                if poly.size == 0:
                    continue
                cv2.fillPoly(mask, [poly], 1.0)
        return mask

    def __getitem__(self, index):
        """Return ``(image, mask)`` for the sample at ``index``.

        The mask gets a leading channel axis via ``unsqueeze(0)``.

        Raises:
            FileNotFoundError: If the image file cannot be read.
        """
        img_id = self.ids[index]
        img_info = self.coco.loadImgs(img_id)[0]
        path = os.path.join(self.img_dir, img_info['file_name'])

        image = cv2.imread(path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Impossibile leggere l'immagine: {path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        anns = self.coco.loadAnns(self.coco.getAnnIds(imgIds=img_id))
        mask = self._build_mask(img_info, anns)

        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask']

        # No-op for tensors; converts a NumPy mask when no tensor-producing
        # transform was applied, so unsqueeze() below always exists.
        mask = torch.as_tensor(mask)
        return image, mask.unsqueeze(0)

#AUGMENTATION
def get_train_transforms():
    """Build the albumentations pipeline used for the second fine-tuning run.

    Pads/crops to a ``CONFIG['IMG_SIZE']`` square, applies flips, grid
    distortion, CLAHE and brightness/contrast jitter, then normalises and
    converts to tensors.
    """
    side = CONFIG['IMG_SIZE']

    # Bring every sample to a fixed square size.
    framing = [
        A.PadIfNeeded(min_height=side, min_width=side, border_mode=cv2.BORDER_CONSTANT, value=0),
        A.RandomCrop(height=side, width=side),
    ]

    # Classic geometric augmentations.
    flips = [
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        A.RandomRotate90(p=0.5),
    ]

    # Distortions.
    distortions = [A.GridDistortion(num_steps=5, distort_limit=0.3, p=0.4)]

    # Local contrast equalisation followed by global brightness/contrast jitter.
    photometric = [
        A.CLAHE(clip_limit=4.0, tile_grid_size=(8, 8), p=0.8),
        A.RandomBrightnessContrast(p=0.5),
    ]

    finishing = [A.Normalize(), ToTensorV2()]

    return A.Compose(framing + flips + distortions + photometric + finishing)

def train_finetune():
    """Fine-tune the model with a Tversky loss, saving one checkpoint per epoch.

    Loads the weights at ``CONFIG['PRETRAINED_PATH']``, trains for
    ``CONFIG['EPOCHS']`` epochs under mixed precision and writes
    ``ft_epoch_<n>.pth`` into ``CONFIG['SAVE_DIR']`` after every epoch.

    Returns early, after printing the reason, if the weights cannot be loaded.
    """
    coco = COCO(CONFIG['TRAIN_JSON'])
    ds = CableTrainDataset(coco, CONFIG['TRAIN_DIR'], transform=get_train_transforms())
    loader = DataLoader(ds, batch_size=CONFIG['BATCH_SIZE'], shuffle=True, num_workers=2, drop_last=True)

    # encoder_weights=None: the checkpoint below supplies the weights, so the
    # default ImageNet download is unnecessary and would need network access.
    model = smp.UnetPlusPlus(
        encoder_name=CONFIG['BACKBONE'],
        encoder_weights=None,
        in_channels=3,
        classes=1
    ).to(CONFIG['DEVICE'])

    print(f"Caricamento pesi da: {CONFIG['PRETRAINED_PATH']}")
    try:
        checkpoint = torch.load(CONFIG['PRETRAINED_PATH'], map_location=CONFIG['DEVICE'])
        if 'state_dict' in checkpoint:
            model.load_state_dict(checkpoint['state_dict'])
        else:
            model.load_state_dict(checkpoint)
    except Exception as e:
        print(f"ERRORE CRITICO: Non riesco a caricare i pesi. {e}")
        return

    # Tversky loss
    loss_fn = smp.losses.TverskyLoss(mode='binary', alpha=0.7, beta=0.3, log_loss=True)
    optimizer = torch.optim.AdamW(model.parameters(), lr=CONFIG['LR'], weight_decay=1e-3)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=CONFIG['EPOCHS'], eta_min=1e-6)
    scaler = GradScaler()

    for epoch in range(CONFIG['EPOCHS']):
        model.train()
        epoch_loss = 0
        pbar = tqdm(loader, desc=f"Fine-Tune {epoch+1}/{CONFIG['EPOCHS']}")

        for img, mask in pbar:
            img, mask = img.to(CONFIG['DEVICE']), mask.to(CONFIG['DEVICE'])

            with autocast():
                out = model(img)
                loss = loss_fn(out, mask)

            optimizer.zero_grad()
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

            epoch_loss += loss.item()
            pbar.set_postfix(loss=loss.item())

        scheduler.step()

        # One checkpoint per epoch; create_average_model() averages several of them.
        save_path = f"{CONFIG['SAVE_DIR']}/ft_epoch_{epoch+1}.pth"
        torch.save(model.state_dict(), save_path)
        print(f"Salvato: {save_path}")

    print("Fine Tuning completato.")

# AVERAGE WEIGHTS
def create_average_model(epochs_to_avg=(11, 12, 13, 14, 15), output_path="best_cable_finetuned_avg.pth"):
    """Average the weights of several per-epoch checkpoints into one state dict.

    Args:
        epochs_to_avg: Epoch numbers whose ``ft_epoch_<n>.pth`` files in
            ``CONFIG['SAVE_DIR']`` are averaged. Defaults to the last five
            epochs of a 15-epoch run.
        output_path: File the averaged state dict is saved to.

    Raises:
        ValueError: If ``epochs_to_avg`` is empty.
        FileNotFoundError: If any of the requested checkpoints is missing.
    """
    print("Creazione modello media (Average Weights)...")
    checkpoints = [f"{CONFIG['SAVE_DIR']}/ft_epoch_{i}.pth" for i in epochs_to_avg]
    if not checkpoints:
        raise ValueError("epochs_to_avg must contain at least one epoch")

    # Fail with the full list of missing files instead of on the first torch.load.
    missing = [path for path in checkpoints if not os.path.exists(path)]
    if missing:
        raise FileNotFoundError(f"Checkpoint mancanti: {missing}")

    # map_location="cpu" keeps averaging independent of the device the
    # checkpoints were saved from.
    avg_state_dict = {
        key: tensor.clone()
        for key, tensor in torch.load(checkpoints[0], map_location="cpu").items()
    }

    for path in checkpoints[1:]:
        state_dict = torch.load(path, map_location="cpu")
        for key in avg_state_dict:
            avg_state_dict[key] += state_dict[key]

    n_checkpoints = len(checkpoints)
    for key in list(avg_state_dict):
        total = avg_state_dict[key]
        if total.is_floating_point():
            avg_state_dict[key] = total / n_checkpoints
        else:
            # Integer entries keep their dtype; true division would silently
            # turn them into float tensors.
            avg_state_dict[key] = torch.div(total, n_checkpoints, rounding_mode='floor')

    torch.save(avg_state_dict, output_path)
    print(f">>> MODELLO FINALE CREATO: {output_path} <<<")

# Entry point: fine-tune, then average the saved per-epoch checkpoints.
# NOTE(review): train_finetune() returns early (without raising) when the weights
# cannot be loaded, in which case create_average_model() still runs — confirm intended.
if __name__ == "__main__":
    train_finetune()
    create_average_model()

# Nuova inferenza

In [None]:
import os
import cv2
import json
import torch
import numpy as np
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torch.utils.data import DataLoader, Dataset
import segmentation_models_pytorch as smp
from pycocotools import mask as coco_mask
from tqdm import tqdm
from skimage.morphology import skeletonize


# Settings for the second inference pass; read by CableTestDataset and run_inference().
CONFIG = {
    "IMG_SIZE": 704,  # minimum side length enforced by PadIfNeeded before the forward pass
    "DEVICE": "cuda" if torch.cuda.is_available() else "cpu",
    "BACKBONE": "timm-efficientnet-b4",

    "MODEL_PATH": "/kaggle/input/bestmodel-avg/tensorflow2/default/1/best_cable_finetuned_avg.pth",  # averaged checkpoint to load

    "TEST_JSON": "/kaggle/input/computervisiondataset/progettoComputerVision/test/test.json",
    "TEST_DIR": "/kaggle/input/computervisiondataset/progettoComputerVision/test",
    "OUTPUT_FILE": "Inferenza2-68.json",  # JSON file the predictions are written to

    "CONF_THRESHOLD": 0.65,   # original note: 0.28 was the best value so far (current setting is 0.65)
    "MIN_AREA": 60,           # lowered to accept smaller cable fragments
    "MORPH_KERNEL_SIZE": 3    # kernel reduced to 3 to avoid merging nearby parallel cables
}

class CableTestDataset(Dataset):
    """Test-time dataset: loads each image listed in a COCO index for inference.

    Args:
        coco: COCO index object exposing ``imgs`` and ``loadImgs``.
        img_dir: Directory that the ``file_name`` of each image is joined to.
    """

    def __init__(self, coco, img_dir):
        self.coco = coco
        self.img_dir = img_dir
        self.ids = list(sorted(coco.imgs.keys()))
        # Pad up to IMG_SIZE when smaller, always apply CLAHE (p=1.0), then
        # normalise and convert to a tensor.
        self.transform = A.Compose([
            A.PadIfNeeded(min_height=CONFIG['IMG_SIZE'], min_width=CONFIG['IMG_SIZE'], border_mode=cv2.BORDER_CONSTANT, value=0),
            A.CLAHE(clip_limit=4.0, tile_grid_size=(8, 8), p=1.0),
            A.Normalize(),
            ToTensorV2(),
        ])

    def __len__(self):
        return len(self.ids)

    def __getitem__(self, index):
        """Return ``(image_tensor, img_id, orig_h, orig_w)`` for the sample at ``index``.

        ``orig_h`` / ``orig_w`` are the image size before padding.

        Raises:
            FileNotFoundError: If the image file cannot be read.
        """
        img_id = self.ids[index]
        img_info = self.coco.loadImgs(img_id)[0]
        path = os.path.join(self.img_dir, img_info['file_name'])
        image = cv2.imread(path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Impossibile leggere l'immagine: {path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        orig_h, orig_w = image.shape[:2]
        augmented = self.transform(image=image)
        return augmented['image'], img_id, orig_h, orig_w


def rle_encode(mask):
    """Encode a binary mask as a COCO RLE dict whose ``counts`` is a UTF-8 string."""
    # The encoder is given a Fortran-ordered uint8 array.
    fortran_mask = mask.astype(np.uint8, order='F')
    rle = coco_mask.encode(fortran_mask)
    # ``counts`` comes back as bytes; decode it so the dict can be dumped as JSON.
    rle['counts'] = rle['counts'].decode('utf-8')
    return rle

def get_line_params_skeleton(mask_component):
    """Fit a line to the skeleton of a mask component and return it in normal form.

    The line is fitted to the skeleton pixels; if the skeleton has fewer than
    5 pixels, all pixels of the component are used instead.

    Args:
        mask_component: 2-D array; pixels with a value > 0 belong to the component.

    Returns:
        ``(rho, theta)`` as Python floats with ``rho >= 0`` and ``theta`` in
        ``[0, 2*pi)``, where the line is ``x*cos(theta) + y*sin(theta) = rho``.
        ``(None, None)`` when fewer than 5 points are available.
    """
    foreground = mask_component > 0

    # The skeleton is a subset of the component, so a component with fewer than
    # 5 pixels can never provide 5 points: bail out before skeletonising.
    if np.count_nonzero(foreground) < 5:
        return None, None

    y_idxs, x_idxs = np.nonzero(skeletonize(foreground))
    if len(x_idxs) < 5:
        # Fall back to the whole component when the skeleton is too short.
        y_idxs, x_idxs = np.nonzero(foreground)
    pts = np.stack((x_idxs, y_idxs), axis=1).astype(np.float32)

    # cv2.fitLine returns 1-element arrays; flatten to plain floats, because
    # calling float() on a non-0-d array is deprecated in recent NumPy.
    vx, vy, x0, y0 = (float(v) for v in np.ravel(cv2.fitLine(pts, cv2.DIST_L2, 0, 0.01, 0.01)))

    # The normal of the line is its direction rotated by 90 degrees.
    theta = np.arctan2(vy, vx) + np.pi / 2
    rho = x0 * np.cos(theta) + y0 * np.sin(theta)

    # Canonical form: non-negative rho, theta wrapped into [0, 2*pi).
    if rho < 0:
        rho, theta = -rho, theta - np.pi
    theta = theta % (2 * np.pi)
    return float(rho), float(theta)

def run_inference():
    """Run flip-TTA inference on the test set and write the results as JSON.

    For each test image the model is evaluated on the image and on its three
    flipped variants; the sigmoid outputs are averaged, the padding is removed,
    the mask is thresholded and closed, and every connected component with at
    least ``CONFIG['MIN_AREA']`` pixels becomes one result entry (bbox, RLE
    mask, mean score, fitted line). Results go to ``CONFIG['OUTPUT_FILE']``.

    Returns:
        None. Returns early, after printing an error, when the checkpoint is
        missing or cannot be loaded.
    """
    print(f"--- INFERENZA DEEP FISHING (Thresh: {CONFIG['CONF_THRESHOLD']}) ---")

    model = smp.UnetPlusPlus(encoder_name=CONFIG['BACKBONE'], in_channels=3, classes=1)

    if not os.path.exists(CONFIG['MODEL_PATH']):
        print("ERRORE: Modello non trovato!")
        return
    try:
        ckpt = torch.load(CONFIG['MODEL_PATH'], map_location=CONFIG['DEVICE'])
        # Accept both a raw state dict and a checkpoint wrapping it.
        if 'state_dict' in ckpt: model.load_state_dict(ckpt['state_dict'])
        else: model.load_state_dict(ckpt)
    except Exception as e:
        # Report why loading failed instead of returning silently.
        print(f"ERRORE: caricamento modello fallito: {e}")
        return

    model.to(CONFIG['DEVICE'])
    model.eval()

    from pycocotools.coco import COCO
    coco_test = COCO(CONFIG['TEST_JSON'])
    loader = DataLoader(CableTestDataset(coco_test, CONFIG['TEST_DIR']), batch_size=1, shuffle=False, num_workers=2)
    results = []

    morph_kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (CONFIG['MORPH_KERNEL_SIZE'], CONFIG['MORPH_KERNEL_SIZE']))

    with torch.no_grad():
        for img, img_id, h, w in tqdm(loader):
            img = img.to(CONFIG['DEVICE'])
            img_id_val, orig_h, orig_w = int(img_id.item()), int(h.item()), int(w.item())

            # Test-time augmentation: identity plus horizontal, vertical and
            # combined flips; each prediction is flipped back before averaging.
            p1 = torch.sigmoid(model(img))
            p2 = torch.flip(torch.sigmoid(model(torch.flip(img, [3]))), [3])
            p3 = torch.flip(torch.sigmoid(model(torch.flip(img, [2]))), [2])
            p4 = torch.flip(torch.sigmoid(model(torch.flip(img, [2, 3]))), [2, 3])
            pred_mask = (p1 + p2 + p3 + p4) / 4.0
            pred_mask = pred_mask.squeeze().cpu().numpy()

            # Remove the padding per axis (assumes the image was centered by
            # PadIfNeeded), then resize only if the shape still differs.
            pad_h = (CONFIG['IMG_SIZE'] - orig_h) // 2
            pad_w = (CONFIG['IMG_SIZE'] - orig_w) // 2
            if pad_h > 0: pred_mask = pred_mask[pad_h:pad_h+orig_h, :]
            if pad_w > 0: pred_mask = pred_mask[:, pad_w:pad_w+orig_w]
            if pred_mask.shape != (orig_h, orig_w):
                pred_mask = cv2.resize(pred_mask, (orig_w, orig_h), interpolation=cv2.INTER_CUBIC)

            # Binarize with the configured confidence threshold.
            mask_bin = (pred_mask > CONFIG['CONF_THRESHOLD']).astype(np.uint8)

            # Light morphological closing with the configured kernel.
            mask_bin = cv2.morphologyEx(mask_bin, cv2.MORPH_CLOSE, morph_kernel)

            num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(mask_bin, connectivity=8)

            # Label 0 is the background, so components start at 1.
            for i in range(1, num_labels):
                area = stats[i, cv2.CC_STAT_AREA]
                if area < CONFIG['MIN_AREA']: continue

                inst_mask = (labels == i).astype(np.uint8)
                # Instance score: mean predicted probability over its pixels.
                score = float(np.mean(pred_mask[inst_mask > 0]))

                # Bounding box as (left, top, width, height).
                x, y, bw, bh = stats[i, cv2.CC_STAT_LEFT], stats[i, cv2.CC_STAT_TOP], stats[i, cv2.CC_STAT_WIDTH], stats[i, cv2.CC_STAT_HEIGHT]

                rho, theta = get_line_params_skeleton(inst_mask)
                if rho is None: continue

                res = {
                    "image_id": img_id_val, "category_id": 0,
                    "bbox": [float(x), float(y), float(bw), float(bh)],
                    "segmentation": rle_encode(inst_mask),
                    "score": score, "lines": [rho, theta],
                    "area": float(area), "height": orig_h, "width": orig_w,
                    "id": len(results) + 1
                }
                results.append(res)

    with open(CONFIG['OUTPUT_FILE'], 'w') as f:
        json.dump(results, f)
    print("Salvataggio completato.")

# Entry point of the TTA inference cell: run the inference when this code is
# executed as the main module.
if __name__ == "__main__":
    run_inference()

# Fine tuning su immagini difficili

In [None]:
import os
import cv2
import json
import torch
import numpy as np
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torch.utils.data import DataLoader, Dataset
import segmentation_models_pytorch as smp
from torch.optim import AdamW
from torch.optim.lr_scheduler import OneCycleLR
from tqdm import tqdm
from pycocotools.coco import COCO
import gc
import random

from torch.amp import autocast, GradScaler

# Settings for the hard-sample fine-tuning cell.
CONFIG = {
    "IMG_SIZE": 704,  # side length used by A.Resize in get_transforms
    "BATCH_SIZE": 2,
    "ACCUMULATION_STEPS": 16,  # batches whose gradients are accumulated before one optimizer step
    "NUM_WORKERS": 2,
    "EPOCHS": 8,
    "LEARNING_RATE": 5e-5,  # AdamW learning rate, also passed as OneCycleLR max_lr
    "DEVICE": "cuda" if torch.cuda.is_available() else "cpu",
    "BACKBONE": "timm-efficientnet-b4",
    "PRETRAINED_PATH": "/kaggle/input/bestmodel/tensorflow2/default/1/best_cable_finetuned_avg.pth",  # starting weights, loaded by train_model when the file exists
    "TRAIN_JSON": "/kaggle/input/computervisiondataset/progettoComputerVision/train/train.json",
    "TRAIN_DIR": "/kaggle/input/computervisiondataset/progettoComputerVision/train",
    "SAVE_PATH": "best_finetuned_HARD.pth",  # where train_model saves the best-validation-IoU weights
    "HARD_AREA_THRESHOLD": 4000  # get_hard_samples marks an image as hard when its total annotation area is below this
}

class CableDataset(Dataset):
    """COCO-backed segmentation dataset yielding ``(image, mask)`` pairs.

    All annotations of an image are merged into a single binary mask. Images
    that are missing, unreadable or fail to load are skipped in favor of the
    next image in the list.
    """

    def __init__(self, coco, img_dir, specific_ids=None, transform=None):
        """
        Args:
            coco: COCO index providing ``loadImgs``, ``getAnnIds``, ``loadAnns``
                and ``annToMask``.
            img_dir: folder containing the image files.
            specific_ids: optional iterable of image ids to restrict the
                dataset to; every id is cast to ``int``. When ``None`` all ids
                of ``coco`` are used, sorted.
            transform: optional callable invoked as
                ``transform(image=..., mask=...)`` returning a dict with
                ``'image'`` and ``'mask'``.
        """
        self.coco = coco
        self.img_dir = img_dir
        if specific_ids is not None:
            self.ids = [int(i) for i in specific_ids]
        else:
            self.ids = list(sorted(coco.imgs.keys()))
        self.transform = transform

    def __len__(self):
        """Number of image ids in the dataset."""
        return len(self.ids)

    def _load_sample(self, img_id):
        """Load one sample; return ``None`` when the image is missing or unreadable."""
        img_list = self.coco.loadImgs(int(img_id))
        if not img_list:
            return None

        img_info = img_list[0]
        path = os.path.join(self.img_dir, img_info['file_name'])

        image = cv2.imread(path)
        if image is None:
            print(f"Warning: Immagine corrotta {path}. Skip.")
            return None

        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        ann_ids = self.coco.getAnnIds(imgIds=img_id)
        anns = self.coco.loadAnns(ann_ids)

        # Union of all instance masks -> one binary foreground mask.
        mask = np.zeros(image.shape[:2], dtype=np.uint8)
        for ann in anns:
            mask = np.maximum(mask, self.coco.annToMask(ann))
        mask = mask.astype(np.float32)

        if self.transform:
            augmented = self.transform(image=image, mask=mask)
            image = augmented['image']
            mask = augmented['mask']

        # Without a tensor-producing transform the mask is still a NumPy
        # array, which has no ``unsqueeze``; convert it first.
        if not torch.is_tensor(mask):
            mask = torch.from_numpy(mask)

        return image, mask.unsqueeze(0)

    def __getitem__(self, index):
        """Return the sample at ``index``, skipping forward over unusable images.

        The scan wraps around the id list and visits each id at most once, so
        it always terminates (the previous recursive retry could recurse
        without bound when every image failed).

        Raises:
            IndexError: if the dataset has no ids.
            RuntimeError: if no image in the dataset could be loaded.
        """
        total = len(self.ids)
        if total == 0:
            raise IndexError("CableDataset is empty")

        last_error = None
        for offset in range(total):
            img_id = self.ids[(index + offset) % total]
            try:
                sample = self._load_sample(img_id)
            except Exception as e:
                print(f"Errore critico ID {img_id}: {e}")
                last_error = e
                continue
            if sample is not None:
                return sample

        raise RuntimeError(
            f"Nessun campione valido trovato a partire dall'indice {index}"
        ) from last_error


def get_transforms(phase):
    """Build the albumentations pipeline for the given phase.

    Args:
        phase: ``'train'`` adds the random flip / rotate / shift-scale
            augmentations; any other value yields the plain pipeline.

    Returns:
        An ``A.Compose`` that resizes to ``CONFIG['IMG_SIZE']`` squared,
        optionally augments, then normalizes and converts to tensors.
    """
    side = CONFIG['IMG_SIZE']
    pipeline = [A.Resize(side, side)]

    if phase == 'train':
        # Geometric augmentations are applied only while training.
        pipeline += [
            A.HorizontalFlip(p=0.5),
            A.VerticalFlip(p=0.5),
            A.RandomRotate90(p=0.5),
            A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.1, rotate_limit=15, p=0.5),
        ]

    pipeline += [A.Normalize(), ToTensorV2()]
    return A.Compose(pipeline)

class CombinedLoss(torch.nn.Module):
    """Sum of a binary Dice loss and a binary Focal loss (gamma=3.0)."""

    def __init__(self):
        super().__init__()
        self.dice = smp.losses.DiceLoss(mode='binary', from_logits=True)
        self.focal = smp.losses.FocalLoss(mode='binary', gamma=3.0)

    def forward(self, y_pred, y_true):
        """Return ``dice(y_pred, y_true) + focal(y_pred, y_true)``."""
        dice_term = self.dice(y_pred, y_true)
        focal_term = self.focal(y_pred, y_true)
        return dice_term + focal_term

def get_hard_samples(coco):
    """Select the image ids used for hard-sample fine-tuning.

    An annotated image is "hard" when the summed ``area`` of its annotations
    is positive and below ``CONFIG['HARD_AREA_THRESHOLD']``; every other
    annotated image is "normal". Images without annotations are ignored.

    Args:
        coco: COCO index providing ``imgs``, ``getAnnIds`` and ``loadAnns``.

    Returns:
        list: all hard ids plus a seeded random sample of normal ids (at most
        25% of the number of hard ids), shuffled.
    """
    hard_ids, normal_ids = [], []

    print("Analisi dataset per Hard Mining...")
    for img_id in tqdm(sorted(coco.imgs.keys())):
        anns = coco.loadAnns(coco.getAnnIds(imgIds=img_id))
        if not anns:
            continue

        total_area = sum(ann['area'] for ann in anns)
        is_hard = 0 < total_area < CONFIG['HARD_AREA_THRESHOLD']
        bucket = hard_ids if is_hard else normal_ids
        bucket.append(img_id)

    print(f"Trovati {len(hard_ids)} HARD samples.")

    # Fixed seed so the selection is reproducible across runs.
    rng = np.random.default_rng(seed=42)
    # Add at most 25% (relative to the hard count) of normal images for balance.
    normal_quota = int(len(hard_ids) * 0.25)
    num_normal = min(len(normal_ids), normal_quota)
    selected_normal = list(rng.choice(normal_ids, size=num_normal, replace=False))

    final_ids = hard_ids + selected_normal
    rng.shuffle(final_ids)

    print(f"Training Set Finale: {len(final_ids)} immagini.")
    return final_ids

def train_model():
    """Fine-tune the segmentation model on the hard-sample subset.

    Builds the training set with ``get_hard_samples``, validates on up to 50
    images that were not selected for training, trains with mixed precision
    and gradient accumulation, and saves the weights with the best validation
    IoU to ``CONFIG['SAVE_PATH']``.

    Returns:
        None. Returns early, after printing an error, when the training loader
        yields no batches.
    """
    gc.collect()
    torch.cuda.empty_cache()

    print(f"--- TRAINING HARD MINING V3 (FINAL FIX) ---")

    coco = COCO(CONFIG['TRAIN_JSON'])
    train_ids = get_hard_samples(coco)

    # Validation: up to 50 images that are not part of the training selection.
    all_ids_set = set(coco.imgs.keys())
    train_ids_set = set(train_ids)
    remaining = list(all_ids_set - train_ids_set)
    val_ids = remaining[:50]

    train_ds = CableDataset(coco, CONFIG['TRAIN_DIR'], specific_ids=train_ids, transform=get_transforms('train'))
    val_ds = CableDataset(coco, CONFIG['TRAIN_DIR'], specific_ids=val_ids, transform=get_transforms('val'))

    train_loader = DataLoader(train_ds, batch_size=CONFIG['BATCH_SIZE'], shuffle=True, num_workers=CONFIG['NUM_WORKERS'], pin_memory=True, drop_last=True)
    val_loader = DataLoader(val_ds, batch_size=CONFIG['BATCH_SIZE'], shuffle=False, num_workers=CONFIG['NUM_WORKERS'], pin_memory=True)

    accum_steps = CONFIG['ACCUMULATION_STEPS']
    batches_per_epoch = len(train_loader)
    if batches_per_epoch == 0:
        print("ERRORE: nessun batch di training disponibile.")
        return
    # Ceil division: the last, possibly shorter, accumulation group of every
    # epoch also triggers an optimizer step (see the loop below), so the
    # scheduler must be sized for it. Flooring gave 0 steps for short loaders.
    updates_per_epoch = (batches_per_epoch + accum_steps - 1) // accum_steps

    model = smp.UnetPlusPlus(encoder_name=CONFIG['BACKBONE'], in_channels=3, classes=1)

    if os.path.exists(CONFIG['PRETRAINED_PATH']):
        print(f"Caricamento pesi base: {CONFIG['PRETRAINED_PATH']}")
        ckpt = torch.load(CONFIG['PRETRAINED_PATH'], map_location='cpu')
        # Accept both a raw state dict and a checkpoint wrapping it.
        if 'state_dict' in ckpt: model.load_state_dict(ckpt['state_dict'])
        else: model.load_state_dict(ckpt)
    else:
        # Make it visible that fine-tuning does not start from the base weights.
        print(f"Warning: pesi base non trovati: {CONFIG['PRETRAINED_PATH']}")

    model.to(CONFIG['DEVICE'])

    loss_fn = CombinedLoss()
    optimizer = AdamW(model.parameters(), lr=CONFIG['LEARNING_RATE'], weight_decay=1e-2)

    scaler = GradScaler('cuda')

    scheduler = OneCycleLR(optimizer, max_lr=CONFIG['LEARNING_RATE'],
                           steps_per_epoch=updates_per_epoch,
                           epochs=CONFIG['EPOCHS'])

    best_iou = 0.0

    for epoch in range(CONFIG['EPOCHS']):
        model.train()
        train_loss = 0
        optimizer.zero_grad()

        pbar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{CONFIG['EPOCHS']}")

        for i, (imgs, masks) in enumerate(pbar):
            imgs = imgs.to(CONFIG['DEVICE'], non_blocking=True)
            masks = masks.to(CONFIG['DEVICE'], non_blocking=True)

            with autocast('cuda'):
                preds = model(imgs)
                loss = loss_fn(preds, masks)
                # Scale so the accumulated gradient averages over the group.
                loss = loss / accum_steps

            scaler.scale(loss).backward()

            # Step every ``accum_steps`` batches and also on the final batch,
            # so gradients of a trailing partial group are applied instead of
            # being zeroed at the start of the next epoch.
            is_last_batch = (i + 1) == batches_per_epoch
            if (i + 1) % accum_steps == 0 or is_last_batch:
                scaler.step(optimizer)
                scaler.update()
                optimizer.zero_grad()
                scheduler.step()

            # Undo the accumulation scaling for reporting.
            batch_loss = loss.item() * accum_steps
            train_loss += batch_loss
            pbar.set_postfix(loss=batch_loss)

        print(f"Stats: Train Loss={train_loss / batches_per_epoch:.4f}")

        model.eval()
        tp, fp, fn = 0, 0, 0
        torch.cuda.empty_cache()

        with torch.no_grad():
            for imgs, masks in val_loader:
                imgs = imgs.to(CONFIG['DEVICE'], non_blocking=True)
                masks = masks.to(CONFIG['DEVICE'], non_blocking=True)
                with autocast('cuda'):
                    preds = model(imgs)
                # Logit > 0 is equivalent to sigmoid probability > 0.5.
                pred_mask = (preds > 0).float()
                tp += (pred_mask * masks).sum().item()
                fp += (pred_mask * (1 - masks)).sum().item()
                fn += ((1 - pred_mask) * masks).sum().item()

        # Pixel-level IoU accumulated over the whole validation set.
        iou_score = tp / (tp + fp + fn + 1e-7)
        print(f"Stats: Val IoU={iou_score:.4f}")

        if iou_score > best_iou:
            best_iou = iou_score
            torch.save(model.state_dict(), CONFIG['SAVE_PATH'])
            print(f" BEST HARD-MINED MODEL SAVED! (IoU: {best_iou:.4f})")

    print("Fine Training.")

# Entry point of the fine-tuning cell: start the training when this code is
# executed as the main module.
if __name__ == "__main__":
    train_model()

# Inferenza Finale

In [None]:
import os
import cv2
import json
import torch
import numpy as np
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torch.utils.data import DataLoader, Dataset
import segmentation_models_pytorch as smp
from pycocotools import mask as coco_mask
from tqdm import tqdm
from skimage.morphology import skeletonize
from pycocotools.coco import COCO


# Settings for the final (no-TTA) inference cell.
CONFIG = {
    "IMG_SIZE": 704,  # minimum height/width the test images are padded to
    "DEVICE": "cuda" if torch.cuda.is_available() else "cpu",
    "BACKBONE": "timm-efficientnet-b4",

    "MODEL_PATH": "/kaggle/input/bestmodel-avg/tensorflow2/default/1/best_cable_finetuned_avg.pth",

    "TEST_JSON": "/kaggle/input/computervisiondataset/progettoComputerVision/test/test.json",
    "TEST_DIR": "/kaggle/input/computervisiondataset/progettoComputerVision/test",
    "OUTPUT_FILE": "269290-269170.json",  # JSON file the predictions are written to


    "CONF_THRESHOLD": 0.9,  # probability above which a pixel is foreground
    "MIN_AREA": 100,  # connected components with fewer pixels are dropped
    "MIN_ASPECT_RATIO": 2.0  # NOTE(review): not read by any code visible in this cell
}


def rle_encode(mask):
    """Return the COCO RLE of ``mask`` with ``counts`` as text, ready for JSON.

    The mask is cast to ``uint8`` and laid out in Fortran order before being
    handed to ``coco_mask.encode``; ``counts`` is decoded only if it is bytes.
    """
    rle = coco_mask.encode(np.asfortranarray(mask.astype(np.uint8)))
    counts = rle['counts']
    if isinstance(counts, bytes):
        rle['counts'] = counts.decode('utf-8')
    return rle

def get_line_params_skeleton(mask_component):
    """Fit a straight line to a mask component and return it as ``(rho, theta)``.

    The line is fitted to the skeleton pixels of the component; when the
    skeleton has fewer than 5 pixels, all foreground pixels are used instead.

    Args:
        mask_component: 2-D array whose positive entries mark the component.

    Returns:
        tuple: ``(rho, theta)`` as Python floats, with ``theta`` in
        ``[0, 2*pi)`` and ``rho`` signed (it is not forced to be positive),
        or ``(None, None)`` when fewer than 5 points are available.
    """
    skel = skeletonize(mask_component > 0).astype(np.uint8)
    y_idxs, x_idxs = np.where(skel > 0)
    pts = np.stack((x_idxs, y_idxs), axis=1).astype(np.float32)

    if len(pts) < 5:
        # Skeleton too small: fall back to every foreground pixel.
        y_idxs, x_idxs = np.where(mask_component > 0)
        pts = np.stack((x_idxs, y_idxs), axis=1).astype(np.float32)

    if len(pts) < 5:
        return None, None

    # Flatten the fitLine result to four plain floats up front; calling
    # float() later on the 1-element arrays is a deprecated NumPy conversion
    # (it produced the warning recorded in this cell's output).
    fitted = cv2.fitLine(pts, cv2.DIST_L2, 0, 0.01, 0.01)
    vx, vy, x0, y0 = (float(v) for v in np.asarray(fitted).ravel())

    # theta is the direction of the line's normal (line direction + 90 degrees),
    # rho the signed distance of the line from the origin along that normal.
    theta = np.arctan2(vy, vx) + np.pi / 2
    rho = x0 * np.cos(theta) + y0 * np.sin(theta)

    return float(rho), float(theta % (2 * np.pi))

class SimpleTestDataset(Dataset):
    """Test-time dataset: one padded, normalized image per COCO image id.

    Each item is ``(image, img_id, orig_h, orig_w)``. The original height and
    width are returned alongside the transformed image so the caller can undo
    the padding on the predicted mask.
    """

    def __init__(self, coco, img_dir):
        """Store the COCO index and image folder and build the fixed test transform."""
        self.coco = coco
        self.img_dir = img_dir
        self.ids = list(sorted(coco.imgs.keys()))

        self.transform = A.Compose([
            A.PadIfNeeded(min_height=CONFIG['IMG_SIZE'], min_width=CONFIG['IMG_SIZE'], border_mode=cv2.BORDER_CONSTANT, value=0),
            A.Normalize(),
            ToTensorV2(),
        ])

    def __len__(self):
        """Number of images listed in the COCO index."""
        return len(self.ids)

    def __getitem__(self, index):
        """Load image ``index``, convert it to RGB and apply the test transform.

        Raises:
            FileNotFoundError: if the image file is missing or cannot be decoded.
        """
        img_id = self.ids[index]
        info = self.coco.loadImgs(img_id)[0]
        path = os.path.join(self.img_dir, info['file_name'])

        image = cv2.imread(path)
        if image is None:
            # cv2.imread signals failure by returning None; fail here with the
            # offending path instead of with an opaque cvtColor assertion.
            raise FileNotFoundError(f"Immagine non leggibile: {path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        orig_h, orig_w = image.shape[:2]

        augmented = self.transform(image=image)
        return augmented['image'], img_id, orig_h, orig_w

def run_pure_inference():
    """Run single-pass (no TTA) inference on the test set and write the results as JSON.

    For each test image the sigmoid of the model output is un-padded and
    thresholded with ``CONFIG['CONF_THRESHOLD']``; every connected component
    with at least ``CONFIG['MIN_AREA']`` pixels becomes one result entry
    (bbox, RLE mask, mean score, fitted line). Results go to
    ``CONFIG['OUTPUT_FILE']``.

    Returns:
        None. Returns early, after printing the error, when the checkpoint
        cannot be loaded.
    """
    model = smp.UnetPlusPlus(encoder_name=CONFIG['BACKBONE'], in_channels=3, classes=1)
    try:
        ckpt = torch.load(CONFIG['MODEL_PATH'], map_location=CONFIG['DEVICE'])
        # Accept both a raw state dict and a checkpoint wrapping it.
        if 'state_dict' in ckpt: model.load_state_dict(ckpt['state_dict'])
        else: model.load_state_dict(ckpt)
        print("Modello caricato correttamente.")
    except Exception as e:
        print(f"Errore caricamento modello: {e}")
        return

    model.to(CONFIG['DEVICE'])
    model.eval()

    coco_test = COCO(CONFIG['TEST_JSON'])
    ds = SimpleTestDataset(coco_test, CONFIG['TEST_DIR'])
    loader = DataLoader(ds, batch_size=1, shuffle=False, num_workers=2)

    results = []

    with torch.no_grad():
        for img, img_id, h, w in tqdm(loader):
            img = img.to(CONFIG['DEVICE'])
            img_id_val = int(img_id.item())
            orig_h, orig_w = int(h.item()), int(w.item())

            logits = model(img)
            pred_mask = torch.sigmoid(logits).squeeze().cpu().numpy()

            # Remove the padding with a central crop, one axis at a time
            # (assumes PadIfNeeded centered the image, as the original crop
            # did). An axis that was already >= IMG_SIZE received no padding,
            # so its offset is clamped to 0. Previously a single oversized
            # axis sent the whole, still padded, mask through cv2.resize and
            # distorted it.
            ph = max((CONFIG['IMG_SIZE'] - orig_h) // 2, 0)
            pw = max((CONFIG['IMG_SIZE'] - orig_w) // 2, 0)
            pred_mask = pred_mask[ph:ph+orig_h, pw:pw+orig_w]
            if pred_mask.shape != (orig_h, orig_w):
                # Safety net for any remaining shape mismatch.
                pred_mask = cv2.resize(pred_mask, (orig_w, orig_h))

            mask_bin = (pred_mask > CONFIG['CONF_THRESHOLD']).astype(np.uint8)

            num_labels, labels, stats, _ = cv2.connectedComponentsWithStats(mask_bin, connectivity=8)

            # Label 0 is the background, so components start at 1.
            for i in range(1, num_labels):
                area = stats[i, cv2.CC_STAT_AREA]
                # Basic size filter.
                if area < CONFIG['MIN_AREA']: continue
                # Instance mask and score (mean probability over its pixels).
                inst_mask = (labels == i).astype(np.uint8)
                score = float(np.mean(pred_mask[inst_mask > 0]))
                # Bounding box as (left, top, width, height).
                x, y, bw, bh = stats[i, cv2.CC_STAT_LEFT], stats[i, cv2.CC_STAT_TOP], stats[i, cv2.CC_STAT_WIDTH], stats[i, cv2.CC_STAT_HEIGHT]
                # Line geometry; components too small for a fit are skipped.
                rho, theta = get_line_params_skeleton(inst_mask)
                if rho is None: continue
                # One result entry per component.
                res = {
                    "image_id": img_id_val,
                    "category_id": 0,
                    "bbox": [float(x), float(y), float(bw), float(bh)],
                    "segmentation": rle_encode(inst_mask),
                    "score": score,
                    "lines": [rho, theta],
                    "area": float(area),
                    "id": len(results) + 1
                }
                results.append(res)

    # Write all predictions to the output file.
    with open(CONFIG['OUTPUT_FILE'], 'w') as f:
        json.dump(results, f)
    print(f"File salvato: {CONFIG['OUTPUT_FILE']}")

# Entry point of the final inference cell: run the inference when this code is
# executed as the main module.
if __name__ == "__main__":
    run_pure_inference()

--- AVVIO INFERENZA SENZA TTA (PURE) ---


  A.PadIfNeeded(min_height=CONFIG['IMG_SIZE'], min_width=CONFIG['IMG_SIZE'], border_mode=cv2.BORDER_CONSTANT, value=0),


Modello caricato correttamente.
loading annotations into memory...
Done (t=0.06s)
creating index...
index created!


  return float(rho), float(theta % (2 * np.pi))
100%|██████████| 400/400 [00:42<00:00,  9.47it/s]

File salvato: 269290-269170.json





# Calcolo Score

In [None]:
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
from pathlib import Path
from collections import defaultdict
from tqdm import tqdm
from pycocotools import mask as coco_mask
import json
import numpy as np
import cv2

def evaluate_segmentation(gt_json_path, pred_json_path, check_cable_class=False):
    """Run the COCO ``'segm'`` evaluation of a prediction file against the ground truth.

    Args:
        gt_json_path: path of the COCO ground-truth annotation file.
        pred_json_path: path of the JSON file holding the predictions.
        check_cable_class: when true, restrict the evaluation to category id 0.

    Returns:
        tuple: ``(stats[1], stats[7])`` of the COCOeval summary. In the run
        recorded in this notebook the second value matches the printed
        "AR @[IoU=0.50:0.95 | maxDets=10]" line, even though the caller names
        it ``avg_r50``.
    """
    ground_truth = COCO(gt_json_path)

    # Work around the KeyError on 'info': provide a placeholder entry when the
    # ground-truth file does not define one.
    ground_truth.dataset.setdefault(
        'info',
        {'description': 'Dataset patchato per pycocotools', 'version': '1.0', 'year': 2024},
    )

    with open(pred_json_path, 'r') as pred_file:
        detections = json.load(pred_file)

    # Wrap the predictions in a COCO results object and evaluate the masks.
    evaluator = COCOeval(ground_truth, ground_truth.loadRes(detections), 'segm')
    if check_cable_class:
        evaluator.params.catIds = [0]  # id of the cable class

    for stage in (evaluator.evaluate, evaluator.accumulate, evaluator.summarize):
        stage()

    summary = evaluator.stats
    return summary[1], summary[7]



def combined_analysis(gt_annotation_file, prediction_file):
    """Match predicted masks to ground-truth masks and compare their line parameters.

    For every image in the ground truth, each prediction (in file order) is
    greedily matched to the not-yet-matched GT mask with the highest non-zero
    IoU. For each matched pair where both sides carry a line, the absolute
    difference of the normalized rho values and the value of ``theta_diff``
    are collected. Summary counts are printed.

    Args:
        gt_annotation_file: path of the COCO-style ground-truth JSON; every
            annotation must contain ``'polar_coordinates'``.
        prediction_file: path of the JSON list of predictions, each with
            ``'image_id'``, ``'segmentation'`` (polygons or RLE) and
            optionally ``'lines'`` as ``[rho, theta]``.

    Returns:
        tuple: ``(mean rho difference, mean theta_diff value)``, or ``(0, 0)``
        when no pair of lines was compared.

    Raises:
        RuntimeError: if an annotation lacks ``'polar_coordinates'`` or a
            segmentation is neither a polygon list nor an RLE dict.
    """
    # Load ground truth data
    with open(gt_annotation_file, 'r') as f:
        gt_data = json.load(f)
    # Load prediction data
    with open(prediction_file, 'r') as f:
        pred_data = json.load(f)
    # Group GT lines by image id.
    # NOTE: gt_lines_by_image is not read again below; this loop effectively
    # only validates that every annotation has polar coordinates.
    gt_lines_by_image = defaultdict(list)
    for ann in gt_data['annotations']:
        image_id = ann['image_id']
        if 'polar_coordinates' in ann:
            lines = [(coord['rho'], coord['theta']) for coord in ann['polar_coordinates']]
            gt_lines_by_image[image_id].extend(lines)
        else:
            raise RuntimeError(f'no polar coord for image id {image_id}')
    # Group predictions by image id
    pred_by_image = defaultdict(list)
    for pred in pred_data:
        pred_by_image[pred['image_id']].append(pred)
    angle_diffs = []
    rho_diffs = []

    def theta_diff(theta_pred, theta_gt):
        """Return exp(-0.12 * t), with t = min(|d|, pi - |d|) and d the angle difference."""
        t = min(abs(theta_pred - theta_gt), np.pi - abs(theta_pred - theta_gt))
        return np.exp(-.12 * t)

    def polygons_to_mask(polygons, shape):
        """Rasterize a list of flat polygons into a uint8 mask (255 inside)."""
        mask = np.zeros(shape, dtype=np.uint8)
        for polygon in polygons:
            pts = np.array(polygon).reshape((-1, 2)).astype(np.int32)
            cv2.fillPoly(mask, [pts], color=255)
        return mask

    def compute_iou(mask1, mask2):
        """Intersection over union of two masks; 0 when both are empty."""
        intersection = np.logical_and(mask1, mask2).sum()
        union = np.logical_or(mask1, mask2).sum()
        return intersection / union if union > 0 else 0

    total_matches = 0
    total_gt_lines = 0
    total_pred_lines = 0

    for image_info in tqdm(gt_data['images']):
        image_id = image_info['id']
        height, width = image_info['height'], image_info['width']

        # Load predictions for this image
        pred_masks = []
        pred_lines = []
        for pred in pred_by_image.get(image_id, []):
            seg = pred['segmentation']
            if isinstance(seg, list):
                mask_poly = polygons_to_mask(seg, (height, width))
                pred_masks.append(mask_poly)
            elif isinstance(seg, dict) and 'counts' in seg and 'size' in seg:
                mask_rle = coco_mask.decode(seg)
                if mask_rle.ndim == 3:
                    mask_rle = mask_rle[:, :, 0]
                mask_rle = (mask_rle * 255).astype(np.uint8)
                pred_masks.append(mask_rle)
            else:
                raise RuntimeError(f'[SEGM] unsupported format for image id {image_id}')

            # Extract predicted line if exists; rho is normalized by the image
            # diagonal and its sign is discarded.
            if 'lines' in pred and len(pred['lines']) == 2:
                rho, theta = pred['lines']
                rho = np.abs(rho / np.sqrt(height**2 + width**2))
                pred_lines.append((rho, theta))
            else:
                pred_lines.append(None)

        # Load ground truth masks for this image (scans all annotations)
        gt_masks = []
        gt_lines = []
        for ann in gt_data['annotations']:
            if ann['image_id'] == image_id:
                seg = ann['segmentation']
                if isinstance(seg, list):
                    mask_poly = polygons_to_mask(seg, (height, width))
                    gt_masks.append(mask_poly)
                elif isinstance(seg, dict) and 'counts' in seg and 'size' in seg:
                    mask_rle = coco_mask.decode(seg)
                    if mask_rle.ndim == 3:
                        mask_rle = mask_rle[:, :, 0]
                    mask_rle = (mask_rle * 255).astype(np.uint8)
                    gt_masks.append(mask_rle)
                else:
                    raise RuntimeError(f'[GT] unsupported format for image id {image_id}')

                # Extract GT line (only the first polar coordinate is used),
                # normalized the same way as the predicted rho.
                if 'polar_coordinates' in ann and len(ann['polar_coordinates']) > 0:
                    rho, theta = ann['polar_coordinates'][0]['rho'], ann['polar_coordinates'][0]['theta']
                    rho = np.abs(rho / np.sqrt(height**2 + width**2))
                    gt_lines.append((rho, theta))
                else:
                    gt_lines.append(None)

        # Detect the matching mask by IoU (greedy, in prediction order; each GT
        # mask can be matched at most once)
        matched_gt = set()
        for pred_idx, pred_mask in enumerate(pred_masks):
            best_iou = 0
            best_gt_idx = -1

            for gt_idx, gt_mask in enumerate(gt_masks):
                if gt_idx in matched_gt:
                    continue
                iou = compute_iou(pred_mask, gt_mask)
                if iou > best_iou:
                    best_iou = iou
                    best_gt_idx = gt_idx

            # NOTE: the IoU > 0.5 threshold below is disabled, so any non-zero
            # overlap with an unmatched GT mask counts as a match.
            #if best_iou > 0.5:
            if best_gt_idx >= 0:
                matched_gt.add(best_gt_idx)
                total_matches += 1

                # Compute the rho_diff and theta_diff if both lines exist
                pred_line = pred_lines[pred_idx]
                gt_line = gt_lines[best_gt_idx]

                if pred_line is not None and gt_line is not None:
                    rho_pred, theta_pred = pred_line
                    rho_gt, theta_gt = gt_line

                    rho_diffs.append(abs(rho_pred - rho_gt))
                    angle_diffs.append(theta_diff(theta_pred, theta_gt))

        # Count matching and not matching lines
        total_gt_lines += len(gt_masks)
        total_pred_lines += len(pred_masks)

    print(f"Total GT lines: {total_gt_lines}")
    print(f"Total predicted lines: {total_pred_lines}")
    print(f"Total matches: {total_matches}")
    print(f"Lines with coordinate differences computed: {len(rho_diffs)}")

    # Nothing to average when no pair of lines was compared.
    if len(rho_diffs) == 0:
        return 0, 0

    return np.mean(rho_diffs), np.mean(angle_diffs)



def compute_line_detection_score(gt_json_path, pred_json_path):
    """Compute and print the line detection score (LDS) of a prediction file.

    The score is ``avg_p50 + avg_r50 + 2 * angle_diff``: the two values from
    ``evaluate_segmentation`` plus twice the second value returned by
    ``combined_analysis``.

    Args:
        gt_json_path: path of the ground-truth annotation file.
        pred_json_path: path of the prediction file.

    Returns:
        The LDS value.
    """
    avg_p50, avg_r50 = evaluate_segmentation(gt_json_path, pred_json_path)
    # combined_analysis also reports a mean rho difference; it is not part of the score.
    _, angle_diff = combined_analysis(gt_json_path, pred_json_path)

    print(f'{avg_p50=}, {avg_r50=}, {angle_diff=}')

    segmentation_term = avg_p50 + avg_r50
    lds = segmentation_term + 2 * angle_diff
    print(f'LDS = {lds}')
    return lds



# Score the prediction file against the test annotations; the value of the
# last expression (the LDS) is what the cell displays.
gt_json_path='/kaggle/input/computervisiondataset/progettoComputerVision/test/test.json'
c='269290-269170.json'  # prediction file; same name as CONFIG['OUTPUT_FILE'] in the final inference cell
compute_line_detection_score(gt_json_path, c)

loading annotations into memory...
Done (t=0.07s)
creating index...
index created!
Loading and preparing results...
DONE (t=0.00s)
creating index...
index created!
Running per image evaluation...
Evaluate annotation type *segm*
DONE (t=1.00s).
Accumulating evaluation results...
DONE (t=0.05s).
 Average Precision  (AP) @[ IoU=0.50:0.95 | area=   all | maxDets=100 ] = 0.234
 Average Precision  (AP) @[ IoU=0.50      | area=   all | maxDets=100 ] = 0.433
 Average Precision  (AP) @[ IoU=0.75      | area=   all | maxDets=100 ] = 0.229
 Average Precision  (AP) @[ IoU=0.50:0.95 | area= small | maxDets=100 ] = 0.129
 Average Precision  (AP) @[ IoU=0.50:0.95 | area=medium | maxDets=100 ] = 0.527
 Average Precision  (AP) @[ IoU=0.50:0.95 | area= large | maxDets=100 ] = 0.655
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets=  1 ] = 0.070
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets= 10 ] = 0.270
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets

100%|██████████| 400/400 [00:37<00:00, 10.73it/s]

Total GT lines: 3328
Total predicted lines: 2903
Total matches: 2413
Lines with coordinate differences computed: 2413
avg_p50=np.float64(0.43343107386395985), avg_r50=np.float64(0.2695612980769231), angle_diff=np.float64(0.9986471746095762)
LDS = 2.7002867211600354





np.float64(2.7002867211600354)