## Create FHIR STU3 CapStatement Resource

### Outline:

- Source excel with requirements
- pandas to convert in python Ordered Dict
- build json
- generate narrative using Jinja2 templates

### Prerequisites:

- Python 3.6 or greater

### Import FHIRClient and other libraries

In [1]:
%config IPCompleter.greedy=True

In [2]:
from fhirclient.models import searchparameter as SP
from fhirclient.models import capabilitystatement as CS
from fhirclient.models import bundle as B
from fhirclient.models import narrative as N
import fhirclient.models.identifier as I
import fhirclient.models.coding as C
import fhirclient.models.codeableconcept as CC
import fhirclient.models.fhirdate as D
import fhirclient.models.extension as X
import fhirclient.models.contactdetail as CD
import fhirclient.models.fhirreference as FR
from json import dumps, loads, load
from requests import get, post, put
import os
from pathlib import Path
from csv import reader as csvreader
from pprint import pprint
from stringcase import snakecase, titlecase
from collections import namedtuple
from pandas import *
from datetime import date
from jinja2 import Environment, FileSystemLoader, select_autoescape
from commonmark import commonmark
from IPython.display import display, HTML
from lxml import etree

#### Inspect ElementProperties as reference

In [3]:
CS.CapabilityStatementRest().elementProperties()

