In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.storagelevel import StorageLevel

spark = SparkSession.builder \
    .appName("Amazon_Reviews_Recommender_Optimized") \
    .config("spark.driver.memory", "8g") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.executor.memoryOverhead", "2g") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "2g") \
    .config("spark.sql.shuffle.partitions", "2000") \
    .config("spark.default.parallelism", "2000") \
    .config("spark.memory.fraction", "0.6") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "1024m") \
    .getOrCreate()

spark.sparkContext.setCheckpointDir("checkpoints_als")

PARQUET_PATH = "reviews_final_parquet"
df = spark.read.parquet(PARQUET_PATH)

def load_and_clean_data(df):
    item_counts = df.groupBy("asin").count().filter(F.col("count") >= 5)
    user_counts = df.groupBy("reviewerID").count().filter(F.col("count") >= 5)
    
    df_clean = df.join(item_counts, "asin", "left_semi") \
                 .join(user_counts, "reviewerID", "left_semi")
    return df_clean

print("--- Filtering Data ---")
df_spark = load_and_clean_data(df)


def create_index_mapping(df, col_name, output_col):
    print(f"--- Indexing column: {col_name} ---")
    distinct_vals = df.select(col_name).distinct()
    
    mapping_rdd = distinct_vals.rdd.map(lambda r: r[0]).zipWithUniqueId()
    
    mapping_df = spark.createDataFrame(mapping_rdd, schema=["original_id", "numeric_id"])
    
    mapping_path = f"mappings/{col_name}_mapping"
    mapping_df.write.mode("overwrite").parquet(mapping_path)
    
    return spark.read.parquet(mapping_path).withColumnRenamed("original_id", col_name).withColumnRenamed("numeric_id", output_col)

user_mapping = create_index_mapping(df_spark, "reviewerID", "user_id_index")
item_mapping = create_index_mapping(df_spark, "asin", "asin_index")

print("--- Joining Indices back to Data ---")
df_indexed = df_spark.join(user_mapping, on="reviewerID", how="inner") \
                     .join(item_mapping, on="asin", how="inner")

df_final = df_indexed.select(
    F.col("user_id_index").cast("integer"),
    F.col("asin_index").cast("integer"),
    F.col("overall").cast("float")
)

print("--- Splitting Data ---")
(training_data, test_data) = df_final.randomSplit([0.8, 0.2], seed=42)

training_data.persist(StorageLevel.DISK_ONLY)
test_data.persist(StorageLevel.DISK_ONLY)

print(f"Training Count: {training_data.count()}")
print(f"Test Count: {test_data.count()}")

print("--- Calculating User Means ---")
user_means = training_data.groupBy("user_id_index") \
    .agg(F.avg("overall").alias("user_mean"))

training_centered = training_data.join(user_means, on="user_id_index", how="inner")
training_centered = training_centered.withColumn("centered_rating", F.col("overall") - F.col("user_mean"))

training_centered_path = "tmp_training_centered"
training_centered.write.mode("overwrite").parquet(training_centered_path)
training_centered_ready = spark.read.parquet(training_centered_path)

print("--- Training ALS ---")
als = ALS(
    maxIter=10,
    regParam=0.1,
    rank=10, 
    userCol="user_id_index",
    itemCol="asin_index",
    ratingCol="centered_rating",
    coldStartStrategy="drop",
    nonnegative=False,
    checkpointInterval=2  
)

model = als.fit(training_centered_ready)
print("--- Model Trained Successfully ---")

print("--- Evaluating ---")
test_with_mean = test_data.join(user_means, on="user_id_index", how="inner")
predictions = model.transform(test_with_mean)

final_predictions = predictions.withColumn(
    "final_prediction", 
    F.col("prediction") + F.col("user_mean")
)

final_predictions = final_predictions.withColumn(
    "final_prediction_clipped", 
    F.when(F.col("final_prediction") > 5, 5.0)
     .when(F.col("final_prediction") < 1, 1.0)
     .otherwise(F.col("final_prediction"))
)

evaluator = RegressionEvaluator(
    metricName="rmse", 
    labelCol="overall", 
    predictionCol="final_prediction_clipped"
)

rmse = evaluator.evaluate(final_predictions)
print(f"RMSE: {rmse}")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/01/16 00:57:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
26/01/16 00:57:22 WARN SparkContext: Spark is not running in local mode, therefore the checkpoint directory must not be on the local filesystem. Directory 'checkpoints_als' appears to be on the local filesystem.
                                                                                

--- Filtering Data ---
--- Indexing column: reviewerID ---


                                                                                

--- Indexing column: asin ---


                                                                                

--- Joining Indices back to Data ---
--- Splitting Data ---
--- Persisting to Disk (Not RAM) ---


                                                                                

Training Count: 60149963


                                                                                

Test Count: 15036389
--- Calculating User Means ---


                                                                                

--- Training ALS ---


                                                                                

--- Model Trained Successfully ---
--- Evaluating ---




RMSE: 1.027357238789061


                                                                                

In [None]:
from pyspark.ml.recommendation import ALSModel

# --- 1. Save the Model ---
model_path = "als_recommender_model_v1"
# print(f"--- Saving model to {model_path} ---")
model.write().overwrite().save(model_path)
print("Success: Model saved.")

print(f"--- Loading model from {model_path} ---")
loaded_model = ALSModel.load(model_path)
print("Success: Model loaded.")

print("--- Verifying Loaded Model on Test Data ---")

predictions_from_loaded = loaded_model.transform(test_with_mean)

# De-Centering logic
final_check = predictions_from_loaded.withColumn(
    "final_prediction_loaded", 
    F.col("prediction") + F.col("user_mean")
)

final_check.select(
    "user_id_index", 
    "asin_index", 
    "overall", 
    "prediction", 
    "user_mean",
    "final_prediction_loaded"
).show(5)

count = final_check.count()

evaluator = RegressionEvaluator(
    metricName="rmse", 
    labelCol="overall", 
    predictionCol="final_prediction_loaded"
)

rmse = evaluator.evaluate(final_check)
print(f"RMSE: {rmse}")
print(f"Verification complete. Generated predictions for {count} rows.")

--- Loading model from als_recommender_model_v1 ---
Success: Model loaded.
--- Verifying Loaded Model on Test Data ---


                                                                                

+-------------+----------+-------+----------+-----------------+-----------------------+
|user_id_index|asin_index|overall|prediction|        user_mean|final_prediction_loaded|
+-------------+----------+-------+----------+-----------------+-----------------------+
|      3472753|       496|    5.0|-0.5912889|3.945945945945946|      3.354657021728722|
|       792285|       496|    5.0|-0.5473409|              4.0|     3.4526590704917908|
|      1415638|       496|    5.0|  0.709856|4.461038961038961|      5.170894934759511|
|      1730055|       496|    5.0|-0.6964582|              3.2|     2.5035417795181276|
|       351178|       496|    5.0| 0.4956245|4.441860465116279|      4.937484977550285|
+-------------+----------+-------+----------+-----------------+-----------------------+
only showing top 5 rows





RMSE: 1.0393701316582167
Verification complete. Generated predictions for 15033986 rows.


                                                                                