# DS-2002 Project 2 Capstone â€” Dimensional Data Lakehouse (Bronze/Silver/Gold)

This Databricks notebook implements a **dimensional Data Lakehouse** for an **Insurance Billing** business process.


In [0]:
spark.range(5).show()


+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [0]:

import os
import json
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Configure  DBFS paths

DBFS_BASE = "dbfs:/Volumes/workspace/ds2002_project2/ds2002/"
dbfs_csv_path = f"{DBFS_BASE}/insurance.csv"

BRONZE_PATH = f"{DBFS_BASE}/bronze"
SILVER_PATH = f"{DBFS_BASE}/silver"
GOLD_PATH   = f"{DBFS_BASE}/gold"
CHECKPOINTS = f"{DBFS_BASE}/_checkpoints"
STREAM_INPUT = f"{DBFS_BASE}/stream_input"  # where we will write 3 JSON batches for Auto Loader

spark.sql("CREATE DATABASE IF NOT EXISTS ds2002_project2")
spark.sql("USE ds2002_project2")


SQL_DB   = "northwind_dw2"                 
SQL_PWD  = "BubbleGuppy369"
SQL_HOST = "ds2002-sql-aisha.database.windows.net"   
SQL_DB     = "ds2002_project2"
SQL_USER   = "ds2002admin"


MONGO_URI  = "mongodb+srv://aishanipatnaik16_db_user:5CEkBdFkCxQxNUoB@cluster0.ygqk0ar.mongodb.net/?appName=Cluster0"  

#MYSQL_JDBC_URL = f"jdbc:mysql://{SQL_HOST}:3306/{SQL_DB}?useSSL=true&requireSSL=true&serverTimezone=UTC"

print("DBFS_BASE:", DBFS_BASE)

insurance_df = (spark.read.option("header", True).option("inferSchema", True).csv(dbfs_csv_path))
display(insurance_df.limit(5))



DBFS_BASE: dbfs:/Volumes/workspace/ds2002_project2/ds2002/


age,sex,bmi,children,smoker,region,charges
19,female,27.9,0,yes,southwest,16884.924
18,male,33.77,1,no,southeast,1725.5523
28,male,33.0,3,no,southeast,4449.462
33,male,22.705,0,no,northwest,21984.47061
32,male,28.88,0,no,northwest,3866.8552


In [0]:
SQL_JDBC_URL = (
  f"jdbc:sqlserver://{SQL_HOST}:1433;"
  f"database={SQL_DB};"
  "encrypt=true;"
  "trustServerCertificate=false;"
  "hostNameInCertificate=*.database.windows.net;"
  "loginTimeout=30;"
)

dim_date = (
  spark.read.format("jdbc")
  .option("url", SQL_JDBC_URL)
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .option("dbtable", "dbo.DimDate")     # <-- adjust if your table name differs
  .option("user", SQL_USER)
  .option("password", SQL_PWD)
  .load()
)

display(dim_date.limit(5))
print(dim_date.count())


DateKey,Date,Day,DaySuffix,Weekday,WeekDayName,WeekDayName_Short,WeekDayName_FirstLetter,DOWInMonth,DayOfYear,WeekOfMonth,WeekOfYear,Month,MonthName,MonthName_Short,MonthName_FirstLetter,Quarter,QuarterName,Year,MMYYYY,MonthYear,IsWeekend,IsHoliday,HolidayName,SpecialDays,FinancialYear,FinancialQuater,FinancialMonth,FirstDateofYear,LastDateofYear,FirstDateofQuater,LastDateofQuater,FirstDateofMonth,LastDateofMonth,FirstDateofWeek,LastDateofWeek,CurrentYear,CurrentQuater,CurrentMonth,CurrentWeek,CurrentDay
20000101,2000-01-01,1,st,7,Saturday,SAT,S,1,1,1,1,1,January,JAN,J,1,First,2000,12000,2000JAN,True,False,,,,,,2000-01-01,2000-12-31,2025-10-01,2025-12-31,2000-01-01,2000-01-31,1999-12-26,2000-01-01,-25,-103,-311,-1354,-9475
20000102,2000-01-02,2,nd,1,Sunday,SUN,S,2,2,2,2,1,January,JAN,J,1,First,2000,12000,2000JAN,True,False,,,,,,2000-01-01,2000-12-31,2025-10-01,2025-12-31,2000-01-01,2000-01-31,2000-01-02,2000-01-08,-25,-103,-311,-1353,-9474
20000103,2000-01-03,3,rd,2,Monday,MON,M,3,3,2,2,1,January,JAN,J,1,First,2000,12000,2000JAN,False,False,,,,,,2000-01-01,2000-12-31,2025-10-01,2025-12-31,2000-01-01,2000-01-31,2000-01-02,2000-01-08,-25,-103,-311,-1353,-9473
20000104,2000-01-04,4,th,3,Tuesday,TUE,T,4,4,2,2,1,January,JAN,J,1,First,2000,12000,2000JAN,False,False,,,,,,2000-01-01,2000-12-31,2025-10-01,2025-12-31,2000-01-01,2000-01-31,2000-01-02,2000-01-08,-25,-103,-311,-1353,-9472
20000105,2000-01-05,5,th,4,Wednesday,WED,W,5,5,2,2,1,January,JAN,J,1,First,2000,12000,2000JAN,False,False,,,,,,2000-01-01,2000-12-31,2025-10-01,2025-12-31,2000-01-01,2000-01-31,2000-01-02,2000-01-08,-25,-103,-311,-1353,-9471


4017


In [0]:
test = (spark.read.format("jdbc")
  .option("url", SQL_JDBC_URL)
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .option("query", "SELECT 1 AS ok")
  .option("user", SQL_USER)
  .option("password", SQL_PWD)
  .load()
)
display(test)


ok
1


## 1)  DBFS Files (CSV): 

 Use provided `insurance.csv` as a **file-system source** and create dimensions:
- `dim_member`
- `dim_region`
- `dim_risk_profile`

In [0]:
insurance_local = spark.table("default.insurance_raw")
display(insurance_local.limit(5))
print("Rows:", insurance_local.count())



