# Perform Chunking
Based on the visual analysis, we derived two possible chunking options:
* Chunk after 60 min difference to previous timestamp
* Chunk after 120 min difference to previous timestamp

After discussing this with the teaching team, we decided to **chunk after a 65 min difference to the previous timestamp**, with the possibility of adapting this threshold in the future.

## Add Timestamp of previous measurement and difference between timestamps to dataset

In [None]:
import pandas as pd
import pyarrow as pa

# Load the two parquet inputs into pandas data frames
chartevents_subset = pd.read_parquet(
    './data/chartevents_clean.parquet',
    engine='pyarrow',
)
unique_icu_stays = pd.read_parquet(
    './data/unique_icustays_in_chartevents_subset.parquet',
    engine='pyarrow',
)
#unique_icu_stays = chartevents_subset['ICUSTAY_ID'].value_counts()

In [None]:
# ICU stay ids that take part in the analysis
icustayid_filter = unique_icu_stays['ICUSTAY_ID']

# Keep only the chart events of those ICU stays; work on an independent copy
chunk_analysis_data = chartevents_subset.loc[
    chartevents_subset['ICUSTAY_ID'].isin(icustayid_filter)
].copy()

In [None]:
# The sampling rate analysis is conducted on the measured values only, not on thresholds,
# so restrict the data to the ITEMIDs that refer to values
itemids_for_values_filter = [220045, 220179, 220277]
chunk_analysis_data = chunk_analysis_data.loc[
    chunk_analysis_data['ITEMID'].isin(itemids_for_values_filter)
].copy()
# Number of remaining rows
chunk_analysis_data.shape[0]

In [None]:
# Idea: keep chunk_analysis_data as is and only add a column that holds the previous
# timestamp of the same ICUSTAY_ID/ITEMID; the difference is computed in a separate step.
# shift(1) takes the row directly above within each group, so the rows have to be in
# chronological order per ICUSTAY_ID/ITEMID. Sort explicitly instead of relying on the
# input file already being ordered - unsorted input would silently yield wrong gaps.
chunk_analysis_data = chunk_analysis_data.sort_values(by=['ICUSTAY_ID', 'ITEMID', 'CHARTTIME'])
chunk_analysis_data['CHARTTIME_PREV'] = chunk_analysis_data.groupby(['ICUSTAY_ID', 'ITEMID'])['CHARTTIME'].shift(1)

In [None]:
# Time since the previous measurement: as timedelta, in seconds, and in whole minutes (rounded down)
chunk_analysis_data['DIF_CHARTTIME_PREV'] = chunk_analysis_data['CHARTTIME'].sub(chunk_analysis_data['CHARTTIME_PREV'])
chunk_analysis_data['DIF_CHARTTIME_PREV_S'] = chunk_analysis_data['DIF_CHARTTIME_PREV'].dt.total_seconds()
chunk_analysis_data['DIF_CHARTTIME_PREV_MIN'] = chunk_analysis_data['DIF_CHARTTIME_PREV_S'] // 60

In [None]:
# Preview the first five rows including the new timestamp columns
chunk_analysis_data.head(n=5)

## Apply Chunking Rule


In [None]:
# Chunking threshold in minutes: a measurement whose gap to the previous
# measurement (DIF_CHARTTIME_PREV_MIN) is larger than this starts a new chunk
chunking_dif = 65

In [None]:
# reduce data to relevant columns to make validation easier
#chunk_analysis_data = chunk_analysis_data[['ICUSTAY_ID','ITEMID','CHARTTIME','VALUENUM','VALUEUOM','CHARTTIME_PREV','DIF_CHARTTIME_PREV_MIN']]

In [None]:
# Select all rows whose gap to the previous measurement exceeds the chunking threshold;
# each of these rows marks the start of a new chunk.
# .copy() makes chunk_data an independent frame, so adding the CHUNK_ID column to it
# afterwards does not trigger pandas' SettingWithCopyWarning.
chunk_data = chunk_analysis_data[chunk_analysis_data["DIF_CHARTTIME_PREV_MIN"] > chunking_dif].copy()

