In [1]:
# Import only necessary libraries
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import IntegerType, FloatType
from pyspark.sql.functions import explode, col

In [2]:
# Set the path to the data directory
data_path = "../data/KuaiRec/data/"

# Load datasets
print("Loading big matrix...")
big_matrix = pd.read_csv(data_path + "big_matrix.csv")
print("Loading small matrix...")
small_matrix = pd.read_csv(data_path + "small_matrix.csv")
print("Loading social network...")
social_network = pd.read_csv(data_path + "social_network.csv")
social_network["friend_list"] = social_network["friend_list"].map(eval)
print("Loading item features...")
item_categories = pd.read_csv(data_path + "item_categories.csv")
item_categories["feat"] = item_categories["feat"].map(eval)
print("Loading user features...")
user_features = pd.read_csv(data_path + "user_features.csv")
print("Loading items' daily features...")
item_daily_features = pd.read_csv(data_path + "item_daily_features.csv")
print("All data loaded.")

Loading big matrix...
Loading small matrix...
Loading social network...
Loading item features...
Loading user features...
Loading items' daily features...
All data loaded.


In [3]:
# Prepare the interaction matrix for training
interaction_matrix = big_matrix[["user_id", "video_id", "watch_ratio"]]
interaction_matrix = interaction_matrix[interaction_matrix['watch_ratio'] <= 3]
interaction_matrix['watch_ratio'] = (
    (interaction_matrix['watch_ratio'] - interaction_matrix['watch_ratio'].min()) /
    (interaction_matrix['watch_ratio'].max() - interaction_matrix['watch_ratio'].min())
)

# Prepare the test matrix in the same way
test_matrix = small_matrix[["user_id", "video_id", "watch_ratio"]]
test_matrix = test_matrix[test_matrix['watch_ratio'] <= 3]
test_matrix['watch_ratio'] = (
    (test_matrix['watch_ratio'] - test_matrix['watch_ratio'].min()) /
    (test_matrix['watch_ratio'].max() - test_matrix['watch_ratio'].min())
)

In [4]:
# Start Spark session with specified memory settings
spark = SparkSession.builder \
    .appName("ALS KuaiRec") \
    .config("spark.driver.memory", "16g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.extraJavaOptions", "-Xs32m") \
    .getOrCreate()

# Convert pandas DataFrames to Spark DataFrames
spark_df = spark.createDataFrame(interaction_matrix)
spark_df_test = spark.createDataFrame(test_matrix)

# Cast columns to appropriate types for ALS
spark_df = spark_df.withColumn("userId", spark_df["user_id"].cast(IntegerType()))
spark_df = spark_df.withColumn("itemId", spark_df["video_id"].cast(IntegerType()))
spark_df = spark_df.withColumn("rating", spark_df["watch_ratio"].cast(FloatType()))
spark_df_test = spark_df_test.withColumn("userId", spark_df_test["user_id"].cast(IntegerType()))
spark_df_test = spark_df_test.withColumn("itemId", spark_df_test["video_id"].cast(IntegerType()))
spark_df_test = spark_df_test.withColumn("rating", spark_df_test["watch_ratio"].cast(FloatType()))

# Repartition and persist for performance
spark_df = spark_df.repartition(500, "userId").persist()
spark_df_test = spark_df_test.repartition(500, "userId").persist()


25/05/17 16:33:51 WARN Utils: Your hostname, Anosis-laptop resolves to a loopback address: 127.0.1.1; using 192.168.1.33 instead (on interface wlp4s0)
25/05/17 16:33:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/17 16:33:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Train ALS model
als = ALS(
    userCol="userId",
    itemCol="itemId",
    ratingCol="rating",
    rank=50,
    maxIter=15,
    regParam=0.001,
    implicitPrefs=False,
    coldStartStrategy="drop"
)
model = als.fit(spark_df)

# Evaluate model on test set
predictions = model.transform(spark_df_test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse:.4f}")
predictions.show()


