Skip to content

Commit

Permalink
v0.4.1: eicu database converted to parquet before processing
Browse files Browse the repository at this point in the history
  • Loading branch information
USM-CHU-FGuyon committed May 3, 2024
1 parent d4759db commit f621248
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 122 deletions.
6 changes: 5 additions & 1 deletion 1_extract_eicu.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
It creates a set of .parquet files at the specified path
('eicu' in paths.json).
Approximate running time: 20min.
Approximate running time:
* raw_tables_to_parquet() 12min #only run once. csv.gz -> parquet with no data changes.
* gen_* : 7min
"""
from eicu_preprocessing.eicupreparator import eicuPreparator

Expand All @@ -24,6 +26,8 @@
aperiodic_pth='vitalAperiodic.csv.gz',
intakeoutput_pth='intakeOutput.csv.gz')

eicu_prep.raw_tables_to_parquet()

eicu_prep.gen_labels()
eicu_prep.gen_flat()
eicu_prep.gen_medication()
Expand Down
4 changes: 2 additions & 2 deletions 2_eicu.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
tsp = eicuTSP(
lab_pth='lab.parquet',
resp_pth='tsresp.parquet',
nurse_pth='tsnurse/',
nurse_pth='tsnurse.parquet',
aperiodic_pth='tsaperiodic.parquet',
periodic_pth='tsperiodic/',
periodic_pth='tsperiodic.parquet',
inout_pth='tsintakeoutput.parquet')

tsp.run(reset_dir=False)
Expand Down
28 changes: 26 additions & 2 deletions database_processing/datapreparator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from pathlib import Path

import polars as pl
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

from database_processing.dataprocessor import DataProcessor

Expand All @@ -14,6 +17,13 @@ def __init__(self, dataset, col_stayid):
self.inch_to_cm = 2.54
self.lbs_to_kg = 0.454

@staticmethod
def _get_name_as_parquet(pth):
pth = Path(pth).name
extensions = "".join(Path(pth).suffixes)
pth = pth.removesuffix(extensions)
return pth + '.parquet'

def _clip_time(self, df, col_offset='resultoffset'):
idx = ((df[col_offset] < df[self.col_los])
& (df[col_offset] > -self.preadm_anteriority*24*3600))
Expand All @@ -36,8 +46,22 @@ def get_labels(self, lazy=False):
if self.labels is None:
raise ValueError('Run gen_labels first !')



@staticmethod
def write_as_parquet(pth_src, pth_tgt, astype_dic=None, chunksize=1_000_000):
    """Convert a (possibly gzipped) csv file to a single parquet file.

    The csv is read in chunks of `chunksize` rows so that arbitrarily
    large tables can be converted with bounded memory usage. The parquet
    schema is taken from the first chunk.

    Parameters
    ----------
    pth_src : str or Path
        Path to the source csv file (pandas reads .csv.gz transparently).
    pth_tgt : str or Path
        Path of the parquet file to create; its parent directory is
        created if it does not exist.
    astype_dic : dict, optional
        Mapping column name -> dtype applied to each chunk before
        writing. Keys absent from the csv columns are ignored.
    chunksize : int
        Number of csv rows read per chunk.
    """
    print(f'Writing {pth_tgt}')
    astype_dic = {} if astype_dic is None else astype_dic
    Path(pth_tgt).parent.mkdir(parents=True, exist_ok=True)

    # Initialized to None so that an empty csv (zero chunks) is a
    # clean no-op instead of a NameError at close time.
    pqwriter = None
    try:
        for df in pd.read_csv(pth_src, chunksize=int(chunksize)):
            # Keep only the dtype mappings for columns actually present.
            chunk_dtypes = {k: v for k, v in astype_dic.items()
                            if k in df.columns}
            table = pa.Table.from_pandas(df.astype(chunk_dtypes))
            if pqwriter is None:
                pqwriter = pq.ParquetWriter(pth_tgt, table.schema)
            pqwriter.write_table(table)
    finally:
        # Always close the writer, even if a chunk conversion fails,
        # so the parquet footer is written and the handle released.
        if pqwriter is not None:
            pqwriter.close()
    print(' -> Done')
def _to_seconds(self, df, col, unit='second'):
k = {'day': 86400,
'hour': 3600,
Expand Down
1 change: 1 addition & 0 deletions database_processing/dataprocessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(self,
self.labels_pths = {d: self._preprocessed_pth(d, 'labels') for d in self.datasets}
self.diagnoses_pths = {d: self._preprocessed_pth(d, 'diagnoses') for d in self.datasets}
self.savepath = self.data_pth + self._datadir_name()
self.raw_as_parquet_pth = self.savepath + '/raw_parquet/'
self.aux_pth = self.pth_dic['auxillary_files']
self.voc_pth = self.pth_dic['vocabulary']
self.user_input_pth = self.pth_dic['user_input']
Expand Down
Loading

0 comments on commit f621248

Please sign in to comment.