[FLINK-31706] [runtime] The default source parallelism should be the same as execution's default parallelism #22555
Could you spare some time to review this? Thanks very much, @Myasuka.
@clownxc Thanks for creating this PR! I'm curious why you don't fall back from globalDefaultSourceParallelism to parallelism.default, when necessary, outside the constructor of DefaultVertexParallelismAndInputInfosDecider. That change would be much cleaner.
@clownxc I've left some comments, PTAL!
@@ -99,7 +99,7 @@ public class BatchExecutionOptions {
     public static final ConfigOption<Integer> ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM =
             key("execution.batch.adaptive.auto-parallelism.default-source-parallelism")
                     .intType()
-                    .defaultValue(1)
+                    .defaultValue(0)
                     .withDeprecatedKeys(
Why do you need to change the default value? I think a better way is to call configuration.getOptional(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM) to determine whether the configuration item has been set.
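A minimal sketch of the suggestion above, under stated assumptions: `SourceParallelismLookup` is a hypothetical map-backed stand-in, not Flink's `Configuration` class, and only mimics the idea of a `getOptional` lookup that is empty when the user never set the key. It shows why checking for presence can tell "unset" apart from "explicitly set to 1" without changing the option's default value.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical stand-in for a configuration object; NOT Flink's Configuration. */
public class SourceParallelismLookup {
    public static final String SOURCE_PARALLELISM_KEY =
            "execution.batch.adaptive.auto-parallelism.default-source-parallelism";

    // Holds only the values the user set explicitly.
    private final Map<String, Integer> values = new HashMap<>();

    public void set(String key, int value) {
        values.put(key, value);
    }

    /** Returns a value only if the key was explicitly set. */
    public Optional<Integer> getOptional(String key) {
        return Optional.ofNullable(values.get(key));
    }

    /** Uses the explicit source parallelism if present, otherwise the given fallback. */
    public static int resolveSourceParallelism(
            SourceParallelismLookup conf, int fallbackParallelism) {
        return conf.getOptional(SOURCE_PARALLELISM_KEY).orElse(fallbackParallelism);
    }

    public static void main(String[] args) {
        SourceParallelismLookup conf = new SourceParallelismLookup();
        // Key unset: the fallback is used.
        System.out.println(resolveSourceParallelism(conf, 4));
        // Key explicitly set to 1: the user's choice wins over the fallback.
        conf.set(SOURCE_PARALLELISM_KEY, 1);
        System.out.println(resolveSourceParallelism(conf, 4));
    }
}
```

With a sentinel default such as 0, a user who explicitly configures the sentinel value would be indistinguishable from one who configured nothing; the presence check avoids that ambiguity.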
checkNotNull(dataVolumePerTask);
checkArgument(
        defaultExecutionParallelism > 0,
        "The default execution parallelism must be larger than 0.");
We do not require the default parallelism to be greater than 0.
@@ -84,21 +85,28 @@ private DefaultVertexParallelismAndInputInfosDecider(
         int globalMaxParallelism,
         int globalMinParallelism,
         MemorySize dataVolumePerTask,
-        int globalDefaultSourceParallelism) {
+        int globalDefaultSourceParallelism,
+        int defaultExecutionParallelism) {
Maybe we can determine the value of source parallelism externally without touching this method.
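One way to read this suggestion, sketched under assumptions: the decider's constructor keeps its single source-parallelism argument, and a factory method resolves the value beforehand. `DeciderFactorySketch`, its nested `Decider`, and `LEGACY_SOURCE_PARALLELISM` are hypothetical names for illustration; the real class takes more arguments. The sketch also assumes that a non-positive default execution parallelism means "not set", in which case it keeps the previous default of 1.

```java
/** Hypothetical, simplified sketch; not the actual Flink classes. */
public class DeciderFactorySketch {

    /** Stand-in decider whose constructor signature stays untouched. */
    public static final class Decider {
        public final int globalDefaultSourceParallelism;

        Decider(int globalDefaultSourceParallelism) {
            if (globalDefaultSourceParallelism <= 0) {
                throw new IllegalArgumentException(
                        "The default source parallelism must be larger than 0.");
            }
            this.globalDefaultSourceParallelism = globalDefaultSourceParallelism;
        }
    }

    // Assumption: the previous behavior (source parallelism 1) is the last resort.
    static final int LEGACY_SOURCE_PARALLELISM = 1;

    /**
     * Resolves the source parallelism outside the constructor.
     *
     * @param configuredSourceParallelism explicit user setting, or null if unset
     * @param defaultExecutionParallelism default execution parallelism; may be non-positive
     */
    public static Decider create(
            Integer configuredSourceParallelism, int defaultExecutionParallelism) {
        int resolved;
        if (configuredSourceParallelism != null) {
            resolved = configuredSourceParallelism;
        } else if (defaultExecutionParallelism > 0) {
            resolved = defaultExecutionParallelism;
        } else {
            resolved = LEGACY_SOURCE_PARALLELISM;
        }
        return new Decider(resolved);
    }

    public static void main(String[] args) {
        System.out.println(create(null, 8).globalDefaultSourceParallelism);
        System.out.println(create(null, -1).globalDefaultSourceParallelism);
        System.out.println(create(3, 8).globalDefaultSourceParallelism);
    }
}
```

Keeping the fallback in the factory means the constructor needs neither a new parameter nor a check that the default execution parallelism is positive.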
-        BatchExecutionOptions
-                .ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM));
+        BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM),
+        configuration.get(CoreOptions.DEFAULT_PARALLELISM));
 }
parallelism.default should be obtained from ExecutionConfig instead of jobmaster Configuration
Thank you very much for the review. I want to use the parallelism of ExecutionConfig as the default value:
public int getParallelism() {
    return configuration.get(CoreOptions.DEFAULT_PARALLELISM);
}
I'm not sure whether my idea is correct; I look forward to your reply.
Thank you very much for the review. I will try to modify the code as you suggest.
Thank you very much for your review. I have modified the code; could you re-review it when you are free and leave some comments?
@clownxc Thanks for updating. I'm sorry that I overlooked that this PR will change the default behavior of the configuration.
@clownxc Please move the discussion to the original ticket. Any change that breaks the previous behavior needs a discussion in JIRA; that's why we did not assign anyone to the ticket.
What is the purpose of the change
Currently, sources must set execution.batch.adaptive.auto-parallelism.default-source-parallelism in adaptive batch scheduler mode; otherwise, the source parallelism defaults to 1. A better solution might be to use the default execution parallelism when the user has not configured this option.
Brief change log
Modified the default value of source-parallelism to be more reasonable.
Verifying this change
This change is already covered by existing tests, such as AdaptiveBatchSchedulerTest.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation