In [None]:
import os
import kagglehub
import shutil
from collections import defaultdict
from pathlib import Path
import cv2
import numpy as np
from keras.utils import img_to_array, to_categorical
from sklearn.preprocessing import LabelBinarizer
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Tuple, List

# ====================================================================
#                      CONFIGURATIONS ET CONSTANTES
# ====================================================================
EPOCHS             = 25
INIT_LR            = 1e-3
BS                 = 16
WIDTH              = 256
HEIGHT             = 256
DEFAULT_IMAGE_SIZE = tuple((WIDTH, HEIGHT))
DEPTH              = 3
DATASET_REF        = "emmarex/plantdisease"
WORKING_DIR_NAME   = "plantdisease_working" # R√©pertoire modifiable pour la copie
ALLOWED_EXTS       = {".jpg", ".jpeg", ".png", ".bmp", ".JPG", ".JPEG", ".PNG", ".BMP"}
MAX_PER_CLASS      = 100000  # Limite le nombre d'images par classe

# ====================================================================
#                           FONCTIONS UTILITAIRES
# ====================================================================


def _find_class_dirs(root: Path) -> List[Path]:
    """Trouve tous les dossiers qui contiennent des images et les traite comme des classes."""
    class_dirs = []
    for p in root.iterdir():
        if p.is_dir() and any(f.is_file() and f.suffix.lower() in ALLOWED_EXTS for f in p.iterdir()):
            class_dirs.append(p)
    return sorted(class_dirs, key=lambda x: x.name)

def consolidate_dataset(read_only_path: str) -> str:
    """
    Copie le dataset vers un r√©pertoire modifiable et supprime les chemins dupliqu√©s.
    Ceci est essentiel pour r√©soudre l'erreur "Read-only file system".
    """
    print(f"\n## üîÑ 2. Copie vers R√©pertoire Modifiable et Nettoyage")

    # D√©finition et cr√©ation du chemin de travail (modifiable)
    working_base_dir = os.path.join(os.getcwd(), WORKING_DIR_NAME)
    if os.path.exists(working_base_dir):
        shutil.rmtree(working_base_dir)

    # Copie l'int√©gralit√© du dataset
    print(f"   -> Copie de {read_only_path} vers {working_base_dir}...")
    shutil.copytree(read_only_path, working_base_dir)
    base_dir = working_base_dir

    # Nettoyage des doublons de chemin (ex: plantdisease_working/plantvillage/PlantVillage)
    target_root_name = "PlantVillage"
    target_root = os.path.join(base_dir, target_root_name)
    duplicated_root = os.path.join(base_dir, "plantvillage", target_root_name)

    if os.path.exists(duplicated_root):
        print(f"üí° Dossiers dupliqu√©s trouv√©s. Consolidation en cours...")
        try:
            for class_name in os.listdir(duplicated_root):
                source_folder = os.path.join(duplicated_root, class_name)
                target_folder = os.path.join(target_root, class_name)

                if os.path.isdir(source_folder):
                    # Assurer que le dossier cible existe
                    if not os.path.exists(target_folder):
                        os.makedirs(target_folder)

                    # D√©placer les fichiers (shutil.move est maintenant possible)
                    for item_name in os.listdir(source_folder):
                        source_item = os.path.join(source_folder, item_name)
                        target_item = os.path.join(target_folder, item_name)

                        if os.path.isfile(source_item) and item_name.endswith(tuple(ALLOWED_EXTS)):
                             shutil.move(source_item, target_item)

            # Supprimer la structure de dossiers vide et dupliqu√©e
            shutil.rmtree(os.path.join(base_dir, "plantvillage"))
            print("‚úÖ Consolidation termin√©e. ")

        except Exception as e:
            print(f"‚ùå Erreur lors du d√©placement des fichiers : {e}")

    return os.path.join(base_dir, 'PlantVillage') # Retourne le chemin du r√©pertoire des classes


