Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@

package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

Expand All @@ -34,6 +37,9 @@
import java.util.Set;

public class CapacitySchedulerPreemptionUtils {

private boolean enableAMPreemption;

public static Map<String, Resource> getResToObtainByPartitionForLeafQueue(
CapacitySchedulerPreemptionContext context, String queueName,
Resource clusterResource) {
Expand All @@ -56,12 +62,25 @@ public static Map<String, Resource> getResToObtainByPartitionForLeafQueue(
return resToObtainByPartition;
}

public static boolean getAMPreemptionEnabled(){
Configuration conf = new Configuration();
return conf.getBoolean(
CapacitySchedulerConfiguration.AM_PREEMPTION_ENABLED, CapacitySchedulerConfiguration.DEFAULT_AM_PREEMPTION);
}

@VisibleForTesting
public static void setEnableAMPreemption(boolean enableAMPreemption) {
this.enableAMPreemption = enableAMPreemption;
}

public static boolean isContainerAlreadySelected(RMContainer container,
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) {
if (null == selectedCandidates) {
return false;
}

if(container.isAMContainer() && !getAMPreemptionEnabled()) {
return false;
}
Set<RMContainer> containers = selectedCandidates
.get(container.getApplicationAttemptId());
if (containers == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private
public static final String QUEUE_PREEMPTION_DISABLED = "disable_preemption";

@Private
public static final String AM_PREEMPTION_ENABLED = PREFIX + "enabled_am_preemption";

@Private
public static final boolean DEFAULT_AM_PREEMPTION = true;

@Private
public static final String DEFAULT_APPLICATION_PRIORITY = "default-application-priority";

Expand Down Expand Up @@ -1730,6 +1736,10 @@ public boolean getPreemptionDisabled(String queue, boolean defaultVal) {
return preemptionDisabled;
}

public boolean getAMPreemptionEnabled(){
return getBoolean(AM_PREEMPTION_ENABLED, DEFAULT_AM_PREEMPTION);
}

/**
* Indicates whether intra-queue preemption is disabled on the specified queue
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;

import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.CapacitySchedulerPreemptionUtils;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocMb;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocVcores;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMinAllocMb;
Expand Down Expand Up @@ -1274,6 +1275,75 @@ null, new RMContainerTokenSecretManager(conf),
cs.stop();
}

@Test
public void testDisableAMPreemption() throws Exception{
Configuration conf = new Configuration();
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
int CONTAINER_MEMORY = 1024; // start RM
MockRM rm1 = new MockRM(conf);
rm1.start();

// get scheduler
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();

// start NM
MockNM nm1 =
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
nm1.registerNode();

// create app and launch the AM
RMApp app0 = MockRMAppSubmitter.submitWithMemory(CONTAINER_MEMORY, rm1);
MockAM am0 = MockRM.launchAM(app0, rm1, nm1);
am0.registerAppAttempt();

// get scheduler app
FiCaSchedulerApp schedulerAppAttempt =
cs.getSchedulerApplications().get(app0.getApplicationId())
.getCurrentAppAttempt();

// allocate some containers and launch them
List<Container> allocatedContainers =
am0.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1);

// kill the 3 containers
for (Container c : allocatedContainers) {
cs.markContainerForPreemption(am0.getApplicationAttemptId(), schedulerAppAttempt.getRMContainer(c.getId())); }

// check values
waitForAppPreemptionInfo(app0,
Resource.newInstance(CONTAINER_MEMORY * 3, 3), 0, 3,
Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3);

// kill app0-attempt0 AM container
cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(app0
.getCurrentAppAttempt().getMasterContainer().getId()));

// wait for app0 failed
waitForNewAttemptCreated(app0, am0.getApplicationAttemptId());

// check values
waitForAppPreemptionInfo(app0,
Resource.newInstance(CONTAINER_MEMORY * 4, 4), 1, 3,
Resource.newInstance(0, 0), false, 0);

CapacitySchedulerPreemptionUtils.setEnableAMPreemption(false);

// kill app0-attempt0 AM container
cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(app0
.getCurrentAppAttempt().getMasterContainer().getId()));

// wait for app0 failed
waitForNewAttemptCreated(app0, am0.getApplicationAttemptId());

// check values
waitForAppPreemptionInfo(app0,
Resource.newInstance(CONTAINER_MEMORY * 3, 3), 0, 3,
Resource.newInstance(0, 0), false, 0);

}

private void waitContainerAllocated(MockAM am, int mem, int nContainer,
int startContainerId, MockRM rm, MockNM nm) throws Exception {
for (int cId = startContainerId; cId < startContainerId + nContainer; cId++) {
Expand Down