Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion flink-runtime-web/src/test/resources/rest_api_v1.snapshot
Original file line number Diff line number Diff line change
Expand Up @@ -1612,7 +1612,7 @@
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:ExceptionInfo",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:RootExceptionInfo",
"properties" : {
"exceptionName" : {
"type" : "string"
Expand All @@ -1628,6 +1628,30 @@
},
"location" : {
"type" : "string"
},
"concurrentExceptions" : {
"type" : "array",
"items" : {
"type" : "object",
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:ExceptionInfo",
"properties" : {
"exceptionName" : {
"type" : "string"
},
"stacktrace" : {
"type" : "string"
},
"timestamp" : {
"type" : "integer"
},
"taskName" : {
"type" : "string"
},
"location" : {
"type" : "string"
}
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@ public void suspend(Throwable suspensionCause) {
// stay in a terminal state
return;
} else if (transitionState(state, JobStatus.SUSPENDED, suspensionCause)) {
initFailureCause(suspensionCause);
initFailureCause(suspensionCause, System.currentTimeMillis());

incrementRestarts();

Expand Down Expand Up @@ -1045,9 +1045,9 @@ public void incrementRestarts() {
}

@Override
public void initFailureCause(Throwable t) {
public void initFailureCause(Throwable t, long timestamp) {
this.failureCause = t;
this.failureInfo = new ErrorInfo(t, System.currentTimeMillis());
this.failureInfo = new ErrorInfo(t, timestamp);
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -1136,13 +1136,13 @@ private void allVerticesInTerminalState() {
}

@Override
public void failJob(Throwable cause) {
public void failJob(Throwable cause, long timestamp) {
if (state == JobStatus.FAILING || state.isTerminalState()) {
return;
}

transitionState(JobStatus.FAILING, cause);
initFailureCause(cause);
initFailureCause(cause, timestamp);

FutureUtils.assertNoException(
cancelVerticesAsync()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ void enableCheckpointing(
*/
void suspend(Throwable suspensionCause);

void failJob(Throwable cause);
void failJob(Throwable cause, long timestamp);

/**
* Returns the termination future of this {@link ExecutionGraph}. The termination future is
Expand All @@ -181,7 +181,7 @@ void enableCheckpointing(

void incrementRestarts();

void initFailureCause(Throwable t);
void initFailureCause(Throwable t, long timestamp);

/**
* Updates the state of one of the ExecutionVertex's Execution attempts. If the new status if
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,15 @@ public ExecutionFailureHandler(
*
* @param failedTask is the ID of the failed task vertex
* @param cause of the task failure
* @param timestamp of the task failure
* @return result of the failure handling
*/
public FailureHandlingResult getFailureHandlingResult(
ExecutionVertexID failedTask, Throwable cause) {
ExecutionVertexID failedTask, Throwable cause, long timestamp) {
return handleFailure(
failedTask,
cause,
timestamp,
failoverStrategy.getTasksNeedingRestart(failedTask, cause),
false);
}
Expand All @@ -91,12 +93,15 @@ public FailureHandlingResult getFailureHandlingResult(
* for it.
*
* @param cause of the task failure
* @param timestamp of the task failure
* @return result of the failure handling
*/
public FailureHandlingResult getGlobalFailureHandlingResult(final Throwable cause) {
public FailureHandlingResult getGlobalFailureHandlingResult(
final Throwable cause, long timestamp) {
return handleFailure(
null,
cause,
timestamp,
IterableUtils.toStream(schedulingTopology.getVertices())
.map(SchedulingExecutionVertex::getId)
.collect(Collectors.toSet()),
Expand All @@ -106,13 +111,15 @@ public FailureHandlingResult getGlobalFailureHandlingResult(final Throwable caus
private FailureHandlingResult handleFailure(
@Nullable final ExecutionVertexID failingExecutionVertexId,
final Throwable cause,
long timestamp,
final Set<ExecutionVertexID> verticesToRestart,
final boolean globalFailure) {

if (isUnrecoverableError(cause)) {
return FailureHandlingResult.unrecoverable(
failingExecutionVertexId,
new JobException("The failure is not recoverable", cause),
timestamp,
globalFailure);
}

Expand All @@ -123,6 +130,7 @@ private FailureHandlingResult handleFailure(
return FailureHandlingResult.restartable(
failingExecutionVertexId,
cause,
timestamp,
verticesToRestart,
restartBackoffTimeStrategy.getBackoffTime(),
globalFailure);
Expand All @@ -131,6 +139,7 @@ private FailureHandlingResult handleFailure(
failingExecutionVertexId,
new JobException(
"Recovery is suppressed by " + restartBackoffTimeStrategy, cause),
timestamp,
globalFailure);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ public class FailureHandlingResult {
/** Failure reason. {@code @Nullable} because of FLINK-21376. */
@Nullable private final Throwable error;

/** Failure timestamp. */
private final long timestamp;

/** True if the original failure was a global failure. */
private final boolean globalFailure;

Expand All @@ -65,13 +68,15 @@ public class FailureHandlingResult {
* ExecutionVertex} the failure is originating from. Passing {@code null} as a value
* indicates that the failure was issued by Flink itself.
* @param cause the exception that caused this failure.
* @param timestamp the time the failure was handled.
* @param verticesToRestart containing task vertices to restart to recover from the failure.
* {@code null} indicates that the failure is not restartable.
* @param restartDelayMS indicate a delay before conducting the restart
*/
private FailureHandlingResult(
@Nullable ExecutionVertexID failingExecutionVertexId,
@Nullable Throwable cause,
long timestamp,
@Nullable Set<ExecutionVertexID> verticesToRestart,
long restartDelayMS,
boolean globalFailure) {
Expand All @@ -81,6 +86,7 @@ private FailureHandlingResult(
this.restartDelayMS = restartDelayMS;
this.failingExecutionVertexId = failingExecutionVertexId;
this.error = cause;
this.timestamp = timestamp;
this.globalFailure = globalFailure;
}

Expand All @@ -91,15 +97,18 @@ private FailureHandlingResult(
* ExecutionVertex} the failure is originating from. Passing {@code null} as a value
* indicates that the failure was issued by Flink itself.
* @param error reason why the failure is not recoverable
* @param timestamp the time the failure was handled.
*/
private FailureHandlingResult(
@Nullable ExecutionVertexID failingExecutionVertexId,
@Nonnull Throwable error,
long timestamp,
boolean globalFailure) {
this.verticesToRestart = null;
this.restartDelayMS = -1;
this.failingExecutionVertexId = failingExecutionVertexId;
this.error = checkNotNull(error);
this.timestamp = timestamp;
this.globalFailure = globalFailure;
}

Expand Down Expand Up @@ -147,11 +156,19 @@ public Optional<ExecutionVertexID> getExecutionVertexIdOfFailedTask() {
*
* @return reason why the restarting cannot be conducted
*/
@Nullable
public Throwable getError() {
return error;
}

/**
* Returns the time of the failure.
*
* @return The timestamp.
*/
public long getTimestamp() {
return timestamp;
}

/**
* Returns whether the restarting can be conducted.
*
Expand All @@ -178,6 +195,8 @@ public boolean isGlobalFailure() {
* @param failingExecutionVertexId the {@link ExecutionVertexID} refering to the {@link
* ExecutionVertex} the failure is originating from. Passing {@code null} as a value
* indicates that the failure was issued by Flink itself.
* @param cause The reason of the failure.
* @param timestamp The time of the failure.
* @param verticesToRestart containing task vertices to restart to recover from the failure.
* {@code null} indicates that the failure is not restartable.
* @param restartDelayMS indicate a delay before conducting the restart
Expand All @@ -186,11 +205,17 @@ public boolean isGlobalFailure() {
public static FailureHandlingResult restartable(
@Nullable ExecutionVertexID failingExecutionVertexId,
@Nonnull Throwable cause,
long timestamp,
@Nullable Set<ExecutionVertexID> verticesToRestart,
long restartDelayMS,
boolean globalFailure) {
return new FailureHandlingResult(
failingExecutionVertexId, cause, verticesToRestart, restartDelayMS, globalFailure);
failingExecutionVertexId,
cause,
timestamp,
verticesToRestart,
restartDelayMS,
globalFailure);
}

/**
Expand All @@ -203,12 +228,14 @@ public static FailureHandlingResult restartable(
* ExecutionVertex} the failure is originating from. Passing {@code null} as a value
* indicates that the failure was issued by Flink itself.
* @param error reason why the failure is not recoverable
* @param timestamp The time of the failure.
* @return result indicating the failure is not recoverable
*/
public static FailureHandlingResult unrecoverable(
@Nullable ExecutionVertexID failingExecutionVertexId,
@Nonnull Throwable error,
long timestamp,
boolean globalFailure) {
return new FailureHandlingResult(failingExecutionVertexId, error, globalFailure);
return new FailureHandlingResult(failingExecutionVertexId, error, timestamp, globalFailure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.UpperLimitExceptionParameter;
import org.apache.flink.runtime.scheduler.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
Expand All @@ -56,6 +57,7 @@
import java.util.Optional;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

/** Handler serving the job exceptions. */
public class JobExceptionsHandler
Expand Down Expand Up @@ -153,46 +155,69 @@ private static JobExceptionsInfoWithHistory createJobExceptionsInfo(
executionGraphInfo.getExceptionHistory(), exceptionToReportMaxSize));
}

static JobExceptionsInfoWithHistory.JobExceptionHistory createJobExceptionHistory(
Iterable<ExceptionHistoryEntry> historyEntries, int limit) {
private static JobExceptionsInfoWithHistory.JobExceptionHistory createJobExceptionHistory(
Iterable<RootExceptionHistoryEntry> historyEntries, int limit) {
// we need to reverse the history to have a stable result when doing paging on it
final List<ExceptionHistoryEntry> reversedHistoryEntries = new ArrayList<>();
final List<RootExceptionHistoryEntry> reversedHistoryEntries = new ArrayList<>();
Iterables.addAll(reversedHistoryEntries, historyEntries);
Collections.reverse(reversedHistoryEntries);

List<JobExceptionsInfoWithHistory.ExceptionInfo> exceptionHistoryEntries =
List<JobExceptionsInfoWithHistory.RootExceptionInfo> exceptionHistoryEntries =
reversedHistoryEntries.stream()
.limit(limit)
.map(JobExceptionsHandler::createExceptionInfo)
.map(JobExceptionsHandler::createRootExceptionInfo)
.collect(Collectors.toList());

return new JobExceptionsInfoWithHistory.JobExceptionHistory(
exceptionHistoryEntries,
exceptionHistoryEntries.size() < reversedHistoryEntries.size());
}

private static JobExceptionsInfoWithHistory.ExceptionInfo createExceptionInfo(
ExceptionHistoryEntry historyEntry) {
private static JobExceptionsInfoWithHistory.RootExceptionInfo createRootExceptionInfo(
RootExceptionHistoryEntry historyEntry) {
final List<JobExceptionsInfoWithHistory.ExceptionInfo> concurrentExceptions =
StreamSupport.stream(historyEntry.getConcurrentExceptions().spliterator(), false)
.map(JobExceptionsHandler::createExceptionInfo)
.collect(Collectors.toList());

if (historyEntry.isGlobal()) {
return new JobExceptionsInfoWithHistory.ExceptionInfo(
return new JobExceptionsInfoWithHistory.RootExceptionInfo(
historyEntry.getException().getOriginalErrorClassName(),
historyEntry.getExceptionAsString(),
historyEntry.getTimestamp());
historyEntry.getTimestamp(),
concurrentExceptions);
}

Preconditions.checkArgument(
historyEntry.getFailingTaskName() != null,
"The taskName must not be null for a non-global failure.");
Preconditions.checkArgument(
historyEntry.getTaskManagerLocation() != null,
"The location must not be null for a non-global failure.");
assertLocalExceptionInfo(historyEntry);

return new JobExceptionsInfoWithHistory.ExceptionInfo(
return new JobExceptionsInfoWithHistory.RootExceptionInfo(
historyEntry.getException().getOriginalErrorClassName(),
historyEntry.getExceptionAsString(),
historyEntry.getTimestamp(),
historyEntry.getFailingTaskName(),
toString(historyEntry.getTaskManagerLocation()));
toString(historyEntry.getTaskManagerLocation()),
concurrentExceptions);
}

private static JobExceptionsInfoWithHistory.ExceptionInfo createExceptionInfo(
ExceptionHistoryEntry exceptionHistoryEntry) {
assertLocalExceptionInfo(exceptionHistoryEntry);

return new JobExceptionsInfoWithHistory.ExceptionInfo(
exceptionHistoryEntry.getException().getOriginalErrorClassName(),
exceptionHistoryEntry.getExceptionAsString(),
exceptionHistoryEntry.getTimestamp(),
exceptionHistoryEntry.getFailingTaskName(),
toString(exceptionHistoryEntry.getTaskManagerLocation()));
}

private static void assertLocalExceptionInfo(ExceptionHistoryEntry exceptionHistoryEntry) {
Preconditions.checkArgument(
exceptionHistoryEntry.getFailingTaskName() != null,
"The taskName must not be null for a non-global failure.");
Preconditions.checkArgument(
exceptionHistoryEntry.getTaskManagerLocation() != null,
"The location must not be null for a non-global failure.");
}

@VisibleForTesting
Expand Down
Loading