# Projet PDL – Classification d'images mammaires (BUSI)

Ce notebook propose une implémentation **complète** et **commentée** de bout en bout d’un pipeline de classification
d’images mammaires issues du dataset **BUSI**.  
L’objectif est de discriminer trois classes :

1. **Benign** – Tumeurs bénignes  
2. **Malignant** – Tumeurs malignes  
3. **Normal** – Tissu sain  

Chaque section du notebook est introduite par un bloc *Markdown* détaillant les choix méthodologiques,
puis suivie par le code correspondant.


## 1. Configuration générale

Dans cette section :

* Définition des hyper‑paramètres dans une classe `Args` pour un accès centralisé.  
* Initialisation des *seeds* pour assurer la **reproductibilité** des expériences sur `torch`, `numpy` et `random`.  
* Les chemins par défaut supposent l’arborescence :

```
PDL/
 └── Dataset/
     ├── benign/
     ├── malignant/
     └── normal/
```


In [None]:
# -*- coding: utf-8 -*-
"""Projet PDL - Classification d'images mammaires BUSI complet et fonctionnel"""

import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models, transforms
from torch.utils.data import DataLoader, Dataset, random_split
from torch.optim import AdamW
from torch.optim.lr_scheduler import ReduceLROnPlateau
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm
from sklearn.metrics import confusion_matrix, classification_report, f1_score
import time
from collections import Counter
import random

# Configuration
class Args:
    data_dir = "PDL/Dataset"
    model_name = "resnet"
    num_classes = 3
    epochs = 20
    batch_size = 32
    lr = 1e-4
    weight_decay = 0.01
    dropout_rate = 0.85
    patience = 7
    seed = 42
    mixup_alpha = 0.3

args = Args()

# Initialisation des seeds
torch.manual_seed(args.seed)
np.random.seed(args.seed)
random.seed(args.seed)


## 2. Analyse exploratoire rapide du dataset

Le but est de :

* **Compter** les images par classe afin d’identifier un éventuel déséquilibre.  
* Mesurer les **dimensions** moyennes, min et max pour choisir une taille de redimensionnement appropriée.  
* Visualiser :
  * La répartition des classes (barplot)  
  * La dispersion des dimensions brutes (scatter)  
  * Le ratio largeur/hauteur (histogramme)  


In [None]:
def analyze_dataset(dataset_path):
    """Affiche quelques statistiques et visualisations de base."""
    print("\n=== Analyse initiale du dataset ===")

    class_counts = Counter()
    sizes = []

    for cls in ['benign', 'malignant', 'normal']:
        cls_dir = os.path.join(dataset_path, cls)
        if not os.path.exists(cls_dir):
            continue

        for img_name in os.listdir(cls_dir):
            if img_name.lower().endswith(('.png', '.jpg', '.jpeg')):
                img_path = os.path.join(cls_dir, img_name)
                with Image.open(img_path) as img:
                    sizes.append(img.size)
                class_counts[cls] += 1

    # Affichage texte
    total_imgs = sum(class_counts.values())
    print(f"\nNombre total d'images : {total_imgs}")
    print("Répartition par classe :")
    for cls, count in class_counts.items():
        print(f"- {cls}: {count} images ({count/total_imgs:.1%})")

    widths, heights = zip(*sizes)
    print(f"\nDimensions moyennes : {np.mean(widths):.0f}×{np.mean(heights):.0f}")
    print(f"Dimensions min : {min(widths)}×{min(heights)}")
    print(f"Dimensions max : {max(widths)}×{max(heights)}")

    # Visualisations
    plt.figure(figsize=(15, 5))

    plt.subplot(1, 3, 1)
    sns.barplot(x=list(class_counts.keys()), y=list(class_counts.values()))
    plt.title('Répartition des classes')

    plt.subplot(1, 3, 2)
    plt.scatter(widths, heights, alpha=0.5)
    plt.xlabel('Largeur (px)')
    plt.ylabel('Hauteur (px)')
    plt.title('Distribution des dimensions')

    plt.subplot(1, 3, 3)
    plt.hist([w/h for w, h in sizes], bins=20)
    plt.xlabel('Ratio largeur/hauteur')
    plt.title('Ratios largeur/hauteur')

    plt.tight_layout()
    plt.show()


