In [1]:
from preprocess import ProductClassificationPipeline, PipelineConfig
import pandas as pd
import numpy as np
import os
import yaml
import matplotlib.pyplot as plt
import shap
import torch
import xgboost as xgb
from sklearn.metrics import accuracy_score, f1_score

# URL temporaire pour les données texte (à remplacer par la vôtre)
url_texte = "https://drive.google.com/file/d/1fakeurl123456789/view?usp=sharing"

def load_model_configs():
    """
    Charge les configurations des modèles depuis le YAML
    
    Returns:
        dict: Configurations des modèles avec leurs paramètres
    """
    try:
        config_path = os.path.join('data', 'models', 'model_configs.yaml')
        with open(config_path, 'r', encoding='utf-8') as f:
            configs = yaml.safe_load(f)
        
        # Vérification de la présence de tous les modèles attendus
        expected_models = {'xgboost', 'neural_net', 'SVM'}
        missing_models = expected_models - set(configs.keys())
        
        if missing_models:
            print(f"Attention : modèles manquants dans la configuration : {missing_models}")
            
        return configs
        
    except Exception as e:
        print(f"Erreur lors du chargement des configurations : {str(e)}")
        return {}

def load_models(pipeline, model_names=['xgboost', 'neural_net', 'SVM']):
    """
    Charge les modèles spécifiés
    
    Args:
        pipeline: Pipeline de classification
        model_names: Liste des noms de modèles à charger
        
    Returns:
        dict: Dictionnaire contenant les modèles chargés
    """
    models = {}
    
    for model_name in model_names:
        try:
            pipeline.load_model(model_name)
            models[model_name] = pipeline.model
            print(f"Modèle {model_name} chargé avec succès")
        except Exception as e:
            print(f"Erreur lors du chargement du modèle {model_name}: {str(e)}")
    
    return models

def fuse_predictions(text_probs, image_probs, strategy='mean'):
    """
    Fusionne les prédictions texte et image selon différentes stratégies
    
    Args:
        text_probs: Probabilités du modèle texte
        image_probs: Probabilités du modèle image
        strategy: Stratégie de fusion ('mean', 'product', 'max', 'weighted', 'confidence_weighted')
        
    Returns:
        np.array: Probabilités fusionnées
    """
    if strategy == 'mean':
        return (text_probs + image_probs) / 2
    elif strategy == 'product':
        return text_probs * image_probs
    elif strategy == 'max':
        return np.maximum(text_probs, image_probs)
    elif strategy == 'weighted':
        # Pondération fixe 60% texte, 40% image (à ajuster selon tests)
        return text_probs * 0.6 + image_probs * 0.4
    elif strategy == 'confidence_weighted':
        # Pondération dynamique basée sur la confiance de chaque modèle
        return fuse_predictions_confidence_weighted(text_probs, image_probs)
    else:
        raise ValueError(f"Stratégie de fusion {strategy} non supportée")

def fuse_predictions_confidence_weighted(text_probs, image_probs):
    """
    Fusionne les prédictions en utilisant la confiance de chaque modèle comme poids dynamique
    
    Args:
        text_probs: Probabilités du modèle texte
        image_probs: Probabilités du modèle image
        
    Returns:
        np.array: Probabilités fusionnées
    """
    # 1. Obtenir la confiance de chaque modèle (probabilité max pour chaque exemple)
    text_confidence = np.max(text_probs, axis=1, keepdims=True)
    image_confidence = np.max(image_probs, axis=1, keepdims=True)
    
    # 2. Normaliser les confidences pour obtenir les poids
    total_confidence = text_confidence + image_confidence
    text_weights = text_confidence / total_confidence
    image_weights = image_confidence / total_confidence
    
    # 3. Appliquer les poids spécifiques à chaque exemple
    weighted_text_probs = text_probs * text_weights
    weighted_image_probs = image_probs * image_weights
    
    # 4. Combiner les probabilités pondérées
    fused_probs = weighted_text_probs + weighted_image_probs
    
    return fused_probs

