Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 117 additions & 42 deletions sdks/python/apache_beam/runners/dataflow/internal/dependency.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import logging
import os
import shutil
import subprocess
import sys
import tempfile

Expand Down Expand Up @@ -373,17 +374,15 @@ def stage_job_resources(

if hasattr(setup_options, 'sdk_location'):
if setup_options.sdk_location == 'default':
stage_tarball_from_remote_location = True
stage_sdk_from_remote_location = True
elif (setup_options.sdk_location.startswith('gs://') or
setup_options.sdk_location.startswith('http://') or
setup_options.sdk_location.startswith('https://')):
stage_tarball_from_remote_location = True
stage_sdk_from_remote_location = True
else:
stage_tarball_from_remote_location = False
stage_sdk_from_remote_location = False

staged_path = FileSystems.join(google_cloud_options.staging_location,
names.DATAFLOW_SDK_TARBALL_FILE)
if stage_tarball_from_remote_location:
if stage_sdk_from_remote_location:
# If --sdk_location is not specified then the appropriate package
# will be obtained from PyPI (https://pypi.python.org) based on the
# version of the currently running SDK. If the option is
Expand All @@ -392,16 +391,17 @@ def stage_job_resources(
#
# Unit tests running in the 'python setup.py test' context will
# not have the sdk_location attribute present and therefore we
# will not stage a tarball.
# will not stage the SDK.
if setup_options.sdk_location == 'default':
sdk_remote_location = 'pypi'
else:
sdk_remote_location = setup_options.sdk_location
_stage_beam_sdk_tarball(sdk_remote_location, staged_path, temp_dir)
resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
resources.extend(
_stage_beam_sdk(sdk_remote_location,
google_cloud_options.staging_location, temp_dir))
else:
# Check if we have a local Beam SDK tarball present. This branch is
# used by tests running with the SDK built at head.
# This branch is also used by internal tests running with the SDK built
# at head.
if setup_options.sdk_location == 'default':
module_path = os.path.abspath(__file__)
sdk_path = os.path.join(
Expand All @@ -414,8 +414,13 @@ def stage_job_resources(
sdk_path = setup_options.sdk_location
if os.path.isfile(sdk_path):
logging.info('Copying Beam SDK "%s" to staging location.', sdk_path)
staged_path = FileSystems.join(
google_cloud_options.staging_location,
_desired_sdk_filename_in_staging_location(
setup_options.sdk_location))
file_copy(sdk_path, staged_path)
resources.append(names.DATAFLOW_SDK_TARBALL_FILE)
_, sdk_staged_filename = FileSystems.split(staged_path)
resources.append(sdk_staged_filename)
else:
if setup_options.sdk_location == 'default':
raise RuntimeError('Cannot find default Beam SDK tar file "%s"',
Expand Down Expand Up @@ -453,36 +458,81 @@ def _build_setup_package(setup_file, temp_dir, build_setup_args=None):
os.chdir(saved_current_directory)


def _stage_beam_sdk_tarball(sdk_remote_location, staged_path, temp_dir):
"""Stage a Beam SDK tarball with the appropriate version.
def _desired_sdk_filename_in_staging_location(sdk_location):
"""Returns the name that the SDK file should have in the staging location.

Args:
sdk_remote_location: A GCS path to a SDK tarball or a URL from
the file can be downloaded.
staged_path: GCS path where the found SDK tarball should be copied.
sdk_location: Full path to SDK file.
"""
if sdk_location.endswith('.whl'):
_, wheel_filename = FileSystems.split(sdk_location)
if wheel_filename.startswith('apache_beam'):
return wheel_filename
else:
raise RuntimeError('Unrecognized SDK wheel file: %s' % sdk_location)
else:
return names.DATAFLOW_SDK_TARBALL_FILE


def _stage_beam_sdk(sdk_remote_location, staging_location, temp_dir):
"""Stages a Beam SDK file with the appropriate version.

Args:
sdk_remote_location: A GCS path to a SDK file or a URL from which
the file can be downloaded. The SDK file can be a tarball or a wheel.
Set to 'pypi' to download and stage a wheel and source SDK from PyPI.
staging_location: A GCS bucket where the SDK file should be copied.
temp_dir: path to temporary location where the file should be downloaded.

Returns:
A list of SDK files that were staged to the staging location.

Raises:
RuntimeError: If wget on the URL specified returs errors or the file
cannot be copied from/to GCS.
RuntimeError: if staging was not successful.
"""
if (sdk_remote_location.startswith('http://') or
sdk_remote_location.startswith('https://')):
logging.info(
'Staging Beam SDK tarball from %s to %s',
sdk_remote_location, staged_path)
local_download_file = _dependency_file_download(
sdk_remote_location, temp_dir)
staged_name = _desired_sdk_filename_in_staging_location(local_download_file)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this support staging a wheel file from an HTTP location, since we just downloaded it above as a local file?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I said above, as long as we preserve the original wheel name, it will work.

staged_path = FileSystems.join(staging_location, staged_name)
logging.info(
'Staging Beam SDK from %s to %s',
sdk_remote_location, staged_path)
_dependency_file_copy(local_download_file, staged_path)
return [staged_name]
elif sdk_remote_location.startswith('gs://'):
# Stage the file to the GCS staging area.
staged_name = _desired_sdk_filename_in_staging_location(sdk_remote_location)
staged_path = FileSystems.join(staging_location, staged_name)
logging.info(
'Staging Beam SDK tarball from %s to %s',
'Staging Beam SDK from %s to %s',
sdk_remote_location, staged_path)
_dependency_file_copy(sdk_remote_location, staged_path)
return [staged_name]
elif sdk_remote_location == 'pypi':
logging.info('Staging the SDK tarball from PyPI to %s', staged_path)
_dependency_file_copy(_download_pypi_sdk_package(temp_dir), staged_path)
sdk_local_file = _download_pypi_sdk_package(temp_dir)
sdk_sources_staged_name = _desired_sdk_filename_in_staging_location(
sdk_local_file)
staged_path = FileSystems.join(staging_location, sdk_sources_staged_name)
logging.info('Staging SDK sources from PyPI to %s', staged_path)
_dependency_file_copy(sdk_local_file, staged_path)
staged_sdk_files = [sdk_sources_staged_name]
try:
# Stage binary distribution of the SDK, for now on a best-effort basis.
sdk_local_file = _download_pypi_sdk_package(temp_dir, fetch_binary=True)
sdk_binary_staged_name = _desired_sdk_filename_in_staging_location(
sdk_local_file)
staged_path = FileSystems.join(staging_location, sdk_binary_staged_name)
logging.info('Staging binary distribution of the SDK from PyPI to %s',
staged_path)
_dependency_file_copy(sdk_local_file, staged_path)
staged_sdk_files.append(sdk_binary_staged_name)
except RuntimeError as e:
logging.warn('Failed to download requested binary distribution '
'of the SDK: %s', repr(e))
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For some reason, repr(e) does not keep the exception message that CalledProcessError contains.
This is what it looks like: WARNING:root:Failed to download requested binary distribution of the SDK: RuntimeError('CalledProcessError()',). There is a command line in the logs above, so it's clear what goes wrong, but I was wondering what a better way of passing the exception message here would be?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

traceback.format_exc() perhaps? Please do a follow-up update or open a JIRA issue.


return staged_sdk_files
else:
raise RuntimeError(
'The --sdk_location option was used with an unsupported '
Expand Down Expand Up @@ -565,7 +615,11 @@ def get_sdk_package_name():
return BEAM_PACKAGE_NAME


def _download_pypi_sdk_package(temp_dir):
def _download_pypi_sdk_package(temp_dir, fetch_binary=False,
language_version_tag='27',
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apart from fetch_binary, do you need the other args for now? For Dataflow they should be fixed for the foreseeable future. The language version may be an exception, though.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should keep the other args here since this keeps the method general, while the wheel package is defined as all of these properties together. It is equivalent to hard-code these in the pip download method call, but I think it's a reasonable choice to keep these as defaults in kwargs.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.

language_implementation_tag='cp',
abi_tag='cp27mu',
platform_tag='manylinux1_x86_64'):
"""Downloads SDK package from PyPI and returns path to local path."""
package_name = get_sdk_package_name()
try:
Expand All @@ -574,23 +628,44 @@ def _download_pypi_sdk_package(temp_dir):
raise RuntimeError('Please set --sdk_location command-line option '
'or install a valid {} distribution.'
.format(package_name))

# Get a source distribution for the SDK package from PyPI.
cmd_args = [
_get_python_executable(), '-m', 'pip', 'download', '--dest', temp_dir,
'%s==%s' % (package_name, version),
'--no-binary', ':all:', '--no-deps']
'%s==%s' % (package_name, version), '--no-deps']

if fetch_binary:
logging.info('Downloading binary distribtution of the SDK from PyPi')
# Get a wheel distribution for the SDK from PyPI.
cmd_args.extend([
'--only-binary', ':all:', '--python-version', language_version_tag,
'--implementation', language_implementation_tag, '--abi', abi_tag,
'--platform', platform_tag])
# Example wheel: apache_beam-2.4.0-cp27-cp27mu-manylinux1_x86_64.whl
expected_files = [
os.path.join(
temp_dir,
'%s-%s-%s%s-%s-%s.whl' % (package_name.replace('-', '_'), version,
language_implementation_tag,
language_version_tag, abi_tag,
platform_tag))]
else:
logging.info('Downloading source distribtution of the SDK from PyPi')
cmd_args.extend(['--no-binary', ':all:'])
expected_files = [
os.path.join(temp_dir, '%s-%s.zip' % (package_name, version)),
os.path.join(temp_dir, '%s-%s.tar.gz' % (package_name, version))
]

logging.info('Executing command: %s', cmd_args)
processes.check_call(cmd_args)
zip_expected = os.path.join(
temp_dir, '%s-%s.zip' % (package_name, version))
if os.path.exists(zip_expected):
return zip_expected
tgz_expected = os.path.join(
temp_dir, '%s-%s.tar.gz' % (package_name, version))
if os.path.exists(tgz_expected):
return tgz_expected
try:
processes.check_call(cmd_args)
except subprocess.CalledProcessError as e:
raise RuntimeError(repr(e))

for sdk_file in expected_files:
if os.path.exists(sdk_file):
return sdk_file

raise RuntimeError(
'Failed to download a source distribution for the running SDK. Expected '
'either %s or %s to be found in the download folder.' % (
zip_expected, tgz_expected))
'Failed to download a distribution for the running SDK. '
'Expected either one of %s to be found in the download folder.' % (
expected_files))
Loading