In [40]:
from typing import Mapping, Optional, Sequence

import numpy as np
import pandas as pd
import polars as pl

# Cut-off rank used by the per-user ranking metrics (NDCG@K and HitRate@K).
K = 10
# Positional discount for ranks 1..K: the item at rank r weighs 1 / log2(r + 1).
ndcg_weights = 1.0 / np.log2(np.arange(2, K + 2))
# ndcg_idcg[n - 1] is the DCG of a ranking whose first n positions are all hits,
# i.e. the ideal DCG for a user with n relevant items (n <= K).
ndcg_idcg = np.cumsum(ndcg_weights)


def evaluate(ref_path, pred_path, train_path):
    """Score a recommendation submission against held-out ground truth.

    Args:
        ref_path: Parquet file with the reference data. Must contain
            ``user_id`` and ``item_id``; ``item_id`` is handed to the per-user
            metrics as the user's relevant items (presumably a list column,
            one row per user -- confirm against the data producer).
        pred_path: Parquet file with the submission. Must contain ``user_id``
            and ``item_id``, where ``item_id`` is the ranked list of
            recommended items for that user.
        train_path: Parquet file with the training interactions. Its
            ``item_id`` column is used for the coverage and novelty metrics.

    Returns:
        dict with the keys ``'ndcg'`` and ``'hitrate'`` (means over all users
        of the reference file; users without a submission row score 0),
        ``'coverage'`` and ``'novelty'``.
    """
    submission = pl.read_parquet(pred_path)
    ref_df = pl.read_parquet(ref_path)
    train = pl.read_parquet(train_path)

    submission = (
        submission
        .select(
            pl.col("user_id").cast(pl.Int64),
            pl.col("item_id").cast(pl.List(pl.Int64)).alias("predicted"),
        )
        # If a user appears more than once, always keep the first row so the
        # result does not depend on which duplicate polars happens to retain.
        .unique(subset="user_id", keep="first", maintain_order=True)
        # Repeated items inside one recommendation list must not be counted
        # as several hits; keep the first occurrence of each item.
        .with_columns(
            pl.col("predicted").list.unique(maintain_order=True)
        )
    )

    ground_truth = ref_df.with_columns(
        # Same key dtype as the submission, so the join below cannot fail (or
        # mismatch) merely because the two files store user_id differently.
        pl.col("user_id").cast(pl.Int64),
        pl.col("item_id").alias("ground_truth"),
    )

    # Left join from the ground truth: every reference user is scored, and
    # users absent from the submission get predicted = null.
    submission_with_gt = ground_truth.join(submission, on="user_id", how="left")

    user_lists = pl.struct("predicted", "ground_truth")
    # return_dtype is pinned because polars otherwise infers the output dtype
    # from the first value the Python function returns, which would make the
    # column type depend on row order when a UDF mixes int and float results.
    metrics_per_user = submission_with_gt.select(
        pl.col("user_id"),
        user_lists.apply(ndcg_per_user, return_dtype=pl.Float64).alias("ndcg"),
        user_lists.apply(hitrate_per_user, return_dtype=pl.Float64).alias("hitrate"),
    )
    mean_ndcg = metrics_per_user.select(pl.col("ndcg").mean())["ndcg"][0]
    mean_hitrate = metrics_per_user.select(pl.col("hitrate").mean())["hitrate"][0]

    coverage = compute_coverage(submission, train)
    novelty = compute_novelty(submission, train)

    return {'ndcg': mean_ndcg,
            'hitrate': mean_hitrate,
            'coverage': coverage,
            'novelty': novelty}


def ndcg_per_user(pl_struct: Mapping[str, Sequence[int]], k: Optional[int] = None) -> float:
    """Compute binary-relevance NDCG@k for a single user.

    Args:
        pl_struct: Mapping with the keys ``"predicted"`` (ranked
            recommendations, or ``None`` when the user has none) and
            ``"ground_truth"`` (the user's relevant items). ``predicted`` is
            expected to be free of duplicates.
        k: Cut-off rank. ``None`` (the default) uses the module-level ``K``
            together with the precomputed ``ndcg_weights`` / ``ndcg_idcg``.

    Returns:
        DCG of the top-k recommendations divided by the ideal DCG for the
        number of distinct relevant items (capped at k). ``0.0`` when
        ``predicted`` is ``None``.

    Raises:
        ValueError: If ``ground_truth`` is ``None`` or empty, or if ``k`` is
            not positive.
    """
    predicted = pl_struct["predicted"]
    ground_truth = pl_struct["ground_truth"]

    if predicted is None:
        return 0.0

    # Explicit check instead of `assert`: asserts vanish under `python -O`,
    # and an empty ground truth would then silently index ndcg_idcg[-1].
    if ground_truth is None or len(ground_truth) == 0:
        raise ValueError("ground_truth must contain at least one item")

    if k is None:
        cutoff, weights, idcg_table = K, ndcg_weights, ndcg_idcg
    else:
        if k <= 0:
            raise ValueError("k must be a positive integer")
        cutoff = k
        weights = 1.0 / np.log2(np.arange(2, cutoff + 2))
        idcg_table = weights.cumsum()

    top_k = predicted[:cutoff]
    # Distinct relevant items only: a ground-truth item listed twice must not
    # count as two hits, nor raise the ideal DCG.
    relevant = set(ground_truth)

    hits = np.array([item in relevant for item in top_k], dtype=float)
    dcg = (hits * weights[:len(top_k)]).sum()
    idcg = idcg_table[min(len(relevant), cutoff) - 1]
    return float(dcg / idcg)


