1. Configuration


In [0]:
dbutils.widgets.text('root_path','FileStore/')
dbutils.widgets.text('synth_in', 'FileStore/shared_uploads/omkars1202@gmail.com/')
root_path=dbutils.widgets.get('root_path')
synth_in=dbutils.widgets.get('synth_in')

In [0]:
import os
from pyspark.sql import functions as F
from pyspark.sql import Window

In [0]:
synthea_path  = synth_in
delta_root_path = f"{root_path}delta/"
print(f'Synthea Raw Path: {synthea_path}\n Delta Output Path:{delta_root_path}')

Synthea Raw Path: FileStore/shared_uploads/omkars1202@gmail.com/
 Delta Output Path:FileStore/delta/


In [0]:
print(f'Synthea Raw Path: {synthea_path}\nDelta Output Path: {delta_root_path}')

Synthea Raw Path: FileStore/shared_uploads/omkars1202@gmail.com/
Delta Output Path: FileStore/delta/


In [0]:
display(dbutils.fs.ls(synthea_path))


path,name,size,modificationTime
dbfs:/FileStore/shared_uploads/omkars1202@gmail.com/allergies.csv,allergies.csv,70010,1708397768000
dbfs:/FileStore/shared_uploads/omkars1202@gmail.com/careplans.csv,careplans.csv,672599,1708397768000
dbfs:/FileStore/shared_uploads/omkars1202@gmail.com/conditions.csv,conditions.csv,1062972,1708397768000
dbfs:/FileStore/shared_uploads/omkars1202@gmail.com/devices.csv,devices.csv,17498,1708397768000
dbfs:/FileStore/shared_uploads/omkars1202@gmail.com/encounters.csv,encounters.csv,16474491,1708397774000
dbfs:/FileStore/shared_uploads/omkars1202@gmail.com/imaging_studies.csv,imaging_studies.csv,205364,1708397769000
dbfs:/FileStore/shared_uploads/omkars1202@gmail.com/immunizations.csv,immunizations.csv,2225013,1708397771000
dbfs:/FileStore/shared_uploads/omkars1202@gmail.com/medications.csv,medications.csv,10884009,1708397775000
dbfs:/FileStore/shared_uploads/omkars1202@gmail.com/observations.csv,observations.csv,43094924,1708397785000
dbfs:/FileStore/shared_uploads/omkars1202@gmail.com/organizations.csv,organizations.csv,169568,1708397776000


In [0]:
datasets= ['allergies',
          'careplans',
          'conditions',
          'devices',
          'encounters',
          'imaging_studies',
          'immunizations',
          'medications',
          'observations',
          'organizations',
          'patients',
          'payer_transitions',
          'payers',
          'procedures',
          'providers',
          'supplies'
         ]

2. CSV Files as Spark Dataframes

In [0]:
# create a python dictionary of dataframes
df_dict = {}
for dataset in datasets:
    df_dict[dataset] = spark.read.csv('dbfs:/{}/{}.csv'.format(synthea_path,dataset),header=True,inferSchema=True)

In [0]:
# Print the dictionary keys (file names)
print("DataFrame keys (file names):", df_dict.keys())

DataFrame keys (file names): dict_keys(['allergies', 'careplans', 'conditions', 'devices', 'encounters', 'imaging_studies', 'immunizations', 'medications', 'observations', 'organizations', 'patients', 'payer_transitions', 'payers', 'procedures', 'providers', 'supplies'])


In [0]:
import pandas as pd
dataframes=[(x[0],x[1].count()) for x in list(df_dict.items())]
display(pd.DataFrame(dataframes,columns=['dataset','n_records']).sort_values(by=['n_records'],ascending=False))

dataset,n_records
observations,299697
encounters,53346
medications,42989
procedures,34981
immunizations,15478
conditions,8376
providers,5855
payer_transitions,3801
careplans,3483
patients,1171


3. De-identify Patient PHI (Masking to be performed in the Production ETL process)

In [0]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType
import pandas as pd
import hashlib

In [0]:
def mask_pii(pii_col: pd.Series) -> pd.Series:
    '''
    mask_pii: function takes a pandas series and returned sha1 hash values of elements
    '''
    sha_value = pii_col.map(lambda x: hashlib.sha1(x.encode()).hexdigest())
    return sha_value
 
mask_pii_udf = pandas_udf(mask_pii, returnType=StringType())

We then use this function to mask pii columns for a given set of columns, namely:

