In [0]:
import io
import json
from urllib.parse import urlparse

import requests
from databricks.sdk import WorkspaceClient

## Creating a directory under a Unity Catalog Volume

In [0]:
# Unity Catalog location that receives the raw FHIR downloads.
catalog, schema, volume = 'de_with_databricks', 'bronze', 'ingest'
volume_folder = 'synthea_50'
patient_file = 'Patient.ndjson'

# Derived paths, e.g. /Volumes/de_with_databricks/bronze/ingest/synthea_50
volume_path = "/".join(["/Volumes", catalog, schema, volume])
volume_folder_path = "/".join([volume_path, volume_folder])
patient_file_path = "/".join([volume_folder_path, patient_file])

# Workspace client used for the Files API; make sure the target folder
# exists inside the volume before anything is uploaded into it.
w = WorkspaceClient()
w.files.create_directory(volume_folder_path)

## Retrieving a FHIR Patient resource JSON file

In [0]:
# Retrieve the patient Newline-Delimited JSON (NDJSON) file
# from the book's GitHub repository.
patient_url = "https://raw.githubusercontent.com/" + \
  "donghwajkim/data-engineering-with-databricks-1st-ed/" + \
  "refs/heads/main/data/synthea/50/fhir/Patient.ndjson"

# The whole body is read into memory below, so streaming buys nothing; the
# timeout (seconds) keeps the cell from hanging on a stalled connection.
response = requests.get(patient_url, timeout=60)
# Fail fast on HTTP errors -- otherwise the error body (e.g. "404: Not Found")
# would be uploaded to the volume in place of the patient data.
response.raise_for_status()

# files.upload takes a binary file-like object, so wrap the downloaded bytes.
w.files.upload(patient_file_path, io.BytesIO(response.content), overwrite = True)

patient_json_df = spark.read.format("json").load(patient_file_path)

In [0]:
# Preview the first 10 raw records as loaded from the NDJSON file.
display(patient_json_df.limit(10))

In [0]:
# Show the schema Spark inferred for the nested FHIR JSON (no explicit schema
# was supplied to the reader), to see which paths the projection can use.
patient_json_df.printSchema()

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col

# Flatten the nested FHIR Patient resource into flat columns. Repeated
# elements (name, address, telecom) contribute only their first entry, and
# fields are aliased to snake_case column names.
patient_df = patient_json_df.selectExpr(
    # identifier
    "id as patient_id",
    # first name entry: family name and first given name
    "name[0].family as last_name",
    "name[0].given[0] as first_name",
    # first address entry
    "address[0].line[0] as address_line1",
    "address[0].city as city",
    "address[0].state as state",
    "address[0].postalCode as zipcode",
    # first telecom entry
    "telecom[0].value as telephone_no",
    # demographics
    "birthDate as birth_date",
    "maritalStatus.text as marital_status",
    "gender",
    "deceasedDateTime as death_date_time",
)

In [0]:
# Preview the flattened patient columns before persisting them.
display(patient_df.limit(10))

In [0]:
# Start from a clean slate: drop any previous bronze patient table, then
# write the flattened patients as a new table clustered by city and state.
spark.sql("DROP TABLE IF EXISTS de_with_databricks.bronze.patient")
(
    patient_df.write
    .mode("overwrite")
    .clusterBy("city", "state")
    .saveAsTable("de_with_databricks.bronze.patient")
)

In [0]:
%sql
-- Spot-check the freshly written bronze patient table.
SELECT * FROM de_with_databricks.bronze.patient LIMIT 10

## Creating a reusable function for parsing FHIR Patient resources (NDJSON)

In [0]:
import os
import time

def get_patient_df(fhir_patient_url, timeout=60):
    """Download a FHIR Patient NDJSON file and return it as a flattened DataFrame.

    The file is fetched over HTTP and uploaded to ``volume_folder_path`` under
    a millisecond-timestamped name, so repeated calls do not overwrite each
    other. It is then read back with Spark's JSON reader and projected to the
    same columns as the bronze ``patient`` table.

    Args:
        fhir_patient_url: URL of a FHIR Patient NDJSON file.
        timeout: Seconds to wait for the HTTP response (default 60).

    Returns:
        A Spark DataFrame with columns patient_id, last_name, first_name,
        address_line1, city, state, zipcode, telephone_no, birth_date,
        marital_status, gender and death_date_time.

    Raises:
        requests.HTTPError: if the download returns an HTTP error status.

    Note:
        The uploaded file is deliberately left in the volume. Spark reads
        lazily, so the returned DataFrame keeps depending on it.
    """
    response = requests.get(fhir_patient_url, timeout=timeout)
    # Never upload an HTTP error page as if it were patient data.
    response.raise_for_status()

    # Derive "<name>_<epoch_ms><ext>" from the URL *path*. urlparse drops any
    # query string or fragment, and splitext copes with names that have no
    # extension (ext == "") or several dots (only the last one is split off).
    file_name, file_ext = os.path.splitext(
        os.path.basename(urlparse(fhir_patient_url).path))
    current_timestamp_ms = int(round(time.time() * 1000))
    temp_patient_file = f"{file_name}_{current_timestamp_ms}{file_ext}"
    temp_patient_file_path = f"{volume_folder_path}/{temp_patient_file}"
    # files.upload takes a binary file-like object, so wrap the bytes.
    w.files.upload(temp_patient_file_path, io.BytesIO(response.content), overwrite = True)

    new_patient_json_df = spark.read.format("json").load(temp_patient_file_path)
    return new_patient_json_df.selectExpr(
            "id as patient_id",
            "name[0].family as last_name",
            "name[0].given[0] as first_name",
            "address[0].line[0] as address_line1",
            "address[0].city as city",
            "address[0].state as state",
            "address[0].postalCode as zipcode",
            "telecom[0].value as telephone_no",
            "birthDate as birth_date",
            "maritalStatus.text as marital_status",
            "gender",
            "deceasedDateTime as death_date_time"
        )

