In [None]:
# reloads modules automatically before entering the execution of code
%load_ext autoreload
%autoreload 2

# third-party imports
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from surprise.model_selection import train_test_split, LeaveOneOut
from surprise import Dataset, Reader, accuracy
from sklearn.metrics import jaccard_score
import itertools

# local imports
from configs import EvalConfig
from constants import Constant as C
from loaders import load_ratings, export_evaluation_report
from models import get_top_n, ContentBased


In [None]:
def load_ratings(surprise_format=False):
    """Read the ratings CSV found at ``C.EVIDENCE_PATH / C.RATINGS_FILENAME``.

    Args:
        surprise_format: when truthy, wrap the ratings in a Surprise
            ``Dataset``; otherwise return the raw pandas DataFrame.

    Returns:
        Either the DataFrame as read from disk, or a Surprise ``Dataset``
        built from its ``userId`` / ``movieId`` / ``rating`` columns with
        ``C.RATINGS_SCALE`` as the rating scale.
    """
    ratings_frame = pd.read_csv(C.EVIDENCE_PATH / C.RATINGS_FILENAME)
    if not surprise_format:
        return ratings_frame

    # Surprise only needs the (user, item, rating) triplets, in that order.
    surprise_reader = Reader(rating_scale=C.RATINGS_SCALE)
    triplets = ratings_frame[['userId', 'movieId', 'rating']]
    return Dataset.load_from_df(triplets, surprise_reader)

def generate_split_predictions(algo, ratings_dataset, eval_config):
    """Fit ``algo`` on a random train split and predict the held-out ratings.

    Args:
        algo: algorithm exposing ``fit(trainset)`` and ``test(testset)``.
        ratings_dataset: dataset handed to ``train_test_split``.
        eval_config: configuration object; its ``test_size`` attribute sets
            the size of the test split.

    Returns:
        Whatever ``algo.test`` returns for the held-out test set.
    """
    train_part, test_part = train_test_split(
        ratings_dataset,
        test_size=eval_config.test_size,
    )
    algo.fit(train_part)
    return algo.test(test_part)

def generate_loo_top_n(algo, ratings_dataset, eval_config):
    """Generate top-n recommendations for each user on a random Leave-one-out split (LOO)

    Only the first split produced by ``LeaveOneOut`` is used: the algorithm is
    fitted on its trainset and then scored on the trainset's anti-testset
    (the user/item pairs without a rating in the trainset).

    Args:
        algo: algorithm exposing ``fit(trainset)`` and ``test(testset)``.
        ratings_dataset: dataset accepted by ``LeaveOneOut().split``.
        eval_config: configuration object; its ``top_n_value`` attribute is
            forwarded to ``get_top_n`` as ``n``.

    Returns:
        A ``(top_n_recommendations, testset)`` tuple, where ``testset`` holds
        the ratings left out of the trainset. ``(None, None)`` is returned if
        the splitter yields no split at all.
    """
    first_split = next(iter(LeaveOneOut().split(ratings_dataset)), None)
    if first_split is None:
        return None, None
    trainset, testset = first_split

    # Fit the algorithm on the trainset
    algo.fit(trainset)

    # Predictions for the left-out ratings are not needed here: callers only
    # check whether the left-out item appears in the top-n, so the algorithm
    # is scored on the anti-testset only.
    anti_testset = trainset.build_anti_testset()
    all_predictions = algo.test(anti_testset)

    # Keep the n best-scored items per user
    top_n_recommendations = get_top_n(all_predictions, n=eval_config.top_n_value)

    return top_n_recommendations, testset

def generate_full_top_n(algo, ratings_dataset, eval_config):
    """Generate top-n recommendations for each user after training on all ratings.

    Args:
        algo: algorithm exposing ``fit(trainset)`` and ``test(testset)``.
        ratings_dataset: dataset exposing ``build_full_trainset()``.
        eval_config: configuration object; its ``top_n_value`` attribute is
            forwarded to ``get_top_n`` as ``n``.

    Returns:
        The result of ``get_top_n`` applied to the predictions made on the
        anti-testset of the full trainset.
    """
    # No hold-out here: every available rating is used for training.
    trainset_all = ratings_dataset.build_full_trainset()
    algo.fit(trainset_all)

    # Score the pairs in the anti-testset and keep the best n per user.
    unrated_pairs = trainset_all.build_anti_testset()
    return get_top_n(algo.test(unrated_pairs), n=eval_config.top_n_value)

