# Interactive pipeline runner notebook

In [8]:
import importlib
import json
import os
import sys
import time
import unittest

import pandas as pd

sys.path.insert(1, '..')  # must run before the local `tools` import below
import tools.read_data as rd
# Re-import every `tools` module that is already loaded so that edits on disk are picked up.
for module_name, module in list(sys.modules.items()):
    if not module_name.startswith('tools'):
        continue
    importlib.reload(module)

## Read in the data

In [9]:
# Load every patient bundle through the generator-based reader and time it
# (the author reports this as usually slightly faster for larger datasets).
start = time.time()
patient_json_list = list(rd.read_json_files('data'))
end = time.time()
print(end - start)

2.251110315322876


In [10]:
# Load the same bundles the standard way: list the directory first, then read it file by file
# (the author reports this as usually slightly slower for larger datasets).
start = time.time()
patient_file_list = rd.get_file_list('data')
patient_json_list_alt = [rd.read_patient_file('data', file_name) for file_name in patient_file_list]
end = time.time()
print(end - start)

2.689486026763916


In [None]:
# functions for reading the data from a database or an API have also been written - they are just not used in this example
# patient_json_list = rd.get_json_objects_from_API('https://www.example-patient-api.com/get-patient-FHIR-data')

In [11]:
# Both loading strategies must produce the same list; list equality is order-sensitive,
# so a skipped, duplicated or reordered file shows up here.
loaders_agree = patient_json_list == patient_json_list_alt
assert loaders_agree, "Two alternative methods of calculating the same FHIR data list have returned different results"

## Run tests for data quality / correctness

In [12]:
# Validate the incoming data: one run of the TestFHIRData suite per bundle.
from tools.data_tests import TestFHIRData

test_runner = unittest.TextTestRunner()
suite_loader = unittest.TestLoader()
for json_obj in patient_json_list:
    # the bundle under test is handed to the test case through this class attribute
    TestFHIRData.JSON_OBJ = json_obj
    test_suite = suite_loader.loadTestsFromTestCase(TestFHIRData)
    test_results = test_runner.run(test_suite)

....
----------------------------------------------------------------------
Ran 4 tests in 0.372s

OK
....
----------------------------------------------------------------------
Ran 4 tests in 0.206s

OK
....
----------------------------------------------------------------------
Ran 4 tests in 0.108s

OK
....
----------------------------------------------------------------------
Ran 4 tests in 0.338s

OK
....
----------------------------------------------------------------------
Ran 4 tests in 5.321s

OK
....
----------------------------------------------------------------------
Ran 4 tests in 0.314s

OK
....
----------------------------------------------------------------------
Ran 4 tests in 0.205s

OK
....
----------------------------------------------------------------------
Ran 4 tests in 0.144s

OK
....
----------------------------------------------------------------------
Ran 4 tests in 0.276s

OK
....
----------------------------------------------------------------------
Ran 4 

In [None]:
# Example test failure: run the same suite against a known-bad bundle.
# The encoding is pinned to UTF-8 so the file decodes identically on every platform
# instead of depending on the locale's default encoding.
with open("resources/bad_example.json", encoding="utf-8") as f:
    bad_json_obj = json.load(f)

TestFHIRData.JSON_OBJ = bad_json_obj
test_suite = unittest.TestLoader().loadTestsFromTestCase(TestFHIRData)
test_results = test_runner.run(test_suite)

FFFF
FAIL: test_all_fields_in_patient (tools.data_tests.TestFHIRData)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/Users/joshuastapleton/Desktop/EMIS_interview/exa-data-eng-assessment/tools/data_tests.py", line 43, in test_all_fields_in_patient
    self.assertIn(field, expected_fields, msg=f"{field} field not found in expected fields list")
