diff --git a/cpp/src/arrow/dataset/dataset.cc b/cpp/src/arrow/dataset/dataset.cc index 1e4c9b7f719bb..4ac43e136e135 100644 --- a/cpp/src/arrow/dataset/dataset.cc +++ b/cpp/src/arrow/dataset/dataset.cc @@ -160,6 +160,25 @@ Result Dataset::GetFragments(compute::Expression predicate) { : MakeEmptyIterator>(); } +Result Dataset::GetFragmentsAsync() { + return GetFragmentsAsync(compute::literal(true)); +} + +Result Dataset::GetFragmentsAsync(compute::Expression predicate) { + ARROW_ASSIGN_OR_RAISE( + predicate, SimplifyWithGuarantee(std::move(predicate), partition_expression_)); + return predicate.IsSatisfiable() ? GetFragmentsAsyncImpl(std::move(predicate)) + : MakeEmptyGenerator>(); +} + +// Default impl delegating the work to `GetFragmentsImpl` and wrapping it into a +// VectorGenerator +Result Dataset::GetFragmentsAsyncImpl(compute::Expression predicate) { + ARROW_ASSIGN_OR_RAISE(auto iter, GetFragmentsImpl(std::move(predicate))); + ARROW_ASSIGN_OR_RAISE(auto vec, iter.ToVector()); + return MakeVectorGenerator(std::move(vec)); +} + struct VectorRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator { explicit VectorRecordBatchGenerator(RecordBatchVector batches) : batches_(std::move(batches)) {} diff --git a/cpp/src/arrow/dataset/dataset.h b/cpp/src/arrow/dataset/dataset.h index 9f4fee52154a9..cd8a4ee587e11 100644 --- a/cpp/src/arrow/dataset/dataset.h +++ b/cpp/src/arrow/dataset/dataset.h @@ -28,6 +28,7 @@ #include "arrow/compute/exec/expression.h" #include "arrow/dataset/type_fwd.h" #include "arrow/dataset/visibility.h" +#include "arrow/util/async_generator.h" #include "arrow/util/macros.h" #include "arrow/util/mutex.h" #include "arrow/util/optional.h" @@ -134,6 +135,8 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment { /// @} +using FragmentGenerator = AsyncGenerator>; + /// \brief A container of zero or more Fragments. /// /// A Dataset acts as a union of Fragments, e.g. files deeply nested in a @@ -148,6 +151,10 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { Result GetFragments(compute::Expression predicate); Result GetFragments(); + /// \brief Async versions of `GetFragments`. + Result GetFragmentsAsync(compute::Expression predicate); + Result GetFragmentsAsync(); + const std::shared_ptr& schema() const { return schema_; } /// \brief An expression which evaluates to true for all data viewed by this Dataset. @@ -174,6 +181,7 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this { Dataset(std::shared_ptr schema, compute::Expression partition_expression); virtual Result GetFragmentsImpl(compute::Expression predicate) = 0; + virtual Result GetFragmentsAsyncImpl(compute::Expression predicate); std::shared_ptr schema_; compute::Expression partition_expression_ = compute::literal(true);