From 2e0fc969a7f9bc89972f484bc4878e673777eea5 Mon Sep 17 00:00:00 2001
From: Sourabh Bajaj
Date: Wed, 31 May 2017 17:54:17 -0700
Subject: [PATCH] [BEAM-1585] Add ability to attach a bootstrap script to the pipeline

---
 .../apache_beam/options/pipeline_options.py   |  7 ++++
 .../runners/dataflow/internal/dependency.py   | 16 +++++++++
 .../dataflow/internal/dependency_test.py      | 35 +++++++++++++++++++
 sdks/python/tox.ini                           |  2 ++
 4 files changed, 60 insertions(+)

diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 777926ab5297..043c081f99e2 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -534,6 +534,13 @@ def _add_argparse_args(cls, parser):
          'workers during startup. The cache is refreshed as needed '
          'avoiding extra downloads for existing packages. Typically the '
          'file is named requirements.txt.'))
+    parser.add_argument(
+        '--bootstrap_script',
+        default=None,
+        help=
+        ('Bootstrap the python process before executing any code by executing '
+         'this script, the user can add variables or imports to the global '
+         'scope here.'))
     parser.add_argument(
         '--requirements_cache',
         default=None,
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
index e69c8d7dac50..1024fe9e8eed 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency.py
@@ -78,6 +78,7 @@
 # Standard file names used for staging files.
 WORKFLOW_TARBALL_FILE = 'workflow.tar.gz'
 REQUIREMENTS_FILE = 'requirements.txt'
+BOOTSTRAP_SCRIPT = 'bootstrap_script.py'
 EXTRA_PACKAGES_FILE = 'extra_packages.txt'
 
 GOOGLE_PACKAGE_NAME = 'google-cloud-dataflow'
@@ -311,6 +312,21 @@ def stage_job_resources(
                                        os.path.basename(pkg)))
       resources.append(os.path.basename(pkg))
 
+  # Stage a bootstrap script file if present.
+  if setup_options.bootstrap_script is not None:
+    if not os.path.isfile(setup_options.bootstrap_script):
+      raise RuntimeError('The file %s cannot be found. It was specified in the '
+                         '--bootstrap_script command line option.' %
+                         setup_options.bootstrap_script)
+    if not setup_options.bootstrap_script.endswith('.py'):
+      raise RuntimeError('The --bootstrap_script option expects the '
+                         'full path to a .py file instead of %s' %
+                         setup_options.bootstrap_script)
+    staged_path = FileSystems.join(google_cloud_options.staging_location,
+                                   BOOTSTRAP_SCRIPT)
+    file_copy(setup_options.bootstrap_script, staged_path)
+    resources.append(BOOTSTRAP_SCRIPT)
+
   # Handle a setup file if present.
   # We will build the setup package locally and then copy it to the staging
   # location because the staging location is a GCS path and the file cannot be
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
index 5eac7d608010..2ea13b723dc4 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/dependency_test.py
@@ -237,6 +237,41 @@ def test_setup_file_not_named_setup_dot_py(self):
             'The --setup_file option expects the full path to a file named '
             'setup.py instead of '))
 
+  def test_with_bootstrap_script(self):
+    try:
+      staging_dir = tempfile.mkdtemp()
+      source_dir = tempfile.mkdtemp()
+
+      options = PipelineOptions()
+      options.view_as(GoogleCloudOptions).staging_location = staging_dir
+      self.update_options(options)
+      options.view_as(SetupOptions).bootstrap_script = os.path.join(
+          source_dir, dependency.BOOTSTRAP_SCRIPT)
+      self.create_temp_file(
+          os.path.join(source_dir, dependency.BOOTSTRAP_SCRIPT), 'nothing')
+      self.assertEqual(
+          [dependency.BOOTSTRAP_SCRIPT],
+          dependency.stage_job_resources(options))
+      self.assertTrue(
+          os.path.isfile(
+              os.path.join(staging_dir, dependency.BOOTSTRAP_SCRIPT)))
+    finally:
+      shutil.rmtree(staging_dir)
+      shutil.rmtree(source_dir)
+
+  def test_bootstrap_script_not_present(self):
+    staging_dir = tempfile.mkdtemp()
+    with self.assertRaises(RuntimeError) as cm:
+      options = PipelineOptions()
+      options.view_as(GoogleCloudOptions).staging_location = staging_dir
+      self.update_options(options)
+      options.view_as(SetupOptions).bootstrap_script = 'nosuchfile'
+      dependency.stage_job_resources(options)
+    self.assertEqual(
+        cm.exception.message,
+        'The file %s cannot be found. It was specified in the '
+        '--bootstrap_script command line option.' % 'nosuchfile')
+
   def override_file_copy(self, expected_from_path, expected_to_dir):
     def file_copy(from_path, to_path):
       if not from_path.endswith(names.PICKLED_MAIN_SESSION_FILE):
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 2166f6a4ee80..eff91fece986 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -29,6 +29,7 @@ select = E3
 deps =
   nose==1.3.7
   grpcio-tools==1.3.5
+whitelist_externals=find
 commands =
   python --version
   # Clean up all previous python generated files.
@@ -73,6 +74,7 @@ passenv = TRAVIS*
 # autocomplete_test depends on nose when invoked directly.
 deps =
   nose==1.3.7
+whitelist_externals=find
 commands =
   pip install -e .[test,gcp]
   python --version