Make filesystem-backend configurable in read_parquet #9699

Merged
10 commits merged on Dec 15, 2022
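
For context, a rough usage sketch of the behavior this PR enables. The bucket path and credential options below are hypothetical, and the exact default backend follows the `extract_filesystem` logic in the diff that follows:

import dask.dataframe as dd
from pyarrow import fs as pa_fs

# With this change, s3:// paths with "simple" storage options default to
# pyarrow's S3FileSystem instead of s3fs.
df = dd.read_parquet(
    "s3://my-bucket/data/",               # hypothetical path
    storage_options={"anonymous": True},  # forwarded to S3FileSystem
)

# Explicitly revert to the fsspec backend (s3fs, if installed)
df = dd.read_parquet(
    "s3://my-bucket/data/",
    dataset={"filesystem": "fsspec"},
    storage_options={"anon": True},       # s3fs spelling of the same option
)

# Or hand the engine a pre-configured pyarrow filesystem object
df = dd.read_parquet(
    "s3://my-bucket/data/",
    dataset={"filesystem": pa_fs.S3FileSystem(anonymous=True)},
)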
dask/dataframe/io/parquet/arrow.py: 203 changes (168 additions, 35 deletions)
@@ -1,5 +1,6 @@
import json
import textwrap
import warnings
from collections import defaultdict
from datetime import datetime

@@ -21,7 +22,6 @@
_set_gather_statistics,
_set_metadata_task_size,
_sort_and_analyze_paths,
_split_user_options,
)
from dask.dataframe.io.utils import _get_pyarrow_dtypes, _is_local_fs, _open_input_files
from dask.dataframe.utils import clear_known_categories
@@ -30,7 +30,10 @@

# Check PyArrow version for feature support
_pa_version = parse_version(pa.__version__)
from fsspec.core import expand_paths_if_needed, stringify_path
from fsspec.implementations.arrow import ArrowFSWrapper
from pyarrow import dataset as pa_ds
from pyarrow import fs as pa_fs

subset_stats_supported = _pa_version > parse_version("2.0.0")
pre_buffer_supported = _pa_version >= parse_version("5.0.0")
@@ -42,6 +45,34 @@
#


_s3_note = (
"Note that this version of `ArrowDatasetEngine` will attempt "
"to use `S3FileSystem` by default when reading from s3 storage. "
"If necessary, try passing in `dataset=dict(filesystem='arrow')` "
"to revert the default to `s3fs` (if available)"
)


def _wrapped_fs(fs):
"""Return the wrapped filesystem if fs is ArrowFSWrapper"""
return fs.fs if isinstance(fs, ArrowFSWrapper) else fs


def _with_wrapped_fs(func, *args, filesystem=None, **kwargs):
"""Call a function with a filesystem kwarg that may be wrapped"""
fs = _wrapped_fs(filesystem)
try:
return func(*args, filesystem=fs, **kwargs)
except Exception as err:
if not (hasattr(fs, "type_name") and fs.type_name == "s3"):
raise err
raise type(err)(
f"Call to {func} failed with `filesystem={filesystem}`.\n"
f"{_s3_note}\n"
f"Original Error: {err}"
)


def _append_row_groups(metadata, md):
"""Append row-group metadata and include a helpful
error message if an inconsistent schema is detected.
@@ -224,27 +255,36 @@ def _read_table_from_path(
else {}
)

with _open_input_files(
[path],
fs=fs,
precache_options=precache_options,
**open_file_options,
)[0] as fil:
if row_groups == [None]:
return pq.ParquetFile(fil, **pre_buffer).read(
columns=columns,
use_threads=False,
use_pandas_metadata=True,
**read_kwargs,
)
else:
return pq.ParquetFile(fil, **pre_buffer).read_row_groups(
row_groups,
columns=columns,
use_threads=False,
use_pandas_metadata=True,
**read_kwargs,
try:
with _open_input_files(
[path],
fs=fs,
precache_options=precache_options,
**open_file_options,
)[0] as fil:
if row_groups == [None]:
return pq.ParquetFile(fil, **pre_buffer).read(
columns=columns,
use_threads=False,
use_pandas_metadata=True,
**read_kwargs,
)
else:
return pq.ParquetFile(fil, **pre_buffer).read_row_groups(
row_groups,
columns=columns,
use_threads=False,
use_pandas_metadata=True,
**read_kwargs,
)
except Exception as err:
if open_file_options.get("open_file_func", None):
raise type(err)(
f"Failed to open and read Parquet file.\n"
f"{_s3_note}\n"
f"Original Error: {err}"
)
raise err


def _get_rg_statistics(row_group, col_indices):
@@ -314,6 +354,99 @@ class ArrowDatasetEngine(Engine):
# Public Class Methods
#

@classmethod
def extract_filesystem(
cls,
urlpath,
dataset_options,
open_file_options,
storage_options,
):

# Check if fs was already specified as a dataset option
filesystem = dataset_options.pop("filesystem", None)
if filesystem is None:
if isinstance(urlpath, (list, tuple, set)):
if not urlpath:
raise ValueError("empty urlpath sequence")
strpath = stringify_path(next(iter(urlpath)))
else:
strpath = stringify_path(urlpath)
if (
strpath.startswith("s3://")
and not open_file_options
and not (
# Only use PyArrow by default when storage is in s3,
                    # and `storage_options` only includes simple options
                    # that are "expected" by `S3FileSystem`
set(storage_options)
- {
"access_key",
"secret_key",
"session_token",
"anonymous",
"role_arn",
"session_name",
"external_id",
"load_frequency",
"region",
"request_timeout",
"connect_timeout",
"scheme",
"endpoint_override",
"background_writes",
"default_metadata",
"proxy_options",
"allow_bucket_creation",
"allow_bucket_deletion",
"retry_strategy",
}
)
):
filesystem = "arrow"
else:
filesystem = "fsspec"

if isinstance(filesystem, pa_fs.FileSystem) or filesystem == "arrow":
if isinstance(urlpath, (list, tuple, set)):
if not urlpath:
raise ValueError("empty urlpath sequence")
urlpath = [stringify_path(u) for u in urlpath]
else:
urlpath = [stringify_path(urlpath)]

if filesystem == "arrow":
try:
filesystem = type(pa_fs.FileSystem.from_uri(urlpath[0])[0])(
**(storage_options or {})
)
except (TypeError, pa.lib.ArrowInvalid) as err:
# Try falling back to fsspec
warnings.warn(
f"Failed to initialize a pyarrow-based `FileSystem` object. "
f"Falling back to `fsspec`.\n"
f"{_s3_note}\n"
f"Original Error: {err}"
)
filesystem = "fsspec"

if filesystem != "fsspec":
fs = ArrowFSWrapper(filesystem)
paths = expand_paths_if_needed(urlpath, "rb", 1, fs, None)
return (
fs,
[fs._strip_protocol(u) for u in paths],
dataset_options,
{"open_file_func": filesystem.open_input_file},
)

return Engine.extract_filesystem(
urlpath,
dataset_options,
open_file_options,
storage_options,
)

@classmethod
def read_metadata(
cls,
@@ -534,7 +667,7 @@ def initialize_write(
metadata_file_exists = False
if append:
# Extract metadata and get file offset if appending
ds = pa_ds.dataset(path, filesystem=fs, format="parquet")
ds = _with_wrapped_fs(pa_ds.dataset, path, filesystem=fs, format="parquet")
i_offset = len(ds.files)
if i_offset > 0:
try:
@@ -782,12 +915,8 @@ def _collect_dataset_info(
ds = None
valid_paths = None # Only used if `paths` is a list containing _metadata

# Extract "supported" key-word arguments from `kwargs`
(
_dataset_kwargs,
read_kwargs,
user_kwargs,
) = _split_user_options(**kwargs)
# Extract dataset-specific options
_dataset_kwargs = kwargs.pop("dataset", {})

if "partitioning" not in _dataset_kwargs:
_dataset_kwargs["partitioning"] = "hive"
@@ -807,7 +936,8 @@
meta_path = fs.sep.join([paths, "_metadata"])
if not ignore_metadata_file and fs.exists(meta_path):
# Use _metadata file
ds = pa_ds.parquet_dataset(
ds = _with_wrapped_fs(
pa_ds.parquet_dataset,
meta_path,
filesystem=fs,
**_dataset_kwargs,
@@ -835,7 +965,8 @@
# Pyarrow cannot handle "_metadata" when `paths` is a list
# Use _metadata file
if not ignore_metadata_file:
ds = pa_ds.parquet_dataset(
ds = _with_wrapped_fs(
pa_ds.parquet_dataset,
meta_path,
filesystem=fs,
**_dataset_kwargs,
@@ -849,7 +980,8 @@

# Final "catch-all" pyarrow.dataset call
if ds is None:
ds = pa_ds.dataset(
ds = _with_wrapped_fs(
pa_ds.dataset,
paths,
filesystem=fs,
**_dataset_kwargs,
@@ -952,8 +1084,7 @@ def _collect_dataset_info(
"metadata_task_size": metadata_task_size,
"kwargs": {
"dataset": _dataset_kwargs,
"read": read_kwargs,
**user_kwargs,
**kwargs,
},
}

@@ -1282,7 +1413,8 @@ def _collect_file_parts(

# Need more information - convert the path to a fragment
file_frags = list(
pa_ds.dataset(
_with_wrapped_fs(
pa_ds.dataset,
files_or_frags,
filesystem=fs,
**dataset_options,
@@ -1475,7 +1607,8 @@ def _read_table(
# We are filtering with "pyarrow-dataset".
# Need to convert the path and row-group IDs
# to a single "fragment" to read
ds = pa_ds.dataset(
ds = _with_wrapped_fs(
pa_ds.dataset,
path_or_frag,
filesystem=fs,
**kwargs.get("dataset", {}),
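
As an aside on the `ArrowFSWrapper` usage above: a minimal sketch of what the wrapping buys us, using a local pyarrow filesystem purely for illustration (the /tmp path is an assumption):

from fsspec.implementations.arrow import ArrowFSWrapper
from pyarrow import fs as pa_fs

# Wrapping a pyarrow filesystem exposes the familiar fsspec interface
# (ls, exists, sep, _strip_protocol, ...) expected by dask's IO helpers.
pa_filesystem = pa_fs.LocalFileSystem()
fs = ArrowFSWrapper(pa_filesystem)

print(fs.ls("/tmp"))      # fsspec-style directory listing (Unix-like path assumed)
print(fs.exists("/tmp"))

# `_wrapped_fs(fs)` in the diff recovers the raw pyarrow object, which is what
# pyarrow.dataset calls expect to receive via their `filesystem=` argument.
assert fs.fs is pa_filesystem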