Skip to content

Commit

Permalink
[FLINK-34936][Checkpointing] Register reused shared state handle to F…
Browse files Browse the repository at this point in the history
…ileMergingSnapshotManager
  • Loading branch information
Zakelly authored and fredia committed Apr 17, 2024
1 parent 7a90a05 commit 31ea1a9
Show file tree
Hide file tree
Showing 11 changed files with 189 additions and 14 deletions.
Expand Up @@ -24,12 +24,14 @@
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageAccess;

import java.io.Closeable;
import java.util.Collection;

/**
* FileMergingSnapshotManager provides an interface to manage files and meta information for
Expand Down Expand Up @@ -157,6 +159,16 @@ DirectoryStreamStateHandle getManagedDirStateHandle(
*/
void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId) throws Exception;

/**
* A callback method which is called when previous state handles are reused by following
* checkpoint(s).
*
* @param checkpointId the checkpoint that reuses the handles.
* @param stateHandles the handles to be reused.
*/
void reusePreviousStateHandle(
long checkpointId, Collection<? extends StreamStateHandle> stateHandles);

/**
* A key identifies a subtask. A subtask can be identified by the operator id, subtask index and
* the parallelism. Note that this key should be consistent across job attempts.
Expand Down
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile.LogicalFileId;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.state.filesystem.FileMergingCheckpointStateOutputStream;
Expand All @@ -42,6 +43,7 @@
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
Expand Down Expand Up @@ -70,6 +72,9 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps
@GuardedBy("lock")
protected TreeMap<Long, Set<LogicalFile>> uploadedStates = new TreeMap<>();

/** The map that holds all the known live logical files. */
private final Map<LogicalFileId, LogicalFile> knownLogicalFiles = new ConcurrentHashMap<>();

/** The {@link FileSystem} that this manager works on. */
protected FileSystem fs;

Expand Down Expand Up @@ -206,7 +211,9 @@ protected LogicalFile createLogicalFile(
long length,
@Nonnull SubtaskKey subtaskKey) {
LogicalFileId fileID = LogicalFileId.generateRandomId();
return new LogicalFile(fileID, physicalFile, startOffset, length, subtaskKey);
LogicalFile file = new LogicalFile(fileID, physicalFile, startOffset, length, subtaskKey);
knownLogicalFiles.put(fileID, file);
return file;
}

/**
Expand Down Expand Up @@ -300,7 +307,11 @@ public SegmentFileStateHandle closeStreamAndCreateStateHandle(
returnPhysicalFileForNextReuse(subtaskKey, checkpointId, physicalFile);

return new SegmentFileStateHandle(
physicalFile.getFilePath(), startPos, stateSize, scope);
physicalFile.getFilePath(),
startPos,
stateSize,
scope,
logicalFile.getFileId());
}
}

Expand Down Expand Up @@ -459,13 +470,28 @@ public void notifyCheckpointSubsumed(SubtaskKey subtaskKey, long checkpointId)
uploadedStates.headMap(checkpointId, true).entrySet().iterator();
while (uploadedStatesIterator.hasNext()) {
Map.Entry<Long, Set<LogicalFile>> entry = uploadedStatesIterator.next();
if (discardLogicalFiles(subtaskKey, entry.getKey(), entry.getValue())) {
if (discardLogicalFiles(subtaskKey, checkpointId, entry.getValue())) {
uploadedStatesIterator.remove();
}
}
}
}

@Override
public void reusePreviousStateHandle(
long checkpointId, Collection<? extends StreamStateHandle> stateHandles) {
for (StreamStateHandle stateHandle : stateHandles) {
if (stateHandle instanceof SegmentFileStateHandle) {
LogicalFile file =
knownLogicalFiles.get(
((SegmentFileStateHandle) stateHandle).getLogicalFileId());
if (file != null) {
file.advanceLastCheckpointId(checkpointId);
}
}
}
}

private boolean discardLogicalFiles(
SubtaskKey subtaskKey, long checkpointId, Set<LogicalFile> logicalFiles)
throws Exception {
Expand All @@ -476,6 +502,7 @@ private boolean discardLogicalFiles(
&& logicalFile.getLastUsedCheckpointID() <= checkpointId) {
logicalFile.discardWithCheckpointId(checkpointId);
logicalFileIterator.remove();
knownLogicalFiles.remove(logicalFile.getFileId());
}
}

Expand Down Expand Up @@ -547,4 +574,9 @@ private void createManagedDirectory(Path managedPath) {

@Override
public void close() throws IOException {}

@VisibleForTesting
public LogicalFile getLogicalFile(LogicalFileId fileId) {
return knownLogicalFiles.get(fileId);
}
}
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocalPath;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
Expand Down Expand Up @@ -745,6 +746,7 @@ static void serializeStreamStateHandle(StreamStateHandle stateHandle, DataOutput
dos.writeLong(segmentFileStateHandle.getStateSize());
dos.writeInt(segmentFileStateHandle.getScope().ordinal());
dos.writeUTF(segmentFileStateHandle.getFilePath().toString());
dos.writeUTF(segmentFileStateHandle.getLogicalFileId().getKeyString());
}
} else if (stateHandle instanceof FileStateHandle) {
dos.writeByte(FILE_STREAM_STATE_HANDLE);
Expand Down Expand Up @@ -819,7 +821,9 @@ static StreamStateHandle deserializeStreamStateHandle(
long stateSize = dis.readLong();
CheckpointedStateScope scope = CheckpointedStateScope.values()[dis.readInt()];
Path physicalFilePath = new Path(dis.readUTF());
return new SegmentFileStateHandle(physicalFilePath, startPos, stateSize, scope);
LogicalFile.LogicalFileId logicalFileId = new LogicalFile.LogicalFileId(dis.readUTF());
return new SegmentFileStateHandle(
physicalFilePath, startPos, stateSize, scope, logicalFileId);
} else if (EMPTY_SEGMENT_FILE_HANDLE == type) {
return EmptySegmentFileStateHandle.INSTANCE;
} else {
Expand Down
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

/**
Expand Down Expand Up @@ -67,4 +68,15 @@ boolean canFastDuplicate(StreamStateHandle stateHandle, CheckpointedStateScope s
*/
List<StreamStateHandle> duplicate(
List<StreamStateHandle> stateHandles, CheckpointedStateScope scope) throws IOException;

/**
* A callback method when some previous handle is reused. It is needed by the file merging
* mechanism (FLIP-306) which will manage the life cycle of underlying files by file-reusing
* information.
*
* @param previousHandle the previous handles that will be reused.
*/
default void reusePreviousStateHandle(Collection<? extends StreamStateHandle> previousHandle) {
// Does nothing for normal stream factory
}
}
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile;
import org.apache.flink.runtime.state.CheckpointedStateScope;

import java.io.IOException;
Expand All @@ -31,11 +32,19 @@ public class EmptySegmentFileStateHandle extends SegmentFileStateHandle {

public static final EmptySegmentFileStateHandle INSTANCE =
new EmptySegmentFileStateHandle(
new Path("empty"), 0, 0, CheckpointedStateScope.EXCLUSIVE);
new Path("empty"),
0,
0,
CheckpointedStateScope.EXCLUSIVE,
new LogicalFile.LogicalFileId("DUMMY"));

private EmptySegmentFileStateHandle(
Path filePath, long startPos, long stateSize, CheckpointedStateScope scope) {
super(filePath, startPos, stateSize, scope);
Path filePath,
long startPos,
long stateSize,
CheckpointedStateScope scope,
LogicalFile.LogicalFileId fileId) {
super(filePath, startPos, stateSize, scope, fileId);
}

