In [2]:
# Preprocessing des Données - Détection Nodules Pulmonaires
# Conversion YOLO → COCO Format pour Faster R-CNN

import torch
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
import cv2
import numpy as np
import matplotlib.pyplot as plt
import json
from pathlib import Path
import random
from PIL import Image
import albumentations as A
from albumentations.pytorch import ToTensorV2

# Configuration
plt.style.use('default')
plt.rcParams['figure.figsize'] = (12, 8)

print("✅ Imports preprocessing réussis!")
print(f"🔥 PyTorch version: {torch.__version__}")
print(f"🔥 CUDA disponible: {torch.cuda.is_available()}")

✅ Imports preprocessing réussis!
🔥 PyTorch version: 2.7.0
🔥 CUDA disponible: False


In [3]:
# Configuration des chemins (réutiliser du notebook 01)
BASE_PATH = Path("../data/raw/ct_images")
TRAIN_IMAGES = BASE_PATH / "images" / "train"
TRAIN_LABELS = BASE_PATH / "labels" / "train"
VAL_IMAGES = BASE_PATH / "images" / "val"
VAL_LABELS = BASE_PATH / "labels" / "val"

# Nouveau dossier pour les données preprocessées
PROCESSED_PATH = Path("../data/processed")
PROCESSED_PATH.mkdir(exist_ok=True)

# Configuration pour le modèle
CONFIG = {
    'image_size': (512, 512),  # Resize pour Faster R-CNN
    'num_classes': 2,          # 0=background, 1=nodule
    'batch_size': 4,           # Petit batch pour CUDA False
    'normalize_mean': [0.485, 0.456, 0.406],  # ImageNet standard
    'normalize_std': [0.229, 0.224, 0.225]
}

print("✅ Configuration créée!")
print(f"📁 Dossier processed: {PROCESSED_PATH.exists()}")
print(f"🖼️ Taille images: {CONFIG['image_size']}")
print(f"⚡ Batch size: {CONFIG['batch_size']} (adapté pour CPU)")

✅ Configuration créée!
📁 Dossier processed: True
🖼️ Taille images: (512, 512)
⚡ Batch size: 4 (adapté pour CPU)


In [4]:
def yolo_to_coco_format(yolo_boxes, img_width, img_height):
    """
    Convertir les bounding boxes YOLO en format COCO pour Faster R-CNN

    YOLO: [class_id, center_x, center_y, width, height] (normalisé 0-1)
    COCO: [x1, y1, x2, y2] (coordonnées absolues) + labels séparés
    """
    if not yolo_boxes:
        return {
            'boxes': torch.zeros((0, 4), dtype=torch.float32),
            'labels': torch.zeros((0,), dtype=torch.int64)
        }

    coco_boxes = []
    labels = []

    for box in yolo_boxes:
        # Convertir coordonnées normalisées → absolues
        center_x = box['center_x'] * img_width
        center_y = box['center_y'] * img_height
        width = box['width'] * img_width
        height = box['height'] * img_height

        # Calculer coins (x1, y1, x2, y2)
        x1 = center_x - width/2
        y1 = center_y - height/2
        x2 = center_x + width/2
        y2 = center_y + height/2

        # Clamper les coordonnées dans les limites de l'image
        x1 = max(0, min(x1, img_width))
        y1 = max(0, min(y1, img_height))
        x2 = max(0, min(x2, img_width))
        y2 = max(0, min(y2, img_height))

        # Vérifier que la box est valide
        if x2 > x1 and y2 > y1:
            coco_boxes.append([x1, y1, x2, y2])
            labels.append(1)  # 1 = nodule (0 sera pour background)

    return {
        'boxes': torch.tensor(coco_boxes, dtype=torch.float32),
        'labels': torch.tensor(labels, dtype=torch.int64)
    }

# Test de la fonction
print("🧪 Test de conversion YOLO → COCO:")
test_box = [{'class_id': 1, 'center_x': 0.5, 'center_y': 0.5, 'width': 0.1, 'height': 0.1}]
result = yolo_to_coco_format(test_box, 512, 512)
print(f"Input YOLO: {test_box[0]}")
print(f"Output COCO: boxes={result['boxes']}, labels={result['labels']}")

🧪 Test de conversion YOLO → COCO:
Input YOLO: {'class_id': 1, 'center_x': 0.5, 'center_y': 0.5, 'width': 0.1, 'height': 0.1}
Output COCO: boxes=tensor([[230.4000, 230.4000, 281.6000, 281.6000]]), labels=tensor([1])


