# Working on the Streaming Data Transformation - Silver Layer Implementation

In [0]:
from pyspark.sql.functions import (
    col, when, log1p,
    window, count, avg, stddev, approx_count_distinct
)


In [0]:
# Open the bronze Delta table as a streaming source.
bronze_stream_reader = spark.readStream.format("delta")
bronze_df = bronze_stream_reader.table(
    "angad_kumar91.fraud_detection_bronzelayer.stream_bronze_data"
)


In [0]:
from pyspark.sql.functions import col, to_timestamp

# Target Spark SQL type for each bronze column, applied in insertion order.
bronze_column_types = {
    "TransactionID": "long",
    "TransactionDT": "long",
    "TransactionAmt": "double",
    "isFraud": "integer",
    "card1": "integer",
    "card2": "double",
    "card3": "double",
    "card5": "double",
    "addr1": "double",
    "addr2": "double",
    "dist1": "double",
    "dist2": "double",
}

# Overwrite each column in place with its cast version.
for column_name, spark_type in bronze_column_types.items():
    bronze_df = bronze_df.withColumn(column_name, col(column_name).cast(spark_type))

# Parse event_timestamp into a timestamp column; the watermark and the
# 5-minute windows further down are both defined on this column.
bronze_df = bronze_df.withColumn(
    "event_timestamp", to_timestamp(col("event_timestamp"))
)


In [0]:
# Keep only transactions that carry an amount, and (re)assert the amount
# column as a double.
amount_is_present = col("TransactionAmt").isNotNull()
amount_as_double = col("TransactionAmt").cast("double")

clean_df = bronze_df.where(amount_is_present).withColumn(
    "TransactionAmt", amount_as_double
)


In [0]:
# Row-level engineered features derived from the cleaned transactions.

# 1 when the amount is strictly greater than 1000, otherwise 0.
high_value_flag = when(col("TransactionAmt") > 1000, 1).otherwise(0)

# log(1 + amount).
log_amount = log1p(col("TransactionAmt"))

# 1 when addr1 and addr2 differ, otherwise 0.
# NOTE(review): the name suggests "international", but the code only compares
# the two address columns with each other - confirm this is the intended rule.
international_flag = when(col("addr1") != col("addr2"), 1).otherwise(0)

feature_df = clean_df.withColumn("is_high_value_txn", high_value_flag)
feature_df = feature_df.withColumn("log_transaction_amount", log_amount)
feature_df = feature_df.withColumn("is_international_txn", international_flag)


In [0]:
# Declare event_timestamp as the event-time column with a 10 minute
# watermark delay threshold.
base_silver_df = feature_df.withWatermark("event_timestamp", "10 minutes")


In [0]:
# Per-card aggregates over 5 minute windows of event_timestamp.
five_minute_window = window(col("event_timestamp"), "5 minutes")

window_aggregates = [
    count("*").alias("txn_count_5min"),
    avg("TransactionAmt").alias("avg_amount_5min"),
    stddev("TransactionAmt").alias("stddev_amount_5min"),
    approx_count_distinct("ProductCD").alias("product_diversity_5min"),
]

# One output row per (card1, window) pair.
windowed_features_df = base_silver_df.groupBy(
    col("card1"), five_minute_window
).agg(*window_aggregates)


In [0]:
# Attach to every transaction the aggregates of the (card1, 5 minute window)
# bucket that contains its event_timestamp.
#
# windowed_features_df is derived from base_silver_df, so this is a self-join.
# Referring to columns through the DataFrame objects (base_silver_df.card1 vs
# windowed_features_df.card1) is ambiguous in that situation because both
# references share the same lineage; the same applies to dropping
# windowed_features_df.card1 afterwards. Explicit aliases plus qualified
# column names make every reference resolve to exactly one side.
txn_side = base_silver_df.alias("txn")
agg_side = windowed_features_df.alias("agg")

# Same predicate as before: equal card and window.start <= event_timestamp
# < window.end.
same_card = col("txn.card1") == col("agg.card1")
at_or_after_window_start = col("txn.event_timestamp") >= col("agg.window.start")
before_window_end = col("txn.event_timestamp") < col("agg.window.end")

silver_enriched_df = (
    txn_side
        .join(
            agg_side,
            on=[same_card, at_or_after_window_start, before_window_end],
            how="left"
        )
        # Keep every transaction column plus the four aggregates; the
        # aggregate side's card1 and window columns are intentionally left out
        # rather than dropped after the fact.
        .select(
            "txn.*",
            col("agg.txn_count_5min"),
            col("agg.avg_amount_5min"),
            col("agg.stddev_amount_5min"),
            col("agg.product_diversity_5min")
        )
)


In [0]:
# Columns written to the silver table, in output order.
# NOTE(review): is_high_value_txn, log_transaction_amount and
# is_international_txn are computed in feature_df but are not part of this
# projection - confirm that leaving them out is intentional.
silver_output_columns = [
    # transaction identity and raw attributes
    "TransactionID",
    "TransactionDT",
    "TransactionAmt",
    "event_timestamp",
    "card1",
    "ProductCD",
    "DeviceType",
    "DeviceInfo",
    "isFraud",
    # 5 minute per-card aggregates
    "txn_count_5min",
    "avg_amount_5min",
    "stddev_amount_5min",
    "product_diversity_5min",
    # ingestion metadata columns
    "_ingestion_timestamp",
    "_source_file",
]

silver_df_final = silver_enriched_df.select(*silver_output_columns)


In [0]:
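# Optional reset: uncomment to recursively delete the checkpoint directory
# used by the silver streaming write in this notebook.
# dbutils.fs.rm(
#   "/Volumes/angad_kumar91/fraud_detection_raw_data_files/checkpoints/silver/",
#   recurse=True
# )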


In [0]:
# Append the silver features to the Delta table as a streaming query.
#
# toTable() is the documented DataStreamWriter method that starts the query
# and returns a StreamingQuery handle. The handle is kept so the cell can
# block on it: with trigger(availableNow=True) the query processes the input
# that is currently available and then stops, so awaitTermination() returns
# once the write is complete and re-raises any streaming failure here instead
# of letting it pass unnoticed in the background. Without the wait, a
# following cell that reads the table can run before the data is written.
silver_query = (
    silver_df_final.writeStream
        .format("delta")
        .outputMode("append")
        .option(
            "checkpointLocation",
            "/Volumes/angad_kumar91/fraud_detection_raw_data_files/checkpoints/silver/"
        )
        .trigger(availableNow=True)
        .toTable(
            "angad_kumar91.fraud_detection_silverlayer.silver_transactions_features"
        )
)

silver_query.awaitTermination()


In [0]:
# Sanity check on the written table: count rows per distinct
# (txn_count_5min, avg_amount_5min) pair, most frequent first.
feature_distribution_sql = """
SELECT
  txn_count_5min,
  avg_amount_5min,
  COUNT(*) AS records
FROM angad_kumar91.fraud_detection_silverlayer.silver_transactions_features
GROUP BY txn_count_5min, avg_amount_5min
ORDER BY records DESC
"""

feature_distribution_df = spark.sql(feature_distribution_sql)
feature_distribution_df.show()