def evaluate_multimodal_combinations(pipeline, text_model, image_models, X_text_test, X_image_test, y_test, idx_to_category):
    """
    Évalue différentes combinaisons de modèles multimodaux en utilisant les méthodes du pipeline
    
    Args:
        pipeline: Instance de ProductClassificationPipeline
        text_model: Modèle de texte
        image_models: Dictionnaire de modèles d'image
        X_text_test: Données de test texte
        X_image_test: Données de test image
        y_test: Vérité terrain
        idx_to_category: Mapping des indices vers les catégories
    """
    results = {}
    fusion_strategies = ['mean', 'product', 'weighted', 'max', 'confidence_weighted']
    print(f"Dimensions originales: X_text_test={len(X_text_test)}, X_image_test={len(X_image_test)}")
    
    category_to_idx = {code: idx for idx, code in idx_to_category.items()}
    
    # Vérifier que les dimensions correspondent
    if len(X_text_test) != len(X_image_test) or len(X_text_test) != len(y_test):
        print("AVERTISSEMENT: Les dimensions ne correspondent toujours pas!")
        min_size = min(len(X_text_test), len(X_image_test), len(y_test))
        text_input = X_text_test.iloc[:min_size] if hasattr(X_text_test, 'iloc') else X_text_test[:min_size]
        X_image_test = X_image_test[:min_size]
        y_test = y_test[:min_size]
    else:
        text_input = X_text_test
    
    # Obtenir les prédictions texte en utilisant le modèle SVM
    print("Génération des prédictions texte...")
    try:
        if isinstance(text_input, pd.Series):
            text_data = text_input.values.tolist()
        else:
            text_data = text_input
            
        # Prédictions directes avec le modèle de texte
        if hasattr(text_model, 'predict'):
            text_preds = text_model.predict(text_data)
            text_probs = text_model.predict_proba(text_data)
        else:
            # Si c'est un pipeline personnalisé
            pipeline.model = text_model
            text_preds, text_probs = pipeline.predict(text_data)
    except Exception as e:
        print(f"Erreur lors de la prédiction texte: {e}")
        # Fallback pour les erreurs
        num_classes = len(idx_to_category)
        text_preds = np.random.randint(0, num_classes, size=len(text_input))
        text_probs = np.random.random((len(text_input), num_classes))
        text_probs = text_probs / text_probs.sum(axis=1, keepdims=True)
    
    # Convertir les prédictions en indices si nécessaire
    if isinstance(text_preds[0], (int, np.integer)) and max(text_preds) < 100:
        # C'est déjà des indices
        text_preds_idx = text_preds
    else:
        # Convertir les codes de catégorie en indices
        text_preds_idx = np.array([category_to_idx.get(code, 0) for code in text_preds])
    
    # Même chose pour y_test
    if isinstance(y_test[0], (int, np.integer)) and max(y_test) < 100:
        y_test_idx = y_test
    else:
        y_test_idx = np.array([category_to_idx.get(code, 0) for code in y_test])
    
    # Calcul des métriques texte
    text_accuracy = accuracy_score(y_test_idx, text_preds_idx)
    text_f1 = f1_score(y_test_idx, text_preds_idx, average='weighted')
    
    text_results = {
        "accuracy": text_accuracy,
        "weighted_f1": text_f1
    }
    results["text_only"] = text_results
    print(f"Texte uniquement: accuracy={text_results['accuracy']:.4f}, f1={text_results['weighted_f1']:.4f}")
    
    # Pour chaque modèle image
    for model_name, image_model in image_models.items():
        print(f"Génération des prédictions pour {model_name}...")
        
        try:
            pipeline.model = image_model
            image_preds, image_probs = pipeline.predict(X_image_test)
            
            # Résultats image uniquement
            image_results = {
                "accuracy": accuracy_score(y_test_idx, image_preds),
                "weighted_f1": f1_score(y_test_idx, image_preds, average='weighted')
            }
            results[f"{model_name}_only"] = image_results
            print(f"{model_name} uniquement: accuracy={image_results['accuracy']:.4f}, f1={image_results['weighted_f1']:.4f}")
            
            # Essayer différentes stratégies de fusion
            for strategy in fusion_strategies:
                print(f"Évaluation de la fusion '{strategy}' avec {model_name}...")
                                
                fused_probs = fuse_predictions(text_probs, image_probs, strategy)
                fused_preds = np.argmax(fused_probs, axis=1)
                
                fused_results = {
                    "accuracy": accuracy_score(y_test_idx, fused_preds),
                    "weighted_f1": f1_score(y_test_idx, fused_preds, average='weighted')
                }
                
                results[f"{model_name}_{strategy}"] = fused_results
                print(f"Fusion {model_name} ({strategy}): accuracy={fused_results['accuracy']:.4f}, f1={fused_results['weighted_f1']:.4f}")
        except Exception as e:
            print(f"Erreur avec le modèle {model_name}: {e}")
            results[f"{model_name}_error"] = {"error": str(e)}
    
    # Conversion en DataFrame
    results_df = pd.DataFrame.from_dict(results, orient='index')
    return results_df