In [None]:
# Give each of these rows a chunk id of the form <ICUSTAY_ID>_<ITEMID>_<CHARTTIME>
chunk_data["CHUNK_ID"] = (
    chunk_data["ICUSTAY_ID"].map(str)
    + "_"
    + chunk_data["ITEMID"].map(str)
    + "_"
    + chunk_data["CHARTTIME"].map(str)
)

In [None]:
# Uniqueness check: the number of distinct chunk ids must equal the number of rows.
# It can only be violated if multiple measurements for the same ITEMID/ICUSTAY_ID
# occurred at the same CHARTTIME.
print(chunk_data["CHUNK_ID"].nunique())
print(chunk_data.shape[0])
# Uniqueness is given for this data set

In [None]:
# Keep only the chunk id column (as a Series, which still carries the row index)
chunk_data_subset = chunk_data.loc[:, "CHUNK_ID"]

In [None]:
# Merge the chunk ids back onto all rows via the index.
# Now we have a data set in which every measurement that was conducted later than
# the chunking rule allows carries a chunk id; all other rows have none yet.
chunk_data_merged = chunk_analysis_data.merge(
    chunk_data_subset,
    how='left',
    left_index=True,
    right_index=True,
)

In [None]:
# Order the rows by ICU stay, then by item, then chronologically
chunk_data_merged = chunk_data_merged.sort_values(
    by=['ICUSTAY_ID', 'ITEMID', 'CHARTTIME'],
)

In [None]:
# The first measurement of every ICUSTAY_ID/ITEMID has to open a chunk as well,
# in case it does not already carry a chunk id.
# Determine the earliest CHARTTIME per ICUSTAY_ID/ITEMID ...
chunk_data_min = chunk_data_merged.groupby(['ICUSTAY_ID', 'ITEMID'])['CHARTTIME'].min()
# ... and turn the grouping keys back into regular columns
chunk_data_min_df = chunk_data_min.reset_index()

# Create a chunk id for each first CHARTTIME (per ICUSTAY_ID/ITEMID)
chunk_data_min_df["CHUNK_ID_MIN"] = (
    chunk_data_min_df["ICUSTAY_ID"].map(str)
    + "_"
    + chunk_data_min_df["ITEMID"].map(str)
    + "_"
    + chunk_data_min_df["CHARTTIME"].map(str)
)

In [None]:
# Merge the first-measurement chunk ids back, so that each first measurement
# (per ICUSTAY_ID/ITEMID) gets a CHUNK_ID_MIN
chunk_data_merged_2 = chunk_data_merged.merge(
    chunk_data_min_df,
    how='left',
    on=['ICUSTAY_ID', 'ITEMID', 'CHARTTIME'],
)

In [None]:
import numpy as np
# Wherever CHUNK_ID_MIN is set, write it into CHUNK_ID; otherwise keep the existing CHUNK_ID.
# Now we have a data set with a chunk id on every measurement that was conducted later
# than the chunking rule allows, as well as an initial chunk id per ICUSTAY_ID/ITEMID.
chunk_data_merged_2['CHUNK_ID'] = chunk_data_merged_2['CHUNK_ID_MIN'].where(
    chunk_data_merged_2['CHUNK_ID_MIN'].notna(),
    chunk_data_merged_2['CHUNK_ID'],
)

In [None]:
# CHUNK_ID_MIN has been folded into CHUNK_ID and is no longer needed
chunk_data_merged_2 = chunk_data_merged_2.drop(columns=['CHUNK_ID_MIN'])

