Skip to content

Commit

Permalink
[FLINK-10848][YARN] properly remove YARN ContainerRequest upon contai…
Browse files Browse the repository at this point in the history
…ner allocation success

This closes #7078
  • Loading branch information
Shuyi Chen committed Jan 4, 2019
1 parent c8675b8 commit e26d90f
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 1 deletion.
Expand Up @@ -165,6 +165,8 @@ public abstract class YarnTestBase extends TestLogger {
YARN_CONFIGURATION.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
YARN_CONFIGURATION.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
YARN_CONFIGURATION.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster.
YARN_CONFIGURATION.set("yarn.scheduler.capacity.resource-calculator",
"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
// so we have to change the number of cores for testing.
YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); // 20 seconds expiry (to ensure we properly heartbeat with YARN).
}
Expand Down
Expand Up @@ -438,6 +438,8 @@ private void containersAllocated(List<Container> containers) {
numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
LOG.info("Received new container: {} - Remaining pending container requests: {}",
container.getId(), numPendingContainerRequests);
resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest(
container.getResource(), null, null, container.getPriority()));

// decide whether to return the container, or whether to start a TaskManager
if (numRegistered + containersInLaunch.size() < numRequired) {
Expand Down
Expand Up @@ -361,7 +361,8 @@ public void onContainersAllocated(List<Container> containers) {
"Received new container: {} - Remaining pending container requests: {}",
container.getId(),
numPendingContainerRequests);

resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest(
container.getResource(), null, null, container.getPriority()));
if (numPendingContainerRequests > 0) {
numPendingContainerRequests--;

Expand Down
Expand Up @@ -43,6 +43,8 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
Expand All @@ -69,8 +71,11 @@
import scala.concurrent.duration.FiniteDuration;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
Expand Down Expand Up @@ -125,6 +130,8 @@ public void testYarnFlinkResourceManagerJobManagerLostLeadership() throws Except
1),
i));
when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED);
containerList.add(mockContainer);
}

Expand Down Expand Up @@ -233,6 +240,8 @@ public Object answer(InvocationOnMock invocation) throws Throwable {

int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());

verify(resourceManagerClient, times(numInitialTaskManagers)).removeContainerRequest(
any(AMRMClient.ContainerRequest.class));
assertEquals(numInitialTaskManagers, numberOfRegisteredResources);
} finally {
if (resourceManager != null) {
Expand Down
Expand Up @@ -401,6 +401,8 @@ public void testStopWorker() throws Exception {

resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
verify(mockResourceManagerClient).removeContainerRequest(
any(AMRMClient.ContainerRequest.class));
verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));

// Remote task executor registers with YarnResourceManager.
Expand Down Expand Up @@ -496,6 +498,8 @@ public void testOnContainerCompleted() throws Exception {

resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
verify(mockResourceManagerClient).removeContainerRequest(
any(AMRMClient.ContainerRequest.class));
verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));

// Callback from YARN when container is Completed, pending request can not be fulfilled by pending
Expand Down

0 comments on commit e26d90f

Please sign in to comment.