@@ -44,11 +44,14 @@
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CachedStreamStateHandle;
import org.apache.flink.runtime.checkpoint.CheckpointCache;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CachedCheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
@@ -205,6 +208,8 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/** Unique ID of this backend. */
private UUID backendUID;

private CheckpointCache cache;

public RocksDBKeyedStateBackend(
String operatorIdentifier,
ClassLoader userCodeClassLoader,
@@ -216,13 +221,16 @@ public RocksDBKeyedStateBackend(
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
ExecutionConfig executionConfig,
CheckpointCache cache,
boolean enableIncrementalCheckpointing
) throws IOException {

super(kvStateRegistry, keySerializer, userCodeClassLoader, numberOfKeyGroups, keyGroupRange, executionConfig);

this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);

this.cache = cache;

this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
this.rocksDBResourceGuard = new ResourceGuard();

@@ -370,7 +378,7 @@ private RunnableFuture<KeyedStateHandle> snapshotIncrementally(
final RocksDBIncrementalSnapshotOperation<K> snapshotOperation =
new RocksDBIncrementalSnapshotOperation<>(
this,
checkpointStreamFactory,
new CachedCheckpointStreamFactory(cache, checkpointStreamFactory),
checkpointId,
checkpointTimestamp);

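Neither CachedCheckpointStreamFactory nor CheckpointCache is defined in this diff; judging from the imports and the call sites here, the factory wraps the raw CheckpointStreamFactory and tags each stream with a StateHandleID so a local copy of the uploaded bytes can be kept. The extended createCheckpointStateOutputStream overloads taking a StateHandleID (and, in one call site, a placeholder flag) are inferred from usage, not shown in this diff. A minimal sketch of the core "tee" idea, with illustrative names only:

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/**
 * Illustrative sketch, not the PR's implementation: duplicate every byte
 * written to the remote checkpoint stream into a local cache stream keyed
 * by the state handle ID, so a restore on the same TaskManager can skip
 * the DFS read.
 */
final class TeeingCheckpointStream extends FilterOutputStream {

	private final OutputStream localCacheStream;

	TeeingCheckpointStream(OutputStream remote, OutputStream localCacheStream) {
		super(remote);
		this.localCacheStream = localCacheStream;
	}

	@Override
	public void write(int b) throws IOException {
		out.write(b);               // remote DFS stream
		localCacheStream.write(b);  // local cache copy
	}

	@Override
	public void write(byte[] b, int off, int len) throws IOException {
		out.write(b, off, len);
		localCacheStream.write(b, off, len);
	}

	@Override
	public void flush() throws IOException {
		super.flush();
		localCacheStream.flush();
	}

	@Override
	public void close() throws IOException {
		super.close();
		localCacheStream.close();
	}
}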
@@ -416,7 +424,10 @@ private RunnableFuture<KeyedStateHandle> snapshotFully(
return DoneFuture.nullValue();
}

snapshotOperation = new RocksDBFullSnapshotOperation<>(this, streamFactory, snapshotCloseableRegistry);
snapshotOperation = new RocksDBFullSnapshotOperation<>(
this,
new CachedCheckpointStreamFactory(cache, streamFactory),
snapshotCloseableRegistry);
snapshotOperation.takeDBSnapShot(checkpointId, timestamp);

// implementation of the async IO operation, based on FutureTask
@@ -488,7 +499,7 @@ static final class RocksDBFullSnapshotOperation<K> {

private final RocksDBKeyedStateBackend<K> stateBackend;
private final KeyGroupRangeOffsets keyGroupRangeOffsets;
private final CheckpointStreamFactory checkpointStreamFactory;
private final CachedCheckpointStreamFactory checkpointStreamFactory;
private final CloseableRegistry snapshotCloseableRegistry;
private final ResourceGuard.Lease dbLease;

@@ -504,7 +515,7 @@ static final class RocksDBFullSnapshotOperation<K> {

RocksDBFullSnapshotOperation(
RocksDBKeyedStateBackend<K> stateBackend,
CheckpointStreamFactory checkpointStreamFactory,
CachedCheckpointStreamFactory checkpointStreamFactory,
CloseableRegistry registry) throws IOException {

this.stateBackend = stateBackend;
@@ -535,7 +546,8 @@ public void takeDBSnapShot(long checkpointId, long checkpointTimeStamp) {
*/
public void openCheckpointStream() throws Exception {
Preconditions.checkArgument(outStream == null, "Output stream for snapshot is already set.");
outStream = checkpointStreamFactory.createCheckpointStateOutputStream(checkpointId, checkpointTimeStamp);
outStream = checkpointStreamFactory.createCheckpointStateOutputStream(
	checkpointId, checkpointTimeStamp, new StateHandleID(stateBackend.backendUID + "$" + checkpointId));
snapshotCloseableRegistry.registerCloseable(outStream);
outputView = new DataOutputViewStreamWrapper(outStream);
}
@@ -754,7 +766,7 @@ private static final class RocksDBIncrementalSnapshotOperation<K> {
private final RocksDBKeyedStateBackend<K> stateBackend;

/** Stream factory that creates the output streams to DFS. */
private final CheckpointStreamFactory checkpointStreamFactory;
private final CachedCheckpointStreamFactory checkpointStreamFactory;

/** Id for the current checkpoint. */
private final long checkpointId;
@@ -787,7 +799,7 @@ private static final class RocksDBIncrementalSnapshotOperation<K> {

private RocksDBIncrementalSnapshotOperation(
RocksDBKeyedStateBackend<K> stateBackend,
CheckpointStreamFactory checkpointStreamFactory,
CachedCheckpointStreamFactory checkpointStreamFactory,
long checkpointId,
long checkpointTimestamp) throws IOException {

@@ -798,19 +810,27 @@ private RocksDBIncrementalSnapshotOperation(
this.dbLease = this.stateBackend.rocksDBResourceGuard.acquireResource();
}

private StreamStateHandle materializeStateData(Path filePath) throws Exception {
private StreamStateHandle materializeStateData(StateHandleID handleID, Path filePath) throws Exception {
FSDataInputStream inputStream = null;
CheckpointStreamFactory.CheckpointStateOutputStream outputStream = null;

// fast path for placeholders: the data already exists, so nothing is uploaded
if (filePath == null) {
outputStream = checkpointStreamFactory
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp, handleID, true);
outputStream.closeAndGetHandle();
outputStream = null;
return new PlaceholderStreamStateHandle(handleID);
}

try {
final byte[] buffer = new byte[8 * 1024];

FileSystem backupFileSystem = backupPath.getFileSystem();
inputStream = backupFileSystem.open(filePath);
closeableRegistry.registerCloseable(inputStream);

outputStream = checkpointStreamFactory
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp, handleID);
closeableRegistry.registerCloseable(outputStream);

while (true) {
@@ -846,7 +866,7 @@ private StreamStateHandle materializeMetaData() throws Exception {

try {
outputStream = checkpointStreamFactory
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp);
.createCheckpointStateOutputStream(checkpointId, checkpointTimestamp, new StateHandleID(stateBackend.backendUID + "$" + checkpointId + "meta"));
closeableRegistry.registerCloseable(outputStream);

// no need for compression scheme support because sst-files are already compressed
@@ -924,23 +944,23 @@ KeyedStateHandle materializeSnapshot() throws Exception {
for (FileStatus fileStatus : fileStatuses) {
final Path filePath = fileStatus.getPath();
final String fileName = filePath.getName();
final StateHandleID stateHandleID = new StateHandleID(fileName);

if (fileName.endsWith(SST_FILE_SUFFIX)) {
final StateHandleID stateHandleID = new StateHandleID(stateBackend.backendUID + "$" + fileName);
final boolean existsAlready =
baseSstFiles != null && baseSstFiles.contains(stateHandleID);

if (existsAlready) {
// we introduce a placeholder state handle, that is replaced with the
// original from the shared state registry (created from a previous checkpoint)
sstFiles.put(
stateHandleID,
new PlaceholderStreamStateHandle());
// materializeStateData takes the fast path and returns a placeholder.
sstFiles.put(stateHandleID, materializeStateData(stateHandleID, null));
} else {
sstFiles.put(stateHandleID, materializeStateData(filePath));
sstFiles.put(stateHandleID, materializeStateData(stateHandleID, filePath));
}
} else {
StreamStateHandle fileHandle = materializeStateData(filePath);
final StateHandleID stateHandleID = new StateHandleID(stateBackend.backendUID + "$" + checkpointId + "$" + fileName);
StreamStateHandle fileHandle = materializeStateData(stateHandleID, filePath);
miscFiles.put(stateHandleID, fileHandle);
}
}
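The handle-ID scheme in this hunk decides what the shared state registry can deduplicate: sst-file IDs carry no checkpoint id, so an unchanged file keeps the same ID across checkpoints, while private (misc) files and the metadata embed the checkpoint id. A self-contained illustration with made-up values:

public class HandleIdNamingDemo {
	public static void main(String[] args) {
		String backendUID = "ab12";  // illustrative backend UUID
		long checkpointId = 42L;

		// sst file: ID is stable across checkpoints, enabling placeholder reuse
		String sstId  = backendUID + "$" + "000007.sst";
		// misc (private) file: unique per checkpoint
		String miscId = backendUID + "$" + checkpointId + "$" + "MANIFEST-000004";
		// metadata, as built in materializeMetaData above
		String metaId = backendUID + "$" + checkpointId + "meta";

		System.out.println(sstId);   // ab12$000007.sst
		System.out.println(miscId);  // ab12$42$MANIFEST-000004
		System.out.println(metaId);  // ab12$42meta
	}
}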
@@ -1136,6 +1156,7 @@ public void doRestore(Collection<KeyedStateHandle> keyedStateHandles)
", but found: " + keyedStateHandle.getClass());
}
this.currentKeyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle;
this.currentKeyGroupsStateHandle.setCache(rocksDBKeyedStateBackend.cache);
restoreKeyGroupsInStateHandle();
}
}
@@ -1289,11 +1310,16 @@ private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBackend
}

private List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> readMetaData(
StreamStateHandle metaStateHandle) throws Exception {
StreamStateHandle metaStateHandle,
boolean hasExtraKeys) throws Exception {

FSDataInputStream inputStream = null;

try {
if (metaStateHandle instanceof CachedStreamStateHandle) {
((CachedStreamStateHandle) metaStateHandle).setCheckpointCache(stateBackend.cache);
((CachedStreamStateHandle) metaStateHandle).reCache(!hasExtraKeys);
}
inputStream = metaStateHandle.openInputStream();
stateBackend.cancelStreamRegistry.registerCloseable(inputStream);

@@ -1375,12 +1401,12 @@ private void restoreInstance(
final Map<StateHandleID, StreamStateHandle> miscFiles =
restoreStateHandle.getPrivateState();

readAllStateData(sstFiles, restoreInstancePath);
readAllStateData(miscFiles, restoreInstancePath);
readAllStateData(sstFiles, restoreInstancePath, hasExtraKeys);
readAllStateData(miscFiles, restoreInstancePath, hasExtraKeys);

// read meta data
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> stateMetaInfoSnapshots =
readMetaData(restoreStateHandle.getMetaStateHandle());
readMetaData(restoreStateHandle.getMetaStateHandle(), hasExtraKeys);

List<ColumnFamilyDescriptor> columnFamilyDescriptors =
new ArrayList<>(1 + stateMetaInfoSnapshots.size());
@@ -1516,6 +1542,11 @@ private void restoreInstance(
}

stateBackend.lastCompletedCheckpointId = restoreStateHandle.getCheckpointId();

// commit re-cache
if (stateBackend.cache != null) {
stateBackend.cache.commitCache(CheckpointCache.CHECKPOINT_ID_FOR_RESTORE, false);
}
}
} finally {
FileSystem restoreFileSystem = restoreInstancePath.getFileSystem();
@@ -1527,12 +1558,17 @@

private void readAllStateData(
Map<StateHandleID, StreamStateHandle> stateHandleMap,
Path restoreInstancePath) throws IOException {
Path restoreInstancePath,
boolean hasExtraKeys) throws IOException {

for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
StateHandleID stateHandleID = entry.getKey();
StreamStateHandle remoteFileHandle = entry.getValue();
readStateData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
if (remoteFileHandle instanceof CachedStreamStateHandle) {
((CachedStreamStateHandle) remoteFileHandle).setCheckpointCache(stateBackend.cache);
((CachedStreamStateHandle) remoteFileHandle).reCache(!hasExtraKeys);
}
readStateData(new Path(restoreInstancePath, getFileNameFromStateHandleId(stateHandleID)), remoteFileHandle);
}
}

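The reCache(!hasExtraKeys) calls suggest local copies are only kept when the restored key-group range matches the handle exactly; when several handles must be merged and clipped (hasExtraKeys), re-populating the cache would mostly store data that is then deleted. A sketch of the assumed read path with stand-in types (the real CachedStreamStateHandle lives in flink-runtime and is not part of this diff):

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Map;

/** Illustrative sketch, not the PR's code: prefer the local cache, fall back to DFS. */
final class CachedReadSketch {

	private final Map<String, byte[]> localCache;  // stand-in for CheckpointCache
	private boolean reCache = true;

	CachedReadSketch(Map<String, byte[]> localCache) {
		this.localCache = localCache;
	}

	/** Mirrors reCache(!hasExtraKeys) above. */
	void reCache(boolean enabled) {
		this.reCache = enabled;
	}

	InputStream open(String handleId, InputStream remote) {
		byte[] cached = localCache.get(handleId);
		if (cached != null) {
			return new ByteArrayInputStream(cached);  // local hit, no DFS round trip
		}
		// Cache miss: read from DFS. A real implementation would tee the bytes
		// into a pending cache entry here when reCache is enabled; the entry
		// would only become visible once commitCache(...) succeeds, matching
		// the commitCache(CHECKPOINT_ID_FOR_RESTORE, false) call above.
		return remote;
	}
}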
@@ -1541,13 +1577,18 @@ private void createFileHardLinksInRestorePath(
Path restoreInstancePath) throws IOException {

for (StateHandleID stateHandleID : stateHandleMap.keySet()) {
String newSstFileName = stateHandleID.toString();
String newSstFileName = getFileNameFromStateHandleId(stateHandleID);
File restoreFile = new File(restoreInstancePath.getPath(), newSstFileName);
File targetFile = new File(stateBackend.instanceRocksDBPath, newSstFileName);
Files.createLink(targetFile.toPath(), restoreFile.toPath());
}
}

private String getFileNameFromStateHandleId(StateHandleID handleID) {
final String[] arr = handleID.toString().split("\\$");
return arr[arr.length - 1];
}
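// Illustrative values, not PR code: splitting on '$' and taking the last
// segment recovers the on-disk file name regardless of how many ID
// components precede it:
//   "ab12$000007.sst"         -> "000007.sst"
//   "ab12$42$MANIFEST-000004" -> "MANIFEST-000004"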

void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception {

boolean hasExtraKeys = (restoreStateHandles.size() > 1 ||
(next file in this diff)
@@ -284,13 +284,13 @@ public CheckpointStreamFactory createSavepointStreamFactory(

@Override
public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry) throws IOException {
Environment env,
JobID jobID,
String operatorIdentifier,
TypeSerializer<K> keySerializer,
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
TaskKvStateRegistry kvStateRegistry) throws IOException {

// first, make sure that the RocksDB JNI library is loaded
// we do this explicitly here to have better error handling
@@ -313,6 +313,7 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
numberOfKeyGroups,
keyGroupRange,
env.getExecutionConfig(),
env.getCheckpointCache(),
enableIncrementalCheckpointing);
}

(next file in this diff)
@@ -24,6 +24,7 @@
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointCache;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -231,6 +232,7 @@ public void testCorrectMergeOperatorSet() throws IOException {
1,
new KeyGroupRange(0, 0),
new ExecutionConfig(),
mock(CheckpointCache.class),
enableIncrementalCheckpointing);

verify(columnFamilyOptions, Mockito.times(1))
(next file in this diff)
@@ -18,6 +18,7 @@

package org.apache.flink.mesos.runtime.clusterframework

import org.apache.flink.runtime.checkpoint.CheckpointCacheManager
import org.apache.flink.runtime.clusterframework.types.ResourceID
import org.apache.flink.runtime.highavailability.HighAvailabilityServices
import org.apache.flink.runtime.io.disk.iomanager.IOManager
@@ -37,6 +38,7 @@ class MesosTaskManager(
memoryManager: MemoryManager,
ioManager: IOManager,
network: NetworkEnvironment,
checkpointCacheManager: CheckpointCacheManager,
numberOfSlots: Int,
highAvailabilityServices: HighAvailabilityServices,
taskManagerMetricGroup : TaskManagerMetricGroup)
@@ -47,6 +49,7 @@ class MesosTaskManager(
memoryManager,
ioManager,
network,
checkpointCacheManager,
numberOfSlots,
highAvailabilityServices,
taskManagerMetricGroup) {
(next file in this diff)
@@ -28,6 +28,7 @@
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointCache;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.internal.InternalListState;
@@ -76,7 +77,9 @@ static final class RocksDBKeyedStateBackend2<K> extends RocksDBKeyedStateBackend<K>
super(operatorIdentifier, userCodeClassLoader,
instanceBasePath,
dbOptions, columnFamilyOptions, kvStateRegistry, keySerializer,
numberOfKeyGroups, keyGroupRange, executionConfig, false);
numberOfKeyGroups, keyGroupRange, executionConfig,
mock(CheckpointCache.class),
false);
}

@Override
@@ -152,6 +155,7 @@ public void testMapSerialization() throws Exception {
LongSerializer.INSTANCE,
1, new KeyGroupRange(0, 0),
new ExecutionConfig(),
mock(CheckpointCache.class),
false);
longHeapKeyedStateBackend.restore(null);
longHeapKeyedStateBackend.setCurrentKey(key);
(next file in this diff)
@@ -28,6 +28,7 @@
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointCache;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
@@ -194,7 +195,8 @@ public void testListSerialization() throws Exception {
1,
new KeyGroupRange(0, 0),
async,
new ExecutionConfig()
new ExecutionConfig(),
mock(CheckpointCache.class)
);
longHeapKeyedStateBackend.setCurrentKey(key);

@@ -296,7 +298,8 @@ public void testMapSerialization() throws Exception {
1,
new KeyGroupRange(0, 0),
async,
new ExecutionConfig()
new ExecutionConfig(),
mock(CheckpointCache.class)
);
longHeapKeyedStateBackend.setCurrentKey(key);
