Skip to content

Commit

Permalink
YARN-8721. Relax NE node-attribute check when attribute doesn't exist…
Browse files Browse the repository at this point in the history
… on a node. Contributed by Sunil Govindan.
  • Loading branch information
yangwwei authored and sunilgovind committed Sep 12, 2018
1 parent 67ae81f commit 5219435
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 52 deletions.
Expand Up @@ -26,6 +26,7 @@
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.NodeAttribute;
import org.apache.hadoop.yarn.api.records.NodeAttributeKey;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes;

/**
Expand Down Expand Up @@ -126,4 +127,10 @@ public abstract Map<String, Set<NodeAttribute>> getNodesToAttributes(

// futuristic
// public set<NodeId> getNodesMatchingExpression(String nodeLabelExp);

/**
* Refresh node attributes on a given node during RM recovery.
* @param nodeId Node Id
*/
public abstract void refreshNodeAttributesToScheduler(NodeId nodeId);
}
Expand Up @@ -725,4 +725,27 @@ protected void serviceStop() throws Exception {
public void setRMContext(RMContext context) {
this.rmContext = context;
}

/**
* Refresh node attributes on a given node during RM recovery.
* @param nodeId Node Id
*/
public void refreshNodeAttributesToScheduler(NodeId nodeId) {
String hostName = nodeId.getHost();
Map<String, Set<NodeAttribute>> newNodeToAttributesMap =
new HashMap<>();
Host host = nodeCollections.get(hostName);
if (host == null || host.attributes == null) {
return;
}
newNodeToAttributesMap.put(hostName, host.attributes.keySet());

// Notify RM
if (rmContext != null && rmContext.getDispatcher() != null) {
LOG.info("Updated NodeAttribute event to RM:" + newNodeToAttributesMap
.values());
rmContext.getDispatcher().getEventHandler().handle(
new NodeAttributesUpdateSchedulerEvent(newNodeToAttributesMap));
}
}
}
Expand Up @@ -1989,6 +1989,12 @@ private void addNode(RMNode nodeManager) {
schedulerNode.getTotalResource());
}

// recover attributes from store if any.
if (rmContext.getNodeAttributesManager() != null) {
rmContext.getNodeAttributesManager()
.refreshNodeAttributesToScheduler(schedulerNode.getNodeID());
}

Resource clusterResource = getClusterResource();
getRootQueue().updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
Expand Down
Expand Up @@ -113,7 +113,7 @@ private static boolean canSatisfySingleConstraintExpression(
|| maxScopeCardinality <= desiredMaxCardinality);
}

private static boolean canSatisfyNodeConstraintExpresssion(
private static boolean canSatisfyNodeConstraintExpression(
SingleConstraint sc, TargetExpression targetExpression,
SchedulerNode schedulerNode) {
Set<String> values = targetExpression.getTargetValues();
Expand All @@ -138,45 +138,67 @@ private static boolean canSatisfyNodeConstraintExpresssion(
return true;
}

if (schedulerNode.getNodeAttributes() == null ||
!schedulerNode.getNodeAttributes().contains(requestAttribute)) {
if(LOG.isDebugEnabled()) {
return getNodeConstraintEvaluatedResult(schedulerNode, opCode,
requestAttribute);
}
return true;
}

private static boolean getNodeConstraintEvaluatedResult(
SchedulerNode schedulerNode,
NodeAttributeOpCode opCode, NodeAttribute requestAttribute) {
// In case, attributes in a node is empty or incoming attributes doesn't
// exist on given node, accept such nodes for scheduling if opCode is
// equals to NE. (for eg. java != 1.8 could be scheduled on a node
// where java is not configured.)
if (schedulerNode.getNodeAttributes() == null ||
!schedulerNode.getNodeAttributes().contains(requestAttribute)) {
if (opCode == NodeAttributeOpCode.NE) {
if (LOG.isDebugEnabled()) {
LOG.debug("Incoming requestAttribute:" + requestAttribute
+ "is not present in " + schedulerNode.getNodeID());
+ "is not present in " + schedulerNode.getNodeID()
+ ", however opcode is NE. Hence accept this node.");
}
return false;
return true;
}
boolean found = false;
for (Iterator<NodeAttribute> it = schedulerNode.getNodeAttributes()
.iterator(); it.hasNext();) {
NodeAttribute nodeAttribute = it.next();
if (LOG.isDebugEnabled()) {
LOG.debug("Starting to compare Incoming requestAttribute :"
+ requestAttribute
+ " with requestAttribute value= " + requestAttribute
.getAttributeValue()
+ ", stored nodeAttribute value=" + nodeAttribute
.getAttributeValue());
}
if (requestAttribute.equals(nodeAttribute)) {
if (isOpCodeMatches(requestAttribute, nodeAttribute, opCode)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Incoming requestAttribute:" + requestAttribute
+ " matches with node:" + schedulerNode.getNodeID());
}
found = true;
return found;
if (LOG.isDebugEnabled()) {
LOG.debug("Incoming requestAttribute:" + requestAttribute
+ "is not present in " + schedulerNode.getNodeID()
+ ", skip such node.");
}
return false;
}

boolean found = false;
for (Iterator<NodeAttribute> it = schedulerNode.getNodeAttributes()
.iterator(); it.hasNext();) {
NodeAttribute nodeAttribute = it.next();
if (LOG.isDebugEnabled()) {
LOG.debug("Starting to compare Incoming requestAttribute :"
+ requestAttribute
+ " with requestAttribute value= " + requestAttribute
.getAttributeValue()
+ ", stored nodeAttribute value=" + nodeAttribute
.getAttributeValue());
}
if (requestAttribute.equals(nodeAttribute)) {
if (isOpCodeMatches(requestAttribute, nodeAttribute, opCode)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Incoming requestAttribute:" + requestAttribute
+ " matches with node:" + schedulerNode.getNodeID());
}
found = true;
return found;
}
}
if (!found) {
if(LOG.isDebugEnabled()) {
LOG.info("skip this node:" + schedulerNode.getNodeID()
+ " for requestAttribute:" + requestAttribute);
}
return false;
}
if (!found) {
if (LOG.isDebugEnabled()) {
LOG.info("skip this node:" + schedulerNode.getNodeID()
+ " for requestAttribute:" + requestAttribute);
}
return false;
}
return true;
}
Expand Down Expand Up @@ -217,7 +239,7 @@ private static boolean canSatisfySingleConstraint(ApplicationId applicationId,
}
} else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE)) {
// This is a node attribute expression, check it.
if (!canSatisfyNodeConstraintExpresssion(singleConstraint, currentExp,
if (!canSatisfyNodeConstraintExpression(singleConstraint, currentExp,
schedulerNode)) {
return false;
}
Expand Down
Expand Up @@ -22,16 +22,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
Expand Down Expand Up @@ -205,7 +196,14 @@ public static ApplicationId getMockApplicationId(int appId) {
ApplicationId applicationId = BuilderUtils.newApplicationId(0l, appId);
return ApplicationAttemptId.newInstance(applicationId, attemptId);
}


public static FiCaSchedulerNode getMockNodeWithAttributes(String host,
String rack, int port, int memory, Set<NodeAttribute> attributes) {
FiCaSchedulerNode node = getMockNode(host, rack, port, memory, 1);
when(node.getNodeAttributes()).thenReturn(attributes);
return node;
}

public static FiCaSchedulerNode getMockNode(String host, String rack,
int port, int memory) {
return getMockNode(host, rack, port, memory, 1);
Expand Down

0 comments on commit 5219435

Please sign in to comment.