# Feature Engineering, Model, Evaluation and Metrics

## Loading Data

**Note:** We assume that the `EDA.ipynb` notebook has already been run to remove duplicates and handle missing values.


In [1]:
import os
import random
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, expr, explode, collect_list, collect_set,
    udf, size
)
from pyspark.sql.types import ArrayType, IntegerType, FloatType

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.recommendation import ALS, ALSModel

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

from sklearn.metrics import ndcg_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder, MultiLabelBinarizer


small_matrix = pd.read_csv("../data_final_project/KuaiRec/data/small_matrix.csv")
big_matrix = pd.read_csv("../data_final_project/KuaiRec/data/big_matrix.csv")

caption_category = pd.read_csv("../data_final_project/KuaiRec/data/kuairec_caption_category.csv", engine="python", sep=",", quotechar='"', on_bad_lines='skip')

item_categories = pd.read_csv("../data_final_project/KuaiRec/data/item_categories.csv")

item_daily_features = pd.read_csv("../data_final_project/KuaiRec/data/item_daily_features.csv")
item_daily_features.fillna(-1, inplace=True)

user_features = pd.read_csv("../data_final_project/KuaiRec/data/user_features.csv")
user_features.fillna(-1, inplace=True)

## Setting up the Spark session

In [2]:
spark = SparkSession.builder \
    .appName("KuaiRec ALS with Metadata") \
    .getOrCreate()



## Feature engineering

We first load the interaction data and the user and item metadata into Spark DataFrames so that large-scale data can be processed efficiently.

In [3]:
# Load interaction data
small_matrix = spark.read.csv(
    "../data_final_project/KuaiRec/data/small_matrix.csv",
    header=True,
    sep=",",
    nullValue="",
    inferSchema=True,
).select("user_id", "video_id", "watch_ratio").na.drop(subset=["user_id", "video_id", "watch_ratio"])

# Load user metadata
user_metadata = spark.read.csv(
    "../data_final_project/KuaiRec/data/user_features.csv",
    header=True,
    sep=",",
    nullValue="",
    inferSchema=True,
)

# Load item metadata
item_metadata = spark.read.csv(
    "../data_final_project/KuaiRec/data/item_daily_features.csv",
    header=True,
    sep=",",
    nullValue="",
    inferSchema=True,
)


We enrich the interaction data by joining it with the user and item metadata.

In [4]:
small_matrix_with_user_metadata = small_matrix.join(
    user_metadata,
    on="user_id",
    how="left"
)

small_matrix_with_metadata = small_matrix_with_user_metadata.join(
    item_metadata,
    on="video_id",
    how="left"
)
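One thing to watch with the second join: assuming `item_daily_features.csv` holds one row per video per day (as the file name suggests, though this is not verified here), a left join on `video_id` alone repeats each interaction once per daily row. The pure-Python sketch below illustrates the effect on made-up rows, together with one possible remedy: keeping only the latest daily row per video before joining. `left_join` and `latest_per_video` are hypothetical helpers, and the `date` field is an assumption about the schema.

```python
from collections import defaultdict

def left_join(left_rows, right_rows, key):
    """Left join two lists of dicts on `key`; unmatched left rows are kept once."""
    index = defaultdict(list)
    for row in right_rows:
        index[row[key]].append(row)
    joined = []
    for row in left_rows:
        # Fall back to a single empty match so the left row survives the join.
        for match in index.get(row[key], [{}]):
            joined.append({**match, **row})
    return joined

def latest_per_video(daily_rows):
    """Keep only the most recent daily row for each video_id."""
    latest = {}
    for row in daily_rows:
        current = latest.get(row["video_id"])
        if current is None or row["date"] > current["date"]:
            latest[row["video_id"]] = row
    return list(latest.values())

# Made-up rows: video 1 has three daily snapshots, video 2 has none.
interactions = [
    {"user_id": 10, "video_id": 1, "watch_ratio": 0.8},
    {"user_id": 10, "video_id": 2, "watch_ratio": 1.2},
]
daily = [
    {"video_id": 1, "date": 20200705, "play_cnt": 100},
    {"video_id": 1, "date": 20200706, "play_cnt": 150},
    {"video_id": 1, "date": 20200707, "play_cnt": 170},
]

fanned_out = left_join(interactions, daily, "video_id")
deduplicated = left_join(interactions, latest_per_video(daily), "video_id")
print(len(fanned_out), len(deduplicated))  # 4 2
```

The ALS model in this notebook is trained on `small_matrix` directly, so any such fan-out would only matter if the joined DataFrame were used for training or evaluation.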

We select the user-related features, cast them to `double`, and assemble them into a single vector column.

In [5]:
user_feature_columns = ["is_lowactive_period", "fans_user_num", "register_days"]

for column in user_feature_columns:
    small_matrix_with_metadata = small_matrix_with_metadata.withColumn(
        column, col(column).cast("double")
    )

user_assembler = VectorAssembler(
    inputCols=user_feature_columns,
    outputCol="user_features"
)
small_matrix_with_metadata = user_assembler.transform(small_matrix_with_metadata)



We do the same for the item-related features: select them, cast them to `double`, and assemble them into a single vector column.

In [6]:
item_feature_columns = ["video_duration", "play_cnt", "complete_play_user_num", 
                       "like_user_num", "comment_cnt", "share_user_num"]

for column in item_feature_columns:
    small_matrix_with_metadata = small_matrix_with_metadata.withColumn(
        column, col(column).cast("double")
    )

item_assembler = VectorAssembler(
    inputCols=item_feature_columns,
    outputCol="item_features"
)
small_matrix_with_metadata = item_assembler.transform(small_matrix_with_metadata)


## Model architecture

We configure an ALS collaborative filtering model and tune it with 3-fold cross-validation over a small hyperparameter grid (`maxIter` of 10 or 15, `regParam` fixed at 0.1). The data is split 80/20 into training and test sets, the best model is evaluated on the test set using RMSE, and we finally save it for future use.
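To make the evaluation metric concrete, here is a minimal pure-Python sketch of RMSE on (label, prediction) pairs. It is meant to mirror what `RegressionEvaluator(metricName="rmse")` reports, and it skips NaN predictions in the same spirit as the `na.drop()` call in the cell below (under its default cold-start strategy, ALS returns NaN for users or videos absent from the training data). The sample values are made up for illustration.

```python
import math

def rmse(pairs):
    """Root-mean-square error over (label, prediction) pairs, skipping NaN predictions."""
    kept = [(label, pred) for label, pred in pairs if not math.isnan(pred)]
    if not kept:
        return float("nan")
    return math.sqrt(sum((label - pred) ** 2 for label, pred in kept) / len(kept))

# Hypothetical watch_ratio labels and model predictions (illustrative values only).
sample = [(1.0, 0.5), (2.0, 1.0), (0.5, float("nan")), (0.0, 0.5)]
print(round(rmse(sample), 4))  # 0.7071
```

Because the error is squared before averaging, a few large misses on long-watched videos weigh more heavily than many small ones.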

In [8]:
als = ALS(
    maxIter=10,
    rank=10,
    userCol="user_id",
    itemCol="video_id",
    ratingCol="watch_ratio",
    implicitPrefs=True
)

params = ParamGridBuilder() \
    .addGrid(als.maxIter, [10, 15]) \
    .addGrid(als.regParam, [0.1]) \
    .build()

evaluator = RegressionEvaluator(metricName="rmse", labelCol="watch_ratio", predictionCol="prediction")

cvs = CrossValidator(
    estimator=als,
    estimatorParamMaps=params,
    evaluator=evaluator,
    numFolds=3,
)

# We use small_matrix instead of big_matrix and split it into train and test sets:
# in our environment, Spark crashed when fitting on a data set as large as big_matrix
(training, test) = small_matrix.randomSplit([0.8, 0.2], seed=42)

models = cvs.fit(training)

# Take the best model from the CrossValidator
pyspark_als_model = models.bestModel
predictions = pyspark_als_model.transform(test)
rmse = evaluator.evaluate(predictions.na.drop())
print(f"RMSE: {rmse}")
print(f"Rank: {pyspark_als_model.rank}")
print(f"MaxIter: {pyspark_als_model._java_obj.parent().getMaxIter()}")
print(f"RegParam: {pyspark_als_model._java_obj.parent().getRegParam()}")

# Overwrite any model saved by a previous run; a plain save() raises an
# IOException when the target path already exists.
pyspark_als_model.write().overwrite().save("als_best_model")

RMSE: 1.286511979447459
Rank: 10
MaxIter: 15
RegParam: 0.1


## Recommendation

We now have an ALS model with tuned hyperparameters, so we can build the recommendation step. For each user, we have to:

- Predict scores for candidate videos they haven't watched.
- Rank these videos by predicted score.
- Return the top N videos.

In [None]:
top_n = 10

test_users = test.select("user_id").distinct()

user_recs = pyspark_als_model.recommendForUserSubset(test_users, top_n)

user_recs_exploded = user_recs.select(
    col("user_id"),
    explode("recommendations").alias("rec")
).select(
    col("user_id"),
    col("rec.video_id"),
    col("rec.rating")
)

user_recs_exploded.show()
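The three steps can also be illustrated outside Spark with a small sketch. The latent factors below are made-up numbers and `top_n_unseen` is a hypothetical helper, not part of the PySpark API. It scores each candidate by the dot product of the user and item factors, skips videos in the user's seen set, and keeps the N best. To our knowledge `recommendForUserSubset` does not itself exclude videos a user already interacted with in the training data, so in Spark a filter like this would have to be a separate step.

```python
import heapq

def top_n_unseen(user_vec, item_factors, seen, n):
    """Return the n highest-scoring (video_id, score) pairs among unseen videos."""
    scored = (
        (video_id, sum(u * v for u, v in zip(user_vec, item_vec)))
        for video_id, item_vec in item_factors.items()
        if video_id not in seen
    )
    return heapq.nlargest(n, scored, key=lambda pair: pair[1])

# Made-up 2-dimensional latent factors for one user and four videos.
user_vec = [1.0, 0.5]
item_factors = {
    101: [0.9, 0.1],  # score about 0.95
    102: [0.2, 0.8],  # score about 0.60
    103: [0.5, 0.5],  # score about 0.75
    104: [1.0, 1.0],  # highest score, but already watched
}

recs = top_n_unseen(user_vec, item_factors, seen={104}, n=2)
print([video_id for video_id, _ in recs])  # [101, 103]
```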

## Evaluation

In [9]:
# Only interactions with watch_ratio > 0 count as relevant
test_relevant = test.filter(col("watch_ratio") > 0)

ground_truth = test_relevant.groupBy("user_id").agg(
    collect_set("video_id").alias("true_items")
)


In [10]:
top_n = 10
test_users = test.select("user_id").distinct()

user_recs = pyspark_als_model.recommendForUserSubset(test_users, top_n)

user_recs = user_recs.select(
    "user_id",
    expr("transform(recommendations, x -> x.video_id)").alias("pred_items")
)

In [11]:
eval_df = user_recs.join(ground_truth, on="user_id")


## Defining metric functions

In [12]:
def precision_at_k(pred_items, true_items, k=10):
    pred_k = pred_items[:k]
    hits = len(set(pred_k) & set(true_items))
    return hits / float(k)

def recall_at_k(pred_items, true_items, k=10):
    pred_k = pred_items[:k]
    hits = len(set(pred_k) & set(true_items))
    return hits / float(len(true_items)) if true_items else 0.0

precision_udf = udf(lambda pred, true: precision_at_k(pred, true, top_n), FloatType())
recall_udf = udf(lambda pred, true: recall_at_k(pred, true, top_n), FloatType())

eval_df = eval_df.withColumn("precision_at_k", precision_udf("pred_items", "true_items"))
eval_df = eval_df.withColumn("recall_at_k", recall_udf("pred_items", "true_items"))

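A quick check on a toy list helps confirm what the two metrics measure. The sketch restates the two functions compactly so that it runs on its own; the video ids are made up.

```python
def precision_at_k(pred_items, true_items, k=10):
    """Fraction of the top-k recommendations that are relevant."""
    hits = len(set(pred_items[:k]) & set(true_items))
    return hits / float(k)

def recall_at_k(pred_items, true_items, k=10):
    """Fraction of the relevant items that appear in the top-k recommendations."""
    hits = len(set(pred_items[:k]) & set(true_items))
    return hits / float(len(true_items)) if true_items else 0.0

pred = [1, 2, 3, 4, 5]   # hypothetical ranked recommendations
true = [2, 5, 9, 11]     # hypothetical relevant videos

print(precision_at_k(pred, true, k=5))  # 0.4 (2 hits out of 5 recommendations)
print(recall_at_k(pred, true, k=5))     # 0.5 (2 hits out of 4 relevant videos)
```

Precision is normalised by the list length `k`, recall by the number of relevant items, which is why the two can diverge sharply when a user has far more relevant videos than `k`.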

In [13]:
avg_precision = eval_df.selectExpr("avg(precision_at_k) as mean_precision").collect()[0]["mean_precision"]
avg_recall = eval_df.selectExpr("avg(recall_at_k) as mean_recall").collect()[0]["mean_recall"]

print(f"Mean Precision@{top_n}: {avg_precision:.4f}")
print(f"Mean Recall@{top_n}: {avg_recall:.4f}")



Mean Precision@10: 0.1928
Mean Recall@10: 0.0030


### Average Precision and NDCG@K

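MAP (Mean Average Precision): The mean over all users of the per-user average precision, which rewards hits that appear early in the list.

NDCG (Normalized Discounted Cumulative Gain): Measures ranking quality with a position discount, so hits near the top of the list contribute more.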
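For reference, the definitions implemented by the `average_precision` and `ndcg` functions in this notebook can be written as follows for a single user with ranked recommendations $(p_1, \dots, p_k)$ and relevant set $T$. The symbols $p_i$, $T$ and $\mathrm{rel}_i$ are notation introduced here for illustration; MAP is then the mean of $\mathrm{AP@}k$ over users.

```latex
\mathrm{rel}_i =
\begin{cases}
1 & \text{if } p_i \in T \\
0 & \text{otherwise}
\end{cases}
\qquad
\mathrm{AP@}k = \frac{1}{\min(|T|, k)} \sum_{i=1}^{k} \mathrm{rel}_i \cdot \frac{1}{i} \sum_{j=1}^{i} \mathrm{rel}_j

\mathrm{DCG@}k = \sum_{i=1}^{k} \frac{\mathrm{rel}_i}{\log_2(i + 1)},
\qquad
\mathrm{IDCG@}k = \sum_{i=1}^{\min(|T|, k)} \frac{1}{\log_2(i + 1)},
\qquad
\mathrm{NDCG@}k = \frac{\mathrm{DCG@}k}{\mathrm{IDCG@}k}
```

Here $i$ is the 1-based rank, whereas the code uses a 0-based index, which is why it divides by `i + 1` and takes `log2(i + 2)`.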

In [14]:
def average_precision(pred_items, true_items, k=10):
    if not true_items:
        return 0.0
    pred_items = pred_items[:k]
    score = 0.0
    hits = 0
    for i, item in enumerate(pred_items):
        if item in true_items:
            hits += 1
            score += hits / (i + 1.0)
    return score / min(len(true_items), k)


In [15]:
import math

def ndcg(pred_items, true_items, k=10):
    pred_items = pred_items[:k]
    dcg = 0.0
    for i, item in enumerate(pred_items):
        if item in true_items:
            dcg += 1.0 / math.log2(i + 2)  # i is 0-based: rank = i + 1, discount = log2(rank + 1)
    ideal_dcg = sum([1.0 / math.log2(i + 2) for i in range(min(len(true_items), k))])
    return dcg / ideal_dcg if ideal_dcg > 0 else 0.0

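A small worked example shows how the two ranking metrics react to hit positions. The sketch restates both functions so it is self-contained; the lists are made up, with hits at ranks 2 and 4.

```python
import math

def average_precision(pred_items, true_items, k=10):
    """Average of precision values taken at each hit position, capped at k."""
    if not true_items:
        return 0.0
    score, hits = 0.0, 0
    for i, item in enumerate(pred_items[:k]):
        if item in true_items:
            hits += 1
            score += hits / (i + 1.0)
    return score / min(len(true_items), k)

def ndcg(pred_items, true_items, k=10):
    """Binary-relevance NDCG: discounted gain divided by the best achievable gain."""
    dcg = sum(
        1.0 / math.log2(i + 2)
        for i, item in enumerate(pred_items[:k])
        if item in true_items
    )
    ideal = sum(1.0 / math.log2(i + 2) for i in range(min(len(true_items), k)))
    return dcg / ideal if ideal > 0 else 0.0

pred = [7, 3, 9, 1]   # hypothetical ranked recommendations
true = [3, 1, 5]      # hypothetical relevant videos

# Hits at ranks 2 and 4: AP = (1/2 + 2/4) / min(3, 4) = 1/3
print(round(average_precision(pred, true, k=4), 4))  # 0.3333
print(round(ndcg(pred, true, k=4), 4))
```

Moving the two hits to ranks 1 and 2 would raise both scores without changing Precision@4, which is the ordering sensitivity that MAP and NDCG add.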

In [16]:
ap_udf = udf(lambda pred, true: average_precision(pred, true, top_n), FloatType())
ndcg_udf = udf(lambda pred, true: ndcg(pred, true, top_n), FloatType())

# Apply the UDFs, then average the per-user scores
eval_df = eval_df \
    .withColumn("average_precision", ap_udf("pred_items", "true_items")) \
    .withColumn("ndcg", ndcg_udf("pred_items", "true_items"))

mean_ap = eval_df.selectExpr("avg(average_precision) as MAP").first()["MAP"]
mean_ndcg = eval_df.selectExpr("avg(ndcg) as NDCG").first()["NDCG"]

print(f"MAP@{top_n}: {mean_ap:.4f}")
print(f"NDCG@{top_n}: {mean_ndcg:.4f}")



MAP@10: 0.0827
NDCG@10: 0.1930


## Evaluation against a random baseline

In [17]:
big_matrix_spark = spark.read.csv(
    "../data_final_project/KuaiRec/data/big_matrix.csv",
    header=True,
    inferSchema=True
).select("video_id").na.drop()


all_videos = small_matrix.select("video_id").union(
    big_matrix_spark.select("video_id")
).distinct()
video_list = [row.video_id for row in all_videos.collect()]

def random_recommendations(video_pool, k=10):
    """Draw k distinct videos uniformly at random from the pool."""
    return random.sample(video_pool, k)

# The UDF returns a different list on every call, so we flag it as
# non-deterministic to stop Spark from assuming repeated calls agree.
random_rec_udf = udf(
    lambda: random_recommendations(video_list, top_n),
    ArrayType(IntegerType())
).asNondeterministic()

random_preds = test.select("user_id").distinct().withColumn("pred_items", random_rec_udf())

random_eval_df = random_preds.join(ground_truth, on="user_id")

random_eval_df = random_eval_df \
    .withColumn("precision_at_k", precision_udf("pred_items", "true_items")) \
    .withColumn("recall_at_k", recall_udf("pred_items", "true_items")) \
    .withColumn("average_precision", ap_udf("pred_items", "true_items")) \
    .withColumn("ndcg", ndcg_udf("pred_items", "true_items"))

random_metrics = random_eval_df.selectExpr(
    "avg(precision_at_k) as precision",
    "avg(recall_at_k) as recall",
    "avg(average_precision) as map",
    "avg(ndcg) as ndcg"
).first()

print(f"Random Precision@10: {random_metrics['precision']:.4f}")
print(f"Random Recall@10: {random_metrics['recall']:.4f}")
print(f"Random MAP@10: {random_metrics['map']:.4f}")
print(f"Random NDCG@10: {random_metrics['ndcg']:.4f}")



Random Precision@10: 0.0536
Random Recall@10: 0.0008
Random MAP@10: 0.0161
Random NDCG@10: 0.0512

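As a sanity check on the baseline, the expected precision of a uniform random recommender has a closed form: drawing `k` distinct videos from a pool of size `P` that contains `R` relevant ones yields `k * R / P` expected hits, so the expected Precision@k is `R / P` regardless of `k`. The sketch below confirms this by enumerating every possible draw on a tiny made-up pool; the helper names and the pool are illustrative and not measured from KuaiRec.

```python
from fractions import Fraction
from itertools import combinations

def expected_random_precision(num_relevant, pool_size):
    """Expected Precision@k of a uniform random recommender (independent of k)."""
    return Fraction(num_relevant, pool_size)

def exhaustive_precision(pool, relevant, k):
    """Average Precision@k over every possible k-subset of the pool."""
    draws = list(combinations(pool, k))
    total_hits = sum(len(set(draw) & relevant) for draw in draws)
    return Fraction(total_hits, k * len(draws))

pool = [1, 2, 3, 4, 5]   # hypothetical video pool
relevant = {2, 5}        # hypothetical relevant videos for one user

print(expected_random_precision(len(relevant), len(pool)))  # 2/5
print(exhaustive_precision(pool, relevant, k=2))            # 2/5
```

Under this reading, the random Precision@10 reported above estimates the average share of the sampled video pool that is relevant to a test user.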

## Summary

### Evaluation metrics on Top-10 with the ALS model:
- **Mean Precision@10:** `0.1928`
- **Mean Recall@10:** `0.0030`
- **MAP@10 (Mean Average Precision):** `0.0827`
- **NDCG@10 (Normalized Discounted Cumulative Gain):** `0.1930`

---

### Random baseline (for comparison):
- **Random Precision@10:** `0.0536`
- **Random Recall@10:** `0.0008`
- **Random MAP@10:** `0.0161`
- **Random NDCG@10:** `0.0512`

---

### Interpretation:

- **Precision@10 (19.28%)**: On average, about 2 of the top 10 recommended videos are relevant, roughly 3.6 times the random baseline (5.36%).
- **Recall@10 (0.30%)**: The model retrieves only a very small portion of all relevant items. With only 10 slots per user, recall cannot exceed 10 divided by the number of relevant test videos for that user, so a low value is expected when each user has many relevant videos.
- **MAP@10 (8.27%)**: Relevant videos appear earlier in the list than under random ranking (1.61%), but there is still a lot of room for ranking improvement.
- **NDCG@10 (19.30%)**: Relevant items are typically placed higher than in a random list (5.12%).

Compared to the **random recommender**, the ALS model performs **substantially better** across all metrics, especially in **precision and ranking (MAP/NDCG)**.
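The comparison can be made quantitative by dividing each ALS metric by its random counterpart. The values in this sketch are copied from the cell outputs earlier in the notebook; `lift` is just a name chosen for this illustration.

```python
# Metric values copied from the ALS and random-baseline cell outputs.
als_metrics = {"precision": 0.1928, "recall": 0.0030, "map": 0.0827, "ndcg": 0.1930}
random_metrics = {"precision": 0.0536, "recall": 0.0008, "map": 0.0161, "ndcg": 0.0512}

# Lift = how many times better ALS scores than the random recommender.
lift = {name: als_metrics[name] / random_metrics[name] for name in als_metrics}

for name, value in lift.items():
    print(f"{name}: {value:.2f}x over random")
```

Since the random baseline is not seeded, its metrics (and therefore these ratios) will shift somewhat from run to run.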

### Potential improvements or ideas:

- Tune more hyperparameters (`rank`, `regParam`, `alpha`).
- Address cold-start issues (e.g., filter unseen users/items or integrate side info).
- Look into hybrid models combining collaborative filtering with metadata.
