[SPARK-27094][YARN] Work around RackResolver swallowing thread interrupt.

To avoid the case where the YARN libraries would swallow the interrupt and
prevent YarnAllocator from shutting down, call the offending code in a
separate thread, so that the parent thread can respond appropriately to
the shutdown.
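
A minimal, standalone sketch of that pattern (runInterruptibly and body are
illustrative names, not Spark API; the real code lives in YarnAllocator's
rack-resolution path):

    // Run code that may swallow interrupts in a helper thread, so the calling
    // thread still observes its own interrupt via Thread.join().
    def runInterruptibly(body: => Unit): Unit = {
      @volatile var failure: Option[Throwable] = None
      val helper = new Thread("interrupt-shield") {
        override def run(): Unit = {
          try {
            body // e.g. a library call that eats InterruptedException
          } catch {
            case e: Throwable => failure = Some(e)
          }
        }
      }
      helper.setDaemon(true)
      helper.start()
      try {
        helper.join() // an interrupt now lands here, in the caller
      } catch {
        case e: InterruptedException =>
          helper.interrupt() // best effort; the helper may ignore it
          throw e
      }
      failure.foreach(e => throw e) // propagate failures from the helper
    }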

As a safeguard, also explicitly stop the executor launch thread pool when
shutting down the application, to prevent new executors from coming up
after the application has started its shutdown.
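
For reference, a REPL-style sketch of the java.util.concurrent behaviour the
safeguard relies on (pool and task here are illustrative, not Spark's):
shutdownNow() interrupts active tasks and returns queued tasks un-run.

    import java.util.concurrent.{Executors, TimeUnit}

    val pool = Executors.newCachedThreadPool()
    pool.submit(new Runnable {
      override def run(): Unit = {
        // Stands in for a long-running executor launch.
        try TimeUnit.SECONDS.sleep(60)
        catch { case _: InterruptedException => println("launch interrupted") }
      }
    })
    // Interrupts the active task; any tasks still queued are returned un-run.
    val neverStarted = pool.shutdownNow()
    println(s"${neverStarted.size} queued task(s) will never start")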

Tested with unit tests plus some internal tests on a real cluster.

Closes #24017 from vanzin/SPARK-27094.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Marcelo Vanzin committed Mar 20, 2019
Commit ec5e342 (parent: c65f9b2)
Showing 2 changed files with 120 additions and 79 deletions.
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

@@ -550,88 +550,94 @@ private[spark] class ApplicationMaster(
     reporterThread.join()
   }
 
-  private def launchReporterThread(): Thread = {
-    // The number of failures in a row until Reporter thread give up
+  private def allocationThreadImpl(): Unit = {
+    // The number of failures in a row until the allocation thread gives up.
     val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)
-
-    val t = new Thread {
-      override def run() {
-        var failureCount = 0
-        while (!finished) {
-          try {
-            if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
-              finish(FinalApplicationStatus.FAILED,
-                ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
-                s"Max number of executor failures ($maxNumExecutorFailures) reached")
-            } else if (allocator.isAllNodeBlacklisted) {
-              finish(FinalApplicationStatus.FAILED,
-                ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
-                "Due to executor failures all available nodes are blacklisted")
-            } else {
-              logDebug("Sending progress")
-              allocator.allocateResources()
-            }
-            failureCount = 0
-          } catch {
-            case i: InterruptedException => // do nothing
-            case e: ApplicationAttemptNotFoundException =>
-              failureCount += 1
-              logError("Exception from Reporter thread.", e)
-              finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
-                e.getMessage)
-            case e: Throwable =>
-              failureCount += 1
-              if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
-                finish(FinalApplicationStatus.FAILED,
-                  ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
-                    s"$failureCount time(s) from Reporter thread.")
-              } else {
-                logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
-              }
-          }
-          try {
-            val numPendingAllocate = allocator.getPendingAllocate.size
-            var sleepStartNs = 0L
-            var sleepInterval = 200L // ms
-            allocatorLock.synchronized {
-              sleepInterval =
-                if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
-                  val currentAllocationInterval =
-                    math.min(heartbeatInterval, nextAllocationInterval)
-                  nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
-                  currentAllocationInterval
-                } else {
-                  nextAllocationInterval = initialAllocationInterval
-                  heartbeatInterval
-                }
-              sleepStartNs = System.nanoTime()
-              allocatorLock.wait(sleepInterval)
-            }
-            val sleepDuration = System.nanoTime() - sleepStartNs
-            if (sleepDuration < TimeUnit.MILLISECONDS.toNanos(sleepInterval)) {
-              // log when sleep is interrupted
-              logDebug(s"Number of pending allocations is $numPendingAllocate. " +
-                s"Slept for $sleepDuration/$sleepInterval ms.")
-              // if sleep was less than the minimum interval, sleep for the rest of it
-              val toSleep = math.max(0, initialAllocationInterval - sleepDuration)
-              if (toSleep > 0) {
-                logDebug(s"Going back to sleep for $toSleep ms")
-                // use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up
-                // by the methods that signal allocatorLock because this is just finishing the min
-                // sleep interval, which should happen even if this is signalled again.
-                Thread.sleep(toSleep)
-              }
-            } else {
-              logDebug(s"Number of pending allocations is $numPendingAllocate. " +
-                s"Slept for $sleepDuration/$sleepInterval.")
-            }
-          } catch {
-            case e: InterruptedException =>
-          }
-        }
-      }
-    }
+    var failureCount = 0
+    while (!finished) {
+      try {
+        if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
+          finish(FinalApplicationStatus.FAILED,
+            ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
+            s"Max number of executor failures ($maxNumExecutorFailures) reached")
+        } else if (allocator.isAllNodeBlacklisted) {
+          finish(FinalApplicationStatus.FAILED,
+            ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
+            "Due to executor failures all available nodes are blacklisted")
+        } else {
+          logDebug("Sending progress")
+          allocator.allocateResources()
+        }
+        failureCount = 0
+      } catch {
+        case i: InterruptedException => // do nothing
+        case e: ApplicationAttemptNotFoundException =>
+          failureCount += 1
+          logError("Exception from Reporter thread.", e)
+          finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
+            e.getMessage)
+        case e: Throwable =>
+          failureCount += 1
+          if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
+            finish(FinalApplicationStatus.FAILED,
+              ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
+                s"$failureCount time(s) from Reporter thread.")
+          } else {
+            logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
+          }
+      }
+      try {
+        val numPendingAllocate = allocator.getPendingAllocate.size
+        var sleepStartNs = 0L
+        var sleepInterval = 200L // ms
+        allocatorLock.synchronized {
+          sleepInterval =
+            if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
+              val currentAllocationInterval =
+                math.min(heartbeatInterval, nextAllocationInterval)
+              nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
+              currentAllocationInterval
+            } else {
+              nextAllocationInterval = initialAllocationInterval
+              heartbeatInterval
+            }
+          sleepStartNs = System.nanoTime()
+          allocatorLock.wait(sleepInterval)
+        }
+        val sleepDuration = System.nanoTime() - sleepStartNs
+        if (sleepDuration < TimeUnit.MILLISECONDS.toNanos(sleepInterval)) {
+          // log when sleep is interrupted
+          logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+            s"Slept for $sleepDuration/$sleepInterval ms.")
+          // if sleep was less than the minimum interval, sleep for the rest of it
+          val toSleep = math.max(0, initialAllocationInterval - sleepDuration)
+          if (toSleep > 0) {
+            logDebug(s"Going back to sleep for $toSleep ms")
+            // use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up
+            // by the methods that signal allocatorLock because this is just finishing the min
+            // sleep interval, which should happen even if this is signalled again.
+            Thread.sleep(toSleep)
+          }
+        } else {
+          logDebug(s"Number of pending allocations is $numPendingAllocate. " +
+            s"Slept for $sleepDuration/$sleepInterval.")
+        }
+      } catch {
+        case e: InterruptedException =>
+      }
+    }
+  }
+
+  private def launchReporterThread(): Thread = {
+    val t = new Thread {
+      override def run(): Unit = {
+        try {
+          allocationThreadImpl()
+        } finally {
+          allocator.stop()
+        }
+      }
+    }
     // setting to daemon status, though this is usually not a good idea.
     t.setDaemon(true)
     t.setName("Reporter")
     t.start()
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

@@ -381,6 +381,13 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  def stop(): Unit = {
+    // Forcefully shut down the launcher pool, in case this is being called in the middle of
+    // container allocation. This will prevent queued executors from being started - and
+    // potentially interrupt active ExecutorRunnable instances too.
+    launcherPool.shutdownNow()
+  }
+
   private def hostStr(request: ContainerRequest): String = {
     Option(request.getNodes) match {
       case Some(nodes) => nodes.asScala.mkString(",")
@@ -417,12 +424,40 @@
         containersToUse, remainingAfterHostMatches)
     }
 
-    // Match remaining by rack
+    // Match remaining by rack. Because YARN's RackResolver swallows thread interrupts
+    // (see SPARK-27094), which can cause this code to miss interrupts from the AM, use
+    // a separate thread to perform the operation.
     val remainingAfterRackMatches = new ArrayBuffer[Container]
-    for (allocatedContainer <- remainingAfterHostMatches) {
-      val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
-      matchContainerToRequest(allocatedContainer, rack, containersToUse,
-        remainingAfterRackMatches)
+    if (remainingAfterHostMatches.nonEmpty) {
+      var exception: Option[Throwable] = None
+      val thread = new Thread("spark-rack-resolver") {
+        override def run(): Unit = {
+          try {
+            for (allocatedContainer <- remainingAfterHostMatches) {
+              val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
+              matchContainerToRequest(allocatedContainer, rack, containersToUse,
+                remainingAfterRackMatches)
+            }
+          } catch {
+            case e: Throwable =>
+              exception = Some(e)
+          }
+        }
+      }
+      thread.setDaemon(true)
+      thread.start()
+
+      try {
+        thread.join()
+      } catch {
+        case e: InterruptedException =>
+          thread.interrupt()
+          throw e
+      }
+
+      if (exception.isDefined) {
+        throw exception.get
+      }
     }
 
     // Assign remaining that are neither node-local nor rack-local
