In [99]:

# Import Libraries
import json
import os


import pandas as pd
import numpy as np
from tqdm import tqdm
tqdm.pandas()

import src.data_processing.MIMIC.test_functions as tests

# LOAD CONFIGURATION 
with open("src/data_processing/MIMIC/MIMIC_PROCESSING_DEFAULT_VARS.json", "r") as f:
    DEFAULT_CONFIG = json.load(f)
    f.close()

if not os.path.exists(DEFAULT_CONFIG["SAVE_FD"]):
    os.makedirs(DEFAULT_CONFIG["SAVE_FD"])

In [2]:
try:
    assert os.path.exists(DEFAULT_CONFIG["SAVE_FD"] + "admissions_intermediate.csv")
    assert os.path.exists(DEFAULT_CONFIG["SAVE_FD"] + "vitals_intermediate.csv")

except AssertionError as e:
    raise e

In [3]:
# Print Information
print("\n\n ======== PROCESSING OUTCOMES ======== \n\n")

# Load previously processed data
adm_proc = pd.read_csv(
    DEFAULT_CONFIG["SAVE_FD"] + "admissions_intermediate.csv",
    index_col=0, 
    header=0, 
    parse_dates=["intime", "outtime", "intime_next", "outtime_next", "deathtime"]
)
vit_proc = (
    pd.read_csv(
    DEFAULT_CONFIG["SAVE_FD"] + "vitals_intermediate.csv", 
    index_col=0, 
    header=0, 
    parse_dates=DEFAULT_CONFIG["VITALS_TIME_VARS"]
    )
    .reset_index(drop=False)
    .assign(sampled_time_to_end=lambda x: pd.to_timedelta(x["sampled_time_to_end"]))  # pd does not load timedelta automatically
)


# Check correct computation of admissions and vitals
tests.test_admissions_processed_correctly(adm_proc)
tests.test_vitals_processed_correctly(vit_proc, config_dic=DEFAULT_CONFIG)






Testing admissions processed correctly...

Testing outtime is after intime.
Test passed!

Testing next transfer information is consistent.
Test passed!

Testing admission times are before death (if exists).
Test passed!

Testing ids are unique for params ('subject_id', 'hadm_id', 'stay_id', 'transfer_id_next')
Test passed for variable  subject_id!
Test passed for variable  hadm_id!
Test passed for variable  stay_id!
Test passed for variable  transfer_id_next!

Testing ids are complete for params ('subject_id', 'stay_id', 'intime', 'outtime')
Test passed for variable subject_id!
Test passed for variable stay_id!
Test passed for variable intime!
Test passed for variable outtime!
Test passed!
Admissions correctly computed! Safe to go ahead.

Testing vitals were processed correctly and make sense.

Testing ids are complete for params ('subject_id', 'stay_id', 'sampled_time_to_end')
Test passed for variable subject_id!
Test passed for variable stay_id!
Test passed for variable sampled_

100%|██████████| 8364/8364 [01:30<00:00, 91.92it/s] 


Test passed!

Testing resampling data is linear from min to max per patient.


100%|██████████| 8364/8364 [00:06<00:00, 1194.93it/s]

Test passed!
Vitals seem correctly processed!





In [4]:
# Load core info
transfers_core = pd.read_csv(
    DEFAULT_CONFIG["DATA_FD"] + "core/transfers.csv", 
    index_col=None, 
    header=0, 
    parse_dates=["intime", "outtime"]
)
admissions_core = pd.read_csv(
    DEFAULT_CONFIG["DATA_FD"] + "core/admissions.csv",
    index_col=None,
    header=0,
    parse_dates=["admittime", "dischtime", "deathtime", "edregtime", "edouttime"]
)



In [5]:

"""
Step 1: Subset the set of transfers/admissions_core to the already processed cohort.

We do this by merging. 
"""

# Define Id for merging. We separate deathtime as one database registers only date, while the other
# registers everyting (i.e. up to second)
# tr_merge_ids = [
#     col for 
#     col in vit_proc.columns.tolist() if
#     col in transfers_core.columns.tolist() and
#     "death" not in col
# ]
hadm_merge_ids = [
    col for
    col in vit_proc.columns.tolist() if
    col in admissions_core.columns.tolist() and
    "death" not in col
]
merge_ids = ["subject_id", "hadm_id", "stay_id"]         # Useful simplication

