From 2f23c0009f315e0075039537f2b57c88a4f921bf Mon Sep 17 00:00:00 2001 From: Vikas Kedigehalli Date: Fri, 16 Jun 2017 17:18:57 -0700 Subject: [PATCH] Add dry run option to DataflowRunner --- sdks/python/apache_beam/options/pipeline_options.py | 5 +++++ .../apache_beam/runners/dataflow/dataflow_runner.py | 7 ++++++- .../apache_beam/runners/dataflow/dataflow_runner_test.py | 9 ++++----- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 8644e51b2dbe..dab8ff204d3a 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -605,6 +605,11 @@ def _add_argparse_args(cls, parser): help=('Verify state/output of e2e test pipeline. This is pickled ' 'version of the matcher which should extends ' 'hamcrest.core.base_matcher.BaseMatcher.')) + parser.add_argument( + '--dry_run', + default=False, + help=('Used in unit testing runners without submitting the ' + 'actual job.')) def validate(self, validator): errors = [] diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index cc9274ec40c7..8c7818a33ddb 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -47,6 +47,7 @@ from apache_beam.typehints import typehints from apache_beam.options.pipeline_options import StandardOptions from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.options.pipeline_options import TestOptions from apache_beam.utils.plugin import BeamPlugin @@ -217,7 +218,6 @@ def visit_transform(self, transform_node): return FlattenInputVisitor() - # TODO(mariagh): Make this method take pipepline_options def run(self, pipeline): """Remotely executes entire pipeline or parts reachable from node.""" # Import here to avoid adding the dependency for local running scenarios. @@ -249,6 +249,11 @@ def run(self, pipeline): # The superclass's run will trigger a traversal of all reachable nodes. super(DataflowRunner, self).run(pipeline) + test_options = pipeline._options.view_as(TestOptions) + # If it is a dry run, return without submitting the job. + if test_options.dry_run: + return None + standard_options = pipeline._options.view_as(StandardOptions) if standard_options.streaming: job_version = DataflowRunner.STREAMING_ENVIRONMENT_MAJOR_VERSION diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index 74fd01df7bc2..e349b21e9a1a 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -59,7 +59,8 @@ class DataflowRunnerTest(unittest.TestCase): '--project=test-project', '--staging_location=ignored', '--temp_location=/dev/null', - '--no_auth=True'] + '--no_auth=True', + '--dry_run=True'] @mock.patch('time.sleep', return_value=None) def test_wait_until_finish(self, patched_time_sleep): @@ -108,8 +109,7 @@ def test_remote_runner_translation(self): (p | ptransform.Create([1, 2, 3]) # pylint: disable=expression-not-assigned | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)]) | ptransform.GroupByKey()) - remote_runner.job = apiclient.Job(p._options) - super(DataflowRunner, remote_runner).run(p) + p.run() def test_remote_runner_display_data(self): remote_runner = DataflowRunner() @@ -142,8 +142,7 @@ def process(self): (p | ptransform.Create([1, 2, 3, 4, 5]) | 'Do' >> SpecialParDo(SpecialDoFn(), now)) - remote_runner.job = apiclient.Job(p._options) - super(DataflowRunner, remote_runner).run(p) + p.run() job_dict = json.loads(str(remote_runner.job)) steps = [step for step in job_dict['steps']