In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, min, datediff, count

In [0]:
# Directory holding the Synthea `*_silver_delta` tables. Keeping it in one place
# means repointing the notebook at another volume is a single-line change.
SILVER_ROOT = "/Volumes/workspace/synthea/synthea_datasets"


def load_silver(table):
    """Read the Delta table stored at ``<SILVER_ROOT>/<table>_silver_delta``.

    Args:
        table: Base table name, e.g. ``"patients"``.

    Returns:
        The DataFrame produced by ``spark.read.format("delta").load(...)``.
    """
    return spark.read.format("delta").load(f"{SILVER_ROOT}/{table}_silver_delta")


patients = load_silver("patients")
encounters = load_silver("encounters")
medications = load_silver("medications")
conditions = load_silver("conditions")

In [0]:
from pyspark.sql.functions import col

def prefix_columns(df, prefix, keep="patient_id"):
    """Return ``df`` with every column renamed to ``<prefix>_<column>``.

    The column called ``keep`` (if present) is selected under its original
    name. Prefixing keeps same-named columns from different tables apart once
    the tables are joined.
    """
    selected = []
    for name in df.columns:
        if name == keep:
            selected.append(col(name))
        else:
            selected.append(col(name).alias(f"{prefix}_{name}"))
    return df.select(selected)


patients_prefixed = prefix_columns(patients, "patient")
encounters_prefixed = prefix_columns(encounters, "encounter")
conditions_prefixed = prefix_columns(conditions, "condition")
medications_prefixed = prefix_columns(medications, "medication")

In [0]:
# Preview the patient table after its columns were prefixed.
display(patients_prefixed)

In [0]:
# Preview the encounter table after its columns were prefixed.
display(encounters_prefixed)

In [0]:
# Join predicates: every table is matched to the patient row by patient identifier.
encounter_matches_patient = patients_prefixed.patient_id == encounters_prefixed.encounter_patient
condition_matches_patient = conditions_prefixed.condition_patient == patients_prefixed.patient_id
medication_matches_patient = medications_prefixed.medication_patient == patients_prefixed.patient_id

# Encounters drive the result (inner join to patients); conditions and
# medications are attached with left joins so patients lacking either are kept.
#
# NOTE: conditions and medications are matched on patient only, not on the
# encounter, so each encounter row is repeated once per (condition, medication)
# pair of that patient. Row counts on `journey` are therefore not encounter
# counts; use distinct counts when aggregating.
encounters_with_patients = encounters_prefixed.join(
    patients_prefixed, encounter_matches_patient, "inner"
)
encounters_with_conditions = encounters_with_patients.join(
    conditions_prefixed, condition_matches_patient, "left"
)
journey = encounters_with_conditions.join(
    medications_prefixed, medication_matches_patient, "left"
)
display(journey)

In [0]:
# NOTE: `min` here is pyspark.sql.functions.min and shadows Python's builtin
# `min` for the rest of the notebook.
from pyspark.sql.functions import col, count, countDistinct, datediff, min

# Per-patient journey features.
#
# `journey` joins conditions and medications to each patient on the patient key
# only, so every encounter row appears once per (condition, medication) pair of
# that patient. `min` is unaffected by the repetition, but a plain
# `count("encounter_id")` would report
# encounters x conditions x medications. Counting DISTINCT encounter ids gives
# the real number of encounters (assumes `encounter_id` identifies one
# encounter — TODO confirm against the encounters table).
journey_features = (
    journey.groupBy("patient_id")
    .agg(
        # Earliest condition start seen for the patient.
        min("condition_start").alias("first_diagnosis_date"),
        # Earliest medication start seen for the patient.
        min("medication_start").alias("first_medication_date"),
        countDistinct("encounter_id").alias("total_encounters"),
    )
    .withColumn(
        "time_to_medication_start",
        # Days from first diagnosis to first medication (end date first, start
        # date second); null when the patient has no condition or no medication.
        datediff(col("first_medication_date"), col("first_diagnosis_date")),
    )
)

display(journey_features)