From 445c120510948fb23e6d35b502da1e5a4f0ffdfb Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 15 Dec 2016 20:45:56 -0800 Subject: [PATCH 1/2] Move InMemoryTimerInternals to runners-core --- ...GroupAlsoByWindowsViaOutputBufferDoFn.java | 1 - .../runners/core}/InMemoryTimerInternals.java | 8 ++--- .../core}/InMemoryTimerInternalsTest.java | 4 ++- .../beam/runners/core/ReduceFnTester.java | 1 - .../runners/core/SplittableParDoTest.java | 16 +++++++-- .../triggers/TriggerStateMachineTester.java | 2 +- .../translation/SparkGroupAlsoByWindowFn.java | 2 +- .../beam/sdk/transforms/DoFnTester.java | 36 ------------------- 8 files changed, 21 insertions(+), 49 deletions(-) rename {sdks/java/core/src/main/java/org/apache/beam/sdk/util/state => runners/core-java/src/main/java/org/apache/beam/runners/core}/InMemoryTimerInternals.java (97%) rename {sdks/java/core/src/test/java/org/apache/beam/sdk/util/state => runners/core-java/src/test/java/org/apache/beam/runners/core}/InMemoryTimerInternalsTest.java (97%) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java index 918919170ec6..efcd771d01b9 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsViaOutputBufferDoFn.java @@ -27,7 +27,6 @@ import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.joda.time.Instant; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java similarity index 97% rename from sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java rename to runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java index 44b44f06ead6..5fcd088c9400 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.util.state; +package org.apache.beam.runners.core; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; @@ -29,12 +29,10 @@ import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowTracing; +import org.apache.beam.sdk.util.state.StateNamespace; import org.joda.time.Instant; -/** - * Simulates the firing of timers and progression of input and output watermarks for a single - * computation and key in a Windmill-like streaming environment. - */ +/** {@link TimerInternals} with all watermarks and processing clock simulated in-memory. */ public class InMemoryTimerInternals implements TimerInternals { /** At most one timer per timestamp is kept. */ diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java similarity index 97% rename from sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java rename to runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java index 4a2763ccc76e..2caa8742dea1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/InMemoryTimerInternalsTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryTimerInternalsTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.sdk.util.state; +package org.apache.beam.runners.core; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; @@ -25,6 +25,8 @@ import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.state.StateNamespace; +import org.apache.beam.sdk.util.state.StateNamespaceForTest; import org.joda.time.Instant; import org.junit.Test; import org.junit.runner.RunWith; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java index db0cf9186a7d..890195a0d3c1 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java @@ -73,7 +73,6 @@ import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; -import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; import org.apache.beam.sdk.util.state.StateTag; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index 0f0b106d2ebd..74a566b35613 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue; import java.io.Serializable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; @@ -196,6 +197,8 @@ private static class ProcessFnTester< tester; private Instant currentProcessingTime; + private InMemoryTimerInternals timerInternals; + ProcessFnTester( Instant currentProcessingTime, DoFn fn, @@ -206,6 +209,7 @@ private static class ProcessFnTester< new SplittableParDo.ProcessFn<>( fn, inputCoder, restrictionCoder, IntervalWindow.getCoder()); this.tester = DoFnTester.of(processFn); + this.timerInternals = new InMemoryTimerInternals(); processFn.setStateInternalsFactory( new StateInternalsFactory() { @Override @@ -217,7 +221,7 @@ public StateInternals stateInternalsForKey(String key) { new TimerInternalsFactory() { @Override public TimerInternals timerInternalsForKey(String key) { - return tester.getTimerInternals(); + return timerInternals; } }); processFn.setOutputWindowedValue( @@ -253,7 +257,7 @@ public void sideOutputWindowedValue( // through the state/timer/output callbacks. this.tester.setCloningBehavior(DoFnTester.CloningBehavior.DO_NOT_CLONE); this.tester.startBundle(); - this.tester.advanceProcessingTime(currentProcessingTime); + timerInternals.advanceProcessingTime(currentProcessingTime); this.currentProcessingTime = currentProcessingTime; } @@ -291,7 +295,13 @@ void startElement( */ boolean advanceProcessingTimeBy(Duration duration) throws Exception { currentProcessingTime = currentProcessingTime.plus(duration); - List timers = tester.advanceProcessingTime(currentProcessingTime); + timerInternals.advanceProcessingTime(currentProcessingTime); + + List timers = new ArrayList<>(); + TimerInternals.TimerData nextTimer; + while ((nextTimer = timerInternals.removeNextProcessingTimer()) != null) { + timers.add(nextTimer); + } if (timers.isEmpty()) { return false; } diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java index be63c0644e27..2a626d4311d5 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineTester.java @@ -34,6 +34,7 @@ import javax.annotation.Nullable; import org.apache.beam.runners.core.ActiveWindowSet; import org.apache.beam.runners.core.ActiveWindowSet.MergeCallback; +import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.MergingActiveWindowSet; import org.apache.beam.runners.core.NonMergingActiveWindowSet; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -46,7 +47,6 @@ import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; -import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateNamespace; import org.apache.beam.sdk.util.state.StateNamespaces; diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java index 87d3f5059039..5432d58994af 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowFn.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.List; import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn; +import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.SystemReduceFn; @@ -37,7 +38,6 @@ import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 93b3f5954898..2d8684aaaff7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -46,12 +46,10 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.Timer; -import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.InMemoryStateInternals; -import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; @@ -143,10 +141,6 @@ public StateInternals getStateInternals() { return (StateInternals) stateInternals; } - public TimerInternals getTimerInternals() { - return timerInternals; - } - /** * When a {@link DoFnTester} should clone the {@link DoFn} under test and how it should manage * the lifecycle of the {@link DoFn}. @@ -233,7 +227,6 @@ public void startBundle() throws Exception { context.setupDelegateAggregators(); // State and timer internals are per-bundle. stateInternals = InMemoryStateInternals.forKey(new Object()); - timerInternals = new InMemoryTimerInternals(); try { fnInvoker.invokeStartBundle(context); } catch (UserCodeException e) { @@ -542,34 +535,6 @@ public AggregateT getAggregatorValue(Aggregator agg) return extractAggregatorValue(agg.getName(), agg.getCombineFn()); } - public List advanceInputWatermark(Instant newWatermark) { - try { - timerInternals.advanceInputWatermark(newWatermark); - final List firedTimers = new ArrayList<>(); - TimerInternals.TimerData timer; - while ((timer = timerInternals.removeNextEventTimer()) != null) { - firedTimers.add(timer); - } - return firedTimers; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public List advanceProcessingTime(Instant newProcessingTime) { - try { - timerInternals.advanceProcessingTime(newProcessingTime); - final List firedTimers = new ArrayList<>(); - TimerInternals.TimerData timer; - while ((timer = timerInternals.removeNextProcessingTimer()) != null) { - firedTimers.add(timer); - } - return firedTimers; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - private AggregateT extractAggregatorValue( String name, CombineFn combiner) { @SuppressWarnings("unchecked") @@ -814,7 +779,6 @@ private enum State { private Map, List>> outputs; private InMemoryStateInternals stateInternals; - private InMemoryTimerInternals timerInternals; /** The state of processing of the {@link DoFn} under test. */ private State state = State.UNINITIALIZED; From 69d2c47b6a476099535e9cefe62d4cce5ccafbc1 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Fri, 16 Dec 2016 20:22:59 -0800 Subject: [PATCH 2/2] Restore SDK's InMemoryTimerInternals, deprecated --- .../util/state/InMemoryTimerInternals.java | 275 ++++++++++++++++++ 1 file changed, 275 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java new file mode 100644 index 000000000000..a910d647760f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util.state; + +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.base.MoreObjects; +import java.util.HashSet; +import java.util.PriorityQueue; +import java.util.Set; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.WindowTracing; +import org.joda.time.Instant; + +/** + * @deprecated use {@code org.apache.beam.runners.core.InMemoryTimerInternals}. + */ +@Deprecated +public class InMemoryTimerInternals implements TimerInternals { + + /** At most one timer per timestamp is kept. */ + private Set existingTimers = new HashSet<>(); + + /** Pending input watermark timers, in timestamp order. */ + private PriorityQueue watermarkTimers = new PriorityQueue<>(11); + + /** Pending processing time timers, in timestamp order. */ + private PriorityQueue processingTimers = new PriorityQueue<>(11); + + /** Pending synchronized processing time timers, in timestamp order. */ + private PriorityQueue synchronizedProcessingTimers = new PriorityQueue<>(11); + + /** Current input watermark. */ + private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE; + + /** Current output watermark. */ + @Nullable private Instant outputWatermarkTime = null; + + /** Current processing time. */ + private Instant processingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; + + /** Current synchronized processing time. */ + private Instant synchronizedProcessingTime = BoundedWindow.TIMESTAMP_MIN_VALUE; + + @Override + @Nullable + public Instant currentOutputWatermarkTime() { + return outputWatermarkTime; + } + + /** + * Returns when the next timer in the given time domain will fire, or {@code null} + * if there are no timers scheduled in that time domain. + */ + @Nullable + public Instant getNextTimer(TimeDomain domain) { + final TimerData data; + switch (domain) { + case EVENT_TIME: + data = watermarkTimers.peek(); + break; + case PROCESSING_TIME: + data = processingTimers.peek(); + break; + case SYNCHRONIZED_PROCESSING_TIME: + data = synchronizedProcessingTimers.peek(); + break; + default: + throw new IllegalArgumentException("Unexpected time domain: " + domain); + } + return (data == null) ? null : data.getTimestamp(); + } + + private PriorityQueue queue(TimeDomain domain) { + switch (domain) { + case EVENT_TIME: + return watermarkTimers; + case PROCESSING_TIME: + return processingTimers; + case SYNCHRONIZED_PROCESSING_TIME: + return synchronizedProcessingTimers; + default: + throw new IllegalArgumentException("Unexpected time domain: " + domain); + } + } + + @Override + public void setTimer(StateNamespace namespace, String timerId, Instant target, + TimeDomain timeDomain) { + throw new UnsupportedOperationException("Setting a timer by ID is not yet supported."); + } + + @Override + public void setTimer(TimerData timerData) { + WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData); + if (existingTimers.add(timerData)) { + queue(timerData.getDomain()).add(timerData); + } + } + + @Override + public void deleteTimer(StateNamespace namespace, String timerId) { + throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported."); + } + + @Override + public void deleteTimer(TimerData timer) { + WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer); + existingTimers.remove(timer); + queue(timer.getDomain()).remove(timer); + } + + @Override + public Instant currentProcessingTime() { + return processingTime; + } + + @Override + @Nullable + public Instant currentSynchronizedProcessingTime() { + return synchronizedProcessingTime; + } + + @Override + public Instant currentInputWatermarkTime() { + return inputWatermarkTime; + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(getClass()) + .add("watermarkTimers", watermarkTimers) + .add("processingTimers", processingTimers) + .add("synchronizedProcessingTimers", synchronizedProcessingTimers) + .add("inputWatermarkTime", inputWatermarkTime) + .add("outputWatermarkTime", outputWatermarkTime) + .add("processingTime", processingTime) + .toString(); + } + + /** Advances input watermark to the given value. */ + public void advanceInputWatermark(Instant newInputWatermark) throws Exception { + checkNotNull(newInputWatermark); + checkState( + !newInputWatermark.isBefore(inputWatermarkTime), + "Cannot move input watermark time backwards from %s to %s", + inputWatermarkTime, + newInputWatermark); + WindowTracing.trace( + "{}.advanceInputWatermark: from {} to {}", + getClass().getSimpleName(), inputWatermarkTime, newInputWatermark); + inputWatermarkTime = newInputWatermark; + } + + /** Advances output watermark to the given value. */ + public void advanceOutputWatermark(Instant newOutputWatermark) { + checkNotNull(newOutputWatermark); + final Instant adjustedOutputWatermark; + if (newOutputWatermark.isAfter(inputWatermarkTime)) { + WindowTracing.trace( + "{}.advanceOutputWatermark: clipping output watermark from {} to {}", + getClass().getSimpleName(), newOutputWatermark, inputWatermarkTime); + adjustedOutputWatermark = inputWatermarkTime; + } else { + adjustedOutputWatermark = newOutputWatermark; + } + + checkState( + outputWatermarkTime == null || !adjustedOutputWatermark.isBefore(outputWatermarkTime), + "Cannot move output watermark time backwards from %s to %s", + outputWatermarkTime, + adjustedOutputWatermark); + WindowTracing.trace( + "{}.advanceOutputWatermark: from {} to {}", + getClass().getSimpleName(), outputWatermarkTime, adjustedOutputWatermark); + outputWatermarkTime = adjustedOutputWatermark; + } + + /** Advances processing time to the given value. */ + public void advanceProcessingTime(Instant newProcessingTime) throws Exception { + checkNotNull(newProcessingTime); + checkState( + !newProcessingTime.isBefore(processingTime), + "Cannot move processing time backwards from %s to %s", + processingTime, + newProcessingTime); + WindowTracing.trace( + "{}.advanceProcessingTime: from {} to {}", + getClass().getSimpleName(), processingTime, newProcessingTime); + processingTime = newProcessingTime; + } + + /** Advances synchronized processing time to the given value. */ + public void advanceSynchronizedProcessingTime(Instant newSynchronizedProcessingTime) + throws Exception { + checkNotNull(newSynchronizedProcessingTime); + checkState( + !newSynchronizedProcessingTime.isBefore(synchronizedProcessingTime), + "Cannot move processing time backwards from %s to %s", + synchronizedProcessingTime, + newSynchronizedProcessingTime); + WindowTracing.trace( + "{}.advanceProcessingTime: from {} to {}", + getClass().getSimpleName(), synchronizedProcessingTime, newSynchronizedProcessingTime); + synchronizedProcessingTime = newSynchronizedProcessingTime; + } + + /** Returns the next eligible event time timer, if none returns null. */ + @Nullable + public TimerData removeNextEventTimer() { + TimerData timer = removeNextTimer(inputWatermarkTime, TimeDomain.EVENT_TIME); + if (timer != null) { + WindowTracing.trace( + "{}.removeNextEventTimer: firing {} at {}", + getClass().getSimpleName(), timer, inputWatermarkTime); + } + return timer; + } + + /** Returns the next eligible processing time timer, if none returns null. */ + @Nullable + public TimerData removeNextProcessingTimer() { + TimerData timer = removeNextTimer(processingTime, TimeDomain.PROCESSING_TIME); + if (timer != null) { + WindowTracing.trace( + "{}.removeNextProcessingTimer: firing {} at {}", + getClass().getSimpleName(), timer, processingTime); + } + return timer; + } + + /** Returns the next eligible synchronized processing time timer, if none returns null. */ + @Nullable + public TimerData removeNextSynchronizedProcessingTimer() { + TimerData timer = removeNextTimer( + synchronizedProcessingTime, TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + if (timer != null) { + WindowTracing.trace( + "{}.removeNextSynchronizedProcessingTimer: firing {} at {}", + getClass().getSimpleName(), timer, synchronizedProcessingTime); + } + return timer; + } + + @Nullable + private TimerData removeNextTimer(Instant currentTime, TimeDomain domain) { + PriorityQueue queue = queue(domain); + if (!queue.isEmpty() && currentTime.isAfter(queue.peek().getTimestamp())) { + TimerData timer = queue.remove(); + existingTimers.remove(timer); + return timer; + } else { + return null; + } + } +}