In [1]:
pip install pycocotools

Collecting pycocotools
  Obtaining dependency information for pycocotools from https://files.pythonhosted.org/packages/ba/64/0451cf41a00fd5ac4501de4ea0e395b7d909e09d665e56890b5d3809ae26/pycocotools-2.0.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata
  Downloading pycocotools-2.0.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (1.1 kB)
Downloading pycocotools-2.0.7-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (426 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m426.2/426.2 kB[0m [31m16.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pycocotools
Successfully installed pycocotools-2.0.7
Note: you may need to restart the kernel to use updated packages.


In [2]:
import numpy as np                                    # Importa Numpy
import skimage.io as io                               # Importa il modulo Input/ouput di SK-Image
from skimage.transform import resize                  # Importa il modulo resize da SK-Image
from os import listdir                                # Importa il modulo listdir da OS

import json                                           # Importa Json
from matplotlib.collections import PatchCollection    # Importa PatchCollection dal modulo collections di MatPlotLib
from pycocotools.coco import COCO                     # Importa COCO dal modulo coco di PyCoco-Tools
import pycocotools.mask as cocomask                   # Importa il modulo Mask di PyCoco-Tools
import matplotlib.pyplot as plt                       # Importa il modulo  pyplot di MatPlotLib

import PIL
from PIL import Image, ImageDraw                      # Importa il modulo Image da PIL

from tensorflow import keras                          # Importa il modulo Keras di TensorFlow
import os                                             # Importa os

from tqdm import tqdm                                 # Importa il modulo tqdm da tqdm

import torch
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F
from torch import nn, Tensor

import torchvision
from torchvision.models.detection import fasterrcnn_resnet50_fpn, FasterRCNN
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor, FasterRCNN
from torchvision.models.detection.backbone_utils import resnet_fpn_backbone
from torchvision.transforms import transforms
from torchvision.models.detection.transform import GeneralizedRCNNTransform
from torchvision.models.detection.rpn import AnchorGenerator, RPNHead, RegionProposalNetwork
from torchvision.models.detection.roi_heads import RoIHeads
from torchvision.ops import MultiScaleRoIAlign

from typing import Dict, List, Optional, Tuple

import cv2



In [3]:
# Path
path_in = '/kaggle/input/dataset-ipcv-teama/'
path_out = '/kaggle/working/'

coco_path = path_in + 'COCO_annotations.json/COCO_annotations.json'
train_path = path_in + 'dataset_IPCV/kaggle/working/train'
test_path = path_in + 'dataset_IPCV/kaggle/working/test'
val_path = path_in + 'dataset_IPCV/kaggle/working/val'

weight_path = path_out + 'weight/'

# Def. di Funzioni

In [4]:
def load_image(image_path):
    """
    Carica un'immagine da un percorso specificato.

    Precondizione:
        - `image_path` è il percorso completo dell'immagine da caricare.

    Postcondizione:
        - Restituisce l'immagine caricata.
        - Solleva un'eccezione AssertionError se l'immagine non viene trovata nel percorso specificato.
    """
    image = plt.imread(image_path)
    assert image is not None, f"IMAGE NOT FOUND AT {image_path}"
    return image

In [5]:
class BuildingsDataset(Dataset):
    def __init__(self, images_dir, annotation_path, transform=None):
        """
        Inizializza un oggetto BuildingsDataset.

        Precondizioni:
            - `images_dir` è il percorso della directory contenente le immagini.
            - `annotation_path` è il percorso del file JSON contenente le annotazioni.
            - `transform` è un oggetto trasformazione (es. da torchvision.transforms) per applicare
              trasformazioni alle immagini e alle etichette, opzionale.

        Postcondizioni:
            - `image_paths` è una lista di percorsi completi delle immagini nella directory `images_dir`.
            - `annotations` contiene le annotazioni caricate dal file JSON specificato.
            - `transform` è l'oggetto trasformazione fornito.
        """
        self.image_paths = [os.path.join(images_dir, image_id) for image_id in sorted(os.listdir(images_dir))]
        self.transform = transform
 
        with open(annotation_path, 'r') as f:
            self.annotations = json.load(f)
 
    def transform_image_bbox(self, image, label):
        """
        Applica trasformazioni all'immagine e alle etichette, se definite.

        Precondizioni:
            - `image` è l'immagine da trasformare.
            - `label` sono le etichette associate all'immagine.

        Postcondizioni:
            - Se `transform` è definito, applica le trasformazioni all'immagine e alle etichette.
            - Restituisce l'immagine e le etichette trasformate.
        """
        if self.transform:
            transformed = self.transform(image=image, labels=label)
            image = transformed['image']
            label = transformed['labels']
 
        image = transforms.ToTensor()(image)
        return image, label
 
    def __getitem__(self, index):
        """
        Restituisce un campione specifico dell'insieme di dati.

        Precondizioni:
            - `index` è l'indice del campione da restituire.

        Postcondizioni:
            - Restituisce un'immagine e le relative etichette nel formato richiesto.
              Se non ci sono annotazioni per l'immagine, restituisce un target vuoto.
        """
        image_path = self.image_paths[index]
        image_name = image_path.split("/")[-1]
        image_id = next((img['id'] for img in self.annotations['images'] if img['file_name'] == image_name), None)
        image = load_image(image_path).astype(np.float32)
        image /= 255.0
        image = torch.from_numpy(image).permute(2,0,1) # change the shape from [h,w,c] to [c,h,w]  

        box_lab = [anno for anno in self.annotations['annotations'] if anno['image_id'] == image_id]

        if(len(box_lab) == 0):
            target = {}
            boxes = torch.zeros((0, 4), dtype=torch.float32) 
            target = {
                    "boxes": boxes, 
                    "labels": torch.zeros(0, dtype=torch.int64), 
                    "image_id": torch.as_tensor([4])
                     }
        else:
            boxes = [json.loads(record['bbox']) for record in box_lab]
            categories = [record['category_id'] for record in box_lab]
            boxes = np.array(boxes)
            # change the co-ordinates into expected [x, y, x+w, y+h] format
            boxes[:, 2] = boxes[:, 0] + boxes[:, 2]
            boxes[:, 3] = boxes[:, 1] + boxes[:, 3]
            boxes = torch.as_tensor(boxes, dtype=torch.float32)

            labels = torch.as_tensor(categories, dtype=torch.int64)

            target = {
                   "boxes": boxes, 
                   "labels": labels,
                   "image_id": torch.tensor([index])
                    }
                
        return image, target
 
    def __len__(self):
        """
        Restituisce il numero totale di campioni nell'insieme di dati.

        Postcondizioni:
            - Restituisce la lunghezza dell'insieme di dati.
        """
        return len(self.image_paths)

In [6]:
class ImageList:
    """
    Structure that holds a list of images (of possibly
    varying sizes) as a single tensor.
    This works by padding the images to the same size,
    and storing in a field the original sizes of each image

    Args:
        tensors (tensor): Tensor containing images.
        image_sizes (list[tuple[int, int]]): List of Tuples each containing size of images.
    """

    def __init__(self, tensors: Tensor, image_sizes: List[Tuple[int, int]]) -> None:
        self.tensors = tensors
        self.image_sizes = image_sizes

    def to(self, device: torch.device) -> "ImageList":
        cast_tensor = self.tensors.to(device)
        return ImageList(cast_tensor, self.image_sizes)

In [7]:
def _default_anchorgen():
    anchor_sizes = ((32,), (64,), (128,), (256,), (512,))
    aspect_ratios = ((0.5, 1.0, 2.0),) * len(anchor_sizes)
    return AnchorGenerator(anchor_sizes, aspect_ratios)
 
# Definisci la tua implementazione personalizzata della RPN
class CustomRegionProposalNetwork(RegionProposalNetwork):
    def __init__(
        self,
        anchor_generator: AnchorGenerator,
        head: nn.Module,
        # Faster-RCNN Training
        fg_iou_thresh: float,
        bg_iou_thresh: float,
        batch_size_per_image: int,
        positive_fraction: float,
        # Faster-RCNN Inference
        pre_nms_top_n: Dict[str, int],
        post_nms_top_n: Dict[str, int],
        nms_thresh: float,
        score_thresh: float = 0.0,
    ):
        # Implementa il tuo costruttore personalizzato
        super().__init__(
            anchor_generator,
            head,
            fg_iou_thresh,
            bg_iou_thresh,
            batch_size_per_image,
            positive_fraction,
            pre_nms_top_n,
            post_nms_top_n,
            nms_thresh,
            score_thresh,
        )
        
        # Inizializza la tua RPN personalizzata con i parametri necessari
        # ...
 
    def compute_loss(
    self, objectness: Tensor, pred_bbox_deltas: Tensor, labels: List[Tensor], regression_targets: List[Tensor]
    ) -> Tuple[Tensor, Tensor]:
        """
        Args:
            objectness (Tensor)
            pred_bbox_deltas (Tensor)
            labels (List[Tensor])
            regression_targets (List[Tensor])

        Returns:
            objectness_loss (Tensor)
            box_loss (Tensor)
        """

        sampled_pos_inds, sampled_neg_inds = self.fg_bg_sampler(labels)
        sampled_pos_inds = torch.where(torch.cat(sampled_pos_inds, dim=0))[0]
        sampled_neg_inds = torch.where(torch.cat(sampled_neg_inds, dim=0))[0]

        sampled_inds = torch.cat([sampled_pos_inds, sampled_neg_inds], dim=0)

        objectness = objectness.flatten()

        labels = torch.cat(labels, dim=0)
        regression_targets = torch.cat(regression_targets, dim=0)

        box_loss = F.mse_loss(
            pred_bbox_deltas[sampled_pos_inds],
            regression_targets[sampled_pos_inds],
            reduction="sum",
        ) / (sampled_inds.numel())

        normalization_factor = max(1, labels.numel())  # Utilizza almeno 1 per evitare la divisione per zero
    
        # Controlla NaN
        if torch.isnan(box_loss):
            # Assegna un valore predefinito o gestisci la situazione di conseguenza
            box_loss = torch.tensor(0.0)  # Ad esempio, assegnando 0.0 in caso di NaN

        box_loss = box_loss / normalization_factor

        objectness_loss = F.binary_cross_entropy_with_logits(objectness[sampled_inds], labels[sampled_inds])

        return objectness_loss, box_loss

In [8]:
class TwoMLPHead(nn.Module):
    """
    Standard heads for FPN-based models
 
    Args:
        in_channels (int): number of input channels
        representation_size (int): size of the intermediate representation
    """
 
    def __init__(self, in_channels, representation_size):
        super().__init__()
 
        self.fc6 = nn.Linear(in_channels, representation_size)
        self.fc7 = nn.Linear(representation_size, representation_size)
 
    def forward(self, x):
        x = x.flatten(start_dim=1)
 
        x = F.relu(self.fc6(x))
        x = F.relu(self.fc7(x))
 
        return x

In [9]:
class FastRCNNPredictor(nn.Module):
    """
    Standard classification + bounding box regression layers
    for Fast R-CNN.
 
    Args:
        in_channels (int): number of input channels
        num_classes (int): number of output classes (including background)
    """
 
    def __init__(self, in_channels, num_classes):
        super().__init__()
        self.cls_score = nn.Linear(in_channels, num_classes)
        self.bbox_pred = nn.Linear(in_channels, num_classes * 4)
 
    def forward(self, x):
        if x.dim() == 4:
            torch._assert(
                list(x.shape[2:]) == [1, 1],
                f"x has the wrong shape, expecting the last two dimensions to be [1,1] instead of {list(x.shape[2:])}",
            )
        x = x.flatten(start_dim=1)
        scores = self.cls_score(x)
        bbox_deltas = self.bbox_pred(x)
 
        return scores, bbox_deltas

In [10]:
def iou_loss(pred_boxes, target_boxes):
    # Calcola l'area dell'ancora
    anchor_area = (target_boxes[:, 2] - target_boxes[:, 0]) * (target_boxes[:, 3] - target_boxes[:, 1])
    # Calcola l'area della predizione
    pred_area = (pred_boxes[:, 2] - pred_boxes[:, 0]) * (pred_boxes[:, 3] - pred_boxes[:, 1])

    # Calcola l'area di intersezione
    inter_xmin = torch.max(pred_boxes[:, 0], target_boxes[:, 0])
    inter_ymin = torch.max(pred_boxes[:, 1], target_boxes[:, 1])
    inter_xmax = torch.min(pred_boxes[:, 2], target_boxes[:, 2])
    inter_ymax = torch.min(pred_boxes[:, 3], target_boxes[:, 3])

    inter_width = torch.clamp(inter_xmax - inter_xmin, min=0)
    inter_height = torch.clamp(inter_ymax - inter_ymin, min=0)
    intersection = inter_width * inter_height

    # Calcola l'area di unione
    union = anchor_area + pred_area - intersection

    # Calcola la IoU Loss evitando la divisione per zero
    iou = torch.where(union > 0, intersection / union, torch.tensor(0.0))

    # Calcola la perdita finale
    iou_loss = 1 - iou

    return iou_loss.mean()

In [11]:
def fastrcnn_loss(class_logits, box_regression, labels, regression_targets):
    # type: (Tensor, Tensor, List[Tensor], List[Tensor]) -> Tuple[Tensor, Tensor]
    """
    Computes the loss for Faster R-CNN.

    Args:
        class_logits (Tensor)
        box_regression (Tensor)
        labels (list[BoxList])
        regression_targets (Tensor)

    Returns:
        classification_loss (Tensor)
        box_loss (Tensor)
    """

    labels = torch.cat(labels, dim=0)
    regression_targets = torch.cat(regression_targets, dim=0)

    classification_loss = F.cross_entropy(class_logits, labels)

    # get indices that correspond to the regression targets for
    # the corresponding ground truth labels, to be used with
    # advanced indexing
    sampled_pos_inds_subset = torch.where(labels > 0)[0]
    labels_pos = labels[sampled_pos_inds_subset]
    N, num_classes = class_logits.shape
    box_regression = box_regression.reshape(N, box_regression.size(-1) // 4, 4)

    box_loss = iou_loss(
        box_regression[sampled_pos_inds_subset, labels_pos],
        regression_targets[sampled_pos_inds_subset]
    )
    normalization_factor = max(1, labels.numel())  # Utilizza almeno 1 per evitare la divisione per zero
    box_loss = box_loss / normalization_factor

    # Controlla NaN
    if torch.isnan(box_loss):
        box_loss = torch.tensor(0.0)

    # Restituisce le perdite
    return classification_loss, box_loss
'''    
    box_loss = box_loss / labels.numel()

    return classification_loss, box_loss

'''

class CustomRoIHeads(RoIHeads):
    def __init__(
        self,
        box_roi_pool,
        box_head,
        box_predictor,
        # Faster R-CNN training
        fg_iou_thresh,
        bg_iou_thresh,
        batch_size_per_image,
        positive_fraction,
        bbox_reg_weights,
        # Faster R-CNN inference
        score_thresh,
        nms_thresh,
        detections_per_img,
        # Mask
        mask_roi_pool=None,
        mask_head=None,
        mask_predictor=None,
        keypoint_roi_pool=None,
        keypoint_head=None,
        keypoint_predictor=None,
    ):
        super().__init__(
            box_roi_pool,
            box_head,
            box_predictor,
            # Faster R-CNN training
            fg_iou_thresh,
            bg_iou_thresh,
            batch_size_per_image,
            positive_fraction,
            bbox_reg_weights,
            # Faster R-CNN inference
            score_thresh,
            nms_thresh,
            detections_per_img,
        )
        
    def forward(
        self,
        features,  # type: Dict[str, Tensor]
        proposals,  # type: List[Tensor]
        image_shapes,  # type: List[Tuple[int, int]]
        targets=None,  # type: Optional[List[Dict[str, Tensor]]]
    ):
        # type: (...) -> Tuple[List[Dict[str, Tensor]], Dict[str, Tensor]]
        """
        Args:
            features (List[Tensor])
            proposals (List[Tensor[N, 4]])
            image_shapes (List[Tuple[H, W]])
            targets (List[Dict])
        """
        if targets is not None:
            for t in targets:
                # TODO: https://github.com/pytorch/pytorch/issues/26731
                floating_point_types = (torch.float, torch.double, torch.half)
                if not t["boxes"].dtype in floating_point_types:
                    raise TypeError(f"target boxes must of float type, instead got {t['boxes'].dtype}")
                if not t["labels"].dtype == torch.int64:
                    raise TypeError(f"target labels must of int64 type, instead got {t['labels'].dtype}")
                if self.has_keypoint():
                    if not t["keypoints"].dtype == torch.float32:
                        raise TypeError(f"target keypoints must of float type, instead got {t['keypoints'].dtype}")

        if self.training:
            proposals, matched_idxs, labels, regression_targets = self.select_training_samples(proposals, targets)
        else:
            labels = None
            regression_targets = None
            matched_idxs = None

        box_features = self.box_roi_pool(features, proposals, image_shapes)
        box_features = self.box_head(box_features)
        class_logits, box_regression = self.box_predictor(box_features)

        result: List[Dict[str, torch.Tensor]] = []
        losses = {}
        if self.training:
            if labels is None:
                raise ValueError("labels cannot be None")
            if regression_targets is None:
                raise ValueError("regression_targets cannot be None")
            loss_classifier, loss_box_reg = fastrcnn_loss(class_logits, box_regression, labels, regression_targets)
            losses = {"loss_classifier": loss_classifier, "loss_box_reg": loss_box_reg}
        else:
            boxes, scores, labels = self.postprocess_detections(class_logits, box_regression, proposals, image_shapes)
            num_images = len(boxes)
            for i in range(num_images):
                result.append(
                    {
                        "boxes": boxes[i],
                        "labels": labels[i],
                        "scores": scores[i],
                    }
                )

        if self.has_mask():
            mask_proposals = [p["boxes"] for p in result]
            if self.training:
                if matched_idxs is None:
                    raise ValueError("if in training, matched_idxs should not be None")

                # during training, only focus on positive boxes
                num_images = len(proposals)
                mask_proposals = []
                pos_matched_idxs = []
                for img_id in range(num_images):
                    pos = torch.where(labels[img_id] > 0)[0]
                    mask_proposals.append(proposals[img_id][pos])
                    pos_matched_idxs.append(matched_idxs[img_id][pos])
            else:
                pos_matched_idxs = None

            if self.mask_roi_pool is not None:
                mask_features = self.mask_roi_pool(features, mask_proposals, image_shapes)
                mask_features = self.mask_head(mask_features)
                mask_logits = self.mask_predictor(mask_features)
            else:
                raise Exception("Expected mask_roi_pool to be not None")

            loss_mask = {}
            if self.training:
                if targets is None or pos_matched_idxs is None or mask_logits is None:
                    raise ValueError("targets, pos_matched_idxs, mask_logits cannot be None when training")

                gt_masks = [t["masks"] for t in targets]
                gt_labels = [t["labels"] for t in targets]
                rcnn_loss_mask = maskrcnn_loss(mask_logits, mask_proposals, gt_masks, gt_labels, pos_matched_idxs)
                loss_mask = {"loss_mask": rcnn_loss_mask}
            else:
                labels = [r["labels"] for r in result]
                masks_probs = maskrcnn_inference(mask_logits, labels)
                for mask_prob, r in zip(masks_probs, result):
                    r["masks"] = mask_prob

            losses.update(loss_mask)

        # keep none checks in if conditional so torchscript will conditionally
        # compile each branch
        if (
            self.keypoint_roi_pool is not None
            and self.keypoint_head is not None
            and self.keypoint_predictor is not None
        ):
            keypoint_proposals = [p["boxes"] for p in result]
            if self.training:
                # during training, only focus on positive boxes
                num_images = len(proposals)
                keypoint_proposals = []
                pos_matched_idxs = []
                if matched_idxs is None:
                    raise ValueError("if in trainning, matched_idxs should not be None")

                for img_id in range(num_images):
                    pos = torch.where(labels[img_id] > 0)[0]
                    keypoint_proposals.append(proposals[img_id][pos])
                    pos_matched_idxs.append(matched_idxs[img_id][pos])
            else:
                pos_matched_idxs = None

            keypoint_features = self.keypoint_roi_pool(features, keypoint_proposals, image_shapes)
            keypoint_features = self.keypoint_head(keypoint_features)
            keypoint_logits = self.keypoint_predictor(keypoint_features)

            loss_keypoint = {}
            if self.training:
                if targets is None or pos_matched_idxs is None:
                    raise ValueError("both targets and pos_matched_idxs should not be None when in training mode")

                gt_keypoints = [t["keypoints"] for t in targets]
                rcnn_loss_keypoint = keypointrcnn_loss(
                    keypoint_logits, keypoint_proposals, gt_keypoints, pos_matched_idxs
                )
                loss_keypoint = {"loss_keypoint": rcnn_loss_keypoint}
            else:
                if keypoint_logits is None or keypoint_proposals is None:
                    raise ValueError(
                        "both keypoint_logits and keypoint_proposals should not be None when not in training mode"
                    )

                keypoints_probs, kp_scores = keypointrcnn_inference(keypoint_logits, keypoint_proposals)
                for keypoint_prob, kps, r in zip(keypoints_probs, kp_scores, result):
                    r["keypoints"] = keypoint_prob
                    r["keypoints_scores"] = kps
            losses.update(loss_keypoint)

        return result, losses

In [12]:
def get_model(model_name='resnet50', num_classes=60):
    """
    Restituisce un modello Faster R-CNN con una determinata architettura di backbone.

    Precondizioni:
        - `model_name` è una stringa che specifica l'architettura del backbone. 
          Default è 'resnet50'.
        - `num_classes` è il numero di classi dell'insieme di dati. Default è 60.

    Postcondizioni:
        - Restituisce un modello Faster R-CNN con il backbone specificato e il numero di classi.
    """
    backbone = resnet_fpn_backbone(model_name, pretrained=True)
    model = CustomFasterRCNN(backbone, num_classes)
    return model

In [13]:
def collate_fn(batch):
    """
    Funzione di aggregazione per un batch di campioni.

    Precondizioni:
        - `batch` è una lista di campioni, ciascuno nel formato (immagine, target).
          Dove 'immagine' è un tensore rappresentante l'immagine e 'target' è un dizionario
          contenente le etichette associate all'immagine.

    Postcondizioni:
        - Restituisce una tupla di due elementi:
          1. Un tensore contenente tutte le immagini del batch.
          2. Una lista di dizionari rappresentanti i target corrispondenti alle immagini.
    """
    return tuple(zip(*batch))

In [14]:
class Averager:
    def __init__(self):
        """
        Inizializza un oggetto Averager.

        Postcondizioni:
            - `current_total` è l'accumulo corrente dei valori.
            - `iterations` è il numero corrente di iterazioni.
        """
        self.current_total = 0.0
        self.iterations = 0.0

    def send(self, value):
        """
        Aggiunge un valore all'accumulo e incrementa il numero di iterazioni.

        Precondizioni:
            - `value` è il valore da aggiungere all'accumulo.

        Postcondizioni:
            - Aggiunge `value` all'accumulo.
            - Incrementa il numero di iterazioni.
        """
        self.current_total += value
        self.iterations += 1

    @property
    def value(self):
        """
        Calcola e restituisce la media dei valori finora.

        Postcondizioni:
            - Restituisce la media dei valori finora.
              Se `iterations` è 0, restituisce 0.
        """
        print(f'value - self.iterations {self.iterations}, self.current_total {self.current_total}')
        if self.iterations == 0:
            return 0
        else:
            return 1.0 * self.current_total / self.iterations

    def reset(self):
        """
        Reimposta l'accumulo e il numero di iterazioni a zero.

        Postcondizioni:
            - `current_total` è reimpostato a 0.
            - `iterations` è reimpostato a 0.
        """
        self.current_total = 0.0
        self.iterations = 0.0

# DEFINIZIONE DEL MODELLO

In [15]:
# Definisci la tua implementazione personalizzata della FasterRCNN
class CustomFasterRCNN(FasterRCNN):
    def __init__(
        self,
        backbone,
        num_classes=None,
        # transform parameters
        min_size=800,
        max_size=1333,
        image_mean=None,
        image_std=None,
        # RPN parameters
        rpn_anchor_generator=None,
        rpn_head=None,
        rpn_pre_nms_top_n_train=2000,
        rpn_pre_nms_top_n_test=1000,
        rpn_post_nms_top_n_train=2000,
        rpn_post_nms_top_n_test=1000,
        rpn_nms_thresh=0.7,
        rpn_fg_iou_thresh=0.7,
        rpn_bg_iou_thresh=0.3,
        rpn_batch_size_per_image=256,
        rpn_positive_fraction=0.5,
        rpn_score_thresh=0.0,
        # Box parameters
        box_roi_pool=None,
        box_head=None,
        box_predictor=None,
        box_score_thresh=0.05,
        box_nms_thresh=0.5,
        box_detections_per_img=100,
        box_fg_iou_thresh=0.5,
        box_bg_iou_thresh=0.5,
        box_batch_size_per_image=512,
        box_positive_fraction=0.25,
        bbox_reg_weights=None,
        **kwargs,
    ):
        # Implementa il tuo costruttore personalizzato
        super().__init__(
            backbone, num_classes            
        )
        rpn_pre_nms_top_n = dict(training=rpn_pre_nms_top_n_train, testing=rpn_pre_nms_top_n_test)
        rpn_post_nms_top_n = dict(training=rpn_post_nms_top_n_train, testing=rpn_post_nms_top_n_test)
        
        if not hasattr(backbone, "out_channels"):
            raise ValueError(
                "backbone should contain an attribute out_channels "
                "specifying the number of output channels (assumed to be the "
                "same for all the levels)"
            )
 
        if not isinstance(rpn_anchor_generator, (AnchorGenerator, type(None))):
            raise TypeError(
                f"rpn_anchor_generator should be of type AnchorGenerator or None instead of {type(rpn_anchor_generator)}"
            )
 
        if num_classes is not None:
            if box_predictor is not None:
                raise ValueError("num_classes should be None when box_predictor is specified")
        else:
            if box_predictor is None:
                raise ValueError("num_classes should not be None when box_predictor is not specified")
 
        out_channels = backbone.out_channels
 
        if rpn_anchor_generator is None:
            rpn_anchor_generator = _default_anchorgen()
        if rpn_head is None:
            rpn_head = RPNHead(out_channels, rpn_anchor_generator.num_anchors_per_location()[0])
        
        # Sostituisci la RPN predefinita con la tua implementazione personalizzata
        self.rpn = CustomRegionProposalNetwork(
            rpn_anchor_generator,
            rpn_head,
            rpn_fg_iou_thresh,
            rpn_bg_iou_thresh,
            rpn_batch_size_per_image,
            rpn_positive_fraction,
            rpn_pre_nms_top_n,
            rpn_post_nms_top_n,
            rpn_nms_thresh,
            score_thresh=rpn_score_thresh,
        )
        
        '''
        if box_roi_pool is None:
            box_roi_pool = MultiScaleRoIAlign(featmap_names=["0", "1", "2", "3"], output_size=7, sampling_ratio=2)

        if box_head is None:
            resolution = box_roi_pool.output_size[0]
            representation_size = 1024
            box_head = TwoMLPHead(out_channels * resolution**2, representation_size)

        if box_predictor is None:
            representation_size = 1024
            box_predictor = FastRCNNPredictor(representation_size, num_classes)

        self.roi_heads = CustomRoIHeads(
            # Box
            box_roi_pool,
            box_head,
            box_predictor,
            box_fg_iou_thresh,
            box_bg_iou_thresh,
            box_batch_size_per_image,
            box_positive_fraction,
            bbox_reg_weights,
            box_score_thresh,
            box_nms_thresh,
            box_detections_per_img,
        )
        '''

In [16]:
# Crea il modello Faster R-CNN
model_name = 'resnet50'
num_classes = 60
model = get_model(model_name, num_classes)

print(model)

Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 236MB/s] 


CustomFasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=1e-05)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=1e-05)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=1e-05)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256, eps=1e-05)
         

