Avoid collecting parquet metadata in pyarrow when write_metadata_file=False (#8906)

* avoid collecting metadata in pyarrow write when write_metadata_file is False anyway

* remove confusing syntax

* use 'store-' rather than 'metadata-' in final task name when metadata is not being written

* add comment explaining extra task
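
For context, a minimal usage sketch of the code path this commit affects (not part of the commit itself; the example data and output path are illustrative):

    import pandas as pd
    import dask.dataframe as dd

    # Any partitioned Dask DataFrame will do; this one is purely illustrative.
    ddf = dd.from_pandas(
        pd.DataFrame({"x": range(8), "part": ["a", "b"] * 4}),
        npartitions=2,
    )

    # With write_metadata_file=False, the pyarrow engine no longer asks pyarrow
    # to collect per-file metadata that would only be discarded, and the final
    # tie-together task is named "store-*" rather than "metadata-*".
    ddf.to_parquet("out/", engine="pyarrow", write_metadata_file=False)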
rjzamora committed Apr 13, 2022
1 parent 783b15b commit 0ccad39
Showing 2 changed files with 23 additions and 9 deletions.
14 changes: 11 additions & 3 deletions dask/dataframe/io/parquet/arrow.py
@@ -72,6 +72,7 @@ def _write_partitioned(
     pandas_to_arrow_table,
     preserve_index,
     index_cols=(),
+    return_metadata=True,
     **kwargs,
 ):
     """Write table to a partitioned dataset with pyarrow.
@@ -119,8 +120,14 @@ def _write_partitioned(
         fs.mkdirs(prefix, exist_ok=True)
         full_path = fs.sep.join([prefix, filename])
         with fs.open(full_path, "wb") as f:
-            pq.write_table(subtable, f, metadata_collector=md_list, **kwargs)
-        md_list[-1].set_file_path(fs.sep.join([subdir, filename]))
+            pq.write_table(
+                subtable,
+                f,
+                metadata_collector=md_list if return_metadata else None,
+                **kwargs,
+            )
+        if return_metadata:
+            md_list[-1].set_file_path(fs.sep.join([subdir, filename]))

     return md_list

@@ -680,6 +687,7 @@ def write_partition(
                 preserve_index,
                 index_cols=index_cols,
                 compression=compression,
+                return_metadata=return_metadata,
                 **kwargs,
             )
             if md_list:
@@ -693,7 +701,7 @@
                     t,
                     fil,
                     compression=compression,
-                    metadata_collector=md_list,
+                    metadata_collector=md_list if return_metadata else None,
                     **kwargs,
                 )
             if md_list:
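As a reference for the pyarrow behavior relied on above, here is a small sketch (not from the commit; the table and file name are illustrative) of how the metadata_collector argument to pq.write_table behaves:

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({"x": [1, 2, 3]})

    # Passing a list makes pyarrow append a FileMetaData object for the written
    # file; passing None skips that collection, which is what the change above
    # does when return_metadata is False.
    md_list = []
    pq.write_table(table, "example.parquet", metadata_collector=md_list)
    md_list[-1].set_file_path("example.parquet")
    print(len(md_list))  # 1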
18 changes: 12 additions & 6 deletions dask/dataframe/io/parquet/core.py
@@ -808,10 +808,10 @@ def to_parquet(

     # Collect metadata and write _metadata.
     # TODO: Use tree-reduction layer (when available)
-    meta_name = "metadata-" + data_write._name
     if write_metadata_file:
+        final_name = "metadata-" + data_write._name
         dsk = {
-            (meta_name, 0): (
+            (final_name, 0): (
                 apply,
                 engine.write_metadata,
                 [
@@ -824,16 +824,22 @@
             )
         }
     else:
-        dsk = {(meta_name, 0): (lambda x: None, data_write.__dask_keys__())}
+        # NOTE: We still define a single task to tie everything together
+        # when we are not writing a _metadata file. We do not want to
+        # return `data_write` (or a `data_write.to_bag()`), because calling
+        # `compute()` on a multi-partition collection requires the overhead
+        # of trying to concatenate results on the client.
+        final_name = "store-" + data_write._name
+        dsk = {(final_name, 0): (lambda x: None, data_write.__dask_keys__())}

     # Convert data_write + dsk to computable collection
-    graph = HighLevelGraph.from_collections(meta_name, dsk, dependencies=(data_write,))
+    graph = HighLevelGraph.from_collections(final_name, dsk, dependencies=(data_write,))
     if compute:
         return compute_as_if_collection(
-            Scalar, graph, [(meta_name, 0)], **compute_kwargs
+            Scalar, graph, [(final_name, 0)], **compute_kwargs
         )
     else:
-        return Scalar(graph, meta_name, "")
+        return Scalar(graph, final_name, "")


def create_metadata_file(
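To illustrate the NOTE added in core.py, here is a rough, hypothetical sketch of the graph shape when no _metadata file is written: every write task feeds one trivial "store-" task, so computing the result needs only a single terminal key and no client-side concatenation of partition results. The key names below are made up:

    import dask

    # Stand-ins for the real per-partition write tasks.
    write_keys = [("to-parquet-abc", i) for i in range(3)]
    dsk = {
        ("to-parquet-abc", 0): (lambda: "part.0.parquet",),
        ("to-parquet-abc", 1): (lambda: "part.1.parquet",),
        ("to-parquet-abc", 2): (lambda: "part.2.parquet",),
        # Single tie-together task: depends on all writes, returns None.
        ("store-to-parquet-abc", 0): (lambda x: None, write_keys),
    }

    # Pulling just the terminal key runs every write once and returns None.
    assert dask.get(dsk, ("store-to-parquet-abc", 0)) is None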
