In [0]:
bronze_df = spark.table("ecommerce_ai.bronze_online_retail")
bronze_df.display()


In [0]:
from pyspark.sql.functions import col, to_timestamp

# Keep only completed sales lines that can be attributed to a known customer.
filtered_txn_df = (
    bronze_df
    .filter(~col("InvoiceNo").startswith("C"))   # Remove cancellations (invoice numbers prefixed with "C")
    .filter(col("Quantity") > 0)
    .filter(col("UnitPrice") > 0)
    .filter(col("CustomerID").isNotNull())
)

# Normalise InvoiceDate to a timestamp.
# NOTE(review): no explicit format is given. If bronze stores InvoiceDate as a string in a
# non-ISO layout, pass a pattern, e.g. to_timestamp("InvoiceDate", "M/d/yyyy H:mm") — confirm
# against the bronze schema.
silver_txn_df = filtered_txn_df.withColumn("InvoiceDate", to_timestamp("InvoiceDate"))

# Invariant: parsing must not turn a present InvoiceDate into NULL. With ANSI mode off,
# to_timestamp returns NULL for unparseable input. max("InvoiceDate") downstream would then
# silently skip those rows and produce wrong last-purchase dates.
unparsed_row = (
    filtered_txn_df
    .filter(col("InvoiceDate").isNotNull() & to_timestamp("InvoiceDate").isNull())
    .first()
)
assert unparsed_row is None, (
    f"InvoiceDate value could not be parsed, e.g. {unparsed_row['InvoiceDate']!r}; "
    "pass an explicit format to to_timestamp"
)

silver_txn_df.display()


In [0]:
# Module alias instead of `from ... import max, sum`: importing those names directly would
# shadow Python's built-in max()/sum() for every later cell in the notebook.
import pyspark.sql.functions as F

# Reference date for recency = the latest transaction date in the data.
# Using current_date() tied recency_days (and the churn label derived from it) to the day
# the notebook happened to run, so results were not reproducible, and for a historical
# extract every customer's recency kept growing with each run.
# Collected as a DATE (not a timestamp) so the driver round-trip is timezone-neutral.
reference_date = silver_txn_df.agg(F.to_date(F.max("InvoiceDate"))).first()[0]

# One row per customer with recency / frequency / monetary features.
customer_features_df = (
    silver_txn_df
    .groupBy("CustomerID")
    .agg(
        F.max("InvoiceDate").alias("last_purchase_date"),
        F.countDistinct("InvoiceNo").alias("purchase_frequency"),   # distinct invoices = orders
        F.sum(F.col("Quantity") * F.col("UnitPrice")).alias("total_spend"),
    )
    .withColumn(
        "recency_days",
        # Days between the customer's last purchase and the dataset's latest purchase.
        F.datediff(F.lit(reference_date), F.col("last_purchase_date")),
    )
    .withColumn(
        "avg_order_value",
        # purchase_frequency >= 1 here: rows with NULL InvoiceNo are dropped by the
        # startswith("C") filter upstream, so every group has at least one invoice.
        F.col("total_spend") / F.col("purchase_frequency"),
    )
)

customer_features_df.display()


In [0]:
from pyspark.sql.functions import col, when

# Customers whose last purchase is more than this many days old are labelled churned (1).
CHURN_INACTIVITY_DAYS = 90

# Add the binary churn target. withColumn replaces an existing churn_label, so re-running
# this cell is safe. Rows with NULL recency_days fall through to otherwise(0).
# NOTE(review): churn_label is a pure function of recency_days (itself derived from
# last_purchase_date). If this table is used to train a churn model, exclude those columns
# from the features to avoid target leakage.
customer_features_df = (
    customer_features_df
    .withColumn(
        "churn_label",
        when(col("recency_days") > CHURN_INACTIVITY_DAYS, 1).otherwise(0),
    )
)


In [0]:
# Persist the customer feature table as Delta, replacing whatever the table held before.
customer_features_df.write.saveAsTable(
    "ecommerce_ai.silver_customer_features",
    format="delta",
    mode="overwrite",
)


In [0]:
%sql
-- Class balance of the churn label in the persisted feature table.
-- ORDER BY keeps the row order stable; GROUP BY alone leaves it unspecified.
SELECT
    churn_label,
    COUNT(*) AS customer_count
FROM ecommerce_ai.silver_customer_features
GROUP BY churn_label
ORDER BY churn_label;


In [0]:
%sql
-- Inspect the column names and data types of the persisted feature table.
DESCRIBE ecommerce_ai.silver_customer_features;
