In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os

from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql import functions as F

In [3]:
folder_path = "./als_spark_checkpoints"

# Check if folder exists
if not os.path.exists(folder_path):
    os.makedirs(folder_path)   # Create folder
    print(f"Folder created: {folder_path}")
else:
    print(f"Folder already exists: {folder_path}")


Folder already exists: ./als_spark_checkpoints


In [4]:
spark = (SparkSession
     .builder
     .master('local[*]') # tells you master is 1 laptop using all 4 executors
     .config("spark.driver.memory", "8g")
     .config("spark.executor.memory", "8g")
     .config("spark.sql.shuffle.partitions", "8")  # reduce for local
     .getOrCreate()) # make new or get latest session

spark.sparkContext.setCheckpointDir("./als_spark_checkpoints")

In [5]:
# Read board game geek file on spark
schema = """
_c0 INT,
user STRING,
rating FLOAT,
comment STRING,
id INT, 
name STRING
"""
# Fix quote handling for comments column 
df_spark = spark.read.csv(
    "/mnt/data/public/bgg/bgg-19m-reviews.csv",
    sep=',', header=True,
    schema=schema,
    multiLine=True,
    quote='"',
    escape='"')
df_spark = df_spark.drop("_c0", "comment", "name")

In [7]:
# Map user name to integer
user_indexer = StringIndexer(inputCol="user", outputCol="user_id")
df_spark_indexed = user_indexer.fit(df_spark).transform(df_spark)

# Save Spark DF mapping of user to User ID
user_mapping = df_spark_indexed.select("user", "user_id").distinct()
df_spark_indexed = df_spark_indexed.drop("user")

# Change item column name for unformity
df_spark_indexed = df_spark_indexed.withColumnRenamed("id", "item_id")

In [8]:
df_spark_indexed.limit(10).show()

+------+-------+--------+
|rating|item_id| user_id|
+------+-------+--------+
|  10.0|  30549|   201.0|
|  10.0|  30549|  6591.0|
|  10.0|  30549|   631.0|
|  10.0|  30549|  1705.0|
|  10.0|  30549|  5796.0|
|  10.0|  30549|    78.0|
|  10.0|  30549|393225.0|
|  10.0|  30549|233206.0|
|  10.0|  30549| 22517.0|
|  10.0|  30549| 87298.0|
+------+-------+--------+



In [11]:
df_spark_sample = (
    df_spark_indexed.groupby("item_id")
    .count()
)

df_spark_sample.limit(10).show()

+-------+------+
|item_id| count|
+-------+------+
|     13|108195|
| 230802| 63019|
|  70323| 61203|
|  65244| 44555|
|  98778| 41619|
| 147020| 38601|
|    463| 34424|
|  37111| 33979|
| 205637| 33129|
|  31481| 30985|
+-------+------+



In [None]:
train, test = df_spark_indexed.randomSplit([0.8, 0.2])
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

# Warning! This cell takes a while to run.
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
reg_params = [.01, .005, .001]
for reg_param in reg_params:
  tuning_als = ALS(regParam = reg_param,
            userCol='user_id', itemCol='item_id', 
            ratingCol='rating', coldStartStrategy='drop')

  param_grid = ParamGridBuilder()\
                .addGrid(als.rank, [2, 4, 8, 12, 16])\
                .build()

  cv = CrossValidator(estimator=tuning_als, estimatorParamMaps=param_grid, 
                      evaluator=evaluator, parallelism=4)
  tuned_model= cv.fit(train)
  predictions = tuned_model.transform(test)
  evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                  predictionCol="prediction")
  rmse = evaluator.evaluate(predictions)
  print(reg_param, tuned_model.bestModel.rank, rmse)

0.01 10 1.3354120006135488


In [None]:
from pyspark.sql.functions import collect_list, col, size, slice
from pyspark.ml.evaluation import RankingEvaluator

print("Building evaluation dataset...")

# Get top-10 recommendations
k = 10
userRecs = als_model.recommendForAllUsers(k)

dfs_preds_grouped = userRecs.select(
    col('user_id'),
    col('recommendations.item_id').alias('predicted_item_id_arr')
).withColumn(
    'predicted_item_id_arr',
    col('predicted_item_id_arr').cast('array<double>')
)

# Get actual highly-rated items from test
thresh = 4.0
test_thresh_grouped = test.filter(
    col('rating') >= thresh
).groupBy('user_id').agg(
    collect_list(col('item_id').cast('double')).alias('rated_item_id_arr')
)

# Join predictions with actuals
dfs_preds_thresh_for_eval = test_thresh_grouped.join(
    dfs_preds_grouped,
    on='user_id',
    how='inner'
)

# CRITICAL: Limit array sizes to reduce computation
dfs_preds_limited = dfs_preds_thresh_for_eval.withColumn(
    'rated_item_id_arr',
    slice('rated_item_id_arr', 1, 20)  # Max 20 items
).withColumn(
    'predicted_item_id_arr',
    slice('predicted_item_id_arr', 1, 10)  # Max 10 predictions
)

print("âœ“ Evaluation dataset ready")

# Sample VERY small first - only 0.5%
print("Sampling 0.5% of users...")
dfs_preds_sampled = dfs_preds_limited.sample(fraction=0.005, seed=42)
dfs_preds_sampled.cache()

sample_count = dfs_preds_sampled.count()
print(f"Sample size: {sample_count:,} users")

