From 9a93c2738d0b0ade772e2e9cad9cf91aa6a9e7a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=A2=93=E7=AB=8B?= Date: Mon, 6 Aug 2018 16:09:43 +0800 Subject: [PATCH] [hotfix] loose dependency to mockito --- .../flink/yarn/YarnResourceManagerTest.java | 210 +++++++++++++----- 1 file changed, 160 insertions(+), 50 deletions(-) diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index eb8e96846e9149..5ae251e6bc79f1 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -67,16 +67,19 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; @@ -106,7 +109,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; /** * General tests for the YARN resource manager component. @@ -144,10 +146,10 @@ public void teardown() { } static class TestingYarnResourceManager extends YarnResourceManager { - public AMRMClientAsync mockResourceManagerClient; - public NMClient mockNMClient; + AMRMClientAsync mockResourceManagerClient; + NMClient mockNMClient; - public TestingYarnResourceManager( + TestingYarnResourceManager( RpcService rpcService, String resourceManagerEndpointId, ResourceID resourceId, @@ -181,11 +183,11 @@ public TestingYarnResourceManager( this.mockResourceManagerClient = mockResourceManagerClient; } - public CompletableFuture runInMainThread(Callable callable) { + CompletableFuture runInMainThread(Callable callable) { return callAsync(callable, TIMEOUT); } - public MainThreadExecutor getMainThreadExecutorForTesting() { + MainThreadExecutor getMainThreadExecutorForTesting() { return super.getMainThreadExecutor(); } @@ -269,15 +271,15 @@ class Context { */ class MockResourceManagerRuntimeServices { - public final ScheduledExecutor scheduledExecutor; - public final TestingHighAvailabilityServices highAvailabilityServices; - public final HeartbeatServices heartbeatServices; - public final MetricRegistry metricRegistry; - public final TestingLeaderElectionService rmLeaderElectionService; - public final JobLeaderIdService jobLeaderIdService; - public final SlotManager slotManager; + private final ScheduledExecutor scheduledExecutor; + private final TestingHighAvailabilityServices highAvailabilityServices; + private final HeartbeatServices heartbeatServices; + private final MetricRegistry metricRegistry; + private final TestingLeaderElectionService rmLeaderElectionService; + private final JobLeaderIdService jobLeaderIdService; + private final SlotManager slotManager; - public UUID rmLeaderSessionId; + private UUID rmLeaderSessionId; MockResourceManagerRuntimeServices() throws Exception { scheduledExecutor = mock(ScheduledExecutor.class); @@ -295,7 +297,7 @@ class MockResourceManagerRuntimeServices { Time.minutes(5L)); } - public void grantLeadership() throws Exception { + void grantLeadership() throws Exception { rmLeaderSessionId = UUID.randomUUID(); rmLeaderElectionService.isLeader(rmLeaderSessionId).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); } @@ -304,7 +306,7 @@ public void grantLeadership() throws Exception { /** * Start the resource manager and grant leadership to it. */ - public void startResourceManager() throws Exception { + void startResourceManager() throws Exception { resourceManager.start(); rmServices.grantLeadership(); } @@ -312,11 +314,141 @@ public void startResourceManager() throws Exception { /** * Stop the Akka actor system. */ - public void stopResourceManager() throws Exception { + void stopResourceManager() throws Exception { rpcService.stopService().get(); } } + static class TestingContainer extends Container { + private final NodeId nodeId; + private final ContainerId containerId; + private Resource resource; + private Priority priority; + + TestingContainer(String host, int port, int containerId) { + this.nodeId = NodeId.newInstance(host, port); + this.containerId = ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(System.currentTimeMillis(), 1), + 1 + ), + containerId + ); + } + + @Override + public ContainerId getId() { + return containerId; + } + + @Override + public void setId(ContainerId containerId) { + + } + + @Override + public NodeId getNodeId() { + return nodeId; + } + + @Override + public void setNodeId(NodeId nodeId) { + + } + + @Override + public Resource getResource() { + return resource; + } + + @Override + public void setResource(Resource resource) { + this.resource = resource; + } + + @Override + public Priority getPriority() { + return priority; + } + + @Override + public void setPriority(Priority priority) { + this.priority = priority; + } + + @Override + public Token getContainerToken() { + return null; + } + + @Override + public void setContainerToken(Token token) { + + } + + @Override + public void setNodeHttpAddress(String s) { + + } + + @Override + public String getNodeHttpAddress() { + return null; + } + + @Override public int compareTo(@Nonnull Container other) { + return 0; + } + } + + static class TestingContainerStatus extends ContainerStatus { + private ContainerId containerId; + + TestingContainerStatus() { + + } + + @Override + public ContainerId getContainerId() { + return containerId; + } + + @Override + public void setContainerId(ContainerId containerId) { + this.containerId = containerId; + } + + @Override + public ContainerState getState() { + return ContainerState.COMPLETE; + } + + @Override + public void setState(ContainerState containerState) { + + } + + @Override + public int getExitStatus() { + return -1; + } + + @Override + public void setExitStatus(int exitStatus) { + + } + + @Override + public String getDiagnostics() { + return "Test exit"; + } + + @Override + public void setDiagnostics(String diagnostics) { + + } + } + @Test public void testStopWorker() throws Exception { new Context() {{ @@ -332,16 +464,10 @@ public void testStopWorker() throws Exception { registerSlotRequestFuture.get(); // Callback from YARN when container is allocated. - Container testingContainer = mock(Container.class); - when(testingContainer.getId()).thenReturn( - ContainerId.newInstance( - ApplicationAttemptId.newInstance( - ApplicationId.newInstance(System.currentTimeMillis(), 1), - 1), - 1)); - when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234)); - when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1)); - when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED); + Container testingContainer = new TestingContainer("container", 1234, 1); + testingContainer.setResource(Resource.newInstance(200, 1)); + testingContainer.setPriority(Priority.UNDEFINED); + resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); @@ -397,8 +523,8 @@ public void testStopWorker() throws Exception { stopResourceManager(); // It's now safe to access the SlotManager state since the ResourceManager has been stopped. - assertTrue(rmServices.slotManager.getNumberRegisteredSlots() == 0); - assertTrue(resourceManager.getNumberOfRegisteredTaskManagers().get() == 0); + assertThat(rmServices.slotManager.getNumberRegisteredSlots(), Matchers.equalTo(0)); + assertThat(resourceManager.getNumberOfRegisteredTaskManagers().get(), Matchers.equalTo(0)); }}; } @@ -421,7 +547,6 @@ public void testDeleteApplicationFiles() throws Exception { /** * Tests that YarnResourceManager will not request more containers than needs during * callback from Yarn when container is Completed. - * @throws Exception */ @Test public void testOnContainerCompleted() throws Exception { @@ -436,38 +561,23 @@ public void testOnContainerCompleted() throws Exception { // wait for the registerSlotRequest completion registerSlotRequestFuture.get(); - ContainerId testContainerId = ContainerId.newInstance( - ApplicationAttemptId.newInstance( - ApplicationId.newInstance(System.currentTimeMillis(), 1), - 1), - 1); - // Callback from YARN when container is allocated. - Container testingContainer = mock(Container.class); - when(testingContainer.getId()).thenReturn(testContainerId); - when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234)); - when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1)); - when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED); + Container testingContainer = new TestingContainer("container", 1234, 1); + testingContainer.setResource(Resource.newInstance(200, 1)); + testingContainer.setPriority(Priority.UNDEFINED); resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); // Callback from YARN when container is Completed, pending request can not be fulfilled by pending // containers, need to request new container. - ContainerStatus testingContainerStatus = mock(ContainerStatus.class); - when(testingContainerStatus.getContainerId()).thenReturn(testContainerId); - when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE); - when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit"); - when(testingContainerStatus.getExitStatus()).thenReturn(-1); + ContainerStatus testingContainerStatus = new TestingContainerStatus(); + testingContainerStatus.setContainerId(testingContainer.getId()); resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus)); verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); // Callback from YARN when container is Completed happened before global fail, pending request // slot is already fulfilled by pending containers, no need to request new container. - when(testingContainerStatus.getContainerId()).thenReturn(testContainerId); - when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE); - when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit"); - when(testingContainerStatus.getExitStatus()).thenReturn(-1); resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus)); verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); }};