# Entraînement du Modèle en Deux Étapes pour la Classification des Prunes Africaines

Ce notebook vous guide à travers le processus complet d'entraînement du modèle en deux étapes pour la classification des prunes africaines :
1. **Détection** : Entraîne un modèle pour déterminer si l'image contient une prune ou non
2. **Classification** : Entraîne un modèle pour classifier l'état des prunes (bonne qualité, non mûre, tachetée, fissurée, meurtrie ou pourrie)

## Étapes :
1. Configuration de l'environnement
2. Téléchargement et préparation des données
3. Définition de l'architecture du modèle
4. Entraînement du modèle de détection
5. Entraînement du modèle de classification
6. Évaluation des performances
7. Sauvegarde des modèles entraînés
8. Test du modèle complet

## 1. Configuration de l'environnement

Commençons par installer les dépendances nécessaires et configurer l'environnement.

In [None]:
# Vérifier si nous utilisons un GPU
!nvidia-smi

# Installer les dépendances
!pip install torch torchvision numpy matplotlib scikit-learn pillow kaggle seaborn

# Importer les bibliothèques nécessaires
import os
import sys
import json
import time
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader, random_split
import matplotlib.pyplot as plt
from PIL import Image
import seaborn as sns
from sklearn.metrics import confusion_matrix, classification_report
from google.colab import drive, files
import zipfile
import random

# Fixer les seeds pour la reproductibilité
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# Déterminer le device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Utilisation de: {device}")

## 2. Connexion à Google Drive

Connectons-nous à Google Drive pour sauvegarder les modèles entraînés.

In [None]:
# Monter Google Drive
drive.mount('/content/drive')

# Définir le chemin vers le dossier où sauvegarder les modèles entraînés
models_dir = '/content/drive/MyDrive/african_plums_models'

# Créer le dossier s'il n'existe pas
os.makedirs(models_dir, exist_ok=True)

# Créer un dossier local pour les données
data_dir = '/content/data'
os.makedirs(data_dir, exist_ok=True)

## 3. Téléchargement et préparation des données

Pour entraîner notre modèle en deux étapes, nous avons besoin de deux types de données :
1. Des images de prunes africaines (dataset Kaggle)
2. Des images qui ne sont pas des prunes (pour la détection)

In [None]:
# Configuration de Kaggle pour télécharger le dataset
!pip install -q kaggle
!mkdir -p ~/.kaggle

# Télécharger votre fichier kaggle.json depuis votre compte Kaggle et l'importer ici
print("Veuillez télécharger votre fichier kaggle.json")
files.upload()  # Sélectionnez votre fichier kaggle.json

# Déplacer le fichier et définir les permissions
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# Télécharger le dataset des prunes africaines
!kaggle datasets download -d arnaudfadja/african-plums-quality-and-defect-assessment-data

# Extraire le contenu
!mkdir -p $data_dir/african_plums_dataset
!unzip -q african-plums-quality-and-defect-assessment-data.zip -d $data_dir/african_plums_dataset

# Vérifier la structure du dataset
!ls -la $data_dir/african_plums_dataset

### Téléchargement d'images qui ne sont pas des prunes

Pour la détection, nous avons besoin d'images qui ne sont pas des prunes. Nous allons utiliser un dataset générique d'images.

In [None]:
# Option 1: Télécharger un dataset d'images diverses depuis Kaggle
!kaggle datasets download -d prasunroy/natural-images
!mkdir -p $data_dir/non_plum_images
!unzip -q natural-images.zip -d $data_dir/temp_natural_images

