From 31ea1a9358f4aaa1083c6fbb7bdee070be29f4e6 Mon Sep 17 00:00:00 2001 From: Zakelly Date: Tue, 9 Apr 2024 20:16:31 +0800 Subject: [PATCH] [FLINK-34936][Checkpointing] Register reused shared state handle to FileMergingSnapshotManager --- .../FileMergingSnapshotManager.java | 12 ++++ .../FileMergingSnapshotManagerBase.java | 38 +++++++++++- .../metadata/MetadataV2V3SerializerBase.java | 6 +- .../state/CheckpointStreamFactory.java | 12 ++++ .../EmptySegmentFileStateHandle.java | 15 ++++- .../filemerging/SegmentFileStateHandle.java | 21 ++++++- .../FsMergingCheckpointStorageLocation.java | 6 ++ .../FileMergingSnapshotManagerTestBase.java | 62 +++++++++++++++++++ .../metadata/CheckpointTestUtils.java | 8 ++- ...ergingCheckpointStateOutputStreamTest.java | 8 ++- .../RocksIncrementalSnapshotStrategy.java | 15 ++++- 11 files changed, 189 insertions(+), 14 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java index afc5eb618dc..2aa32ba65c0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java @@ -24,12 +24,14 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.TaskStateManager; import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle; import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream; import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess; import java.io.Closeable; +import java.util.Collection; /** * FileMergingSnapshotManager provides an interface to manage files and meta information for @@ -157,6 +159,16 @@ DirectoryStreamStateHandle getManagedDirStateHandle( */ void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId) throws Exception; + /** + * A callback method which is called when previous state handles are reused by following + * checkpoint(s). + * + * @param checkpointId the checkpoint that reuses the handles. + * @param stateHandles the handles to be reused. + */ + void reusePreviousStateHandle( + long checkpointId, Collection stateHandles); + /** * A key identifies a subtask. A subtask can be identified by the operator id, subtask index and * the parallelism. Note that this key should be consistent across job attempts. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java index d46edcd235e..db90d654d61 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java @@ -27,6 +27,7 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId; import org.apache.flink.runtime.state.CheckpointedStateScope; +import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle; import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle; import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream; @@ -42,6 +43,7 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; +import java.util.Collection; import java.util.HashSet; import java.util.Iterator; import java.util.Map; @@ -70,6 +72,9 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps @GuardedBy("lock") protected TreeMap> uploadedStates = new TreeMap<>(); + /** The map that holds all the known live logical files. */ + private final Map knownLogicalFiles = new ConcurrentHashMap<>(); + /** The {@link FileSystem} that this manager works on. */ protected FileSystem fs; @@ -206,7 +211,9 @@ protected LogicalFile createLogicalFile( long length, @Nonnull SubtaskKey subtaskKey) { LogicalFileId fileID = LogicalFileId.generateRandomId(); - return new LogicalFile(fileID, physicalFile, startOffset, length, subtaskKey); + LogicalFile file = new LogicalFile(fileID, physicalFile, startOffset, length, subtaskKey); + knownLogicalFiles.put(fileID, file); + return file; } /** @@ -300,7 +307,11 @@ public SegmentFileStateHandle closeStreamAndCreateStateHandle( returnPhysicalFileForNextReuse(subtaskKey, checkpointId, physicalFile); return new SegmentFileStateHandle( - physicalFile.getFilePath(), startPos, stateSize, scope); + physicalFile.getFilePath(), + startPos, + stateSize, + scope, + logicalFile.getFileId()); } } @@ -459,13 +470,28 @@ public void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId) uploadedStates.headMap(checkpointId, true).entrySet().iterator(); while (uploadedStatesIterator.hasNext()) { Map.Entry> entry = uploadedStatesIterator.next(); - if (discardLogicalFiles(subtaskKey, entry.getKey(), entry.getValue())) { + if (discardLogicalFiles(subtaskKey, checkpointId, entry.getValue())) { uploadedStatesIterator.remove(); } } } } + @Override + public void reusePreviousStateHandle( + long checkpointId, Collection stateHandles) { + for (StreamStateHandle stateHandle : stateHandles) { + if (stateHandle instanceof SegmentFileStateHandle) { + LogicalFile file = + knownLogicalFiles.get( + ((SegmentFileStateHandle) stateHandle).getLogicalFileId()); + if (file != null) { + file.advanceLastCheckpointId(checkpointId); + } + } + } + } + private boolean discardLogicalFiles( SubtaskKey subtaskKey, long checkpointId, Set logicalFiles) throws Exception { @@ -476,6 +502,7 @@ private boolean discardLogicalFiles( && logicalFile.getLastUsedCheckpointID() <= checkpointId) { logicalFile.discardWithCheckpointId(checkpointId); logicalFileIterator.remove(); + knownLogicalFiles.remove(logicalFile.getFileId()); } } @@ -547,4 +574,9 @@ private void createManagedDirectory(Path managedPath) { @Override public void close() throws IOException {} + + @VisibleForTesting + public LogicalFile getLogicalFile(LogicalFileId fileId) { + return knownLogicalFiles.get(fileId); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java index 3867ceed0bc..7f7727c6d12 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.checkpoint.OperatorState; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.StateObjectCollection; +import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath; import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; @@ -745,6 +746,7 @@ static void serializeStreamStateHandle(StreamStateHandle stateHandle, DataOutput dos.writeLong(segmentFileStateHandle.getStateSize()); dos.writeInt(segmentFileStateHandle.getScope().ordinal()); dos.writeUTF(segmentFileStateHandle.getFilePath().toString()); + dos.writeUTF(segmentFileStateHandle.getLogicalFileId().getKeyString()); } } else if (stateHandle instanceof FileStateHandle) { dos.writeByte(FILE_STREAM_STATE_HANDLE); @@ -819,7 +821,9 @@ static StreamStateHandle deserializeStreamStateHandle( long stateSize = dis.readLong(); CheckpointedStateScope scope = CheckpointedStateScope.values()[dis.readInt()]; Path physicalFilePath = new Path(dis.readUTF()); - return new SegmentFileStateHandle(physicalFilePath, startPos, stateSize, scope); + LogicalFile.LogicalFileId logicalFileId = new LogicalFile.LogicalFileId(dis.readUTF()); + return new SegmentFileStateHandle( + physicalFilePath, startPos, stateSize, scope, logicalFileId); } else if (EMPTY_SEGMENT_FILE_HANDLE == type) { return EmptySegmentFileStateHandle.INSTANCE; } else { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java index 96c95999fcf..f08db8481f2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state; import java.io.IOException; +import java.util.Collection; import java.util.List; /** @@ -67,4 +68,15 @@ boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope s */ List duplicate( List stateHandles, CheckpointedStateScope scope) throws IOException; + + /** + * A callback method when some previous handle is reused. It is needed by the file merging + * mechanism (FLIP-306) which will manage the life cycle of underlying files by file-reusing + * information. + * + * @param previousHandle the previous handles that will be reused. + */ + default void reusePreviousStateHandle(Collection previousHandle) { + // Does nothing for normal stream factory + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptySegmentFileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptySegmentFileStateHandle.java index 44b487fd53b..23e38f8da7c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptySegmentFileStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/EmptySegmentFileStateHandle.java @@ -20,6 +20,7 @@ import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile; import org.apache.flink.runtime.state.CheckpointedStateScope; import java.io.IOException; @@ -31,11 +32,19 @@ public class EmptySegmentFileStateHandle extends SegmentFileStateHandle { public static final EmptySegmentFileStateHandle INSTANCE = new EmptySegmentFileStateHandle( - new Path("empty"), 0, 0, CheckpointedStateScope.EXCLUSIVE); + new Path("empty"), + 0, + 0, + CheckpointedStateScope.EXCLUSIVE, + new LogicalFile.LogicalFileId("DUMMY")); private EmptySegmentFileStateHandle( - Path filePath, long startPos, long stateSize, CheckpointedStateScope scope) { - super(filePath, startPos, stateSize, scope); + Path filePath, + long startPos, + long stateSize, + CheckpointedStateScope scope, + LogicalFile.LogicalFileId fileId) { + super(filePath, startPos, stateSize, scope, fileId); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java index 6522f55c429..11d68e7b394 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filemerging/SegmentFileStateHandle.java @@ -52,6 +52,9 @@ public class SegmentFileStateHandle implements StreamStateHandle { /** The scope of the state. */ private final CheckpointedStateScope scope; + /** The id for corresponding logical file. Used to retrieve LogicalFile in TM. */ + private final LogicalFile.LogicalFileId logicalFileId; + /** * Creates a new segment file state for the given file path. * @@ -59,13 +62,19 @@ public class SegmentFileStateHandle implements StreamStateHandle { * @param startPos Start position of the segment in the physical file. * @param stateSize Size of the segment. * @param scope The state's scope, whether it is exclusive or shared. + * @param fileId The corresponding logical file id. */ public SegmentFileStateHandle( - Path filePath, long startPos, long stateSize, CheckpointedStateScope scope) { + Path filePath, + long startPos, + long stateSize, + CheckpointedStateScope scope, + LogicalFile.LogicalFileId fileId) { this.filePath = filePath; this.stateSize = stateSize; this.startPos = startPos; this.scope = scope; + this.logicalFileId = fileId; } /** @@ -118,6 +127,10 @@ public CheckpointedStateScope getScope() { return scope; } + public LogicalFile.LogicalFileId getLogicalFileId() { + return logicalFileId; + } + /** * Gets the file system that stores the file state. * @@ -142,7 +155,8 @@ public boolean equals(Object o) { SegmentFileStateHandle that = (SegmentFileStateHandle) o; - return filePath.equals(that.filePath) + return logicalFileId.equals(that.logicalFileId) + && filePath.equals(that.filePath) && startPos == that.startPos && stateSize == that.stateSize && scope.equals(that.scope); @@ -150,7 +164,8 @@ public boolean equals(Object o) { @Override public int hashCode() { - int result = getFilePath().hashCode(); + int result = logicalFileId.hashCode(); + result = 31 * result + Objects.hashCode(getFilePath()); result = 31 * result + Objects.hashCode(startPos); result = 31 * result + Objects.hashCode(stateSize); result = 31 * result + Objects.hashCode(scope); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocation.java index f367094bb8b..32455f10576 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsMergingCheckpointStorageLocation.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.function.Supplier; @@ -113,4 +114,9 @@ public FileMergingCheckpointStateOutputStream createCheckpointStateOutputStream( return fileMergingSnapshotManager.createCheckpointStateOutputStream( subtaskKey, checkpointId, scope); } + + @Override + public void reusePreviousStateHandle(Collection previousHandle) { + fileMergingSnapshotManager.reusePreviousStateHandle(checkpointId, previousHandle); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java index 3147e668ef1..af044b9a24d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java @@ -35,6 +35,7 @@ import org.junit.jupiter.api.io.TempDir; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; import java.util.Random; import java.util.Set; @@ -319,6 +320,67 @@ public void testConcurrentFileReusingWithBlockingPool() throws Exception { } } + @Test + public void testReuseCallbackAndAdvanceWatermark() throws Exception { + long checkpointId = 1; + int streamNum = 20; + int perStreamWriteNum = 128; + + // write random bytes and then read them from the file + byte[] bytes = new byte[streamNum * perStreamWriteNum]; + Random rd = new Random(); + rd.nextBytes(bytes); + int byteIndex = 0; + + SegmentFileStateHandle[] handles = new SegmentFileStateHandle[streamNum]; + try (FileMergingSnapshotManager fmsm = createFileMergingSnapshotManager(checkpointBaseDir); + CloseableRegistry closeableRegistry = new CloseableRegistry()) { + fmsm.registerSubtaskForSharedStates(subtaskKey1); + + // repeatedly get-write-close streams + for (int i = 0; i < streamNum; i++) { + FileMergingCheckpointStateOutputStream stream = + fmsm.createCheckpointStateOutputStream( + subtaskKey1, checkpointId, CheckpointedStateScope.SHARED); + try { + closeableRegistry.registerCloseable(stream); + for (int j = 0; j < perStreamWriteNum; j++) { + stream.write(bytes[byteIndex++]); + } + handles[i] = stream.closeAndGetHandle(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + // start reuse + for (long cp = checkpointId + 1; cp <= 10; cp++) { + ArrayList reuse = new ArrayList<>(); + for (int j = 0; j <= 10 - cp; j++) { + reuse.add(handles[j]); + } + fmsm.reusePreviousStateHandle(cp, reuse); + // assert the reusing affects the watermark + for (SegmentFileStateHandle handle : reuse) { + assertThat( + ((FileMergingSnapshotManagerBase) fmsm) + .getLogicalFile(handle.getLogicalFileId()) + .getLastUsedCheckpointID()) + .isEqualTo(cp); + } + // subsumed + fmsm.notifyCheckpointSubsumed(subtaskKey1, cp - 1); + // assert the other files discarded. + for (int j = 10 - (int) cp + 1; j < streamNum; j++) { + assertThat( + ((FileMergingSnapshotManagerBase) fmsm) + .getLogicalFile(handles[j].getLogicalFileId())) + .isNull(); + } + } + } + } + FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpointBaseDir) throws IOException { return createFileMergingSnapshotManager( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java index 079fd16c2d7..4ed3f61aa97 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/metadata/CheckpointTestUtils.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.checkpoint.MasterState; import org.apache.flink.runtime.checkpoint.OperatorState; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.CheckpointedStateScope; import org.apache.flink.runtime.state.DiscardRecordedStateObject; @@ -363,7 +364,12 @@ private static class TestingSegmentFileStateHandle extends SegmentFileStateHandl public TestingSegmentFileStateHandle( Path filePath, long startPos, long stateSize, CheckpointedStateScope scope) { - super(filePath, startPos, stateSize, scope); + super( + filePath, + startPos, + stateSize, + scope, + LogicalFile.LogicalFileId.generateRandomId()); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStreamTest.java index fba8653bdcd..1146fa7d722 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStreamTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStreamTest.java @@ -24,6 +24,7 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.OutputStreamAndPath; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile; import org.apache.flink.runtime.checkpoint.filemerging.PhysicalFile; import org.apache.flink.runtime.state.CheckpointStateOutputStream; import org.apache.flink.runtime.state.StreamStateHandle; @@ -117,7 +118,12 @@ public SegmentFileStateHandle closeStreamAndCreateStateHandle( physicalFile.close(); } } - return new SegmentFileStateHandle(filePath, startPos, stateSize, EXCLUSIVE); + return new SegmentFileStateHandle( + filePath, + startPos, + stateSize, + EXCLUSIVE, + LogicalFile.LogicalFileId.generateRandomId()); } @Override diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java index 436a0f2ec1c..968882a6be8 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java @@ -261,6 +261,8 @@ public SnapshotResult get(CloseableRegistry snapshotCloseableR // Handles to the misc files in the current snapshot will go here final List miscFiles = new ArrayList<>(); + final List reusedHandle = new ArrayList<>(); + try { metaStateHandle = @@ -283,7 +285,8 @@ public SnapshotResult get(CloseableRegistry snapshotCloseableR sstFiles, miscFiles, snapshotCloseableRegistry, - tmpResourcesRegistry); + tmpResourcesRegistry, + reusedHandle); // We make the 'sstFiles' as the 'sharedState' in IncrementalRemoteKeyedStateHandle, // whether they belong to the sharded CheckpointedStateScope or exclusive @@ -321,6 +324,10 @@ public SnapshotResult get(CloseableRegistry snapshotCloseableR } finally { if (!completed) { cleanupIncompleteSnapshot(tmpResourcesRegistry, localBackupDirectory); + } else { + // Report the reuse of state handle to stream factory, which is essential for + // file merging mechanism. + checkpointStreamFactory.reusePreviousStateHandle(reusedHandle); } } } @@ -330,7 +337,8 @@ private long uploadSnapshotFiles( @Nonnull List sstFiles, @Nonnull List miscFiles, @Nonnull CloseableRegistry snapshotCloseableRegistry, - @Nonnull CloseableRegistry tmpResourcesRegistry) + @Nonnull CloseableRegistry tmpResourcesRegistry, + @Nonnull List reusedHandle) throws Exception { // write state data @@ -349,6 +357,9 @@ private long uploadSnapshotFiles( ? CheckpointedStateScope.EXCLUSIVE : CheckpointedStateScope.SHARED; + // Collect the reuse of state handle. + sstFiles.stream().map(HandleAndLocalPath::getHandle).forEach(reusedHandle::add); + List sstFilesUploadResult = stateUploader.uploadFilesToCheckpointFs( sstFilePaths,