Skip to content

Commit

Permalink
[FLINK-9423][state] Implement efficient deletes for heap-based timer service.
Browse files Browse the repository at this point in the history

This closes #6062.
  • Loading branch information
StefanRRichter committed May 31, 2018
1 parent b644801 commit ff0b9c1
Show file tree
Hide file tree
Showing 9 changed files with 1,067 additions and 373 deletions.
Expand Up @@ -22,15 +22,12 @@
import org.apache.flink.api.common.typeutils.CompatibilityResult; import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupsList;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.Preconditions; import org.apache.flink.util.Preconditions;


import java.io.IOException; import java.util.List;
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;


Expand All @@ -49,20 +46,18 @@ public class HeapInternalTimerService<K, N> implements InternalTimerService<N>,
/** /**
* Processing time timers that are currently in-flight. * Processing time timers that are currently in-flight.
*/ */
private final Set<InternalTimer<K, N>>[] processingTimeTimersByKeyGroup; private final InternalTimerHeap<K, N> processingTimeTimersQueue;
private final PriorityQueue<InternalTimer<K, N>> processingTimeTimersQueue;


/** /**
* Event time timers that are currently in-flight. * Event time timers that are currently in-flight.
*/ */
private final Set<InternalTimer<K, N>>[] eventTimeTimersByKeyGroup; private final InternalTimerHeap<K, N> eventTimeTimersQueue;
private final PriorityQueue<InternalTimer<K, N>> eventTimeTimersQueue;


/** /**
* Information concerning the local key-group range. * Information concerning the local key-group range.
*/ */
private final KeyGroupsList localKeyGroupRange; private final KeyGroupRange localKeyGroupRange;
private final int totalKeyGroups;
private final int localKeyGroupRangeStartIdx; private final int localKeyGroupRangeStartIdx;


/** /**
Expand Down Expand Up @@ -94,16 +89,14 @@ public class HeapInternalTimerService<K, N> implements InternalTimerService<N>,
/** The restored timers snapshot, if any. */ /** The restored timers snapshot, if any. */
private InternalTimersSnapshot<K, N> restoredTimersSnapshot; private InternalTimersSnapshot<K, N> restoredTimersSnapshot;


public HeapInternalTimerService( HeapInternalTimerService(
int totalKeyGroups, int totalKeyGroups,
KeyGroupsList localKeyGroupRange, KeyGroupRange localKeyGroupRange,
KeyContext keyContext, KeyContext keyContext,
ProcessingTimeService processingTimeService) { ProcessingTimeService processingTimeService) {


this.keyContext = checkNotNull(keyContext); this.keyContext = checkNotNull(keyContext);
this.processingTimeService = checkNotNull(processingTimeService); this.processingTimeService = checkNotNull(processingTimeService);

this.totalKeyGroups = totalKeyGroups;
this.localKeyGroupRange = checkNotNull(localKeyGroupRange); this.localKeyGroupRange = checkNotNull(localKeyGroupRange);


// find the starting index of the local key-group range // find the starting index of the local key-group range
Expand All @@ -113,14 +106,8 @@ public HeapInternalTimerService(
} }
this.localKeyGroupRangeStartIdx = startIdx; this.localKeyGroupRangeStartIdx = startIdx;


// the list of ids of the key-groups this task is responsible for this.eventTimeTimersQueue = new InternalTimerHeap<>(128, localKeyGroupRange, totalKeyGroups);
int localKeyGroups = this.localKeyGroupRange.getNumberOfKeyGroups(); this.processingTimeTimersQueue = new InternalTimerHeap<>(128, localKeyGroupRange, totalKeyGroups);

this.eventTimeTimersQueue = new PriorityQueue<>(100);
this.eventTimeTimersByKeyGroup = new HashSet[localKeyGroups];

this.processingTimeTimersQueue = new PriorityQueue<>(100);
this.processingTimeTimersByKeyGroup = new HashSet[localKeyGroups];
} }


