Skip to content

Commit

Permalink
ARROW-17318: [C++][Dataset] Support async streaming interface for get…
Browse files Browse the repository at this point in the history
…ting fragments in Dataset

Add `GetFragmentsAsync()` and `GetFragmentsAsyncImpl()`
functions to the generic `Dataset` interface, which
allows to produce fragments in a streamed fashion.

This is one of the prerequisites for making
`FileSystemDataset` to support lazy fragment
processing, which, in turn, can be used to start
scan operations without waiting for the entire
dataset to be discovered.

To aid the transition process of moving to async
implementation in `Dataset`/`AsyncScanner` code,
a default implementation for `GetFragmentsAsyncImpl()`
is provided (yielding a VectorGenerator over
the fragments vector, which is stored by every
implementation of Dataset interface at the moment).

Tests: unit(release)

Signed-off-by: Pavel Solodovnikov <pavel.al.solodovnikov@gmail.com>
  • Loading branch information
ManManson committed Aug 5, 2022
1 parent 6a3fb97 commit da4235e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 0 deletions.
19 changes: 19 additions & 0 deletions cpp/src/arrow/dataset/dataset.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,25 @@ Result<FragmentIterator> Dataset::GetFragments(compute::Expression predicate) {
: MakeEmptyIterator<std::shared_ptr<Fragment>>();
}

Result<FragmentGenerator> Dataset::GetFragmentsAsync() {
return GetFragmentsAsync(compute::literal(true));
}

Result<FragmentGenerator> Dataset::GetFragmentsAsync(compute::Expression predicate) {
ARROW_ASSIGN_OR_RAISE(
predicate, SimplifyWithGuarantee(std::move(predicate), partition_expression_));
return predicate.IsSatisfiable() ? GetFragmentsAsyncImpl(std::move(predicate))
: MakeEmptyGenerator<std::shared_ptr<Fragment>>();
}

// Default impl delegating the work to `GetFragmentsImpl` and wrapping it into a
// VectorGenerator
Result<FragmentGenerator> Dataset::GetFragmentsAsyncImpl(compute::Expression predicate) {
ARROW_ASSIGN_OR_RAISE(auto iter, GetFragmentsImpl(std::move(predicate)));
ARROW_ASSIGN_OR_RAISE(auto vec, iter.ToVector());
return MakeVectorGenerator(std::move(vec));
}

struct VectorRecordBatchGenerator : InMemoryDataset::RecordBatchGenerator {
explicit VectorRecordBatchGenerator(RecordBatchVector batches)
: batches_(std::move(batches)) {}
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/arrow/dataset/dataset.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "arrow/compute/exec/expression.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/macros.h"
#include "arrow/util/mutex.h"
#include "arrow/util/optional.h"
Expand Down Expand Up @@ -134,6 +135,8 @@ class ARROW_DS_EXPORT InMemoryFragment : public Fragment {

/// @}

using FragmentGenerator = AsyncGenerator<std::shared_ptr<Fragment>>;

/// \brief A container of zero or more Fragments.
///
/// A Dataset acts as a union of Fragments, e.g. files deeply nested in a
Expand All @@ -148,6 +151,10 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
Result<FragmentIterator> GetFragments(compute::Expression predicate);
Result<FragmentIterator> GetFragments();

/// \brief Async versions of `GetFragments`.
Result<FragmentGenerator> GetFragmentsAsync(compute::Expression predicate);
Result<FragmentGenerator> GetFragmentsAsync();

const std::shared_ptr<Schema>& schema() const { return schema_; }

/// \brief An expression which evaluates to true for all data viewed by this Dataset.
Expand All @@ -174,6 +181,7 @@ class ARROW_DS_EXPORT Dataset : public std::enable_shared_from_this<Dataset> {
Dataset(std::shared_ptr<Schema> schema, compute::Expression partition_expression);

virtual Result<FragmentIterator> GetFragmentsImpl(compute::Expression predicate) = 0;
virtual Result<FragmentGenerator> GetFragmentsAsyncImpl(compute::Expression predicate);

std::shared_ptr<Schema> schema_;
compute::Expression partition_expression_ = compute::literal(true);
Expand Down

0 comments on commit da4235e

Please sign in to comment.