In [5]:
# Fonction pour lire labels YOLO (du notebook 01)
def read_yolo_label(label_path):
    """Réutiliser la fonction du notebook 01"""
    boxes = []
    if not label_path.exists():
        return boxes

    with open(label_path, 'r') as f:
        for line in f:
            line = line.strip()
            if line:
                parts = line.split()
                boxes.append({
                    'class_id': int(parts[0]),
                    'center_x': float(parts[1]),
                    'center_y': float(parts[2]),
                    'width': float(parts[3]),
                    'height': float(parts[4])
                })
    return boxes

class LungNoduleDataset(Dataset):
    """
    Dataset custom pour les nodules pulmonaires
    Compatible avec Faster R-CNN
    """

    def __init__(self, images_dir, labels_dir, transforms=None, config=CONFIG):
        self.images_dir = Path(images_dir)
        self.labels_dir = Path(labels_dir)
        self.transforms = transforms
        self.config = config

        # Lister toutes les images
        self.image_files = list(self.images_dir.glob('*.jpg'))
        print(f"📊 Dataset créé avec {len(self.image_files)} images")

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        # Charger l'image
        img_path = self.image_files[idx]
        image = cv2.imread(str(img_path))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Charger les labels
        label_name = img_path.stem + '.txt'
        label_path = self.labels_dir / label_name
        yolo_boxes = read_yolo_label(label_path)

        # Dimensions originales
        original_height, original_width = image.shape[:2]

        # Resize l'image
        target_height, target_width = self.config['image_size']
        image_resized = cv2.resize(image, (target_width, target_height))

        # Ajuster les coordonnées des boxes pour la nouvelle taille
        scale_x = target_width / original_width
        scale_y = target_height / original_height

        adjusted_boxes = []
        for box in yolo_boxes:
            adjusted_boxes.append({
                'class_id': box['class_id'],
                'center_x': box['center_x'],  # Déjà normalisé
                'center_y': box['center_y'],  # Déjà normalisé
                'width': box['width'],        # Déjà normalisé
                'height': box['height']       # Déjà normalisé
            })

        # Convertir en format COCO
        target = yolo_to_coco_format(adjusted_boxes, target_width, target_height)

        # Convertir image en tensor
        image_tensor = torch.from_numpy(image_resized).permute(2, 0, 1).float() / 255.0

        # Normalisation ImageNet
        normalize = transforms.Normalize(
            mean=self.config['normalize_mean'],
            std=self.config['normalize_std']
        )
        image_tensor = normalize(image_tensor)

        return image_tensor, target

# Test du dataset
print("🧪 Test du dataset:")
train_dataset = LungNoduleDataset(TRAIN_IMAGES, TRAIN_LABELS)
print(f"✅ Dataset train créé: {len(train_dataset)} échantillons")

# Test d'un échantillon
sample_image, sample_target = train_dataset[0]
print(f"📊 Échantillon 0:")
print(f"  Image shape: {sample_image.shape}")
print(f"  Boxes: {sample_target['boxes'].shape}")
print(f"  Labels: {sample_target['labels']}")

🧪 Test du dataset:
📊 Dataset créé avec 239 images
✅ Dataset train créé: 239 échantillons
📊 Échantillon 0:
  Image shape: torch.Size([3, 512, 512])
  Boxes: torch.Size([1, 4])
  Labels: tensor([1])


In [10]:
# Définir les transformations d'augmentation CORRIGÉES
def get_training_transforms():
    """
    Augmentations pour améliorer la robustesse du modèle
    Attention: il faut aussi transformer les bounding boxes !
    """
    return A.Compose([
        # Transformations géométriques
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.3),
        A.Rotate(limit=15, p=0.5),

        # Transformations d'intensité (importantes pour les images médicales)
        A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.5),
        A.GaussianBlur(blur_limit=3, p=0.3),
        # A.GaussNoise retiré - paramètre non supporté dans cette version

        # Normalisation
        A.Normalize(mean=CONFIG['normalize_mean'], std=CONFIG['normalize_std']),
        ToTensorV2()
    ], bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels']))

