Skip to content

Commit 8be1058

Browse files
leiyanfeirkhachatryan
authored andcommitted
[FLINK-23003][runtime] Fix resource leak in RocksIncrementalSnapshotStrategy
1 parent 3a40361 commit 8be1058

File tree

5 files changed

+17
-2
lines changed

5 files changed

+17
-2
lines changed

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,6 +465,7 @@ public void dispose() {
465465

466466
cleanInstanceBasePath();
467467
}
468+
IOUtils.closeQuietly(checkpointSnapshotStrategy);
468469
this.disposed = true;
469470
}
470471

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackendBuilder.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
265265
new RocksDbTtlCompactFiltersManager(ttlTimeProvider);
266266

267267
ResourceGuard rocksDBResourceGuard = new ResourceGuard();
268-
RocksDBSnapshotStrategyBase<K, ?> checkpointStrategy;
268+
RocksDBSnapshotStrategyBase<K, ?> checkpointStrategy = null;
269269
PriorityQueueSetFactory priorityQueueFactory;
270270
SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
271271
// Number of bytes required to prefix the key groups.
@@ -363,6 +363,7 @@ public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
363363
IOUtils.closeQuietly(optionsContainer);
364364
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
365365
kvStateInformation.clear();
366+
IOUtils.closeQuietly(checkpointStrategy);
366367
try {
367368
FileUtils.deleteDirectory(instanceBasePath);
368369
} catch (Exception ex) {

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
* @param <K> type of the backend keys.
4444
*/
4545
public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources>
46-
implements CheckpointListener, SnapshotStrategy<KeyedStateHandle, R> {
46+
implements CheckpointListener, SnapshotStrategy<KeyedStateHandle, R>, AutoCloseable {
4747

4848
private static final Logger LOG = LoggerFactory.getLogger(RocksDBSnapshotStrategyBase.class);
4949

@@ -92,4 +92,7 @@ public RocksDBSnapshotStrategyBase(
9292
public String getDescription() {
9393
return description;
9494
}
95+
96+
@Override
97+
public abstract void close();
9598
}

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksFullSnapshotStrategy.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,11 @@ public void notifyCheckpointAborted(long checkpointId) {
141141
// nothing to do.
142142
}
143143

144+
@Override
145+
public void close() {
146+
// nothing to do.
147+
}
148+
144149
private SupplierWithException<CheckpointStreamWithResultProvider, Exception>
145150
createCheckpointStreamSupplier(
146151
long checkpointId,

flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,11 @@ public void notifyCheckpointAborted(long abortedCheckpointId) {
204204
}
205205
}
206206

207+
@Override
208+
public void close() {
209+
stateUploader.close();
210+
}
211+
207212
@Nonnull
208213
private SnapshotDirectory prepareLocalSnapshotDirectory(long checkpointId) throws IOException {
209214

0 commit comments

Comments
 (0)