# Project Assignment: Short Video Recommender System (KuaiRec)

Dataset Source: [KuaiRec](https://kuairec.com/)

Arxiv Paper: [KuaiRec: A Fully-observed Dataset and Insights for Evaluating Recommender Systems](https://arxiv.org/pdf/2202.10842)

## Dataset import

In [None]:
# Download the KuaiRec archive and extract it into the current working directory.
# NOTE(review): --no-check-certificate turns off TLS certificate verification for this download.
!wget https://nas.chongminggao.top:4430/datasets/KuaiRec.zip --no-check-certificate
!unzip KuaiRec.zip

## Imports

In [None]:
import os

import numpy as np
import pandas as pd
import plotly.express as px
from scipy.sparse import csr_matrix
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import MultiLabelBinarizer

# Locate the dataset directory: the Kaggle input mount is tried first, then
# two possible local extraction folders under the current working directory.
DATA_PATH = "/kaggle/input/kuairec/KuaiRec 2.0/data"
for _local_dir in ("KuaiRec", "KuaiRec 2.0"):
    if os.path.exists(DATA_PATH):
        break
    DATA_PATH = f"{os.getcwd()}/{_local_dir}/data"
else:
    # All fallbacks were tried; the last candidate still has to exist.
    if not os.path.exists(DATA_PATH):
        raise FileNotFoundError("KuaiRec dataset not found. Please check the path.")

DATA_PATH

# Step 1: Load the dataset

In [None]:
def data_clear(df : pd.DataFrame) -> pd.DataFrame:
    """
    Clean a raw interaction table and return the result as a new DataFrame.

    The input frame is left untouched. The following steps are applied:
    the ``time`` and ``date`` columns are dropped (they duplicate
    ``timestamp``), rows containing any missing value are removed, the id,
    duration, timestamp and ratio columns are cast to compact dtypes,
    duplicate rows are removed, rows with a negative timestamp are discarded
    and ``timestamp`` is converted from epoch seconds to datetime.

    Args:
        df (pd.DataFrame): Raw table with at least the columns ``user_id``,
            ``video_id``, ``play_duration``, ``time``, ``date``,
            ``timestamp`` and ``watch_ratio``.

    Returns:
        pd.DataFrame: The cleaned table.

    Raises:
        KeyError: If ``time`` or ``date`` is missing from ``df``.
    """
    # drop() without inplace returns a new frame, so the caller's data is not mutated
    cleaned = df.drop(columns=["time", "date"])

    # Missing values must go before the cast: an integer cast cannot represent
    # NaN, so casting first would fail on every column that still contains one.
    cleaned = cleaned.dropna()
    cleaned = cleaned.astype({
        "user_id": "int32",
        "video_id": "int32",
        "play_duration": "int32",
        "timestamp": "int64",
        "watch_ratio": "float32",
    })

    cleaned = cleaned.drop_duplicates()

    # Explicit copy so the column assignment below does not write into a slice
    cleaned = cleaned[cleaned["timestamp"] >= 0].copy()
    cleaned["timestamp"] = pd.to_datetime(cleaned["timestamp"], unit="s")

    return cleaned

In [None]:
def my_describe(df : pd.DataFrame) -> pd.DataFrame:
    """
    Custom describe for datasets containing user_id and video_id.

    Prints the shape, the number of distinct users and videos, and the matrix
    density, i.e. the share of (user, video) pairs that have a row in ``df``.

    Args:
        df (pd.DataFrame): Interaction table with ``user_id`` and ``video_id``
            columns.

    Returns:
        pd.DataFrame: The result of ``df.describe()``.
    """
    print(f"Shape of the matrix: {df.shape}")
    unique_users = df["user_id"].nunique()
    unique_posts = df["video_id"].nunique()
    print(f"Number of unique users: {unique_users}")
    print(f"Number of unique posts: {unique_posts}")

    # An empty table has no possible pair; report 0 instead of dividing by zero
    possible_pairs = unique_posts * unique_users
    density = len(df) / possible_pairs * 100 if possible_pairs else 0.0
    print(f"Matrix density: {density}%")
    return df.describe()

## Small matrix

This table has a density of 99.6%. This means that 99.6% of the entries in the matrix are non-zero, indicating that most users have interacted with most items.

In [None]:
# Read the small interaction matrix and clean it in a single step
small_matrix = data_clear(pd.read_csv(f"{DATA_PATH}/small_matrix.csv"))


## Big matrix

This table has a density of 16.3%. We will use this matrix for our training and testing.

It contains additional interactions for the same users and items as the small matrix, so we do not need to subtract the small matrix from it.

In [None]:
# Read the big interaction matrix and clean it in a single step
big_matrix = data_clear(pd.read_csv(f"{DATA_PATH}/big_matrix.csv"))


## Caption Category

In [None]:
# Video metadata (captions and categories).
# NOTE(review): the line terminator is forced to "\n", presumably because some
# captions contain other line-break characters — confirm against the raw file.
caption_category = pd.read_csv(
    f"{DATA_PATH}/kuairec_caption_category.csv",
    lineterminator="\n",
)
caption_category

## Misc

In [None]:
# Row count of the small matrix expressed as a percentage of the big matrix
print(
    "Proportion of small_matrix relative to big_matrix: "
    f"{100 * len(small_matrix) / len(big_matrix):.2f}%"
)

# Step 2: Feature Engineering

Nothing required here. ALS needs the matrix of user-item interactions. We will use `small_matrix.csv` for training and testing.

# Step 3: Alternating Least Squares (ALS) Model

Considering that we only have implicit feedback, ALS can work well. We will not use demographic data for this simple model. This algorithm is mostly used for sparse datasets.

We will use the ALS algorithm from pyspark.ml.recommendation with hyperparameters tuning and cross-validation.

The model is cut into 4 parts:
- Data preparation and tuning
- Model training
- Model evaluation
- Model saving


### Pyspark imports

In [None]:
import pyspark
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.sql import SparkSession

# To evaluate the model with RMSE
from pyspark.ml.evaluation import RegressionEvaluator
# For hyperparameter tuning
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel, Model

# Log library versions so a run can be reproduced later
print(f"Spark version: {pyspark.__version__}")
print(f"Pandas version: {pd.__version__}")

# Reuse the active Spark session if one exists, otherwise start a new one
spark = (
    SparkSession.builder
    .appName("KuaiRec ALS")
    .getOrCreate()
)

### Data preparation for Pyspark

In [None]:
# Read the CSV straight into Spark instead of converting the pandas frame,
# to avoid memory issues. This rebinds `small_matrix` to a Spark DataFrame.
# TODO: Maybe later on use parquet files for cleaned up data
small_matrix = (
    spark.read.csv(
        f"{DATA_PATH}/small_matrix.csv",
        sep=",",
        header=True,
        # Types have to be inferred, otherwise the columns are not typed correctly
        inferSchema=True,
        nullValue="",
    )
    .select("user_id", "video_id", "watch_ratio")
    .dropna(subset=["user_id", "video_id", "watch_ratio"])
)

In [None]:
# The big matrix caused a lot of problems, so it is loaded here but not used for now.
# This rebinds `big_matrix` to a Spark DataFrame.
big_matrix = (
    spark.read.csv(
        f"{DATA_PATH}/big_matrix.csv",
        sep=",",
        header=True,
        nullValue="",
        inferSchema=True,
    )
    .select("user_id", "video_id", "watch_ratio")
    .dropna(subset=["user_id", "video_id", "watch_ratio"])
)

### Hyperparameter tuning and Cross Validation

In [None]:
# ALS model configuration
als = ALS(
    maxIter=10,
    rank=10,
    userCol="user_id",
    itemCol="video_id",
    ratingCol="watch_ratio",
    implicitPrefs=True,
    # Users/items that end up only in a validation fold have no learned factors
    # and get NaN predictions. Dropping those rows keeps the RMSE of each fold
    # finite, so the CrossValidator can actually compare parameter sets.
    coldStartStrategy="drop",
    # Fixed seed so the factor initialisation is reproducible
    seed=42,
)

# Parameter grid explored by the CrossValidator (2 x 2 = 4 combinations)
params = (
    ParamGridBuilder()
    .addGrid(als.maxIter, [10, 15])
    .addGrid(als.regParam, [0.09, 0.1])
    .build()
)


# RMSE between the predicted score and the observed watch_ratio.
# NOTE(review): with implicitPrefs=True the model predicts a preference score
# rather than the watch ratio itself, so RMSE may not be the most meaningful
# selection metric — confirm.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="watch_ratio", predictionCol="prediction")


# CrossValidator
cvs = CrossValidator(
    estimator=als,
    estimatorParamMaps=params,
    evaluator=evaluator,
    # Between 2 and 5
    numFolds=3,
    # Fixed seed so the fold assignment is reproducible
    seed=42,
)

### Training
After training, the model approximates the interaction matrix as the product of two low-rank factor matrices:

$R \approx U V^\top$

Where:
- R is the user-item interaction matrix
- U is the user feature matrix
- V is the item feature matrix

In [None]:
# 80/20 random split with a fixed seed
training, test = small_matrix.randomSplit(weights=[0.8, 0.2], seed=42)

In [None]:
# Run the cross-validated grid search on the training split
models: CrossValidatorModel = cvs.fit(dataset=training)

### Evaluation

In [None]:
# Score the test split with the best model found by the CrossValidator
pyspark_als_model: Model = models.bestModel
predictions = pyspark_als_model.transform(test)
# Rows containing a null/NaN value are removed before computing the RMSE
rmse = evaluator.evaluate(predictions.dropna())

In [None]:
# MaxIter and RegParam are read from the parent of the underlying Java model object
_parent = pyspark_als_model._java_obj.parent()
print(f"RMSE: {rmse}")
print(f"Rank: {pyspark_als_model.rank}")
print(f"MaxIter: {_parent.getMaxIter()}")
print(f"RegParam: {_parent.getRegParam()}")

### Saving

In [None]:
# Persist the model. overwrite() replaces a model saved by a previous run;
# a plain save() raises when the target path already exists, which breaks
# re-running this cell.
pyspark_als_model.write().overwrite().save("pyspark_als_model")

# Step 4: ALS Recommendation

- Predict which videos are likely to be enjoyed by each user in the test set
- Generate a top-N ranked list of recommendations for each user

### Loading model

In [None]:
# Reuse the model trained in this session if there is one; otherwise fall back
# to the copy saved on disk.
try:
    pyspark_als_model
except NameError:
    print("Model not found. Trying to load it.")
    if not os.path.exists("pyspark_als_model"):
        print("Model not found. Please train the model first.")
        raise FileNotFoundError("Model not found. Please train the model first.")
    print("Model found. Loading it.")
    pyspark_als_model = ALSModel.load("pyspark_als_model")
    print("Model loaded.")


### Recommendation

In [None]:
# Reload the caption table so this section also works when the earlier
# "Caption Category" cell has not been executed.
caption_category = pd.read_csv(
    f"{DATA_PATH}/kuairec_caption_category.csv",
    lineterminator="\n",
)

def video_id_to_caption(video_id: int) -> str:
    """
    Get the caption of a video from its id

    Looks the id up in the module-level ``caption_category`` table and uses
    the first matching row.

    Args:
        video_id (int): The id of the video

    Returns:
        str: The caption of the video, or "Unknown" when the id is not in the
        table or its caption is missing
    """
    captions = caption_category.loc[caption_category["video_id"] == video_id, "caption"]
    if captions.empty:
        return "Unknown"

    first_caption = captions.iloc[0]
    # pd.notna recognises NaN, None and pd.NA; a plain `x == x` test only
    # detects NaN and would turn None into the string "None".
    return str(first_caption) if pd.notna(first_caption) else "Unknown"

In [None]:
# Ten recommendations for every user known to the model
recommends = pyspark_als_model.recommendForAllUsers(10)

# One row per (user, recommendation) pair
recommends_df = recommends.toPandas().explode("recommendations")

# Swap each recommendation entry for the caption of its video (first field of the entry)
recommends_df["recommendations"] = [
    video_id_to_caption(rec[0]) for rec in recommends_df["recommendations"]
]
recommends_df = recommends_df.set_index("user_id")
recommends_df

In [None]:
# Recommendations for five hand-picked user ids
top_users = [120, 165, 357, 1314, 2118]
top_users_recommends_df = recommends_df.loc[recommends_df.index.isin(top_users)]
top_users_recommends_df

# Evaluation

- Choose suitable metrics (e.g., Precision@K, Recall@K, MAP, NDCG)
- Evaluate performance and provide interpretations

In [None]:
from pyspark.sql.functions import collect_list
from pyspark.sql.functions import col, expr
from sklearn.metrics import ndcg_score

# Ground truth: for each user, every video they interacted with in the test split
ground_truth = (
    test.groupBy("user_id")
    .agg(collect_list("video_id").alias("true_items"))
)

# Predictions: keep only the video ids of each user's recommendations
predicted = recommends.select(
    col("user_id"),
    expr("transform(recommendations, x -> x.video_id)").alias("pred_items"),
)

# Inner join: only users present on both sides can be scored
ranking_df = predicted.join(ground_truth, on="user_id", how="inner")

ranking_pd = ranking_df.toPandas()

In [None]:
def recall_at_k(y_true, y_pred, k):
    """
    Recall@k: share of the relevant items found in the first k predictions.

    Args:
        y_true: Relevant item ids (list, tuple or NumPy array; may be empty or None).
        y_pred: Predicted item ids, in ranking order.
        k (int): Number of leading predictions to consider.

    Returns:
        float: Value in [0, 1]; 0 when there are no relevant items.
    """
    if y_true is None:
        return 0.0
    # Converting to a set first avoids the ambiguous truth value of a
    # multi-element NumPy array and removes duplicate ids.
    relevant = set(y_true)
    if not relevant:
        return 0.0
    return len(relevant & set(y_pred[:k])) / len(relevant)

def map_at_k(y_true, y_pred, k):
    """
    Average precision at k for a single user.

    Precision is accumulated at every rank where a relevant item appears in
    the first k predictions, then divided by min(number of distinct relevant
    items, k).

    Args:
        y_true: Relevant item ids (list, tuple or NumPy array; may be empty or None).
        y_pred: Predicted item ids, in ranking order.
        k (int): Number of leading predictions to consider.

    Returns:
        float: Value in [0, 1]; 0.0 when there are no relevant items or k <= 0.
    """
    # A set gives O(1) membership tests, works for NumPy arrays and counts
    # each relevant id once, as recall_at_k does.
    relevant = set(y_true) if y_true is not None else set()
    normaliser = min(len(relevant), k)
    if normaliser <= 0:
        return 0.0

    hits = 0
    precision_sum = 0.0
    for rank, item in enumerate(y_pred[:k], start=1):
        if item in relevant:
            hits += 1
            precision_sum += hits / rank
    return precision_sum / normaliser

def ndcg_at_k(y_true, y_pred, k):
    """
    NDCG@k with binary relevance for a single user.

    DCG sums 1 / log2(rank + 1) over the ranks (1-based) of the first k
    predictions that are relevant. It is normalised by the DCG of an ideal
    ranking that places min(number of distinct relevant items, k) hits first.

    Args:
        y_true: Relevant item ids (list, tuple or NumPy array; may be empty or None).
        y_pred: Predicted item ids, in ranking order.
        k (int): Number of leading predictions to consider.

    Returns:
        float: Value in [0, 1] for duplicate-free predictions; 0.0 when there
        are no relevant items or k <= 0.
    """
    relevant = set(y_true) if y_true is not None else set()
    ideal_hits = min(len(relevant), k)
    if ideal_hits <= 0:
        return 0.0

    # The score has to depend on WHERE the hits sit in the predicted order;
    # scoring the relevance vector against itself always yields a perfect ranking.
    dcg = sum(
        1.0 / np.log2(rank + 1)
        for rank, item in enumerate(y_pred[:k], start=1)
        if item in relevant
    )
    idcg = sum(1.0 / np.log2(rank + 1) for rank in range(1, ideal_hits + 1))
    return float(dcg / idcg)
# Per-user score accumulators, one list per metric
recalls = []
maps = []
ndcgs = []

In [None]:
# Cut-off shared by the three metrics
EVAL_K = 10

# Start from empty lists every time this cell runs; appending to lists created
# in another cell would count every user again on each re-run.
recalls, maps, ndcgs = [], [], []

for row in ranking_pd.itertuples():
    true_items = row.true_items
    pred_items = row.pred_items

    recalls.append(recall_at_k(true_items, pred_items, EVAL_K))
    maps.append(map_at_k(true_items, pred_items, EVAL_K))
    ndcgs.append(ndcg_at_k(true_items, pred_items, EVAL_K))

# Averages over all scored users
print(f"Recall@10: {np.mean(recalls):.4f}")
print(f"MAP@10:    {np.mean(maps):.4f}")
print(f"NDCG@10:   {np.mean(ndcgs):.4f}")
