In [0]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# ADLS configuration.
# The storage account and container are declared once so that the account the
# access key is registered for is always the same account the bronze/silver
# URIs point at.
# NOTE(review): the access key is a literal in this notebook; prefer reading it
# from a secret store (e.g. a Databricks secret scope) before sharing the file.
storage_account = "drhospitalanalytics"
container = "<<Container name>>"

spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
    "<<Storage account access key>>"
)

# Bronze (raw) and silver (cleaned) patient_flow locations in that container.
base_uri = f"abfss://{container}@{storage_account}.dfs.core.windows.net"
bronze_path = f"{base_uri}/bronze/patient_flow"
silver_path = f"{base_uri}/silver/patient_flow"

# Open the bronze Delta location as a streaming source.
bronze_df = spark.readStream.load(bronze_path, format="delta")

# Schema of the JSON payload; the two time fields are read as strings and
# only become timestamps in a later step.
schema = (
    StructType()
    .add("patient_id", StringType())
    .add("gender", StringType())
    .add("age", IntegerType())
    .add("department", StringType())
    .add("admission_time", StringType())
    .add("discharge_time", StringType())
    .add("bed_id", IntegerType())
    .add("hospital_id", IntegerType())
)

# Parse the raw_json column with the schema and expand the resulting struct
# into one top-level column per field (no other bronze columns are kept).
payload = from_json(col("raw_json"), schema).alias("data")
parsed_df = bronze_df.select(payload).select("data.*")

# Convert the two time columns from string to timestamp.
clean_df = (
    parsed_df
    .withColumn("admission_time", to_timestamp("admission_time"))
    .withColumn("discharge_time", to_timestamp("discharge_time"))
)

# Invalid admission times: a missing (NULL) or future admission_time is
# replaced with the current timestamp; any other value is kept.
# For a NULL admission_time the comparison below is NULL, so when() does not
# match and the row falls through to otherwise().
clean_df = clean_df.withColumn(
    "admission_time",
    when(col("admission_time") <= current_timestamp(), col("admission_time"))
    .otherwise(current_timestamp()),
)

# Invalid discharge times: a discharge earlier than the admission is moved to
# one hour after the admission; any other value (including NULL) is kept.
discharge_before_admission = col("discharge_time") < col("admission_time")
clean_df = clean_df.withColumn(
    "discharge_time",
    when(discharge_before_admission, expr("admission_time + INTERVAL 1 HOUR"))
    .otherwise(col("discharge_time")),
)


# Handle invalid age: values below 0 or above 100 are replaced with a random
# integer in 1..90; in-range and NULL ages are left as they are.
# (Previously only ages above 100 were caught, so negative ages passed through.)
# rand() is unseeded, so the substituted values differ from run to run.
clean_df = clean_df.withColumn(
    "age",
    when(
        (col("age") < 0) | (col("age") > 100),
        floor(rand()*90 + 1).cast("int")
    )
    .otherwise(col("age"))
)

# Schema evolution: make sure every expected column exists before writing.
# A missing column is added as a typed NULL. An untyped lit(None) would create
# a NullType column, which Delta does not support in streaming writes.
expected_column_types = {
    "patient_id": "string",
    "gender": "string",
    "age": "int",
    "department": "string",
    "admission_time": "timestamp",
    "discharge_time": "timestamp",
    "bed_id": "int",
    "hospital_id": "int",
}
# Original (misspelled) list name is kept in case other cells refer to it.
excepted_column = list(expected_column_types)

for col_names, col_type in expected_column_types.items():
    if col_names not in clean_df.columns:
        clean_df = clean_df.withColumn(col_names, lit(None).cast(col_type))

# Stream the cleaned rows into the silver Delta location in append mode,
# checkpointing under "<silver_path>/_checkpoint".
# NOTE(review): "changeDataFeed" is passed through unchanged as a writer
# option; Delta documents change data feed as the table property
# delta.enableChangeDataFeed — confirm this option has the intended effect.
# The expression is left as the last statement so the StreamingQuery handle
# is still shown as the cell output.
(
    clean_df.writeStream
    .options(
        mergeSchema="true",
        changeDataFeed="true",
        checkpointLocation=silver_path+"/_checkpoint",
    )
    .start(silver_path, format="delta", outputMode="append")
)





<pyspark.sql.connect.streaming.query.StreamingQuery at 0x7f9521f0e8d0>

In [0]:
# Batch-read the silver Delta location and render it in the notebook.
silver_snapshot_df = spark.read.load(silver_path, format="delta")
display(silver_snapshot_df)

patient_id,gender,age,department,admission_time,discharge_time,bed_id,hospital_id
8cbe4631-5d83-4648-902c-da0e37725a11,Male,86,Maternity,2026-02-07T00:47:40.833843Z,2026-02-08T16:47:40.833843Z,149,7
55c09bc7-140e-4939-98c7-5903478ba7bc,Female,41,Surgery,2026-02-07T19:19:28.807622Z,2026-02-08T03:19:28.807622Z,5,6
210b90d4-246d-4590-ae1f-980d8a55b29b,Male,22,Maternity,2026-02-06T05:00:29.53873Z,2026-02-07T19:00:29.53873Z,374,4
a69656b6-9ed1-4888-ac4f-1d9929f904d1,Male,14,Emergency,2026-02-06T15:15:45.952289Z,2026-02-09T08:15:45.952289Z,237,3
022b3a48-a7d0-495c-bfd2-d5303691a35b,Male,64,Maternity,2026-02-07T05:06:10.565949Z,2026-02-08T12:06:10.565949Z,397,1
1b7b6e56-1b37-4160-98c6-a2348e8a8205,Male,83,Maternity,2026-02-07T20:06:35.383959Z,2026-02-09T16:06:35.383959Z,408,4
f3f567c4-43b4-499a-bfdd-7a26c57d9729,Male,50,Emergency,2026-02-07T12:53:44.605997Z,2026-02-08T19:53:44.605997Z,294,5
8910c7da-5267-48c1-a717-8569ea593b42,Female,2,Oncology,2026-02-06T18:19:55.765451Z,2026-02-07T19:19:55.765451Z,100,3
9e58576f-8c79-4e87-90a4-be375b4050ce,Male,33,Maternity,2026-02-07T19:54:41.085255Z,2026-02-08T18:54:41.085255Z,386,2
8cafb902-447a-453c-91a5-aca37b00633f,Male,70,Maternity,2026-02-08T07:51:30.350606Z,2026-02-10T17:51:30.350606Z,341,3
