From bef56a8915a35b7e1f52ae8e458ea8a589f6e445 Mon Sep 17 00:00:00 2001 From: Valentyn Tymofieiev Date: Wed, 11 Apr 2018 17:39:08 -0700 Subject: [PATCH 1/3] Support staging binary distributions of Beam SDK as wheel files. --- .../runners/dataflow/internal/dependency.py | 177 +++++++++++---- .../dataflow/internal/dependency_test.py | 210 ++++++++++++++---- 2 files changed, 295 insertions(+), 92 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py index 7e89ab391d8a..8917decb06b2 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py @@ -55,6 +55,7 @@ import logging import os import shutil +import subprocess import sys import tempfile @@ -373,17 +374,18 @@ def stage_job_resources( if hasattr(setup_options, 'sdk_location'): if setup_options.sdk_location == 'default': - stage_tarball_from_remote_location = True + stage_sdk_from_remote_location = True elif (setup_options.sdk_location.startswith('gs://') or setup_options.sdk_location.startswith('http://') or setup_options.sdk_location.startswith('https://')): - stage_tarball_from_remote_location = True + stage_sdk_from_remote_location = True else: - stage_tarball_from_remote_location = False + stage_sdk_from_remote_location = False - staged_path = FileSystems.join(google_cloud_options.staging_location, - names.DATAFLOW_SDK_TARBALL_FILE) - if stage_tarball_from_remote_location: + staged_path = FileSystems.join( + google_cloud_options.staging_location, + _desired_sdk_filename_in_staging_location(setup_options.sdk_location)) + if stage_sdk_from_remote_location: # If --sdk_location is not specified then the appropriate package # will be obtained from PyPI (https://pypi.python.org) based on the # version of the currently running SDK. If the option is @@ -392,16 +394,17 @@ def stage_job_resources( # # Unit tests running in the 'python setup.py test' context will # not have the sdk_location attribute present and therefore we - # will not stage a tarball. + # will not stage SDK. if setup_options.sdk_location == 'default': sdk_remote_location = 'pypi' else: sdk_remote_location = setup_options.sdk_location - _stage_beam_sdk_tarball(sdk_remote_location, staged_path, temp_dir) - resources.append(names.DATAFLOW_SDK_TARBALL_FILE) + resources.extend( + _stage_beam_sdk(sdk_remote_location, + google_cloud_options.staging_location, temp_dir)) else: - # Check if we have a local Beam SDK tarball present. This branch is - # used by tests running with the SDK built at head. + # This branch is also used by internal tests running with the SDK built + # at head. if setup_options.sdk_location == 'default': module_path = os.path.abspath(__file__) sdk_path = os.path.join( @@ -415,7 +418,8 @@ def stage_job_resources( if os.path.isfile(sdk_path): logging.info('Copying Beam SDK "%s" to staging location.', sdk_path) file_copy(sdk_path, staged_path) - resources.append(names.DATAFLOW_SDK_TARBALL_FILE) + _, sdk_staged_filename = FileSystems.split(staged_path) + resources.append(sdk_staged_filename) else: if setup_options.sdk_location == 'default': raise RuntimeError('Cannot find default Beam SDK tar file "%s"', @@ -453,36 +457,85 @@ def _build_setup_package(setup_file, temp_dir, build_setup_args=None): os.chdir(saved_current_directory) -def _stage_beam_sdk_tarball(sdk_remote_location, staged_path, temp_dir): - """Stage a Beam SDK tarball with the appropriate version. +def _desired_sdk_filename_in_staging_location(sdk_location): + """Returns the name that SDK file should have file in the staging location. Args: - sdk_remote_location: A GCS path to a SDK tarball or a URL from - the file can be downloaded. - staged_path: GCS path where the found SDK tarball should be copied. + sdk_location: Full path to SDK file. + """ + if sdk_location.endswith('.whl'): + if sdk_location.startswith('http'): + raise RuntimeError('Staging SDK wheel from an HTTP location is currently' + 'not supported.') + _, wheel_filename = FileSystems.split(sdk_location) + if (wheel_filename.startswith('apache_beam') or + wheel_filename.startswith('google_cloud_dataflow')): + return wheel_filename + else: + raise RuntimeError('Unrecognized SDK wheel file: %s' % sdk_location) + else: + return names.DATAFLOW_SDK_TARBALL_FILE + + +def _stage_beam_sdk(sdk_remote_location, staging_location, temp_dir): + """Stages a Beam SDK file with the appropriate version. + + Args: + sdk_remote_location: A GCS path to a SDK file or a URL from + the file can be downloaded. The SDK file can be a tarball or a wheel. + Set to 'pypi' to download and stage a wheel and source SDK from PyPi. + staging_location: A GCS bucket where the SDK file should be copied. temp_dir: path to temporary location where the file should be downloaded. + Returns: + A list of SDK files that were staged to the staging location. + Raises: - RuntimeError: If wget on the URL specified returs errors or the file - cannot be copied from/to GCS. + RuntimeError: if staging was not successful. """ if (sdk_remote_location.startswith('http://') or sdk_remote_location.startswith('https://')): - logging.info( - 'Staging Beam SDK tarball from %s to %s', - sdk_remote_location, staged_path) local_download_file = _dependency_file_download( sdk_remote_location, temp_dir) + staged_name = _desired_sdk_filename_in_staging_location(local_download_file) + staged_path = FileSystems.join(staging_location, staged_name) + logging.info( + 'Staging Beam SDK from %s to %s', + sdk_remote_location, staged_path) _dependency_file_copy(local_download_file, staged_path) + return [staged_name] elif sdk_remote_location.startswith('gs://'): # Stage the file to the GCS staging area. + staged_name = _desired_sdk_filename_in_staging_location(sdk_remote_location) + staged_path = FileSystems.join(staging_location, staged_name) logging.info( - 'Staging Beam SDK tarball from %s to %s', + 'Staging Beam SDK from %s to %s', sdk_remote_location, staged_path) _dependency_file_copy(sdk_remote_location, staged_path) + return [staged_name] elif sdk_remote_location == 'pypi': - logging.info('Staging the SDK tarball from PyPI to %s', staged_path) - _dependency_file_copy(_download_pypi_sdk_package(temp_dir), staged_path) + sdk_local_file = _download_pypi_sdk_package(temp_dir) + sdk_sources_staged_name = _desired_sdk_filename_in_staging_location( + sdk_local_file) + staged_path = FileSystems.join(staging_location, sdk_sources_staged_name) + logging.info('Staging SDK sources from PyPI to %s', staged_path) + _dependency_file_copy(sdk_local_file, staged_path) + staged_sdk_files = [sdk_sources_staged_name] + try: + # Stage binary distribution of the SDK, for now on a best-effort basis. + sdk_local_file = _download_pypi_sdk_package(temp_dir, fetch_binary=True) + sdk_binary_staged_name = _desired_sdk_filename_in_staging_location( + sdk_local_file) + staged_path = FileSystems.join(staging_location, sdk_binary_staged_name) + logging.info('Staging binary distribution of the SDK from PyPI to %s', + staged_path) + _dependency_file_copy(sdk_local_file, staged_path) + staged_sdk_files.append(sdk_binary_staged_name) + except RuntimeError as e: + logging.warn('Failed to download requested binary distribution ' + 'of the SDK: %s', repr(e)) + + return staged_sdk_files else: raise RuntimeError( 'The --sdk_location option was used with an unsupported ' @@ -565,7 +618,11 @@ def get_sdk_package_name(): return BEAM_PACKAGE_NAME -def _download_pypi_sdk_package(temp_dir): +def _download_pypi_sdk_package(temp_dir, fetch_binary=False, + language_version_tag='27', + language_implementation_tag='cp', + abi_tag='cp27mu', + platform_tag='manylinux1_x86_64'): """Downloads SDK package from PyPI and returns path to local path.""" package_name = get_sdk_package_name() try: @@ -574,23 +631,51 @@ def _download_pypi_sdk_package(temp_dir): raise RuntimeError('Please set --sdk_location command-line option ' 'or install a valid {} distribution.' .format(package_name)) - - # Get a source distribution for the SDK package from PyPI. - cmd_args = [ - _get_python_executable(), '-m', 'pip', 'download', '--dest', temp_dir, - '%s==%s' % (package_name, version), - '--no-binary', ':all:', '--no-deps'] - logging.info('Executing command: %s', cmd_args) - processes.check_call(cmd_args) - zip_expected = os.path.join( - temp_dir, '%s-%s.zip' % (package_name, version)) - if os.path.exists(zip_expected): - return zip_expected - tgz_expected = os.path.join( - temp_dir, '%s-%s.tar.gz' % (package_name, version)) - if os.path.exists(tgz_expected): - return tgz_expected - raise RuntimeError( - 'Failed to download a source distribution for the running SDK. Expected ' - 'either %s or %s to be found in the download folder.' % ( - zip_expected, tgz_expected)) + if fetch_binary: + # Get a wheel distribution for the SDK from PyPI. + cmd_args = [ + _get_python_executable(), '-m', 'pip', 'download', '--dest', temp_dir, + '%s==%s' % (package_name, version), + '--only-binary', ':all:', '--no-deps', + '--python-version', language_version_tag, + '--implementation', language_implementation_tag, '--abi', abi_tag, + '--platform', platform_tag + ] + logging.info('Executing command: %s', cmd_args) + try: + processes.check_call(cmd_args) + except subprocess.CalledProcessError as e: + raise RuntimeError(repr(e)) + # Example wheel: apache_beam-2.4.0-cp27-cp27mu-manylinux1_x86_64.whl + whl_expected = os.path.join( + temp_dir, + '%s-%s-%s%s-%s-%s.whl' % (package_name.replace('-', '_'), version, + language_implementation_tag, + language_version_tag, abi_tag, platform_tag) + ) + if os.path.exists(whl_expected): + return whl_expected + raise RuntimeError( + 'Failed to download requested binary distribution for the running SDK. ' + 'Expected %s to be found in the download folder.' % whl_expected) + else: + # Get a source distribution for the SDK package from PyPI. + cmd_args = [ + _get_python_executable(), '-m', 'pip', 'download', '--dest', temp_dir, + '%s==%s' % (package_name, version), + '--no-binary', ':all:', '--no-deps'] + + logging.info('Executing command: %s', cmd_args) + processes.check_call(cmd_args) + zip_expected = os.path.join( + temp_dir, '%s-%s.zip' % (package_name, version)) + if os.path.exists(zip_expected): + return zip_expected + tgz_expected = os.path.join( + temp_dir, '%s-%s.tar.gz' % (package_name, version)) + if os.path.exists(tgz_expected): + return tgz_expected + raise RuntimeError( + 'Failed to download a distribution for the running SDK. Expected ' + 'either %s or %s to be found in the download folder.' % ( + zip_expected, tgz_expected)) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py index 41afe0a8c5b1..70679a7dbaba 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py @@ -23,6 +23,8 @@ import tempfile import unittest +import mock + from apache_beam.io.filesystems import FileSystems from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions @@ -252,57 +254,106 @@ def test_setup_file_not_named_setup_dot_py(self): 'The --setup_file option expects the full path to a file named ' 'setup.py instead of ')) - def override_file_copy(self, expected_from_path, expected_to_dir): - def file_copy(from_path, to_path): - if not from_path.endswith(names.PICKLED_MAIN_SESSION_FILE): - self.assertEqual(expected_from_path, from_path) - self.assertEqual(FileSystems.join(expected_to_dir, - names.DATAFLOW_SDK_TARBALL_FILE), - to_path) - if from_path.startswith('gs://') or to_path.startswith('gs://'): - logging.info('Faking file_copy(%s, %s)', from_path, to_path) - else: - shutil.copyfile(from_path, to_path) - dependency._dependency_file_copy = file_copy - - def override_file_download(self, expected_from_url, expected_to_folder): - def file_download(from_url, _): - self.assertEqual(expected_from_url, from_url) - tarball_path = os.path.join(expected_to_folder, 'sdk-tarball') - with open(tarball_path, 'w') as f: - f.write('Some contents.') - return tarball_path - dependency._dependency_file_download = file_download - return os.path.join(expected_to_folder, 'sdk-tarball') - - def override_pypi_download(self, expected_from_url, expected_to_folder): - def pypi_download(_): - tarball_path = os.path.join(expected_to_folder, 'sdk-tarball') - with open(tarball_path, 'w') as f: - f.write('Some contents.') - return tarball_path - dependency._download_pypi_sdk_package = pypi_download - return os.path.join(expected_to_folder, 'sdk-tarball') + def build_fake_pip_download_command_handler(self, has_wheels): + """A stub for apache_beam.utils.processes.check_call that imitates pip. + + Args: + has_wheels: Whether pip fake should have a whl distribution of packages. + """ + + def pip_fake(args): + """Fakes fetching a package from pip by creating a temporary file. + + Args: + args: a complete list of command line arguments to invoke pip. + The fake is sensitive to the order of the arguments. + Supported commands: + + 1) Download SDK sources file: + python pip -m download --dest /tmp/dir apache-beam==2.0.0 \ + --no-binary :all: --no-deps + + 2) Download SDK binary wheel file: + python pip -m download --dest /tmp/dir apache-beam==2.0.0 \ + --no-binary :all: --no-deps --python-version 27 \ + --implementation cp --abi cp27mu --platform manylinux1_x86_64 + """ + package_file = None + if len(args) >= 8: + # package_name==x.y.z + if '==' in args[6]: + distribution_name = args[6][0:args[6].find('==')] + distribution_version = args[6][args[6].find('==')+2:] + + if args[7] == '--no-binary': + package_file = '%s-%s.zip' % ( + distribution_name, distribution_version) + elif args[7] == '--only-binary' and len(args) >= 18: + if not has_wheels: + # Imitate the case when desired wheel distribution is not in PyPI. + raise RuntimeError("No matching distribution.") + + # Per PEP-0427 in wheel filenames non-alphanumeric characters + # in distribution name are replaced with underscore. + distribution_name = distribution_name.replace('-', '_') + package_file = '%s-%s-%s%s-%s-%s.whl' % ( + distribution_name, distribution_version, + args[13], # implementation + args[11], # python version + args[15], # abi tag + args[17] # platform + ) + + assert package_file, "Pip fake does not support the command: " + str(args) + self.create_temp_file( + FileSystems.join(args[5], package_file), "SDK from PyPi.") + + return pip_fake def test_sdk_location_default(self): staging_dir = self.make_temp_dir() - expected_from_url = 'pypi' - expected_from_path = self.override_pypi_download( - expected_from_url, staging_dir) - self.override_file_copy(expected_from_path, staging_dir) - options = PipelineOptions() options.view_as(GoogleCloudOptions).staging_location = staging_dir self.update_options(options) options.view_as(SetupOptions).sdk_location = 'default' + with mock.patch('apache_beam.utils.processes.check_call', + self.build_fake_pip_download_command_handler( + has_wheels=False)): + staged_resources = dependency.stage_job_resources( + options, temp_dir=self.make_temp_dir()) + self.assertEqual( - [names.DATAFLOW_SDK_TARBALL_FILE], - dependency.stage_job_resources( - options, - file_copy=dependency._dependency_file_copy)) + [names.DATAFLOW_SDK_TARBALL_FILE], staged_resources) + + with open(os.path.join( + staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)) as f: + self.assertEqual(f.read(), 'SDK from PyPi.') - def test_sdk_location_local(self): + def test_sdk_location_default_with_wheels(self): + staging_dir = self.make_temp_dir() + + options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + options.view_as(SetupOptions).sdk_location = 'default' + + with mock.patch( + 'apache_beam.utils.processes.check_call', + self.build_fake_pip_download_command_handler(has_wheels=True)): + staged_resources = dependency.stage_job_resources( + options, + temp_dir=self.make_temp_dir()) + + self.assertTrue(len(staged_resources), 2) + self.assertEqual(staged_resources[0], names.DATAFLOW_SDK_TARBALL_FILE) + # Exact name depends on the version of the SDK. + self.assertTrue(staged_resources[1].endswith('whl')) + for name in staged_resources: + with open(os.path.join(staging_dir, name)) as f: + self.assertEqual(f.read(), 'SDK from PyPi.') + + def test_sdk_location_local_directory(self): staging_dir = self.make_temp_dir() sdk_location = self.make_temp_dir() self.create_temp_file( @@ -324,7 +375,47 @@ def test_sdk_location_local(self): with open(tarball_path) as f: self.assertEqual(f.read(), 'contents') - def test_sdk_location_local_not_present(self): + def test_sdk_location_local_source_file(self): + staging_dir = self.make_temp_dir() + sdk_directory = self.make_temp_dir() + sdk_filename = 'apache-beam-3.0.0.tar.gz' + sdk_location = os.path.join(sdk_directory, sdk_filename) + self.create_temp_file(sdk_location, 'contents') + + options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + options.view_as(SetupOptions).sdk_location = sdk_location + + self.assertEqual( + [names.DATAFLOW_SDK_TARBALL_FILE], + dependency.stage_job_resources(options)) + tarball_path = os.path.join( + staging_dir, names.DATAFLOW_SDK_TARBALL_FILE) + with open(tarball_path) as f: + self.assertEqual(f.read(), 'contents') + + def test_sdk_location_local_wheel_file(self): + staging_dir = self.make_temp_dir() + sdk_directory = self.make_temp_dir() + sdk_filename = 'apache_beam-1.0.0-cp27-cp27mu-manylinux1_x86_64.whl' + sdk_location = os.path.join(sdk_directory, sdk_filename) + self.create_temp_file(sdk_location, 'contents') + + options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + options.view_as(SetupOptions).sdk_location = sdk_location + + self.assertEqual( + [sdk_filename], + dependency.stage_job_resources(options)) + tarball_path = os.path.join( + staging_dir, sdk_filename) + with open(tarball_path) as f: + self.assertEqual(f.read(), 'contents') + + def test_sdk_location_local_directory_not_present(self): staging_dir = self.make_temp_dir() sdk_location = 'nosuchdir' with self.assertRaises(RuntimeError) as cm: @@ -343,16 +434,43 @@ def test_sdk_location_local_not_present(self): def test_sdk_location_gcs(self): staging_dir = self.make_temp_dir() sdk_location = 'gs://my-gcs-bucket/tarball.tar.gz' - self.override_file_copy(sdk_location, staging_dir) options = PipelineOptions() options.view_as(GoogleCloudOptions).staging_location = staging_dir self.update_options(options) options.view_as(SetupOptions).sdk_location = sdk_location - self.assertEqual( - [names.DATAFLOW_SDK_TARBALL_FILE], - dependency.stage_job_resources(options)) + with mock.patch('apache_beam.runners.dataflow.internal.' + 'dependency._dependency_file_copy'): + self.assertEqual( + [names.DATAFLOW_SDK_TARBALL_FILE], + dependency.stage_job_resources(options)) + + def test_sdk_location_http(self): + staging_dir = self.make_temp_dir() + sdk_location = 'http://storage.googleapis.com/my-gcs-bucket/tarball.tar.gz' + + options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + options.view_as(SetupOptions).sdk_location = sdk_location + + def file_download(_, to_folder): + tarball_path = os.path.join(to_folder, 'sdk-tarball') + with open(tarball_path, 'w') as f: + f.write('SDK from HTTP location.') + return tarball_path + + with mock.patch('apache_beam.runners.dataflow.internal.' + 'dependency._dependency_file_download', file_download): + self.assertEqual( + [names.DATAFLOW_SDK_TARBALL_FILE], + dependency.stage_job_resources(options)) + + tarball_path = os.path.join( + staging_dir, names.DATAFLOW_SDK_TARBALL_FILE) + with open(tarball_path) as f: + self.assertEqual(f.read(), 'SDK from HTTP location.') def test_with_extra_packages(self): staging_dir = self.make_temp_dir() From 9597df4d6283f3b29b377c69621603949b188bda Mon Sep 17 00:00:00 2001 From: Valentyn Tymofieiev Date: Wed, 11 Apr 2018 19:53:54 -0700 Subject: [PATCH 2/3] Make a few string literals more consistent. --- .../runners/dataflow/internal/dependency.py | 6 ++--- .../dataflow/internal/dependency_test.py | 26 +++++++++---------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py index 8917decb06b2..7cb8ef51d990 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py @@ -481,7 +481,7 @@ def _stage_beam_sdk(sdk_remote_location, staging_location, temp_dir): """Stages a Beam SDK file with the appropriate version. Args: - sdk_remote_location: A GCS path to a SDK file or a URL from + sdk_remote_location: A GCS path to a SDK file or a URL from which the file can be downloaded. The SDK file can be a tarball or a wheel. Set to 'pypi' to download and stage a wheel and source SDK from PyPi. staging_location: A GCS bucket where the SDK file should be copied. @@ -676,6 +676,6 @@ def _download_pypi_sdk_package(temp_dir, fetch_binary=False, if os.path.exists(tgz_expected): return tgz_expected raise RuntimeError( - 'Failed to download a distribution for the running SDK. Expected ' - 'either %s or %s to be found in the download folder.' % ( + 'Failed to download a source distribution for the running SDK. ' + 'Expected either %s or %s to be found in the download folder.' % ( zip_expected, tgz_expected)) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py index 70679a7dbaba..023e1ed4ee3d 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py @@ -291,7 +291,7 @@ def pip_fake(args): elif args[7] == '--only-binary' and len(args) >= 18: if not has_wheels: # Imitate the case when desired wheel distribution is not in PyPI. - raise RuntimeError("No matching distribution.") + raise RuntimeError('No matching distribution.') # Per PEP-0427 in wheel filenames non-alphanumeric characters # in distribution name are replaced with underscore. @@ -304,9 +304,9 @@ def pip_fake(args): args[17] # platform ) - assert package_file, "Pip fake does not support the command: " + str(args) + assert package_file, 'Pip fake does not support the command: ' + str(args) self.create_temp_file( - FileSystems.join(args[5], package_file), "SDK from PyPi.") + FileSystems.join(args[5], package_file), 'Package content.') return pip_fake @@ -328,7 +328,7 @@ def test_sdk_location_default(self): with open(os.path.join( staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)) as f: - self.assertEqual(f.read(), 'SDK from PyPi.') + self.assertEqual(f.read(), 'Package content.') def test_sdk_location_default_with_wheels(self): staging_dir = self.make_temp_dir() @@ -351,7 +351,7 @@ def test_sdk_location_default_with_wheels(self): self.assertTrue(staged_resources[1].endswith('whl')) for name in staged_resources: with open(os.path.join(staging_dir, name)) as f: - self.assertEqual(f.read(), 'SDK from PyPi.') + self.assertEqual(f.read(), 'Package content.') def test_sdk_location_local_directory(self): staging_dir = self.make_temp_dir() @@ -360,7 +360,7 @@ def test_sdk_location_local_directory(self): os.path.join( sdk_location, names.DATAFLOW_SDK_TARBALL_FILE), - 'contents') + 'Package content.') options = PipelineOptions() options.view_as(GoogleCloudOptions).staging_location = staging_dir @@ -373,14 +373,14 @@ def test_sdk_location_local_directory(self): tarball_path = os.path.join( staging_dir, names.DATAFLOW_SDK_TARBALL_FILE) with open(tarball_path) as f: - self.assertEqual(f.read(), 'contents') + self.assertEqual(f.read(), 'Package content.') def test_sdk_location_local_source_file(self): staging_dir = self.make_temp_dir() sdk_directory = self.make_temp_dir() sdk_filename = 'apache-beam-3.0.0.tar.gz' sdk_location = os.path.join(sdk_directory, sdk_filename) - self.create_temp_file(sdk_location, 'contents') + self.create_temp_file(sdk_location, 'Package content.') options = PipelineOptions() options.view_as(GoogleCloudOptions).staging_location = staging_dir @@ -393,14 +393,14 @@ def test_sdk_location_local_source_file(self): tarball_path = os.path.join( staging_dir, names.DATAFLOW_SDK_TARBALL_FILE) with open(tarball_path) as f: - self.assertEqual(f.read(), 'contents') + self.assertEqual(f.read(), 'Package content.') def test_sdk_location_local_wheel_file(self): staging_dir = self.make_temp_dir() sdk_directory = self.make_temp_dir() sdk_filename = 'apache_beam-1.0.0-cp27-cp27mu-manylinux1_x86_64.whl' sdk_location = os.path.join(sdk_directory, sdk_filename) - self.create_temp_file(sdk_location, 'contents') + self.create_temp_file(sdk_location, 'Package content.') options = PipelineOptions() options.view_as(GoogleCloudOptions).staging_location = staging_dir @@ -413,7 +413,7 @@ def test_sdk_location_local_wheel_file(self): tarball_path = os.path.join( staging_dir, sdk_filename) with open(tarball_path) as f: - self.assertEqual(f.read(), 'contents') + self.assertEqual(f.read(), 'Package content.') def test_sdk_location_local_directory_not_present(self): staging_dir = self.make_temp_dir() @@ -458,7 +458,7 @@ def test_sdk_location_http(self): def file_download(_, to_folder): tarball_path = os.path.join(to_folder, 'sdk-tarball') with open(tarball_path, 'w') as f: - f.write('SDK from HTTP location.') + f.write('Package content.') return tarball_path with mock.patch('apache_beam.runners.dataflow.internal.' @@ -470,7 +470,7 @@ def file_download(_, to_folder): tarball_path = os.path.join( staging_dir, names.DATAFLOW_SDK_TARBALL_FILE) with open(tarball_path) as f: - self.assertEqual(f.read(), 'SDK from HTTP location.') + self.assertEqual(f.read(), 'Package content.') def test_with_extra_packages(self): staging_dir = self.make_temp_dir() From ea6a9c8799470af2dfadd940d88cb7aed4234f1f Mon Sep 17 00:00:00 2001 From: Valentyn Tymofieiev Date: Fri, 13 Apr 2018 22:47:26 -0700 Subject: [PATCH 3/3] Address review comments. --- .../runners/dataflow/internal/dependency.py | 88 ++++++++----------- .../dataflow/internal/dependency_test.py | 26 ++++-- 2 files changed, 60 insertions(+), 54 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py index 7cb8ef51d990..e84579b3cbed 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py @@ -382,9 +382,6 @@ def stage_job_resources( else: stage_sdk_from_remote_location = False - staged_path = FileSystems.join( - google_cloud_options.staging_location, - _desired_sdk_filename_in_staging_location(setup_options.sdk_location)) if stage_sdk_from_remote_location: # If --sdk_location is not specified then the appropriate package # will be obtained from PyPI (https://pypi.python.org) based on the @@ -417,6 +414,10 @@ def stage_job_resources( sdk_path = setup_options.sdk_location if os.path.isfile(sdk_path): logging.info('Copying Beam SDK "%s" to staging location.', sdk_path) + staged_path = FileSystems.join( + google_cloud_options.staging_location, + _desired_sdk_filename_in_staging_location( + setup_options.sdk_location)) file_copy(sdk_path, staged_path) _, sdk_staged_filename = FileSystems.split(staged_path) resources.append(sdk_staged_filename) @@ -464,12 +465,8 @@ def _desired_sdk_filename_in_staging_location(sdk_location): sdk_location: Full path to SDK file. """ if sdk_location.endswith('.whl'): - if sdk_location.startswith('http'): - raise RuntimeError('Staging SDK wheel from an HTTP location is currently' - 'not supported.') _, wheel_filename = FileSystems.split(sdk_location) - if (wheel_filename.startswith('apache_beam') or - wheel_filename.startswith('google_cloud_dataflow')): + if wheel_filename.startswith('apache_beam'): return wheel_filename else: raise RuntimeError('Unrecognized SDK wheel file: %s' % sdk_location) @@ -631,51 +628,44 @@ def _download_pypi_sdk_package(temp_dir, fetch_binary=False, raise RuntimeError('Please set --sdk_location command-line option ' 'or install a valid {} distribution.' .format(package_name)) + cmd_args = [ + _get_python_executable(), '-m', 'pip', 'download', '--dest', temp_dir, + '%s==%s' % (package_name, version), '--no-deps'] + if fetch_binary: + logging.info('Downloading binary distribtution of the SDK from PyPi') # Get a wheel distribution for the SDK from PyPI. - cmd_args = [ - _get_python_executable(), '-m', 'pip', 'download', '--dest', temp_dir, - '%s==%s' % (package_name, version), - '--only-binary', ':all:', '--no-deps', - '--python-version', language_version_tag, + cmd_args.extend([ + '--only-binary', ':all:', '--python-version', language_version_tag, '--implementation', language_implementation_tag, '--abi', abi_tag, - '--platform', platform_tag - ] - logging.info('Executing command: %s', cmd_args) - try: - processes.check_call(cmd_args) - except subprocess.CalledProcessError as e: - raise RuntimeError(repr(e)) + '--platform', platform_tag]) # Example wheel: apache_beam-2.4.0-cp27-cp27mu-manylinux1_x86_64.whl - whl_expected = os.path.join( - temp_dir, - '%s-%s-%s%s-%s-%s.whl' % (package_name.replace('-', '_'), version, - language_implementation_tag, - language_version_tag, abi_tag, platform_tag) - ) - if os.path.exists(whl_expected): - return whl_expected - raise RuntimeError( - 'Failed to download requested binary distribution for the running SDK. ' - 'Expected %s to be found in the download folder.' % whl_expected) + expected_files = [ + os.path.join( + temp_dir, + '%s-%s-%s%s-%s-%s.whl' % (package_name.replace('-', '_'), version, + language_implementation_tag, + language_version_tag, abi_tag, + platform_tag))] else: - # Get a source distribution for the SDK package from PyPI. - cmd_args = [ - _get_python_executable(), '-m', 'pip', 'download', '--dest', temp_dir, - '%s==%s' % (package_name, version), - '--no-binary', ':all:', '--no-deps'] + logging.info('Downloading source distribtution of the SDK from PyPi') + cmd_args.extend(['--no-binary', ':all:']) + expected_files = [ + os.path.join(temp_dir, '%s-%s.zip' % (package_name, version)), + os.path.join(temp_dir, '%s-%s.tar.gz' % (package_name, version)) + ] - logging.info('Executing command: %s', cmd_args) + logging.info('Executing command: %s', cmd_args) + try: processes.check_call(cmd_args) - zip_expected = os.path.join( - temp_dir, '%s-%s.zip' % (package_name, version)) - if os.path.exists(zip_expected): - return zip_expected - tgz_expected = os.path.join( - temp_dir, '%s-%s.tar.gz' % (package_name, version)) - if os.path.exists(tgz_expected): - return tgz_expected - raise RuntimeError( - 'Failed to download a source distribution for the running SDK. ' - 'Expected either %s or %s to be found in the download folder.' % ( - zip_expected, tgz_expected)) + except subprocess.CalledProcessError as e: + raise RuntimeError(repr(e)) + + for sdk_file in expected_files: + if os.path.exists(sdk_file): + return sdk_file + + raise RuntimeError( + 'Failed to download a distribution for the running SDK. ' + 'Expected either one of %s to be found in the download folder.' % ( + expected_files)) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py index 023e1ed4ee3d..585cd234c886 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py @@ -271,11 +271,11 @@ def pip_fake(args): 1) Download SDK sources file: python pip -m download --dest /tmp/dir apache-beam==2.0.0 \ - --no-binary :all: --no-deps + --no-deps --no-binary :all: 2) Download SDK binary wheel file: python pip -m download --dest /tmp/dir apache-beam==2.0.0 \ - --no-binary :all: --no-deps --python-version 27 \ + --no-deps --no-binary :all: --python-version 27 \ --implementation cp --abi cp27mu --platform manylinux1_x86_64 """ package_file = None @@ -285,10 +285,10 @@ def pip_fake(args): distribution_name = args[6][0:args[6].find('==')] distribution_version = args[6][args[6].find('==')+2:] - if args[7] == '--no-binary': + if args[8] == '--no-binary': package_file = '%s-%s.zip' % ( distribution_name, distribution_version) - elif args[7] == '--only-binary' and len(args) >= 18: + elif args[8] == '--only-binary' and len(args) >= 18: if not has_wheels: # Imitate the case when desired wheel distribution is not in PyPI. raise RuntimeError('No matching distribution.') @@ -431,7 +431,7 @@ def test_sdk_location_local_directory_not_present(self): sdk_location, cm.exception.args[0]) - def test_sdk_location_gcs(self): + def test_sdk_location_gcs_source_file(self): staging_dir = self.make_temp_dir() sdk_location = 'gs://my-gcs-bucket/tarball.tar.gz' @@ -446,6 +446,22 @@ def test_sdk_location_gcs(self): [names.DATAFLOW_SDK_TARBALL_FILE], dependency.stage_job_resources(options)) + def test_sdk_location_gcs_wheel_file(self): + staging_dir = self.make_temp_dir() + sdk_filename = 'apache_beam-1.0.0-cp27-cp27mu-manylinux1_x86_64.whl' + sdk_location = 'gs://my-gcs-bucket/' + sdk_filename + + options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir + self.update_options(options) + options.view_as(SetupOptions).sdk_location = sdk_location + + with mock.patch('apache_beam.runners.dataflow.internal.' + 'dependency._dependency_file_copy'): + self.assertEqual( + [sdk_filename], + dependency.stage_job_resources(options)) + def test_sdk_location_http(self): staging_dir = self.make_temp_dir() sdk_location = 'http://storage.googleapis.com/my-gcs-bucket/tarball.tar.gz'