# # Inner merge for transfers core
# transfers_S1 = (
#     transfers_core
#     .merge(
#         vit_proc.drop_duplicates(subset=merge_ids),   # Drop duplicates as we don't need all the rows
#         how="inner",
#         on=tr_merge_ids
#     )
#     .dropna(subset=["hadm_id"])                 # Drop rows with no hadm_id as we can't compare with transfers
#     .sort_values(by=merge_ids, ascending=True) # Sort by subject_id and stay_id
# )

# Inner merge for admissions core
admissions_S1 = (
    admissions_core
    .merge(
        vit_proc.drop_duplicates(subset=merge_ids), # only want one obvs per admission for merging
        how="inner",
        on=hadm_merge_ids,
        suffixes=("", "_ed")
    )
    .dropna(subset=["hadm_id"])            # Drop rows with no hadm_id as we can't compare with transfers
    .sort_values(by=merge_ids, ascending=True) # Sort by subject_id and stay_id
)

# Testing and save
# tests.test_ids_subset_of_cohort(transfers_S1, vit_proc, *merge_ids)
tests.test_ids_subset_of_cohort(admissions_S1, vit_proc, *merge_ids)
# tests.test_is_complete_ids(transfers_S1, *merge_ids, "stay_id")
tests.test_is_complete_ids(admissions_S1, *merge_ids, "stay_id")

# Check processing and correctdeness
# transfers_S1.to_csv(DEFAULT_CONFIG["SAVE_FD"] + "transfers_S1.csv", header=True, index=True)



Testing ('subject_id', 'hadm_id', 'stay_id') are subset of cohort data.
Test passed!

Testing ids are complete for params ('subject_id', 'hadm_id', 'stay_id', 'stay_id')
Test passed for variable subject_id!
Test passed for variable hadm_id!
Test passed for variable stay_id!
Test passed for variable stay_id!


In [6]:
admissions_S2 = (
    admissions_S1
    .query("intime <= admittime")                            # admissions to hospital after ED admissions
    .query("intime_next >= admittime | intime_next.isna()")  # admissions to hospital before next ED transfer
    .query("outtime <= edouttime")                           # transfer outtime before ed exit time
    .query("intime <= edregtime")                            # transfer intmie before ed registration time
    .query("dischtime - outtime_next >= @pd.Timedelta('-6h') | outtime_next.isna()")
    # discharge time not earlier than outtime_next (added -6 hours due to some potential delays)
    .query("deathtime <= dischtime | deathtime.isna()")
)

# First subset Transfers
tr_merge_ids = ["subject_id", "hadm_id", "stay_id", 
                "outtime", "deathtime", "intime_next", "outtime_next",
                "dischtime", "discharge_location"]
transfers_S1 = (
    transfers_core
    .merge(
        admissions_S2[tr_merge_ids],
        how="inner",
        on=["subject_id", "hadm_id"],
        suffixes=("", "_ed")
    )
    .sort_values(by=["subject_id", "stay_id"], ascending=True)
)

# Run tests
tests.test_is_complete_ids(transfers_S1, "subject_id", "hadm_id")
tests.test_outtimes_match(transfers_S1)
tests.test_every_patient_has_discharge_transfer(transfers_S1)



Testing ids are complete for params ('subject_id', 'hadm_id')
Test passed for variable subject_id!
Test passed for variable hadm_id!

Testing whether outtimes match when available.


100%|██████████| 8134/8134 [00:05<00:00, 1510.39it/s]


Test passed!

Testing whether every patient has exactly one 'discharge' transfer which is also the last transfer.
Test passed!


In [82]:

def get_first_death_time(df):
    """
    Given a list of transfers which includes information about the patient hospital admission and other information, get
    the time of death for the patient. This function exists for standardization.
    """

    # For each stay id (groupby), access the deathtime and compute the minimum if available
    earliest_deathtime = df.groupby("stay_id").deathtime.nth(0)    # Get the first row of deathtime (all rows have the same value)

    return earliest_deathtime