In [17]:
# Configurazione della trasformazione per l'oggetto model
model.transform = GeneralizedRCNNTransform(
    min_size=(640,),             # Dimensione minima dell'immagine durante la trasformazione
    max_size=640,                 # Dimensione massima dell'immagine durante la trasformazione
    image_mean=[0.485, 0.456, 0.406],   # Media dell'immagine per la normalizzazione
    image_std=[0.229, 0.224, 0.225]     # Deviazione standard dell'immagine per la normalizzazione
)

In [18]:
# Sposta il modello sulla GPU, se disponibile
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = model.to(device)

In [19]:
train_losses = Averager()     # Accumulatore di loss per train
val_losses = Averager()       # Accumulatore di loss per la validazione

best_loss = float('inf')       # Migliore loss inizializzata a infinito
os.makedirs(weight_path, exist_ok=True)  # Creazione della directory per i pesi del modello, se non esiste già

In [23]:
# IPERPARAMETRI
num_epochs = 10
batch_size = 16
num_workers = 4
momentum = 0.9
learning_rate = 0.005

# Pesi per la somma pesata delle loss 
rpn_loss_regr_weight = 1     # box regression della region proposal network
rpn_loss_cls_fixed_num = 1   # object classification della rpn
class_loss_cls_weight = 1    # object classfication della parte finale della rete
class_loss_regr_weight = 1   # box regression per la parte finale della rete

