From 1a7982481df72336b004d2581772a96e3205b0d7 Mon Sep 17 00:00:00 2001 From: fredia Date: Tue, 7 May 2024 20:14:08 +0800 Subject: [PATCH] [FLINK-32087][checkpoint] Introduce space amplification statistics of file merging --- .../FileMergingSnapshotManager.java | 75 +++++++++++++++++++ .../FileMergingSnapshotManagerBase.java | 22 +++++- .../checkpoint/filemerging/LogicalFile.java | 1 + .../checkpoint/filemerging/PhysicalFile.java | 4 +- ...ckpointFileMergingSnapshotManagerTest.java | 30 +++++++- .../FileMergingSnapshotManagerTestBase.java | 47 ++++++++++++ ...ckpointFileMergingSnapshotManagerTest.java | 30 +++++++- ...ergingCheckpointStateOutputStreamTest.java | 5 +- 8 files changed, 204 insertions(+), 10 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java index add8806369cb8b..98da4a3bc49e12 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java @@ -34,6 +34,7 @@ import java.io.Closeable; import java.util.Collection; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Stream; /** @@ -268,4 +269,78 @@ public String toString() { "%s-%s(%d/%d)", jobIDString, operatorIDString, subtaskIndex, parallelism); } } + + /** Space usage statistics of a managed directory. 
*/ + final class SpaceStat { + + AtomicLong physicalFileCount; + AtomicLong physicalFileSize; + + AtomicLong logicalFileCount; + AtomicLong logicalFileSize; + + public SpaceStat() { + this(0, 0, 0, 0); + } + + public SpaceStat( + long physicalFileCount, + long physicalFileSize, + long logicalFileCount, + long logicalFileSize) { + this.physicalFileCount = new AtomicLong(physicalFileCount); + this.physicalFileSize = new AtomicLong(physicalFileSize); + this.logicalFileCount = new AtomicLong(logicalFileCount); + this.logicalFileSize = new AtomicLong(logicalFileSize); + } + + public void onLogicalFileCreate(long size) { + physicalFileSize.addAndGet(size); + logicalFileSize.addAndGet(size); + logicalFileCount.incrementAndGet(); + } + + public void onLogicalFileDelete(long size) { + logicalFileSize.addAndGet(-size); + logicalFileCount.decrementAndGet(); + } + + public void onPhysicalFileCreate() { + physicalFileCount.incrementAndGet(); + } + + public void onPhysicalFileDelete(long size) { + physicalFileSize.addAndGet(-size); + physicalFileCount.decrementAndGet(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SpaceStat spaceStat = (SpaceStat) o; + return physicalFileCount.get() == spaceStat.physicalFileCount.get() + && physicalFileSize.get() == spaceStat.physicalFileSize.get() + && logicalFileCount.get() == spaceStat.logicalFileCount.get() + && logicalFileSize.get() == spaceStat.logicalFileSize.get(); + } + + @Override + public String toString() { + return "SpaceStat{" + + "physicalFileCount=" + + physicalFileCount.get() + + ", physicalFileSize=" + + physicalFileSize.get() + + ", logicalFileCount=" + + logicalFileCount.get() + + ", logicalFileSize=" + + logicalFileSize.get() + + '}'; + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java index 1e1abdcaf64309..25c25cebe70c49 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java @@ -133,12 +133,16 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps */ protected DirectoryStreamStateHandle managedExclusiveStateDirHandle; + /** The current space statistic, updated on file creation/deletion. */ + protected SpaceStat spaceStat; + public FileMergingSnapshotManagerBase( String id, long maxFileSize, PhysicalFilePool.Type filePoolType, Executor ioExecutor) { this.id = id; this.maxPhysicalFileSize = maxFileSize; this.filePoolType = filePoolType; this.ioExecutor = ioExecutor; + this.spaceStat = new SpaceStat(); } @Override @@ -215,6 +219,7 @@ protected LogicalFile createLogicalFile( LogicalFileId fileID = LogicalFileId.generateRandomId(); LogicalFile file = new LogicalFile(fileID, physicalFile, startOffset, length, subtaskKey); knownLogicalFiles.put(fileID, file); + spaceStat.onLogicalFileCreate(length); return file; } @@ -305,7 +310,6 @@ public SegmentFileStateHandle closeStreamAndCreateStateHandle( } // deal with physicalFile file - physicalFile.incSize(stateSize); returnPhysicalFileForNextReuse(subtaskKey, checkpointId, physicalFile); return new SegmentFileStateHandle( @@ -321,7 +325,7 @@ public SegmentFileStateHandle closeStreamAndCreateStateHandle( public void closeStreamExceptionally() throws IOException { if (physicalFile != null) { if (logicalFile != null) { - logicalFile.discardWithCheckpointId(checkpointId); + discardSingleLogicalFile(logicalFile, checkpointId); } else { // The physical file should be closed anyway. 
This is because the // last segmented write on this file is likely to have failed, and @@ -336,6 +340,7 @@ public void closeStreamExceptionally() throws IOException { private void updateFileCreationMetrics(Path path) { // TODO: FLINK-32091 add io metrics + spaceStat.onPhysicalFileCreate(); LOG.debug("Create a new physical file {} for checkpoint file merging.", path); } @@ -364,11 +369,12 @@ boolean isResponsibleForFile(Path filePath) { * * @param filePath the given file path to delete. */ - protected final void deletePhysicalFile(Path filePath) { + protected final void deletePhysicalFile(Path filePath, long size) { ioExecutor.execute( () -> { try { fs.delete(filePath, false); + spaceStat.onPhysicalFileDelete(size); LOG.debug("Physical file deleted: {}.", filePath); } catch (IOException e) { LOG.warn("Fail to delete file: {}", filePath); @@ -494,6 +500,12 @@ public void reusePreviousStateHandle( } } + public void discardSingleLogicalFile(LogicalFile logicalFile, long checkpointId) + throws IOException { + logicalFile.discardWithCheckpointId(checkpointId); + spaceStat.onLogicalFileDelete(logicalFile.getLength()); + } + private boolean discardLogicalFiles( SubtaskKey subtaskKey, long checkpointId, Set logicalFiles) throws Exception { @@ -502,7 +514,7 @@ private boolean discardLogicalFiles( LogicalFile logicalFile = logicalFileIterator.next(); if (logicalFile.getSubtaskKey().equals(subtaskKey) && logicalFile.getLastUsedCheckpointID() <= checkpointId) { - logicalFile.discardWithCheckpointId(checkpointId); + discardSingleLogicalFile(logicalFile, checkpointId); logicalFileIterator.remove(); knownLogicalFiles.remove(logicalFile.getFileId()); } @@ -607,6 +619,7 @@ public void restoreStateHandles( fileHandle.getScope())) ? 
physicalFileDeleter : null; + spaceStat.onPhysicalFileCreate(); return new PhysicalFile( null, path, fileDeleter, fileHandle.getScope()); }); @@ -619,6 +632,7 @@ public void restoreStateHandles( fileHandle.getStartPos(), fileHandle.getStateSize(), subtaskKey); + spaceStat.onLogicalFileCreate(logicalFile.getLength()); knownLogicalFiles.put(logicalFileId, logicalFile); logicalFile.advanceLastCheckpointId(checkpointId); restoredLogicalFiles.add(logicalFile); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java index 489e97059af006..c53a1116a36f12 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/LogicalFile.java @@ -90,6 +90,7 @@ public LogicalFile( this.length = length; this.subtaskKey = subtaskKey; physicalFile.incRefCount(); + physicalFile.incSize(length); } public LogicalFileId getFileId() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java index 8c99b97ea0537c..73e29d5175646d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/PhysicalFile.java @@ -42,7 +42,7 @@ public class PhysicalFile { @FunctionalInterface public interface PhysicalFileDeleter { /** Delete the file. */ - void perform(Path filePath) throws IOException; + void perform(Path filePath, long size) throws IOException; } /** Functional interface to create the physical file. 
*/ @@ -142,7 +142,7 @@ public void deleteIfNecessary() throws IOException { } } if (deleter != null) { - deleter.perform(filePath); + deleter.perform(filePath, size.get()); } else { LOG.debug( "Skip deleting this file {} because it is not owned by FileMergingManager.", diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java index cc6c53793df9f5..d4bec90b33537b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/AcrossCheckpointFileMergingSnapshotManagerTest.java @@ -56,9 +56,11 @@ void testCreateAndReuseFiles() throws IOException { assertThat(file2.getFilePath().getParent()) .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); assertThat(file2).isNotEqualTo(file1); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); // return for reuse fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file1); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); // allocate for another subtask PhysicalFile file3 = @@ -67,6 +69,7 @@ void testCreateAndReuseFiles() throws IOException { assertThat(file3.getFilePath().getParent()) .isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.SHARED)); assertThat(file3).isNotEqualTo(file1); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(3); // allocate for another checkpoint PhysicalFile file4 = @@ -75,16 +78,20 @@ void testCreateAndReuseFiles() throws IOException { assertThat(file4.getFilePath().getParent()) .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); assertThat(file4).isEqualTo(file1); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(3); // 
a physical file whose size is bigger than maxPhysicalFileSize cannot be reused file4.incSize(fmsm.maxPhysicalFileSize); fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 1, file4); + // file4 is discarded because it's size is bigger than maxPhysicalFileSize + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); PhysicalFile file5 = fmsm.getOrCreatePhysicalFileForCheckpoint( subtaskKey1, 1, CheckpointedStateScope.SHARED); assertThat(file5.getFilePath().getParent()) .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); assertThat(file5).isNotEqualTo(file4); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(3); // Secondly, we try private state PhysicalFile file6 = @@ -92,6 +99,7 @@ void testCreateAndReuseFiles() throws IOException { subtaskKey1, 1, CheckpointedStateScope.EXCLUSIVE); assertThat(file6.getFilePath().getParent()) .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(4); // allocate another PhysicalFile file7 = @@ -100,9 +108,11 @@ void testCreateAndReuseFiles() throws IOException { assertThat(file7.getFilePath().getParent()) .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); assertThat(file7).isNotEqualTo(file5); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(5); // return for reuse fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file6); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(5); // allocate for another checkpoint PhysicalFile file8 = @@ -111,9 +121,11 @@ void testCreateAndReuseFiles() throws IOException { assertThat(file8.getFilePath().getParent()) .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); assertThat(file8).isEqualTo(file6); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(5); // return for reuse fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file8); + 
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(5); // allocate for this checkpoint but another subtask PhysicalFile file9 = @@ -122,16 +134,19 @@ void testCreateAndReuseFiles() throws IOException { assertThat(file9.getFilePath().getParent()) .isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE)); assertThat(file9).isEqualTo(file6); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(5); // a physical file whose size is bigger than maxPhysicalFileSize cannot be reused file9.incSize(fmsm.maxPhysicalFileSize); fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 2, file9); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(4); PhysicalFile file10 = fmsm.getOrCreatePhysicalFileForCheckpoint( subtaskKey1, 2, CheckpointedStateScope.SHARED); assertThat(file10.getFilePath().getParent()) .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); assertThat(file10).isNotEqualTo(file9); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(5); assertThat(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE)) .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); @@ -140,33 +155,44 @@ void testCreateAndReuseFiles() throws IOException { @Test public void testCheckpointNotification() throws Exception { - try (FileMergingSnapshotManager fmsm = createFileMergingSnapshotManager(checkpointBaseDir); + try (FileMergingSnapshotManagerBase fmsm = + (FileMergingSnapshotManagerBase) + createFileMergingSnapshotManager(checkpointBaseDir); CloseableRegistry closeableRegistry = new CloseableRegistry()) { FileMergingCheckpointStateOutputStream cp1Stream = writeCheckpointAndGetStream(1, fmsm, closeableRegistry); SegmentFileStateHandle cp1StateHandle = cp1Stream.closeAndGetHandle(); fmsm.notifyCheckpointComplete(subtaskKey1, 1); assertFileInManagedDir(fmsm, cp1StateHandle); - + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); + 
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1); // complete checkpoint-2 FileMergingCheckpointStateOutputStream cp2Stream = writeCheckpointAndGetStream(2, fmsm, closeableRegistry); SegmentFileStateHandle cp2StateHandle = cp2Stream.closeAndGetHandle(); fmsm.notifyCheckpointComplete(subtaskKey1, 2); assertFileInManagedDir(fmsm, cp2StateHandle); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2); // subsume checkpoint-1 assertThat(fileExists(cp1StateHandle)).isTrue(); fmsm.notifyCheckpointSubsumed(subtaskKey1, 1); assertThat(fileExists(cp1StateHandle)).isTrue(); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1); // abort checkpoint-3 FileMergingCheckpointStateOutputStream cp3Stream = writeCheckpointAndGetStream(3, fmsm, closeableRegistry); SegmentFileStateHandle cp3StateHandle = cp3Stream.closeAndGetHandle(); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2); assertFileInManagedDir(fmsm, cp3StateHandle); fmsm.notifyCheckpointAborted(subtaskKey1, 3); assertThat(fileExists(cp3StateHandle)).isTrue(); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java index f8677e491db15f..5dcd9b86266661 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerTestBase.java @@ -27,6 +27,7 @@ import 
org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SpaceStat; import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager.SubtaskKey; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.CheckpointedStateScope; @@ -198,6 +199,48 @@ void testSizeStatsInPhysicalFile() throws IOException { } } + @Test + void testSpaceStat() throws IOException { + try (FileMergingSnapshotManagerBase fmsm = + (FileMergingSnapshotManagerBase) + createFileMergingSnapshotManager(checkpointBaseDir)) { + fmsm.registerSubtaskForSharedStates(subtaskKey1); + fmsm.registerSubtaskForSharedStates(subtaskKey2); + PhysicalFile physicalFile1 = + fmsm.getOrCreatePhysicalFileForCheckpoint( + subtaskKey1, 0, CheckpointedStateScope.SHARED); + assertThat(physicalFile1.isOpen()).isTrue(); + + LogicalFile logicalFile1 = fmsm.createLogicalFile(physicalFile1, 0, 123, subtaskKey1); + assertThat(fmsm.spaceStat.physicalFileSize.get()).isEqualTo(123); + assertThat(fmsm.spaceStat.logicalFileSize.get()).isEqualTo(123); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1); + assertThat(physicalFile1.getSize()).isEqualTo(123); + + LogicalFile logicalFile2 = fmsm.createLogicalFile(physicalFile1, 0, 456, subtaskKey1); + assertThat(fmsm.spaceStat.physicalFileSize.get()).isEqualTo(123 + 456); + assertThat(fmsm.spaceStat.logicalFileSize.get()).isEqualTo(123 + 456); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2); + assertThat(physicalFile1.getSize()).isEqualTo(123 + 456); + + logicalFile1.discardWithCheckpointId(1); + fmsm.discardSingleLogicalFile(logicalFile1, 1); + 
assertThat(fmsm.spaceStat.physicalFileSize.get()).isEqualTo(123 + 456); + assertThat(fmsm.spaceStat.logicalFileSize.get()).isEqualTo(456); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1); + + physicalFile1.close(); + fmsm.discardSingleLogicalFile(logicalFile2, 1); + assertThat(fmsm.spaceStat.physicalFileSize.get()).isEqualTo(0); + assertThat(fmsm.spaceStat.logicalFileSize.get()).isEqualTo(0); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(0); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(0); + } + } + @Test public void testReusedFileWriting() throws Exception { long checkpointId = 1; @@ -406,6 +449,7 @@ public void testReuseCallbackAndAdvanceWatermark() throws Exception { public void testRestore() throws Exception { TaskStateSnapshot taskStateSnapshot; long checkpointId = 222; + SpaceStat oldSpaceStat; // Step1: build TaskStateSnapshot using FileMergingSnapshotManagerBase; try (FileMergingSnapshotManagerBase fmsm = @@ -416,7 +460,9 @@ public void testRestore() throws Exception { subtaskStatesByOperatorID.put( operatorID, buildOperatorSubtaskState(checkpointId, fmsm, closeableRegistry)); taskStateSnapshot = new TaskStateSnapshot(subtaskStatesByOperatorID); + oldSpaceStat = fmsm.spaceStat; } + assertThat(taskStateSnapshot).isNotNull(); // Step 2: restore FileMergingSnapshotManagerBase from the TaskStateSnapshot. 
@@ -447,6 +493,7 @@ public void testRestore() throws Exception { Set restoreFileSet = stateFiles.get(checkpointId); assertThat(restoreFileSet).isNotNull(); assertThat(restoreFileSet.size()).isEqualTo(4); + assertThat(fmsm.spaceStat).isEqualTo(oldSpaceStat); for (LogicalFile file : restoreFileSet) { assertThat(fmsm.getLogicalFile(file.getFileId())).isEqualTo(file); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java index 340bb917d41ae7..2401fc1430370a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/filemerging/WithinCheckpointFileMergingSnapshotManagerTest.java @@ -49,6 +49,7 @@ void testCreateAndReuseFiles() throws IOException { subtaskKey1, 0, CheckpointedStateScope.SHARED); assertThat(file1.getFilePath().getParent()) .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); // allocate another PhysicalFile file2 = fmsm.getOrCreatePhysicalFileForCheckpoint( @@ -56,9 +57,11 @@ void testCreateAndReuseFiles() throws IOException { assertThat(file2.getFilePath().getParent()) .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); assertThat(file2).isNotEqualTo(file1); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); // return for reuse fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file1); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); // allocate for another subtask PhysicalFile file3 = @@ -67,6 +70,7 @@ void testCreateAndReuseFiles() throws IOException { assertThat(file3.getFilePath().getParent()) .isEqualTo(fmsm.getManagedDir(subtaskKey2, 
CheckpointedStateScope.SHARED)); assertThat(file3).isNotEqualTo(file1); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(3); // allocate for another checkpoint PhysicalFile file4 = @@ -75,6 +79,7 @@ void testCreateAndReuseFiles() throws IOException { assertThat(file4.getFilePath().getParent()) .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); assertThat(file4).isNotEqualTo(file1); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(4); // allocate for this checkpoint PhysicalFile file5 = @@ -83,16 +88,19 @@ void testCreateAndReuseFiles() throws IOException { assertThat(file5.getFilePath().getParent()) .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); assertThat(file5).isEqualTo(file1); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(4); // a physical file whose size is bigger than maxPhysicalFileSize cannot be reused file5.incSize(fmsm.maxPhysicalFileSize); fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file5); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(3); PhysicalFile file6 = fmsm.getOrCreatePhysicalFileForCheckpoint( subtaskKey1, 0, CheckpointedStateScope.SHARED); assertThat(file6.getFilePath().getParent()) .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); assertThat(file6).isNotEqualTo(file5); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(4); // Secondly, we try private state PhysicalFile file7 = @@ -100,6 +108,7 @@ void testCreateAndReuseFiles() throws IOException { subtaskKey1, 0, CheckpointedStateScope.EXCLUSIVE); assertThat(file7.getFilePath().getParent()) .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(5); // allocate another PhysicalFile file8 = @@ -108,9 +117,11 @@ void testCreateAndReuseFiles() throws IOException { assertThat(file8.getFilePath().getParent()) .isEqualTo(fmsm.getManagedDir(subtaskKey1, 
CheckpointedStateScope.EXCLUSIVE)); assertThat(file8).isNotEqualTo(file6); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(6); // return for reuse fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file7); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(6); // allocate for another checkpoint PhysicalFile file9 = @@ -119,6 +130,7 @@ void testCreateAndReuseFiles() throws IOException { assertThat(file9.getFilePath().getParent()) .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); assertThat(file9).isNotEqualTo(file7); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(7); // allocate for this checkpoint but another subtask PhysicalFile file10 = @@ -127,16 +139,19 @@ void testCreateAndReuseFiles() throws IOException { assertThat(file10.getFilePath().getParent()) .isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE)); assertThat(file10).isEqualTo(file7); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(7); // a physical file whose size is bigger than maxPhysicalFileSize cannot be reused file10.incSize(fmsm.maxPhysicalFileSize); fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file10); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(6); PhysicalFile file11 = fmsm.getOrCreatePhysicalFileForCheckpoint( subtaskKey1, 0, CheckpointedStateScope.SHARED); assertThat(file11.getFilePath().getParent()) .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED)); assertThat(file11).isNotEqualTo(file10); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(7); assertThat(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE)) .isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE)); @@ -145,13 +160,17 @@ void testCreateAndReuseFiles() throws IOException { @Test public void testCheckpointNotification() throws Exception { - try (FileMergingSnapshotManager fmsm = 
createFileMergingSnapshotManager(checkpointBaseDir); + try (FileMergingSnapshotManagerBase fmsm = + (FileMergingSnapshotManagerBase) + createFileMergingSnapshotManager(checkpointBaseDir); CloseableRegistry closeableRegistry = new CloseableRegistry()) { FileMergingCheckpointStateOutputStream cp1Stream = writeCheckpointAndGetStream(1, fmsm, closeableRegistry); SegmentFileStateHandle cp1StateHandle = cp1Stream.closeAndGetHandle(); fmsm.notifyCheckpointComplete(subtaskKey1, 1); assertFileInManagedDir(fmsm, cp1StateHandle); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1); // complete checkpoint-2 FileMergingCheckpointStateOutputStream cp2Stream = @@ -159,19 +178,28 @@ public void testCheckpointNotification() throws Exception { SegmentFileStateHandle cp2StateHandle = cp2Stream.closeAndGetHandle(); fmsm.notifyCheckpointComplete(subtaskKey1, 2); assertFileInManagedDir(fmsm, cp2StateHandle); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2); // subsume checkpoint-1 assertThat(fileExists(cp1StateHandle)).isTrue(); fmsm.notifyCheckpointSubsumed(subtaskKey1, 1); assertThat(fileExists(cp1StateHandle)).isFalse(); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1); // abort checkpoint-3 FileMergingCheckpointStateOutputStream cp3Stream = writeCheckpointAndGetStream(3, fmsm, closeableRegistry); SegmentFileStateHandle cp3StateHandle = cp3Stream.closeAndGetHandle(); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2); + assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2); + assertFileInManagedDir(fmsm, cp3StateHandle); fmsm.notifyCheckpointAborted(subtaskKey1, 3); assertThat(fileExists(cp3StateHandle)).isFalse(); + assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1); + 
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStreamTest.java index 1146fa7d722cdc..062379bc2b6fc3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStreamTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileMergingCheckpointStateOutputStreamTest.java @@ -90,7 +90,10 @@ private FileMergingCheckpointStateOutputStream getNewStream(boolean reuseLastPhy FileSystem.WriteMode.NO_OVERWRITE); physicalFile = new PhysicalFile( - streamAndPath.stream(), physicalFilePath, (path) -> {}, EXCLUSIVE); + streamAndPath.stream(), + physicalFilePath, + (path, size) -> {}, + EXCLUSIVE); } isPhysicalFileProvided = false;