In [None]:
!conda install -c conda-forge scikit-surprise -y

In [4]:
!pip install tqdm

Collecting tqdm
  Downloading tqdm-4.67.3-py3-none-any.whl.metadata (57 kB)
Downloading tqdm-4.67.3-py3-none-any.whl (78 kB)
Installing collected packages: tqdm
Successfully installed tqdm-4.67.3


In [5]:
import json
import gzip
from collections import defaultdict
from typing import Dict, List, Set
import random
import numpy as np
import pandas as pd
import math
from tqdm import tqdm
from surprise import Dataset, Reader, SVD

In [6]:
# ---------------------------------------
# Utility Loader
# ---------------------------------------

def load_json(path: str):
    with gzip.open(path, "rt", encoding="utf-8") as f:
        return json.load(f)

In [7]:
class ExplicitMFRecommender():
    """
    Explicit Matrix Factorization using SVD (via Surprise).
    Train-only ratings.
    """

    def __init__(self, train_user_book_rating: Dict[str, Dict[str, float]]):
        """
        Input format:
            train_user_book_rating[user][book] = rating
        """
        self.train_data = train_user_book_rating

    def fit(self):
        print("Training Explicit MF (SVD)...")

        records = []
        for user, book_dict in self.train_data.items():
            for book, rating in book_dict.items():
                records.append([user, book, float(rating)])

        reader = Reader(rating_scale=(1, 5))
        self.data = Dataset.load_from_df(
            pd.DataFrame(records, columns=["user", "item", "rating"]),
            reader,
        )

        trainset = self.data.build_full_trainset()

        self.model = SVD(random_state=42)
        self.model.fit(trainset)

        self.all_items = set({b for u in self.train_data for b in self.train_data[u]})

    def recommend(self, user_id: str, k: int = 10, n_candidates: int = 1000):
        if user_id not in self.train_data:
            return []
    
        seen = set(self.train_data[user_id].keys())
    
        # Sample candidates instead of full catalog
        unseen_items = list(self.all_items - seen)
    
        if len(unseen_items) > n_candidates:
            candidates = random.sample(unseen_items, n_candidates)
        else:
            candidates = unseen_items
    
        scores = []
        for book in candidates:
            est = self.model.predict(user_id, book).est
            scores.append((book, est))
    
        scores.sort(key=lambda x: x[1], reverse=True)
        return [b for b, _ in scores[:k]]

In [8]:
class RankingEvaluator:
    """
    Evaluates ranking-based recommendation metrics.
    Computes: Hit@K, MRR@K, NDCG@K for various K values.
    """
    
    def __init__(self):
        pass
    
    def hit_at_k(self, predictions: List[str], ground_truth: Set[str], k: int) -> float:
        """
        Hit@K: 1 if at least one relevant item is in top-K, else 0
        """
        top_k = predictions[:k]
        return 1.0 if any(item in ground_truth for item in top_k) else 0.0
    
    def mrr_at_k(self, predictions: List[str], ground_truth: Set[str], k: int) -> float:
        """
        MRR@K: Reciprocal rank of first relevant item in top-K
        """
        top_k = predictions[:k]
        for rank, item in enumerate(top_k, start=1):
            if item in ground_truth:
                return 1.0 / rank
        return 0.0
    
    def dcg_at_k(self, predictions: List[str], ground_truth: Set[str], k: int) -> float:
        """
        DCG@K: Discounted Cumulative Gain
        """
        dcg = 0.0
        top_k = predictions[:k]
        for rank, item in enumerate(top_k, start=1):
            if item in ground_truth:
                dcg += 1.0 / np.log2(rank + 1)
        return dcg
    
    def idcg_at_k(self, ground_truth: Set[str], k: int) -> float:
        """
        IDCG@K: Ideal DCG (best possible DCG)
        """
        ideal_k = min(len(ground_truth), k)
        idcg = sum(1.0 / np.log2(rank + 1) for rank in range(1, ideal_k + 1))
        return idcg
    
    def ndcg_at_k(self, predictions: List[str], ground_truth: Set[str], k: int) -> float:
        """
        NDCG@K: Normalized Discounted Cumulative Gain
        """
        dcg = self.dcg_at_k(predictions, ground_truth, k)
        idcg = self.idcg_at_k(ground_truth, k)
        
        if idcg == 0.0:
            return 0.0
        
        return dcg / idcg
    
    def evaluate(self, predictions: Dict[str, List[str]], ground_truth: Dict[str, Set[str]]) -> Dict[str, float]:
        """
        Evaluate predictions against ground truth.
        
        Args:
            predictions: Dict mapping user_id -> list of recommended item_ids (ranked)
            ground_truth: Dict mapping user_id -> set of relevant item_ids
            
        Returns:
            Dict of metric_name -> average_score
        """
        metrics = {
            'Hit@5': [],
            'Hit@10': [],
            'Hit@50': [],
            'MRR@10': [],
            'NDCG@10': [],
            'NDCG@50': []
        }
        
        # Only evaluate users present in both predictions and ground_truth
        common_users = set(predictions.keys()) & set(ground_truth.keys())
        
        for user_id in common_users:
            preds = predictions[user_id]
            gt = ground_truth[user_id]
            
            # Skip users with no ground truth items
            if len(gt) == 0:
                continue
            
            # Compute metrics
            metrics['Hit@5'].append(self.hit_at_k(preds, gt, 5))
            metrics['Hit@10'].append(self.hit_at_k(preds, gt, 10))
            metrics['Hit@50'].append(self.hit_at_k(preds, gt, 50))
            metrics['MRR@10'].append(self.mrr_at_k(preds, gt, 10))
            metrics['NDCG@10'].append(self.ndcg_at_k(preds, gt, 10))
            metrics['NDCG@50'].append(self.ndcg_at_k(preds, gt, 50))
        
        # Average across all users
        results = {}
        for metric_name, values in metrics.items():
            if len(values) > 0:
                results[metric_name] = np.mean(values)
            else:
                results[metric_name] = 0.0
        
        return results

