From 0d7c5f3ebf3a721b679cf830b124ffd4d646c8f6 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Tue, 17 Jan 2017 14:26:16 +0100 Subject: [PATCH] [FLINK-5527][query] querying a non-existing key does not return the default value Querying for a non-existing key for a state that has a default value set currently results in an UnknownKeyOrNamespace exception when the MemoryStateBackend or FsStateBackend is used. It should return the default value instead just like the RocksDBStateBackend. --- .../streaming/state/RocksDBValueState.java | 15 ++ .../runtime/state/heap/HeapValueState.java | 29 ++++ .../test/query/QueryableStateITCase.java | 142 ++++++++++++++++++ 3 files changed, 186 insertions(+) diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java index 9563ed80efdb5..c4073959975f6 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java @@ -103,6 +103,21 @@ public void update(V value) throws IOException { } } + /** + * Returns the serialized value for the given key and namespace. + * + *

<p>If no value is associated with key and namespace, the default value + * set via the state descriptor is returned (may be null). + * + * @param serializedKeyAndNamespace + * Serialized key and namespace + * + * @return Serialized value, default value or null if no value + * is associated with the key and namespace. + * + * @throws Exception + * Exceptions during serialization are forwarded + */ @Override public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception { byte[] value = super.getSerializedValue(serializedKeyAndNamespace); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java index cccaacb420475..e519d1a8c69ed 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapValueState.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.util.Preconditions; @@ -109,4 +110,32 @@ public void update(V value) { keyedMap.put(backend.getCurrentKey(), value); } + + /** + * Returns the serialized value for the given key and namespace. + * + *

<p>If no value is associated with key and namespace, the default value + * set via the state descriptor is returned (may be null). + * + * @param serializedKeyAndNamespace + * Serialized key and namespace + * + * @return Serialized value, default value or null if no value + * is associated with the key and namespace. + * + * @throws Exception + * Exceptions during serialization are forwarded + */ + @Override + public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception { + byte[] value = super.getSerializedValue(serializedKeyAndNamespace); + + if (value != null) { + return value; + } else { + return KvStateRequestSerializer + .serializeValue(stateDesc.getDefaultValue(), + stateDesc.getSerializer()); + } + } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java index a5ed6adc3b8b9..4a15fc40bf387 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java @@ -40,6 +40,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.QueryableStateOptions; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; @@ -52,9 +53,12 @@ import org.apache.flink.runtime.query.KvStateMessage; import org.apache.flink.runtime.query.QueryableStateClient; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.VoidNamespace; import 
org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; @@ -68,7 +72,9 @@ import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Deadline; @@ -102,6 +108,9 @@ public class QueryableStateITCase extends TestLogger { private final static int NUM_SLOTS_PER_TM = 4; private final static int NUM_SLOTS = NUM_TMS * NUM_SLOTS_PER_TM; + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + /** * Shared between all the test. Make sure to have at least NUM_SLOTS * available after your test finishes, e.g. cancel the job you submitted. @@ -767,6 +776,139 @@ public Integer getKey(Tuple2 value) throws Exception { } } + /** + * Tests simple value state queryable state instance with a default value + * set using the {@link MemoryStateBackend}. + */ + @Test + public void testValueStateDefaultValueMemoryBackend() throws Exception { + testValueStateDefault(new MemoryStateBackend()); + } + + /** + * Tests simple value state queryable state instance with a default value + * set using the {@link RocksDBStateBackend}. + */ + @Test + public void testValueStateDefaultValueRocksDBBackend() throws Exception { + testValueStateDefault(new RocksDBStateBackend( + temporaryFolder.newFolder().toURI().toString())); + } + + /** + * Tests simple value state queryable state instance with a default value + * set using the {@link FsStateBackend}. 
+ */ + @Test + public void testValueStateDefaultValueFsBackend() throws Exception { + testValueStateDefault(new FsStateBackend( + temporaryFolder.newFolder().toURI().toString())); + } + + /** + * Tests simple value state queryable state instance with a default value + * set. Each source emits (subtaskIndex, 0)..(subtaskIndex, numElements) + * tuples, the key is mapped to 1 but key 0 is queried. + * + * @param stateBackend state back-end to use for the job + */ + void testValueStateDefault(final AbstractStateBackend stateBackend) throws + Exception { + + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final int numElements = 1024; + + final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); + + JobID jobId = null; + try { + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(NUM_SLOTS); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. 
+ env.setRestartStrategy(RestartStrategies + .fixedDelayRestart(Integer.MAX_VALUE, 1000)); + + env.setStateBackend(stateBackend); + + DataStream> source = env + .addSource(new TestAscendingValueSource(numElements)); + + // Value state + ValueStateDescriptor> valueState = + new ValueStateDescriptor<>( + "any", + source.getType(), + Tuple2.of(0, 1337l)); + + // only expose key "1" + QueryableStateStream> + queryableState = + source.keyBy( + new KeySelector, Integer>() { + @Override + public Integer getKey( + Tuple2 value) throws + Exception { + return 1; + } + }).asQueryableState("hakuna", valueState); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); + + // Now query + FiniteDuration retryDelay = + new FiniteDuration(1, TimeUnit.SECONDS); + int key = 0; + final byte[] serializedKey = + KvStateRequestSerializer.serializeKeyAndNamespace( + key, + queryableState.getKeySerializer(), + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE); + + Future future = getKvStateWithRetries(client, + jobId, + queryableState.getQueryableStateName(), + key, + serializedKey, + retryDelay); + + byte[] serializedValue = + Await.result(future, deadline.timeLeft()); + + Tuple2 value = + KvStateRequestSerializer.deserializeValue( + serializedValue, + queryableState.getValueSerializer()); + + assertEquals("Key mismatch", key, value.f0.intValue()); + assertEquals("Value mismatch", 1337l, value.f1.longValue()); + } finally { + // Free cluster resources + if (jobId != null) { + Future cancellation = cluster + .getLeaderGateway(deadline.timeLeft()) + .ask(new JobManagerMessages.CancelJob(jobId), + deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.apply( + CancellationSuccess.class)); + + Await.ready(cancellation, deadline.timeLeft()); + } + + client.shutDown(); + } + } + /** * Tests simple list state queryable state instance. 
Each source emits * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then