In [0]:
#################################
# This is one time setup to read the CD XML files, but for a solution I would go with 2 options
# 1. Databricks streaming - using Autoloader and Streaming tables which will read all files from Cloud folder location and write to Delta Lake. Also it will keep track of new files and read the new files only when available. This will help to reduce the cost of running the solution, and maintaning the watermark for reading files. This could be realtime streaming vs micro batches where we can cerate this as a workflow and run it as a scheduled job. Where first it will run the Autoloader Noebook to read new files and ingest them and then the subsequent notebook to read the XML files from Delta Lake and parse/process the data.
# 2. Another option is to have standard solution of keeping watermark in some control table lower/upper limit for file create/modified date and then read the files from the folder location based on the watermark. This will be a batch job and will read all the files from the folder location and process them which falls between watermark.
#################################
from pyspark.sql.functions import regexp_extract, col, split

df = spark.read.option("rowTag", "ClinicalDocument").option("includeMetadata", "true").format("xml").load("/Volumes/workspace/default/raw_xml/")
df = df.withColumn("filename", df["_metadata.file_path"])

df = df.withColumn("file_id", regexp_extract(col("filename"), ".*/([^/]+)$", 1)
                ).withColumn("file_id_split", split(col("file_id"), "_")
                ).withColumn("patient_id", col("file_id_split").getItem(0)
                ).withColumn("doc_id", col("file_id_split").getItem(1))

#df.select("file_id", "patient_id", "doc_id").show(truncate=False)

print(df.count())

558


In [0]:
from pyspark.sql.functions import to_date, col, when, lit, size
#########################
# This is again a one time setup to read the XML files, but for a solution I would go with creating UDF to parse the XML files and then create the tables.
#########################
# ========================
# 1. Extract Patient Info
# ========================
df_patient = df.select(
    col("doc_id"),
    col("patient_id"),
    col("recordTarget.patientRole.patient.name").getItem(0).getField("given").getItem(0).getField("_VALUE").alias("first_name"),
    when(
        size(col("recordTarget.patientRole.patient.name").getItem(0).getField("given")) > 1,
        col("recordTarget.patientRole.patient.name").getItem(0).getField("given").getItem(1).getField("_VALUE")
    ).otherwise(lit(None)).alias("middle_name"),
    col("recordTarget.patientRole.patient.name").getItem(0).getField("family").getField("_VALUE").alias("last_name"),
    col("recordTarget.patientRole.patient.birthTime._value").alias("birth_date"),
    col("recordTarget.patientRole.patient.administrativeGenderCode._code").alias("gender"),
    col("recordTarget.patientRole.telecom._value").cast("string").alias("phone"),
    col("recordTarget.patientRole.addr.streetAddressLine").getItem(0).getItem(0).alias("address"),
    col("recordTarget.patientRole.addr.county").getItem(0).alias("county"),
    col("recordTarget.patientRole.addr.city").getItem(0).alias("city"),
    col("recordTarget.patientRole.addr.state").getItem(0).alias("state"),
    col("recordTarget.patientRole.addr.postalCode").getItem(0).alias("postalCode"),
    col("recordTarget.patientRole.addr.country").getItem(0).alias("country")
)

print(df_patient.count())
#display(df_patient)

In [0]:
from pyspark.sql.functions import lit, col, regexp_extract, array, concat_ws

# ==========================
# 2. Extract Medications
# ==========================
df_components = df.select(
    col("doc_id"),
    col("patient_id"),
    col("component.structuredBody.component").alias("components")
)

df_medications = df_components.selectExpr("doc_id","patient_id", "explode(components) as med_section").filter("med_section.section.title == 'Medications'")\
            .selectExpr("doc_id","patient_id", "explode(med_section.section.text.paragraph) as medications_text")

df_medications = df_medications.withColumn("medication_id", col("medications_text._id")
                            ).withColumn("medication_period", col("medications_text._VALUE").cast("string")
                            ).withColumn("medication_content", col("medications_text.content._VALUE").cast("string")
                        ).drop("medications_text")
#display(df_medications)
#df_medications.printSchema()
print(df_medications.count())

