From 3436339fe0aa409905dc34f43bdcc4ffc5d623cb Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Tue, 8 May 2018 16:00:55 +0200 Subject: [PATCH 1/2] [FLINK-9423][state] Implement efficient deletes for heap-based timer service. --- .../operators/HeapInternalTimerService.java | 193 ++------- .../operators/InternalTimeServiceManager.java | 6 +- .../api/operators/InternalTimer.java | 182 +------- .../api/operators/InternalTimerHeap.java | 405 ++++++++++++++++++ ...nternalTimerServiceSerializationProxy.java | 6 +- .../InternalTimersSnapshotReaderWriters.java | 9 +- .../api/operators/TimerHeapInternalTimer.java | 254 +++++++++++ .../HeapInternalTimerServiceTest.java | 84 ++-- .../api/operators/InternalTimerHeapTest.java | 301 +++++++++++++ 9 files changed, 1067 insertions(+), 373 deletions(-) create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java create mode 100644 flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java create mode 100644 flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerHeapTest.java diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java index fb380758b59eb..c5a68fb912a4a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/HeapInternalTimerService.java @@ -22,15 +22,12 @@ import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.runtime.state.KeyGroupRangeAssignment; -import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.util.Preconditions; -import java.io.IOException; -import java.util.HashSet; -import java.util.PriorityQueue; +import java.util.List; import java.util.Set; import java.util.concurrent.ScheduledFuture; @@ -49,20 +46,18 @@ public class HeapInternalTimerService implements InternalTimerService, /** * Processing time timers that are currently in-flight. */ - private final Set>[] processingTimeTimersByKeyGroup; - private final PriorityQueue> processingTimeTimersQueue; + private final InternalTimerHeap processingTimeTimersQueue; /** * Event time timers that are currently in-flight. */ - private final Set>[] eventTimeTimersByKeyGroup; - private final PriorityQueue> eventTimeTimersQueue; + private final InternalTimerHeap eventTimeTimersQueue; /** * Information concerning the local key-group range. */ - private final KeyGroupsList localKeyGroupRange; - private final int totalKeyGroups; + private final KeyGroupRange localKeyGroupRange; + private final int localKeyGroupRangeStartIdx; /** @@ -94,16 +89,14 @@ public class HeapInternalTimerService implements InternalTimerService, /** The restored timers snapshot, if any. */ private InternalTimersSnapshot restoredTimersSnapshot; - public HeapInternalTimerService( + HeapInternalTimerService( int totalKeyGroups, - KeyGroupsList localKeyGroupRange, + KeyGroupRange localKeyGroupRange, KeyContext keyContext, ProcessingTimeService processingTimeService) { this.keyContext = checkNotNull(keyContext); this.processingTimeService = checkNotNull(processingTimeService); - - this.totalKeyGroups = totalKeyGroups; this.localKeyGroupRange = checkNotNull(localKeyGroupRange); // find the starting index of the local key-group range @@ -113,14 +106,8 @@ public HeapInternalTimerService( } this.localKeyGroupRangeStartIdx = startIdx; - // the list of ids of the key-groups this task is responsible for - int localKeyGroups = this.localKeyGroupRange.getNumberOfKeyGroups(); - - this.eventTimeTimersQueue = new PriorityQueue<>(100); - this.eventTimeTimersByKeyGroup = new HashSet[localKeyGroups]; - - this.processingTimeTimersQueue = new PriorityQueue<>(100); - this.processingTimeTimersByKeyGroup = new HashSet[localKeyGroups]; + this.eventTimeTimersQueue = new InternalTimerHeap<>(128, localKeyGroupRange, totalKeyGroups); + this.processingTimeTimersQueue = new InternalTimerHeap<>(128, localKeyGroupRange, totalKeyGroups); } /** @@ -175,8 +162,9 @@ public void startTimerService( this.triggerTarget = Preconditions.checkNotNull(triggerTarget); // re-register the restored timers (if any) - if (processingTimeTimersQueue.size() > 0) { - nextTimer = processingTimeService.registerTimer(processingTimeTimersQueue.peek().getTimestamp(), this); + final InternalTimer headTimer = processingTimeTimersQueue.peek(); + if (headTimer != null) { + nextTimer = processingTimeService.registerTimer(headTimer.getTimestamp(), this); } this.isInitialized = true; } else { @@ -199,17 +187,9 @@ public long currentWatermark() { @Override public void registerProcessingTimeTimer(N namespace, long time) { - InternalTimer timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace); - - // make sure we only put one timer per key into the queue - Set> timerSet = getProcessingTimeTimerSetForTimer(timer); - if (timerSet.add(timer)) { - - InternalTimer oldHead = processingTimeTimersQueue.peek(); + InternalTimer oldHead = processingTimeTimersQueue.peek(); + if (processingTimeTimersQueue.scheduleTimer(time, (K) keyContext.getCurrentKey(), namespace)) { long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE; - - processingTimeTimersQueue.add(timer); - // check if we need to re-schedule our timer to earlier if (time < nextTriggerTime) { if (nextTimer != null) { @@ -222,29 +202,17 @@ public void registerProcessingTimeTimer(N namespace, long time) { @Override public void registerEventTimeTimer(N namespace, long time) { - InternalTimer timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace); - Set> timerSet = getEventTimeTimerSetForTimer(timer); - if (timerSet.add(timer)) { - eventTimeTimersQueue.add(timer); - } + eventTimeTimersQueue.scheduleTimer(time, (K) keyContext.getCurrentKey(), namespace); } @Override public void deleteProcessingTimeTimer(N namespace, long time) { - InternalTimer timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace); - Set> timerSet = getProcessingTimeTimerSetForTimer(timer); - if (timerSet.remove(timer)) { - processingTimeTimersQueue.remove(timer); - } + processingTimeTimersQueue.stopTimer(time, (K) keyContext.getCurrentKey(), namespace); } @Override public void deleteEventTimeTimer(N namespace, long time) { - InternalTimer timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace); - Set> timerSet = getEventTimeTimerSetForTimer(timer); - if (timerSet.remove(timer)) { - eventTimeTimersQueue.remove(timer); - } + eventTimeTimersQueue.stopTimer(time, (K) keyContext.getCurrentKey(), namespace); } @Override @@ -256,12 +224,7 @@ public void onProcessingTime(long time) throws Exception { InternalTimer timer; while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) { - - Set> timerSet = getProcessingTimeTimerSetForTimer(timer); - - timerSet.remove(timer); - processingTimeTimersQueue.remove(); - + processingTimeTimersQueue.poll(); keyContext.setCurrentKey(timer.getKey()); triggerTarget.onProcessingTime(timer); } @@ -279,11 +242,7 @@ public void advanceWatermark(long time) throws Exception { InternalTimer timer; while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) { - - Set> timerSet = getEventTimeTimerSetForTimer(timer); - timerSet.remove(timer); - eventTimeTimersQueue.remove(); - + eventTimeTimersQueue.poll(); keyContext.setCurrentKey(timer.getKey()); triggerTarget.onEventTime(timer); } @@ -301,114 +260,37 @@ public InternalTimersSnapshot snapshotTimersForKeyGroup(int keyGroupIdx) { keySerializer.snapshotConfiguration(), namespaceSerializer, namespaceSerializer.snapshotConfiguration(), - getEventTimeTimerSetForKeyGroup(keyGroupIdx), - getProcessingTimeTimerSetForKeyGroup(keyGroupIdx)); + eventTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx), + processingTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx)); } /** * Restore the timers (both processing and event time ones) for a given {@code keyGroupIdx}. * - * @param restoredTimersSnapshot the restored snapshot containing the key-group's timers, + * @param restoredSnapshot the restored snapshot containing the key-group's timers, * and the serializers that were used to write them * @param keyGroupIdx the id of the key-group to be put in the snapshot. */ @SuppressWarnings("unchecked") - public void restoreTimersForKeyGroup(InternalTimersSnapshot restoredTimersSnapshot, int keyGroupIdx) throws IOException { - this.restoredTimersSnapshot = (InternalTimersSnapshot) restoredTimersSnapshot; - - if ((this.keyDeserializer != null && !this.keyDeserializer.equals(restoredTimersSnapshot.getKeySerializer())) || - (this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredTimersSnapshot.getNamespaceSerializer()))) { + public void restoreTimersForKeyGroup(InternalTimersSnapshot restoredSnapshot, int keyGroupIdx) { + this.restoredTimersSnapshot = (InternalTimersSnapshot) restoredSnapshot; + if (areSnapshotSerializersIncompatible(restoredSnapshot)) { throw new IllegalArgumentException("Tried to restore timers " + "for the same service with different serializers."); } - this.keyDeserializer = this.restoredTimersSnapshot.getKeySerializer(); - this.namespaceDeserializer = this.restoredTimersSnapshot.getNamespaceSerializer(); + this.keyDeserializer = restoredTimersSnapshot.getKeySerializer(); + this.namespaceDeserializer = restoredTimersSnapshot.getNamespaceSerializer(); checkArgument(localKeyGroupRange.contains(keyGroupIdx), "Key Group " + keyGroupIdx + " does not belong to the local range."); // restore the event time timers - Set> eventTimers = getEventTimeTimerSetForKeyGroup(keyGroupIdx); - eventTimers.addAll(this.restoredTimersSnapshot.getEventTimeTimers()); - eventTimeTimersQueue.addAll(this.restoredTimersSnapshot.getEventTimeTimers()); + eventTimeTimersQueue.bulkAddRestoredTimers(restoredTimersSnapshot.getEventTimeTimers()); // restore the processing time timers - Set> processingTimers = getProcessingTimeTimerSetForKeyGroup(keyGroupIdx); - processingTimers.addAll(this.restoredTimersSnapshot.getProcessingTimeTimers()); - processingTimeTimersQueue.addAll(this.restoredTimersSnapshot.getProcessingTimeTimers()); - } - - /** - * Retrieve the set of event time timers for the key-group this timer belongs to. - * - * @param timer the timer whose key-group we are searching. - * @return the set of registered timers for the key-group. - */ - private Set> getEventTimeTimerSetForTimer(InternalTimer timer) { - checkArgument(localKeyGroupRange != null, "The operator has not been initialized."); - int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(timer.getKey(), this.totalKeyGroups); - return getEventTimeTimerSetForKeyGroup(keyGroupIdx); - } - - /** - * Retrieve the set of event time timers for the requested key-group. - * - * @param keyGroupIdx the index of the key group we are interested in. - * @return the set of registered timers for the key-group. - */ - private Set> getEventTimeTimerSetForKeyGroup(int keyGroupIdx) { - int localIdx = getIndexForKeyGroup(keyGroupIdx); - Set> timers = eventTimeTimersByKeyGroup[localIdx]; - if (timers == null) { - timers = new HashSet<>(); - eventTimeTimersByKeyGroup[localIdx] = timers; - } - return timers; - } - - /** - * Retrieve the set of processing time timers for the key-group this timer belongs to. - * - * @param timer the timer whose key-group we are searching. - * @return the set of registered timers for the key-group. - */ - private Set> getProcessingTimeTimerSetForTimer(InternalTimer timer) { - checkArgument(localKeyGroupRange != null, "The operator has not been initialized."); - int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(timer.getKey(), this.totalKeyGroups); - return getProcessingTimeTimerSetForKeyGroup(keyGroupIdx); - } - - /** - * Retrieve the set of processing time timers for the requested key-group. - * - * @param keyGroupIdx the index of the key group we are interested in. - * @return the set of registered timers for the key-group. - */ - private Set> getProcessingTimeTimerSetForKeyGroup(int keyGroupIdx) { - int localIdx = getIndexForKeyGroup(keyGroupIdx); - Set> timers = processingTimeTimersByKeyGroup[localIdx]; - if (timers == null) { - timers = new HashSet<>(); - processingTimeTimersByKeyGroup[localIdx] = timers; - } - return timers; - } - - /** - * Computes the index of the requested key-group in the local datastructures. - *
  • - * Currently we assume that each task is assigned a continuous range of key-groups, - * e.g. 1,2,3,4, and not 1,3,5. We leverage this to keep the different states by - * key-grouped in arrays instead of maps, where the offset for each key-group is - * the key-group id (an int) minus the id of the first key-group in the local range. - * This is for performance reasons. - */ - private int getIndexForKeyGroup(int keyGroupIdx) { - checkArgument(localKeyGroupRange.contains(keyGroupIdx), - "Key Group " + keyGroupIdx + " does not belong to the local range."); - return keyGroupIdx - this.localKeyGroupRangeStartIdx; + processingTimeTimersQueue.bulkAddRestoredTimers(restoredTimersSnapshot.getProcessingTimeTimers()); } public int numProcessingTimeTimers() { @@ -440,17 +322,22 @@ public int numEventTimeTimers(N namespace) { } @VisibleForTesting - public int getLocalKeyGroupRangeStartIdx() { + int getLocalKeyGroupRangeStartIdx() { return this.localKeyGroupRangeStartIdx; } @VisibleForTesting - public Set>[] getEventTimeTimersPerKeyGroup() { - return this.eventTimeTimersByKeyGroup; + List>> getEventTimeTimersPerKeyGroup() { + return eventTimeTimersQueue.getTimersByKeyGroup(); } @VisibleForTesting - public Set>[] getProcessingTimeTimersPerKeyGroup() { - return this.processingTimeTimersByKeyGroup; + List>> getProcessingTimeTimersPerKeyGroup() { + return processingTimeTimersQueue.getTimersByKeyGroup(); + } + + private boolean areSnapshotSerializersIncompatible(InternalTimersSnapshot restoredSnapshot) { + return (this.keyDeserializer != null && !this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) || + (this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredSnapshot.getNamespaceSerializer())); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java index 84c7bf1455886..0f193ecded01e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java @@ -22,7 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; @@ -47,7 +47,7 @@ public class InternalTimeServiceManager { private final int totalKeyGroups; - private final KeyGroupsList localKeyGroupRange; + private final KeyGroupRange localKeyGroupRange; private final KeyContext keyContext; private final ProcessingTimeService processingTimeService; @@ -56,7 +56,7 @@ public class InternalTimeServiceManager { InternalTimeServiceManager( int totalKeyGroups, - KeyGroupsList localKeyGroupRange, + KeyGroupRange localKeyGroupRange, KeyContext keyContext, ProcessingTimeService processingTimeService) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java index 4b8657431da0e..5ba1a0facb084 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimer.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -19,184 +19,28 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -import java.io.IOException; /** - * Internal class for keeping track of in-flight timers. + * Internal interface for in-flight timers. * * @param Type of the keys to which timers are scoped. * @param Type of the namespace to which timers are scoped. */ @Internal -public class InternalTimer implements Comparable> { - private final long timestamp; - private final K key; - private final N namespace; - - public InternalTimer(long timestamp, K key, N namespace) { - this.timestamp = timestamp; - this.key = key; - this.namespace = namespace; - } - - public long getTimestamp() { - return timestamp; - } - - public K getKey() { - return key; - } - - public N getNamespace() { - return namespace; - } - - @Override - public int compareTo(InternalTimer o) { - return Long.compare(this.timestamp, o.timestamp); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()){ - return false; - } - - InternalTimer timer = (InternalTimer) o; - - return timestamp == timer.timestamp - && key.equals(timer.key) - && namespace.equals(timer.namespace); - - } - - @Override - public int hashCode() { - int result = (int) (timestamp ^ (timestamp >>> 32)); - result = 31 * result + key.hashCode(); - result = 31 * result + namespace.hashCode(); - return result; - } - - @Override - public String toString() { - return "Timer{" + - "timestamp=" + timestamp + - ", key=" + key + - ", namespace=" + namespace + - '}'; - } +public interface InternalTimer { /** - * A {@link TypeSerializer} used to serialize/deserialize a {@link InternalTimer}. + * Returns the timestamp of the timer. This value determines the point in time when the timer will fire. */ - public static class TimerSerializer extends TypeSerializer> { - - private static final long serialVersionUID = 1119562170939152304L; - - private final TypeSerializer keySerializer; - - private final TypeSerializer namespaceSerializer; - - TimerSerializer(TypeSerializer keySerializer, TypeSerializer namespaceSerializer) { - this.keySerializer = keySerializer; - this.namespaceSerializer = namespaceSerializer; - } - - @Override - public boolean isImmutableType() { - return false; - } + long getTimestamp(); - @Override - public TypeSerializer> duplicate() { - return this; - } - - @Override - public InternalTimer createInstance() { - return null; - } - - @Override - public InternalTimer copy(InternalTimer from) { - return new InternalTimer<>(from.timestamp, from.key, from.namespace); - } - - @Override - public InternalTimer copy(InternalTimer from, InternalTimer reuse) { - return copy(from); - } - - @Override - public int getLength() { - // we do not have fixed length - return -1; - } - - @Override - public void serialize(InternalTimer record, DataOutputView target) throws IOException { - keySerializer.serialize(record.key, target); - namespaceSerializer.serialize(record.namespace, target); - LongSerializer.INSTANCE.serialize(record.timestamp, target); - } - - @Override - public InternalTimer deserialize(DataInputView source) throws IOException { - K key = keySerializer.deserialize(source); - N namespace = namespaceSerializer.deserialize(source); - Long timestamp = LongSerializer.INSTANCE.deserialize(source); - return new InternalTimer<>(timestamp, key, namespace); - } - - @Override - public InternalTimer deserialize(InternalTimer reuse, DataInputView source) throws IOException { - return deserialize(source); - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - keySerializer.copy(source, target); - namespaceSerializer.copy(source, target); - LongSerializer.INSTANCE.copy(source, target); - } - - @Override - public boolean equals(Object obj) { - return obj == this || - (obj != null && obj.getClass() == getClass() && - keySerializer.equals(((TimerSerializer) obj).keySerializer) && - namespaceSerializer.equals(((TimerSerializer) obj).namespaceSerializer)); - } - - @Override - public boolean canEqual(Object obj) { - return true; - } - - @Override - public int hashCode() { - return getClass().hashCode(); - } - - @Override - public TypeSerializerConfigSnapshot snapshotConfiguration() { - throw new UnsupportedOperationException("This serializer is not registered for managed state."); - } + /** + * Returns the key that is bound to this timer. + */ + K getKey(); - @Override - public CompatibilityResult> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - throw new UnsupportedOperationException("This serializer is not registered for managed state."); - } - } + /** + * Returns the namespace that is bound to this timer. + */ + N getNamespace(); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java new file mode 100644 index 0000000000000..94b3572e0e999 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerHeap.java @@ -0,0 +1,405 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; + +import javax.annotation.Nonnegative; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkArgument; + +/** + * A heap-based priority queue for internal timers. This heap is supported by hash sets for fast contains + * (de-duplication) and deletes. The heap implementation is a simple binary tree stored inside an array. Element indexes + * in the heap array start at 1 instead of 0 to make array index computations a bit simpler in the hot methods. + * + *

    Possible future improvements: + *

      + *
    • We could also implement shrinking for the heap and the deduplication maps.
    • + *
    • We could replace the deduplication maps with more efficient custom implementations. In particular, a hash set + * would be enough if it could return existing elements on unsuccessful adding, etc..
    • + *
    + * + * @param type of the key of the internal timers managed by this priority queue. + * @param type of the namespace of the internal timers managed by this priority queue. + */ +public class InternalTimerHeap implements Iterable> {//implements Queue>, Set> { + + /** + * A safe maximum size for arrays in the JVM. + */ + private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8; + + /** + * Comparator for {@link TimerHeapInternalTimer}, based on the timestamp in ascending order. + */ + private static final Comparator> COMPARATOR = + (o1, o2) -> Long.compare(o1.getTimestamp(), o2.getTimestamp()); + + /** + * This array contains one hash set per key-group. The sets are used for fast de-duplication and deletes of timers. + */ + private final HashMap, TimerHeapInternalTimer>[] deduplicationMapsByKeyGroup; + + /** + * The array that represents the heap-organized priority queue. + */ + private TimerHeapInternalTimer[] queue; + + /** + * The current size of the priority queue. + */ + private int size; + + /** + * The key-group range of timers that are managed by this queue. + */ + private final KeyGroupRange keyGroupRange; + + /** + * The total number of key-groups of the job. + */ + private final int totalNumberOfKeyGroups; + + + /** + * Creates an empty {@link InternalTimerHeap} with the requested initial capacity. + * + * @param minimumCapacity the minimum and initial capacity of this priority queue. + */ + @SuppressWarnings("unchecked") + InternalTimerHeap( + @Nonnegative int minimumCapacity, + @Nonnull KeyGroupRange keyGroupRange, + @Nonnegative int totalNumberOfKeyGroups) { + + this.totalNumberOfKeyGroups = totalNumberOfKeyGroups; + this.keyGroupRange = keyGroupRange; + + final int keyGroupsInLocalRange = keyGroupRange.getNumberOfKeyGroups(); + final int deduplicationSetSize = 1 + minimumCapacity / keyGroupsInLocalRange; + this.deduplicationMapsByKeyGroup = new HashMap[keyGroupsInLocalRange]; + for (int i = 0; i < keyGroupsInLocalRange; ++i) { + deduplicationMapsByKeyGroup[i] = new HashMap<>(deduplicationSetSize); + } + + this.queue = new TimerHeapInternalTimer[1 + minimumCapacity]; + } + + @Nullable + public InternalTimer poll() { + return size() > 0 ? removeElementAtIndex(1) : null; + } + + @Nullable + public InternalTimer peek() { + return size() > 0 ? queue[1] : null; + } + + /** + * Adds a new timer with the given timestamp, key, and namespace to the heap, if an identical timer was not yet + * registered. + * + * @param timestamp the timer timestamp. + * @param key the timer key. + * @param namespace the timer namespace. + * @return true iff a new timer with given timestamp, key, and namespace was added to the heap. + */ + public boolean scheduleTimer(long timestamp, K key, N namespace) { + return addInternal(new TimerHeapInternalTimer<>(timestamp, key, namespace)); + } + + /** + * Stops timer with the given timestamp, key, and namespace by removing it from the heap, if it exists on the heap. + * + * @param timestamp the timer timestamp. + * @param key the timer key. + * @param namespace the timer namespace. + * @return true iff a timer with given timestamp, key, and namespace was found and removed from the heap. + */ + public boolean stopTimer(long timestamp, K key, N namespace) { + return removeInternal(new TimerHeapInternalTimer<>(timestamp, key, namespace)); + } + + public boolean isEmpty() { + return size() == 0; + } + + @Nonnegative + public int size() { + return size; + } + + public void clear() { + Arrays.fill(queue, null); + for (HashMap, TimerHeapInternalTimer> timerHashMap : + deduplicationMapsByKeyGroup) { + timerHashMap.clear(); + } + size = 0; + } + + @SuppressWarnings({"unchecked"}) + @Nonnull + public InternalTimer[] toArray() { + return (InternalTimer[]) Arrays.copyOfRange(queue, 1, size + 1, queue.getClass()); + } + + /** + * Returns an iterator over the elements in this queue. The iterator + * does not return the elements in any particular order. + * + * @return an iterator over the elements in this queue. + */ + @Nonnull + public Iterator> iterator() { + return new InternalTimerPriorityQueueIterator(); + } + + /** + * This method adds all the given timers to the heap. + */ + void bulkAddRestoredTimers(Collection> restoredTimers) { + + if (restoredTimers == null) { + return; + } + + resizeForBulkLoad(restoredTimers.size()); + + for (InternalTimer timer : restoredTimers) { + if (timer instanceof TimerHeapInternalTimer) { + addInternal((TimerHeapInternalTimer) timer); + } else { + scheduleTimer(timer.getTimestamp(), timer.getKey(), timer.getNamespace()); + } + } + } + + /** + * Returns an unmodifiable set of all timers in the given key-group. + */ + Set> getTimersForKeyGroup(@Nonnegative int keyGroupIdx) { + return Collections.unmodifiableSet(getDedupMapForKeyGroup(keyGroupIdx).keySet()); + } + + @VisibleForTesting + @SuppressWarnings("unchecked") + List>> getTimersByKeyGroup() { + List>> result = new ArrayList<>(deduplicationMapsByKeyGroup.length); + for (int i = 0; i < deduplicationMapsByKeyGroup.length; ++i) { + result.add(i, Collections.unmodifiableSet(deduplicationMapsByKeyGroup[i].keySet())); + } + return result; + } + + private boolean addInternal(TimerHeapInternalTimer timer) { + + if (getDedupMapForTimer(timer).putIfAbsent(timer, timer) == null) { + final int newSize = increaseSizeByOne(); + moveElementToIdx(timer, newSize); + siftUp(newSize); + return true; + } else { + return false; + } + } + + private boolean removeInternal(TimerHeapInternalTimer timerToRemove) { + + TimerHeapInternalTimer storedTimer = getDedupMapForTimer(timerToRemove).remove(timerToRemove); + + if (storedTimer != null) { + removeElementAtIndex(storedTimer.getTimerHeapIndex()); + return true; + } + + return false; + } + + private TimerHeapInternalTimer removeElementAtIndex(int removeIdx) { + TimerHeapInternalTimer[] heap = this.queue; + TimerHeapInternalTimer removedValue = heap[removeIdx]; + + assert removedValue.getTimerHeapIndex() == removeIdx; + + final int oldSize = size; + + if (removeIdx != oldSize) { + TimerHeapInternalTimer timer = heap[oldSize]; + moveElementToIdx(timer, removeIdx); + siftDown(removeIdx); + if (heap[removeIdx] == timer) { + siftUp(removeIdx); + } + } + + heap[oldSize] = null; + getDedupMapForTimer(removedValue).remove(removedValue); + + --size; + return removedValue; + } + + private void siftUp(int idx) { + final TimerHeapInternalTimer[] heap = this.queue; + final TimerHeapInternalTimer currentTimer = heap[idx]; + int parentIdx = idx >>> 1; + + while (parentIdx > 0 && isTimerLessThen(currentTimer, heap[parentIdx])) { + moveElementToIdx(heap[parentIdx], idx); + idx = parentIdx; + parentIdx >>>= 1; + } + + moveElementToIdx(currentTimer, idx); + } + + private void siftDown(int idx) { + final TimerHeapInternalTimer[] heap = this.queue; + final int heapSize = this.size; + + final TimerHeapInternalTimer currentTimer = heap[idx]; + int firstChildIdx = idx << 1; + int secondChildIdx = firstChildIdx + 1; + + if (isTimerIndexValid(secondChildIdx, heapSize) && + isTimerLessThen(heap[secondChildIdx], heap[firstChildIdx])) { + firstChildIdx = secondChildIdx; + } + + while (isTimerIndexValid(firstChildIdx, heapSize) && + isTimerLessThen(heap[firstChildIdx], currentTimer)) { + moveElementToIdx(heap[firstChildIdx], idx); + idx = firstChildIdx; + firstChildIdx = idx << 1; + secondChildIdx = firstChildIdx + 1; + + if (isTimerIndexValid(secondChildIdx, heapSize) && + isTimerLessThen(heap[secondChildIdx], heap[firstChildIdx])) { + firstChildIdx = secondChildIdx; + } + } + + moveElementToIdx(currentTimer, idx); + } + + private HashMap, TimerHeapInternalTimer> getDedupMapForKeyGroup( + @Nonnegative int keyGroupIdx) { + return deduplicationMapsByKeyGroup[globalKeyGroupToLocalIndex(keyGroupIdx)]; + } + + private boolean isTimerIndexValid(int timerIndex, int heapSize) { + return timerIndex <= heapSize; + } + + private boolean isTimerLessThen(TimerHeapInternalTimer a, TimerHeapInternalTimer b) { + return COMPARATOR.compare(a, b) < 0; + } + + private void moveElementToIdx(TimerHeapInternalTimer element, int idx) { + queue[idx] = element; + element.setTimerHeapIndex(idx); + } + + private HashMap, TimerHeapInternalTimer> getDedupMapForTimer( + InternalTimer timer) { + int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(timer.getKey(), totalNumberOfKeyGroups); + return getDedupMapForKeyGroup(keyGroup); + } + + private int globalKeyGroupToLocalIndex(int keyGroup) { + checkArgument(keyGroupRange.contains(keyGroup)); + return keyGroup - keyGroupRange.getStartKeyGroup(); + } + + private int increaseSizeByOne() { + final int oldArraySize = queue.length; + final int minRequiredNewSize = ++size; + if (minRequiredNewSize >= oldArraySize) { + final int grow = (oldArraySize < 64) ? oldArraySize + 2 : oldArraySize >> 1; + resizeQueueArray(oldArraySize + grow, minRequiredNewSize); + } + // TODO implement shrinking as well? + return minRequiredNewSize; + } + + private void resizeForBulkLoad(int totalSize) { + if (totalSize > queue.length) { + int desiredSize = totalSize + (totalSize >>> 3); + resizeQueueArray(desiredSize, totalSize); + } + } + + private void resizeQueueArray(int desiredSize, int minRequiredSize) { + if (isValidArraySize(desiredSize)) { + queue = Arrays.copyOf(queue, desiredSize); + } else if (isValidArraySize(minRequiredSize)) { + queue = Arrays.copyOf(queue, MAX_ARRAY_SIZE); + } else { + throw new OutOfMemoryError("Required minimum timer heap size " + minRequiredSize + + " exceeds maximum size of " + MAX_ARRAY_SIZE + "."); + } + } + + private static boolean isValidArraySize(int size) { + return size >= 0 && size <= MAX_ARRAY_SIZE; + } + + /** + * {@link Iterator} implementation for {@link InternalTimerPriorityQueueIterator}. + * {@link Iterator#remove()} is not supported. + */ + private class InternalTimerPriorityQueueIterator implements Iterator> { + + private int iterationIdx; + + InternalTimerPriorityQueueIterator() { + this.iterationIdx = 0; + } + + @Override + public boolean hasNext() { + return iterationIdx < size; + } + + @Override + public InternalTimer next() { + if (iterationIdx >= size) { + throw new NoSuchElementException("Iterator has no next element."); + } + return queue[++iterationIdx]; + } + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceSerializationProxy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceSerializationProxy.java index 53d15079a1014..78252a04e31de 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceSerializationProxy.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceSerializationProxy.java @@ -22,7 +22,7 @@ import org.apache.flink.core.io.PostVersionedIOReadableWritable; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.state.KeyGroupsList; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import java.io.IOException; @@ -47,7 +47,7 @@ public class InternalTimerServiceSerializationProxy extends PostVersionedI /** Properties of restored timer services. */ private int keyGroupIdx; private int totalKeyGroups; - private KeyGroupsList localKeyGroupRange; + private KeyGroupRange localKeyGroupRange; private KeyContext keyContext; private ProcessingTimeService processingTimeService; @@ -58,7 +58,7 @@ public InternalTimerServiceSerializationProxy( Map> timerServicesMapToPopulate, ClassLoader userCodeClassLoader, int totalKeyGroups, - KeyGroupsList localKeyGroupRange, + KeyGroupRange localKeyGroupRange, KeyContext keyContext, ProcessingTimeService processingTimeService, int keyGroupIdx) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java index 3173625d10fa8..05b77a7143cb0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimersSnapshotReaderWriters.java @@ -96,7 +96,7 @@ public AbstractInternalTimersSnapshotWriter(InternalTimersSnapshot timersS public final void writeTimersSnapshot(DataOutputView out) throws IOException { writeKeyAndNamespaceSerializers(out); - InternalTimer.TimerSerializer timerSerializer = new InternalTimer.TimerSerializer<>( + TimerHeapInternalTimer.TimerSerializer timerSerializer = new TimerHeapInternalTimer.TimerSerializer<>( timersSnapshot.getKeySerializer(), timersSnapshot.getNamespaceSerializer()); @@ -215,9 +215,10 @@ public final InternalTimersSnapshot readTimersSnapshot(DataInputView in) t restoreKeyAndNamespaceSerializers(restoredTimersSnapshot, in); - InternalTimer.TimerSerializer timerSerializer = new InternalTimer.TimerSerializer<>( - restoredTimersSnapshot.getKeySerializer(), - restoredTimersSnapshot.getNamespaceSerializer()); + TimerHeapInternalTimer.TimerSerializer timerSerializer = + new TimerHeapInternalTimer.TimerSerializer<>( + restoredTimersSnapshot.getKeySerializer(), + restoredTimersSnapshot.getNamespaceSerializer()); // read the event time timers int sizeOfEventTimeTimers = in.readInt(); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java new file mode 100644 index 0000000000000..906b0908abbf7 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/TimerHeapInternalTimer.java @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.typeutils.CompatibilityResult; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; +import org.apache.flink.api.common.typeutils.base.LongSerializer; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; + +import javax.annotation.Nonnull; + +import java.io.IOException; + +/** + * Implementation of {@link InternalTimer} for the {@link InternalTimerHeap}. + * + * @param Type of the keys to which timers are scoped. + * @param Type of the namespace to which timers are scoped. + */ +@Internal +public final class TimerHeapInternalTimer implements InternalTimer { + + /** The index that indicates that a tracked internal timer is not tracked. */ + private static final int NOT_MANAGED_BY_TIMER_QUEUE_INDEX = Integer.MIN_VALUE; + + @Nonnull + private final K key; + + @Nonnull + private final N namespace; + + private final long timestamp; + + /** + * This field holds the current physical index of this timer when it is managed by a timer heap so that we can + * support fast deletes. + */ + private transient int timerHeapIndex; + + TimerHeapInternalTimer(long timestamp, @Nonnull K key, @Nonnull N namespace) { + this.timestamp = timestamp; + this.key = key; + this.namespace = namespace; + this.timerHeapIndex = NOT_MANAGED_BY_TIMER_QUEUE_INDEX; + } + + @Override + public long getTimestamp() { + return timestamp; + } + + @Nonnull + @Override + public K getKey() { + return key; + } + + @Nonnull + @Override + public N getNamespace() { + return namespace; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o instanceof InternalTimer) { + InternalTimer timer = (InternalTimer) o; + return timestamp == timer.getTimestamp() + && key.equals(timer.getKey()) + && namespace.equals(timer.getNamespace()); + } + + return false; + } + + /** + * Returns the current index of this timer in the owning timer heap. + */ + int getTimerHeapIndex() { + return timerHeapIndex; + } + + /** + * Sets the current index of this timer in the owning timer heap and should only be called by the managing heap. + * @param timerHeapIndex the new index in the timer heap. + */ + void setTimerHeapIndex(int timerHeapIndex) { + this.timerHeapIndex = timerHeapIndex; + } + + /** + * This method can be called to indicate that the timer is no longer managed be a timer heap, e.g. because it as + * removed. + */ + void removedFromTimerQueue() { + setTimerHeapIndex(NOT_MANAGED_BY_TIMER_QUEUE_INDEX); + } + + @Override + public int hashCode() { + int result = (int) (timestamp ^ (timestamp >>> 32)); + result = 31 * result + key.hashCode(); + result = 31 * result + namespace.hashCode(); + return result; + } + + @Override + public String toString() { + return "Timer{" + + "timestamp=" + timestamp + + ", key=" + key + + ", namespace=" + namespace + + '}'; + } + + /** + * A {@link TypeSerializer} used to serialize/deserialize a {@link TimerHeapInternalTimer}. + */ + public static class TimerSerializer extends TypeSerializer> { + + private static final long serialVersionUID = 1119562170939152304L; + + @Nonnull + private final TypeSerializer keySerializer; + + @Nonnull + private final TypeSerializer namespaceSerializer; + + TimerSerializer(@Nonnull TypeSerializer keySerializer, @Nonnull TypeSerializer namespaceSerializer) { + this.keySerializer = keySerializer; + this.namespaceSerializer = namespaceSerializer; + } + + @Override + public boolean isImmutableType() { + return false; + } + + @Override + public TypeSerializer> duplicate() { + + final TypeSerializer keySerializerDuplicate = keySerializer.duplicate(); + final TypeSerializer namespaceSerializerDuplicate = namespaceSerializer.duplicate(); + + if (keySerializerDuplicate == keySerializer && + namespaceSerializerDuplicate == namespaceSerializer) { + // all delegate serializers seem stateless, so this is also stateless. + return this; + } else { + // at least one delegate serializer seems to be stateful, so we return a new instance. + return new TimerSerializer<>(keySerializerDuplicate, namespaceSerializerDuplicate); + } + } + + @Override + public InternalTimer createInstance() { + throw new UnsupportedOperationException(); + } + + @Override + public InternalTimer copy(InternalTimer from) { + return new TimerHeapInternalTimer<>(from.getTimestamp(), from.getKey(), from.getNamespace()); + } + + @Override + public InternalTimer copy(InternalTimer from, InternalTimer reuse) { + return copy(from); + } + + @Override + public int getLength() { + // we do not have fixed length + return -1; + } + + @Override + public void serialize(InternalTimer record, DataOutputView target) throws IOException { + keySerializer.serialize(record.getKey(), target); + namespaceSerializer.serialize(record.getNamespace(), target); + LongSerializer.INSTANCE.serialize(record.getTimestamp(), target); + } + + @Override + public InternalTimer deserialize(DataInputView source) throws IOException { + K key = keySerializer.deserialize(source); + N namespace = namespaceSerializer.deserialize(source); + Long timestamp = LongSerializer.INSTANCE.deserialize(source); + return new TimerHeapInternalTimer<>(timestamp, key, namespace); + } + + @Override + public InternalTimer deserialize(InternalTimer reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + keySerializer.copy(source, target); + namespaceSerializer.copy(source, target); + LongSerializer.INSTANCE.copy(source, target); + } + + @Override + public boolean equals(Object obj) { + return obj == this || + (obj != null && obj.getClass() == getClass() && + keySerializer.equals(((TimerSerializer) obj).keySerializer) && + namespaceSerializer.equals(((TimerSerializer) obj).namespaceSerializer)); + } + + @Override + public boolean canEqual(Object obj) { + return true; + } + + @Override + public int hashCode() { + return getClass().hashCode(); + } + + @Override + public TypeSerializerConfigSnapshot snapshotConfiguration() { + throw new UnsupportedOperationException("This serializer is not registered for managed state."); + } + + @Override + public CompatibilityResult> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { + throw new UnsupportedOperationException("This serializer is not registered for managed state."); + } + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java index aade876c81c9d..1ce400df52507 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/HeapInternalTimerServiceTest.java @@ -24,7 +24,6 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; -import org.apache.flink.runtime.state.KeyGroupsList; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; @@ -41,6 +40,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; @@ -79,7 +79,7 @@ public void testKeyGroupStartIndexSetting() { int startKeyGroupIdx = 7; int endKeyGroupIdx = 21; - KeyGroupsList testKeyGroupList = new KeyGroupRange(startKeyGroupIdx, endKeyGroupIdx); + KeyGroupRange testKeyGroupList = new KeyGroupRange(startKeyGroupIdx, endKeyGroupIdx); TestKeyContext keyContext = new TestKeyContext(); @@ -119,7 +119,7 @@ public void testTimerAssignmentToKeyGroups() { for (int i = 0; i < totalNoOfTimers; i++) { // create the timer to be registered - InternalTimer timer = new InternalTimer<>(10 + i, i, "hello_world_" + i); + TimerHeapInternalTimer timer = new TimerHeapInternalTimer<>(10 + i, i, "hello_world_" + i); int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(timer.getKey(), totalNoOfKeyGroups); // add it in the adequate expected set of timers per keygroup @@ -136,21 +136,23 @@ public void testTimerAssignmentToKeyGroups() { timerService.registerProcessingTimeTimer(timer.getNamespace(), timer.getTimestamp()); } - Set>[] eventTimeTimers = timerService.getEventTimeTimersPerKeyGroup(); - Set>[] processingTimeTimers = timerService.getProcessingTimeTimersPerKeyGroup(); + List>> eventTimeTimers = + timerService.getEventTimeTimersPerKeyGroup(); + List>> processingTimeTimers = + timerService.getProcessingTimeTimersPerKeyGroup(); // finally verify that the actual timers per key group sets are the expected ones. for (int i = 0; i < expectedNonEmptyTimerSets.length; i++) { Set> expected = expectedNonEmptyTimerSets[i]; - Set> actualEvent = eventTimeTimers[i]; - Set> actualProcessing = processingTimeTimers[i]; + Set> actualEvent = eventTimeTimers.get(i); + Set> actualProcessing = processingTimeTimers.get(i); if (expected == null) { - Assert.assertNull(actualEvent); - Assert.assertNull(actualProcessing); + Assert.assertTrue(actualEvent.isEmpty()); + Assert.assertTrue(actualProcessing.isEmpty()); } else { - Assert.assertArrayEquals(expected.toArray(), actualEvent.toArray()); - Assert.assertArrayEquals(expected.toArray(), actualProcessing.toArray()); + Assert.assertEquals(expected, actualEvent); + Assert.assertEquals(expected, actualProcessing); } } } @@ -379,10 +381,10 @@ public void testSetAndFireEventTimeTimers() throws Exception { timerService.advanceWatermark(10); verify(mockTriggerable, times(4)).onEventTime(anyInternalTimer()); - verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, key1, "ciao"))); - verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, key1, "hello"))); - verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, key2, "ciao"))); - verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, key2, "hello"))); + verify(mockTriggerable, times(1)).onEventTime(eq(new TimerHeapInternalTimer<>(10, key1, "ciao"))); + verify(mockTriggerable, times(1)).onEventTime(eq(new TimerHeapInternalTimer<>(10, key1, "hello"))); + verify(mockTriggerable, times(1)).onEventTime(eq(new TimerHeapInternalTimer<>(10, key2, "ciao"))); + verify(mockTriggerable, times(1)).onEventTime(eq(new TimerHeapInternalTimer<>(10, key2, "hello"))); assertEquals(0, timerService.numEventTimeTimers()); } @@ -424,10 +426,10 @@ public void testSetAndFireProcessingTimeTimers() throws Exception { processingTimeService.setCurrentTime(10); verify(mockTriggerable, times(4)).onProcessingTime(anyInternalTimer()); - verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key1, "ciao"))); - verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key1, "hello"))); - verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key2, "ciao"))); - verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key2, "hello"))); + verify(mockTriggerable, times(1)).onProcessingTime(eq(new TimerHeapInternalTimer<>(10, key1, "ciao"))); + verify(mockTriggerable, times(1)).onProcessingTime(eq(new TimerHeapInternalTimer<>(10, key1, "hello"))); + verify(mockTriggerable, times(1)).onProcessingTime(eq(new TimerHeapInternalTimer<>(10, key2, "ciao"))); + verify(mockTriggerable, times(1)).onProcessingTime(eq(new TimerHeapInternalTimer<>(10, key2, "hello"))); assertEquals(0, timerService.numProcessingTimeTimers()); } @@ -481,10 +483,10 @@ public void testDeleteEventTimeTimers() throws Exception { timerService.advanceWatermark(10); verify(mockTriggerable, times(2)).onEventTime(anyInternalTimer()); - verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, key1, "ciao"))); - verify(mockTriggerable, times(0)).onEventTime(eq(new InternalTimer<>(10, key1, "hello"))); - verify(mockTriggerable, times(0)).onEventTime(eq(new InternalTimer<>(10, key2, "ciao"))); - verify(mockTriggerable, times(1)).onEventTime(eq(new InternalTimer<>(10, key2, "hello"))); + verify(mockTriggerable, times(1)).onEventTime(eq(new TimerHeapInternalTimer<>(10, key1, "ciao"))); + verify(mockTriggerable, times(0)).onEventTime(eq(new TimerHeapInternalTimer<>(10, key1, "hello"))); + verify(mockTriggerable, times(0)).onEventTime(eq(new TimerHeapInternalTimer<>(10, key2, "ciao"))); + verify(mockTriggerable, times(1)).onEventTime(eq(new TimerHeapInternalTimer<>(10, key2, "hello"))); assertEquals(0, timerService.numEventTimeTimers()); } @@ -538,10 +540,10 @@ public void testDeleteProcessingTimeTimers() throws Exception { processingTimeService.setCurrentTime(10); verify(mockTriggerable, times(2)).onProcessingTime(anyInternalTimer()); - verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key1, "ciao"))); - verify(mockTriggerable, times(0)).onProcessingTime(eq(new InternalTimer<>(10, key1, "hello"))); - verify(mockTriggerable, times(0)).onProcessingTime(eq(new InternalTimer<>(10, key2, "ciao"))); - verify(mockTriggerable, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key2, "hello"))); + verify(mockTriggerable, times(1)).onProcessingTime(eq(new TimerHeapInternalTimer<>(10, key1, "ciao"))); + verify(mockTriggerable, times(0)).onProcessingTime(eq(new TimerHeapInternalTimer<>(10, key1, "hello"))); + verify(mockTriggerable, times(0)).onProcessingTime(eq(new TimerHeapInternalTimer<>(10, key2, "ciao"))); + verify(mockTriggerable, times(1)).onProcessingTime(eq(new TimerHeapInternalTimer<>(10, key2, "hello"))); assertEquals(0, timerService.numEventTimeTimers()); } @@ -635,11 +637,11 @@ private void testSnapshotAndRestore(int snapshotVersion) throws Exception { timerService.advanceWatermark(10); verify(mockTriggerable2, times(2)).onProcessingTime(anyInternalTimer()); - verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key1, "ciao"))); - verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key2, "hello"))); + verify(mockTriggerable2, times(1)).onProcessingTime(eq(new TimerHeapInternalTimer<>(10, key1, "ciao"))); + verify(mockTriggerable2, times(1)).onProcessingTime(eq(new TimerHeapInternalTimer<>(10, key2, "hello"))); verify(mockTriggerable2, times(2)).onEventTime(anyInternalTimer()); - verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, key1, "hello"))); - verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, key2, "ciao"))); + verify(mockTriggerable2, times(1)).onEventTime(eq(new TimerHeapInternalTimer<>(10, key1, "hello"))); + verify(mockTriggerable2, times(1)).onEventTime(eq(new TimerHeapInternalTimer<>(10, key2, "ciao"))); assertEquals(0, timerService.numEventTimeTimers()); } @@ -737,11 +739,11 @@ private void testSnapshotAndRebalancingRestore(int snapshotVersion) throws Excep timerService1.advanceWatermark(10); verify(mockTriggerable1, times(1)).onProcessingTime(anyInternalTimer()); - verify(mockTriggerable1, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key1, "ciao"))); - verify(mockTriggerable1, never()).onProcessingTime(eq(new InternalTimer<>(10, key2, "hello"))); + verify(mockTriggerable1, times(1)).onProcessingTime(eq(new TimerHeapInternalTimer<>(10, key1, "ciao"))); + verify(mockTriggerable1, never()).onProcessingTime(eq(new TimerHeapInternalTimer<>(10, key2, "hello"))); verify(mockTriggerable1, times(1)).onEventTime(anyInternalTimer()); - verify(mockTriggerable1, times(1)).onEventTime(eq(new InternalTimer<>(10, key1, "hello"))); - verify(mockTriggerable1, never()).onEventTime(eq(new InternalTimer<>(10, key2, "ciao"))); + verify(mockTriggerable1, times(1)).onEventTime(eq(new TimerHeapInternalTimer<>(10, key1, "hello"))); + verify(mockTriggerable1, never()).onEventTime(eq(new TimerHeapInternalTimer<>(10, key2, "ciao"))); assertEquals(0, timerService1.numEventTimeTimers()); @@ -749,11 +751,11 @@ private void testSnapshotAndRebalancingRestore(int snapshotVersion) throws Excep timerService2.advanceWatermark(10); verify(mockTriggerable2, times(1)).onProcessingTime(anyInternalTimer()); - verify(mockTriggerable2, never()).onProcessingTime(eq(new InternalTimer<>(10, key1, "ciao"))); - verify(mockTriggerable2, times(1)).onProcessingTime(eq(new InternalTimer<>(10, key2, "hello"))); + verify(mockTriggerable2, never()).onProcessingTime(eq(new TimerHeapInternalTimer<>(10, key1, "ciao"))); + verify(mockTriggerable2, times(1)).onProcessingTime(eq(new TimerHeapInternalTimer<>(10, key2, "hello"))); verify(mockTriggerable2, times(1)).onEventTime(anyInternalTimer()); - verify(mockTriggerable2, never()).onEventTime(eq(new InternalTimer<>(10, key1, "hello"))); - verify(mockTriggerable2, times(1)).onEventTime(eq(new InternalTimer<>(10, key2, "ciao"))); + verify(mockTriggerable2, never()).onEventTime(eq(new TimerHeapInternalTimer<>(10, key1, "hello"))); + verify(mockTriggerable2, times(1)).onEventTime(eq(new TimerHeapInternalTimer<>(10, key2, "ciao"))); assertEquals(0, timerService2.numEventTimeTimers()); } @@ -795,7 +797,7 @@ private static HeapInternalTimerService createTimerService( Triggerable triggerable, KeyContext keyContext, ProcessingTimeService processingTimeService, - KeyGroupsList keyGroupList, + KeyGroupRange keyGroupList, int maxParallelism) { HeapInternalTimerService service = new HeapInternalTimerService<>( @@ -814,7 +816,7 @@ private static HeapInternalTimerService restoreTimerService( Triggerable triggerable, KeyContext keyContext, ProcessingTimeService processingTimeService, - KeyGroupsList keyGroupsList, + KeyGroupRange keyGroupsList, int maxParallelism) throws Exception { // create an empty service diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerHeapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerHeapTest.java new file mode 100644 index 0000000000000..997bffc30acbd --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/InternalTimerHeapTest.java @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.TestLogger; + +import org.junit.Assert; +import org.junit.Test; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; + +/** + * Tests for {@link InternalTimerHeap}. + */ +public class InternalTimerHeapTest extends TestLogger { + + private static final KeyGroupRange KEY_GROUP_RANGE = new KeyGroupRange(0, 1); + + private static void insertRandomTimers( + InternalTimerHeap timerPriorityQueue, + int count) { + insertRandomTimers(timerPriorityQueue, null, count); + } + + private static void insertRandomTimers( + @Nonnull InternalTimerHeap timerPriorityQueue, + @Nullable Set> checkSet, + int count) { + + ThreadLocalRandom localRandom = ThreadLocalRandom.current(); + + for (int i = 0; i < count; ++i) { + TimerHeapInternalTimer timer = + new TimerHeapInternalTimer<>(localRandom.nextLong(), i, VoidNamespace.INSTANCE); + if (checkSet != null) { + Preconditions.checkState(checkSet.add(timer)); + } + Assert.assertTrue(timerPriorityQueue.scheduleTimer( + timer.getTimestamp(), + timer.getKey(), + timer.getNamespace())); + } + } + + private static InternalTimerHeap newPriorityQueue(int initialCapacity) { + return new InternalTimerHeap<>( + initialCapacity, + KEY_GROUP_RANGE, + KEY_GROUP_RANGE.getNumberOfKeyGroups()); + } + + @Test + public void testPeekPollOrder() { + final int initialCapacity = 4; + final int testSize = 1000; + InternalTimerHeap timerPriorityQueue = newPriorityQueue(initialCapacity); + HashSet> checkSet = new HashSet<>(testSize); + + insertRandomTimers(timerPriorityQueue, checkSet, testSize); + + long lastTimestamp = Long.MIN_VALUE; + int lastSize = timerPriorityQueue.size(); + Assert.assertEquals(testSize, lastSize); + InternalTimer timer; + while ((timer = timerPriorityQueue.peek()) != null) { + Assert.assertFalse(timerPriorityQueue.isEmpty()); + Assert.assertEquals(lastSize, timerPriorityQueue.size()); + Assert.assertEquals(timer, timerPriorityQueue.poll()); + Assert.assertTrue(checkSet.remove(timer)); + Assert.assertTrue(timer.getTimestamp() >= lastTimestamp); + lastTimestamp = timer.getTimestamp(); + --lastSize; + } + + Assert.assertTrue(timerPriorityQueue.isEmpty()); + Assert.assertEquals(0, timerPriorityQueue.size()); + Assert.assertEquals(0, checkSet.size()); + } + + @Test + public void testStopInsertMixKeepsOrder() { + + InternalTimerHeap timerPriorityQueue = newPriorityQueue(3); + + final int testSize = 345; + HashSet> checkSet = new HashSet<>(testSize); + + insertRandomTimers(timerPriorityQueue, checkSet, testSize); + + // check that the whole set is still in order + while (!checkSet.isEmpty()) { + + Iterator> iterator = checkSet.iterator(); + InternalTimer timer = iterator.next(); + iterator.remove(); + Assert.assertTrue(timerPriorityQueue.stopTimer(timer.getTimestamp(), timer.getKey(), timer.getNamespace())); + Assert.assertEquals(checkSet.size(), timerPriorityQueue.size()); + + long lastTimestamp = Long.MIN_VALUE; + + while ((timer = timerPriorityQueue.poll()) != null) { + Assert.assertTrue(timer.getTimestamp() >= lastTimestamp); + lastTimestamp = timer.getTimestamp(); + } + + Assert.assertTrue(timerPriorityQueue.isEmpty()); + + timerPriorityQueue.bulkAddRestoredTimers(checkSet); + } + } + + @Test + public void testPoll() { + InternalTimerHeap timerPriorityQueue = newPriorityQueue(3); + + Assert.assertNull(timerPriorityQueue.poll()); + + final int testSize = 345; + HashSet> checkSet = new HashSet<>(testSize); + insertRandomTimers(timerPriorityQueue, checkSet, testSize); + + long lastTimestamp = Long.MIN_VALUE; + while (!timerPriorityQueue.isEmpty()) { + InternalTimer removed = timerPriorityQueue.poll(); + Assert.assertNotNull(removed); + Assert.assertTrue(checkSet.remove(removed)); + Assert.assertTrue(removed.getTimestamp() >= lastTimestamp); + lastTimestamp = removed.getTimestamp(); + } + Assert.assertTrue(checkSet.isEmpty()); + + Assert.assertNull(timerPriorityQueue.poll()); + } + + @Test + public void testIsEmpty() { + InternalTimerHeap timerPriorityQueue = + newPriorityQueue(1); + + Assert.assertTrue(timerPriorityQueue.isEmpty()); + + timerPriorityQueue.scheduleTimer(42L, 4711, VoidNamespace.INSTANCE); + Assert.assertFalse(timerPriorityQueue.isEmpty()); + + timerPriorityQueue.poll(); + Assert.assertTrue(timerPriorityQueue.isEmpty()); + } + + @Test + public void testBulkAddRestoredTimers() { + final int testSize = 10; + HashSet> timerSet = new HashSet<>(testSize); + for (int i = 0; i < testSize; ++i) { + timerSet.add(new TimerHeapInternalTimer<>(i, i, VoidNamespace.INSTANCE)); + } + + List> twoTimesTimerSet = new ArrayList<>(timerSet.size() * 2); + twoTimesTimerSet.addAll(timerSet); + twoTimesTimerSet.addAll(timerSet); + + InternalTimerHeap timerPriorityQueue = + newPriorityQueue(1); + + timerPriorityQueue.bulkAddRestoredTimers(twoTimesTimerSet); + timerPriorityQueue.bulkAddRestoredTimers(twoTimesTimerSet); + + Assert.assertEquals(timerSet.size(), timerPriorityQueue.size()); + + for (InternalTimer timer : timerPriorityQueue) { + Assert.assertTrue(timerSet.remove(timer)); + } + + Assert.assertTrue(timerSet.isEmpty()); + } + + @Test + public void testToArray() { + final int testSize = 10; + HashSet> checkSet = new HashSet<>(testSize); + InternalTimerHeap timerPriorityQueue = + newPriorityQueue(1); + + Assert.assertEquals(0, timerPriorityQueue.toArray().length); + + insertRandomTimers(timerPriorityQueue, checkSet, testSize); + + Object[] toArray = timerPriorityQueue.toArray(); + Assert.assertEquals(timerPriorityQueue.size(), toArray.length); + + for (Object o : toArray) { + if (o instanceof TimerHeapInternalTimer) { + Assert.assertTrue(checkSet.remove(o)); + } + } + + Assert.assertTrue(checkSet.isEmpty()); + } + + @Test + public void testIterator() { + InternalTimerHeap timerPriorityQueue = + newPriorityQueue(1); + + // test empty iterator + Iterator> iterator = timerPriorityQueue.iterator(); + Assert.assertFalse(iterator.hasNext()); + try { + iterator.next(); + Assert.fail(); + } catch (NoSuchElementException ignore) { + } + + // iterate some data + final int testSize = 10; + HashSet> checkSet = new HashSet<>(testSize); + insertRandomTimers(timerPriorityQueue, checkSet, testSize); + iterator = timerPriorityQueue.iterator(); + while (iterator.hasNext()) { + Assert.assertTrue(checkSet.remove(iterator.next())); + } + Assert.assertTrue(checkSet.isEmpty()); + + // test remove is not supported + try { + iterator.remove(); + Assert.fail(); + } catch (UnsupportedOperationException ignore) { + } + } + + @Test + public void testClear() { + InternalTimerHeap timerPriorityQueue = + newPriorityQueue(1); + + int count = 10; + insertRandomTimers(timerPriorityQueue, count); + Assert.assertEquals(count, timerPriorityQueue.size()); + timerPriorityQueue.clear(); + Assert.assertEquals(0, timerPriorityQueue.size()); + } + + @Test + public void testScheduleTimer() { + InternalTimerHeap timerPriorityQueue = + newPriorityQueue(1); + + final long timestamp = 42L; + final Integer key = 4711; + Assert.assertTrue(timerPriorityQueue.scheduleTimer(timestamp, key, VoidNamespace.INSTANCE)); + Assert.assertFalse(timerPriorityQueue.scheduleTimer(timestamp, key, VoidNamespace.INSTANCE)); + Assert.assertEquals(1, timerPriorityQueue.size()); + final InternalTimer timer = timerPriorityQueue.poll(); + Assert.assertNotNull(timer); + Assert.assertEquals(timestamp, timer.getTimestamp()); + Assert.assertEquals(key, timer.getKey()); + Assert.assertEquals(VoidNamespace.INSTANCE, timer.getNamespace()); + } + + @Test + public void testStopTimer() { + InternalTimerHeap timerPriorityQueue = + newPriorityQueue(1); + + final long timestamp = 42L; + final Integer key = 4711; + Assert.assertFalse(timerPriorityQueue.stopTimer(timestamp, key, VoidNamespace.INSTANCE)); + Assert.assertTrue(timerPriorityQueue.scheduleTimer(timestamp, key, VoidNamespace.INSTANCE)); + Assert.assertTrue(timerPriorityQueue.stopTimer(timestamp, key, VoidNamespace.INSTANCE)); + Assert.assertFalse(timerPriorityQueue.stopTimer(timestamp, key, VoidNamespace.INSTANCE)); + Assert.assertTrue(timerPriorityQueue.isEmpty()); + } +} From a60b2409990bda6fed53d6b71f3db90b8873824a Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Thu, 24 May 2018 15:08:02 +0200 Subject: [PATCH 2/2] [FLINK-9436][state] Remove generic parameter namespace from InternalTimeServiceManager. --- .../api/operators/AbstractStreamOperator.java | 4 ++-- .../api/operators/InternalTimeServiceManager.java | 14 +++++++------- .../InternalTimerServiceSerializationProxy.java | 14 +++++++------- .../api/operators/StreamOperatorStateContext.java | 2 +- .../operators/StreamTaskStateInitializerImpl.java | 12 ++++++------ .../StateInitializationContextImplTest.java | 2 +- .../StreamOperatorSnapshotRestoreTest.java | 2 +- .../StreamTaskStateInitializerImplTest.java | 6 +++--- .../streaming/runtime/tasks/StreamTaskTest.java | 4 ++-- 9 files changed, 30 insertions(+), 30 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index e447cbeec05b0..0e80df5376451 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -151,7 +151,7 @@ public abstract class AbstractStreamOperator // ---------------- time handler ------------------ - protected transient InternalTimeServiceManager timeServiceManager; + protected transient InternalTimeServiceManager timeServiceManager; // ---------------- two-input operator watermarks ------------------ @@ -725,7 +725,7 @@ public InternalTimerService getInternalTimerService( // the following casting is to overcome type restrictions. TypeSerializer keySerializer = (TypeSerializer) getKeyedStateBackend().getKeySerializer(); - InternalTimeServiceManager keyedTimeServiceHandler = (InternalTimeServiceManager) timeServiceManager; + InternalTimeServiceManager keyedTimeServiceHandler = (InternalTimeServiceManager) timeServiceManager; return keyedTimeServiceHandler.getInternalTimerService(name, keySerializer, namespaceSerializer, triggerable); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java index 0f193ecded01e..e62883aed3fe0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimeServiceManager.java @@ -41,10 +41,9 @@ * NOTE: These services are only available to keyed operators. * * @param The type of keys used for the timers and the registry. - * @param The type of namespace used for the timers. */ @Internal -public class InternalTimeServiceManager { +public class InternalTimeServiceManager { private final int totalKeyGroups; private final KeyGroupRange localKeyGroupRange; @@ -52,7 +51,7 @@ public class InternalTimeServiceManager { private final ProcessingTimeService processingTimeService; - private final Map> timerServices; + private final Map> timerServices; InternalTimeServiceManager( int totalKeyGroups, @@ -90,10 +89,11 @@ public class InternalTimeServiceManager { * @param namespaceSerializer {@code TypeSerializer} for the timer namespace. * @param triggerable The {@link Triggerable} that should be invoked when timers fire */ - public InternalTimerService getInternalTimerService(String name, TypeSerializer keySerializer, + @SuppressWarnings("unchecked") + public InternalTimerService getInternalTimerService(String name, TypeSerializer keySerializer, TypeSerializer namespaceSerializer, Triggerable triggerable) { - HeapInternalTimerService timerService = timerServices.get(name); + HeapInternalTimerService timerService = (HeapInternalTimerService) timerServices.get(name); if (timerService == null) { timerService = new HeapInternalTimerService<>(totalKeyGroups, localKeyGroupRange, keyContext, processingTimeService); @@ -112,7 +112,7 @@ public void advanceWatermark(Watermark watermark) throws Exception { ////////////////// Fault Tolerance Methods /////////////////// public void snapshotStateForKeyGroup(DataOutputView stream, int keyGroupIdx) throws IOException { - InternalTimerServiceSerializationProxy serializationProxy = + InternalTimerServiceSerializationProxy serializationProxy = new InternalTimerServiceSerializationProxy<>(timerServices, keyGroupIdx); serializationProxy.write(stream); @@ -123,7 +123,7 @@ public void restoreStateForKeyGroup( int keyGroupIdx, ClassLoader userCodeClassLoader) throws IOException { - InternalTimerServiceSerializationProxy serializationProxy = + InternalTimerServiceSerializationProxy serializationProxy = new InternalTimerServiceSerializationProxy<>( timerServices, userCodeClassLoader, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceSerializationProxy.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceSerializationProxy.java index 78252a04e31de..efa93d3e266a0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceSerializationProxy.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/InternalTimerServiceSerializationProxy.java @@ -34,12 +34,12 @@ * Serialization proxy for the timer services for a given key-group. */ @Internal -public class InternalTimerServiceSerializationProxy extends PostVersionedIOReadableWritable { +public class InternalTimerServiceSerializationProxy extends PostVersionedIOReadableWritable { public static final int VERSION = 1; /** The key-group timer services to write / read. */ - private Map> timerServices; + private Map> timerServices; /** The user classloader; only relevant if the proxy is used to restore timer services. */ private ClassLoader userCodeClassLoader; @@ -55,7 +55,7 @@ public class InternalTimerServiceSerializationProxy extends PostVersionedI * Constructor to use when restoring timer services. */ public InternalTimerServiceSerializationProxy( - Map> timerServicesMapToPopulate, + Map> timerServicesMapToPopulate, ClassLoader userCodeClassLoader, int totalKeyGroups, KeyGroupRange localKeyGroupRange, @@ -76,7 +76,7 @@ public InternalTimerServiceSerializationProxy( * Constructor to use when writing timer services. */ public InternalTimerServiceSerializationProxy( - Map> timerServices, + Map> timerServices, int keyGroupIdx) { this.timerServices = checkNotNull(timerServices); @@ -93,9 +93,9 @@ public void write(DataOutputView out) throws IOException { super.write(out); out.writeInt(timerServices.size()); - for (Map.Entry> entry : timerServices.entrySet()) { + for (Map.Entry> entry : timerServices.entrySet()) { String serviceName = entry.getKey(); - HeapInternalTimerService timerService = entry.getValue(); + HeapInternalTimerService timerService = entry.getValue(); out.writeUTF(serviceName); InternalTimersSnapshotReaderWriters @@ -111,7 +111,7 @@ protected void read(DataInputView in, boolean wasVersioned) throws IOException { for (int i = 0; i < noOfTimerServices; i++) { String serviceName = in.readUTF(); - HeapInternalTimerService timerService = timerServices.get(serviceName); + HeapInternalTimerService timerService = timerServices.get(serviceName); if (timerService == null) { timerService = new HeapInternalTimerService<>( totalKeyGroups, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java index 420b6bf9d33c9..2eb777a8db6e3 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java @@ -49,7 +49,7 @@ public interface StreamOperatorStateContext { * Returns the internal timer service manager for the stream operator. This method returns null for non-keyed * operators. */ - InternalTimeServiceManager internalTimerServiceManager(); + InternalTimeServiceManager internalTimerServiceManager(); /** * Returns an iterable to obtain input streams for previously stored operator state partitions that are assigned to diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java index 460a52b4c7bca..578302ba1e5ee 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java @@ -124,7 +124,7 @@ public StreamOperatorStateContext streamOperatorStateContext( OperatorStateBackend operatorStateBackend = null; CloseableIterable rawKeyedStateInputs = null; CloseableIterable rawOperatorStateInputs = null; - InternalTimeServiceManager timeServiceManager; + InternalTimeServiceManager timeServiceManager; try { @@ -192,7 +192,7 @@ public StreamOperatorStateContext streamOperatorStateContext( } } - protected InternalTimeServiceManager internalTimeServiceManager( + protected InternalTimeServiceManager internalTimeServiceManager( AbstractKeyedStateBackend keyedStatedBackend, KeyContext keyContext, //the operator Iterable rawKeyedStates) throws Exception { @@ -203,7 +203,7 @@ protected InternalTimeServiceManager internalTimeServiceManager( final KeyGroupRange keyGroupRange = keyedStatedBackend.getKeyGroupRange(); - final InternalTimeServiceManager timeServiceManager = new InternalTimeServiceManager<>( + final InternalTimeServiceManager timeServiceManager = new InternalTimeServiceManager<>( keyedStatedBackend.getNumberOfKeyGroups(), keyGroupRange, keyContext, @@ -544,7 +544,7 @@ private static class StreamOperatorStateContextImpl implements StreamOperatorSta private final OperatorStateBackend operatorStateBackend; private final AbstractKeyedStateBackend keyedStateBackend; - private final InternalTimeServiceManager internalTimeServiceManager; + private final InternalTimeServiceManager internalTimeServiceManager; private final CloseableIterable rawOperatorStateInputs; private final CloseableIterable rawKeyedStateInputs; @@ -553,7 +553,7 @@ private static class StreamOperatorStateContextImpl implements StreamOperatorSta boolean restored, OperatorStateBackend operatorStateBackend, AbstractKeyedStateBackend keyedStateBackend, - InternalTimeServiceManager internalTimeServiceManager, + InternalTimeServiceManager internalTimeServiceManager, CloseableIterable rawOperatorStateInputs, CloseableIterable rawKeyedStateInputs) { @@ -581,7 +581,7 @@ public OperatorStateBackend operatorStateBackend() { } @Override - public InternalTimeServiceManager internalTimerServiceManager() { + public InternalTimeServiceManager internalTimerServiceManager() { return internalTimeServiceManager; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java index 90649f2abeccc..7d9dcd5ef11eb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateInitializationContextImplTest.java @@ -174,7 +174,7 @@ public void setUp() throws Exception { mock(ProcessingTimeService.class)) { @Override - protected InternalTimeServiceManager internalTimeServiceManager( + protected InternalTimeServiceManager internalTimeServiceManager( AbstractKeyedStateBackend keyedStatedBackend, KeyContext keyContext, Iterable rawKeyedStates) throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java index 6d011a3baa1c6..53194cd7bcdad 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamOperatorSnapshotRestoreTest.java @@ -200,7 +200,7 @@ protected StreamTaskStateInitializer createStreamTaskStateManager( return new StreamTaskStateInitializerImpl(env, stateBackend, processingTimeService) { @Override - protected InternalTimeServiceManager internalTimeServiceManager( + protected InternalTimeServiceManager internalTimeServiceManager( AbstractKeyedStateBackend keyedStatedBackend, KeyContext keyContext, Iterable rawKeyedStates) throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java index b33a69ea1e6f8..000cf31c0861d 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java @@ -94,7 +94,7 @@ public void testNoRestore() throws Exception { OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend(); AbstractKeyedStateBackend keyedStateBackend = stateContext.keyedStateBackend(); - InternalTimeServiceManager timeServiceManager = stateContext.internalTimerServiceManager(); + InternalTimeServiceManager timeServiceManager = stateContext.internalTimerServiceManager(); CloseableIterable keyedStateInputs = stateContext.rawKeyedStateInputs(); CloseableIterable operatorStateInputs = stateContext.rawOperatorStateInputs(); @@ -194,7 +194,7 @@ public OperatorStateBackend createOperatorStateBackend( OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend(); AbstractKeyedStateBackend keyedStateBackend = stateContext.keyedStateBackend(); - InternalTimeServiceManager timeServiceManager = stateContext.internalTimerServiceManager(); + InternalTimeServiceManager timeServiceManager = stateContext.internalTimerServiceManager(); CloseableIterable keyedStateInputs = stateContext.rawKeyedStateInputs(); CloseableIterable operatorStateInputs = stateContext.rawOperatorStateInputs(); @@ -271,7 +271,7 @@ private StreamTaskStateInitializer streamTaskStateManager( stateBackend, processingTimeService) { @Override - protected InternalTimeServiceManager internalTimeServiceManager( + protected InternalTimeServiceManager internalTimeServiceManager( AbstractKeyedStateBackend keyedStatedBackend, KeyContext keyContext, Iterable rawKeyedStates) throws Exception { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 73a575e524955..c3ce83551f4f7 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -1109,8 +1109,8 @@ public AbstractKeyedStateBackend keyedStateBackend() { } @Override - public InternalTimeServiceManager internalTimerServiceManager() { - InternalTimeServiceManager timeServiceManager = context.internalTimerServiceManager(); + public InternalTimeServiceManager internalTimerServiceManager() { + InternalTimeServiceManager timeServiceManager = context.internalTimerServiceManager(); return timeServiceManager != null ? spy(timeServiceManager) : null; }