In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window


# Reuse (or start) the Spark session for this Gold-layer batch job.
# NOTE(review): the next cell re-runs this whole pipeline (with changes) and
# overwrites the same gold_table_lh — consider keeping only one version.
spark = (
    SparkSession.builder
    .appName("GoldLayerBatch")
    .getOrCreate()
)


# Source: every column of the Silver-layer table.
silver_source_df = spark.sql("SELECT * FROM silver_table_lh")


# Tag transactions whose amount exceeds 1000 with high_amount_flag = 1.
# Rows where the comparison is not true (including a null amount) get 0.
is_high_amount = F.col("amount") > 1000
silver_df = silver_source_df.withColumn(
    "high_amount_flag",
    F.when(is_high_amount, 1).otherwise(0),
)

# Per-user fraud statistics:
#   total_txn  - number of rows for the user
#   fraud_txn  - sum of is_fraud cast to int
#   fraud_rate - fraud_txn / total_txn as a percentage, rounded to 2 decimals
fraud_txn_expr = F.sum(F.col("is_fraud").cast("int"))
fraud_rate_expr = F.round(F.col("fraud_txn") / F.col("total_txn") * 100, 2)

per_user_fraud_df = silver_df.groupBy("user_id").agg(
    F.count("*").alias("total_txn"),
    fraud_txn_expr.alias("fraud_txn"),
)
fraud_stats_df = per_user_fraud_df.withColumn("fraud_rate", fraud_rate_expr)

# Weighted per-user risk score, rounded to 2 decimals:
#   0.5 * fraud_rate + 0.1 * txn_count_1min + 0.1 * avg_amount_deviation
#   + 0.1 * geo_alerts + 0.2 * merchant_risk_avg
# A null in any component makes the whole score null.
risk_score_expr = (
    F.col("fraud_rate") * 0.5
    + F.col("txn_count_1min") * 0.1
    + F.col("avg_amount_deviation") * 0.1
    + F.col("geo_alerts") * 0.1
    + F.col("merchant_risk_avg") * 0.2
)

# Aggregate behavioural signals per user, attach fraud_rate (left join, so
# every aggregated user is kept), then compute the score.
# NOTE(review): txn_count_1min sums txn_count_1min_per_user over all of the
# user's rows — confirm a sum (rather than e.g. a max) of this per-row value
# is what the score should use.
user_risk_df = (
    silver_df
    .groupBy("user_id")
    .agg(
        F.sum(F.col("txn_count_1min_per_user")).alias("txn_count_1min"),
        F.avg(F.col("amount_deviation")).alias("avg_amount_deviation"),
        F.sum(F.col("geo_change_flag")).alias("geo_alerts"),
        F.avg(F.col("merchant_risk_score")).alias("merchant_risk_avg"),
    )
    .join(
        fraud_stats_df.select("user_id", "fraud_rate"),
        on="user_id",
        how="left",
    )
    .withColumn("user_risk_score", F.round(risk_score_expr, 2))
)


# alert_flag = 1 when the user's risk score is above 80, else 0
# (a null score also yields 0).
exceeds_alert_threshold = F.col("user_risk_score") > 80
alerts_user_df = user_risk_df.withColumn(
    "alert_flag", F.when(exceeds_alert_threshold, 1).otherwise(0)
)


# Per-country alert counts: total geo_change_flag and total high_amount_flag.
# silver_df already carries high_amount_flag (added right after the Silver
# load, amount > 1000 -> 1 else 0), so it is reused here instead of being
# recomputed with a duplicate withColumn.
# NOTE(review): alerts_region_df is neither written nor displayed in this cell.
alerts_region_df = (
    silver_df.groupBy("country_code")
    .agg(
        F.sum(F.col("geo_change_flag")).alias("geo_alerts"),
        F.sum(F.col("high_amount_flag")).alias("high_amount_alerts"),
    )
)


