ENH: Refactoring to support multiple Parquet readers, add PyArrow reader (#2223)

* Refactor Parquet read path to support multiple implementations. Add
PyArrow-based implementation. Disable tests that don't yet work

* Remove superfluous global

* Comment typo

* Use functional style for Parquet read functions. Add failing test for #2177

* Fix renamed module

* Code review comments

* Add pyarrow to conda dependencies

* Code reviews

* Add documentation about Parquet engines in dataframe-performance.rst
wesm authored and mrocklin committed Apr 28, 2017
1 parent 0298d40 commit 68f9e417924a985c1f2e2a587126833c70a2e9f4
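
A minimal usage sketch of the new ``engine`` keyword documented below (the directory path is hypothetical; both readers are optional dependencies):

    import dask.dataframe as dd

    # engine='auto' (the default) prefers fastparquet when both libraries are installed
    df = dd.read_parquet('/data/events.parquet')

    # explicitly select the new PyArrow-based reader
    df = dd.read_parquet('/data/events.parquet', engine='arrow')
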
@@ -44,6 +44,7 @@ conda install -q -c conda-forge \
ipython \
partd \
psutil \
+ pyarrow \
pytables \
pytest \
scikit-image \
@@ -22,53 +22,27 @@
fastparquet = False
default_encoding = None
+try:
+ import pyarrow.parquet as pyarrow_parquet
+except ImportError:
+ pyarrow_parquet = False
-def read_parquet(path, columns=None, filters=None, categories=None, index=None,
- storage_options=None):
- """
- Read ParquetFile into a Dask DataFrame
- This reads a directory of Parquet data into a Dask.dataframe, one file per
- partition. It selects the index among the sorted columns if any exist.
+def _meta_from_dtypes(to_read_columns, file_columns, file_dtypes):
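+    # Build an empty "meta" DataFrame carrying the file's column dtypes,
+    # restricted to the columns being read.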
+ meta = pd.DataFrame({c: pd.Series([], dtype=d)
+ for (c, d) in file_dtypes.items()},
+ columns=[c for c in file_columns
+ if c in file_dtypes])
+ return meta[list(to_read_columns)]
- This uses the fastparquet project: http://fastparquet.readthedocs.io/en/latest
+# ----------------------------------------------------------------------
+# Fastparquet interface
- Parameters
- ----------
- path : string
- Source directory for data. May be a glob string.
- Prepend with protocol like ``s3://`` or ``hdfs://`` for remote data.
- columns: list or None
- List of column names to load
- filters: list
- List of filters to apply, like ``[('x', '>' 0), ...]``
- index: string or None (default) or False
- Name of index column to use if that column is sorted;
- False to force dask to not use any column as the index
- categories: list, dict or None
- For any fields listed here, if the parquet encoding is Dictionary,
- the column will be created with dtype category. Use only if it is
- guaranteed that the column is encoded as dictionary in all row-groups.
- If a list, assumes up to 2**16-1 labels; if a dict, specify the number
- of labels expected; if None, will load categories automatically for
- data written by dask/fastparquet, not otherwise.
- storage_options : dict
- Key/value pairs to be passed on to the file-system backend, if any.
- Examples
- --------
- >>> df = read_parquet('s3://bucket/my-parquet-data') # doctest: +SKIP
-
- See Also
- --------
- to_parquet
- """
- if fastparquet is False:
- raise ImportError("fastparquet not installed")
+def _read_fastparquet(fs, paths, myopen, columns=None, filters=None,
+ categories=None, index=None, storage_options=None):
if filters is None:
filters = []
- fs, paths, myopen = get_fs_paths_myopen(path, None, 'rb',
- **(storage_options or {}))
if isinstance(columns, list):
columns = tuple(columns)
@@ -125,10 +99,7 @@ def read_parquet(path, columns=None, filters=None, categories=None, index=None,
categories = pf.categories
dtypes = pf._dtypes(categories)
- meta = pd.DataFrame({c: pd.Series([], dtype=d)
- for (c, d) in dtypes.items()},
- columns=[c for c in pf.columns if c in dtypes])
- meta = meta[list(all_columns)]
+ meta = _meta_from_dtypes(all_columns, pf.columns, dtypes)
for cat in categories:
meta[cat] = pd.Series(pd.Categorical([],
@@ -174,11 +145,155 @@ def _read_parquet_row_group(open, fn, index, columns, rg, series, categories,
return df
+# ----------------------------------------------------------------------
+# PyArrow interface
+
+def _read_pyarrow(fs, paths, file_opener, columns=None, filters=None,
+ categories=None, index=None):
+ api = pyarrow_parquet
+
+ if filters is not None:
+        raise NotImplementedError("Predicate pushdown not implemented")
+
+ if categories is not None:
+        raise NotImplementedError("Categorical reads not yet implemented")
+
+ if isinstance(columns, tuple):
+ columns = list(columns)
+
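+    # ParquetDataset discovers the Parquet files under the given paths and
+    # exposes a single unified schema for them.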
+ dataset = api.ParquetDataset(paths)
+ schema = dataset.schema.to_arrow_schema()
+ task_name = 'read-parquet-' + tokenize(dataset, columns)
+
+ if columns is None:
+ all_columns = schema.names
+ else:
+ all_columns = columns
+
+ if not isinstance(all_columns, list):
+ out_type = Series
+ all_columns = [all_columns]
+ else:
+ out_type = DataFrame
+
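+    # One output partition per dataset piece; index divisions are unknown,
+    # hence all None.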
+ divisions = (None,) * (len(dataset.pieces) + 1)
+
+ dtypes = _get_pyarrow_dtypes(schema)
+
+ meta = _meta_from_dtypes(all_columns, schema.names, dtypes)
+
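+    # One graph task per Parquet piece; each task opens its file and reads
+    # only the requested columns.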
+ task_plan = {
+ (task_name, i): (_read_arrow_parquet_piece,
+ file_opener,
+ piece, all_columns,
+ out_type == Series,
+ dataset.partitions)
+ for i, piece in enumerate(dataset.pieces)
+ }
+
+ return out_type(task_plan, task_name, meta, divisions)
+
+
+def _get_pyarrow_dtypes(schema):
+ dtypes = {}
+ for i in range(len(schema)):
+ field = schema[i]
+ numpy_dtype = field.type.to_pandas_dtype()
+ dtypes[field.name] = numpy_dtype
+
+ return dtypes
+
+
+def _read_arrow_parquet_piece(open_file_func, piece, columns, is_series,
+ partitions):
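+    # Read a single piece (file) into pandas; return a Series when only one
+    # column was requested.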
+ with open_file_func(piece.path, mode='rb') as f:
+ table = piece.read(columns=columns, partitions=partitions,
+ file=f)
+ df = table.to_pandas()
+
+ if is_series:
+ return df[df.columns[0]]
+ else:
+ return df
+
+
+# ----------------------------------------------------------------------
+# User read API
+
+def read_parquet(path, columns=None, filters=None, categories=None, index=None,
+ storage_options=None, engine='auto'):
+ """
+ Read ParquetFile into a Dask DataFrame
+
+ This reads a directory of Parquet data into a Dask.dataframe, one file per
+ partition. It selects the index among the sorted columns if any exist.
+
+ Parameters
+ ----------
+ path : string
+ Source directory for data. May be a glob string.
+ Prepend with protocol like ``s3://`` or ``hdfs://`` for remote data.
+ columns: list or None
+ List of column names to load
+ filters: list
+ List of filters to apply, like ``[('x', '>' 0), ...]``
+ index: string or None (default) or False
+ Name of index column to use if that column is sorted;
+ False to force dask to not use any column as the index
+ categories: list, dict or None
+ For any fields listed here, if the parquet encoding is Dictionary,
+ the column will be created with dtype category. Use only if it is
+ guaranteed that the column is encoded as dictionary in all row-groups.
+ If a list, assumes up to 2**16-1 labels; if a dict, specify the number
+ of labels expected; if None, will load categories automatically for
+ data written by dask/fastparquet, not otherwise.
+ storage_options : dict
+ Key/value pairs to be passed on to the file-system backend, if any.
+ engine : {'auto', 'fastparquet', 'arrow'}, default 'auto'
+ Parquet reader library to use. If only one library is installed, it
+ will use that one; if both, it will use 'fastparquet'
+
+ Examples
+ --------
+ >>> df = read_parquet('s3://bucket/my-parquet-data') # doctest: +SKIP
+
+ See Also
+ --------
+ to_parquet
+ """
+ fs, paths, file_opener = get_fs_paths_myopen(path, None, 'rb',
+ **(storage_options or {}))
+
+ if engine == 'auto':
+ if fastparquet:
+ engine = 'fastparquet'
+ elif pyarrow_parquet:
+ engine = 'arrow'
+ else:
+ raise ImportError("Please install either fastparquet or pyarrow")
+ elif engine == 'fastparquet':
+ if not fastparquet:
+ raise ImportError("fastparquet not installed")
+ elif engine == 'arrow':
+ if not pyarrow_parquet:
+ raise ImportError("pyarrow not installed")
+ else:
+ raise ValueError('Unsupported engine type: {0}'.format(engine))
+
+ if engine == 'fastparquet':
+ return _read_fastparquet(fs, paths, file_opener, columns=columns,
+ filters=filters,
+ categories=categories, index=index)
+ else:
+ return _read_pyarrow(fs, paths, file_opener, columns=columns,
+ filters=filters,
+ categories=categories, index=index)
+
+
def to_parquet(path, df, compression=None, write_index=None, has_nulls=None,
fixed_text=None, object_encoding=None, storage_options=None,
append=False, ignore_divisions=False):
- """
- Store Dask.dataframe to Parquet files
+ """Store Dask.dataframe to Parquet files
Notes
-----
@@ -212,14 +327,15 @@ def to_parquet(path, df, compression=None, write_index=None, has_nulls=None,
storage_options : dict
Key/value pairs to be passed on to the file-system backend, if any.
append: bool (False)
- If False, construct data-set from scratch; if True, add new row-group(s)
- to existing data-set. In the latter case, the data-set must exist,
- and the schema must match the input data.
+ If False, construct data-set from scratch; if True, add new
+ row-group(s) to existing data-set. In the latter case, the data-set
+ must exist, and the schema must match the input data.
ignore_divisions: bool (False)
If False raises error when previous divisions overlap with the new
appended divisions. Ignored if append=False.
- This uses the fastparquet project: http://fastparquet.readthedocs.io/en/latest
+ This uses the fastparquet project:
+ http://fastparquet.readthedocs.io/en/latest
Examples
--------
@@ -229,9 +345,9 @@ def to_parquet(path, df, compression=None, write_index=None, has_nulls=None,
See Also
--------
read_parquet: Read parquet data to dask.dataframe
+
"""
- if fastparquet is False:
- raise ImportError("fastparquet not installed")
+ import fastparquet
fs, paths, myopen = get_fs_paths_myopen(path, None, 'wb',
**(storage_options or {}))
@@ -322,5 +438,11 @@ def normalize_ParquetFile(pf):
return (type(pf), pf.fn, pf.sep) + normalize_token(pf.open)
+if pyarrow_parquet:
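+    # Register with dask's tokenize so ParquetDataset objects hash
+    # deterministically when building task names.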
+ @partial(normalize_token.register, pyarrow_parquet.ParquetDataset)
+ def normalize_PyArrowParquetDataset(ds):
+ return (type(ds), ds.paths)
+
+
if PY3:
DataFrame.to_parquet.__doc__ = to_parquet.__doc__