# Performance improvements loading dask dataframes from many big parquet files in cloud storage #7750
## Comments
Before looking at the code, this is an interesting question. I don't believe there's a case elsewhere in dask where dask is used client-side in generating the graph. In dask/fastparquet#619, we at least concurrently fetch the metadata bytes if the filesystem happens to be async and supports it.
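For context, that concurrency is visible from user code too: passing a list of paths to fsspec's `cat()` dispatches all the reads on the event loop at once for async filesystems. A minimal sketch (it fetches whole objects rather than only the footer byte ranges, but the batching idea is the same; the bucket is the public one referenced later in this thread):

```python
import s3fs

fs = s3fs.S3FileSystem(anon=True)
paths = fs.ls("s3://saturn-public-data/nyc-taxi/taxi_parquet_speed_test")[:100]

# One call, many objects: on an async filesystem these reads run concurrently.
blobs = fs.cat(paths)  # dict mapping path -> bytes
```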
I may be misunderstanding, but the arrow-based engines will fetch the parquet metadata in parallel via `create_metadata_file` when the global metadata is required, but there is not […].

Re arrow: can you say more about this and share the command that is taking a long time? (I certainly believe you, but I'm also interested to know more.) @hhuuggoo - I agree that we should add […]
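For readers unfamiliar with it, `create_metadata_file` aggregates the footers of many files into a single `_metadata` file using dask tasks. A sketch, assuming the import path and the `out_dir`/`storage_options` parameters of the dask version in use here (the output bucket is hypothetical, since the public bucket is read-only):

```python
import s3fs
from dask.dataframe.io.parquet.core import create_metadata_file

fs = s3fs.S3FileSystem(anon=True)
paths = fs.ls("s3://saturn-public-data/nyc-taxi/taxi_parquet_speed_test")

# Each footer is read as a dask task; the aggregated result is written out
# as a single _metadata file, so later reads need only one metadata fetch.
create_metadata_file(
    paths,
    out_dir="s3://my-writable-bucket/meta",  # hypothetical destination
    engine="pyarrow",
    storage_options={"anon": True},
)
```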
Well, now I'm questioning what I observed! Let me attempt to reproduce.
@rjzamora @martindurant I dug a little bit deeper (but I haven't dug into the arrow engine yet to confirm; I will look later). The dataset I was looking at was 800k files, and is non-public. However, I took the NYC taxi dataset and re-partitioned it to create way more files than anyone should have for the data (30k files). That bucket/directory should be public if anyone wants to experiment. For this number of files:

- Listing the directory takes 7.5 seconds.
- Passing the files into fastparquet is fast: 1 second.
- Passing the files into pyarrow is slow: 35 seconds.
- Passing the directories into fastparquet and pyarrow is fast in both cases: 1 second.
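A sketch of how these timings can be reproduced (`gather_statistics` is the keyword spelling from the dask version current in this thread; later releases renamed it):

```python
import time

import dask.dataframe as dd
import s3fs

fs = s3fs.S3FileSystem(anon=True)
files = ["s3://" + p for p in fs.ls("s3://saturn-public-data/nyc-taxi/taxi_parquet_speed_test")]

for engine in ("fastparquet", "pyarrow"):
    t0 = time.time()
    # Building the (uncomputed) dataframe is where the metadata cost is paid.
    dd.read_parquet(files, engine=engine, gather_statistics=False,
                    storage_options={"anon": True})
    print(engine, f"{time.time() - t0:.1f}s")
```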
@hhuuggoo I did some profiling on this and have some insights into the root cause. I believe this issue does not have much to do with pyarrow or dask.

PyInstruments profile:
```
  _     ._   __/__   _ _  _  _ _/_   Recorded: 12:41:01  Samples:  1179
 /_//_/// /_\ / //_// / //_'/ //     Duration: 13.376    CPU time: 10.891
/   _/                      v3.4.2

Program: /Users/aktech/anaconda3/envs/dask-dev/lib/python3.8/site-packages/ipykernel_launcher.py -f /Users/aktech/Library/Jupyter/runtime/kernel-71f2caac-f877-4a62-9b1d-0314dea6a48a.json

13.376 run_code  IPython/core/interactiveshell.py:3377
└─ 13.376 <module>  <ipython-input-8-8697f9619301>:6
   └─ 13.376 read_parquet  <ipython-input-5-9f00d55c0c2f>:1
      └─ 13.376 read_parquet  dask/dataframe/io/parquet/core.py:98
         ├─ 13.206 read_metadata  dask/dataframe/io/parquet/arrow.py:483
         │  └─ 13.168 _gather_metadata  dask/dataframe/io/parquet/arrow.py:1674
         │     ├─ 10.733 _get_dataset_object  dask/dataframe/io/parquet/arrow.py:1624
         │     │  └─ 10.725 __init__  pyarrow/parquet.py:1243
         │     │     [14 frames hidden]  pyarrow, urllib, <built-in>, <string>
         │     │        10.720 _make_manifest  pyarrow/parquet.py:1436
         │     │        └─ 10.720 wrapper  fsspec/asyn.py:84
         │     │           └─ 10.720 sync  fsspec/asyn.py:37
         │     │              ├─ 10.347 wait  threading.py:540
         │     │              │  [4 frames hidden]  threading, <built-in>
         │     │              │     10.347 lock.acquire  <built-in>:0
         │     │              └─ 0.373 run_coroutine_threadsafe  asyncio/tasks.py:900
         │     │                 [5 frames hidden]  asyncio, <built-in>
         │     └─ 2.435 get_metadata  pyarrow/parquet.py:754
         │        [4 frames hidden]  pyarrow
         │           2.419 __init__  pyarrow/parquet.py:214
         │           └─ 2.419 read  fsspec/spec.py:1449
         │              └─ 2.419 _fetch  fsspec/caching.py:345
         │                 └─ 2.419 _fetch_range  s3fs/core.py:1847
         │                    [2 frames hidden]  s3fs
         │                       2.419 _fetch_range  s3fs/core.py:1978
         │                       ├─ 2.024 wrapper  fsspec/asyn.py:84
         │                       │  └─ 2.024 sync  fsspec/asyn.py:37
         │                       │     └─ 2.024 wait  threading.py:540
         │                       │        [4 frames hidden]  threading, <built-in>
         │                       └─ 0.395 sync  fsspec/asyn.py:37
         │                          └─ 0.395 wait  threading.py:540
         │                             [4 frames hidden]  threading, <built-in>
         └─ 0.137 get_engine  dask/dataframe/io/parquet/core.py:802
```

**Summary**

The bottleneck is the call to `isfile`, which is issued once per file path; each call is a separate blocking round-trip to S3.

**Minimal example**

```python
import s3fs
fs = s3fs.S3FileSystem(anon=True)
files = fs.ls('s3://saturn-public-data/nyc-taxi/taxi_parquet_speed_test')

def minimal_example(files):
    for path in files:
        fs.isfile(path)
```

**Timings**

```
%%time
minimal_example(files[:1000])

CPU times: user 11.2 s, sys: 72.4 ms, total: 11.3 s
Wall time: 11.3 s
```

```
%%time
fs.isfile(files[0])

CPU times: user 14.5 ms, sys: 1.27 ms, total: 15.7 ms
Wall time: 14.6 ms
```

If it takes 14.6 ms to do `isfile` on a single file, a thousand sequential calls come to roughly the 11.3 s measured above, and for 30k files this adds up to several minutes.

Also note that the slowness is not particularly in […].

**Potential solution**

Making `isfile` accept a batch of paths and resolve them concurrently. Something like (in function `_isfile`):

```diff
+ if isinstance(path, str):
+     paths = [path]
+ else:
+     paths = path
+ k = await self._info(paths[0])
+ out = await asyncio.gather(
+     *[self._info(path) for path in paths],
+     return_exceptions=True,
+ )
+ return [o["type"] == "file" for o in out]
```

Although the above suggestion doesn't make it fast, probably because the function `_ls_from_cache` is blocking, which is called here in `_info`. @martindurant any thoughts on this?
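For reference, the same idea can be driven from user code rather than patched into fsspec — a minimal sketch, assuming s3fs's documented async mode (`asynchronous=True`, `set_session`, and the underscore-prefixed coroutine methods):

```python
import asyncio

import s3fs

async def isfile_many(fs, paths):
    # Fire all _info coroutines at once instead of one blocking sync() per path.
    out = await asyncio.gather(*[fs._info(p) for p in paths], return_exceptions=True)
    return [isinstance(o, dict) and o.get("type") == "file" for o in out]

async def main():
    # asynchronous=True lets us await the coroutine API directly.
    fs = s3fs.S3FileSystem(anon=True, asynchronous=True)
    await fs.set_session()
    paths = await fs._ls("s3://saturn-public-data/nyc-taxi/taxi_parquet_speed_test",
                         detail=False)
    return await isfile_many(fs, paths[:1000])

flags = asyncio.run(main())
print(sum(flags), "files")
```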
Yes, a couple of thoughts: […]
(Note that fsspec/filesystem_spec#676 might also help; releases are coming in the next couple of days.)
@aktech I think there are two things to look at here: […]
There is no reason that fastparquet, in its current form, can't support `split_row_groups`. This option means multiple row groups per dask partition (usually, but not necessarily, grouped within files). That means partitions get bigger - is that what you are after? Note that fastparquet doesn't have any threading internally, so you would lose a little parallelism by doing this, but have potentially much smaller graphs, and maybe benefit from read-ahead caching of the data files.
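From the user side, this is the knob under discussion — a sketch with the pyarrow engine (which supported it at the time), using the public dataset from above:

```python
import dask.dataframe as dd

# One output partition per parquet row group instead of per file,
# keeping each in-memory pandas chunk small.
ddf = dd.read_parquet(
    "s3://saturn-public-data/nyc-taxi/taxi_parquet_speed_test",
    engine="pyarrow",
    split_row_groups=True,
    gather_statistics=False,
    storage_options={"anon": True},
)
print(ddf.npartitions)
```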
I was using fsspec from the main branch, and now, after using s3fs from the main branch as well, it doesn't seem to affect the timing of the minimal example in my previous comment.

I am indeed listing the directory in the minimal example above: `files = fs.ls('s3://saturn-public-data/nyc-taxi/taxi_parquet_speed_test')`

Where would you use that? I am probably missing something here, using it directly instead of […].

I have not played with this yet; I will let you know as soon as I do.
@martindurant @aktech sorry for the delay. @aktech prepped a file that has row groups we can use for reference.
Being able to `split_row_groups`, without having to pay the overhead of gathering statistics (which can be quite expensive for many files), is what I was looking for. Splitting row groups splits each parquet file into multiple dataframes/partitions, resulting in smaller dataframes. @martindurant do you think implementing parallel/distributed statistics gathering would be a bad idea?
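One shape such parallel statistics gathering could take — a sketch, not an existing dask API, where each worker reads one footer with fastparquet and pulls per-column min/max values from the thrift metadata (attribute names assume the parquet thrift layout fastparquet exposes):

```python
import dask
import fastparquet
import s3fs

fs = s3fs.S3FileSystem(anon=True)
files = fs.ls("s3://saturn-public-data/nyc-taxi/taxi_parquet_speed_test")

@dask.delayed
def column_stats(path):
    # Only this file's footer bytes are fetched, on whichever worker runs the task.
    pf = fastparquet.ParquetFile(path, open_with=fs.open)
    out = []
    for rg in pf.row_groups:
        for chunk in rg.columns:
            st = chunk.meta_data.statistics
            if st is not None:
                out.append((st.min_value, st.max_value))
    return out

per_file_stats = dask.compute(*(column_stats(p) for p in files))
```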
@hhuuggoo from the conversation above, there are some things that we might do immediately around […]. btw: have you tried with the latest fsspec/fastparquet? It ought to now fetch all of the file metadata footers.
This does break a dask model, by which the creation of a graph is done by the client, not by executing another graph. I don't see a problem with doing it conceptually. You actually caused me to make a small fix in fastparquet (which may be released tomorrow): dask/fastparquet#652. Following that, instantiating `ParquetFile` took 4.4 s on the first go (including setting up S3 credentials, imports, etc.) and 0.65 s thereafter, versus 1.4-2.2 s to make the uncomputed dataframe. Of course, dask is faster to actually load the dataframe.
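For reference, that measurement corresponds to something like the following (the list form of `ParquetFile` and the concurrent footer fetching assume the fastparquet/fsspec versions discussed here):

```python
import fastparquet
import s3fs

fs = s3fs.S3FileSystem(anon=True)
files = fs.ls("s3://saturn-public-data/nyc-taxi/taxi_parquet_speed_test")

# Passing the whole list makes fastparquet read every footer up front;
# a recent fsspec fetches those byte ranges concurrently.
pf = fastparquet.ParquetFile(files, open_with=fs.open)
print(len(pf.row_groups))
```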
This is usually true. However, there certainly are cases where we execute a graph on the workers to generate the necessary information to build a new graph on the client (e.g. […]).
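One familiar example of that pattern (the index column here is hypothetical): with unknown divisions, `set_index` first runs a quantile computation on the cluster, and only then builds the shuffle graph on the client:

```python
import dask.dataframe as dd

ddf = dd.read_parquet(
    "s3://saturn-public-data/nyc-taxi/taxi_parquet_speed_test",
    engine="pyarrow",
    storage_options={"anon": True},
)
# The division computation runs as its own graph before the new graph exists.
ddf2 = ddf.set_index("passenger_count")
```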
@martindurant I haven't tried the latest fastparquet - I will do so, but I no longer have access to the problematic dataset in question. I can close this though, and if it becomes an issue again, I can raise an issue/PR. Thanks!
## Original issue

I'd like to improve the performance of loading dask dataframes from parquet when there are many big parquet files in cloud storage.
I've experimented with both the Arrow dataset/legacy engines, as well as the FastParquet engine.
**arrow**

- The `split_row_groups` option seems to work well for reducing the memory footprint. It splits up a single parquet file into multiple dataframes, which mitigates the memory problem.

**fastparquet**

- It is fast when `gather_statistics` is False.
- It does not support `split_row_groups`, so the resulting dataframes are too large.

The following code addresses this issue with the fastparquet backend (it's not suitable to be merged, for many reasons).
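As a rough, hypothetical illustration of the approach described in the next paragraph (gathering row-group information in parallel with dask, then planning one output partition per row group) — not the original snippet:

```python
import dask
import fastparquet
import s3fs

fs = s3fs.S3FileSystem(anon=True)
files = fs.ls("s3://saturn-public-data/nyc-taxi/taxi_parquet_speed_test")

@dask.delayed
def row_group_count(path):
    # One footer read per dask task.
    pf = fastparquet.ParquetFile(path, open_with=fs.open)
    return path, len(pf.row_groups)

# Enough information to emit one partition per (file, row-group) pair,
# without gathering full column statistics.
plan = dask.compute(*(row_group_count(p) for p in files))
```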
I wanted to see what others thought about how this could/should be solved. #7557 seems to be a more general approach to solving this problem; I would need to adapt it for fastparquet. The approach in #7557 requires parquet metadata. In my code snippet, I gather row-group information in parallel using dask. Would there be interest in distributed fetches of fastparquet metadata?
Tagging @aktech, who will be working on it with me. Also cc @martindurant and @rjzamora, since we've chatted about this a bit.