# Split Chart Events by ICUSTAY ID

In [2]:
import pandas as pd
import numpy as np
from tqdm.notebook import trange, tqdm
import traceback
import os
import dask.dataframe as dd
from dask.distributed import Client

In [3]:
CHARTEVENTS_FILENAME = 'mimic-iii/CHARTEVENTS.csv'
READMISSION_FILENAME = 'data/readmission.csv'
SAMPLES_DIR = 'data/parquet/'
DATASTORE_FILENAME = 'samples.h5'
FEATHER_EXT = '.feather'
ROWS_TO_READ = 1000000
MAX_ROWS_CHARTEVENTS = 330712483

skip_rows = 0

In [4]:
chartevents_columns = ["ROW_ID","SUBJECT_ID","HADM_ID","ICUSTAY_ID","ITEMID","CHARTTIME","STORETIME","CGID","VALUE","VALUENUM","VALUEUOM","WARNING","ERROR","RESULTSTATUS","STOPPED"]


chartevents_dtype = {'ROW_ID':str,
'SUBJECT_ID':np.float64,
'HADM_ID':np.float64,
'ICUSTAY_ID':np.float64,
'ITEMID':np.float64,
'CHARTTIME':str,
'STORETIME':str,
'CGID':str,
'VALUE':str,
'VALUENUM':str,
'VALUEUOM':str,
'WARNING':str,
'ERROR':str,
'RESULTSTATUS':str,
'STOPPED':str}

In [5]:
readmission = pd.read_csv(READMISSION_FILENAME).sort_values(['SUBJECT_ID','HADM_ID','ICUSTAY_ID']).reset_index(drop=True)


In [6]:
icustay_list = readmission['ICUSTAY_ID'].unique().tolist()
icustay_list.sort()
division_index = np.array(range(icustay_list[0],icustay_list[-1],int(len(icustay_list)/300)))
division_index = np.append(division_index,icustay_list[-1]+1)
division_index = division_index.astype(str)
division_index = np.char.add(division_index,np.array(['_0']*len(division_index)).astype(str))
len(division_index)

619

In [7]:
client = Client()
client

2023-05-02 12:05:02,883 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-8mvmdjo0', purging
2023-05-02 12:05:02,883 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-r3a2bvol', purging
2023-05-02 12:05:02,883 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-bk00mfsp', purging
2023-05-02 12:05:02,884 - distributed.diskutils - INFO - Found stale lock file and directory '/tmp/dask-worker-space/worker-b7ooxnoj', purging


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 16,Total memory: 62.57 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:46597,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 16
Started: Just now,Total memory: 62.57 GiB

0,1
Comm: tcp://127.0.0.1:42057,Total threads: 4
Dashboard: http://127.0.0.1:41667/status,Memory: 15.64 GiB
Nanny: tcp://127.0.0.1:39069,
Local directory: /tmp/dask-worker-space/worker-_vt2mwsk,Local directory: /tmp/dask-worker-space/worker-_vt2mwsk

0,1
Comm: tcp://127.0.0.1:45953,Total threads: 4
Dashboard: http://127.0.0.1:38441/status,Memory: 15.64 GiB
Nanny: tcp://127.0.0.1:40835,
Local directory: /tmp/dask-worker-space/worker-zj8iy6np,Local directory: /tmp/dask-worker-space/worker-zj8iy6np

0,1
Comm: tcp://127.0.0.1:34209,Total threads: 4
Dashboard: http://127.0.0.1:37709/status,Memory: 15.64 GiB
Nanny: tcp://127.0.0.1:34443,
Local directory: /tmp/dask-worker-space/worker-4qp_rgxm,Local directory: /tmp/dask-worker-space/worker-4qp_rgxm

0,1
Comm: tcp://127.0.0.1:41279,Total threads: 4
Dashboard: http://127.0.0.1:44587/status,Memory: 15.64 GiB
Nanny: tcp://127.0.0.1:43563,
Local directory: /tmp/dask-worker-space/worker-clwe42b3,Local directory: /tmp/dask-worker-space/worker-clwe42b3


In [8]:
df = dd.read_csv(CHARTEVENTS_FILENAME, dtype=chartevents_dtype ,assume_missing=True)

In [9]:
df = df.dropna(subset=['SUBJECT_ID','HADM_ID','ICUSTAY_ID','CHARTTIME']).drop('ROW_ID',axis=1)

In [10]:
df['SUBJECT_ID'] = df.map_partitions(lambda x: x['SUBJECT_ID'].astype(np.int64))
df['HADM_ID'] = df.map_partitions(lambda x: x['HADM_ID'].astype(np.int64))
df['ICUSTAY_ID'] = df.map_partitions(lambda x: x['ICUSTAY_ID'].astype(np.int64))
df['CHARTTIME'] = df.map_partitions(lambda x: pd.to_datetime(x['CHARTTIME'], errors='coerce'))# , format='%Y-%m-%d %H:%M:%S')

  df['CHARTTIME'] = df.map_partitions(lambda x: pd.to_datetime(x['CHARTTIME'], errors='coerce'))# , format='%Y-%m-%d %H:%M:%S')


In [12]:
df.info

<bound method DataFrame.info of Dask DataFrame Structure:
npartitions=551                                                                                                                                        
                     int64   int64      int64  float64  datetime64[ns]    object  object  object   object   object  object  object       object  object
                       ...     ...        ...      ...             ...       ...     ...     ...      ...      ...     ...     ...          ...     ...
...                    ...     ...        ...      ...             ...       ...     ...     ...      ...      ...     ...     ...          ...     ...
                       ...     ...        ...      ...             ...       ...     ...     ...      ...      ...     ...     ...          ...     ...
                       ...     ...        ...      ...             ...       ...     ...     ...      ...      ...     ...     ...          ...     ...
Dask Name: assign, 11 graph la

In [13]:
valid_chartevents = df.map_partitions(lambda df: dd.multi.merge(readmission[['SUBJECT_ID','HADM_ID','ICUSTAY_ID']],df,'inner',['SUBJECT_ID','HADM_ID','ICUSTAY_ID']))

In [14]:
# This worked
valid_chartevents.to_parquet(SAMPLES_DIR, engine="pyarrow", partition_on='ICUSTAY_ID', compute=True)