# Digitally Signing a FHIR Bundle or QuestionnaireResponse Resource

This is a Jupyter Notebook that uses Python 3.7 and OpenSSL to create a JSON Web Signature (JWS) (see RFC 7515) and attach it to a FHIR Bundle or QuestionnaireResponse resource.

- If the resource is a Bundle, use Bundle.signature
- If the resource is a QuestionnaireResponse, use its [Signature extension](http://hl7.org/fhir/StructureDefinition/questionnaireresponse-signature)

See signatures: http://build.fhir.org/signatures.html

*Although self-signed certificates are used for the purpose of these examples, they are not recommended for production systems.*

### Import Libraries

In [60]:
from requests import get, post
from json import dumps, loads
from yaml import dump as dumpy
from pathlib import Path
from datetime import datetime
import pytz
from jose import jws  #python JWS package
from base64 import b64encode
from jcs import canonicalize #package for a JCS (RFC 8785) compliant canonicalizer.
from cryptography import x509
from cryptography.hazmat.backends import default_backend

### 1. Define the location of the Document Signing Certificate

 - See the Jupyter file [Create_Cert.ipynb]() for how to generate your own self-signed certificate.

In [61]:
certificate_path = Path('example_org_cert') # update this to your folder
cert_pem = certificate_path / 'cert.pem'
cert_der = certificate_path / 'cert.der'
certificate = x509.load_der_x509_certificate(cert_der.read_bytes(), default_backend())
# certificate

### 3. Create JWS to Attach to Bundle or QR (START HERE IF USING THE SAME CERT)

#### 3.01  Auto Timestamp Function

In [62]:
auto_timestamp = False
timezone = 'US/Pacific'

def get_timestamp(timezone):  # Get current time in the given timezone
  tz = pytz.timezone(timezone)  # use the argument rather than a hard-coded zone
  current_time = datetime.now(tz)
  iso_timestamp = current_time.isoformat()  # Format as ISO 8601 compliant string
  return iso_timestamp
# print(get_timestamp(timezone))
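
The fixed example timestamp used later in this notebook (`2020-10-23T04:54:56.048+00:00`) is in UTC with millisecond precision. As a minimal sketch, assuming only the standard library is available, a timestamp of the same shape could be produced as shown here. The helper name `utc_timestamp_ms` is hypothetical and not part of the notebook.

```python
from datetime import datetime, timezone as dt_timezone


def utc_timestamp_ms():
    """Return the current UTC time as an ISO 8601 string with millisecond precision."""
    now = datetime.now(dt_timezone.utc)
    return now.isoformat(timespec='milliseconds')


# Same shape as 2020-10-23T04:54:56.048+00:00 (the value itself depends on the clock)
print(utc_timestamp_ms())
```

The import is aliased to `dt_timezone` so that it does not shadow the notebook's own `timezone` string variable.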

#### 3.02 Extract Common Name (CN) from the Subject as String

In [63]:
def get_cn():
  cn = None  # stays None if the subject has no non-email CN
  for attr in certificate.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME):
      cn_raw = attr.value  # e.g., "John Hancock, MD"
      # Skip CN values that look like an email address (contain '@')
      if '@' not in cn_raw:
          cn = cn_raw
          break  # Use the first non-email CN
  return cn
# print(get_cn())

#### 3.03 Extract Subject Key Identifier (KID) as Hexadecimal String

In [64]:
def get_kid():
    kid_ext = certificate.extensions.get_extension_for_class(x509.SubjectKeyIdentifier)
    kid = kid_ext.value.digest.hex() # Hexadecimal string
    return(kid)
# print(get_kid())

#### 3.04 Extract Subject Alternative Name (SAN) as Dict

- format
  -  "otherName" = 2.16.840.1.113883.4.6;UTF8:9941339108
  -  "URI" = https://example.org/fhir/Practitioner/123

In [65]:
def get_san():
  san = {}
  san['NPI'] = []
  san_ext = certificate.extensions.get_extension_for_class(x509.SubjectAlternativeName)
  san['DNS'] = san_ext.value.get_values_for_type(x509.DNSName)
  san['OtherName'] = san_ext.value.get_values_for_type(x509.OtherName)
  san['URI'] = san_ext.value.get_values_for_type(x509.UniformResourceIdentifier)
  for other_name in san['OtherName']:
      if other_name.type_id.dotted_string == '2.16.840.1.113883.4.6':
        san['NPI'].append(other_name.value.decode('utf-8').replace('\x0c\n',''))
  return san
# print(get_san()['NPI'][0])
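
The `replace('\x0c\n','')` call above strips what appears to be the DER header of a UTF8String: tag byte `0x0C` followed by a length byte of `0x0A` (10, which is also the newline character and the length of an NPI). A minimal sketch of a slightly more explicit decoder follows, assuming the otherName value is a DER UTF8String with a short-form length. `decode_der_utf8string` is a hypothetical helper, not part of the notebook or of the `cryptography` package.

```python
def decode_der_utf8string(der: bytes) -> str:
    """Decode a DER UTF8String (tag 0x0C) that uses a short-form length (< 128 bytes)."""
    if len(der) < 2 or der[0] != 0x0C:
        raise ValueError('not a DER UTF8String')
    length = der[1]
    if length & 0x80:
        raise ValueError('long-form lengths are not handled in this sketch')
    content = der[2:2 + length]
    if len(content) != length:
        raise ValueError('truncated value')
    return content.decode('utf-8')


# Bytes shaped like the SAN otherName example: tag 0x0C, length 10, then the NPI digits
print(decode_der_utf8string(b'\x0c\n9941339108'))  # 9941339108
```

Reading the length byte avoids relying on the value being exactly 10 characters long.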

#### 3.1. Prepare Header

 Note: the base64 DER is the certificate PEM file without the header, footer, and line breaks.

In [66]:
der = cert_pem.read_text()
der = der.replace('-----BEGIN CERTIFICATE-----','')
der = der.replace('-----END CERTIFICATE-----','')
der = der.replace('\n','')
header = {
    "alg": "RS256",
    "kty": "RS",
    "sigT": get_timestamp(timezone) if auto_timestamp else '2020-10-23T04:54:56.048+00:00',
    "kid": get_kid(),
    "x5c": [der],
     }
header

{'alg': 'RS256',
 'kty': 'RS',
 'sigT': '2020-10-23T04:54:56.048+00:00',
 'kid': '7c2606b650f7870c60dd88f434fedaafd0f3f739',
 'x5c': ['MIIFWjCCA8KgAwIBAgIUJEN21OC1E4vmQW/MS7hv8soWBgswDQYJKoZIhvcNAQELBQAwgZUxCzAJBgNVBAYTAlVTMRMwEQYDVQQIDApDYWxpZm9ybmlhMRIwEAYDVQQHDAlTYXVzYWxpdG8xHTAbBgNVBAoMFEV4YW1wbGUgT3JnYW5pemF0aW9uMRkwFwYDVQQDDBBKb2huIEhhbmNvY2ssIE1EMSMwIQYJKoZIhvcNAQkBFhRqaGFuY29ja0BleGFtcGxlLm9yZzAeFw0yNTA2MjAyMjE0NTVaFw0yNzA2MTAyMjE0NTVaMIGVMQswCQYDVQQGEwJVUzETMBEGA1UECAwKQ2FsaWZvcm5pYTESMBAGA1UEBwwJU2F1c2FsaXRvMR0wGwYDVQQKDBRFeGFtcGxlIE9yZ2FuaXphdGlvbjEZMBcGA1UEAwwQSm9obiBIYW5jb2NrLCBNRDEjMCEGCSqGSIb3DQEJARYUamhhbmNvY2tAZXhhbXBsZS5vcmcwggGiMA0GCSqGSIb3DQEBAQUAA4IBjwAwggGKAoIBgQCjH5km+I+96qNwBBLxtEYq3QCkY8jngDbjpPxD0WElWXWicB4gr8e9pHa6AA3pQc4MpBiQvVnUs7DmfmjNcolvqizvsgWZCODQm4pCbADNaO/0cDVagHXBsDgdmAxFRxHPMneUadwLa0pokRhAWd1hyz0xXTbdES1VjhYgOEB098Uq2shIzvIfc4dIFTPSy8Et+EpQAZo3+D68NAfWF9n4QyEVg8TH5YyqPUaQoEUKFr2Zm4+KFjVW9Xw3siVuV0wV2dGcU2L9QgDoxOX812t+UoLXuXFOXf9sRRsZb2unmV1RXXERC
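
The relationship between the PEM file and the `x5c` entry can be checked with the standard library alone. The sketch below uses placeholder bytes rather than a real certificate, and `pem_to_x5c` is a hypothetical helper name. It illustrates that removing the PEM header, footer, and line breaks leaves the standard (not URL-safe) base64 encoding of the DER bytes, which is what RFC 7515 expects in `x5c`.

```python
import base64
import ssl


def pem_to_x5c(pem_text: str) -> str:
    """Strip the PEM armor lines and line breaks, leaving the base64-encoded DER string."""
    lines = [line.strip() for line in pem_text.strip().splitlines()]
    return ''.join(line for line in lines if line and not line.startswith('-----'))


# Placeholder bytes standing in for a DER certificate (NOT a valid certificate)
fake_der = bytes(range(256)) * 2
fake_pem = ssl.DER_cert_to_PEM_cert(fake_der)  # wraps base64 in BEGIN/END CERTIFICATE lines

x5c_entry = pem_to_x5c(fake_pem)
print(base64.b64decode(x5c_entry) == fake_der)  # True
```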

### 3.2 Fetch Payload from FHIR Server or Local File

- **Set `local_file` to `True` to read the payload from a local file.**
- To preserve meta and id in the fetched payload:
   -  run the IG Publisher only (with the "-i" option and without the "-k" or "-n" options)
   -  remove example from the parameter
  

In [67]:
local_file = True
# payload_id = 'Bundle-cdex-document-digital-sig-example.json' # can be a Bundle or QuestionnaireResponse
# payload_id = 'Bundle-cdex-searchbundle-digital-sig-example.json' # can be a Bundle or QuestionnaireResponse
payload_id = 'QuestionnaireResponse-cdex-questionnaireresponse-example4.json' # can be a Bundle or QuestionnaireResponse


def fetch_payload():

  # url = "https://argopatientlist.aidbox.app/fhir/Bundle"
  # url = 'http://test.fhir.org/r4/Bundle'
  url = 'https://hl7.org/fhir/us/davinci-cdex'

  username = "basic"
  password = "secret"
  headers = {"Accept": "application/fhir+json" , "Content-Type": "application/fhir+json"}

  r = get(f'{url}/{payload_id}', auth=(username, password), headers = headers)
  my_obj = r.json()
  # print("="*80)
  # print("STATUS: ",r.status_code)

  # print("="*80)
  # print("HEADERS:\n")
  # for k,v in r.headers.items():
  #     print(f'{k} = {v}')
  # print("="*80)
  # print("BODY:\n")
  # print(dumps(my_obj,indent=2))
  return(my_obj)


if local_file: 
  in_path = '/Users/ehaas/Documents/FHIR/davinci-ecdx/output'

  path = Path() / in_path / payload_id
  my_obj =loads(path.read_text())
else:
  my_obj = fetch_payload()
print(dumps(my_obj,indent=2))

{
  "resourceType": "QuestionnaireResponse",
  "id": "cdex-questionnaireresponse-example4",
  "text": {
    "status": "generated",
    "div": "<div xmlns=\"http://www.w3.org/1999/xhtml\"><p class=\"res-header-id\"><b>Generated Narrative: QuestionnaireResponse cdex-questionnaireresponse-example4</b></p><a name=\"cdex-questionnaireresponse-example4\"> </a><a name=\"hccdex-questionnaireresponse-example4\"> </a><div style=\"display: inline-block; background-color: #d9e0e7; padding: 6px; margin: 4px; border: 1px solid #8da1b4; border-radius: 5px; line-height: 60%\"><p style=\"margin-bottom: 0px\"/><p style=\"margin-bottom: 0px\">Profile: <a href=\"StructureDefinition-cdex-sdc-questionnaireresponse.html\">CDex SDC QuestionnaireResponse Profile</a></p></div><table border=\"1\" cellpadding=\"0\" cellspacing=\"0\" style=\"border: 1px #F0F0F0 solid; font-size: 11px; font-family: verdana; vertical-align: top;\"><tr style=\"border: 2px #F0F0F0 solid; font-size: 11px; font-family: verdana; vertical

#### 3.2.1 Prepare Payload

The payload is the base64url-encoded form of the canonicalized resource, computed before the signature is attached.
 

#####  Canonicalize the resource using IETF JSON Canonicalization Scheme (JCS) before adding the signature element

- Remove the id, meta, and signature elements if present before canonicalization
- Note: keep the generated narrative if present

In [68]:
sig_ext_url = 'http://hl7.org/fhir/StructureDefinition/questionnaireresponse-signature'

if my_obj['resourceType'] in ['Bundle', 'QuestionnaireResponse']:
  my_obj_id = my_obj.pop('id', None)
  my_obj_meta = my_obj.pop('meta', None)
  my_obj_old_sig = my_obj.pop('signature', None)
else:
  # Stop here: the later cells rely on the variables set above
  raise ValueError('Not a Bundle or QuestionnaireResponse')


if my_obj['resourceType'] == 'QuestionnaireResponse':
  extensions = my_obj.get('extension', [])
  # Keep every extension except an existing signature extension
  remaining = [ext for ext in extensions if ext.get('url') != sig_ext_url]
  if len(remaining) == len(extensions):
    print('No signature extension found')
  if remaining:
    my_obj['extension'] = remaining
  else:
    my_obj.pop('extension', None)  # remove extension if empty

payload = canonicalize(my_obj)
payload, len(payload)

(b'{"author":{"identifier":{"system":"http://hl7.org/fhir/sid/us-npi","value":"9941339100"}},"authored":"2022-06-17","item":[{"answer":[{"valueString":"Examplitis"}],"linkId":"1","text":"Relevant Patient Diagnoses (conditions that might be expected to improve with oxygen therapy)"},{"answer":[{"valueCoding":{"code":"4","display":"Replacement","system":"http://example.org"}}],"linkId":"2","text":"Order Reason"}],"questionnaire":"http://example.org/cdex-questionnaire-example2","resourceType":"QuestionnaireResponse","status":"completed","subject":{"display":"Amy Shaw","identifier":{"system":"http://example.org/cdex/payer/member-ids","type":{"coding":[{"code":"MB","display":"Member Number","system":"http://terminology.hl7.org/CodeSystem/v2-0203"}],"text":"Member Number"},"use":"usual","value":"Member123"}},"text":{"div":"<div xmlns=\\"http://www.w3.org/1999/xhtml\\"><p class=\\"res-header-id\\"><b>Generated Narrative: QuestionnaireResponse cdex-questionnaireresponse-example4</b></p><a name
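
To illustrate what canonicalization does, here is a rough standard-library approximation. It is not a full RFC 8785 implementation: it assumes the resource contains only strings, booleans, nulls, integers, arrays, and objects with ASCII keys, and it does not reproduce the JCS rules for number serialization or UTF-16 key ordering. The notebook itself uses the `jcs` package, and `approx_canonicalize` is a hypothetical name used only for this sketch.

```python
import json


def approx_canonicalize(resource: dict) -> bytes:
    """Drop id, meta, and signature, then serialize with sorted keys and no whitespace."""
    stripped = {k: v for k, v in resource.items() if k not in ('id', 'meta', 'signature')}
    text = json.dumps(stripped, sort_keys=True, separators=(',', ':'), ensure_ascii=False)
    return text.encode('utf-8')


sample = {
    "resourceType": "Bundle",
    "id": "example",
    "type": "document",
    "meta": {"versionId": "1"},
}
print(approx_canonicalize(sample))  # b'{"resourceType":"Bundle","type":"document"}'
```

Because keys are sorted and whitespace is removed, two JSON documents that differ only in key order or formatting produce the same bytes, which is what makes the signature reproducible by a verifier.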

##### Then base64_url the payload entry

Note: this step is combined with 3.3 below by the `jws.sign` method.
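
For reference, JWS uses the URL-safe base64 alphabet with the trailing `=` padding removed (RFC 7515, section 2). A minimal standard-library sketch of that encoding follows; the helper names `b64url_encode` and `b64url_decode` are hypothetical and only illustrate what the signing library does internally.

```python
import base64


def b64url_encode(data: bytes) -> str:
    """base64url-encode and drop the '=' padding."""
    return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii')


def b64url_decode(text: str) -> bytes:
    """Restore the padding that was dropped, then decode."""
    padding = '=' * (-len(text) % 4)
    return base64.urlsafe_b64decode(text + padding)


print(b64url_encode(b'\xfb\xff'))  # -_8  (standard base64 would give +/8=)
```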

#### 3.3 Create Signature using private key and the RS256 algorithm to get the JWS compact serialization format

Note: the signature is displayed with its parts labeled and separated by line breaks for easier viewing.

In [70]:
private_key_path = certificate_path / 'private-key.pem'
private_key = private_key_path.read_text()
private_key

signature = jws.sign(payload,private_key,algorithm='RS256',headers=header)

labels = ['header', 'payload', 'signature']
for i,j in enumerate(signature.split('.')):
    print(f'{labels[i]}:\n{j}\n')

header:
eyJhbGciOiJSUzI1NiIsImtpZCI6IjdjMjYwNmI2NTBmNzg3MGM2MGRkODhmNDM0ZmVkYWFmZDBmM2Y3MzkiLCJrdHkiOiJSUyIsInNpZ1QiOiIyMDIwLTEwLTIzVDA0OjU0OjU2LjA0OCswMDowMCIsInR5cCI6IkpXVCIsIng1YyI6WyJNSUlGV2pDQ0E4S2dBd0lCQWdJVUpFTjIxT0MxRTR2bVFXL01TN2h2OHNvV0Jnc3dEUVlKS29aSWh2Y05BUUVMQlFBd2daVXhDekFKQmdOVkJBWVRBbFZUTVJNd0VRWURWUVFJREFwRFlXeHBabTl5Ym1saE1SSXdFQVlEVlFRSERBbFRZWFZ6WVd4cGRHOHhIVEFiQmdOVkJBb01GRVY0WVcxd2JHVWdUM0puWVc1cGVtRjBhVzl1TVJrd0Z3WURWUVFEREJCS2IyaHVJRWhoYm1Odlkyc3NJRTFFTVNNd0lRWUpLb1pJaHZjTkFRa0JGaFJxYUdGdVkyOWphMEJsZUdGdGNHeGxMbTl5WnpBZUZ3MHlOVEEyTWpBeU1qRTBOVFZhRncweU56QTJNVEF5TWpFME5UVmFNSUdWTVFzd0NRWURWUVFHRXdKVlV6RVRNQkVHQTFVRUNBd0tRMkZzYVdadmNtNXBZVEVTTUJBR0ExVUVCd3dKVTJGMWMyRnNhWFJ2TVIwd0d3WURWUVFLREJSRmVHRnRjR3hsSUU5eVoyRnVhWHBoZEdsdmJqRVpNQmNHQTFVRUF3d1FTbTlvYmlCSVlXNWpiMk5yTENCTlJERWpNQ0VHQ1NxR1NJYjNEUUVKQVJZVWFtaGhibU52WTJ0QVpYaGhiWEJzWlM1dmNtY3dnZ0dpTUEwR0NTcUdTSWIzRFFFQkFRVUFBNElCandBd2dnR0tBb0lCZ1FDakg1a20rSSs5NnFOd0JCTHh0RVlxM1FDa1k4am5nRGJqcFB4RDBXRWxXWFdpY0I0Z3I4
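
The three parts printed above follow the JWS compact serialization: `BASE64URL(header).BASE64URL(payload).BASE64URL(signature)`, where the signature is computed over the first two parts joined by a period. The sketch below shows that assembly using only the standard library. Note the substitution: it signs with HMAC-SHA256 (`HS256`) purely as a stand-in, because RS256 needs an RSA library; the notebook's actual signature comes from `jws.sign` with the private key. The function name, key id, and secret are all placeholders.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """base64url-encode without '=' padding, as JWS requires."""
    return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii')


def compact_jws_hs256(header: dict, payload: bytes, secret: bytes) -> str:
    """Assemble header.payload.signature; HS256 stands in for RS256 in this sketch."""
    protected = dict(header, alg='HS256')
    header_json = json.dumps(protected, separators=(',', ':')).encode('utf-8')
    signing_input = b64url(header_json) + '.' + b64url(payload)
    mac = hmac.new(secret, signing_input.encode('ascii'), hashlib.sha256).digest()
    return signing_input + '.' + b64url(mac)


demo_jws = compact_jws_hs256({'kid': 'demo-key'}, b'{"resourceType":"Bundle"}', b'demo-secret')
demo_parts = demo_jws.split('.')
print(len(demo_parts))  # 3
```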

#### 3.4. Option to Create "detached-content" payload by removing the payload from the JWS

Note: the signature is displayed with its parts labeled and separated by line breaks for easier viewing, and then in compact serialization format.

In [71]:
detached = False
if detached:
  split_sig = signature.split('.')
  split_sig[1] = ''
  signature = '.'.join(split_sig)
  for i,j in enumerate(signature.split('.')):
      print(f'{labels[i]}:\n{j}\n')
print(f'\nSignature in compact serialization format:\n{"="*80}\n{signature}')


Signature in compact serialization format:
eyJhbGciOiJSUzI1NiIsImtpZCI6IjdjMjYwNmI2NTBmNzg3MGM2MGRkODhmNDM0ZmVkYWFmZDBmM2Y3MzkiLCJrdHkiOiJSUyIsInNpZ1QiOiIyMDIwLTEwLTIzVDA0OjU0OjU2LjA0OCswMDowMCIsInR5cCI6IkpXVCIsIng1YyI6WyJNSUlGV2pDQ0E4S2dBd0lCQWdJVUpFTjIxT0MxRTR2bVFXL01TN2h2OHNvV0Jnc3dEUVlKS29aSWh2Y05BUUVMQlFBd2daVXhDekFKQmdOVkJBWVRBbFZUTVJNd0VRWURWUVFJREFwRFlXeHBabTl5Ym1saE1SSXdFQVlEVlFRSERBbFRZWFZ6WVd4cGRHOHhIVEFiQmdOVkJBb01GRVY0WVcxd2JHVWdUM0puWVc1cGVtRjBhVzl1TVJrd0Z3WURWUVFEREJCS2IyaHVJRWhoYm1Odlkyc3NJRTFFTVNNd0lRWUpLb1pJaHZjTkFRa0JGaFJxYUdGdVkyOWphMEJsZUdGdGNHeGxMbTl5WnpBZUZ3MHlOVEEyTWpBeU1qRTBOVFZhRncweU56QTJNVEF5TWpFME5UVmFNSUdWTVFzd0NRWURWUVFHRXdKVlV6RVRNQkVHQTFVRUNBd0tRMkZzYVdadmNtNXBZVEVTTUJBR0ExVUVCd3dKVTJGMWMyRnNhWFJ2TVIwd0d3WURWUVFLREJSRmVHRnRjR3hsSUU5eVoyRnVhWHBoZEdsdmJqRVpNQmNHQTFVRUF3d1FTbTlvYmlCSVlXNWpiMk5yTENCTlJERWpNQ0VHQ1NxR1NJYjNEUUVKQVJZVWFtaGhibU52WTJ0QVpYaGhiWEJzWlM1dmNtY3dnZ0dpTUEwR0NTcUdTSWIzRFFFQkFRVUFBNElCandBd2dnR0tBb0lCZ1FDakg1a20rSSs5NnFOd0JCTHh0RVlxM1FD
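
With detached content (RFC 7515, Appendix F), the middle part of the compact serialization is left empty, and a verifier regenerates the payload from the resource and puts it back before checking the signature. A minimal sketch of both directions follows, using hypothetical helper names and placeholder strings rather than a real JWS.

```python
def detach_payload(compact_jws: str) -> str:
    """Return the JWS with its payload part emptied: header..signature."""
    header, _payload, sig = compact_jws.split('.')
    return f'{header}..{sig}'


def reattach_payload(detached_jws: str, payload_b64url: str) -> str:
    """Re-insert a base64url payload into a detached JWS before verification."""
    header, empty, sig = detached_jws.split('.')
    if empty:
        raise ValueError('JWS already carries a payload')
    return f'{header}.{payload_b64url}.{sig}'


# Placeholder parts, not a real signature
print(detach_payload('aGVhZGVy.cGF5bG9hZA.c2ln'))  # aGVhZGVy..c2ln
```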

### 4. base64 the JWS and add the Signature element to the Bundle or QR

This is what would be contained in and/or referenced by the Task over the wire.

In [72]:
b64_jws = b64encode(signature.encode()).decode()
sig_element = {
            "type": [  # Signature.type = Verification Signature
              {
                "system": "urn:iso-astm:E1762-95:2013",
                "code": "1.2.840.10065.1.12.1.5",
                "display": "Verification Signature"
              }
            ],
            "when": get_timestamp(timezone) if auto_timestamp else '2020-10-23T04:54:56.048+00:00', #system timestamp when signature created
            # "who" { #Reference to the Practitioner who signed and attested to the Bundle
            #   "reference": "Practitioner/123"  
            #   "display": "Practitioner/123" 
            # },
            # "onBehalfOf": { #Reference to the Organization
            #   "reference": "Organization/123"
            #   "display": "Organization/123"
            # },
            "who": { #Reference to the Practitioner who signed and attested to the Bundle using NPI should be same as SAN
                    "identifier": {
                        "system": "http://hl7.org/fhir/sid/us-npi",
                        "type" : {
                            "coding" : [{
                                "system" : "http://terminology.hl7.org/CodeSystem/v2-0203",
                                "code" : "NPI"
                              }]
                          },
                        "value": get_san()['NPI'][0] # extract SAN from certificate
                    },
                    "display": get_cn()  #  extract CN from certificate
                },
            "onBehalfOf": {
                    "identifier": {
                        "system": "http://hl7.org/fhir/sid/us-npi",
                        "value": "1184932014"
                    }
                },
            "targetFormat" : "application/fhir+json;canonicalization=http://hl7.org/fhir/canonicalization/json#document", # The technical format of the signed resources see 
            # https://hl7.org/fhir/json.html#canonical. MIME types can include optional parameters in the form
            # type/subtype;parameter=value, as defined in RFC 2045 and RFC 6838. For example, text/plain;charset=utf-8 specifies a character encoding.
            "sigFormat" : "application/jose", # The technical format of the signature
            "data": b64_jws,
             }

if my_obj_id :
  my_obj['id'] =  my_obj_id # update id
if my_obj_meta:
  my_obj['meta'] = my_obj_meta # update meta
if my_obj['resourceType'] == 'Bundle':
  my_obj['signature'] = sig_element # update signature
if my_obj['resourceType'] == 'QuestionnaireResponse':
  try:
    my_obj['extension'].append(dict(
        url='http://hl7.org/fhir/StructureDefinition/questionnaireresponse-signature',
        valueSignature=sig_element
        )) # update signature  
  except KeyError:
    my_obj['extension'] = [dict(
        url='http://hl7.org/fhir/StructureDefinition/questionnaireresponse-signature',
        valueSignature=sig_element
        )]

print(dumps(my_obj, indent=2, sort_keys=False))

{
  "resourceType": "QuestionnaireResponse",
  "text": {
    "status": "generated",
    "div": "<div xmlns=\"http://www.w3.org/1999/xhtml\"><p class=\"res-header-id\"><b>Generated Narrative: QuestionnaireResponse cdex-questionnaireresponse-example4</b></p><a name=\"cdex-questionnaireresponse-example4\"> </a><a name=\"hccdex-questionnaireresponse-example4\"> </a><div style=\"display: inline-block; background-color: #d9e0e7; padding: 6px; margin: 4px; border: 1px solid #8da1b4; border-radius: 5px; line-height: 60%\"><p style=\"margin-bottom: 0px\"/><p style=\"margin-bottom: 0px\">Profile: <a href=\"StructureDefinition-cdex-sdc-questionnaireresponse.html\">CDex SDC QuestionnaireResponse Profile</a></p></div><table border=\"1\" cellpadding=\"0\" cellspacing=\"0\" style=\"border: 1px #F0F0F0 solid; font-size: 11px; font-family: verdana; vertical-align: top;\"><tr style=\"border: 2px #F0F0F0 solid; font-size: 11px; font-family: verdana; vertical-align: top\"><th style=\"vertical-align: top; 

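Because `Signature.data` holds the base64 encoding of the compact JWS, a receiver has to undo two layers to inspect the header: standard base64 for the `data` element, then base64url for the header part. The sketch below shows that unwrapping with the standard library only. The helper names and the hand-built placeholder JWS are assumptions for illustration, and no signature verification is performed.

```python
import base64
import json


def b64url_decode(text: str) -> bytes:
    """Decode base64url text whose '=' padding was removed."""
    return base64.urlsafe_b64decode(text + '=' * (-len(text) % 4))


def read_jws_header(signature_data: str) -> dict:
    """Unwrap Signature.data (base64 of a compact JWS) and return the JWS header."""
    compact_jws = base64.b64decode(signature_data).decode('ascii')
    header_b64 = compact_jws.split('.')[0]
    return json.loads(b64url_decode(header_b64))


# Placeholder JWS built by hand; its signature part is not a real signature
demo_header = {'alg': 'RS256', 'kid': 'demo-key'}
demo_header_b64 = base64.urlsafe_b64encode(json.dumps(demo_header).encode()).rstrip(b'=').decode()
demo_data = base64.b64encode(f'{demo_header_b64}..c2ln'.encode()).decode()
print(read_jws_header(demo_data))  # {'alg': 'RS256', 'kid': 'demo-key'}
```
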
### Using FHIR RESTful create POST to FHIR Server 
***Deactivated until we get a new AIDBOX***
using AIDBox for now at `https://argopatientlist.aidbox.app/fhir/`
<!-- using `http://test.fhir.org/r4/` -->


### Alternatively Write to Local File as JSON and YAML

In [73]:
out_path = Path(r'out_files/signed_object.json')
out_path.parent.mkdir(parents=True, exist_ok=True)  # make sure the output folder exists
print(f'Writing signed object to {out_path}')
out_path.write_text(dumps(my_obj,indent=2,sort_keys=False))
out_path = Path(r'out_files/signed_object.yml')
print(f'Writing signed object to {out_path}')
how_to = '''
# steps to create/update dig signature examples

# 1. create unsigned Bundle or QuestionnaireResponse
# 1. run sushi
# 1. run publisher with only -i parameter
# 1. then optionally create certificate using this script file: https://github.com/HL7/davinci-ecdx/blob/master/CDEX-Signatures/Create_Cert.ipynb
# 1. create signatures using this script file: https://github.com/HL7/davinci-ecdx/blob/master/CDEX-Signatures/Create_Digsign_Bundle_or_QR.ipynb
# 1. save as YAML source
# 1. run sushi and publisher again with -ink parameters
# 1. verify signatures using this script file: https://github.com/HL7/davinci-ecdx/blob/master/CDEX-Signatures/Verify_digsign_Bundle_or_QR.ipynb
'''

out_path.write_text(f'{how_to}\n{dumpy(my_obj,indent=2,sort_keys=False)}')

Writing signed object to out_files/signed_object.json
Writing signed object to out_files/signed_object.yml


31642