From e1f7a5b0c448516e7c42bfc4938755a43e3bcae0 Mon Sep 17 00:00:00 2001 From: Konstantin Knauf Date: Tue, 5 Apr 2016 19:59:19 +0200 Subject: [PATCH 1/2] Timer coalescing across keys and cleanup of unused trigger tasks Per timestamp only one TriggerTask is registered at the runtime context. When the first timer is registered a new TriggerTask is sheduled. When no timer is registered anymore for a specific timestamp the corresponding trigger task is canceled and hence removed. The ScheduledFutures to cancel trigger tasks are not checkpointed. So cleanup of trigger tasks will not work after a failure. --- .../kafka/testutils/MockRuntimeContext.java | 5 +- .../api/operators/AbstractStreamOperator.java | 5 +- .../operators/StreamingRuntimeContext.java | 5 +- .../operators/windowing/WindowOperator.java | 94 ++++++++++++++----- .../streaming/runtime/tasks/StreamTask.java | 46 ++++----- .../windowing/WindowOperatorTest.java | 75 +++++++++++++++ 6 files changed, 180 insertions(+), 50 deletions(-) diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java index 3e46503e6fdcf..1ac2ef53637c2 100644 --- a/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java +++ b/flink-streaming-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/MockRuntimeContext.java @@ -45,6 +45,7 @@ import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @SuppressWarnings("deprecation") @@ -187,14 +188,14 @@ public ReducingState getReducingState(ReducingStateDescriptor statePro } @Override - public void registerTimer(final long time, final Triggerable target) { + public ScheduledFuture registerTimer(final long time, final Triggerable target) { if (timer == null) { timer = Executors.newSingleThreadScheduledExecutor(); } final long delay = Math.max(time - System.currentTimeMillis(), 0); - timer.schedule(new Runnable() { + return timer.schedule(new Runnable() { @Override public void run() { synchronized (checkpointLock) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index a5d0ace936c01..9673f874e80f0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; +import java.util.concurrent.ScheduledFuture; /** * Base class for all stream operators. Operators that contain a user function should extend the class @@ -246,8 +247,8 @@ public AbstractStateBackend getStateBackend() { * @param time The absolute time in milliseconds. * @param target The target to be triggered. */ - protected void registerTimer(long time, Triggerable target) { - container.registerTimer(time, target); + protected ScheduledFuture registerTimer(long time, Triggerable target) { + return container.registerTimer(time, target); } /** diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java index bd391cd84fe32..4500ee73a1161 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamingRuntimeContext.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.Map; +import java.util.concurrent.ScheduledFuture; import static java.util.Objects.requireNonNull; @@ -88,8 +89,8 @@ public InputSplitProvider getInputSplitProvider() { * @param time The absolute time in milliseconds. * @param target The target to be triggered. */ - public void registerTimer(long time, Triggerable target) { - operator.registerTimer(time, target); + public ScheduledFuture registerTimer(long time, Triggerable target) { + return operator.registerTimer(time, target); } // ------------------------------------------------------------------------ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 919cee7465a93..35bdf5bf2ab1d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -19,6 +19,8 @@ package org.apache.flink.streaming.runtime.operators.windowing; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.HashMultiset; +import com.google.common.collect.Multiset; import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.AppendingState; @@ -39,6 +41,7 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; @@ -65,6 +68,7 @@ import java.util.Map; import java.util.PriorityQueue; import java.util.Set; +import java.util.concurrent.ScheduledFuture; import static java.util.Objects.requireNonNull; @@ -134,6 +138,8 @@ public class WindowOperator */ protected transient TimestampedCollector timestampedCollector; + protected transient Map> processingTimeTimerFutures; + /** * To keep track of the current watermark so that we can immediately fire if a trigger * registers an event time callback for a timestamp that lies in the past. @@ -149,15 +155,15 @@ public class WindowOperator /** * Processing time timers that are currently in-flight. */ - protected transient Set> processingTimeTimers; protected transient PriorityQueue> processingTimeTimersQueue; + protected transient Set> processingTimeTimers; + protected transient Multiset processingTimeTimerTimestamps; /** * Current waiting watermark callbacks. */ protected transient Set> watermarkTimers; protected transient PriorityQueue> watermarkTimersQueue; - protected transient Map> mergingWindowsByKey; /** @@ -213,9 +219,13 @@ public final void open() throws Exception { } if (processingTimeTimers == null) { processingTimeTimers = new HashSet<>(); + processingTimeTimerTimestamps = HashMultiset.create(); processingTimeTimersQueue = new PriorityQueue<>(100); } + //ScheduledFutures are not checkpointed + processingTimeTimerFutures = new HashMap<>(); + context = new Context(null, null); if (windowAssigner instanceof MergingWindowAssigner) { @@ -424,13 +434,17 @@ private void processTriggersFor(Watermark mark) throws Exception { public final void trigger(long time) throws Exception { boolean fire; + //Remove information about the triggering task + processingTimeTimerFutures.remove(time); + processingTimeTimerTimestamps.remove(time, processingTimeTimerTimestamps.count(time)); + do { Timer timer = processingTimeTimersQueue.peek(); if (timer != null && timer.timestamp <= time) { fire = true; - processingTimeTimers.remove(timer); processingTimeTimersQueue.remove(); + processingTimeTimers.remove(timer); context.key = timer.key; context.window = timer.window; @@ -525,9 +539,14 @@ public S getPartitionedState(StateDescriptor stateDescri @Override public void registerProcessingTimeTimer(long time) { Timer timer = new Timer<>(time, key, window); + // make sure we only put one timer per key into the queue if (processingTimeTimers.add(timer)) { processingTimeTimersQueue.add(timer); - getRuntimeContext().registerTimer(time, WindowOperator.this); + //If this is the first timer added for this timestamp register a TriggerTask + if (processingTimeTimerTimestamps.add(time, 1) == 0) { + ScheduledFuture scheduledFuture= getRuntimeContext().registerTimer(time, WindowOperator.this); + processingTimeTimerFutures.put(time, scheduledFuture); + } } } @@ -542,14 +561,24 @@ public void registerEventTimeTimer(long time) { // immediately schedule a trigger, so that we don't wait for the next // watermark update to fire the watermark trigger getRuntimeContext().registerTimer(System.currentTimeMillis(), WindowOperator.this); + //No need to put it in processingTimeTimerFutures as this timer is never removed } } @Override public void deleteProcessingTimeTimer(long time) { Timer timer = new Timer<>(time, key, window); - if (processingTimeTimers.remove(timer)) { - processingTimeTimersQueue.remove(timer); + + processingTimeTimers.remove(timer); + processingTimeTimersQueue.remove(timer); + + //If there are no timers left for this timestamp, remove it from queue and cancel TriggerTask + if (processingTimeTimerTimestamps.remove(time,1) == 1) { + ScheduledFuture triggerTaskFuture = processingTimeTimerFutures.get(timer); + if (triggerTaskFuture != null && !triggerTaskFuture.isDone()) { + triggerTaskFuture.cancel(false); + processingTimeTimerFutures.remove(triggerTaskFuture); + } } } @@ -592,6 +621,7 @@ public String toString() { } } + /** * Internal class for keeping track of in-flight timers. */ @@ -670,19 +700,7 @@ public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) AbstractStateBackend.CheckpointStateOutputView out = getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp); - out.writeInt(watermarkTimersQueue.size()); - for (Timer timer : watermarkTimersQueue) { - keySerializer.serialize(timer.key, out); - windowSerializer.serialize(timer.window, out); - out.writeLong(timer.timestamp); - } - - out.writeInt(processingTimeTimers.size()); - for (Timer timer : processingTimeTimersQueue) { - keySerializer.serialize(timer.key, out); - windowSerializer.serialize(timer.window, out); - out.writeLong(timer.timestamp); - } + snapshotStateTo(out); taskState.setOperatorState(out.closeAndGetHandle()); @@ -699,6 +717,10 @@ public void restoreState(StreamTaskState taskState, long recoveryTimestamp) thro StateHandle inputState = (StateHandle) taskState.getOperatorState(); DataInputView in = inputState.getState(userClassloader); + restoreStateFrom(in); + } + + private void restoreStateFrom(DataInputView in ) throws IOException { int numWatermarkTimers = in.readInt(); watermarkTimers = new HashSet<>(numWatermarkTimers); watermarkTimersQueue = new PriorityQueue<>(Math.max(numWatermarkTimers, 1)); @@ -712,15 +734,45 @@ public void restoreState(StreamTaskState taskState, long recoveryTimestamp) thro } int numProcessingTimeTimers = in.readInt(); - processingTimeTimers = new HashSet<>(numProcessingTimeTimers); processingTimeTimersQueue = new PriorityQueue<>(Math.max(numProcessingTimeTimers, 1)); + processingTimeTimers = new HashSet<>(); for (int i = 0; i < numProcessingTimeTimers; i++) { K key = keySerializer.deserialize(in); W window = windowSerializer.deserialize(in); long timestamp = in.readLong(); Timer timer = new Timer<>(timestamp, key, window); - processingTimeTimers.add(timer); processingTimeTimersQueue.add(timer); + processingTimeTimers.add(timer); + } + + int numProcessingTimeTimerTimestamp = in.readInt(); + processingTimeTimerTimestamps = HashMultiset.create(); + for (int i = 0; i< numProcessingTimeTimerTimestamp; i++) { + long timestamp = in.readLong(); + int count = in.readInt(); + processingTimeTimerTimestamps.add(timestamp, count); + } + } + + private void snapshotStateTo(DataOutputView out) throws IOException { + out.writeInt(watermarkTimersQueue.size()); + for (Timer timer : watermarkTimersQueue) { + keySerializer.serialize(timer.key, out); + windowSerializer.serialize(timer.window, out); + out.writeLong(timer.timestamp); + } + + out.writeInt(processingTimeTimers.size()); + for (Timer timer : processingTimeTimers) { + keySerializer.serialize(timer.key, out); + windowSerializer.serialize(timer.window, out); + out.writeLong(timer.timestamp); + } + + out.writeInt(processingTimeTimerTimestamps.entrySet().size()); + for (Multiset.Entry timerTimestampCounts: processingTimeTimerTimestamps.entrySet()) { + out.writeLong(timerTimestampCounts.getElement()); + out.writeInt(timerTimestampCounts.getCount()); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 7b154cd923d71..51904b384ad8b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -17,16 +17,6 @@ package org.apache.flink.streaming.runtime.tasks; -import java.io.Serializable; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.api.common.typeutils.TypeSerializer; @@ -41,23 +31,32 @@ import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot; import org.apache.flink.runtime.state.AsynchronousStateHandle; import org.apache.flink.runtime.state.KvStateSnapshot; +import org.apache.flink.runtime.state.StateBackendFactory; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.runtime.state.StateBackendFactory; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.runtime.io.RecordWriterOutput; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Serializable; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + /** * Base class for all streaming tasks. A task is the unit of local processing that is deployed * and executed by the TaskManagers. Each task runs one or more {@link StreamOperator}s which form @@ -129,7 +128,7 @@ public abstract class StreamTask> private ClassLoader userClassLoader; /** The executor service that schedules and calls the triggers of this task*/ - private ScheduledExecutorService timerService; + private ScheduledThreadPoolExecutor timerService; /** The map of user-defined accumulators of this task */ private Map> accumulatorMap; @@ -191,8 +190,9 @@ public final void invoke() throws Exception { headOperator.setup(this, configuration, operatorChain.getChainEntryPoint()); } - timerService = Executors.newSingleThreadScheduledExecutor( - new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName())); + timerService =new ScheduledThreadPoolExecutor(1, new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName())); + // allow trigger tasks to be removed if all timers for that timestamp are removed by user + timerService.setRemoveOnCancelPolicy(true); // task specific initialization init(); @@ -663,13 +663,13 @@ public AbstractStateBackend createStateBackend(String operatorIdentifier, TypeSe /** * Registers a timer. */ - public void registerTimer(final long timestamp, final Triggerable target) { + public ScheduledFuture registerTimer(final long timestamp, final Triggerable target) { long delay = Math.max(timestamp - System.currentTimeMillis(), 0); - timerService.schedule( - new TriggerTask(this, lock, target, timestamp), - delay, - TimeUnit.MILLISECONDS); + return timerService.schedule( + new TriggerTask(this, lock, target, timestamp), + delay, + TimeUnit.MILLISECONDS); } /** diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index 233131d256c21..94b0e3c983f0f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -28,6 +28,8 @@ import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.LocalStreamEnvironment; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -58,6 +60,8 @@ import org.junit.Assert; import org.junit.Test; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -842,6 +846,77 @@ public void testCountTrigger() throws Exception { testHarness.close(); } + @Test + public void testRestoreAndSnapshotAreInSync() throws Exception { + + final int WINDOW_SIZE = 3; + final int WINDOW_SLIDE = 1; + + TypeInformation> inputType = TypeInfoParser.parse("Tuple2"); + + ReducingStateDescriptor> stateDesc = new ReducingStateDescriptor<>("window-contents", + new SumReducer(), + inputType.createSerializer(new ExecutionConfig())); + + WindowOperator, Tuple2, Tuple2, TimeWindow> operator = new WindowOperator<>( + SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction>()), + EventTimeTrigger.create()); + + + OneInputStreamOperatorTestHarness, Tuple2> testHarness = + new OneInputStreamOperatorTestHarness<>(operator); + + testHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + + operator.setInputType(inputType, new ExecutionConfig()); + testHarness.open(); + + WindowOperator.Timer timer1 = new WindowOperator.Timer<>(1L, "key1", new TimeWindow(1L, 2L)); + WindowOperator.Timer timer2 = new WindowOperator.Timer<>(3L, "key1", new TimeWindow(1L, 2L)); + WindowOperator.Timer timer3 = new WindowOperator.Timer<>(2L, "key1", new TimeWindow(1L, 2L)); + operator.processingTimeTimers.add(timer1); + operator.processingTimeTimers.add(timer2); + operator.processingTimeTimers.add(timer3); + operator.processingTimeTimersQueue.add(timer1); + operator.processingTimeTimersQueue.add(timer2); + operator.processingTimeTimersQueue.add(timer3); + + operator.processingTimeTimerTimestamps.add(1L, 10); + operator.processingTimeTimerTimestamps.add(2L, 5); + operator.processingTimeTimerTimestamps.add(3L, 1); + + + StreamTaskState snapshot = testHarness.snapshot(0, 0); + + WindowOperator, Tuple2, Tuple2, TimeWindow> otherOperator = new WindowOperator<>( + SlidingEventTimeWindows.of(Time.of(WINDOW_SIZE, TimeUnit.SECONDS), Time.of(WINDOW_SLIDE, TimeUnit.SECONDS)), + new TimeWindow.Serializer(), + new TupleKeySelector(), + BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(new PassThroughWindowFunction>()), + EventTimeTrigger.create()); + + OneInputStreamOperatorTestHarness, Tuple2> otherTestHarness = + new OneInputStreamOperatorTestHarness<>(otherOperator); + + otherTestHarness.configureForKeyedStream(new TupleKeySelector(), BasicTypeInfo.STRING_TYPE_INFO); + otherOperator.setInputType(inputType, new ExecutionConfig()); + + otherTestHarness.setup(); + otherTestHarness.restore(snapshot, 0); + otherTestHarness.open(); + + Assert.assertEquals(operator.processingTimeTimers, otherOperator.processingTimeTimers); + Assert.assertArrayEquals(operator.processingTimeTimersQueue.toArray(), otherOperator.processingTimeTimersQueue.toArray()); + Assert.assertEquals(operator.processingTimeTimerTimestamps, otherOperator.processingTimeTimerTimestamps); + } + // ------------------------------------------------------------------------ // UDFs // ------------------------------------------------------------------------ From 9d8683e78efc142a293dbb9ee698be15b2bf3034 Mon Sep 17 00:00:00 2001 From: Konstantin Knauf Date: Fri, 29 Apr 2016 19:09:08 +0200 Subject: [PATCH 2/2] Fixed deleteProcessingTimeTimer --- .../streaming/runtime/operators/windowing/WindowOperator.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 35bdf5bf2ab1d..a9b211fa3a08f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -574,10 +574,9 @@ public void deleteProcessingTimeTimer(long time) { //If there are no timers left for this timestamp, remove it from queue and cancel TriggerTask if (processingTimeTimerTimestamps.remove(time,1) == 1) { - ScheduledFuture triggerTaskFuture = processingTimeTimerFutures.get(timer); + ScheduledFuture triggerTaskFuture = processingTimeTimerFutures.remove(timer.timestamp); if (triggerTaskFuture != null && !triggerTaskFuture.isDone()) { triggerTaskFuture.cancel(false); - processingTimeTimerFutures.remove(triggerTaskFuture); } } }