In [2]:
from pyspark.sql import functions as F

# Storage locations: the curated (Silver) input and the root of the Gold output area.
SILVER_PATH = "s3://ibmrawaml/Spark_curated_day/"
GOLD_ROOT = "s3://ibmrawaml/gold/"

# Session-wide setting consulted by partitioned overwrites; must be set before any write.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

# Load Silver and mark it for caching in one step (cache() returns the same DataFrame).
# Caching is optional: keep it only if the data fits in cluster storage and the
# speedup for the repeated aggregations over `silver` is wanted.
silver = spark.read.parquet(SILVER_PATH).cache()

# count() is an action, so it also materialises the cache as a side effect.
silver_row_count = silver.count()
print("Rows in Silver:", silver_row_count)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Rows in Silver: 431109182

In [6]:
def write_gold(df, subfolder, coalesce_parts=8):
    """
    Write a DataFrame to the Gold S3 location as Snappy-compressed Parquet,
    partitioned by yr / mo / day.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Frame to persist. It must contain the ``yr``, ``mo`` and ``day``
        columns, which become the directory partitions.
    subfolder : str
        Location under ``GOLD_ROOT`` (for example ``"daily/"``). Leading
        slashes and a missing/extra trailing slash on ``GOLD_ROOT`` are
        tolerated.
    coalesce_parts : int, default 8
        Number of partitions (write tasks) the frame is redistributed into
        before writing. The name is kept for backward compatibility.

    Notes
    -----
    The frame is redistributed with ``repartition`` keyed on the partition
    columns rather than ``coalesce``. ``coalesce`` adds no shuffle, so it
    would also shrink the final stage of whatever computation produced
    ``df`` to ``coalesce_parts`` tasks, and every task could emit a file into
    every date directory. Hash-partitioning on yr / mo / day keeps the
    upstream computation at full parallelism and sends all rows of one date
    to a single write task.

    The write uses ``mode("overwrite")``; whether that replaces the whole
    target or only the partitions being written is governed by the session's
    ``spark.sql.sources.partitionOverwriteMode`` setting.
    """
    partition_cols = ("yr", "mo", "day")

    # Join root and subfolder with exactly one "/" regardless of how each is spelled.
    target_path = f"{GOLD_ROOT.rstrip('/')}/{subfolder.lstrip('/')}"

    (df.repartition(coalesce_parts, *partition_cols)
       .write.mode("overwrite")
       .option("compression", "snappy")
       .partitionBy(*partition_cols)
       .parquet(target_path))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Per-day summary of Silver: row count, amount totals/averages, and how many
# rows are "big" (amount_received >= 10000) or carry the is_laundering label.

# Boolean comparison cast to int so that summing it counts the matching rows.
is_big_tx = F.expr("amount_received >= 10000").cast("int")

daily_totals = silver.groupBy("yr", "mo", "day").agg(
    F.count("*").alias("txn_ct"),
    F.sum("amount_received").alias("total_amt"),
    F.avg("amount_received").alias("avg_amt"),
    F.sum(is_big_tx).alias("big_tx_ct"),
    F.sum("is_laundering").alias("label_ct"),
)

# Express both counts as a percentage of the day's rows, rounded to 2 decimals.
daily = daily_totals.withColumn(
    "pct_big_tx", F.round(100.0 * F.col("big_tx_ct") / F.col("txn_ct"), 2)
).withColumn(
    "pct_label", F.round(100.0 * F.col("label_ct") / F.col("txn_ct"), 2)
)

