In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    lit, col, expr, current_timestamp, to_timestamp,
    sha2, concat_ws, coalesce, to_date, monotonically_increasing_id, unix_timestamp
)
from pyspark.sql import Window
from delta.tables import DeltaTable

# Use key-based access to Azure Storage
spark.conf.set(
    "fs.azure.account.key.hospitalstorage224.dfs.core.windows.net",
    dbutils.secrets.get(scope="hospitalAnalyticsVaultScope", key="storage-connection")
)

# Paths
silver_path = "abfss://silver@hospitalstorage224.dfs.core.windows.net/patient_flow"
gold_fact = "abfss://gold@hospitalstorage224.dfs.core.windows.net/patient_flow/fact"
gold_dim_patient = "abfss://gold@hospitalstorage224.dfs.core.windows.net/patient_flow/dim_patient"
gold_dim_department = "abfss://gold@hospitalstorage224.dfs.core.windows.net/patient_flow/dim_department"

# Read silver data (assumed append-only)
silver_df = spark.read.format("delta").load(silver_path)

# Define window for latest admission per patient
windowSpec = Window.partitionBy("patient_id").orderBy(F.col("admission_time").desc())

# Keep only latest record per patient
silver_df = (
    silver_df
    .withColumn("row_num", F.row_number().over(windowSpec))
    .filter(F.col("row_num") == 1)
    .drop("row_num")
)


In [0]:
# Patient Dimension Table Creation
# Prepare incoming dimension records (deduplicated per patient, latest record)
incoming_patient = (
    silver_df
    .select("patient_id", "gender", "age")
    .withColumn("effective_from", current_timestamp())
)

# Create target if not exists
if not DeltaTable.isDeltaTable(spark, gold_dim_patient):
    (
        incoming_patient
        .withColumn("surrogate_key", monotonically_increasing_id())
        .withColumn("effective_to", lit(None).cast("timestamp"))
        .withColumn("is_current", lit(True))
        .write
        .format("delta")
        .mode("overwrite")
        .save(gold_dim_patient)
    )

# Load target as DeltaTable
target_patient = DeltaTable.forPath(spark, gold_dim_patient)

# Hash for change detection
incoming_patient = incoming_patient.withColumn(
    "_hash",
    sha2(
        concat_ws(
            "||",
            coalesce(col("gender"), lit("NA")),
            coalesce(col("age").cast("string"), lit("NA"))
        ),
        256
    )
)

# Bring target current hash
target_patient_df = (
    spark.read.format("delta").load(gold_dim_patient)
    .withColumn(
        "_target_hash",
        sha2(
            concat_ws(
                "||",
                coalesce(col("gender"), lit("NA")),
                coalesce(col("age").cast("string"), lit("NA"))
            ),
            256
        )
    )
    .select(
        "surrogate_key",
        "patient_id",
        "gender",
        "age",
        "is_current",
        "_target_hash",
        "effective_from",
        "effective_to",
    )
)

# Temp views
incoming_patient.createOrReplaceTempView("incoming_patient_tmp")
target_patient_df.createOrReplaceTempView("target_patient_tmp")

# 1 - Find changed rows (current rows where attributes changed)
changes_df = spark.sql("""
    SELECT t.surrogate_key, t.patient_id
    FROM target_patient_tmp t
    JOIN incoming_patient_tmp i
      ON t.patient_id = i.patient_id
    WHERE t.is_current = true
      AND t._target_hash != i._hash
""")

changed_keys = [r["surrogate_key"] for r in changes_df.collect()]

if changed_keys:
    # Update existing current records: set is_current=false and effective_to=current_timestamp
    target_patient.update(
        condition=expr(
            "is_current = true AND surrogate_key IN ({})"
            .format(",".join(map(str, changed_keys)))
        ),
        set={
            "is_current": lit(False),
            "effective_to": current_timestamp()
        }
    )

# 2 - Build insert df: new + changed rows
inserts_df = (
    spark.sql("""
        SELECT i.patient_id, i.gender, i.age, i.effective_from, i._hash
        FROM incoming_patient_tmp i
        LEFT JOIN target_patient_tmp t
          ON i.patient_id = t.patient_id
         AND t.is_current = true
        WHERE t.patient_id IS NULL
           OR t._target_hash != i._hash
    """)
    .withColumn("surrogate_key", monotonically_increasing_id())
    .withColumn("effective_to", lit(None).cast("timestamp"))
    .withColumn("is_current", lit(True))
    .select(
        "surrogate_key",
        "patient_id",
        "gender",
        "age",
        "effective_from",
        "effective_to",
        "is_current"
    )
)

