Skip to content

Commit

Permalink
[BEAM-8645] Create a py test case for Re-iteration on GBK result. (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
HuangLED authored and robertwb committed Nov 19, 2019
1 parent 3685a6e commit 7828147
Showing 1 changed file with 18 additions and 0 deletions.
18 changes: 18 additions & 0 deletions sdks/python/apache_beam/transforms/ptransform_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,24 @@ def test_group_by_key(self):
assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
pipeline.run()

def test_group_by_key_reiteration(self):
  """Check that the values grouped under a key can be iterated repeatedly.

  Four pairs sharing key 1 are grouped, and a DoFn sums the grouped values
  17 separate times. The values 1 + 2 + 3 + 4 add up to 10 per pass, so the
  expected output is the single pair (1, 170).
  """
  class ReiteratingSumDoFn(beam.DoFn):
    """Emits (key, total) where total re-sums the grouped values 17 times."""

    def process(self, keyed_values):
      group_key, grouped = keyed_values
      # Each sum() call walks the grouped values again from the start;
      # consuming the GBK result many times is the point of this test.
      total = sum(sum(grouped) for _ in range(17))
      return [(group_key, total)]

  pipeline = TestPipeline()
  source_pairs = [(1, 1), (1, 2), (1, 3), (1, 4)]
  pcoll = pipeline | 'start' >> beam.Create(source_pairs)
  grouped_pcoll = pcoll | 'Group' >> beam.GroupByKey()
  result = grouped_pcoll | 'Reiteration-Sum' >> beam.ParDo(
      ReiteratingSumDoFn())
  assert_that(result, equal_to([(1, 170)]))
  pipeline.run()

def test_partition_with_partition_fn(self):

class SomePartitionFn(beam.PartitionFn):
Expand Down

0 comments on commit 7828147

Please sign in to comment.