Skip to content

Commit

Permalink
[FLINK-4932] [distributed coordination] Failing in state RESTARTING o…
Browse files Browse the repository at this point in the history
…nly fails terminally if no more restarts are possible

If a failure occurs while the job is in state RESTARTING, a new restart attempt is started.
A job can go from RESTARTING to FAILED only if the restart strategy no longer allows further
restarts or if the thrown exception is of type SuppressRestartsException.

Fix failing test cases: ExecutionGraphMetricsTest and ExecutionGraphRestartTest

This closes apache#2711
  • Loading branch information
tillrohrmann authored and StephanEwen committed Oct 31, 2016
1 parent 4b86701 commit ac82e3d
Show file tree
Hide file tree
Showing 6 changed files with 378 additions and 123 deletions.
263 changes: 173 additions & 90 deletions docs/internals/fig/job_status.svg
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Expand Up @@ -869,14 +869,13 @@ public void fail(Throwable t) {
current == JobStatus.SUSPENDED ||
current.isGloballyTerminalState()) {
return;
} else if (current == JobStatus.RESTARTING && transitionState(current, JobStatus.FAILED, t)) {
synchronized (progressLock) {
postRunCleanup();
progressLock.notifyAll();
} else if (current == JobStatus.RESTARTING) {
this.failureCause = t;

LOG.info("Job {} failed during restart.", getJobID());
if (tryRestartOrFail()) {
return;
}
// concurrent job status change, let's check again
} else if (transitionState(current, JobStatus.FAILING, t)) {
this.failureCause = t;

Expand Down Expand Up @@ -963,6 +962,7 @@ public void restart() {
scheduleForExecution(scheduler);
}
catch (Throwable t) {
LOG.warn("Failed to restart the job.", t);
fail(t);
}
}
Expand Down Expand Up @@ -1123,15 +1123,10 @@ else if (current == JobStatus.CANCELLING) {
}
}
else if (current == JobStatus.FAILING) {
boolean allowRestart = !(failureCause instanceof SuppressRestartsException);

if (allowRestart && restartStrategy.canRestart() && transitionState(current, JobStatus.RESTARTING)) {
restartStrategy.restart(this);
break;
} else if ((!allowRestart || !restartStrategy.canRestart()) && transitionState(current, JobStatus.FAILED, failureCause)) {
postRunCleanup();
if (tryRestartOrFail()) {
break;
}
// concurrent job status change, let's check again
}
else if (current == JobStatus.SUSPENDED) {
// we've already cleaned up when entering the SUSPENDED state
Expand All @@ -1155,6 +1150,47 @@ else if (current.isGloballyTerminalState()) {
}
}

/**
 * Attempts to move a failing job forward: either into another restart attempt or, when a
 * restart is not possible (the failure cause is a {@link SuppressRestartsException} or the
 * restart strategy reports that it cannot restart), into the FAILED state.
 *
 * <p>The method only acts when the job status read on entry is FAILING or RESTARTING.
 *
 * @return true if the restart or the transition to FAILED was carried out; false if the job
 *         was in any other state or the state transition did not succeed (e.g. because the
 *         job status was changed concurrently)
 */
private boolean tryRestartOrFail() {
    final JobStatus observedState = state;

    if (observedState != JobStatus.FAILING && observedState != JobStatus.RESTARTING) {
        // this operation is only allowed in the state FAILING or RESTARTING
        return false;
    }

    synchronized (progressLock) {
        // the failure cause is only attached to the message when debug logging is on
        if (!LOG.isDebugEnabled()) {
            LOG.info("Try to restart the job or fail it if no longer possible.");
        } else {
            LOG.debug("Try to restart the job or fail it if no longer possible.", failureCause);
        }

        // the restart strategy is only consulted when the cause does not suppress restarts
        final boolean restartSuppressed = failureCause instanceof SuppressRestartsException;
        final boolean restartable = !restartSuppressed && restartStrategy.canRestart();

        if (restartable) {
            if (!transitionState(observedState, JobStatus.RESTARTING)) {
                // the state must have changed concurrently, so the operation cannot complete
                return false;
            }

            LOG.info("Restarting the job...");
            restartStrategy.restart(this);

            return true;
        }

        if (!transitionState(observedState, JobStatus.FAILED, failureCause)) {
            // the state must have changed concurrently, so the operation cannot complete
            return false;
        }

        LOG.info("Could not restart the job.", failureCause);
        postRunCleanup();

        return true;
    }
}

private void postRunCleanup() {
try {
CheckpointCoordinator coord = this.checkpointCoordinator;
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
Expand Down Expand Up @@ -257,7 +258,8 @@ public void testExecutionGraphRestartTimeMetric() throws JobException, IOExcepti
assertTrue(previousRestartingTime > 0);

// now let's fail the job while it is restarting and check that the restarting time stops increasing
executionGraph.fail(new Exception());
// for this to work, we have to use a SuppressRestartsException
executionGraph.fail(new SuppressRestartsException(new Exception()));

assertEquals(JobStatus.FAILED, executionGraph.getState());

Expand Down
Expand Up @@ -263,9 +263,14 @@ public void testFailWhileRestarting() throws Exception {

assertEquals(JobStatus.RESTARTING, executionGraph.getState());

// Canceling needs to abort the restart
// The restarting should not fail with an ordinary exception
executionGraph.fail(new Exception("Test exception"));

assertEquals(JobStatus.RESTARTING, executionGraph.getState());

// but it should fail when sending a SuppressRestartsException
executionGraph.fail(new SuppressRestartsException(new Exception("Test exception")));

assertEquals(JobStatus.FAILED, executionGraph.getState());

// The restart has been aborted
Expand Down
Expand Up @@ -27,14 +27,15 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.restart.InfiniteDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;

import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -43,6 +44,8 @@
import org.powermock.api.mockito.PowerMockito;

import scala.concurrent.duration.FiniteDuration;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.same;
Expand Down Expand Up @@ -147,7 +150,7 @@ private void mockNumberOfInputs(int nodeIndex, int predecessorIndex) {

@Test
public void testCancel() throws Exception {
Assert.assertEquals(JobStatus.CREATED, eg.getState());
assertEquals(JobStatus.CREATED, eg.getState());
eg.cancel();

verifyCancel(1);
Expand All @@ -156,42 +159,42 @@ public void testCancel() throws Exception {
eg.cancel();

verifyCancel(2);
Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
assertEquals(JobStatus.CANCELLING, eg.getState());

eg.cancel();

verifyCancel(2);
Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
assertEquals(JobStatus.CANCELLING, eg.getState());

f.set(eg, JobStatus.CANCELED);
eg.cancel();

verifyCancel(2);
Assert.assertEquals(JobStatus.CANCELED, eg.getState());
assertEquals(JobStatus.CANCELED, eg.getState());

f.set(eg, JobStatus.FAILED);
eg.cancel();

verifyCancel(2);
Assert.assertEquals(JobStatus.FAILED, eg.getState());
assertEquals(JobStatus.FAILED, eg.getState());

f.set(eg, JobStatus.FAILING);
eg.cancel();

verifyCancel(2);
Assert.assertEquals(JobStatus.CANCELLING, eg.getState());
assertEquals(JobStatus.CANCELLING, eg.getState());

f.set(eg, JobStatus.FINISHED);
eg.cancel();

verifyCancel(2);
Assert.assertEquals(JobStatus.FINISHED, eg.getState());
assertEquals(JobStatus.FINISHED, eg.getState());

f.set(eg, JobStatus.RESTARTING);
eg.cancel();

verifyCancel(2);
Assert.assertEquals(JobStatus.CANCELED, eg.getState());
assertEquals(JobStatus.CANCELED, eg.getState());
}

private void verifyCancel(int times) {
Expand All @@ -206,65 +209,65 @@ private void verifyCancel(int times) {
*/
@Test
public void testSuspend() throws Exception {
    // Each state assertion is made exactly once, through the statically imported assertEquals.
    // NOTE(review): 'f' is presumably a reflective handle on the graph's job-status field,
    // used to force the graph into a given state -- confirm against the test's setup code.
    assertEquals(JobStatus.CREATED, eg.getState());
    Exception testException = new Exception("Test exception");

    // suspending from CREATED
    eg.suspend(testException);

    verifyCancel(1);
    assertEquals(JobStatus.SUSPENDED, eg.getState());

    // suspending from RUNNING
    f.set(eg, JobStatus.RUNNING);

    eg.suspend(testException);

    verifyCancel(2);
    assertEquals(JobStatus.SUSPENDED, eg.getState());

    // suspending from FAILING
    f.set(eg, JobStatus.FAILING);

    eg.suspend(testException);

    verifyCancel(3);
    assertEquals(JobStatus.SUSPENDED, eg.getState());

    // suspending from CANCELLING
    f.set(eg, JobStatus.CANCELLING);

    eg.suspend(testException);

    verifyCancel(4);
    assertEquals(JobStatus.SUSPENDED, eg.getState());

    // from here on the expected state equals the forced state and the count passed to
    // verifyCancel stays at 4: FAILED, FINISHED and CANCELED are expected to be left untouched
    f.set(eg, JobStatus.FAILED);

    eg.suspend(testException);

    verifyCancel(4);
    assertEquals(JobStatus.FAILED, eg.getState());

    f.set(eg, JobStatus.FINISHED);

    eg.suspend(testException);

    verifyCancel(4);
    assertEquals(JobStatus.FINISHED, eg.getState());

    f.set(eg, JobStatus.CANCELED);

    eg.suspend(testException);

    verifyCancel(4);
    assertEquals(JobStatus.CANCELED, eg.getState());

    // once SUSPENDED, neither fail() nor cancel() is expected to change the state
    f.set(eg, JobStatus.SUSPENDED);

    eg.fail(testException);

    assertEquals(JobStatus.SUSPENDED, eg.getState());

    eg.cancel();

    assertEquals(JobStatus.SUSPENDED, eg.getState());
}

// test that all source tasks receive STOP signal
Expand All @@ -290,6 +293,71 @@ public void testStop() throws Exception {
}
}

/**
 * Verifies that a failure reported while the job is RESTARTING re-runs the restart logic
 * instead of failing the job outright: the job is expected to end up FAILED only once the
 * restart strategy says the job is no longer restartable.
 */
@Test
public void testFailureWhileRestarting() throws IllegalAccessException, NoSuchFieldException, InterruptedException {
    // replace the graph's restart strategy via reflection
    final Field strategyField = eg.getClass().getDeclaredField("restartStrategy");
    strategyField.setAccessible(true);
    strategyField.set(eg, new InfiniteDelayRestartStrategy(1));

    // force the graph into RESTARTING
    f.set(eg, JobStatus.RESTARTING);

    // first failure: one restart attempt is left, so the job should stay in RESTARTING
    final Exception firstFailure = new Exception("Test");
    eg.fail(firstFailure);
    assertEquals(JobStatus.RESTARTING, eg.getState());

    // second failure: all restart attempts are depleted, so the job should go into FAILED
    final Exception secondFailure = new Exception("Test");
    eg.fail(secondFailure);
    assertEquals(JobStatus.FAILED, eg.getState());
}

/**
 * Verifies that failing a RESTARTING job with a {@link SuppressRestartsException} ends the
 * restarting immediately and leaves the execution graph in state FAILED.
 */
@Test
public void testSuppressRestartFailureWhileRestarting() throws IllegalAccessException, NoSuchFieldException {
    // replace the graph's restart strategy via reflection
    final Field strategyField = eg.getClass().getDeclaredField("restartStrategy");
    strategyField.setAccessible(true);
    strategyField.set(eg, new InfiniteDelayRestartStrategy());

    // force the graph into RESTARTING
    f.set(eg, JobStatus.RESTARTING);

    // the wrapping exception is what suppresses a possible restart
    final SuppressRestartsException suppressingFailure =
        new SuppressRestartsException(new Exception("Test"));
    eg.fail(suppressingFailure);

    assertEquals(JobStatus.FAILED, eg.getState());
}

/**
 * Verifies that a job in state RESTARTING can be suspended, and that the exception passed to
 * {@code suspend} is afterwards reported as the graph's failure cause.
 */
@Test
public void testSuspendWhileRestarting() throws IllegalAccessException, NoSuchFieldException {
    // replace the graph's restart strategy via reflection
    final Field strategyField = eg.getClass().getDeclaredField("restartStrategy");
    strategyField.setAccessible(true);
    strategyField.set(eg, new InfiniteDelayRestartStrategy());

    // force the graph into RESTARTING
    f.set(eg, JobStatus.RESTARTING);

    final Exception suspendCause = new Exception("Suspended");
    eg.suspend(suspendCause);

    // the job must be SUSPENDED and must report the cause it was suspended with
    assertEquals(JobStatus.SUSPENDED, eg.getState());
    assertEquals(suspendCause, eg.getFailureCause());
}

// STOP only supported if all sources are stoppable
@Test(expected = StoppingException.class)
public void testStopBatching() throws StoppingException {
Expand Down

0 comments on commit ac82e3d

Please sign in to comment.