# Append new rows
if inserts_df.count() > 0:
    (
        inserts_df.write
        .format("delta")
        .mode("append")
        .save(gold_dim_patient)
    )


In [0]:
# Department Dimension table Creation

# Prepare incoming (latest per patient feed snapshot)
incoming_dept = silver_df.select("department", "hospital_id")

# Add hash and dedupe incoming (one row per natural key)
incoming_dept = (
    incoming_dept
        .dropDuplicates(["department", "hospital_id"])  # fixed: dropDulplicates -> dropDuplicates
        .withColumn("surrogate_key", monotonically_increasing_id())
)

# Initialize table (this will overwrite every time; add an existence check if needed)
(
    incoming_dept
        .select("surrogate_key", "department", "hospital_id")
        .write
        .format("delta")
        .mode("overwrite")
        .save(gold_dim_department)
)

In [0]:
# Create Fact Table

# Read current dims (filter is_current=true)
dim_patient_df = (
    spark.read.format("delta").load(gold_dim_patient)
    .filter(col("is_current") == True)
    .select(
        col("surrogate_key").alias("surrogate_key_patient"),
        "patient_id",
        "gender",
        "age"
    )
)

dim_dept_df = (
    spark.read.format("delta").load(gold_dim_department)
    .select(
        col("surrogate_key").alias("surrogate_key_dept"),
        "department",
        "hospital_id"
    )
)

# Build base fact from silver events
fact_base = (
    silver_df
    .select(
        "patient_id",
        "department",
        "hospital_id",
        "admission_time",
        "discharge_time",
        "bed_id"
    )
    .withColumn("admission_date", to_date(col("admission_time")))
)

# Join to get surrogate keys
fact_enriched = (
    fact_base
    .join(dim_patient_df, on="patient_id", how="left")
    .join(dim_dept_df, on=["department", "hospital_id"], how="left")
)

# Compute metrics
fact_enriched = (
    fact_enriched
    .withColumn(
        "length_of_stay_hours",
        (unix_timestamp(col("discharge_time")) - unix_timestamp(col("admission_time"))) / 3600.0
    )
    .withColumn(
        "is_currently_admitted",
        F.when(col("discharge_time") > current_timestamp(), lit(True)).otherwise(lit(False))
    )
    .withColumn("event_ingestion_time", current_timestamp())
)

# Make column names explicit
fact_final = fact_enriched.select(
    monotonically_increasing_id().alias("fact_id"),
    col("surrogate_key_patient").alias("patient_sk"),
    col("surrogate_key_dept").alias("department_sk"),
    col("admission_time"),
    col("discharge_time"),
    col("admission_date"),
    col("length_of_stay_hours"),
    col("is_currently_admitted"),
    col("bed_id"),
    col("event_ingestion_time")
)

# Persist fact table (consider partitionBy if you want partitioning)
fact_final.write.format("delta").mode("overwrite").save(gold_fact)


In [0]:
# Quick sanity checks
print("Patient dim count:", spark.read.format("delta").load(gold_dim_patient).count())
print("Department dim count:", spark.read.format("delta").load(gold_dim_department).count())
print("Fact rows:", spark.read.format("delta").load(gold_fact).count())


Patient dim count: 3259
Department dim count: 56
Fact rows: 3259


In [0]:
display(spark.read.format('delta').load(gold_dim_patient).limit(10))

