From 9e3e73c822429f4d103abf140ed6af6aeb837197 Mon Sep 17 00:00:00 2001 From: Anton Kalashnikov Date: Thu, 22 Apr 2021 17:22:27 +0200 Subject: [PATCH] [FLINK-22379][runtime] CheckpointCoordinator checks the state of all subtasks before triggering the checkpoint --- .../checkpoint/CheckpointCoordinator.java | 2 +- .../checkpoint/CheckpointPlanCalculator.java | 4 +- .../DefaultCheckpointPlanCalculator.java | 8 +- .../checkpoint/CheckpointCoordinatorTest.java | 5 +- .../CheckpointCoordinatorTestingUtils.java | 20 ++- .../DefaultCheckpointPlanCalculatorTest.java | 124 ++++++++++++++++-- ...overStrategyCheckpointCoordinatorTest.java | 3 +- 7 files changed, 142 insertions(+), 24 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index db1509e30fdf10..602a0c1e20e61a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -530,7 +530,7 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) { final long timestamp = System.currentTimeMillis(); CompletableFuture<CheckpointPlan> checkpointPlanFuture = - checkpointPlanCalculator.calculateCheckpointPlan(); + checkpointPlanCalculator.calculateCheckpointPlan(unalignedCheckpointsEnabled); final CompletableFuture<PendingCheckpoint> pendingCheckpointCompletableFuture = checkpointPlanFuture diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointPlanCalculator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointPlanCalculator.java index b49581f0aa31c3..bd7f5dc1bc6314 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointPlanCalculator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointPlanCalculator.java @@ -30,6 +30,8 @@ 
public interface CheckpointPlanCalculator { * Calculates the plan of the next checkpoint. * * @return The result plan. + * @param isUnalignedCheckpoint {@code true} if plan should be calculated for unaligned + * checkpoint. */ - CompletableFuture<CheckpointPlan> calculateCheckpointPlan(); + CompletableFuture<CheckpointPlan> calculateCheckpointPlan(boolean isUnalignedCheckpoint); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculator.java index c81a5dde859a0b..0643f14699aefb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculator.java @@ -92,7 +92,8 @@ public void setAllowCheckpointsAfterTasksFinished(boolean allowCheckpointsAfterT } @Override - public CompletableFuture<CheckpointPlan> calculateCheckpointPlan() { + public CompletableFuture<CheckpointPlan> calculateCheckpointPlan( + boolean isUnalignedCheckpoint) { return CompletableFuture.supplyAsync( () -> { try { @@ -111,7 +112,10 @@ public CompletableFuture<CheckpointPlan> calculateCheckpointPlan() { ? calculateAfterTasksFinished() : calculateWithAllTasksRunning(); - checkTasksStarted(result.getTasksToTrigger()); + checkTasksStarted( + isUnalignedCheckpoint + ? 
result.getTasksToWaitFor() + : result.getTasksToTrigger()); return result; } catch (Throwable throwable) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index 44feb9c55f3aa7..a89fd08d7c528c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -100,6 +100,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder.disableTransit; import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION; import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED; import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_EXPIRED; @@ -345,7 +346,7 @@ public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() { new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() .addJobVertex(new JobVertexID()) .addJobVertex(new JobVertexID(), false) - .setTransitToRunning(false) + .setTransitToRunning(disableTransit) .build(); CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(graph); @@ -2243,7 +2244,7 @@ public void testPeriodicSchedulingWithInactiveTasks() { ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() .addJobVertex(jobVertexID1) - .setTransitToRunning(false) + .setTransitToRunning(disableTransit) .build(); ExecutionVertex vertex1 = graph.getJobVertex(jobVertexID1).getTaskVertices()[0]; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java index 479717d1632869..8b9ccc2fd36037 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java @@ -34,6 +34,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraphCheckpointPlanCalculatorContext; import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; @@ -81,6 +82,7 @@ import java.util.concurrent.Executor; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Predicate; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -531,14 +533,14 @@ public NotifiedCheckpoint getOnlyNotifiedAbortedCheckpoint(ExecutionAttemptID at } static class CheckpointExecutionGraphBuilder { + public static Predicate<ExecutionVertex> disableTransit = null; private final List<JobVertex> sourceVertices = new ArrayList<>(); private final List<JobVertex> nonSourceVertices = new ArrayList<>(); - private boolean transitToRunning; + private Predicate<ExecutionVertex> transitToRunning = (vertex) -> true; private TaskManagerGateway taskManagerGateway; private ComponentMainThreadExecutor mainThreadExecutor; CheckpointExecutionGraphBuilder() { - this.transitToRunning = true; this.mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread(); } @@ -589,7 +591,8 @@ public CheckpointExecutionGraphBuilder setTaskManagerGateway( return this; } - public CheckpointExecutionGraphBuilder setTransitToRunning(boolean 
transitToRunning) { + public CheckpointExecutionGraphBuilder setTransitToRunning( + Predicate<ExecutionVertex> transitToRunning) { this.transitToRunning = transitToRunning; return this; } @@ -631,14 +634,17 @@ ExecutionGraph build() throws Exception { }); } - if (transitToRunning) { + if (transitToRunning != null) { executionGraph.transitionToRunning(); executionGraph .getAllExecutionVertices() .forEach( - task -> + task -> { + if (transitToRunning.test(task)) { task.getCurrentExecutionAttempt() - .transitionState(ExecutionState.RUNNING)); + .transitionState(ExecutionState.RUNNING); + } + }); } return executionGraph; @@ -805,7 +811,7 @@ public String deserialize(int version, byte[] serialized) throws IOException { // ----------------- Mock class builders --------------- - public static final class MockOperatorCheckpointCoordinatorContextBuilder { + static final class MockOperatorCheckpointCoordinatorContextBuilder { private BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator = null; private Consumer<Long> onCallingAfterSourceBarrierInjection = null; private OperatorID operatorID = null; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculatorTest.java index 492988f87b287a..f4facbb2e09e44 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCheckpointPlanCalculatorTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.function.FunctionWithException; import org.hamcrest.CoreMatchers; import org.junit.Test; @@ -51,6 +52,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.instanceOf; import static 
org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -149,23 +151,114 @@ public void testComputeWithMultipleLevels() throws Exception { } @Test - public void testWithTriggeredTasksNotRunning() throws Exception { + public void testNotRunningOneOfSourcesTriggeredTasksNotRunning() throws Exception { + // given: Execution graph builder with one RUNNING source and NOT RUNNING source. + FunctionWithException<JobVertexID, ExecutionGraph, Exception> twoSourcesBuilder = + (id) -> + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(id) + .addJobVertex(new JobVertexID()) + .setTransitToRunning(vertex -> !vertex.getJobvertexId().equals(id)) + .build(); + + // when: Creating the checkpoint plan for aligned and unaligned checkpoints. + runWithNotRunTask(twoSourcesBuilder, true); + runWithNotRunTask(twoSourcesBuilder, false); + + // then: The plan failed because one task didn't have RUNNING state. + } + + @Test + public void testNotRunningSingleSourceTriggeredTasksNotRunning() throws Exception { + // given: Execution graph builder with one NOT RUNNING source and RUNNING not source task. + FunctionWithException<JobVertexID, ExecutionGraph, Exception> sourceAndNotSourceBuilder = + (id) -> + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(id) + .addJobVertex(new JobVertexID(), false) + .setTransitToRunning(vertex -> !vertex.getJobvertexId().equals(id)) + .build(); + + // when: Creating the checkpoint plan for aligned and unaligned checkpoints. + runWithNotRunTask(sourceAndNotSourceBuilder, true); + runWithNotRunTask(sourceAndNotSourceBuilder, false); + + // then: The plan failed because one task didn't have RUNNING state. + } + + @Test + public void testNotRunningOneOfNotSourcesTriggeredTasksNotRunning() throws Exception { + // given: Execution graph builder with NOT RUNNING not source and RUNNING not source task. 
+ FunctionWithException<JobVertexID, ExecutionGraph, Exception> twoNotSourceBuilder = + (id) -> + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(id, false) + .addJobVertex(new JobVertexID(), false) + .setTransitToRunning(vertex -> !vertex.getJobvertexId().equals(id)) + .build(); + + // when: Creating the checkpoint plan for aligned and unaligned checkpoints. + runWithNotRunTask(twoNotSourceBuilder, true); + runWithNotRunTask(twoNotSourceBuilder, false); + + // then: The plan failed because one task didn't have RUNNING state. + } + + @Test + public void testNotRunningSingleNotSourceTriggeredTasksNotRunning() throws Exception { + // given: Execution graph builder with NOT RUNNING not source and RUNNING source tasks. + FunctionWithException<JobVertexID, ExecutionGraph, Exception> sourceAndNotSourceBuilder = + (id) -> + new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() + .addJobVertex(id, false) + .addJobVertex(new JobVertexID()) + .setTransitToRunning(vertex -> !vertex.getJobvertexId().equals(id)) + .build(); + + // when: Creating the checkpoint plan for unaligned checkpoints (it makes sense only for + // unaligned; for aligned see {@link #testNotRunningSingleNotSourceForAlignedCheckpoint}). + runWithNotRunTask(sourceAndNotSourceBuilder, true); + + // then: The plan failed because one task didn't have RUNNING state. + } + + @Test + public void testNotRunningSingleNotSourceForAlignedCheckpoint() throws Exception { + // given: Execution graph builder with NOT RUNNING not source and RUNNING source tasks. 
for (ExecutionState state : EnumSet.complementOf(EnumSet.of(ExecutionState.RUNNING))) { JobVertexID jobVertexID = new JobVertexID(); ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() - .addJobVertex(jobVertexID) - .setTransitToRunning(false) + .addJobVertex(jobVertexID, false) + .addJobVertex(new JobVertexID()) + .setTransitToRunning( + vertex -> !vertex.getJobvertexId().equals(jobVertexID)) .build(); - graph.getJobVertex(jobVertexID) - .getTaskVertices()[0] - .getCurrentExecutionAttempt() - .transitionState(state); + + transitVertexToState(graph, jobVertexID, state); + + // when: Creating the checkpoint plan for aligned checkpoints. + DefaultCheckpointPlanCalculator checkpointPlanCalculator = + createCheckpointPlanCalculator(graph); + + // then: The plan should be successfully created despite one NOT RUNNING not source. + assertNotNull(checkpointPlanCalculator.calculateCheckpointPlan(false).get()); + } + } + + private void runWithNotRunTask( + FunctionWithException<JobVertexID, ExecutionGraph, Exception> graphBuilder, + boolean isUnalignedCheckpoint) + throws Exception { + for (ExecutionState state : EnumSet.complementOf(EnumSet.of(ExecutionState.RUNNING))) { + JobVertexID jobVertexID = new JobVertexID(); + ExecutionGraph graph = graphBuilder.apply(jobVertexID); + transitVertexToState(graph, jobVertexID, state); DefaultCheckpointPlanCalculator checkpointPlanCalculator = createCheckpointPlanCalculator(graph); try { - checkpointPlanCalculator.calculateCheckpointPlan().get(); + checkpointPlanCalculator.calculateCheckpointPlan(isUnalignedCheckpoint).get(); fail( "The computation should fail since some tasks to trigger are in " + state @@ -180,6 +273,16 @@ public void testWithTriggeredTasksNotRunning() throws Exception { } } + private void transitVertexToState( + ExecutionGraph graph, JobVertexID jobVertexID, ExecutionState state) { + Arrays.stream(graph.getJobVertex(jobVertexID).getTaskVertices()) + .filter(vertex -> 
vertex.getJobvertexId().equals(jobVertexID)) + .findFirst() + .get() + .getCurrentExecutionAttempt() + .transitionState(state); + } + // ------------------------- Utility methods --------------------------------------- private void runSingleTest( @@ -239,8 +342,9 @@ private void runSingleTest( chooseTasks( graph, expectedToTriggerTaskDeclarations.toArray(new TaskDeclaration[0])); - // Tests computing checkpoint plan - CheckpointPlan checkpointPlan = planCalculator.calculateCheckpointPlan().get(); + // Tests computing checkpoint plan (isUnalignedCheckpoint flag doesn't affect the result + // because all tasks are in RUNNING state here). + CheckpointPlan checkpointPlan = planCalculator.calculateCheckpointPlan(true).get(); checkCheckpointPlan( expectedToTriggerTasks, expectedRunningTasks, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java index 9396ef06fc091d..06ad53040153ac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java @@ -35,6 +35,7 @@ import java.util.Collections; import java.util.concurrent.ThreadLocalRandom; +import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder.disableTransit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -58,7 +59,7 @@ public void testAbortPendingCheckpointsWithTriggerValidation() throws Exception ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() .addJobVertex(new JobVertexID()) - .setTransitToRunning(false) + .setTransitToRunning(disableTransit) .build(); 
CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = new CheckpointCoordinatorConfiguration(