def precompute_information():
    """Precompute the information needed by the metrics evaluated in full mode.

    Returns:
        A dictionary with the following keys:

        - ``"item_to_rank"``: dictionary mapping each movie id to its
          popularity rank, where rank 1 is the movie with the most ratings.
        - (for your project, add other relevant information here)
    """
    ratings_frame = load_ratings()

    # Number of ratings per movie, most rated first.
    popularity = ratings_frame['movieId'].value_counts().sort_values(ascending=False)

    return {
        'item_to_rank': {
            movie_id: rank
            for rank, movie_id in enumerate(popularity.index, start=1)
        }
    }

def create_evaluation_report(eval_config, sp_ratings, precomputed_dict, available_metrics):
    """Evaluate every model listed in ``eval_config`` and collect the scores.

    Args:
        eval_config: configuration object exposing ``models`` (an iterable of
            ``(model_name, model_class, constructor_kwargs)`` triplets) and
            the lists ``split_metrics``, ``loo_metrics`` and ``full_metrics``
            naming the metrics to compute.
        sp_ratings: ratings dataset forwarded to the ``generate_*`` helpers.
        precomputed_dict: keyword arguments forwarded to the full metrics
            other than 'diversity' and 'precision'.
        available_metrics: dictionary with the keys ``'split'``, ``'loo'`` and
            ``'full'``, each mapping a metric name to a
            ``(evaluation_function, extra_kwargs)`` pair.

    Returns:
        A DataFrame with one row per model and one column per metric.

    Raises:
        AssertionError: if a requested metric is missing from
            ``available_metrics``.
        ValueError: if the full metric 'precision' is requested while
            ``eval_config.loo_metrics`` is empty, because that metric is
            computed against the leave-one-out testset.
    """
    evaluation_dict = {}
    for model_name, model, arguments in eval_config.models:
        print(f'Handling model {model_name}')
        algo = model(**arguments)
        model_scores = evaluation_dict[model_name] = {}
        # Only bound by the leave-one-out step below; reset for each model so
        # the 'precision' metric can detect that it is missing.
        testset = None

        if len(eval_config.split_metrics) > 0:
            print('Training split predictions')
            predictions = generate_split_predictions(algo, sp_ratings, eval_config)
            for metric in eval_config.split_metrics:
                print(f'- computing metric {metric}')
                assert metric in available_metrics['split']
                evaluation_function, parameters = available_metrics["split"][metric]
                model_scores[metric] = evaluation_function(predictions, **parameters)

        if len(eval_config.loo_metrics) > 0:
            print('Training loo predictions')
            anti_testset_top_n, testset = generate_loo_top_n(algo, sp_ratings, eval_config)
            for metric in eval_config.loo_metrics:
                print(f'- computing metric {metric}')
                assert metric in available_metrics['loo']
                evaluation_function, parameters = available_metrics["loo"][metric]
                model_scores[metric] = evaluation_function(anti_testset_top_n, testset, **parameters)

        if len(eval_config.full_metrics) > 0:
            print('Training full predictions')
            anti_testset_top_n = generate_full_top_n(algo, sp_ratings, eval_config)
            for metric in eval_config.full_metrics:
                print(f'- computing metric {metric}')
                assert metric in available_metrics['full']
                evaluation_function, parameters = available_metrics["full"][metric]
                if metric == 'diversity':
                    # Diversity only compares the recommendation lists.
                    score = evaluation_function(anti_testset_top_n, **parameters)
                elif metric == 'precision':
                    if testset is None:
                        raise ValueError(
                            "The 'precision' metric needs the leave-one-out testset: "
                            "add at least one metric to eval_config.loo_metrics"
                        )
                    score = evaluation_function(anti_testset_top_n, testset, **parameters)
                else:
                    score = evaluation_function(
                        anti_testset_top_n,
                        **precomputed_dict,
                        **parameters
                    )
                model_scores[metric] = score

    return pd.DataFrame.from_dict(evaluation_dict).T


