Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-8575] Added a unit test to CombineTest class to test that Combi… #10159

31 changes: 31 additions & 0 deletions sdks/python/apache_beam/transforms/combiners_test.py
Expand Up @@ -31,18 +31,24 @@
import apache_beam as beam
import apache_beam.transforms.combiners as combine
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.testing.util import equal_to_per_window
from apache_beam.transforms import trigger
from apache_beam.transforms import window
from apache_beam.transforms.core import CombineGlobally
from apache_beam.transforms.core import Create
from apache_beam.transforms.core import Map
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
from apache_beam.transforms.ptransform import PTransform
from apache_beam.transforms.trigger import AfterAll
from apache_beam.transforms.trigger import AfterCount
from apache_beam.transforms.trigger import AfterWatermark
from apache_beam.transforms.window import GlobalWindows
from apache_beam.transforms.window import TimestampCombiner
from apache_beam.typehints import TypeCheckError
from apache_beam.utils.timestamp import Timestamp
Expand Down Expand Up @@ -399,6 +405,31 @@ def test_global_fanout(self):
| beam.CombineGlobally(combine.MeanCombineFn()).with_fanout(11))
assert_that(result, equal_to([49.5]))

def test_combining_with_accumulation_mode_and_fanout(self):
# PCollection will contain elements from 1 to 5.
elements = [i for i in range(1, 6)]

ts = TestStream().advance_watermark_to(0)
for i in elements:
ts.add_elements([i])
ts.advance_watermark_to_infinity()

options = PipelineOptions()
options.view_as(StandardOptions).streaming = True
bumblebee-coming marked this conversation as resolved.
Show resolved Hide resolved
with TestPipeline(options=options) as p:
result = (p
| ts
| beam.WindowInto(
GlobalWindows(),
accumulation_mode=trigger.AccumulationMode.ACCUMULATING,
trigger=AfterWatermark(early=AfterAll(AfterCount(1)))
)
| beam.CombineGlobally(sum).without_defaults().with_fanout(2))
bumblebee-coming marked this conversation as resolved.
Show resolved Hide resolved

# The frings for DISCARDING mode is [1, 2, 3, 4, 5, 0, 0].
firings = [1, 3, 6, 10, 15, 15, 15]
assert_that(result, equal_to(firings))

def test_MeanCombineFn_combine(self):
with TestPipeline() as p:
input = (p
Expand Down