Skip to content

Commit

Permalink
YARN-3079. Scheduler should also update maximumAllocation when update…
Browse files Browse the repository at this point in the history
…NodeResource. (Zhihai Xu via wangda)
  • Loading branch information
wangdatan committed Jan 29, 2015
1 parent f378491 commit 7882bc0
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 20 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -434,6 +434,9 @@ Release 2.7.0 - UNRELEASED
YARN-3103. AMRMClientImpl does not update AMRM token properly. (Jason Lowe YARN-3103. AMRMClientImpl does not update AMRM token properly. (Jason Lowe
via jianhe) via jianhe)


YARN-3079. Scheduler should also update maximumAllocation when updateNodeResource.
(Zhihai Xu via wangda)

Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18


INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
Expand Down
Expand Up @@ -22,6 +22,8 @@
import java.util.*; import java.util.*;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;


import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -82,8 +84,9 @@ public abstract class AbstractYarnScheduler
private Resource configuredMaximumAllocation; private Resource configuredMaximumAllocation;
private int maxNodeMemory = -1; private int maxNodeMemory = -1;
private int maxNodeVCores = -1; private int maxNodeVCores = -1;
private ReentrantReadWriteLock maximumAllocationLock = private final ReadLock maxAllocReadLock;
new ReentrantReadWriteLock(); private final WriteLock maxAllocWriteLock;

private boolean useConfiguredMaximumAllocationOnly = true; private boolean useConfiguredMaximumAllocationOnly = true;
private long configuredMaximumAllocationWaitTime; private long configuredMaximumAllocationWaitTime;


Expand All @@ -103,6 +106,9 @@ public abstract class AbstractYarnScheduler
*/ */
public AbstractYarnScheduler(String name) { public AbstractYarnScheduler(String name) {
super(name); super(name);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.maxAllocReadLock = lock.readLock();
this.maxAllocWriteLock = lock.writeLock();
} }


@Override @Override
Expand Down Expand Up @@ -157,8 +163,7 @@ public Resource getMinimumResourceCapability() {
@Override @Override
public Resource getMaximumResourceCapability() { public Resource getMaximumResourceCapability() {
Resource maxResource; Resource maxResource;
ReentrantReadWriteLock.ReadLock readLock = maximumAllocationLock.readLock(); maxAllocReadLock.lock();
readLock.lock();
try { try {
if (useConfiguredMaximumAllocationOnly) { if (useConfiguredMaximumAllocationOnly) {
if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp() if (System.currentTimeMillis() - ResourceManager.getClusterTimeStamp()
Expand All @@ -170,22 +175,20 @@ public Resource getMaximumResourceCapability() {
maxResource = Resources.clone(maximumAllocation); maxResource = Resources.clone(maximumAllocation);
} }
} finally { } finally {
readLock.unlock(); maxAllocReadLock.unlock();
} }
return maxResource; return maxResource;
} }


protected void initMaximumResourceCapability(Resource maximumAllocation) { protected void initMaximumResourceCapability(Resource maximumAllocation) {
ReentrantReadWriteLock.WriteLock writeLock = maxAllocWriteLock.lock();
maximumAllocationLock.writeLock();
writeLock.lock();
try { try {
if (this.configuredMaximumAllocation == null) { if (this.configuredMaximumAllocation == null) {
this.configuredMaximumAllocation = Resources.clone(maximumAllocation); this.configuredMaximumAllocation = Resources.clone(maximumAllocation);
this.maximumAllocation = Resources.clone(maximumAllocation); this.maximumAllocation = Resources.clone(maximumAllocation);
} }
} finally { } finally {
writeLock.unlock(); maxAllocWriteLock.unlock();
} }
} }


Expand Down Expand Up @@ -535,19 +538,24 @@ public synchronized void killAllAppsInQueue(String queueName)
*/ */
public synchronized void updateNodeResource(RMNode nm, public synchronized void updateNodeResource(RMNode nm,
ResourceOption resourceOption) { ResourceOption resourceOption) {

SchedulerNode node = getSchedulerNode(nm.getNodeID()); SchedulerNode node = getSchedulerNode(nm.getNodeID());
Resource newResource = resourceOption.getResource(); Resource newResource = resourceOption.getResource();
Resource oldResource = node.getTotalResource(); Resource oldResource = node.getTotalResource();
if(!oldResource.equals(newResource)) { if(!oldResource.equals(newResource)) {
// Log resource change // Log resource change
LOG.info("Update resource on node: " + node.getNodeName() LOG.info("Update resource on node: " + node.getNodeName()
+ " from: " + oldResource + ", to: " + " from: " + oldResource + ", to: "
+ newResource); + newResource);


nodes.remove(nm.getNodeID());
updateMaximumAllocation(node, false);

// update resource to node // update resource to node
node.setTotalResource(newResource); node.setTotalResource(newResource);


nodes.put(nm.getNodeID(), (N)node);
updateMaximumAllocation(node, true);

// update resource to clusterResource // update resource to clusterResource
Resources.subtractFrom(clusterResource, oldResource); Resources.subtractFrom(clusterResource, oldResource);
Resources.addTo(clusterResource, newResource); Resources.addTo(clusterResource, newResource);
Expand All @@ -571,28 +579,27 @@ public Set<String> getPlanQueues() throws YarnException {
} }


protected void updateMaximumAllocation(SchedulerNode node, boolean add) { protected void updateMaximumAllocation(SchedulerNode node, boolean add) {
ReentrantReadWriteLock.WriteLock writeLock = Resource totalResource = node.getTotalResource();
maximumAllocationLock.writeLock(); maxAllocWriteLock.lock();
writeLock.lock();
try { try {
if (add) { // added node if (add) { // added node
int nodeMemory = node.getTotalResource().getMemory(); int nodeMemory = totalResource.getMemory();
if (nodeMemory > maxNodeMemory) { if (nodeMemory > maxNodeMemory) {
maxNodeMemory = nodeMemory; maxNodeMemory = nodeMemory;
maximumAllocation.setMemory(Math.min( maximumAllocation.setMemory(Math.min(
configuredMaximumAllocation.getMemory(), maxNodeMemory)); configuredMaximumAllocation.getMemory(), maxNodeMemory));
} }
int nodeVCores = node.getTotalResource().getVirtualCores(); int nodeVCores = totalResource.getVirtualCores();
if (nodeVCores > maxNodeVCores) { if (nodeVCores > maxNodeVCores) {
maxNodeVCores = nodeVCores; maxNodeVCores = nodeVCores;
maximumAllocation.setVirtualCores(Math.min( maximumAllocation.setVirtualCores(Math.min(
configuredMaximumAllocation.getVirtualCores(), maxNodeVCores)); configuredMaximumAllocation.getVirtualCores(), maxNodeVCores));
} }
} else { // removed node } else { // removed node
if (maxNodeMemory == node.getTotalResource().getMemory()) { if (maxNodeMemory == totalResource.getMemory()) {
maxNodeMemory = -1; maxNodeMemory = -1;
} }
if (maxNodeVCores == node.getTotalResource().getVirtualCores()) { if (maxNodeVCores == totalResource.getVirtualCores()) {
maxNodeVCores = -1; maxNodeVCores = -1;
} }
// We only have to iterate through the nodes if the current max memory // We only have to iterate through the nodes if the current max memory
Expand Down Expand Up @@ -625,7 +632,7 @@ protected void updateMaximumAllocation(SchedulerNode node, boolean add) {
} }
} }
} finally { } finally {
writeLock.unlock(); maxAllocWriteLock.unlock();
} }
} }
} }
Expand Up @@ -20,6 +20,7 @@


import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
Expand Down Expand Up @@ -279,6 +280,67 @@ public void testUpdateMaxAllocationUsesTotal() throws IOException {
} }
} }


