In [0]:
from pyspark.sql.functions import col, from_json, expr
from pyspark.sql.types import StructType

# Open the landing-zone financial ledger as a Delta streaming source, then
# narrow it to rows whose TRANSACTION_TYPE_CODE equals '001'.
landing_ledger_stream = spark.readStream.format("delta").load(
    "/Volumes/bank_cbs/00_landing/financial_ledger"
)
bronze_ledger_df = landing_ledger_stream.filter("TRANSACTION_TYPE_CODE = '001'")

In [0]:
# Split the ledger stream by entry type. account_id is renamed per side so the
# two sides stay distinguishable after the join: DR (customer) rows expose it
# as customer_account_id, CR (ATM) rows as atm_account_id.
# Note: dr_df, cr_df and txn_df are all re-assigned by the watermarked
# stream-stream join cell further down this notebook.
dr_df = bronze_ledger_df.where("entry_type = 'DR'").withColumnRenamed(
    "account_id", "customer_account_id"
)
cr_df = bronze_ledger_df.where("entry_type = 'CR'").withColumnRenamed(
    "account_id", "atm_account_id"
)

# Columns kept for each matched DR/CR pair; everything except the ATM account
# is taken from the DR side.
paired_columns = [
    col("dr.transaction_id"),
    col("dr.transaction_timestamp"),
    col("dr.customer_account_id"),
    col("cr.atm_account_id"),
    col("dr.amount").alias("withdrawal_amount"),
    col("dr.transaction_type_code").alias("type_code"),
    col("dr.channel_code"),
]

# Pair DR and CR rows that share a transaction_id.
txn_df = (
    dr_df.alias("dr")
    .join(cr_df.alias("cr"), on="transaction_id")
    .select(*paired_columns)
)



In [0]:
# Stream-stream join of DR and CR ledger entries, with a watermark on each side

from pyspark.sql.functions import col, expr

def _watermarked_entries(entry_filter, account_alias):
    """Return one side of the ledger prepared for the stream-stream join.

    Filters ``bronze_ledger_df`` with ``entry_filter``, renames ``account_id``
    to ``account_alias`` and declares a 10-minute watermark on
    ``transaction_timestamp``.
    """
    side = bronze_ledger_df.filter(entry_filter)
    side = side.withColumnRenamed("account_id", account_alias)
    return side.withWatermark("transaction_timestamp", "10 minutes")


dr_df = _watermarked_entries("entry_type = 'DR'", "customer_account_id")
cr_df = _watermarked_entries("entry_type = 'CR'", "atm_account_id")

# Match DR and CR rows on transaction_id, additionally requiring the DR
# timestamp to fall within +/- 10 minutes of the CR timestamp.
matched_pairs = dr_df.alias("dr").join(
    cr_df.alias("cr"),
    expr("""
            dr.transaction_id = cr.transaction_id AND
            dr.transaction_timestamp BETWEEN cr.transaction_timestamp - interval 10 minutes AND cr.transaction_timestamp + interval 10 minutes
        """)
)

# Flatten each matched pair into one withdrawal record; only the ATM account
# comes from the CR side.
txn_df = matched_pairs.select(
    col("dr.transaction_id"),
    col("dr.transaction_timestamp"),
    col("dr.customer_account_id"),
    col("cr.atm_account_id"),
    col("dr.amount").alias("withdrawal_amount"),
    col("dr.transaction_type_code").alias("type_code"),
    col("dr.channel_code"),
)

In [0]:
from pyspark.sql.functions import broadcast

# Static (batch, non-streaming) reference tables from the bronze schema.
def _read_static_table(table_name):
    """Batch-read the table called ``table_name`` through the Delta reader."""
    return spark.read.format("delta").table(table_name)


accounts_df = _read_static_table("bank_cbs.bronze.accounts")
atms_df = _read_static_table("bank_cbs.bronze.atms")
branches_df = _read_static_table("bank_cbs.bronze.branches")
ref_transaction_types_df = _read_static_table("bank_cbs.bronze.ref_transaction_types")
ref_channels_df = _read_static_table("bank_cbs.bronze.ref_channels")

In [0]:
# Enrich the withdrawal stream with reference data. Every lookup is a left
# join against a broadcast-hinted reference frame, so a withdrawal is kept
# even when a lookup finds no match.

# 1. ATM: the CR-side account is matched to the ATM's cash GL account.
atm_match = txn_df["atm_account_id"] == atms_df["cash_gl_account_id"]
with_atm = txn_df.join(broadcast(atms_df), atm_match, how="left")

# 2. Branch: joined on branch_id. txn_df itself has no such column, so it is
#    expected to arrive via the ATM join above.
with_branch = with_atm.join(broadcast(branches_df), on="branch_id", how="left")

