In [None]:
# ==============================================================================
# CE NOTEBOOKE EST LA MISE AU PROPRE DU NOTEBOOK :
#
# 4.2-fh-modeling-advanced-image.ipynb
#
# ==============================================================================

**⚠️ Avertissement sur l'Exécution et l'Origine des Fichiers**

L'entraînement de modèles de Deep Learning sur des images est une tâche très gourmande en ressources, qui requiert une puissance de calcul GPU importante. Pour cette raison, ce notebook n'a pas été ré-exécuté dans cet environnement.

Par conséquent, les fichiers de **logits** (`gcvit_logits.pt`, `convnextv2_logits.pt`, etc.) et les modèles générés par ce script proviennent d'une exécution du notebook d'origine réalisée sur un serveur de calcul plus puissant et payant.

PARTIE 1 : Entraînement des Modèles de Computer Vision par lots

In [None]:
# ==============================================================================
# NOTEBOOK D'ENTRAÎNEMENT PAR LOTS DE MODÈLES DE COMPUTER VISION
#
# Objectif : Préparer les données images, puis entraîner, évaluer et
# sauvegarder une suite de modèles de classification d'images (Maxvit,
# ConvNeXt, EfficientNet, etc.) de manière automatisée.
#
# Le code est structuré pour être modulaire et facilement extensible à de
# nouvelles architectures de modèles.
#
# ==============================================================================

