New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-5464] Use BATCH_FORCED as the default ExecutionMode for batch pipeline #6897
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the fix! It appears there is a scheduling bug for large pipelined
jobs. It's great to have a workaround.
@Description( | ||
"Flink mode for data exchange for batch pipeline. " | ||
+ "Reference {@link org.apache.flink.api.common.ExecutionMode}") | ||
@Default.Enum("BATCH_FORCED") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm a bit hesitant to change the default value here. This is also used by the non-portable FlinkRunner and the default is PIPELINED
. We haven't heard from anyone having issues with the batch execution. I'd leave this at the Flink default until we have found out the exact issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also ok for me to add a link to a JIRA issue to further investigate this issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, I will update the default.
Flink Jira for reference https://issues.apache.org/jira/browse/FLINK-10672
"Flink mode for data exchange for batch pipeline. " | ||
+ "Reference {@link org.apache.flink.api.common.ExecutionMode}") | ||
@Default.Enum("BATCH_FORCED") | ||
ExecutionMode getExecutionModeForBatch(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a test in PipelineOptionsTest
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TheDefault Enum test is done in ProxyInvocationHandlerTest.java
Please let me know if you are referring to some other test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant PipelineOptionsTest
to test the default values. I'm adding it in the merge commit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be getFlinkExecutionModeForBatch, as it seems rather Flink-specific?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Considering the global namespace of the option names that would be preferred. However, not a single other option so far has a Flink prefix. Perhaps we have to come up with a way to scope option names?
514f46a
to
329e51f
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to clarify, the resulting PR just enables the option, rather than setting it as the title states, right?
"Flink mode for data exchange for batch pipeline. " | ||
+ "Reference {@link org.apache.flink.api.common.ExecutionMode}") | ||
@Default.Enum("BATCH_FORCED") | ||
ExecutionMode getExecutionModeForBatch(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be getFlinkExecutionModeForBatch, as it seems rather Flink-specific?
Yes, it just enables to set it. The default remains unchanged ( |
Use BATCH_FORCED ExecutionMode for Flink batch pipelines to avoid flink scheduling dead lock.
Beam merges the chained ProcessBundles in ExecutionStage so this should not have a lot of over head.
Also introducing a parameter to change batch execution mode if needed.
Reference upstream jira https://issues.apache.org/jira/browse/FLINK-10672
Please add a meaningful description for your change here
Follow this checklist to help us incorporate your contribution quickly and easily:
[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.It will help us expedite review of your Pull Request if you tag someone (e.g.
@username
) to look at it.Post-Commit Tests Status (on master branch)