In [26]:
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession

# Create a Spark session that can read and write Delta Lake tables.
# Delta needs two settings: its SQL extension, and DeltaCatalog registered as
# the session catalog (`spark_catalog`).
builder = SparkSession.builder.appName("Synthea Delta Pipeline")
builder = builder.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
builder = builder.config(
    "spark.sql.catalog.spark_catalog",
    "org.apache.spark.sql.delta.catalog.DeltaCatalog",
)

# configure_spark_with_delta_pip wraps the builder so the session includes the
# delta-spark pip package, then getOrCreate() starts (or reuses) the session.
spark = configure_spark_with_delta_pip(builder).getOrCreate()


In [27]:
# Load the raw CSV exports into PySpark DataFrames.
# Only the header option is set (no schema inference), so every column is read
# as a string; typing happens in later cells.
patients_csv = "../data/raw/patients.csv"
encounters_csv = "../data/raw/encounters.csv"
observations_csv = "../data/raw/observations.csv"
medications_csv = "../data/raw/medications.csv"
conditions_csv = "../data/raw/conditions.csv"


def _read_raw_csv(csv_path):
    """Read a CSV file whose first line is a header into a Spark DataFrame."""
    return spark.read.option("header", True).csv(csv_path)


patients_df = _read_raw_csv(patients_csv)
encounters_df = _read_raw_csv(encounters_csv)
observations_df = _read_raw_csv(observations_csv)
medications_df = _read_raw_csv(medications_csv)
conditions_df = _read_raw_csv(conditions_csv)

# Preview the first 5 rows of each source table, in load order
for preview_df in (patients_df, encounters_df, observations_df, medications_df, conditions_df):
    preview_df.show(5)


+--------------------+----------+---------+-----------+---------+----------+------+----------+--------------+------+------+-------+-----+-----------+------+--------------------+--------------------+-----------+-------------+---------------+-----+------------------+------------------+-------------------+-------------------+
|                  Id| BIRTHDATE|DEATHDATE|        SSN|  DRIVERS|  PASSPORT|PREFIX|     FIRST|          LAST|SUFFIX|MAIDEN|MARITAL| RACE|  ETHNICITY|GENDER|          BIRTHPLACE|             ADDRESS|       CITY|        STATE|         COUNTY|  ZIP|               LAT|               LON|HEALTHCARE_EXPENSES|HEALTHCARE_COVERAGE|
+--------------------+----------+---------+-----------+---------+----------+------+----------+--------------+------+------+-------+-----+-----------+------+--------------------+--------------------+-----------+-------------+---------------+-----+------------------+------------------+-------------------+-------------------+
|b9c610cd-28a6-463...|201

In [28]:
# Show schema of patients_df: each column's name, Spark type and nullability.
# The CSV was read without schema inference, so expect every column as string.
patients_df.printSchema()


root
 |-- Id: string (nullable = true)
 |-- BIRTHDATE: string (nullable = true)
 |-- DEATHDATE: string (nullable = true)
 |-- SSN: string (nullable = true)
 |-- DRIVERS: string (nullable = true)
 |-- PASSPORT: string (nullable = true)
 |-- PREFIX: string (nullable = true)
 |-- FIRST: string (nullable = true)
 |-- LAST: string (nullable = true)
 |-- SUFFIX: string (nullable = true)
 |-- MAIDEN: string (nullable = true)
 |-- MARITAL: string (nullable = true)
 |-- RACE: string (nullable = true)
 |-- ETHNICITY: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- BIRTHPLACE: string (nullable = true)
 |-- ADDRESS: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- COUNTY: string (nullable = true)
 |-- ZIP: string (nullable = true)
 |-- LAT: string (nullable = true)
 |-- LON: string (nullable = true)
 |-- HEALTHCARE_EXPENSES: string (nullable = true)
 |-- HEALTHCARE_COVERAGE: string (nullable = true)



In [29]:
# Count rows in the raw patients table (triggers a Spark job)
num_patients = patients_df.count()
print("#️⃣ Number of patients:", num_patients)


#️⃣ Number of patients: 1163


In [30]:
# Check missing values
from pyspark.sql import functions as F

# One aggregate per column: cast the isNull() flag to int and sum it, giving
# the number of NULL cells in that column (output keeps the column's name).
null_count_exprs = [
    F.sum(F.col(col_name).isNull().cast("int")).alias(col_name)
    for col_name in patients_df.columns
]
patients_df.select(null_count_exprs).show()

# View unique values in a column. Columns are still upper-case here; "gender"
# resolves because Spark column resolution is case-insensitive by default.
gender_values = patients_df.select("gender").distinct()
gender_values.show()


+---+---------+---------+---+-------+--------+------+-----+----+------+------+-------+----+---------+------+----------+-------+----+-----+------+---+---+---+-------------------+-------------------+
| Id|BIRTHDATE|DEATHDATE|SSN|DRIVERS|PASSPORT|PREFIX|FIRST|LAST|SUFFIX|MAIDEN|MARITAL|RACE|ETHNICITY|GENDER|BIRTHPLACE|ADDRESS|CITY|STATE|COUNTY|ZIP|LAT|LON|HEALTHCARE_EXPENSES|HEALTHCARE_COVERAGE|
+---+---------+---------+---+-------+--------+------+-----+----+------+------+-------+----+---------+------+----------+-------+----+-----+------+---+---+---+-------------------+-------------------+
|  0|        0|     1000|  0|    215|     276|   245|    0|   0|  1147|   832|    384|   0|        0|     0|         0|      0|   0|    0|     0|545|  0|  0|                  0|                  0|
+---+---------+---------+---+-------+--------+------+-----+----+------+------+-------+----+---------+------+----------+-------+----+-----+------+---+---+---+-------------------+-------------------+

+------+


In [31]:
# Convert all column names to lowercase so later cells can refer to them uniformly
def _lowercase_columns(df):
    """Return a new DataFrame identical to `df` but with lower-cased column names."""
    return df.toDF(*[name.lower() for name in df.columns])


patients_df = _lowercase_columns(patients_df)
encounters_df = _lowercase_columns(encounters_df)
observations_df = _lowercase_columns(observations_df)
medications_df = _lowercase_columns(medications_df)
conditions_df = _lowercase_columns(conditions_df)


In [32]:
# Prepare patient-level info: keep the demographic columns, parse the two date
# columns, and replace missing categorical values with a placeholder.
patient_columns = [
    "id", "birthdate", "deathdate", "gender", "race",
    "ethnicity", "marital", "state", "city",
]
categorical_defaults = {
    col_name: "unknown" for col_name in ("gender", "race", "ethnicity", "marital")
}

patients_clean = patients_df.select(*patient_columns)
# Convert birthdate and deathdate from string to DateType
for date_col in ("birthdate", "deathdate"):
    patients_clean = patients_clean.withColumn(date_col, F.to_date(date_col))
patients_clean = patients_clean.fillna(categorical_defaults)

patients_clean.show(5)

+--------------------+----------+---------+------+-----+-----------+-------+-------------+-----------+
|                  id| birthdate|deathdate|gender| race|  ethnicity|marital|        state|       city|
+--------------------+----------+---------+------+-----+-----------+-------+-------------+-----------+
|b9c610cd-28a6-463...|2019-02-17|     NULL|     M|white|nonhispanic|unknown|Massachusetts|Springfield|
|c1f1fcaa-82fd-d5b...|2005-07-04|     NULL|     F|white|nonhispanic|unknown|Massachusetts| Bellingham|
|339144f8-50e1-633...|1998-05-11|     NULL|     M|white|nonhispanic|unknown|Massachusetts|     Boston|
|d488232e-bf14-4be...|2003-01-28|     NULL|     F|white|nonhispanic|unknown|Massachusetts|    Hingham|
|217f95a3-4e10-bd5...|1993-12-23|     NULL|     M|black|nonhispanic|      M|Massachusetts|     Revere|
+--------------------+----------+---------+------+-----+-----------+-------+-------------+-----------+
only showing top 5 rows


In [33]:
# Aggregate encounters per patient: number of encounters plus first and last
# encounter date. to_date() reduces the `start` timestamp to YYYY-MM-DD.
encounter_dates = encounters_df.withColumn("start_date", F.to_date("start"))

encounters_agg = encounter_dates.groupBy("patient").agg(
    F.count("*").alias("num_encounters"),
    F.min("start_date").alias("first_encounter"),
    F.max("start_date").alias("last_encounter"),
)

