From 58aa78c44811e480b6621fade496231d48ce92a5 Mon Sep 17 00:00:00 2001
From: liubin04
Date: Thu, 2 Jan 2025 10:34:25 +0800
Subject: [PATCH 1/3] Initialize queues in parallel

---
 .../dev-support/findbugs-exclude.xml          |   5 +
 .../scheduler/QueueMetrics.java               |   5 +-
 .../capacity/AbstractParentQueue.java         | 127 +++++++++----
 .../CapacitySchedulerConfiguration.java       |  19 ++
 .../CapacitySchedulerQueueManager.java        | 178 +++++++++++++++++-
 .../capacity/TestCapacitySchedulerQueues.java |  20 ++
 .../scheduler/capacity/TestLeafQueue.java     |  20 ++
 7 files changed, 331 insertions(+), 43 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index b98ac0bd921b2..9e47d7ced1827 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -633,6 +633,11 @@
+
+
+
+
+

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
index 11672439c6153..4ac585db55746 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
@@ -24,6 +24,7 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -196,7 +197,7 @@ public QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
     this.parent = parent != null ? parent.getMetrics() : null;
     this.parentQueue = parent;
-    this.users = enableUserMetrics ? new HashMap<String, QueueMetrics>() : null;
+    this.users = enableUserMetrics ? new ConcurrentHashMap<String, QueueMetrics>() : null;
     this.enableUserMetrics = enableUserMetrics;
 
     metricsSystem = ms;
@@ -253,7 +254,7 @@ public synchronized static void clearQueueMetrics() {
    * Simple metrics cache to help prevent re-registrations.
    */
  private static final Map<String, QueueMetrics> QUEUE_METRICS =
-      new HashMap<String, QueueMetrics>();
+      new ConcurrentHashMap<String, QueueMetrics>();
 
  /**
   * Returns the metrics cache to help prevent re-registrations.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java
index 87333bf50a143..d052c4ca95ab2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java
@@ -26,6 +26,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.LockSupport;
 
@@ -40,6 +42,7 @@
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -557,6 +560,7 @@ public boolean isEligibleForAutoQueueCreation() {
   public void reinitialize(CSQueue newlyParsedQueue,
       Resource clusterResource) throws IOException {
     writeLock.lock();
+    ExecutorService executor = null;
     try {
       // We skip reinitialize for dynamic queues, when this is called, and
       // new queue is different from this queue, we will make this queue to be
@@ -573,6 +577,12 @@ public void reinitialize(CSQueue newlyParsedQueue,
             + newlyParsedQueue.getQueuePath());
       }
 
+      CapacitySchedulerConfiguration conf = queueContext.getConfiguration();
+      boolean initializeQueuesParallel = conf.getBoolean(
+          CapacitySchedulerConfiguration.INITIALIZE_QUEUES_PARALLEL_ENABLE,
+          CapacitySchedulerConfiguration.DEFAULT_INITIALIZE_QUEUES_PARALLEL_ENABLE
+      );
+
       AbstractParentQueue newlyParsedParentQueue =
           (AbstractParentQueue) newlyParsedQueue;
 
       // Set new configs
@@ -595,46 +605,35 @@ public void reinitialize(CSQueue newlyParsedQueue,
         }
       }
 
-      for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) {
-        String newChildQueueName = e.getKey();
-        CSQueue newChildQueue = e.getValue();
-
-        CSQueue childQueue = currentChildQueues.get(newChildQueueName);
-
-        // Check if the child-queue already exists
-        if (childQueue != null) {
-          // Check if the child-queue has been converted into parent queue or
-          // parent Queue has been converted to child queue. The CS has already
-          // checked to ensure that this child-queue is in STOPPED state if
-          // Child queue has been converted to ParentQueue.
-          if ((childQueue instanceof AbstractLeafQueue
-              && newChildQueue instanceof AbstractParentQueue)
-              || (childQueue instanceof AbstractParentQueue
-              && newChildQueue instanceof AbstractLeafQueue)) {
-            // We would convert this LeafQueue to ParentQueue, or vice versa.
-            // consider this as the combination of DELETE then ADD.
-            newChildQueue.setParent(this);
-            currentChildQueues.put(newChildQueueName, newChildQueue);
-            // inform CapacitySchedulerQueueManager
-            CapacitySchedulerQueueManager queueManager =
-                queueContext.getQueueManager();
-            queueManager.addQueue(newChildQueueName, newChildQueue);
-            continue;
-          }
-          // Re-init existing queues
-          childQueue.reinitialize(newChildQueue, clusterResource);
-          LOG.info(getQueuePath() + ": re-configured queue: " + childQueue);
-        } else{
-          // New child queue, do not re-init
-
-          // Set parent to 'this'
-          newChildQueue.setParent(this);
-
-          // Save in list of current child queues
-          currentChildQueues.put(newChildQueueName, newChildQueue);
-
-          LOG.info(
-              getQueuePath() + ": added new child queue: " + newChildQueue);
+      if (initializeQueuesParallel) {
+        LOG.info("Reinitialize queues in parallel");
+        int initializeQueuesParallelism = conf.getInt(
+            CapacitySchedulerConfiguration.INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD,
+            CapacitySchedulerConfiguration.DEFAULT_INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD);
+        initializeQueuesParallelism = Math.max(initializeQueuesParallelism, 1);
+
+        executor = HadoopExecutors.newFixedThreadPool(initializeQueuesParallelism);
+        CountDownLatch allDone = new CountDownLatch(newChildQueues.size());
+        for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) {
+          executor.submit(() -> {
+            try {
+              reinitializeChildQueues(e, currentChildQueues, clusterResource);
+            } catch (IOException ex) {
+              throw new RuntimeException(ex);
+            } finally {
+              allDone.countDown();
+            }
+          });
+        }
+        try {
+          allDone.await();
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      } else {
+        LOG.info("Reinitialize queues serially");
+        for (Map.Entry<String, CSQueue> e : newChildQueues.entrySet()) {
+          reinitializeChildQueues(e, currentChildQueues, clusterResource);
         }
       }
@@ -658,10 +657,58 @@
       // Make sure we notifies QueueOrderingPolicy
       queueOrderingPolicy.setQueues(childQueues);
     } finally {
+      if (executor != null) {
+        executor.shutdownNow();
+      }
       writeLock.unlock();
     }
   }
 
+  private void reinitializeChildQueues(Map.Entry<String, CSQueue> e,
+      Map<String, CSQueue> currentChildQueues,
+      Resource clusterResource) throws IOException {
+    String newChildQueueName = e.getKey();
+    CSQueue newChildQueue = e.getValue();
+
+    CSQueue childQueue = currentChildQueues.get(newChildQueueName);
+
+    // Check if the child-queue already exists
+    if (childQueue != null) {
+      // Check if the child-queue has been converted into parent queue or
+      // parent Queue has been converted to child queue. The CS has already
+      // checked to ensure that this child-queue is in STOPPED state if
+      // Child queue has been converted to ParentQueue.
+      if ((childQueue instanceof AbstractLeafQueue
+          && newChildQueue instanceof AbstractParentQueue)
+          || (childQueue instanceof AbstractParentQueue
+          && newChildQueue instanceof AbstractLeafQueue)) {
+        // We would convert this LeafQueue to ParentQueue, or vice versa.
+        // consider this as the combination of DELETE then ADD.
+        newChildQueue.setParent(this);
+        currentChildQueues.put(newChildQueueName, newChildQueue);
+        // inform CapacitySchedulerQueueManager
+        CapacitySchedulerQueueManager queueManager =
+            queueContext.getQueueManager();
+        queueManager.addQueue(newChildQueueName, newChildQueue);
+        return;
+      }
+      // Re-init existing queues
+      childQueue.reinitialize(newChildQueue, clusterResource);
+      LOG.info(getQueuePath() + ": re-configured queue: " + childQueue);
+    } else {
+      // New child queue, do not re-init
+
+      // Set parent to 'this'
+      newChildQueue.setParent(this);
+
+      // Save in list of current child queues
+      currentChildQueues.put(newChildQueueName, newChildQueue);
+
+      LOG.info(
+          getQueuePath() + ": added new child queue: " + newChildQueue);
+    }
+  }
+
   private Map<String, CSQueue> getQueuesMap(List<CSQueue> queues) {
     Map<String, CSQueue> queuesMap = new HashMap<>();
     for (CSQueue queue : queues) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index ea5c892ce3e5b..78516d834d708 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -301,6 +301,25 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   @Private
   public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = true;
 
+  @Private
+  public static final String INITIALIZE_QUEUES_PARALLEL_PREFIX =
+      PREFIX + "initialize-queues-parallel";
+
+  @Private
+  public static final String INITIALIZE_QUEUES_PARALLEL_ENABLE =
+      INITIALIZE_QUEUES_PARALLEL_PREFIX + ".enable";
+
+  @Private
+  public static final boolean DEFAULT_INITIALIZE_QUEUES_PARALLEL_ENABLE = false;
+
+  @Private
+  public static final String INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD =
+      INITIALIZE_QUEUES_PARALLEL_PREFIX + ".maximum-threads";
+
+  @Private
+  public static final Integer
+      DEFAULT_INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD = 1;
+
   @Private
   public static final String QUEUE_MAPPING = PREFIX + "queue-mappings";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
index bf9df5b694f1f..396bbba9e71d0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -19,12 +19,15 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.yarn.security.PrivilegedEntity;
@@ -237,6 +240,40 @@ static CSQueue parseQueue(
       CSQueue parent, String queueName, CSQueueStore newQueues, CSQueueStore oldQueues,
       QueueHook hook) throws IOException {
     CSQueue queue;
+    boolean initializeQueuesParallel = conf.getBoolean(
+        CapacitySchedulerConfiguration.INITIALIZE_QUEUES_PARALLEL_ENABLE,
+        CapacitySchedulerConfiguration.DEFAULT_INITIALIZE_QUEUES_PARALLEL_ENABLE
+    );
+
+    if (initializeQueuesParallel) {
+      LOG.info("Initialize queues in parallel");
+      queue = parallelParseQueue(queueContext, conf, parent, queueName,
+          newQueues, oldQueues, hook);
+    } else {
+      LOG.info("Initialize queues serially");
+      queue = serialParseQueue(queueContext, conf, parent, queueName,
+          newQueues, oldQueues, hook);
+    }
+    return queue;
+  }
+
+  /**
+   * Parse the queue from the configuration serially.
+   * @param queueContext the CapacitySchedulerQueueContext
+   * @param conf the CapacitySchedulerConfiguration
+   * @param parent the parent queue
+   * @param queueName the queue name
+   * @param newQueues all the queues
+   * @param oldQueues the old queues
+   * @param hook the queue hook
+   * @return the CSQueue
+   * @throws IOException
+   */
+  static CSQueue serialParseQueue(
+      CapacitySchedulerQueueContext queueContext, CapacitySchedulerConfiguration conf,
+      CSQueue parent, String queueName, CSQueueStore newQueues, CSQueueStore oldQueues,
+      QueueHook hook) throws IOException {
+    CSQueue queue;
     QueuePath queueToParse = (parent == null) ? new QueuePath(queueName) :
         (QueuePath.createFromQueues(parent.getQueuePath(), queueName));
     List<String> childQueueNames = conf.getQueues(queueToParse);
@@ -283,7 +320,7 @@ static CSQueue parseQueue(
       queue = hook.hook(parentQueue);
       List<CSQueue> childQueues = new ArrayList<>();
       for (String childQueueName : childQueueNames) {
-        CSQueue childQueue = parseQueue(queueContext, conf, queue, childQueueName, newQueues,
+        CSQueue childQueue = serialParseQueue(queueContext, conf, queue, childQueueName, newQueues,
             oldQueues, hook);
         childQueues.add(childQueue);
       }
@@ -300,6 +337,45 @@ static CSQueue parseQueue(
     return queue;
   }
 
+  /**
+   * Parse the queue from the configuration in parallel.
+   * @param queueContext the CapacitySchedulerQueueContext
+   * @param conf the CapacitySchedulerConfiguration
+   * @param parent the parent queue
+   * @param queueName the queue name
+   * @param newQueues all the queues
+   * @param oldQueues the old queues
+   * @param hook the queue hook
+   * @return the CSQueue
+   * @throws IOException
+   */
+  static CSQueue parallelParseQueue(
+      CapacitySchedulerQueueContext queueContext, CapacitySchedulerConfiguration conf,
+      CSQueue parent, String queueName, CSQueueStore newQueues, CSQueueStore oldQueues,
+      QueueHook hook) throws IOException {
+    CSQueue queue;
+    ForkJoinPool initializeQueuesThreadPool = null;
+    try {
+      int initializeQueuesParallelism = conf.getInt(
+          CapacitySchedulerConfiguration.INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD,
+          CapacitySchedulerConfiguration.DEFAULT_INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD);
+      initializeQueuesParallelism = Math.max(initializeQueuesParallelism, 1);
+
+      initializeQueuesThreadPool = new ForkJoinPool(initializeQueuesParallelism);
+      ParseQueueTask parseQueueTask = new ParseQueueTask(queueContext, conf, parent,
+          queueName, newQueues, oldQueues, hook);
+
+      queue = initializeQueuesThreadPool.invoke(parseQueueTask);
+    } catch (RuntimeException e) {
+      throw new IOException("Exception occurred during parse queue", e);
+    } finally {
+      if (initializeQueuesThreadPool != null) {
+        initializeQueuesThreadPool.shutdownNow();
+      }
+    }
+    return queue;
+  }
+
   /**
    * Updates to our list of queues: Adds the new queues and deletes the removed
    * ones... be careful, do not overwrite existing queues.
@@ -742,4 +818,104 @@ private static void validateParent(CSQueue parent, String queueName) {
           + queueName);
     }
   }
+
+  static class ParseQueueTask extends RecursiveTask<CSQueue> implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private transient CapacitySchedulerQueueContext queueContext;
+    private transient CapacitySchedulerConfiguration conf;
+    private transient CSQueue parent;
+    private String queueName;
+    private transient CSQueueStore newQueues;
+    private transient CSQueueStore oldQueues;
+    private transient QueueHook hook;
+
+    ParseQueueTask(
+        CapacitySchedulerQueueContext queueContext, CapacitySchedulerConfiguration conf,
+        CSQueue parent, String queueName, CSQueueStore newQueues, CSQueueStore oldQueues,
+        QueueHook hook) {
+      this.queueContext = queueContext;
+      this.conf = conf;
+      this.parent = parent;
+      this.queueName = queueName;
+      this.newQueues = newQueues;
+      this.oldQueues = oldQueues;
+      this.hook = hook;
+    }
+
+    @Override
+    protected CSQueue compute() {
+      CSQueue queue;
+      try {
+        QueuePath queueToParse = (parent == null) ?
+            new QueuePath(queueName) :
+            (QueuePath.createFromQueues(parent.getQueuePath(), queueName));
+        List<String> childQueueNames = conf.getQueues(queueToParse);
+        CSQueue oldQueue = oldQueues.get(queueToParse.getFullPath());
+
+        boolean isReservableQueue = conf.isReservable(queueToParse);
+        boolean isAutoCreateEnabled = conf.isAutoCreateChildQueueEnabled(queueToParse);
+        // if a queue is eligible for auto queue creation v2 it must be a ParentQueue
+        // (even if it is empty)
+        final boolean isDynamicParent = oldQueue instanceof AbstractParentQueue &&
+            oldQueue.isDynamicQueue();
+        boolean isAutoQueueCreationEnabledParent = isDynamicParent || conf.isAutoQueueCreationV2Enabled(
+            queueToParse) || isAutoCreateEnabled;
+
+        if (childQueueNames.size() == 0 && !isAutoQueueCreationEnabledParent) {
+          validateParent(parent, queueName);
+          // Check if the queue will be dynamically managed by the Reservation system
+          if (isReservableQueue) {
+            queue = new PlanQueue(queueContext, queueName, parent,
+                oldQueues.get(queueToParse.getFullPath()));
+            ReservationQueue defaultResQueue = ((PlanQueue) queue).initializeDefaultInternalQueue();
+            newQueues.add(defaultResQueue);
+          } else {
+            queue = new LeafQueue(queueContext, queueName, parent,
+                oldQueues.get(queueToParse.getFullPath()));
+          }
+
+          queue = hook.hook(queue);
+        } else {
+          if (isReservableQueue) {
+            throw new IllegalStateException("Only Leaf Queues can be reservable for " +
+                queueToParse.getFullPath());
+          }
+
+          AbstractParentQueue parentQueue;
+          if (isAutoCreateEnabled) {
+            parentQueue = new ManagedParentQueue(queueContext, queueName, parent, oldQueues.get(
+                queueToParse.getFullPath()));
+          } else {
+            parentQueue = new ParentQueue(queueContext, queueName, parent, oldQueues.get(
+                queueToParse.getFullPath()));
+          }
+
+          queue = hook.hook(parentQueue);
+          List<ParseQueueTask> parseQueueTasks = new ArrayList<>();
+          List<CSQueue> childQueues = new ArrayList<>();
+          for (String childQueueName : childQueueNames) {
+            ParseQueueTask parseQueueTask = new ParseQueueTask(queueContext, conf, queue,
+                childQueueName, newQueues, oldQueues, hook);
+            parseQueueTask.fork();
+            parseQueueTasks.add(parseQueueTask);
+          }
+
+          for (ParseQueueTask parseQueueTask : parseQueueTasks) {
+            childQueues.add(parseQueueTask.join());
+          }
+          if (!childQueues.isEmpty()) {
+            parentQueue.setChildQueues(childQueues);
+          }
+
+        }
+
+        newQueues.add(queue);
+
+        LOG.info("Initialized queue: " + queueToParse.getFullPath());
+      } catch (IOException e) {
+        throw new RuntimeException("Exception occurred during parse queue", e);
+      }
+      return queue;
+    }
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueues.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueues.java
index 2f445e636c8dc..b07f35e44f435 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueues.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueues.java
@@ -160,6 +160,26 @@ public void testRefreshQueues() throws Exception {
     cs.stop();
   }
 
+  @Test
+  public void testParallelRefreshQueues() throws Exception {
+    CapacityScheduler cs = new CapacityScheduler();
+    conf.setBoolean(CapacitySchedulerConfiguration.INITIALIZE_QUEUES_PARALLEL_ENABLE, true);
+    conf.setInt(CapacitySchedulerConfiguration.INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD, 10);
+    setupQueueConfiguration(conf);
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(rm.getRMContext());
+    cs.init(conf);
+    cs.start();
+    cs.reinitialize(conf, rm.getRMContext());
+    checkQueueStructureCapacities(cs);
+
+    conf.setCapacity(A, 80f);
+    conf.setCapacity(B, 20f);
+    cs.reinitialize(conf, rm.getRMContext());
+    checkQueueStructureCapacities(cs, getDefaultCapacities(80f / 100.0f, 20f / 100.0f));
+    cs.stop();
+  }
+
   @Test
   public void testRefreshQueuesWithNewQueue() throws Exception {
     CapacityScheduler cs = new CapacityScheduler();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 870d61539b02b..5d990f10abb36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -3226,6 +3226,26 @@ public void testLocalityScheduling() throws Exception {
     assertEquals(numNodes+1, app_0.getSchedulingOpportunities(schedulerKey));
   }
 
+  @Test
+  public void testParallelInitializeQueues() throws Exception {
+
+    CSQueueStore serialQueues = new CSQueueStore();
+    CapacitySchedulerQueueManager.parseQueue(queueContext,
+        csConf, null, ROOT.getFullPath(), serialQueues, queues,
+        TestUtils.spyHook);
+
+    csConf.setBoolean(CapacitySchedulerConfiguration.INITIALIZE_QUEUES_PARALLEL_ENABLE, true);
+    csConf.setInt(CapacitySchedulerConfiguration.INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD, 10);
+
+    CSQueueStore parallelQueues = new CSQueueStore();
+    CapacitySchedulerQueueManager.parseQueue(queueContext,
+        csConf, null, ROOT.getFullPath(), parallelQueues, queues,
+        TestUtils.spyHook);
+
+    assertEquals(serialQueues.getFullNameQueues().keySet(),
+        parallelQueues.getFullNameQueues().keySet());
+  }
+
   @Test
   public void testRackLocalityDelayScheduling() throws Exception {

From 2f96042c3b6dd7a568d3f1044f0c1ad0a5d620c1 Mon Sep 17 00:00:00 2001
From: liubin04
Date: Fri, 3 Jan 2025 14:54:49 +0800
Subject: [PATCH 2/3] fix bugs

---
 .../hadoop-yarn/dev-support/findbugs-exclude.xml | 5 -----
 1 file changed, 5 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 9e47d7ced1827..b98ac0bd921b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -633,11 +633,6 @@
-
-
-
-
-

From 45f2e7ca3f2a9c65858e688fd31f43afa3631075 Mon Sep 17 00:00:00 2001
From: liubin04
Date: Tue, 7 Jan 2025 18:15:50 +0800
Subject: [PATCH 3/3] fix bugs

---
 .../hadoop-yarn/dev-support/findbugs-exclude.xml | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index b98ac0bd921b2..9e47d7ced1827 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -633,6 +633,11 @@
+
+
+
+
+
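
Note for reviewers: the parallel path is opt-in. With the new enable flag left at its default of false, both AbstractParentQueue#reinitialize and CapacitySchedulerQueueManager#parseQueue fall back to the original serial code path, so existing deployments should see no behavior change. To try the feature, the two new settings would go into capacity-scheduler.xml roughly as below; this is a minimal sketch that assumes the full property names resolve against the usual yarn.scheduler.capacity. prefix held in CapacitySchedulerConfiguration.PREFIX, and the thread count of 8 is only an illustrative value, not a recommendation from this patch.

  <property>
    <name>yarn.scheduler.capacity.initialize-queues-parallel.enable</name>
    <value>true</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.initialize-queues-parallel.maximum-threads</name>
    <value>8</value>
  </property>

Values below 1 for maximum-threads are clamped to 1 by the Math.max guard in both the reinitialize and parallelParseQueue paths, so a misconfigured value degrades to serial-equivalent behavior rather than failing.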