Skip to content

Commit

Permalink
[FLINK-32843][JUnit5 Migration] Migrate the jobmaster package of flin…
Browse files Browse the repository at this point in the history
…k-runtime module to JUnit5
  • Loading branch information
RocMarshal committed May 6, 2024
1 parent 80af4d5 commit beb0b16
Show file tree
Hide file tree
Showing 28 changed files with 520 additions and 639 deletions.
Expand Up @@ -28,7 +28,6 @@
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.rules.ExternalResource;

import javax.annotation.Nonnull;

Expand Down Expand Up @@ -72,41 +71,6 @@ public ComponentMainThreadExecutor getMainThreadExecutor() {
return mainThreadExecutor;
}

/** Test resource for convenience. */
public static class Resource extends ExternalResource {

private long shutdownTimeoutMillis;
private TestingComponentMainThreadExecutor componentMainThreadTestExecutor;
private ScheduledExecutorService innerExecutorService;

public Resource() {
this(500L);
}

public Resource(long shutdownTimeoutMillis) {
this.shutdownTimeoutMillis = shutdownTimeoutMillis;
}

@Override
protected void before() {
this.innerExecutorService = Executors.newSingleThreadScheduledExecutor();
this.componentMainThreadTestExecutor =
new TestingComponentMainThreadExecutor(
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
innerExecutorService));
}

@Override
protected void after() {
ExecutorUtils.gracefulShutdown(
shutdownTimeoutMillis, TimeUnit.MILLISECONDS, innerExecutorService);
}

public TestingComponentMainThreadExecutor getComponentMainThreadTestExecutor() {
return componentMainThreadTestExecutor;
}
}

