In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, col, expr, current_timestamp, to_timestamp, sha2, concat_ws, coalesce, monotonically_increasing_id
from delta.tables import DeltaTable
from pyspark.sql import Window

#ADLS configuration 
# spark.conf.set(
#   "fs.azure.account.key.<<Storageaccount_name>>.dfs.core.windows.net",
#   "<<Storage_Account_access_key>>"
# )

spark.conf.set(
    "fs.azure.account.key.storageaccounthealthcare.dfs.core.windows.net",
    dbutils.secrets.get(scope="healthcarekeyvaultscope", key="storage-connection")
)

# Paths
silver_path = "abfss://silver@storageaccounthealthcare.dfs.core.windows.net/patient_flow"
gold_dim_patient = "abfss://gold@storageaccounthealthcare.dfs.core.windows.net/patient"
gold_dim_department = "abfss://gold@storageaccounthealthcare.dfs.core.windows.net/department"
gold_fact = "abfss://gold@storageaccounthealthcare.dfs.core.windows.net/fact"

# # Paths
# silver_path = "abfss://<<container>>@<<Storageaccount_name>>.core.windows.net/<<path>>"
# gold_dim_patient = "abfss://<<container>>@<<Storageaccount_name>>.core.windows.net/<<path>>"
# gold_dim_department = "abfss://<<container>>@<<Storageaccount_name>>.core.windows.net/<<path>>"
# gold_fact = "abfss://<<container>>@<<Storageaccount_name>>.core.windows.net/<<path>>"

# Read silver data (assume append-only)
silver_df = spark.read.format("delta").load(silver_path)

# Define window for latest admission per patient
w = Window.partitionBy("patient_id").orderBy(F.col("admission_time").desc())

silver_df = (
    silver_df
    .withColumn("row_num", F.row_number().over(w))  # Rank by latest admission_time
    .filter(F.col("row_num") == 1)                  # Keep only latest row
    .drop("row_num")
)

#Patient Dimension Table Creation
# Prepare incoming dimension records (deduplicated per patient, latest record)
incoming_patient = (silver_df
                    .select("patient_id", "gender", "age")
                    .withColumn("effective_from", current_timestamp())
                   )

# Create target if not exists
if not DeltaTable.isDeltaTable(spark, gold_dim_patient):
    # initialize table with schema and empty data
    incoming_patient.withColumn("surrogate_key", F.monotonically_increasing_id()) \
                    .withColumn("effective_to", lit(None).cast("timestamp")) \
                    .withColumn("is_current", lit(True)) \
                    .write.format("delta").mode("overwrite").save(gold_dim_patient)

# Load target as DeltaTable
target_patient = DeltaTable.forPath(spark, gold_dim_patient)

# Create an expression to detect attribute changes (hash or explicit comparisons)
# We'll use a simple concat hash to detect changes
incoming_patient = incoming_patient.withColumn(
    "_hash",
    F.sha2(F.concat_ws("||", F.coalesce(col("gender"), lit("NA")), F.coalesce(col("age").cast("string"), lit("NA"))), 256)
)

# Bring target current hash
target_patient_df = spark.read.format("delta").load(gold_dim_patient).withColumn(
    "_target_hash",
    F.sha2(F.concat_ws("||", F.coalesce(col("gender"), lit("NA")), F.coalesce(col("age").cast("string"), lit("NA"))), 256)
).select("surrogate_key", "patient_id", "gender", "age", "is_current", "_target_hash", "effective_from", "effective_to")

# Create temp views for merge
incoming_patient.createOrReplaceTempView("incoming_patient_tmp")
target_patient_df.createOrReplaceTempView("target_patient_tmp")

# We'll implement in two steps using Delta MERGE (safe & explicit)

# 1) Mark old current rows as not current where changed
changes_df = spark.sql("""
SELECT t.surrogate_key, t.patient_id
FROM target_patient_tmp t
JOIN incoming_patient_tmp i
  ON t.patient_id = i.patient_id
WHERE t.is_current = true AND t._target_hash <> i._hash
""")

