Make filesystem-backend configurable in read_parquet (#9699)
rjzamora committed Dec 15, 2022
1 parent d2c9e39 commit d943293
Showing 6 changed files with 289 additions and 39 deletions.
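
The headline change: ``dask.dataframe.read_parquet`` gains a ``filesystem`` argument for selecting the file-system backend. A minimal usage sketch, assuming the pyarrow engine is installed (the bucket name and region below are hypothetical placeholders):

    import dask.dataframe as dd

    # Ask for the pyarrow-native filesystem backend by name. In this mode the
    # storage_options are forwarded to the pyarrow filesystem constructor.
    df = dd.read_parquet(
        "s3://my-bucket/dataset/",          # hypothetical path
        engine="pyarrow",
        filesystem="arrow",
        storage_options={"region": "us-east-1"},
    )
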
93 changes: 78 additions & 15 deletions dask/dataframe/io/parquet/arrow.py
@@ -22,7 +22,6 @@
_set_gather_statistics,
_set_metadata_task_size,
_sort_and_analyze_paths,
_split_user_options,
)
from dask.dataframe.io.utils import _get_pyarrow_dtypes, _is_local_fs, _open_input_files
from dask.dataframe.utils import clear_known_categories
@@ -31,7 +30,10 @@

# Check PyArrow version for feature support
_pa_version = parse_version(pa.__version__)
from fsspec.core import expand_paths_if_needed, stringify_path
from fsspec.implementations.arrow import ArrowFSWrapper
from pyarrow import dataset as pa_ds
from pyarrow import fs as pa_fs

subset_stats_supported = _pa_version > parse_version("2.0.0")
pre_buffer_supported = _pa_version >= parse_version("5.0.0")
@@ -60,6 +62,11 @@
#


def _wrapped_fs(fs):
"""Return the wrapped filesystem if fs is ArrowFSWrapper"""
return fs.fs if isinstance(fs, ArrowFSWrapper) else fs


def _append_row_groups(metadata, md):
"""Append row-group metadata and include a helpful
error message if an inconsistent schema is detected.
@@ -332,6 +339,67 @@ class ArrowDatasetEngine(Engine):
# Public Class Methods
#

@classmethod
def extract_filesystem(
cls,
urlpath,
filesystem,
dataset_options,
open_file_options,
storage_options,
):

# Check if filesystem was specified as a dataset option
if filesystem is None:
fs = dataset_options.pop("filesystem", "fsspec")
else:
if "filesystem" in dataset_options:
raise ValueError(
"Cannot specify a filesystem argument if the "
"'filesystem' dataset option is also defined."
)
fs = filesystem

# Handle pyarrow-based filesystem
if isinstance(fs, pa_fs.FileSystem) or fs in ("arrow", "pyarrow"):
if isinstance(urlpath, (list, tuple, set)):
if not urlpath:
raise ValueError("empty urlpath sequence")
urlpath = [stringify_path(u) for u in urlpath]
else:
urlpath = [stringify_path(urlpath)]

if fs in ("arrow", "pyarrow"):
fs = type(pa_fs.FileSystem.from_uri(urlpath[0])[0])(
**(storage_options or {})
)

fsspec_fs = ArrowFSWrapper(fs)
if urlpath[0].startswith("C:") and isinstance(fs, pa_fs.LocalFileSystem):
# ArrowFSWrapper._strip_protocol not reliable on windows
# See: https://github.com/fsspec/filesystem_spec/issues/1137
from fsspec.implementations.local import LocalFileSystem

fs_strip = LocalFileSystem()
else:
fs_strip = fsspec_fs
paths = expand_paths_if_needed(urlpath, "rb", 1, fsspec_fs, None)
return (
fsspec_fs,
[fs_strip._strip_protocol(u) for u in paths],
dataset_options,
{"open_file_func": fs.open_input_file},
)

# Use default file-system initialization
return Engine.extract_filesystem(
urlpath,
fs,
dataset_options,
open_file_options,
storage_options,
)

@classmethod
def read_metadata(
cls,
@@ -556,7 +624,7 @@ def initialize_write(
metadata_file_exists = False
if append:
# Extract metadata and get file offset if appending
ds = pa_ds.dataset(path, filesystem=fs, format="parquet")
ds = pa_ds.dataset(path, filesystem=_wrapped_fs(fs), format="parquet")
i_offset = len(ds.files)
if i_offset > 0:
try:
@@ -804,12 +872,8 @@ def _collect_dataset_info(
ds = None
valid_paths = None # Only used if `paths` is a list containing _metadata

# Extract "supported" key-word arguments from `kwargs`
(
_dataset_kwargs,
read_kwargs,
user_kwargs,
) = _split_user_options(**kwargs)
# Extract dataset-specific options
_dataset_kwargs = kwargs.pop("dataset", {})

if "partitioning" not in _dataset_kwargs:
_dataset_kwargs["partitioning"] = "hive"
@@ -831,7 +895,7 @@
# Use _metadata file
ds = pa_ds.parquet_dataset(
meta_path,
filesystem=fs,
filesystem=_wrapped_fs(fs),
**_dataset_kwargs,
)
has_metadata_file = True
@@ -859,7 +923,7 @@
if not ignore_metadata_file:
ds = pa_ds.parquet_dataset(
meta_path,
filesystem=fs,
filesystem=_wrapped_fs(fs),
**_dataset_kwargs,
)
has_metadata_file = True
@@ -873,7 +937,7 @@
if ds is None:
ds = pa_ds.dataset(
paths,
filesystem=fs,
filesystem=_wrapped_fs(fs),
**_dataset_kwargs,
)

@@ -974,8 +1038,7 @@ def _collect_dataset_info(
"metadata_task_size": metadata_task_size,
"kwargs": {
"dataset": _dataset_kwargs,
"read": read_kwargs,
**user_kwargs,
**kwargs,
},
}

@@ -1315,7 +1378,7 @@ def _collect_file_parts(
file_frags = list(
pa_ds.dataset(
files_or_frags,
filesystem=fs,
filesystem=_wrapped_fs(fs),
**dataset_options,
).get_fragments()
)
@@ -1508,7 +1571,7 @@ def _read_table(
# to a single "fragment" to read
ds = pa_ds.dataset(
path_or_frag,
filesystem=fs,
filesystem=_wrapped_fs(fs),
**kwargs.get("dataset", {}),
)
frags = list(ds.get_fragments())
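
For context on the arrow-engine changes above: ``extract_filesystem`` wraps a pyarrow filesystem in fsspec's ``ArrowFSWrapper`` so the rest of the I/O path can treat it as an fsspec filesystem, and ``_wrapped_fs`` recovers the underlying pyarrow object wherever ``pyarrow.dataset`` needs it. A rough illustration of that round trip, using the local filesystem for simplicity:

    from fsspec.implementations.arrow import ArrowFSWrapper
    from pyarrow import fs as pa_fs

    pafs = pa_fs.LocalFileSystem()    # native pyarrow filesystem
    wrapped = ArrowFSWrapper(pafs)    # fsspec-compatible view of the same object

    # _wrapped_fs(wrapped) returns the original pyarrow filesystem, which is
    # what pa_ds.dataset(..., filesystem=...) expects in the code above.
    assert wrapped.fs is pafs
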
64 changes: 49 additions & 15 deletions dask/dataframe/io/parquet/core.py
@@ -15,7 +15,11 @@
from dask.dataframe.backends import dataframe_creation_dispatch
from dask.dataframe.core import DataFrame, Scalar
from dask.dataframe.io.io import from_map
from dask.dataframe.io.parquet.utils import Engine, _sort_and_analyze_paths
from dask.dataframe.io.parquet.utils import (
Engine,
_sort_and_analyze_paths,
_split_user_options,
)
from dask.dataframe.io.utils import DataFrameIOFunction, _is_local_fs
from dask.dataframe.methods import concat
from dask.delayed import Delayed
@@ -193,6 +197,7 @@ def read_parquet(
chunksize=None,
aggregate_files=None,
parquet_file_extension=(".parq", ".parquet", ".pq"),
filesystem=None,
**kwargs,
):
"""
@@ -246,6 +251,8 @@
data written by dask/fastparquet, not otherwise.
storage_options : dict, default None
Key/value pairs to be passed on to the file-system backend, if any.
Note that the default file-system backend can be configured with the
``filesystem`` argument, described below.
open_file_options : dict, default None
Key/value arguments to be passed along to ``AbstractFileSystem.open``
when each parquet data file is open for reading. Experimental
Expand Down Expand Up @@ -342,19 +349,27 @@ def read_parquet(
unsupported metadata files (like Spark's '_SUCCESS' and 'crc' files).
It may be necessary to change this argument if the data files in your
parquet dataset do not end in ".parq", ".parquet", or ".pq".
filesystem: "fsspec", "arrow", fsspec.AbstractFileSystem, or pyarrow.fs.FileSystem
Filesystem backend to use. Note that the "fastparquet" engine only
supports "fsspec" or an explicit ``pyarrow.fs.FileSystem`` object.
Default is "fsspec".
dataset: dict, default None
Dictionary of options to use when creating a ``pyarrow.dataset.Dataset``
or ``fastparquet.ParquetFile`` object. These options may include a
"filesystem" key (or "fs" for the "fastparquet" engine) to configure
the desired file-system backend. However, the top-level ``filesystem``
argument will always take precedence.
read: dict, default None
Dictionary of options to pass through to ``engine.read_partitions``
using the ``read`` key-word argument.
arrow_to_pandas: dict, default None
Dictionary of options to use when converting from ``pyarrow.Table`` to
a pandas ``DataFrame`` object. Only used by the "arrow" engine.
**kwargs: dict (of dicts)
Passthrough key-word arguments for read backend.
The top-level keys correspond to the appropriate operation type, and
the second level corresponds to the kwargs that will be passed on to
the underlying ``pyarrow`` or ``fastparquet`` function.
Supported top-level keys: 'dataset' (for opening a ``pyarrow`` dataset),
'file' or 'dataset' (for opening a ``fastparquet.ParquetFile``), 'read'
(for the backend read function), 'arrow_to_pandas' (for controlling the
arguments passed to convert from a ``pyarrow.Table.to_pandas()``).
Any element of kwargs that is not defined under these top-level keys
will be passed through to the `engine.read_partitions` classmethod as a
stand-alone argument (and will be ignored by the engine implementations
defined in ``dask.dataframe``).
Options to pass through to ``engine.read_partitions`` as stand-alone
key-word arguments. Note that these options will be ignored by the
engines defined in ``dask.dataframe``, but may be used by other custom
implementations.
Examples
--------
@@ -446,6 +461,7 @@ def read_parquet(
"chunksize": chunksize,
"aggregate_files": aggregate_files,
"parquet_file_extension": parquet_file_extension,
"filesystem": filesystem,
**kwargs,
}

@@ -466,7 +482,23 @@
# Update input_kwargs
input_kwargs.update({"columns": columns, "engine": engine})

fs, _, paths = get_fs_token_paths(path, mode="rb", storage_options=storage_options)
# Process and split user options
(
dataset_options,
read_options,
open_file_options,
other_options,
) = _split_user_options(**kwargs)

# Extract global filesystem and paths
fs, paths, dataset_options, open_file_options = engine.extract_filesystem(
path,
filesystem,
dataset_options,
open_file_options,
storage_options,
)
read_options["open_file_options"] = open_file_options
paths = sorted(paths, key=natural_sort_key) # numeric rather than glob ordering

auto_index_allowed = False
@@ -490,7 +522,9 @@
ignore_metadata_file=ignore_metadata_file,
metadata_task_size=metadata_task_size,
parquet_file_extension=parquet_file_extension,
**kwargs,
dataset=dataset_options,
read=read_options,
**other_options,
)

# In the future, we may want to give the engine the
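
As the updated docstring above notes, an already-instantiated ``pyarrow.fs.FileSystem`` can also be passed directly as ``filesystem``. A sketch under that assumption (bucket, region, and credentials are placeholders):

    import dask.dataframe as dd
    from pyarrow import fs as pa_fs

    s3 = pa_fs.S3FileSystem(region="us-east-1", anonymous=True)  # placeholder config
    df = dd.read_parquet(
        "s3://my-bucket/path/to/data",   # hypothetical path
        engine="pyarrow",
        filesystem=s3,
    )
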
9 changes: 3 additions & 6 deletions dask/dataframe/io/parquet/fastparquet.py
@@ -35,7 +35,6 @@
_set_gather_statistics,
_set_metadata_task_size,
_sort_and_analyze_paths,
_split_user_options,
)
from dask.dataframe.io.utils import _is_local_fs, _meta_from_dtypes, _open_input_files
from dask.dataframe.utils import UNKNOWN_CATEGORIES
@@ -383,9 +382,8 @@ def _collect_dataset_info(
# then each part will correspond to a file. Otherwise, each part will
# correspond to a row group (populated later).

# Extract "supported" key-word arguments from `kwargs`.
# Split items into `dataset_kwargs` and `read_kwargs`
dataset_kwargs, read_kwargs, user_kwargs = _split_user_options(**kwargs)
# Extract dataset-specific options
dataset_kwargs = kwargs.pop("dataset", {})

parts = []
_metadata_exists = False
@@ -512,8 +510,7 @@ def _collect_dataset_info(
"metadata_task_size": metadata_task_size,
"kwargs": {
"dataset": dataset_kwargs,
"read": read_kwargs,
**user_kwargs,
**kwargs,
},
}

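
The backend can also be selected through the ``dataset`` options rather than the top-level argument; per the docstring, the top-level ``filesystem`` takes precedence if both are given. A hedged sketch:

    import dask.dataframe as dd

    # Equivalent to filesystem="arrow" for the pyarrow engine
    # (path is a hypothetical placeholder).
    df = dd.read_parquet(
        "s3://my-bucket/dataset/",
        engine="pyarrow",
        dataset={"filesystem": "arrow"},
    )
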
