<a href="https://colab.research.google.com/github/Enrico-Call/RL-AKI/blob/main/Data%20Aggregation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [22]:
#sets the project id
PROJECT_ID = "rl-aki" #@param {type:"string"}

In [23]:
import os
from google.colab import auth
from IPython.display import display
from google.colab import drive
import os
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
import numpy as np
import warnings
warnings.filterwarnings('ignore')
pd.set_option('use_inf_as_na', True)

drive.mount('/content/drive', force_remount=True)
os.chdir('/content/drive/MyDrive/MLRFH')
 
#sets dateset
PROJECT_ID = 'mlrh-330919'
DATASET_ID = 'version1_0_2'
LOCATION = 'eu'
 
#all libraries check this environment variable, so set it:
os.environ["GOOGLE_CLOUD_PROJECT"] = PROJECT_ID
 
auth.authenticate_user()
print('Authenticated')

Mounted at /content/drive
Authenticated


In [24]:
#Some preprocessing functions 

def to_cols(data):

  grouped = data.pivot_table(index=['admissionid', 'time'], 
          columns=['item'], values='value')

  return grouped
  

def to_cols_action(data):

  grouped = data.pivot_table(index=['admissionid', 'time'], 
            columns=['item'], values='administered')

  return grouped

def remove_outliers(data):
  #delete outliers
  data = data.reset_index() #return to single index

  #select outlier cols
  all_cols = ['Kalium (bloed)', 'ABP gemiddeld', 'Kreatinine (bloed)', 'Natrium (bloed)', 'UrineCAD', 'UrineSupraPubis', 'UrineSpontaan', 'UrineUP', 'Kreatinine', 'Nefrodrain re Uit', 'Nefrodrain li Uit', 'UrineIncontinentie']
  
  data['ABP gemiddeld'][(data['ABP gemiddeld'] < 30.) & (data['ABP gemiddeld'] > 165.)] = np.nan
  data['Natrium (bloed)'][(data['Natrium (bloed)'] < 65.) & (data['Natrium (bloed)'] > 165.)] = np.nan
  data['Niet invasieve bloeddruk gemiddeld'][(data['Niet invasieve bloeddruk gemiddeld'] < 30) & (data['Niet invasieve bloeddruk gemiddeld'] > 165)]

  #make nans of all negative vals
  data[all_cols] = data[all_cols].applymap(lambda x: np.nan if x < 0 else x)
  return data


def remove_outliers_action(data):

  #delete outliers
  outliers = data.reset_index() #return to single index

  cols = ['Noradrenaline (Norepinefrine)', 'NaCl 0,45%/Glucose 2,5%']

  #select outlier cols
  data['Noradrenaline (Norepinefrine)'][data['Noradrenaline (Norepinefrine)'] > 10.] = np.nan
  data['NaCl 0,45%/Glucose 2,5%'][data['NaCl 0,45%/Glucose 2,5%'] > 500.] = np.nan

  data = data[cols].applymap(lambda x: np.nan if x < 0 else x)

  return data


def get_4h(data):
  #per patient, average the values in 4h timeslots

  data = data.sort_values('time')
  res = data.groupby([pd.Grouper('admissionid'),
                        pd.Grouper(key ='time', freq='4H')
                              
  ]).mean()

  return res

def get_4h_urine(data):
  #per patient, average the values in 4h timeslots

  data = data.sort_values('time')
  res = data.groupby([pd.Grouper('admissionid'),
                        pd.Grouper(key ='time', freq='4H')
                              
  ]).sum()

  return res

def aggregate_col(data, colname):

  #create new columns with cumulative count for consecutive nans
  data['nancount'] = np.zeros(len(data))
  data['nancount'] = data[colname].groupby((data[colname].notnull()).cumsum()).cumcount()
                           
  #manually set first row to 1 if nan since this is excluded in cumsum/count from line before
  for i, v in enumerate(data[colname]):
    if i == 0:
      if np.isnan(v):
        data["nancount"] += 1
  
  #and all other ones +1, except non-nan values
  data["nancount"][data[colname] == np.nan] += 1

  #set non-null values as 0 for nancount
  data["nancount"] = np.where(~data[colname].isnull(), 0, data["nancount"])

  #if value is not nan, then use previous value + 1 to get total cumulative nan count including the non-nan value
  #this is the value we want to divide through to get the right value per hour

  group_val = pd.DataFrame(data[colname]).reset_index()[colname]
  group_count = pd.DataFrame(data["nancount"]).reset_index()["nancount"]

  for i, v in enumerate(group_count):

    if v == 0: #where no null values
 
      if (i != 0): #first row: do nothing
        if group_val[i] != 0: #value 0: do not divide
          if group_count[i-1] != 0: #value before is not missing: do nothing
          
            group_val[i] = group_val[i] / (group_count[i-1]) #otherwise: divide through nancount of row before
  

  #and then fill backwards and return
  return group_val.bfill()