/** /**
Expand Down Expand Up @@ -175,8 +162,9 @@ public void startTimerService(
this.triggerTarget = Preconditions.checkNotNull(triggerTarget); this.triggerTarget = Preconditions.checkNotNull(triggerTarget);


// re-register the restored timers (if any) // re-register the restored timers (if any)
if (processingTimeTimersQueue.size() > 0) { final InternalTimer<K, N> headTimer = processingTimeTimersQueue.peek();
nextTimer = processingTimeService.registerTimer(processingTimeTimersQueue.peek().getTimestamp(), this); if (headTimer != null) {
nextTimer = processingTimeService.registerTimer(headTimer.getTimestamp(), this);
} }
this.isInitialized = true; this.isInitialized = true;
} else { } else {
Expand All @@ -199,17 +187,9 @@ public long currentWatermark() {


@Override @Override
public void registerProcessingTimeTimer(N namespace, long time) { public void registerProcessingTimeTimer(N namespace, long time) {
InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace); InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();

if (processingTimeTimersQueue.scheduleTimer(time, (K) keyContext.getCurrentKey(), namespace)) {
// make sure we only put one timer per key into the queue
Set<InternalTimer<K, N>> timerSet = getProcessingTimeTimerSetForTimer(timer);
if (timerSet.add(timer)) {

InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE; long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;

processingTimeTimersQueue.add(timer);

// check if we need to re-schedule our timer to earlier // check if we need to re-schedule our timer to earlier
if (time < nextTriggerTime) { if (time < nextTriggerTime) {
if (nextTimer != null) { if (nextTimer != null) {
Expand All @@ -222,29 +202,17 @@ public void registerProcessingTimeTimer(N namespace, long time) {


@Override @Override
public void registerEventTimeTimer(N namespace, long time) { public void registerEventTimeTimer(N namespace, long time) {
InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace); eventTimeTimersQueue.scheduleTimer(time, (K) keyContext.getCurrentKey(), namespace);
Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
if (timerSet.add(timer)) {
eventTimeTimersQueue.add(timer);
}
} }


@Override @Override
public void deleteProcessingTimeTimer(N namespace, long time) { public void deleteProcessingTimeTimer(N namespace, long time) {
InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace); processingTimeTimersQueue.stopTimer(time, (K) keyContext.getCurrentKey(), namespace);
Set<InternalTimer<K, N>> timerSet = getProcessingTimeTimerSetForTimer(timer);
if (timerSet.remove(timer)) {
processingTimeTimersQueue.remove(timer);
}
} }


@Override @Override
public void deleteEventTimeTimer(N namespace, long time) { public void deleteEventTimeTimer(N namespace, long time) {
InternalTimer<K, N> timer = new InternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace); eventTimeTimersQueue.stopTimer(time, (K) keyContext.getCurrentKey(), namespace);
Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
if (timerSet.remove(timer)) {
eventTimeTimersQueue.remove(timer);
}
} }


@Override @Override
Expand All @@ -256,12 +224,7 @@ public void onProcessingTime(long time) throws Exception {
InternalTimer<K, N> timer; InternalTimer<K, N> timer;


while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) { while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {

processingTimeTimersQueue.poll();
Set<InternalTimer<K, N>> timerSet = getProcessingTimeTimerSetForTimer(timer);

timerSet.remove(timer);
processingTimeTimersQueue.remove();

keyContext.setCurrentKey(timer.getKey()); keyContext.setCurrentKey(timer.getKey());
triggerTarget.onProcessingTime(timer); triggerTarget.onProcessingTime(timer);
} }
Expand All @@ -279,11 +242,7 @@ public void advanceWatermark(long time) throws Exception {
InternalTimer<K, N> timer; InternalTimer<K, N> timer;


while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) { while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {

eventTimeTimersQueue.poll();
Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
timerSet.remove(timer);
eventTimeTimersQueue.remove();

keyContext.setCurrentKey(timer.getKey()); keyContext.setCurrentKey(timer.getKey());
triggerTarget.onEventTime(timer); triggerTarget.onEventTime(timer);
} }
Expand All @@ -301,114 +260,37 @@ public InternalTimersSnapshot<K, N> snapshotTimersForKeyGroup(int keyGroupIdx) {
keySerializer.snapshotConfiguration(), keySerializer.snapshotConfiguration(),
namespaceSerializer, namespaceSerializer,
namespaceSerializer.snapshotConfiguration(), namespaceSerializer.snapshotConfiguration(),
getEventTimeTimerSetForKeyGroup(keyGroupIdx), eventTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx),
getProcessingTimeTimerSetForKeyGroup(keyGroupIdx)); processingTimeTimersQueue.getTimersForKeyGroup(keyGroupIdx));
} }


/** /**
* Restore the timers (both processing and event time ones) for a given {@code keyGroupIdx}. * Restore the timers (both processing and event time ones) for a given {@code keyGroupIdx}.
* *
* @param restoredTimersSnapshot the restored snapshot containing the key-group's timers, * @param restoredSnapshot the restored snapshot containing the key-group's timers,
* and the serializers that were used to write them * and the serializers that were used to write them
* @param keyGroupIdx the id of the key-group to be put in the snapshot. * @param keyGroupIdx the id of the key-group to be put in the snapshot.
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> restoredTimersSnapshot, int keyGroupIdx) throws IOException { public void restoreTimersForKeyGroup(InternalTimersSnapshot<?, ?> restoredSnapshot, int keyGroupIdx) {
this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) restoredTimersSnapshot; this.restoredTimersSnapshot = (InternalTimersSnapshot<K, N>) restoredSnapshot;

if ((this.keyDeserializer != null && !this.keyDeserializer.equals(restoredTimersSnapshot.getKeySerializer())) ||
(this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredTimersSnapshot.getNamespaceSerializer()))) {


if (areSnapshotSerializersIncompatible(restoredSnapshot)) {
throw new IllegalArgumentException("Tried to restore timers " + throw new IllegalArgumentException("Tried to restore timers " +
"for the same service with different serializers."); "for the same service with different serializers.");
} }


this.keyDeserializer = this.restoredTimersSnapshot.getKeySerializer(); this.keyDeserializer = restoredTimersSnapshot.getKeySerializer();
this.namespaceDeserializer = this.restoredTimersSnapshot.getNamespaceSerializer(); this.namespaceDeserializer = restoredTimersSnapshot.getNamespaceSerializer();


checkArgument(localKeyGroupRange.contains(keyGroupIdx), checkArgument(localKeyGroupRange.contains(keyGroupIdx),
"Key Group " + keyGroupIdx + " does not belong to the local range."); "Key Group " + keyGroupIdx + " does not belong to the local range.");


// restore the event time timers // restore the event time timers
Set<InternalTimer<K, N>> eventTimers = getEventTimeTimerSetForKeyGroup(keyGroupIdx); eventTimeTimersQueue.bulkAddRestoredTimers(restoredTimersSnapshot.getEventTimeTimers());
eventTimers.addAll(this.restoredTimersSnapshot.getEventTimeTimers());
eventTimeTimersQueue.addAll(this.restoredTimersSnapshot.getEventTimeTimers());


// restore the processing time timers // restore the processing time timers
Set<InternalTimer<K, N>> processingTimers = getProcessingTimeTimerSetForKeyGroup(keyGroupIdx); processingTimeTimersQueue.bulkAddRestoredTimers(restoredTimersSnapshot.getProcessingTimeTimers());
processingTimers.addAll(this.restoredTimersSnapshot.getProcessingTimeTimers());
processingTimeTimersQueue.addAll(this.restoredTimersSnapshot.getProcessingTimeTimers());
}

/**
* Retrieve the set of event time timers for the key-group this timer belongs to.
*
* @param timer the timer whose key-group we are searching.
* @return the set of registered timers for the key-group.
*/
private Set<InternalTimer<K, N>> getEventTimeTimerSetForTimer(InternalTimer<K, N> timer) {
checkArgument(localKeyGroupRange != null, "The operator has not been initialized.");
int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(timer.getKey(), this.totalKeyGroups);
return getEventTimeTimerSetForKeyGroup(keyGroupIdx);
}