encounters_agg.show(5)

+--------------------+--------------+---------------+--------------+
|             patient|num_encounters|first_encounter|last_encounter|
+--------------------+--------------+---------------+--------------+
|34b1b3b3-73ce-1bc...|            66|     1965-11-06|    2021-09-18|
|179d9711-5c0f-958...|            22|     1981-12-10|    1991-11-21|
|b5e092a6-8d10-a0b...|            22|     2012-01-08|    2021-10-10|
|60e27639-436c-6c0...|            36|     1977-11-20|    2021-10-17|
|303c8bd7-a047-5e7...|            27|     2012-06-30|    2021-08-21|
+--------------------+--------------+---------------+--------------+
only showing top 5 rows


In [34]:
# Keep only numeric pain severity observations
is_numeric = F.col("type") == "numeric"
is_pain_severity = F.col("description").like("%Pain severity%")
observations_pain = observations_df.filter(is_numeric & is_pain_severity)

# Aggregate per patient. `value` was read as a string, so build the double cast
# once and reuse it in every aggregate.
pain_value = F.col("value").cast("double")
pain_summary = observations_pain.groupBy("patient").agg(
    F.min(pain_value).alias("pain_min"),
    F.max(pain_value).alias("pain_max"),
    F.avg(pain_value).alias("pain_avg"),
    F.count("*").alias("pain_count"),
)

pain_summary.show(5)


+--------------------+--------+--------+------------------+----------+
|             patient|pain_min|pain_max|          pain_avg|pain_count|
+--------------------+--------+--------+------------------+----------+
|34b1b3b3-73ce-1bc...|     1.0|     3.0|               2.3|        10|
|303c8bd7-a047-5e7...|     0.0|     4.0|               2.0|         8|
|1d1af1df-c916-953...|     0.0|     4.0|               2.6|        10|
|403f6042-df1e-7ef...|     0.0|     3.0|1.2727272727272727|        11|
|e8ffd460-0685-796...|     1.0|     3.0|             1.625|         8|
+--------------------+--------+--------+------------------+----------+
only showing top 5 rows


In [35]:
# Aggregate medications per patient: row count and summed total cost.
# `totalcost` was read as a string, so cast it to double before summing.
med_total_cost = F.col("totalcost").cast("double")

medications_agg = medications_df.groupBy("patient").agg(
    F.count("*").alias("num_medications"),
    F.sum(med_total_cost).alias("total_med_cost"),
)

medications_agg.show(5)


+--------------------+---------------+------------------+
|             patient|num_medications|    total_med_cost|
+--------------------+---------------+------------------+
|34b1b3b3-73ce-1bc...|             60|52761.789999999986|
|179d9711-5c0f-958...|              5|           4863.55|
|ad134528-56a5-35f...|            191| 509172.7399999998|
|a57215c7-228c-a1f...|             80| 383205.5199999999|
|b5e092a6-8d10-a0b...|              4|             62.41|
+--------------------+---------------+------------------+
only showing top 5 rows


In [36]:
# Count conditions per patient: one output row per distinct `patient` value
condition_count = F.count("*").alias("num_conditions")
conditions_agg = conditions_df.groupBy("patient").agg(condition_count)

conditions_agg.show(5)

+--------------------+--------------+
|             patient|num_conditions|
+--------------------+--------------+
|964576b0-7527-b34...|             8|
|34b1b3b3-73ce-1bc...|            34|
|179d9711-5c0f-958...|            10|
|eb16b994-f2ca-bee...|             4|
|ad134528-56a5-35f...|            63|
+--------------------+--------------+
only showing top 5 rows


In [37]:
# Join patients_clean, encounters_agg, pain_summary, medications_agg and conditions_agg into one patient-level table
#
# Each aggregate was built with groupBy("patient"), so it has at most one row
# per patient id. A left join therefore keeps exactly one row per patient, and
# patients missing from an aggregate get NULLs in that aggregate's columns.
# The aggregate's own `patient` key is dropped after each join.
patient_aggregates = [encounters_agg, pain_summary, medications_agg, conditions_agg]