In [24]:
# Definisci i percorsi ai tuoi dataset
train_dataset = BuildingsDataset(train_path, coco_path)
val_dataset = BuildingsDataset(val_path, coco_path)
test_dataset = BuildingsDataset(test_path, coco_path)
 
# Crea i data loader
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers, collate_fn=collate_fn)

# Definizione dell'ottimizzatore
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate, momentum=momentum) 
#optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, betas=(0.9, 0.999), weight_decay=1e-5)
#optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# TRAINING

In [26]:
lossModel_Train = []
lossModel_Val = []
        
# Addestramento
for epoch in range(num_epochs):
    model.train()
    train_losses.reset()
    val_losses.reset()
    
    # Iterazione su batches di addestramento
    for batch_index, (images, targets) in enumerate(tqdm(train_loader)):
        images, targets = [image.to(device) for image in images], [{k: v.to(device) for k, v in t.items()} for t in targets]

        model.zero_grad()
        optimizer.zero_grad()
        loss_dict = model(images, targets)
        
        for key, value in loss_dict.items():
            if torch.isnan(value):
                loss_dict[key] = torch.tensor(0.0)
        
        # Calcolo della loss totale come combinazione pesata di diverse componenti
        loss = rpn_loss_regr_weight * loss_dict['loss_rpn_box_reg'] +\
               rpn_loss_cls_fixed_num * loss_dict['loss_objectness'] +\
               class_loss_cls_weight * loss_dict['loss_classifier'] +\
               class_loss_regr_weight * loss_dict['loss_box_reg']
            
        # Tracciamento della loss
        train_losses.send(loss.item())

        # Backpropagation e aggiornamento dei pesi
        loss.backward()
        optimizer.step()
        
        # Stampa della loss ogni 50 batches
        '''if batch_index % 50 == 0:
            print(f"TRAINING: Epoch: {epoch} Batch Index: {batch_index} Loss: {train_losses.value}")
            modelLoss_train = train_losses.value'''
            
    modelLoss_train = train_losses.value        
    print(f"TRAINING: Epoch: {epoch+1} Loss: {modelLoss_train}")   
    
    # Validazione
    with torch.no_grad():
        for batch_index, (images, targets) in enumerate(tqdm(val_loader)):
            images, targets = [image.to(device) for image in images], [{k: v.to(device) for k, v in t.items()} for t in targets]
            outputs = model(images, targets)
  
            for key, value in outputs.items():
                if torch.isnan(value):
                    outputs[key] = torch.tensor(0.0)

            # Calcolo della loss totale durante la validazione
            loss = rpn_loss_regr_weight * outputs['loss_rpn_box_reg'] +\
                    rpn_loss_cls_fixed_num * outputs['loss_objectness'] +\
                    class_loss_cls_weight * outputs['loss_classifier'] +\
                    class_loss_regr_weight * outputs['loss_box_reg']

            # Tracciamento della loss
            val_losses.send(loss.item())
            '''
            # Stampa della loss di validazione ogni 50 batches
            if batch_index % 50 == 0:
                print(f"VALIDATION: Epoch: {epoch} Batch Index: {batch_index} Loss: {val_losses.value}")
                modelLoss_val = val_losses.value 
            '''    
        modelLoss_val = val_losses.value        
        print(f"VALIDATION: Epoch: {epoch+1} Loss: {modelLoss_val}")
        
        # Salvataggio dei migliori pesi se la loss di validazione è migliorata    
        if (modelLoss_val < best_loss) :
            print('     .... Saving best weights ....')
            best_loss = modelLoss_val
            #salvataggio dei migliori pesi sul validation
            torch.save(model.state_dict(), weight_path + 'best_model_weights.pth')
        
        # Salvataggio delle loss per i plot
        lossModel_Train.append(modelLoss_train)
        lossModel_Val.append(modelLoss_val)


  0%|          | 0/1109 [00:03<?, ?it/s]