doc_id,patient_id,medication_id,medication_period,medication_content
69546483ef4f4bcefec443870173a0b7c3d10f30,2xv0526m-4n5t-7295-hg96-02v51m626436,MEDICATION3996435959,[Start Date: 2021-12-22 Status: Ordered],"[acetaminophen 325 mg oral tablet, 650 mg 2 Tab, Oral, Tab, Q6H, PRN Pain (Moderate 4-6), 0 Refill(s)]"
69546483ef4f4bcefec443870173a0b7c3d10f30,2xv0526m-4n5t-7295-hg96-02v51m626436,MEDICATION3996436145,[Start Date: 2021-12-22 Status: Ordered],"[amiodarone 200 mg oral tablet, 200 mg 1 Tab, Oral, Tab, Daily, # 30 Tab, 0 Refill(s), Pharmacy: CVS/pharmacy #6199, cm, 2021-11-29 6:21:00 EDT, CLINICALHEIGHT, 111.36, kg, 2021-10-12 21:06:00 EST, CLINICALWEIGHT, 172.72]"
69546483ef4f4bcefec443870173a0b7c3d10f30,2xv0526m-4n5t-7295-hg96-02v51m626436,MEDICATION3996436301,[Start Date: 2021-12-22 Status: Ordered],"[amLODIPine 10 mg oral tablet, 10 mg 1 Tab, Oral, Tab, Daily, # 30 Tab, 0 Refill(s), Pharmacy: CVS/pharmacy #6199, cm, 2021-11-29 6:21:00 EDT, CLINICALHEIGHT, 111.36, kg, 2021-10-12 21:06:00 EST, CLINICALWEIGHT, 172.72]"
69546483ef4f4bcefec443870173a0b7c3d10f30,2xv0526m-4n5t-7295-hg96-02v51m626436,MEDICATION3996436327,[Start Date: 2021-12-22 Status: Ordered],"[apixaban 5 mg oral tablet, 5 mg 1 Tab, Oral, Tab, BID, # 60 Tab, 0 Refill(s), Pharmacy: CVS/pharmacy #6199, cm, 2021-11-29 6:21:00 EDT, CLINICALHEIGHT, 111.36, kg, 2021-10-12 21:06:00 EST, CLINICALWEIGHT, 172.72]"
69546483ef4f4bcefec443870173a0b7c3d10f30,2xv0526m-4n5t-7295-hg96-02v51m626436,MEDICATION3996433147,[Start Date: 2021-12-22 Status: Ordered],"[aspirin 325 mg oral tablet, 325 mg 1 Tab, Oral, Tab, Daily, # 30 Tab, 0 Refill(s), Pharmacy: CVS/pharmacy #6199, cm, 2021-11-29 6:21:00 EDT, CLINICALHEIGHT, 111.36, kg, 2021-10-12 21:06:00 EST, CLINICALWEIGHT, 172.72]"
69546483ef4f4bcefec443870173a0b7c3d10f30,2xv0526m-4n5t-7295-hg96-02v51m626436,MEDICATION3702531637,[Start Date: 2021-08-08 Status: Ordered],"[atorvastatin 20 mg oral tablet, 20 mg 1 Tab, Oral, At Bedtime, 0 Refill(s)]"
69546483ef4f4bcefec443870173a0b7c3d10f30,2xv0526m-4n5t-7295-hg96-02v51m626436,MEDICATION3996486551,[Start Date: 2021-12-22 Status: Ordered],"[Basaglar KwikPen 100 units/mL subcutaneous solution, 48 Units, SubCutaneous, Daily, Take in the morning Rotate injection sites, # 10 mL, 0 Refill(s), Pharmacy: CVS/pharmacy #6199, cm, 2021-11-29 6:21:00 EDT, CLINICALHEIGHT, 111.36, kg, 2021-10-12 21:06:00 EST, CLINICALWEIGHT, 172.72]"
69546483ef4f4bcefec443870173a0b7c3d10f30,2xv0526m-4n5t-7295-hg96-02v51m626436,MEDICATION3996433407,[Start Date: 2021-12-22 Status: Ordered],"[bumetanide 2 mg oral tablet, See Instructions, Take 1.5 Tab Oral BID, # 90 Tab, 11 Refill(s), Pharmacy: CVS/pharmacy #6199, cm, 2021-11-29 6:21:00 EDT, CLINICALHEIGHT, 111.36, kg, 2021-10-12 21:06:00 EST, CLINICALWEIGHT, 172.72]"
69546483ef4f4bcefec443870173a0b7c3d10f30,2xv0526m-4n5t-7295-hg96-02v51m626436,MEDICATION3996436543,[Start Date: 2021-12-22 Status: Ordered],"[calcitriol 0.25 mcg oral capsule, 0.25 mcg 1 Cap, Oral, Cap, Daily, # 30 Cap, 0 Refill(s), Pharmacy: CVS/pharmacy #6199, cm, 2021-11-29 6:21:00 EDT, CLINICALHEIGHT, 111.36, kg, 2021-10-12 21:06:00 EST, CLINICALWEIGHT, 172.72]"
69546483ef4f4bcefec443870173a0b7c3d10f30,2xv0526m-4n5t-7295-hg96-02v51m626436,MEDICATION3996437357,[Start Date: 2021-12-22 Status: Ordered],"[cholecalciferol 1000 intl units oral capsule, 1,000 Int Units 1 Cap, Oral, Cap, Daily, # 30 Cap, 0 Refill(s), Pharmacy: CVS/pharmacy #6199, cm, 2021-11-29 6:21:00 EDT, CLINICALHEIGHT, 111.36, kg, 2021-10-12 21:06:00 EST, CLINICALWEIGHT, 172.72]"


root
 |-- doc_id: string (nullable = true)
 |-- patient_id: string (nullable = true)
 |-- medication_id: string (nullable = true)
 |-- medication_period: string (nullable = true)
 |-- medication_content: string (nullable = true)



In [0]:

# ==========================
# 3. Extract Problems
# ==========================

df_problems = df_components.selectExpr("doc_id", "patient_id", "explode(components) as problem_section").filter("problem_section.section.title == 'Problem List'") \
.selectExpr("doc_id", "patient_id", "explode(problem_section.section.text.paragraph) as problem_text")

df_problems = df_problems.selectExpr("doc_id", "patient_id", "explode(problem_text.content) as problem_content")
df_problems = df_problems.withColumn("problem_id", col("problem_content._ID")
                                    ).withColumn("problem_value", col("problem_content._VALUE")
                                    ).drop("problem_content")

#display(df_problems)
print(df_problems.count())


doc_id,patient_id,problem_id,problem_value
8f298e537ae90e2e2d494a8e27c53ad552dd32a7,2xv0526m-4n5t-7295-hg96-02v51m626436,PROBCMT63774641,bilateral lower extremity
8f298e537ae90e2e2d494a8e27c53ad552dd32a7,2xv0526m-4n5t-7295-hg96-02v51m626436,PROBCMT63774723,in bilateral feet
8f298e537ae90e2e2d494a8e27c53ad552dd32a7,2xv0526m-4n5t-7295-hg96-02v51m626436,PROBCMT63774619,history of
c9711a567e5f2788ab6c439157f9a2718552b0b4,2xv0526m-4n5t-7295-hg96-02v51m626436,NOPROBINFO,No data available for this section
f6b08dec138fea31b1c44819ad726aa437a23646,207al31q-dq91-7pf3-idft-37u74m6471zf,PROBCMT13593561,COPD - documented in patient history.
f6b08dec138fea31b1c44819ad726aa437a23646,207al31q-dq91-7pf3-idft-37u74m6471zf,PROBCMT15856403,COPD - documented in patient history.
f6b08dec138fea31b1c44819ad726aa437a23646,207al31q-dq91-7pf3-idft-37u74m6471zf,PROBCMT13593569,Diagnosis or History of Type 1 Diabetes - documented in patient history.
f6b08dec138fea31b1c44819ad726aa437a23646,207al31q-dq91-7pf3-idft-37u74m6471zf,PROBCMT15856411,Diagnosis or History of Type 1 Diabetes - documented in patient history.
f6b08dec138fea31b1c44819ad726aa437a23646,207al31q-dq91-7pf3-idft-37u74m6471zf,PROBCMT13380369,Current Every day smoker or Heavy Tobacco Smoker - documented in patient history dependent habits
f6b08dec138fea31b1c44819ad726aa437a23646,207al31q-dq91-7pf3-idft-37u74m6471zf,PROBCMT15856395,Current Every day smoker or Heavy Tobacco Smoker - documented in patient history dependent habits


In [0]:
############################
# This is temporary solution and for better solution these tables will be external delta tables. All data will be added using upsert as delta table keeps track of change history. 
############################
df_patient.write.mode("overwrite").saveAsTable("patient")
df_medications.write.mode("overwrite").saveAsTable("medications")
df_problems.write.mode("overwrite").saveAsTable("problems")