Skip to content

Commit

Permalink
This closes #2738
Browse files Browse the repository at this point in the history
  • Loading branch information
aaltay committed Apr 27, 2017
2 parents 42e3a6f + 9174ebf commit 884935c
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 11 deletions.
24 changes: 15 additions & 9 deletions sdks/python/apache_beam/examples/snippets/snippets.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ def construct_pipeline(renames):
"""A reverse words snippet as an example for constructing a pipeline."""
import re

# This is duplicate of the import statement in
# pipelines_constructing_creating tag below, but required to avoid
# Unresolved reference in ReverseWords class
import apache_beam as beam

class ReverseWords(beam.PTransform):
"""A PTransform that reverses individual elements in a PCollection."""

Expand All @@ -85,6 +90,7 @@ def filter_words(unused_x):
return True

# [START pipelines_constructing_creating]
import apache_beam as beam
from apache_beam.utils.pipeline_options import PipelineOptions

p = beam.Pipeline(options=PipelineOptions())
Expand Down Expand Up @@ -172,16 +178,18 @@ def _add_argparse_args(cls, parser):
# [START model_pcollection]
p = beam.Pipeline(options=pipeline_options)

(p
| beam.Create([
'To be, or not to be: that is the question: ',
'Whether \'tis nobler in the mind to suffer ',
'The slings and arrows of outrageous fortune, ',
'Or to take arms against a sea of troubles, '])
lines = (p
| beam.Create([
'To be, or not to be: that is the question: ',
'Whether \'tis nobler in the mind to suffer ',
'The slings and arrows of outrageous fortune, ',
'Or to take arms against a sea of troubles, ']))
# [END model_pcollection]

(lines
| beam.io.WriteToText(my_options.output))

result = p.run()
# [END model_pcollection]
result.wait_until_finish()


Expand Down Expand Up @@ -1006,9 +1014,7 @@ def model_multiple_pcollections_flatten(contents, output_path):
# types.)
# [START model_multiple_pcollections_flatten]
merged = (
# [START model_multiple_pcollections_tuple]
(pcoll1, pcoll2, pcoll3)
# [END model_multiple_pcollections_tuple]
# A list of tuples can be "piped" directly into a Flatten transform.
| beam.Flatten())
# [END model_multiple_pcollections_flatten]
Expand Down
6 changes: 4 additions & 2 deletions sdks/python/apache_beam/examples/snippets/snippets_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,7 @@ def test_combine_reduce(self):
def test_custom_average(self):
pc = [2, 3, 5, 7]

# [START combine_custom_average]
# [START combine_custom_average_define]
class AverageFn(beam.CombineFn):
def create_accumulator(self):
return (0.0, 0)
Expand All @@ -781,8 +781,10 @@ def merge_accumulators(self, accumulators):

def extract_output(self, (sum, count)):
return sum / count if count else float('NaN')
# [END combine_custom_average_define]
# [START combine_custom_average_execute]
average = pc | beam.CombineGlobally(AverageFn())
# [END combine_custom_average]
# [END combine_custom_average_execute]
self.assertEqual([4.25], average)

def test_keys(self):
Expand Down

0 comments on commit 884935c

Please sign in to comment.