In [0]:
from pyspark.sql.functions import col, max

vitals = spark.readStream.table(
    "healthcare_analytics.silver_vitals_stream"
)

In [0]:
latest_vitals = (
    vitals
    .withWatermark("event_time", "10 minutes")
    .dropDuplicates(["patient_id"])
)

In [0]:
latest_event = (
    vitals_wm
    .groupBy("patient_id")
    .agg(
        spark_max("event_time").alias("latest_event_time")
    )
)

In [0]:
%sql
CREATE TABLE IF NOT EXISTS healthcare_analytics.gold_patient_latest_vitals (
  patient_id INT,
  heart_rate INT,
  systolic_bp INT,
  oxygen_level INT,
  event_time TIMESTAMP,
  ingest_ts TIMESTAMP
)
USING DELTA;

In [0]:
def upsert_latest_vitals(batch_df, batch_id):
    batch_df.createOrReplaceTempView("latest_vitals_batch")

    batch_df.sparkSession.sql("""
        MERGE INTO healthcare_analytics.gold_patient_latest_vitals tgt
        USING latest_vitals_batch src
        ON tgt.patient_id = src.patient_id
        WHEN MATCHED AND src.event_time > tgt.event_time THEN
          UPDATE SET *
        WHEN NOT MATCHED THEN
          INSERT *
    """)

(
    latest_vitals.writeStream
    .foreachBatch(upsert_latest_vitals)
    .option(
        "checkpointLocation",
        "/Volumes/workspace/healthcare_analytics/dataextract/checkpoints/gold_latest_vitals"
    )
    .trigger(availableNow=True)
    .start()
)

<pyspark.sql.connect.streaming.query.StreamingQuery at 0xffe5a44c7200>

In [0]:
spark.table(
    "healthcare_analytics.gold_patient_latest_vitals"
).display()

patient_id,heart_rate,systolic_bp,oxygen_level,event_time,ingest_ts,source_system,heart_rate_status,oxygen_status
496,91,90,90,2026-01-18T07:03:45.353Z,2026-01-18T18:15:10.597Z,simulated_vitals,NORMAL,LOW
148,94,102,100,2026-01-18T07:03:25.729Z,2026-01-18T18:15:10.597Z,simulated_vitals,NORMAL,NORMAL
463,96,106,87,2026-01-18T07:04:51.608Z,2026-01-18T18:15:10.597Z,simulated_vitals,NORMAL,LOW
392,82,104,93,2026-01-18T07:04:05.804Z,2026-01-18T18:15:10.597Z,simulated_vitals,NORMAL,NORMAL
243,113,103,90,2026-01-18T07:03:55.631Z,2026-01-18T18:15:10.597Z,simulated_vitals,HIGH,LOW
31,124,102,98,2026-01-18T07:04:21.081Z,2026-01-18T18:15:10.597Z,simulated_vitals,HIGH,NORMAL
451,65,104,95,2026-01-18T07:04:51.608Z,2026-01-18T18:15:10.597Z,simulated_vitals,NORMAL,NORMAL
251,95,172,99,2026-01-18T07:04:46.528Z,2026-01-18T18:15:10.597Z,simulated_vitals,NORMAL,NORMAL
458,126,126,93,2026-01-18T07:03:45.353Z,2026-01-18T18:15:10.597Z,simulated_vitals,HIGH,NORMAL
65,106,169,91,2026-01-18T07:04:56.701Z,2026-01-18T18:15:10.597Z,simulated_vitals,HIGH,LOW
