From 4749908356aaf6d8999be45785e7841c3beeda43 Mon Sep 17 00:00:00 2001 From: lianhuiwang Date: Thu, 13 Nov 2014 22:12:31 +0800 Subject: [PATCH] amclient-remove-request --- .../spark/deploy/yarn/YarnAllocationHandler.scala | 3 +++ .../org/apache/spark/deploy/yarn/YarnAllocator.scala | 4 ++++ .../spark/deploy/yarn/YarnAllocationHandler.scala | 12 ++++++++++++ 3 files changed, 19 insertions(+) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index abd37834ed3cc..4d28f1b6ae180 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -125,6 +125,9 @@ private[yarn] class YarnAllocationHandler( releaseList.add(container.getId()) } + override protected def removeContainerRequest() = { + } + private def createRackResourceRequests(hostContainers: List[ResourceRequest]): List[ResourceRequest] = { // First generate modified racks and new set of hosts under it : then issue requests diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index b32e15738f28b..4dc0752596596 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -301,6 +301,7 @@ private[yarn] abstract class YarnAllocator( val executorMemoryOverhead = (executorMemory + memoryOverhead) assert(container.getResource.getMemory >= executorMemoryOverhead) + removeContainerRequest() if (numExecutorsRunningNow > maxExecutors) { logInfo("""Ignoring container %s at host %s, since we already have the required number of @@ -506,6 +507,9 @@ private[yarn] abstract class YarnAllocator( /** Called to release a previously allocated container. */ protected def releaseContainer(container: Container): Unit + /** Called to remove container request. */ + protected def removeContainerRequest(): Unit + /** * Defines the interface for an allocate response from the RM. This is needed since the alpha * and stable interfaces differ here in ways that cannot be fixed using other routes. diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 2bbf5d7db8668..d7754dcdf7d74 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.util.Records +import java.util.concurrent.LinkedBlockingQueue /** * Acquires resources for executors from a ResourceManager and launches executors in new containers. @@ -43,10 +44,20 @@ private[yarn] class YarnAllocationHandler( securityMgr: SecurityManager) extends YarnAllocator(conf, sparkConf, appAttemptId, args, preferredNodes, securityMgr) { + + // ContainerRequests which have been added to AMRMClient. + private val containerRequestList = new LinkedBlockingQueue[ContainerRequest]() + override protected def releaseContainer(container: Container) = { amClient.releaseAssignedContainer(container.getId()) } + override protected def removeContainerRequest() = { + if(!containerRequestList.isEmpty()){ + amClient.removeContainerRequest(containerRequestList.take()) + } + } + // pending isn't used on stable as the AMRMClient handles incremental asks override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = { addResourceRequests(count) @@ -133,6 +144,7 @@ private[yarn] class YarnAllocationHandler( } for (request <- containerRequests) { + containerRequestList.put(request) amClient.addContainerRequest(request) }