def explain_text_model(text_model, X_text_sample, feature_names=None):
    """
    Explique le modèle texte avec SHAP
    
    Args:
        text_model: Modèle SVM texte
        X_text_sample: Échantillon de données texte
        feature_names: Noms des caractéristiques
        
    Returns:
        tuple: Valeurs SHAP et explainer
    """
    print("Génération des explications pour le modèle texte...")
    
    # Créer un explainer SHAP pour le modèle SVM
    explainer = shap.KernelExplainer(text_model.predict_proba, X_text_sample[:50])
    
    # Calculer les valeurs SHAP
    shap_values = explainer.shap_values(X_text_sample[:100])
    
    # Visualiser les résultats
    plt.figure(figsize=(12, 8))
    shap.summary_plot(shap_values, X_text_sample[:100], feature_names=feature_names, show=False)
    plt.title("Explication SHAP pour le modèle texte")
    plt.tight_layout()
    plt.savefig("data/reports/shap_text_model.png")
    
    return shap_values, explainer

def explain_image_model(model_type, model, X_image_sample, feature_names=None):
    """
    Explique le modèle image avec SHAP
    
    Args:
        model_type: Type de modèle ('xgboost' ou 'neural_net')
        model: Modèle d'image
        X_image_sample: Échantillon de données image
        feature_names: Noms des caractéristiques
        
    Returns:
        tuple: Valeurs SHAP et explainer
    """
    print(f"Génération des explications pour le modèle {model_type}...")
    
    # Différentes approches selon le type de modèle
    if model_type == 'xgboost':
        # Utiliser TreeExplainer pour XGBoost (plus efficace)
        explainer = shap.TreeExplainer(model)
        shap_values = explainer.shap_values(X_image_sample[:100])
        
        # Visualisations
        plt.figure(figsize=(12, 8))
        shap.summary_plot(shap_values, X_image_sample[:100], feature_names=feature_names, show=False)
        plt.title(f"Explication SHAP pour le modèle {model_type}")
        plt.tight_layout()
        plt.savefig(f"data/reports/shap_{model_type}.png")
        
        # Feature importance globale
        plt.figure(figsize=(12, 8))
        plt.barh(range(min(20, len(model.feature_importances_))), 
                 sorted(model.feature_importances_, reverse=True)[:20])
        plt.title(f"Top 20 caractéristiques importantes pour {model_type}")
        plt.tight_layout()
        plt.savefig(f"data/reports/{model_type}_feature_importance.png")
        
    elif model_type == 'neural_net':
        # Pour MLP on utilise le DeepExplainer
        try:
            # Conversion en tenseurs PyTorch
            background = torch.FloatTensor(X_image_sample[:50]).to(model.device)
            test_tensor = torch.FloatTensor(X_image_sample[:100]).to(model.device)
            
            # Utiliser GradientExplainer à la place de DeepExplainer si disponible
            explainer = shap.GradientExplainer(model, background)
            shap_values = explainer.shap_values(test_tensor)
            
            # Visualisations
            plt.figure(figsize=(12, 8))
            shap.summary_plot(shap_values, X_image_sample[:100], feature_names=feature_names, show=False)
            plt.title(f"Explication SHAP pour le modèle {model_type}")
            plt.tight_layout()
            plt.savefig(f"data/reports/shap_{model_type}.png")
            
        except Exception as e:
            print(f"Erreur lors de l'explication du modèle neural_net: {str(e)}")
            print("Utilisation du KernelExplainer en secours...")
            
            # Utiliser KernelExplainer comme solution de secours
            def model_predict(x):
                # Convertir les données en tensor PyTorch
                x_tensor = torch.FloatTensor(x).to(model.device)
                # Obtenir les prédictions du modèle
                with torch.no_grad():
                    outputs = model(x_tensor)
                    probs = torch.softmax(outputs, dim=1)
                return probs.cpu().numpy()
            
            explainer = shap.KernelExplainer(model_predict, X_image_sample[:50])
            shap_values = explainer.shap_values(X_image_sample[:100])
            
            # Visualisations
            plt.figure(figsize=(12, 8))
            shap.summary_plot(shap_values, X_image_sample[:100], feature_names=feature_names, show=False)
            plt.title(f"Explication SHAP pour le modèle {model_type} (KernelExplainer)")
            plt.tight_layout()
            plt.savefig(f"data/reports/shap_{model_type}_kernel.png")
    
    return shap_values, explainer

