From 7cc4c6f3e5e84efc067f2f2179648e31e5defa27 Mon Sep 17 00:00:00 2001 From: Shuyi Chen Date: Sat, 10 Nov 2018 00:42:49 -0800 Subject: [PATCH] [FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success This closes #7078 --- .../test/java/org/apache/flink/yarn/YarnTestBase.java | 2 ++ .../org/apache/flink/yarn/YarnFlinkResourceManager.java | 2 ++ .../java/org/apache/flink/yarn/YarnResourceManager.java | 3 ++- .../apache/flink/yarn/YarnFlinkResourceManagerTest.java | 9 +++++++++ .../org/apache/flink/yarn/YarnResourceManagerTest.java | 4 ++++ 5 files changed, 19 insertions(+), 1 deletion(-) diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java index 1a0520f68fe9a..bcc298073ca8d 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java @@ -167,6 +167,8 @@ public abstract class YarnTestBase extends TestLogger { YARN_CONFIGURATION.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600); YARN_CONFIGURATION.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false); YARN_CONFIGURATION.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster. + YARN_CONFIGURATION.set("yarn.scheduler.capacity.resource-calculator", + "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"); // so we have to change the number of cores for testing. YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); // 20 seconds expiry (to ensure we properly heartbeat with YARN). 
} diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java index 8e686bbbe34c1..3327505e32df5 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java @@ -438,6 +438,8 @@ private void containersAllocated(List<Container> containers) { numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1); LOG.info("Received new container: {} - Remaining pending container requests: {}", container.getId(), numPendingContainerRequests); + resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest( + container.getResource(), null, null, container.getPriority())); // decide whether to return the container, or whether to start a TaskManager if (numRegistered + containersInLaunch.size() < numRequired) { diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index ead0ac015a734..9d9d21bc5b895 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -357,7 +357,8 @@ public void onContainersAllocated(List<Container> containers) { "Received new container: {} - Remaining pending container requests: {}", container.getId(), numPendingContainerRequests); - + resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest( + container.getResource(), null, null, container.getPriority())); if (numPendingContainerRequests > 0) { numPendingContainerRequests--; diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java index 10b2ce97d6fe4..d665df6bc7ce0 100644 --- 
a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java @@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; @@ -69,8 +71,11 @@ import scala.concurrent.duration.FiniteDuration; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; /** @@ -125,6 +130,8 @@ public void testYarnFlinkResourceManagerJobManagerLostLeadership() throws Except 1), i)); when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234)); + when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1)); + when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED); containerList.add(mockContainer); } @@ -233,6 +240,8 @@ public Object answer(InvocationOnMock invocation) throws Throwable { int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft()); + verify(resourceManagerClient, times(numInitialTaskManagers)).removeContainerRequest( + any(AMRMClient.ContainerRequest.class)); assertEquals(numInitialTaskManagers, numberOfRegisteredResources); } finally { if (resourceManager != null) { diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index 
58d297d6b0074..8b583306807b2 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -359,6 +359,8 @@ public void testStopWorker() throws Exception { when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED); resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + verify(mockResourceManagerClient).removeContainerRequest( + any(AMRMClient.ContainerRequest.class)); verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); // Remote task executor registers with YarnResourceManager. @@ -465,6 +467,8 @@ public void testOnContainerCompleted() throws Exception { when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED); resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + verify(mockResourceManagerClient).removeContainerRequest( + any(AMRMClient.ContainerRequest.class)); verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); // Callback from YARN when container is Completed, pending request can not be fulfilled by pending