Skip to content

Commit

Permalink
ARROW-8427: [C++][Dataset] Only apply ignore_prefixes to selector res…
Browse files Browse the repository at this point in the history
…ults

Closes #6915 from bkietz/8427-Do-not-ignore-file-paths-

Lead-authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: François Saint-Jacques <fsaintjacques@gmail.com>
  • Loading branch information
2 people authored and fsaintjacques committed Apr 14, 2020
1 parent 0c214cb commit 1ab8997
Show file tree
Hide file tree
Showing 8 changed files with 123 additions and 75 deletions.
2 changes: 1 addition & 1 deletion cpp/src/arrow/dataset/dataset_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ TEST_F(TestEndToEnd, EndToEndSingleDataset) {
// FileSystemFactoryOptions configuration class. See the docstring for more
// information.
FileSystemFactoryOptions options;
options.ignore_prefixes = {"."};
options.selector_ignore_prefixes = {"."};

// Partitions expressions can be discovered for Dataset and Fragments.
// This metadata is then used in conjuction with the query filter to apply
Expand Down
78 changes: 33 additions & 45 deletions cpp/src/arrow/dataset/discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,43 +115,11 @@ bool StartsWithAnyOf(const std::vector<std::string>& prefixes, const std::string
return false;
}

auto dir_base = fs::internal::GetAbstractPathParent(path);
util::string_view basename{dir_base.second};
auto basename = fs::internal::GetAbstractPathParent(path).second;

auto matches_prefix = [&basename](const std::string& prefix) -> bool {
return !prefix.empty() && basename.starts_with(prefix);
};

return std::any_of(prefixes.cbegin(), prefixes.cend(), matches_prefix);
}

Result<fs::PathForest> FileSystemDatasetFactory::Filter(
const std::shared_ptr<fs::FileSystem>& filesystem,
const std::shared_ptr<FileFormat>& format, const FileSystemFactoryOptions& options,
fs::PathForest forest) {
std::vector<fs::FileInfo> out;

auto& infos = forest.infos();
RETURN_NOT_OK(forest.Visit([&](fs::PathForest::Ref ref) -> fs::PathForest::MaybePrune {
const auto& path = ref.info().path();

if (StartsWithAnyOf(options.ignore_prefixes, path)) {
return fs::PathForest::Prune;
}

if (ref.info().IsFile() && options.exclude_invalid_files) {
ARROW_ASSIGN_OR_RAISE(auto supported,
format->IsSupported(FileSource(path, filesystem.get())));
if (!supported) {
return fs::PathForest::Continue;
}
}

out.push_back(std::move(infos[ref.i]));
return fs::PathForest::Continue;
}));

return fs::PathForest::MakeFromPreSorted(std::move(out));
return std::any_of(prefixes.cbegin(), prefixes.cend(), [&](util::string_view prefix) {
return util::string_view(basename).starts_with(prefix);
});
}

Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
Expand All @@ -176,32 +144,52 @@ Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
}));

files = std::move(forest).infos();
std::move(missing.begin(), missing.end(), std::back_inserter(files));
files.resize(files.size() + missing.size());
std::move(missing.begin(), missing.end(), files.end() - missing.size());

ARROW_ASSIGN_OR_RAISE(forest, fs::PathForest::Make(std::move(files)));

ARROW_ASSIGN_OR_RAISE(forest, Filter(filesystem, format, options, std::move(forest)));

return std::shared_ptr<DatasetFactory>(new FileSystemDatasetFactory(
std::move(filesystem), std::move(forest), std::move(format), std::move(options)));
}

Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
std::shared_ptr<fs::FileSystem> filesystem, fs::FileSelector selector,
std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options) {
ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetFileInfo(selector));

ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::Make(std::move(files)));

ARROW_ASSIGN_OR_RAISE(forest, Filter(filesystem, format, options, std::move(forest)));

// By automatically setting the options base_dir to the selector's base_dir,
// we provide a better experience for user providing Partitioning that are
// relative to the base_dir instead of the full path.
if (options.partition_base_dir.empty() && !selector.base_dir.empty()) {
options.partition_base_dir = selector.base_dir;
}

ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetFileInfo(selector));
ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::Make(std::move(files)));

std::vector<fs::FileInfo> filtered_files;

RETURN_NOT_OK(forest.Visit([&](fs::PathForest::Ref ref) -> fs::PathForest::MaybePrune {
const auto& path = ref.info().path();

if (StartsWithAnyOf(options.selector_ignore_prefixes, path)) {
return fs::PathForest::Prune;
}

if (ref.info().IsFile() && options.exclude_invalid_files) {
ARROW_ASSIGN_OR_RAISE(auto supported,
format->IsSupported(FileSource(path, filesystem.get())));
if (!supported) {
return fs::PathForest::Continue;
}
}

filtered_files.push_back(std::move(forest.infos()[ref.i]));
return fs::PathForest::Continue;
}));

