In [1]:
import pandas as pd
import numpy as np
import pickle
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import precision_score, recall_score, f1_score, roc_auc_score
import time

class RecommenderEvaluator:
    """
    Évaluateur pour les systèmes de recommandation
    Implémente plusieurs métriques d'évaluation courantes en recommandation
    """
    
    def __init__(self, model_path='content_based_model.pkl'):
        """
        Initialise l'évaluateur avec le modèle à évaluer
        
        Args:
            model_path: Chemin vers le fichier du modèle sauvegardé
        """
        self.model_path = model_path
        self.model_data = None
        self.test_data = None
        self.train_data = None
        self.recommendations = None
        
    def load_model(self):
        """
        Charge le modèle entraîné
        """
        print(f"Chargement du modèle depuis {self.model_path}...")
        with open(self.model_path, 'rb') as f:
            self.model_data = pickle.load(f)
            
        print("Modèle chargé!")
        
    def load_test_data(self):
        """
        Charge les données de test
        """
        print("Chargement des données de test...")
        self.test_data = pd.read_csv('interactions_test.csv')
        self.train_data = pd.read_csv('interactions_train.csv')
        print(f"Données de test chargées: {self.test_data.shape}")
        
    def generate_recommendations(self, top_n=10):
        """
        Génère des recommandations pour tous les utilisateurs dans les données de test
        
        Args:
            top_n: Nombre de recommandations par utilisateur
        """
        print(f"Génération des recommandations (top-{top_n})...")
        start_time = time.time()
        
        # Obtenir la liste des utilisateurs uniques dans les données de test
        test_users = self.test_data['user_id'].unique()
        
        all_recommendations = []
        
        # Pour chaque utilisateur de test
        for user_id in test_users:
            # Vérifier si l'utilisateur a un profil
            if user_id in self.model_data['user_profiles'].index:
                # Récupérer le profil de l'utilisateur
                user_profile = self.model_data['user_profiles'].loc[user_id].values.reshape(1, -1)
                
                # Calculer la similarité entre le profil et toutes les vidéos
                scores = np.dot(user_profile, self.model_data['content_features'].values.T)[0]
                
                # Créer un DataFrame avec les scores
                user_recs = pd.DataFrame({
                    'user_id': user_id,
                    'video_id': self.model_data['content_features'].index,
                    'score': scores
                })
                
                # Exclure les vidéos déjà vues dans l'ensemble d'entraînement
                seen_videos = self.train_data[self.train_data['user_id'] == user_id]['video_id'].values
                user_recs = user_recs[~user_recs['video_id'].isin(seen_videos)]
                
                # Trier par score et prendre les top_n
                user_recs = user_recs.sort_values('score', ascending=False).head(top_n)
                
                all_recommendations.append(user_recs)
        
        # Concaténer toutes les recommandations
        self.recommendations = pd.concat(all_recommendations, ignore_index=True)
        
        print(f"Recommandations générées: {self.recommendations.shape}")
        print(f"Temps d'exécution: {time.time() - start_time:.2f} secondes")
        
        # Sauvegarder les recommandations
        self.recommendations.to_csv('content_based_recommendations.csv', index=False)
    
    def calculate_precision_at_k(self, k=10):
        """
        Calcule la précision@k pour les recommandations
        
        Args:
            k: Nombre de recommandations à considérer
            
        Returns:
            Précision moyenne à k
        """
        print(f"Calcul de la précision@{k}...")
        
        precision_scores = []
        
        # Pour chaque utilisateur dans les recommandations
        for user_id in self.recommendations['user_id'].unique():
            # Obtenir les top-k recommandations pour cet utilisateur
            user_recs = self.recommendations[self.recommendations['user_id'] == user_id].head(k)
            recommended_items = set(user_recs['video_id'].values)
            
            # Obtenir les items pertinents (interactions positives) pour cet utilisateur dans le test
            relevant_items = set(self.test_data[(self.test_data['user_id'] == user_id) & 
                                               (self.test_data['positive_interaction'] == 1)]['video_id'].values)
            
            # Calculer la précision pour cet utilisateur
            if len(recommended_items) > 0:
                precision = len(recommended_items.intersection(relevant_items)) / len(recommended_items)
                precision_scores.append(precision)
        
        # Calculer la précision moyenne
        mean_precision = np.mean(precision_scores) if precision_scores else 0
        
        return mean_precision
    
    def calculate_recall_at_k(self, k=10):
        """
        Calcule le rappel@k pour les recommandations
        
        Args:
            k: Nombre de recommandations à considérer
            
        Returns:
            Rappel moyen à k
        """
        print(f"Calcul du rappel@{k}...")
        
        recall_scores = []
        
        # Pour chaque utilisateur dans les recommandations
        for user_id in self.recommendations['user_id'].unique():
            # Obtenir les top-k recommandations pour cet utilisateur
            user_recs = self.recommendations[self.recommendations['user_id'] == user_id].head(k)
            recommended_items = set(user_recs['video_id'].values)
            
            # Obtenir les items pertinents (interactions positives) pour cet utilisateur dans le test
            relevant_items = set(self.test_data[(self.test_data['user_id'] == user_id) & 
                                               (self.test_data['positive_interaction'] == 1)]['video_id'].values)
            
            # Calculer le rappel pour cet utilisateur
            if len(relevant_items) > 0:
                recall = len(recommended_items.intersection(relevant_items)) / len(relevant_items)
                recall_scores.append(recall)
        
        # Calculer le rappel moyen
        mean_recall = np.mean(recall_scores) if recall_scores else 0
        
        return mean_recall
    
    def calculate_ndcg_at_k(self, k=10):
        """
        Calcule le NDCG@k (Normalized Discounted Cumulative Gain) pour les recommandations
        
        Args:
            k: Nombre de recommandations à considérer
            
        Returns:
            NDCG moyen à k
        """
        print(f"Calcul du NDCG@{k}...")
        
        ndcg_scores = []
        
        # Pour chaque utilisateur dans les recommandations
        for user_id in self.recommendations['user_id'].unique():
            # Obtenir les top-k recommandations pour cet utilisateur
            user_recs = self.recommendations[self.recommendations['user_id'] == user_id].head(k)
            recommended_items = user_recs['video_id'].values
            
            # Obtenir les items pertinents (interactions positives) pour cet utilisateur dans le test
            relevant_items = set(self.test_data[(self.test_data['user_id'] == user_id) & 
                                               (self.test_data['positive_interaction'] == 1)]['video_id'].values)
            
            # Calculer le DCG
            dcg = 0
            for i, item in enumerate(recommended_items):
                rel = 1 if item in relevant_items else 0
                dcg += rel / np.log2(i + 2)  # i+2 car i commence à 0
            
            # Calculer l'IDCG (DCG idéal)
            idcg = 0
            for i in range(min(len(relevant_items), k)):
                idcg += 1 / np.log2(i + 2)
                
            # Calculer le NDCG
            ndcg = dcg / idcg if idcg > 0 else 0
            ndcg_scores.append(ndcg)
        
        # Calculer le NDCG moyen
        mean_ndcg = np.mean(ndcg_scores) if ndcg_scores else 0
        
        return mean_ndcg
    
    def calculate_map(self, k=10):
        """
        Calcule le MAP@k (Mean Average Precision) pour les recommandations
        
        Args:
            k: Nombre de recommandations à considérer
            
        Returns:
            MAP à k
        """
        print(f"Calcul du MAP@{k}...")
        
        ap_scores = []
        
        # Pour chaque utilisateur dans les recommandations
        for user_id in self.recommendations['user_id'].unique():
            # Obtenir les top-k recommandations pour cet utilisateur
            user_recs = self.recommendations[self.recommendations['user_id'] == user_id].head(k)
            recommended_items = user_recs['video_id'].values
            
            # Obtenir les items pertinents pour cet utilisateur dans le test
            relevant_items = set(self.test_data[(self.test_data['user_id'] == user_id) & 
                                               (self.test_data['positive_interaction'] == 1)]['video_id'].values)
            
            # Calculer l'AP (Average Precision)
            hits = 0
            sum_precisions = 0
            
            for i, item in enumerate(recommended_items):
                if item in relevant_items:
                    hits += 1
                    precision_at_i = hits / (i + 1)
                    sum_precisions += precision_at_i
            
            ap = sum_precisions / len(relevant_items) if len(relevant_items) > 0 else 0
            ap_scores.append(ap)
        
        # Calculer le MAP
        mean_ap = np.mean(ap_scores) if ap_scores else 0
        
        return mean_ap
    
    def calculate_coverage(self):
        """
        Calcule la couverture du catalogue - pourcentage des vidéos qui sont recommandées
        
        Returns:
            Pourcentage de couverture
        """
        print("Calcul de la couverture...")
        
        # Nombre total de vidéos disponibles
        all_videos = set(self.model_data['content_features'].index)
        
        # Vidéos recommandées
        recommended_videos = set(self.recommendations['video_id'].values)
        
        # Calculer la couverture
        coverage = len(recommended_videos) / len(all_videos) if len(all_videos) > 0 else 0
        
        return coverage * 100  # En pourcentage
    
    def evaluate(self, ks=[5, 10, 20]):
        """
        Évalue le modèle avec plusieurs métriques à différentes valeurs de k
        
        Args:
            ks: Liste des valeurs de k à évaluer
        """
        print("Évaluation du modèle...")
        
        # Charger le modèle et les données de test si ce n'est pas déjà fait
        if self.model_data is None:
            self.load_model()
        if self.test_data is None:
            self.load_test_data()
        
        # Générer les recommandations si ce n'est pas déjà fait
        if self.recommendations is None:
            self.generate_recommendations(top_n=max(ks))
        
        # Stocker les résultats
        results = {
            'k': [],
            'precision': [],
            'recall': [],
            'ndcg': [],
            'map': []
        }
        
        # Calculer les métriques pour chaque valeur de k
        for k in ks:
            results['k'].append(k)
            results['precision'].append(self.calculate_precision_at_k(k))
            results['recall'].append(self.calculate_recall_at_k(k))
            results['ndcg'].append(self.calculate_ndcg_at_k(k))
            results['map'].append(self.calculate_map(k))
        
        # Calculer la couverture
        coverage = self.calculate_coverage()
        
        # Afficher les résultats
        print("\n===== Résultats de l'évaluation =====")
        results_df = pd.DataFrame(results)
        print(results_df)
        print(f"Couverture du catalogue: {coverage:.2f}%")
        
        # Sauvegarder les résultats
        results_df.to_csv('evaluation_results.csv', index=False)
        
        # Visualiser les résultats
        self.plot_evaluation_results(results_df)
        
        return results_df, coverage
    
    def plot_evaluation_results(self, results_df):
        """
        Visualise les résultats de l'évaluation
        
        Args:
            results_df: DataFrame contenant les résultats
        """
        print("Création des visualisations...")
        
        # Configurer le style
        plt.style.use('ggplot')
        
        # Créer une figure avec plusieurs sous-graphiques
        fig, axes = plt.subplots(2, 2, figsize=(14, 10))
        
        # Précision@k
        axes[0, 0].plot(results_df['k'], results_df['precision'], 'o-', color='blue')
        axes[0, 0].set_title('Précision@k')
        axes[0, 0].set_xlabel('k')
        axes[0, 0].set_ylabel('Précision')
        
        # Rappel@k
        axes[0, 1].plot(results_df['k'], results_df['recall'], 'o-', color='green')
        axes[0, 1].set_title('Rappel@k')
        axes[0, 1].set_xlabel('k')
        axes[0, 1].set_ylabel('Rappel')
        
        # NDCG@k
        axes[1, 0].plot(results_df['k'], results_df['ndcg'], 'o-', color='red')
        axes[1, 0].set_title('NDCG@k')
        axes[1, 0].set_xlabel('k')
        axes[1, 0].set_ylabel('NDCG')
        
        # MAP@k
        axes[1, 1].plot(results_df['k'], results_df['map'], 'o-', color='purple')
        axes[1, 1].set_title('MAP@k')
        axes[1, 1].set_xlabel('k')
        axes[1, 1].set_ylabel('MAP')
        
        plt.tight_layout()
        plt.savefig('evaluation_metrics.png')
        plt.close()
        
        print("Visualisations sauvegardées!")

    def analyze_recommendations(self):
        """
        Analyse plus approfondie des recommandations générées
        """
        print("Analyse des recommandations...")
        
        if self.recommendations is None:
            print("Aucune recommandation disponible. Veuillez d'abord générer des recommandations.")
            return
        
        # 1. Distribution des scores de recommandation
        plt.figure(figsize=(10, 6))
        sns.histplot(self.recommendations['score'], bins=30, kde=True)
        plt.title('Distribution des scores de recommandation')
        plt.xlabel('Score')
        plt.ylabel('Fréquence')
        plt.savefig('recommendation_scores_distribution.png')
        plt.close()
        
        # 2. Top vidéos les plus recommandées
        top_videos = self.recommendations['video_id'].value_counts().head(20)
        plt.figure(figsize=(12, 8))
        sns.barplot(x=top_videos.index.astype(str), y=top_videos.values)
        plt.title('Top 20 des vidéos les plus recommandées')
        plt.xlabel('ID Vidéo')
        plt.ylabel('Nombre de recommandations')
        plt.xticks(rotation=90)
        plt.tight_layout()
        plt.savefig('top_recommended_videos.png')
        plt.close()
        
        # 3. Calculer la diversité des recommandations par utilisateur
        user_diversity = []
        
        for user_id in self.recommendations['user_id'].unique():
            user_recs = self.recommendations[self.recommendations['user_id'] == user_id]
            
            # Si les caractéristiques des vidéos sont disponibles, on peut calculer une diversité plus précise
            if 'content_features' in self.model_data:
                # Extraire les caractéristiques des vidéos recommandées
                video_features = self.model_data['content_features'].loc[user_recs['video_id']]
                
                # Calculer la diversité comme la variance moyenne des caractéristiques
                diversity = video_features.var(axis=0).mean()
                user_diversity.append(diversity)
        
        if user_diversity:
            plt.figure(figsize=(10, 6))
            sns.histplot(user_diversity, bins=20, kde=True)
            plt.title('Diversité des recommandations par utilisateur')
            plt.xlabel('Diversité')
            plt.ylabel('Fréquence')
            plt.savefig('recommendation_diversity.png')
            plt.close()
        
        print("Analyse terminée. Les visualisations ont été sauvegardées.")


