Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-8318: [C++][Dataset] Construct FileSystemDataset from fragments #7073

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
145 changes: 63 additions & 82 deletions cpp/src/arrow/dataset/discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,54 +103,51 @@ Result<std::shared_ptr<Dataset>> UnionDatasetFactory::Finish(FinishOptions optio
}

// Constructor: stores every argument in the matching data member by moving it.
// NOTE(review): this text is a rendered diff without +/- markers, so two
// parameter lists and two initializer lists are interleaved below. The
// `paths` / `paths_` lines appear to be the added side and the `forest` /
// `forest_` lines the removed side -- confirm against the real file.
FileSystemDatasetFactory::FileSystemDatasetFactory(
std::shared_ptr<fs::FileSystem> filesystem, fs::PathForest forest,
std::vector<std::string> paths, std::shared_ptr<fs::FileSystem> filesystem,
std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options)
: fs_(std::move(filesystem)),
forest_(std::move(forest)),
: paths_(std::move(paths)),
fs_(std::move(filesystem)),
format_(std::move(format)),
options_(std::move(options)) {}

// Returns true when the `basename` extracted from `path` starts with at least
// one of `prefixes`; an empty prefix list never matches.
// NOTE(review): this looks like the removed side of the diff (a second
// StartsWithAnyOf with swapped parameters appears later), and its closing
// brace is not part of this span -- confirm against the real file.
bool StartsWithAnyOf(const std::vector<std::string>& prefixes, const std::string& path) {
if (prefixes.empty()) {
return false;
}

// Only the `.second` half of GetAbstractPathParent's result is examined
// (presumably the final path component -- verify against the helper).
auto basename = fs::internal::GetAbstractPathParent(path).second;

return std::any_of(prefixes.cbegin(), prefixes.cend(), [&](util::string_view prefix) {
return util::string_view(basename).starts_with(prefix);
});
// Delegates to fs::internal::RemoveAncestor, passing options_.partition_base_dir
// as the ancestor and `path` as the path, and returns that helper's result
// unchanged.
// NOTE(review): presumably the optional is empty when `path` is not located
// under the partition base directory -- confirm against RemoveAncestor.
util::optional<util::string_view> FileSystemDatasetFactory::RemovePartitionBaseDir(
util::string_view path) {
// View over the configured base directory.
const util::string_view partition_base_dir{options_.partition_base_dir};
return fs::internal::RemoveAncestor(partition_base_dir, path);
}

// Creates a FileSystemDatasetFactory from an explicit list of paths.
// When options.exclude_invalid_files is set, every path is checked with
// format->IsSupported() and paths reported as unsupported are left out of the
// resulting factory.
// NOTE(review): rendered diff without +/- markers. The PathForest / `missing`
// statements and the `filtered_paths` loop belong to different sides of the
// change and are interleaved below -- confirm against the real file.
Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
std::shared_ptr<fs::FileSystem> filesystem, const std::vector<std::string>& paths,
std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options) {
ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetFileInfo(paths));
ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::Make(std::move(files)));

std::unordered_set<fs::FileInfo, fs::FileInfo::ByPath> missing;
DCHECK_OK(forest.Visit([&](fs::PathForest::Ref ref) {
util::string_view parent_path = options.partition_base_dir;
if (auto parent = ref.parent()) {
parent_path = parent.info().path();
std::vector<std::string> filtered_paths;
for (const auto& path : paths) {
if (options.exclude_invalid_files) {
ARROW_ASSIGN_OR_RAISE(auto supported,
format->IsSupported(FileSource(path, filesystem)));
if (!supported) {
continue;
}
}

for (auto&& path :
fs::internal::AncestorsFromBasePath(parent_path, ref.info().path())) {
ARROW_ASSIGN_OR_RAISE(auto file, filesystem->GetFileInfo(std::move(path)));
missing.insert(std::move(file));
}
return Status::OK();
}));
filtered_paths.push_back(path);
}

files = std::move(forest).infos();
files.resize(files.size() + missing.size());
std::move(missing.begin(), missing.end(), files.end() - missing.size());
// The kept paths, filesystem, format and options are all moved into the new
// factory, which is returned as a DatasetFactory pointer.
return std::shared_ptr<DatasetFactory>(
new FileSystemDatasetFactory(std::move(filtered_paths), std::move(filesystem),
std::move(format), std::move(options)));
}

