Skip to content

Commit

Permalink
YARN-2694. Ensure only single node label specified in ResourceRequest…
Browse files Browse the repository at this point in the history
…. Contributed by Wangda Tan
  • Loading branch information
jian-he committed Feb 6, 2015
1 parent eaab959 commit c1957fe
Show file tree
Hide file tree
Showing 17 changed files with 355 additions and 151 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -501,6 +501,9 @@ Release 2.7.0 - UNRELEASED
YARN-1537. Fix race condition in
TestLocalResourcesTrackerImpl.testLocalResourceCache. (xgong via acmurthy)

YARN-2694. Ensure only single node label specified in ResourceRequest.
(Wangda Tan via jianhe)

Release 2.6.0 - 2014-11-18

INCOMPATIBLE CHANGES
Expand Down
Expand Up @@ -253,24 +253,27 @@ public static boolean isAnyLocation(String hostName) {
/**
* Get node-label-expression for this Resource Request. If this is set, all
* containers allocated to satisfy this resource-request will be only on those
* nodes that satisfy this node-label-expression
* nodes that satisfy this node-label-expression.
*
* Please note that node label expression now can only take effect when the
* resource request has resourceName = ANY
*
* @return node-label-expression
*/
@Public
@Evolving
public abstract String getNodeLabelExpression();
public abstract String getNodeLabelExpression();

/**
* Set node label expression of this resource request. Now only
* support AND(&&), in the future will provide support for OR(||), NOT(!).
* Set node label expression of this resource request. Now only support
* specifying a single node label. In the future we will support more complex
* node label expression specification like AND(&&), OR(||), etc.
*
* Examples:
* - GPU && LARGE_MEM, ask for node has label GPU and LARGE_MEM together
* - "" (empty) means ask for node doesn't have label on it, this is default
* behavior
* Any please note that node label expression now can only take effect when
* the resource request has resourceName = ANY
*
* @param nodelabelExpression node-label-expression of this ResourceRequest
* @param nodelabelExpression
* node-label-expression of this ResourceRequest
*/
@Public
@Evolving
Expand Down
Expand Up @@ -169,7 +169,8 @@ public ContainerRequest(Resource capability, String[] nodes,
* If true, containers for this request may be assigned on hosts
* and racks other than the ones explicitly requested.
* @param nodeLabelsExpression
* Set node labels to allocate resource
* Set node labels to allocate resource, now we only support
* asking for only a single node label
*/
public ContainerRequest(Resource capability, String[] nodes,
String[] racks, Priority priority, boolean relaxLocality,
Expand Down
Expand Up @@ -421,6 +421,8 @@ public synchronized void addContainerRequest(T req) {
checkLocalityRelaxationConflict(req.getPriority(), dedupedRacks, true);
checkLocalityRelaxationConflict(req.getPriority(), inferredRacks,
req.getRelaxLocality());
// check if the node label expression specified is valid
checkNodeLabelExpression(req);

if (req.getNodes() != null) {
HashSet<String> dedupedNodes = new HashSet<String>(req.getNodes());
Expand Down Expand Up @@ -586,6 +588,37 @@ private void checkLocalityRelaxationConflict(Priority priority,
}
}

/**
* Valid if a node label expression specified on container request is valid or
* not
*
* @param containerRequest
*/
private void checkNodeLabelExpression(T containerRequest) {
String exp = containerRequest.getNodeLabelExpression();

if (null == exp || exp.isEmpty()) {
return;
}

// Don't support specifying >= 2 node labels in a node label expression now
if (exp.contains("&&") || exp.contains("||")) {
throw new InvalidContainerRequestException(
"Cannot specify more than two node labels"
+ " in a single node label expression");
}

// Don't allow specify node label against ANY request
if ((containerRequest.getRacks() != null &&
(!containerRequest.getRacks().isEmpty()))
||
(containerRequest.getNodes() != null &&
(!containerRequest.getNodes().isEmpty()))) {
throw new InvalidContainerRequestException(
"Cannot specify node label with rack and node");
}
}

private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
// This code looks weird but is needed because of the following scenario.
// A ResourceRequest is removed from the remoteRequestTable. A 0 container
Expand Down Expand Up @@ -640,7 +673,9 @@ private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
resourceRequestInfo.containerRequests.add(req);
}

resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression);
if (ResourceRequest.ANY.equals(resourceName)) {
resourceRequestInfo.remoteRequest.setNodeLabelExpression(labelExpression);
}

// Note this down for next interaction with ResourceManager
addResourceRequestToAsk(resourceRequestInfo.remoteRequest);
Expand Down
Expand Up @@ -59,6 +59,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.ReplaceLabelsOnNodeRequest;
import org.apache.hadoop.yarn.util.ConverterUtils;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;

@Private
Expand Down Expand Up @@ -102,7 +103,9 @@ public class RMAdminCLI extends HAAdmin {
.put("-replaceLabelsOnNode",
new UsageInfo(
"[node1[:port]=label1,label2 node2[:port]=label1,label2]",
"replace labels on nodes"))
"replace labels on nodes"
+ " (please note that we do not support specifying multiple"
+ " labels on a single host for now.)"))
.put("-directlyAccessNodeLabelStore",
new UsageInfo("", "Directly access node label store, "
+ "with this option, all node label related operations"
Expand Down Expand Up @@ -389,8 +392,7 @@ private int removeFromClusterNodeLabels(String args) throws IOException,
return 0;
}

private Map<NodeId, Set<String>> buildNodeLabelsMapFromStr(String args)
throws IOException {
private Map<NodeId, Set<String>> buildNodeLabelsMapFromStr(String args) {
Map<NodeId, Set<String>> map = new HashMap<NodeId, Set<String>>();

for (String nodeToLabels : args.split("[ \n]")) {
Expand All @@ -411,10 +413,9 @@ private Map<NodeId, Set<String>> buildNodeLabelsMapFromStr(String args)
if (index == 0) {
splits = splits[1].split(",");
}

if (nodeIdStr.trim().isEmpty()) {
throw new IOException("node name cannot be empty");
}

Preconditions.checkArgument(!nodeIdStr.trim().isEmpty(),
"node name cannot be empty");

NodeId nodeId = ConverterUtils.toNodeIdWithDefaultPort(nodeIdStr);
map.put(nodeId, new HashSet<String>());
Expand All @@ -424,6 +425,11 @@ private Map<NodeId, Set<String>> buildNodeLabelsMapFromStr(String args)
map.get(nodeId).add(splits[i].trim());
}
}

int nLabels = map.get(nodeId).size();
Preconditions.checkArgument(nLabels <= 1, "%d labels specified on host=%s"
+ ", please note that we do not support specifying multiple"
+ " labels on a single host for now.", nLabels, nodeIdStr);
}

if (map.isEmpty()) {
Expand Down
Expand Up @@ -18,8 +18,6 @@

package org.apache.hadoop.yarn.client.api.impl;

import com.google.common.base.Supplier;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand All @@ -40,7 +38,6 @@
import java.util.Set;
import java.util.TreeSet;

import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
Expand Down Expand Up @@ -75,6 +72,7 @@
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.InvalidContainerRequestException;
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
Expand All @@ -89,13 +87,16 @@
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mortbay.log.Log;

import com.google.common.base.Supplier;

public class TestAMRMClient {
static Configuration conf = null;
static MiniYARNCluster yarnCluster = null;
Expand Down Expand Up @@ -148,7 +149,6 @@ public static void setup() throws Exception {
racks = new String[]{ rack };
}

@SuppressWarnings("deprecation")
@Before
public void startApp() throws Exception {
// submit new app
Expand Down Expand Up @@ -678,21 +678,57 @@ public void testAskWithNodeLabels() {
AMRMClientImpl<ContainerRequest> client =
new AMRMClientImpl<ContainerRequest>();

// add x, y to ANY
// add exp=x to ANY
client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
1), null, null, Priority.UNDEFINED, true, "x && y"));
1), null, null, Priority.UNDEFINED, true, "x"));
Assert.assertEquals(1, client.ask.size());
Assert.assertEquals("x && y", client.ask.iterator().next()
Assert.assertEquals("x", client.ask.iterator().next()
.getNodeLabelExpression());

// add x, y and a, b to ANY, only a, b should be kept
// add exp=x then add exp=a to ANY in same priority, only exp=a should kept
client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
1), null, null, Priority.UNDEFINED, true, "x && y"));
1), null, null, Priority.UNDEFINED, true, "x"));
client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
1), null, null, Priority.UNDEFINED, true, "a && b"));
1), null, null, Priority.UNDEFINED, true, "a"));
Assert.assertEquals(1, client.ask.size());
Assert.assertEquals("a && b", client.ask.iterator().next()
Assert.assertEquals("a", client.ask.iterator().next()
.getNodeLabelExpression());

// add exp=x to ANY, rack and node, only resource request has ANY resource
// name will be assigned the label expression
// add exp=x then add exp=a to ANY in same priority, only exp=a should kept
client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
1), null, null, Priority.UNDEFINED, true,
"y"));
Assert.assertEquals(1, client.ask.size());
for (ResourceRequest req : client.ask) {
if (ResourceRequest.ANY.equals(req.getResourceName())) {
Assert.assertEquals("y", req.getNodeLabelExpression());
} else {
Assert.assertNull(req.getNodeLabelExpression());
}
}
}

