Skip to content

Commit

Permalink
[FLINK-28474][checkpoint] Use maxConcurrentCheckpoints as maxRecordAb…
Browse files Browse the repository at this point in the history
…ortedCheckpoints
  • Loading branch information
1996fanrui committed Oct 6, 2022
1 parent c59e820 commit 1b6f5c7
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 29 deletions.
Expand Up @@ -519,6 +519,17 @@ public void setAlignedCheckpointTimeout(Duration alignedCheckpointTimeout) {
ExecutionCheckpointingOptions.ALIGNED_CHECKPOINT_TIMEOUT, alignedCheckpointTimeout);
}

public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints) {
config.setInteger(
ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS, maxConcurrentCheckpoints);
}

public int getMaxConcurrentCheckpoints() {
return config.getInteger(
ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS,
ExecutionCheckpointingOptions.MAX_CONCURRENT_CHECKPOINTS.defaultValue());
}

/**
* Sets the job vertex level non-chained outputs. The given output list must have the same order
* with {@link JobVertex#getProducedDataSets()}.
Expand Down
Expand Up @@ -981,6 +981,7 @@ private void setVertexConfig(
config.setCheckpointMode(getCheckpointingMode(checkpointCfg));
config.setUnalignedCheckpointsEnabled(checkpointCfg.isUnalignedCheckpointsEnabled());
config.setAlignedCheckpointTimeout(checkpointCfg.getAlignedCheckpointTimeout());
config.setMaxConcurrentCheckpoints(checkpointCfg.getMaxConcurrentCheckpoints());

for (int i = 0; i < vertex.getStatePartitioners().length; i++) {
config.setStatePartitioner(i, vertex.getStatePartitioners()[i]);
Expand Down
Expand Up @@ -466,6 +466,7 @@ protected StreamTask(
ExecutionCheckpointingOptions
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH),
this::prepareInputSnapshot,
configuration.getMaxConcurrentCheckpoints(),
BarrierAlignmentUtil.createRegisterTimerCallback(
mainMailboxExecutor, systemTimerService));
resourceCloser.registerCloseable(subtaskCheckpointCoordinator::close);
Expand Down
Expand Up @@ -77,7 +77,6 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {

private static final Logger LOG =
LoggerFactory.getLogger(SubtaskCheckpointCoordinatorImpl.class);
private static final int DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS = 128;

private static final int CHECKPOINT_EXECUTION_DELAY_LOG_THRESHOLD_MS = 30_000;

Expand Down Expand Up @@ -125,34 +124,6 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator {
*/
private long alignmentCheckpointId;

SubtaskCheckpointCoordinatorImpl(
CheckpointStorageWorkerView checkpointStorage,
String taskName,
StreamTaskActionExecutor actionExecutor,
ExecutorService asyncOperationsThreadPool,
Environment env,
AsyncExceptionHandler asyncExceptionHandler,
boolean unalignedCheckpointEnabled,
boolean enableCheckpointAfterTasksFinished,
BiFunctionWithException<
ChannelStateWriter, Long, CompletableFuture<Void>, CheckpointException>
prepareInputSnapshot,
DelayableTimer registerTimer)
throws IOException {
this(
checkpointStorage,
taskName,
actionExecutor,
asyncOperationsThreadPool,
env,
asyncExceptionHandler,
unalignedCheckpointEnabled,
enableCheckpointAfterTasksFinished,
prepareInputSnapshot,
DEFAULT_MAX_RECORD_ABORTED_CHECKPOINTS,
registerTimer);
}

SubtaskCheckpointCoordinatorImpl(
CheckpointStorageWorkerView checkpointStorage,
String taskName,
Expand Down

0 comments on commit 1b6f5c7

Please sign in to comment.