In [0]:
import uuid
from pyspark.sql.functions import (
    col, max, min, avg, count, lit, when, window,
    max as spark_max  # Import max with an alias
)
from pyspark.sql.window import Window # Import the Window class
from delta.tables import DeltaTable

# --- 1. Unity Catalog locations used by the Gold layer ---
# Fully-qualified table names: the Silver source and the Gold target.
silver_enriched_table_name = "ecommerce_audit.audit_schema.silver_enriched_events"
gold_audit_table_name = "ecommerce_audit.audit_schema.gold_price_audit_metrics"

# Volume root of the project; the streaming checkpoint lives underneath it.
UC_ROOT = "/Volumes/ecommerce_audit/audit_schema/audit_volume/APAF_Capstone_Project"

# Passed to the Gold stream as its checkpointLocation; keep it unchanged
# between runs so the same checkpoint is reused.
dbfs_checkpoint_gold = UC_ROOT + "/_checkpoints/gold_v4"

print("All required paths and variables defined for Gold Layer processing.")

# --- 2. Make sure the Gold Delta table exists ---
# CREATE TABLE IF NOT EXISTS leaves an already-existing table untouched.
# The column list is the shape the MERGE in upsertToDelta writes.
gold_table_ddl = f"""
  CREATE TABLE IF NOT EXISTS {gold_audit_table_name} (
    audit_window_end TIMESTAMP,
    product_id STRING,
    category STRING,
    geo_cluster STRING,
    avg_price DOUBLE,
    price_variance_ratio DOUBLE, -- This will hold the Price Gap Ratio
    audit_risk_flag STRING,
    request_count LONG
  ) USING DELTA
"""
spark.sql(gold_table_ddl)
print(f"Gold Audit Table ensured: {gold_audit_table_name}")