age,sex,bmi,children,smoker,region,charges
19,female,27.9,0,yes,southwest,16884.924
18,male,33.77,1,no,southeast,1725.5523
28,male,33.0,3,no,southeast,4449.462
33,male,22.705,0,no,northwest,21984.47061
32,male,28.88,0,no,northwest,3866.8552


Rows: 1338


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Treat common "missing" strings as nulls
clean = (
    insurance_df
    .select(
        F.when(F.lower(F.col("age").cast("string")).isin("nan", "null", ""), None).otherwise(F.col("age")).alias("age"),
        F.col("sex").cast("string").alias("sex"),
        F.when(F.lower(F.col("bmi").cast("string")).isin("nan", "null", ""), None).otherwise(F.col("bmi")).alias("bmi"),
        F.when(F.lower(F.col("children").cast("string")).isin("nan", "null", ""), None).otherwise(F.col("children")).alias("children"),
        F.col("smoker").cast("string").alias("smoker"),
        F.col("region").cast("string").alias("region"),
        # keep charges if you need it later
        F.when(F.lower(F.col("charges").cast("string")).isin("nan", "null", ""), None).otherwise(F.col("charges")).alias("charges"),
    )
)

members = insurance_df.withColumn("member_id", F.monotonically_increasing_id())

members = (
    insurance_df
    .withColumn("member_id", F.row_number().over(Window.orderBy(F.monotonically_increasing_id())))
    .select(
        "member_id",
        F.expr("try_cast(age as int)").alias("age"),
        F.col("sex").cast("string").alias("sex"),
        F.expr("try_cast(bmi as double)").alias("bmi"),
        F.expr("try_cast(children as int)").alias("children"),
        F.col("smoker").cast("string").alias("smoker"),
        F.col("region").cast("string").alias("region"),
    )
)

display(
    insurance_df
    .select("age", "children", "bmi")
    .where(F.lower(F.col("age").cast("string")) == "nan")
    .limit(10)
)


age,children,bmi




In [0]:
from pyspark.sql import functions as F

# Build file-derived dimensions (robust to malformed values like 'nan')

members = (
    insurance_df
    # no Window => no warning
    .withColumn("member_id", F.monotonically_increasing_id())
    .select(
        F.col("member_id").cast("long").alias("member_id"),
        F.expr("try_cast(age as int)").alias("age"),
        F.col("sex").cast("string").alias("sex"),
        F.expr("try_cast(bmi as double)").alias("bmi"),
        F.expr("try_cast(children as int)").alias("children"),
        F.col("smoker").cast("string").alias("smoker"),
        F.col("region").cast("string").alias("region"),
    )
)

dim_member = members.dropDuplicates(["member_id"])

dim_region = (
    members.select(F.col("region").alias("region_name"))
    .dropDuplicates()
    .withColumn("region_key", F.row_number().over(Window.orderBy("region_name")))
    .select("region_key", "region_name")
)

dim_member = (
    dim_member.join(dim_region, dim_member.region == dim_region.region_name, "left")
    .drop("region")
    .withColumnRenamed("region_name", "region")
)

dim_risk_profile = (
    dim_member
    .select("sex", "smoker", "region_key")
    .dropDuplicates()
    .withColumn("risk_profile_key", F.row_number().over(Window.orderBy("sex","smoker","region_key")))
)

dim_member.write.format("delta").mode("overwrite").saveAsTable("dim_member")
dim_region.write.format("delta").mode("overwrite").saveAsTable("dim_region")
dim_risk_profile.write.format("delta").mode("overwrite").saveAsTable("dim_risk_profile")

display(spark.table("dim_member").limit(10))
display(spark.table("dim_region"))
display(spark.table("dim_risk_profile"))




member_id,age,sex,bmi,children,smoker,region_key,region
271,50,male,34.2,2,yes,4,southwest
372,42,female,33.155,1,no,1,northeast
722,62,male,37.4,0,no,4,southwest
212,24,male,28.5,2,no,2,northwest
1191,41,female,21.755,1,no,1,northeast
234,39,male,24.51,2,no,2,northwest
362,19,female,21.7,0,yes,4,southwest
1121,46,male,38.17,2,no,3,southeast
331,52,male,27.36,0,yes,2,northwest
547,54,female,46.7,2,no,4,southwest


region_key,region_name
1,northeast
2,northwest
3,southeast
4,southwest


sex,smoker,region_key,risk_profile_key
female,no,1,1
female,no,2,2
female,no,3,3
female,no,4,4
female,yes,1,5
female,yes,2,6
female,yes,3,7
female,yes,4,8
male,no,1,9
male,no,2,10


## 2) Azure SQL (Relational): read the **SQL-created** Date dimension
__

In [0]:
# Read Date dimension from Azure SQL Database (SQL Server)

dim_date_sql = (
  spark.read.format("jdbc")
  .option("url", SQL_JDBC_URL) 
  .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  .option("dbtable", "dbo.DimDate")   
  .option("user", SQL_USER)
  .option("password", SQL_PWD)
  .load()
)

display(dim_date_sql.limit(10))
print("dim_date rows:", dim_date_sql.count())

dim_date_sql.write.format("delta").mode("overwrite").saveAsTable("dim_date")