RuntimeError: element 0 of tensors does not require grad and does not have a grad_fn

In [None]:
# Effettuiamo il plot dele curve di loss
plt.figure()
plt.title("Model: Training Vs Validation Losses")
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.plot(list(range(1,len(lossModel_Train)+1)), lossModel_Train, color='r', label="Training Loss")
plt.plot(list(range(1, len(lossModel_Val)+1)), lossModel_Val, color='g', label="Validation Loss")
plt.legend()

# Test di Prova

In [None]:
def visualize_results_with_boxes(image_path, results):
    bounding_boxes = results['boxes']

    # Carica l'immagine usando PIL
    image = Image.open(image_path)

    # Inizializza ImageDraw con l'immagine
    draw = ImageDraw.Draw(image)

    # Disegna i bounding box sull'immagine
    for bbox in bounding_boxes:
        bbox_coords = bbox  # Converti la stringa bbox in una lista di coordinate
        bbox_coords = [bbox_coords[0],bbox_coords[1],bbox_coords[2],bbox_coords[3]]
        draw.rectangle(bbox_coords, outline='red', width=2)

    # Visualizza l'immagine con i bounding box
    plt.imshow(image)
    plt.show()

In [None]:
test_images = sorted(os.listdir(test_path))
print(test_images[0])

