Skip to content

Commit

Permalink
YARN-7102. NM heartbeat stuck when responseId overflows MAX_INT. Contributed by Botong Huang
Browse files Browse the repository at this point in the history
  • Loading branch information
jlowe committed Jan 25, 2018
1 parent 16be42d commit ff8378e
Show file tree
Hide file tree
Showing 12 changed files with 125 additions and 128 deletions.
Expand Up @@ -71,7 +71,7 @@ public class NMSimulator extends TaskRunner.Task {
// resource manager // resource manager
private ResourceManager rm; private ResourceManager rm;
// heart beat response id // heart beat response id
private int RESPONSE_ID = 1; private int responseId = 0;
private final static Logger LOG = LoggerFactory.getLogger(NMSimulator.class); private final static Logger LOG = LoggerFactory.getLogger(NMSimulator.class);


public void init(String nodeIdStr, Resource nodeResource, public void init(String nodeIdStr, Resource nodeResource,
Expand Down Expand Up @@ -131,7 +131,7 @@ public void middleStep() throws Exception {
ns.setContainersStatuses(generateContainerStatusList()); ns.setContainersStatuses(generateContainerStatusList());
ns.setNodeId(node.getNodeID()); ns.setNodeId(node.getNodeID());
ns.setKeepAliveApplications(new ArrayList<ApplicationId>()); ns.setKeepAliveApplications(new ArrayList<ApplicationId>());
ns.setResponseId(RESPONSE_ID ++); ns.setResponseId(responseId++);
ns.setNodeHealthStatus(NodeHealthStatus.newInstance(true, "", 0)); ns.setNodeHealthStatus(NodeHealthStatus.newInstance(true, "", 0));
beatRequest.setNodeStatus(ns); beatRequest.setNodeStatus(ns);
NodeHeartbeatResponse beatResponse = NodeHeartbeatResponse beatResponse =
Expand Down
Expand Up @@ -144,8 +144,8 @@ public List<ApplicationId> getRunningApps() {
return runningApplications; return runningApplications;
} }


public void updateNodeHeartbeatResponseForCleanup( public void setAndUpdateNodeHeartbeatResponse(
NodeHeartbeatResponse response) { NodeHeartbeatResponse response) {
} }


public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
Expand Down Expand Up @@ -178,13 +178,6 @@ public Set<String> getNodeLabels() {
return RMNodeLabelsManager.EMPTY_STRING_SET; return RMNodeLabelsManager.EMPTY_STRING_SET;
} }


@Override
public void updateNodeHeartbeatResponseForUpdatedContainers(
NodeHeartbeatResponse response) {
// TODO Auto-generated method stub

}

@Override @Override
public List<Container> pullNewlyIncreasedContainers() { public List<Container> pullNewlyIncreasedContainers() {
// TODO Auto-generated method stub // TODO Auto-generated method stub
Expand Down
Expand Up @@ -127,9 +127,9 @@ public List<ApplicationId> getRunningApps() {
} }


@Override @Override
public void updateNodeHeartbeatResponseForCleanup( public void setAndUpdateNodeHeartbeatResponse(
NodeHeartbeatResponse nodeHeartbeatResponse) { NodeHeartbeatResponse nodeHeartbeatResponse) {
node.updateNodeHeartbeatResponseForCleanup(nodeHeartbeatResponse); node.setAndUpdateNodeHeartbeatResponse(nodeHeartbeatResponse);
} }


@Override @Override
Expand Down Expand Up @@ -167,12 +167,6 @@ public Set<String> getNodeLabels() {
return RMNodeLabelsManager.EMPTY_STRING_SET; return RMNodeLabelsManager.EMPTY_STRING_SET;
} }


@Override
public void updateNodeHeartbeatResponseForUpdatedContainers(
NodeHeartbeatResponse response) {
// TODO Auto-generated method stub
}

@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public List<Container> pullNewlyIncreasedContainers() { public List<Container> pullNewlyIncreasedContainers() {
Expand Down
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;


import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -81,6 +82,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStartedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider; import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
Expand Down Expand Up @@ -403,14 +405,37 @@ public RegisterNodeManagerResponse registerNodeManager(
} else { } else {
LOG.info("Reconnect from the node at: " + host); LOG.info("Reconnect from the node at: " + host);
this.nmLivelinessMonitor.unregister(nodeId); this.nmLivelinessMonitor.unregister(nodeId);
// Reset heartbeat ID since node just restarted.
oldNode.resetLastNodeHeartBeatResponse(); if (CollectionUtils.isEmpty(request.getRunningApplications())
this.rmContext && rmNode.getState() != NodeState.DECOMMISSIONING
.getDispatcher() && rmNode.getHttpPort() != oldNode.getHttpPort()) {
.getEventHandler() // Reconnected node differs, so replace old node and start new node
.handle( switch (rmNode.getState()) {
new RMNodeReconnectEvent(nodeId, rmNode, request case RUNNING:
.getRunningApplications(), request.getNMContainerStatuses())); ClusterMetrics.getMetrics().decrNumActiveNodes();
break;
case UNHEALTHY:
ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
break;
default:
LOG.debug("Unexpected Rmnode state");
}
this.rmContext.getDispatcher().getEventHandler()
.handle(new NodeRemovedSchedulerEvent(rmNode));

this.rmContext.getRMNodes().put(nodeId, rmNode);
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeStartedEvent(nodeId, null, null));

} else {
// Reset heartbeat ID since node just restarted.
oldNode.resetLastNodeHeartBeatResponse();

this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeReconnectEvent(nodeId, rmNode,
request.getRunningApplications(),
request.getNMContainerStatuses()));
}
} }
// On every node manager register we will be clearing NMToken keys if // On every node manager register we will be clearing NMToken keys if
// present for any running application. // present for any running application.
Expand Down Expand Up @@ -508,12 +533,13 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)


