In [None]:
account =  # your storage account name
container =  # your container name

bronze = f"abfss://{container}@{account}.dfs.core.windows.net/bronze"
silver = f"abfss://{container}@{account}.dfs.core.windows.net/silver"

In [None]:
from pyspark.sql import functions as F

orders_bronze = f"{bronze}/orders"
fact_orders_silver = f"{silver}/fact_orders"

ob = spark.read.parquet(orders_bronze)

fo = (ob
    # normalize column names already done in bronze; cast types here
    .withColumn("order_id", F.col("order_id").cast("int"))
    .withColumn("user_id", F.col("user_id").cast("int"))
    .withColumn("eval_set", F.col("eval_set").cast("string"))
    .withColumn("order_number", F.col("order_number").cast("int"))
    .withColumn("order_dow", F.col("order_dow").cast("int"))
    # sample showed "08" / "07" — cast to int safely
    .withColumn("order_hour_of_day", F.col("order_hour_of_day").cast("int"))
    .withColumn("days_since_prior_order", F.col("days_since_prior_order").cast("double"))
    .select("order_id","user_id","eval_set","order_number","order_dow",
            "order_hour_of_day","days_since_prior_order")
)

# write silver
(fo.repartition(1)
   .write.mode("overwrite")
   .parquet(fact_orders_silver))

print("silver.fact_orders:", fact_orders_silver, "rows:", fo.count())

In [None]:
products_bronze    = f"{bronze}/products"
aisles_bronze      = f"{bronze}/aisles"
departments_bronze = f"{bronze}/departments"
dim_products_silver = f"{silver}/dim_products"

p  = spark.read.parquet(products_bronze)
a  = spark.read.parquet(aisles_bronze)
d  = spark.read.parquet(departments_bronze)

# cast + join
p = (p
     .withColumn("product_id", F.col("product_id").cast("int"))
     .withColumn("product_name", F.col("product_name").cast("string"))
     .withColumn("aisle_id", F.col("aisle_id").cast("int"))
     .withColumn("department_id", F.col("department_id").cast("int"))
)

a = (a
     .withColumn("aisle_id", F.col("aisle_id").cast("int"))
     .withColumnRenamed("aisle", "aisle_name")
)

d = (d
     .withColumn("department_id", F.col("department_id").cast("int"))
     .withColumnRenamed("department", "department_name")
)

dim_products = (p
    .join(a, "aisle_id", "left")
    .join(d, "department_id", "left")
    .select("product_id","product_name","aisle_id","aisle_name","department_id","department_name")
)

(dim_products.repartition(1)
    .write.mode("overwrite")
    .parquet(dim_products_silver))

print(" silver.dim_products:", dim_products_silver, "rows:", dim_products.count())

In [None]:
opp_bronze = f"{bronze}/order_products__prior"
opt_bronze = f"{bronze}/order_products__train"
fact_op_silver = f"{silver}/fact_order_products"

prior = (spark.read.parquet(opp_bronze)
         .withColumn("split_source", F.lit("prior")))

train = (spark.read.parquet(opt_bronze)
         .withColumn("split_source", F.lit("train")))

# ensure consistent types
def cast_op(df):
    return (df
        .withColumn("order_id", F.col("order_id").cast("int"))
        .withColumn("product_id", F.col("product_id").cast("int"))
        .withColumn("add_to_cart_order", F.col("add_to_cart_order").cast("int"))
        .withColumn("reordered", F.col("reordered").cast("int"))
        .select("order_id","product_id","add_to_cart_order","reordered","split_source")
    )

fact_op = cast_op(prior).unionByName(cast_op(train))

(fact_op.repartition(8)  # a few files for parallel reads later
   .write.mode("overwrite")
   .parquet(fact_op_silver))

print(" silver.fact_order_products:", fact_op_silver, "rows:", fact_op.count())

In [None]:
fact_orders_silver = f"{silver}/fact_orders"
fact_op_silver     = f"{silver}/fact_order_products"
dim_users_silver   = f"{silver}/dim_users"
fup_silver         = f"{silver}/fact_user_product"

fo = spark.read.parquet(fact_orders_silver)
op = spark.read.parquet(fact_op_silver)

# basket sizes per order
basket_sizes = (op.groupBy("order_id").agg(F.count("*").alias("basket_size")))

# join to orders to compute user metrics
orders_plus = (fo.join(basket_sizes, "order_id", "left"))

# user reorder ratio: needs reordered flag joined at user level
user_reorders = (op.groupBy().count())  # just placeholder to avoid confusion

# dim_users: user-level rollups
dim_users = (orders_plus
  .groupBy("user_id")
  .agg(
      F.countDistinct("order_id").alias("total_orders"),
      F.avg("days_since_prior_order").alias("avg_days_between_orders"),
      F.expr("percentile_approx(order_hour_of_day, 0.5)").alias("median_order_hour"),
      F.expr("percentile_approx(order_dow, 0.5)").alias("median_order_dow"),
      F.avg("basket_size").alias("avg_basket_size")
  )
)

# compute user reorder ratio (share of line items with reordered=1)
user_reorder_ratio = (op.groupBy("order_id")
                        .agg(F.avg(F.col("reordered").cast("double")).alias("order_reorder_rate"))
                      .join(fo.select("order_id","user_id"), "order_id", "left")
                      .groupBy("user_id")
                      .agg(F.avg("order_reorder_rate").alias("reorder_ratio"))
                     )

dim_users = (dim_users.join(user_reorder_ratio, "user_id", "left")
                      .fillna({"reorder_ratio": 0.0})
)

(dim_users.repartition(1)
   .write.mode("overwrite")
   .parquet(dim_users_silver))

print("silver.dim_users:", dim_users_silver, "rows:", dim_users.count())

# (optional) user × product history for ML features and analytics
fup = (op.join(fo.select("order_id","user_id"), "order_id", "left")
         .groupBy("user_id","product_id")
         .agg(
             F.count("*").alias("times_purchased"),
             F.avg("add_to_cart_order").alias("avg_add_to_cart_position"),
             F.max("reordered").alias("ever_reordered")
         )
      )

(fup.repartition(8)
    .write.mode("overwrite")
    .parquet(fup_silver))

print("silver.fact_user_product:", fup_silver, "rows:", fup.count())

In [None]:
from pyspark.sql import functions as F

account =  # your storage account name
container =  # your container name
silver = f"abfss://{container}@{account}.dfs.core.windows.net/silver"
gold   = f"abfss://{container}@{account}.dfs.core.windows.net/gold"

op = spark.read.parquet(f"{silver}/fact_order_products")

# product popularity + reorder ratio
prod_feats = (op.groupBy("product_id")
                .agg(
                    F.count("*").alias("prod_times_ordered"),
                    F.avg(F.col("reordered").cast("double")).alias("prod_reorder_ratio")
                ))

(prod_feats.repartition(1)
    .write.mode("overwrite")
    .parquet(f"{gold}/product_features"))

print("gold.product_features written")

In [9]:
dim_users = spark.read.parquet(f"{silver}/dim_users")
fup       = spark.read.parquet(f"{silver}/fact_user_product")   # user×product history
prod_f    = spark.read.parquet(f"{gold}/product_features")
op        = spark.read.parquet(f"{silver}/fact_order_products")
fo        = spark.read.parquet(f"{silver}/fact_orders")

# Attach user_id to line items (for label joining)
op_with_user = op.join(fo.select("order_id","user_id"), "order_id", "left")

# Build a supervised dataset on observed pairs (user, product) with label = reordered
dataset = (op_with_user
  .join(dim_users.select("user_id","total_orders","avg_days_between_orders","avg_basket_size","reorder_ratio"),
        "user_id","left")
  .join(prod_f, "product_id", "left")
  .join(fup.select("user_id","product_id","times_purchased","avg_add_to_cart_position","ever_reordered"),
        ["user_id","product_id"], "left")
  .fillna({"times_purchased":0, "avg_add_to_cart_position":0.0, "ever_reordered":0,
           "prod_times_ordered":0, "prod_reorder_ratio":0.0})
  .select(
      "user_id","product_id",
      "reordered",  # label
      "total_orders","avg_days_between_orders","avg_basket_size","reorder_ratio",
      "prod_times_ordered","prod_reorder_ratio",
      "times_purchased","avg_add_to_cart_position","ever_reordered"
  )
)

dataset.cache()
print("Rows:", dataset.count())
dataset.limit(5).show()

StatementMeta(sparksmall, 5, 3, Finished, Available, Finished)

Rows: 33819106
+-------+----------+---------+------------+-----------------------+------------------+-------------------+------------------+-------------------+---------------+------------------------+--------------+
|user_id|product_id|reordered|total_orders|avg_days_between_orders|   avg_basket_size|      reorder_ratio|prod_times_ordered| prod_reorder_ratio|times_purchased|avg_add_to_cart_position|ever_reordered|
+-------+----------+---------+------------+-----------------------+------------------+-------------------+------------------+-------------------+---------------+------------------------+--------------+
|      9|     47167|        0|           4|                   22.0|              24.5| 0.4409090909090909|              2553| 0.6568742655699178|              1|                    21.0|             0|
|     14|      4210|        0|          14|      21.23076923076923|15.785714285714286| 0.3710018012813044|             36968| 0.7800530188270937|              1|                