def explain_multimodal(text_model, image_model, model_type, X_text_sample, X_image_sample, 
                      fusion_strategy='mean', class_names=None):
    """
    Explique le modèle multimodal fusionné
    
    Args:
        text_model: Modèle texte
        image_model: Modèle image
        model_type: Type du modèle image
        X_text_sample: Échantillon de données texte
        X_image_sample: Échantillon de données image
        fusion_strategy: Stratégie de fusion
        class_names: Noms des classes
        
    Returns:
        dict: Résultats d'explication
    """
    print(f"Explication du modèle multimodal ({model_type}, fusion: {fusion_strategy})...")
    
    # Créer une fonction pour prédire avec le modèle fusionné
    def fused_predict(x):
        # Supposons que x soit divisé en [features_texte, features_image]
        # Dans un cas réel, vous voudriez peut-être séparer x en deux parties
        text_part = x[:, :X_text_sample.shape[1]]
        image_part = x[:, X_text_sample.shape[1]:]
        
        # Obtenir les prédictions individuelles
        text_probs = text_model.predict_proba(text_part)
        
        if model_type == 'xgboost':
            image_probs = image_model.predict_proba(image_part)
        else:  # neural_net
            image_part_tensor = torch.FloatTensor(image_part).to(image_model.device)
            with torch.no_grad():
                outputs = image_model(image_part_tensor)
                image_probs = torch.softmax(outputs, dim=1).cpu().numpy()
        
        # Fusionner selon la stratégie
        return fuse_predictions(text_probs, image_probs, strategy=fusion_strategy)
    
    # Combiner les échantillons
    combined_sample = np.hstack([X_text_sample[:100], X_image_sample[:100]])
    
    # Créer l'explainer
    explainer = shap.KernelExplainer(fused_predict, combined_sample[:50])
    
    # Obtenir les valeurs SHAP
    shap_values = explainer.shap_values(combined_sample[:100])
    
    # Créer des noms de features pour meilleure lisibilité
    feature_names = [f'text_{i}' for i in range(X_text_sample.shape[1])] + \
                    [f'img_{i}' for i in range(X_image_sample.shape[1])]
    
    # Visualiser les résultats
    plt.figure(figsize=(16, 10))
    shap.summary_plot(shap_values, combined_sample, feature_names=feature_names, show=False)
    plt.title(f"Explication SHAP pour le modèle multimodal ({model_type}, {fusion_strategy})")
    plt.tight_layout()
    plt.savefig(f"data/reports/shap_multimodal_{model_type}_{fusion_strategy}.png")
    
    # Analyser l'importance relative texte vs image
    text_importance = np.mean(np.abs(np.vstack(shap_values))[:, :X_text_sample.shape[1]])
    image_importance = np.mean(np.abs(np.vstack(shap_values))[:, X_text_sample.shape[1]:])
    
    importance_ratio = {
        'text_importance': float(text_importance),
        'image_importance': float(image_importance),
        'text_percentage': float(text_importance / (text_importance + image_importance) * 100),
        'image_percentage': float(image_importance / (text_importance + image_importance) * 100)
    }
    
    print(f"Importance relative: Texte {importance_ratio['text_percentage']:.2f}%, "
          f"Image {importance_ratio['image_percentage']:.2f}%")
    
    # Créer un graphique de comparaison d'importance
    plt.figure(figsize=(10, 6))
    plt.bar(['Texte', 'Image'], [importance_ratio['text_percentage'], importance_ratio['image_percentage']])
    plt.ylabel('Importance relative (%)')
    plt.title(f'Contribution relative des modalités ({model_type}, {fusion_strategy})')
    plt.ylim(0, 100)
    plt.tight_layout()
    plt.savefig(f"data/reports/modalite_importance_{model_type}_{fusion_strategy}.png")
    
    return {
        'shap_values': shap_values,
        'explainer': explainer,
        'importance': importance_ratio
    }

def sample_data_for_explanation(X_text, X_image, y, n_samples=100, stratify=True):
    """
    Échantillonne les données pour l'explication en assurant une représentation de toutes les classes
    
    Args:
        X_text: Données texte
        X_image: Données image
        y: Étiquettes
        n_samples: Nombre d'échantillons à prendre
        stratify: Si True, échantillonnage stratifié par classe
        
    Returns:
        tuple: X_text_sample, X_image_sample, y_sample
    """
    if stratify:
        from sklearn.model_selection import StratifiedShuffleSplit
        
        sss = StratifiedShuffleSplit(n_splits=1, test_size=n_samples, random_state=42)
        for _, idx in sss.split(X_text, y):
            return X_text[idx], X_image[idx], y[idx]
    else:
        # Échantillonnage aléatoire
        idx = np.random.choice(len(X_text), n_samples, replace=False)
        return X_text[idx], X_image[idx], y[idx]

