Skip to content

Commit

Permalink
[FLINK-7960] [tests] Fix race conditions in ExecutionGraphRestartTest…
Browse files Browse the repository at this point in the history
…#completeCancellingForAllVertices

One race condition is between waitUntilJobStatus(eg, JobStatus.FAILING, 1000) and the
subsequent completeCancellingForAllVertices where not all execution are in state
CANCELLING.

The other race condition is between completeCancellingForAllVertices and the fixed
delay restart without a delay. The problem is that the 10th task could have failed.
In order to restart we would have to complete the cancel for the first 9 tasks. This
is enough for the restart strategy to restart the job. If this happens before
completeCancellingForAllVertices has also cancelled the execution of the 10th task,
it could happen that we cancel a fresh execution.

[hotfix] Make WaitForTasks using an AtomicInteger

[hotfix] Set optCancelCondition to Optional.empty() in SimpleAckingTaskManagerGateway

Add assertion message to ExecutionGraphTestUtils#switchToRunning

This closes #4933.
  • Loading branch information
tillrohrmann committed Nov 2, 2017
1 parent 1ac3e05 commit f9b475f
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 12 deletions.
Expand Up @@ -844,7 +844,7 @@ else if (current == CANCELING || current == RUNNING || current == DEPLOYING) {
// failing in the meantime may happen and is no problem.
// anything else is a serious problem !!!
if (current != FAILED) {
String message = String.format("Asynchronous race: Found state %s after successful cancel call.", state);
String message = String.format("Asynchronous race: Found %s in state %s after successful cancel call.", vertex.getTaskNameWithSubtaskIndex(), state);
LOG.error(message);
vertex.getExecutionGraph().failGlobal(new Exception(message));
}
Expand Down
Expand Up @@ -76,6 +76,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway;
Expand Down Expand Up @@ -589,15 +590,19 @@ public void testSuspendWhileRestarting() throws Exception {
public void testConcurrentLocalFailAndRestart() throws Exception {
final int parallelism = 10;
SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
final OneShotLatch restartLatch = new OneShotLatch();
final TriggeredRestartStrategy triggeredRestartStrategy = new TriggeredRestartStrategy(restartLatch);

final ExecutionGraph eg = createSimpleTestGraph(
new JobID(),
taskManagerGateway,
new FixedDelayRestartStrategy(10, 0L),
triggeredRestartStrategy,
createNoOpVertex(parallelism));

WaitForTasks waitForTasks = new WaitForTasks(parallelism);
WaitForTasks waitForTasksCancelled = new WaitForTasks(parallelism);
taskManagerGateway.setCondition(waitForTasks);
taskManagerGateway.setCancelCondition(waitForTasksCancelled);

eg.setScheduleMode(ScheduleMode.EAGER);
eg.scheduleForExecution();
Expand Down Expand Up @@ -648,8 +653,15 @@ public void run() {
WaitForTasks waitForTasksAfterRestart = new WaitForTasks(parallelism);
taskManagerGateway.setCondition(waitForTasksAfterRestart);

waitForTasksCancelled.getFuture().get(1000L, TimeUnit.MILLISECONDS);

completeCancellingForAllVertices(eg);

// block the restart until we have completed for all vertices the cancellation
// otherwise it might happen that the last vertex which failed will have a new
// execution set due to restart which is wrongly canceled
restartLatch.trigger();

waitUntilJobStatus(eg, JobStatus.RUNNING, 1000);

waitForTasksAfterRestart.getFuture().get(1000, TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -1048,11 +1060,12 @@ public static class WaitForTasks implements Consumer<ExecutionAttemptID> {

private final int tasksToWaitFor;
private final CompletableFuture<Boolean> allTasksReceived;
private int counter;
private final AtomicInteger counter;

public WaitForTasks(int tasksToWaitFor) {
this.tasksToWaitFor = tasksToWaitFor;
this.allTasksReceived = new CompletableFuture<>();
this.counter = new AtomicInteger();
}

public CompletableFuture<Boolean> getFuture() {
Expand All @@ -1061,9 +1074,7 @@ public CompletableFuture<Boolean> getFuture() {

@Override
public void accept(ExecutionAttemptID executionAttemptID) {
counter++;

if (counter >= tasksToWaitFor) {
if (counter.incrementAndGet() >= tasksToWaitFor) {
allTasksReceived.complete(true);
}
}
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

Expand All @@ -45,7 +46,7 @@
/**
* Validates that suspending out of various states works correctly.
*/
public class ExecutionGraphSuspendTest {
public class ExecutionGraphSuspendTest extends TestLogger {

/**
* Going into SUSPENDED out of CREATED should immediately cancel everything and
Expand Down
Expand Up @@ -202,7 +202,8 @@ public static void switchToRunning(ExecutionGraph eg) {
// check that all execution are in state DEPLOYING
for (ExecutionVertex ev : eg.getAllExecutionVertices()) {
final Execution exec = ev.getCurrentExecutionAttempt();
assert(exec.getState() == ExecutionState.DEPLOYING);
final ExecutionState executionState = exec.getState();
assert executionState == ExecutionState.DEPLOYING : "Expected executionState to be DEPLOYING, was: " + executionState;
}

// switch executions to RUNNING
Expand Down
Expand Up @@ -23,14 +23,15 @@
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;


public class ExecutionGraphVariousFailuesTest {
public class ExecutionGraphVariousFailuesTest extends TestLogger {

/**
* Test that failing in state restarting will retrigger the restarting logic. This means that
Expand Down
Expand Up @@ -41,13 +41,14 @@
import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

import scala.concurrent.ExecutionContext;

@SuppressWarnings("serial")
public class ExecutionVertexCancelTest {
public class ExecutionVertexCancelTest extends TestLogger {

// --------------------------------------------------------------------------------------------
// Canceling in different states
Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

Expand All @@ -42,7 +43,7 @@
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

public class GlobalModVersionTest {
public class GlobalModVersionTest extends TestLogger {

/**
* Tests that failures during a global cancellation are not handed to the local
Expand Down
Expand Up @@ -36,6 +36,7 @@
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;

import org.junit.Test;

Expand All @@ -53,7 +54,7 @@
* <p>This test must be in the package it resides in, because it uses package-private methods
* from the ExecutionGraph classes.
*/
public class PipelinedRegionFailoverConcurrencyTest {
public class PipelinedRegionFailoverConcurrencyTest extends TestLogger {

/**
* Tests that a cancellation concurrent to a local failover leads to a properly
Expand Down
Expand Up @@ -48,14 +48,21 @@ public class SimpleAckingTaskManagerGateway implements TaskManagerGateway {

private Optional<Consumer<ExecutionAttemptID>> optSubmitCondition;

private Optional<Consumer<ExecutionAttemptID>> optCancelCondition;

public SimpleAckingTaskManagerGateway() {
optSubmitCondition = Optional.empty();
optCancelCondition = Optional.empty();
}

public void setCondition(Consumer<ExecutionAttemptID> predicate) {
optSubmitCondition = Optional.of(predicate);
}

public void setCancelCondition(Consumer<ExecutionAttemptID> predicate) {
optCancelCondition = Optional.of(predicate);
}

@Override
public String getAddress() {
return address;
Expand Down Expand Up @@ -96,6 +103,7 @@ public CompletableFuture<Acknowledge> stopTask(ExecutionAttemptID executionAttem

@Override
public CompletableFuture<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, Time timeout) {
optCancelCondition.ifPresent(condition -> condition.accept(executionAttemptID));
return CompletableFuture.completedFuture(Acknowledge.get());
}

Expand Down

0 comments on commit f9b475f

Please sign in to comment.