In [1]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, lit, expr, explode, row_number, sum, udf
from pyspark.sql.types import FloatType, DoubleType
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

def save_model(model, path):
    """Persist ``model`` at ``path`` and report the outcome on stdout.

    This is a best-effort helper: any ``Exception`` raised while saving is
    caught and printed instead of being propagated, so the function always
    returns ``None``.

    Args:
        model: Any object exposing a ``save(path)`` method.
        path: Destination passed straight through to ``model.save``.
    """
    print(f"Saving model to {path}...")
    try:
        model.save(path)
        print("Model saved successfully!")
    except Exception as save_error:
        # Deliberately non-fatal: the caller keeps running even if the save fails.
        print(f"Error saving model: {save_error}")

def load_data(spark, train_path, test_path):
    """Read the training and test CSV files through ``spark.read.csv``.

    Both files are read with ``header=True`` and ``inferSchema=True``; a
    progress line is printed before each read.

    Args:
        spark: Session object whose ``read.csv`` is used for loading.
        train_path: Location of the training CSV.
        test_path: Location of the test CSV.

    Returns:
        tuple: ``(training_df, test_df)`` in that order.
    """
    csv_options = {"header": True, "inferSchema": True}

    print("Loading training dataset...")
    train_frame = spark.read.csv(train_path, **csv_options)

    print("Loading test dataset...")
    test_frame = spark.read.csv(test_path, **csv_options)

    return train_frame, test_frame

def clean_data(df):
    """Drop unusable ``watch_ratio`` rows and cap the remaining values at 5.

    Rows are removed when ``watch_ratio`` is null, negative, or NaN. Values
    greater than 5 are then replaced by 5; all other values are kept as-is.

    Args:
        df: DataFrame containing a ``watch_ratio`` column.

    Returns:
        The filtered DataFrame with the capped ``watch_ratio`` column.
    """
    ratio = col("watch_ratio")
    valid_rows = (
        df.filter(ratio.isNotNull())
          .filter(ratio >= 0)
          .filter(~isnan(ratio))
    )
    capped_ratio = when(ratio > 5, 5).otherwise(ratio)
    return valid_rows.withColumn("watch_ratio", capped_ratio)

def index_data(training_df, test_df):
    """Rename the id columns of both frames to the names the ALS setup uses.

    ``user_id`` becomes ``userIndex`` and ``video_id`` becomes ``videoIndex``.
    No re-indexing is performed; the existing values are used unchanged.

    Args:
        training_df: Training DataFrame.
        test_df: Test DataFrame.

    Returns:
        tuple: ``(training, test)`` with the renamed columns.
    """
    print("Using existing numerical user_id and video_id columns for ALS...")
    column_renames = (("user_id", "userIndex"), ("video_id", "videoIndex"))

    def _rename(frame):
        # Apply every rename in order; withColumnRenamed returns a new frame.
        for old_name, new_name in column_renames:
            frame = frame.withColumnRenamed(old_name, new_name)
        return frame

    return _rename(training_df), _rename(test_df)

def evaluate_regression(predictions):
    """Print RMSE and MAE of ``prediction`` against ``watch_ratio``.

    Each metric is computed with its own ``RegressionEvaluator`` and printed
    with four decimal places. Nothing is returned.

    Args:
        predictions: DataFrame holding ``watch_ratio`` and ``prediction`` columns.
    """
    def _score(metric_name):
        # One evaluator per metric, mirroring the label/prediction column setup.
        return RegressionEvaluator(
            metricName=metric_name,
            labelCol="watch_ratio",
            predictionCol="prediction",
        ).evaluate(predictions)

    rmse = _score("rmse")
    print(f"Root Mean Squared Error (RMSE): {rmse:.4f}")

    mae = _score("mae")
    print(f"Mean Absolute Error (MAE): {mae:.4f}")

