Skip to content

Commit

Permalink
[FLINK-5613][query] querying a non-existing key is inconsistent among…
Browse files Browse the repository at this point in the history
… state backends

Querying a non-existing key of a state that has a default value set
currently results in an UnknownKeyOrNamespace exception when the
MemoryStateBackend or FsStateBackend is used, but returns the default value
when the RocksDBStateBackend is used.

This removes the special handling from the RocksDBStateBackend and makes it
consistent with the other two back-ends, i.e. returning null which results
in the mentioned UnknownKeyOrNamespace exception.

This closes #3193
  • Loading branch information
Nico Kruber authored and rmetzger committed Jan 23, 2017
1 parent 24fe081 commit da26bdc
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 19 deletions.
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.RocksDBException;
Expand Down Expand Up @@ -103,15 +102,4 @@ public void update(V value) throws IOException {
throw new RuntimeException("Error while adding data to RocksDB", e);
}
}

/**
 * Looks up the serialized value for the given serialized key and namespace
 * via the parent implementation and, if that lookup yields {@code null},
 * answers with the serialized default value of the state descriptor instead.
 *
 * @param serializedKeyAndNamespace serialized key and namespace to look up
 * @return the serialized value returned by the parent, or the serialized
 *         default value when the parent returned {@code null}
 * @throws Exception if the parent lookup or the serialization fails
 */
