From 3b93a6f7e438c3f051e2b30d9761fae955e3dc72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=A2=93=E7=AB=8B?= Date: Mon, 6 Aug 2018 16:09:43 +0800 Subject: [PATCH 1/2] [hotfix] loose dependency to mockito --- .../flink/yarn/YarnResourceManagerTest.java | 242 +++++++++++++----- 1 file changed, 184 insertions(+), 58 deletions(-) diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index eb8e96846e914..347a8d1491973 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -67,16 +67,19 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; @@ -106,7 +109,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; /** * General tests for the YARN resource manager component. @@ -115,21 +117,28 @@ public class YarnResourceManagerTest extends TestLogger { private static final Time TIMEOUT = Time.seconds(10L); - private Configuration flinkConfig = new Configuration(); + private Configuration flinkConfig; - private Map env = new HashMap<>(); + private Map env; + + private TestingFatalErrorHandler testingFatalErrorHandler; @Rule public TemporaryFolder folder = new TemporaryFolder(); @Before public void setup() { + testingFatalErrorHandler = new TestingFatalErrorHandler(); + + flinkConfig = new Configuration(); flinkConfig.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100); + File root = folder.getRoot(); File home = new File(root, "home"); boolean created = home.mkdir(); assertTrue(created); + env = new HashMap<>(); env.put(ENV_APP_ID, "foo"); env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath()); env.put(ENV_CLIENT_SHIP_FILES, ""); @@ -139,15 +148,22 @@ public void setup() { } @After - public void teardown() { - env.clear(); + public void teardown() throws Exception { + if (testingFatalErrorHandler != null) { + testingFatalErrorHandler.rethrowError(); + } + + if (env != null) { + env.clear(); + } + } static class TestingYarnResourceManager extends YarnResourceManager { - public AMRMClientAsync mockResourceManagerClient; - public NMClient mockNMClient; + AMRMClientAsync mockResourceManagerClient; + NMClient mockNMClient; - public TestingYarnResourceManager( + TestingYarnResourceManager( RpcService rpcService, String resourceManagerEndpointId, ResourceID resourceId, @@ -181,11 +197,11 @@ public TestingYarnResourceManager( this.mockResourceManagerClient = mockResourceManagerClient; } - public CompletableFuture runInMainThread(Callable callable) { + CompletableFuture runInMainThread(Callable callable) { return callAsync(callable, TIMEOUT); } - public MainThreadExecutor getMainThreadExecutorForTesting() { + MainThreadExecutor getMainThreadExecutorForTesting() { return super.getMainThreadExecutor(); } @@ -193,7 +209,7 @@ public MainThreadExecutor getMainThreadExecutorForTesting() { protected AMRMClientAsync createAndStartResourceManagerClient( YarnConfiguration yarnConfiguration, int yarnHeartbeatIntervalMillis, - @Nullable String webInteraceUrl) { + @Nullable String webInterfaceUrl) { return mockResourceManagerClient; } @@ -213,7 +229,6 @@ class Context { // services final TestingRpcService rpcService; - final TestingFatalErrorHandler fatalErrorHandler; final MockResourceManagerRuntimeServices rmServices; // RM @@ -240,7 +255,6 @@ class Context { */ Context() throws Exception { rpcService = new TestingRpcService(); - fatalErrorHandler = new TestingFatalErrorHandler(); rmServices = new MockResourceManagerRuntimeServices(); // resource manager @@ -258,7 +272,7 @@ class Context { rmServices.metricRegistry, rmServices.jobLeaderIdService, new ClusterInformation("localhost", 1234), - fatalErrorHandler, + testingFatalErrorHandler, null, mockResourceManagerClient, mockNMClient); @@ -269,15 +283,15 @@ class Context { */ class MockResourceManagerRuntimeServices { - public final ScheduledExecutor scheduledExecutor; - public final TestingHighAvailabilityServices highAvailabilityServices; - public final HeartbeatServices heartbeatServices; - public final MetricRegistry metricRegistry; - public final TestingLeaderElectionService rmLeaderElectionService; - public final JobLeaderIdService jobLeaderIdService; - public final SlotManager slotManager; + private final ScheduledExecutor scheduledExecutor; + private final TestingHighAvailabilityServices highAvailabilityServices; + private final HeartbeatServices heartbeatServices; + private final MetricRegistry metricRegistry; + private final TestingLeaderElectionService rmLeaderElectionService; + private final JobLeaderIdService jobLeaderIdService; + private final SlotManager slotManager; - public UUID rmLeaderSessionId; + private UUID rmLeaderSessionId; MockResourceManagerRuntimeServices() throws Exception { scheduledExecutor = mock(ScheduledExecutor.class); @@ -295,7 +309,7 @@ class MockResourceManagerRuntimeServices { Time.minutes(5L)); } - public void grantLeadership() throws Exception { + void grantLeadership() throws Exception { rmLeaderSessionId = UUID.randomUUID(); rmLeaderElectionService.isLeader(rmLeaderSessionId).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); } @@ -304,7 +318,7 @@ public void grantLeadership() throws Exception { /** * Start the resource manager and grant leadership to it. */ - public void startResourceManager() throws Exception { + void startResourceManager() throws Exception { resourceManager.start(); rmServices.grantLeadership(); } @@ -312,11 +326,141 @@ public void startResourceManager() throws Exception { /** * Stop the Akka actor system. */ - public void stopResourceManager() throws Exception { + void stopResourceManager() throws Exception { rpcService.stopService().get(); } } + static class TestingContainer extends Container { + private final NodeId nodeId; + private final ContainerId containerId; + private Resource resource; + private Priority priority; + + TestingContainer(String host, int port, int containerId) { + this.nodeId = NodeId.newInstance(host, port); + this.containerId = ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(System.currentTimeMillis(), 1), + 1 + ), + containerId + ); + } + + @Override + public ContainerId getId() { + return containerId; + } + + @Override + public void setId(ContainerId containerId) { + + } + + @Override + public NodeId getNodeId() { + return nodeId; + } + + @Override + public void setNodeId(NodeId nodeId) { + + } + + @Override + public Resource getResource() { + return resource; + } + + @Override + public void setResource(Resource resource) { + this.resource = resource; + } + + @Override + public Priority getPriority() { + return priority; + } + + @Override + public void setPriority(Priority priority) { + this.priority = priority; + } + + @Override + public Token getContainerToken() { + return null; + } + + @Override + public void setContainerToken(Token token) { + + } + + @Override + public void setNodeHttpAddress(String s) { + + } + + @Override + public String getNodeHttpAddress() { + return null; + } + + @Override public int compareTo(@Nonnull Container other) { + return 0; + } + } + + static class TestingContainerStatus extends ContainerStatus { + private ContainerId containerId; + + TestingContainerStatus() { + + } + + @Override + public ContainerId getContainerId() { + return containerId; + } + + @Override + public void setContainerId(ContainerId containerId) { + this.containerId = containerId; + } + + @Override + public ContainerState getState() { + return ContainerState.COMPLETE; + } + + @Override + public void setState(ContainerState containerState) { + + } + + @Override + public int getExitStatus() { + return -1; + } + + @Override + public void setExitStatus(int exitStatus) { + + } + + @Override + public String getDiagnostics() { + return "Test exit"; + } + + @Override + public void setDiagnostics(String diagnostics) { + + } + } + @Test public void testStopWorker() throws Exception { new Context() {{ @@ -332,16 +476,10 @@ public void testStopWorker() throws Exception { registerSlotRequestFuture.get(); // Callback from YARN when container is allocated. - Container testingContainer = mock(Container.class); - when(testingContainer.getId()).thenReturn( - ContainerId.newInstance( - ApplicationAttemptId.newInstance( - ApplicationId.newInstance(System.currentTimeMillis(), 1), - 1), - 1)); - when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234)); - when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1)); - when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED); + Container testingContainer = new TestingContainer("container", 1234, 1); + testingContainer.setResource(Resource.newInstance(200, 1)); + testingContainer.setPriority(Priority.UNDEFINED); + resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); @@ -397,8 +535,8 @@ public void testStopWorker() throws Exception { stopResourceManager(); // It's now safe to access the SlotManager state since the ResourceManager has been stopped. - assertTrue(rmServices.slotManager.getNumberRegisteredSlots() == 0); - assertTrue(resourceManager.getNumberOfRegisteredTaskManagers().get() == 0); + assertThat(rmServices.slotManager.getNumberRegisteredSlots(), Matchers.equalTo(0)); + assertThat(resourceManager.getNumberOfRegisteredTaskManagers().get(), Matchers.equalTo(0)); }}; } @@ -415,13 +553,14 @@ public void testDeleteApplicationFiles() throws Exception { resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null); assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath())); + + stopResourceManager(); }}; } /** * Tests that YarnResourceManager will not request more containers than needs during * callback from Yarn when container is Completed. - * @throws Exception */ @Test public void testOnContainerCompleted() throws Exception { @@ -436,40 +575,27 @@ public void testOnContainerCompleted() throws Exception { // wait for the registerSlotRequest completion registerSlotRequestFuture.get(); - ContainerId testContainerId = ContainerId.newInstance( - ApplicationAttemptId.newInstance( - ApplicationId.newInstance(System.currentTimeMillis(), 1), - 1), - 1); - // Callback from YARN when container is allocated. - Container testingContainer = mock(Container.class); - when(testingContainer.getId()).thenReturn(testContainerId); - when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234)); - when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1)); - when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED); + Container testingContainer = new TestingContainer("container", 1234, 1); + testingContainer.setResource(Resource.newInstance(200, 1)); + testingContainer.setPriority(Priority.UNDEFINED); resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); // Callback from YARN when container is Completed, pending request can not be fulfilled by pending // containers, need to request new container. - ContainerStatus testingContainerStatus = mock(ContainerStatus.class); - when(testingContainerStatus.getContainerId()).thenReturn(testContainerId); - when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE); - when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit"); - when(testingContainerStatus.getExitStatus()).thenReturn(-1); + ContainerStatus testingContainerStatus = new TestingContainerStatus(); + testingContainerStatus.setContainerId(testingContainer.getId()); resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus)); verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); // Callback from YARN when container is Completed happened before global fail, pending request // slot is already fulfilled by pending containers, no need to request new container. - when(testingContainerStatus.getContainerId()).thenReturn(testContainerId); - when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE); - when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit"); - when(testingContainerStatus.getExitStatus()).thenReturn(-1); resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus)); verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + + stopResourceManager(); }}; } } From b6b90e2d84e29ef6d4070bb91a601be520110419 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=A2=93=E7=AB=8B?= Date: Wed, 8 Aug 2018 11:00:04 +0800 Subject: [PATCH 2/2] [FLINK-10099] [test] Rework YarnResourceManagerTest --- .../flink/yarn/YarnResourceManagerTest.java | 361 +++++++----------- 1 file changed, 134 insertions(+), 227 deletions(-) diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index 347a8d1491973..a7d4f43672d01 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -54,6 +54,7 @@ import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.RunnableWithException; import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList; @@ -67,7 +68,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; @@ -79,7 +79,6 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; -import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; @@ -109,6 +108,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * General tests for the YARN resource manager component. @@ -156,7 +156,6 @@ public void teardown() throws Exception { if (env != null) { env.clear(); } - } static class TestingYarnResourceManager extends YarnResourceManager { @@ -329,211 +328,123 @@ void startResourceManager() throws Exception { void stopResourceManager() throws Exception { rpcService.stopService().get(); } - } - - static class TestingContainer extends Container { - private final NodeId nodeId; - private final ContainerId containerId; - private Resource resource; - private Priority priority; - - TestingContainer(String host, int port, int containerId) { - this.nodeId = NodeId.newInstance(host, port); - this.containerId = ContainerId.newInstance( - ApplicationAttemptId.newInstance( - ApplicationId.newInstance(System.currentTimeMillis(), 1), - 1 - ), - containerId - ); - } - - @Override - public ContainerId getId() { - return containerId; - } - - @Override - public void setId(ContainerId containerId) { - - } - @Override - public NodeId getNodeId() { - return nodeId; - } - - @Override - public void setNodeId(NodeId nodeId) { - - } - - @Override - public Resource getResource() { - return resource; - } - - @Override - public void setResource(Resource resource) { - this.resource = resource; - } - - @Override - public Priority getPriority() { - return priority; - } - - @Override - public void setPriority(Priority priority) { - this.priority = priority; - } - - @Override - public Token getContainerToken() { - return null; - } - - @Override - public void setContainerToken(Token token) { - - } - - @Override - public void setNodeHttpAddress(String s) { - - } - - @Override - public String getNodeHttpAddress() { - return null; - } - - @Override public int compareTo(@Nonnull Container other) { - return 0; + /** + * A wrapper function for running test. Deal with setup and teardown logic + * in Context. + * @param testMethod the real test body. + */ + void runTest(RunnableWithException testMethod) throws Exception { + startResourceManager(); + try { + testMethod.run(); + } finally { + stopResourceManager(); + } } } - static class TestingContainerStatus extends ContainerStatus { - private ContainerId containerId; - - TestingContainerStatus() { - - } - - @Override - public ContainerId getContainerId() { - return containerId; - } - - @Override - public void setContainerId(ContainerId containerId) { - this.containerId = containerId; - } - - @Override - public ContainerState getState() { - return ContainerState.COMPLETE; - } - - @Override - public void setState(ContainerState containerState) { - - } + private static Container mockContainer(String host, int port, int containerId) { + Container mockContainer = mock(Container.class); - @Override - public int getExitStatus() { - return -1; - } + NodeId mockNodeId = NodeId.newInstance(host, port); + ContainerId mockContainerId = ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(System.currentTimeMillis(), 1), + 1 + ), + containerId + ); - @Override - public void setExitStatus(int exitStatus) { + when(mockContainer.getId()).thenReturn(mockContainerId); + when(mockContainer.getNodeId()).thenReturn(mockNodeId); + when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1)); + when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED); - } + return mockContainer; + } - @Override - public String getDiagnostics() { - return "Test exit"; - } + private static ContainerStatus mockContainerStatus(ContainerId containerId) { + ContainerStatus mockContainerStatus = mock(ContainerStatus.class); - @Override - public void setDiagnostics(String diagnostics) { + when(mockContainerStatus.getContainerId()).thenReturn(containerId); + when(mockContainerStatus.getState()).thenReturn(ContainerState.COMPLETE); + when(mockContainerStatus.getDiagnostics()).thenReturn("Test exit"); + when(mockContainerStatus.getExitStatus()).thenReturn(-1); - } + return mockContainerStatus; } @Test public void testStopWorker() throws Exception { new Context() {{ - startResourceManager(); - // Request slot from SlotManager. - CompletableFuture registerSlotRequestFuture = resourceManager.runInMainThread(() -> { - rmServices.slotManager.registerSlotRequest( - new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost)); - return null; + runTest(() -> { + // Request slot from SlotManager. + CompletableFuture registerSlotRequestFuture = resourceManager.runInMainThread(() -> { + rmServices.slotManager.registerSlotRequest( + new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost)); + return null; + }); + + // wait for the registerSlotRequest completion + registerSlotRequestFuture.get(); + + // Callback from YARN when container is allocated. + Container testingContainer = mockContainer("container", 1234, 1); + + resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); + verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); + + // Remote task executor registers with YarnResourceManager. + TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class); + rpcService.registerGateway(taskHost, mockTaskExecutorGateway); + + final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); + + final ResourceID taskManagerResourceId = new ResourceID(testingContainer.getId().toString()); + final SlotReport slotReport = new SlotReport( + new SlotStatus( + new SlotID(taskManagerResourceId, 1), + new ResourceProfile(10, 1, 1, 1, 0, Collections.emptyMap()))); + + CompletableFuture numberRegisteredSlotsFuture = rmGateway + .registerTaskExecutor( + taskHost, + taskManagerResourceId, + dataPort, + hardwareDescription, + Time.seconds(10L)) + .thenCompose( + (RegistrationResponse response) -> { + assertThat(response, instanceOf(TaskExecutorRegistrationSuccess.class)); + final TaskExecutorRegistrationSuccess success = (TaskExecutorRegistrationSuccess) response; + return rmGateway.sendSlotReport( + taskManagerResourceId, + success.getRegistrationId(), + slotReport, + Time.seconds(10L)); + }) + .handleAsync( + (Acknowledge ignored, Throwable throwable) -> rmServices.slotManager.getNumberRegisteredSlots(), + resourceManager.getMainThreadExecutorForTesting()); + + final int numberRegisteredSlots = numberRegisteredSlotsFuture.get(); + + assertEquals(1, numberRegisteredSlots); + + // Unregister all task executors and release all containers. + CompletableFuture unregisterAndReleaseFuture = resourceManager.runInMainThread(() -> { + rmServices.slotManager.unregisterTaskManagersAndReleaseResources(); + return null; + }); + + unregisterAndReleaseFuture.get(); + + verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class)); + verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class)); }); - // wait for the registerSlotRequest completion - registerSlotRequestFuture.get(); - - // Callback from YARN when container is allocated. - Container testingContainer = new TestingContainer("container", 1234, 1); - testingContainer.setResource(Resource.newInstance(200, 1)); - testingContainer.setPriority(Priority.UNDEFINED); - - resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); - verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); - verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); - - // Remote task executor registers with YarnResourceManager. - TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class); - rpcService.registerGateway(taskHost, mockTaskExecutorGateway); - - final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); - - final ResourceID taskManagerResourceId = new ResourceID(testingContainer.getId().toString()); - final SlotReport slotReport = new SlotReport( - new SlotStatus( - new SlotID(taskManagerResourceId, 1), - new ResourceProfile(10, 1, 1, 1, 0, Collections.emptyMap()))); - - CompletableFuture numberRegisteredSlotsFuture = rmGateway - .registerTaskExecutor( - taskHost, - taskManagerResourceId, - dataPort, - hardwareDescription, - Time.seconds(10L)) - .thenCompose( - (RegistrationResponse response) -> { - assertThat(response, instanceOf(TaskExecutorRegistrationSuccess.class)); - final TaskExecutorRegistrationSuccess success = (TaskExecutorRegistrationSuccess) response; - return rmGateway.sendSlotReport( - taskManagerResourceId, - success.getRegistrationId(), - slotReport, - Time.seconds(10L)); - }) - .handleAsync( - (Acknowledge ignored, Throwable throwable) -> rmServices.slotManager.getNumberRegisteredSlots(), - resourceManager.getMainThreadExecutorForTesting()); - - final int numberRegisteredSlots = numberRegisteredSlotsFuture.get(); - - assertEquals(1, numberRegisteredSlots); - - // Unregister all task executors and release all containers. - CompletableFuture unregisterAndReleaseFuture = resourceManager.runInMainThread(() -> { - rmServices.slotManager.unregisterTaskManagersAndReleaseResources(); - return null; - }); - - unregisterAndReleaseFuture.get(); - - verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class)); - verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class)); - - stopResourceManager(); - // It's now safe to access the SlotManager state since the ResourceManager has been stopped. assertThat(rmServices.slotManager.getNumberRegisteredSlots(), Matchers.equalTo(0)); assertThat(resourceManager.getNumberOfRegisteredTaskManagers().get(), Matchers.equalTo(0)); @@ -549,12 +460,10 @@ public void testDeleteApplicationFiles() throws Exception { final File applicationDir = folder.newFolder(".flink"); env.put(FLINK_YARN_FILES, applicationDir.getCanonicalPath()); - startResourceManager(); - - resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null); - assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath())); - - stopResourceManager(); + runTest(() -> { + resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null); + assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath())); + }); }}; } @@ -565,37 +474,35 @@ public void testDeleteApplicationFiles() throws Exception { @Test public void testOnContainerCompleted() throws Exception { new Context() {{ - startResourceManager(); - CompletableFuture registerSlotRequestFuture = resourceManager.runInMainThread(() -> { - rmServices.slotManager.registerSlotRequest( - new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost)); - return null; + runTest(() -> { + CompletableFuture registerSlotRequestFuture = resourceManager.runInMainThread(() -> { + rmServices.slotManager.registerSlotRequest( + new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost)); + return null; + }); + + // wait for the registerSlotRequest completion + registerSlotRequestFuture.get(); + + // Callback from YARN when container is allocated. + Container testingContainer = mockContainer("container", 1234, 1); + + resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); + verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); + + // Callback from YARN when container is Completed, pending request can not be fulfilled by pending + // containers, need to request new container. + ContainerStatus testingContainerStatus = mockContainerStatus(testingContainer.getId()); + + resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus)); + verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + + // Callback from YARN when container is Completed happened before global fail, pending request + // slot is already fulfilled by pending containers, no need to request new container. + resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus)); + verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); }); - - // wait for the registerSlotRequest completion - registerSlotRequestFuture.get(); - - // Callback from YARN when container is allocated. - Container testingContainer = new TestingContainer("container", 1234, 1); - testingContainer.setResource(Resource.newInstance(200, 1)); - testingContainer.setPriority(Priority.UNDEFINED); - resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); - verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); - verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); - - // Callback from YARN when container is Completed, pending request can not be fulfilled by pending - // containers, need to request new container. - ContainerStatus testingContainerStatus = new TestingContainerStatus(); - testingContainerStatus.setContainerId(testingContainer.getId()); - resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus)); - verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); - - // Callback from YARN when container is Completed happened before global fail, pending request - // slot is already fulfilled by pending containers, no need to request new container. - resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus)); - verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); - - stopResourceManager(); }}; } }