25/05/17 16:36:42 WARN TaskSetManager: Stage 0 contains a task of very large size (12209 KiB). The maximum recommended task size is 1000 KiB.
25/05/17 16:36:54 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
25/05/17 16:37:32 WARN TaskSetManager: Stage 82 contains a task of very large size (4643 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

RMSE: 0.1523


[Stage 202:(455 + 18) / 500][Stage 238:>  (0 + 0) / 1][Stage 240:>  (0 + 0) / 2]

+-------+--------+-------------------+------+------+----------+----------+
|user_id|video_id|        watch_ratio|userId|itemId|    rating|prediction|
+-------+--------+-------------------+------+------+----------+----------+
|    833|    3631| 0.3598314406113849|   833|  3631|0.35983145| 0.3771991|
|    833|    1963| 0.3506944444444444|   833|  1963|0.35069445| 0.3606498|
|    833|    9592|0.37574811345303144|   833|  9592|0.37574813|0.40438125|
|    833|    5262|  0.417341089192379|   833|  5262|0.41734108|0.42578137|
|    833|    1922| 0.3625562689100435|   833|  1922|0.36255628|0.37377772|
|    833|    6787|  0.874326906374848|   833|  6787| 0.8743269|0.85503644|
|    833|    6788|0.22572653006562207|   833|  6788|0.22572653|0.22418948|
|    833|     166| 0.5999404761904762|   833|   166| 0.5999405| 0.5772757|
|    833|    6775| 0.2863189591500483|   833|  6775|0.28631896|0.27551708|
|    833|     183| 0.5539890710382513|   833|   183|0.55398905|0.58110744|
|    833|    1936|0.47394

                                                                                

In [6]:
# Function to get top-k actual items per user from test set
def get_top_k_df(k=10):
    df_sorted = test_matrix.sort_values(['user_id', 'watch_ratio'], ascending=[True, False])
    df_sorted = df_sorted.groupby('user_id').head(k)
    return df_sorted.groupby('user_id')['video_id'].apply(list).reset_index()

# Function to get top-k predicted items per user using ALS model
def format_top_k_predictions(k=10):
    test_user_ids = spark_df_test.select("userId").distinct()
    # Recommend top-k items for each user in the test set
    user_recommendations = model.recommendForUserSubset(test_user_ids, k)
    # Flatten recommendations
    exploded_recs = user_recommendations.select(
        col("userId"),
        explode(col("recommendations")).alias("recommendation")
    )
    final_recs = exploded_recs.select(
        col("userId").alias("user_id"),
        col("recommendation.itemId").alias("video_id"),
        col("recommendation.rating").alias("watch_ratio")
    )
    df = final_recs.toPandas()
    df = df.groupby("user_id").head(k)
    return df.groupby("user_id")["video_id"].apply(list).reset_index()

# Function to compute hit rate at k
def hit_rate_at_k(k=10):
    df = get_top_k_df(k)
    df_recommendations = format_top_k_predictions(k)
    # Merge actual and predicted top-k lists
    merged_df = pd.merge(df, df_recommendations, on='user_id', suffixes=('_actual', '_predicted'))
    # Calculate hit rate for each user
    hit_rate = merged_df.apply(lambda row: len(set(row['video_id_actual']) & set(row['video_id_predicted'])), axis=1)
    return hit_rate.mean()

# Print hit rate and sample outputs
print(f"Hit Rate at {10}: {hit_rate_at_k(10):.4f}")
print(get_top_k_df(10))
print(format_top_k_predictions(10))

                                                                                

Hit Rate at 10: 0.0468
      user_id                                           video_id
0          14  [601, 2427, 4051, 3105, 5753, 738, 2292, 4644,...
1          19  [154, 5464, 2629, 8366, 9178, 4592, 4092, 1305...
2          21  [5205, 8488, 2941, 9804, 5412, 9815, 10206, 67...
3          23  [2302, 3738, 522, 9611, 8802, 2283, 3130, 791,...
4          24  [9886, 3720, 8431, 6266, 1415, 8596, 7135, 304...
...       ...                                                ...
1406     7142  [4123, 1305, 6787, 351, 10206, 619, 2894, 5525...
1407     7147  [314, 9721, 9613, 7752, 9815, 8298, 7564, 6787...
1408     7153  [1922, 2478, 7820, 211, 9998, 171, 7844, 3742,...
1409     7159  [4282, 2223, 4123, 9721, 4858, 395, 6879, 7794...
1410     7162  [4932, 5276, 9936, 1396, 217, 8629, 1288, 8670...

[1411 rows x 2 columns]


                                                                                

      user_id                                           video_id
0          14  [4256, 7755, 5284, 9699, 6596, 868, 6418, 6358...
1          19  [1087, 908, 3556, 6214, 6538, 6596, 7933, 1024...
2          21  [5284, 4546, 6596, 5488, 8797, 2992, 5430, 843...
3          23  [6547, 6186, 2872, 3367, 6816, 8052, 6389, 843...
4          24  [421, 5107, 1654, 10507, 7424, 5110, 946, 9703...
...       ...                                                ...
1406     7142  [9637, 6625, 3453, 7644, 908, 6200, 3367, 9309...
1407     7147  [3001, 6596, 1049, 9079, 7933, 2982, 1087, 689...
1408     7153  [1163, 3434, 1781, 902, 1559, 9698, 7968, 1054...
1409     7159  [9426, 6483, 6360, 5056, 6625, 3305, 9189, 104...
1410     7162  [6402, 8006, 1462, 5927, 8434, 10132, 6642, 70...

[1411 rows x 2 columns]
