From 42abc9200a8eae6313e3edd3b7fc816f4b6f493a Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Wed, 28 Sep 2016 19:19:12 -0700 Subject: [PATCH 1/6] Dedups TestInMemoryStateInternals --- .../beam/runners/core/ReduceFnTester.java | 40 +----------- .../state/TestInMemoryStateInternals.java | 61 +++++++++++++++++++ .../apache/beam/sdk/util/TriggerTester.java | 43 +------------ 3 files changed, 64 insertions(+), 80 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TestInMemoryStateInternals.java diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 45062fbb8c82..2fba6cb70cfa 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -83,6 +83,7 @@ import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.TestInMemoryStateInternals; import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; @@ -440,45 +441,6 @@ public void fireTimer(W window, Instant timestamp, TimeDomain domain) throws Exc runner.persist(); } - /** - * Simulate state. - */ - private static class TestInMemoryStateInternals extends InMemoryStateInternals { - - public TestInMemoryStateInternals(K key) { - super(key); - } - - public Set> getTagsInUse(StateNamespace namespace) { - Set> inUse = new HashSet<>(); - for (Entry, State> entry : - inMemoryState.getTagsInUse(namespace).entrySet()) { - if (!isEmptyForTesting(entry.getValue())) { - inUse.add(entry.getKey()); - } - } - return inUse; - } - - public Set getNamespacesInUse() { - return inMemoryState.getNamespacesInUse(); - } - - /** Return the earliest output watermark hold in state, or null if none. */ - public Instant earliestWatermarkHold() { - Instant minimum = null; - for (State storage : inMemoryState.values()) { - if (storage instanceof WatermarkHoldState) { - Instant hold = ((WatermarkHoldState) storage).read(); - if (minimum == null || (hold != null && hold.isBefore(minimum))) { - minimum = hold; - } - } - } - return minimum; - } - } - /** * Convey the simulated state and implement {@link #outputWindowedValue} to capture all output * elements. diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TestInMemoryStateInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TestInMemoryStateInternals.java new file mode 100644 index 000000000000..7b6ee6873306 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TestInMemoryStateInternals.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util.state; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.joda.time.Instant; + +/** + * Simulates state like {@link InMemoryStateInternals} and provides some extra helper methods. + */ +public class TestInMemoryStateInternals extends InMemoryStateInternals { + public TestInMemoryStateInternals(K key) { + super(key); + } + + public Set> getTagsInUse(StateNamespace namespace) { + Set> inUse = new HashSet<>(); + for (Map.Entry, State> entry : + inMemoryState.getTagsInUse(namespace).entrySet()) { + if (!isEmptyForTesting(entry.getValue())) { + inUse.add(entry.getKey()); + } + } + return inUse; + } + + public Set getNamespacesInUse() { + return inMemoryState.getNamespacesInUse(); + } + + /** Return the earliest output watermark hold in state, or null if none. */ + public Instant earliestWatermarkHold() { + Instant minimum = null; + for (State storage : inMemoryState.values()) { + if (storage instanceof WatermarkHoldState) { + Instant hold = ((WatermarkHoldState) storage).read(); + if (minimum == null || (hold != null && hold.isBefore(minimum))) { + minimum = hold; + } + } + } + return minimum; + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java index a1f1d21fc3dc..f6b0926fe573 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace; import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace; import org.apache.beam.sdk.util.state.StateTag; +import org.apache.beam.sdk.util.state.TestInMemoryStateInternals; import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Duration; @@ -94,7 +95,7 @@ public SimpleTriggerTester withAllowedLateness(Duration allowedLateness) thro protected final WindowingStrategy windowingStrategy; private final TestInMemoryStateInternals stateInternals = - new TestInMemoryStateInternals(); + new TestInMemoryStateInternals(null /* key */); private final TestTimerInternals timerInternals = new TestTimerInternals(); private final TriggerContextFactory contextFactory; private final WindowFn windowFn; @@ -351,46 +352,6 @@ private FinishedTriggers getFinishedSet(W window) { return finishedSet; } - /** - * Simulate state. - */ - private static class TestInMemoryStateInternals extends InMemoryStateInternals { - - public TestInMemoryStateInternals() { - super(null); - } - - public Set> getTagsInUse(StateNamespace namespace) { - Set> inUse = new HashSet<>(); - for (Map.Entry, State> entry : - inMemoryState.getTagsInUse(namespace).entrySet()) { - if (!isEmptyForTesting(entry.getValue())) { - inUse.add(entry.getKey()); - } - } - return inUse; - } - - public Set getNamespacesInUse() { - return inMemoryState.getNamespacesInUse(); - } - - /** Return the earliest output watermark hold in state, or null if none. */ - public Instant earliestWatermarkHold() { - Instant minimum = null; - for (State storage : inMemoryState.values()) { - if (storage instanceof WatermarkHoldState) { - @SuppressWarnings("unchecked") - Instant hold = ((WatermarkHoldState) storage).read(); - if (minimum == null || (hold != null && hold.isBefore(minimum))) { - minimum = hold; - } - } - } - return minimum; - } - } - private static class TestAssignContext extends WindowFn.AssignContext { private Object element; From f9279478e638e7708eece0bbd4b95901068d24fd Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Wed, 28 Sep 2016 19:21:55 -0700 Subject: [PATCH 2/6] Factors InMemoryTimerInternals out of ReduceFnTester --- .../beam/runners/core/ReduceFnRunner.java | 3 +- .../beam/runners/core/ReduceFnTester.java | 208 ++-------------- .../util/state/InMemoryTimerInternals.java | 234 ++++++++++++++++++ .../beam/sdk/util/state/TimerCallback.java | 28 +++ 4 files changed, 278 insertions(+), 195 deletions(-) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java index 96d764aa62ea..24d472bb06ef 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java @@ -59,6 +59,7 @@ import org.apache.beam.sdk.util.state.ReadableState; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace; +import org.apache.beam.sdk.util.state.TimerCallback; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.joda.time.Duration; @@ -88,7 +89,7 @@ * @param The output type that will be produced for each key. * @param The type of windows this operates on. */ -public class ReduceFnRunner { +public class ReduceFnRunner implements TimerCallback { /** * The {@link ReduceFnRunner} depends on most aspects of the {@link WindowingStrategy}. diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index 2fba6cb70cfa..5752b113f3a0 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -17,14 +17,11 @@ */ package org.apache.beam.runners.core; -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.google.common.base.Function; -import com.google.common.base.MoreObjects; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; @@ -39,11 +36,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.PriorityQueue; import java.util.Set; import javax.annotation.Nullable; - import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; import org.apache.beam.sdk.coders.IterableCoder; @@ -77,14 +71,13 @@ import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; -import org.apache.beam.sdk.util.state.InMemoryStateInternals; -import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.TestInMemoryStateInternals; -import org.apache.beam.sdk.util.state.WatermarkHoldState; +import org.apache.beam.sdk.util.state.TimerCallback; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; @@ -122,7 +115,7 @@ public class ReduceFnTester { * If false, the output watermark must be explicitly advanced by the test, which can * be used to exercise some of the more subtle behavior of WatermarkHold. */ - private boolean autoAdvanceOutputWatermark; + private boolean autoAdvanceOutputWatermark = true; private ExecutableTrigger executableTrigger; @@ -216,7 +209,6 @@ private ReduceFnTester(WindowingStrategy wildcardStrategy, this.windowFn = objectStrategy.getWindowFn(); this.windowingInternals = new TestWindowingInternals(sideInputReader); this.outputCoder = outputCoder; - this.autoAdvanceOutputWatermark = true; this.executableTrigger = wildcardStrategy.getTrigger(); this.options = options; } @@ -566,193 +558,21 @@ public long getSum() { } } - /** - * Simulate the firing of timers and progression of input and output watermarks for a - * single computation and key in a Windmill-like streaming environment. Similar to - * {@link BatchTimerInternals}, but also tracks the output watermark. - */ - private class TestTimerInternals implements TimerInternals { - /** At most one timer per timestamp is kept. */ - private Set existingTimers = new HashSet<>(); - - /** Pending input watermark timers, in timestamp order. */ - private PriorityQueue watermarkTimers = new PriorityQueue<>(11); - - /** Pending processing time timers, in timestamp order. */ - private PriorityQueue processingTimers = new PriorityQueue<>(11); - - /** Current input watermark. */ - @Nullable - private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - - /** Current output watermark. */ - @Nullable - private Instant outputWatermarkTime = null; - - /** Current processing time. */ - private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - - /** Current synchronized processing time. */ - @Nullable - private Instant synchronizedProcessingTime = null; - - @Nullable - public Instant getNextTimer(TimeDomain domain) { - TimerData data = null; - switch (domain) { - case EVENT_TIME: - data = watermarkTimers.peek(); - break; - case PROCESSING_TIME: - case SYNCHRONIZED_PROCESSING_TIME: - data = processingTimers.peek(); - break; - } - checkNotNull(data); // cases exhaustive - return data == null ? null : data.getTimestamp(); - } - - private PriorityQueue queue(TimeDomain domain) { - switch (domain) { - case EVENT_TIME: - return watermarkTimers; - case PROCESSING_TIME: - case SYNCHRONIZED_PROCESSING_TIME: - return processingTimers; - } - throw new RuntimeException(); // cases exhaustive - } - + private class TestTimerInternals extends InMemoryTimerInternals { @Override - public void setTimer(TimerData timer) { - WindowTracing.trace("TestTimerInternals.setTimer: {}", timer); - if (existingTimers.add(timer)) { - queue(timer.getDomain()).add(timer); - } - } - - @Override - public void deleteTimer(TimerData timer) { - WindowTracing.trace("TestTimerInternals.deleteTimer: {}", timer); - existingTimers.remove(timer); - queue(timer.getDomain()).remove(timer); - } - - @Override - public Instant currentProcessingTime() { - return processingTime; - } - - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return synchronizedProcessingTime; - } - - @Override - public Instant currentInputWatermarkTime() { - return checkNotNull(inputWatermarkTime); - } - - @Override - @Nullable - public Instant currentOutputWatermarkTime() { - return outputWatermarkTime; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("watermarkTimers", watermarkTimers) - .add("processingTimers", processingTimers) - .add("inputWatermarkTime", inputWatermarkTime) - .add("outputWatermarkTime", outputWatermarkTime) - .add("processingTime", processingTime) - .toString(); - } - - public void advanceInputWatermark( - ReduceFnRunner runner, Instant newInputWatermark) throws Exception { - checkNotNull(newInputWatermark); - checkState( - !newInputWatermark.isBefore(inputWatermarkTime), - "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime, - newInputWatermark); - WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}", - inputWatermarkTime, newInputWatermark); - inputWatermarkTime = newInputWatermark; - advanceAndFire(runner, newInputWatermark, TimeDomain.EVENT_TIME); - - Instant hold = stateInternals.earliestWatermarkHold(); - if (hold == null) { - WindowTracing.trace("TestTimerInternals.advanceInputWatermark: no holds, " - + "so output watermark = input watermark"); - hold = inputWatermarkTime; - } + public void advanceInputWatermark(TimerCallback timerCallback, Instant newInputWatermark) + throws Exception { + super.advanceInputWatermark(timerCallback, newInputWatermark); if (autoAdvanceOutputWatermark) { - advanceOutputWatermark(hold); - } - } - - public void advanceOutputWatermark(Instant newOutputWatermark) { - checkNotNull(newOutputWatermark); - if (newOutputWatermark.isAfter(inputWatermarkTime)) { - WindowTracing.trace( - "TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}", - newOutputWatermark, inputWatermarkTime); - newOutputWatermark = inputWatermarkTime; - } - checkState( - outputWatermarkTime == null || !newOutputWatermark.isBefore(outputWatermarkTime), - "Cannot move output watermark time backwards from %s to %s", outputWatermarkTime, - newOutputWatermark); - WindowTracing.trace("TestTimerInternals.advanceOutputWatermark: from {} to {}", - outputWatermarkTime, newOutputWatermark); - outputWatermarkTime = newOutputWatermark; - } - - public void advanceProcessingTime( - ReduceFnRunner runner, Instant newProcessingTime) throws Exception { - checkState(!newProcessingTime.isBefore(processingTime), - "Cannot move processing time backwards from %s to %s", processingTime, newProcessingTime); - WindowTracing.trace("TestTimerInternals.advanceProcessingTime: from {} to {}", processingTime, - newProcessingTime); - processingTime = newProcessingTime; - advanceAndFire(runner, newProcessingTime, TimeDomain.PROCESSING_TIME); - } - - public void advanceSynchronizedProcessingTime( - ReduceFnRunner runner, Instant newSynchronizedProcessingTime) throws Exception { - checkState(!newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime), - "Cannot move processing time backwards from %s to %s", processingTime, - newSynchronizedProcessingTime); - WindowTracing.trace("TestTimerInternals.advanceProcessingTime: from {} to {}", - synchronizedProcessingTime, newSynchronizedProcessingTime); - synchronizedProcessingTime = newSynchronizedProcessingTime; - advanceAndFire( - runner, newSynchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME); - } - - private void advanceAndFire( - ReduceFnRunner runner, Instant currentTime, TimeDomain domain) - throws Exception { - PriorityQueue queue = queue(domain); - boolean shouldFire = false; - - do { - TimerData timer = queue.peek(); - // Timers fire when the current time progresses past the timer time. - shouldFire = timer != null && currentTime.isAfter(timer.getTimestamp()); - if (shouldFire) { + Instant hold = stateInternals.earliestWatermarkHold(); + if (hold == null) { WindowTracing.trace( - "TestTimerInternals.advanceAndFire: firing {} at {}", timer, currentTime); - // Remove before firing, so that if the trigger adds another identical - // timer we don't remove it. - queue.remove(); - - runner.onTimer(timer); + "TestInMemoryTimerInternals.advanceInputWatermark: no holds, " + + "so output watermark = input watermark"); + hold = currentInputWatermarkTime(); } - } while (shouldFire); + advanceOutputWatermark(hold); + } } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java new file mode 100644 index 000000000000..0f642f6575f6 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util.state; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.base.MoreObjects; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowTracing; +import org.joda.time.Instant; + +/** + * Simulates the firing of timers and progression of input and output watermarks for a single + * computation and key in a Windmill-like streaming environment. + */ +public class InMemoryTimerInternals implements TimerInternals { + /** At most one timer per timestamp is kept. */ + private Set existingTimers = new HashSet<>(); + + /** Pending input watermark timers, in timestamp order. */ + private PriorityQueue watermarkTimers = new PriorityQueue<>(11); + + /** Pending processing time timers, in timestamp order. */ + private PriorityQueue processingTimers = new PriorityQueue<>(11); + + /** Current input watermark. */ + @Nullable private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; + + /** Current output watermark. */ + @Nullable private Instant outputWatermarkTime = null; + + /** Current processing time. */ + private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; + + /** Current synchronized processing time. */ + @Nullable private Instant synchronizedProcessingTime = null; + + @Override + @Nullable + public Instant currentOutputWatermarkTime() { + return outputWatermarkTime; + } + + /** + * Returns when the next timer in the given time domain will fire. + * + * @return the {@link Instant} of the next timer in the given time domain, or {@code null} + * if there are no timers scheduled in that time domain. + */ + @Nullable + public Instant getNextTimer(TimeDomain domain) { + TimerData data = null; + switch (domain) { + case EVENT_TIME: + data = watermarkTimers.peek(); + break; + case PROCESSING_TIME: + case SYNCHRONIZED_PROCESSING_TIME: + data = processingTimers.peek(); + break; + default: + checkState(false, "Unexpected time domain: %s", domain); + } + return (data == null) ? null : data.getTimestamp(); + } + + private PriorityQueue queue(TimeDomain domain) { + switch (domain) { + case EVENT_TIME: + return watermarkTimers; + case PROCESSING_TIME: + case SYNCHRONIZED_PROCESSING_TIME: + return processingTimers; + } + throw new RuntimeException(); // cases exhaustive + } + + @Override + public void setTimer(TimerData timer) { + WindowTracing.trace("TestTimerInternals.setTimer: {}", timer); + if (existingTimers.add(timer)) { + queue(timer.getDomain()).add(timer); + } + } + + @Override + public void deleteTimer(TimerData timer) { + WindowTracing.trace("TestTimerInternals.deleteTimer: {}", timer); + existingTimers.remove(timer); + queue(timer.getDomain()).remove(timer); + } + + @Override + public Instant currentProcessingTime() { + return processingTime; + } + + @Override + @Nullable + public Instant currentSynchronizedProcessingTime() { + return synchronizedProcessingTime; + } + + @Override + public Instant currentInputWatermarkTime() { + return checkNotNull(inputWatermarkTime); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("watermarkTimers", watermarkTimers) + .add("processingTimers", processingTimers) + .add("inputWatermarkTime", inputWatermarkTime) + .add("outputWatermarkTime", outputWatermarkTime) + .add("processingTime", processingTime) + .toString(); + } + + /** Advances input watermark to the given value and fires event-time timers accordingly. */ + public void advanceInputWatermark( + @Nullable TimerCallback timerCallback, Instant newInputWatermark) throws Exception { + checkNotNull(newInputWatermark); + checkState( + !newInputWatermark.isBefore(inputWatermarkTime), + "Cannot move input watermark time backwards from %s to %s", + inputWatermarkTime, + newInputWatermark); + WindowTracing.trace( + "TestTimerInternals.advanceInputWatermark: from {} to {}", + inputWatermarkTime, + newInputWatermark); + inputWatermarkTime = newInputWatermark; + advanceAndFire(timerCallback, newInputWatermark, TimeDomain.EVENT_TIME); + } + + /** Advances output watermark to the given value. */ + public void advanceOutputWatermark(Instant newOutputWatermark) { + checkNotNull(newOutputWatermark); + if (newOutputWatermark.isAfter(inputWatermarkTime)) { + WindowTracing.trace( + "TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}", + newOutputWatermark, + inputWatermarkTime); + newOutputWatermark = inputWatermarkTime; + } + checkState( + outputWatermarkTime == null || !newOutputWatermark.isBefore(outputWatermarkTime), + "Cannot move output watermark time backwards from %s to %s", + outputWatermarkTime, + newOutputWatermark); + WindowTracing.trace( + "TestTimerInternals.advanceOutputWatermark: from {} to {}", + outputWatermarkTime, + newOutputWatermark); + outputWatermarkTime = newOutputWatermark; + } + + /** Advances processing time to the given value and fires processing-time timers accordingly. */ + public void advanceProcessingTime( + @Nullable TimerCallback timerCallback, Instant newProcessingTime) throws Exception { + checkState( + !newProcessingTime.isBefore(processingTime), + "Cannot move processing time backwards from %s to %s", + processingTime, + newProcessingTime); + WindowTracing.trace( + "TestTimerInternals.advanceProcessingTime: from {} to {}", + processingTime, + newProcessingTime); + processingTime = newProcessingTime; + advanceAndFire(timerCallback, newProcessingTime, TimeDomain.PROCESSING_TIME); + } + + /** + * Advances synchronized processing time to the given value and fires processing-time timers + * accordingly. + */ + public void advanceSynchronizedProcessingTime( + @Nullable TimerCallback timerCallback, Instant newSynchronizedProcessingTime) + throws Exception { + checkState( + !newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime), + "Cannot move processing time backwards from %s to %s", + processingTime, + newSynchronizedProcessingTime); + WindowTracing.trace( + "TestTimerInternals.advanceProcessingTime: from {} to {}", + synchronizedProcessingTime, + newSynchronizedProcessingTime); + synchronizedProcessingTime = newSynchronizedProcessingTime; + advanceAndFire( + timerCallback, newSynchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + } + + private void advanceAndFire( + @Nullable TimerCallback timerCallback, Instant currentTime, TimeDomain domain) + throws Exception { + PriorityQueue queue = queue(domain); + while (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) { + TimerData timer = queue.peek(); + WindowTracing.trace( + "InMemoryTimerInternals.advanceAndFire: firing {} at {}", timer, currentTime); + // Remove before firing, so that if the callback adds another identical + // timer we don't remove it. + queue.remove(); + if (timerCallback != null) { + timerCallback.onTimer(timer); + } + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java new file mode 100644 index 000000000000..1436a2ec13ac --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util.state; + +import org.apache.beam.sdk.util.TimerInternals; + +/** + * A callback that processes a {@link TimerInternals.TimerData TimerData}. + */ +public interface TimerCallback { + /** Processes the {@link TimerInternals.TimerData TimerData}. */ + void onTimer(TimerInternals.TimerData timer) throws Exception; +} From 4caf3078e60028e83f2b04243c58326c987309d0 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Wed, 28 Sep 2016 19:24:46 -0700 Subject: [PATCH 3/6] Uses InMemoryTimerInternals instead of BatchTimerInternals and TriggerTester.TestTimerInternals --- .../runners/core/BatchTimerInternals.java | 140 --------------- ...GroupAlsoByWindowsViaOutputBufferDoFn.java | 6 +- .../apache/beam/sdk/util/TriggerTester.java | 160 +----------------- .../state/InMemoryTimerInternalsTest.java | 64 ++++--- 4 files changed, 43 insertions(+), 327 deletions(-) delete mode 100644 runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java rename runners/core-java/src/test/java/org/apache/beam/runners/core/BatchTimerInternalsTest.java => sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java (60%) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java deleted file mode 100644 index 829dbde84a4c..000000000000 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/BatchTimerInternals.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import static com.google.common.base.Preconditions.checkState; - -import com.google.common.base.MoreObjects; -import java.util.HashSet; -import java.util.PriorityQueue; -import java.util.Set; -import javax.annotation.Nullable; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.TimeDomain; -import org.apache.beam.sdk.util.TimerInternals; - -import org.joda.time.Instant; - -/** - * TimerInternals that uses priority queues to manage the timers that are ready to fire. - */ -public class BatchTimerInternals implements TimerInternals { - /** Set of timers that are scheduled used for deduplicating timers. */ - private Set existingTimers = new HashSet<>(); - - // Keep these queues separate so we can advance over them separately. - private PriorityQueue watermarkTimers = new PriorityQueue<>(11); - private PriorityQueue processingTimers = new PriorityQueue<>(11); - - private Instant inputWatermarkTime; - private Instant processingTime; - - private PriorityQueue queue(TimeDomain domain) { - return TimeDomain.EVENT_TIME.equals(domain) ? watermarkTimers : processingTimers; - } - - public BatchTimerInternals(Instant processingTime) { - this.processingTime = processingTime; - this.inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - } - - @Override - public void setTimer(TimerData timer) { - if (existingTimers.add(timer)) { - queue(timer.getDomain()).add(timer); - } - } - - @Override - public void deleteTimer(TimerData timer) { - existingTimers.remove(timer); - queue(timer.getDomain()).remove(timer); - } - - @Override - public Instant currentProcessingTime() { - return processingTime; - } - - /** - * {@inheritDoc} - * - * @return {@link BoundedWindow#TIMESTAMP_MAX_VALUE}: in batch mode, upstream processing - * is already complete. - */ - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return BoundedWindow.TIMESTAMP_MAX_VALUE; - } - - @Override - public Instant currentInputWatermarkTime() { - return inputWatermarkTime; - } - - @Override - @Nullable - public Instant currentOutputWatermarkTime() { - // The output watermark is always undefined in batch mode. - return null; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("watermarkTimers", watermarkTimers) - .add("processingTimers", processingTimers) - .toString(); - } - - public void advanceInputWatermark(ReduceFnRunner runner, Instant newInputWatermark) - throws Exception { - checkState(!newInputWatermark.isBefore(inputWatermarkTime), - "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime, - newInputWatermark); - inputWatermarkTime = newInputWatermark; - advance(runner, newInputWatermark, TimeDomain.EVENT_TIME); - } - - public void advanceProcessingTime(ReduceFnRunner runner, Instant newProcessingTime) - throws Exception { - checkState(!newProcessingTime.isBefore(processingTime), - "Cannot move processing time backwards from %s to %s", processingTime, newProcessingTime); - processingTime = newProcessingTime; - advance(runner, newProcessingTime, TimeDomain.PROCESSING_TIME); - } - - private void advance(ReduceFnRunner runner, Instant newTime, TimeDomain domain) - throws Exception { - PriorityQueue timers = queue(domain); - boolean shouldFire = false; - - do { - TimerData timer = timers.peek(); - // Timers fire if the new time is ahead of the timer - shouldFire = timer != null && newTime.isAfter(timer.getTimestamp()); - if (shouldFire) { - // Remove before firing, so that if the trigger adds another identical - // timer we don't remove it. - timers.remove(); - runner.onTimer(timer); - } - } while (shouldFire); - } -} diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java index 091ad33bd8de..e5532850a8b6 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; @@ -58,7 +59,10 @@ public void processElement( // Used with Batch, we know that all the data is available for this key. We can't use the // timer manager from the context because it doesn't exist. So we create one and emulate the // watermark, knowing that we have all data and it is in timestamp order. - BatchTimerInternals timerInternals = new BatchTimerInternals(Instant.now()); + InMemoryTimerInternals timerInternals = new InMemoryTimerInternals(); + timerInternals.advanceProcessingTime(null /* timerCallback */, Instant.now()); + timerInternals.advanceSynchronizedProcessingTime( + null /* timerCallback */, BoundedWindow.TIMESTAMP_MAX_VALUE); StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key); ReduceFnRunner reduceFnRunner = diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java index f6b0926fe573..47903515fe4c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java @@ -19,10 +19,8 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; import static org.junit.Assert.assertTrue; -import com.google.common.base.MoreObjects; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import java.util.ArrayList; @@ -32,7 +30,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.PriorityQueue; import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -43,16 +40,13 @@ import org.apache.beam.sdk.util.ActiveWindowSet.MergeCallback; import org.apache.beam.sdk.util.TimerInternals.TimerData; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; -import org.apache.beam.sdk.util.state.InMemoryStateInternals; -import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace; import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace; -import org.apache.beam.sdk.util.state.StateTag; import org.apache.beam.sdk.util.state.TestInMemoryStateInternals; -import org.apache.beam.sdk.util.state.WatermarkHoldState; import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Duration; import org.joda.time.Instant; @@ -96,7 +90,7 @@ public SimpleTriggerTester withAllowedLateness(Duration allowedLateness) thro private final TestInMemoryStateInternals stateInternals = new TestInMemoryStateInternals(null /* key */); - private final TestTimerInternals timerInternals = new TestTimerInternals(); + private final InMemoryTimerInternals timerInternals = new InMemoryTimerInternals(); private final TriggerContextFactory contextFactory; private final WindowFn windowFn; private final ActiveWindowSet activeWindows; @@ -201,22 +195,16 @@ private StateNamespace windowNamespace(W window) { } /** - * Advance the input watermark to the specified time, firing any timers that should - * fire. Then advance the output watermark as far as possible. + * Advance the input watermark to the specified time, then advance the output watermark as far as + * possible. */ public void advanceInputWatermark(Instant newInputWatermark) throws Exception { - timerInternals.advanceInputWatermark(newInputWatermark); + timerInternals.advanceInputWatermark(null /* timerCallback */, newInputWatermark); } - /** Advance the processing time to the specified time, firing any timers that should fire. */ + /** Advance the processing time to the specified time. */ public void advanceProcessingTime(Instant newProcessingTime) throws Exception { - timerInternals.advanceProcessingTime(newProcessingTime); - } - - /** Advance the processing time to the specified time, firing any timers that should fire. */ - public void advanceSynchronizedProcessingTime(Instant newSynchronizedProcessingTime) - throws Exception { - timerInternals.advanceSynchronizedProcessingTime(newSynchronizedProcessingTime); + timerInternals.advanceProcessingTime(null /* timerCallback */, newProcessingTime); } /** @@ -382,140 +370,6 @@ public BoundedWindow window() { } } - /** - * Simulate the firing of timers and progression of input and output watermarks for a - * single computation and key in a Windmill-like streaming environment. Similar to - * {@link BatchTimerInternals}, but also tracks the output watermark. - */ - private class TestTimerInternals implements TimerInternals { - /** At most one timer per timestamp is kept. */ - private Set existingTimers = new HashSet<>(); - - /** Pending input watermark timers, in timestamp order. */ - private PriorityQueue watermarkTimers = new PriorityQueue<>(11); - - /** Pending processing time timers, in timestamp order. */ - private PriorityQueue processingTimers = new PriorityQueue<>(11); - - /** Current input watermark. */ - @Nullable - private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - - /** Current output watermark. */ - @Nullable - private Instant outputWatermarkTime = null; - - /** Current processing time. */ - private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; - - /** Current processing time. */ - private Instant synchronizedProcessingTime = null; - - private PriorityQueue queue(TimeDomain domain) { - return TimeDomain.EVENT_TIME.equals(domain) ? watermarkTimers : processingTimers; - } - - @Override - public void setTimer(TimerData timer) { - WindowTracing.trace("TestTimerInternals.setTimer: {}", timer); - if (existingTimers.add(timer)) { - queue(timer.getDomain()).add(timer); - } - } - - @Override - public void deleteTimer(TimerData timer) { - WindowTracing.trace("TestTimerInternals.deleteTimer: {}", timer); - existingTimers.remove(timer); - queue(timer.getDomain()).remove(timer); - } - - @Override - public Instant currentProcessingTime() { - return processingTime; - } - - @Override - @Nullable - public Instant currentSynchronizedProcessingTime() { - return synchronizedProcessingTime; - } - - @Override - public Instant currentInputWatermarkTime() { - return checkNotNull(inputWatermarkTime); - } - - @Override - @Nullable - public Instant currentOutputWatermarkTime() { - return outputWatermarkTime; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(getClass()) - .add("watermarkTimers", watermarkTimers) - .add("processingTimers", processingTime) - .add("inputWatermarkTime", inputWatermarkTime) - .add("outputWatermarkTime", outputWatermarkTime) - .add("processingTime", processingTime) - .toString(); - } - - public void advanceInputWatermark(Instant newInputWatermark) throws Exception { - checkNotNull(newInputWatermark); - checkState(!newInputWatermark.isBefore(inputWatermarkTime), - "Cannot move input watermark time backwards from %s to %s", inputWatermarkTime, - newInputWatermark); - WindowTracing.trace("TestTimerInternals.advanceInputWatermark: from {} to {}", - inputWatermarkTime, newInputWatermark); - inputWatermarkTime = newInputWatermark; - - Instant hold = stateInternals.earliestWatermarkHold(); - if (hold == null) { - WindowTracing.trace("TestTimerInternals.advanceInputWatermark: no holds, " - + "so output watermark = input watermark"); - hold = inputWatermarkTime; - } - advanceOutputWatermark(hold); - } - - private void advanceOutputWatermark(Instant newOutputWatermark) throws Exception { - checkNotNull(newOutputWatermark); - if (newOutputWatermark.isAfter(inputWatermarkTime)) { - WindowTracing.trace( - "TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}", - newOutputWatermark, inputWatermarkTime); - newOutputWatermark = inputWatermarkTime; - } - checkState(outputWatermarkTime == null || !newOutputWatermark.isBefore(outputWatermarkTime), - "Cannot move output watermark time backwards from %s to %s", outputWatermarkTime, - newOutputWatermark); - WindowTracing.trace("TestTimerInternals.advanceOutputWatermark: from {} to {}", - outputWatermarkTime, newOutputWatermark); - outputWatermarkTime = newOutputWatermark; - } - - public void advanceProcessingTime(Instant newProcessingTime) throws Exception { - checkState(!newProcessingTime.isBefore(processingTime), - "Cannot move processing time backwards from %s to %s", processingTime, newProcessingTime); - WindowTracing.trace("TestTimerInternals.advanceProcessingTime: from {} to {}", processingTime, - newProcessingTime); - processingTime = newProcessingTime; - } - - public void advanceSynchronizedProcessingTime(Instant newSynchronizedProcessingTime) - throws Exception { - checkState(!newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime), - "Cannot move processing time backwards from %s to %s", synchronizedProcessingTime, - newSynchronizedProcessingTime); - WindowTracing.trace("TestTimerInternals.advanceProcessingTime: from {} to {}", - synchronizedProcessingTime, newSynchronizedProcessingTime); - synchronizedProcessingTime = newSynchronizedProcessingTime; - } - } - private class TestTimers implements Timers { private final StateNamespace namespace; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/BatchTimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java similarity index 60% rename from runners/core-java/src/test/java/org/apache/beam/runners/core/BatchTimerInternalsTest.java rename to sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java index 122e60ce6131..951803a8f659 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/BatchTimerInternalsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java @@ -15,12 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.core; +package org.apache.beam.sdk.util.state; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.util.state.StateNamespaceForTest; import org.joda.time.Instant; import org.junit.Before; import org.junit.Test; @@ -31,15 +29,15 @@ import org.mockito.MockitoAnnotations; /** - * Tests for {@link BatchTimerInternals}. + * Tests for {@link InMemoryTimerInternals}. */ @RunWith(JUnit4.class) -public class BatchTimerInternalsTest { +public class InMemoryTimerInternalsTest { private static final StateNamespace NS1 = new StateNamespaceForTest("NS1"); @Mock - private ReduceFnRunner mockRunner; + private TimerCallback timerCallback; @Before public void setUp() { @@ -48,36 +46,36 @@ public void setUp() { @Test public void testFiringTimers() throws Exception { - BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0)); + InMemoryTimerInternals underTest = new InMemoryTimerInternals(); TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME); underTest.setTimer(processingTime1); underTest.setTimer(processingTime2); - underTest.advanceProcessingTime(mockRunner, new Instant(20)); - Mockito.verify(mockRunner).onTimer(processingTime1); - Mockito.verifyNoMoreInteractions(mockRunner); + underTest.advanceProcessingTime(timerCallback, new Instant(20)); + Mockito.verify(timerCallback).onTimer(processingTime1); + Mockito.verifyNoMoreInteractions(timerCallback); // Advancing just a little shouldn't refire - underTest.advanceProcessingTime(mockRunner, new Instant(21)); - Mockito.verifyNoMoreInteractions(mockRunner); + underTest.advanceProcessingTime(timerCallback, new Instant(21)); + Mockito.verifyNoMoreInteractions(timerCallback); // Adding the timer and advancing a little should refire underTest.setTimer(processingTime1); - Mockito.verify(mockRunner).onTimer(processingTime1); - underTest.advanceProcessingTime(mockRunner, new Instant(21)); - Mockito.verifyNoMoreInteractions(mockRunner); + Mockito.verify(timerCallback).onTimer(processingTime1); + underTest.advanceProcessingTime(timerCallback, new Instant(21)); + Mockito.verifyNoMoreInteractions(timerCallback); // And advancing the rest of the way should still have the other timer - underTest.advanceProcessingTime(mockRunner, new Instant(30)); - Mockito.verify(mockRunner).onTimer(processingTime2); - Mockito.verifyNoMoreInteractions(mockRunner); + underTest.advanceProcessingTime(timerCallback, new Instant(30)); + Mockito.verify(timerCallback).onTimer(processingTime2); + Mockito.verifyNoMoreInteractions(timerCallback); } @Test public void testTimerOrdering() throws Exception { - BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0)); + InMemoryTimerInternals underTest = new InMemoryTimerInternals(); TimerData watermarkTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME); TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); TimerData watermarkTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME); @@ -88,31 +86,31 @@ public void testTimerOrdering() throws Exception { underTest.setTimer(processingTime2); underTest.setTimer(watermarkTime2); - underTest.advanceInputWatermark(mockRunner, new Instant(30)); - Mockito.verify(mockRunner).onTimer(watermarkTime1); - Mockito.verify(mockRunner).onTimer(watermarkTime2); - Mockito.verifyNoMoreInteractions(mockRunner); + underTest.advanceInputWatermark(timerCallback, new Instant(30)); + Mockito.verify(timerCallback).onTimer(watermarkTime1); + Mockito.verify(timerCallback).onTimer(watermarkTime2); + Mockito.verifyNoMoreInteractions(timerCallback); - underTest.advanceProcessingTime(mockRunner, new Instant(30)); - Mockito.verify(mockRunner).onTimer(processingTime1); - Mockito.verify(mockRunner).onTimer(processingTime2); - Mockito.verifyNoMoreInteractions(mockRunner); + underTest.advanceProcessingTime(timerCallback, new Instant(30)); + Mockito.verify(timerCallback).onTimer(processingTime1); + Mockito.verify(timerCallback).onTimer(processingTime2); + Mockito.verifyNoMoreInteractions(timerCallback); } @Test public void testDeduplicate() throws Exception { - BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0)); + InMemoryTimerInternals underTest = new InMemoryTimerInternals(); TimerData watermarkTime = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME); TimerData processingTime = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); underTest.setTimer(watermarkTime); underTest.setTimer(watermarkTime); underTest.setTimer(processingTime); underTest.setTimer(processingTime); - underTest.advanceProcessingTime(mockRunner, new Instant(20)); - underTest.advanceInputWatermark(mockRunner, new Instant(20)); + underTest.advanceProcessingTime(timerCallback, new Instant(20)); + underTest.advanceInputWatermark(timerCallback, new Instant(20)); - Mockito.verify(mockRunner).onTimer(processingTime); - Mockito.verify(mockRunner).onTimer(watermarkTime); - Mockito.verifyNoMoreInteractions(mockRunner); + Mockito.verify(timerCallback).onTimer(processingTime); + Mockito.verify(timerCallback).onTimer(watermarkTime); + Mockito.verifyNoMoreInteractions(timerCallback); } } From 218b6d1423c7e08e7543c905f9ff13b0933018cd Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Thu, 29 Sep 2016 10:23:59 -0700 Subject: [PATCH 4/6] Addressed comments by tgroh --- ...GroupAlsoByWindowsViaOutputBufferDoFn.java | 5 ++- .../util/state/InMemoryTimerInternals.java | 41 ++++++++++--------- .../beam/sdk/util/state/TimerCallback.java | 7 ++++ .../apache/beam/sdk/util/TriggerTester.java | 5 ++- 4 files changed, 35 insertions(+), 23 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java index e5532850a8b6..23986df43f26 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java @@ -27,6 +27,7 @@ import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; +import org.apache.beam.sdk.util.state.TimerCallback; import org.apache.beam.sdk.values.KV; import org.joda.time.Instant; @@ -60,9 +61,9 @@ public void processElement( // timer manager from the context because it doesn't exist. So we create one and emulate the // watermark, knowing that we have all data and it is in timestamp order. InMemoryTimerInternals timerInternals = new InMemoryTimerInternals(); - timerInternals.advanceProcessingTime(null /* timerCallback */, Instant.now()); + timerInternals.advanceProcessingTime(TimerCallback.NO_OP, Instant.now()); timerInternals.advanceSynchronizedProcessingTime( - null /* timerCallback */, BoundedWindow.TIMESTAMP_MAX_VALUE); + TimerCallback.NO_OP, BoundedWindow.TIMESTAMP_MAX_VALUE); StateInternals stateInternals = stateInternalsFactory.stateInternalsForKey(key); ReduceFnRunner reduceFnRunner = diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java index 0f642f6575f6..5230679d775f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java @@ -71,7 +71,7 @@ public Instant currentOutputWatermarkTime() { */ @Nullable public Instant getNextTimer(TimeDomain domain) { - TimerData data = null; + final TimerData data; switch (domain) { case EVENT_TIME: data = watermarkTimers.peek(); @@ -81,7 +81,7 @@ public Instant getNextTimer(TimeDomain domain) { data = processingTimers.peek(); break; default: - checkState(false, "Unexpected time domain: %s", domain); + throw new IllegalArgumentException("Unexpected time domain: " + domain); } return (data == null) ? null : data.getTimestamp(); } @@ -93,8 +93,9 @@ private PriorityQueue queue(TimeDomain domain) { case PROCESSING_TIME: case SYNCHRONIZED_PROCESSING_TIME: return processingTimers; + default: + throw new IllegalArgumentException("Unexpected time domain: " + domain); } - throw new RuntimeException(); // cases exhaustive } @Override @@ -141,7 +142,7 @@ public String toString() { /** Advances input watermark to the given value and fires event-time timers accordingly. */ public void advanceInputWatermark( - @Nullable TimerCallback timerCallback, Instant newInputWatermark) throws Exception { + TimerCallback timerCallback, Instant newInputWatermark) throws Exception { checkNotNull(newInputWatermark); checkState( !newInputWatermark.isBefore(inputWatermarkTime), @@ -159,28 +160,32 @@ public void advanceInputWatermark( /** Advances output watermark to the given value. */ public void advanceOutputWatermark(Instant newOutputWatermark) { checkNotNull(newOutputWatermark); + final Instant adjustedOutputWatermark; if (newOutputWatermark.isAfter(inputWatermarkTime)) { WindowTracing.trace( "TestTimerInternals.advanceOutputWatermark: clipping output watermark from {} to {}", newOutputWatermark, inputWatermarkTime); - newOutputWatermark = inputWatermarkTime; + adjustedOutputWatermark = inputWatermarkTime; + } else { + adjustedOutputWatermark = newOutputWatermark; } + checkState( - outputWatermarkTime == null || !newOutputWatermark.isBefore(outputWatermarkTime), + outputWatermarkTime == null || !adjustedOutputWatermark.isBefore(outputWatermarkTime), "Cannot move output watermark time backwards from %s to %s", outputWatermarkTime, - newOutputWatermark); + adjustedOutputWatermark); WindowTracing.trace( "TestTimerInternals.advanceOutputWatermark: from {} to {}", outputWatermarkTime, - newOutputWatermark); - outputWatermarkTime = newOutputWatermark; + adjustedOutputWatermark); + outputWatermarkTime = adjustedOutputWatermark; } /** Advances processing time to the given value and fires processing-time timers accordingly. */ public void advanceProcessingTime( - @Nullable TimerCallback timerCallback, Instant newProcessingTime) throws Exception { + TimerCallback timerCallback, Instant newProcessingTime) throws Exception { checkState( !newProcessingTime.isBefore(processingTime), "Cannot move processing time backwards from %s to %s", @@ -199,7 +204,7 @@ public void advanceProcessingTime( * accordingly. */ public void advanceSynchronizedProcessingTime( - @Nullable TimerCallback timerCallback, Instant newSynchronizedProcessingTime) + TimerCallback timerCallback, Instant newSynchronizedProcessingTime) throws Exception { checkState( !newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime), @@ -216,19 +221,17 @@ public void advanceSynchronizedProcessingTime( } private void advanceAndFire( - @Nullable TimerCallback timerCallback, Instant currentTime, TimeDomain domain) + TimerCallback timerCallback, Instant currentTime, TimeDomain domain) throws Exception { + checkNotNull(timerCallback); PriorityQueue queue = queue(domain); while (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) { - TimerData timer = queue.peek(); - WindowTracing.trace( - "InMemoryTimerInternals.advanceAndFire: firing {} at {}", timer, currentTime); // Remove before firing, so that if the callback adds another identical // timer we don't remove it. - queue.remove(); - if (timerCallback != null) { - timerCallback.onTimer(timer); - } + TimerData timer = queue.remove(); + WindowTracing.trace( + "InMemoryTimerInternals.advanceAndFire: firing {} at {}", timer, currentTime); + timerCallback.onTimer(timer); } } } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java index 1436a2ec13ac..6598e300f256 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/TimerCallback.java @@ -25,4 +25,11 @@ public interface TimerCallback { /** Processes the {@link TimerInternals.TimerData TimerData}. */ void onTimer(TimerInternals.TimerData timer) throws Exception; + + TimerCallback NO_OP = new TimerCallback() { + @Override + public void onTimer(TimerInternals.TimerData timer) throws Exception { + // Nothing + } + }; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java index 47903515fe4c..5ad0ce78b4d5 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java @@ -47,6 +47,7 @@ import org.apache.beam.sdk.util.state.StateNamespaces.WindowAndTriggerNamespace; import org.apache.beam.sdk.util.state.StateNamespaces.WindowNamespace; import org.apache.beam.sdk.util.state.TestInMemoryStateInternals; +import org.apache.beam.sdk.util.state.TimerCallback; import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Duration; import org.joda.time.Instant; @@ -199,12 +200,12 @@ private StateNamespace windowNamespace(W window) { * possible. */ public void advanceInputWatermark(Instant newInputWatermark) throws Exception { - timerInternals.advanceInputWatermark(null /* timerCallback */, newInputWatermark); + timerInternals.advanceInputWatermark(TimerCallback.NO_OP, newInputWatermark); } /** Advance the processing time to the specified time. */ public void advanceProcessingTime(Instant newProcessingTime) throws Exception { - timerInternals.advanceProcessingTime(null /* timerCallback */, newProcessingTime); + timerInternals.advanceProcessingTime(TimerCallback.NO_OP, newProcessingTime); } /** From c03696ac7c3a7ab3f12c21e0ce76f1502eba2fe2 Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Thu, 29 Sep 2016 10:50:20 -0700 Subject: [PATCH 5/6] Added JIRA links to no-op callbacks in TriggerTester --- .../src/test/java/org/apache/beam/sdk/util/TriggerTester.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java index 5ad0ce78b4d5..5fe17addad7a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TriggerTester.java @@ -200,11 +200,13 @@ private StateNamespace windowNamespace(W window) { * possible. */ public void advanceInputWatermark(Instant newInputWatermark) throws Exception { + // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694 timerInternals.advanceInputWatermark(TimerCallback.NO_OP, newInputWatermark); } /** Advance the processing time to the specified time. */ public void advanceProcessingTime(Instant newProcessingTime) throws Exception { + // TODO: Should test timer firings: see https://issues.apache.org/jira/browse/BEAM-694 timerInternals.advanceProcessingTime(TimerCallback.NO_OP, newProcessingTime); } From 0fedd29fee2d52db9a3e2f61fee45ed2a7c043cb Mon Sep 17 00:00:00 2001 From: Eugene Kirpichov Date: Fri, 30 Sep 2016 09:59:49 -0700 Subject: [PATCH 6/6] Javadoc fix --- .../apache/beam/sdk/util/state/InMemoryTimerInternals.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java index 5230679d775f..dcab5feefa58 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java @@ -64,9 +64,7 @@ public Instant currentOutputWatermarkTime() { } /** - * Returns when the next timer in the given time domain will fire. - * - * @return the {@link Instant} of the next timer in the given time domain, or {@code null} + * Returns when the next timer in the given time domain will fire, or {@code null} * if there are no timers scheduled in that time domain. */ @Nullable