From d3e6babebd3f2703cc3110fef85bb8ff6dd9bda5 Mon Sep 17 00:00:00 2001 From: Thomas Weise Date: Thu, 27 Sep 2018 14:43:11 -0700 Subject: [PATCH] [BEAM-5443] Pipeline option defaults for portable runner. --- sdks/python/apache_beam/pipeline.py | 9 +++++++++ .../apache_beam/runners/portability/flink_runner_test.py | 5 +++-- .../runners/portability/fn_api_runner_test.py | 7 +------ .../apache_beam/runners/portability/portable_runner.py | 7 +++++++ sdks/python/build.gradle | 2 -- 5 files changed, 20 insertions(+), 10 deletions(-) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 5a4c1dc9228d6..68e313909a3a7 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -60,6 +60,7 @@ from apache_beam import pvalue from apache_beam.internal import pickler from apache_beam.io.filesystems import FileSystems +from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import StandardOptions @@ -152,6 +153,14 @@ def __init__(self, runner=None, options=None, argv=None): raise ValueError( 'Pipeline has validations errors: \n' + '\n'.join(errors)) + # set default experiments for portable runner + # (needs to occur prior to pipeline construction) + if self._options.view_as(StandardOptions).runner == 'PortableRunner': + experiments = (self._options.view_as(DebugOptions).experiments or []) + if not 'beam_fn_api' in experiments: + experiments.append('beam_fn_api') + self._options.view_as(DebugOptions).experiments = experiments + # Default runner to be used. self.runner = runner # Stack of transforms generated by nested apply() calls. The stack will diff --git a/sdks/python/apache_beam/runners/portability/flink_runner_test.py b/sdks/python/apache_beam/runners/portability/flink_runner_test.py index 5aba3f1607672..09261c9030be9 100644 --- a/sdks/python/apache_beam/runners/portability/flink_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/flink_runner_test.py @@ -25,7 +25,6 @@ import apache_beam as beam from apache_beam.options.pipeline_options import DebugOptions -from apache_beam.options.pipeline_options import SetupOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.runners.portability import portable_runner from apache_beam.runners.portability import portable_runner_test @@ -66,7 +65,6 @@ def get_runner(cls): def create_options(self): options = super(FlinkRunnerTest, self).create_options() options.view_as(DebugOptions).experiments = ['beam_fn_api'] - options.view_as(SetupOptions).sdk_location = 'container' if streaming: options.view_as(StandardOptions).streaming = True return options @@ -80,6 +78,9 @@ def test_read(self): def test_no_subtransform_composite(self): raise unittest.SkipTest("BEAM-4781") + def test_pardo_state_only(self): + raise unittest.SkipTest("BEAM-2918 - User state not yet supported.") + # Inherits all other tests. # Run the tests. diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py index 98f073387b552..a1a4c655a4d21 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py @@ -243,11 +243,6 @@ def cross_product(elem, sides): equal_to([('a', 'a'), ('a', 'b'), ('b', 'a'), ('b', 'b')])) def test_pardo_state_only(self): - p = self.create_pipeline() - if not isinstance(p.runner, fn_api_runner.FnApiRunner): - # test is inherited by Flink PVR, which does not support the feature yet - self.skipTest('User state not supported.') - index_state_spec = userstate.CombiningValueStateSpec( 'index', beam.coders.VarIntCoder(), sum) @@ -265,7 +260,7 @@ def process(self, kv, index=beam.DoFn.StateParam(index_state_spec)): ('B', 'b', 2), ('B', 'b', 3)] - with p: + with self.create_pipeline() as p: assert_that(p | beam.Create(inputs) | beam.ParDo(AddIndex()), equal_to(expected)) diff --git a/sdks/python/apache_beam/runners/portability/portable_runner.py b/sdks/python/apache_beam/runners/portability/portable_runner.py index ec53726632498..16643c54207ef 100644 --- a/sdks/python/apache_beam/runners/portability/portable_runner.py +++ b/sdks/python/apache_beam/runners/portability/portable_runner.py @@ -28,6 +28,7 @@ from apache_beam import metrics from apache_beam.internal import pickler from apache_beam.options.pipeline_options import PortableOptions +from apache_beam.options.pipeline_options import SetupOptions from apache_beam.portability import common_urns from apache_beam.portability.api import beam_job_api_pb2 from apache_beam.portability.api import beam_job_api_pb2_grpc @@ -104,6 +105,12 @@ def _create_environment(options): def run_pipeline(self, pipeline): portable_options = pipeline.options.view_as(PortableOptions) job_endpoint = portable_options.job_endpoint + + # TODO: https://issues.apache.org/jira/browse/BEAM-5525 + # portable runner specific default + if pipeline.options.view_as(SetupOptions).sdk_location == 'default': + pipeline.options.view_as(SetupOptions).sdk_location = 'container' + if not job_endpoint: docker = DockerizedJobServer() job_endpoint = docker.start() diff --git a/sdks/python/build.gradle b/sdks/python/build.gradle index dd9c8f1fd2ae5..b99f153697f7d 100644 --- a/sdks/python/build.gradle +++ b/sdks/python/build.gradle @@ -245,9 +245,7 @@ task portableWordCount(dependsOn: 'installGcpTest') { def options = [ "--input=/etc/profile", "--output=/tmp/py-wordcount-direct", - "--experiments=beam_fn_api", "--runner=PortableRunner", - "--sdk_location=container", ] if (project.hasProperty("streaming")) options += ["--streaming"]