Phase 1 - Utility notebook (copied from Databricks; some code may not work correctly in other environments).

This notebook defines functions used in other Phase 1 notebooks.

**Author:** Nate Bean

**Date created:** 10/23 -- **Last modified:** 11/24

**Purpose:** A utility that contains functions used for querying and transforming syphilis FHIR data. Request code was originally written by Mark Knutson.

**Schedule:** None

**Changelog:**
- 10/02/2023: Created notebook with 'get_token', 'query_fhir', and 'parse_fhir' functions.
- 10/27/2023: Edited 'parse_fhir' based on formatting changes in the UAT data.
- 10/30/2023: Added code to 'parse_specimen' to capture 'Site2' values.
- 11/14/2023: Added code to 'parse_patient' to capture 'Case Modification Date' values. 
- 11/16/2023: Added improved error handling to 'query_fhir'.
- 07/30/2024: Added 'send_fhir_message' function.
- 11/14/2024: Edited 'parse_fhir' to handle the five new Phase 1 fields.

In [0]:
import json #uat_token, query_fhir, parse_fhir, send_fhir_message
import requests #uat_token, query_fhir, send_fhir_message
from requests.auth import HTTPDigestAuth #uat_token
import re #parse_fhir
from functools import reduce #parse_fhir
import pandas as pd #parse_fhir
import time #query_fhir, send_fhir_message

In [0]:
#This function connects to the Keycloak API and retrieves an access token used to authenticate our query
def get_token(keycloak_url, realm, client_id, client_secret):
    endpoint = f"{keycloak_url}/realms/{realm}/protocol/openid-connect/token"
    data = {"grant_type": "client_credentials", "client_secret": client_secret, "client_id": client_id}
    headers = {"Content-Type": "application/x-www-form-urlencoded"}

    print("Retrieving token from: " + endpoint)
    resp = requests.post(endpoint, data = data, headers = headers)
    respStr = resp.content.decode('utf-8')

    #Check for a 503 error before trying to parse the response as JSON
    if '503 Service Temporarily Unavailable' in respStr:
        raise ValueError('MDH server error: 503 Service Temporarily Unavailable')

    respJson = json.loads(respStr)

    #Only the access token is used by the calling code
    token = respJson['access_token']
    print(f"Token retrieved from: {endpoint}")
    print(str(len(token)) + " token characters")
    return token

In [0]:
#Retrieve data in FHIR JSON format
def query_fhir(app_host, suffix):
    endpoint = f"{app_host}{suffix}"

    #Requires a notebook-level variable named access_token, set beforehand using get_token
    headers = {"Content-Type": "application/json","Authorization": f"Bearer {access_token}"}

    attempts = 0
    success = 0

    #Make up to 3 attempts to query the API endpoint
    while attempts <= 2 and success != 1:
        print("Querying: " + endpoint)
        resp = requests.get(endpoint, headers = headers)
        respStr = resp.content.decode('utf-8')

        #If an error message is received, print it
        if 'message":' in respStr:
            print(respStr)

            #If the server hasn't spun up yet, wait and retry (see 11/15 email from Dan saved in project folder)
            if '"message": "Endpoint request timed out"' in respStr:
                attempts += 1

                if attempts == 3:
                    raise ValueError('Endpoint request timed out (attempt 3 of 3).')

                else:
                    print(f"Waiting for 5 seconds, then retrying... (attempt {attempts} of 3)")
                    time.sleep(5)
            else:
                raise ValueError('Query failed. Check error message')
    
        else: 
            print("Query successful.")
            success += 1

    #If there is no new data (based on modification date) this will exit the function & notebook when in a workflow
    if len(respStr) == 53:
        dbutils.notebook.exit("No new data available")
    #Otherwise return the data in JSON format
    else:
        print(str(len(respStr)) + " bytes long")
        query_data = json.loads(respStr)
        e = query_data['entry']
        print(f"Downloaded data for: {len(e)} resource groups")
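        #Strip everything up to the last hyphen so ids match across resource groups (raw string avoids an invalid escape warning)
        ids = map(lambda x: re.sub(r"^.*-", "", e[x]['resource']['id']), range(len(e)))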
        print(f"Downloaded data for: {len(set(list(ids)))} patients")
        return(query_data)
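
The retry behavior in `query_fhir` can be tried out without network access. The sketch below is a simplified illustration under stated assumptions, not the notebook's code: `fetch_with_retry` and the `fetch` callable are hypothetical names, the simulated responses are made up, and the wait is set to zero so the example runs quickly.

```python
import time

TIMEOUT_MESSAGE = '"message": "Endpoint request timed out"'

def fetch_with_retry(fetch, max_attempts=3, wait_seconds=0.0):
    """Call fetch() until it returns a body without the timeout message.

    fetch is a hypothetical zero-argument callable that returns the
    response body as a string (standing in for requests.get).
    """
    for attempt in range(1, max_attempts + 1):
        body = fetch()
        if TIMEOUT_MESSAGE in body:
            # Server not ready yet: give up on the last attempt, otherwise wait and retry
            if attempt == max_attempts:
                raise ValueError(
                    f"Endpoint request timed out (attempt {attempt} of {max_attempts})."
                )
            time.sleep(wait_seconds)
            continue
        if 'message":' in body:
            # Any other error message is treated as a failure, with no retry
            raise ValueError("Query failed. Check error message")
        return body

# Simulated server: times out twice, then returns an (empty) bundle
responses = iter([
    '{"message": "Endpoint request timed out"}',
    '{"message": "Endpoint request timed out"}',
    '{"entry": []}',
])
result = fetch_with_retry(lambda: next(responses))
print(result)
```

Separating the retry loop from the request itself makes the timeout handling easy to check with canned responses.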

**Function structure:**
- This function is designed to read through bundles of data in the FHIR format and return the data as a pandas DataFrame.
- It should be used on the output returned by the MDH FHIR server. If there are no new records (which shouldn't happen, since we query all records), the function will stop, and so should the script.
- FHIR data is stored in resource groups. For example, the patient resource group contains patient demographic information. Multiple FHIR resource groups are combined to provide all the data necessary for a use case.
- The function needs to read data from seven different resource groups: patient, encounter, specimen, careplan, condition, questionnaire response, and location. There is a separate function below to read the data from each of these resource groups. The main function then calls each of them and combines the results.
- These functions find the correct value using a specified path. If a value is not found at the specified path, the function returns the appropriate missing value (usually an empty string) for most variables.
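
The "specified path with a missing-value fallback" idea described above can be sketched as a small helper. This is only an illustration: `get_path` is a hypothetical name that does not appear in this notebook (the functions below use `try`/`except` blocks instead), and the entry is toy data with made-up values.

```python
def get_path(resource, path, default=""):
    """Follow a sequence of keys/indexes into nested dicts and lists.

    Returns default if any step along the path is missing.
    """
    current = resource
    for step in path:
        try:
            current = current[step]
        except (KeyError, IndexError, TypeError):
            return default
    return current

# Toy entry (made-up values) shaped like a FHIR Patient resource
entry = {"resource": {"id": "patient-123", "address": [{"city": "Exampleville"}]}}

city = get_path(entry, ["resource", "address", 0, "city"])
zip_code = get_path(entry, ["resource", "address", 0, "postalCode"])
birth_date = get_path(entry, ["resource", "birthDate"])
print(repr(city), repr(zip_code), repr(birth_date))
```

Catching only `KeyError`, `IndexError`, and `TypeError` keeps unrelated errors visible, which a bare `except` would hide.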

In [0]:
def parse_patient(ent):
    id = re.sub(r"^.*-", "", ent['resource']['id'])
    first_name = ent['resource']['name'][0]['given'][0]
    last_name = ent['resource']['name'][1]['family']
    gender = ent['resource']['gender']

    try:
        birth_date = ent['resource']['birthDate']
    except:
        birth_date = ""


    #There are multiple variables nested under "extension" - race (up to 3), ethnicity, gender identity/detail 
    race = ""
    ethnicity = ""
    gender_detail = ""
    
    try:
        ext = len(ent['resource']['extension'])
    except:
        ext = 0

    for demo in range(0, ext):
                
        if "us-core-ethnicity" in ent['resource']['extension'][demo]['url']:
            ethnicity = ent['resource']['extension'][demo]['extension'][0]['valueCoding']['display']

        if "us-core-genderIdentity" in ent['resource']['extension'][demo]['url']:
            gender_detail = ent['resource']['extension'][demo]['extension'][0]['valueCoding']['code']

        if "us-core-race" in ent['resource']['extension'][demo]['url']:
            if race == "":
                race = ent['resource']['extension'][demo]['extension'][0]['valueCoding']['display']
            else: 
                temp = ent['resource']['extension'][demo]['extension'][0]['valueCoding']['display']
                race = f"{race};;{temp}"

    try:
        street = ent['resource']['address'][0]['line']
        street_1 = ent['resource']['address'][0]['line'][0]
    except:
        street = ""
        street_1 = ""

    if len(street) > 1:
        street_2 = ent['resource']['address'][0]['line'][1]
    else: 
        street_2 = ""

    try:
        county_name = ent['resource']['address'][0]['district']
    except:
        county_name = ""

    try:
        city = ent['resource']['address'][0]['city']
    except:
        city = ""

    try:
        zip = ent['resource']['address'][0]['postalCode']
    except:
        zip = ""

    case_mod =  ent['resource']['meta']['tag'][0]['code']


    patient_data = {"id": id, "first_name": first_name, "last_name": last_name, "race": race, "gender": gender, "ethnicity": ethnicity, 
                    "gender_detail": gender_detail, "birth_date": birth_date, "street_1": street_1, "street_2": street_2, "city": city,
                    "zip": zip, "county": county_name, "case_modification_date": case_mod}
    
    return(patient_data)

In [0]:
def parse_encounter(ent):
    id = re.sub(r"^.*-", "", ent['fullUrl']) #id is the same across all data, so use this instead for now. 

    try: #this may not be necessary
        ext = len(ent['resource']['extension'][0]['extension'])
    except:
        ext = 0
    
    age = ""
    age_units = ""

    for lvl in range(0, ext):

        if ent['resource']['extension'][0]['extension'][lvl]['valueCoding']['code'] == "AGE":
            try:
                age = ent['resource']['extension'][0]['extension'][lvl]['valueCoding']['display']
            except:
                age = ""
        
        if ent['resource']['extension'][0]['extension'][lvl]['valueCoding']['code'] == "AGE_UNITS":
            try:
                age_units = ent['resource']['extension'][0]['extension'][lvl]['valueCoding']['display']
            except:
                age_units = ""
    
    encounter_data = {"id": id, "age": age, "age_units": age_units}
    return(encounter_data)

In [0]:
def parse_specimen(ent):
    id = re.sub(r"^.*-", "", ent['resource']['id'])

    try:
        specimen_date = ent['resource']['collection']['collectedDateTime']
    except:
        specimen_date = ""

    #There may be multiple sites. Collect data for each one present.
    try:
        sites = len(ent['resource']['collection']['bodySite']['coding'])
    except:
        sites = 0
    
    site = ""
    for s in range(0, sites):
        if site == "":
            site = ent['resource']['collection']['bodySite']['coding'][s]['code']
        else:
            temp = ent['resource']['collection']['bodySite']['coding'][s]['code']
            site = f"{site};;{temp}"

    specimen_data = {"id": id, "site": site, "specimen_date": specimen_date}
    return(specimen_data)
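
Several of these functions combine multiple values into one string separated by `;;`. As a minimal sketch of that convention (an alternative formulation using `str.join`, not the notebook's code), the example below builds and then splits such a value. The `collection` dictionary is toy data with made-up codes.

```python
# Toy structure (made-up codes) shaped like the bodySite data read by parse_specimen
collection = {"bodySite": {"coding": [{"code": "A"}, {"code": "B"}]}}

# Missing keys fall back to an empty list, which joins to an empty string
codings = collection.get("bodySite", {}).get("coding", [])
site = ";;".join(c["code"] for c in codings)

# Downstream, the combined value can be split back into a list
site_list = site.split(";;") if site else []
print(site, site_list)
```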

In [0]:
def parse_careplan(ent):
    id = re.sub(r"^.*-", "", ent['resource']['id'])

    #There is only one start date included - so would be the same for multiple treatments
    try:
        regimen_start_date = ent['resource']['activity'][0]['detail']['scheduledString']
    except:
        regimen_start_date = "" 

    treatment_regimen = ""

    #There may be multiple treatment regimens. Collect data for all
    try:
        act = len(ent['resource']['activity']) #the number of included regimens (only two)
    except:
        act = 0

    for meds in range(0, act): 
        if treatment_regimen == "":
            try: #I may not ultimately need try/except here
                treatment_regimen = ent['resource']['activity'][meds]['detail']['description']
            except:
                #No description for this activity, so keep the value empty
                treatment_regimen = ""

        else:
            try:
                temp = ent['resource']['activity'][meds]['detail']['description']
                treatment_regimen = f"{treatment_regimen};;{temp}"
            except:
                #No description for this activity, so keep what has been collected so far
                pass

    careplan_data = {"id": id, "regimen_start_date": regimen_start_date, "treatment_regimen": treatment_regimen}
    return(careplan_data)

In [0]:
def parse_condition(ent):
    id = re.sub(r"^.*-", "", ent['resource']['id'])
    disease = ent['resource']['code']['coding'][0]['code']

    try: 
        syphcode = ent['resource']['stage'][0]['summary']['coding'][0]['code'] #i don't think there could be more than 1?
    except: 
        syphcode = "9"
            
    condition_data = {"id": id, "disease": disease, "syphcode": syphcode} 
    return(condition_data)

In [0]:
def parse_qr(ent):
    id = re.sub(r"^.*-", "", ent['resource']['id'])
    qr = {"id": id}

    for x in range(len(ent['resource']['item'])):
        name = ent['resource']['item'][x]['text'].replace(" ", "").replace("-", "")

        #Edits 11/24 - five new fields added to QR. They are stored differently (as valueBoolean or valueDate), so try each format in turn
        try:
            value = ent['resource']['item'][x]['answer'][0]['valueCoding']['code']
        except KeyError:
            try:
                value = ent['resource']['item'][x]['answer'][0]['valueBoolean'] 
            except KeyError:
                value = ent['resource']['item'][x]['answer'][0]['valueDate']
        
        temp = {name: value}
        qr = dict(qr, **temp)

    return(qr)

In [0]:
def parse_location(ent):
    id = re.sub(r"^.*-", "", ent['resource']['id'])

    try:
        facility_doing_the_reporting = ent['resource']['name']
    except:
        facility_doing_the_reporting = ""
    
    location_data = {"id": id, "facility_doing_the_reporting": facility_doing_the_reporting}
    return(location_data)

In [0]:
def parse_sti_fhir(data):

    e = data['entry']
    print(f"Resource groups in data: {len(e)}")
    ids = map(lambda x: re.sub(r"^.*-", "", e[x]['resource']['id']), range(len(e)))
    print(f"Patients in data: {len(set(list(ids)))}")
    #mod_date is a notebook-level variable that must be defined before this function is called
    print(f"Includes all records updated on or after: {mod_date}")
    
    #Use lists to hold data from each patient/entry
    patient_data = []
    specimen_data = []
    encounter_data = []
    careplan_data = []
    condition_data = []
    question_data = []
    location_data = []


    #Read data for each patient/entry
    for ent in data['entry']:
        resource_type = ent['resource']['resourceType']

        if resource_type == 'Patient':
            patient_temp = parse_patient(ent)
            patient_data.append(patient_temp)

        elif resource_type == "Specimen":
            specimen_temp = parse_specimen(ent)
            specimen_data.append(specimen_temp)

        elif resource_type == "Encounter":
            encounter_temp = parse_encounter(ent)
            encounter_data.append(encounter_temp)

        elif resource_type == "CarePlan": 
            careplan_temp = parse_careplan(ent)
            careplan_data.append(careplan_temp)
        
        elif resource_type == "Condition":
            condition_temp = parse_condition(ent)
            condition_data.append(condition_temp)

        elif resource_type == "QuestionnaireResponse":
            question_temp = parse_qr(ent)
            question_data.append(question_temp)

        elif resource_type == "Location":
            location_temp = parse_location(ent)
            location_data.append(location_temp)


    #Create dataframe row for each patient/entry
    fhir_resources = [pd.DataFrame(patient_data), pd.DataFrame(specimen_data), pd.DataFrame(encounter_data), pd.DataFrame(careplan_data), 
                      pd.DataFrame(condition_data), pd.DataFrame(question_data), pd.DataFrame(location_data)]
    
    fhir_final = reduce(lambda left, right: pd.merge(left, right, on = ['id'], how = 'left'), fhir_resources) 
    return(fhir_final)
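
The final merge step can be illustrated on a small scale. The sketch below uses toy rows with made-up values (the column names `site` and `stage` are just for illustration) to show how `reduce` left-merges each per-resource DataFrame onto the running result by `id`.

```python
from functools import reduce
import pandas as pd

# Toy per-resource rows (made-up values) keyed by the shared 'id'
patient_rows = [{"id": "1", "first_name": "A"}, {"id": "2", "first_name": "B"}]
specimen_rows = [{"id": "1", "site": "X"}]
condition_rows = [{"id": "1", "stage": "S1"}, {"id": "2", "stage": "S2"}]

frames = [pd.DataFrame(rows) for rows in (patient_rows, specimen_rows, condition_rows)]

# Left-merge each frame onto the running result; ids missing on the right become NaN
merged = reduce(lambda left, right: pd.merge(left, right, on="id", how="left"), frames)
print(merged)
```

Because every merge is a left join starting from the patient frame, each patient row is kept even when another resource group has no matching `id`. Two caveats follow from how pandas behaves: an `id` that appears more than once in a right-hand frame produces more than one output row for that patient, and `pd.DataFrame([])` has no `id` column, so an empty list for any resource group would make the merge on `id` raise a `KeyError`. Whether either case can occur depends on the data the server returns.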