In [0]:
from pyspark.sql.functions import *
patients_bronze_df=spark.read.format('delta').load('abfss://catalog@storageaccounthealthetl.dfs.core.windows.net/data/bronze/patients')
patient_modify_df=patients_bronze_df.withColumn('file_name',col('_metadata.file_name'))
patient_modify_df=patient_modify_df.withColumn('load_time',col('_metadata.file_modification_time'))
patient_clean_df=patient_modify_df.withColumn('category',when((col('patient_id').isNotNull())&(col('name').isNotNull())&((col('age').isNotNull())&(col('age').between(0,120)))&(col('gender').isNotNull()),'Valid').otherwise('imvalid'))
patient_clean_df=patient_clean_df.withColumn('age_category',when(col('age')<18,'Child')\
                                               .otherwise(when((col('age')>=18) & (col('age')<60),'Adult')\
                                                   .otherwise('Senior')))
patient_clean_df=patient_clean_df.withColumn('record_validated_time',current_timestamp())
display(patient_clean_df)

patient_id,name,age,gender,admission_date,ward_no,status,_rescued_data,file_name,load_time,category,age_category,record_validated_time
P003,Rajesh Singh,60,M,20-08-2025,W03,Active,,part-00000-7ac5ec70-a6b2-4d84-af01-0cfb8ecde55d.c000.snappy.parquet,2025-11-16T09:48:55.000Z,Valid,Senior,2025-11-16T10:12:37.621Z
P008,Kritika Bansal,29,F,01-11-2025,W05,Discharged,,part-00000-7ac5ec70-a6b2-4d84-af01-0cfb8ecde55d.c000.snappy.parquet,2025-11-16T09:48:55.000Z,Valid,Adult,2025-11-16T10:12:37.621Z


In [0]:
from pyspark.sql import *
count_duplicates_df=patient_clean_df.groupBy('patient_id').agg(count('patient_id').alias('total_count'))
no_change_df=count_duplicates_df.filter(col('total_count')==1)
patient_clean_df=patient_clean_df.withColumn('start_At',col('load_time'))
valid_patients_df=no_change_df.select('patient_id').collect()
valid_patients_list=[row['patient_id'] for row in valid_patients_df]
valid_records_df=patient_clean_df.filter(col('patient_id').isin(valid_patients_list))
valid_records_df=valid_records_df.withColumn('is_current',lit(1))
valid_records_df=valid_records_df.withColumn('end_At',lit(None))
valid_records_df=valid_records_df.withColumn('start_At',col('load_time'))
new_change_df=count_duplicates_df.filter(col('total_count')>1)
duplicate_patients_df=new_change_df.select('patient_id').collect()
duplicate_patients_list=[row['patient_id'] for row in duplicate_patients_df]
duplicate_records_df=patient_clean_df.filter(col('patient_id').isin(duplicate_patients_list))
windows_spec=Window.partitionBy('patient_id').orderBy(col('load_time').desc())
duplicate_records_df=duplicate_records_df.withColumn('rank',dense_rank().over(windows_spec))
duplicate_records_df=duplicate_records_df.withColumn('is_current',when(col('rank')==1,lit(1)).otherwise(lit(0)))
duplicate_records_df=duplicate_records_df.withColumn('end_At',when(col('rank')>1,current_timestamp()).otherwise(None))
duplicate_records_df=duplicate_records_df.withColumn('start_At',when(col('rank')==1,current_timestamp()).otherwise(col('start_At'))).drop('rank')
patients_history_df=valid_records_df.unionByName(duplicate_records_df).orderBy('patient_id')
patients_active_df=patients_history_df.filter(col('is_current')==1).orderBy('patient_id')

patient_id,name,age,gender,admission_date,ward_no,status,_rescued_data,file_name,load_time,category,age_category,record_validated_time,start_At,is_current,end_At
P003,Rajesh Singh,60,M,20-08-2025,W03,Active,,part-00000-7ac5ec70-a6b2-4d84-af01-0cfb8ecde55d.c000.snappy.parquet,2025-11-16T09:48:55.000Z,Valid,Senior,2025-11-16T10:14:05.532Z,2025-11-16T09:48:55.000Z,1,
P008,Kritika Bansal,29,F,01-11-2025,W05,Discharged,,part-00000-7ac5ec70-a6b2-4d84-af01-0cfb8ecde55d.c000.snappy.parquet,2025-11-16T09:48:55.000Z,Valid,Adult,2025-11-16T10:14:05.532Z,2025-11-16T09:48:55.000Z,1,


In [0]:
patients_history_df.write.format('delta').mode('overwrite').save('abfss://catalog@storageaccounthealthetl.dfs.core.windows.net/data/silver/patients_history')
patients_active_df.write.format('delta').mode('overwrite').save('abfss://catalog@storageaccounthealthetl.dfs.core.windows.net/data/silver/patients_active')