changed_keys = [row['surrogate_key'] for row in changes_df.collect()]

if changed_keys:
    # Update existing current records: set is_current=false and effective_to=current_timestamp()
    target_patient.update(
        condition = expr("is_current = true AND surrogate_key IN ({})".format(",".join([str(k) for k in changed_keys]))),
        set = {
            "is_current": expr("false"),
            "effective_to": expr("current_timestamp()")
        }
    )

# 2) Insert new rows for changed & new records
# Build insert DF: join incoming with target to figure new inserts where either not exists or changed
inserts_df = spark.sql("""
SELECT i.patient_id, i.gender, i.age, i.effective_from, i._hash
FROM incoming_patient_tmp i
LEFT JOIN target_patient_tmp t
  ON i.patient_id = t.patient_id AND t.is_current = true
WHERE t.patient_id IS NULL OR t._target_hash <> i._hash
""").withColumn("surrogate_key", F.monotonically_increasing_id()) \
  .withColumn("effective_to", lit(None).cast("timestamp")) \
  .withColumn("is_current", lit(True)) \
  .select("surrogate_key", "patient_id", "gender", "age", "effective_from", "effective_to", "is_current")

# Append new rows
if inserts_df.count() > 0:
    inserts_df.write.format("delta").mode("append").save(gold_dim_patient)



# Department Dimension Table Creation

# prepare incoming (latest per patient feed snapshot)
incoming_dept = (silver_df
                 .select("department", "hospital_id")
                )

# add hash and dedupe incoming (one row per natural key)
incoming_dept = incoming_dept.dropDuplicates(["department", "hospital_id"]) \
    .withColumn("surrogate_key", monotonically_increasing_id())

# initialize table if missing

incoming_dept.select("surrogate_key", "department", "hospital_id") \
    .write.format("delta").mode("overwrite").save(gold_dim_department)



# Create Fact table

# Read current dims (filter is_current=true)
dim_patient_df = (spark.read.format("delta").load(gold_dim_patient)
                  .filter(col("is_current") == True)
                  .select(col("surrogate_key").alias("surrogate_key_patient"), "patient_id", "gender", "age"))

dim_dept_df = (spark.read.format("delta").load(gold_dim_department)
               .select(col("surrogate_key").alias("surrogate_key_dept"), "department", "hospital_id"))

# Build base fact from silver events
fact_base = (silver_df
             .select("patient_id", "department", "hospital_id", "admission_time", "discharge_time", "bed_id")
             .withColumn("admission_date", F.to_date("admission_time"))
            )

# Join to get surrogate keys
fact_enriched = (fact_base
                 .join(dim_patient_df, on="patient_id", how="left")
                 .join(dim_dept_df, on=["department", "hospital_id"], how="left")
                )

# Compute metrics
fact_enriched = fact_enriched.withColumn("length_of_stay_hours",
                                         (F.unix_timestamp(col("discharge_time")) - F.unix_timestamp(col("admission_time"))) / 3600.0) \
                             .withColumn("is_currently_admitted", F.when(col("discharge_time") > current_timestamp(), lit(True)).otherwise(lit(False))) \
                             .withColumn("event_ingestion_time", current_timestamp())

# Let's make column names explicit instead:
fact_final = fact_enriched.select(
    F.monotonically_increasing_id().alias("fact_id"),
    col("surrogate_key_patient").alias("patient_sk"),
    col("surrogate_key_dept").alias("department_sk"),
    "admission_time",
    "discharge_time",
    "admission_date",
    "length_of_stay_hours",
    "is_currently_admitted",
    "bed_id",
    "event_ingestion_time"
)

# Persist fact table partitioned by admission_date (helps Synapse / queries)
fact_final.write.format("delta").mode("overwrite").save(gold_fact)


# Quick sanity checks
print("Patient dim count:", spark.read.format("delta").load(gold_dim_patient).count())
print("Department dim count:", spark.read.format("delta").load(gold_dim_department).count())
print("Fact rows:", spark.read.format("delta").load(gold_fact).count())