def analyze_class_specific_explanations(shap_values, y_sample, idx_to_category, category_names, top_n=5):
    """
    Analyse les explications SHAP spécifiques à chaque classe
    
    Args:
        shap_values: Valeurs SHAP
        y_sample: Vérité terrain pour les échantillons
        idx_to_category: Mapping indices -> codes catégorie
        category_names: Mapping codes catégorie -> noms
        top_n: Nombre de features importantes à afficher
        
    Returns:
        dict: Résultats par classe
    """
    class_results = {}
    
    # Pour chaque classe
    unique_classes = np.unique(y_sample)
    for class_idx in unique_classes:
        # Convertir l'indice en code et nom de catégorie
        category_code = idx_to_category[int(class_idx)]
        category_name = category_names[category_code]
        
        # Filtrer les échantillons de cette classe
        class_mask = (y_sample == class_idx)
        if np.sum(class_mask) < 5:  # Au moins 5 échantillons
            continue
            
        # Calculer l'importance moyenne des features pour cette classe
        class_importance = np.mean(np.abs(shap_values[int(class_idx)][class_mask]), axis=0)
        
        # Trouver les top features
        top_indices = np.argsort(class_importance)[-top_n:][::-1]
        top_values = class_importance[top_indices]
        
        class_results[f"{category_code}_{category_name}"] = {
            'top_indices': top_indices,
            'top_values': top_values
        }
        
        # Créer un graphique pour cette classe
        plt.figure(figsize=(10, 6))
        plt.barh(range(top_n), top_values)
        plt.yticks(range(top_n), [f"Feature {idx}" for idx in top_indices])
        plt.title(f"Top {top_n} caractéristiques pour {category_name} (code {category_code})")
        plt.tight_layout()
        plt.savefig(f"data/reports/class_{category_code}_top_features.png")
    
    return class_results

def compare_models_explanations(text_shap, xgboost_shap, mlp_shap, multimodal_shap, y_sample, 
                               idx_to_category, category_names):
    """
    Compare les explications entre les différents modèles
    
    Args:
        text_shap: Valeurs SHAP du modèle texte
        xgboost_shap: Valeurs SHAP du modèle XGBoost
        mlp_shap: Valeurs SHAP du modèle MLP
        multimodal_shap: Valeurs SHAP du modèle multimodal
        y_sample: Vérité terrain pour les échantillons
        idx_to_category: Mapping indices -> codes catégorie
        category_names: Mapping codes catégorie -> noms
        
    Returns:
        pd.DataFrame: Comparaison des explications
    """
    # Initialiser le DataFrame de résultats
    results = []
    
    # Pour chaque classe
    unique_classes = np.unique(y_sample)
    for class_idx in unique_classes:
        # Convertir l'indice en code et nom de catégorie
        category_code = idx_to_category[int(class_idx)]
        category_name = category_names[category_code]
        
        # Filtrer les échantillons de cette classe
        class_mask = (y_sample == class_idx)
        if np.sum(class_mask) < 5:  # Au moins 5 échantillons
            continue
            
        # Calculer la magnitude SHAP moyenne pour chaque modèle
        text_magnitude = np.mean(np.abs(text_shap[int(class_idx)][class_mask]))
        xgb_magnitude = np.mean(np.abs(xgboost_shap[int(class_idx)][class_mask]))
        mlp_magnitude = np.mean(np.abs(mlp_shap[int(class_idx)][class_mask]))
        multi_magnitude = np.mean(np.abs(multimodal_shap[int(class_idx)][class_mask]))
        
        # Déterminer la modalité dominante
        modality = "Texte" if text_magnitude > (xgb_magnitude + mlp_magnitude)/2 else "Image"
        model_rank = sorted(['Texte', 'XGBoost', 'MLP', 'Multimodal'], 
                          key=lambda x: {'Texte': text_magnitude, 'XGBoost': xgb_magnitude, 
                                         'MLP': mlp_magnitude, 'Multimodal': multi_magnitude}[x],
                          reverse=True)
        
        # Ajouter aux résultats
        results.append({
            'category_code': category_code,
            'category_name': category_name,
            'samples': np.sum(class_mask),
            'text_magnitude': text_magnitude,
            'xgboost_magnitude': xgb_magnitude,
            'mlp_magnitude': mlp_magnitude, 
            'multimodal_magnitude': multi_magnitude,
            'dominant_modality': modality,
            'best_model': model_rank[0],
            'model_ranking': ', '.join(model_rank)
        })
    
    # Convertir en DataFrame
    results_df = pd.DataFrame(results)
    
    # Créer un graphique comparatif des magnitudes par classe
    plt.figure(figsize=(16, 10))
    categories = results_df['category_name']
    
    x = np.arange(len(categories))
    width = 0.2
    
    plt.bar(x - 1.5*width, results_df['text_magnitude'], width, label='Texte')
    plt.bar(x - 0.5*width, results_df['xgboost_magnitude'], width, label='XGBoost')
    plt.bar(x + 0.5*width, results_df['mlp_magnitude'], width, label='MLP')
    plt.bar(x + 1.5*width, results_df['multimodal_magnitude'], width, label='Multimodal')
    
    plt.xlabel('Catégorie')
    plt.ylabel('Magnitude SHAP moyenne')
    plt.title('Comparaison des explications SHAP par catégorie et modèle')
    plt.xticks(x, categories, rotation=90)
    plt.legend()
    plt.tight_layout()
    plt.savefig("data/reports/shap_magnitude_comparison.png")
    
    return results_df

