Add a --transform_name_mapping pipeline option for Dataflow job updates.

* pipeline_options.py: register --transform_name_mapping on the same parser
  as --update; the value is parsed from the command line with json.loads.
* pipeline_options_validator.py: report an error when the mapping is given
  without both --update and --streaming, and when any key or value in the
  mapping is not a string (only the first offending pair is reported).
* apiclient.py: when updating a job, copy each mapping entry into the Job
  proto's transformNameMapping.additionalProperties.
* Tests cover option parsing, both validation errors, and the Job proto.

NOTE(review): indentation of context lines was reconstructed from a
whitespace-collapsed copy of this patch; confirm against the target tree
before applying.

diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 5e3b7f0fc7d3..de92519fb38e 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -20,6 +20,7 @@
 from __future__ import absolute_import
 
 import argparse
+import json
 import logging
 from builtins import list
 from builtins import object
@@ -476,7 +477,16 @@ def _add_argparse_args(cls, parser):
                         action='store_true',
                         help='Update an existing streaming Cloud Dataflow job. '
                         'Experimental. '
-                        'See https://cloud.google.com/dataflow/pipelines/'
+                        'See https://cloud.google.com/dataflow/docs/guides/'
+                        'updating-a-pipeline')
+    parser.add_argument('--transform_name_mapping',
+                        default=None,
+                        type=json.loads,
+                        help='The transform mapping that maps the named '
+                        'transforms in your prior pipeline code to names '
+                        'in your replacement pipeline code. '
+                        'Experimental. '
+                        'See https://cloud.google.com/dataflow/docs/guides/'
                         'updating-a-pipeline')
     parser.add_argument('--enable_streaming_engine',
                         default=False,
diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py
index 1b8ee083a48a..fdedfc4c118c 100644
--- a/sdks/python/apache_beam/options/pipeline_options_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_test.py
@@ -25,6 +25,7 @@
 import hamcrest as hc
 
 from apache_beam.options.pipeline_options import DebugOptions
+from apache_beam.options.pipeline_options import GoogleCloudOptions
 from apache_beam.options.pipeline_options import PipelineOptions
 from apache_beam.options.pipeline_options import ProfilingOptions
 from apache_beam.options.pipeline_options import TypeOptions
@@ -477,6 +478,11 @@ def test_lookup_experiments(self):
         True,
         debug_options.lookup_experiment('existing_experiment'))
 
+  def test_transform_name_mapping(self):
+    options = PipelineOptions(['--transform_name_mapping={\"from\":\"to\"}'])
+    mapping = options.view_as(GoogleCloudOptions).transform_name_mapping
+    self.assertEqual(mapping['from'], 'to')
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/options/pipeline_options_validator.py b/sdks/python/apache_beam/options/pipeline_options_validator.py
index 373a7b933e33..8f7c946c1a86 100644
--- a/sdks/python/apache_beam/options/pipeline_options_validator.py
+++ b/sdks/python/apache_beam/options/pipeline_options_validator.py
@@ -24,6 +24,8 @@
 import re
 from builtins import object
 
+from past.builtins import unicode
+
 from apache_beam.internal import pickler
 from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import GoogleCloudOptions
@@ -75,6 +77,9 @@ class PipelineOptionsValidator(object):
   ERR_INVALID_TEST_MATCHER_UNPICKLABLE = (
       'Invalid value (%s) for option: %s. Please make sure the test matcher '
       'is unpicklable.')
+  ERR_INVALID_TRANSFORM_NAME_MAPPING = (
+      'Invalid transform name mapping format. Please make sure the mapping is '
+      'string key-value pairs. Invalid pair: (%s:%s)')
 
   # GCS path specific patterns.
   GCS_URI = '(?P<SCHEME>[^:]+)://(?P<BUCKET>[^/]+)(/(?P<OBJECT>.*))?'
@@ -174,6 +179,16 @@ def validate_cloud_options(self, view):
       if not view.job_name:
         errors.extend(self._validate_error(
             'Existing job name must be provided when updating a pipeline.'))