# Quick visual sanity check; row order is not defined because nothing sorts the frame.
daily.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+---+---+-------+--------------------+-----------------+---------+--------+----------+---------+
|  yr| mo|day| txn_ct|           total_amt|          avg_amt|big_tx_ct|label_ct|pct_big_tx|pct_label|
+----+---+---+-------+--------------------+-----------------+---------+--------+----------+---------+
|2022| 10| 21|5189803|2.861449830057276E13|5513600.092445274|  1332898|    5467|     25.68|     0.11|
|2022|  9|  6|8788142|4.821919515143186E13|5486847.521516136|  2167159|   10722|     24.66|     0.12|
|2022|  9|  8|8789382|4.621526353219144E13|5258078.842425034|  2167344|   11035|     24.66|     0.13|
|2022| 10| 14|5977894|5.204302625211076...| 8705913.19486608|  1763749|    5442|      29.5|     0.09|
|2022|  9| 23|5193724|3.615389379937657E13| 6961073.36457936|  1336339|    5285|     25.73|      0.1|
+----+---+---+-------+--------------------+-----------------+---------+--------+----------+---------+
only showing top 5 rows

In [7]:
# Persist the per-day summary under the "daily/" subfolder of the Gold root.
write_gold(df=daily, subfolder="daily/")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Per-day, per-sending-account profile: number of distinct to_account values
# plus the sum and maximum of amount_received.
fanout_keys = ["yr", "mo", "day", "from_account"]
fanout_metrics = [
    F.countDistinct("to_account").alias("fanout_deg"),
    F.sum("amount_received").alias("sum_amt"),
    F.max("amount_received").alias("max_amt"),
]

fanout = silver.groupBy(*fanout_keys).agg(*fanout_metrics)

write_gold(fanout, "fanout/")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# Per-day, per-currency-pair summary of rows whose receiving and payment
# currencies differ.
#
# NOTE(review): because the filter keeps only rows with two different
# currencies, amount_received and amount_paid are presumably denominated in
# different units; the raw difference in spread_abs (and spread_pct derived
# from it) may not be a meaningful spread without converting to a common
# currency first — confirm with the data owner.
fx = (silver
    # Rows where either currency is NULL are dropped too: NULL != x is not true.
    .filter(F.col("receiving_currency") != F.col("payment_currency"))
    .withColumn("fx_pair", F.concat_ws("→", "receiving_currency", "payment_currency"))
    .groupBy("yr", "mo", "day", "fx_pair")
    .agg(
        F.count("*").alias("tx_ct"),
        F.sum("amount_received").alias("amt_recv"),
        F.sum("amount_paid").alias("amt_paid"),
        F.sum(F.col("amount_received") - F.col("amount_paid")).alias("spread_abs")
    )
    # Guard the division explicitly: a group whose paid total is 0 (or NULL)
    # gets a NULL percentage instead of relying on legacy divide-by-zero
    # semantics, which raise an error when ANSI mode is enabled.
    .withColumn("spread_pct",
        F.when(F.col("amt_paid") != 0,
               F.round(100.0 * F.col("spread_abs") / F.col("amt_paid"), 4)))
)

write_gold(fx, "fx/")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Per-day, per-hour transaction count and total amount_received.
# NOTE(review): F.hour() is presumably evaluated in the Spark session time
# zone — confirm that matches the zone the `timestamp` column was recorded in.
hour_of_day = F.hour("timestamp")

silver_with_hour = silver.withColumn("hr", hour_of_day)

hourly = silver_with_hour.groupBy("yr", "mo", "day", "hr").agg(
    F.count("*").alias("tx_ct"),
    F.sum("amount_received").alias("amt"),
)

write_gold(hourly, "hourly/")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Per-day transaction count and total amount_received for each
# from_bank → to_bank pair ("corridor").
# NOTE(review): concat_ws skips NULL inputs, so a row with a missing bank
# would yield a label holding only the other bank — verify neither column
# can be NULL in Silver.
corridor_label = F.concat_ws("→", "from_bank", "to_bank")

silver_with_corridor = silver.withColumn("corridor", corridor_label)

corridors = silver_with_corridor.groupBy("yr", "mo", "day", "corridor").agg(
    F.count("*").alias("tx_ct"),
    F.sum("amount_received").alias("amt"),
)

write_gold(corridors, "corridors/")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…