[BEAM-6853] Make sdkWorkerParallelism option consistent #8286
b85fa32 to a583c98
R: @tvalentyn
8bbe1a1 to 0a5c1af
0a5c1af to 05a0472
Run Java PreCommit
05a0472 to 44741d0
Run Java PreCommit
tvalentyn left a comment:
Thanks, @angoenka
class PortableOptions(PipelineOptions):
  """Portable options are common options expected to be understood by most of
@angoenka What do you think about removing current RunnerOptions in Python SDK, and renaming PortableOptions to either RunnerOptions or PortableRunnerOptions in all SDKs? See also: #8225 (comment)
cc: @mxm, @robertwb.
Fine with me to remove RunnerOptions because it is just a stub. Not sure about PortableOptions => PortableRunnerOptions because we currently do not include Runner in any option name, e.g. FlinkPipelineOptions, not FlinkRunnerPipelineOptions.
I agree that we can remove RunnerOptions, as it's not used anywhere.
I would like to stick with the current convention of not adding "Runner" to PortableOptions.
I think PortableOptions is understandable by developers but might be less user-friendly, since users might not know what portability refers to.
usage = "Default parallelism for SDK worker processes (see portable pipeline options)")
private Long sdkWorkerParallelism = 1L;
usage =
"Default parallelism for SDK worker processes (see portable pipeline options). "
Consider a slightly different order: "Parallelism level for SDK worker processes. This value is used only when the pipeline option sdkWorkerParallelism is set to 0; otherwise the value provided in pipeline options takes precedence. Default is 0, which means worker parallelism will be dynamically decided by a runner. See also: sdkWorkerParallelism Pipeline Option." Also, should the last sentence be a Javadoc reference?
Makes sense. I will update the wording.
'command.'))
parser.add_argument(
'--sdk_worker_parallelism', default=None,
'--sdk_worker_parallelism', default=1,
I agree, we probably want 0.
I have seen machines lock up when the number of running Python processes equals the number of cores.
We can still go with 0 here as long as we set the default to 1 in JobServerDriver.java.
I think this is reasonable.
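The default this thread settles on (0 on the Python side) can be sketched with plain argparse. This is only an illustration, not the SDK source: the `build_parser` helper, the `type=int` conversion, and the help text are assumptions added for the example.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Hypothetical stand-alone parser mirroring the flag discussed above."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--sdk_worker_parallelism',
        type=int,   # assumption: the value is parsed as an integer
        default=0,  # 0 defers the decision (to the job server or runner)
        help='Parallelism for SDK worker processes; 0 lets the job server '
             'or runner decide.')
    return parser


if __name__ == '__main__':
    # No flag given: the option falls back to 0.
    print(build_parser().parse_args([]).sdk_worker_parallelism)
    # An explicit value is passed through unchanged.
    print(build_parser().parse_args(
        ['--sdk_worker_parallelism', '5']).sdk_worker_parallelism)
```

With 0 as the SDK-side default, a conservative value such as 1 can still be applied later by the job server, which addresses the concern about starting one Python process per core.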
44741d0 to c1225de
Run Java PreCommit
Run Java_Examples_Dataflow PreCommit
c1225de to f3623e8
Ping @tvalentyn
tvalentyn left a comment:
Thanks. Please update the PR description; it does not seem to reflect the changes in the last iteration.
Thanks @tvalentyn
The behavior of sdkWorkerParallelism was not consistent in the previous implementation and was not easy to describe.
The new behavior of sdkWorkerParallelism is as follows:
A) User input sdkWorkerParallelism = null -> PortablePipelineOptions sets the default value of 0 -> the job server override (default value 1) sets it to 1 -> 1 is used as the final value and sdkWorkerParallelism is dynamically decided.
B) User input sdkWorkerParallelism = 5 -> PortablePipelineOptions applies no default and 5 is passed along -> the job server override is not used -> 5 is used as the final value.
C) User input sdkWorkerParallelism = null -> PortablePipelineOptions sets the default value of 0 -> the job server override (2) is used -> 2 is used as the final value.
D) User input sdkWorkerParallelism = 0 -> PortablePipelineOptions applies no default and 0 is passed along -> the job server override (0) is used -> 0 is used as the final value and the actual worker parallelism is determined based on CPU.
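The four cases above can be read as a small resolution function. The following is a minimal sketch of that reading, not the code in this PR: the function name, the constants, and the interpretation of case D (job server override explicitly set to 0) are assumptions.

```python
from typing import Optional

# Assumed names for illustration only; they do not come from the PR.
PIPELINE_OPTION_DEFAULT = 0  # applied when the user sets nothing
JOB_SERVER_DEFAULT = 1       # job server override default, as in case A


def resolve_sdk_worker_parallelism(
        user_value: Optional[int],
        job_server_override: int = JOB_SERVER_DEFAULT) -> int:
    """Return the final parallelism; 0 means it is derived from the CPU."""
    # Step 1: the pipeline options fill in 0 when the user gave no value.
    pipeline_value = (
        PIPELINE_OPTION_DEFAULT if user_value is None else user_value)
    # Step 2: the job server override applies only when the option is 0.
    if pipeline_value == 0:
        return job_server_override
    return pipeline_value


if __name__ == '__main__':
    print(resolve_sdk_worker_parallelism(None))                        # A -> 1
    print(resolve_sdk_worker_parallelism(5))                           # B -> 5
    print(resolve_sdk_worker_parallelism(None, job_server_override=2)) # C -> 2
    print(resolve_sdk_worker_parallelism(0, job_server_override=0))    # D -> 0
```

Treating 0 as "unset" keeps a single rule: an explicit nonzero user value always wins, and the job server value is consulted only otherwise.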