Deprecate gather_statistics from read_parquet (#8992)
* change the split_row_groups default to False

* update docs

* deprecate gather_statistics

* update split_row_groups guidance

* remove paste error

* fix s3 tests
rjzamora committed Apr 28, 2022
1 parent e429efa commit 3715ab5
Showing 6 changed files with 263 additions and 214 deletions.
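
For context, the main user-facing change is a rename: the ``gather_statistics`` keyword of ``dd.read_parquet`` is deprecated in favor of ``calculate_divisions``, and passing the old keyword now emits a ``FutureWarning``. A minimal sketch of the migration (the dataset path and index name here are hypothetical):

import dask.dataframe as dd

# Deprecated spelling: still accepted for now, but emits a FutureWarning
df = dd.read_parquet("s3://bucket/data/", index="ts", gather_statistics=True)

# New spelling introduced by this commit
df = dd.read_parquet("s3://bucket/data/", index="ts", calculate_divisions=True)
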
2 changes: 1 addition & 1 deletion dask/bytes/tests/test_s3.py
@@ -480,7 +480,7 @@ def test_parquet(s3, engine, s3so, metadata_file):
assert "part.0.parquet" in files

df2 = dd.read_parquet(
url, index="foo", gather_statistics=True, engine=engine, storage_options=s3so
url, index="foo", calculate_divisions=True, engine=engine, storage_options=s3so
)
assert len(df2.divisions) > 1

60 changes: 14 additions & 46 deletions dask/dataframe/io/parquet/arrow.py
@@ -17,6 +17,7 @@
_parse_pandas_metadata,
_process_open_file_options,
_row_groups_to_parts,
_set_gather_statistics,
_set_metadata_task_size,
_sort_and_analyze_paths,
_split_user_options,
@@ -820,8 +821,6 @@ def _collect_dataset_info(
**_dataset_kwargs,
)
has_metadata_file = True
if gather_statistics is None:
gather_statistics = True
elif parquet_file_extension:
# Need to materialize all paths if we are missing the _metadata file
# Raise error if all files have been filtered by extension
@@ -854,8 +853,6 @@
**_dataset_kwargs,
)
has_metadata_file = True
if gather_statistics is None:
gather_statistics = True

# Populate valid_paths, since the original path list
# must be used to filter the _metadata-based dataset
@@ -875,24 +872,6 @@
**_dataset_kwargs,
)

# At this point, we know if `split_row_groups` should be
# set to `True` by default. If the user has not specified
# this option, we will only collect statistics if there is
# a global "_metadata" file available, otherwise we will
# opt for `gather_statistics=False`. For `ArrowDatasetEngine`,
# statistics are only required to calculate divisions
# and/or aggregate row-groups using `chunksize` (not for
# filtering).
#
# By default, we will create an output partition for each
# row group in the dataset (`split_row_groups=True`).
# However, we will NOT split by row-group if
# `gather_statistics=False`, because this can be
# interpreted as an indication that metadata overhead should
# be avoided at all costs.
if gather_statistics is None:
gather_statistics = False

# Deal with directory partitioning
# Get all partition keys (without filters) to populate partition_obj
partition_obj = [] # See `partition_info` description below
@@ -1137,20 +1116,6 @@ def _construct_collection_plan(cls, dataset_info):
dataset_info["metadata_task_size"], fs
)

# Cannot gather_statistics if our `metadata` is a list
# of paths, or if we are building a multiindex (for now).
# We also don't "need" to gather statistics if we don't
# want to apply any filters or calculate divisions. Note
# that the `ArrowDatasetEngine` doesn't even require
# `gather_statistics=True` for filtering.
_need_aggregation_stats = chunksize or (
int(split_row_groups) > 1 and aggregation_depth
)
if len(index_cols) > 1:
gather_statistics = False
elif not _need_aggregation_stats and filters is None and len(index_cols) == 0:
gather_statistics = False

# Make sure that any `in`-predicate filters have iterable values
filter_columns = set()
if filters is not None:
@@ -1163,23 +1128,26 @@
filter_columns.add(col)

# Determine which columns need statistics.
# At this point, gather_statistics is only True if
# the user specified calculate_divisions=True
stat_col_indices = {}
_index_cols = index_cols if (gather_statistics and len(index_cols) == 1) else []
for i, name in enumerate(schema.names):
if name in index_cols or name in filter_columns:
if name in _index_cols or name in filter_columns:
if name in partition_names:
# Partition columns won't have statistics
continue
stat_col_indices[name] = i

# If the user has not specified `gather_statistics`,
# we will only do so if there are specific columns in
# need of statistics.
# NOTE: We cannot change `gather_statistics` from True
# to False (even if `stat_col_indices` is empty), in
# case a `chunksize` was specified, and the row-group
# statistics are needed for part aggregation.
if gather_statistics is None:
gather_statistics = bool(stat_col_indices)
# Decide final `gather_statistics` setting
gather_statistics = _set_gather_statistics(
gather_statistics,
chunksize,
split_row_groups,
aggregation_depth,
filter_columns,
set(stat_col_indices),
)

# Add common kwargs
common_kwargs = {
54 changes: 32 additions & 22 deletions dask/dataframe/io/parquet/core.py
@@ -166,7 +166,7 @@ def read_parquet(
index=None,
storage_options=None,
engine="auto",
gather_statistics=None,
calculate_divisions=None,
ignore_metadata_file=False,
metadata_task_size=None,
split_row_groups=False,
@@ -240,11 +240,14 @@
installed, and falls back to ``pyarrow`` otherwise. Note that in the
future this default ordering for 'auto' will switch, with ``pyarrow``
being used if it is installed, and falling back to ``fastparquet``.
gather_statistics : bool, default None
Gather the statistics for each dataset partition. By default,
this will only be done if the _metadata file is available. Otherwise,
statistics will only be gathered if True, because the footer of
every file will be parsed (which is very slow on some systems).
calculate_divisions : bool, default False
Whether to use min/max statistics from the footer metadata (or global
``_metadata`` file) to calculate divisions for the output DataFrame
collection. Divisions will not be calculated if statistics are missing.
This option will be ignored if ``index`` is not specified and there is
no physical index column specified in the custom "pandas" Parquet
metadata. Note that ``calculate_divisions=True`` may be extremely slow
on some systems, and should be avoided when reading from remote storage.
ignore_metadata_file : bool, default False
Whether to ignore the global ``_metadata`` file (when one is present).
If ``True``, or if the global ``_metadata`` file is missing, the parquet
@@ -271,11 +274,10 @@
value. Use `aggregate_files` to enable/disable inter-file aggregation.
aggregate_files : bool or str, default None
Whether distinct file paths may be aggregated into the same output
partition. This parameter requires ``gather_statistics=True``, and is
only used when ``chunksize`` is specified or when ``split_row_groups`` is
an integer > 1. A setting of ``True`` means that any two file paths may be
aggregated into the same output partition, while ``False`` means that
inter-file aggregation is prohibited.
partition. This parameter is only used when `chunksize` is specified
or when `split_row_groups` is an integer >1. A setting of True means
that any two file paths may be aggregated into the same output partition,
while False means that inter-file aggregation is prohibited.
For "hive-partitioned" datasets, a "partition"-column name can also be
specified. In this case, we allow the aggregation of any two files
@@ -342,6 +344,23 @@
FutureWarning,
)

# Handle gather_statistics deprecation
if "gather_statistics" in kwargs:
if calculate_divisions is None:
calculate_divisions = kwargs.pop("gather_statistics")
warnings.warn(
"``gather_statistics`` is deprecated and will be removed in a "
"future release. Please use ``calculate_divisions`` instead.",
FutureWarning,
)
else:
warnings.warn(
f"``gather_statistics`` is deprecated. Ignoring this option "
f"in favor of ``calculate_divisions={calculate_divisions}``",
FutureWarning,
)
calculate_divisions = bool(calculate_divisions)

# We support a top-level `parquet_file_extension` kwarg, but
# must check if the deprecated `require_extension` option is
# being passed to the engine. If `parquet_file_extension` is
@@ -368,7 +387,7 @@
"index": index,
"storage_options": storage_options,
"engine": engine,
"gather_statistics": gather_statistics,
"calculate_divisions": calculate_divisions,
"ignore_metadata_file": ignore_metadata_file,
"metadata_task_size": metadata_task_size,
"split_row_groups": split_row_groups,
@@ -407,21 +426,12 @@
if index and isinstance(index, str):
index = [index]

if chunksize or (
split_row_groups and int(split_row_groups) > 1 and aggregate_files
):
# Require `gather_statistics=True` if `chunksize` is used,
# or if `split_row_groups>1` and we are aggregating files.
if gather_statistics is False:
raise ValueError("read_parquet options require gather_statistics=True")
gather_statistics = True

read_metadata_result = engine.read_metadata(
fs,
paths,
categories=categories,
index=index,
gather_statistics=gather_statistics,
gather_statistics=calculate_divisions,
filters=filters,
split_row_groups=split_row_groups,
chunksize=chunksize,
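
With the validation block above removed, ``read_parquet`` no longer requires (or raises about) an explicit ``gather_statistics=True`` when statistics are implicitly needed; that decision now happens inside the engines via the ``_set_gather_statistics`` helper added in ``utils.py`` below. A hedged sketch of a call that previously implied ``gather_statistics=True`` and now needs no extra flag (the dataset path and sizes are illustrative):

import dask.dataframe as dd

# Aggregating multiple row groups (and files) into each output partition
# requires row-group statistics internally, but the caller no longer has
# to request them explicitly.
df = dd.read_parquet(
    "s3://bucket/dataset/",
    split_row_groups=8,      # up to 8 row groups per output partition
    aggregate_files=True,    # allow row groups from different files to combine
)
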
58 changes: 16 additions & 42 deletions dask/dataframe/io/parquet/fastparquet.py
@@ -32,6 +32,7 @@
_parse_pandas_metadata,
_process_open_file_options,
_row_groups_to_parts,
_set_gather_statistics,
_set_metadata_task_size,
_sort_and_analyze_paths,
_split_user_options,
Expand Down Expand Up @@ -381,11 +382,6 @@ def _collect_dataset_info(
# Also, initialize `parts`. If `parts` is populated here,
# then each part will correspond to a file. Otherwise, each part will
# correspond to a row group (populated later).
#
# This logic is mostly to handle `gather_statistics=False` cases,
# because this also means we should avoid scanning every file in the
# dataset. If _metadata is available, set `gather_statistics=True`
# (if `gather_statistics=None`).

# Extract "supported" keyword arguments from `kwargs`.
# Split items into `dataset_kwargs` and `read_kwargs`
@@ -423,8 +419,6 @@
open_with=fs.open,
**dataset_kwargs,
)
if gather_statistics is None:
gather_statistics = True
else:
# Use 0th file
# Note that "_common_metadata" can cause issues for
@@ -636,47 +630,27 @@ def _construct_collection_plan(cls, dataset_info):
dataset_info["metadata_task_size"], fs
)

# We don't "need" to gather statistics if we don't
# want to apply filters, aggregate files, or calculate
# divisions.
_need_aggregation_stats = chunksize or (
int(split_row_groups) > 1 and aggregation_depth
)
if len(index_cols) > 1:
gather_statistics = False
elif not _need_aggregation_stats and filters is None and len(index_cols) == 0:
gather_statistics = False

# Make sure gather_statistics allows filtering
# (if filters are desired)
if filters:
# Filters may require us to gather statistics
if gather_statistics is False and pf.cats:
warnings.warn(
"Filtering with gather_statistics=False. "
"Only partition columns will be filtered correctly."
)
elif gather_statistics is False:
raise ValueError("Cannot apply filters with gather_statistics=False")
elif not gather_statistics:
gather_statistics = True

# Determine which columns need statistics.
# At this point, gather_statistics is only True if
# the user specified calculate_divisions=True
filter_columns = {t[0] for t in flatten(filters or [], container=list)}
stat_col_indices = {}
_index_cols = index_cols if (gather_statistics and len(index_cols) == 1) else []
for i, name in enumerate(pf.columns):
if name in index_cols or name in filter_columns:
if name in _index_cols or name in filter_columns:
stat_col_indices[name] = i

# If the user has not specified `gather_statistics`,
# we will only do so if there are specific columns in
# need of statistics.
# NOTE: We cannot change `gather_statistics` from True
# to False (even if `stat_col_indices` is empty), in
# case a `chunksize` was specified, and the row-group
# statistics are needed for part aggregation.
if gather_statistics is None:
gather_statistics = bool(stat_col_indices)
# Decide final `gather_statistics` setting.
# NOTE: The "fastparquet" engine requires statistics for
# filtering even if the filter is on a partitioned column
gather_statistics = _set_gather_statistics(
gather_statistics,
chunksize,
split_row_groups,
aggregation_depth,
filter_columns,
set(stat_col_indices) | filter_columns,
)

# Define common_kwargs
common_kwargs = {
38 changes: 35 additions & 3 deletions dask/dataframe/io/parquet/utils.py
@@ -38,9 +38,8 @@ def read_metadata(
If set to ``None``, pandas metadata (if available) can be used
to reset the value in this function
gather_statistics: bool
Whether or not to gather statistics data. If ``None``, we only
gather statistics data if there is a _metadata file available to
query (cheaply)
Whether or not to gather statistics to calculate divisions
for the output DataFrame collection.
filters: list
List of filters to apply, like ``[('x', '>', 0), ...]``.
**kwargs: dict (of dicts)
@@ -733,3 +732,36 @@ def _split_user_options(**kwargs):
read_options,
user_kwargs,
)


def _set_gather_statistics(
gather_statistics,
chunksize,
split_row_groups,
aggregation_depth,
filter_columns,
stat_columns,
):
# Use available information about the current read options
# and target dataset to decide if we need to gather metadata
# statistics to construct the graph for a `read_parquet` op.

# If the user has specified `calculate_divisions=True`, then
# we will be starting with `gather_statistics=True` here.
if (
chunksize
or (int(split_row_groups) > 1 and aggregation_depth)
or filter_columns.intersection(stat_columns)
):
# Need to gather statistics if we are aggregating files
# or filtering
# NOTE: Should avoid gathering statistics when the agg
# does not depend on a row-group statistic
gather_statistics = True
elif not stat_columns:
# Not aggregating files/row-groups.
# We only need to gather statistics if `stat_columns`
# is populated
gather_statistics = False

return bool(gather_statistics)
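
For illustration, a few hypothetical calls and the values the new helper returns (``_set_gather_statistics`` is a private helper, so this is only a sketch of its decision logic, not public API). The last pair also mirrors the fastparquet note above: that engine folds ``filter_columns`` into ``stat_columns``, so filtering on a hive-partitioned column still forces a statistics scan there, while the pyarrow engine skips it.

# Signature: (gather_statistics, chunksize, split_row_groups,
#             aggregation_depth, filter_columns, stat_columns)

# calculate_divisions=True and the single index column has statistics
# available, so keep gathering statistics (divisions can be computed)
_set_gather_statistics(True, None, 1, False, set(), {"timestamp"})    # True

# No chunksize, no multi-row-group aggregation, no filters, and no column
# needs statistics, so skip the (potentially slow) metadata scan
_set_gather_statistics(None, None, 1, False, set(), set())            # False

# A chunksize was given, so row-group statistics are required to
# aggregate parts regardless of what the caller asked for
_set_gather_statistics(False, "256MiB", 1, False, set(), set())       # True

# Filter on a hive-partition column "year" (hypothetical):
# pyarrow engine keeps partition columns out of stat_columns -> no scan
_set_gather_statistics(None, None, 1, False, {"year"}, set())         # False
# fastparquet engine passes stat_columns | filter_columns -> scan forced
_set_gather_statistics(None, None, 1, False, {"year"}, {"year"})      # True
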
