# Data PIPELINE for MetaVision VUmc Database

In [None]:
################################################################################################################
################################################################################################################
### import libraries
import os
import platform
import copy
import sys
import pyodbc
import pymssql
import pandas as pd
import numpy as np
import functools 
from functools import reduce
from datetime import datetime

################################################################################################################
################################################################################################################
# Make the project's source folder importable by putting it at the front of sys.path.
# NOTE: the path is relative, so it depends on the working directory the notebook starts in.
#sys.path.insert(0, '..\\src\\') # Windows
sys.path.insert(0, '../src')  # OSX / Linux

### from util.py (file which once contained all classes and functions):
from util import * # star import: helpers used later (e.g. print_python_environment) presumably come from here
# IPython autoreload magics: automatically reload Python files (e.g. util.py) when they are changed.
%load_ext autoreload
%reload_ext autoreload
%autoreload 2

### Configuration file to determine root directory
import conf

# Build the project directory from the configured root and make it the working directory
main_path = os.path.join(conf.ROOT_DIR, 'SEPSIS')
os.chdir(main_path)

# Define the subfolder paths. Each one is appended to main_path by plain string
# concatenation, so it starts and ends with a (Windows-style) backslash.
# Every backslash is written as '\\': a lone '\I' is an invalid escape sequence
# that recent Python versions warn about and may reject in the future.
data_path = '\\ICV_data\\'
query_path = '\\ICV_sql\\'
source_path = '\\ICV_src\\'
admission_path = '\\ICV_data\\Admission\\'
chunk_data_path = '\\ICV_data\\Chunks\\'

############################################################################
"""
SQL CONNECTION to MetaVision Databse using PYODBC
"""
# Sanity-check query: lists the base tables of the connected database.
sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE='BASE TABLE'"

# Open the MetaVision connection; all keyword options are handed to pyodbc as given.
PYODBC_con = pyodbc.connect(
    'Trusted_Connection=yes',
    driver='{ODBC Driver 13 for SQL Server}',
    server='server',
    database='db',
    Timeout=20,
)

# Run the sanity-check query once; the result holds the table listing.
PYODBC_check = pd.read_sql_query(sql, PYODBC_con)

# PYODBC Query from file function
def _read_query_file(query, encoding=None):
    """Return the SQL text stored in a query file, with blank lines dropped.

    Parameters
    ----------
    query : str
        File name (optionally with a sub-path) appended to
        ``main_path + query_path``.
    encoding : str or None
        Text encoding passed to ``open``; ``None`` uses the platform default.

    Returns
    -------
    str
        The non-blank lines of the file joined by a single space. Each line
        keeps its own line ending.
    """
    # 'r' opens the file in read-only text mode
    with open(main_path + query_path + query, 'r', encoding=encoding) as sql_file:
        return " ".join(line for line in sql_file if not line.isspace())


def qfile(query, encoding=None):
    """Run the SQL stored in ``query`` on ``PYODBC_con`` and return a DataFrame.

    Parameters
    ----------
    query : str
        Query file name, resolved under ``main_path + query_path``.
    encoding : str or None
        Encoding of the query file (e.g. ``'utf-16'``).

    Returns
    -------
    pandas.DataFrame
        Result of ``pd.read_sql_query`` for the file's SQL text.
    """
    return pd.read_sql_query(_read_query_file(query, encoding), PYODBC_con)


def qptlist(query, pt_list, encoding=None):
    """Run the SQL stored in ``query`` after filling in the patient list.

    Every occurrence of the literal placeholder ``PT_LIST`` in the SQL text is
    replaced by ``pt_list`` before the query is executed on ``PYODBC_con``.

    Parameters
    ----------
    query : str
        Query file name, resolved under ``main_path + query_path``.
    pt_list : str
        Ready-to-insert SQL fragment, e.g. ``"'1', '2', '3'"``.
    encoding : str or None
        Encoding of the query file (e.g. ``'utf-16'``).

    Returns
    -------
    pandas.DataFrame
        Result of ``pd.read_sql_query`` for the substituted SQL text.
    """
    # NOTE: pt_list is spliced into the SQL text verbatim (no escaping and no
    # bound parameters), so it must only ever be built from trusted identifiers.
    pt_list_holder = 'PT_LIST'
    sql = _read_query_file(query, encoding).replace(pt_list_holder, pt_list)

    # execute query
    return pd.read_sql_query(sql, PYODBC_con)

