diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index bbd44188738b4..2aa45b8edfcc1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -588,7 +588,8 @@ Set getPreemptionContainerIds() { } } - boolean canContainerBePreempted(RMContainer container) { + boolean canContainerBePreempted(RMContainer container, + Resource alreadyConsideringForPreemption) { if (!isPreemptable()) { return false; } @@ -610,6 +611,15 @@ boolean canContainerBePreempted(RMContainer container) { // Check if the app's allocation will be over its fairshare even // after preempting this container + Resource usageAfterPreemption = getUsageAfterPreemptingContainer( + container.getAllocatedResource(), + alreadyConsideringForPreemption); + + return !isUsageBelowShare(usageAfterPreemption, getFairShare()); + } + + private Resource getUsageAfterPreemptingContainer(Resource containerResources, + Resource alreadyConsideringForPreemption) { Resource usageAfterPreemption = Resources.clone(getResourceUsage()); // Subtract resources of containers already queued for preemption @@ -617,10 +627,13 @@ boolean canContainerBePreempted(RMContainer container) { Resources.subtractFrom(usageAfterPreemption, resourcesToBePreempted); } - // Subtract this container's allocation to compute usage after preemption - Resources.subtractFrom( - usageAfterPreemption, container.getAllocatedResource()); - return !isUsageBelowShare(usageAfterPreemption, getFairShare()); + // Subtract resources of this container and other containers of this app + // that the FSPreemptionThread is already considering for preemption. + Resources.subtractFrom(usageAfterPreemption, containerResources); + Resources.subtractFrom(usageAfterPreemption, + alreadyConsideringForPreemption); + + return usageAfterPreemption; } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java index b3e59c53daea4..47e580d8d145c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSPreemptionThread.java @@ -19,7 +19,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -29,7 +29,9 @@ import org.apache.hadoop.yarn.util.resource.Resources; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.locks.Lock; @@ -130,10 +132,21 @@ private List identifyContainersToPreempt( } } // End of iteration through nodes for one RR - if (bestContainers != null && bestContainers.containers.size() > 0) { - containersToPreempt.addAll(bestContainers.containers); - // Reserve the containers for the starved app - trackPreemptionsAgainstNode(bestContainers.containers, starvedApp); + if (bestContainers != null) { + List containers = bestContainers.getAllContainers(); + if (containers.size() > 0) { + containersToPreempt.addAll(containers); + // Reserve the containers for the starved app + trackPreemptionsAgainstNode(containers, starvedApp); + // Warn application about containers to be killed + for (RMContainer container : containers) { + FSAppAttempt app = scheduler.getSchedulerApp( + container.getApplicationAttemptId()); + LOG.info("Preempting container " + container + + " from queue " + app.getQueueName()); + app.trackContainerForPreemption(container); + } + } } } } // End of iteration over RRs @@ -170,10 +183,12 @@ private PreemptableContainers identifyContainersToPreemptOnNode( for (RMContainer container : containersToCheck) { FSAppAttempt app = scheduler.getSchedulerApp(container.getApplicationAttemptId()); + ApplicationId appId = app.getApplicationId(); - if (app.canContainerBePreempted(container)) { + if (app.canContainerBePreempted(container, + preemptableContainers.getResourcesToPreemptForApp(appId))) { // Flag container for preemption - if (!preemptableContainers.addContainer(container)) { + if (!preemptableContainers.addContainer(container, appId)) { return null; } @@ -199,15 +214,6 @@ private void trackPreemptionsAgainstNode(List containers, } private void preemptContainers(List containers) { - // Warn application about containers to be killed - for (RMContainer container : containers) { - ApplicationAttemptId appAttemptId = container.getApplicationAttemptId(); - FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId); - LOG.info("Preempting container " + container + - " from queue " + app.getQueueName()); - app.trackContainerForPreemption(container); - } - // Schedule timer task to kill containers preemptionTimer.schedule( new PreemptContainersTask(containers), warnTimeBeforeKill); @@ -237,14 +243,14 @@ public void run() { * A class to track preemptable containers. */ private static class PreemptableContainers { - List containers; + Map> containersByApp; int numAMContainers; int maxAMContainers; PreemptableContainers(int maxAMContainers) { - containers = new ArrayList<>(); numAMContainers = 0; this.maxAMContainers = maxAMContainers; + this.containersByApp = new HashMap<>(); } /** @@ -254,7 +260,7 @@ private static class PreemptableContainers { * @param container the container to add * @return true if success; false otherwise */ - private boolean addContainer(RMContainer container) { + private boolean addContainer(RMContainer container, ApplicationId appId) { if (container.isAMContainer()) { numAMContainers++; if (numAMContainers >= maxAMContainers) { @@ -262,8 +268,30 @@ private boolean addContainer(RMContainer container) { } } - containers.add(container); + if (!containersByApp.containsKey(appId)) { + containersByApp.put(appId, new ArrayList<>()); + } + + containersByApp.get(appId).add(container); return true; } + + private List getAllContainers() { + List allContainers = new ArrayList<>(); + for (List containersForApp : containersByApp.values()) { + allContainers.addAll(containersForApp); + } + return allContainers; + } + + private Resource getResourcesToPreemptForApp(ApplicationId appId) { + Resource resourcesToPreempt = Resources.createResource(0, 0); + if (containersByApp.containsKey(appId)) { + for (RMContainer container : containersByApp.get(appId)) { + Resources.addTo(resourcesToPreempt, container.getAllocatedResource()); + } + } + return resourcesToPreempt; + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index 31630240e28e0..ac5d9fe7afa4c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -278,11 +278,12 @@ private void submitApps(String queue1, String queue2) preemptHalfResources(queue2); } - private void verifyPreemption(int numStarvedAppContainers) + private void verifyPreemption(int numStarvedAppContainers, + int numGreedyAppContainers) throws InterruptedException { // Sleep long enough for four containers to be preempted. for (int i = 0; i < 1000; i++) { - if (greedyApp.getLiveContainers().size() == 2 * numStarvedAppContainers) { + if (greedyApp.getLiveContainers().size() == numGreedyAppContainers) { break; } Thread.sleep(10); @@ -290,12 +291,12 @@ private void verifyPreemption(int numStarvedAppContainers) // Post preemption, verify the greedyApp has the correct # of containers. assertEquals("Incorrect # of containers on the greedy app", - 2 * numStarvedAppContainers, greedyApp.getLiveContainers().size()); + numGreedyAppContainers, greedyApp.getLiveContainers().size()); // Verify the queue metrics are set appropriately. The greedyApp started // with 8 1GB, 1vcore containers. assertEquals("Incorrect # of preempted containers in QueueMetrics", - 8 - 2 * numStarvedAppContainers, + 8 - numGreedyAppContainers, greedyApp.getQueue().getMetrics().getAggregatePreemptedContainers()); // Verify the node is reserved for the starvingApp @@ -340,7 +341,7 @@ public void testPreemptionWithinSameLeafQueue() throws Exception { String queue = "root.preemptable.child-1"; submitApps(queue, queue); if (fairsharePreemption) { - verifyPreemption(2); + verifyPreemption(2, 4); } else { verifyNoPreemption(); } @@ -349,13 +350,13 @@ public void testPreemptionWithinSameLeafQueue() throws Exception { @Test public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception { submitApps("root.preemptable.child-1", "root.preemptable.child-2"); - verifyPreemption(2); + verifyPreemption(2, 4); } @Test public void testPreemptionBetweenNonSiblingQueues() throws Exception { submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1"); - verifyPreemption(2); + verifyPreemption(2, 4); } @Test @@ -389,7 +390,7 @@ public void testPreemptionSelectNonAMContainer() throws Exception { setNumAMContainersPerNode(2); preemptHalfResources("root.preemptable.child-2"); - verifyPreemption(2); + verifyPreemption(2, 4); ArrayList containers = (ArrayList) starvingApp.getLiveContainers(); @@ -401,6 +402,22 @@ public void testPreemptionSelectNonAMContainer() throws Exception { + "nodes.", !host0.equals(host1)); } + @Test + public void testAppNotPreemptedBelowFairShare() throws Exception { + takeAllResources("root.preemptable.child-1"); + tryPreemptMoreThanFairShare("root.preemptable.child-2"); + } + + private void tryPreemptMoreThanFairShare(String queueName) + throws InterruptedException { + ApplicationAttemptId appAttemptId + = createSchedulingRequest(3 * GB, 3, queueName, "default", + NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2); + starvingApp = scheduler.getSchedulerApp(appAttemptId); + + verifyPreemption(1, 5); + } + @Test public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare() throws InterruptedException { @@ -414,10 +431,10 @@ public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare() // Submit a job so half the resources go to parent's sibling preemptHalfResources("root.preemptable-sibling"); - verifyPreemption(2); + verifyPreemption(2, 4); // Submit a job to the child's sibling to force preemption from the child preemptHalfResources("root.preemptable.child-2"); - verifyPreemption(1); + verifyPreemption(1, 2); } }