[FLINK-30533][runtime] SourceOperator#emitNext() should push records to DataOutput in a while loop #21576
@flinkbot run azure
Force-pushed from df885c0 to d26585f.
@xintongsong @zhuzhurk Can you review this PR? Thanks!
Thanks for opening the PR, @lindong28. The changes LGTM. I have only 1 minor comment. +1 for merging.
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java (resolved)
Thanks for opening this PR, @lindong28!
I'm a bit uncertain about how much this change will affect the input selection of multi-input operators. Currently, they switch inputs/sources after each record. It looks to me like this change may cause them to drain one input before moving on to the next one when the inputs are fast enough.
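The concern about draining one input before switching can be illustrated with a toy model. This is a minimal sketch, not Flink code: `InputInterleavingSketch`, `ToyInput`, and `run` are hypothetical names, the task simply alternates inputs after every `emitNext()` call, and every record is assumed to be immediately available (the "inputs are fast enough" case).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical toy model of a two-input task that switches to the other input
 * after every emitNext() call. Not Flink's input-selection logic.
 */
public class InputInterleavingSketch {

    /** A toy input whose records are all available up front. */
    static final class ToyInput {
        private final Deque<String> records = new ArrayDeque<>();

        ToyInput(String... recs) {
            for (String r : recs) {
                records.addLast(r);
            }
        }

        boolean isEmpty() {
            return records.isEmpty();
        }

        /**
         * Emits at most one record when batch is false; otherwise keeps
         * emitting while records remain (the while-loop behavior).
         */
        void emitNext(List<String> out, boolean batch) {
            do {
                if (records.isEmpty()) {
                    return;
                }
                out.add(records.pollFirst());
            } while (batch);
        }
    }

    /** Alternates between the two inputs until both are exhausted. */
    static List<String> run(boolean batch) {
        ToyInput a = new ToyInput("a1", "a2", "a3");
        ToyInput b = new ToyInput("b1", "b2", "b3");
        ToyInput[] inputs = {a, b};
        List<String> out = new ArrayList<>();
        int next = 0;
        while (!a.isEmpty() || !b.isEmpty()) {
            inputs[next].emitNext(out, batch);
            next = 1 - next; // switch input after every emitNext() call
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // [a1, b1, a2, b2, a3, b3]
        System.out.println(run(true));  // [a1, a2, a3, b1, b2, b3]
    }
}
```

With per-record emission the task interleaves the inputs; with a loop that only stops when an input runs dry, the first input is drained before the task ever sees the second one.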
@xintongsong @zhuzhurk Thanks for the review! @zhuzhurk After reading through the related source code, I believe this PR won't affect the behavior of multi-input operators, for the following reasons:
Does this answer your question?
@zhuzhurk You are right. Currently, for SQL programs executed in batch mode, the program might create a … I agree it might potentially cause performance … for … I have updated the PR to exclude …
Force-pushed from 027772f to d58e84c.
Thanks for addressing the comments.
The change looks good to me except for one last minor comment.
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java (outdated, resolved)
…to DataOutput in a while loop. This optimization is currently disabled for TwoInputStreamTask and MultipleInputStreamTask.
…to DataOutput in a while loop. This optimization is currently disabled for TwoInputStreamTask and MultipleInputStreamTask. This closes apache#21576.
What is the purpose of the change
Improve Flink performance by reducing the average depth of the call stack needed to produce a record.
Note that this optimization is currently disabled for a SourceOperator executed by either MultipleInputStreamTask or AbstractTwoInputStreamTask.
Brief change log
Updated SourceOperator#emitNext() to push records to the given DataOutput in a while loop. The loop will break when any of the following conditions are met:
- The status is not DataInputStatus#MORE_AVAILABLE.
- The SourceOperator is executed by MultipleInputStreamTask or AbstractTwoInputStreamTask.

For the example program provided below, the following 4 function calls will be removed from the call stack needed to produce most records.
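The while loop described in the change log can be sketched with a toy operator. This is a minimal sketch under stated assumptions, not the actual SourceOperator code: `BatchedEmitSketch`, `ToyReader`, the two-value `Status` enum (a stand-in for DataInputStatus), and the `canEmitBatch` hook are all hypothetical. The hook models "this task may keep emitting"; a multi-input task would pass `() -> false` so that control returns to it after every record.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BooleanSupplier;

/**
 * Hypothetical model of a source operator whose emitNext() keeps pushing
 * records to its output while more are available. Not Flink's actual API.
 */
public class BatchedEmitSketch {

    /** Two-value stand-in for the status returned by emitNext(). */
    enum Status { MORE_AVAILABLE, END_OF_INPUT }

    /** Toy reader: emits one record per poll and reports whether more remain. */
    static final class ToyReader {
        private final Deque<Integer> pending = new ArrayDeque<>();

        ToyReader(int count) {
            for (int i = 0; i < count; i++) {
                pending.addLast(i);
            }
        }

        Status pollNext(List<Integer> out) {
            if (pending.isEmpty()) {
                return Status.END_OF_INPUT;
            }
            out.add(pending.pollFirst());
            return pending.isEmpty() ? Status.END_OF_INPUT : Status.MORE_AVAILABLE;
        }
    }

    private final ToyReader reader;
    // Hypothetical hook: false means the task must regain control after every
    // record (e.g. a multi-input task that switches inputs per record).
    private final BooleanSupplier canEmitBatch;

    BatchedEmitSketch(ToyReader reader, BooleanSupplier canEmitBatch) {
        this.reader = reader;
        this.canEmitBatch = canEmitBatch;
    }

    /** Loops until the status is not MORE_AVAILABLE or batching is not allowed. */
    Status emitNext(List<Integer> out) {
        Status status;
        do {
            status = reader.pollNext(out);
        } while (status == Status.MORE_AVAILABLE && canEmitBatch.getAsBoolean());
        return status;
    }

    /** Counts the emitNext() calls the outer task loop needs to drain the reader. */
    static int callsToDrain(int records, BooleanSupplier canEmitBatch) {
        BatchedEmitSketch op = new BatchedEmitSketch(new ToyReader(records), canEmitBatch);
        List<Integer> out = new ArrayList<>();
        int calls = 0;
        Status status;
        do {
            status = op.emitNext(out);
            calls++;
        } while (status != Status.END_OF_INPUT);
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(callsToDrain(5, () -> true));  // 1
        System.out.println(callsToDrain(5, () -> false)); // 5
    }
}
```

In this toy model, every emitNext() call that the loop saves is one fewer trip from the task's outer processing loop down to the reader, which is the kind of per-record call-stack saving the description refers to.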
Verifying this change
Here are the benchmark results obtained by running the above program with parallelism=1 and object reuse enabled. Each result is averaged over 5 runs of that setup.
Prior to the proposed change, the average execution time is 46.1 sec (std = 5.1 sec).
After the proposed change, the average execution time is 33.3 sec (std = 0.9 sec).
The proposed change increases throughput by 38.4% (46.1 / 33.3 ≈ 1.384).
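The 38.4% figure follows from treating throughput as inversely proportional to execution time for the same fixed workload. The small sketch below (class and method names are illustrative) reproduces it from the two averages and also shows the corresponding reduction in execution time, so the two percentages are not confused.

```java
public class SpeedupSketch {

    /** Throughput gain in percent when runtime drops from beforeSec to afterSec. */
    static double throughputGainPercent(double beforeSec, double afterSec) {
        // For a fixed amount of work, throughput is proportional to 1 / runtime.
        return (beforeSec / afterSec - 1.0) * 100.0;
    }

    /** Reduction in execution time, in percent, for the same change. */
    static double runtimeReductionPercent(double beforeSec, double afterSec) {
        return (1.0 - afterSec / beforeSec) * 100.0;
    }

    public static void main(String[] args) {
        System.out.printf("throughput gain: %.1f%%%n", throughputGainPercent(46.1, 33.3));
        System.out.printf("runtime reduction: %.1f%%%n", runtimeReductionPercent(46.1, 33.3));
    }
}
```

A 38.4% throughput gain thus corresponds to roughly a 27.8% shorter execution time.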
Does this pull request potentially affect one of the following parts:
- @Public(Evolving): no

Documentation