In [None]:

def get_hit_rate(anti_testset_top_n, testset):
    """Compute the average hit over the users (loo metric)

    Each ``(user, movie, rating)`` entry of ``testset`` scores:
    - a hit (1) when the movie is among the top-n recommendations of the user
    - a fail (0) otherwise, including when the user has no recommendations

    Returns the number of hits divided by the number of testset entries, or 0
    when the testset is empty.
    """
    entry_count = len(testset)

    hit_count = 0
    for uid, held_out_movie, _ in testset:
        user_top_n = anti_testset_top_n.get(uid, [])
        # Each recommendation is a sequence whose first element is the movie id.
        recommended_movies = [entry[0] for entry in user_top_n]
        if held_out_movie in recommended_movies:
            hit_count += 1

    if entry_count > 0:
        return hit_count / entry_count
    return 0

def get_novelty(anti_testset_top_n, item_to_rank):
    """Compute the normalized average popularity rank of the recommended items.

    Args:
        anti_testset_top_n: dictionary mapping each user to a list of
            ``(movie_id, score)`` pairs.
        item_to_rank: dictionary mapping a movie id to its popularity rank.
            Movies missing from it are given the rank ``len(item_to_rank) + 1``.

    Returns:
        The mean rank over all recommended items divided by
        ``len(item_to_rank)``. Returns 0.0 when there is no recommendation at
        all or when ``item_to_rank`` is empty (no ranking to normalize by).
    """
    max_rank = len(item_to_rank)
    if max_rank == 0:
        # Without any ranking the normalization would divide by zero.
        return 0.0

    unknown_rank = max_rank + 1
    ranks = [
        item_to_rank.get(movie_id, unknown_rank)
        for user_recommendations in anti_testset_top_n.values()
        for movie_id, _ in user_recommendations
    ]
    if not ranks:
        return 0.0

    average_rank = sum(ranks) / len(ranks)
    return average_rank / max_rank  # Normalization step

def calculate_diversity(top_n_recommendations):
    """Compute the mean pairwise Jaccard distance between users' recommendations.

    Args:
        top_n_recommendations: dictionary mapping each user to a list of
            ``(item_id, score)`` pairs.

    Returns:
        The average, over every pair of users, of
        ``1 - |A & B| / |A | B|`` where A and B are the item sets recommended
        to the two users. Two empty lists count as identical (distance 0).
        Returns 0 when there are fewer than two users.
    """
    # Build each user's item set once instead of once per pair.
    item_sets = [
        {iid for iid, _ in recommendations}
        for recommendations in top_n_recommendations.values()
    ]

    total_pairs = 0
    total_diversity = 0
    for items1, items2 in itertools.combinations(item_sets, 2):
        union_size = len(items1 | items2)
        if union_size > 0:
            total_diversity += 1 - len(items1 & items2) / union_size
        # else: both lists are empty, which adds a distance of 0
        total_pairs += 1
    return total_diversity / total_pairs if total_pairs > 0 else 0

def calculate_precision(top_n_recommendations, testset, threshold=4):
    """Compute the share of recommended items that are relevant to their user.

    Args:
        top_n_recommendations: dictionary mapping each user to a list of
            ``(item_id, estimated_rating)`` pairs.
        testset: iterable of ``(user_id, item_id, true_rating)`` triplets.
        threshold: minimum true rating for a test item to count as relevant
            to the user who rated it.

    Returns:
        The number of recommended items that are relevant to the user they
        were recommended to, divided by the total number of distinct
        recommended items per user. Returns 0 when nothing is recommended.
    """
    # Group the relevant items by user once, so that a user is only credited
    # for items that this same user rated at or above the threshold.
    relevant_by_user = {}
    for test_uid, test_iid, true_r in testset:
        if true_r >= threshold:
            relevant_by_user.setdefault(test_uid, set()).add(test_iid)

    no_relevant_items = frozenset()
    total_relevant = 0
    total_recommended = 0
    for uid, user_ratings in top_n_recommendations.items():
        recommended_items = {iid for iid, _ in user_ratings}
        relevant_items = relevant_by_user.get(uid, no_relevant_items)
        total_relevant += len(recommended_items & relevant_items)
        total_recommended += len(recommended_items)
    return total_relevant / total_recommended if total_recommended > 0 else 0



