In [0]:
# 03_Gold_Analytics

from pyspark.sql.functions import *

# Load the `silver_transactions` table; every gold aggregate in this cell is
# derived from this DataFrame.
df_silver = spark.read.table("silver_transactions")

# --- TABLE 1: DAILY TREND (Line Chart) ---
# One row per calendar date (Transaction_Date truncated to a date) with:
#   Total_Transactions - count of non-null TransactionID values
#   Fraud_Cases        - sum of the Anomaly column (presumably a 0/1 fraud
#                        flag - confirm against the silver schema)
#   Avg_Txn_Value      - mean Transaction_Amount rounded to 2 decimals
daily_key = to_date("Transaction_Date").alias("Date")
daily_metrics = [
    count("TransactionID").alias("Total_Transactions"),
    sum("Anomaly").alias("Fraud_Cases"),
    round(avg("Transaction_Amount"), 2).alias("Avg_Txn_Value"),
]
df_gold_daily = df_silver.groupBy(daily_key).agg(*daily_metrics).sort("Date")

# Overwrite the Delta table on every run so it always reflects the current
# silver data.
# NOTE(review): the sort above probably does not guarantee row order for
# readers of the table - consumers should sort by Date themselves; confirm.
df_gold_daily.write.saveAsTable("gold_daily_fraud", format="delta", mode="overwrite")

# --- TABLE 2: A/B TEST RESULTS (KPIs) ---
# One row per Experiment_Group: transaction count, summed Anomaly values and
# the ratio of the two expressed as a percentage (4 decimal places).
ab_txn_count = count("TransactionID")
ab_fraud_cases = sum("Anomaly")
ab_fraud_rate = round(ab_fraud_cases / ab_txn_count * 100, 4)

df_gold_ab = df_silver.groupBy("Experiment_Group").agg(
    ab_txn_count.alias("Total_Txns"),
    ab_fraud_cases.alias("Fraud_Cases"),
    ab_fraud_rate.alias("Fraud_Rate_Percent"),
)

# Overwrite the Delta table with the freshly computed KPIs.
df_gold_ab.write.saveAsTable("gold_ab_test_results", format="delta", mode="overwrite")

# --- TABLE 3: GEOSPATIAL RISK (Heatmap Data) ---
# One row per User_City: transaction count, summed Anomaly values and the
# fraud rate as a percentage (2 decimal places).
# NOTE(review): the earlier comment expected 'New York' to show a high fraud
# rate; that depends on the data in silver_transactions, not on this code.
city_txn_count = count("TransactionID")
city_fraud_cases = sum("Anomaly")
city_fraud_rate = round(city_fraud_cases / city_txn_count * 100, 2)

city_stats = df_silver.groupBy("User_City").agg(
    city_txn_count.alias("Total_Txns"),
    city_fraud_cases.alias("Fraud_Cases"),
    city_fraud_rate.alias("Fraud_Rate"),
)

# Keep only cities with more than 50 transactions, so a rate is never
# reported for a city with very few rows.
df_gold_city = city_stats.where(col("Total_Txns") > 50)

# Overwrite the Delta table with the filtered per-city figures.
df_gold_city.write.saveAsTable("gold_fraud_by_city", format="delta", mode="overwrite")

# --- TABLE 4: CATEGORY RISK (Bar Chart Data) ---
# One row per Merchant_Category: summed Anomaly values and the fraud rate as
# a percentage (2 decimal places), sorted with the highest rate first.
# NOTE(review): the earlier comment expected 'Electronics' to show a high
# fraud rate; that depends on the data in silver_transactions, not on this
# code.
category_fraud_cases = sum("Anomaly")
category_fraud_rate = round(category_fraud_cases / count("TransactionID") * 100, 2)

category_stats = df_silver.groupBy("Merchant_Category").agg(
    category_fraud_cases.alias("Fraud_Cases"),
    category_fraud_rate.alias("Fraud_Rate"),
)
df_gold_category = category_stats.sort(col("Fraud_Rate").desc())

# Overwrite the Delta table with the per-category figures.
df_gold_category.write.saveAsTable("gold_fraud_by_category", format="delta", mode="overwrite")

# Status line printed once all four gold tables have been written.
print("✅ Gold Layer Updated! Analytics tables are ready.")

✅ Gold Layer Updated! Analytics tables are ready.