def plot_class_distribution_global(class_counts: dict):
    """Affiche la distribution des images par classe avant le split."""
    if not class_counts: return
    labels = list(class_counts.keys())
    counts = list(class_counts.values())

    plt.style.use('ggplot')
    plt.figure(figsize=(14, 8))
    ax = sns.barplot(x=labels, y=counts, palette="viridis")
    ax.set_title(f"Distribution des images par classe (Limit√©e √† {MAX_PER_CLASS})", fontsize=16)
    ax.set_xlabel("Classe", fontsize=12)
    ax.set_ylabel("Nombre d'images", fontsize=12)
    plt.xticks(rotation=45, ha='right')
    for index, value in enumerate(counts):
        plt.text(index, value + (max(counts)*0.01), str(value), ha='center')
    plt.tight_layout()
    plt.show()

def print_class_distribution(labels: np.ndarray, name: str = "Dataset") -> Tuple[np.ndarray, np.ndarray]:
    """Affiche la distribution des classes en pourcentages."""
    unique, counts = np.unique(labels, return_counts=True)
    print(f"\nClass distribution in '{name}':")
    for cls, count in zip(unique, counts):
        print(f"  {cls} : {count} ({count / len(labels) * 100:.2f}%)")
    return unique, counts

# ====================================================================
#                           LOGIQUE PRINCIPALE
# ====================================================================

# --- √âTAPE 1: T√©l√©chargement et Nettoyage du Dataset ---

print(f"## ‚¨áÔ∏è 1. T√©l√©chargement du Dataset ({DATASET_REF})")
try:
    read_only_path = kagglehub.dataset_download(DATASET_REF)
    print(f"‚úÖ Dataset trouv√© (Lecture Seule) √† : {read_only_path}")

    # Nettoyage et copie vers le r√©pertoire modifiable
    # DIRECTORY_ROOT sera le chemin du dossier PlantVillage dans le r√©pertoire de travail
    DIRECTORY_ROOT = consolidate_dataset(read_only_path)
except Exception as e:
    print(f"‚ùå Erreur critique : {e}")
    DIRECTORY_ROOT = None




In [None]:
import os
from pathlib import Path
import numpy as np
import pandas as pd
import cv2
from typing import Tuple, List
import pickle

# Imports Keras/TensorFlow corrig√©s (n√©cessaires pour to_categorical)
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.utils import Sequence # N√©cessaire pour la classe DataGenerator (m√™me si non utilis√©e dans ce bloc)

# Imports Sklearn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelBinarizer

# Imports Visualisation
import matplotlib.pyplot as plt
import seaborn as sns


# ====================================================================
#                      CONFIGURATIONS ET CONSTANTES
# ====================================================================
BS                 = 16
WIDTH              = 256
HEIGHT             = 256
DEFAULT_IMAGE_SIZE = (WIDTH, HEIGHT)

# **CHEMIN D'ACC√àS CL√â : V√âRIFIEZ ET CORRIGEZ CE CHEMIN**
DIRECTORY_ROOT     = '/content/plantdisease_working/PlantVillage'

ALLOWED_EXTS       = {".jpg", ".jpeg", ".png", ".bmp"}
MAX_PER_CLASS      = 100000
SAVE_DIR           = '/content/saved_artefacts'


# ====================================================================
#                           FONCTIONS UTILITAIRES
# ====================================================================

def _find_class_dirs(root: Path) -> List[Path]:
    """Trouve tous les dossiers qui contiennent des images et les traite comme des classes."""
    class_dirs = []
    for p in root.iterdir():
        if p.is_dir() and any(f.is_file() and f.suffix.lower() in ALLOWED_EXTS for f in p.iterdir()):
            class_dirs.append(p)
    return sorted(class_dirs, key=lambda x: x.name)

def plot_class_distribution_global(class_counts: dict):
    if not class_counts: return
    labels = list(class_counts.keys())
    counts = list(class_counts.values())
    plt.style.use('ggplot')
    plt.figure(figsize=(14, 8))
    ax = sns.barplot(x=labels, y=counts, palette="viridis")
    ax.set_title(f"Distribution des images par classe (Limit√©e √† {MAX_PER_CLASS})", fontsize=16)
    ax.set_xlabel("Classe", fontsize=12)
    ax.set_ylabel("Nombre d'images", fontsize=12)
    plt.xticks(rotation=45, ha='right')
    for index, value in enumerate(counts):
        plt.text(index, value + (max(counts)*0.01), str(value), ha='center')
    plt.tight_layout()
    plt.show()

