# Using fhir-pipe

This demo shows our pipe, which takes **mapping rules** and a **SQL database** as input and outputs data in the **FHIR format**.

## Set up

Before starting, you need mapping rules and a SQL database. We already provide some rules in the repository [`fhir-mapping`](https://github.com/arkhn/fhir-mapping), so all you have to do is clone it into the same folder as `fhir-pipe`. For the data, we provide a small SQL script, `config_cw_local.sql`, which builds a very small, fake database following the CW format, for illustrative purposes only. To build your mock `cw_local` database, [install psql](https://www.postgresql.org/download/) (_you only need the command-line tool_) and run:

    psql -f config_cw_local.sql 

## Let's get started!

In [1]:
# Allow loading packages from the parent directory
import sys, os
sys.path.insert(1, os.path.realpath(os.path.pardir))

In [2]:
import json
import random
import shutil
import glob
import arkhn

Specify the project and the FHIR resource you want to fill.

In [3]:
project = 'Crossway'
resource = 'Patient'
resource_structure = arkhn.fetcher.get_fhir_resource(project, resource)

### Fetch the data from SQL

Build the SQL query, and also output the graph of joins.

In [4]:
sql_query, squash_rules, graph = arkhn.parser.build_sql_query(project, resource_structure)

['ICSF.PATIENT.NOPAT', 'ICSF.PATIENT.NOMPAT', 'ICSF.PATIENT.PREPAT', 'ICSF.PATIENT.PREPATSUITE', 'ICSF.PATCOMP.TELPORT', 'ICSF.PATIENT.SEXE', 'ICSF.PATIENT.DTNAIS', 'ICSF.PATIENT.DECEDE', 'ICSF.PATIENT.DTDECES', 'ICSF.PATADR.ADR1', 'ICSF.PATADR.ADR2', 'ICSF.PATADR.ADR3', 'ICSF.PATADR.ADR4', 'ICSF.PATADR.VILLE', 'ICSF.PATADR.CP', 'ICSF.PAYS.LIBELLE', 'ICSF.PATCOMP.SITUAFAM', 'ICSF.PATCOMP.SITUAFAM', 'ICSF.PATMED.NOMED']
[('OneToOne', 'ICSF.PATIENT.NOPAT=ICSF.PATCOMP.NOPAT'), ('OneToOne', 'ICSF.PATIENT.NOPAT=ICSF.PATADR.NOPAT'), ('OneToOne', 'ICSF.PATIENT.NOPAT=ICSF.PATADR.NOPAT'), ('OneToOne', 'ICSF.PATIENT.NOPAT=ICSF.PATADR.NOPAT'), ('OneToOne', 'ICSF.PATIENT.NOPAT=ICSF.PATADR.NOPAT'), ('OneToOne', 'ICSF.PATIENT.NOPAT=ICSF.PATADR.NOPAT'), ('OneToOne', 'ICSF.PATIENT.NOPAT=ICSF.PATADR.NOPAT'), ('OneToOne', 'ICSF.PATIENT.NOPAT=ICSF.PATADR.NOPAT'), ('OneToOne', 'ICSF.PATADR.NOPAYS=ICSF.PAYS.NOPAYS'), ('OneToOne', 'ICSF.PATIENT.NOPAT=ICSF.PATCOMP.NOPAT'), ('OneToOne', 'ICSF.PATIENT.NOPAT=IC

The `graph` object is intended to help you understand the joins made in the query.

In [5]:
graph

Dependency Graph
ICSF.PATIENT: [ICSF.PATIENT O2O:(ICSF.PATCOMP,ICSF.PATADR,ICSF.PATMED) O2M:()]
ICSF.PATCOMP: [ICSF.PATCOMP O2O:() O2M:()]
ICSF.PATADR: [ICSF.PATADR O2O:(ICSF.PAYS) O2M:()]
ICSF.PAYS: [ICSF.PAYS O2O:() O2M:()]
ICSF.PATMED: [ICSF.PATMED O2O:() O2M:()]

In [6]:
graph.get('ICSF.PATIENT')

[ICSF.PATIENT O2O:(ICSF.PATCOMP,ICSF.PATADR,ICSF.PATMED) O2M:()]
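To get a feel for what such a graph holds, here is a minimal sketch that derives a table-to-table dependency map from join rules shaped like the ones printed above. It is not the `arkhn` implementation: `table_of` and `build_join_graph` are hypothetical helpers, and the `'OneToMany'` join type name is an assumption based on the `O2M` label in the output.

```python
def table_of(column):
    """Return 'OWNER.TABLE' from a fully qualified 'OWNER.TABLE.COLUMN' name."""
    return column.rsplit('.', 1)[0]


def build_join_graph(joins):
    """Map each table to the tables it joins to, grouped by join type.

    Hypothetical helper: `joins` is a list of (join_type, 'A.B.C=D.E.F') tuples.
    """
    graph = {}
    for join_type, condition in joins:
        left, right = (table_of(side.strip()) for side in condition.split('='))
        # Register both tables so that leaf tables also appear in the graph
        for table in (left, right):
            graph.setdefault(table, {'OneToOne': [], 'OneToMany': []})
        # Repeated rules for the same pair of tables are recorded only once
        if right not in graph[left][join_type]:
            graph[left][join_type].append(right)
    return graph


joins = [
    ('OneToOne', 'ICSF.PATIENT.NOPAT=ICSF.PATCOMP.NOPAT'),
    ('OneToOne', 'ICSF.PATIENT.NOPAT=ICSF.PATADR.NOPAT'),
    ('OneToOne', 'ICSF.PATIENT.NOPAT=ICSF.PATADR.NOPAT'),
    ('OneToOne', 'ICSF.PATADR.NOPAYS=ICSF.PAYS.NOPAYS'),
]
join_graph = build_join_graph(joins)
for table, targets in join_graph.items():
    print(table, 'O2O:', targets['OneToOne'], 'O2M:', targets['OneToMany'])
```

Duplicated join rules collapse into a single edge, which is why the graph is easier to read than the raw list of joins.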

In [7]:
print(squash_rules)
print(sql_query)

[(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18), []]
SELECT ICSF.PATIENT.NOPAT, ICSF.PATIENT.NOMPAT, ICSF.PATIENT.PREPAT, ICSF.PATIENT.PREPATSUITE, ICSF.PATCOMP.TELPORT, ICSF.PATIENT.SEXE, ICSF.PATIENT.DTNAIS, ICSF.PATIENT.DECEDE, ICSF.PATIENT.DTDECES, ICSF.PATADR.ADR1, ICSF.PATADR.ADR2, ICSF.PATADR.ADR3, ICSF.PATADR.ADR4, ICSF.PATADR.VILLE, ICSF.PATADR.CP, ICSF.PAYS.LIBELLE, ICSF.PATCOMP.SITUAFAM, ICSF.PATCOMP.SITUAFAM, ICSF.PATMED.NOMED FROM ICSF.PATIENT LEFT JOIN ICSF.PATCOMP ON ICSF.PATIENT.NOPAT = ICSF.PATCOMP.NOPAT LEFT JOIN ICSF.PATADR ON ICSF.PATIENT.NOPAT = ICSF.PATADR.NOPAT LEFT JOIN ICSF.PAYS ON ICSF.PATADR.NOPAYS = ICSF.PAYS.NOPAYS LEFT JOIN ICSF.PATMED ON ICSF.PATIENT.NOPAT = ICSF.PATMED.NOPAT
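The query above can be read as a list of columns plus one `LEFT JOIN` per joined table. The sketch below shows one way such a statement could be assembled. It is an illustration only, not what `arkhn.parser.build_sql_query` does internally: `build_select` is a hypothetical helper, and it assumes the base table is the table of the first column.

```python
def build_select(columns, joins):
    """Assemble a SELECT statement with one LEFT JOIN per distinct joined table.

    Hypothetical helper: `columns` are 'OWNER.TABLE.COLUMN' names and `joins`
    is a list of (join_type, 'A.B.C=D.E.F') tuples.
    """
    # Assumption: the table of the first column is the base table
    base_table = columns[0].rsplit('.', 1)[0]
    parts = ['SELECT {} FROM {}'.format(', '.join(columns), base_table)]
    seen = {base_table}
    for _join_type, condition in joins:
        left, right = (side.strip() for side in condition.split('='))
        target = right.rsplit('.', 1)[0]
        # Join each table only once, even if several rules mention it
        if target in seen:
            continue
        seen.add(target)
        parts.append('LEFT JOIN {} ON {} = {}'.format(target, left, right))
    return ' '.join(parts)


columns = ['ICSF.PATIENT.NOPAT', 'ICSF.PATADR.VILLE', 'ICSF.PAYS.LIBELLE']
joins = [
    ('OneToOne', 'ICSF.PATIENT.NOPAT=ICSF.PATADR.NOPAT'),
    ('OneToOne', 'ICSF.PATIENT.NOPAT=ICSF.PATADR.NOPAT'),
    ('OneToOne', 'ICSF.PATADR.NOPAYS=ICSF.PAYS.NOPAYS'),
]
query = build_select(columns, joins)
print(query)
```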


Now it's time to launch the SQL query and run the actual pipe. If your database is not structured like the one the mapping was written for, the query will fail.

### Run the query all at once

In [None]:
print('Launching query...')
rows = arkhn.sql.run(sql_query)

# Fix: replace None values with ''
for i, row in enumerate(rows):
    rows[i] = [e if e is not None else '' for e in row]

print(len(rows), 'results')

# Apply the join rules to merge rows that belong to the same resource
rows = arkhn.sql.apply_joins(rows, squash_rules)


# Build a FHIR object for each resource instance
json_rows = []
for i, row in enumerate(rows):
    if i % 1000 == 0:
        progression = round(i / len(rows) * 100, 2)
        print('PROGRESS... {} %'.format(progression))
    row = list(row)
    # The first node has a different structure so we iterate outside the
    # dfs_create_fhir function
    tree = dict()
    for attr in resource_structure['attributes']:
        arkhn.parser.dfs_create_fhir(tree, attr, row)
    tree, n_leafs = arkhn.parser.clean_fhir(tree)
    tree['id'] = int(random.random() * 10e10)
    json_rows.append(tree)
    # print(json.dumps(tree, indent=2, ensure_ascii=False))

# Write the FHIR objects to file
arkhn.parser.write_to_file(json_rows, 'fhir_data/samples.json')
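The idea behind merging rows from the same resource can be illustrated with a small sketch: when a join returns several rows for one patient, the rows are grouped on the columns that identify the patient and the remaining columns are collected into lists. This does not reproduce `arkhn.sql.apply_joins` or the `squash_rules` format; `squash_rows` and `key_cols` are hypothetical names used for illustration.

```python
def squash_rows(rows, key_cols):
    """Merge rows sharing the same values in `key_cols`.

    Hypothetical helper: key columns keep their scalar value, every other
    column becomes the list of distinct values seen for that key.
    """
    merged = {}
    for row in rows:
        key = tuple(row[i] for i in key_cols)
        if key not in merged:
            # Start a new squashed row: scalars for keys, empty lists elsewhere
            merged[key] = [row[i] if i in key_cols else [] for i in range(len(row))]
        for i, value in enumerate(row):
            if i not in key_cols and value not in merged[key][i]:
                merged[key][i].append(value)
    # dict preserves insertion order, so the first-seen order of keys is kept
    return list(merged.values())


rows = [
    ['1', 'Doe', '0601'],
    ['1', 'Doe', '0602'],
    ['2', 'Roe', ''],
]
squashed = squash_rows(rows, key_cols=(0, 1))
print(squashed)
```

In the demo above, every join is one-to-one, so there is nothing to merge and each SQL row already corresponds to one resource instance.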

### _or_ Run the query in batches

In [None]:
batch_size = 2

offset = arkhn.log.get('pipe.processing.offset', default=0)

for batch_idx, offset, rows in arkhn.sql.batch_run(sql_query, batch_size, offset=offset):
    print('Running batch {} offset {}...'.format(batch_idx, offset))
    # Replace None values with ''
    for i, row in enumerate(rows):
        rows[i] = [e if e is not None else '' for e in row]
        
    # Apply OneToMany joins
    rows = arkhn.sql.apply_joins(rows, squash_rules)
    
    # Hydrate FHIR objects
    json_rows = []
    for row in rows:
        row = list(row)
        # The first node has a different structure so we iterate outside the
        # dfs_create_fhir function
        tree = dict()
        for attr in resource_structure['attributes']:
            arkhn.parser.dfs_create_fhir(tree, attr, row)
        tree, n_leafs = arkhn.parser.clean_fhir(tree)
        tree['id'] = int(random.random() * 10e10)
        json_rows.append(tree)
        
    # Write to file
    arkhn.parser.write_to_file(json_rows, 'fhir_data/tmp/samples.{}.json'.format(offset))
    
    # Log offset to restart in case of a crash
    arkhn.log.set('pipe.processing.offset', offset)
    
# Remove the logged offset now that all batches have been processed
arkhn.log.rm('pipe.processing.offset')

print('Merging batches...', end='')
with open('fhir_data/samples.json', 'wb') as merged_file:
    for batch_filename in glob.glob('fhir_data/tmp/samples.*.json'):
        with open(batch_filename, 'rb') as batch_file:
            shutil.copyfileobj(batch_file, merged_file)
print('done')
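The restart logic of the batch loop can be tried out without a database. The sketch below is a stand-alone illustration of the checkpoint pattern, not the `arkhn.log` or `arkhn.sql.batch_run` implementation: `load_offset`, `save_offset`, `iter_batches` and `process` are hypothetical helpers, the checkpoint is a small JSON file, and a crash is simulated with the `fail_at` parameter.

```python
import json
import os
import tempfile


def load_offset(path, default=0):
    """Read the saved offset, or return `default` if no checkpoint exists."""
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)['offset']
    return default


def save_offset(path, offset):
    """Persist the offset of the next batch to process."""
    with open(path, 'w') as f:
        json.dump({'offset': offset}, f)


def iter_batches(items, batch_size, offset=0):
    """Yield (start, batch) pairs, starting at `offset`."""
    for start in range(offset, len(items), batch_size):
        yield start, items[start:start + batch_size]


def process(items, batch_size, checkpoint, fail_at=None):
    """Process items in batches, resuming from the checkpoint if there is one."""
    done = []
    for start, batch in iter_batches(items, batch_size, load_offset(checkpoint)):
        if start == fail_at:
            raise RuntimeError('simulated crash')
        done.extend(batch)
        # Save the position only after the batch has been fully handled
        save_offset(checkpoint, start + len(batch))
    # All batches succeeded: the checkpoint is no longer needed
    if os.path.exists(checkpoint):
        os.remove(checkpoint)
    return done


checkpoint = os.path.join(tempfile.mkdtemp(), 'offset.json')
items = list(range(5))
try:
    process(items, 2, checkpoint, fail_at=2)  # crashes on the second batch
except RuntimeError:
    pass
resumed = process(items, 2, checkpoint)  # picks up at the second batch
print(resumed)
```

In this sketch the checkpoint stores the offset of the next unprocessed batch, so a restart does not redo work that already completed.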