From 3237440e2120555a90d74dfaae1d7b44b2d17203 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 19 Oct 2016 16:48:58 -0700 Subject: [PATCH 1/3] Port ReduceFnRunner to TriggerStateMachine --- .../GroupAlsoByWindowViaWindowSetDoFn.java | 5 + ...GroupAlsoByWindowsViaOutputBufferDoFn.java | 5 + .../beam/runners/core/ReduceFnRunner.java | 37 +-- .../triggers/TriggerStateMachineRunner.java | 2 +- .../beam/runners/core/ReduceFnRunnerTest.java | 281 ++++++++++-------- .../beam/runners/core/ReduceFnTester.java | 157 +++++++--- 6 files changed, 313 insertions(+), 174 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index b427037ef008..75a5aa7470f7 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -18,6 +18,8 @@ package org.apache.beam.runners.core; import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor; +import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; +import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.Sum; @@ -81,6 +83,9 @@ public void processElement(ProcessContext c) throws Exception { new ReduceFnRunner<>( key, windowingStrategy, + ExecutableTriggerStateMachine.create( + TriggerStateMachines.stateMachineForTrigger( + windowingStrategy.getTrigger().getSpec())), stateInternals, timerInternals, c.windowingInternals(), diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java index 23986df43f26..4dea77535af1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java @@ -19,6 +19,8 @@ import com.google.common.collect.Iterables; import java.util.List; +import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; +import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SystemDoFnInternal; @@ -70,6 +72,9 @@ public void processElement( new ReduceFnRunner( key, strategy, + ExecutableTriggerStateMachine.create( + TriggerStateMachines.stateMachineForTrigger( + strategy.getTrigger().getSpec())), stateInternals, timerInternals, c.windowingInternals(), diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 24d472bb06ef..78c4e0ba5a9e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -34,6 +34,9 @@ import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; import org.apache.beam.runners.core.ReduceFnContextFactory.OnTriggerCallbacks; import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle; +import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; +import org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory; +import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.windowing.AfterWatermark; @@ -50,7 +53,6 @@ import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.TriggerContextFactory; import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; @@ -69,25 +71,25 @@ * Manages the execution of a {@link ReduceFn} after a {@link GroupByKeyOnly} has partitioned the * {@link PCollection} by key. * - *

The {@link #onTrigger} relies on a {@link TriggerRunner} to manage the execution of - * the triggering logic. The {@code ReduceFnRunner}s responsibilities are: + *

The {@link #onTrigger} relies on a {@link TriggerStateMachineRunner} to manage the execution + * of the triggering logic. The {@code ReduceFnRunner}s responsibilities are: * *

* - * @param The type of key being processed. - * @param The type of values associated with the key. + * @param The type of key being processed. + * @param The type of values associated with the key. * @param The output type that will be produced for each key. - * @param The type of windows this operates on. + * @param The type of windows this operates on. */ public class ReduceFnRunner implements TimerCallback { @@ -165,7 +167,7 @@ public class ReduceFnRunner impleme * garbage collected. * */ - private final TriggerRunner triggerRunner; + private final TriggerStateMachineRunner triggerRunner; /** * Store the output watermark holds for each window. @@ -212,6 +214,7 @@ public class ReduceFnRunner impleme public ReduceFnRunner( K key, WindowingStrategy windowingStrategy, + ExecutableTriggerStateMachine triggerStateMachine, StateInternals stateInternals, TimerInternals timerInternals, WindowingInternals> windowingInternals, @@ -242,9 +245,9 @@ public ReduceFnRunner( this.watermarkHold = new WatermarkHold<>(timerInternals, windowingStrategy); this.triggerRunner = - new TriggerRunner<>( - windowingStrategy.getTrigger(), - new TriggerContextFactory<>( + new TriggerStateMachineRunner<>( + triggerStateMachine, + new TriggerStateMachineContextFactory<>( windowingStrategy.getWindowFn(), stateInternals, activeWindows)); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java index 0ffbbca0d2b7..9f03216f5b9b 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java @@ -57,7 +57,7 @@ */ public class TriggerStateMachineRunner { @VisibleForTesting - static final StateTag> FINISHED_BITS_TAG = + public static final StateTag> FINISHED_BITS_TAG = StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of())); private final ExecutableTriggerStateMachine rootTrigger; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 4d5680c5ea97..20eb08b67ecd 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -38,6 +38,7 @@ import com.google.common.collect.Iterables; import java.util.Iterator; import java.util.List; +import org.apache.beam.runners.core.triggers.TriggerStateMachine; import org.apache.beam.sdk.WindowMatchers; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.options.PipelineOptions; @@ -94,23 +95,23 @@ @RunWith(JUnit4.class) public class ReduceFnRunnerTest { @Mock private SideInputReader mockSideInputReader; - private Trigger mockTrigger; + private TriggerStateMachine mockTriggerStateMachine; private PCollectionView mockView; private IntervalWindow firstWindow; - private static Trigger.TriggerContext anyTriggerContext() { - return Mockito.any(); + private static TriggerStateMachine.TriggerContext anyTriggerContext() { + return Mockito.any(); } - private static Trigger.OnElementContext anyElementContext() { - return Mockito.any(); + private static TriggerStateMachine.OnElementContext anyElementContext() { + return Mockito.any(); } @Before public void setUp() { MockitoAnnotations.initMocks(this); - mockTrigger = mock(Trigger.class, withSettings().serializable()); + mockTriggerStateMachine = mock(TriggerStateMachine.class, withSettings().serializable()); @SuppressWarnings("unchecked") PCollectionView mockViewUnchecked = @@ -121,17 +122,17 @@ public void setUp() { private void injectElement(ReduceFnTester tester, int element) throws Exception { - doNothing().when(mockTrigger).onElement(anyElementContext()); + doNothing().when(mockTriggerStateMachine).onElement(anyElementContext()); tester.injectElements(TimestampedValue.of(element, new Instant(element))); } - private void triggerShouldFinish(Trigger mockTrigger) throws Exception { + private void triggerShouldFinish(TriggerStateMachine mockTrigger) throws Exception { doAnswer(new Answer() { @Override public Void answer(InvocationOnMock invocation) throws Exception { @SuppressWarnings("unchecked") - Trigger.TriggerContext context = - (Trigger.TriggerContext) invocation.getArguments()[0]; + TriggerStateMachine.TriggerContext context = + (TriggerStateMachine.TriggerContext) invocation.getArguments()[0]; context.trigger().setFinished(true); return null; } @@ -143,20 +144,20 @@ public Void answer(InvocationOnMock invocation) throws Exception { public void testOnElementBufferingDiscarding() throws Exception { // Test basic execution of a trigger using a non-combining window set and discarding mode. ReduceFnTester, IntervalWindow> tester = - ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger, + ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine, AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100), ClosingBehavior.FIRE_IF_NON_EMPTY); // Pane of {1, 2} injectElement(tester, 1); - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); injectElement(tester, 2); assertThat(tester.extractOutput(), contains(isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10))); // Pane of just 3, and finish - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); - triggerShouldFinish(mockTrigger); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); + triggerShouldFinish(mockTriggerStateMachine); injectElement(tester, 3); assertThat(tester.extractOutput(), contains(isSingleWindowedValue(containsInAnyOrder(3), 3, 0, 10))); @@ -173,19 +174,22 @@ public void testOnElementBufferingDiscarding() throws Exception { public void testOnElementBufferingAccumulating() throws Exception { // Test basic execution of a trigger using a non-combining window set and accumulating mode. ReduceFnTester, IntervalWindow> tester = - ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger, - AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(100), + ReduceFnTester.nonCombining( + FixedWindows.of(Duration.millis(10)), + mockTriggerStateMachine, + AccumulationMode.ACCUMULATING_FIRED_PANES, + Duration.millis(100), ClosingBehavior.FIRE_IF_NON_EMPTY); injectElement(tester, 1); // Fires {1, 2} - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); injectElement(tester, 2); // Fires {1, 2, 3} because we are in accumulating mode - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); - triggerShouldFinish(mockTrigger); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); + triggerShouldFinish(mockTriggerStateMachine); injectElement(tester, 3); // This element shouldn't be seen, because the trigger has finished @@ -203,17 +207,27 @@ public void testOnElementBufferingAccumulating() throws Exception { @Test public void testOnElementCombiningDiscarding() throws Exception { // Test basic execution of a trigger using a non-combining window set and discarding mode. - ReduceFnTester tester = ReduceFnTester.combining( - FixedWindows.of(Duration.millis(10)), mockTrigger, AccumulationMode.DISCARDING_FIRED_PANES, - new Sum.SumIntegerFn().asKeyedFn(), VarIntCoder.of(), Duration.millis(100)); + + WindowingStrategy strategy = + WindowingStrategy.of((WindowFn) FixedWindows.of(Duration.millis(10))) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) + .withMode(AccumulationMode.DISCARDING_FIRED_PANES) + .withAllowedLateness(Duration.millis(100)); + + ReduceFnTester tester = + ReduceFnTester.combining( + strategy, + mockTriggerStateMachine, + new Sum.SumIntegerFn().asKeyedFn(), + VarIntCoder.of()); injectElement(tester, 2); - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); injectElement(tester, 3); - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); - triggerShouldFinish(mockTrigger); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); + triggerShouldFinish(mockTriggerStateMachine); injectElement(tester, 4); // This element shouldn't be seen, because the trigger has finished @@ -267,14 +281,17 @@ public BoundedWindow window() { window.maxTimestamp().plus(allowedLateness).isAfter(GlobalWindow.INSTANCE.maxTimestamp())); // Test basic execution of a trigger using a non-combining window set and accumulating mode. + + WindowingStrategy strategy = + WindowingStrategy.of((WindowFn) windowFn) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) + .withTrigger(AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever())) + .withMode(AccumulationMode.DISCARDING_FIRED_PANES) + .withAllowedLateness(allowedLateness); + ReduceFnTester tester = - ReduceFnTester.combining( - windowFn, - AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()), - AccumulationMode.DISCARDING_FIRED_PANES, - new Sum.SumIntegerFn().asKeyedFn(), - VarIntCoder.of(), - allowedLateness); + ReduceFnTester + .combining(strategy, new Sum.SumIntegerFn().asKeyedFn(), VarIntCoder.of()); tester.injectElements(TimestampedValue.of(13, elementTimestamp)); @@ -295,18 +312,27 @@ public BoundedWindow window() { @Test public void testOnElementCombiningAccumulating() throws Exception { // Test basic execution of a trigger using a non-combining window set and accumulating mode. + + WindowingStrategy strategy = + WindowingStrategy.of((WindowFn) FixedWindows.of(Duration.millis(10))) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) + .withAllowedLateness(Duration.millis(100)); + ReduceFnTester tester = - ReduceFnTester.combining(FixedWindows.of(Duration.millis(10)), mockTrigger, - AccumulationMode.ACCUMULATING_FIRED_PANES, new Sum.SumIntegerFn().asKeyedFn(), - VarIntCoder.of(), Duration.millis(100)); + ReduceFnTester.combining( + strategy, + mockTriggerStateMachine, + new Sum.SumIntegerFn().asKeyedFn(), + VarIntCoder.of()); injectElement(tester, 1); - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); injectElement(tester, 2); - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); - triggerShouldFinish(mockTrigger); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); + triggerShouldFinish(mockTriggerStateMachine); injectElement(tester, 3); // This element shouldn't be seen, because the trigger has finished @@ -326,7 +352,6 @@ public void testOnElementCombiningWithContext() throws Exception { Integer expectedValue = 5; WindowingStrategy windowingStrategy = WindowingStrategy .of(FixedWindows.of(Duration.millis(10))) - .withTrigger(mockTrigger) .withMode(AccumulationMode.DISCARDING_FIRED_PANES) .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) .withAllowedLateness(Duration.millis(100)); @@ -345,16 +370,16 @@ public void testOnElementCombiningWithContext() throws Exception { SumAndVerifyContextFn combineFn = new SumAndVerifyContextFn(mockView, expectedValue); // Test basic execution of a trigger using a non-combining window set and discarding mode. ReduceFnTester tester = ReduceFnTester.combining( - windowingStrategy, combineFn.asKeyedFn(), + windowingStrategy, mockTriggerStateMachine, combineFn.asKeyedFn(), VarIntCoder.of(), options, mockSideInputReader); injectElement(tester, 2); - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); injectElement(tester, 3); - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); - triggerShouldFinish(mockTrigger); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); + triggerShouldFinish(mockTriggerStateMachine); injectElement(tester, 4); // This element shouldn't be seen, because the trigger has finished @@ -373,7 +398,7 @@ public void testOnElementCombiningWithContext() throws Exception { public void testWatermarkHoldAndLateData() throws Exception { // Test handling of late data. Specifically, ensure the watermark hold is correct. ReduceFnTester, IntervalWindow> tester = - ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger, + ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine, AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(10), ClosingBehavior.FIRE_IF_NON_EMPTY); @@ -385,7 +410,7 @@ public void testWatermarkHoldAndLateData() throws Exception { injectElement(tester, 1); injectElement(tester, 3); assertEquals(new Instant(1), tester.getWatermarkHold()); - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); injectElement(tester, 2); List>> output = tester.extractOutput(); assertThat(output, contains( @@ -406,14 +431,14 @@ public void testWatermarkHoldAndLateData() throws Exception { assertEquals(new Instant(4), tester.getOutputWatermark()); // Some late, some on time. Verify that we only hold to the minimum of on-time. - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); tester.advanceInputWatermark(new Instant(4)); injectElement(tester, 2); injectElement(tester, 3); assertEquals(new Instant(9), tester.getWatermarkHold()); injectElement(tester, 5); assertEquals(new Instant(5), tester.getWatermarkHold()); - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); injectElement(tester, 4); output = tester.extractOutput(); assertThat(output, @@ -428,7 +453,7 @@ public void testWatermarkHoldAndLateData() throws Exception { equalTo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1))); // All late -- output at end of window timestamp. - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); tester.advanceInputWatermark(new Instant(8)); injectElement(tester, 6); injectElement(tester, 5); @@ -436,7 +461,7 @@ public void testWatermarkHoldAndLateData() throws Exception { injectElement(tester, 4); // Fire the ON_TIME pane - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); tester.advanceInputWatermark(new Instant(10)); // Output time is end of the window, because all the new data was late, but the pane @@ -455,7 +480,7 @@ public void testWatermarkHoldAndLateData() throws Exception { // This is "pending" at the time the watermark makes it way-late. // Because we're about to expire the window, we output it. - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); injectElement(tester, 8); assertEquals(0, tester.getElementsDroppedDueToClosedWindow()); @@ -492,7 +517,7 @@ public void testWatermarkHoldAndLateData() throws Exception { public void dontSetHoldIfTooLateForEndOfWindowTimer() throws Exception { // Make sure holds are only set if they are accompanied by an end-of-window timer. ReduceFnTester, IntervalWindow> tester = - ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger, + ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine, AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(10), ClosingBehavior.FIRE_ALWAYS); tester.setAutoAdvanceOutputWatermark(false); @@ -506,9 +531,9 @@ public void dontSetHoldIfTooLateForEndOfWindowTimer() throws Exception { assertEquals(new Instant(19), tester.getNextTimer(TimeDomain.EVENT_TIME)); // Trigger the end-of-window timer. - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); tester.advanceInputWatermark(new Instant(20)); - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); // Hold has been replaced with garbage collection hold. Waiting for garbage collection. assertEquals(new Instant(29), tester.getWatermarkHold()); assertEquals(new Instant(29), tester.getNextTimer(TimeDomain.EVENT_TIME)); @@ -530,37 +555,37 @@ public void dontSetHoldIfTooLateForEndOfWindowTimer() throws Exception { @Test public void testPaneInfoAllStates() throws Exception { ReduceFnTester, IntervalWindow> tester = - ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger, + ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine, AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100), ClosingBehavior.FIRE_IF_NON_EMPTY); tester.advanceInputWatermark(new Instant(0)); - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); injectElement(tester, 1); assertThat(tester.extractOutput(), contains( WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY)))); - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); injectElement(tester, 2); assertThat(tester.extractOutput(), contains( WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)))); - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); tester.advanceInputWatermark(new Instant(15)); - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); injectElement(tester, 3); assertThat(tester.extractOutput(), contains( WindowMatchers.valueWithPaneInfo( PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0)))); - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); injectElement(tester, 4); assertThat(tester.extractOutput(), contains( WindowMatchers.valueWithPaneInfo( PaneInfo.createPane(false, false, Timing.LATE, 3, 1)))); - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); - triggerShouldFinish(mockTrigger); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); + triggerShouldFinish(mockTriggerStateMachine); injectElement(tester, 5); assertThat(tester.extractOutput(), contains( WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 4, 2)))); @@ -758,13 +783,13 @@ public void testPaneInfoFinalAndOnTime() throws Exception { @Test public void testPaneInfoSkipToFinish() throws Exception { ReduceFnTester, IntervalWindow> tester = - ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger, + ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine, AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100), ClosingBehavior.FIRE_IF_NON_EMPTY); tester.advanceInputWatermark(new Instant(0)); - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); - triggerShouldFinish(mockTrigger); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); + triggerShouldFinish(mockTriggerStateMachine); injectElement(tester, 1); assertThat(tester.extractOutput(), contains( WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, true, Timing.EARLY)))); @@ -773,13 +798,13 @@ public void testPaneInfoSkipToFinish() throws Exception { @Test public void testPaneInfoSkipToNonSpeculativeAndFinish() throws Exception { ReduceFnTester, IntervalWindow> tester = - ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger, + ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine, AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100), ClosingBehavior.FIRE_IF_NON_EMPTY); tester.advanceInputWatermark(new Instant(15)); - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); - triggerShouldFinish(mockTrigger); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); + triggerShouldFinish(mockTriggerStateMachine); injectElement(tester, 1); assertThat(tester.extractOutput(), contains( WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, true, Timing.LATE)))); @@ -790,7 +815,8 @@ public void testMergeBeforeFinalizing() throws Exception { // Verify that we merge windows before producing output so users don't see undesired // unmerged windows. ReduceFnTester, IntervalWindow> tester = - ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger, + ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), + mockTriggerStateMachine, AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(0), ClosingBehavior.FIRE_IF_NON_EMPTY); @@ -823,7 +849,8 @@ public void testMergeBeforeFinalizing() throws Exception { @Test public void testMergingWithCloseBeforeGC() throws Exception { ReduceFnTester, IntervalWindow> tester = - ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger, + ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), + mockTriggerStateMachine, AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50), ClosingBehavior.FIRE_IF_NON_EMPTY); @@ -833,8 +860,8 @@ public void testMergingWithCloseBeforeGC() throws Exception { TimestampedValue.of(10, new Instant(10))); // in [10, 20) // Close the trigger, but the gargbage collection timer is still pending. - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); - triggerShouldFinish(mockTrigger); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); + triggerShouldFinish(mockTriggerStateMachine); tester.advanceInputWatermark(new Instant(30)); // Now the garbage collection timer will fire, finding the trigger already closed. @@ -858,7 +885,8 @@ public void testMergingWithCloseBeforeGC() throws Exception { @Test public void testMergingWithCloseTrigger() throws Exception { ReduceFnTester, IntervalWindow> tester = - ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger, + ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), + mockTriggerStateMachine, AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50), ClosingBehavior.FIRE_IF_NON_EMPTY); @@ -867,14 +895,14 @@ public void testMergingWithCloseTrigger() throws Exception { TimestampedValue.of(2, new Instant(2))); // Force the trigger to be closed for the merged window. - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); - triggerShouldFinish(mockTrigger); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); + triggerShouldFinish(mockTriggerStateMachine); tester.advanceInputWatermark(new Instant(13)); // Trigger is now closed. assertTrue(tester.isMarkedFinished(new IntervalWindow(new Instant(1), new Instant(12)))); - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); // Revisit the same session window. tester.injectElements(TimestampedValue.of(1, new Instant(1)), @@ -891,7 +919,8 @@ public void testMergingWithCloseTrigger() throws Exception { @Test public void testMergingWithReusedWindow() throws Exception { ReduceFnTester, IntervalWindow> tester = - ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger, + ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), + mockTriggerStateMachine, AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50), ClosingBehavior.FIRE_IF_NON_EMPTY); @@ -899,8 +928,8 @@ public void testMergingWithReusedWindow() throws Exception { tester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21. // Close the trigger, but the gargbage collection timer is still pending. - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); - triggerShouldFinish(mockTrigger); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); + triggerShouldFinish(mockTriggerStateMachine); tester.advanceInputWatermark(new Instant(15)); // Another element in the same session window. @@ -932,14 +961,15 @@ public void testMergingWithReusedWindow() throws Exception { @Test public void testMergingWithClosedRepresentative() throws Exception { ReduceFnTester, IntervalWindow> tester = - ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger, + ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), + mockTriggerStateMachine, AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50), ClosingBehavior.FIRE_IF_NON_EMPTY); // 2 elements into merged session window. // Close the trigger, but the garbage collection timer is still pending. - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); - triggerShouldFinish(mockTrigger); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); + triggerShouldFinish(mockTriggerStateMachine); tester.injectElements(TimestampedValue.of(1, new Instant(1)), // in [1, 11), gc at 21. TimestampedValue.of(8, new Instant(8))); // in [8, 18), gc at 28. @@ -973,17 +1003,18 @@ public void testMergingWithClosedRepresentative() throws Exception { @Test public void testMergingWithClosedDoesNotPoison() throws Exception { ReduceFnTester, IntervalWindow> tester = - ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), mockTrigger, + ReduceFnTester.nonCombining(Sessions.withGapDuration(Duration.millis(10)), + mockTriggerStateMachine, AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(50), ClosingBehavior.FIRE_IF_NON_EMPTY); // 1 element, force its trigger to close. - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); - triggerShouldFinish(mockTrigger); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); + triggerShouldFinish(mockTriggerStateMachine); tester.injectElements(TimestampedValue.of(2, new Instant(2))); // 3 elements, one already closed. - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(false); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); tester.injectElements(TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(2)), TimestampedValue.of(3, new Instant(3))); @@ -1052,7 +1083,7 @@ public void testIdempotentEmptyPanesDiscarding() throws Exception { // Test uninteresting (empty) panes don't increment the index or otherwise // modify PaneInfo. ReduceFnTester, IntervalWindow> tester = - ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger, + ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine, AccumulationMode.DISCARDING_FIRED_PANES, Duration.millis(100), ClosingBehavior.FIRE_IF_NON_EMPTY); @@ -1062,16 +1093,16 @@ public void testIdempotentEmptyPanesDiscarding() throws Exception { tester.advanceInputWatermark(new Instant(12)); // Fire the on-time pane - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME); // Fire another timer (with no data, so it's an uninteresting pane that should not be output). - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME); // Finish it off with another datum. - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); - triggerShouldFinish(mockTrigger); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); + triggerShouldFinish(mockTriggerStateMachine); injectElement(tester, 3); // The intermediate trigger firing shouldn't result in any output. @@ -1097,7 +1128,7 @@ public void testIdempotentEmptyPanesAccumulating() throws Exception { // Test uninteresting (empty) panes don't increment the index or otherwise // modify PaneInfo. ReduceFnTester, IntervalWindow> tester = - ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTrigger, + ReduceFnTester.nonCombining(FixedWindows.of(Duration.millis(10)), mockTriggerStateMachine, AccumulationMode.ACCUMULATING_FIRED_PANES, Duration.millis(100), ClosingBehavior.FIRE_IF_NON_EMPTY); @@ -1107,7 +1138,7 @@ public void testIdempotentEmptyPanesAccumulating() throws Exception { tester.advanceInputWatermark(new Instant(12)); // Trigger the on-time pane - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME); List>> output = tester.extractOutput(); assertThat(output.size(), equalTo(1)); @@ -1117,13 +1148,13 @@ public void testIdempotentEmptyPanesAccumulating() throws Exception { // Fire another timer with no data; the empty pane should not be output even though the // trigger is ready to fire - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME); assertThat(tester.extractOutput().size(), equalTo(0)); // Finish it off with another datum, which is late - when(mockTrigger.shouldFire(anyTriggerContext())).thenReturn(true); - triggerShouldFinish(mockTrigger); + when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); + triggerShouldFinish(mockTriggerStateMachine); injectElement(tester, 3); output = tester.extractOutput(); assertThat(output.size(), equalTo(1)); @@ -1146,19 +1177,25 @@ public void testIdempotentEmptyPanesAccumulating() throws Exception { */ @Test public void testEmptyOnTimeFromOrFinally() throws Exception { + + WindowingStrategy strategy = + WindowingStrategy.of((WindowFn) FixedWindows.of(Duration.millis(10))) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) + .withTrigger( + AfterEach.inOrder( + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(new Duration(5))) + .orFinally(AfterWatermark.pastEndOfWindow()), + Repeatedly.forever( + AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(new Duration(25))))) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) + .withAllowedLateness(Duration.millis(100)); + ReduceFnTester tester = - ReduceFnTester.combining(FixedWindows.of(Duration.millis(10)), - AfterEach.inOrder( - Repeatedly - .forever( - AfterProcessingTime.pastFirstElementInPane().plusDelayOf( - new Duration(5))) - .orFinally(AfterWatermark.pastEndOfWindow()), - Repeatedly.forever( - AfterProcessingTime.pastFirstElementInPane().plusDelayOf( - new Duration(25)))), - AccumulationMode.ACCUMULATING_FIRED_PANES, new Sum.SumIntegerFn().asKeyedFn(), - VarIntCoder.of(), Duration.millis(100)); + ReduceFnTester + .combining(strategy, new Sum.SumIntegerFn().asKeyedFn(), VarIntCoder.of()); tester.advanceInputWatermark(new Instant(0)); tester.advanceProcessingTime(new Instant(0)); @@ -1196,9 +1233,11 @@ public void testEmptyOnTimeFromOrFinally() throws Exception { */ @Test public void testProcessingTime() throws Exception { - ReduceFnTester tester = - ReduceFnTester.combining(FixedWindows.of(Duration.millis(10)), - AfterEach.inOrder( + + WindowingStrategy strategy = + WindowingStrategy.of((WindowFn) FixedWindows.of(Duration.millis(10))) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) + .withTrigger(AfterEach.inOrder( Repeatedly .forever( AfterProcessingTime.pastFirstElementInPane().plusDelayOf( @@ -1206,9 +1245,13 @@ public void testProcessingTime() throws Exception { .orFinally(AfterWatermark.pastEndOfWindow()), Repeatedly.forever( AfterProcessingTime.pastFirstElementInPane().plusDelayOf( - new Duration(25)))), - AccumulationMode.ACCUMULATING_FIRED_PANES, new Sum.SumIntegerFn().asKeyedFn(), - VarIntCoder.of(), Duration.millis(100)); + new Duration(25))))) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) + .withAllowedLateness(Duration.millis(100)); + + ReduceFnTester tester = + ReduceFnTester + .combining(strategy, new Sum.SumIntegerFn().asKeyedFn(), VarIntCoder.of()); tester.advanceInputWatermark(new Instant(0)); tester.advanceProcessingTime(new Instant(0)); @@ -1352,11 +1395,13 @@ public void fireEmptyOnDrainInGlobalWindowIfRequested() throws Exception { public void setGarbageCollectionHoldOnLateElements() throws Exception { ReduceFnTester, IntervalWindow> tester = ReduceFnTester.nonCombining( - FixedWindows.of(Duration.millis(10)), - AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(2)), - AccumulationMode.DISCARDING_FIRED_PANES, - Duration.millis(100), - ClosingBehavior.FIRE_IF_NON_EMPTY); + WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) + .withTrigger( + AfterWatermark.pastEndOfWindow() + .withLateFirings(AfterPane.elementCountAtLeast(2))) + .withMode(AccumulationMode.DISCARDING_FIRED_PANES) + .withAllowedLateness(Duration.millis(100)) + .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)); tester.advanceInputWatermark(new Instant(0)); tester.advanceOutputWatermark(new Instant(0)); diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 5752b113f3a0..f707349fca26 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -38,6 +38,10 @@ import java.util.Map; import java.util.Set; import javax.annotation.Nullable; +import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; +import org.apache.beam.runners.core.triggers.TriggerStateMachine; +import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner; +import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.IterableCoder; @@ -59,7 +63,6 @@ import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.AppliedCombineFn; -import org.apache.beam.sdk.util.ExecutableTrigger; import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.SideInputReader; @@ -106,6 +109,7 @@ public class ReduceFnTester { private final TestWindowingInternals windowingInternals; private final Coder outputCoder; private final WindowingStrategy objectStrategy; + private final ExecutableTriggerStateMachine executableTriggerStateMachine; private final ReduceFn reduceFn; private final PipelineOptions options; @@ -117,38 +121,99 @@ public class ReduceFnTester { */ private boolean autoAdvanceOutputWatermark = true; - private ExecutableTrigger executableTrigger; - private final InMemoryLongSumAggregator droppedDueToClosedWindow = new InMemoryLongSumAggregator(GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER); + /** + * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy}, creating + * a {@link TriggerStateMachine} from its {@link Trigger}. + */ public static ReduceFnTester, W> nonCombining(WindowingStrategy windowingStrategy) throws Exception { return new ReduceFnTester, W>( windowingStrategy, + TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger().getSpec()), SystemReduceFn.buffering(VarIntCoder.of()), IterableCoder.of(VarIntCoder.of()), PipelineOptionsFactory.create(), NullSideInputReader.empty()); } - public static ReduceFnTester, W> - nonCombining(WindowFn windowFn, Trigger trigger, AccumulationMode mode, - Duration allowedDataLateness, ClosingBehavior closingBehavior) throws Exception { + /** + * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy} and {@link + * TriggerStateMachine}, for mocking the interactions between {@link ReduceFnRunner} and the + * {@link TriggerStateMachine}. + * + *

Ignores the {@link Trigger} on the {@link WindowingStrategy}. + */ + public static + ReduceFnTester, W> nonCombining( + WindowingStrategy windowingStrategy, TriggerStateMachine triggerStateMachine) + throws Exception { + return new ReduceFnTester<>( + windowingStrategy, + triggerStateMachine, + SystemReduceFn.buffering(VarIntCoder.of()), + IterableCoder.of(VarIntCoder.of()), + PipelineOptionsFactory.create(), + NullSideInputReader.empty()); + } + + public static + ReduceFnTester, W> nonCombining( + WindowFn windowFn, + TriggerStateMachine triggerStateMachine, + AccumulationMode mode, + Duration allowedDataLateness, + ClosingBehavior closingBehavior) + throws Exception { WindowingStrategy strategy = WindowingStrategy.of(windowFn) .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) - .withTrigger(trigger) .withMode(mode) .withAllowedLateness(allowedDataLateness) .withClosingBehavior(closingBehavior); - return nonCombining(strategy); + return nonCombining(strategy, triggerStateMachine); } - public static ReduceFnTester - combining(WindowingStrategy strategy, + /** + * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy} and + * {@link KeyedCombineFn}, creating a {@link TriggerStateMachine} from the + * {@link Trigger} in the {@link WindowingStrategy}. + */ + public static + ReduceFnTester combining( + WindowingStrategy strategy, KeyedCombineFn combineFn, - Coder outputCoder) throws Exception { + Coder outputCoder) + throws Exception { + + CoderRegistry registry = new CoderRegistry(); + registry.registerStandardCoders(); + AppliedCombineFn fn = + AppliedCombineFn.withInputCoder( + combineFn, registry, KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())); + + return combining( + strategy, + TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger().getSpec()), + combineFn, + outputCoder); + } + + /** + * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy}, + * {@link KeyedCombineFn}, and {@link TriggerStateMachine}, for mocking the interaction + * between {@link ReduceFnRunner} and the {@link TriggerStateMachine}. + * Ignores the {@link Trigger} in the {@link WindowingStrategy}. + */ + public static + ReduceFnTester combining( + WindowingStrategy strategy, + TriggerStateMachine triggerStateMachine, + KeyedCombineFn combineFn, + Coder outputCoder) + throws Exception { CoderRegistry registry = new CoderRegistry(); registry.registerStandardCoders(); @@ -158,18 +223,45 @@ public class ReduceFnTester { return new ReduceFnTester( strategy, + triggerStateMachine, SystemReduceFn.combining(StringUtf8Coder.of(), fn), outputCoder, PipelineOptionsFactory.create(), NullSideInputReader.empty()); } - public static ReduceFnTester - combining(WindowingStrategy strategy, + public static + ReduceFnTester combining( + WindowingStrategy strategy, + KeyedCombineFnWithContext combineFn, + Coder outputCoder, + PipelineOptions options, + SideInputReader sideInputReader) + throws Exception { + CoderRegistry registry = new CoderRegistry(); + registry.registerStandardCoders(); + AppliedCombineFn fn = + AppliedCombineFn.withInputCoder( + combineFn, registry, KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())); + + return combining( + strategy, + TriggerStateMachines.stateMachineForTrigger(strategy.getTrigger().getSpec()), + combineFn, + outputCoder, + options, + sideInputReader); + } + + public static + ReduceFnTester combining( + WindowingStrategy strategy, + TriggerStateMachine triggerStateMachine, KeyedCombineFnWithContext combineFn, Coder outputCoder, PipelineOptions options, - SideInputReader sideInputReader) throws Exception { + SideInputReader sideInputReader) + throws Exception { CoderRegistry registry = new CoderRegistry(); registry.registerStandardCoders(); AppliedCombineFn fn = @@ -178,29 +270,21 @@ public class ReduceFnTester { return new ReduceFnTester( strategy, + triggerStateMachine, SystemReduceFn.combining(StringUtf8Coder.of(), fn), outputCoder, options, sideInputReader); } - public static ReduceFnTester - combining(WindowFn windowFn, Trigger trigger, AccumulationMode mode, - KeyedCombineFn combineFn, Coder outputCoder, - Duration allowedDataLateness) throws Exception { - - WindowingStrategy strategy = - WindowingStrategy.of(windowFn) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) - .withTrigger(trigger) - .withMode(mode) - .withAllowedLateness(allowedDataLateness); - - return combining(strategy, combineFn, outputCoder); - } - private ReduceFnTester(WindowingStrategy wildcardStrategy, - ReduceFn reduceFn, Coder outputCoder, - PipelineOptions options, SideInputReader sideInputReader) throws Exception { + private ReduceFnTester( + WindowingStrategy wildcardStrategy, + TriggerStateMachine triggerStateMachine, + ReduceFn reduceFn, + Coder outputCoder, + PipelineOptions options, + SideInputReader sideInputReader) + throws Exception { @SuppressWarnings("unchecked") WindowingStrategy objectStrategy = (WindowingStrategy) wildcardStrategy; @@ -208,8 +292,8 @@ private ReduceFnTester(WindowingStrategy wildcardStrategy, this.reduceFn = reduceFn; this.windowFn = objectStrategy.getWindowFn(); this.windowingInternals = new TestWindowingInternals(sideInputReader); + this.executableTriggerStateMachine = ExecutableTriggerStateMachine.create(triggerStateMachine); this.outputCoder = outputCoder; - this.executableTrigger = wildcardStrategy.getTrigger(); this.options = options; } @@ -226,6 +310,7 @@ ReduceFnRunner createRunner() { return new ReduceFnRunner<>( KEY, objectStrategy, + executableTriggerStateMachine, stateInternals, timerInternals, windowingInternals, @@ -234,10 +319,6 @@ ReduceFnRunner createRunner() { options); } - public ExecutableTrigger getTrigger() { - return executableTrigger; - } - public boolean isMarkedFinished(W window) { return createRunner().isFinished(window); } @@ -250,7 +331,7 @@ public boolean hasNoActiveWindows() { public final void assertHasOnlyGlobalAndFinishedSetsFor(W... expectedWindows) { assertHasOnlyGlobalAndAllowedTags( ImmutableSet.copyOf(expectedWindows), - ImmutableSet.>of(TriggerRunner.FINISHED_BITS_TAG)); + ImmutableSet.>of(TriggerStateMachineRunner.FINISHED_BITS_TAG)); } @SafeVarargs @@ -258,7 +339,7 @@ public final void assertHasOnlyGlobalAndFinishedSetsAndPaneInfoFor(W... expected assertHasOnlyGlobalAndAllowedTags( ImmutableSet.copyOf(expectedWindows), ImmutableSet.>of( - TriggerRunner.FINISHED_BITS_TAG, PaneInfoTracker.PANE_INFO_TAG, + TriggerStateMachineRunner.FINISHED_BITS_TAG, PaneInfoTracker.PANE_INFO_TAG, WatermarkHold.watermarkHoldTagForOutputTimeFn(objectStrategy.getOutputTimeFn()), WatermarkHold.EXTRA_HOLD_TAG)); } From 33d9baaf5778c565632f6fe98344b8f1bd8a1d75 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 19 Oct 2016 17:41:39 -0700 Subject: [PATCH 2/3] Delete TriggerRunner --- .../beam/runners/core/TriggerRunner.java | 247 ------------------ 1 file changed, 247 deletions(-) delete mode 100644 runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java deleted file mode 100644 index 8d0f32254585..000000000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/TriggerRunner.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.util.BitSet; -import java.util.Collection; -import java.util.Map; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; -import org.apache.beam.sdk.transforms.windowing.Trigger; -import org.apache.beam.sdk.util.BitSetCoder; -import org.apache.beam.sdk.util.ExecutableTrigger; -import org.apache.beam.sdk.util.FinishedTriggers; -import org.apache.beam.sdk.util.FinishedTriggersBitSet; -import org.apache.beam.sdk.util.Timers; -import org.apache.beam.sdk.util.TriggerContextFactory; -import org.apache.beam.sdk.util.state.MergingStateAccessor; -import org.apache.beam.sdk.util.state.StateAccessor; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; -import org.apache.beam.sdk.util.state.ValueState; -import org.joda.time.Instant; - -/** - * Executes a trigger while managing persistence of information about which subtriggers are - * finished. Subtriggers include all recursive trigger expressions as well as the entire trigger. - * - *

Specifically, the responsibilities are: - * - *

    - *
  • Invoking the trigger's methods via its {@link ExecutableTrigger} wrapper by - * constructing the appropriate trigger contexts.
  • - *
  • Committing a record of which subtriggers are finished to persistent state.
  • - *
  • Restoring the record of which subtriggers are finished from persistent state.
  • - *
  • Clearing out the persisted finished set when a caller indicates - * (via {#link #clearFinished}) that it is no longer needed.
  • - *
- * - *

These responsibilities are intertwined: trigger contexts include mutable information about - * which subtriggers are finished. This class provides the information when building the contexts - * and commits the information when the method of the {@link ExecutableTrigger} returns. - * - * @param The kind of windows being processed. - */ -public class TriggerRunner { - @VisibleForTesting - static final StateTag> FINISHED_BITS_TAG = - StateTags.makeSystemTagInternal(StateTags.value("closed", BitSetCoder.of())); - - private final ExecutableTrigger rootTrigger; - private final TriggerContextFactory contextFactory; - - public TriggerRunner(ExecutableTrigger rootTrigger, TriggerContextFactory contextFactory) { - checkState(rootTrigger.getTriggerIndex() == 0); - this.rootTrigger = rootTrigger; - this.contextFactory = contextFactory; - } - - private FinishedTriggersBitSet readFinishedBits(ValueState state) { - if (!isFinishedSetNeeded()) { - // If no trigger in the tree will ever have finished bits, then we don't need to read them. - // So that the code can be agnostic to that fact, we create a BitSet that is all 0 (not - // finished) for each trigger in the tree. - return FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree()); - } - - BitSet bitSet = state.read(); - return bitSet == null - ? FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree()) - : FinishedTriggersBitSet.fromBitSet(bitSet); - } - - - private void clearFinishedBits(ValueState state) { - if (!isFinishedSetNeeded()) { - // Nothing to clear. - return; - } - state.clear(); - } - - /** Return true if the trigger is closed in the window corresponding to the specified state. */ - public boolean isClosed(StateAccessor state) { - return readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger); - } - - @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", - justification = "prefetch side effect") - public void prefetchForValue(W window, StateAccessor state) { - if (isFinishedSetNeeded()) { - state.access(FINISHED_BITS_TAG).readLater(); - } - rootTrigger.getSpec().prefetchOnElement( - contextFactory.createStateAccessor(window, rootTrigger)); - } - - @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", - justification = "prefetch side effect") - public void prefetchOnFire(W window, StateAccessor state) { - if (isFinishedSetNeeded()) { - state.access(FINISHED_BITS_TAG).readLater(); - } - rootTrigger.getSpec().prefetchOnFire(contextFactory.createStateAccessor(window, rootTrigger)); - } - - @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", - justification = "prefetch side effect") - public void prefetchShouldFire(W window, StateAccessor state) { - if (isFinishedSetNeeded()) { - state.access(FINISHED_BITS_TAG).readLater(); - } - rootTrigger.getSpec().prefetchShouldFire( - contextFactory.createStateAccessor(window, rootTrigger)); - } - - /** - * Run the trigger logic to deal with a new value. - */ - public void processValue(W window, Instant timestamp, Timers timers, StateAccessor state) - throws Exception { - // Clone so that we can detect changes and so that changes here don't pollute merging. - FinishedTriggersBitSet finishedSet = - readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); - Trigger.OnElementContext triggerContext = contextFactory.createOnElementContext( - window, timers, timestamp, rootTrigger, finishedSet); - rootTrigger.invokeOnElement(triggerContext); - persistFinishedSet(state, finishedSet); - } - - @SuppressFBWarnings(value = "RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT", - justification = "prefetch side effect") - public void prefetchForMerge( - W window, Collection mergingWindows, MergingStateAccessor state) { - if (isFinishedSetNeeded()) { - for (ValueState value : state.accessInEachMergingWindow(FINISHED_BITS_TAG).values()) { - value.readLater(); - } - } - rootTrigger.getSpec().prefetchOnMerge(contextFactory.createMergingStateAccessor( - window, mergingWindows, rootTrigger)); - } - - /** - * Run the trigger merging logic as part of executing the specified merge. - */ - public void onMerge(W window, Timers timers, MergingStateAccessor state) throws Exception { - // Clone so that we can detect changes and so that changes here don't pollute merging. - FinishedTriggersBitSet finishedSet = - readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); - - // And read the finished bits in each merging window. - ImmutableMap.Builder builder = ImmutableMap.builder(); - for (Map.Entry> entry : - state.accessInEachMergingWindow(FINISHED_BITS_TAG).entrySet()) { - // Don't need to clone these, since the trigger context doesn't allow modification - builder.put(entry.getKey(), readFinishedBits(entry.getValue())); - // Clear the underlying finished bits. - clearFinishedBits(entry.getValue()); - } - ImmutableMap mergingFinishedSets = builder.build(); - - Trigger.OnMergeContext mergeContext = contextFactory.createOnMergeContext( - window, timers, rootTrigger, finishedSet, mergingFinishedSets); - - // Run the merge from the trigger - rootTrigger.invokeOnMerge(mergeContext); - - persistFinishedSet(state, finishedSet); - } - - public boolean shouldFire(W window, Timers timers, StateAccessor state) throws Exception { - FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); - Trigger.TriggerContext context = contextFactory.base(window, timers, - rootTrigger, finishedSet); - return rootTrigger.invokeShouldFire(context); - } - - public void onFire(W window, Timers timers, StateAccessor state) throws Exception { - // shouldFire should be false. - // However it is too expensive to assert. - FinishedTriggersBitSet finishedSet = - readFinishedBits(state.access(FINISHED_BITS_TAG)).copy(); - Trigger.TriggerContext context = contextFactory.base(window, timers, - rootTrigger, finishedSet); - rootTrigger.invokeOnFire(context); - persistFinishedSet(state, finishedSet); - } - - private void persistFinishedSet( - StateAccessor state, FinishedTriggersBitSet modifiedFinishedSet) { - if (!isFinishedSetNeeded()) { - return; - } - - ValueState finishedSetState = state.access(FINISHED_BITS_TAG); - if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) { - if (modifiedFinishedSet.getBitSet().isEmpty()) { - finishedSetState.clear(); - } else { - finishedSetState.write(modifiedFinishedSet.getBitSet()); - } - } - } - - /** - * Clear the finished bits. - */ - public void clearFinished(StateAccessor state) { - clearFinishedBits(state.access(FINISHED_BITS_TAG)); - } - - /** - * Clear the state used for executing triggers, but leave the finished set to indicate - * the window is closed. - */ - public void clearState(W window, Timers timers, StateAccessor state) throws Exception { - // Don't need to clone, because we'll be clearing the finished bits anyways. - FinishedTriggers finishedSet = readFinishedBits(state.access(FINISHED_BITS_TAG)); - rootTrigger.invokeClear(contextFactory.base(window, timers, rootTrigger, finishedSet)); - } - - private boolean isFinishedSetNeeded() { - // TODO: If we know that no trigger in the tree will ever finish, we don't need to do the - // lookup. Right now, we special case this for the DefaultTrigger. - return !(rootTrigger.getSpec() instanceof DefaultTrigger); - } -} From be07065fe6ae404cc36d21c95ff5025e0ed4233c Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 20 Oct 2016 10:14:08 -0700 Subject: [PATCH 3/3] Translate ReshuffleTrigger to ReshuffleTriggerStateMachine --- .../beam/runners/core/triggers/TriggerStateMachines.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java index 317e3b94698f..f19f3cf4b2ea 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java @@ -38,6 +38,7 @@ import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.ReshuffleTrigger; import org.apache.beam.sdk.util.TimeDomain; import org.joda.time.Instant; @@ -100,6 +101,10 @@ private TriggerStateMachine evaluateSpecific(DefaultTrigger v) { return DefaultTriggerStateMachine.of(); } + private TriggerStateMachine evaluateSpecific(ReshuffleTrigger v) { + return new ReshuffleTriggerStateMachine(); + } + private OnceTriggerStateMachine evaluateSpecific(AfterWatermark.FromEndOfWindow v) { return AfterWatermarkStateMachine.pastEndOfWindow(); }