[FLINK-9702] Improvement in (de)serialization of keys and values for RocksDB state

This closes #7288.

Co-authored-by: Stefan Richter <s.richter@data-artisans.com>
Co-authored-by: klion26 <qcx978132955@gmail.com>
StefanRRichter and klion26 committed Jan 7, 2019
1 parent 6f9a884 commit 54ef382
Showing 12 changed files with 643 additions and 205 deletions.
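
The central change is that the per-state writeKeyWithGroupAndNamespace helpers (removed further down in AbstractRocksDBState) are replaced by a RocksDBSerializedCompositeKeyBuilder shared across all states of a backend. The composite key written to RocksDB keeps the same layout as before: key-group prefix, then the serialized key, then the serialized namespace. Below is a minimal, self-contained sketch of that layout; the values and the prefix width are illustrative, and the variable-length disambiguation bytes handled by RocksDBKeySerializationUtils are omitted.

import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;

// Illustrative sketch of the composite-key layout that this commit centralizes
// in RocksDBSerializedCompositeKeyBuilder. Not code from this commit.
public class CompositeKeyLayoutSketch {

    public static void main(String[] args) throws Exception {
        DataOutputSerializer out = new DataOutputSerializer(32);

        int keyGroup = 5;
        int keyGroupPrefixBytes = 2;                   // illustrative prefix width
        // 1) key-group prefix, most significant byte first
        for (int i = keyGroupPrefixBytes; --i >= 0; ) {
            out.writeByte(keyGroup >>> (i << 3));
        }

        // 2) the current key
        StringSerializer.INSTANCE.serialize("user-42", out);

        // 3) the namespace (e.g. a window start timestamp)
        LongSerializer.INSTANCE.serialize(1_000L, out);

        byte[] rocksDbKey = out.getCopyOfBuffer();
        System.out.println("composite key has " + rocksDbKey.length + " bytes");
    }
}
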
@@ -22,7 +22,6 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;

@@ -112,7 +111,7 @@ public void setPosition(int position) {
}

@Override
public void close() throws IOException {
public void close() {
}

public byte[] getBuf() {
@@ -18,6 +18,8 @@

package org.apache.flink.core.memory;

import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -350,6 +352,11 @@ public void write(DataInputView source, int numBytes) throws IOException {
this.position += numBytes;
}

public void setPosition(int position) {
Preconditions.checkArgument(position >= 0 && position <= this.position, "Position out of bounds.");
this.position = position;
}

// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
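
The setPosition method added to DataOutputSerializer above lets a caller rewind the shared output buffer to a remembered write position, so an already-serialized key prefix can be reused while different suffixes are appended. A rough usage sketch follows; it is not code from this commit, and the values are illustrative.

import org.apache.flink.core.memory.DataOutputSerializer;

public class SetPositionSketch {

    public static void main(String[] args) throws Exception {
        DataOutputSerializer out = new DataOutputSerializer(32);

        // serialize a key prefix once and remember where it ends
        out.writeInt(42);
        int afterKeyPrefix = out.length();

        // append a first namespace and snapshot the composite key
        out.writeLong(1L);
        byte[] keyWithNamespace1 = out.getCopyOfBuffer();

        // rewind to the key prefix and append a different namespace,
        // without re-serializing the key
        out.setPosition(afterKeyPrefix);
        out.writeLong(2L);
        byte[] keyWithNamespace2 = out.getCopyOfBuffer();

        System.out.println(keyWithNamespace1.length + " / " + keyWithNamespace2.length);
    }
}
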
@@ -34,6 +34,7 @@
import org.rocksdb.WriteOptions;

import java.io.IOException;
import java.util.List;

/**
* Base class for {@link State} implementations that store state in a RocksDB database.
@@ -70,7 +71,7 @@ public abstract class AbstractRocksDBState<K, N, V> implements InternalKvState<K

protected final DataInputDeserializer dataInputView;

private final boolean ambiguousKeyPossible;
private final RocksDBSerializedCompositeKeyBuilder<K> sharedKeyNamespaceSerializer;

/**
* Creates a new RocksDB backed state.
@@ -99,26 +100,23 @@ protected AbstractRocksDBState(

this.dataOutputView = new DataOutputSerializer(128);
this.dataInputView = new DataInputDeserializer();
this.ambiguousKeyPossible =
RocksDBKeySerializationUtils.isAmbiguousKeyPossible(backend.getKeySerializer(), namespaceSerializer);
this.sharedKeyNamespaceSerializer = backend.getSharedRocksKeyBuilder();
}

// ------------------------------------------------------------------------

@Override
public void clear() {
try {
writeCurrentKeyWithGroupAndNamespace();
byte[] key = dataOutputView.getCopyOfBuffer();
backend.db.delete(columnFamily, writeOptions, key);
} catch (IOException | RocksDBException e) {
backend.db.delete(columnFamily, writeOptions, serializeCurrentKeyWithGroupAndNamespace());
} catch (RocksDBException e) {
throw new FlinkRuntimeException("Error while removing entry from RocksDB", e);
}
}

@Override
public void setCurrentNamespace(N namespace) {
this.currentNamespace = Preconditions.checkNotNull(namespace, "Namespace");
this.currentNamespace = namespace;
}

@Override
@@ -128,30 +126,78 @@ public byte[] getSerializedValue(
final TypeSerializer<N> safeNamespaceSerializer,
final TypeSerializer<V> safeValueSerializer) throws Exception {

Preconditions.checkNotNull(serializedKeyAndNamespace);
Preconditions.checkNotNull(safeKeySerializer);
Preconditions.checkNotNull(safeNamespaceSerializer);
Preconditions.checkNotNull(safeValueSerializer);

//TODO make KvStateSerializer key-group aware to save this round trip and key-group computation
Tuple2<K, N> keyAndNamespace = KvStateSerializer.deserializeKeyAndNamespace(
serializedKeyAndNamespace, safeKeySerializer, safeNamespaceSerializer);

int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(keyAndNamespace.f0, backend.getNumberOfKeyGroups());

// we cannot reuse the keySerializationStream member since this method
// is called concurrently to the other ones and it may thus contain garbage
DataOutputSerializer tmpKeySerializationView = new DataOutputSerializer(128);
RocksDBSerializedCompositeKeyBuilder<K> keyBuilder =
new RocksDBSerializedCompositeKeyBuilder<>(
safeKeySerializer,
backend.getKeyGroupPrefixBytes(),
32
);
keyBuilder.setKeyAndKeyGroup(keyAndNamespace.f0, keyGroup);
byte[] key = keyBuilder.buildCompositeKeyNamespace(keyAndNamespace.f1, namespaceSerializer);
return backend.db.get(columnFamily, key);
}

<UK> byte[] serializeCurrentKeyWithGroupAndNamespacePlusUserKey(
UK userKey,
TypeSerializer<UK> userKeySerializer) throws IOException {
return sharedKeyNamespaceSerializer.buildCompositeKeyNamesSpaceUserKey(
currentNamespace,
namespaceSerializer,
userKey,
userKeySerializer
);
}

private <T> byte[] serializeValueInternal(T value, TypeSerializer<T> serializer) throws IOException {
serializer.serialize(value, dataOutputView);
return dataOutputView.getCopyOfBuffer();
}

byte[] serializeCurrentKeyWithGroupAndNamespace() {
return sharedKeyNamespaceSerializer.buildCompositeKeyNamespace(currentNamespace, namespaceSerializer);
}

byte[] serializeValue(V value) throws IOException {
return serializeValue(value, valueSerializer);
}

<T> byte[] serializeValueNullSensitive(T value, TypeSerializer<T> serializer) throws IOException {
dataOutputView.clear();
dataOutputView.writeBoolean(value == null);
return serializeValueInternal(value, serializer);
}

<T> byte[] serializeValue(T value, TypeSerializer<T> serializer) throws IOException {
dataOutputView.clear();
return serializeValueInternal(value, serializer);
}

<T> byte[] serializeValueList(
List<T> valueList,
TypeSerializer<T> elementSerializer,
byte delimiter) throws IOException {

dataOutputView.clear();
boolean first = true;

writeKeyWithGroupAndNamespace(
keyGroup,
keyAndNamespace.f0,
safeKeySerializer,
keyAndNamespace.f1,
safeNamespaceSerializer,
tmpKeySerializationView);
for (T value : valueList) {
Preconditions.checkNotNull(value, "You cannot add null to a value list.");

if (first) {
first = false;
} else {
dataOutputView.write(delimiter);
}
elementSerializer.serialize(value, dataOutputView);
}

return backend.db.get(columnFamily, tmpKeySerializationView.getCopyOfBuffer());
return dataOutputView.getCopyOfBuffer();
}

public void migrateSerializedValue(
@@ -169,12 +215,7 @@ public void migrateSerializedValue(
}

byte[] getKeyBytes() {
try {
writeCurrentKeyWithGroupAndNamespace();
return dataOutputView.getCopyOfBuffer();
} catch (IOException e) {
throw new FlinkRuntimeException("Error while serializing key", e);
}
return serializeCurrentKeyWithGroupAndNamespace();
}

byte[] getValueBytes(V value) {
@@ -187,45 +228,6 @@ byte[] getValueBytes(V value) {
}
}

protected void writeCurrentKeyWithGroupAndNamespace() throws IOException {
writeKeyWithGroupAndNamespace(
backend.getCurrentKeyGroupIndex(),
backend.getCurrentKey(),
currentNamespace,
dataOutputView);
}

protected void writeKeyWithGroupAndNamespace(
int keyGroup, K key, N namespace,
DataOutputSerializer keySerializationDataOutputView) throws IOException {

writeKeyWithGroupAndNamespace(
keyGroup,
key,
backend.getKeySerializer(),
namespace,
namespaceSerializer,
keySerializationDataOutputView);
}

protected void writeKeyWithGroupAndNamespace(
final int keyGroup,
final K key,
final TypeSerializer<K> keySerializer,
final N namespace,
final TypeSerializer<N> namespaceSerializer,
final DataOutputSerializer keySerializationDataOutputView) throws IOException {

Preconditions.checkNotNull(key, "No key set. This method should not be called outside of a keyed context.");
Preconditions.checkNotNull(keySerializer);
Preconditions.checkNotNull(namespaceSerializer);

keySerializationDataOutputView.clear();
RocksDBKeySerializationUtils.writeKeyGroup(keyGroup, backend.getKeyGroupPrefixBytes(), keySerializationDataOutputView);
RocksDBKeySerializationUtils.writeKey(key, keySerializer, keySerializationDataOutputView, ambiguousKeyPossible);
RocksDBKeySerializationUtils.writeNameSpace(namespace, namespaceSerializer, keySerializationDataOutputView, ambiguousKeyPossible);
}

protected V getDefaultValue() {
if (defaultValue != null) {
return valueSerializer.copy(defaultValue);
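
Among the helpers added to AbstractRocksDBState above, serializeValueList writes list elements into the shared output view separated by a single delimiter byte and rejects null elements. A self-contained sketch of that encoding follows; the delimiter value here is illustrative, not the one the RocksDB list state actually uses.

import java.util.Arrays;
import java.util.List;

import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;

public class ValueListEncodingSketch {

    public static void main(String[] args) throws Exception {
        byte delimiter = ',';                          // illustrative delimiter byte
        List<String> values = Arrays.asList("a", "bb", "ccc");

        DataOutputSerializer out = new DataOutputSerializer(64);
        boolean first = true;
        for (String value : values) {
            if (first) {
                first = false;
            } else {
                out.write(delimiter);                  // delimiter between elements only
            }
            StringSerializer.INSTANCE.serialize(value, out);
        }

        byte[] listBytes = out.getCopyOfBuffer();      // what would be stored under one RocksDB key
        System.out.println(listBytes.length + " bytes for " + values.size() + " elements");
    }
}
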
@@ -109,19 +109,14 @@ public void mergeNamespaces(N target, Collection<N> sources) {
return;
}

// cache key and namespace
final K key = backend.getCurrentKey();
final int keyGroup = backend.getCurrentKeyGroupIndex();

try {
ACC current = null;

// merge the sources to the target
for (N source : sources) {
if (source != null) {
writeKeyWithGroupAndNamespace(keyGroup, key, source, dataOutputView);

final byte[] sourceKey = dataOutputView.getCopyOfBuffer();
setCurrentNamespace(source);
final byte[] sourceKey = serializeCurrentKeyWithGroupAndNamespace();
final byte[] valueBytes = backend.db.get(columnFamily, sourceKey);
backend.db.delete(columnFamily, writeOptions, sourceKey);

@@ -141,10 +136,9 @@ public void mergeNamespaces(N target, Collection<N> sources) {

// if something came out of merging the sources, merge it or write it to the target
if (current != null) {
setCurrentNamespace(target);
// create the target full-binary-key
writeKeyWithGroupAndNamespace(keyGroup, key, target, dataOutputView);

final byte[] targetKey = dataOutputView.getCopyOfBuffer();
final byte[] targetKey = serializeCurrentKeyWithGroupAndNamespace();
final byte[] targetValueBytes = backend.db.get(columnFamily, targetKey);

if (targetValueBytes != null) {
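
The mergeNamespaces change above only affects how the source and target composite keys are obtained (setCurrentNamespace followed by serializeCurrentKeyWithGroupAndNamespace); the read-delete-accumulate-write pattern stays the same. Below is a simplified, self-contained sketch of that pattern, with a plain map standing in for the RocksDB column family and longs standing in for serialized accumulators; all names and values are illustrative.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MergeNamespacesSketch {

    public static void main(String[] args) {
        // a plain map stands in for the RocksDB column family:
        // keys are composite keys, values are accumulators
        Map<String, Long> db = new HashMap<>();
        db.put("key|window-1", 3L);
        db.put("key|window-2", 4L);

        List<String> sources = Arrays.asList("window-1", "window-2");
        String target = "window-merged";

        Long current = null;
        for (String source : sources) {
            String sourceKey = "key|" + source;        // build the source composite key
            Long value = db.remove(sourceKey);         // read and delete the source entry
            if (value != null) {
                current = (current == null) ? value : current + value;
            }
        }

        // if something came out of merging the sources, merge it into the target
        if (current != null) {
            String targetKey = "key|" + target;
            Long targetValue = db.get(targetKey);
            db.put(targetKey, targetValue != null ? targetValue + current : current);
        }

        System.out.println(db);                        // {key|window-merged=7}
    }
}
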
@@ -23,6 +23,8 @@
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;

import javax.annotation.Nonnull;

import java.io.IOException;

/**
@@ -80,8 +82,12 @@ public static <N> void writeNameSpace(
}
}

public static boolean isSerializerTypeVariableSized(@Nonnull TypeSerializer<?> serializer) {
return serializer.getLength() < 0;
}

public static boolean isAmbiguousKeyPossible(TypeSerializer keySerializer, TypeSerializer namespaceSerializer) {
return (keySerializer.getLength() < 0) && (namespaceSerializer.getLength() < 0);
return (isSerializerTypeVariableSized(keySerializer) && isSerializerTypeVariableSized(namespaceSerializer));
}

public static void writeKeyGroup(
@@ -108,7 +114,7 @@ public static <K> void writeKey(
}
}

private static void readVariableIntBytes(DataInputView inputView, int value) throws IOException {
public static void readVariableIntBytes(DataInputView inputView, int value) throws IOException {
do {
inputView.readByte();
value >>>= 8;
@@ -122,7 +128,7 @@ private static void writeLengthFrom(
writeVariableIntBytes(length, keySerializationDateDataOutputView);
}

private static void writeVariableIntBytes(
public static void writeVariableIntBytes(
int value,
DataOutputView keySerializationDateDataOutputView)
throws IOException {
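
isAmbiguousKeyPossible, now expressed through the new isSerializerTypeVariableSized helper, flags the case where both the key serializer and the namespace serializer are variable-length: without extra length information, the boundary between key bytes and namespace bytes cannot be recovered from the composite key. A minimal illustration follows; strings stand in for serialized bytes, and the appended lengths are shown as plain text rather than the variable-int encoding the utils actually write.

public class AmbiguousKeySketch {

    public static void main(String[] args) {
        // two different (key, namespace) pairs ...
        String key1 = "ab", namespace1 = "c";
        String key2 = "a",  namespace2 = "bc";

        // ... produce identical composite bytes when both parts are
        // variable-length and no length information is recorded
        String composite1 = key1 + namespace1;
        String composite2 = key2 + namespace2;
        System.out.println(composite1.equals(composite2));          // true -> ambiguous

        // appending the lengths of the variable-length parts, as the key
        // serialization utils do in this case, removes the ambiguity
        String disambiguated1 = key1 + namespace1 + key1.length() + namespace1.length();
        String disambiguated2 = key2 + namespace2 + key2.length() + namespace2.length();
        System.out.println(disambiguated1.equals(disambiguated2));  // false
    }
}
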
@@ -242,6 +242,9 @@ <K, N, SV, S extends State, IS extends S> IS createState(
/** The native metrics monitor. */
private RocksDBNativeMetricMonitor nativeMetricMonitor;

/** Helper to build the byte arrays of composite keys to address data in RocksDB. Shared across all states.*/
private final RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;

public RocksDBKeyedStateBackend(
String operatorIdentifier,
ClassLoader userCodeClassLoader,
@@ -294,6 +297,7 @@ public RocksDBKeyedStateBackend(
this.kvStateInformation = new LinkedHashMap<>();

this.writeOptions = new WriteOptions().setDisableWAL(true);
this.sharedRocksKeyBuilder = new RocksDBSerializedCompositeKeyBuilder<>(keySerializer, keyGroupPrefixBytes, 32);

this.metricOptions = metricOptions;
this.metricGroup = metricGroup;
@@ -373,6 +377,12 @@ private void registerKvStateInformation(String columnFamilyName, Tuple2<ColumnFa
}
}

@Override
public void setCurrentKey(K newKey) {
super.setCurrentKey(newKey);
sharedRocksKeyBuilder.setKeyAndKeyGroup(getCurrentKey(), getCurrentKeyGroupIndex());
}

/**
* Should only be called by one thread, and only after all accesses to the DB happened.
*/
@@ -456,6 +466,10 @@ public WriteOptions getWriteOptions() {
return writeOptions;
}

RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
return sharedRocksKeyBuilder;
}

/**
* Triggers an asynchronous snapshot of the keyed state backend from RocksDB. This snapshot can be canceled and
* is also stopped when the backend is closed through {@link #dispose()}. For each backend, this method must always
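
With the backend now owning a shared RocksDBSerializedCompositeKeyBuilder and updating it in setCurrentKey, each state object only has to append its namespace to the already-serialized key prefix. The sketch below exercises the builder API visible in this diff (setKeyAndKeyGroup, buildCompositeKeyNamespace); the import path, the constructor's accessibility, and all values are assumptions for illustration, not taken from this commit.

import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
// package assumed to match RocksDBKeyedStateBackend
import org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder;

public class SharedKeyBuilderSketch {

    public static void main(String[] args) {
        // one builder per backend, shared by all states (see getSharedRocksKeyBuilder)
        RocksDBSerializedCompositeKeyBuilder<String> builder =
            new RocksDBSerializedCompositeKeyBuilder<>(StringSerializer.INSTANCE, 2, 32);

        // setCurrentKey(...) on the backend forwards key and key group here,
        // so the key prefix is serialized once per key change
        builder.setKeyAndKeyGroup("user-42", 5);

        // each state access only appends its namespace to the cached prefix
        byte[] window1Key = builder.buildCompositeKeyNamespace(1_000L, LongSerializer.INSTANCE);
        byte[] window2Key = builder.buildCompositeKeyNamespace(2_000L, LongSerializer.INSTANCE);

        System.out.println(window1Key.length + " / " + window2Key.length);
    }
}
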