def evaluate_als_precision(model, test_df, k=10, rating_col="watch_ratio", user_col="userIndex", item_col="videoIndex", relevance_threshold=0.9, restrict_to_test_items=False):
    """
    Evaluate precision@k for ALS recommendations.

    For each user in the test set, take up to k recommended items and measure
    the fraction that appear in the test set for that user with
    ``rating_col >= relevance_threshold``. The per-user fractions are averaged.

    Args:
        model: Fitted ALS model providing ``recommendForUserSubset`` and
            ``transform``.
        test_df: DataFrame with ``user_col``, ``item_col`` and ``rating_col``.
        k: Number of recommendations considered per user.
        rating_col: Column holding the observed rating.
        user_col: User id column; must match the model's user column.
        item_col: Item id column; must match the model's item column.
        relevance_threshold: Minimum rating for a test pair to count as relevant.
        restrict_to_test_items: When False (default) the top-k list comes from
            ``model.recommendForUserSubset``, i.e. from every item the model
            knows. When True, only the (user, item) pairs present in
            ``test_df`` are scored with ``model.transform`` and the k
            highest-scoring pairs per user are kept. Use True when ground
            truth exists only for the test pairs: otherwise recommended items
            that never occur in ``test_df`` can never be hits and the score
            collapses towards 0.

    Returns:
        float: Mean per-user precision, or 0.0 when no recommendations could
        be produced (for example an empty ``test_df``).
    """
    from pyspark.sql.functions import col, explode, isnan, row_number
    from pyspark.sql.window import Window

    if restrict_to_test_items:
        # Score only pairs that have ground truth, then keep the top k per user.
        prediction_col = model.getPredictionCol()
        candidates = test_df.select(user_col, item_col).distinct()
        scored = model.transform(candidates)
        # NaN sorts above every number in Spark, so cold-start NaNs must be
        # removed before ranking or they would occupy the top slots.
        scored = scored.filter(col(prediction_col).isNotNull() & ~isnan(col(prediction_col)))
        ranking = Window.partitionBy(user_col).orderBy(col(prediction_col).desc(), col(item_col))
        recs_exploded = (
            scored.withColumn("_rank", row_number().over(ranking))
                  .filter(col("_rank") <= k)
                  .select(user_col, item_col)
        )
    else:
        # Get distinct users from test set
        users = test_df.select(user_col).distinct()

        # Generate top-k recommendations for these users
        # user_recs: [user_col, recommendations] where recommendations is an
        # array of {item_col, rating} structs
        user_recs = model.recommendForUserSubset(users, k)

        # Explode recommendations to get (user, recommended_item)
        recs_exploded = user_recs.select(
            col(user_col),
            explode("recommendations").alias("rec")
        ).select(
            col(user_col),
            col("rec." + item_col).alias(item_col)
        )

    # Ground truth: (user, item) pairs in the test set at or above the threshold.
    # distinct() keeps repeated interactions from multiplying join matches,
    # which would otherwise let num_hits exceed num_recs.
    relevant = (
        test_df.filter(col(rating_col) >= relevance_threshold)
               .select(user_col, item_col)
               .distinct()
    )

    # Join recommendations with ground truth to find hits
    hits = recs_exploded.join(relevant, on=[user_col, item_col], how="inner") \
                        .groupBy(user_col).count().withColumnRenamed("count", "num_hits")

    # Count recommendations per user (should be k, but may be less if not enough items)
    recs_per_user = recs_exploded.groupBy(user_col).count().withColumnRenamed("count", "num_recs")

    # Right join keeps users without any hit; their missing num_hits becomes 0.
    precision_per_user = hits.join(recs_per_user, on=user_col, how="right") \
                             .fillna(0, subset=["num_hits"]) \
                             .withColumn("precision", col("num_hits") / col("num_recs"))

    # Average precision over all users
    avg_precision = precision_per_user.agg({"precision": "avg"}).collect()[0][0]

    # avg() over zero rows yields NULL; formatting None with ':.4f' would raise.
    if avg_precision is None:
        print(f"Average Precision@{k}: n/a (no recommendations were produced)")
        return 0.0

    print(f"Average Precision@{k}: {avg_precision:.4f}")
    return avg_precision