DateKey,Date,Day,DaySuffix,Weekday,WeekDayName,WeekDayName_Short,WeekDayName_FirstLetter,DOWInMonth,DayOfYear,WeekOfMonth,WeekOfYear,Month,MonthName,MonthName_Short,MonthName_FirstLetter,Quarter,QuarterName,Year,MMYYYY,MonthYear,IsWeekend,IsHoliday,HolidayName,SpecialDays,FinancialYear,FinancialQuater,FinancialMonth,FirstDateofYear,LastDateofYear,FirstDateofQuater,LastDateofQuater,FirstDateofMonth,LastDateofMonth,FirstDateofWeek,LastDateofWeek,CurrentYear,CurrentQuater,CurrentMonth,CurrentWeek,CurrentDay
20000101,2000-01-01,1,st,7,Saturday,SAT,S,1,1,1,1,1,January,JAN,J,1,First,2000,12000,2000JAN,True,False,,,,,,2000-01-01,2000-12-31,2025-10-01,2025-12-31,2000-01-01,2000-01-31,1999-12-26,2000-01-01,-25,-103,-311,-1354,-9475
20000102,2000-01-02,2,nd,1,Sunday,SUN,S,2,2,2,2,1,January,JAN,J,1,First,2000,12000,2000JAN,True,False,,,,,,2000-01-01,2000-12-31,2025-10-01,2025-12-31,2000-01-01,2000-01-31,2000-01-02,2000-01-08,-25,-103,-311,-1353,-9474
20000103,2000-01-03,3,rd,2,Monday,MON,M,3,3,2,2,1,January,JAN,J,1,First,2000,12000,2000JAN,False,False,,,,,,2000-01-01,2000-12-31,2025-10-01,2025-12-31,2000-01-01,2000-01-31,2000-01-02,2000-01-08,-25,-103,-311,-1353,-9473
20000104,2000-01-04,4,th,3,Tuesday,TUE,T,4,4,2,2,1,January,JAN,J,1,First,2000,12000,2000JAN,False,False,,,,,,2000-01-01,2000-12-31,2025-10-01,2025-12-31,2000-01-01,2000-01-31,2000-01-02,2000-01-08,-25,-103,-311,-1353,-9472
20000105,2000-01-05,5,th,4,Wednesday,WED,W,5,5,2,2,1,January,JAN,J,1,First,2000,12000,2000JAN,False,False,,,,,,2000-01-01,2000-12-31,2025-10-01,2025-12-31,2000-01-01,2000-01-31,2000-01-02,2000-01-08,-25,-103,-311,-1353,-9471
20000106,2000-01-06,6,th,5,Thursday,THU,T,6,6,2,2,1,January,JAN,J,1,First,2000,12000,2000JAN,False,False,,,,,,2000-01-01,2000-12-31,2025-10-01,2025-12-31,2000-01-01,2000-01-31,2000-01-02,2000-01-08,-25,-103,-311,-1353,-9470
20000107,2000-01-07,7,th,6,Friday,FRI,F,7,7,2,2,1,January,JAN,J,1,First,2000,12000,2000JAN,False,False,,,,,,2000-01-01,2000-12-31,2025-10-01,2025-12-31,2000-01-01,2000-01-31,2000-01-02,2000-01-08,-25,-103,-311,-1353,-9469
20000108,2000-01-08,8,th,7,Saturday,SAT,S,8,8,2,2,1,January,JAN,J,1,First,2000,12000,2000JAN,True,False,,,,,,2000-01-01,2000-12-31,2025-10-01,2025-12-31,2000-01-01,2000-01-31,2000-01-02,2000-01-08,-25,-103,-311,-1353,-9468
20000109,2000-01-09,9,th,1,Sunday,SUN,S,9,9,3,3,1,January,JAN,J,1,First,2000,12000,2000JAN,True,False,,,,,,2000-01-01,2000-12-31,2025-10-01,2025-12-31,2000-01-01,2000-01-31,2000-01-09,2000-01-15,-25,-103,-311,-1352,-9467
20000110,2000-01-10,10,th,2,Monday,MON,M,10,10,3,3,1,January,JAN,J,1,First,2000,12000,2000JAN,False,False,,,,,,2000-01-01,2000-12-31,2025-10-01,2025-12-31,2000-01-01,2000-01-31,2000-01-09,2000-01-15,-25,-103,-311,-1352,-9466


dim_date rows: 4017


## 3)MongoDB Atlas (NoSQL): seed + read a dimension




**With Databricks and MongoDB free accounts I am having a lot of trouble connecting the two... I tried to connect them normally, and through an API, and neither is available for my accounts.**

I used providers.json through MongoDB and cleaned it to providers_array.json through which I uploaded the file into Databricks. I know it is not ideal, but this was the only way I was able to do so without having so subscribe to a premium account for either MongoDB or Databricks. 



In [0]:
spark.sql("DROP TABLE IF EXISTS dim_provider")


mongo_json_path = "dbfs:/Volumes/workspace/ds2002_project2/ds2002/providers_array.json"
providers_df = spark.read.option("multiline","true").json(mongo_json_path)
display(providers_df)
print("rows:", providers_df.count())

from pyspark.sql import functions as F
from pyspark.sql.window import Window

w = Window.orderBy("region", "preferred_provider", "network_tier")
dim_provider = providers_df.withColumn("provider_key", F.row_number().over(w))

dim_provider.write.format("delta").mode("overwrite").saveAsTable("dim_provider")
display(dim_provider)




network_tier,preferred_provider,region
Tier 1,BlueShield SW,southwest
Tier 2,CarePlus SE,southeast
Tier 1,Cascade Health NW,northwest
Tier 3,Pinetree NE,northeast


rows: 4




network_tier,preferred_provider,region,provider_key
Tier 3,Pinetree NE,northeast,1
Tier 1,Cascade Health NW,northwest,2
Tier 2,CarePlus SE,southeast,3
Tier 1,BlueShield SW,southwest,4


This JSON file is an export of the MongoDB Atlas collection used as a NoSQL dimension source. Databricks Free/Serverless does not support installing the MongoDB Spark connector, so the extract was exported from Atlas and loaded into the Lakehouse volume.


In [0]:
from pyspark.sql import Row

providers = [
    Row(provider_code="P001", provider_name="Blue Ridge Health", plan_tier="Gold",   active=True),
    Row(provider_code="P002", provider_name="Cville Care Network", plan_tier="Silver", active=True),
    Row(provider_code="P003", provider_name="Piedmont Clinic",     plan_tier="Bronze", active=True),
]

providers_df = spark.createDataFrame(providers)
display(providers_df)


provider_code,provider_name,plan_tier,active
P001,Blue Ridge Health,Gold,True
P002,Cville Care Network,Silver,True
P003,Piedmont Clinic,Bronze,True


These cells below were when I was trying to troubleshoot connecting MongoDB and Databricks.

