Skip to content

Support staging binary distributions (wheel files) of Beam SDK.#5110

Merged
aaltay merged 3 commits intoapache:masterfrom
tvalentyn:use_wheels_in_dataflow_runner
Apr 18, 2018
Merged

Support staging binary distributions (wheel files) of Beam SDK.#5110
aaltay merged 3 commits intoapache:masterfrom
tvalentyn:use_wheels_in_dataflow_runner

Conversation

@tvalentyn
Copy link
Copy Markdown
Contributor

@tvalentyn tvalentyn commented Apr 12, 2018

[BEAM-3950] Support staging binary (wheel) distributions of Beam SDK to Dataflow. This can help reduce worker startup time latency for Beam users on Dataflow runner.

This PR introduces two changes to current behavior:

  1. If --sdk_location points to a wheel, the staged SDK file name will not be renamed to dataflow_python_sdk.tar, instead original name will be used.
  2. If --sdk_location is not set, try to download and stage both SDK sources and SDK wheel from PyPi. Wheel file is downloaded and staged on a best effort - if PyPi does not have desired wheel, only the sources will be staged, and pipeline execution will continue.

Follow this checklist to help us incorporate your contribution quickly and easily:

  • Make sure there is a JIRA issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a JIRA issue. Your pull request should address just this issue, without pulling in other changes.
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue.
  • Write a pull request description that is detailed enough to understand:
    • What the pull request does
    • Why it does it
    • How it does it
    • Why this approach
  • Each commit in the pull request should have a meaningful subject line and body.
  • Run mvn clean verify to make sure basic checks pass. A more thorough check will be performed on your pull request automatically.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

@tvalentyn
Copy link
Copy Markdown
Contributor Author

R: @charlesccychen
cc: @aaltay

@tvalentyn tvalentyn changed the title Support staging binary distributions of Beam SDK as wheel files. Support staging binary distributions (wheel files) of Beam SDK. Apr 12, 2018
Copy link
Copy Markdown
Contributor

@charlesccychen charlesccychen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! This LGTM.

if os.path.exists(tgz_expected):
return tgz_expected
raise RuntimeError(
'Failed to download a distribution for the running SDK. Expected '
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we keep "source distribution", since the other if branch has a corresponding error mentioning "binary distribution"?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, done.

@tvalentyn
Copy link
Copy Markdown
Contributor Author

Thanks for such a prompt review, @charlesccychen.

@charlesccychen
Copy link
Copy Markdown
Contributor

Thanks Valentyn! LGTM.

R: @aaltay

"""
if sdk_location.endswith('.whl'):
if sdk_location.startswith('http'):
raise RuntimeError('Staging SDK wheel from an HTTP location is currently'
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_stage_beam_sdk is doing the staging. Do you want to check for this error there? It sounds like this method should be concerned about picking a good staging name only.

(Also, do we support reading from gcs locations?)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, this check does not belong here, and actually is useless, since HTTP downloader creates it's own file name after download:

local_download_file = os.path.join(to_folder, 'beam-sdk.tar.gz')

To support download wheels from HTTP we need to save the wheel file under it's original name. I was not sure if URL download path is used by anyone so didn't change that logic. I can add a TODO there if you think we need to support it.

Reading wheels from GCS is supported with this PR.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can update the JIRA, or open a new one. It is OK to not change the PR now.

raise RuntimeError('Staging SDK wheel from an HTTP location is currently'
'not supported.')
_, wheel_filename = FileSystems.split(sdk_location)
if (wheel_filename.startswith('apache_beam') or
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

google_cloud_dataflow does not have a wheel file (and will probably not have it in the future). We do not need to check for that.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point.

_, wheel_filename = FileSystems.split(sdk_location)
if (wheel_filename.startswith('apache_beam') or
wheel_filename.startswith('google_cloud_dataflow')):
return wheel_filename
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also check that wheel_filename has the right architecture name etc. in it that matches Dataflow workers?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My plan was to leave the decision to the worker. We also control the naming in another part of this module where we download the file.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is fine. But is a delayed error. Maybe a open a JIRA to reconsider this.

staged_path = FileSystems.join(google_cloud_options.staging_location,
names.DATAFLOW_SDK_TARBALL_FILE)
if stage_tarball_from_remote_location:
staged_path = FileSystems.join(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should staged_path be moved inside the else? I do not see it being used outside that branch.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I like that.

sdk_remote_location, staged_path)
local_download_file = _dependency_file_download(
sdk_remote_location, temp_dir)
staged_name = _desired_sdk_filename_in_staging_location(local_download_file)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would not this support staging a wheel file from an http location, since we just downloaded it above as a local file?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I said above, as long as we preserve the original wheel name, it will work.

'%s==%s' % (package_name, version),
'--no-binary', ':all:', '--no-deps']

logging.info('Executing command: %s', cmd_args)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to restructure this code to reduce duplication. For example as following?
Set common pip args
Optionally add binary related args
Executed common code (check_call ...)
Check the output

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, PTAL.


def _download_pypi_sdk_package(temp_dir):
def _download_pypi_sdk_package(temp_dir, fetch_binary=False,
language_version_tag='27',
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Except for the fetch_binary do you need the other args for now? For dataflow they should be fixed for the foreseeable future. Maybe the language version is an exception though.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should keep the other args here since this keeps the method general, while the wheel package is defined as all of these properties together. It is equivalent to hard-code these in the pip download method call, but I think it's a reasonable choice to keep these as defaults in kwargs.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.

staged_sdk_files.append(sdk_binary_staged_name)
except RuntimeError as e:
logging.warn('Failed to download requested binary distribution '
'of the SDK: %s', repr(e))
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason repr(e) does not keep the exception message that CalledProcessError contains:
This is what it looks like: WARNING:root:Failed to download requested binary distribution of the SDK: RuntimeError('CalledProcessError()',). There is a command line in the logs above, so it's clear what goes wrong, but I was wondering what is a better way of passing exception message here?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

traceback.format_exc() perhaps? Please do a follow up update or open a JIRA.

@yifanzou
Copy link
Copy Markdown
Contributor

This LGTM. Thanks @tvalentyn for fixing this problem!

@tvalentyn
Copy link
Copy Markdown
Contributor Author

@aaltay Do you have any other comments I should address before we can merge this?

@aaltay aaltay merged commit de6ea7e into apache:master Apr 18, 2018
@tvalentyn
Copy link
Copy Markdown
Contributor Author

Thanks for review & merge, @aaltay, @charlesccychen. Will follow up with remaining action items.

@tvalentyn tvalentyn deleted the use_wheels_in_dataflow_runner branch October 23, 2019 00:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants