Skip to content

Commit

Permalink
[FLINK-35048] Support AsyncKeyedStateBackend for ForStKeyedStateBackend
Browse files Browse the repository at this point in the history
  • Loading branch information
masteryhx committed May 7, 2024
1 parent df45ac7 commit a06c3a1
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,59 @@
package org.apache.flink.state.forst;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.util.Disposable;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.v2.StateDescriptor;
import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* A KeyedStateBackend that stores its state in {@code ForSt}. This state backend can store very
* large state that exceeds memory even disk to remote storage. TODO: Support to implement the new
* interface of KeyedStateBackend
* large state that exceeds memory even disk to remote storage.
*/
public class ForStKeyedStateBackend<K> implements Disposable {
public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend {

private static final Logger LOG = LoggerFactory.getLogger(ForStKeyedStateBackend.class);

/** The key serializer. */
protected final TypeSerializer<K> keySerializer;

/** Supplier to create SerializedCompositeKeyBuilder. */
private final Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder;

/** Supplier to create DataOutputSerializer to serialize value. */
private final Supplier<DataOutputSerializer> valueSerializerView;

/** Supplier to create DataInputDeserializer to deserialize value. */
private final Supplier<DataInputDeserializer> valueDeserializerView;

/** The container of ForSt options. */
private final ForStResourceContainer optionsContainer;

/** Factory function to create column family options from state name. */
private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;

/**
* We are not using the default column family for Flink state ops, but we still need to remember
* this handle so that we can close it properly when the backend is closed. Note that the one
Expand All @@ -63,22 +88,69 @@ public class ForStKeyedStateBackend<K> implements Disposable {
*/
protected final RocksDB db;

/** Handler to handle state request. */
private StateRequestHandler stateRequestHandler;

// mark whether this backend is already disposed and prevent duplicate disposing
private boolean disposed = false;

public ForStKeyedStateBackend(
ForStResourceContainer optionsContainer,
TypeSerializer<K> keySerializer,
Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder,
Supplier<DataOutputSerializer> valueSerializerView,
Supplier<DataInputDeserializer> valueDeserializerView,
RocksDB db,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
ColumnFamilyHandle defaultColumnFamilyHandle,
ForStNativeMetricMonitor nativeMetricMonitor) {
this.optionsContainer = Preconditions.checkNotNull(optionsContainer);
this.keySerializer = keySerializer;
this.serializedKeyBuilder = serializedKeyBuilder;
this.valueSerializerView = valueSerializerView;
this.valueDeserializerView = valueDeserializerView;
this.db = db;
this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
this.defaultColumnFamily = defaultColumnFamilyHandle;
this.nativeMetricMonitor = nativeMetricMonitor;
}

@Override
public void setup(@Nonnull StateRequestHandler stateRequestHandler) {
this.stateRequestHandler = stateRequestHandler;
}

@Nonnull
@Override
@SuppressWarnings("unchecked")
public <SV, S extends State> S createState(@Nonnull StateDescriptor<SV> stateDesc) {
Preconditions.checkNotNull(
stateRequestHandler,
"A non-null stateRequestHandler must be setup before createState");
ColumnFamilyHandle columnFamilyHandle =
ForStOperationUtils.createColumnFamilyHandle(
stateDesc.getStateId(), db, columnFamilyOptionsFactory);
if (stateDesc.getType() == StateDescriptor.Type.VALUE) {
return (S)
new ForStValueState<>(
stateRequestHandler,
columnFamilyHandle,
(ValueStateDescriptor<SV>) stateDesc,
serializedKeyBuilder,
valueSerializerView,
valueDeserializerView);
}
throw new UnsupportedOperationException(
String.format("Unsupported state type: %s", stateDesc.getType()));
}

@Override
@Nonnull
public StateExecutor createStateExecutor() {
// TODO: Make io parallelism configurable
return new ForStStateExecutor(4, db, optionsContainer.getWriteOptions());
}

/** Should only be called by one thread, and only after all accesses to the DB happened. */
@Override
public void dispose() {
Expand Down Expand Up @@ -132,4 +204,9 @@ File getLocalBasePath() {
URI getRemoteBasePath() {
return optionsContainer.getRemoteBasePath();
}

@Override
public void close() throws IOException {
// do nothing currently, native resources will be release in dispose method
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
package org.apache.flink.state.forst;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.StateBackendBuilder;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.state.forst.restore.ForStNoneRestoreOperation;
Expand All @@ -42,6 +46,7 @@

import java.util.Collection;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* Builder class for {@link ForStKeyedStateBackend} which handles all necessary initializations and
Expand All @@ -54,7 +59,14 @@ public class ForStKeyedStateBackendBuilder<K>

protected final Logger logger = LoggerFactory.getLogger(getClass());

private static final int KEY_SERIALIZER_BUFFER_START_SIZE = 32;

private static final int VALUE_SERIALIZER_BUFFER_START_SIZE = 128;

private final StateSerializerProvider<K> keySerializerProvider;

private final int numberOfKeyGroups;

private final Collection<KeyedStateHandle> restoreStateHandles;

/** Factory function to create column family options from state name. */
Expand All @@ -72,12 +84,14 @@ public ForStKeyedStateBackendBuilder(
ForStResourceContainer optionsContainer,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
MetricGroup metricGroup,
@Nonnull Collection<KeyedStateHandle> stateHandles) {
this.optionsContainer = optionsContainer;
this.columnFamilyOptionsFactory = Preconditions.checkNotNull(columnFamilyOptionsFactory);
this.keySerializerProvider =
StateSerializerProvider.fromNewRegisteredSerializer(keySerializer);
this.numberOfKeyGroups = numberOfKeyGroups;
this.metricGroup = metricGroup;
this.restoreStateHandles = stateHandles;
this.nativeMetricOptions = new ForStNativeMetricOptions();
Expand All @@ -95,6 +109,24 @@ public ForStKeyedStateBackend<K> build() throws BackendBuildingException {
ForStNativeMetricMonitor nativeMetricMonitor = null;
RocksDB db = null;
ForStRestoreOperation restoreOperation = null;
// Number of bytes required to prefix the key groups.
int keyGroupPrefixBytes =
CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(
numberOfKeyGroups);
// it is important that we only create the key builder after the restore, and not
// before;
// restore operations may reconfigure the key serializer, so accessing the key
// serializer
// only now we can be certain that the key serializer used in the builder is final.
Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder =
() ->
new SerializedCompositeKeyBuilder<>(
keySerializerProvider.currentSchemaSerializer(),
keyGroupPrefixBytes,
KEY_SERIALIZER_BUFFER_START_SIZE);
Supplier<DataOutputSerializer> valueSerializerView =
() -> new DataOutputSerializer(VALUE_SERIALIZER_BUFFER_START_SIZE);
Supplier<DataInputDeserializer> valueDeserializerView = DataInputDeserializer::new;

try {
optionsContainer.prepareDirectories();
Expand Down Expand Up @@ -139,7 +171,11 @@ public ForStKeyedStateBackend<K> build() throws BackendBuildingException {
return new ForStKeyedStateBackend<>(
this.optionsContainer,
this.keySerializerProvider.currentSchemaSerializer(),
serializedKeyBuilder,
valueSerializerView,
valueDeserializerView,
db,
columnFamilyOptionsFactory,
defaultColumnFamilyHandle,
nativeMetricMonitor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package org.apache.flink.state.forst;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.memory.OpaqueMemoryResource;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.OperatingSystem;
import org.apache.flink.util.Preconditions;
Expand All @@ -35,6 +37,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

Expand Down Expand Up @@ -88,6 +91,46 @@ public static RocksDB openDB(
return dbRef;
}

/** Creates a column family handle from a state id. */
public static ColumnFamilyHandle createColumnFamilyHandle(
String stateId,
RocksDB db,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory) {

ColumnFamilyDescriptor columnFamilyDescriptor =
createColumnFamilyDescriptor(stateId, columnFamilyOptionsFactory);

final ColumnFamilyHandle columnFamilyHandle;
try {
columnFamilyHandle = createColumnFamily(columnFamilyDescriptor, db);
} catch (Exception ex) {
IOUtils.closeQuietly(columnFamilyDescriptor.getOptions());
throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", ex);
}

return columnFamilyHandle;
}

/** Creates a column descriptor for a state column family. */
public static ColumnFamilyDescriptor createColumnFamilyDescriptor(
String stateId, Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory) {

byte[] nameBytes = stateId.getBytes(ConfigConstants.DEFAULT_CHARSET);
Preconditions.checkState(
!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
"The chosen state name 'default' collides with the name of the default column family!");

ColumnFamilyOptions options =
createColumnFamilyOptions(columnFamilyOptionsFactory, stateId);

return new ColumnFamilyDescriptor(nameBytes, options);
}

private static ColumnFamilyHandle createColumnFamily(
ColumnFamilyDescriptor columnDescriptor, RocksDB db) throws RocksDBException {
return db.createColumnFamily(columnDescriptor);
}

public static ColumnFamilyOptions createColumnFamilyOptions(
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, String stateName) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,13 @@ private File getNextStoragePath() {
// State holding data structures
// ------------------------------------------------------------------------

public <K> ForStKeyedStateBackend<K> createForStKeyedStateBackend(
@Override
public boolean supportsAsyncKeyedStateBackend() {
return true;
}

@Override
public <K> ForStKeyedStateBackend<K> createAsyncKeyedStateBackend(
KeyedStateBackendParameters<K> parameters) throws IOException {
Environment env = parameters.getEnv();

Expand Down Expand Up @@ -331,6 +337,7 @@ public <K> ForStKeyedStateBackend<K> createForStKeyedStateBackend(
resourceContainer,
stateName -> resourceContainer.getColumnOptions(),
parameters.getKeySerializer(),
parameters.getNumberOfKeyGroups(),
parameters.getMetricGroup(),
parameters.getStateHandles())
.setNativeMetricOptions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public void testSetDbPath() throws Exception {
assertNull(forStStateBackend.getLocalDbStoragePaths());
} finally {
keyedBackend.dispose();
keyedBackend.close();
env.close();
}
}
Expand Down Expand Up @@ -216,6 +217,7 @@ private void testLocalDbPaths(String configuredPath, File expectedPath) throws E
assertNull(forStBackend.getLocalDbStoragePaths());
} finally {
keyedBackend.dispose();
keyedBackend.close();
env.close();
}
}
Expand Down Expand Up @@ -281,6 +283,7 @@ public void testCleanRelocatedDbLogs() throws Exception {
}
} finally {
keyedBackend.dispose();
keyedBackend.close();
env.close();
}

Expand Down Expand Up @@ -311,7 +314,7 @@ public void testUseTempDirectories() throws Exception {
TaskKvStateRegistry kvStateRegistry = env.getTaskKvStateRegistry();
CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
ForStKeyedStateBackend<Integer> keyedBackend =
forStStateBackend.createForStKeyedStateBackend(
forStStateBackend.createAsyncKeyedStateBackend(
new KeyedStateBackendParametersImpl<>(
env,
jobID,
Expand All @@ -330,6 +333,7 @@ public void testUseTempDirectories() throws Exception {
assertThat(instanceBasePath.getAbsolutePath(), startsWith(dir1.getAbsolutePath()));
} finally {
keyedBackend.dispose();
keyedBackend.close();
env.close();
}
}
Expand Down Expand Up @@ -357,7 +361,7 @@ public void testFailWhenNoLocalStorageDir() throws Exception {
TaskKvStateRegistry kvStateRegistry =
new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID());
CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
forStStateBackend.createForStKeyedStateBackend(
forStStateBackend.createAsyncKeyedStateBackend(
new KeyedStateBackendParametersImpl<>(
env,
jobID,
Expand Down Expand Up @@ -403,7 +407,7 @@ public void testContinueOnSomeDbDirectoriesMissing() throws Exception {
new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID());
CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
ForStKeyedStateBackend<Integer> keyedStateBackend =
forStStateBackend.createForStKeyedStateBackend(
forStStateBackend.createAsyncKeyedStateBackend(
new KeyedStateBackendParametersImpl<>(
env,
jobID,
Expand All @@ -418,6 +422,7 @@ public void testContinueOnSomeDbDirectoriesMissing() throws Exception {
cancelStreamRegistry));

keyedStateBackend.dispose();
keyedStateBackend.close();
} catch (Exception e) {
e.printStackTrace();
fail("Backend initialization failed even though some paths were available");
Expand Down Expand Up @@ -727,6 +732,7 @@ public void testRemoteDirectory() throws Exception {
} finally {
if (keyedBackend != null) {
keyedBackend.dispose();
keyedBackend.close();
}
}
}
Expand Down

0 comments on commit a06c3a1

Please sign in to comment.