Skip to content

Commit

Permalink
[review] Migrated to assertj for DefaultJobMasterServiceProcessTest
Browse files Browse the repository at this point in the history
  • Loading branch information
XComp committed Dec 2, 2021
1 parent dce53b3 commit 98d52a6
Showing 1 changed file with 134 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,20 @@

import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;

import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.CompletableFutureAssert;
import org.assertj.core.api.ObjectAssert;
import org.junit.Test;

import java.time.Duration;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Predicate;

import static org.apache.flink.core.testutils.FlinkMatchers.containsCause;
import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
import static org.apache.flink.core.testutils.FlinkMatchers.futureWillCompleteExceptionally;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for the {@link DefaultJobMasterServiceProcess}. */
public class DefaultJobMasterServiceProcessTest extends TestLogger {
Expand All @@ -71,11 +68,103 @@ public void testInitializationFailureCompletesResultFuture() {
final RuntimeException originalCause = new RuntimeException("Init error");
jobMasterServiceFuture.completeExceptionally(originalCause);

assertTrue(serviceProcess.getResultFuture().join().isInitializationFailure());
final Throwable initializationFailure =
serviceProcess.getResultFuture().join().getInitializationFailure();
assertThat(initializationFailure, containsCause(JobInitializationException.class));
assertThat(initializationFailure, containsCause(originalCause));
final JobManagerRunnerResult actualJobManagerResult =
serviceProcess.getResultFuture().join();
assertThat(actualJobManagerResult.isInitializationFailure()).isTrue();
final Throwable initializationFailure = actualJobManagerResult.getInitializationFailure();

assertThat(initializationFailure)
.satisfies(
t -> {
assertThat(
ExceptionUtils.findThrowable(
t, JobInitializationException.class)
.isPresent())
.isTrue();
});
CauseTreeContainsAssert.assertThat(initializationFailure)
.containsCause(JobInitializationException.class);
CauseTreeContainsAssert.assertThat(initializationFailure).containsCause(originalCause);
}

private static class CauseTreeContainsAssert
extends AbstractThrowableAssert<CauseTreeContainsAssert, Throwable> {

public static CauseTreeContainsAssert assertThat(Throwable actualThrowable) {
return new CauseTreeContainsAssert(actualThrowable);
}

private CauseTreeContainsAssert(Throwable actualThrowable) {
super(actualThrowable, CauseTreeContainsAssert.class);
}

public CauseTreeContainsAssert containsCause(Throwable expectedCause) {
return containsCause(
"The expected cause " + expectedCause + " was not found in the cause tree.",
t -> t.equals(expectedCause));
}

public CauseTreeContainsAssert containsCause(
Class<? extends Throwable> expectedCauseClass) {
return containsCause(
"No " + expectedCauseClass.getCanonicalName() + " was found in the cause tree.",
t -> t.getClass().equals(expectedCauseClass));
}

public CauseTreeContainsAssert containsCause(Predicate<Throwable> throwablePredicate) {
return containsCause(
"The passed Throwable didn't match the expectations.", throwablePredicate);
}

public CauseTreeContainsAssert containsCauseWithMessage(String expectedErrorMessage) {
return containsCause(
"The passed Throwable didn't contain any cause with the given error message: "
+ expectedErrorMessage,
t -> t.getMessage().equals(expectedErrorMessage));
}

public CauseTreeContainsAssert containsCause(
String errorMessage, Predicate<Throwable> throwablePredicate) {
isNotNull();

final Optional<Throwable> foundCause =
ExceptionUtils.findThrowable(actual, throwablePredicate);
if (!foundCause.isPresent()) {
failWithMessage(errorMessage, throwablePredicate);
}

return this;
}
}

private static class FlinkCompletableFutureAssert<T> extends CompletableFutureAssert<T> {

public static <T> FlinkCompletableFutureAssert<T> assertThat(CompletableFuture<T> actual) {
return new FlinkCompletableFutureAssert<>(actual);
}

private FlinkCompletableFutureAssert(CompletableFuture<T> actual) {
super(actual);
}

public CauseTreeContainsAssert failsWithThrowable(Duration duration) {
this.failsWithin(duration);
return failsImmediately();
}

public CauseTreeContainsAssert failsImmediately() {
try {
actual.get();
} catch (Throwable t) {
return new CauseTreeContainsAssert(t);
}

throw failure("The CompletableFuture didn't fail.");
}

public ObjectAssert<T> succeedsImmediately() {
return super.succeedsWithin(Duration.ZERO);
}
}

@Test
Expand All @@ -94,7 +183,7 @@ public void testInitializationFailureSetsFailureInfoProperly()
final ErrorInfo executionGraphFailure =
result.getExecutionGraphInfo().getArchivedExecutionGraph().getFailureInfo();

org.assertj.core.api.Assertions.assertThat(executionGraphFailure).isNotNull();
assertThat(executionGraphFailure).isNotNull();
assertInitializationException(
executionGraphFailure.getException(),
originalCause,
Expand Down Expand Up @@ -129,7 +218,7 @@ public void testInitializationFailureSetsExceptionHistoryProperly()
entry.getTimestamp(),
beforeFailureTimestamp,
afterFailureTimestamp);
org.assertj.core.api.Assertions.assertThat(entry.isGlobal()).isTrue();
assertThat(entry.isGlobal()).isTrue();
}