In [0]:
#mongo_db = "ds2002_project2"
#mongo_coll = "dim_provider"

'''

(providers_df.write.format("mongodb")
  .mode("overwrite")   # overwrites the collection contents
  .option("uri", MONGO_URI)
  .option("database", mongo_db)
  .option("collection", mongo_coll)
  .save()
)
'''


'\n\n(providers_df.write.format("mongodb")\n  .mode("overwrite")   # overwrites the collection contents\n  .option("uri", MONGO_URI)\n  .option("database", mongo_db)\n  .option("collection", mongo_coll)\n  .save()\n)\n'

In [0]:

'''
# 3.1 Seed MongoDB with a dimension collection (optional but guarantees 'actual data in MongoDB')
from pymongo import MongoClient

mongo_db = "ds2002_project2"
mongo_coll = "dim_provider"

client = MongoClient(MONGO_URI)
db = client[mongo_db]
coll = db[mongo_coll]

providers = [
  {"provider_code": "P001", "provider_name": "Blue Ridge Health",   "plan_tier": "Gold",   "active": True},
  {"provider_code": "P002", "provider_name": "Cville Care Network", "plan_tier": "Silver", "active": True},
  {"provider_code": "P003", "provider_name": "Piedmont Clinic",     "plan_tier": "Bronze", "active": True},
]

coll.delete_many({})
coll.insert_many(providers)
print("Mongo seeded docs:", coll.count_documents({}))

'''


'\n# 3.1 Seed MongoDB with a dimension collection (optional but guarantees \'actual data in MongoDB\')\nfrom pymongo import MongoClient\n\nmongo_db = "ds2002_project2"\nmongo_coll = "dim_provider"\n\nclient = MongoClient(MONGO_URI)\ndb = client[mongo_db]\ncoll = db[mongo_coll]\n\nproviders = [\n  {"provider_code": "P001", "provider_name": "Blue Ridge Health",   "plan_tier": "Gold",   "active": True},\n  {"provider_code": "P002", "provider_name": "Cville Care Network", "plan_tier": "Silver", "active": True},\n  {"provider_code": "P003", "provider_name": "Piedmont Clinic",     "plan_tier": "Bronze", "active": True},\n]\n\ncoll.delete_many({})\ncoll.insert_many(providers)\nprint("Mongo seeded docs:", coll.count_documents({}))\n\n'

In [0]:

'''
# 3.2 Read MongoDB collection into Spark
dim_provider = (
    spark.read.format("mongodb")
    .option("spark.mongodb.read.connection.uri", MONGO_URI)
    .option("database", mongo_db)
    .option("collection", mongo_coll)
    .load()
)

dim_provider = (
    dim_provider
    .withColumn("provider_key", F.row_number().over(Window.orderBy("provider_code")))
    .select("provider_key", "provider_code", "provider_name", "plan_tier", "active")
)

dim_provider.write.format("delta").mode("overwrite").saveAsTable("dim_provider")
display(spark.table("dim_provider")) 
'''


'\n# 3.2 Read MongoDB collection into Spark\ndim_provider = (\n    spark.read.format("mongodb")\n    .option("spark.mongodb.read.connection.uri", MONGO_URI)\n    .option("database", mongo_db)\n    .option("collection", mongo_coll)\n    .load()\n)\n\ndim_provider = (\n    dim_provider\n    .withColumn("provider_key", F.row_number().over(Window.orderBy("provider_code")))\n    .select("provider_key", "provider_code", "provider_name", "plan_tier", "active")\n)\n\ndim_provider.write.format("delta").mode("overwrite").saveAsTable("dim_provider")\ndisplay(spark.table("dim_provider")) \n'

## 4)  3 JSON batches and ingest with Auto Loader (Bronze)

Build a fact-event stream from the insurance data and write 3 JSON batches into DBFS folders:
- `stream_input/interval=1`
- `stream_input/interval=2`
- `stream_input/interval=3`


In [0]:
insurance_df = spark.table("default.insurance_raw")
print("insurance_df rows:", insurance_df.count())
insurance_df.printSchema()


insurance_df rows: 1338
root
 |-- age: long (nullable = true)
 |-- sex: string (nullable = true)
 |-- bmi: double (nullable = true)
 |-- children: string (nullable = true)
 |-- smoker: string (nullable = true)
 |-- region: string (nullable = true)
 |-- charges: double (nullable = true)



In [0]:

# 4.1 Build a fact-event source from insurance rows
w = Window.orderBy(F.monotonically_increasing_id())

fact_source = (
    insurance_df
    .withColumn("member_id", F.row_number().over(w))
    .withColumn("event_id", F.concat(F.lit("E"), F.lpad(F.col("member_id").cast("string"), 6, "0")))
    .withColumn("charge_amount", F.col("charges").cast("double"))
    .withColumn("event_date", F.date_add(F.to_date(F.lit("2025-01-01")), (F.col("member_id") % F.lit(365)).cast("int")))
    .withColumn(
        "provider_code",
        F.when((F.col("member_id") % 3) == 1, F.lit("P001"))
         .when((F.col("member_id") % 3) == 2, F.lit("P002"))
         .otherwise(F.lit("P003"))
    )
    .select("event_id", "member_id", "provider_code", "event_date", "charge_amount")
)

display(fact_source.limit(10))
print("Fact rows:", fact_source.count())




event_id,member_id,provider_code,event_date,charge_amount
E000001,1,P001,2025-01-02,16884.924
E000002,2,P002,2025-01-03,1725.5523
E000003,3,P003,2025-01-04,4449.462
E000004,4,P001,2025-01-05,21984.47061
E000005,5,P002,2025-01-06,3866.8552
E000006,6,P003,2025-01-07,3756.6216
E000007,7,P001,2025-01-08,8240.5896
E000008,8,P002,2025-01-09,7281.5056
E000009,9,P003,2025-01-10,6406.4107
E000010,10,P001,2025-01-11,28923.13692


Fact rows: 1338


In [0]:


# 4.2 Write 3 JSON intervals to DBFS for Auto Loader
dbutils.fs.mkdirs(STREAM_INPUT)

