diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 4b7632741cc0b..213d784185946 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerError; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -295,6 +296,10 @@ public boolean accept(SchedulerNode node) { return nodeTracker.getNodes(nodeFilter); } + public List getNodes(final NodeFilter filter) { + return nodeTracker.getNodes(filter); + } + public boolean shouldContainersBeAutoUpdated() { return this.autoUpdateContainers; } @@ -1443,4 +1448,17 @@ public void reinitialize(Configuration conf, RMContext rmContext) throw new IOException(e); } } + + /** + * Default implementation. Always returns false. + * @param appAttempt ApplicationAttempt. + * @param schedulingRequest SchedulingRequest. + * @param schedulerNode SchedulerNode. + * @return Success or not. 
+ */ + @Override + public boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt, + SchedulingRequest schedulingRequest, SchedulerNode schedulerNode) { + return false; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java index d96d62545c85d..5a56ac7eec252 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceScheduler.java @@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; @@ -58,4 +59,16 @@ public interface ResourceScheduler extends YarnScheduler, Recoverable { * @return the number of available {@link NodeId} by resource name. */ List getNodeIds(String resourceName); + + /** + * Attempts to allocate a SchedulingRequest on a Node. + * NOTE: This ignores the numAllocations in the resource sizing and tries + * to allocate a SINGLE container only. + * @param appAttempt ApplicationAttempt. + * @param schedulingRequest SchedulingRequest. + * @param schedulerNode SchedulerNode. + * @return true if proposal was accepted. 
+ */ + boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt, + SchedulingRequest schedulingRequest, SchedulerNode schedulerNode); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 03ca507bd6f39..676c0fe4d60f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -82,6 +84,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import 
org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; @@ -99,7 +102,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; @@ -141,6 +146,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet; import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -596,7 +603,7 @@ public void run() { try { cs.writeLock.lock(); - cs.tryCommit(cs.getClusterResource(), request); + cs.tryCommit(cs.getClusterResource(), request, true); } finally { cs.writeLock.unlock(); } @@ -2551,10 +2558,67 @@ public void 
submitResourceCommitRequest(Resource cluster, resourceCommitterService.addNewCommitRequest(request); } else{ // Otherwise do it sync-ly. - tryCommit(cluster, request); + tryCommit(cluster, request, true); } } + @Override + public boolean attemptAllocationOnNode(SchedulerApplicationAttempt appAttempt, + SchedulingRequest schedulingRequest, SchedulerNode schedulerNode) { + if (schedulingRequest.getResourceSizing() != null) { + if (schedulingRequest.getResourceSizing().getNumAllocations() > 1) { + LOG.warn("The SchedulingRequest has requested more than 1 allocation," + + " but only 1 will be attempted !!"); + } + if (!appAttempt.isStopped()) { + ResourceCommitRequest + resourceCommitRequest = createResourceCommitRequest( + appAttempt, schedulingRequest, schedulerNode); + return tryCommit(getClusterResource(), resourceCommitRequest, false); + } + } + return false; + } + + // This assumes numContainers = 1 for the request. + private ResourceCommitRequest + createResourceCommitRequest(SchedulerApplicationAttempt appAttempt, + SchedulingRequest schedulingRequest, SchedulerNode schedulerNode) { + ContainerAllocationProposal allocated = + null; + Resource resource = schedulingRequest.getResourceSizing().getResources(); + if (Resources.greaterThan(calculator, getClusterResource(), + resource, Resources.none())) { + ContainerId cId = + ContainerId.newContainerId(appAttempt.getApplicationAttemptId(), + appAttempt.getAppSchedulingInfo().getNewContainerId()); + Container container = BuilderUtils.newContainer( + cId, schedulerNode.getNodeID(), schedulerNode.getHttpAddress(), + resource, schedulingRequest.getPriority(), null, + ExecutionType.GUARANTEED, + schedulingRequest.getAllocationRequestId()); + RMContainer rmContainer = new RMContainerImpl(container, + SchedulerRequestKey.extractFrom(container), + appAttempt.getApplicationAttemptId(), container.getNodeId(), + appAttempt.getUser(), rmContext, false); + + allocated = new ContainerAllocationProposal<>( + 
getSchedulerContainer(rmContainer, true), + null, null, NodeType.NODE_LOCAL, NodeType.NODE_LOCAL, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, + resource); + } + + if (null != allocated) { + List> + allocationsList = new ArrayList<>(); + allocationsList.add(allocated); + + return new ResourceCommitRequest<>(allocationsList, null, null); + } + return null; + } + @VisibleForTesting public ResourceCommitRequest createResourceCommitRequest(CSAssignment csAssignment) { @@ -2632,7 +2696,8 @@ public void submitResourceCommitRequest(Resource cluster, } @Override - public void tryCommit(Resource cluster, ResourceCommitRequest r) { + public boolean tryCommit(Resource cluster, ResourceCommitRequest r, + boolean updatePending) { ResourceCommitRequest request = (ResourceCommitRequest) r; @@ -2662,15 +2727,17 @@ public void tryCommit(Resource cluster, ResourceCommitRequest r) { LOG.debug("Try to commit allocation proposal=" + request); } + boolean isSuccess = false; if (attemptId != null) { FiCaSchedulerApp app = getApplicationAttempt(attemptId); // Required sanity check for attemptId - when async-scheduling enabled, // proposal might be outdated if AM failover just finished // and proposal queue was not be consumed in time if (app != null && attemptId.equals(app.getApplicationAttemptId())) { - if (app.accept(cluster, request)) { - app.apply(cluster, request); + if (app.accept(cluster, request, updatePending)) { + app.apply(cluster, request, updatePending); LOG.info("Allocation proposal accepted"); + isSuccess = true; } else{ LOG.info("Failed to accept allocation proposal"); } @@ -2681,6 +2748,7 @@ public void tryCommit(Resource cluster, ResourceCommitRequest r) { } } } + return isSuccess; } public int getAsyncSchedulingPendingBacklogs() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ResourceAllocationCommitter.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ResourceAllocationCommitter.java index bdea97db13479..2e36b2e951493 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ResourceAllocationCommitter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/ResourceAllocationCommitter.java @@ -25,5 +25,15 @@ * plus global scheduling functionality */ public interface ResourceAllocationCommitter { - void tryCommit(Resource cluster, ResourceCommitRequest proposal); + + /** + * Try to commit the allocation Proposal. This also gives the option of + * not updating a pending queued request. + * @param cluster Cluster Resource. + * @param proposal Proposal. + * @param updatePending Decrement pending if successful. + * @return Is successful or not. 
+ */ + boolean tryCommit(Resource cluster, ResourceCommitRequest proposal, + boolean updatePending); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index d6ad2922769c0..4ea03470413c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -375,7 +375,8 @@ private boolean commonCheckContainerAllocation( } public boolean accept(Resource cluster, - ResourceCommitRequest request) { + ResourceCommitRequest request, + boolean checkPending) { ContainerRequest containerRequest = null; boolean reReservation = false; @@ -408,9 +409,11 @@ public boolean accept(Resource cluster, schedulerContainer.getRmContainer().getContainerRequest(); // Check pending resource request - if (!appSchedulingInfo.checkAllocation(allocation.getAllocationLocalityType(), - schedulerContainer.getSchedulerNode(), - schedulerContainer.getSchedulerRequestKey())) { + if (checkPending && + !appSchedulingInfo.checkAllocation( + allocation.getAllocationLocalityType(), + schedulerContainer.getSchedulerNode(), + schedulerContainer.getSchedulerRequestKey())) { if (LOG.isDebugEnabled()) { LOG.debug("No pending resource for: nodeType=" + allocation .getAllocationLocalityType() + ", node=" + schedulerContainer @@ -485,8 +488,8 @@ public boolean accept(Resource cluster, return accepted; } - public void apply(Resource cluster, - 
ResourceCommitRequest request) { + public void apply(Resource cluster, ResourceCommitRequest request, boolean updatePending) { boolean reReservation = false; try { @@ -531,12 +534,15 @@ public void apply(Resource cluster, liveContainers.put(containerId, rmContainer); // Deduct pending resource requests - ContainerRequest containerRequest = appSchedulingInfo.allocate( - allocation.getAllocationLocalityType(), - schedulerContainer.getSchedulerNode(), - schedulerContainer.getSchedulerRequestKey(), - schedulerContainer.getRmContainer().getContainer()); - ((RMContainerImpl) rmContainer).setContainerRequest(containerRequest); + if (updatePending) { + ContainerRequest containerRequest = appSchedulingInfo.allocate( + allocation.getAllocationLocalityType(), + schedulerContainer.getSchedulerNode(), + schedulerContainer.getSchedulerRequestKey(), + schedulerContainer.getRmContainer().getContainer()); + ((RMContainerImpl) rmContainer).setContainerRequest( + containerRequest); + } attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(), allocation.getAllocatedOrReservedResource()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java index 548b909d2332b..eddf8c8c829da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java @@ 
-264,7 +264,7 @@ public void testCommitProposalForFailedAppAttempt() reservedProposals.add(reservedForAttempt1Proposal); ResourceCommitRequest request = new ResourceCommitRequest(null, reservedProposals, null); - scheduler.tryCommit(scheduler.getClusterResource(), request); + scheduler.tryCommit(scheduler.getClusterResource(), request, true); Assert.assertNull("Outdated proposal should not be accepted!", sn2.getReservedContainer()); @@ -385,7 +385,7 @@ public Object answer(InvocationOnMock invocation) throws Exception { // call real apply try { cs.tryCommit((Resource) invocation.getArguments()[0], - (ResourceCommitRequest) invocation.getArguments()[1]); + (ResourceCommitRequest) invocation.getArguments()[1], true); } catch (Exception e) { e.printStackTrace(); Assert.fail(); @@ -393,12 +393,12 @@ public Object answer(InvocationOnMock invocation) throws Exception { isChecked.set(true); } else { cs.tryCommit((Resource) invocation.getArguments()[0], - (ResourceCommitRequest) invocation.getArguments()[1]); + (ResourceCommitRequest) invocation.getArguments()[1], true); } return null; } }).when(spyCs).tryCommit(Mockito.any(Resource.class), - Mockito.any(ResourceCommitRequest.class)); + Mockito.any(ResourceCommitRequest.class), Mockito.anyBoolean()); spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode())); @@ -473,7 +473,7 @@ public void testNodeResourceOverAllocated() newProposals.add(newContainerProposal); ResourceCommitRequest request = new ResourceCommitRequest(newProposals, null, null); - scheduler.tryCommit(scheduler.getClusterResource(), request); + scheduler.tryCommit(scheduler.getClusterResource(), request, true); } // make sure node resource can't be over-allocated! Assert.assertTrue("Node resource is Over-allocated!",