Skip to content

Commit

Permalink
Merge pull request #10383: [BEAM-8575] Added a unit test to test that…
Browse files Browse the repository at this point in the history
… Combine works with FixedWi…
  • Loading branch information
bumblebee-coming authored and chamikaramj committed Dec 17, 2019
1 parent 1cd62ef commit 4875f45
Showing 1 changed file with 22 additions and 0 deletions.
22 changes: 22 additions & 0 deletions sdks/python/apache_beam/transforms/combiners_test.py
Expand Up @@ -461,6 +461,28 @@ def test_sessions_combine(self):
assert_that(sum_per_key, equal_to([('c', 1), ('c', 21), ('d', 6)]),
label='sum per key')

def test_fixed_windows_combine(self):
with TestPipeline() as p:
input = (
p
| beam.Create([('c', 1), ('c', 2), ('c', 10),
('d', 5), ('d', 8), ('d', 9)])
| beam.MapTuple(lambda k, v: window.TimestampedValue((k, v), v))
| beam.WindowInto(window.FixedWindows(4)))

global_sum = (input
| beam.Values()
| beam.CombineGlobally(sum).without_defaults())
sum_per_key = input | beam.CombinePerKey(sum)

# The first window has 2 elements: ('c', 1), ('c', 2).
# The second window has 1 elements: ('d', 5).
# The third window has 3 elements: ('c', 10), ('d', 8), ('d', 9).
assert_that(global_sum, equal_to([3, 5, 27]), label='global sum')
assert_that(sum_per_key,
equal_to([('c', 3), ('c', 10), ('d', 5), ('d', 17)]),
label='sum per key')


class LatestTest(unittest.TestCase):

Expand Down

0 comments on commit 4875f45

Please sign in to comment.