[SPARK-17022][YARN] Handle potential deadlock in driver handling messages

## What changes were proposed in this pull request?

We send RequestExecutors directly to the AM instead of routing it through the YarnSchedulerBackend's own endpoint first, to avoid a potential deadlock.

## How was this patch tested?

Manual tests.

Author: WangTaoTheTonic <wangtao111@huawei.com>

Closes #14605 from WangTaoTheTonic/lock.
WangTaoTheTonic authored and Marcelo Vanzin committed Aug 11, 2016
1 parent 4ec5c36 commit ea0bf91
Showing 1 changed file with 15 additions and 3 deletions.
@@ -125,8 +125,20 @@ private[spark] abstract class YarnSchedulerBackend(
    * This includes executors already pending or running.
    */
   override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
-    yarnSchedulerEndpointRef.askWithRetry[Boolean](
-      RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
+    val r = RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount)
+    yarnSchedulerEndpoint.amEndpoint match {
+      case Some(am) =>
+        try {
+          am.askWithRetry[Boolean](r)
+        } catch {
+          case NonFatal(e) =>
+            logError(s"Sending $r to AM was unsuccessful", e)
+            return false
+        }
+      case None =>
+        logWarning("Attempted to request executors before the AM has registered!")
+        return false
+    }
   }
 
   /**
@@ -209,7 +221,7 @@ private[spark] abstract class YarnSchedulerBackend(
    */
   private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
     extends ThreadSafeRpcEndpoint with Logging {
-    private var amEndpoint: Option[RpcEndpointRef] = None
+    var amEndpoint: Option[RpcEndpointRef] = None
 
     private val askAmThreadPool =
       ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
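For context, a minimal self-contained Scala sketch of the pattern the new doRequestTotalExecutors follows: ask the AM endpoint directly and fail fast if it has not registered yet. AmEndpointRef, RequestExecutors, and DriverBackendSketch below are hypothetical stand-ins, not Spark's actual RpcEndpointRef, message class, or YarnSchedulerBackend.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for an RPC reference to the ApplicationMaster endpoint.
trait AmEndpointRef {
  def ask[T](message: Any): T
}

// Simplified stand-in for Spark's RequestExecutors message.
final case class RequestExecutors(requestedTotal: Int)

// Sketch of the driver-side backend: it keeps an optional AM reference and only
// issues the ask once the AM has registered, instead of going through an
// intermediate scheduler endpoint (the source of the potential deadlock).
class DriverBackendSketch {
  var amEndpoint: Option[AmEndpointRef] = None  // set when the AM registers

  def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
    val request = RequestExecutors(requestedTotal)
    amEndpoint match {
      case Some(am) =>
        try {
          am.ask[Boolean](request)  // ask the AM directly
        } catch {
          case NonFatal(e) =>
            Console.err.println(s"Sending $request to AM was unsuccessful: $e")
            false
        }
      case None =>
        Console.err.println("Attempted to request executors before the AM has registered!")
        false
    }
  }
}
```

In the actual change, the reference corresponds to YarnSchedulerEndpoint.amEndpoint (made accessible by dropping `private` in the second hunk), and askWithRetry is used rather than a plain ask.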