// 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse(); NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();
if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse if (getNextResponseId(
.getResponseId()) { remoteNodeStatus.getResponseId()) == lastNodeHeartbeatResponse
.getResponseId()) {
LOG.info("Received duplicate heartbeat from node " LOG.info("Received duplicate heartbeat from node "
+ rmNode.getNodeAddress()+ " responseId=" + remoteNodeStatus.getResponseId()); + rmNode.getNodeAddress()+ " responseId=" + remoteNodeStatus.getResponseId());
return lastNodeHeartbeatResponse; return lastNodeHeartbeatResponse;
} else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse } else if (remoteNodeStatus.getResponseId() != lastNodeHeartbeatResponse
.getResponseId()) { .getResponseId()) {
String message = String message =
"Too far behind rm response id:" "Too far behind rm response id:"
Expand Down Expand Up @@ -549,13 +575,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
} }


// Heartbeat response // Heartbeat response
NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils NodeHeartbeatResponse nodeHeartBeatResponse =
.newNodeHeartbeatResponse(lastNodeHeartbeatResponse. YarnServerBuilderUtils.newNodeHeartbeatResponse(
getResponseId() + 1, NodeAction.NORMAL, null, null, null, null, getNextResponseId(lastNodeHeartbeatResponse.getResponseId()),
nextHeartBeatInterval); NodeAction.NORMAL, null, null, null, null, nextHeartBeatInterval);
rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse); rmNode.setAndUpdateNodeHeartbeatResponse(nodeHeartBeatResponse);
rmNode.updateNodeHeartbeatResponseForUpdatedContainers(
nodeHeartBeatResponse);


populateKeys(request, nodeHeartBeatResponse); populateKeys(request, nodeHeartBeatResponse);


Expand All @@ -573,7 +597,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)


// 4. Send status to RMNode, saving the latest response. // 4. Send status to RMNode, saving the latest response.
RMNodeStatusEvent nodeStatusEvent = RMNodeStatusEvent nodeStatusEvent =
new RMNodeStatusEvent(nodeId, remoteNodeStatus, nodeHeartBeatResponse); new RMNodeStatusEvent(nodeId, remoteNodeStatus);
if (request.getLogAggregationReportsForApps() != null if (request.getLogAggregationReportsForApps() != null
&& !request.getLogAggregationReportsForApps().isEmpty()) { && !request.getLogAggregationReportsForApps().isEmpty()) {
nodeStatusEvent.setLogAggregationReportsForApps(request nodeStatusEvent.setLogAggregationReportsForApps(request
Expand Down Expand Up @@ -614,6 +638,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
return nodeHeartBeatResponse; return nodeHeartBeatResponse;
} }


private int getNextResponseId(int responseId) {
// Loop between 0 and Integer.MAX_VALUE
return (responseId + 1) & Integer.MAX_VALUE;
}

private void setAppCollectorsMapToResponse( private void setAppCollectorsMapToResponse(
List<ApplicationId> runningApps, NodeHeartbeatResponse response) { List<ApplicationId> runningApps, NodeHeartbeatResponse response) {
Map<ApplicationId, AppCollectorData> liveAppCollectorsMap = new Map<ApplicationId, AppCollectorData> liveAppCollectorsMap = new
Expand Down
Expand Up @@ -141,10 +141,11 @@ public interface RMNode {


/** /**
* Update a {@link NodeHeartbeatResponse} with the list of containers and * Update a {@link NodeHeartbeatResponse} with the list of containers and
* applications to clean up for this node. * applications to clean up for this node, and the containers to be updated.
*
* @param response the {@link NodeHeartbeatResponse} to update * @param response the {@link NodeHeartbeatResponse} to update
*/ */
void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response); void setAndUpdateNodeHeartbeatResponse(NodeHeartbeatResponse response);


public NodeHeartbeatResponse getLastNodeHeartBeatResponse(); public NodeHeartbeatResponse getLastNodeHeartBeatResponse();


Expand All @@ -167,13 +168,7 @@ public interface RMNode {
* @return labels in this node * @return labels in this node
*/ */
public Set<String> getNodeLabels(); public Set<String> getNodeLabels();


/**
* Update containers to be updated
*/
void updateNodeHeartbeatResponseForUpdatedContainers(
NodeHeartbeatResponse response);

public List<Container> pullNewlyIncreasedContainers(); public List<Container> pullNewlyIncreasedContainers();


OpportunisticContainersStatus getOpportunisticContainersStatus(); OpportunisticContainersStatus getOpportunisticContainersStatus();
Expand Down
Expand Up @@ -598,7 +598,8 @@ public List<ContainerId> getContainersToCleanUp() {
}; };


@Override @Override
public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) { public void setAndUpdateNodeHeartbeatResponse(
NodeHeartbeatResponse response) {
this.writeLock.lock(); this.writeLock.lock();


try { try {
Expand All @@ -613,38 +614,30 @@ public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response
this.finishedApplications.clear(); this.finishedApplications.clear();
this.containersToSignal.clear(); this.containersToSignal.clear();
this.containersToBeRemovedFromNM.clear(); this.containersToBeRemovedFromNM.clear();
} finally {
this.writeLock.unlock();
}
};

@VisibleForTesting
public Collection<Container> getToBeUpdatedContainers() {
return toBeUpdatedContainers.values();
}

@Override
public void updateNodeHeartbeatResponseForUpdatedContainers(
NodeHeartbeatResponse response) {
this.writeLock.lock();

try {
response.addAllContainersToUpdate(toBeUpdatedContainers.values()); response.addAllContainersToUpdate(toBeUpdatedContainers.values());
toBeUpdatedContainers.clear(); toBeUpdatedContainers.clear();


// NOTE: This is required for backward compatibility. // NOTE: This is required for backward compatibility.
response.addAllContainersToDecrease(toBeDecreasedContainers.values()); response.addAllContainersToDecrease(toBeDecreasedContainers.values());
toBeDecreasedContainers.clear(); toBeDecreasedContainers.clear();

// Synchronously update the last response in rmNode with updated
// responseId
this.latestNodeHeartBeatResponse = response;
} finally { } finally {
this.writeLock.unlock(); this.writeLock.unlock();
} }
};

@VisibleForTesting
public Collection<Container> getToBeUpdatedContainers() {
return toBeUpdatedContainers.values();
} }


@Override @Override
public NodeHeartbeatResponse getLastNodeHeartBeatResponse() { public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {

this.readLock.lock(); this.readLock.lock();

try { try {
return this.latestNodeHeartBeatResponse; return this.latestNodeHeartBeatResponse;
} finally { } finally {
Expand Down Expand Up @@ -818,7 +811,6 @@ private static void updateNodeResourceFromEvent(RMNodeImpl rmNode,
private static NodeHealthStatus updateRMNodeFromStatusEvents( private static NodeHealthStatus updateRMNodeFromStatusEvents(
RMNodeImpl rmNode, RMNodeStatusEvent statusEvent) { RMNodeImpl rmNode, RMNodeStatusEvent statusEvent) {
// Switch the last heartbeatresponse. // Switch the last heartbeatresponse.
rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus(); NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus();
rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport()); rmNode.setHealthReport(remoteNodeHealthStatus.getHealthReport());
rmNode.setLastHealthReportTime(remoteNodeHealthStatus rmNode.setLastHealthReportTime(remoteNodeHealthStatus
Expand Down Expand Up @@ -912,21 +904,6 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
rmNode.context.getDispatcher().getEventHandler().handle( rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(rmNode)); new NodeAddedSchedulerEvent(rmNode));
} }
} else {
// Reconnected node differs, so replace old node and start new node
switch (rmNode.getState()) {
case RUNNING:
ClusterMetrics.getMetrics().decrNumActiveNodes();
break;
case UNHEALTHY:
ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
break;
default:
LOG.debug("Unexpected Rmnode state");
}
rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
rmNode.context.getDispatcher().getEventHandler().handle(
new RMNodeStartedEvent(newNode.getNodeID(), null, null));
} }


} else { } else {
Expand Down
Expand Up @@ -27,28 +27,23 @@
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport; import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;


public class RMNodeStatusEvent extends RMNodeEvent { public class RMNodeStatusEvent extends RMNodeEvent {


private final NodeStatus nodeStatus; private final NodeStatus nodeStatus;
private final NodeHeartbeatResponse latestResponse;
private List<LogAggregationReport> logAggregationReportsForApps; private List<LogAggregationReport> logAggregationReportsForApps;


public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus, public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus) {
NodeHeartbeatResponse latestResponse) { this(nodeId, nodeStatus, null);
this(nodeId, nodeStatus, latestResponse, null);
} }


public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus, public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus,
NodeHeartbeatResponse latestResponse,
List<LogAggregationReport> logAggregationReportsForApps) { List<LogAggregationReport> logAggregationReportsForApps) {
super(nodeId, RMNodeEventType.STATUS_UPDATE); super(nodeId, RMNodeEventType.STATUS_UPDATE);
this.nodeStatus = nodeStatus; this.nodeStatus = nodeStatus;
this.latestResponse = latestResponse;
this.logAggregationReportsForApps = logAggregationReportsForApps; this.logAggregationReportsForApps = logAggregationReportsForApps;
} }


Expand All @@ -60,10 +55,6 @@ public List<ContainerStatus> getContainers() {
return this.nodeStatus.getContainersStatuses(); return this.nodeStatus.getContainersStatuses();
} }


public NodeHeartbeatResponse getLatestResponse() {
return this.latestResponse;
}

public List<ApplicationId> getKeepAliveAppIds() { public List<ApplicationId> getKeepAliveAppIds() {
return this.nodeStatus.getKeepAliveApplications(); return this.nodeStatus.getKeepAliveApplications();
} }
Expand Down

0 comments on commit ff8378e

Please sign in to comment.