@Test
public void testMaxAllocationAfterUpdateNodeResource() throws IOException {
final int configuredMaxVCores = 20;
final int configuredMaxMemory = 10 * 1024;
Resource configuredMaximumResource = Resource.newInstance
(configuredMaxMemory, configuredMaxVCores);

configureScheduler();
YarnConfiguration conf = getConf();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
configuredMaxVCores);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
configuredMaxMemory);
conf.setLong(
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_SCHEDULING_WAIT_MS,
0);

MockRM rm = new MockRM(conf);
try {
rm.start();
AbstractYarnScheduler scheduler = (AbstractYarnScheduler) rm
.getResourceScheduler();
verifyMaximumResourceCapability(configuredMaximumResource, scheduler);

Resource resource1 = Resource.newInstance(2048, 5);
Resource resource2 = Resource.newInstance(4096, 10);
Resource resource3 = Resource.newInstance(512, 1);
Resource resource4 = Resource.newInstance(1024, 2);

RMNode node1 = MockNodes.newNodeInfo(
0, resource1, 1, "127.0.0.2");
scheduler.handle(new NodeAddedSchedulerEvent(node1));
RMNode node2 = MockNodes.newNodeInfo(
0, resource3, 2, "127.0.0.3");
scheduler.handle(new NodeAddedSchedulerEvent(node2));
verifyMaximumResourceCapability(resource1, scheduler);

// increase node1 resource
scheduler.updateNodeResource(node1, ResourceOption.newInstance(
resource2, 0));
verifyMaximumResourceCapability(resource2, scheduler);

// decrease node1 resource
scheduler.updateNodeResource(node1, ResourceOption.newInstance(
resource1, 0));
verifyMaximumResourceCapability(resource1, scheduler);

// increase node2 resource
scheduler.updateNodeResource(node2, ResourceOption.newInstance(
resource4, 0));
verifyMaximumResourceCapability(resource1, scheduler);

// decrease node2 resource
scheduler.updateNodeResource(node2, ResourceOption.newInstance(
resource3, 0));
verifyMaximumResourceCapability(resource1, scheduler);
} finally {
rm.stop();
}
}

private void verifyMaximumResourceCapability( private void verifyMaximumResourceCapability(
Resource expectedMaximumResource, AbstractYarnScheduler scheduler) { Resource expectedMaximumResource, AbstractYarnScheduler scheduler) {


Expand Down

0 comments on commit 7882bc0

Please sign in to comment.