############################################################################
# Let pandas display more than the default number of columns (up to 150)
pd.options.display.max_columns = 150

# Progress message for the SQL setup above
print("SQL connections established!\n")

### Check everything: report the Python environment
### (helper is not defined in this notebook; presumably provided by the util star import)
print_python_environment()

# Import pt_list

### Create Chunks of pt_list for VITALS, LABS, FLUIDS, VASOPRESSOR data

In [None]:
# Load the admissions table that defines which patients the queries cover
pt_list_df = pd.read_csv(main_path + data_path + "admissions_df.csv", sep=',')
print(f"Number of total Metavision admissions: {len(pt_list_df.index)}")

# Convert the PatientID column to one quoted, comma-separated string
# ('id1', 'id2', ...) that is substituted into the SQL query files
pt_list = "'" + "', '".join(map(str, pt_list_df['PatientID'].tolist())) + "'"

# Check for DUPLICATES! keep=False marks every row of a repeated PatientID,
# and the boolean mask is summed directly (no `== True` comparison needed)
print(f"Number of duplicated Metavision admissions: {pt_list_df.duplicated('PatientID', keep=False).sum()}")

# Split the admissions into consecutive chunks of n rows (positional slicing)
n = 500  # chunk row size
list_df = [pt_list_df.iloc[i:i + n] for i in range(0, len(pt_list_df), n)]

# Get Demographics, SIRS and SOFA

In [None]:
# Log when this cell started
print(f"Started at : {datetime.now()}")

### Run one query file per variable, each restricted to the patients in pt_list

# Age
AGE = qptlist('GET_Age_ptlist.sql', pt_list)
print(f"Count of AGE: {len(AGE)}")

# Gender
GENDER = qptlist('GET_Gender_ptlist.sql', pt_list)
print(f"Count of GENDER: {len(GENDER)}")

# Admission weight (this query file is read as UTF-16)
WEIGHT = qptlist('GET_AdmissionWeight_ptlist.sql', pt_list, 'utf-16')
print(f"Count of WEIGHT: {len(WEIGHT)}")

# Length
LENGTH = qptlist('GET_length_ptlist.sql', pt_list)
print(f"Count of LENGTH: {len(LENGTH)}")

# YES/NO ventilation in the first 24 hours
VENT = qptlist('GET_First24H_ventilator_ptlist.sql', pt_list)
print(f"Count of VENT: {len(VENT)}")

# ALIVE [0] OR DEAD [1] at discharge
DEATH = qptlist('GET_Death_ptlist.sql', pt_list)
print(f"Count of DEATH: {len(DEATH)}")

# SIRS !! SLOW QUERY (this query file is read as UTF-16)
SIRS = qptlist('GET_SIRS_Admission_ptlist.sql', pt_list, 'utf-16')
print(f"Count of SIRS: {len(SIRS)}")

# Log when this cell finished
print(f"Finished at : {datetime.now()}")

### Get SOFA score values

In [None]:
# Log when this cell started
print(f"Started at : {datetime.now()}")

# Sub-directory (below the query folder) that holds the SOFA sub-queries
SOFA_Filter_path = '\\util\\SOFA_filter\\'

# SOFA - lab values subquery
SOFA_labvalues = qptlist(SOFA_Filter_path + 'GET_SOFA_labvalues_ptlist.sql', pt_list)
print(f"Count of Lab values: {len(SOFA_labvalues)}")

# SOFA - urine production subquery
SOFA_UP = qptlist(SOFA_Filter_path + 'GET_SOFA_UP_ptlist.sql', pt_list)
print(f"Count of Urine Production: {len(SOFA_UP)}")

# SOFA - FiO2 subquery
SOFA_FIO2 = qptlist(SOFA_Filter_path + 'GET_SOFA_FIO2_ptlist.sql', pt_list)
print(f"Count of FIO2: {len(SOFA_FIO2)}")