total = fact_source.count()
third = total // 3

batch1 = fact_source.limit(third)
batch2 = fact_source.subtract(batch1).limit(third)
batch3 = fact_source.subtract(batch1).subtract(batch2)

for i, b in enumerate([batch1, batch2, batch3], start=1):
    out = f"{STREAM_INPUT}/interval={i}"
    dbutils.fs.rm(out, True)
    b.coalesce(1).write.mode("overwrite").json(out)
    print("Wrote:", out)

display(dbutils.fs.ls(STREAM_INPUT))




Wrote: dbfs:/Volumes/workspace/ds2002_project2/ds2002//stream_input/interval=1
Wrote: dbfs:/Volumes/workspace/ds2002_project2/ds2002//stream_input/interval=2
Wrote: dbfs:/Volumes/workspace/ds2002_project2/ds2002//stream_input/interval=3


path,name,size,modificationTime
dbfs:/Volumes/workspace/ds2002_project2/ds2002/stream_input/interval=1/,interval=1/,0,1765501228008
dbfs:/Volumes/workspace/ds2002_project2/ds2002/stream_input/interval=2/,interval=2/,0,1765501228008
dbfs:/Volumes/workspace/ds2002_project2/ds2002/stream_input/interval=3/,interval=3/,0,1765501228008


### 4A) Auto Loader â†’ Bronze Delta table

In [0]:
spark.sql("USE CATALOG workspace")
spark.sql("USE SCHEMA ds2002_project2")


DataFrame[]

In [0]:
spark.sql("SELECT current_catalog() AS catalog, current_schema() AS schema").show(truncate=False)
spark.sql("SHOW CATALOGS").show(truncate=False)


+---------+---------------+
|catalog  |schema         |
+---------+---------------+
|workspace|ds2002_project2|
+---------+---------------+

+---------+
|catalog  |
+---------+
|samples  |
|system   |
|workspace|
+---------+



In [0]:
bronze_table = "workspace.ds2002_project2.fact_insurance_bronze"

# clean checkpoint 
dbutils.fs.rm(f"{CHECKPOINTS}/fact_insurance_bronze", True)

bronze_stream = (
  spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", f"{CHECKPOINTS}/fact_insurance_bronze/schema")
    .option("cloudFiles.includeExistingFiles", "true")
    .option("recursiveFileLookup", "true")     
    .load(STREAM_INPUT)
)

(
  bronze_stream.writeStream
    .format("delta")
    .option("checkpointLocation", f"{CHECKPOINTS}/fact_insurance_bronze")
    .trigger(availableNow=True)
    .toTable(bronze_table)
)

display(spark.table(bronze_table).limit(10))
print("Bronze rows:", spark.table(bronze_table).count())


charge_amount,event_date,event_id,member_id,provider_code,_rescued_data
10422.91665,2025-06-13,E000893,893,P002,
44202.6536,2025-06-14,E000894,894,P003,
13555.0049,2025-06-15,E000895,895,P001,
13063.883,2025-06-16,E000896,896,P002,
19798.05455,2025-06-17,E000897,897,P003,
2221.56445,2025-06-18,E000898,898,P001,
1634.5734,2025-06-19,E000899,899,P002,
2117.33885,2025-06-20,E000900,900,P003,
8688.85885,2025-06-21,E000901,901,P001,
48673.5588,2025-06-22,E000902,902,P002,


Bronze rows: 1338


In [0]:
dbutils.fs.rm(f"{CHECKPOINTS}/{bronze_table}", True)
dbutils.fs.rm(BRONZE_PATH, True)  
spark.sql(f"DROP TABLE IF EXISTS {bronze_table}")


DataFrame[]

In [0]:
files = [f.path for f in dbutils.fs.ls(f"{STREAM_INPUT}/interval=1") if "part-" in f.path]
print(files[0])
display(dbutils.fs.head(files[0], 200))


dbfs:/Volumes/workspace/ds2002_project2/ds2002/stream_input/interval=1/part-00000-tid-738299328326227665-242cfa3f-518b-465f-a673-ddb64c62caf9-306-1-c000.json
[Truncated to first 200 bytes]


'{"event_id":"E000001","member_id":1,"provider_code":"P001","event_date":"2025-01-02","charge_amount":16884.924}\n{"event_id":"E000002","member_id":2,"provider_code":"P002","event_date":"2025-01-03","ch'

In [0]:
display(dbutils.fs.ls(f"{STREAM_INPUT}/interval=1"))
display(dbutils.fs.ls(f"{STREAM_INPUT}/interval=2"))
display(dbutils.fs.ls(f"{STREAM_INPUT}/interval=3"))


path,name,size,modificationTime
dbfs:/Volumes/workspace/ds2002_project2/ds2002/stream_input/interval=1/_SUCCESS,_SUCCESS,0,1765501223000
dbfs:/Volumes/workspace/ds2002_project2/ds2002/stream_input/interval=1/_committed_738299328326227665,_committed_738299328326227665,113,1765501223000
dbfs:/Volumes/workspace/ds2002_project2/ds2002/stream_input/interval=1/_started_738299328326227665,_started_738299328326227665,0,1765501222000
dbfs:/Volumes/workspace/ds2002_project2/ds2002/stream_input/interval=1/part-00000-tid-738299328326227665-242cfa3f-518b-465f-a673-ddb64c62caf9-306-1-c000.json,part-00000-tid-738299328326227665-242cfa3f-518b-465f-a673-ddb64c62caf9-306-1-c000.json,50931,1765501222000


path,name,size,modificationTime
dbfs:/Volumes/workspace/ds2002_project2/ds2002/stream_input/interval=2/_SUCCESS,_SUCCESS,0,1765501225000
dbfs:/Volumes/workspace/ds2002_project2/ds2002/stream_input/interval=2/_committed_397104368856291486,_committed_397104368856291486,113,1765501225000
dbfs:/Volumes/workspace/ds2002_project2/ds2002/stream_input/interval=2/_started_397104368856291486,_started_397104368856291486,0,1765501225000
dbfs:/Volumes/workspace/ds2002_project2/ds2002/stream_input/interval=2/part-00000-tid-397104368856291486-3267f4f5-1e4c-48f3-b549-545fd4b9d278-311-1-c000.json,part-00000-tid-397104368856291486-3267f4f5-1e4c-48f3-b549-545fd4b9d278-311-1-c000.json,51021,1765501225000