private static void assertInitializationException(
Expand All @@ -141,17 +230,17 @@ private static void assertInitializationException(
final Throwable deserializedException =
actualException.deserializeError(Thread.currentThread().getContextClassLoader());

org.assertj.core.api.Assertions.assertThat(deserializedException)
assertThat(deserializedException)
.isOfAnyClassIn(JobInitializationException.class)
.satisfies(
e ->
org.assertj.core.api.Assertions.assertThat(
assertThat(
ExceptionUtils.findThrowable(
e, t -> t.equals(expectedCause))
.isPresent())
.isTrue());

org.assertj.core.api.Assertions.assertThat(actualTimestamp)
assertThat(actualTimestamp)
.isGreaterThanOrEqualTo(expectedLowerTimestampThreshold)
.isLessThanOrEqualTo(expectedUpperTimestampThreshold);
}
Expand All @@ -164,8 +253,8 @@ public void testCloseAfterInitializationFailure() throws Exception {
jobMasterServiceFuture.completeExceptionally(new RuntimeException("Init error"));

serviceProcess.closeAsync().get();
assertTrue(serviceProcess.getResultFuture().join().isInitializationFailure());
assertThat(serviceProcess.getJobMasterGatewayFuture().isCompletedExceptionally(), is(true));
assertThat(serviceProcess.getResultFuture().join().isInitializationFailure()).isTrue();
assertThat(serviceProcess.getJobMasterGatewayFuture().isCompletedExceptionally()).isTrue();
}

@Test
Expand All @@ -177,10 +266,10 @@ public void testCloseAfterInitializationSuccess() throws Exception {
jobMasterServiceFuture.complete(testingJobMasterService);

serviceProcess.closeAsync().get();
assertThat(testingJobMasterService.isClosed(), is(true));
assertThat(
serviceProcess.getResultFuture(),
futureWillCompleteExceptionally(JobNotFinishedException.class, TIMEOUT));
assertThat(testingJobMasterService.isClosed()).isTrue();
FlinkCompletableFutureAssert.assertThat(serviceProcess.getResultFuture())
.failsWithThrowable(TIMEOUT)
.containsCause(JobNotFinishedException.class);
}

@Test
Expand All @@ -196,13 +285,10 @@ public void testJobMasterTerminationIsHandled() {
RuntimeException testException = new RuntimeException("Fake exception from JobMaster");
jobMasterTerminationFuture.completeExceptionally(testException);

try {
serviceProcess.getResultFuture().get();
fail("Expect failure");
} catch (Throwable t) {
assertThat(t, containsCause(RuntimeException.class));
assertThat(t, containsMessage(testException.getMessage()));
}
FlinkCompletableFutureAssert.assertThat(serviceProcess.getResultFuture())
.failsImmediately()
.containsCause(RuntimeException.class)
.containsCauseWithMessage(testException.getMessage());
}

@Test
Expand All @@ -215,7 +301,7 @@ public void testJobMasterGatewayGetsForwarded() throws Exception {
new TestingJobMasterService("localhost", null, testingGateway);
jobMasterServiceFuture.complete(testingJobMasterService);

assertThat(serviceProcess.getJobMasterGatewayFuture().get(), is(testingGateway));
assertThat(serviceProcess.getJobMasterGatewayFuture()).isCompletedWithValue(testingGateway);
}

@Test
Expand All @@ -228,14 +314,14 @@ public void testLeaderAddressGetsForwarded() throws Exception {
new TestingJobMasterService(testingAddress, null, null);
jobMasterServiceFuture.complete(testingJobMasterService);

assertThat(serviceProcess.getLeaderAddressFuture().get(), is(testingAddress));
assertThat(serviceProcess.getLeaderAddressFuture()).isCompletedWithValue(testingAddress);
}

@Test
public void testIsNotInitialized() {
DefaultJobMasterServiceProcess serviceProcess =
createTestInstance(new CompletableFuture<>());
assertThat(serviceProcess.isInitializedAndRunning(), is(false));
assertThat(serviceProcess.isInitializedAndRunning()).isFalse();
}

@Test
Expand All @@ -246,7 +332,7 @@ public void testIsInitialized() {

jobMasterServiceFuture.complete(new TestingJobMasterService());

assertThat(serviceProcess.isInitializedAndRunning(), is(true));
assertThat(serviceProcess.isInitializedAndRunning()).isTrue();
}

@Test
Expand All @@ -259,7 +345,7 @@ public void testIsNotInitializedAfterClosing() {

serviceProcess.closeAsync();

assertFalse(serviceProcess.isInitializedAndRunning());
assertThat(serviceProcess.isInitializedAndRunning()).isFalse();
}

@Test
Expand All @@ -274,15 +360,17 @@ public void testSuccessOnTerminalState() throws Exception {
serviceProcess.jobReachedGloballyTerminalState(
new ExecutionGraphInfo(archivedExecutionGraph));

assertThat(serviceProcess.getResultFuture().get().isSuccess(), is(true));
assertThat(
serviceProcess
.getResultFuture()
.get()
.getExecutionGraphInfo()
.getArchivedExecutionGraph()
.getState(),
is(JobStatus.FINISHED));
FlinkCompletableFutureAssert.assertThat(serviceProcess.getResultFuture())
.succeedsImmediately()
.extracting(JobManagerRunnerResult::isSuccess)
.isEqualTo(true);

FlinkCompletableFutureAssert.assertThat(serviceProcess.getResultFuture())
.succeedsImmediately()
.extracting(JobManagerRunnerResult::getExecutionGraphInfo)
.extracting(ExecutionGraphInfo::getArchivedExecutionGraph)
.extracting(ArchivedExecutionGraph::getState)
.isEqualTo(JobStatus.FINISHED);
}

private DefaultJobMasterServiceProcess createTestInstance(
Expand Down

0 comments on commit 98d52a6

Please sign in to comment.