# Copier les images qui ne sont pas des fruits dans le dossier non_plum_images
!mkdir -p $data_dir/non_plum_images/non_plum
!cp $data_dir/temp_natural_images/natural_images/car/* $data_dir/non_plum_images/non_plum/
!cp $data_dir/temp_natural_images/natural_images/cat/* $data_dir/non_plum_images/non_plum/
!cp $data_dir/temp_natural_images/natural_images/dog/* $data_dir/non_plum_images/non_plum/
!cp $data_dir/temp_natural_images/natural_images/flower/* $data_dir/non_plum_images/non_plum/
!cp $data_dir/temp_natural_images/natural_images/house/* $data_dir/non_plum_images/non_plum/
!cp $data_dir/temp_natural_images/natural_images/motorbike/* $data_dir/non_plum_images/non_plum/
!cp $data_dir/temp_natural_images/natural_images/person/* $data_dir/non_plum_images/non_plum/

# Option 2 (alternative): Télécharger vos propres images
print("\nVous pouvez également télécharger vos propres images qui ne sont pas des prunes :")
print("1. Créez un dossier zip contenant des images qui ne sont pas des prunes")
print("2. Téléchargez ce fichier zip ci-dessous")

# Décommenter pour utiliser cette option
# uploaded = files.upload()  # Sélectionnez votre fichier zip
# for filename in uploaded.keys():
#     if filename.endswith('.zip'):
#         with zipfile.ZipFile(filename, 'r') as zip_ref:
#             zip_ref.extractall(f'{data_dir}/non_plum_images')

# Vérifier le nombre d'images dans chaque dossier
!echo "Nombre d'images de prunes:"
!find $data_dir/african_plums_dataset -type f | wc -l

!echo "Nombre d'images qui ne sont pas des prunes:"
!find $data_dir/non_plum_images -type f | wc -l

### Définition des classes de données et exploration

In [None]:
# Définir les classes pour la détection et la classification
detection_class_names = ['plum', 'non_plum']

# Trouver les noms des classes de classification à partir des dossiers
classification_class_names = [d for d in os.listdir(f'{data_dir}/african_plums_dataset') 
                             if os.path.isdir(os.path.join(f'{data_dir}/african_plums_dataset', d))]
classification_class_names.sort()  # Trier pour avoir un ordre cohérent

print(f"Classes de détection: {detection_class_names}")
print(f"Classes de classification: {classification_class_names}")

# Explorer la distribution des données
def analyze_dataset_distribution(data_dir, class_names):
    class_counts = {}
    for class_name in class_names:
        class_path = os.path.join(data_dir, class_name)
        if os.path.isdir(class_path):
            class_counts[class_name] = len(os.listdir(class_path))
    
    # Afficher la distribution
    plt.figure(figsize=(10, 6))
    plt.bar(class_counts.keys(), class_counts.values())
    plt.xlabel('Classe')
    plt.ylabel('Nombre d\'images')
    plt.title('Distribution des classes')
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()
    
    return class_counts

# Analyser la distribution des classes de classification
print("\nDistribution des classes de classification:")
classification_counts = analyze_dataset_distribution(f'{data_dir}/african_plums_dataset', classification_class_names)

# Analyser la distribution des classes de détection
print("\nDistribution des classes de détection:")
detection_counts = {}
detection_counts['plum'] = sum(classification_counts.values())
detection_counts['non_plum'] = len(os.listdir(f'{data_dir}/non_plum_images/non_plum'))

plt.figure(figsize=(8, 5))
plt.bar(detection_counts.keys(), detection_counts.values())
plt.xlabel('Classe')
plt.ylabel('Nombre d\'images')
plt.title('Distribution des classes de détection')
plt.tight_layout()
plt.show()

### Visualisation d'exemples d'images

In [None]:
# Visualiser des exemples d'images pour chaque classe
def visualize_examples(data_dir, class_names, num_examples=3):
    plt.figure(figsize=(15, len(class_names) * 3))
    for i, class_name in enumerate(class_names):
        class_path = os.path.join(data_dir, class_name)
        if os.path.isdir(class_path):
            images = os.listdir(class_path)[:num_examples]  # Prendre 3 exemples
            
            for j, img_name in enumerate(images):
                img_path = os.path.join(class_path, img_name)
                img = Image.open(img_path).convert('RGB')
                
                plt.subplot(len(class_names), num_examples, i*num_examples + j + 1)
                plt.imshow(img)
                plt.title(f"{class_name}")
                plt.axis('off')
    
    plt.tight_layout()
    plt.show()

# Visualiser des exemples d'images de prunes
print("Exemples d'images de prunes par classe:")
visualize_examples(f'{data_dir}/african_plums_dataset', classification_class_names)

# Visualiser des exemples d'images qui ne sont pas des prunes
print("\nExemples d'images qui ne sont pas des prunes:")
visualize_examples(f'{data_dir}/non_plum_images', ['non_plum'], num_examples=5)

## 4. Définition des classes et fonctions pour le prétraitement des données

In [None]:
# Définir les transformations pour les images
def get_train_transforms(img_size=224):
    return transforms.Compose([
        transforms.Resize((img_size, img_size)),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

def get_val_transforms(img_size=224):
    return transforms.Compose([
        transforms.Resize((img_size, img_size)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])

# Classe de dataset pour les images de prunes
class PlumDataset(Dataset):
    def __init__(self, root_dir, class_names, transform=None):
        self.root_dir = root_dir
        self.class_names = class_names
        self.transform = transform
        self.image_paths = []
        self.labels = []
        
        # Collecter les chemins d'images et les labels
        for class_idx, class_name in enumerate(class_names):
            class_dir = os.path.join(root_dir, class_name)
            if os.path.isdir(class_dir):
                for img_name in os.listdir(class_dir):
                    if img_name.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif')):
                        self.image_paths.append(os.path.join(class_dir, img_name))
                        self.labels.append(class_idx)
    
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('RGB')
        label = self.labels[idx]
        
        if self.transform:
            image = self.transform(image)
        
        return image, label

# Classe de dataset pour les images qui ne sont pas des prunes
class NonPlumDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = []
        
        # Collecter les chemins d'images
        for img_name in os.listdir(root_dir):
            if img_name.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif')):
                self.image_paths.append(os.path.join(root_dir, img_name))
    
    def __len__(self):
        return len(self.image_paths)
    
    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert('RGB')
        label = 1  # 1 = non_plum
        
        if self.transform:
            image = self.transform(image)
        
        return image, label

# Fonction pour charger et préparer les données pour les deux étapes
def load_and_prepare_two_stage_data(plum_data_dir, non_plum_data_dir, batch_size=32, img_size=224, val_split=0.15, test_split=0.15, num_workers=4):
    # Transformations
    train_transform = get_train_transforms(img_size)
    val_transform = get_val_transforms(img_size)
    
    # 1. Préparer les données pour la détection
    # Dataset pour les prunes (classe 0)
    plum_dataset = PlumDataset(
        root_dir=plum_data_dir,
        class_names=classification_class_names,
        transform=val_transform  # Utiliser val_transform pour éviter l'augmentation
    )
    
    # Assigner label 0 (plum) à toutes les images de prunes
    for i in range(len(plum_dataset.labels)):
        plum_dataset.labels[i] = 0
    
    # Dataset pour les non-prunes (classe 1)
    non_plum_dataset = NonPlumDataset(
        root_dir=os.path.join(non_plum_data_dir, 'non_plum'),
        transform=val_transform  # Utiliser val_transform pour éviter l'augmentation
    )
    
    # Équilibrer les datasets
    min_size = min(len(plum_dataset), len(non_plum_dataset))
    plum_indices = list(range(len(plum_dataset)))
    random.shuffle(plum_indices)
    plum_indices = plum_indices[:min_size]
    
    non_plum_indices = list(range(len(non_plum_dataset)))
    random.shuffle(non_plum_indices)
    non_plum_indices = non_plum_indices[:min_size]
    
    # Créer des sous-ensembles équilibrés
    from torch.utils.data import Subset
    balanced_plum_dataset = Subset(plum_dataset, plum_indices)
    balanced_non_plum_dataset = Subset(non_plum_dataset, non_plum_indices)
    
    # Combiner les datasets pour la détection
    from torch.utils.data import ConcatDataset
    detection_dataset = ConcatDataset([balanced_plum_dataset, balanced_non_plum_dataset])
    
    # Diviser en train, val, test
    detection_dataset_size = len(detection_dataset)
    detection_val_size = int(detection_dataset_size * val_split)
    detection_test_size = int(detection_dataset_size * test_split)
    detection_train_size = detection_dataset_size - detection_val_size - detection_test_size
    
    detection_train_dataset, detection_val_dataset, detection_test_dataset = random_split(
        detection_dataset, 
        [detection_train_size, detection_val_size, detection_test_size],
        generator=torch.Generator().manual_seed(seed)
    )
    
    # Appliquer les transformations d'entraînement au dataset d'entraînement
    detection_train_dataset = TransformDataset(detection_train_dataset, train_transform)
    
    # Créer les dataloaders pour la détection
    detection_train_loader = DataLoader(detection_train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    detection_val_loader = DataLoader(detection_val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)
    detection_test_loader = DataLoader(detection_test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)
    
    # 2. Préparer les données pour la classification
    # Dataset pour la classification des prunes
    classification_dataset = PlumDataset(
        root_dir=plum_data_dir,
        class_names=classification_class_names,
        transform=val_transform  # Utiliser val_transform pour éviter l'augmentation
    )
    
    # Diviser en train, val, test
    classification_dataset_size = len(classification_dataset)
    classification_val_size = int(classification_dataset_size * val_split)
    classification_test_size = int(classification_dataset_size * test_split)
    classification_train_size = classification_dataset_size - classification_val_size - classification_test_size
    
    classification_train_dataset, classification_val_dataset, classification_test_dataset = random_split(
        classification_dataset, 
        [classification_train_size, classification_val_size, classification_test_size],
        generator=torch.Generator().manual_seed(seed)
    )
    
    # Appliquer les transformations d'entraînement au dataset d'entraînement
    classification_train_dataset = TransformDataset(classification_train_dataset, train_transform)
    
    # Créer les dataloaders pour la classification
    classification_train_loader = DataLoader(classification_train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
    classification_val_loader = DataLoader(classification_val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)
    classification_test_loader = DataLoader(classification_test_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers)
    
    return (
        (detection_train_loader, detection_val_loader, detection_test_loader, detection_class_names),
        (classification_train_loader, classification_val_loader, classification_test_loader, classification_class_names)
    )

# Classe pour appliquer des transformations à un dataset existant
class TransformDataset(Dataset):
    def __init__(self, dataset, transform=None):
        self.dataset = dataset
        self.transform = transform
    
    def __len__(self):
        return len(self.dataset)
    
    def __getitem__(self, idx):
        image, label = self.dataset[idx]
        
        # Si l'image est déjà un tenseur, la convertir en PIL Image
        if isinstance(image, torch.Tensor):
            # Dénormaliser si nécessaire
            if image.shape[0] == 3:  # Si c'est une image RGB
                mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
                std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)
                image = image * std + mean
                image = image.clamp(0, 1)
            
            # Convertir en PIL Image
            image = transforms.ToPILImage()(image)
        
        if self.transform:
            image = self.transform(image)
        
        return image, label

# Fonction pour visualiser un batch d'images
def visualize_batch(dataloader, class_names):
    # Obtenir un batch
    images, labels = next(iter(dataloader))
    
    # Convertir les images pour l'affichage
    images_np = images.numpy()
    images_np = np.transpose(images_np, (0, 2, 3, 1))
    
    # Dénormaliser
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    images_np = images_np * std + mean
    images_np = np.clip(images_np, 0, 1)
    
    # Afficher les images
    plt.figure(figsize=(15, 8))
    for i in range(min(16, len(images))):
        plt.subplot(4, 4, i+1)
        plt.imshow(images_np[i])
        plt.title(class_names[labels[i]])
        plt.axis('off')
    
    plt.tight_layout()
    plt.show()

## 5. Chargement et préparation des données pour les deux étapes

In [None]:
# Charger et préparer les données pour les deux étapes
print("Chargement des données pour les deux étapes...")
(detection_train_loader, detection_val_loader, detection_test_loader, detection_class_names), \
(classification_train_loader, classification_val_loader, classification_test_loader, classification_class_names) = \
    load_and_prepare_two_stage_data(
        plum_data_dir=f'{data_dir}/african_plums_dataset', 
        non_plum_data_dir=f'{data_dir}/non_plum_images',
        batch_size=32, 
        img_size=224,
        num_workers=2
    )

print(f"Classes de détection: {detection_class_names}")
print(f"Classes de classification: {classification_class_names}")

# Visualiser un batch pour chaque étape
print("\nVisualisation d'un batch d'images pour la détection:")
visualize_batch(detection_train_loader, detection_class_names)

print("\nVisualisation d'un batch d'images pour la classification:")
visualize_batch(classification_train_loader, classification_class_names)

## 6. Définition de l'architecture du modèle

In [None]:
# Définir l'architecture du modèle
class PlumClassifier(nn.Module):
    """Modèle de classification des prunes."""
    def __init__(self, num_classes=6, base_model='resnet18', pretrained=True, dropout_rate=0.5):
        super(PlumClassifier, self).__init__()
        
        self.base_model_name = base_model
        self.num_classes = num_classes
        self.dropout_rate = dropout_rate
        
        # Sélectionner le modèle de base
        if base_model == 'resnet18':
            from torchvision.models import resnet18
            self.base_model = resnet18(pretrained=pretrained)
            num_features = self.base_model.fc.in_features
            self.base_model.fc = nn.Identity()  # Retirer la dernière couche
            
        elif base_model == 'resnet50':
            from torchvision.models import resnet50
            self.base_model = resnet50(pretrained=pretrained)
            num_features = self.base_model.fc.in_features
            self.base_model.fc = nn.Identity()
            
        elif base_model == 'mobilenet_v2':
            from torchvision.models import mobilenet_v2
            self.base_model = mobilenet_v2(pretrained=pretrained)
            num_features = self.base_model.classifier[1].in_features
            self.base_model.classifier = nn.Identity()
        
        # Classifier personnalisé avec dropout pour la régularisation
        self.classifier = nn.Sequential(
            nn.Dropout(dropout_rate),
            nn.Linear(num_features, 512),
            nn.ReLU(),
            nn.Dropout(dropout_rate/2),
            nn.Linear(512, 128),
            nn.ReLU(),
            nn.Linear(128, num_classes)
        )
        
    def forward(self, x):
        features = self.base_model(x)
        return self.classifier(features)
    
    def get_model_info(self):
        return {
            "base_model": self.base_model_name,
            "num_classes": self.num_classes,
            "dropout_rate": self.dropout_rate
        }

class LightweightPlumClassifier(nn.Module):
    """Version légère du classificateur de prunes."""
    def __init__(self, num_classes=6, pretrained=True):
        super(LightweightPlumClassifier, self).__init__()
        
        self.num_classes = num_classes
        
        # Utiliser MobileNetV2 qui est plus léger
        from torchvision.models import mobilenet_v2
        self.base_model = mobilenet_v2(pretrained=pretrained)
        num_features = self.base_model.classifier[1].in_features
        self.base_model.classifier = nn.Sequential(
            nn.Dropout(0.2),
            nn.Linear(num_features, num_classes)
        )
        
    def forward(self, x):
        return self.base_model(x)
    
    def get_model_info(self):
        return {
            "base_model": "mobilenet_v2_lightweight",
            "num_classes": self.num_classes,
            "dropout_rate": 0.2
        }

class TwoStageModel:
    """Modèle en deux étapes pour la détection et la classification des prunes."""
    def __init__(self, detection_model, classification_model, detection_threshold=0.7):
        self.detection_model = detection_model
        self.classification_model = classification_model
        self.detection_threshold = detection_threshold
        
    def predict(self, image_tensor, device):
        """Prédit si l'image contient une prune et, si oui, son état."""
        # Déplacer l'image sur le device
        image_tensor = image_tensor.to(device)
        
        # Étape 1 : Détection de prune
        self.detection_model.eval()
        with torch.no_grad():
            detection_outputs = self.detection_model(image_tensor)
            detection_probs = torch.nn.functional.softmax(detection_outputs, dim=1)[0]
            
            # Classe 0 = prune, Classe 1 = non-prune
            is_plum = detection_probs[0] > self.detection_threshold
            
        # Si ce n'est pas une prune, retourner le résultat
        if not is_plum:
            return False, "non_plum", detection_probs.cpu().numpy()
        
        # Étape 2 : Classification de l'état de la prune
        self.classification_model.eval()
        with torch.no_grad():
            classification_outputs = self.classification_model(image_tensor)
            classification_probs = torch.nn.functional.softmax(classification_outputs, dim=1)[0]
            _, predicted_idx = torch.max(classification_outputs, 1)
            
        return True, predicted_idx.item(), classification_probs.cpu().numpy()
    
    def get_model_info(self):
        detection_info = self.detection_model.get_model_info()
        classification_info = self.classification_model.get_model_info()
        
        return {
            "detection_model": detection_info,
            "classification_model": classification_info,
            "detection_threshold": self.detection_threshold
        }

def get_model(model_name='standard', num_classes=6, base_model='resnet18', pretrained=True):
    """Factory function pour créer un modèle."""
    if model_name == 'standard':
        return PlumClassifier(num_classes=num_classes, base_model=base_model, pretrained=pretrained)
    elif model_name == 'lightweight':
        return LightweightPlumClassifier(num_classes=num_classes, pretrained=pretrained)
    else:
        raise ValueError(f"Modèle '{model_name}' non supporté")

def get_two_stage_model(detection_model_name='lightweight', classification_model_name='standard',
                       detection_base_model='mobilenet_v2', classification_base_model='resnet18',
                       num_detection_classes=2, num_classification_classes=6,
                       pretrained=True, detection_threshold=0.7):
    """Factory function pour créer un modèle en deux étapes."""
    detection_model = get_model(
        model_name=detection_model_name,
        num_classes=num_detection_classes,
        base_model=detection_base_model,
        pretrained=pretrained
    )
    
    classification_model = get_model(
        model_name=classification_model_name,
        num_classes=num_classification_classes,
        base_model=classification_base_model,
        pretrained=pretrained
    )
    
    return TwoStageModel(detection_model, classification_model, detection_threshold)

## 7. Fonctions d'entraînement et d'évaluation

In [None]:
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, 
                device, num_epochs=25, early_stopping_patience=7, save_dir='models', model_name="model"):
    """Entraîne le modèle et sauvegarde le meilleur modèle."""
    # Créer le répertoire de sauvegarde s'il n'existe pas
    os.makedirs(save_dir, exist_ok=True)
    
    # Initialiser les variables
    best_val_loss = float('inf')
    best_val_acc = 0.0
    early_stopping_counter = 0
    
    # Historique pour tracer les courbes
    history = {
        'train_loss': [],
        'val_loss': [],
        'train_acc': [],
        'val_acc': [],
        'lr': []
    }
    
    # Heure de début
    start_time = time.time()
    
    # Boucle d'entraînement
    for epoch in range(num_epochs):
        print(f"Époque {epoch+1}/{num_epochs}")
        print('-' * 10)
        
        # Mode entraînement
        model.train()
        running_loss = 0.0
        running_corrects = 0
        
        # Boucle sur les batches d'entraînement
        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            # Réinitialiser les gradients
            optimizer.zero_grad()
            
            # Forward pass
            with torch.set_grad_enabled(True):
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                loss = criterion(outputs, labels)
                
                # Backward pass et optimisation
                loss.backward()
                optimizer.step()
            
            # Statistiques
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)
        
        # Calculer les métriques d'entraînement
        epoch_train_loss = running_loss / len(train_loader.dataset)
        epoch_train_acc = running_corrects.double() / len(train_loader.dataset)
        
        # Mode évaluation
        model.eval()
        running_loss = 0.0
        running_corrects = 0
        
        # Boucle sur les batches de validation
        for inputs, labels in val_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            # Forward pass
            with torch.no_grad():
                outputs = model(inputs)
                _, preds = torch.max(outputs, 1)
                loss = criterion(outputs, labels)
            
            # Statistiques
            running_loss += loss.item() * inputs.size(0)
            running_corrects += torch.sum(preds == labels.data)
        
        # Calculer les métriques de validation
        epoch_val_loss = running_loss / len(val_loader.dataset)
        epoch_val_acc = running_corrects.double() / len(val_loader.dataset)
        
        # Ajuster le learning rate
        current_lr = optimizer.param_groups[0]['lr']
        scheduler.step(epoch_val_loss)
        
        # Afficher les métriques
        print(f"Train Loss: {epoch_train_loss:.4f} Acc: {epoch_train_acc:.4f}")
        print(f"Val Loss: {epoch_val_loss:.4f} Acc: {epoch_val_acc:.4f}")
        print(f"Learning Rate: {current_lr:.6f}")
        
        # Mettre à jour l'historique
        history['train_loss'].append(epoch_train_loss)
        history['val_loss'].append(epoch_val_loss)
        history['train_acc'].append(epoch_train_acc.item())
        history['val_acc'].append(epoch_val_acc.item())
        history['lr'].append(current_lr)
        
        # Sauvegarder le meilleur modèle selon la perte de validation
        if epoch_val_loss < best_val_loss:
            print(f"Amélioration de la perte de validation de {best_val_loss:.4f} à {epoch_val_loss:.4f}. Sauvegarde du modèle...")
            best_val_loss = epoch_val_loss
            torch.save(model.state_dict(), os.path.join(save_dir, f'{model_name}_best_loss.pth'))
            early_stopping_counter = 0
        else:
            early_stopping_counter += 1
        
        # Sauvegarder le meilleur modèle selon l'accuracy de validation
        if epoch_val_acc > best_val_acc:
            print(f"Amélioration de l'accuracy de validation de {best_val_acc:.4f} à {epoch_val_acc:.4f}. Sauvegarde du modèle...")
            best_val_acc = epoch_val_acc
            torch.save(model.state_dict(), os.path.join(save_dir, f'{model_name}_best_acc.pth'))
        
        # Early stopping
        if early_stopping_counter >= early_stopping_patience:
            print(f"Early stopping après {early_stopping_counter} époques sans amélioration")
            break
        
        print()
    
    # Temps total d'entraînement
    time_elapsed = time.time() - start_time
    print(f"Entraînement terminé en {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s")
    print(f"Meilleure perte de validation: {best_val_loss:.4f}")
    print(f"Meilleure accuracy de validation: {best_val_acc:.4f}")
    
    # Sauvegarder le dernier modèle
    torch.save(model.state_dict(), os.path.join(save_dir, f'{model_name}_last.pth'))
    
    # Sauvegarder l'historique
    with open(os.path.join(save_dir, f'{model_name}_history.json'), 'w') as f:
        json.dump(history, f)
    
    return history

def evaluate_model(model, test_loader, criterion, device, class_names, save_dir='models', model_name="model"):
    """Évalue le modèle sur l'ensemble de test et génère des visualisations."""
    # Créer le répertoire de sauvegarde s'il n'existe pas
    os.makedirs(save_dir, exist_ok=True)
    
    # Mode évaluation
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    
    # Pour la matrice de confusion
    all_preds = []
    all_labels = []
    
    # Boucle sur les batches de test
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        
        # Forward pass
        with torch.no_grad():
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            loss = criterion(outputs, labels)
        
        # Statistiques
        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data)
        
        # Collecter les prédictions et les labels pour la matrice de confusion
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())
    
    # Calculer les métriques
    test_loss = running_loss / len(test_loader.dataset)
    test_acc = running_corrects.double() / len(test_loader.dataset)
    
    print(f"Test Loss: {test_loss:.4f} Acc: {test_acc:.4f}")
    
    # Créer la matrice de confusion
    cm = confusion_matrix(all_labels, all_preds)
    
    # Visualiser la matrice de confusion
    plt.figure(figsize=(10, 8))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=class_names, yticklabels=class_names)
    plt.xlabel('Prédiction')
    plt.ylabel('Vérité')
    plt.title('Matrice de confusion')
    plt.savefig(os.path.join(save_dir, f'{model_name}_confusion_matrix.png'))
    plt.show()
    
    # Générer le rapport de classification
    report = classification_report(all_labels, all_preds, target_names=class_names, output_dict=True)
    
    # Sauvegarder le rapport
    with open(os.path.join(save_dir, f'{model_name}_classification_report.json'), 'w') as f:
        json.dump(report, f, indent=4)
    
    # Visualiser les métriques par classe
    plt.figure(figsize=(12, 6))
    
    # Extraire les métriques par classe
    classes = list(report.keys())[:-3]  # Exclure 'accuracy', 'macro avg', 'weighted avg'
    precision = [report[cls]['precision'] for cls in classes]
    recall = [report[cls]['recall'] for cls in classes]
    f1 = [report[cls]['f1-score'] for cls in classes]
    
    # Créer le graphique
    x = np.arange(len(classes))
    width = 0.25
    
    plt.bar(x - width, precision, width, label='Precision')
    plt.bar(x, recall, width, label='Recall')
    plt.bar(x + width, f1, width, label='F1-score')
    
    plt.xlabel('Classe')
    plt.ylabel('Score')
    plt.title('Métriques par classe')
    plt.xticks(x, classes, rotation=45)
    plt.legend()
    plt.tight_layout()
    plt.savefig(os.path.join(save_dir, f'{model_name}_metrics_by_class.png'))
    plt.show()
    
    return {
        'test_loss': test_loss,
        'test_acc': test_acc.item(),
        'confusion_matrix': cm.tolist(),
        'classification_report': report
    }

def plot_training_history(history, save_dir='models', model_name="model"):
    """Trace les courbes d'entraînement."""
    # Créer le répertoire de sauvegarde s'il n'existe pas
    os.makedirs(save_dir, exist_ok=True)
    
    # Tracer les courbes de perte
    plt.figure(figsize=(12, 4))
    
    # Graphique des pertes
    plt.subplot(1, 2, 1)
    plt.plot(history['train_loss'], label='Train Loss')
    plt.plot(history['val_loss'], label='Validation Loss')
    plt.xlabel('Époque')
    plt.ylabel('Perte')
    plt.legend()
    plt.title('Évolution des pertes')
    
    # Graphique de l'accuracy
    plt.subplot(1, 2, 2)
    plt.plot(history['train_acc'], label='Train Accuracy')
    plt.plot(history['val_acc'], label='Validation Accuracy')
    plt.xlabel('Époque')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.title('Évolution de l\'accuracy')
    
    plt.tight_layout()
    plt.savefig(os.path.join(save_dir, f'{model_name}_training_curves.png'))
    plt.show()
    
    # Tracer l'évolution du learning rate
    plt.figure(figsize=(10, 4))
    plt.plot(history['lr'])
    plt.xlabel('Époque')
    plt.ylabel('Learning Rate')
    plt.title('Évolution du Learning Rate')
    plt.yscale('log')
    plt.savefig(os.path.join(save_dir, f'{model_name}_learning_rate.png'))
    plt.show()

## 8. Entraînement du modèle de détection

In [None]:
# Étape 1: Entraîner le modèle de détection
print("\n=== Entraînement du modèle de détection ===\n")

# Créer le modèle de détection
detection_model = get_model(
    model_name='lightweight', 
    num_classes=len(detection_class_names), 
    base_model='mobilenet_v2', 
    pretrained=True
)

# Déplacer le modèle sur le device
detection_model = detection_model.to(device)

# Définir la fonction de perte et l'optimiseur
detection_criterion = nn.CrossEntropyLoss()
detection_optimizer = optim.Adam(detection_model.parameters(), lr=0.001)

# Scheduler pour ajuster le learning rate
detection_scheduler = ReduceLROnPlateau(detection_optimizer, mode='min', factor=0.1, patience=3, verbose=True)

# Entraîner le modèle de détection
detection_history = train_model(
    detection_model, 
    detection_train_loader, 
    detection_val_loader, 
    detection_criterion, 
    detection_optimizer, 
    detection_scheduler, 
    device, 
    num_epochs=15,  # Réduire pour le notebook
    save_dir=models_dir,
    model_name="detection"
)

# Tracer les courbes d'entraînement
plot_training_history(detection_history, save_dir=models_dir, model_name="detection")

# Charger le meilleur modèle (selon l'accuracy)
detection_model.load_state_dict(torch.load(os.path.join(models_dir, 'detection_best_acc.pth')))

# Évaluer le modèle de détection
detection_metrics = evaluate_model(
    detection_model, 
    detection_test_loader, 
    detection_criterion, 
    device, 
    detection_class_names, 
    save_dir=models_dir,
    model_name="detection"
)

## 9. Entraînement du modèle de classification

In [None]:
# Étape 2: Entraîner le modèle de classification
print("\n=== Entraînement du modèle de classification ===\n")

# Créer le modèle de classification
classification_model = get_model(
    model_name='standard', 
    num_classes=len(classification_class_names), 
    base_model='resnet18', 
    pretrained=True
)

# Déplacer le modèle sur le device
classification_model = classification_model.to(device)

# Définir la fonction de perte et l'optimiseur
classification_criterion = nn.CrossEntropyLoss()
classification_optimizer = optim.Adam(classification_model.parameters(), lr=0.001)

# Scheduler pour ajuster le learning rate
classification_scheduler = ReduceLROnPlateau(classification_optimizer, mode='min', factor=0.1, patience=3, verbose=True)

# Entraîner le modèle de classification
classification_history = train_model(
    classification_model, 
    classification_train_loader, 
    classification_val_loader, 
    classification_criterion, 
    classification_optimizer, 
    classification_scheduler, 
    device, 
    num_epochs=15,  # Réduire pour le notebook
    save_dir=models_dir,
    model_name="classification"
)

# Tracer les courbes d'entraînement
plot_training_history(classification_history, save_dir=models_dir, model_name="classification")

# Charger le meilleur modèle (selon l'accuracy)
classification_model.load_state_dict(torch.load(os.path.join(models_dir, 'classification_best_acc.pth')))

# Évaluer le modèle de classification
classification_metrics = evaluate_model(
    classification_model, 
    classification_test_loader, 
    classification_criterion, 
    device, 
    classification_class_names, 
    save_dir=models_dir,
    model_name="classification"
)

## 10. Création du modèle en deux étapes et sauvegarde des informations

In [None]:
# Créer le modèle en deux étapes
two_stage_model = get_two_stage_model(
    detection_model_name='lightweight',
    classification_model_name='standard',
    detection_base_model='mobilenet_v2',
    classification_base_model='resnet18',
    num_detection_classes=len(detection_class_names),
    num_classification_classes=len(classification_class_names),
    pretrained=False,  # Nous utilisons nos propres poids entraînés
    detection_threshold=0.7
)

# Charger les poids entraînés
two_stage_model.detection_model.load_state_dict(torch.load(os.path.join(models_dir, 'detection_best_acc.pth')))
two_stage_model.classification_model.load_state_dict(torch.load(os.path.join(models_dir, 'classification_best_acc.pth')))

# Sauvegarder les informations du modèle
model_info = {
    "detection_class_names": detection_class_names,
    "classification_class_names": classification_class_names,
    "img_size": 224,
    "detection_metrics": detection_metrics,
    "classification_metrics": classification_metrics,
    "detection_model_info": two_stage_model.detection_model.get_model_info(),
    "classification_model_info": two_stage_model.classification_model.get_model_info(),
    "detection_threshold": two_stage_model.detection_threshold
}

with open(os.path.join(models_dir, 'two_stage_model_info.json'), 'w') as f:
    json.dump(model_info, f, indent=4)

print("\n=== Entraînement terminé ===\n")
print(f"Modèle en deux étapes entraîné et sauvegardé dans {models_dir}")
print(f"Métriques de détection: Accuracy={detection_metrics['test_acc']:.4f}")
print(f"Métriques de classification: Accuracy={classification_metrics['test_acc']:.4f}")

## 11. Test du modèle en deux étapes avec des images

In [None]:
# Fonction pour prétraiter une image
def preprocess_single_image(image_path, transform=None):
    if transform is None:
        transform = get_val_transforms()
    
    if isinstance(image_path, str):
        image = Image.open(image_path).convert('RGB')
    else:
        image = image_path.convert('RGB')
        
    return transform(image).unsqueeze(0)  # Ajouter une dimension de batch

# Fonction pour prédire une image
def predict_image(model, image_path, model_info, device, transform=None):
    # Prétraiter l'image
    if transform is None:
        img_size = model_info.get('img_size', 224)
        transform = get_val_transforms(img_size)
    
    image_tensor = preprocess_single_image(image_path, transform)
    
    # Prédiction
    is_plum, predicted_idx, probs = model.predict(image_tensor, device)
    
    if is_plum:
        # C'est une prune, retourner la classe prédite
        predicted_class = model_info['classification_class_names'][predicted_idx]
        return True, predicted_class, None, probs
    else:
        # Ce n'est pas une prune
        return False, "non_plum", probs, None

# Fonction pour visualiser la prédiction
def visualize_prediction(image_path, is_plum, predicted_class, detection_probs=None, classification_probs=None, model_info=None):
    # Charger l'image
    if isinstance(image_path, str):
        image = Image.open(image_path).convert('RGB')
    else:
        image = image_path.convert('RGB')
    
    # Créer la figure
    plt.figure(figsize=(12, 6))
    
    # Afficher l'image
    plt.subplot(1, 2, 1)
    plt.imshow(image)
    if is_plum:
        plt.title(f"Prédiction : Prune - {predicted_class}")
    else:
        plt.title("Prédiction : Pas une prune")
    plt.axis('off')
    
    # Afficher les probabilités
    plt.subplot(1, 2, 2)
    
    if is_plum and classification_probs is not None and model_info is not None:
        # Afficher les probabilités de classification
        class_names = model_info['classification_class_names']
        y_pos = np.arange(len(class_names))
        plt.barh(y_pos, classification_probs, align='center')
        plt.yticks(y_pos, class_names)
        plt.xlabel('Probabilité')
        plt.title('Probabilités par classe')
    elif not is_plum and detection_probs is not None and model_info is not None:
        # Afficher les probabilités de détection
        class_names = model_info['detection_class_names']
        y_pos = np.arange(len(class_names))
        plt.barh(y_pos, detection_probs, align='center')
        plt.yticks(y_pos, class_names)
        plt.xlabel('Probabilité')
        plt.title('Probabilités de détection')
    
    plt.tight_layout()
    plt.show()
    
    # Afficher les probabilités en pourcentage
    print("\nProbabilités détaillées :")
    if is_plum and classification_probs is not None and model_info is not None:
        for i, (cls, prob) in enumerate(zip(model_info['classification_class_names'], classification_probs)):
            print(f"{cls}: {prob*100:.2f}%")
    elif not is_plum and detection_probs is not None and model_info is not None:
        for i, (cls, prob) in enumerate(zip(model_info['detection_class_names'], detection_probs)):
            print(f"{cls}: {prob*100:.2f}%")

In [None]:
# Tester le modèle avec des images de test
print("\n=== Test du modèle en deux étapes ===\n")

# Sélectionner quelques images de test
test_images = []

# Ajouter des images de prunes
for class_name in classification_class_names:
    class_dir = os.path.join(f'{data_dir}/african_plums_dataset', class_name)
    if os.path.isdir(class_dir):
        images = [f for f in os.listdir(class_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))]
        if images:
            test_images.append(os.path.join(class_dir, images[0]))

# Ajouter des images qui ne sont pas des prunes
non_plum_dir = os.path.join(f'{data_dir}/non_plum_images/non_plum')
if os.path.isdir(non_plum_dir):
    images = [f for f in os.listdir(non_plum_dir) if f.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.gif'))]
    if images:
        test_images.append(os.path.join(non_plum_dir, images[0]))

# Tester chaque image
for image_path in test_images:
    print(f"\nTest de l'image: {os.path.basename(image_path)}")
    
    is_plum, predicted_class, detection_probs, classification_probs = predict_image(
        two_stage_model,
        image_path,
        model_info,
        device
    )
    
    if is_plum:
        print(f"L'image contient une prune de type: {predicted_class}")
    else:
        print("L'image ne contient pas de prune.")
    
    visualize_prediction(
        image_path,
        is_plum,
        predicted_class,
        detection_probs,
        classification_probs,
        model_info
    )

## 12. Téléchargement et test d'images personnalisées

In [None]:
# Fonction pour télécharger et tester une image
def upload_and_predict():
    print("Veuillez télécharger une image :")
    uploaded = files.upload()
    
    for filename in uploaded.keys():
        # Charger l'image
        image = Image.open(io.BytesIO(uploaded[filename]))
        
        # Faire la prédiction
        is_plum, predicted_class, detection_probs, classification_probs = predict_image(
            two_stage_model,
            image,
            model_info,
            device
        )
        
        # Afficher les résultats
        if is_plum:
            print(f"L'image contient une prune de type: {predicted_class}")
        else:
            print("L'image ne contient pas de prune.")
        
        # Visualiser la prédiction
        visualize_prediction(
            image,
            is_plum,
            predicted_class,
            detection_probs,
            classification_probs,
            model_info
        )

# Exécuter la fonction pour télécharger et tester une image
upload_and_predict()

## 13. Résumé et prochaines étapes

Félicitations ! Vous avez maintenant un modèle en deux étapes pour la classification des prunes africaines. Ce modèle peut :
1. Détecter si une image contient une prune ou non
2. Classifier l'état de la prune si elle est détectée

### Ce que vous avez accompli :
- Préparé les données pour les deux étapes
- Défini l'architecture du modèle en deux étapes
- Entraîné les modèles de détection et de classification
- Évalué les performances des modèles
- Testé le modèle complet avec des images

### Prochaines étapes possibles :
- Améliorer les performances en ajustant les hyperparamètres
- Augmenter le dataset avec plus d'images
- Déployer le modèle dans une application Django
- Optimiser le modèle pour des appareils mobiles

N'hésitez pas à adapter ce notebook à vos besoins spécifiques pour le hackathon JCIA 2025 !