# Rilevamento e Segmentazione di Cavi Elettrici su Dataset TTPLA
## Implementazione basata su Detectron2 e PointRend con Loss Ibrida

**Corso:** Computer Vision - A.A. 2025/2026  
**Università:** Università della Calabria (DIMES)  
**Studenti:** Martucci Anastasia (271316), Zappia Giuseppe (268784)  
**Docenti:** Prof. Manco Giuseppe, Prof. Pisani Francesco Sergio  

---

### Introduzione e Obiettivi del Task
L'obiettivo di questo progetto è lo sviluppo di un modello di **Instance Segmentation** capace di individuare cavi elettrici in scenari aerei complessi utilizzando il dataset **TTPLA**. 
Il task presenta sfide significative:
1. **Sottigliezza estrema:** i cavi occupano spesso solo 1-3 pixel.
2. **Sbilanciamento delle classi:** il background domina l'immagine (>99%).
3. **Discontinuità:** le reti standard tendono a frammentare i cavi a causa del downsampling.

Il modello finale deve restituire: score, bounding box, maschera di segmentazione e coordinate polari $(\rho, \theta)$ della retta.

## 1. Setup dell'Ambiente e Installazione
Installiamo le dipendenze necessarie, tra cui **Detectron2** (sviluppato da FAIR) e le utility per la gestione del formato COCO.

In [None]:
# Installazione di Detectron2
!python -m pip install 'git+https://github.com/facebookresearch/detectron2.git'
!pip install pycocotools opencv-python-headless

In [None]:
import torch, detectron2
from detectron2.utils.logger import setup_logger
setup_logger()

import numpy as np
import os, json, cv2, random
from detectron2 import model_zoo
from detectron2.engine import DefaultPredictor, DefaultTrainer
from detectron2.config import get_cfg
from detectron2.data import MetadataCatalog, DatasetCatalog
import albumentations as A
from tqdm import tqdm
import copy

# --- INPUT PATHS ---
# Directory holding the training images and the COCO-style annotation file.
PATH_IMG_TRAIN = "/kaggle/input/train-cv" 
PATH_JSON_TRAIN = "/kaggle/input/json-annotazioni/train.json"

# --- OUTPUT PATHS ---
# Root of the offline-augmented dataset, with its image folder and JSON file.
TRAIN_AUG_DIR = "/kaggle/working/train_aug_dataset"
TRAIN_AUG_IMG_DIR = os.path.join(TRAIN_AUG_DIR, "images")
TRAIN_AUG_JSON_PATH = os.path.join(TRAIN_AUG_DIR, "train_aug.json")

# Create the output image folder (and its parents) if it does not exist yet.
os.makedirs(TRAIN_AUG_IMG_DIR, exist_ok=True)

## 2. Strategia di Data Augmentation Offline
Dato il numero ridotto di campioni (842 immagini di training), abbiamo implementato una pipeline di **Augmentation Offline**.
Abbiamo scelto trasformazioni **geometriche non distruttive** (flip orizzontale/verticale, rotazioni di multipli di 90°, trasposizione) per aumentare il dataset di 5 volte (l'immagine originale più 4 varianti) senza introdurre artefatti di interpolazione che potrebbero danneggiare la definizione dei cavi sottili.

In [None]:
import numpy as np
import os, json, cv2, copy
import albumentations as A
from tqdm import tqdm

def geometric_augmentation(inputs_list, output_img_dir, output_json_path, multiplier=5):
    """
    Build an offline, geometry-only augmented copy of one or more COCO datasets.

    For every readable source image, ``multiplier`` images are written to
    ``output_img_dir``: the unmodified image first, then ``multiplier - 1``
    randomly flipped / 90-degree-rotated / transposed variants. Each instance
    is rasterized to its own mask, transformed together with the image and
    converted back to COCO polygons.

    Args:
        inputs_list: iterable of ``(json_path, img_source_dir)`` pairs.
        output_img_dir: existing directory where the images are written.
        output_json_path: path of the merged COCO JSON to write.
        multiplier: number of images produced per source image.

    Returns:
        None. The images and the JSON file are written to disk.
    """

    # Ids are global across all input datasets so they never collide.
    global_img_id = 1
    global_ann_id = 1

    final_images = []
    final_annotations = []

    # Single category (id 0 = cable)
    categories = [{"id": 0, "name": "cable"}]

    # --- AUGMENTATION PIPELINE ---
    transform = A.Compose([
        A.HorizontalFlip(p=0.7),
        A.VerticalFlip(p=0.7),
        A.RandomRotate90(p=0.7),
        A.Transpose(p=0.7),
    ])

    print(f"Inizio augmentation geometrica dataset in: {output_img_dir}")

    for json_path, img_source_dir in inputs_list:

        with open(json_path) as f:
            data = json.load(f)

        # Group annotations by image id; setdefault tolerates annotations whose
        # image_id is not listed under 'images' instead of raising KeyError.
        img_to_anns = {img['id']: [] for img in data['images']}
        for ann in data.get('annotations', []):
            img_to_anns.setdefault(ann['image_id'], []).append(ann)

        for img_info in tqdm(data['images']):
            src_path = os.path.join(img_source_dir, img_info['file_name'])

            if not os.path.exists(src_path):
                continue

            image = cv2.imread(src_path)
            if image is None:
                # The file exists but cannot be decoded: skip it like a missing
                # file instead of crashing inside cvtColor.
                continue
            # Albumentations works on RGB images
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            h, w = image.shape[:2]

            anns = img_to_anns.get(img_info['id'], [])

            # --- PHASE 1: one binary mask per instance ---
            masks_list = []
            for ann in anns:
                mask = np.zeros((h, w), dtype=np.uint8)
                for seg in ann['segmentation']:
                    poly = np.array(seg).reshape((-1, 2)).astype(np.int32)
                    cv2.fillPoly(mask, [poly], 1)
                masks_list.append(mask)

            # --- PHASE 2: generate the output versions ---
            for i in range(multiplier):
                if i == 0:
                    # Version 0 is always the untouched original
                    augmented_image = image
                    augmented_masks = masks_list
                    filename_base = f"img_{global_img_id}_orig.jpg"
                else:
                    # Versions > 0: random geometric transform
                    if masks_list:
                        augmented = transform(image=image, masks=masks_list)
                        augmented_image = augmented['image']
                        augmented_masks = augmented['masks']
                    else:
                        augmented = transform(image=image)
                        augmented_image = augmented['image']
                        augmented_masks = []

                    filename_base = f"img_{global_img_id}_aug_{i}.jpg"

                # Save to disk (back to BGR for OpenCV)
                save_path = os.path.join(output_img_dir, filename_base)
                cv2.imwrite(save_path, cv2.cvtColor(augmented_image, cv2.COLOR_RGB2BGR))

                # A 90-degree rotation or a transpose swaps height and width on
                # non-square images, so the size must be read from the image
                # that was actually written, not from the source image.
                out_h, out_w = augmented_image.shape[:2]
                final_images.append({
                    "id": global_img_id,
                    "file_name": filename_base,
                    "height": int(out_h),
                    "width": int(out_w),
                })

                # --- PHASE 3: rebuild the COCO annotations ---
                for aug_mask in augmented_masks:
                    if np.sum(aug_mask) == 0:
                        continue

                    # Transformed masks may be non-contiguous views.
                    aug_mask = np.ascontiguousarray(aug_mask)
                    contours, _ = cv2.findContours(aug_mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
                    new_segmentation = []
                    total_area = 0

                    for contour in contours:
                        contour_area = cv2.contourArea(contour)
                        # Drop tiny fragments
                        if contour_area < 5:
                            continue
                        seg_coords = contour.flatten().tolist()
                        # A polygon needs at least 3 points (6 coordinates)
                        if len(seg_coords) >= 6:
                            new_segmentation.append(seg_coords)
                            total_area += contour_area

                    if new_segmentation:
                        ys, xs = np.where(aug_mask > 0)
                        x_min, x_max = xs.min(), xs.max()
                        y_min, y_max = ys.min(), ys.max()
                        # Inclusive pixel extents: +1 keeps one-pixel-thick
                        # cables from producing zero-width/height boxes and
                        # matches get_bbox_from_mask used at inference time.
                        w_box = x_max - x_min + 1
                        h_box = y_max - y_min + 1

                        final_annotations.append({
                            "id": global_ann_id,
                            "image_id": global_img_id,
                            "category_id": 0,
                            "segmentation": new_segmentation,
                            "area": float(total_area),
                            "bbox": [float(x_min), float(y_min), float(w_box), float(h_box)],
                            "iscrowd": 0,
                        })
                        global_ann_id += 1

                global_img_id += 1

    # Write the merged, augmented COCO JSON
    final_json = {
        "images": final_images,
        "annotations": final_annotations,
        "categories": categories
    }

    with open(output_json_path, 'w') as f:
        json.dump(final_json, f)

    print(f"Totale immagini: {len(final_images)}, Totale annotazioni: {len(final_annotations)}")

# One (annotation JSON, image directory) pair: only the training split is augmented.
training_inputs = [(PATH_JSON_TRAIN, PATH_IMG_TRAIN)]

# multiplier=5: each source image yields the original copy plus four transformed copies.
geometric_augmentation(training_inputs, TRAIN_AUG_IMG_DIR, TRAIN_AUG_JSON_PATH, multiplier=5)

In [None]:
# --- VISUALIZZAZIONE DI CONTROLLO DELL'AUGMENTATION ---
import matplotlib.pyplot as plt
import random
from detectron2.utils.visualizer import Visualizer
from detectron2.data import MetadataCatalog, DatasetCatalog
from detectron2.data.datasets import register_coco_instances

# 1. Register the augmented dataset for visualization
AUG_VIS_DATASET_NAME = "vis_debug_augmented_v2"

# Drop a previous registration so the cell can be re-run safely
if AUG_VIS_DATASET_NAME in DatasetCatalog.list():
    DatasetCatalog.remove(AUG_VIS_DATASET_NAME)
    MetadataCatalog.remove(AUG_VIS_DATASET_NAME)

register_coco_instances(AUG_VIS_DATASET_NAME, {}, TRAIN_AUG_JSON_PATH, TRAIN_AUG_IMG_DIR)

# 2. Set the metadata, making the category-id mapping explicit
aug_metadata = MetadataCatalog.get(AUG_VIS_DATASET_NAME)
aug_metadata.set(
    thing_classes=["cable"],
    # JSON category id 0 corresponds to internal class 0
    thing_dataset_id_to_contiguous_id={0: 0} 
)

# 3. Load the dataset
dataset_dicts = DatasetCatalog.get(AUG_VIS_DATASET_NAME)

# 4. RANDOM selection of up to 5 distinct images.
# random.sample raises ValueError when asked for more items than exist,
# so the request is capped at the dataset size.
num_to_show = min(5, len(dataset_dicts))
samples = random.sample(dataset_dicts, num_to_show)

print(f"Selezioniamo casualmente {num_to_show} immagini dal dataset aumentato per verificare le annotazioni.")

# Tall, narrow figure: the samples are stacked in a single column
plt.figure(figsize=(10, 25)) 

for i, d in enumerate(samples):
    img = cv2.imread(d["file_name"])
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    visualizer = Visualizer(img, metadata=aug_metadata, scale=1.0)

    # Draw the ground-truth annotations on top of the image
    vis = visualizer.draw_dataset_dict(d)

    plt.subplot(num_to_show, 1, i + 1)

    plt.imshow(vis.get_image())

    file_name = os.path.basename(d["file_name"])
    plt.title(f"File: {file_name} | ID: {d['image_id']}", fontsize=12)
    plt.axis("off")

plt.tight_layout()
plt.show()

print("Abbiamo verificato che le maschere siano coerenti anche su campioni casuali del dataset.")

## 3. Architettura Custom: PointRend & Loss Ibrida
Per superare i limiti della griglia fissa di Mask R-CNN ($28 \times 28$), adottiamo **PointRend**. Questa architettura tratta la segmentazione come un problema di rendering, calcolando i pixel in modo adattivo solo dove l'incertezza è alta (sui bordi).

### Loss Ibrida Focal + Dice
Per gestire il forte sbilanciamento delle classi, implementiamo una funzione di perdita custom:
$$L_{total} = 5 \cdot L_{focal} + L_{dice}$$

* **Focal Loss:** Costringe la rete a concentrarsi sui pixel "difficili" (i cavi) ignorando lo sfondo facile.
* **Dice Loss:** Ottimizza la sovrapposizione globale, garantendo la continuità lineare del cavo.

In [None]:
# 1. IMPLEMENTAZIONE CUSTOM LOSS (DICE + FOCAL) E CUSTOM MASK HEAD 
import torch
from torch.nn import functional as F
from torchvision.ops import sigmoid_focal_loss
from detectron2.projects.point_rend.point_head import StandardPointHead, POINT_HEAD_REGISTRY
from detectron2.projects.point_rend import PointRendMaskHead
from detectron2.modeling import ROI_MASK_HEAD_REGISTRY
from detectron2.modeling.roi_heads.mask_head import mask_rcnn_loss

def dice_loss(inputs, targets, smooth=1.0):
    """
    Compute the Dice loss for binary segmentation.

    Args:
        inputs: tensor of predicted probabilities (any shape).
        targets: tensor of binary labels with the same number of elements.
        smooth: additive constant that keeps the ratio defined when both
            tensors are all zeros.

    Returns:
        Scalar tensor ``1 - dice`` computed over all elements at once.
    """
    # reshape (unlike view) also accepts non-contiguous tensors, e.g. the
    # result of a transpose or a strided slice.
    inputs = inputs.reshape(-1)
    targets = targets.reshape(-1)
    intersection = (inputs * targets).sum()
    dice = (2. * intersection + smooth) / (inputs.sum() + targets.sum() + smooth)
    return 1 - dice

# --- DEFINIZIONE CLASSI ---

class CustomCablePointHead(StandardPointHead):
    """
    PointRend point head for TTPLA with a hybrid point loss.

    The network itself is inherited unchanged from StandardPointHead; this
    class only adds ``losses``, which combines Dice loss and Focal loss.
    """
    def losses(self, point_logits, point_labels):
        """
        Compute the hybrid point loss ``dice + 5 * focal``.

        Args:
            point_logits: raw point predictions; a singleton class dimension
                at index 1 is squeezed away.
            point_labels: binary point labels, same shape as the squeezed
                logits.

        Returns:
            dict with the single key ``"loss_point"``.
        """
        if point_logits.dim() == 3 and point_logits.shape[1] == 1:
            point_logits = point_logits.squeeze(1)

        if point_logits.numel() == 0:
            # No sampled points: a "mean" reduction over an empty tensor is
            # NaN and would poison the whole training step. Return a zero
            # that is still attached to the graph instead.
            return {"loss_point": point_logits.sum() * 0}

        point_probs = point_logits.sigmoid()
        target = point_labels.float()

        # 1. FOCAL LOSS (operates on logits)
        loss_focal = sigmoid_focal_loss(
            point_logits, 
            target, 
            alpha=0.95, 
            gamma=2.0, 
            reduction="mean"
        )

        # 2. DICE LOSS (operates on probabilities)
        loss_dice = dice_loss(point_probs, target)

        # 3. Weighted combination
        total_loss = loss_dice + (loss_focal * 5.0)

        return {"loss_point": total_loss}

class CustomCableMaskHead(PointRendMaskHead):
    """
    PointRend mask head whose training branch uses the point head's own
    ``losses`` method for the point loss.
    """
    def forward(self, features, instances):
        """
        Return a dict of losses in training mode, otherwise the result of the
        subdivision inference on the predicted boxes.
        """
        # Inference: coarse prediction on the predicted boxes, then refinement
        if not self.training:
            boxes = [inst.pred_boxes for inst in instances]
            coarse = self.coarse_head(self._roi_pooler(features, boxes))
            return self._subdivision_inference(features, coarse, instances)

        # Training, step 1: coarse mask and its loss
        boxes = [inst.proposal_boxes for inst in instances]
        coarse = self.coarse_head(self._roi_pooler(features, boxes))
        loss_dict = {"loss_mask": mask_rcnn_loss(coarse, instances)}

        if not self.mask_point_on:
            return loss_dict

        # Step 2: sample training points and their labels
        coords, labels = self._sample_train_points(coarse, instances)

        # Step 3: fine-grained features at the sampled points
        fine_features = self._point_pooler(features, boxes, coords)

        # Step 4: point logits
        logits = self._get_point_logits(fine_features, coords, coarse)

        # Step 5: custom point loss supplied by the point head
        loss_dict.update(self.point_head.losses(logits, labels))

        return loss_dict

# --- SAFE REGISTRATION ---
# Re-running the cell would otherwise fail on a duplicate name, so any previous
# entry is removed from the registry before registering the class again.
for _registry, _head_cls in (
    (POINT_HEAD_REGISTRY, CustomCablePointHead),
    (ROI_MASK_HEAD_REGISTRY, CustomCableMaskHead),
):
    if _head_cls.__name__ in _registry:
        del _registry._obj_map[_head_cls.__name__]
    _registry.register(_head_cls)

print("Classi registrate correttamente.")

In [None]:
# Cloniamo il repo solo per avere i file di config (yaml) a portata di mano
!git clone https://github.com/facebookresearch/detectron2.git

## 4. Addestramento del Modello
Configuriamo il training seguendo la **3x Schedule** di Detectron2 (~43 epoche).
Utilizziamo una backbone **ResNet-50** con **FPN** (Feature Pyramid Network) per estrarre caratteristiche multiscala. In questa fase applichiamo anche una **Augmentation Online** fotometrica (luminosità, contrasto, saturazione, illuminazione) per migliorare la robustezza.

In [None]:
# --- IMPORT NECESSARI ---
!pip install gdown
from detectron2.projects import point_rend
import detectron2.data.transforms as T
from detectron2.data import detection_utils as utils
import os
import gdown
import torch
import copy
from detectron2.config import get_cfg
from detectron2.engine import DefaultTrainer
from detectron2.data import MetadataCatalog, DatasetCatalog

# --- 1. WEIGHTS DOWNLOAD ---
file_id = '1SoFg6AjB17CIekGvAf_sLIuCE7wEmVfK'
output_weights = "pointrend_rcnn_R_50_FPN_3x_model_final_3c3198.pkl"
url = f'https://drive.google.com/uc?id={file_id}'

# Download only when the weights file is not already in the working directory
if os.path.exists(output_weights):
    print("File dei pesi già presente.")
else:
    print(f"Scaricamento pesi da mirror Google Drive...")
    gdown.download(url, output_weights, quiet=False)

# --- 2. DATASET SETUP ---
dataset_train_name = "training_dataset_aug"
# Removes every registered dataset so the registration below cannot clash on re-runs
DatasetCatalog.clear()


def _load_training_dicts():
    """Load the augmented COCO JSON lazily, when the catalog asks for it."""
    return detectron2.data.datasets.load_coco_json(TRAIN_AUG_JSON_PATH, TRAIN_AUG_IMG_DIR)


try:
    DatasetCatalog.register(dataset_train_name, _load_training_dicts)
    MetadataCatalog.get(dataset_train_name).set(thing_classes=["cable"])
    print(f"Dataset {dataset_train_name} registrato correttamente.")
except Exception as e:
    print(f"Errore registrazione dataset: {e}")

# --- 3. MAPPER (PHOTOMETRY + FORMAT) ---
def custom_mapper(dataset_dict):
    """
    Map one dataset dict to the training format expected by the model.

    Applies online photometric augmentation and a random shortest-edge
    resize, then attaches the image tensor (BGR, CHW, float32) and the
    bitmask instances.

    Args:
        dataset_dict: one record in Detectron2 dataset format; it must contain
            "file_name" and "annotations". It is deep-copied, not modified.

    Returns:
        The copied dict with "image" and "instances" set and "annotations"
        removed.
    """
    dataset_dict = copy.deepcopy(dataset_dict)
    # RandomSaturation and RandomLighting are documented as assuming RGB
    # channel order, so the photometric stage runs on an RGB image and the
    # result is flipped back to BGR afterwards.
    image = utils.read_image(dataset_dict["file_name"], format="RGB")

    # PHOTOMETRIC transforms + RESIZE only (geometry is augmented offline)
    transform_list = [
        # --- PHOTOMETRY ---
        T.RandomBrightness(0.8, 1.2),
        T.RandomContrast(0.8, 1.2),
        T.RandomSaturation(0.8, 1.2),
        T.RandomLighting(0.7),

        # --- FORMAT ---
        # A range of sizes instead of a fixed (700, 700): the model learns to
        # see cables at several resolutions.
        T.ResizeShortestEdge(
            short_edge_length=(640, 672, 704, 736, 768, 800), 
            max_size=1333, 
            sample_style='choice'
        )
    ]

    image, transforms = T.apply_transform_gens(transform_list, image)

    # Back to BGR, the channel order this mapper has always handed to the
    # model. The reversed view has negative strides, which torch cannot wrap,
    # hence the contiguous copy.
    image = image[:, :, ::-1]
    dataset_dict["image"] = torch.as_tensor(
        np.ascontiguousarray(image.transpose(2, 0, 1)).astype("float32")
    )

    annos = [
        utils.transform_instance_annotations(obj, transforms, image.shape[:2])
        for obj in dataset_dict.pop("annotations")
        if obj.get("iscrowd", 0) == 0
    ]

    # --- FORMAT (bitmasks for PointRend) ---
    instances = utils.annotations_to_instances(annos, image.shape[:2], mask_format="bitmask")
    dataset_dict["instances"] = utils.filter_empty_instances(instances)
    return dataset_dict

class CustomTrainer(DefaultTrainer):
    """DefaultTrainer whose training loader uses ``custom_mapper``."""

    @classmethod
    def build_train_loader(cls, cfg):
        """Build the training data loader with the custom mapper plugged in."""
        train_loader = detectron2.data.build_detection_train_loader(
            cfg,
            mapper=custom_mapper,
        )
        return train_loader

# --- 4. FULL CONFIGURATION ---
# Start from the default config, add the PointRend keys, then load the
# PointRend R50-FPN 3x YAML from the cloned repository.
cfg = get_cfg()
point_rend.add_pointrend_config(cfg)
config_path = "detectron2/projects/PointRend/configs/InstanceSegmentation/pointrend_rcnn_R_50_FPN_3x_coco.yaml"
cfg.merge_from_file(config_path)

cfg.DATASETS.TRAIN = (dataset_train_name,)
cfg.DATASETS.TEST = () 
cfg.MODEL.WEIGHTS = output_weights

# === USE OUR CUSTOM LOSS ===
# These names must match the classes registered in the head registries.
cfg.MODEL.ROI_MASK_HEAD.NAME = "CustomCableMaskHead" 
cfg.MODEL.POINT_HEAD.NAME = "CustomCablePointHead" 

# Point head parameters
cfg.MODEL.POINT_HEAD.FC_DIM = 256
cfg.MODEL.POINT_HEAD.NUM_FC = 3
cfg.MODEL.POINT_HEAD.CLS_AGNOSTIC_MASK = False
cfg.MODEL.POINT_HEAD.COARSE_PRED_EACH_LAYER = True
cfg.MODEL.POINT_HEAD.IN_FEATURES = ["p2"]
cfg.MODEL.POINT_HEAD.TRAIN_NUM_POINTS = 2048 
cfg.MODEL.POINT_HEAD.OVERSAMPLE_RATIO = 3
cfg.MODEL.POINT_HEAD.IMPORTANCE_SAMPLE_RATIO = 0.75

# Adds the 0.2 and 5.0 aspect ratios to the anchors (presumably to cover very
# elongated cable boxes -- confirm against the anchor analysis).
cfg.MODEL.ANCHOR_GENERATOR.ASPECT_RATIOS = [[0.2, 0.5, 1.0, 2.0, 5.0]]

# Training parameters
cfg.DATALOADER.NUM_WORKERS = 2
cfg.INPUT.MASK_FORMAT = "bitmask"
cfg.SOLVER.IMS_PER_BATCH = 4
cfg.SOLVER.BASE_LR = 0.0025
cfg.SOLVER.MAX_ITER = 45000 
cfg.SOLVER.STEPS = (34000, 41000)
cfg.SOLVER.CHECKPOINT_PERIOD = 5000 

# Single foreground class ("cable") for both the ROI heads and the point head
cfg.MODEL.ROI_HEADS.BATCH_SIZE_PER_IMAGE = 256 
cfg.MODEL.ROI_HEADS.NUM_CLASSES = 1     
cfg.MODEL.POINT_HEAD.NUM_CLASSES = 1    

# NOTE(review): CustomTrainer builds its loader with custom_mapper, which
# defines its own ResizeShortestEdge; the *_TRAIN sizes below are presumably
# not used during training -- confirm.
cfg.INPUT.MIN_SIZE_TRAIN = (700,750,800)
cfg.INPUT.MAX_SIZE_TRAIN = 1333
cfg.INPUT.MIN_SIZE_TEST = 700
cfg.INPUT.MAX_SIZE_TEST = 1333

# Output directory for checkpoints and logs
cfg.OUTPUT_DIR = "/kaggle/working/output_models"
os.makedirs(cfg.OUTPUT_DIR, exist_ok=True)

# --- START TRAINING ---
trainer = CustomTrainer(cfg) 
# resume=False: start from cfg.MODEL.WEIGHTS rather than from the last checkpoint
trainer.resume_or_load(resume=False)
trainer.train()

In [None]:

!git clone https://github.com/facebookresearch/detectron2.git detectron2_repo

!python -m pip install -e detectron2_repo

import detectron2
from detectron2.utils.logger import setup_logger
setup_logger()

# CODICE FORNITOCI PER LA VALUTAZIONE AUTOMATICA DELL'LDS SCORE

## 5. Valutazione: Line Detection Score (LDS)
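Il successo del modello non è misurato solo tramite mAP, ma tramite l'**LDS**, che combina la qualità della segmentazione (AP a IoU 0.5 e AR restituiti da COCOeval) con la precisione geometrica dell'angolo, mediata sulle $N$ coppie predizione–GT abbinate:
$$LDS = AP_{50} + AR + 2 \cdot \frac{1}{N}\sum_{i=1}^{N} e^{-0.12 \cdot \Delta\theta_i}$$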

In [None]:
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
from pathlib import Path
from collections import defaultdict
from tqdm import tqdm
from pycocotools import mask as coco_mask
import json
import numpy as np
import cv2

def evaluate_segmentation(gt_json_path, pred_json_path, check_cable_class=False):
    """
    Run the COCO 'segm' evaluation of a prediction file against a ground truth.

    Args:
        gt_json_path: path of the COCO ground-truth JSON.
        pred_json_path: path of the JSON list of predictions.
        check_cable_class: when True, restrict the evaluation to category id 0.

    Returns:
        Tuple ``(stats[1], stats[7])`` taken from the COCOeval summary.
    """
    ground_truth = COCO(gt_json_path)

    with open(pred_json_path, 'r') as pred_file:
        raw_predictions = json.load(pred_file)

    # Wrap the raw predictions in the COCO results structure
    detections = ground_truth.loadRes(raw_predictions)

    evaluator = COCOeval(ground_truth, detections, 'segm')
    if check_cable_class:
        evaluator.params.catIds = [0]  # id of the cable class

    # Standard three-step COCO protocol; summarize() also prints the table
    for step in (evaluator.evaluate, evaluator.accumulate, evaluator.summarize):
        step()

    return evaluator.stats[1], evaluator.stats[7]

In [None]:
def combined_analysis(gt_annotation_file, prediction_file):
    """
    Match predicted masks to ground-truth masks and score the line parameters.

    For every image listed in the ground truth, each prediction (in file
    order) is greedily assigned to the not-yet-matched GT mask with the
    highest IoU; no minimum IoU is enforced beyond IoU > 0. For matched pairs
    where both sides carry a line, the absolute difference of the normalized
    rho values and the angle score exp(-0.12 * t) are collected.

    Args:
        gt_annotation_file: path of the COCO ground-truth JSON; every
            annotation must contain 'polar_coordinates'.
        prediction_file: path of a JSON list of predictions with 'image_id',
            'segmentation' (polygons or RLE) and optionally 'lines'
            ([rho, theta]).

    Returns:
        Tuple (mean rho difference, mean angle score), or (0, 0) when no
        matched pair had both lines.

    Raises:
        RuntimeError: if a GT annotation has no 'polar_coordinates' or a
            segmentation is in an unsupported format.
    """
    # Load ground truth data
    with open(gt_annotation_file, 'r') as f:
        gt_data = json.load(f)
    # Load prediction data
    with open(prediction_file, 'r') as f:
        pred_data = json.load(f)
    # Group GT lines by image id
    # (this grouping is not read again below; the loop effectively validates
    # that every annotation carries 'polar_coordinates')
    gt_lines_by_image = defaultdict(list)
    for ann in gt_data['annotations']:
        image_id = ann['image_id']
        if 'polar_coordinates' in ann:
            lines = [(coord['rho'], coord['theta']) for coord in ann['polar_coordinates']]
            gt_lines_by_image[image_id].extend(lines)
        else:
            raise RuntimeError(f'no polar coord for image id {image_id}')
    # Group predictions by image id
    pred_by_image = defaultdict(list)
    for pred in pred_data:
        pred_by_image[pred['image_id']].append(pred)
    angle_diffs = []
    rho_diffs = []
    
    def theta_diff(theta_pred, theta_gt):
        """Return exp(-0.12 * t), t being the smaller of |d| and pi - |d|."""
        # NOTE(review): when |theta_pred - theta_gt| exceeds pi, pi - |d| is
        # negative and the returned score is greater than 1 -- confirm the
        # intended theta range of both inputs.
        t = min(abs(theta_pred - theta_gt), np.pi - abs(theta_pred - theta_gt))
        return np.exp(-.12 * t)
    
    def polygons_to_mask(polygons, shape):
        """Rasterize a list of flat [x1, y1, x2, y2, ...] polygons into a 0/255 mask."""
        mask = np.zeros(shape, dtype=np.uint8)
        for polygon in polygons:
            pts = np.array(polygon).reshape((-1, 2)).astype(np.int32)
            cv2.fillPoly(mask, [pts], color=255)
        return mask
    
    def compute_iou(mask1, mask2):
        """Return the IoU of two masks (non-zero = foreground), 0 for an empty union."""
        intersection = np.logical_and(mask1, mask2).sum()
        union = np.logical_or(mask1, mask2).sum()
        return intersection / union if union > 0 else 0
    
    total_matches = 0
    total_gt_lines = 0
    total_pred_lines = 0
    
    for image_info in tqdm(gt_data['images']):
        image_id = image_info['id']
        height, width = image_info['height'], image_info['width']
        
        # Load predictions for this image
        pred_masks = []
        pred_lines = []
        for pred in pred_by_image.get(image_id, []):
            seg = pred['segmentation']
            if isinstance(seg, list):
                mask_poly = polygons_to_mask(seg, (height, width))
                pred_masks.append(mask_poly)
            elif isinstance(seg, dict) and 'counts' in seg and 'size' in seg:
                mask_rle = coco_mask.decode(seg)
                if mask_rle.ndim == 3:
                    mask_rle = mask_rle[:, :, 0]
                mask_rle = (mask_rle * 255).astype(np.uint8)
                pred_masks.append(mask_rle)
            else:
                raise RuntimeError(f'[SEGM] unsupported format for image id {image_id}')
            
            # Extract predicted line if exists
            if 'lines' in pred and len(pred['lines']) == 2:
                rho, theta = pred['lines']
                # rho is normalized by the image diagonal and made non-negative
                rho = np.abs(rho / np.sqrt(height**2 + width**2))
                pred_lines.append((rho, theta))
            else:
                # Keep pred_lines index-aligned with pred_masks
                pred_lines.append(None)
        
        # Load ground truth masks for this image
        # NOTE(review): this scans every annotation for every image (images x
        # annotations work); grouping annotations by image_id once would
        # presumably give the same result -- confirm before changing.
        gt_masks = []
        gt_lines = []
        for ann in gt_data['annotations']:
            if ann['image_id'] == image_id:
                seg = ann['segmentation']
                if isinstance(seg, list):
                    mask_poly = polygons_to_mask(seg, (height, width))
                    gt_masks.append(mask_poly)
                elif isinstance(seg, dict) and 'counts' in seg and 'size' in seg:
                    mask_rle = coco_mask.decode(seg)
                    if mask_rle.ndim == 3:
                        mask_rle = mask_rle[:, :, 0]
                    mask_rle = (mask_rle * 255).astype(np.uint8)
                    gt_masks.append(mask_rle)
                else:
                    raise RuntimeError(f'[GT] unsupported format for image id {image_id}')
                
                # Extract GT line
                # (only the first entry of 'polar_coordinates' is used)
                if 'polar_coordinates' in ann and len(ann['polar_coordinates']) > 0:
                    rho, theta = ann['polar_coordinates'][0]['rho'], ann['polar_coordinates'][0]['theta']
                    rho = np.abs(rho / np.sqrt(height**2 + width**2))
                    gt_lines.append((rho, theta))
                else:
                    gt_lines.append(None)
        
        # Detect the matching mask by IoU
        # Greedy: predictions are processed in file order and each GT mask can
        # be claimed by at most one prediction.
        matched_gt = set()
        for pred_idx, pred_mask in enumerate(pred_masks):
            best_iou = 0
            best_gt_idx = -1
            
            for gt_idx, gt_mask in enumerate(gt_masks):
                if gt_idx in matched_gt:
                    continue
                iou = compute_iou(pred_mask, gt_mask)
                if iou > best_iou:
                    best_iou = iou
                    best_gt_idx = gt_idx
            
            # Consider it a match if IoU > threshold (e.g., 0.5)
            #if best_iou > 0.5:
            # The 0.5 threshold above is disabled: any overlap (IoU > 0) counts.
            if best_gt_idx >= 0:
                matched_gt.add(best_gt_idx)
                total_matches += 1
                
                # Compute the rho_diff and theta_diff if both lines exist
                pred_line = pred_lines[pred_idx]
                gt_line = gt_lines[best_gt_idx]
                
                if pred_line is not None and gt_line is not None:
                    rho_pred, theta_pred = pred_line
                    rho_gt, theta_gt = gt_line
                    
                    rho_diffs.append(abs(rho_pred - rho_gt))
                    angle_diffs.append(theta_diff(theta_pred, theta_gt))
        
        # Count matching and not matching lines
        total_gt_lines += len(gt_masks)
        total_pred_lines += len(pred_masks)
    
    print(f"Total GT lines: {total_gt_lines}")
    print(f"Total predicted lines: {total_pred_lines}")
    print(f"Total matches: {total_matches}")
    print(f"Lines with coordinate differences computed: {len(rho_diffs)}")
    
    # No matched pair with both lines: report zeros instead of averaging nothing
    if len(rho_diffs) == 0:
        return 0, 0
    
    return np.mean(rho_diffs), np.mean(angle_diffs)

In [None]:
import json
import os

ORIGINAL_JSON_PATH = "/kaggle/input/json-annotazioni/test.json"
FIXED_JSON_PATH = "/kaggle/working/test_fixed.json"


with open(ORIGINAL_JSON_PATH, 'r') as f:
    data = json.load(f)

# Make sure the top-level 'info' field exists, adding a placeholder if needed
if "info" in data:
    print("Il campo 'info' esiste già.")
else:
    print("Campo 'info' mancante. Aggiunta in corso...")
    data["info"] = {
        "description": "Cable Dataset Test Set",
        "url": "http://kaggle.com",
        "version": "1.0",
        "year": 2024,
        "contributor": "User",
        "date_created": "2024-01-17"
    }

# Always write the (possibly patched) copy to the working directory
with open(FIXED_JSON_PATH, 'w') as f:
    json.dump(data, f)


In [None]:
def compute_line_detection_score(gt_json_path, pred_json_path):
    """
    Compute and print the Line Detection Score of a prediction file.

    The score is the sum of the two segmentation values returned by
    ``evaluate_segmentation`` plus twice the angle score returned by
    ``combined_analysis``.
    """
    segmentation_scores = evaluate_segmentation(gt_json_path, pred_json_path)
    avg_p50, avg_r50 = segmentation_scores

    # The rho term is computed by combined_analysis but not used in the score
    _, angle_diff = combined_analysis(gt_json_path, pred_json_path)

    # The '=' specifier prints the variable names themselves, so they are kept
    print(f'{avg_p50=}, {avg_r50=}, {angle_diff=}')

    lds = avg_p50 + avg_r50 + 2 * angle_diff
    print(f'LDS = {lds}')
    return lds
        

In [None]:
import detectron2
from detectron2.utils.logger import setup_logger
setup_logger()

import numpy as np
import cv2
import json
import os
import math
import random
import contextlib
import io
import sys
from tqdm import tqdm
from detectron2 import model_zoo
from detectron2.engine import DefaultPredictor
from detectron2.config import get_cfg
from detectron2.data import MetadataCatalog, DatasetCatalog
from detectron2.data.datasets import register_coco_instances 
from detectron2.projects import point_rend
import pycocotools.mask as mask_util

# Paths: test images, patched test annotations, trained weights and the
# prediction file written by this cell.
TEST_IMG_DIR = "/kaggle/input/test-cv"
TEST_JSON_PATH = "/kaggle/working/test_fixed.json" 
WEIGHTS_PATH = "/kaggle/input/pesi-pointrend/pesi_finali.pth"
OUTPUT_JSON = "MARTUCCI_271316_ZAPPIA_268784.json" 

# 2. FUNZIONI DI UTILITÀ
def binary_mask_to_rle(binary_mask):
    """Encode a binary mask as a COCO RLE dict whose 'counts' is a UTF-8 string."""
    # The encoder expects a Fortran-ordered uint8 array
    fortran_mask = np.asfortranarray(binary_mask.astype(np.uint8))
    encoded = mask_util.encode(fortran_mask)
    # 'counts' comes back as bytes, which json cannot serialize
    encoded["counts"] = encoded["counts"].decode("utf-8")
    return encoded

def get_polar_line(binary_mask):
    """
    Fit a straight line to the mask pixels and return it in polar form.

    Args:
        binary_mask: 2D array; non-zero entries are the pixels to fit.

    Returns:
        ``[rho, theta]`` as Python floats, with ``rho >= 0`` and ``theta`` in
        ``[0, 2*pi)``, where theta is the direction of the line normal.
        ``[0.0, 0.0]`` when the mask has fewer than 5 pixels or the fit fails.
    """
    y_coords, x_coords = np.where(binary_mask)
    if len(x_coords) < 5:
        return [0.0, 0.0]

    # Explicit float32 so the point type never depends on an implicit cast
    pts = np.column_stack((x_coords, y_coords)).astype(np.float32)
    try:
        # Only the OpenCV call can fail here; keep the try body minimal and
        # catch its specific error type instead of a bare except.
        vx, vy, x0, y0 = cv2.fitLine(pts, cv2.DIST_L2, 0, 0.01, 0.01).ravel()
    except cv2.error:
        return [0.0, 0.0]

    # Normal vector of the fitted direction (vx, vy)
    nx, ny = -vy, vx
    theta = math.atan2(ny, nx)
    rho = x0 * nx + y0 * ny
    # Flip the normal so that rho is non-negative
    if rho < 0:
        rho = -rho
        theta += math.pi
    theta = theta % (2 * math.pi)
    return [float(rho), float(theta)]

def get_bbox_from_mask(mask):
    """
    Return the tight COCO box ``[x_min, y_min, width, height]`` (floats)
    around the non-zero pixels of ``mask``, or None for an empty mask.
    """
    occupied_rows = np.flatnonzero(np.any(mask, axis=1))
    occupied_cols = np.flatnonzero(np.any(mask, axis=0))

    if occupied_rows.size == 0 or occupied_cols.size == 0:
        return None

    top, bottom = occupied_rows[0], occupied_rows[-1]
    left, right = occupied_cols[0], occupied_cols[-1]

    # Extents are inclusive, hence the +1 on width and height
    box_width = right - left + 1
    box_height = bottom - top + 1
    return [float(left), float(top), float(box_width), float(box_height)]


# Configuration: rebuild the PointRend R50-FPN config and load the trained weights
cfg = get_cfg()
point_rend.add_pointrend_config(cfg)

config_path = "detectron2_repo/projects/PointRend/configs/InstanceSegmentation/pointrend_rcnn_R_50_FPN_3x_coco.yaml"
cfg.merge_from_file(config_path)
# Same anchor aspect ratios and class counts as in the training cell
cfg.MODEL.ANCHOR_GENERATOR.ASPECT_RATIOS = [[0.2, 0.5, 1.0, 2.0, 5.0]]
cfg.MODEL.ROI_HEADS.NUM_CLASSES = 1  
cfg.MODEL.POINT_HEAD.NUM_CLASSES = 1
cfg.MODEL.WEIGHTS = WEIGHTS_PATH
cfg.MODEL.ROI_HEADS.BATCH_SIZE_PER_IMAGE = 256
cfg.INPUT.MASK_FORMAT = "bitmask"
# NOTE(review): MAX_SIZE_TEST is 700 here but 1333 in the training cell -- confirm this is intended.
cfg.INPUT.MIN_SIZE_TEST = 700
cfg.INPUT.MAX_SIZE_TEST = 700
cfg.MODEL.DEVICE = "cuda"

# Selected test-time thresholds (NMS IoU and minimum score)
cfg.MODEL.ROI_HEADS.NMS_THRESH_TEST = 0.7
cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = 0.01

predictor = DefaultPredictor(cfg)

# Dataset
DATASET_NAME = "final_test_dataset"
if DATASET_NAME in DatasetCatalog.list():
    DatasetCatalog.remove(DATASET_NAME)
register_coco_instances(DATASET_NAME, {}, TEST_JSON_PATH, TEST_IMG_DIR)
dataset_dicts = DatasetCatalog.get(DATASET_NAME)

final_results = []
# Sequential ids are unique and reproducible; random ids drawn from a fixed
# range can collide once thousands of predictions are written.
next_result_id = 1

for d in tqdm(dataset_dicts, desc="Processing Images"):
    img = cv2.imread(d["file_name"])
    if img is None:
        # Fail with a clear message instead of an obscure error inside the predictor
        raise FileNotFoundError(f"Impossibile leggere l'immagine: {d['file_name']}")
    outputs = predictor(img)
    instances = outputs["instances"].to("cpu")

    for pred_mask, pred_score in zip(instances.pred_masks, instances.scores):
        # 1. Predicted mask and confidence
        original_mask = pred_mask.numpy()
        score = float(pred_score)

        # Tight box recomputed from the mask; None means the mask is empty
        bbox_new = get_bbox_from_mask(original_mask)

        if bbox_new is None:
            continue

        # 2. Quantities derived from the mask
        rle = binary_mask_to_rle(original_mask)
        line_params = get_polar_line(original_mask)
        area = float(np.sum(original_mask))

        res = {
            "image_id": d["image_id"],
            "category_id": 0,
            "bbox": bbox_new,     
            "segmentation": rle,  
            "score": score,
            "lines": line_params, 
            "area": area,
            "id": next_result_id
        }
        final_results.append(res)
        next_result_id += 1

# Save the predictions
with open(OUTPUT_JSON, "w") as f:
    json.dump(final_results, f)

print(f"Predizioni salvate in: {OUTPUT_JSON}")



# Score the saved predictions against the patched test annotations
lds_score = compute_line_detection_score(
    "/kaggle/working/test_fixed.json",
    OUTPUT_JSON,
)

# Framed banner around the final value
for banner_line in ("="*50, f"LDS SCORE FINALE: {lds_score}", "="*50):
    print(banner_line)


## 6. Post-Processing Geometrico: Correzione del Bias
Attraverso una **Grid Search** sul training set, abbiamo identificato un errore sistematico di disallineamento geometrico (shift). 
Il codice seguente analizza le maschere predette rispetto alla Ground Truth per calcolare lo shift ottimale $(dx, dy)$ che massimizza l'IoU.

In [None]:
import numpy as np
import cv2
import pycocotools.mask as mask_util
from tqdm import tqdm
from detectron2.data import DatasetCatalog


# 1. FUNZIONI DI SUPPORTO
def annotation_to_mask(segmentation, h, w):
    """Convert a COCO segmentation (polygons or RLE) into a boolean mask.

    Stand-in for Detectron2's GenericMask.

    Args:
        segmentation: either a list of polygons ``[[x1, y1, x2, y2, ...], ...]``
            or an RLE dict (compressed, or uncompressed with a list ``counts``).
        h: image height in pixels.
        w: image width in pixels.

    Returns:
        A boolean numpy array. It is all ``False`` with shape ``(h, w)`` when
        the segmentation has an unsupported type or contains no usable polygon.
    """
    if isinstance(segmentation, list):
        # A polygon needs at least 3 points (6 numbers). Shorter entries are
        # dropped: frPyObjects would interpret a 4-number list as a bounding
        # box, and an empty list makes it raise.
        polygons = [poly for poly in segmentation if len(poly) >= 6]
        if not polygons:
            return np.zeros((h, w), dtype=bool)
        # One RLE per polygon, merged into a single mask.
        rle = mask_util.merge(mask_util.frPyObjects(polygons, h, w))
    elif isinstance(segmentation, dict):
        if isinstance(segmentation.get("counts"), list):
            # Uncompressed RLE must be converted before it can be decoded.
            rle = mask_util.frPyObjects(segmentation, h, w)
        else:
            # Standard compressed COCO RLE.
            rle = segmentation
    else:
        return np.zeros((h, w), dtype=bool)

    # Decode the RLE into a 0/1 mask and expose it as boolean.
    return mask_util.decode(rle).astype(bool)

def calculate_mask_iou(mask1, mask2):
    """Return the intersection-over-union of two boolean masks.

    Yields ``0.0`` when both masks are empty (union of zero pixels).
    """
    union_px = (mask1 | mask2).sum()
    if union_px == 0:
        return 0.0
    overlap_px = (mask1 & mask2).sum()
    return overlap_px / union_px

def apply_shift_and_morph(mask, dx, dy, morph_op=0):
    """Translate a binary mask by (dx, dy) and optionally dilate/erode it.

    Args:
        mask: 2-D binary mask.
        dx: horizontal shift in pixels (positive moves content to the right).
        dy: vertical shift in pixels (positive moves content downwards).
        morph_op: number of 3x3 morphology iterations; positive dilates,
            negative erodes, 0 leaves the shifted mask untouched.

    Returns:
        Boolean mask of the same shape. Pixels shifted outside the frame are
        discarded and uncovered pixels are ``False``.
    """
    m_uint = mask.astype(np.uint8)
    h, w = mask.shape
    shifted = np.zeros_like(m_uint)

    # 1. Shift via slicing. A shift at least as large as the image leaves
    #    nothing visible; skipping the copy avoids mismatched slice shapes
    #    (negative slice ends would otherwise wrap around).
    if abs(dx) < w and abs(dy) < h:
        src_rows = slice(max(0, -dy), h - max(0, dy))
        src_cols = slice(max(0, -dx), w - max(0, dx))
        dst_rows = slice(max(0, dy), h - max(0, -dy))
        dst_cols = slice(max(0, dx), w - max(0, -dx))
        shifted[dst_rows, dst_cols] = m_uint[src_rows, src_cols]

    # 2. Morphology
    if morph_op != 0:
        kernel = np.ones((3, 3), np.uint8)
        if morph_op > 0:
            shifted = cv2.dilate(shifted, kernel, iterations=morph_op)
        else:
            shifted = cv2.erode(shifted, kernel, iterations=abs(morph_op))

    return shifted.astype(bool)


# 2. MOTORE DI CALIBRAZIONE
def run_full_calibration(predictor, dataset_name):
    """Grid-search the shift/morphology correction that best aligns predictions to GT.

    Phase 1 runs ``predictor`` on every annotated image of ``dataset_name`` and
    pairs each prediction with score >= 0.1 with the ground-truth mask it
    overlaps most, keeping the pair when that IoU exceeds 0.1.
    Phase 2 evaluates every combination of ``dx``, ``dy`` in [-3, 3] and
    ``morph`` in {-1, 0, 1}, scoring each by the mean IoU over all pairs.

    Args:
        predictor: callable that takes an image as loaded by ``cv2.imread`` and
            returns a dict whose ``"instances"`` entry exposes ``pred_masks``
            and ``scores``.
        dataset_name: name of a dataset registered in ``DatasetCatalog``.

    Returns:
        Tuple ``(dx, dy, morph)`` with the best mean IoU, or ``(0, 0, 0)`` when
        no prediction/GT pair could be collected.
    """
    dataset_dicts = DatasetCatalog.get(dataset_name)
    print(f"Avvio calibrazione sulle {len(dataset_dicts)} immagini di train.")

    # NOTE: every pair is kept in memory as full-resolution masks.
    valid_pairs = []  # list of (gt_mask, pred_mask) tuples

    print("Fase 1/2: Raccolta coppie Predizione-GT...")
    for d in tqdm(dataset_dicts):
        annotations = d.get("annotations")
        if not annotations:
            # No ground truth to compare against: skip before paying for inference.
            continue

        img = cv2.imread(d["file_name"])
        if img is None:
            # cv2.imread returns None for unreadable files instead of raising.
            print(f"Immagine non leggibile, saltata: {d['file_name']}")
            continue
        h, w = d["height"], d["width"]

        # Inference
        outputs = predictor(img)
        pred_instances = outputs["instances"].to("cpu")
        if len(pred_instances) == 0:
            continue

        # Ground-truth masks stacked as [N_GT, H, W]
        gt_masks = np.array(
            [annotation_to_mask(anno["segmentation"], h, w) for anno in annotations]
        )

        pred_masks_np = pred_instances.pred_masks.numpy()
        scores = pred_instances.scores.numpy()

        for pred_mask, score in zip(pred_masks_np, scores):
            if score < 0.1:
                continue

            # IoU of this prediction against all GT masks at once.
            intersections = np.logical_and(gt_masks, pred_mask).sum(axis=(1, 2))
            unions = np.logical_or(gt_masks, pred_mask).sum(axis=(1, 2))
            ious = intersections / (unions + 1e-6)  # epsilon guards empty unions

            best_gt_idx = np.argmax(ious)
            if ious[best_gt_idx] > 0.1:
                valid_pairs.append((gt_masks[best_gt_idx], pred_mask))

    n_pairs = len(valid_pairs)
    print(f"Fase 1 completata. Trovate {n_pairs} coppie valide.")

    if n_pairs == 0:
        print("Nessuna coppia valida trovata.")
        return 0, 0, 0

    # PHASE 2: GRID SEARCH
    print("Fase 2/2: Grid Search parametri ottimali...")

    x_range = range(-3, 4)
    y_range = range(-3, 4)
    morph_range = [-1, 0, 1]

    results = {}
    total_combinations = len(x_range) * len(y_range) * len(morph_range)

    with tqdm(total=total_combinations) as pbar:
        for dx in x_range:
            for dy in y_range:
                for morph in morph_range:
                    cum_iou = 0.0
                    for gt, pred in valid_pairs:
                        mod_pred = apply_shift_and_morph(pred, dx, dy, morph)
                        cum_iou += calculate_mask_iou(mod_pred, gt)

                    results[(dx, dy, morph)] = cum_iou / n_pairs
                    pbar.update(1)

    # RESULTS: (0, 0, 0) is always part of the grid and serves as the baseline.
    best_params = max(results, key=results.get)
    best_iou = results[best_params]
    base_iou = results[(0, 0, 0)]

    print("\n" + "═"*50)
    print(f"RISULTATI CALIBRAZIONE")
    print("═"*50)
    print(f"IoU Medio Iniziale:    {base_iou:.5f}")
    print(f"IoU Medio Ottimizzato: {best_iou:.5f}")
    print(f"Miglioramento:         +{(best_iou - base_iou)*100:.2f}%")
    print("─"*50)
    print(f"FARE QUESTE OPERAZIONI:")
    print(f"Shift X (dx): {best_params[0]}")
    print(f"Shift Y (dy): {best_params[1]}")
    print(f"Morfologia:   {best_params[2]} (1=dilata, -1=erodi, 0=nulla)")
    print("═"*50)

    return best_params

# 1. DATASET CONFIGURATION AND REGISTRATION
TRAIN_IMG_DIR = "/kaggle/input/train-cv"
TRAIN_JSON_PATH = "/kaggle/input/json-annotazioni/train.json"

DATASET_NAME = "my_test_dataset_final"
# Register only when missing, so re-running the cell is harmless. An explicit
# check is used instead of swallowing AssertionError, which would also hide
# unrelated assertion failures raised during registration.
if DATASET_NAME not in DatasetCatalog.list():
    register_coco_instances(DATASET_NAME, {}, TRAIN_JSON_PATH, TRAIN_IMG_DIR)

best_dx, best_dy, best_morph = run_full_calibration(predictor, DATASET_NAME)

## 7. Inferenza Finale e Generazione Output
Applichiamo il modello sul Test Set con i parametri ottimizzati:
* **Score Threshold: 0.01** (Massimizziamo la Recall, fondamentale per il punteggio LDS).
* **NMS Threshold: 0.7** (Evitiamo di sopprimere cavi paralleli o incrociati).
* **Shift Correction (+1, +1):** Correzione del bias geometrico rilevato.

Infine, calcoliamo le coordinate polari tramite regressione `cv2.fitLine` sui pixel della maschera.

In [None]:
import detectron2
from detectron2.utils.logger import setup_logger
setup_logger()

import numpy as np
import cv2
import json
import os
import math
import random
import contextlib
import io
import sys
from tqdm import tqdm
from detectron2 import model_zoo
from detectron2.engine import DefaultPredictor
from detectron2.config import get_cfg
from detectron2.data import MetadataCatalog, DatasetCatalog
from detectron2.projects import point_rend
import pycocotools.mask as mask_util

# 1. BEST PARAMETERS FOUND BY THE GRID SEARCH
BEST_NMS = 0.7
BEST_SCORE = 0.01

# Paths
TEST_IMG_DIR = "/kaggle/input/test-cv"
TEST_JSON_PATH = "/kaggle/working/test_fixed.json" # The corrected file that includes the "info" field
WEIGHTS_PATH = "/kaggle/input/pesi-pointrend/pesi_finali.pth"
OUTPUT_SHIFTED_JSON = "MARTUCCI_271316_ZAPPIA_268784.json"

# 2. UTILITY FUNCTIONS (SHIFT & RECOMPUTATION)

def binary_mask_to_rle(binary_mask):
    """Encode a binary mask as a COCO RLE dict whose ``counts`` is a ``str``."""
    fortran_u8 = np.asfortranarray(binary_mask.astype(np.uint8))
    encoded = mask_util.encode(fortran_u8)
    # The encoder's ``counts`` is decoded to text so the dict can be dumped as JSON.
    encoded.update(counts=encoded["counts"].decode("utf-8"))
    return encoded

def get_polar_line(binary_mask):
    """Fit a line to the mask pixels and return it as ``[rho, theta]``.

    The line is expressed in normal form ``x*cos(theta) + y*sin(theta) = rho``
    with ``rho >= 0`` and ``theta`` in ``[0, 2*pi)``, where ``x`` is the column
    index and ``y`` the row index of the mask.

    Args:
        binary_mask: 2-D mask; non-zero pixels are the points to fit.

    Returns:
        ``[rho, theta]`` as Python floats, or ``[0.0, 0.0]`` when the mask has
        fewer than 5 pixels or OpenCV fails to fit a line.
    """
    y_coords, x_coords = np.where(binary_mask)
    if len(x_coords) < 5:
        return [0.0, 0.0]

    pts = np.column_stack((x_coords, y_coords))
    try:
        # fitLine yields a unit direction (vx, vy) and a point (x0, y0) on the line.
        vx, vy, x0, y0 = cv2.fitLine(pts, cv2.DIST_L2, 0, 0.01, 0.01).ravel()
    except cv2.error:
        # Best effort: a failed fit degrades to the neutral value. Only OpenCV
        # errors are caught, so interrupts and programming errors still surface.
        return [0.0, 0.0]

    # The line normal is the direction rotated by 90 degrees.
    nx, ny = -vy, vx
    theta = math.atan2(ny, nx)
    rho = x0 * nx + y0 * ny
    if rho < 0:
        # Flip the normal so that rho is non-negative.
        rho = -rho
        theta += math.pi
    theta = theta % (2 * math.pi)
    return [float(rho), float(theta)]

def apply_shift_and_recalc(original_mask, dx=1, dy=1):
    """Return a copy of the mask translated by (dx, dy) pixels.

    Args:
        original_mask: 2-D mask.
        dx: horizontal shift (positive moves content right, negative left).
        dy: vertical shift (positive moves content down, negative up).

    Returns:
        Array with the same shape and dtype as ``original_mask``. Content
        shifted past the border is dropped and uncovered pixels are zero.
    """
    h, w = original_mask.shape
    shifted_mask = np.zeros_like(original_mask)

    # A shift at least as large as the image leaves nothing inside the frame.
    if abs(dx) >= w or abs(dy) >= h:
        return shifted_mask

    # Slices are built symmetrically so that negative shifts work too:
    # for a positive shift copy src[:-d] into dst[d:], for a negative one
    # copy src[-d:] into dst[:d].
    src_rows = slice(max(0, -dy), h - max(0, dy))
    src_cols = slice(max(0, -dx), w - max(0, dx))
    dst_rows = slice(max(0, dy), h - max(0, -dy))
    dst_cols = slice(max(0, dx), w - max(0, -dx))

    shifted_mask[dst_rows, dst_cols] = original_mask[src_rows, src_cols]
    return shifted_mask

def get_bbox_from_mask(mask):
    """Compute the tight COCO box ``[x_min, y_min, width, height]`` of a mask.

    Returns ``None`` when the mask has no set pixel (for instance after a
    shift pushed it out of the image).
    """
    occupied_rows = np.flatnonzero(np.any(mask, axis=1))
    occupied_cols = np.flatnonzero(np.any(mask, axis=0))

    if occupied_rows.size == 0 or occupied_cols.size == 0:
        return None

    top, bottom = occupied_rows[0], occupied_rows[-1]
    left, right = occupied_cols[0], occupied_cols[-1]

    # Extents are inclusive of both border pixels, hence the +1.
    box_w = right - left + 1
    box_h = bottom - top + 1
    return [float(left), float(top), float(box_w), float(box_h)]


# Configuration: start from the default config and add the PointRend keys
# before merging the PointRend YAML file.
cfg = get_cfg()
point_rend.add_pointrend_config(cfg)

# NOTE(review): relative path; presumably requires the Detectron2 repository
# to be available as ./detectron2_repo. Confirm against the setup cells.
config_path = "detectron2_repo/projects/PointRend/configs/InstanceSegmentation/pointrend_rcnn_R_50_FPN_3x_coco.yaml"
cfg.merge_from_file(config_path)
# Overrides applied on top of the YAML file. Extreme aspect ratios (0.2, 5.0)
# are included in the anchor set.
cfg.MODEL.ANCHOR_GENERATOR.ASPECT_RATIOS = [[0.2, 0.5, 1.0, 2.0, 5.0]]
# Single foreground class for both the ROI heads and the point head.
cfg.MODEL.ROI_HEADS.NUM_CLASSES = 1
cfg.MODEL.POINT_HEAD.NUM_CLASSES = 1
cfg.MODEL.WEIGHTS = WEIGHTS_PATH
cfg.MODEL.ROI_HEADS.BATCH_SIZE_PER_IMAGE = 256
cfg.INPUT.MASK_FORMAT = "bitmask"
# Test-time resize: min and max size are both set to 700.
cfg.INPUT.MIN_SIZE_TEST = 700
cfg.INPUT.MAX_SIZE_TEST = 700
cfg.MODEL.DEVICE = "cuda"

# Apply the best inference thresholds defined at the top of the cell.
cfg.MODEL.ROI_HEADS.NMS_THRESH_TEST = BEST_NMS
cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = BEST_SCORE

predictor = DefaultPredictor(cfg)

# Dataset: drop any previous registration so the cell can be re-run safely.
DATASET_NAME = "final_test_ds"
if DATASET_NAME in DatasetCatalog.list():
    DatasetCatalog.remove(DATASET_NAME)
register_coco_instances(DATASET_NAME, {}, TEST_JSON_PATH, TEST_IMG_DIR)
dataset_dicts = DatasetCatalog.get(DATASET_NAME)

final_results = []

for d in tqdm(dataset_dicts, desc="Processing Images"):
    img = cv2.imread(d["file_name"])
    if img is None:
        # cv2.imread signals an unreadable/missing file by returning None
        # instead of raising; skip it rather than crash inside the predictor.
        print(f"Immagine non leggibile, saltata: {d['file_name']}")
        continue

    outputs = predictor(img)
    instances = outputs["instances"].to("cpu")
    pred_masks = instances.pred_masks.numpy()
    pred_scores = instances.scores.numpy()

    for original_mask, raw_score in zip(pred_masks, pred_scores):
        # 1. Apply the (+1, +1) shift correction to the predicted mask.
        shifted_mask = apply_shift_and_recalc(original_mask, dx=1, dy=1)

        # 2. Recompute the box: it has to follow the shifted mask.
        bbox_new = get_bbox_from_mask(shifted_mask)

        # If the shift pushed the whole mask out of the image, skip it.
        if bbox_new is None:
            continue

        # 3. Recompute the derived parameters on the shifted mask.
        rle = binary_mask_to_rle(shifted_mask)
        line_params = get_polar_line(shifted_mask)
        area = float(np.sum(shifted_mask))

        res = {
            "image_id": d["image_id"],
            "category_id": 0,
            "bbox": bbox_new,      # updated box
            "segmentation": rle,   # updated RLE
            "score": float(raw_score),
            "lines": line_params,  # updated line
            "area": area,
            # Sequential ids are guaranteed unique; random ids drawn from
            # 1..99999999 are likely to collide over thousands of predictions.
            "id": len(final_results) + 1,
        }
        final_results.append(res)

# Save the predictions.
with open(OUTPUT_SHIFTED_JSON, "w") as f:
    json.dump(final_results, f)

print(f"Predizioni shiftate salvate in: {OUTPUT_SHIFTED_JSON}")

# 3. LDS EVALUATION

try:
    # Run the scorer with its stdout captured, then print only the final banner.
    captured_output = io.StringIO()
    with contextlib.redirect_stdout(captured_output):
        lds_score = compute_line_detection_score(TEST_JSON_PATH, OUTPUT_SHIFTED_JSON)

    banner_rule = "="*50
    print("\n".join([
        banner_rule,
        f"LDS SCORE FINALE (SHIFT +1,+1): {lds_score}",
        banner_rule,
    ]))

except Exception as e:
    print(f"Errore nel calcolo dello score: {e}")
    # Re-run without the stdout redirection so the scorer's own output is visible.
    compute_line_detection_score(TEST_JSON_PATH, OUTPUT_SHIFTED_JSON)