# Dask vs Pandas speed tests



Dask's distributed scheduler is based on Tornado; Dask was first released in 2015, before asyncio was in wide use.

https://stackoverflow.com/questions/39861685/does-dask-distributed-use-tornado-coroutines-for-workers-tasks


In [2]:
from IPython.display import HTML
HTML("""
<style>
    table {width: 50%;
          }
</style>
""")

Articles:
+ CSV to Parquet with Dask: <https://mungingdata.com/dask/read-csv-to-parquet/>
+ https://towardsdatascience.com/beyond-pandas-spark-dask-vaex-and-other-big-data-technologies-battling-head-to-head-a453a1f8cc13
+ https://towardsdatascience.com/make-your-data-processing-code-fly-in-5-minutes-c4998e6da094
+ https://stackoverflow.com/questions/47191675/pandas-write-dataframe-to-parquet-format-with-append

Issues:
+ https://stackoverflow.com/questions/60173358/distributed-worker-memory-use-is-high-but-worker-has-no-data-to-store-to-disk
+ https://github.com/dask/distributed/issues/4594

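Results for my task:
+ Dask has no `reindex`
+ Dask shows no "Try using .loc[row_indexer,col_indexer] = value instead" warning
+ Dask cannot split gzipped CSV input: gzip is not splittable, so a `.csv.gz` file cannot be broken into partitions
+ Dask creates *a lot* of files with a *huge* cumulative size
+ Writing Parquet is a lot faster than writing CSV
+ The pyarrow engine does not work with Dask here (possibly a bug)
+ Neither library appends to Parquet by default (possible with pandas and pyarrow, but there is a [bug](https://issues.apache.org/jira/browse/ARROW-12080))
+ In my case, pandas + pyarrow + snappy is the best choice

In the table, `pyarrow` and `fastparquet` in the *from->to* column mean Parquet output written with that engine, and `?` marks an unknown codec.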

    
| Tool | from->to         | Compression | Time    | Files, n | Total size  |
| :--: | :--------------: | :---------: | :-----: | :------: | :---------: |
| dask | csv->csv         | none        | 35 min  | 554      | 56 GB       |
| dask | csv.gz->?        | NA          | NA      | NA       | NA          |
| pd   | csv->csv         | none        | 55 min  | 1        | 56 GB       |
| pd   | csv.gz->csv.gz   | gzip        | 55 min  | 1        | ~5 GB       |
|      |                  |             |         |          |             |
| dask | csv->pyarrow     | ?           | 11 min  | NA       | NA          |
| dask | csv->fastparquet | ?           | 14 min  | 554      | 58 GB       |
| pd   | csv->pyarrow     | gzip        | ~28 min | 33       | ~3.7 GB     |
| pd   | csv->pyarrow     | brotli      | ~33 min | 33       | ~3 GB       |
| pd   | csv->pyarrow     | snappy      | ~20 min | 33       | ~5 GB       |
| pd   | csv.gz->pyarrow  | snappy      | ~20 min | 33       | ~5 GB       |
| pd   | csv->fastparquet | none        | ~25 min | 33       | ~45 GB      |
| pd   | csv->fastparquet | snappy      | ~27 min | 33       | ~7 GB       |
| pd   | csv->fastparquet | gzip        | ~37 min | 33       | ~3.7 GB     |


In [None]:
import pandas as pd
from pathlib import Path
import dask.dataframe as dd
# from dask.diagnostics import ProgressBar  # single machine progressbar
# from dask.distributed import progress  # does it work?
from dask.distributed import Client  # has dashboard

In [None]:
data_path = './data/mimic-iii-clinical-database-1.4/'
output_path = './data/fhir_out/'

## Pandas

In [None]:
from pathlib import Path
import time
import gc


def transform_chartevents(data_path, output_path, chunksize=10**7):
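    """Peak RAM consumption is ~6 GB with the default chunksize."""
    # delete output files left over from a previous run, if any
    # (per-chunk parquet files, plus the CSV targets of the commented-out append writers)
    output_filename = output_path + 'observation_ce'
    for old_file in [Path(output_filename + '.csv'), Path(output_filename + '.csv.gz'),
                     *Path(output_path).glob('observation_ce_*.parquet')]:
        old_file.unlink(missing_ok=True)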
    
    d_items = pd.read_csv(data_path+'D_ITEMS.csv.gz', index_col=0,
                      # dropped 'ABBREVIATION', 'LINKSTO', 'CONCEPTID', 'UNITNAME'
                      usecols=['ROW_ID', 'ITEMID', 'LABEL', 'DBSOURCE', 'CATEGORY', 'PARAM_TYPE'],
                      dtype={'ROW_ID': int, 'ITEMID': int, 'LABEL': str, 'DBSOURCE': 'category',
                             'CATEGORY': 'category', 'PARAM_TYPE': str})
    
    # the biggest file: ~4GB gzipped, 33GB unpacked, ~330M rows
    # looks like CareVue and Metavision data should be processed separately
    chunk_container =  pd.read_csv(data_path+'CHARTEVENTS.csv.gz',
                                   # STORETIME
                                   usecols=['ROW_ID', 'SUBJECT_ID', 'HADM_ID', 'ICUSTAY_ID', 'ITEMID', 'CHARTTIME',
                                            'STORETIME', 'CGID', 'VALUE', 'VALUENUM', 'VALUEUOM', 'WARNING', 'ERROR',
                                            'RESULTSTATUS', 'STOPPED'],
                                   dtype={'ROW_ID': int, 'SUBJECT_ID': int, 'HADM_ID': int, 'ICUSTAY_ID': float,
                                          'ITEMID': int, 'CGID': float, 'VALUE': str, 'VALUENUM': float, 
                                          'VALUEUOM': str, 'WARNING': float, 'ERROR': float,
                                          'RESULTSTATUS': str, 'STOPPED': str},
                                   parse_dates=['CHARTTIME'],
                                   chunksize=chunksize)  # 2.67GB for 10**7
    
    for i, chartevents in enumerate(chunk_container):
        # show progress (~330M rows in total)
        print(f'{i + 1}/{330 * 10**6 // chunksize}', flush=True, end=' ')
        start_time = time.time()

        observation_ce = pd.merge(chartevents, d_items, on='ITEMID')

        observation_ce['note'] = observation_ce['LABEL'].str.cat(observation_ce['DBSOURCE'], sep=' ', na_rep='NA')
        observation_ce['note'] = observation_ce['note'].str.cat(observation_ce['PARAM_TYPE'], sep=' ', na_rep='')

        observation_ce.loc[observation_ce['STOPPED'] == "D/C'd", 'RESULTSTATUS'] = 'discharged'
        observation_ce.loc[observation_ce['ERROR'] == 1, 'RESULTSTATUS'] = 'Error'
        
        # New columns to adapt to Chartevents observations
        observation_ce['category'] = 'chartevents'  # ????

        observation_ce.drop(['LABEL', 'PARAM_TYPE', 'ERROR', 'DBSOURCE', 'STOPPED'], axis=1, inplace=True)

        observation_ce.rename(columns={'ROW_ID':'identifier',
                                       'SUBJECT_ID':'subject',
                                       'HADM_ID':'encounter',                               
                                       'ICUSTAY_ID':'partOf',
                                       'ITEMID':'code',
                                       'CGID':'performer',
                                       'CHARTTIME':'effectiveDateTime',
                                       'VALUE':'value',
                                       'VALUENUM':'value_quantity',
                                       'VALUEUOM':'unit',
                                       'WARNING':'interpretation',
                                       'RESULTSTATUS':'status',
                                       'CATEGORY':'category_sub'}, inplace=True)

        observation_ce = observation_ce.reindex(columns=['identifier',
                                                         'subject', 
                                                         'encounter', 
                                                         'partOf', 
                                                         'code',
                                                         'effectiveDateTime',
                                                         'performer',
                                                         'value',
                                                         'value_quantity',
                                                         'unit', 
                                                         'interpretation',
                                                         'status',
                                                         'note',
                                                         'category_sub',
                                                         'category'], copy=False)

#         observation_ce.to_csv(output_filename + '.csv.gz',
#                               compression={'method': 'gzip', 'compresslevel': 1},
#                               index=False, mode='a')
        
#         observation_ce.to_csv(output_filename + '.csv', index=False, mode='a')
        
        # will create a lot of files, no append mode
        observation_ce.to_parquet(f"{output_filename}_{i}.parquet", 
                                  compression='snappy', index=False)

        # force garbage collection; for some reason RAM runs out quickly without it
        gc.collect()
        # show execution time per chunk
        print(f"--- {time.time() - start_time} seconds ---", flush=True)

        
transform_chartevents(data_path, output_path)
# observation_ce = transform_chartevents(data_path, output_path)
# observation_ce.head()

## Dask

In [None]:
d_items = pd.read_csv(data_path+'D_ITEMS.csv.gz', index_col=0,
                  # dropped 'ABBREVIATION', 'LINKSTO', 'CONCEPTID', 'UNITNAME'
                  usecols=['ROW_ID', 'ITEMID', 'LABEL', 'DBSOURCE', 'CATEGORY', 'PARAM_TYPE'],
                  dtype={'ROW_ID': int, 'ITEMID': int, 'LABEL': str, 'DBSOURCE': 'category',
                         'CATEGORY': 'category', 'PARAM_TYPE': str})

d_items.set_index('ITEMID', inplace=True)
d_items.head()

In [None]:
# client = Client(threads_per_worker=2, n_workers=4, memory_limit='2.5GB')
client = Client()  # by default, the workers together may use all of the machine's memory
client

In [None]:
# Warning: gzip compression does not support breaking apart files,
# so the uncompressed CSV is read here
# Q: what is the default blocksize? A: https://github.com/dask/dask/pull/1328 (32M)
# max 64e6 bytes ~61MB
df = dd.read_csv(data_path + 'CHARTEVENTS' + '.csv',
                 usecols=['ROW_ID', 'SUBJECT_ID', 'HADM_ID', 'ICUSTAY_ID', 'ITEMID', 'CHARTTIME',
                          'STORETIME', 'CGID', 'VALUE', 'VALUENUM', 'VALUEUOM', 'WARNING', 'ERROR',
                          'RESULTSTATUS', 'STOPPED'],
                 dtype={'ROW_ID': int, 'SUBJECT_ID': int, 'HADM_ID': int, 'ICUSTAY_ID': float,
                        'ITEMID': int, 'CGID': float, 'VALUE': str, 'VALUENUM': float,
                        'VALUEUOM': str, 'WARNING': float, 'ERROR': float,
                        'RESULTSTATUS': str, 'STOPPED': str},
                 parse_dates=['CHARTTIME'],
                 # blocksize=64e6,
                 )

# this takes a while (~4 min) and, at the final stages, raises
# "TypeError: memoryview: cannot cast view with zeros in shape or strides"
# df = df.set_index('ITEMID')

In [None]:
observation_ce = dd.merge(df, d_items, left_on='ITEMID', right_index=True)

In [None]:
output_filename = output_path+'observation_ce'

observation_ce['note'] = observation_ce['LABEL'].str.cat(observation_ce['DBSOURCE'], sep=' ', na_rep='NA')
observation_ce['note'] = observation_ce['note'].str.cat(observation_ce['PARAM_TYPE'], sep=' ', na_rep='')

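# .loc assignment is not implemented/usable in dask, and chained assignment such as
# observation_ce[mask]['RESULTSTATUS'] = ... only modifies a temporary object
# (dask has no SettingWithCopyWarning to flag this), so use Series.mask instead.
# Missing STOPPED values may compare as <NA> with pyarrow-backed strings, so fill them with False.
mask = (observation_ce['STOPPED'] == "D/C'd").fillna(False)
observation_ce['RESULTSTATUS'] = observation_ce['RESULTSTATUS'].mask(mask, 'discharged')
mask2 = observation_ce['ERROR'] == 1
observation_ce['RESULTSTATUS'] = observation_ce['RESULTSTATUS'].mask(mask2, 'Error')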

# New columns to adapt to Chartevents observations
observation_ce['category'] = 'chartevents'  # ????

# dask methods return a new object, so the result has to be assigned back
observation_ce = observation_ce.drop(columns=['LABEL', 'PARAM_TYPE', 'ERROR', 'DBSOURCE', 'STOPPED'])

observation_ce = observation_ce.rename(columns={'ROW_ID': 'identifier',
                                                'SUBJECT_ID': 'subject',
                                                'HADM_ID': 'encounter',
                                                'ICUSTAY_ID': 'partOf',
                                                'ITEMID': 'code',
                                                'CGID': 'performer',
                                                'CHARTTIME': 'effectiveDateTime',
                                                'VALUE': 'value',
                                                'VALUENUM': 'value_quantity',
                                                'VALUEUOM': 'unit',
                                                'WARNING': 'interpretation',
                                                'RESULTSTATUS': 'status',
                                                'CATEGORY': 'category_sub'})

# dask loves parquet
# using pyarrow: https://github.com/dask/dask/issues/6587
# observation_ce.to_csv(output_filename + '.csv')
observation_ce.to_parquet(output_filename + '.parquet', schema="infer")

An attempt to use `map_partitions(foo)` failed with header-related issues.

In [None]:
# https://stackoverflow.com/questions/41806850/dask-difference-between-client-persist-and-client-compute
# observation_ce.persist()  # in bg
# observation_ce.compute()

## Parquet to a single file

+ https://arrow.apache.org/docs/python/dataset.html
+ https://stackoverflow.com/questions/47191675/pandas-write-dataframe-to-parquet-format-with-append
+ **https://stackoverflow.com/questions/47113813/using-pyarrow-how-do-you-append-to-parquet-file**

### Use dataset (dir with a bunch of small parquet files)

The dataset has to be loaded into memory in order to save it as a single file.

https://arrow.apache.org/docs/python/generated/pyarrow.dataset.Dataset.html#pyarrow.dataset.Dataset.to_table
>Note that this method reads all the selected data from the dataset into memory.

It has problems: https://issues.apache.org/jira/browse/ARROW-12080
>The first table schema becomes a common schema for the full Dataset. It is a problem if all values in the first table is na.
> It ignores pandas dtypes

```python
import pandas as pd 
import pyarrow.parquet as pq
import pyarrow as pa
import pyarrow.dataset as ds
import shutil
from pathlib import Path

data_path = './data/mimic-iii-clinical-database-1.4/'
output = "mydataset.parquet"
if Path(output).exists():
    shutil.rmtree(output)

d_items = pd.read_csv(data_path+'D_ITEMS.csv.gz', index_col=0,
                  # dropped 'ABBREVIATION', 'LINKSTO', 'CONCEPTID', 'UNITNAME'
                  usecols=['ROW_ID', 'ITEMID', 'LABEL', 'DBSOURCE', 'CATEGORY', 'PARAM_TYPE'],
                  dtype={'ROW_ID': int, 'ITEMID': int, 'LABEL': str, 'DBSOURCE': str,
                         'CATEGORY': str, 'PARAM_TYPE': str}, chunksize=1000)

for i, chunk in enumerate(d_items):
    # create a parquet table from your dataframe
    table = pa.Table.from_pandas(chunk)
    if i == 0:
        schema = pa.Schema.from_pandas(chunk)
#         schema = table.schema
    # write direct to your parquet file
    pq.write_to_dataset(table, root_path=output)

# try to convert dataset to a single file
# dataset = ds.dataset(output, schema=schema)
dataset = ds.dataset(output)
# print the schema only after `dataset` exists
print(dataset.schema.types)
# it will load all into the memory
dataset.to_table()
```

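#### Test case
It crashes at random. A likely reason: `write_to_dataset` gives each file a random (UUID-based) name, and `ds.dataset` infers the schema from the first file it finds, so whether that file has an all-NaN column (`null` type) changes from run to run.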

```python
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import pandas as pd
import numpy as np
import shutil
from pathlib import Path


output='./tmp.parquet'
if Path(output).exists():
    shutil.rmtree(output)

# write the test CSV (to_csv returns None, so there is nothing to assign)
pd.DataFrame({'A': [np.nan, np.nan, '3', np.nan],
              'B': ['1', '2', '3', np.nan]},
             dtype=str).to_csv('tmp.csv', index=False)

for i, chunk in enumerate(pd.read_csv('tmp.csv', dtype={'A': str, 'B': str}, chunksize=1)):
    # create a parquet table from your dataframe
    table = pa.Table.from_pandas(chunk)
    print(table.schema.types)
    # write direct to your parquet file
    pq.write_to_dataset(table, root_path=output)

# it will crash randomly
dataset = ds.dataset('./tmp.parquet')
print('dataset schema: ', dataset.schema.types)
print(dataset.to_table().to_pandas())
```

---

### Use a single Parquet file from the start

This approach has the same bug: https://issues.apache.org/jira/browse/ARROW-12080

```python
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path
import pandas as pd

chunksize = 10000  # rows per chunk; crashes with chunksize=1000
data_path = './data/mimic-iii-clinical-database-1.4/'
output = 'sample.parquet'
Path(output).unlink(missing_ok=True)

d_items = pd.read_csv(data_path+'D_ITEMS.csv.gz', index_col=0,
                  # dropped 'ABBREVIATION', 'LINKSTO', 'CONCEPTID', 'UNITNAME'
                  usecols=['ROW_ID', 'ITEMID', 'LABEL', 'DBSOURCE', 'CATEGORY', 'PARAM_TYPE'],
                  dtype={'ROW_ID': int, 'ITEMID': int, 'LABEL': str, 'DBSOURCE': str,
                         'CATEGORY': str, 'PARAM_TYPE': str}, chunksize=chunksize)


table = pa.Table.from_pandas(next(d_items))
with pq.ParquetWriter(output, table.schema) as pqwriter:
    pqwriter.write_table(table)
    for chunk in d_items:
        table = pa.Table.from_pandas(chunk)
        pqwriter.write_table(table)

    
df1 = pd.read_parquet(output)

df2 = pd.read_csv(data_path+'D_ITEMS.csv.gz', index_col=0,
                  # dropped 'ABBREVIATION', 'LINKSTO', 'CONCEPTID', 'UNITNAME'
                  usecols=['ROW_ID', 'ITEMID', 'LABEL', 'DBSOURCE', 'CATEGORY', 'PARAM_TYPE'],
                  dtype={'ROW_ID': int, 'ITEMID': int, 'LABEL': str, 'DBSOURCE': str,
                         'CATEGORY': str, 'PARAM_TYPE': str})

print(df1.equals(df2))
df1.head()
```