@Override
public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
    final byte[] stored = super.getSerializedValue(serializedKeyAndNamespace);

    if (stored == null) {
        // Nothing came back from the parent: fall back to the descriptor's default value.
        return KvStateRequestSerializer.serializeValue(
            stateDesc.getDefaultValue(),
            stateDesc.getSerializer());
    }

    return stored;
}
}
Expand Up @@ -39,6 +39,7 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.QueryableStateOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
Expand All @@ -50,10 +51,14 @@
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateMessage;
import org.apache.flink.runtime.query.QueryableStateClient;
import org.apache.flink.runtime.query.netty.UnknownKeyOrNamespace;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
Expand All @@ -67,7 +72,9 @@
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
Expand Down Expand Up @@ -102,6 +109,9 @@ public class QueryableStateITCase extends TestLogger {
private final static int NUM_SLOTS_PER_TM = 4;
private final static int NUM_SLOTS = NUM_TMS * NUM_SLOTS_PER_TM;

/**
 * JUnit rule supplying temporary folders. Tests in this class create a new
 * folder from it and pass the folder's URI to the constructors of the
 * {@link RocksDBStateBackend} and {@link FsStateBackend}.
 */
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

/**
* Shared between all the tests. Make sure to have at least NUM_SLOTS
* available after your test finishes, e.g. cancel the job you submitted.
Expand Down Expand Up @@ -229,7 +239,8 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
queryName,
key,
serializedKey,
QUERY_RETRY_DELAY);
QUERY_RETRY_DELAY,
false);

serializedResult.onSuccess(new OnSuccess<byte[]>() {
@Override
Expand Down Expand Up @@ -352,7 +363,8 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
queryName,
key,
serializedKey,
QUERY_RETRY_DELAY);
QUERY_RETRY_DELAY,
false);

byte[] serializedResult = Await.result(serializedResultFuture, deadline.timeLeft());

Expand Down Expand Up @@ -455,7 +467,8 @@ public ExecutionGraph apply(ExecutionGraphFound found) {
queryName,
key,
serializedKey,
QUERY_RETRY_DELAY);
QUERY_RETRY_DELAY,
false);

byte[] serializedResult = Await.result(serializedResultFuture, deadline.timeLeft());

Expand Down Expand Up @@ -731,7 +744,8 @@ private void executeValueQuery(final Deadline deadline,
queryableState.getQueryableStateName(),
key,
serializedKey,
QUERY_RETRY_DELAY);
QUERY_RETRY_DELAY,
false);

byte[] serializedValue = Await.result(future, deadline.timeLeft());

Expand All @@ -752,6 +766,132 @@ private void executeValueQuery(final Deadline deadline,
}
}

/**
 * Runs the default-value query scenario of
 * {@link #testValueStateDefault(AbstractStateBackend)} on a
 * {@link MemoryStateBackend} and expects it to fail with
 * {@link UnknownKeyOrNamespace}.
 */
@Test(expected = UnknownKeyOrNamespace.class)
public void testValueStateDefaultValueMemoryBackend() throws Exception {
    final MemoryStateBackend backend = new MemoryStateBackend();
    testValueStateDefault(backend);
}

/**
 * Runs the default-value query scenario of
 * {@link #testValueStateDefault(AbstractStateBackend)} on a
 * {@link RocksDBStateBackend} created with the URI of a fresh temporary
 * folder and expects it to fail with {@link UnknownKeyOrNamespace}.
 */
@Test(expected = UnknownKeyOrNamespace.class)
public void testValueStateDefaultValueRocksDBBackend() throws Exception {
    final String backendUri = temporaryFolder.newFolder().toURI().toString();
    final RocksDBStateBackend backend = new RocksDBStateBackend(backendUri);
    testValueStateDefault(backend);
}

/**
 * Runs the default-value query scenario of
 * {@link #testValueStateDefault(AbstractStateBackend)} on a
 * {@link FsStateBackend} created with the URI of a fresh temporary folder
 * and expects it to fail with {@link UnknownKeyOrNamespace}.
 */
@Test(expected = UnknownKeyOrNamespace.class)
public void testValueStateDefaultValueFsBackend() throws Exception {
    final String backendUri = temporaryFolder.newFolder().toURI().toString();
    final FsStateBackend backend = new FsStateBackend(backendUri);
    testValueStateDefault(backend);
}

/**
 * Tests simple value state queryable state instance with a default value
 * set. Each source emits (subtaskIndex, 0)..(subtaskIndex, numElements)
 * tuples, the key is mapped to 1 but key 0 is queried which should throw
 * a {@link UnknownKeyOrNamespace} exception.
 *
 * <p>The submitted job is cancelled and the query client is shut down when
 * the method exits, whether the query succeeded or failed.
 *
 * @param stateBackend state back-end to use for the job
 *
 * @throws UnknownKeyOrNamespace thrown due to querying a non-existent key
 * @throws Exception on any other failure while setting up, querying or
 *         cancelling the job
 */
void testValueStateDefault(final AbstractStateBackend stateBackend) throws
        Exception, UnknownKeyOrNamespace {

    // Config
    final Deadline deadline = TEST_TIMEOUT.fromNow();

    final int numElements = 1024;

    final QueryableStateClient client = new QueryableStateClient(cluster.configuration());

    JobID jobId = null;
    try {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(NUM_SLOTS);
        // Very important, because cluster is shared between tests and we
        // don't explicitly check that all slots are available before
        // submitting.
        env.setRestartStrategy(RestartStrategies
            .fixedDelayRestart(Integer.MAX_VALUE, 1000));

        env.setStateBackend(stateBackend);

        DataStream<Tuple2<Integer, Long>> source = env
            .addSource(new TestAscendingValueSource(numElements));

        // Value state with a default value configured in the descriptor
        ValueStateDescriptor<Tuple2<Integer, Long>> valueState =
            new ValueStateDescriptor<>(
                "any",
                source.getType(),
                Tuple2.of(0, 1337L));

        // only expose key "1"
        QueryableStateStream<Integer, Tuple2<Integer, Long>>
            queryableState =
            source.keyBy(
                new KeySelector<Tuple2<Integer, Long>, Integer>() {
                    @Override
                    public Integer getKey(
                        Tuple2<Integer, Long> value) throws
                        Exception {
                        return 1;
                    }
                }).asQueryableState("hakuna", valueState);

        // Submit the job graph
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        jobId = jobGraph.getJobID();

        cluster.submitJobDetached(jobGraph);

        // Now query a key that the job never writes
        int key = 0;
        final byte[] serializedKey =
            KvStateRequestSerializer.serializeKeyAndNamespace(
                key,
                queryableState.getKeySerializer(),
                VoidNamespace.INSTANCE,
                VoidNamespaceSerializer.INSTANCE);

        // 'true': an UnknownKeyOrNamespace failure is passed on to the
        // caller instead of being retried.
        Future<byte[]> future = getKvStateWithRetries(client,
            jobId,
            queryableState.getQueryableStateName(),
            key,
            serializedKey,
            QUERY_RETRY_DELAY,
            true);

        Await.result(future, deadline.timeLeft());
    } finally {
        try {
            // Free cluster resources
            if (jobId != null) {
                Future<CancellationSuccess> cancellation = cluster
                    .getLeaderGateway(deadline.timeLeft())
                    .ask(new JobManagerMessages.CancelJob(jobId),
                        deadline.timeLeft())
                    .mapTo(ClassTag$.MODULE$.<CancellationSuccess>apply(
                        CancellationSuccess.class));

                Await.ready(cancellation, deadline.timeLeft());
            }
        } finally {
            // Shut the client down even if cancelling the job failed or
            // timed out, so it is not leaked.
            client.shutDown();
        }
    }
}

/**
* Tests simple value state queryable state instance. Each source emits
* (subtaskIndex, 0)..(subtaskIndex, numElements) tuples, which are then
Expand Down Expand Up @@ -883,7 +1023,8 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
queryableState.getQueryableStateName(),
key,
serializedKey,
QUERY_RETRY_DELAY);
QUERY_RETRY_DELAY,
false);

byte[] serializedValue = Await.result(future, deadline.timeLeft());

Expand Down Expand Up @@ -993,14 +1134,18 @@ private static Future<byte[]> getKvStateWithRetries(
final String queryName,
final int key,
final byte[] serializedKey,
final FiniteDuration retryDelay) {
final FiniteDuration retryDelay,
final boolean failForUknownKeyOrNamespace) {

return client.getKvState(jobId, queryName, key, serializedKey)
.recoverWith(new Recover<Future<byte[]>>() {
@Override
public Future<byte[]> recover(Throwable failure) throws Throwable {
if (failure instanceof AssertionError) {
return Futures.failed(failure);
} else if (failForUknownKeyOrNamespace &&
(failure instanceof UnknownKeyOrNamespace)) {
return Futures.failed(failure);
} else {
// At startup some failures are expected
// due to races. Make sure that they don't
Expand All @@ -1018,7 +1163,8 @@ public Future<byte[]> call() throws Exception {
queryName,
key,
serializedKey,
retryDelay);
retryDelay,
failForUknownKeyOrNamespace);
}
});
}
Expand Down

0 comments on commit da26bdc

Please sign in to comment.