In [0]:
def can_write(path: str) -> bool:
    try:
        test_dir = path.rstrip("/") + "/__write_test__"
        dbutils.fs.mkdirs(test_dir)
        dbutils.fs.rm(test_dir, recurse=True)
        return True
    except Exception as e:
        print(f"❌ {path} -> {str(e)[:160]}")
        return False

candidates = [
    "dbfs:/FileStore",                  # common
    "dbfs:/databricks-datasets",         # read-only usually
    "dbfs:/Workspace",                  # sometimes exposed
    "dbfs:/Volumes",                    # if UC volumes exist
]

for p in candidates:
    ok = can_write(p)
    print(("✅" if ok else "—"), "Writable?" , p)


❌ dbfs:/FileStore -> Public DBFS root is disabled. Access is denied on path: /FileStore/__write_test__

JVM stacktrace:
java.lang.UnsupportedOperationException
	at com.databricks.ba
— Writable? dbfs:/FileStore
❌ dbfs:/databricks-datasets -> [UNAUTHORIZED_ACCESS] Unauthorized access:
__write_test__/: PUT 0-byte object  on __write_test__/: com.amazonaws.services.s3.model.AmazonS3Exception: User: arn:
— Writable? dbfs:/databricks-datasets
✅ Writable? dbfs:/Workspace
❌ dbfs:/Volumes -> Entry must be a full volume path. Path '/Volumes/__write_test__' is incomplete. Expected format: /Volumes/<catalog>/<schema>/<volume>

JVM stacktrace:
com.datab
— Writable? dbfs:/Volumes


In [0]:
BASE = "dbfs:/Workspace/fintech"

RAW_CSV = f"{BASE}/raw/transactions/csv"
RAW_JSON = f"{BASE}/raw/transactions/json"
CHECKPOINT = f"{BASE}/checkpoints/bronze_txn"
BAD = f"{BASE}/bad_records/bronze_txn"

dbutils.fs.mkdirs(RAW_CSV)
dbutils.fs.mkdirs(RAW_JSON)
dbutils.fs.mkdirs(CHECKPOINT)
dbutils.fs.mkdirs(BAD)

print("Using base path:", BASE)


Using base path: dbfs:/Workspace/fintech


In [0]:
%sql
CREATE DATABASE IF NOT EXISTS fintech_de;
USE fintech_de;


In [0]:
%sql
SELECT current_database();

current_database()
fintech_de


In [0]:
spark.table("fintech_de.bronze_transactions")


In [0]:
%sql
SHOW TABLES IN fintech_de;

database,tableName,isTemporary


In [0]:
%sql
SELECT current_database();

current_database()
fintech_de


In [0]:
spark.sql("""
CREATE TABLE IF NOT EXISTS fintech_de.bronze_transactions_raw (
  raw_payload STRING,
  ingest_time TIMESTAMP,
  batch_id STRING
) USING DELTA
""")


DataFrame[]

In [0]:
%sql
SHOW TABLES IN fintech_de;

database,tableName,isTemporary
fintech_de,bronze_transactions_raw,False


In [0]:
import json, uuid, random
from datetime import datetime, timedelta
from pyspark.sql import Row
from pyspark.sql.functions import current_timestamp, lit

def generate_txn():
    return json.dumps({
        "transaction_id": str(uuid.uuid4()),
        "event_time": "2026-02-01 10:00:00",
        "account_id": f"ACC{random.randint(1000,1100)}",
        "customer_id": f"CUST{random.randint(200,260)}",
        "merchant_id": f"MER{random.randint(1,40)}",
        "amount": round(random.uniform(5,500),2),
        "currency": random.choice(["CAD","USD"]),
        "txn_type": random.choice(["card_purchase","transfer","refund","atm_withdrawal"]),
        "status": random.choice(["approved","declined","reversed"]),
        "risk_score": round(random.uniform(0,1),3)
    })

rows = [Row(raw_payload=generate_txn()) for _ in range(200)]