In [2]:
if __name__ == "__main__":
    evaluator = RecommenderEvaluator()
    evaluator.load_model()
    evaluator.load_test_data()
    evaluator.generate_recommendations(top_n=10)
    results, coverage = evaluator.evaluate(ks=[5, 10, 20])
    evaluator.analyze_recommendations()
    print("Évaluation et analyse terminées.")

Chargement du modèle depuis content_based_model.pkl...
Modèle chargé!
Chargement des données de test...


  self.test_data = pd.read_csv('interactions_test.csv')


Données de test chargées: (935314, 9)
Génération des recommandations (top-10)...
Recommandations générées: (14110, 3)
Temps d'exécution: 14.52 secondes
Évaluation du modèle...
Calcul de la précision@5...
Calcul du rappel@5...
Calcul du NDCG@5...
Calcul du MAP@5...
Calcul de la précision@10...
Calcul du rappel@10...
Calcul du NDCG@10...
Calcul du MAP@10...
Calcul de la précision@20...
Calcul du rappel@20...
Calcul du NDCG@20...
Calcul du MAP@20...
Calcul de la couverture...

===== Résultats de l'évaluation =====
    k  precision    recall      ndcg       map
0   5   0.344862  0.003803  0.332170  0.002320
1  10   0.362580  0.008021  0.348884  0.004213
2  20   0.362580  0.008021  0.225158  0.004213
Couverture du catalogue: 6.15%
Création des visualisations...
Visualisations sauvegardées!
Analyse des recommandations...
Analyse terminée. Les visualisations ont été sauvegardées.
Évaluation et analyse terminées.
