Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public class RocksDBKeyedStateBackendBuilder<K> extends AbstractKeyedStateBacken

private RocksDB injectedTestDB; // for testing
private ColumnFamilyHandle injectedDefaultColumnFamilyHandle; // for testing
private RocksDBStateUploader injectRocksDBStateUploader; // for testing

public RocksDBKeyedStateBackendBuilder(
String operatorIdentifier,
Expand Down Expand Up @@ -228,6 +229,9 @@ RocksDBKeyedStateBackendBuilder<K> setNativeMetricOptions(

RocksDBKeyedStateBackendBuilder<K> setNumberOfTransferingThreads(
int numberOfTransferingThreads) {
Preconditions.checkState(
injectRocksDBStateUploader == null,
"numberOfTransferingThreads can be set only when injectRocksDBStateUploader is null.");
this.numberOfTransferingThreads = numberOfTransferingThreads;
return this;
}
Expand All @@ -238,6 +242,18 @@ RocksDBKeyedStateBackendBuilder<K> setWriteBatchSize(long writeBatchSize) {
return this;
}

RocksDBKeyedStateBackendBuilder<K> setRocksDBStateUploader(
RocksDBStateUploader rocksDBStateUploader) {
Preconditions.checkState(
injectRocksDBStateUploader == null, "rocksDBStateUploader can be only set once");
Preconditions.checkState(
numberOfTransferingThreads
== RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue(),
"RocksDBStateUploader can only be set if numberOfTransferingThreads has not been manually set.");
this.injectRocksDBStateUploader = rocksDBStateUploader;
return this;
}

private static void checkAndCreateDirectory(File directory) throws IOException {
if (directory.exists()) {
if (!directory.isDirectory()) {
Expand Down Expand Up @@ -497,6 +513,10 @@ private RocksDBRestoreOperation getRocksDBRestoreOperation(
long lastCompletedCheckpointId) {
RocksDBSnapshotStrategyBase<K, ?> checkpointSnapshotStrategy;
if (enableIncrementalCheckpointing) {
RocksDBStateUploader stateUploader =
injectRocksDBStateUploader == null
? new RocksDBStateUploader(numberOfTransferingThreads)
: injectRocksDBStateUploader;
checkpointSnapshotStrategy =
new RocksIncrementalSnapshotStrategy<>(
db,
Expand All @@ -510,8 +530,8 @@ private RocksDBRestoreOperation getRocksDBRestoreOperation(
instanceBasePath,
backendUID,
materializedSstFiles,
lastCompletedCheckpointId,
numberOfTransferingThreads);
stateUploader,
lastCompletedCheckpointId);
} else {
checkpointSnapshotStrategy =
new RocksFullSnapshotStrategy<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.flink.contrib.streaming.state;

import org.apache.flink.runtime.util.ExecutorThreadFactory;

import java.io.Closeable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -30,7 +32,9 @@ class RocksDBStateDataTransfer implements Closeable {

RocksDBStateDataTransfer(int threadNum) {
if (threadNum > 1) {
executorService = Executors.newFixedThreadPool(threadNum);
executorService =
Executors.newFixedThreadPool(
threadNum, new ExecutorThreadFactory("Flink-RocksDBStateDataTransfer"));
} else {
executorService = newDirectExecutorService();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ public RocksIncrementalSnapshotStrategy(
@Nonnull File instanceBasePath,
@Nonnull UUID backendUID,
@Nonnull SortedMap<Long, Set<StateHandleID>> materializedSstFiles,
long lastCompletedCheckpointId,
int numberOfTransferingThreads) {
@Nonnull RocksDBStateUploader rocksDBStateUploader,
long lastCompletedCheckpointId) {

super(
DESCRIPTION,
Expand All @@ -137,8 +137,8 @@ public RocksIncrementalSnapshotStrategy(
this.instanceBasePath = instanceBasePath;
this.backendUID = backendUID;
this.materializedSstFiles = materializedSstFiles;
this.stateUploader = rocksDBStateUploader;
this.lastCompletedCheckpointId = lastCompletedCheckpointId;
this.stateUploader = new RocksDBStateUploader(numberOfTransferingThreads);
this.localDirectoryName = backendUID.toString().replaceAll("[\\-]", "");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public static List<Object[]> modes() {
private String dbPath;
private RocksDB db = null;
private ColumnFamilyHandle defaultCFHandle = null;
private RocksDBStateUploader rocksDBStateUploader = null;
private final RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer();

public void prepareRocksDB() throws Exception {
Expand Down Expand Up @@ -210,10 +211,9 @@ public void setupRocksKeyedStateBackend() throws Exception {
testStreamFactory.setBlockerLatch(blocker);
testStreamFactory.setWaiterLatch(waiter);
testStreamFactory.setAfterNumberInvocations(10);

prepareRocksDB();

keyedStateBackend =
RocksDBKeyedStateBackendBuilder keyedStateBackendBuilder =
RocksDBTestUtils.builderForTestDB(
TEMP_FOLDER
.newFolder(), // this is not used anyways because the DB is
Expand All @@ -222,8 +222,17 @@ public void setupRocksKeyedStateBackend() throws Exception {
spy(db),
defaultCFHandle,
optionsContainer.getColumnOptions())
.setEnableIncrementalCheckpointing(enableIncrementalCheckpointing)
.build();
.setEnableIncrementalCheckpointing(enableIncrementalCheckpointing);

if (enableIncrementalCheckpointing) {
rocksDBStateUploader =
spy(
new RocksDBStateUploader(
RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue()));
keyedStateBackendBuilder.setRocksDBStateUploader(rocksDBStateUploader);
}

keyedStateBackend = keyedStateBackendBuilder.build();

testState1 =
keyedStateBackend.getPartitionedState(
Expand Down Expand Up @@ -367,6 +376,7 @@ public void testReleasingSnapshotAfterBackendClosed() throws Exception {
keyedStateBackend.dispose();
keyedStateBackend = null;
}
verifyRocksDBStateUploaderClosed();
}

@Test
Expand All @@ -385,6 +395,7 @@ public void testDismissingSnapshot() throws Exception {
this.keyedStateBackend.dispose();
this.keyedStateBackend = null;
}
verifyRocksDBStateUploaderClosed();
}

@Test
Expand Down Expand Up @@ -412,6 +423,7 @@ public void testDismissingSnapshotNotRunnable() throws Exception {
this.keyedStateBackend.dispose();
this.keyedStateBackend = null;
}
verifyRocksDBStateUploaderClosed();
}

@Test
Expand Down Expand Up @@ -448,6 +460,7 @@ public void testCompletingSnapshot() throws Exception {
this.keyedStateBackend.dispose();
this.keyedStateBackend = null;
}
verifyRocksDBStateUploaderClosed();
}

@Test
Expand Down Expand Up @@ -485,6 +498,7 @@ public void testCancelRunningSnapshot() throws Exception {
this.keyedStateBackend.dispose();
this.keyedStateBackend = null;
}
verifyRocksDBStateUploaderClosed();
}

@Test
Expand Down Expand Up @@ -641,6 +655,12 @@ private void verifyRocksObjectsReleased() {
assertEquals(true, keyedStateBackend.isDisposed());
}

private void verifyRocksDBStateUploaderClosed() {
if (enableIncrementalCheckpointing) {
verify(rocksDBStateUploader, times(1)).close();
}
}

private static class AcceptAllFilter implements IOFileFilter {
@Override
public boolean accept(File file) {
Expand Down