def get_first_icu_time(df):
    """
    Given a list of transfers which includes information about the patient hospital admission and other information, get
    the time of the first ICU entry for the patient if it exists.
    """

    cond1 = "careunit.str.contains('(?i)ICU', na=False, case=False)"

    # For each stay id (groupby), identify the transfers to ICU wards, and compute the entry time if available
    earliest_icu_time = (
        df
        .groupby("stay_id")
        .progress_apply(lambda x: (
            x
            # Careunit has ICU in name
            .query("""careunit.str.contains('(?i)ICU', na=False, case=False) | \
                    careunit.str.contains('(?i)Neuro Stepdown', na=False, case=False)""")
            # Get transfer entry time
            .intime
            # Get minimum of all ICU entries
            .min()
            )
        )
    )

    return earliest_icu_time


def get_first_discharge_time(df):
    """
    Given a list of transfers which includes information about the patient hospital admission and other information, get
    the time of discharge for the patient if it exists.

    Args:
        df (pd.DataFrame): Dataframe with transfers information.
    """

    # For each stay id (groupby), identify the discharge transfer, and compute the time if the location is not 'DIED'
    earliest_discharge_time = (
        df
        .groupby("stay_id")
        .progress_apply(lambda x: (
            x
            # Remove any transfers for death events
            .query("~ eventtype.str.contains('(?i)DIED', na=False, case=False)")
            # Within remaining transfers, get the discharge transfer
            .query("eventtype == 'discharge'")
            .squeeze()                     # Convert to pd.Series, we know there is exactly one discharge eventtype
            .dischtime                     # Get the discharge time
            )
        )
    )

    return earliest_discharge_time


def get_first_ward_time(df):
    """
    Given a list of transfers which includes information about the patient hospital admission and other information, get
    the time of the first transfer to a medical ward for the patient if it exists. This function exists for standardization.

    Args:
        df (pd.DataFrame): Dataframe with transfers information.
    """
    earliest_ward_time = df.groupby("stay_id").intime_next.nth(0) # Get the first row of intime_next (all rows have the same value)

    return earliest_ward_time


def compute_outcomes_from_events(df: pd.DataFrame, time_window: pd.Timedelta):
    """
    Given a dataframe with time information for each event, and the time window from outtime, compute the relevant
    outcome.

    This is an iterative process: first check if death, then ICU, then discharge, and finally discharge.

    Params:
        df (pd.DataFrame): Dataframe with time information for each event (and 'outtime', as well).
        time_window (pd.Timedelta): Time window from outtime to compute outcome.
    """

    # Check for death
    outcomes = (
        df
        .progress_apply(lambda x:
            "Death" if x.first_death <= x.outtime + time_window else (
                "ICU" if x.first_icu <= x.outtime + time_window else (
                    "Discharge" if x.first_discharge <= x.outtime + time_window else (
                        "Ward" 
                    )
                )
            ),
            axis=1
        )
    )

    # Check outcome is well-defined (i.e. the above returns exactly one outcome)
    # assert outcomes.isin(["Death", "ICU", "Discharge", "Ward"]).all()

    return outcomes



In [57]:

# First subset Transfers
tr_merge_ids = ["subject_id", "hadm_id", "stay_id",
                "outtime", "deathtime", "intime_next", "outtime_next",
                "dischtime", "discharge_location"]
transfers_S1 = (
    transfers_core
    .merge(
        admissions_S2[tr_merge_ids],
        how="inner",
        on=["subject_id", "hadm_id"],
        suffixes=("", "_ed")
    )
    .sort_values(by=["subject_id", "stay_id"], ascending=True)
)

# Run tests
tests.test_is_complete_ids(transfers_S1, "subject_id", "hadm_id")
tests.test_outtimes_match(transfers_S1)
tests.test_every_patient_has_discharge_transfer(transfers_S1)