In [None]:
# Fill all cells with the previous chunk id until a new chunk id occurs.
# Prerequisite: the data is sorted by ICUSTAY_ID & ITEMID (and chronologically within them),
# so the first row of every ICUSTAY_ID/ITEMID carries its own chunk id and nothing
# is carried over from the preceding group.
# Series.ffill() replaces fillna(method='ffill'), which is deprecated in recent pandas versions.
chunk_data_merged_2['CHUNK_ID_FILLED'] = chunk_data_merged_2['CHUNK_ID'].ffill()

In [None]:
# remove columns that are obsolete now - only kept for validation purpose in previous steps
#chunk_data_merged_2 = chunk_data_merged_2.drop(columns='CHUNK_ID')
#chunk_data_merged_2.rename(columns={"CHUNK_ID_FILLED":"CHUNK_ID"})

In [None]:
# Remove the timedelta column; the gap stays available in seconds and minutes
chunk_data_merged_2 = chunk_data_merged_2.drop(columns=['DIF_CHARTTIME_PREV'])

In [None]:
# Preview the first five rows of the chunked value data
chunk_data_merged_2.head(n=5)

In [None]:
# Save as parquet file; the chunking threshold is part of the file name
chunk_data_merged_2.to_parquet(
    f'./data/chartevents_clean_values_with_chunkid_{chunking_dif}.parquet',
    engine='pyarrow',
)

## Bring Threshold Values back in

In [None]:
import pandas as pd
import pyarrow as pa

# This section can be run in a fresh kernel: fall back to the default chunking
# threshold (in minutes) when chunking_dif has not been defined yet.
try:
    chunking_dif
except NameError:
    chunking_dif = 65

# Read chartevents_subset from parquet file to pandas data frame
chartevents_subset = pd.read_parquet('./data/chartevents_clean.parquet', engine='pyarrow')
# Build the file name from chunking_dif instead of a hard-coded 65, so the file read
# here always matches the threshold used in the name of the file written at the end
chunk_data = pd.read_parquet(f'./data/chartevents_clean_values_with_chunkid_{chunking_dif}.parquet', engine='pyarrow')
unique_icu_stays = pd.read_parquet('./data/unique_icustays_in_chartevents_subset.parquet', engine='pyarrow')

In [None]:
# ICU stay ids that take part in the analysis
icustayid_filter = unique_icu_stays['ICUSTAY_ID']

# Keep only the chart events of those ICU stays
chartevents_subset = chartevents_subset.loc[
    chartevents_subset['ICUSTAY_ID'].isin(icustayid_filter)
]

In [None]:
#match columns for later union
import numpy as np
# Remove the helper columns that exist only in the chunked value data ...
chunk_data = chunk_data.drop(
    columns=[
        'CHARTTIME_PREV',
        'DIF_CHARTTIME_PREV_S',
        'DIF_CHARTTIME_PREV_MIN',
        'CHUNK_ID',
    ],
)
# ... and append an empty CHUNK_ID_FILLED column as the last column of the chart events
chartevents_subset.insert(chartevents_subset.shape[1], 'CHUNK_ID_FILLED', np.nan)

In [None]:
# Preview the first five rows of the chart events (thresholds and values)
chartevents_subset.head(n=5)


In [None]:
# Preview the first five rows of the chunked value data
chunk_data.head(n=5)

In [None]:
# ITEMIDs per parameter: the item holding the value and the items holding its thresholds
# HR
itemids_for_value_HR = [220045]
itemids_for_thresholds_HR = [220046, 220047]
# NBP
itemids_for_value_NBP = [220179]
itemids_for_thresholds_NBP = [223751, 223752]
# O2
itemids_for_value_O2 = [220277]
itemids_for_thresholds_O2 = [223769, 223770]


In [None]:
# Threshold rows per parameter, taken from the chart events (independent copies)
threshold_data_HR = chartevents_subset.loc[
    chartevents_subset['ITEMID'].isin(itemids_for_thresholds_HR)
].copy()
threshold_data_NBP = chartevents_subset.loc[
    chartevents_subset['ITEMID'].isin(itemids_for_thresholds_NBP)
].copy()
threshold_data_O2 = chartevents_subset.loc[
    chartevents_subset['ITEMID'].isin(itemids_for_thresholds_O2)
].copy()