def generate_sample_recommendations(model, predictions, num_users=5, k=10):
    """Show the top-``k`` recommendations for a handful of users.

    Up to ``num_users`` distinct values of the ``userIndex`` column of
    ``predictions`` are passed to ``model.recommendForUserSubset``; the result
    is displayed untruncated and nothing is returned.

    Args:
        model: Fitted model exposing ``recommendForUserSubset``.
        predictions: DataFrame with a ``userIndex`` column.
        num_users: Maximum number of users to sample.
        k: Number of recommendations per user.
    """
    distinct_users = predictions.select("userIndex").distinct()
    model.recommendForUserSubset(distinct_users.limit(num_users), k).show(truncate=False)

In [2]:

spark = SparkSession.builder.appName("KuaiRec_ALS_Model").getOrCreate()

# ALS extends the RDD lineage on every iteration. With maxIter=50 and no
# checkpoint directory the lineage is never truncated (checkpointInterval is
# ignored when no directory is set), which can end in a
# java.lang.StackOverflowError while tasks are (de)serialized.
checkpoint_dir = "../checkpoints/als"
spark.sparkContext.setCheckpointDir(checkpoint_dir)

train_path = "../data_final_project/KuaiRec 2.0/data/big_matrix.csv"
test_path = "../data_final_project/KuaiRec 2.0/data/small_matrix.csv"
training_df, test_df = load_data(spark, train_path, test_path)

# Drop null / negative / NaN watch ratios and cap the rest (see clean_data).
training_df = clean_data(training_df)
test_df = clean_data(test_df)
print(f"Training dataset shape after cleaning: {training_df.count()} rows")
print(f"Test dataset shape after cleaning: {test_df.count()} rows")

# user_id / video_id are renamed to the column names configured on ALS below.
training, test = index_data(training_df, test_df)

print("Configuring ALS model...")
als = ALS(
    maxIter=50,
    regParam=0.1,
    rank=30,
    userCol="userIndex",
    itemCol="videoIndex",
    ratingCol="watch_ratio",
    coldStartStrategy="drop",   # rows for users/items unseen in training are dropped from predictions
    nonnegative=True,
    checkpointInterval=10,      # truncate lineage every 10 iterations (needs the checkpoint dir above)
    seed=42,                    # fixed seed so factor initialisation is reproducible across runs
)

25/05/09 13:35:22 WARN Utils: Your hostname, debian-lenovo resolves to a loopback address: 127.0.1.1; using 192.168.0.18 instead (on interface wlp3s0)
25/05/09 13:35:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/09 13:35:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Loading training dataset...


                                                                                

Loading test dataset...


                                                                                

Training dataset shape after cleaning: 12530806 rows




Test dataset shape after cleaning: 4676570 rows
Using existing numerical user_id and video_id columns for ALS...
Configuring ALS model...


                                                                                

In [2]:
# Fit ALS on the training interactions, then score every (user, video) pair of
# the test frame. With coldStartStrategy="drop" configured on `als`, the
# prediction count can be lower than the test row count.
model = als.fit(training)
# NOTE(review): the commented-out line below would load the ALS *estimator*;
# restoring a fitted model saved via save_model presumably needs
# ALSModel.load (not imported in this notebook) - confirm before enabling.
#model = ALS.load("../saved_models/als_model")
print("Generating predictions...")
predictions = model.transform(test)
# count() and show() are both actions, so each one triggers its own Spark job.
print(f"Predictions shape: {predictions.count()} rows")
# Optional: persist the fitted model for later sessions.
#save_model(model, "../saved_models/als_model")
predictions.show(5)

25/05/09 13:36:35 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/05/09 13:39:07 ERROR Executor: Exception in task 3.0 in stage 59.0 (TID 479)]
java.lang.StackOverflowError
	at jdk.internal.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:569)
	at java.base/java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1100)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2423)
	at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
	at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
	at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
	at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
	at java.base/java.io.O

