From 11117482b107735da8e4cc76a96d05a99a2e19b2 Mon Sep 17 00:00:00 2001 From: Maria Garcia Herrero Date: Thu, 25 May 2017 18:21:46 -0700 Subject: [PATCH 1/4] Add template examples to snippets.py --- .../apache_beam/examples/snippets/snippets.py | 85 +++++++++++++++++++ .../examples/snippets/snippets_test.py | 15 +++- 2 files changed, 99 insertions(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 70929e9e8fae..3067f0686bca 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -502,6 +502,52 @@ def process(self, element): p.visit(SnippetUtils.RenameFiles(renames)) +def examples_wordcount_templated(renames): + """Templated WordCount example snippet.""" + import re + + import apache_beam as beam + from apache_beam.io import ReadFromText + from apache_beam.io import WriteToText + from apache_beam.options.pipeline_options import PipelineOptions + + # [START example_wordcount_templated] + class WordcountOptions(PipelineOptions): + @classmethod + def _add_argparse_args(cls, parser): + # Use add_value_provider_argument for arguments to be templatable + # Use add_argument as usual for non-templatable arguments + parser.add_value_provider_argument( + '--input', + # default = 'gs://dataflow-samples/shakespeare/kinglear.txt', + help='Path of the file to read from') + parser.add_argument( + '--output', + required=True, + help='Output file to write results to.') + pipeline_options = PipelineOptions(['--output', 'some/output_path']) + p = beam.Pipeline(options=pipeline_options) + + wordcount_options = pipeline_options.view_as(WordcountOptions) + lines = p | 'read' >> ReadFromText(wordcount_options.input) + # [END example_wordcount_templated] + + ( + lines + | 'ExtractWords' >> beam.FlatMap( + lambda x: re.findall(r'[A-Za-z\']+', x)) + | 'PairWithOnes' >> beam.Map(lambda x: (x, 1)) + | 'Group' >> beam.GroupByKey() + | 'Sum' >> beam.Map(lambda (word, ones): (word, sum(ones))) + | 'Format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) + | 'Write' >> WriteToText(wordcount_options.output) + ) + + p.visit(SnippetUtils.RenameFiles(renames)) + result = p.run() + result.wait_until_finish() + + def examples_wordcount_debugging(renames): """DebuggingWordCount example snippets.""" import re @@ -569,6 +615,45 @@ def process(self, element): p.visit(SnippetUtils.RenameFiles(renames)) +def examples_ptransforms_templated(renames): + # [START examples_ptransforms_templated] + import apache_beam as beam + from apache_beam.options.pipeline_options import PipelineOptions + from apache_beam.utils.value_provider import StaticValueProvider + from apache_beam.io import WriteToText + + class UserOptions(PipelineOptions): + @classmethod + def _add_argparse_args(cls, parser): + parser.add_value_provider_argument('--templated_int', type=int) + + class MySumFn(beam.DoFn): + def __init__(self, templated_int): + self.templated_int = templated_int + + def process(self, an_int): + yield self.templated_int.get() + an_int + + pipeline_options = PipelineOptions() + p = beam.Pipeline(options=pipeline_options) + + user_options = pipeline_options.view_as(UserOptions) + my_sum_fn = MySumFn(user_options.templated_int) + sum = (p + | 'ReadCollection' >> beam.io.ReadFromText( + 'gs://some/integer_collection') + | 'StringToInt' >> beam.Map(lambda w: int(w)) + | 'AddGivenInt' >> beam.ParDo(my_sum_fn) + | 'WriteResultingCollection' >> WriteToText('some/output_path')) + # [END examples_ptransforms_templated] + + my_sum_fn.templated_int = StaticValueProvider(int, 10) + + p.visit(SnippetUtils.RenameFiles(renames)) + result = p.run() + result.wait_until_finish() + + import apache_beam as beam from apache_beam.io import iobase from apache_beam.io.range_trackers import OffsetRangeTracker diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index e302465985a4..ba35681d434e 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -636,7 +636,8 @@ def test_pipeline_logging(self): def test_examples_wordcount(self): pipelines = [snippets.examples_wordcount_minimal, snippets.examples_wordcount_wordcount, - snippets.pipeline_monitoring] + snippets.pipeline_monitoring, + snippets.examples_wordcount_templated] for pipeline in pipelines: temp_path = self.create_temp_file( @@ -647,6 +648,18 @@ def test_examples_wordcount(self): self.get_output(result_path), ['abc: 2', 'def: 1', 'ghi: 1', 'jkl: 1']) + def test_examples_ptransforms_templated(self): + pipelines = [snippets.examples_ptransforms_templated] + + for pipeline in pipelines: + temp_path = self.create_temp_file( + '1\n 2\n 3') + result_path = self.create_temp_file() + pipeline({'read': temp_path, 'write': result_path}) + self.assertEqual( + self.get_output(result_path), + ['11', '12', '13']) + def test_examples_wordcount_debugging(self): temp_path = self.create_temp_file( 'Flourish Flourish Flourish stomach abc def') From 117ae09670d69835065afa46940f84f00f5fc417 Mon Sep 17 00:00:00 2001 From: Maria Garcia Herrero Date: Thu, 25 May 2017 19:40:23 -0700 Subject: [PATCH 2/4] Address review comments --- .../apache_beam/examples/snippets/snippets.py | 13 +++++++------ .../apache_beam/examples/snippets/snippets_test.py | 3 +-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 3067f0686bca..6fc426627bf2 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -512,14 +512,13 @@ def examples_wordcount_templated(renames): from apache_beam.options.pipeline_options import PipelineOptions # [START example_wordcount_templated] - class WordcountOptions(PipelineOptions): + class WordcountTemplatedOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): # Use add_value_provider_argument for arguments to be templatable # Use add_argument as usual for non-templatable arguments parser.add_value_provider_argument( '--input', - # default = 'gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', @@ -528,7 +527,7 @@ def _add_argparse_args(cls, parser): pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) - wordcount_options = pipeline_options.view_as(WordcountOptions) + wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input) # [END example_wordcount_templated] @@ -618,11 +617,11 @@ def process(self, element): def examples_ptransforms_templated(renames): # [START examples_ptransforms_templated] import apache_beam as beam + from apache_beam.io import WriteToText from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.utils.value_provider import StaticValueProvider - from apache_beam.io import WriteToText - class UserOptions(PipelineOptions): + class TemplatedUserOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument('--templated_int', type=int) @@ -637,7 +636,7 @@ def process(self, an_int): pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) - user_options = pipeline_options.view_as(UserOptions) + user_options = pipeline_options.view_as(TemplatedUserOptions) my_sum_fn = MySumFn(user_options.templated_int) sum = (p | 'ReadCollection' >> beam.io.ReadFromText( @@ -647,6 +646,8 @@ def process(self, an_int): | 'WriteResultingCollection' >> WriteToText('some/output_path')) # [END examples_ptransforms_templated] + # Templates are not supported by DirectRunner (only by DataflowRunner) + # so a value must be provided at graph-construction time my_sum_fn.templated_int = StaticValueProvider(int, 10) p.visit(SnippetUtils.RenameFiles(renames)) diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index ba35681d434e..9183d0dfea19 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -652,8 +652,7 @@ def test_examples_ptransforms_templated(self): pipelines = [snippets.examples_ptransforms_templated] for pipeline in pipelines: - temp_path = self.create_temp_file( - '1\n 2\n 3') + temp_path = self.create_temp_file('1\n 2\n 3') result_path = self.create_temp_file() pipeline({'read': temp_path, 'write': result_path}) self.assertEqual( From 58a92624acd6f7f9314b41874f6f47ff60dd3d79 Mon Sep 17 00:00:00 2001 From: Maria Garcia Herrero Date: Fri, 26 May 2017 10:37:42 -0700 Subject: [PATCH 3/4] Fix Jenkins test --- sdks/python/apache_beam/examples/snippets/snippets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 6fc426627bf2..1866f144bf5c 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -619,7 +619,7 @@ def examples_ptransforms_templated(renames): import apache_beam as beam from apache_beam.io import WriteToText from apache_beam.options.pipeline_options import PipelineOptions - from apache_beam.utils.value_provider import StaticValueProvider + from apache_beam.options.value_provider import StaticValueProvider class TemplatedUserOptions(PipelineOptions): @classmethod From d69857140a3714a27edecfeec3744ba6885c760a Mon Sep 17 00:00:00 2001 From: Maria Garcia Herrero Date: Wed, 31 May 2017 10:03:18 -0700 Subject: [PATCH 4/4] Address review comments --- sdks/python/apache_beam/examples/snippets/snippets.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 1866f144bf5c..3a5f9b15a456 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -528,7 +528,7 @@ def _add_argparse_args(cls, parser): p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions) - lines = p | 'read' >> ReadFromText(wordcount_options.input) + lines = p | 'Read' >> ReadFromText(wordcount_options.input) # [END example_wordcount_templated] (