[BEAM-8457] Label Dataflow jobs from Notebook #9854
R: @pabloem |
sdks/python/apache_beam/pipeline.py
What if either runner or options are not provided? Should that throw an error? Currently, if only one is provided, it'll be ignored - and that would be quite surprising for users.
You're right! This will surprise the user. I've changed it to throw an error if only one of the two is provided, instead of silently ignoring the input.
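A minimal sketch of the "both or neither" check discussed above. The helper name `resolve_run_args` and its parameters are hypothetical and are not taken from the Beam code base; it only illustrates the validation logic.

```python
def resolve_run_args(default_runner, default_options, runner=None, options=None):
    """Pick the (runner, options) pair a run() call should use.

    Hypothetical helper: the names here are assumptions for illustration only.
    """
    # Exactly one of the two overrides was given: fail loudly instead of ignoring it.
    if (runner is None) != (options is None):
        raise ValueError(
            'runner and options must be provided together, or not at all.')
    # No override: fall back to what the pipeline was constructed with.
    if runner is None:
        return default_runner, default_options
    # Both overrides were given: use them as a pair.
    return runner, options
```

Raising here means a call that passes only a runner fails immediately, rather than quietly running on the original runner.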
This seems fine - but what if we go with the runner_in_use codepath? Would users do: p.run(runner=InteractiveRunner(DataflowRunner()), options=...)? Or would users create a pipeline with InteractiveRunner and then do p.run(runner=DataflowRunner()...? Is it possible for users to do p = beam.Pipeline(), and then do InteractiveRunner().run_pipeline(p)/InteractiveRunner(DataflowRunner()).run_pipeline(p)?
IIUC users would have to pass the interactive runner in p = beam.Pipeline() to activate the interactive mode, right? InteractiveRunner is not automatically selected?
I've missed the path where a new Pipeline is created and run() is invoked again.
Yes, all of these would be possible.
I've added an interactive parameter at the constructor level for Pipeline, with a default value of None. run() and from_runner_api() will pass the None or bool value down no matter how the user chains the runners. I'm not very confident about the naming, but the change should be backward compatible for Beam.
Currently, I'm running into a problem when testing. Once I set labels, the Dataflow job fails immediately with an "Error processing pipeline." error. There is no job graph, no worker is started, and there are no logs. It looks like when there is a user label in the job request, Dataflow cannot convert the work item into its internal representation.
I'll do some investigation and figure out why.
7caba45 to
a9de48a
Compare
1. Changed the pipeline.run() API to allow a runner and an option parameter so that a pipeline initially bundled w/ an interactive runner can be directly run by other runners from notebook.
2. Implicitly added the necessary source information through user labels when the user does p.run(runner=DataflowRunner(), options=options) or DataflowRunner().run_pipeline(p, options).
3. User '--labels' doesn't support character '.' or '"'. When applying version related label, replace '.' w/ '_'. Avoid enclosing any label with '"'.
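The label rule in the commit message above can be sketched as follows. The function names and the `beam_sdk_version` key are hypothetical, chosen only for illustration; the two character restrictions are the ones stated in the commit message.

```python
def sanitize_label_value(value):
    """Make a value safe to use in a '--labels' entry.

    Per the commit message, '.' and '"' are not supported, so '.' becomes '_'
    and double quotes are dropped.
    """
    return str(value).replace('.', '_').replace('"', '')


def build_label(key, value):
    """Build a 'key=value' label string (hypothetical format for this sketch)."""
    # Only the value is sanitized here; the key is assumed to be valid already.
    return '{}={}'.format(key, sanitize_label_value(value))


if __name__ == '__main__':
    # 'beam_sdk_version' is a made-up key, not a label Beam is known to emit.
    print(build_label('beam_sdk_version', '2.17.0'))
```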
sdks/python/apache_beam/pipeline.py
| self._options).run(False) | ||
| runner_in_use, | ||
| options_in_use, | ||
| interactive=self.interactive).run(False) |
Did you find that this was necessary? I don't think we should change the signature of the from_runner_api call. The pipeline protobuf should contain all the necessary information... Though I'd defer to @robertwb on this.
No, it's not necessary. On second thought, I can just move the logic that determines interactivity into run() and make interactive a parameter of the run() method. Then I don't even need to change the Pipeline constructor.
Also, I've added this to the interactive_runner for the use case below:
InteractiveRunner(underlying_runner=DataflowRunner()).run_pipeline(Pipeline(DataflowRunner()))
Thanks Ning
Let's roll this back. I do not think we should be importing interactive in pipeline.py. Also, my understanding is that the runner will keep track of the interactivity, not the pipeline.
… from Notebook" This reverts commit 1a8391d.
…w jobs from Notebook" This reverts commit 1a8391d.
+1. We should not be importing the interactive runner (it's causing problems with tests as well), and interactivity should not be a property of the pipeline, but of the runner (and I'd prefer a design that avoids passing an interactive bit around everywhere).
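One way to read this suggestion, as a rough sketch only: the interactive runner wraps an underlying runner and attaches the notebook label itself, so neither the pipeline nor any other call needs an interactive flag. Both classes and the `source=notebook` label are hypothetical stand-ins, not Beam's InteractiveRunner or DataflowRunner.

```python
class FakeBatchRunner:
    """Stand-in for a non-interactive runner; it only records what it was given."""

    def run_pipeline(self, pipeline, options):
        return {'pipeline': pipeline, 'labels': list(options.get('labels', []))}


class FakeInteractiveRunner:
    """Stand-in for an interactive runner that delegates to another runner."""

    def __init__(self, underlying_runner):
        self._underlying_runner = underlying_runner

    def run_pipeline(self, pipeline, options):
        # Copy the options so the caller's dict is left untouched.
        labelled = dict(options)
        # The runner knows it is interactive, so it adds the label itself.
        labelled['labels'] = list(options.get('labels', [])) + ['source=notebook']
        return self._underlying_runner.run_pipeline(pipeline, labelled)
```

With this shape, running the same pipeline directly on the batch runner produces no notebook label, while running it through the wrapper does.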