+    if view.transform_name_mapping:
+      if not view.update or not self.options.view_as(StandardOptions).streaming:
+        errors.append('Transform name mapping option is only useful when '
+                      '--update and --streaming is specified')
+      for key, value in view.transform_name_mapping.items():
+        if not isinstance(key, (str, unicode)) \
+            or not isinstance(value, (str, unicode)):
+          errors.extend(self._validate_error(
+              self.ERR_INVALID_TRANSFORM_NAME_MAPPING, key, value))
+          break
     return errors
 
   def validate_optional_argument_positive(self, view, arg_name):
diff --git a/sdks/python/apache_beam/options/pipeline_options_validator_test.py b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
index 9818422e2f69..70107164f8da 100644
--- a/sdks/python/apache_beam/options/pipeline_options_validator_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_validator_test.py
@@ -23,6 +23,9 @@
 import unittest
 from builtins import object
 
+from hamcrest import assert_that
+from hamcrest import contains_string
+from hamcrest import only_contains
 from hamcrest.core.base_matcher import BaseMatcher
 
 from apache_beam.internal import pickler
@@ -333,6 +336,36 @@ def get_validator(matcher):
       self.assertEqual(
           self.check_errors_for_arguments(errors, case['errors']), [])
 
+  def test_transform_name_mapping_without_update(self):
+    options = ['--project=example:example',
+               '--staging_location=gs://foo/bar',
+               '--temp_location=gs://foo/bar',
+               '--transform_name_mapping={\"fromPardo\":\"toPardo\"}']
+
+    pipeline_options = PipelineOptions(options)
+    runner = MockRunners.DataflowRunner()
+    validator = PipelineOptionsValidator(pipeline_options, runner)
+    errors = validator.validate()
+    assert_that(errors, only_contains(
+        contains_string('Transform name mapping option is only useful when '
+                        '--update and --streaming is specified')))
+
+  def test_transform_name_mapping_invalid_format(self):
+    options = ['--project=example:example',
+               '--staging_location=gs://foo/bar',
+               '--temp_location=gs://foo/bar',
+               '--update',
+               '--job_name=test',
+               '--streaming',
+               '--transform_name_mapping={\"fromPardo\":123}']
+
+    pipeline_options = PipelineOptions(options)
+    runner = MockRunners.DataflowRunner()
+    validator = PipelineOptionsValidator(pipeline_options, runner)
+    errors = validator.validate()
+    assert_that(errors, only_contains(
+        contains_string('Invalid transform name mapping format.')))
+
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index a0f307d92e49..30bd32c0b47e 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -404,7 +404,14 @@ def __init__(self, options, proto_pipeline):
       self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_BATCH
     if self.google_cloud_options.update:
       self.proto.replaceJobId = self.job_id_for_name(self.proto.name)
-
+      if self.google_cloud_options.transform_name_mapping:
+        self.proto.transformNameMapping = (
+            dataflow.Job.TransformNameMappingValue())
+        for key, value in (
+            self.google_cloud_options.transform_name_mapping.items()):
+          self.proto.transformNameMapping.additionalProperties.append(
+              dataflow.Job.TransformNameMappingValue
+              .AdditionalProperty(key=key, value=value))
     # Labels.
     if self.google_cloud_options.labels:
       self.proto.labels = dataflow.Job.LabelsValue()
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 744643bc3fcf..96e02763e2f6 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -510,6 +510,16 @@ def test_worker_harness_override_takes_precedence_over_sdk_defaults(self):
         env.proto.workerPools[0].workerHarnessContainerImage,
         'some:image')
 
+  @mock.patch('apache_beam.runners.dataflow.internal.apiclient.Job.'
+              'job_id_for_name', return_value='test_id')
+  def test_transform_name_mapping(self, mock_job):
+    pipeline_options = PipelineOptions(
+        ['--project', 'test_project', '--job_name', 'test_job_name',
+         '--temp_location', 'gs://test-location/temp', '--update',
+         '--transform_name_mapping', '{\"from\":\"to\"}'])
+    job = apiclient.Job(pipeline_options, FAKE_PIPELINE_URL)
+    self.assertIsNotNone(job.proto.transformNameMapping)
+
   def test_labels(self):
     pipeline_options = PipelineOptions(
         ['--project', 'test_project', '--job_name', 'test_job_name',