ARROW_ASSIGN_OR_RAISE(forest, fs::PathForest::Make(std::move(files)));
// Returns true when any component produced by SplitAbstractPath(path) starts
// with any of `prefixes`; an empty prefix list never matches.
// NOTE(review): the `return std::shared_ptr<DatasetFactory>(...)` statement in
// the middle of this span references names this helper never declares
// (`filesystem`, `forest`, `format`, `options`); it appears to be a
// removed-side diff line from a neighbouring function -- confirm.
bool StartsWithAnyOf(const std::string& path, const std::vector<std::string>& prefixes) {
if (prefixes.empty()) {
return false;
}

return std::shared_ptr<DatasetFactory>(new FileSystemDatasetFactory(
std::move(filesystem), std::move(forest), std::move(format), std::move(options)));
// Outer any_of walks the path components, inner any_of walks the prefixes.
auto parts = fs::internal::SplitAbstractPath(path);
return std::any_of(parts.cbegin(), parts.cend(), [&](util::string_view part) {
return std::any_of(prefixes.cbegin(), prefixes.cend(), [&](util::string_view prefix) {
return util::string_view(part).starts_with(prefix);
});
});
}

Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
Expand All @@ -164,50 +161,44 @@ Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
}

ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetFileInfo(selector));
ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::Make(std::move(files)));

std::vector<fs::FileInfo> filtered_files;