df = (spark.createDataFrame(rows)
      .withColumn("ingest_time", current_timestamp())
      .withColumn("batch_id", lit(str(uuid.uuid4()))))

df.write.mode("append").saveAsTable("fintech_de.bronze_transactions_raw")


In [0]:
%sql
SELECT * FROM fintech_de.bronze_transactions_raw;


raw_payload,ingest_time,batch_id
"{""transaction_id"": ""902e5f6f-becf-4a89-910c-54eaa0fc7c82"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1086"", ""customer_id"": ""CUST222"", ""merchant_id"": ""MER17"", ""amount"": 322.27, ""currency"": ""CAD"", ""txn_type"": ""transfer"", ""status"": ""declined"", ""risk_score"": 0.795}",2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf
"{""transaction_id"": ""43615cde-6c41-407c-a495-0d86af8a27e9"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1069"", ""customer_id"": ""CUST221"", ""merchant_id"": ""MER7"", ""amount"": 263.42, ""currency"": ""USD"", ""txn_type"": ""atm_withdrawal"", ""status"": ""declined"", ""risk_score"": 0.227}",2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf
"{""transaction_id"": ""35138017-7091-41e6-913d-caf9d0b7654e"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1002"", ""customer_id"": ""CUST205"", ""merchant_id"": ""MER14"", ""amount"": 380.11, ""currency"": ""CAD"", ""txn_type"": ""transfer"", ""status"": ""declined"", ""risk_score"": 0.633}",2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf
"{""transaction_id"": ""19f2de84-0575-48c3-a425-46d65c736d0c"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1100"", ""customer_id"": ""CUST231"", ""merchant_id"": ""MER23"", ""amount"": 152.55, ""currency"": ""USD"", ""txn_type"": ""atm_withdrawal"", ""status"": ""approved"", ""risk_score"": 0.105}",2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf
"{""transaction_id"": ""0847af79-7aa9-4d1e-88fb-e30ef7acdd61"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1090"", ""customer_id"": ""CUST236"", ""merchant_id"": ""MER11"", ""amount"": 36.84, ""currency"": ""CAD"", ""txn_type"": ""refund"", ""status"": ""reversed"", ""risk_score"": 0.43}",2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf
"{""transaction_id"": ""4d6d6726-5fba-4dcb-a249-30056d9f5fea"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1080"", ""customer_id"": ""CUST210"", ""merchant_id"": ""MER14"", ""amount"": 235.43, ""currency"": ""USD"", ""txn_type"": ""refund"", ""status"": ""declined"", ""risk_score"": 0.599}",2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf
"{""transaction_id"": ""67458338-95b3-4ef9-b887-3b28d1d5dbed"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1003"", ""customer_id"": ""CUST254"", ""merchant_id"": ""MER21"", ""amount"": 73.98, ""currency"": ""USD"", ""txn_type"": ""card_purchase"", ""status"": ""declined"", ""risk_score"": 0.407}",2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf
"{""transaction_id"": ""53e2ed65-d86d-417a-a79e-1ecb8dcf8100"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1080"", ""customer_id"": ""CUST242"", ""merchant_id"": ""MER3"", ""amount"": 18.65, ""currency"": ""USD"", ""txn_type"": ""card_purchase"", ""status"": ""approved"", ""risk_score"": 0.365}",2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf
"{""transaction_id"": ""e16728eb-2b99-4b8a-bae0-9699096c945d"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1024"", ""customer_id"": ""CUST233"", ""merchant_id"": ""MER35"", ""amount"": 249.47, ""currency"": ""CAD"", ""txn_type"": ""transfer"", ""status"": ""declined"", ""risk_score"": 0.71}",2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf
"{""transaction_id"": ""f9b3bd05-6e56-4cc9-8add-69aa2e533abe"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1057"", ""customer_id"": ""CUST229"", ""merchant_id"": ""MER24"", ""amount"": 84.0, ""currency"": ""CAD"", ""txn_type"": ""card_purchase"", ""status"": ""reversed"", ""risk_score"": 0.472}",2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf


In [0]:
from pyspark.sql.functions import from_json, col, schema_of_json, expr

# Read raw bronze
raw_df = spark.table("fintech_de.bronze_transactions_raw")

# Infer schema from a sample payload
sample_json = raw_df.select("raw_payload").limit(1).collect()[0][0]
inferred_schema = schema_of_json(expr(f"'{sample_json}'"))

# Parse JSON into columns
bronze_df = (
    raw_df
    .withColumn("data", from_json(col("raw_payload"), inferred_schema))
    .select("data.*", "ingest_time", "batch_id", "raw_payload")
)

# Write structured bronze table
(bronze_df
 .write
 .mode("overwrite")
 .option("overwriteSchema", "true")
 .saveAsTable("fintech_de.bronze_transactions"))


In [0]:
%sql
SELECT COUNT(*) FROM fintech_de.bronze_transactions;



COUNT(*)
400


In [0]:
%sql
SELECT *
FROM fintech_de.bronze_transactions
ORDER BY ingest_time DESC
LIMIT 10;


account_id,amount,currency,customer_id,event_time,merchant_id,risk_score,status,transaction_id,txn_type,ingest_time,batch_id,raw_payload
ACC1003,73.98,USD,CUST254,2026-02-01 10:00:00,MER21,0.407,declined,67458338-95b3-4ef9-b887-3b28d1d5dbed,card_purchase,2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf,"{""transaction_id"": ""67458338-95b3-4ef9-b887-3b28d1d5dbed"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1003"", ""customer_id"": ""CUST254"", ""merchant_id"": ""MER21"", ""amount"": 73.98, ""currency"": ""USD"", ""txn_type"": ""card_purchase"", ""status"": ""declined"", ""risk_score"": 0.407}"
ACC1080,235.43,USD,CUST210,2026-02-01 10:00:00,MER14,0.599,declined,4d6d6726-5fba-4dcb-a249-30056d9f5fea,refund,2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf,"{""transaction_id"": ""4d6d6726-5fba-4dcb-a249-30056d9f5fea"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1080"", ""customer_id"": ""CUST210"", ""merchant_id"": ""MER14"", ""amount"": 235.43, ""currency"": ""USD"", ""txn_type"": ""refund"", ""status"": ""declined"", ""risk_score"": 0.599}"
ACC1002,380.11,CAD,CUST205,2026-02-01 10:00:00,MER14,0.633,declined,35138017-7091-41e6-913d-caf9d0b7654e,transfer,2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf,"{""transaction_id"": ""35138017-7091-41e6-913d-caf9d0b7654e"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1002"", ""customer_id"": ""CUST205"", ""merchant_id"": ""MER14"", ""amount"": 380.11, ""currency"": ""CAD"", ""txn_type"": ""transfer"", ""status"": ""declined"", ""risk_score"": 0.633}"
ACC1024,249.47,CAD,CUST233,2026-02-01 10:00:00,MER35,0.71,declined,e16728eb-2b99-4b8a-bae0-9699096c945d,transfer,2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf,"{""transaction_id"": ""e16728eb-2b99-4b8a-bae0-9699096c945d"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1024"", ""customer_id"": ""CUST233"", ""merchant_id"": ""MER35"", ""amount"": 249.47, ""currency"": ""CAD"", ""txn_type"": ""transfer"", ""status"": ""declined"", ""risk_score"": 0.71}"
ACC1090,36.84,CAD,CUST236,2026-02-01 10:00:00,MER11,0.43,reversed,0847af79-7aa9-4d1e-88fb-e30ef7acdd61,refund,2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf,"{""transaction_id"": ""0847af79-7aa9-4d1e-88fb-e30ef7acdd61"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1090"", ""customer_id"": ""CUST236"", ""merchant_id"": ""MER11"", ""amount"": 36.84, ""currency"": ""CAD"", ""txn_type"": ""refund"", ""status"": ""reversed"", ""risk_score"": 0.43}"
ACC1100,152.55,USD,CUST231,2026-02-01 10:00:00,MER23,0.105,approved,19f2de84-0575-48c3-a425-46d65c736d0c,atm_withdrawal,2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf,"{""transaction_id"": ""19f2de84-0575-48c3-a425-46d65c736d0c"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1100"", ""customer_id"": ""CUST231"", ""merchant_id"": ""MER23"", ""amount"": 152.55, ""currency"": ""USD"", ""txn_type"": ""atm_withdrawal"", ""status"": ""approved"", ""risk_score"": 0.105}"
ACC1086,322.27,CAD,CUST222,2026-02-01 10:00:00,MER17,0.795,declined,902e5f6f-becf-4a89-910c-54eaa0fc7c82,transfer,2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf,"{""transaction_id"": ""902e5f6f-becf-4a89-910c-54eaa0fc7c82"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1086"", ""customer_id"": ""CUST222"", ""merchant_id"": ""MER17"", ""amount"": 322.27, ""currency"": ""CAD"", ""txn_type"": ""transfer"", ""status"": ""declined"", ""risk_score"": 0.795}"
ACC1069,263.42,USD,CUST221,2026-02-01 10:00:00,MER7,0.227,declined,43615cde-6c41-407c-a495-0d86af8a27e9,atm_withdrawal,2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf,"{""transaction_id"": ""43615cde-6c41-407c-a495-0d86af8a27e9"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1069"", ""customer_id"": ""CUST221"", ""merchant_id"": ""MER7"", ""amount"": 263.42, ""currency"": ""USD"", ""txn_type"": ""atm_withdrawal"", ""status"": ""declined"", ""risk_score"": 0.227}"
ACC1080,18.65,USD,CUST242,2026-02-01 10:00:00,MER3,0.365,approved,53e2ed65-d86d-417a-a79e-1ecb8dcf8100,card_purchase,2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf,"{""transaction_id"": ""53e2ed65-d86d-417a-a79e-1ecb8dcf8100"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1080"", ""customer_id"": ""CUST242"", ""merchant_id"": ""MER3"", ""amount"": 18.65, ""currency"": ""USD"", ""txn_type"": ""card_purchase"", ""status"": ""approved"", ""risk_score"": 0.365}"
ACC1057,84.0,CAD,CUST229,2026-02-01 10:00:00,MER24,0.472,reversed,f9b3bd05-6e56-4cc9-8add-69aa2e533abe,card_purchase,2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf,"{""transaction_id"": ""f9b3bd05-6e56-4cc9-8add-69aa2e533abe"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1057"", ""customer_id"": ""CUST229"", ""merchant_id"": ""MER24"", ""amount"": 84.0, ""currency"": ""CAD"", ""txn_type"": ""card_purchase"", ""status"": ""reversed"", ""risk_score"": 0.472}"


In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

bronze_df = spark.table("fintech_de.bronze_transactions")

typed_df = (
    bronze_df
    .withColumn("event_time", to_timestamp(col("event_time")))
    .withColumn("amount", col("amount").cast("decimal(18,2)"))
    .withColumn("risk_score", col("risk_score").cast("double"))
)


In [0]:
valid_df = typed_df.filter(
    col("transaction_id").isNotNull() &
    col("event_time").isNotNull() &
    (col("amount") > 0) &
    col("currency").isNotNull()
)

quarantine_df = typed_df.subtract(valid_df)

In [0]:
window_spec = Window.partitionBy("transaction_id").orderBy(col("ingest_time").desc())

silver_df = (
    valid_df
    .withColumn("row_num", row_number().over(window_spec))
    .filter(col("row_num") == 1)
    .drop("row_num")
)


In [0]:
(
    silver_df.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("fintech_de.silver_transactions")
)

(
    quarantine_df.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("fintech_de.silver_quarantine")
)


In [0]:
%sql
SELECT COUNT(*) AS silver_rows FROM fintech_de.silver_transactions;


silver_rows
400


In [0]:
%sql
SELECT COUNT(*) AS quarantined_rows FROM fintech_de.silver_quarantine;



quarantined_rows
0


In [0]:
%sql
SELECT *
FROM fintech_de.silver_transactions
ORDER BY ingest_time DESC
LIMIT 10;


