diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 246677d219c59..907d828eb5691 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -120,6 +120,7 @@ public class RocksDBKeyedStateBackendBuilder extends AbstractKeyedStateBacken private RocksDB injectedTestDB; // for testing private ColumnFamilyHandle injectedDefaultColumnFamilyHandle; // for testing + private RocksDBStateUploader injectRocksDBStateUploader; // for testing public RocksDBKeyedStateBackendBuilder( String operatorIdentifier, @@ -228,6 +229,9 @@ RocksDBKeyedStateBackendBuilder setNativeMetricOptions( RocksDBKeyedStateBackendBuilder setNumberOfTransferingThreads( int numberOfTransferingThreads) { + Preconditions.checkState( + injectRocksDBStateUploader == null, + "numberOfTransferingThreads can be set only when injectRocksDBStateUploader is null."); this.numberOfTransferingThreads = numberOfTransferingThreads; return this; } @@ -238,6 +242,18 @@ RocksDBKeyedStateBackendBuilder setWriteBatchSize(long writeBatchSize) { return this; } + RocksDBKeyedStateBackendBuilder setRocksDBStateUploader( + RocksDBStateUploader rocksDBStateUploader) { + Preconditions.checkState( + injectRocksDBStateUploader == null, "rocksDBStateUploader can be only set once"); + Preconditions.checkState( + numberOfTransferingThreads + == RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue(), + "RocksDBStateUploader can only be set if numberOfTransferingThreads has not been manually set."); + this.injectRocksDBStateUploader = rocksDBStateUploader; + return this; + } + private static void checkAndCreateDirectory(File directory) throws IOException { if (directory.exists()) { if (!directory.isDirectory()) { @@ -497,6 +513,10 @@ private RocksDBRestoreOperation getRocksDBRestoreOperation( long lastCompletedCheckpointId) { RocksDBSnapshotStrategyBase checkpointSnapshotStrategy; if (enableIncrementalCheckpointing) { + RocksDBStateUploader stateUploader = + injectRocksDBStateUploader == null + ? new RocksDBStateUploader(numberOfTransferingThreads) + : injectRocksDBStateUploader; checkpointSnapshotStrategy = new RocksIncrementalSnapshotStrategy<>( db, @@ -510,8 +530,8 @@ private RocksDBRestoreOperation getRocksDBRestoreOperation( instanceBasePath, backendUID, materializedSstFiles, - lastCompletedCheckpointId, - numberOfTransferingThreads); + stateUploader, + lastCompletedCheckpointId); } else { checkpointSnapshotStrategy = new RocksFullSnapshotStrategy<>( diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransfer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransfer.java index 616f1676fbac2..a9712a751374d 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransfer.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDataTransfer.java @@ -17,6 +17,8 @@ package org.apache.flink.contrib.streaming.state; +import org.apache.flink.runtime.util.ExecutorThreadFactory; + import java.io.Closeable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -30,7 +32,9 @@ class RocksDBStateDataTransfer implements Closeable { RocksDBStateDataTransfer(int threadNum) { if (threadNum > 1) { - executorService = Executors.newFixedThreadPool(threadNum); + executorService = + Executors.newFixedThreadPool( + threadNum, new ExecutorThreadFactory("Flink-RocksDBStateDataTransfer")); } else { executorService = newDirectExecutorService(); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java index 99f0b6f4f4c97..73d9bf090bda3 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java @@ -121,8 +121,8 @@ public RocksIncrementalSnapshotStrategy( @Nonnull File instanceBasePath, @Nonnull UUID backendUID, @Nonnull SortedMap> materializedSstFiles, - long lastCompletedCheckpointId, - int numberOfTransferingThreads) { + @Nonnull RocksDBStateUploader rocksDBStateUploader, + long lastCompletedCheckpointId) { super( DESCRIPTION, @@ -137,8 +137,8 @@ public RocksIncrementalSnapshotStrategy( this.instanceBasePath = instanceBasePath; this.backendUID = backendUID; this.materializedSstFiles = materializedSstFiles; + this.stateUploader = rocksDBStateUploader; this.lastCompletedCheckpointId = lastCompletedCheckpointId; - this.stateUploader = new RocksDBStateUploader(numberOfTransferingThreads); this.localDirectoryName = backendUID.toString().replaceAll("[\\-]", ""); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java index b2fba07fae33d..8c13466c7cbe1 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java @@ -137,6 +137,7 @@ public static List modes() { private String dbPath; private RocksDB db = null; private ColumnFamilyHandle defaultCFHandle = null; + private RocksDBStateUploader rocksDBStateUploader = null; private final RocksDBResourceContainer optionsContainer = new RocksDBResourceContainer(); public void prepareRocksDB() throws Exception { @@ -210,10 +211,9 @@ public void setupRocksKeyedStateBackend() throws Exception { testStreamFactory.setBlockerLatch(blocker); testStreamFactory.setWaiterLatch(waiter); testStreamFactory.setAfterNumberInvocations(10); - prepareRocksDB(); - keyedStateBackend = + RocksDBKeyedStateBackendBuilder keyedStateBackendBuilder = RocksDBTestUtils.builderForTestDB( TEMP_FOLDER .newFolder(), // this is not used anyways because the DB is @@ -222,8 +222,17 @@ public void setupRocksKeyedStateBackend() throws Exception { spy(db), defaultCFHandle, optionsContainer.getColumnOptions()) - .setEnableIncrementalCheckpointing(enableIncrementalCheckpointing) - .build(); + .setEnableIncrementalCheckpointing(enableIncrementalCheckpointing); + + if (enableIncrementalCheckpointing) { + rocksDBStateUploader = + spy( + new RocksDBStateUploader( + RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM.defaultValue())); + keyedStateBackendBuilder.setRocksDBStateUploader(rocksDBStateUploader); + } + + keyedStateBackend = keyedStateBackendBuilder.build(); testState1 = keyedStateBackend.getPartitionedState( @@ -367,6 +376,7 @@ public void testReleasingSnapshotAfterBackendClosed() throws Exception { keyedStateBackend.dispose(); keyedStateBackend = null; } + verifyRocksDBStateUploaderClosed(); } @Test @@ -385,6 +395,7 @@ public void testDismissingSnapshot() throws Exception { this.keyedStateBackend.dispose(); this.keyedStateBackend = null; } + verifyRocksDBStateUploaderClosed(); } @Test @@ -412,6 +423,7 @@ public void testDismissingSnapshotNotRunnable() throws Exception { this.keyedStateBackend.dispose(); this.keyedStateBackend = null; } + verifyRocksDBStateUploaderClosed(); } @Test @@ -448,6 +460,7 @@ public void testCompletingSnapshot() throws Exception { this.keyedStateBackend.dispose(); this.keyedStateBackend = null; } + verifyRocksDBStateUploaderClosed(); } @Test @@ -485,6 +498,7 @@ public void testCancelRunningSnapshot() throws Exception { this.keyedStateBackend.dispose(); this.keyedStateBackend = null; } + verifyRocksDBStateUploaderClosed(); } @Test @@ -641,6 +655,12 @@ private void verifyRocksObjectsReleased() { assertEquals(true, keyedStateBackend.isDisposed()); } + private void verifyRocksDBStateUploaderClosed() { + if (enableIncrementalCheckpointing) { + verify(rocksDBStateUploader, times(1)).close(); + } + } + private static class AcceptAllFilter implements IOFileFilter { @Override public boolean accept(File file) {