In [0]:
patients_history_read_df=spark.read.format('delta').load('abfss://catalog@storageaccounthealthetl.dfs.core.windows.net/data/silver/patients_active')
display(patients_history_read_df)

patient_id,name,age,gender,admission_date,ward_no,status,_rescued_data,file_name,load_time,category,age_category,record_validated_time,start_At,is_current,end_At
P003,Rajesh Singh,60,M,20-08-2025,W03,Active,,part-00000-7ac5ec70-a6b2-4d84-af01-0cfb8ecde55d.c000.snappy.parquet,2025-11-16T09:48:55.000Z,Valid,Senior,2025-11-16T10:15:01.310Z,2025-11-16T09:48:55.000Z,1,
P008,Kritika Bansal,29,F,01-11-2025,W05,Discharged,,part-00000-7ac5ec70-a6b2-4d84-af01-0cfb8ecde55d.c000.snappy.parquet,2025-11-16T09:48:55.000Z,Valid,Adult,2025-11-16T10:15:01.310Z,2025-11-16T09:48:55.000Z,1,


In [0]:
devices_bronze_raw_df=spark.read.format('delta').load('abfss://catalog@storageaccounthealthetl.dfs.core.windows.net/data/bronze/devices')
devices_modify_df=devices_bronze_raw_df.withColumn('file_name',col('_metadata.file_name'))\
                                       .withColumn('file_load_time',col('_metadata.file_modification_time'))
display(devices_modify_df)

device_id,device_type,install_date,location,patient_id,_rescued_data,file_name,file_load_time
D001,HeartRateMonitor,2025-10-16,ICU-1,P001,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z
D002,Oximeter,2025-09-29,Ward-2,P002,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z
D003,Thermometer,2025-08-21,Ward-3,P003,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z
D004,HeartRateMonitor,2025-10-26,ICU-2,P004,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z
D005,Oximeter,2025-07-16,Ward-4,P005,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z
D006,Thermometer,2025-10-19,Ward-2,P006,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z
D007,HeartRateMonitor,2025-09-13,Ward-3,P007,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z
D008,Oximeter,2025-11-02,Ward-5,P008,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z


In [0]:
devices_clean_df=devices_modify_df.withColumn('category',when((col('device_id').isNotNull())&(col('device_type').isNotNull())&(col('location').isNotNull())&(col('device_type').isin('HeartRateMonitor','BloodPressureSensor','Oximeter','TemperatureSensor','Thermometer')),'valid').otherwise('invalid'))
display(devices_clean_df)

device_id,device_type,install_date,location,patient_id,_rescued_data,file_name,file_load_time,category
D001,HeartRateMonitor,2025-10-16,ICU-1,P001,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid
D002,Oximeter,2025-09-29,Ward-2,P002,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid
D003,Thermometer,2025-08-21,Ward-3,P003,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid
D004,HeartRateMonitor,2025-10-26,ICU-2,P004,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid
D005,Oximeter,2025-07-16,Ward-4,P005,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid
D006,Thermometer,2025-10-19,Ward-2,P006,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid
D007,HeartRateMonitor,2025-09-13,Ward-3,P007,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid
D008,Oximeter,2025-11-02,Ward-5,P008,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid


In [0]:
from pyspark.sql.functions import *
from pyspark.sql import *
devices_clean_df=devices_clean_df.withColumn('start_At',current_timestamp())
devices_count_df=devices_clean_df.groupBy('device_id').agg(count('device_id').alias('total_count'))
device_duplicate_count_df=devices_count_df.filter(col('total_count')>1)
device_duplicate_device_df=device_duplicate_count_df.select('device_id').collect()
device_duplicate_device_list=[row['device_id'] for row in device_duplicate_device_df]
device_no_change_df=devices_count_df.filter(col('total_count')==1)
device_no_change_valid_df=device_no_change_df.select('device_id').collect()
device_no_change_valid_list=[row['device_id'] for row in device_no_change_valid_df]
device_valid_df=devices_clean_df.filter(col('device_id').isin(device_no_change_valid_list))
device_valid_df=device_valid_df.withColumn('is_current',lit(1))
device_valid_df=device_valid_df.withColumn('end_At',lit(None))
device_valid_df=device_valid_df.withColumn('start_At',current_timestamp())
device_duplicate_record_df=devices_clean_df.filter(col('device_id').isin(device_duplicate_device_list))
window_device_spec=Window.partitionBy('device_id').orderBy(col('file_load_time').desc())
device_duplicate_record_df=device_duplicate_record_df.withColumn('rank',dense_rank().over(window_device_spec))
device_duplicate_record_df=device_duplicate_record_df.withColumn('is_current',when(col('rank')==1,lit(1)).otherwise(lit(0)))
device_duplicate_record_df=device_duplicate_record_df.withColumn('end_At',when(col('rank')>1,current_timestamp()).otherwise(lit(None)))
device_duplicate_record_df=device_duplicate_record_df.withColumn('start_At',when(col('rank')==1,current_timestamp()).otherwise(lit(col('start_At')))).drop('rank')
device_history_df=device_valid_df.unionByName(device_duplicate_record_df)
device_active_df=device_history_df.filter(col('is_current')==1).orderBy('device_id')
display(device_active_df.orderBy('device_id'))

