diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index eb8e96846e914..a7d4f43672d01 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -54,6 +54,7 @@ import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.RunnableWithException; import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList; @@ -71,6 +72,7 @@ import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -115,21 +117,28 @@ public class YarnResourceManagerTest extends TestLogger { private static final Time TIMEOUT = Time.seconds(10L); - private Configuration flinkConfig = new Configuration(); + private Configuration flinkConfig; - private Map env = new HashMap<>(); + private Map env; + + private TestingFatalErrorHandler testingFatalErrorHandler; @Rule public TemporaryFolder folder = new TemporaryFolder(); @Before public void setup() { + testingFatalErrorHandler = new TestingFatalErrorHandler(); + + flinkConfig = new Configuration(); flinkConfig.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100); + File root = folder.getRoot(); File home = new File(root, "home"); boolean created = home.mkdir(); assertTrue(created); + env = new HashMap<>(); env.put(ENV_APP_ID, "foo"); env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath()); env.put(ENV_CLIENT_SHIP_FILES, ""); @@ -139,15 +148,21 @@ public void setup() { } @After - public void teardown() { - env.clear(); + public void teardown() throws Exception { + if (testingFatalErrorHandler != null) { + testingFatalErrorHandler.rethrowError(); + } + + if (env != null) { + env.clear(); + } } static class TestingYarnResourceManager extends YarnResourceManager { - public AMRMClientAsync mockResourceManagerClient; - public NMClient mockNMClient; + AMRMClientAsync mockResourceManagerClient; + NMClient mockNMClient; - public TestingYarnResourceManager( + TestingYarnResourceManager( RpcService rpcService, String resourceManagerEndpointId, ResourceID resourceId, @@ -181,11 +196,11 @@ public TestingYarnResourceManager( this.mockResourceManagerClient = mockResourceManagerClient; } - public CompletableFuture runInMainThread(Callable callable) { + CompletableFuture runInMainThread(Callable callable) { return callAsync(callable, TIMEOUT); } - public MainThreadExecutor getMainThreadExecutorForTesting() { + MainThreadExecutor getMainThreadExecutorForTesting() { return super.getMainThreadExecutor(); } @@ -193,7 +208,7 @@ public MainThreadExecutor getMainThreadExecutorForTesting() { protected AMRMClientAsync createAndStartResourceManagerClient( YarnConfiguration yarnConfiguration, int yarnHeartbeatIntervalMillis, - @Nullable String webInteraceUrl) { + @Nullable String webInterfaceUrl) { return mockResourceManagerClient; } @@ -213,7 +228,6 @@ class Context { // services final TestingRpcService rpcService; - final TestingFatalErrorHandler fatalErrorHandler; final MockResourceManagerRuntimeServices rmServices; // RM @@ -240,7 +254,6 @@ class Context { */ Context() throws Exception { rpcService = new TestingRpcService(); - fatalErrorHandler = new TestingFatalErrorHandler(); rmServices = new MockResourceManagerRuntimeServices(); // resource manager @@ -258,7 +271,7 @@ class Context { rmServices.metricRegistry, rmServices.jobLeaderIdService, new ClusterInformation("localhost", 1234), - fatalErrorHandler, + testingFatalErrorHandler, null, mockResourceManagerClient, mockNMClient); @@ -269,15 +282,15 @@ class Context { */ class MockResourceManagerRuntimeServices { - public final ScheduledExecutor scheduledExecutor; - public final TestingHighAvailabilityServices highAvailabilityServices; - public final HeartbeatServices heartbeatServices; - public final MetricRegistry metricRegistry; - public final TestingLeaderElectionService rmLeaderElectionService; - public final JobLeaderIdService jobLeaderIdService; - public final SlotManager slotManager; + private final ScheduledExecutor scheduledExecutor; + private final TestingHighAvailabilityServices highAvailabilityServices; + private final HeartbeatServices heartbeatServices; + private final MetricRegistry metricRegistry; + private final TestingLeaderElectionService rmLeaderElectionService; + private final JobLeaderIdService jobLeaderIdService; + private final SlotManager slotManager; - public UUID rmLeaderSessionId; + private UUID rmLeaderSessionId; MockResourceManagerRuntimeServices() throws Exception { scheduledExecutor = mock(ScheduledExecutor.class); @@ -295,7 +308,7 @@ class MockResourceManagerRuntimeServices { Time.minutes(5L)); } - public void grantLeadership() throws Exception { + void grantLeadership() throws Exception { rmLeaderSessionId = UUID.randomUUID(); rmLeaderElectionService.isLeader(rmLeaderSessionId).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); } @@ -304,7 +317,7 @@ public void grantLeadership() throws Exception { /** * Start the resource manager and grant leadership to it. */ - public void startResourceManager() throws Exception { + void startResourceManager() throws Exception { resourceManager.start(); rmServices.grantLeadership(); } @@ -312,93 +325,129 @@ public void startResourceManager() throws Exception { /** * Stop the Akka actor system. */ - public void stopResourceManager() throws Exception { + void stopResourceManager() throws Exception { rpcService.stopService().get(); } - } - @Test - public void testStopWorker() throws Exception { - new Context() {{ + /** + * A wrapper function for running test. Deal with setup and teardown logic + * in Context. + * @param testMethod the real test body. + */ + void runTest(RunnableWithException testMethod) throws Exception { startResourceManager(); - // Request slot from SlotManager. - CompletableFuture registerSlotRequestFuture = resourceManager.runInMainThread(() -> { - rmServices.slotManager.registerSlotRequest( - new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost)); - return null; - }); + try { + testMethod.run(); + } finally { + stopResourceManager(); + } + } + } - // wait for the registerSlotRequest completion - registerSlotRequestFuture.get(); - - // Callback from YARN when container is allocated. - Container testingContainer = mock(Container.class); - when(testingContainer.getId()).thenReturn( - ContainerId.newInstance( - ApplicationAttemptId.newInstance( - ApplicationId.newInstance(System.currentTimeMillis(), 1), - 1), - 1)); - when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234)); - when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1)); - when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED); - resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); - verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); - verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); - - // Remote task executor registers with YarnResourceManager. - TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class); - rpcService.registerGateway(taskHost, mockTaskExecutorGateway); - - final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); - - final ResourceID taskManagerResourceId = new ResourceID(testingContainer.getId().toString()); - final SlotReport slotReport = new SlotReport( - new SlotStatus( - new SlotID(taskManagerResourceId, 1), - new ResourceProfile(10, 1, 1, 1, 0, Collections.emptyMap()))); - - CompletableFuture numberRegisteredSlotsFuture = rmGateway - .registerTaskExecutor( - taskHost, - taskManagerResourceId, - dataPort, - hardwareDescription, - Time.seconds(10L)) - .thenCompose( - (RegistrationResponse response) -> { - assertThat(response, instanceOf(TaskExecutorRegistrationSuccess.class)); - final TaskExecutorRegistrationSuccess success = (TaskExecutorRegistrationSuccess) response; - return rmGateway.sendSlotReport( - taskManagerResourceId, - success.getRegistrationId(), - slotReport, - Time.seconds(10L)); - }) - .handleAsync( - (Acknowledge ignored, Throwable throwable) -> rmServices.slotManager.getNumberRegisteredSlots(), - resourceManager.getMainThreadExecutorForTesting()); - - final int numberRegisteredSlots = numberRegisteredSlotsFuture.get(); - - assertEquals(1, numberRegisteredSlots); - - // Unregister all task executors and release all containers. - CompletableFuture unregisterAndReleaseFuture = resourceManager.runInMainThread(() -> { - rmServices.slotManager.unregisterTaskManagersAndReleaseResources(); - return null; - }); + private static Container mockContainer(String host, int port, int containerId) { + Container mockContainer = mock(Container.class); + + NodeId mockNodeId = NodeId.newInstance(host, port); + ContainerId mockContainerId = ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(System.currentTimeMillis(), 1), + 1 + ), + containerId + ); + + when(mockContainer.getId()).thenReturn(mockContainerId); + when(mockContainer.getNodeId()).thenReturn(mockNodeId); + when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1)); + when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED); + + return mockContainer; + } + + private static ContainerStatus mockContainerStatus(ContainerId containerId) { + ContainerStatus mockContainerStatus = mock(ContainerStatus.class); - unregisterAndReleaseFuture.get(); + when(mockContainerStatus.getContainerId()).thenReturn(containerId); + when(mockContainerStatus.getState()).thenReturn(ContainerState.COMPLETE); + when(mockContainerStatus.getDiagnostics()).thenReturn("Test exit"); + when(mockContainerStatus.getExitStatus()).thenReturn(-1); - verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class)); - verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class)); + return mockContainerStatus; + } - stopResourceManager(); + @Test + public void testStopWorker() throws Exception { + new Context() {{ + runTest(() -> { + // Request slot from SlotManager. + CompletableFuture registerSlotRequestFuture = resourceManager.runInMainThread(() -> { + rmServices.slotManager.registerSlotRequest( + new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost)); + return null; + }); + + // wait for the registerSlotRequest completion + registerSlotRequestFuture.get(); + + // Callback from YARN when container is allocated. + Container testingContainer = mockContainer("container", 1234, 1); + + resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); + verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); + + // Remote task executor registers with YarnResourceManager. + TaskExecutorGateway mockTaskExecutorGateway = mock(TaskExecutorGateway.class); + rpcService.registerGateway(taskHost, mockTaskExecutorGateway); + + final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); + + final ResourceID taskManagerResourceId = new ResourceID(testingContainer.getId().toString()); + final SlotReport slotReport = new SlotReport( + new SlotStatus( + new SlotID(taskManagerResourceId, 1), + new ResourceProfile(10, 1, 1, 1, 0, Collections.emptyMap()))); + + CompletableFuture numberRegisteredSlotsFuture = rmGateway + .registerTaskExecutor( + taskHost, + taskManagerResourceId, + dataPort, + hardwareDescription, + Time.seconds(10L)) + .thenCompose( + (RegistrationResponse response) -> { + assertThat(response, instanceOf(TaskExecutorRegistrationSuccess.class)); + final TaskExecutorRegistrationSuccess success = (TaskExecutorRegistrationSuccess) response; + return rmGateway.sendSlotReport( + taskManagerResourceId, + success.getRegistrationId(), + slotReport, + Time.seconds(10L)); + }) + .handleAsync( + (Acknowledge ignored, Throwable throwable) -> rmServices.slotManager.getNumberRegisteredSlots(), + resourceManager.getMainThreadExecutorForTesting()); + + final int numberRegisteredSlots = numberRegisteredSlotsFuture.get(); + + assertEquals(1, numberRegisteredSlots); + + // Unregister all task executors and release all containers. + CompletableFuture unregisterAndReleaseFuture = resourceManager.runInMainThread(() -> { + rmServices.slotManager.unregisterTaskManagersAndReleaseResources(); + return null; + }); + + unregisterAndReleaseFuture.get(); + + verify(mockNMClient).stopContainer(any(ContainerId.class), any(NodeId.class)); + verify(mockResourceManagerClient).releaseAssignedContainer(any(ContainerId.class)); + }); // It's now safe to access the SlotManager state since the ResourceManager has been stopped. - assertTrue(rmServices.slotManager.getNumberRegisteredSlots() == 0); - assertTrue(resourceManager.getNumberOfRegisteredTaskManagers().get() == 0); + assertThat(rmServices.slotManager.getNumberRegisteredSlots(), Matchers.equalTo(0)); + assertThat(resourceManager.getNumberOfRegisteredTaskManagers().get(), Matchers.equalTo(0)); }}; } @@ -411,65 +460,49 @@ public void testDeleteApplicationFiles() throws Exception { final File applicationDir = folder.newFolder(".flink"); env.put(FLINK_YARN_FILES, applicationDir.getCanonicalPath()); - startResourceManager(); - - resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null); - assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath())); + runTest(() -> { + resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null); + assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath())); + }); }}; } /** * Tests that YarnResourceManager will not request more containers than needs during * callback from Yarn when container is Completed. - * @throws Exception */ @Test public void testOnContainerCompleted() throws Exception { new Context() {{ - startResourceManager(); - CompletableFuture registerSlotRequestFuture = resourceManager.runInMainThread(() -> { - rmServices.slotManager.registerSlotRequest( - new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost)); - return null; + runTest(() -> { + CompletableFuture registerSlotRequestFuture = resourceManager.runInMainThread(() -> { + rmServices.slotManager.registerSlotRequest( + new SlotRequest(new JobID(), new AllocationID(), resourceProfile1, taskHost)); + return null; + }); + + // wait for the registerSlotRequest completion + registerSlotRequestFuture.get(); + + // Callback from YARN when container is allocated. + Container testingContainer = mockContainer("container", 1234, 1); + + resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); + verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); + + // Callback from YARN when container is Completed, pending request can not be fulfilled by pending + // containers, need to request new container. + ContainerStatus testingContainerStatus = mockContainerStatus(testingContainer.getId()); + + resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus)); + verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + + // Callback from YARN when container is Completed happened before global fail, pending request + // slot is already fulfilled by pending containers, no need to request new container. + resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus)); + verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); }); - - // wait for the registerSlotRequest completion - registerSlotRequestFuture.get(); - - ContainerId testContainerId = ContainerId.newInstance( - ApplicationAttemptId.newInstance( - ApplicationId.newInstance(System.currentTimeMillis(), 1), - 1), - 1); - - // Callback from YARN when container is allocated. - Container testingContainer = mock(Container.class); - when(testingContainer.getId()).thenReturn(testContainerId); - when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234)); - when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1)); - when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED); - resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); - verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); - verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); - - // Callback from YARN when container is Completed, pending request can not be fulfilled by pending - // containers, need to request new container. - ContainerStatus testingContainerStatus = mock(ContainerStatus.class); - when(testingContainerStatus.getContainerId()).thenReturn(testContainerId); - when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE); - when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit"); - when(testingContainerStatus.getExitStatus()).thenReturn(-1); - resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus)); - verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); - - // Callback from YARN when container is Completed happened before global fail, pending request - // slot is already fulfilled by pending containers, no need to request new container. - when(testingContainerStatus.getContainerId()).thenReturn(testContainerId); - when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE); - when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit"); - when(testingContainerStatus.getExitStatus()).thenReturn(-1); - resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus)); - verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); }}; } }