
[SPARK-27112][CORE] : Create a resource ordering between threads to resolve the deadlocks encountered when trying to kill executors either due to dynamic allocation or blacklisting

Closes #24072 from pgandhi999/SPARK-27112-2.

Authored-by: pgandhi <pgandhi@verizonmedia.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>

## What changes were proposed in this pull request?

There are two deadlocks that arise from the interplay between three different threads:

- **task-result-getter thread**
- **spark-dynamic-executor-allocation thread**
- **dispatcher-event-loop thread (`makeOffers()`)**

A simplified sketch of the lock cycle these threads can form follows the list.
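To make the failure mode concrete, here is a minimal, self-contained sketch (not Spark code) of the kind of lock cycle described above: two threads take the same two monitor locks in opposite order. The names `schedulerLock` and `backendLock` are illustrative stand-ins for the `TaskSchedulerImpl` and `CoarseGrainedSchedulerBackend` monitors.

```scala
// Illustrative only: two threads acquiring the same two monitors in
// opposite order, which is the shape of the deadlock this patch removes.
object DeadlockSketch {
  private val schedulerLock = new Object // stand-in for TaskSchedulerImpl
  private val backendLock   = new Object // stand-in for CoarseGrainedSchedulerBackend

  def main(args: Array[String]): Unit = {
    // Path A (e.g. a task-result-getter-like thread): scheduler first, then backend.
    val a = new Thread(() => schedulerLock.synchronized {
      Thread.sleep(100) // widen the race window so the hang is reproducible
      backendLock.synchronized { println("A took both locks") }
    })
    // Path B (e.g. the pre-fix makeOffers()/killExecutors() path): backend first.
    val b = new Thread(() => backendLock.synchronized {
      Thread.sleep(100)
      schedulerLock.synchronized { println("B took both locks") }
    })
    a.start(); b.start()
    a.join(); b.join() // with the sleeps above this join almost always hangs: deadlock
  }
}
```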

The fix enforces a lock-ordering constraint: the lock on `TaskSchedulerImpl` is always acquired before the lock on `CoarseGrainedSchedulerBackend`, both in `makeOffers()` and in the `killExecutors()` method. This consistent resource ordering between the threads eliminates the deadlocks.
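As a sketch of why the ordering helps (mirroring the `withLock` helper added in the diff below, but with hypothetical names): once every code path that needs both monitors funnels through a single helper that takes the scheduler's lock first, the opposite-order path from the sketch above can no longer occur.

```scala
// Illustrative only: routing every dual-lock path through one helper fixes
// a global lock order, so the A/B cycle sketched above cannot form.
class BackendSketch(scheduler: AnyRef) {
  // Always: the scheduler's monitor first, then this backend's own monitor.
  private def withLock[T](fn: => T): T = scheduler.synchronized {
    this.synchronized { fn }
  }

  def makeOffersLike(): Unit = withLock {
    println("offering resources with both locks held, in the agreed order")
  }

  def killExecutorsLike(): Unit = withLock {
    println("marking executors for removal with both locks held, same order")
  }
}
```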

## How was this patch tested?

Manual Tests

Closes #24134 from pgandhi999/branch-2.4-SPARK-27112.

Authored-by: pgandhi <pgandhi@verizonmedia.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
(cherry picked from commit 95e73b3)
Signed-off-by: Imran Rashid <irashid@cloudera.com>
pgandhi authored and squito committed Mar 19, 2019
1 parent 959a7ec commit 7bb2b42de9a52e75107469f028c1e707d6a174b7
@@ -238,7 +238,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // Make fake resource offers on all executors
     private def makeOffers() {
       // Make sure no executor is killed while some task is launching on it
-      val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
+      val taskDescs = withLock {
         // Filter out executors under killing
         val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
         val workOffers = activeExecutors.map {
@@ -263,7 +263,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // Make fake resource offers on just one executor
     private def makeOffers(executorId: String) {
       // Make sure no executor is killed while some task is launching on it
-      val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
+      val taskDescs = withLock {
         // Filter out executors under killing
         if (executorIsAlive(executorId)) {
           val executorData = executorDataMap(executorId)
@@ -600,7 +600,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       force: Boolean): Seq[String] = {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
 
-    val response = synchronized {
+    val response = withLock {
       val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
       unknownExecutors.foreach { id =>
         logWarning(s"Executor to kill $id does not exist!")
@@ -678,6 +678,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   protected def fetchHadoopDelegationTokens(): Option[Array[Byte]] = { None }
+
+  // SPARK-27112: We need to ensure that there is ordering of lock acquisition
+  // between TaskSchedulerImpl and CoarseGrainedSchedulerBackend objects in order to fix
+  // the deadlock issue exposed in SPARK-27112
+  private def withLock[T](fn: => T): T = scheduler.synchronized {
+    CoarseGrainedSchedulerBackend.this.synchronized { fn }
+  }
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {
