diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 499493ba3702a..5ec7f41273fef 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -465,6 +465,7 @@ public void dispose() { cleanInstanceBasePath(); } + IOUtils.closeQuietly(checkpointSnapshotStrategy); this.disposed = true; } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java index 5eb0be01ef94d..246677d219c59 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java @@ -265,7 +265,7 @@ public RocksDBKeyedStateBackend build() throws BackendBuildingException { new RocksDbTtlCompactFiltersManager(ttlTimeProvider); ResourceGuard rocksDBResourceGuard = new ResourceGuard(); - RocksDBSnapshotStrategyBase checkpointStrategy; + RocksDBSnapshotStrategyBase checkpointStrategy = null; PriorityQueueSetFactory priorityQueueFactory; SerializedCompositeKeyBuilder sharedRocksKeyBuilder; // Number of bytes required to prefix the key groups. @@ -363,6 +363,7 @@ public RocksDBKeyedStateBackend build() throws BackendBuildingException { IOUtils.closeQuietly(optionsContainer); ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories(); kvStateInformation.clear(); + IOUtils.closeQuietly(checkpointStrategy); try { FileUtils.deleteDirectory(instanceBasePath); } catch (Exception ex) { diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java index 74311ef845c36..b064fc549215d 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java @@ -43,7 +43,7 @@ * @param type of the backend keys. */ public abstract class RocksDBSnapshotStrategyBase - implements CheckpointListener, SnapshotStrategy { + implements CheckpointListener, SnapshotStrategy, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(RocksDBSnapshotStrategyBase.class); @@ -92,4 +92,7 @@ public RocksDBSnapshotStrategyBase( public String getDescription() { return description; } + + @Override + public abstract void close(); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java index 8812165abed85..8e3063a857b82 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java @@ -141,6 +141,11 @@ public void notifyCheckpointAborted(long checkpointId) { // nothing to do. } + @Override + public void close() { + // nothing to do. + } + private SupplierWithException createCheckpointStreamSupplier( long checkpointId, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java index 682a3f7d98271..99f0b6f4f4c97 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java @@ -204,6 +204,11 @@ public void notifyCheckpointAborted(long abortedCheckpointId) { } } + @Override + public void close() { + stateUploader.close(); + } + @Nonnull private SnapshotDirectory prepareLocalSnapshotDirectory(long checkpointId) throws IOException {