In [0]:
from pyspark.sql.functions import col, max as spark_max, struct, when

# Source table holding the raw e-commerce events.
df_bronze = spark.table("workspace.ecommerce.ecommerce_delta")

# Share of the per-pair ratings kept for training, and the seed for that sample.
TRAIN_SAMPLE_FRACTION = 0.1
SAMPLE_SEED = 42

# Create Rating Mapping
# Each event becomes a rating: purchase -> 5, cart -> 3, anything else -> 1
# (a null event_type also falls through to 1).
# The ids are cast to integer BEFORE dropna so that values which fail the cast
# (and therefore become null) are removed instead of reaching ALS.
events_rated = (
    df_bronze
    .select(
        col("user_id").cast("integer").alias("user_id"),
        col("product_id").cast("integer").alias("product_id"),
        col("event_type"),
    )
    .withColumn(
        "rating",
        when(col("event_type") == "purchase", 5)
        .when(col("event_type") == "cart", 3)
        .otherwise(1)
        .cast("float"),
    )
    .dropna(subset=["user_id", "product_id", "rating"])
)

# One row per (user_id, product_id), keeping the STRONGEST event.
# dropDuplicates() would keep an arbitrary row per pair, so a purchase could be
# recorded as rating 1 and the result could differ between runs. Taking the max
# of struct(rating, event_type) compares rating first and uses event_type only
# as a tie-breaker, which keeps the original four-column schema intact.
rating_df = (
    events_rated
    .groupBy("user_id", "product_id")
    .agg(spark_max(struct("rating", "event_type")).alias("strongest"))
    .select(
        "user_id",
        "product_id",
        col("strongest.event_type").alias("event_type"),
        col("strongest.rating").alias("rating"),
    )
)

# Down-sample (without replacement) to keep training time manageable.
train_data = rating_df.sample(
    withReplacement=False,
    fraction=TRAIN_SAMPLE_FRACTION,
    seed=SAMPLE_SEED,
)

In [0]:
from pyspark.ml.recommendation import ALS

# Train ALS Model
# The estimator reads user ids, item ids and ratings from the columns named
# below. coldStartStrategy="drop" makes transform() discard rows whose
# prediction would be NaN (users/items unseen during fit).
# seed is fixed because ALS initialises its factors randomly; without it the
# model (and every recommendation derived from it) changes on each run.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="user_id",
    itemCol="product_id",
    ratingCol="rating",
    coldStartStrategy="drop",
    seed=42,
)

model_als = als.fit(train_data)

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number

# How many users to showcase and how many recommendations to keep per user.
N_DEMO_USERS = 5
TOP_K = 5

# Pick the demo users deterministically: limit() without an ordering returns an
# arbitrary subset, so the displayed users could change between runs.
users_df = (
    train_data
    .select("user_id")
    .distinct()
    .orderBy("user_id")
    .limit(N_DEMO_USERS)
)

# Candidate items: every product present in the training sample.
products_df = train_data.select("product_id").distinct()

# Score every (demo user, product) pair with the trained model.
user_product_grid = users_df.crossJoin(products_df)

predictions = model_als.transform(user_product_grid)

# Defensive: discard any pair the model could not score.
valid_predictions = predictions.dropna(subset=["prediction"])

# Top 5 products
# Rank each user's candidates by predicted score; product_id breaks ties so the
# ranking is stable when two products receive the same prediction.
window_spec = Window.partitionBy("user_id").orderBy(
    col("prediction").desc(),
    col("product_id").asc(),
)

top_5_recs = (
    valid_predictions
    .withColumn("rank", row_number().over(window_spec))
    .filter(col("rank") <= TOP_K)
    # Sort while "rank" is still a column, then project it away; sorting after
    # the select would depend on the analyzer re-resolving a dropped column.
    .orderBy("user_id", "rank")
    .select("user_id", "product_id", "prediction")
)

display(top_5_recs)