Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@
* limitations under the License.
*/

package org.apache.flink.test.query;
package org.apache.flink.contrib.streaming.state;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializerTest;
Expand All @@ -39,55 +36,17 @@
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

import java.io.File;

import static org.mockito.Mockito.mock;

/**
* Additional tests for the serialization and deserialization of {@link
* KvStateRequestSerializer} with a RocksDB state back-end.
*/
public final class KVStateRequestSerializerRocksDBTest {
public final class RocksDBKvStateRequestSerializerTest {

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();

/**
* Extension of {@link RocksDBKeyedStateBackend} to make {@link
* #createListState(TypeSerializer, ListStateDescriptor)} public for use in
* the tests.
*
* @param <K> key type
*/
final static class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K> {

RocksDBKeyedStateBackend2(
final JobID jobId,
final String operatorIdentifier,
final ClassLoader userCodeClassLoader,
final File instanceBasePath,
final DBOptions dbOptions,
final ColumnFamilyOptions columnFamilyOptions,
final TaskKvStateRegistry kvStateRegistry,
final TypeSerializer<K> keySerializer,
final int numberOfKeyGroups,
final KeyGroupRange keyGroupRange) throws Exception {

super(jobId, operatorIdentifier, userCodeClassLoader,
instanceBasePath,
dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
numberOfKeyGroups, keyGroupRange);
}

@Override
public <N, T> InternalListState<N, T> createListState(
final TypeSerializer<N> namespaceSerializer,
final ListStateDescriptor<T> stateDesc) throws Exception {

return super.createListState(namespaceSerializer, stateDesc);
}
}

/**
* Tests list serialization and deserialization match.
*
Expand All @@ -103,8 +62,8 @@ public void testListSerialization() throws Exception {
DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
dbOptions.setCreateIfMissing(true);
ColumnFamilyOptions columnFamilyOptions = PredefinedOptions.DEFAULT.createColumnOptions();
final RocksDBKeyedStateBackend2<Long> longHeapKeyedStateBackend =
new RocksDBKeyedStateBackend2<>(
final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
new RocksDBKeyedStateBackend<>(
new JobID(), "no-op",
ClassLoader.getSystemClassLoader(),
temporaryFolder.getRoot(),
Expand All @@ -116,9 +75,11 @@ public void testListSerialization() throws Exception {
);
longHeapKeyedStateBackend.setCurrentKey(key);

final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend
.createListState(VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
final InternalListState<VoidNamespace, Long> listState = (InternalListState<VoidNamespace, Long>)
longHeapKeyedStateBackend.getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>("test", LongSerializer.INSTANCE));

KvStateRequestSerializerTest.testListSerialization(key, listState);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
Expand Down Expand Up @@ -442,6 +443,38 @@ public static <T> T deserializeValue(byte[] serializedValue, TypeSerializer<T> s
}
}

/**
* Serializes all values with the given serializer.
*
* @param list The list to serialize
* @param serializer Serializer for T
* @param <T> Type of the value
* @return Serialized bytes or <code>null</code> if the list is null.
* is <code>null</code>
* @throws IOException On failure during deserialization
*/
public static <T> byte[] serializeList(Iterable<T> list, TypeSerializer<T> serializer) throws IOException {
DataOutputSerializer dos = new DataOutputSerializer(128);

if (list == null) {
return null;
}

Iterator<T> iterator = list.iterator();
// write the same as RocksDB writes lists, with one ',' separator
while (iterator.hasNext()) {
T element = iterator.next();

serializer.serialize(element, dos);

if (iterator.hasNext()) {
dos.writeByte(',');
}
}

return dos.getCopyOfBuffer();
}

/**
* Deserializes all values with the given serializer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ private <N, V> StateTable<K, N, V> tryRegisterStateTable(
}

@Override
public <N, V> InternalValueState<N, V> createValueState(
protected <N, V> InternalValueState<N, V> createValueState(
TypeSerializer<N> namespaceSerializer,
ValueStateDescriptor<V> stateDesc) throws Exception {

Expand All @@ -150,7 +150,7 @@ public <N, V> InternalValueState<N, V> createValueState(
}

@Override
public <N, T> InternalListState<N, T> createListState(
protected <N, T> InternalListState<N, T> createListState(
TypeSerializer<N> namespaceSerializer,
ListStateDescriptor<T> stateDesc) throws Exception {

Expand All @@ -168,7 +168,7 @@ public <N, T> InternalListState<N, T> createListState(
}

@Override
public <N, T> InternalReducingState<N, T> createReducingState(
protected <N, T> InternalReducingState<N, T> createReducingState(
TypeSerializer<N> namespaceSerializer,
ReducingStateDescriptor<T> stateDesc) throws Exception {

Expand All @@ -177,7 +177,7 @@ public <N, T> InternalReducingState<N, T> createReducingState(
}

@Override
public <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(
protected <N, T, ACC, R> InternalAggregatingState<N, T, R> createAggregatingState(
TypeSerializer<N> namespaceSerializer,
AggregatingStateDescriptor<T, ACC, R> stateDesc) throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.util.Preconditions;

import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Map;

Expand Down Expand Up @@ -143,21 +142,7 @@ public byte[] getSerializedValue(K key, N namespace) throws Exception {
return null;
}

TypeSerializer<V> serializer = stateDesc.getElementSerializer();

ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(baos);

// write the same as RocksDB writes lists, with one ',' separator
for (int i = 0; i < result.size(); i++) {
serializer.serialize(result.get(i), view);
if (i < result.size() -1) {
view.writeByte(',');
}
}
view.flush();

return baos.toByteArray();
return KvStateRequestSerializer.serializeList(result, stateDesc.getElementSerializer());
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,11 @@ public void testListSerialization() throws Exception {
);
longHeapKeyedStateBackend.setCurrentKey(key);

final InternalListState<VoidNamespace, Long> listState = longHeapKeyedStateBackend.createListState(
VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>("test", LongSerializer.INSTANCE));
final InternalListState<VoidNamespace, Long> listState = (InternalListState<VoidNamespace, Long>)
longHeapKeyedStateBackend.getPartitionedState(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE,
new ListStateDescriptor<>("test", LongSerializer.INSTANCE));

testListSerialization(key, listState);
}
Expand Down