In [None]:
def visualize_image_with_boxes(image_path, coco_annotations_path):
    # Carica il file COCO annotations JSON
    with open(coco_annotations_path, 'r') as coco_file:
        coco_data = json.load(coco_file)

    # Estrai l'id dell'immagine dal nome del file
    image_name = image_path.split("/")[-1]
    image_id = next((img['id'] for img in coco_data['images'] if img['file_name'] == image_name), None)

    # Se l'id dell'immagine è trovato, estrai i bounding box corrispondenti
    if image_id is not None:
        bounding_boxes = [bbox for bbox in coco_data['annotations'] if bbox['image_id'] == image_id]
        
        # Carica l'immagine usando PIL
        image = Image.open(image_path)

        # Inizializza ImageDraw con l'immagine
        draw = ImageDraw.Draw(image)

        # Disegna i bounding box sull'immagine
        for bbox in bounding_boxes:
            bbox_coords = eval(bbox['bbox'])  # Converti la stringa bbox in una lista di coordinate
            bbox_coords = [bbox_coords[0],bbox_coords[1],bbox_coords[0]+bbox_coords[2],bbox_coords[1]+bbox_coords[3]]
            draw.rectangle(bbox_coords, outline='red', width=2)

        # Visualizza l'immagine con i bounding box
        plt.imshow(image)
        plt.show()
        return image
    else:
        print(f"Image {image_name} not found in COCO annotations.")

