New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-8457] Label Dataflow jobs from Notebook #9885
Conversation
sdks/python/apache_beam/pipeline.py
Outdated
self.interactive = interactive | ||
elif (type(self.runner).__module__ | ||
== 'apache_beam.runners.interactive.interactive_runner' and | ||
type(self.runner).__name__ == 'InteractiveRunner'): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the difference from previous PR.
All runners are using "new-style" classes in Python, the type(obj).__module__/__name__
should always work. Please let me know if there would be backward incompatible cases.
Thanks!
74074cc
to
5c6f2d4
Compare
Run Python PreCommit |
3 similar comments
Run Python PreCommit |
Run Python PreCommit |
Run Python PreCommit |
R: @pabloem could you make a first review pass? |
This is pretty much the same PR as before, except it checks the runner via string matching. It LGTM. |
sdks/python/apache_beam/pipeline.py
Outdated
# A boolean value indicating whether the pipeline is created in an | ||
# interactive environment such as interactive notebooks. Initialized as | ||
# None. The value is set ad hoc when `pipeline.run()` is invoked. | ||
self.interactive = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe one more suggestion from the previous PR was to not keepting track of interactivity as part of the Pipeline but making it a property of the runner.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks!
If we track interactive
as a property of runner, we cannot implicitly pass along the property from runner to runner.
And if we deduce interactive
from the environment, we'll introduce new dependencies into DataflowRunner. See below comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll go with the check environment route and make it a standalone utility module in the interactive package.
sdks/python/apache_beam/pipeline.py
Outdated
def run(self, test_runner_api=True): | ||
"""Runs the pipeline. Returns whatever our runner returns after running.""" | ||
|
||
def run(self, test_runner_api=True, runner=None, options=None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we adding runner and options parameters here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIRC, we want to allow the user to switch to DataflowRunner
using the p.run()
pattern instead of limiting the user to Runner().run_pipeline(p, options)
.
Do you think we should put it into a separate PR, or simply not supporting it at all?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think we should put it into a separate PR,
Yes. At least let's have a seperate discussion for API changes like this.
or simply not supporting it at all?
Maybe not. This could be just a override for the interactive runners run() (e.g. run_with(NewRunner, NewOptions). At least, let's discuss with all stakeholders.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Putting this discussion on next Monday's agenda and will remove changes to the API.
@@ -360,6 +360,16 @@ def visit_transform(self, transform_node): | |||
|
|||
def run_pipeline(self, pipeline, options): | |||
"""Remotely executes entire pipeline or parts reachable from node.""" | |||
# Label goog-dataflow-notebook if pipeline is initiated from interactive | |||
# runner. | |||
if pipeline.interactive: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change could be limited to:
- here detect, whether we are in interactive environment or not. (For example, check whether certain imports are loaded?) Could also be a utility method somewhere, to check whether this in an interactive method.
- If yes, add the labels.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see your point! Yes, I have the capability to check if current interpreted code is in a notebook or not. This branch will need a rebase against master to take those changes.
To clarify the process:
When a DataflowRunner tries to run a job from a given pipeline,
- Check if the module
interactive_environment
is imported by checking thesys.modules
dictionary; - Check if
current_env().is_in_notebook
; - If yes, label the job.
I think we have a little bit trade off here:
- What we have here: Determining if the job is started from a pipeline that was originally bundled with an Interactive Runner.
Doing it with string comparison, we don't introduce new dependency into DataflowRunner. - Deduce if the job is started from a notebook environment.
We'll introduce [interactive] dependencies including at least ipython into DataflowRunner.
This will label Dataflow jobs from any pipeline originally bundled with arbitrary runner in any kind of ipython-notebook as long asinteractive_environment
module ininteractive
package has been (transitively) imported but not necessarily used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we move https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/interactive_environment.py#L100:L102 to a common utility function, and each runner if they want could call this without worrying about require additional imports?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Discussed with David and Sam. Since we also want to track jobs started from notebook even if the user never uses InteractiveRunner
, checking the environment might just be the only way to do it.
By putting the logic into a try-except block as it is, we could avoid introducing ipython
dependency into DataflowRunner
. If the [interactive]
dependency is never installed and current execution_path has never imported ipython
, the code would just never be executed.
I'll move the logic into a standalone utility module and import it in DataflowRunner to do the check.
1. Changed the pipeline.run() API to allow a runner and an option parameter so that a pipeline initially bundled w/ an interactive runner can be directly run by other runners from notebook. 2. Implicitly added the necessary source information through user labels when the user does p.run(runner=DataflowRunner(), options=options) or DataflowRunner().run_pipeline(p, options). 3. User '--labels' doesn't support character '.' or '"'. When applying version related label, replace '.' w/ '_'. Avoid enclosing any label with '"'.
b1b87e8
to
e0aa5ba
Compare
Run Portable_Python PreCommit |
… to a standalone utility module.
Run Portable_Python PreCommit |
Run Portable_Python PreCommit |
self._is_in_notebook = True | ||
except ImportError: | ||
pass | ||
self._is_in_ipython, self._is_in_notebook = is_interactive() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Conventionally, is_xxx
functions return a boolean. Returning a pair will be especially surprising if one writes statements like if is_interactive()
and the return value is (False, False)
(which as a non-zero-length tuple evaluates to True
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Roger, will make it into 2 separate APIs.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).[BEAM-XXX] Fixes bug in ApproximateQuantiles
, where you replaceBEAM-XXX
with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.See the Contributor Guide for more tips on how to make review process smoother.
Post-Commit Tests Status (on master branch)
Pre-Commit Tests Status (on master branch)
See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.