# Full LLM Recommender Evaluation with Auto-Save and Advanced Metrics
*Generated on 2025-05-06 09:29 UTC*

Includes:
- Accuracy metrics: Precision, Recall, NDCG, AP@10, RR, Hit@k
- Bias metrics: Log-popularity difference, Long-tail coverage, Average popularity rank
- Fairness metrics: Jaccard@10, PRAG@10, SNSR, SNSV
- Auto-saving plots to `analysisresults/`

In [None]:

from contextlib import contextmanager
from pathlib import Path
import matplotlib.pyplot as plt

@contextmanager
def autosave_fig(name, folder="analysisresults", dpi=300, fmt="png"):
    """Context manager that saves the current matplotlib figure on success.

    The output directory is created before the ``with`` body runs. When the
    body finishes without raising, the layout is tightened and the current
    figure is written to ``<folder>/<name>.<fmt>``. ``plt.show()`` is called
    on exit in every case, including when the body (or the save) raised.

    Args:
        name: File stem of the saved figure.
        folder: Directory to write into; created if missing.
        dpi: Resolution passed to ``plt.savefig``.
        fmt: File extension used for the saved figure.
    """
    out_dir = Path(folder)
    out_dir.mkdir(parents=True, exist_ok=True)
    try:
        # Hand control to the caller so it can draw the figure.
        yield
        plt.tight_layout()
        target = out_dir / f"{name}.{fmt}"
        plt.savefig(target, dpi=dpi)
    finally:
        plt.show()


In [None]:

import re

# Allowed values for each component of a prompt id.
STRATEGY_OPTS = ["random", "top-rated", "recent"]
FAIRNESS_OPTS = [
    "neutral",
    "gender_age_only",
    "occupation_only",
    "all_attributes",
]
BIAS_OPTS = [
    "baseline",
    "niche_genre",
    "exclude_popular",
    "indie_international",
    "temporal_diverse",
    "obscure_theme",
]

# Prompt ids look like "<user>_<strategy>_<fairness>_<bias>"; each named group
# only accepts the values listed above.
PROMPT_RE = re.compile(
    "^"
    + "_".join([
        r"(?P<user>\d+)",
        f"(?P<strategy>{'|'.join(STRATEGY_OPTS)})",
        f"(?P<fairness>{'|'.join(FAIRNESS_OPTS)})",
        f"(?P<bias>{'|'.join(BIAS_OPTS)})",
    ])
    + "$"
)

def parse_prompt_id(pid):
    """Split a prompt id into its components.

    Args:
        pid: Prompt id string, matched against ``PROMPT_RE``.

    Returns:
        Dict with keys ``user`` (converted to ``int``), ``strategy``,
        ``fairness`` and ``bias``, in that order.

    Raises:
        ValueError: If ``pid`` does not match ``PROMPT_RE``.
    """
    match = PROMPT_RE.match(pid)
    if match is None:
        raise ValueError(f"Bad prompt_id: {pid}")
    # Overwriting an existing key keeps its position, so "user" stays first.
    return {**match.groupdict(), "user": int(match.group("user"))}


In [None]:

import pandas as pd, numpy as np, math, collections, itertools
import matplotlib.pyplot as plt, seaborn as sns
from rapidfuzz import process, fuzz
from tqdm.auto import tqdm

# Input locations, relative to the working directory.
USER_DATA_PATH = '../Dataset/full_movies_data.csv'
RECS_PATH      = '../LlmCsvOutput/merged_prompt_results_top_rated.csv'

# A rating strictly above this value counts as a "like".
LIKE_RATING_THRESHOLD = 2


def _split_titles(raw):
    """Turn one comma-separated ``movies`` cell into a list of clean titles.

    Missing / non-string cells give an empty list instead of crashing, and
    blank fragments (trailing or doubled commas) are dropped so they are not
    later scored as unknown titles.
    """
    if not isinstance(raw, str):
        return []
    return [title for title in (part.strip() for part in raw.split(',')) if title]


