Skip to content

Commit

Permalink
[hotfix][tests] Transition to RECOVERING before RUNNING in DefaultSch…
Browse files Browse the repository at this point in the history
…edulerTest
  • Loading branch information
rkhachatryan committed Apr 6, 2021
1 parent aadd02e commit 9cb1610
Showing 1 changed file with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -744,8 +744,7 @@ public void abortPendingCheckpointsWhenRestartingTasks() throws Exception {
.getAllExecutionVertices());
final ExecutionAttemptID attemptId =
onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
scheduler.updateTaskExecutionState(
new TaskExecutionState(attemptId, ExecutionState.RUNNING));
transitionToRunning(scheduler, attemptId);

final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);

Expand Down Expand Up @@ -775,8 +774,7 @@ public void restoreStateWhenRestartingTasks() throws Exception {
.getAllExecutionVertices());
final ExecutionAttemptID attemptId =
onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
scheduler.updateTaskExecutionState(
new TaskExecutionState(attemptId, ExecutionState.RUNNING));
transitionToRunning(scheduler, attemptId);

final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);

Expand Down Expand Up @@ -814,8 +812,7 @@ public void failGlobalWhenRestoringStateFails() throws Exception {
.getAllExecutionVertices());
final ExecutionAttemptID attemptId =
onlyExecutionVertex.getCurrentExecutionAttempt().getAttemptId();
scheduler.updateTaskExecutionState(
new TaskExecutionState(attemptId, ExecutionState.RUNNING));
transitionToRunning(scheduler, attemptId);

final CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(scheduler);

Expand Down Expand Up @@ -1326,4 +1323,13 @@ private CountDownLatch getCheckpointTriggeredLatch() {
});
return checkpointTriggeredLatch;
}

private void transitionToRunning(DefaultScheduler scheduler, ExecutionAttemptID attemptId) {
Preconditions.checkState(
scheduler.updateTaskExecutionState(
new TaskExecutionState(attemptId, ExecutionState.RECOVERING)));
Preconditions.checkState(
scheduler.updateTaskExecutionState(
new TaskExecutionState(attemptId, ExecutionState.RUNNING)));
}
}

0 comments on commit 9cb1610

Please sign in to comment.