# Export raw data from the MIMIC-IV database

Only patients from the following care units are exported:
- Coronary Care unit (CCU)
- Cardiac Vascular Intensive Care unit (CVICU)

In [6]:
from projects.utils import *
from projects.common import *
from typing import Tuple
from tqdm import tqdm
# from tqdm.notebook import tqdm
from multiprocessing import Pool, RLock
from configobj import ConfigObj
import numpy as np
import getpass
import json
import math
import os
import psycopg2
import pandas as pd
import time

import matplotlib.pyplot as plt
%matplotlib inline


In [12]:
def connect_db():
    """Connect to the database using the config directory three levels up.

    The directory is built from the absolute path of the current working
    directory, so the result depends on where the notebook is launched from.

    Returns:
        Whatever `connect_to_database` returns; elsewhere in this notebook
        the result is unpacked as four schema handles plus a connection.
    """
    working_dir = os.path.abspath('')
    config_dir = working_dir + "/../../../db"
    return connect_to_database(config_dir)


# Connect once up front; the returned handles are discarded here.
# NOTE(review): presumably a connectivity check -- confirm it is still needed,
# since every table read opens its own connection through connect_db().
connect_db()

# def merge_lab_df(df1: pd.DataFrame, df2: pd.DataFrame):
#     target_cols = ['subject_id', 'hadm_id', 'charttime', 'specimen_id']
#     df1 = df1.sort_values(target_cols)
#     df2 = df2.sort_values(target_cols)
#     df2 = df2.loc[:, ~df2.columns.isin(target_cols)]
#     return pd.concat([df1, df2], axis=1)


def create_dummy_files_func(export_dir, _custom_icustays_list, pid):
    """Write one empty data table per ICU stay.

    Args:
        export_dir: Directory the data files are written to.
        _custom_icustays_list: ICU stay ids to create files for.
        pid: Unused here; kept because the worker is invoked through
            `parallel_processing`.

    Raises:
        AssertionError: If `data_<stay_id>.dsv` already exists in
            `export_dir`, so an earlier export is never overwritten silently.
    """
    for stay in _custom_icustays_list:
        target_name = 'data_'+str(stay)+'.dsv'
        target_path = os.path.join(export_dir, target_name)
        assert not os.path.exists(target_path)
        # A fresh DataTable carries no measurements yet.
        empty_table = pd.DataFrame(DataTable().data)
        save_data_dsv(export_dir, stay, empty_table)


def create_dummy_files(export_dir: str, _custom_icustays_list: list):
    """Create an empty dummy .dsv data file for every listed ICU stay.

    The work is handed to `parallel_processing` with `MP_NUM_PROCESSES`
    workers running `create_dummy_files_func`.
    """
    worker = create_dummy_files_func
    worker_args = (export_dir, _custom_icustays_list)
    parallel_processing(worker, MP_NUM_PROCESSES, *worker_args)
    print("Created dummy .dsv files.")


def get_database_table_as_dataframe_ext(_schema_type: str, _table: str,
                                        _chunk: int = 10000):
    """Open a DB connection and return a chunked reader for one table.

    Args:
        _schema_type: One of 'core', 'hosp', 'icu' or 'derived'. Any other
            value selects no schema (None is passed on to the reader).
        _table: Name of the table to read.
        _chunk: Rows per worker process; the reader is asked for chunks of
            `_chunk * MP_NUM_PROCESSES` rows.

    Returns:
        Tuple `(df_iter, num_entries)`: the iterator returned by
        `get_database_table_as_dataframe`, and the number of chunks it is
        expected to yield (reported entry count divided by the chunk size,
        rounded up).
    """
    (query_schema_core,
     query_schema_hosp,
     query_schema_icu,
     query_schema_derived,
     conn) = connect_db()

    # Fixed: the former chain of independent `if`s ended in an `else` bound
    # only to the 'derived' test, which reset every other schema to None.
    schema_by_type = {
        'core': query_schema_core,
        'hosp': query_schema_hosp,
        'icu': query_schema_icu,
        'derived': query_schema_derived,
    }
    _query_schema = schema_by_type.get(_schema_type)

    chunk_size = _chunk*MP_NUM_PROCESSES
    df_iter, num_entries = get_database_table_as_dataframe(
        conn, _query_schema, _table, _chunk_size=chunk_size)
    num_entries = math.ceil(num_entries / chunk_size)
    return df_iter, num_entries


def split_df(df: pd.DataFrame, num_processes: int = 8):
    """Cut `df` into `num_processes` consecutive row slices.

    Every slice but the last holds ceil(len(df) / num_processes) rows; the
    last slice takes whatever remains and may be empty.

    Returns:
        List of `num_processes` DataFrame slices, in row order.
    """
    rows_per_part = math.ceil(len(df)/num_processes)
    last_index = num_processes - 1

    parts = []
    for part_index in range(last_index):
        begin = rows_per_part*part_index
        end = rows_per_part*(part_index+1)
        parts.append(df.iloc[begin:end])
    # The final part is open-ended so no trailing row is lost.
    parts.append(df.iloc[rows_per_part*last_index:])
    return parts


def parallel_processing_ext(_func,
                            _df_iter,
                            _num_entries: int,
                            _custom_icustays_list: list):
    """Run `_func` in parallel over every dataframe chunk of `_df_iter`.

    Each chunk is restricted to the stays in `_custom_icustays_list` (only
    when it has a `stay_id` column), sorted by whichever of the known key
    columns it contains, split into `MP_NUM_PROCESSES` slices and handed to
    `parallel_processing`.

    Args:
        _func: Worker passed on to `parallel_processing`.
        _df_iter: Iterable of DataFrame chunks.
        _num_entries: Number of chunks, used as the progress-bar total.
        _custom_icustays_list: Stay ids to keep.

    TODO: df should be split up based on `stay_id`, i.e. all rows of one
    `stay_id` should be assigned to the same process. Otherwise reading
    errors may occur, because this id determines which dsv file is read.
    The current hack is to use a df chunk large enough that this situation
    does not occur.
    """
    sort_list = ['subject_id', 'hadm_id', 'stay_id',
                 'charttime', 'starttime', 'endtime', ]
    for chunk in tqdm(_df_iter, total=_num_entries):
        if 'stay_id' in chunk.columns.tolist():
            wanted = chunk.stay_id.isin(_custom_icustays_list)
            chunk = chunk[wanted]
        available = chunk.columns.tolist()
        sort_keys = [key for key in sort_list if key in available]
        chunk = chunk.sort_values(by=sort_keys)
        slices = split_df(chunk, MP_NUM_PROCESSES)
        parallel_processing(_func, MP_NUM_PROCESSES, slices)


Database: mimiciv
Username: mimiciv
>>>>> Connected to DB <<<<<


In [8]:
# # Save labevents_info table from db.
# (_, _, _, query_schema_derived, conn) = connect_db()
# df = get_database_table_as_dataframe(
#     conn, query_schema_derived, 'labevents_info')
# df = df.sort_values('itemid')
# df.to_csv("../../../"+LAB_INFO_PATH, na_rep='', sep='\t', index=False)


# Prepare the required mappings.
The custom dict and list are created by `05a_export_raw_info.ipynb`.

In [9]:
# labitems = pd.read_csv("../../../"+LAB_ITEM_PATH, sep='\t', header=0)
# labitems.fillna('None')

# Item metadata tables (tab-separated, first row is the header). Missing
# cells are replaced with the literal string 'None', not Python's None.
d_derived = pd.read_csv("../../../"+DERIVED_ITEM_PATH, sep='\t', header=0)
d_derived = d_derived.fillna('None')
print('d_derived', d_derived.columns.to_list())

d_items = pd.read_csv("../../../"+CHART_ITEM_PATH, sep='\t', header=0)
d_items = d_items.fillna('None')
print('d_items', d_items.columns.to_list())

d_labinfos = pd.read_csv("../../../"+LAB_INFO_PATH, sep='\t', header=0)
d_labinfos = d_labinfos.fillna('None')
print('d_labinfos', d_labinfos.columns.to_list())

d_labitems = pd.read_csv("../../../"+LAB_ITEM_PATH, sep='\t', header=0)
d_labitems = d_labitems.fillna('None')
print('d_labitems', d_labitems.columns.to_list())

# Stay ids selected for export; used to filter rows on `stay_id`.
with open("../../../" + TMP_CUSTOM_LIST, 'r') as f:
    custom_icustays_list = json.load(f)

# Nested lookup indexed as [subject_id][hadm_id] -> iterable of stay ids.
# NOTE(review): JSON object keys are always loaded as str. Confirm that the
# subject_id / hadm_id values used for lookups later in this notebook have a
# matching type, otherwise every `in custom_icustays_dict` test fails.
with open("../../../" + TMP_CUSTOM_DICT, 'r') as f:
    custom_icustays_dict = json.load(f)


def create_mappings(_id_mapping: dict):
    """Invert a uid -> name mapping and collect per-name item metadata.

    The uid range decides where the metadata comes from:
      * 100000-199999: `d_derived` (units, category); no normal range.
      * 200000-399999: `d_items` (unit name, low/high normal value,
        category).
      * 500000-999999: `d_labitems` (category only); unit and range are
        left as None because they are taken from the db table instead.
      * anything else: every entry is None.

    Args:
        _id_mapping: Mapping of uid -> name.

    Returns:
        Tuple `(id_mapping, unit_mapping, low_mapping, high_mapping,
        cat_mapping)`, all keyed by name; `id_mapping` maps name -> uid.

    Raises:
        IndexError: If a uid in one of the three ranges has no row in the
            corresponding metadata table.
    """
    id_mapping, unit_mapping = {}, {}
    low_mapping, high_mapping, cat_mapping = {}, {}, {}

    for uid, name in _id_mapping.items():
        id_mapping[name] = uid

        if 100000 <= uid < 200000:
            match = d_derived[d_derived['uid'] == uid]
            unit_mapping[name] = match['units'].values[0]
            low_mapping[name] = None  # TODO ADD THIS IN THE DERIVED TABLE?
            high_mapping[name] = None  # TODO ADD THIS IN THE DERIVED TABLE?
            cat_mapping[name] = match['category'].values[0]
            continue

        if 200000 <= uid < 400000:
            match = d_items[d_items['uid'] == uid]
            unit_mapping[name] = match['unitname'].values[0]
            low_mapping[name] = match['lownormalvalue'].values[0]
            high_mapping[name] = match['highnormalvalue'].values[0]
            cat_mapping[name] = match['category'].values[0]
            continue

        if 500000 <= uid < 1000000:
            match = d_labitems[d_labitems['uid'] == uid]
            cat_mapping[name] = match['category'].values[0]
        else:
            cat_mapping[name] = None

        # Lab items and unknown uids carry no unit or range here.
        unit_mapping[name] = None
        low_mapping[name] = None
        high_mapping[name] = None

    return id_mapping, unit_mapping, low_mapping, high_mapping, cat_mapping


d_derived ['uid', 'label', 'units', 'category', 'notes']
d_items ['uid', 'itemid', 'label', 'abbreviation', 'linksto', 'category', 'unitname', 'param_type', 'lownormalvalue', 'highnormalvalue']
d_labinfos ['itemid', 'valueuom', 'valueuom_count', 'ref_range_lower', 'ref_range_lower_count', 'ref_range_upper', 'ref_range_upper_count']
d_labitems ['uid', 'itemid', 'label', 'fluid', 'category', 'loinc_code']


# Checks the labevents data.
This checks whether any lab item has more than one unit or more than one upper/lower reference boundary.

In [10]:
# target_lab_itemid = {
#     50862: 'albumin',
#     50930: 'globulin',
#     50976: 'total_protein',
#     50868: 'aniongap',
#     50882: 'bicarbonate',
#     51006: 'bun',
#     50893: 'calcium',
#     50902: 'chloride',
#     50912: 'creatinine',
#     50931: 'glucose',
#     50983: 'sodium',
#     50971: 'potassium',

