ARROW-8220: [Python] Make dataset FileFormat objects serializable
Also did some refactoring for a more pleasant user API.

Closes #6720 from kszucs/ARROW-8220

Authored-by: Krisztián Szűcs <szucs.krisztian@gmail.com>
Signed-off-by: Krisztián Szűcs <szucs.krisztian@gmail.com>
kszucs committed Mar 30, 2020
1 parent 5bf36bd commit 6be085f
Showing 4 changed files with 135 additions and 42 deletions.
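
In practice, the reworked API reads like this (a minimal usage sketch inferred from the diff below; `ds` is the alias for `pyarrow.dataset` used throughout the test suite):

    import pickle
    import pyarrow.dataset as ds

    # Options may be passed as a ParquetReadOptions instance or a plain dict.
    opts = ds.ParquetReadOptions(use_buffered_stream=True, buffer_size=4096,
                                 dictionary_columns={'str'})
    fmt = ds.ParquetFileFormat(read_options=opts)

    # FileFormat objects now compare by value and survive a pickle round trip.
    assert fmt == ds.ParquetFileFormat(read_options={
        'use_buffered_stream': True,
        'buffer_size': 4096,
        'dictionary_columns': {'str'},
    })
    assert pickle.loads(pickle.dumps(fmt)) == fmt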
3 changes: 3 additions & 0 deletions cpp/src/arrow/dataset/file_parquet.h
@@ -52,6 +52,9 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {

   bool splittable() const override { return true; }

+  // Note: the default values are exposed in the python bindings and documented
+  // in the docstrings, if any of the default values gets changed please
+  // update there as well.
   struct ReaderOptions {
     /// \defgroup parquet-file-format-reader-properties properties which correspond to
     /// members of parquet::ReaderProperties.
127 changes: 86 additions & 41 deletions python/pyarrow/_dataset.pyx
@@ -399,6 +399,12 @@ cdef class FileFormat:
                 partition_expression.unwrap()))
         return Fragment.wrap(<shared_ptr[CFragment]> move(c_fragment))

+    def __eq__(self, other):
+        try:
+            return self.equals(other)
+        except TypeError:
+            return False
+

 cdef class Fragment:
     """Fragment of data from a Dataset."""
@@ -591,62 +597,95 @@ cdef class ParquetFileFragment(FileFragment):
             yield Fragment.wrap(c_fragment)


-cdef class ParquetFileFormatReaderOptions:
-    cdef:
-        CParquetFileFormatReaderOptions* options
-
-    def __init__(self, ParquetFileFormat fmt):
-        self.options = &fmt.parquet_format.reader_options
-
-    @property
-    def use_buffered_stream(self):
-        """Read files through buffered input streams rather than
-        loading entire row groups at once. This may be enabled to
-        reduce memory overhead. Disabled by default."""
-        return self.options.use_buffered_stream
-
-    @use_buffered_stream.setter
-    def use_buffered_stream(self, bint value):
-        self.options.use_buffered_stream = value
-
-    @property
-    def buffer_size(self):
-        """Size of buffered stream, if enabled. Default is 8KB."""
-        return self.options.buffer_size
-
-    @buffer_size.setter
-    def buffer_size(self, int value):
-        self.options.buffer_size = value
-
-    @property
-    def dict_columns(self):
-        """Names of columns which should be read as dictionaries."""
-        return self.options.dict_columns
-
-    @dict_columns.setter
-    def dict_columns(self, values):
-        self.options.dict_columns.clear()
-        for value in set(values):
-            self.options.dict_columns.insert(tobytes(value))
+cdef class ParquetReadOptions:
+    """
+    Parquet format specific options for reading.
+
+    Parameters
+    ----------
+    use_buffered_stream : bool, default False
+        Read files through buffered input streams rather than loading entire
+        row groups at once. This may be enabled to reduce memory overhead.
+        Disabled by default.
+    buffer_size : int, default 8192
+        Size of buffered stream, if enabled. Default is 8KB.
+    dictionary_columns : list of string, default None
+        Names of columns which should be read as dictionaries.
+    """
+
+    cdef public:
+        bint use_buffered_stream
+        uint32_t buffer_size
+        set dictionary_columns
+
+    def __init__(self, bint use_buffered_stream=False,
+                 uint32_t buffer_size=8192,
+                 dictionary_columns=None):
+        self.use_buffered_stream = use_buffered_stream
+        self.buffer_size = buffer_size
+        self.dictionary_columns = set(dictionary_columns or set())
+
+    def equals(self, ParquetReadOptions other):
+        return (
+            self.use_buffered_stream == other.use_buffered_stream and
+            self.buffer_size == other.buffer_size and
+            self.dictionary_columns == other.dictionary_columns
+        )
+
+    def __eq__(self, other):
+        try:
+            return self.equals(other)
+        except TypeError:
+            return False


 cdef class ParquetFileFormat(FileFormat):

     cdef:
         CParquetFileFormat* parquet_format

-    def __init__(self, dict reader_options=dict()):
-        self.init(<shared_ptr[CFileFormat]> make_shared[CParquetFileFormat]())
-        for name, value in reader_options.items():
-            setattr(self.reader_options, name, value)
+    def __init__(self, read_options=None):
+        cdef:
+            shared_ptr[CParquetFileFormat] wrapped
+            CParquetFileFormatReaderOptions* options
+
+        if read_options is None:
+            read_options = ParquetReadOptions()
+        elif isinstance(read_options, dict):
+            read_options = ParquetReadOptions(**read_options)
+        elif not isinstance(read_options, ParquetReadOptions):
+            raise TypeError('`read_options` must be either a dictionary or an '
+                            'instance of ParquetReadOptions')
+
+        wrapped = make_shared[CParquetFileFormat]()
+        options = &(wrapped.get().reader_options)
+        options.use_buffered_stream = read_options.use_buffered_stream
+        options.buffer_size = read_options.buffer_size
+        if read_options.dictionary_columns is not None:
+            for column in read_options.dictionary_columns:
+                options.dict_columns.insert(tobytes(column))
+
+        self.init(<shared_ptr[CFileFormat]> wrapped)

     cdef void init(self, const shared_ptr[CFileFormat]& sp):
         FileFormat.init(self, sp)
-        self.parquet_format = <CParquetFileFormat*> self.wrapped.get()
+        self.parquet_format = <CParquetFileFormat*> sp.get()

     @property
-    def reader_options(self):
-        return ParquetFileFormatReaderOptions(self)
+    def read_options(self):
+        cdef CParquetFileFormatReaderOptions* options
+        options = &self.parquet_format.reader_options
+        return ParquetReadOptions(
+            use_buffered_stream=options.use_buffered_stream,
+            buffer_size=options.buffer_size,
+            dictionary_columns={frombytes(col) for col in options.dict_columns}
+        )
+
+    def equals(self, ParquetFileFormat other):
+        return self.read_options.equals(other.read_options)
+
+    def __reduce__(self):
+        return ParquetFileFormat, (self.read_options,)

     def make_fragment(self, str path not None, FileSystem filesystem not None,
                       Schema schema=None, columns=None, filter=None,
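
Serialization itself goes through the standard __reduce__ protocol: a ParquetFileFormat pickles as its class plus the reconstructed read_options, so unpickling is just another constructor call. A small sketch, assuming the imports from the first example above:

    fmt = ds.ParquetFileFormat(read_options={'buffer_size': 4096})
    cls, args = fmt.__reduce__()   # (ParquetFileFormat, (<ParquetReadOptions>,))
    assert cls(*args) == fmt       # what pickle.loads(pickle.dumps(fmt)) does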
@@ -685,6 +724,12 @@ cdef class IpcFileFormat(FileFormat):
     def __init__(self):
         self.init(shared_ptr[CFileFormat](new CIpcFileFormat()))

+    def equals(self, IpcFileFormat other):
+        return True
+
+    def __reduce__(self):
+        return IpcFileFormat, tuple()
+

 cdef class Partitioning:

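
IpcFileFormat carries no options, so any two instances are interchangeable: equals() is unconditionally True and __reduce__ returns an empty argument tuple, meaning pickle rebuilds it with a bare constructor call (again assuming the imports from the first example):

    assert pickle.loads(pickle.dumps(ds.IpcFileFormat())) == ds.IpcFileFormat()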
1 change: 1 addition & 0 deletions python/pyarrow/dataset.py
@@ -44,6 +44,7 @@
     OrExpression,
     ParquetFileFormat,
     ParquetFileFragment,
+    ParquetReadOptions,
     Partitioning,
     PartitioningFactory,
     ScalarExpression,
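
With this re-export in place, the class is importable directly from the dataset namespace:

    from pyarrow.dataset import ParquetReadOptions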
46 changes: 45 additions & 1 deletion python/pyarrow/tests/test_dataset.py
@@ -486,6 +486,48 @@ def test_expression_ergonomics():
         field | [1]


+def test_parquet_read_options():
+    opts1 = ds.ParquetReadOptions()
+    opts2 = ds.ParquetReadOptions(buffer_size=4096,
+                                  dictionary_columns=['a', 'b'])
+    opts3 = ds.ParquetReadOptions(buffer_size=2**13, use_buffered_stream=True,
+                                  dictionary_columns={'a', 'b'})
+
+    assert opts1.use_buffered_stream is False
+    assert opts1.buffer_size == 2**13
+    assert opts1.dictionary_columns == set()
+
+    assert opts2.use_buffered_stream is False
+    assert opts2.buffer_size == 2**12
+    assert opts2.dictionary_columns == {'a', 'b'}
+
+    assert opts3.use_buffered_stream is True
+    assert opts3.buffer_size == 2**13
+    assert opts3.dictionary_columns == {'a', 'b'}
+
+    assert opts1 == opts1
+    assert opts1 != opts2
+    assert opts2 != opts3
+
+
+def test_file_format_pickling():
+    formats = [
+        ds.IpcFileFormat(),
+        ds.ParquetFileFormat(),
+        ds.ParquetFileFormat(
+            read_options=ds.ParquetReadOptions(use_buffered_stream=True)
+        ),
+        ds.ParquetFileFormat(
+            read_options={
+                'use_buffered_stream': True,
+                'buffer_size': 4096,
+            }
+        )
+    ]
+    for file_format in formats:
+        assert pickle.loads(pickle.dumps(file_format)) == file_format
+
+
 @pytest.mark.parametrize('paths_or_selector', [
     fs.FileSelector('subdir', recursive=True),
     [

@@ -499,7 +541,9 @@ def test_expression_ergonomics():
     ]
 ])
 def test_filesystem_factory(mockfs, paths_or_selector):
-    format = ds.ParquetFileFormat(reader_options=dict(dict_columns={"str"}))
+    format = ds.ParquetFileFormat(
+        read_options=ds.ParquetReadOptions(dictionary_columns={"str"})
+    )

     options = ds.FileSystemFactoryOptions('subdir')
     options.partitioning = ds.DirectoryPartitioning(
