# PhobiaShield - FPN Custom Training

**Feature Pyramid Network** implemented from scratch for PhobiaShield.

**Team:** Gabriele (Architect) | Member A (Data) | Member C (Demo)

**Course:** Fundamentals of Data Science - Sapienza University

**Date:** December 2025

---

## Dataset: ULTIMATE_COMPLETE
- **Total:** 11,425 images
- **Classes:** 5 (Clown, Shark, Spider, Blood, Needle)
- **Split:** 70/15/15
- **Size variation:** 260× (1.36px to 354px)

## Model: PhobiaNetFPN
- **Architecture:** Multi-scale FPN (P3, P4, P5)
- **Parameters:** 5.4M
- **Loss:** Focal Loss + MSE + CrossEntropy

## Requirements
- GPU: Tesla T4 (16GB) or better
- Time: ~2-4 hours for 50 epochs
- Drive: Dataset must be in Google Drive

## 1. Setup & Mount Drive

In [None]:
# Mount Google Drive at /content/drive. Later cells read the dataset zip from
# this mount and write checkpoints back to it.
from google.colab import drive
drive.mount('/content/drive')

# Check GPU: show the NVIDIA driver/GPU status (IPython shell escape).
# Later cells move the model and every batch to 'cuda'.
!nvidia-smi

## 2. Clone Repository

In [None]:
import os
import sys

# Clone repo
!rm -rf /content/PhobiaShield
!git clone https://github.com/Gabriele-mp/PhobiaShield.git /content/PhobiaShield

# Add to path
os.chdir('/content/PhobiaShield')
sys.path.insert(0, '/content/PhobiaShield')

print("‚úÖ Repository cloned")
!ls -la

## 3. Install Dependencies

In [None]:
# Install the third-party packages imported by later cells (omegaconf,
# albumentations, tqdm); -q keeps pip's output quiet.
# NOTE: the message below is printed unconditionally; it does not check
# whether pip succeeded.
!pip install -q omegaconf albumentations tqdm
print("‚úÖ Dependencies installed")

## 4. Extract Dataset

In [None]:
import zipfile
from pathlib import Path

# Locations of the zipped dataset on Drive and of the local extraction target.
ZIP_PATH = '/content/drive/MyDrive/PhobiaShield_Models/PhobiaShield/DATASET_ULTIMATE_COMPLETE.zip'
EXTRACT_TO = '/content/dataset_ultimate'

if os.path.exists(ZIP_PATH):
    print("üì¶ Extracting dataset...")

    # Unpack the whole archive into the (possibly pre-existing) target folder.
    os.makedirs(EXTRACT_TO, exist_ok=True)
    with zipfile.ZipFile(ZIP_PATH, 'r') as archive:
        archive.extractall(EXTRACT_TO)

    # Count the .jpg files of each split in one pass.
    train_imgs, val_imgs, test_imgs = (
        sum(1 for _ in (Path(EXTRACT_TO) / split / 'images').glob('*.jpg'))
        for split in ('train', 'val', 'test')
    )

    print("‚úÖ Dataset extracted!")
    print(f"   Train: {train_imgs} images")
    print(f"   Val: {val_imgs} images")
    print(f"   Test: {test_imgs} images")
    print(f"   Total: {train_imgs + val_imgs + test_imgs} images")
else:
    # Nothing is raised here: the cell only reports the missing archive.
    print(f"‚ùå Dataset not found at: {ZIP_PATH}")
    print("Please upload DATASET_ULTIMATE_COMPLETE.zip to your Drive")

## 5. Load Model & Loss

In [None]:
import torch
from src.models.phobia_net_fpn import PhobiaNetFPN
from src.models.loss_fpn import FPNLoss
from omegaconf import OmegaConf

# Read the YAML model config and convert it to plain Python containers,
# resolving interpolations on the way.
config = OmegaConf.to_container(
    OmegaConf.load('cfg/model/tiny_yolo_5class.yaml'),
    resolve=True,
)

# Build the network, then move it to the GPU.
model = PhobiaNetFPN(config, use_attention=True)
model = model.to('cuda')

# Per-class weights handed to the loss, one entry per class.
CLASS_WEIGHTS = [4.76, 1.28, 3.70, 1.01, 1.39]  # Optimized

loss_settings = {
    'num_classes': 5,
    'num_boxes': 2,
    'lambda_coord': 5.0,
    'lambda_obj': 5.0,
    'lambda_noobj': 0.05,
    'use_focal': True,
    'focal_gamma': 2.0,
    'focal_alpha': 0.25,
    'class_weights': CLASS_WEIGHTS,
}
loss_fn = FPNLoss(**loss_settings)

# Report the parameter count and its size at 4 bytes per parameter.
params = sum(weight.numel() for weight in model.parameters())
size_mb = params * 4 / 1e6
print(f"‚úÖ Model loaded: {params:,} parameters ({size_mb:.2f} MB)")
print("‚úÖ Loss function: FPN Loss with Focal Loss")

## 6. Create Datasets

In [None]:
import torch
from torch.utils.data import DataLoader
from pathlib import Path
import cv2
import numpy as np
import albumentations as A

class FPNDataset(torch.utils.data.Dataset):
    """YOLO-format detection dataset that emits one target grid per FPN level.

    Images are the ``*.jpg`` files of ``img_dir`` (sorted by path). The labels
    of an image live in ``label_dir`` in a ``.txt`` file with the same stem,
    one object per line as ``class x_center y_center width height``, with
    coordinates in the normalised YOLO convention.

    Each sample is ``(image, targets)``:

    * ``image``: float tensor ``(3, img_size, img_size)``, RGB, scaled to
      ``[0, 1]``.
    * ``targets``: list with one ``(S, S, 20)`` tensor per entry ``S`` of
      ``grid_sizes``. For the cell holding an object's centre, channels 0-1
      are the centre offset inside the cell, 2-3 the box width/height as read
      from the label, 4 is set to 1.0 and ``5 + class_id`` is set to 1.0.
      All other channels stay zero.

    Args:
        img_dir: Directory containing the ``.jpg`` images.
        label_dir: Directory containing the YOLO ``.txt`` label files.
        img_size: Side length the image is resized to (square).
        augment: When true, apply random horizontal flip and
            brightness/contrast changes to image and boxes.
    """

    # Depth of every target tensor (last dimension).
    _TARGET_CHANNELS = 20

    def __init__(self, img_dir, label_dir, img_size=416, augment=False):
        self.img_dir = Path(img_dir)
        self.label_dir = Path(label_dir)
        self.img_size = img_size
        self.img_files = sorted(self.img_dir.glob('*.jpg'))
        self.grid_sizes = [52, 26, 13]

        if augment:
            self.transform = A.Compose([
                A.HorizontalFlip(p=0.5),
                A.RandomBrightnessContrast(p=0.3),
            ], bbox_params=A.BboxParams(
                format='yolo',
                label_fields=['class_labels'],
                min_visibility=0.3,
                clip=True
            ))
        else:
            self.transform = None

    def __len__(self):
        """Return the number of images found in ``img_dir``."""
        return len(self.img_files)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, targets)``.

        Raises:
            OSError: If the image file cannot be decoded.
        """
        import warnings

        img_path = self.img_files[idx]
        img = cv2.imread(str(img_path))
        if img is None:
            # cv2.imread signals failure by returning None instead of raising;
            # fail here with the offending path rather than inside cvtColor.
            raise OSError(f"Could not read image: {img_path}")
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

        bboxes, class_labels = self._read_labels(
            self.label_dir / f"{img_path.stem}.txt"
        )

        if self.transform and bboxes:
            try:
                transformed = self.transform(
                    image=img, bboxes=bboxes, class_labels=class_labels
                )
            except Exception as err:
                # Best effort: keep the un-augmented sample, but say so
                # instead of hiding the failure (and never swallow
                # KeyboardInterrupt/SystemExit).
                warnings.warn(
                    f"Augmentation failed for {img_path.name} ({err}); "
                    "using the un-augmented sample",
                    RuntimeWarning,
                )
            else:
                img = transformed['image']
                bboxes = transformed['bboxes']
                class_labels = transformed['class_labels']

        img = cv2.resize(img, (self.img_size, self.img_size))
        # HWC uint8 -> CHW float in [0, 1].
        img = torch.from_numpy(img.transpose(2, 0, 1)).float() / 255.0

        return img, self._build_targets(bboxes, class_labels)

    @staticmethod
    def _read_labels(label_path):
        """Parse a YOLO label file into ``(bboxes, class_labels)``.

        Lines with fewer than five whitespace-separated fields are skipped;
        fields after the fifth are ignored. A missing file yields two empty
        lists.
        """
        bboxes = []
        class_labels = []
        if label_path.exists():
            with open(label_path) as f:
                for line in f:
                    parts = line.split()
                    if len(parts) >= 5:
                        cls_id = int(parts[0])
                        box = [float(value) for value in parts[1:5]]
                        bboxes.append(box)
                        class_labels.append(cls_id)
        return bboxes, class_labels

    def _build_targets(self, bboxes, class_labels):
        """Encode the boxes into one target tensor per entry of ``grid_sizes``.

        Every object is written to all grids. When several objects fall into
        the same cell of a grid, only the first one is kept.
        """
        targets = []
        for grid_size in self.grid_sizes:
            target = torch.zeros(grid_size, grid_size, self._TARGET_CHANNELS)
            last_cell = grid_size - 1

            for (x, y, w, h), cls_id in zip(bboxes, class_labels):
                cls_id = int(cls_id)

                # Clamp on both sides: a centre at exactly 1.0 must map to the
                # last cell, and a coordinate below zero must not become a
                # negative index that wraps to the opposite side of the grid.
                i = min(max(int(x * grid_size), 0), last_cell)
                j = min(max(int(y * grid_size), 0), last_cell)

                if target[j, i, 4] == 0:
                    target[j, i, 0] = x * grid_size - i
                    target[j, i, 1] = y * grid_size - j
                    target[j, i, 2] = w
                    target[j, i, 3] = h
                    target[j, i, 4] = 1.0
                    target[j, i, 5 + cls_id] = 1.0

            targets.append(target)
        return targets

    @staticmethod
    def collate_fn(batch):
        """Stack samples into ``(images, (t_p3, t_p4, t_p5))`` batch tensors.

        Expects every sample to carry exactly three target tensors, in the
        order of ``grid_sizes``.
        """
        imgs = torch.stack([sample[0] for sample in batch])
        t_p3 = torch.stack([sample[1][0] for sample in batch])
        t_p4 = torch.stack([sample[1][1] for sample in batch])
        t_p5 = torch.stack([sample[1][2] for sample in batch])
        return imgs, (t_p3, t_p4, t_p5)

# Root folder of the extracted dataset; each split has images/ and labels/.
DATASET_BASE = '/content/dataset_ultimate'

# Training split: augmentation switched on.
train_dataset = FPNDataset(
    img_dir=DATASET_BASE + '/train/images',
    label_dir=DATASET_BASE + '/train/labels',
    augment=True,
)

# Validation split: images are used as stored, without augmentation.
val_dataset = FPNDataset(
    img_dir=DATASET_BASE + '/val/images',
    label_dir=DATASET_BASE + '/val/labels',
    augment=False,
)

n_train = len(train_dataset)
n_val = len(val_dataset)
print(f"‚úÖ Train dataset: {n_train} images")
print(f"‚úÖ Val dataset: {n_val} images")

## 7. Training Configuration

In [None]:
from torch.amp import autocast, GradScaler
from torch.optim.lr_scheduler import LambdaLR, CosineAnnealingWarmRestarts

# Training config
BATCH = 64
EPOCHS = 50
LR = 0.000346
PATIENCE = 20  # epochs without validation improvement before early stopping

# Create loaders
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH,
    shuffle=True,
    collate_fn=FPNDataset.collate_fn,
    num_workers=2,
    pin_memory=True
)

val_loader = DataLoader(
    val_dataset,
    batch_size=BATCH,
    shuffle=False,
    collate_fn=FPNDataset.collate_fn,
    num_workers=2,
    pin_memory=True
)

# Optimizer
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=LR,
    weight_decay=0.0001
)

# Schedulers
warmup_epochs = 10


def warmup_lambda(epoch):
    """Return the LR multiplier for `epoch`: linear ramp up to 1.0, then 1.0."""
    if epoch < warmup_epochs:
        return (epoch + 1) / warmup_epochs
    return 1.0


# Both schedulers drive the same optimizer, and each constructor performs an
# initial step that writes the optimizer's learning rate. The cosine scheduler
# is therefore built FIRST and the warm-up scheduler LAST, so that training
# starts at LR * warmup_lambda(0). In the opposite order the cosine
# scheduler's initial step resets the learning rate to LR and the first epoch
# runs without any warm-up.
cosine_scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=15)
warmup_scheduler = LambdaLR(optimizer, lr_lambda=warmup_lambda)

# Mixed precision
scaler = GradScaler('cuda')

# Save directory
SAVE_DIR = '/content/drive/MyDrive/PhobiaShield_Models/fpn_custom'
os.makedirs(SAVE_DIR, exist_ok=True)

print(f"‚úÖ Training config:")
print(f"   Batch: {BATCH}")
print(f"   Epochs: {EPOCHS}")
print(f"   LR: {LR}")
print(f"   Train batches: {len(train_loader)}")
print(f"   Val batches: {len(val_loader)}")

## 8. Training Loop

In [None]:
import time
from tqdm import tqdm

# Train/validate for up to EPOCHS epochs with mixed precision, saving a
# checkpoint whenever the mean validation loss improves and stopping early
# after PATIENCE epochs without improvement. Uses model, loss_fn, the loaders,
# optimizer, both schedulers, scaler and SAVE_DIR created by earlier cells.
print("="*70)
print("üöÄ STARTING FPN TRAINING")
print("="*70)

# Lowest mean validation loss seen so far and the number of consecutive
# epochs that failed to beat it.
best_val_loss = float('inf')
patience_counter = 0

for epoch in range(EPOCHS):
    epoch_start = time.time()
    
    # TRAIN
    model.train()
    train_loss = 0
    
    train_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{EPOCHS} [Train]")
    
    for imgs, targets in train_bar:
        imgs = imgs.to('cuda', non_blocking=True)
        targets = tuple(t.to('cuda', non_blocking=True) for t in targets)
        
        optimizer.zero_grad(set_to_none=True)
        
        # Forward pass and loss under autocast; loss_fn returns a pair whose
        # second element is not used here.
        with autocast('cuda'):
            preds = model(imgs)
            loss, _ = loss_fn(preds, targets)
        
        # AMP update sequence - the order matters: backward on the scaled
        # loss, unscale the gradients so the clipping threshold applies to
        # their true magnitude, then let the scaler perform the optimizer
        # step and update its scale factor.
        scaler.scale(loss).backward()
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), 10.0)
        scaler.step(optimizer)
        scaler.update()
        
        train_loss += loss.item()
        train_bar.set_postfix({'loss': f"{loss.item():.3f}"})
    
    # Mean of the per-batch losses.
    train_loss /= len(train_loader)
    
    # VAL
    model.eval()
    val_loss = 0
    
    # No gradients are tracked during validation.
    with torch.no_grad():
        val_bar = tqdm(val_loader, desc=f"Epoch {epoch+1}/{EPOCHS} [Val]")
        
        for imgs, targets in val_bar:
            imgs = imgs.to('cuda', non_blocking=True)
            targets = tuple(t.to('cuda', non_blocking=True) for t in targets)
            
            with autocast('cuda'):
                preds = model(imgs)
                loss, _ = loss_fn(preds, targets)
            
            val_loss += loss.item()
            val_bar.set_postfix({'loss': f"{loss.item():.3f}"})
    
    # Mean of the per-batch losses.
    val_loss /= len(val_loader)
    
    # Scheduler
    # Stepped once per epoch: the warm-up scheduler during the first
    # warmup_epochs epochs, the cosine scheduler afterwards.
    if epoch < warmup_epochs:
        warmup_scheduler.step()
    else:
        cosine_scheduler.step()
    
    epoch_time = (time.time() - epoch_start) / 60
    
    # Log
    # The LR printed here is read after the scheduler step above.
    print(f"\nüìä Epoch {epoch+1}/{EPOCHS}")
    print(f"   Train: {train_loss:.4f} | Val: {val_loss:.4f}")
    print(f"   Time: {epoch_time:.1f} min")
    print(f"   LR: {optimizer.param_groups[0]['lr']:.6f}")
    
    # Save best
    if val_loss < best_val_loss:
        # On the first epoch best_val_loss is still inf, so the reported
        # improvement is inf.
        improvement = best_val_loss - val_loss
        best_val_loss = val_loss
        patience_counter = 0
        
        # A new file is written for every improvement (the name embeds the
        # 1-based epoch and the loss); earlier checkpoints are not removed.
        checkpoint_path = f'{SAVE_DIR}/fpn_best_e{epoch+1}_loss{val_loss:.4f}.pth'
        
        # The stored 'epoch' is the zero-based loop index. Scheduler and
        # GradScaler state are not part of the checkpoint.
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'val_loss': val_loss,
        }, checkpoint_path)
        
        print(f"   ‚úÖ Best! Saved (‚Üì {improvement:.4f})")
    else:
        patience_counter += 1
        print(f"   ‚ö†Ô∏è  No improvement ({patience_counter}/{PATIENCE})")
        
        # Early stopping: leave the epoch loop once PATIENCE consecutive
        # epochs have failed to improve the validation loss.
        if patience_counter >= PATIENCE:
            print(f"\nüõë Early stopping")
            break
    
    print()

print("\n" + "="*70)
print("‚úÖ TRAINING COMPLETE!")
print("="*70)
print(f"Best Val Loss: {best_val_loss:.4f}")
print(f"Model saved in: {SAVE_DIR}")

## 9. Results Summary

In [None]:
print("\nüìä TRAINING SUMMARY")
print("="*70)
print(f"\n‚úÖ Best Model:")
print(f"   Validation Loss: {best_val_loss:.4f}")
print(f"   Saved at: {SAVE_DIR}")
print(f"\nüìÅ Checkpoint files:")
!ls -lh "$SAVE_DIR"

print("\nüéØ Next Steps:")
print("   1. Run evaluation notebook (03_Evaluation.ipynb)")
print("   2. Compare with YOLOv8 results")
print("   3. Generate metrics and confusion matrix")