def sum_urine(data):
  # urine_cols = ['UrineCAD']
  urine_cols = ['UrineCAD', 'UrineSupraPubis', 'UrineUP', 'UrineSpontaan', 'UrineIncontinentie', 'UrineSplint Re', 'UrineSplint Li']
  data['Urine_summed'] = data[urine_cols].sum(axis=1)
  data['Urine_summed'] = np.where(data['Urine_summed'] == 0, np.nan, data['Urine_summed'])
  return data.drop(columns=urine_cols)

def aggregate_all_cols(data, space):

  if space == 'state':

    cols_to_agg = ['time', 'admissionid', 'Kreatinine', 'Kreatinine (bloed)', 'KREAT enzym. (bloed)',
       'Nefrodrain re Uit', 'Nefrodrain li Uit', 'Chloor (bloed)', 'Natrium (bloed)',
       'Kalium (bloed)', 'HCO3', 'Natrium', 'Natrium Astrup',
       'Kalium Astrup', 'Chloor Astrup', 'Chloor', 'Kalium',
       'Act.HCO3 (bloed)', 'Na (onv.ISE) (bloed)', 'K (onv.ISE) (bloed)',
       'Cl (onv.ISE) (bloed)', 'Niet invasieve bloeddruk gemiddeld',
       'ABP gemiddeld II', 'ABP gemiddeld']

    #group urine (sum)
    grouped = data.groupby('admissionid', as_index = False).apply(lambda x: aggregate_col(x, 'Urine_summed')).reset_index()['Urine_summed']
    data['Urine'] = list(grouped.head(len(grouped)))
    data = pd.DataFrame(data).reset_index()
    urine_aggr = get_4h_urine(data[['admissionid', 'time', 'Urine']])

    #group other variables (mean)
    data[cols_to_agg] = data[cols_to_agg].bfill()
    df_aggr = get_4h(data[cols_to_agg])

    #combine both aggregations
    combined = pd.concat([urine_aggr, df_aggr], axis=1)

    return combined

  if space == 'action':

    data = data.reset_index()
    cols_to_agg = ['time', 'admissionid', 'Dobutamine (Dobutrex)',
                   'Adrenaline (Epinefrine)', 'Dopamine (Inotropin)',
                   'Noradrenaline (Norepinefrine)', 'NaCl 0,45%/Glucose 2,5%']
    data[cols_to_agg] = data[cols_to_agg].bfill()
    df_aggr = get_4h(data[cols_to_agg])

    return df_aggr

  else:

    print("ERROR INVALID SPACE TYPE: options for space: state, action")


def interpolate(data_agg):
  #interpolate null values
  return data_agg.interpolate(limit_direction='forward')


def process_statespace(data):
  data['time'] = pd.to_datetime(data['time'], unit='m', origin = 'unix')
  grouped = to_cols(data)
  grouped = remove_outliers(grouped)
  data_sum = sum_urine(grouped)
  data_agg = aggregate_all_cols(data_sum, space="state")
  #data_filled = interpolate(data_agg)
  return data_agg.reset_index()

  
def process_actionspace(data):
  # data['time'] = pd.to_datetime(data['stop'] - data['start'], unit='ms')
  # data = data.drop(columns = ['start', 'stop'])
  # data['time'] = pd.to_datetime(data['time'], unit='ms', origin = 'unix')
  # grouped = to_cols_action(data)
  # #grouped = remove_outliers_action(grouped)
  # data_agg = aggregate_all_cols(grouped, space="action")
  # #data_filled = interpolate(data_agg)

  # Extract Fluids and Vasopressors
  fluids = data.loc[~data['itemid'].isin([7179,7178,6818,7229])]
  vasop = data.loc[data['itemid'].isin([7179,7178,6818,7229])]
  
  # Perform Aggregation
  df_aggr_fluids = transform_df(data=transform_daterange(fluids[['admissionid',
                                                                 'fluidin',
                                                                 'start_time',
                                                                 'stop_time']].sort_values(['admissionid', 'start_time']).copy(),
                                                     time_col = 'stop_time',
                                                     infer_start_time=False,
                                                     multi_source=False,
                                                     start_time = 'start_time',
                                                     end_time = 'stop_time',
                                                     value_col = 'fluidin',
                                                     group_col = ['admissionid']),
                                 time_col='stop_time',
                                 bins=range(0, 76*60, 4*60),
                                 bin_labels=range(0, 72*60, 4*60),
                                 group_cols=['admissionid', 'binn'],
                                 agg_func={'fluid_sum': ('prod_fill', 'sum')})
  df_aggr_vasops = transform_df(data=transform_daterange(vasop[['admissionid',
                                                                 'fluidin',
                                                                 'start_time',
                                                                 'stop_time']].sort_values(['admissionid', 'start_time']).copy(),
                                                     time_col = 'stop_time',
                                                     infer_start_time=False,
                                                     multi_source=False,
                                                     start_time = 'start_time',
                                                     end_time = 'stop_time',
                                                     value_col = 'fluidin',
                                                     group_col = ['admissionid']),
                                 time_col='stop_time',
                                 bins=range(0, 76*60, 4*60),
                                 bin_labels=range(0, 72*60, 4*60),
                                 group_cols=['admissionid', 'binn'],
                                 agg_func={'vasops_sum': ('prod_fill', 'sum')})
    
  df_aggr_fluids['fluid_sum'] = df_aggr_fluids['fluid_sum'].fillna(0)
  df_aggr_vasops['vasops_sum'] = df_aggr_vasops['vasops_sum'].fillna(0)

  return pd.merge(df_aggr_fluids, df_aggr_vasops, how='outer', on=['admissionid', 'binn'])