# SOFA - EMV subquery
SOFA_EMV = qptlist(SOFA_Filter_path + 'GET_SOFA_EMV_ptlist.sql', pt_list)
print(f"Count of EMV: {len(SOFA_EMV)}")

# SOFA - vasopressor dose subquery
SOFA_vasodose = qptlist(SOFA_Filter_path + 'GET_SOFA_vasodose_ptlist.sql', pt_list)
print(f"Count of Vasoscore: {len(SOFA_vasodose)}")

# Frames to combine, in merge order
SOFA_data_frames = [SOFA_labvalues, SOFA_UP, SOFA_FIO2, SOFA_vasodose, SOFA_EMV]

# pd.merge() joins only two frames at a time, so fold it over the list:
# each step outer-joins the running result with the next frame on PatientID
SOFA = reduce(
    functools.partial(pd.merge, on=['PatientID'], how='outer'),
    SOFA_data_frames,
)

# Log when this cell finished
print(f"Finished at : {datetime.now()}")

### MERGE DEMOGRAPHICS + SIRS + SOFA

In [None]:
# Frames to combine into one table, in merge order
data_frames = [AGE, GENDER, LENGTH, WEIGHT, VENT, SIRS, SOFA, DEATH]

# pd.merge() joins only two frames at a time, so fold it over the list:
# each step outer-joins the running result with the next frame on PatientID
demographics = reduce(
    functools.partial(pd.merge, on=['PatientID'], how='outer'),
    data_frames,
)

# Write the merged table to demographics.csv in the data folder (no index column)
demographics.to_csv(
    os.path.join(main_path + data_path, 'demographics.csv'),
    sep=',',
    index=False,
)

print(f"Finished at : {datetime.now()}")

# visual inspection of the first rows
demographics.head(10)

# Get Vitals in chunks

In [None]:
# ### TEST DATASET -  FULL QUERY: Get Queries from file
# Vitals = qptlist('Get_Vitals_ptlist.sql',pt_list,'utf-16')
# ## Save to csv
# Vitals.to_csv(os.path.join(main_path + data_path, 'Vitals.csv'),sep=',',index=False)

# Note start time
print("Started at : " + str(datetime.now()))

# Get vitals in chunks (chunk numbering starts at 1).
# The loop variable must not be called `list`: in a notebook that would rebind
# the builtin for every cell executed afterwards.
for i, chunk_df in enumerate(list_df, start=1):
    # convert the chunk's PatientID column to a quoted, comma-separated string for the query
    pt_list_subset = "'" + "', '".join(map(str, chunk_df['PatientID'].tolist())) + "'"

    # execute query for subset of patients
    print('Chuck: ' + str(i) + ' out of ' + str(len(list_df)) + ". Started at : " + str(datetime.now()))
    Vitals = qptlist('Get_Vitals_ptlist.sql', pt_list_subset)  # ,'utf-16')

    # Save to csv (one file per chunk)
    Vitals.to_csv(os.path.join(main_path + chunk_data_path, 'Vitals/Vitals_Chunk_' + str(i) + '.csv'), sep=',', index=False)

# note end time
print("Finished at : " + str(datetime.now()))

# visual inspection (shows the last chunk only)
Vitals.head(10)

# Get Lab Signals in chunks

In [None]:
### TEST DATASET -  FULL QUERY: Get Queries from file
# labs = qptlist('Get_labs_ptlist.sql',pt_list,'utf-16')
## Save to csv
# labs.to_csv(os.path.join(main_path + data_path, 'Labs.csv'),sep=',',index=False)

# Note start time
print("Started at : " + str(datetime.now()))

# Get labs in chunks (chunk numbering starts at 1).
# The loop variable must not be called `list`: in a notebook that would rebind
# the builtin for every cell executed afterwards.
for i, chunk_df in enumerate(list_df, start=1):
    # convert the chunk's PatientID column to a quoted, comma-separated string for the query
    pt_list_subset = "'" + "', '".join(map(str, chunk_df['PatientID'].tolist())) + "'"

    # execute query for subset of patients (this query file is read as UTF-16)
    print('Chuck: ' + str(i) + ' out of ' + str(len(list_df)) + ". Started at : " + str(datetime.now()))
    labs = qptlist('Get_labs_ptlist.sql', pt_list_subset, 'utf-16')

    # Save to csv (one file per chunk)
    labs.to_csv(os.path.join(main_path + chunk_data_path, 'Labs/Labs_Chunk_' + str(i) + '.csv'), sep=',', index=False)