user_df = pd.read_csv(USER_DATA_PATH)
rec_df = pd.read_csv(RECS_PATH).rename(columns={'custom_id': 'prompt_id'})

# NOTE(review): splitting on ',' also splits titles that themselves contain a
# comma — confirm the recommender output never emits such titles.
rec_df['movies'] = rec_df['movies'].map(_split_titles)

# Expand each prompt_id into user / strategy / fairness / bias columns.
# Building one DataFrame from the parsed dicts avoids constructing a
# pd.Series per row.
parsed_ids = pd.DataFrame(
    [parse_prompt_id(pid) for pid in rec_df['prompt_id']],
    index=rec_df.index,
)
rec_df = pd.concat([parsed_ids, rec_df], axis=1)

# Popularity = number of rows per title; rank 1 is the most frequent title.
popularity_counts = user_df['Title'].value_counts()
title_to_rank = {t: r for r, t in enumerate(popularity_counts.index, 1)}

# UserID -> set of titles the user rated above the like threshold.
user_likes = (
    user_df[user_df['Rating'] > LIKE_RATING_THRESHOLD]
    .groupby('UserID')['Title']
    .apply(set)
    .to_dict()
)


In [None]:
from functools import lru_cache

# Optional: switch fuzzy match off for speed if titles are standardized.
# fuzzy_match caches its results, so call fuzzy_match.cache_clear() after
# changing this flag to avoid reusing answers computed under the old setting.
USE_FUZZY_MATCH = True


@lru_cache(maxsize=None)
def fuzzy_match(rec, liked_tuple, cutoff=80):
    """Return True if ``rec`` matches any title in ``liked_tuple``.

    Args:
        rec: Recommended title.
        liked_tuple: Tuple of liked titles (a tuple so the call is hashable
            and can be cached).
        cutoff: Minimum ``fuzz.token_sort_ratio`` score for a fuzzy match.
            Only used when ``USE_FUZZY_MATCH`` is true.

    Returns:
        ``False`` for an empty ``liked_tuple``; otherwise whether a fuzzy
        match at ``cutoff`` exists (fuzzy mode) or ``rec`` is literally one of
        the liked titles (exact mode).
    """
    if not liked_tuple:
        return False
    if USE_FUZZY_MATCH:
        best = process.extractOne(
            rec, liked_tuple, scorer=fuzz.token_sort_ratio, score_cutoff=cutoff
        )
        return best is not None
    return rec in liked_tuple


def fuzzy_hits(recs, liked, cutoff=55):
    """Count the recommendations in ``recs`` that match a liked title.

    Args:
        recs: Iterable of recommended titles.
        liked: Collection of liked titles.
        cutoff: Fuzzy-match score threshold. It is now forwarded to
            ``fuzzy_match``; previously it was accepted but ignored and a
            fixed threshold of 80 was applied.

    Returns:
        Number of entries of ``recs`` that match (duplicates in ``recs`` are
        each counted).
    """
    if not liked:
        return 0
    liked_tuple = tuple(sorted(liked))  # hashable and order-stable for caching
    return sum(1 for rec in recs if fuzzy_match(rec, liked_tuple, cutoff))


In [None]:
# Default fuzzy-match score cutoff for the metric helpers defined below.
# It is bound as a default argument when those functions are defined, so
# re-run their definitions after changing this value.
CUTOFF = 55

def precision_at_k(recs, liked, k, cutoff=CUTOFF):
    """Fraction of the first ``k`` recommendations counted as hits.

    Args:
        recs: Ordered list of recommended titles.
        liked: Collection of liked titles.
        k: Number of leading recommendations to score; also the denominator.
        cutoff: Forwarded to ``fuzzy_hits``.

    Returns:
        ``fuzzy_hits(recs[:k], liked, cutoff) / k``, or 0 when ``k`` is falsy.
    """
    if not k:
        return 0
    matched = fuzzy_hits(recs[:k], liked, cutoff)
    return matched / k

