From a4e36ef94c4f7a26c96e424c18408b5ae42ca16d Mon Sep 17 00:00:00 2001 From: Ahmet Altay Date: Thu, 12 Oct 2017 19:17:28 -0700 Subject: [PATCH 1/2] Add an option for dataflow job labels. --- sdks/python/apache_beam/options/pipeline_options.py | 7 +++++++ .../runners/dataflow/internal/apiclient.py | 11 +++++++++++ 2 files changed, 18 insertions(+) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 2598551298e9..7d3859425fe6 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -374,6 +374,13 @@ def _add_argparse_args(cls, parser): parser.add_argument('--template_location', default=None, help='Save job to specified local or GCS location.') + parser.add_argument( + '--label', '--labels', + dest='labels', + action='append', + default=None, + help='Labels that will be applied to the billing records for this job. ' + 'Labels are key value pairs separated by = (e.g. --label key=value).') def validate(self, validator): errors = [] diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index e48b58c3b89a..618756931278 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -363,6 +363,17 @@ def __init__(self, options): self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_STREAMING else: self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_BATCH + + # Labels + if self.google_cloud_options.labels: + self.proto.labels = dataflow.Job.LabelsValue() + for label in self.google_cloud_options.labels: + label = label.split('=') + key = label[0] + value = label[1] if len(label) > 1 else '' + self.proto.labels.additionalProperties.append( + dataflow.Job.LabelsValue.AdditionalProperty(key=key, value=value)) + self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$') self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$') From ba662782086d4fcd61303c962507a7a3b3d29ff9 Mon Sep 17 00:00:00 2001 From: Ahmet Altay Date: Fri, 13 Oct 2017 09:55:19 -0700 Subject: [PATCH 2/2] Address reviewer comments. --- sdks/python/apache_beam/options/pipeline_options.py | 4 ++-- .../apache_beam/runners/dataflow/internal/apiclient.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 7d3859425fe6..a09c7c317daa 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -379,8 +379,8 @@ def _add_argparse_args(cls, parser): dest='labels', action='append', default=None, - help='Labels that will be applied to the billing records for this job. ' - 'Labels are key value pairs separated by = (e.g. --label key=value).') + help='Labels that will be applied to this Dataflow job. Labels are key ' + 'value pairs separated by = (e.g. --label key=value).') def validate(self, validator): errors = [] diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index 618756931278..eec598a4ce37 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -364,13 +364,13 @@ def __init__(self, options): else: self.proto.type = dataflow.Job.TypeValueValuesEnum.JOB_TYPE_BATCH - # Labels + # Labels. if self.google_cloud_options.labels: self.proto.labels = dataflow.Job.LabelsValue() for label in self.google_cloud_options.labels: - label = label.split('=') - key = label[0] - value = label[1] if len(label) > 1 else '' + parts = label.split('=', 1) + key = parts[0] + value = parts[1] if len(parts) > 1 else '' self.proto.labels.additionalProperties.append( dataflow.Job.LabelsValue.AdditionalProperty(key=key, value=value))