def print_class_distribution(labels: np.ndarray, name: str = "Dataset") -> Tuple[np.ndarray, np.ndarray]:
    unique, counts = np.unique(labels, return_counts=True)
    print(f"\nClass distribution in '{name}':")
    for cls, count in zip(unique, counts):
        print(f"  {cls} : {count} ({count / len(labels) * 100:.2f}%)")
    return unique, counts

def plot_class_distribution_split(global_labels: List[str], train_labels: List[str], test_labels: List[str]):
    unique_classes = sorted(list(set(global_labels)))
    def get_counts(labels):
        counts = {cls: 0 for cls in unique_classes}
        for label in labels:
            if label in counts: counts[label] += 1
        return np.array(list(counts.values()))

    global_counts = get_counts(global_labels)
    train_counts = get_counts(train_labels)
    test_counts = get_counts(test_labels)

    fig, ax = plt.subplots(figsize=(14, 7))
    global_props = global_counts / global_counts.sum()
    train_props = train_counts / train_counts.sum()
    test_props = test_counts / test_counts.sum()
    x = np.arange(len(unique_classes))
    width = 0.25

    ax.bar(x - width, global_props, width, label='Global (proportion)', alpha=0.8, color='grey')
    ax.bar(x, train_props, width, label='Train (proportion)', alpha=0.8, color='green')
    ax.bar(x + width, test_props, width, label='Test (proportion)', alpha=0.8, color='red')

    ax.set_xlabel("Classes")
    ax.set_ylabel("Proportion")
    ax.set_title("Proportions des classes (Global vs Train vs Test) ")
    ax.set_xticks(x)
    ax.set_xticklabels(unique_classes, rotation=45, ha='right')
    ax.legend()
    plt.tight_layout()
    plt.show()

# ====================================================================
#                           LOGIQUE PRINCIPALE
# ====================================================================

# --- √âTAPE 1: Collecte des Chemins ---
all_paths, all_labels = [], []
per_class_counts = {}

print(f"## 1. üìÇ Collecte des Chemins de Fichiers (Faible RAM)")

if not os.path.exists(DIRECTORY_ROOT):
    print(f"‚ùå ERREUR: Le chemin sp√©cifi√© ({DIRECTORY_ROOT}) n'existe pas. Arr√™t.")
    exit()
else:
    root = Path(DIRECTORY_ROOT)
    class_dirs = _find_class_dirs(root)

    if not class_dirs:
        print(f"‚ùå ERREUR: Aucun dossier de classe trouv√© dans {root}. Arr√™t.")
        exit()

    for cdir in class_dirs:
        class_name = cdir.name
        file_paths = []
        for ext in ALLOWED_EXTS:
            file_paths.extend(cdir.glob(f"*{ext}"))
            file_paths.extend(cdir.glob(f"*{ext.upper()}"))

        files_to_process = sorted(list(set(file_paths)))[:MAX_PER_CLASS]
        per_class_counts[class_name] = len(files_to_process)

        for fp in files_to_process:
            all_paths.append(str(fp))
            all_labels.append(class_name)

    print(f"[INFO] Total chemins collect√©s : {len(all_paths)}")

    # Cr√©er le DataFrame (cl√© pour la stratification)
    df = pd.DataFrame({'path': all_paths, 'label': all_labels})

    # Visualisation de la distribution globale
    print("\n## üìà 2. Visualisation de la Distribution Globale")
    plot_class_distribution_global(per_class_counts)

# --------------------------------------------------------------------
# --- √âTAPE 2: Division Stratifi√©e des CHEMINS et Encodage des Labels ---
# --------------------------------------------------------------------

print("\n## 3. ‚úÇÔ∏è Division Stratifi√©e des Chemins et Encodage des Labels")