## Ingesting daily incremental patient data

In [0]:
# Same repository, but the 10-patient Synthea sample, which plays the role of
# the daily incremental delivery to merge into the bronze table.
new_patient_file_url = (
    "https://raw.githubusercontent.com/donghwajkim/"
    "data-engineering-with-databricks-1st-ed/refs/heads"
    "/main/data/synthea/10/fhir/Patient.ndjson"
)
new_patient_df = get_patient_df(new_patient_file_url)

In [0]:
# Inspect the incoming patients before merging them (no .limit() applied).
display(new_patient_df)

### Using Python

In [0]:
from delta.tables import *

patient_table = DeltaTable.forName(spark, "de_with_databricks.bronze.patient")

# Upsert keyed on patient_id. Every target column is copied verbatim from the
# incoming (aliased) row, so one column -> expression mapping serves both
# the UPDATE of matched rows and the INSERT of new ones.
merge_columns = [
    "patient_id", "last_name", "first_name", "address_line1",
    "city", "state", "zipcode", "telephone_no",
    "birth_date", "marital_status", "gender", "death_date_time",
]
merge_assignments = {column: f"patient_update.{column}" for column in merge_columns}

update_result_df = (
    patient_table.alias('patient')
    .merge(
        new_patient_df.alias('patient_update'),
        'patient.patient_id = patient_update.patient_id',
    )
    .whenMatchedUpdate(set=merge_assignments)
    .whenNotMatchedInsert(values=merge_assignments)
    .execute()
)

display(update_result_df)

In [0]:
%sql
-- Spot-check the table after the Python (DeltaTable) merge.
-- NOTE: LIMIT without ORDER BY returns arbitrary rows, so merged rows may not appear here.
SELECT * FROM de_with_databricks.bronze.patient LIMIT 10

### Using SQL

In [0]:
# Reset the bronze patient table to its initial 50-patient load so the SQL
# MERGE below starts from the same state as the Python merge did.
spark.sql("DROP TABLE IF EXISTS de_with_databricks.bronze.patient")
(
    patient_df.write
    .mode("overwrite")
    .clusterBy("city", "state")
    .saveAsTable("de_with_databricks.bronze.patient")
)

In [0]:
# Expose the incremental DataFrame to SQL under the name used in the MERGE.
new_patient_df.createOrReplaceTempView("patient_update")

# Upsert keyed on patient_id. The target is aliased explicitly as `patient`
# (mirroring the Python merge), so the ON clause does not depend on Spark
# resolving the bare table name as an implicit qualifier.
merge_sql = """
MERGE INTO de_with_databricks.bronze.patient AS patient
USING patient_update
ON patient.patient_id = patient_update.patient_id
WHEN MATCHED THEN
  UPDATE SET
    patient_id = patient_update.patient_id,
    last_name = patient_update.last_name,
    first_name = patient_update.first_name,
    address_line1 = patient_update.address_line1,
    city = patient_update.city,
    state = patient_update.state,
    zipcode = patient_update.zipcode,
    telephone_no = patient_update.telephone_no,
    birth_date = patient_update.birth_date,
    marital_status = patient_update.marital_status,
    gender = patient_update.gender,
    death_date_time = patient_update.death_date_time
WHEN NOT MATCHED
  THEN INSERT (
    patient_id,
    last_name,
    first_name,
    address_line1,
    city,
    state,
    zipcode,
    telephone_no,
    birth_date,
    marital_status,
    gender,
    death_date_time
  )
  VALUES (
    patient_update.patient_id,
    patient_update.last_name,
    patient_update.first_name,
    patient_update.address_line1,
    patient_update.city,
    patient_update.state,
    patient_update.zipcode,
    patient_update.telephone_no,
    patient_update.birth_date,
    patient_update.marital_status,
    patient_update.gender,
    patient_update.death_date_time
  )
"""

update_result_df = spark.sql(merge_sql)

display(update_result_df)

In [0]:
%sql
-- Spot-check the table after the SQL MERGE.
-- NOTE: LIMIT without ORDER BY returns arbitrary rows, so merged rows may not appear here.
SELECT * FROM de_with_databricks.bronze.patient LIMIT 10