account_id,amount,currency,customer_id,event_time,merchant_id,risk_score,status,transaction_id,txn_type,ingest_time,batch_id,raw_payload
ACC1027,299.07,USD,CUST231,2026-02-01T10:00:00.000Z,MER23,0.746,declined,06fb67bf-bcd3-4c8d-aa07-583816566a7b,atm_withdrawal,2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf,"{""transaction_id"": ""06fb67bf-bcd3-4c8d-aa07-583816566a7b"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1027"", ""customer_id"": ""CUST231"", ""merchant_id"": ""MER23"", ""amount"": 299.07, ""currency"": ""USD"", ""txn_type"": ""atm_withdrawal"", ""status"": ""declined"", ""risk_score"": 0.746}"
ACC1027,384.57,USD,CUST249,2026-02-01T10:00:00.000Z,MER15,0.808,declined,075c4e24-67f4-43de-87d7-a0ef7e0c657c,transfer,2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf,"{""transaction_id"": ""075c4e24-67f4-43de-87d7-a0ef7e0c657c"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1027"", ""customer_id"": ""CUST249"", ""merchant_id"": ""MER15"", ""amount"": 384.57, ""currency"": ""USD"", ""txn_type"": ""transfer"", ""status"": ""declined"", ""risk_score"": 0.808}"
ACC1075,215.12,USD,CUST252,2026-02-01T10:00:00.000Z,MER14,0.487,approved,03111aeb-1982-4125-9819-063852d29fda,card_purchase,2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf,"{""transaction_id"": ""03111aeb-1982-4125-9819-063852d29fda"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1075"", ""customer_id"": ""CUST252"", ""merchant_id"": ""MER14"", ""amount"": 215.12, ""currency"": ""USD"", ""txn_type"": ""card_purchase"", ""status"": ""approved"", ""risk_score"": 0.487}"
ACC1090,36.84,CAD,CUST236,2026-02-01T10:00:00.000Z,MER11,0.43,reversed,0847af79-7aa9-4d1e-88fb-e30ef7acdd61,refund,2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf,"{""transaction_id"": ""0847af79-7aa9-4d1e-88fb-e30ef7acdd61"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1090"", ""customer_id"": ""CUST236"", ""merchant_id"": ""MER11"", ""amount"": 36.84, ""currency"": ""CAD"", ""txn_type"": ""refund"", ""status"": ""reversed"", ""risk_score"": 0.43}"
ACC1061,377.77,USD,CUST251,2026-02-01T10:00:00.000Z,MER15,0.381,declined,022819bd-5b9b-4440-a8ad-a8f57a67a867,transfer,2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf,"{""transaction_id"": ""022819bd-5b9b-4440-a8ad-a8f57a67a867"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1061"", ""customer_id"": ""CUST251"", ""merchant_id"": ""MER15"", ""amount"": 377.77, ""currency"": ""USD"", ""txn_type"": ""transfer"", ""status"": ""declined"", ""risk_score"": 0.381}"
ACC1035,86.62,CAD,CUST220,2026-02-01T10:00:00.000Z,MER25,0.467,approved,04a4bae9-0ce3-4b94-be63-f292001ee68f,refund,2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf,"{""transaction_id"": ""04a4bae9-0ce3-4b94-be63-f292001ee68f"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1035"", ""customer_id"": ""CUST220"", ""merchant_id"": ""MER25"", ""amount"": 86.62, ""currency"": ""CAD"", ""txn_type"": ""refund"", ""status"": ""approved"", ""risk_score"": 0.467}"
ACC1076,190.18,USD,CUST221,2026-02-01T10:00:00.000Z,MER23,0.926,reversed,01f7fea0-efcf-4d87-8258-fbbf1ecfbc10,refund,2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf,"{""transaction_id"": ""01f7fea0-efcf-4d87-8258-fbbf1ecfbc10"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1076"", ""customer_id"": ""CUST221"", ""merchant_id"": ""MER23"", ""amount"": 190.18, ""currency"": ""USD"", ""txn_type"": ""refund"", ""status"": ""reversed"", ""risk_score"": 0.926}"
ACC1056,177.82,USD,CUST241,2026-02-01T10:00:00.000Z,MER39,0.689,approved,046b13a8-9e30-4c9b-b42f-af1be22e0d57,card_purchase,2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf,"{""transaction_id"": ""046b13a8-9e30-4c9b-b42f-af1be22e0d57"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1056"", ""customer_id"": ""CUST241"", ""merchant_id"": ""MER39"", ""amount"": 177.82, ""currency"": ""USD"", ""txn_type"": ""card_purchase"", ""status"": ""approved"", ""risk_score"": 0.689}"
ACC1048,169.9,CAD,CUST252,2026-02-01T10:00:00.000Z,MER40,0.837,approved,07bc90b2-6235-4c50-8149-501e94a71904,atm_withdrawal,2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf,"{""transaction_id"": ""07bc90b2-6235-4c50-8149-501e94a71904"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1048"", ""customer_id"": ""CUST252"", ""merchant_id"": ""MER40"", ""amount"": 169.9, ""currency"": ""CAD"", ""txn_type"": ""atm_withdrawal"", ""status"": ""approved"", ""risk_score"": 0.837}"
ACC1038,456.31,USD,CUST233,2026-02-01T10:00:00.000Z,MER19,0.972,approved,0b11df50-f444-4a33-8dcc-64969e1549d6,atm_withdrawal,2026-02-07T05:12:16.279Z,ab2fe3ca-068d-4f66-a3f4-ae857009c7bf,"{""transaction_id"": ""0b11df50-f444-4a33-8dcc-64969e1549d6"", ""event_time"": ""2026-02-01 10:00:00"", ""account_id"": ""ACC1038"", ""customer_id"": ""CUST233"", ""merchant_id"": ""MER19"", ""amount"": 456.31, ""currency"": ""USD"", ""txn_type"": ""atm_withdrawal"", ""status"": ""approved"", ""risk_score"": 0.972}"


