Skip to content
This repository has been archived by the owner on Aug 4, 2022. It is now read-only.

[REEF-308]: Race condition and clock skew issues in Tasks reporting back to JobDriver #198

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number | Diff line number | Diff line change
Expand Up @@ -255,7 +255,7 @@ public boolean isClosed() {
public void onEvaluatorException(final EvaluatorException exception) {
synchronized (this.evaluatorDescriptor) {
if (this.stateManager.isDoneOrFailedOrKilled()) {
LOG.log(Level.FINE, "Ignoring an exception receivedfor Evaluator {0} which is already in state {1}.",
LOG.log(Level.FINE, "Ignoring an exception received for Evaluator {0} which is already in state {1}.",
new Object[]{this.getId(), this.stateManager});
return;
}
Expand Down Expand Up @@ -460,9 +460,19 @@ private void onTaskStatusMessage(final ReefServiceProtos.TaskStatusProto taskSta
if (!(this.task.isPresent() && this.task.get().getId().equals(taskStatusProto.getTaskId()))) {
if (taskStatusProto.getState() == ReefServiceProtos.State.INIT ||
taskStatusProto.getState() == ReefServiceProtos.State.FAILED ||
taskStatusProto.getState() == ReefServiceProtos.State.RUNNING ||
taskStatusProto.getRecovery() // for task from recovered evaluators
) {

// [REEF-308] exposes a bug where the .NET evaluator does not send its task states in the right order:
// a RUNNING message may arrive before the INIT message for the same Task.
// [REEF-289] is a related item whose resolution may fix the issue.
if (taskStatusProto.getState() == ReefServiceProtos.State.RUNNING) {
LOG.log(Level.WARNING,
"Received a message of state " + ReefServiceProtos.State.RUNNING +
" for Task " + taskStatusProto.getTaskId() +
" before receiving its " + ReefServiceProtos.State.INIT + " state");
}

// FAILED is a legal first state of a Task as it could have failed during construction.
this.task = Optional.of(
new TaskRepresenter(taskStatusProto.getTaskId(),
Expand All @@ -471,8 +481,9 @@ private void onTaskStatusMessage(final ReefServiceProtos.TaskStatusProto taskSta
this,
this.exceptionCodec));
} else {
throw new RuntimeException("Received an message of state " + taskStatusProto.getState() +
", not INIT or FAILED for Task " + taskStatusProto.getTaskId() + " which we haven't heard from before.");
throw new RuntimeException("Received a message of state " + taskStatusProto.getState() +
", not INIT, RUNNING, or FAILED for Task " + taskStatusProto.getTaskId() +
" which we haven't heard from before.");
}
}
this.task.get().onTaskStatusMessage(taskStatusProto);
Expand Down