# Gold Layer – Repeat Purchase Features

## Purpose
This notebook creates customer-level features and labels for predicting
repeat purchase behavior in an e-commerce setting.

## Input
- Silver table:
  workspace.repeat_purchase.silver_customer_transactions

## Business Definition
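- A repeat purchase is defined as a customer placing another order
  within 30 days of a previous order.
- A customer is labeled as a repeat purchaser (label = 1) if at least one
  of their orders is followed by another order within 30 days.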

## Output
- Gold table:
  workspace.repeat_purchase.gold_customer_features

## Notes
- One row represents one customer
- Output is suitable for machine learning and analytics


In [0]:
# Load the Silver-layer transactions table that feeds every later cell.

silver_table_name = "workspace.repeat_purchase.silver_customer_transactions"
df_silver = spark.table(silver_table_name)


In [0]:
# Per-customer window, ordered chronologically by invoice timestamp.
# Used below to look ahead to each customer's following order.

from pyspark.sql.window import Window

customer_window = Window.partitionBy("CustomerID").orderBy("invoice_ts")


In [0]:
# Attach, to every order row, the timestamp of the same customer's next order.
# The last order in each customer's window has no successor, so it gets null.

from pyspark.sql.functions import lead

next_order_ts_expr = lead("invoice_ts").over(customer_window)
df_with_next_order = df_silver.withColumn("next_order_ts", next_order_ts_expr)


In [0]:
# Day gap from each order to the customer's next order
# (null when there is no next order).

from pyspark.sql.functions import datediff, col

days_to_next_order_expr = datediff(col("next_order_ts"), col("invoice_ts"))
df_with_gap = df_with_next_order.withColumn(
    "days_to_next_order", days_to_next_order_expr
)


In [0]:
# Order-level repeat purchase flag:
#   1 -> the customer's next order came within 30 days of this one
#   0 -> the next order came later, or there is no next order (null gap)

from pyspark.sql.functions import when

next_order_within_30_days = col("days_to_next_order") <= 30
repeat_flag_expr = when(next_order_within_30_days, 1).otherwise(0)

df_labeled_orders = df_with_gap.withColumn(
    "repeat_within_30_days", repeat_flag_expr
)


In [0]:
# Calculate first and last purchase timestamps for each customer.
#
# Spark's min/max are accessed through the `F` namespace instead of being
# imported by bare name, so this cell does not shadow Python's builtin
# `min` and `max` for the rest of the notebook.

from pyspark.sql import functions as F

df_customer_dates = (
    df_labeled_orders
    .groupBy("CustomerID")
    .agg(
        F.min("invoice_ts").alias("first_order_date"),
        F.max("invoice_ts").alias("last_order_date"),
    )
)


In [0]:
# Customer active span: days between the first and the last order.

from pyspark.sql.functions import datediff

active_days_expr = datediff("last_order_date", "first_order_date")
df_customer_dates = df_customer_dates.withColumn("active_days", active_days_expr)


In [0]:
# Mean number of days between consecutive orders, per customer.

from pyspark.sql.functions import avg

avg_gap_expr = avg("days_to_next_order").alias("avg_days_between_orders")
df_avg_gap = df_labeled_orders.groupBy("CustomerID").agg(avg_gap_expr)


In [0]:
# Combine all customer-level features into the final Gold dataset
# (one row per CustomerID).
#
# Spark functions are accessed through the `F` namespace instead of being
# imported by bare name, so this cell does not shadow Python's builtin `max`
# and does not depend on `col` having been imported by an earlier cell.

from pyspark.sql import functions as F

df_customer_features = (
    df_labeled_orders
    .groupBy("CustomerID")
    .agg(
        # Customer-level label: 1 if any order was followed by another
        # order within 30 days, else 0.
        F.max("repeat_within_30_days").alias("repeat_purchase_label"),
        # NOTE(review): counts rows with a non-null InvoiceNo; this equals the
        # number of orders only if Silver has one row per order - confirm.
        F.count("InvoiceNo").alias("total_orders"),
        F.sum("order_value").alias("total_spent"),
    )
    # Left joins keep every customer from the aggregation above.
    .join(df_customer_dates, on="CustomerID", how="left")
    .join(df_avg_gap, on="CustomerID", how="left")
    .withColumn(
        "avg_order_value",
        F.col("total_spent") / F.col("total_orders"),
    )
)



In [0]:
from pyspark.sql import functions as F

# Round numeric features to 2 decimal places for readability and consistency.
# `F.round` is used instead of importing `round` by bare name, which would
# shadow Python's builtin `round` for the rest of the notebook.

df_customer_features = (
    df_customer_features
    .withColumn("avg_days_between_orders", F.round("avg_days_between_orders", 2))
    .withColumn("avg_order_value", F.round("avg_order_value", 2))
)


In [0]:
# Persist the customer-level features as the Gold Delta table,
# replacing any existing contents on each run.

gold_writer = (
    df_customer_features.write
    .format("delta")
    .mode("overwrite")
    .option("mergeSchema", "true")
)
gold_writer.saveAsTable("workspace.repeat_purchase.gold_customer_features")



In [0]:
# Sanity check: how many customers fall into each repeat_purchase_label class
# in the Gold table that was just written.

label_distribution = spark.sql("""
    SELECT
        repeat_purchase_label,
        COUNT(*) AS customer_count
    FROM workspace.repeat_purchase.gold_customer_features
    GROUP BY repeat_purchase_label
    """)
display(label_distribution)


repeat_purchase_label,customer_count
0,2618
1,1721