In [0]:
%sql
USE fintech_de;



In [0]:
%sql
CREATE OR REPLACE TABLE fintech_de.gold_daily_kpis AS
SELECT
  DATE(event_time)               AS txn_date,
  currency,
  COUNT(*)                       AS txn_count,
  ROUND(SUM(amount), 2)          AS total_amount,
  ROUND(
    AVG(CASE WHEN status = 'approved' THEN 1 ELSE 0 END),
    4
  )                              AS approval_rate,
  ROUND(AVG(risk_score), 4)      AS avg_risk_score
FROM fintech_de.silver_transactions
GROUP BY 1, 2;


num_affected_rows,num_inserted_rows


In [0]:
%sql
SELECT * 
FROM fintech_de.gold_daily_kpis
ORDER BY txn_date DESC;


txn_date,currency,txn_count,total_amount,approval_rate,avg_risk_score
2026-02-01,CAD,204,51726.22,0.3137,0.5027
2026-02-01,USD,196,46461.08,0.3214,0.5307


In [0]:
%sql
CREATE OR REPLACE TABLE fintech_de.gold_merchant_performance AS
SELECT
  merchant_id,
  COALESCE(
    get_json_object(raw_payload, '$.merchant_category'),
    'unknown'
  ) AS merchant_category,
  COUNT(*)                       AS txn_count,
  ROUND(SUM(amount), 2)          AS total_amount,
  ROUND(AVG(risk_score), 4)      AS avg_risk_score
FROM fintech_de.silver_transactions
WHERE status = 'approved'
GROUP BY 1, 2
ORDER BY total_amount DESC;


