[FLINK-3390] [runtime, tests] Restore savepoint path on ExecutionGraph restart

Temporary workaround to restore initial state on failure during recovery,
as required by a user. Will be superseded by FLINK-3397 with better
handling of checkpoint and savepoint restoring.

Previously, a failure during recovery resulted in restarting the job
without its savepoint state. This temporary workaround ensures that if the
savepoint coordinator ever restored a savepoint and no checkpoint completed
after the savepoint, the savepoint state is restored again on restart.

This closes #1720.
uce authored and tillrohrmann committed Feb 26, 2016
1 parent 016644a commit c2a43c9
Showing 4 changed files with 187 additions and 12 deletions.
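
Taken together, the four files implement a single fallback path. A condensed sketch of that flow, assembled from the hunks below (the calls are the ones this commit adds; the numbered comments paraphrase the commit message):

// 1) On submission with a savepoint, the SavepointCoordinator records the
//    path it restored from (exposed via getSavepointRestorePath()).
// 2) On restart, the ExecutionGraph first tries the latest checkpoint; the
//    new boolean return value says whether any state was restored.
boolean restored = checkpointCoordinator
    .restoreLatestCheckpointedState(getAllVertices(), false, false);

// 3) If no checkpoint completed after the savepoint restore, replay the
//    savepoint so the job does not restart without its initial state.
if (!restored && savepointCoordinator != null) {
    String savepointPath = savepointCoordinator.getSavepointRestorePath();
    if (savepointPath != null) {
        savepointCoordinator.restoreSavepoint(getAllVertices(), savepointPath);
    }
}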
@@ -741,7 +741,7 @@ private void triggerQueuedRequests() throws Exception {
// Checkpoint State Restoring
// --------------------------------------------------------------------------------------------

- public void restoreLatestCheckpointedState(
+ public boolean restoreLatestCheckpointedState(
Map<JobVertexID, ExecutionJobVertex> tasks,
boolean errorIfNoCheckpoint,
boolean allOrNothingState) throws Exception {
@@ -761,7 +761,7 @@ public void restoreLatestCheckpointedState(
if (errorIfNoCheckpoint) {
throw new IllegalStateException("No completed checkpoint available");
} else {
- return;
+ return false;
}
}

@@ -799,6 +799,8 @@ public void restoreLatestCheckpointedState(
exec.setInitialState(state.getState(), recoveryTimestamp);
}
}

return true;
}
}

@@ -72,6 +72,10 @@ public class SavepointCoordinator extends CheckpointCoordinator {
/** Mapping from checkpoint ID to promises for savepoints. */
private final Map<Long, Promise<String>> savepointPromises;

// TODO(uce) Temporary workaround to restore initial state on
// failure during recovery. Will be superseded by FLINK-3397.
private volatile String savepointRestorePath;

public SavepointCoordinator(
JobID jobId,
long baseInterval,
@@ -102,6 +106,10 @@ public SavepointCoordinator(
this.savepointPromises = new ConcurrentHashMap<>();
}

public String getSavepointRestorePath() {
return savepointRestorePath;
}

// ------------------------------------------------------------------------
// Savepoint trigger and reset
// ------------------------------------------------------------------------
@@ -221,6 +229,10 @@ public void restoreSavepoint(
checkpointIdCounter.start();
checkpointIdCounter.setCount(nextCheckpointId + 1);
LOG.info("Reset the checkpoint ID to {}", nextCheckpointId);

if (savepointRestorePath == null) {
savepointRestorePath = savepointPath;
}
}
}
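
The null guard means only the first restored savepoint path is ever recorded, so later calls cannot overwrite it; the field is volatile because it is written on the submission path and read on the restart path. An illustrative sequence (the paths are hypothetical, not from the commit):

SavepointCoordinator coordinator = ...; // constructed as above
coordinator.restoreSavepoint(tasks, "file:/tmp/savepoint-1"); // path recorded
coordinator.restoreSavepoint(tasks, "file:/tmp/savepoint-2"); // guard keeps savepoint-1
coordinator.getSavepointRestorePath();                        // still "file:/tmp/savepoint-1"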

@@ -872,7 +872,17 @@ else if (current != JobStatus.RESTARTING) {

// if we have checkpointed state, reload it into the executions
if (checkpointCoordinator != null) {
- checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);
+ boolean restored = checkpointCoordinator
+     .restoreLatestCheckpointedState(getAllVertices(), false, false);

// TODO(uce) Temporary workaround to restore initial state on
// failure during recovery. Will be superseded by FLINK-3397.
if (!restored && savepointCoordinator != null) {
String savepointPath = savepointCoordinator.getSavepointRestorePath();
if (savepointPath != null) {
savepointCoordinator.restoreSavepoint(getAllVertices(), savepointPath);
}
}
}
}

@@ -56,7 +56,7 @@
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.runtime.tasks.StreamTaskStateList;
@@ -71,6 +71,7 @@
import scala.concurrent.duration.FiniteDuration;

import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -736,6 +737,113 @@ public void testSubmitWithUnknownSavepointPath() throws Exception {
}
}

/**
* Tests that a restore failure is retried with the savepoint state.
*/
@Test
public void testRestoreFailure() throws Exception {
// Config
int numTaskManagers = 1;
int numSlotsPerTaskManager = 1;
int numExecutionRetries = 2;
int retryDelay = 500;
int checkpointingInterval = 100000000;

// Test deadline
final Deadline deadline = new FiniteDuration(3, TimeUnit.MINUTES).fromNow();

ForkableFlinkMiniCluster flink = null;

try {
// The job
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(checkpointingInterval);
env.setNumberOfExecutionRetries(numExecutionRetries);
env.getConfig().setExecutionRetryDelay(retryDelay);

DataStream<Integer> stream = env
.addSource(new RestoreStateCountingAndFailingSource());

// Source configuration
RestoreStateCountingAndFailingSource.failOnRestoreStateCall = false;
RestoreStateCountingAndFailingSource.numRestoreStateCalls = 0;
RestoreStateCountingAndFailingSource.checkpointCompleteLatch = new CountDownLatch(1);
RestoreStateCountingAndFailingSource.emitted = 0;

stream.addSink(new DiscardingSink<Integer>());

JobGraph jobGraph = env.getStreamGraph().getJobGraph();

// Flink configuration
final Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, numSlotsPerTaskManager);
LOG.info("Flink configuration: " + config + ".");

// Start Flink
flink = new ForkableFlinkMiniCluster(config);
LOG.info("Starting Flink cluster.");
flink.start();

// Retrieve the job manager
LOG.info("Retrieving JobManager.");
ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
LOG.info("JobManager: " + jobManager + ".");

// Submit the job and wait for some checkpoints to complete
flink.submitJobDetached(jobGraph);

while (deadline.hasTimeLeft() && RestoreStateCountingAndFailingSource.emitted < 100) {
Thread.sleep(100);
}

assertTrue("No progress", RestoreStateCountingAndFailingSource.emitted >= 100);

// Trigger the savepoint
Future<Object> savepointPathFuture = jobManager.ask(
new TriggerSavepoint(jobGraph.getJobID()), deadline.timeLeft());

final String savepointPath = ((TriggerSavepointSuccess) Await
.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
LOG.info("Retrieved savepoint path: " + savepointPath + ".");

// Completed checkpoint
RestoreStateCountingAndFailingSource.checkpointCompleteLatch.await();

// Cancel the job
Future<?> cancelFuture = jobManager.ask(new CancelJob(
jobGraph.getJobID()), deadline.timeLeft());
Await.ready(cancelFuture, deadline.timeLeft());

// Wait for the job to be removed
Future<?> removedFuture = jobManager.ask(new NotifyWhenJobRemoved(
jobGraph.getJobID()), deadline.timeLeft());
Await.ready(removedFuture, deadline.timeLeft());

// Set source to fail on restore calls and try to recover from savepoint
RestoreStateCountingAndFailingSource.failOnRestoreStateCall = true;
jobGraph.setSavepointPath(savepointPath);

try {
flink.submitJobAndWait(jobGraph, false, deadline.timeLeft());
// If the savepoint state is not restored, we will wait here
// until the deadline times out.
fail("Did not throw expected Exception");
} catch (Exception ignored) {
} finally {
// Expect one restore call for the initial submission from the
// savepoint and one for each of the execution retries
assertEquals(1 + numExecutionRetries, RestoreStateCountingAndFailingSource.numRestoreStateCalls);
}
}
finally {
if (flink != null) {
flink.shutdown();
}
}
}

// ------------------------------------------------------------------------
// Test program
// ------------------------------------------------------------------------
@@ -761,13 +869,7 @@ private JobGraph createJobGraph(
.shuffle()
.map(new StatefulCounter());

- // Discard
- stream.addSink(new SinkFunction<Integer>() {
-     private static final long serialVersionUID = -8671189807690005893L;
-     @Override
-     public void invoke(Integer value) throws Exception {
-     }
- });
+ stream.addSink(new DiscardingSink<Integer>());

return env.getStreamGraph().getJobGraph();
}
@@ -779,7 +881,7 @@ private static class InfiniteTestSource
private volatile boolean running = true;

// Test control
- private static CountDownLatch CheckpointCompleteLatch = new CountDownLatch(0);
+ private static CountDownLatch CheckpointCompleteLatch = new CountDownLatch(1);

@Override
public void run(SourceContext<Integer> ctx) throws Exception {
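
The count change above matters because CountDownLatch.await() returns immediately when the latch was constructed with a count of zero, so the old latch never actually made callers wait for a checkpoint to complete. A minimal illustration:

CountDownLatch broken = new CountDownLatch(0);
broken.await(); // returns at once; no synchronization happens

CountDownLatch fixed = new CountDownLatch(1);
// fixed.await() now blocks until another thread calls fixed.countDown(),
// which the test sources do when a checkpoint completes.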
@@ -837,4 +939,53 @@ public void restoreState(byte[] data) throws Exception {
}
}

/**
* Test source that counts calls to restoreState and that can be configured
* to fail on restoreState calls.
*/
private static class RestoreStateCountingAndFailingSource
implements SourceFunction<Integer>, Checkpointed<Serializable>, CheckpointListener {

private static final long serialVersionUID = 1L;

private static volatile int numRestoreStateCalls = 0;
private static volatile boolean failOnRestoreStateCall = false;
private static volatile CountDownLatch checkpointCompleteLatch = new CountDownLatch(1);
private static volatile int emitted = 0;

private volatile boolean running = true;

@Override
public void run(SourceContext<Integer> ctx) throws Exception {
while (running) {
ctx.collect(1);
emitted++;
}
}

@Override
public void cancel() {
running = false;
}

@Override
public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
return 1;
}

@Override
public void restoreState(Serializable state) throws Exception {
numRestoreStateCalls++;

if (failOnRestoreStateCall) {
throw new RuntimeException("Restore test failure");
}
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
checkpointCompleteLatch.countDown();
}
}

}
