In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, col, expr, current_timestamp, to_timestamp, sha2, concat_ws, coalesce, monotonically_increasing_id
from delta.tables import DeltaTable
from pyspark.sql import Window

In [0]:
spark.conf.set(
    "fs.azure.account.key.hospitaldatastorag.dfs.core.windows.net",
    dbutils.secrets.get(scope="hospitalanalyticsvaultscope",key="storageacct")
)

silver_path = "abfss://silver@hospitaldatastorag.dfs.core.windows.net/patient_flow"
gold_dim_patient = "abfss://gold@hospitaldatastorag.dfs.core.windows.net/dim_patient"
gold_dim_dept = "abfss://gold@hospitaldatastorag.dfs.core.windows.net/dim_department"
gold_fact ="abfss://gold@hospitaldatastorag.dfs.core.windows.net/fact_patient_flow"

In [0]:
# read the silverlayer data
silver_df = spark.read.format("delta").load(silver_path)

#use window function to filter the latest admission time of patients
w = Window.partitionBy("patient_id").orderBy(col("admission_time").desc())

# Keep only the latest data
silver_df = silver_df.withColumn("row_num",F.row_number().over(w)).filter(col("row_num") ==1).drop("row_num")

In [0]:
# Create Patient Dimension table
#prepare incoming patient records
incoming_patient = (silver_df.select("patient_id","gender","age").withColumn("effective_from",F.current_timestamp())
                                                                .withColumn("_hash",F.sha2(F.concat_ws("||",F.coalesce(col("gender"),lit("NA")),F.coalesce(col("age").cast("String"),lit("NA"))),256))
                                                                .withColumn("surrogate_key", monotonically_increasing_id())  # ✅ always added
                                                                .withColumn("effective_to", lit(None).cast("timestamp"))
                                                                .withColumn("is_current", lit(True))
                                                                   )
if not DeltaTable.isDeltaTable(spark,gold_dim_patient):
    incoming_patient.write.format("delta").mode("overwrite")\
                    .save(gold_dim_patient)                                                                           

target_patient = DeltaTable.forPath(spark,gold_dim_patient) 


In [0]:
(
target_patient.alias("t").merge(incoming_patient.alias("i"),"t.patient_id==i.patient_id").whenMatchedUpdate(
    condition="t.is_current=True AND t._hash <> i._hash",
    set={
        "is_current":"false",
        "effective_to":"current_timestamp()"
    }
).whenNotMatchedInsert(
    values={
        "surrogate_key": col("i.surrogate_key"),
            "patient_id": col("i.patient_id"),
            "gender": col("i.gender"),
            "age": col("i.age"),
            "effective_from": col("i.effective_from"),
            "effective_to": col("i.effective_to"),
            "is_current": col("i.is_current"),
            "_hash": col("i._hash")
    }
).execute()
)

In [0]:

incoming_department = (silver_df.select("department","hospital_id")
                               .dropDuplicates(["department","hospital_id"])
                               .withColumn("surrogate_key",F.monotonically_increasing_id())
)

#Department Dimension
if not DeltaTable.isDeltaTable(spark, gold_dim_dept):
    incoming_department.write.format("delta").mode("overwrite").save(gold_dim_dept)



dim_department_df = (spark.read.format("delta").load(gold_dim_dept).select(F.col("surrogate_key").alias("department_sk"),"department","hospital_id"))


In [0]:
#Fact Table
dim_patient_df = spark.read.format("delta").load(gold_dim_patient).filter(F.col("is_current")=="True").select(F.col("surrogate_key").alias("patient_sk"),"patient_id")

In [0]:
fact_base = silver_df.select("patient_id","department","hospital_id","admission_time","discharge_time","bed_id")\
                     .withColumn("admission_date",F.to_date("admission_time"))

fact_enriched = fact_base.join(dim_patient_df,"patient_id","left").join(dim_department_df,["department","hospital_id"],"left")\
                         .withColumn("length_of_stay",(F.unix_timestamp("discharge_time")-F.unix_timestamp("admission_time"))/3600)\
                         .withColumn("is_currently_admitted",F.when(col("discharge_time")>current_timestamp(),F.lit("False")).otherwise(F.lit("True"))).withColumn("event_ingestion_time",F.current_timestamp())

In [0]:
fact_final = fact_enriched.select(F.monotonically_increasing_id().alias("fact_id"),
                                  "patient_sk",
                                  "department_sk",
                                  "admission_time",
                                  "discharge_time",
                                  "admission_date",
                                  "length_of_stay",
                                  "is_currently_admitted",
                                  "bed_id",
                                  "event_ingestion_time"
                                  )                            

fact_final.write.format("delta").mode("overwrite").partitionBy("admission_date").save(gold_fact)

In [0]:
print("gold_dim_patient",spark.read.format("delta").load(gold_dim_patient).count())
print("gold_dim_department",spark.read.format("delta").load(gold_dim_dept).count())
print("gold_fact",spark.read.format("delta").load(gold_fact).count())

gold_dim_patient 1210
gold_dim_department 49
gold_fact 1210