## 3. Pipelines de pré‑traitement et d’*augmentation* des données

* **Entraînement** : redimensionnement vers 256², flips horizontaux/verticaux, *affine*, *ColorJitter*,
  *RandomErasing*, puis normalisation ImageNet.  
* **Validation / Test** : redimensionnement fixe 224² + normalisation.  
Les mêmes statistiques sont utilisées pour la normalisation que celles d’ImageNet,
car nous ré‑utilisons des poids pré‑entraînés.


In [None]:
train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomHorizontalFlip(p=0.3),
    transforms.RandomVerticalFlip(p=0.3),
    transforms.RandomAffine(degrees=10, translate=(0.1, 0.1), scale=(0.85, 1.15), shear=5),
    transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),
    transforms.RandomErasing(p=0.3, scale=(0.02, 0.1)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

val_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])


## 4. Classe `BUSIDataset`

Cette classe hérite de `torch.utils.data.Dataset` et :

* Charge les chemins d’images et leur étiquette **une seule fois** lors de l’initialisation.  
* Convertit chaque image en RGB, puis applique la **transformation** appropriée (train/val/test).  
* Expose `classes` et `class_to_idx` pour un accès simple aux noms.


In [None]:
class BUSIDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.transform = transform
        self.classes = ['benign', 'malignant', 'normal']
        self.class_to_idx = {cls: i for i, cls in enumerate(self.classes)}
        self.samples = []

        for cls in self.classes:
            cls_dir = os.path.join(root_dir, cls)
            if not os.path.isdir(cls_dir):
                continue

            for img_name in os.listdir(cls_dir):
                if img_name.lower().endswith(('.png', '.jpg', '.jpeg')):
                    self.samples.append((os.path.join(cls_dir, img_name), self.class_to_idx[cls]))

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        img_path, label = self.samples[idx]
        img = Image.open(img_path).convert('RGB')
        if self.transform:
            img = self.transform(img)
        return img, label


## 5. Architecture du modèle

Nous utilisons **ResNet‑18** pré‑entraînée ImageNet :

* **Gel** des couches jusqu’à `layer3` incluse pour réduire le coût d’entraînement et éviter le sur‑apprentissage.  
* Remplacement de la *fully‑connected* par une tête sur‑mesure :
  * `Dropout` (réduire overfitting)  
  * `Linear` → 128
  * `BatchNorm1d` + `ReLU`
  * `Dropout`
  * `Linear` final vers `num_classes`  
* `dropout_rate` configurable dans `Args`.


In [None]:
def get_model():
    model = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)

    # Gel des couches basses
    for param in model.parameters():
        param.requires_grad = False
    for param in model.layer4.parameters():
        param.requires_grad = True
    # Tête de classification
    num_features = model.fc.in_features
    model.fc = nn.Sequential(
        nn.Dropout(args.dropout_rate),
        nn.Linear(num_features, 128),
        nn.BatchNorm1d(128),
        nn.ReLU(),
        nn.Dropout(0.5),
        nn.Linear(128, args.num_classes)
    )
    return model


## 6. Augmentation **Mixup**

*Mixup* crée des exemples synthétiques en mélangeant deux images et leurs labels
selon un coefficient `λ ~ Beta(α, α)`.

Avantages :

* Lisse la frontière de décision.  
* Améliore la généralisation surtout sur de petits datasets.


In [None]:
def mixup_data(x, y, alpha=0.4):
    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1
    batch_size = x.size(0)
    index = torch.randperm(batch_size)
    mixed_x = lam * x + (1 - lam) * x[index]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam


## 7. Fonctions de visualisation des métriques

* **Courbes d’apprentissage** : loss, accuracy, learning rate.  
* **Matrice de confusion** normalisée.  
* **Analyse d’erreurs** : affichage des images mal classées avec probas.


In [None]:
def plot_learning_curves(history):
    plt.figure(figsize=(15, 5))

    plt.subplot(1, 3, 1)
    plt.plot(history['epoch'], history['train_loss'], label='Train')
    plt.plot(history['epoch'], history['val_loss'], label='Validation')
    plt.title('Évolution de la Loss')
    plt.xlabel('Epochs')
    plt.legend()

    plt.subplot(1, 3, 2)
    plt.plot(history['epoch'], history['train_acc'], label='Train')
    plt.plot(history['epoch'], history['val_acc'], label='Validation')
    plt.title('Évolution de l\'Accuracy')
    plt.xlabel('Epochs')
    plt.legend()

    plt.subplot(1, 3, 3)
    plt.plot(history['epoch'], history['lr'])
    plt.title('Évolution du Learning Rate')
    plt.xlabel('Epochs')

    plt.tight_layout()
    plt.show()

def plot_confusion_matrix(y_true, y_pred, classes):
    cm = confusion_matrix(y_true, y_pred)
    cm_norm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

    plt.figure(figsize=(8, 6))
    sns.heatmap(cm_norm, annot=True, fmt='.2f', cmap='Blues',
                xticklabels=classes, yticklabels=classes)
    plt.title('Matrice de Confusion Normalisée')
    plt.xlabel('Prédictions')
    plt.ylabel('Vraies étiquettes')
    plt.show()

def plot_error_analysis(model, test_loader, device, class_names):
    model.eval()
    errors = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            for i in range(len(labels)):
                if preds[i] != labels[i]:
                    errors.append((
                        inputs[i].cpu(),
                        labels[i].cpu(),
                        preds[i].cpu(),
                        F.softmax(outputs[i], dim=0).cpu().numpy()
                    ))
            if len(errors) >= 8:
                break

    if not errors:
        print("Aucune erreur à afficher!")
        return

    plt.figure(figsize=(15, 20))
    for i in range(min(8, len(errors))):
        img, true, pred, probs = errors[i]

        # Image
        plt.subplot(8, 2, 2*i+1)
        img = img.numpy().transpose((1, 2, 0))
        img = img * np.array([0.229, 0.224, 0.225]) + np.array([0.485, 0.456, 0.406])
        img = np.clip(img, 0, 1)
        plt.imshow(img)
        plt.title(f"Vrai: {class_names[true]}\nPrédit: {class_names[pred]}")
        plt.axis('off')

        # Probabilités
        plt.subplot(8, 2, 2*i+2)
        plt.barh(class_names, probs)
        plt.xlim(0, 1)
        plt.xlabel('Probabilité')
    plt.tight_layout()
    plt.show()


## 8. Boucle d’entraînement, validation et sauvegarde

Points clés :

* **Scheduler** : `ReduceLROnPlateau` (divise LR par 2 si la val‑accuracy stagne 3 epochs).  
* **Label Smoothing** : `CrossEntropyLoss(label_smoothing=0.2)` atténue la sur‑confiance.  
* **Early Stopping** : patience configurable (`Args.patience`).  
* Sauvegarde automatique du *meilleur* modèle dans `best_model.pth`.