In [10]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

features = [
    "total_orders","avg_days_between_orders","avg_basket_size","reorder_ratio",
    "prod_times_ordered","prod_reorder_ratio",
    "times_purchased","avg_add_to_cart_position","ever_reordered"
]

assembler = VectorAssembler(inputCols=features, outputCol="features")
lr = LogisticRegression(labelCol="reordered", featuresCol="features", maxIter=50, regParam=0.01)

train, test = dataset.randomSplit([0.8, 0.2], seed=42)

pipe = Pipeline(stages=[assembler, lr]).fit(train)

pred = pipe.transform(test)
auc = BinaryClassificationEvaluator(labelCol="reordered", metricName="areaUnderROC").evaluate(pred)
print("AUC:", round(auc, 4))

StatementMeta(sparksmall, 5, 4, Finished, Available, Finished)

AUC: 0.8973


In [11]:
scored_all = pipe.transform(dataset) \
    .select("user_id","product_id", F.col("probability").alias("prob_vec"), F.col("prediction").alias("pred"), F.col("reordered").alias("label"))

# Extract P(reorder=1) from the probability vector
get1 = F.udf(lambda v: float(v[1]), "double")
scored_all = scored_all.withColumn("reorder_prob", get1("prob_vec")).drop("prob_vec")

(scored_all.repartition(8)
    .write.mode("overwrite")
    .parquet(f"{gold}/predictions_reorder"))

print("✅ gold.predictions_reorder written")

StatementMeta(sparksmall, 5, 5, Finished, Available, Finished)

✅ gold.predictions_reorder written


In [12]:
# from earlier
pipe   # fitted PipelineModel
train, test = dataset.randomSplit([0.8, 0.2], seed=42)
from pyspark.sql import functions as F, Window

StatementMeta(sparksmall, 5, 6, Finished, Available, Finished)

In [13]:
pred_test = pipe.transform(test) \
    .select("user_id","product_id","reordered","probability","prediction")

# probability is a DenseVector [P(0), P(1)] — pull P(1)
get1 = F.udf(lambda v: float(v[1]), "double")
pred_test = pred_test.withColumn("p1", get1("probability")).drop("probability")

pred_test.cache(); print("test rows:", pred_test.count())
pred_test.show(5, truncate=False)

StatementMeta(sparksmall, 5, 7, Finished, Available, Finished)

test rows: 6766403
+-------+----------+---------+----------+-------------------+
|user_id|product_id|reordered|prediction|p1                 |
+-------+----------+---------+----------+-------------------+
|16     |48171     |0        |0.0       |0.02287248472478387|
|35     |42625     |0        |1.0       |0.5694348750904394 |
|38     |37119     |0        |1.0       |0.6341471237784557 |
|40     |29926     |1        |1.0       |0.7084113605295783 |
|54     |23734     |0        |1.0       |0.6585116697468143 |
+-------+----------+---------+----------+-------------------+
only showing top 5 rows



In [14]:
import numpy as np

def pr_at_threshold(df, thr):
    pred = df.withColumn("yhat", (F.col("p1") >= F.lit(thr)).cast("int"))
    tp = pred.filter((F.col("yhat")==1) & (F.col("reordered")==1)).count()
    fp = pred.filter((F.col("yhat")==1) & (F.col("reordered")==0)).count()
    fn = pred.filter((F.col("yhat")==0) & (F.col("reordered")==1)).count()
    precision = tp / (tp + fp) if (tp+fp)>0 else 0.0
    recall    = tp / (tp + fn) if (tp+fn)>0 else 0.0
    f1 = 2*precision*recall/(precision+recall) if (precision+recall)>0 else 0.0
    return precision, recall, f1

thresholds = [round(x,2) for x in np.linspace(0.1,0.9,9)]
metrics = []
for t in thresholds:
    p,r,f1 = pr_at_threshold(pred_test, t)
    metrics.append((t,p,r,f1))
    
print("thr | precision | recall | f1")
for t,p,r,f1 in metrics:
    print(f"{t:>3} | {p:9.3f} | {r:6.3f} | {f1:5.3f}")

# choose the best F1 threshold (or pick based on business need)
best_thr, _, _, best_f1 = max(metrics, key=lambda x: x[3])
print("Best F1 threshold:", best_thr, "F1:", round(best_f1,3))

StatementMeta(sparksmall, 5, 8, Finished, Available, Finished)

