parsing dates on read_csv vs map_partitions(to_datetime) #3717

@nr7s

Description

Before anything else, thanks for the continuous improvement of Dask. And sorry for the long, verbose issue.

I have hundreds of CSV files with 2 columns (timestamp, value). I want to read these CSVs, parse the timestamp column into an appropriate dtype, and save everything into a single Parquet file that contains all the CSV files concatenated, with appropriate dtypes.

I set up some code in Dask that loads each CSV file with parse_dates=['timestamp'], concatenates all the files, and saves them to a Parquet file. It produced a pretty clean graph, but for some reason reading the CSVs was still taking too long compared to pandas.

I didn't understand why. I imagine Dask also uses pandas' to_datetime, and the time difference was so drastic that I doubted it was due to communication overhead.

While trying to find a solution, I ended up with something I don't completely understand and would like some clarification on.

I have 2 issues with the reproducible example I created:

  • I know that if the data fits in memory I could use pandas instead, but the timestamp parsing was taking too long and I wanted to parallelize it.
  • I wasn't able to isolate the to_datetime operation as well as I would like.

Outline of example:

  1. Generate a data frame and save it to a CSV with the format of the data I was dealing with.
  2. Load those 2 CSVs, convert timestamp from object to datetime, concatenate them, and save them to a Parquet file.

The parsing in step 2 is done first with pandas read_csv, then with dask's read_csv, and finally with map_partitions. Initially I was using workers with multiple threads, but I imagine that string-heavy operations like to_datetime don't release the GIL, so I changed to a single thread per worker.

Example
import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask.distributed import LocalCluster, Client
cluster = LocalCluster(n_workers=2, threads_per_worker=1)
client = Client(cluster)
/Users/neuronist/.virtualenvs/dev/lib/python3.6/site-packages/dask/context.py:23: UserWarning: The dask.set_options function has been deprecated. Please use dask.config.set instead
  warnings.warn("The dask.set_options function has been deprecated. "
client

Client

Cluster

  • Workers: 2
  • Cores: 2
  • Memory: 8.59 GB
def generate_df(size):
    '''get a dataframe with size rows with a timestamp and a value column
    format the timestamp column according to a specific format
    '''
    d = {'timestamp' : pd.date_range('1980-01-01', periods=size, freq='1T'),
         'value' : np.random.randn(size),}
    df = pd.DataFrame(d)
    df.timestamp = df.timestamp.apply(lambda x: x.strftime('%d/%m/%Y %H:%M:%S'))
    return df
df1 = generate_df(10000)
df2 = generate_df(10000)
df1.to_csv('df1', index=False)
df2.to_csv('df2', index=False)
dfs = ['df1', 'df2']
%%timeit
full = [pd.read_csv(path, parse_dates=['timestamp'], dayfirst=True)
        for path in dfs]
pd.concat(full)
#to_parquet('test1', engine='fastparquet', compression='snappy')
4.21 s ± 713 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
%%timeit
full = [dd.read_csv(path, parse_dates=['timestamp'], dayfirst=True)
        for path in dfs]
dd.concat(full).compute()
#.to_parquet('test2', engine='fastparquet', compression='snappy')
4.3 s ± 194 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
def read_and_parse(path):
    '''load the csv without parsing the datetime column
    and then parse it using map_partitions
    '''
    df = dd.read_csv(path)
    meta = ('timestamp', 'datetime64[ns]')
    df.timestamp = df.timestamp.map_partitions(pd.to_datetime, dayfirst=True, 
                                               meta=meta)
    return df
%%timeit
full = [read_and_parse(path) for path in dfs]
dd.concat(full).compute()
#.to_parquet('test3', engine='fastparquet', compression='snappy')
1.98 s ± 78.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

Here are the graphs of both Dask operations, if it helps. I sometimes have trouble understanding what some nodes mean, like partial_by_order; I can't find an explanation for these operations anywhere, and the source code for partial_by_order has a single example that I couldn't really generalize to other cases.

Graphs

parse_dates on read_csv:

[graph image: ddreadcsvparsedates]

to_datetime with map_partitions:

[graph image: ddmappartitionsparsedates]

There's a relevant speed increase from parse_dates in dd.read_csv to my map_partitions approach, but I don't understand why. Do you understand this difference?
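One factor worth ruling out (just a guess, not an explanation of the graphs above): pd.to_datetime with dayfirst=True but no explicit format can fall back to slow per-element format inference, whereas passing format= parses with a fixed, fast path. A small correctness-only sketch (timing left out since it varies by machine):

```python
import pandas as pd

# Strings in the same '%d/%m/%Y %H:%M:%S' format used in the example,
# spanning several days so dayfirst actually matters.
s = pd.Series(pd.date_range('1980-01-01', periods=3000, freq='min')
                .strftime('%d/%m/%Y %H:%M:%S'))

# Let pandas infer the format vs. telling it explicitly.
inferred = pd.to_datetime(s, dayfirst=True)
explicit = pd.to_datetime(s, format='%d/%m/%Y %H:%M:%S')

# Both parse to identical datetimes; only the speed differs.
assert inferred.equals(explicit)
```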

In the end, I stopped concatenating the files in Dask and just appended each one directly to the Parquet file, since every CSV had exactly the same format.

Thanks.
