Skip to content

Commit

Permalink
[hotfix][runtime] Extract SchedulerBase.processCheckpointCoordinatorM…
Browse files Browse the repository at this point in the history
…essage
  • Loading branch information
rkhachatryan committed Jan 14, 2021
1 parent 987dfd6 commit 6f34de5
Showing 1 changed file with 29 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.function.FunctionUtils;

import org.apache.flink.util.function.ThrowingConsumer;
import org.slf4j.Logger;

import javax.annotation.Nullable;
Expand Down Expand Up @@ -1015,95 +1016,56 @@ public void acknowledgeCheckpoint(
final long checkpointId,
final CheckpointMetrics checkpointMetrics,
final TaskStateSnapshot checkpointState) {
mainThreadExecutor.assertRunningInMainThread();

final CheckpointCoordinator checkpointCoordinator =
executionGraph.getCheckpointCoordinator();
final AcknowledgeCheckpoint ackMessage =
new AcknowledgeCheckpoint(
jobID,
executionAttemptID,
checkpointId,
checkpointMetrics,
checkpointState);

final String taskManagerLocationInfo = retrieveTaskManagerLocation(executionAttemptID);

if (checkpointCoordinator != null) {
ioExecutor.execute(
() -> {
try {
checkpointCoordinator.receiveAcknowledgeMessage(
ackMessage, taskManagerLocationInfo);
} catch (Throwable t) {
log.warn(
"Error while processing checkpoint acknowledgement message", t);
}
});
} else {
String errorMessage =
"Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator";
if (executionGraph.getState() == JobStatus.RUNNING) {
log.error(errorMessage, jobGraph.getJobID());
} else {
log.debug(errorMessage, jobGraph.getJobID());
}
}
processCheckpointCoordinatorMessage(
"AcknowledgeCheckpoint",
coordinator ->
coordinator.receiveAcknowledgeMessage(
new AcknowledgeCheckpoint(
jobID,
executionAttemptID,
checkpointId,
checkpointMetrics,
checkpointState),
retrieveTaskManagerLocation(executionAttemptID)));
}

@Override
public void reportCheckpointMetrics(
JobID jobID, ExecutionAttemptID attemptId, long id, CheckpointMetrics metrics) {
mainThreadExecutor.assertRunningInMainThread();

final CheckpointCoordinator checkpointCoordinator =
executionGraph.getCheckpointCoordinator();

if (checkpointCoordinator != null) {
ioExecutor.execute(
() -> {
try {
checkpointCoordinator.reportStats(id, attemptId, metrics);
} catch (Throwable t) {
log.warn("Error while processing report checkpoint stats message", t);
}
});
} else {
String errorMessage =
"Received ReportCheckpointStats message for job {} with no CheckpointCoordinator";
if (executionGraph.getState() == JobStatus.RUNNING) {
log.error(errorMessage, jobGraph.getJobID());
} else {
log.debug(errorMessage, jobGraph.getJobID());
}
}
processCheckpointCoordinatorMessage(
"ReportCheckpointStats",
coordinator -> coordinator.reportStats(id, attemptId, metrics));
}

@Override
public void declineCheckpoint(final DeclineCheckpoint decline) {
processCheckpointCoordinatorMessage(
"DeclineCheckpoint",
coordinator ->
coordinator.receiveDeclineMessage(
decline,
retrieveTaskManagerLocation(decline.getTaskExecutionId())));
}

private void processCheckpointCoordinatorMessage(
String messageType, ThrowingConsumer<CheckpointCoordinator, Exception> process) {
mainThreadExecutor.assertRunningInMainThread();

final CheckpointCoordinator checkpointCoordinator =
executionGraph.getCheckpointCoordinator();
final String taskManagerLocationInfo =
retrieveTaskManagerLocation(decline.getTaskExecutionId());

if (checkpointCoordinator != null) {
ioExecutor.execute(
() -> {
try {
checkpointCoordinator.receiveDeclineMessage(
decline, taskManagerLocationInfo);
} catch (Exception e) {
log.error(
"Error in CheckpointCoordinator while processing {}",
decline,
e);
process.accept(checkpointCoordinator);
} catch (Exception t) {
log.warn("Error while processing " + messageType + " message", t);
}
});
} else {
String errorMessage =
"Received DeclineCheckpoint message for job {} with no CheckpointCoordinator";
"Received " + messageType + " message for job {} with no CheckpointCoordinator";
if (executionGraph.getState() == JobStatus.RUNNING) {
log.error(errorMessage, jobGraph.getJobID());
} else {
Expand Down

0 comments on commit 6f34de5

Please sign in to comment.