Skip to content

Commit

Permalink
YARN-7290. Method canContainerBePreempted can return true when it shouldn't. (Contributed by Steven Rand)
Browse files Browse the repository at this point in the history

(cherry picked from commit 2bde3ae)
(cherry picked from commit f335d50)
  • Loading branch information
flyrain authored and umbrant committed Dec 8, 2017
1 parent 0510cee commit c25427c
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 35 deletions.
Expand Up @@ -588,7 +588,8 @@ Set<ContainerId> getPreemptionContainerIds() {
}
}

boolean canContainerBePreempted(RMContainer container) {
boolean canContainerBePreempted(RMContainer container,
Resource alreadyConsideringForPreemption) {
if (!isPreemptable()) {
return false;
}
Expand All @@ -610,17 +611,29 @@ boolean canContainerBePreempted(RMContainer container) {

// Check if the app's allocation will be over its fairshare even
// after preempting this container
Resource usageAfterPreemption = getUsageAfterPreemptingContainer(
container.getAllocatedResource(),
alreadyConsideringForPreemption);

return !isUsageBelowShare(usageAfterPreemption, getFairShare());
}

/**
 * Computes the resource usage this app would have left if the given
 * container resources were preempted.
 *
 * Starts from a clone of getResourceUsage(), subtracts
 * resourcesToBePreempted (read while holding preemptionVariablesLock),
 * then subtracts both arguments.
 *
 * @param containerResources resources of the container being evaluated
 * @param alreadyConsideringForPreemption resources of other containers of
 *     this app that are already being considered for preemption
 * @return a new Resource holding the remaining usage; the value returned
 *     by getResourceUsage() is cloned, not modified
 */
private Resource getUsageAfterPreemptingContainer(Resource containerResources,
Resource alreadyConsideringForPreemption) {
Resource usageAfterPreemption = Resources.clone(getResourceUsage());

// Subtract resources of containers already queued for preemption
synchronized (preemptionVariablesLock) {
Resources.subtractFrom(usageAfterPreemption, resourcesToBePreempted);
}

// NOTE(review): the comment and two statements immediately below appear to
// be the pre-change lines of this diff shown without their '-' markers
// (they reference `container`, which is not a parameter of this method, and
// return a boolean) -- confirm against the committed file.
// Subtract this container's allocation to compute usage after preemption
Resources.subtractFrom(
usageAfterPreemption, container.getAllocatedResource());
return !isUsageBelowShare(usageAfterPreemption, getFairShare());
// Subtract resources of this container and other containers of this app
// that the FSPreemptionThread is already considering for preemption.
Resources.subtractFrom(usageAfterPreemption, containerResources);
Resources.subtractFrom(usageAfterPreemption,
alreadyConsideringForPreemption);

return usageAfterPreemption;
}

/**
Expand Down
Expand Up @@ -19,7 +19,7 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
Expand All @@ -29,7 +29,9 @@
import org.apache.hadoop.yarn.util.resource.Resources;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.Lock;
Expand Down Expand Up @@ -130,10 +132,21 @@ private List<RMContainer> identifyContainersToPreempt(
}
} // End of iteration through nodes for one RR

if (bestContainers != null && bestContainers.containers.size() > 0) {
containersToPreempt.addAll(bestContainers.containers);
// Reserve the containers for the starved app
trackPreemptionsAgainstNode(bestContainers.containers, starvedApp);
if (bestContainers != null) {
List<RMContainer> containers = bestContainers.getAllContainers();
if (containers.size() > 0) {
containersToPreempt.addAll(containers);
// Reserve the containers for the starved app
trackPreemptionsAgainstNode(containers, starvedApp);
// Warn application about containers to be killed
for (RMContainer container : containers) {
FSAppAttempt app = scheduler.getSchedulerApp(
container.getApplicationAttemptId());
LOG.info("Preempting container " + container +
" from queue " + app.getQueueName());
app.trackContainerForPreemption(container);
}
}
}
}
} // End of iteration over RRs
Expand Down Expand Up @@ -170,10 +183,12 @@ private PreemptableContainers identifyContainersToPreemptOnNode(
for (RMContainer container : containersToCheck) {
FSAppAttempt app =
scheduler.getSchedulerApp(container.getApplicationAttemptId());
ApplicationId appId = app.getApplicationId();

if (app.canContainerBePreempted(container)) {
if (app.canContainerBePreempted(container,
preemptableContainers.getResourcesToPreemptForApp(appId))) {
// Flag container for preemption
if (!preemptableContainers.addContainer(container)) {
if (!preemptableContainers.addContainer(container, appId)) {
return null;
}

Expand All @@ -199,15 +214,6 @@ private void trackPreemptionsAgainstNode(List<RMContainer> containers,
}

private void preemptContainers(List<RMContainer> containers) {
// Warn application about containers to be killed
for (RMContainer container : containers) {
ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
FSAppAttempt app = scheduler.getSchedulerApp(appAttemptId);
LOG.info("Preempting container " + container +
" from queue " + app.getQueueName());
app.trackContainerForPreemption(container);
}

// Schedule timer task to kill containers
preemptionTimer.schedule(
new PreemptContainersTask(containers), warnTimeBeforeKill);
Expand Down Expand Up @@ -237,14 +243,14 @@ public void run() {
* A class to track preemptable containers.
*/
private static class PreemptableContainers {
List<RMContainer> containers;
Map<ApplicationId, List<RMContainer>> containersByApp;
int numAMContainers;
int maxAMContainers;

PreemptableContainers(int maxAMContainers) {
containers = new ArrayList<>();
numAMContainers = 0;
this.maxAMContainers = maxAMContainers;
this.containersByApp = new HashMap<>();
}

/**
Expand All @@ -254,16 +260,38 @@ private static class PreemptableContainers {
* @param container the container to add
* @return true if success; false otherwise
*/
private boolean addContainer(RMContainer container) {
private boolean addContainer(RMContainer container, ApplicationId appId) {
if (container.isAMContainer()) {
numAMContainers++;
if (numAMContainers >= maxAMContainers) {
return false;
}
}

containers.add(container);
if (!containersByApp.containsKey(appId)) {
containersByApp.put(appId, new ArrayList<>());
}

containersByApp.get(appId).add(container);
return true;
}

/**
 * Flattens the per-application container lists into one list.
 *
 * @return a newly created list holding every container currently stored
 *     in containersByApp, across all applications
 */
private List<RMContainer> getAllContainers() {
  List<RMContainer> flattened = new ArrayList<>();
  // Append each application's containers in the map's iteration order.
  containersByApp.values().forEach(flattened::addAll);
  return flattened;
}

/**
 * Adds up the allocated resources of the containers recorded for one
 * application.
 *
 * @param appId the application whose recorded containers are summed
 * @return a new Resource created with Resources.createResource(0, 0) and
 *     increased by each recorded container's allocated resource; it is
 *     returned unchanged when nothing is recorded for appId
 */
private Resource getResourcesToPreemptForApp(ApplicationId appId) {
  Resource total = Resources.createResource(0, 0);

  List<RMContainer> recorded = containersByApp.get(appId);
  if (recorded == null) {
    // No containers have been added for this application.
    return total;
  }

  for (RMContainer candidate : recorded) {
    Resources.addTo(total, candidate.getAllocatedResource());
  }
  return total;
}
}
}
Expand Up @@ -278,24 +278,25 @@ private void submitApps(String queue1, String queue2)
preemptHalfResources(queue2);
}

private void verifyPreemption(int numStarvedAppContainers)
private void verifyPreemption(int numStarvedAppContainers,
int numGreedyAppContainers)
throws InterruptedException {
// Poll until the greedy app is down to the expected number of containers.
for (int i = 0; i < 1000; i++) {
if (greedyApp.getLiveContainers().size() == 2 * numStarvedAppContainers) {
if (greedyApp.getLiveContainers().size() == numGreedyAppContainers) {
break;
}
Thread.sleep(10);
}

// Post preemption, verify the greedyApp has the correct # of containers.
assertEquals("Incorrect # of containers on the greedy app",
2 * numStarvedAppContainers, greedyApp.getLiveContainers().size());
numGreedyAppContainers, greedyApp.getLiveContainers().size());

// Verify the queue metrics are set appropriately. The greedyApp started
// with 8 1GB, 1vcore containers.
assertEquals("Incorrect # of preempted containers in QueueMetrics",
8 - 2 * numStarvedAppContainers,
8 - numGreedyAppContainers,
greedyApp.getQueue().getMetrics().getAggregatePreemptedContainers());

// Verify the node is reserved for the starvingApp
Expand Down Expand Up @@ -340,7 +341,7 @@ public void testPreemptionWithinSameLeafQueue() throws Exception {
String queue = "root.preemptable.child-1";
submitApps(queue, queue);
if (fairsharePreemption) {
verifyPreemption(2);
verifyPreemption(2, 4);
} else {
verifyNoPreemption();
}
Expand All @@ -349,13 +350,13 @@ public void testPreemptionWithinSameLeafQueue() throws Exception {
@Test
public void testPreemptionBetweenTwoSiblingLeafQueues() throws Exception {
submitApps("root.preemptable.child-1", "root.preemptable.child-2");
verifyPreemption(2);
verifyPreemption(2, 4);
}

@Test
public void testPreemptionBetweenNonSiblingQueues() throws Exception {
submitApps("root.preemptable.child-1", "root.nonpreemptable.child-1");
verifyPreemption(2);
verifyPreemption(2, 4);
}

@Test
Expand Down Expand Up @@ -389,7 +390,7 @@ public void testPreemptionSelectNonAMContainer() throws Exception {
setNumAMContainersPerNode(2);
preemptHalfResources("root.preemptable.child-2");

verifyPreemption(2);
verifyPreemption(2, 4);

ArrayList<RMContainer> containers =
(ArrayList<RMContainer>) starvingApp.getLiveContainers();
Expand All @@ -401,6 +402,22 @@ public void testPreemptionSelectNonAMContainer() throws Exception {
+ "nodes.", !host0.equals(host1));
}

/**
 * Lets one queue take all resources, then submits a request from a second
 * queue through tryPreemptMoreThanFairShare, which verifies the resulting
 * container counts.
 */
@Test
public void testAppNotPreemptedBelowFairShare() throws Exception {
  final String greedyQueue = "root.preemptable.child-1";
  final String starvingQueue = "root.preemptable.child-2";

  takeAllResources(greedyQueue);
  tryPreemptMoreThanFairShare(starvingQueue);
}

/**
 * Submits a scheduling request to the given queue, records the resulting
 * app as starvingApp, and verifies the container counts after preemption.
 *
 * @param queueName queue the request is submitted to
 * @throws InterruptedException if interrupted while verifyPreemption waits
 */
private void tryPreemptMoreThanFairShare(String queueName)
    throws InterruptedException {
  // NOTE(review): the arguments are presumably 3 GB of memory and 3 vcores
  // per container -- confirm against createSchedulingRequest.
  ApplicationAttemptId starvingAttemptId = createSchedulingRequest(
      3 * GB, 3, queueName, "default",
      NODE_CAPACITY_MULTIPLE * rmNodes.size() / 2);
  starvingApp = scheduler.getSchedulerApp(starvingAttemptId);

  // Expected counts after preemption: starved app and greedy app.
  final int expectedStarvedAppContainers = 1;
  final int expectedGreedyAppContainers = 5;
  verifyPreemption(expectedStarvedAppContainers, expectedGreedyAppContainers);
}

@Test
public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare()
throws InterruptedException {
Expand All @@ -414,10 +431,10 @@ public void testPreemptionBetweenSiblingQueuesWithParentAtFairShare()

// Submit a job so half the resources go to parent's sibling
preemptHalfResources("root.preemptable-sibling");
verifyPreemption(2);
verifyPreemption(2, 4);

// Submit a job to the child's sibling to force preemption from the child
preemptHalfResources("root.preemptable.child-2");
verifyPreemption(1);
verifyPreemption(1, 2);
}
}

0 comments on commit c25427c

Please sign in to comment.