From 7b8443cd3d7f405bed97f00e1542a1004aef9ff8 Mon Sep 17 00:00:00 2001 From: sihuazhou Date: Fri, 11 May 2018 23:15:07 +0800 Subject: [PATCH 1/2] keep the userKeyOffset in every MapEntry to avoid deserialization problem in queryable state. --- .../contrib/streaming/state/RocksDBMapState.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java index 56a7cc499214f..219f3ae2a9b27 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java @@ -67,9 +67,6 @@ public class RocksDBMapState private final TypeSerializer userKeySerializer; private final TypeSerializer userValueSerializer; - /** The offset of User Key offset in raw key bytes. */ - private int userKeyOffset; - /** * Creates a new {@code RocksDBMapState}. * @@ -305,7 +302,6 @@ public Iterator> iterator() { private byte[] serializeCurrentKeyAndNamespace() throws IOException { writeCurrentKeyWithGroupAndNamespace(); - userKeyOffset = keySerializationStream.getPosition(); return keySerializationStream.toByteArray(); } @@ -338,7 +334,7 @@ private byte[] serializeUserValue(UV userValue, TypeSerializer valueSerializ return keySerializationStream.toByteArray(); } - private UK deserializeUserKey(byte[] rawKeyBytes, TypeSerializer keySerializer) throws IOException { + private UK deserializeUserKey(int userKeyOffset, byte[] rawKeyBytes, TypeSerializer keySerializer) throws IOException { ByteArrayInputStreamWithPos bais = new ByteArrayInputStreamWithPos(rawKeyBytes); DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais); @@ -380,18 +376,23 @@ private class RocksDBMapEntry implements Map.Entry { private UV userValue; + /** The offset of User Key offset in raw key bytes. */ + private final int userKeyOffset; + private TypeSerializer keySerializer; private TypeSerializer valueSerializer; RocksDBMapEntry( @Nonnull final RocksDB db, + @Nonnull final int userKeyOffset, @Nonnull final byte[] rawKeyBytes, @Nonnull final byte[] rawValueBytes, @Nonnull final TypeSerializer keySerializer, @Nonnull final TypeSerializer valueSerializer) { this.db = db; + this.userKeyOffset = userKeyOffset; this.keySerializer = keySerializer; this.valueSerializer = valueSerializer; @@ -415,7 +416,7 @@ public void remove() { public UK getKey() { if (userKey == null) { try { - userKey = deserializeUserKey(rawKeyBytes, keySerializer); + userKey = deserializeUserKey(userKeyOffset, rawKeyBytes, keySerializer); } catch (IOException e) { throw new RuntimeException("Error while deserializing the user key.", e); } @@ -583,6 +584,7 @@ private void loadCache() { RocksDBMapEntry entry = new RocksDBMapEntry( db, + keyPrefixBytes.length, iterator.key(), iterator.value(), keySerializer, From 9fe0dfc9022dcf9466e8da3ee4e2599a1430f4bd Mon Sep 17 00:00:00 2001 From: sihuazhou Date: Sun, 13 May 2018 08:57:13 +0800 Subject: [PATCH 2/2] change `StateBackendTestBase#testMapState()` to guard this change in the future. --- .../runtime/state/StateBackendTestBase.java | 84 ++++++++++--------- 1 file changed, 46 insertions(+), 38 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 784b628727c3c..b809d845ebe89 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -2591,45 +2591,53 @@ public void testFoldingState() throws Exception { @SuppressWarnings("unchecked,rawtypes") public void testMapState() throws Exception { CheckpointStreamFactory streamFactory = createStreamFactory(); - AbstractKeyedStateBackend backend = createKeyedBackend(IntSerializer.INSTANCE); + AbstractKeyedStateBackend backend = createKeyedBackend(StringSerializer.INSTANCE); MapStateDescriptor kvId = new MapStateDescriptor<>("id", Integer.class, String.class); - TypeSerializer keySerializer = IntSerializer.INSTANCE; + TypeSerializer keySerializer = StringSerializer.INSTANCE; TypeSerializer namespaceSerializer = VoidNamespaceSerializer.INSTANCE; MapState state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") - InternalKvState> kvState = (InternalKvState>) state; + InternalKvState> kvState = (InternalKvState>) state; // these are only available after the backend initialized the serializer TypeSerializer userKeySerializer = kvId.getKeySerializer(); TypeSerializer userValueSerializer = kvId.getValueSerializer(); // some modifications to the state - backend.setCurrentKey(1); + backend.setCurrentKey("1"); assertNull(state.get(1)); - assertNull(getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + assertNull(getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); state.put(1, "1"); - backend.setCurrentKey(2); + backend.setCurrentKey("2"); assertNull(state.get(2)); - assertNull(getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + assertNull(getSerializedMap(kvState, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); state.put(2, "2"); - backend.setCurrentKey(1); + + // put entry with different userKeyOffset + backend.setCurrentKey("11"); + state.put(11, "11"); + + backend.setCurrentKey("1"); assertTrue(state.contains(1)); assertEquals("1", state.get(1)); assertEquals(new HashMap() {{ put (1, "1"); }}, - getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + + assertEquals(new HashMap() {{ put (11, "11"); }}, + getSerializedMap(kvState, "11", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); // draw a snapshot KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); // make some more modifications - backend.setCurrentKey(1); + backend.setCurrentKey("1"); state.put(1, "101"); - backend.setCurrentKey(2); + backend.setCurrentKey("2"); state.put(102, "102"); - backend.setCurrentKey(3); + backend.setCurrentKey("3"); state.put(103, "103"); state.putAll(new HashMap() {{ put(1031, "1031"); put(1032, "1032"); }}); @@ -2637,19 +2645,19 @@ public void testMapState() throws Exception { KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); // validate the original state - backend.setCurrentKey(1); + backend.setCurrentKey("1"); assertEquals("101", state.get(1)); assertEquals(new HashMap() {{ put(1, "101"); }}, - getSerializedMap(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); - backend.setCurrentKey(2); + getSerializedMap(kvState, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + backend.setCurrentKey("2"); assertEquals("102", state.get(102)); assertEquals(new HashMap() {{ put(2, "2"); put(102, "102"); }}, - getSerializedMap(kvState, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); - backend.setCurrentKey(3); + getSerializedMap(kvState, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + backend.setCurrentKey("3"); assertTrue(state.contains(103)); assertEquals("103", state.get(103)); assertEquals(new HashMap() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }}, - getSerializedMap(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + getSerializedMap(kvState, "3", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); List keys = new ArrayList<>(); for (Integer key : state.keys()) { @@ -2670,11 +2678,11 @@ public void testMapState() throws Exception { assertTrue(values.isEmpty()); // make some more modifications - backend.setCurrentKey(1); + backend.setCurrentKey("1"); state.clear(); - backend.setCurrentKey(2); + backend.setCurrentKey("2"); state.remove(102); - backend.setCurrentKey(3); + backend.setCurrentKey("3"); final String updateSuffix = "_updated"; Iterator> iterator = state.iterator(); while (iterator.hasNext()) { @@ -2687,10 +2695,10 @@ public void testMapState() throws Exception { } // validate the state - backend.setCurrentKey(1); - backend.setCurrentKey(2); + backend.setCurrentKey("1"); + backend.setCurrentKey("2"); assertFalse(state.contains(102)); - backend.setCurrentKey(3); + backend.setCurrentKey("3"); for (Map.Entry entry : state.entries()) { assertEquals(4 + updateSuffix.length(), entry.getValue().length()); assertTrue(entry.getValue().endsWith(updateSuffix)); @@ -2698,44 +2706,44 @@ public void testMapState() throws Exception { backend.dispose(); // restore the first snapshot and validate it - backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1); + backend = restoreKeyedBackend(StringSerializer.INSTANCE, snapshot1); snapshot1.discardState(); MapState restored1 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") - InternalKvState> restoredKvState1 = (InternalKvState>) restored1; + InternalKvState> restoredKvState1 = (InternalKvState>) restored1; - backend.setCurrentKey(1); + backend.setCurrentKey("1"); assertEquals("1", restored1.get(1)); assertEquals(new HashMap() {{ put (1, "1"); }}, - getSerializedMap(restoredKvState1, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); - backend.setCurrentKey(2); + getSerializedMap(restoredKvState1, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + backend.setCurrentKey("2"); assertEquals("2", restored1.get(2)); assertEquals(new HashMap() {{ put (2, "2"); }}, - getSerializedMap(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + getSerializedMap(restoredKvState1, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); backend.dispose(); // restore the second snapshot and validate it - backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot2); + backend = restoreKeyedBackend(StringSerializer.INSTANCE, snapshot2); snapshot2.discardState(); @SuppressWarnings("unchecked") MapState restored2 = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @SuppressWarnings("unchecked") - InternalKvState> restoredKvState2 = (InternalKvState>) restored2; + InternalKvState> restoredKvState2 = (InternalKvState>) restored2; - backend.setCurrentKey(1); + backend.setCurrentKey("1"); assertEquals("101", restored2.get(1)); assertEquals(new HashMap() {{ put (1, "101"); }}, - getSerializedMap(restoredKvState2, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); - backend.setCurrentKey(2); + getSerializedMap(restoredKvState2, "1", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + backend.setCurrentKey("2"); assertEquals("102", restored2.get(102)); assertEquals(new HashMap() {{ put(2, "2"); put (102, "102"); }}, - getSerializedMap(restoredKvState2, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); - backend.setCurrentKey(3); + getSerializedMap(restoredKvState2, "2", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + backend.setCurrentKey("3"); assertEquals("103", restored2.get(103)); assertEquals(new HashMap() {{ put(103, "103"); put(1031, "1031"); put(1032, "1032"); }}, - getSerializedMap(restoredKvState2, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); + getSerializedMap(restoredKvState2, "3", keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, userKeySerializer, userValueSerializer)); backend.dispose(); }