In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, expr

# Initialize Spark session
spark = SparkSession.builder \
    .appName("FHIR Data Pipeline") \
    .getOrCreate()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/29 17:26:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/11/29 17:26:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:

# File path
file_path = "data/fhir/samples"

#Output path
output_path = "etl"

# Read the JSON file
data = spark.read.json(file_path, multiLine=True)

# Explode the entry array
entries = data.select(explode(col("entry")).alias("entry"))

# Cache entries because it is used to derive all subsequent dataframes
entries.cache()


24/11/29 17:26:57 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


DataFrame[entry: struct<fullUrl:string,request:struct<method:string,url:string>,resource:struct<abatementDateTime:string,activity:array<struct<detail:struct<code:struct<coding:array<struct<code:string,display:string,system:string>>,text:string>,location:struct<display:string>,status:string>>>,address:array<struct<city:string,country:string,extension:array<struct<extension:array<struct<url:string,valueDecimal:double>>,url:string>>,line:array<string>,postalCode:string,state:string>>,addresses:array<struct<reference:string>>,authoredOn:string,billablePeriod:struct<end:string,start:string>,birthDate:string,careTeam:array<struct<provider:struct<reference:string>,reference:string,role:struct<coding:array<struct<code:string,display:string,system:string>>>,sequence:bigint>>,category:array<string>,claim:struct<reference:string>,class:struct<code:string,system:string>,clinicalStatus:struct<coding:array<struct<code:string,system:string>>>,code:struct<coding:array<struct<code:string,display:string

### Patient

In [3]:

# Filter patient resources
patients = entries.filter(col("entry.resource.resourceType") == "Patient") \
                  .select(col("entry.resource.*"))

# Extract the "official" name
official_name = expr("""
    filter(name, x -> x.use = 'official')[0]
""")

# Extract geolocation fields
geolocation = expr("""
    filter(address[0].extension, x -> x.url = 'http://hl7.org/fhir/StructureDefinition/geolocation')[0].extension
""")

# Extract all languages from communication as a list
communication_languages = expr("""
    transform(communication, x -> x.language.text)
""")

# Extract identifier types
identifier_types = expr("""
    transform(identifier, x -> x.type.coding[0].display)
""")

# Create the enhanced patient DataFrame
patient_df = patients.select(
    col("id").alias("patient_id"),
    identifier_types.alias("identifier_types"),
    official_name.getField("family").alias("last_name"),
    official_name.getField("given").getItem(0).alias("first_name"),
    col("gender").alias("gender"),
    col("birthDate").alias("birth_date"),
    col("address").getItem(0).getField("city").alias("city"),
    col("address").getItem(0).getField("state").alias("state"),
    col("address").getItem(0).getField("country").alias("country"),
    col("address").getItem(0).getField("postalCode").alias("postal_code"),
    geolocation.getItem(0).getField("valueDecimal").alias("latitude"),
    geolocation.getItem(1).getField("valueDecimal").alias("longitude"),
    col("telecom").getItem(0).getField("value").alias("phone"),
    col("maritalStatus.text").alias("marital_status"),
    col("extension").getItem(0).getField("valueString").alias("mothers_maiden_name"),
    col("extension").getItem(1).getField("valueAddress").getField("city").alias("birthplace_city"),
    col("extension").getItem(1).getField("valueAddress").getField("state").alias("birthplace_state"),
    col("extension").getItem(1).getField("valueAddress").getField("country").alias("birthplace_country"),
    col("extension").getItem(2).getField("valueDecimal").alias("disability_adjusted_life_years"),
    col("extension").getItem(3).getField("valueDecimal").alias("quality_adjusted_life_years"),
    col("multipleBirthBoolean").alias("multiple_birth"),
    communication_languages.alias("languages")
)

# Show the resulting DataFrame
patient_df.show(truncate=False)


                                                                                

+------------------------------------+----------------------------------------------------------------------------------------+-------------+------------+------+----------+-----------------------+-------------------------+-------+-----------+------------------+-------------------+------------+--------------+-------------------------+---------------+-------------------------+------------------+------------------------------+---------------------------+--------------+---------+
|patient_id                          |identifier_types                                                                        |last_name    |first_name  |gender|birth_date|city                   |state                    |country|postal_code|latitude          |longitude          |phone       |marital_status|mothers_maiden_name      |birthplace_city|birthplace_state         |birthplace_country|disability_adjusted_life_years|quality_adjusted_life_years|multiple_birth|languages|
+------------------------------------+

In [4]:
# Save the DataFrame to Parquet format in the "patients" folder
patient_df.write.mode("overwrite").parquet(f"{output_path}/patient")

print("Patient DataFrame has been successfully saved in Parquet format in the 'patient' folder.")

24/11/29 17:27:00 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/11/29 17:27:00 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/11/29 17:27:00 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
[Stage 5:>                                                        (0 + 10) / 10]

Patient DataFrame has been successfully saved in Parquet format in the 'patients' folder.


24/11/29 17:27:00 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/11/29 17:27:00 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

### Encounter

In [5]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import ArrayType, StructType, StructField, StringType

# Define the schema for the 'type' field
type_schema = ArrayType(
    StructType([
        StructField("coding", ArrayType(
            StructType([
                StructField("system", StringType(), True),
                StructField("code", StringType(), True),
                StructField("display", StringType(), True)
            ])
        ), True),
        StructField("text", StringType(), True)
    ])
)


# Filter encounter resources
encounters = entries.filter(col("entry.resource.resourceType") == "Encounter") \
                    .select(col("entry.resource.*"))

# Parse the 'type' field from JSON string to structured format
encounters = encounters.withColumn("type_parsed", from_json(col("type"), type_schema))

# Explode the participant array to create one row per participant
encounters_with_participants = encounters.select(
    col("id").alias("encounter_id"),
    col("status").alias("status"),
    col("class.code").alias("class_code"),  #todo - find a better column name
    col("type_parsed").getItem(0).getField("text").alias("type_text"),  #todo - find a better column name
    col("subject.reference").alias("patient_reference"),
    col("period.start").alias("start_time"),
    col("period.end").alias("end_time"),
    col("serviceProvider.reference").alias("service_provider_id"),
    col("serviceProvider.display").alias("service_provider_display"),
    explode(col("participant")).alias("participant")
)

# Extract participant details
encounter_with_participant_df = encounters_with_participants.select(
    col("encounter_id"),
    col("status"),
    col("class_code"),
    col("type_text"),
    col("start_time"),
    col("end_time"),
    col("patient_reference"),
    col("service_provider_id"),
    col("service_provider_display"),
    col("participant.individual.display").alias("participant_individual_display"),
    col("participant.individual.reference").alias("participant_individual_reference"),
    col("participant.period.start").alias("participant_period_start"),
    col("participant.period.end").alias("participant_period_end"),
    col("participant.type").getItem(0).getField("coding").getItem(0).getField("code").alias("participant_type_code"),
    col("participant.type").getItem(0).getField("coding").getItem(0).getField("display").alias("participant_type_display")
)

# Show the resulting DataFrame
encounter_with_participant_df.show(truncate=False)


+------------------------------------+--------+----------+------------------------------------------+-------------------------+-------------------------+---------------------------------------------+-------------------------------------------------------------------------------------------------------+--------------------------------+------------------------------+-----------------------------------------------------------------+-------------------------+-------------------------+---------------------+------------------------+
|encounter_id                        |status  |class_code|type_text                                 |start_time               |end_time                 |patient_reference                            |service_provider_id                                                                                    |service_provider_display        |participant_individual_display|participant_individual_reference                                 |participant_period_start |particip

In [6]:
# Save the DataFrame to Parquet
encounter_with_participant_df.write.mode("overwrite").parquet(f"{output_path}/encounter")

print("Participant details have been successfully extracted and saved in the 'encounter' folder.")


Participant details have been successfully extracted and saved in the 'encounter' folder.


24/11/29 17:27:01 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/11/29 17:27:01 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/11/29 17:27:01 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/11/29 17:27:01 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


### Condition

In [7]:
# Filter for Condition resources
conditions = entries.filter(col("entry.resource.resourceType") == "Condition") \
                    .select(col("entry.resource.*"))

# Extract relevant fields from Condition
condition_df = conditions.select(
    col("id").alias("condition_id"),
    col("clinicalStatus.coding").getItem(0).getField("code").alias("clinical_status"),
    col("verificationStatus.coding").getItem(0).getField("code").alias("verification_status"),
    col("code.text").alias("condition_code_display"),
    col("subject.reference").alias("patient_reference"),
    col("encounter.reference").alias("encounter_reference"),
    col("onsetDateTime").alias("onset_datetime"),
    col("abatementDateTime").alias("abatement_datetime"),
    col("recordedDate").alias("recorded_date")
)

# Show the resulting DataFrame
condition_df.show(truncate=False)

+------------------------------------+---------------+-------------------+-------------------------------------------+---------------------------------------------+---------------------------------------------+-------------------------+-------------------------+-------------------------+
|condition_id                        |clinical_status|verification_status|condition_code_display                     |patient_reference                            |encounter_reference                          |onset_datetime           |abatement_datetime       |recorded_date            |
+------------------------------------+---------------+-------------------+-------------------------------------------+---------------------------------------------+---------------------------------------------+-------------------------+-------------------------+-------------------------+
|ef7aaf17-7b06-76be-254d-04b736547a25|active         |confirmed          |Chronic sinusitis (disorder)               |urn:uuid:7ca297

In [8]:
# Save the DataFrame to Parquet
condition_df.write.mode("overwrite").parquet(f"{output_path}/condition")

print("Condition DataFrame has been successfully saved in Parquet format in the 'condition' folder.")

Condition DataFrame has been successfully saved in Parquet format in the 'conditions' folder.


### Medication Request

In [9]:
# Filter for MedicationRequest resources
medication_requests = entries.filter(col("entry.resource.resourceType") == "MedicationRequest") \
                             .select(col("entry.resource.*"))

# Explode the dosageInstruction array to handle multiple dosage instructions
medication_requests_exploded = medication_requests.withColumn("dosageInstruction", explode(col("dosageInstruction")))

# Extract relevant fields from MedicationRequest
medication_request_df = medication_requests_exploded.select(
    col("id").alias("medication_request_id"),
    col("status").alias("status"),
    col("intent").alias("intent"),
    col("medicationCodeableConcept.coding").getItem(0).getField("display").alias("medication_display"),
    col("subject.reference").alias("patient_reference"),
    col("requester.reference").alias("requester_reference"),
    col("requester.display").alias("requester_display"),
    col("encounter.reference").alias("encounter_reference"),
    col("authoredOn").alias("authored_on"),
    col("dosageInstruction.text").alias("dosage_text"),
    col("dosageInstruction.timing.repeat.frequency").alias("dosage_frequency"),
    col("dosageInstruction.timing.repeat.period").alias("dosage_period"),
    col("dosageInstruction.timing.repeat.periodUnit").alias("dosage_period_unit")
)

# Show the resulting DataFrame
medication_request_df.show(truncate=False)

+------------------------------------+-------+------+--------------------------------------------------------------+---------------------------------------------+-----------------------------------------------------------------+-----------------------------+---------------------------------------------+-------------------------+-------------------------------------------+----------------+-------------+------------------+
|medication_request_id               |status |intent|medication_display                                            |patient_reference                            |requester_reference                                              |requester_display            |encounter_reference                          |authored_on              |dosage_text                                |dosage_frequency|dosage_period|dosage_period_unit|
+------------------------------------+-------+------+--------------------------------------------------------------+----------------------------------

In [10]:
# Save the DataFrame to Parquet
medication_request_df.write.mode("overwrite").parquet(f"{output_path}/medication_request")

print("MedicationRequest DataFrame with multiple dosage instructions has been successfully saved in the 'medication_request' folder.")

MedicationRequest DataFrame with multiple dosage instructions has been successfully saved in the 'medication_requests' folder.


24/11/29 17:27:02 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


### Claim

In [11]:
from pyspark.sql.types import DoubleType

# Filter for Claim resources
claims = entries.filter(col("entry.resource.resourceType") == "Claim") \
                .select(col("entry.resource.*"))

# Define the schema for the 'type' field
type_schema = ArrayType(
    StructType([
        StructField("coding", ArrayType(
            StructType([
                StructField("system", StringType(), True),
                StructField("code", StringType(), True)
            ])
        ), True)
    ])
)

# Define the schema for the 'total' field
total_schema = StructType([
    StructField("value", DoubleType(), True),      
    StructField("currency", StringType(), True)  
])


claims = (claims
          .withColumn("type_parsed", from_json(col("type"), type_schema))
          .withColumn("total_parsed", from_json(col("total"), total_schema))
          .withColumn("supportingInfo", explode(col("supportingInfo")))
          .withColumn("insurance", explode(col("insurance")))
          .withColumn("claim_item", explode(col("item")))
          .withColumn("diagnosis", explode(col("diagnosis")))
          )  

# Extract relevant fields from Claim
claim_df = claims.select(
    col("id").alias("claim_id"),
    col("status").alias("status"),
    col("type_parsed").getItem(0).getField("coding").getItem(0).getField("code").alias("type_code"),
    col("use").alias("use"),
    col("patient.reference").alias("patient_reference"),
    col("patient.display").alias("patient_display"),
    col("created").alias("created_date"),
    col("billablePeriod.start").alias("billable_period_start"),
    col("billablePeriod.end").alias("billable_period_end"),
    col("provider.reference").alias("provider_reference"),
    col("priority.coding").getItem(0).getField("code").alias("priority"),
    col("supportingInfo.category.coding").getItem(0).getField("code").alias("supporting_info_code"),
    col("supportingInfo.valueReference.reference").alias("supporting_info_value_reference"),
    col("insurance.coverage.display").alias("insurance_coverage_display"),
    col("insurance.focal").alias("insurance_focal"),
    col("total_parsed").getField("value").alias("total_amount"), 
    col("total_parsed").getField("currency").alias("currency"),
    col("diagnosis.diagnosisReference.reference").alias("diagnosis_reference"), 
    col("claim_item.productOrService.coding").getItem(0).getField("code").alias("item_code"),
    col("claim_item.productOrService.coding").getItem(0).getField("display").alias("item_description"),
    col("claim_item.category.coding").getItem(0).getField("display").alias("item_category"),
    col("claim_item.net.value").alias("item_net_value"),
    col("claim_item.net.currency").alias("item_net_currency"),
    col("claim_item.encounter").getItem(0).getField("reference").alias("encounter_reference"),
    col("claim_item.locationCodeableConcept.coding").getItem(0).getField("display").alias("location_description"),
    col("claim_item.servicedPeriod.start").alias("service_period_start"),
    col("claim_item.servicedPeriod.end").alias("service_period_end"),
    col("claim_item.adjudication").alias("adjudication")
)

claim_df.show(truncate=False)


+------------------------------------+------+-------------+-----+---------------------------------------------+------------------+-------------------------+-------------------------+-------------------------+-------------------------------------------------------------------------------------------------------+--------+--------------------+---------------------------------------------+--------------------------+---------------+------------+--------+---------------------------------------------+---------+--------------------------------------------------+-------------+--------------+-----------------+---------------------------------------------+--------------------+--------------------+------------------+------------+
|claim_id                            |status|type_code    |use  |patient_reference                            |patient_display   |created_date             |billable_period_start    |billable_period_end      |provider_reference                                              

In [12]:
adjudication_df = claim_df.withColumn("adjudication", explode(col("adjudication")))

#todo unable to read adjudication from the files
#todo uploading claims without adjudication details for now


In [13]:
# Save the DataFrame to Parquet
claim_df.write.mode("overwrite").parquet(f"{output_path}/claim")

print("Claims DataFrame has been successfully saved in the 'claim' folder.")

Claims DataFrame has been successfully saved in the 'claims' folder.


### Explanation of Benefit

In [14]:
# Filter for ExplanationOfBenefit resources
eobs = entries.filter(col("entry.resource.resourceType") == "ExplanationOfBenefit") \
              .select(col("entry.resource.*"))

eobs = (eobs.withColumn("type_parsed", from_json(col("type"), type_schema))
        .withColumn("total_parsed", from_json(col("total"), total_schema))
        .withColumn("contained", explode(col("contained")))
        .withColumn("careTeam", explode(col("careTeam")))
        .withColumn("insurance", explode(col("insurance")))
        .withColumn("item", explode(col("item")))
        )
# Extract relevant fields from ExplanationOfBenefit
eob_df = eobs.select(
    col("id").alias("eob_id"),
    col("identifier").getItem(0).getField("value").alias("identifier_claim_id"),  
    col("identifier").getItem(1).getField("value").alias("identifier_claim_group"),  
    col("status").alias("status"),
    col("type_parsed.coding").getItem(0).getField("code").alias("code"),
    col("use").alias("use"),
    col("patient.reference").alias("patient_reference"),
    col("billablePeriod.start").alias("billable_period_start"),
    col("billablePeriod.end").alias("billable_period_end"),
    col("insurer.display").alias("insurer_display"),
    col("provider.reference").alias("provider_reference"),
    col("referral.reference").alias("referral_reference"),
    col("claim.reference").alias("claim_reference"),
    col("outcome").alias("outcome"),
    col("careTeam.provider.reference").alias("care_team_provider_reference"),
    col("careTeam.role.coding").getItem(0).getField("display").alias("care_team_role"), 
    col("insurance.coverage.reference").alias("insurance_coverage_reference"),
    col("insurance.coverage.display").alias("insurance_coverage_display"),
    col("insurance.focal").alias("insurance_focal"),
    col("item.category.coding").getItem(0).getField("display").alias("item_category_display"),
    col("item.productOrService.coding").getItem(0).getField("code").alias("item_service_code"),
    col("item.productOrService.coding").getItem(0).getField("display").alias("item_service_description"),
    col("item.net.value").alias("item_net_value"),
    col("item.net.currency").alias("item_net_currency"),
    col("item.locationCodeableConcept.coding").getItem(0).getField("display").alias("location_description"),
    col("item.servicedPeriod.start").alias("service_period_start"),
    col("item.servicedPeriod.end").alias("service_period_end"),
    col("item.adjudication").alias("adjudication"),
    col("total_parsed.value").alias("total_amount"),
    col("total_parsed.currency").alias("total_currency"),
    col("payment.amount.value").alias("payment_amount"),
    col("payment.amount.currency").alias("payment_currency"),
    col("created").alias("created_date")  
)

# Show the resulting DataFrame
eob_df.show(truncate=False)

+------------------------------------+------------------------------------+----------------------+------+---------------+-----+---------------------------------------------+-------------------------+-------------------------+---------------+-----------------------------------------------------------------+------------------+---------------------------------------------+--------+-----------------------------------------------------------------+-------------------------+----------------------------+--------------------------+---------------+---------------------+-----------------+--------------------------------------------------+--------------+-----------------+------------------------------+-------------------------+-------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [15]:
#todo explode adjudication 



In [16]:
# Save the DataFrame to Parquet
eob_df.write.mode("overwrite").parquet(f"{output_path}/explanation_of_benefit")

print("Explanation of Benefit DataFrame has been successfully saved in the 'explanation_of_benefit' folder.")

Explanation of Benefit DataFrame has been successfully saved in the 'explanation_of_benefit' folder.


24/11/29 17:27:04 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
24/11/29 17:27:04 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
24/11/29 17:27:04 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


### Care Plan

In [17]:
# Filter for CarePlan resources
careplans = entries.filter(col("entry.resource.resourceType") == "CarePlan") \
                   .select(col("entry.resource.*"))

# Define schema for the 'category' field
category_schema = StructType([
        StructField("coding", ArrayType(
            StructType([
                StructField("system", StringType(), True),
                StructField("code", StringType(), True),
                StructField("display", StringType(), True),
            ])
        ), True),
        StructField("text", StringType(), True)
    ])


careplans = (careplans
             .withColumn("careTeam", explode(col("careTeam")))
             .withColumn("activity", explode(col("activity")))
             )

# Extract relevant fields from CarePlan
careplan_df = careplans.select(
    col("id").alias("careplan_id"),
    col("status").alias("status"),
    col("intent").alias("intent"),
    col("category").getItem(0).alias("category"),  
    col("subject.reference").alias("patient_reference"),
    col("encounter.reference").alias("encounter_reference"),
    col("period.start").alias("period_start"),
    col("period.end").alias("period_end"),
    col("created").alias("created_date"),
    col("careTeam.reference").alias("care_team_reference"), 
    col("activity.detail.code.coding").getItem(0).getField("display").alias("activity_display"),
    col("activity.detail.code.coding").getItem(0).getField("code").alias("activity_code"),
    col("activity.detail.status").alias("activity_status"),
    col("activity.detail.location.display").alias("activity_location"),
    col("addresses.reference").alias("addresses")
)

# Since category is an array of string, separately process the column
careplan_df = careplan_df.withColumn("category_parsed", from_json(col("category"), category_schema)).withColumn("category_text", col("category_parsed.text"))

# Dropping the redundant columns
careplan_df = careplan_df.select(
    [column for column in careplan_df.columns if column not in ["category", "category_parsed"]]
)

# Show the resulting DataFrame
careplan_df.show(truncate=False)


+------------------------------------+---------+------+---------------------------------------------+---------------------------------------------+-------------------------+-------------------------+------------+---------------------------------------------+--------------------------------------------------------+-------------+---------------+--------------------------------+-----------------------------------------------+-------------------------------------+
|careplan_id                         |status   |intent|patient_reference                            |encounter_reference                          |period_start             |period_end               |created_date|care_team_reference                          |activity_display                                        |activity_code|activity_status|activity_location               |addresses                                      |category_text                        |
+------------------------------------+---------+------+---------------

In [18]:
# Save the DataFrame to Parquet
careplan_df.write.mode("overwrite").parquet(f"{output_path}/careplan")

print(" CarePlan DataFrame has been successfully saved in the 'careplan' folder.")

 CarePlan DataFrame has been successfully saved in the 'careplan' folder.


### Care Team

In [19]:
# Filter for CareTeam resources
care_team_resources = entries.filter(col("entry.resource.resourceType") == "CareTeam") \
                             .select(col("entry.resource.*"))

# Explode the participants array if it exists
care_team_resources = (care_team_resources.withColumn("participant", explode(col("participant")))
                       .withColumn("managingOrganization", explode(col("managingOrganization"))))

# Extract relevant fields
care_team_df = care_team_resources.select(
    col("id").alias("care_team_id"),
    col("status").alias("status"),
    col("subject.reference").alias("subject_reference"),
    col("encounter.reference").alias("encounter_reference"),
    col("period.start").alias("period_start"),
    col("period.end").alias("period_end"),
    col("managingOrganization.display").alias("managing_organization"),
    col("participant.role.text").getItem(0).alias("participant_role"),
    col("participant.member.reference").alias("participant_reference"),
    col("participant.member.display").alias("participant_display"),
)

# Show the resulting DataFrame
care_team_df.show(truncate=False)


+------------------------------------+--------+---------------------------------------------+---------------------------------------------+-------------------------+-------------------------+--------------------------------+-------------------------------------+-------------------------------------------------------------------------------------------------------+--------------------------------+
|care_team_id                        |status  |subject_reference                            |encounter_reference                          |period_start             |period_end               |managing_organization           |participant_role                     |participant_reference                                                                                  |participant_display             |
+------------------------------------+--------+---------------------------------------------+---------------------------------------------+-------------------------+-------------------------+---------

In [20]:
# Save the DataFrame to Parquet
care_team_df.write.mode("overwrite").parquet(f"{output_path}/careteam")

print(" CareTeam DataFrame has been successfully saved in the 'careteam' folder.")

 CareTeam DataFrame has been successfully saved in the 'careteam' folder.


### Procedure

In [21]:
# Filter for Procedure resources
procedures = entries.filter(col("entry.resource.resourceType") == "Procedure") \
                    .select(col("entry.resource.*"))

procedures = procedures.withColumn("reasonReference", explode(col("reasonReference")))

# Extract relevant fields
procedure_df = procedures.select(
    col("id").alias("procedure_id"),
    col("status").alias("status"),
    col("code.coding").getItem(0).getField("display").alias("procedure_code_display"),
    col("code.coding").getItem(0).getField("code").alias("procedure_code"),
    col("subject.reference").alias("subject_reference"),
    col("performedPeriod.start").alias("performed_period_start"),
    col("performedPeriod.end").alias("performed_period_end"),
    col("encounter.reference").alias("encounter_reference"),
    col("reasonReference.reference").alias("reason_reference"),
    col("reasonReference.display").alias("reason_reference_display")
)

# Show the resulting DataFrame
procedure_df.show(truncate=False)

+------------------------------------+---------+-------------------------------------------------------------------------------+--------------+---------------------------------------------+-------------------------+-------------------------+---------------------------------------------+---------------------------------------------+-------------------------------------------+
|procedure_id                        |status   |procedure_code_display                                                         |procedure_code|subject_reference                            |performed_period_start   |performed_period_end     |encounter_reference                          |reason_reference                             |reason_reference_display                   |
+------------------------------------+---------+-------------------------------------------------------------------------------+--------------+---------------------------------------------+-------------------------+-------------------------+---

In [22]:
# Save the DataFrame to Parquet
procedure_df.write.mode("overwrite").parquet(f"{output_path}/procedure")

print("Procedure DataFrame has been successfully saved in the 'procedure' folder.")

Procedure DataFrame has been successfully saved in the 'procedures' folder.


### Diagnostic Report

In [23]:
# Filter for DiagnosticReport resources
diagnostic_reports = entries.filter(col("entry.resource.resourceType") == "DiagnosticReport") \
                            .select(col("entry.resource.*"))

# Explode results array if present
diagnostic_reports = diagnostic_reports.withColumn("result", explode(col("result")))

# Extract relevant fields
diagnostic_report_df = diagnostic_reports.select(
    col("id").alias("diagnostic_report_id"),
    col("status").alias("status"),
    col("category").getItem(0).alias("category"),
    col("code.coding").getItem(0).getField("display").alias("code"),
    col("code.coding").getItem(0).getField("code").alias("code_system"),
    col("subject.reference").alias("subject_reference"),
    col("encounter.reference").alias("encounter_reference"),
    col("effectiveDateTime").alias("effective_date_time"),
    col("issued").alias("issued_date"),
    col("result.display").alias("result_display")
)

diagnostic_report_df = diagnostic_report_df.withColumn("category_parsed", from_json(col("category"), category_schema)).withColumn("category_display", col("category_parsed.coding").getItem(0).getField("display"))

diagnostic_report_df = diagnostic_report_df.select(
    [column for column in diagnostic_report_df.columns if column not in ["category", "category_parsed"]]
)
# Show the resulting DataFrame
diagnostic_report_df.show(truncate=False)

+------------------------------------+------+----------------------------------------------------------------+-----------+---------------------------------------------+---------------------------------------------+-------------------------+-----------------------------+---------------------------------------------------------+----------------+
|diagnostic_report_id                |status|code                                                            |code_system|subject_reference                            |encounter_reference                          |effective_date_time      |issued_date                  |result_display                                           |category_display|
+------------------------------------+------+----------------------------------------------------------------+-----------+---------------------------------------------+---------------------------------------------+-------------------------+-----------------------------+--------------------------------------

In [24]:
# Save the Dataframe to Parquet
diagnostic_report_df.write.mode("overwrite").parquet(f"{output_path}/diagnostic_report")

print("Diagnostic report DataFrame has been successfully saved in the 'diagnostic_report' folder.")

Diagnostic report DataFrame has been successfully saved in the 'diagnostic_reports' folder.


### Immunization

In [25]:
# Filter for Immunization resources
immunizations = entries.filter(col("entry.resource.resourceType") == "Immunization") \
                       .select(col("entry.resource.*"))

# Extract relevant fields
immunization_df = immunizations.select(
    col("id").alias("immunization_id"),
    col("status").alias("status"),
    col("vaccineCode.coding").getItem(0).getField("display").alias("vaccine_display"),
    col("vaccineCode.coding").getItem(0).getField("code").alias("vaccine_code"),
    col("patient.reference").alias("patient_reference"),
    col("encounter.reference").alias("encounter_reference"),
    col("primarySource").alias("primary_source"),
    col("occurrenceDateTime").alias("occurrence_date_time"),
)

# Show the resulting DataFrame
immunization_df.show(truncate=False)

+------------------------------------+---------+--------------------------------------------------+------------+---------------------------------------------+---------------------------------------------+--------------+-------------------------+
|immunization_id                     |status   |vaccine_display                                   |vaccine_code|patient_reference                            |encounter_reference                          |primary_source|occurrence_date_time     |
+------------------------------------+---------+--------------------------------------------------+------------+---------------------------------------------+---------------------------------------------+--------------+-------------------------+
|765b57b1-2e25-3bb9-ebfb-5de7c26a1e44|completed|Influenza, seasonal, injectable, preservative free|140         |urn:uuid:7ca297d1-94e0-8069-4cdb-50f313de9a5f|urn:uuid:7dea3a29-6f9c-b011-2d6c-b5ff8ef276de|true          |2005-05-02T03:44:33-04:00|
|959f992c-d560-1

In [26]:
# Save the Dataframe to Parquet
immunization_df.write.mode("overwrite").parquet(f"{output_path}/immunization")

print("Immunization DataFrame has been successfully saved in the 'immunization' folder.")

Immunization DataFrame has been successfully saved in the 'immunizations' folder.


### Observation

In [27]:
# Filter for Observation resources
observations = entries.filter(col("entry.resource.resourceType") == "Observation") \
                      .select(col("entry.resource.*"))


# Extract relevant fields
observation_df = observations.select(
    col("id").alias("observation_id"),
    col("status").alias("status"),
    col("category").getItem(0).alias("category"),
    col("code.coding").getItem(0).getField("display").alias("observation_code_display"),
    col("code.coding").getItem(0).getField("code").alias("observation_code"),
    col("subject.reference").alias("subject_reference"),
    col("encounter.reference").alias("encounter_reference"),
    col("effectiveDateTime").alias("effective_date_time"),
    col("issued").alias("issued_date"),
    col("valueQuantity.value").alias("value_quantity_value"),
    col("valueQuantity.unit").alias("value_quantity_unit"),
    col("valueString").alias("value_string")
    
)

observation_df = observation_df.withColumn("category_parsed", from_json(col("category"), category_schema)).withColumn("category_display", col("category_parsed.coding").getItem(0).getField("display"))

observation_df = observation_df.select(
    [column for column in observation_df.columns if column not in ["category", "category_parsed"]]
)
# Show the resulting DataFrame
observation_df.show(truncate=False)


+------------------------------------+------+---------------------------------------------------------------------+----------------+---------------------------------------------+---------------------------------------------+-------------------------+-----------------------------+--------------------+-------------------+------------+----------------+
|observation_id                      |status|observation_code_display                                             |observation_code|subject_reference                            |encounter_reference                          |effective_date_time      |issued_date                  |value_quantity_value|value_quantity_unit|value_string|category_display|
+------------------------------------+------+---------------------------------------------------------------------+----------------+---------------------------------------------+---------------------------------------------+-------------------------+-----------------------------+----------------

In [28]:
# Save the Dataframe to Parquet
observation_df.write.mode("overwrite").parquet(f"{output_path}/observation")

print("Observation DataFrame has been successfully saved in the 'observation' folder.")

Observation DataFrame has been successfully saved in the 'observations' folder.