/** Test extension for convenience. */
public static class Extension implements BeforeAllCallback, AfterAllCallback {
private final long shutdownTimeoutMillis;
Expand Down
Expand Up @@ -20,9 +20,8 @@
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.taskexecutor.ExecutionDeploymentReport;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -34,15 +33,13 @@

import static org.apache.flink.runtime.clusterframework.types.ResourceID.generate;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.core.IsCollectionContaining.hasItem;
import static org.junit.Assert.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link DefaultExecutionDeploymentReconciler}. */
public class DefaultExecutionDeploymentReconcilerTest extends TestLogger {
class DefaultExecutionDeploymentReconcilerTest {

@Test
public void testMatchingDeployments() {
void testMatchingDeployments() {
TestingExecutionDeploymentReconciliationHandler handler =
new TestingExecutionDeploymentReconciliationHandler();

Expand All @@ -57,12 +54,12 @@ public void testMatchingDeployments() {
new ExecutionDeploymentReport(Collections.singleton(attemptId)),
Collections.singletonMap(attemptId, ExecutionDeploymentState.DEPLOYED));

assertThat(handler.getMissingExecutions(), empty());
assertThat(handler.getUnknownExecutions(), empty());
assertThat(handler.getMissingExecutions()).isEmpty();
assertThat(handler.getUnknownExecutions()).isEmpty();
}

@Test
public void testMissingDeployments() {
void testMissingDeployments() {
TestingExecutionDeploymentReconciliationHandler handler =
new TestingExecutionDeploymentReconciliationHandler();

Expand All @@ -77,12 +74,12 @@ public void testMissingDeployments() {
new ExecutionDeploymentReport(Collections.emptySet()),
Collections.singletonMap(attemptId, ExecutionDeploymentState.DEPLOYED));

assertThat(handler.getUnknownExecutions(), empty());
assertThat(handler.getMissingExecutions(), hasItem(attemptId));
assertThat(handler.getUnknownExecutions()).isEmpty();
assertThat(handler.getMissingExecutions()).contains(attemptId);
}

@Test
public void testUnknownDeployments() {
void testUnknownDeployments() {
TestingExecutionDeploymentReconciliationHandler handler =
new TestingExecutionDeploymentReconciliationHandler();

Expand All @@ -97,12 +94,12 @@ public void testUnknownDeployments() {
new ExecutionDeploymentReport(Collections.singleton(attemptId)),
Collections.emptyMap());

assertThat(handler.getMissingExecutions(), empty());
assertThat(handler.getUnknownExecutions(), hasItem(attemptId));
assertThat(handler.getMissingExecutions()).isEmpty();
assertThat(handler.getUnknownExecutions()).contains(attemptId);
}

@Test
public void testMissingAndUnknownDeployments() {
void testMissingAndUnknownDeployments() {
TestingExecutionDeploymentReconciliationHandler handler =
new TestingExecutionDeploymentReconciliationHandler();

Expand All @@ -120,12 +117,12 @@ public void testMissingAndUnknownDeployments() {
Stream.of(missingId, matchingId)
.collect(Collectors.toMap(x -> x, x -> ExecutionDeploymentState.DEPLOYED)));

assertThat(handler.getMissingExecutions(), hasItem(missingId));
assertThat(handler.getUnknownExecutions(), hasItem(unknownId));
assertThat(handler.getMissingExecutions()).contains(missingId);
assertThat(handler.getUnknownExecutions()).contains(unknownId);
}

@Test
public void testPendingDeployments() {
void testPendingDeployments() {
TestingExecutionDeploymentReconciliationHandler handler =
new TestingExecutionDeploymentReconciliationHandler();

Expand All @@ -143,8 +140,8 @@ public void testPendingDeployments() {
Stream.of(matchingId, missingId)
.collect(Collectors.toMap(x -> x, x -> ExecutionDeploymentState.PENDING)));

assertThat(handler.getMissingExecutions(), empty());
assertThat(handler.getUnknownExecutions(), hasItem(unknownId));
assertThat(handler.getMissingExecutions()).isEmpty();
assertThat(handler.getUnknownExecutions()).contains(unknownId);
}

private static class TestingExecutionDeploymentReconciliationHandler
Expand Down
Expand Up @@ -19,42 +19,34 @@

import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link DefaultExecutionDeploymentTracker}. */
public class DefaultExecutionDeploymentTrackerTest extends TestLogger {
class DefaultExecutionDeploymentTrackerTest {

@Test
public void testStartTracking() {
void testStartTracking() {
final DefaultExecutionDeploymentTracker tracker = new DefaultExecutionDeploymentTracker();

final ExecutionAttemptID attemptId1 = createExecutionAttemptId();
final ResourceID resourceId1 = ResourceID.generate();
tracker.startTrackingPendingDeploymentOf(attemptId1, resourceId1);

assertThat(
tracker.getExecutionsOn(resourceId1),
hasEntry(attemptId1, ExecutionDeploymentState.PENDING));
assertThat(tracker.getExecutionsOn(resourceId1))
.containsEntry(attemptId1, ExecutionDeploymentState.PENDING);

tracker.completeDeploymentOf(attemptId1);

assertThat(
tracker.getExecutionsOn(resourceId1),
hasEntry(attemptId1, ExecutionDeploymentState.DEPLOYED));
assertThat(tracker.getExecutionsOn(resourceId1))
.containsEntry(attemptId1, ExecutionDeploymentState.DEPLOYED);
}

@Test
public void testStopTrackingCompletedDeployment() {
void testStopTrackingCompletedDeployment() {
final DefaultExecutionDeploymentTracker tracker = new DefaultExecutionDeploymentTracker();

final ExecutionAttemptID attemptId1 = createExecutionAttemptId();
Expand All @@ -65,11 +57,11 @@ public void testStopTrackingCompletedDeployment() {

tracker.stopTrackingDeploymentOf(attemptId1);

assertThat(tracker.getExecutionsOn(resourceId1).entrySet(), empty());
assertThat(tracker.getExecutionsOn(resourceId1).entrySet()).isEmpty();
}

@Test
public void testStopTrackingPendingDeployment() {
void testStopTrackingPendingDeployment() {
final DefaultExecutionDeploymentTracker tracker = new DefaultExecutionDeploymentTracker();

final ExecutionAttemptID attemptId1 = createExecutionAttemptId();
Expand All @@ -78,11 +70,11 @@ public void testStopTrackingPendingDeployment() {

tracker.stopTrackingDeploymentOf(attemptId1);

assertThat(tracker.getExecutionsOn(resourceId1).entrySet(), empty());
assertThat(tracker.getExecutionsOn(resourceId1).entrySet()).isEmpty();
}

@Test
public void testStopTrackingDoesNotAffectOtherIds() {
void testStopTrackingDoesNotAffectOtherIds() {
final DefaultExecutionDeploymentTracker tracker = new DefaultExecutionDeploymentTracker();

final ExecutionAttemptID attemptId1 = createExecutionAttemptId();
Expand All @@ -92,30 +84,28 @@ public void testStopTrackingDoesNotAffectOtherIds() {

tracker.stopTrackingDeploymentOf(createExecutionAttemptId());

assertThat(tracker.getExecutionsOn(resourceId1), hasKey(attemptId1));
assertThat(tracker.getExecutionsOn(resourceId1)).containsKey(attemptId1);
}

@Test
public void testCompleteDeploymentUnknownExecutionDoesNotThrowException() {
void testCompleteDeploymentUnknownExecutionDoesNotThrowException() {
final DefaultExecutionDeploymentTracker tracker = new DefaultExecutionDeploymentTracker();

tracker.completeDeploymentOf(createExecutionAttemptId());
}

@Test
public void testStopTrackingUnknownExecutionDoesNotThrowException() {
void testStopTrackingUnknownExecutionDoesNotThrowException() {
final DefaultExecutionDeploymentTracker tracker = new DefaultExecutionDeploymentTracker();

final ExecutionAttemptID attemptId2 = createExecutionAttemptId();
tracker.stopTrackingDeploymentOf(attemptId2);
}

@Test
public void testGetExecutionsReturnsEmptySetForUnknownHost() {
void testGetExecutionsReturnsEmptySetForUnknownHost() {
final DefaultExecutionDeploymentTracker tracker = new DefaultExecutionDeploymentTracker();

assertThat(
tracker.getExecutionsOn(ResourceID.generate()).entrySet(),
allOf(notNullValue(), empty()));
assertThat(tracker.getExecutionsOn(ResourceID.generate()).entrySet()).isEmpty();
}
}
Expand Up @@ -205,7 +205,7 @@ void testJobMasterGatewayGetsForwarded() {
}

@Test
void testLeaderAddressGetsForwarded() throws Exception {
void testLeaderAddressGetsForwarded() {
final CompletableFuture<JobMasterService> jobMasterServiceFuture =
new CompletableFuture<>();
DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);
Expand Down Expand Up @@ -249,7 +249,7 @@ void testIsNotInitializedAfterClosing() {
}

@Test
void testSuccessOnTerminalState() throws Exception {
void testSuccessOnTerminalState() {
final CompletableFuture<JobMasterService> jobMasterServiceFuture =
new CompletableFuture<>();
DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);
Expand Down
Expand Up @@ -43,10 +43,10 @@

/** Tests for the {@link DefaultSlotPoolServiceSchedulerFactory}. */
@ExtendWith(TestLoggerExtension.class)
public class DefaultSlotPoolServiceSchedulerFactoryTest {
class DefaultSlotPoolServiceSchedulerFactoryTest {

@Test
public void testFallsBackToDefaultSchedulerIfAdaptiveSchedulerInBatchJob() {
void testFallsBackToDefaultSchedulerIfAdaptiveSchedulerInBatchJob() {
final Configuration configuration = new Configuration();
configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive);

Expand All @@ -61,7 +61,7 @@ public void testFallsBackToDefaultSchedulerIfAdaptiveSchedulerInBatchJob() {
}

@Test
public void testAdaptiveSchedulerForReactiveMode() {
void testAdaptiveSchedulerForReactiveMode() {
final Configuration configuration = new Configuration();
configuration.set(JobManagerOptions.SCHEDULER_MODE, SchedulerExecutionMode.REACTIVE);

Expand All @@ -76,7 +76,7 @@ public void testAdaptiveSchedulerForReactiveMode() {
}

@Test
public void testFallBackSchedulerWithAdaptiveSchedulerTestProperty() {
void testFallBackSchedulerWithAdaptiveSchedulerTestProperty() {
String propertyValue = saveAdaptiveSchedulerTestPropertyValue();
System.setProperty("flink.tests.enable-adaptive-scheduler", "true");

Expand All @@ -102,7 +102,7 @@ public void testFallBackSchedulerWithAdaptiveSchedulerTestProperty() {
}

@Test
public void testFallBackSchedulerWithoutAdaptiveSchedulerTestProperty() {
void testFallBackSchedulerWithoutAdaptiveSchedulerTestProperty() {
String propertyValue = saveAdaptiveSchedulerTestPropertyValue();
System.clearProperty("flink.tests.enable-adaptive-scheduler");

Expand Down
Expand Up @@ -26,25 +26,23 @@
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.util.concurrent.CompletableFuture;

import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.assertj.core.api.Assertions.assertThat;

/** Integration tests for job scheduling. */
public class JobExecutionITCase extends TestLogger {
class JobExecutionITCase {

/**
* Tests that tasks with a co-location constraint are scheduled in the same slots. In fact it
* also tests that consumers are scheduled wrt their input location if the co-location
* constraint is deactivated.
*/
@Test
public void testCoLocationConstraintJobExecution() throws Exception {
void testCoLocationConstraintJobExecution() throws Exception {
final int numSlotsPerTaskExecutor = 1;
final int numTaskExecutors = 3;
final int parallelism = numTaskExecutors * numSlotsPerTaskExecutor;
Expand All @@ -66,7 +64,7 @@ public void testCoLocationConstraintJobExecution() throws Exception {
final CompletableFuture<JobResult> jobResultFuture =
miniCluster.requestJobResult(jobGraph.getJobID());

assertThat(jobResultFuture.get().isSuccess(), is(true));
assertThat(jobResultFuture.get().isSuccess()).isTrue();
}
}

Expand Down

0 comments on commit beb0b16

Please sign in to comment.