path,name,size,modificationTime
dbfs:/Volumes/workspace/ds2002_project2/ds2002/stream_input/interval=3/_SUCCESS,_SUCCESS,0,1765501228000
dbfs:/Volumes/workspace/ds2002_project2/ds2002/stream_input/interval=3/_committed_8357391933902772417,_committed_8357391933902772417,114,1765501228000
dbfs:/Volumes/workspace/ds2002_project2/ds2002/stream_input/interval=3/_started_8357391933902772417,_started_8357391933902772417,0,1765501227000
dbfs:/Volumes/workspace/ds2002_project2/ds2002/stream_input/interval=3/part-00000-tid-8357391933902772417-f0ce2018-bac2-463f-800a-12e5b92d973c-317-1-c000.json,part-00000-tid-8357391933902772417-f0ce2018-bac2-463f-800a-12e5b92d973c-317-1-c000.json,51417,1765501228000


## 5) Silver: join streaming facts to static dimensions
 join:
- Bronze facts
- dim_member (CSV)
- dim_provider (MongoDB - to JSON)
- dim_date (MySQL)


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window



providers_clean = (
    providers_df
      .select(
          "provider_code",
          F.col("provider_name").alias("preferred_provider"),  # renamed
          F.col("plan_tier").alias("network_tier"),            # renamed
          F.lit(None).cast("string").alias("region")
      )
)

w = Window.orderBy("provider_code")

dim_provider = providers_clean.withColumn("provider_key", F.row_number().over(w))

(dim_provider.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("dim_provider")
)

display(dim_provider)




provider_code,preferred_provider,network_tier,region,provider_key
P001,Blue Ridge Health,Gold,,1
P002,Cville Care Network,Silver,,2
P003,Piedmont Clinic,Bronze,,3


In [0]:
spark.sql("DESCRIBE TABLE dim_provider").show()


In [0]:

spark.sql("USE CATALOG workspace")
spark.sql("USE SCHEMA ds2002_project2")


DataFrame[]

In [0]:
spark.sql("SELECT current_catalog() AS catalog, current_schema() AS schema").show(truncate=False)


+---------+---------------+
|catalog  |schema         |
+---------+---------------+
|workspace|ds2002_project2|
+---------+---------------+



In [0]:
spark.sql("SHOW TABLES IN workspace.ds2002_project2").show(truncate=False)


+---------------+---------------------+-----------+
|database       |tableName            |isTemporary|
+---------------+---------------------+-----------+
|ds2002_project2|dim_date             |false      |
|ds2002_project2|dim_member           |false      |
|ds2002_project2|dim_provider         |false      |
|ds2002_project2|dim_region           |false      |
|ds2002_project2|dim_risk_profile     |false      |
|ds2002_project2|fact_insurance_bronze|false      |
+---------------+---------------------+-----------+



In [0]:
from pyspark.sql import functions as F

silver_table = "fact_insurance_silver"
bronze_table = "fact_insurance_bronze"   # <--- match Part 4

# clean up old outputs
spark.sql(f"DROP TABLE IF EXISTS {silver_table}")
dbutils.fs.rm(SILVER_PATH, True)
dbutils.fs.rm(f"{CHECKPOINTS}/{silver_table}", True)

# alias dimension tables
b = spark.readStream.table(bronze_table).alias("b")
m = spark.table("dim_member").alias("m")
p = spark.table("dim_provider").alias("p")
d = spark.table("dim_date").alias("d")

silver_df = (
    b
    .withColumn("event_date_dt", F.to_date(F.col("b.event_date")))
    .withColumn("charge_amount_dbl", F.col("b.charge_amount").cast("double"))
    .withColumn("member_id_bigint", F.col("b.member_id").cast("bigint"))

    .join(m, F.col("member_id_bigint") == F.col("m.member_id"), "left")
    .join(p, F.col("b.provider_code") == F.col("p.provider_code"), "left")
    .join(d, F.col("event_date_dt") == F.col("d.Date"), "left")

    .select(
        F.col("b.event_id").alias("event_id"),
        F.col("member_id_bigint").alias("member_id"),
        F.col("b.provider_code").alias("provider_code"),
        F.col("event_date_dt").alias("event_date"),
        F.col("charge_amount_dbl").alias("charge_amount"),

        F.col("m.sex"),
        F.col("m.smoker"),
        F.col("m.region").alias("region"),
        F.col("m.region_key"),

        F.col("p.provider_key"),
        F.col("p.network_tier"),
        F.col("p.preferred_provider"),

        F.col("d.DateKey").alias("date_key"),
    )
    .withColumn("fact_key", F.xxhash64("event_id"))
)


(
    silver_df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", f"{CHECKPOINTS}/{silver_table}")
        .trigger(availableNow=True)
        .start(SILVER_PATH)
        .awaitTermination()
)

# register the Delta folder as a table
spark.sql(
    f"""
    CREATE TABLE IF NOT EXISTS {silver_table}
    AS SELECT * FROM delta.`{SILVER_PATH}`
    """
)

display(spark.table(silver_table).limit(10))
print("Silver rows:", spark.table(silver_table).count())


