ARROW-7839: [Python][Dataset] Expose IPC format in python bindings
Closes #6409 from jorisvandenbossche/ARROW-7839-dataset-ipc and squashes the following commits:

3ceffe5 <Joris Van den Bossche> ARROW-7839: [Python][Dataset] Expose IPC format in python bindings

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Krisztián Szűcs <szucs.krisztian@gmail.com>
jorisvandenbossche authored and kszucs committed Feb 18, 2020
1 parent d014bc6 commit 01190ab
Showing 4 changed files with 36 additions and 7 deletions.
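
In short: after this commit the Dataset API can read Arrow IPC files, not just Parquet. A minimal sketch of what the change enables, modeled on the new test added below (the file name is illustrative):

    import pyarrow as pa
    import pyarrow.dataset as ds

    # Write a one-batch Arrow IPC file, then open it through the dataset layer.
    table = pa.table({'a': pa.array([1, 2, 3], type="int8")})
    with pa.output_stream('example.arrow') as sink:
        writer = pa.RecordBatchFileWriter(sink, table.schema)
        writer.write_batch(table.to_batches()[0])
        writer.close()

    # The explicit format object and the "ipc" string shortcut are equivalent.
    dataset = ds.dataset('example.arrow', format=ds.IpcFileFormat())
    assert dataset.to_table().equals(table)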
12 changes: 10 additions & 2 deletions python/pyarrow/_dataset.pyx
@@ -61,6 +61,8 @@ cdef class FileFormat:

         typ = frombytes(sp.get().type_name())
         if typ == 'parquet':
             self = ParquetFileFormat.__new__(ParquetFileFormat)
+        elif typ == 'ipc':
+            self = IpcFileFormat.__new__(IpcFileFormat)
         else:
             raise TypeError(typ)

@@ -77,6 +79,12 @@ cdef class ParquetFileFormat(FileFormat):
         self.init(shared_ptr[CFileFormat](new CParquetFileFormat()))


+cdef class IpcFileFormat(FileFormat):
+
+    def __init__(self):
+        self.init(shared_ptr[CFileFormat](new CIpcFileFormat()))
+
+
 cdef class Partitioning:

     cdef:
@@ -479,7 +487,7 @@ cdef class FileSystemSourceFactory(SourceFactory):
     paths_or_selector: pyarrow.fs.Selector or list of path-likes
         Either a Selector object or a list of path-like objects.
     format : FileFormat
-        Currently only ParquetFileFormat is supported.
+        Currently only ParquetFileFormat and IpcFileFormat are supported.
     options : FileSystemFactoryOptions, optional
         Various flags influencing the discovery of filesystem paths.
     """
Expand Down Expand Up @@ -629,7 +637,7 @@ cdef class FileSystemSource(Source):
The top-level partition of the DataSource.
file_format : FileFormat
File format to create fragments from, currently only
ParquetFileFormat is supported.
ParquetFileFormat and IpcFileFormat are supported.
filesystem : FileSystem
The filesystem which files are from.
paths_or_selector : Union[FileSelector, List[FileStats]]
Expand Down
3 changes: 3 additions & 0 deletions python/pyarrow/dataset.py
@@ -36,6 +36,7 @@
     FileSystemFactoryOptions,
     HivePartitioning,
     InExpression,
+    IpcFileFormat,
     IsValidExpression,
     NotExpression,
     OrExpression,
@@ -210,6 +211,8 @@ def _ensure_format(obj):
     if isinstance(obj, FileFormat):
         return obj
     elif obj == "parquet":
         return ParquetFileFormat()
+    elif obj == "ipc":
+        return IpcFileFormat()
     else:
         raise ValueError("format '{}' is not supported".format(obj))

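The _ensure_format helper above is also where unsupported format strings are rejected. A quick illustration of the failure mode; "csv" is just an example of a string that is not recognized at this point:

    import pyarrow.dataset as ds

    # Anything other than a FileFormat instance, "parquet", or "ipc" raises:
    ds.dataset('example.arrow', format="csv")
    # ValueError: format 'csv' is not supported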
9 changes: 4 additions & 5 deletions python/pyarrow/includes/libarrow_dataset.pxd
@@ -260,11 +260,6 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
         shared_ptr[CFileFormat] format()
         shared_ptr[CScanOptions] scan_options()

-    cdef cppclass CParquetFragment "arrow::dataset::ParquetFragment"(
-            CFileFragment):
-        CParquetFragment(const CFileSource& source,
-                         shared_ptr[CScanOptions] options)
-
     cdef cppclass CFileSystemSource \
             "arrow::dataset::FileSystemSource"(CSource):
         @staticmethod
@@ -297,6 +292,10 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
                          shared_ptr[CScanOptions] options)
         c_bool splittable()

+    cdef cppclass CIpcFileFormat "arrow::dataset::IpcFileFormat"(
+            CFileFormat):
+        pass
+
     cdef cppclass CPartitioning "arrow::dataset::Partitioning":
         c_string type_name() const
         CResult[shared_ptr[CExpression]] Parse(const c_string& path) const
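
An aside for readers less used to Cython: the quoted string in the declaration above (e.g. "arrow::dataset::IpcFileFormat") tells Cython the real C++ name behind the C-level alias; the extern block only declares the class, it generates no wrapper code. A self-contained sketch of the same pattern, with hypothetical names:

    # hypothetical .pxd: expose an existing C++ class to Cython under an alias
    cdef extern from "some_lib/format.h" namespace "some_lib" nogil:
        cdef cppclass CSomeFormat "some_lib::SomeFormat":
            pass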
19 changes: 19 additions & 0 deletions python/pyarrow/tests/test_dataset.py
@@ -767,3 +767,22 @@ def test_multiple_sources_with_selectors(multisourcefs):
         ('year', pa.int32())
     ])
     assert dataset.schema.equals(expected_schema, check_metadata=False)
+
+
+def test_ipc_format(tempdir):
+    table = pa.table({'a': pa.array([1, 2, 3], type="int8"),
+                      'b': pa.array([.1, .2, .3], type="float64")})
+
+    path = str(tempdir / 'test.arrow')
+    with pa.output_stream(path) as sink:
+        writer = pa.RecordBatchFileWriter(sink, table.schema)
+        writer.write_batch(table.to_batches()[0])
+        writer.close()
+
+    dataset = ds.dataset(path, format=ds.IpcFileFormat())
+    result = dataset.to_table()
+    assert result.equals(table)
+
+    dataset = ds.dataset(path, format="ipc")
+    result = dataset.to_table()
+    assert result.equals(table)

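As a cross-check outside the dataset layer, a file written this way also reads back with the long-standing IPC file reader. A sketch, assuming the same file as in the example near the top:

    import pyarrow as pa

    # Read the Arrow IPC file directly; the result should match dataset.to_table().
    table = pa.ipc.open_file('example.arrow').read_all()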