# --- 3. [MODIFIED] Define the MERGE INTO function with ALL Gold Logic ---
def upsertToDelta(microBatchOutputDF, batchId):
    """
    Score one micro-batch of per-cluster price metrics and MERGE it into Gold.

    Intended to be used as a ``foreachBatch`` callback. For every
    (audit_window_end, product_id) group it computes the highest cluster
    average price, derives each cluster's Price Gap Ratio
    ``(global_max - cluster_avg) / global_max``, assigns an audit risk flag,
    and upserts the rows into ``gold_audit_table_name`` keyed on
    (audit_window_end, product_id, geo_cluster).

    Because the stream is started in ``update`` output mode, a micro-batch
    only carries the clusters whose aggregates changed. To keep the group
    maximum correct on incremental runs, clusters of the affected groups that
    are already stored in Gold (and are not part of this batch) are pulled in,
    re-scored against the new maximum, and written back as well.

    Args:
        microBatchOutputDF: Micro-batch DataFrame with the columns
            audit_window_end, product_id, category, geo_cluster,
            cluster_avg_price and request_count.
        batchId: Micro-batch id supplied by Structured Streaming (unused).

    Returns:
        None. The effect is the MERGE into the Gold Delta table.
    """
    # Audit thresholds.
    AUDIT_GAP_THRESHOLD = 0.08   # 8% gap is significant
    MINOR_GAP_THRESHOLD = 0.01   # anything above 1% is still reported as low risk
    HIGH_RISK_MIN_REQUESTS = 10  # minimum sample size for a high-risk verdict
    LOW_RISK_MIN_REQUESTS = 5    # minimum sample size for a low-risk verdict

    group_keys = ["audit_window_end", "product_id"]
    row_keys = group_keys + ["geo_cluster"]

    # Use the session attached to the micro-batch rather than the notebook
    # global `spark`; this is the session the batch DataFrame belongs to.
    session = microBatchOutputDF.sparkSession
    deltaTable = DeltaTable.forName(session, gold_audit_table_name)

    batch_metrics = microBatchOutputDF.select(
        "audit_window_end",
        "product_id",
        "category",
        "geo_cluster",
        "cluster_avg_price",
        "request_count",
    )

    # --- Bring in sibling clusters already stored in Gold ---
    # Rows of the same (window, product) groups that this batch does not
    # replace. Without them the "global" max would only be the max of
    # whichever clusters happened to change in this micro-batch.
    affected_groups = batch_metrics.select(*group_keys).distinct()
    stored_siblings = (
        deltaTable.toDF()
        .join(affected_groups, on=group_keys, how="left_semi")
        .join(batch_metrics.select(*row_keys), on=row_keys, how="left_anti")
        .select(
            "audit_window_end",
            "product_id",
            "category",
            "geo_cluster",
            col("avg_price").alias("cluster_avg_price"),
            "request_count",
        )
    )
    all_clusters = batch_metrics.unionByName(stored_siblings)

    # --- Second window: highest cluster average per (window, product) ---
    global_window = Window.partitionBy(*group_keys)
    df_with_global_max = all_clusters.withColumn(
        "global_max_price",
        spark_max(col("cluster_avg_price")).over(global_window),
    )

    # --- Final audit logic (Price Gap Ratio) ---
    # Guard the division: a zero max yields NULL instead of raising when ANSI
    # SQL mode is enabled. A NULL ratio fails both comparisons below and is
    # therefore reported as AUDIT_CLEARED.
    price_gap_ratio = when(
        col("global_max_price") != 0,
        (col("global_max_price") - col("cluster_avg_price")) / col("global_max_price"),
    )

    gold_df_final = df_with_global_max.withColumn(
        "price_gap_ratio", price_gap_ratio
    ).withColumn(
        "audit_risk_flag",
        when(
            (col("price_gap_ratio") > AUDIT_GAP_THRESHOLD)
            & (col("request_count") >= HIGH_RISK_MIN_REQUESTS),
            lit("HIGH_RISK_AUDIT_DETECTED"),  # This group was charged LESS
        ).when(
            (col("price_gap_ratio") > MINOR_GAP_THRESHOLD)
            & (col("request_count") >= LOW_RISK_MIN_REQUESTS),
            lit("LOW_RISK_CLEARED"),  # Minor gap (or large gap on a small sample)
        ).otherwise(
            lit("AUDIT_CLEARED")  # This group was charged the MAX price (gap is 0)
        ),
    ).select(
        "audit_window_end",
        "product_id",
        "category",
        "geo_cluster",
        col("cluster_avg_price").alias("avg_price"),
        col("price_gap_ratio").alias("price_variance_ratio"),  # Aliasing to match table schema
        "audit_risk_flag",
        "request_count",
    )

    # --- Merge ---
    # NOTE(review): `category` is not part of the merge key. This assumes a
    # product maps to a single category; two categories for the same
    # (window, product, cluster) would produce duplicate source keys - confirm.
    merge_condition = (
        "target.audit_window_end = source.audit_window_end AND "
        "target.product_id = source.product_id AND "
        "target.geo_cluster = source.geo_cluster"
    )

    (
        deltaTable.alias("target")
        .merge(gold_df_final.alias("source"), merge_condition)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

# --- 4. Streaming read and first-stage (per-cluster) aggregation ---
# Read the Silver table as a stream; the watermark on event_time is 1 hour.
silver_stream_df = spark.readStream.table(silver_enriched_table_name).withWatermark(
    "event_time", "1 hour"
)

# One-hour windows on event_time; the resulting struct column is named "window".
hourly_window = window(col("event_time"), "1 hour")

# Average price and request count per (window, product, category, geo cluster).
cluster_metrics_df = (
    silver_stream_df
    .groupBy(hourly_window, col("product_id"), col("category"), col("geo_cluster"))
    .agg(
        avg("request_price").alias("cluster_avg_price"),
        count("request_id").alias("request_count"),
    )
    .select(
        col("window.end").alias("audit_window_end"),
        "product_id",
        "category",
        "geo_cluster",
        "cluster_avg_price",
        "request_count",
    )
)

# --- 5. Hand each micro-batch to upsertToDelta via foreachBatch ---
# Only the intermediate per-cluster metrics are streamed; the second-stage
# window, risk flagging and MERGE all happen inside upsertToDelta.
# NOTE(review): trigger(once=True) is reportedly deprecated in newer Spark
# releases in favour of availableNow=True. That switch can change how the
# backlog is split into micro-batches, so confirm the batch function is
# correct across multiple batches before changing it.
gold_stream_writer = (
    cluster_metrics_df.writeStream
    .queryName("Gold_Audit_Stream_v4_Corrected")
    .outputMode("update")
    .option("checkpointLocation", dbfs_checkpoint_gold)
    .trigger(once=True)
    .foreachBatch(upsertToDelta)
)
gold_query = gold_stream_writer.start()

# Block until the one-shot run has finished.
gold_query.awaitTermination()

print(f"✅ Step 7: Corrected Gold Audit Stream finished. Auditing metrics are in: {gold_audit_table_name}")

# --- 8. Final verification: show the high-risk rows, largest gap first ---
print("Running final verification query...")
final_audit_df = spark.table(gold_audit_table_name)

print("Final Audit Metrics: CORRECTED Price Gap Ratio Check")
print("Look for HIGH_RISK records with a ratio of ~0.09-0.10")

verification_columns = [
    "product_id",
    "geo_cluster",
    "avg_price",
    "price_variance_ratio",  # holds the Price Gap Ratio
    "audit_risk_flag",
    "request_count",
]
high_risk_rows = (
    final_audit_df
    .select(*verification_columns)
    .where(col("audit_risk_flag") == "HIGH_RISK_AUDIT_DETECTED")
    .orderBy("price_variance_ratio", ascending=False)
)
high_risk_rows.display()

All required paths and variables defined for Gold Layer processing.
Gold Audit Table ensured: ecommerce_audit.audit_schema.gold_price_audit_metrics
✅ Step 7: Corrected Gold Audit Stream finished. Auditing metrics are in: ecommerce_audit.audit_schema.gold_price_audit_metrics
Running final verification query...
Final Audit Metrics: CORRECTED Price Gap Ratio Check
Look for HIGH_RISK records with a ratio of ~0.09-0.10


product_id,geo_cluster,avg_price,price_variance_ratio,audit_risk_flag,request_count
PROD_0012,Standard_Region_B,195.76615384615383,0.6214592121468135,HIGH_RISK_AUDIT_DETECTED,13
PROD_0009,Standard_Region_B,198.7103571428572,0.4259930690468012,HIGH_RISK_AUDIT_DETECTED,28
PROD_0003,Standard_Region_C,223.92,0.4028895537671397,HIGH_RISK_AUDIT_DETECTED,14
PROD_0019,Standard_Region_B,239.057,0.3484658530336808,HIGH_RISK_AUDIT_DETECTED,10
PROD_0002,Standard_Region_B,250.7207692307693,0.3457694616006856,HIGH_RISK_AUDIT_DETECTED,13
PROD_0010,Standard_Region_B,222.11333333333332,0.3311160112293758,HIGH_RISK_AUDIT_DETECTED,27
PROD_0014,Standard_Region_B,232.19416666666663,0.3225256636565666,HIGH_RISK_AUDIT_DETECTED,24
PROD_0013,Standard_Region_B,237.25761904761904,0.3214882723955471,HIGH_RISK_AUDIT_DETECTED,21
PROD_0018,Standard_Region_B,229.51391304347828,0.317591746254232,HIGH_RISK_AUDIT_DETECTED,23
PROD_0011,Standard_Region_C,208.08529411764707,0.3095139946778147,HIGH_RISK_AUDIT_DETECTED,17