#     52028: 'specimen',
#     50801: 'aado2',
#     50802: 'baseexcess',
#     50803: 'bicarbonate',
#     50804: 'totalco2',
#     50805: 'carboxyhemoglobin',
#     50806: 'chloride',
#     50808: 'calcium',
#     50809: 'glucose',
#     50810: 'hematocrit',
#     50811: 'hemoglobin',
#     50813: 'lactate',
#     50814: 'methemoglobin',
#     50816: 'fio2',
#     50817: 'so2',
#     50818: 'pco2',
#     50820: 'ph',
#     50821: 'po2',
#     50822: 'potassium',
#     50824: 'sodium',
#     50825: 'temperature',
#     # 223835: 'fio2_chartevents',
#     # 100038: 'pao2fio2ratio',  # nounit
#     # 100039: 'aado2_calc',
#     # 100040: 'specimen_pred',  # nounit
#     # 100041: 'specimen_prob',

#     51146: 'basophils',
#     52069: 'basophils_abs',
#     51200: 'eosinophils',
#     51254: 'monocytes',
#     51256: 'neutrophils',
#     52075: 'neutrophils_abs',
#     51143: 'atypical_lymphocytes',
#     51144: 'bands',
#     52135: 'immature_granulocytes',
#     51251: 'metamyelocytes',
#     51257: 'nrbc',
#     # 51300: 'wbc',  # Has None as unit.
#     # 51301: 'wbc',  # Has None as unit.
#     # 51755: 'wbc',
#     # 100003: 'wbc',  # TODO: May need to split due to category.
#     # 100004: 'lymphocytes',
#     # 100005: 'eosinophils_abs',  # Has None as unit.
#     # 100006: 'lymphocytes_abs',  # Has None as unit.
#     # 100007: 'monocytes_abs',  # Has None as unit.

#     51002: 'troponin_i',
#     51003: 'troponin_t',
#     50911: 'ck_mb',

#     51196: 'd_dimer',
#     51214: 'fibrinogen',
#     51297: 'thrombin',
#     51237: 'inr',
#     51274: 'pt',
#     51275: 'ptt',

#     51221: 'hematocrit',
#     51222: 'hemoglobin',
#     51248: 'mch',
#     51249: 'mchc',
#     51250: 'mcv',
#     51265: 'platelet',
#     51279: 'rbc',
#     51277: 'rdw',
#     52159: 'rdwsd',
#     # 51301: 'wbc', # present in blood_differential

#     50861: 'alt',
#     50863: 'alp',
#     50878: 'ast',
#     50867: 'amylase',
#     50885: 'bilirubin_total',
#     50884: 'bilirubin_indirect',
#     50883: 'bilirubin_direct',
#     50910: 'ck_cpk',
#     50911: 'ck_mb',
#     50927: 'ggt',
#     50954: 'ld_ldh',

#     50889: 'crp',

# }

# target_lab_itemid = target_lab_itemid.keys()

# d_labinfos = pd.read_csv("../../../"+LAB_INFO_PATH, sep='\t', header=0)
# d_labinfos = d_labinfos.fillna('None')
# print('d_labinfos', d_labinfos.columns.to_list())


# print("Checking valueuom_count")
# for i in target_lab_itemid:
#     if (d_labinfos[d_labinfos['itemid'] == i]['valueuom_count'] > 1).any():
#         print(i)


# print("Checking ref_range_lower_count")
# for i in target_lab_itemid:
#     if (d_labinfos[d_labinfos['itemid'] == i]['ref_range_lower_count'] > 1).any():
#         print(i, d_labinfos[d_labinfos['itemid'] == i]['ref_range_lower_count'])


# print("Checking ref_range_upper_count")
# for i in target_lab_itemid:
#     if (d_labinfos[d_labinfos['itemid'] == i]['ref_range_upper_count'] > 1).any():
#         print(i, d_labinfos[d_labinfos['itemid'] == i]['ref_range_upper_count'])


# Export data info

Currently the units are taken from the original tables. A better solution would be to include them in the concepts.


In [13]:
# Every later cell loads a stay's data file before appending to it, so the
# empty files have to exist first. This fails with an AssertionError if any
# data file is already present in the export directory.
create_dummy_files(STRUCTURED_EXPORT_DIR, custom_icustays_list)


Created dummy .dsv files.


## Height

In [14]:
# ['subject_id', 'stay_id', 'charttime', 'height']
# Chunked reader over the derived `height` table, plus the chunk count used
# as the progress-bar total.
df_iter, num_entries = get_database_table_as_dataframe_ext('derived', 'height')


def func(dfs, pid):
    """Append every height row to the data file of its ICU stay.

    Args:
        dfs: Sequence whose first element is the DataFrame slice to process
            (columns `stay_id`, `charttime`, `height` are read).
        pid: Unused; supplied by `parallel_processing`.
    """
    # (A commented-out `print(' ', end='', flush=True)` used to sit here as a
    # workaround for tqdm.notebook.)
    frame = dfs[0]
    table = DataTable()
    for _, row in frame.iterrows():
        # Reload the stay's file each time: consecutive rows may belong to
        # different stays.
        table.data = load_data_dsv(STRUCTURED_EXPORT_DIR, row['stay_id'])
        table.append(uid=100001,
                     value=row['height'],
                     unit='cm',
                     category='General',
                     starttime=row['charttime'])
        updated = pd.DataFrame(table.data)
        save_data_dsv(STRUCTURED_EXPORT_DIR, row['stay_id'], updated)


# `func` is redefined by every cell of this notebook; run this right after
# the definition above so the height worker is the one dispatched.
parallel_processing_ext(func, df_iter, num_entries, custom_icustays_list)
print("Added height entries.")


Database: mimiciv
Username: mimiciv
>>>>> Connected to DB <<<<<
Getting height data


1it [00:00,  5.16it/s]


Number of entries for height : 35170
Column names : ['subject_id', 'stay_id', 'charttime', 'height']



100%|██████████| 1/1 [00:17<00:00, 17.97s/it]

Added height entries.





## Weight

In [15]:
# ['stay_id', 'starttime', 'endtime', 'weight', 'weight_type']
# Chunked reader over the derived `weight_durations` table, plus the chunk
# count used as the progress-bar total.
df_iter, num_entries = get_database_table_as_dataframe_ext(
    'derived', 'weight_durations')


def func(dfs, pid):
    """Append every weight interval to the data file of its ICU stay.

    Args:
        dfs: Sequence whose first element is the DataFrame slice to process
            (columns `stay_id`, `starttime`, `endtime`, `weight`,
            `weight_type` are read).
        pid: Unused; supplied by `parallel_processing`.
    """
    frame = dfs[0]
    table = DataTable()
    for _, row in frame.iterrows():
        table.data = load_data_dsv(STRUCTURED_EXPORT_DIR, row['stay_id'])
        # The weight type is kept as a suffix of the category.
        table.append(uid=100002,
                     value=row['weight'],
                     unit='kg',
                     category='General, ' + row['weight_type'],
                     starttime=row['starttime'],
                     endtime=row['endtime'])
        updated = pd.DataFrame(table.data)
        save_data_dsv(STRUCTURED_EXPORT_DIR, row['stay_id'], updated)


# `func` is redefined by every cell of this notebook; run this right after
# the definition above so the weight worker is the one dispatched.
parallel_processing_ext(func, df_iter, num_entries, custom_icustays_list)
print("Added weight entries.")


Database: mimiciv
Username: mimiciv
>>>>> Connected to DB <<<<<
Getting weight_durations data


4it [00:00,  4.84it/s]


Number of entries for weight_durations : 287155
Column names : ['stay_id', 'starttime', 'endtime', 'weight', 'weight_type']



100%|██████████| 4/4 [01:38<00:00, 24.57s/it]

Added weight entries.





## Chemistry

In [16]:
# uid -> column name in the derived `chemistry` table.
# NOTE(review): the uids look like 500000 + the labevents itemid (compare the
# commented-out itemid list earlier in this notebook) -- confirm.
id_mapping = {
    550862: 'albumin',
    550930: 'globulin',
    550976: 'total_protein',
    550868: 'aniongap',
    550882: 'bicarbonate',
    551006: 'bun',
    550893: 'calcium',
    550902: 'chloride',
    550912: 'creatinine',
    550931: 'glucose',
    550983: 'sodium',
    550971: 'potassium',
}
# create_mappings returns the inverted mapping first, so from here on
# `id_mapping` maps column name -> uid.
(id_mapping,
 unit_mapping,
 low_mapping,
 high_mapping,
 cat_mapping) = create_mappings(id_mapping)

# ['subject_id', 'hadm_id', 'charttime', 'specimen_id', 'albumin', 'globulin', 'total_protein', 'aniongap', 'bicarbonate', 'bun', 'calcium', 'chloride', 'creatinine', 'glucose', 'sodium', 'potassium']
df_iter, num_entries = get_database_table_as_dataframe_ext(
    'derived', 'chemistry')


def func(dfs, pid):
    """Append chemistry lab values to the data file of each matching stay.

    A row is exported to every ICU stay of its (subject_id, hadm_id) whose
    in/out time window contains the row's `charttime`. Unit and reference
    range come from the row's `<col>_unit`, `<col>_lower`, `<col>_upper`
    columns.

    Args:
        dfs: Sequence whose first element is the DataFrame slice to process.
        pid: Unused; supplied by `parallel_processing`.
    """
    df = dfs[0]
    it = InfoTable()
    dt = DataTable()
    # The mapped value columns do not change per row; resolve them once.
    value_cols = [col for col in df.columns.tolist() if col in id_mapping]

    for _, df_row in df.iterrows():
        subject_id = df_row['subject_id']

        # Fixed: this used to test against `custom_icustays_list`, which
        # holds stay ids, not subject ids. The lookup below indexes
        # `custom_icustays_dict`, as in the other lab cells.
        # NOTE(review): the dict comes from json.load, so its keys are str;
        # confirm subject_id / hadm_id have a matching type.
        if subject_id not in custom_icustays_dict:
            continue
        stays_by_admission = custom_icustays_dict[subject_id]

        hadm_id = df_row['hadm_id']
        if hadm_id not in stays_by_admission:
            continue

        for stay_id in stays_by_admission[hadm_id]:
            it.data = load_info_dsv(STRUCTURED_EXPORT_DIR, stay_id)
            dt.data = load_data_dsv(STRUCTURED_EXPORT_DIR, stay_id)

            # Positions 13 and 14 of the info record are used as the ICU
            # admission and discharge times.
            icu_intime = it.data[13]
            icu_outtime = it.data[14]

            if icu_intime <= df_row['charttime'] <= icu_outtime:
                for col in value_cols:
                    dt.append(
                        uid=id_mapping[col],
                        value=df_row[col],
                        unit=df_row[col+'_unit'],
                        lower_range=df_row[col+'_lower'],
                        upper_range=df_row[col+'_upper'],
                        category=cat_mapping[col],
                        specimen_id=df_row['specimen_id'],
                        starttime=df_row['charttime'],
                    )
                save_data_dsv(STRUCTURED_EXPORT_DIR,
                              stay_id, pd.DataFrame(dt.data))


# `func` is redefined by every cell of this notebook; run this right after
# the definition above so the chemistry worker is the one dispatched.
parallel_processing_ext(func, df_iter, num_entries, custom_icustays_list)
print("Added chemistry (lab) entries.")


Database: mimiciv
Username: mimiciv
>>>>> Connected to DB <<<<<
Getting chemistry data


50it [00:53,  1.06s/it]


