Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-17318: [C++][Dataset] Support async streaming interface for getting fragments in Dataset #13804

Merged
merged 3 commits into from
Sep 20, 2022

Conversation

ManManson
Copy link
Contributor

Add GetFragmentsAsync() and GetFragmentsAsyncImpl()
functions to the generic Dataset interface, which
allows to produce fragments in a streamed fashion.

This is one of the prerequisites for making
FileSystemDataset to support lazy fragment
processing, which, in turn, can be used to start
scan operations without waiting for the entire
dataset to be discovered.

To aid the transition process of moving to async
implementation in Dataset/AsyncScanner code,
a default implementation for GetFragmentsAsyncImpl()
is provided (yielding a VectorGenerator over
the fragments vector, which is stored by every
implementation of Dataset interface at the moment).

Tests: unit(release)

Signed-off-by: Pavel Solodovnikov pavel.al.solodovnikov@gmail.com

@github-actions
Copy link

github-actions bot commented Aug 5, 2022

@github-actions
Copy link

github-actions bot commented Aug 5, 2022

⚠️ Ticket has no components in JIRA, make sure you assign one.

Copy link
Member

@westonpace westonpace left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few minor stylistic thoughts but this looks good to me otherwise.

cpp/src/arrow/dataset/dataset.cc Outdated Show resolved Hide resolved
cpp/src/arrow/dataset/dataset.h Outdated Show resolved Hide resolved
@ManManson
Copy link
Contributor Author

Force-pushed the branch, addressed style comments from @westonpace. Diff can be found here: https://github.com/apache/arrow/compare/fae325e7c553cc857ae9d05c757d5ff90e646260..da4235e3c08b46a8ff7a5207a93c03db75ce5515

@@ -28,6 +28,7 @@
#include "arrow/compute/exec/expression.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/util/async_generator.h"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a rather heavy include, I'd rather include it only in .cc files.

