In [0]:
%sql
-- Reset the demo: remove every layer's table so the next run rebuilds them.
-- IF EXISTS keeps each statement a no-op when its table is already gone.
-- Dropped downstream-first (gold, silver, bronze); the statements are independent.
DROP TABLE IF EXISTS workspace.lakehouse_db.finance_gold;
DROP TABLE IF EXISTS workspace.lakehouse_db.finance_silver;
DROP TABLE IF EXISTS workspace.lakehouse_db.finance_bronze;

In [0]:
%python
# Reset streaming state by deleting the bronze and silver checkpoint folders.
#
# This cell sits ABOVE the "STARTS HERE" cell that star-imports src.config, so
# on a fresh kernel the CHECKPOINT_* names would not exist yet and the cell
# would fail with NameError. Import them here so the cell runs on its own.
# NOTE(review): CHECKPOINT_* are assumed to be defined in src.config (it is the
# only star import in this notebook) -- confirm.
import os
import sys

# The parent directory holds the src/ package; add it once so re-runs do not
# stack duplicate sys.path entries.
_project_root = os.path.abspath('..')
if _project_root not in sys.path:
    sys.path.append(_project_root)

from src.config import CHECKPOINT_BRONZE, CHECKPOINT_SILVER

# recurse=True removes each checkpoint directory together with its contents.
for checkpoint_dir in (CHECKPOINT_BRONZE, CHECKPOINT_SILVER):
    dbutils.fs.rm(checkpoint_dir, recurse=True)

print("✅ Streaming memory cleared. Ready for a fresh start.")

In [0]:
# STARTS HERE
# Bootstrap cell: makes the project packages importable, reloads them so edits
# to the .py files are picked up without restarting the kernel, and pulls the
# configuration constants and pipeline entry points into the notebook namespace.
import sys, os, importlib

# The parent directory holds the src/ and utils/ packages imported below.
# Guard the append so re-running this cell does not pile up duplicate entries.
_project_root = os.path.abspath('..')
if _project_root not in sys.path:
    sys.path.append(_project_root)

# Load and reload custom modules. src.config is reloaded before src.engine so
# that anything engine re-imports from config sees the refreshed values.
import src.config, src.engine, utils.generate_finance_data
for mod in [src.config, src.engine, utils.generate_finance_data]:
    importlib.reload(mod)

# NOTE(review): the star import supplies the constants the later cells use
# (CATALOG, BASE_PATH, TABLE_*, CHECKPOINT_*). It is kept as-is because other
# cells may rely on names it exports; consider an explicit import list.
from src.config import *
from src.engine import run_bronze, run_silver, run_gold
from utils.generate_finance_data import generate_finance_data

# Optional for demos: clear the previous run's bronze checkpoint first.
# dbutils.fs.rm(CHECKPOINT_BRONZE, recurse=True)
print(f"✅ System Ready. Targeting Catalog: {CATALOG}")

In [0]:
%python
def _split_sql_statements(script):
    """Split a SQL script into individual statements.

    Semicolons only terminate a statement when they appear outside quoted
    text ('...', "...", `...`) and outside comments (-- ... and /* ... */).
    Comments are kept verbatim inside the statement they belong to, but a
    fragment made up solely of whitespace and comments is dropped, since it
    is not an executable statement.

    Args:
        script: Full text of the SQL script.

    Returns:
        List of statement strings, stripped, without the terminating ';'.
    """
    statements = []
    current = []       # pieces of the statement currently being assembled
    has_code = False   # True once `current` holds more than whitespace/comments
    quote = None       # the quote character we are inside of, if any
    pos, end = 0, len(script)

    while pos < end:
        ch = script[pos]
        pair = script[pos:pos + 2]

        if quote is not None:
            # Inside a string literal a backslash escapes the next character,
            # so an escaped quote must not close the literal. Backtick
            # identifiers do not use backslash escapes.
            if ch == "\\" and quote != "`" and pos + 1 < end:
                current.append(pair)
                pos += 2
                continue
            current.append(ch)
            if ch == quote:
                quote = None
            pos += 1
        elif pair in ("--", "/*"):
            # Copy the whole comment through untouched; any ';' inside it is
            # not a statement terminator.
            if pair == "--":
                stop = script.find("\n", pos)
                stop = end if stop == -1 else stop
            else:
                stop = script.find("*/", pos + 2)
                stop = end if stop == -1 else stop + 2
            current.append(script[pos:stop])
            pos = stop
        elif ch == ";":
            if has_code:
                statements.append("".join(current).strip())
            current, has_code = [], False
            pos += 1
        else:
            if ch in "'\"`":
                quote = ch
            if not ch.isspace():
                has_code = True
            current.append(ch)
            pos += 1

    # The last statement may legitimately lack a trailing ';'.
    if has_code:
        statements.append("".join(current).strip())
    return statements


# Read the whole script first so the file handle is closed before any SQL runs.
with open("../utils/environment_setup.sql", "r", encoding="utf-8") as f:
    setup_sql = f.read()

# Execute the setup statements one at a time, in file order.
for statement in _split_sql_statements(setup_sql):
    spark.sql(statement)

print("✅ Environment verified via utils/environment_setup.sql")

In [0]:
# Produce a batch of 500 synthetic financial transactions.
# (Per the original note these land in the volume; the generator itself lives
# in utils/generate_finance_data.)
generate_finance_data(500)

# List the raw folder so the newly written files can be eyeballed.
raw_listing = dbutils.fs.ls(f"{BASE_PATH}finance/raw/")
display(raw_listing)

In [0]:
# Runs the three pipeline stages in order; each stage reads the table written
# by the stage before it. The stage functions are defined in src.engine.

# 1. BRONZE: ingest the raw files into the bronze table.
# (The original note describes this as raw JSON loaded with Auto Loader.)
run_bronze(spark, f"{BASE_PATH}finance/raw/", TABLE_BRONZE, CHECKPOINT_BRONZE)

# 2. SILVER: enrich bronze with a small in-memory partner lookup
# (source_id -> partner_name).
partners_df = spark.createDataFrame(
    [
        (101, "BMO_Global"),
        (102, "Air_Miles_Rewards"),
        (103, "Mastercard_Promo"),
    ],
    ["source_id", "partner_name"],
)

run_silver(spark, TABLE_BRONZE, TABLE_SILVER, partners_df)

# 3. GOLD: aggregate per partner -- sum of amount and count of event_id.
run_gold(spark, TABLE_SILVER, TABLE_GOLD, ["partner_name"], {"amount": "sum", "event_id": "count"})

print("🚀 Pipeline Execution Complete.")

In [0]:
# Load the Gold table and render it in the notebook output.
results_df = spark.table(TABLE_GOLD)
display(results_df)

# Optional: uncomment to inspect the table's data lineage / history.
# display(spark.sql(f"DESCRIBE HISTORY {TABLE_GOLD}"))