Number of entries for chemistry : 3956323
Column names : ['subject_id', 'hadm_id', 'charttime', 'specimen_id', 'albumin', 'globulin', 'total_protein', 'aniongap', 'bicarbonate', 'bun', 'calcium', 'chloride', 'creatinine', 'glucose', 'sodium', 'potassium', 'albumin_unit', 'globulin_unit', 'total_protein_unit', 'aniongap_unit', 'bicarbonate_unit', 'bun_unit', 'calcium_unit', 'chloride_unit', 'creatinine_unit', 'glucose_unit', 'sodium_unit', 'potassium_unit', 'albumin_lower', 'globulin_lower', 'total_protein_lower', 'aniongap_lower', 'bicarbonate_lower', 'bun_lower', 'calcium_lower', 'chloride_lower', 'creatinine_lower', 'glucose_lower', 'sodium_lower', 'potassium_lower', 'albumin_upper', 'globulin_upper', 'total_protein_upper', 'aniongap_upper', 'bicarbonate_upper', 'bun_upper', 'calcium_upper', 'chloride_upper', 'creatinine_upper', 'glucose_upper', 'sodium_upper', 'potassium_upper']



100%|██████████| 50/50 [06:06<00:00,  7.34s/it]

Added chemistry (lab) entries.





## Blood Gas

In [17]:
# uid -> column name in the derived `bg` (blood gas) table. The uid range
# selects the metadata source in create_mappings: 1xxxxx derived,
# 2xxxxx chart items, 5xxxxx lab items.
id_mapping = {
    552028: 'specimen',
    550801: 'aado2',
    550802: 'baseexcess',
    550803: 'bicarbonate',
    550804: 'totalco2',
    550805: 'carboxyhemoglobin',
    550806: 'chloride',
    550808: 'calcium',
    550809: 'glucose',
    550810: 'hematocrit',
    550811: 'hemoglobin',
    550813: 'lactate',
    550814: 'methemoglobin',
    550816: 'fio2',
    550817: 'so2',
    550818: 'pco2',
    550820: 'ph',
    550821: 'po2',
    550822: 'potassium',
    550824: 'sodium',
    550825: 'temperature',
    223835: 'fio2_chartevents',
    100038: 'pao2fio2ratio',  # nounit
    100039: 'aado2_calc',
    100040: 'specimen_pred',  # nounit
    100041: 'specimen_prob',
}
# create_mappings returns the inverted mapping first, so from here on
# `id_mapping` maps column name -> uid.
(id_mapping,
 unit_mapping,
 low_mapping,
 high_mapping,
 cat_mapping) = create_mappings(id_mapping)

# ['subject_id', 'hadm_id', 'charttime', 'specimen', 'specimen_pred', 'specimen_prob', 'so2', 'po2', 'pco2', 'fio2_chartevents', 'fio2', 'aado2', 'aado2_calc', 'pao2fio2ratio', 'ph', 'baseexcess', 'bicarbonate', 'totalco2', 'hematocrit', 'hemoglobin', 'carboxyhemoglobin', 'methemoglobin', 'chloride', 'calcium', 'temperature', 'potassium', 'sodium', 'lactate', 'glucose']
df_iter, num_entries = get_database_table_as_dataframe_ext('derived', 'bg')


def func(dfs, pid):
    """Append blood gas values to the data file of each matching ICU stay.

    A row is exported to every ICU stay of its (subject_id, hadm_id) whose
    in/out time window contains the row's `charttime`.

    Unit and reference range are resolved per column:
      * derived uids (1xxxxx): metadata from create_mappings, except
        `aado2_calc` (100039), which reuses the `aado2_*` columns of the row;
      * all other uids: the row's `<col>_unit/_lower/_upper` columns, falling
        back to the create_mappings metadata when such a column is absent.

    Args:
        dfs: Sequence whose first element is the DataFrame slice to process.
        pid: Unused; supplied by `parallel_processing`.
    """
    df = dfs[0]
    it = InfoTable()
    dt = DataTable()
    # The mapped value columns do not change per row; resolve them once.
    value_cols = [col for col in df.columns.tolist() if col in id_mapping]

    for _, df_row in df.iterrows():
        subject_id = df_row['subject_id']

        # NOTE(review): the dict comes from json.load, so its keys are str;
        # confirm subject_id / hadm_id have a matching type.
        if subject_id not in custom_icustays_dict:
            continue
        stays_by_admission = custom_icustays_dict[subject_id]

        hadm_id = df_row['hadm_id']
        if hadm_id not in stays_by_admission:
            continue

        for stay_id in stays_by_admission[hadm_id]:
            it.data = load_info_dsv(STRUCTURED_EXPORT_DIR, stay_id)
            dt.data = load_data_dsv(STRUCTURED_EXPORT_DIR, stay_id)

            # Positions 13 and 14 of the info record are used as the ICU
            # admission and discharge times.
            icu_intime = it.data[13]
            icu_outtime = it.data[14]

            if icu_intime <= df_row['charttime'] <= icu_outtime:
                for col in value_cols:
                    uid = id_mapping[col]

                    # Fixed throughout: the assignments below used to end in
                    # a trailing comma, which stored 1-tuples instead of the
                    # scalar values.
                    if uid//100000 == 1:
                        unit = unit_mapping[col]
                        lower_range = low_mapping[col]
                        upper_range = high_mapping[col]

                        if uid == 100039:
                            unit = df_row['aado2_unit']
                            lower_range = df_row['aado2_lower']
                            upper_range = df_row['aado2_upper']

                    else:
                        # The column listing printed by this cell suggests
                        # `fio2_chartevents` has no companion columns; use
                        # the item metadata instead of raising KeyError.
                        unit = df_row.get(col+'_unit', unit_mapping[col])
                        lower_range = df_row.get(
                            col+'_lower', low_mapping[col])
                        upper_range = df_row.get(
                            col+'_upper', high_mapping[col])

                    dt.append(
                        uid=uid,
                        value=df_row[col],
                        unit=unit,
                        lower_range=lower_range,
                        upper_range=upper_range,
                        category=cat_mapping[col],
                        starttime=df_row['charttime'],
                    )
                save_data_dsv(STRUCTURED_EXPORT_DIR,
                              stay_id, pd.DataFrame(dt.data))


# `func` is redefined by every cell of this notebook; run this right after
# the definition above so the blood gas worker is the one dispatched.
parallel_processing_ext(func, df_iter, num_entries, custom_icustays_list)
print("Added bg (lab) entries.")


Database: mimiciv
Username: mimiciv
>>>>> Connected to DB <<<<<
Getting bg data


8it [00:10,  1.28s/it]