def transform_df(data: pd.DataFrame = None,
                 time_col: str = 'time',
                 bins: list = None,
                 bin_labels: list = None,
                 group_cols: list = ['admissionid', 'binn'],
                 agg_func: dict = None):
    """
    Transforms the input data from the AmsterdamUMCdb and return a dataframe with bins assigned to each record based on the time column
    :param data: dataframe with single timestamps as integers, patientid and values
    :param bins: list of bins to divide the timestamps in
    :param bin_labels: list of labels to name the bins with
    :param group_cols: list of column to group by, including the newly created 'binn'
    :param agg_func: dictionary of kwargs passed to the .agg() method
    """
    
    data['binn'] = pd.cut(data[time_col], bins=bins, labels=bin_labels)
    data = data[data[time_col]>=0]
    grouped_data = data.groupby(group_cols).agg(**agg_func).reset_index().sort_values(by=group_cols, ascending=True)
    
    return grouped_data


def transform_daterange(data: pd.DataFrame,
                        time_col: str = 'time',
                        infer_start_time: bool = True,
                        multi_source: bool = False,
                        multi_source_col: str = None,
                        start_time: str = 'start_time',
                        end_time: str = 'end_time',
                        time_unit: str = 'm',
                        value_col: str = 'value',
                        group_col: list = None,
                        fill_method: str = 'backfill',
                        fill_lim: int = 540
                        ):
    """
    Transform interval data with single timestamps to time range, calculate production, resample and backward fill
    :param data: dataframe with id, value and timestamp
    :param time_col: string representing the column name for the time of registration in a single timestamp dataframe
    :param infer_start_time: boolean representing whether the start time should be inferred from the previous record
    :param start_time: string representing the column name with the record start time
    :param end_time: string representing the column name with the record end time
    :param time_unit: interpret the integer timestamp as the given time unit and convert back to this unit at the end
    :param value_col: string representing the column name with the values of the measurements
    :param group_col: list representing the ids of patients and/or products
    :param fill_method: string to represent the method as used in pandas.series.fillna
    :param fill_lim: integer to represent the number of time units to be filled
    """
    
    if group_col is None:
        group_col = ['admissionid'] # PM: defining a list as default will keep alterations when rerunning the function
    
    # convert to datetime and set index to time column
    data[time_col] = pd.to_datetime(data[time_col], unit=time_unit)
    data[start_time] = pd.to_datetime(data[start_time], unit=time_unit)
    
    if infer_start_time:
        # get start time from previous record
        data['start_time'] = data.groupby(group_col)[time_col].shift(1)
        start_time = 'start_time'
        end_time = time_col
    else:
        # transform other columns to datetime if they exist and are still integer type, otherwise leave as is
        for t_col in [start_time, end_time]:
            if t_col in data:
                if pd.api.types.is_integer_dtype(data[start_time]):
                    data[t_col] = pd.to_datetime(data[t_col], unit=time_unit)
    
    # get time difference from start and end times   
    data['time_diff'] = (data[end_time] - data[start_time]) / np.timedelta64(1, time_unit)

    if multi_source:
        if multi_source_col is None:
            # give each record a unique id to group by in order to handle simultaneous records
            data['administrationid'] = range(data.shape[0])
            group_col += ['administrationid']
        else:
            group_col += [multi_source_col]
    
    # get production per time unit
    data['prod'] = data[value_col] / data['time_diff']
    
    # if start and end time are registered in the same record, create a new record with the other value as index
    if infer_start_time:
        data_merged = data.copy()
        data_merged.index = data_merged[time_col]
    else:
        data_end = data.copy()
        data.index = data.start_time
        data_end.index = data_end.stop_time
        data_merged = pd.concat([data, data_end]).sort_values(group_col + [start_time, end_time])
    
    # resample for each unit
    res = data_merged[group_col + ['prod', start_time, end_time]].groupby(group_col).resample('1T').mean().drop(group_col, axis=1, errors='ignore').reset_index().copy()
    
    # fill missing values
    res['prod_fill'] = res.groupby(group_col)['prod'].fillna(method=fill_method, limit=fill_lim) #9 hours
    
    # reset time column to integer values
    transform_time_col = {'s': 1, 'm': 60, 'h': 3600, 'd': 86_400}
    if infer_start_time:
        res[time_col] = (res[time_col].view(np.int64) / (transform_time_col.get(time_unit) * 1_000_000_000)).astype(int)
    else:
        if multi_source:
            level_col = 'level_2'
        else:
            level_col = 'level_1'
        res[time_col] = (res[level_col].view(np.int64) / (transform_time_col.get(time_unit) * 1_000_000_000)).astype(int)
    
    return res