num_affected_rows,num_inserted_rows


In [0]:
 from pyspark.sql.functions import get_json_object, col

silver = spark.table("fintech_de.silver_transactions")

silver_enriched = (
    silver
    .withColumn(
        "merchant_category",
        get_json_object(col("raw_payload"), "$.merchant_category")
    )
)

silver_enriched.write.mode("overwrite") \
    .option("overwriteSchema","true") \
    .saveAsTable("fintech_de.silver_transactions")


In [0]:
%sql
SELECT *
FROM fintech_de.gold_merchant_performance
LIMIT 10;


merchant_id,merchant_category,txn_count,total_amount,avg_risk_score
MER14,unknown,7,2150.1,0.4401
MER26,unknown,7,1983.75,0.6046
MER17,unknown,6,1671.66,0.5647
MER8,unknown,6,1587.95,0.6697
MER10,unknown,5,1382.68,0.7908
MER18,unknown,4,1354.85,0.3938
MER5,unknown,5,1295.66,0.3876
MER16,unknown,5,1231.67,0.5426
MER24,unknown,4,1213.95,0.6798
MER33,unknown,4,1161.0,0.5223


In [0]:
%sql
CREATE TABLE IF NOT EXISTS fintech_de.silver_transactions (
  transaction_id STRING,
  event_time TIMESTAMP,
  account_id STRING,
  customer_id STRING,
  merchant_id STRING,
  amount DECIMAL(18,2),
  currency STRING,
  txn_type STRING,
  channel STRING,
  status STRING,
  country STRING,
  risk_score DOUBLE,
  ingest_time TIMESTAMP,
  batch_id STRING,
  raw_payload STRING
) USING DELTA;



In [0]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

bronze = spark.table("fintech_de.bronze_transactions")

incremental = (
    bronze
    .withColumn("event_time", to_timestamp("event_time"))
    .withColumn("amount", col("amount").cast("decimal(18,2)"))
    .withColumn("risk_score", col("risk_score").cast("double"))
    .filter(col("transaction_id").isNotNull())
    .filter(col("event_time").isNotNull())
    .filter(col("amount") > 0)
    .filter(col("currency").isNotNull())
)



In [0]:
%sql
MERGE INTO fintech_de.silver_transactions AS tgt
USING fintech_de.bronze_transactions AS src
ON tgt.transaction_id = src.transaction_id

WHEN MATCHED AND src.ingest_time > tgt.ingest_time THEN
  UPDATE SET *

WHEN NOT MATCHED THEN
  INSERT *