# 3. Transaction type and 4. channel.
with_type = with_branch.join(
    broadcast(ref_transaction_types_df), on="type_code", how="left"
)
txn_joined_df = with_type.join(
    broadcast(ref_channels_df), on="channel_code", how="left"
)

In [0]:
# Output columns of the ATM withdrawal mart, in write order.
# "city" is currently not selected.
mart_columns = [
    "transaction_id",
    "transaction_timestamp",
    "customer_account_id",
    "atm_account_id",
    "atm_id",
    "location_name",
    "branch_id",
    "branch_name",
    "type_code",
    "TYPE_NAME",
    "channel_code",
    "channel_name",
    "withdrawal_amount",
]

# Project the enriched stream down to the mart's columns.
final_df = txn_joined_df.select(*mart_columns)

# Append the mart to the silver Delta path as a one-shot streaming write,
# checkpointing under the silver volume.
# NOTE(review): trigger(once=True) is marked deprecated in recent Spark
# releases, with availableNow=True as the suggested successor. That changes
# how the backlog is split into micro-batches, so confirm the interaction
# with the join watermarks before switching.
query = (
    final_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/Volumes/bank_cbs/silver/atm_checkpoint_volume/atm_withdrawal_mart_ckpt")  # checkpoint on the managed volume
    .trigger(once=True)
    .start("/Volumes/bank_cbs/silver/atm_withdrawal_mart")  # silver Delta output path
)

# start() returns as soon as the query is launched and the write continues in
# the background. Block until the one-shot run has finished so that later
# cells reading the output path see the completed write, and so that a failed
# stream raises here instead of going unnoticed.
query.awaitTermination()

In [0]:
# Batch-read the silver Delta output written by the streaming query and
# preview its first ten rows.
mart_reader = spark.read.format("delta")
df_atm_mart = mart_reader.load("/Volumes/bank_cbs/silver/atm_withdrawal_mart")
display(df_atm_mart.limit(10))

transaction_id,transaction_timestamp,customer_account_id,atm_account_id,atm_id,location_name,branch_id,branch_name,type_code,TYPE_NAME,channel_code,channel_name,withdrawal_amount
935e4e3c-f5c4-4fec-b03b-5bdd9690484c,2025-07-25T07:58:00.000Z,100000459,200000071,ATM071,ATM Location 71,B010,Branch 10,1,Cash Withdrawal,100,ATM,1200.0
6a054f4e-6a2c-48a5-8b1f-2686500598fc,2025-07-25T22:43:00.000Z,100000268,200000031,ATM031,ATM Location 31,B009,Branch 9,1,Cash Withdrawal,100,ATM,1500.0
4f52e805-33ea-4edb-b550-91ac80fdfc1e,2025-07-25T00:49:00.000Z,100000031,200000018,ATM018,ATM Location 18,B004,Branch 4,1,Cash Withdrawal,100,ATM,1600.0
3cd07bad-47ef-4fdf-91ab-6e6ec6ed50de,2025-07-25T05:33:00.000Z,100000171,200000067,ATM067,ATM Location 67,B009,Branch 9,1,Cash Withdrawal,100,ATM,700.0
75cdea05-e9ed-4ae2-89c2-f7d78e3fd13d,2025-07-25T16:20:00.000Z,100000099,200000006,ATM006,ATM Location 6,B005,Branch 5,1,Cash Withdrawal,100,ATM,1500.0
2c680d75-fc58-4599-9805-c91c4d5b256a,2025-07-25T20:27:00.000Z,100000416,200000016,ATM016,ATM Location 16,B003,Branch 3,1,Cash Withdrawal,100,ATM,200.0
36a9a8bc-5e8a-44db-96f5-eb558664ab08,2025-07-25T05:19:00.000Z,100000382,200000032,ATM032,ATM Location 32,B004,Branch 4,1,Cash Withdrawal,100,ATM,1000.0
793b5647-f9bc-4f31-8dd7-66918092e949,2025-07-25T19:29:00.000Z,100000147,200000091,ATM091,ATM Location 91,B007,Branch 7,1,Cash Withdrawal,100,ATM,1300.0
a0230d14-6fd9-4344-8fa2-9de03e7db743,2025-07-25T06:27:00.000Z,100000422,200000081,ATM081,ATM Location 81,B003,Branch 3,1,Cash Withdrawal,100,ATM,1700.0
b4900194-7354-40b2-a07b-2e235928676b,2025-07-25T06:04:00.000Z,100000024,200000074,ATM074,ATM Location 74,B003,Branch 3,1,Cash Withdrawal,100,ATM,2000.0