if __name__ == "__main__":
    print("1. Configuration et chargement des données..")
    # 1. Configuration et chargement des données
    config = PipelineConfig.from_yaml('config.yaml')
    pipeline = ProductClassificationPipeline(config)
    
    try:
        # Préparer les données
        force_preprocess_image = False
        force_preprocess_texte = False
        pipeline.prepare_data(force_preprocess_image=force_preprocess_image, force_preprocess_text=force_preprocess_texte)
        
        # Sauvegarde des indices pour les tests de prod/exam à venir
        indices_dict = pipeline.extract_and_save_indices()

        # Créer les répertoires de résultats et rapports
        os.makedirs('data/reports', exist_ok=True)
        os.makedirs('data/explanations', exist_ok=True)
        
        print("2. Charger les modèles pré-entraînés..")
        # 2. Charger les modèles pré-entraînés
        models = load_models(pipeline, model_names=['xgboost', 'neural_net'])

        # Charger le modèle SVM texte pré-entraîné
        from joblib import load
        import pandas as pd

        # Fonction pour prétraiter le texte comme pour l'entraînement
        def preprocess_text(df):
            """Prétraite les colonnes texte comme lors de l'entraînement"""
            if isinstance(df, pd.DataFrame):
                # Si c'est un DataFrame, fusionner designation et description
                df['text'] = df['designation'].fillna('') + " " + df['description'].fillna('')
                return df['text']
            else:
                # Si c'est déjà une série ou une liste de textes
                return df

        # Charger le modèle SVM
        try:
            text_model = load('data/models/SVM/model.pkl')
            print("Modèle SVM texte chargé avec succès")
            models['SVM'] = text_model
        except Exception as e:
            print(f"Erreur lors du chargement du modèle texte: {str(e)}")
            # Utiliser un modèle factice en cas d'erreur
            class DummySVM:
                def __init__(self, num_classes):
                    self.num_classes = num_classes
                    
                def predict(self, X):
                    preds = np.random.randint(0, self.num_classes, size=len(X))
                    probs = np.random.random((len(X), self.num_classes))
                    probs = probs / probs.sum(axis=1, keepdims=True)
                    return preds, probs
                        
                def predict_proba(self, X):
                    probs = np.random.random((len(X), self.num_classes))
                    return probs / probs.sum(axis=1, keepdims=True)
            
            text_model = DummySVM(num_classes=len(pipeline.category_names))
            models['SVM'] = text_model
            print("Utilisation d'un modèle de secours pour le texte")
        
        print("3. Préparer les données texte pour l'évaluation..")
        # 3. Préparer les données texte pour l'évaluation        
        # Obtenir les indices du split de test
        test_split_indices = pipeline.preprocessed_data['test_split_indices']
        # print("test_split_indices:",test_split_indices)

        # Lire les données d'origine
        X_train_df = pd.read_csv('data/X_train_update.csv', index_col=0)
        # print(f"X_train_df.index: {X_train_df.index}")
        
        # Créer un nouvel index pour X_train_df et un mappage vers les anciens indices
        X_train_df_reset = X_train_df.reset_index()
        index_map = dict(zip(X_train_df.index, range(len(X_train_df))))
        
        # Filtrer les indices valides ou utiliser un échantillon représentatif
        valid_indices = [idx for idx in test_split_indices if idx in X_train_df.index]
        
        if len(valid_indices) > 0:
            # Utiliser les indices valides
            X_test_split_df = X_train_df.loc[valid_indices]
            print(f"Utilisation de {len(valid_indices)}/{len(test_split_indices)} indices valides")
        else:
            # Utiliser un échantillon aléatoire de même taille
            sample_size = len(test_split_indices)
            random_indices = np.random.choice(len(X_train_df), size=sample_size, replace=False)
            X_test_split_df = X_train_df.iloc[random_indices]
            print(f"Utilisation d'un échantillon aléatoire de {sample_size} éléments")
        
        # Prétraiter le texte
        X_text = preprocess_text(X_test_split_df)
        
        print("4. Préparation des données image.. ")
        # 4. Préparation des données image
        X_image_raw = pipeline.preprocessed_data['X_test_split']
        
        # Extraire les features d'image correctement
        if isinstance(X_image_raw, np.ndarray) and X_image_raw.shape == ():
            # C'est un array 0-dimensionnel, extraire son contenu
            content = X_image_raw.item()
            if isinstance(content, dict) and 'features' in content:
                X_image = content['features']
            else:
                # Tentative avec d'autres clés possibles
                if isinstance(content, dict):
                    print(f"Clés disponibles: {list(content.keys())}")
                    possible_keys = ['features', 'X_test_split', 'data']
                    for key in possible_keys:
                        if key in content:
                            X_image = content[key]
                            break
                else:
                    # En dernier recours, utiliser un array aléatoire
                    X_image = np.random.random((len(X_text), 2048))
        else:
            X_image = X_image_raw
            
        print(f"Shape de X_image: {X_image.shape if hasattr(X_image, 'shape') else 'inconnu'}")
        
        # Assurer que X_text, X_image et y_test ont les mêmes dimensions
        y_test = pipeline.preprocessed_data['y_test_split']
        
        # Vérifier et ajuster les dimensions
        sample_sizes = [len(X_text)]
        if hasattr(X_image, 'shape'):
            sample_sizes.append(X_image.shape[0])
        sample_sizes.append(len(y_test))
        
        min_size = min(sample_sizes)
        X_text = X_text.iloc[:min_size] if isinstance(X_text, pd.Series) else X_text[:min_size]
        X_image = X_image[:min_size]
        y_test = y_test[:min_size]
        
        print(f"Dimensions finales: X_text={len(X_text)}, X_image={X_image.shape}, y_test={len(y_test)}")
        
        
        print("5. Évaluer les combinaisons multimodales")
        # 5. Évaluer les combinaisons multimodales        
        print(f"Shape de X_image_test: {X_image.shape if hasattr(X_image, 'shape') else 'pas de forme'}")
        print(f"Type interne: {X_image.dtype if hasattr(X_image, 'dtype') else 'pas de dtype'}")

        results_df = evaluate_multimodal_combinations(
            pipeline=pipeline,
            text_model=models['SVM'],
            image_models={'xgboost': models['xgboost'], 'neural_net': models['neural_net']},
            X_text_test=X_text,
            X_image_test=X_image,
            y_test=y_test,
            idx_to_category=pipeline.idx_to_category
        )
        
        # Sauvegarde des résultats
        results_df.to_csv('data/reports/multimodal_comparison_results.csv')
        
        # Identifier la meilleure combinaison
        best_combo = results_df['weighted_f1'].idxmax()
        print(f"\nLa meilleure combinaison est: {best_combo} avec F1 = {results_df.loc[best_combo, 'weighted_f1']:.4f}")
        
        print("6. Échantillonner des données pour explication..")
        # 6. Échantillonner des données pour explication
        X_text_sample, X_image_sample, y_sample = sample_data_for_explanation(
            X_text, X_image, y_test, n_samples=100
        )
        
        print("7. Générer des explications pour les modèles individuels..")
        # 7. Générer des explications pour les modèles individuels
        text_shap, text_explainer = explain_text_model(
            text_model=models['SVM'],
            X_text_sample=X_text_sample
        )
        
        xgboost_shap, xgb_explainer = explain_image_model(
            model_type='xgboost',
            model=models['xgboost'],
            X_image_sample=X_image_sample
        )
        
        mlp_shap, mlp_explainer = explain_image_model(
            model_type='neural_net',
            model=models['neural_net'],
            X_image_sample=X_image_sample
        )
        
        print("8. Expliquer le modèle multimodal (pour la meilleure combinaison)..")
        # 8. Expliquer le modèle multimodal (pour la meilleure combinaison)
        best_model_type = best_combo.split('_')[0]  # Extraire le type de modèle
        best_fusion = '_'.join(best_combo.split('_')[1:])  # Extraire la stratégie de fusion
        
        multimodal_results = explain_multimodal(
            text_model=models['SVM'],
            image_model=models[best_model_type],
            model_type=best_model_type,
            X_text_sample=X_text_sample,
            X_image_sample=X_image_sample,
            fusion_strategy=best_fusion,
            class_names=pipeline.category_names
        )
        
        print("9. Analyser les explications par classe..")
        # 9. Analyser les explications par classe
        class_results = analyze_class_specific_explanations(
            shap_values=multimodal_results['shap_values'],
            y_sample=y_sample,
            idx_to_category=pipeline.idx_to_category,
            category_names=pipeline.category_names
        )
        
        print("10. Comparer les explicabilités des différents modèles..")
        # 10. Comparer les explicabilités des différents modèles
        comparison_df = compare_models_explanations(
            text_shap=text_shap,
            xgboost_shap=xgboost_shap,
            mlp_shap=mlp_shap,
            multimodal_shap=multimodal_results['shap_values'],
            y_sample=y_sample,
            idx_to_category=pipeline.idx_to_category,
            category_names=pipeline.category_names
        )
        
        # Sauvegarde des résultats
        comparison_df.to_csv('data/reports/model_explanation_comparison.csv')
        
        print("11. Préparation rapport final..")
        # 11. Préparation rapport final
        print("\n" + "="*50)
        print("RAPPORT FINAL D'ANALYSE MULTIMODALE")
        print("="*50)
        
        print(f"\n1. PERFORMANCES DES MODÈLES:")
        print("-"*30)
        print(results_df.sort_values('weighted_f1', ascending=False))
        
        print(f"\n2. IMPORTANCE RELATIVE DES MODALITÉS:")
        print("-"*30)
        print(f"Texte: {multimodal_results['importance']['text_percentage']:.2f}%")
        print(f"Image: {multimodal_results['importance']['image_percentage']:.2f}%")
        
        print(f"\n3. MODALITÉ DOMINANTE PAR CLASSE:")
        print("-"*30)
        print(comparison_df[['category_name', 'dominant_modality', 'best_model']].sort_values('category_name'))
        
        print(f"\n4. RECOMMANDATION FINALE:")
        print("-"*30)
        if best_model_type == 'xgboost':
            print("Recommandation: Utiliser XGBoost pour le modèle image")
            print("Raisons:")
            print("- Meilleures performances multimodales")
            print("- Meilleure explicabilité (TreeExplainer natif)")
            print("- Plus rapide en inférence")
        else:
            print("Recommandation: Utiliser Neural Network pour le modèle image")
            print("Raisons:")
            print("- Meilleures performances multimodales")
            print("- Capable de capturer des motifs plus complexes")
        
        print("\nGraphiques d'analyse sauvegardés dans data/reports/")
        print("Rapports détaillés sauvegardés dans data/reports/")
        
    except Exception as e:
        print(f"Erreur lors de l'exécution: {str(e)}")
        import traceback
        traceback.print_exc()

  from .autonotebook import tqdm as notebook_tqdm