def get_validation_transforms():
    """
    Transformations pour validation (seulement normalisation)
    """
    return A.Compose([
        A.Normalize(mean=CONFIG['normalize_mean'], std=CONFIG['normalize_std']),
        ToTensorV2()
    ], bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels']))

class LungNoduleDatasetAugmented(Dataset):
    """
    Dataset avec augmentation automatique
    """

    def __init__(self, images_dir, labels_dir, transforms=None, config=CONFIG):
        self.images_dir = Path(images_dir)
        self.labels_dir = Path(labels_dir)
        self.transforms = transforms
        self.config = config
        self.image_files = list(self.images_dir.glob('*.jpg'))

        print(f"📊 Dataset augmenté créé avec {len(self.image_files)} images")

    def __len__(self):
        return len(self.image_files)

    def __getitem__(self, idx):
        # Charger image et labels
        img_path = self.image_files[idx]
        image = cv2.imread(str(img_path))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        label_name = img_path.stem + '.txt'
        label_path = self.labels_dir / label_name
        yolo_boxes = read_yolo_label(label_path)

        # Resize
        original_height, original_width = image.shape[:2]
        target_height, target_width = self.config['image_size']
        image = cv2.resize(image, (target_width, target_height))

        # Convertir YOLO → Pascal VOC pour Albumentations
        bboxes = []
        labels = []

        for box in yolo_boxes:
            # Convertir coordonnées normalisées → absolues
            center_x = box['center_x'] * target_width
            center_y = box['center_y'] * target_height
            width = box['width'] * target_width
            height = box['height'] * target_height

            x1 = center_x - width/2
            y1 = center_y - height/2
            x2 = center_x + width/2
            y2 = center_y + height/2

            # Clamper
            x1 = max(0, min(x1, target_width))
            y1 = max(0, min(y1, target_height))
            x2 = max(0, min(x2, target_width))
            y2 = max(0, min(y2, target_height))

            if x2 > x1 and y2 > y1:
                bboxes.append([x1, y1, x2, y2])
                labels.append(1)  # Integer direct

        # Appliquer les transformations
        if self.transforms:
            transformed = self.transforms(image=image, bboxes=bboxes, labels=labels)
            image = transformed['image']
            bboxes = transformed['bboxes']
            labels = transformed['labels']

        # Convertir en tensors PyTorch (FIX conversion explicite)
        if len(bboxes) > 0:
            boxes_tensor = torch.tensor(bboxes, dtype=torch.float32)
            labels_tensor = torch.tensor([int(label) for label in labels], dtype=torch.int64)  # FIX
        else:
            boxes_tensor = torch.zeros((0, 4), dtype=torch.float32)
            labels_tensor = torch.zeros((0,), dtype=torch.int64)

        target = {
            'boxes': boxes_tensor,
            'labels': labels_tensor
        }

        return image, target

# Test avec augmentation CORRIGÉ
print("🔄 Test du dataset avec augmentation CORRIGÉ:")
train_transforms = get_training_transforms()
train_dataset_aug = LungNoduleDatasetAugmented(TRAIN_IMAGES, TRAIN_LABELS, train_transforms)

# Test plusieurs échantillons pour voir la variation
print("📊 Test de 3 échantillons augmentés:")
for i in range(3):
    img, target = train_dataset_aug[0]  # Même image, augmentations différentes
    print(f"  Échantillon {i+1}: Image {img.shape}, Boxes {target['boxes'].shape}")



🔄 Test du dataset avec augmentation CORRIGÉ:
📊 Dataset augmenté créé avec 239 images
📊 Test de 3 échantillons augmentés:
  Échantillon 1: Image torch.Size([3, 512, 512]), Boxes torch.Size([1, 4])
  Échantillon 2: Image torch.Size([3, 512, 512]), Boxes torch.Size([1, 4])
  Échantillon 3: Image torch.Size([3, 512, 512]), Boxes torch.Size([1, 4])


In [12]:
def collate_fn(batch):
    """
    Fonction collate custom pour Faster R-CNN
    Chaque image peut avoir un nombre différent de bounding boxes
    """
    images = []
    targets = []

    for image, target in batch:
        images.append(image)
        targets.append(target)

    return images, targets

# Créer les datasets finaux
print("🔧 Création des datasets finaux...")

# Transformations
train_transforms = get_training_transforms()
val_transforms = get_validation_transforms()

# Datasets
train_dataset = LungNoduleDatasetAugmented(TRAIN_IMAGES, TRAIN_LABELS, train_transforms)
val_dataset = LungNoduleDatasetAugmented(VAL_IMAGES, VAL_LABELS, val_transforms)

# DataLoaders
train_loader = DataLoader(
    train_dataset,
    batch_size=CONFIG['batch_size'],
    shuffle=True,                    # Mélanger pour l'entraînement
    num_workers=0,                   # 0 pour éviter les problèmes multiprocessing
    collate_fn=collate_fn           # Function custom pour gérer les boxes variables
)

val_loader = DataLoader(
    val_dataset,
    batch_size=CONFIG['batch_size'],
    shuffle=False,                   # Pas de mélange pour validation
    num_workers=0,
    collate_fn=collate_fn
)

print(f"✅ DataLoaders créés:")
print(f"   🔹 Train: {len(train_loader)} batches de {CONFIG['batch_size']}")
print(f"   🔹 Val: {len(val_loader)} batches de {CONFIG['batch_size']}")

# Test d'un batch
print("\n🧪 Test d'un batch:")
train_iter = iter(train_loader)
sample_images, sample_targets = next(train_iter)

print(f"📊 Batch info:")
print(f"   🖼️ Images: {len(sample_images)} images de shape {sample_images[0].shape}")
print(f"   📦 Targets: {len(sample_targets)} dictionnaires")
print(f"   📊 Premier target: boxes {sample_targets[0]['boxes'].shape}, labels {sample_targets[0]['labels']}")

# Vérification mémoire
print(f"\n💾 Statistiques:")
print(f"   📊 Total échantillons train: {len(train_dataset)}")
print(f"   📊 Total échantillons val: {len(val_dataset)}")
print(f"   ⚡ Batch size: {CONFIG['batch_size']} (adapté pour CPU)")
print(f"   🔄 Train batches: {len(train_loader)}")
print(f"   🔄 Val batches: {len(val_loader)}")

🔧 Création des datasets finaux...
📊 Dataset augmenté créé avec 239 images
📊 Dataset augmenté créé avec 41 images
✅ DataLoaders créés:
   🔹 Train: 60 batches de 4
   🔹 Val: 11 batches de 4

🧪 Test d'un batch:
📊 Batch info:
   🖼️ Images: 4 images de shape torch.Size([3, 512, 512])
   📦 Targets: 4 dictionnaires
   📊 Premier target: boxes torch.Size([1, 4]), labels tensor([1])

💾 Statistiques:
   📊 Total échantillons train: 239
   📊 Total échantillons val: 41
   ⚡ Batch size: 4 (adapté pour CPU)
   🔄 Train batches: 60
   🔄 Val batches: 11


In [13]:
import torchvision
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

def create_model(num_classes):
    """
    Créer Faster R-CNN pré-entraîné sur COCO
    Modifier la tête de classification pour nos classes
    """
    # Charger le modèle pré-entraîné
    model = fasterrcnn_resnet50_fpn(pretrained=True)

    # Remplacer la tête de classification
    # Le modèle COCO a 91 classes, nous en avons 2 (background + nodule)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    return model

# Créer le modèle
print("🤖 Création du modèle Faster R-CNN...")
model = create_model(CONFIG['num_classes'])

# Informations sur le modèle
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"✅ Modèle créé:")
print(f"   🧠 Architecture: Faster R-CNN + ResNet50 + FPN")
print(f"   📊 Paramètres totaux: {total_params:,}")
print(f"   🔥 Paramètres entraînables: {trainable_params:,}")
print(f"   🎯 Classes: {CONFIG['num_classes']} (background + nodule)")

