In [None]:
import os

# Absolute path of the filesystem root (os.sep resolved by abspath).
# NOTE(review): HOME is not referenced by any other cell visible here — confirm it is still needed.
HOME = os.path.abspath(os.sep)
# Dataset folders, given relative to the notebook's working directory.
# Later cells read both the images and `_annotations.coco.json` from each folder.
train_dataset_folder = "../Dataset/fasterrcnn/train"
valid_dataset_folder = "../Dataset/fasterrcnn/valid"

In [None]:
from PIL import Image
from torch.utils.data import Dataset
import numpy as np
import torch
import torchvision
import json

class GunDataset(Dataset):
    """Object-detection dataset backed by a COCO-format annotation file.

    Every item is a tuple ``(image, boxes, labels)``:

    * ``image``  - the RGB image, after ``transform`` when one is given;
    * ``boxes``  - float32 tensor of shape ``(N, 4)`` holding
      ``[x_min, y_min, x_max, y_max]`` corners (``(0, 4)`` when the image has
      no annotations);
    * ``labels`` - int64 tensor of shape ``(N,)`` holding each annotation's
      ``category_id``.
    """

    def __init__(self, image_dir, annotation_dir, transform=None, train=True):
        """Load the annotation file and index annotations by image id.

        Args:
            image_dir: Directory that contains the image files.
            annotation_dir: Path of the COCO JSON annotation *file* (the
                parameter name is kept for backward compatibility).
            transform: Optional callable applied to the PIL image only; the
                boxes are not transformed.
            train: Stored on the instance; not otherwise used by this class.

        Raises:
            KeyError: If an annotation references an ``image_id`` that is not
                listed under ``images``.
        """
        with open(annotation_dir, "r") as f:
            coco_data = json.load(f)
        self.image_dir = image_dir
        self.transform = transform
        self.train = train
        self.images = coco_data['images']
        self.annotations = coco_data['annotations']
        # NOTE(review): torchvision detection models reserve label 0 for the
        # background class; __getitem__ returns the raw COCO `category_id`, so
        # confirm that the "gun" category id in the annotation file is >= 1.
        self.classes = {"gun": 0}

        # Build the image id -> annotations mapping.
        self.img_to_anns = {img['id']: [] for img in self.images}
        for ann in self.annotations:
            self.img_to_anns[ann['image_id']].append(ann)

        # Count only the images that actually own at least one annotation
        # (previously the total image count was printed twice).
        annotated_images = sum(
            1 for img in self.images if self.img_to_anns[img['id']]
        )
        print(f"Immagini totali: {len(self.images)}")
        print(f"Immagini con annotazioni: {annotated_images}")

    def __len__(self):
        """Return the number of images listed in the annotation file."""
        return len(self.images)

    @staticmethod
    def _annotations_to_tensors(annotations):
        """Convert COCO annotations into ``(boxes, labels)`` tensors.

        COCO stores boxes as ``[x, y, width, height]``; they are converted to
        ``[x_min, y_min, x_max, y_max]`` corners (Pascal VOC layout).
        """
        boxes = []
        labels = []
        for ann in annotations:
            x, y, w, h = ann['bbox']
            boxes.append([x, y, x + w, y + h])
            labels.append(ann['category_id'])

        # reshape(-1, 4) keeps the (N, 4) layout even when N == 0; a bare
        # as_tensor([]) would give shape (0,), which detection models reject.
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        return boxes, labels

    def __getitem__(self, idx):
        """Return ``(image, boxes, labels)`` for the image at position ``idx``."""
        image_info = self.images[idx]
        image_path = os.path.join(self.image_dir, str(image_info['file_name']))
        image = Image.open(image_path).convert("RGB")

        boxes, labels = self._annotations_to_tensors(
            self.img_to_anns[image_info['id']]
        )

        if self.transform is not None:
            image = self.transform(image)

        return image, boxes, labels
        

In [None]:
import torchvision.transforms as T
from torch.utils.data import DataLoader

# Only convert the PIL image to a float tensor in the 0-1 range.
# torchvision detection models (Faster R-CNN included) normalise their inputs
# internally with the ImageNet mean/std, so normalising here as well would
# apply the normalisation twice and feed the backbone out-of-distribution data.
transform = T.Compose([
    T.ToTensor(),
])