In [9]:
def build_user_book_interactions(
    user_to_review_path: str,
    book_to_review_path: str,
) -> Dict[str, List[str]]:
    """
    Builds:
        user_id -> [book_id, book_id, ...]
    """
    user_to_review = load_json(user_to_review_path)
    book_to_review = load_json(book_to_review_path)

    # Normalize IDs to string
    review_to_user = {
        str(x["review_id"]): str(x["user_id"])
        for x in user_to_review
    }

    review_to_book = {
        str(x["review_id"]): str(x["book_id"])
        for x in book_to_review
    }

    user_book = defaultdict(list)

    for rid, user_id in review_to_user.items():
        if rid in review_to_book:
            book_id = review_to_book[rid]
            user_book[user_id].append(book_id)

    return user_book


def split_user_interactions(
    user_book: Dict[str, List[str]],
    seed: int = 42,
):
    """
    Returns:
        train_user_book
        val_user_book
        test_user_book
    """
    random.seed(seed)

    train = {}
    val = {}
    test = {}

    for user, books in user_book.items():
        books = list(set(books))  # remove duplicates
        random.shuffle(books)

        n = len(books)
        if n < 3:
            train[user] = books
            val[user] = []
            test[user] = []
            continue

        n_train = int(0.7 * n)
        n_val = int(0.15 * n)

        train[user] = books[:n_train]
        val[user] = books[n_train : n_train + n_val]
        test[user] = books[n_train + n_val :]

    return train, val, test


def build_ground_truth(test_user_book: Dict[str, List[str]]) -> Dict[str, Set[str]]:
    return {
        user: set(books)
        for user, books in test_user_book.items()
        if len(books) > 0
    }


def generate_predictions(model, users: List[str], k: int):
    predictions = {}

    for u in tqdm(users, desc="Generating Predictions"):
        preds = model.recommend(u, k)
        predictions[u] = preds

    return predictions


def build_train_user_book_rating(
    train_user_book,
    user_to_review_path,
    book_to_review_path,
    review_path,
):
    user_to_review = load_json(user_to_review_path)
    book_to_review = load_json(book_to_review_path)
    reviews = load_json(review_path)

    review_to_user = {str(x["review_id"]): str(x["user_id"]) for x in user_to_review}
    review_to_book = {str(x["review_id"]): str(x["book_id"]) for x in book_to_review}
    review_to_rating = {str(x["review_id"]): float(x["user_rating"]) for x in reviews}

    train_ratings = defaultdict(dict)

    for rid, user in review_to_user.items():
        if rid in review_to_book and rid in review_to_rating:
            book = review_to_book[rid]
            rating = review_to_rating[rid]

            if rating > 0 and user in train_user_book and book in train_user_book[user]:
                train_ratings[user][book] = rating

    return train_ratings

In [10]:
def main():
    # -------------------------------
    # Paths
    # -------------------------------

    DATA_FOLDER = "RokomariBG_Dataset/"
    USER_TO_REVIEW = DATA_FOLDER+"user_to_review.json.gz"
    BOOK_TO_REVIEW = DATA_FOLDER+"book_to_review.json.gz"
    REVIEW = DATA_FOLDER + "review.json.gz"

    K = 10

    # -------------------------------
    # Build user-book interactions
    # -------------------------------
    user_book = build_user_book_interactions(
        USER_TO_REVIEW,
        BOOK_TO_REVIEW,
    )

    print(f"Total users with interactions: {len(user_book)}")

    evaluator = RankingEvaluator()

    # -------------------------------
    # Split 70/15/15
    # -------------------------------
    train_user_book, val_user_book, test_user_book = split_user_interactions(
        user_book
    )

    ground_truth = build_ground_truth(test_user_book)
    test_users = list(ground_truth.keys())

    print(f"Users in test set: {len(test_users)}")

    # =====================================================
    # EXPLICIT MF (SVD)
    # =====================================================

    print("\nTraining Explicit MF (SVD)...")

    train_user_book_rating = build_train_user_book_rating(
        train_user_book=train_user_book,
        user_to_review_path=USER_TO_REVIEW,
        book_to_review_path=BOOK_TO_REVIEW,
        review_path=REVIEW,
    )

    explicit_mf = ExplicitMFRecommender(train_user_book_rating)
    explicit_mf.fit()

    explicit_preds = generate_predictions(explicit_mf, test_users, K)
    explicit_metrics = evaluator.evaluate(explicit_preds, ground_truth)

    print("\n===== Explicit MF Results =====")
    for m, v in explicit_metrics.items():
        print(f"{m}: {v:.4f}")


if __name__ == "__main__":
    main()

Total users with interactions: 63721
Users in test set: 15427

Training Explicit MF (SVD)...
Training Explicit MF (SVD)...


Generating Predictions: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████| 15427/15427 [02:35<00:00, 99.32it/s]



===== Explicit MF Results =====
Hit@5: 0.0033
Hit@10: 0.0063
Hit@50: 0.0063
MRR@10: 0.0020
NDCG@10: 0.0019
NDCG@50: 0.0019
