Skip to content
99 changes: 49 additions & 50 deletions sdks/python/apache_beam/dataflow_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,33 +54,33 @@ class DataflowTest(unittest.TestCase):
def Count(pcoll): # pylint: disable=invalid-name, no-self-argument
"""A Count transform: v, ... => (v, n), ..."""
return (pcoll
| Map('AddCount', lambda x: (x, 1))
| GroupByKey('GroupCounts')
| Map('AddCounts', lambda (x, ones): (x, sum(ones))))
| 'AddCount' >> Map(lambda x: (x, 1))
| 'GroupCounts' >> GroupByKey()
| 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones))))

def test_word_count(self):
pipeline = Pipeline('DirectPipelineRunner')
lines = pipeline | Create('SomeWords', DataflowTest.SAMPLE_DATA)
lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA)
result = (
(lines | FlatMap('GetWords', lambda x: re.findall(r'\w+', x)))
(lines | 'GetWords' >> FlatMap(lambda x: re.findall(r'\w+', x)))
.apply('CountWords', DataflowTest.Count))
assert_that(result, equal_to(DataflowTest.SAMPLE_RESULT))
pipeline.run()

def test_map(self):
pipeline = Pipeline('DirectPipelineRunner')
lines = pipeline | Create('input', ['a', 'b', 'c'])
lines = pipeline | 'input' >> Create(['a', 'b', 'c'])
result = (lines
| Map('upper', str.upper)
| Map('prefix', lambda x, prefix: prefix + x, 'foo-'))
| 'upper' >> Map(str.upper)
| 'prefix' >> Map(lambda x, prefix: prefix + x, 'foo-'))
assert_that(result, equal_to(['foo-A', 'foo-B', 'foo-C']))
pipeline.run()

def test_par_do_with_side_input_as_arg(self):
pipeline = Pipeline('DirectPipelineRunner')
words_list = ['aa', 'bb', 'cc']
words = pipeline | Create('SomeWords', words_list)
prefix = pipeline | Create('SomeString', ['xyz']) # side in
words = pipeline | 'SomeWords' >> Create(words_list)
prefix = pipeline | 'SomeString' >> Create(['xyz']) # side in
suffix = 'zyx'
result = words | FlatMap(
'DecorateWords',
Expand All @@ -92,9 +92,9 @@ def test_par_do_with_side_input_as_arg(self):
def test_par_do_with_side_input_as_keyword_arg(self):
pipeline = Pipeline('DirectPipelineRunner')
words_list = ['aa', 'bb', 'cc']
words = pipeline | Create('SomeWords', words_list)
words = pipeline | 'SomeWords' >> Create(words_list)
prefix = 'zyx'
suffix = pipeline | Create('SomeString', ['xyz']) # side in
suffix = pipeline | 'SomeString' >> Create(['xyz']) # side in
result = words | FlatMap(
'DecorateWords',
lambda x, pfx, sfx: ['%s-%s-%s' % (pfx, x, sfx)],
Expand All @@ -111,11 +111,11 @@ def process(self, context, prefix, suffix):

pipeline = Pipeline('DirectPipelineRunner')
words_list = ['aa', 'bb', 'cc']
words = pipeline | Create('SomeWords', words_list)
words = pipeline | 'SomeWords' >> Create(words_list)
prefix = 'zyx'
suffix = pipeline | Create('SomeString', ['xyz']) # side in
result = words | ParDo('DecorateWordsDoFn', SomeDoFn(), prefix,
suffix=AsSingleton(suffix))
suffix = pipeline | 'SomeString' >> Create(['xyz']) # side in
result = words | 'DecorateWordsDoFn' >> ParDo(
SomeDoFn(), prefix, suffix=AsSingleton(suffix))
assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
pipeline.run()

Expand All @@ -131,7 +131,7 @@ def process(self, context):
yield SideOutputValue('odd', context.element)

pipeline = Pipeline('DirectPipelineRunner')
nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
results = nums | ParDo(
'ClassifyNumbers', SomeDoFn()).with_outputs('odd', 'even', main='main')
assert_that(results.main, equal_to([1, 2, 3, 4]))
Expand All @@ -147,7 +147,7 @@ def some_fn(v):
return [v, SideOutputValue('odd', v)]

pipeline = Pipeline('DirectPipelineRunner')
nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
results = nums | FlatMap(
'ClassifyNumbers', some_fn).with_outputs('odd', 'even', main='main')
assert_that(results.main, equal_to([1, 2, 3, 4]))
Expand All @@ -157,45 +157,44 @@ def some_fn(v):

def test_empty_singleton_side_input(self):
pipeline = Pipeline('DirectPipelineRunner')
pcol = pipeline | Create('start', [1, 2])
side = pipeline | Create('side', []) # Empty side input.
pcol = pipeline | 'start' >> Create([1, 2])
side = pipeline | 'side' >> Create([]) # Empty side input.

def my_fn(k, s):
v = ('empty' if isinstance(s, EmptySideInput) else 'full')
return [(k, v)]
result = pcol | FlatMap('compute', my_fn, AsSingleton(side))
result = pcol | 'compute' >> FlatMap(my_fn, AsSingleton(side))
assert_that(result, equal_to([(1, 'empty'), (2, 'empty')]))
pipeline.run()

def test_multi_valued_singleton_side_input(self):
pipeline = Pipeline('DirectPipelineRunner')
pcol = pipeline | Create('start', [1, 2])
side = pipeline | Create('side', [3, 4]) # 2 values in side input.
pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side)) # pylint: disable=expression-not-assigned
pcol = pipeline | 'start' >> Create([1, 2])
side = pipeline | 'side' >> Create([3, 4]) # 2 values in side input.
pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side)) # pylint: disable=expression-not-assigned
with self.assertRaises(ValueError):
pipeline.run()

