From 35d57a2d2989024884f7e63906e94e826a2c3801 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 23 Aug 2017 15:56:17 -0700 Subject: [PATCH 1/3] [SPARK-18769][yarn] Limit resource requests based on RM's available resources. This change limits the number of resource requests Spark will make to the YARN RM based on the number of available resources the RM advertises. This lets Spark "play nice" by reducing load on the RM when making requests, and also slightly speeding up the allocation code when the driver thinks it needs a lot of containers but the RM doesn't have enough free resources. --- .../spark/deploy/yarn/YarnAllocator.scala | 70 +++++++++--- .../deploy/yarn/YarnAllocatorSuite.scala | 100 +++++++++++++++++- 2 files changed, 154 insertions(+), 16 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 7052fb347106b..ae667bf3662b1 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -111,6 +111,11 @@ private[yarn] class YarnAllocator( @volatile private var targetNumExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf) + // The number of containers that we think the RM is able to launch given its available + // resources. -1 means the number is not known, so it should be ignored until it becomes + // known. + private var rmAvailableContainers = -1 + private var currentNodeBlacklist = Set.empty[String] // Executor loss reason requests that are pending - maps from executor ID for inquiry to a @@ -268,6 +273,8 @@ private[yarn] class YarnAllocator( val allocatedContainers = allocateResponse.getAllocatedContainers() + val availableResources = allocateResponse.getAvailableResources() + if (allocatedContainers.size > 0) { logDebug(("Allocated containers: %d. Current executor count: %d. " + "Launching executor count: %d. Cluster resources: %s.") @@ -275,7 +282,7 @@ private[yarn] class YarnAllocator( allocatedContainers.size, numExecutorsRunning.get, numExecutorsStarting.get, - allocateResponse.getAvailableResources)) + availableResources)) handleAllocatedContainers(allocatedContainers.asScala) } @@ -287,6 +294,19 @@ private[yarn] class YarnAllocator( logDebug("Finished processing %d completed containers. Current running executor count: %d." .format(completedContainers.size, numExecutorsRunning.get)) } + + // Adjust the number of available containers by checking the RM response. This is a best guess, + // since the data returned isn't fine grained enough; for example you may have many GB of memory + // available but that is the sum of all NMs, it doesn't necessarily mean you can launch a + // container of that size. + // + // This also assumes that all executors are of the same size, which is the only possible + // configuration currently in Spark. + val availableByCores = availableResources.getVirtualCores() / resource.getVirtualCores() + val availableByMemory = availableResources.getMemory() / resource.getMemory() + rmAvailableContainers = math.min(availableByCores, availableByMemory) + logDebug(s"Allocation done, RM can still launch $rmAvailableContainers executors (of size " + + s"$resource) if needed.") } /** @@ -300,12 +320,18 @@ private[yarn] class YarnAllocator( val numPendingAllocate = pendingAllocate.size val missing = targetNumExecutors - numPendingAllocate - numExecutorsStarting.get - numExecutorsRunning.get + + val numberOfRequests = if (rmAvailableContainers > 0) { + math.min(missing, rmAvailableContainers) + } else { + missing + } logDebug(s"Updating resource requests, target: $targetNumExecutors, " + s"pending: $numPendingAllocate, running: ${numExecutorsRunning.get}, " + - s"executorsStarting: ${numExecutorsStarting.get}") + s"executorsStarting: ${numExecutorsStarting.get}, available: $rmAvailableContainers") - if (missing > 0) { - logInfo(s"Will request $missing executor container(s), each with " + + if (numberOfRequests > 0) { + logInfo(s"Will request $numberOfRequests executor container(s), each with " + s"${resource.getVirtualCores} core(s) and " + s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)") @@ -328,7 +354,7 @@ private[yarn] class YarnAllocator( } // consider the number of new containers and cancelled stale containers available - val availableContainers = missing + cancelledContainers + val availableContainers = numberOfRequests + cancelledContainers // to maximize locality, include requests with no locality preference that can be cancelled val potentialContainers = availableContainers + anyHostRequests.size @@ -373,17 +399,31 @@ private[yarn] class YarnAllocator( logInfo(s"Submitted container request for host ${hostStr(request)}.") } } - } else if (numPendingAllocate > 0 && missing < 0) { - val numToCancel = math.min(numPendingAllocate, -missing) - logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " + - s"total $targetNumExecutors executors.") - - val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource) - if (!matchingRequests.isEmpty) { - matchingRequests.iterator().next().asScala - .take(numToCancel).foreach(amClient.removeContainerRequest) + } else if (numPendingAllocate > 0) { + // There are two cases where we may need to reduce the number of requests: + // - when the driver has reduced the number of desired executors (i.e. missing < 0) + // - when the current pending requests exceed the number that YARN can allocate at the + // moment. + val unwanted = if (missing < 0) math.min(numPendingAllocate, -missing) else 0 + val unavailable = if (rmAvailableContainers > 0) { + math.max(numPendingAllocate - rmAvailableContainers, 0) } else { - logWarning("Expected to find pending requests, but found none.") + 0 + } + val numToCancel = math.max(unwanted, unavailable) + + if (numToCancel > 0) { + logInfo(s"Canceling requests for $numToCancel executor container(s); " + + s"desired target = $targetNumExecutors, " + + s"available = $rmAvailableContainers.") + + val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource) + if (!matchingRequests.isEmpty) { + matchingRequests.iterator().next().asScala + .take(numToCancel).foreach(amClient.removeContainerRequest) + } else { + logWarning("Expected to find pending requests, but found none.") + } } } } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index cb1e3c5268510..a5a04161865f0 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -17,14 +17,20 @@ package org.apache.spark.deploy.yarn +import java.util.Collections + import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.mockito.Matchers._ import org.mockito.Mockito._ +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer import org.scalatest.{BeforeAndAfterEach, Matchers} import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} @@ -88,7 +94,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter "--jar", "somejar.jar", "--class", "SomeClass") val sparkConfClone = sparkConf.clone() - sparkConfClone .set("spark.executor.instances", maxExecutors.toString) .set("spark.executor.cores", "5") .set("spark.executor.memory", "2048") @@ -350,4 +355,97 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter clock.advance(50 * 1000L) handler.getNumExecutorsFailed should be (0) } + + test("SPARK-18769: limit requests according to RM's available resources") { + // These match the configuration in createAllocator() + val containerCpu = 5 + val containerMem = (2048 + YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN).toInt + + // Available resources returned by the mock client for the test. + val normal = Resource.newInstance(containerMem * 10, containerCpu * 10) + val lowMem = Resource.newInstance(containerMem * 2, containerCpu * 5) + val lowCpu = Resource.newInstance(containerMem * 5, containerCpu * 1) + + val client = mock(classOf[AMRMClient[ContainerRequest]]) + val response = mock(classOf[AllocateResponse]) + when(response.getAllocatedContainers()).thenReturn(Collections.emptyList(), + Collections.emptyList()) + when(response.getAvailableResources()).thenReturn(normal) + when(client.allocate(anyFloat())).thenReturn(response) + + // Keep track of how many container requests were added to the client. The request count + // needs to be reset to 0 after an allocation request, since the allocator only adds requests + // and rely on the AMRMClient to clean up state internally. + var requestCount = 0 + when(client.addContainerRequest(any(classOf[ContainerRequest]))).thenAnswer( + new Answer[Unit]() { + override def answer(unused: InvocationOnMock): Unit = { + requestCount += 1 + } + } + ) + + val allocator = createAllocator(0, client) + + // First allocation should not create any requests. + allocator.allocateResources() + assert(requestCount === 0) + + // Request 2 executors. + allocator.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(), Set()) + allocator.allocateResources() + assert(requestCount === 2) + requestCount = 0 + + // Switch to "low memory" resources. + when(response.getAvailableResources()).thenReturn(lowMem) + allocator.allocateResources() + requestCount = 0 + + // Try to allocate a new container, verify that only 2 requests remain since that's what + // "lowMem" supports. + allocator.requestTotalExecutorsWithPreferredLocalities(3, 0, Map(), Set()) + allocator.allocateResources() + assert(requestCount === 2) + requestCount = 0 + + // Switch to "low cpu" resources. + when(response.getAvailableResources()).thenReturn(lowCpu) + allocator.allocateResources() + requestCount = 0 + + // This will cause the number of requests to fall to 1, since that's all "lowCpu" allows to run. + allocator.requestTotalExecutorsWithPreferredLocalities(3, 0, Map(), Set()) + allocator.allocateResources() + assert(requestCount === 1) + requestCount = 0 + + // Switch back to normal. + when(response.getAvailableResources()).thenReturn(normal) + allocator.allocateResources() + requestCount = 0 + + allocator.requestTotalExecutorsWithPreferredLocalities(3, 0, Map(), Set()) + allocator.allocateResources() + assert(requestCount === 3) + + // Switch bach to low CPU, and mock some state so that there are a few pending allocation + // requests. This should cause these requests to be removed. + when(response.getAvailableResources()).thenReturn(lowCpu) + allocator.allocateResources() + requestCount = 0 + + val pending = (1 to 10).map { i => + val res = Resource.newInstance(containerMem, containerCpu) + new ContainerRequest(res, Array("*"), null, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY) + }.toList.asJava + doReturn(List(pending).asJava) + .when(client) + .getMatchingRequests(any(classOf[Priority]), anyString(), any(classOf[Resource])) + + allocator.requestTotalExecutorsWithPreferredLocalities(1, 0, Map(), Set()) + allocator.allocateResources() + verify(client, times(9)).removeContainerRequest(any(classOf[ContainerRequest])) + } + } From c356734be82146e258b275d15926da82cd4c359f Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 24 Aug 2017 15:38:44 -0700 Subject: [PATCH 2/3] Fix an edge case. --- .../org/apache/spark/deploy/yarn/YarnAllocator.scala | 2 +- .../apache/spark/deploy/yarn/YarnAllocatorSuite.scala | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index ae667bf3662b1..9191f3c297f3a 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -321,7 +321,7 @@ private[yarn] class YarnAllocator( val missing = targetNumExecutors - numPendingAllocate - numExecutorsStarting.get - numExecutorsRunning.get - val numberOfRequests = if (rmAvailableContainers > 0) { + val numberOfRequests = if (rmAvailableContainers >= 0) { math.min(missing, rmAvailableContainers) } else { missing diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index a5a04161865f0..15da0b6f1a096 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -365,6 +365,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter val normal = Resource.newInstance(containerMem * 10, containerCpu * 10) val lowMem = Resource.newInstance(containerMem * 2, containerCpu * 5) val lowCpu = Resource.newInstance(containerMem * 5, containerCpu * 1) + val empty = Resource.newInstance(0, 0) val client = mock(classOf[AMRMClient[ContainerRequest]]) val response = mock(classOf[AllocateResponse]) @@ -420,6 +421,16 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter assert(requestCount === 1) requestCount = 0 + // Switch to empty. + when(response.getAvailableResources()).thenReturn(empty) + allocator.allocateResources() + requestCount = 0 + + allocator.requestTotalExecutorsWithPreferredLocalities(3, 0, Map(), Set()) + allocator.allocateResources() + assert(requestCount === 0) + requestCount = 0 + // Switch back to normal. when(response.getAvailableResources()).thenReturn(normal) allocator.allocateResources() From 03477f5282ba00b40915bae32dc5bd48c946e27c Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 24 Aug 2017 17:54:16 -0700 Subject: [PATCH 3/3] Style. --- .../org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 15da0b6f1a096..d34dbdcb2a232 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -446,9 +446,10 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter allocator.allocateResources() requestCount = 0 - val pending = (1 to 10).map { i => + val pending = (1 to 10).map { _ => val res = Resource.newInstance(containerMem, containerCpu) - new ContainerRequest(res, Array("*"), null, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY) + new ContainerRequest(res, Array(YarnSparkHadoopUtil.ANY_HOST), null, + YarnSparkHadoopUtil.RM_REQUEST_PRIORITY) }.toList.asJava doReturn(List(pending).asJava) .when(client)