From 0e849c5361c4a30507c2f46b1f66c7e19bf2879c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E6=A2=93=E7=AB=8B?= Date: Mon, 6 Aug 2018 16:09:43 +0800 Subject: [PATCH] [hotfix] loose dependency to mockito --- .../flink/yarn/YarnResourceManagerTest.java | 244 +++++++++++++----- 1 file changed, 186 insertions(+), 58 deletions(-) diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index eb8e96846e9149..495ea08efbbe3f 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -67,16 +67,21 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.hamcrest.Matchers; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.io.File; @@ -106,7 +111,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; /** * General tests for the YARN resource manager component. @@ -115,21 +119,28 @@ public class YarnResourceManagerTest extends TestLogger { private static final Time TIMEOUT = Time.seconds(10L); - private Configuration flinkConfig = new Configuration(); + private Configuration flinkConfig; - private Map env = new HashMap<>(); + private Map env; + + private TestingFatalErrorHandler testingFatalErrorHandler; @Rule public TemporaryFolder folder = new TemporaryFolder(); @Before public void setup() { + testingFatalErrorHandler = new TestingFatalErrorHandler(); + + flinkConfig = new Configuration(); flinkConfig.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, 100); + File root = folder.getRoot(); File home = new File(root, "home"); boolean created = home.mkdir(); assertTrue(created); + env = new HashMap<>(); env.put(ENV_APP_ID, "foo"); env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath()); env.put(ENV_CLIENT_SHIP_FILES, ""); @@ -139,15 +150,22 @@ public void setup() { } @After - public void teardown() { - env.clear(); + public void teardown() throws Exception { + if (testingFatalErrorHandler != null) { + testingFatalErrorHandler.rethrowError(); + } + + if (env != null) { + env.clear(); + } + } static class TestingYarnResourceManager extends YarnResourceManager { - public AMRMClientAsync mockResourceManagerClient; - public NMClient mockNMClient; + AMRMClientAsync mockResourceManagerClient; + NMClient mockNMClient; - public TestingYarnResourceManager( + TestingYarnResourceManager( RpcService rpcService, String resourceManagerEndpointId, ResourceID resourceId, @@ -181,11 +199,11 @@ public TestingYarnResourceManager( this.mockResourceManagerClient = mockResourceManagerClient; } - public CompletableFuture runInMainThread(Callable callable) { + CompletableFuture runInMainThread(Callable callable) { return callAsync(callable, TIMEOUT); } - public MainThreadExecutor getMainThreadExecutorForTesting() { + MainThreadExecutor getMainThreadExecutorForTesting() { return super.getMainThreadExecutor(); } @@ -193,7 +211,7 @@ public MainThreadExecutor getMainThreadExecutorForTesting() { protected AMRMClientAsync createAndStartResourceManagerClient( YarnConfiguration yarnConfiguration, int yarnHeartbeatIntervalMillis, - @Nullable String webInteraceUrl) { + @Nullable String webInterfaceUrl) { return mockResourceManagerClient; } @@ -213,7 +231,6 @@ class Context { // services final TestingRpcService rpcService; - final TestingFatalErrorHandler fatalErrorHandler; final MockResourceManagerRuntimeServices rmServices; // RM @@ -240,7 +257,6 @@ class Context { */ Context() throws Exception { rpcService = new TestingRpcService(); - fatalErrorHandler = new TestingFatalErrorHandler(); rmServices = new MockResourceManagerRuntimeServices(); // resource manager @@ -258,7 +274,7 @@ class Context { rmServices.metricRegistry, rmServices.jobLeaderIdService, new ClusterInformation("localhost", 1234), - fatalErrorHandler, + testingFatalErrorHandler, null, mockResourceManagerClient, mockNMClient); @@ -269,15 +285,15 @@ class Context { */ class MockResourceManagerRuntimeServices { - public final ScheduledExecutor scheduledExecutor; - public final TestingHighAvailabilityServices highAvailabilityServices; - public final HeartbeatServices heartbeatServices; - public final MetricRegistry metricRegistry; - public final TestingLeaderElectionService rmLeaderElectionService; - public final JobLeaderIdService jobLeaderIdService; - public final SlotManager slotManager; + private final ScheduledExecutor scheduledExecutor; + private final TestingHighAvailabilityServices highAvailabilityServices; + private final HeartbeatServices heartbeatServices; + private final MetricRegistry metricRegistry; + private final TestingLeaderElectionService rmLeaderElectionService; + private final JobLeaderIdService jobLeaderIdService; + private final SlotManager slotManager; - public UUID rmLeaderSessionId; + private UUID rmLeaderSessionId; MockResourceManagerRuntimeServices() throws Exception { scheduledExecutor = mock(ScheduledExecutor.class); @@ -295,7 +311,7 @@ class MockResourceManagerRuntimeServices { Time.minutes(5L)); } - public void grantLeadership() throws Exception { + void grantLeadership() throws Exception { rmLeaderSessionId = UUID.randomUUID(); rmLeaderElectionService.isLeader(rmLeaderSessionId).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); } @@ -304,7 +320,7 @@ public void grantLeadership() throws Exception { /** * Start the resource manager and grant leadership to it. */ - public void startResourceManager() throws Exception { + void startResourceManager() throws Exception { resourceManager.start(); rmServices.grantLeadership(); } @@ -312,11 +328,141 @@ public void startResourceManager() throws Exception { /** * Stop the Akka actor system. */ - public void stopResourceManager() throws Exception { + void stopResourceManager() throws Exception { rpcService.stopService().get(); } } + static class TestingContainer extends Container { + private final NodeId nodeId; + private final ContainerId containerId; + private Resource resource; + private Priority priority; + + TestingContainer(String host, int port, int containerId) { + this.nodeId = NodeId.newInstance(host, port); + this.containerId = ContainerId.newInstance( + ApplicationAttemptId.newInstance( + ApplicationId.newInstance(System.currentTimeMillis(), 1), + 1 + ), + containerId + ); + } + + @Override + public ContainerId getId() { + return containerId; + } + + @Override + public void setId(ContainerId containerId) { + + } + + @Override + public NodeId getNodeId() { + return nodeId; + } + + @Override + public void setNodeId(NodeId nodeId) { + + } + + @Override + public Resource getResource() { + return resource; + } + + @Override + public void setResource(Resource resource) { + this.resource = resource; + } + + @Override + public Priority getPriority() { + return priority; + } + + @Override + public void setPriority(Priority priority) { + this.priority = priority; + } + + @Override + public Token getContainerToken() { + return null; + } + + @Override + public void setContainerToken(Token token) { + + } + + @Override + public void setNodeHttpAddress(String s) { + + } + + @Override + public String getNodeHttpAddress() { + return null; + } + + @Override public int compareTo(@Nonnull Container other) { + return 0; + } + } + + static class TestingContainerStatus extends ContainerStatus { + private ContainerId containerId; + + TestingContainerStatus() { + + } + + @Override + public ContainerId getContainerId() { + return containerId; + } + + @Override + public void setContainerId(ContainerId containerId) { + this.containerId = containerId; + } + + @Override + public ContainerState getState() { + return ContainerState.COMPLETE; + } + + @Override + public void setState(ContainerState containerState) { + + } + + @Override + public int getExitStatus() { + return -1; + } + + @Override + public void setExitStatus(int exitStatus) { + + } + + @Override + public String getDiagnostics() { + return "Test exit"; + } + + @Override + public void setDiagnostics(String diagnostics) { + + } + } + @Test public void testStopWorker() throws Exception { new Context() {{ @@ -332,16 +478,10 @@ public void testStopWorker() throws Exception { registerSlotRequestFuture.get(); // Callback from YARN when container is allocated. - Container testingContainer = mock(Container.class); - when(testingContainer.getId()).thenReturn( - ContainerId.newInstance( - ApplicationAttemptId.newInstance( - ApplicationId.newInstance(System.currentTimeMillis(), 1), - 1), - 1)); - when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234)); - when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1)); - when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED); + Container testingContainer = new TestingContainer("container", 1234, 1); + testingContainer.setResource(Resource.newInstance(200, 1)); + testingContainer.setPriority(Priority.UNDEFINED); + resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); @@ -397,8 +537,8 @@ public void testStopWorker() throws Exception { stopResourceManager(); // It's now safe to access the SlotManager state since the ResourceManager has been stopped. - assertTrue(rmServices.slotManager.getNumberRegisteredSlots() == 0); - assertTrue(resourceManager.getNumberOfRegisteredTaskManagers().get() == 0); + assertThat(rmServices.slotManager.getNumberRegisteredSlots(), Matchers.equalTo(0)); + assertThat(resourceManager.getNumberOfRegisteredTaskManagers().get(), Matchers.equalTo(0)); }}; } @@ -415,13 +555,14 @@ public void testDeleteApplicationFiles() throws Exception { resourceManager.deregisterApplication(ApplicationStatus.SUCCEEDED, null); assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath())); + + stopResourceManager(); }}; } /** * Tests that YarnResourceManager will not request more containers than needs during * callback from Yarn when container is Completed. - * @throws Exception */ @Test public void testOnContainerCompleted() throws Exception { @@ -436,40 +577,27 @@ public void testOnContainerCompleted() throws Exception { // wait for the registerSlotRequest completion registerSlotRequestFuture.get(); - ContainerId testContainerId = ContainerId.newInstance( - ApplicationAttemptId.newInstance( - ApplicationId.newInstance(System.currentTimeMillis(), 1), - 1), - 1); - // Callback from YARN when container is allocated. - Container testingContainer = mock(Container.class); - when(testingContainer.getId()).thenReturn(testContainerId); - when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234)); - when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1)); - when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED); + Container testingContainer = new TestingContainer("container", 1234, 1); + testingContainer.setResource(Resource.newInstance(200, 1)); + testingContainer.setPriority(Priority.UNDEFINED); resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); // Callback from YARN when container is Completed, pending request can not be fulfilled by pending // containers, need to request new container. - ContainerStatus testingContainerStatus = mock(ContainerStatus.class); - when(testingContainerStatus.getContainerId()).thenReturn(testContainerId); - when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE); - when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit"); - when(testingContainerStatus.getExitStatus()).thenReturn(-1); + ContainerStatus testingContainerStatus = new TestingContainerStatus(); + testingContainerStatus.setContainerId(testingContainer.getId()); resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus)); verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); // Callback from YARN when container is Completed happened before global fail, pending request // slot is already fulfilled by pending containers, no need to request new container. - when(testingContainerStatus.getContainerId()).thenReturn(testContainerId); - when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE); - when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit"); - when(testingContainerStatus.getExitStatus()).thenReturn(-1); resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus)); verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + + stopResourceManager(); }}; } }