Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from __future__ import absolute_import

import argparse
import json
import logging
from builtins import list
from builtins import object
Expand Down Expand Up @@ -476,7 +477,16 @@ def _add_argparse_args(cls, parser):
action='store_true',
help='Update an existing streaming Cloud Dataflow job. '
'Experimental. '
'See https://cloud.google.com/dataflow/pipelines/'
'See https://cloud.google.com/dataflow/docs/guides/'
'updating-a-pipeline')
parser.add_argument('--transform_name_mapping',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as I can tell, Python does not have a place for streaming-specific options. Is it worth creating one and moving this option there?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This option is probably more update-related, and update is currently only supported with the Dataflow runner, I guess?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct. Update is only supported by Dataflow.

default=None,
type=json.loads,
help='The transform mapping that maps the named '
'transforms in your prior pipeline code to names '
'in your replacement pipeline code.'
'Experimental. '
'See https://cloud.google.com/dataflow/docs/guides/'
'updating-a-pipeline')
parser.add_argument('--enable_streaming_engine',
default=False,
Expand Down
6 changes: 6 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import hamcrest as hc

from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import ProfilingOptions
from apache_beam.options.pipeline_options import TypeOptions
Expand Down Expand Up @@ -477,6 +478,11 @@ def test_lookup_experiments(self):
True,
debug_options.lookup_experiment('existing_experiment'))

def test_transform_name_mapping(self):
  """JSON given via --transform_name_mapping is parsed into a dict option."""
  flags = ['--transform_name_mapping={"from":"to"}']
  gcloud_options = PipelineOptions(flags).view_as(GoogleCloudOptions)
  self.assertEqual(gcloud_options.transform_name_mapping['from'], 'to')


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down
15 changes: 15 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import re
from builtins import object

from past.builtins import unicode

from apache_beam.internal import pickler
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
Expand Down Expand Up @@ -75,6 +77,9 @@ class PipelineOptionsValidator(object):
ERR_INVALID_TEST_MATCHER_UNPICKLABLE = (
'Invalid value (%s) for option: %s. Please make sure the test matcher '
'is unpicklable.')
ERR_INVALID_TRANSFORM_NAME_MAPPING = (
'Invalid transform name mapping format. Please make sure the mapping is '
'string key-value pairs. Invalid pair: (%s:%s)')

# GCS path specific patterns.
GCS_URI = '(?P<SCHEME>[^:]+)://(?P<BUCKET>[^/]+)(/(?P<OBJECT>.*))?'
Expand Down Expand Up @@ -174,6 +179,16 @@ def validate_cloud_options(self, view):
if not view.job_name:
errors.extend(self._validate_error(
'Existing job name must be provided when updating a pipeline.'))
if view.transform_name_mapping:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also confirm here that the job is both streaming and an update?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added streaming option validation below.

if not view.update or not self.options.view_as(StandardOptions).streaming:
errors.append('Transform name mapping option is only useful when '
'--update and --streaming is specified')
for _, (key, value) in enumerate(view.transform_name_mapping.items()):
if not isinstance(key, (str, unicode)) \
or not isinstance(value, (str, unicode)):
errors.extend(self._validate_error(
self.ERR_INVALID_TRANSFORM_NAME_MAPPING, key, value))
break
return errors

def validate_optional_argument_positive(self, view, arg_name):
Expand Down
33 changes: 33 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options_validator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
import unittest
from builtins import object

from hamcrest import assert_that
from hamcrest import contains_string
from hamcrest import only_contains
from hamcrest.core.base_matcher import BaseMatcher

from apache_beam.internal import pickler
Expand Down Expand Up @@ -333,6 +336,36 @@ def get_validator(matcher):
self.assertEqual(
self.check_errors_for_arguments(errors, case['errors']), [])

def test_transform_name_mapping_without_update(self):
  """A name mapping without --update/--streaming yields a validation error."""
  flags = [
      '--project=example:example',
      '--staging_location=gs://foo/bar',
      '--temp_location=gs://foo/bar',
      '--transform_name_mapping={"fromPardo":"toPardo"}',
  ]
  validator = PipelineOptionsValidator(
      PipelineOptions(flags), MockRunners.DataflowRunner())
  expected_message = ('Transform name mapping option is only useful when '
                      '--update and --streaming is specified')
  assert_that(
      validator.validate(),
      only_contains(contains_string(expected_message)))

def test_transform_name_mapping_invalid_format(self):
  """Non-string values inside the mapping are rejected by the validator."""
  flags = [
      '--project=example:example',
      '--staging_location=gs://foo/bar',
      '--temp_location=gs://foo/bar',
      '--update',
      '--job_name=test',
      '--streaming',
      '--transform_name_mapping={"fromPardo":123}',
  ]
  validator = PipelineOptionsValidator(
      PipelineOptions(flags), MockRunners.DataflowRunner())
  assert_that(
      validator.validate(),
      only_contains(
          contains_string('Invalid transform name mapping format.')))


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,14 @@ def __init__(self, options, proto_pipeline):
self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_BATCH
if self.google_cloud_options.update:
self.proto.replaceJobId = self.job_id_for_name(self.proto.name)

if self.google_cloud_options.transform_name_mapping:
self.proto.transformNameMapping = (
dataflow.Job.TransformNameMappingValue())
for _, (key, value) in enumerate(
self.google_cloud_options.transform_name_mapping.items()):
self.proto.transformNameMapping.additionalProperties.append(
dataflow.Job.TransformNameMappingValue
.AdditionalProperty(key=key, value=value))
# Labels.
if self.google_cloud_options.labels:
self.proto.labels = dataflow.Job.LabelsValue()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,6 +510,16 @@ def test_worker_harness_override_takes_precedence_over_sdk_defaults(self):
env.proto.workerPools[0].workerHarnessContainerImage,
'some:image')

@mock.patch('apache_beam.runners.dataflow.internal.apiclient.Job.'
            'job_id_for_name', return_value='test_id')
def test_transform_name_mapping(self, mock_job):
  """--transform_name_mapping is propagated into the Dataflow job proto."""
  flags = [
      '--project', 'test_project',
      '--job_name', 'test_job_name',
      '--temp_location', 'gs://test-location/temp',
      '--update',
      '--transform_name_mapping', '{"from":"to"}',
  ]
  job = apiclient.Job(PipelineOptions(flags), FAKE_PIPELINE_URL)
  self.assertIsNotNone(job.proto.transformNameMapping)

def test_labels(self):
pipeline_options = PipelineOptions(
['--project', 'test_project', '--job_name', 'test_job_name',
Expand Down