# Test du modèle
print("\n🧪 Test du modèle:")
model.eval()
with torch.no_grad():
    # Prendre un batch de test
    test_images, test_targets = next(iter(train_loader))

    # Faire une prédiction
    predictions = model(test_images)

    print(f"📊 Input: {len(test_images)} images")
    print(f"📊 Output: {len(predictions)} prédictions")
    print(f"📊 Première prédiction: {len(predictions[0]['boxes'])} boxes détectées")

# Configuration de l'entraînement
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

print(f"\n⚡ Device: {device}")
print(f"🚀 Modèle prêt pour l'entraînement!")

🤖 Création du modèle Faster R-CNN...




Downloading: "https://download.pytorch.org/models/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth" to /Users/abchatealiibrahim/.cache/torch/hub/checkpoints/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth


100%|██████████| 160M/160M [00:52<00:00, 3.18MB/s] 


✅ Modèle créé:
   🧠 Architecture: Faster R-CNN + ResNet50 + FPN
   📊 Paramètres totaux: 41,299,161
   🔥 Paramètres entraînables: 41,076,761
   🎯 Classes: 2 (background + nodule)

🧪 Test du modèle:
📊 Input: 4 images
📊 Output: 4 prédictions
📊 Première prédiction: 100 boxes détectées

⚡ Device: cpu
🚀 Modèle prêt pour l'entraînement!