# 1. Pr√©paration de l'encodeur de labels
label_binarizer = LabelBinarizer()
labels_encoded_all = label_binarizer.fit_transform(df['label'])
N_CLASSES = len(label_binarizer.classes_)
CLASS_NAMES = label_binarizer.classes_

# 2. Division Stratifi√©e des lignes du DataFrame
train_df, test_df = train_test_split(
    df,
    test_size=0.20,
    random_state=42,
    stratify=df['label'] # Stratifie sur les labels string
)

# 3. Encoder les labels pour les g√©n√©rateurs ou le chargement futur
train_labels_encoded = label_binarizer.transform(train_df['label'])
test_labels_encoded = label_binarizer.transform(test_df['label'])

# 4. Conversion en One-Hot Encoding si multiclasses (√† stocker si n√©cessaire pour un futur chargement)
if N_CLASSES > 2:
    train_labels_encoded = to_categorical(train_labels_encoded)
    test_labels_encoded = to_categorical(test_labels_encoded)


print(f"[INFO] Train samples : {len(train_df)}")
print(f"[INFO] Test samples  : {len(test_df)}")
print(f"[INFO] Nombre de classes : {N_CLASSES}")

# V√©rification de la stratification et trac√©
print_class_distribution(np.array(all_labels), name="Total Dataset")
plot_class_distribution_split(all_labels, train_df['label'].tolist(), test_df['label'].tolist())

# --------------------------------------------------------------------
# --- √âTAPE 3: Sauvegarde des Artefacts (DataFrames et LabelBinarizer) ---
# --------------------------------------------------------------------

os.makedirs(SAVE_DIR, exist_ok=True)
print(f"\n## 4. üíæ Sauvegarde des Artefacts de Donn√©es")

# 1. Sauvegarde des DataFrames (chemins et labels non encod√©s)
# Cela permet de recr√©er les g√©n√©rateurs/jeux de donn√©es exactement plus tard.
train_df.to_csv(os.path.join(SAVE_DIR, 'train_df.csv'), index=False)
test_df.to_csv(os.path.join(SAVE_DIR, 'test_df.csv'), index=False)
print("‚úÖ DataFrames (chemins/labels) sauvegard√©s.")

# 2. Sauvegarde du LabelBinarizer
BINARIZER_PATH = os.path.join(SAVE_DIR, 'label_binarizer.pkl')
try:
    with open(BINARIZER_PATH, 'wb') as f:
        pickle.dump(label_binarizer, f)
    print(f"‚úÖ LabelBinarizer sauvegard√© avec succ√®s √† : {BINARIZER_PATH}")
except Exception as e:
    print(f"‚ùå Erreur lors de la sauvegarde du LabelBinarizer : {e}")

# 3. Sauvegarde des Noms des Classes
CLASSES_PATH = os.path.join(SAVE_DIR, 'class_names.pkl')
try:
    with open(CLASSES_PATH, 'wb') as f:
        pickle.dump(CLASS_NAMES, f)
    print(f"‚úÖ Noms des classes sauvegard√©s avec succ√®s √† : {CLASSES_PATH}")
except Exception as e:
    print(f"‚ùå Erreur lors de la sauvegarde des noms des classes : {e}")

print("\n[FIN] Pipeline de pr√©paration des donn√©es termin√© et artefacts sauvegard√©s.")

In [None]:
import os
import pickle
import pandas as pd
import numpy as np
import cv2
import tensorflow as tf
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.utils import Sequence
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
from tensorflow.keras.optimizers import Adam
from sklearn.preprocessing import LabelBinarizer


# ====================================================================
#                      CONSTANTES ET R√âPLICATION
# ====================================================================
BS                 = 64
WIDTH              = 256
HEIGHT             = 256
DEFAULT_IMAGE_SIZE = (WIDTH, HEIGHT)
INIT_LR            = 1e-3
EPOCHS             = 25
SAVE_DIR           = '/content/saved_artefacts'

