diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java index f3253f147db..8aff60fb766 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java @@ -110,9 +110,9 @@ public class OperatorCoordinatorHolder /** * A map that manages subtask gateways. It is used to control the opening/closing of each - * gateway during checkpoint. This map should only be read or modified when concurrent execution - * attempt is disabled. Note that concurrent execution attempt is currently guaranteed to be - * disabled when checkpoint is enabled. + * gateway during checkpoints. This map should only be read or modified in Streaming mode. Given + * that the CheckpointCoordinator is guaranteed to be non-null in Streaming mode, construction + * of this map can be skipped if the CheckpointCoordinator is null. */ private final Map subtaskGatewayMap; @@ -437,9 +437,8 @@ private void setupSubtaskGateway(final SubtaskAccess sta) { final SubtaskGatewayImpl gateway = new SubtaskGatewayImpl(sta, mainThreadExecutor, unconfirmedEvents); - // When concurrent execution attempts is supported, the checkpoint must have been disabled. - // Thus, we don't need to maintain subtaskGatewayMap - if (!context.isConcurrentExecutionAttemptsSupported()) { + // We don't need to maintain subtaskGatewayMap when checkpoint coordinator is null. + if (context.getCheckpointCoordinator() != null) { subtaskGatewayMap.put(gateway.getSubtask(), gateway); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java index 95841dcb634..278214f8901 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java @@ -18,21 +18,36 @@ package org.apache.flink.runtime.operators.coordination; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; +import org.apache.flink.runtime.checkpoint.CheckpointFailureManager; +import org.apache.flink.runtime.checkpoint.CheckpointPlanCalculatorContext; +import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker; +import org.apache.flink.runtime.checkpoint.CheckpointsCleaner; +import org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator; +import org.apache.flink.runtime.checkpoint.NoOpFailJobCall; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; +import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService; import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.runtime.operators.coordination.EventReceivingTasks.EventWithSubtask; import org.apache.flink.runtime.scheduler.GlobalFailureHandler; +import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.concurrent.ScheduledExecutor; +import org.apache.commons.collections.iterators.IteratorChain; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; @@ -42,6 +57,7 @@ import java.nio.ByteBuffer; import java.util.ArrayDeque; +import java.util.Collections; import java.util.Queue; import java.util.Random; import java.util.concurrent.CompletableFuture; @@ -528,7 +544,37 @@ public OperatorCoordinator create(OperatorCoordinator.Context context) { new Configuration()), UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup()); - holder.lazyInitialize(globalFailureHandler, mainThreadExecutor, null); + JobID jobId = new JobID(); + holder.lazyInitialize( + globalFailureHandler, + mainThreadExecutor, + new CheckpointCoordinator( + jobId, + CheckpointCoordinatorConfiguration.builder().build(), + Collections.emptyList(), + new StandaloneCheckpointIDCounter(), + new StandaloneCompletedCheckpointStore(10), + new MemoryStateBackend(), + mainThreadExecutor, + new CheckpointsCleaner(), + mainThreadExecutor, + new CheckpointFailureManager(0, NoOpFailJobCall.INSTANCE), + new DefaultCheckpointPlanCalculator( + jobId, + new CheckpointPlanCalculatorContext() { + @Override + public ScheduledExecutor getMainExecutor() { + return null; + } + + @Override + public boolean hasFinishedTasks() { + return false; + } + }, + IteratorChain::new, + false), + new CheckpointStatsTracker(1, new UnregisteredMetricsGroup(), jobId))); holder.start(); return holder;