From e129cf0a63000aac3af651095cb963ac8b179af7 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Tue, 20 Sep 2016 16:36:16 +0200 Subject: [PATCH] [FLINK-4556] Make Queryable State Key-Group Aware --- .../flink/runtime/query/KvStateLocation.java | 47 +++++----- .../query/KvStateLocationRegistry.java | 16 ++-- .../flink/runtime/query/KvStateMessage.java | 35 ++++---- .../flink/runtime/query/KvStateRegistry.java | 12 +-- .../query/KvStateRegistryListener.java | 9 +- .../runtime/query/QueryableStateClient.java | 4 +- .../runtime/query/TaskKvStateRegistry.java | 17 ++-- .../state/KeyGroupRangeAssignment.java | 15 +++- .../runtime/state/KeyedStateBackend.java | 8 +- .../ActorGatewayKvStateRegistryListener.java | 9 +- .../flink/runtime/jobmanager/JobManager.scala | 4 +- .../runtime/jobmanager/JobManagerTest.java | 20 +++-- .../query/KvStateLocationRegistryTest.java | 35 ++++---- .../runtime/query/KvStateLocationTest.java | 85 +++++++++++++------ .../query/QueryableStateClientTest.java | 10 +-- .../query/netty/KvStateClientTest.java | 2 +- .../query/netty/KvStateServerHandlerTest.java | 10 +-- .../runtime/state/StateBackendTestBase.java | 9 +- .../test/query/QueryableStateITCase.java | 1 - 19 files changed, 207 insertions(+), 141 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java index 9be22c2458daf..90bb2a5ae8881 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocation.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KvState; import org.apache.flink.util.Preconditions; @@ -160,44 +161,50 @@ public KvStateServerAddress getKvStateServerAddress(int keyGroupIndex) { /** * Registers a KvState instance for the given key group index. * - * @param keyGroupIndex Key group index to register + * @param keyGroupRange Key group range to register * @param kvStateId ID of the KvState instance at the key group index. * @param kvStateAddress Server address of the KvState instance at the key group index. - * @throws IndexOutOfBoundsException If key group index < 0 or >= Number of key groups + * @throws IndexOutOfBoundsException If key group range start < 0 or key group range end >= Number of key groups */ - void registerKvState(int keyGroupIndex, KvStateID kvStateId, KvStateServerAddress kvStateAddress) { - if (keyGroupIndex < 0 || keyGroupIndex >= numKeyGroups) { + void registerKvState(KeyGroupRange keyGroupRange, KvStateID kvStateId, KvStateServerAddress kvStateAddress) { + + if (keyGroupRange.getStartKeyGroup() < 0 || keyGroupRange.getEndKeyGroup() >= numKeyGroups) { throw new IndexOutOfBoundsException("Key group index"); } - if (kvStateIds[keyGroupIndex] == null && kvStateAddresses[keyGroupIndex] == null) { - numRegisteredKeyGroups++; - } + for (int kgIdx = keyGroupRange.getStartKeyGroup(); kgIdx <= keyGroupRange.getEndKeyGroup(); ++kgIdx) { + + if (kvStateIds[kgIdx] == null && kvStateAddresses[kgIdx] == null) { + numRegisteredKeyGroups++; + } - kvStateIds[keyGroupIndex] = kvStateId; - kvStateAddresses[keyGroupIndex] = kvStateAddress; + kvStateIds[kgIdx] = kvStateId; + kvStateAddresses[kgIdx] = kvStateAddress; + } } /** * Registers a KvState instance for the given key group index. * - * @param keyGroupIndex Key group index to unregister. - * @throws IndexOutOfBoundsException If key group index < 0 or >= Number of key groups - * @throws IllegalArgumentException If no location information registered for key group index. + * @param keyGroupRange Key group range to unregister. + * @throws IndexOutOfBoundsException If key group range start < 0 or key group range end >= Number of key groups + * @throws IllegalArgumentException If no location information registered for a key group index in the range. */ - void unregisterKvState(int keyGroupIndex) { - if (keyGroupIndex < 0 || keyGroupIndex >= numKeyGroups) { + void unregisterKvState(KeyGroupRange keyGroupRange) { + if (keyGroupRange.getStartKeyGroup() < 0 || keyGroupRange.getEndKeyGroup() >= numKeyGroups) { throw new IndexOutOfBoundsException("Key group index"); } - if (kvStateIds[keyGroupIndex] == null || kvStateAddresses[keyGroupIndex] == null) { - throw new IllegalArgumentException("Not registered. Probably registration/unregistration race."); - } + for (int kgIdx = keyGroupRange.getStartKeyGroup(); kgIdx <= keyGroupRange.getEndKeyGroup(); ++kgIdx) { + if (kvStateIds[kgIdx] == null || kvStateAddresses[kgIdx] == null) { + throw new IllegalArgumentException("Not registered. Probably registration/unregistration race."); + } - numRegisteredKeyGroups--; + numRegisteredKeyGroups--; - kvStateIds[keyGroupIndex] = null; - kvStateAddresses[keyGroupIndex] = null; + kvStateIds[kgIdx] = null; + kvStateAddresses[kgIdx] = null; + } } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java index 5b7659870df26..c48902504c308 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateLocationRegistry.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.execution.SuppressRestartsException; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KvState; import org.apache.flink.util.Preconditions; @@ -73,7 +74,7 @@ public KvStateLocation getKvStateLocation(String registrationName) { * Notifies the registry about a registered KvState instance. * * @param jobVertexId JobVertexID the KvState instance belongs to - * @param keyGroupIndex Key group index the KvState instance belongs to + * @param keyGroupRange Key group range the KvState instance belongs to * @param registrationName Name under which the KvState has been registered * @param kvStateId ID of the registered KvState instance * @param kvStateServerAddress Server address where to find the KvState instance @@ -85,7 +86,7 @@ public KvStateLocation getKvStateLocation(String registrationName) { */ public void notifyKvStateRegistered( JobVertexID jobVertexId, - int keyGroupIndex, + KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, KvStateServerAddress kvStateServerAddress) { @@ -97,7 +98,7 @@ public void notifyKvStateRegistered( ExecutionJobVertex vertex = jobVertices.get(jobVertexId); if (vertex != null) { - int parallelism = vertex.getParallelism(); + int parallelism = vertex.getMaxParallelism(); location = new KvStateLocation(jobId, jobVertexId, parallelism, registrationName); lookupTable.put(registrationName, location); } else { @@ -119,22 +120,21 @@ public void notifyKvStateRegistered( throw duplicate; } - - location.registerKvState(keyGroupIndex, kvStateId, kvStateServerAddress); + location.registerKvState(keyGroupRange, kvStateId, kvStateServerAddress); } /** * Notifies the registry about an unregistered KvState instance. * * @param jobVertexId JobVertexID the KvState instance belongs to - * @param keyGroupIndex Key group index the KvState instance belongs to + * @param keyGroupRange Key group index the KvState instance belongs to * @param registrationName Name under which the KvState has been registered * @throws IllegalArgumentException If another operator registered the state instance * @throws IllegalArgumentException If the registration name is not known */ public void notifyKvStateUnregistered( JobVertexID jobVertexId, - int keyGroupIndex, + KeyGroupRange keyGroupRange, String registrationName) { KvStateLocation location = lookupTable.get(registrationName); @@ -147,7 +147,7 @@ public void notifyKvStateUnregistered( "under '" + registrationName + "'."); } - location.unregisterKvState(keyGroupIndex); + location.unregisterKvState(keyGroupRange); if (location.getNumRegisteredKeyGroups() == 0) { lookupTable.remove(registrationName); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java index 5e3c38e983bdd..857b8b304a743 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateMessage.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KvState; import org.apache.flink.util.Preconditions; @@ -97,8 +98,8 @@ class NotifyKvStateRegistered implements KvStateMessage { /** JobVertexID the KvState instance belongs to. */ private final JobVertexID jobVertexId; - /** Key group index the KvState instance belongs to. */ - private final int keyGroupIndex; + /** Key group range the KvState instance belongs to. */ + private final KeyGroupRange keyGroupRange; /** Name under which the KvState has been registered. */ private final String registrationName; @@ -114,7 +115,7 @@ class NotifyKvStateRegistered implements KvStateMessage { * * @param jobId JobID the KvState instance belongs to * @param jobVertexId JobVertexID the KvState instance belongs to - * @param keyGroupIndex Key group index the KvState instance belongs to + * @param keyGroupRange Key group range the KvState instance belongs to * @param registrationName Name under which the KvState has been registered * @param kvStateId ID of the registered KvState instance * @param kvStateServerAddress Server address where to find the KvState instance @@ -122,15 +123,15 @@ class NotifyKvStateRegistered implements KvStateMessage { public NotifyKvStateRegistered( JobID jobId, JobVertexID jobVertexId, - int keyGroupIndex, + KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, KvStateServerAddress kvStateServerAddress) { this.jobId = Preconditions.checkNotNull(jobId, "JobID"); this.jobVertexId = Preconditions.checkNotNull(jobVertexId, "JobVertexID"); - Preconditions.checkArgument(keyGroupIndex >= 0, "Negative key group index"); - this.keyGroupIndex = keyGroupIndex; + Preconditions.checkArgument(keyGroupRange != KeyGroupRange.EMPTY_KEY_GROUP); + this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange); this.registrationName = Preconditions.checkNotNull(registrationName, "Registration name"); this.kvStateId = Preconditions.checkNotNull(kvStateId, "KvStateID"); this.kvStateServerAddress = Preconditions.checkNotNull(kvStateServerAddress, "KvStateServerAddress"); @@ -159,8 +160,8 @@ public JobVertexID getJobVertexId() { * * @return Key group index the KvState instance belongs to */ - public int getKeyGroupIndex() { - return keyGroupIndex; + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; } /** @@ -195,7 +196,7 @@ public String toString() { return "NotifyKvStateRegistered{" + "jobId=" + jobId + ", jobVertexId=" + jobVertexId + - ", keyGroupIndex=" + keyGroupIndex + + ", keyGroupRange=" + keyGroupRange + ", registrationName='" + registrationName + '\'' + ", kvStateId=" + kvStateId + ", kvStateServerAddress=" + kvStateServerAddress + @@ -214,7 +215,7 @@ class NotifyKvStateUnregistered implements KvStateMessage { private final JobVertexID jobVertexId; /** Key group index the KvState instance belongs to. */ - private final int keyGroupIndex; + private final KeyGroupRange keyGroupRange; /** Name under which the KvState has been registered. */ private final String registrationName; @@ -224,19 +225,19 @@ class NotifyKvStateUnregistered implements KvStateMessage { * * @param jobId JobID the KvState instance belongs to * @param jobVertexId JobVertexID the KvState instance belongs to - * @param keyGroupIndex Key group index the KvState instance belongs to + * @param keyGroupRange Key group range the KvState instance belongs to * @param registrationName Name under which the KvState has been registered */ public NotifyKvStateUnregistered( JobID jobId, JobVertexID jobVertexId, - int keyGroupIndex, + KeyGroupRange keyGroupRange, String registrationName) { this.jobId = Preconditions.checkNotNull(jobId, "JobID"); this.jobVertexId = Preconditions.checkNotNull(jobVertexId, "JobVertexID"); - Preconditions.checkArgument(keyGroupIndex >= 0, "Negative key group index"); - this.keyGroupIndex = keyGroupIndex; + Preconditions.checkArgument(keyGroupRange != KeyGroupRange.EMPTY_KEY_GROUP); + this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange); this.registrationName = Preconditions.checkNotNull(registrationName, "Registration name"); } @@ -263,8 +264,8 @@ public JobVertexID getJobVertexId() { * * @return Key group index the KvState instance belongs to */ - public int getKeyGroupIndex() { - return keyGroupIndex; + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; } /** @@ -281,7 +282,7 @@ public String toString() { return "NotifyKvStateUnregistered{" + "jobId=" + jobId + ", jobVertexId=" + jobVertexId + - ", keyGroupIndex=" + keyGroupIndex + + ", keyGroupRange=" + keyGroupRange + ", registrationName='" + registrationName + '\'' + '}'; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java index f19c123b1080b..f57ae47e516c7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.query.netty.KvStateServer; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KvState; import org.apache.flink.runtime.taskmanager.Task; @@ -80,7 +81,7 @@ public void unregisterListener() { * * @param jobId JobId the KvState instance belongs to * @param jobVertexId JobVertexID the KvState instance belongs to - * @param keyGroupIndex Key group index the KvState instance belongs to + * @param keyGroupRange Key group range the KvState instance belongs to * @param registrationName Name under which the KvState is registered * @param kvState KvState instance to be registered * @return Assigned KvStateID @@ -88,7 +89,7 @@ public void unregisterListener() { public KvStateID registerKvState( JobID jobId, JobVertexID jobVertexId, - int keyGroupIndex, + KeyGroupRange keyGroupRange, String registrationName, KvState kvState) { @@ -100,7 +101,7 @@ public KvStateID registerKvState( listener.notifyKvStateRegistered( jobId, jobVertexId, - keyGroupIndex, + keyGroupRange, registrationName, kvStateId); } @@ -116,11 +117,12 @@ public KvStateID registerKvState( * * @param jobId JobId the KvState instance belongs to * @param kvStateId KvStateID to identify the KvState instance + * @param keyGroupRange Key group range the KvState instance belongs to */ public void unregisterKvState( JobID jobId, JobVertexID jobVertexId, - int keyGroupIndex, + KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) { @@ -130,7 +132,7 @@ public void unregisterKvState( listener.notifyKvStateUnregistered( jobId, jobVertexId, - keyGroupIndex, + keyGroupRange, registrationName); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java index 760adf16d7811..29bee9a92c9a2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistryListener.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.KeyGroupRange; /** * A listener for a {@link KvStateRegistry}. @@ -34,14 +35,14 @@ public interface KvStateRegistryListener { * * @param jobId Job ID the KvState instance belongs to * @param jobVertexId JobVertexID the KvState instance belongs to - * @param keyGroupIndex Key group index the KvState instance belongs to + * @param keyGroupRange Key group range the KvState instance belongs to * @param registrationName Name under which the KvState is registered * @param kvStateId ID of the KvState instance */ void notifyKvStateRegistered( JobID jobId, JobVertexID jobVertexId, - int keyGroupIndex, + KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId); @@ -50,13 +51,13 @@ void notifyKvStateRegistered( * * @param jobId Job ID the KvState instance belongs to * @param jobVertexId JobVertexID the KvState instance belongs to - * @param keyGroupIndex Key group index the KvState instance belongs to + * @param keyGroupRange Key group range the KvState instance belongs to * @param registrationName Name under which the KvState is registered */ void notifyKvStateUnregistered( JobID jobId, JobVertexID jobVertexId, - int keyGroupIndex, + KeyGroupRange keyGroupRange, String registrationName); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java index 0e1ea57f339d5..591c67d24286e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java @@ -36,8 +36,8 @@ import org.apache.flink.runtime.query.netty.KvStateServer; import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace; import org.apache.flink.runtime.query.netty.UnknownKvStateID; +import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.util.LeaderRetrievalUtils; -import org.apache.flink.util.MathUtils; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -296,7 +296,7 @@ private Future getKvState( .flatMap(new Mapper>() { @Override public Future apply(KvStateLocation lookup) { - int keyGroupIndex = MathUtils.murmurHash(keyHashCode) % lookup.getNumKeyGroups(); + int keyGroupIndex = KeyGroupRangeAssignment.computeKeyGroupForKeyHash(keyHashCode, lookup.getNumKeyGroups()); KvStateServerAddress serverAddress = lookup.getKvStateServerAddress(keyGroupIndex); if (serverAddress == null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java index b5c09aa1009f8..d83121432e081 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/TaskKvStateRegistry.java @@ -20,6 +20,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KvState; import org.apache.flink.util.Preconditions; @@ -52,15 +53,15 @@ public class TaskKvStateRegistry { /** * Registers the KvState instance at the KvStateRegistry. * - * @param keyGroupIndex KeyGroupIndex the KvState instance belongs to + * @param keyGroupRange Key group range the KvState instance belongs to * @param registrationName The registration name (not necessarily the same * as the KvState name defined in the state * descriptor used to create the KvState instance) * @param kvState The */ - public void registerKvState(int keyGroupIndex, String registrationName, KvState kvState) { - KvStateID kvStateId = registry.registerKvState(jobId, jobVertexId, keyGroupIndex, registrationName, kvState); - registeredKvStates.add(new KvStateInfo(keyGroupIndex, registrationName, kvStateId)); + public void registerKvState(KeyGroupRange keyGroupRange, String registrationName, KvState kvState) { + KvStateID kvStateId = registry.registerKvState(jobId, jobVertexId, keyGroupRange, registrationName, kvState); + registeredKvStates.add(new KvStateInfo(keyGroupRange, registrationName, kvStateId)); } /** @@ -68,7 +69,7 @@ public void registerKvState(int keyGroupIndex, String registrationName, KvState< */ public void unregisterAll() { for (KvStateInfo kvState : registeredKvStates) { - registry.unregisterKvState(jobId, jobVertexId, kvState.keyGroupIndex, kvState.registrationName, kvState.kvStateId); + registry.unregisterKvState(jobId, jobVertexId, kvState.keyGroupRange, kvState.registrationName, kvState.kvStateId); } } @@ -77,14 +78,14 @@ public void unregisterAll() { */ private static class KvStateInfo { - private final int keyGroupIndex; + private final KeyGroupRange keyGroupRange; private final String registrationName; private final KvStateID kvStateId; - public KvStateInfo(int keyGroupIndex, String registrationName, KvStateID kvStateId) { - this.keyGroupIndex = keyGroupIndex; + public KvStateInfo(KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) { + this.keyGroupRange = keyGroupRange; this.registrationName = registrationName; this.kvStateId = kvStateId; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java index eceb6f4cfcde4..894f721acf482 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupRangeAssignment.java @@ -48,8 +48,19 @@ public static int assignKeyToParallelOperator(Object key, int maxParallelism, in * @param maxParallelism the maximum supported parallelism, aka the number of key-groups. * @return the key-group to which the given key is assigned */ - public static final int assignToKeyGroup(Object key, int maxParallelism) { - return MathUtils.murmurHash(key.hashCode()) % maxParallelism; + public static int assignToKeyGroup(Object key, int maxParallelism) { + return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism); + } + + /** + * Assigns the given key to a key-group index. + * + * @param keyHash the hash of the key to assign + * @param maxParallelism the maximum supported parallelism, aka the number of key-groups. + * @return the key-group to which the given key is assigned + */ + public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) { + return MathUtils.murmurHash(keyHash) % maxParallelism; } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java index bf9018e911c63..74731ee67687d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateBackend.java @@ -265,8 +265,7 @@ public FoldingState createFoldingState(FoldingStateDescriptor snapshot( long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception; + + + public KeyGroupRange getKeyGroupRange() { + return keyGroupRange; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java index 2d6993856d11b..4404867acfa11 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayKvStateRegistryListener.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.query.KvStateMessage; import org.apache.flink.runtime.query.KvStateRegistryListener; import org.apache.flink.runtime.query.KvStateServerAddress; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.util.Preconditions; /** @@ -49,14 +50,14 @@ public ActorGatewayKvStateRegistryListener( public void notifyKvStateRegistered( JobID jobId, JobVertexID jobVertexId, - int keyGroupIndex, + KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) { Object msg = new KvStateMessage.NotifyKvStateRegistered( jobId, jobVertexId, - keyGroupIndex, + keyGroupRange, registrationName, kvStateId, kvStateServerAddress); @@ -68,13 +69,13 @@ public void notifyKvStateRegistered( public void notifyKvStateUnregistered( JobID jobId, JobVertexID jobVertexId, - int keyGroupIndex, + KeyGroupRange keyGroupRange, String registrationName) { Object msg = new KvStateMessage.NotifyKvStateUnregistered( jobId, jobVertexId, - keyGroupIndex, + keyGroupRange, registrationName); jobManager.tell(msg); diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 9c844baa0add8..914cc2ce2d0a7 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -1500,7 +1500,7 @@ class JobManager( graph.getKvStateLocationRegistry.notifyKvStateRegistered( msg.getJobVertexId, - msg.getKeyGroupIndex, + msg.getKeyGroupRange, msg.getRegistrationName, msg.getKvStateId, msg.getKvStateServerAddress) @@ -1519,7 +1519,7 @@ class JobManager( try { graph.getKvStateLocationRegistry.notifyKvStateUnregistered( msg.getJobVertexId, - msg.getKeyGroupIndex, + msg.getKeyGroupRange, msg.getRegistrationName) } catch { case t: Throwable => diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java index f925d628ca338..d731b95d9974a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java @@ -58,6 +58,7 @@ import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateUnregistered; import org.apache.flink.runtime.query.KvStateServerAddress; import org.apache.flink.runtime.query.UnknownKvStateLocation; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManager; @@ -467,7 +468,7 @@ public void testKvStateMessages() throws Exception { NotifyKvStateRegistered registerNonExistingJob = new NotifyKvStateRegistered( new JobID(), new JobVertexID(), - 0, + new KeyGroupRange(0, 0), "any-name", new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 1233)); @@ -492,7 +493,7 @@ public void testKvStateMessages() throws Exception { NotifyKvStateRegistered registerForExistingJob = new NotifyKvStateRegistered( jobGraph.getJobID(), jobVertex1.getID(), - 0, + new KeyGroupRange(0, 0), "register-me", new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 1293)); @@ -512,11 +513,12 @@ public void testKvStateMessages() throws Exception { assertEquals(jobGraph.getJobID(), location.getJobId()); assertEquals(jobVertex1.getID(), location.getJobVertexId()); - assertEquals(jobVertex1.getParallelism(), location.getNumKeyGroups()); + assertEquals(jobVertex1.getMaxParallelism(), location.getNumKeyGroups()); assertEquals(1, location.getNumRegisteredKeyGroups()); - int keyGroupIndex = registerForExistingJob.getKeyGroupIndex(); - assertEquals(registerForExistingJob.getKvStateId(), location.getKvStateID(keyGroupIndex)); - assertEquals(registerForExistingJob.getKvStateServerAddress(), location.getKvStateServerAddress(keyGroupIndex)); + KeyGroupRange keyGroupRange = registerForExistingJob.getKeyGroupRange(); + assertEquals(1, keyGroupRange.getNumberOfKeyGroups()); + assertEquals(registerForExistingJob.getKvStateId(), location.getKvStateID(keyGroupRange.getStartKeyGroup())); + assertEquals(registerForExistingJob.getKvStateServerAddress(), location.getKvStateServerAddress(keyGroupRange.getStartKeyGroup())); // // Unregistration @@ -524,7 +526,7 @@ public void testKvStateMessages() throws Exception { NotifyKvStateUnregistered unregister = new NotifyKvStateUnregistered( registerForExistingJob.getJobId(), registerForExistingJob.getJobVertexId(), - registerForExistingJob.getKeyGroupIndex(), + registerForExistingJob.getKeyGroupRange(), registerForExistingJob.getRegistrationName()); jobManager.tell(unregister); @@ -546,7 +548,7 @@ public void testKvStateMessages() throws Exception { NotifyKvStateRegistered register = new NotifyKvStateRegistered( jobGraph.getJobID(), jobVertex1.getID(), - 0, + new KeyGroupRange(0, 0), "duplicate-me", new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 1293)); @@ -554,7 +556,7 @@ public void testKvStateMessages() throws Exception { NotifyKvStateRegistered duplicate = new NotifyKvStateRegistered( jobGraph.getJobID(), jobVertex2.getID(), // <--- different operator, but... - 0, + new KeyGroupRange(0, 0), "duplicate-me", // ...same name new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 1293)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java index 70f0ba29fd1ba..f8005a4279406 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationRegistryTest.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.KeyGroupRange; import org.junit.Test; import java.net.InetAddress; @@ -52,8 +53,8 @@ public void testRegisterAndLookup() throws Exception { // IDs for each key group of each vertex KvStateID[][] ids = new KvStateID[vertices.length][]; for (int i = 0; i < ids.length; i++) { - ids[i] = new KvStateID[vertices[i].getParallelism()]; - for (int j = 0; j < vertices[i].getParallelism(); j++) { + ids[i] = new KvStateID[vertices[i].getMaxParallelism()]; + for (int j = 0; j < vertices[i].getMaxParallelism(); j++) { ids[i][j] = new KvStateID(); } } @@ -66,12 +67,12 @@ public void testRegisterAndLookup() throws Exception { // Register for (int i = 0; i < vertices.length; i++) { - int numKeyGroups = vertices[i].getParallelism(); + int numKeyGroups = vertices[i].getMaxParallelism(); for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) { // Register registry.notifyKvStateRegistered( vertices[i].getJobVertexId(), - keyGroupIndex, + new KeyGroupRange(keyGroupIndex, keyGroupIndex), registrationNames[i], ids[i][keyGroupIndex], server); @@ -83,8 +84,8 @@ public void testRegisterAndLookup() throws Exception { KvStateLocation location = registry.getKvStateLocation(registrationNames[i]); assertNotNull(location); - int parallelism = vertices[i].getParallelism(); - for (int keyGroupIndex = 0; keyGroupIndex < parallelism; keyGroupIndex++) { + int maxParallelism = vertices[i].getMaxParallelism(); + for (int keyGroupIndex = 0; keyGroupIndex < maxParallelism; keyGroupIndex++) { assertEquals(ids[i][keyGroupIndex], location.getKvStateID(keyGroupIndex)); assertEquals(server, location.getKvStateServerAddress(keyGroupIndex)); } @@ -92,10 +93,10 @@ public void testRegisterAndLookup() throws Exception { // Unregister for (int i = 0; i < vertices.length; i++) { - int numKeyGroups = vertices[i].getParallelism(); + int numKeyGroups = vertices[i].getMaxParallelism(); JobVertexID jobVertexId = vertices[i].getJobVertexId(); for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) { - registry.notifyKvStateUnregistered(jobVertexId, keyGroupIndex, registrationNames[i]); + registry.notifyKvStateUnregistered(jobVertexId, new KeyGroupRange(keyGroupIndex, keyGroupIndex), registrationNames[i]); } } @@ -121,7 +122,7 @@ public void testRegisterDuplicateName() throws Exception { // First operator registers registry.notifyKvStateRegistered( vertices[0].getJobVertexId(), - 0, + new KeyGroupRange(0, 0), registrationName, new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 12328)); @@ -130,7 +131,7 @@ public void testRegisterDuplicateName() throws Exception { // Second operator registers same name registry.notifyKvStateRegistered( vertices[1].getJobVertexId(), - 0, + new KeyGroupRange(0, 0), registrationName, new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 12032)); @@ -151,7 +152,7 @@ public void testUnregisterBeforeRegister() throws Exception { KvStateLocationRegistry registry = new KvStateLocationRegistry(new JobID(), vertexMap); try { - registry.notifyKvStateUnregistered(vertex.getJobVertexId(), 0, "any-name"); + registry.notifyKvStateUnregistered(vertex.getJobVertexId(), new KeyGroupRange(0, 0), "any-name"); fail("Did not throw expected Exception, because of missing registration"); } catch (IllegalArgumentException ignored) { // Expected @@ -179,7 +180,7 @@ public void testUnregisterFailures() throws Exception { // First operator registers name registry.notifyKvStateRegistered( vertices[0].getJobVertexId(), - 0, + new KeyGroupRange(0, 0), name, new KvStateID(), mock(KvStateServerAddress.class)); @@ -190,7 +191,7 @@ public void testUnregisterFailures() throws Exception { registry.notifyKvStateUnregistered( vertices[0].getJobVertexId(), - notRegisteredKeyGroupIndex, + new KeyGroupRange(notRegisteredKeyGroupIndex, notRegisteredKeyGroupIndex), name); fail("Did not throw expected Exception"); @@ -201,7 +202,7 @@ public void testUnregisterFailures() throws Exception { // Wrong operator tries to unregister registry.notifyKvStateUnregistered( vertices[1].getJobVertexId(), - 0, + new KeyGroupRange(0, 0), name); fail("Did not throw expected Exception"); @@ -210,13 +211,13 @@ public void testUnregisterFailures() throws Exception { } // ------------------------------------------------------------------------ - - private ExecutionJobVertex createJobVertex(int parallelism) { + + private ExecutionJobVertex createJobVertex(int maxParallelism) { JobVertexID id = new JobVertexID(); ExecutionJobVertex vertex = mock(ExecutionJobVertex.class); when(vertex.getJobVertexId()).thenReturn(id); - when(vertex.getParallelism()).thenReturn(parallelism); + when(vertex.getMaxParallelism()).thenReturn(maxParallelism); return vertex; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java index 59ac575002344..ed51f624aa5d0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/KvStateLocationTest.java @@ -20,9 +20,12 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.state.KeyGroupRange; import org.junit.Test; import java.net.InetAddress; +import java.util.ArrayList; +import java.util.List; import static org.junit.Assert.assertEquals; @@ -36,55 +39,87 @@ public void testRegisterAndLookup() throws Exception { JobID jobId = new JobID(); JobVertexID jobVertexId = new JobVertexID(); int numKeyGroups = 123; + int numRanges = 10; + int fract = numKeyGroups / numRanges; + int remain = numKeyGroups % numRanges; + List keyGroupRanges = new ArrayList<>(numRanges); + + int start = 0; + for (int i = 0; i < numRanges; ++i) { + int end = start + fract - 1; + if(remain > 0) { + --remain; + ++end; + } + KeyGroupRange range = new KeyGroupRange(start, end); + keyGroupRanges.add(range); + start = end + 1; + } + + System.out.println(keyGroupRanges); + String registrationName = "asdasdasdasd"; KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numKeyGroups, registrationName); - KvStateID[] kvStateIds = new KvStateID[numKeyGroups]; - KvStateServerAddress[] serverAddresses = new KvStateServerAddress[numKeyGroups]; + KvStateID[] kvStateIds = new KvStateID[numRanges]; + KvStateServerAddress[] serverAddresses = new KvStateServerAddress[numRanges]; InetAddress host = InetAddress.getLocalHost(); // Register - for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) { - kvStateIds[keyGroupIndex] = new KvStateID(); - serverAddresses[keyGroupIndex] = new KvStateServerAddress(host, 1024 + keyGroupIndex); - - location.registerKvState(keyGroupIndex, kvStateIds[keyGroupIndex], serverAddresses[keyGroupIndex]); - assertEquals(keyGroupIndex + 1, location.getNumRegisteredKeyGroups()); + int registeredCount = 0; + for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) { + kvStateIds[rangeIdx] = new KvStateID(); + serverAddresses[rangeIdx] = new KvStateServerAddress(host, 1024 + rangeIdx); + KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx); + location.registerKvState(keyGroupRange, kvStateIds[rangeIdx], serverAddresses[rangeIdx]); + registeredCount += keyGroupRange.getNumberOfKeyGroups(); + assertEquals(registeredCount, location.getNumRegisteredKeyGroups()); } // Lookup - for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) { - assertEquals(kvStateIds[keyGroupIndex], location.getKvStateID(keyGroupIndex)); - assertEquals(serverAddresses[keyGroupIndex], location.getKvStateServerAddress(keyGroupIndex)); + for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) { + KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx); + for(int keyGroup = keyGroupRange.getStartKeyGroup(); keyGroup <= keyGroupRange.getEndKeyGroup(); ++keyGroup) { + assertEquals(kvStateIds[rangeIdx], location.getKvStateID(keyGroup)); + assertEquals(serverAddresses[rangeIdx], location.getKvStateServerAddress(keyGroup)); + } } // Overwrite - for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) { - kvStateIds[keyGroupIndex] = new KvStateID(); - serverAddresses[keyGroupIndex] = new KvStateServerAddress(host, 1024 + keyGroupIndex); + for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) { + kvStateIds[rangeIdx] = new KvStateID(); + serverAddresses[rangeIdx] = new KvStateServerAddress(host, 1024 + rangeIdx); - location.registerKvState(keyGroupIndex, kvStateIds[keyGroupIndex], serverAddresses[keyGroupIndex]); - assertEquals(numKeyGroups, location.getNumRegisteredKeyGroups()); + location.registerKvState(keyGroupRanges.get(rangeIdx), kvStateIds[rangeIdx], serverAddresses[rangeIdx]); + assertEquals(registeredCount, location.getNumRegisteredKeyGroups()); } // Lookup - for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) { - assertEquals(kvStateIds[keyGroupIndex], location.getKvStateID(keyGroupIndex)); - assertEquals(serverAddresses[keyGroupIndex], location.getKvStateServerAddress(keyGroupIndex)); + for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) { + KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx); + for(int keyGroup = keyGroupRange.getStartKeyGroup(); keyGroup <= keyGroupRange.getEndKeyGroup(); ++keyGroup) { + assertEquals(kvStateIds[rangeIdx], location.getKvStateID(keyGroup)); + assertEquals(serverAddresses[rangeIdx], location.getKvStateServerAddress(keyGroup)); + } } // Unregister - for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) { - location.unregisterKvState(keyGroupIndex); - assertEquals(numKeyGroups - keyGroupIndex - 1, location.getNumRegisteredKeyGroups()); + for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) { + KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx); + location.unregisterKvState(keyGroupRange); + registeredCount -= keyGroupRange.getNumberOfKeyGroups(); + assertEquals(registeredCount, location.getNumRegisteredKeyGroups()); } // Lookup - for (int keyGroupIndex = 0; keyGroupIndex < numKeyGroups; keyGroupIndex++) { - assertEquals(null, location.getKvStateID(keyGroupIndex)); - assertEquals(null, location.getKvStateServerAddress(keyGroupIndex)); + for (int rangeIdx = 0; rangeIdx < numRanges; rangeIdx++) { + KeyGroupRange keyGroupRange = keyGroupRanges.get(rangeIdx); + for(int keyGroup = keyGroupRange.getStartKeyGroup(); keyGroup <= keyGroupRange.getEndKeyGroup(); ++keyGroup) { + assertEquals(null, location.getKvStateID(keyGroup)); + assertEquals(null, location.getKvStateServerAddress(keyGroup)); + } } assertEquals(0, location.getNumRegisteredKeyGroups()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java index 405f9622348f7..1039568b1295b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java @@ -154,7 +154,7 @@ public void testForceLookupOnOutdatedLocation() throws Exception { KvStateServerAddress serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 12323); KvStateLocation location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query3); for (int i = 0; i < numKeyGroups; i++) { - location.registerKvState(i, kvStateId, serverAddress); + location.registerKvState(new KeyGroupRange(i, i), kvStateId, serverAddress); } when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query3))) @@ -184,7 +184,7 @@ public void testForceLookupOnOutdatedLocation() throws Exception { serverAddress = new KvStateServerAddress(InetAddress.getLocalHost(), 11123); location = new KvStateLocation(jobId, new JobVertexID(), numKeyGroups, query4); for (int i = 0; i < numKeyGroups; i++) { - location.registerKvState(i, kvStateId, serverAddress); + location.registerKvState(new KeyGroupRange(i, i), kvStateId, serverAddress); } when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query4))) @@ -281,7 +281,7 @@ public void testIntegrationWithKvStateServer() throws Exception { kvStateIds[i] = registries[i].registerKvState( jobId, new JobVertexID(), - i, // key group index + new KeyGroupRange(i, i), "choco", kvState); } @@ -302,7 +302,7 @@ public void testIntegrationWithKvStateServer() throws Exception { // Location lookup service KvStateLocation location = new KvStateLocation(jobId, jobVertexId, numServers, "choco"); for (int keyGroupIndex = 0; keyGroupIndex < numServers; keyGroupIndex++) { - location.registerKvState(keyGroupIndex, kvStateIds[keyGroupIndex], servers[keyGroupIndex].getAddress()); + location.registerKvState(new KeyGroupRange(keyGroupIndex, keyGroupIndex), kvStateIds[keyGroupIndex], servers[keyGroupIndex].getAddress()); } KvStateLocationLookupService lookupService = mock(KvStateLocationLookupService.class); @@ -385,7 +385,7 @@ public void testLookupMultipleJobIds() throws Exception { // Exact contents don't matter here KvStateLocation location = new KvStateLocation(new JobID(), new JobVertexID(), 1, name); - location.registerKvState(0, new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 892)); + location.registerKvState(new KeyGroupRange(0, 0), new KvStateID(), new KvStateServerAddress(InetAddress.getLocalHost(), 892)); JobID jobId1 = new JobID(); JobID jobId2 = new JobID(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java index f78517467a07c..c8fb4bb4d73d0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateClientTest.java @@ -595,7 +595,7 @@ public void testClientServerIntegration() throws Exception { KvState kvState = (KvState) state; // Register KvState (one state instance for all server) - ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), 0, "any", kvState); + ids[i] = registry[i].registerKvState(new JobID(), new JobVertexID(), new KeyGroupRange(0, 0), "any", kvState); } final KvStateClient finalClient = client; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java index 52c807f1fedd5..7e6d7135d7cc0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/KvStateServerHandlerTest.java @@ -278,7 +278,7 @@ public void testFailureOnGetSerializedValue() throws Exception { KvStateID kvStateId = registry.registerKvState( new JobID(), new JobVertexID(), - 0, + new KeyGroupRange(0, 0), "vanilla", kvState); @@ -681,18 +681,18 @@ private ChannelHandler getFrameDecoder() { */ static class TestRegistryListener implements KvStateRegistryListener { volatile JobVertexID jobVertexID; - volatile int keyGroupIndex; + volatile KeyGroupRange keyGroupIndex; volatile String registrationName; volatile KvStateID kvStateId; @Override public void notifyKvStateRegistered(JobID jobId, JobVertexID jobVertexId, - int keyGroupIndex, + KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId) { this.jobVertexID = jobVertexId; - this.keyGroupIndex = keyGroupIndex; + this.keyGroupIndex = keyGroupRange; this.registrationName = registrationName; this.kvStateId = kvStateId; } @@ -700,7 +700,7 @@ public void notifyKvStateRegistered(JobID jobId, @Override public void notifyKvStateUnregistered(JobID jobId, JobVertexID jobVertexId, - int keyGroupIndex, + KeyGroupRange keyGroupRange, String registrationName) { } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 33ec182406e56..73e28088fbe36 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -1108,6 +1108,7 @@ public void testQueryableStateRegistration() throws Exception { CheckpointStreamFactory streamFactory = createStreamFactory(); KeyedStateBackend backend = createKeyedBackend(IntSerializer.INSTANCE, env); + KeyGroupRange expectedKeyGroupRange = backend.getKeyGroupRange(); KvStateRegistryListener listener = mock(KvStateRegistryListener.class); registry.registerListener(listener); @@ -1122,7 +1123,7 @@ public void testQueryableStateRegistration() throws Exception { // Verify registered verify(listener, times(1)).notifyKvStateRegistered( - eq(env.getJobID()), eq(env.getJobVertexId()), eq(0), eq("banana"), any(KvStateID.class)); + eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class)); KeyGroupsStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory)); @@ -1130,7 +1131,7 @@ public void testQueryableStateRegistration() throws Exception { backend.close(); verify(listener, times(1)).notifyKvStateUnregistered( - eq(env.getJobID()), eq(env.getJobVertexId()), eq(0), eq("banana")); + eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana")); backend.close(); // Initialize again backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot, env); @@ -1140,12 +1141,12 @@ public void testQueryableStateRegistration() throws Exception { // Verify registered again verify(listener, times(2)).notifyKvStateRegistered( - eq(env.getJobID()), eq(env.getJobVertexId()), eq(0), eq("banana"), any(KvStateID.class)); + eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class)); backend.close(); } - + @Test public void testEmptyStateCheckpointing() { diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java index b99858a0dd40f..1259460970982 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java @@ -93,7 +93,6 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -@Ignore public class QueryableStateITCase extends TestLogger { private final static FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS);