Skip to content

Commit

Permalink
[FLINK-3244] [runtime] Add debug log messages to SavepointCoordinator
Browse files Browse the repository at this point in the history
  • Loading branch information
uce committed Jan 18, 2016
1 parent a5bba3c commit 7d7d8b6
Showing 1 changed file with 17 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,12 @@
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.types.IntValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.concurrent.Promise;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -198,14 +196,29 @@ public ApplicationID restoreSavepoint(
LOG.info("Savepoint: {}@{}", checkpoint.getCheckpointID(), checkpoint.getTimestamp());

// Set the initial state of all tasks
LOG.debug("Rolling back individual operators.");
for (StateForTask state : checkpoint.getStates()) {
LOG.debug("Rolling back subtask {} of operator {}.",
state.getSubtask(), state.getOperatorId());

ExecutionJobVertex vertex = tasks.get(state.getOperatorId());

if (vertex == null) {
String msg = String.format("Failed to rollback to savepoint %s. " +
"Cannot map old state for task %s to the new program. " +
"This indicates that the program has been changed after " +
"the savepoint.", savepoint, state.getOperatorId());
"This indicates that the program has been changed in a " +
"non-compatible way after the savepoint.", savepoint,
state.getOperatorId());
throw new IllegalStateException(msg);
}

if (state.getSubtask() >= vertex.getParallelism()) {
String msg = String.format("Failed to rollback to savepoint %s. " +
"Parallelism mismatch between savepoint state and new program. " +
"Cannot map subtask %d of operator %s to new program with " +
"parallelism %d. This indicates that the program has been changed " +
"in a non-compatible way after the savepoint.", savepoint,
state.getSubtask(), state.getOperatorId(), vertex.getParallelism());
throw new IllegalStateException(msg);
}

Expand Down

0 comments on commit 7d7d8b6

Please sign in to comment.