ARROW-11935: [C++] Add push generator #9714
@westonpace I would welcome your input on this.
Note the generator could perhaps be made reentrant if there's some use for that.
This is a good utility. I added one note about queuing, since caution may be needed if this is used for generators that return large blocks of data.
cpp/src/arrow/util/async_generator.h
Outdated
 public:
  PushGenerator() : state_(std::make_shared<State>()) {}

  void Push(Result<T> result) {
You could maybe check whether result is not ok and, if so, set finished to true (potentially even clearing out the result queue), then on future pushes simply return immediately if finished is true. I can see where your question on Zulip came from now. The only disadvantage I can see to this approach is potentially wasted memory from keeping blocks around that are invalid.
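As a rough illustration of this suggestion, here is a queue-only sketch that leaves out the future plumbing entirely. `SketchResult`, `ErrorTerminatedQueue` and the `drop_pending_on_error` switch are hypothetical names made up for this example; they are not the PR's code and not Arrow's `Result<T>`.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for arrow::Result<int>: either a value or an error.
struct SketchResult {
  std::optional<int> value;
  std::string error;  // only meaningful when `value` is empty
  bool ok() const { return value.has_value(); }
};

// Queue-only model of the producer side (assumption: single-threaded use).
class ErrorTerminatedQueue {
 public:
  explicit ErrorTerminatedQueue(bool drop_pending_on_error)
      : drop_pending_on_error_(drop_pending_on_error) {}

  // Returns false when the push is ignored because the stream already ended.
  bool Push(SketchResult result) {
    if (finished_) return false;
    if (!result.ok()) {
      finished_ = true;  // the first error ends the stream
      // Optionally discard values that were queued before the error.
      if (drop_pending_on_error_) queue_.clear();
    }
    queue_.push_back(std::move(result));  // the error itself is still delivered
    return true;
  }

  std::size_t pending() const { return queue_.size(); }

 private:
  bool drop_pending_on_error_;
  bool finished_ = false;
  std::deque<SketchResult> queue_;
};

// Pushes 1, 2, an error, then 3, and reports how many items stay queued.
inline std::size_t PendingAfterError(bool drop_pending_on_error) {
  ErrorTerminatedQueue q(drop_pending_on_error);
  q.Push({1, ""});
  q.Push({2, ""});
  q.Push({std::nullopt, "invalid"});
  q.Push({3, ""});  // ignored: the stream is already finished
  return q.pending();
}
```

Without clearing, the two values queued before the error stay in memory alongside it; with clearing, only the error remains.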
Indeed, I could do that. The underlying question is: should an error always terminate an async generator? It doesn't seem that obvious to me.
Also, regarding reentrancy: I don't think there would be any advantage to it here, because there is no backpressure or connection with the producer.
A push generator has a producer end which pushes values to a queue, and a consumer end (the generator itself) which yields futures that receive the values pushed by the producer.
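The description above can be sketched roughly as follows. This is a minimal model built on `std::promise` and `std::future` rather than Arrow's `Future`, with `std::nullopt` standing in for the end-of-stream marker. `SketchPushGenerator`, its nested `Producer`, and `DemoSum` are hypothetical names for this example, and the sketch assumes at most one outstanding consumer future at a time (it is not reentrant).

```cpp
#include <cassert>
#include <deque>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <utility>

// Hypothetical sketch, not the PR's implementation.
template <typename T>
class SketchPushGenerator {
  struct State {
    std::mutex mutex;
    std::deque<T> queue;  // values pushed but not yet consumed
    std::optional<std::promise<std::optional<T>>> waiting;  // pending consumer
    bool finished = false;
  };

 public:
  // Producer end: pushes values and signals end-of-stream.
  class Producer {
   public:
    explicit Producer(std::shared_ptr<State> state) : state_(std::move(state)) {}

    void Push(T value) {
      std::unique_lock<std::mutex> lock(state_->mutex);
      if (state_->finished) return;  // pushes after Close() are dropped
      if (state_->waiting.has_value()) {
        // A consumer is already waiting: hand the value over directly.
        std::promise<std::optional<T>> promise = std::move(*state_->waiting);
        state_->waiting.reset();
        lock.unlock();
        promise.set_value(std::optional<T>(std::move(value)));
      } else {
        state_->queue.push_back(std::move(value));
      }
    }

    void Close() {
      std::unique_lock<std::mutex> lock(state_->mutex);
      if (state_->finished) return;
      state_->finished = true;  // queued values stay available
      if (state_->waiting.has_value()) {
        std::promise<std::optional<T>> promise = std::move(*state_->waiting);
        state_->waiting.reset();
        lock.unlock();
        promise.set_value(std::optional<T>());  // end-of-stream
      }
    }

   private:
    std::shared_ptr<State> state_;
  };

  SketchPushGenerator() : state_(std::make_shared<State>()) {}

  Producer producer() { return Producer(state_); }

  // Consumer end: each call yields a future for the next value.
  std::future<std::optional<T>> operator()() {
    std::promise<std::optional<T>> promise;
    std::future<std::optional<T>> fut = promise.get_future();
    std::lock_guard<std::mutex> lock(state_->mutex);
    if (!state_->queue.empty()) {
      promise.set_value(std::optional<T>(std::move(state_->queue.front())));
      state_->queue.pop_front();
    } else if (state_->finished) {
      promise.set_value(std::optional<T>());
    } else {
      state_->waiting.emplace(std::move(promise));  // fulfilled by Push/Close
    }
    return fut;
  }

 private:
  std::shared_ptr<State> state_;
};

// Single-threaded walk-through of both code paths.
inline int DemoSum() {
  SketchPushGenerator<int> gen;
  auto producer = gen.producer();
  auto pending = gen();  // requested before any value exists
  producer.Push(1);      // fulfils `pending` directly
  producer.Push(2);      // queued until the next gen() call
  producer.Close();
  int sum = pending.get().value();
  sum += gen().get().value();
  assert(!gen().get().has_value());  // end-of-stream after the queue drains
  return sum;
}
```

Sharing the state through a `std::shared_ptr` lets the producer handle and the generator be copied and moved independently, which is one plausible reason to separate the two ends.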
8c1a0bd to 17ee984
@westonpace I turned the API on its head so that
Looks good. Just a few thoughts below but nothing that needs to change.
      return;
    }
    state_->finished = true;
    if (state_->consumer_fut.has_value()) {
You could potentially clear the result_q here. I could understand either approach. However, if Close is semantically the same as cancel, it would seem you wouldn't want the downstream to keep processing the already generated results.
No, close has nothing to do with cancel. It signals a regular end-of-stream.
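To make the distinction concrete, here is a small synchronous sketch. `SketchStream` and its `Cancel` method are hypothetical and exist only for contrast; the PR discussed here does not define a cancel operation.

```cpp
#include <cassert>
#include <deque>
#include <optional>

// Hypothetical synchronous model of the queue behind a push generator.
class SketchStream {
 public:
  void Push(int value) {
    if (!finished_) queue_.push_back(value);
  }

  // Regular end-of-stream: values already queued remain readable.
  void Close() { finished_ = true; }

  // Hypothetical cancel (NOT part of the PR): also discards queued values.
  void Cancel() {
    finished_ = true;
    queue_.clear();
  }

  // Returns the next queued value; in this model std::nullopt after
  // Close() or Cancel() means end-of-stream.
  std::optional<int> Next() {
    if (queue_.empty()) return std::nullopt;
    int value = queue_.front();
    queue_.pop_front();
    return value;
  }

 private:
  bool finished_ = false;
  std::deque<int> queue_;
};

// Pushes three values, ends the stream, and counts what the consumer sees.
inline int DeliveredAfter(bool cancel) {
  SketchStream stream;
  stream.Push(10);
  stream.Push(20);
  stream.Push(30);
  if (cancel) {
    stream.Cancel();
  } else {
    stream.Close();
  }
  int delivered = 0;
  while (stream.Next().has_value()) ++delivered;
  return delivered;
}
```

With Close, the consumer still receives all three queued values before seeing the end of the stream; with the hypothetical Cancel it receives none.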
  producer.Close();
  ASSERT_FINISHES_OK_AND_EQ(IterationTraits<TestInt>::End(), fut);
  ASSERT_FINISHES_OK_AND_EQ(IterationTraits<TestInt>::End(), gen());
  ASSERT_FINISHES_OK_AND_EQ(IterationTraits<TestInt>::End(), gen());
I feel like this check might be unnecessary? Can't hurt though.
Perhaps over-cautious :-)
  }
  ASSERT_FINISHES_OK_AND_EQ(TestInt{1}, futures[0]);
  ASSERT_FINISHES_AND_RAISES(Invalid, futures[1]);
  AssertNotFinished(futures[2]);
Sorry I didn't answer the earlier question (should an error always terminate a generator?). This seems to be your test here. I think that, from the point of view of the general async generator concept, this would be UB: this possibility is valid, and terminating early would also be valid. Downstream generators should be written to expect this as a possibility and should not rely on errors terminating successive calls automatically.
Which is a long-winded way of saying this is valid.
It would also be ok if futures[2] was IterationTraits<TestInt>::End() here.
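One way to read this advice on the consumer side is sketched below, under simplifying assumptions: a generator is modelled as a `std::function` returning an already-completed `std::future`, and `Item`, `Replay`, `Drain` and the two example sources are hypothetical names, not Arrow APIs. The consumer stops pulling on the first error by itself, so it behaves the same whether the source keeps producing after an error or ends the stream.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <future>
#include <utility>
#include <vector>

// Hypothetical item type: a value, an error, or the end-of-stream marker.
struct Item {
  enum class Kind { kValue, kError, kEnd };
  Kind kind;
  int value = 0;
};

using SketchGenerator = std::function<std::future<Item>()>;

// Wraps an item in an already-completed future.
inline std::future<Item> Ready(Item item) {
  std::promise<Item> promise;
  promise.set_value(item);
  return promise.get_future();
}

// Replays a fixed script, then yields end-of-stream forever.
inline SketchGenerator Replay(std::vector<Item> script) {
  return [script = std::move(script), pos = std::size_t{0}]() mutable {
    if (pos < script.size()) return Ready(script[pos++]);
    return Ready(Item{Item::Kind::kEnd});
  };
}

// A source that keeps yielding values after an error.
inline SketchGenerator ContinuesAfterError() {
  return Replay({{Item::Kind::kValue, 1}, {Item::Kind::kError}, {Item::Kind::kValue, 5}});
}

// A source that ends the stream right after an error.
inline SketchGenerator EndsAfterError() {
  return Replay({{Item::Kind::kValue, 1}, {Item::Kind::kError}});
}

struct Summary {
  int sum = 0;
  bool failed = false;
};

// Consumer that does not rely on the source terminating after an error.
inline Summary Drain(SketchGenerator gen) {
  Summary out;
  while (true) {
    Item item = gen().get();
    if (item.kind == Item::Kind::kEnd) break;
    if (item.kind == Item::Kind::kError) {
      out.failed = true;
      break;  // stop pulling here instead of waiting for an end marker
    }
    out.sum += item.value;
  }
  return out;
}
```

Draining either source yields the same summary, which is the property a downstream generator would want if both behaviours are considered valid.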
Travis-CI build: https://travis-ci.com/github/pitrou/arrow/builds/220827493
CI failure is unrelated, will merge.
A push generator has a producer end which pushes values to a queue, and a consumer end (the generator itself) which yields futures that receive the values pushed by the producer. Closes apache#9714 from pitrou/ARROW-11935-push-gen Authored-by: Antoine Pitrou <antoine@python.org> Signed-off-by: Antoine Pitrou <antoine@python.org>