device_id,device_type,install_date,location,patient_id,_rescued_data,file_name,file_load_time,category,start_At,is_current,end_At
D001,HeartRateMonitor,2025-10-16,ICU-1,P001,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid,2025-11-16T10:17:25.375Z,1,
D002,Oximeter,2025-09-29,Ward-2,P002,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid,2025-11-16T10:17:25.375Z,1,
D003,Thermometer,2025-08-21,Ward-3,P003,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid,2025-11-16T10:17:25.375Z,1,
D004,HeartRateMonitor,2025-10-26,ICU-2,P004,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid,2025-11-16T10:17:25.375Z,1,
D005,Oximeter,2025-07-16,Ward-4,P005,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid,2025-11-16T10:17:25.375Z,1,
D006,Thermometer,2025-10-19,Ward-2,P006,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid,2025-11-16T10:17:25.375Z,1,
D007,HeartRateMonitor,2025-09-13,Ward-3,P007,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid,2025-11-16T10:17:25.375Z,1,
D008,Oximeter,2025-11-02,Ward-5,P008,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid,2025-11-16T10:17:25.375Z,1,


In [0]:
device_history_df.write.format('delta').mode('overwrite').save('abfss://catalog@storageaccounthealthetl.dfs.core.windows.net/data/silver/devices_history')
device_active_df.write.format('delta').mode('overwrite').save('abfss://catalog@storageaccounthealthetl.dfs.core.windows.net/data/silver/devices_active')

In [0]:
devices_active_read_df=spark.read.format('delta').load('abfss://catalog@storageaccounthealthetl.dfs.core.windows.net/data/silver/devices_active')
display(devices_active_read_df)

device_id,device_type,install_date,location,patient_id,_rescued_data,file_name,file_load_time,category,start_At,is_current,end_At
D001,HeartRateMonitor,2025-10-16,ICU-1,P001,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid,2025-11-16T10:18:19.265Z,1,
D002,Oximeter,2025-09-29,Ward-2,P002,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid,2025-11-16T10:18:19.265Z,1,
D003,Thermometer,2025-08-21,Ward-3,P003,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid,2025-11-16T10:18:19.265Z,1,
D004,HeartRateMonitor,2025-10-26,ICU-2,P004,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid,2025-11-16T10:18:19.265Z,1,
D005,Oximeter,2025-07-16,Ward-4,P005,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid,2025-11-16T10:18:19.265Z,1,
D006,Thermometer,2025-10-19,Ward-2,P006,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid,2025-11-16T10:18:19.265Z,1,
D007,HeartRateMonitor,2025-09-13,Ward-3,P007,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid,2025-11-16T10:18:19.265Z,1,
D008,Oximeter,2025-11-02,Ward-5,P008,,part-00000-e1bc7f76-85e4-4767-9a72-0ff054dfa78b.c000.snappy.parquet,2025-11-16T09:51:50.000Z,valid,2025-11-16T10:18:19.265Z,1,


In [0]:
devices_tagged_patient_df=devices_active_read_df.select('patient_id').collect()
devices_patient_tagged_list=[row['patient_id'] for row in devices_tagged_patient_df]
devices_not_tagged_patients_df=patients_active_df.filter(~col('patient_id').isin(devices_patient_tagged_list))
devices_not_tagged_patients_df.write.format('delta').mode('overwrite').save('abfss://catalog@storageaccounthealthetl.dfs.core.windows.net/data/silver/devices_not_tagged_patients')

In [0]:
device_patient_merge_df=patients_active_df.join(device_active_df,patients_active_df.patient_id==device_active_df.patient_id,'left')
device_patient_merge_df.select(patients_active_df['patient_id'], 'name', 'age', 'gender', 'admission_date', patients_active_df['ward_no'], 'status', 'age_category', 'device_id', 'device_type', 'install_date', 'location').write.format('delta').mode('overwrite').save('abfss://catalog@storageaccounthealthetl.dfs.core.windows.net/data/silver/device_patient_merge_df')