def hitrate_per_user(pl_struct: Mapping[str, Sequence[int]], k: Optional[int] = None) -> float:
    """Compute HitRate@k for a single user.

    Args:
        pl_struct: Mapping with the keys ``"predicted"`` (ranked
            recommendations, or ``None`` when the user has none) and
            ``"ground_truth"`` (the user's relevant items).
        k: Cut-off rank. ``None`` (the default) uses the module-level ``K``.

    Returns:
        ``1.0`` if at least one of the top-k recommendations is a relevant
        item, otherwise ``0.0`` (also when ``predicted`` is ``None``). The
        result is always a ``float`` so that every call yields the same type.

    Raises:
        ValueError: If ``ground_truth`` is ``None`` or empty, or if ``k`` is
            not positive.
    """
    predicted = pl_struct["predicted"]
    ground_truth = pl_struct["ground_truth"]

    if predicted is None:
        return 0.0

    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if ground_truth is None or len(ground_truth) == 0:
        raise ValueError("ground_truth must contain at least one item")

    if k is None:
        k = K
    elif k <= 0:
        raise ValueError("k must be a positive integer")

    top_k = predicted[:k]
    return 0.0 if set(ground_truth).isdisjoint(top_k) else 1.0


def compute_coverage(submission, train):
    """Return the share of distinct train items that are recommended at least once.

    Args:
        submission: polars DataFrame with a ``predicted`` list column
            (one list of recommended item ids per row).
        train: polars DataFrame with an ``item_id`` column.

    Returns:
        Number of distinct train items that occur in any ``predicted`` list,
        divided by the number of distinct train items. Recommended items that
        never occur in ``train`` are ignored. ``0.0`` when ``train`` has no
        items.
    """
    train_items = set(train.select('item_id').unique().to_series().to_list())
    if not train_items:
        # No catalogue to cover; avoid dividing by zero.
        return 0.0

    recommended_items = {
        item
        for items in submission.select('predicted').to_series().to_list()
        if items is not None  # a null list has no items to contribute
        for item in items
    }

    return len(recommended_items & train_items) / len(train_items)


def compute_novelty(submission, train):
    """Return the mean normalised self-information of the recommended items.

    Each train item gets the novelty
    ``-log2(count / num_interactions) / log2(num_interactions)``, where
    ``count`` is the number of train rows with that item and
    ``num_interactions`` is the number of train rows. The result is the sum
    of these novelties over every recommended item occurrence, divided by the
    total number of recommendations.

    Args:
        submission: polars DataFrame with a ``predicted`` list column
            (one list of recommended item ids per row).
        train: polars DataFrame with an ``item_id`` column, one row per
            interaction.

    Returns:
        The novelty score. Recommended items that never occur in ``train``
        add nothing to the sum but still count in the denominator. ``0.0``
        when there are no recommendations, or when ``train`` has fewer than
        two rows (the normaliser ``log2(num_interactions)`` is then zero or
        undefined).
    """
    num_interactions = len(train)

    all_pred_items = [
        item
        for items in submission.select('predicted').to_series().to_list()
        if items is not None  # a null list has no items to contribute
        for item in items
    ]
    num_recommendations = len(all_pred_items)

    if num_recommendations == 0 or num_interactions < 2:
        # Nothing to average, or no valid normaliser: avoid 0/0 and NaN.
        return 0.0

    item_stats = train.groupby('item_id').count()
    item_stats = item_stats.with_columns(-np.log2(pl.col('count') / num_interactions).alias('item_novelty'))
    # Scale into [0, 1]: an item seen once has novelty log2(num_interactions).
    item_stats = item_stats.with_columns((pl.col('item_novelty') / np.log2(num_interactions)))
    item_stats = item_stats.select('item_id', 'item_novelty').to_pandas()

    recs_items = pd.Series(all_pred_items).value_counts().reset_index()
    recs_items.columns = ['item_id', 'item_count']
    # Inner join: only recommended items that exist in train carry a novelty.
    recs_items = pd.merge(recs_items, item_stats, on='item_id', how='inner')
    recs_items['product'] = recs_items['item_count'] * recs_items['item_novelty']

    novelty = recs_items['product'].sum() / num_recommendations

    return novelty

In [45]:
# Parquet files used for the evaluation run: training interactions,
# held-out ground truth and the model's predictions.
train_path = "train.parquet"
ground_truth_path = "test_gt.parquet"
prediction_path = "predict.parquet"

In [None]:
# Keyword arguments make the order explicit: reference first, predictions second.
evaluate(
    ref_path=ground_truth_path,
    pred_path=prediction_path,
    train_path=train_path,
)