diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java index 7724f02bf4225..b2a4fba265cfb 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.state.internal.InternalValueState; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; @@ -103,15 +102,4 @@ public void update(V value) throws IOException { throw new RuntimeException("Error while adding data to RocksDB", e); } } - - @Override - public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception { - byte[] value = super.getSerializedValue(serializedKeyAndNamespace); - - if (value != null) { - return value; - } else { - return KvStateRequestSerializer.serializeValue(stateDesc.getDefaultValue(), stateDesc.getSerializer()); - } - } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java index 327a715080dc5..c2df6aed23c01 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java @@ -39,6 +39,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.QueryableStateOptions; +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; @@ -50,10 +51,14 @@ import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateMessage; import org.apache.flink.runtime.query.QueryableStateClient; +import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.testingUtils.TestingCluster; import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; @@ -67,7 +72,9 @@ import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.Deadline; @@ -102,6 +109,9 @@ public class QueryableStateITCase extends TestLogger { private final static int NUM_SLOTS_PER_TM = 4; private final static int NUM_SLOTS = NUM_TMS * NUM_SLOTS_PER_TM; + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + /** * Shared between all the test. Make sure to have at least NUM_SLOTS * available after your test finishes, e.g. cancel the job you submitted. @@ -229,7 +239,8 @@ public Integer getKey(Tuple2 value) throws Exception { queryName, key, serializedKey, - QUERY_RETRY_DELAY); + QUERY_RETRY_DELAY, + false); serializedResult.onSuccess(new OnSuccess() { @Override @@ -352,7 +363,8 @@ public Integer getKey(Tuple2 value) throws Exception { queryName, key, serializedKey, - QUERY_RETRY_DELAY); + QUERY_RETRY_DELAY, + false); byte[] serializedResult = Await.result(serializedResultFuture, deadline.timeLeft()); @@ -455,7 +467,8 @@ public ExecutionGraph apply(ExecutionGraphFound found) { queryName, key, serializedKey, - QUERY_RETRY_DELAY); + QUERY_RETRY_DELAY, + false); byte[] serializedResult = Await.result(serializedResultFuture, deadline.timeLeft()); @@ -731,7 +744,8 @@ private void executeValueQuery(final Deadline deadline, queryableState.getQueryableStateName(), key, serializedKey, - QUERY_RETRY_DELAY); + QUERY_RETRY_DELAY, + false); byte[] serializedValue = Await.result(future, deadline.timeLeft()); @@ -752,6 +766,132 @@ private void executeValueQuery(final Deadline deadline, } } + /** + * Tests simple value state queryable state instance with a default value + * set using the {@link MemoryStateBackend}. + */ + @Test(expected = UnknownKeyOrNamespace.class) + public void testValueStateDefaultValueMemoryBackend() throws Exception { + testValueStateDefault(new MemoryStateBackend()); + } + + /** + * Tests simple value state queryable state instance with a default value + * set using the {@link RocksDBStateBackend}. + */ + @Test(expected = UnknownKeyOrNamespace.class) + public void testValueStateDefaultValueRocksDBBackend() throws Exception { + testValueStateDefault(new RocksDBStateBackend( + temporaryFolder.newFolder().toURI().toString())); + } + + /** + * Tests simple value state queryable state instance with a default value + * set using the {@link FsStateBackend}. + */ + @Test(expected = UnknownKeyOrNamespace.class) + public void testValueStateDefaultValueFsBackend() throws Exception { + testValueStateDefault(new FsStateBackend( + temporaryFolder.newFolder().toURI().toString())); + } + + /** + * Tests simple value state queryable state instance with a default value + * set. Each source emits (subtaskIndex, 0)..(subtaskIndex, numElements) + * tuples, the key is mapped to 1 but key 0 is queried which should throw + * a {@link UnknownKeyOrNamespace} exception. + * + * @param stateBackend state back-end to use for the job + * + * @throws UnknownKeyOrNamespace thrown due querying a non-existent key + */ + void testValueStateDefault(final AbstractStateBackend stateBackend) throws + Exception, UnknownKeyOrNamespace { + + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final int numElements = 1024; + + final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); + + JobID jobId = null; + try { + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(NUM_SLOTS); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies + .fixedDelayRestart(Integer.MAX_VALUE, 1000)); + + env.setStateBackend(stateBackend); + + DataStream> source = env + .addSource(new TestAscendingValueSource(numElements)); + + // Value state + ValueStateDescriptor> valueState = + new ValueStateDescriptor<>( + "any", + source.getType(), + Tuple2.of(0, 1337l)); + + // only expose key "1" + QueryableStateStream> + queryableState = + source.keyBy( + new KeySelector, Integer>() { + @Override + public Integer getKey( + Tuple2 value) throws + Exception { + return 1; + } + }).asQueryableState("hakuna", valueState); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); + + // Now query + int key = 0; + final byte[] serializedKey = + KvStateRequestSerializer.serializeKeyAndNamespace( + key, + queryableState.getKeySerializer(), + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE); + + Future future = getKvStateWithRetries(client, + jobId, + queryableState.getQueryableStateName(), + key, + serializedKey, + QUERY_RETRY_DELAY, + true); + + Await.result(future, deadline.timeLeft()); + } finally { + // Free cluster resources + if (jobId != null) { + Future cancellation = cluster + .getLeaderGateway(deadline.timeLeft()) + .ask(new JobManagerMessages.CancelJob(jobId), + deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.apply( + CancellationSuccess.class)); + + Await.ready(cancellation, deadline.timeLeft()); + } + + client.shutDown(); + } + } + /** * Tests simple value state queryable state instance. Each source emits * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then @@ -883,7 +1023,8 @@ public Integer getKey(Tuple2 value) throws Exception { queryableState.getQueryableStateName(), key, serializedKey, - QUERY_RETRY_DELAY); + QUERY_RETRY_DELAY, + false); byte[] serializedValue = Await.result(future, deadline.timeLeft()); @@ -993,7 +1134,8 @@ private static Future getKvStateWithRetries( final String queryName, final int key, final byte[] serializedKey, - final FiniteDuration retryDelay) { + final FiniteDuration retryDelay, + final boolean failForUknownKeyOrNamespace) { return client.getKvState(jobId, queryName, key, serializedKey) .recoverWith(new Recover>() { @@ -1001,6 +1143,9 @@ private static Future getKvStateWithRetries( public Future recover(Throwable failure) throws Throwable { if (failure instanceof AssertionError) { return Futures.failed(failure); + } else if (failForUknownKeyOrNamespace && + (failure instanceof UnknownKeyOrNamespace)) { + return Futures.failed(failure); } else { // At startup some failures are expected // due to races. Make sure that they don't @@ -1018,7 +1163,8 @@ public Future call() throws Exception { queryName, key, serializedKey, - retryDelay); + retryDelay, + failForUknownKeyOrNamespace); } }); }