# eICU Data Joining
---

Reading and joining all parts of MIT's eICU dataset, which contains data from over 139k patients collected in the US.

The main goal of this notebook is to prepare a single CSV file containing all the data relevant for training a machine learning model that predicts mortality. This involves joining tables, filtering out unneeded columns and performing imputation.

## Importing the necessary packages

In [None]:
import dask.dataframe as dd                # Dask to handle big data in dataframes
import pandas as pd                        # Pandas to load the data initially
from dask.distributed import Client        # Dask scheduler
from dask.diagnostics import ProgressBar   # Dask progress bar
import re                                  # re to do regex searches in string data
import os                                  # os handles directory/workspace changes
import numpy as np                         # NumPy to handle numeric and NaN operations
from tqdm import tqdm_notebook             # tqdm allows to track code execution progress
import numbers                             # numbers allows to check if data is numeric
import utils                               # Contains auxiliary functions

In [None]:
# Debugging packages
import pixiedust                           # Debugging in Jupyter Notebook cells

In [None]:
# Move three directory levels up, presumably to the folder that contains "Documents"
os.chdir("../../..")

# Path to the CSV dataset files
data_path = 'Documents/Datasets/Thesis/eICU/uncompressed/'
project_path = 'Documents/GitHub/eICU-mortality-prediction/'

In [None]:
# Set up local cluster
client = Client("tcp://127.0.0.1:56898")
client

In [None]:
# Upload the auxiliary modules, so that the Dask workers have access to their functions
client.upload_file(f'{project_path}NeuralNetwork.py')
client.upload_file(f'{project_path}utils.py')

In [None]:
print(f'{project_path}utils.py')

In [None]:
client.run(os.getcwd)

## Initialize variables

In [None]:
cat_feat = []                              # List of categorical features
cat_embed_feat = []                        # List of categorical features that will be embedded
cat_embed_feat_enum = dict()               # Dictionary of the enumerations of the categorical features that will be embedded

## Patient data

### Read the data

In [None]:
patient_df = dd.read_csv(f'{data_path}original/patient.csv')
patient_df.head()

In [None]:
patient_df = patient_df.repartition(npartitions=30)

In [None]:
patient_df.npartitions

Get an overview of the dataframe through the `describe` method:

In [None]:
patient_df.describe().compute().transpose()

In [None]:
patient_df.visualize()

In [None]:
patient_df.columns

In [None]:
patient_df.dtypes

### Check for missing values

In [None]:
utils.dataframe_missing_values(patient_df)

### Remove unneeded features

In [None]:
patient_df = patient_df[['patientunitstayid', 'gender', 'age', 'ethnicity', 'apacheadmissiondx',  'admissionheight', 
                         'hospitaldischargeoffset', 'hospitaldischargelocation', 'hospitaldischargestatus', 
                         'admissionweight', 'dischargeweight', 'unitdischargeoffset']]
patient_df.head()

### Make the age feature numeric

In the eICU dataset, ages above 89 years are not specified; instead, they are given as "> 89". In order to work with the age feature numerically, we replace the "> 89" values with 90, as if the patient were 90 years old. This won't always be accurate, but the difference should be small and is unlikely to significantly affect the model's logic.
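The same replacement can be sketched in plain pandas. `make_age_numeric` and its `cap` parameter are hypothetical names, and the sketch additionally assumes that empty or otherwise unparseable ages should become NaN rather than raise an error.

```python
import pandas as pd

def make_age_numeric(age: pd.Series, cap: float = 90.0) -> pd.Series:
    """Map the censored "> 89" label to `cap` and convert the column to float."""
    # Replace the censoring label while the column is still of dtype object
    age = age.replace("> 89", str(cap))
    # Unparseable values (e.g. empty strings) become NaN instead of raising
    return pd.to_numeric(age, errors="coerce").astype(float)

ages = pd.Series(["45", "> 89", "72", None])
print(make_age_numeric(ages).tolist())
```

`errors="coerce"` is convenient but silently hides unexpected strings, so it may be worth comparing the NaN count before and after the conversion.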

In [None]:
patient_df.age.value_counts().head()

In [None]:
# Replace the "> 89" years old indication with 90 years
patient_df.age = patient_df.age.replace(to_replace='> 89', value=90)

In [None]:
patient_df.age.value_counts().head()

In [None]:
# Make the age feature numeric
patient_df.age = patient_df.age.astype(float)

In [None]:
patient_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
patient_df = client.persist(patient_df)

In [None]:
patient_df.visualize()

### Discretize categorical features

Convert binary categorical features into numeric values, one-hot encode features with a small number of categories (in this case, up to 5) and enumerate categorical features with many categories, which will be embedded.
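The three-way rule can be sketched as a small pandas helper, assuming the threshold of 5 categories from the text. `choose_encoding` and `one_hot` are hypothetical names; the notebook itself applies these steps to Dask dataframes, column by column.

```python
import pandas as pd

def choose_encoding(series: pd.Series, max_one_hot: int = 5) -> str:
    """Pick an encoding strategy from the number of distinct non-missing categories."""
    n_categories = series.nunique(dropna=True)
    if n_categories <= 2:
        return "binary"   # e.g. gender -> 0/1
    if n_categories <= max_one_hot:
        return "one_hot"  # a handful of dummy columns stays manageable
    return "embed"        # enumerate now, learn an embedding in the model

def one_hot(df: pd.DataFrame, column: str) -> pd.DataFrame:
    # One dummy column per category; missing values get all zeros
    return pd.get_dummies(df, columns=[column], prefix=column, dtype=int)
```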

#### Convert binary categorical features into numeric

In [None]:
patient_df.gender.value_counts().compute()

In [None]:
patient_df.gender = patient_df.gender.map(lambda x: 1 if x == 'Male' else 0 if x == 'Female' else np.nan)

In [None]:
patient_df.gender.value_counts().compute()

#### Separate and prepare features for embedding

Identify categorical features that have more than 5 unique categories, which will go through an embedding layer afterwards, and enumerate them.
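`utils.enum_categorical_feature` returns both the enumerated column and the enumeration itself, which is stored in `cat_embed_feat_enum`. Its implementation isn't shown here; a pandas sketch of the same idea, assuming code 0 is reserved for missing values (a common convention, e.g. as the padding index of an embedding layer), could look like this. `enumerate_categories` is a hypothetical name.

```python
import pandas as pd

def enumerate_categories(series: pd.Series):
    """Map each category to an integer code, reserving 0 for missing values."""
    categories = sorted(series.dropna().unique())
    # Known categories start at 1, so 0 stays free for "missing"
    mapping = {category: code for code, category in enumerate(categories, start=1)}
    codes = series.map(mapping).fillna(0).astype(int)
    return codes, mapping
```

Keeping the mapping makes it possible to decode the codes later or to apply the same enumeration to new data.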

[TODO] Only enumerate the `apacheadmissiondx` feature after joining it with all the remaining diagnosis features

Update list of categorical features and add those that will need embedding (features with more than 5 unique values):

In [None]:
new_cat_feat = ['ethnicity', 'apacheadmissiondx']
cat_feat.extend(new_cat_feat)

In [None]:
cat_feat_nunique = [patient_df[feature].nunique().compute() for feature in new_cat_feat]
cat_feat_nunique

In [None]:
new_cat_embed_feat = []
for i in range(len(new_cat_feat)):
    if cat_feat_nunique[i] > 5:
        # Add feature to the list of those that will be embedded
        cat_embed_feat.append(new_cat_feat[i])
        # Add feature to the list of the new ones (from the current table) that will be embedded
        new_cat_embed_feat.append(new_cat_feat[i])

In [None]:
patient_df[new_cat_feat].head()

In [None]:
for i in range(len(new_cat_embed_feat)):
    feature = new_cat_embed_feat[i]
    # Prepare for embedding, i.e. enumerate categories
    patient_df[feature], cat_embed_feat_enum[feature] = utils.enum_categorical_feature(patient_df, feature)

In [None]:
patient_df[new_cat_feat].head()

In [None]:
cat_embed_feat_enum

In [None]:
patient_df[cat_feat].dtypes

In [None]:
patient_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
patient_df = client.persist(patient_df)

In [None]:
patient_df.visualize()

### Create mortality label

Combine the information from the discharge location and the discharge status. The hospital discharge data is used, instead of the unit discharge data, as it gives a longer-term perspective on the patient's status. I then save a feature called `deathoffset`, which holds the hospital discharge offset if the patient died by hospital discharge, or NaN if the patient is alive or their status is unknown (presumed alive). Based on this, a label can be created later on, once all the tables are combined into a single dataframe, indicating whether a patient dies within the following X time, according to how far ahead we want to predict.
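A minimal sketch of how such a label could be derived later from `ts` and `deathoffset`, assuming both are offsets in minutes from unit admission (as in eICU) and a user-chosen prediction horizon. `death_within` and `horizon` are hypothetical names, not code from this notebook.

```python
import pandas as pd

def death_within(ts: pd.Series, deathoffset: pd.Series, horizon: float) -> pd.Series:
    """Return 1 where the patient dies within `horizon` minutes after `ts`, else 0."""
    time_to_death = deathoffset - ts
    # Comparisons with NaN are False, so presumed-alive patients (NaN deathoffset) get 0
    dies_soon = (time_to_death >= 0) & (time_to_death <= horizon)
    return dies_soon.astype(int)
```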

In [None]:
patient_df.hospitaldischargestatus.value_counts().compute()

In [None]:
patient_df.hospitaldischargelocation.value_counts().compute()

In [None]:
patient_df['deathoffset'] = patient_df.apply(lambda df: df['hospitaldischargeoffset'] 
                                                        if df['hospitaldischargestatus'] == 'Expired' or
                                                        df['hospitaldischargelocation'] == 'Death' else np.nan, axis=1, 
                                                        meta=('x', float))

In [None]:
patient_df.head()

Remove the now unneeded hospital discharge features:

In [None]:
patient_df = patient_df.drop(['hospitaldischargeoffset', 'hospitaldischargestatus', 'hospitaldischargelocation'], axis=1)
patient_df.head(6)

In [None]:
patient_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
patient_df = client.persist(patient_df)

In [None]:
patient_df.visualize()

### Create a discharge instance and the timestamp feature

Create the timestamp (`ts`) feature:

In [None]:
patient_df['ts'] = 0
patient_df.head()

In [None]:
patient_df.patientunitstayid.value_counts().compute()

Duplicate every row, so as to create a discharge event:

In [None]:
# DataFrame.append is deprecated in Dask and pandas; concatenate instead
patient_df = dd.concat([patient_df, patient_df])
patient_df.patientunitstayid.value_counts().compute()

Sort by `patientunitstayid` so as to keep the timestamps of the same patient together:

In [None]:
patient_df = patient_df.compute().sort_values(by='patientunitstayid')
patient_df.head(6)

Create a weight feature:

In [None]:
# Create feature weight and assign the initial weight that the patient has on admission
patient_df['weight'] = patient_df['admissionweight']
patient_df.head()

Set the `weight` and `ts` features so that one row of each pair keeps the admission values and the other gets the discharge values (the toggle below assigns the discharge values to the first row of each pair):

In [None]:
def set_weight(row):
    global patient_first_row
    if not patient_first_row:
        row['weight'] = row['dischargeweight']
        patient_first_row = True
    else:
        patient_first_row = False
    return row

In [None]:
patient_first_row = False
patient_df = patient_df.apply(lambda row: set_weight(row), axis=1)
patient_df.head(6)

In [None]:
def set_ts(row):
    global patient_first_row
    if not patient_first_row:
        row['ts'] = row['unitdischargeoffset']
        patient_first_row = True
    else:
        patient_first_row = False
    return row

In [None]:
patient_first_row = False
patient_df = patient_df.apply(lambda row: set_ts(row), axis=1)
patient_df.head(6)

Remove the remaining, now unneeded, weight and timestamp features:

In [None]:
patient_df = patient_df.drop(['admissionweight', 'dischargeweight', 'unitdischargeoffset'], axis=1)
patient_df.head(6)

Create a `diagnosis` feature:

In [None]:
patient_df['diagnosis'] = patient_df['apacheadmissiondx']
patient_df.head()

Replace `apacheadmissiondx` with `diagnosis` in the lists of categorical and embedded features:

In [None]:
cat_feat.remove('apacheadmissiondx')
cat_embed_feat.remove('apacheadmissiondx')
new_cat_feat.remove('apacheadmissiondx')
new_cat_embed_feat.remove('apacheadmissiondx')
cat_feat.append('diagnosis')
cat_embed_feat.append('diagnosis')
new_cat_feat.append('diagnosis')
new_cat_embed_feat.append('diagnosis')

Similarly, only keep the `diagnosis` on the admission instance, as the current table only has the diagnosis on admission:

In [None]:
def set_diagnosis(row):
    global patient_first_row
    if not patient_first_row:
        row['diagnosis'] = np.nan
        patient_first_row = True
    else:
        patient_first_row = False
    return row

In [None]:
patient_first_row = False
patient_df = patient_df.apply(lambda row: set_diagnosis(row), axis=1)
patient_df.head(6)
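The toggle-based helpers above rely on a global flag and on `apply` visiting each row exactly once, in order; older pandas versions documented that `DataFrame.apply` may call the function twice on the first row, which would shift the toggle. A sketch that avoids global state builds the admission and discharge instances separately and concatenates them. It assumes the column names used above (with `apacheadmissiondx` already enumerated); `split_admission_discharge` is a hypothetical helper, not the notebook's method.

```python
import numpy as np
import pandas as pd

def split_admission_discharge(df: pd.DataFrame) -> pd.DataFrame:
    """Turn each unit stay into an admission row and a discharge row."""
    # Admission instance: timestamp 0, admission weight and admission diagnosis
    admission = df.copy()
    admission["ts"] = 0
    admission["weight"] = admission["admissionweight"]
    admission["diagnosis"] = admission["apacheadmissiondx"]

    # Discharge instance: discharge offset, discharge weight and no diagnosis
    discharge = df.copy()
    discharge["ts"] = discharge["unitdischargeoffset"]
    discharge["weight"] = discharge["dischargeweight"]
    discharge["diagnosis"] = np.nan

    both = pd.concat([admission, discharge], ignore_index=True)
    both = both.drop(columns=["admissionweight", "dischargeweight",
                              "unitdischargeoffset", "apacheadmissiondx"])
    # Sorting by stay and timestamp puts each admission row before its discharge row
    # (assuming a positive discharge offset)
    return both.sort_values(["patientunitstayid", "ts"]).reset_index(drop=True)
```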

Remove the admission diagnosis feature `apacheadmissiondx`:

In [None]:
patient_df = patient_df.drop('apacheadmissiondx', axis=1)
patient_df.head(6)

Set `ts` as the index and sort by it, to make merging with other dataframes easier later:

In [None]:
patient_df = dd.from_pandas(patient_df.set_index('ts'), npartitions=30, sort=True)
patient_df.head(6)

In [None]:
patient_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
patient_df = client.persist(patient_df)

In [None]:
patient_df.visualize()

### Normalize data

Save the dataframe before normalizing:

In [None]:
patient_df.to_parquet(f'{data_path}cleaned/unnormalized/patient.parquet')

In [None]:
new_cat_feat

In [None]:
patient_df.head()

In [None]:
patient_df_norm = utils.normalize_data(patient_df, embed_columns=new_cat_feat, 
                                       id_columns=['patientunitstayid', 'deathoffset'])
patient_df_norm.head(6)

In [None]:
patient_df_norm.to_parquet(f'{data_path}cleaned/normalized/patient.parquet')

Confirm that everything is ok through the `describe` method:

In [None]:
patient_df_norm.describe().compute().transpose()
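`utils.normalize_data` isn't shown in this notebook, so exactly what it does is an assumption here. If it performs column-wise z-score normalization while leaving ID and embedding columns untouched, it would be roughly equivalent to this pandas sketch; `zscore_normalize` and `skip_columns` are hypothetical names.

```python
import pandas as pd

def zscore_normalize(df: pd.DataFrame, skip_columns) -> pd.DataFrame:
    """Standardize numeric columns to zero mean and unit variance, skipping IDs and embeddings."""
    out = df.copy()
    for column in out.select_dtypes(include="number").columns:
        if column in skip_columns:
            continue
        std = out[column].std()
        # Constant (or single-value) columns would divide by zero or NaN; only centre them
        out[column] = (out[column] - out[column].mean()) / (std if std > 0 else 1.0)
    return out
```

Computing the mean and standard deviation on the full dataset, as this sketch does, leaks information from the test set into training; fitting the statistics on the training split only avoids that.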

## Vital signs periodic data

### Read the data

In [None]:
vital_prdc_df = dd.read_csv(f'{data_path}original/vitalPeriodic.csv')
vital_prdc_df.head()

In [None]:
vital_prdc_df.npartitions

In [None]:
vital_prdc_df = vital_prdc_df.repartition(npartitions=30)

Get an overview of the dataframe through the `describe` method:

In [None]:
vital_prdc_df.describe().compute().transpose()

In [None]:
vital_prdc_df.visualize()

In [None]:
vital_prdc_df.columns

In [None]:
vital_prdc_df.dtypes

### Check for missing values

In [None]:
utils.dataframe_missing_values(vital_prdc_df)

### Remove unneeded features

In [None]:
vital_prdc_df = vital_prdc_df.drop('vitalperiodicid', axis=1)
vital_prdc_df.head()

### Create the timestamp feature and sort

Create the timestamp (`ts`) feature:

In [None]:
vital_prdc_df['ts'] = vital_prdc_df['observationoffset']
vital_prdc_df = vital_prdc_df.drop('observationoffset', axis=1)
vital_prdc_df.head()

In [None]:
vital_prdc_df.patientunitstayid.value_counts().compute()

Remove duplicate rows:

In [None]:
len(vital_prdc_df)

In [None]:
vital_prdc_df = vital_prdc_df.drop_duplicates()
vital_prdc_df.head()

In [None]:
len(vital_prdc_df)

In [None]:
vital_prdc_df = vital_prdc_df.repartition(npartitions=30)

Set `ts` as the sorted index, to make merging with other dataframes easier later:

In [None]:
vital_prdc_df = vital_prdc_df.set_index('ts')
vital_prdc_df.head(6)

In [None]:
vital_prdc_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
vital_prdc_df = client.persist(vital_prdc_df)

In [None]:
vital_prdc_df.visualize()

Check for possible multiple rows with the same unit stay ID and timestamp:

In [None]:
vital_prdc_df.reset_index().groupby(['patientunitstayid', 'ts']).size().nlargest(5).compute()

### Normalize data

Save the dataframe before normalizing:

In [None]:
vital_prdc_df.to_parquet(f'{data_path}cleaned/unnormalized/vitalPeriodic.parquet')

In [None]:
vital_prdc_df_norm = utils.normalize_data(vital_prdc_df, 
                                          id_columns=['patientunitstayid', 'ts'])
vital_prdc_df_norm.head(6)

In [None]:
vital_prdc_df_norm.to_parquet(f'{data_path}cleaned/normalized/vitalPeriodic.parquet')

Confirm that everything is ok through the `describe` method:

In [None]:
vital_prdc_df_norm.describe().compute().transpose()

### Join dataframes

Merge dataframes by the unit stay, `patientunitstayid`, and the timestamp, `ts`, with a tolerance of up to 30 minutes between timestamps.
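Dask's `dd.merge_asof` follows the semantics of pandas' `pd.merge_asof`, so the join can be checked on a tiny pandas example. The values below are made up, `ts` is assumed to be an offset in minutes (as in eICU), and `join_nearest` is a hypothetical helper.

```python
import pandas as pd

def join_nearest(left: pd.DataFrame, right: pd.DataFrame, tolerance: int = 30) -> pd.DataFrame:
    """Attach to each left row the nearest right row of the same unit stay, within `tolerance`."""
    # merge_asof requires both sides sorted by the "on" key; mergesort keeps ties in order
    left = left.sort_values("ts", kind="mergesort")
    right = right.sort_values("ts", kind="mergesort")
    return pd.merge_asof(left, right, on="ts", by="patientunitstayid",
                         direction="nearest", tolerance=tolerance)

left = pd.DataFrame({"ts": [0, 0, 1500], "patientunitstayid": [1, 2, 1],
                     "age": [70.0, 55.0, 70.0]})
right = pd.DataFrame({"ts": [10, 60, 1490], "patientunitstayid": [1, 2, 1],
                      "heartrate": [88.0, 95.0, 102.0]})
print(join_nearest(left, right))
```

Stay 2 gets no heart rate because its only measurement is 60 minutes away. With `direction='nearest'`, a row can also be matched to a measurement taken up to 30 minutes *after* its own timestamp, which may matter if the features at time `ts` should only use past information.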

In [None]:
patient_df = dd.read_parquet(f'{data_path}cleaned/normalized/patient.parquet')
patient_df.head()

In [None]:
vital_prdc_df = dd.read_parquet(f'{data_path}cleaned/normalized/vitalPeriodic.parquet')
vital_prdc_df.head()

In [None]:
eICU_df = dd.merge_asof(patient_df, vital_prdc_df, on='ts', by='patientunitstayid', direction='nearest', tolerance=30)
eICU_df.head()

## Vital signs aperiodic data

### Read the data

In [None]:
vital_aprdc_df = dd.read_csv(f'{data_path}original/vitalAperiodic.csv')
vital_aprdc_df.head()

In [None]:
len(vital_aprdc_df)

In [None]:
vital_aprdc_df.patientunitstayid.nunique().compute()

In [None]:
vital_aprdc_df.npartitions

In [None]:
vital_aprdc_df = vital_aprdc_df.repartition(npartitions=30)

Get an overview of the dataframe through the `describe` method:

In [None]:
vital_aprdc_df.describe().compute().transpose()

In [None]:
vital_aprdc_df.visualize()

In [None]:
vital_aprdc_df.columns

In [None]:
vital_aprdc_df.dtypes

### Check for missing values

In [None]:
utils.dataframe_missing_values(vital_aprdc_df)

### Remove unneeded features

In [None]:
vital_aprdc_df = vital_aprdc_df.drop('vitalaperiodicid', axis=1)
vital_aprdc_df.head()

### Create the timestamp feature and sort

Create the timestamp (`ts`) feature:

In [None]:
vital_aprdc_df['ts'] = vital_aprdc_df['observationoffset']
vital_aprdc_df = vital_aprdc_df.drop('observationoffset', axis=1)
vital_aprdc_df.head()

Remove duplicate rows:

In [None]:
len(vital_aprdc_df)

In [None]:
vital_aprdc_df = vital_aprdc_df.drop_duplicates()
vital_aprdc_df.head()

In [None]:
len(vital_aprdc_df)

In [None]:
vital_aprdc_df = vital_aprdc_df.repartition(npartitions=30)

Set `ts` as the sorted index, to make merging with other dataframes easier later:

In [None]:
vital_aprdc_df = vital_aprdc_df.set_index('ts')
vital_aprdc_df.head(6)

In [None]:
vital_aprdc_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
vital_aprdc_df = client.persist(vital_aprdc_df)

In [None]:
vital_aprdc_df.visualize()

Check for possible multiple rows with the same unit stay ID and timestamp:

In [None]:
vital_aprdc_df.reset_index().groupby(['patientunitstayid', 'ts']).count().nlargest(columns='noninvasivemean').head()

In [None]:
vital_aprdc_df[vital_aprdc_df.patientunitstayid == 3069495].compute().head(20)

### Normalize data

Save the dataframe before normalizing:

In [None]:
vital_aprdc_df.to_parquet(f'{data_path}cleaned/unnormalized/vitalAperiodic.parquet')

In [None]:
vital_aprdc_df_norm = utils.normalize_data(vital_aprdc_df, 
                                           id_columns=['patientunitstayid', 'ts'])
vital_aprdc_df_norm.head(6)

In [None]:
vital_aprdc_df_norm.to_parquet(f'{data_path}cleaned/normalized/vitalAperiodic.parquet')

Confirm that everything is ok through the `describe` method:

In [None]:
vital_aprdc_df_norm.describe().compute().transpose()

### Join dataframes

Merge dataframes by the unit stay, `patientunitstayid`, and the timestamp, `ts`, with a tolerance of up to 30 minutes between timestamps.

In [None]:
patient_df = dd.read_parquet(f'{data_path}cleaned/normalized/patient.parquet')
patient_df.head()

In [None]:
vital_aprdc_df = dd.read_parquet(f'{data_path}cleaned/normalized/vitalAperiodic.parquet')
vital_aprdc_df.head()

In [None]:
eICU_df = dd.merge_asof(eICU_df, vital_aprdc_df, on='ts', by='patientunitstayid', direction='nearest', tolerance=30)
eICU_df.head()

## Infectious disease data

### Read the data

In [None]:
infect_df = dd.read_csv(f'{data_path}original/carePlanInfectiousDisease.csv')
infect_df.head()

In [None]:
infect_df.npartitions

In [None]:
infect_df = infect_df.repartition(npartitions=30)

In [None]:
infect_df.infectdiseasesite.value_counts().head(10)

In [None]:
infect_df.infectdiseaseassessment.value_counts().head(10)

In [None]:
infect_df.responsetotherapy.value_counts().head(10)

In [None]:
infect_df.treatment.value_counts().head(10)

Most features in this table either add little information or have many missing values; the truly relevant one seems to be `infectdiseasesite`. Even `activeupondischarge` doesn't seem very practical: since we don't have complete information on when infections end, we might as well just register when they are first verified.

Get an overview of the dataframe through the `describe` method:

In [None]:
infect_df.describe().compute().transpose()

In [None]:
infect_df.visualize()

In [None]:
infect_df.columns

In [None]:
infect_df.dtypes

### Check for missing values

In [None]:
utils.dataframe_missing_values(infect_df)

### Remove unneeded features

In [None]:
infect_df = infect_df[['patientunitstayid', 'cplinfectdiseaseoffset', 'infectdiseasesite']]
infect_df.head()

### Discretize categorical features

Convert binary categorical features into numeric values, one-hot encode features with a small number of categories (in this case, up to 5) and enumerate categorical features with many categories, which will be embedded.

#### Separate and prepare features for embedding

Identify categorical features that have more than 5 unique categories, which will go through an embedding layer afterwards, and enumerate them.

Update list of categorical features and add those that will need embedding (features with more than 5 unique values):

In [None]:
new_cat_feat = ['infectdiseasesite']
cat_feat.extend(new_cat_feat)

In [None]:
cat_feat_nunique = [infect_df[feature].nunique().compute() for feature in new_cat_feat]
cat_feat_nunique

In [None]:
new_cat_embed_feat = []
for i in range(len(new_cat_feat)):
    if cat_feat_nunique[i] > 5:
        # Add feature to the list of those that will be embedded
        cat_embed_feat.append(new_cat_feat[i])
        # Add feature to the list of the new ones (from the current table) that will be embedded
        new_cat_embed_feat.append(new_cat_feat[i])

In [None]:
infect_df[new_cat_feat].head()

In [None]:
for i in range(len(new_cat_embed_feat)):
    feature = new_cat_embed_feat[i]
    # Prepare for embedding, i.e. enumerate categories
    infect_df[feature], cat_embed_feat_enum[feature] = utils.enum_categorical_feature(infect_df, feature)

In [None]:
infect_df[new_cat_feat].head()

In [None]:
cat_embed_feat_enum

In [None]:
infect_df[cat_feat].dtypes

In [None]:
infect_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
infect_df = client.persist(infect_df)

In [None]:
infect_df.visualize()

### Create the timestamp feature and sort

Create the timestamp (`ts`) feature:

In [None]:
infect_df['ts'] = infect_df['cplinfectdiseaseoffset']
infect_df = infect_df.drop('cplinfectdiseaseoffset', axis=1)
infect_df.head()

In [None]:
infect_df.patientunitstayid.value_counts().compute()

Only 3620 unit stays have infection data, so this table might not be worth including.

Remove duplicate rows:

In [None]:
len(infect_df)

In [None]:
infect_df = infect_df.drop_duplicates()
infect_df.head()

In [None]:
len(infect_df)

In [None]:
infect_df = infect_df.repartition(npartitions=30)

Set `ts` as the sorted index, to make merging with other dataframes easier later:

In [None]:
infect_df = infect_df.set_index('ts')
infect_df.head(6)

In [None]:
infect_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
infect_df = client.persist(infect_df)

In [None]:
infect_df.visualize()

Check for possible multiple rows with the same unit stay ID and timestamp:

In [None]:
infect_df.reset_index().groupby(['patientunitstayid', 'ts']).count().nlargest(columns='infectdiseasesite').head()

In [None]:
infect_df[infect_df.patientunitstayid == 3049689].compute().head(20)

We can see that there are up to 6 categories per set of `patientunitstayid` and `ts`. As such, we must join them.
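`utils.join_categorical_enum` is not shown here, so this is only a guess at the underlying idea: a pandas sketch that groups rows by the ID pair and keeps every distinct category code of each group in a list. `join_same_timestamp` and `_unique_codes` are hypothetical names.

```python
import pandas as pd

def _unique_codes(codes: pd.Series) -> list:
    # Sorted, de-duplicated category codes, ignoring missing values
    return sorted(codes.dropna().unique().tolist())

def join_same_timestamp(df: pd.DataFrame, id_columns, categorical_columns) -> pd.DataFrame:
    """Collapse rows that share the same IDs into one row with a list of codes per column."""
    return (df.groupby(id_columns)[categorical_columns]
              .agg(_unique_codes)
              .reset_index())
```

Keeping a list per row preserves every category observed at that timestamp; how those variable-length lists are then fed to the model (for example, padded and passed through an embedding layer) is a separate choice.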

### Join rows that have the same IDs

In [None]:
infect_df = utils.join_categorical_enum(infect_df, new_cat_embed_feat)
infect_df.head()

In [None]:
infect_df.dtypes

In [None]:
infect_df.reset_index().groupby(['patientunitstayid', 'ts']).count().nlargest(columns='infectdiseasesite').head()

In [None]:
infect_df[infect_df.patientunitstayid == 3049689].compute().head(20)

Comparing the output from the two previous cells with what we had before the `join_categorical_enum` method, we can see that all rows with duplicate IDs have been successfully joined.

In [None]:
infect_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
infect_df = client.persist(infect_df)

In [None]:
infect_df.visualize()

### Normalize data

Save the dataframe before normalizing:

In [None]:
infect_df.to_parquet(f'{data_path}cleaned/unnormalized/carePlanInfectiousDisease.parquet')

In [None]:
infect_df_norm = utils.normalize_data(infect_df, embed_columns=new_cat_feat, 
                                      id_columns=['patientunitstayid'])
infect_df_norm.head(6)

In [None]:
infect_df_norm.to_parquet(f'{data_path}cleaned/normalized/carePlanInfectiousDisease.parquet')

Confirm that everything is ok through the `describe` method:

In [None]:
infect_df_norm.describe().compute().transpose()

### Join dataframes

Merge dataframes by the unit stay, `patientunitstayid`, and the timestamp, `ts`, with a tolerance of up to 30 minutes between timestamps.

In [None]:
infect_df = dd.read_parquet(f'{data_path}cleaned/normalized/carePlanInfectiousDisease.parquet')
infect_df.head()

In [None]:
eICU_df = dd.merge_asof(eICU_df, infect_df, on='ts', by='patientunitstayid', direction='nearest', tolerance=30)
eICU_df.head()

## Microbiology data

### Read the data

In [None]:
micro_df = dd.read_csv(f'{data_path}original/microLab.csv')
micro_df.head()

In [None]:
len(micro_df)

In [None]:
micro_df.patientunitstayid.nunique().compute()

Only 2923 unit stays have microbiology data, so this table might not be worth including.

In [None]:
micro_df.npartitions

In [None]:
micro_df = micro_df.repartition(npartitions=30)

Get an overview of the dataframe through the `describe` method:

In [None]:
micro_df.describe().compute().transpose()

In [None]:
micro_df.visualize()

In [None]:
micro_df.columns

In [None]:
micro_df.dtypes

### Check for missing values

In [None]:
utils.dataframe_missing_values(micro_df)

### Remove unneeded features

In [None]:
micro_df.culturesite.value_counts().compute()

In [None]:
micro_df.organism.value_counts().compute()

In [None]:
micro_df.antibiotic.value_counts().compute()

In [None]:
micro_df.sensitivitylevel.value_counts().compute()

All features appear to be relevant, except the unique identifier of the table.

In [None]:
micro_df = micro_df.drop('microlabid', axis=1)
micro_df.head()

### Discretize categorical features

Convert binary categorical features into numeric values, one-hot encode features with a small number of categories (in this case, up to 5) and enumerate categorical features with many categories, which will be embedded.

#### Separate and prepare features for embedding

Identify categorical features that have more than 5 unique categories, which will go through an embedding layer afterwards, and enumerate them.

In the case of microbiology data, we're also going to embed the antibiotic `sensitivitylevel`, not because it has many categories, but because there can be several rows of data per timestamp (which would be impractical on one hot encoded data).

Update list of categorical features and add those that will need embedding (features with more than 5 unique values):

In [None]:
new_cat_feat = ['culturesite', 'organism', 'antibiotic', 'sensitivitylevel']
cat_feat.extend(new_cat_feat)

In [None]:
cat_feat_nunique = [micro_df[feature].nunique().compute() for feature in new_cat_feat]
cat_feat_nunique

In [None]:
new_cat_embed_feat = []
for i in range(len(new_cat_feat)):
    if cat_feat_nunique[i] > 5 or new_cat_feat[i] == 'sensitivitylevel':
        # Add feature to the list of those that will be embedded
        cat_embed_feat.append(new_cat_feat[i])
        new_cat_embed_feat.append(new_cat_feat[i])

In [None]:
micro_df[new_cat_feat].head()

In [None]:
for i in range(len(new_cat_embed_feat)):
    feature = new_cat_embed_feat[i]
    # Prepare for embedding, i.e. enumerate categories
    micro_df[feature], cat_embed_feat_enum[feature] = utils.enum_categorical_feature(micro_df, feature)

In [None]:
micro_df[new_cat_feat].head()

In [None]:
cat_embed_feat_enum

In [None]:
micro_df[new_cat_feat].dtypes

In [None]:
micro_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
micro_df = client.persist(micro_df)

In [None]:
micro_df.visualize()

### Create the timestamp feature and sort

Create the timestamp (`ts`) feature:

In [None]:
micro_df['ts'] = micro_df['culturetakenoffset']
micro_df = micro_df.drop('culturetakenoffset', axis=1)
micro_df.head()

Remove duplicate rows:

In [None]:
len(micro_df)

In [None]:
micro_df = micro_df.drop_duplicates()
micro_df.head()

In [None]:
len(micro_df)

In [None]:
micro_df = micro_df.repartition(npartitions=30)

Set `ts` as the sorted index, to make merging with other dataframes easier later:

In [None]:
micro_df = micro_df.set_index('ts')
micro_df.head()

In [None]:
micro_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
micro_df = client.persist(micro_df)

In [None]:
micro_df.visualize()

Check for possible multiple rows with the same unit stay ID and timestamp:

In [None]:
micro_df.reset_index().head()

In [None]:
micro_df.reset_index().groupby(['patientunitstayid', 'ts']).count().nlargest(columns='culturesite').head()

In [None]:
micro_df[micro_df.patientunitstayid == 3069495].compute().head(20)

We can see that there are up to 120 categories per set of `patientunitstayid` and `ts`. As such, we must join them.

### Join rows that have the same IDs

In [None]:
micro_df = utils.join_categorical_enum(micro_df, new_cat_embed_feat)
micro_df.head()

In [None]:
micro_df.dtypes

In [None]:
micro_df.reset_index().groupby(['patientunitstayid', 'ts']).count().nlargest(columns='culturesite').head()

In [None]:
micro_df[micro_df.patientunitstayid == 3069495].compute().head(20)

Comparing the output from the two previous cells with what we had before the `join_categorical_enum` method, we can see that all rows with duplicate IDs have been successfully joined.

In [None]:
micro_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
micro_df = client.persist(micro_df)

In [None]:
micro_df.visualize()

### Normalize data

Save the dataframe before normalizing:

In [None]:
micro_df.to_parquet(f'{data_path}cleaned/unnormalized/microLab.parquet')

In [None]:
micro_df_norm = utils.normalize_data(micro_df, embed_columns=new_cat_feat, 
                                     id_columns=['patientunitstayid'])
micro_df_norm.head(6)

In [None]:
micro_df_norm.to_parquet(f'{data_path}cleaned/normalized/microLab.parquet')

Confirm that everything is ok through the `describe` method:

In [None]:
micro_df_norm.describe().compute().transpose()

### Join dataframes

Merge dataframes by the unit stay, `patientunitstayid`, and the timestamp, `ts`, with a tolerance for a difference of up to 30 minutes.
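`dd.merge_asof` mirrors the pandas function of the same name. Both inputs must be sorted by the `on` key, which is why each table is sorted by `ts` beforehand. Because eICU offsets are in minutes, `tolerance=30` means 30 minutes. The pandas sketch below uses made-up values to show how `by`, `direction='nearest'` and `tolerance` interact:

```python
import pandas as pd

# Main table: one row per (unit stay, timestamp in minutes), sorted by 'ts'
left = pd.DataFrame({
    'ts': [0, 60, 120],
    'patientunitstayid': [1, 1, 1],
    'heartrate': [80, 85, 90],
})

# Table to attach, also sorted by 'ts'
right = pd.DataFrame({
    'ts': [50, 200],
    'patientunitstayid': [1, 1],
    'culturesite': [3, 4],
})

# For each left row, take the right row of the same unit stay whose 'ts' is
# closest in either direction, but only if it is at most 30 minutes away
merged = pd.merge_asof(left, right, on='ts', by='patientunitstayid',
                       direction='nearest', tolerance=30)
print(merged)
```

Only the row at `ts=60` receives a value, because the right-hand row at `ts=50` is 10 minutes away. Every other pair is more than 30 minutes apart and stays NaN.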

In [None]:
micro_df = dd.read_parquet(f'{data_path}cleaned/normalized/microLab.parquet')
micro_df.head()

In [None]:
eICU_df = dd.merge_asof(eICU_df, micro_df, on='ts', by='patientunitstayid', direction='nearest', tolerance=30)
eICU_df.head()

## Respiratory care data

### Read the data

In [None]:
resp_care_df = dd.read_csv(f'{data_path}original/respiratoryCare.csv', dtype={'airwayposition': 'object',
                                                                              'airwaysize': 'object',
                                                                              'apneaparms': 'object',
                                                                              'setapneafio2': 'object',
                                                                              'setapneaie': 'object',
                                                                              'setapneainsptime': 'object',
                                                                              'setapneainterval': 'object',
                                                                              'setapneaippeephigh': 'object',
                                                                              'setapneapeakflow': 'object',
                                                                              'setapneatv': 'object'})
resp_care_df.head()

In [None]:
len(resp_care_df)

In [None]:
resp_care_df.patientunitstayid.nunique().compute()

In [None]:
resp_care_df.npartitions

In [None]:
resp_care_df = resp_care_df.repartition(npartitions=30)

Get an overview of the dataframe through the `describe` method:

In [None]:
resp_care_df.describe().compute().transpose()

In [None]:
resp_care_df.visualize()

In [None]:
resp_care_df.columns

In [None]:
resp_care_df.dtypes

### Check for missing values

In [None]:
utils.dataframe_missing_values(resp_care_df)

### Remove unneeded features

For the respiratoryCare table, I'm not going to use any of the many features that describe the ventilator and its settings. Besides not appearing to be very relevant to the patient, they have many missing values (>67%). Instead, I'm going to create a ventilation label (active between the start and the end of ventilation) and a prior ventilation label.

In [None]:
resp_care_df = resp_care_df[['patientunitstayid', 'ventstartoffset',
                             'ventendoffset', 'priorventstartoffset']]
resp_care_df.head()

### Create the timestamp feature and sort

Create the timestamp (`ts`) feature:

In [None]:
resp_care_df['ts'] = resp_care_df['ventstartoffset']
resp_care_df = resp_care_df.drop('ventstartoffset', axis=1)
resp_care_df.head()

Remove duplicate rows:

In [None]:
len(resp_care_df)

In [None]:
resp_care_df = resp_care_df.drop_duplicates()
resp_care_df.head()

In [None]:
len(resp_care_df)

In [None]:
resp_care_df = resp_care_df.repartition(npartitions=30)

Sort by `ts` to make it easier to merge with other dataframes later:

In [None]:
resp_care_df = resp_care_df.set_index('ts')
resp_care_df.head()

In [None]:
resp_care_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
resp_care_df = client.persist(resp_care_df)

In [None]:
resp_care_df.visualize()

Check for possible multiple rows with the same unit stay ID and timestamp:

In [None]:
resp_care_df.reset_index().groupby(['patientunitstayid', 'ts']).count().nlargest(columns='ventendoffset').head()

In [None]:
resp_care_df[resp_care_df.patientunitstayid == 3348331].compute().head(20)

We can see that there are up to 5283 duplicate rows per set of `patientunitstayid` and `ts`. As such, we must join them.

### Join rows that have the same IDs

Even after removing duplicate rows, some rows still have different information for the same ID and timestamp. We therefore apply a groupby, keeping the minimum value of each offset feature, since the larger values don't make sense (particularly in `priorventstartoffset`).
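The helper itself isn't shown here. This sketch assumes that `cont_join_method='min'` makes `utils.join_categorical_enum` keep the smallest value of each continuous column within a group, in which case the operation amounts to a grouped minimum. A toy pandas example with made-up values:

```python
import pandas as pd

# Two conflicting rows for the same unit stay and ventilation start time
df = pd.DataFrame({
    'patientunitstayid': [7, 7, 8],
    'ts': [100, 100, 50],
    'ventendoffset': [400, 350, 90],
    'priorventstartoffset': [20, 60, 0],
})

# Keep the smallest value of every offset column for each (unit stay, timestamp)
joined = df.groupby(['patientunitstayid', 'ts']).min().reset_index()
print(joined)
```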

In [None]:
((resp_care_df.index > resp_care_df.ventendoffset) & (resp_care_df.ventendoffset != 0)).compute().value_counts()

There are no rows where the ventilation start timestamp is later than the ventilation end timestamp.
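When combining element-wise conditions, each comparison needs its own parentheses. In Python, `&` binds more tightly than `!=`, so `(a > b) & c != 0` is evaluated as `((a > b) & c) != 0`, which is a bitwise AND between booleans and integers. A small NumPy illustration with made-up offsets:

```python
import numpy as np

start = np.array([10, 50, 30])  # ventilation start offsets (minutes)
end = np.array([40, 20, 0])     # ventilation end offsets (0 = no recorded end)

# Parsed as ((start > end) & end) != 0: True & 20 is 1 & 20, i.e. 0
unintended = (start > end) & end != 0
# Parenthesized: "start after end" AND "an end offset was recorded"
intended = (start > end) & (end != 0)
print(unintended, intended)
```

The unparenthesized version silently reports no errors here, even though the second row starts after it ends.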

In [None]:
resp_care_df = utils.join_categorical_enum(resp_care_df, cont_join_method='min')
resp_care_df.head()

In [None]:
resp_care_df.reset_index().groupby(['patientunitstayid', 'ts']).count().nlargest(columns='ventendoffset').head()

In [None]:
resp_care_df[resp_care_df.patientunitstayid == 1113084].compute().head(10)

Comparing the output from the two previous cells with what we had before the `join_categorical_enum` method, we can see that all rows with duplicate IDs have been successfully joined.

In [None]:
resp_care_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
resp_care_df = client.persist(resp_care_df)

In [None]:
resp_care_df.visualize()

Keep only the first row of each unit stay, as we're only tracking when the patient is on ventilation:
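In pandas, `groupby(...).first()` returns the first non-null value of each column independently. As a result, one output row can mix values from different ventilation episodes. Dask's groupby `first` presumably behaves the same way. If the goal is to keep the earliest complete episode per stay, sorting and then dropping duplicates is one option. A pandas sketch on made-up data:

```python
import numpy as np
import pandas as pd

# Toy ventilation episodes, deliberately not sorted by time (made-up values)
df = pd.DataFrame({
    'patientunitstayid': [1, 1, 2],
    'ts': [300, 100, 50],
    'ventendoffset': [500, 250, 80],
    'priorventstartoffset': [np.nan, 40, 0],
})

# first() picks the first non-null value per column, so stay 1 gets
# ts=300 from one row but priorventstartoffset=40 from another
mixed = df.groupby('patientunitstayid').first()

# Sorting by time and dropping later duplicates keeps one whole, earliest row per stay
first_vent = (df.sort_values(['patientunitstayid', 'ts'])
                .drop_duplicates(subset='patientunitstayid', keep='first')
                .reset_index(drop=True))
print(first_vent)
```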

In [None]:
resp_care_df = resp_care_df.reset_index().groupby('patientunitstayid').first().reset_index().set_index('ts')
resp_care_df.head(20)

### Create prior ventilation label

Make a feature `priorvent` that indicates if the patient has been on ventilation before.

Convert to pandas:

In [None]:
resp_care_df = resp_care_df.compute()

Create the prior ventilation column:

In [None]:
resp_care_df['priorvent'] = (resp_care_df.priorventstartoffset < resp_care_df.index).astype(int)
resp_care_df.head()

Revert to Dask:

In [None]:
resp_care_df = dd.from_pandas(resp_care_df, npartitions=30)
resp_care_df.head()

Remove the now unneeded `priorventstartoffset` column:

In [None]:
resp_care_df = resp_care_df.drop('priorventstartoffset', axis=1)
resp_care_df.head()

### Create current ventilation label

Make a feature `onvent` that indicates if the patient is currently on ventilation.

Duplicate every row, so as to create a ventilation end event:

In [None]:
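# Stack a copy of the dataframe under itself (DataFrame.append is deprecated in recent pandas/Dask versions)
resp_care_df = dd.concat([resp_care_df, resp_care_df])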
resp_care_df.head()

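Sort by unit stay and `ts`, so that the two copies of each row end up next to each other:

In [None]:
resp_care_df = resp_care_df.reset_index()
resp_care_df.head()

In [None]:
# Each unit stay has a single row before duplication, so sorting by stay and ts keeps its two copies adjacent
resp_care_df = resp_care_df.compute().sort_values(by=['patientunitstayid', 'ts'])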
resp_care_df.head(6)

Create an `onvent` feature:

In [None]:
resp_care_df['onvent'] = 1
resp_care_df.head(6)

For each pair of duplicated rows, turn the first copy into the ventilation end event (`onvent` set to 0 and `ts` set to `ventendoffset`). The second copy remains the ventilation start event (`onvent` = 1, `ts` = ventilation start):
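The two `apply` calls in the next cells depend on a global flag and on the two copies of each stay sitting next to each other. A vectorized alternative builds the start and end events separately and then concatenates them. The sketch below shows this in pandas on made-up data; the column names follow this table, and the rest is an illustrative assumption:

```python
import pandas as pd

# One row per unit stay: ventilation start ('ts') and end offsets, in minutes
vent = pd.DataFrame({
    'patientunitstayid': [1, 2],
    'ts': [100, 50],
    'ventendoffset': [400, 90],
    'priorvent': [0, 1],
})

# Start events: the patient goes on ventilation at 'ts'
start_events = vent.drop(columns='ventendoffset').assign(onvent=1)

# End events: the same rows, moved to the ventilation end offset
end_events = (vent.drop(columns='ts')
                  .rename(columns={'ventendoffset': 'ts'})
                  .assign(onvent=0))

# Concatenate both event types and order them in time
events = (pd.concat([start_events, end_events], ignore_index=True)
            .sort_values(['ts', 'patientunitstayid'])
            .reset_index(drop=True))
print(events)
```

This avoids any dependence on row order or on how many times `apply` calls the function.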

In [None]:
def set_onvent(row):
    global first_row
    if not first_row:
        row['onvent'] = 0
        first_row = True
    else:
        first_row = False
    return row

In [None]:
first_row = False
resp_care_df = resp_care_df.apply(lambda row: set_onvent(row), axis=1)
resp_care_df.head(6)

In [None]:
def set_ts_vent(row):
    global first_row
    if not first_row:
        row['ts'] = row['ventendoffset']
        first_row = True
    else:
        first_row = False
    return row

In [None]:
first_row = False
resp_care_df = resp_care_df.apply(lambda row: set_ts_vent(row), axis=1)
resp_care_df.head(6)

Remove the now unneeded ventilation end column:

In [None]:
resp_care_df = resp_care_df.drop('ventendoffset', axis=1)
resp_care_df.head(6)

Sort by `ts` to make it easier to merge with other dataframes later:

In [None]:
resp_care_df = dd.from_pandas(resp_care_df.set_index('ts'), npartitions=30, sort=True)
resp_care_df.head(6)

In [None]:
resp_care_df.tail(6)

In [None]:
resp_care_df[resp_care_df.patientunitstayid == 1557538].compute()

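Save the dataframe. As the remaining features (`priorvent` and `onvent`) are binary, there's nothing to normalize, so the same data is saved in both the unnormalized and the normalized folders: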

In [None]:
resp_care_df.to_parquet(f'{data_path}cleaned/unnormalized/respiratoryCare.parquet')

In [None]:
resp_care_df.to_parquet(f'{data_path}cleaned/normalized/respiratoryCare.parquet')

### Join dataframes

Merge dataframes by the unit stay, `patientunitstayid`, and the timestamp, `ts`, with a tolerance for a difference of up to 30 minutes.

In [None]:
resp_care_df = dd.read_parquet(f'{data_path}cleaned/normalized/respiratoryCare.parquet')
resp_care_df.head()

In [None]:
eICU_df = dd.merge_asof(eICU_df, resp_care_df, on='ts', by='patientunitstayid', direction='nearest', tolerance=30)
eICU_df.head()

## Allergy data

### Read the data

In [None]:
alrg_df = dd.read_csv(f'{data_path}original/allergy.csv')
alrg_df.head()

In [None]:
len(alrg_df)

In [None]:
alrg_df.patientunitstayid.nunique().compute()

In [None]:
alrg_df.npartitions

In [None]:
alrg_df = alrg_df.repartition(npartitions=30)

Get an overview of the dataframe through the `describe` method:

In [None]:
alrg_df.describe().compute().transpose()

In [None]:
alrg_df.visualize()

In [None]:
alrg_df.columns

In [None]:
alrg_df.dtypes

### Check for missing values

In [None]:
utils.dataframe_missing_values(alrg_df)

### Remove unneeded features

In [None]:
alrg_df[alrg_df.allergytype == 'Non Drug'].drughiclseqno.value_counts().compute()

In [None]:
alrg_df[alrg_df.allergytype == 'Drug'].drughiclseqno.value_counts().compute()

As we can see, the drug features in this table only have data when the allergy is to a drug. As such, we don't need the `allergytype` feature. I'm also ignoring the hospital staff related information, and using only the drug codes instead of the drug names, as the codes are independent of the drug brand.

In [None]:
alrg_df.allergynotetype.value_counts().compute()

The `allergynotetype` feature also doesn't seem very relevant, so it's discarded.

In [None]:
alrg_df = alrg_df[['patientunitstayid', 'allergyoffset', 
                   'allergyname', 'drughiclseqno']]
alrg_df.head()

### Discretize categorical features

Convert binary categorical features into simple numberings, one-hot encode features with a low number of categories (in this case, 5 or fewer) and enumerate sparse categorical features that will be embedded.

#### Separate and prepare features for embedding

Identify categorical features that have more than 5 unique categories, which will go through an embedding layer afterwards, and enumerate them.


Update the list of categorical features and add those that will need embedding (features with more than 5 unique values):
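`utils.enum_categorical_feature` isn't shown here, but from its use below it returns the encoded column together with a category mapping. This is a minimal pandas sketch of the same idea on made-up data. It assumes each category maps to a positive integer and that 0 is kept for missing values; that assumption is consistent with the later `fillna(0)` on the drug codes, but it is not confirmed by the source:

```python
import pandas as pd

# Toy allergy data (made-up values)
df = pd.DataFrame({
    'allergyname': ['penicillin', 'latex', None, 'penicillin',
                    'sulfa', 'codeine', 'aspirin', 'iodine'],
    'allergytype': ['Drug', 'Non Drug', 'Drug', 'Drug',
                    'Drug', 'Drug', 'Drug', 'Non Drug'],
})

MAX_ONE_HOT = 5  # features with more unique values than this get embedded
embed_feat = [col for col in df.columns if df[col].nunique() > MAX_ONE_HOT]

enum_maps = {}
for col in embed_feat:
    # factorize returns -1 for missing values; adding 1 reserves 0 for "missing"
    codes, uniques = pd.factorize(df[col])
    df[col] = codes + 1
    enum_maps[col] = {category: code + 1 for code, category in enumerate(uniques)}

print(embed_feat, df['allergyname'].tolist())
```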

In [None]:
new_cat_feat = ['allergyname', 'drughiclseqno']
cat_feat.extend(new_cat_feat)

In [None]:
cat_feat_nunique = [alrg_df[feature].nunique().compute() for feature in new_cat_feat]
cat_feat_nunique

In [None]:
new_cat_embed_feat = []
for i in range(len(new_cat_feat)):
    if cat_feat_nunique[i] > 5:
        # Add feature to the list of those that will be embedded
        cat_embed_feat.append(new_cat_feat[i])
        new_cat_embed_feat.append(new_cat_feat[i])

In [None]:
alrg_df[new_cat_feat].head()

In [None]:
for i in range(len(new_cat_embed_feat)):
    feature = new_cat_embed_feat[i]
    # Skip the 'drughiclseqno' from enumeration encoding
    if feature == 'drughiclseqno':
        continue
    # Prepare for embedding, i.e. enumerate categories
    alrg_df[feature], cat_embed_feat_enum[feature] = utils.enum_categorical_feature(alrg_df, feature)

Fill missing values of the drug data with 0, so as to prepare for embedding:
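Filling with 0 is also what makes the integer cast possible. A pandas column with missing values is stored as float, and casting NaN to `int` fails. The small illustration below uses made-up codes and assumes 0 is not a valid drug code, so that it can serve as the "no drug" index for the embedding:

```python
import numpy as np
import pandas as pd

# Toy drug codes: a column with missing values is read as float (made-up values)
codes = pd.Series([101.0, np.nan, 202.0])

try:
    codes.astype(int)  # NaN has no integer representation, so this raises
    cast_failed = False
except ValueError:
    cast_failed = True

# Using 0 as the "no drug" code makes the integer cast possible
filled = codes.fillna(0).astype(int)
print(filled.tolist())
```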

In [None]:
alrg_df.drughiclseqno = alrg_df.drughiclseqno.fillna(0).astype(int)

In [None]:
alrg_df[new_cat_feat].head()

In [None]:
cat_embed_feat_enum

In [None]:
alrg_df[new_cat_feat].dtypes

In [None]:
alrg_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
alrg_df = client.persist(alrg_df)

In [None]:
alrg_df.visualize()

### Create the timestamp feature and sort

Create the timestamp (`ts`) feature:

In [None]:
alrg_df['ts'] = alrg_df['allergyoffset']
alrg_df = alrg_df.drop('allergyoffset', axis=1)
alrg_df.head()

Remove duplicate rows:

In [None]:
len(alrg_df)

In [None]:
alrg_df = alrg_df.drop_duplicates()
alrg_df.head()

In [None]:
len(alrg_df)

In [None]:
alrg_df = alrg_df.repartition(npartitions=30)

Sort by `ts` to make it easier to merge with other dataframes later:

In [None]:
alrg_df = alrg_df.set_index('ts')
alrg_df.head()

In [None]:
alrg_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
alrg_df = client.persist(alrg_df)

In [None]:
alrg_df.visualize()

Check for possible multiple rows with the same unit stay ID and timestamp:

In [None]:
alrg_df.reset_index().head()

In [None]:
alrg_df.reset_index().groupby(['patientunitstayid', 'ts']).count().nlargest(columns='allergyname').head()

In [None]:
alrg_df[alrg_df.patientunitstayid == 3197554].compute().head(10)

We can see that there are up to 47 categories per set of `patientunitstayid` and `ts`. As such, we must join them.

### Join rows that have the same IDs

Even after removing duplicate rows, some rows still have different information for the same ID and timestamp. We have to concatenate the categorical enumerations.

In [None]:
alrg_df = utils.join_categorical_enum(alrg_df, new_cat_embed_feat)
alrg_df.head()

In [None]:
alrg_df.dtypes

In [None]:
alrg_df.reset_index().groupby(['patientunitstayid', 'ts']).count().nlargest(columns='allergyname').head()

In [None]:
alrg_df[alrg_df.patientunitstayid == 3197554].compute().head(10)

Comparing the output from the two previous cells with what we had before the `join_categorical_enum` method, we can see that all rows with duplicate IDs have been successfully joined.

In [None]:
alrg_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
alrg_df = client.persist(alrg_df)

In [None]:
alrg_df.visualize()

### Renaming columns

In [None]:
alrg_df = alrg_df.rename(columns={'drughiclseqno':'drugallergyhiclseqno'})
alrg_df.head()

Save the dataframe:

In [None]:
alrg_df = alrg_df.repartition(npartitions=30)

In [None]:
alrg_df.to_parquet(f'{data_path}cleaned/unnormalized/allergy.parquet')

In [None]:
alrg_df.to_parquet(f'{data_path}cleaned/normalized/allergy.parquet')

Confirm that everything is ok through the `describe` method:

In [None]:
alrg_df.describe().compute().transpose()

### Join dataframes

Merge dataframes by the unit stay, `patientunitstayid`, and the timestamp, `ts`, with a tolerance for a difference of up to 30 minutes.

In [None]:
alrg_df = dd.read_parquet(f'{data_path}cleaned/normalized/allergy.parquet')
alrg_df.head()

In [None]:
alrg_df.npartitions

In [None]:
eICU_df = dd.merge_asof(eICU_df, alrg_df, on='ts', by='patientunitstayid', direction='nearest', tolerance=30)
eICU_df.head()

In [None]:
# [TODO] Check if careplangeneral table could be useful. It seems to have mostly subjective data.

## General care plan data

### Read the data

In [None]:
careplangen_df = dd.read_csv(f'{data_path}original/carePlanGeneral.csv')
careplangen_df.head()

In [None]:
len(careplangen_df)

In [None]:
careplangen_df.patientunitstayid.nunique().compute()

In [None]:
careplangen_df.npartitions

In [None]:
careplangen_df = careplangen_df.repartition(npartitions=30)

Get an overview of the dataframe through the `describe` method:

In [None]:
careplangen_df.describe().compute().transpose()

In [None]:
careplangen_df.visualize()

In [None]:
careplangen_df.columns

In [None]:
careplangen_df.dtypes

### Check for missing values

In [None]:
utils.dataframe_missing_values(careplangen_df)

### Remove unneeded features

In [None]:
careplangen_df.cplgroup.value_counts().compute()

In [None]:
careplangen_df.cplitemvalue.value_counts().compute()

In [None]:
careplangen_df[careplangen_df.cplgroup == 'Activity'].cplitemvalue.value_counts().compute()

In [None]:
careplangen_df[careplangen_df.cplgroup == 'Care Limitation'].cplitemvalue.value_counts().compute()

In [None]:
careplangen_df[careplangen_df.cplgroup == 'Route-Status'].cplitemvalue.value_counts().compute()

In [None]:
careplangen_df[careplangen_df.cplgroup == 'Critical Care Discharge/Transfer Planning'].cplitemvalue.value_counts().compute()

In [None]:
careplangen_df[careplangen_df.cplgroup == 'Safety/Restraints'].cplitemvalue.value_counts().compute()

In [None]:
careplangen_df[careplangen_df.cplgroup == 'Sedation'].cplitemvalue.value_counts().compute()

In [None]:
careplangen_df[careplangen_df.cplgroup == 'Analgesia'].cplitemvalue.value_counts().compute()

In [None]:
careplangen_df[careplangen_df.cplgroup == 'Ordered Protocols'].cplitemvalue.value_counts().compute()

In [None]:
careplangen_df[careplangen_df.cplgroup == 'Volume Status'].cplitemvalue.value_counts().compute()

In [None]:
careplangen_df[careplangen_df.cplgroup == 'Psychosocial Status'].cplitemvalue.value_counts().compute()

In [None]:
careplangen_df[careplangen_df.cplgroup == 'Current Rate'].cplitemvalue.value_counts().compute()

In [None]:
careplangen_df[careplangen_df.cplgroup == 'Baseline Status'].cplitemvalue.value_counts().compute()

In [None]:
careplangen_df[careplangen_df.cplgroup == 'Protein'].cplitemvalue.value_counts().compute()

In [None]:
careplangen_df[careplangen_df.cplgroup == 'Calories'].cplitemvalue.value_counts().compute()

In this case, apart from the row ID `cplgeneralid`, there aren't entire columns to remove. However, some specific care plan categories seem to be less relevant (e.g. activity, critical care discharge/transfer planning) or redundant with other tables (e.g. ventilation, infectious diseases). So, we're going to remove the rows that have those categories.
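Row filtering by category follows the usual `isin` pattern, where `~` negates the boolean mask. The toy pandas sketch below uses made-up rows. It also computes the share of unit stays that keep at least one row, which is the kind of coverage figure reported further down:

```python
import pandas as pd

# Toy care plan rows (made-up values)
careplan = pd.DataFrame({
    'patientunitstayid': [1, 1, 2, 2],
    'cplgroup': ['Activity', 'Sedation', 'Ventilation', 'Analgesia'],
})

categories_to_remove = ['Ventilation', 'Activity']

# Keep only the rows whose care plan group is not in the removal list
kept = careplan[~careplan['cplgroup'].isin(categories_to_remove)]

# Share of unit stays that still have at least one row
coverage = kept['patientunitstayid'].nunique() / careplan['patientunitstayid'].nunique()
print(kept, coverage)
```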

In [None]:
careplangen_df = careplangen_df.drop('cplgeneralid', axis=1)
careplangen_df.head()

In [None]:
categories_to_remove = ['Ventilation', 'Airway', 'Activity', 'Care Limitation', 
                        'Route-Status', 'Critical Care Discharge/Transfer Planning', 
                        'Ordered Protocols', 'Acuity', 'Volume Status', 'Prognosis', 
                        'Care Providers', 'Family/Health Care Proxy/Contact Info', 'Current Rate', 
                        'Daily Goals/Safety Risks/Discharge Requirements', 'Goal Rate', 
                        'Planned Procedures', 'Infectious Disease', 
                        'Care Plan Reviewed with Patient/Family', 'Protein', 'Calories']

In [None]:
~(careplangen_df.cplgroup.isin(categories_to_remove)).head()

In [None]:
careplangen_df = careplangen_df[~(careplangen_df.cplgroup.isin(categories_to_remove))]
careplangen_df.head()

In [None]:
len(careplangen_df)

In [None]:
careplangen_df.patientunitstayid.nunique().compute()

There's still plenty of data left, covering around 92.48% of the unit stays, even after removing several categories.

### Discretize categorical features

Convert binary categorical features into simple numberings, one-hot encode features with a low number of categories (in this case, 5 or fewer) and enumerate sparse categorical features that will be embedded.

#### Separate and prepare features for embedding

Identify categorical features that have more than 5 unique categories, which will go through an embedding layer afterwards, and enumerate them.

Update the list of categorical features and add those that will need embedding (features with more than 5 unique values):

In [None]:
new_cat_feat = ['cplgroup', 'cplitemvalue']
cat_feat.extend(new_cat_feat)

In [None]:
cat_feat_nunique = [careplangen_df[feature].nunique().compute() for feature in new_cat_feat]
cat_feat_nunique

In [None]:
new_cat_embed_feat = []
for i in range(len(new_cat_feat)):
    if cat_feat_nunique[i] > 5:
        # Add feature to the list of those that will be embedded
        cat_embed_feat.append(new_cat_feat[i])
        new_cat_embed_feat.append(new_cat_feat[i])

In [None]:
careplangen_df[new_cat_feat].head()

In [None]:
for i in range(len(new_cat_embed_feat)):
    feature = new_cat_embed_feat[i]
    # Prepare for embedding, i.e. enumerate categories
    careplangen_df[feature], cat_embed_feat_enum[feature] = utils.enum_categorical_feature(careplangen_df, feature)

In [None]:
careplangen_df[new_cat_feat].head()

In [None]:
cat_embed_feat_enum

In [None]:
careplangen_df[new_cat_feat].dtypes

In [None]:
careplangen_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
careplangen_df = client.persist(careplangen_df)

In [None]:
careplangen_df.visualize()

### Create the timestamp feature and sort

Create the timestamp (`ts`) feature:

In [None]:
careplangen_df['ts'] = careplangen_df['cplitemoffset']
careplangen_df = careplangen_df.drop('cplitemoffset', axis=1)
careplangen_df.head()

Remove duplicate rows:

In [None]:
len(careplangen_df)

In [None]:
careplangen_df = careplangen_df.drop_duplicates()
careplangen_df.head()

In [None]:
len(careplangen_df)

In [None]:
careplangen_df = careplangen_df.repartition(npartitions=30)

Sort by `ts` to make it easier to merge with other dataframes later:

In [None]:
careplangen_df = careplangen_df.set_index('ts')
careplangen_df.head()

In [None]:
careplangen_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
careplangen_df = client.persist(careplangen_df)

In [None]:
careplangen_df.visualize()

Check for possible multiple rows with the same unit stay ID and timestamp:

In [None]:
careplangen_df.reset_index().head()

In [None]:
careplangen_df.reset_index().groupby(['patientunitstayid', 'ts']).count().nlargest(columns='cplgroup').head()

In [None]:
careplangen_df[careplangen_df.patientunitstayid == 3138123].compute().head(10)

We can see that there are up to 32 categories per set of `patientunitstayid` and `ts`. As such, we must join them.

### Join rows that have the same IDs

In [None]:
careplangen_df = utils.join_categorical_enum(careplangen_df, new_cat_embed_feat)
careplangen_df.head()

In [None]:
careplangen_df.dtypes

In [None]:
careplangen_df.reset_index().groupby(['patientunitstayid', 'ts']).count().nlargest(columns='cplgroup').head()

In [None]:
careplangen_df[careplangen_df.patientunitstayid == 3138123].compute().head(10)

Comparing the output from the two previous cells with what we had before the `join_categorical_enum` method, we can see that all rows with duplicate IDs have been successfully joined.

In [None]:
careplangen_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
careplangen_df = client.persist(careplangen_df)

In [None]:
careplangen_df.visualize()

### Renaming columns

We keep the `activeupondischarge` feature so that, once we have the full dataframe, we can decide whether to forward fill each general care plan value or leave it as NaN. However, we need to tag this feature with its original table (general care plan) so as not to confuse it with other data.

In [None]:
careplangen_df = careplangen_df.rename(columns={'activeupondischarge':'cpl_activeupondischarge'})
careplangen_df.head()

Save the dataframe:

In [None]:
careplangen_df = careplangen_df.repartition(npartitions=30)

In [None]:
careplangen_df.to_parquet(f'{data_path}cleaned/unnormalized/carePlanGeneral.parquet')

In [None]:
careplangen_df.to_parquet(f'{data_path}cleaned/normalized/carePlanGeneral.parquet')

Confirm that everything is ok through the `describe` method:

In [None]:
careplangen_df.describe().compute().transpose()

### Join dataframes

Merge dataframes by the unit stay, `patientunitstayid`, and the timestamp, `ts`, with a tolerance for a difference of up to 30 minutes.

In [None]:
careplangen_df = dd.read_parquet(f'{data_path}cleaned/normalized/carePlanGeneral.parquet')
careplangen_df.head()

In [None]:
careplangen_df.npartitions

In [None]:
eICU_df = dd.merge_asof(eICU_df, careplangen_df, on='ts', by='patientunitstayid', direction='nearest', tolerance=30)
eICU_df.head()

## Past history data

### Read the data

In [None]:
pasthist_df = dd.read_csv(f'{data_path}original/pastHistory.csv')
pasthist_df.head()

In [None]:
len(pasthist_df)

In [None]:
pasthist_df.patientunitstayid.nunique().compute()

In [None]:
pasthist_df.npartitions

In [None]:
pasthist_df = pasthist_df.repartition(npartitions=30)

Get an overview of the dataframe through the `describe` method:

In [None]:
pasthist_df.describe().compute().transpose()

In [None]:
pasthist_df.visualize()

In [None]:
pasthist_df.columns

In [None]:
pasthist_df.dtypes

### Check for missing values

In [None]:
utils.dataframe_missing_values(pasthist_df)

### Remove unneeded features

In [None]:
pasthist_df.pasthistorypath.value_counts().head(20)

In [None]:
pasthist_df.pasthistorypath.value_counts().tail(20)

In [None]:
pasthist_df.pasthistoryvalue.value_counts().compute()

In [None]:
pasthist_df.pasthistorynotetype.value_counts().compute()

In [None]:
pasthist_df[pasthist_df.pasthistorypath == 'notes/Progress Notes/Past History/Past History Obtain Options/Performed'].pasthistoryvalue.value_counts().compute()

Since this table concerns the patients' past diagnoses, the timestamp at which each one was recorded is probably neither reliable nor useful. As such, I'm going to remove the offset variables, along with the row ID `pasthistoryid`. Furthermore, `pasthistoryvaluetext` is redundant with `pasthistoryvalue`, while `pasthistorynotetype` and the past history path 'notes/Progress Notes/Past History/Past History Obtain Options/Performed' seem to be irrelevant.

In [None]:
pasthist_df = pasthist_df.drop(['pasthistoryid', 'pasthistoryoffset', 'pasthistoryenteredoffset',
                                'pasthistorynotetype', 'pasthistoryvaluetext'], axis=1)
pasthist_df.head()

In [None]:
categories_to_remove = ['notes/Progress Notes/Past History/Past History Obtain Options/Performed']

In [None]:
~(pasthist_df.pasthistorypath.isin(categories_to_remove)).head()

In [None]:
pasthist_df = pasthist_df[~(pasthist_df.pasthistorypath.isin(categories_to_remove))]
pasthist_df.head()

In [None]:
len(pasthist_df)

In [None]:
pasthist_df.patientunitstayid.nunique().compute()

There's still plenty of data left, covering around 81.87% of the unit stays, even after removing the 'Performed' past history path.

### Discretize categorical features

Convert binary categorical features into simple numberings, one-hot encode features with a low number of categories (in this case, 5 or fewer) and enumerate sparse categorical features that will be embedded.

#### Separate and prepare features for embedding

Identify categorical features that have more than 5 unique categories, which will go through an embedding layer afterwards, and enumerate them.

Update the list of categorical features and add those that will need embedding (features with more than 5 unique values):

In [None]:
new_cat_feat = ['pasthistorypath', 'pasthistoryvalue']
cat_feat.extend(new_cat_feat)

In [None]:
cat_feat_nunique = [pasthist_df[feature].nunique().compute() for feature in new_cat_feat]
cat_feat_nunique

In [None]:
new_cat_embed_feat = []
for i in range(len(new_cat_feat)):
    if cat_feat_nunique[i] > 5:
        # Add feature to the list of those that will be embedded
        cat_embed_feat.append(new_cat_feat[i])
        new_cat_embed_feat.append(new_cat_feat[i])

In [None]:
pasthist_df[new_cat_feat].head()

In [None]:
for i in range(len(new_cat_embed_feat)):
    feature = new_cat_embed_feat[i]
    # Prepare for embedding, i.e. enumerate categories
    pasthist_df[feature], cat_embed_feat_enum[feature] = utils.enum_categorical_feature(pasthist_df, feature)

In [None]:
pasthist_df[new_cat_feat].head()

In [None]:
cat_embed_feat_enum

In [None]:
pasthist_df[new_cat_feat].dtypes

In [None]:
pasthist_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
pasthist_df = client.persist(pasthist_df)

In [None]:
pasthist_df.visualize()

### Remove duplicate rows

Remove duplicate rows:

In [None]:
len(pasthist_df)

In [None]:
pasthist_df = pasthist_df.drop_duplicates()
pasthist_df.head()

In [None]:
len(pasthist_df)

In [None]:
pasthist_df = pasthist_df.repartition(npartitions=30)

In [None]:
pasthist_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
pasthist_df = client.persist(pasthist_df)

In [None]:
pasthist_df.visualize()

Check for possible multiple rows with the same unit stay ID (this table no longer has a timestamp):

In [None]:
pasthist_df.groupby(['patientunitstayid']).count().nlargest(columns='pasthistorypath').head()

In [None]:
pasthist_df[pasthist_df.patientunitstayid == 1558102].compute().head(10)

We can see that there are up to 20 categories per `patientunitstayid`. As such, we must join them.

### Join rows that have the same IDs

In [None]:
pasthist_df = utils.join_categorical_enum(pasthist_df, new_cat_embed_feat, id_columns=['patientunitstayid'])
pasthist_df.head()

In [None]:
pasthist_df.dtypes

In [None]:
pasthist_df.groupby(['patientunitstayid']).count().nlargest(columns='pasthistorypath').head()

In [None]:
pasthist_df[pasthist_df.patientunitstayid == 1558102].compute().head(10)

Comparing the output from the two previous cells with what we had before the `join_categorical_enum` method, we can see that all rows with duplicate IDs have been successfully joined.

In [None]:
pasthist_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
pasthist_df = client.persist(pasthist_df)

In [None]:
pasthist_df.visualize()

### Save the dataframe

In [None]:
pasthist_df = pasthist_df.repartition(npartitions=30)

In [None]:
pasthist_df.to_parquet(f'{data_path}cleaned/unnormalized/pastHistory.parquet')

In [None]:
pasthist_df.to_parquet(f'{data_path}cleaned/normalized/pastHistory.parquet')

### Join dataframes

Unlike the previous tables, the past history data no longer has a timestamp (the offset columns were dropped), so it can't be merged on `ts`. Instead, merge it only by the unit stay, `patientunitstayid`, so that each stay's past history is attached to all of its rows.

In [None]:
pasthist_df = dd.read_parquet(f'{data_path}cleaned/normalized/pastHistory.parquet')
pasthist_df.head()

In [None]:
pasthist_df.npartitions

In [None]:
# Past history is static per unit stay, so a left merge on the ID is enough
eICU_df = dd.merge(eICU_df, pasthist_df, how='left', on='patientunitstayid')
eICU_df.head()
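A table with one row per unit stay and no timestamp can be attached to a time-indexed table with a plain left merge on the ID. The static values are then repeated on every timestamp of that stay. The pandas sketch below uses made-up data:

```python
import pandas as pd

# Time-indexed table: several rows per unit stay
timeline = pd.DataFrame({
    'patientunitstayid': [1, 1, 2],
    'ts': [0, 60, 0],
    'heartrate': [80, 85, 70],
})

# Static table: one row per unit stay, no timestamp
past_history = pd.DataFrame({
    'patientunitstayid': [1, 3],
    'pasthistorypath': [4, 2],
})

# Left merge keeps every timeline row; stays without past history get NaN
merged = timeline.merge(past_history, how='left', on='patientunitstayid')
print(merged)
```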

## Infusion drug data

### Read the data

In [None]:
infdrug_df = dd.read_csv(f'{data_path}original/infusionDrug.csv')
infdrug_df.head()

In [None]:
len(infdrug_df)

In [None]:
infdrug_df.patientunitstayid.nunique().compute()

In [None]:
infdrug_df.npartitions

In [None]:
infdrug_df = infdrug_df.repartition(npartitions=30)

Get an overview of the dataframe through the `describe` method:

In [None]:
infdrug_df.describe().compute().transpose()

In [None]:
infdrug_df.visualize()

In [None]:
infdrug_df.columns

In [None]:
infdrug_df.dtypes

### Check for missing values

In [None]:
utils.dataframe_missing_values(infdrug_df)

### Remove unneeded features

Besides removing the row ID `infusiondrugid`, I'm also removing `infusionrate`, `volumeoffluid` and `drugamount`: they seem redundant with `drugrate`, while having far more missing values.
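`utils.dataframe_missing_values` isn't shown here. The comparison behind this choice can be sketched in plain pandas as the percentage of missing values per column. The values below are toy numbers, not the real eICU counts:

```python
import numpy as np
import pandas as pd

# Toy rows with the four columns being compared; NaN marks a missing value
df = pd.DataFrame({
    'drugrate':      [5.0, 2.0, np.nan, 1.0],
    'infusionrate':  [np.nan, np.nan, np.nan, 3.0],
    'volumeoffluid': [np.nan, 250.0, np.nan, np.nan],
    'drugamount':    [np.nan, np.nan, 100.0, np.nan],
})

# Percentage of missing values in each column, highest first
missing_pct = df.isnull().mean().mul(100).sort_values(ascending=False)
print(missing_pct)
```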

In [None]:
infdrug_df = infdrug_df.drop(['infusiondrugid', 'infusionrate', 'volumeoffluid', 'drugamount'], axis=1)
infdrug_df.head()

### Remove string drug rate values

In [None]:
infdrug_df[infdrug_df.drugrate.map(utils.is_definitely_string)].head()

In [None]:
infdrug_df[infdrug_df.drugrate.map(utils.is_definitely_string)].drugrate.value_counts().compute()

In [None]:
infdrug_df.drugrate = infdrug_df.drugrate.map(lambda x: np.nan if utils.is_definitely_string(x) else x)
infdrug_df.head()
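`utils.is_definitely_string` isn't shown in this notebook. A common pandas alternative, swapped in here and not what the notebook uses, is `pd.to_numeric` with `errors='coerce'`, which turns every value that can't be parsed as a number into NaN in one pass. The strings below are illustrative, and some edge cases may be handled differently than by `is_definitely_string`:

```python
import pandas as pd

# Toy drugrate values as they might come out of the CSV: numbers mixed with text
drugrate = pd.Series(['5', '2.5', 'OFF', None, 'Documentation undone', '10'])

# Unparseable strings (and None) become NaN; numeric strings become floats
clean = pd.to_numeric(drugrate, errors='coerce')
print(clean.tolist())
```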

In [None]:
infdrug_df.patientunitstayid = infdrug_df.patientunitstayid.astype(int)
infdrug_df.infusionoffset = infdrug_df.infusionoffset.astype(int)
infdrug_df.drugname = infdrug_df.drugname.astype(str)
infdrug_df.drugrate = infdrug_df.drugrate.astype(float)
infdrug_df.patientweight = infdrug_df.patientweight.astype(float)
infdrug_df.head()

In [None]:
infdrug_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
infdrug_df = client.persist(infdrug_df)

In [None]:
infdrug_df.visualize()

### Discretize categorical features

Convert binary categorical features into simple numberings, one-hot encode features with a low number of categories (here, 5 or fewer) and enumerate sparse categorical features that will be embedded.
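The three encodings can be told apart by the number of unique categories alone. A minimal sketch of that rule, using the threshold of 5 from the text; the function name and the category counts are hypothetical, and `pd.get_dummies` shows what the one-hot case produces:

```python
import pandas as pd


def choose_encoding(n_unique, max_one_hot=5):
    """Pick an encoding for a categorical feature from its number of categories."""
    if n_unique <= 2:
        return 'binary'     # a single 0/1 column
    if n_unique <= max_one_hot:
        return 'one_hot'    # one indicator column per category
    return 'embedding'      # integer codes, later fed to an embedding layer


# Made-up category counts for three hypothetical features
n_unique = {'feat_binary': 2, 'feat_small': 4, 'feat_large': 300}
encodings = {feat: choose_encoding(n) for feat, n in n_unique.items()}
print(encodings)  # {'feat_binary': 'binary', 'feat_small': 'one_hot', 'feat_large': 'embedding'}

# One-hot encoding of a low-cardinality toy feature
unit = pd.Series(['MICU', 'SICU', 'MICU'])
one_hot = pd.get_dummies(unit, prefix='unittype')
print(one_hot.columns.tolist())  # ['unittype_MICU', 'unittype_SICU']
```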

#### Separate and prepare features for embedding

Identify categorical features that have more than 5 unique categories, which will go through an embedding layer afterwards, and enumerate them.
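Enumerating a feature means replacing each category with an integer code that an embedding layer can look up. The sketch below shows what a helper like `utils.enum_categorical_feature` might do, returning the same pair of outputs the notebook assigns from it (the encoded column and a category-to-code dictionary). The name `enum_categories`, starting the codes at 1 and reserving 0 for missing values are assumptions:

```python
import pandas as pd


def enum_categories(series):
    """Map each category to an integer code, starting at 1 (hypothetical helper).

    Code 0 is reserved here for missing values, an assumption that also
    leaves it free for padding when the codes are embedded later.
    """
    categories = sorted(series.dropna().unique())
    mapping = {cat: code for code, cat in enumerate(categories, start=1)}
    # Categories absent from the mapping (i.e. missing values) get code 0
    codes = series.map(mapping).fillna(0).astype(int)
    return codes, mapping


drugs = pd.Series(['propofol', 'heparin', 'propofol', None, 'insulin'])
codes, mapping = enum_categories(drugs)
print(mapping)         # {'heparin': 1, 'insulin': 2, 'propofol': 3}
print(codes.tolist())  # [3, 1, 3, 0, 2]
```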

Update the list of categorical features and add those that will need embedding (features with more than 5 unique values):

In [None]:
new_cat_feat = ['drugname']
cat_feat.extend(new_cat_feat)

In [None]:
cat_feat_nunique = [infdrug_df[feature].nunique().compute() for feature in new_cat_feat]
cat_feat_nunique

In [None]:
new_cat_embed_feat = []
for feature, n_unique in zip(new_cat_feat, cat_feat_nunique):
    if n_unique > 5:
        # Add feature to the list of those that will be embedded
        cat_embed_feat.append(feature)
        new_cat_embed_feat.append(feature)

In [None]:
infdrug_df[new_cat_feat].head()

In [None]:
for feature in new_cat_embed_feat:
    # Prepare for embedding, i.e. enumerate categories
    infdrug_df[feature], cat_embed_feat_enum[feature] = utils.enum_categorical_feature(infdrug_df, feature)

In [None]:
infdrug_df[new_cat_feat].head()

In [None]:
cat_embed_feat_enum

In [None]:
infdrug_df[new_cat_feat].dtypes

In [None]:
infdrug_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
infdrug_df = client.persist(infdrug_df)

In [None]:
infdrug_df.visualize()

### Create the timestamp feature and sort

Create the timestamp (`ts`) feature:

In [None]:
infdrug_df['ts'] = infdrug_df['infusionoffset']
infdrug_df = infdrug_df.drop('infusionoffset', axis=1)
infdrug_df.head()

Standardize drug names:

In [None]:
infdrug_df = utils.clean_naming(infdrug_df, 'drugname')
infdrug_df.head()

Remove duplicate rows:

In [None]:
len(infdrug_df)

In [None]:
infdrug_df = infdrug_df.drop_duplicates()
infdrug_df.head()

In [None]:
len(infdrug_df)

In [None]:
infdrug_df = infdrug_df.repartition(npartitions=30)

Sort by `ts` (by setting it as the index), so that it's easier to merge with other dataframes later:

In [None]:
infdrug_df = infdrug_df.set_index('ts')
infdrug_df.head(6)
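Sorting by `ts` matters because an as-of merge walks through both tables in key order. pandas' `merge_asof` even refuses unsorted keys, as this toy example shows (the `value` column is made up, and Dask's partition handling isn't covered):

```python
import pandas as pd

left = pd.DataFrame({'ts': [60, 0], 'patientunitstayid': [1, 1]})  # not sorted by ts
right = pd.DataFrame({'ts': [50], 'patientunitstayid': [1], 'value': [0.5]})

raised = False
try:
    pd.merge_asof(left, right, on='ts', by='patientunitstayid')
except ValueError as err:
    raised = True
    print(f'Rejected: {err}')  # pandas reports that the left keys must be sorted

# Sorting by the merge key first makes the same call work
merged = pd.merge_asof(left.sort_values('ts'), right, on='ts', by='patientunitstayid')
print(merged)
```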

In [None]:
infdrug_df.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
infdrug_df = client.persist(infdrug_df)

In [None]:
infdrug_df.visualize()

Check for possible multiple rows with the same unit stay ID and timestamp:

In [None]:
infdrug_df.reset_index().groupby(['patientunitstayid', 'ts']).count().nlargest(columns='drugname').head()

In [None]:
infdrug_df[infdrug_df.patientunitstayid == 1785711].compute().head(20)

We can see that there are up to 17 categories per combination of `patientunitstayid` and `ts`, so these rows must be joined. However, absolute drug rates from different drugs shouldn't be mixed, so we normalize the data first.
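`utils.normalize_data` isn't shown. Judging by its `columns_to_normalize_cat=[('drugname', 'drugrate')]` argument, it normalizes `drugrate` separately for each `drugname`. Assuming a z-score (an assumption; the actual scaling may differ), the idea looks like this in pandas on toy rates:

```python
import pandas as pd

# Toy infusion rows: two drugs whose rates live on very different scales
df = pd.DataFrame({'drugname': ['heparin', 'heparin', 'heparin', 'propofol', 'propofol'],
                   'drugrate': [1000.0, 1200.0, 1400.0, 20.0, 40.0]})

# Z-score each rate with the mean and (sample) standard deviation of its own drug
rates_by_drug = df.groupby('drugname')['drugrate']
df['drugrate_norm'] = ((df['drugrate'] - rates_by_drug.transform('mean'))
                       / rates_by_drug.transform('std'))
print(df)
```

After this step both drugs share a comparable scale (heparin becomes -1, 0, 1; propofol becomes about -0.71 and 0.71). A drug that appears only once has an undefined sample standard deviation, so its normalized rate comes out as NaN; a real implementation has to decide how to handle that case.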

### Normalize data

In [None]:
infdrug_df_norm = utils.normalize_data(infdrug_df, 
                                       columns_to_normalize=['patientweight'],
                                       columns_to_normalize_cat=[('drugname', 'drugrate')])
infdrug_df_norm.head()

In [None]:
infdrug_df_norm.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
infdrug_df_norm = client.persist(infdrug_df_norm)

In [None]:
infdrug_df_norm.visualize()

In [None]:
infdrug_df_norm.patientweight.value_counts().compute()

### Join rows that have the same IDs

In [None]:
infdrug_df_norm = utils.join_categorical_enum(infdrug_df_norm, new_cat_embed_feat)
infdrug_df_norm.head()

In [None]:
infdrug_df_norm.dtypes

In [None]:
infdrug_df_norm.reset_index().groupby(['patientunitstayid', 'ts']).count().nlargest(columns='drugname').head()

In [None]:
infdrug_df_norm[infdrug_df_norm.patientunitstayid == 1785711].compute().head(20)

Comparing the output from the two previous cells with what we had before the `join_categorical_enum` method, we can see that all rows sharing the same `patientunitstayid` and `ts` have been successfully joined.

In [None]:
infdrug_df_norm.visualize()

In [None]:
# Save current dataframe in memory to avoid accumulating several operations on the dask graph
infdrug_df_norm = client.persist(infdrug_df_norm)

In [None]:
infdrug_df_norm.visualize()

### Rename columns

In [None]:
infdrug_df = infdrug_df.rename(columns={'patientweight': 'weight', 'drugname': 'infusion_drugname',
                                        'drugrate': 'infusion_drugrate'})
infdrug_df.head()

In [None]:
infdrug_df_norm = infdrug_df_norm.rename(columns={'patientweight': 'weight', 'drugname': 'infusion_drugname',
                                                  'drugrate': 'infusion_drugrate'})
infdrug_df_norm.head()

### Save the dataframe

Save the dataframe before normalizing:

In [None]:
infdrug_df.to_parquet(f'{data_path}cleaned/unnormalized/infusionDrug.parquet')

Save the dataframe after normalizing:

In [None]:
infdrug_df_norm.to_parquet(f'{data_path}cleaned/normalized/infusionDrug.parquet')

Confirm that everything is OK using the `describe` method:

In [None]:
infdrug_df_norm.describe().compute().transpose()

### Join dataframes

Merge dataframes by the unit stay, `patientunitstayid`, and the timestamp, `ts`, with a tolerance of up to 30 minutes between timestamps.

In [None]:
infdrug_df = dd.read_parquet(f'{data_path}cleaned/normalized/infusionDrug.parquet')
infdrug_df.head()

In [None]:
eICU_df = dd.merge_asof(eICU_df, infdrug_df, on='ts', by='patientunitstayid', direction='nearest', tolerance=30)
eICU_df.head()