def recall_at_k(recs, liked, k, cutoff=CUTOFF):
    """Hits among the first ``k`` recommendations divided by ``len(liked)``.

    Args:
        recs: Ordered list of recommended titles.
        liked: Collection of liked titles.
        k: Number of leading recommendations to score.
        cutoff: Forwarded to ``fuzzy_hits``.

    Returns:
        ``fuzzy_hits(recs[:k], liked, cutoff) / len(liked)``, or 0 when
        ``liked`` is empty.
    """
    if not liked:
        return 0
    matched = fuzzy_hits(recs[:k], liked, cutoff)
    return matched / len(liked)

def ndcg_at_k(recs, liked, k):
    """Binary-relevance NDCG over the first ``k`` recommendations.

    Relevance is exact membership in ``liked`` (no fuzzy matching). Each liked
    title is credited only the first time it appears, so repeated
    recommendations cannot push the score above 1.

    Args:
        recs: Ordered list of recommended titles.
        liked: Collection of liked titles.
        k: Cut-off rank.

    Returns:
        DCG divided by the ideal DCG for ``min(k, len(liked))`` relevant
        items, or 0 when the ideal DCG is 0 (empty ``liked`` or ``k <= 0``).
    """
    if k <= 0 or not liked:
        return 0
    credited = set()
    dcg = 0.0
    for pos, item in enumerate(recs[:k]):
        if item in liked and item not in credited:
            credited.add(item)
            dcg += 1 / math.log2(pos + 2)
    ideal = sum(1 / math.log2(pos + 2) for pos in range(min(k, len(liked))))
    return dcg / ideal if ideal else 0

def apk(recs, liked, k):
    """Average precision at ``k`` using exact membership in ``liked``.

    Each liked title is credited only on its first occurrence, so duplicate
    recommendations cannot inflate the score above 1.

    Args:
        recs: Ordered list of recommended titles.
        liked: Collection of liked titles.
        k: Cut-off rank.

    Returns:
        Sum of precision values at each hit position divided by
        ``min(len(liked), k)``; 0 when ``liked`` is empty or ``k <= 0``
        (the latter previously raised ``ZeroDivisionError``).
    """
    if not liked or k <= 0:
        return 0
    credited = set()
    score = 0.0
    hits = 0
    for rank, item in enumerate(recs[:k], 1):
        if item in liked and item not in credited:
            credited.add(item)
            hits += 1
            score += hits / rank
    return score / min(len(liked), k)

def rr(recs, liked):
    """Reciprocal rank of the first recommendation found in ``liked``.

    Uses exact membership in ``liked``. Ranks are 1-based.

    Returns:
        ``1 / rank`` of the first hit, or 0 when no recommendation is liked.
    """
    first_hit = (1 / rank for rank, item in enumerate(recs, 1) if item in liked)
    return next(first_hit, 0)

def hit_rate_at_k(recs, liked, k, cutoff=CUTOFF):
    """Return 1 if any of the first ``k`` recommendations matches a liked title.

    Honors the global ``USE_FUZZY_MATCH`` switch: with fuzzy matching on, a
    match is any liked title scoring at least ``cutoff`` under
    ``fuzz.token_sort_ratio``; with it off, only exact membership counts.
    Previously this function always matched fuzzily, regardless of the switch.

    Args:
        recs: Ordered list of recommended titles.
        liked: Collection of liked titles.
        k: Number of leading recommendations to inspect.
        cutoff: Minimum fuzzy score for a match (fuzzy mode only).

    Returns:
        1 on a hit, otherwise 0 (including when ``liked`` is empty).
    """
    if not liked:
        return 0
    top_k = recs[:k]
    if not USE_FUZZY_MATCH:
        return 1 if any(rec in liked for rec in top_k) else 0
    found = any(
        process.extractOne(rec, liked, scorer=fuzz.token_sort_ratio, score_cutoff=cutoff) is not None
        for rec in top_k
    )
    return 1 if found else 0