In [None]:
# Registry of the metrics that can be requested in an evaluation.
# Layout: evaluation mode -> metric name -> (evaluation function, extra keyword arguments).
AVAILABLE_METRICS = {
    # Computed from the predictions made on a random train/test split
    "split": {
        "mae": (accuracy.mae, {'verbose': False}),
        "rmse": (accuracy.rmse, {'verbose': False})
    },
    # Computed from the top-n recommendations and the testset of a leave-one-out split
    "loo": {
        "hit_rate": (get_hit_rate, {})
    },
    # Computed from the top-n recommendations of a model trained on all ratings
    "full": {
        "novelty": (get_novelty, {}),
        "diversity": (calculate_diversity, {}),
        "precision": (calculate_precision, {})
    }
}

# Function to evaluate ContentBased class with different feature methods and regression methods
def evaluate_content_based():
    """Evaluate ``ContentBased`` for every (regressor, feature) combination.

    Each combination is evaluated with a fresh ``EvalConfig`` holding a single
    ``ContentBased`` model that uses one feature method and one regressor.

    Returns:
        A dictionary mapping each metric name of ``AVAILABLE_METRICS`` to a
        list of records ``{'regressor': ..., 'feature': ..., <metric>: score}``,
        one record per evaluated combination.

    NOTE(review): every metric listed in ``AVAILABLE_METRICS`` is read from
    the evaluation report, so this assumes the default ``EvalConfig`` enables
    all of them -- confirm against ``configs.EvalConfig``.
    """
    feature_methods = [
        'year_of_release', 'average_rating', 'rating_count', 'genre_tf_idf',
        'tag', 'synopsis', 'previous_apparitions', 'actors_tfidf',
        'director_tfidf', 'production_countries_tfidf', 'budget',
        'original_language_tfidf', 'production_companies_tfidf', 'translations_tfidf'
    ]

    regressor_methods = [
        'linear', 'gradient_boosting', 'neural_network',
        'decision_tree', 'random_forest'
    ]

    results = {
        metric: []
        for metric_family in ('split', 'loo', 'full')
        for metric in AVAILABLE_METRICS[metric_family]
    }

    # The ratings and the popularity ranking do not depend on the evaluated
    # combination: load them once instead of re-reading the CSV on every run.
    sp_ratings = load_ratings(surprise_format=True)
    precomputed_dict = precompute_information()

    for regressor, feature in itertools.product(regressor_methods, feature_methods):
        print(f"Evaluating: Regressor: {regressor}, Feature: {feature}")
        config = EvalConfig()
        config.models = [(
            'ContentBased',
            ContentBased,
            {'features_method': [feature], 'regressor_method': regressor},
        )]
        evaluation_report = create_evaluation_report(
            config, sp_ratings, precomputed_dict, AVAILABLE_METRICS
        )
        for metric, records in results.items():
            records.append({
                'regressor': regressor,
                'feature': feature,
                metric: evaluation_report[metric].loc['ContentBased']
            })

    return results

# Evaluate every (regressor, feature) combination
results = evaluate_content_based()

# Write one CSV file per metric; each row is a (regressor, feature, score) record
for metric, result in results.items():
    csv_filename = f'content_based_evaluation_{metric}.csv'
    df = pd.DataFrame(result)
    df.to_csv(csv_filename, index=False)
    print(f"Saved results for {metric} to content_based_evaluation_{metric}.csv")