In [None]:
!pip install timm -q

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m57.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m47.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m51.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m211.5/211.5 MB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.3/56.3 MB[0m [31m10.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m127.9/127.9 MB[0m [31m7.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [None]:
# Montage de Google Drive
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# ====== 1. PRÉPARATION DE L'ENVIRONNEMENT ======

# Importation des bibliothèques
import os
import json
import shutil
import cv2
import torch
import joblib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
import timm

# PyTorch et Torchvision
from torch import nn
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torchvision import transforms
from torchvision.transforms.autoaugment import TrivialAugmentWide

# Scikit-learn
from sklearn.metrics import f1_score
from sklearn.utils.class_weight import compute_class_weight

In [None]:
# ====== 2. PRÉPARATION DES DONNÉES IMAGES ======

# Cette section décompresse les images depuis un fichier .zip sur Drive
# vers l'espace local de Colab pour un accès plus rapide.

# Définition des chemins
zip_path = "/content/drive/MyDrive/Colab Notebooks/images/image_train.zip"
temp_path = "/content/temp_extraction/"
final_image_path = "/content/images/"

# Création des dossiers nécessaires
os.makedirs(temp_path, exist_ok=True)
os.makedirs(final_image_path, exist_ok=True)

# Décompression du fichier zip dans le dossier temporaire
print("Décompression des images...")
!unzip -q "{zip_path}" -d "{temp_path}"
print(f"✅ Zip extrait dans : {temp_path}")

def find_image_folder(base_dir):
    """
    Parcourt un répertoire pour trouver le premier sous-dossier contenant des fichiers images.

    Cette fonction est utile après une décompression où les images peuvent se trouver
    dans un sous-dossier dont le nom n'est pas connu à l'avance (par exemple,
    `/content/temp_extraction/image_train/`). Elle identifie le chemin
    correct vers les images en se basant sur la présence de fichiers avec des
    extensions courantes (.jpg, .png, etc.).

    Args:
        base_dir (str): Le chemin du répertoire de base où commencer la recherche.

    Returns:
        str | None: Le chemin complet du premier sous-dossier contenant des images,
                    ou None si aucun dossier d'images n'est trouvé.
    """
    for root, _, files in os.walk(base_dir):
        if any(f.lower().endswith(('.jpg', '.jpeg', '.png', '.webp')) for f in files):
            return root
    return None

# Déplacement des images vers le dossier final
image_root = find_image_folder(temp_path)
if image_root:
    moved_count = 0
    for fname in os.listdir(image_root):
        shutil.move(os.path.join(image_root, fname), os.path.join(final_image_path, fname))
        moved_count += 1
    print(f"✅ {moved_count} images déplacées vers : {final_image_path}")
else:
    print("❌ Aucune image trouvée après décompression.")

# Nettoyage du dossier temporaire
shutil.rmtree(temp_path)
print("🧹 Dossier temporaire supprimé.")

In [None]:
# ====== 3. PRÉPARATION DU FICHIER CSV ======

# Le CSV original contient des chemins qui ne sont pas valides dans Colab.
# Cette section met à jour les chemins des images pour pointer vers le
# dossier local `/content/images/` et sauvegarde ce nouveau CSV.

print("\nMise à jour des chemins dans le fichier CSV...")
df_original = pd.read_csv("/content/drive/MyDrive/Colab Notebooks/data_rakuten/X_train_fr_final.csv")
df_collab = df_original.copy()
df_collab['image_path'] = df_collab['image_path'].apply(lambda p: os.path.join(final_image_path, os.path.basename(p)))
collab_csv_path = '/content/X_train_fr_final_colab.csv'
df_collab.to_csv(collab_csv_path, index=False)

# Sauvegarde de la version modifiée sur Drive pour une utilisation future
drive_csv_path = '/content/drive/MyDrive/Colab Notebooks/data_rakuten/X_train_fr_final_colab.csv'
shutil.copy(collab_csv_path, drive_csv_path)
print(f"✅ CSV prêt et sauvegardé sur Drive : {drive_csv_path}")

In [None]:
# ====== 4. CONFIGURATION CENTRALE ET CHARGEMENT DES DONNÉES ======

# Configuration partagée pour tous les modèles
CONFIG = {
    "max_epochs": 30,
    "patience": 7,
    "batch_size": 32,
    "lr": 1e-4,
    "weight_decay": 1e-2,
}

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
BASE_PATH = "/content/drive/MyDrive/Colab Notebooks/data_rakuten"
DRIVE_OUTPUT_PATH = os.path.join(BASE_PATH, "models_image")
os.makedirs(DRIVE_OUTPUT_PATH, exist_ok=True)

# Chargement des données et des indices de séparation
df = pd.read_csv(collab_csv_path)
paths, labels = df["image_path"].values, df["label"].values
with open(os.path.join(BASE_PATH, "val_indices.json"), "r") as f:
    val_idx = np.array(json.load(f), dtype=int)
train_idx = np.setdiff1d(np.arange(len(df)), val_idx)

train_paths, train_labels = paths[train_idx], labels[train_idx]
val_paths, val_labels = paths[val_idx], labels[val_idx]
num_classes = len(np.unique(labels))

print(f"\nConfiguration chargée. Utilisation de l'appareil : {DEVICE}")
print(f"Nombre de classes : {num_classes}")

In [None]:
# ====== 5. DÉFINITIONS COMMUNES (TRANSFORMS, DATASET) ======

# Ces définitions sont partagées par tous les modèles pour assurer la cohérence.

# Transformations d'images (avec augmentation pour l'entraînement)
IMAGENET_MEAN, IMAGENET_STD = [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]
train_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    TrivialAugmentWide(),
    transforms.ToTensor(),
    transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD),
])
val_transforms = transforms.Compose([
    transforms.ToPILImage(),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(IMAGENET_MEAN, IMAGENET_STD),
])

class ImageDataset(Dataset):
    """
    Dataset PyTorch personnalisé pour charger des images et leurs labels à la volée.

    Cette classe gère le chargement des images à partir de leurs chemins sur le disque,
    la conversion de leur format de couleur (de BGR, utilisé par OpenCV, à RGB),
    et l'application de transformations (par exemple, redimensionnement, augmentation
    de données, normalisation) avant de les retourner pour l'entraînement ou l'évaluation.

    Attributes:
        paths (np.ndarray): Un tableau des chemins d'accès aux fichiers images.
        labels (np.ndarray): Un tableau des labels numériques correspondants.
        transform (callable): La pipeline de transformations à appliquer à chaque image.
    """
    def __init__(self, paths, labels, transform):
        """
        Initialise le Dataset.

        Args:
            paths (np.ndarray): La liste des chemins complets vers les images.
            labels (np.ndarray): La liste des labels associés à chaque image.
            transform (callable): Une fonction ou composition de transformations
                                  (de torchvision.transforms) à appliquer à chaque image.
        """
        self.paths = paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """
        Retourne le nombre total d'échantillons dans le dataset.

        Returns:
            int: La taille totale du dataset.
        """
        return len(self.labels)

    def __getitem__(self, i):
        """
        Récupère et retourne un échantillon (image transformée et son label) à un index donné.

        Args:
            i (int): L'index de l'échantillon à récupérer.

        Returns:
            tuple: Un tuple contenant deux éléments :
                   - torch.Tensor: L'image transformée sous forme de tenseur.
                   - int: Le label numérique de l'image.
        """
        img = cv2.imread(self.paths[i])
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        return self.transform(img), int(self.labels[i])

# Création des datasets
train_dataset = ImageDataset(train_paths, train_labels, train_transforms)
val_dataset = ImageDataset(val_paths, val_labels, val_transforms)
full_dataset = ImageDataset(paths, labels, val_transforms)

In [None]:
# ====== 6. BOUCLE D'ENTRAÎNEMENT PRINCIPALE ======

# Cette boucle itère sur une liste de noms de modèles `timm`.
# Pour chaque modèle, elle exécute le cycle complet : entraînement, validation,
# sauvegarde du meilleur état, et génération des logits.

models_to_train = [
    "maxvit_base_tf_224.in21k",
    "convnextv2_base.in22k",
    "tf_efficientnetv2_l.in21k",
    "coatnet_rmlp_2_rw_224.sw_in12k_ft_in1k",
    "gcvit_base.in1k"
]

performance_history = {}

for model_timm_name in models_to_train:
    model_name = model_timm_name.split('.')[0]
    print("\n" + "="*50)
    print(f" DÉBUT DE L'ENTRAÎNEMENT POUR : {model_name.upper()} ")
    print("="*50)

    # --- Initialisation spécifique au modèle ---
    model = timm.create_model(model_timm_name, pretrained=True, num_classes=num_classes).to(DEVICE)

    # --- Dataloader avec rééchantillonnage pour gérer le déséquilibre ---
    counts = np.bincount(train_labels, minlength=num_classes)
    sample_weights = 1.0 / counts[train_labels]
    sampler = WeightedRandomSampler(sample_weights, len(sample_weights), replacement=True)
    train_loader = DataLoader(train_dataset, batch_size=CONFIG["batch_size"], sampler=sampler, num_workers=2)
    val_loader = DataLoader(val_dataset, batch_size=CONFIG["batch_size"], shuffle=False, num_workers=2)

    # --- Optimiseur, Scheduler et Fonction de Perte ---
    optimizer = torch.optim.AdamW(model.parameters(), lr=CONFIG["lr"], weight_decay=CONFIG["weight_decay"])
    scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=CONFIG["max_epochs"])
    class_weights = torch.tensor(compute_class_weight("balanced", classes=np.unique(train_labels), y=train_labels), dtype=torch.float, device=DEVICE)
    loss_fn = nn.CrossEntropyLoss(weight=class_weights, label_smoothing=0.1)

    # --- Boucle d'entraînement et de validation ---
    best_f1, wait = 0.0, 0
    best_state = model.state_dict()
    val_f1s = []

    for ep in range(1, CONFIG["max_epochs"] + 1):
        model.train()
        total_loss = 0
        pbar = tqdm(train_loader, desc=f"{model_name} Ep{ep}")
        for x, y in pbar:
            x, y = x.to(DEVICE), y.to(DEVICE)
            optimizer.zero_grad()
            out = model(x)
            loss = loss_fn(out, y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        scheduler.step()

        # --- Validation ---
        model.eval()
        y_true, y_pred = [], []
        with torch.no_grad():
            for x, y in val_loader:
                x = x.to(DEVICE)
                logits = model(x)
                y_pred.extend(logits.argmax(1).cpu().numpy())
                y_true.extend(y.numpy())
        f1 = f1_score(y_true, y_pred, average="weighted")
        val_f1s.append(f1)
        print(f"  -> TrainLoss: {total_loss/len(train_loader):.4f} | Val F1_weighted: {f1:.4f}")

        if f1 > best_f1:
            print(f"   => Nouveau meilleur score F1 ! Sauvegarde du modèle.")
            best_f1, best_state, wait = f1, model.state_dict(), 0
        else:
            wait += 1
            if wait >= CONFIG["patience"]:
                print(f"⏹️ Arrêt anticipé à l'époque {ep}.")
                break

    performance_history[model_name] = val_f1s

    # --- Sauvegarde du meilleur modèle et des logits ---
    print(f"Sauvegarde du meilleur modèle {model_name} avec F1={best_f1:.4f}")
    model.load_state_dict(best_state)
    torch.save(model.state_dict(), os.path.join(DRIVE_OUTPUT_PATH, f"{model_name}_best.pt"))

    # --- Génération des logits sur le dataset complet ---
    full_loader = DataLoader(full_dataset, batch_size=CONFIG["batch_size"], shuffle=False, num_workers=2)
    all_logits = []
    model.eval()
    with torch.no_grad():
        for x, _ in tqdm(full_loader, desc=f"Génération des logits pour {model_name}"):
            x = x.to(DEVICE)
            all_logits.append(model(x).cpu())
    all_logits = torch.cat(all_logits)
    torch.save(all_logits, os.path.join(BASE_PATH, f"{model_name}_logits.pt"))
    print(f"Logits sauvegardés. Shape: {all_logits.shape}")

In [None]:
# ====== 7. RÉSULTATS GLOBAUX ET VISUALISATION ======

print("\n" + "="*50)
print("             RÉSULTATS DE L'ENTRAÎNEMENT             ")
print("="*50)

# Création d'un DataFrame récapitulatif
summary_data = []
for model_name, f1_scores in performance_history.items():
    summary_data.append({
        "model": model_name,
        "best_f1_weighted": max(f1_scores) if f1_scores else 0
    })
df_summary = pd.DataFrame(summary_data).set_index("model")
df_summary.to_csv(os.path.join(DRIVE_OUTPUT_PATH, "recap_scores_modeles_image.csv"))

print("Scores finaux par modèle :")
print(df_summary)

# Création du graphique comparatif
plt.figure(figsize=(12, 7))
for model_name, f1_scores in performance_history.items():
    plt.plot(range(1, len(f1_scores) + 1), f1_scores, marker='o', linestyle='--', label=model_name)
plt.title("Comparaison des Scores F1 (Validation) par Époque")
plt.xlabel("Époque")
plt.ylabel("F1 Score Pondéré (Weighted)")
plt.xticks(range(1, CONFIG["max_epochs"] + 1))
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()

PARTIE 2 : Fusion et Stacking Avancés des Modèles de Computer Vision

In [None]:
# ==============================================================================
# NOTEBOOK DE FUSION AVANCÉE ET STACKING POUR MODÈLES DE COMPUTER VISION
#
# Objectif : Combiner les prédictions des modèles de computer vision entraînés dans le
# Notebook 3 pour maximiser les performances.
#
# Étapes :
#   1. Fusion Pondérée : Recherche des poids optimaux avec Optuna pour créer
#      un "ensemble" de modèles simple mais performant.
#   2. Stacking : Utilisation des logits comme features pour entraîner des
#      méta-modèles (LGBM, MLP). Le meilleur est sélectionné par validation
#      croisée puis entraîné sur toutes les données.
# ==============================================================================

In [None]:
!pip install optuna lightgbm -q

In [None]:
# Montage de Google Drive pour accéder aux fichiers depuis Colab
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# ====== 1. PRÉPARATION DE L'ENVIRONNEMENT ======

# Importation des bibliothèques
import os
import json
import shutil
import joblib
import numpy as np
import pandas as pd
import torch
import optuna
import lightgbm as lgb

# PyTorch et Scikit-learn
from torch import nn
from torch.utils.data import Dataset, DataLoader, TensorDataset
from sklearn.metrics import f1_score, classification_report
from sklearn.model_selection import StratifiedKFold

In [None]:
# ====== 2. MISE EN PLACE DE L'ESPACE DE TRAVAIL ======

# Définition des chemins
drive_path = "/content/drive/MyDrive/Colab Notebooks/data_rakuten"
local_path = '/content/data_stacking_vision/'
os.makedirs(local_path, exist_ok=True)

print("--- Copie des logits et des fichiers de configuration depuis Google Drive ---")

# Liste des logits générés par le notebook précédent
logits_files = [
    "maxvit_base_tf_224_logits.pt",
    "convnextv2_base_logits.pt",
    "tf_efficientnetv2_l_logits.pt",
    "coatnet_rmlp_2_rw_224_logits.pt",
    "gcvit_base_logits.pt",
    "true_labels_final.pt",
    "val_indices.json",
    "label_mapping_final.json"
]

# Copie des fichiers en local pour un accès rapide
for filename in logits_files:
    src_file = os.path.join(drive_path, filename)
    dst_file = os.path.join(local_path, filename)
    if os.path.exists(src_file):
        shutil.copy(src_file, dst_file)
        print(f"✅ Copié : {filename}")
    else:
        print(f"❌ Manquant : {src_file}")

In [None]:
# ====== 3. CHARGEMENT DES DONNÉES ======

# Chargement des logits de chaque modèle dans un dictionnaire
all_logits = {}
model_names = [f.replace('_logits.pt', '') for f in logits_files if f.endswith('_logits.pt') and 'true_labels' not in f]
for name in model_names:
    all_logits[name] = torch.load(os.path.join(local_path, f"{name}_logits.pt")).cpu().numpy()

# Chargement des étiquettes et indices
with open(os.path.join(local_path, "val_indices.json"), "r") as f:
    val_idx = np.array(json.load(f))
with open(os.path.join(local_path, "label_mapping_final.json"), "r") as f:
    label_mapping = json.load(f)

labels_full = torch.load(os.path.join(local_path, "true_labels_final.pt")).numpy()
class_names = [label_mapping[str(i)]["label_name"] for i in range(len(label_mapping))]

# Séparation des étiquettes et des logits en ensembles d'entraînement et de validation
all_indices = np.arange(len(labels_full))
train_idx = np.setdiff1d(all_indices, val_idx)
y_train, y_val = labels_full[train_idx], labels_full[val_idx]
logits_train = {name: log[train_idx] for name, log in all_logits.items()}
logits_val = {name: log[val_idx] for name, log in all_logits.items()}

print(f"\nDonnées de fusion et de stacking prêtes.")

In [None]:
# ====== 4. FUSION PONDÉRÉE (ENSEMBLING) ======

# Nous utilisons Optuna pour trouver la meilleure pondération à appliquer aux
# probabilités de chaque modèle afin de maximiser le score F1.

def objective_fusion(trial, logits_dict, y_true):
    """
    Fonction objective pour Optuna visant à maximiser le score F1 pondéré.

    Cette fonction est appelée par Optuna à chaque essai. Elle suggère un ensemble de
    poids pour chaque modèle, normalise ces poids pour qu'ils somment à 1, puis
    calcule les prédictions fusionnées en effectuant une moyenne pondérée des
    probabilités (obtenues via softmax sur les logits). Le score F1 qui en résulte
    est retourné pour être optimisé.

    Args:
        trial (optuna.trial.Trial): L'objet d'essai d'Optuna qui gère la suggestion
                                    des hyperparamètres (ici, les poids).
        logits_dict (dict): Dictionnaire où les clés sont les noms des modèles et
                            les valeurs sont les logits (np.ndarray) correspondants.
        y_true (np.ndarray): Le tableau des véritables labels pour l'évaluation.

    Returns:
        float: Le score F1 pondéré calculé pour la combinaison de poids actuelle.
               Optuna cherchera à maximiser cette valeur.
    """
    weights = [trial.suggest_float(name, 0.0, 1.0) for name in logits_dict.keys()]
    s = sum(weights)
    if s < 1e-6: return 0.0 # Retourne un mauvais score pour éviter la division par 0

    # Normalisation des poids
    weights = np.array(weights) / s

    # Calcul des probabilités pondérées
    fused_probs = np.zeros_like(list(logits_dict.values())[0])
    for i, name in enumerate(logits_dict.keys()):
        probs = torch.softmax(torch.tensor(logits_dict[name]), dim=1).numpy()
        fused_probs += weights[i] * probs

    preds = np.argmax(fused_probs, axis=1)
    return f1_score(y_true, preds, average="weighted")

print("\n--- Recherche des poids de fusion optimaux avec Optuna ---")
study = optuna.create_study(direction="maximize")
study.optimize(lambda trial: objective_fusion(trial, logits_val, y_val), n_trials=150)

# Évaluation avec les meilleurs poids trouvés
best_weights = np.array([study.best_params[name] for name in model_names])
best_weights /= sum(best_weights)
fused_probs_val = sum(best_weights[i] * torch.softmax(torch.tensor(logits_val[name]), dim=1).numpy() for i, name in enumerate(model_names))
preds_fusion = np.argmax(fused_probs_val, axis=1)
f1_fusion = f1_score(y_val, preds_fusion, average='weighted')

print(f"\nMeilleur F1-Score (Fusion Pondérée) : {f1_fusion:.5f}")
print("Poids optimaux :")
for name, weight in zip(model_names, best_weights):
    print(f"  - {name}: {weight:.4f}")

In [None]:
# ====== 5. STACKING AVEC UN MÉTA-MODÈLE ======

# Le stacking utilise les logits des modèles de base comme "features" pour
# entraîner un méta-modèle. Cela permet de capturer des relations plus complexes
# entre les prédictions des modèles de base.

# --- 5.1. Préparation des données pour le Stacking ---
X_train_stack = np.concatenate(list(logits_train.values()), axis=1)
X_val_stack = np.concatenate(list(logits_val.values()), axis=1)

print(f"\nShape des features de Stacking (Train): {X_train_stack.shape}")
print(f"Shape des features de Stacking (Val):   {X_val_stack.shape}")


# --- 5.2. Sélection du meilleur méta-modèle par validation croisée ---
# Nous évaluons LightGBM et un MLP pour voir lequel est le plus performant
# sur nos données de stacking.

def train_lgbm(X_train, y_train, X_val, y_val):
    """
    Entraîne et évalue un classifieur LightGBM.

    Cette fonction utilitaire initialise un modèle LGBMClassifier, l'entraîne sur
    les données d'entraînement fournies et retourne le score F1 pondéré calculé
    sur l'ensemble de validation.

    Args:
        X_train (np.ndarray): Features d'entraînement (logits concaténés).
        y_train (np.ndarray): Labels d'entraînement.
        X_val (np.ndarray): Features de validation.
        y_val (np.ndarray): Labels de validation.

    Returns:
        float: Le score F1 pondéré du modèle sur les données de validation.
    """
    model = lgb.LGBMClassifier(random_state=42)
    model.fit(X_train, y_train, eval_set=[(X_val, y_val)])
    preds = model.predict(X_val)
    return f1_score(y_val, preds, average="weighted")

print("\n--- Évaluation des méta-modèles par validation croisée (5-fold) ---")
skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
lgbm_scores = []

for fold, (train_fold_idx, val_fold_idx) in enumerate(skf.split(X_train_stack, y_train)):
    X_train_fold, X_val_fold = X_train_stack[train_fold_idx], X_train_stack[val_fold_idx]
    y_train_fold, y_val_fold = y_train[train_fold_idx], y_train[val_fold_idx]

    score = train_lgbm(X_train_fold, y_train_fold, X_val_fold, y_val_fold)
    lgbm_scores.append(score)
    print(f"Fold {fold+1}/5 - F1 Score LGBM : {score:.4f}")

avg_lgbm_score = np.mean(lgbm_scores)
print(f"\nScore F1 moyen pour LightGBM (CV) : {avg_lgbm_score:.5f}")
# (Note: une CV similaire pourrait être faite pour un MLP, mais LGBM est souvent
# un excellent point de départ pour les données tabulaires comme les logits).


# --- 5.3. Entraînement et Évaluation du Méta-Modèle Final ---
print("\n--- Entraînement du méta-modèle final (LGBM) sur toutes les données d'entraînement ---")

final_lgbm_model = lgb.LGBMClassifier(
    objective="multiclass",
    num_class=len(class_names),
    metric="multi_logloss",
    random_state=42
)
final_lgbm_model.fit(X_train_stack, y_train, eval_set=[(X_val_stack, y_val)])
preds_lgbm_final = final_lgbm_model.predict(X_val_stack)

In [None]:
# ====== 6. RÉSULTATS FINAUX ET SAUVEGARDE ======

print("\n" + "="*50)
print("              RÉSULTATS FINAUX              ")
print("="*50)

print(f"Score F1 (Fusion Pondérée Simple) : {f1_fusion:.5f}")

print("\n--- Rapport de Classification Final (Stacking avec LightGBM) ---")
print(classification_report(y_val, preds_lgbm_final, target_names=class_names, digits=4))

# Sauvegarde du méta-modèle final, qui est l'artefact le plus précieux.
meta_model_path = os.path.join(drive_path, 'meta_model_vision_lgbm.pkl')
joblib.dump(final_lgbm_model, meta_model_path)
print(f"\n✅ Méta-modèle final sauvegardé sur Drive : {meta_model_path}")

In [None]:
!pip freeze > "/content/drive/MyDrive/Colab Notebooks/requirements_Images.txt"