@Override
Expand Down
Expand Up @@ -52,20 +52,29 @@ public class SegmentFileStateHandle implements StreamStateHandle {
/** The scope of the state. */
private final CheckpointedStateScope scope;

/** The id for corresponding logical file. Used to retrieve LogicalFile in TM. */
private final LogicalFile.LogicalFileId logicalFileId;

/**
* Creates a new segment file state for the given file path.
*
* @param filePath The path to the file that stores the state.
* @param startPos Start position of the segment in the physical file.
* @param stateSize Size of the segment.
* @param scope The state's scope, whether it is exclusive or shared.
* @param fileId The corresponding logical file id.
*/
public SegmentFileStateHandle(
Path filePath, long startPos, long stateSize, CheckpointedStateScope scope) {
Path filePath,
long startPos,
long stateSize,
CheckpointedStateScope scope,
LogicalFile.LogicalFileId fileId) {
this.filePath = filePath;
this.stateSize = stateSize;
this.startPos = startPos;
this.scope = scope;
this.logicalFileId = fileId;
}

/**
Expand Down Expand Up @@ -118,6 +127,10 @@ public CheckpointedStateScope getScope() {
return scope;
}

public LogicalFile.LogicalFileId getLogicalFileId() {
return logicalFileId;
}

/**
* Gets the file system that stores the file state.
*
Expand All @@ -142,15 +155,17 @@ public boolean equals(Object o) {

SegmentFileStateHandle that = (SegmentFileStateHandle) o;

return filePath.equals(that.filePath)
return logicalFileId.equals(that.logicalFileId)
&& filePath.equals(that.filePath)
&& startPos == that.startPos
&& stateSize == that.stateSize
&& scope.equals(that.scope);
}

@Override
public int hashCode() {
int result = getFilePath().hashCode();
int result = logicalFileId.hashCode();
result = 31 * result + Objects.hashCode(getFilePath());
result = 31 * result + Objects.hashCode(startPos);
result = 31 * result + Objects.hashCode(stateSize);
result = 31 * result + Objects.hashCode(scope);
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;

Expand Down Expand Up @@ -113,4 +114,9 @@ public FileMergingCheckpointStateOutputStream createCheckpointStateOutputStream(
return fileMergingSnapshotManager.createCheckpointStateOutputStream(
subtaskKey, checkpointId, scope);
}

@Override
public void reusePreviousStateHandle(Collection<? extends StreamStateHandle> previousHandle) {
fileMergingSnapshotManager.reusePreviousStateHandle(checkpointId, previousHandle);
}
}
Expand Up @@ -35,6 +35,7 @@
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
Expand Down Expand Up @@ -319,6 +320,67 @@ public void testConcurrentFileReusingWithBlockingPool() throws Exception {
}
}

@Test
public void testReuseCallbackAndAdvanceWatermark() throws Exception {
long checkpointId = 1;
int streamNum = 20;
int perStreamWriteNum = 128;

// write random bytes and then read them from the file
byte[] bytes = new byte[streamNum * perStreamWriteNum];
Random rd = new Random();
rd.nextBytes(bytes);
int byteIndex = 0;

SegmentFileStateHandle[] handles = new SegmentFileStateHandle[streamNum];
try (FileMergingSnapshotManager fmsm = createFileMergingSnapshotManager(checkpointBaseDir);
CloseableRegistry closeableRegistry = new CloseableRegistry()) {
fmsm.registerSubtaskForSharedStates(subtaskKey1);

// repeatedly get-write-close streams
for (int i = 0; i < streamNum; i++) {
FileMergingCheckpointStateOutputStream stream =
fmsm.createCheckpointStateOutputStream(
subtaskKey1, checkpointId, CheckpointedStateScope.SHARED);
try {
closeableRegistry.registerCloseable(stream);
for (int j = 0; j < perStreamWriteNum; j++) {
stream.write(bytes[byteIndex++]);
}
handles[i] = stream.closeAndGetHandle();
} catch (IOException e) {
throw new RuntimeException(e);
}
}

// start reuse
for (long cp = checkpointId + 1; cp <= 10; cp++) {
ArrayList<SegmentFileStateHandle> reuse = new ArrayList<>();
for (int j = 0; j <= 10 - cp; j++) {
reuse.add(handles[j]);
}
fmsm.reusePreviousStateHandle(cp, reuse);
// assert the reusing affects the watermark
for (SegmentFileStateHandle handle : reuse) {
assertThat(
((FileMergingSnapshotManagerBase) fmsm)
.getLogicalFile(handle.getLogicalFileId())
.getLastUsedCheckpointID())
.isEqualTo(cp);
}
// subsumed
fmsm.notifyCheckpointSubsumed(subtaskKey1, cp - 1);
// assert the other files discarded.
for (int j = 10 - (int) cp + 1; j < streamNum; j++) {
assertThat(
((FileMergingSnapshotManagerBase) fmsm)
.getLogicalFile(handles[j].getLogicalFileId()))
.isNull();
}
}
}
}

FileMergingSnapshotManager createFileMergingSnapshotManager(Path checkpointBaseDir)
throws IOException {
return createFileMergingSnapshotManager(
Expand Down
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.checkpoint.MasterState;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.filemerging.LogicalFile;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DiscardRecordedStateObject;
Expand Down Expand Up @@ -363,7 +364,12 @@ private static class TestingSegmentFileStateHandle extends SegmentFileStateHandl

public TestingSegmentFileStateHandle(
Path filePath, long startPos, long stateSize, CheckpointedStateScope scope) {
super(filePath, startPos, stateSize, scope);
super(
filePath,
startPos,
stateSize,
scope,
LogicalFile.LogicalFileId.generateRandomId());
}

@Override
Expand Down

0 comments on commit 31ea1a9

Please sign in to comment.