Number of entries for bg : 561212
Column names : ['subject_id', 'hadm_id', 'charttime', 'specimen', 'specimen_unit', 'specimen_lower', 'specimen_upper', 'specimen_pred', 'specimen_prob', 'so2', 'so2_unit', 'so2_lower', 'so2_upper', 'po2', 'po2_unit', 'po2_lower', 'po2_upper', 'pco2', 'pco2_unit', 'pco2_lower', 'pco2_upper', 'fio2_chartevents', 'fio2', 'fio2_unit', 'fio2_lower', 'fio2_upper', 'aado2', 'aado2_unit', 'aado2_lower', 'aado2_upper', 'aado2_calc', 'pao2fio2ratio', 'ph', 'ph_unit', 'ph_lower', 'ph_upper', 'baseexcess', 'baseexcess_unit', 'baseexcess_lower', 'baseexcess_upper', 'bicarbonate', 'bicarbonate_unit', 'bicarbonate_lower', 'bicarbonate_upper', 'totalco2', 'totalco2_unit', 'totalco2_lower', 'totalco2_upper', 'hematocrit', 'hematocrit_unit', 'hematocrit_lower', 'hematocrit_upper', 'hemoglobin', 'hemoglobin_unit', 'hemoglobin_lower', 'hemoglobin_upper', 'carboxyhemoglobin', 'carboxyhemoglobin_unit', 'carboxyhemoglobin_lower', 'carboxyhemoglobin_upper', 'methemoglobin', '

100%|██████████| 8/8 [00:12<00:00,  1.61s/it]

Added bg (lab) entries.





## Blood Differential

In [18]:
# impute absolute count if percentage & WBC is available
# uid -> column name in the derived `blood_differential` table. The 1xxxxx
# uids are derived items (see the itemid groups listed at the end of the
# dict); the 5xxxxx uids are lab items.
id_mapping = {
    551146: 'basophils',
    552069: 'basophils_abs',
    551200: 'eosinophils',
    551254: 'monocytes',
    551256: 'neutrophils',
    552075: 'neutrophils_abs',
    551143: 'atypical_lymphocytes',
    551144: 'bands',
    552135: 'immature_granulocytes',
    551251: 'metamyelocytes',
    551257: 'nrbc',

    100003: 'wbc',  # TODO: May need to split due to category.
    100004: 'lymphocytes',
    100005: 'eosinophils_abs',
    100006: 'lymphocytes_abs',
    100007: 'monocytes_abs',

    # 51300: 'wbc',
    # 51301: 'wbc',
    # 51755: 'wbc',
    # [51244, 51245]: lymphocytes
    # [52073, 51199]: eosinophils_abs
    # [51133, 52769]: lymphocytes_abs
    # [52074, 51253]: monocytes_abs
}
# create_mappings returns the inverted mapping first, so from here on
# `id_mapping` maps column name -> uid.
(id_mapping,
 unit_mapping,
 low_mapping,
 high_mapping,
 cat_mapping) = create_mappings(id_mapping)

# ['subject_id', 'hadm_id', 'charttime', 'specimen_id', 'wbc', 'basophils_abs', 'eosinophils_abs', 'lymphocytes_abs', 'monocytes_abs', 'neutrophils_abs', 'basophils', 'eosinophils', 'lymphocytes', 'monocytes', 'neutrophils', 'atypical_lymphocytes', 'bands', 'immature_granulocytes', 'metamyelocytes', 'nrbc']
df_iter, num_entries = get_database_table_as_dataframe_ext(
    'derived', 'blood_differential')


def func(dfs, pid):
    """Append blood differential values to each matching stay's data file.

    A row is exported to every ICU stay of its (subject_id, hadm_id) whose
    in/out time window contains the row's `charttime`.

    Unit and reference range are resolved per column:
      * derived uids (1xxxxx): unit from create_mappings; the reference
        range comes from the row for uids 100003-100007 and from
        create_mappings otherwise;
      * all other uids: the row's `<col>_unit/_lower/_upper` columns.

    Args:
        dfs: Sequence whose first element is the DataFrame slice to process.
        pid: Unused; supplied by `parallel_processing`.
    """
    # Derived uid -> prefix of the row columns holding its reference range.
    row_range_prefix = {
        100003: 'wbc',
        100004: 'lymphocytes',
        100005: 'eosinophils_abs',
        100006: 'lymphocytes_abs',
        100007: 'monocytes_abs',
    }

    df = dfs[0]
    it = InfoTable()
    dt = DataTable()
    # The mapped value columns do not change per row; resolve them once.
    value_cols = [col for col in df.columns.tolist() if col in id_mapping]

    for _, df_row in df.iterrows():
        subject_id = df_row['subject_id']

        # NOTE(review): the dict comes from json.load, so its keys are str;
        # confirm subject_id / hadm_id have a matching type.
        if subject_id not in custom_icustays_dict:
            continue
        stays_by_admission = custom_icustays_dict[subject_id]

        hadm_id = df_row['hadm_id']
        if hadm_id not in stays_by_admission:
            continue

        for stay_id in stays_by_admission[hadm_id]:
            it.data = load_info_dsv(STRUCTURED_EXPORT_DIR, stay_id)
            dt.data = load_data_dsv(STRUCTURED_EXPORT_DIR, stay_id)

            # Positions 13 and 14 of the info record are used as the ICU
            # admission and discharge times.
            icu_intime = it.data[13]
            icu_outtime = it.data[14]

            if icu_intime <= df_row['charttime'] <= icu_outtime:
                for col in value_cols:
                    uid = id_mapping[col]

                    # Fixed throughout: the assignments below used to end in
                    # a trailing comma, which stored 1-tuples instead of the
                    # scalar values.
                    if uid//100000 == 1:
                        unit = unit_mapping[col]
                        lower_range = low_mapping[col]
                        upper_range = high_mapping[col]

                        if uid in row_range_prefix:
                            prefix = row_range_prefix[uid]
                            lower_range = df_row[prefix+'_lower']
                            upper_range = df_row[prefix+'_upper']

                    else:
                        unit = df_row[col+'_unit']
                        lower_range = df_row[col+'_lower']
                        upper_range = df_row[col+'_upper']

                    dt.append(
                        uid=uid,
                        value=df_row[col],
                        unit=unit,
                        lower_range=lower_range,
                        upper_range=upper_range,
                        category=cat_mapping[col],
                        specimen_id=df_row['specimen_id'],
                        starttime=df_row['charttime'],
                    )
                save_data_dsv(STRUCTURED_EXPORT_DIR,
                              stay_id, pd.DataFrame(dt.data))


# `func` is redefined by every cell of this notebook; run this right after
# the definition above so the blood differential worker is the one dispatched.
parallel_processing_ext(func, df_iter, num_entries, custom_icustays_list)
print("Added blood_differential (lab) entries.")


Database: mimiciv
Username: mimiciv
>>>>> Connected to DB <<<<<
Getting blood_differential data


42it [00:39,  1.06it/s]


Number of entries for blood_differential : 3283493
Column names : ['subject_id', 'hadm_id', 'charttime', 'specimen_id', 'wbc', 'wbc_unit', 'wbc_lower', 'wbc_upper', 'basophils_abs', 'basophils_abs_unit', 'basophils_abs_lower', 'basophils_abs_upper', 'eosinophils_abs', 'eosinophils_abs_unit', 'eosinophils_abs_lower', 'eosinophils_abs_upper', 'lymphocytes_abs', 'lymphocytes_abs_unit', 'lymphocytes_abs_lower', 'lymphocytes_abs_upper', 'monocytes_abs', 'monocytes_abs_unit', 'monocytes_abs_lower', 'monocytes_abs_upper', 'neutrophils_abs', 'neutrophils_abs_unit', 'neutrophils_abs_lower', 'neutrophils_abs_upper', 'basophils', 'basophils_unit', 'basophils_lower', 'basophils_upper', 'eosinophils', 'eosinophils_unit', 'eosinophils_lower', 'eosinophils_upper', 'lymphocytes', 'lymphocytes_unit', 'lymphocytes_lower', 'lymphocytes_upper', 'monocytes', 'monocytes_unit', 'monocytes_lower', 'monocytes_upper', 'neutrophils', 'neutrophils_unit', 'neutrophils_lower', 'neutrophils_upper', 'atypical_lymphoc

100%|██████████| 42/42 [01:07<00:00,  1.60s/it]

Added blood_differential (lab) entries.





## Cardiac Marker

In [19]:
# uid -> column name in the derived `cardiac_marker` table.
id_mapping = {
    551002: 'troponin_i',
    551003: 'troponin_t',
    550911: 'ck_mb',
}
# create_mappings returns the inverted mapping first, so from here on
# `id_mapping` maps column name -> uid.
(id_mapping,
 unit_mapping,
 low_mapping,
 high_mapping,
 cat_mapping) = create_mappings(id_mapping)

# ['subject_id', 'hadm_id', 'charttime', 'specimen_id', 'troponin_i', 'troponin_t', 'ck_mb']
df_iter, num_entries = get_database_table_as_dataframe_ext(
    'derived', 'cardiac_marker')


def func(dfs, pid):
    """Append cardiac marker values to each matching stay's data file.

    A row is exported to every ICU stay of its (subject_id, hadm_id) whose
    in/out time window contains the row's `charttime`. Unit and reference
    range come from the row's `<col>_unit`, `<col>_lower`, `<col>_upper`
    columns.

    Args:
        dfs: Sequence whose first element is the DataFrame slice to process.
        pid: Unused; supplied by `parallel_processing`.
    """
    frame = dfs[0]
    info_table = InfoTable()
    data_table = DataTable()
    mapped_cols = [name for name in frame.columns.tolist()
                   if name in id_mapping]

    for _, row in frame.iterrows():
        subject_id = row['subject_id']
        # NOTE(review): the dict comes from json.load, so its keys are str;
        # confirm subject_id / hadm_id have a matching type.
        if subject_id not in custom_icustays_dict:
            continue
        stays_by_admission = custom_icustays_dict[subject_id]

        hadm_id = row['hadm_id']
        if hadm_id not in stays_by_admission:
            continue

        for stay_id in stays_by_admission[hadm_id]:
            info_table.data = load_info_dsv(STRUCTURED_EXPORT_DIR, stay_id)
            data_table.data = load_data_dsv(STRUCTURED_EXPORT_DIR, stay_id)

            # Positions 13 and 14 of the info record are used as the ICU
            # admission and discharge times.
            icu_intime = info_table.data[13]
            icu_outtime = info_table.data[14]

            # Skip measurements taken outside this ICU stay.
            if not (icu_intime <= row['charttime'] <= icu_outtime):
                continue

            for name in mapped_cols:
                data_table.append(
                    uid=id_mapping[name],
                    value=row[name],
                    unit=row[name+'_unit'],
                    lower_range=row[name+'_lower'],
                    upper_range=row[name+'_upper'],
                    category=cat_mapping[name],
                    specimen_id=row['specimen_id'],
                    starttime=row['charttime'],
                )
            updated = pd.DataFrame(data_table.data)
            save_data_dsv(STRUCTURED_EXPORT_DIR, stay_id, updated)


# `func` is redefined by every cell of this notebook; run this right after
# the definition above so the cardiac marker worker is the one dispatched.
parallel_processing_ext(func, df_iter, num_entries, custom_icustays_list)
print("Added cardiac_marker (lab) entries.")


Database: mimiciv
Username: mimiciv
>>>>> Connected to DB <<<<<
Getting cardiac_marker data


6it [00:01,  4.43it/s]


Number of entries for cardiac_marker : 430049
Column names : ['subject_id', 'hadm_id', 'charttime', 'specimen_id', 'troponin_i', 'troponin_t', 'ck_mb', 'troponin_i_unit', 'troponin_t_unit', 'ck_mb_unit', 'troponin_i_lower', 'troponin_t_lower', 'ck_mb_lower', 'troponin_i_upper', 'troponin_t_upper', 'ck_mb_upper']



100%|██████████| 6/6 [00:07<00:00,  1.27s/it]

Added cardiac_marker (lab) entries.





## Coagulation

In [20]:
# uid -> column name in the derived `coagulation` table.
id_mapping = {
    551196: 'd_dimer',
    551214: 'fibrinogen',
    551297: 'thrombin',
    551237: 'inr',
    551274: 'pt',
    551275: 'ptt',
}
# create_mappings returns the inverted mapping first, so from here on
# `id_mapping` maps column name -> uid.
(id_mapping,
 unit_mapping,
 low_mapping,
 high_mapping,
 cat_mapping) = create_mappings(id_mapping)

# ['subject_id', 'hadm_id', 'charttime', 'specimen_id', 'd_dimer', 'fibrinogen', 'thrombin', 'inr', 'pt', 'ptt']
df_iter, num_entries = get_database_table_as_dataframe_ext(
    'derived', 'coagulation')


def func(dfs, pid):
    """Append coagulation values to each matching stay's data file.

    A row is exported to every ICU stay of its (subject_id, hadm_id) whose
    in/out time window contains the row's `charttime`. Unit and reference
    range come from the row's `<col>_unit`, `<col>_lower`, `<col>_upper`
    columns.

    Args:
        dfs: Sequence whose first element is the DataFrame slice to process.
        pid: Unused; supplied by `parallel_processing`.
    """
    chunk = dfs[0]
    stay_info = InfoTable()
    stay_data = DataTable()
    lab_columns = [c for c in chunk.columns.tolist() if c in id_mapping]

    for _, record in chunk.iterrows():
        subject_id = record['subject_id']
        # NOTE(review): the dict comes from json.load, so its keys are str;
        # confirm subject_id / hadm_id have a matching type.
        if subject_id not in custom_icustays_dict:
            continue
        admissions = custom_icustays_dict[subject_id]

        hadm_id = record['hadm_id']
        if hadm_id not in admissions:
            continue

        for stay_id in admissions[hadm_id]:
            stay_info.data = load_info_dsv(STRUCTURED_EXPORT_DIR, stay_id)
            stay_data.data = load_data_dsv(STRUCTURED_EXPORT_DIR, stay_id)

            # Positions 13 and 14 of the info record are used as the ICU
            # admission and discharge times.
            window_start = stay_info.data[13]
            window_end = stay_info.data[14]

            # Skip measurements taken outside this ICU stay.
            if not (window_start <= record['charttime'] <= window_end):
                continue

            for lab in lab_columns:
                stay_data.append(
                    uid=id_mapping[lab],
                    value=record[lab],
                    unit=record[lab+'_unit'],
                    lower_range=record[lab+'_lower'],
                    upper_range=record[lab+'_upper'],
                    category=cat_mapping[lab],
                    specimen_id=record['specimen_id'],
                    starttime=record['charttime'],
                )
            save_data_dsv(STRUCTURED_EXPORT_DIR, stay_id,
                          pd.DataFrame(stay_data.data))


# `func` is redefined by every cell of this notebook; run this right after
# the definition above so the coagulation worker is the one dispatched.
parallel_processing_ext(func, df_iter, num_entries, custom_icustays_list)
print("Added coagulation (lab) entries.")


Database: mimiciv
Username: mimiciv
>>>>> Connected to DB <<<<<
Getting coagulation data


20it [00:08,  2.23it/s]


Number of entries for coagulation : 1594879
Column names : ['subject_id', 'hadm_id', 'charttime', 'specimen_id', 'd_dimer', 'fibrinogen', 'thrombin', 'inr', 'pt', 'ptt', 'd_dimer_unit', 'fibrinogen_unit', 'thrombin_unit', 'inr_unit', 'pt_unit', 'ptt_unit', 'd_dimer_lower', 'fibrinogen_lower', 'thrombin_lower', 'inr_lower', 'pt_lower', 'ptt_lower', 'd_dimer_upper', 'fibrinogen_upper', 'thrombin_upper', 'inr_upper', 'pt_upper', 'ptt_upper']



100%|██████████| 20/20 [00:27<00:00,  1.39s/it]

Added coagulation (lab) entries.





## Complete blood count

In [21]:
# uid -> column name in the derived `complete_blood_count` table. `wbc` is
# left out on purpose because it is exported from blood_differential.
id_mapping = {
    551221: 'hematocrit',
    551222: 'hemoglobin',
    551248: 'mch',
    551249: 'mchc',
    551250: 'mcv',
    551265: 'platelet',
    551279: 'rbc',
    551277: 'rdw',
    552159: 'rdwsd',
    # 551301: 'wbc', # present in blood_differential
}
# create_mappings returns the inverted mapping first, so from here on
# `id_mapping` maps column name -> uid.
(id_mapping,
 unit_mapping,
 low_mapping,
 high_mapping,
 cat_mapping) = create_mappings(id_mapping)

# ['subject_id', 'hadm_id', 'charttime', 'specimen_id', 'hematocrit', 'hemoglobin', 'mch', 'mchc', 'mcv', 'platelet', 'rbc', 'rdw', 'rdwsd', 'wbc']
df_iter, num_entries = get_database_table_as_dataframe_ext(
    'derived', 'complete_blood_count')


def func(dfs, pid):

    df = dfs[0]
    it = InfoTable()
    dt = DataTable()
    for df_i in df.iterrows():
        df_row = df_i[1]

        if df_row['subject_id'] not in custom_icustays_dict:
            continue
        if df_row['hadm_id'] not in custom_icustays_dict[df_row['subject_id']]:
            continue

        stay_ids = custom_icustays_dict[df_row['subject_id']
                                        ][df_row['hadm_id']]

        for stay_id in stay_ids:
            it.data = load_info_dsv(STRUCTURED_EXPORT_DIR, stay_id)
            dt.data = load_data_dsv(STRUCTURED_EXPORT_DIR, stay_id)

            icu_intime = it.data[13]
            icu_outtime = it.data[14]

            if icu_intime <= df_row['charttime'] <= icu_outtime:
                for col in df.columns.tolist():
                    if col in id_mapping:
                        dt.append(
                            uid=id_mapping[col],
                            value=df_row[col],
                            unit=df_row[col+'_unit'],
                            lower_range=df_row[col+'_lower'],
                            upper_range=df_row[col+'_upper'],
                            category=cat_mapping[col],
                            specimen_id=df_row['specimen_id'],
                            starttime=df_row['charttime'],
                        )
                save_data_dsv(STRUCTURED_EXPORT_DIR,
                              stay_id, pd.DataFrame(dt.data))


parallel_processing_ext(func, df_iter, num_entries, custom_icustays_list)
print("Added complete_blood_count (lab) entries.")


Database: mimiciv
Username: mimiciv
>>>>> Connected to DB <<<<<
Getting complete_blood_count data


44it [00:47,  1.07s/it]


Number of entries for complete_blood_count : 3492512
Column names : ['subject_id', 'hadm_id', 'charttime', 'specimen_id', 'hematocrit', 'hemoglobin', 'mch', 'mchc', 'mcv', 'platelet', 'rbc', 'rdw', 'rdwsd', 'wbc', 'hematocrit_unit', 'hemoglobin_unit', 'mch_unit', 'mchc_unit', 'mcv_unit', 'platelet_unit', 'rbc_unit', 'rdw_unit', 'rdwsd_unit', 'wbc_unit', 'hematocrit_lower', 'hemoglobin_lower', 'mch_lower', 'mchc_lower', 'mcv_lower', 'platelet_lower', 'rbc_lower', 'rdw_lower', 'rdwsd_lower', 'wbc_lower', 'hematocrit_upper', 'hemoglobin_upper', 'mch_upper', 'mchc_upper', 'mcv_upper', 'platelet_upper', 'rbc_upper', 'rdw_upper', 'rdwsd_upper', 'wbc_upper']



100%|██████████| 44/44 [01:20<00:00,  1.83s/it]

Added complete_blood_count (lab) entries.





## Enzyme

In [22]:
# Item ids exported from the derived `enzyme` table, keyed by id with the
# table's column name as value.
id_mapping = {
    550861: 'alt',
    550863: 'alp',
    550878: 'ast',
    550867: 'amylase',
    550885: 'bilirubin_total',
    550884: 'bilirubin_indirect',
    550883: 'bilirubin_direct',
    550910: 'ck_cpk',
    550911: 'ck_mb',
    550927: 'ggt',
    550954: 'ld_ldh',
}
# create_mappings() rebinds `id_mapping`; from here on it is looked up by
# column name (see `id_mapping[col]` in func).
(id_mapping,
 unit_mapping,
 low_mapping,
 high_mapping,
 cat_mapping) = create_mappings(id_mapping)

# ['subject_id', 'hadm_id', 'charttime', 'specimen_id', 'alt', 'alp', 'ast', 'amylase', 'bilirubin_total', 'bilirubin_direct', 'bilirubin_indirect', 'ck_cpk', 'ck_mb', 'ggt', 'ld_ldh']
df_iter, num_entries = get_database_table_as_dataframe_ext(
    'derived', 'enzyme')


def func(dfs, pid):
    """Append enzyme lab values to the data file of each matching ICU stay.

    A row is exported to a stay only when its subject/admission is present
    in `custom_icustays_dict` and its `charttime` lies between the two
    timestamps read from positions 13 and 14 of the stay's info record.

    Args:
        dfs: Sequence whose first element is the DataFrame chunk to process.
        pid: Unused here; presumably the worker index supplied by the
            parallel runner.
    """
    df = dfs[0]
    it = InfoTable()
    dt = DataTable()
    # Columns with a uid mapping, resolved once instead of once per row.
    mapped_cols = [col for col in df.columns.tolist() if col in id_mapping]
    # stay_id -> (icu_intime, icu_outtime). This function never writes the
    # info files, so each one only needs to be read once per call.
    stay_windows = {}

    for _, df_row in df.iterrows():
        subject_id = df_row['subject_id']
        hadm_id = df_row['hadm_id']
        if subject_id not in custom_icustays_dict:
            continue
        if hadm_id not in custom_icustays_dict[subject_id]:
            continue

        charttime = df_row['charttime']
        for stay_id in custom_icustays_dict[subject_id][hadm_id]:
            if stay_id not in stay_windows:
                it.data = load_info_dsv(STRUCTURED_EXPORT_DIR, stay_id)
                stay_windows[stay_id] = (it.data[13], it.data[14])
            icu_intime, icu_outtime = stay_windows[stay_id]

            if not (icu_intime <= charttime <= icu_outtime):
                continue

            # The data file is only read once the row is known to belong to
            # this stay; most lab rows fall outside the ICU window.
            dt.data = load_data_dsv(STRUCTURED_EXPORT_DIR, stay_id)
            for col in mapped_cols:
                dt.append(
                    uid=id_mapping[col],
                    value=df_row[col],
                    unit=df_row[col+'_unit'],
                    lower_range=df_row[col+'_lower'],
                    upper_range=df_row[col+'_upper'],
                    category=cat_mapping[col],
                    specimen_id=df_row['specimen_id'],
                    starttime=charttime,
                )
            save_data_dsv(STRUCTURED_EXPORT_DIR,
                          stay_id, pd.DataFrame(dt.data))


# Apply func to every chunk produced by df_iter.
parallel_processing_ext(func, df_iter, num_entries, custom_icustays_list)
print("Added enzyme (lab) entries.")


Database: mimiciv
Username: mimiciv
>>>>> Connected to DB <<<<<
Getting enzyme data


23it [00:16,  1.44it/s]


Number of entries for enzyme : 1787236
Column names : ['subject_id', 'hadm_id', 'charttime', 'specimen_id', 'alt', 'alp', 'ast', 'amylase', 'bilirubin_total', 'bilirubin_direct', 'bilirubin_indirect', 'ck_cpk', 'ck_mb', 'ggt', 'ld_ldh', 'alt_unit', 'alp_unit', 'ast_unit', 'amylase_unit', 'bilirubin_total_unit', 'bilirubin_direct_unit', 'bilirubin_indirect_unit', 'ck_cpk_unit', 'ck_mb_unit', 'ggt_unit', 'ld_ldh_unit', 'alt_lower', 'alp_lower', 'ast_lower', 'amylase_lower', 'bilirubin_total_lower', 'bilirubin_direct_lower', 'bilirubin_indirect_lower', 'ck_cpk_lower', 'ck_mb_lower', 'ggt_lower', 'ld_ldh_lower', 'alt_upper', 'alp_upper', 'ast_upper', 'amylase_upper', 'bilirubin_total_upper', 'bilirubin_direct_upper', 'bilirubin_indirect_upper', 'ck_cpk_upper', 'ck_mb_upper', 'ggt_upper', 'ld_ldh_upper']



100%|██████████| 23/23 [00:40<00:00,  1.76s/it]

Added enzyme (lab) entries.





## Inflammation

In [23]:
# Item id exported from the derived `inflammation` table, keyed by id with
# the table's column name as value.
id_mapping = {
    550889: 'crp',
}
# create_mappings() rebinds `id_mapping`; from here on it is looked up by
# column name (see `id_mapping[col]` in func).
(id_mapping,
 unit_mapping,
 low_mapping,
 high_mapping,
 cat_mapping) = create_mappings(id_mapping)

# ['subject_id', 'hadm_id', 'charttime', 'specimen_id', 'crp']
df_iter, num_entries = get_database_table_as_dataframe_ext(
    'derived', 'inflammation')


def func(dfs, pid):
    """Append inflammation lab values to the data file of each ICU stay.

    A row is exported to a stay only when its subject/admission is present
    in `custom_icustays_dict` and its `charttime` lies between the two
    timestamps read from positions 13 and 14 of the stay's info record.

    Args:
        dfs: Sequence whose first element is the DataFrame chunk to process.
        pid: Unused here; presumably the worker index supplied by the
            parallel runner.
    """
    df = dfs[0]
    it = InfoTable()
    dt = DataTable()
    # Columns with a uid mapping, resolved once instead of once per row.
    mapped_cols = [col for col in df.columns.tolist() if col in id_mapping]
    # stay_id -> (icu_intime, icu_outtime). This function never writes the
    # info files, so each one only needs to be read once per call.
    stay_windows = {}

    for _, df_row in df.iterrows():
        subject_id = df_row['subject_id']
        hadm_id = df_row['hadm_id']
        if subject_id not in custom_icustays_dict:
            continue
        if hadm_id not in custom_icustays_dict[subject_id]:
            continue

        charttime = df_row['charttime']
        for stay_id in custom_icustays_dict[subject_id][hadm_id]:
            if stay_id not in stay_windows:
                it.data = load_info_dsv(STRUCTURED_EXPORT_DIR, stay_id)
                stay_windows[stay_id] = (it.data[13], it.data[14])
            icu_intime, icu_outtime = stay_windows[stay_id]

            if not (icu_intime <= charttime <= icu_outtime):
                continue

            # The data file is only read once the row is known to belong to
            # this stay; most lab rows fall outside the ICU window.
            dt.data = load_data_dsv(STRUCTURED_EXPORT_DIR, stay_id)
            for col in mapped_cols:
                dt.append(
                    uid=id_mapping[col],
                    value=df_row[col],
                    unit=df_row[col+'_unit'],
                    lower_range=df_row[col+'_lower'],
                    upper_range=df_row[col+'_upper'],
                    category=cat_mapping[col],
                    specimen_id=df_row['specimen_id'],
                    starttime=charttime,
                )
            save_data_dsv(STRUCTURED_EXPORT_DIR,
                          stay_id, pd.DataFrame(dt.data))


# Apply func to every chunk produced by df_iter.
parallel_processing_ext(func, df_iter, num_entries, custom_icustays_list)
print("Added inflammation (lab) entries.")


Database: mimiciv
Username: mimiciv
>>>>> Connected to DB <<<<<
Getting inflammation data


2it [00:00,  7.62it/s]


Number of entries for inflammation : 118290
Column names : ['subject_id', 'hadm_id', 'charttime', 'specimen_id', 'crp', 'crp_unit', 'crp_lower', 'crp_upper']



100%|██████████| 2/2 [00:02<00:00,  1.01s/it]

Added inflammation (lab) entries.





## O2 delivery

In [24]:
# Item ids exported from the derived `oxygen_delivery` table, keyed by id
# with the table's column name as value.
id_mapping = {
    227287: 'o2_flow_additional',

    100012: 'o2_flow',
    100008: 'o2_delivery_device_1',
    100009: 'o2_delivery_device_2',
    100010: 'o2_delivery_device_3',
    100011: 'o2_delivery_device_4',
}
# create_mappings() rebinds `id_mapping`; from here on it is looked up by
# column name, as are the unit/range/category mappings.
(id_mapping,
 unit_mapping,
 low_mapping,
 high_mapping,
 cat_mapping) = create_mappings(id_mapping)

# ['subject_id', 'stay_id', 'charttime', 'o2_flow', 'o2_flow_additional', 'o2_delivery_device_1', 'o2_delivery_device_2', 'o2_delivery_device_3', 'o2_delivery_device_4']
df_iter, num_entries = get_database_table_as_dataframe_ext(
    'derived', 'oxygen_delivery')


def func(dfs, pid):
    """Append oxygen-delivery chart values to each stay's data file.

    Rows are processed stay by stay so that each stay file is loaded and
    saved once per call instead of once per row.

    Args:
        dfs: Sequence whose first element is the DataFrame chunk to process.
        pid: Unused here; presumably the worker index supplied by the
            parallel runner.
    """
    df = dfs[0]
    dt = DataTable()
    # Columns with a uid mapping, resolved once instead of once per row.
    mapped_cols = [col for col in df.columns.tolist() if col in id_mapping]

    # sort=False keeps stays in first-seen order and rows in their original
    # order within a stay. groupby skips rows whose stay_id is missing.
    for stay_key, stay_rows in df.groupby('stay_id', sort=False):
        stay_id = int(stay_key)
        dt.data = load_data_dsv(STRUCTURED_EXPORT_DIR, stay_id)
        for _, df_row in stay_rows.iterrows():
            for col in mapped_cols:
                dt.append(
                    uid=id_mapping[col],
                    value=df_row[col],
                    unit=unit_mapping[col],
                    lower_range=low_mapping[col],
                    upper_range=high_mapping[col],
                    category=cat_mapping[col],
                    starttime=df_row['charttime'],
                )
        save_data_dsv(STRUCTURED_EXPORT_DIR,
                      stay_id, pd.DataFrame(dt.data))


# Apply func to every chunk produced by df_iter.
parallel_processing_ext(func, df_iter, num_entries, custom_icustays_list)
print("Added oxygen_delivery (chart) entries.")


Database: mimiciv
Username: mimiciv
>>>>> Connected to DB <<<<<
Getting oxygen_delivery data


11it [00:01,  7.54it/s]


Number of entries for oxygen_delivery : 829534
Column names : ['subject_id', 'stay_id', 'charttime', 'o2_flow', 'o2_flow_additional', 'o2_delivery_device_1', 'o2_delivery_device_2', 'o2_delivery_device_3', 'o2_delivery_device_4']



100%|██████████| 11/11 [04:28<00:00, 24.43s/it]

Added oxygen_delivery (chart) entries.





## Rhythm

In [25]:
# Item ids exported from the derived `rhythm` table, keyed by id with the
# table's column name as value.
id_mapping = {
    220048: 'heart_rhythm',
    224650: 'ectopy_type',
    224651: 'ectopy_frequency',
    226479: 'ectopy_type_secondary',
    226480: 'ectopy_frequency_secondary',
}
# create_mappings() rebinds `id_mapping`; from here on it is looked up by
# column name, as are the unit/range/category mappings.
(id_mapping,
 unit_mapping,
 low_mapping,
 high_mapping,
 cat_mapping) = create_mappings(id_mapping)

# ['subject_id', 'charttime', 'heart_rhythm', 'ectopy_type', 'ectopy_frequency', 'ectopy_type_secondary', 'ectopy_frequency_secondary']
df_iter, num_entries = get_database_table_as_dataframe_ext('derived', 'rhythm')


def func(dfs, pid):
    """Append rhythm chart values to the data file of each matching ICU stay.

    The rhythm table carries neither `hadm_id` nor `stay_id`, so every stay
    of the row's subject is checked and the row is exported to those whose
    ICU window (positions 13 and 14 of the info record) contains
    `charttime`.

    Args:
        dfs: Sequence whose first element is the DataFrame chunk to process.
        pid: Unused here; presumably the worker index supplied by the
            parallel runner.
    """
    df = dfs[0]
    it = InfoTable()
    dt = DataTable()
    # Columns with a uid mapping, resolved once instead of once per row.
    mapped_cols = [col for col in df.columns.tolist() if col in id_mapping]
    # stay_id -> (icu_intime, icu_outtime). This function never writes the
    # info files, so each one only needs to be read once per call.
    stay_windows = {}

    for _, df_row in df.iterrows():
        subject_id = df_row['subject_id']
        if subject_id not in custom_icustays_dict:
            continue

        charttime = df_row['charttime']
        # custom_icustays_dict[subject_id] maps hadm_id -> stay ids (it is
        # indexed by hadm_id in the lab cells). Iterating the mapping itself
        # yields only the keys, which cannot be unpacked into
        # (hadm_id, stay_ids); only the stay-id lists are needed here.
        for stay_ids in custom_icustays_dict[subject_id].values():
            for stay_id in stay_ids:
                if stay_id not in stay_windows:
                    it.data = load_info_dsv(STRUCTURED_EXPORT_DIR, stay_id)
                    stay_windows[stay_id] = (it.data[13], it.data[14])
                icu_intime, icu_outtime = stay_windows[stay_id]

                if not (icu_intime <= charttime <= icu_outtime):
                    continue

                # Read the data file only for stays the row belongs to.
                dt.data = load_data_dsv(STRUCTURED_EXPORT_DIR, stay_id)
                for col in mapped_cols:
                    dt.append(
                        uid=id_mapping[col],
                        value=df_row[col],
                        unit=unit_mapping[col],
                        lower_range=low_mapping[col],
                        upper_range=high_mapping[col],
                        category=cat_mapping[col],
                        starttime=charttime,
                    )
                save_data_dsv(STRUCTURED_EXPORT_DIR,
                              stay_id, pd.DataFrame(dt.data))


# Apply func to every chunk produced by df_iter.
parallel_processing_ext(func, df_iter, num_entries, custom_icustays_list)
print("Added rhythm (chart) entries.")


Database: mimiciv
Username: mimiciv
>>>>> Connected to DB <<<<<
Getting rhythm data


78it [00:07,  9.77it/s]


Number of entries for rhythm : 6184785
Column names : ['subject_id', 'charttime', 'heart_rhythm', 'ectopy_type', 'ectopy_frequency', 'ectopy_type_secondary', 'ectopy_frequency_secondary']



100%|██████████| 78/78 [01:42<00:00,  1.31s/it]

Added rhythm (chart) entries.





## Urine Output

In [29]:
# Urine output goes to its own '_uo' export directory, seeded with one
# empty data file per stay.
create_dummy_files(STRUCTURED_EXPORT_DIR+'_uo', custom_icustays_list)

# ['stay_id', 'charttime', 'urineoutput']
df_iter, num_entries = get_database_table_as_dataframe_ext(
    'derived', 'urine_output')


def func(dfs, pid):
    """Append urine-output values (uid 100013) to each stay's '_uo' file.

    Rows are processed stay by stay so that each stay file is loaded and
    saved once per call instead of once per row.

    Args:
        dfs: Sequence whose first element is the DataFrame chunk to process.
        pid: Unused here; presumably the worker index supplied by the
            parallel runner.
    """
    df = dfs[0]
    dt = DataTable()
    export_dir = STRUCTURED_EXPORT_DIR+'_uo'

    # sort=False keeps stays in first-seen order and rows in their original
    # order within a stay. groupby skips rows whose stay_id is missing.
    for stay_key, stay_rows in df.groupby('stay_id', sort=False):
        stay_id = int(stay_key)
        dt.data = load_data_dsv(export_dir, stay_id)
        for _, df_row in stay_rows.iterrows():
            dt.append(
                uid=100013,
                value=df_row['urineoutput'],
                unit='mL',
                category='Output',
                starttime=df_row['charttime'],
            )
        save_data_dsv(export_dir, stay_id, pd.DataFrame(dt.data))


# Apply func to every chunk produced by df_iter.
parallel_processing_ext(func, df_iter, num_entries, custom_icustays_list)
print("Added urine_output (chart) entries.")


Created dummy .dsv files.
Database: mimiciv
Username: mimiciv
>>>>> Connected to DB <<<<<
Getting urine_output data


44it [00:04, 10.13it/s]


Number of entries for urine_output : 3497267
Column names : ['stay_id', 'charttime', 'urineoutput']



100%|██████████| 44/44 [12:57<00:00, 17.67s/it]

Added urine_output (chart) entries.





## Urine Output Rate

In [34]:
# Urine output rate goes to its own '_uor' export directory, seeded with
# one empty data file per stay.
create_dummy_files(STRUCTURED_EXPORT_DIR+'_uor', custom_icustays_list)

# Notes carried over from the derived-table definition:
# -- attempt to calculate urine output per hour
# -- rate/hour is the interpretable measure of kidney function
# -- though it is difficult to estimate from aperiodic point measures
# -- first we get the earliest heart rate documented for the stay
id_mapping = {
    # 100013: 'uo', present in previous table.
    100014: 'urineoutput_6hr',  # output within 6hr (floor)
    100015: 'urineoutput_12hr',
    100016: 'urineoutput_24hr',
    100017: 'uo_mlkghr_6hr',  # (urineoutput_6hr/weight/uo_tm_6hr)
    100018: 'uo_mlkghr_12hr',
    100019: 'uo_mlkghr_24hr',
    100020: 'uo_tm_6hr',  # time from last uo measurement within 6hr (floor)
    100021: 'uo_tm_12hr',
    100022: 'uo_tm_24hr',
}
# create_mappings() rebinds `id_mapping`; from here on it is looked up by
# column name, as are the unit/range/category mappings.
(id_mapping,
 unit_mapping,
 low_mapping,
 high_mapping,
 cat_mapping) = create_mappings(id_mapping)

# ['stay_id', 'charttime', 'weight', 'uo', 'urineoutput_6hr', 'urineoutput_12hr', 'urineoutput_24hr', 'uo_mlkghr_6hr', 'uo_mlkghr_12hr', 'uo_mlkghr_24hr', 'uo_tm_6hr', 'uo_tm_12hr', 'uo_tm_24hr']
df_iter, num_entries = get_database_table_as_dataframe_ext(
    'derived', 'urine_output_rate')


def func(dfs, pid):
    """Append urine-output-rate values to each stay's '_uor' data file.

    Rows are processed stay by stay so that each stay file is loaded and
    saved once per call instead of once per row.

    NOTE(review): the recorded run of this cell aborted with
    ValueError('something wrong loading file :', <stay_id>). Possibly two
    workers read and wrote the same stay file concurrently — confirm how
    the parallel runner splits rows between workers.

    Args:
        dfs: Sequence whose first element is the DataFrame chunk to process.
        pid: Unused here; presumably the worker index supplied by the
            parallel runner.
    """
    df = dfs[0]
    dt = DataTable()
    export_dir = STRUCTURED_EXPORT_DIR+'_uor'
    # Columns with a uid mapping, resolved once instead of once per row.
    mapped_cols = [col for col in df.columns.tolist() if col in id_mapping]

    # sort=False keeps stays in first-seen order and rows in their original
    # order within a stay. groupby skips rows whose stay_id is missing.
    for stay_key, stay_rows in df.groupby('stay_id', sort=False):
        stay_id = int(stay_key)
        dt.data = load_data_dsv(export_dir, stay_id)
        for _, df_row in stay_rows.iterrows():
            for col in mapped_cols:
                dt.append(
                    uid=id_mapping[col],
                    value=df_row[col],
                    unit=unit_mapping[col],
                    lower_range=low_mapping[col],
                    upper_range=high_mapping[col],
                    category=cat_mapping[col],
                    starttime=df_row['charttime'],
                )
        save_data_dsv(export_dir, stay_id, pd.DataFrame(dt.data))


# Apply func to every chunk produced by df_iter.
parallel_processing_ext(func, df_iter, num_entries, custom_icustays_list)
print("Added urine_output_rate (derived) entries.")


Created dummy .dsv files.
Database: mimiciv
Username: mimiciv
>>>>> Connected to DB <<<<<
Getting urine_output_rate data


44it [00:25,  1.76it/s]


Number of entries for urine_output_rate : 3497266
Column names : ['stay_id', 'charttime', 'weight', 'uo', 'urineoutput_6hr', 'urineoutput_12hr', 'urineoutput_24hr', 'uo_mlkghr_6hr', 'uo_mlkghr_12hr', 'uo_mlkghr_24hr', 'uo_tm_6hr', 'uo_tm_12hr', 'uo_tm_24hr']



 41%|████      | 18/44 [15:24<22:15, 51.36s/it]


ValueError: ('something wrong loading file :', 34269559)

## Vent settings

In [28]:
# Ventilator settings go to their own '_vent' export directory, seeded with
# one empty data file per stay.
create_dummy_files(STRUCTURED_EXPORT_DIR+'_vent', custom_icustays_list)

id_mapping = {
    224688: 'respiratory_rate_set',
    224690: 'respiratory_rate_total',
    224689: 'respiratory_rate_spontaneous',
    224687: 'minute_volume',
    224684: 'tidal_volume_set',
    224685: 'tidal_volume_observed',
    224686: 'tidal_volume_spontaneous',
    224696: 'plateau_pressure',
    100023: 'peep',
    # 223835: 'fio2',  # same as fio2_chartevents
    223849: 'ventilator_mode',
    229314: 'ventilator_mode_hamilton',
    223848: 'ventilator_type',
}
# create_mappings() rebinds `id_mapping`; from here on it is looked up by
# column name, as are the unit/range/category mappings.
(id_mapping,
 unit_mapping,
 low_mapping,
 high_mapping,
 cat_mapping) = create_mappings(id_mapping)

# ['subject_id', 'stay_id', 'charttime', 'respiratory_rate_set', 'respiratory_rate_total', 'respiratory_rate_spontaneous', 'minute_volume', 'tidal_volume_set', 'tidal_volume_observed', 'tidal_volume_spontaneous', 'plateau_pressure', 'peep', 'fio2', 'ventilator_mode', 'ventilator_mode_hamilton', 'ventilator_type']
df_iter, num_entries = get_database_table_as_dataframe_ext(
    'derived', 'ventilator_setting')


def func(dfs, pid):
    """Append ventilator-setting values to each stay's '_vent' data file.

    Rows are processed stay by stay so that each stay file is loaded and
    saved once per call instead of once per row.

    Args:
        dfs: Sequence whose first element is the DataFrame chunk to process.
        pid: Unused here; presumably the worker index supplied by the
            parallel runner.
    """
    df = dfs[0]
    dt = DataTable()
    export_dir = STRUCTURED_EXPORT_DIR+'_vent'
    # Columns with a uid mapping, resolved once instead of once per row.
    mapped_cols = [col for col in df.columns.tolist() if col in id_mapping]

    # sort=False keeps stays in first-seen order and rows in their original
    # order within a stay. groupby skips rows whose stay_id is missing.
    for stay_key, stay_rows in df.groupby('stay_id', sort=False):
        stay_id = int(stay_key)
        dt.data = load_data_dsv(export_dir, stay_id)
        for _, df_row in stay_rows.iterrows():
            for col in mapped_cols:
                dt.append(
                    uid=id_mapping[col],
                    value=df_row[col],
                    unit=unit_mapping[col],
                    lower_range=low_mapping[col],
                    upper_range=high_mapping[col],
                    category=cat_mapping[col],
                    starttime=df_row['charttime'],
                )
        save_data_dsv(export_dir, stay_id, pd.DataFrame(dt.data))


# Apply func to every chunk produced by df_iter.
parallel_processing_ext(func, df_iter, num_entries, custom_icustays_list)
print("Added ventilator_setting (chart) entries.")


Created dummy .dsv files.
Database: mimiciv
Username: mimiciv
>>>>> Connected to DB <<<<<
Getting ventilator_setting data


14it [00:03,  3.65it/s]


Number of entries for ventilator_setting : 1067028
Column names : ['subject_id', 'stay_id', 'charttime', 'respiratory_rate_set', 'respiratory_rate_total', 'respiratory_rate_spontaneous', 'minute_volume', 'tidal_volume_set', 'tidal_volume_observed', 'tidal_volume_spontaneous', 'plateau_pressure', 'peep', 'fio2', 'ventilator_mode', 'ventilator_mode_hamilton', 'ventilator_type']



100%|██████████| 14/14 [09:30<00:00, 40.77s/it]

Added ventilator_setting (chart) entries.





## Vital Signs

In [13]:
# Vital signs go to their own '_vs' export directory, seeded with one empty
# data file per stay.
create_dummy_files(STRUCTURED_EXPORT_DIR+'_vs', custom_icustays_list)

id_mapping = {
    220045: 'heart_rate',
    100024: 'sbp',
    100025: 'dbp',
    100026: 'mbp',
    220179: 'sbp_ni',
    220180: 'dbp_ni',
    220181: 'mbp_ni',
    100027: 'resp_rate',
    100028: 'temperature',
    224642: 'temperature_site',
    220277: 'spo2',
    100029: 'glucose_chartevents',
}
# create_mappings() rebinds `id_mapping`; from here on it is looked up by
# column name, as are the unit/range/category mappings.
(id_mapping,
 unit_mapping,
 low_mapping,
 high_mapping,
 cat_mapping) = create_mappings(id_mapping)

# ['subject_id', 'stay_id', 'charttime', 'heart_rate', 'sbp', 'dbp', 'mbp', 'sbp_ni', 'dbp_ni', 'mbp_ni', 'resp_rate', 'temperature', 'temperature_site', 'spo2', 'glucose']
df_iter, num_entries = get_database_table_as_dataframe_ext(
    'derived', 'vitalsign', _chunk=100000)


def func(dfs, pid):
    """Append vital-sign chart values to each stay's '_vs' data file.

    Rows are processed stay by stay so that each stay file is loaded and
    saved once per call instead of once per row.

    Args:
        dfs: Sequence whose first element is the DataFrame chunk to process.
        pid: Unused here; presumably the worker index supplied by the
            parallel runner.
    """
    # The table's column is named 'glucose' while the mapping registers uid
    # 100029 under 'glucose_chartevents'; without this rename the column
    # never matches the mapping and glucose is silently left out.
    df = dfs[0].rename(columns={'glucose': 'glucose_chartevents'})
    dt = DataTable()
    export_dir = STRUCTURED_EXPORT_DIR+'_vs'
    # Columns with a uid mapping, resolved once instead of once per row.
    mapped_cols = [col for col in df.columns.tolist() if col in id_mapping]

    # sort=False keeps stays in first-seen order and rows in their original
    # order within a stay. groupby skips rows whose stay_id is missing.
    for stay_key, stay_rows in df.groupby('stay_id', sort=False):
        stay_id = int(stay_key)
        dt.data = load_data_dsv(export_dir, stay_id)
        for _, df_row in stay_rows.iterrows():
            for col in mapped_cols:
                dt.append(
                    uid=id_mapping[col],
                    value=df_row[col],
                    unit=unit_mapping[col],
                    lower_range=low_mapping[col],
                    upper_range=high_mapping[col],
                    category=cat_mapping[col],
                    starttime=df_row['charttime'],
                )
        save_data_dsv(export_dir, stay_id, pd.DataFrame(dt.data))


# Apply func to every chunk produced by df_iter.
parallel_processing_ext(func, df_iter, num_entries, custom_icustays_list)
print("Added vitalsign (chart) entries.")


Created dummy .dsv files.
Database: mimiciv
Username: mimiciv
>>>>> Connected to DB <<<<<
Getting vitalsign data


13it [00:33,  2.62s/it]


Number of entries for vitalsign : 10249430
Column names : ['subject_id', 'stay_id', 'charttime', 'heart_rate', 'sbp', 'dbp', 'mbp', 'sbp_ni', 'dbp_ni', 'mbp_ni', 'resp_rate', 'temperature', 'temperature_site', 'spo2', 'glucose']



  0%|          | 0/13 [00:00<?, ?it/s]

## Antibiotics

In [None]:
# (query_schema_core,
#  query_schema_hosp,
#  query_schema_icu,
#  query_schema_derived,
#  conn) = connect_db()

# # ['subject_id', 'hadm_id', 'stay_id', 'antibiotic', 'route', 'starttime', 'stoptime']
# df = get_database_table_as_dataframe(conn, query_schema_derived, 'antibiotic')
# df = df[df.stay_id.isin(custom_icustays_list)]


# def func(dfs, pid):

# 
# 

#     df = dfs[0]
#     it = InfoTable()
#     dt = DataTable()
#     for df_i in df.iterrows():
#         df_row = df_i[1]
#         dt.data = load_data_dsv(STRUCTURED_EXPORT_DIR, df_row['stay_id'])
#         dt.append(
#             uid=100030,
#             value=df_row['antibiotic'],
#             category=df_row['route'],
#             starttime=df_row['starttime'],
#             endtime=df_row['stoptime'],
#         )
#         save_data_dsv(STRUCTURED_EXPORT_DIR,
#                       df_row['stay_id'], pd.DataFrame(dt.data))


# dfs = split_df(df, MP_NUM_PROCESSES)
# parallel_processing(func, MP_NUM_PROCESSES, dfs)

# print("Added antibiotic (hosp.prescriptions) entries.")


## Medications

In [32]:
# Medications go to their own '_med' export directory, seeded with one
# empty data file per stay.
create_dummy_files(STRUCTURED_EXPORT_DIR+'_med', custom_icustays_list)

# inputevents item ids to export.
med_ids = [
    220995,  # Sodium Bicarbonate 8.4%
    221794,  # Furosemide (Lasix) **
    228340,  # Furosemide (Lasix) 250/50 **
    # 100037,  # Furosemide (Lasix)
    221986,  # Milrinone
    229068,  # Protamine sulfate
    229639,  # Bumetanide (Bumex)

    221653,  # Dobutamine
    221662,  # Dopamine
    221289,  # Epinephrine
    229617,  # Epinephrine. ~145 entries only
    # 100036,  # Epinephrine
    221906,  # Norepinephrine
    221749,  # Phenylephrine
    222315,  # Vasopressin
]
# Item ids that are merged under one custom uid; ids not listed here keep
# their own item id as uid.
id_mapping = {
    221794: 100037,
    228340: 100037,
    221289: 100036,
    229617: 100036,
}

(query_schema_core,
 query_schema_hosp,
 query_schema_icu,
 query_schema_derived,
 conn) = connect_db()

# One chunk holds 10000 rows per worker process.
med_chunk_size = 10000*MP_NUM_PROCESSES
df_iter, num_entries = get_database_table_as_dataframe(
    conn, query_schema_icu, 'inputevents',
    _filter_col='itemid',
    _filter_col_val=tuple(med_ids),
    _chunk_size=med_chunk_size)
# Convert the row count into the number of chunks, used as the tqdm total.
num_entries = math.ceil(num_entries / med_chunk_size)
# df = df[df.stay_id.isin(custom_icustays_list)]
# df = df.sort_values('stay_id')


def func(dfs, pid):
    """Append medication input events to each stay's '_med' data file.

    Rows are processed stay by stay so that each stay file is loaded and
    saved once per call instead of once per row.

    Args:
        dfs: Sequence whose first element is the DataFrame to process.
        pid: Unused here; presumably the worker index supplied by
            parallel_processing.
    """
    df = dfs[0]
    dt = DataTable()
    export_dir = STRUCTURED_EXPORT_DIR+'_med'

    # sort=False keeps stays in first-seen order and rows in their original
    # order within a stay. groupby skips rows whose stay_id is missing.
    for stay_key, stay_rows in df.groupby('stay_id', sort=False):
        stay_id = int(stay_key)
        dt.data = load_data_dsv(export_dir, stay_id)
        for _, df_row in stay_rows.iterrows():
            uid = df_row['itemid']
            dt.append(
                uid=id_mapping.get(uid, uid),
                value=df_row['amount'],
                unit=df_row['amountuom'],
                rate=df_row['rate'],
                rate_unit=df_row['rateuom'],
                category='Medication',
                starttime=df_row['starttime'],
                endtime=df_row['endtime'],
            )
        save_data_dsv(export_dir, stay_id, pd.DataFrame(dt.data))


for df in tqdm(df_iter, total=num_entries):
    # Keep only the exported stays, order the rows, then hand the chunk to
    # the workers.
    df = df[df.stay_id.isin(custom_icustays_list)]
    df = df.sort_values(by=['subject_id', 'hadm_id',
                        'stay_id', 'starttime', 'endtime'])
    dfs = split_df(df, MP_NUM_PROCESSES)
    parallel_processing(func, MP_NUM_PROCESSES, dfs)

print("Added (medication) entries.")


Created dummy .dsv files.
Database: mimiciv
Username: mimiciv
>>>>> Connected to DB <<<<<
Getting inputevents data


10it [00:08,  1.22it/s]


Number of entries for inputevents : 782373
Column names : ['subject_id', 'hadm_id', 'stay_id', 'starttime', 'endtime', 'storetime', 'itemid', 'amount', 'amountuom', 'rate', 'rateuom', 'orderid', 'linkorderid', 'ordercategoryname', 'secondaryordercategoryname', 'ordercomponenttypedescription', 'ordercategorydescription', 'patientweight', 'totalamount', 'totalamountuom', 'isopenbag', 'continueinnextdept', 'cancelreason', 'statusdescription', 'originalamount', 'originalrate']



100%|██████████| 10/10 [05:56<00:00, 35.70s/it]

Added (medication) entries.





## KDIGO

In [26]:
# KDIGO stages go to their own '_kdigo' export directory, seeded with one
# empty data file per stay.
create_dummy_files(STRUCTURED_EXPORT_DIR+'_kdigo', custom_icustays_list)

id_mapping = {
    100031: 'creat_low_past_48hr',
    100032: 'creat_low_past_7day',
    100033: 'aki_stage_creat',
    100034: 'aki_stage_uo',
    100035: 'aki_stage',
}
# create_mappings() rebinds `id_mapping`; from here on it is looked up by
# column name, as are the unit/range/category mappings.
(id_mapping,
 unit_mapping,
 low_mapping,
 high_mapping,
 cat_mapping) = create_mappings(id_mapping)

# ['subject_id', 'hadm_id', 'stay_id', 'charttime', 'creat_low_past_7day', 'creat_low_past_48hr', 'creat', 'aki_stage_creat', 'uo_rt_6hr', 'uo_rt_12hr', 'uo_rt_24hr', 'aki_stage_uo', 'aki_stage']
df_iter, num_entries = get_database_table_as_dataframe_ext(
    'derived', 'kdigo_stages')


def func(dfs, pid):
    """Append KDIGO stage values to each stay's '_kdigo' data file.

    Rows are processed stay by stay so that each stay file is loaded and
    saved once per call instead of once per row.

    Args:
        dfs: Sequence whose first element is the DataFrame chunk to process.
        pid: Unused here; presumably the worker index supplied by the
            parallel runner.
    """
    df = dfs[0]
    dt = DataTable()
    export_dir = STRUCTURED_EXPORT_DIR+'_kdigo'
    # The first four columns are the id/time columns; of the rest, only
    # those with a uid mapping are exported. Resolved once per call.
    mapped_cols = [col for col in df.columns.tolist()[4:]
                   if col in id_mapping]

    # sort=False keeps stays in first-seen order and rows in their original
    # order within a stay. groupby skips rows whose stay_id is missing.
    for stay_key, stay_rows in df.groupby('stay_id', sort=False):
        stay_id = int(stay_key)
        dt.data = load_data_dsv(export_dir, stay_id)
        for _, df_row in stay_rows.iterrows():
            for col in mapped_cols:
                dt.append(
                    uid=id_mapping[col],
                    value=df_row[col],
                    unit=unit_mapping[col],
                    lower_range=low_mapping[col],
                    upper_range=high_mapping[col],
                    category=cat_mapping[col],
                    starttime=df_row['charttime'],
                )
        save_data_dsv(export_dir, stay_id, pd.DataFrame(dt.data))


# Apply func to every chunk produced by df_iter.
parallel_processing_ext(func, df_iter, num_entries, custom_icustays_list)
print("Added kdigo_stages (derived) entries.")


Created dummy .dsv files.
Database: mimiciv
Username: mimiciv
>>>>> Connected to DB <<<<<
Getting kdigo_stages data


52it [00:17,  2.91it/s]


Number of entries for kdigo_stages : 4111003
Column names : ['subject_id', 'hadm_id', 'stay_id', 'charttime', 'creat_low_past_7day', 'creat_low_past_48hr', 'creat', 'aki_stage_creat', 'uo_rt_6hr', 'uo_rt_12hr', 'uo_rt_24hr', 'aki_stage_uo', 'aki_stage']



100%|██████████| 52/52 [26:03<00:00, 30.06s/it]

Added kdigo_stages (derived) entries.





In [None]:
def func(files, pid=None):
    """Merge the per-topic export files into the main data file of each stay.

    For every file name, the main file and its counterparts in the
    '_kdigo', '_med', '_uo', '_uor', '_vent' and '_vs' directories are
    concatenated, sorted by start time and uid, and written back over the
    main file. Running this twice appends the side files a second time.

    Args:
        files: File names (relative to the export directories) to merge.
        pid: Unused. The other worker functions in this notebook that are
            passed to parallel_processing take a trailing `pid` argument;
            it is accepted here (with a default) so this worker can be
            called the same way.
    """
    suffixes = ('_kdigo', '_med', '_uo', '_uor', '_vent', '_vs')
    sort_list = ['starttime', 'uid']

    for f in files:
        main_path = os.path.join(STRUCTURED_EXPORT_DIR, f)
        # Main file first, then the side files in a fixed order.
        frames = [pd.read_csv(main_path, sep='$')]
        for suffix in suffixes:
            path = os.path.join(STRUCTURED_EXPORT_DIR+suffix, f)
            frames.append(pd.read_csv(path, sep='$'))

        df = pd.concat(frames).sort_values(by=sort_list)
        df.to_csv(main_path, na_rep='', sep='$', index=False)


# Merge every data file found in the main export directory.
data_files = [i for i in os.listdir(STRUCTURED_EXPORT_DIR) if 'data' in i]
parallel_processing(func, MP_NUM_PROCESSES, data_files)


# Sanity Check

In [None]:
# Re-export complete_blood_count into a separate testing directory so the
# result can be compared against the main export.
create_dummy_files(STRUCTURED_EXPORT_DIR_TESTING, custom_icustays_list)

id_mapping = {
    551221: 'hematocrit',
    551222: 'hemoglobin',
    551248: 'mch',
    551249: 'mchc',
    551250: 'mcv',
    551265: 'platelet',
    551279: 'rbc',
    551277: 'rdw',
    552159: 'rdwsd',
    # 551301: 'wbc', # present in blood_differential
}
# create_mappings() rebinds `id_mapping`; from here on it is looked up by
# column name (see `id_mapping[col]` in func).
(id_mapping,
 unit_mapping,
 low_mapping,
 high_mapping,
 cat_mapping) = create_mappings(id_mapping)

# ['subject_id', 'hadm_id', 'charttime', 'specimen_id', 'hematocrit', 'hemoglobin', 'mch', 'mchc', 'mcv', 'platelet', 'rbc', 'rdw', 'rdwsd', 'wbc']
df_iter, num_entries = get_database_table_as_dataframe_ext(
    'derived', 'complete_blood_count')


def func(dfs, pid):
    """Append complete-blood-count values to each stay's testing data file.

    Stay info is read from the main export directory, while the data file
    is read from and written to STRUCTURED_EXPORT_DIR_TESTING. A row is
    exported to a stay only when its subject/admission is present in
    `custom_icustays_dict` and its `charttime` lies between the two
    timestamps read from positions 13 and 14 of the stay's info record.

    Args:
        dfs: Sequence whose first element is the DataFrame chunk to process.
        pid: Unused here; presumably the worker index supplied by the
            parallel runner.
    """
    df = dfs[0]
    it = InfoTable()
    dt = DataTable()
    # Columns with a uid mapping, resolved once instead of once per row.
    mapped_cols = [col for col in df.columns.tolist() if col in id_mapping]
    # stay_id -> (icu_intime, icu_outtime). This function never writes the
    # info files, so each one only needs to be read once per call.
    stay_windows = {}

    for _, df_row in df.iterrows():
        subject_id = df_row['subject_id']
        hadm_id = df_row['hadm_id']
        if subject_id not in custom_icustays_dict:
            continue
        if hadm_id not in custom_icustays_dict[subject_id]:
            continue

        charttime = df_row['charttime']
        for stay_id in custom_icustays_dict[subject_id][hadm_id]:
            if stay_id not in stay_windows:
                it.data = load_info_dsv(STRUCTURED_EXPORT_DIR, stay_id)
                stay_windows[stay_id] = (it.data[13], it.data[14])
            icu_intime, icu_outtime = stay_windows[stay_id]

            if not (icu_intime <= charttime <= icu_outtime):
                continue

            # The data file is only read once the row is known to belong to
            # this stay.
            dt.data = load_data_dsv(STRUCTURED_EXPORT_DIR_TESTING, stay_id)
            for col in mapped_cols:
                dt.append(
                    uid=id_mapping[col],
                    value=df_row[col],
                    unit=df_row[col+'_unit'],
                    lower_range=df_row[col+'_lower'],
                    upper_range=df_row[col+'_upper'],
                    category=cat_mapping[col],
                    specimen_id=df_row['specimen_id'],
                    starttime=charttime,
                )
            save_data_dsv(STRUCTURED_EXPORT_DIR_TESTING,
                          stay_id, pd.DataFrame(dt.data))


# Apply func to every chunk produced by df_iter.
parallel_processing_ext(func, df_iter, num_entries, custom_icustays_list)
print("Added complete_blood_count (lab) entries.")


# Data files of the main export and of the testing re-export.
data_files1 = [i for i in os.listdir(STRUCTURED_EXPORT_DIR) if 'data' in i]
data_files2 = [i for i in os.listdir(
    STRUCTURED_EXPORT_DIR_TESTING) if 'data' in i]


def func(data_files1, data_files2, pid):
    """Check that main and testing exports hold the same entries per uid.

    The two file lists are sorted and paired up; each pair must carry the
    same file name. For every uid in `id_mapping`, the 'uid' entries
    selected from both files must be equal, i.e. both files must contain
    the same number of entries for that uid.

    Args:
        data_files1: File names in STRUCTURED_EXPORT_DIR.
        data_files2: File names in STRUCTURED_EXPORT_DIR_TESTING.
        pid: Unused here; presumably the worker index supplied by
            parallel_processing.

    Raises:
        AssertionError: If a file pair differs in name or in the entries
            found for one of the checked uids.
    """

    def _load_columns(path):
        # Column name -> numpy array; empty columns get an int dtype.
        columns = pd.read_csv(path, sep='$').to_dict('list')
        return {k: np.array(v) if len(v) > 0 else np.array([], dtype=int)
                for k, v in columns.items()}

    for f1, f2 in tqdm(zip(sorted(data_files1), sorted(data_files2)),
                       total=len(data_files1)):

        assert f1 == f2, f"{f1} {f2}"
        data1 = _load_columns(os.path.join(STRUCTURED_EXPORT_DIR, f1))
        data2 = _load_columns(
            os.path.join(STRUCTURED_EXPORT_DIR_TESTING, f2))

        uids1 = data1['uid']
        uids2 = data2['uid']
        for k in id_mapping.values():
            found1 = uids1[uids1 == k]
            found2 = uids2[uids2 == k]
            if found1.size > 0 or found2.size > 0:
                # `assert a == b` on arrays is ambiguous for more than one
                # element and breaks on differing lengths; array_equal
                # returns a single bool in every case.
                assert np.array_equal(found1, found2), \
                    f"{f1}: uid {k} has {found1.size} vs {found2.size} entries"


parallel_processing(func, MP_NUM_PROCESSES, data_files1, data_files2)


def func(files, pid):
    """Delete the given files from the testing export directory.

    Args:
        files: File names inside STRUCTURED_EXPORT_DIR_TESTING to remove.
        pid: Unused here; presumably the worker index supplied by
            parallel_processing.
    """
    for file_name in tqdm(files):
        target = os.path.join(STRUCTURED_EXPORT_DIR_TESTING, file_name)
        os.remove(target)


# Remove the testing files, then the directory itself.
parallel_processing(func, MP_NUM_PROCESSES, data_files2)
os.rmdir(STRUCTURED_EXPORT_DIR_TESTING)

print("Checked complete_blood_count (lab) entries.")