cpp/src/arrow/dataset/dataset.h Show resolved Hide resolved
// VectorGenerator
Result<FragmentGenerator> Dataset::GetFragmentsAsyncImpl(compute::Expression predicate) {
ARROW_ASSIGN_OR_RAISE(auto iter, GetFragmentsImpl(std::move(predicate)));
ARROW_ASSIGN_OR_RAISE(auto vec, iter.ToVector());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really want to make this a blocking call and exhaust the iterator here? This actually makes the default GetFragmentsAsync impl less lazy than GetFragments, surprisingly.

Perhaps we'd like to use MakeBackgroundGenerator instead?
cc @westonpace

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea. Will do.

…ator

The original `async_generator.h` is quite heavy to include,
although many users need only a tiny fraction of what is
inside the header.

Provide forward declarations header (`async_generator_fwd.h`)
for basic `AsyncGenerator` and other generator classes,
so that users do not need to include the entire
`async_generator.h`, for example, if they only need a
`AsyncGenerator` typedef.

Signed-off-by: Pavel Solodovnikov <pavel.al.solodovnikov@gmail.com>
@ManManson
Copy link
Contributor Author

Rebased and force-pushed the branch. Unfortunately, I cannot attach a link to the diff because rebasing broke it.

Changes summary:

  • Added utils/async_generator_fwd.h
  • GetFragmentsAsyncImpl() forwards GetFragmentsImpl() to MakeBackgroundGenerator() + MakeTransferredGenerator() instead of .ToVector():ing it.

@ManManson
Copy link
Contributor Author

@westonpace @pitrou Polite review ping.

Copy link
Member

@westonpace westonpace left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with this, I just have one request. Can we take in the CPU executor as a parameter? You can add an overload that defaults to ::arrow::internal::GetCpuThreadPool I'm hoping, in this release, to be able to run all scanning from an event-loop for serial mode and that requires avoiding ::arrow::internal::GetCpuThreadPool.

@ManManson
Copy link
Contributor Author

Force-pushed the branch to address review comments from @westonpace. The diff can be found here: https://github.com/apache/arrow/compare/e91396ccf22eec394f23369255a9fd65be60b274..a6c5d5075b8150cf4ecf6874ad59a0e8497af93c

Changelog:

  • Added a non-virtual GetFragmentsAsyncImplBase, which accepts a arrow::internal::Executor* which is used as a destination executor for the background generator.
  • Default implementation of virtual GetFragmentsAsyncImpl now calls GetFragmentsAsyncImplBase with the default executor internal::GetCPUThreadPool().
  • Expanded the comments section for the default impl method.

Copy link
Member

@pitrou pitrou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update; shouldn't some tests be added for this?

@@ -16,13 +16,15 @@
// under the License.

#include "arrow/dataset/dataset.h"
#include <arrow/util/thread_pool.h>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's be careful to use the same inclusion style - quotes for Arrow includes, brackets for other ones.

Copy link
Contributor Author

@ManManson ManManson Aug 23, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, thanks for noticing.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add some basic tests in the next iteration.


std::shared_ptr<Schema> schema_;
compute::Expression partition_expression_ = compute::literal(true);

private:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's private this means it will not be able to be called by a derived class' GetFragmentsAsyncImpl? I'm not sure I understand the respective responsibilities of GetFragmentsAsyncImpl vs GetFragmentsAsyncImplBase here...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't want to introduce an additional argument to the virtual GetFragmentsAsyncImpl(), so I moved the parameterized variant of the initial function to a separate one (called GetFragmentsAsyncImplBase).

Since this API should be considered experimental, I think we can also change it later if needed.

The reason I didn't make a virtual GetFragmentsAsyncImpl(..., Executor* = GetCPUThreadPool()) is because in some situations it can cause confusion and probably more serious troubles, please see the link for some common gotchas: http://www.gotw.ca/gotw/005.htm.

I don't see any reason for GetFragmentsAsyncImplBase to remain protected since it's intended to serve as an impl detail for default base implementation. Derived classes will provide a completely separate impl for GetFragmentsAsyncImpl.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Default parameters can be confusing in virtual methods, I will agree to that. Although we have a few places in Arrow where we use them and just follow the maxim of trying to be consistent in the defaults we supply. With this approach however, I'm confused how a caller would specify which executor to use? Would it have to be a property of the dataset itself?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's ok to add an Executor* argument to GetFragmentsAsyncImpl. It doesn't need to have a default value, and that sounds better than having 3 methods for the same functionality.

…ting fragments in Dataset

Add `GetFragmentsAsync()` and `GetFragmentsAsyncImpl()`
functions to the generic `Dataset` interface, which
allows to produce fragments in a streamed fashion.

This is one of the prerequisites for making
`FileSystemDataset` to support lazy fragment
processing, which, in turn, can be used to start
scan operations without waiting for the entire
dataset to be discovered.

To aid the transition process of moving to async
implementation in `Dataset`/`AsyncScanner` code,
a default implementation for `GetFragmentsAsyncImpl()`
is provided (iterating over `GetFragmentsImpl()`
via a `BackgroundGenerator` and transferring results
back to a given executor).

Tests: unit(release)

Signed-off-by: Pavel Solodovnikov <pavel.al.solodovnikov@gmail.com>
…etFragmentsAsync`

Provide two basic tests for `Dataset::GetFragments` and
`Dataset::GetFragmentsAsync` interfaces, utilizing
`InMemoryDataset` for testing purposes.

There was a helper function `AssertDatasetFragmentsEqual()`
for testing `GetFragments()` method, but it was unused until now,
meaning that the `Dataset` was essentially missing a part of
test coverage for `GetFragments()`.

This is fixed now by using this helper function in a
simple test case `TestInMemoryDataset::GetFragmentsSync`.

Analogous helper `AssertDatasetAsyncFragmentsEqual` is
introduced to iterate the dataset via `Dataset::GetFragmentsAsync()`
and is used in the `TestInMemoryDataset::GetFragmentsAsync` test-case.

Also, I have encountered a bug in
`DatasetFixtureMixin::AssertFragmentEquals`, which caused the
`GetFragmentsSync` test-case to fail:

The underlying fragment scanner always assumed to completely
drain the provided batch generator, but this is not the case with
`GetFragmentsSync` and `GetFragmentsAsync` test-cases, where each
fragment in the dataset is composed of a single batch from the source
batch generator.
Hence, the `AssertFragmentEquals()` should be called
as much times as there are fragments in the dataset, each time
advancing the batch generator position by a single batch.
But, the `AssertFragmentEquals()` would immediately fail after the
first iteration, because batch generator is not exhausted yet.

The bug is also fixed in this patch.

Tests: unit(release)

Signed-off-by: Pavel Solodovnikov <pavel.al.solodovnikov@gmail.com>
@ManManson
Copy link
Contributor Author

Force-pushed the branch to address review comments. The diff can be found there: https://github.com/apache/arrow/compare/a6c5d5075b8150cf4ecf6874ad59a0e8497af93c..b47679f0708ed736e75c0254f5654cae9d9abbe4

Changelog:

  • Fixed include style, reordered includes
  • GetFragmentsAsyncImpl() accepts a executor argument, defaulting to GetCPUThreadPool() in the default implementation of Dataset class
  • Fixed a bug in DatasetFixtureMixin::AssertFragmentEquals() which incorrectly assumed that the fragment scanner would always drain the batch generator
  • Added a test for Dataset::GetFragments by utilizing the AssertDatasetFragmentsEqual() helper, which was unused until now
  • Added a similar test for Dataset::GetFragmentsAsync along with a similar helper DatasetFixtureMixin::AssertDatasetAsyncFragmentsEqual()

@ManManson
Copy link
Contributor Author

@westonpace @pitrou Polite review ping.

Copy link
Member

@pitrou pitrou left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ManManson Thanks a lot for the update! This looks good to me now.

@pitrou pitrou merged commit 4f31bfc into apache:master Sep 20, 2022
@ursabot
Copy link

ursabot commented Sep 20, 2022

Benchmark runs are scheduled for baseline = ab71673 and contender = 4f31bfc. 4f31bfc is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Failed ⬇️0.58% ⬆️0.0%] test-mac-arm
[Failed ⬇️0.0% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️0.18% ⬆️0.07%] ursa-thinkcentre-m75q
Buildkite builds:
[Finished] 4f31bfc2 ec2-t3-xlarge-us-east-2
[Failed] 4f31bfc2 test-mac-arm
[Failed] 4f31bfc2 ursa-i9-9960x
[Finished] 4f31bfc2 ursa-thinkcentre-m75q
[Finished] ab71673c ec2-t3-xlarge-us-east-2
[Failed] ab71673c test-mac-arm
[Failed] ab71673c ursa-i9-9960x
[Finished] ab71673c ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

@ursabot
Copy link

ursabot commented Sep 20, 2022

['Python', 'R'] benchmarks have high level of regressions.
test-mac-arm

zagto pushed a commit to zagto/arrow that referenced this pull request Oct 7, 2022
…ting fragments in Dataset (apache#13804)

Add `GetFragmentsAsync()` and `GetFragmentsAsyncImpl()`
functions to the generic `Dataset` interface, which
allows to produce fragments in a streamed fashion.

This is one of the prerequisites for making
`FileSystemDataset` to support lazy fragment
processing, which, in turn, can be used to start
scan operations without waiting for the entire
dataset to be discovered.

To aid the transition process of moving to async
implementation in `Dataset`/`AsyncScanner` code,
a default implementation for `GetFragmentsAsyncImpl()`
is provided (yielding a VectorGenerator over
the fragments vector, which is stored by every
implementation of Dataset interface at the moment).

Tests: unit(release)

Signed-off-by: Pavel Solodovnikov <pavel.al.solodovnikov@gmail.com>

Authored-by: Pavel Solodovnikov <pavel.al.solodovnikov@gmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
fatemehp pushed a commit to fatemehp/arrow that referenced this pull request Oct 17, 2022
…ting fragments in Dataset (apache#13804)

Add `GetFragmentsAsync()` and `GetFragmentsAsyncImpl()`
functions to the generic `Dataset` interface, which
allows to produce fragments in a streamed fashion.

This is one of the prerequisites for making
`FileSystemDataset` to support lazy fragment
processing, which, in turn, can be used to start
scan operations without waiting for the entire
dataset to be discovered.

To aid the transition process of moving to async
implementation in `Dataset`/`AsyncScanner` code,
a default implementation for `GetFragmentsAsyncImpl()`
is provided (yielding a VectorGenerator over
the fragments vector, which is stored by every
implementation of Dataset interface at the moment).

Tests: unit(release)

Signed-off-by: Pavel Solodovnikov <pavel.al.solodovnikov@gmail.com>

Authored-by: Pavel Solodovnikov <pavel.al.solodovnikov@gmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants