Skip to content

Commit

Permalink
YARN-2495. Allow admin specify labels from each NM (Distributed confi…
Browse files Browse the repository at this point in the history
…guration for node label). (Naganarasimha G R via wangda)
  • Loading branch information
wangdatan committed Mar 30, 2015
1 parent b804571 commit 2a945d2
Show file tree
Hide file tree
Showing 21 changed files with 1,199 additions and 41 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -83,6 +83,9 @@ Release 2.8.0 - UNRELEASED


YARN-3288. Document and fix indentation in the DockerContainerExecutor code YARN-3288. Document and fix indentation in the DockerContainerExecutor code


YARN-2495. Allow admin specify labels from each NM (Distributed
configuration for node label). (Naganarasimha G R via wangda)

OPTIMIZATIONS OPTIMIZATIONS


YARN-3339. TestDockerContainerExecutor should pull a single image and not YARN-3339. TestDockerContainerExecutor should pull a single image and not
Expand Down
Expand Up @@ -1719,6 +1719,18 @@ private static void addDeprecatedKeys() {
public static final String NODE_LABELS_ENABLED = NODE_LABELS_PREFIX public static final String NODE_LABELS_ENABLED = NODE_LABELS_PREFIX
+ "enabled"; + "enabled";
public static final boolean DEFAULT_NODE_LABELS_ENABLED = false; public static final boolean DEFAULT_NODE_LABELS_ENABLED = false;

public static final String NODELABEL_CONFIGURATION_TYPE =
NODE_LABELS_PREFIX + "configuration-type";

public static final String CENTALIZED_NODELABEL_CONFIGURATION_TYPE =
"centralized";

public static final String DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE =
"distributed";

public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE =
CENTALIZED_NODELABEL_CONFIGURATION_TYPE;


public YarnConfiguration() { public YarnConfiguration() {
super(); super();
Expand Down
Expand Up @@ -239,6 +239,10 @@ message NodeIdToLabelsProto {
repeated string nodeLabels = 2; repeated string nodeLabels = 2;
} }


message StringArrayProto {
repeated string elements = 1;
}

message LabelsToNodeIdsProto { message LabelsToNodeIdsProto {
optional string nodeLabels = 1; optional string nodeLabels = 1;
repeated NodeIdProto nodeId = 2; repeated NodeIdProto nodeId = 2;
Expand Down
Expand Up @@ -70,7 +70,7 @@ public void testResourceTrackerOnHA() throws Exception {
NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null, NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
null, null); null, null);
NodeHeartbeatRequest request2 = NodeHeartbeatRequest request2 =
NodeHeartbeatRequest.newInstance(status, null, null); NodeHeartbeatRequest.newInstance(status, null, null,null);
resourceTracker.nodeHeartbeat(request2); resourceTracker.nodeHeartbeat(request2);
} }


Expand Down
Expand Up @@ -18,6 +18,8 @@


package org.apache.hadoop.yarn.server.api.protocolrecords; package org.apache.hadoop.yarn.server.api.protocolrecords;


import java.util.Set;

import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.Records;
Expand All @@ -26,14 +28,15 @@ public abstract class NodeHeartbeatRequest {


public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus, public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
MasterKey lastKnownContainerTokenMasterKey, MasterKey lastKnownContainerTokenMasterKey,
MasterKey lastKnownNMTokenMasterKey) { MasterKey lastKnownNMTokenMasterKey, Set<String> nodeLabels) {
NodeHeartbeatRequest nodeHeartbeatRequest = NodeHeartbeatRequest nodeHeartbeatRequest =
Records.newRecord(NodeHeartbeatRequest.class); Records.newRecord(NodeHeartbeatRequest.class);
nodeHeartbeatRequest.setNodeStatus(nodeStatus); nodeHeartbeatRequest.setNodeStatus(nodeStatus);
nodeHeartbeatRequest nodeHeartbeatRequest
.setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey); .setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey);
nodeHeartbeatRequest nodeHeartbeatRequest
.setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey); .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
nodeHeartbeatRequest.setNodeLabels(nodeLabels);
return nodeHeartbeatRequest; return nodeHeartbeatRequest;
} }


Expand All @@ -45,4 +48,7 @@ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,


public abstract MasterKey getLastKnownNMTokenMasterKey(); public abstract MasterKey getLastKnownNMTokenMasterKey();
public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey); public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey);

public abstract Set<String> getNodeLabels();
public abstract void setNodeLabels(Set<String> nodeLabels);
} }
Expand Up @@ -67,4 +67,7 @@ public interface NodeHeartbeatResponse {


void setSystemCredentialsForApps( void setSystemCredentialsForApps(
Map<ApplicationId, ByteBuffer> systemCredentials); Map<ApplicationId, ByteBuffer> systemCredentials);

boolean getAreNodeLabelsAcceptedByRM();
void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
} }
Expand Up @@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.api.protocolrecords; package org.apache.hadoop.yarn.server.api.protocolrecords;


import java.util.List; import java.util.List;
import java.util.Set;


import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
Expand All @@ -31,6 +32,14 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
int httpPort, Resource resource, String nodeManagerVersionId, int httpPort, Resource resource, String nodeManagerVersionId,
List<NMContainerStatus> containerStatuses, List<NMContainerStatus> containerStatuses,
List<ApplicationId> runningApplications) { List<ApplicationId> runningApplications) {
return newInstance(nodeId, httpPort, resource, nodeManagerVersionId,
containerStatuses, runningApplications, null);
}

public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
int httpPort, Resource resource, String nodeManagerVersionId,
List<NMContainerStatus> containerStatuses,
List<ApplicationId> runningApplications, Set<String> nodeLabels) {
RegisterNodeManagerRequest request = RegisterNodeManagerRequest request =
Records.newRecord(RegisterNodeManagerRequest.class); Records.newRecord(RegisterNodeManagerRequest.class);
request.setHttpPort(httpPort); request.setHttpPort(httpPort);
Expand All @@ -39,6 +48,7 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
request.setNMVersion(nodeManagerVersionId); request.setNMVersion(nodeManagerVersionId);
request.setContainerStatuses(containerStatuses); request.setContainerStatuses(containerStatuses);
request.setRunningApplications(runningApplications); request.setRunningApplications(runningApplications);
request.setNodeLabels(nodeLabels);
return request; return request;
} }


Expand All @@ -47,6 +57,8 @@ public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
public abstract Resource getResource(); public abstract Resource getResource();
public abstract String getNMVersion(); public abstract String getNMVersion();
public abstract List<NMContainerStatus> getNMContainerStatuses(); public abstract List<NMContainerStatus> getNMContainerStatuses();
public abstract Set<String> getNodeLabels();
public abstract void setNodeLabels(Set<String> nodeLabels);


/** /**
* We introduce this here because currently YARN RM doesn't persist nodes info * We introduce this here because currently YARN RM doesn't persist nodes info
Expand Down
Expand Up @@ -45,4 +45,7 @@ public interface RegisterNodeManagerResponse {
void setRMVersion(String version); void setRMVersion(String version);


String getRMVersion(); String getRMVersion();

boolean getAreNodeLabelsAcceptedByRM();
void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
} }
Expand Up @@ -18,6 +18,11 @@


package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;


import java.util.HashSet;
import java.util.Set;

import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
Expand All @@ -36,6 +41,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private NodeStatus nodeStatus = null; private NodeStatus nodeStatus = null;
private MasterKey lastKnownContainerTokenMasterKey = null; private MasterKey lastKnownContainerTokenMasterKey = null;
private MasterKey lastKnownNMTokenMasterKey = null; private MasterKey lastKnownNMTokenMasterKey = null;
private Set<String> labels = null;


public NodeHeartbeatRequestPBImpl() { public NodeHeartbeatRequestPBImpl() {
builder = NodeHeartbeatRequestProto.newBuilder(); builder = NodeHeartbeatRequestProto.newBuilder();
Expand Down Expand Up @@ -80,6 +86,11 @@ private void mergeLocalToBuilder() {
builder.setLastKnownNmTokenMasterKey( builder.setLastKnownNmTokenMasterKey(
convertToProtoFormat(this.lastKnownNMTokenMasterKey)); convertToProtoFormat(this.lastKnownNMTokenMasterKey));
} }
if (this.labels != null) {
builder.clearNodeLabels();
builder.setNodeLabels(StringArrayProto.newBuilder()
.addAllElements(this.labels).build());
}
} }


private void mergeLocalToProto() { private void mergeLocalToProto() {
Expand Down Expand Up @@ -178,4 +189,30 @@ private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) {
private MasterKeyProto convertToProtoFormat(MasterKey t) { private MasterKeyProto convertToProtoFormat(MasterKey t) {
return ((MasterKeyPBImpl)t).getProto(); return ((MasterKeyPBImpl)t).getProto();
} }

@Override
public Set<String> getNodeLabels() {
initNodeLabels();
return this.labels;
}

@Override
public void setNodeLabels(Set<String> nodeLabels) {
maybeInitBuilder();
builder.clearNodeLabels();
this.labels = nodeLabels;
}

private void initNodeLabels() {
if (this.labels != null) {
return;
}
NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasNodeLabels()) {
labels = null;
return;
}
StringArrayProto nodeLabels = p.getNodeLabels();
labels = new HashSet<String>(nodeLabels.getElementsList());
}
} }
Expand Up @@ -483,5 +483,18 @@ private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) {
private MasterKeyProto convertToProtoFormat(MasterKey t) { private MasterKeyProto convertToProtoFormat(MasterKey t) {
return ((MasterKeyPBImpl) t).getProto(); return ((MasterKeyPBImpl) t).getProto();
} }

@Override
public boolean getAreNodeLabelsAcceptedByRM() {
NodeHeartbeatResponseProtoOrBuilder p =
this.viaProto ? this.proto : this.builder;
return p.getAreNodeLabelsAcceptedByRM();
}

@Override
public void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM) {
maybeInitBuilder();
this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM);
}
} }


Expand Up @@ -20,32 +20,27 @@




import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Set;


import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;




public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest { public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest {
RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance(); RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
Expand All @@ -56,7 +51,8 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
private NodeId nodeId = null; private NodeId nodeId = null;
private List<NMContainerStatus> containerStatuses = null; private List<NMContainerStatus> containerStatuses = null;
private List<ApplicationId> runningApplications = null; private List<ApplicationId> runningApplications = null;

private Set<String> labels = null;

public RegisterNodeManagerRequestPBImpl() { public RegisterNodeManagerRequestPBImpl() {
builder = RegisterNodeManagerRequestProto.newBuilder(); builder = RegisterNodeManagerRequestProto.newBuilder();
} }
Expand Down Expand Up @@ -86,7 +82,11 @@ private void mergeLocalToBuilder() {
if (this.nodeId != null) { if (this.nodeId != null) {
builder.setNodeId(convertToProtoFormat(this.nodeId)); builder.setNodeId(convertToProtoFormat(this.nodeId));
} }

if (this.labels != null) {
builder.clearNodeLabels();
builder.setNodeLabels(StringArrayProto.newBuilder()
.addAllElements(this.labels).build());
}
} }


private synchronized void addNMContainerStatusesToProto() { private synchronized void addNMContainerStatusesToProto() {
Expand Down Expand Up @@ -292,6 +292,32 @@ public void setNMVersion(String version) {
builder.setNmVersion(version); builder.setNmVersion(version);
} }


@Override
public Set<String> getNodeLabels() {
initNodeLabels();
return this.labels;
}

@Override
public void setNodeLabels(Set<String> nodeLabels) {
maybeInitBuilder();
builder.clearNodeLabels();
this.labels = nodeLabels;
}

private void initNodeLabels() {
if (this.labels != null) {
return;
}
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasNodeLabels()) {
labels=null;
return;
}
StringArrayProto nodeLabels = p.getNodeLabels();
labels = new HashSet<String>(nodeLabels.getElementsList());
}

private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p); return new ApplicationIdPBImpl(p);
} }
Expand Down
Expand Up @@ -216,4 +216,17 @@ private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) {
private MasterKeyProto convertToProtoFormat(MasterKey t) { private MasterKeyProto convertToProtoFormat(MasterKey t) {
return ((MasterKeyPBImpl)t).getProto(); return ((MasterKeyPBImpl)t).getProto();
} }

@Override
public boolean getAreNodeLabelsAcceptedByRM() {
RegisterNodeManagerResponseProtoOrBuilder p =
this.viaProto ? this.proto : this.builder;
return p.getAreNodeLabelsAcceptedByRM();
}

@Override
public void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM) {
maybeInitBuilder();
this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM);
}
} }
Expand Up @@ -32,6 +32,7 @@ message RegisterNodeManagerRequestProto {
optional string nm_version = 5; optional string nm_version = 5;
repeated NMContainerStatusProto container_statuses = 6; repeated NMContainerStatusProto container_statuses = 6;
repeated ApplicationIdProto runningApplications = 7; repeated ApplicationIdProto runningApplications = 7;
optional StringArrayProto nodeLabels = 8;
} }


message RegisterNodeManagerResponseProto { message RegisterNodeManagerResponseProto {
Expand All @@ -41,12 +42,14 @@ message RegisterNodeManagerResponseProto {
optional int64 rm_identifier = 4; optional int64 rm_identifier = 4;
optional string diagnostics_message = 5; optional string diagnostics_message = 5;
optional string rm_version = 6; optional string rm_version = 6;
optional bool areNodeLabelsAcceptedByRM = 7 [default = false];
} }


message NodeHeartbeatRequestProto { message NodeHeartbeatRequestProto {
optional NodeStatusProto node_status = 1; optional NodeStatusProto node_status = 1;
optional MasterKeyProto last_known_container_token_master_key = 2; optional MasterKeyProto last_known_container_token_master_key = 2;
optional MasterKeyProto last_known_nm_token_master_key = 3; optional MasterKeyProto last_known_nm_token_master_key = 3;
optional StringArrayProto nodeLabels = 4;
} }


message NodeHeartbeatResponseProto { message NodeHeartbeatResponseProto {
Expand All @@ -60,6 +63,7 @@ message NodeHeartbeatResponseProto {
optional string diagnostics_message = 8; optional string diagnostics_message = 8;
repeated ContainerIdProto containers_to_be_removed_from_nm = 9; repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10; repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
} }


message SystemCredentialsForAppsProto { message SystemCredentialsForAppsProto {
Expand Down

0 comments on commit 2a945d2

Please sign in to comment.