Skip to content

Commit

Permalink
[FLINK-33984][runtime] Skip maintain subtaskGatewayMap when Checkpoin…
Browse files Browse the repository at this point in the history
…tCoordinator is null.
  • Loading branch information
JunRuiLee authored and zhuzhurk committed Apr 8, 2024
1 parent 191e6ce commit 3825565
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 7 deletions.
Expand Up @@ -110,9 +110,9 @@ public class OperatorCoordinatorHolder

/**
* A map that manages subtask gateways. It is used to control the opening/closing of each
* gateway during checkpoint. This map should only be read or modified when concurrent execution
* attempt is disabled. Note that concurrent execution attempt is currently guaranteed to be
* disabled when checkpoint is enabled.
* gateway during checkpoints. This map should only be read or modified in Streaming mode. Given
* that the CheckpointCoordinator is guaranteed to be non-null in Streaming mode, construction
* of this map can be skipped if the CheckpointCoordinator is null.
*/
private final Map<Integer, SubtaskGatewayImpl> subtaskGatewayMap;

Expand Down Expand Up @@ -437,9 +437,8 @@ private void setupSubtaskGateway(final SubtaskAccess sta) {
final SubtaskGatewayImpl gateway =
new SubtaskGatewayImpl(sta, mainThreadExecutor, unconfirmedEvents);

// When concurrent execution attempts is supported, the checkpoint must have been disabled.
// Thus, we don't need to maintain subtaskGatewayMap
if (!context.isConcurrentExecutionAttemptsSupported()) {
// We don't need to maintain subtaskGatewayMap when checkpoint coordinator is null.
if (context.getCheckpointCoordinator() != null) {
subtaskGatewayMap.put(gateway.getSubtask(), gateway);
}

Expand Down
Expand Up @@ -18,21 +18,36 @@

package org.apache.flink.runtime.operators.coordination;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculatorContext;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator;
import org.apache.flink.runtime.checkpoint.NoOpFailJobCall;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.coordination.EventReceivingTasks.EventWithSubtask;
import org.apache.flink.runtime.scheduler.GlobalFailureHandler;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.concurrent.ScheduledExecutor;

import org.apache.commons.collections.iterators.IteratorChain;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

Expand All @@ -42,6 +57,7 @@

import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -528,7 +544,37 @@ public OperatorCoordinator create(OperatorCoordinator.Context context) {
new Configuration()),
UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());

holder.lazyInitialize(globalFailureHandler, mainThreadExecutor, null);
JobID jobId = new JobID();
holder.lazyInitialize(
globalFailureHandler,
mainThreadExecutor,
new CheckpointCoordinator(
jobId,
CheckpointCoordinatorConfiguration.builder().build(),
Collections.emptyList(),
new StandaloneCheckpointIDCounter(),
new StandaloneCompletedCheckpointStore(10),
new MemoryStateBackend(),
mainThreadExecutor,
new CheckpointsCleaner(),
mainThreadExecutor,
new CheckpointFailureManager(0, NoOpFailJobCall.INSTANCE),
new DefaultCheckpointPlanCalculator(
jobId,
new CheckpointPlanCalculatorContext() {
@Override
public ScheduledExecutor getMainExecutor() {
return null;
}

@Override
public boolean hasFinishedTasks() {
return false;
}
},
IteratorChain::new,
false),
new CheckpointStatsTracker(1, new UnregisteredMetricsGroup(), jobId)));
holder.start();

return holder;
Expand Down

0 comments on commit 3825565

Please sign in to comment.