def avg_pop_rank(recs):
    """Mean popularity rank of the recommended titles.

    Titles missing from ``title_to_rank`` are assigned the rank
    ``len(title_to_rank) + 1``.

    Returns:
        The mean rank, or ``np.nan`` when ``recs`` is empty.
    """
    if not recs:
        return np.nan
    unknown_rank = len(title_to_rank) + 1
    ranks = [title_to_rank.get(title, unknown_rank) for title in recs]
    return np.mean(ranks)

def log_pop_diff(recs, hist):
    """Mean log-popularity of ``recs`` minus mean log-popularity of ``hist``.

    Popularity comes from ``popularity_counts``; titles not found there are
    treated as having a count of 1 (log-popularity 0).

    Returns:
        The difference of the two means, or 0 when either list is empty.
    """
    def _log_pops(titles):
        return [math.log(popularity_counts.get(title, 1)) for title in titles]

    rec_logs = _log_pops(recs)
    hist_logs = _log_pops(hist)
    if not (rec_logs and hist_logs):
        return 0
    return np.mean(rec_logs) - np.mean(hist_logs)

def is_long_tail(title, cutoff=None):
    """Return whether ``title`` ranks below the popularity head.

    Args:
        title: Title to look up in ``title_to_rank``.
        cutoff: Highest rank still considered part of the head. Defaults to
            80% of the current size of ``title_to_rank``; it is resolved at
            call time so it follows ``title_to_rank`` if that mapping is
            rebuilt (the old default was frozen when the function was defined).

    Returns:
        ``True`` when the title's rank is greater than ``cutoff``. Titles
        missing from ``title_to_rank`` get rank ``cutoff + 1`` and therefore
        count as long tail.
    """
    if cutoff is None:
        cutoff = int(len(title_to_rank) * 0.8)
    return title_to_rank.get(title, cutoff + 1) > cutoff

def jaccard_at_k(l1, l2, k):
    """Jaccard similarity between the first ``k`` items of two lists.

    Returns:
        ``|A & B| / |A | B|`` for the two top-``k`` sets, or 0 when both are
        empty.
    """
    first = set(l1[:k])
    second = set(l2[:k])
    union = first | second
    if not union:
        return 0
    return len(first & second) / len(union)

def prag_at_k(l1, l2, k):
    """Pairwise ranking agreement between the top-``k`` of two lists.

    Every unordered pair of items drawn from the union of the two top-``k``
    lists is checked: the pair agrees when both lists order it the same way
    (or both position differences are zero). Items absent from a list are
    given position ``k + 1`` in that list.

    Returns:
        Fraction of agreeing pairs, or 0 when there are no pairs.
    """
    absent = k + 1
    pos1 = {item: idx for idx, item in enumerate(l1[:k])}
    pos2 = {item: idx for idx, item in enumerate(l2[:k])}

    pairs = list(itertools.combinations(set(pos1) | set(pos2), 2))
    if not pairs:
        return 0

    concordant = 0
    for a, b in pairs:
        delta1 = pos1.get(a, absent) - pos1.get(b, absent)
        delta2 = pos2.get(a, absent) - pos2.get(b, absent)
        same_direction = (delta1 > 0 and delta2 > 0) or (delta1 < 0 and delta2 < 0)
        if same_direction or (delta1 == 0 and delta2 == 0):
            concordant += 1
    return concordant / len(pairs)


In [None]:
from tqdm import tqdm

# Per-user lookup built once up front: UserID -> list of every title in that
# user's rows. (user_likes, built when the data was loaded, maps UserID -> set
# of liked titles.)
user_hist_cache = user_df.groupby('UserID')['Title'].apply(list).to_dict()

records = []