In [0]:
from pyspark.sql.functions import *
from pyspark.sql import *
schema='reading string,device_id string,heart_rate int,bp_systolic int,bp_diastolic int,spo2 int,temperature double,timestamp timestamp,_rescued_data string'
vital_streams_bronze_df=spark.read.format('delta').load('abfss://catalog@storageaccounthealthetl.dfs.core.windows.net/data/bronze/vital_streams')
vital_streams_modify_df=vital_streams_bronze_df.withColumn('file_name',col('_metadata.file_name'))
vital_streams_modify_df=vital_streams_modify_df.withColumn('load_time',current_timestamp())
vital_streams_modify_df=vital_streams_modify_df.withColumn('heart_rate',col('heart_rate').cast('int'))
vital_streams_modify_df=vital_streams_modify_df.withColumn('bp_systolic',col('bp_systolic').cast('int'))
vital_streams_modify_df=vital_streams_modify_df.withColumn('bp_diastolic',col('bp_diastolic').cast('int'))
vital_streams_modify_df=vital_streams_modify_df.withColumn('spo2',col('spo2').cast('int'))
vital_streams_modify_df=vital_streams_modify_df.withColumn('temperature',col('temperature').cast('double'))
display(vital_streams_modify_df)

bp_diastolic,bp_systolic,device_id,heart_rate,reading_id,spo2,temperature,timestamp,_rescued_data,file_name,load_time
78,120,D001,82,R001,98,36.8,2025-11-12T07:00:00Z,,part-00000-c6765753-21c9-4717-b1ad-78627aaa6869.c000.snappy.parquet,2025-11-16T10:42:04.120Z
100,160,D001,125,R002,91,38.2,2025-11-12T07:10:00Z,,part-00000-c6765753-21c9-4717-b1ad-78627aaa6869.c000.snappy.parquet,2025-11-16T10:42:04.120Z
77,118,D002,74,R003,96,36.7,2025-11-12T07:15:00Z,,part-00000-c6765753-21c9-4717-b1ad-78627aaa6869.c000.snappy.parquet,2025-11-16T10:42:04.120Z


In [0]:
vital_streams_clean_df=vital_streams_modify_df.withColumn('category',when(col('heart_rate').between(40,180)&col('bp_systolic').between(80,200)&col('bp_diastolic').between(50,120)&col('spo2').between(80,100)&col('temperature').between(30,45),'valid').otherwise('invalid'))


bp_diastolic,bp_systolic,device_id,heart_rate,reading_id,spo2,temperature,timestamp,_rescued_data,file_name,load_time,category
78,120,D001,82,R001,98,36.8,2025-11-12T07:00:00Z,,part-00000-c6765753-21c9-4717-b1ad-78627aaa6869.c000.snappy.parquet,2025-11-16T10:42:11.163Z,valid
100,160,D001,125,R002,91,38.2,2025-11-12T07:10:00Z,,part-00000-c6765753-21c9-4717-b1ad-78627aaa6869.c000.snappy.parquet,2025-11-16T10:42:11.163Z,valid
77,118,D002,74,R003,96,36.7,2025-11-12T07:15:00Z,,part-00000-c6765753-21c9-4717-b1ad-78627aaa6869.c000.snappy.parquet,2025-11-16T10:42:11.163Z,valid


In [0]:
vital_streams_clean_df.write.format('delta').mode('overwrite').save('abfss://catalog@storageaccounthealthetl.dfs.core.windows.net/data/silver/vital_streams')

In [0]:
device_patient_silver_df=spark.read.format('delta').load('abfss://catalog@storageaccounthealthetl.dfs.core.windows.net/data/silver/device_patient_merge_df')
device_patient_vital_stream_df=device_patient_silver_df.join(vital_streams_clean_df,device_patient_silver_df.device_id==vital_streams_clean_df.device_id,how='left').drop(vital_streams_clean_df.device_id,vital_streams_clean_df.load_time,vital_streams_clean_df.file_name,vital_streams_clean_df.category,vital_streams_clean_df._rescued_data)
device_patient_vital_stream_df.write.format('delta').mode('overwrite').save('abfss://catalog@storageaccounthealthetl.dfs.core.windows.net/data/silver/device_patient_vital_streams_merge_df')
display(device_patient_vital_stream_df)

patient_id,name,age,gender,admission_date,ward_no,status,age_category,device_id,device_type,install_date,location,bp_diastolic,bp_systolic,heart_rate,reading_id,spo2,temperature,timestamp
P003,Rajesh Singh,60,M,20-08-2025,W03,Active,Senior,D003,Thermometer,2025-08-21,Ward-3,,,,,,,
P008,Kritika Bansal,29,F,01-11-2025,W05,Discharged,Adult,D008,Oximeter,2025-11-02,Ward-5,,,,,,,
