@@ -111,6 +111,11 @@ private[yarn] class YarnAllocator(
@volatile private var targetNumExecutors =
YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)

// The number of containers that we think the RM is able to launch given its available
// resources. -1 means the number is not known, so it should be ignored until it becomes
// known.
private var rmAvailableContainers = -1

private var currentNodeBlacklist = Set.empty[String]

// Executor loss reason requests that are pending - maps from executor ID for inquiry to a
@@ -268,14 +273,16 @@ private[yarn] class YarnAllocator(

val allocatedContainers = allocateResponse.getAllocatedContainers()

val availableResources = allocateResponse.getAvailableResources()

if (allocatedContainers.size > 0) {
logDebug(("Allocated containers: %d. Current executor count: %d. " +
"Launching executor count: %d. Cluster resources: %s.")
.format(
allocatedContainers.size,
numExecutorsRunning.get,
numExecutorsStarting.get,
allocateResponse.getAvailableResources))
availableResources))

handleAllocatedContainers(allocatedContainers.asScala)
}
@@ -287,6 +294,19 @@
logDebug("Finished processing %d completed containers. Current running executor count: %d."
.format(completedContainers.size, numExecutorsRunning.get))
}

// Adjust the number of available containers by checking the RM response. This is a best guess,
// since the data returned isn't fine-grained enough; for example, you may have many GB of memory
// available, but that is the sum across all NMs, so it doesn't necessarily mean you can launch a
// container of that size.
//
// This also assumes that all executors are of the same size, which is the only possible
// configuration currently in Spark.
val availableByCores = availableResources.getVirtualCores() / resource.getVirtualCores()
val availableByMemory = availableResources.getMemory() / resource.getMemory()
rmAvailableContainers = math.min(availableByCores, availableByMemory)
logDebug(s"Allocation done, RM can still launch $rmAvailableContainers executors (of size " +
s"$resource) if needed.")
}

/**
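As a concrete illustration of the estimate above (with hypothetical numbers, not taken from the patch): if the RM reports 40 virtual cores and 81920 MB available while each executor container needs 5 cores and 2432 MB, then availableByCores = 40 / 5 = 8 and availableByMemory = 81920 / 2432 = 33, so rmAvailableContainers = min(8, 33) = 8. The caveat in the comment still applies: those 40 cores may be spread across NodeManagers that individually cannot fit a 5-core container, so 8 is an upper bound rather than a guarantee.
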
@@ -300,12 +320,18 @@
val numPendingAllocate = pendingAllocate.size
val missing = targetNumExecutors - numPendingAllocate -
numExecutorsStarting.get - numExecutorsRunning.get

val numberOfRequests = if (rmAvailableContainers >= 0) {
math.min(missing, rmAvailableContainers)
} else {
missing
}
logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
s"pending: $numPendingAllocate, running: ${numExecutorsRunning.get}, " +
s"executorsStarting: ${numExecutorsStarting.get}")
s"executorsStarting: ${numExecutorsStarting.get}, available: $rmAvailableContainers")

if (missing > 0) {
logInfo(s"Will request $missing executor container(s), each with " +
if (numberOfRequests > 0) {
logInfo(s"Will request $numberOfRequests executor container(s), each with " +
s"${resource.getVirtualCores} core(s) and " +
s"${resource.getMemory} MB memory (including $memoryOverhead MB of overhead)")

@@ -328,7 +354,7 @@
}

// consider the number of new containers and cancelled stale containers available
val availableContainers = missing + cancelledContainers
val availableContainers = numberOfRequests + cancelledContainers

// to maximize locality, include requests with no locality preference that can be cancelled
val potentialContainers = availableContainers + anyHostRequests.size
@@ -373,17 +399,31 @@
logInfo(s"Submitted container request for host ${hostStr(request)}.")
}
}
} else if (numPendingAllocate > 0 && missing < 0) {
val numToCancel = math.min(numPendingAllocate, -missing)
logInfo(s"Canceling requests for $numToCancel executor container(s) to have a new desired " +
s"total $targetNumExecutors executors.")

val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
if (!matchingRequests.isEmpty) {
matchingRequests.iterator().next().asScala
.take(numToCancel).foreach(amClient.removeContainerRequest)
} else if (numPendingAllocate > 0) {
// There are two cases where we may need to reduce the number of requests:
// - when the driver has reduced the number of desired executors (i.e. missing < 0)
// - when the current pending requests exceed the number that YARN can allocate at the
// moment.
val unwanted = if (missing < 0) math.min(numPendingAllocate, -missing) else 0
val unavailable = if (rmAvailableContainers > 0) {
math.max(numPendingAllocate - rmAvailableContainers, 0)
} else {
logWarning("Expected to find pending requests, but found none.")
0
}
val numToCancel = math.max(unwanted, unavailable)

if (numToCancel > 0) {
logInfo(s"Canceling requests for $numToCancel executor container(s); " +
s"desired target = $targetNumExecutors, " +
s"available = $rmAvailableContainers.")

val matchingRequests = amClient.getMatchingRequests(RM_REQUEST_PRIORITY, ANY_HOST, resource)
if (!matchingRequests.isEmpty) {
matchingRequests.iterator().next().asScala
.take(numToCancel).foreach(amClient.removeContainerRequest)
} else {
logWarning("Expected to find pending requests, but found none.")
}
}
}
}
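
For reference, here is a minimal standalone sketch of the request/cancel arithmetic that the updateResourceRequests changes above implement. The object name and the sample numbers are illustrative only and are not part of the YarnAllocator API:

object RequestMath {
  // How many new container requests to submit: cap "missing" by the RM estimate when it is known.
  def newRequests(missing: Int, rmAvailableContainers: Int): Int =
    if (rmAvailableContainers >= 0) math.min(missing, rmAvailableContainers) else missing

  // How many pending requests to cancel: either the driver lowered the target (unwanted) or the
  // RM cannot satisfy all pending requests right now (unavailable).
  def toCancel(missing: Int, numPendingAllocate: Int, rmAvailableContainers: Int): Int = {
    val unwanted = if (missing < 0) math.min(numPendingAllocate, -missing) else 0
    val unavailable =
      if (rmAvailableContainers > 0) math.max(numPendingAllocate - rmAvailableContainers, 0) else 0
    math.max(unwanted, unavailable)
  }

  def main(args: Array[String]): Unit = {
    // Target grew by 3 but the RM only has room for 1 more executor: submit 1 request.
    assert(newRequests(missing = 3, rmAvailableContainers = 1) == 1)
    // Target dropped by 9 while 10 requests are pending and the RM has room for 1: cancel 9.
    assert(toCancel(missing = -9, numPendingAllocate = 10, rmAvailableContainers = 1) == 9)
  }
}

The accompanying test changes below exercise exactly these two paths.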
@@ -17,14 +17,20 @@

package org.apache.spark.deploy.yarn

import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import org.scalatest.{BeforeAndAfterEach, Matchers}

import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
@@ -88,7 +94,6 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
"--jar", "somejar.jar",
"--class", "SomeClass")
val sparkConfClone = sparkConf.clone()
sparkConfClone
.set("spark.executor.instances", maxExecutors.toString)
.set("spark.executor.cores", "5")
.set("spark.executor.memory", "2048")
@@ -350,4 +355,109 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
clock.advance(50 * 1000L)
handler.getNumExecutorsFailed should be (0)
}

test("SPARK-18769: limit requests according to RM's available resources") {
// These match the configuration in createAllocator()
val containerCpu = 5
val containerMem = (2048 + YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN).toInt

// Available resources returned by the mock client for the test.
val normal = Resource.newInstance(containerMem * 10, containerCpu * 10)
val lowMem = Resource.newInstance(containerMem * 2, containerCpu * 5)
val lowCpu = Resource.newInstance(containerMem * 5, containerCpu * 1)
val empty = Resource.newInstance(0, 0)

val client = mock(classOf[AMRMClient[ContainerRequest]])
val response = mock(classOf[AllocateResponse])
when(response.getAllocatedContainers()).thenReturn(Collections.emptyList(),
Collections.emptyList())
when(response.getAvailableResources()).thenReturn(normal)
when(client.allocate(anyFloat())).thenReturn(response)

// Keep track of how many container requests were added to the client. The request count
// needs to be reset to 0 after an allocation request, since the allocator only adds requests
// and relies on the AMRMClient to clean up state internally.
var requestCount = 0
when(client.addContainerRequest(any(classOf[ContainerRequest]))).thenAnswer(
new Answer[Unit]() {
override def answer(unused: InvocationOnMock): Unit = {
requestCount += 1
}
}
)

val allocator = createAllocator(0, client)

// First allocation should not create any requests.
allocator.allocateResources()
assert(requestCount === 0)

// Request 2 executors.
allocator.requestTotalExecutorsWithPreferredLocalities(2, 0, Map(), Set())
allocator.allocateResources()
assert(requestCount === 2)
requestCount = 0

// Switch to "low memory" resources.
when(response.getAvailableResources()).thenReturn(lowMem)
allocator.allocateResources()
requestCount = 0

// Try to allocate a new container and verify that only 2 requests remain, since that's what
// "lowMem" supports.
allocator.requestTotalExecutorsWithPreferredLocalities(3, 0, Map(), Set())
allocator.allocateResources()
assert(requestCount === 2)
requestCount = 0

// Switch to "low cpu" resources.
when(response.getAvailableResources()).thenReturn(lowCpu)
allocator.allocateResources()
requestCount = 0

// This will cause the number of requests to fall to 1, since that's all "lowCpu" can accommodate.
allocator.requestTotalExecutorsWithPreferredLocalities(3, 0, Map(), Set())
allocator.allocateResources()
assert(requestCount === 1)
requestCount = 0

// Switch to empty.
when(response.getAvailableResources()).thenReturn(empty)
allocator.allocateResources()
requestCount = 0

allocator.requestTotalExecutorsWithPreferredLocalities(3, 0, Map(), Set())
allocator.allocateResources()
assert(requestCount === 0)
requestCount = 0

// Switch back to normal.
when(response.getAvailableResources()).thenReturn(normal)
allocator.allocateResources()
requestCount = 0

allocator.requestTotalExecutorsWithPreferredLocalities(3, 0, Map(), Set())
allocator.allocateResources()
assert(requestCount === 3)

// Switch back to low CPU, and mock some state so that there are a few pending allocation
// requests. This should cause these requests to be removed.
when(response.getAvailableResources()).thenReturn(lowCpu)
allocator.allocateResources()
requestCount = 0

val pending = (1 to 10).map { _ =>
val res = Resource.newInstance(containerMem, containerCpu)
new ContainerRequest(res, Array(YarnSparkHadoopUtil.ANY_HOST), null,
YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
}.toList.asJava
doReturn(List(pending).asJava)
.when(client)
.getMatchingRequests(any(classOf[Priority]), anyString(), any(classOf[Resource]))

allocator.requestTotalExecutorsWithPreferredLocalities(1, 0, Map(), Set())
allocator.allocateResources()
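// Expected cancellations: "lowCpu" leaves room for only one more executor (memory allows 5,
// cores allow 1), the target dropped to 1 while 10 requests are pending, so missing = -9 and
// 9 of the pending requests should be removed.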
verify(client, times(9)).removeContainerRequest(any(classOf[ContainerRequest]))
}

}