From 97bfdcbb546563c8914438f3d8f78eb826f36348 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 6 Nov 2017 20:36:09 -0800 Subject: [PATCH] Stage the portable pipeline; put URL in pipeline options --- .../runners/dataflow/internal/apiclient.py | 8 +----- .../dataflow/internal/apiclient_test.py | 28 +++++++++++++++++++ 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 3aa563dc071b..64c4ac98ac27 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -190,18 +190,11 @@ def __init__(self, packages, options, environment_version, pipeline_url): pool = dataflow.WorkerPool( kind='local' if self.local else 'harness', packages=package_descriptors, - # https://issues.apache.org/jira/browse/BEAM-3116 - # metadata=dataflow.WorkerPool.MetadataValue(), taskrunnerSettings=dataflow.TaskRunnerSettings( parallelWorkerSettings=dataflow.WorkerSettings( baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT, servicePath=self.google_cloud_options.dataflow_endpoint))) - # https://issues.apache.org/jira/browse/BEAM-3116 - # pool.metadata.additionalProperties.append( - # dataflow.WorkerPool.MetadataValue.AdditionalProperty( - # key=names.STAGED_PIPELINE_URL_METADATA_FIELD, value=pipeline_url)) - pool.autoscalingSettings = dataflow.AutoscalingSettings() # Set worker pool options received through command line. if self.worker_options.num_workers: @@ -260,6 +253,7 @@ def __init__(self, packages, options, environment_version, pipeline_url): options_dict = {k: v for k, v in sdk_pipeline_options.iteritems() if v is not None} + options_dict["pipelineUrl"] = pipeline_url self.proto.sdkPipelineOptions.additionalProperties.append( dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty( key='options', value=to_json_value(options_dict))) diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index f8a4471815c8..bd851092ecc9 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -44,6 +44,34 @@ def test_create_application_client(self): pipeline_options = PipelineOptions() apiclient.DataflowApplicationClient(pipeline_options) + def test_pipeline_url(self): + pipeline_options = PipelineOptions( + ['--subnetwork', '/regions/MY/subnetworks/SUBNETWORK', + '--temp_location', 'gs://any-location/temp']) + env = apiclient.Environment([], + pipeline_options, + '2.0.0', # any environment version + FAKE_PIPELINE_URL) + + recovered_options = None + for additionalProperty in env.proto.sdkPipelineOptions.additionalProperties: + if additionalProperty.key == 'options': + recovered_options = additionalProperty.value + break + else: + self.fail('No pipeline options found in %s' + % env.proto.sdkPipelineOptions) + + pipeline_url = None + for property in recovered_options.object_value.properties: + if property.key == 'pipelineUrl': + pipeline_url = property.value + break + else: + self.fail('No pipeline_url found in %s' % recovered_options) + + self.assertEqual(pipeline_url.string_value, FAKE_PIPELINE_URL) + def test_set_network(self): pipeline_options = PipelineOptions( ['--network', 'anetworkname',