YARN-1644. RM-NM protocol changes and NodeStatusUpdater implementation to support container resizing. Contributed by Meng Ding
jian-he authored and wangdatan committed Sep 23, 2015
1 parent c59ae4e commit c3dc1af
Showing 17 changed files with 628 additions and 90 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
@@ -215,6 +215,9 @@ Release 2.8.0 - UNRELEASED
YARN-1643. Make ContainersMonitor support changing monitoring size of an
allocated container. (Meng Ding and Wangda Tan)

YARN-1644. RM-NM protocol changes and NodeStatusUpdater implementation to
support container resizing. (Meng Ding via jianhe)

IMPROVEMENTS

YARN-644. Basic null check is not performed on passed in arguments before
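
For orientation, a minimal sketch (not part of this commit; the class and method names below are illustrative, and the standard yarn-server-common package locations are assumed) of the two directions the new heartbeat fields cover: the NM reports containers whose resources it has already increased through NodeStatus, and the RM hands back containers to shrink through NodeHeartbeatResponse.

import java.util.List;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;

// Illustrative helper, not part of this commit.
class ContainerResizeHeartbeatSketch {

  // NM -> RM: containers whose resources the NM has already increased,
  // carried in the node status sent with each heartbeat.
  static List<Container> reportedIncreases(NodeStatus status) {
    return status.getIncreasedContainers();
  }

  // RM -> NM: containers the NM should shrink to their new allocation,
  // carried back in the heartbeat response.
  static List<Container> pendingDecreases(NodeHeartbeatResponse response) {
    return response.getContainersToDecrease();
  }
}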
@@ -68,7 +68,7 @@ public void testResourceTrackerOnHA() throws Exception {
failoverThread = createAndStartFailoverThread();
NodeStatus status =
NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
null, null, null, null);
null, null, null, null, null);
NodeHeartbeatRequest request2 =
NodeHeartbeatRequest.newInstance(status, null, null,null);
resourceTracker.nodeHeartbeat(request2);
@@ -24,6 +24,7 @@

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;

@@ -70,4 +71,7 @@ void setSystemCredentialsForApps(

boolean getAreNodeLabelsAcceptedByRM();
void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);

List<Container> getContainersToDecrease();
void addAllContainersToDecrease(List<Container> containersToDecrease);
}
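
A hedged sketch of how a caller on the RM side might populate the new field when assembling a heartbeat response; the helper class below is illustrative only, and the actual RM-side wiring is outside this commit.

import java.util.List;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;

// Illustrative only: shows the new NodeHeartbeatResponse methods in use.
class DecreaseResponseSketch {

  // Attach the containers that must shrink so the NM can enforce the
  // smaller limits after the next heartbeat.
  static void attachDecreases(NodeHeartbeatResponse response,
      List<Container> containersToDecrease) {
    response.addAllContainersToDecrease(containersToDecrease);
  }
}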
@@ -27,12 +27,15 @@

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
@@ -58,7 +61,9 @@ public class NodeHeartbeatResponsePBImpl extends

private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;


private List<Container> containersToDecrease = null;

public NodeHeartbeatResponsePBImpl() {
builder = NodeHeartbeatResponseProto.newBuilder();
}
@@ -96,6 +101,9 @@ private void mergeLocalToBuilder() {
if (this.systemCredentials != null) {
addSystemCredentialsToProto();
}
if (this.containersToDecrease != null) {
addContainersToDecreaseToProto();
}
}

private void addSystemCredentialsToProto() {
@@ -408,6 +416,64 @@ public void remove() {
builder.addAllApplicationsToCleanup(iterable);
}

private void initContainersToDecrease() {
if (this.containersToDecrease != null) {
return;
}
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerProto> list = p.getContainersToDecreaseList();
this.containersToDecrease = new ArrayList<>();

for (ContainerProto c : list) {
this.containersToDecrease.add(convertFromProtoFormat(c));
}
}

@Override
public List<Container> getContainersToDecrease() {
initContainersToDecrease();
return this.containersToDecrease;
}

@Override
public void addAllContainersToDecrease(
final List<Container> containersToDecrease) {
if (containersToDecrease == null) {
return;
}
initContainersToDecrease();
this.containersToDecrease.addAll(containersToDecrease);
}

private void addContainersToDecreaseToProto() {
maybeInitBuilder();
builder.clearContainersToDecrease();
if (this.containersToDecrease == null) {
return;
}
Iterable<ContainerProto> iterable = new
Iterable<ContainerProto>() {
@Override
public Iterator<ContainerProto> iterator() {
return new Iterator<ContainerProto>() {
private Iterator<Container> iter = containersToDecrease.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ContainerProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllContainersToDecrease(iterable);
}

@Override
public Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
@@ -484,6 +550,14 @@ private MasterKeyProto convertToProtoFormat(MasterKey t) {
return ((MasterKeyPBImpl) t).getProto();
}

private ContainerPBImpl convertFromProtoFormat(ContainerProto p) {
return new ContainerPBImpl(p);
}

private ContainerProto convertToProtoFormat(Container t) {
return ((ContainerPBImpl) t).getProto();
}

@Override
public boolean getAreNodeLabelsAcceptedByRM() {
NodeHeartbeatResponseProtoOrBuilder p =
@@ -24,6 +24,7 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records;
@@ -48,14 +49,16 @@ public abstract class NodeStatus {
* @param nodeHealthStatus Health status of the node.
* @param containersUtilization Utilization of the containers in this node.
* @param nodeUtilization Utilization of the node.
* @param increasedContainers Containers whose resource has been increased.
* @return New {@code NodeStatus} with the provided information.
*/
public static NodeStatus newInstance(NodeId nodeId, int responseId,
List<ContainerStatus> containerStatuses,
List<ApplicationId> keepAliveApplications,
NodeHealthStatus nodeHealthStatus,
ResourceUtilization containersUtilization,
ResourceUtilization nodeUtilization) {
ResourceUtilization nodeUtilization,
List<Container> increasedContainers) {
NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
nodeStatus.setResponseId(responseId);
nodeStatus.setNodeId(nodeId);
@@ -64,6 +67,7 @@ public static NodeStatus newInstance(NodeId nodeId, int responseId,
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
nodeStatus.setContainersUtilization(containersUtilization);
nodeStatus.setNodeUtilization(nodeUtilization);
nodeStatus.setIncreasedContainers(increasedContainers);
return nodeStatus;
}

@@ -108,4 +112,13 @@ public abstract void setContainersUtilization(
@Unstable
public abstract void setNodeUtilization(
ResourceUtilization nodeUtilization);

@Public
@Unstable
public abstract List<Container> getIncreasedContainers();

@Private
@Unstable
public abstract void setIncreasedContainers(
List<Container> increasedContainers);
}
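
The factory method now takes the increased containers as an extra trailing argument, as the test change earlier in this diff reflects. A minimal caller sketch follows (the null placeholders and localhost node id mirror that test; the helper class itself is illustrative and not part of the commit):

import java.util.List;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;

// Illustrative only: builds a NodeStatus that reports increased containers.
class NodeStatusBuilderSketch {

  static NodeStatus build(List<Container> increasedContainers) {
    return NodeStatus.newInstance(
        NodeId.newInstance("localhost", 0), // node id (placeholder host/port)
        0,     // response id
        null,  // container statuses
        null,  // keep-alive applications
        null,  // node health status
        null,  // containers utilization
        null,  // node utilization
        increasedContainers); // new: containers whose resource was increased
  }
}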
@@ -24,13 +24,16 @@
import java.util.List;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
@@ -49,7 +52,8 @@ public class NodeStatusPBImpl extends NodeStatus {
private List<ContainerStatus> containers = null;
private NodeHealthStatus nodeHealthStatus = null;
private List<ApplicationId> keepAliveApplications = null;

private List<Container> increasedContainers = null;

public NodeStatusPBImpl() {
builder = NodeStatusProto.newBuilder();
}
@@ -79,6 +83,9 @@ private synchronized void mergeLocalToBuilder() {
if (this.keepAliveApplications != null) {
addKeepAliveApplicationsToProto();
}
if (this.increasedContainers != null) {
addIncreasedContainersToProto();
}
}

private synchronized void mergeLocalToProto() {
@@ -165,6 +172,37 @@ public void remove() {
builder.addAllKeepAliveApplications(iterable);
}

private synchronized void addIncreasedContainersToProto() {
maybeInitBuilder();
builder.clearIncreasedContainers();
if (increasedContainers == null) {
return;
}
Iterable<ContainerProto> iterable = new
Iterable<ContainerProto>() {
@Override
public Iterator<ContainerProto> iterator() {
return new Iterator<ContainerProto>() {
private Iterator<Container> iter =
increasedContainers.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ContainerProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllIncreasedContainers(iterable);
}

@Override
public int hashCode() {
return getProto().hashCode();
@@ -336,6 +374,31 @@ public synchronized void setNodeUtilization(
.setNodeUtilization(convertToProtoFormat(nodeUtilization));
}

@Override
public synchronized List<Container> getIncreasedContainers() {
if (increasedContainers != null) {
return increasedContainers;
}
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerProto> list = p.getIncreasedContainersList();
this.increasedContainers = new ArrayList<>();
for (ContainerProto c : list) {
this.increasedContainers.add(convertFromProtoFormat(c));
}
return this.increasedContainers;
}

@Override
public synchronized void setIncreasedContainers(
List<Container> increasedContainers) {
maybeInitBuilder();
if (increasedContainers == null) {
builder.clearIncreasedContainers();
return;
}
this.increasedContainers = increasedContainers;
}

private NodeIdProto convertToProtoFormat(NodeId nodeId) {
return ((NodeIdPBImpl)nodeId).getProto();
}
@@ -377,4 +440,14 @@ private ResourceUtilizationPBImpl convertFromProtoFormat(
ResourceUtilizationProto p) {
return new ResourceUtilizationPBImpl(p);
}

private ContainerPBImpl convertFromProtoFormat(
ContainerProto c) {
return new ContainerPBImpl(c);
}

private ContainerProto convertToProtoFormat(
Container c) {
return ((ContainerPBImpl)c).getProto();
}
}
@@ -38,6 +38,7 @@ message NodeStatusProto {
repeated ApplicationIdProto keep_alive_applications = 5;
optional ResourceUtilizationProto containers_utilization = 6;
optional ResourceUtilizationProto node_utilization = 7;
repeated ContainerProto increased_containers = 8;
}

message MasterKeyProto {
@@ -60,4 +61,4 @@ message ResourceUtilizationProto {
optional int32 pmem = 1;
optional int32 vmem = 2;
optional float cpu = 3;
}
}
@@ -82,6 +82,7 @@ message NodeHeartbeatResponseProto {
repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
repeated ContainerProto containers_to_decrease = 12;
}

message SystemCredentialsForAppsProto {
