From f1bc7aa626136dd8b3ee036ff9c7ecb24d41b0cb Mon Sep 17 00:00:00 2001 From: singer-bin Date: Mon, 11 Dec 2023 19:02:40 +0800 Subject: [PATCH] YARN-11624 CapacityScheduler: Add configuration to disable AM preemption --- .../CapacitySchedulerPreemptionUtils.java | 21 +++++- .../CapacitySchedulerConfiguration.java | 10 +++ .../capacity/TestCapacityScheduler.java | 70 +++++++++++++++++++ 3 files changed, 100 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java index 99c691c7733b1..ef83bb5277056 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java @@ -18,12 +18,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; @@ -34,6 +37,9 @@ import java.util.Set; public class CapacitySchedulerPreemptionUtils { + + private boolean enableAMPreemption; + public static Map getResToObtainByPartitionForLeafQueue( CapacitySchedulerPreemptionContext context, String queueName, Resource clusterResource) { @@ -56,12 +62,25 @@ public static Map getResToObtainByPartitionForLeafQueue( return resToObtainByPartition; } + public static boolean getAMPreemptionEnabled(){ + Configuration conf = new Configuration(); + return conf.getBoolean( + CapacitySchedulerConfiguration.AM_PREEMPTION_ENABLED, CapacitySchedulerConfiguration.DEFAULT_AM_PREEMPTION); + } + + @VisibleForTesting + public static void setEnableAMPreemption(boolean enableAMPreemption) { + this.enableAMPreemption = enableAMPreemption; + } + public static boolean isContainerAlreadySelected(RMContainer container, Map> selectedCandidates) { if (null == selectedCandidates) { return false; } - + if(container.isAMContainer() && !getAMPreemptionEnabled()) { + return false; + } Set containers = selectedCandidates .get(container.getApplicationAttemptId()); if (containers == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 5ab237d282a6a..18493aec4e9a7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -325,6 +325,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur @Private public static final String QUEUE_PREEMPTION_DISABLED = "disable_preemption"; + @Private + public static final String AM_PREEMPTION_ENABLED = PREFIX + "enabled_am_preemption"; + + @Private + public static final boolean DEFAULT_AM_PREEMPTION = true; + @Private public static final String DEFAULT_APPLICATION_PRIORITY = "default-application-priority"; @@ -1730,6 +1736,10 @@ public boolean getPreemptionDisabled(String queue, boolean defaultVal) { return preemptionDisabled; } + public boolean getAMPreemptionEnabled(){ + return getBoolean(AM_PREEMPTION_ENABLED, DEFAULT_AM_PREEMPTION); + } + /** * Indicates whether intra-queue preemption is disabled on the specified queue * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index ede33b6f38687..ef7250ecabe13 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.CapacitySchedulerPreemptionUtils; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocMb; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMaxAllocVcores; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMinAllocMb; @@ -1274,6 +1275,75 @@ null, new RMContainerTokenSecretManager(conf), cs.stop(); } + @Test + public void testDisableAMPreemption() throws Exception{ + Configuration conf = new Configuration(); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + int CONTAINER_MEMORY = 1024; // start RM + MockRM rm1 = new MockRM(conf); + rm1.start(); + + // get scheduler + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + + // start NM + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // create app and launch the AM + RMApp app0 = MockRMAppSubmitter.submitWithMemory(CONTAINER_MEMORY, rm1); + MockAM am0 = MockRM.launchAM(app0, rm1, nm1); + am0.registerAppAttempt(); + + // get scheduler app + FiCaSchedulerApp schedulerAppAttempt = + cs.getSchedulerApplications().get(app0.getApplicationId()) + .getCurrentAppAttempt(); + + // allocate some containers and launch them + List allocatedContainers = + am0.allocateAndWaitForContainers(3, CONTAINER_MEMORY, nm1); + + // kill the 3 containers + for (Container c : allocatedContainers) { + cs.markContainerForPreemption(am0.getApplicationAttemptId(), schedulerAppAttempt.getRMContainer(c.getId())); } + + // check values + waitForAppPreemptionInfo(app0, + Resource.newInstance(CONTAINER_MEMORY * 3, 3), 0, 3, + Resource.newInstance(CONTAINER_MEMORY * 3, 3), false, 3); + + // kill app0-attempt0 AM container + cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(app0 + .getCurrentAppAttempt().getMasterContainer().getId())); + + // wait for app0 failed + waitForNewAttemptCreated(app0, am0.getApplicationAttemptId()); + + // check values + waitForAppPreemptionInfo(app0, + Resource.newInstance(CONTAINER_MEMORY * 4, 4), 1, 3, + Resource.newInstance(0, 0), false, 0); + + CapacitySchedulerPreemptionUtils.setEnableAMPreemption(false); + + // kill app0-attempt0 AM container + cs.markContainerForKillable(schedulerAppAttempt.getRMContainer(app0 + .getCurrentAppAttempt().getMasterContainer().getId())); + + // wait for app0 failed + waitForNewAttemptCreated(app0, am0.getApplicationAttemptId()); + + // check values + waitForAppPreemptionInfo(app0, + Resource.newInstance(CONTAINER_MEMORY * 3, 3), 0, 3, + Resource.newInstance(0, 0), false, 0); + + } + private void waitContainerAllocated(MockAM am, int mem, int nContainer, int startContainerId, MockRM rm, MockNM nm) throws Exception { for (int cId = startContainerId; cId < startContainerId + nContainer; cId++) {