# ====================================================================
#           CLASSE G√âN√âRATEUR (Gestion de la RAM)
# ====================================================================
class DataGenerator(Sequence):
    # ... (Votre classe DataGenerator reste inchang√©e) ...
    """G√©n√©rateur de donn√©es Keras pour charger les images par lots."""

    def __init__(self, df, labels_encoded, batch_size=BS, dim=DEFAULT_IMAGE_SIZE, n_channels=3,
                 n_classes=None, shuffle=True):
        self.df = df
        self.labels_encoded = np.array(labels_encoded)
        self.batch_size = batch_size
        self.dim = dim
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        'Nombre de lots par √©poque'
        return int(np.floor(len(self.df) / self.batch_size))

    def __getitem__(self, index):
        'G√©n√®re un lot de donn√©es'
        indices = self.indices[index*self.batch_size:(index+1)*self.batch_size]
        list_paths_temp = self.df['path'].iloc[indices].tolist()
        labels_temp = self.labels_encoded[indices]
        X, y = self.__data_generation(list_paths_temp, labels_temp)
        return X, y

    def on_epoch_end(self):
        'M√©lange les indices apr√®s chaque √©poque'
        self.indices = np.arange(len(self.df))
        if self.shuffle == True:
            np.random.shuffle(self.indices)

    def __data_generation(self, list_paths_temp, labels_temp):
        'Charge et pr√©traite les images d un lot'

        X = np.empty((self.batch_size, *self.dim, self.n_channels), dtype=np.float32)
        y = labels_temp

        for i, path in enumerate(list_paths_temp):
            image = cv2.imread(path)
            if image is not None:
                image = cv2.resize(image, self.dim)
                X[i,] = image.astype('float32') / 255.0

        return X, y


# ====================================================================
#                      CHARGEMENT DES ARTEFACTS
# ====================================================================

print("## 1. üîÑ Chargement des Artefacts de Donn√©es")
# ... (Chargement des DataFrames et du Binarizer - inchang√©)
try:
    loaded_train_df = pd.read_csv(os.path.join(SAVE_DIR, 'train_df.csv'))
    loaded_test_df = pd.read_csv(os.path.join(SAVE_DIR, 'test_df.csv'))
    print("‚úÖ DataFrames Train et Test charg√©s avec succ√®s.")
except FileNotFoundError:
    print(f"‚ùå ERREUR: Fichiers CSV non trouv√©s dans {SAVE_DIR}. Veuillez ex√©cuter le script de pr√©paration avant.")
    exit()

BINARIZER_PATH = os.path.join(SAVE_DIR, 'label_binarizer.pkl')
try:
    with open(BINARIZER_PATH, 'rb') as f:
        loaded_binarizer = pickle.load(f)
    N_CLASSES = len(loaded_binarizer.classes_)
    print(f"‚úÖ LabelBinarizer charg√©. {N_CLASSES} classes d√©tect√©es.")
except:
    print(f"‚ùå ERREUR: Le fichier label_binarizer.pkl n'a pas √©t√© trouv√©.")
    exit()


# ====================================================================
#                      PR√âPARATION POUR L'ENTRA√éNEMENT
# ====================================================================

print("\n## 2. Pr√©paration et Nettoyage des Labels")

# 1. Encoder les labels en utilisant le binariseur charg√©
train_labels_encoded = loaded_binarizer.transform(loaded_train_df['label'])
test_labels_encoded = loaded_binarizer.transform(loaded_test_df['label'])

# --- CORRECTION CLEF ICI ---
# Le LabelBinarizer produit des donn√©es de forme (N, 1) pour N_CLASSES=2, ou (N, N_CLASSES) pour N_CLASSES>2
# Si les labels ont √©t√© mal sauvegard√©s, ils peuvent avoir des dimensions superflues.
# On utilise .squeeze() pour enlever toute dimension de taille 1 (comme le 1 dans (N, 1))

# Nettoyage des labels (enl√®ve les dimensions 1 superflues)
train_labels_encoded = np.squeeze(train_labels_encoded)
test_labels_encoded = np.squeeze(test_labels_encoded)

print(f"[DEBUG] Forme de 'train_labels_encoded' apr√®s binarisation/nettoyage : {train_labels_encoded.shape}")

