[BEAM-759] Implement wait_until_finish method for existing runners.#1762

Closed
aaltay wants to merge 6 commits into apache:python-sdk from aaltay:expand

Conversation

@aaltay
Member

@aaltay aaltay commented Jan 10, 2017

Implement wait_until_finish method for existing runners.

Also defines the not implemented cancel() method and updates existing
usages to use wait_until_finish() instead of blocking runners.

Main changes are in the runners/ folder
runner.py - has the APIs
dataflow_runner.py, direct_runner.py modified to implement the API (moving the existing blocking code around.)

The rest of the changes are mechanical, mainly converting
p.run() to p.run().wait_until_finish() in tests and examples. Tests were changed because they run validation after the run and need to block until completion. We may revert the changes in examples; I converted them because the instructions previously directed users to blocking runners, and this change keeps that behavior the same.

I have started a local post-commit run (not yet completed); it has been successful with the first few tests so far, and the changes are the same for all tests.

Remaining work after this PR:

  • Removing BlockingDataflowRunner. After this change it is no longer used by the SDK code, examples, or tests.
  • Support for the duration argument in wait_until_finish() is missing.
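
To make the API shape concrete, here is a hedged sketch of the PipelineResult surface this PR describes; the class body is illustrative, not the actual Beam code:

```python
# Hypothetical sketch of the PipelineResult API added in runner.py by this
# PR: run() returns a result object, wait_until_finish() blocks until the
# pipeline completes, and cancel() is declared but not implemented yet.
class PipelineResult(object):
    def __init__(self, state):
        self._state = state

    @property
    def state(self):
        return self._state

    def wait_until_finish(self, duration=None):
        # Blocks until a terminal state is reached. The duration argument
        # is listed as remaining work in the PR description.
        raise NotImplementedError

    def cancel(self):
        # Declared by this PR but not implemented for existing runners.
        raise NotImplementedError
```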

@aaltay aaltay changed the title Implement wait_until_finish method for existing runners. [BEAM-759] Implement wait_until_finish method for existing runners. Jan 10, 2017
@aaltay
Member Author

aaltay commented Jan 10, 2017

R: @charlesccychen


  # Actually run the pipeline (all operations above are deferred).
- return p.run()
+ return p.run().wait_until_finish()
Contributor

Here, we return the result. However, I don't think the result is actually used. Should we remove "return" for consistency?

Member Author

result is used to get aggregated_values. I am keeping the return.

  environment_version=self.environment_version).proto
  # TODO(silviuc): Remove the debug logging eventually.
- logging.info('JOB: %s', job)
+ # logging.info('JOB: %s', job)
Contributor

What is the intention here?

Member Author

Good catch, reverted. It was my local debugging change.

@aaltay
Member Author

aaltay commented Jan 10, 2017

Thank you @charlesccychen.

R: @robertwb

@charlesccychen
Contributor

Thanks, LGTM.

@asfbot

asfbot commented Jan 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6495/
--none--

@aaltay aaltay closed this Jan 10, 2017
@aaltay aaltay reopened this Jan 10, 2017
@asfbot

asfbot commented Jan 10, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6501/
--none--

@robertwb
Contributor

I think that requiring test authors to write wait_until_finish() is quite error prone. Perhaps we could make the test runner "blocking" by default (but parametrizable, of course). There is no contract that p.run() returns immediately, just that it can return early to provide a nicer API.

@aaltay
Member Author

aaltay commented Jan 11, 2017

@robertwb Thank you, I agree that it is easy to miss wait_until_finish() in tests.

I made the TestDataflowRunner blocking, but most tests also run with DirectRunner. Are you suggesting to make DirectRunner blocking by default?

@robertwb
Contributor

robertwb commented Jan 11, 2017 via email

@robertwb
Contributor

Sorry for the confusion, I see it's a TestPipeline, not TestRunner

I think the way to do this would be to override TestPipeline.run() to return super(TestPipeline, self).run().wait_until_finish(). We could make this behavior optional, but it should default to True.
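
A minimal sketch of that override pattern; apart from the TestPipeline name, the classes below are stand-ins, not the actual Beam classes:

```python
# Sketch of the suggested TestPipeline.run() override: wait by default,
# with an opt-out flag. Pipeline and FakeResult are hypothetical stand-ins;
# only the override pattern mirrors the suggestion.
class FakeResult(object):
    def __init__(self):
        self.waited = False

    def wait_until_finish(self):
        self.waited = True
        return self

class Pipeline(object):
    def run(self):
        return FakeResult()

class TestPipeline(Pipeline):
    def __init__(self, blocking=True):
        self._blocking = blocking  # defaults to True, as suggested

    def run(self):
        result = super(TestPipeline, self).run()
        if self._blocking:
            result.wait_until_finish()
        return result
```

With this in place, tests that just call p.run() get blocking behavior for free, and a test can still opt out via the flag.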

@charlesccychen
Contributor

Since we're on the topic, are we therefore explicitly making the decision that non-testing runners will not block on .run() by default? This makes the more common use case of .run() less intuitive.

@robertwb
Contributor

robertwb commented Jan 13, 2017 via email

@aaltay
Member Author

aaltay commented Jan 13, 2017

@robertwb PTAL.

I updated TestPipeline and tests using that. I had to rebase because of the other changes.

@asfbot

asfbot commented Jan 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6554/
--none--

Contributor

@robertwb robertwb left a comment

Mostly looks good. However, I think most of the tests should be converted to use TestPipeline rather than manually waiting.

  ('that', ((1, 'that'), )),
  ]))
- p.run()
+ p.run().wait_until_finish()
Contributor

Should this be updated to use the TestPipeline instead?

Member Author

Updated this and most other tests to use TestPipeline.

  lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
  | 'write' >> WriteToText(known_args.output))
- p.run()
+ p.run().wait_until_finish()
Contributor

Do we still need this? What happens if we don't have it? (I'd assume these are non-daemon threads we're starting, so the pipeline will still continue to run until complete before the process exits, right?)

Member Author

We do not need it, removed it.

DirectRunner uses non-daemon threads, and you are right: the pipeline continues to run until complete before the process exits.

DataflowRunner uses daemon threads. I kept the existing behavior, and there is a comment about why it is needed. Pipelines still continue to finish, though, because exiting from the runner after job submission does not cancel the job.
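
The daemon/non-daemon distinction discussed above can be illustrated with a small standalone example (this is generic Python threading behavior, not Beam code):

```python
# A non-daemon thread keeps the process alive until it finishes, while a
# daemon thread is killed when the main thread exits. This is why
# DirectRunner (non-daemon threads) finishes without an explicit wait,
# while DataflowRunner's daemon threads need one.
import threading

results = []

def work():
    results.append('done')

t = threading.Thread(target=work)
t.daemon = False   # non-daemon: the interpreter waits for it at exit
t.start()
t.join()           # explicit wait, analogous to wait_until_finish()
```

If t.daemon were True and the main thread exited without joining, the thread could be killed before appending its result.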


  # Actually run the pipeline (all operations above are deferred).
- p.run()
+ p.run().wait_until_finish()
Contributor

same

  # that is very small (VERY) given that we run at least 10 million trials.
  assert_that(result, in_between(3.13, 3.15))
- p.run()
+ p.run().wait_until_finish()
Contributor

same (and the rest of the examples below). Perhaps one example should demonstrate waiting...

  | beam.io.WriteToText(my_options.output))

- p.run()
+ result = p.run()
Contributor

Seems like a lot of these snippets could be updated to use TestPipeline, right? (At least as long as it's part of the setup/teardown code, not the actual snippet.) They're never run outside of a test...

Member Author

Updated most of them. I could not easily update some of these, where the pipeline creation was part of the snippet and was immediately followed by applying transforms inside the snippet.

Contributor

Looks good. I wonder if these should be part of the snippets (including constructing an empty PipelineOptions()) but we should address that as part of the documentation revamp.

Member Author

Ack. Maybe. At least some of them could be part of the snippets.

  beam.assert_that(small_but_nontrivial, beam.equal_to(['bb']),
                   label='small_but_not_trivial')
- p.run()
+ p.run().wait_until_finish()
Contributor

Same for these, we should be using TestPipeline.

  assert_that(pcoll, equal_to(range(1000)))

- pipeline.run()
+ pipeline.run().wait_until_finish()
Contributor

Any reason these io tests are not using TestPipeline as well?


  def _is_in_terminal_state(self):
    if not self.has_job:
      return True
Contributor

UNKNOWN is terminal? What if self.job gets assigned later? Or can that not happen?

Contributor

Bump.

Member Author

I am sorry I missed this in my previous reply.

It cannot happen. apiclient.DataflowApplicationClient.create_job either returns None (in case of errors and for template jobs) or returns a Job with a job id. It is not assigned later.
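
A sketch of the invariant described in this reply; the class and state names below are illustrative, not the actual Beam code:

```python
# The job handle is set exactly once at construction -- create_job returns
# either None (errors, template jobs) or a Job with an id -- so a missing
# job can safely be treated as terminal. Hypothetical class and state names.
class DataflowPipelineResult(object):
    def __init__(self, job):
        self._job = job  # assigned once; never set later

    @property
    def has_job(self):
        return self._job is not None

    def _is_in_terminal_state(self):
        if not self.has_job:
            # Nothing to poll (failed submission or template job), so the
            # result is considered terminal immediately.
            return True
        return self._job.state in ('JOB_STATE_DONE', 'JOB_STATE_FAILED',
                                   'JOB_STATE_CANCELLED')
```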

@@ -25,14 +25,16 @@
class TestDataflowRunner(DataflowRunner):
Contributor

All of this logic feels like it belongs in TestPipeline, not a subclass of DataflowRunner. @markflyhigh

Removed wait_until_finish except for a few exceptions:
- tfidf: as an example usage.
- some examples in cookbook - they run examples directly
  and, did not want to update the examples to use TestPipeline.
- some snippets - if the pipeline creation is part of the snippet
  and it was not easy to override.
@asfbot

asfbot commented Jan 15, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6560/
--none--

@aaltay
Member Author

aaltay commented Jan 15, 2017

Thank you @robertwb. I updated most of the existing tests to use TestPipeline and removed wait_until_finish() from them, except for a few places I could not easily clean up. PTAL.

@asfbot

asfbot commented Jan 15, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6561/
--none--

Contributor

@robertwb robertwb left a comment

Thanks. Just a few minor comments.

- p.run()
+ result = p.run()
  # [END pipelines_constructing_running]
+ result
Contributor

Is this just to silence lint? Could that be done more explicitly?

Member Author

@aaltay aaltay Jan 17, 2017

No, leftover code. Removed it. (Did a global search for other leftover code similar to this but found none.)


  def _is_in_terminal_state(self):
    if not self.has_job:
      return True
Contributor

Bump.


[testenv:py27]
deps=
nose
Contributor

Is this new?

Member Author

New. Running autocomplete_test outside the testing framework depends on this. (I think the other option is to add it as an install_requires package, since tox does setup.py install first. Adding it as tests_require was not enough.)

@aaltay
Member Author

aaltay commented Jan 17, 2017

Thank you @robertwb. PTAL.

Contributor

@robertwb robertwb left a comment

LGTM, thanks.

@asfbot

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/beam_PreCommit_Java_MavenInstall/6601/
--none--

asfgit pushed a commit that referenced this pull request Jan 18, 2017
@aaltay
Member Author

aaltay commented Jan 18, 2017

Thank you.

@aaltay aaltay closed this Jan 18, 2017