# Images and the COCO annotation file live in the same folder.
train_dataset = GunDataset(train_dataset_folder, train_dataset_folder+"/_annotations.coco.json", transform=transform)
val_dataset = GunDataset(valid_dataset_folder, valid_dataset_folder+"/_annotations.coco.json", transform=transform)

# On Windows (os.name == 'nt') load data in the main process instead of
# spawning DataLoader workers.
num_workers = 2
if os.name == 'nt':
    num_workers = 0
    
def collate_fn(batch):
    """Transpose a batch of samples into one tuple per sample field.

    A batch ``[(a1, b1, c1), (a2, b2, c2)]`` becomes
    ``((a1, a2), (b1, b2), (c1, c2))``. Nothing is stacked, so items of
    different sizes can share a batch.
    """
    per_field = zip(*batch)
    return tuple(per_field)

# Both loaders share every setting except shuffling: the training split is
# reshuffled each epoch, the validation split keeps its order.
train_loader, val_loader = (
    DataLoader(
        split,
        batch_size=4,
        shuffle=reshuffle,
        num_workers=num_workers,
        collate_fn=collate_fn,
    )
    for split, reshuffle in ((train_dataset, True), (val_dataset, False))
)

In [None]:
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.faster_rcnn import FasterRCNN_ResNet50_FPN_Weights
import torch.nn as nn

# Start from the default pretrained Faster R-CNN (ResNet-50 FPN) weights.
model = fasterrcnn_resnet50_fpn(weights=FasterRCNN_ResNet50_FPN_Weights.DEFAULT)

# Replace the box predictor head with a new one that outputs 2 classes,
# reusing the input width of the existing classification layer.
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes=2)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# SGD with momentum; the learning rate is multiplied by 0.1 every 7 scheduler steps.
optimizer = torch.optim.SGD(
    model.parameters(),
    lr=0.0025,
    momentum=0.9,
    weight_decay=0.0005,
)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

In [None]:
from torchmetrics.detection import MeanAveragePrecision
import time

def train_one_epoch(model, optimizer, data_loader, device, epoch):
    """Train ``model`` for one pass over ``data_loader`` and report metrics.

    After each optimisation step the same batch is run once more in eval mode
    so detection metrics can be accumulated on the training data.

    Args:
        model: Detection model that returns a dict of losses when called with
            ``(images, targets)`` and a list of predictions when called with
            ``images`` only.
        optimizer: Optimizer updating the model parameters.
        data_loader: Iterable yielding ``(images, boxes, labels)`` tuples of
            per-image tensors.
        device: Device the tensors are moved to.
        epoch: Zero-based epoch index (used for logging only).

    Returns:
        dict of Python floats: mean total loss and mean per-component losses
        over the batches, ``mAP``, ``mAP_50``, ``mAP_75``, ``recall``
        (``mar_100``) and ``f1`` (harmonic mean of ``mAP`` and ``recall``).
    """
    model.train()
    total_loss = 0.0
    # Accumulate plain floats: summing the loss tensors themselves would keep
    # every batch's autograd graph alive until the end of the epoch.
    component_names = ('loss_classifier', 'loss_box_reg',
                       'loss_objectness', 'loss_rpn_box_reg')
    component_sums = dict.fromkeys(component_names, 0.0)

    metric = MeanAveragePrecision(box_format='xyxy',
                                 iou_thresholds=[0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95])

    print(f"Epoch: {epoch+1}")
    start_time = time.time()

    for i, (images, targets_boxes, targets_labels) in enumerate(data_loader):
        images = [image.to(device) for image in images]
        targets = [
            {
                # reshape keeps the (N, 4) layout for images with no boxes.
                'boxes': boxes.reshape(-1, 4).to(device),
                'labels': labels.to(device),
            }
            for boxes, labels in zip(targets_boxes, targets_labels)
        ]

        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())

        for name in component_names:
            component_sums[name] += float(loss_dict.get(name, 0.0))

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        total_loss += losses.item()

        # Second forward pass in eval mode to obtain predictions for the metric.
        model.eval()
        with torch.no_grad():
            predictions = model(images)
        model.train()

        metric.update(predictions, targets)

        if i % 50 == 0:
            print(f"Batch [{i}/{len(data_loader)}], Loss: {losses.item():.4f}")

    metric_results = metric.compute()
    num_batches = len(data_loader)
    mean_ap = metric_results['map'].item()
    mean_ar = metric_results['mar_100'].item()

    metrics = {
        'loss': total_loss / num_batches,
        'loss_classifier': component_sums['loss_classifier'] / num_batches,
        'loss_box_reg': component_sums['loss_box_reg'] / num_batches,
        'loss_objectness': component_sums['loss_objectness'] / num_batches,
        'loss_rpn_box_reg': component_sums['loss_rpn_box_reg'] / num_batches,
        'mAP': mean_ap,
        'mAP_50': metric_results['map_50'].item(),
        'mAP_75': metric_results['map_75'].item(),
        'recall': mean_ar,
        # The small epsilon avoids a division by zero when both terms are 0.
        'f1': 2 * (mean_ap * mean_ar) / (mean_ap + mean_ar + 1e-6)
    }

    print(f"\nEpoch {epoch+1} training results:")
    for k, v in metrics.items():
        print(f"{k}: {v:.4f}")

    print(f"Time taken: {time.time() - start_time:.2f} seconds")

    return metrics

def validate(model, data_loader, device):
    """Evaluate ``model`` on ``data_loader`` and report losses and metrics.

    For every batch the model is briefly switched to train mode so that it
    returns its loss dict, then switched back to eval mode to produce the
    predictions fed to the metric. All of this runs under ``torch.no_grad()``.

    Args:
        model: Detection model that returns a dict of losses in train mode and
            a list of predictions in eval mode.
        data_loader: Iterable yielding ``(images, boxes, labels)`` tuples of
            per-image tensors.
        device: Device the tensors are moved to.

    Returns:
        dict of Python floats with the same entries as the training metrics,
        each key prefixed with ``val_``.
    """
    model.eval()
    metric = MeanAveragePrecision(box_format='xyxy',
                                 iou_thresholds=[0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95])

    total_loss = 0.0
    # Keep the per-component sums as floats so the returned metrics never hold
    # (possibly CUDA) tensors and match the type of 'val_loss'.
    component_names = ('loss_classifier', 'loss_box_reg',
                       'loss_objectness', 'loss_rpn_box_reg')
    component_sums = dict.fromkeys(component_names, 0.0)

    with torch.no_grad():
        for images, targets_boxes, targets_labels in data_loader:
            images = [image.to(device) for image in images]
            targets = [
                {
                    # reshape keeps the (N, 4) layout for images with no boxes.
                    'boxes': boxes.reshape(-1, 4).to(device),
                    'labels': labels.to(device),
                }
                for boxes, labels in zip(targets_boxes, targets_labels)
            ]

            # The loss dict is only returned in train mode; always restore
            # eval mode, even if the forward pass raises.
            model.train()
            try:
                loss_dict = model(images, targets)
            finally:
                model.eval()
            losses = sum(loss for loss in loss_dict.values())

            for name in component_names:
                component_sums[name] += float(loss_dict.get(name, 0.0))
            total_loss += losses.item()

            predictions = model(images)
            metric.update(predictions, targets)

    metric_results = metric.compute()
    num_batches = len(data_loader)
    mean_ap = metric_results['map'].item()
    mean_ar = metric_results['mar_100'].item()

    metrics = {
        'val_loss': total_loss / num_batches,
        'val_loss_classifier': component_sums['loss_classifier'] / num_batches,
        'val_loss_box_reg': component_sums['loss_box_reg'] / num_batches,
        'val_loss_objectness': component_sums['loss_objectness'] / num_batches,
        'val_loss_rpn_box_reg': component_sums['loss_rpn_box_reg'] / num_batches,
        'val_mAP': mean_ap,
        'val_mAP_50': metric_results['map_50'].item(),
        'val_mAP_75': metric_results['map_75'].item(),
        'val_recall': mean_ar,
        # The small epsilon avoids a division by zero when both terms are 0.
        'val_f1': 2 * (mean_ap * mean_ar) / (mean_ap + mean_ar + 1e-6)
    }

    print("\nValidation results:")
    for k, v in metrics.items():
        print(f"{k}: {v:.4f}")

    return metrics

