In [1]:
%pip install requests PyJWT cryptography xmltodict

Collecting PyJWT
  Obtaining dependency information for PyJWT from https://files.pythonhosted.org/packages/6f/1d/ef9b066e7ef60494c94173dc9f0b9adf5d9ec5f888109f5c669f53d4144b/PyJWT-2.10.0-py3-none-any.whl.metadata
  Downloading PyJWT-2.10.0-py3-none-any.whl.metadata (4.0 kB)
Collecting cryptography
  Obtaining dependency information for cryptography from https://files.pythonhosted.org/packages/30/d5/c8b32c047e2e81dd172138f772e81d852c51f0f2ad2ae8a24f1122e9e9a7/cryptography-43.0.3-cp39-abi3-macosx_10_9_universal2.whl.metadata
  Downloading cryptography-43.0.3-cp39-abi3-macosx_10_9_universal2.whl.metadata (5.4 kB)
Collecting xmltodict
  Obtaining dependency information for xmltodict from https://files.pythonhosted.org/packages/d6/45/fc303eb433e8a2a271739c98e953728422fa61a3c1f36077a49e395c972e/xmltodict-0.14.2-py2.py3-none-any.whl.metadata
  Downloading xmltodict-0.14.2-py2.py3-none-any.whl.metadata (8.0 kB)
Collecting cffi>=1.12 (from cryptography)
  Obtaining dependency information for cf

In [23]:
import json
import pandas as pd
import xml.etree.ElementTree as ET


import xmltodict


def parse_patient(response):

    
    # Parse the XML response
    root = ET.fromstring(response.text)

    # Define the namespace
    namespace = {'ns': 'http://hl7.org/fhir'}

    # Extract patient details (assuming it's consistent across entries)
    patient_reference = root.find(".//ns:subject/ns:reference", namespace)
    patient_name = root.find(".//ns:subject/ns:display", namespace)

    # Safely get attributes
    patient_reference_value = patient_reference.attrib["value"] if patient_reference is not None else "Unknown"
    patient_name_value = patient_name.attrib["value"] if patient_name is not None else "Unknown"

    # Initialize list to store condition details
    conditions = []

    # Loop through each entry in the Bundle
    for entry in root.findall(".//ns:entry", namespace):
        condition = entry.find(".//ns:Condition", namespace)
        if condition is not None:
            condition_id = condition.find("ns:id", namespace)
            condition_id_value = condition_id.attrib["value"] if condition_id is not None else "Unknown"
            
            clinical_status = condition.find(".//ns:clinicalStatus/ns:coding/ns:display", namespace)
            clinical_status_value = clinical_status.attrib["value"] if clinical_status is not None else "Unknown"
            
            verification_status = condition.find(".//ns:verificationStatus/ns:coding/ns:display", namespace)
            verification_status_value = verification_status.attrib["value"] if verification_status is not None else "Unknown"
            
            category_elements = condition.findall(".//ns:category", namespace)
            category = ", ".join(
                cat.find("ns:coding/ns:display", namespace).attrib["value"]
                for cat in category_elements if cat.find("ns:coding/ns:display", namespace) is not None
            )
            
            diagnosis_codes = ", ".join(
                f"{code.attrib['value']} ({code.get('display', {}).get('value', '')})"
                for code in condition.findall(".//ns:code/ns:coding/ns:code", namespace)
            )
            
            encounter = condition.find(".//ns:encounter/ns:display", namespace)
            encounter_display = encounter.attrib["value"] if encounter is not None else "N/A"
            
            onset_date = condition.find(".//ns:onsetDateTime", namespace)
            onset_date_value = onset_date.attrib["value"] if onset_date is not None else "N/A"
            
            recorded_date = condition.find(".//ns:recordedDate", namespace)
            recorded_date_value = recorded_date.attrib["value"] if recorded_date is not None else "N/A"
            
            # Add condition details to the list
            conditions.append({
                "Patient Name": patient_name_value,
                "Patient Reference": patient_reference_value,
                "Condition ID": condition_id_value,
                "Clinical Status": clinical_status_value,
                "Verification Status": verification_status_value,
                "Category": category,
                "Diagnosis Codes": diagnosis_codes,
                "Encounter": encounter_display,
                "Onset Date": onset_date_value,
                "Recorded Date": recorded_date_value,
            })

    # Create a DataFrame from conditions
    df = pd.DataFrame(conditions)

    # Aggregate conditions into a single row for the patient
    aggregated_row = {
        "Patient Name": patient_name_value,
        "Patient Reference": patient_reference_value,
        "Conditions": "; ".join(
            f"ID: {cond['Condition ID']}, Clinical Status: {cond['Clinical Status']}, "
            f"Verification Status: {cond['Verification Status']}, Category: {cond['Category']}, "
            f"Codes: {cond['Diagnosis Codes']}, Encounter: {cond['Encounter']}, "
            f"Onset: {cond['Onset Date']}, Recorded: {cond['Recorded Date']}"
            for cond in conditions
        )
    }

    final_df = pd.DataFrame([aggregated_row])
    return final_df