Patient dim count: 113
Department dim count: 46
Fact rows: 113


In [0]:
display(spark.read.format('delta').load(gold_dim_patient))

patient_id,gender,age,effective_from,surrogate_key,effective_to,is_current
02c0743c-c50b-4974-8c80-7cddab86c979,Female,69,2026-01-02T03:47:03.909975Z,0,,True
05779118-cd25-42d5-8c75-5aa4e0f966c6,Female,19,2026-01-02T03:47:03.909975Z,1,,True
080cad4d-1237-4826-8e22-74b0a3930290,Female,58,2026-01-02T03:47:03.909975Z,2,,True
0fb2fa94-0015-4f51-8afe-7841af3552a5,Female,48,2026-01-02T03:47:03.909975Z,3,,True
16231a2a-cbea-4ed0-b9f9-9b7a6429d22d,Male,58,2026-01-02T03:47:03.909975Z,4,,True
168cef76-cb2b-4a52-95bf-d317cebf98f4,Male,18,2026-01-02T03:47:03.909975Z,5,,True
16cf98f0-7002-47dc-96f7-3f17c3b3d4b3,Female,95,2026-01-02T03:47:03.909975Z,6,,True
199ddea0-c37d-40bd-9cfa-b4258248e355,Male,5,2026-01-02T03:47:03.909975Z,7,,True
2090b0d7-ae5a-4fe0-989b-b060210f0598,Female,5,2026-01-02T03:47:03.909975Z,8,,True
2488dec3-a16f-487f-8d5c-24f08e51fb84,Female,64,2026-01-02T03:47:03.909975Z,9,,True


In [0]:
display(spark.read.format('delta').load(gold_dim_department))

surrogate_key,department,hospital_id
0,Pediatrics,5
1,Surgery,3
2,Surgery,2
3,ICU,3
4,Emergency,7
5,Cardiology,1
6,Surgery,1
7,Oncology,3
8,Emergency,6
9,Maternity,3


In [0]:
display(spark.read.format('delta').load(gold_fact))

fact_id,patient_sk,department_sk,admission_time,discharge_time,admission_date,length_of_stay_hours,is_currently_admitted,bed_id,event_ingestion_time
0,0,18,2026-01-01T16:25:46.448267Z,2026-01-03T19:25:46.448267Z,2026-01-01,51.0,True,394,2026-01-02T03:47:15.093813Z
1,1,33,2026-01-01T12:24:42.146344Z,2026-01-03T05:24:42.146344Z,2026-01-01,41.0,True,79,2026-01-02T03:47:15.093813Z
2,2,20,2025-12-31T18:25:20.329609Z,2026-01-01T22:25:20.329609Z,2025-12-31,28.0,False,392,2026-01-02T03:47:15.093813Z
3,3,23,2026-01-01T17:24:58.229227Z,2026-01-01T18:24:58.229227Z,2026-01-01,1.0,False,446,2026-01-02T03:47:15.093813Z
4,4,17,2025-12-31T21:24:48.174122Z,2026-01-02T11:24:48.174122Z,2025-12-31,38.0,True,245,2026-01-02T03:47:15.093813Z
5,5,13,2026-01-01T15:25:15.310745Z,2026-01-03T06:25:15.310745Z,2026-01-01,39.0,True,370,2026-01-02T03:47:15.093813Z
6,6,34,2025-12-30T11:24:27.078126Z,2025-12-31T03:24:27.078126Z,2025-12-30,16.0,False,9,2026-01-02T03:47:15.093813Z
7,7,9,2025-12-31T07:25:03.255466Z,2026-01-03T05:25:03.255466Z,2025-12-31,70.0,True,36,2026-01-02T03:47:15.093813Z
8,8,23,2025-12-31T08:26:04.522534Z,2025-12-31T13:26:04.522534Z,2025-12-31,5.0,False,411,2026-01-02T03:47:15.093813Z
9,9,38,2026-01-01T02:24:59.234817Z,2026-01-02T06:24:59.234817Z,2026-01-01,28.0,True,206,2026-01-02T03:47:15.093813Z