# Baseline = unweighted mean of the per-user fraud rates.
# avg() returns null when fraud_stats_df is empty (or every fraud_rate is
# null), so baseline_fraud_rate can be None. `3 * None` would raise a
# TypeError. Building the threshold as a Spark literal instead makes a None
# baseline produce a null threshold, so no user is flagged.
baseline_fraud_rate = fraud_stats_df.agg(F.avg("fraud_rate").alias("baseline_rate")).collect()[0]["baseline_rate"]

# A user is a fraud spike (1) when their fraud_rate exceeds 3x the baseline, else 0.
spike_threshold = F.lit(baseline_fraud_rate).cast("double") * 3

fraud_spike_df = fraud_stats_df.withColumn(
    "fraud_spike_flag",
    F.when(F.col("fraud_rate") > spike_threshold, 1).otherwise(0)
)


# Assemble the Gold table: per-user risk metrics + alert_flag + fraud_spike_flag.
# alerts_user_df already holds every user_risk_df column plus alert_flag, so it
# is used directly rather than joining user_risk_df back to itself. The old
# self-join left alert_flag null for any row whose user_id is null, because
# equi-joins never match null keys.
# NOTE(review): the join below still cannot match a null user_id, so such a
# row (if any) gets a null fraud_spike_flag.
gold_df = alerts_user_df.join(
    fraud_spike_df.select("user_id", "fraud_spike_flag"), on="user_id", how="left"
)

# Replace the table in one Delta overwrite. overwriteSchema allows the column
# set or types to change between runs. This replaces the previous
# `DROP TABLE IF EXISTS`, which removed the existing table before the new write
# and left no Gold table at all if the write failed.
(
    gold_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("gold_table_lh")
)

print("Gold table written successfully: gold_table_lh")


display(spark.sql("SELECT * FROM gold_table_lh LIMIT 10"))


StatementMeta(, fdea56cf-b068-4f58-9a4e-5bf921ef0f43, 4, Finished, Available, Finished, False)

Gold table written successfully: gold_table_lh


SynapseWidget(Synapse.DataFrame, 13507ba8-f83c-4fae-8889-3c8ac55f4d3e)

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window


# Reuse (or start) the Spark session for this Gold-layer batch job.
spark = (
    SparkSession.builder
    .appName("GoldLayerBatch")
    .getOrCreate()
)


# Source: every column of the Silver-layer table.
silver_source_df = spark.sql("SELECT * FROM silver_table_lh")


# Tag transactions whose amount exceeds 1000 with high_amount_flag = 1.
# Rows where the comparison is not true (including a null amount) get 0.
# Later aggregations (e.g. the per-country counts) reuse this column.
is_high_amount = F.col("amount") > 1000
silver_df = silver_source_df.withColumn(
    "high_amount_flag",
    F.when(is_high_amount, 1).otherwise(0),
)


# Per-user fraud statistics:
#   total_txn  - number of rows for the user
#   fraud_txn  - sum of is_fraud cast to int
#   fraud_rate - fraud_txn / total_txn as a percentage, rounded to 2 decimals
fraud_txn_expr = F.sum(F.col("is_fraud").cast("int"))
fraud_rate_expr = F.round(F.col("fraud_txn") / F.col("total_txn") * 100, 2)

per_user_fraud_df = silver_df.groupBy("user_id").agg(
    F.count("*").alias("total_txn"),
    fraud_txn_expr.alias("fraud_txn"),
)
fraud_stats_df = per_user_fraud_df.withColumn("fraud_rate", fraud_rate_expr)


# Weighted per-user risk score, rounded to 2 decimals:
#   0.5 * fraud_rate + 0.1 * txn_count_1min + 0.1 * avg_amount_deviation
#   + 0.1 * geo_alerts + 0.2 * merchant_risk_avg
# A null in any component makes the whole score null. Note that the two
# averages below are rounded *before* they enter this score.
risk_score_expr = (
    F.col("fraud_rate") * 0.5
    + F.col("txn_count_1min") * 0.1
    + F.col("avg_amount_deviation") * 0.1
    + F.col("geo_alerts") * 0.1
    + F.col("merchant_risk_avg") * 0.2
)

