From a501deb8d7d20dc1d2bcd17485fec4e77e7dbd84 Mon Sep 17 00:00:00 2001 From: = <=> Date: Wed, 14 Jun 2017 20:11:49 -0400 Subject: [PATCH 1/3] removed OnceTrigger from TriggerStateMachine implementation --- .../core/triggers/AfterAllStateMachine.java | 20 ++++++++----------- ...fterDelayFromFirstElementStateMachine.java | 5 ++--- .../core/triggers/AfterFirstStateMachine.java | 9 ++++----- .../core/triggers/AfterPaneStateMachine.java | 5 ++--- .../triggers/AfterWatermarkStateMachine.java | 5 ++--- .../core/triggers/NeverStateMachine.java | 5 ++--- 6 files changed, 20 insertions(+), 29 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java index 0f0c17ca41c0..cc201821e228 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList; import java.util.Arrays; import java.util.List; -import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.annotations.Experimental; /** @@ -31,7 +30,7 @@ * have fired. */ @Experimental(Experimental.Kind.TRIGGER) -public class AfterAllStateMachine extends OnceTriggerStateMachine { +public class AfterAllStateMachine extends TriggerStateMachine { private AfterAllStateMachine(List subTriggers) { super(subTriggers); @@ -42,11 +41,11 @@ private AfterAllStateMachine(List subTriggers) { * Returns an {@code AfterAll} {@code Trigger} with the given subtriggers. */ @SafeVarargs - public static OnceTriggerStateMachine of(TriggerStateMachine... triggers) { + public static TriggerStateMachine of(TriggerStateMachine... triggers) { return new AfterAllStateMachine(Arrays.asList(triggers)); } - public static OnceTriggerStateMachine of(Iterable triggers) { + public static TriggerStateMachine of(Iterable triggers) { return new AfterAllStateMachine(ImmutableList.copyOf(triggers)); } @@ -78,26 +77,23 @@ public void onMerge(OnMergeContext c) throws Exception { */ @Override public boolean shouldFire(TriggerContext context) throws Exception { - for (ExecutableTriggerStateMachine subtrigger : context.trigger().subTriggers()) { - if (!context.forTrigger(subtrigger).trigger().isFinished() - && !subtrigger.invokeShouldFire(context)) { + for (ExecutableTriggerStateMachine subtTrigger : context.trigger().subTriggers()) { + if (!context.forTrigger(subtTrigger).trigger().isFinished() + && !subtTrigger.invokeShouldFire(context)) { return false; } } return true; } - /** - * Invokes {@link #onFire} for all subtriggers, eliding redundant calls to {@link #shouldFire} - * because they all must be ready to fire. - */ @Override - public void onOnlyFiring(TriggerContext context) throws Exception { + public void onFire(TriggerContext context) throws Exception { for (ExecutableTriggerStateMachine subtrigger : context.trigger().subTriggers()) { subtrigger.invokeOnFire(context); } } + @Override public String toString() { StringBuilder builder = new StringBuilder("AfterAll.of("); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java index 8d8d0de40bbe..5b9c00dc43ed 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java @@ -27,7 +27,6 @@ import org.apache.beam.runners.core.StateMerging; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; -import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.state.CombiningState; @@ -50,7 +49,7 @@ // This class should be inlined to subclasses and deleted, simplifying them too // https://issues.apache.org/jira/browse/BEAM-1486 @Experimental(Experimental.Kind.TRIGGER) -public abstract class AfterDelayFromFirstElementStateMachine extends OnceTriggerStateMachine { +public abstract class AfterDelayFromFirstElementStateMachine extends TriggerStateMachine { protected static final List> IDENTITY = ImmutableList.>of(); @@ -237,7 +236,7 @@ && getCurrentTime(context) != null } @Override - protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) throws Exception { + public final void onFire(TriggerContext context) throws Exception { clear(context); } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java index 840a65cfdd04..49f4dfc22e9d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java @@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList; import java.util.Arrays; import java.util.List; -import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.annotations.Experimental; /** @@ -31,7 +30,7 @@ * sub-triggers have fired. */ @Experimental(Experimental.Kind.TRIGGER) -public class AfterFirstStateMachine extends OnceTriggerStateMachine { +public class AfterFirstStateMachine extends TriggerStateMachine { AfterFirstStateMachine(List subTriggers) { super(subTriggers); @@ -42,12 +41,12 @@ public class AfterFirstStateMachine extends OnceTriggerStateMachine { * Returns an {@code AfterFirst} {@code Trigger} with the given subtriggers. */ @SafeVarargs - public static OnceTriggerStateMachine of( + public static TriggerStateMachine of( TriggerStateMachine... triggers) { return new AfterFirstStateMachine(Arrays.asList(triggers)); } - public static OnceTriggerStateMachine of( + public static TriggerStateMachine of( Iterable triggers) { return new AfterFirstStateMachine(ImmutableList.copyOf(triggers)); } @@ -79,7 +78,7 @@ public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exc } @Override - protected void onOnlyFiring(TriggerContext context) throws Exception { + public void onFire(TriggerContext context) throws Exception { for (ExecutableTriggerStateMachine subtrigger : context.trigger().subTriggers()) { TriggerContext subContext = context.forTrigger(subtrigger); if (subtrigger.invokeShouldFire(subContext)) { diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java index b9fbac34d0fb..fdac911a9ac4 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java @@ -23,7 +23,6 @@ import org.apache.beam.runners.core.StateMerging; import org.apache.beam.runners.core.StateTag; import org.apache.beam.runners.core.StateTags; -import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.state.CombiningState; @@ -33,7 +32,7 @@ * {@link TriggerStateMachine}s that fire based on properties of the elements in the current pane. */ @Experimental(Experimental.Kind.TRIGGER) -public class AfterPaneStateMachine extends OnceTriggerStateMachine { +public class AfterPaneStateMachine extends TriggerStateMachine { private static final StateTag> ELEMENTS_IN_PANE_TAG = @@ -130,7 +129,7 @@ public int hashCode() { } @Override - protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) throws Exception { + public void onFire(TriggerStateMachine.TriggerContext context) throws Exception { clear(context); } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java index c9eee15b857f..57dfdf4ae17e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java @@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableList; import java.util.Objects; import javax.annotation.Nullable; -import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.state.TimeDomain; @@ -242,7 +241,7 @@ private void onLateFiring(TriggerStateMachine.TriggerContext context) throws Exc /** * A watermark trigger targeted relative to the end of the window. */ - public static class FromEndOfWindow extends OnceTriggerStateMachine { + public static class FromEndOfWindow extends TriggerStateMachine { private FromEndOfWindow() { super(null); @@ -319,6 +318,6 @@ private boolean endOfWindowReached(TriggerStateMachine.TriggerContext context) { } @Override - protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) throws Exception { } + public void onFire(TriggerStateMachine.TriggerContext context) throws Exception { } } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java index f32c7a8d9d5d..f8c5e8ba5eb0 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/NeverStateMachine.java @@ -17,7 +17,6 @@ */ package org.apache.beam.runners.core.triggers; -import org.apache.beam.runners.core.triggers.TriggerStateMachine.OnceTriggerStateMachine; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -27,7 +26,7 @@ *

Using this trigger will only produce output when the watermark passes the end of the * {@link BoundedWindow window} plus the allowed lateness. */ -public final class NeverStateMachine extends OnceTriggerStateMachine { +public final class NeverStateMachine extends TriggerStateMachine { /** * Returns a trigger which never fires. Output will be produced from the using {@link GroupByKey} * when the {@link BoundedWindow} closes. @@ -53,7 +52,7 @@ public boolean shouldFire(TriggerStateMachine.TriggerContext context) { } @Override - protected void onOnlyFiring(TriggerStateMachine.TriggerContext context) { + public void onFire(TriggerStateMachine.TriggerContext context) { throw new UnsupportedOperationException( String.format("%s should never fire", getClass().getSimpleName())); } From 3c92a1f27b9e93273a39e6100092129b0afc3f00 Mon Sep 17 00:00:00 2001 From: = <=> Date: Wed, 14 Jun 2017 21:02:07 -0400 Subject: [PATCH 2/3] set firing termination indicator where relevant --- .../runners/core/triggers/AfterAllStateMachine.java | 6 +++--- .../AfterDelayFromFirstElementStateMachine.java | 1 + .../runners/core/triggers/AfterFirstStateMachine.java | 11 ++++++----- .../runners/core/triggers/AfterPaneStateMachine.java | 1 + .../core/triggers/AfterWatermarkStateMachine.java | 4 +++- 5 files changed, 14 insertions(+), 9 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java index cc201821e228..d4a8e9e8b5a9 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterAllStateMachine.java @@ -88,9 +88,10 @@ public boolean shouldFire(TriggerContext context) throws Exception { @Override public void onFire(TriggerContext context) throws Exception { - for (ExecutableTriggerStateMachine subtrigger : context.trigger().subTriggers()) { - subtrigger.invokeOnFire(context); + for (ExecutableTriggerStateMachine subTrigger : context.trigger().subTriggers()) { + subTrigger.invokeOnFire(context); } + context.trigger().setFinished(true); } @@ -99,7 +100,6 @@ public String toString() { StringBuilder builder = new StringBuilder("AfterAll.of("); Joiner.on(", ").appendTo(builder, subTriggers); builder.append(")"); - return builder.toString(); } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java index 5b9c00dc43ed..06c2066a1c31 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterDelayFromFirstElementStateMachine.java @@ -238,6 +238,7 @@ && getCurrentTime(context) != null @Override public final void onFire(TriggerContext context) throws Exception { clear(context); + context.trigger().setFinished(true); } protected Instant computeTargetTimestamp(Instant time) { diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java index 49f4dfc22e9d..58c24c5e82a8 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterFirstStateMachine.java @@ -79,17 +79,18 @@ public boolean shouldFire(TriggerStateMachine.TriggerContext context) throws Exc @Override public void onFire(TriggerContext context) throws Exception { - for (ExecutableTriggerStateMachine subtrigger : context.trigger().subTriggers()) { - TriggerContext subContext = context.forTrigger(subtrigger); - if (subtrigger.invokeShouldFire(subContext)) { + for (ExecutableTriggerStateMachine subTrigger : context.trigger().subTriggers()) { + TriggerContext subContext = context.forTrigger(subTrigger); + if (subTrigger.invokeShouldFire(subContext)) { // If the trigger is ready to fire, then do whatever it needs to do. - subtrigger.invokeOnFire(subContext); + subTrigger.invokeOnFire(subContext); } else { // If the trigger is not ready to fire, it is nonetheless true that whatever // pending pane it was tracking is now gone. - subtrigger.invokeClear(subContext); + subTrigger.invokeClear(subContext); } } + context.trigger().setFinished(true); } @Override diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java index fdac911a9ac4..1ce035a7d6c0 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterPaneStateMachine.java @@ -131,5 +131,6 @@ public int hashCode() { @Override public void onFire(TriggerStateMachine.TriggerContext context) throws Exception { clear(context); + context.trigger().setFinished(true); } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java index 57dfdf4ae17e..509c96b9995e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/AfterWatermarkStateMachine.java @@ -318,6 +318,8 @@ private boolean endOfWindowReached(TriggerStateMachine.TriggerContext context) { } @Override - public void onFire(TriggerStateMachine.TriggerContext context) throws Exception { } + public void onFire(TriggerStateMachine.TriggerContext context) throws Exception { + context.trigger().setFinished(true); + } } } From 02088c85409388f857f173c3beb17bda7e0b74f3 Mon Sep 17 00:00:00 2001 From: = <=> Date: Tue, 20 Jun 2017 19:00:19 -0400 Subject: [PATCH 3/3] Mutability verification to be based on coder's structural value rather than encoded- decoded byte arrays --- .../reflect/ByteBuddyDoFnInvokerFactory.java | 1 + .../transforms/reflect/OnTimerInvokers.java | 1 + .../beam/sdk/util/MutationDetectors.java | 20 +++++++++++-------- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java index 5d5887a3c59b..c8d03524d9cc 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java @@ -202,6 +202,7 @@ public DoFnInvoker newByteBuddyInvoker( for (OnTimerMethod onTimerMethod : signature.onTimerMethods().values()) { invoker.addOnTimerInvoker( onTimerMethod.id(), OnTimerInvokers.forTimer(fn, onTimerMethod.id())); + } return invoker; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java index 287828ae92e7..19bcdcea1798 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerInvokers.java @@ -39,4 +39,5 @@ public static OnTimerInvoker forTimer( DoFn fn, String timerId) { return ByteBuddyOnTimerInvokerFactory.only().forTimer(fn, timerId); } + } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MutationDetectors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MutationDetectors.java index 3b593bf0e944..602f5775094e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MutationDetectors.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MutationDetectors.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.util; -import java.util.Arrays; + import java.util.Objects; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -148,13 +148,17 @@ private void verifyUnmodifiedThrowingCheckedExceptions() throws CoderException { // encodedObject, because the Coder may treat it differently. // // For example, an unbounded Iterable will be encoded in an unbounded way, but decoded into an - // ArrayList, which will then be re-encoded in a bounded format. So we really do need to - // encode-decode-encode retainedObject. - if (Arrays.equals( - CoderUtils.encodeToByteArray(coder, clonedOriginalObject), - CoderUtils.encodeToByteArray(coder, clonedPossiblyModifiedObject))) { - return; - } + // ArrayList, which will then be re-encoded in a bounded format. So we get a structural value + // from a coder and used that to check if the byte array[] are the same. The structuralValue() + // method of the Coder returns a StructuralByteArray object. + // StructuralByteArray is a wrapper around a byte[] that uses structural, value-based equality + // rather than byte[]'s normal object identity. + + + if (coder.structuralValue(clonedOriginalObject) + .equals(coder.structuralValue(clonedPossiblyModifiedObject))){ + return; + } // If we got here, then they are not deepEquals() and do not have deepEquals() encodings. // Even if there is some conceptual sense in which the objects are equivalent, it has not