RETURN_NOT_OK(forest.Visit([&](fs::PathForest::Ref ref) -> fs::PathForest::MaybePrune {
const auto& path = ref.info().path();
std::vector<std::string> paths;
for (const auto& info : files) {
const auto& path = info.path();

if (StartsWithAnyOf(options.selector_ignore_prefixes, path)) {
return fs::PathForest::Prune;
if (!info.IsFile()) {
// TODO(fsaintjacques): push this filtering into Selector logic so we
// don't copy a big vector around.
continue;
}

if (ref.info().IsFile() && options.exclude_invalid_files) {
ARROW_ASSIGN_OR_RAISE(auto supported,
format->IsSupported(FileSource(path, filesystem.get())));
if (!supported) {
return fs::PathForest::Continue;
}
if (StartsWithAnyOf(path, options.selector_ignore_prefixes)) {
continue;
}

filtered_files.push_back(std::move(forest.infos()[ref.i]));
return fs::PathForest::Continue;
}));
paths.push_back(path);
}

ARROW_ASSIGN_OR_RAISE(forest,
fs::PathForest::MakeFromPreSorted(std::move(filtered_files)));
// Sorting by path guarantees a stable order, which some unit tests rely on.
std::sort(paths.begin(), paths.end());

return std::shared_ptr<DatasetFactory>(new FileSystemDatasetFactory(
filesystem, std::move(forest), std::move(format), std::move(options)));
return Make(std::move(filesystem), std::move(paths), std::move(format),
std::move(options));
}

// Returns the schema of the partitioning configured in options_.
// If options_.partitioning already holds a partitioning object, its schema is
// returned directly. Otherwise each known path is made relative to the
// partition base directory (paths for which that yields no value are skipped)
// and the partitioning factory's Inspect() is run on the collected paths.
// NOTE(review): rendered diff without +/- markers. The loop over
// `forest_.infos()` and the loop over `paths_` are the two sides of the change
// and are interleaved below, as are the two `Inspect(...)` returns -- confirm.
Result<std::shared_ptr<Schema>> FileSystemDatasetFactory::PartitionSchema() {
if (auto partitioning = options_.partitioning.partitioning()) {
return partitioning->schema();
}

std::vector<util::string_view> paths;
for (const auto& info : forest_.infos()) {
if (auto relative =
fs::internal::RemoveAncestor(options_.partition_base_dir, info.path())) {
paths.push_back(*relative);
std::vector<util::string_view> relative_paths;
for (const auto& path : paths_) {
if (auto relative = RemovePartitionBaseDir(path)) {
relative_paths.push_back(*relative);
}
}

return options_.partitioning.factory()->Inspect(paths);
return options_.partitioning.factory()->Inspect(relative_paths);
}

Result<std::vector<std::shared_ptr<Schema>>> FileSystemDatasetFactory::InspectSchemas(
Expand All @@ -216,11 +207,9 @@ Result<std::vector<std::shared_ptr<Schema>>> FileSystemDatasetFactory::InspectSc

const bool has_fragments_limit = options.fragments >= 0;
int fragments = options.fragments;
for (const auto& f : forest_.infos()) {
if (!f.IsFile()) continue;
for (const auto& path : paths_) {
if (has_fragments_limit && fragments-- == 0) break;
FileSource src(f.path(), fs_.get());
ARROW_ASSIGN_OR_RAISE(auto schema, format_->Inspect(src));
ARROW_ASSIGN_OR_RAISE(auto schema, format_->Inspect({path, fs_}));
schemas.push_back(schema);
}

Expand All @@ -246,32 +235,24 @@ Result<std::shared_ptr<Dataset>> FileSystemDatasetFactory::Finish(FinishOptions
}
}

ExpressionVector partitions(forest_.size(), scalar(true));
std::shared_ptr<Partitioning> partitioning = options_.partitioning.partitioning();
if (partitioning == nullptr) {
auto factory = options_.partitioning.factory();
ARROW_ASSIGN_OR_RAISE(partitioning, factory->Finish(schema));
}

// apply partitioning to forest to derive partitions
auto apply_partitioning = [&](fs::PathForest::Ref ref) {
if (auto relative = fs::internal::RemoveAncestor(options_.partition_base_dir,
ref.info().path())) {
auto segments = fs::internal::SplitAbstractPath(relative->to_string());

if (segments.size() > 0) {
auto segment_index = static_cast<int>(segments.size()) - 1;
auto maybe_partition = partitioning->Parse(segments.back(), segment_index);

partitions[ref.i] = std::move(maybe_partition).ValueOr(scalar(true));
}
std::vector<std::shared_ptr<FileFragment>> fragments;
for (const auto& path : paths_) {
std::shared_ptr<Expression> partition = scalar(true);
if (auto relative = RemovePartitionBaseDir(path)) {
partition = partitioning->Parse(relative->to_string()).ValueOr(scalar(true));
}
return Status::OK();
};

RETURN_NOT_OK(forest_.Visit(apply_partitioning));
return FileSystemDataset::Make(schema, root_partition_, format_, fs_, forest_,
std::move(partitions));
ARROW_ASSIGN_OR_RAISE(auto fragment, format_->MakeFragment({path, fs_}, partition));
fragments.push_back(fragment);
}

return FileSystemDataset::Make(schema, root_partition_, format_, fragments);
}

} // namespace dataset
Expand Down
10 changes: 7 additions & 3 deletions cpp/src/arrow/dataset/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,16 +220,20 @@ class ARROW_DS_EXPORT FileSystemDatasetFactory : public DatasetFactory {
Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override;

protected:
FileSystemDatasetFactory(std::shared_ptr<fs::FileSystem> filesystem,
fs::PathForest forest, std::shared_ptr<FileFormat> format,
FileSystemDatasetFactory(std::vector<std::string> paths,
std::shared_ptr<fs::FileSystem> filesystem,
std::shared_ptr<FileFormat> format,
FileSystemFactoryOptions options);

Result<std::shared_ptr<Schema>> PartitionSchema();

std::vector<std::string> paths_;
std::shared_ptr<fs::FileSystem> fs_;
fs::PathForest forest_;
std::shared_ptr<FileFormat> format_;
FileSystemFactoryOptions options_;

private:
util::optional<util::string_view> RemovePartitionBaseDir(util::string_view path);
};

} // namespace dataset
Expand Down