# Now compute the earliest time given the list of transfers.
earliest_outcome_times = (
    admissions_S1
    .set_index("stay_id")              # Set index to stay_id to match the below
    .assign(first_death=get_first_death_time(transfers_S1))  # Compute first death time
    .assign(first_icu=get_first_icu_time(transfers_S1)) # Compute first icu time
    .assign(first_ward=get_first_ward_time(transfers_S1)) # Compute first ward time
    .assign(first_discharge=get_first_discharge_time(transfers_S1)) # Compute first discharge time
    .loc[:, ["first_death", "first_icu", "first_ward", "first_discharge",
            "outtime", "discharge_location", "subject_id"]]
    .query("first_death.ge(outtime) | first_death.isna()")    # Remove weird patients with death > outtime
    .query("first_icu.ge(outtime) | first_icu.isna()")       # Remove weird patients with icu > outtime
    .query("first_ward.ge(outtime) | first_ward.isna()")    # Remove weird patients with ward > outtime
    .query("first_discharge.ge(outtime) | first_discharge.isna()")   # Remove weird patients with discharge > outtime (difference is usually writing down error)
)

# Testing for computed outcomes
tests.test_events_after_outtime(earliest_outcome_times)


Testing ids are complete for params ('subject_id', 'hadm_id')
Test passed for variable subject_id!
Test passed for variable hadm_id!

Testing whether outtimes match when available.


100%|██████████| 8134/8134 [00:10<00:00, 770.05it/s] 


Test passed!

Testing whether every patient has exactly one 'discharge' transfer which is also the last transfer.
Test passed!


100%|██████████| 8134/8134 [01:28<00:00, 91.63it/s] 
100%|██████████| 8134/8134 [02:26<00:00, 55.65it/s] 


Testing whether events occur after ED outtime.
Test passed for feature first_death!
Test passed for feature first_icu!
Test passed for feature first_ward!
Test passed for feature first_discharge!





In [108]:
hours_outcome_window = DEFAULT_CONFIG["OUTCOME_WINDOW"]

# Load TIme window
day_delta = pd.Timedelta(hours=hours_outcome_window)

# Compute outcomes
cat_outcomes = compute_outcomes_from_events(earliest_outcome_times, time_window=day_delta)

# Convert to one hot encoding
oh_outcomes = pd.get_dummies(cat_outcomes.squeeze()).sort_index().reset_index(drop=False)

# Subset admissions and vitals
admissions_final = adm_proc.query("stay_id.isin(@oh_outcomes.stay_id)").sort_values("stay_id")
vitals_final = vit_proc.query("stay_id.isin(@oh_outcomes.stay_id)").sort_values("stay_id")

# Runt tests
tests.test_data_processed_correctly(admissions_final, vitals_final, oh_outcomes)


# Print Base Information
print(f"Number of cohort patient: {admissions_final.stay_id.nunique()}")
print(f"Number of observations: {vitals_final.shape[0]}")
print(f"Sample outcome distribution: \n{oh_outcomes.sum(axis=0).iloc[1:]}")

100%|██████████| 8328/8328 [00:01<00:00, 5033.48it/s]



Testing whether data was processed correctly.

Testing ids are complete for params ('subject_id', 'stay_id', 'hadm_id')
Test passed for variable subject_id!
Test passed for variable stay_id!
Test passed for variable hadm_id!

Testing ids are complete for params ('subject_id', 'stay_id', 'hadm_id')
Test passed for variable subject_id!
Test passed for variable stay_id!
Test passed for variable hadm_id!

Testing ids are complete for params ('stay_id',)
Test passed for variable stay_id!

Testing ids are unique for params ('subject_id', 'stay_id', 'hadm_id')
Test passed for variable  subject_id!
Test passed for variable  stay_id!
Test passed for variable  hadm_id!

Testing ids are unique for params ('stay_id',)
Test passed for variable  stay_id!
Test passed!
Number of cohort patient: 8328
Number of observations: 53430
Sample outcome distribution: 
stay_id      291433051618
Death                  25
Discharge            2860
ICU                  1316
Ward                 4127
dtype: int64


In [107]:
reload(tests)

<module 'src.data_processing.MIMIC.test_functions' from 'c:\\Users\\hruia\\PycharmProjects\\VarPhenClustering\\src\\data_processing\\MIMIC\\test_functions.py'>

In [104]:
vitals_final.hadm_id.unique()

array([20568985., 28717529., 23913569., ..., 22342750., 23005038.,
       20790981.])