diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index 347a8d1491973..a7d4f43672d01 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -54,6 +54,7 @@ import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.RunnableWithException; import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList; @@ -67,7 +68,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; @@ -79,7 +79,6 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; -import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; @@ -109,6 +108,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * General tests for the YARN resource manager component. @@ -156,7 +156,6 @@ public void teardown() throws Exception { if (env != null) { env.clear(); } - } static class TestingYarnResourceManager extends YarnResourceManager { @@ -329,211 +328,123 @@ void startResourceManager() throws Exception { void stopResourceManager() throws Exception { rpcService.stopService().get(); } - } - - static class TestingContainer extends Container { - private final NodeId nodeId; - private final ContainerId containerId; - private Resource resource; - private Priority priority; - - TestingContainer(String host, int port, int containerId) { - this.nodeId = NodeId.newInstance(host, port); - this.containerId = ContainerId.newInstance( - ApplicationAttemptId.newInstance( - ApplicationId.newInstance(System.currentTimeMillis(), 1), - 1 - ), - containerId - ); - } - - @Override - public ContainerId getId() { - return containerId; - } - - @Override - public void setId(ContainerId containerId) { - - } - @Override - public NodeId getNodeId() { - return nodeId; - } - - @Override - public void setNodeId(NodeId nodeId) { - - } - - @Override - public Resource getResource() { - return resource; - } - - @Override - public void setResource(Resource resource) { - this.resource = resource; - } - - @Override - public Priority getPriority() { - return priority; - } - - @Override - public void setPriority(Priority priority) { - this.priority = priority; - } - - @Override - public Token getContainerToken() { - return null; - } - - @Override - public void setContainerToken(Token token) { - - } - - @Override - public void setNodeHttpAddress(String s) { - - } - - @Override - public String getNodeHttpAddress() { - return null; - } - - @Override public int compareTo(@Nonnull Container other) { - return 0; + /** + * A wrapper function for running test. Deal with setup and teardown logic + * in Context. + * @param testMethod the real test body. + */ + void runTest(RunnableWithException testMethod) throws Exception { + startResourceManager(); + try { + testMethod.run(); + } finally { + stopResourceManager(); + } } } - static class TestingContainerStatus extends ContainerStatus { - private ContainerId containerId; - - TestingContainerStatus() { - - } - - @Override - public ContainerId getContainerId() { - return containerId; - } - - @Override - public void setContainerId(ContainerId containerId) { - this.containerId = containerId; - } - - @Override - public ContainerState getState() { - return ContainerState.COMPLETE; - } - - @Override - public void setState(ContainerState containerState) { - - } + private static Container mockContainer(String host, int port, int containerId) { + Container mockContainer = mock(Container.class); - @Override - public int getExitStatus() { - return -1; - } + NodeId mockNodeId = NodeId.newInstance(host, port); + ContainerId mockContainerId = ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(System.currentTimeMillis(), 1), + 1 + ), + containerId + ); - @Override - public void setExitStatus(int exitStatus) { + when(mockContainer.getId()).thenReturn(mockContainerId); + when(mockContainer.getNodeId()).thenReturn(mockNodeId); + when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1)); + when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED); - } + return mockContainer; + } - @Override - public String getDiagnostics() { - return "Test exit"; - } + private static ContainerStatus mockContainerStatus(ContainerId containerId) { + ContainerStatus mockContainerStatus = mock(ContainerStatus.class); - @Override - public void setDiagnostics(String diagnostics) { + when(mockContainerStatus.getContainerId()).thenReturn(containerId); + when(mockContainerStatus.getState()).thenReturn(ContainerState.COMPLETE); + when(mockContainerStatus.getDiagnostics()).thenReturn("Test exit"); + when(mockContainerStatus.getExitStatus()).thenReturn(-1); - } + return mockContainerStatus; } @Test public void testStopWorker() throws Exception { new Context() {{ - startResourceManager(); - // Request slot from SlotManager. - CompletableFuture registerSlotRequestFuture = resourceManager.runInMainThread(() -> { - rmServices.slotManager.registerSlotRequest( - new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost)); - return null; + runTest(() -> { + // Request slot from SlotManager. + CompletableFuture registerSlotRequestFuture = resourceManager.runInMainThread(() -> { + rmServices.slotManager.registerSlotRequest( + new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost)); + return null; + }); + + // wait for the registerSlotRequest completion + registerSlotRequestFuture.get(); + + // Callback from YARN when container is allocated. + Container testingContainer = mockContainer("container", 1234, 1); + + resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); + verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); + + // Remote task executor registers with YarnResourceManager. + TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class); + rpcService.registerGateway(taskHost, mockTaskExecutorGateway); + + final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); + + final ResourceID taskManagerResourceId = new ResourceID(testingContainer.getId().toString()); + final SlotReport slotReport = new SlotReport( + new SlotStatus( + new SlotID(taskManagerResourceId, 1), + new ResourceProfile(10, 1, 1, 1, 0, Collections.emptyMap()))); + + CompletableFuture numberRegisteredSlotsFuture = rmGateway + .registerTaskExecutor( + taskHost, + taskManagerResourceId, + dataPort, + hardwareDescription, + Time.seconds(10L)) + .thenCompose( + (RegistrationResponse response) -> { + assertThat(response, instanceOf(TaskExecutorRegistrationSuccess.class)); + final TaskExecutorRegistrationSuccess success = (TaskExecutorRegistrationSuccess) response; + return rmGateway.sendSlotReport( + taskManagerResourceId, + success.getRegistrationId(), + slotReport, + Time.seconds(10L)); + }) + .handleAsync( + (Acknowledge ignored, Throwable throwable) -> rmServices.slotManager.getNumberRegisteredSlots(), + resourceManager.getMainThreadExecutorForTesting()); + + final int numberRegisteredSlots = numberRegisteredSlotsFuture.get(); + + assertEquals(1, numberRegisteredSlots); + + // Unregister all task executors and release all containers. + CompletableFuture unregisterAndReleaseFuture = resourceManager.runInMainThread(() -> { + rmServices.slotManager.unregisterTaskManagersAndReleaseResources(); + return null; + }); + + unregisterAndReleaseFuture.get(); + + verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class)); + verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class)); }); - // wait for the registerSlotRequest completion - registerSlotRequestFuture.get(); - - // Callback from YARN when container is allocated. - Container testingContainer = new TestingContainer("container", 1234, 1); - testingContainer.setResource(Resource.newInstance(200, 1)); - testingContainer.setPriority(Priority.UNDEFINED); - - resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); - verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); - verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); - - // Remote task executor registers with YarnResourceManager. - TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class); - rpcService.registerGateway(taskHost, mockTaskExecutorGateway); - - final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); - - final ResourceID taskManagerResourceId = new ResourceID(testingContainer.getId().toString()); - final SlotReport slotReport = new SlotReport( - new SlotStatus( - new SlotID(taskManagerResourceId, 1), - new ResourceProfile(10, 1, 1, 1, 0, Collections.emptyMap()))); - - CompletableFuture numberRegisteredSlotsFuture = rmGateway - .registerTaskExecutor( - taskHost, - taskManagerResourceId, - dataPort, - hardwareDescription, - Time.seconds(10L)) - .thenCompose( - (RegistrationResponse response) -> { - assertThat(response, instanceOf(TaskExecutorRegistrationSuccess.class)); - final TaskExecutorRegistrationSuccess success = (TaskExecutorRegistrationSuccess) response; - return rmGateway.sendSlotReport( - taskManagerResourceId, - success.getRegistrationId(), - slotReport, - Time.seconds(10L)); - }) - .handleAsync( - (Acknowledge ignored, Throwable throwable) -> rmServices.slotManager.getNumberRegisteredSlots(), - resourceManager.getMainThreadExecutorForTesting()); - - final int numberRegisteredSlots = numberRegisteredSlotsFuture.get(); - - assertEquals(1, numberRegisteredSlots); - - // Unregister all task executors and release all containers. - CompletableFuture unregisterAndReleaseFuture = resourceManager.runInMainThread(() -> { - rmServices.slotManager.unregisterTaskManagersAndReleaseResources(); - return null; - }); - - unregisterAndReleaseFuture.get(); - - verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class)); - verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class)); - - stopResourceManager(); - // It's now safe to access the SlotManager state since the ResourceManager has been stopped. assertThat(rmServices.slotManager.getNumberRegisteredSlots(), Matchers.equalTo(0)); assertThat(resourceManager.getNumberOfRegisteredTaskManagers().get(), Matchers.equalTo(0)); @@ -549,12 +460,10 @@ public void testDeleteApplicationFiles() throws Exception { final File applicationDir = folder.newFolder(".flink"); env.put(FLINK_YARN_FILES, applicationDir.getCanonicalPath()); - startResourceManager(); - - resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null); - assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath())); - - stopResourceManager(); + runTest(() -> { + resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null); + assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath())); + }); }}; } @@ -565,37 +474,35 @@ public void testDeleteApplicationFiles() throws Exception { @Test public void testOnContainerCompleted() throws Exception { new Context() {{ - startResourceManager(); - CompletableFuture registerSlotRequestFuture = resourceManager.runInMainThread(() -> { - rmServices.slotManager.registerSlotRequest( - new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost)); - return null; + runTest(() -> { + CompletableFuture registerSlotRequestFuture = resourceManager.runInMainThread(() -> { + rmServices.slotManager.registerSlotRequest( + new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost)); + return null; + }); + + // wait for the registerSlotRequest completion + registerSlotRequestFuture.get(); + + // Callback from YARN when container is allocated. + Container testingContainer = mockContainer("container", 1234, 1); + + resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); + verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); + + // Callback from YARN when container is Completed, pending request can not be fulfilled by pending + // containers, need to request new container. + ContainerStatus testingContainerStatus = mockContainerStatus(testingContainer.getId()); + + resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus)); + verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + + // Callback from YARN when container is Completed happened before global fail, pending request + // slot is already fulfilled by pending containers, no need to request new container. + resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus)); + verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); }); - - // wait for the registerSlotRequest completion - registerSlotRequestFuture.get(); - - // Callback from YARN when container is allocated. - Container testingContainer = new TestingContainer("container", 1234, 1); - testingContainer.setResource(Resource.newInstance(200, 1)); - testingContainer.setPriority(Priority.UNDEFINED); - resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); - verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); - verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); - - // Callback from YARN when container is Completed, pending request can not be fulfilled by pending - // containers, need to request new container. - ContainerStatus testingContainerStatus = new TestingContainerStatus(); - testingContainerStatus.setContainerId(testingContainer.getId()); - resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus)); - verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); - - // Callback from YARN when container is Completed happened before global fail, pending request - // slot is already fulfilled by pending containers, no need to request new container. - resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus)); - verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); - - stopResourceManager(); }}; } }