thr | precision | recall | f1
0.1 |     0.782 |  1.000 | 0.878
0.2 |     0.782 |  1.000 | 0.878
0.3 |     0.782 |  1.000 | 0.878
0.4 |     0.782 |  1.000 | 0.878
0.5 |     0.783 |  0.997 | 0.877
0.6 |     0.803 |  0.947 | 0.869
0.7 |     0.871 |  0.745 | 0.803
0.8 |     0.927 |  0.480 | 0.632
0.9 |     0.955 |  0.255 | 0.403
Best F1 threshold: 0.2 F1: 0.878


In [15]:
K = 5  # try 5, 10, 20

# For each user, rank products by predicted probability
w = Window.partitionBy("user_id").orderBy(F.col("p1").desc())
ranked = pred_test.withColumn("rank", F.row_number().over(w))

topk = ranked.filter(F.col("rank") <= K) \
             .select("user_id","product_id","reordered","p1","rank")

# Precision@K: of the K we recommended, what fraction were actually reordered?
prec_k = (topk.groupBy("user_id")
              .agg(F.avg(F.col("reordered").cast("double")).alias("prec_at_k"))
              .agg(F.avg("prec_at_k").alias("Precision_at_K"))
         ).collect()[0]["Precision_at_K"]

# Recall@K: of the items the user actually reordered, how many did we capture in top-K?
# First, number of positives per user in the test set:
pos_per_user = (pred_test.groupBy("user_id")
                         .agg(F.sum(F.col("reordered").cast("int")).alias("pos")))
# True positives in top-K:
tp_topk = (topk.groupBy("user_id")
                .agg(F.sum(F.col("reordered").cast("int")).alias("tp_in_topk")))

recall_k = (tp_topk.join(pos_per_user, "user_id", "left")
                 .withColumn("rec_at_k", F.when(F.col("pos")>0, F.col("tp_in_topk")/F.col("pos")).otherwise(F.lit(None)))
                 .agg(F.avg("rec_at_k").alias("Recall_at_K"))
                 .collect()[0]["Recall_at_K"])

print(f"Precision@{K}:", round(float(prec_k or 0.0), 4))
print(f"Recall@{K}:", round(float(recall_k or 0.0), 4))

StatementMeta(sparksmall, 5, 9, Finished, Available, Finished)

Precision@5: 0.6694
Recall@5: 0.517


In [None]:
# Average Precision for a single user@K: mean of precision@i at each hit position i<=K
def avg_precision_at_k(pdf, K=5):
    pdf = pdf.sort_values("p1", ascending=False).head(K)
    hits = 0
    precisions = []
    for i, (_, row) in enumerate(pdf.iterrows(), start=1):
        if row["reordered"] == 1:
            hits += 1
            precisions.append(hits / i)
    return sum(precisions)/min(K, max(1, int(pdf["reordered"].sum()))) if precisions else 0.0

import pandas as pd
mapk = (pred_test.select("user_id","product_id","reordered","p1")
              .toPandas()
              .groupby("user_id")
              .apply(lambda g: avg_precision_at_k(g, K=K))
              .mean())

print(f"MAP@{K}:", round(float(mapk), 4))

In [None]:
N = 10
w = Window.partitionBy("user_id").orderBy(F.col("p1").desc())
topn = pred_test.withColumn("rank", F.row_number().over(w)) \
                .filter(F.col("rank") <= N) \
                .select("user_id","product_id","p1","rank")

# (Optional) attach product names
account =  # your storage account name
container =  # your container name
prod = spark.read.parquet(f"abfss://{container}@{account}.dfs.core.windows.net/silver/dim_products") \
                 .select("product_id","product_name","aisle_name","department_name")

topn_named = (topn.join(prod, "product_id", "left")
                   .orderBy("user_id","rank"))

(topn_named.repartition(8)
    .write.mode("overwrite")
    .parquet(f"abfss://{container}@{account}.dfs.core.windows.net/gold/topn_recommendations"))

print("✅ gold.topn_recommendations written")
topn_named.show(20, truncate=False)

StatementMeta(sparksmall, 5, 11, Finished, Available, Finished)

✅ gold.topn_recommendations written
+----------+-------+------------------+----+-----------------------------------------------------+----------------------+---------------+
|product_id|user_id|p1                |rank|product_name                                         |aisle_name            |department_name|
+----------+-------+------------------+----+-----------------------------------------------------+----------------------+---------------+
|196       |1      |0.8708614017612107|1   |Soda                                                 |soft drinks           |beverages      |
|196       |1      |0.8708614017612107|2   |Soda                                                 |soft drinks           |beverages      |
|196       |1      |0.8708614017612107|3   |Soda                                                 |soft drinks           |beverages      |
|196       |1      |0.8708614017612107|4   |Soda                                                 |soft drinks           |beverages      