AssertionError: 'subject' not found in ['resourceType', 'fhir_comments', 'id', 'implicitRules', 'implicitRules__ext', 'language', 'language__ext', 'meta', 'contained', 'extension', 'modifierExtension', 'text', 'active', 'active__ext', 'address', 'birthDate', 'birthDate__ext', 'communication', 'contact', 'deceasedBoolean', 'deceasedBoolean__ext', 'deceasedDateTime', 'deceasedDateTime__ext', 'gender', 'gender__ext', 'generalPractitioner', 'identifier', 'link', 'managingOrganization', 'maritalStatus', 'multipleBirthBoolean', 'multipleBirthBoolean__ext', 'multipleBirthInteger', 'multi

## Basic / local implementation of pipeline using filesystem

In [None]:
from fhir.resources.patient import Patient
from fhir.resources.bundle import Bundle

# The first entry of every bundle is parsed as the Patient resource.
FHIR_patient_object_list = []
for patient_json in patient_json_list:
    bundle = Bundle.parse_obj(patient_json)
    FHIR_patient_object_list.append(Patient.parse_obj(bundle.entry[0].resource))

# resource_type is constant by definition, so the column can be dropped
patient_df = rd.patients_to_dataframe(FHIR_patient_object_list)
patient_df = patient_df.drop(columns=['resource_type'])

In [None]:
# Write the raw tabular data to a csv file. It still needs to be normalized and cleaned before it can be used for analysis.
os.makedirs('data_output', exist_ok=True)  # to_csv does not create a missing output directory
patient_df.to_csv('data_output/patient_data_tabular_raw.csv', index=False)

In [None]:
# 1NF normalization - each table cell should hold a single value.
# The columns in need of normalization are the list-valued ones: extension, address, name, telecom, etc.
# A naive solution is to explode every list column. The result tends to become monolithic, because
# each explode multiplies the number of rows by the length of the exploded lists.
os.makedirs('data_output/1NF_data', exist_ok=True)  # to_csv does not create a missing output directory

print("exploding column: extension")
patient_exploded_df = patient_df.explode('extension') # start by exploding extension - the first column of type list
has_rows = len(patient_df) > 0  # an empty frame has no first row to inspect
for column in patient_df.columns.drop('extension'):
    # Only the first row is inspected, so a column whose first value is missing is treated as single-valued.
    # .iloc is positional, so the check does not depend on the index containing the label 0.
    if has_rows and isinstance(patient_df[column].iloc[0], list):
        print("exploding column: " + column)
        patient_exploded_df = patient_exploded_df.explode(column)

patient_exploded_df.to_csv('data_output/1NF_data/patient_data_tabular.csv', index=False)

exploding column: address
exploding column: communication
exploding column: identifier
exploding column: name
exploding column: telecom


In [None]:
# 2NF normalization - create an additional table for every column whose cells hold multiple/list entries.
# This is a more complex solution, but it is more scalable, easier to maintain, and there is less data redundancy.
os.makedirs('data_output/2NF_data', exist_ok=True)  # to_csv does not create a missing output directory
patient_df_2NF = patient_df.copy()

has_rows = len(patient_df) > 0  # an empty frame has no first row to inspect
for column in patient_df.columns:
    # Only the first row is inspected, so a column whose first value is missing is treated as single-valued.
    # .iloc is positional, so the check does not depend on the index containing the label 0.
    if has_rows and isinstance(patient_df[column].iloc[0], list):
        print("exploding column: " + column)
        # One row per (patient id, list entry). Selecting the two columns up front avoids exploding the
        # whole frame and keeps 'id' first, which the database loader relies on (it reads the table
        # name from the second column).
        sub_table_df = patient_df[['id', column]].explode(column)
        sub_table_df.to_csv('data_output/2NF_data/patient_data_tabular_' + column + '.csv', index=False)
        patient_df_2NF = patient_df_2NF.drop(columns=[column])

# finally, write the original table with all multi-value columns removed to a csv file
patient_df_2NF.to_csv('data_output/2NF_data/patient_data_tabular.csv', index=False)

exploding column: extension
exploding column: address
exploding column: communication
exploding column: identifier
exploding column: name
exploding column: telecom


In [None]:
# We could further expand the table by identifying values which are FHIR objects and splitting them up by field; however, this can get tricky with string parsing.
# For now, we will write the whole objects encoded as strings.

# patient_1NF_df = pd.read_csv('data_output/1NF_data/patient_data_tabular.csv')
# for column in patient_1NF_df.columns:
#     # if the column starts with 'resource'
#     first_column_value = patient_1NF_df[column].values[0]
#     if type(first_column_value) == str and first_column_value.startswith('resource_type'):
#         print("Fields of column:", column)
#         for field in first_column_value.split(' '):
#             print("--",field)

## SQL implementation of pipeline with database

### First, create the tables

In [1]:
import sys
sys.path.insert(1, '..')
import pandas as pd
import tools.read_data as rd
import tools.create_database as cd
import tools.update_database as ud
from sqlalchemy import text
import importlib
# Re-import every `tools` module that is already loaded so that edits on disk are picked up.
for module_name, module in list(sys.modules.items()):
    if not module_name.startswith('tools'):
        continue
    importlib.reload(module)

# get/open the connection to the patient database used by the SQL cells
patient_database_conn = cd.connect_to_sqla_server()

In [2]:
# Drop the tables if they already exist so the notebook can be re-run from a clean slate.
# One loop keeps the table list in a single place instead of seven copy-pasted statements,
# and no longer leaves a CursorResult repr as the cell output.
for table_name in ("patient", "address", "communication", "extension", "identifier", "name", "telecom"):
    patient_database_conn.execute(text("DROP TABLE IF EXISTS " + table_name))

<sqlalchemy.engine.cursor.CursorResult at 0x1077e2a00>

In [3]:
# Create the tables in the patient database with 2NF standards - these correspond to the csv files
# in the data_output 2NF folder generated in the previous steps.
# The patient table carries the single-valued columns only; the multi-value columns (address,
# communication, extension, identifier, name, telecom) get tables of their own.
# Every column is declared with the generic `string` type.
TWONF_CREATE_PATIENT_TABLE_SQL = """
        CREATE TABLE patient (
            fhir_comments string,
            id string,
            implicitRules string,
            implicitRules__ext string,
            language string,
            language__ext string,
            meta string,
            contained string,
            modifierExtension string,
            text string,
            active string,
            active__ext string,
            birthDate string,
            birthDate__ext string,
            contact string,
            deceasedBoolean string,
            deceasedBoolean__ext string,
            deceasedDateTime string,
            deceasedDateTime__ext string,
            gender string,
            gender__ext string,
            generalPractitioner string,
            link string,
            managingOrganization string,
            maritalStatus string,
            multipleBirthBoolean string,
            multipleBirthBoolean__ext string,
            multipleBirthInteger string,
            multipleBirthInteger__ext string,
            photo string
        );
    """

# run the DDL statement on the shared patient database connection
ud.execute_sql(TWONF_CREATE_PATIENT_TABLE_SQL, patient_database_conn)

# create the address table
# id holds the patient id the address belongs to - a string, as in every other sub table - so it
# must not be declared as bool
TWONF_CREATE_ADDRESS_TABLE_SQL = """
    CREATE TABLE address (
        id string,
        address string
    );
"""
ud.execute_sql(TWONF_CREATE_ADDRESS_TABLE_SQL, patient_database_conn)

# The remaining sub tables all share the same two-column shape: the id of the patient the row
# belongs to, plus one value of the multi-value column the table is named after.

# create the communication table
TWONF_CREATE_COMMUNICATION_TABLE_SQL = """
    CREATE TABLE communication (
        id string,
        communication string
    );
"""
ud.execute_sql(TWONF_CREATE_COMMUNICATION_TABLE_SQL, patient_database_conn)

# create the extension table
TWONF_CREATE_EXTENSION_TABLE_SQL = """
    CREATE TABLE extension (
        id string,
        extension string
    );
"""
ud.execute_sql(TWONF_CREATE_EXTENSION_TABLE_SQL, patient_database_conn)

# create the identifier table
TWONF_CREATE_IDENTIFIER_TABLE_SQL = """
    CREATE TABLE identifier (
        id string,
        identifier string
    );
"""
ud.execute_sql(TWONF_CREATE_IDENTIFIER_TABLE_SQL, patient_database_conn)

# create the name table
TWONF_CREATE_NAME_TABLE_SQL = """
    CREATE TABLE name (
        id string,
        name string
    );
"""
ud.execute_sql(TWONF_CREATE_NAME_TABLE_SQL, patient_database_conn)

# create the telecom table
TWONF_CREATE_TELECOM_TABLE_SQL = """
    CREATE TABLE telecom (
        id string, 
        telecom string
    );
"""
ud.execute_sql(TWONF_CREATE_TELECOM_TABLE_SQL, patient_database_conn)

# IMPORTANT:
# I decided to use the generic, more pythonic variable types for the database table fields (ie: string as opposed to VARCHAR).
# This is because I am not 100% sure what types of data will be coming in, and if FHIR data types map exactly to SQL data types.
# Once we have a stronger idea of the data types based on incoming data consistency, we can update the table fields to be more specific and SQL compliant.

True

### Second, populate the tables

In [4]:
# Populate the main table.
# The rows are loaded from the 2NF csv written by the filesystem pipeline; writing the in-memory
# dataframes straight to the database would be a way to avoid these physical artifacts.
patient_2nf_csv = 'data_output/2NF_data/patient_data_tabular.csv'
patient_df = pd.read_csv(patient_2nf_csv)

def replace_quotes(df): # this can mess up formatting
    """
    Replaces all instances of " with ' in all cells in a pandas DataFrame.

    Every cell is converted with str() first, so the result contains strings only
    (a missing value becomes the text 'nan'). The input frame is not modified.

    Parameters:
        df: the pandas DataFrame to convert.

    Returns:
        A new DataFrame with the same index and columns holding the converted strings.
    """
    def _swap_quotes(cell):
        return str(cell).replace("\"", "'")

    # Column-wise Series.map replaces DataFrame.applymap, which is deprecated since pandas 2.1.
    return df.apply(lambda column: column.map(_swap_quotes))
# Kept so the stored values stay the same as before: every cell becomes a string with " replaced by '.
patient_df = replace_quotes(patient_df)

# Target columns of the INSERT. The csv columns are matched to these names by position.
PATIENT_COLUMNS = [
    "fhir_comments", "id", "implicitRules", "implicitRules__ext", "language", "language__ext",
    "meta", "contained", "modifierExtension", "text", "active", "active__ext",
    "birthDate", "birthDate__ext", "contact", "deceasedBoolean", "deceasedBoolean__ext",
    "deceasedDateTime", "deceasedDateTime__ext", "gender", "gender__ext", "generalPractitioner",
    "link", "managingOrganization", "maritalStatus", "multipleBirthBoolean",
    "multipleBirthBoolean__ext", "multipleBirthInteger", "multipleBirthInteger__ext", "photo",
]
if len(patient_df.columns) != len(PATIENT_COLUMNS):
    raise ValueError(
        "patient csv has {} columns, expected {}".format(len(patient_df.columns), len(PATIENT_COLUMNS))
    )

# Bound parameters leave the quoting to the database driver. Building the statement by hand wrapped
# every value in double quotes, which breaks (or is read as an identifier) depending on the value.
insert_patient_sql = text(
    "INSERT INTO patient (" + ", ".join(PATIENT_COLUMNS) + ") VALUES ("
    + ", ".join(":" + column for column in PATIENT_COLUMNS) + ")"
)
patient_rows = [
    dict(zip(PATIENT_COLUMNS, row))
    for row in patient_df.itertuples(index=False, name=None)
]
if patient_rows:  # nothing to insert for an empty table
    patient_database_conn.execute(insert_patient_sql, patient_rows)

# populate the sub tables
# Sub tables are the per-column files named patient_data_tabular_<column>.csv. Selecting them by name,
# instead of dropping the last entry of the listing, does not depend on the order the files are returned in.
sub_table_list = [
    file_name for file_name in rd.get_file_list('data_output/2NF_data')
    if file_name.startswith('patient_data_tabular_') and file_name.endswith('.csv')
]
for table in sub_table_list: # populate all sub tables
    table_data = pd.read_csv('data_output/2NF_data/'+table)
    table_name = str(table_data.columns[1]) # same as the second column name
    # a table name cannot be passed as a bound parameter, so make sure it is a plain identifier
    # before splicing it into the statement
    if not table_name.isidentifier():
        raise ValueError("unexpected sub table name: " + repr(table_name))
    insert_sql = text("INSERT INTO "+table_name+" (id, "+table_name+") VALUES (:patient_id, :value)")
    # the values are bound by the driver; the quote replacement is kept only so the stored text
    # stays the same as before
    sub_table_rows = [
        {"patient_id": patient_id, "value": value.replace("\"", "'")}
        for patient_id, value in zip(table_data['id'], table_data[table_name])
    ]
    if sub_table_rows:  # nothing to insert for an empty table
        patient_database_conn.execute(insert_sql, sub_table_rows)

### Example queries

In [7]:
# Example query: fetch the whole extension table and show it as a dataframe.
query_result = patient_database_conn.execute(text("SELECT * FROM extension"))
fetched_rows = query_result.all()
df = pd.DataFrame(fetched_rows, columns=query_result.keys())
df

Unnamed: 0,id,extension
0,8a3247d3-a54c-43f2-2c5d-a8f5e28ff588,resource_type='Extension' fhir_comments=None e...
1,8a3247d3-a54c-43f2-2c5d-a8f5e28ff588,resource_type='Extension' fhir_comments=None e...
2,8a3247d3-a54c-43f2-2c5d-a8f5e28ff588,resource_type='Extension' fhir_comments=None e...
3,8a3247d3-a54c-43f2-2c5d-a8f5e28ff588,resource_type='Extension' fhir_comments=None e...
4,8a3247d3-a54c-43f2-2c5d-a8f5e28ff588,resource_type='Extension' fhir_comments=None e...
...,...,...
548,b0bccf43-3bf5-217c-7315-9e44d106bb6b,resource_type='Extension' fhir_comments=None e...
549,b0bccf43-3bf5-217c-7315-9e44d106bb6b,resource_type='Extension' fhir_comments=None e...
550,b0bccf43-3bf5-217c-7315-9e44d106bb6b,resource_type='Extension' fhir_comments=None e...
551,b0bccf43-3bf5-217c-7315-9e44d106bb6b,resource_type='Extension' fhir_comments=None e...