ARROW_ASSIGN_OR_RAISE(forest,
fs::PathForest::MakeFromPreSorted(std::move(filtered_files)));

return std::shared_ptr<DatasetFactory>(new FileSystemDatasetFactory(
filesystem, std::move(forest), std::move(format), std::move(options)));
}
Expand Down
17 changes: 7 additions & 10 deletions cpp/src/arrow/dataset/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,18 @@ struct FileSystemFactoryOptions {
// (resulting in an error at scan time).
bool exclude_invalid_files = false;

// Files matching one of the following prefix will be ignored by the
// discovery process. This is matched to the basename of a path.
// When discovering from a Selector (and not from an explicit file list), ignore
// files and directories matching any of these prefixes.
//
// Example:
// ignore_prefixes = {"_", ".DS_STORE" };
// Example (with selector = "/dataset/**"):
// selector_ignore_prefixes = {"_", ".DS_STORE" };
//
// - "/dataset/data.csv" -> not ignored
// - "/dataset/_metadata" -> ignored
// - "/dataset/.DS_STORE" -> ignored
std::vector<std::string> ignore_prefixes = {
// - "/dataset/_hidden/dat" -> ignored
// - "/dataset/nested/.DS_STORE" -> ignored
std::vector<std::string> selector_ignore_prefixes = {
".",
"_",
};
Expand Down Expand Up @@ -222,11 +224,6 @@ class ARROW_DS_EXPORT FileSystemDatasetFactory : public DatasetFactory {
fs::PathForest forest, std::shared_ptr<FileFormat> format,
FileSystemFactoryOptions options);

static Result<fs::PathForest> Filter(const std::shared_ptr<fs::FileSystem>& filesystem,
const std::shared_ptr<FileFormat>& format,
const FileSystemFactoryOptions& options,
fs::PathForest forest);

Result<std::shared_ptr<Schema>> PartitionSchema();

std::shared_ptr<fs::FileSystem> fs_;
Expand Down
48 changes: 43 additions & 5 deletions cpp/src/arrow/dataset/discovery_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -229,28 +229,66 @@ TEST_F(FileSystemDatasetFactoryTest, MissingDirectories) {
}

TEST_F(FileSystemDatasetFactoryTest, OptionsIgnoredDefaultPrefixes) {
selector_.recursive = true;
MakeFactory({
fs::File("."),
fs::File("_"),
fs::File("_$folder$"),
fs::File("_$folder$/dat"),
fs::File("_SUCCESS"),
fs::File("not_ignored_by_default"),
fs::File("not_ignored_by_default_either/dat"),
});

AssertFinishWithPaths({"not_ignored_by_default"});
AssertFinishWithPaths({"not_ignored_by_default", "not_ignored_by_default_either/dat"});
}

TEST_F(FileSystemDatasetFactoryTest, OptionsIgnoredDefaultExplicitFiles) {
selector_.recursive = true;
std::vector<fs::FileInfo> ignored_by_default = {
fs::File(".ignored_by_default.parquet"),
fs::File("_ignored_by_default.csv"),
fs::File("_$folder$/ignored_by_default.arrow"),
};
MakeFileSystem(ignored_by_default);

std::vector<std::string> paths;
for (const auto& info : ignored_by_default) paths.push_back(info.path());
ASSERT_OK_AND_ASSIGN(
factory_, FileSystemDatasetFactory::Make(fs_, paths, format_, factory_options_));

AssertFinishWithPaths(paths);
}

TEST_F(FileSystemDatasetFactoryTest, OptionsIgnoredCustomPrefixes) {
factory_options_.ignore_prefixes = {"not_ignored"};
selector_.recursive = true;
factory_options_.selector_ignore_prefixes = {"not_ignored"};
MakeFactory({
fs::File("."),
fs::File("_"),
fs::File("_$folder$/dat"),
fs::File("_SUCCESS"),
fs::File("not_ignored_by_default"),
fs::File("not_ignored_by_default_either/dat"),
});

AssertFinishWithPaths({".", "_", "_$folder$/dat", "_SUCCESS"});
}

TEST_F(FileSystemDatasetFactoryTest, OptionsIgnoredNoPrefixes) {
// ignore nothing
selector_.recursive = true;
factory_options_.selector_ignore_prefixes = {};
MakeFactory({
fs::File("."),
fs::File("_"),
fs::File("_$folder$"),
fs::File("_$folder$/dat"),
fs::File("_SUCCESS"),
fs::File("not_ignored_by_default"),
fs::File("not_ignored_by_default_either/dat"),
});

AssertFinishWithPaths({".", "_", "_$folder$", "_SUCCESS"});
AssertFinishWithPaths({".", "_", "_$folder$/dat", "_SUCCESS", "not_ignored_by_default",
"not_ignored_by_default_either/dat"});
}

TEST_F(FileSystemDatasetFactoryTest, Inspect) {
Expand Down
22 changes: 11 additions & 11 deletions python/pyarrow/_dataset.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -1082,9 +1082,9 @@ cdef class FileSystemFactoryOptions:
fashion. Disabling this feature will skip the IO, but unsupported
files may be present in the Dataset (resulting in an error at scan
time).
ignore_prefixes : list, optional
Files matching one of those prefixes will be ignored by the
discovery process. This is matched to the basename of a path.
selector_ignore_prefixes : list, optional
When discovering from a Selector (and not from an explicit file list),
ignore files and directories matching any of these prefixes.
By default this is ['.', '_'].
"""

Expand All @@ -1094,13 +1094,13 @@ cdef class FileSystemFactoryOptions:
__slots__ = () # avoid mistakingly creating attributes

def __init__(self, partition_base_dir=None, exclude_invalid_files=None,
list ignore_prefixes=None):
list selector_ignore_prefixes=None):
if partition_base_dir is not None:
self.partition_base_dir = partition_base_dir
if exclude_invalid_files is not None:
self.exclude_invalid_files = exclude_invalid_files
if ignore_prefixes is not None:
self.ignore_prefixes = ignore_prefixes
if selector_ignore_prefixes is not None:
self.selector_ignore_prefixes = selector_ignore_prefixes

cdef inline CFileSystemFactoryOptions unwrap(self):
return self.options
Expand Down Expand Up @@ -1157,16 +1157,16 @@ cdef class FileSystemFactoryOptions:
self.options.exclude_invalid_files = value

@property
def ignore_prefixes(self):
def selector_ignore_prefixes(self):
"""
List of prefixes. Files matching one of those prefixes will be
ignored by the discovery process.
"""
return [frombytes(p) for p in self.options.ignore_prefixes]
return [frombytes(p) for p in self.options.selector_ignore_prefixes]

@ignore_prefixes.setter
def ignore_prefixes(self, values):
self.options.ignore_prefixes = [tobytes(v) for v in values]
@selector_ignore_prefixes.setter
def selector_ignore_prefixes(self, values):
self.options.selector_ignore_prefixes = [tobytes(v) for v in values]


cdef class FileSystemDatasetFactory(DatasetFactory):
Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/includes/libarrow_dataset.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
CPartitioningOrFactory partitioning
c_string partition_base_dir
c_bool exclude_invalid_files
vector[c_string] ignore_prefixes
vector[c_string] selector_ignore_prefixes

cdef cppclass CFileSystemDatasetFactory \
"arrow::dataset::FileSystemDatasetFactory"(
Expand Down
2 changes: 1 addition & 1 deletion python/pyarrow/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ def test_filesystem_factory(mockfs, paths_or_selector):
])
)
assert options.partition_base_dir == 'subdir'
assert options.ignore_prefixes == ['.', '_']
assert options.selector_ignore_prefixes == ['.', '_']
assert options.exclude_invalid_files is False

factory = ds.FileSystemDatasetFactory(
Expand Down
27 changes: 26 additions & 1 deletion python/pyarrow/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2488,7 +2488,13 @@ def _assert_dataset_paths(dataset, paths, use_legacy_dataset):
assert set(map(str, paths)) == {x.path for x in dataset.pieces}
else:
paths = [str(path.as_posix()) for path in paths]
assert set(paths) == set(dataset._dataset.files)
if hasattr(dataset._dataset, 'files'):
assert set(paths) == set(dataset._dataset.files)
else:
# UnionDataset
# TODO(temp hack) remove this branch once ARROW-7965 is in (which
# will change this to a FileSystemDataset)
assert dataset.read().num_rows == 50


@pytest.mark.pandas
Expand Down Expand Up @@ -2549,6 +2555,25 @@ def test_ignore_hidden_files_underscore(tempdir, use_legacy_dataset):
_assert_dataset_paths(dataset, paths, use_legacy_dataset)


@pytest.mark.pandas
@parametrize_legacy_dataset
@pytest.mark.parametrize('dir_prefix', ['_', '.'])
def test_ignore_no_private_directories_path_list(
tempdir, dir_prefix, use_legacy_dataset
):
# ARROW-8427 - don't ignore explicitly listed files if parent directory
# is a private directory
dirpath = tempdir / "{0}data".format(dir_prefix) / guid()
dirpath.mkdir(parents=True)

paths = _make_example_multifile_dataset(dirpath, nfiles=10,
file_nrows=5)

dataset = pq.ParquetDataset(paths, use_legacy_dataset=use_legacy_dataset)

_assert_dataset_paths(dataset, paths, use_legacy_dataset)


@pytest.mark.pandas
@parametrize_legacy_dataset
def test_multiindex_duplicate_values(tempdir, use_legacy_dataset):
Expand Down

0 comments on commit 1ab8997

Please sign in to comment.