/**
* Retrieve the set of event time timers for the requested key-group.
*
* @param keyGroupIdx the index of the key group we are interested in.
* @return the set of registered timers for the key-group.
*/
private Set<InternalTimer<K, N>> getEventTimeTimerSetForKeyGroup(int keyGroupIdx) {
int localIdx = getIndexForKeyGroup(keyGroupIdx);
Set<InternalTimer<K, N>> timers = eventTimeTimersByKeyGroup[localIdx];
if (timers == null) {
timers = new HashSet<>();
eventTimeTimersByKeyGroup[localIdx] = timers;
}
return timers;
}

/**
* Retrieve the set of processing time timers for the key-group this timer belongs to.
*
* @param timer the timer whose key-group we are searching.
* @return the set of registered timers for the key-group.
*/
private Set<InternalTimer<K, N>> getProcessingTimeTimerSetForTimer(InternalTimer<K, N> timer) {
checkArgument(localKeyGroupRange != null, "The operator has not been initialized.");
int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(timer.getKey(), this.totalKeyGroups);
return getProcessingTimeTimerSetForKeyGroup(keyGroupIdx);
}

/**
* Retrieve the set of processing time timers for the requested key-group.
*
* @param keyGroupIdx the index of the key group we are interested in.
* @return the set of registered timers for the key-group.
*/
private Set<InternalTimer<K, N>> getProcessingTimeTimerSetForKeyGroup(int keyGroupIdx) {
int localIdx = getIndexForKeyGroup(keyGroupIdx);
Set<InternalTimer<K, N>> timers = processingTimeTimersByKeyGroup[localIdx];
if (timers == null) {
timers = new HashSet<>();
processingTimeTimersByKeyGroup[localIdx] = timers;
}
return timers;
}