# Aggregate behavioural signals per user, attach fraud_rate (left join, so
# every aggregated user is kept), then compute the score.
user_risk_df = (
    silver_df
    .groupBy("user_id")
    .agg(
        F.sum(F.col("txn_count_1min_per_user")).alias("txn_count_1min"),
        # average deviation rounded to 2 decimals
        F.round(F.avg(F.col("amount_deviation")), 2).alias("avg_amount_deviation"),
        F.sum(F.col("geo_change_flag")).alias("geo_alerts"),
        # average merchant risk rounded to 5 decimals
        F.round(F.avg(F.col("merchant_risk_score")), 5).alias("merchant_risk_avg"),
    )
    .join(
        fraud_stats_df.select("user_id", "fraud_rate"),
        on="user_id",
        how="left",
    )
    .withColumn("user_risk_score", F.round(risk_score_expr, 2))
)


# alert_flag = 1 when the user's risk score is above 80, else 0
# (a null score also yields 0).
exceeds_alert_threshold = F.col("user_risk_score") > 80
alerts_user_df = user_risk_df.withColumn(
    "alert_flag", F.when(exceeds_alert_threshold, 1).otherwise(0)
)


# Per-country alert counts: total geo_change_flag and total high_amount_flag
# (the latter was added to silver_df right after the Silver load).
# NOTE(review): alerts_region_df is neither written nor displayed in this cell.
region_alert_aggs = [
    F.sum(F.col("geo_change_flag")).alias("geo_alerts"),
    F.sum(F.col("high_amount_flag")).alias("high_amount_alerts"),
]
alerts_region_df = silver_df.groupBy("country_code").agg(*region_alert_aggs)


# Baseline = unweighted mean of the per-user fraud rates.
# avg() returns null when fraud_stats_df is empty (or every fraud_rate is
# null), so baseline_fraud_rate can be None. `3 * None` would raise a
# TypeError. Building the threshold as a Spark literal instead makes a None
# baseline produce a null threshold, so every user is labelled "normal".
baseline_fraud_rate = fraud_stats_df.agg(F.avg("fraud_rate").alias("baseline_rate")).collect()[0]["baseline_rate"]

# "fraud" when the user's fraud_rate exceeds 3x the baseline, else "normal".
# NOTE(review): this is a text label while alert_flag is 0/1 — confirm the
# mixed encoding is intended for downstream consumers.
spike_threshold = F.lit(baseline_fraud_rate).cast("double") * 3

fraud_spike_df = fraud_stats_df.withColumn(
    "fraud_spike_flag",
    F.when(F.col("fraud_rate") > spike_threshold, "fraud").otherwise("normal")  # text labels
)


# Assemble the Gold table: per-user risk metrics + alert_flag + fraud_spike_flag.
# alerts_user_df already holds every user_risk_df column plus alert_flag, so it
# is used directly rather than joining user_risk_df back to itself. The old
# self-join left alert_flag null for any row whose user_id is null, because
# equi-joins never match null keys.
# NOTE(review): the join below still cannot match a null user_id, so such a
# row (if any) gets a null fraud_spike_flag.
gold_df = alerts_user_df.join(
    fraud_spike_df.select("user_id", "fraud_spike_flag"), on="user_id", how="left"
)

# Replace the table in one Delta overwrite. overwriteSchema allows column
# types to change between runs (fraud_spike_flag is a string here but an
# integer in the previous cell). This replaces the previous
# `DROP TABLE IF EXISTS`, which removed the existing table before the new
# write and left no Gold table at all if the write failed.
(
    gold_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("gold_table_lh")
)

print("Gold table written successfully: gold_table_lh")


display(spark.sql("SELECT * FROM gold_table_lh LIMIT 10"))


StatementMeta(, fdea56cf-b068-4f58-9a4e-5bf921ef0f43, 5, Finished, Available, Finished, False)

Gold table written successfully: gold_table_lh


SynapseWidget(Synapse.DataFrame, e4d24fed-4865-4c05-868b-20e37ee74fa8)