<a href="https://colab.research.google.com/github/alexontour/snippets/blob/main/snip_fhir_create.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Author: Alexander Kollmann, 08/2022**

---

**Funktion**

FHIR - Ressorcen schreiben und verknüpfen (versch. Optionen)

Patient, Encounter, Observation, Condition

---



**Referenzen**

https://colab.research.google.com/drive/1OSuqqACnCqw8h67E7DKDMyIGO1C2qbEq?usp=sharing#scrollTo=rU-lfJob3fOf

---



In [None]:
import json
import requests
from collections import OrderedDict
from io import StringIO

In [None]:
# Define Base URL
url = "http://hapi.fhir.org/baseR4/" # Open HAPI FHIR Server
#url = "https://server.fire.ly/R4/" # Open Firely Server

In [None]:
#Option 1: JSON
patient = json.loads("""{
   "resourceType":"Patient",
   "name":[
      {
         "given": "Christiano",
         "family": "Ronaldo",
         "text":"Christiano Ronaldo",
         "use": "official"
      }
   ],
   "gender":"male",
   "birthDate":"1950-01-01"
}""")

In [None]:
headers = {"Content-Type": "application/fhir+json;charset=utf-8"}

response = requests.request("POST", url + "Patient", headers=headers, data=str(patient))

patient_id = json.loads(response.text)['id']
patient_name = json.loads(response.text)['name']
#response.json()


print(patient_id)
print(patient_name[0]['given'])



6997815
['Christiano']


In [None]:
# Option 2: Funktion
def create_patient():


    body = {
        "name":[
            {
              "given": "Christiano",
              "family": "Ronaldo",
              "text":"Christiano Ronaldo",
              "use": "official"
            }
        ],
        "birthDate":"1950-01-01",
        "gender": "male",
        "resourceType": "Patient",
    }

    io = StringIO()
    json.dump(body, io, indent=2)

    return io


In [None]:
headers = {"Content-Type": "application/fhir+json;charset=utf-8"}

response = requests.request("POST", url + "Patient", headers=headers, data=create_patient().getvalue())

patient_id = json.loads(response.text)['id']
patient_name = json.loads(response.text)['name']
#response.json()


print(patient_id)
print(patient_name[0]['given'])


6997911
['Christiano']


In [None]:
def create_encounter(patient_id):


    body = {
        "status": "finished",
        "class": {
            "system": "http://hl7.org/fhir/v3/ActCode",
            "code": "IMP",
            "display": "inpatient encounter",
        },
        "reason": [
            {
                "text": "The patient had an abnormal heart rate. She was concerned about this."
            }
        ],
        "subject": {"reference": "Patient/{}".format(patient_id)},
        "resourceType": "Encounter",
    }


    io = StringIO()
    json.dump(body, io, indent=2)

    return io


# [END healthcare_create_encounter]




In [None]:
headers = {"Content-Type": "application/fhir+json;charset=utf-8"}

e = create_encounter(patient_id)

response = requests.request("POST", url + "Encounter", headers=headers, data=create_encounter(patient_id).getvalue())

encounter_id = json.loads(response.text)['id']
#response.json()

print(encounter_id)




6997907


In [None]:
# [START healthcare_create_observation]
def create_observation(patient_id,encounter_id):

    # Sets required application/fhir+json header on the request

    body = {
        "resourceType": "Observation",
        "code": {
            "coding": [
                {
                    "system": "http://loinc.org",
                    "code": "8867-4",
                    "display": "Heart rate",
                }
            ]
        },
        "status": "final",
        "subject": {"reference": "Patient/{}".format(patient_id)},
        "effectiveDateTime": "2019-01-01T00:00:00+00:00",
        "valueQuantity": {"value": 80, "unit": "bpm"},
        "context": {"reference": "Encounter/{}".format(encounter_id)},
    }

    io = StringIO()
    json.dump(body, io, indent=2)

    return io


In [None]:
headers = {"Content-Type": "application/fhir+json;charset=utf-8"}

o = create_observation(patient_id, encounter_id)

response = requests.request("POST", url + "Observation", headers=headers, data=create_observation(patient_id,encounter_id).getvalue())

observation_id = json.loads(response.text)['id']
#response.json()

print(observation_id)

6997909


In [None]:
# Option 3_ Dictionary
problem = OrderedDict()
problem['resourceType'] = 'Condition'
#problem['id'] = 'example'
problem['patient'] = {"reference": "Patient/{}".format(patient_id)}
#problem['encounter'] = {'reference': 'Encounter/67890'}
problem['dateRecorded'] = '2016-04-01T13:00'
problem['code'] = {'coding':
                       [{'system': 'http://hl7.org/fhir/sid/icd-10-us',
                         'code': 'E10.65',
                         'display': 'Type 1 Diabetes Mellitus with Hyperglycemia'}]}
problem['clinicalStatus'] = "active"
problem['verificationStatus'] = 'provisional'

io = StringIO()
json.dump(problem, io, indent=2)
print(io.getvalue())

diab = io.getvalue()

headers = {"Content-Type": "application/fhir+json;charset=utf-8"}


response = requests.request("POST", url + "Condition", headers=headers, data=diab)

condition_id = json.loads(response.text)['id']
#response.json()

print(condition_id)

{
  "resourceType": "Condition",
  "patient": {
    "reference": "Patient/6997904"
  },
  "dateRecorded": "2016-04-01T13:00",
  "code": {
    "coding": [
      {
        "system": "http://hl7.org/fhir/sid/icd-10-us",
        "code": "E10.65",
        "display": "Type 1 Diabetes Mellitus with Hyperglycemia"
      }
    ]
  },
  "clinicalStatus": "active",
  "verificationStatus": "provisional"
}
6997910