;


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-4524556756415951>, line 1[0m
[0;32m----> 1[0m get_ipython()[38;5;241m.[39mrun_cell_magic([38;5;124m'[39m[38;5;124msql[39m[38;5;124m'[39m, [38;5;124m'[39m[38;5;124m'[39m, [38;5;124m'[39m[38;5;124mMERGE INTO fintech_de.silver_transactions AS tgt[39m[38;5;130;01m\n[39;00m[38;5;124mUSING fintech_de.bronze_transactions AS src[39m[38;5;130;01m\n[39;00m[38;5;124mON tgt.transaction_id = src.transaction_id[39m[38;5;130;01m\n[39;00m[38;5;130;01m\n[39;00m[38;5;124mWHEN MATCHED AND src.ingest_time > tgt.ingest_time THEN[39m[38;5;130;01m\n[39;00m[38;5;124m  UPDATE SET *[39m[38;5;130;01m\n[39;00m[38;5;130;01m\n[39;00m[38;5;124mWHEN NOT MATCHED THEN[39m[38;5;130;01m\n[39;00m[38;5;124m  INSERT *[39m[38;5;130;01m\n[39;00m[38;5;124m;[39m[38;5;130;01m\n[39;00m

In [0]:
%sql
DESCRIBE TABLE fintech_de.silver_transactions;


col_name,data_type,comment
account_id,string,
amount,"decimal(18,2)",
currency,string,
customer_id,string,
event_time,timestamp,
merchant_id,string,
risk_score,double,
status,string,
transaction_id,string,
txn_type,string,


In [0]:
%sql
ALTER TABLE fintech_de.silver_transactions
ADD COLUMNS (channel STRING);


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-4524556756415953>, line 1[0m
[0;32m----> 1[0m get_ipython()[38;5;241m.[39mrun_cell_magic([38;5;124m'[39m[38;5;124msql[39m[38;5;124m'[39m, [38;5;124m'[39m[38;5;124m'[39m, [38;5;124m'[39m[38;5;124mALTER TABLE fintech_de.silver_transactions[39m[38;5;130;01m\n[39;00m[38;5;124mADD COLUMNS (channel STRING);[39m[38;5;130;01m\n[39;00m[38;5;124m'[39m)

File [0;32m/databricks/python/lib/python3.12/site-packages/IPython/core/interactiveshell.py:2541[0m, in [0;36mInteractiveShell.run_cell_magic[0;34m(self, magic_name, line, cell)[0m
[1;32m   2539[0m [38;5;28;01mwith[39;00m [38;5;28mself[39m[38;5;241m.[39mbuiltin_trap:
[1;32m   2540[0m     args [38;5;241m=[39m (magic_arg_s, cell)
[0;32m-> 2541[0m     result [38;5;241m=[39m fn([38;5;241m*[39margs, [38;5;241m

In [0]:
%sql
SELECT
  channel,
  COUNT(*) AS cnt
FROM fintech_de.silver_transactions
GROUP BY channel;


channel,cnt
,400


In [0]:
%sql
MERGE INTO fintech_de.silver_transactions AS tgt
USING (
  SELECT
    transaction_id,
    to_timestamp(event_time)        AS event_time,
    account_id,
    customer_id,
    merchant_id,
    CAST(amount AS DECIMAL(18,2))   AS amount,
    currency,
    txn_type,
    status,
    CAST(risk_score AS DOUBLE)      AS risk_score,
    ingest_time,
    batch_id,
    raw_payload,
    COALESCE(
      get_json_object(raw_payload, '$.merchant_category'),
      'unknown'
    ) AS merchant_category
  FROM fintech_de.bronze_transactions
  WHERE transaction_id IS NOT NULL
    AND amount > 0
    AND currency IS NOT NULL
) src
ON tgt.transaction_id = src.transaction_id

WHEN MATCHED AND src.ingest_time > tgt.ingest_time THEN
  UPDATE SET
    event_time        = src.event_time,
    account_id        = src.account_id,
    customer_id       = src.customer_id,
    merchant_id       = src.merchant_id,
    amount            = src.amount,
    currency          = src.currency,
    txn_type          = src.txn_type,
    status            = src.status,
    risk_score        = src.risk_score,
    ingest_time       = src.ingest_time,
    batch_id          = src.batch_id,
    raw_payload       = src.raw_payload,
    merchant_category = src.merchant_category

WHEN NOT MATCHED THEN
  INSERT (
    transaction_id,
    event_time,
    account_id,
    customer_id,
    merchant_id,
    amount,
    currency,
    txn_type,
    status,
    risk_score,
    ingest_time,
    batch_id,
    raw_payload,
    merchant_category
  )
  VALUES (
    src.transaction_id,
    src.event_time,
    src.account_id,
    src.customer_id,
    src.merchant_id,
    src.amount,
    src.currency,
    src.txn_type,
    src.status,
    src.risk_score,
    src.ingest_time,
    src.batch_id,
    src.raw_payload,
    src.merchant_category
  );


num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
0,0,0,0


In [0]:
%sql
SELECT COUNT(*) AS total_rows,
       COUNT(DISTINCT transaction_id) AS unique_txns
FROM fintech_de.silver_transactions;


total_rows,unique_txns
400,400


In [0]:
%sql
SELECT merchant_category, COUNT(*)
FROM fintech_de.silver_transactions
GROUP BY merchant_category;


merchant_category,COUNT(*)
,400
