In [None]:
import pandas as pandas
from sklearn.model_selection import train_test_split

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col

from pyspark.ml.evaluation import RegressionEvaluator

# Users with fewer ratings than this are dropped before training ALS.
MIN_REVIEWS_PER_USER = 5

merged_df = pandas.read_csv("merged.csv")

# Number of reviews (rows) per user, counted on the user_id column alone.
# Note: `merged_df.groupby("user_id").value_counts()` would instead count identical
# rows (all remaining columns) inside each user's group, so a threshold on it tests
# duplicate-row counts, not how many reviews a user wrote.
user_review_counts = merged_df["user_id"].value_counts()

valid_user_ids = user_review_counts[user_review_counts >= MIN_REVIEWS_PER_USER].index

# Keep only sufficiently active users and the three columns ALS needs.
# .copy() makes als_df an independent frame rather than a filtered slice of
# merged_df, so later cells can add columns to it without chained-assignment issues.
als_df = merged_df.loc[
    merged_df["user_id"].isin(valid_user_ids), ["user_id", "asin", "rating"]
].copy()

In [None]:
# Spark's ALS needs integer user/item columns, so re-encode user_id and asin as
# dense 0-based indices in order of first appearance. user_ids[k] / product_ids[k]
# give back the original identifier for index k.
user_ids = als_df['user_id'].unique().tolist()
product_ids = als_df['asin'].unique().tolist()

user_id_map = dict(zip(user_ids, range(len(user_ids))))
product_id_map = dict(zip(product_ids, range(len(product_ids))))

# Build the frame with the two index columns appended (new object, same contents
# as assigning the columns one by one).
als_df = als_df.assign(
    user_index=als_df['user_id'].map(user_id_map),
    product_index=als_df['asin'].map(product_id_map),
)

# Random 80/20 split over individual ratings; fixed random_state for repeatability.
train_data, test_data = train_test_split(
    als_df[['user_index', 'product_index', 'rating']],
    test_size=0.2,
    random_state=42,
)

In [None]:
spark = SparkSession.builder.appName("ALSExample").getOrCreate()

# pandas -> Spark DataFrames with columns user_index, product_index, rating.
train_df_spark = spark.createDataFrame(train_data)
test_df_spark = spark.createDataFrame(test_data)

# coldStartStrategy="drop": predictions for users/items that have no learned factors
# are removed instead of being NaN (a single NaN would make the RMSE NaN).
# seed pins ALS's random factor initialization so the reported RMSE is reproducible
# across kernel restarts; 42 matches the random_state used for the train/test split.
als = ALS(
    rank=10,
    maxIter=10,
    userCol="user_index",
    itemCol="product_index",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=42,
)

model = als.fit(train_df_spark)

In [None]:
# Score the held-out ratings with the fitted model.
predictions = model.transform(test_df_spark)

# Root-mean-squared error between the true `rating` and the model's `prediction`.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print(f"RMSE on the test set: {rmse}")

# Shut down the Spark session. Anything that still needs `model` or the Spark
# DataFrames has to run before this line.
spark.stop()

In [None]:
# --- Demo: top-5 recommendations for a few random users from the test set ---
#
# Kept disabled (all lines below are comments). It needs a live Spark session and
# the fitted `model`, but the evaluation cell calls `spark.stop()`.
# NOTE(review): re-creating a session with getOrCreate() afterwards is likely not
# enough, because `model` was fitted under the stopped SparkContext. Run this before
# `spark.stop()` (or move that call to the end of the notebook). Confirm before
# enabling.
#
# Compared with the earlier draft, this asks the model for recommendations once and
# only for the sampled users (recommendForUserSubset), instead of calling
# recommendForAllUsers(5) on every loop iteration. Sampled users without learned
# factors are simply absent from the result instead of hitting an IndexError on `[0]`.

# test_users = (
#     test_df_spark.select("user_index")
#     .distinct()
#     .sample(fraction=0.1, seed=42)
#     .limit(3)
# )
# user_recs = model.recommendForUserSubset(test_users, 5).collect()
#
# print("\nTop 5 Recommendations for Random Users in Test Set:")
# for row in user_recs:
#     user_index = row.user_index
#     print(f"\nRecommendations for User Index: {user_index} (Original User ID: {user_ids[user_index]})")
#     for rec in row.recommendations:
#         original_asin = product_ids[rec.product_index]
#         print(f"  Product ASIN: {original_asin}, Predicted Rating: {rec.rating:.2f}")