YARN-7670. Modifications to the ResourceScheduler API to support SchedulingRequests. (asuresh)
xslogic committed Jan 31, 2018
1 parent 801c098 commit 88d8d3f
Showing 6 changed files with 138 additions and 23 deletions.
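
At a high level, the patch adds a ResourceScheduler#attemptAllocationOnNode method so that a component which has already chosen a node for a SchedulingRequest (for example, one resolving placement constraints) can ask the scheduler to commit a single container on that node and learn from the boolean result whether the proposal was accepted. A minimal caller sketch, assuming a helper that walks a list of candidate nodes; the class and method names below are illustrative and not part of this commit:

import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;

// Illustrative sketch only -- not part of this commit.
public class NodePlacementSketch {

  // Try the chosen nodes one by one; the scheduler attempts a SINGLE
  // container per call, regardless of numAllocations in the resource sizing.
  static boolean tryPlace(ResourceScheduler scheduler,
      SchedulerApplicationAttempt appAttempt, SchedulingRequest request,
      Iterable<SchedulerNode> candidateNodes) {
    for (SchedulerNode node : candidateNodes) {
      if (scheduler.attemptAllocationOnNode(appAttempt, request, node)) {
        return true;  // proposal accepted and committed
      }
    }
    return false;     // every proposal was rejected; the caller may re-plan
  }
}
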
@@ -53,6 +53,7 @@
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerError;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -295,6 +296,10 @@ public boolean accept(SchedulerNode node) {
     return nodeTracker.getNodes(nodeFilter);
   }
 
+  public List<N> getNodes(final NodeFilter filter) {
+    return nodeTracker.getNodes(filter);
+  }
+
   public boolean shouldContainersBeAutoUpdated() {
     return this.autoUpdateContainers;
   }
@@ -1443,4 +1448,17 @@ public void reinitialize(Configuration conf, RMContext rmContext)
       throw new IOException(e);
     }
   }
+
+  /**
+   * Default implementation. Always returns false.
+   * @param appAttempt ApplicationAttempt.
+   * @param schedulingRequest SchedulingRequest.
+   * @param schedulerNode SchedulerNode.
+   * @return Success or not.
+   */
+  @Override
+  public boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt,
+      SchedulingRequest schedulingRequest, SchedulerNode schedulerNode) {
+    return false;
+  }
 }
@@ -25,6 +25,7 @@
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
 
@@ -58,4 +59,16 @@ public interface ResourceScheduler extends YarnScheduler, Recoverable {
    * @return the number of available {@link NodeId} by resource name.
    */
   List<NodeId> getNodeIds(String resourceName);
+
+  /**
+   * Attempts to allocate a SchedulerRequest on a Node.
+   * NOTE: This ignores the numAllocations in the resource sizing and tries
+   * to allocate a SINGLE container only.
+   * @param appAttempt ApplicationAttempt.
+   * @param schedulingRequest SchedulingRequest.
+   * @param schedulerNode SchedulerNode.
+   * @return true if proposal was accepted.
+   */
+  boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt,
+      SchedulingRequest schedulingRequest, SchedulerNode schedulerNode);
 }
@@ -50,6 +50,7 @@
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -59,6 +60,7 @@
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -82,6 +84,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
@@ -99,7 +102,9 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
@@ -141,6 +146,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -596,7 +603,7 @@ public void run() {
 
         try {
           cs.writeLock.lock();
-          cs.tryCommit(cs.getClusterResource(), request);
+          cs.tryCommit(cs.getClusterResource(), request, true);
         } finally {
           cs.writeLock.unlock();
         }
@@ -2551,10 +2558,67 @@ public void submitResourceCommitRequest(Resource cluster,
       resourceCommitterService.addNewCommitRequest(request);
     } else{
       // Otherwise do it sync-ly.
-      tryCommit(cluster, request);
+      tryCommit(cluster, request, true);
     }
   }
 
+  @Override
+  public boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt,
+      SchedulingRequest schedulingRequest, SchedulerNode schedulerNode) {
+    if (schedulingRequest.getResourceSizing() != null) {
+      if (schedulingRequest.getResourceSizing().getNumAllocations() > 1) {
+        LOG.warn("The SchedulingRequest has requested more than 1 allocation," +
+            " but only 1 will be attempted !!");
+      }
+      if (!appAttempt.isStopped()) {
+        ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>
+            resourceCommitRequest = createResourceCommitRequest(
+            appAttempt, schedulingRequest, schedulerNode);
+        return tryCommit(getClusterResource(), resourceCommitRequest, false);
+      }
+    }
+    return false;
+  }
+
+  // This assumes numContainers = 1 for the request.
+  private ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>
+      createResourceCommitRequest(SchedulerApplicationAttempt appAttempt,
+      SchedulingRequest schedulingRequest, SchedulerNode schedulerNode) {
+    ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocated =
+        null;
+    Resource resource = schedulingRequest.getResourceSizing().getResources();
+    if (Resources.greaterThan(calculator, getClusterResource(),
+        resource, Resources.none())) {
+      ContainerId cId =
+          ContainerId.newContainerId(appAttempt.getApplicationAttemptId(),
+              appAttempt.getAppSchedulingInfo().getNewContainerId());
+      Container container = BuilderUtils.newContainer(
+          cId, schedulerNode.getNodeID(), schedulerNode.getHttpAddress(),
+          resource, schedulingRequest.getPriority(), null,
+          ExecutionType.GUARANTEED,
+          schedulingRequest.getAllocationRequestId());
+      RMContainer rmContainer = new RMContainerImpl(container,
+          SchedulerRequestKey.extractFrom(container),
+          appAttempt.getApplicationAttemptId(), container.getNodeId(),
+          appAttempt.getUser(), rmContext, false);
+
+      allocated = new ContainerAllocationProposal<>(
+          getSchedulerContainer(rmContainer, true),
+          null, null, NodeType.NODE_LOCAL, NodeType.NODE_LOCAL,
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
+          resource);
+    }
+
+    if (null != allocated) {
+      List<ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>>
+          allocationsList = new ArrayList<>();
+      allocationsList.add(allocated);
+
+      return new ResourceCommitRequest<>(allocationsList, null, null);
+    }
+    return null;
+  }
+
   @VisibleForTesting
   public ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>
       createResourceCommitRequest(CSAssignment csAssignment) {
@@ -2632,7 +2696,8 @@ public void submitResourceCommitRequest(Resource cluster,
   }
 
   @Override
-  public void tryCommit(Resource cluster, ResourceCommitRequest r) {
+  public boolean tryCommit(Resource cluster, ResourceCommitRequest r,
+      boolean updatePending) {
     ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request =
         (ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>) r;
 
@@ -2662,15 +2727,17 @@ public void tryCommit(Resource cluster, ResourceCommitRequest r) {
       LOG.debug("Try to commit allocation proposal=" + request);
     }
 
+    boolean isSuccess = false;
     if (attemptId != null) {
       FiCaSchedulerApp app = getApplicationAttempt(attemptId);
       // Required sanity check for attemptId - when async-scheduling enabled,
       // proposal might be outdated if AM failover just finished
       // and proposal queue was not be consumed in time
       if (app != null && attemptId.equals(app.getApplicationAttemptId())) {
-        if (app.accept(cluster, request)) {
-          app.apply(cluster, request);
+        if (app.accept(cluster, request, updatePending)) {
+          app.apply(cluster, request, updatePending);
           LOG.info("Allocation proposal accepted");
+          isSuccess = true;
         } else{
           LOG.info("Failed to accept allocation proposal");
         }
@@ -2681,6 +2748,7 @@ public void tryCommit(Resource cluster, ResourceCommitRequest r) {
         }
       }
     }
+    return isSuccess;
   }
 
   public int getAsyncSchedulingPendingBacklogs() {
@@ -25,5 +25,15 @@
  * plus global scheduling functionality
  */
 public interface ResourceAllocationCommitter {
-  void tryCommit(Resource cluster, ResourceCommitRequest proposal);
+
+  /**
+   * Try to commit the allocation Proposal. This also gives the option of
+   * not updating a pending queued request.
+   * @param cluster Cluster Resource.
+   * @param proposal Proposal.
+   * @param updatePending Decrement pending if successful.
+   * @return Is successful or not.
+   */
+  boolean tryCommit(Resource cluster, ResourceCommitRequest proposal,
+      boolean updatePending);
 }
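
Taken together with the scheduler changes above, the revised ResourceAllocationCommitter contract distinguishes two commit modes. A small sketch of the distinction, using only types that appear in this diff (the method and variable names are assumed, and imports are omitted):

  // Sketch only -- not part of this commit.
  static boolean commitModes(ResourceAllocationCommitter committer,
      Resource cluster, ResourceCommitRequest fromPendingAsks,
      ResourceCommitRequest fromSchedulingRequest) {
    // Normal scheduling path: the proposal originates from pending
    // ResourceRequests, so accept() checks the pending ask and apply()
    // decrements it.
    committer.tryCommit(cluster, fromPendingAsks, true);

    // Direct placement of a SchedulingRequest via attemptAllocationOnNode:
    // nothing is pending, so the check and decrement are skipped and only
    // the boolean outcome is reported back to the caller.
    return committer.tryCommit(cluster, fromSchedulingRequest, false);
  }
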
@@ -375,7 +375,8 @@ private boolean commonCheckContainerAllocation(
   }
 
   public boolean accept(Resource cluster,
-      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
+      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request,
+      boolean checkPending) {
     ContainerRequest containerRequest = null;
     boolean reReservation = false;
 
@@ -408,9 +409,11 @@ public boolean accept(Resource cluster,
           schedulerContainer.getRmContainer().getContainerRequest();
 
       // Check pending resource request
-      if (!appSchedulingInfo.checkAllocation(allocation.getAllocationLocalityType(),
-          schedulerContainer.getSchedulerNode(),
-          schedulerContainer.getSchedulerRequestKey())) {
+      if (checkPending &&
+          !appSchedulingInfo.checkAllocation(
+              allocation.getAllocationLocalityType(),
+              schedulerContainer.getSchedulerNode(),
+              schedulerContainer.getSchedulerRequestKey())) {
         if (LOG.isDebugEnabled()) {
           LOG.debug("No pending resource for: nodeType=" + allocation
               .getAllocationLocalityType() + ", node=" + schedulerContainer
@@ -485,8 +488,8 @@ public boolean accept(Resource cluster,
     return accepted;
   }
 
-  public void apply(Resource cluster,
-      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
+  public void apply(Resource cluster, ResourceCommitRequest<FiCaSchedulerApp,
+      FiCaSchedulerNode> request, boolean updatePending) {
     boolean reReservation = false;
 
     try {
@@ -531,12 +534,15 @@ public void apply(Resource cluster,
         liveContainers.put(containerId, rmContainer);
 
         // Deduct pending resource requests
-        ContainerRequest containerRequest = appSchedulingInfo.allocate(
-            allocation.getAllocationLocalityType(),
-            schedulerContainer.getSchedulerNode(),
-            schedulerContainer.getSchedulerRequestKey(),
-            schedulerContainer.getRmContainer().getContainer());
-        ((RMContainerImpl) rmContainer).setContainerRequest(containerRequest);
+        if (updatePending) {
+          ContainerRequest containerRequest = appSchedulingInfo.allocate(
+              allocation.getAllocationLocalityType(),
+              schedulerContainer.getSchedulerNode(),
+              schedulerContainer.getSchedulerRequestKey(),
+              schedulerContainer.getRmContainer().getContainer());
+          ((RMContainerImpl) rmContainer).setContainerRequest(
+              containerRequest);
+        }
 
         attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(),
             allocation.getAllocatedOrReservedResource());
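
The accept/apply changes above keep the two phases symmetric: with checkPending=false, accept() skips the lookup of a matching pending ask, and with updatePending=false, apply() skips decrementing it, since a directly placed SchedulingRequest never entered AppSchedulingInfo as a ResourceRequest. A simplified sketch of the pairing as the scheduler's tryCommit() drives it (names assumed, raw types used for brevity):

  // Sketch only -- not part of this commit.
  static boolean acceptThenApply(FiCaSchedulerApp app, Resource cluster,
      ResourceCommitRequest request, boolean updatePending) {
    // Phase 1: validate the proposal; pending-ask checks only make sense
    // when the proposal was generated from pending ResourceRequests.
    if (!app.accept(cluster, request, updatePending)) {
      return false;
    }
    // Phase 2: mutate scheduler state; the pending ask is decremented in
    // the same case only.
    app.apply(cluster, request, updatePending);
    return true;
  }
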
@@ -264,7 +264,7 @@ public void testCommitProposalForFailedAppAttempt()
     reservedProposals.add(reservedForAttempt1Proposal);
     ResourceCommitRequest request =
         new ResourceCommitRequest(null, reservedProposals, null);
-    scheduler.tryCommit(scheduler.getClusterResource(), request);
+    scheduler.tryCommit(scheduler.getClusterResource(), request, true);
     Assert.assertNull("Outdated proposal should not be accepted!",
         sn2.getReservedContainer());
 
@@ -385,20 +385,20 @@ public Object answer(InvocationOnMock invocation) throws Exception {
             // call real apply
             try {
               cs.tryCommit((Resource) invocation.getArguments()[0],
-                  (ResourceCommitRequest) invocation.getArguments()[1]);
+                  (ResourceCommitRequest) invocation.getArguments()[1], true);
             } catch (Exception e) {
               e.printStackTrace();
               Assert.fail();
             }
             isChecked.set(true);
           } else {
             cs.tryCommit((Resource) invocation.getArguments()[0],
-                (ResourceCommitRequest) invocation.getArguments()[1]);
+                (ResourceCommitRequest) invocation.getArguments()[1], true);
           }
           return null;
         }
       }).when(spyCs).tryCommit(Mockito.any(Resource.class),
-          Mockito.any(ResourceCommitRequest.class));
+          Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean());
 
       spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
 
@@ -473,7 +473,7 @@ public void testNodeResourceOverAllocated()
       newProposals.add(newContainerProposal);
       ResourceCommitRequest request =
           new ResourceCommitRequest(newProposals, null, null);
-      scheduler.tryCommit(scheduler.getClusterResource(), request);
+      scheduler.tryCommit(scheduler.getClusterResource(), request, true);
     }
     // make sure node resource can't be over-allocated!
     Assert.assertTrue("Node resource is Over-allocated!",
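
The test changes above only adapt existing call sites to the three-argument tryCommit. One further check that could be written (a sketch, not part of this patch, under the assumption that FifoScheduler does not override attemptAllocationOnNode at this point and therefore inherits the default added in AbstractYarnScheduler) is that schedulers without direct-placement support reject such attempts:

import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

// Sketch only -- not part of this commit.
public class TestDefaultAttemptAllocationOnNode {

  @Test
  public void testDefaultImplementationRejectsDirectPlacement() {
    // The AbstractYarnScheduler default simply returns false; FifoScheduler
    // picks it up because it does not override the new method.
    FifoScheduler scheduler = new FifoScheduler();
    Assert.assertFalse(scheduler.attemptAllocationOnNode(
        Mockito.mock(SchedulerApplicationAttempt.class),
        Mockito.mock(SchedulingRequest.class),
        Mockito.mock(SchedulerNode.class)));
  }
}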