event_id,member_id,provider_code,event_date,charge_amount,sex,smoker,region,region_key,provider_key,network_tier,preferred_provider,date_key,fact_key
E000893,893,P002,2025-06-13,10422.91665,male,yes,southeast,3,2,Silver,Cville Care Network,,604327308462913709
E000894,894,P003,2025-06-14,44202.6536,male,no,northeast,1,3,Bronze,Piedmont Clinic,,4976226016618428740
E000895,895,P001,2025-06-15,13555.0049,female,no,southwest,4,1,Gold,Blue Ridge Health,,1383589627973978027
E000896,896,P002,2025-06-16,13063.883,female,yes,northeast,1,2,Silver,Cville Care Network,,8655060690952290032
E000897,897,P003,2025-06-17,19798.05455,male,no,northwest,2,3,Bronze,Piedmont Clinic,,3179674761482581259
E000898,898,P001,2025-06-18,2221.56445,female,no,southeast,3,1,Gold,Blue Ridge Health,,-2696011124852635808
E000899,899,P002,2025-06-19,1634.5734,female,no,northwest,2,2,Silver,Cville Care Network,,-6317596956023600576
E000900,900,P003,2025-06-20,2117.33885,male,no,northeast,1,3,Bronze,Piedmont Clinic,,1487743712675079188
E000901,901,P001,2025-06-21,8688.85885,male,yes,southeast,3,1,Gold,Blue Ridge Health,,4721398522253390005
E000902,902,P002,2025-06-22,48673.5588,male,no,northeast,1,2,Silver,Cville Care Network,,6853149787595284017


Silver rows: 2676


## 6) Gold: business-value aggregates

In [0]:

gold_region_smoker = "gold_charges_by_region_smoker"
gold_provider_month = "gold_charges_by_provider_month"

spark.sql(f"DROP TABLE IF EXISTS {gold_region_smoker}")
spark.sql(f"DROP TABLE IF EXISTS {gold_provider_month}")
# dbutils.fs.rm(f"{GOLD_PATH}/region_smoker", True)
# dbutils.fs.rm(f"{GOLD_PATH}/provider_month", True)

silver_static = spark.table("fact_insurance_silver")

# Charges by region + smoker
gold1 = (
    silver_static
    .join(spark.table("dim_region"), "region_key", "left")
    .groupBy("region_name", "smoker")
    .agg(
        F.count("*").alias("num_events"),
        F.sum("charge_amount").alias("total_charges"),
        F.avg("charge_amount").alias("avg_charges")
    )
    .orderBy(F.desc("total_charges"))
)

# Charges by provider + month
gold2 = (
    silver_static
    .withColumn("event_month", F.date_format(F.col("event_date"), "yyyy-MM"))
    .groupBy("preferred_provider", "network_tier", "event_month")
    .agg(
        F.count("*").alias("num_events"),
        F.sum("charge_amount").alias("total_charges"),
        F.avg("charge_amount").alias("avg_charges"),
    )
    .orderBy(F.col("event_month"), F.desc("total_charges"))
)

gold1.write.format("delta").mode("overwrite").saveAsTable(gold_region_smoker)
gold2.write.format("delta").mode("overwrite").saveAsTable(gold_provider_month)

display(spark.table(gold_region_smoker))
display(spark.table(gold_provider_month))




region_name,smoker,num_events,total_charges,avg_charges
northwest,no,534,7813051.834039993,14631.18320981272
southwest,no,534,6935460.56154,12987.753860561796
northeast,no,514,6754114.817737989,13140.30120182488
southeast,no,546,6671444.674719996,12218.763140512812
southeast,yes,182,2497127.3158,13720.479757142855
northeast,yes,134,1743404.0156400006,13010.47772865672
northwest,yes,116,1674557.6644,14435.84193448276
southwest,yes,114,1364206.3770399997,11966.722605614032
,,2,58282.7206,29141.3603


preferred_provider,network_tier,event_month,num_events,total_charges,avg_charges
Cville Care Network,Silver,2025-01,82,1308240.2286200002,15954.149129512198
Blue Ridge Health,Gold,2025-01,82,1164428.88486,14200.352254390244
Piedmont Clinic,Bronze,2025-01,82,1129750.4702800002,13777.444759512198
Blue Ridge Health,Gold,2025-02,76,1250755.2810000002,16457.306328947372
Piedmont Clinic,Bronze,2025-02,74,935656.74168,12644.010022702703
Cville Care Network,Silver,2025-02,74,867226.4632200002,11719.276530000005
Blue Ridge Health,Gold,2025-03,82,902269.845,11003.290792682928
Cville Care Network,Silver,2025-03,84,868074.5617380001,10334.22097307143
Piedmont Clinic,Bronze,2025-03,82,806453.1195400005,9834.794140731712
Cville Care Network,Silver,2025-04,80,1145087.28904,14313.591113


## 7) Batch incremental load 

**batch incremental** append to Bronze, then refresh Silver/Gold in batch.

In [0]:
from pyspark.sql import functions as F

# Fully-qualified tables
bronze_table = "workspace.ds2002_project2.fact_insurance_bronze"
silver_table = "workspace.ds2002_project2.fact_insurance_silver"

gold_region_smoker = "gold_charges_by_region_smoker"
gold_provider_month = "gold_charges_by_provider_month"

# BATCH incremental append to BRONZE
fact_source = spark.table(bronze_table)

incremental = (
    fact_source
    .orderBy(F.desc("member_id"))
    .limit(25)
    .withColumn("event_id", F.concat(F.lit("INC_"), F.col("event_id")))
)

incremental.write.mode("append").saveAsTable(bronze_table)
print("Bronze rows after incremental append:", spark.table(bronze_table).count())

# BATCH refresh SILVER (rebuild)
b = spark.table(bronze_table).alias("b")
m = spark.table("workspace.ds2002_project2.dim_member").alias("m")
p = spark.table("workspace.ds2002_project2.dim_provider").alias("p")
d = spark.table("workspace.ds2002_project2.dim_date").alias("d")