In [None]:
# Carichiamo i pesi relativi al modello migliore
#weights = torch.load('/kaggle/input/dataset-ipcv-teama/best_model_weights_rpn_10epoch_16batch_0.005lr.pth')
weights = torch.load(weight_path + 'best_model_weights.pth')
model.load_state_dict(weights)

In [None]:
indice = 50
# Test su un'immagine
model.eval()
test_image = test_dataset[indice][0]
#print(test_image)

test_image = test_image.to(device)
test_image = test_image.unsqueeze(0)
with torch.no_grad():
    output = model(test_image)
    
soglia = 0.1
output_tagliato = []
 
for detection in output:
    boxes = detection['boxes']
    labels = detection['labels']
    scores = detection['scores']
 
    # Trova gli indici dei box che superano la soglia
    indici_superati_soglia = scores >= soglia
 
    # Filtra i box, le label e gli score in base agli indici superati la soglia
    boxes_tagliati = boxes[indici_superati_soglia]
    labels_tagliati = labels[indici_superati_soglia]
    scores_tagliati = scores[indici_superati_soglia]
 
    # Creare un nuovo dizionario con i risultati tagliati
    detection_tagliata = {
        'boxes': boxes_tagliati,
        'labels': labels_tagliati,
        'scores': scores_tagliati
    }
 
    # Aggiungere il risultato alla lista finale solo se ci sono box sopra la soglia
    if len(boxes_tagliati) > 0:
        output_tagliato.append(detection_tagliata)

original_image_path = test_path + '/' + test_images[indice]
img = visualize_image_with_boxes(original_image_path, coco_path)
visualize_results_with_boxes(original_image_path, output_tagliato[0])   