Skip to content

Commit

Permalink
[FLINK-22379][runtime] CheckpointCoordinator checks the state of all …
Browse files Browse the repository at this point in the history
…subtasks before triggering the checkpoint
  • Loading branch information
akalash committed Apr 26, 2021
1 parent 19ca330 commit 9e3e73c
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 24 deletions.
Expand Up @@ -530,7 +530,7 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
final long timestamp = System.currentTimeMillis();

CompletableFuture<CheckpointPlan> checkpointPlanFuture =
checkpointPlanCalculator.calculateCheckpointPlan();
checkpointPlanCalculator.calculateCheckpointPlan(unalignedCheckpointsEnabled);

final CompletableFuture<PendingCheckpoint> pendingCheckpointCompletableFuture =
checkpointPlanFuture
Expand Down
Expand Up @@ -30,6 +30,8 @@ public interface CheckpointPlanCalculator {
* Calculates the plan of the next checkpoint.
*
* @return The result plan.
* @param isUnalignedCheckpoint {@code true} if plan should be calculated for unaligned
* checkpoint.
*/
CompletableFuture<CheckpointPlan> calculateCheckpointPlan();
CompletableFuture<CheckpointPlan> calculateCheckpointPlan(boolean isUnalignedCheckpoint);
}
Expand Up @@ -92,7 +92,8 @@ public void setAllowCheckpointsAfterTasksFinished(boolean allowCheckpointsAfterT
}

