New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-9347] Don't overwrite default runner harness for unified worker #10919
[BEAM-9347] Don't overwrite default runner harness for unified worker #10919
Conversation
Thanks Ankur! LGTM. |
0a8d95d
to
7962fa6
Compare
@@ -916,6 +919,9 @@ def _use_unified_worker(pipeline_options): | |||
debug_options = pipeline_options.view_as(DebugOptions) | |||
use_unified_worker_flag = 'use_unified_worker' | |||
|
|||
if debug_options.lookup_experiment(use_unified_worker_flag): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PTAL at this method - we need to remove some of the statements in 922-928.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need 922-924?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nvm, reading this again, looks like this logic is intenional
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, I think I understand the intent but I think it is confusing - can we move lines 925-926 to dataflow_runner.py, see:
# Elevate "min_cpu_platform" to pipeline option, but using the existing |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can move 925-926 to dataflow_runner.py but then i feel the logic to determine unified_worker usage will be more staggered.
I think it will be better to keep the interpretation of use_unified_worker
and use_runner_v2
at a single place.
For now, we support both flag but I think we will transition to use_runner_v2
over time (not sure by when).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that it would be cleaner to not modify experiments in _use_unified_worker
, since the name of this method does not suggest that it modifies the state, and it would also be easier to follow the logic. Also, if Dataflow runner relies on the fact that "use_runner_v2" experiment is set, it might be better to set it explicitly at some point, instead of relying on the fact that "_use_unified_worker" will be called. Leaving this up to you.
@@ -186,8 +186,11 @@ def __init__(self, packages, options, environment_version, pipeline_url): | |||
if job_type.startswith('FNAPI_'): | |||
self.debug_options = self.debug_options or DebugOptions() | |||
self.debug_options.experiments = self.debug_options.experiments or [] | |||
# Don't add the default image overwrite if user overwrites or |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it will be easier to read if we restructure this:
if (self.debug_options.lookup_experiment('runner_harness_container_image') or
_use_unified_worker(self.debug_options)):
# Comment on WHY we don't want to set the override
pass
else:
<...set the override...>
WDYT?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, should we be passing pipeline_options instead of debug options? Perhaps it works either way but passing pipeline options would be cleaner since that's what the signature of _use_unified_worker() expects.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it will be easier to read if we restructure this:
if (self.debug_options.lookup_experiment('runner_harness_container_image') or _use_unified_worker(self.debug_options)): # Comment on WHY we don't want to set the override pass else: <...set the override...>
WDYT?
I think it makes sense. Will update it.
Also, should we be passing pipeline_options instead of debug options? Perhaps it works either way but passing pipeline options would be cleaner since that's what the signature of _use_unified_worker() expects.
I think the code is structured in a way that a self.debug_options
is referenced every where and the reference to it can be different from options
passed in the method argument. So I think it will need a more thoughtful refactoring to move to change it to options in the right manner. Which I think we can differ.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @tvalentyn
Made some changes and responded to some comments.
@@ -916,6 +919,9 @@ def _use_unified_worker(pipeline_options): | |||
debug_options = pipeline_options.view_as(DebugOptions) | |||
use_unified_worker_flag = 'use_unified_worker' | |||
|
|||
if debug_options.lookup_experiment(use_unified_worker_flag): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can move 925-926 to dataflow_runner.py but then i feel the logic to determine unified_worker usage will be more staggered.
I think it will be better to keep the interpretation of use_unified_worker
and use_runner_v2
at a single place.
For now, we support both flag but I think we will transition to use_runner_v2
over time (not sure by when).
@@ -186,8 +186,11 @@ def __init__(self, packages, options, environment_version, pipeline_url): | |||
if job_type.startswith('FNAPI_'): | |||
self.debug_options = self.debug_options or DebugOptions() | |||
self.debug_options.experiments = self.debug_options.experiments or [] | |||
# Don't add the default image overwrite if user overwrites or |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it will be easier to read if we restructure this:
if (self.debug_options.lookup_experiment('runner_harness_container_image') or _use_unified_worker(self.debug_options)): # Comment on WHY we don't want to set the override pass else: <...set the override...>
WDYT?
I think it makes sense. Will update it.
Also, should we be passing pipeline_options instead of debug options? Perhaps it works either way but passing pipeline options would be cleaner since that's what the signature of _use_unified_worker() expects.
I think the code is structured in a way that a self.debug_options
is referenced every where and the reference to it can be different from options
passed in the method argument. So I think it will need a more thoughtful refactoring to move to change it to options in the right manner. Which I think we can differ.
@@ -916,6 +919,9 @@ def _use_unified_worker(pipeline_options): | |||
debug_options = pipeline_options.view_as(DebugOptions) | |||
use_unified_worker_flag = 'use_unified_worker' | |||
|
|||
if debug_options.lookup_experiment(use_unified_worker_flag): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that it would be cleaner to not modify experiments in _use_unified_worker
, since the name of this method does not suggest that it modifies the state, and it would also be easier to follow the logic. Also, if Dataflow runner relies on the fact that "use_runner_v2" experiment is set, it might be better to set it explicitly at some point, instead of relying on the fact that "_use_unified_worker" will be called. Leaving this up to you.
@@ -186,8 +186,13 @@ def __init__(self, packages, options, environment_version, pipeline_url): | |||
if job_type.startswith('FNAPI_'): | |||
self.debug_options = self.debug_options or DebugOptions() | |||
self.debug_options.experiments = self.debug_options.experiments or [] | |||
if not self.debug_options.lookup_experiment( | |||
'runner_harness_container_image'): | |||
if self.debug_options.lookup_experiment( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few thoughts here
- I think we cannot instantiate
DebugOptions()
inself.debug_options = self.debug_options or DebugOptions()
. We should create a view of existingoptions
. All views of pipeline options should be views of the same underlying object, and I think that line can break this invariant if we follow theor
branch. Do you know why we added it? - For the purpose of this PR it should be sufficient to pass
options
into_use_unified_worker()
in line 190.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. I think I added it because in certain case it was passed in test. I might be wrong though.
Makes sense. I will update the PR with both these suggestions.
if self.debug_options.lookup_experiment( | ||
'runner_harness_container_image') or _use_unified_worker( | ||
self.debug_options): | ||
# Don't add the default image overwrite if user overwrites or |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment repeats the logic; I thought you might want to explain why we don't want to pin the image for UW.
Consider: "In case of using UW, harness container should be defined by DF service unless explicitly overridden."
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updates the comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @tvalentyn
Updated the PR.
@@ -186,8 +186,13 @@ def __init__(self, packages, options, environment_version, pipeline_url): | |||
if job_type.startswith('FNAPI_'): | |||
self.debug_options = self.debug_options or DebugOptions() | |||
self.debug_options.experiments = self.debug_options.experiments or [] | |||
if not self.debug_options.lookup_experiment( | |||
'runner_harness_container_image'): | |||
if self.debug_options.lookup_experiment( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. I think I added it because in certain case it was passed in test. I might be wrong though.
Makes sense. I will update the PR with both these suggestions.
if self.debug_options.lookup_experiment( | ||
'runner_harness_container_image') or _use_unified_worker( | ||
self.debug_options): | ||
# Don't add the default image overwrite if user overwrites or |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updates the comment.
Thanks @tvalentyn |
Please add a meaningful description for your change here
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.