From 87c7811a3bab6bf4bc0b8b9181127fe074579898 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 13 Oct 2016 20:08:53 -0700 Subject: [PATCH 01/17] Make Trigger#subTriggers public and non-null --- .../beam/sdk/transforms/windowing/DefaultTrigger.java | 2 +- .../apache/beam/sdk/transforms/windowing/Trigger.java | 9 ++++++--- .../java/org/apache/beam/sdk/util/ReshuffleTrigger.java | 2 +- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java index d6b72ef603520..fee7cdfb76df8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/DefaultTrigger.java @@ -30,7 +30,7 @@ public class DefaultTrigger extends Trigger{ private DefaultTrigger() { - super(null); + super(); } /** diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java index a960aa4b7a009..cfabb8ba46c88 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java @@ -20,6 +20,7 @@ import com.google.common.base.Joiner; import java.io.Serializable; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Objects; import javax.annotation.Nullable; @@ -263,13 +264,15 @@ public abstract class OnMergeContext extends TriggerContext { public abstract MergingTriggerInfo trigger(); } - @Nullable protected final List subTriggers; - protected Trigger(@Nullable List subTriggers) { + protected Trigger(List subTriggers) { this.subTriggers = subTriggers; } + protected Trigger() { + this(Collections.EMPTY_LIST); + } /** * Called every time an element is incorporated into a window. @@ -370,7 +373,7 @@ public void clear(TriggerContext c) throws Exception { } } - public Iterable subTriggers() { + public List subTriggers() { return subTriggers; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java index 9e2c27d1bc7f1..437f14a73ea0d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java @@ -31,7 +31,7 @@ public class ReshuffleTrigger extends Trigger { public ReshuffleTrigger() { - super(null); + super(); } @Override From b2bb7c048086a3e5eee7d2652d4bb971bc0694e7 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 13 Oct 2016 20:42:38 -0700 Subject: [PATCH 02/17] Construct AfterAllStateMachine with a list of subtriggers --- .../beam/runners/core/triggers/AfterAllStateMachine.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java index 2f4ad634f3c8b..12cbc3da15e66 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; import java.util.Arrays; import java.util.List; import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; @@ -45,6 +46,10 @@ public static OnceTriggerStateMachine of(OnceTriggerStateMachine... triggers) { return new AfterAllStateMachine(Arrays.asList(triggers)); } + public static OnceTriggerStateMachine of(Iterable triggers) { + return new AfterAllStateMachine(ImmutableList.copyOf(triggers)); + } + @Override public void onElement(OnElementContext c) throws Exception { for (ExecutableTriggerStateMachine subTrigger : c.trigger().unfinishedSubTriggers()) { From 77332f1e612caf9090e148e1493c11ca8e753076 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 13 Oct 2016 20:43:08 -0700 Subject: [PATCH 03/17] Construct AfterFirstStateMachine with a list of subtriggers --- .../beam/runners/core/triggers/AfterFirstStateMachine.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java index 272c27830b3c1..f4b305e20190e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; import java.util.Arrays; import java.util.List; import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; @@ -46,6 +47,11 @@ public static OnceTriggerStateMachine of( return new AfterFirstStateMachine(Arrays.asList(triggers)); } + public static OnceTriggerStateMachine of( + Iterable triggers) { + return new AfterFirstStateMachine(ImmutableList.copyOf(triggers)); + } + @Override public void onElement(OnElementContext c) throws Exception { for (ExecutableTriggerStateMachine subTrigger : c.trigger().subTriggers()) { From b19918df1992d445ad8c13a63722c690ddca3899 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 13 Oct 2016 20:43:33 -0700 Subject: [PATCH 04/17] Make AfterSynchronizedProcessingTime public We need to be able to access this class to reason about it when converting a trigger to a state machine. --- .../windowing/AfterSynchronizedProcessingTime.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java index 59ece1073c20f..b96b29344164d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterSynchronizedProcessingTime.java @@ -25,7 +25,11 @@ import org.apache.beam.sdk.util.TimeDomain; import org.joda.time.Instant; -class AfterSynchronizedProcessingTime extends AfterDelayFromFirstElement { +/** + * A trigger that fires after synchronized processing time has reached a shared + * threshold between upstream workers. + */ +public class AfterSynchronizedProcessingTime extends AfterDelayFromFirstElement { @Override @Nullable From 8afb80e18f80a9d5a4ed18623a770dbf15ff5e65 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 13 Oct 2016 20:44:35 -0700 Subject: [PATCH 05/17] Add direct accessors for the components of OrFinallyTrigger --- .../transforms/windowing/OrFinallyTrigger.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java index 25b7b34b520b4..1a0345084a8a9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java @@ -35,6 +35,21 @@ class OrFinallyTrigger extends Trigger { super(Arrays.asList(actual, until)); } + /** + * The main trigger, which will continue firing until the "until" trigger fires. See + * {@link #getUntilTrigger()} + */ + public Trigger getMainTrigger() { + return subTriggers().get(ACTUAL); + } + + /** + * The trigger that signals termination of this trigger. + */ + public OnceTrigger getUntilTrigger() { + return (OnceTrigger) subTriggers().get(UNTIL); + } + @Override public void onElement(OnElementContext c) throws Exception { c.trigger().subTrigger(ACTUAL).invokeOnElement(c); From f46ce0db372ffb66915eda22092bd760e474b9c0 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 13 Oct 2016 20:56:54 -0700 Subject: [PATCH 06/17] Construct AfterEachStateMachine from list of subtriggers --- .../beam/runners/core/triggers/AfterEachStateMachine.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java index 140ac752ffd86..38357d47be6d1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterEachStateMachine.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; import java.util.Arrays; import java.util.List; @@ -54,6 +55,10 @@ public static TriggerStateMachine inOrder(TriggerStateMachine... triggers) { return new AfterEachStateMachine(Arrays.asList(triggers)); } + public static TriggerStateMachine inOrder(Iterable triggers) { + return new AfterEachStateMachine(ImmutableList.copyOf(triggers)); + } + @Override public void onElement(OnElementContext c) throws Exception { if (!c.trigger().isMerging()) { From 303a42ab993aee70527272dd908c3568a29f1e27 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 13 Oct 2016 20:57:48 -0700 Subject: [PATCH 07/17] Add accessors to Repeatedly trigger --- .../beam/sdk/transforms/windowing/Repeatedly.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java index 88587983dfc20..45bc6c128ef66 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java @@ -49,10 +49,16 @@ public static Repeatedly forever(Trigger repeated) { return new Repeatedly(repeated); } - private Repeatedly(Trigger repeated) { - super(Arrays.asList(repeated)); + private Trigger repeatedTrigger; + + private Repeatedly(Trigger repeatedTrigger) { + super(Arrays.asList(repeatedTrigger)); + this.repeatedTrigger = repeatedTrigger; } + public Trigger getRepeatedTrigger() { + return repeatedTrigger; + } @Override public void onElement(OnElementContext c) throws Exception { From 2e565172d74bc906bdbdb1c4ee986c9e3d65089f Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 13 Oct 2016 20:58:12 -0700 Subject: [PATCH 08/17] Make OrFinallyTrigger public so it can be examined --- .../apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java index 1a0345084a8a9..9bef45ad3a114 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java @@ -26,7 +26,7 @@ /** * Executes the {@code actual} trigger until it finishes or until the {@code until} trigger fires. */ -class OrFinallyTrigger extends Trigger { +public class OrFinallyTrigger extends Trigger { private static final int ACTUAL = 0; private static final int UNTIL = 1; From 0df929f984c2f5af3d19a4c54635c345ccd5b410 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 13 Oct 2016 21:13:27 -0700 Subject: [PATCH 09/17] Add accessors to AfterWatermark trigger --- .../beam/sdk/transforms/windowing/AfterWatermark.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java index e2463d83fe962..da96de37babb9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java @@ -85,6 +85,14 @@ public static class AfterWatermarkEarlyAndLate extends Trigger { @Nullable private final OnceTrigger lateTrigger; + public OnceTrigger getEarlyTrigger() { + return earlyTrigger; + } + + public OnceTrigger getLateTrigger() { + return lateTrigger; + } + @SuppressWarnings("unchecked") public AfterWatermarkEarlyAndLate(OnceTrigger earlyTrigger, OnceTrigger lateTrigger) { super(lateTrigger == null From b4445ac43530441463ee11c395bcf631b66ef2e9 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 13 Oct 2016 22:13:43 -0700 Subject: [PATCH 10/17] Add access to values from AfterDelay triggers --- .../AfterDelayFromFirstElementStateMachine.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java index a6616fafbc87e..02b156b1498f1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java @@ -82,7 +82,7 @@ protected abstract AfterDelayFromFirstElementStateMachine newWith( */ protected final List> timestampMappers; - private final TimeDomain timeDomain; + protected final TimeDomain timeDomain; public AfterDelayFromFirstElementStateMachine( TimeDomain timeDomain, @@ -96,6 +96,21 @@ private Instant getTargetTimestamp(OnElementContext c) { return computeTargetTimestamp(c.currentProcessingTime()); } + /** + * The time domain according for which this trigger sets timers. + */ + public TimeDomain getTimeDomain() { + return timeDomain; + } + + /** + * The mapping functions applied to the arrival time of an element to determine when to + * set a wake-up timer for triggering. + */ + public List> getTimestampMappers() { + return timestampMappers; + } + /** * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater * than the timestamp. From e0c576649382ea4d1d70ee9e54ff25018210dcfb Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 13 Oct 2016 22:14:32 -0700 Subject: [PATCH 11/17] Add accessors for AfterPane(StateMachine) parameters --- .../beam/runners/core/triggers/AfterPaneStateMachine.java | 7 +++++++ .../apache/beam/sdk/transforms/windowing/AfterPane.java | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java index 723aba60e7229..288643d3d87fc 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java @@ -48,6 +48,13 @@ private AfterPaneStateMachine(int countElems) { this.countElems = countElems; } + /** + * The number of elements after which this trigger may fire. + */ + public int getElementCount() { + return countElems; + } + /** * Creates a trigger that fires when the pane contains at least {@code countElems} elements. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java index 8c128dd32bc22..4d59d58539d44 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterPane.java @@ -50,6 +50,13 @@ private AfterPane(int countElems) { this.countElems = countElems; } + /** + * The number of elements after which this trigger may fire. + */ + public int getElementCount() { + return countElems; + } + /** * Creates a trigger that fires when the pane contains at least {@code countElems} elements. */ From e1c5bfbc76c0ef3766d5b1bf2dbd47e13f0ed97c Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 13 Oct 2016 22:15:32 -0700 Subject: [PATCH 12/17] Make return types of trigger static factory methods precise This is helpful in testing and loses no abstraction - the code locations calling these methods already know the more-specific type that will be returned. --- .../org/apache/beam/sdk/transforms/windowing/AfterAll.java | 2 +- .../org/apache/beam/sdk/transforms/windowing/AfterEach.java | 2 +- .../org/apache/beam/sdk/transforms/windowing/AfterFirst.java | 2 +- .../apache/beam/sdk/transforms/windowing/AfterWatermark.java | 4 ++-- .../java/org/apache/beam/sdk/transforms/windowing/Never.java | 2 +- .../org/apache/beam/sdk/transforms/windowing/Trigger.java | 2 +- 6 files changed, 7 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java index cc8c97fa14bf6..0e37d332d248a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java @@ -41,7 +41,7 @@ private AfterAll(List subTriggers) { /** * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers. */ - public static OnceTrigger of(OnceTrigger... triggers) { + public static AfterAll of(OnceTrigger... triggers) { return new AfterAll(Arrays.asList(triggers)); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java index 629c640eabddf..961d97f432fd0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java @@ -54,7 +54,7 @@ private AfterEach(List subTriggers) { * Returns an {@code AfterEach} {@code Trigger} with the given subtriggers. */ @SafeVarargs - public static Trigger inOrder(Trigger... triggers) { + public static AfterEach inOrder(Trigger... triggers) { return new AfterEach(Arrays.asList(triggers)); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java index 6b06cfa09eb3f..7840fc4089708 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java @@ -42,7 +42,7 @@ public class AfterFirst extends OnceTrigger { /** * Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers. */ - public static OnceTrigger of(OnceTrigger... triggers) { + public static AfterFirst of(OnceTrigger... triggers) { return new AfterFirst(Arrays.asList(triggers)); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java index da96de37babb9..89c1ba9375766 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java @@ -102,11 +102,11 @@ public AfterWatermarkEarlyAndLate(OnceTrigger earlyTrigger, OnceTrigger lateTrig this.lateTrigger = lateTrigger; } - public Trigger withEarlyFirings(OnceTrigger earlyTrigger) { + public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTrigger earlyTrigger) { return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger); } - public Trigger withLateFirings(OnceTrigger lateTrigger) { + public AfterWatermarkEarlyAndLate withLateFirings(OnceTrigger lateTrigger) { return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java index 5f2046579cb43..353258b87b233 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java @@ -34,7 +34,7 @@ public final class Never { * Returns a trigger which never fires. Output will be produced from the using {@link GroupByKey} * when the {@link BoundedWindow} closes. */ - public static OnceTrigger ever() { + public static NeverTrigger ever() { // NeverTrigger ignores all inputs and is Window-type independent. return new NeverTrigger(); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java index cfabb8ba46c88..90e9386fb1e0c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Trigger.java @@ -490,7 +490,7 @@ public int hashCode() { *

