In [0]:
# Always pin catalog + schema
spark.sql("USE CATALOG workspace")
spark.sql("USE SCHEMA fraud_detection")

print("Catalog:", spark.sql("SELECT current_catalog()").collect())
print("Schema:", spark.sql("SELECT current_schema()").collect())


Catalog: [Row(current_catalog()='workspace')]
Schema: [Row(current_schema()='fraud_detection')]


In [0]:
RAW_BASE = "dbfs:/Volumes/workspace/fraud_detection/raw"


In [0]:
dbutils.fs.ls("dbfs:/Volumes/workspace/fraud_detection/raw/")


[FileInfo(path='dbfs:/Volumes/workspace/fraud_detection/raw/transactions/', name='transactions/', size=0, modificationTime=1770585961073)]

In [0]:
spark.sql("USE CATALOG workspace")
spark.sql("USE SCHEMA fraud_detection")

spark.sql("SELECT current_catalog(), current_schema()").show()


+-----------------+----------------+
|current_catalog()|current_schema()|
+-----------------+----------------+
|        workspace| fraud_detection|
+-----------------+----------------+



In [0]:
raw_txn_df = spark.read.json(
    "dbfs:/Volumes/workspace/fraud_detection/raw/transactions/"
)



In [0]:
raw_txn_df.printSchema()


root
 |-- amount: double (nullable = true)
 |-- countryDest: string (nullable = true)
 |-- countryOrig: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- id: string (nullable = true)
 |-- isUnauthorizedOverdraft: long (nullable = true)
 |-- nameDest: string (nullable = true)
 |-- nameOrig: string (nullable = true)
 |-- newBalanceDest: double (nullable = true)
 |-- newBalanceOrig: double (nullable = true)
 |-- oldBalanceDest: double (nullable = true)
 |-- oldBalanceOrig: double (nullable = true)
 |-- step: long (nullable = true)
 |-- type: string (nullable = true)



In [0]:
raw_txn_df.show(5, truncate=False)


+---------+-----------+-----------+-----------+------------------------------------+-----------------------+-----------+-----------+--------------+--------------+--------------+--------------+----+-------+
|amount   |countryDest|countryOrig|customer_id|id                                  |isUnauthorizedOverdraft|nameDest   |nameOrig   |newBalanceDest|newBalanceOrig|oldBalanceDest|oldBalanceOrig|step|type   |
+---------+-----------+-----------+-----------+------------------------------------+-----------------------+-----------+-----------+--------------+--------------+--------------+--------------+----+-------+
|67124.53 |ITA        |REU        |NULL       |90c0e73f-3878-41a7-8964-609e10f2c1b4|0                      |M4457655148|C7854608532|0.0           |243257.97     |0.0           |176133.44     |1   |CASH_IN|
|120052.48|NGA        |PER        |NULL       |5055ca4b-d16d-4a05-97c7-a9e38fb9fb85|0                      |M0295918729|C2217616908|0.0           |490418.16     |0.0           

In [0]:
spark.sql("USE CATALOG workspace")
spark.sql("USE SCHEMA fraud_detection")


DataFrame[]

In [0]:
raw_txn_df = spark.read.json(
    "dbfs:/Volumes/workspace/fraud_detection/raw/transactions/"
)


In [0]:
raw_txn_df.count()


5442996

In [0]:
from pyspark.sql.functions import current_timestamp

(
    raw_txn_df
    .withColumn("ingest_time", current_timestamp())
    .write
    .format("delta")
    .option("mergeSchema", "true")
    .mode("overwrite")
    .saveAsTable("bronze_transactions")
)


In [0]:
%sql
SELECT COUNT(*) FROM bronze_transactions;


COUNT(*)
5442996


In [0]:
%sql
SELECT * FROM bronze_transactions LIMIT 5;


amount,countryDest,countryOrig,customer_id,id,isUnauthorizedOverdraft,nameDest,nameOrig,newBalanceDest,newBalanceOrig,oldBalanceDest,oldBalanceOrig,step,type,ingested_at,source,ingest_time
63500.3,PSE,PER,,63532783-86b2-4ab0-9bfa-5185f7deb8be,0,M8095486685,C1508273145,0.0,263846.55,0.0,200346.25,1,CASH_IN,,,2026-02-08T21:37:25.820Z
118753.0,ITA,PER,,60dbb47b-8641-474a-83fd-37f67f4a3b0c,0,M6111908704,C2410203448,0.0,118813.11,0.0,60.11,1,CASH_IN,,,2026-02-08T21:37:25.820Z
162692.35,FRA,FRA,,165ff242-1c92-4309-b9b2-5d7d20a2428e,0,M1616944657,C7305498098,0.0,1262285.09,0.0,1099592.75,1,CASH_IN,,,2026-02-08T21:37:25.820Z
229007.03,PAN,BRA,,c1d6b4c8-b5b4-45e4-9b96-7bf6985beecb,0,M4452069469,C1642574105,0.0,526487.81,0.0,297480.78,1,CASH_IN,,,2026-02-08T21:37:25.820Z
27090.91,PAN,KHM,,88636b7c-9b6a-49e7-9c67-f8e1d2563847,0,M7046374792,C2019145373,0.0,1086074.56,0.0,1058983.65,2,CASH_IN,,,2026-02-08T21:37:25.820Z


