diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index b98ac0bd921b2..9e47d7ced1827 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -633,6 +633,11 @@
+
+
+
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
index 11672439c6153..4ac585db55746 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
@@ -24,6 +24,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -196,7 +197,7 @@ public QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
this.parent = parent != null ? parent.getMetrics() : null;
this.parentQueue = parent;
- this.users = enableUserMetrics ? new HashMap() : null;
+ this.users = enableUserMetrics ? new ConcurrentHashMap() : null;
this.enableUserMetrics = enableUserMetrics;
metricsSystem = ms;
@@ -253,7 +254,7 @@ public synchronized static void clearQueueMetrics() {
* Simple metrics cache to help prevent re-registrations.
*/
private static final Map QUEUE_METRICS =
- new HashMap();
+ new ConcurrentHashMap();
/**
* Returns the metrics cache to help prevent re-registrations.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java
index 87333bf50a143..d052c4ca95ab2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractParentQueue.java
@@ -26,6 +26,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
@@ -40,6 +42,7 @@
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -557,6 +560,7 @@ public boolean isEligibleForAutoQueueCreation() {
public void reinitialize(CSQueue newlyParsedQueue,
Resource clusterResource) throws IOException {
writeLock.lock();
+ ExecutorService executor = null;
try {
// We skip reinitialize for dynamic queues, when this is called, and
// new queue is different from this queue, we will make this queue to be
@@ -573,6 +577,12 @@ public void reinitialize(CSQueue newlyParsedQueue,
+ newlyParsedQueue.getQueuePath());
}
+ CapacitySchedulerConfiguration conf = queueContext.getConfiguration();
+ boolean initializeQueuesParallel = conf.getBoolean(
+ CapacitySchedulerConfiguration.INITIALIZE_QUEUES_PARALLEL_ENABLE,
+ CapacitySchedulerConfiguration.DEFAULT_INITIALIZE_QUEUES_PARALLEL_ENABLE
+ );
+
AbstractParentQueue newlyParsedParentQueue = (AbstractParentQueue) newlyParsedQueue;
// Set new configs
@@ -595,46 +605,35 @@ public void reinitialize(CSQueue newlyParsedQueue,
}
}
- for (Map.Entry e : newChildQueues.entrySet()) {
- String newChildQueueName = e.getKey();
- CSQueue newChildQueue = e.getValue();
-
- CSQueue childQueue = currentChildQueues.get(newChildQueueName);
-
- // Check if the child-queue already exists
- if (childQueue != null) {
- // Check if the child-queue has been converted into parent queue or
- // parent Queue has been converted to child queue. The CS has already
- // checked to ensure that this child-queue is in STOPPED state if
- // Child queue has been converted to ParentQueue.
- if ((childQueue instanceof AbstractLeafQueue
- && newChildQueue instanceof AbstractParentQueue)
- || (childQueue instanceof AbstractParentQueue
- && newChildQueue instanceof AbstractLeafQueue)) {
- // We would convert this LeafQueue to ParentQueue, or vice versa.
- // consider this as the combination of DELETE then ADD.
- newChildQueue.setParent(this);
- currentChildQueues.put(newChildQueueName, newChildQueue);
- // inform CapacitySchedulerQueueManager
- CapacitySchedulerQueueManager queueManager =
- queueContext.getQueueManager();
- queueManager.addQueue(newChildQueueName, newChildQueue);
- continue;
- }
- // Re-init existing queues
- childQueue.reinitialize(newChildQueue, clusterResource);
- LOG.info(getQueuePath() + ": re-configured queue: " + childQueue);
- } else{
- // New child queue, do not re-init
-
- // Set parent to 'this'
- newChildQueue.setParent(this);
-
- // Save in list of current child queues
- currentChildQueues.put(newChildQueueName, newChildQueue);
-
- LOG.info(
- getQueuePath() + ": added new child queue: " + newChildQueue);
+ if (initializeQueuesParallel) {
+ LOG.info("Reinitialize queues in parallel");
+ int initializeQueuesParallelism = conf.getInt(
+ CapacitySchedulerConfiguration.INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD,
+ CapacitySchedulerConfiguration.DEFAULT_INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD);
+ initializeQueuesParallelism = Math.max(initializeQueuesParallelism, 1);
+
+ executor = HadoopExecutors.newFixedThreadPool(initializeQueuesParallelism);
+ CountDownLatch allDone = new CountDownLatch(newChildQueues.size());
+ for (Map.Entry e : newChildQueues.entrySet()) {
+ executor.submit(() -> {
+ try {
+ reinitializeChildQueues(e, currentChildQueues, clusterResource);
+ } catch (IOException ex) {
+ throw new RuntimeException(ex);
+ } finally {
+ allDone.countDown();
+ }
+ });
+ }
+ try {
+ allDone.await();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ LOG.info("Reinitialize queues serially");
+ for (Map.Entry e : newChildQueues.entrySet()) {
+ reinitializeChildQueues(e, currentChildQueues, clusterResource);
}
}
@@ -658,10 +657,58 @@ public void reinitialize(CSQueue newlyParsedQueue,
// Make sure we notifies QueueOrderingPolicy
queueOrderingPolicy.setQueues(childQueues);
} finally {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
writeLock.unlock();
}
}
+ private void reinitializeChildQueues(Map.Entry e,
+ Map currentChildQueues,
+ Resource clusterResource) throws IOException {
+ String newChildQueueName = e.getKey();
+ CSQueue newChildQueue = e.getValue();
+
+ CSQueue childQueue = currentChildQueues.get(newChildQueueName);
+
+ // Check if the child-queue already exists
+ if (childQueue != null) {
+ // Check if the child-queue has been converted into parent queue or
+ // parent Queue has been converted to child queue. The CS has already
+ // checked to ensure that this child-queue is in STOPPED state if
+ // Child queue has been converted to ParentQueue.
+ if ((childQueue instanceof AbstractLeafQueue
+ && newChildQueue instanceof AbstractParentQueue)
+ || (childQueue instanceof AbstractParentQueue
+ && newChildQueue instanceof AbstractLeafQueue)) {
+ // We would convert this LeafQueue to ParentQueue, or vice versa.
+ // consider this as the combination of DELETE then ADD.
+ newChildQueue.setParent(this);
+ currentChildQueues.put(newChildQueueName, newChildQueue);
+ // inform CapacitySchedulerQueueManager
+ CapacitySchedulerQueueManager queueManager =
+ queueContext.getQueueManager();
+ queueManager.addQueue(newChildQueueName, newChildQueue);
+ return;
+ }
+ // Re-init existing queues
+ childQueue.reinitialize(newChildQueue, clusterResource);
+ LOG.info(getQueuePath() + ": re-configured queue: " + childQueue);
+ } else {
+ // New child queue, do not re-init
+
+ // Set parent to 'this'
+ newChildQueue.setParent(this);
+
+ // Save in list of current child queues
+ currentChildQueues.put(newChildQueueName, newChildQueue);
+
+ LOG.info(
+ getQueuePath() + ": added new child queue: " + newChildQueue);
+ }
+ }
+
private Map getQueuesMap(List queues) {
Map queuesMap = new HashMap();
for (CSQueue queue : queues) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index ea5c892ce3e5b..78516d834d708 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -301,6 +301,25 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private
public static final boolean DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE = true;
+ @Private
+ public static final String INITIALIZE_QUEUES_PARALLEL_PREFIX =
+ PREFIX + "initialize-queues-parallel";
+
+ @Private
+ public static final String INITIALIZE_QUEUES_PARALLEL_ENABLE =
+ INITIALIZE_QUEUES_PARALLEL_PREFIX + ".enable";
+
+ @Private
+ public static final boolean DEFAULT_INITIALIZE_QUEUES_PARALLEL_ENABLE = false;
+
+ @Private
+ public static final String INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD =
+ INITIALIZE_QUEUES_PARALLEL_PREFIX + ".maximum-threads";
+
+ @Private
+ public static final Integer
+ DEFAULT_INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD = 1;
+
@Private
public static final String QUEUE_MAPPING = PREFIX + "queue-mappings";
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
index bf9df5b694f1f..396bbba9e71d0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -19,12 +19,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
@@ -237,6 +240,40 @@ static CSQueue parseQueue(
CSQueue parent, String queueName, CSQueueStore newQueues, CSQueueStore oldQueues,
QueueHook hook) throws IOException {
CSQueue queue;
+ boolean initializeQueuesParallel = conf.getBoolean(
+ CapacitySchedulerConfiguration.INITIALIZE_QUEUES_PARALLEL_ENABLE,
+ CapacitySchedulerConfiguration.DEFAULT_INITIALIZE_QUEUES_PARALLEL_ENABLE
+ );
+
+ if (initializeQueuesParallel) {
+ LOG.info("Initialize queues in parallel");
+ queue = parallelParseQueue(queueContext, conf, parent, queueName,
+ newQueues, oldQueues, hook);
+ } else {
+ LOG.info("Initialize queues serially");
+ queue = serialParseQueue(queueContext, conf, parent, queueName,
+ newQueues, oldQueues, hook);
+ }
+ return queue;
+ }
+
+ /**
+ * Parse the queue from the configuration serially.
+ * @param queueContext the CapacitySchedulerQueueContext
+ * @param conf the CapacitySchedulerConfiguration
+ * @param parent the parent queue
+ * @param queueName the queue name
+ * @param newQueues all the queues
+ * @param oldQueues the old queues
+ * @param hook the queue hook
+ * @return the CSQueue
+ * @throws IOException
+ */
+ static CSQueue serialParseQueue(
+ CapacitySchedulerQueueContext queueContext, CapacitySchedulerConfiguration conf,
+ CSQueue parent, String queueName, CSQueueStore newQueues, CSQueueStore oldQueues,
+ QueueHook hook) throws IOException {
+ CSQueue queue;
QueuePath queueToParse = (parent == null) ? new QueuePath(queueName) :
(QueuePath.createFromQueues(parent.getQueuePath(), queueName));
List childQueueNames = conf.getQueues(queueToParse);
@@ -283,7 +320,7 @@ static CSQueue parseQueue(
queue = hook.hook(parentQueue);
List childQueues = new ArrayList<>();
for (String childQueueName : childQueueNames) {
- CSQueue childQueue = parseQueue(queueContext, conf, queue, childQueueName, newQueues,
+ CSQueue childQueue = serialParseQueue(queueContext, conf, queue, childQueueName, newQueues,
oldQueues, hook);
childQueues.add(childQueue);
}
@@ -300,6 +337,45 @@ static CSQueue parseQueue(
return queue;
}
+ /**
+ * Parse the queue from the configuration in parallel.
+ * @param queueContext the CapacitySchedulerQueueContext
+ * @param conf the CapacitySchedulerConfiguration
+ * @param parent the parent queue
+ * @param queueName the queue name
+ * @param newQueues all the queues
+ * @param oldQueues the old queues
+ * @param hook the queue hook
+ * @return the CSQueue
+ * @throws IOException
+ */
+ static CSQueue parallelParseQueue(
+ CapacitySchedulerQueueContext queueContext, CapacitySchedulerConfiguration conf,
+ CSQueue parent, String queueName, CSQueueStore newQueues, CSQueueStore oldQueues,
+ QueueHook hook) throws IOException {
+ CSQueue queue;
+ ForkJoinPool initializeQueuesThreadPool = null;
+ try {
+ int initializeQueuesParallelism = conf.getInt(
+ CapacitySchedulerConfiguration.INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD,
+ CapacitySchedulerConfiguration.DEFAULT_INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD);
+ initializeQueuesParallelism = Math.max(initializeQueuesParallelism, 1);
+
+ initializeQueuesThreadPool = new ForkJoinPool(initializeQueuesParallelism);
+ ParseQueueTask parseQueueTask = new ParseQueueTask(queueContext, conf, parent,
+ queueName, newQueues, oldQueues, hook);
+
+ queue = initializeQueuesThreadPool.invoke(parseQueueTask);
+ } catch (RuntimeException e) {
+ throw new IOException("Exception occurred during parse queue", e);
+ } finally {
+ if (initializeQueuesThreadPool != null) {
+ initializeQueuesThreadPool.shutdownNow();
+ }
+ }
+ return queue;
+ }
+
/**
* Updates to our list of queues: Adds the new queues and deletes the removed
* ones... be careful, do not overwrite existing queues.
@@ -742,4 +818,104 @@ private static void validateParent(CSQueue parent, String queueName) {
+ queueName);
}
}
+
+ static class ParseQueueTask extends RecursiveTask implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ private transient CapacitySchedulerQueueContext queueContext;
+ private transient CapacitySchedulerConfiguration conf;
+ private transient CSQueue parent;
+ private String queueName;
+ private transient CSQueueStore newQueues;
+ private transient CSQueueStore oldQueues;
+ private transient QueueHook hook;
+
+ ParseQueueTask(
+ CapacitySchedulerQueueContext queueContext, CapacitySchedulerConfiguration conf,
+ CSQueue parent, String queueName, CSQueueStore newQueues, CSQueueStore oldQueues,
+ QueueHook hook) {
+ this.queueContext = queueContext;
+ this.conf = conf;
+ this.parent = parent;
+ this.queueName = queueName;
+ this.newQueues = newQueues;
+ this.oldQueues = oldQueues;
+ this.hook = hook;
+ }
+
+ @Override
+ protected CSQueue compute() {
+ CSQueue queue;
+ try {
+ QueuePath queueToParse = (parent == null) ? new QueuePath(queueName) :
+ (QueuePath.createFromQueues(parent.getQueuePath(), queueName));
+ List childQueueNames = conf.getQueues(queueToParse);
+ CSQueue oldQueue = oldQueues.get(queueToParse.getFullPath());
+
+ boolean isReservableQueue = conf.isReservable(queueToParse);
+ boolean isAutoCreateEnabled = conf.isAutoCreateChildQueueEnabled(queueToParse);
+ // if a queue is eligible for auto queue creation v2 it must be a ParentQueue
+ // (even if it is empty)
+ final boolean isDynamicParent = oldQueue instanceof AbstractParentQueue &&
+ oldQueue.isDynamicQueue();
+ boolean isAutoQueueCreationEnabledParent = isDynamicParent || conf.isAutoQueueCreationV2Enabled(
+ queueToParse) || isAutoCreateEnabled;
+
+ if (childQueueNames.size() == 0 && !isAutoQueueCreationEnabledParent) {
+ validateParent(parent, queueName);
+ // Check if the queue will be dynamically managed by the Reservation system
+ if (isReservableQueue) {
+ queue = new PlanQueue(queueContext, queueName, parent,
+ oldQueues.get(queueToParse.getFullPath()));
+ ReservationQueue defaultResQueue = ((PlanQueue) queue).initializeDefaultInternalQueue();
+ newQueues.add(defaultResQueue);
+ } else {
+ queue = new LeafQueue(queueContext, queueName, parent,
+ oldQueues.get(queueToParse.getFullPath()));
+ }
+
+ queue = hook.hook(queue);
+ } else {
+ if (isReservableQueue) {
+ throw new IllegalStateException("Only Leaf Queues can be reservable for " +
+ queueToParse.getFullPath());
+ }
+
+ AbstractParentQueue parentQueue;
+ if (isAutoCreateEnabled) {
+ parentQueue = new ManagedParentQueue(queueContext, queueName, parent, oldQueues.get(
+ queueToParse.getFullPath()));
+ } else {
+ parentQueue = new ParentQueue(queueContext, queueName, parent, oldQueues.get(
+ queueToParse.getFullPath()));
+ }
+
+ queue = hook.hook(parentQueue);
+ List parseQueueTasks = new ArrayList<>();
+ List childQueues = new ArrayList<>();
+ for (String childQueueName : childQueueNames) {
+ ParseQueueTask parseQueueTask = new ParseQueueTask(queueContext, conf, queue,
+ childQueueName, newQueues, oldQueues, hook);
+ parseQueueTask.fork();
+ parseQueueTasks.add(parseQueueTask);
+ }
+
+ for (ParseQueueTask parseQueueTask : parseQueueTasks) {
+ childQueues.add(parseQueueTask.join());
+ }
+ if (!childQueues.isEmpty()) {
+ parentQueue.setChildQueues(childQueues);
+ }
+
+ }
+
+ newQueues.add(queue);
+
+ LOG.info("Initialized queue: " + queueToParse.getFullPath());
+ } catch (IOException e) {
+ throw new RuntimeException("Exception occurred during parse queue", e);
+ }
+ return queue;
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueues.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueues.java
index 2f445e636c8dc..b07f35e44f435 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueues.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueues.java
@@ -160,6 +160,26 @@ public void testRefreshQueues() throws Exception {
cs.stop();
}
+ @Test
+ public void testParallelRefreshQueues() throws Exception {
+ CapacityScheduler cs = new CapacityScheduler();
+ conf.setBoolean(CapacitySchedulerConfiguration.INITIALIZE_QUEUES_PARALLEL_ENABLE, true);
+ conf.setInt(CapacitySchedulerConfiguration.INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD, 10);
+ setupQueueConfiguration(conf);
+ cs.setConf(new YarnConfiguration());
+ cs.setRMContext(rm.getRMContext());
+ cs.init(conf);
+ cs.start();
+ cs.reinitialize(conf, rm.getRMContext());
+ checkQueueStructureCapacities(cs);
+
+ conf.setCapacity(A, 80f);
+ conf.setCapacity(B, 20f);
+ cs.reinitialize(conf, rm.getRMContext());
+ checkQueueStructureCapacities(cs, getDefaultCapacities(80f / 100.0f, 20f / 100.0f));
+ cs.stop();
+ }
+
@Test
public void testRefreshQueuesWithNewQueue() throws Exception {
CapacityScheduler cs = new CapacityScheduler();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 870d61539b02b..5d990f10abb36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -3226,6 +3226,26 @@ public void testLocalityScheduling() throws Exception {
assertEquals(numNodes+1, app_0.getSchedulingOpportunities(schedulerKey));
}
+ @Test
+ public void testParallelInitializeQueues() throws Exception {
+
+ CSQueueStore serialQueues = new CSQueueStore();
+ CapacitySchedulerQueueManager.parseQueue(queueContext,
+ csConf, null, ROOT.getFullPath(), serialQueues, queues,
+ TestUtils.spyHook);
+
+ csConf.setBoolean(CapacitySchedulerConfiguration.INITIALIZE_QUEUES_PARALLEL_ENABLE, true);
+ csConf.setInt(CapacitySchedulerConfiguration.INITIALIZE_QUEUES_PARALLEL_MAXIMUM_THREAD, 10);
+
+ CSQueueStore parallelQueues = new CSQueueStore();
+ CapacitySchedulerQueueManager.parseQueue(queueContext,
+ csConf, null, ROOT.getFullPath(), parallelQueues, queues,
+ TestUtils.spyHook);
+
+ assertEquals(serialQueues.getFullNameQueues().keySet(),
+ parallelQueues.getFullNameQueues().keySet());
+ }
+
@Test
public void testRackLocalityDelayScheduling() throws Exception {