Skip to content

Commit

Permalink
[SPARK-21383][YARN] Fix the YarnAllocator allocates more Resource
Browse files Browse the repository at this point in the history
When NodeManagers launching Executors,
the `missing` value will exceed the
real value when the launch is slow, this can lead to YARN allocates more resource.

We add the `numExecutorsRunning` when calculate the `missing` to avoid this.

Test by experiment.

Author: DjvuLee <lihu@bytedance.com>

Closes #18651 from djvulee/YarnAllocate.

(cherry picked from commit 8de080d)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
  • Loading branch information
DjvuLee authored and Marcelo Vanzin committed Jul 25, 2017
1 parent 62ca13d commit e5ec339
Showing 1 changed file with 33 additions and 17 deletions.
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn

import java.util.Collections
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern

import scala.collection.mutable
Expand All @@ -30,7 +31,6 @@ import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.log4j.{Level, Logger}

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
Expand Down Expand Up @@ -80,7 +80,9 @@ private[yarn] class YarnAllocator(
private val releasedContainers = Collections.newSetFromMap[ContainerId](
new ConcurrentHashMap[ContainerId, java.lang.Boolean])

@volatile private var numExecutorsRunning = 0
private val numExecutorsRunning = new AtomicInteger(0)

private val numExecutorsStarting = new AtomicInteger(0)

/**
* Used to generate a unique ID per executor
Expand Down Expand Up @@ -163,7 +165,7 @@ private[yarn] class YarnAllocator(
clock = newClock
}

def getNumExecutorsRunning: Int = numExecutorsRunning
def getNumExecutorsRunning: Int = numExecutorsRunning.get()

def getNumExecutorsFailed: Int = synchronized {
val endTime = clock.getTimeMillis()
Expand Down Expand Up @@ -242,7 +244,7 @@ private[yarn] class YarnAllocator(
if (executorIdToContainer.contains(executorId)) {
val container = executorIdToContainer.get(executorId).get
internalReleaseContainer(container)
numExecutorsRunning -= 1
numExecutorsRunning.decrementAndGet()
} else {
logWarning(s"Attempted to kill unknown executor $executorId!")
}
Expand All @@ -267,10 +269,12 @@ private[yarn] class YarnAllocator(
val allocatedContainers = allocateResponse.getAllocatedContainers()

if (allocatedContainers.size > 0) {
logDebug("Allocated containers: %d. Current executor count: %d. Cluster resources: %s."
logDebug(("Allocated containers: %d. Current executor count: %d. " +
"Launching executor count: %d. Cluster resources: %s.")
.format(
allocatedContainers.size,
numExecutorsRunning,
numExecutorsRunning.get,
numExecutorsStarting.get,
allocateResponse.getAvailableResources))

handleAllocatedContainers(allocatedContainers.asScala)
Expand All @@ -281,7 +285,7 @@ private[yarn] class YarnAllocator(
logDebug("Completed %d containers".format(completedContainers.size))
processCompletedContainers(completedContainers.asScala)
logDebug("Finished processing %d completed containers. Current running executor count: %d."
.format(completedContainers.size, numExecutorsRunning))
.format(completedContainers.size, numExecutorsRunning.get))
}
}

Expand All @@ -294,7 +298,11 @@ private[yarn] class YarnAllocator(
def updateResourceRequests(): Unit = {
val pendingAllocate = getPendingAllocate
val numPendingAllocate = pendingAllocate.size
val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
val missing = targetNumExecutors - numPendingAllocate -
numExecutorsStarting.get - numExecutorsRunning.get
logDebug(s"Updating resource requests, target: $targetNumExecutors, " +
s"pending: $numPendingAllocate, running: ${numExecutorsRunning.get}, " +
s"executorsStarting: ${numExecutorsStarting.get}")

if (missing > 0) {
logInfo(s"Will request $missing executor container(s), each with " +
Expand Down Expand Up @@ -493,7 +501,8 @@ private[yarn] class YarnAllocator(
s"for executor with ID $executorId")

def updateInternalState(): Unit = synchronized {
numExecutorsRunning += 1
numExecutorsRunning.incrementAndGet()
numExecutorsStarting.decrementAndGet()
executorIdToContainer(executorId) = container
containerIdToExecutorId(container.getId) = executorId

Expand All @@ -503,7 +512,8 @@ private[yarn] class YarnAllocator(
allocatedContainerToHostMap.put(containerId, executorHostname)
}

if (numExecutorsRunning < targetNumExecutors) {
if (numExecutorsRunning.get < targetNumExecutors) {
numExecutorsStarting.incrementAndGet()
if (launchContainers) {
launcherPool.execute(new Runnable {
override def run(): Unit = {
Expand All @@ -523,11 +533,16 @@ private[yarn] class YarnAllocator(
).run()
updateInternalState()
} catch {
case NonFatal(e) =>
logError(s"Failed to launch executor $executorId on container $containerId", e)
// Assigned container should be released immediately to avoid unnecessary resource
// occupation.
amClient.releaseAssignedContainer(containerId)
case e: Throwable =>
numExecutorsStarting.decrementAndGet()
if (NonFatal(e)) {
logError(s"Failed to launch executor $executorId on container $containerId", e)
// Assigned container should be released immediately
// to avoid unnecessary resource occupation.
amClient.releaseAssignedContainer(containerId)
} else {
throw e
}
}
}
})
Expand All @@ -537,7 +552,8 @@ private[yarn] class YarnAllocator(
}
} else {
logInfo(("Skip launching executorRunnable as runnning Excecutors count: %d " +
"reached target Executors count: %d.").format(numExecutorsRunning, targetNumExecutors))
"reached target Executors count: %d.").format(
numExecutorsRunning.get, targetNumExecutors))
}
}
}
Expand All @@ -552,7 +568,7 @@ private[yarn] class YarnAllocator(
val exitReason = if (!alreadyReleased) {
// Decrement the number of executors running. The next iteration of
// the ApplicationMaster's reporting thread will take care of allocating.
numExecutorsRunning -= 1
numExecutorsRunning.decrementAndGet()
logInfo("Completed container %s%s (state: %s, exit status: %s)".format(
containerId,
onHostStr,
Expand Down

0 comments on commit e5ec339

Please sign in to comment.