diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index f8d3e43e1ef12..6321a4fa4587e 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -1612,7 +1612,7 @@ "type" : "array", "items" : { "type" : "object", - "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:ExceptionInfo", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:RootExceptionInfo", "properties" : { "exceptionName" : { "type" : "string" @@ -1628,6 +1628,30 @@ }, "location" : { "type" : "string" + }, + "concurrentExceptions" : { + "type" : "array", + "items" : { + "type" : "object", + "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:JobExceptionsInfoWithHistory:ExceptionInfo", + "properties" : { + "exceptionName" : { + "type" : "string" + }, + "stacktrace" : { + "type" : "string" + }, + "timestamp" : { + "type" : "integer" + }, + "taskName" : { + "type" : "string" + }, + "location" : { + "type" : "string" + } + } + } } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java index a96bd4ce5b9e7..7a9ea67d1ddfe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java @@ -905,7 +905,7 @@ public void suspend(Throwable suspensionCause) { // stay in a terminal state return; } else if (transitionState(state, JobStatus.SUSPENDED, suspensionCause)) { - initFailureCause(suspensionCause); + initFailureCause(suspensionCause, System.currentTimeMillis()); incrementRestarts(); @@ -1045,9 +1045,9 @@ public void incrementRestarts() { } @Override - public void initFailureCause(Throwable t) { + public void initFailureCause(Throwable t, long timestamp) { this.failureCause = t; - this.failureInfo = new ErrorInfo(t, System.currentTimeMillis()); + this.failureInfo = new ErrorInfo(t, timestamp); } // ------------------------------------------------------------------------ @@ -1136,13 +1136,13 @@ private void allVerticesInTerminalState() { } @Override - public void failJob(Throwable cause) { + public void failJob(Throwable cause, long timestamp) { if (state == JobStatus.FAILING || state.isTerminalState()) { return; } transitionState(JobStatus.FAILING, cause); - initFailureCause(cause); + initFailureCause(cause, timestamp); FutureUtils.assertNoException( cancelVerticesAsync() diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 9a43562b5b642..1be8b96160167 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -163,7 +163,7 @@ void enableCheckpointing( */ void suspend(Throwable suspensionCause); - void failJob(Throwable cause); + void failJob(Throwable cause, long timestamp); /** * Returns the termination future of this {@link ExecutionGraph}. The termination future is @@ -181,7 +181,7 @@ void enableCheckpointing( void incrementRestarts(); - void initFailureCause(Throwable t); + void initFailureCause(Throwable t, long timestamp); /** * Updates the state of one of the ExecutionVertex's Execution attempts. If the new status if diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java index a3b99e7ebdb08..d2fc70f0bd87b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandler.java @@ -74,13 +74,15 @@ public ExecutionFailureHandler( * * @param failedTask is the ID of the failed task vertex * @param cause of the task failure + * @param timestamp of the task failure * @return result of the failure handling */ public FailureHandlingResult getFailureHandlingResult( - ExecutionVertexID failedTask, Throwable cause) { + ExecutionVertexID failedTask, Throwable cause, long timestamp) { return handleFailure( failedTask, cause, + timestamp, failoverStrategy.getTasksNeedingRestart(failedTask, cause), false); } @@ -91,12 +93,15 @@ public FailureHandlingResult getFailureHandlingResult( * for it. * * @param cause of the task failure + * @param timestamp of the task failure * @return result of the failure handling */ - public FailureHandlingResult getGlobalFailureHandlingResult(final Throwable cause) { + public FailureHandlingResult getGlobalFailureHandlingResult( + final Throwable cause, long timestamp) { return handleFailure( null, cause, + timestamp, IterableUtils.toStream(schedulingTopology.getVertices()) .map(SchedulingExecutionVertex::getId) .collect(Collectors.toSet()), @@ -106,6 +111,7 @@ public FailureHandlingResult getGlobalFailureHandlingResult(final Throwable caus private FailureHandlingResult handleFailure( @Nullable final ExecutionVertexID failingExecutionVertexId, final Throwable cause, + long timestamp, final Set verticesToRestart, final boolean globalFailure) { @@ -113,6 +119,7 @@ private FailureHandlingResult handleFailure( return FailureHandlingResult.unrecoverable( failingExecutionVertexId, new JobException("The failure is not recoverable", cause), + timestamp, globalFailure); } @@ -123,6 +130,7 @@ private FailureHandlingResult handleFailure( return FailureHandlingResult.restartable( failingExecutionVertexId, cause, + timestamp, verticesToRestart, restartBackoffTimeStrategy.getBackoffTime(), globalFailure); @@ -131,6 +139,7 @@ private FailureHandlingResult handleFailure( failingExecutionVertexId, new JobException( "Recovery is suppressed by " + restartBackoffTimeStrategy, cause), + timestamp, globalFailure); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java index 5782e26b4fa6b..957fb92ca7d1a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResult.java @@ -55,6 +55,9 @@ public class FailureHandlingResult { /** Failure reason. {@code @Nullable} because of FLINK-21376. */ @Nullable private final Throwable error; + /** Failure timestamp. */ + private final long timestamp; + /** True if the original failure was a global failure. */ private final boolean globalFailure; @@ -65,6 +68,7 @@ public class FailureHandlingResult { * ExecutionVertex} the failure is originating from. Passing {@code null} as a value * indicates that the failure was issued by Flink itself. * @param cause the exception that caused this failure. + * @param timestamp the time the failure was handled. * @param verticesToRestart containing task vertices to restart to recover from the failure. * {@code null} indicates that the failure is not restartable. * @param restartDelayMS indicate a delay before conducting the restart @@ -72,6 +76,7 @@ public class FailureHandlingResult { private FailureHandlingResult( @Nullable ExecutionVertexID failingExecutionVertexId, @Nullable Throwable cause, + long timestamp, @Nullable Set verticesToRestart, long restartDelayMS, boolean globalFailure) { @@ -81,6 +86,7 @@ private FailureHandlingResult( this.restartDelayMS = restartDelayMS; this.failingExecutionVertexId = failingExecutionVertexId; this.error = cause; + this.timestamp = timestamp; this.globalFailure = globalFailure; } @@ -91,15 +97,18 @@ private FailureHandlingResult( * ExecutionVertex} the failure is originating from. Passing {@code null} as a value * indicates that the failure was issued by Flink itself. * @param error reason why the failure is not recoverable + * @param timestamp the time the failure was handled. */ private FailureHandlingResult( @Nullable ExecutionVertexID failingExecutionVertexId, @Nonnull Throwable error, + long timestamp, boolean globalFailure) { this.verticesToRestart = null; this.restartDelayMS = -1; this.failingExecutionVertexId = failingExecutionVertexId; this.error = checkNotNull(error); + this.timestamp = timestamp; this.globalFailure = globalFailure; } @@ -147,11 +156,19 @@ public Optional getExecutionVertexIdOfFailedTask() { * * @return reason why the restarting cannot be conducted */ - @Nullable public Throwable getError() { return error; } + /** + * Returns the time of the failure. + * + * @return The timestamp. + */ + public long getTimestamp() { + return timestamp; + } + /** * Returns whether the restarting can be conducted. * @@ -178,6 +195,8 @@ public boolean isGlobalFailure() { * @param failingExecutionVertexId the {@link ExecutionVertexID} refering to the {@link * ExecutionVertex} the failure is originating from. Passing {@code null} as a value * indicates that the failure was issued by Flink itself. + * @param cause The reason of the failure. + * @param timestamp The time of the failure. * @param verticesToRestart containing task vertices to restart to recover from the failure. * {@code null} indicates that the failure is not restartable. * @param restartDelayMS indicate a delay before conducting the restart @@ -186,11 +205,17 @@ public boolean isGlobalFailure() { public static FailureHandlingResult restartable( @Nullable ExecutionVertexID failingExecutionVertexId, @Nonnull Throwable cause, + long timestamp, @Nullable Set verticesToRestart, long restartDelayMS, boolean globalFailure) { return new FailureHandlingResult( - failingExecutionVertexId, cause, verticesToRestart, restartDelayMS, globalFailure); + failingExecutionVertexId, + cause, + timestamp, + verticesToRestart, + restartDelayMS, + globalFailure); } /** @@ -203,12 +228,14 @@ public static FailureHandlingResult restartable( * ExecutionVertex} the failure is originating from. Passing {@code null} as a value * indicates that the failure was issued by Flink itself. * @param error reason why the failure is not recoverable + * @param timestamp The time of the failure. * @return result indicating the failure is not recoverable */ public static FailureHandlingResult unrecoverable( @Nullable ExecutionVertexID failingExecutionVertexId, @Nonnull Throwable error, + long timestamp, boolean globalFailure) { - return new FailureHandlingResult(failingExecutionVertexId, error, globalFailure); + return new FailureHandlingResult(failingExecutionVertexId, error, timestamp, globalFailure); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java index cf9a3cdf50a55..c7de5d70f083f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java @@ -34,8 +34,9 @@ import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters; import org.apache.flink.runtime.rest.messages.job.UpperLimitExceptionParameter; -import org.apache.flink.runtime.scheduler.ExceptionHistoryEntry; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry; +import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; @@ -56,6 +57,7 @@ import java.util.Optional; import java.util.concurrent.Executor; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; /** Handler serving the job exceptions. */ public class JobExceptionsHandler @@ -153,17 +155,17 @@ private static JobExceptionsInfoWithHistory createJobExceptionsInfo( executionGraphInfo.getExceptionHistory(), exceptionToReportMaxSize)); } - static JobExceptionsInfoWithHistory.JobExceptionHistory createJobExceptionHistory( - Iterable historyEntries, int limit) { + private static JobExceptionsInfoWithHistory.JobExceptionHistory createJobExceptionHistory( + Iterable historyEntries, int limit) { // we need to reverse the history to have a stable result when doing paging on it - final List reversedHistoryEntries = new ArrayList<>(); + final List reversedHistoryEntries = new ArrayList<>(); Iterables.addAll(reversedHistoryEntries, historyEntries); Collections.reverse(reversedHistoryEntries); - List exceptionHistoryEntries = + List exceptionHistoryEntries = reversedHistoryEntries.stream() .limit(limit) - .map(JobExceptionsHandler::createExceptionInfo) + .map(JobExceptionsHandler::createRootExceptionInfo) .collect(Collectors.toList()); return new JobExceptionsInfoWithHistory.JobExceptionHistory( @@ -171,28 +173,51 @@ static JobExceptionsInfoWithHistory.JobExceptionHistory createJobExceptionHistor exceptionHistoryEntries.size() < reversedHistoryEntries.size()); } - private static JobExceptionsInfoWithHistory.ExceptionInfo createExceptionInfo( - ExceptionHistoryEntry historyEntry) { + private static JobExceptionsInfoWithHistory.RootExceptionInfo createRootExceptionInfo( + RootExceptionHistoryEntry historyEntry) { + final List concurrentExceptions = + StreamSupport.stream(historyEntry.getConcurrentExceptions().spliterator(), false) + .map(JobExceptionsHandler::createExceptionInfo) + .collect(Collectors.toList()); + if (historyEntry.isGlobal()) { - return new JobExceptionsInfoWithHistory.ExceptionInfo( + return new JobExceptionsInfoWithHistory.RootExceptionInfo( historyEntry.getException().getOriginalErrorClassName(), historyEntry.getExceptionAsString(), - historyEntry.getTimestamp()); + historyEntry.getTimestamp(), + concurrentExceptions); } - Preconditions.checkArgument( - historyEntry.getFailingTaskName() != null, - "The taskName must not be null for a non-global failure."); - Preconditions.checkArgument( - historyEntry.getTaskManagerLocation() != null, - "The location must not be null for a non-global failure."); + assertLocalExceptionInfo(historyEntry); - return new JobExceptionsInfoWithHistory.ExceptionInfo( + return new JobExceptionsInfoWithHistory.RootExceptionInfo( historyEntry.getException().getOriginalErrorClassName(), historyEntry.getExceptionAsString(), historyEntry.getTimestamp(), historyEntry.getFailingTaskName(), - toString(historyEntry.getTaskManagerLocation())); + toString(historyEntry.getTaskManagerLocation()), + concurrentExceptions); + } + + private static JobExceptionsInfoWithHistory.ExceptionInfo createExceptionInfo( + ExceptionHistoryEntry exceptionHistoryEntry) { + assertLocalExceptionInfo(exceptionHistoryEntry); + + return new JobExceptionsInfoWithHistory.ExceptionInfo( + exceptionHistoryEntry.getException().getOriginalErrorClassName(), + exceptionHistoryEntry.getExceptionAsString(), + exceptionHistoryEntry.getTimestamp(), + exceptionHistoryEntry.getFailingTaskName(), + toString(exceptionHistoryEntry.getTaskManagerLocation())); + } + + private static void assertLocalExceptionInfo(ExceptionHistoryEntry exceptionHistoryEntry) { + Preconditions.checkArgument( + exceptionHistoryEntry.getFailingTaskName() != null, + "The taskName must not be null for a non-global failure."); + Preconditions.checkArgument( + exceptionHistoryEntry.getTaskManagerLocation() != null, + "The location must not be null for a non-global failure."); } @VisibleForTesting diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java index bd0b2aa374bb1..11f4e472b4297 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobExceptionsInfoWithHistory.java @@ -27,6 +27,7 @@ import javax.annotation.Nullable; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -71,6 +72,8 @@ public JobExceptionHistory getExceptionHistory() { return exceptionHistory; } + // hashCode and equals are necessary for the test classes deriving from + // RestResponseMarshallingTestBase @Override public boolean equals(Object o) { if (this == o) { @@ -115,21 +118,21 @@ public static final class JobExceptionHistory { public static final String FIELD_NAME_TRUNCATED = "truncated"; @JsonProperty(FIELD_NAME_ENTRIES) - private final List entries; + private final List entries; @JsonProperty(FIELD_NAME_TRUNCATED) private final boolean truncated; @JsonCreator public JobExceptionHistory( - @JsonProperty(FIELD_NAME_ENTRIES) List entries, + @JsonProperty(FIELD_NAME_ENTRIES) List entries, @JsonProperty(FIELD_NAME_TRUNCATED) boolean truncated) { this.entries = entries; this.truncated = truncated; } @JsonIgnore - public List getEntries() { + public List getEntries() { return entries; } @@ -138,6 +141,8 @@ public boolean isTruncated() { return truncated; } + // hashCode and equals are necessary for the test classes deriving from + // RestResponseMarshallingTestBase @Override public boolean equals(Object o) { if (this == o) { @@ -165,7 +170,10 @@ public String toString() { } } - /** Collects the information of a single exception. */ + /** + * Json equivalent of {@link + * org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry}. + */ public static class ExceptionInfo { public static final String FIELD_NAME_EXCEPTION_NAME = "exceptionName"; @@ -238,6 +246,8 @@ public String getLocation() { return location; } + // hashCode and equals are necessary for the test classes deriving from + // RestResponseMarshallingTestBase @Override public boolean equals(Object o) { if (this == o) { @@ -270,4 +280,73 @@ public String toString() { .toString(); } } + + /** + * Json equivalent of {@link + * org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry}. + */ + public static class RootExceptionInfo extends ExceptionInfo { + + public static final String FIELD_NAME_CONCURRENT_EXCEPTIONS = "concurrentExceptions"; + + @JsonProperty(FIELD_NAME_CONCURRENT_EXCEPTIONS) + private final Collection concurrentExceptions; + + public RootExceptionInfo( + String exceptionName, + String stacktrace, + long timestamp, + Collection concurrentExceptions) { + this(exceptionName, stacktrace, timestamp, null, null, concurrentExceptions); + } + + @JsonCreator + public RootExceptionInfo( + @JsonProperty(FIELD_NAME_EXCEPTION_NAME) String exceptionName, + @JsonProperty(FIELD_NAME_EXCEPTION_STACKTRACE) String stacktrace, + @JsonProperty(FIELD_NAME_EXCEPTION_TIMESTAMP) long timestamp, + @JsonProperty(FIELD_NAME_TASK_NAME) @Nullable String taskName, + @JsonProperty(FIELD_NAME_LOCATION) @Nullable String location, + @JsonProperty(FIELD_NAME_CONCURRENT_EXCEPTIONS) + Collection concurrentExceptions) { + super(exceptionName, stacktrace, timestamp, taskName, location); + this.concurrentExceptions = concurrentExceptions; + } + + @JsonIgnore + public Collection getConcurrentExceptions() { + return concurrentExceptions; + } + + // hashCode and equals are necessary for the test classes deriving from + // RestResponseMarshallingTestBase + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass() || !super.equals(o)) { + return false; + } + RootExceptionInfo that = (RootExceptionInfo) o; + return getConcurrentExceptions().equals(that.getConcurrentExceptions()); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), getConcurrentExceptions()); + } + + @Override + public String toString() { + return new StringJoiner(", ", RootExceptionInfo.class.getSimpleName() + "[", "]") + .add("exceptionName='" + getExceptionName() + "'") + .add("stacktrace='" + getStacktrace() + "'") + .add("timestamp=" + getTimestamp()) + .add("taskName='" + getTaskName() + "'") + .add("location='" + getLocation() + "'") + .add("concurrentExceptions=" + getConcurrentExceptions()) + .toString(); + } + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index be291928630d8..2618d276b7314 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -199,10 +199,12 @@ private void maybeHandleTaskFailure( private void handleTaskFailure( final ExecutionVertexID executionVertexId, @Nullable final Throwable error) { - setGlobalFailureCause(error); + final long timestamp = System.currentTimeMillis(); + setGlobalFailureCause(error, timestamp); notifyCoordinatorsAboutTaskFailure(executionVertexId, error); final FailureHandlingResult failureHandlingResult = - executionFailureHandler.getFailureHandlingResult(executionVertexId, error); + executionFailureHandler.getFailureHandlingResult( + executionVertexId, error, timestamp); maybeRestartTasks(failureHandlingResult); } @@ -217,11 +219,12 @@ private void notifyCoordinatorsAboutTaskFailure( @Override public void handleGlobalFailure(final Throwable error) { - setGlobalFailureCause(error); + final long timestamp = System.currentTimeMillis(); + setGlobalFailureCause(error, timestamp); log.info("Trying to recover from a global failure.", error); final FailureHandlingResult failureHandlingResult = - executionFailureHandler.getGlobalFailureHandlingResult(error); + executionFailureHandler.getGlobalFailureHandlingResult(error, timestamp); maybeRestartTasks(failureHandlingResult); } @@ -229,7 +232,7 @@ private void maybeRestartTasks(final FailureHandlingResult failureHandlingResult if (failureHandlingResult.canRestart()) { restartTasksWithDelay(failureHandlingResult); } else { - failJob(failureHandlingResult.getError()); + failJob(failureHandlingResult.getError(), failureHandlingResult.getTimestamp()); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java index d43a35549bcee..683c1ef290d8a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import java.io.Serializable; import java.util.Collections; @@ -34,7 +35,7 @@ public class ExecutionGraphInfo implements Serializable { private static final long serialVersionUID = -6134203195124124202L; private final ArchivedExecutionGraph executionGraph; - private final Iterable exceptionHistory; + private final Iterable exceptionHistory; public ExecutionGraphInfo(ArchivedExecutionGraph executionGraph) { this(executionGraph, Collections.emptyList()); @@ -42,7 +43,7 @@ public ExecutionGraphInfo(ArchivedExecutionGraph executionGraph) { public ExecutionGraphInfo( ArchivedExecutionGraph executionGraph, - Iterable exceptionHistory) { + Iterable exceptionHistory) { this.executionGraph = executionGraph; this.exceptionHistory = exceptionHistory; } @@ -55,7 +56,7 @@ public ArchivedExecutionGraph getArchivedExecutionGraph() { return executionGraph; } - public Iterable getExceptionHistory() { + public Iterable getExceptionHistory() { return exceptionHistory; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index f75bde1ec7449..30ee8b339b1e1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -73,6 +73,8 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.UnknownKvStateLocation; +import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntryExtractor; +import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationHandlerImpl; import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointTerminationManager; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; @@ -142,7 +144,8 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling private final ComponentMainThreadExecutor mainThreadExecutor; - private final BoundedFIFOQueue exceptionHistory; + private final ExceptionHistoryEntryExtractor exceptionHistoryEntryExtractor; + private final BoundedFIFOQueue exceptionHistory; private final ExecutionGraphFactory executionGraphFactory; @@ -207,7 +210,9 @@ public SchedulerBase( this.operatorCoordinatorHandler = new DefaultOperatorCoordinatorHandler(executionGraph, this::handleGlobalFailure); operatorCoordinatorHandler.initializeOperatorCoordinators(this.mainThreadExecutor); - exceptionHistory = + + this.exceptionHistoryEntryExtractor = new ExceptionHistoryEntryExtractor(); + this.exceptionHistory = new BoundedFIFOQueue<>( jobMasterConfiguration.getInteger( JobManagerOptions.MAX_EXCEPTION_HISTORY_SIZE)); @@ -398,9 +403,9 @@ protected void transitionToScheduled(final List verticesToDep .transitionState(ExecutionState.SCHEDULED)); } - protected void setGlobalFailureCause(@Nullable final Throwable cause) { + protected void setGlobalFailureCause(@Nullable final Throwable cause, long timestamp) { if (cause != null) { - executionGraph.initFailureCause(cause); + executionGraph.initFailureCause(cause, timestamp); } } @@ -408,9 +413,9 @@ protected ComponentMainThreadExecutor getMainThreadExecutor() { return mainThreadExecutor; } - protected void failJob(Throwable cause) { + protected void failJob(Throwable cause, long timestamp) { incrementVersionsOfAllVertices(); - executionGraph.failJob(cause); + executionGraph.failJob(cause, timestamp); getJobTerminationFuture().thenRun(() -> archiveGlobalFailure(cause)); } @@ -539,34 +544,34 @@ protected final void archiveGlobalFailure(@Nullable Throwable failure) { archiveGlobalFailure(failure, executionGraph.getStatusTimestamp(JobStatus.FAILED)); } - protected final void archiveGlobalFailure(@Nullable Throwable failure, long timestamp) { - exceptionHistory.add(ExceptionHistoryEntry.fromGlobalFailure(failure, timestamp)); + private void archiveGlobalFailure(@Nullable Throwable failure, long timestamp) { + exceptionHistory.add( + exceptionHistoryEntryExtractor.extractGlobalFailure( + executionGraph.getAllExecutionVertices(), failure, timestamp)); log.debug("Archive global failure.", failure); } protected final void archiveFromFailureHandlingResult( FailureHandlingResult failureHandlingResult) { - final Optional executionOptional = - failureHandlingResult - .getExecutionVertexIdOfFailedTask() - .map(this::getExecutionVertex) - .map(ExecutionVertex::getCurrentExecutionAttempt); - - if (executionOptional.isPresent()) { - final Execution failedExecution = executionOptional.get(); - final ExceptionHistoryEntry exceptionHistoryEntry = - ExceptionHistoryEntry.fromFailedExecution( - failedExecution, - failedExecution.getVertex().getTaskNameWithSubtaskIndex()); - exceptionHistory.add(exceptionHistoryEntry); + if (failureHandlingResult.getExecutionVertexIdOfFailedTask().isPresent()) { + final ExecutionVertexID executionVertexId = + failureHandlingResult.getExecutionVertexIdOfFailedTask().get(); + final RootExceptionHistoryEntry rootEntry = + exceptionHistoryEntryExtractor.extractLocalFailure( + executionGraph.getAllVertices(), + executionVertexId, + failureHandlingResult.getVerticesToRestart().stream() + .filter(v -> !executionVertexId.equals(v)) + .collect(Collectors.toSet())); + exceptionHistory.add(rootEntry); + log.debug( "Archive local failure causing attempt {} to fail: {}", - failedExecution.getAttemptId(), - exceptionHistoryEntry.getExceptionAsString()); + executionVertexId, + rootEntry.getExceptionAsString()); } else { - // fallback in case of a global fail over - no failed state is set and, therefore, no - // timestamp was taken - archiveGlobalFailure(failureHandlingResult.getError(), System.currentTimeMillis()); + archiveGlobalFailure( + failureHandlingResult.getError(), failureHandlingResult.getTimestamp()); } } @@ -650,14 +655,9 @@ public final void notifyPartitionDataAvailable(final ResultPartitionID partition protected void notifyPartitionDataAvailableInternal( IntermediateResultPartitionID resultPartitionId) {} - /** - * Returns a copy of the current history of task failures. - * - * @return a copy of the current history of task failures. - */ @VisibleForTesting - protected Iterable getExceptionHistory() { - final Collection copy = new ArrayList<>(exceptionHistory.size()); + Iterable getExceptionHistory() { + final Collection copy = new ArrayList<>(exceptionHistory.size()); exceptionHistory.forEach(copy::add); return copy; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java index af249b597f750..dc44c3c081dac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Failing.java @@ -42,7 +42,7 @@ class Failing extends StateWithExecutionGraph { super(context, executionGraph, executionGraphHandler, operatorCoordinatorHandler, logger); this.context = context; - getExecutionGraph().failJob(failureCause); + getExecutionGraph().failJob(failureCause, System.currentTimeMillis()); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java similarity index 74% rename from flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntry.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java index 83f615417407b..08a69efac5b9c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntry.java @@ -16,11 +16,10 @@ * limitations under the License. */ -package org.apache.flink.runtime.scheduler; +package org.apache.flink.runtime.scheduler.exceptionhistory; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.executiongraph.ErrorInfo; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -40,45 +39,19 @@ public class ExceptionHistoryEntry extends ErrorInfo { @Nullable private final String failingTaskName; @Nullable private final ArchivedTaskManagerLocation taskManagerLocation; - /** - * Creates a {@code ExceptionHistoryEntry} representing a global failure from the passed {@code - * Throwable} and timestamp. - * - * @param cause The reason for the failure. - * @param timestamp The time the failure was caught. - * @return The {@code ExceptionHistoryEntry} instance. - */ - public static ExceptionHistoryEntry fromGlobalFailure(Throwable cause, long timestamp) { - return new ExceptionHistoryEntry(cause, timestamp, null, null); - } - - /** - * Creates a {@code ExceptionHistoryEntry} representing a local failure using the passed - * information. - * - * @param execution The {@link AccessExecution} that caused the failure. - * @param failingTaskName The name of the task the {@code execution} is connected to. - * @return The {@code ExceptionHistoryEntry} instance. - */ - public static ExceptionHistoryEntry fromFailedExecution( - AccessExecution execution, String failingTaskName) { - ErrorInfo failureInfo = - execution - .getFailureInfo() - .orElseThrow( - () -> - new IllegalArgumentException( - "The passed Execution does not provide a failureCause.")); - return new ExceptionHistoryEntry( - failureInfo.getException(), - failureInfo.getTimestamp(), + ExceptionHistoryEntry( + Throwable cause, + long timestamp, + @Nullable String failingTaskName, + @Nullable TaskManagerLocation taskManagerLocation) { + this( + cause, + timestamp, failingTaskName, - ArchivedTaskManagerLocation.fromTaskManagerLocation( - execution.getAssignedResourceLocation())); + ArchivedTaskManagerLocation.fromTaskManagerLocation(taskManagerLocation)); } - @VisibleForTesting - public ExceptionHistoryEntry( + private ExceptionHistoryEntry( Throwable cause, long timestamp, @Nullable String failingTaskName, @@ -127,7 +100,7 @@ public static class ArchivedTaskManagerLocation implements Serializable { */ @VisibleForTesting @Nullable - public static ArchivedTaskManagerLocation fromTaskManagerLocation( + static ArchivedTaskManagerLocation fromTaskManagerLocation( TaskManagerLocation taskManagerLocation) { if (taskManagerLocation == null) { return null; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryExtractor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryExtractor.java new file mode 100644 index 0000000000000..09030aa8629e8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryExtractor.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.exceptionhistory; + +import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedThrowable; +import org.apache.flink.util.function.QuadFunction; + +import java.util.Map; + +/** + * {@code ExceptionHistoryEntryExtractor} extracts all the necessary information from given + * executions to create corresponding {@link RootExceptionHistoryEntry RootExceptionHistoryEntries}. + */ +public class ExceptionHistoryEntryExtractor { + + /** + * Extracts a {@link RootExceptionHistoryEntry} based on the passed local failure information. + * + * @param executionJobVertices The {@link ExecutionJobVertex} instances registry. + * @param failedExecutionVertexId The {@link ExecutionVertexID} referring to the {@link + * ExecutionVertex} that is the root of the failure. + * @param otherAffectedVertices The {@code ExecutionVertexID}s of other affected {@code + * ExecutionVertices} that, if failed as well, would be added as concurrent failures. + * @return The {@code RootExceptionHistoryEntry}. + * @throws IllegalArgumentException if one of the passed {@code ExecutionVertexID}s cannot be + * resolved into an {@code ExecutionVertex}. + * @throws IllegalArgumentException if the {@code failedExecutionVertexID} refers to an {@code + * ExecutionVertex} that didn't fail. + */ + public RootExceptionHistoryEntry extractLocalFailure( + Map executionJobVertices, + ExecutionVertexID failedExecutionVertexId, + Iterable otherAffectedVertices) { + final ExecutionVertex rootCauseExecutionVertex = + getExecutionVertex(executionJobVertices, failedExecutionVertexId); + + final RootExceptionHistoryEntry root = + createLocalExceptionHistoryEntry( + RootExceptionHistoryEntry::fromLocalFailure, rootCauseExecutionVertex); + + for (ExecutionVertexID otherExecutionVertexId : otherAffectedVertices) { + final ExecutionVertex executionVertex = + getExecutionVertex(executionJobVertices, otherExecutionVertexId); + if (executionVertex.getFailureInfo().isPresent()) { + root.add( + createLocalExceptionHistoryEntry( + ExceptionHistoryEntry::new, executionVertex)); + } + } + + return root; + } + + /** + * Extracts a {@link RootExceptionHistoryEntry} based on the global failure information. + * + * @param executionVertices The {@link ExecutionVertex ExecutionVertices} that are affected by + * the global failure and, if failed as well, would be added as concurrent failures to the + * entry. + * @param rootCause The {@code Throwable} causing the failure. + * @param timestamp The timestamp the failure occurred. + * @return The {@code RootExceptionHistoryEntry}. + * @throws IllegalArgumentException if one of the passed {@code ExecutionVertexID}s cannot be * + * resolved into an {@code ExecutionVertex}. + */ + public RootExceptionHistoryEntry extractGlobalFailure( + Iterable executionVertices, Throwable rootCause, long timestamp) { + final RootExceptionHistoryEntry root = + RootExceptionHistoryEntry.fromGlobalFailure(rootCause, timestamp); + + for (ExecutionVertex executionVertex : executionVertices) { + if (!executionVertex.getFailureInfo().isPresent()) { + continue; + } + + final ExceptionHistoryEntry exceptionHistoryEntry = + createLocalExceptionHistoryEntry(ExceptionHistoryEntry::new, executionVertex); + if (exceptionHistoryEntry != null) { + root.add(exceptionHistoryEntry); + } + } + + return root; + } + + private static ExecutionVertex getExecutionVertex( + Map executionJobVertices, + ExecutionVertexID executionVertexID) { + final ExecutionJobVertex executionJobVertex = + executionJobVertices.get(executionVertexID.getJobVertexId()); + + Preconditions.checkArgument( + executionJobVertex != null, + "The passed ExecutionVertexID does not correspond to any ExecutionJobVertex provided."); + final ExecutionVertex[] executionVertices = executionJobVertex.getTaskVertices(); + + Preconditions.checkArgument( + executionVertices.length > executionVertexID.getSubtaskIndex(), + "The ExecutionJobVertex referred by the passed ExecutionVertexID does not have the right amount of subtasks (expected subtask ID: {}; actual number of subtasks: {}).", + executionVertexID.getSubtaskIndex(), + executionJobVertex.getTaskVertices().length); + return executionVertices[executionVertexID.getSubtaskIndex()]; + } + + private static E createLocalExceptionHistoryEntry( + QuadFunction + exceptionHistoryEntryCreator, + ExecutionVertex executionVertex) { + return executionVertex + .getFailureInfo() + .map( + failureInfo -> + exceptionHistoryEntryCreator.apply( + failureInfo.getException(), + failureInfo.getTimestamp(), + executionVertex.getTaskNameWithSubtaskIndex(), + executionVertex + .getCurrentExecutionAttempt() + .getAssignedResourceLocation())) + .orElseThrow( + () -> + new IllegalArgumentException( + "The selected ExecutionVertex " + + executionVertex.getID() + + " didn't fail.")); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java new file mode 100644 index 0000000000000..400e3d39eae30 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.exceptionhistory; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collection; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@code RootExceptionHistoryEntry} extending {@link ExceptionHistoryEntry} by providing a list of + * {@code ExceptionHistoryEntry} instances to store concurrently caught failures. + */ +public class RootExceptionHistoryEntry extends ExceptionHistoryEntry { + + private static final long serialVersionUID = -7647332765867297434L; + + private final Collection concurrentExceptions = new ArrayList<>(); + + /** + * Creates a {@code RootExceptionHistoryEntry} representing a global failure from the passed + * {@code Throwable} and timestamp. + * + * @param cause The reason for the failure. + * @param timestamp The time the failure was caught. + * @return The {@code RootExceptionHistoryEntry} instance. + * @throws NullPointerException if {@code cause} is {@code null}. + * @throws IllegalArgumentException if the passed {@code timestamp} is not bigger than {@code + * 0}. + */ + @VisibleForTesting + public static RootExceptionHistoryEntry fromGlobalFailure(Throwable cause, long timestamp) { + return new RootExceptionHistoryEntry(cause, timestamp, null, null); + } + + /** + * Creates a {@code RootExceptionHistoryEntry} representing a local failure using the passed + * information. + * + * @param cause The reason for the failure. + * @param timestamp The time the failure was caught. + * @param failingTaskName The name of the task that failed. + * @param taskManagerLocation The {@link TaskManagerLocation} the task was running on. + * @return The {@code RootExceptionHistoryEntry} instance. + * @throws NullPointerException if {@code cause} or {@code failingTaskName} are {@code null}. + * @throws IllegalArgumentException if the passed {@code timestamp} is not bigger than {@code + * 0}. + */ + @VisibleForTesting + public static RootExceptionHistoryEntry fromLocalFailure( + Throwable cause, + long timestamp, + String failingTaskName, + @Nullable TaskManagerLocation taskManagerLocation) { + return new RootExceptionHistoryEntry( + cause, timestamp, checkNotNull(failingTaskName), taskManagerLocation); + } + + private RootExceptionHistoryEntry( + Throwable cause, + long timestamp, + @Nullable String failingTaskName, + @Nullable TaskManagerLocation taskManagerLocation) { + super(cause, timestamp, failingTaskName, taskManagerLocation); + } + + void add(ExceptionHistoryEntry concurrentException) { + concurrentExceptions.add(concurrentException); + } + + public Iterable getConcurrentExceptions() { + return concurrentExceptions; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java index 925455ed78689..d53be6712c429 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java @@ -66,7 +66,7 @@ public void testShutdownCheckpointCoordinatorOnFailure() throws Exception { assertThat(checkpointCoordinator, Matchers.notNullValue()); assertThat(checkpointCoordinator.isShutdown(), is(false)); - graph.failJob(new Exception("Test Exception")); + graph.failJob(new Exception("Test Exception"), System.currentTimeMillis()); assertThat(checkpointCoordinator.isShutdown(), is(true)); assertThat(counterShutdownFuture.get(), is(JobStatus.FAILED)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java index 477e4522f479a..69fd869903f2a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/ExecutionFailureHandlerTest.java @@ -27,7 +27,6 @@ import org.apache.flink.util.IterableUtils; import org.apache.flink.util.TestLogger; -import org.hamcrest.core.IsSame; import org.junit.Before; import org.junit.Test; @@ -35,6 +34,8 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; +import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -76,16 +77,18 @@ public void testNormalFailureHandling() { failoverStrategy.setTasksToRestart(tasksToRestart); Exception cause = new Exception("test failure"); + long timestamp = System.currentTimeMillis(); // trigger a task failure final FailureHandlingResult result = executionFailureHandler.getFailureHandlingResult( - new ExecutionVertexID(new JobVertexID(), 0), cause); + new ExecutionVertexID(new JobVertexID(), 0), cause, timestamp); // verify results assertTrue(result.canRestart()); assertEquals(RESTART_DELAY_MS, result.getRestartDelayMS()); assertEquals(tasksToRestart, result.getVerticesToRestart()); - assertThat(result.getError(), IsSame.sameInstance(cause)); + assertThat(result.getError(), is(cause)); + assertThat(result.getTimestamp(), is(timestamp)); assertEquals(1, executionFailureHandler.getNumberOfRestarts()); } @@ -96,13 +99,16 @@ public void testRestartingSuppressedFailureHandlingResult() { backoffTimeStrategy.setCanRestart(false); // trigger a task failure + final Throwable error = new Exception("expected test failure"); + final long timestamp = System.currentTimeMillis(); final FailureHandlingResult result = executionFailureHandler.getFailureHandlingResult( - new ExecutionVertexID(new JobVertexID(), 0), new Exception("test failure")); + new ExecutionVertexID(new JobVertexID(), 0), error, timestamp); // verify results assertFalse(result.canRestart()); - assertNotNull(result.getError()); + assertThat(result.getError(), containsCause(error)); + assertThat(result.getTimestamp(), is(timestamp)); assertFalse(ExecutionFailureHandler.isUnrecoverableError(result.getError())); try { result.getVerticesToRestart(); @@ -123,16 +129,18 @@ public void testRestartingSuppressedFailureHandlingResult() { @Test public void testNonRecoverableFailureHandlingResult() { // trigger an unrecoverable task failure + final Throwable error = + new Exception(new SuppressRestartsException(new Exception("test failure"))); + final long timestamp = System.currentTimeMillis(); final FailureHandlingResult result = executionFailureHandler.getFailureHandlingResult( - new ExecutionVertexID(new JobVertexID(), 0), - new Exception( - new SuppressRestartsException(new Exception("test failure")))); + new ExecutionVertexID(new JobVertexID(), 0), error, timestamp); // verify results assertFalse(result.canRestart()); assertNotNull(result.getError()); assertTrue(ExecutionFailureHandler.isUnrecoverableError(result.getError())); + assertThat(result.getTimestamp(), is(timestamp)); try { result.getVerticesToRestart(); fail("get tasks to restart is not allowed when restarting is suppressed"); @@ -167,15 +175,18 @@ public void testUnrecoverableErrorCheck() { @Test public void testGlobalFailureHandling() { + final Throwable error = new Exception("Expected test failure"); + final long timestamp = System.currentTimeMillis(); final FailureHandlingResult result = - executionFailureHandler.getGlobalFailureHandlingResult( - new Exception("test failure")); + executionFailureHandler.getGlobalFailureHandlingResult(error, timestamp); assertEquals( IterableUtils.toStream(schedulingTopology.getVertices()) .map(SchedulingExecutionVertex::getId) .collect(Collectors.toSet()), result.getVerticesToRestart()); + assertThat(result.getError(), is(error)); + assertThat(result.getTimestamp(), is(timestamp)); } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java index 3ceba5b882890..4746e1b6bd26c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/flip1/FailureHandlingResultTest.java @@ -47,13 +47,16 @@ public void testNormalFailureHandlingResult() { tasks.add(executionVertexID); long delay = 1234; Throwable error = new RuntimeException(); + long timestamp = System.currentTimeMillis(); FailureHandlingResult result = - FailureHandlingResult.restartable(executionVertexID, error, tasks, delay, false); + FailureHandlingResult.restartable( + executionVertexID, error, timestamp, tasks, delay, false); assertTrue(result.canRestart()); assertEquals(delay, result.getRestartDelayMS()); assertEquals(tasks, result.getVerticesToRestart()); assertThat(result.getError(), sameInstance(error)); + assertThat(result.getTimestamp(), is(timestamp)); assertTrue(result.getExecutionVertexIdOfFailedTask().isPresent()); assertThat(result.getExecutionVertexIdOfFailedTask().get(), is(executionVertexID)); } @@ -63,10 +66,13 @@ public void testNormalFailureHandlingResult() { public void testRestartingSuppressedFailureHandlingResultWithNoCausingExecutionVertexId() { // create a FailureHandlingResult with error Throwable error = new Exception("test error"); - FailureHandlingResult result = FailureHandlingResult.unrecoverable(null, error, false); + long timestamp = System.currentTimeMillis(); + FailureHandlingResult result = + FailureHandlingResult.unrecoverable(null, error, timestamp, false); assertFalse(result.canRestart()); - assertEquals(error, result.getError()); + assertThat(result.getError(), sameInstance(error)); + assertThat(result.getTimestamp(), is(timestamp)); assertFalse(result.getExecutionVertexIdOfFailedTask().isPresent()); try { result.getVerticesToRestart(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java index dd1e5ab3f30bf..e65eba40d1d26 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java @@ -38,11 +38,13 @@ import org.apache.flink.runtime.rest.messages.JobExceptionsInfo; import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory; import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.ExceptionInfo; +import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory.RootExceptionInfo; import org.apache.flink.runtime.rest.messages.JobIDPathParameter; import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters; import org.apache.flink.runtime.rest.messages.job.UpperLimitExceptionParameter; -import org.apache.flink.runtime.scheduler.ExceptionHistoryEntry; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry; +import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -54,9 +56,12 @@ import org.hamcrest.Description; import org.hamcrest.Matcher; import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.hamcrest.collection.IsEmptyIterable; +import org.hamcrest.collection.IsIterableContainingInOrder; import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -110,7 +115,7 @@ public void testOnlyRootCause() throws HandlerRequestException { final ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo( - ExceptionHistoryEntry.fromGlobalFailure(rootCause, rootCauseTimestamp)); + RootExceptionHistoryEntry.fromGlobalFailure(rootCause, rootCauseTimestamp)); final HandlerRequest request = createRequest(executionGraphInfo.getJobId(), 10); final JobExceptionsInfoWithHistory response = @@ -128,16 +133,15 @@ public void testOnlyRootCause() throws HandlerRequestException { @Test public void testWithExceptionHistory() throws HandlerRequestException { - final ExceptionHistoryEntry rootCause = - ExceptionHistoryEntry.fromGlobalFailure( + final RootExceptionHistoryEntry rootCause = + RootExceptionHistoryEntry.fromGlobalFailure( new RuntimeException("exception #0"), System.currentTimeMillis()); - final ExceptionHistoryEntry otherFailure = - new ExceptionHistoryEntry( + final RootExceptionHistoryEntry otherFailure = + RootExceptionHistoryEntry.fromLocalFailure( new RuntimeException("exception #1"), System.currentTimeMillis(), "task name", - ExceptionHistoryEntry.ArchivedTaskManagerLocation.fromTaskManagerLocation( - new LocalTaskManagerLocation())); + new LocalTaskManagerLocation()); final ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo(rootCause, otherFailure); @@ -163,16 +167,15 @@ public void testWithExceptionHistory() throws HandlerRequestException { @Test public void testWithExceptionHistoryWithTruncationThroughParameter() throws HandlerRequestException { - final ExceptionHistoryEntry rootCause = - ExceptionHistoryEntry.fromGlobalFailure( + final RootExceptionHistoryEntry rootCause = + RootExceptionHistoryEntry.fromGlobalFailure( new RuntimeException("exception #0"), System.currentTimeMillis()); - final ExceptionHistoryEntry otherFailure = - new ExceptionHistoryEntry( + final RootExceptionHistoryEntry otherFailure = + RootExceptionHistoryEntry.fromLocalFailure( new RuntimeException("exception #1"), System.currentTimeMillis(), "task name", - ExceptionHistoryEntry.ArchivedTaskManagerLocation.fromTaskManagerLocation( - new LocalTaskManagerLocation())); + new LocalTaskManagerLocation()); final ExecutionGraphInfo executionGraphInfo = createExecutionGraphInfo(rootCause, otherFailure); @@ -217,16 +220,14 @@ public void testArchivedTaskManagerLocationFallbackHandling() { @Test public void testArchivedTaskManagerLocationHandling() { - final ExceptionHistoryEntry.ArchivedTaskManagerLocation taskManagerLocation = - ExceptionHistoryEntry.ArchivedTaskManagerLocation.fromTaskManagerLocation( - new LocalTaskManagerLocation()); + final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); assertThat( JobExceptionsHandler.toString(taskManagerLocation), is( String.format( "%s:%s", taskManagerLocation.getFQDNHostname(), - taskManagerLocation.getPort()))); + taskManagerLocation.dataPort()))); } @Test @@ -261,14 +262,13 @@ private static ExecutionGraphInfo createAccessExecutionGraph(int numTasks) { final Throwable failureCause = new RuntimeException("root cause"); final long failureTimestamp = System.currentTimeMillis(); - final List exceptionHistory = + final List exceptionHistory = Collections.singletonList( - new ExceptionHistoryEntry( + RootExceptionHistoryEntry.fromLocalFailure( failureCause, failureTimestamp, "test task #1", - ExceptionHistoryEntry.ArchivedTaskManagerLocation - .fromTaskManagerLocation(new LocalTaskManagerLocation()))); + new LocalTaskManagerLocation())); return new ExecutionGraphInfo( new ArchivedExecutionGraphBuilder() .setFailureCause(new ErrorInfo(failureCause, failureTimestamp)) @@ -320,10 +320,10 @@ private static ArchivedExecutionJobVertex createArchivedExecutionJobVertex( // -------- exception history related utility methods for creating the input data -------- private static ExecutionGraphInfo createExecutionGraphInfo( - ExceptionHistoryEntry... historyEntries) { + RootExceptionHistoryEntry... historyEntries) { final ArchivedExecutionGraphBuilder executionGraphBuilder = new ArchivedExecutionGraphBuilder(); - final List historyEntryCollection = new ArrayList<>(); + final List historyEntryCollection = new ArrayList<>(); for (int i = 0; i < historyEntries.length; i++) { if (i == 0) { @@ -359,7 +359,33 @@ private static HandlerRequest // -------- factory methods for instantiating new Matchers -------- - private static Matcher historyContainsJobExceptionInfo( + @SafeVarargs + private static Matcher historyContainsJobExceptionInfo( + Throwable expectedFailureCause, + long expectedFailureTimestamp, + String expectedTaskNameWithSubtaskId, + String expectedTaskManagerLocation, + Matcher... concurrentExceptionMatchers) { + return new RootExceptionInfoMatcher( + matchesFailure( + expectedFailureCause, + expectedFailureTimestamp, + expectedTaskNameWithSubtaskId, + expectedTaskManagerLocation), + concurrentExceptionMatchers); + } + + @SafeVarargs + private static Matcher historyContainsGlobalFailure( + Throwable expectedFailureCause, + long expectedFailureTimestamp, + Matcher... concurrentExceptionMatchers) { + return new RootExceptionInfoMatcher( + matchesFailure(expectedFailureCause, expectedFailureTimestamp, null, null), + concurrentExceptionMatchers); + } + + private static Matcher matchesFailure( Throwable expectedFailureCause, long expectedFailureTimestamp, String expectedTaskNameWithSubtaskId, @@ -371,14 +397,52 @@ private static Matcher historyContainsJobExceptionInfo( expectedTaskManagerLocation); } - private static Matcher historyContainsGlobalFailure( - Throwable expectedFailureCause, long expectedFailureTimestamp) { - return historyContainsJobExceptionInfo( - expectedFailureCause, expectedFailureTimestamp, null, null); - } - // -------- Matcher implementations used in this test class -------- + /** Checks the given {@link RootExceptionInfo} instance. */ + private static class RootExceptionInfoMatcher + extends TypeSafeDiagnosingMatcher { + + private final Matcher rootCauseMatcher; + private final Matcher> concurrentExceptionsMatcher; + + @SafeVarargs + private RootExceptionInfoMatcher( + Matcher rootCauseMatcher, + Matcher... concurrentExceptionsMatchers) { + this.rootCauseMatcher = rootCauseMatcher; + this.concurrentExceptionsMatcher = + concurrentExceptionsMatchers.length == 0 + ? IsEmptyIterable.emptyIterable() + : IsIterableContainingInOrder.contains( + Arrays.asList(concurrentExceptionsMatchers)); + } + + @Override + protected boolean matchesSafely( + RootExceptionInfo rootExceptionInfo, Description description) { + boolean match = true; + if (!rootCauseMatcher.matches(rootExceptionInfo)) { + rootCauseMatcher.describeMismatch(rootExceptionInfo, description); + match = false; + } + + if (!concurrentExceptionsMatcher.matches(rootExceptionInfo.getConcurrentExceptions())) { + concurrentExceptionsMatcher.describeMismatch( + rootExceptionInfo.getConcurrentExceptions(), description); + return false; + } + + return match; + } + + @Override + public void describeTo(Description description) { + rootCauseMatcher.describeTo(description); + concurrentExceptionsMatcher.describeTo(description); + } + } + /** Checks the given {@link ExceptionInfo} instance. */ private static class ExceptionInfoMatcher extends TypeSafeDiagnosingMatcher { @@ -387,7 +451,7 @@ private static class ExceptionInfoMatcher extends TypeSafeDiagnosingMatcher actualExceptionHistory = + final Iterable actualExceptionHistory = scheduler.getExceptionHistory(); assertThat(actualExceptionHistory, IsIterableWithSize.iterableWithSize(1)); - final ExceptionHistoryEntry failure = actualExceptionHistory.iterator().next(); + final RootExceptionHistoryEntry failure = actualExceptionHistory.iterator().next(); assertThat( - failure.getException().deserializeError(ClassLoader.getSystemClassLoader()), - is(expectedException)); - assertThat(failure.getTimestamp(), greaterThanOrEqualTo(start)); - assertThat(failure.getTimestamp(), lessThanOrEqualTo(end)); - assertThat(failure.getTaskManagerLocation(), is(nullValue())); - assertThat(failure.getFailingTaskName(), is(nullValue())); + failure, + ExceptionHistoryEntryMatcher.matchesGlobalFailure( + expectedException, + scheduler.getExecutionGraph().getFailureInfo().getTimestamp())); + assertThat(failure.getConcurrentExceptions(), IsEmptyIterable.emptyIterable()); } @Test @@ -1079,10 +1077,6 @@ public void testExceptionHistoryWithRestartableFailure() { final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); final TestingLogicalSlotBuilder logicalSlotBuilder = new TestingLogicalSlotBuilder(); logicalSlotBuilder.setTaskManagerLocation(taskManagerLocation); - final ExceptionHistoryEntry.ArchivedTaskManagerLocation - expectedArchivedTaskManagerLocation = - ExceptionHistoryEntry.ArchivedTaskManagerLocation.fromTaskManagerLocation( - taskManagerLocation); executionSlotAllocatorFactory = new TestExecutionSlotAllocatorFactory(logicalSlotBuilder); @@ -1095,9 +1089,8 @@ public void testExceptionHistoryWithRestartableFailure() { .requestJob() .getArchivedExecutionGraph() .getAllExecutionVertices()); - final String expectedTaskName = taskFailureExecutionVertex.getTaskNameWithSubtaskIndex(); final RuntimeException restartableException = new RuntimeException("restartable exception"); - Range updateStateTriggeringRestartTimeframe = + final long updateStateTriggeringRestartTimestamp = initiateFailure( scheduler, taskFailureExecutionVertex.getCurrentExecutionAttempt().getAttemptId(), @@ -1117,48 +1110,23 @@ public void testExceptionHistoryWithRestartableFailure() { .getCurrentExecutionAttempt() .getAttemptId(); final RuntimeException failingException = new RuntimeException("failing exception"); - final Range updateStateTriggeringJobFailureTimeframe = + final long updateStateTriggeringJobFailureTimestamp = initiateFailure(scheduler, failingAttemptId, failingException); - final Iterable actualExceptionHistory = + final Iterable actualExceptionHistory = scheduler.getExceptionHistory(); - assertThat(actualExceptionHistory, IsIterableWithSize.iterableWithSize(2)); - final Iterator exceptionHistoryIterator = - actualExceptionHistory.iterator(); // assert restarted attempt - final ExceptionHistoryEntry restartableFailure = exceptionHistoryIterator.next(); assertThat( - restartableFailure - .getException() - .deserializeError(ClassLoader.getSystemClassLoader()), - is(restartableException)); - assertThat( - restartableFailure.getTimestamp(), - greaterThanOrEqualTo(updateStateTriggeringRestartTimeframe.lowerEndpoint())); - assertThat( - restartableFailure.getTimestamp(), - lessThanOrEqualTo(updateStateTriggeringRestartTimeframe.upperEndpoint())); - assertThat(restartableFailure.getFailingTaskName(), is(expectedTaskName)); - assertThat( - restartableFailure.getTaskManagerLocation(), - ExceptionHistoryEntryTest.isArchivedTaskManagerLocation( - expectedArchivedTaskManagerLocation)); - - // assert job failure attempt - final ExceptionHistoryEntry globalFailure = exceptionHistoryIterator.next(); - final Throwable actualException = - globalFailure.getException().deserializeError(ClassLoader.getSystemClassLoader()); - assertThat(actualException, instanceOf(JobException.class)); - assertThat(actualException, FlinkMatchers.containsCause(failingException)); - assertThat( - globalFailure.getTimestamp(), - greaterThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.lowerEndpoint())); - assertThat( - globalFailure.getTimestamp(), - lessThanOrEqualTo(updateStateTriggeringJobFailureTimeframe.upperEndpoint())); - assertThat(globalFailure.getFailingTaskName(), is(nullValue())); - assertThat(globalFailure.getTaskManagerLocation(), is(nullValue())); + actualExceptionHistory, + IsIterableContainingInOrder.contains( + ExceptionHistoryEntryMatcher.matchesFailure( + restartableException, + updateStateTriggeringRestartTimestamp, + taskFailureExecutionVertex.getTaskNameWithSubtaskIndex(), + taskFailureExecutionVertex.getCurrentAssignedResourceLocation()), + ExceptionHistoryEntryMatcher.matchesGlobalFailure( + failingException, updateStateTriggeringJobFailureTimestamp))); } @Test @@ -1176,34 +1144,32 @@ public void testExceptionHistoryTruncation() { .getAllExecutionVertices()) .getCurrentExecutionAttempt() .getAttemptId(); - scheduler.updateTaskExecutionState( - new TaskExecutionState( - attemptId0, ExecutionState.FAILED, new RuntimeException("old exception"))); + initiateFailure(scheduler, attemptId0, new RuntimeException("old exception")); taskRestartExecutor.triggerNonPeriodicScheduledTasks(); - final ExecutionAttemptID attemptId1 = + final ArchivedExecutionVertex executionVertex1 = Iterables.getOnlyElement( - scheduler - .requestJob() - .getArchivedExecutionGraph() - .getAllExecutionVertices()) - .getCurrentExecutionAttempt() - .getAttemptId(); + scheduler + .requestJob() + .getArchivedExecutionGraph() + .getAllExecutionVertices()); final RuntimeException exception = new RuntimeException("relevant exception"); - scheduler.updateTaskExecutionState( - new TaskExecutionState(attemptId1, ExecutionState.FAILED, exception)); + final long relevantTimestamp = + initiateFailure( + scheduler, + executionVertex1.getCurrentExecutionAttempt().getAttemptId(), + exception); + taskRestartExecutor.triggerNonPeriodicScheduledTasks(); - final Iterator entryIterator = - scheduler.getExceptionHistory().iterator(); - assertTrue(entryIterator.hasNext()); assertThat( - entryIterator - .next() - .getException() - .deserializeError(ClassLoader.getSystemClassLoader()), - is(exception)); - assertFalse(entryIterator.hasNext()); + scheduler.getExceptionHistory(), + IsIterableContainingInOrder.contains( + ExceptionHistoryEntryMatcher.matchesFailure( + exception, + relevantTimestamp, + executionVertex1.getTaskNameWithSubtaskIndex(), + executionVertex1.getCurrentAssignedResourceLocation()))); } private static TaskExecutionState createFailedTaskExecutionState( @@ -1212,14 +1178,42 @@ private static TaskExecutionState createFailedTaskExecutionState( executionAttemptID, ExecutionState.FAILED, new Exception("Expected failure cause")); } - private static Range initiateFailure( + private static long initiateFailure( DefaultScheduler scheduler, - ExecutionAttemptID executionAttemptID, + ExecutionAttemptID executionAttemptId, Throwable exception) { - long start = System.currentTimeMillis(); scheduler.updateTaskExecutionState( - new TaskExecutionState(executionAttemptID, ExecutionState.FAILED, exception)); - return Range.closed(start, System.currentTimeMillis()); + new TaskExecutionState(executionAttemptId, ExecutionState.FAILED, exception)); + return getFailureTimestamp(scheduler, executionAttemptId); + } + + private static long getFailureTimestamp( + DefaultScheduler scheduler, ExecutionAttemptID executionAttemptId) { + final ExecutionVertex failedExecutionVertex = + StreamSupport.stream( + scheduler + .getExecutionGraph() + .getAllExecutionVertices() + .spliterator(), + false) + .filter( + v -> + executionAttemptId.equals( + v.getCurrentExecutionAttempt().getAttemptId())) + .findFirst() + .orElseThrow( + () -> + new IllegalArgumentException( + "No ExecutionVertex available for the passed ExecutionAttemptId " + + executionAttemptId)); + return failedExecutionVertex + .getFailureInfo() + .map(ErrorInfo::getTimestamp) + .orElseThrow( + () -> + new IllegalStateException( + "No failure was set for ExecutionVertex having the passed execution " + + executionAttemptId)); } private static JobVertex createVertex(String name, int parallelism) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntryTest.java deleted file mode 100644 index 141c5c9970ab4..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExceptionHistoryEntryTest.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.scheduler; - -import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; -import org.apache.flink.runtime.execution.ExecutionState; -import org.apache.flink.runtime.executiongraph.AccessExecution; -import org.apache.flink.runtime.executiongraph.ErrorInfo; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.IOMetrics; -import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; -import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.util.TestLogger; - -import org.hamcrest.Description; -import org.hamcrest.Matcher; -import org.hamcrest.TypeSafeDiagnosingMatcher; -import org.junit.Test; - -import javax.annotation.Nullable; - -import java.util.Objects; -import java.util.Optional; - -import static org.apache.flink.runtime.scheduler.ExceptionHistoryEntry.ArchivedTaskManagerLocation.fromTaskManagerLocation; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.CoreMatchers.nullValue; -import static org.junit.Assert.assertThat; - -/** {@code ExceptionHistoryEntryTest} tests the instantiation of {@link ExceptionHistoryEntry}. */ -public class ExceptionHistoryEntryTest extends TestLogger { - - @Test - public void testFromGlobalFailure() { - final Throwable failureCause = new RuntimeException("failure cause"); - final long timestamp = System.currentTimeMillis(); - - final ExceptionHistoryEntry testInstance = - ExceptionHistoryEntry.fromGlobalFailure(failureCause, timestamp); - - assertThat( - testInstance.getException().deserializeError(ClassLoader.getSystemClassLoader()), - is(failureCause)); - assertThat(testInstance.getTimestamp(), is(timestamp)); - assertThat(testInstance.getFailingTaskName(), is(nullValue())); - assertThat(testInstance.getTaskManagerLocation(), is(nullValue())); - } - - @Test - public void testFromFailedExecution() { - final Throwable failureCause = new RuntimeException("Expected failure"); - final long failureTimestamp = System.currentTimeMillis(); - final String taskNameWithSubTaskIndex = "task name"; - final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); - - final AccessExecution failedExecution = - new TestingExecution( - new ErrorInfo(failureCause, failureTimestamp), taskManagerLocation); - final ExceptionHistoryEntry testInstance = - ExceptionHistoryEntry.fromFailedExecution( - failedExecution, taskNameWithSubTaskIndex); - - assertThat( - testInstance.getException().deserializeError(ClassLoader.getSystemClassLoader()), - is(failureCause)); - assertThat(testInstance.getTimestamp(), is(failureTimestamp)); - assertThat(testInstance.getFailingTaskName(), is(taskNameWithSubTaskIndex)); - assertThat( - testInstance.getTaskManagerLocation(), - isArchivedTaskManagerLocation(fromTaskManagerLocation(taskManagerLocation))); - } - - @Test(expected = IllegalArgumentException.class) - public void testFromFailedExecutionWithoutFailure() { - final AccessExecution executionWithoutFailure = - new TestingExecution(null, new LocalTaskManagerLocation()); - ExceptionHistoryEntry.fromFailedExecution(executionWithoutFailure, "task name"); - } - - /** - * {@code TestingExecution} mocks {@link AccessExecution} to provide the relevant methods for - * testing {@link ExceptionHistoryEntry#fromFailedExecution(AccessExecution, String)}. - */ - private static class TestingExecution implements AccessExecution { - - private final ErrorInfo failureInfo; - private final TaskManagerLocation taskManagerLocation; - - private TestingExecution( - @Nullable ErrorInfo failureInfo, - @Nullable TaskManagerLocation taskManagerLocation) { - this.failureInfo = failureInfo; - this.taskManagerLocation = taskManagerLocation; - } - - @Override - public Optional getFailureInfo() { - return Optional.ofNullable(failureInfo); - } - - @Override - public TaskManagerLocation getAssignedResourceLocation() { - return taskManagerLocation; - } - - @Override - public long getStateTimestamp(ExecutionState state) { - throw new UnsupportedOperationException("Method should not be triggered."); - } - - @Override - public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() { - throw new UnsupportedOperationException("Method should not be triggered."); - } - - @Override - public int getParallelSubtaskIndex() { - throw new UnsupportedOperationException("Method should not be triggered."); - } - - @Override - public IOMetrics getIOMetrics() { - throw new UnsupportedOperationException("Method should not be triggered."); - } - - @Override - public ExecutionAttemptID getAttemptId() { - throw new UnsupportedOperationException("Method should not be triggered."); - } - - @Override - public int getAttemptNumber() { - throw new UnsupportedOperationException("Method should not be triggered."); - } - - @Override - public long[] getStateTimestamps() { - throw new UnsupportedOperationException("Method should not be triggered."); - } - - @Override - public ExecutionState getState() { - throw new UnsupportedOperationException("Method should not be triggered."); - } - } - - public static Matcher - isArchivedTaskManagerLocation( - ExceptionHistoryEntry.ArchivedTaskManagerLocation actualLocation) { - return new ArchivedTaskManagerLocationMatcher(actualLocation); - } - - private static class ArchivedTaskManagerLocationMatcher - extends TypeSafeDiagnosingMatcher { - - private final ExceptionHistoryEntry.ArchivedTaskManagerLocation expectedLocation; - - public ArchivedTaskManagerLocationMatcher( - ExceptionHistoryEntry.ArchivedTaskManagerLocation expectedLocation) { - this.expectedLocation = expectedLocation; - } - - @Override - protected boolean matchesSafely( - ExceptionHistoryEntry.ArchivedTaskManagerLocation actual, Description description) { - if (actual == null) { - return expectedLocation == null; - } - - boolean match = true; - if (!Objects.equals(actual.getAddress(), expectedLocation.getAddress())) { - description.appendText(" address=").appendText(actual.getAddress()); - match = false; - } - - if (!Objects.equals(actual.getFQDNHostname(), expectedLocation.getFQDNHostname())) { - description.appendText(" FQDNHostname=").appendText(actual.getFQDNHostname()); - match = false; - } - - if (!Objects.equals(actual.getHostname(), expectedLocation.getHostname())) { - description.appendText(" hostname=").appendText(actual.getHostname()); - match = false; - } - - if (!Objects.equals(actual.getResourceID(), expectedLocation.getResourceID())) { - description - .appendText(" resourceID=") - .appendText(actual.getResourceID().toString()); - match = false; - } - - if (!Objects.equals(actual.getPort(), expectedLocation.getPort())) { - description.appendText(" port=").appendText(String.valueOf(actual.getPort())); - match = false; - } - - return match; - } - - @Override - public void describeTo(Description description) { - description.appendText(String.valueOf(expectedLocation)); - } - } -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java index b60f0e2f91a2c..f3b7980d28c94 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java @@ -162,7 +162,8 @@ public void testTransitionToFinishedOnFailedExecutionGraph() throws Exception { // transition EG into terminal state, which will notify the Executing state about the // failure (async via the supplied executor) - exec.getExecutionGraph().failJob(new RuntimeException("test failure")); + exec.getExecutionGraph() + .failJob(new RuntimeException("test failure"), System.currentTimeMillis()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java index a97eb141a0858..eb7abf8a5b65f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateTrackingMockExecutionGraph.java @@ -126,7 +126,7 @@ public void cancel() { } @Override - public void failJob(Throwable cause) { + public void failJob(Throwable cause, long timestamp) { transitionToState(JobStatus.FAILING); } @@ -330,7 +330,7 @@ public void incrementRestarts() { } @Override - public void initFailureCause(Throwable t) { + public void initFailureCause(Throwable t, long timestamp) { throw new UnsupportedOperationException(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java index 443137e09454e..9e82ecdbb041d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/StateWithExecutionGraphTest.java @@ -60,7 +60,9 @@ public void testSuspendCanBeCalledWhenExecutionGraphHasReachedGloballyTerminalSt assertThat(archivedExecutionGraph.getState(), is(JobStatus.FAILED))); // transition to FAILED - testingExecutionGraph.failJob(new FlinkException("Transition job to FAILED state")); + testingExecutionGraph.failJob( + new FlinkException("Transition job to FAILED state"), + System.currentTimeMillis()); testingExecutionGraph.completeTerminationFuture(JobStatus.FAILED); assertThat(testingExecutionGraph.getState(), is(JobStatus.FAILED)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ArchivedTaskManagerLocationMatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ArchivedTaskManagerLocationMatcher.java new file mode 100644 index 0000000000000..bc27430129ff0 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ArchivedTaskManagerLocationMatcher.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.exceptionhistory; + +import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry.ArchivedTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; + +import java.util.Objects; + +import static org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry.ArchivedTaskManagerLocation.fromTaskManagerLocation; + +/** + * {@code ArchivedTaskManagerLocationMatcher} can be used to match {@link TaskManagerLocation} with + * {@link ArchivedTaskManagerLocation} instances. + */ +class ArchivedTaskManagerLocationMatcher + extends TypeSafeDiagnosingMatcher { + + private final ArchivedTaskManagerLocation expectedLocation; + + public static Matcher isArchivedTaskManagerLocation( + TaskManagerLocation actualLocation) { + return new ArchivedTaskManagerLocationMatcher(actualLocation); + } + + ArchivedTaskManagerLocationMatcher(TaskManagerLocation expectedLocation) { + this(fromTaskManagerLocation(expectedLocation)); + } + + ArchivedTaskManagerLocationMatcher(ArchivedTaskManagerLocation expectedLocation) { + this.expectedLocation = expectedLocation; + } + + @Override + protected boolean matchesSafely(ArchivedTaskManagerLocation actual, Description description) { + if (actual == null) { + return expectedLocation == null; + } else if (expectedLocation == null) { + return false; + } + + boolean match = true; + if (!Objects.equals(actual.getAddress(), expectedLocation.getAddress())) { + description.appendText(" address=").appendText(actual.getAddress()); + match = false; + } + + if (!Objects.equals(actual.getFQDNHostname(), expectedLocation.getFQDNHostname())) { + description.appendText(" FQDNHostname=").appendText(actual.getFQDNHostname()); + match = false; + } + + if (!Objects.equals(actual.getHostname(), expectedLocation.getHostname())) { + description.appendText(" hostname=").appendText(actual.getHostname()); + match = false; + } + + if (!Objects.equals(actual.getResourceID(), expectedLocation.getResourceID())) { + description.appendText(" resourceID=").appendText(actual.getResourceID().toString()); + match = false; + } + + if (!Objects.equals(actual.getPort(), expectedLocation.getPort())) { + description.appendText(" port=").appendText(String.valueOf(actual.getPort())); + match = false; + } + + return match; + } + + @Override + public void describeTo(Description description) { + description.appendText(String.valueOf(expectedLocation)); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryExtractorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryExtractorTest.java new file mode 100644 index 0000000000000..9fd8f33591a5f --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryExtractorTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.exceptionhistory; + +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; +import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; +import org.apache.flink.util.TestLogger; + +import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; + +import org.hamcrest.collection.IsIterableContainingInOrder; +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; + +import static org.junit.Assert.assertThat; + +/** {@code ExceptionHistoryEntryExtractorTest} tests {@link ExceptionHistoryEntryExtractor}. */ +public class ExceptionHistoryEntryExtractorTest extends TestLogger { + + private final ExceptionHistoryEntryExtractor testInstance = + new ExceptionHistoryEntryExtractor(); + + private ExecutionGraph executionGraph; + + @Before + public void setup() throws JobException, JobExecutionException { + final JobGraph jobGraph = JobGraphTestUtils.singleNoOpJobGraph(); + jobGraph.getVertices().forEach(v -> v.setParallelism(3)); + + executionGraph = + TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).build(); + executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread()); + } + + @Test(expected = IllegalArgumentException.class) + public void testWrongExecutionVertexIdOfFailingVertex() { + testInstance.extractLocalFailure( + executionGraph.getAllVertices(), + new ExecutionVertexID(new JobVertexID(), 0), + Collections.emptyList()); + } + + @Test(expected = IllegalArgumentException.class) + public void testWrongSubtaskIndexOfFailingVertex() { + final ExecutionVertex rootExecutionVertex = extractExecutionVertex(0); + triggerFailure(rootExecutionVertex, new RuntimeException("Expected root cause")); + testInstance.extractLocalFailure( + executionGraph.getAllVertices(), + new ExecutionVertexID( + rootExecutionVertex.getID().getJobVertexId(), Integer.MAX_VALUE), + Collections.emptyList()); + } + + @Test(expected = IllegalArgumentException.class) + public void testRootExecutionVertexIdNotFailed() { + final ExecutionVertex rootExecutionVertex = extractExecutionVertex(0); + testInstance.extractLocalFailure( + executionGraph.getAllVertices(), + rootExecutionVertex.getID(), + Collections.emptyList()); + } + + @Test(expected = IllegalArgumentException.class) + public void testWrongExecutionVertexIdOfConcurrentlyFailedVertex() { + final ExecutionVertex rootExecutionVertex = extractExecutionVertex(0); + triggerFailure(rootExecutionVertex, new RuntimeException("Expected root cause")); + testInstance.extractLocalFailure( + executionGraph.getAllVertices(), + rootExecutionVertex.getID(), + Collections.singleton(new ExecutionVertexID(new JobVertexID(), 0))); + } + + @Test(expected = IllegalArgumentException.class) + public void testWrongSubtaskIndexOfConcurrentlyFailedVertex() { + final ExecutionVertex rootExecutionVertex = extractExecutionVertex(0); + triggerFailure(rootExecutionVertex, new RuntimeException("Expected root cause")); + final ExecutionVertex concurrentlyFailedExecutionVertex = extractExecutionVertex(1); + triggerFailure( + concurrentlyFailedExecutionVertex, + new RuntimeException("Expected concurrent failure")); + + testInstance.extractLocalFailure( + executionGraph.getAllVertices(), + rootExecutionVertex.getID(), + Collections.singleton( + new ExecutionVertexID( + concurrentlyFailedExecutionVertex.getJobvertexId(), + Integer.MAX_VALUE))); + } + + @Test + public void extractLocalFailure() { + final Throwable rootException = new RuntimeException("Expected root failure"); + final ExecutionVertex rootExecutionVertex = extractExecutionVertex(0); + final long rootTimestamp = triggerFailure(rootExecutionVertex, rootException); + + final Throwable concurrentException = new IllegalStateException("Expected other failure"); + final ExecutionVertex concurrentlyFailedExecutionVertex = extractExecutionVertex(1); + final long concurrentExceptionTimestamp = + triggerFailure(concurrentlyFailedExecutionVertex, concurrentException); + + final ExecutionVertex notFailedExecutionVertex = extractExecutionVertex(2); + + final RootExceptionHistoryEntry actualEntry = + testInstance.extractLocalFailure( + executionGraph.getAllVertices(), + rootExecutionVertex.getID(), + Arrays.asList( + concurrentlyFailedExecutionVertex.getID(), + notFailedExecutionVertex.getID())); + + assertThat( + actualEntry, + ExceptionHistoryEntryMatcher.matchesFailure( + rootException, + rootTimestamp, + rootExecutionVertex.getTaskNameWithSubtaskIndex(), + rootExecutionVertex.getCurrentAssignedResourceLocation())); + assertThat( + actualEntry.getConcurrentExceptions(), + IsIterableContainingInOrder.contains( + ExceptionHistoryEntryMatcher.matchesFailure( + concurrentException, + concurrentExceptionTimestamp, + concurrentlyFailedExecutionVertex.getTaskNameWithSubtaskIndex(), + concurrentlyFailedExecutionVertex + .getCurrentAssignedResourceLocation()))); + } + + @Test + public void extractGlobalFailure() { + final Throwable concurrentException0 = + new RuntimeException("Expected concurrent failure #0"); + final ExecutionVertex concurrentlyFailedExecutionVertex0 = extractExecutionVertex(0); + final long concurrentExceptionTimestamp0 = + triggerFailure(concurrentlyFailedExecutionVertex0, concurrentException0); + + final Throwable concurrentException1 = + new IllegalStateException("Expected concurrent failure #1"); + final ExecutionVertex concurrentlyFailedExecutionVertex1 = extractExecutionVertex(1); + final long concurrentExceptionTimestamp1 = + triggerFailure(concurrentlyFailedExecutionVertex1, concurrentException1); + + final Throwable rootCause = new Exception("Expected root failure"); + final long rootTimestamp = System.currentTimeMillis(); + final RootExceptionHistoryEntry actualEntry = + testInstance.extractGlobalFailure( + executionGraph.getAllExecutionVertices(), rootCause, rootTimestamp); + + assertThat( + actualEntry, + ExceptionHistoryEntryMatcher.matchesGlobalFailure(rootCause, rootTimestamp)); + assertThat( + actualEntry.getConcurrentExceptions(), + IsIterableContainingInOrder.contains( + ExceptionHistoryEntryMatcher.matchesFailure( + concurrentException0, + concurrentExceptionTimestamp0, + concurrentlyFailedExecutionVertex0.getTaskNameWithSubtaskIndex(), + concurrentlyFailedExecutionVertex0 + .getCurrentAssignedResourceLocation()), + ExceptionHistoryEntryMatcher.matchesFailure( + concurrentException1, + concurrentExceptionTimestamp1, + concurrentlyFailedExecutionVertex1.getTaskNameWithSubtaskIndex(), + concurrentlyFailedExecutionVertex1 + .getCurrentAssignedResourceLocation()))); + } + + private long triggerFailure(ExecutionVertex executionVertex, Throwable throwable) { + executionGraph.updateState( + new TaskExecutionStateTransition( + new TaskExecutionState( + executionVertex.getCurrentExecutionAttempt().getAttemptId(), + ExecutionState.FAILED, + throwable))); + + return executionVertex + .getFailureInfo() + .orElseThrow( + () -> + new IllegalArgumentException( + "The transition into failed state didn't succeed for ExecutionVertex " + + executionVertex.getID() + + ".")) + .getTimestamp(); + } + + private ExecutionVertex extractExecutionVertex(int pos) { + final ExecutionVertex executionVertex = + Iterables.get(executionGraph.getAllExecutionVertices(), pos); + executionVertex.tryAssignResource( + new TestingLogicalSlotBuilder().createTestingLogicalSlot()); + + return executionVertex; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryMatcher.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryMatcher.java new file mode 100644 index 0000000000000..d7665057d0519 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/ExceptionHistoryEntryMatcher.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.exceptionhistory; + +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.ExceptionUtils; + +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeDiagnosingMatcher; + +/** Matches {@link ExceptionHistoryEntry} instances. */ +public class ExceptionHistoryEntryMatcher extends TypeSafeDiagnosingMatcher { + + public static Matcher matchesGlobalFailure( + Throwable expectedException, long expectedTimestamp) { + return matchesFailure(expectedException, expectedTimestamp, null, null); + } + + public static Matcher matchesFailure( + Throwable expectedException, + long expectedTimestamp, + String expectedTaskName, + TaskManagerLocation expectedTaskManagerLocation) { + return new ExceptionHistoryEntryMatcher( + expectedException, + expectedTimestamp, + expectedTaskName, + expectedTaskManagerLocation); + } + + private final Throwable expectedException; + private final long expectedTimestamp; + private final String expectedTaskName; + private final ArchivedTaskManagerLocationMatcher taskManagerLocationMatcher; + + public ExceptionHistoryEntryMatcher( + Throwable expectedException, + long expectedTimestamp, + String expectedTaskName, + TaskManagerLocation expectedTaskManagerLocation) { + this.expectedException = expectedException; + this.expectedTimestamp = expectedTimestamp; + this.expectedTaskName = expectedTaskName; + this.taskManagerLocationMatcher = + new ArchivedTaskManagerLocationMatcher(expectedTaskManagerLocation); + } + + @Override + protected boolean matchesSafely( + ExceptionHistoryEntry exceptionHistoryEntry, Description description) { + boolean match = true; + if (!exceptionHistoryEntry + .getException() + .deserializeError(ClassLoader.getSystemClassLoader()) + .equals(expectedException)) { + description + .appendText(" actualException=") + .appendText( + ExceptionUtils.stringifyException( + exceptionHistoryEntry + .getException() + .deserializeError(ClassLoader.getSystemClassLoader()))); + match = false; + } + + if (exceptionHistoryEntry.getTimestamp() != expectedTimestamp) { + description + .appendText(" actualTimestamp=") + .appendText(String.valueOf(exceptionHistoryEntry.getTimestamp())); + match = false; + } + + if (exceptionHistoryEntry.getFailingTaskName() == null) { + if (expectedTaskName != null) { + description.appendText(" actualTaskName=null"); + match = false; + } + } else if (exceptionHistoryEntry.getFailingTaskName().equals(expectedTaskName)) { + description + .appendText(" actualTaskName=") + .appendText(exceptionHistoryEntry.getFailingTaskName()); + match = false; + } + + match |= + taskManagerLocationMatcher.matchesSafely( + exceptionHistoryEntry.getTaskManagerLocation(), description); + + return match; + } + + @Override + public void describeTo(Description description) { + description + .appendText("expectedException=") + .appendText(ExceptionUtils.stringifyException(expectedException)) + .appendText(" expectedTimestamp=") + .appendText(String.valueOf(expectedTimestamp)) + .appendText(" expectedTaskName=") + .appendText(expectedTaskName) + .appendText(" expectedTaskManagerLocation="); + taskManagerLocationMatcher.describeTo(description); + } +}