## OperationDefinition-cpg-cql and OperationDefinition-cpg-library-evaluate example


### Prerequisites

In order to evaluate cql, a FHIR server is required that has implementations for the following clinical practice guidelines.

[OperationDefinition-cpg-cql](http://build.fhir.org/ig/HL7/cqf-recommendations/OperationDefinition-cpg-cql.html)
- URL: [base]/$cql


[OperationDefinition-cpg-library-evaluate](http://build.fhir.org/ig/HL7/cqf-recommendations/OperationDefinition-cpg-library-evaluate.html)
- URL: [base]/Library/[id]/$evaluate
 
The easiest way to deploy a FHIR server with these capabilities is to use the Alvearie Health Patterns ingest pattern described [here](https://github.com/Alvearie/health-patterns/tree/main/ingest) making the following change to the `values.yaml` file to point the FHIR server to the `Quay.io` `fhir-cql` prebuilt container image. Details found [here]( https://github.com/Alvearie/health-patterns/tree/main/services/ibm-fhir-server/fhir-cql-eval).

`fhir:
  name: fhir
  enabled: true
...
  image:
  \# -- The repository to pull the IBM FHIR Server image from
    repository: quay.io/alvearie/fhir-cql
  \# -- IBM FHIR Server container image tag
    tag: "latest"`

Once the deployment is complete, use the URL for the FHIR server in the cells below.  Use the ingestion pattern to persist patients to the FHIR server or persist them directly as shown below.

### Python and patient data setup

In [None]:
# INSTALL PACKAGES THAT ARE REQUIRED

!pip install requests
!pip install pandas

In [None]:
#HELPER CELL TO PRELOAD DIRECTORIES/UNZIP (optional)
#Use this helper to create local directories of patients
#Run with as many directories as you like

import zipfile

with zipfile.ZipFile('patientsOne/patientsOne.zip', 'r') as zip_file:
    zip_file.extractall('.')

In [None]:
#IMPORT modules (run once)

import requests
import json

### FHIR server setup and health check

In [None]:
#FHIR Server SETUP (run once)

fhir_server_url = "https://<<your fhir server base url>>"
username = "<your fhir username>"
password = "<your fhir password>"




In [None]:
# Check the fhir server for health
requests.packages.urllib3.disable_warnings()
resp = requests.get(fhir_server_url + "/$healthcheck", auth=(username, password), verify=False)

print("Status code", resp.status_code)

if resp.status_code == 200:
    print("FHIR server is healthy")



### Loading patients into the FHIR server


In [None]:
# CLEAR patient library
# ALSO need to clear the fhir server of all patients, libraries, etc.

patient_library = {}

In [None]:
# LOAD the fhir server with a single patient json

from os import listdir
from os import path

headers = {'Content-Type': 'application/fhir+json'}

patient_filename = input("Enter a patient filename (.json)")

plist = []

with open(patient_filename, 'r') as file:
    print("Loading (posting) ...", patient_filename)
    response = requests.post(fhir_server_url, 
                             auth=(username, password), 
                             verify=False,
                             data=file.read(), 
                             headers=headers)
    if response.status_code == 200 or response.status_code == 201:
        pid = response.json()["entry"][0]["response"]["location"].split("/_history")[0]
        print(pid)
        patient_library[pid] = []
    else:
        print("Error pushing %s" % (filename))
    


In [None]:
# LOAD the fhir server from a directory of patient jsons

from os import listdir
from os import path

headers = {'Content-Type': 'application/fhir+json'}

patient_directory = input("Enter patient directory: ")

successful_pushes = 0
failed_pushes = []

plist = []
for filename in listdir(patient_directory):
    if not filename.endswith(".json"):
        continue
    with open(path.join(patient_directory, filename), 'r') as file:
        print("Loading (posting) ...", filename)
        response = requests.post(fhir_server_url, 
                                 auth=(username, password), 
                                 verify=False,
                                 data=file.read(), 
                                 headers=headers)
        if response.status_code == 200 or response.status_code == 201:
            successful_pushes += 1
            pid = response.json()["entry"][0]["response"]["location"].split("/_history")[0]
            print(pid)
            plist.append(pid)
        else:
            print("Error pushing %s: %s" % (filename, response.content))
            failed_pushes.append(filename)
    
print("Successfully pushed %s patient files" % successful_pushes)
patient_library[patient_directory] = plist

if failed_pushes:
    print("The following patient files failed: %s" % failed_pushes)


In [None]:
#SHOW PATIENT LIBRARY
#These have been loaded into the fhir server

for k in patient_library:
    print(k)
    for p in patient_library[k]:
        print("    ", p)
        

In [None]:
# Get a list of ALL patients in fhir server

#https://dlr-ext-fhir.wh-health-patterns.dev.watson-health.ibm.com/fhir/Patient
query_params = {
        "_count": 100
    }

def get_patient_list():
    plist = []
    resp = requests.get(fhir_server_url + "/Patient", params=query_params, auth=(username, password), verify=False)

    print("Status code", resp.status_code)

    resp_json = resp.json()

    print("Found", resp_json["total"], "patients")
    for p in resp_json["entry"]:
        pid = p["fullUrl"].split("v4/")[1]
        #print(pid)
        plist.append(pid)
    return plist

subject_list = get_patient_list()
print(subject_list)

## CQL


##### CPGCQL and CPGLibraryEvaluate

- OPERATION: CPGCQL

  - The official URL for this operation definition is:http://hl7.org/fhir/uv/cpg/OperationDefinition/cpg-cql

  - Evaluates a CQL expression and returns the results as a Parameters resource.

  - URL: [base]/$cql




- OPERATION: CPGLibraryEvaluate

  - The official URL for this operation definition is:http://hl7.org/fhir/uv/cpg/OperationDefinition/cpg-library-evaluate

  - Evaluates the contents of a library and returns the results as a Parameters resource.

  - URL: [base]/Library/$evaluate

  - URL: [base]/Library/[id]/$evaluate

### Evaluate INLINE CQL expressions

In [None]:
#Run CQL expression inline against subject

def run_inline_cql(expression, subject):
    query_params = {
        "expression": cql_expression,
        "subject": subject
    }
    resp = requests.get(fhir_server_url + "/$cql", auth=(username, password), params=query_params, verify=False)
    if resp.status_code == 200:
        print("Evaluation successful")

        result_json = resp.json()

        return_stuff = None
        for p in result_json["parameter"]:
            if p["name"] == "return":
                return_stuff = p["part"]
                break

        if return_stuff:
            for apart in return_stuff:
                keylist = apart.keys()
                the_value = None
                part_name = apart["name"]
                print("name", "-->", apart["name"])
                for k in keylist:
                    if k != "name":
                        the_value = apart[k]
                        if k == "resource":
                            the_value = apart[k]["resourceType"] + "/" + apart[k]["id"]
                            print(k, "-->", the_value)
                        else:
                            print(k, "-->", the_value)

    else:
        print("Evaluation failure, return code -->", resp.status_code)

In [None]:
#Run CQL expression against every patient in the patient_list

def run_cql_over_patientlist(cql_expression, patient_list):
    for p in patient_list:
        print("Running against", p)
        run_inline_cql(cql_expression, p)
        


In [None]:
cql_expression = "34 + 12"
subject = "Patient/17dc3b9c5d0-e4448978-ee07-4a33-adee-98542009ec60"

run_inline_cql(cql_expression, subject)

In [None]:
cql_expression = "'dog' + 'house'"
subject = "Patient/17dc3b9c5d0-e4448978-ee07-4a33-adee-98542009ec60"

run_inline_cql(cql_expression, subject)

In [None]:
cql_expression = "34 + 56"
subject_list = patient_library["patientsTwo"]
run_cql_over_patientlist(cql_expression, subject_list)

In [None]:
cql_expression = "[Patient] p where p.gender = 'male'"
subject = "Patient/17dc3b9caba-658e8e4c-8503-4a9f-ad12-754c9f2022f8"
run_inline_cql(cql_expression, subject)

In [None]:
cql_expression = "[Patient] p where p.gender = 'male'"
run_cql_over_patientlist(cql_expression, patient_library["patientsTwo"])

### Create and evaluate Library resources that contain cql code

In [None]:
# Helper that will encode cql into base64

import base64

def encode_to_base64(cql_source):
    message_bytes = cql_source.encode('ascii')
    base64_bytes = base64.b64encode(message_bytes)
    base64_cql = base64_bytes.decode('ascii')

    return base64_cql

In [None]:
# Encode the cql source

cql_source = '''library "PatientsByAgeGender" version '1.0.0'

using FHIR version '4.0.1'

context Patient

define "Patient is Male":
   Patient.gender.value = 'male'
   
define "Patient is Female":
   Patient.gender.value = 'female'
   
define "Initial Population":
   "Patient is Male"
   
define "OlderThan40":
   AgeInYears() >= 40

define "OlderMales":
   "OlderThan40" and "Patient is Male"'''

encoded_cql = encode_to_base64(cql_source)
print(encoded_cql)

In [None]:
# Instantiate a library resource with the base64 encoded cql content
# Be sure the name/version of the resource matches the name/version of the cql source above

library_resource = '''{
  "resourceType": "Library",
  "id": "example2",
  "name": "PatientsByAgeGender",
  "version": "1.0.0",
  "title": "Example library 2",
  "status": "active",
  "type": {
    "coding": [
      {
        "code": "logic-library"
      }
    ]
  },
  "date": "2021-12-13",
  "description": "Example 2 logic",
  "content": [
    {
      "contentType": "text/cql",
      "data": "ENCODED_CQL"
    }
  ]
}'''

library_resource = library_resource.replace("ENCODED_CQL", encoded_cql)

In [None]:
library_resource

In [None]:
#Post a library resource

from os import listdir
from os import path

headers = {'Content-Type': 'application/json'}

response = requests.post(fhir_server_url + "/Library", 
                                 auth=(username, password), 
                                 verify=False,
                                 data=library_resource,
                                 #data=file.read(), 
                                 headers=headers)

library_id = response.headers["location"].split("v4/")[1].split("/_")[0]
print(library_id)

In [None]:
# Get a list of ALL Libraries in fhir server

#https://dlr-ext-fhir.wh-health-patterns.dev.watson-health.ibm.com/fhir/Library

def get_library_list():
    liblist = []
    resp = requests.get(fhir_server_url + "/Library", auth=(username, password), verify=False)

    print("Status code", resp.status_code)

    resp_json = resp.json()

    print("Found", resp_json["total"], "libraries")
    for e in resp_json["entry"]:
        libid = e["fullUrl"].split("v4/")[1]
        #print(pid)
        liblist.append(libid)
    return liblist

library_list = get_library_list()
print(library_list)

In [None]:
def run_library_evaluation(library_id, subject, results_table):
    url = fhir_server_url + "/" + library_id + "/$evaluate"
    #print(url)

    query_params = {
        "subject": subject
    }
    resp = requests.get(url, auth=(username, password), params=query_params, verify=False)
    if resp.status_code == 200:
        print("Evaluation successful")

        result_json = resp.json()
        print(result_json.keys())
        return_stuff = None
        for p in result_json["parameter"]:
            if p["name"] == "return":
                return_stuff = p["part"]
                break

        if return_stuff:
            if results_table["headerrow"] == None:
                #create header row first by getting all parts in order
                headerrow = []
                for col in return_stuff:
                    headerrow.append(col["name"])
                results_table["headerrow"] = headerrow
                
            data_row = []
            data_row.append(subject)
            for apart in return_stuff:
                keylist = apart.keys()
                #print(keylist)
                the_value = None
                for k in keylist:
                    if "value" in k:
                        the_value = apart[k]
                        print(apart["name"], "-->", the_value)
                        data_row.append(the_value)
                        break
                #print(apart["name"], "-->", the_value)
                #data_row.append(the_value)
            results_table["datarows"].append(data_row)

    else:
        print("Evaluation failure, return code -->", resp.status_code)

In [None]:
results_table = {
    "headerrow":None,
    "datarows": []
}
subject = "Patient/17dc3b9c5d0-e4448978-ee07-4a33-adee-98542009ec60"
library_id = "Library/17dc3bc5e89-8fdb55be-078c-4a52-9a0d-3f425e6c242f"
run_library_evaluation(library_id, subject, results_table)

In [None]:
results_table = {
    "headerrow":None,
    "datarows": []
}
subject = "Patient/17dc3b9caba-658e8e4c-8503-4a9f-ad12-754c9f2022f8"
run_library_evaluation(library_id, subject, results_table)

In [None]:
def run_library_over_patientlist(library_id, patient_list, results_table):
    for p in patient_list:
        print("Running against", p)
        run_library_evaluation(library_id, p, results_table)

In [None]:
results_table = {
    "headerrow":None,
    "datarows": []
}

run_library_over_patientlist(library_id, patient_library["patientsOne"] + patient_library["patientsTwo"], results_table)


In [None]:
results_table.keys()

In [None]:
import csv
def create_csv(data_table, filename):
    with open(filename, 'w', newline='') as f:
        writer = csv.writer(f)
        writer.writerow(data_table["headerrow"])
        for arow in data_table["datarows"]:
            writer.writerow(arow)
            
    print(filename, "created")
            
create_csv(results_table, "results.csv")

In [None]:
#Use pandas dataframe to visualize the results

import pandas as pd
data = pd.read_csv("results.csv")
data.head(20)