-
Notifications
You must be signed in to change notification settings - Fork 13.8k
[FLINK-25937] Restore the environment parallelism before transforming in SinkExpander#expand. #18699
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 4a9ad6d (Thu Feb 10 07:33:52 UTC 2022) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR @pltbkd 👍 Would it be possible to add a failing unit test verifying the change and preventing future regression?
|
Sure. I have added a test to verify this. The case will fail if I revert the change of this PR. But I am not sure if it's at the right place. Since no translator has a test, I add it in the StreamGraphGeneratorTest. Do you think it's ok? |
|
Sounds like a right place to me 👍 |
dmvk
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @pltbkd for adding the test case 👍 This looks really good. Great job 🎉
|
Actually, I'll do that. I need to run CI manually anyway before the merge. |
…transforming in SinkExpander#expand
Parallelism of a transformation with default parallelism(-1) is set when transforming, using the default parallelism set in the environment. However, in SinkExpander#expand, the environment parallelism is set to -1 at the entrance, to verify if the parallelism of a expanded transformation is set. The environment parallelism will be restored when exiting the method, but at present the transform is called within this scope. If the parallelism of a sink is not set, the parallelism of the sink transformation and all transformations expanded from it will not be handled, so the JobGraph generated will have vertices with -1 parallelism, causing the assertion failure in AdaptiveScheduler.
This pr fixes the bug by putting the restoring of the environment parallelism ahead of transforming the sink transformations.