# Start from patients_clean (re-running this cell rebuilds from scratch)
patients_main = patients_clean
for agg_df in patient_aggregates:
    patients_main = patients_main.join(
        agg_df,
        patients_main.id == agg_df.patient,
        how="left",
    ).drop(agg_df.patient)

# Invariant: left joins against per-patient aggregates must not add or drop patients
assert patients_main.count() == patients_clean.count(), "joins changed the number of patients"

patients_main.show(5)

+--------------------+----------+---------+------+-----+-----------+-------+-------------+-----------+--------------+---------------+--------------+--------+--------+------------------+----------+---------------+------------------+--------------+
|                  id| birthdate|deathdate|gender| race|  ethnicity|marital|        state|       city|num_encounters|first_encounter|last_encounter|pain_min|pain_max|          pain_avg|pain_count|num_medications|    total_med_cost|num_conditions|
+--------------------+----------+---------+------+-----+-----------+-------+-------------+-----------+--------------+---------------+--------------+--------+--------+------------------+----------+---------------+------------------+--------------+
|d488232e-bf14-4be...|2003-01-28|     NULL|     F|white|nonhispanic|unknown|Massachusetts|    Hingham|            31|     2011-06-01|    2021-09-27|     1.0|     4.0|2.6666666666666665|        12|              2|19.799999999999997|            10|
|339144f8-50

In [38]:
# Add derived columns (age in whole years, deceased flag), then replace the
# NULLs that the left joins leave for patients missing from an aggregate.
#
# Age is measured up to the death date for deceased patients and up to today
# otherwise, so a deceased patient's age does not keep growing after death.
# months_between(...) / 12 gives the exact number of whole years, including on
# the birthday itself. datediff / 365.25 would floor a first birthday
# (365 days) to 0.
age_end_date = F.coalesce(F.col("deathdate"), F.current_date())

patients_main = (
    patients_main
    .withColumn(
        # Age as integer years between birthdate and age_end_date
        "age", F.floor(F.months_between(age_end_date, F.col("birthdate")) / 12)
    )
    .withColumn(
        # Flag if patient is deceased (1) or not (0)
        "is_deceased", F.when(F.col("deathdate").isNotNull(), 1).otherwise(0)
    )
    .fillna({
        # Fill missing numeric aggregates with 0. num_encounters is included so
        # every count column is non-NULL after the left joins.
        # NOTE(review): after this fill, a pain_* value of 0 for a patient with
        # no pain observations looks like a reported score of 0; use
        # pain_count == 0 to tell them apart.
        "num_encounters": 0,
        "pain_min": 0,
        "pain_max": 0,
        "pain_avg": 0,
        "pain_count": 0,
        "num_medications": 0,
        "total_med_cost": 0,
        "num_conditions": 0
    })
    # Round pain_avg and total_med_cost columns to 2 decimals
    .withColumn("pain_avg", F.round(F.col("pain_avg"), 2))
    .withColumn("total_med_cost", F.round(F.col("total_med_cost"), 2))
)

patients_main.show(10)


+--------------------+----------+---------+------+-----+-----------+-------+-------------+---------------+--------------+---------------+--------------+--------+--------+--------+----------+---------------+--------------+--------------+---+-----------+
|                  id| birthdate|deathdate|gender| race|  ethnicity|marital|        state|           city|num_encounters|first_encounter|last_encounter|pain_min|pain_max|pain_avg|pain_count|num_medications|total_med_cost|num_conditions|age|is_deceased|
+--------------------+----------+---------+------+-----+-----------+-------+-------------+---------------+--------------+---------------+--------------+--------+--------+--------+----------+---------------+--------------+--------------+---+-----------+
|b9c610cd-28a6-463...|2019-02-17|     NULL|     M|white|nonhispanic|unknown|Massachusetts|    Springfield|            12|     2019-02-17|    2021-07-25|     0.0|     4.0|     1.8|        10|              2|       5313.63|             0|  6| 

In [39]:
# Save the patient-level DataFrame as a Delta table, replacing any existing
# table at this path. By default Delta's overwrite keeps the existing table
# schema and rejects data whose schema differs. overwriteSchema="true" lets a
# full overwrite also replace the schema when columns are added, removed or
# retyped between runs.
(
    patients_main.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save("../data/clean/patients_main")
)