In [0]:
%sql
DESCRIBE EXTENDED bronze_transactions;


col_name,data_type,comment
amount,double,
countryDest,string,
countryOrig,string,
customer_id,string,
id,string,
isUnauthorizedOverdraft,bigint,
nameDest,string,
nameOrig,string,
newBalanceDest,double,
newBalanceOrig,double,


In [0]:
spark.sql("USE CATALOG workspace")
spark.sql("USE SCHEMA fraud_detection")


DataFrame[]

In [0]:
from pyspark.sql.functions import current_timestamp

bronze_customers_df = (
    spark.table("bronze_transactions")
    .selectExpr(
        "nameOrig as customer_id",
        "countryOrig as country_code"
    )
    .withColumn("ingest_time", current_timestamp())
)


In [0]:
(
    bronze_customers_df
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("bronze_customers")
)


In [0]:
%sql
SELECT COUNT(*) FROM bronze_customers;
SELECT * FROM bronze_customers LIMIT 5;
DESCRIBE EXTENDED bronze_customers;


col_name,data_type,comment
customer_id,string,
country_code,string,
ingest_time,timestamp,
,,
# Delta Statistics Columns,,
Column Names,"customer_id, country_code, ingest_time",
Column Selection Method,first-32,
,,
# Detailed Table Information,,
Catalog,workspace,


In [0]:
from pyspark.sql.functions import current_timestamp

bronze_country_df = (
    spark.table("bronze_transactions")
    .selectExpr("countryOrig as country_code")
    .union(
        spark.table("bronze_transactions")
        .selectExpr("countryDest as country_code")
    )
    .withColumn("ingest_time", current_timestamp())
)


In [0]:
(
    bronze_country_df
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("bronze_country_code")
)


In [0]:
%sql
SELECT COUNT(*) FROM bronze_country_code;
SELECT DISTINCT country_code FROM bronze_country_code;


country_code
CAN
ITA
ESP
QAT
PSE
PER
TUR
RUS
FRA
KHM


In [0]:
from pyspark.sql.functions import current_timestamp, lit

bronze_fraud_df = (
    spark.table("bronze_transactions")
    .select(
        "id",
        "isUnauthorizedOverdraft"
    )
    .withColumnRenamed("isUnauthorizedOverdraft", "fraud_flag")
    .withColumn("ingest_time", current_timestamp())
)


In [0]:
(
    bronze_fraud_df
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("bronze_fraud_report")
)


In [0]:
%sql
SELECT COUNT(*) FROM bronze_fraud_report;
SELECT fraud_flag, COUNT(*) 
FROM bronze_fraud_report 
GROUP BY fraud_flag;


fraud_flag,COUNT(*)
0,5430616
1,12380


In [0]:
%sql SHOW TABLES;


database,tableName,isTemporary
fraud_detection,bronze_country_code,False
fraud_detection,bronze_customers,False
fraud_detection,bronze_fraud_report,False
fraud_detection,bronze_transactions,False


In [0]:
spark.sql("USE CATALOG workspace")
spark.sql("USE SCHEMA fraud_detection")



DataFrame[]

In [0]:
from pyspark.sql.functions import col

silver_dim_customers_df = (
    spark.table("bronze_transactions")
    .select(col("nameOrig").alias("customer_id"))
    .union(
        spark.table("bronze_transactions")
        .select(col("nameDest").alias("customer_id"))
    )
    .dropna()
    .dropDuplicates(["customer_id"])
)


In [0]:
(
    silver_dim_customers_df
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver_dim_customers")
)


In [0]:
%sql
SELECT COUNT(*) FROM silver_dim_customers;
SELECT * FROM silver_dim_customers LIMIT 5;


customer_id
C7168983926
C2401457690
C2317870228
C1189806313
CC0189065943


In [0]:
silver_dim_countries_df = (
    spark.table("bronze_transactions")
    .select(col("countryOrig").alias("country_code"))
    .union(
        spark.table("bronze_transactions")
        .select(col("countryDest").alias("country_code"))
    )
    .dropna()
    .dropDuplicates(["country_code"])
)


In [0]:
(
    silver_dim_countries_df
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver_dim_countries")
)


In [0]:
%sql
SELECT COUNT(*) FROM silver_dim_countries;
SELECT * FROM silver_dim_countries;


country_code
CAN
ITA
ESP
QAT
PSE
PER
TUR
RUS
FRA
KHM


In [0]:
silver_dim_fraud_flags_df = (
    spark.table("bronze_transactions")
    .select(
        col("id").alias("transaction_id"),
        col("isUnauthorizedOverdraft").cast("int").alias("unauthorized_overdraft_flag")
    )
)


In [0]:
(
    silver_dim_fraud_flags_df
    .write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("silver_dim_fraud_flags")
)


In [0]:
%sql
SELECT COUNT(*) FROM silver_dim_fraud_flags;
SELECT * FROM silver_dim_fraud_flags LIMIT 5;


transaction_id,unauthorized_overdraft_flag
90c0e73f-3878-41a7-8964-609e10f2c1b4,0
5055ca4b-d16d-4a05-97c7-a9e38fb9fb85,0
34db2fa6-1c2b-4e47-919b-fb5f1015276a,0
4eba2416-4b83-477d-8aec-fc056f1ccf38,0
ef0bb47c-d9c5-4f16-8cfb-d986084e1695,0
