diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java index 7724f02bf4225..b2a4fba265cfb 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; -import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.state.internal.InternalValueState; import org.rocksdb.ColumnFamilyHandle; import org.rocksdb.RocksDBException; @@ -103,15 +102,4 @@ public void update(V value) throws IOException { throw new RuntimeException("Error while adding data to RocksDB", e); } } - - @Override - public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception { - byte[] value = super.getSerializedValue(serializedKeyAndNamespace); - - if (value != null) { - return value; - } else { - return KvStateRequestSerializer.serializeValue(stateDesc.getDefaultValue(), stateDesc.getSerializer()); - } - } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java similarity index 88% rename from flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java rename to flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java index 327a715080dc5..5683c5e7c43e6 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/query/AbstractQueryableStateITCase.java @@ -50,12 +50,13 @@ import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateMessage; import org.apache.flink.runtime.query.QueryableStateClient; +import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; +import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.testingUtils.TestingCluster; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages; import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.ResponseRunningTasks; import org.apache.flink.streaming.api.datastream.DataStream; @@ -66,6 +67,7 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import scala.concurrent.Await; @@ -91,17 +93,25 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -public class QueryableStateITCase extends TestLogger { +/** + * Base class for queryable state integration tests with a configurable state backend. + */ +public abstract class AbstractQueryableStateITCase extends TestLogger { private final static FiniteDuration TEST_TIMEOUT = new FiniteDuration(100, TimeUnit.SECONDS); private final static FiniteDuration QUERY_RETRY_DELAY = new FiniteDuration(100, TimeUnit.MILLISECONDS); - private final static ActorSystem TEST_ACTOR_SYSTEM = AkkaUtils.createDefaultActorSystem(); + private static ActorSystem TEST_ACTOR_SYSTEM; private final static int NUM_TMS = 2; private final static int NUM_SLOTS_PER_TM = 4; private final static int NUM_SLOTS = NUM_TMS * NUM_SLOTS_PER_TM; + /** + * State backend to use. + */ + protected AbstractStateBackend stateBackend; + /** * Shared between all the test. Make sure to have at least NUM_SLOTS * available after your test finishes, e.g. cancel the job you submitted. @@ -121,6 +131,8 @@ public static void setup() { cluster = new TestingCluster(config, false); cluster.start(true); + + TEST_ACTOR_SYSTEM = AkkaUtils.createDefaultActorSystem(); } catch (Exception e) { e.printStackTrace(); fail(e.getMessage()); @@ -141,6 +153,20 @@ public static void tearDown() { } } + @Before + public void setUp() throws Exception { + // NOTE: do not use a shared instance for all tests as the tests may brake + this.stateBackend = createStateBackend(); + } + + /** + * Creates a state backend instance which is used in the {@link #setUp()} method before each + * test case. + * + * @return a state backend instance for each unit test + */ + protected abstract AbstractStateBackend createStateBackend() throws Exception; + /** * Runs a simple topology producing random (key, 1) pairs at the sources (where * number of keys is in fixed in range 0...numKeys). The records are keyed and @@ -154,7 +180,7 @@ public static void tearDown() { public void testQueryableState() throws Exception { // Config final Deadline deadline = TEST_TIMEOUT.fromNow(); - final int numKeys = 1024; + final int numKeys = 256; final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); @@ -165,6 +191,7 @@ public void testQueryableState() throws Exception { // Test program // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); env.setParallelism(NUM_SLOTS); // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before @@ -229,7 +256,8 @@ public Integer getKey(Tuple2 value) throws Exception { queryName, key, serializedKey, - QUERY_RETRY_DELAY); + QUERY_RETRY_DELAY, + false); serializedResult.onSuccess(new OnSuccess() { @Override @@ -287,7 +315,7 @@ public void onSuccess(byte[] result) throws Throwable { public void testQueryableStateWithTaskManagerFailure() throws Exception { // Config final Deadline deadline = TEST_TIMEOUT.fromNow(); - final int numKeys = 1024; + final int numKeys = 256; final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); @@ -298,12 +326,13 @@ public void testQueryableStateWithTaskManagerFailure() throws Exception { // Test program // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); env.setParallelism(NUM_SLOTS); // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before // submitting. env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000)); - env.getCheckpointConfig().setCheckpointInterval(1000); + env.getCheckpointConfig().setCheckpointInterval(500); DataStream> source = env .addSource(new TestKeyRangeSource(numKeys)); @@ -352,7 +381,8 @@ public Integer getKey(Tuple2 value) throws Exception { queryName, key, serializedKey, - QUERY_RETRY_DELAY); + QUERY_RETRY_DELAY, + false); byte[] serializedResult = Await.result(serializedResultFuture, deadline.timeLeft()); @@ -363,18 +393,18 @@ public Integer getKey(Tuple2 value) throws Exception { countForKey = result.f1; assertEquals("Key mismatch", key, result.f0.intValue()); - success = countForKey > 1000; // Wait for some progress + success = countForKey > 3; // Wait for some progress } - assertTrue("No progress for count", countForKey > 1000); + assertTrue("No progress for count", countForKey > 3); long currentCheckpointId = TestKeyRangeSource.LATEST_CHECKPOINT_ID.get(); - long waitUntilCheckpointId = currentCheckpointId + 5; + long waitUntilCheckpointId = currentCheckpointId + 2; // Wait for some checkpoint after the query result while (deadline.hasTimeLeft() && TestKeyRangeSource.LATEST_CHECKPOINT_ID.get() < waitUntilCheckpointId) { - Thread.sleep(500); + Thread.sleep(100); } assertTrue("Did not complete enough checkpoints to continue", @@ -390,7 +420,7 @@ public Integer getKey(Tuple2 value) throws Exception { Future egFuture = cluster.getLeaderGateway(deadline.timeLeft()) .ask(new RequestExecutionGraph(jobId), deadline.timeLeft()) .mapTo(ClassTag$.MODULE$.apply(ExecutionGraphFound.class)) - .map(new Mapper() { + .map(new Mapper() { @Override public ExecutionGraph apply(ExecutionGraphFound found) { return (ExecutionGraph) found.executionGraph(); @@ -455,7 +485,8 @@ public ExecutionGraph apply(ExecutionGraphFound found) { queryName, key, serializedKey, - QUERY_RETRY_DELAY); + QUERY_RETRY_DELAY, + false); byte[] serializedResult = Await.result(serializedResultFuture, deadline.timeLeft()); @@ -486,7 +517,7 @@ public ExecutionGraph apply(ExecutionGraphFound found) { public void testDuplicateRegistrationFailsJob() throws Exception { // Config final Deadline deadline = TEST_TIMEOUT.fromNow(); - final int numKeys = 1024; + final int numKeys = 256; JobID jobId = null; @@ -495,6 +526,7 @@ public void testDuplicateRegistrationFailsJob() throws Exception { // Test program // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); env.setParallelism(NUM_SLOTS); // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before @@ -532,7 +564,7 @@ public Integer getKey(Tuple2 value) throws Exception { JobGraph jobGraph = env.getStreamGraph().getJobGraph(); jobId = jobGraph.getJobID(); - Future failedFuture = cluster + Future failedFuture = cluster .getLeaderGateway(deadline.timeLeft()) .ask(new NotifyWhenJobStatus(jobId, JobStatus.FAILED), deadline.timeLeft()) .mapTo(ClassTag$.MODULE$.apply(JobStatusIs.class)); @@ -587,6 +619,7 @@ public void testValueState() throws Exception { JobID jobId = null; try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); env.setParallelism(NUM_SLOTS); // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before @@ -651,6 +684,7 @@ public void testQueryNonStartedJobState() throws Exception { JobID jobId = null; try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); env.setParallelism(NUM_SLOTS); // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before @@ -731,7 +765,8 @@ private void executeValueQuery(final Deadline deadline, queryableState.getQueryableStateName(), key, serializedKey, - QUERY_RETRY_DELAY); + QUERY_RETRY_DELAY, + false); byte[] serializedValue = Await.result(future, deadline.timeLeft()); @@ -752,6 +787,101 @@ private void executeValueQuery(final Deadline deadline, } } + /** + * Tests simple value state queryable state instance with a default value + * set. Each source emits (subtaskIndex, 0)..(subtaskIndex, numElements) + * tuples, the key is mapped to 1 but key 0 is queried which should throw + * a {@link UnknownKeyOrNamespace} exception. + * + * @throws UnknownKeyOrNamespace thrown due querying a non-existent key + */ + @Test(expected = UnknownKeyOrNamespace.class) + public void testValueStateDefault() throws + Exception, UnknownKeyOrNamespace { + + // Config + final Deadline deadline = TEST_TIMEOUT.fromNow(); + + final int numElements = 1024; + + final QueryableStateClient client = new QueryableStateClient(cluster.configuration()); + + JobID jobId = null; + try { + StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); + env.setParallelism(NUM_SLOTS); + // Very important, because cluster is shared between tests and we + // don't explicitly check that all slots are available before + // submitting. + env.setRestartStrategy(RestartStrategies + .fixedDelayRestart(Integer.MAX_VALUE, 1000)); + + DataStream> source = env + .addSource(new TestAscendingValueSource(numElements)); + + // Value state + ValueStateDescriptor> valueState = + new ValueStateDescriptor<>( + "any", + source.getType(), + Tuple2.of(0, 1337l)); + + // only expose key "1" + QueryableStateStream> + queryableState = + source.keyBy( + new KeySelector, Integer>() { + @Override + public Integer getKey( + Tuple2 value) throws + Exception { + return 1; + } + }).asQueryableState("hakuna", valueState); + + // Submit the job graph + JobGraph jobGraph = env.getStreamGraph().getJobGraph(); + jobId = jobGraph.getJobID(); + + cluster.submitJobDetached(jobGraph); + + // Now query + int key = 0; + final byte[] serializedKey = + KvStateRequestSerializer.serializeKeyAndNamespace( + key, + queryableState.getKeySerializer(), + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE); + + Future future = getKvStateWithRetries(client, + jobId, + queryableState.getQueryableStateName(), + key, + serializedKey, + QUERY_RETRY_DELAY, + true); + + Await.result(future, deadline.timeLeft()); + } finally { + // Free cluster resources + if (jobId != null) { + Future cancellation = cluster + .getLeaderGateway(deadline.timeLeft()) + .ask(new JobManagerMessages.CancelJob(jobId), + deadline.timeLeft()) + .mapTo(ClassTag$.MODULE$.apply( + CancellationSuccess.class)); + + Await.ready(cancellation, deadline.timeLeft()); + } + + client.shutDown(); + } + } + /** * Tests simple value state queryable state instance. Each source emits * (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then @@ -772,6 +902,7 @@ public void testValueStateShortcut() throws Exception { JobID jobId = null; try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); env.setParallelism(NUM_SLOTS); // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before @@ -835,6 +966,7 @@ public void testFoldingState() throws Exception { JobID jobId = null; try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); env.setParallelism(NUM_SLOTS); // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before @@ -883,7 +1015,8 @@ public Integer getKey(Tuple2 value) throws Exception { queryableState.getQueryableStateName(), key, serializedKey, - QUERY_RETRY_DELAY); + QUERY_RETRY_DELAY, + false); byte[] serializedValue = Await.result(future, deadline.timeLeft()); @@ -934,6 +1067,7 @@ public void testReducingState() throws Exception { JobID jobId = null; try { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setStateBackend(stateBackend); env.setParallelism(NUM_SLOTS); // Very important, because cluster is shared between tests and we // don't explicitly check that all slots are available before @@ -993,7 +1127,8 @@ private static Future getKvStateWithRetries( final String queryName, final int key, final byte[] serializedKey, - final FiniteDuration retryDelay) { + final FiniteDuration retryDelay, + final boolean failForUknownKeyOrNamespace) { return client.getKvState(jobId, queryName, key, serializedKey) .recoverWith(new Recover>() { @@ -1001,6 +1136,9 @@ private static Future getKvStateWithRetries( public Future recover(Throwable failure) throws Throwable { if (failure instanceof AssertionError) { return Futures.failed(failure); + } else if (failForUknownKeyOrNamespace && + (failure instanceof UnknownKeyOrNamespace)) { + return Futures.failed(failure); } else { // At startup some failures are expected // due to races. Make sure that they don't @@ -1018,7 +1156,8 @@ public Future call() throws Exception { queryName, key, serializedKey, - retryDelay); + retryDelay, + failForUknownKeyOrNamespace); } }); } @@ -1115,6 +1254,8 @@ public void run(SourceContext> ctx) throws Exception { record.f0 = random.nextInt(numKeys); ctx.collect(record); } + // mild slow down + Thread.sleep(1); } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseFsBackend.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseFsBackend.java new file mode 100644 index 0000000000000..6c72bcaee85e3 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseFsBackend.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.query; + +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.filesystem.FsStateBackend; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +/** + * Several integration tests for queryable state using the {@link FsStateBackend}. + */ +public class QueryableStateITCaseFsBackend extends AbstractQueryableStateITCase { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Override + protected AbstractStateBackend createStateBackend() throws Exception { + return new FsStateBackend(temporaryFolder.newFolder().toURI().toString()); + } +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseMemoryBackend.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseMemoryBackend.java new file mode 100644 index 0000000000000..312970e4c2c83 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseMemoryBackend.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.query; + +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; + +/** + * Several integration tests for queryable state using the {@link MemoryStateBackend}. + */ +public class QueryableStateITCaseMemoryBackend extends AbstractQueryableStateITCase { + + @Override + protected AbstractStateBackend createStateBackend() throws Exception { + return new MemoryStateBackend(); + } + +} diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseRocksDBBackend.java b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseRocksDBBackend.java new file mode 100644 index 0000000000000..4799da413bcbc --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/query/QueryableStateITCaseRocksDBBackend.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.query; + +import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; +import org.apache.flink.runtime.state.AbstractStateBackend; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +/** + * Several integration tests for queryable state using the {@link RocksDBStateBackend}. + */ +public class QueryableStateITCaseRocksDBBackend extends AbstractQueryableStateITCase { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + @Override + protected AbstractStateBackend createStateBackend() throws Exception { + return new RocksDBStateBackend(temporaryFolder.newFolder().toURI().toString()); + } +}