
[BEAM-3074] Stage the pipeline in Python DataflowRunner #4010

Merged: 1 commit into apache:master, Oct 27, 2017

Conversation

@kennknowles (Member) commented Oct 18, 2017

Follow this checklist to help us incorporate your contribution quickly and easily:

  • Make sure there is a JIRA issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
  • Each commit in the pull request should have a meaningful subject line and body.
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue.
  • Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • Run mvn clean verify to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

R: @aaltay (or redirect to another appropriate Python reviewer?)

Just hacked this out naively; it probably isn't respecting abstractions quite right. I confirmed enough that the file is staged - much simpler than Java 👍. Also no tests 🥇.

In doing a manual smoke test, I just tried to follow some combination of the quickstart plus the contribution guide, but broke during staging because pip install --download doesn't like that I did pip install -e .[gcp]. Is there a doc that has the steps for a new contributor to run wordcount with local modifications? I'm a bit rusty on the approved way of setting up the virtualenv. The crash occurs after the pipeline is staged, so I was able to check the basics anyhow.

@aaltay (Member) left a comment

kennknowles wrote:

> Is there a doc that has the steps for a new contributor to run wordcount with local modifications?

Unfortunately we do not have a page that explains how to get started with Python development in Beam. Filed https://issues.apache.org/jira/browse/BEAM-3075 for this.

@@ -304,6 +311,11 @@ def run(self, pipeline):
self.dataflow_client = apiclient.DataflowApplicationClient(
pipeline._options)

# Upload the original proto for the pipeline
self.dataflow_client.stage_file(self.job.google_cloud_options.staging_location,
@aaltay (Member):

Could you do this within stage_job_resources (the existing resources = dependency.stage_job_resources(...) call)?

@kennknowles (Member, Author):

You want me to plumb the original pipeline through to there? It isn't part of the job.

@aaltay (Member):

I am conflicted about what to do here. I do not think we should plumb the pipeline through, but at the same time I want all staging to happen in a single location.

You can leave a TODO here to figure out what to do about it. Perhaps we can find a middle ground: a list of additional files/buffers that need to be staged could be passed to the dataflow_client.
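One hypothetical shape for that middle ground; nothing here exists in the codebase (extra_staged_files, pipeline, and options are all illustrative), it only sketches handing extra in-memory files to the existing staging path instead of plumbing the pipeline object through:

    # Hypothetical only: stage_job_resources has no such parameter; this
    # just illustrates staging extra buffers alongside the job resources.
    pipeline_bytes = pipeline.to_runner_api().SerializeToString()
    resources = dependency.stage_job_resources(
        options,
        extra_staged_files=[('pipeline.pb', pipeline_bytes)])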

@@ -79,6 +81,8 @@ class DataflowRunner(PipelineRunner):
CreatePTransformOverride(),
]

_STAGED_PIPELINE_FILENAME = "pipeline.pb"
@aaltay (Member):

Could you move this to runners/dataflow/internal/names.py?

@kennknowles (Member, Author):

Done
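For reference, after the move the constant would live in names.py along these lines; the post-move identifier is an assumption (the diff above shows the pre-move spelling):

    # apache_beam/runners/dataflow/internal/names.py (assumed final name)
    STAGED_PIPELINE_FILENAME = 'pipeline.pb'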

@kennknowles force-pushed the py-stage-pipeline branch 2 times, most recently from e458703 to 2d82eb3 on October 19, 2017 17:50
@kennknowles (Member, Author):

I actually forgot the most important part: telling the runner harness where to find the pipeline. This is now mostly a total rewrite, so I rebased anyhow; please take a fresh look.

  • The pipeline being "staged" is actually the DataflowRunner talking to the runner harness, so it isn't really part of the other staging logic, which is the SDK/user pipeline talking to the SDK harness. The staging location just happens to be a convenient place where we know the client can write and the runner harness can read. So I went back to using the raw stage_file method (see the sketch after this list), but I am happy to use more appropriate helper logic if it exists.
  • Setting the metadata "pipeline_url" to this proto's location is the common piece across SDKs. Everyone needs to put the pipeline somewhere accessible to the runner harness, but each SDK can choose where and set this field accordingly. It is a mandatory part of every job from now on.
  • It looks like the stage_file method returns nothing when staging to a local file, or the full GCS object (the upload response) when staging to GCS. Normally I would set the metadata field to a URL returned by stage_file, but I didn't want to mess with that, so the URL is hardcoded.
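To make the above concrete, here is a minimal sketch of the staging step. It assumes stage_file(path, name, stream) as in the diff quoted earlier; the helper name, the constant name, and the returned-URL convention are illustrative, not the PR's exact code:

    from io import BytesIO

    # Assumed constant, as moved into runners/dataflow/internal/names.py;
    # the diff above shows the pre-move name _STAGED_PIPELINE_FILENAME.
    STAGED_PIPELINE_FILENAME = 'pipeline.pb'

    def stage_pipeline_proto(dataflow_client, staging_location, pipeline):
        # Serialize the user pipeline to its portable proto representation.
        proto = pipeline.to_runner_api()
        # The staging location is just a place the client can write and the
        # runner harness can read; stage_file mirrors the call in the diff.
        dataflow_client.stage_file(
            staging_location,
            STAGED_PIPELINE_FILENAME,
            BytesIO(proto.SerializeToString()))
        # The caller records this URL as the job's "pipeline_url" metadata,
        # so the runner harness knows where to fetch the staged pipeline.
        return '%s/%s' % (staging_location, STAGED_PIPELINE_FILENAME)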

@kennknowles (Member, Author):

Incidentally, my formatting may or may not matter to you. I did some default IntelliJ autoformatting, because even though there is a yapf formatter, I think it only works on whole files. If you have a recommended tool or manual process, I am happy to oblige.

@aaltay (Member) left a comment

LGTM.

If you prefer, you can modify stage_file to return something in the local-file case; I do not think we use its return value anywhere.

For formatting I do not use a special tool. As long as it complies with the linter it is good.
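A sketch of the suggested stage_file tweak. This is a simplified, local-only stand-in, not the real apiclient implementation (which also handles GCS uploads and returns the upload response in that case); the signature here is an assumption:

    import os
    import shutil

    def stage_file(local_path, file_name, stream):
        # Write the stream next to the other staged artifacts.
        target = os.path.join(local_path, file_name)
        with open(target, 'wb') as f:
            shutil.copyfileobj(stream, f)
        # The suggested tweak: return something useful in the local-file
        # case (here, the written path) instead of None.
        return target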

@@ -40,7 +40,7 @@
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import WorkerOptions
from apache_beam.runners.dataflow.internal import dependency
from apache_beam.runners.dataflow.internal import dependency, names
@aaltay (Member):

Could you break this into two lines?

@kennknowles (Member, Author):

Done
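The resulting imports, split as requested (replacing the combined line in the diff above):

    from apache_beam.runners.dataflow.internal import dependency
    from apache_beam.runners.dataflow.internal import names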

@kennknowles (Member, Author):

Thanks! I'll wait for the Python precommit to pass then self-merge.

@kennknowles (Member, Author):

The failure looks like something in the fn-api runner that I've seen before.

@kennknowles (Member, Author):

run python precommit

@kennknowles (Member, Author):

Hmm, I either failed to push or wiped out my changes to apiclient_test.py.

@kennknowles (Member, Author):

retest this please

@kennknowles force-pushed the py-stage-pipeline branch 2 times, most recently from 8893d11 to 97a3a58 on October 25, 2017 01:37
@kennknowles (Member, Author):

I believe the remaining failure in the Python precommit is that HEAD is broken. This can wait.

@aaltay (Member) commented Oct 25, 2017

@kennknowles tests are failing due to https://issues.apache.org/jira/browse/BEAM-3040, you can try re-running or running tox locally. It is unfortunate that we are blocking PRs on flaky tests.

@kennknowles (Member, Author):

Yes. I think that, based on a personal judgment of a change's risk, we can sometimes move forward after careful manual validation.

@kennknowles (Member, Author):

retest this please

@kennknowles (Member, Author):

run python precommit

@kennknowles (Member, Author):

OK, anyhow the unit tests for Python are g2g. Gonna merge.

@aaltay (Member) commented Oct 27, 2017

Sounds good. Thank you!

@asfgit merged commit 7d59c96 into apache:master on Oct 27, 2017
asfgit pushed a commit that referenced this pull request on Oct 27, 2017:

  Stage the pipeline in Python DataflowRunner
@kennknowles deleted the py-stage-pipeline branch on April 25, 2018