Skip to content

Commit

Permalink
[hotfix] loose dependency to mockito
Browse files Browse the repository at this point in the history
  • Loading branch information
tisonkun committed Aug 6, 2018
1 parent ce96c40 commit 9a93c27
Showing 1 changed file with 160 additions and 50 deletions.
Expand Up @@ -67,16 +67,19 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.File;
Expand Down Expand Up @@ -106,7 +109,6 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
* General tests for the YARN resource manager component.
Expand Down Expand Up @@ -144,10 +146,10 @@ public void teardown() {
}

static class TestingYarnResourceManager extends YarnResourceManager {
public AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient;
public NMClient mockNMClient;
AMRMClientAsync<AMRMClient.ContainerRequest> mockResourceManagerClient;
NMClient mockNMClient;

public TestingYarnResourceManager(
TestingYarnResourceManager(
RpcService rpcService,
String resourceManagerEndpointId,
ResourceID resourceId,
Expand Down Expand Up @@ -181,11 +183,11 @@ public TestingYarnResourceManager(
this.mockResourceManagerClient = mockResourceManagerClient;
}

public <T> CompletableFuture<T> runInMainThread(Callable<T> callable) {
<T> CompletableFuture<T> runInMainThread(Callable<T> callable) {
return callAsync(callable, TIMEOUT);
}

public MainThreadExecutor getMainThreadExecutorForTesting() {
MainThreadExecutor getMainThreadExecutorForTesting() {
return super.getMainThreadExecutor();
}

Expand Down Expand Up @@ -269,15 +271,15 @@ class Context {
*/
class MockResourceManagerRuntimeServices {

public final ScheduledExecutor scheduledExecutor;
public final TestingHighAvailabilityServices highAvailabilityServices;
public final HeartbeatServices heartbeatServices;
public final MetricRegistry metricRegistry;
public final TestingLeaderElectionService rmLeaderElectionService;
public final JobLeaderIdService jobLeaderIdService;
public final SlotManager slotManager;
private final ScheduledExecutor scheduledExecutor;
private final TestingHighAvailabilityServices highAvailabilityServices;
private final HeartbeatServices heartbeatServices;
private final MetricRegistry metricRegistry;
private final TestingLeaderElectionService rmLeaderElectionService;
private final JobLeaderIdService jobLeaderIdService;
private final SlotManager slotManager;

public UUID rmLeaderSessionId;
private UUID rmLeaderSessionId;

MockResourceManagerRuntimeServices() throws Exception {
scheduledExecutor = mock(ScheduledExecutor.class);
Expand All @@ -295,7 +297,7 @@ class MockResourceManagerRuntimeServices {
Time.minutes(5L));
}

public void grantLeadership() throws Exception {
void grantLeadership() throws Exception {
rmLeaderSessionId = UUID.randomUUID();
rmLeaderElectionService.isLeader(rmLeaderSessionId).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
}
Expand All @@ -304,19 +306,149 @@ public void grantLeadership() throws Exception {
/**
* Start the resource manager and grant leadership to it.
*/
public void startResourceManager() throws Exception {
void startResourceManager() throws Exception {
resourceManager.start();
rmServices.grantLeadership();
}

/**
* Stop the Akka actor system.
*/
public void stopResourceManager() throws Exception {
void stopResourceManager() throws Exception {
rpcService.stopService().get();
}
}

static class TestingContainer extends Container {
private final NodeId nodeId;
private final ContainerId containerId;
private Resource resource;
private Priority priority;

TestingContainer(String host, int port, int containerId) {
this.nodeId = NodeId.newInstance(host, port);
this.containerId = ContainerId.newInstance(
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(System.currentTimeMillis(), 1),
1
),
containerId
);
}

@Override
public ContainerId getId() {
return containerId;
}

@Override
public void setId(ContainerId containerId) {

}

@Override
public NodeId getNodeId() {
return nodeId;
}

@Override
public void setNodeId(NodeId nodeId) {

}

@Override
public Resource getResource() {
return resource;
}

@Override
public void setResource(Resource resource) {
this.resource = resource;
}

@Override
public Priority getPriority() {
return priority;
}

@Override
public void setPriority(Priority priority) {
this.priority = priority;
}

@Override
public Token getContainerToken() {
return null;
}

@Override
public void setContainerToken(Token token) {

}

@Override
public void setNodeHttpAddress(String s) {

}

@Override
public String getNodeHttpAddress() {
return null;
}

@Override public int compareTo(@Nonnull Container other) {
return 0;
}
}

static class TestingContainerStatus extends ContainerStatus {
private ContainerId containerId;

TestingContainerStatus() {

}

@Override
public ContainerId getContainerId() {
return containerId;
}

@Override
public void setContainerId(ContainerId containerId) {
this.containerId = containerId;
}

@Override
public ContainerState getState() {
return ContainerState.COMPLETE;
}

@Override
public void setState(ContainerState containerState) {

}

@Override
public int getExitStatus() {
return -1;
}

@Override
public void setExitStatus(int exitStatus) {

}

@Override
public String getDiagnostics() {
return "Test exit";
}

@Override
public void setDiagnostics(String diagnostics) {

}
}

@Test
public void testStopWorker() throws Exception {
new Context() {{
Expand All @@ -332,16 +464,10 @@ public void testStopWorker() throws Exception {
registerSlotRequestFuture.get();

// Callback from YARN when container is allocated.
Container testingContainer = mock(Container.class);
when(testingContainer.getId()).thenReturn(
ContainerId.newInstance(
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(System.currentTimeMillis(), 1),
1),
1));
when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
Container testingContainer = new TestingContainer("container", 1234, 1);
testingContainer.setResource(Resource.newInstance(200, 1));
testingContainer.setPriority(Priority.UNDEFINED);

resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
Expand Down Expand Up @@ -397,8 +523,8 @@ public void testStopWorker() throws Exception {
stopResourceManager();

// It's now safe to access the SlotManager state since the ResourceManager has been stopped.
assertTrue(rmServices.slotManager.getNumberRegisteredSlots() == 0);
assertTrue(resourceManager.getNumberOfRegisteredTaskManagers().get() == 0);
assertThat(rmServices.slotManager.getNumberRegisteredSlots(), Matchers.equalTo(0));
assertThat(resourceManager.getNumberOfRegisteredTaskManagers().get(), Matchers.equalTo(0));
}};
}

Expand All @@ -421,7 +547,6 @@ public void testDeleteApplicationFiles() throws Exception {
/**
* Tests that YarnResourceManager will not request more containers than needs during
* callback from Yarn when container is Completed.
* @throws Exception
*/
@Test
public void testOnContainerCompleted() throws Exception {
Expand All @@ -436,38 +561,23 @@ public void testOnContainerCompleted() throws Exception {
// wait for the registerSlotRequest completion
registerSlotRequestFuture.get();

ContainerId testContainerId = ContainerId.newInstance(
ApplicationAttemptId.newInstance(
ApplicationId.newInstance(System.currentTimeMillis(), 1),
1),
1);

// Callback from YARN when container is allocated.
Container testingContainer = mock(Container.class);
when(testingContainer.getId()).thenReturn(testContainerId);
when(testingContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
when(testingContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
Container testingContainer = new TestingContainer("container", 1234, 1);
testingContainer.setResource(Resource.newInstance(200, 1));
testingContainer.setPriority(Priority.UNDEFINED);
resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));

// Callback from YARN when container is Completed, pending request can not be fulfilled by pending
// containers, need to request new container.
ContainerStatus testingContainerStatus = mock(ContainerStatus.class);
when(testingContainerStatus.getContainerId()).thenReturn(testContainerId);
when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE);
when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit");
when(testingContainerStatus.getExitStatus()).thenReturn(-1);
ContainerStatus testingContainerStatus = new TestingContainerStatus();
testingContainerStatus.setContainerId(testingContainer.getId());
resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class));

// Callback from YARN when container is Completed happened before global fail, pending request
// slot is already fulfilled by pending containers, no need to request new container.
when(testingContainerStatus.getContainerId()).thenReturn(testContainerId);
when(testingContainerStatus.getState()).thenReturn(ContainerState.COMPLETE);
when(testingContainerStatus.getDiagnostics()).thenReturn("Test exit");
when(testingContainerStatus.getExitStatus()).thenReturn(-1);
resourceManager.onContainersCompleted(ImmutableList.of(testingContainerStatus));
verify(mockResourceManagerClient, times(2)).addContainerRequest(any(AMRMClient.ContainerRequest.class));
}};
Expand Down

0 comments on commit 9a93c27

Please sign in to comment.