In [1]:
import os
import sys

from pathlib import Path

# Make the repository root importable so that `src.*` resolves.
# The notebook is expected to be run from a directory one level below the
# root, so the root is the parent of the current working directory.
# (`__name__` is a module name, not a filesystem path, so it is not used here.)
sys.path.append(str(Path.cwd().resolve().parent))

import pandas as pd
import numpy as np


from src.loader.movielens import MovieLensLoader
from src.utils.metrics import RecSysMetrics

import warnings

# Suppress every warning for the rest of the session to keep cell output short.
# NOTE(review): this also hides deprecation and numerical warnings from
# pandas/numpy; consider narrowing the filter to specific categories.
warnings.filterwarnings("ignore")



In [2]:
# Configure the dataset loader.
# NOTE(review): presumably `num_users` caps the number of users and
# `num_test_items` is the per-user hold-out size — confirm against MovieLensLoader.
movielens_loader = MovieLensLoader(num_users=100, num_test_items=5)

In [3]:
# Load the dataset object; later cells read its `train`, `test` and
# `test_user2item` attributes.
# NOTE(review): the variable name is misspelt ("moivelens"); it is kept as-is
# because later cells reference it under this exact name.
moivelens_dataset = movielens_loader.load()

In [4]:
# Pull the two splits out of the loaded dataset object.
train, test = moivelens_dataset.train, moivelens_dataset.test

In [5]:
# Rating Prediction
from typing import List, Dict
from collections import defaultdict


class RandomMovieRatingModel(object):
    """Baseline that assigns a uniformly random rating to every (user, movie) pair.

    One score is drawn per known pair at construction time and stored in
    ``random_pred_matrix``, so repeated predictions for the same known pair
    return the same value.
    """

    def __init__(self, unique_user_ids: List[int], unique_movie_ids: List[int]) -> None:
        """Build the id -> matrix-index lookups and draw the random score matrix.

        Args:
            unique_user_ids: user ids the model should know; position in the
                list becomes the row index.
            unique_movie_ids: movie ids the model should know; position in the
                list becomes the column index.
        """
        self.userid_index = dict(zip(unique_user_ids, range(len(unique_user_ids))))
        self.movieid_index = dict(zip(unique_movie_ids, range(len(unique_movie_ids))))
        # Scores are drawn from np.random.uniform(0.5, 5.0), shape (users, movies).
        self.random_pred_matrix = np.random.uniform(
            0.5, 5.0, (len(unique_user_ids), len(unique_movie_ids))
        )

    def predict(self, test_rating: pd.DataFrame) -> List[float]:
        """Return one predicted rating per row of ``test_rating``.

        Args:
            test_rating: frame with ``user_id`` and ``movie_id`` columns.

        Returns:
            Predictions in row order. A pair whose user *or* movie is not in
            the vocabulary given to ``__init__`` gets a fresh random draw
            instead of a stored score.
        """
        pred_results = []

        # Iterating the two columns directly avoids the per-row Series that
        # DataFrame.iterrows would build.
        for user_id, movie_id in zip(test_rating["user_id"], test_rating["movie_id"]):
            user_index = self.userid_index.get(user_id)
            movie_index = self.movieid_index.get(movie_id)

            # Cold-start on either side: there is no stored score to look up.
            if user_index is None or movie_index is None:
                pred_results.append(np.random.uniform(0.5, 5.0))
                continue

            pred_results.append(self.random_pred_matrix[user_index, movie_index])

        return pred_results