# 2. Appliquer le One-Hot Encoding pour garantir la forme [N, N_CLASSES]
# C'est essentiel pour 'categorical_crossentropy' et l'architecture CNN standard.
if N_CLASSES >= 2:
    # to_categorical prend l'array 1D d'indices ou l'array 2D (N, 1) et le convertit en [N, N_CLASSES]
    # Nous devons reconstruire les labels √† partir des indices pour to_categorical

    # Si les labels sont d√©j√† [N, N_CLASSES], ne rien faire.
    # Si les labels sont [N,], on utilise to_categorical.
    if train_labels_encoded.ndim == 1:
        # Re-transformer les labels string en indices entiers (0, 1, 2, ...)
        label_map = {name: i for i, name in enumerate(loaded_binarizer.classes_)}
        train_indices = loaded_train_df['label'].map(label_map).values
        test_indices = loaded_test_df['label'].map(label_map).values

        # Appliquer to_categorical sur les indices
        train_labels_encoded = to_categorical(train_indices, num_classes=N_CLASSES)
        test_labels_encoded = to_categorical(test_indices, num_classes=N_CLASSES)

    elif train_labels_encoded.ndim == 2 and train_labels_encoded.shape[1] == 1:
        # Cas binaire [N, 1], le convertir en [N, 2]
        train_labels_encoded = to_categorical(train_labels_encoded, num_classes=N_CLASSES)
        test_labels_encoded = to_categorical(test_labels_encoded, num_classes=N_CLASSES)


# V√âRIFICATION FINALE DE LA FORME
if train_labels_encoded.ndim != 2 or train_labels_encoded.shape[1] != N_CLASSES:
    print(f"‚ùå ERREUR CRITIQUE DE FORME: Les labels ont la forme {train_labels_encoded.shape} (Rang {train_labels_encoded.ndim}), mais le mod√®le attend le rang 2 avec {N_CLASSES} colonnes.")
    print("V√©rifiez l'√©tape de sauvegarde initiale.")
    exit()

print(f"[INFO] Forme finale des labels TRAIN : {train_labels_encoded.shape}")
print(f"[INFO] Forme finale des labels TEST : {test_labels_encoded.shape}")

# 3. Cr√©er les g√©n√©rateurs Keras
train_generator = DataGenerator(loaded_train_df, train_labels_encoded, batch_size=BS, n_classes=N_CLASSES, shuffle=True)
test_generator = DataGenerator(loaded_test_df, test_labels_encoded, batch_size=BS, n_classes=N_CLASSES, shuffle=False)
print(f"[INFO] Train samples : {len(loaded_train_df)}")
print(f"[INFO] Test samples : {len(loaded_test_df)}")


# ====================================================================
#                      D√âFINITION ET ENTRA√éNEMENT
# ====================================================================

print("\n## 3. D√©finition et Entra√Ænement du Mod√®le üß†")
# ... (D√©finition du mod√®le CNN, compilation et entra√Ænement - inchang√©)
model = Sequential([
    Conv2D(32, (3, 3), activation='relu', input_shape=(WIDTH, HEIGHT, 3)),
    MaxPooling2D(2, 2),
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D(2, 2),
    Conv2D(128, (3, 3), activation='relu'),
    MaxPooling2D(2, 2),
    Flatten(),
    Dense(512, activation='relu'),
    Dense(N_CLASSES, activation='softmax')
])

model.compile(optimizer=Adam(learning_rate=INIT_LR),
              loss='categorical_crossentropy',
              metrics=['accuracy'])

model.summary()

print("\n[INFO] Lancement de l'entra√Ænement sur les g√©n√©rateurs...")

history = model.fit(
    train_generator,
    steps_per_epoch=len(train_generator),
    epochs=EPOCHS,
    validation_data=test_generator,
    validation_steps=len(test_generator)
)

print("\n[FIN] Entra√Ænement termin√© !")

# ====================================================================
#                      SAUVEGARDE DU MOD√àLE
# ====================================================================
MODEL_PATH = os.path.join(SAVE_DIR, 'plant_disease_model.keras')
model.save(MODEL_PATH)
print(f"‚úÖ Mod√®le Keras sauvegard√© avec succ√®s √† : {MODEL_PATH}")