In [25]:
statespace = pd.read_csv('final_state_space.csv')
actionspace = pd.read_csv('final_action_space.csv')

In [26]:
state = process_statespace(statespace)

In [27]:
action = process_actionspace(actionspace)

In [31]:
state.head(20)

item,admissionid,time,Urine,Kreatinine,Kreatinine (bloed),KREAT enzym. (bloed),Nefrodrain re Uit,Nefrodrain li Uit,Chloor (bloed),Natrium (bloed),...,Chloor Astrup,Chloor,Kalium,Act.HCO3 (bloed),Na (onv.ISE) (bloed),K (onv.ISE) (bloed),Cl (onv.ISE) (bloed),Niet invasieve bloeddruk gemiddeld,ABP gemiddeld II,ABP gemiddeld
0,11,1970-01-01 00:00:00,10.666667,439.0,342.2,331.0,15.0,0.0,107.2,138.4,...,1.0,102.0,3.8,17.779999,133.4,4.16,114.0,70.0,61.0,64.0
1,11,1970-01-01 04:00:00,10.666667,439.0,337.0,331.0,15.0,0.0,108.4,140.0,...,1.0,102.0,3.8,14.58,134.8,4.6,114.0,70.0,61.0,72.4
2,11,1970-01-01 08:00:00,10.666667,439.0,316.0,331.0,15.0,0.0,107.0,140.6,...,1.0,102.0,3.8,17.2,134.6,4.56,114.0,70.0,61.0,71.2
3,11,1970-01-01 12:00:00,9.633333,439.0,302.0,331.0,15.0,0.0,108.25,141.0,...,1.0,102.0,3.8,15.7,133.0,4.775,114.0,70.0,61.0,66.75
4,11,1970-01-01 16:00:00,15.5,439.0,302.0,331.0,15.0,0.0,105.6,141.0,...,1.0,102.0,3.8,18.820001,134.0,4.52,114.0,70.0,61.0,63.8
5,11,1970-01-01 20:00:00,2.0,439.0,286.0,331.0,15.0,0.0,106.5,144.0,...,1.0,102.0,3.8,20.25,134.75,4.575,114.0,70.0,61.0,61.5
6,11,1970-01-02 00:00:00,0.0,439.0,278.714286,331.0,15.0,0.0,108.428571,144.0,...,1.0,102.0,3.8,17.657143,134.0,4.485714,114.0,68.0,61.0,62.714286
7,11,1970-01-02 04:00:00,0.0,439.0,269.0,331.0,15.0,0.0,109.0,144.0,...,1.0,102.0,3.8,16.066667,134.166667,4.316667,114.0,72.5,61.0,73.5
8,11,1970-01-02 08:00:00,0.0,439.0,159.833333,331.0,15.0,0.0,110.0,134.833333,...,1.0,102.0,3.8,13.816667,135.166667,5.0,114.0,81.0,61.0,62.0
9,20,1970-01-01 00:00:00,75.0,439.0,138.0,331.0,15.0,0.0,90.0,133.0,...,1.0,102.0,3.8,24.62,130.2,3.36,114.0,42.6,61.0,66.8


In [32]:
action.head(20)

Unnamed: 0,admissionid,binn,fluid_sum,vasops_sum
0,11,0,1564.11441,127.987797
1,11,240,1081.298935,189.248434
2,11,480,564.661661,198.434211
3,11,720,4650.132862,155.536134
4,11,960,1158.447309,160.0
5,11,1200,563.910927,160.0
6,11,1440,6264.947415,156.1626
7,11,1680,1752.301233,169.029761
8,11,1920,1660.789216,161.846528
9,11,2160,38.890177,16.991977
