Skip to content

Commit

Permalink
[FLINK-32087][checkpoint] Introduce space amplification statistics of…
Browse files Browse the repository at this point in the history
… file merging
  • Loading branch information
fredia committed May 8, 2024
1 parent ea4112a commit 1a79824
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import java.io.Closeable;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

/**
Expand Down Expand Up @@ -268,4 +269,78 @@ public String toString() {
"%s-%s(%d/%d)", jobIDString, operatorIDString, subtaskIndex, parallelism);
}
}

/**
 * Space usage statistics of a managed directory.
 *
 * <p>Four counters are tracked: the number and total size of physical files, and the number and
 * total size of logical files. Each counter is an {@link AtomicLong}, so a single counter update
 * is atomic; methods that touch several counters, as well as {@link #equals(Object)}, {@link
 * #hashCode()} and {@link #toString()}, read or write the counters one by one and therefore do
 * not observe them as one atomic snapshot.
 */
final class SpaceStat {

    AtomicLong physicalFileCount;
    AtomicLong physicalFileSize;

    AtomicLong logicalFileCount;
    AtomicLong logicalFileSize;

    /** Creates statistics with every counter set to zero. */
    public SpaceStat() {
        this(0, 0, 0, 0);
    }

    /**
     * Creates statistics seeded with the given values.
     *
     * @param physicalFileCount initial number of physical files.
     * @param physicalFileSize initial total size of physical files.
     * @param logicalFileCount initial number of logical files.
     * @param logicalFileSize initial total size of logical files.
     */
    public SpaceStat(
            long physicalFileCount,
            long physicalFileSize,
            long logicalFileCount,
            long logicalFileSize) {
        // Every counter must start from its own argument; the counts were previously
        // hard-coded to 0, silently dropping the caller's values.
        this.physicalFileCount = new AtomicLong(physicalFileCount);
        this.physicalFileSize = new AtomicLong(physicalFileSize);
        this.logicalFileCount = new AtomicLong(logicalFileCount);
        this.logicalFileSize = new AtomicLong(logicalFileSize);
    }

    /**
     * Records a newly created logical file.
     *
     * <p>The size is added to both the physical and the logical size, and the logical file count
     * is incremented.
     *
     * @param size size of the created logical file.
     */
    public void onLogicalFileCreate(long size) {
        physicalFileSize.addAndGet(size);
        logicalFileSize.addAndGet(size);
        logicalFileCount.incrementAndGet();
    }

    /**
     * Records the deletion of a logical file. Only the logical counters change; the physical
     * size is reduced separately by {@link #onPhysicalFileDelete(long)}.
     *
     * @param size size of the deleted logical file.
     */
    public void onLogicalFileDelete(long size) {
        logicalFileSize.addAndGet(-size);
        logicalFileCount.decrementAndGet();
    }

    /** Records a newly created physical file by incrementing the physical file count. */
    public void onPhysicalFileCreate() {
        physicalFileCount.incrementAndGet();
    }

    /**
     * Records the deletion of a physical file.
     *
     * @param size size of the deleted physical file, subtracted from the physical size.
     */
    public void onPhysicalFileDelete(long size) {
        physicalFileSize.addAndGet(-size);
        physicalFileCount.decrementAndGet();
    }

    /** Two instances are equal when all four counters currently hold the same values. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        SpaceStat spaceStat = (SpaceStat) o;
        return physicalFileCount.get() == spaceStat.physicalFileCount.get()
                && physicalFileSize.get() == spaceStat.physicalFileSize.get()
                && logicalFileCount.get() == spaceStat.logicalFileCount.get()
                && logicalFileSize.get() == spaceStat.logicalFileSize.get();
    }

    /**
     * Hash code consistent with {@link #equals(Object)}: derived from the current values of the
     * four counters, so it changes whenever a counter changes.
     */
    @Override
    public int hashCode() {
        int result = Long.hashCode(physicalFileCount.get());
        result = 31 * result + Long.hashCode(physicalFileSize.get());
        result = 31 * result + Long.hashCode(logicalFileCount.get());
        result = 31 * result + Long.hashCode(logicalFileSize.get());
        return result;
    }

    @Override
    public String toString() {
        return "SpaceStat{"
                + "physicalFileCount="
                + physicalFileCount.get()
                + ", physicalFileSize="
                + physicalFileSize.get()
                + ", logicalFileCount="
                + logicalFileCount.get()
                + ", logicalFileSize="
                + logicalFileSize.get()
                + '}';
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,16 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps
*/
protected DirectoryStreamStateHandle managedExclusiveStateDirHandle;

/** The current space statistic, updated on file creation/deletion. */
protected SpaceStat spaceStat;

/**
 * Initializes the fields shared by file-merging snapshot manager implementations.
 *
 * @param id stored in {@code id}.
 * @param maxFileSize stored in {@code maxPhysicalFileSize}.
 * @param filePoolType stored in {@code filePoolType}.
 * @param ioExecutor stored in {@code ioExecutor}.
 */
public FileMergingSnapshotManagerBase(
        String id, long maxFileSize, PhysicalFilePool.Type filePoolType, Executor ioExecutor) {
    // Space accounting starts from a fresh, default-constructed SpaceStat.
    this.spaceStat = new SpaceStat();

    // Plain configuration captured from the caller.
    this.ioExecutor = ioExecutor;
    this.filePoolType = filePoolType;
    this.maxPhysicalFileSize = maxFileSize;
    this.id = id;
}

@Override
Expand Down Expand Up @@ -215,6 +219,7 @@ protected LogicalFile createLogicalFile(
LogicalFileId fileID = LogicalFileId.generateRandomId();
LogicalFile file = new LogicalFile(fileID, physicalFile, startOffset, length, subtaskKey);
knownLogicalFiles.put(fileID, file);
spaceStat.onLogicalFileCreate(length);
return file;
}

Expand Down Expand Up @@ -305,7 +310,6 @@ public SegmentFileStateHandle closeStreamAndCreateStateHandle(
}

// deal with the physical file
physicalFile.incSize(stateSize);
returnPhysicalFileForNextReuse(subtaskKey, checkpointId, physicalFile);

return new SegmentFileStateHandle(
Expand All @@ -321,7 +325,7 @@ public SegmentFileStateHandle closeStreamAndCreateStateHandle(
public void closeStreamExceptionally() throws IOException {
if (physicalFile != null) {
if (logicalFile != null) {
logicalFile.discardWithCheckpointId(checkpointId);
discardSingleLogicalFile(logicalFile, checkpointId);
} else {
// The physical file should be closed anyway. This is because the
// last segmented write on this file is likely to have failed, and
Expand All @@ -336,6 +340,7 @@ public void closeStreamExceptionally() throws IOException {

/**
 * Bookkeeping for a newly created physical file: emits a debug log entry for {@code path} and
 * increments the physical file count in {@code spaceStat}.
 *
 * @param path path of the physical file that was created.
 */
private void updateFileCreationMetrics(Path path) {
    LOG.debug("Create a new physical file {} for checkpoint file merging.", path);
    // TODO: FLINK-32091 add io metrics
    spaceStat.onPhysicalFileCreate();
}

Expand Down Expand Up @@ -364,11 +369,12 @@ boolean isResponsibleForFile(Path filePath) {
*
* @param filePath the given file path to delete.
*/
protected final void deletePhysicalFile(Path filePath) {
protected final void deletePhysicalFile(Path filePath, long size) {
ioExecutor.execute(
() -> {
try {
fs.delete(filePath, false);
spaceStat.onPhysicalFileDelete(size);
LOG.debug("Physical file deleted: {}.", filePath);
} catch (IOException e) {
LOG.warn("Fail to delete file: {}", filePath);
Expand Down Expand Up @@ -494,6 +500,12 @@ public void reusePreviousStateHandle(
}
}

/**
 * Discards one logical file for the given checkpoint and then removes its length from the
 * logical-file counters in {@code spaceStat}.
 *
 * @param logicalFile the logical file to discard.
 * @param checkpointId the checkpoint id forwarded to {@code discardWithCheckpointId}.
 * @throws IOException propagated from {@code discardWithCheckpointId}.
 */
public void discardSingleLogicalFile(LogicalFile logicalFile, long checkpointId)
        throws IOException {
    logicalFile.discardWithCheckpointId(checkpointId);

    // The statistics are updated only after the discard call returned normally.
    final long releasedLength = logicalFile.getLength();
    spaceStat.onLogicalFileDelete(releasedLength);
}

private boolean discardLogicalFiles(
SubtaskKey subtaskKey, long checkpointId, Set<LogicalFile> logicalFiles)
throws Exception {
Expand All @@ -502,7 +514,7 @@ private boolean discardLogicalFiles(
LogicalFile logicalFile = logicalFileIterator.next();
if (logicalFile.getSubtaskKey().equals(subtaskKey)
&& logicalFile.getLastUsedCheckpointID() <= checkpointId) {
logicalFile.discardWithCheckpointId(checkpointId);
discardSingleLogicalFile(logicalFile, checkpointId);
logicalFileIterator.remove();
knownLogicalFiles.remove(logicalFile.getFileId());
}
Expand Down Expand Up @@ -607,6 +619,7 @@ public void restoreStateHandles(
fileHandle.getScope()))
? physicalFileDeleter
: null;
spaceStat.onPhysicalFileCreate();
return new PhysicalFile(
null, path, fileDeleter, fileHandle.getScope());
});
Expand All @@ -619,6 +632,7 @@ public void restoreStateHandles(
fileHandle.getStartPos(),
fileHandle.getStateSize(),
subtaskKey);
spaceStat.onLogicalFileCreate(logicalFile.getLength());
knownLogicalFiles.put(logicalFileId, logicalFile);
logicalFile.advanceLastCheckpointId(checkpointId);
restoredLogicalFiles.add(logicalFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public LogicalFile(
this.length = length;
this.subtaskKey = subtaskKey;
physicalFile.incRefCount();
physicalFile.incSize(length);
}

public LogicalFileId getFileId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class PhysicalFile {
@FunctionalInterface
public interface PhysicalFileDeleter {
/**
 * Delete the file.
 *
 * <p>NOTE(review): two {@code perform} signatures appear here under
 * {@code @FunctionalInterface}; this looks like the old and new side of a diff. The visible
 * call site passes {@code (filePath, size.get())}, so the two-argument form is presumably the
 * current one. Confirm before relying on either.
 *
 * @param filePath path of the file to delete.
 * @param size (two-argument form only) size value supplied by the caller alongside the path.
 * @throws IOException declared so that implementations may propagate I/O failures.
 */
void perform(Path filePath) throws IOException;
void perform(Path filePath, long size) throws IOException;
}

/** Functional interface to create the physical file. */
Expand Down Expand Up @@ -142,7 +142,7 @@ public void deleteIfNecessary() throws IOException {
}
}
if (deleter != null) {
deleter.perform(filePath);
deleter.perform(filePath, size.get());
} else {
LOG.debug(
"Skip deleting this file {} because it is not owned by FileMergingManager.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ void testCreateAndReuseFiles() throws IOException {
assertThat(file2.getFilePath().getParent())
.isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
assertThat(file2).isNotEqualTo(file1);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2);

// return for reuse
fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file1);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2);

// allocate for another subtask
PhysicalFile file3 =
Expand All @@ -67,6 +69,7 @@ void testCreateAndReuseFiles() throws IOException {
assertThat(file3.getFilePath().getParent())
.isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.SHARED));
assertThat(file3).isNotEqualTo(file1);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(3);

// allocate for another checkpoint
PhysicalFile file4 =
Expand All @@ -75,23 +78,28 @@ void testCreateAndReuseFiles() throws IOException {
assertThat(file4.getFilePath().getParent())
.isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
assertThat(file4).isEqualTo(file1);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(3);

// a physical file whose size is bigger than maxPhysicalFileSize cannot be reused
file4.incSize(fmsm.maxPhysicalFileSize);
fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 1, file4);
// file4 is discarded because its size is bigger than maxPhysicalFileSize
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(2);
PhysicalFile file5 =
fmsm.getOrCreatePhysicalFileForCheckpoint(
subtaskKey1, 1, CheckpointedStateScope.SHARED);
assertThat(file5.getFilePath().getParent())
.isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
assertThat(file5).isNotEqualTo(file4);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(3);

// Secondly, we try private state
PhysicalFile file6 =
fmsm.getOrCreatePhysicalFileForCheckpoint(
subtaskKey1, 1, CheckpointedStateScope.EXCLUSIVE);
assertThat(file6.getFilePath().getParent())
.isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(4);

// allocate another
PhysicalFile file7 =
Expand All @@ -100,9 +108,11 @@ void testCreateAndReuseFiles() throws IOException {
assertThat(file7.getFilePath().getParent())
.isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
assertThat(file7).isNotEqualTo(file5);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(5);

// return for reuse
fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file6);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(5);

// allocate for another checkpoint
PhysicalFile file8 =
Expand All @@ -111,9 +121,11 @@ void testCreateAndReuseFiles() throws IOException {
assertThat(file8.getFilePath().getParent())
.isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
assertThat(file8).isEqualTo(file6);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(5);

// return for reuse
fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 0, file8);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(5);

// allocate for this checkpoint but another subtask
PhysicalFile file9 =
Expand All @@ -122,16 +134,19 @@ void testCreateAndReuseFiles() throws IOException {
assertThat(file9.getFilePath().getParent())
.isEqualTo(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE));
assertThat(file9).isEqualTo(file6);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(5);

// a physical file whose size is bigger than maxPhysicalFileSize cannot be reused
file9.incSize(fmsm.maxPhysicalFileSize);
fmsm.returnPhysicalFileForNextReuse(subtaskKey1, 2, file9);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(4);
PhysicalFile file10 =
fmsm.getOrCreatePhysicalFileForCheckpoint(
subtaskKey1, 2, CheckpointedStateScope.SHARED);
assertThat(file10.getFilePath().getParent())
.isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.SHARED));
assertThat(file10).isNotEqualTo(file9);
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(5);

assertThat(fmsm.getManagedDir(subtaskKey2, CheckpointedStateScope.EXCLUSIVE))
.isEqualTo(fmsm.getManagedDir(subtaskKey1, CheckpointedStateScope.EXCLUSIVE));
Expand All @@ -140,33 +155,44 @@ void testCreateAndReuseFiles() throws IOException {

/**
 * Walks one subtask through checkpoint complete / subsume / abort notifications and checks,
 * after each step, the physical and logical file counts reported by {@code fmsm.spaceStat}.
 *
 * <p>NOTE(review): two try-with-resources headers appear below (one typed as
 * {@code FileMergingSnapshotManager}, one cast to {@code FileMergingSnapshotManagerBase}); this
 * looks like the old and new side of a diff. The {@code spaceStat} assertions need the
 * {@code FileMergingSnapshotManagerBase} variant. Confirm which one is current.
 */
@Test
public void testCheckpointNotification() throws Exception {
try (FileMergingSnapshotManager fmsm = createFileMergingSnapshotManager(checkpointBaseDir);
try (FileMergingSnapshotManagerBase fmsm =
(FileMergingSnapshotManagerBase)
createFileMergingSnapshotManager(checkpointBaseDir);
CloseableRegistry closeableRegistry = new CloseableRegistry()) {
// complete checkpoint-1
FileMergingCheckpointStateOutputStream cp1Stream =
writeCheckpointAndGetStream(1, fmsm, closeableRegistry);
SegmentFileStateHandle cp1StateHandle = cp1Stream.closeAndGetHandle();
fmsm.notifyCheckpointComplete(subtaskKey1, 1);
assertFileInManagedDir(fmsm, cp1StateHandle);

// after checkpoint-1: one physical file holding one logical file
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1);
// complete checkpoint-2
FileMergingCheckpointStateOutputStream cp2Stream =
writeCheckpointAndGetStream(2, fmsm, closeableRegistry);
SegmentFileStateHandle cp2StateHandle = cp2Stream.closeAndGetHandle();
fmsm.notifyCheckpointComplete(subtaskKey1, 2);
assertFileInManagedDir(fmsm, cp2StateHandle);
// after checkpoint-2: physical file count is unchanged, logical file count grows to two
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2);

// subsume checkpoint-1
assertThat(fileExists(cp1StateHandle)).isTrue();
fmsm.notifyCheckpointSubsumed(subtaskKey1, 1);
// the file behind cp1's handle still exists, but one logical file is gone from the stats
assertThat(fileExists(cp1StateHandle)).isTrue();
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1);

// abort checkpoint-3
FileMergingCheckpointStateOutputStream cp3Stream =
writeCheckpointAndGetStream(3, fmsm, closeableRegistry);
SegmentFileStateHandle cp3StateHandle = cp3Stream.closeAndGetHandle();
// writing checkpoint-3 adds a logical file without adding a physical file
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(2);
assertFileInManagedDir(fmsm, cp3StateHandle);
fmsm.notifyCheckpointAborted(subtaskKey1, 3);
// after the abort the file still exists, and the logical file count drops back to one
assertThat(fileExists(cp3StateHandle)).isTrue();
assertThat(fmsm.spaceStat.physicalFileCount.get()).isEqualTo(1);
assertThat(fmsm.spaceStat.logicalFileCount.get()).isEqualTo(1);
}
}
}

0 comments on commit 1a79824

Please sign in to comment.