In [1]:
from google.colab import drive

# Mount the user's Google Drive at /content/gdrive (Colab-only; needs interactive authorization).
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [8]:
from IPython import get_ipython
from IPython.display import display

In [3]:
!pip install efficientnet_pytorch

Collecting efficientnet_pytorch
  Downloading efficientnet_pytorch-0.7.1.tar.gz (21 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch->efficientnet_pytorch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch->efficientnet_pytorch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch->efficientnet_pytorch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch->efficientnet_pytorch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch->efficientnet_pytorch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metada

In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import transforms
from efficientnet_pytorch import EfficientNet
import numpy as np
from typing import Dict, List, Tuple, Optional

In [9]:
class EfficientNetBackbone2D(nn.Module):
    """
    EfficientNet backbone for multi-label analysis of 2D technical drawings.

    The model produces two kinds of outputs:

    * a global multi-label head (``num_classes`` logits) fed by the globally
      average-pooled backbone features;
    * three optional regional heads (welds, title block, BOM), each with two
      logits, fed by mask-weighted pooled features re-scaled by a small
      gating ("attention") MLP. They are only evaluated for the regions whose
      mask is passed to ``forward``.

    Note on naming: the first global class is called ``'missing_weld'`` while
    the corresponding regional output key is ``'weld_missing'``. Both spellings
    are kept because callers index the outputs by these keys.
    """

    # Default label set of the global head and its index-aligned loss weights
    # (the first class gets the largest weight).
    DEFAULT_CLASS_NAMES = (
        'missing_weld',    # 0: missing weld symbols
        'weld_error',      # 1: misplaced weld symbols
        'valid_name',      # 2: validator name present and correct
        'des_name',        # 3: designer name present
        'mat_cod',         # 4: material code present
        'part_cod',        # 5: part code present
    )
    DEFAULT_LOSS_WEIGHTS = (1.2, 1.0, 0.8, 0.8, 1.0, 1.0)

    # Single source of truth for the regional branch:
    # region mask name -> (classifier head name, keys of the head's two logits).
    _REGION_HEADS = {
        'weld_region': ('weld_classifier', ('weld_missing', 'weld_error')),
        'title_block': ('title_classifier', ('valid_name', 'des_name')),
        'bom_region': ('bom_classifier', ('mat_cod', 'part_cod')),
    }

    # Relative weight of the auxiliary regional loss in ``compute_loss``.
    REGION_LOSS_WEIGHT = 0.3

    def __init__(
        self,
        model_name: str = 'efficientnet-b0',
        num_classes: int = 6,  # missing_weld, weld_error, valid_name, des_name, mat_cod, part_cod
        freeze_layers: int = 3,  # number of backbone blocks to freeze
        dropout_rate: float = 0.2,
        pretrained: bool = True
    ):
        """
        Args:
            model_name: EfficientNet variant name understood by ``efficientnet_pytorch``.
            num_classes: Number of logits of the global multi-label head.
            freeze_layers: Number of leading backbone blocks to freeze (the stem
                is always frozen).
            dropout_rate: Dropout probability used in all classifier heads.
            pretrained: Load pretrained weights (``from_pretrained``) instead of
                building a randomly initialised network (``from_name``).
        """
        super().__init__()

        self.num_classes = num_classes

        # Keep the label list aligned with the size of the global head: the
        # default names are truncated, or padded with generic names, so that
        # ``predict`` never indexes past the logits when num_classes != 6.
        n_default = len(self.DEFAULT_CLASS_NAMES)
        self.class_names = list(self.DEFAULT_CLASS_NAMES[:num_classes]) + [
            f'class_{i}' for i in range(n_default, num_classes)
        ]

        # Load the EfficientNet backbone.
        if pretrained:
            self.backbone = EfficientNet.from_pretrained(model_name)
        else:
            self.backbone = EfficientNet.from_name(model_name)

        # Width of the feature vector that fed the original classifier.
        self.num_features = self.backbone._fc.in_features

        # Drop the original classification layer.
        self.backbone._fc = nn.Identity()

        # Freeze the stem and the first blocks.
        self._freeze_layers(freeze_layers)

        # One gating MLP per region; it operates on already pooled, flattened features.
        self.region_attention = nn.ModuleDict({
            region_name: self._create_attention_module()
            for region_name in self._REGION_HEADS
        })

        # One two-logit classifier per region.
        self.classifiers = nn.ModuleDict({
            head_name: self._create_classifier(self.num_features, 2, dropout_rate)
            for head_name, _ in self._REGION_HEADS.values()
        })

        # Global multi-label classifier.
        self.final_classifier = nn.Sequential(
            nn.Linear(self.num_features, 512),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(256, num_classes)
        )

        # Per-class loss weights, sized to num_classes (extra classes get 1.0).
        # Registered as a non-persistent buffer: it follows ``.to(device)`` but
        # does not add a key to the state_dict, so existing checkpoints still load.
        weights = list(self.DEFAULT_LOSS_WEIGHTS[:num_classes]) + \
            [1.0] * max(0, num_classes - n_default)
        self.register_buffer('loss_weights', torch.tensor(weights), persistent=False)

    def _freeze_layers(self, freeze_layers: int):
        """Freeze the stem (conv + batch norm) and the first ``freeze_layers`` blocks.

        The count is clamped to ``[0, number of backbone blocks]``.

        NOTE(review): only ``requires_grad`` is switched off. BatchNorm layers in
        the frozen part still update their running statistics while the model is
        in train mode.
        """
        blocks_to_freeze = max(0, min(freeze_layers, len(self.backbone._blocks)))

        frozen_modules = [self.backbone._conv_stem, self.backbone._bn0]
        frozen_modules.extend(self.backbone._blocks[i] for i in range(blocks_to_freeze))
        for module in frozen_modules:
            for param in module.parameters():
                param.requires_grad = False

        print(f"Congelati i primi {blocks_to_freeze} blocchi + stem layer")

    def _create_attention_module(self):
        """Build the gating MLP of one region.

        It maps a pooled feature vector ``(B, num_features)`` through a
        bottleneck of ``num_features // 4`` units to per-channel gates in
        ``(0, 1)`` (sigmoid). Pooling/flattening happens in ``forward``.
        """
        return nn.Sequential(
            nn.Linear(self.num_features, self.num_features // 4),
            nn.ReLU(),
            nn.Linear(self.num_features // 4, self.num_features),
            nn.Sigmoid()
        )

    def _create_classifier(self, in_features: int, out_features: int, dropout_rate: float):
        """Build a two-layer classifier head with a hidden width of ``in_features // 2``."""
        return nn.Sequential(
            nn.Linear(in_features, in_features // 2),
            nn.ReLU(),
            nn.Dropout(dropout_rate),
            nn.Linear(in_features // 2, out_features)
        )

    @staticmethod
    def _resize_mask(mask, size: Tuple[int, int], device) -> torch.Tensor:
        """Return ``mask`` as a float tensor of shape ``(B or 1, 1, *size)`` on ``device``.

        Accepts masks shaped ``(H, W)``, ``(B, H, W)`` or ``(B, 1, H, W)`` of any
        numeric or boolean dtype. The cast to float happens BEFORE the bilinear
        interpolation, which does not support boolean/integer inputs.

        Raises:
            ValueError: if the mask has an unsupported shape.
        """
        mask = torch.as_tensor(mask, device=device).float()
        if mask.dim() == 2:
            mask = mask[None, None]          # shared by every sample of the batch
        elif mask.dim() == 3:
            mask = mask.unsqueeze(1)
        elif mask.dim() != 4 or mask.size(1) != 1:
            raise ValueError(
                f"Region mask must have shape (H, W), (B, H, W) or (B, 1, H, W), got {tuple(mask.shape)}"
            )
        return F.interpolate(mask, size=size, mode='bilinear', align_corners=False)

    def forward(self, x: torch.Tensor, region_masks: Optional[Dict[str, torch.Tensor]] = None):
        """
        Forward pass with optional region-specific branches.

        Args:
            x: Input images, ``(B, C, H, W)``.
            region_masks: Optional dict of masks keyed by ``'weld_region'``,
                ``'title_block'`` and/or ``'bom_region'``; unknown keys are
                ignored. See ``_resize_mask`` for the accepted shapes.

        Returns:
            Dict with:
                ``'logits'``: global logits ``(B, num_classes)``;
                ``'probabilities'``: sigmoid of the global logits;
                ``'region_predictions'``: dict of per-task logits ``(B,)`` for
                    every region whose mask was supplied (possibly empty);
                ``'global_features'``: pooled backbone features ``(B, num_features)``.
        """
        # Backbone feature map: (B, num_features, H_feat, W_feat).
        features = self.backbone.extract_features(x)

        # Global average pooling -> (B, num_features).
        global_features = F.adaptive_avg_pool2d(features, 1).flatten(1)

        predictions = {}
        if region_masks is not None:
            feat_size = tuple(features.shape[-2:])
            # Fixed iteration order (weld, title, BOM) keeps the output key
            # order and the dropout call order independent of the caller's dict.
            for region_name, (head_name, task_keys) in self._REGION_HEADS.items():
                mask = region_masks.get(region_name)
                if mask is None:
                    continue

                resized_mask = self._resize_mask(mask, feat_size, features.device)
                masked_features = features * resized_mask  # broadcast over channels

                # NOTE(review): the mean is taken over the whole feature map, not
                # only over the masked area, so the pooled magnitude scales with
                # the fraction of the image the region covers.
                pooled = F.adaptive_avg_pool2d(masked_features, 1).flatten(1)

                # Channel-wise gating of the pooled region descriptor.
                gates = self.region_attention[region_name](pooled)
                head_logits = self.classifiers[head_name](pooled * gates)

                for column, task_key in enumerate(task_keys):
                    predictions[task_key] = head_logits[:, column]

        # Global multi-label classification.
        final_logits = self.final_classifier(global_features)

        return {
            'logits': final_logits,
            'probabilities': torch.sigmoid(final_logits),
            'region_predictions': predictions,
            'global_features': global_features
        }

    def compute_loss(self, outputs: Dict, targets: torch.Tensor, region_targets: Optional[Dict] = None):
        """
        Combined loss: weighted global BCE plus an optional regional BCE term.

        Args:
            outputs: Dict returned by ``forward``.
            targets: Global multi-label targets, ``(B, num_classes)``.
            region_targets: Optional dict keyed like ``outputs['region_predictions']``
                (``'weld_missing'``, ``'weld_error'``, ``'valid_name'``,
                ``'des_name'``, ``'mat_cod'``, ``'part_cod'``) with targets of
                shape ``(B,)``. Tasks missing on either side are skipped; tasks
                whose shapes differ are skipped with a printed warning.

        Returns:
            ``(total_loss, loss_components)`` where ``loss_components`` holds the
            float values ``'main_loss'`` and, if any regional task contributed,
            ``'region_loss'``. The regional loss is the mean over heads of the
            mean task loss of each head and is added with weight
            ``REGION_LOSS_WEIGHT``.
        """
        logits = outputs['logits']

        # Main multi-label loss; targets and weights are aligned to the logits' device.
        main_loss = F.binary_cross_entropy_with_logits(
            logits,
            targets.to(logits.device).float(),
            weight=self.loss_weights.to(logits.device)
        )

        total_loss = main_loss
        loss_components = {'main_loss': main_loss.item()}

        region_preds = outputs.get('region_predictions')
        if region_targets and region_preds:
            head_losses = []
            for head_name, task_keys in self._REGION_HEADS.values():
                if head_name not in self.classifiers:
                    continue

                task_losses = []
                for task_key in task_keys:
                    if task_key not in region_preds or task_key not in region_targets:
                        continue
                    pred = region_preds[task_key]
                    target = region_targets[task_key]
                    if pred.shape != target.shape:
                        print(f"Warning: Shape mismatch for regional target '{task_key}'. Pred shape: {pred.shape}, Target shape: {target.shape}")
                        continue
                    task_losses.append(
                        F.binary_cross_entropy_with_logits(pred, target.to(pred.device).float())
                    )

                if task_losses:
                    head_losses.append(torch.stack(task_losses).mean())

            if head_losses:
                region_loss = torch.stack(head_losses).mean()
                # Out-of-place sum: `total_loss` aliases `main_loss`, so an
                # in-place `+=` would silently modify the main-loss tensor too.
                total_loss = main_loss + self.REGION_LOSS_WEIGHT * region_loss
                loss_components['region_loss'] = region_loss.item()

        return total_loss, loss_components

    def predict(self, x: torch.Tensor, region_masks: Optional[Dict[str, torch.Tensor]] = None, threshold: float = 0.5) -> Dict:
        """
        Thresholded inference.

        Runs the model in eval mode without gradients and restores the previous
        train/eval mode afterwards, so calling it in the middle of training does
        not leave dropout and batch norm switched to eval behaviour.

        Args:
            x: Input images, ``(B, C, H, W)``.
            region_masks: Optional region masks, forwarded to ``forward``.
            threshold: A probability strictly greater than this counts as positive.

        Returns:
            ``{'global': {class_name: {...}}, 'regional': {task_name: {...}}}``
            where each leaf dict has NumPy arrays ``'prediction'`` (int) and
            ``'confidence'`` (sigmoid probability), one entry per sample.
        """
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                outputs = self(x, region_masks)

                global_probs = outputs['probabilities']
                global_predictions = (global_probs > threshold).int()

                results = {'global': {}, 'regional': {}}
                for i, class_name in enumerate(self.class_names):
                    results['global'][class_name] = {
                        'prediction': global_predictions[:, i].cpu().numpy(),
                        'confidence': global_probs[:, i].cpu().numpy()
                    }

                for task_name, task_logits in outputs.get('region_predictions', {}).items():
                    regional_probs = torch.sigmoid(task_logits)
                    regional_preds = (regional_probs > threshold).int()
                    results['regional'][task_name] = {
                        'prediction': regional_preds.cpu().numpy(),
                        'confidence': regional_probs.cpu().numpy()
                    }
        finally:
            self.train(was_training)

        return results

    def get_trainable_parameters(self):
        """Return the list of parameters that currently require gradients."""
        return [p for p in self.parameters() if p.requires_grad]

    def unfreeze_all(self):
        """Re-enable gradients for every parameter (full fine-tuning)."""
        for param in self.parameters():
            param.requires_grad = True
        print("Tutti i parametri sono stati sbloccati")


# Utility per preprocessing delle immagini
class DrawingPreprocessor:
    """Image transform pipelines and coarse region masks for technical drawings."""

    def __init__(self, input_size: Tuple[int, int] = (512, 512)):
        """Build the training and validation pipelines for images resized to ``input_size``."""
        self.input_size = input_size

        def tensor_and_normalize():
            # Fresh transform instances for each pipeline; normalisation uses
            # the standard ImageNet channel statistics.
            return [
                transforms.ToTensor(),
                transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
            ]

        # Deliberately mild augmentations: small rotation, small shift,
        # small brightness/contrast jitter.
        mild_augmentations = [
            transforms.RandomRotation(degrees=(-2, 2)),
            transforms.RandomAffine(degrees=0, translate=(0.02, 0.02)),
            transforms.ColorJitter(brightness=0.1, contrast=0.1),
        ]

        # Training: resize -> augment -> tensor -> normalize.
        self.train_transform = transforms.Compose(
            [transforms.Resize(input_size), *mild_augmentations, *tensor_and_normalize()]
        )

        # Validation/test: resize -> tensor -> normalize (no augmentation).
        self.val_transform = transforms.Compose(
            [transforms.Resize(input_size), *tensor_and_normalize()]
        )

    def create_region_masks(self, image_size: Tuple[int, int]) -> Dict[str, np.ndarray]:
        """
        Build approximate binary masks for the fixed layout regions of a drawing.

        Args:
            image_size: ``(height, width)`` of the image.

        Returns:
            Dict mapping ``'weld_region'``, ``'title_block'`` and ``'bom_region'``
            to float32 arrays of shape ``(height, width)`` that are 1.0 inside
            the region's rectangle and 0.0 elsewhere.
        """
        h, w = image_size

        # (row slice, column slice) of each rectangular region.
        windows = {
            # welds: central/upper part of the sheet
            'weld_region': (slice(h // 6, 2 * h // 3), slice(w // 6, 5 * w // 6)),
            # title block: bottom-right corner
            'title_block': (slice(3 * h // 4, None), slice(2 * w // 3, None)),
            # BOM: right-hand side
            'bom_region': (slice(h // 6, 2 * h // 3), slice(3 * w // 4, None)),
        }

        masks = {}
        for region_name, window in windows.items():
            canvas = np.zeros((h, w), dtype=np.float32)
            canvas[window] = 1.0
            masks[region_name] = canvas
        return masks


# Esempio di utilizzo
def create_model_and_example(
    batch_size: int = 4,
    image_size: Tuple[int, int] = (512, 512),
    seed: Optional[int] = None,
):
    """End-to-end smoke test: build the model and run forward, loss and predict on random data.

    Args:
        batch_size: Number of random images in the demo batch.
        image_size: ``(height, width)`` used consistently for the random input,
            the region masks and the preprocessor.
        seed: If given, seeds PyTorch's global RNG first so that head
            initialisation, inputs and dummy targets are reproducible.
            ``None`` (default) leaves the RNG untouched.

    Returns:
        ``(model, preprocessor)``.
    """
    if seed is not None:
        torch.manual_seed(seed)

    # Build the model.
    model = EfficientNetBackbone2D(
        model_name='efficientnet-b0',
        num_classes=6,
        freeze_layers=3,
        dropout_rate=0.2,
        pretrained=True
    )

    print(f"Modello creato con {sum(p.numel() for p in model.parameters())} parametri totali")
    print(f"Parametri allenabili: {sum(p.numel() for p in model.get_trainable_parameters())}")

    # Random input batch.
    height, width = image_size
    input_tensor = torch.randn(batch_size, 3, height, width)

    # Example region masks: each (H, W) array is repeated to (B, H, W).
    preprocessor = DrawingPreprocessor(input_size=image_size)
    region_masks = preprocessor.create_region_masks(image_size)
    mask_tensors = {
        region: torch.from_numpy(mask).unsqueeze(0).repeat(batch_size, 1, 1)
        for region, mask in region_masks.items()
    }

    # Forward pass in eval mode, without gradients.
    model.eval()
    with torch.no_grad():
        outputs = model(input_tensor, mask_tensors)

    print(f"Shape output logits: {outputs['logits'].shape}")
    print(f"Shape probabilità: {outputs['probabilities'].shape}")
    print(f"Classi: {model.class_names}")

    # Loss computation on random binary targets (demonstration only).
    dummy_targets = torch.randint(0, 2, (batch_size, model.num_classes)).float()
    # Regional targets are keyed like the regional predictions.
    regional_task_names = ('weld_missing', 'weld_error', 'valid_name', 'des_name', 'mat_cod', 'part_cod')
    dummy_region_targets = {
        task_name: torch.randint(0, 2, (batch_size,)).float()
        for task_name in regional_task_names
    }

    total_loss, loss_components = model.compute_loss(outputs, dummy_targets, dummy_region_targets)
    print(f"\nTotal Loss: {total_loss.item()}")
    print(f"Loss Components: {loss_components}")

    # Thresholded predictions.
    predictions_results = model.predict(input_tensor, mask_tensors)
    print("\nPrediction Results (Global):")
    for class_name, data in predictions_results['global'].items():
        print(f"  {class_name}: Prediction={data['prediction']}, Confidence={data['confidence']}")

    print("\nPrediction Results (Regional):")
    for task_name, data in predictions_results['regional'].items():
        print(f"  {task_name}: Prediction={data['prediction']}, Confidence={data['confidence']}")

    return model, preprocessor

# Run the demo. Inside a notebook kernel __name__ is "__main__" as well, so the
# example executes every time this cell is run (its printed output follows the cell).
if __name__ == "__main__":
    model, preprocessor = create_model_and_example()

Loaded pretrained weights for efficientnet-b0
Congelati i primi 3 blocchi + stem layer
Modello creato con 9722056 parametri totali
Parametri allenabili: 9702966
Shape output logits: torch.Size([4, 6])
Shape probabilità: torch.Size([4, 6])
Classi: ['missing_weld', 'weld_error', 'valid_name', 'des_name', 'mat_cod', 'part_cod']

Total Loss: 0.8782455921173096
Loss Components: {'main_loss': 0.6702383160591125, 'region_loss': 0.6933574676513672}

Prediction Results (Global):
  missing_weld: Prediction=[0 0 0 0], Confidence=[0.47620454 0.4771901  0.47639662 0.47755376]
  weld_error: Prediction=[1 1 1 1], Confidence=[0.5087033  0.5089917  0.50880814 0.50924736]
  valid_name: Prediction=[1 1 1 1], Confidence=[0.5256676  0.52720386 0.52708864 0.52630985]
  des_name: Prediction=[1 1 1 1], Confidence=[0.5077051  0.50663555 0.5076995  0.5056138 ]
  mat_cod: Prediction=[0 0 0 0], Confidence=[0.49432546 0.49603495 0.49490675 0.49518895]
  part_cod: Prediction=[1 1 1 1], Confidence=[0.51192087 0.5119