GPU disponible : Quadro M1200
CUDA Memory: 0.00GB / 0.00GB
1. Configuration et chargement des données..


2025-05-22 10:24:28,813 - classification_pipeline - INFO - Chargement des features pré-calculées...
2025-05-22 10:24:29,880 - classification_pipeline - INFO - Chargement effectué avec succès.
2025-05-22 10:24:30,843 - classification_pipeline - INFO - Indices sauvegardés dans data/indices
2025-05-22 10:24:30,845 - classification_pipeline - INFO - Distribution des indices:
2025-05-22 10:24:30,847 - classification_pipeline - INFO -   - Total: 84916
2025-05-22 10:24:30,849 - classification_pipeline - INFO -   - Test: 16984 (20.0%)
2025-05-22 10:24:30,850 - classification_pipeline - INFO -   - Train original: 67932 (80.0%)
2025-05-22 10:24:30,853 - classification_pipeline - INFO -   - Train balanced (unique): 44050 (51.9%)


Erreur lors de l'exécution: object of type 'int' has no len()


Traceback (most recent call last):
  File "/tmp/ipykernel_1469/4260016239.py", line 605, in <module>
    indices_dict = pipeline.extract_and_save_indices()
  File "/home/elion/Dev_IA/Rakuten/app/preprocess.py", line 2196, in extract_and_save_indices
    ({len(train_balanced_indices)/len(len(train_balanced_indices) - len(train_balanced_unique))*100:.1f}%)")
TypeError: object of type 'int' has no len()


In [None]:
# Après avoir exécuté prepare_data
# indices_dict = pipeline.extract_and_save_indices()

# Plus tard, pour charger et utiliser ces indices
test_indices = np.load('data/indices/test_split_indices.npy')
ignored_indices = np.load('data/indices/ignored_indices.npy')

# Pour des analyses ou tests spécifiques
ignored_samples = X_train_df.loc[ignored_indices]