@Override
public CompletableFuture<CheckpointPlan> calculateCheckpointPlan() {
public CompletableFuture<CheckpointPlan> calculateCheckpointPlan(
boolean isUnalignedCheckpoint) {
return CompletableFuture.supplyAsync(
() -> {
try {
Expand All @@ -111,7 +112,10 @@ public CompletableFuture<CheckpointPlan> calculateCheckpointPlan() {
? calculateAfterTasksFinished()
: calculateWithAllTasksRunning();

checkTasksStarted(result.getTasksToTrigger());
checkTasksStarted(
isUnalignedCheckpoint
? result.getTasksToWaitFor()
: result.getTasksToTrigger());

return result;
} catch (Throwable throwable) {
Expand Down
Expand Up @@ -100,6 +100,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder.disableTransit;
import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION;
import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_DECLINED;
import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_EXPIRED;
Expand Down Expand Up @@ -345,7 +346,7 @@ public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
.addJobVertex(new JobVertexID())
.addJobVertex(new JobVertexID(), false)
.setTransitToRunning(false)
.setTransitToRunning(disableTransit)
.build();

CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(graph);
Expand Down Expand Up @@ -2243,7 +2244,7 @@ public void testPeriodicSchedulingWithInactiveTasks() {
ExecutionGraph graph =
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
.addJobVertex(jobVertexID1)
.setTransitToRunning(false)
.setTransitToRunning(disableTransit)
.build();

ExecutionVertex vertex1 = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
Expand Down
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.runtime.executiongraph.ExecutionGraphCheckpointPlanCalculatorContext;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
Expand Down Expand Up @@ -81,6 +82,7 @@
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -531,14 +533,14 @@ public NotifiedCheckpoint getOnlyNotifiedAbortedCheckpoint(ExecutionAttemptID at
}

static class CheckpointExecutionGraphBuilder {
public static Predicate<ExecutionVertex> disableTransit = null;
private final List<JobVertex> sourceVertices = new ArrayList<>();
private final List<JobVertex> nonSourceVertices = new ArrayList<>();
private boolean transitToRunning;
private Predicate<ExecutionVertex> transitToRunning = (vertex) -> true;
private TaskManagerGateway taskManagerGateway;
private ComponentMainThreadExecutor mainThreadExecutor;

CheckpointExecutionGraphBuilder() {
this.transitToRunning = true;
this.mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
}

Expand Down Expand Up @@ -589,7 +591,8 @@ public CheckpointExecutionGraphBuilder setTaskManagerGateway(
return this;
}

public CheckpointExecutionGraphBuilder setTransitToRunning(boolean transitToRunning) {
public CheckpointExecutionGraphBuilder setTransitToRunning(
Predicate<ExecutionVertex> transitToRunning) {
this.transitToRunning = transitToRunning;
return this;
}
Expand Down Expand Up @@ -631,14 +634,17 @@ ExecutionGraph build() throws Exception {
});
}

if (transitToRunning) {
if (transitToRunning != null) {
executionGraph.transitionToRunning();
executionGraph
.getAllExecutionVertices()
.forEach(
task ->
task -> {
if (transitToRunning.test(task)) {
task.getCurrentExecutionAttempt()
.transitionState(ExecutionState.RUNNING));
.transitionState(ExecutionState.RUNNING);
}
});
}

return executionGraph;
Expand Down Expand Up @@ -805,7 +811,7 @@ public String deserialize(int version, byte[] serialized) throws IOException {

// ----------------- Mock class builders ---------------

public static final class MockOperatorCheckpointCoordinatorContextBuilder {
static final class MockOperatorCheckpointCoordinatorContextBuilder {
private BiConsumer<Long, CompletableFuture<byte[]>> onCallingCheckpointCoordinator = null;
private Consumer<Long> onCallingAfterSourceBarrierInjection = null;
private OperatorID operatorID = null;
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.function.FunctionWithException;

import org.hamcrest.CoreMatchers;
import org.junit.Test;
Expand All @@ -51,6 +52,7 @@
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

Expand Down Expand Up @@ -149,23 +151,114 @@ public void testComputeWithMultipleLevels() throws Exception {
}

@Test
public void testWithTriggeredTasksNotRunning() throws Exception {
public void testNotRunningOneOfSourcesTriggeredTasksNotRunning() throws Exception {
// given: Execution graph builder with one RUNNING source and NOT RUNNING source.
FunctionWithException<JobVertexID, ExecutionGraph, Exception> twoSourcesBuilder =
(id) ->
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
.addJobVertex(id)
.addJobVertex(new JobVertexID())
.setTransitToRunning(vertex -> !vertex.getJobvertexId().equals(id))
.build();

// when: Creating the checkpoint plan for aligned and unaligned checkpoints.
runWithNotRunTask(twoSourcesBuilder, true);
runWithNotRunTask(twoSourcesBuilder, false);

// then: The plan failed because one task didn't have RUNNING state.
}

@Test
public void testNotRunningSingleSourceTriggeredTasksNotRunning() throws Exception {
// given: Execution graph builder with one NOT RUNNING source and RUNNING not source task.
FunctionWithException<JobVertexID, ExecutionGraph, Exception> sourceAndNotSourceBuilder =
(id) ->
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
.addJobVertex(id)
.addJobVertex(new JobVertexID(), false)
.setTransitToRunning(vertex -> !vertex.getJobvertexId().equals(id))
.build();

// when: Creating the checkpoint plan for aligned and unaligned checkpoints.
runWithNotRunTask(sourceAndNotSourceBuilder, true);
runWithNotRunTask(sourceAndNotSourceBuilder, false);

// then: The plan failed because one task didn't have RUNNING state.
}

@Test
public void testNotRunningOneOfNotSourcesTriggeredTasksNotRunning() throws Exception {
// given: Execution graph builder with NOT RUNNING not source and RUNNING not source task.
FunctionWithException<JobVertexID, ExecutionGraph, Exception> twoNotSourceBuilder =
(id) ->
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
.addJobVertex(id, false)
.addJobVertex(new JobVertexID(), false)
.setTransitToRunning(vertex -> !vertex.getJobvertexId().equals(id))
.build();

// when: Creating the checkpoint plan for aligned and unaligned checkpoints.
runWithNotRunTask(twoNotSourceBuilder, true);
runWithNotRunTask(twoNotSourceBuilder, false);

// then: The plan failed because one task didn't have RUNNING state.
}

@Test
public void testNotRunningSingleNotSourceTriggeredTasksNotRunning() throws Exception {
// given: Execution graph builder with NOT RUNNING not source and RUNNING source tasks.
FunctionWithException<JobVertexID, ExecutionGraph, Exception> sourceAndNotSourceBuilder =
(id) ->
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
.addJobVertex(id, false)
.addJobVertex(new JobVertexID())
.setTransitToRunning(vertex -> !vertex.getJobvertexId().equals(id))
.build();

// when: Creating the checkpoint plan for unaligned checkpoints(it makes sense only for
// unaligned. for aligned see {@link #testNotRunningSingleNotSourceForAlignedCheckpoint}).
runWithNotRunTask(sourceAndNotSourceBuilder, true);

// then: The plan failed because one task didn't have RUNNING state.
}

@Test
public void testNotRunningSingleNotSourceForAlignedCheckpoint() throws Exception {
// given: Execution graph builder with NOT RUNNING not source and RUNNING source tasks.
for (ExecutionState state : EnumSet.complementOf(EnumSet.of(ExecutionState.RUNNING))) {
JobVertexID jobVertexID = new JobVertexID();
ExecutionGraph graph =
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
.addJobVertex(jobVertexID)
.setTransitToRunning(false)
.addJobVertex(jobVertexID, false)
.addJobVertex(new JobVertexID())
.setTransitToRunning(
vertex -> !vertex.getJobvertexId().equals(jobVertexID))
.build();
graph.getJobVertex(jobVertexID)
.getTaskVertices()[0]
.getCurrentExecutionAttempt()
.transitionState(state);

transitVertexToState(graph, jobVertexID, state);

// when: Creating the checkpoint plan for aligned checkpoints.
DefaultCheckpointPlanCalculator checkpointPlanCalculator =
createCheckpointPlanCalculator(graph);

// then: The plan should be successfully created despite of one NOT RUNNING not source.
assertNotNull(checkpointPlanCalculator.calculateCheckpointPlan(false).get());
}
}

private void runWithNotRunTask(
FunctionWithException<JobVertexID, ExecutionGraph, Exception> graphBuilder,
boolean isUnalignedCheckpoint)
throws Exception {
for (ExecutionState state : EnumSet.complementOf(EnumSet.of(ExecutionState.RUNNING))) {
JobVertexID jobVertexID = new JobVertexID();
ExecutionGraph graph = graphBuilder.apply(jobVertexID);
transitVertexToState(graph, jobVertexID, state);
DefaultCheckpointPlanCalculator checkpointPlanCalculator =
createCheckpointPlanCalculator(graph);

try {
checkpointPlanCalculator.calculateCheckpointPlan().get();
checkpointPlanCalculator.calculateCheckpointPlan(isUnalignedCheckpoint).get();
fail(
"The computation should fail since some tasks to trigger are in "
+ state
Expand All @@ -180,6 +273,16 @@ public void testWithTriggeredTasksNotRunning() throws Exception {
}
}

private void transitVertexToState(
ExecutionGraph graph, JobVertexID jobVertexID, ExecutionState state) {
Arrays.stream(graph.getJobVertex(jobVertexID).getTaskVertices())
.filter(vertex -> vertex.getJobvertexId().equals(jobVertexID))
.findFirst()
.get()
.getCurrentExecutionAttempt()
.transitionState(state);
}

// ------------------------- Utility methods ---------------------------------------

private void runSingleTest(
Expand Down Expand Up @@ -239,8 +342,9 @@ private void runSingleTest(
chooseTasks(
graph, expectedToTriggerTaskDeclarations.toArray(new TaskDeclaration[0]));

// Tests computing checkpoint plan
CheckpointPlan checkpointPlan = planCalculator.calculateCheckpointPlan().get();
// Tests computing checkpoint plan(isUnalignedCheckpoint flag doesn't influence on result
// because all tasks are in RUNNING state here).
CheckpointPlan checkpointPlan = planCalculator.calculateCheckpointPlan(true).get();
checkCheckpointPlan(
expectedToTriggerTasks,
expectedRunningTasks,
Expand Down
Expand Up @@ -35,6 +35,7 @@
import java.util.Collections;
import java.util.concurrent.ThreadLocalRandom;

import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder.disableTransit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
Expand All @@ -58,7 +59,7 @@ public void testAbortPendingCheckpointsWithTriggerValidation() throws Exception
ExecutionGraph graph =
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
.addJobVertex(new JobVertexID())
.setTransitToRunning(false)
.setTransitToRunning(disableTransit)
.build();
CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration =
new CheckpointCoordinatorConfiguration(
Expand Down

0 comments on commit 9e3e73c

Please sign in to comment.