Skip to content

Commit

Permalink
ARROW-8290: [Python] Improve FileSystemDataset constructor
Browse files Browse the repository at this point in the history
* Handle the required `schema`, `format` and `filesystem` keyword validation manually, instead of letting cython handle it (which gives very cryptic error messages, like complaining about positional arguments if you made a typo in one of the keyword names)
* Move the paths to the first argument, move `root_partition` to the end
* Make `partitions` and `root_partition` optional
* Rename `file_format` to `format`, since this name is what we are using elsewhere (e.g. in `ds.dataset(...)`)

Closes #6913 from jorisvandenbossche/ARROW-8290-FileSystemDataset

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Krisztián Szűcs <szucs.krisztian@gmail.com>
  • Loading branch information
jorisvandenbossche authored and kszucs committed Apr 13, 2020
1 parent 32de9fa commit 52a3126
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 18 deletions.
48 changes: 33 additions & 15 deletions python/pyarrow/_dataset.pyx
Expand Up @@ -276,55 +276,73 @@ cdef class FileSystemDataset(Dataset):
Parameters
----------
paths_or_selector : Union[FileSelector, List[FileInfo]]
List of files/directories to consume.
schema : Schema
The top-level schema of the Dataset.
format : FileFormat
File format to create fragments from, currently only
ParquetFileFormat and IpcFileFormat are supported.
filesystem : FileSystem
The filesystem which files are from.
partitions : List[Expression], optional
Attach additional partition information for the file paths.
root_partition : Expression, optional
The top-level partition of the Dataset.
"""

cdef:
CFileSystemDataset* filesystem_dataset

def __init__(self, paths_or_selector, schema=None, format=None,
             filesystem=None, partitions=None, root_partition=None):
    """
    Create a FileSystemDataset from file paths and a filesystem.

    See the class docstring for parameter descriptions. `schema`,
    `format` and `filesystem` are required; `partitions` and
    `root_partition` default to always-true expressions.
    """
    cdef:
        FileInfo info
        Expression expr
        vector[CFileInfo] c_file_infos
        vector[shared_ptr[CExpression]] c_partitions
        CResult[shared_ptr[CDataset]] result

    # Validate the required keyword arguments manually instead of
    # relying on cdef-typed parameters: Cython's own error for a
    # mistyped keyword name is a cryptic positional-argument message,
    # while this loop produces a clear TypeError naming the argument.
    for arg, class_, name in [
        (schema, Schema, 'schema'),
        (format, FileFormat, 'format'),
        (filesystem, FileSystem, 'filesystem')
    ]:
        if not isinstance(arg, class_):
            raise TypeError(
                "Argument '{0}' has incorrect type (expected {1}, "
                "got {2})".format(name, class_.__name__, type(arg))
            )

    # Resolve the paths/selector into concrete file infos.
    for info in filesystem.get_file_info(paths_or_selector):
        c_file_infos.push_back(info.unwrap())

    # Default: one trivially-true partition expression per file.
    if partitions is None:
        partitions = [
            ScalarExpression(True) for _ in range(c_file_infos.size())]
    for expr in partitions:
        c_partitions.push_back(expr.unwrap())

    if c_file_infos.size() != c_partitions.size():
        raise ValueError(
            'The number of files resulting from paths_or_selector '
            'must be equal to the number of partitions.'
        )

    # root_partition is optional; validate explicitly when given so the
    # error message matches the required-argument validation above.
    if root_partition is None:
        root_partition = ScalarExpression(True)
    elif not isinstance(root_partition, Expression):
        raise TypeError(
            "Argument 'root_partition' has incorrect type (expected "
            "Expression, got {0})".format(type(root_partition))
        )

    result = CFileSystemDataset.Make(
        pyarrow_unwrap_schema(schema),
        (<Expression> root_partition).unwrap(),
        (<FileFormat> format).unwrap(),
        (<FileSystem> filesystem).unwrap(),
        c_file_infos,
        c_partitions
    )
Expand Down
24 changes: 21 additions & 3 deletions python/pyarrow/tests/test_dataset.py
Expand Up @@ -191,15 +191,33 @@ def test_filesystem_dataset(mockfs):
partitions = [ds.ScalarExpression(True), ds.ScalarExpression(True)]

dataset = ds.FileSystemDataset(
schema,
schema=schema,
root_partition=None,
file_format=file_format,
format=file_format,
filesystem=mockfs,
paths_or_selector=paths,
partitions=partitions
)
assert isinstance(dataset.format, ds.ParquetFileFormat)

# the root_partition and partitions keywords have defaults
dataset = ds.FileSystemDataset(
paths, schema, format=file_format, filesystem=mockfs,
)
assert isinstance(dataset.format, ds.ParquetFileFormat)

# validation of required arguments
with pytest.raises(TypeError, match="incorrect type"):
ds.FileSystemDataset(paths, format=file_format, filesystem=mockfs)
with pytest.raises(TypeError, match="incorrect type"):
ds.FileSystemDataset(paths, schema=schema, filesystem=mockfs)
with pytest.raises(TypeError, match="incorrect type"):
ds.FileSystemDataset(paths, schema=schema, format=file_format)
# validation of root_partition
with pytest.raises(TypeError, match="incorrect type"):
ds.FileSystemDataset(paths, schema=schema, format=file_format,
filesystem=mockfs, root_partition=1)

root_partition = ds.ComparisonExpression(
ds.CompareOperator.Equal,
ds.FieldExpression('level'),
Expand All @@ -223,7 +241,7 @@ def test_filesystem_dataset(mockfs):
root_partition=root_partition,
filesystem=mockfs,
partitions=partitions,
file_format=file_format
format=file_format
)
assert dataset.partition_expression.equals(root_partition)
assert set(dataset.files) == set(paths)
Expand Down

0 comments on commit 52a3126

Please sign in to comment.