private void verifyAddRequestFailed(AMRMClient<ContainerRequest> client,
ContainerRequest request) {
try {
client.addContainerRequest(request);
} catch (InvalidContainerRequestException e) {
return;
}
Assert.fail();
}

@Test(timeout=30000)
public void testAskWithInvalidNodeLabels() {
AMRMClientImpl<ContainerRequest> client =
new AMRMClientImpl<ContainerRequest>();

// specified exp with more than one node labels
verifyAddRequestFailed(client,
new ContainerRequest(Resource.newInstance(1024, 1), null, null,
Priority.UNDEFINED, true, "x && y"));
}

private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
Expand Down
Expand Up @@ -512,7 +512,7 @@ public void testReplaceLabelsOnNode() throws Exception {
.addToCluserNodeLabels(ImmutableSet.of("x", "y", "Y"));
String[] args =
{ "-replaceLabelsOnNode",
"node1:8000,x,y node2:8000=y node3,x,Y node4=Y",
"node1:8000,x node2:8000=y node3,x node4=Y",
"-directlyAccessNodeLabelStore" };
assertEquals(0, rmAdminCLI.run(args));
assertTrue(dummyNodeLabelsManager.getNodeLabels().containsKey(
Expand Down Expand Up @@ -540,6 +540,16 @@ public void testReplaceLabelsOnNode() throws Exception {
args = new String[] { "-replaceLabelsOnNode", ", " };
assertTrue(0 != rmAdminCLI.run(args));
}

@Test
public void testReplaceMultipleLabelsOnSingleNode() throws Exception {
// Successfully replace labels
dummyNodeLabelsManager.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
String[] args =
{ "-replaceLabelsOnNode", "node1,x,y",
"-directlyAccessNodeLabelStore" };
assertTrue(0 != rmAdminCLI.run(args));
}

private void testError(String[] args, String template,
ByteArrayOutputStream data, int resultCode) throws Exception {
Expand Down
Expand Up @@ -45,11 +45,12 @@
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.nodelabels.event.StoreNewClusterNodeLabels;
import org.apache.hadoop.yarn.nodelabels.event.NodeLabelsStoreEvent;
import org.apache.hadoop.yarn.nodelabels.event.NodeLabelsStoreEventType;
import org.apache.hadoop.yarn.nodelabels.event.RemoveClusterNodeLabels;
import org.apache.hadoop.yarn.nodelabels.event.StoreNewClusterNodeLabels;
import org.apache.hadoop.yarn.nodelabels.event.UpdateNodeToLabelsMappingsEvent;
import org.apache.hadoop.yarn.util.resource.Resources;

Expand Down Expand Up @@ -309,14 +310,35 @@ protected void checkAddLabelsToNode(
// check all labels being added existed
Set<String> knownLabels = labelCollections.keySet();
for (Entry<NodeId, Set<String>> entry : addedLabelsToNode.entrySet()) {
if (!knownLabels.containsAll(entry.getValue())) {
NodeId nodeId = entry.getKey();
Set<String> labels = entry.getValue();

if (!knownLabels.containsAll(labels)) {
String msg =
"Not all labels being added contained by known "
+ "label collections, please check" + ", added labels=["
+ StringUtils.join(entry.getValue(), ",") + "]";
+ StringUtils.join(labels, ",") + "]";
LOG.error(msg);
throw new IOException(msg);
}

// In YARN-2694, we temporarily disable user add more than 1 labels on a
// same host
if (!labels.isEmpty()) {
Set<String> newLabels = new HashSet<String>(getLabelsByNode(nodeId));
newLabels.addAll(labels);
// we don't allow number of labels on a node > 1 after added labels
if (newLabels.size() > 1) {
String msg =
String.format(
"%d labels specified on host=%s after add labels to node"
+ ", please note that we do not support specifying multiple"
+ " labels on a single host for now.",
newLabels.size(), nodeId.getHost());
LOG.error(msg);
throw new IOException(msg);
}
}
}
}

Expand Down Expand Up @@ -620,11 +642,24 @@ protected void checkReplaceLabelsOnNode(
// check all labels being added existed
Set<String> knownLabels = labelCollections.keySet();
for (Entry<NodeId, Set<String>> entry : replaceLabelsToNode.entrySet()) {
if (!knownLabels.containsAll(entry.getValue())) {
NodeId nodeId = entry.getKey();
Set<String> labels = entry.getValue();

// As in YARN-2694, we disable user add more than 1 labels on a same host
if (labels.size() > 1) {
String msg = String.format("%d labels specified on host=%s"
+ ", please note that we do not support specifying multiple"
+ " labels on a single host for now.", labels.size(),
nodeId.getHost());
LOG.error(msg);
throw new IOException(msg);
}

if (!knownLabels.containsAll(labels)) {
String msg =
"Not all labels being replaced contained by known "
+ "label collections, please check" + ", new labels=["
+ StringUtils.join(entry.getValue(), ",") + "]";
+ StringUtils.join(labels, ",") + "]";
LOG.error(msg);
throw new IOException(msg);
}
Expand Down

0 comments on commit c1957fe

Please sign in to comment.