From 6f1d5896f8e8819910446cd8b4a7cc2ae2a84601 Mon Sep 17 00:00:00 2001 From: Brandon Scheller Date: Thu, 27 Sep 2018 22:37:36 +0000 Subject: [PATCH] Add fairscheduler nodelabel support Supports unfair label scheduling and non-exclusive node labels --- .../hadoop/yarn/conf/YarnConfiguration.java | 3 + .../server/resourcemanager/RMAppManager.java | 12 +- .../nodelabels/RMNodeLabelsManager.java | 2 +- .../scheduler/fair/FSAppAttempt.java | 11 ++ .../scheduler/fair/FSLeafQueue.java | 48 ++++- .../scheduler/fair/FSQueue.java | 11 +- .../scheduler/fair/FSQueueMetrics.java | 18 ++ .../scheduler/fair/FairScheduler.java | 73 +++++++ .../scheduler/fair/FairSchedulerTestBase.java | 26 ++- .../scheduler/fair/TestFairScheduler.java | 178 +++++++++++++++++- ...stFairSchedulerWithMultiResourceTypes.java | 2 + .../scheduler/fair/TestSchedulingPolicy.java | 2 + 12 files changed, 363 insertions(+), 23 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6488ebfc4ec447..5e8815430facb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3544,6 +3544,9 @@ public static boolean isAclEnabled(Configuration conf) { public static final int DEFAULT_CLUSTER_LEVEL_APPLICATION_PRIORITY = 0; + public static final String DEFAULT_AM_NODELABEL_EXPRESSION = + NODE_LABELS_PREFIX + "am.default-node-label-expression"; + public static final String APP_ATTEMPT_DIAGNOSTICS_LIMIT_KC = YARN_PREFIX + "app.attempt.diagnostics.limit.kc"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 783fab0bc0534b..f59dda07c5c662 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -556,8 +556,8 @@ private List validateAndCreateResourceRequest( // set label expression for AM ANY request if not set if (null == anyReq.getNodeLabelExpression()) { - anyReq.setNodeLabelExpression(submissionContext - .getNodeLabelExpression()); + anyReq.setNodeLabelExpression(determineAMNodeLabelExpression( + submissionContext.getNodeLabelExpression())); } // Put ANY request at the front @@ -587,6 +587,14 @@ private List validateAndCreateResourceRequest( return null; } + private String determineAMNodeLabelExpression(String nodeLabelExpression) { + if (nodeLabelExpression == null) { + return conf.get(YarnConfiguration.DEFAULT_AM_NODELABEL_EXPRESSION, null); + } else { + return nodeLabelExpression; + } + } + @Override public void recover(RMState state) throws Exception { RMStateStore store = rmContext.getStateStore(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java index 507f696d057a21..10761d7dd88bc6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java @@ -536,7 +536,7 @@ public Resource getResourceByLabel(String label, Resource clusterResource) { private boolean isNodeUsableByQueue(Set nodeLabels, Queue q) { // node without any labels can be accessed by any queue - if (nodeLabels == null || nodeLabels.isEmpty() + if (q.accessibleNodeLabels.contains(ANY) || nodeLabels == null || nodeLabels.isEmpty() || (nodeLabels.size() == 1 && nodeLabels.contains(NO_LABEL))) { return true; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index d9f32623246eea..cadadad22ffeaa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -178,6 +178,17 @@ void containerCompleted(RMContainer rmContainer, } } + public void nodePartitionUpdated(RMContainer rmContainer, String oldPartition, String newPartition) { + Resource containerResource = rmContainer.getAllocatedResource(); + this.attemptResourceUsage.decUsed(oldPartition, containerResource); + this.attemptResourceUsage.incUsed(newPartition, containerResource); + + if (rmContainer.isAMContainer()) { + this.attemptResourceUsage.decAMUsed(oldPartition, containerResource); + this.attemptResourceUsage.incAMUsed(newPartition, containerResource); + } + } + private void unreserveInternal( SchedulerRequestKey schedulerKey, FSSchedulerNode node) { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 7e4dab8088db33..0170adb67c29ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; @@ -74,6 +76,7 @@ public class FSLeafQueue extends FSQueue { // Track the AM resource usage for this queue private Resource amResourceUsage; + private final RMNodeLabelsManager labelsManager; private final ActiveUsersManager activeUsersManager; public FSLeafQueue(String name, FairScheduler scheduler, @@ -81,6 +84,7 @@ public FSLeafQueue(String name, FairScheduler scheduler, super(name, scheduler, parent); this.context = scheduler.getContext(); this.lastTimeAtMinShare = scheduler.getClock().getTime(); + this.labelsManager = scheduler.getLabelsManager(); activeUsersManager = new ActiveUsersManager(getMetrics()); amResourceUsage = Resource.newInstance(0, 0); getMetrics().setAMResourceUsage(amResourceUsage); @@ -282,7 +286,7 @@ private void updateStarvedAppsMinshare( */ void updateStarvedApps() { // Fetch apps with pending demand - TreeSet appsWithDemand = fetchAppsWithDemand(false); + TreeSet appsWithDemand = fetchAppsWithPreemptionDemand(); // Process apps with fairshare starvation Resource fairShareStarvation = updateStarvedAppsFairshare(appsWithDemand); @@ -346,7 +350,7 @@ public Resource assignContainer(FSSchedulerNode node) { return assigned; } - for (FSAppAttempt sched : fetchAppsWithDemand(true)) { + for (FSAppAttempt sched : fetchAppsWithDemandForNode(node)) { if (SchedulerAppUtils.isPlaceBlacklisted(sched, node, LOG)) { continue; } @@ -362,23 +366,49 @@ public Resource assignContainer(FSSchedulerNode node) { return assigned; } + private TreeSet fetchAppsWithDemandForNode(FSSchedulerNode node) { + TreeSet pendingForResourceApps = + new TreeSet<>(policy.getComparator()); + readLock.lock(); + try { + for (FSAppAttempt app : runnableApps) { + Resource pending; + if (node.getPartition().isEmpty()) { + pending = app.getAppAttemptResourceUsage().getPending(); + } else { + pending = app.getAppAttemptResourceUsage().getPending(node.getPartition()); + if (pending.equals(Resources.none()) + && !labelsManager.isExclusiveNodeLabel(node.getPartition())) { + pending = app.getAppAttemptResourceUsage().getPending(); + } + } + if (!Resources.isNone(pending)) { + pendingForResourceApps.add(app); + } + } + } catch (IOException e) { + LOG.warn("Exception when trying to get exclusivity of node label=" + + node.getPartition(), e); + return pendingForResourceApps; + } finally { + readLock.unlock(); + } + return pendingForResourceApps; + } + /** - * Fetch the subset of apps that have unmet demand. When used for - * preemption-related code (as opposed to allocation), omits apps that + * Fetch the subset of apps that have unmet demand. Omits apps that * should not be checked for starvation. * - * @param assignment whether the apps are for allocation containers, as - * opposed to preemption calculations * @return Set of apps with unmet demand */ - private TreeSet fetchAppsWithDemand(boolean assignment) { + private TreeSet fetchAppsWithPreemptionDemand() { TreeSet pendingForResourceApps = new TreeSet<>(policy.getComparator()); readLock.lock(); try { for (FSAppAttempt app : runnableApps) { - if (!Resources.isNone(app.getPendingDemand()) && - (assignment || app.shouldCheckForStarvation())) { + if (!Resources.isNone(app.getPendingDemand()) && app.shouldCheckForStarvation()) { pendingForResourceApps.add(app); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 6217f550f8b29f..29181bebe41f73 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -42,11 +42,13 @@ import org.apache.hadoop.yarn.security.PrivilegedEntity; import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; @Private @Unstable @@ -256,6 +258,7 @@ public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) { queueInfo.setChildQueues(childQueueInfos); queueInfo.setQueueState(QueueState.RUNNING); queueInfo.setQueueStatistics(getQueueStatistics()); + queueInfo.setAccessibleNodeLabels(getAccessibleNodeLabels()); return queueInfo; } @@ -461,14 +464,14 @@ public String toString() { @Override public Set getAccessibleNodeLabels() { - // TODO, add implementation for FS - return null; + // Allow queue access to all node labels + return ImmutableSet.of(RMNodeLabelsManager.ANY); } @Override public String getDefaultNodeLabelExpression() { - // TODO, add implementation for FS - return null; + // Allow queue access to all node labels + return RMNodeLabelsManager.ANY; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java index 4fe3973f7f7d38..8f880ffff06192 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java @@ -27,6 +27,7 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @@ -191,6 +192,23 @@ static FSQueueMetrics forQueue(String queueName, Queue parent, return forQueue(ms, queueName, parent, enableUserMetrics, conf); } + // All resource metrics should update the default partition + @Override + public void allocateResources(String partition, String user, int containers, Resource res, + boolean decrPending) { + super.allocateResources(RMNodeLabelsManager.NO_LABEL, user, containers, res, decrPending); + } + + @Override + public void allocateResources(String partition, String user, Resource res) { + super.allocateResources(RMNodeLabelsManager.NO_LABEL, user, res); + } + + @Override + public void releaseResources(String partition, String user, int containers, Resource res) { + super.releaseResources(RMNodeLabelsManager.NO_LABEL, user, containers, res); + } + /** * Get the FS queue metric for the given queue. Create one and register it to * metrics system if there isn't one for the queue. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index da5e4c9347edbe..a8774a9aefd219 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -20,6 +20,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -57,6 +58,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -85,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; @@ -201,6 +204,9 @@ public class FairScheduler extends final MaxRunningAppsEnforcer maxRunningEnforcer; private AllocationFileLoaderService allocsLoader; + + private RMNodeLabelsManager labelManager; + @VisibleForTesting volatile AllocationConfiguration allocConf; @@ -384,6 +390,16 @@ public void update() { return rmContext.getContainerTokenSecretManager(); } + /** Convenience method for use by other fair scheduler components to get the + * node labels manager. + * + * @return the node labels manager + */ + @VisibleForTesting + public RMNodeLabelsManager getLabelsManager() { + return labelManager; + } + public boolean isSizeBasedWeight() { return sizeBasedWeight; } @@ -769,6 +785,41 @@ protected void completedContainerInternal( } } + /** + * Process node labels update on a node. + */ + private synchronized void updateLabelsOnNode(NodeId nodeId, + Set newLabels) { + FSSchedulerNode node = nodeTracker.getNode(nodeId); + if (null == node) { + return; + } + + // Get new partition, we have only one partition per node + String newPartition = Iterables.getFirst(newLabels, RMNodeLabelsManager.NO_LABEL); + + // old partition as well + String oldPartition = node.getPartition(); + + // Update resources of these containers + for (RMContainer rmContainer : node.getCopiedListOfRunningContainers()) { + FSAppAttempt application = + getApplicationAttempt(rmContainer.getApplicationAttemptId()); + if (null != application) { + application.nodePartitionUpdated(rmContainer, oldPartition, + newPartition); + } else { + LOG.warn("There's something wrong, some RMContainers running on" + + " a node, but we cannot find SchedulerApplicationAttempt for it. Node=" + + node.getNodeID() + " applicationAttemptId=" + + rmContainer.getApplicationAttemptId()); + } + } + + // Update node labels after we've done this + node.updateLabels(newLabels); + } + private void addNode(List containerReports, RMNode node) { writeLock.lock(); @@ -777,6 +828,9 @@ private void addNode(List containerReports, usePortForNodeName); nodeTracker.addNode(schedulerNode); + // update this node to node label manager + labelManager.activateNode(node.getNodeID(), schedulerNode.getTotalResource()); + triggerUpdate(); Resource clusterResource = getClusterResource(); @@ -819,6 +873,9 @@ private void removeNode(RMNode rmNode) { SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL); } + // update this node to node label manager + labelManager.deactivateNode(rmNode.getNodeID()); + nodeTracker.removeNode(nodeId); Resource clusterResource = getClusterResource(); queueMgr.getRootQueue().setSteadyFairShare(clusterResource); @@ -1021,6 +1078,9 @@ protected void nodeUpdate(RMNode nm) { super.nodeUpdate(nm); FSSchedulerNode fsNode = getFSSchedulerNode(nm.getNodeID()); + if (fsNode != null) { + labelManager.activateNode(nm.getNodeID(), fsNode.getTotalResource()); + } attemptScheduling(fsNode); long duration = getClock().getTime() - start; @@ -1239,6 +1299,18 @@ public void handle(SchedulerEvent event) { NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event; removeNode(nodeRemovedEvent.getRemovedRMNode()); break; + case NODE_LABELS_UPDATE: + if (!(event instanceof NodeLabelsUpdateSchedulerEvent)) { + throw new RuntimeException("Unexpected event type: " + event); + } + NodeLabelsUpdateSchedulerEvent labelUpdateEvent = (NodeLabelsUpdateSchedulerEvent) event; + for (Entry> entry : labelUpdateEvent + .getUpdatedNodeToLabels().entrySet()) { + NodeId id = entry.getKey(); + Set labels = entry.getValue(); + updateLabelsOnNode(id, labels); + } + break; case NODE_UPDATE: if (!(event instanceof NodeUpdateSchedulerEvent)) { throw new RuntimeException("Unexpected event type: " + event); @@ -1420,6 +1492,7 @@ private void initScheduler(Configuration conf) throws IOException { sizeBasedWeight = this.conf.getSizeBasedWeight(); usePortForNodeName = this.conf.getUsePortForNodeName(); reservableNodesRatio = this.conf.getReservableNodes(); + this.labelManager = rmContext.getNodeLabelManager(); updateInterval = this.conf.getUpdateInterval(); if (updateInterval < 0) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java index 4f1f20b942b7a5..eb94e20b89d57c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerTestBase.java @@ -116,12 +116,19 @@ protected ResourceRequest createResourceRequest( int memory, String host, int priority, int numContainers, boolean relaxLocality) { return createResourceRequest(memory, 1, host, priority, numContainers, - relaxLocality); + relaxLocality, null); } protected ResourceRequest createResourceRequest( int memory, int vcores, String host, int priority, int numContainers, boolean relaxLocality) { + return createResourceRequest(memory, vcores, host, priority, numContainers, + relaxLocality, null); + } + + protected ResourceRequest createResourceRequest( + int memory, int vcores, String host, int priority, int numContainers, + boolean relaxLocality, String label) { ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class); request.setCapability(BuilderUtils.newResource(memory, vcores)); request.setResourceName(host); @@ -130,7 +137,7 @@ protected ResourceRequest createResourceRequest( prio.setPriority(priority); request.setPriority(prio); request.setRelaxLocality(relaxLocality); - request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL); + request.setNodeLabelExpression(label != null ? label : RMNodeLabelsManager.NO_LABEL); return request; } @@ -155,20 +162,27 @@ protected ApplicationAttemptId createSchedulingRequest( protected ApplicationAttemptId createSchedulingRequest( int memory, int vcores, String queueId, String userId, int numContainers) { - return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1); + return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, 1, null); } protected ApplicationAttemptId createSchedulingRequest( int memory, String queueId, String userId, int numContainers, int priority) { return createSchedulingRequest(memory, 1, queueId, userId, numContainers, - priority); + priority, null); } protected ApplicationAttemptId createSchedulingRequest( int memory, int vcores, String queueId, String userId, int numContainers, int priority) { + return createSchedulingRequest(memory, vcores, queueId, userId, numContainers, + priority, null); + } + + protected ApplicationAttemptId createSchedulingRequest( + int memory, int vcores, String queueId, String userId, int numContainers, + int priority, String label) { ResourceRequest request = createResourceRequest(memory, vcores, - ResourceRequest.ANY, priority, numContainers, true); + ResourceRequest.ANY, priority, numContainers, true, label); return createSchedulingRequest(Lists.newArrayList(request), queueId, userId); } @@ -247,7 +261,7 @@ protected void createSchedulingRequestExistingApplication( protected void createSchedulingRequestExistingApplication( int memory, int vcores, int priority, ApplicationAttemptId attId) { ResourceRequest request = createResourceRequest(memory, vcores, ResourceRequest.ANY, - priority, 1, true); + priority, 1, true, null); createSchedulingRequestExistingApplication(request, attId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index dbf4b2bf91bc82..f4b4eb5e52f809 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -62,12 +62,14 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.NodeLabel; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl; +import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Dispatcher; @@ -113,6 +115,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default; @@ -129,6 +132,7 @@ import org.mockito.Mockito; import org.xml.sax.SAXException; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Sets; @SuppressWarnings("unchecked") @@ -1022,6 +1026,10 @@ public void testQueueInfo() throws IOException { .getAllocatedContainers()); Assert.assertEquals(6144, queueInfo.getQueueStatistics() .getAllocatedMemoryMB()); + + //Test that queueInfo has access to all nodelabels + Assert.assertEquals("*", queueInfo.getAccessibleNodeLabels().iterator().next()); + Assert.assertEquals("*", queueInfo.getAccessibleNodeLabels().iterator().next()); } @Test @@ -3144,7 +3152,173 @@ public void testAssignContainer() throws Exception { } } } - + + @Test + public void testExclusiveLabelAssignments() throws Exception { + final String testLabel1 = "ONE"; + final String testLabel2 = "TWO"; + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add five nodes + RMNode node1 = + MockNodes + .newNodeInfo(1, Resources.createResource(6144), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + RMNode node2 = + MockNodes + .newNodeInfo(1, Resources.createResource(6144), 1, "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + RMNode node3 = + MockNodes + .newNodeInfo(1, Resources.createResource(6144), 1, "127.0.0.3"); + NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); + scheduler.handle(nodeEvent3); + + RMNode node4 = + MockNodes + .newNodeInfo(1, Resources.createResource(6144), 1, "127.0.0.4"); + NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4); + scheduler.handle(nodeEvent4); + + RMNode node5 = + MockNodes + .newNodeInfo(1, Resources.createResource(6144), 1, "127.0.0.5"); + NodeAddedSchedulerEvent nodeEvent5 = new NodeAddedSchedulerEvent(node5); + scheduler.handle(nodeEvent5); + + // Add exclusive test labels to cluster labels + NodeLabel label1 = new NodeLabelPBImpl(); + label1.setName(testLabel1); + label1.setExclusivity(true); + NodeLabel label2 = new NodeLabelPBImpl(); + label2.setName(testLabel2); + label2.setExclusivity(true); + resourceManager.getRMContext().getNodeLabelManager() + .addToCluserNodeLabels(ImmutableSet.of(label1, label2)); + + // Add testLabel1 to node 3 and testLabel2 to node 5 + Map> nodeToLabelsMap = new HashMap<>(); + nodeToLabelsMap.put(node3.getNodeID(), ImmutableSet.of(testLabel1)); + nodeToLabelsMap.put(node5.getNodeID(), ImmutableSet.of(testLabel2)); + NodeLabelsUpdateSchedulerEvent nodeLabelEvent = new NodeLabelsUpdateSchedulerEvent(nodeToLabelsMap); + scheduler.handle(nodeLabelEvent); + + ApplicationAttemptId attId1 = + createSchedulingRequest(1024, 1, "default", "user1", 10, 1, testLabel1); + ApplicationAttemptId attId2 = + createSchedulingRequest(1024, 1, "default", "user1", 10, 1, testLabel2); + ApplicationAttemptId attId3 = + createSchedulingRequest(1024, 1, "default", "user1", 10, 1); + + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); + FSAppAttempt app3 = scheduler.getSchedulerApp(attId3); + + NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); + NodeUpdateSchedulerEvent updateEvent3 = new NodeUpdateSchedulerEvent(node3); + NodeUpdateSchedulerEvent updateEvent4 = new NodeUpdateSchedulerEvent(node4); + NodeUpdateSchedulerEvent updateEvent5 = new NodeUpdateSchedulerEvent(node5); + + scheduler.handle(updateEvent1); + scheduler.handle(updateEvent2); + scheduler.handle(updateEvent3); + scheduler.handle(updateEvent4); + scheduler.handle(updateEvent5); + + // Number of containers per app matches number of same labeled nodes + assertEquals(1, app1.getLiveContainers().size()); + assertEquals(1, app2.getLiveContainers().size()); + assertEquals(3, app3.getLiveContainers().size()); + } + + @Test + public void testNonExclusiveLabelAssignments() throws Exception { + final String testLabel1 = "ONE"; + final String testLabel2 = "TWO"; + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add five nodes + RMNode node1 = + MockNodes + .newNodeInfo(1, Resources.createResource(6144), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + RMNode node2 = + MockNodes + .newNodeInfo(1, Resources.createResource(6144), 1, "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + RMNode node3 = + MockNodes + .newNodeInfo(1, Resources.createResource(6144), 1, "127.0.0.3"); + NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); + scheduler.handle(nodeEvent3); + + RMNode node4 = + MockNodes + .newNodeInfo(1, Resources.createResource(6144), 1, "127.0.0.4"); + NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4); + scheduler.handle(nodeEvent4); + + RMNode node5 = + MockNodes + .newNodeInfo(1, Resources.createResource(6144), 1, "127.0.0.5"); + NodeAddedSchedulerEvent nodeEvent5 = new NodeAddedSchedulerEvent(node5); + scheduler.handle(nodeEvent5); + + // Add nonexclusive & exclusive test labels to cluster labels + NodeLabel label1 = new NodeLabelPBImpl(); + label1.setName(testLabel1); + label1.setExclusivity(true); + NodeLabel label2 = new NodeLabelPBImpl(); + label2.setName(testLabel2); + label2.setExclusivity(false); + resourceManager.getRMContext().getNodeLabelManager() + .addToCluserNodeLabels(ImmutableSet.of(label1, label2)); + + // Add exclusive testLabel1 to node 3 and nonexclusive testLabel2 to nodes 4 & 5 + Map> nodeToLabelsMap = new HashMap<>(); + nodeToLabelsMap.put(node3.getNodeID(), ImmutableSet.of(testLabel1)); + nodeToLabelsMap.put(node4.getNodeID(), ImmutableSet.of(testLabel2)); + nodeToLabelsMap.put(node5.getNodeID(), ImmutableSet.of(testLabel2)); + NodeLabelsUpdateSchedulerEvent nodeLabelEvent = new NodeLabelsUpdateSchedulerEvent(nodeToLabelsMap); + scheduler.handle(nodeLabelEvent); + + // Create request without labels + ApplicationAttemptId attId = + createSchedulingRequest(1024, 1, "default", "user1", 10, 1); + + FSAppAttempt app = scheduler.getSchedulerApp(attId); + + NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); + NodeUpdateSchedulerEvent updateEvent3 = new NodeUpdateSchedulerEvent(node3); + NodeUpdateSchedulerEvent updateEvent4 = new NodeUpdateSchedulerEvent(node4); + NodeUpdateSchedulerEvent updateEvent5 = new NodeUpdateSchedulerEvent(node5); + + scheduler.handle(updateEvent1); + scheduler.handle(updateEvent2); + scheduler.handle(updateEvent3); + scheduler.handle(updateEvent4); + scheduler.handle(updateEvent5); + + // Should have 2 containers on unlabeled nodes and 2 on non-exclusive labeled node + assertEquals(4, app.getLiveContainers().size()); + } + @SuppressWarnings("unchecked") @Test public void testNotAllowSubmitApplication() throws Exception { @@ -5086,6 +5260,8 @@ public void testUserAsDefaultQueueWithLeadingTrailingSpaceUserName() @Test public void testRemovedNodeDecomissioningNode() throws Exception { + scheduler.init(conf); + scheduler.start(); // Register nodemanager NodeManager nm = registerNode("host_decom", 1234, 2345, NetworkTopology.DEFAULT_RACK, Resources.createResource(8 * GB, 4)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerWithMultiResourceTypes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerWithMultiResourceTypes.java index f9fcf5328f0cf0..01adc8aa0eaf8e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerWithMultiResourceTypes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerWithMultiResourceTypes.java @@ -21,6 +21,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.junit.After; import org.junit.Before; @@ -91,6 +92,7 @@ public void testMaximumAllocationRefresh() throws IOException { YarnConfiguration.RESOURCE_TYPES + "." + ResourceInformation.MEMORY_MB.getName() + MAXIMUM_ALLOCATION, 512); + scheduler.setRMContext(new RMContextImpl()); scheduler.init(conf); scheduler.reinitialize(conf, null); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java index b016c1b4fb8eab..2966473033b89b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java @@ -30,6 +30,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; @@ -51,6 +52,7 @@ public class TestSchedulingPolicy { @Before public void setUp() throws Exception { scheduler = new FairScheduler(); + scheduler.setRMContext(new RMContextImpl()); conf = new FairSchedulerConfiguration(); }