In [0]:
# Bronze input: make sure the demo schema exists, then build the raw sample
# DataFrame, deliberately keeping the bad rows so later steps can clean them.
from pyspark.sql import functions as F

spark.sql("CREATE DATABASE IF NOT EXISTS demo_bde")

# Column names, in the same order as the values inside each tuple below.
cols = [
    "transaction_id",
    "customer_id",
    "amount",
    "merchant_id",
    "transaction_time",
    "state",
]

transactions = [
    ("t1", "c1", 25.50, "m1", "2026-02-10 10:01:00", "NY"),
    ("t2", "c1", 12.00, "m2", "2026-02-10 10:05:00", "NY"),
    ("t3", "c2", 100.00, "m1", "2026-02-10 11:00:00", "CA"),
    # Invalid on purpose: amount is missing.
    ("t4", "c3", None, "m3", "2026-02-10 11:10:00", "TX"),
    # Invalid on purpose: amount is negative.
    ("t5", "c2", -5.00, "m2", "2026-02-10 11:20:00", "CA"),
    # Repeats transaction_id "t2" on purpose.
    ("t2", "c1", 12.00, "m2", "2026-02-10 10:05:00", "NY"),
]

# The tuples carry transaction_time as text; load them first, then replace
# that column with its parsed timestamp value.
raw_strings = spark.createDataFrame(transactions, cols)
parsed_time = F.to_timestamp(F.col("transaction_time"))
df_raw = raw_strings.withColumn("transaction_time", parsed_time)

display(df_raw)

# Store the raw rows as-is in the bronze Delta table (overwriting whatever the
# table held before), then read the table back to show what was saved.
bronze_table = "demo_bde.bronze_transactions"
df_raw.write.format("delta").mode("overwrite").saveAsTable(bronze_table)

display(spark.table(bronze_table))

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Silver step: discard rows whose amount is missing or not strictly positive,
# then keep one row per transaction_id (the greatest transaction_time).
bronze = spark.table("demo_bde.bronze_transactions")

# Data-quality rule: amount must be present and greater than zero.
amount = F.col("amount")
valid_rows = bronze.where(amount.isNotNull() & (amount > 0))

# Number the rows inside each transaction_id, newest transaction_time first.
# NOTE(review): the window has no secondary sort key, so rows that tie on
# transaction_time are ranked in no particular order -- confirm that such
# duplicates are always identical copies.
w = Window.partitionBy("transaction_id").orderBy(F.desc("transaction_time"))

# Rank 1 is the survivor; the helper column is removed again afterwards.
ranked = valid_rows.withColumn("rn", F.row_number().over(w))
silver = ranked.where(F.col("rn") == 1).drop("rn")

display(silver)

# Save the cleaned, de-duplicated rows as the silver Delta table (overwriting
# whatever the table held before), then read it back for inspection.
silver_table = "demo_bde.silver_transactions"
silver.write.format("delta").mode("overwrite").saveAsTable(silver_table)

display(spark.table(silver_table))


from pyspark.sql import functions as F

# Gold step: roll the silver transactions up to one row per customer per
# calendar date, carrying the summed amount and the number of transactions.
silver_tbl = spark.table("demo_bde.silver_transactions")

# Derive the date part of the timestamp to use as the second grouping key.
with_date = silver_tbl.withColumn("txn_date", F.to_date(F.col("transaction_time")))
per_customer_day = with_date.groupBy("customer_id", "txn_date")

# Aggregates computed for every (customer_id, txn_date) group.
total_spend = F.sum("amount").alias("total_spend")
txn_count = F.count("*").alias("txn_count")
gold_daily = per_customer_day.agg(total_spend, txn_count)

display(gold_daily)

# Publish the daily per-customer metrics as the gold Delta table (overwriting
# whatever the table held before), then read it back for inspection.
gold_table = "demo_bde.gold_daily_customer_metrics"
gold_daily.write.format("delta").mode("overwrite").saveAsTable(gold_table)

display(spark.table(gold_table))




transaction_id,customer_id,amount,merchant_id,transaction_time,state
t1,c1,25.5,m1,2026-02-10T10:01:00.000Z,NY
t2,c1,12.0,m2,2026-02-10T10:05:00.000Z,NY
t3,c2,100.0,m1,2026-02-10T11:00:00.000Z,CA
t4,c3,,m3,2026-02-10T11:10:00.000Z,TX
t5,c2,-5.0,m2,2026-02-10T11:20:00.000Z,CA
t2,c1,12.0,m2,2026-02-10T10:05:00.000Z,NY


transaction_id,customer_id,amount,merchant_id,transaction_time,state
t1,c1,25.5,m1,2026-02-10T10:01:00.000Z,NY
t2,c1,12.0,m2,2026-02-10T10:05:00.000Z,NY
t3,c2,100.0,m1,2026-02-10T11:00:00.000Z,CA
t4,c3,,m3,2026-02-10T11:10:00.000Z,TX
t5,c2,-5.0,m2,2026-02-10T11:20:00.000Z,CA
t2,c1,12.0,m2,2026-02-10T10:05:00.000Z,NY


transaction_id,customer_id,amount,merchant_id,transaction_time,state
t1,c1,25.5,m1,2026-02-10T10:01:00.000Z,NY
t2,c1,12.0,m2,2026-02-10T10:05:00.000Z,NY
t3,c2,100.0,m1,2026-02-10T11:00:00.000Z,CA


transaction_id,customer_id,amount,merchant_id,transaction_time,state
t1,c1,25.5,m1,2026-02-10T10:01:00.000Z,NY
t2,c1,12.0,m2,2026-02-10T10:05:00.000Z,NY
t3,c2,100.0,m1,2026-02-10T11:00:00.000Z,CA


customer_id,txn_date,total_spend,txn_count
c1,2026-02-10,37.5,2
c2,2026-02-10,100.0,1


customer_id,txn_date,total_spend,txn_count
c2,2026-02-10,100.0,1
c1,2026-02-10,37.5,2
