In [1]:
# Start (or attach to an already running) Spark session for this notebook
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Synthea ETL").getOrCreate()

# Leave the session as the cell's last expression so Jupyter renders its summary
spark


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/10 16:02:44 WARN Utils: Your hostname, Armandas-MacBookAir.local, resolves to a loopback address: 127.0.0.1; using 192.168.178.46 instead (on interface en0)
25/09/10 16:02:44 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/10 16:02:45 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Load the JSON files under ../data/fhir, then inspect the inferred schema,
# the top-level column names and a few sample rows to understand the nesting.
# multiLine lets a single JSON document span several lines; PERMISSIVE mode
# keeps malformed records instead of failing the read.
df = spark.read.options(multiLine="true", mode="PERMISSIVE").json("../data/fhir")

print("Actual schema:")
df.printSchema()

print("\nColumn names:")
print(df.columns)

print("\nSample data:")
df.show(5)

                                                                                

Actual schema:
root
 |-- entry: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- fullUrl: string (nullable = true)
 |    |    |-- request: struct (nullable = true)
 |    |    |    |-- ifNoneExist: string (nullable = true)
 |    |    |    |-- method: string (nullable = true)
 |    |    |    |-- url: string (nullable = true)
 |    |    |-- resource: struct (nullable = true)
 |    |    |    |-- abatementDateTime: string (nullable = true)
 |    |    |    |-- active: boolean (nullable = true)
 |    |    |    |-- activity: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- detail: struct (nullable = true)
 |    |    |    |    |    |    |-- code: struct (nullable = true)
 |    |    |    |    |    |    |    |-- coding: array (nullable = true)
 |    |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |    |-- code: string (null

                                                                                

+--------------------+------------+-----------+
|               entry|resourceType|       type|
+--------------------+------------+-----------+
|[{urn:uuid:e23c7a...|      Bundle|transaction|
|[{urn:uuid:8c6ae4...|      Bundle|transaction|
|[{urn:uuid:55c5b8...|      Bundle|transaction|
|[{urn:uuid:10a170...|      Bundle|transaction|
|[{urn:uuid:96df42...|      Bundle|transaction|
+--------------------+------------+-----------+
only showing top 5 rows


In [3]:
# Walk from the first row down to the first entry's resource.
# DataFrame.first() returns None for an empty DataFrame, and the schema marks
# `entry` (and its elements) as nullable, so every step is guarded instead of
# indexing blindly and raising before the "safe" printing below is reached.
first_row = df.select("entry").first()
entries = first_row.entry if first_row is not None else None
first_entry = entries[0] if entries else None

# Access the resource field (stays None when there is no usable entry)
resource_row = first_entry.resource if first_entry is not None else None

# Convert the Row to a plain dict so its field names can be inspected in Python
resource_dict = resource_row.asDict() if resource_row else None

# Print the keys, or say that nothing could be inspected
if resource_dict:
    print("Keys inside resource:", resource_dict.keys())
else:
    print("No resource found in this entry")


                                                                                

Keys inside resource: dict_keys(['abatementDateTime', 'active', 'activity', 'address', 'addresses', 'agent', 'author', 'authoredOn', 'billablePeriod', 'birthDate', 'careTeam', 'category', 'claim', 'class', 'clinicalStatus', 'code', 'communication', 'component', 'contained', 'content', 'context', 'created', 'criticality', 'custodian', 'date', 'deceasedDateTime', 'deviceName', 'diagnosis', 'distinctIdentifier', 'dosage', 'dosageInstruction', 'effectiveDateTime', 'encounter', 'expirationDate', 'extension', 'facility', 'gender', 'hospitalization', 'id', 'identifier', 'insurance', 'insurer', 'intent', 'issued', 'item', 'location', 'lotNumber', 'managingOrganization', 'manufactureDate', 'maritalStatus', 'medicationCodeableConcept', 'medicationReference', 'meta', 'multipleBirthBoolean', 'multipleBirthInteger', 'name', 'numberOfInstances', 'numberOfSeries', 'occurrenceDateTime', 'onsetDateTime', 'organization', 'outcome', 'participant', 'patient', 'payment', 'performedPeriod', 'performer', 'pe

In [4]:
# Print the raw 'address' value of the sampled resource, if the key is present.
# An absent/empty resource_dict is treated as a dict without that key.
if "address" in (resource_dict or {}):
    print("\nAddress field:", resource_dict["address"])



Address field: [ {
        "extension": [ {
          "url": "http://hl7.org/fhir/StructureDefinition/geolocation",
          "extension": [ {
            "url": "latitude",
            "valueDecimal": 42.32562987012356
          }, {
            "url": "longitude",
            "valueDecimal": -71.12685124765576
          } ]
        } ],
        "line": [ "754 Effertz Gardens Suite 78" ],
        "city": "Boston",
        "state": "MA",
        "postalCode": "02116",
        "country": "US"
      } ]


In [5]:
from pyspark.sql.functions import explode, explode_outer, col, from_json
from pyspark.sql.types import ArrayType, StructType, StructField, StringType

# Explode the top-level entry array: one row per bundle entry
df_entries = df.select(explode("entry").alias("entry"))

# Keep only Patient resources
df_patients = df_entries.filter(col("entry.resource.resourceType") == "Patient")

# Schema for the address field (an array of structs); only the fields used
# below are declared, any other keys in the JSON are ignored by from_json.
address_schema = ArrayType(
    StructType([
        StructField("city", StringType(), True),
        StructField("state", StringType(), True),
        StructField("postalCode", StringType(), True)
    ])
)

# Parse the address JSON. from_json works on a string column, hence the cast.
df_patients = df_patients.withColumn(
    "address_parsed",
    from_json(col("entry.resource.address").cast("string"), address_schema)
)

# One row per patient address. explode_outer (rather than explode) keeps
# patients whose address is null, empty or could not be parsed, with a NULL
# address, instead of silently dropping them from the patient table. This
# matches the left join below, which likewise keeps patients without telecom.
df_address = df_patients.select(
    col("entry.resource.id").alias("patient_id"),
    col("entry.resource.gender").alias("gender"),
    explode_outer("address_parsed").alias("address")
)

# Explode telecom array (no parsing needed because it's already a proper array of structs)
df_telecom = df_patients.select(
    col("entry.resource.id").alias("patient_id"),
    explode("entry.resource.telecom").alias("telecom")
)

# Join on patient_id and flatten the nested fields into plain columns.
# NOTE(review): a patient with several addresses or telecom entries yields one
# row per combination — confirm that is acceptable for downstream counts.
df_final = df_address.join(df_telecom, on="patient_id", how="left") \
    .select(
        col("patient_id"),
        col("gender"),
        col("address.city").alias("city"),
        col("address.state").alias("state"),
        col("address.postalCode").alias("postal_code"),
        col("telecom.value").alias("telephone_number")
    )

df_final.show(truncate=False)




+------------------------------------+------+--------------------+-----+-----------+----------------+
|patient_id                          |gender|city                |state|postal_code|telephone_number|
+------------------------------------+------+--------------------+-----+-----------+----------------+
|e23c7a6e-6ef2-3932-1361-b73b4c8a3961|male  |Boston              |MA   |02116      |555-370-6012    |
|8c6ae452-5f8c-9ff6-006d-c6c860acf5cd|female|Quincy              |MA   |02171      |555-201-6155    |
|55c5b8d3-99d0-58ad-8444-e42bb81bd5c7|male  |Fall River          |MA   |02721      |555-205-9683    |
|10a1709a-5dde-1e3a-4c06-d8de03764712|male  |Yarmouth            |MA   |NULL       |555-993-6270    |
|96df4283-1388-506c-e88f-77cd4232e1b4|male  |Pembroke            |MA   |NULL       |555-968-4365    |
|03e502b6-b810-06c1-7d65-83db077ed3ee|female|Holyoke             |MA   |01040      |555-814-6694    |
|3dfce025-b696-711a-8c2d-6903736e2882|female|Scituate            |MA   |NULL      

                                                                                

In [6]:
# Filter Encounter resources
df_encounters = df_entries.filter(col("entry.resource.resourceType") == "Encounter")

# Extract patient_id from subject.reference (e.g., "urn:uuid:{id}").
# The substring bounds are derived from the prefix instead of being hard-coded:
# Column.substr is 1-based, so the id starts right after the prefix and spans
# the 36 characters of a UUID.
urn_prefix = "urn:uuid:"
uuid_length = 36
df_encounters = df_encounters.withColumn(
    "patient_id",
    col("entry.resource.subject.reference").substr(len(urn_prefix) + 1, uuid_length)
)

# Count encounters per patient
df_encounters_count = (
    df_encounters
    .groupBy("patient_id")
    .count()
    .withColumnRenamed("count", "num_encounters")
)

# Join with patient info. The left join keeps patients without any encounter,
# but leaves their count NULL; store 0 instead so later averages, comparisons
# and quality checks treat them as "no encounters" rather than ignoring them.
df_patients_with_encounters = (
    df_final
    .join(df_encounters_count, on="patient_id", how="left")
    .fillna(0, subset=["num_encounters"])
)

# Show final table
df_patients_with_encounters.show(truncate=False)


25/09/10 16:03:28 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+------------------------------------+------+--------------------+-----+-----------+----------------+--------------+
|patient_id                          |gender|city                |state|postal_code|telephone_number|num_encounters|
+------------------------------------+------+--------------------+-----+-----------+----------------+--------------+
|e23c7a6e-6ef2-3932-1361-b73b4c8a3961|male  |Boston              |MA   |02116      |555-370-6012    |705           |
|8c6ae452-5f8c-9ff6-006d-c6c860acf5cd|female|Quincy              |MA   |02171      |555-201-6155    |638           |
|55c5b8d3-99d0-58ad-8444-e42bb81bd5c7|male  |Fall River          |MA   |02721      |555-205-9683    |412           |
|10a1709a-5dde-1e3a-4c06-d8de03764712|male  |Yarmouth            |MA   |NULL       |555-993-6270    |80            |
|96df4283-1388-506c-e88f-77cd4232e1b4|male  |Pembroke            |MA   |NULL       |555-968-4365    |69            |
|03e502b6-b810-06c1-7d65-83db077ed3ee|female|Holyoke            

                                                                                

In [None]:
from pyspark.sql.functions import when

# Data-quality step: keep every existing postal code and substitute the
# placeholder 'Unknown' wherever the value is missing.
df_patients_with_encounters = df_patients_with_encounters.withColumn(
    "postal_code",
    when(col("postal_code").isNotNull(), col("postal_code")).otherwise("Unknown"),
)

# Display the table again to confirm the replacement
df_patients_with_encounters.show(truncate=False)



+------------------------------------+------+--------------------+-----+-----------+----------------+--------------+
|patient_id                          |gender|city                |state|postal_code|telephone_number|num_encounters|
+------------------------------------+------+--------------------+-----+-----------+----------------+--------------+
|e23c7a6e-6ef2-3932-1361-b73b4c8a3961|male  |Boston              |MA   |02116      |555-370-6012    |705           |
|8c6ae452-5f8c-9ff6-006d-c6c860acf5cd|female|Quincy              |MA   |02171      |555-201-6155    |638           |
|55c5b8d3-99d0-58ad-8444-e42bb81bd5c7|male  |Fall River          |MA   |02721      |555-205-9683    |412           |
|10a1709a-5dde-1e3a-4c06-d8de03764712|male  |Yarmouth            |MA   |Unknown    |555-993-6270    |80            |
|96df4283-1388-506c-e88f-77cd4232e1b4|male  |Pembroke            |MA   |Unknown    |555-968-4365    |69            |
|03e502b6-b810-06c1-7d65-83db077ed3ee|female|Holyoke            

                                                                                

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, desc

# Window partitioned by city, ordered by descending number of encounters.
# patient_id is a secondary sort key: row_number() assigns distinct numbers to
# tied rows, and without a tie-breaker the order among patients with the same
# num_encounters could change from run to run.
window_city = Window.partitionBy("city").orderBy(desc("num_encounters"), "patient_id")

# Add each patient's position within their city (1 = most encounters)
df_ranked = df_patients_with_encounters.withColumn(
    "rank_in_city",
    row_number().over(window_city)
)

# Show the first 10 rows together with their rank
df_ranked.show(10)




+--------------------+------+--------+-----+-----------+----------------+--------------+------------+
|          patient_id|gender|    city|state|postal_code|telephone_number|num_encounters|rank_in_city|
+--------------------+------+--------+-----+-----------+----------------+--------------+------------+
|961f61f8-ed32-f11...|female|Abington|   MA|      02351|    555-249-7941|            26|           1|
|eebbb6c9-87b3-997...|  male|   Acton|   MA|    Unknown|    555-997-8082|           103|           1|
|988ba5c5-bb2c-145...|female|Acushnet|   MA|    Unknown|    555-776-1922|            52|           1|
|165a4435-31ff-049...|female|  Agawam|   MA|    Unknown|    555-989-2984|            60|           1|
|c1c5bcd1-d232-8de...|female|  Agawam|   MA|    Unknown|    555-356-8824|            51|           2|
|436a7fef-6640-96a...|female|  Agawam|   MA|      01001|    555-551-1160|            30|           3|
|e000e5c8-30f7-7a5...|  male| Amherst|   MA|    Unknown|    555-691-6530|         

                                                                                

In [15]:
# Quality check: every row must carry a patient_id
null_patients = df_ranked.where(col("patient_id").isNull()).count()
print(
    f"❌ Found {null_patients} NULL patient_id values!"
    if null_patients > 0
    else "✅ No NULL patient_id"
)




✅ No NULL patient_id


                                                                                

In [16]:
# Check encounter counts are present and non-negative.
# A NULL never satisfies `< 0`, so on its own the negative check passes
# silently for rows whose count is missing (e.g. patients the left join could
# not match to any encounter). Missing counts are therefore reported as well.
negative_encounters = df_ranked.filter(col("num_encounters") < 0).count()
missing_encounters = df_ranked.filter(col("num_encounters").isNull()).count()

if negative_encounters > 0:
    print(f"❌ Found {negative_encounters} negative encounter counts!")
if missing_encounters > 0:
    print(f"❌ Found {missing_encounters} missing (NULL) encounter counts!")
if negative_encounters == 0 and missing_encounters == 0:
    print("✅ All encounter counts are non-negative")




✅ All encounter counts are non-negative


                                                                                

In [17]:
from pyspark.sql.functions import avg

# Mean of num_encounters over all rows of the ranked table.
# The aggregate comes back as None when there is nothing to average.
avg_encounters = (
    df_ranked
    .agg(avg("num_encounters").alias("avg_encounters"))
    .first()["avg_encounters"]
)
if avg_encounters is None:
    print("❌ Could not compute average encounters")
else:
    print(f"✅ Average encounters per patient: {avg_encounters:.2f}")




✅ Average encounters per patient: 47.38


                                                                                

In [18]:
# Share of rows whose patient has more than one encounter
total_patients = df_ranked.count()
multi_patients = df_ranked.where(col("num_encounters") > 1).count()

# Guard first: with no rows there is nothing to divide by
if total_patients <= 0:
    print("❌ No patients found, cannot compute %")
else:
    percent_multi = (multi_patients / total_patients) * 100
    print(f"✅ Patients with multiple encounters: {percent_multi:.1f}%")




✅ Patients with multiple encounters: 100.0%


                                                                                

In [19]:
# Sum the encounter counts per city and list the five largest totals
print("✅ Top 5 cities by total encounters:")
(
    df_ranked
    .groupBy("city")
    .agg({"num_encounters": "sum"})  # result column is named "sum(num_encounters)"
    .withColumnRenamed("sum(num_encounters)", "total_encounters")
    .sort(col("total_encounters").desc())
    .show(5, truncate=False)
)

✅ Top 5 cities by total encounters:




+----------+----------------+
|city      |total_encounters|
+----------+----------------+
|Boston    |3105            |
|Quincy    |1542            |
|Lowell    |529             |
|Fall River|505             |
|Holyoke   |462             |
+----------+----------------+
only showing top 5 rows


                                                                                

In [None]:
# Persist the ranked table as Parquet, one sub-directory per state;
# "overwrite" replaces whatever a previous run left at this location.
(
    df_ranked.write
    .mode("overwrite")
    .partitionBy("state")
    .parquet("../data/clean/patients_with_encounters")
)

# Sanity check: load the files back and display a few rows
df_check = spark.read.parquet("../data/clean/patients_with_encounters")
df_check.show(5, truncate=False)




+------------------------------------+------+--------+-----------+----------------+--------------+------------+-----+
|patient_id                          |gender|city    |postal_code|telephone_number|num_encounters|rank_in_city|state|
+------------------------------------+------+--------+-----------+----------------+--------------+------------+-----+
|961f61f8-ed32-f113-8450-192064b49aa9|female|Abington|02351      |555-249-7941    |26            |1           |MA   |
|eebbb6c9-87b3-9978-5eab-84edacbccc7c|male  |Acton   |Unknown    |555-997-8082    |103           |1           |MA   |
|988ba5c5-bb2c-1453-cfe7-16ea41c47b42|female|Acushnet|Unknown    |555-776-1922    |52            |1           |MA   |
|165a4435-31ff-0493-eb3f-3a86106f3a59|female|Agawam  |Unknown    |555-989-2984    |60            |1           |MA   |
|c1c5bcd1-d232-8de3-7e0f-3ef13280455c|female|Agawam  |Unknown    |555-356-8824    |51            |2           |MA   |
+------------------------------------+------+--------+--

                                                                                