From 9c1a247ada0015ff1b97c6017e8c1de874ba6d17 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Tue, 17 Jan 2017 14:26:16 +0100 Subject: [PATCH] [FLINK-5613][query] querying a non-existing key is inconsistent among state backends Querying for a non-existing key for a state that has a default value set currently results in an UnknownKeyOrNamespace exception when the MemoryStateBackend or FsStateBackend is used but results in the default value if RocksDBStateBackend is set. This removes the special handling from the RocksDBStateBackend and makes it consistent with the other two back-ends, i.e. returning null which results in the mentioned UnknownKeyOrNamespace exception. --- .../streaming/state/RocksDBValueState.java | 12 -- .../test/query/QueryableStateITCase.java | 160 +++++++++++++++++- 2 files changed, 153 insertions(+), 19 deletions(-) diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java index 7724f02bf4225..b2a4fba265cfb 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.state.internal.InternalValueState; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; @@ -103,15 +102,4 @@ public void update(V value) throws IOException { throw new RuntimeException("Error while adding data to RocksDB", e); } } - - @Override - public byte[] 
getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception { - byte[] value = super.getSerializedValue(serializedKeyAndNamespace); - - if (value != null) { - return value; - } else { - return KvStateRequestSerializer.serializeValue(stateDesc.getDefaultValue(), stateDesc.getSerializer()); - } - } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java index 327a715080dc5..c2df6aed23c01 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java @@ -39,6 +39,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.QueryableStateOptions; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; @@ -50,10 +51,14 @@ import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateMessage; import org.apache.flink.runtime.query.QueryableStateClient; +import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; @@ -67,7 +72,9 @@ import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Deadline; @@ -102,6 +109,9 @@ public class QueryableStateITCase extends TestLogger { private final static int NUM_SLOTS_PER_TM = 4; private final static int NUM_SLOTS = NUM_TMS * NUM_SLOTS_PER_TM; + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + /** * Shared between all the test. Make sure to have at least NUM_SLOTS * available after your test finishes, e.g. cancel the job you submitted. @@ -229,7 +239,8 @@ public Integer getKey(Tuple2 value) throws Exception { queryName, key, serializedKey, - QUERY_RETRY_DELAY); + QUERY_RETRY_DELAY, + false); serializedResult.onSuccess(new OnSuccess() { @Override @@ -352,7 +363,8 @@ public Integer getKey(Tuple2 value) throws Exception { queryName, key, serializedKey, - QUERY_RETRY_DELAY); + QUERY_RETRY_DELAY, + false); byte[] serializedResult = Await.result(serializedResultFuture, deadline.timeLeft()); @@ -455,7 +467,8 @@ public ExecutionGraph apply(ExecutionGraphFound found) { queryName, key, serializedKey, - QUERY_RETRY_DELAY); + QUERY_RETRY_DELAY, + false); byte[] serializedResult = Await.result(serializedResultFuture, deadline.timeLeft()); @@ -731,7 +744,8 @@ private void executeValueQuery(final Deadline deadline, queryableState.getQueryableStateName(), key, serializedKey, - QUERY_RETRY_DELAY); + QUERY_RETRY_DELAY, + false); byte[] serializedValue = Await.result(future, deadline.timeLeft()); @@ -752,6 +766,132 @@ private void executeValueQuery(final Deadline deadline, } } + /** + * Tests simple value state queryable state instance with a default value + * set using the {@link MemoryStateBackend}. 
+ */ + @Test(expected = UnknownKeyOrNamespace.class) + public void testValueStateDefaultValueMemoryBackend() throws Exception { + testValueStateDefault(new MemoryStateBackend()); + } + + /** + * Tests simple value state queryable state instance with a default value + * set using the {@link RocksDBStateBackend}. + */ + @Test(expected = UnknownKeyOrNamespace.class) + public void testValueStateDefaultValueRocksDBBackend() throws Exception { + testValueStateDefault(new RocksDBStateBackend( + temporaryFolder.newFolder().toURI().toString())); + } + + /** + * Tests simple value state queryable state instance with a default value + * set using the {@link FsStateBackend}. + */ + @Test(expected = UnknownKeyOrNamespace.class) + public void testValueStateDefaultValueFsBackend() throws Exception { + testValueStateDefault(new FsStateBackend( + temporaryFolder.newFolder().toURI().toString())); + } + + /** + * Tests simple value state queryable state instance with a default value + * set. Each source emits (subtaskIndex, 0)..(subtaskIndex, numElements) + * tuples, the key is mapped to 1 but key 0 is queried which should throw + * a {@link UnknownKeyOrNamespace} exception. + * + * @param stateBackend state back-end to use for the job + * + * @throws UnknownKeyOrNamespace thrown due to querying a non-existent key + */ + void testValueStateDefault(final AbstractStateBackend stateBackend) throws + Exception, UnknownKeyOrNamespace { + + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final int numElements = 1024; + + final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); + + JobID jobId = null; + try { + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(NUM_SLOTS); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting.
+ env.setRestartStrategy(RestartStrategies + .fixedDelayRestart(Integer.MAX_VALUE, 1000)); + + env.setStateBackend(stateBackend); + + DataStream> source = env + .addSource(new TestAscendingValueSource(numElements)); + + // Value state + ValueStateDescriptor> valueState = + new ValueStateDescriptor<>( + "any", + source.getType(), + Tuple2.of(0, 1337l)); + + // only expose key "1" + QueryableStateStream> + queryableState = + source.keyBy( + new KeySelector, Integer>() { + @Override + public Integer getKey( + Tuple2 value) throws + Exception { + return 1; + } + }).asQueryableState("hakuna", valueState); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); + + // Now query + int key = 0; + final byte[] serializedKey = + KvStateRequestSerializer.serializeKeyAndNamespace( + key, + queryableState.getKeySerializer(), + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE); + + Future future = getKvStateWithRetries(client, + jobId, + queryableState.getQueryableStateName(), + key, + serializedKey, + QUERY_RETRY_DELAY, + true); + + Await.result(future, deadline.timeLeft()); + } finally { + // Free cluster resources + if (jobId != null) { + Future cancellation = cluster + .getLeaderGateway(deadline.timeLeft()) + .ask(new JobManagerMessages.CancelJob(jobId), + deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.apply( + CancellationSuccess.class)); + + Await.ready(cancellation, deadline.timeLeft()); + } + + client.shutDown(); + } + } + /** * Tests simple value state queryable state instance. 
Each source emits * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then @@ -883,7 +1023,8 @@ public Integer getKey(Tuple2 value) throws Exception { queryableState.getQueryableStateName(), key, serializedKey, - QUERY_RETRY_DELAY); + QUERY_RETRY_DELAY, + false); byte[] serializedValue = Await.result(future, deadline.timeLeft()); @@ -993,7 +1134,8 @@ private static Future getKvStateWithRetries( final String queryName, final int key, final byte[] serializedKey, - final FiniteDuration retryDelay) { + final FiniteDuration retryDelay, + final boolean failForUknownKeyOrNamespace) { return client.getKvState(jobId, queryName, key, serializedKey) .recoverWith(new Recover>() { @@ -1001,6 +1143,9 @@ private static Future getKvStateWithRetries( public Future recover(Throwable failure) throws Throwable { if (failure instanceof AssertionError) { return Futures.failed(failure); + } else if (failForUknownKeyOrNamespace && + (failure instanceof UnknownKeyOrNamespace)) { + return Futures.failed(failure); } else { // At startup some failures are expected // due to races. Make sure that they don't @@ -1018,7 +1163,8 @@ public Future call() throws Exception { queryName, key, serializedKey, - retryDelay); + retryDelay, + failForUknownKeyOrNamespace); } }); }