[('extension',
  'extension',
  fhirclient.models.extension.Extension,
  True,
  None,
  False),
 ('id', 'id', str, False, None, False),
 ('modifierExtension',
  'modifierExtension',
  fhirclient.models.extension.Extension,
  True,
  None,
  False),
 ('compartment', 'compartment', str, True, None, False),
 ('documentation', 'documentation', str, False, None, False),
 ('interaction',
  'interaction',
  fhirclient.models.capabilitystatement.CapabilityStatementRestInteraction,
  True,
  None,
  False),
 ('mode', 'mode', str, False, None, True),
 ('operation',
  'operation',
  fhirclient.models.capabilitystatement.CapabilityStatementRestOperation,
  True,
  None,
  False),
 ('resource',
  'resource',
  fhirclient.models.capabilitystatement.CapabilityStatementRestResource,
  True,
  None,
  False),
 ('searchParam',
  'searchParam',
  fhirclient.models.capabilitystatement.CapabilityStatementRestResourceSearchParam,
  True,
  None,
  False),
 ('security',
  'security',
  fhirclient.models.cap

####  Assign Global Variables


Here is where we assign all the global variables for this example such as the canonical base and project information

In [4]:
fhir_term_server = 'http://test.fhir.org/r3'
fhir_test_server = 'http://test.fhir.org/r3'

headers = {
'Accept':'application/fhir+json',
'Content-Type':'application/fhir+json'
}

fhir_base_url = 'http://hl7.org/fhir/'

#canon = "http://fhir.org/guides/argonaut-questionnaire/"
#canon = 'http://fhir.org/guides/argonaut-clinicalnotes/'
canon = 'http://hl7.org/fhir/us/davinci-deqm/STU3/'

#pre = "Argonaut"
pre = 'DaVinci DEQM'

#publisher = 'The Argonaut Project'
publisher = 'The DaVinci Project'


'''publisher_endpoint = dict(
                        system = 'url',
                        value = 'https://github.com/argonautproject/questionnaire/issues'
                        )
'''

publisher_endpoint = dict(
                        system = 'url',
                        value = 'http://www.hl7.org/Special/committees/cqi/index.cfm'
                        )


f_jurisdiction =  CC.CodeableConcept({
      "coding" : [
        {
          "system" : "urn:iso:std:iso:3166",
          "code" : "US"
        }
      ]
    })

conf_url = 'http://hl7.org/fhir/StructureDefinition/capabilitystatement-expectation'
combo_url = 'http://hl7.org/fhir/StructureDefinition/capabilitystatement-search-parameter-combination'

none_list = ['', ' ', 'none', 'n/a', 'N/A', 'N', 'False']

sep_list = (',', ';', ' ', ', ', '; ')

f_now = D.FHIRDate(str(date.today()))

#### Conformance Extension

In [5]:
def get_conf(conf='MAY'):
    return [X.Extension(dict(
        url = conf_url,
        valueCode = conf
        ))]

### validate

In [6]:
# *********************** validate Resource ********************************

def validate(r):
    fhir_test_server = 'http://fhirtest.uhn.ca/baseDstu3'
    #fhir_test_server = 'http://test.fhir.org/r3'

    headers = {
    'Accept':'application/fhir+json',
    'Content-Type':'application/fhir+json'
    }

    # profile = 'http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient' # The official URL for this profile is: http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient
 
    params = dict(
      # profile = 'http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient' # The official URL for this profile is: http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient
        )
    
    #   r = requests.post('https://httpbin.org/post', data = {'key':'value'})
    r = post(f'{fhir_test_server}/Questionnaire/$validate', params = params, headers = headers, data = dumps(r.as_json()))
    # return r.status_code
    # view  output
    # return (r.json()["text"]["div"])
    return r

### Write to 

In [7]:
 def write_file(name, data): # write file
    #out_path = ''
    #out_path = '//ERICS-AIR-2/ehaas/Documents/FHIR/Argo-Questionnaire/source/resources/'
    out_path = '//ERICS-AIR-2/ehaas/Documents/FHIR/Davinci-DEQM/source/resources/'
    with open(f'{Path(out_path)}/{name}.json', 'w') as f:
        f.write(data)

### Get Cap Statement input data

#### first the meta sheet

In [8]:
#in_path = Path('/Users/ehaas/Documents/FHIR/pyfhir/test/')
in_path ='/ERICS-AIR-2/ehaas/Documents/FHIR/Davinci-DEQM/source/capstat_spreadsheets/'
#out_path = '/Users/ehaas/Documents/FHIR/pyfhir/test/'
out_path=''
#out_path = Path("C:/Users/Eric/Documents/Jan_2019_FHIR_Experience")

# in_file = 'AssBank'
# in_file = 'AnsBank'
# in_file = 'EHRProvider'
#in_file = 'AdaptService'
#in_file = 'Argonaut_Capability_Statement_Clinical_Notes'
#in_file = 'Argonaut_Capability_Statement_CLIENT_Clinical_Notes'

in_file = "DEQM_Capability_Statement_Consumer_Client"
#in_file = "DEQM_Capability_Statement_Reporter_Client"
#in_file = "DEQM_Capability_Statement_Consumer_Server"
#in_file = "DEQM_Capability_Statement_Producer_Client"
#in_file = "DEQM_Capability_Statement_Producer_Server"
#in_file = "DEQM_Capability_Statement_Receiver_Server"


xls = ExcelFile(f'{in_path}{in_file}.xlsx')
df = read_excel(xls,'meta',na_filter = False)

df

FileNotFoundError: [Errno 2] No such file or directory: '/ERICS-AIR-2/ehaas/Documents/FHIR/Davinci-DEQM/source/capstat_spreadsheets/DEQM_Capability_Statement_Consumer_Client.xlsx'

#### Create NamedTuple from df to use dot notation

In [None]:
d = dict(zip(df.Element, df.Value))
meta = namedtuple("Meta", d.keys())(*d.values())      
         
meta.id


### Create CS instance

In [None]:
def get_op():
    op_list = []
    df_op = read_excel(xls,'ops',na_filter = False)
    for i in df_op.itertuples(index=True):
        op = CS.CapabilityStatementRestOperation()
        op.name = i.name
        ref = FR.FHIRReference()
        ref.reference = i.definition if i.definition else f'{canon}OperationDefinition/{i.name}'
        print(f'op.definition =  {ref.reference}')    
        op.definition =  ref
        op.extension = get_conf(i.conf) 
        op_list.append(op)
    return op_list

def get_igs():
    ig_list = []
    df_igs = read_excel(xls,'igs',na_filter = False)
    for ig in df_igs.itertuples(index=True):
        ig_list.append(ig.uri)
    return ig_list


cs = CS.CapabilityStatement()
cs.id = meta.id
cs.url = f'{canon}CapabilityStatement/{meta.id}'
cs.version = '0.0.0'  # placeholder changed by build
cs.name = snakecase(meta.id)
cs.title = f'{pre} {titlecase(meta.id)} {cs.resource_type}'
cs.status = 'active'
cs.experimental = False
cs.date = f_now  # as FHIRDate
cs.publisher = publisher
cs.contact = [CD.ContactDetail( {"telecom" : [ publisher_endpoint ] })]
cs.description = meta.description
cs.jurisdiction = [f_jurisdiction]
cs.kind = 'requirements'
cs.fhirVersion = '3.0.1'
cs.acceptUnknown = 'both'
cs.format = [
    "xml",
    "json"
  ]
cs.patchFormat = [
    "application/json-patch+json",
  ]
cs.implementationGuide = meta.ig.split(",") + get_igs()
rest = CS.CapabilityStatementRest(dict(
    mode = meta.mode,
    documentation = meta.documentation,
    security = dict(
        description = meta.security
        )
    ))
rest.operation = get_op()
cs.rest = [rest]


cs.as_json()

#### Then the list of IG profiles ( for STU3 )

In [None]:
xls = ExcelFile(f'{in_path}{in_file}.xlsx')
df = read_excel(xls,'profiles',na_filter = False)

df

In [None]:

p_map ={}
cs.profile = [] 
for p in df.itertuples(index=True):
    print(p.Profile, p.Conformance, p.Name, p.Type)
    try: # for mapping stu3 profiles to resources :-(
        p_map[p.Type].append(f'[{p.Name}]({p.Profile})')
    except KeyError:
        p_map[p.Type]=[f'[{p.Name}]({p.Profile})']
        
    ref = FR.FHIRReference(dict(
        reference = p.Profile,
        display = p.Name
        ))
    ref.extension = get_conf(p.Conformance)

 
    cs.profile.append(ref)
                                           
    
pprint(p_map)


#### add Resources

In [None]:
df = read_excel(xls,'resources',na_filter = False)
df_i = read_excel(xls,'interactions',na_filter = False)
df_sp = read_excel(xls,'sps',na_filter = False)

def get_i(type):
    int_list = []
    for i in df_i.itertuples(index=True):
        print(i.code, getattr(i,f'conf_{type}'))
        if getattr(i,f'conf_{type}') not in none_list:
            int  = CS.CapabilityStatementRestResourceInteraction()
            int.code = i.code  
            int.extension = get_conf(getattr(i,f'conf_{type}'))    
            int_list.append(int.as_json())
        
    return int_list


def get_sp(r_type):
    sp_list = []
    for i in df_sp.itertuples(index=True):
        if i.Resource == r_type:
            # print(i.Parameter, i.Resource, i.Conformance)
            sp  = CS.CapabilityStatementRestResourceSearchParam()
            sp.name = i.Parameter
            sp.definition = (f'{canon}SearchParameter/{i.Resource}-{i.Parameter}' if i.Update == 'Y' or i.Exists =='N'
                             else f'{fhir_base_url}SearchParameter/{i.Base}-{i.Parameter.split("_")[-1]}')  # removes the '_' for things like _id
            # print(sp.definition)
            sp.type = i.Type
            sp.extension = get_conf(i.Conformance)    
            sp_list.append(sp.as_json())
    return sp_list


def get_conf_str(combo, r_type):
    conf_str = ''
    for k in df_sp.itertuples(index=True):
        if k.Resource == r_type and k.Parameter in combo:
            if  k.Conformance == 'MAY' or k.Conformance in none_list:
                conf_str = 'MAY'
                break
            elif k.Conformance == 'SHOULD':
                conf_str = 'SHOULD' 
            elif k.Conformance == 'SHALL' and conf_str not in ['SHALL','MAY']:
                conf_str ='SHALL' 
    return conf_str


def get_combo_ext(r_type,combos):
    x_list = []
    for combo in combos:
        # convert to extension
        combo_ext = X.Extension()
        combo_ext.url = combo_url
        combo_conf_ext = get_conf(get_conf_str(combo, r_type))
        combo_ext.extension=combo_conf_ext
        for param in combo:
            req_combo = X.Extension(
                dict (
                    url = 'required',
                    valueString =param
                    )
                )
            combo_ext.extension.append(req_combo)
        x_list.append(combo_ext)
        # print(x_list)
    return x_list
 
    
def get_combos(pairs,c_list):
    a_list =[]
    for i in c_list:
        for j in c_list:
            #print(f'i={i} j = {j}, i&j= {i&j} i^j= {i^j}')
            if i & j and i != j:
                #print(f'i={i}, j = {j}, i&j= {i&j},i^j= {i^j} i|j = {i|j}')
                if i^j in pairs:
                    if i|j not in a_list + c_list:
                        a_list.append(i|j)
    return a_list
    
    

def get_search_combos(r_type,sp_len):
    pairs = []
    for k in df_sp.itertuples(index=True):
        if k.Resource == r_type:
            # print(i.Parameter, i.Resource, i.Conformance)     
            for v in k.combo_pairs.split(','):  #get allowed pairs
                # print(k.Parameter,v)
                if {v,k.Parameter} not in pairs and v not in none_list:
                    pairs.append({k.Parameter,v})
    # print(pairs)
    combo_list = pairs
    for j in range(sp_len-1):
         combo_list = combo_list + get_combos(pairs,combo_list)
    # convert to sorted tuples
    combo_list = [sorted(tuple(i)) for i in combo_list]
    combo_list = sorted(combo_list)
    # print(combo_list)
    return combo_list
                
            
    


rest.resource =  []
for r in df.itertuples(index=True):
    # print(r.type, r.conformance, r.readHistory)
    res = CS.CapabilityStatementRestResource(
    dict(
        type = r.type,
        documentation = r.documentation if r.documentation not in none_list else None,
        versioning = r.versioning if r.versioning not in none_list else None,
        readHistory = None if r.readHistory is None else r.readHistory == 'True',
        updateCreate = None if r.readHistory is None else r.readHistory == 'True',
        referencePolicy = r.referencePolicy.split(",") if r.referencePolicy not in none_list else [],
        interaction = get_i(r.type),
        searchParam = get_sp(r.type),
        searchInclude = r.searchInclude.split(",") if r.searchInclude not in none_list else []
        
        ) 
    )
    res.extension = get_conf(r.conformance)
    print(len(res.searchParam))
    combos = get_search_combos(r.type, len(res.searchParam)) # sorted tuples
    print(f'{len(combos)} combos = {combos}')
    try: #subtract forbidden combos
        f_combos = [sorted(i.split(',')) for i in r.forbidden_s_combos.split('|')]   #forbidden combos
        print(f' r.forbidden_s_combos.split("|") = {r.forbidden_s_combos.split("|")}, f_combos= {f_combos}')
        combos = [i for i in combos if i not in f_combos]
    except AttributeError:
        pass #forbidden_s_combos is missing
    print(f'{len(combos)} combos = {combos}')
    res.extension = res.extension + get_combo_ext(r.type,combos) # convert list to  lst of combo extensions

    rest.resource.append(res)
cs.rest = [rest]
    
print(dumps(cs.as_json(),indent=3))

### Validate

In [None]:
 #validate and write to file

print('...validating')
r = validate(cs)
display(HTML(f'<h1>Validation output</h1><h3>Status Code = {r.status_code}</h3> {r.json()["text"]["div"]}'))



### Create Narrative

- Using Jinja2 Template create xhtml for narrative

In [None]:
in_path = ''
in_file = 'STU3capabilitystatement-server.xhtml'


def markdown(text, *args, **kwargs):
    return commonmark(text, *args, **kwargs)



env = Environment(
    loader=FileSystemLoader(searchpath = in_path),
    autoescape=select_autoescape(['html','xml','xhtml','j2','md'])
    )

env.filters['markdown'] = markdown


template = env.get_template(in_file)

print(p_map)
display(HTML(template.render(cs=cs, p_map=p_map)))
rendered = template.render(cs=cs, p_map=p_map)
#print(rendered)

parser = etree.XMLParser(remove_blank_text=True)
root = etree.fromstring(rendered, parser=parser)

div = (etree.tostring(root[1][0], encoding='unicode', method='html'))
narr = N.Narrative()
narr.status = 'generated'
narr.div = div
cs.text = narr


#print(dumps(cs.as_json(),indent=3))

### validate again

In [None]:
print('...validating')
r = validate(cs)
display(HTML(f'<h1>Validation output</h1><h3>Status Code = {r.status_code}</h3> {r.json()["text"]["div"]}'))


### Write to folder

In [None]:
# save to file

rjson = dumps(cs.as_json(), indent=3)
name =f'capabilitystatement-{cs.id.lower()}'
print(name)
write_file(name, rjson)