['SSN','DRIVERS','PASSPORT','PREFIX','FIRST','LAST','SUFFIX','MAIDEN','BIRTHPLACE','ADDRESS'].

In [0]:
pii_cols=['SSN','DRIVERS','PASSPORT','PREFIX','FIRST','LAST','SUFFIX','MAIDEN','BIRTHPLACE','ADDRESS']
patients_obfuscated = df_dict['patients']
for c in pii_cols:
  masked_col_name = c+'_masked'
  patients_obfuscated = patients_obfuscated.withColumn(c,F.coalesce(c,F.lit('null'))).withColumn(masked_col_name,mask_pii_udf(c))

Replacing the Original records with the de-identified records


In [0]:
df_dict['patients']=patients_obfuscated.drop(*pii_cols)

4. Writing Tables to Dellta Lake

In [0]:
try:
  dbutils.fs.ls(delta_root_path)
except:
  print(f'Path {delta_root_path} does not exist, creating path {delta_root_path}')
  dbutils.fs.mkdirs(delta_root_path)
print(f'Delta tables will be stored in {delta_root_path}')

Delta tables will be stored in FileStore/delta/


In [0]:
for table_name in datasets:
  table_path = f'dbfs:/FileStore/delta'+ '/bronze/{}'.format(table_name)
  df_dict[table_name].write.format('delta').mode("overwrite").save(table_path)

In [0]:
display(dbutils.fs.ls(f'{delta_root_path}/bronze/'))

path,name,size,modificationTime
dbfs:/FileStore/delta/bronze/allergies/,allergies/,0,0
dbfs:/FileStore/delta/bronze/careplans/,careplans/,0,0
dbfs:/FileStore/delta/bronze/conditions/,conditions/,0,0
dbfs:/FileStore/delta/bronze/devices/,devices/,0,0
dbfs:/FileStore/delta/bronze/encounters/,encounters/,0,0
dbfs:/FileStore/delta/bronze/imaging_studies/,imaging_studies/,0,0
dbfs:/FileStore/delta/bronze/immunizations/,immunizations/,0,0
dbfs:/FileStore/delta/bronze/medications/,medications/,0,0
dbfs:/FileStore/delta/bronze/observations/,observations/,0,0
dbfs:/FileStore/delta/bronze/organizations/,organizations/,0,0


In [0]:
silver_path = "dbfs:/FileStore/delta/Silver/"

# the Silver layer directory
dbutils.fs.mkdirs(silver_path)

Out[148]: True

In [0]:
# Define path for Bronze and Silver layers
bronze_path = "dbfs:/FileStore/delta/bronze/"
silver_path = "dbfs:/FileStore/delta/Silver/"

# Function to read Delta table and return cleaned and transformed DataFrame
def clean_and_transform_df(dataset):
    df = spark.read.format("delta").load(f"{bronze_path}{dataset}")
    # Here are some examples, adjust based on your needs:
    df.dropDuplicates()
    df.dropna()

    # Write cleaned and transformed data to silver layer
    df.write.format("delta").save(f"{silver_path}{dataset}")

# Process each dataset
for dataset in datasets:
    clean_and_transform_df(dataset)

[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-817673786071447>:17[0m
[1;32m     15[0m [38;5;66;03m# Process each dataset[39;00m
[1;32m     16[0m [38;5;28;01mfor[39;00m dataset [38;5;129;01min[39;00m datasets:
[0;32m---> 17[0m     clean_and_transform_df(dataset)

File [0;32m<command-817673786071447>:13[0m, in [0;36mclean_and_transform_df[0;34m(dataset)[0m
[1;32m     10[0m df[38;5;241m.[39mdropna()
[1;32m     12[0m [38;5;66;03m# Write cleaned and transformed data to silver layer[39;00m
[0;32m---> 13[0m [43mdf[49m[38;5;241;43m.[39;49m[43mwrite[49m[38;5;241;43m.[39;49m[43mformat[49m[43m([49m[38;5;124;43m"[39;49m[38;5;124;43mdelta[39;49m[38;5;124;43m"[39;49m[43m)[49m[38;5;241;43m.[39;49m[43msave[49m[43m([49m[38;5;124;43mf[39;49m[38;5;124;43m"[39;49m[38;5;132;43;01m{[39;49;00m[43msilve

In [0]:
clean_df = clean_and_transform_df('patients')
clean_df.write.quality_checked_df.write.mode("overwrite").format("delta").save(silver_path + dataset)