ConnectionRefusedError: [Errno 111] Connection refused

ConnectionRefusedError: [Errno 111] Connection refused

In [None]:
# take(1) fetches at most one row, so checking for emptiness does not require
# counting every prediction.
if not predictions.take(1):
    print("Warning: No valid predictions generated. Check your data.")
else:
    #evaluate_regression(predictions)
    # evaluate_als_precision already prints the score; keep the returned value
    # instead of printing it a second time.
    # NOTE(review): a score of exactly 0.0 probably means the recommended
    # videos never occur in the test frame at all - verify before reading it
    # as model quality.
    precision_at_10 = evaluate_als_precision(
        model,
        test,
        k=10,
        rating_col="watch_ratio",
        user_col="userIndex",
        item_col="videoIndex",
        relevance_threshold=0.5,
    )
    #generate_sample_recommendations(model, predictions, num_users=5, k=10)

                                                                                

Average Precision@10: 0.0000
0.0


In [None]:
# Diagnostic: compare one test user's top-10 recommendations with the videos
# that user has in the test frame, to see whether the two sets overlap at all.

# Pick a sample user from the test set
# (collect()[0][0] assumes the test frame has at least one row; it raises
# IndexError otherwise.)
sample_user = test.select("userIndex").distinct().limit(1).collect()[0][0]

print(f"Sample userIndex: {sample_user}")

# Get recommended videos for this user
# recommendForUserSubset expects a DataFrame of users, hence the one-row frame.
user_recs = model.recommendForUserSubset(test.filter(col("userIndex") == sample_user).select("userIndex").distinct(), 10)
user_recs.show(truncate=False)

# Get actual videos watched by this user in the test set
# (no watch_ratio threshold is applied here: every test video of the user counts)
actual_videos = test.filter(col("userIndex") == sample_user).select("videoIndex").distinct()
print("Actual videos in test set for this user:")
actual_videos.show(truncate=False)

# Flatten recommendations for this user: one row per (userIndex, videoIndex)
# NOTE(review): explode is already imported in the first cell, so this repeat
# import is redundant.
from pyspark.sql.functions import explode
recs_flat = user_recs.select(
    col("userIndex"),
    explode("recommendations").alias("rec")
).select(
    col("userIndex"),
    col("rec.videoIndex").alias("videoIndex")
)

print("Recommended videos for this user:")
recs_flat.show(truncate=False)

# Check intersection
# Joining on videoIndex alone is sufficient because both frames hold a single user.
intersection = recs_flat.join(actual_videos, on="videoIndex", how="inner")
print("Intersection (recommended & actually watched):")
intersection.show(truncate=False)
# An empty result means none of the recommended videos occur in this user's test rows.

Sample userIndex: 14


                                                                                

+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userIndex|recommendations                                                                                                                                                                          |
+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|14       |[{4930, 6.6510844}, {505, 6.141119}, {5137, 5.923943}, {10013, 5.9077296}, {1594, 5.834613}, {2253, 5.4328294}, {7381, 5.4263077}, {5074, 5.423842}, {6598, 5.3590636}, {9290, 5.283864}]|
+---------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

Actual vi

                                                                                

+----------+
|videoIndex|
+----------+
|148       |
|471       |
|463       |
|9852      |
|4101      |
|8592      |
|9900      |
|833       |
|7340      |
|7253      |
|10206     |
|7754      |
|6357      |
|7880      |
|3698      |
|1896      |
|392       |
|2235      |
|540       |
|5614      |
+----------+
only showing top 20 rows

Recommended videos for this user:


                                                                                

+---------+----------+
|userIndex|videoIndex|
+---------+----------+
|14       |4930      |
|14       |505       |
|14       |5137      |
|14       |10013     |
|14       |1594      |
|14       |2253      |
|14       |7381      |
|14       |5074      |
|14       |6598      |
|14       |9290      |
+---------+----------+

Intersection (recommended & actually watched):


                                                                                

+----------+---------+
|videoIndex|userIndex|
+----------+---------+
+----------+---------+