silver_rebuild = (
    b
    .withColumn("member_id_long", F.col("member_id").cast("bigint"))
    .withColumn("event_date_dt", F.to_date("event_date"))
    .withColumn("charge_amount_dbl", F.col("charge_amount").cast("double"))
    .join(m, F.col("member_id_long") == F.col("m.member_id"), "left")
    .join(p, F.col("b.provider_code") == F.col("p.provider_code"), "left")
    .join(d, F.col("event_date_dt") == F.col("d.Date"), "left")
    .select(
        F.col("b.event_id").alias("event_id"),
        F.col("member_id_long").alias("member_id"),
        F.col("b.provider_code").alias("provider_code"),
        F.col("event_date_dt").alias("event_date"),
        F.col("charge_amount_dbl").alias("charge_amount"),

        F.col("m.sex").alias("sex"),
        F.col("m.smoker").alias("smoker"),
        F.col("m.region").alias("region"),
        F.col("m.region_key").alias("region_key"),

        F.col("p.provider_key").alias("provider_key"),
        F.col("p.network_tier").alias("network_tier"),
        F.col("p.preferred_provider").alias("preferred_provider"),

        F.col("d.DateKey").alias("date_key"),
    )
    .withColumn("fact_key", F.xxhash64("event_id"))
)

(silver_rebuild.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(silver_table)
)

print("Silver rows after rebuild:", spark.table(silver_table).count())

# BATCH refresh GOLD (overwrite results)
spark.sql(f"""
INSERT OVERWRITE TABLE {gold_region_smoker}
SELECT
  r.region_name,
  s.smoker,
  COUNT(*) AS num_events,
  SUM(s.charge_amount) AS total_charges,
  AVG(s.charge_amount) AS avg_charges
FROM {silver_table} s
LEFT JOIN workspace.ds2002_project2.dim_region r
  ON s.region_key = r.region_key
GROUP BY r.region_name, s.smoker
""")

spark.sql(f"""
INSERT OVERWRITE TABLE {gold_provider_month}
SELECT
  s.preferred_provider,
  s.network_tier,
  date_format(s.event_date, 'yyyy-MM') AS event_month,
  COUNT(*) AS num_events,
  SUM(s.charge_amount) AS total_charges,
  AVG(s.charge_amount) AS avg_charges
FROM {silver_table} s
GROUP BY s.preferred_provider, s.network_tier, date_format(s.event_date, 'yyyy-MM')
""")

display(spark.table(gold_region_smoker))
display(spark.table(gold_provider_month))


Bronze rows after incremental append: 2726
Silver rows after rebuild: 2726


region_name,smoker,num_events,total_charges,avg_charges
,,2,58282.7206,29141.3603
southwest,no,544,7026812.91994,12916.935514595589
southeast,yes,182,2497127.3158,13720.479757142855
southwest,yes,116,1409031.6740399995,12146.824776206891
northwest,no,540,7852362.334639992,14541.4117308148
northwest,yes,122,1707464.4681999995,13995.610395081963
northeast,no,529,6971063.604887991,13177.813997897903
southeast,no,555,6746311.078719994,12155.515457153142
northeast,yes,136,1752441.6681400004,12885.600501029416


preferred_provider,network_tier,event_month,num_events,total_charges,avg_charges
Cville Care Network,Silver,2025-03,84,868074.5617380001,10334.22097307143
Cville Care Network,Silver,2025-01,82,1308240.2286200002,15954.149129512198
Blue Ridge Health,Gold,2025-04,80,1095617.85632,13695.223204
Piedmont Clinic,Bronze,2025-07,82,988865.9144,12059.340419512191
Cville Care Network,Silver,2025-06,80,1139985.4073,14249.81759125
Piedmont Clinic,Bronze,2025-02,74,935656.74168,12644.010022702703
Cville Care Network,Silver,2025-05,82,907189.0781600004,11063.281440975614
Blue Ridge Health,Gold,2025-08,82,965288.95252,11771.81649414634
Piedmont Clinic,Bronze,2025-05,84,1207208.99974,14371.535711190478
Cville Care Network,Silver,2025-04,80,1145087.28904,14313.591113


## 8) Final validation queries (joins 3+ tables with aggregation)

Demonstrate the business value of the solution.

In [0]:
q1 = """
SELECT
  p.preferred_provider,
  p.network_tier,
  r.region_name,
  COUNT(*) AS num_events,
  ROUND(SUM(CAST(s.charge_amount AS DOUBLE)), 2) AS total_charges,
  ROUND(AVG(CAST(s.charge_amount AS DOUBLE)), 2) AS avg_charges
FROM fact_insurance_silver s
JOIN dim_provider p
  ON s.provider_code = p.provider_code
JOIN dim_region r
  ON s.region_key = r.region_key
WHERE
  s.provider_code IS NOT NULL
  AND s.region_key IS NOT NULL
  AND p.preferred_provider IS NOT NULL
  AND r.region_name IS NOT NULL
GROUP BY p.preferred_provider, p.network_tier, r.region_name
ORDER BY total_charges DESC
"""
display(spark.sql(q1))


q2 = """
SELECT
  date_format(to_date(event_date), 'yyyy-MM') AS event_month,
  smoker,
  ROUND(AVG(CAST(charge_amount AS DOUBLE)), 2) AS avg_charges
FROM fact_insurance_silver
GROUP BY date_format(to_date(event_date), 'yyyy-MM'), smoker
ORDER BY event_month
"""
display(spark.sql(q2))


preferred_provider,network_tier,region_name,num_events,total_charges,avg_charges
Cville Care Network,Silver,northwest,246,3568472.35,14505.99
Piedmont Clinic,Bronze,southwest,242,3433634.63,14188.57
Cville Care Network,Silver,northeast,232,3229160.2,13918.79
Cville Care Network,Silver,southeast,238,3165720.67,13301.35
Blue Ridge Health,Gold,northwest,208,3099405.49,14900.99
Blue Ridge Health,Gold,southeast,270,3082001.52,11414.82
Piedmont Clinic,Bronze,northeast,230,3031863.74,13182.02
Piedmont Clinic,Bronze,southeast,229,2995716.2,13081.73
Piedmont Clinic,Bronze,northwest,208,2891948.97,13903.6
Blue Ridge Health,Gold,southwest,226,2683163.06,11872.4


event_month,smoker,avg_charges
2025-01,no,15102.49
2025-01,yes,13222.59
2025-02,yes,16543.85
2025-02,no,12879.9
2025-03,yes,11264.72
2025-03,no,10212.04
2025-04,yes,17889.01
2025-04,no,12743.3
2025-05,yes,13537.89
2025-05,no,12583.19
