diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java index 683c1ef290d8a..99517322305ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java @@ -38,7 +38,13 @@ public class ExecutionGraphInfo implements Serializable { private final Iterable exceptionHistory; public ExecutionGraphInfo(ArchivedExecutionGraph executionGraph) { - this(executionGraph, Collections.emptyList()); + this( /* a failure recorded on the archived graph becomes the only exception-history entry; without one the history stays empty */ + executionGraph, + executionGraph.getFailureInfo() != null + ? Collections.singleton( + RootExceptionHistoryEntry.fromGlobalFailure( + executionGraph.getFailureInfo())) + : Collections.emptyList()); } public ExecutionGraphInfo( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java index f84b6fcdc01cf..6c0a24403867e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/exceptionhistory/RootExceptionHistoryEntry.java @@ -19,11 +19,14 @@ package org.apache.flink.runtime.scheduler.exceptionhistory; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.executiongraph.ErrorInfo; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.Preconditions; import javax.annotation.Nullable; +import java.util.Collections; import java.util.Set; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -84,6 +87,24 @@ public static RootExceptionHistoryEntry 
fromGlobalFailure( return createRootExceptionHistoryEntry(cause, timestamp, null, null, executions); } + /** + * Creates a {@code RootExceptionHistoryEntry} based on the passed {@link ErrorInfo}. No + * concurrent failures will be added. + * + * @param errorInfo The failure information that shall be used to initialize the {@code + * RootExceptionHistoryEntry}. + * @return The {@code RootExceptionHistoryEntry} instance. + * @throws NullPointerException if {@code errorInfo} is {@code null} or the passed info does not + * contain a {@code Throwable}. + * @throws IllegalArgumentException if the timestamp carried by {@code errorInfo} is not + * bigger than {@code 0}. + */ + public static RootExceptionHistoryEntry fromGlobalFailure(ErrorInfo errorInfo) { + Preconditions.checkNotNull(errorInfo, "errorInfo"); + return fromGlobalFailure( + errorInfo.getException(), errorInfo.getTimestamp(), Collections.emptyList()); + } + private static RootExceptionHistoryEntry createRootExceptionHistoryEntry( Throwable cause, long timestamp, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java index cfce046018c6e..db7faa9fe2a2f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.java @@ -20,30 +20,30 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; +import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.runtime.client.JobInitializationException; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ErrorInfo; import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFactory; import 
org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway; import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder; import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder; import org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; +import org.apache.flink.util.SerializedThrowable; import org.apache.flink.util.TestLogger; -import org.junit.Test; +import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables; + +import org.junit.jupiter.api.Test; import java.time.Duration; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.function.Function; -import static org.apache.flink.core.testutils.FlinkMatchers.containsCause; -import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage; -import static org.apache.flink.core.testutils.FlinkMatchers.futureWillCompleteExceptionally; -import static org.hamcrest.CoreMatchers.is; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link DefaultJobMasterServiceProcess}. 
*/ public class DefaultJobMasterServiceProcessTest extends TestLogger { @@ -63,11 +63,86 @@ public void testInitializationFailureCompletesResultFuture() { final RuntimeException originalCause = new RuntimeException("Init error"); jobMasterServiceFuture.completeExceptionally(originalCause); - assertTrue(serviceProcess.getResultFuture().join().isInitializationFailure()); - final Throwable initializationFailure = - serviceProcess.getResultFuture().join().getInitializationFailure(); - assertThat(initializationFailure, containsCause(JobInitializationException.class)); - assertThat(initializationFailure, containsCause(originalCause)); + final JobManagerRunnerResult actualJobManagerResult = + serviceProcess.getResultFuture().join(); + assertThat(actualJobManagerResult.isInitializationFailure()).isTrue(); + final Throwable initializationFailure = actualJobManagerResult.getInitializationFailure(); + + assertThat(initializationFailure) + .isInstanceOf(JobInitializationException.class) + .hasCause(originalCause); + } + + @Test + public void testInitializationFailureSetsFailureInfoProperly() + throws ExecutionException, InterruptedException { + final CompletableFuture jobMasterServiceFuture = + new CompletableFuture<>(); + DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture); + final RuntimeException originalCause = new RuntimeException("Expected RuntimeException"); + + long beforeFailureTimestamp = System.currentTimeMillis(); /* taken right before and right after the failure so the recorded timestamp can be range-checked */ + jobMasterServiceFuture.completeExceptionally(originalCause); + long afterFailureTimestamp = System.currentTimeMillis(); + + final JobManagerRunnerResult result = serviceProcess.getResultFuture().get(); + final ErrorInfo executionGraphFailure = + result.getExecutionGraphInfo().getArchivedExecutionGraph().getFailureInfo(); + + assertThat(executionGraphFailure).isNotNull(); + assertInitializationException( + executionGraphFailure.getException(), + originalCause, + executionGraphFailure.getTimestamp(), + 
beforeFailureTimestamp, + afterFailureTimestamp); + } + + @Test + public void testInitializationFailureSetsExceptionHistoryProperly() + throws ExecutionException, InterruptedException { + final CompletableFuture jobMasterServiceFuture = + new CompletableFuture<>(); + DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture); + final RuntimeException originalCause = new RuntimeException("Expected RuntimeException"); + + long beforeFailureTimestamp = System.currentTimeMillis(); + jobMasterServiceFuture.completeExceptionally(originalCause); + long afterFailureTimestamp = System.currentTimeMillis(); + + final RootExceptionHistoryEntry entry = + Iterables.getOnlyElement( + serviceProcess + .getResultFuture() + .get() + .getExecutionGraphInfo() + .getExceptionHistory()); + + assertInitializationException( + entry.getException(), + originalCause, + entry.getTimestamp(), + beforeFailureTimestamp, + afterFailureTimestamp); + assertThat(entry.isGlobal()).isTrue(); + } + + private static void assertInitializationException( + SerializedThrowable actualException, + Throwable expectedCause, + long actualTimestamp, + long expectedLowerTimestampThreshold, + long expectedUpperTimestampThreshold) { /* deserializes the recorded error, then checks its type, its cause and that the timestamp lies within the inclusive bounds */ + final Throwable deserializedException = + actualException.deserializeError(Thread.currentThread().getContextClassLoader()); + + assertThat(deserializedException) + .isInstanceOf(JobInitializationException.class) + .hasCause(expectedCause); + + assertThat(actualTimestamp) + .isGreaterThanOrEqualTo(expectedLowerTimestampThreshold) + .isLessThanOrEqualTo(expectedUpperTimestampThreshold); } @Test @@ -78,8 +153,9 @@ public void testCloseAfterInitializationFailure() throws Exception { jobMasterServiceFuture.completeExceptionally(new RuntimeException("Init error")); serviceProcess.closeAsync().get(); - assertTrue(serviceProcess.getResultFuture().join().isInitializationFailure()); - 
assertThat(serviceProcess.getJobMasterGatewayFuture().isCompletedExceptionally(), is(true)); + assertThat(serviceProcess.getResultFuture()) + .isCompletedWithValueMatching(JobManagerRunnerResult::isInitializationFailure); + assertThat(serviceProcess.getJobMasterGatewayFuture()).isCompletedExceptionally(); } @Test @@ -91,10 +167,12 @@ public void testCloseAfterInitializationSuccess() throws Exception { jobMasterServiceFuture.complete(testingJobMasterService); serviceProcess.closeAsync().get(); - assertThat(testingJobMasterService.isClosed(), is(true)); - assertThat( - serviceProcess.getResultFuture(), - futureWillCompleteExceptionally(JobNotFinishedException.class, TIMEOUT)); + assertThat(testingJobMasterService.isClosed()).isTrue(); + assertThat(serviceProcess.getResultFuture()) + .failsWithin(TIMEOUT) + .withThrowableOfType(ExecutionException.class) + .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE) + .anySatisfy(t -> assertThat(t).isInstanceOf(JobNotFinishedException.class)); } @Test @@ -110,17 +188,15 @@ public void testJobMasterTerminationIsHandled() { RuntimeException testException = new RuntimeException("Fake exception from JobMaster"); jobMasterTerminationFuture.completeExceptionally(testException); - try { - serviceProcess.getResultFuture().get(); - fail("Expect failure"); - } catch (Throwable t) { - assertThat(t, containsCause(RuntimeException.class)); - assertThat(t, containsMessage(testException.getMessage())); - } + assertThat(serviceProcess.getResultFuture()) + .failsWithin(TIMEOUT) + .withThrowableOfType(ExecutionException.class) + .extracting(FlinkAssertions::chainOfCauses, FlinkAssertions.STREAM_THROWABLE) + .anySatisfy(t -> assertThat(t).isEqualTo(testException)); } @Test - public void testJobMasterGatewayGetsForwarded() throws Exception { + public void testJobMasterGatewayGetsForwarded() { final CompletableFuture jobMasterServiceFuture = new CompletableFuture<>(); DefaultJobMasterServiceProcess serviceProcess = 
createTestInstance(jobMasterServiceFuture); @@ -129,7 +205,7 @@ public void testJobMasterGatewayGetsForwarded() throws Exception { new TestingJobMasterService("localhost", null, testingGateway); jobMasterServiceFuture.complete(testingJobMasterService); - assertThat(serviceProcess.getJobMasterGatewayFuture().get(), is(testingGateway)); + assertThat(serviceProcess.getJobMasterGatewayFuture()).isCompletedWithValue(testingGateway); } @Test @@ -142,14 +218,14 @@ public void testLeaderAddressGetsForwarded() throws Exception { new TestingJobMasterService(testingAddress, null, null); jobMasterServiceFuture.complete(testingJobMasterService); - assertThat(serviceProcess.getLeaderAddressFuture().get(), is(testingAddress)); + assertThat(serviceProcess.getLeaderAddressFuture()).isCompletedWithValue(testingAddress); } @Test public void testIsNotInitialized() { DefaultJobMasterServiceProcess serviceProcess = createTestInstance(new CompletableFuture<>()); - assertThat(serviceProcess.isInitializedAndRunning(), is(false)); + assertThat(serviceProcess.isInitializedAndRunning()).isFalse(); } @Test @@ -160,7 +236,7 @@ public void testIsInitialized() { jobMasterServiceFuture.complete(new TestingJobMasterService()); - assertThat(serviceProcess.isInitializedAndRunning(), is(true)); + assertThat(serviceProcess.isInitializedAndRunning()).isTrue(); } @Test @@ -173,7 +249,7 @@ public void testIsNotInitializedAfterClosing() { serviceProcess.closeAsync(); - assertFalse(serviceProcess.isInitializedAndRunning()); + assertThat(serviceProcess.isInitializedAndRunning()).isFalse(); } @Test @@ -188,15 +264,12 @@ public void testSuccessOnTerminalState() throws Exception { serviceProcess.jobReachedGloballyTerminalState( new ExecutionGraphInfo(archivedExecutionGraph)); - assertThat(serviceProcess.getResultFuture().get().isSuccess(), is(true)); - assertThat( - serviceProcess - .getResultFuture() - .get() - .getExecutionGraphInfo() - .getArchivedExecutionGraph() - .getState(), - is(JobStatus.FINISHED)); + 
assertThat(serviceProcess.getResultFuture()) + .isCompletedWithValueMatching(JobManagerRunnerResult::isSuccess) + .isCompletedWithValueMatching( + r -> + r.getExecutionGraphInfo().getArchivedExecutionGraph().getState() + == JobStatus.FINISHED); } private DefaultJobMasterServiceProcess createTestInstance( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfoTest.java new file mode 100644 index 0000000000000..ea5cb9d6a7720 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfoTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** {@code ExecutionGraphInfoTest} tests the proper initialization of {@link ExecutionGraphInfo}. */ +public class ExecutionGraphInfoTest { + + @Test + public void testExecutionGraphHistoryBeingDerivedFromFailedExecutionGraph() { + final ArchivedExecutionGraph executionGraph = + ArchivedExecutionGraph.createFromInitializingJob( + new JobID(), + "test job name", + JobStatus.FAILED, + new RuntimeException("Expected RuntimeException"), + null, + System.currentTimeMillis()); + + final ExecutionGraphInfo executionGraphInfo = new ExecutionGraphInfo(executionGraph); + + final ErrorInfo failureInfo = + executionGraphInfo.getArchivedExecutionGraph().getFailureInfo(); + + final RootExceptionHistoryEntry actualEntry = + Iterables.getOnlyElement(executionGraphInfo.getExceptionHistory()); /* fails unless the history holds exactly one entry */ + + assertThat(failureInfo).isNotNull(); + assertThat(failureInfo.getException()).isEqualTo(actualEntry.getException()); + assertThat(failureInfo.getTimestamp()).isEqualTo(actualEntry.getTimestamp()); + + assertThat(actualEntry.isGlobal()).isTrue(); + assertThat(actualEntry.getFailingTaskName()).isNull(); + assertThat(actualEntry.getTaskManagerLocation()).isNull(); + } +}