/**
* Computes the index of the requested key-group in the local datastructures.
* <li/>
* Currently we assume that each task is assigned a continuous range of key-groups,
* e.g. 1,2,3,4, and not 1,3,5. We leverage this to keep the different states by
* key-grouped in arrays instead of maps, where the offset for each key-group is
* the key-group id (an int) minus the id of the first key-group in the local range.
* This is for performance reasons.
*/
private int getIndexForKeyGroup(int keyGroupIdx) {
checkArgument(localKeyGroupRange.contains(keyGroupIdx),
"Key Group " + keyGroupIdx + " does not belong to the local range.");
return keyGroupIdx - this.localKeyGroupRangeStartIdx;
} }


public int numProcessingTimeTimers() { public int numProcessingTimeTimers() {
Expand Down Expand Up @@ -440,17 +322,22 @@ public int numEventTimeTimers(N namespace) {
} }


@VisibleForTesting @VisibleForTesting
public int getLocalKeyGroupRangeStartIdx() { int getLocalKeyGroupRangeStartIdx() {
return this.localKeyGroupRangeStartIdx; return this.localKeyGroupRangeStartIdx;
} }


@VisibleForTesting @VisibleForTesting
public Set<InternalTimer<K, N>>[] getEventTimeTimersPerKeyGroup() { List<Set<InternalTimer<K, N>>> getEventTimeTimersPerKeyGroup() {
return this.eventTimeTimersByKeyGroup; return eventTimeTimersQueue.getTimersByKeyGroup();
} }


@VisibleForTesting @VisibleForTesting
public Set<InternalTimer<K, N>>[] getProcessingTimeTimersPerKeyGroup() { List<Set<InternalTimer<K, N>>> getProcessingTimeTimersPerKeyGroup() {
return this.processingTimeTimersByKeyGroup; return processingTimeTimersQueue.getTimersByKeyGroup();
}

private boolean areSnapshotSerializersIncompatible(InternalTimersSnapshot<?, ?> restoredSnapshot) {
return (this.keyDeserializer != null && !this.keyDeserializer.equals(restoredSnapshot.getKeySerializer())) ||
(this.namespaceDeserializer != null && !this.namespaceDeserializer.equals(restoredSnapshot.getNamespaceSerializer()));
} }
} }
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.KeyGroupsList; import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
Expand All @@ -47,7 +47,7 @@
public class InternalTimeServiceManager<K, N> { public class InternalTimeServiceManager<K, N> {


private final int totalKeyGroups; private final int totalKeyGroups;
private final KeyGroupsList localKeyGroupRange; private final KeyGroupRange localKeyGroupRange;
private final KeyContext keyContext; private final KeyContext keyContext;


private final ProcessingTimeService processingTimeService; private final ProcessingTimeService processingTimeService;
Expand All @@ -56,7 +56,7 @@ public class InternalTimeServiceManager<K, N> {


InternalTimeServiceManager( InternalTimeServiceManager(
int totalKeyGroups, int totalKeyGroups,
KeyGroupsList localKeyGroupRange, KeyGroupRange localKeyGroupRange,
KeyContext keyContext, KeyContext keyContext,
ProcessingTimeService processingTimeService) { ProcessingTimeService processingTimeService) {


Expand Down

0 comments on commit ff0b9c1

Please sign in to comment.