[YARN-11421] Graceful Decommission ignores launched containers and gets deactivated before timeout #5905

Open · wants to merge 6 commits into base: trunk
@@ -1280,6 +1280,16 @@ public static boolean isAclEnabled(Configuration conf) {
public static final int
DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL = 20;

/**
* Delay, in milliseconds, before removing a DECOMMISSIONING node, to account for AM
* containers that have been scheduled on that node. No delay if set to 0.
*/
public static final String RM_DECOMMISSIONING_NODES_WATCHER_DELAY_MS =
RM_PREFIX + "decommissioning-nodes-watcher.delay-ms";

public static final int
DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_DELAY_MS = 0;

////////////////////////////////
// Node Manager Configs
////////////////////////////////

@@ -3635,6 +3635,15 @@
<value>20</value>
</property>

<property>
<description>
Delay, in milliseconds, before removing a DECOMMISSIONING node, to account for AM containers
scheduled on that node. No delay if set to 0. The delay should not be configured to be larger
than RM_AM_EXPIRY_INTERVAL_MS.
</description>
<name>yarn.resourcemanager.decommissioning-nodes-watcher.delay-ms</name>
<value>0</value>
</property>

<property>
<description>
Used to specify custom web services for Resourcemanager. Value can be
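As context for reviewers, here is a minimal sketch of enabling the new delay from code, mirroring how the tests in this PR configure it; a cluster operator would instead set the same yarn.resourcemanager.decommissioning-nodes-watcher.delay-ms key in yarn-site.xml, and the 5-minute value is only an illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

Configuration conf = new YarnConfiguration();
// Hold app-free DECOMMISSIONING nodes for up to 5 minutes (300000 ms) before removal.
conf.setInt(YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_DELAY_MS, 5 * 60 * 1000);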
@@ -114,6 +114,7 @@ void updateTimeout(int timeoutSec) {

private Timer pollTimer;
private MonotonicClock mclock;
private long expireIntvl;

public DecommissioningNodesWatcher(RMContext rmContext) {
this.rmContext = rmContext;
@@ -126,9 +127,21 @@ public void init(Configuration conf) {
YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL,
YarnConfiguration
.DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_POLL_INTERVAL);
this.expireIntvl = setExpireInterval(conf);
pollTimer.schedule(new PollTimerTask(rmContext), 0, (1000L * v));
}

// Compute the decommissioning nodes watcher delay used for delayed removal of
// decommissioning nodes; the delay must not exceed RM_AM_EXPIRY_INTERVAL_MS.
private long setExpireInterval(Configuration conf) {
return Math.min(
conf.getInt(YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_DELAY_MS,
YarnConfiguration.DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_DELAY_MS),
conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS)
);
}

Member: Could we use getTimeDuration()?
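A minimal sketch of what the getTimeDuration() suggestion could look like; the constant names are the ones added by this patch, java.util.concurrent.TimeUnit would need to be imported, and the rewrite itself is illustrative rather than part of the change:

private long setExpireInterval(Configuration conf) {
  // getTimeDuration() also accepts unit-suffixed values such as "300s" or "5m"
  // and normalizes them to the requested unit (milliseconds here).
  return Math.min(
      conf.getTimeDuration(YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_DELAY_MS,
          YarnConfiguration.DEFAULT_RM_DECOMMISSIONING_NODES_WATCHER_DELAY_MS,
          TimeUnit.MILLISECONDS),
      conf.getLong(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
          YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS));
}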

/**
* Update rmNode decommissioning status based on NodeStatus.
* @param rmNode The node
@@ -221,10 +234,14 @@ public enum DecommissioningNodeStatus {

// The node has already been decommissioned
DECOMMISSIONED,

// wait for possibility of scheduled AM containers
WAIT_SCHEDULED_APPS
}

public boolean checkReadyToBeDecommissioned(NodeId nodeId) {
DecommissioningNodeStatus s = checkDecommissioningStatus(nodeId);
LOG.debug("checkReadyToBeDecommissioned {} status {}.", nodeId, s);
return (s == DecommissioningNodeStatus.READY ||
s == DecommissioningNodeStatus.TIMEOUT);
}
@@ -247,7 +264,15 @@ public DecommissioningNodeStatus checkDecommissioningStatus(NodeId nodeId) {
}

if (context.appIds.size() == 0) {
return DecommissioningNodeStatus.READY;
// wait for am expire interval or decommission timeout whichever is smaller
// if decommission timeout is negative, use am expire interval
long effectiveTimeout = context.timeoutMs > 0 ? Math.min(context.timeoutMs, expireIntvl) :
expireIntvl;
LOG.debug("checkReadyToBeDecommissioned {}, context.timeoutMs={}, expireIntvl={}, " +
"waitTime={}, effectiveTimeout={}", nodeId, context.timeoutMs, expireIntvl, waitTime,
effectiveTimeout);
return waitTime >= effectiveTimeout?
DecommissioningNodeStatus.READY : DecommissioningNodeStatus.WAIT_SCHEDULED_APPS;
} else {
return (context.timeoutMs < 0 || waitTime < context.timeoutMs)?
DecommissioningNodeStatus.WAIT_APP :
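To make the new wait arithmetic concrete, a worked example (illustration only, not part of the patch) using the values from the watcher test below, a 600-second graceful decommission timeout and a 300000 ms watcher delay, together with the default AM expiry interval of 600000 ms:

// expireIntvl = min(watcher delay, RM_AM_EXPIRY_INTERVAL_MS) = min(300000, 600000)
long expireIntvl = Math.min(300_000L, 600_000L);           // 300000 ms
// For a node with no tracked apps, the effective wait is the smaller of the
// decommission timeout and expireIntvl (or expireIntvl alone if the timeout is negative).
long timeoutMs = 600_000L;                                  // 600 s graceful timeout
long effectiveTimeout = timeoutMs > 0 ? Math.min(timeoutMs, expireIntvl) : expireIntvl;
// effectiveTimeout == 300000, so the node is reported WAIT_SCHEDULED_APPS until its
// wait time reaches 300000 ms, and READY afterwards.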
@@ -1439,10 +1439,15 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
* @return true if node has any AM scheduled on it.
*/
private boolean hasScheduledAMContainers(RMNodeImpl rmNode) {
return rmNode.context.getScheduler()
boolean hasScheduledAMContainers = rmNode.context.getScheduler()
.getSchedulerNode(rmNode.getNodeID())
.getCopiedListOfRunningContainers()
.stream().anyMatch(RMContainer::isAMContainer);
if (hasScheduledAMContainers) {
LOG.info("Node " + rmNode.nodeId + " has AM containers scheduled on it."
+ " Will not deactivate it.");
}
return hasScheduledAMContainers;
}
}

Member: Use {} logger format.
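For reference, a sketch of the parameterized form the reviewer asks for, assuming LOG here is an SLF4J logger (which the {} placeholder syntax implies):

if (hasScheduledAMContainers) {
  LOG.info("Node {} has AM containers scheduled on it. Will not deactivate it.",
      rmNode.nodeId);
}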

@@ -162,6 +162,80 @@ public void testDecommissioningNodesWatcherWithPreviousRunningApps()
watcher.checkDecommissioningStatus(id1));
}

@Test
public void testDecommissioningNodesWatcherWithScheduledAMContainers()
throws Exception {
Configuration conf = new Configuration();
// decommission timeout is 10 min
conf.set(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT, "600");
Member: setInt or ideally setTimeDuration?

// watcher delay is 5min
conf.set(YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_DELAY_MS, "300000");

rm = new MockRM(conf);
rm.start();

DecommissioningNodesWatcher watcher =
new DecommissioningNodesWatcher(rm.getRMContext());
watcher.init(conf);

MockNM nm1 = rm.registerNode("host1:1234", 10240);
RMNodeImpl node1 =
(RMNodeImpl) rm.getRMContext().getRMNodes().get(nm1.getNodeId());
NodeId id1 = nm1.getNodeId();

rm.waitForState(id1, NodeState.RUNNING);

// just submit the app
RMApp app = MockRMAppSubmitter.submit(rm,
MockRMAppSubmissionData.Builder.createWithMemory(2000, rm).build());
MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);

// rmnode still thinks there are 0 apps because it hasn't received updated node status
Assert.assertEquals(0, node1.getRunningApps().size());

// Setup nm1 as DECOMMISSIONING for DecommissioningNodesWatcher. Right now
// there is no container running on the node.
rm.sendNodeGracefulDecommission(nm1,
YarnConfiguration.DEFAULT_RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT);
rm.waitForState(id1, NodeState.DECOMMISSIONING);

// we should still get WAIT_SCHEDULED_APPS as expiry time is not over
NodeHealthStatus status = NodeHealthStatus.newInstance(true, "",
System.currentTimeMillis() - 1000);
Member: Indentation looks funny.

NodeStatus nodeStatus = NodeStatus.newInstance(id1, 0,
new ArrayList<>(), Collections.emptyList(), status, null, null, null);
watcher.update(node1, nodeStatus);

Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
Assert.assertEquals(DecommissioningNodeStatus.WAIT_SCHEDULED_APPS,
watcher.checkDecommissioningStatus(id1));

// update node with 3 running containers
nodeStatus = createNodeStatus(id1, app, 3);
node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
watcher.update(node1, nodeStatus);
Assert.assertEquals(1, node1.getRunningApps().size());
Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
Assert.assertEquals(DecommissioningNodeStatus.WAIT_CONTAINER,
watcher.checkDecommissioningStatus(id1));

// update node with 0 running containers
nodeStatus = createNodeStatus(id1, app, 0);
node1.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeStatus));
watcher.update(node1, nodeStatus);
Assert.assertFalse(watcher.checkReadyToBeDecommissioned(id1));
// we still get status as WAIT_APP since app is still running even if
// containers are 0
Assert.assertEquals(DecommissioningNodeStatus.WAIT_APP,
watcher.checkDecommissioningStatus(id1));

// Set the app to FINISHED and verify DecommissioningNodeStatus is READY.
MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
Assert.assertEquals(0, node1.getRunningApps().size());
watcher.update(node1, nodeStatus);
Member: What do we update this for if we don't assert later?

}
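Following up on the setInt/setTimeDuration comment above, a sketch of what the setTimeDuration() variant could look like for the two settings in this test. It is illustrative only: setTimeDuration() stores a unit-suffixed value, so it assumes the corresponding keys are read with getTimeDuration(), as also suggested for the watcher earlier in this review.

// Requires java.util.concurrent.TimeUnit.
conf.setTimeDuration(YarnConfiguration.RM_NODE_GRACEFUL_DECOMMISSION_TIMEOUT,
    10, TimeUnit.MINUTES);   // decommission timeout: 10 minutes
conf.setTimeDuration(YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_DELAY_MS,
    5, TimeUnit.MINUTES);    // watcher delay: 5 minutes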

@After
public void tearDown() {
if (rm != null) {
@@ -351,6 +351,65 @@ public void testGracefulDecommissionNoApp() throws Exception {
Assert.assertEquals(NodeAction.SHUTDOWN, nodeHeartbeat3.getNodeAction());
}

@Test (timeout = 60000)
public void testGracefulDecommissionRaceCondition() throws Exception {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH, hostFile
.getAbsolutePath());
conf.set(YarnConfiguration.RM_DECOMMISSIONING_NODES_WATCHER_DELAY_MS, "120000");

writeToHostsFile("");
rm = new MockRM(conf);
rm.start();

MockNM nm1 = rm.registerNode("host1:1234", 10240);
Member: Make it 10*1024.

MockNM nm2 = rm.registerNode("host2:5678", 20480);
MockNM nm3 = rm.registerNode("host3:4433", 10240);

NodeId id1 = nm1.getNodeId();
NodeId id2 = nm2.getNodeId();
NodeId id3 = nm3.getNodeId();

rm.waitForState(id1, NodeState.RUNNING);
rm.waitForState(id2, NodeState.RUNNING);
rm.waitForState(id3, NodeState.RUNNING);

// Create an app and schedule AM on host1.
RMApp app = MockRMAppSubmitter.submitWithMemory(2000, rm);
MockAM am = MockRM.launchAM(app, rm, nm1);

// Before sending heartbeat we gracefully decommission the node on which AM
// is scheduled to simulate race condition.
writeToHostsFile("host1", "host3");
rm.getNodesListManager().refreshNodesGracefully(conf, 30);
rm.waitForState(id1, NodeState.DECOMMISSIONING);
rm.waitForState(id3, NodeState.DECOMMISSIONING);

nm1.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
// host1 should stay in DECOMMISSIONING as we are waiting for the AM launch timeout
// and there are scheduled AM containers on host1.
// host3 should be decommissioned as there is no container scheduled on it.
rm.waitForState(id1, NodeState.DECOMMISSIONING);
rm.waitForState(id3, NodeState.DECOMMISSIONED);

ApplicationAttemptId aaid = app.getCurrentAppAttempt().getAppAttemptId();
nm1.nodeHeartbeat(aaid, 1, ContainerState.RUNNING);
nm3.nodeHeartbeat(true);

// host1 stays in DECOMMISSIONING due to containers on the node; host3 should be
// DECOMMISSIONED because it has no containers.
rm.waitForState(id1, NodeState.DECOMMISSIONING);
rm.waitForState(id3, NodeState.DECOMMISSIONED);

am.registerAppAttempt();
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
MockRM.finishAMAndVerifyAppState(app, rm, nm1, am);
nm1.nodeHeartbeat(aaid, 1, ContainerState.COMPLETE);
rm.waitForState(app.getApplicationId(), RMAppState.FINISHED);
rm.waitForState(id1, NodeState.DECOMMISSIONED);
}

@Test
public void testGracefulDecommissionDefaultTimeoutResolution()
throws Exception {
@@ -160,9 +160,10 @@ Following are the sub-status of a decommissioning node:
Configuration
--------

| Property | Value |
|----------|-------|
| yarn.resourcemanager.nodemanager-graceful-decommission-timeout-secs | Timeout in seconds for YARN node graceful decommission. This is the maximal time to wait for running containers and applications to complete before transitioning a DECOMMISSIONING node into DECOMMISSIONED. The default value is 3600 seconds. A negative value (like -1) is handled as an infinite timeout. |
| yarn.resourcemanager.decommissioning-nodes-watcher.poll-interval-secs | Period in seconds of the poll timer task inside DecommissioningNodesWatcher that identifies and takes care of DECOMMISSIONING nodes missing regular heartbeats. The default value is 20 seconds. |
| yarn.resourcemanager.decommissioning-nodes-watcher.delay-ms | Delay, in milliseconds, before removing a DECOMMISSIONING node that has no running applications, to account for AM containers scheduled on that node. The default value is 0, which means no delay. The delay should not be configured to be larger than RM_AM_EXPIRY_INTERVAL_MS. |
| yarn.resourcemanager.nodes.exclude-path | Path to file with nodes to exclude. |
| yarn.resourcemanager.nodes.include-path | Path to file with nodes to include. |