def train_model(model, optimizer, scheduler, train_loader, val_loader, device, num_epochs=10):
    """Run the train/validate loop with checkpointing and early stopping.

    After each epoch the scheduler is stepped and the model is validated. The
    weights are saved whenever the validation mAP improves on the best value
    seen so far. Training stops early after 3 consecutive epochs without
    improvement.

    Args:
        model: Model to train.
        optimizer: Optimizer passed to ``train_one_epoch``.
        scheduler: Learning-rate scheduler, stepped once per epoch.
        train_loader: Loader for the training split.
        val_loader: Loader for the validation split.
        device: Device used for training and validation.
        num_epochs: Maximum number of epochs to run.

    Returns:
        list[dict]: One merged train+validation metrics dict per epoch that
        was run, including the epoch on which early stopping triggered.
    """
    import os  # local import keeps this function self-contained

    best_map = 0
    history = []

    # Early stopping
    patience = 3
    epochs_no_improve = 0
    best_model_path = '../Models/best_model_fasterrcnn.pth'
    # torch.save does not create missing directories.
    os.makedirs(os.path.dirname(best_model_path), exist_ok=True)

    for epoch in range(num_epochs):
        train_metrics = train_one_epoch(model, optimizer, train_loader, device, epoch)
        scheduler.step()
        val_metrics = validate(model, val_loader, device)

        # Record the epoch before any early-stopping decision so the final
        # epoch's metrics are not dropped from the history.
        history.append({**train_metrics, **val_metrics})

        if val_metrics['val_mAP'] > best_map:
            best_map = val_metrics['val_mAP']
            torch.save(model.state_dict(), best_model_path)
            epochs_no_improve = 0
        else:
            epochs_no_improve += 1
            print(f"Nessun miglioramento per {epochs_no_improve} epoche. Miglior mAP: {best_map:.4f}")

        # Early stopping
        if epochs_no_improve >= patience:
            print(f"Early stopping attivato all'epoca {epoch+1}")
            break

        print(f"Epoca {epoch+1}/{num_epochs} completata")
        print("-" * 50)

    return history

In [9]:
import pandas as pd
import matplotlib.pyplot as plt

def plot_metrics(history):
    """Plot training vs. validation curves for the recorded epochs.

    ``history`` is a list of per-epoch metric dicts. Six panels are drawn on a
    3x2 grid (mAP, mAP@0.5, mAP@0.75, recall, F1, loss). The figure is saved
    to a timestamped PNG in the working directory and then shown. An empty
    history only prints a message and returns.
    """
    frame = pd.DataFrame(history)
    if frame.empty:
        print("Errore: il dizionario 'history' è vuoto. Nessun grafico verrà generato.")
        return

    # (training column, validation column, panel title)
    panels = [
        ('mAP', 'val_mAP', 'Mean Average Precision'),
        ('mAP_50', 'val_mAP_50', 'mAP@0.5'),
        ('mAP_75', 'val_mAP_75', 'mAP@0.75'),
        ('recall', 'val_recall', 'Mean Average Recall'),
        ('f1', 'val_f1', 'F1 Score'),
        ('loss', 'val_loss', 'Loss'),
    ]

    figure, grid = plt.subplots(3, 2, figsize=(15, 20))

    for panel, (train_col, val_col, heading) in zip(grid.flatten(), panels):
        curves = (
            (train_col, 'b-', 'Training'),
            (val_col, 'r--', 'Validation'),
        )
        for column, line_style, legend_label in curves:
            # A missing column falls back to an empty series and is skipped.
            values = frame.get(column, pd.Series(dtype=float))
            if values.empty:
                continue
            panel.plot(frame.index, values, line_style, label=legend_label, alpha=0.7)

        panel.set_title(heading)
        panel.set_xlabel('Epoca')
        panel.set_ylabel('Valore')
        panel.legend()
        panel.grid(True)

    plt.tight_layout()
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    plt.savefig(f'training_metrics_{timestamp}.png')
    plt.show()

In [None]:
# Upper bound on the number of epochs handed to train_model.
num_epochs = 20
history = train_model(
    model,
    optimizer,
    lr_scheduler,
    train_loader,
    val_loader,
    device,
    num_epochs=num_epochs,
)
plot_metrics(history)