diff --git a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKvStateRequestSerializerTest.java similarity index 60% rename from flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java rename to flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKvStateRequestSerializerTest.java index 05624437ffd3f..64389ad5e4657 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/query/KVStateRequestSerializerRocksDBTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBKvStateRequestSerializerTest.java @@ -16,14 +16,11 @@ * limitations under the License. */ -package org.apache.flink.test.query; +package org.apache.flink.contrib.streaming.state; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; -import org.apache.flink.contrib.streaming.state.PredefinedOptions; -import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializerTest; @@ -39,55 +36,17 @@ import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; -import java.io.File; - import static org.mockito.Mockito.mock; /** * Additional tests for the serialization and deserialization of {@link * KvStateRequestSerializer} with a RocksDB state back-end. */ -public final class KVStateRequestSerializerRocksDBTest { +public final class RocksDBKvStateRequestSerializerTest { @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); - /** - * Extension of {@link RocksDBKeyedStateBackend} to make {@link - * #createListState(TypeSerializer, ListStateDescriptor)} public for use in - * the tests. - * - * @param key type - */ - final static class RocksDBKeyedStateBackend2 extends RocksDBKeyedStateBackend { - - RocksDBKeyedStateBackend2( - final JobID jobId, - final String operatorIdentifier, - final ClassLoader userCodeClassLoader, - final File instanceBasePath, - final DBOptions dbOptions, - final ColumnFamilyOptions columnFamilyOptions, - final TaskKvStateRegistry kvStateRegistry, - final TypeSerializer keySerializer, - final int numberOfKeyGroups, - final KeyGroupRange keyGroupRange) throws Exception { - - super(jobId, operatorIdentifier, userCodeClassLoader, - instanceBasePath, - dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer, - numberOfKeyGroups, keyGroupRange); - } - - @Override - public InternalListState createListState( - final TypeSerializer namespaceSerializer, - final ListStateDescriptor stateDesc) throws Exception { - - return super.createListState(namespaceSerializer, stateDesc); - } - } - /** * Tests list serialization and deserialization match. * @@ -103,8 +62,8 @@ public void testListSerialization() throws Exception { DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions(); dbOptions.setCreateIfMissing(true); ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions(); - final RocksDBKeyedStateBackend2 longHeapKeyedStateBackend = - new RocksDBKeyedStateBackend2<>( + final RocksDBKeyedStateBackend longHeapKeyedStateBackend = + new RocksDBKeyedStateBackend<>( new JobID(), "no-op", ClassLoader.getSystemClassLoader(), temporaryFolder.getRoot(), @@ -116,9 +75,11 @@ public void testListSerialization() throws Exception { ); longHeapKeyedStateBackend.setCurrentKey(key); - final InternalListState listState = longHeapKeyedStateBackend - .createListState(VoidNamespaceSerializer.INSTANCE, - new ListStateDescriptor<>("test", LongSerializer.INSTANCE)); + final InternalListState listState = (InternalListState) + longHeapKeyedStateBackend.getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + new ListStateDescriptor<>("test", LongSerializer.INSTANCE)); KvStateRequestSerializerTest.testListSerialization(key, listState); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java index 2f32861d6f393..7c144fa0ce3fc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializer.java @@ -36,6 +36,7 @@ import java.io.ObjectOutputStream; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; /** @@ -442,6 +443,38 @@ public static T deserializeValue(byte[] serializedValue, TypeSerializer s } } + /** + * Serializes all values with the given serializer. + * + * @param list The list to serialize + * @param serializer Serializer for T + * @param Type of the value + * @return Serialized bytes or null if the list is null. + * is null + * @throws IOException On failure during deserialization + */ + public static byte[] serializeList(Iterable list, TypeSerializer serializer) throws IOException { + DataOutputSerializer dos = new DataOutputSerializer(128); + + if (list == null) { + return null; + } + + Iterator iterator = list.iterator(); + // write the same as RocksDB writes lists, with one ',' separator + while (iterator.hasNext()) { + T element = iterator.next(); + + serializer.serialize(element, dos); + + if (iterator.hasNext()) { + dos.writeByte(','); + } + } + + return dos.getCopyOfBuffer(); + } + /** * Deserializes all values with the given serializer. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java index e386e0faccb4d..63827f67cb7f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java @@ -141,7 +141,7 @@ private StateTable tryRegisterStateTable( } @Override - public InternalValueState createValueState( + protected InternalValueState createValueState( TypeSerializer namespaceSerializer, ValueStateDescriptor stateDesc) throws Exception { @@ -150,7 +150,7 @@ public InternalValueState createValueState( } @Override - public InternalListState createListState( + protected InternalListState createListState( TypeSerializer namespaceSerializer, ListStateDescriptor stateDesc) throws Exception { @@ -168,7 +168,7 @@ public InternalListState createListState( } @Override - public InternalReducingState createReducingState( + protected InternalReducingState createReducingState( TypeSerializer namespaceSerializer, ReducingStateDescriptor stateDesc) throws Exception { @@ -177,7 +177,7 @@ public InternalReducingState createReducingState( } @Override - public InternalAggregatingState createAggregatingState( + protected InternalAggregatingState createAggregatingState( TypeSerializer namespaceSerializer, AggregatingStateDescriptor stateDesc) throws Exception { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java index 02c3067f3efd2..0754da604ed8c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapListState.java @@ -21,13 +21,12 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.state.KeyedStateBackend; import org.apache.flink.runtime.state.internal.InternalListState; import org.apache.flink.util.Preconditions; -import java.io.ByteArrayOutputStream; import java.util.ArrayList; import java.util.Map; @@ -143,21 +142,7 @@ public byte[] getSerializedValue(K key, N namespace) throws Exception { return null; } - TypeSerializer serializer = stateDesc.getElementSerializer(); - - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos); - - // write the same as RocksDB writes lists, with one ',' separator - for (int i = 0; i < result.size(); i++) { - serializer.serialize(result.get(i), view); - if (i < result.size() -1) { - view.writeByte(','); - } - } - view.flush(); - - return baos.toByteArray(); + return KvStateRequestSerializer.serializeList(result, stateDesc.getElementSerializer()); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java index 69dbe6f2466d6..1289028824d8a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/query/netty/message/KvStateRequestSerializerTest.java @@ -329,9 +329,11 @@ public void testListSerialization() throws Exception { ); longHeapKeyedStateBackend.setCurrentKey(key); - final InternalListState listState = longHeapKeyedStateBackend.createListState( - VoidNamespaceSerializer.INSTANCE, - new ListStateDescriptor<>("test", LongSerializer.INSTANCE)); + final InternalListState listState = (InternalListState) + longHeapKeyedStateBackend.getPartitionedState( + VoidNamespace.INSTANCE, + VoidNamespaceSerializer.INSTANCE, + new ListStateDescriptor<>("test", LongSerializer.INSTANCE)); testListSerialization(key, listState); }