Merged
@@ -190,18 +190,11 @@ def __init__(self, packages, options, environment_version, pipeline_url):
    pool = dataflow.WorkerPool(
        kind='local' if self.local else 'harness',
        packages=package_descriptors,
        # https://issues.apache.org/jira/browse/BEAM-3116
        # metadata=dataflow.WorkerPool.MetadataValue(),
Contributor

Should this line be uncommented rather than deleted?

Member Author

robertwb wrote:
Should this line be uncommented rather than deleted?

Actually, these lines can't work right now. The issue is an internal Dataflow bug that manifests as BEAM-3116. TL;DR: we cannot use worker pool metadata at all.

Template jobs are encoded in proto3 JSON format, more or less, and when they are run they are decoded via proto2 JSON format. This bug went undetected until now because there were no nonempty maps. Longer term, templates should use a proto binary encoding and perhaps be created service-side.
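
To make the failure mode concrete, here is a toy sketch of how an encoder/decoder mismatch can stay hidden while every map is empty. It is not the Dataflow service code, and the exact wire difference is an assumption: it supposes the writer emits a map as a JSON object while the reader expects a list of key/value entries. The function names and the `metadata` field are hypothetical.

```python
import json


def encode_map_as_object(props):
    """Hypothetical writer: emits the map as a JSON object."""
    return json.dumps({"metadata": dict(props)})


def decode_map_as_entries(payload):
    """Hypothetical reader: expects a list of {"key": ..., "value": ...}."""
    metadata = json.loads(payload)["metadata"]
    return {entry["key"]: entry["value"] for entry in metadata}


# An empty map slips through: iterating over {} yields no entries at all.
empty_result = decode_map_as_entries(encode_map_as_object({}))

# A nonempty map exposes the mismatch: iteration yields bare string keys,
# so indexing them with "key" raises TypeError.
try:
    decode_map_as_entries(
        encode_map_as_object({"pipeline_url": "gs://any-location/p"}))
    nonempty_failed = False
except TypeError:
    nonempty_failed = True
```

Under these assumptions the empty case decodes to an empty dict and only the nonempty case fails, which matches the observation that the bug went unnoticed until a map actually carried a value.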

For now, I am putting this in pipeline options instead, as a generic "bag of values".
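
As a rough illustration of the "bag of values" approach, the sketch below uses plain dicts and `json` rather than the real `dataflow` message classes. The helper name `build_sdk_pipeline_options` and the sample URL are hypothetical; only the `pipelineUrl` key and the `None` filter come from the diff.

```python
import json


def build_sdk_pipeline_options(sdk_pipeline_options, pipeline_url):
    """Hypothetical helper: drops unset options, then adds the pipeline URL."""
    options = {k: v for k, v in sdk_pipeline_options.items() if v is not None}
    options["pipelineUrl"] = pipeline_url
    return options


options = build_sdk_pipeline_options(
    {"temp_location": "gs://any-location/temp", "network": None},
    "gs://any-location/staging/pipeline.pb")

# The whole dict travels under a single 'options' key, so the URL needs no
# dedicated field in the worker pool message.
serialized = json.dumps({"options": options}, sort_keys=True)
```

Because the URL is just one more entry in the options dict, this path does not depend on worker pool metadata at all.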

Contributor

Shouldn't we leave these comments here as a reminder that we need to fix the bug and pass the pool.metadata (here or elsewhere)? Or is this just some cleanup that's irrelevant to the other changes in this PR?

Member Author

I have an internal bug filed, and discussion is ongoing, on fixing the pool metadata situation as well as on designing the best place for this sort of thing. IMO, for the purposes of this PR, pool metadata and pipeline options are both sort of fine places to put this data, but neither is obviously the Right Thing To Do. So I really do mean to fully abandon ever trying to put this data in the pool metadata.

Contributor

Thanks for the context. LGTM.

        taskrunnerSettings=dataflow.TaskRunnerSettings(
            parallelWorkerSettings=dataflow.WorkerSettings(
                baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT,
                servicePath=self.google_cloud_options.dataflow_endpoint)))

    # https://issues.apache.org/jira/browse/BEAM-3116
    # pool.metadata.additionalProperties.append(
Contributor

Same.

    # dataflow.WorkerPool.MetadataValue.AdditionalProperty(
    # key=names.STAGED_PIPELINE_URL_METADATA_FIELD, value=pipeline_url))

    pool.autoscalingSettings = dataflow.AutoscalingSettings()
    # Set worker pool options received through command line.
    if self.worker_options.num_workers:
@@ -260,6 +253,7 @@ def __init__(self, packages, options, environment_version, pipeline_url):
      options_dict = {k: v
                      for k, v in sdk_pipeline_options.iteritems()
                      if v is not None}
      options_dict["pipelineUrl"] = pipeline_url
      self.proto.sdkPipelineOptions.additionalProperties.append(
          dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
              key='options', value=to_json_value(options_dict)))
@@ -44,6 +44,34 @@ def test_create_application_client(self):
    pipeline_options = PipelineOptions()
    apiclient.DataflowApplicationClient(pipeline_options)

  def test_pipeline_url(self):
    pipeline_options = PipelineOptions(
        ['--subnetwork', '/regions/MY/subnetworks/SUBNETWORK',
         '--temp_location', 'gs://any-location/temp'])
    env = apiclient.Environment([],
                                pipeline_options,
                                '2.0.0', # any environment version
                                FAKE_PIPELINE_URL)

    recovered_options = None
    for additionalProperty in env.proto.sdkPipelineOptions.additionalProperties:
      if additionalProperty.key == 'options':
        recovered_options = additionalProperty.value
        break
    else:
      self.fail('No pipeline options found in %s'
                % env.proto.sdkPipelineOptions)

    pipeline_url = None
    for property in recovered_options.object_value.properties:
      if property.key == 'pipelineUrl':
        pipeline_url = property.value
        break
    else:
      self.fail('No pipeline_url found in %s' % recovered_options)

    self.assertEqual(pipeline_url.string_value, FAKE_PIPELINE_URL)

  def test_set_network(self):
    pipeline_options = PipelineOptions(
        ['--network', 'anetworkname',