patient_id,gender,age,effective_from,surrogate_key,effective_to,is_current
00139842-ee7b-4a2b-a81b-23166b0b445c,Male,21,2025-12-15T08:43:48.346982Z,0,,True
002073bb-fcfc-48c0-b057-238c4da94336,Female,59,2025-12-15T08:43:48.346982Z,1,,True
00324629-286f-44cf-9603-9cccc87cfc96,Male,19,2025-12-15T08:43:48.346982Z,2,,True
0039037f-b8aa-4223-8c84-afe6939669aa,Female,75,2025-12-15T08:43:48.346982Z,3,,True
004e59d6-e09d-44f3-a3b0-c207676e1d5b,Female,75,2025-12-15T08:43:48.346982Z,4,,True
0061ea5c-6064-44e5-a24b-2db3f36a7335,Male,46,2025-12-15T08:43:48.346982Z,5,,True
00863f05-e345-44b6-9381-8f625f2ac6a7,Female,54,2025-12-15T08:43:48.346982Z,6,,True
00e2aef9-c091-4ef8-96e7-be25198ea219,Other,8,2025-12-15T08:43:48.346982Z,7,,True
00fd7005-7098-46e1-810e-964cda769705,Other,38,2025-12-15T08:43:48.346982Z,8,,True
012b0d8f-40ed-4049-a0c3-b8d1f083792c,Male,59,2025-12-15T08:43:48.346982Z,9,,True


In [0]:
display(spark.read.format('delta').load(gold_dim_patient).limit(10))

patient_id,gender,age,effective_from,surrogate_key,effective_to,is_current
00139842-ee7b-4a2b-a81b-23166b0b445c,Male,21,2025-12-15T08:43:48.346982Z,0,,True
002073bb-fcfc-48c0-b057-238c4da94336,Female,59,2025-12-15T08:43:48.346982Z,1,,True
00324629-286f-44cf-9603-9cccc87cfc96,Male,19,2025-12-15T08:43:48.346982Z,2,,True
0039037f-b8aa-4223-8c84-afe6939669aa,Female,75,2025-12-15T08:43:48.346982Z,3,,True
004e59d6-e09d-44f3-a3b0-c207676e1d5b,Female,75,2025-12-15T08:43:48.346982Z,4,,True
0061ea5c-6064-44e5-a24b-2db3f36a7335,Male,46,2025-12-15T08:43:48.346982Z,5,,True
00863f05-e345-44b6-9381-8f625f2ac6a7,Female,54,2025-12-15T08:43:48.346982Z,6,,True
00e2aef9-c091-4ef8-96e7-be25198ea219,Other,8,2025-12-15T08:43:48.346982Z,7,,True
00fd7005-7098-46e1-810e-964cda769705,Other,38,2025-12-15T08:43:48.346982Z,8,,True
012b0d8f-40ed-4049-a0c3-b8d1f083792c,Male,59,2025-12-15T08:43:48.346982Z,9,,True


In [0]:
display(spark.read.format('delta').load(gold_fact).limit(10))

fact_id,patient_sk,department_sk,admission_time,discharge_time,admission_date,length_of_stay_hours,is_currently_admitted,bed_id,event_ingestion_time
0,0,35,2025-12-11T11:35:11.662709,2025-12-11T17:35:11.662709Z,2025-12-11,,False,448,2025-12-16T18:31:01.150573Z
1,1,10,2025-12-14T12:06:54.302391,2025-12-16T04:06:54.302391Z,2025-12-14,,False,217,2025-12-16T18:31:01.150573Z
2,2,52,2025-12-12T20:22:45.815580,2025-12-15T08:22:45.81558Z,2025-12-12,,False,403,2025-12-16T18:31:01.150573Z
3,3,39,2025-12-13T12:29:34.193455,2025-12-14T18:29:34.193455Z,2025-12-13,,False,312,2025-12-16T18:31:01.150573Z
4,4,2,2025-12-13T02:37:21.073577,2025-12-13T07:37:21.073577Z,2025-12-13,,False,180,2025-12-16T18:31:01.150573Z
5,5,2,2025-12-14T16:32:19.505202,2025-12-17T11:32:19.505202Z,2025-12-14,,True,156,2025-12-16T18:31:01.150573Z
6,6,35,2025-12-11T20:05:36.014156,2025-12-13T20:05:36.014156Z,2025-12-11,,False,437,2025-12-16T18:31:01.150573Z
7,7,25,2025-12-13T19:04:40.838240,2025-12-15T10:04:40.83824Z,2025-12-13,,False,130,2025-12-16T18:31:01.150573Z
8,8,9,2025-12-13T21:09:24.949795,2025-12-14T04:09:24.949795Z,2025-12-13,,False,484,2025-12-16T18:31:01.150573Z
9,9,42,2025-12-12T19:46:13.472552,2025-12-15T12:46:13.472552Z,2025-12-12,,False,59,2025-12-16T18:31:01.150573Z