import requests
import jwt
import time
import uuid
from cryptography.hazmat.primitives import serialization

# Epic FHIR public sandbox base URL
base_url = "https://fhir.epic.com/interconnect-fhir-oauth/api/FHIR/R4"

# OAuth 2.0 token endpoint
token_url = "https://fhir.epic.com/interconnect-fhir-oauth/oauth2/token"

# Client ID
client_id = "6445d618-e64b-4e1a-85dd-96ecc09bd92e"

private_key_path = "privatekey.pem"

# Load your private key
with open(private_key_path, "rb") as key_file:
    private_key = serialization.load_pem_private_key(
        key_file.read(),
        password=None,
    )

# Generate JWT assertion
issued_at = int(time.time())
expiration_time = issued_at + 300  # Token valid for 5 minutes

# Generate unique identifier for jti
jti = str(uuid.uuid4())

payload = {
    "iss": client_id,
    "sub": client_id,
    "aud": token_url,
    "jti": jti,
    "exp": expiration_time,
    "nbf": issued_at,
    "iat": issued_at,
}

header = {
    "alg": "RS384",
    "typ": "JWT"
}

jwt_assertion = jwt.encode(
    payload,
    private_key,
    algorithm="RS384",
    headers=header
)

# Obtain access token
data = {
    "grant_type": "client_credentials",
    "client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
    "client_assertion": jwt_assertion,
}

token_response = requests.post(token_url, data=data)
access_token = token_response.json().get("access_token")

# Headers with authorization
headers = {
    "Authorization": f"Bearer {access_token}",
    "Content-Type": "application/json",
}


# Pt FHIR ID
#patient_id = "erXuFYUfucBZaryVksYEcMg3" #Camila Lopez
#patient_id = "e0w0LEDCYtfckT6N.CkJKCw3" #Warren McGinnis
patient_id = "eq081-VQEgP8drUUqCWzHfw3" #Derick Lin


# Medications endpoint
#endpoint = f"{base_url}/List"

#medical history endpoint
endpoint = f"{base_url}/Condition" 


# Request parameters
params = {
    "patient": patient_id
    #,"code": "medications"
}

# Make the GET request
response = requests.get(endpoint, headers=headers, params=params)

df = parse_patient(response)
#parse_patient(json.dumps(json.loads(json_data), indent=4, sort_keys=True))





In [26]:
df['Conditions'].iloc[0]

'ID: eVGf2YljIMIk76IcfbNpjWQ3, Clinical Status: Active, Verification Status: Confirmed, Category: Problem List Item, SDOH, Codes: Z91.89 (), 315016007 (), V49.89 (), 34908715 (), Encounter: N/A, Onset: 2019-05-28, Recorded: 2019-05-28; ID: eY-LMUKgFarb5r10D5sXS7nGJO9qELcndS5oncvyDjPHp.lFiCEKE6mt2pIDbyFeBHvU6Z0XikLVgIqkXp8XV1Q3, Clinical Status: Active, Verification Status: Confirmed, Category: Encounter Diagnosis, Visit Diagnosis, Codes: 315016007 (), V49.89 (), Z91.89 (), Encounter: Office Visit, Onset: 2019-05-28, Recorded: 2019-05-28; ID: eY-LMUKgFarb5r10D5sXS7tjBxvqEuuIjSj-3DP.gq8M3, Clinical Status: Unknown, Verification Status: Unknown, Category: Reason for Visit, Codes: 83 (), Encounter: Office Visit, Onset: N/A, Recorded: 2019-05-28T13:56:10Z'