# note end time
print("Finished at : " + str(datetime.now()))

# visual inspection (shows the last chunk only)
labs.head(10)

# Get Fluids RangeSignals in chunks

In [None]:
# ### TEST DATASET -  FULL QUERY: Get Queries from file
# fluids = qptlist('Get_fluids_ptlist.sql',pt_list)
# ## Save to csv
# fluids.to_csv(os.path.join(main_path + data_path, 'Fluids.csv'),sep=',',index=False)

# Note start time
print("Started at : " + str(datetime.now()))

# Get fluids in chunks (chunk numbering starts at 1).
# The loop variable must not be called `list`: in a notebook that would rebind
# the builtin for every cell executed afterwards.
for i, chunk_df in enumerate(list_df, start=1):
    # convert the chunk's PatientID column to a quoted, comma-separated string for the query
    pt_list_subset = "'" + "', '".join(map(str, chunk_df['PatientID'].tolist())) + "'"

    # execute query for subset of patients
    print('Chuck: ' + str(i) + ' out of ' + str(len(list_df)) + ". Started at : " + str(datetime.now()))
    fluids = qptlist('Get_fluids_ptlist.sql', pt_list_subset)

    # Save to csv (one file per chunk)
    fluids.to_csv(os.path.join(main_path + chunk_data_path, 'Fluids/Fluids_Chunk_' + str(i) + '.csv'), sep=',', index=False)

# note end time
print("Finished at : " + str(datetime.now()))

# visual inspection (shows the last chunk only)
fluids.head(10)

# Get Vasopressor RangeSignals in chunks

In [None]:
# Note start time
print("Started at : " + str(datetime.now()))

# Get vasopressor data in chunks (chunk numbering starts at 1).
# The loop variable must not be called `list`: in a notebook that would rebind
# the builtin for every cell executed afterwards.
for i, chunk_df in enumerate(list_df, start=1):
    # convert the chunk's PatientID column to a quoted, comma-separated string for the query
    pt_list_subset = "'" + "', '".join(map(str, chunk_df['PatientID'].tolist())) + "'"

    # execute query for subset of patients
    print('Chuck: ' + str(i) + ' out of ' + str(len(list_df)) + ". Started at : " + str(datetime.now()))
    vasopressor = qptlist('Get_vasopressor_ptlist.sql', pt_list_subset)

    # Save to csv (one file per chunk)
    vasopressor.to_csv(os.path.join(main_path + chunk_data_path, 'Vasopressor/Vasopressor_Chunk_' + str(i) + '.csv'), sep=',', index=False)

# note end time
print("Finished at : " + str(datetime.now()))

# visual inspection (shows the last chunk only)
vasopressor.head(10)

## Get weight for dose conversion from mcg/min to mcg/kg/min

In [None]:
# Admission weight per patient; needed to convert vasopressor doses
# from mcg/min to mcg/kg/min (this query file is read as UTF-16)
WEIGHT = qptlist('GET_AdmissionWeight_ptlist.sql', pt_list, 'utf-16')
print(f"Count of WEIGHT: {len(WEIGHT)}")
print(WEIGHT.head(), '\n')

# Write the table to Weight.csv in the data folder (no index column)
WEIGHT.to_csv(
    os.path.join(main_path + data_path, 'Weight.csv'),
    sep=',',
    index=False,
)

# Get UrineOutput Signals

In [None]:
# Urine production signals for the patients in pt_list
URINE = qptlist('GET_UrineProduction_ptlist.sql', pt_list)
print(f"Count of URINE: {len(URINE)}")
print(URINE.head(), '\n')

# Write the table to UrineOutput.csv in the data folder (no index column)
URINE.to_csv(
    os.path.join(main_path + data_path, 'UrineOutput.csv'),
    sep=',',
    index=False,
)

# End of Notebook
    Last step: safely close the connection to the database, e.g. with `PYODBC_con.close()` (only applies to PYODBC).