def test_default_value_singleton_side_input(self):
pipeline = Pipeline('DirectPipelineRunner')
pcol = pipeline | Create('start', [1, 2])
side = pipeline | Create('side', []) # 0 values in side input.
result = (
pcol | FlatMap('compute', lambda x, s: [x * s], AsSingleton(side, 10)))
pcol = pipeline | 'start' >> Create([1, 2])
side = pipeline | 'side' >> Create([]) # 0 values in side input.
result = pcol | FlatMap(lambda x, s: [x * s], AsSingleton(side, 10))
assert_that(result, equal_to([10, 20]))
pipeline.run()

def test_iterable_side_input(self):
pipeline = Pipeline('DirectPipelineRunner')
pcol = pipeline | Create('start', [1, 2])
side = pipeline | Create('side', [3, 4]) # 2 values in side input.
pcol = pipeline | 'start' >> Create([1, 2])
side = pipeline | 'side' >> Create([3, 4]) # 2 values in side input.
result = pcol | FlatMap('compute',
lambda x, s: [x * y for y in s], AllOf(side))
assert_that(result, equal_to([3, 4, 6, 8]))
pipeline.run()

def test_undeclared_side_outputs(self):
pipeline = Pipeline('DirectPipelineRunner')
nums = pipeline | Create('Some Numbers', [1, 2, 3, 4])
nums = pipeline | 'Some Numbers' >> Create([1, 2, 3, 4])
results = nums | FlatMap(
'ClassifyNumbers',
lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
Expand All @@ -210,7 +209,7 @@ def test_undeclared_side_outputs(self):

def test_empty_side_outputs(self):
pipeline = Pipeline('DirectPipelineRunner')
nums = pipeline | Create('Some Numbers', [1, 3, 5])
nums = pipeline | 'Some Numbers' >> Create([1, 3, 5])
results = nums | FlatMap(
'ClassifyNumbers',
lambda x: [x, SideOutputValue('even' if x % 2 == 0 else 'odd', x)]
Expand All @@ -224,9 +223,9 @@ def test_as_list_and_as_dict_side_inputs(self):
a_list = [5, 1, 3, 2, 9]
some_pairs = [('crouton', 17), ('supreme', None)]
pipeline = Pipeline('DirectPipelineRunner')
main_input = pipeline | Create('main input', [1])
side_list = pipeline | Create('side list', a_list)
side_pairs = pipeline | Create('side pairs', some_pairs)
main_input = pipeline | 'main input' >> Create([1])
side_list = pipeline | 'side list' >> Create(a_list)
side_pairs = pipeline | 'side pairs' >> Create(some_pairs)
results = main_input | FlatMap(
'concatenate',
lambda x, the_list, the_dict: [[x, the_list, the_dict]],
Expand All @@ -248,8 +247,8 @@ def test_as_singleton_without_unique_labels(self):
# with the same defaults will return the same PCollectionView.
a_list = [2]
pipeline = Pipeline('DirectPipelineRunner')
main_input = pipeline | Create('main input', [1])
side_list = pipeline | Create('side list', a_list)
main_input = pipeline | 'main input' >> Create([1])
side_list = pipeline | 'side list' >> Create(a_list)
results = main_input | FlatMap(
'test',
lambda x, s1, s2: [[x, s1, s2]],
Expand All @@ -271,8 +270,8 @@ def test_as_singleton_with_different_defaults_without_unique_labels(self):
# distinct PCollectionViews with the same full_label.
a_list = [2]
pipeline = Pipeline('DirectPipelineRunner')
main_input = pipeline | Create('main input', [1])
side_list = pipeline | Create('side list', a_list)
main_input = pipeline | 'main input' >> Create([1])
side_list = pipeline | 'side list' >> Create(a_list)

with self.assertRaises(RuntimeError) as e:
_ = main_input | FlatMap(
Expand All @@ -287,8 +286,8 @@ def test_as_singleton_with_different_defaults_without_unique_labels(self):
def test_as_singleton_with_different_defaults_with_unique_labels(self):
a_list = []
pipeline = Pipeline('DirectPipelineRunner')
main_input = pipeline | Create('main input', [1])
side_list = pipeline | Create('side list', a_list)
main_input = pipeline | 'main input' >> Create([1])
side_list = pipeline | 'side list' >> Create(a_list)
results = main_input | FlatMap(
'test',
lambda x, s1, s2: [[x, s1, s2]],
Expand All @@ -311,8 +310,8 @@ def test_as_list_without_unique_labels(self):
# return the same PCollectionView.
a_list = [1, 2, 3]
pipeline = Pipeline('DirectPipelineRunner')
main_input = pipeline | Create('main input', [1])
side_list = pipeline | Create('side list', a_list)
main_input = pipeline | 'main input' >> Create([1])
side_list = pipeline | 'side list' >> Create(a_list)
results = main_input | FlatMap(
'test',
lambda x, ls1, ls2: [[x, ls1, ls2]],
Expand All @@ -332,8 +331,8 @@ def match(actual):
def test_as_list_with_unique_labels(self):
a_list = [1, 2, 3]
pipeline = Pipeline('DirectPipelineRunner')
main_input = pipeline | Create('main input', [1])
side_list = pipeline | Create('side list', a_list)
main_input = pipeline | 'main input' >> Create([1])
side_list = pipeline | 'side list' >> Create(a_list)
results = main_input | FlatMap(
'test',
lambda x, ls1, ls2: [[x, ls1, ls2]],
Expand All @@ -353,8 +352,8 @@ def match(actual):
def test_as_dict_with_unique_labels(self):
some_kvs = [('a', 1), ('b', 2)]
pipeline = Pipeline('DirectPipelineRunner')
main_input = pipeline | Create('main input', [1])
side_kvs = pipeline | Create('side kvs', some_kvs)
main_input = pipeline | 'main input' >> Create([1])
side_kvs = pipeline | 'side kvs' >> Create(some_kvs)
results = main_input | FlatMap(
'test',
lambda x, dct1, dct2: [[x, dct1, dct2]],
Expand Down Expand Up @@ -383,10 +382,10 @@ def merge(self, existing_windows):
return existing_windows

pipeline = Pipeline('DirectPipelineRunner')
numbers = pipeline | Create('KVs', [(1, 10), (2, 20), (3, 30)])
numbers = pipeline | 'KVs' >> Create([(1, 10), (2, 20), (3, 30)])
result = (numbers
| WindowInto('W', windowfn=TestWindowFn())
| GroupByKey('G'))
| 'W' >> WindowInto(windowfn=TestWindowFn())
| 'G' >> GroupByKey())
assert_that(
result, equal_to([(1, [10]), (1, [10]), (2, [20]),
(2, [20]), (3, [30]), (3, [30])]))
Expand Down
16 changes: 8 additions & 8 deletions sdks/python/apache_beam/examples/complete/autocomplete.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,19 @@ def run(argv=None):
p = beam.Pipeline(argv=pipeline_args)

(p # pylint: disable=expression-not-assigned
| beam.io.Read('read', beam.io.TextFileSource(known_args.input))
| beam.FlatMap('split', lambda x: re.findall(r'[A-Za-z\']+', x))
| TopPerPrefix('TopPerPrefix', 5)
| beam.Map('format',
lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
| beam.io.Write('write', beam.io.TextFileSink(known_args.output)))
| 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
| 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
| 'TopPerPrefix' >> TopPerPrefix(5)
| 'format' >> beam.Map(
lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
| 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
p.run()


class TopPerPrefix(beam.PTransform):

def __init__(self, label, count):
super(TopPerPrefix, self).__init__(label)
def __init__(self, count):
super(TopPerPrefix, self).__init__()
self._count = count

def apply(self, words):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ class AutocompleteTest(unittest.TestCase):

def test_top_prefixes(self):
p = beam.Pipeline('DirectPipelineRunner')
words = p | beam.Create('create', self.WORDS)
result = words | autocomplete.TopPerPrefix('test', 5)
words = p | beam.Create(self.WORDS)
result = words | autocomplete.TopPerPrefix(5)
# values must be hashable for now
result = result | beam.Map(lambda (k, vs): (k, tuple(vs)))
assert_that(result, equal_to(
Expand Down
14 changes: 5 additions & 9 deletions sdks/python/apache_beam/examples/complete/estimate_pi.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,12 @@ def encode(self, x):
class EstimatePiTransform(beam.PTransform):
"""Runs 10M trials, and combine the results to estimate pi."""

def __init__(self, label):
super(EstimatePiTransform, self).__init__(label)

def apply(self, pcoll):
# A hundred work items of a hundred thousand tries each.
return (pcoll
| beam.Create('Initialize', [100000] * 100).with_output_types(int)
| beam.Map('Run trials', run_trials)
| beam.CombineGlobally('Sum', combine_results).without_defaults())
| 'Initialize' >> beam.Create([100000] * 100).with_output_types(int)
| 'Run trials' >> beam.Map(run_trials)
| 'Sum' >> beam.CombineGlobally(combine_results).without_defaults())


def run(argv=None):
Expand All @@ -109,9 +106,8 @@ def run(argv=None):

p = beam.Pipeline(argv=pipeline_args)
(p # pylint: disable=expression-not-assigned
| EstimatePiTransform('Estimate')
| beam.io.Write('Write',
beam.io.TextFileSink(known_args.output,
| EstimatePiTransform()
| beam.io.Write(beam.io.TextFileSink(known_args.output,
coder=JsonCoder())))

# Actually run the pipeline (all operations above are deferred).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class EstimatePiTest(unittest.TestCase):

def test_basics(self):
p = beam.Pipeline('DirectPipelineRunner')
result = p | estimate_pi.EstimatePiTransform('Estimate')
result = p | 'Estimate' >> estimate_pi.EstimatePiTransform()

# Note: Probabilistically speaking this test can fail with a probability
# that is very small (VERY) given that we run at least 10 million trials.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def point_set(n):
yield (x, y)

julia_set_colors = (pipeline
| beam.Create('add points', point_set(n))
| 'add points' >> beam.Create(point_set(n))
| beam.Map(
get_julia_set_point_color, c, n, max_iterations))

Expand Down Expand Up @@ -105,11 +105,12 @@ def run(argv=None): # pylint: disable=missing-docstring
# Group each coordinate triplet by its x value, then write the coordinates to
# the output file with an x-coordinate grouping per line.
# pylint: disable=expression-not-assigned
(coordinates | beam.Map('x coord key', lambda (x, y, i): (x, (x, y, i)))
| beam.GroupByKey('x coord') | beam.Map(
'format',
(coordinates
| 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
| 'x coord' >> beam.GroupByKey()
| 'format' >> beam.Map(
lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords))
| beam.io.Write('write', beam.io.TextFileSink(known_args.coordinate_output)))
| beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output)))
# pylint: enable=expression-not-assigned
p.run()

Expand Down
Loading