if sample_count > 0:
    # Evaluate
    evaluator = RankingEvaluator(
        labelCol='rated_item_id_arr',
        predictionCol='predicted_item_id_arr',
        metricName='ndcgAtK',
        k=3
    )
    
    print("Evaluating NDCG...")
    ndcg_k = evaluator.evaluate(dfs_preds_sampled)
    print(f"\n{'='*50}")
    print(f"NDCG at k=3: {ndcg_k:.6f}")
    print(f"{'='*50}")
    
    # Quick diagnostics
    print("\nArray size check:")
    dfs_preds_sampled.select(
        size('rated_item_id_arr').alias('actual_size'),
        size('predicted_item_id_arr').alias('pred_size')
    ).describe().show()
else:
    print("ERROR: Sample is empty!")

In [11]:
# Generate top 10 movie recommendations for each user
userRecs = als_model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = als_model.recommendForAllItems(10)

# Generate top 10 movie recommendations for a specified set of users
users = ratings.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = als_model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = ratings.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = als_model.recommendForItemSubset(movies, 10)

NameError: name 'ratings' is not defined

In [14]:
userRecs.limit(3).toPandas()

Unnamed: 0,user_id,recommendations
0,12,"[(149705, 8.133953094482422), (254632, 8.13036..."
1,13,"[(345976, 7.779404163360596), (63170, 7.649724..."
2,14,"[(345976, 8.195059776306152), (277538, 8.04972..."


In [12]:
# Check the data quality
print("="*50)
print("Diagnostic Checks")
print("="*50)

# 1. How many users have actual ratings in test?
print(f"\nUsers with rated items: {test_thresh_grouped.count():,}")

# 2. How many users got recommendations?
print(f"Users with predictions: {dfs_preds_grouped.count():,}")

# 3. How many users have BOTH?
print(f"Users in evaluation: {dfs_preds_thresh_for_eval.count():,}")

# 4. Look at array sizes
print("\nArray size statistics:")
dfs_preds_thresh_for_eval.select(
    F.size('rated_item_id_arr').alias('actual_size'),
    F.size('predicted_item_id_arr').alias('pred_size')
).describe().show()

# 5. Check for overlap - are ANY predictions correct?
print("\nSample of predictions vs actuals:")
dfs_preds_thresh_for_eval.select(
    'user_id',
    'rated_item_id_arr',
    'predicted_item_id_arr'
).show(5, truncate=False)

Diagnostic Checks


NameError: name 'test_thresh_grouped' is not defined

In [11]:
# DON'T count! Just sample immediately
print("Sampling 1% of users for evaluation...")

# Sample WITHOUT counting first
dfs_preds_sampled = dfs_preds_thresh_for_eval.sample(fraction=0.01, seed=42)

# Cache the sample
dfs_preds_sampled.cache()

# Now count only the SAMPLE (much smaller)
sample_count = dfs_preds_sampled.count()
print(f"Sampled users: {sample_count:,}")

# Evaluate on the sample
evaluator = RankingEvaluator(
    labelCol='rated_item_id_arr',
    predictionCol='predicted_item_id_arr',
    metricName='ndcgAtK',
    k=3
)

ndcg_k = evaluator.evaluate(dfs_preds_sampled)
print(f"NDCG at k=3 (on {sample_count:,} users): {ndcg_k}")

Sampling 1% of users for evaluation...


NameError: name 'dfs_preds_thresh_for_eval' is not defined

In [None]:
from pyspark.sql.functions import collect_list, col, explode
from pyspark.ml.evaluation import RankingEvaluator

# Step 1: Get predictions using the efficient built-in method
# This returns top N recommendations per user (much smaller dataset!)
k = 10  # or however many recommendations you want
userRecs = als_model.recommendForAllUsers(k)

# Step 2: Extract item IDs from recommendations
# userRecs has format: user_id | recommendations (array of struct(item_id, rating))
dfs_preds_grouped = userRecs.select(
    col('user_id'),
    col('recommendations.item_id').alias('predicted_item_id_arr')
).withColumn(
    'predicted_item_id_arr',
    col('predicted_item_id_arr').cast('array<double>')
)

# Step 3: Get actual highly-rated items from test set
thresh = 4.0
test_thresh_grouped = test.filter(
    col('rating') >= thresh
).groupBy('user_id').agg(
    collect_list(col('item_id').cast('double')).alias('rated_item_id_arr')
)

# Step 4: Join predictions with actuals
dfs_preds_thresh_for_eval = test_thresh_grouped.join(
    dfs_preds_grouped, 
    on='user_id', 
    how='inner'
)

# Step 5: Check size (should be manageable now!)
row_count = dfs_preds_thresh_for_eval.count()
print(f"Number of rows: {row_count:,}")

# Step 6: Evaluate
evaluator = RankingEvaluator(
    labelCol='rated_item_id_arr',
    predictionCol='predicted_item_id_arr',
    metricName='ndcgAtK',
    k=3
)
ndcg_k = evaluator.evaluate(dfs_preds_thresh_for_eval)
print(f"NDCG at k=3: {ndcg_k}")

- https://medium.com/@sinha.raunak/recommendation-systems-pyspark-als-model-evaluation-rmse-map-k-recall-k-ndcg-k-477bf6df893e

- https://github.com/CGrannan/building-boardgame-recommendation-systems/blob/master/spark_als_recommendation.ipynb (but no ndcg@k)

fix the code below tomorrow