Skip to content

Commit

Permalink
ARROW-8318: [C++][Dataset] Construct FileSystemDataset from fragments
Browse files Browse the repository at this point in the history
* Simplified FileSystemDataset to hold a FragmentVector. Each
  Fragment must be a FileFragment and is checked at
  `FileSystemDataset::Make`.  Fragments are not required to use the same
  backing filesystem nor the same format.

* Removed `FileSystemDataset::format` and
  `FileSystemDataset::partitions`.

* Since FileInfo is no longer required by either FileSystemDataset or
  FileSystemDatasetFactory, it is now possible to create a dataset
  without any IO involved.

* Re-introduced the natural behavior of creating FileFragments with their
  full partition expressions instead of removing the ancestors' common
  partitions.

* Added `Expression::IsSatisfiableWith` method.

* Added missing compression cmake options to archery.

* Ensure FileSource holds a shared_ptr<FileSystem> pointer.
  This is required to refactor FileSystemDataset to support Buffer
  FileSource and heterogeneous FileSystems.

* Rename `type` to `id`, following other classes.
  • Loading branch information
fsaintjacques committed May 1, 2020
1 parent 17404e1 commit d2263be
Show file tree
Hide file tree
Showing 16 changed files with 285 additions and 365 deletions.
146 changes: 64 additions & 82 deletions cpp/src/arrow/dataset/discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,54 +103,51 @@ Result<std::shared_ptr<Dataset>> UnionDatasetFactory::Finish(FinishOptions optio
}

FileSystemDatasetFactory::FileSystemDatasetFactory(
std::shared_ptr<fs::FileSystem> filesystem, fs::PathForest forest,
std::vector<std::string> paths, std::shared_ptr<fs::FileSystem> filesystem,
std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options)
: fs_(std::move(filesystem)),
forest_(std::move(forest)),
: paths_(std::move(paths)),
fs_(std::move(filesystem)),
format_(std::move(format)),
options_(std::move(options)) {}

bool StartsWithAnyOf(const std::vector<std::string>& prefixes, const std::string& path) {
if (prefixes.empty()) {
return false;
}

auto basename = fs::internal::GetAbstractPathParent(path).second;

return std::any_of(prefixes.cbegin(), prefixes.cend(), [&](util::string_view prefix) {
return util::string_view(basename).starts_with(prefix);
});
util::optional<util::string_view> FileSystemDatasetFactory::BaselessPath(
util::string_view path) {
const util::string_view partition_base_dir{options_.partition_base_dir};
return fs::internal::RemoveAncestor(partition_base_dir, path);
}

Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
std::shared_ptr<fs::FileSystem> filesystem, const std::vector<std::string>& paths,
std::shared_ptr<FileFormat> format, FileSystemFactoryOptions options) {
ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetFileInfo(paths));
ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::Make(std::move(files)));

std::unordered_set<fs::FileInfo, fs::FileInfo::ByPath> missing;
DCHECK_OK(forest.Visit([&](fs::PathForest::Ref ref) {
util::string_view parent_path = options.partition_base_dir;
if (auto parent = ref.parent()) {
parent_path = parent.info().path();
std::vector<std::string> filtered_paths;
for (const auto& path : paths) {
if (options.exclude_invalid_files) {
ARROW_ASSIGN_OR_RAISE(auto supported,
format->IsSupported(FileSource(path, filesystem)));
if (!supported) {
continue;
}
}

for (auto&& path :
fs::internal::AncestorsFromBasePath(parent_path, ref.info().path())) {
ARROW_ASSIGN_OR_RAISE(auto file, filesystem->GetFileInfo(std::move(path)));
missing.insert(std::move(file));
}
return Status::OK();
}));
filtered_paths.push_back(path);
}

files = std::move(forest).infos();
files.resize(files.size() + missing.size());
std::move(missing.begin(), missing.end(), files.end() - missing.size());
return std::shared_ptr<DatasetFactory>(
new FileSystemDatasetFactory(std::move(filtered_paths), std::move(filesystem),
std::move(format), std::move(options)));
}

ARROW_ASSIGN_OR_RAISE(forest, fs::PathForest::Make(std::move(files)));
bool StartsWithAnyOf(const std::string& path, const std::vector<std::string>& prefixes) {
if (prefixes.empty()) {
return false;
}

return std::shared_ptr<DatasetFactory>(new FileSystemDatasetFactory(
std::move(filesystem), std::move(forest), std::move(format), std::move(options)));
auto parts = fs::internal::SplitAbstractPath(path);
return std::any_of(parts.cbegin(), parts.cend(), [&](util::string_view part) {
return std::any_of(prefixes.cbegin(), prefixes.cend(), [&](util::string_view prefix) {
return util::string_view(part).starts_with(prefix);
});
});
}

Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
Expand All @@ -164,50 +161,44 @@ Result<std::shared_ptr<DatasetFactory>> FileSystemDatasetFactory::Make(
}

ARROW_ASSIGN_OR_RAISE(auto files, filesystem->GetFileInfo(selector));
ARROW_ASSIGN_OR_RAISE(auto forest, fs::PathForest::Make(std::move(files)));

std::vector<fs::FileInfo> filtered_files;