# One metrics record per recommendation row.
for row in tqdm(rec_df.itertuples(index=False), total=len(rec_df)):
    uid = row.user
    movies = row.movies
    liked = user_likes.get(uid, set())
    hist = user_hist_cache.get(uid, [])

    # Share of recommended titles that are long tail; undefined when the
    # recommendation list is empty.
    if movies:
        lt_coverage = sum(is_long_tail(t) for t in movies) / len(movies)
    else:
        lt_coverage = np.nan

    records.append({
        # identification
        "prompt_id": row.prompt_id,
        "user": uid,
        "strategy": row.strategy,
        "fairness": row.fairness,
        "bias": row.bias,
        # accuracy
        "precision@10": precision_at_k(movies, liked, 10),
        "recall@10": recall_at_k(movies, liked, 10),
        "ndcg@10": ndcg_at_k(movies, liked, 10),
        "ap@10": apk(movies, liked, 10),
        "rr": rr(movies, liked),
        "hit_rate@5": hit_rate_at_k(movies, liked, 5),
        "hit_rate@10": hit_rate_at_k(movies, liked, 10),
        # popularity bias
        "avg_pop_rank": avg_pop_rank(movies),
        "log_pop_diff": log_pop_diff(movies, hist),
        "lt_coverage_pct": lt_coverage,
        # overlap / rank agreement between recommendations and user history
        "jaccard@10": jaccard_at_k(movies, hist, 10),
        "prag@10": prag_at_k(movies, hist, 10),
    })

# Collect everything into one DataFrame (one row per prompt).
metrics_df = pd.DataFrame.from_records(records)


In [None]:

# ---- Line plots: mean metric per bias strategy, one line per fairness group ----
bias_order = [
    "baseline",
    "niche_genre",
    "exclude_popular",
    "indie_international",
    "temporal_diverse",
    "obscure_theme",
]

# (column in metrics_df, human-readable label)
metrics_to_plot = [
    ("hit_rate@5", "Hit Rate@5"),
    ("hit_rate@10", "Hit Rate@10"),
    ("jaccard@10", "Jaccard@10"),
    ("prag@10", "PRAG@10"),
]

for col, label in metrics_to_plot:
    with autosave_fig(f"{col}_by_fairness_lines"):
        plt.figure(figsize=(9, 5))

        # Rows: fairness group; columns: bias strategy in display order.
        pivot = metrics_df.groupby(['fairness', 'bias'])[col].mean()
        pivot = pivot.unstack('bias').reindex(columns=bias_order)

        for fairness, yvals in pivot.iterrows():
            plt.plot(bias_order, yvals, marker='o', label=fairness)

        plt.title(f"{label} across bias strategies by fairness group")
        plt.xlabel("Bias strategy")
        plt.ylabel(label)
        plt.xticks(rotation=45)
        plt.grid(True, alpha=0.3)
        plt.legend()


In [None]:

# --- SNSR and SNSV (group-level fairness dispersion) ---
def snsr(series):
    """Range (max minus min) of ``series``; ``np.nan`` when it is empty."""
    if series.empty:
        return np.nan
    return series.max() - series.min()
def snsv(series):
    """Variance of ``series`` via ``Series.var()``; ``np.nan`` when empty.

    NOTE(review): ``Series.var()`` is called with its defaults (sample
    variance in pandas) — confirm this matches the intended SNSV definition.
    """
    if series.empty:
        return np.nan
    return series.var()

# For each metric: average per (fairness, bias) cell, then measure how much
# those averages spread across fairness groups within each bias strategy.
for metric in ["precision@10", "hit_rate@10", "jaccard@10"]:
    # Rows: bias strategy; columns: fairness group.
    summary = metrics_df.groupby(['fairness', 'bias'])[metric].mean().unstack('fairness')
    snsr_vals = summary.apply(snsr, axis=1)
    snsv_vals = summary.apply(snsv, axis=1)

    for stat_key, stat_name, stat_vals in (
        ("snsr", "SNSR", snsr_vals),
        ("snsv", "SNSV", snsv_vals),
    ):
        with autosave_fig(f"{metric}_{stat_key}"):
            # Draw on a fresh figure: without an explicit axes, Series.plot
            # reuses the current axes when a figure is still open, which would
            # overlay these bars on the previous plot.
            _, stat_ax = plt.subplots()
            stat_vals.plot(kind='bar', ax=stat_ax, title=f"{stat_name} – {metric}", ylabel=stat_name)
            plt.xticks(rotation=45)
