Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -957,4 +957,10 @@ public int numEventTimeTimers() {
return timeServiceManager == null ? 0 :
timeServiceManager.numEventTimeTimers();
}

@VisibleForTesting
public int numKeysForWatermarkCallback() {
return timeServiceManager == null ? 0 :
timeServiceManager.numKeysForWatermarkCallback();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,4 +188,9 @@ public int numEventTimeTimers() {
}
return count;
}

@VisibleForTesting
public int numKeysForWatermarkCallback() {
return watermarkCallbackService.numKeysForWatermarkCallback();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.operators;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
Expand All @@ -38,8 +39,8 @@
* The watermark callback service allows to register a {@link OnWatermarkCallback OnWatermarkCallback}
* and multiple keys, for which the callback will be invoked every time a new {@link Watermark} is received
* (after the registration of the key).
* <p>
* <b>NOTE: </b> This service is only available to <b>keyed</b> operators.
*
* <p><b>NOTE: </b> This service is only available to <b>keyed</b> operators.
*
* @param <K> The type of key returned by the {@code KeySelector}.
*/
Expand All @@ -58,7 +59,17 @@ public class InternalWatermarkCallbackService<K> {
* An array of sets of keys keeping the registered keys split
* by the key-group they belong to. Each key-group has one set.
*/
private final Set<K>[] keysByKeygroup;
private final Set<K>[] registeredKeysByKeyGroup;

/**
* An array of sets of keys keeping the keys "to delete" split
* by the key-group they belong to. Each key-group has one set.
*
* <p>This is used to avoid potential concurrent modification
* exception when deleting keys from inside the
* {@link #invokeOnWatermarkCallback(Watermark)}.
*/
private final Set<K>[] deletedKeysByKeyGroup;

/** A serializer for the registered keys. */
private TypeSerializer<K> keySerializer;
Expand All @@ -84,7 +95,8 @@ public InternalWatermarkCallbackService(int totalKeyGroups, KeyGroupsList localK

// the list of ids of the key-groups this task is responsible for
int localKeyGroups = this.localKeyGroupRange.getNumberOfKeyGroups();
this.keysByKeygroup = new Set[localKeyGroups];
this.registeredKeysByKeyGroup = new Set[localKeyGroups];
this.deletedKeysByKeyGroup = new Set[localKeyGroups];
}

/**
Expand All @@ -110,7 +122,7 @@ public void setWatermarkCallback(OnWatermarkCallback<K> watermarkCallback, TypeS
* @param key The key to be registered.
*/
public boolean registerKeyForWatermarkCallback(K key) {
return getKeySetForKeyGroup(key).add(key);
return getRegisteredKeysForKeyGroup(key).add(key);
}

/**
Expand All @@ -119,13 +131,7 @@ public boolean registerKeyForWatermarkCallback(K key) {
* @param key The key to be unregistered.
*/
public boolean unregisterKeyFromWatermarkCallback(K key) {
Set<K> keys = getKeySetForKeyGroup(key);
boolean res = keys.remove(key);

if (keys.isEmpty()) {
removeKeySetForKey(key);
}
return res;
return getDeletedKeysForKeyGroup(key).add(key);
}

/**
Expand All @@ -134,8 +140,11 @@ public boolean unregisterKeyFromWatermarkCallback(K key) {
* @param watermark The watermark that triggered the invocation.
*/
public void invokeOnWatermarkCallback(Watermark watermark) throws IOException {
// clean up any keys registered for deletion before calling the callback
cleanupRegisteredKeys();

if (callback != null) {
for (Set<K> keySet : keysByKeygroup) {
for (Set<K> keySet : registeredKeysByKeyGroup) {
if (keySet != null) {
for (K key : keySet) {
keyContext.setCurrentKey(key);
Expand All @@ -146,16 +155,39 @@ public void invokeOnWatermarkCallback(Watermark watermark) throws IOException {
}
}

/**
* Does the actual deletion of any keys registered for deletion using the
* {@link #unregisterKeyFromWatermarkCallback(Object)}.
*/
private void cleanupRegisteredKeys() {
for (int keyGroupIdx = 0; keyGroupIdx < registeredKeysByKeyGroup.length; keyGroupIdx++) {

Set<K> deletedKeys = deletedKeysByKeyGroup[keyGroupIdx];
if (deletedKeys != null) {

Set<K> registeredKeys = registeredKeysByKeyGroup[keyGroupIdx];
if (registeredKeys != null) {

registeredKeys.removeAll(deletedKeys);
if (registeredKeys.isEmpty()) {
registeredKeysByKeyGroup[keyGroupIdx] = null;
}
}
deletedKeysByKeyGroup[keyGroupIdx] = null;
}
}
}

/**
* Retrieve the set of keys for the key-group this key belongs to.
*
* @param key the key whose key-group we are searching.
* @return the set of registered keys for the key-group.
*/
private Set<K> getKeySetForKeyGroup(K key) {
private Set<K> getRegisteredKeysForKeyGroup(K key) {
checkArgument(localKeyGroupRange != null, "The operator has not been initialized.");
int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(key, totalKeyGroups);
return getKeySetForKeyGroup(keyGroupIdx);
return getRegisteredKeysForKeyGroup(keyGroupIdx);
}

/**
Expand All @@ -164,27 +196,36 @@ private Set<K> getKeySetForKeyGroup(K key) {
* @param keyGroupIdx the index of the key group we are interested in.
* @return the set of keys for the key-group.
*/
private Set<K> getKeySetForKeyGroup(int keyGroupIdx) {
private Set<K> getRegisteredKeysForKeyGroup(int keyGroupIdx) {
int localIdx = getIndexForKeyGroup(keyGroupIdx);
Set<K> keys = keysByKeygroup[localIdx];
Set<K> keys = registeredKeysByKeyGroup[localIdx];
if (keys == null) {
keys = new HashSet<>();
keysByKeygroup[localIdx] = keys;
registeredKeysByKeyGroup[localIdx] = keys;
}
return keys;
}

private void removeKeySetForKey(K key) {
private Set<K> getDeletedKeysForKeyGroup(K key) {
checkArgument(localKeyGroupRange != null, "The operator has not been initialized.");
int keyGroupIdx = KeyGroupRangeAssignment.assignToKeyGroup(key, totalKeyGroups);
int localKeyGroupIdx = getIndexForKeyGroup(keyGroupIdx);
keysByKeygroup[localKeyGroupIdx] = null;
return getDeletedKeysForKeyGroup(keyGroupIdx);
}

private Set<K> getDeletedKeysForKeyGroup(int keyGroupIdx) {
int localIdx = getIndexForKeyGroup(keyGroupIdx);
Set<K> keys = deletedKeysByKeyGroup[localIdx];
if (keys == null) {
keys = new HashSet<>();
deletedKeysByKeyGroup[localIdx] = keys;
}
return keys;
}

/**
* Computes the index of the requested key-group in the local datastructures.
* <li/>
* Currently we assume that each task is assigned a continuous range of key-groups,
*
* <p>Currently we assume that each task is assigned a continuous range of key-groups,
* e.g. 1,2,3,4, and not 1,3,5. We leverage this to keep the different states
* key-grouped in arrays instead of maps, where the offset for each key-group is
* the key-group id (an int) minus the id of the first key-group in the local range.
Expand All @@ -199,7 +240,11 @@ private int getIndexForKeyGroup(int keyGroupIdx) {
////////////////// Fault Tolerance Methods ///////////////////

public void snapshotKeysForKeyGroup(DataOutputViewStreamWrapper stream, int keyGroupIdx) throws Exception {
Set<K> keySet = getKeySetForKeyGroup(keyGroupIdx);

// we cleanup also here to avoid checkpointing the deletion set
cleanupRegisteredKeys();

Set<K> keySet = getRegisteredKeysForKeyGroup(keyGroupIdx);
if (keySet != null) {
stream.writeInt(keySet.size());

Expand All @@ -224,16 +269,29 @@ public void restoreKeysForKeyGroup(DataInputViewStreamWrapper stream, int keyGro
TypeSerializer<K> tmpKeyDeserializer = InstantiationUtil.deserializeObject(stream, userCodeClassLoader);

if (keySerializer != null && !keySerializer.equals(tmpKeyDeserializer)) {
throw new IllegalArgumentException("Tried to restore timers " +
"for the same service with different serializers.");
throw new IllegalArgumentException("Tried to restore keys " +
"for the watermark callback service with mismatching serializers.");
}

this.keySerializer = tmpKeyDeserializer;

Set<K> keys = getKeySetForKeyGroup(keyGroupIdx);
Set<K> keys = getRegisteredKeysForKeyGroup(keyGroupIdx);
for (int i = 0; i < numKeys; i++) {
keys.add(keySerializer.deserialize(stream));
}
}
}

////////////////// Testing Methods ///////////////////

@VisibleForTesting
public int numKeysForWatermarkCallback() {
int count = 0;
for (Set<K> keyGroup: registeredKeysByKeyGroup) {
if (keyGroup != null) {
count += keyGroup.size();
}
}
return count;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,78 @@ public void testWatermarkCallbackServiceScalingDown() throws Exception {
testHarness5.close();
}

@Test
public void testWatermarkCallbackServiceKeyDeletion() throws Exception {
final int MAX_PARALLELISM = 10;

Tuple2<Integer, String> element1 = new Tuple2<>(7, "start");
Tuple2<Integer, String> element2 = new Tuple2<>(45, "start");
Tuple2<Integer, String> element3 = new Tuple2<>(90, "start");

TestOperatorWithDeletingKeyCallback op = new TestOperatorWithDeletingKeyCallback(45);

KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, Integer> testHarness1 =
new KeyedOneInputStreamOperatorTestHarness<>(
op,
new TestKeySelector(),
BasicTypeInfo.INT_TYPE_INFO,
MAX_PARALLELISM,
1,
0);
testHarness1.open();

testHarness1.processElement(new StreamRecord<>(element1));
testHarness1.processElement(new StreamRecord<>(element2));

testHarness1.processWatermark(10L);

assertEquals(3L, testHarness1.getOutput().size());
verifyElement(testHarness1.getOutput().poll(), 7);
verifyElement(testHarness1.getOutput().poll(), 45);
verifyWatermark(testHarness1.getOutput().poll(), 10);

testHarness1.processElement(new StreamRecord<>(element3));
testHarness1.processWatermark(20L);

// because at the first watermark the operator removed key 45
assertEquals(3L, testHarness1.getOutput().size());
verifyElement(testHarness1.getOutput().poll(), 7);
verifyElement(testHarness1.getOutput().poll(), 90);
verifyWatermark(testHarness1.getOutput().poll(), 20);

testHarness1.processWatermark(25L);

verifyElement(testHarness1.getOutput().poll(), 7);
verifyElement(testHarness1.getOutput().poll(), 90);
verifyWatermark(testHarness1.getOutput().poll(), 25);

// unregister key and then fail
op.unregisterKey(90);

// take a snapshot with some elements in internal sorting queue
OperatorStateHandles snapshot = testHarness1.snapshot(0, 0);
testHarness1.close();

testHarness1 = new KeyedOneInputStreamOperatorTestHarness<>(
new TestOperatorWithDeletingKeyCallback(45),
new TestKeySelector(),
BasicTypeInfo.INT_TYPE_INFO,
MAX_PARALLELISM,
1,
0);
testHarness1.setup();
testHarness1.initializeState(snapshot);
testHarness1.open();

testHarness1.processWatermark(30L);

assertEquals(2L, testHarness1.getOutput().size());
verifyElement(testHarness1.getOutput().poll(), 7);
verifyWatermark(testHarness1.getOutput().poll(), 30);

testHarness1.close();
}

private KeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, Integer> getTestHarness(
int maxParallelism, int noOfTasks, int taskIdx) throws Exception {

Expand Down Expand Up @@ -868,6 +940,51 @@ public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws
}
}

private static class TestOperatorWithDeletingKeyCallback
extends AbstractStreamOperator<Integer>
implements OneInputStreamOperator<Tuple2<Integer, String>, Integer> {

private static final long serialVersionUID = 9215057823264582305L;

private final int keyToDelete;

public TestOperatorWithDeletingKeyCallback(int keyToDelete) {
this.keyToDelete = keyToDelete;
}

@Override
public void open() throws Exception {
super.open();

InternalWatermarkCallbackService<Integer> callbackService = getInternalWatermarkCallbackService();

callbackService.setWatermarkCallback(new OnWatermarkCallback<Integer>() {

@Override
public void onWatermark(Integer integer, Watermark watermark) throws IOException {

// this is to simulate the case where we may have a concurrent modification
// exception as we iterate over the list of registered keys and we concurrently
// delete the key.

if (integer.equals(keyToDelete)) {
getInternalWatermarkCallbackService().unregisterKeyFromWatermarkCallback(integer);
}
output.collect(new StreamRecord<>(integer));
}
}, IntSerializer.INSTANCE);
}

@Override
public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(element.getValue().f0);
}

public void unregisterKey(int key) {
getInternalWatermarkCallbackService().unregisterKeyFromWatermarkCallback(key);
}
}

/**
* Testing operator that can respond to commands by either setting/deleting state, emitting
* state or setting timers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,15 @@ public int numEventTimeTimers() {
}
}

@VisibleForTesting
public int numKeysForWatermarkCallback() {
if (operator instanceof AbstractStreamOperator) {
return ((AbstractStreamOperator) operator).numKeysForWatermarkCallback();
} else {
throw new UnsupportedOperationException();
}
}

private class MockOutput implements Output<StreamRecord<OUT>> {

private TypeSerializer<OUT> outputSerializer;
Expand Down