In [0]:
from pyspark.sql import functions as F

# Storage locations: the Delta table read as input, the Delta table written
# as output, and the folder that receives the CSV export.
silver_path = "s3://customer-seg-project/silver_delta/"
gold_path = "s3://customer-seg-project/gold_delta/"
gold_csv_path = "s3://customer-seg-project/exports/gold_csv/"

# `spark` is not defined in this cell; presumably it is the SparkSession
# injected by the notebook runtime.
df_silver = spark.read.load(silver_path, format="delta")

# Columns that define one output row (one row per distinct combination).
_segment_keys = ("customer_type", "customer_age_group", "city", "merchant_cat")

# Per-group aggregates. The alias names become the output column names.
_segment_metrics = (
    F.sum("amount").alias("total_spent"),
    F.count("*").alias("total_transactions"),
    F.countDistinct("channel").alias("unique_channels"),
    # Presumably `label_fraud` is a 0/1 flag, making this a fraud count -
    # TODO confirm against the silver schema.
    F.sum("label_fraud").alias("fraud_transactions"),
)

# Ratios derived from the aggregated columns; both divide by the group's
# row count.
_avg_value = F.col("total_spent") / F.col("total_transactions")
_fraud_share = F.col("fraud_transactions") / F.col("total_transactions")

df_gold = df_silver.groupBy(*_segment_keys).agg(*_segment_metrics)
df_gold = df_gold.withColumn("avg_transaction_value", F.round(_avg_value, 2))
df_gold = df_gold.withColumn("fraud_rate", F.round(_fraud_share, 4))

# NOTE(review): df_gold is not persisted, so each of the three actions below
# (Delta write, CSV write, preview) evaluates the aggregation again.

# Overwrite the gold Delta table, with the mergeSchema option enabled.
df_gold.write.save(gold_path, format="delta", mode="overwrite", mergeSchema="true")

# Overwrite the CSV export, including a header row.
df_gold.write.csv(gold_csv_path, mode="overwrite", header="true")

# Preview up to ten rows; `display` is expected to come from the notebook
# runtime (it is not defined in this cell).
display(df_gold.limit(10))


customer_type,customer_age_group,city,merchant_cat,total_spent,total_transactions,unique_channels,fraud_transactions,avg_transaction_value,fraud_rate
Occasional,36-50,Pune,Electronics,7088.332805285103,344,4,11,20.61,0.032
Premium,36-50,Chennai,Pharmacy,14309.144757095182,471,4,11,30.38,0.0234
Regular,51+,Mumbai,Entertainment,11668.443589831237,606,4,13,19.25,0.0215
Premium,18-25,Bengaluru,Pharmacy,9860.33951918882,486,4,14,20.29,0.0288
Premium,26-35,Bengaluru,Grocery,19461.512148692917,985,4,30,19.76,0.0305
Regular,51+,Pune,Fuel,4431.864992132613,212,4,4,20.91,0.0189
Regular,36-50,Pune,BillPay,12948.750004905549,573,4,7,22.6,0.0122
Occasional,36-50,Delhi,Dining,16022.32689561621,687,4,13,23.32,0.0189
Occasional,26-35,Delhi,BillPay,19552.67214154624,939,4,20,20.82,0.0213
Regular,51+,Kolkata,Fashion,5133.316806778887,182,4,3,28.21,0.0165
