Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-210] Test that empty final panes are not produced. #211

Closed
wants to merge 4 commits into from
Closed
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,8 @@ public void testPaneInfoAllStatesAfterWatermark() throws Exception {
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
.withTrigger(Repeatedly.<IntervalWindow>forever(AfterFirst.<IntervalWindow>of(
AfterPane.<IntervalWindow>elementCountAtLeast(2),
AfterWatermark.<IntervalWindow>pastEndOfWindow())))
AfterPane.elementCountAtLeast(2),
AfterWatermark.pastEndOfWindow())))
.withMode(AccumulationMode.DISCARDING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
.withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
Expand Down Expand Up @@ -547,13 +547,39 @@ public void testPaneInfoAllStatesAfterWatermark() throws Exception {
WindowMatchers.isSingleWindowedValue(emptyIterable(), 9, 0, 10)));
}

@Test
public void noEmptyPanes() throws Exception {
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
.withTrigger(Repeatedly.<IntervalWindow>forever(AfterFirst.<IntervalWindow>of(
AfterPane.elementCountAtLeast(2),
AfterWatermark.pastEndOfWindow())))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
.withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY));

tester.advanceInputWatermark(new Instant(0));
tester.injectElements(
TimestampedValue.of(1, new Instant(1)),
TimestampedValue.of(2, new Instant(2)));
tester.advanceInputWatermark(new Instant(20));
tester.advanceInputWatermark(new Instant(250));

List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
assertThat(output, contains(
// Trigger with 2 elements
WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10),
// Trigger for the empty on time pane.
WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10)));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You said you couldn't repro the problem in the JIRA ticket, but this test seems to confirm exactly the WAI behavior.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was unable to repro the report of broken behavior. I was able to verify the WAI behavior.

}

@Test
public void testPaneInfoAllStatesAfterWatermarkAccumulating() throws Exception {
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
.withTrigger(Repeatedly.<IntervalWindow>forever(AfterFirst.<IntervalWindow>of(
AfterPane.<IntervalWindow>elementCountAtLeast(2),
AfterWatermark.<IntervalWindow>pastEndOfWindow())))
AfterPane.elementCountAtLeast(2),
AfterWatermark.pastEndOfWindow())))
.withMode(AccumulationMode.ACCUMULATING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
.withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
Expand Down Expand Up @@ -604,8 +630,8 @@ public void testPaneInfoFinalAndOnTime() throws Exception {
ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = ReduceFnTester.nonCombining(
WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
.withTrigger(
Repeatedly.<IntervalWindow>forever(AfterPane.<IntervalWindow>elementCountAtLeast(2))
.orFinally(AfterWatermark.<IntervalWindow>pastEndOfWindow()))
Repeatedly.<IntervalWindow>forever(AfterPane.elementCountAtLeast(2))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as you are cleaning up unused generic params, might as well get this one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These weren't reported as unused parameter warnings. I suspect because the methods in question still accept a generic (public static <W extends BoundedWindow> Repeatedly forever(Trigger repeated)) while the ones that I fixed no longer accept one.

.orFinally(AfterWatermark.pastEndOfWindow()))
.withMode(AccumulationMode.DISCARDING_FIRED_PANES)
.withAllowedLateness(Duration.millis(100))
.withClosingBehavior(ClosingBehavior.FIRE_ALWAYS));
Expand Down Expand Up @@ -890,7 +916,7 @@ public void testDropDataMultipleWindowsFinishedTrigger() throws Exception {
ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining(
WindowingStrategy.of(
SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30)))
.withTrigger(AfterWatermark.<IntervalWindow>pastEndOfWindow())
.withTrigger(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.millis(1000)),
new Sum.SumIntegerFn().<String>asKeyedFn(), VarIntCoder.of());

Expand Down Expand Up @@ -1024,7 +1050,7 @@ public void testEmptyOnTimeFromOrFinally() throws Exception {
.<IntervalWindow>forever(
AfterProcessingTime.<IntervalWindow>pastFirstElementInPane().plusDelayOf(
new Duration(5)))
.orFinally(AfterWatermark.<IntervalWindow>pastEndOfWindow()),
.orFinally(AfterWatermark.pastEndOfWindow()),
Repeatedly.<IntervalWindow>forever(
AfterProcessingTime.<IntervalWindow>pastFirstElementInPane().plusDelayOf(
new Duration(25)))),
Expand Down Expand Up @@ -1074,7 +1100,7 @@ public void testProcessingTime() throws Exception {
.<IntervalWindow>forever(
AfterProcessingTime.<IntervalWindow>pastFirstElementInPane().plusDelayOf(
new Duration(5)))
.orFinally(AfterWatermark.<IntervalWindow>pastEndOfWindow()),
.orFinally(AfterWatermark.pastEndOfWindow()),
Repeatedly.<IntervalWindow>forever(
AfterProcessingTime.<IntervalWindow>pastFirstElementInPane().plusDelayOf(
new Duration(25)))),
Expand Down