Note that if {@code t1} is {@link OnceTrigger}, then {@code t1.orFinally(t2)} is the same * as {@code AfterFirst.of(t1, t2)}. */ - public Trigger orFinally(OnceTrigger until) { + public OrFinallyTrigger orFinally(OnceTrigger until) { return new OrFinallyTrigger(this, until); } From fa9b3812e7262b1e0368f613d3f667b71f5de59e Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 13 Oct 2016 22:17:14 -0700 Subject: [PATCH 13/17] Accessors for AfterDelay --- .../windowing/AfterDelayFromFirstElement.java | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java index c4bc94615dc3d..6078b346d4984 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java @@ -96,6 +96,21 @@ private Instant getTargetTimestamp(OnElementContext c) { return computeTargetTimestamp(c.currentProcessingTime()); } + /** + * The time domain according for which this trigger sets timers. + */ + public TimeDomain getTimeDomain() { + return timeDomain; + } + + /** + * The mapping functions applied to the arrival time of an element to determine when to + * set a wake-up timer for triggering. + */ + public List> getTimestampMappers() { + return timestampMappers; + } + /** * Aligns timestamps to the smallest multiple of {@code size} since the {@code offset} greater * than the timestamp. From 703c84efd9c8219b2a8a205f0b6f1f9a999e927c Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 13 Oct 2016 22:17:26 -0700 Subject: [PATCH 14/17] Make NeverTrigger public for translation --- .../org/apache/beam/sdk/transforms/windowing/Never.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java index 353258b87b233..07b70f4ee3892 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java @@ -39,8 +39,10 @@ public static NeverTrigger ever() { return new NeverTrigger(); } - // package-private in order to check identity for string formatting. - static class NeverTrigger extends OnceTrigger { + /** + * The actual trigger class for {@link Never} triggers. + */ + public static class NeverTrigger extends OnceTrigger { protected NeverTrigger() { super(null); } From 476dcd740c97831a5d55953014fc5d99135addce Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 13 Oct 2016 22:46:08 -0700 Subject: [PATCH 15/17] Make return types more precise for AfterWatermarkTriggerStateMachine --- .../runners/core/triggers/AfterWatermarkStateMachine.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java index 5ad62144afae3..524c057cf13a1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java @@ -93,11 +93,11 @@ private AfterWatermarkEarlyAndLate( this.lateTrigger = lateTrigger; } - public TriggerStateMachine withEarlyFirings(OnceTriggerStateMachine earlyTrigger) { + public AfterWatermarkEarlyAndLate withEarlyFirings(OnceTriggerStateMachine earlyTrigger) { return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger); } - public TriggerStateMachine withLateFirings(OnceTriggerStateMachine lateTrigger) { + public AfterWatermarkEarlyAndLate withLateFirings(OnceTriggerStateMachine lateTrigger) { return new AfterWatermarkEarlyAndLate(earlyTrigger, lateTrigger); } From 2107f7961e863a34c71a3fcaa4dd900d6394ed05 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Mon, 17 Oct 2016 19:54:12 -0700 Subject: [PATCH 16/17] Touch up javadoc for AfterDelayFromFirstElementStateMachine --- .../core/triggers/AfterDelayFromFirstElementStateMachine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java index 02b156b1498f1..d9d2c42214719 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java @@ -97,7 +97,7 @@ private Instant getTargetTimestamp(OnElementContext c) { } /** - * The time domain according for which this trigger sets timers. + * The time domain according to which this trigger sets timers. */ public TimeDomain getTimeDomain() { return timeDomain; From 00672961b5a3115c298c457dfe43f543947298a0 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 13 Oct 2016 20:02:52 -0700 Subject: [PATCH 17/17] Add TriggerStateMachines with conversion from Trigger --- .../core/triggers/TriggerStateMachines.java | 210 ++++++++++++++++++ .../triggers/TriggerStateMachinesTest.java | 199 +++++++++++++++++ 2 files changed, 409 insertions(+) create mode 100644 runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java create mode 100644 runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java new file mode 100644 index 0000000000000..317e3b94698f6 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachines.java @@ -0,0 +1,210 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.List; +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.transforms.windowing.AfterAll; +import org.apache.beam.sdk.transforms.windowing.AfterDelayFromFirstElement; +import org.apache.beam.sdk.transforms.windowing.AfterEach; +import org.apache.beam.sdk.transforms.windowing.AfterFirst; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterSynchronizedProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger; +import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Trigger; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.TimeDomain; +import org.joda.time.Instant; + +/** Translates a {@link Trigger} to a {@link TriggerStateMachine}. */ +public class TriggerStateMachines { + + private TriggerStateMachines() {} + + @VisibleForTesting static final StateMachineConverter CONVERTER = new StateMachineConverter(); + + public static TriggerStateMachine stateMachineForTrigger(Trigger trigger) { + return CONVERTER.evaluateTrigger(trigger); + } + + public static OnceTriggerStateMachine stateMachineForOnceTrigger(OnceTrigger trigger) { + return CONVERTER.evaluateOnceTrigger(trigger); + } + + @VisibleForTesting + static class StateMachineConverter { + + public TriggerStateMachine evaluateTrigger(Trigger trigger) { + Method evaluationMethod = getEvaluationMethod(trigger.getClass()); + return tryEvaluate(evaluationMethod, trigger); + } + + public OnceTriggerStateMachine evaluateOnceTrigger(OnceTrigger trigger) { + Method evaluationMethod = getEvaluationMethod(trigger.getClass()); + return (OnceTriggerStateMachine) tryEvaluate(evaluationMethod, trigger); + } + + private TriggerStateMachine tryEvaluate(Method evaluationMethod, Trigger trigger) { + try { + return (TriggerStateMachine) evaluationMethod.invoke(this, trigger); + } catch (InvocationTargetException exc) { + if (exc.getCause() instanceof RuntimeException) { + throw (RuntimeException) exc.getCause(); + } else { + throw new RuntimeException(exc.getCause()); + } + } catch (IllegalAccessException exc) { + throw new IllegalStateException( + String.format("Internal error: could not invoke %s", evaluationMethod)); + } + } + + private Method getEvaluationMethod(Class clazz) { + Method evaluationMethod; + try { + return getClass().getDeclaredMethod("evaluateSpecific", clazz); + } catch (NoSuchMethodException exc) { + throw new UnsupportedOperationException( + String.format( + "Cannot translate trigger class %s to a state machine.", clazz.getCanonicalName()), + exc); + } + } + + private TriggerStateMachine evaluateSpecific(DefaultTrigger v) { + return DefaultTriggerStateMachine.of(); + } + + private OnceTriggerStateMachine evaluateSpecific(AfterWatermark.FromEndOfWindow v) { + return AfterWatermarkStateMachine.pastEndOfWindow(); + } + + private OnceTriggerStateMachine evaluateSpecific(NeverTrigger v) { + return NeverStateMachine.ever(); + } + + private OnceTriggerStateMachine evaluateSpecific(AfterSynchronizedProcessingTime v) { + return new AfterSynchronizedProcessingTimeStateMachine(); + } + + private OnceTriggerStateMachine evaluateSpecific(AfterFirst v) { + List subStateMachines = + Lists.newArrayListWithCapacity(v.subTriggers().size()); + for (Trigger subtrigger : v.subTriggers()) { + subStateMachines.add(stateMachineForOnceTrigger((OnceTrigger) subtrigger)); + } + return AfterFirstStateMachine.of(subStateMachines); + } + + private OnceTriggerStateMachine evaluateSpecific(AfterAll v) { + List subStateMachines = + Lists.newArrayListWithCapacity(v.subTriggers().size()); + for (Trigger subtrigger : v.subTriggers()) { + subStateMachines.add(stateMachineForOnceTrigger((OnceTrigger) subtrigger)); + } + return AfterAllStateMachine.of(subStateMachines); + } + + private OnceTriggerStateMachine evaluateSpecific(AfterPane v) { + return AfterPaneStateMachine.elementCountAtLeast(v.getElementCount()); + } + + private TriggerStateMachine evaluateSpecific(AfterWatermark.AfterWatermarkEarlyAndLate v) { + AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine = + AfterWatermarkStateMachine.pastEndOfWindow() + .withEarlyFirings(stateMachineForOnceTrigger(v.getEarlyTrigger())); + + if (v.getLateTrigger() != null) { + machine = machine.withLateFirings(stateMachineForOnceTrigger(v.getLateTrigger())); + } + return machine; + } + + private TriggerStateMachine evaluateSpecific(AfterEach v) { + List subStateMachines = + Lists.newArrayListWithCapacity(v.subTriggers().size()); + + for (Trigger subtrigger : v.subTriggers()) { + subStateMachines.add(stateMachineForTrigger(subtrigger)); + } + + return AfterEachStateMachine.inOrder(subStateMachines); + } + + private TriggerStateMachine evaluateSpecific(Repeatedly v) { + return RepeatedlyStateMachine.forever(stateMachineForTrigger(v.getRepeatedTrigger())); + } + + private TriggerStateMachine evaluateSpecific(OrFinallyTrigger v) { + return new OrFinallyStateMachine( + stateMachineForTrigger(v.getMainTrigger()), + stateMachineForOnceTrigger(v.getUntilTrigger())); + } + + private OnceTriggerStateMachine evaluateSpecific(AfterProcessingTime v) { + return evaluateSpecific((AfterDelayFromFirstElement) v); + } + + private OnceTriggerStateMachine evaluateSpecific(final AfterDelayFromFirstElement v) { + return new AfterDelayFromFirstElementStateMachineAdapter(v); + } + + private static class AfterDelayFromFirstElementStateMachineAdapter + extends AfterDelayFromFirstElementStateMachine { + + public AfterDelayFromFirstElementStateMachineAdapter(AfterDelayFromFirstElement v) { + this(v.getTimeDomain(), v.getTimestampMappers()); + } + + private AfterDelayFromFirstElementStateMachineAdapter( + TimeDomain timeDomain, List> timestampMappers) { + super(timeDomain, timestampMappers); + } + + @Override + public Instant getCurrentTime(TriggerContext context) { + switch (timeDomain) { + case PROCESSING_TIME: + return context.currentProcessingTime(); + case SYNCHRONIZED_PROCESSING_TIME: + return context.currentSynchronizedProcessingTime(); + case EVENT_TIME: + return context.currentEventTime(); + default: + throw new IllegalArgumentException("A time domain that doesn't exist was received!"); + } + } + + @Override + protected AfterDelayFromFirstElementStateMachine newWith( + List> transform) { + return new AfterDelayFromFirstElementStateMachineAdapter(timeDomain, transform); + } + } + } +} diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java new file mode 100644 index 0000000000000..37f8f10a4e5eb --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachinesTest.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core.triggers; + +import static com.google.common.base.Preconditions.checkNotNull; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; +import org.apache.beam.sdk.transforms.windowing.AfterAll; +import org.apache.beam.sdk.transforms.windowing.AfterDelayFromFirstElement; +import org.apache.beam.sdk.transforms.windowing.AfterEach; +import org.apache.beam.sdk.transforms.windowing.AfterFirst; +import org.apache.beam.sdk.transforms.windowing.AfterPane; +import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; +import org.apache.beam.sdk.transforms.windowing.AfterWatermark; +import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; +import org.apache.beam.sdk.transforms.windowing.Never; +import org.apache.beam.sdk.transforms.windowing.Never.NeverTrigger; +import org.apache.beam.sdk.transforms.windowing.OrFinallyTrigger; +import org.apache.beam.sdk.transforms.windowing.Repeatedly; +import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; +import org.apache.beam.sdk.util.TimeDomain; +import org.joda.time.Duration; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests the {@link TriggerStateMachines} static utility methods. */ +@RunWith(JUnit4.class) +public class TriggerStateMachinesTest { + + // + // Tests for leaf trigger translation + // + + @Test + public void testStateMachineForAfterPane() { + int count = 37; + AfterPane trigger = AfterPane.elementCountAtLeast(count); + AfterPaneStateMachine machine = + (AfterPaneStateMachine) TriggerStateMachines.stateMachineForOnceTrigger(trigger); + + assertThat(machine.getElementCount(), equalTo(trigger.getElementCount())); + } + + @Test + public void testStateMachineForAfterProcessingTime() { + Duration minutes = Duration.standardMinutes(94); + Duration hours = Duration.standardHours(13); + + AfterDelayFromFirstElement trigger = + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(minutes).alignedTo(hours); + + AfterDelayFromFirstElementStateMachine machine = + (AfterDelayFromFirstElementStateMachine) + TriggerStateMachines.stateMachineForOnceTrigger(trigger); + + assertThat(machine.getTimeDomain(), equalTo(TimeDomain.PROCESSING_TIME)); + + // This equality is function equality, but due to the structure of the code (no serialization) + // it is OK to check + assertThat(machine.getTimestampMappers(), equalTo(trigger.getTimestampMappers())); + } + + @Test + public void testStateMachineForAfterWatermark() { + AfterWatermark.FromEndOfWindow trigger = AfterWatermark.pastEndOfWindow(); + AfterWatermarkStateMachine.FromEndOfWindow machine = + (AfterWatermarkStateMachine.FromEndOfWindow) + TriggerStateMachines.stateMachineForOnceTrigger(trigger); + // No parameters, so if it doesn't crash, we win! + } + + @Test + public void testDefaultTriggerTranslation() { + DefaultTrigger trigger = DefaultTrigger.of(); + DefaultTriggerStateMachine machine = + (DefaultTriggerStateMachine) + checkNotNull(TriggerStateMachines.stateMachineForTrigger(trigger)); + // No parameters, so if it doesn't crash, we win! + } + + @Test + public void testNeverTranslation() { + NeverTrigger trigger = Never.ever(); + NeverStateMachine machine = + (NeverStateMachine) checkNotNull(TriggerStateMachines.stateMachineForTrigger(trigger)); + // No parameters, so if it doesn't crash, we win! + } + + // + // Tests for composite trigger translation + // + // These check just that translation was invoked recursively using somewhat random + // leaf subtriggers; by induction it all holds together. Beyond this, explicit tests + // of particular triggers will suffice. + + private static final int ELEM_COUNT = 472; + private static final Duration DELAY = Duration.standardSeconds(95673); + + private final OnceTrigger subtrigger1 = AfterPane.elementCountAtLeast(ELEM_COUNT); + private final OnceTrigger subtrigger2 = + AfterProcessingTime.pastFirstElementInPane().plusDelayOf(DELAY); + + private final OnceTriggerStateMachine submachine1 = + TriggerStateMachines.stateMachineForOnceTrigger(subtrigger1); + private final OnceTriggerStateMachine submachine2 = + TriggerStateMachines.stateMachineForOnceTrigger(subtrigger2); + + @Test + public void testAfterEachTranslation() { + AfterEach trigger = AfterEach.inOrder(subtrigger1, subtrigger2); + AfterEachStateMachine machine = + (AfterEachStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger); + + assertThat(machine, equalTo(AfterEachStateMachine.inOrder(submachine1, submachine2))); + } + + @Test + public void testAfterFirstTranslation() { + AfterFirst trigger = AfterFirst.of(subtrigger1, subtrigger2); + AfterFirstStateMachine machine = + (AfterFirstStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger); + + assertThat(machine, equalTo(AfterFirstStateMachine.of(submachine1, submachine2))); + } + + @Test + public void testAfterAllTranslation() { + AfterAll trigger = AfterAll.of(subtrigger1, subtrigger2); + AfterAllStateMachine machine = + (AfterAllStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger); + + assertThat(machine, equalTo(AfterAllStateMachine.of(submachine1, submachine2))); + } + + @Test + public void testAfterWatermarkEarlyTranslation() { + AfterWatermark.AfterWatermarkEarlyAndLate trigger = + AfterWatermark.pastEndOfWindow().withEarlyFirings(subtrigger1); + AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine = + (AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate) + TriggerStateMachines.stateMachineForTrigger(trigger); + + assertThat( + machine, + equalTo(AfterWatermarkStateMachine.pastEndOfWindow().withEarlyFirings(submachine1))); + } + + @Test + public void testAfterWatermarkEarlyLateTranslation() { + AfterWatermark.AfterWatermarkEarlyAndLate trigger = + AfterWatermark.pastEndOfWindow().withEarlyFirings(subtrigger1).withLateFirings(subtrigger2); + AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate machine = + (AfterWatermarkStateMachine.AfterWatermarkEarlyAndLate) + TriggerStateMachines.stateMachineForTrigger(trigger); + + assertThat( + machine, + equalTo( + AfterWatermarkStateMachine.pastEndOfWindow() + .withEarlyFirings(submachine1) + .withLateFirings(submachine2))); + } + + @Test + public void testOrFinallyTranslation() { + OrFinallyTrigger trigger = subtrigger1.orFinally(subtrigger2); + OrFinallyStateMachine machine = + (OrFinallyStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger); + + assertThat(machine, equalTo(submachine1.orFinally(submachine2))); + } + + @Test + public void testRepeatedlyTranslation() { + Repeatedly trigger = Repeatedly.forever(subtrigger1); + RepeatedlyStateMachine machine = + (RepeatedlyStateMachine) TriggerStateMachines.stateMachineForTrigger(trigger); + + assertThat(machine, equalTo(RepeatedlyStateMachine.forever(submachine1))); + } +}