In [None]:
def train_and_validate():
    # 1) Analyse exploratoire
    analyze_dataset(args.data_dir)

    # 2) Création des jeux Train / Val / Test
    dataset = BUSIDataset(args.data_dir, transform=train_transform)
    train_size = int(0.7 * len(dataset))
    val_size   = int(0.15 * len(dataset))
    test_size  = len(dataset) - train_size - val_size

    train_dataset, val_dataset, test_dataset = random_split(
        dataset, [train_size, val_size, test_size],
        generator=torch.Generator().manual_seed(args.seed)
    )

    # Switch des transforms pour val/test
    val_dataset.dataset.transform  = val_transform
    test_dataset.dataset.transform = val_transform

    # 3) DataLoaders
    train_loader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True, num_workers=2)
    val_loader   = DataLoader(val_dataset,   batch_size=args.batch_size, num_workers=2)
    test_loader  = DataLoader(test_dataset,  batch_size=args.batch_size, num_workers=2)

    # 4) Modèle
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"\nUtilisation de {device}")
    model = get_model().to(device)

    # 5) Optimisation
    optimizer  = AdamW(model.parameters(), lr=args.lr, weight_decay=args.weight_decay)
    scheduler  = ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=3)
    criterion  = nn.CrossEntropyLoss(label_smoothing=0.2)

    # 6) Historique
    history = {'epoch': [], 'train_loss': [], 'train_acc': [], 'val_loss': [], 'val_acc': [], 'lr': []}

    best_val_acc = 0
    epochs_no_improve = 0
    print("\nDébut de l'entraînement...")
    start_time = time.time()

    for epoch in range(args.epochs):
        model.train()
        running_loss, correct, total = 0.0, 0, 0

        for inputs, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{args.epochs}"):
            inputs, labels = inputs.to(device), labels.to(device)

            # Mixup
            inputs, targets_a, targets_b, lam = mixup_data(inputs, labels, args.mixup_alpha)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = lam * criterion(outputs, targets_a) + (1 - lam) * criterion(outputs, targets_b)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total   += labels.size(0)
            correct += lam * (predicted == targets_a).sum().item() + (1 - lam) * (predicted == targets_b).sum().item()

        # Train metrics
        train_loss = running_loss / len(train_loader)
        train_acc  = 100 * correct / total

        # Validation metrics
        val_loss, val_acc = validate(model, val_loader, criterion, device)

        # Historique
        history['epoch'].append(epoch+1)
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)
        history['lr'].append(optimizer.param_groups[0]['lr'])

        scheduler.step(val_acc)

        print(f"Epoch {epoch+1}/{args.epochs} - Train Loss: {train_loss:.4f}, Acc: {train_acc:.2f}% | "
              f"Val Loss: {val_loss:.4f}, Acc: {val_acc:.2f}% | LR: {optimizer.param_groups[0]['lr']:.2e}")

        # Early stopping
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            epochs_no_improve = 0
            torch.save(model.state_dict(), "best_model.pth")
        else:
            epochs_no_improve += 1
            if epochs_no_improve == args.patience:
                print(f"\nEarly stopping à l'époque {epoch+1}!")
                break

    # Visualisation des courbes
    plot_learning_curves(history)

    # Évaluation finale
    print("\n=== Évaluation sur le test set ===")
    evaluate_model(model, test_loader, device, dataset.classes)

    print(f"\nTemps total: {(time.time()-start_time)/60:.2f} minutes")


## 9. Fonctions `validate` et `evaluate_model`

* `validate` : boucle simple sans gradient, renvoie loss & accuracy sur le set de validation.  
* `evaluate_model` : recharge **le meilleur** modèle, calcule *accuracy*, *F1 macro*, matrice de confusion et lance l’analyse d’erreurs.


In [None]:
def validate(model, val_loader, criterion, device):
    model.eval()
    val_loss, correct, total = 0.0, 0, 0

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            val_loss += loss.item()
            _, predicted = torch.max(outputs.data, 1)
            total   += labels.size(0)
            correct += (predicted == labels).sum().item()

    return val_loss / len(val_loader), 100 * correct / total

def evaluate_model(model, test_loader, device, class_names):
    model.load_state_dict(torch.load("best_model.pth"))
    model.eval()
    all_preds, all_labels = [], []

    with torch.no_grad():
        for inputs, labels in tqdm(test_loader, desc="Évaluation"):
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    accuracy = 100 * np.mean(np.array(all_preds) == np.array(all_labels))
    f1 = f1_score(all_labels, all_preds, average='macro')
    print(f"\nAccuracy: {accuracy:.2f}%")
    print(f"F1-Score (macro): {f1:.4f}")

    plot_confusion_matrix(all_labels, all_preds, class_names)
    print("\n=== Rapport de classification ===")
    print(classification_report(all_labels, all_preds, target_names=class_names, digits=4))

    print("\n=== Analyse des erreurs ===")
    plot_error_analysis(model, test_loader, device, class_names)


## 10. Point d’entrée

Exécute le pipeline complet (`train_and_validate`) si le notebook est lancé comme script.  
Pour un usage interactif, il suffit d’appeler `train_and_validate()` depuis une cellule.


In [None]:
if __name__ == "__main__":
    train_and_validate()