In [None]:
# Value rows per parameter, taken from the chunked data (independent copies)
value_chunk_data_HR = chunk_data.loc[
    chunk_data['ITEMID'].isin(itemids_for_value_HR)
].copy()
value_chunk_data_NBP = chunk_data.loc[
    chunk_data['ITEMID'].isin(itemids_for_value_NBP)
].copy()
value_chunk_data_O2 = chunk_data.loc[
    chunk_data['ITEMID'].isin(itemids_for_value_O2)
].copy()


In [None]:
# Union of threshold rows and value rows per parameter (thresholds first, then values).
# pd.concat replaces DataFrame.append, which was deprecated in pandas 1.4 and removed
# in pandas 2.0; like append, it keeps the original index labels of both frames.
threshold_and_value_HR = pd.concat([threshold_data_HR, value_chunk_data_HR])
threshold_and_value_NBP = pd.concat([threshold_data_NBP, value_chunk_data_NBP])
threshold_and_value_O2 = pd.concat([threshold_data_O2, value_chunk_data_O2])

In [None]:
# Sort by ICU stay and chart time, so thresholds and values of a stay are interleaved chronologically
threshold_and_value_HR = threshold_and_value_HR.sort_values(
    by=['ICUSTAY_ID', 'CHARTTIME'],
)
threshold_and_value_NBP = threshold_and_value_NBP.sort_values(
    by=['ICUSTAY_ID', 'CHARTTIME'],
)
threshold_and_value_O2 = threshold_and_value_O2.sort_values(
    by=['ICUSTAY_ID', 'CHARTTIME'],
)

In [None]:
# Fill the corresponding chunk ids into the threshold rows.
# Problem: a threshold may have been set before the first value of an ICU stay appeared,
# and the chunk id of a different ICU stay must never be written into such a row.
# Solution: work per ICUSTAY_ID - first forward fill (covers thresholds that follow a value),
# then backward fill (covers thresholds that existed before the first value).
# The same two steps are applied to HR, NBP and O2.
for frame in (threshold_and_value_HR, threshold_and_value_NBP, threshold_and_value_O2):
    frame["CHUNK_ID_FILLED_TH"] = frame.groupby('ICUSTAY_ID')['CHUNK_ID_FILLED'].ffill()
    frame["CHUNK_ID_FILLED_TH"] = frame.groupby('ICUSTAY_ID')['CHUNK_ID_FILLED_TH'].bfill()

In [None]:
# Quick validation of the HR data.
# Only the last expression of a cell is displayed, so the missing-value counts per
# column are printed explicitly; otherwise their result would be silently discarded.
print(threshold_and_value_HR.isna().sum())
# Rows that still have no chunk id after forward and backward filling
threshold_and_value_HR[threshold_and_value_HR['CHUNK_ID_FILLED_TH'].isna()]
# We have some ICUSTAY_IDs where only thresholds are set but no values exist -> no chunk ids

In [None]:

# Stack the HR, NBP and O2 frames into a single data frame
threshold_and_value_all = pd.concat(
    [
        threshold_and_value_HR,
        threshold_and_value_NBP,
        threshold_and_value_O2,
    ],
)

In [None]:

# Sort the rows by ICU stay, then chart time, then item
threshold_and_value_all = threshold_and_value_all.sort_values(
    by=['ICUSTAY_ID', 'CHARTTIME', 'ITEMID'],
)

In [None]:
# Save values and thresholds with their chunk ids as parquet file; the chunking threshold is part of the file name
threshold_and_value_all.to_parquet(
    f'./data/chartevents_clean_values_and_thresholds_with_chunkid_{chunking_dif}.parquet',
    engine='pyarrow',
)