From 6c05abe3433070afe0931fff213bd3ce9da2b116 Mon Sep 17 00:00:00 2001 From: kkloudas Date: Mon, 19 Jun 2017 15:01:40 +0200 Subject: [PATCH] [FLINK-7044] [qs] Allow to specify namespace and descriptor in query. --- .../runtime/query/QueryableStateClient.java | 198 +++++++++++++ .../runtime/state/VoidNamespaceTypeInfo.java | 102 +++++++ .../query/QueryableStateClientTest.java | 67 +++-- .../state/VoidNamespaceTypeInfoTest.java | 33 +++ .../query/AbstractQueryableStateITCase.java | 267 +++++++++++------- 5 files changed, 550 insertions(+), 117 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfo.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfoTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java index 003d803df14b1..306333a92fd59 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateClient.java @@ -22,8 +22,14 @@ import akka.dispatch.Futures; import akka.dispatch.Mapper; import akka.dispatch.Recover; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.base.ListSerializer; import org.apache.flink.configuration.AkkaOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; @@ -38,7 +44,10 @@ import org.apache.flink.runtime.query.netty.KvStateServer; import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace; import org.apache.flink.runtime.query.netty.UnknownKvStateID; +import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceTypeInfo; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +59,7 @@ import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; +import java.io.IOException; import java.net.ConnectException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -95,6 +105,8 @@ public class QueryableStateClient { /** This is != null, if we started the actor system. */ private final ActorSystem actorSystem; + private ExecutionConfig executionConfig; + /** * Creates a client from the given configuration. * @@ -157,10 +169,21 @@ public QueryableStateClient( this.lookupService = lookupService; this.kvStateClient = networkClient; this.executionContext = actorSystem.dispatcher(); + this.executionConfig = new ExecutionConfig(); this.lookupService.start(); } + /** Gets the {@link ExecutionConfig}. */ + public ExecutionConfig getExecutionConfig() { + return executionConfig; + } + + /** Sets the {@link ExecutionConfig}. */ + public void setExecutionConfig(ExecutionConfig config) { + this.executionConfig = config; + } + /** * Creates a client. * @@ -266,6 +289,181 @@ public Future recover(Throwable failure) throws Throwable { }, executionContext); } + /** + * Returns a future holding the request result. + * + *

If the server does not serve a KvState instance with the given ID, + * the Future will be failed with a {@link UnknownKvStateID}. + * + *

If the KvState instance does not hold any data for the given key + * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. + * + *

All other failures are forwarded to the Future. + * + * @param jobId JobID of the job the queryable state belongs to. + * @param queryableStateName Name under which the state is queryable. + * @param key The key we are interested in. + * @param keyTypeHint A {@link TypeHint} used to extract the type of the key. + * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. + * @return Future holding the result. + */ + @PublicEvolving + public Future getKvState( + final JobID jobId, + final String queryableStateName, + final K key, + final TypeHint keyTypeHint, + final StateDescriptor stateDescriptor) { + + Preconditions.checkNotNull(keyTypeHint); + + TypeInformation keyTypeInfo = keyTypeHint.getTypeInfo(); + return getKvState(jobId, queryableStateName, key, keyTypeInfo, stateDescriptor); + } + + /** + * Returns a future holding the request result. + * + *

If the server does not serve a KvState instance with the given ID, + * the Future will be failed with a {@link UnknownKvStateID}. + * + *

If the KvState instance does not hold any data for the given key + * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. + * + *

All other failures are forwarded to the Future. + * + * @param jobId JobID of the job the queryable state belongs to. + * @param queryableStateName Name under which the state is queryable. + * @param key The key we are interested in. + * @param keyTypeInfo The {@link TypeInformation} of the key. + * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. + * @return Future holding the result. + */ + @PublicEvolving + public Future getKvState( + final JobID jobId, + final String queryableStateName, + final K key, + final TypeInformation keyTypeInfo, + final StateDescriptor stateDescriptor) { + + Preconditions.checkNotNull(keyTypeInfo); + + return getKvState(jobId, queryableStateName, key, VoidNamespace.INSTANCE, + keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor); + } + + /** + * Returns a future holding the request result. + * + *

If the server does not serve a KvState instance with the given ID, + * the Future will be failed with a {@link UnknownKvStateID}. + * + *

If the KvState instance does not hold any data for the given key + * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. + * + *

All other failures are forwarded to the Future. + * + * @param jobId JobID of the job the queryable state belongs to. + * @param queryableStateName Name under which the state is queryable. + * @param key The key that the state we request is associated with. + * @param namespace The namespace of the state. + * @param keyTypeInfo The {@link TypeInformation} of the keys. + * @param namespaceTypeInfo The {@link TypeInformation} of the namespace. + * @param stateDescriptor The {@link StateDescriptor} of the state we want to query. + * @return Future holding the result. + */ + @PublicEvolving + public Future getKvState( + final JobID jobId, + final String queryableStateName, + final K key, + final N namespace, + final TypeInformation keyTypeInfo, + final TypeInformation namespaceTypeInfo, + final StateDescriptor stateDescriptor) { + + Preconditions.checkNotNull(stateDescriptor); + + // initialize the value serializer based on the execution config. + stateDescriptor.initializeSerializerUnlessSet(executionConfig); + TypeSerializer stateSerializer = stateDescriptor.getSerializer(); + + return getKvState(jobId, queryableStateName, key, + namespace, keyTypeInfo, namespaceTypeInfo, stateSerializer); + } + + /** + * Returns a future holding the request result. + * + *

If the server does not serve a KvState instance with the given ID, + * the Future will be failed with a {@link UnknownKvStateID}. + * + *

If the KvState instance does not hold any data for the given key + * and namespace, the Future will be failed with a {@link UnknownKeyOrNamespace}. + * + *

All other failures are forwarded to the Future. + * + * @param jobId JobID of the job the queryable state belongs to. + * @param queryableStateName Name under which the state is queryable. + * @param key The key that the state we request is associated with. + * @param namespace The namespace of the state. + * @param keyTypeInfo The {@link TypeInformation} of the keys. + * @param namespaceTypeInfo The {@link TypeInformation} of the namespace. + * @param stateSerializer The {@link TypeSerializer} of the state we want to query. + * @return Future holding the result. + */ + @PublicEvolving + public Future getKvState( + final JobID jobId, + final String queryableStateName, + final K key, + final N namespace, + final TypeInformation keyTypeInfo, + final TypeInformation namespaceTypeInfo, + final TypeSerializer stateSerializer) { + + Preconditions.checkNotNull(queryableStateName); + + Preconditions.checkNotNull(key); + Preconditions.checkNotNull(namespace); + + Preconditions.checkNotNull(keyTypeInfo); + Preconditions.checkNotNull(namespaceTypeInfo); + Preconditions.checkNotNull(stateSerializer); + + if (stateSerializer instanceof ListSerializer) { + throw new IllegalArgumentException("ListState is not supported out-of-the-box yet."); + } + + TypeSerializer keySerializer = keyTypeInfo.createSerializer(executionConfig); + TypeSerializer namespaceSerializer = namespaceTypeInfo.createSerializer(executionConfig); + + final byte[] serializedKeyAndNamespace; + try { + serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace( + key, + keySerializer, + namespace, + namespaceSerializer); + } catch (IOException e) { + return Futures.failed(e); + } + + return getKvState(jobId, queryableStateName, key.hashCode(), serializedKeyAndNamespace) + .flatMap(new Mapper>() { + @Override + public Future apply(byte[] parameter) { + try { + return Futures.successful( + KvStateRequestSerializer.deserializeValue(parameter, stateSerializer)); + } catch (IOException e) { + return Futures.failed(e); + } + } + }, executionContext); + } + /** * Returns a future holding the serialized request result. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfo.java new file mode 100644 index 0000000000000..9917410a51b6a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfo.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; + +/** + * {@link TypeInformation} for {@link VoidNamespace}. + */ +@Public +public class VoidNamespaceTypeInfo extends TypeInformation { + + private static final long serialVersionUID = 5453679706408610586L; + + public static final VoidNamespaceTypeInfo INSTANCE = new VoidNamespaceTypeInfo(); + + @Override + @PublicEvolving + public boolean isBasicType() { + return false; + } + + @Override + @PublicEvolving + public boolean isTupleType() { + return false; + } + + @Override + @PublicEvolving + public int getArity() { + return 0; + } + + @Override + @PublicEvolving + public int getTotalFields() { + return 0; + } + + @Override + @PublicEvolving + public Class getTypeClass() { + return VoidNamespace.class; + } + + @Override + @PublicEvolving + public boolean isKeyType() { + return false; + } + + @Override + @PublicEvolving + public TypeSerializer createSerializer(ExecutionConfig config) { + return VoidNamespaceSerializer.INSTANCE; + } + + @Override + @PublicEvolving + public String toString() { + return "VoidNamespaceTypeInfo"; + } + + @Override + @PublicEvolving + public boolean equals(Object obj) { + return this == obj || obj instanceof VoidNamespaceTypeInfo; + } + + @Override + @PublicEvolving + public int hashCode() { + return 0; + } + + @Override + @PublicEvolving + public boolean canEqual(Object obj) { + return obj instanceof VoidNamespaceTypeInfo; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java index 8c4e049eac473..0806ba88640c1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/QueryableStateClientTest.java @@ -22,6 +22,7 @@ import akka.dispatch.Futures; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; @@ -31,7 +32,6 @@ import org.apache.flink.runtime.query.netty.KvStateClient; import org.apache.flink.runtime.query.netty.KvStateServer; import org.apache.flink.runtime.query.netty.UnknownKvStateID; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo; @@ -108,11 +108,12 @@ public void testForceLookupOnOutdatedLocation() throws Exception { when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query1))) .thenReturn(unknownKvStateLocation); - Future result = client.getKvState( + Future result = client.getKvState( jobId, query1, 0, - new byte[0]); + BasicTypeInfo.INT_TYPE_INFO, + new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); try { Await.result(result, timeout); @@ -134,7 +135,12 @@ public void testForceLookupOnOutdatedLocation() throws Exception { when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query2))) .thenReturn(unknownKeyGroupLocation); - result = client.getKvState(jobId, query2, 0, new byte[0]); + result = client.getKvState( + jobId, + query2, + 0, + BasicTypeInfo.INT_TYPE_INFO, + new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); try { Await.result(result, timeout); @@ -164,7 +170,12 @@ public void testForceLookupOnOutdatedLocation() throws Exception { when(networkClient.getKvState(eq(serverAddress), eq(kvStateId), any(byte[].class))) .thenReturn(unknownKvStateId); - result = client.getKvState(jobId, query3, 0, new byte[0]); + result = client.getKvState( + jobId, + query3, + 0, + BasicTypeInfo.INT_TYPE_INFO, + new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); try { Await.result(result, timeout); @@ -194,7 +205,12 @@ public void testForceLookupOnOutdatedLocation() throws Exception { when(networkClient.getKvState(eq(serverAddress), eq(kvStateId), any(byte[].class))) .thenReturn(connectException); - result = client.getKvState(jobId, query4, 0, new byte[0]); + result = client.getKvState( + jobId, + query4, + 0, + BasicTypeInfo.INT_TYPE_INFO, + new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); try { Await.result(result, timeout); @@ -213,7 +229,12 @@ public void testForceLookupOnOutdatedLocation() throws Exception { when(lookupService.getKvStateLookupInfo(eq(jobId), eq(query5))) .thenReturn(exception); - client.getKvState(jobId, query5, 0, new byte[0]); + client.getKvState( + jobId, + query5, + 0, + BasicTypeInfo.INT_TYPE_INFO, + new ValueStateDescriptor<>("test", IntSerializer.INSTANCE)); verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId), eq(query5)); } finally { @@ -279,7 +300,7 @@ public void testIntegrationWithKvStateServer() throws Exception { // Register state HeapValueState kvState = new HeapValueState<>( descriptor, - new NestedMapsStateTable(keyedStateBackend, registeredKeyedBackendStateMetaInfo), + new NestedMapsStateTable<>(keyedStateBackend, registeredKeyedBackendStateMetaInfo), IntSerializer.INSTANCE, VoidNamespaceSerializer.INSTANCE); @@ -322,25 +343,25 @@ public void testIntegrationWithKvStateServer() throws Exception { client = new QueryableStateClient(lookupService, networkClient, testActorSystem.dispatcher()); // Send all queries - List> futures = new ArrayList<>(numKeys); + List> futures = new ArrayList<>(numKeys); for (int key = 0; key < numKeys; key++) { - byte[] serializedKeyAndNamespace = KvStateRequestSerializer.serializeKeyAndNamespace( + ValueStateDescriptor descriptor = + new ValueStateDescriptor<>("any", IntSerializer.INSTANCE); + futures.add(client.getKvState( + jobId, + "choco", key, - IntSerializer.INSTANCE, - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE); - - futures.add(client.getKvState(jobId, "choco", key, serializedKeyAndNamespace)); + BasicTypeInfo.INT_TYPE_INFO, + descriptor)); } // Verify results - Future> future = Futures.sequence(futures, testActorSystem.dispatcher()); - Iterable results = Await.result(future, timeout); + Future> future = Futures.sequence(futures, testActorSystem.dispatcher()); + Iterable results = Await.result(future, timeout); int index = 0; - for (byte[] buffer : results) { - int deserializedValue = KvStateRequestSerializer.deserializeValue(buffer, IntSerializer.INSTANCE); - assertEquals(1337 + index, deserializedValue); + for (int buffer : results) { + assertEquals(1337 + index, buffer); index++; } @@ -411,10 +432,12 @@ public void testLookupMultipleJobIds() throws Exception { networkClient, testActorSystem.dispatcher()); + ValueStateDescriptor stateDesc = new ValueStateDescriptor<>("test", IntSerializer.INSTANCE); + // Query ies with same name, but different job IDs should lead to a // single lookup per query and job ID. - client.getKvState(jobId1, name, 0, new byte[0]); - client.getKvState(jobId2, name, 0, new byte[0]); + client.getKvState(jobId1, name, 0, BasicTypeInfo.INT_TYPE_INFO, stateDesc); + client.getKvState(jobId2, name, 0, BasicTypeInfo.INT_TYPE_INFO, stateDesc); verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId1), eq(name)); verify(lookupService, times(1)).getKvStateLookupInfo(eq(jobId2), eq(name)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfoTest.java new file mode 100644 index 0000000000000..a4329deec44dd --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfoTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.common.typeinfo.IntegerTypeInfo; +import org.apache.flink.api.common.typeutils.TypeInformationTestBase; + +/** + * Test for {@link IntegerTypeInfo}. + */ +public class VoidNamespaceTypeInfoTest extends TypeInformationTestBase { + + @Override + protected VoidNamespaceTypeInfo[] getTestData() { + return new VoidNamespaceTypeInfo[] { VoidNamespaceTypeInfo.INSTANCE }; + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java index f07113d40f183..f012a479b826c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java @@ -29,7 +29,11 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.state.FoldingStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; +import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; @@ -45,11 +49,10 @@ import org.apache.flink.runtime.messages.JobManagerMessages.JobFound; import org.apache.flink.runtime.query.QueryableStateClient; import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.VoidNamespace; -import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.VoidNamespaceTypeInfo; import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.QueryableStateStream; @@ -86,14 +89,14 @@ */ public abstract class AbstractQueryableStateITCase extends TestLogger { - private final static FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS); - private final static FiniteDuration QUERY_RETRY_DELAY = new FiniteDuration(100, TimeUnit.MILLISECONDS); + private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS); + private static final FiniteDuration QUERY_RETRY_DELAY = new FiniteDuration(100, TimeUnit.MILLISECONDS); private static ActorSystem TEST_ACTOR_SYSTEM; - private final static int NUM_TMS = 2; - private final static int NUM_SLOTS_PER_TM = 4; - private final static int NUM_SLOTS = NUM_TMS * NUM_SLOTS_PER_TM; + private static final int NUM_TMS = 2; + private static final int NUM_SLOTS_PER_TM = 4; + private static final int NUM_SLOTS = NUM_TMS * NUM_SLOTS_PER_TM; /** * State backend to use. @@ -201,6 +204,8 @@ public void testQueryableState() throws Exception { final QueryableStateStream> queryableState = source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = 7143749578983540352L; + @Override public Integer getKey(Tuple2 value) throws Exception { return value.f0; @@ -222,7 +227,7 @@ public Integer getKey(Tuple2 value) throws Exception { while (!allNonZero && deadline.hasTimeLeft()) { allNonZero = true; - final List> futures = new ArrayList<>(numKeys); + final List>> futures = new ArrayList<>(numKeys); for (int i = 0; i < numKeys; i++) { final int key = i; @@ -234,38 +239,28 @@ public Integer getKey(Tuple2 value) throws Exception { allNonZero = false; } - final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace( - key, - queryableState.getKeySerializer(), - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE); - - Future serializedResult = getKvStateWithRetries( + Future> result = getKvStateWithRetries( client, jobId, queryName, key, - serializedKey, + BasicTypeInfo.INT_TYPE_INFO, + reducingState, QUERY_RETRY_DELAY, false); - serializedResult.onSuccess(new OnSuccess() { + result.onSuccess(new OnSuccess>() { @Override - public void onSuccess(byte[] result) throws Throwable { - Tuple2 value = KvStateRequestSerializer.deserializeValue( - result, - queryableState.getValueSerializer()); - - counts.set(key, value.f1); - - assertEquals("Key mismatch", key, value.f0.intValue()); + public void onSuccess(Tuple2 result) throws Throwable { + counts.set(key, result.f1); + assertEquals("Key mismatch", key, result.f0.intValue()); } }, TEST_ACTOR_SYSTEM.dispatcher()); - futures.add(serializedResult); + futures.add(result); } - Future> futureSequence = Futures.sequence( + Future>> futureSequence = Futures.sequence( futures, TEST_ACTOR_SYSTEM.dispatcher()); @@ -330,6 +325,8 @@ public void testDuplicateRegistrationFailsJob() throws Exception { final QueryableStateStream> queryableState = source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = -4126824763829132959L; + @Override public Integer getKey(Tuple2 value) throws Exception { return value.f0; @@ -338,6 +335,8 @@ public Integer getKey(Tuple2 value) throws Exception { final QueryableStateStream> duplicate = source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = -6265024000462809436L; + @Override public Integer getKey(Tuple2 value) throws Exception { return value.f0; @@ -422,6 +421,8 @@ public void testValueState() throws Exception { QueryableStateStream> queryableState = source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = 7662520075515707428L; + @Override public Integer getKey(Tuple2 value) throws Exception { return value.f0; @@ -437,8 +438,7 @@ public Integer getKey(Tuple2 value) throws Exception { // Now query long expected = numElements; - executeValueQuery(deadline, client, jobId, queryableState, - expected); + executeQuery(deadline, client, jobId, "hakuna", valueState, expected); } finally { // Free cluster resources if (jobId != null) { @@ -490,6 +490,8 @@ public void testQueryNonStartedJobState() throws Exception { QueryableStateStream> queryableState = source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = 7480503339992214681L; + @Override public Integer getKey(Tuple2 value) throws Exception { return value.f0; @@ -504,17 +506,18 @@ public Integer getKey(Tuple2 value) throws Exception { long expected = numElements; // query once - client.getKvState(jobId, queryableState.getQueryableStateName(), 0, - KvStateRequestSerializer.serializeKeyAndNamespace( + client.getKvState( + jobId, + queryableState.getQueryableStateName(), 0, - queryableState.getKeySerializer(), VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE)); + BasicTypeInfo.INT_TYPE_INFO, + VoidNamespaceTypeInfo.INSTANCE, + valueState); cluster.submitJobDetached(jobGraph); - executeValueQuery(deadline, client, jobId, queryableState, - expected); + executeQuery(deadline, client, jobId, "hakuna", valueState, expected); } finally { // Free cluster resources if (jobId != null) { @@ -534,33 +537,66 @@ public Integer getKey(Tuple2 value) throws Exception { * Retry a query for state for keys between 0 and {@link #NUM_SLOTS} until * expected equals the value of the result tuple's second field. */ - private void executeValueQuery(final Deadline deadline, - final QueryableStateClient client, final JobID jobId, - final QueryableStateStream> queryableState, - final long expected) throws Exception { + private void executeQuery( + final Deadline deadline, + final QueryableStateClient client, + final JobID jobId, + final String queryableStateName, + final StateDescriptor> stateDescriptor, + final long expected) throws Exception { for (int key = 0; key < NUM_SLOTS; key++) { - final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace( - key, - queryableState.getKeySerializer(), - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE); - boolean success = false; while (deadline.hasTimeLeft() && !success) { - Future future = getKvStateWithRetries(client, + Future> future = getKvStateWithRetries(client, jobId, - queryableState.getQueryableStateName(), + queryableStateName, key, - serializedKey, + BasicTypeInfo.INT_TYPE_INFO, + stateDescriptor, QUERY_RETRY_DELAY, false); - byte[] serializedValue = Await.result(future, deadline.timeLeft()); + Tuple2 value = Await.result(future, deadline.timeLeft()); + + assertEquals("Key mismatch", key, value.f0.intValue()); + if (expected == value.f1) { + success = true; + } else { + // Retry + Thread.sleep(50); + } + } + + assertTrue("Did not succeed query", success); + } + } + + /** + * Retry a query for state for keys between 0 and {@link #NUM_SLOTS} until + * expected equals the value of the result tuple's second field. + */ + private void executeQuery( + final Deadline deadline, + final QueryableStateClient client, + final JobID jobId, + final String queryableStateName, + final TypeSerializer> valueSerializer, + final long expected) throws Exception { + + for (int key = 0; key < NUM_SLOTS; key++) { + boolean success = false; + while (deadline.hasTimeLeft() && !success) { + Future> future = getKvStateWithRetries(client, + jobId, + queryableStateName, + key, + BasicTypeInfo.INT_TYPE_INFO, + valueSerializer, + QUERY_RETRY_DELAY, + false); - Tuple2 value = KvStateRequestSerializer.deserializeValue( - serializedValue, - queryableState.getValueSerializer()); + Tuple2 value = Await.result(future, deadline.timeLeft()); assertEquals("Key mismatch", key, value.f0.intValue()); if (expected == value.f1) { @@ -623,6 +659,8 @@ public void testValueStateDefault() throws queryableState = source.keyBy( new KeySelector, Integer>() { + private static final long serialVersionUID = 4509274556892655887L; + @Override public Integer getKey( Tuple2 value) throws @@ -639,18 +677,12 @@ public Integer getKey( // Now query int key = 0; - final byte[] serializedKey = - KvStateRequestSerializer.serializeKeyAndNamespace( - key, - queryableState.getKeySerializer(), - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE); - - Future future = getKvStateWithRetries(client, + Future> future = getKvStateWithRetries(client, jobId, queryableState.getQueryableStateName(), key, - serializedKey, + BasicTypeInfo.INT_TYPE_INFO, + valueState, QUERY_RETRY_DELAY, true); @@ -707,6 +739,8 @@ public void testValueStateShortcut() throws Exception { // Value state shortcut QueryableStateStream> queryableState = source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = 9168901838808830068L; + @Override public Integer getKey(Tuple2 value) throws Exception { return value.f0; @@ -722,8 +756,8 @@ public Integer getKey(Tuple2 value) throws Exception { // Now query long expected = numElements; - executeValueQuery(deadline, client, jobId, queryableState, - expected); + executeQuery(deadline, client, jobId, "matata", + queryableState.getValueSerializer(), expected); } finally { // Free cluster resources if (jobId != null) { @@ -780,6 +814,8 @@ public void testFoldingState() throws Exception { QueryableStateStream queryableState = source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = -842809958106747539L; + @Override public Integer getKey(Tuple2 value) throws Exception { return value.f0; @@ -796,28 +832,18 @@ public Integer getKey(Tuple2 value) throws Exception { String expected = Integer.toString(numElements * (numElements + 1) / 2); for (int key = 0; key < NUM_SLOTS; key++) { - final byte[] serializedKey = KvStateRequestSerializer.serializeKeyAndNamespace( - key, - queryableState.getKeySerializer(), - VoidNamespace.INSTANCE, - VoidNamespaceSerializer.INSTANCE); - boolean success = false; while (deadline.hasTimeLeft() && !success) { - Future future = getKvStateWithRetries(client, + Future future = getKvStateWithRetries(client, jobId, queryableState.getQueryableStateName(), key, - serializedKey, + BasicTypeInfo.INT_TYPE_INFO, + foldingState, QUERY_RETRY_DELAY, false); - byte[] serializedValue = Await.result(future, deadline.timeLeft()); - - String value = KvStateRequestSerializer.deserializeValue( - serializedValue, - queryableState.getValueSerializer()); - + String value = Await.result(future, deadline.timeLeft()); if (expected.equals(value)) { success = true; } else { @@ -882,6 +908,8 @@ public void testReducingState() throws Exception { QueryableStateStream> queryableState = source.keyBy(new KeySelector, Integer>() { + private static final long serialVersionUID = 8470749712274833552L; + @Override public Integer getKey(Tuple2 value) throws Exception { return value.f0; @@ -899,8 +927,7 @@ public Integer getKey(Tuple2 value) throws Exception { // Now query long expected = numElements * (numElements + 1) / 2; - executeValueQuery(deadline, client, jobId, queryableState, - expected); + executeQuery(deadline, client, jobId, "jungle", reducingState, expected); } finally { // Free cluster resources if (jobId != null) { @@ -916,23 +943,23 @@ public Integer getKey(Tuple2 value) throws Exception { } } - @SuppressWarnings("unchecked") - private static Future getKvStateWithRetries( + private static Future getKvStateWithRetries( final QueryableStateClient client, final JobID jobId, final String queryName, - final int key, - final byte[] serializedKey, + final K key, + final TypeInformation keyTypeInfo, + final TypeSerializer valueTypeSerializer, final FiniteDuration retryDelay, - final boolean failForUknownKeyOrNamespace) { + final boolean failForUnknownKeyOrNamespace) { - return client.getKvState(jobId, queryName, key, serializedKey) - .recoverWith(new Recover>() { + return client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, valueTypeSerializer) + .recoverWith(new Recover>() { @Override - public Future recover(Throwable failure) throws Throwable { + public Future recover(Throwable failure) throws Throwable { if (failure instanceof AssertionError) { return Futures.failed(failure); - } else if (failForUknownKeyOrNamespace && + } else if (failForUnknownKeyOrNamespace && (failure instanceof UnknownKeyOrNamespace)) { return Futures.failed(failure); } else { @@ -943,17 +970,65 @@ public Future recover(Throwable failure) throws Throwable { retryDelay, TEST_ACTOR_SYSTEM.scheduler(), TEST_ACTOR_SYSTEM.dispatcher(), - new Callable>() { + new Callable>() { @Override - public Future call() throws Exception { + public Future call() throws Exception { return getKvStateWithRetries( client, jobId, queryName, key, - serializedKey, + keyTypeInfo, + valueTypeSerializer, retryDelay, - failForUknownKeyOrNamespace); + failForUnknownKeyOrNamespace); + } + }); + } + } + }, TEST_ACTOR_SYSTEM.dispatcher()); + + } + + private static Future getKvStateWithRetries( + final QueryableStateClient client, + final JobID jobId, + final String queryName, + final K key, + final TypeInformation keyTypeInfo, + final StateDescriptor stateDescriptor, + final FiniteDuration retryDelay, + final boolean failForUnknownKeyOrNamespace) { + + return client.getKvState(jobId, queryName, key, VoidNamespace.INSTANCE, keyTypeInfo, VoidNamespaceTypeInfo.INSTANCE, stateDescriptor) + .recoverWith(new Recover>() { + @Override + public Future recover(Throwable failure) throws Throwable { + if (failure instanceof AssertionError) { + return Futures.failed(failure); + } else if (failForUnknownKeyOrNamespace && + (failure instanceof UnknownKeyOrNamespace)) { + return Futures.failed(failure); + } else { + // At startup some failures are expected + // due to races. Make sure that they don't + // fail this test. + return Patterns.after( + retryDelay, + TEST_ACTOR_SYSTEM.scheduler(), + TEST_ACTOR_SYSTEM.dispatcher(), + new Callable>() { + @Override + public Future call() throws Exception { + return getKvStateWithRetries( + client, + jobId, + queryName, + key, + keyTypeInfo, + stateDescriptor, + retryDelay, + failForUnknownKeyOrNamespace); } }); } @@ -970,10 +1045,12 @@ public Future call() throws Exception { */ private static class TestAscendingValueSource extends RichParallelSourceFunction> { + private static final long serialVersionUID = 1459935229498173245L; + private final long maxValue; private volatile boolean isRunning = true; - public TestAscendingValueSource(long maxValue) { + TestAscendingValueSource(long maxValue) { Preconditions.checkArgument(maxValue >= 0); this.maxValue = maxValue; } @@ -1024,12 +1101,12 @@ private static class TestKeyRangeSource extends RichParallelSourceFunction