RETURN_NOT_OK(forest.Visit([&](fs::PathForest::Ref ref) -> fs::PathForest::MaybePrune {
const auto& path = ref.info().path();
std::vector<std::string> paths;
for (const auto& info : files) {
const auto& path = info.path();

if (StartsWithAnyOf(options.selector_ignore_prefixes, path)) {
return fs::PathForest::Prune;
if (!info.IsFile()) {
// TODO(fsaintjacques): push this filtering into Selector logic so we
// don't copy big vector around.
continue;
}

if (ref.info().IsFile() && options.exclude_invalid_files) {
ARROW_ASSIGN_OR_RAISE(auto supported,
format->IsSupported(FileSource(path, filesystem.get())));
if (!supported) {
return fs::PathForest::Continue;
}
if (StartsWithAnyOf(path, options.selector_ignore_prefixes)) {
continue;
}

filtered_files.push_back(std::move(forest.infos()[ref.i]));
return fs::PathForest::Continue;
}));
paths.push_back(path);
}

ARROW_ASSIGN_OR_RAISE(forest,
fs::PathForest::MakeFromPreSorted(std::move(filtered_files)));
// Sorting by path guarantees a stability sometimes needed by unit tests.
std::sort(paths.begin(), paths.end());

return std::shared_ptr<DatasetFactory>(new FileSystemDatasetFactory(
filesystem, std::move(forest), std::move(format), std::move(options)));
return Make(std::move(filesystem), std::move(paths), std::move(format),
std::move(options));
}

Result<std::shared_ptr<Schema>> FileSystemDatasetFactory::PartitionSchema() {
if (auto partitioning = options_.partitioning.partitioning()) {
return partitioning->schema();
}

std::vector<util::string_view> paths;
for (const auto& info : forest_.infos()) {
if (auto relative =
fs::internal::RemoveAncestor(options_.partition_base_dir, info.path())) {
paths.push_back(*relative);
std::vector<util::string_view> relative_paths;
for (const auto& path : paths_) {
if (auto relative = BaselessPath(path)) {
relative_paths.push_back(*relative);
}
}

return options_.partitioning.factory()->Inspect(paths);
return options_.partitioning.factory()->Inspect(relative_paths);
}

Result<std::vector<std::shared_ptr<Schema>>> FileSystemDatasetFactory::InspectSchemas(
Expand All @@ -216,11 +207,9 @@ Result<std::vector<std::shared_ptr<Schema>>> FileSystemDatasetFactory::InspectSc

const bool has_fragments_limit = options.fragments >= 0;
int fragments = options.fragments;
for (const auto& f : forest_.infos()) {
if (!f.IsFile()) continue;
for (const auto& path : paths_) {
if (has_fragments_limit && fragments-- == 0) break;
FileSource src(f.path(), fs_.get());
ARROW_ASSIGN_OR_RAISE(auto schema, format_->Inspect(src));
ARROW_ASSIGN_OR_RAISE(auto schema, format_->Inspect({path, fs_}));
schemas.push_back(schema);
}

Expand All @@ -246,32 +235,25 @@ Result<std::shared_ptr<Dataset>> FileSystemDatasetFactory::Finish(FinishOptions
}
}

ExpressionVector partitions(forest_.size(), scalar(true));
std::shared_ptr<Partitioning> partitioning = options_.partitioning.partitioning();
if (partitioning == nullptr) {
auto factory = options_.partitioning.factory();
ARROW_ASSIGN_OR_RAISE(partitioning, factory->Finish(schema));
}

// apply partitioning to forest to derive partitions
auto apply_partitioning = [&](fs::PathForest::Ref ref) {
if (auto relative = fs::internal::RemoveAncestor(options_.partition_base_dir,
ref.info().path())) {
auto segments = fs::internal::SplitAbstractPath(relative->to_string());

if (segments.size() > 0) {
auto segment_index = static_cast<int>(segments.size()) - 1;
auto maybe_partition = partitioning->Parse(segments.back(), segment_index);

partitions[ref.i] = std::move(maybe_partition).ValueOr(scalar(true));
}
FragmentVector fragments;
for (const auto& path : paths_) {
std::shared_ptr<Expression> partition = scalar(true);
if (auto relative = BaselessPath(path)) {
std::string path_string{*relative};
partition = partitioning->Parse(path_string).ValueOr(scalar(true));
}
return Status::OK();
};

RETURN_NOT_OK(forest_.Visit(apply_partitioning));
return FileSystemDataset::Make(schema, root_partition_, format_, fs_, forest_,
std::move(partitions));
ARROW_ASSIGN_OR_RAISE(auto fragment, format_->MakeFragment({path, fs_}, partition));
fragments.push_back(fragment);
}

return FileSystemDataset::Make(schema, root_partition_, format_, fragments);
}

} // namespace dataset
Expand Down
10 changes: 7 additions & 3 deletions cpp/src/arrow/dataset/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,16 +220,20 @@ class ARROW_DS_EXPORT FileSystemDatasetFactory : public DatasetFactory {
Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override;

protected:
FileSystemDatasetFactory(std::shared_ptr<fs::FileSystem> filesystem,
fs::PathForest forest, std::shared_ptr<FileFormat> format,
FileSystemDatasetFactory(std::vector<std::string> paths,
std::shared_ptr<fs::FileSystem> filesystem,
std::shared_ptr<FileFormat> format,
FileSystemFactoryOptions options);

Result<std::shared_ptr<Schema>> PartitionSchema();

std::vector<std::string> paths_;
std::shared_ptr<fs::FileSystem> fs_;
fs::PathForest forest_;
std::shared_ptr<FileFormat> format_;
FileSystemFactoryOptions options_;

private:
util::optional<util::string_view> BaselessPath(util::string_view path);
};

} // namespace dataset
Expand Down
Loading

0 comments on commit d2263be

Please sign in to comment.