class RandomMovieRankingModel(object):
    """Baseline recommender that returns randomly chosen unseen movies per user."""

    def __init__(self, movielens_train: pd.DataFrame):
        """Record, for each user, the movies present in the training frame.

        Args:
            movielens_train: frame with ``user_id`` and ``movie_id`` columns.
        """
        self.user_evaluated_movies = (
            movielens_train.groupby("user_id")
            .agg({"movie_id": list})["movie_id"]
            .to_dict()
        )

    def predict(
        self, unique_user_ids: List[int], unique_movie_ids: List[int], top_k: int = 10
    ) -> Dict[int, List[int]]:
        """Recommend up to ``top_k`` distinct movies the user has not seen in train.

        Args:
            unique_user_ids: users to produce a ranking for. Users that never
                appeared in the training frame are treated as having seen nothing.
            unique_movie_ids: the candidate catalogue to recommend from.
            top_k: maximum length of each ranking.

        Returns:
            Mapping of user id to a list of recommended movie ids. Each list
            has no duplicates, contains only ids from ``unique_movie_ids``
            that the user has not seen, and is shorter than ``top_k`` when
            fewer candidates are available.
        """
        pred_user2items = defaultdict(list)
        # Loop-invariant: build the catalogue set once.
        all_movie_set = set(unique_movie_ids)

        for user_id in unique_user_ids:
            watched_movies = self.user_evaluated_movies.get(user_id, ())
            # Set difference (not symmetric difference): watched movies that
            # are missing from the catalogue must not become candidates.
            # Sorting fixes the candidate order so a seeded RNG is reproducible.
            non_watched_movie = sorted(all_movie_set.difference(watched_movies))

            num_picks = min(top_k, len(non_watched_movie))
            if num_picks <= 0:
                pred_user2items[user_id] = []
                continue

            # replace=False so the same movie is never recommended twice.
            random_pred_ranking = np.random.choice(
                non_watched_movie, num_picks, replace=False
            )
            pred_user2items[user_id].extend(random_pred_ranking.tolist())

        return pred_user2items

In [6]:
# Sorted id vocabularies, taken from the training split only.
unique_user_ids = sorted(train["user_id"].unique())
unique_movie_ids = sorted(train["movie_id"].unique())

# id -> position lookups in the sorted vocabularies.
# NOTE(review): these two dicts are not referenced by any later cell visible here.
userid_index = {user_id: position for position, user_id in enumerate(unique_user_ids)}
movieid_index = {movie_id: position for position, movie_id in enumerate(unique_movie_ids)}

# Work on a copy so the original test frame is left untouched.
movie_test_rating = test.copy()

In [7]:
# Both baselines draw from NumPy's global random state (in the rating model's
# constructor and in each `predict` call). Seeding it here, immediately before
# the models are built, makes the reported metrics repeatable when the notebook
# is re-run top to bottom.
RANDOM_SEED = 42
np.random.seed(RANDOM_SEED)

movie_rank_model = RandomMovieRankingModel(movielens_train=train)
movie_rating_model = RandomMovieRatingModel(
    unique_movie_ids=unique_movie_ids, unique_user_ids=unique_user_ids
)

In [8]:
# One predicted rating per row of the copied test frame, in row order.
pred_ratings = movie_rating_model.predict(movie_test_rating)

In [9]:
# Per-user random recommendations; top_k=10 is the method's default and
# matches the cutoff used when the ranking metrics are computed.
pred_ranking = movie_rank_model.predict(
    unique_user_ids=unique_user_ids,
    unique_movie_ids=unique_movie_ids,
    top_k=10,
)

In [12]:
# Ground truth used by the metrics below.
true_ratings = test["rating"]
true_user2items = moivelens_dataset.test_user2item
# Cutoff for the ranking metrics.
eval_k = 10

# Rating-prediction errors.
mae_score = RecSysMetrics().mae(true_ratings, pred_ratings)
print("Test MAE rating", mae_score)
mse_score = RecSysMetrics().mse(true_ratings, pred_ratings)
print("Test MSE rating", mse_score)
rmse_score = RecSysMetrics().rmse(true_ratings, pred_ratings)
print("Test RMSE rating", rmse_score)

# Ranking quality at the cutoff.
precision_score = RecSysMetrics().calc_precision_at_k(true_user2items, pred_ranking, eval_k)
print("Test Precision@k", precision_score)

recall_score = RecSysMetrics().calc_recall_at_k(true_user2items, pred_ranking, eval_k)
print("Test Recall@k", recall_score)

Test MAE rating 1.7266766414853625
Test MSE rating 4.409018120551929
Test RMSE rating 2.0997662061648503
Test Precision@k 0.0044943820224719105
Test Recall@k 0.012921348314606741
