
[SPARK-27112] : Spark Scheduler encounters two independent Deadlocks … #24035

Closed
wants to merge 3 commits

Conversation

pgandhi999 (Contributor) commented Mar 8, 2019

…when trying to kill executors either due to dynamic allocation or blacklisting

Recently, a few Spark users in our organization reported that their jobs were getting stuck. On further analysis, we found that there are two independent deadlocks, each occurring under different circumstances. Screenshots for both deadlocks are attached here.

We were able to reproduce the deadlocks with the following piece of code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark._
import org.apache.spark.TaskContext

// Simple example of Word Count in Scala
object ScalaWordCount {
  def main(args: Array[String]) {

    if (args.length < 2) {
      System.err.println("Usage: ScalaWordCount <inputFilesURI> <outputFilesUri>")
      System.exit(1)
    }

    val conf = new SparkConf().setAppName("Scala Word Count")
    val sc = new SparkContext(conf)

    // get the input file uri
    val inputFilesUri = args(0)

    // get the output file uri
    val outputFilesUri = args(1)

    while (true) {
      val textFile = sc.textFile(inputFilesUri)
      val counts = textFile.flatMap(line => line.split(" "))
        .map(word => {if (TaskContext.get.partitionId == 5 && TaskContext.get.attemptNumber == 0) throw new Exception("Fail for blacklisting") else (word, 1)})
        .reduceByKey(_ + _)
      counts.saveAsTextFile(outputFilesUri)
      val conf: Configuration = new Configuration()
      val path: Path = new Path(outputFilesUri)
      val hdfs: FileSystem = FileSystem.get(conf)
      hdfs.delete(path, true)
    }

    sc.stop()
  }
}

Additionally, to ensure that the deadlock surfaces soon enough, I also added a small delay in the Spark code here:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/BlacklistTracker.scala#L256

executorIdToFailureList.remove(exec)
updateNextExpiryTime()
Thread.sleep(2000)
killBlacklistedExecutor(exec)

Also, make sure that the following configs are set when launching the above Spark job:
spark.blacklist.enabled=true
spark.blacklist.killBlacklistedExecutors=true
spark.blacklist.application.maxFailedTasksPerExecutor=1
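For reference, a minimal sketch of setting the same properties programmatically on the SparkConf in the reproduction job above; they can equally be passed as --conf options to spark-submit:

import org.apache.spark.SparkConf

// Sketch only: the same blacklisting settings as listed above, applied on the reproduction job's conf.
val conf = new SparkConf()
  .setAppName("Scala Word Count")
  .set("spark.blacklist.enabled", "true")
  .set("spark.blacklist.killBlacklistedExecutors", "true")
  .set("spark.blacklist.application.maxFailedTasksPerExecutor", "1")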

Screenshots for deadlock between task-result-getter-thread and spark-dynamic-executor-allocation thread:
[Screenshot: Screen Shot 2019-02-26 at 4 10 26 PM]

[Screenshot: Screen Shot 2019-02-26 at 4 10 48 PM]

Screenshots for deadlock between task-result-getter-thread and dispatcher-event-loop thread:

[Screenshot: Screen Shot 2019-02-26 at 4 11 11 PM]

[Screenshot: Screen Shot 2019-02-26 at 4 11 26 PM]

What changes were proposed in this pull request?

There are two deadlocks as a result of the interplay between three different threads:

task-result-getter thread

spark-dynamic-executor-allocation thread

dispatcher-event-loop thread (makeOffers())

The fix for the deadlock between the dynamic allocation thread and the result getter thread involves moving the call to isExecutorBusy outside of the lock on CoarseGrainedSchedulerBackend.this.

The fix for the deadlock between the event loop thread and the result getter thread involves removing the synchronized block on CoarseGrainedSchedulerBackend.this. That synchronization is replaced by a dummy lock, which preserves the synchronization between the dynamic allocation thread and the event loop thread added for https://issues.apache.org/jira/browse/SPARK-19757 while avoiding the deadlock scenario introduced by the code changes for that JIRA.
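For illustration only, a rough sketch of the shape of these two changes, using hypothetical toy classes (BackendSketch, SchedulerSketch); the real CoarseGrainedSchedulerBackend and TaskSchedulerImpl code differs in many details:

import scala.concurrent.Future

// Toy sketch of the locking shape described above, not the actual diff.
class BackendSketch(scheduler: SchedulerSketch) {
  // Dedicated lock that replaces `synchronized` on the backend itself inside makeOffers(),
  // preserving the SPARK-19757 fix without the deadlock.
  private val makeOffersLock = new Object()

  def killExecutors(executorIds: Seq[String], force: Boolean): Future[Seq[String]] = {
    // Change 1: ask the scheduler *before* taking the backend lock, so this thread never
    // holds the backend monitor while waiting for the TaskSchedulerImpl lock.
    val idleExecutorIds = executorIds.filter(id => force || !scheduler.isExecutorBusy(id))
    synchronized {
      // ... bookkeeping and the actual kill request would go here ...
      Future.successful(idleExecutorIds)
    }
  }

  def makeOffers(): Unit = makeOffersLock.synchronized {
    // Change 2: offer resources under makeOffersLock instead of the backend monitor.
    scheduler.resourceOffers()
  }
}

class SchedulerSketch {
  def isExecutorBusy(id: String): Boolean = synchronized { false }
  def resourceOffers(): Unit = synchronized { /* assign tasks to offers */ }
}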

How was this patch tested?

The code used to reproduce the deadlock issue is documented above.

pgandhi999 (Contributor, Author) commented Mar 8, 2019

ok to test

SparkQA commented Mar 9, 2019

Test build #103238 has finished for PR 24035 at commit d01c0ef.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
abellina (Contributor) left a comment

@pgandhi999 I think the InvalidUseOfMatchersException suggests that an any() should be added to the killExecutors calls in a couple of the failing suites, since you added an argument. The verify calls also expect the blacklistingOnTaskCompletion argument. Note that in ExecutorAllocationManagerSuite we need the extra argument on the when(killExecutor...) calls; it looks like Mockito does not like unspecified default arguments, but I am not 100% sure about this.

The other failure is a bug in killExecutors, commented on below.

numPendingExecutors += executorsToKill.size
Future.successful(true)
var response: Future[Seq[String]] = null
val idleExecutorIds = executorIds.filter { id => !scheduler.isExecutorBusy(id) }

abellina (Contributor) commented Mar 9, 2019
This should be force || !scheduler.isExecutorBusy(id). That's the test failure "org.apache.spark.deploy.StandaloneDynamicAllocationSuite.disable force kill for busy executors (SPARK-9552)"

pgandhi999 (Contributor, Author) commented Mar 11, 2019

Yes, I have fixed it. Thank you @abellina.

attilapiros (Contributor) left a comment

Could you please provide the stack traces in text format as well? I would like to open them in my IDE to follow the call stacks.

@@ -111,6 +111,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
private val reviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")

// YSPARK-1163: This lock is explicitly added here to keep the changes introduced by SPARK-19757

attilapiros (Contributor) commented Mar 11, 2019

NIT, extra Y prefix: "YSPARK-1163" => "SPARK-1163"
And also at the end of this comment block.

pgandhi999 (Contributor, Author) commented Mar 11, 2019

Yep took care of that. Thank you.

Future.successful(true)
var response: Future[Seq[String]] = null
val idleExecutorIds = executorIds.filter { id => !scheduler.isExecutorBusy(id) }
if (!blacklistingOnTaskCompletion) {

attilapiros (Contributor) commented Mar 11, 2019

What would be the performance cost of always using makeOffersLock (and dropping the flag blacklistingOnTaskCompletion)? This code is already quite complex, and with the boolean-flag-dependent locking I think it will be even harder to follow.

pgandhi999 (Contributor, Author) commented Mar 11, 2019

I agree that the code fix is a little tricky here; however, as far as I have tested, I have not seen any degradation in job running time from adding the extra lock.

attilapiros (Contributor) commented Mar 11, 2019

Sorry, there must be some misunderstanding here.
My suggestion is to remove this if condition completely and always use:

makeOffersLock.synchronized {
  response = synchronized {
    killExecutorsImpl(idleExecutorIds, adjustTargetNumExecutors, countFailures, force)
  }
}

And as you got rid of the if, you can remove blacklistingOnTaskCompletion from the method's arguments as well.

As the locking order always starts with makeOffersLock, I think this should be enough to avoid the deadlock.

pgandhi999 (Contributor, Author) commented Mar 11, 2019

The flag blacklistingOnTaskCompletion is needed to ensure that the "task-result-getter-x" thread does not try to acquire makeOffersLock, which is a necessary condition to avoid the deadlock between the "task-result-getter" thread and the "dispatcher-event-loop" thread.

The reason is that when the "task-result-getter" thread reaches killExecutors(), it has already acquired the TaskSchedulerImpl lock and would then try to acquire makeOffersLock. The "dispatcher-event-loop" thread, on the other hand, acquires makeOffersLock and then waits to acquire the TaskSchedulerImpl lock in resourceOffers(), thus leading to the deadlock.

attilapiros (Contributor) commented Mar 12, 2019

Ok I see. I checked the first deadlock and I think the problem is in org.apache.spark.scheduler.TaskSchedulerImpl#isExecutorBusy:

def isExecutorBusy(execId: String): Boolean = synchronized {
  executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty)
}

That synchronized is too restrictive here for reading a snapshot of the executorIdToRunningTaskIds map. For this problem, a solution could be simply using TrieMap: "A concurrent hash-trie or TrieMap is a concurrent thread-safe lock-free implementation of a hash array mapped trie".

If you change the type of executorIdToRunningTaskIds from HashMap to TrieMap, then you can remove the synchronized from isExecutorBusy.
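For illustration, a minimal sketch of this suggestion, assuming scala.collection.concurrent.TrieMap (the class name SchedulerStateSketch is hypothetical; the real TaskSchedulerImpl has much more state, and the inner task-id sets would still be mutated under the scheduler lock as today):

import scala.collection.concurrent.TrieMap
import scala.collection.mutable

// Sketch only: HashMap -> TrieMap so that a read-only snapshot check needs no scheduler lock.
class SchedulerStateSketch {
  private val executorIdToRunningTaskIds = TrieMap.empty[String, mutable.HashSet[Long]]

  // No `synchronized` needed any more for this snapshot read.
  def isExecutorBusy(execId: String): Boolean =
    executorIdToRunningTaskIds.get(execId).exists(_.nonEmpty)
}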

I have checked, and isExecutorBusy is only used in two places:

  • org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers, where we are already in a synchronized block, so with the type change the behaviour is the same as before
  • org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#killExecutors, where we already live with a snapshot state which could be outdated after the method call

Regarding the second deadlock, I will continue my analysis.

attilapiros (Contributor) commented Mar 12, 2019

Yes, I just focused on saving the extra lock first.

But we could keep track of the executor IDs where tasks are scheduled/running separately, in a concurrently accessible set (a volatile reference to an immutable Set, or a CopyOnWriteArraySet).

The method isExecutorBusy could use this new set, so we can keep the HashMap for executorIdToRunningTaskIds and still not introduce that lock.
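A minimal sketch of this alternative as well (hypothetical names), where the busy executor ids are mirrored into a volatile immutable Set that killExecutors() can read without taking any lock:

// Sketch only: executorIdToRunningTaskIds stays a plain HashMap, but the set of busy
// executors is mirrored into a @volatile immutable Set for lock-free reads.
class BusyExecutorTrackerSketch {
  @volatile private var busyExecutorIds: Set[String] = Set.empty

  // These updates would be called from code paths that already hold the scheduler lock.
  def taskStarted(execId: String): Unit = { busyExecutorIds += execId }
  def executorIdle(execId: String): Unit = { busyExecutorIds -= execId }

  // Lock-free read used from killExecutors().
  def isExecutorBusy(execId: String): Boolean = busyExecutorIds.contains(execId)
}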

squito (Contributor) commented Mar 12, 2019

I don't think that makeOffersLock solves the deadlock here. You won't get a deadlock between the same two locks, but now it can happen with makeOffersLock instead. Consider this sequence (a simplification of the full call stacks, but showing the important locks at least):

  1. taskresultgetter: handleFailedTask --> lock on taskSchedulerImpl

  2. taskresultgetter: BlacklistTracker.killExecutor

  3. dispatcher: receive --> lock on CoarseGrainedSchedulerBackend

  4. dispatcher: makeOffers --> lock on makeOffersLock

  5. dispatcher: blocked on TaskSchedulerImpl lock

  6. taskResultGetter: makeOffers, but blocked on makeOffersLock

As Attila suggested, I would consider creating an ordering between the TaskSchedulerImpl lock and the CoarseGrainedSchedulerBackend lock, so that we always get the TaskSchedulerImpl lock first. Of course that comes with a performance penalty, and we will have to audit all other uses of the CoarseGrainedSchedulerBackend lock too.

Still thinking about any other options ...

pgandhi999 (Contributor, Author) commented Mar 12, 2019

@squito I agree with you and @attilapiros about creating an ordering. I shall definitely follow the approach and try it out.

Regarding your comment on the deadlock between makeOffersLock and the task-result-getter thread: that should ideally not happen, as the task-result-getter thread will never compete to acquire makeOffersLock. The reason I added the flag blacklistingOnTaskCompletion is to ensure that the task-result-getter thread never acquires makeOffersLock.

Also, you are right in saying that makeOffersLock does not solve the deadlock. I have explained the purpose of makeOffersLock in my comment below. Quoting it here:

I can explain more about the makeOffersLock here.

PR #17091 introduced synchronization on the CoarseGrainedSchedulerBackend object in the method makeOffers(). This particular piece of code introduced a deadlock between the task-result-getter thread and the dispatcher-event-loop thread. I could simply remove the synchronized statement in makeOffers() and the deadlock would be resolved, and we would not really need makeOffersLock.

However, removing the synchronized statement would once again expose the race condition described in JIRA https://issues.apache.org/jira/browse/SPARK-19757, for which the fix in the corresponding PR was merged. makeOffersLock serves as the solution to that problem: by synchronizing on makeOffersLock, the race condition between the dynamic-executor-allocation thread and the dispatcher-event-loop thread is avoided. That is indeed its sole purpose. I am, however, open to discussing and working on better solutions to the above problem, if any. Thank you.

pgandhi999 (Contributor, Author) commented Mar 12, 2019

I am basically trying to solve the two deadlocks and also fix the race condition issue from SPARK-19757. I think the approach of lock ordering, along with using a separate concurrently accessible Set as suggested by @attilapiros and @squito, should work out. Let me work on that and get back to you.

squito (Contributor) commented Mar 12, 2019

ah I see, sorry I misread part of the logic, thanks

SparkQA commented Mar 11, 2019

Test build #103333 has finished for PR 24035 at commit 764ace1.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
pgandhi999 (Contributor, Author) commented Mar 11, 2019

@attilapiros I have attached the stack traces in text format here:

Deadlock between task-result-getter-thread and spark-dynamic-executor-allocation thread:

=============================
"task-result-getter-0":
  waiting to lock monitor 0x00007f35dcf25cb8 (object 0x00000004404f2518, a org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend),
  which is held by "spark-dynamic-executor-allocation"
"spark-dynamic-executor-allocation":
  waiting to lock monitor 0x00007f35dc20f1f8 (object 0x00000004404f25c0, a org.apache.spark.scheduler.cluster.YarnClusterScheduler),
  which is held by "task-result-getter-0"


Java stack information for the threads listed above:
===================================================
"task-result-getter-0":
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors(CoarseGrainedSchedulerBackend.scala:603)
        - waiting to lock <0x00000004404f2518> (a org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend)
        at org.apache.spark.scheduler.BlacklistTracker.org$apache$spark$scheduler$BlacklistTracker$$killBlacklistedExecutor(BlacklistTracker.scala:155)
        at org.apache.spark.scheduler.BlacklistTracker$$anonfun$updateBlacklistForSuccessfulTaskSet$1.apply(BlacklistTracker.scala:247)
        at org.apache.spark.scheduler.BlacklistTracker$$anonfun$updateBlacklistForSuccessfulTaskSet$1.apply(BlacklistTracker.scala:226)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
        at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
        at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
        at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
        at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
        at org.apache.spark.scheduler.BlacklistTracker.updateBlacklistForSuccessfulTaskSet(BlacklistTracker.scala:226)
        at org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$maybeFinishTaskSet$1.apply(TaskSetManager.scala:530)
        at org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$maybeFinishTaskSet$1.apply(TaskSetManager.scala:530)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.TaskSetManager.org$apache$spark$scheduler$TaskSetManager$$maybeFinishTaskSet(TaskSetManager.scala:530)
        at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:787)
        at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:466)
        - locked <0x00000004404f25c0> (a org.apache.spark.scheduler.cluster.YarnClusterScheduler)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:113)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:63)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$1.apply(TaskResultGetter.scala:63)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2004)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:62)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)


"spark-dynamic-executor-allocation":
        at org.apache.spark.scheduler.TaskSchedulerImpl.isExecutorBusy(TaskSchedulerImpl.scala:647)
        - waiting to lock <0x00000004404f25c0> (a org.apache.spark.scheduler.cluster.YarnClusterScheduler)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$$anonfun$9.apply(CoarseGrainedSchedulerBackend.scala:613)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$$anonfun$9.apply(CoarseGrainedSchedulerBackend.scala:613)
        at scala.collection.TraversableLike$$anonfun$filterImpl$1.apply(TraversableLike.scala:248)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at scala.collection.TraversableLike$class.filterImpl(TraversableLike.scala:247)
        at scala.collection.TraversableLike$class.filter(TraversableLike.scala:259)
        at scala.collection.AbstractTraversable.filter(Traversable.scala:104)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors(CoarseGrainedSchedulerBackend.scala:613)
        - locked <0x00000004404f2518> (a org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend)
        at org.apache.spark.ExecutorAllocationManager.removeExecutors(ExecutorAllocationManager.scala:481)
        - locked <0x00000004442fb590> (a org.apache.spark.ExecutorAllocationManager)
        at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:321)
        - locked <0x00000004442fb590> (a org.apache.spark.ExecutorAllocationManager)
        at org.apache.spark.ExecutorAllocationManager$$anon$2.run(ExecutorAllocationManager.scala:246)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Deadlock between task-result-getter-thread and dispatcher-event-loop thread:

Found one Java-level deadlock:
=============================
"task-result-getter-2":
  waiting to lock monitor 0x00007f9be88b2678 (object 0x00000003c0720ed0, a org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend),
  which is held by "dispatcher-event-loop-23"
"dispatcher-event-loop-23":
  waiting to lock monitor 0x00007f9bf077abb8 (object 0x00000003c0720f78, a org.apache.spark.scheduler.cluster.YarnClusterScheduler),
  which is held by "task-result-getter-2"

Java stack information for the threads listed above:
===================================================
"task-result-getter-2":
	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.killExecutors(CoarseGrainedSchedulerBackend.scala:604)
	- waiting to lock <0x00000003c0720ed0> (a org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend)
	at org.apache.spark.scheduler.BlacklistTracker.killExecutor(BlacklistTracker.scala:153)
	at org.apache.spark.scheduler.BlacklistTracker.org$apache$spark$scheduler$BlacklistTracker$$killBlacklistedExecutor(BlacklistTracker.scala:163)
	at org.apache.spark.scheduler.BlacklistTracker$$anonfun$updateBlacklistForSuccessfulTaskSet$1.apply(BlacklistTracker.scala:257)
	at org.apache.spark.scheduler.BlacklistTracker$$anonfun$updateBlacklistForSuccessfulTaskSet$1.apply(BlacklistTracker.scala:236)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
	at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
	at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
	at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
	at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
	at org.apache.spark.scheduler.BlacklistTracker.updateBlacklistForSuccessfulTaskSet(BlacklistTracker.scala:236)
	at org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$maybeFinishTaskSet$1.apply(TaskSetManager.scala:530)
	at org.apache.spark.scheduler.TaskSetManager$$anonfun$org$apache$spark$scheduler$TaskSetManager$$maybeFinishTaskSet$1.apply(TaskSetManager.scala:530)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.TaskSetManager.org$apache$spark$scheduler$TaskSetManager$$maybeFinishTaskSet(TaskSetManager.scala:530)
	at org.apache.spark.scheduler.TaskSetManager.handleFailedTask(TaskSetManager.scala:916)
	at org.apache.spark.scheduler.TaskSchedulerImpl.handleFailedTask(TaskSchedulerImpl.scala:539)
	- locked <0x00000003c0720f78> (a org.apache.spark.scheduler.cluster.YarnClusterScheduler)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$4$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:150)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$4$$anonfun$run$2.apply(TaskResultGetter.scala:132)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$4$$anonfun$run$2.apply(TaskResultGetter.scala:132)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2005)
	at org.apache.spark.scheduler.TaskResultGetter$$anon$4.run(TaskResultGetter.scala:132)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
"dispatcher-event-loop-23":
	at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:321)
	- waiting to lock <0x00000003c0720f78> (a org.apache.spark.scheduler.cluster.YarnClusterScheduler)
	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint.org$apache$spark$scheduler$cluster$CoarseGrainedSchedulerBackend$DriverEndpoint$$makeOffers(CoarseGrainedSchedulerBackend.scala:248)
	- locked <0x00000003c0720ed0> (a org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend)
	at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:136)
	at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
	at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
	at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
	at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
SparkQA commented Mar 11, 2019

Test build #103335 has finished for PR 24035 at commit 32aabd8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
squito (Contributor) left a comment

haven't fully digested this yet, but I do at least agree about the bug, thanks for reporting.

I don't understand the use of makeOffersLock yet, but I will give it a more careful read later

pgandhi999 (Contributor, Author) commented Mar 12, 2019

@squito Thank you for your response. I can explain more about the makeOffersLock here.

PR #17091 introduced synchronization on the CoarseGrainedSchedulerBackend object in the method makeOffers(). This particular piece of code introduced a deadlock between the task-result-getter thread and the dispatcher-event-loop thread. I could simply remove the synchronized statement in makeOffers() and the deadlock would be resolved, and we would not really need makeOffersLock.

However, removing the synchronized statement would once again expose the race condition described in JIRA https://issues.apache.org/jira/browse/SPARK-19757, for which the fix in the corresponding PR was merged. makeOffersLock serves as the solution to that problem: by synchronizing on makeOffersLock, the race condition between the dynamic-executor-allocation thread and the dispatcher-event-loop thread is avoided. That is indeed its sole purpose. I am, however, open to discussing and working on better solutions to the above problem, if any. Thank you.

dhruve (Contributor) commented Mar 12, 2019

I think if we fix the lock ordering for the involved threads, this will solve the issue.

The current order in which locks are being acquired for individual threads is:

TaskResultGetter Order:

  • Lock YarnClusterScheduler
  • Lock CoarseGrainedSchedulerBackend

DispatcherEventLoop Order:

  • Lock CoarseGrainedSchedulerBackend
  • Lock YarnClusterScheduler

SparkDynamicExecutorAllocation Order:

  • Lock ExecutorAllocationManager
  • Lock CoarseGrainedSchedulerBackend
  • Lock TaskSchedulerImpl/YarnClusterScheduler

Solution:
The methods which are resulting in the deadlock are from activity in the CoarseGrainedSchedulerBackend.

  1. killExecutors: the only check that requires the lock on TSI/YCS is whether the executor is busy or not. We can move the idle-executor check up, before synchronizing on CGSB. This will fix the lock order for the dynamic allocation thread.

  2. makeOffers: this currently acquires the lock on CGSB to ensure executors are not killed while a task is being offered on them, and eventually calls resourceOffers on the scheduler, which is where it acquires the second lock. I agree with @attilapiros's suggestion here to fix the second lock-ordering issue by synchronizing on the scheduler first and then the backend.

These two changes should align the ordering sequence and seem simple to reason about. I think this should solve the issue, but it would be good to have more contributors eyeball this change.
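A rough sketch of the ordering described above, with hypothetical helpers standing in for the real classes (not the actual diff):

// Sketch only: every path that needs both monitors takes them in the same order,
// so the cycle seen in the stack traces cannot form.
object LockOrderingSketch {
  // 1. killExecutors: the idle-executor check (which needs the scheduler lock via
  //    isExecutorBusy) happens before entering the backend monitor.
  def killExecutors(backend: AnyRef, isBusy: String => Boolean,
      executorIds: Seq[String], force: Boolean): Seq[String] = {
    val idle = executorIds.filter(id => force || !isBusy(id))
    backend.synchronized {
      // bookkeeping and the actual kill request, under the backend lock only
      idle
    }
  }

  // 2. makeOffers: scheduler monitor first, backend monitor second -- the same order the
  //    task-result-getter path already follows.
  def makeOffers(taskScheduler: AnyRef, backend: AnyRef)(offer: () => Unit): Unit =
    taskScheduler.synchronized {
      backend.synchronized {
        offer()
      }
    }
}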

pgandhi999 (Contributor, Author) commented Mar 12, 2019

@squito @attilapiros @abellina @dhruve @vanzin Thank you for all your suggestions and comments. I have combined @squito's, @attilapiros's, and @dhruve's suggestions into another PR: #24072. That PR uses ordering on lock acquisition and fixes the deadlock issue. It is considerably simpler to reason about than the current PR. Please have a look and let me know, so that I can close this PR. Thank you.

vanzin commented Mar 15, 2019

I guess we can close this one?

pgandhi999 closed this Mar 15, 2019