## Overview
This notebook loads a collection of synthetic FHIR bundles and value sets and shows some simple queries. Run it first, since it sets up the environment used by the other notebooks in the tutorial.

## Setup Tasks
Some setup before the real show begins...

In [1]:
from pyspark.sql import SparkSession

# Enable Hive support for our session so we can save resources as Hive tables
spark = SparkSession.builder \
                    .config('hive.exec.dynamic.partition.mode', 'nonstrict') \
                    .enableHiveSupport() \
                    .getOrCreate()

## Import Synthetic Data
This tutorial uses data generated by Synthea. It is simply a directory of STU3 bundles included with the tutorial; you can see it in the bundles directory.

Let's load the bundles and examine a couple of the resource types in them.

In [2]:
from bunsen.stu3.bundles import load_from_directory, extract_entry, write_to_database

# Load and cache the bundles so we don't reload them every time.
bundles = load_from_directory(spark, 'bundles').cache()

# Extract the observations and encounters from the bundles
observations = extract_entry(spark, bundles, 'observation')
encounters = extract_entry(spark, bundles, 'encounter')

## Bunsen documentation
To get help using functions like *load_from_directory* or *extract_entry*, you can see the documentation at https://engineering.cerner.com/bunsen or via Python's help system, like this:

In [3]:
help(extract_entry)

Help on function extract_entry in module bunsen.stu3.bundles:

extract_entry(sparkSession, javaRDD, resourceName)
    Returns a dataset for the given entry type from the bundles.
    
    :param sparkSession: the SparkSession instance
    :param javaRDD: the RDD produced by :func:`load_from_directory` or other methods
        in this package
    :param resourceName: the name of the FHIR resource to extract
        (condition, observation, etc)
    :return: a DataFrame containing the given resource encoded into Spark columns



In [4]:
observations.printSchema()

root
 |-- id: string (nullable = true)
 |-- meta: struct (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-- versionId: string (nullable = true)
 |    |-- lastUpdated: timestamp (nullable = true)
 |    |-- profile: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- security: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- system: string (nullable = true)
 |    |    |    |-- version: string (nullable = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- display: string (nullable = true)
 |    |    |    |-- userSelected: boolean (nullable = true)
 |    |-- tag: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- system: string (nullable = true)
 |    |    |    |-- version: string (nullable = true)
 |    |    |    |-- code: s

## Load some data
The next step will load some data and inspect it. Since Spark defers execution until output is needed, all of the work set up so far will actually run here. This can take several seconds or longer depending on the machine, but you can check its progress on the [Spark application page](http://localhost:4040).
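
As a rough analogy for this deferred execution, the sketch below uses plain Python generators rather than Spark. Building the pipeline does no work; the work happens only when a result is requested. The names `load_records` and `log` are hypothetical and exist only for this illustration.

```python
# Plain-Python analogy for lazy evaluation; this is not Spark code.
log = []  # records when work actually happens

def load_records():
    """Hypothetical loader that yields records one at a time."""
    for i in range(5):
        log.append(f"loaded {i}")
        yield i

# Building the pipeline runs nothing yet, much like chaining transformations.
pipeline = (r * 10 for r in load_records() if r % 2 == 0)
work_before = len(log)

# Asking for output (similar in spirit to calling toPandas) triggers the work.
result = list(pipeline)
work_after = len(log)

print(work_before, work_after, result)
```

In Spark the same idea applies to DataFrame transformations such as `select` and `where`, which only describe the computation until an action asks for results.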

For now, let's just turn our encounter resources into a simple table of all encounters since 2013:

In [5]:
from pyspark.sql.functions import col

encounters.select('subject.reference', 
                  'class.code', 
                  'period.start', 
                  'period.end') \
          .where(col('start') > '2013') \
          .limit(10) \
          .toPandas()

Unnamed: 0,reference,code,start,end
0,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,WELLNESS,2013-09-08T07:45:47-05:00,2013-09-08T07:45:47-05:00
1,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,WELLNESS,2014-09-14T07:45:47-05:00,2014-09-14T07:45:47-05:00
2,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,WELLNESS,2015-09-20T07:45:47-05:00,2015-09-20T07:45:47-05:00
3,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,WELLNESS,2016-08-21T07:45:47-05:00,2016-08-21T07:45:47-05:00
4,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,ambulatory,2016-08-18T07:45:47-05:00,2016-08-18T07:45:47-05:00
5,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,WELLNESS,2017-08-27T07:45:47-05:00,2017-08-27T07:45:47-05:00
6,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,emergency,2018-03-04T06:45:47-06:00,2018-03-08T06:45:47-06:00
7,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,ambulatory,2018-03-11T07:45:47-05:00,2018-03-11T08:00:47-05:00
8,urn:uuid:d9ac6b14-7ffd-46cf-9d2f-b4b982b4f1dc,WELLNESS,2013-04-29T06:56:19-05:00,2013-04-29T06:56:19-05:00
9,urn:uuid:d9ac6b14-7ffd-46cf-9d2f-b4b982b4f1dc,emergency,2013-07-07T06:56:19-05:00,2013-07-07T06:56:19-05:00
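
The filter above compares `start` against the string `'2013'`. Assuming `period.start` is held as an ISO 8601 string, as the output suggests, that comparison is lexicographic, which is why a bare year can serve as a lower bound. A minimal plain-Python sketch of the behavior follows; the first two values are copied from the table, and the 2012 value is made up for contrast.

```python
# Sample timestamps as ISO 8601 strings.
starts = [
    "2013-09-08T07:45:47-05:00",
    "2018-03-04T06:45:47-06:00",
    "2012-12-31T23:59:59-05:00",  # hypothetical value, for contrast
]

# Strings compare character by character, so any timestamp from 2013
# onward sorts after the bare prefix '2013'.
since_2013 = [s for s in starts if s > "2013"]
print(since_2013)
```

This shortcut relies on a consistent timestamp format. Values with different time zone offsets close to a boundary may not order by their true instant.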


## Exploding nested lists
FHIR's nested structures group related data, making many workloads simpler. We can reference such nested structures directly, and "explode" nested lists when needed to analyze them. Let's build a table of all observation codes in our data:

In [6]:
from pyspark.sql.functions import explode

codes = observations.select('subject',
                            explode('code.coding').alias('coding')) \
                    .select('subject.reference', 
                            'coding.system', 
                            'coding.code',
                            'coding.display')
                    
codes.limit(10).toPandas()

Unnamed: 0,reference,system,code,display
0,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,http://loinc.org,8302-2,Body Height
1,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,http://loinc.org,29463-7,Body Weight
2,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,http://loinc.org,39156-5,Body Mass Index
3,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,http://loinc.org,55284-4,Blood Pressure
4,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,http://loinc.org,2093-3,Total Cholesterol
5,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,http://loinc.org,2571-8,Triglycerides
6,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,http://loinc.org,18262-6,Low Density Lipoprotein Cholesterol
7,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,http://loinc.org,2085-9,High Density Lipoprotein Cholesterol
8,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,http://loinc.org,19926-5,FEV1/FVC
9,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,http://loinc.org,8302-2,Body Height
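
To make the effect of `explode` concrete, here is a plain-Python sketch (not Spark) of the same reshaping: each element of a nested `coding` list becomes its own row, paired with the parent's subject. The two records and the `urn:example:local` system are hypothetical and mirror only the fields used above.

```python
# Hypothetical observation records with the nested shape used above.
sample_observations = [
    {"subject": {"reference": "urn:uuid:patient-1"},
     "code": {"coding": [
         {"system": "http://loinc.org", "code": "8302-2",
          "display": "Body Height"},
     ]}},
    {"subject": {"reference": "urn:uuid:patient-2"},
     "code": {"coding": [
         {"system": "http://loinc.org", "code": "29463-7",
          "display": "Body Weight"},
         {"system": "urn:example:local", "code": "WT",
          "display": "Weight (local code)"},
     ]}},
]

# One output row per coding; the subject reference repeats for each of them.
rows = [
    (obs["subject"]["reference"], c["system"], c["code"], c["display"])
    for obs in sample_observations
    for c in obs["code"]["coding"]
]

for row in rows:
    print(row)
```

The second record carries two codings, so it contributes two rows. The same happens in the table above for any observation coded in more than one system.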


## Analyzing data
Our datasets become much easier to analyze once they've been projected onto a simpler model that suits the problem at hand. The code below simply shows the most frequent observation codes in our synthetic data.

In [7]:
codes.groupBy('system', 'code', 'display') \
     .count() \
     .orderBy('count', ascending=False) \
     .limit(10) \
     .toPandas()

Unnamed: 0,system,code,display,count
0,http://loinc.org,4548-4,Hemoglobin A1c/Hemoglobin.total in Blood,1753
1,http://loinc.org,29463-7,Body Weight,1377
2,http://loinc.org,55284-4,Blood Pressure,1377
3,http://loinc.org,8302-2,Body Height,1377
4,http://loinc.org,39156-5,Body Mass Index,1350
5,http://loinc.org,6299-2,Urea Nitrogen,871
6,http://loinc.org,2339-0,Glucose,871
7,http://loinc.org,38483-4,Creatinine,871
8,http://loinc.org,2069-3,Chloride,871
9,http://loinc.org,6298-4,Potassium,871
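
The `groupBy`, `count`, and `orderBy` chain above is the distributed counterpart of an ordinary frequency count. As a small illustration outside Spark, the sketch below applies the same logic with `collections.Counter` to a handful of hypothetical rows.

```python
from collections import Counter

# Hypothetical (system, code, display) rows, shaped like the codes table.
sample_codes = [
    ("http://loinc.org", "8302-2", "Body Height"),
    ("http://loinc.org", "29463-7", "Body Weight"),
    ("http://loinc.org", "8302-2", "Body Height"),
    ("http://loinc.org", "8302-2", "Body Height"),
    ("http://loinc.org", "29463-7", "Body Weight"),
    ("http://loinc.org", "2093-3", "Total Cholesterol"),
]

# Group identical rows, count them, and keep the two most frequent.
top = Counter(sample_codes).most_common(2)

for (system, code, display), count in top:
    print(system, code, display, count)
```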


## Writing resources to a database
Directly loading JSON or XML FHIR bundles is useful for ingestion and early exploration of data, but a more efficient format works better for repeated use. Since Bunsen encodes resources natively in Apache Spark dataframes, we can take advantage of Spark's ability to write them to a Hive database. Bunsen offers the *write_to_database* function as a convenient way to write resources from bundles to a database, with a table for each resource.

Note that each table preserves the original, nested structure definition of the FHIR resource, and is field-for-field equivalent. 

The cell below will save our test data to tables in the "tutorial_small" database. When running it, you can see progress in the Spark UI at http://localhost:4040.


In [8]:
resources = ['allergyintolerance',
             'careplan',
             'claim',
             'condition',
             'encounter',
             'immunization',
             'medication',
             'medicationrequest',
             'observation',
             'organization',
             'patient',
             'procedure']

write_to_database(spark, 
                  bundles, 
                  'tutorial_small',
                  resources)

## Reading from a Hive database
Now that we've saved our data to a Hive database, we can easily view and query the tables with Spark SQL:

In [9]:
spark.sql('use tutorial_small')
spark.sql('show tables').toPandas()

Unnamed: 0,database,tableName,isTemporary
0,tutorial_small,allergyintolerance,False
1,tutorial_small,careplan,False
2,tutorial_small,claim,False
3,tutorial_small,condition,False
4,tutorial_small,encounter,False
5,tutorial_small,immunization,False
6,tutorial_small,medication,False
7,tutorial_small,medicationrequest,False
8,tutorial_small,observation,False
9,tutorial_small,organization,False


In [10]:
spark.sql("""
select subject.reference, 
       count(*) cnt
from encounter
where class.code != 'WELLNESS' and
      period.start > '2013'
group by subject.reference
order by cnt desc
limit 10
""").toPandas()

Unnamed: 0,reference,cnt
0,urn:uuid:e206880c-7762-4aee-a3e2-5a8c89512c18,53
1,urn:uuid:e538491e-cf8e-4a3f-97a5-45811e066f27,44
2,urn:uuid:dcad3c44-64de-43b6-b24c-989f8f27c71d,33
3,urn:uuid:5804a9d3-3518-4862-a1e4-a61b0f1a4be4,31
4,urn:uuid:2bf9eab0-fec0-41b2-9f91-3369e38b98f6,19
5,urn:uuid:90a7ded5-a5ce-43df-b973-7bc7ce7a3011,18
6,urn:uuid:8f538e46-a1d1-4c75-beb7-e3946124e730,16
7,urn:uuid:6f58dbea-7532-4090-97a8-79982bab98f5,12
8,urn:uuid:aa251e83-9a9b-446f-ba2f-87e2da7c4d34,8
9,urn:uuid:83f95d0c-95f4-4fcc-a3b2-4d2926fd13db,6


## Loading Valuesets
Bunsen has built-in support for working with FHIR valuesets. As a convenience, the APIs in the bunsen.stu3.codes package offer ways to save valuesets to Hive tables, where they are easier to use.

In [11]:
from bunsen.stu3.codes import create_value_sets

# Load the valuesets from bundles
valueset_bundles = load_from_directory(spark, 'valuesets')
valueset_data = extract_entry(spark, valueset_bundles, 'valueset')

# Import the value sets and save them to an ontologies database for easy future use
spark.sql('create database if not exists tutorial_ontologies')

create_value_sets(spark).with_value_sets(valueset_data) \
                        .write_to_database('tutorial_ontologies')

Now we can more easily look at the values in our valuesets:

In [12]:
spark.table('tutorial_ontologies.values').toPandas()

Unnamed: 0,system,version,value,valueseturi,valuesetversion
0,http://loinc.org,2.36,14647-2,http://hl7.org/fhir/ValueSet/example-extensional,20150622
1,http://loinc.org,2.36,2093-3,http://hl7.org/fhir/ValueSet/example-extensional,20150622
2,http://loinc.org,2.36,35200-5,http://hl7.org/fhir/ValueSet/example-extensional,20150622
3,http://loinc.org,2.36,9342-7,http://hl7.org/fhir/ValueSet/example-extensional,20150622


## Using Valuesets
Finally, we illustrate how we can easily use FHIR valuesets within Spark SQL. Bunsen provides an *in_valueset* user-defined function that can be invoked directly from SQL, so users can easily work with valuesets without needing complex joins to separate ontology tables.

First, we will push some interesting valuesets to the cluster with the *push_valuesets* function seen below. This uses Apache Spark's broadcast variables to make the reference data available on each node, so it can be used easily in queries. Typically, users work with valuesets in one of three ways:

* From a FHIR ValueSet resource, as illustrated here
* As a collection of values in a Python structure
* As an is-a relationship in some ontology, like LOINC or SNOMED.

The available options are described in the function's documentation, which you can view with help(push_valuesets).

Let's take a look at an example:

In [13]:
from bunsen.stu3.valuesets import push_valuesets, valueset

# Push multiple valuesets for this example, even though we use only one.
push_valuesets(spark, 
               {'ldl'               : [('http://loinc.org', '18262-6')],                
                'hdl'               : [('http://loinc.org', '2085-9')],
                'cholesterol'       : valueset('http://hl7.org/fhir/ValueSet/example-extensional', '20150622')},
               database='tutorial_ontologies'); 

In [14]:
spark.sql("""
select subject.reference, 
       valueQuantity.value,
       valueQuantity.unit
from tutorial_small.observation
where in_valueset(code, 'cholesterol')
limit 10
""").toPandas()

Unnamed: 0,reference,value,unit
0,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,196.5052,mg/dL
1,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,176.123,mg/dL
2,urn:uuid:f88c719e-9875-446d-852b-cddb79fe4998,160.9981,mg/dL
3,urn:uuid:d9ac6b14-7ffd-46cf-9d2f-b4b982b4f1dc,166.9086,mg/dL
4,urn:uuid:d9ac6b14-7ffd-46cf-9d2f-b4b982b4f1dc,167.6831,mg/dL
5,urn:uuid:d9ac6b14-7ffd-46cf-9d2f-b4b982b4f1dc,161.0235,mg/dL
6,urn:uuid:7460ce37-9f41-4543-b906-7308e9932d21,195.872,mg/dL
7,urn:uuid:5a288b89-ba8d-48f4-b99a-ca6bdd67c2e6,197.8422,mg/dL
8,urn:uuid:5a288b89-ba8d-48f4-b99a-ca6bdd67c2e6,269.7181,mg/dL
9,urn:uuid:5a288b89-ba8d-48f4-b99a-ca6bdd67c2e6,244.6572,mg/dL
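
Conceptually, a valueset membership test asks whether any coding in a CodeableConcept has a (system, code) pair that belongs to the named set. The sketch below illustrates only that idea in plain Python. It is not Bunsen's implementation, and `in_valueset_sketch`, `valuesets`, and the sample concepts are hypothetical.

```python
# Reference data keyed by name, mirroring the literal entries pushed above.
valuesets = {
    "ldl": {("http://loinc.org", "18262-6")},
    "hdl": {("http://loinc.org", "2085-9")},
}

def in_valueset_sketch(codeable_concept, name):
    """Return True if any coding's (system, code) pair is in the named set."""
    members = valuesets[name]
    return any(
        (coding["system"], coding["code"]) in members
        for coding in codeable_concept.get("coding", [])
    )

# Hypothetical CodeableConcept values shaped like an observation's code field.
ldl_code = {"coding": [{"system": "http://loinc.org", "code": "18262-6"}]}
height_code = {"coding": [{"system": "http://loinc.org", "code": "8302-2"}]}

is_ldl = in_valueset_sketch(ldl_code, "ldl")
is_height_ldl = in_valueset_sketch(height_code, "ldl")
print(is_ldl, is_height_ldl)
```

Keeping the sets in memory is what makes the check cheap per row. Broadcasting the reference data to each node, as described above, serves the same purpose on a cluster.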
