In [0]:
from pyspark.sql.functions import col, when

# Turn each raw event into a numeric interest score for ALS.
# Event types missing from EVENT_RATINGS (including a null event_type) fall
# through to a rating of 0.
# NOTE(review): with explicit-feedback ALS (implicitPrefs defaults to False),
# a 0 is fitted as a real, very low rating rather than "no signal" — confirm
# that unmapped events should count as negative feedback.
EVENT_RATINGS = {
    "view": 1,
    "click": 2,
    "add_to_cart": 3,
    "purchase": 5,
}

# Build when(...).when(...)... in the dict's insertion order.
rating_expr = None
for event_name, score in EVENT_RATINGS.items():
    is_event = col("event_type") == event_name
    if rating_expr is None:
        rating_expr = when(is_event, score)
    else:
        rating_expr = rating_expr.when(is_event, score)

events_table = spark.table("workspace.ecommerce.events_delta")
rating_df = events_table.withColumn("rating", rating_expr.otherwise(0))

In [0]:
# Cast the ID columns to the integer type ALS expects and the rating to float.
# Rows whose user_id/product_id end up null (missing upstream, or nulled by a
# failed string -> integer cast) are dropped: ALS rejects null user/item IDs
# at fit time, so a single such row would abort training.
# NOTE(review): if the source IDs are wider than 32-bit (e.g. LongType),
# values above 2**31 - 1 cannot be represented as integer — confirm the ID
# range, or re-index IDs (e.g. with StringIndexer) before training.
ratings_df = (
    rating_df
    .withColumn("user_id", col("user_id").cast("integer"))
    .withColumn("product_id", col("product_id").cast("integer"))
    .withColumn("rating", col("rating").cast("float"))
    .dropna(subset=["user_id", "product_id"])
)

ratings_df.select("user_id", "product_id", "rating", "event_type").show(5)

In [0]:
from pyspark.ml.recommendation import ALS

# Train a matrix-factorisation recommender on the event-derived ratings.
# - implicitPrefs is left at its default (False), so the scores are fitted
#   directly as explicit ratings.
# - coldStartStrategy="drop" removes rows for users/items unseen in training
#   from transform() output instead of emitting NaN predictions.
# - A fixed seed pins the random factor initialisation so repeated runs give
#   the same model (and therefore the same recommendations).
# NOTE(review): ratings_df presumably holds one row per event, so a
# (user, product) pair can appear many times with different scores; consider
# aggregating per pair (e.g. max rating) before fitting.
ALS_SEED = 42

als = ALS(
    userCol="user_id",
    itemCol="product_id",
    ratingCol="rating",
    rank=10,
    maxIter=5,
    regParam=0.1,
    coldStartStrategy="drop",
    nonnegative=True,
    seed=ALS_SEED,
)

als_model = als.fit(ratings_df)

print("ALS Model trained successfully!")

In [0]:
# Candidate users and products to score. limit() on an unordered DataFrame
# returns an arbitrary subset that can differ between runs, so each candidate
# set is sorted first to make the selection reproducible. Null IDs are
# excluded because they cannot be scored by the ALS model.
N_CANDIDATE_PRODUCTS = 200
N_CANDIDATE_USERS = 1000

products_df = (
    ratings_df
    .select("product_id")
    .dropna()
    .distinct()
    .orderBy("product_id")
    .limit(N_CANDIDATE_PRODUCTS)
)
users_df = (
    ratings_df
    .select("user_id")
    .dropna()
    .distinct()
    .orderBy("user_id")
    .limit(N_CANDIDATE_USERS)
)

In [0]:
# Every candidate (user_id, product_id) combination, to be scored by the model.
all_pairs = users_df.join(products_df, how="cross")

In [0]:
# Score each candidate pair; adds a "prediction" column. Because the model was
# built with coldStartStrategy="drop", pairs it cannot score are omitted.
predictions = als_model.transform(dataset=all_pairs)

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc

# Keep each user's TOP_K highest-scoring candidate products.
# product_id is a secondary sort key: row_number() over a non-unique ordering
# assigns ranks arbitrarily among tied scores, which would make the output
# change between runs.
# NOTE(review): candidates are not filtered against products the user has
# already interacted with; add an anti-join on ratings_df if those should be
# excluded from recommendations.
TOP_K = 5

windowSpec = Window.partitionBy("user_id").orderBy(desc("prediction"), "product_id")

ranked = predictions.withColumn("rank", row_number().over(windowSpec))

# Name kept as top5_recs for downstream cells; the cutoff comes from TOP_K.
top5_recs = (
    ranked
    .filter(ranked["rank"] <= TOP_K)
    .select("user_id", "product_id", "prediction")
)

top5_recs.show(5)

In [0]:
# Persist the per-user recommendations as the Delta table
# workspace.ecommerce.gold_recommendations, replacing its previous contents.
top5_recs.write.saveAsTable(
    "workspace.ecommerce.gold_recommendations",
    format="delta",
    mode="overwrite",
)

In [0]:
%sql
-- Spot-check the persisted recommendations: the first 20 rows, grouped by user,
-- with each user's products listed from highest to lowest predicted score.
SELECT *
FROM workspace.ecommerce.gold_recommendations
ORDER BY user_id, prediction DESC
LIMIT 20;