Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-7174][Core] Move calling TaskScheduler.executorHeartbeatReceived to another thread #5723

Closed
wants to merge 2 commits into from

Conversation

zsxwing
Copy link
Member

@zsxwing zsxwing commented Apr 27, 2015

HeartbeatReceiver will call TaskScheduler.executorHeartbeatReceived, which is a blocking operation because TaskScheduler.executorHeartbeatReceived will call

    blockManagerMaster.driverEndpoint.askWithReply[Boolean](
      BlockManagerHeartbeat(blockManagerId), 600 seconds)

finally. Even if it asks from a local Actor, it may block the current Akka thread. E.g., the reply may be dispatched to the same thread of the ask operation. So the reply cannot be processed. An extreme case is setting the thread number of Akka dispatch thread pool to 1.

jstack log:

"sparkDriver-akka.actor.default-dispatcher-14" daemon prio=10 tid=0x00007f2a8c02d000 nid=0x725 waiting on condition [0x00007f2b1d6d0000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x00000006197a0868> (a scala.concurrent.impl.Promise$CompletionLatch)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1033)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
    at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
    at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.rpc.RpcEndpointRef.askWithReply(RpcEnv.scala:355)
    at org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:169)
    at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:367)
    at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1.applyOrElse(HeartbeatReceiver.scala:103)
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:182)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:128)
    at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:203)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:127)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
    at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
    at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:94)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

This PR moved this blocking operation to a separated thread.

…ead to avoid blocking the Akka thread pool
@zsxwing
Copy link
Member Author

zsxwing commented Apr 27, 2015

cc @rxin @yhuai

@SparkQA
Copy link

SparkQA commented Apr 27, 2015

Test build #30968 has started for PR 5723 at commit 5b3b545.

executorLastSeen(executorId) = System.currentTimeMillis()
context.reply(response)
reportExecutorHeartbeatReceived.submit(new Runnable {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep creating threads might be bad -- maybe we should have a separate event loop for this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can combine timeoutCheckingThread and reportExecutorHeartbeatReceived to a single thread since the actions running in these threads are pretty fast.

@SparkQA
Copy link

SparkQA commented Apr 27, 2015

Test build #31067 has finished for PR 5723 at commit 98bfe48.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.
  • This patch does not change any dependencies.


private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")

override def onStart(): Unit = {
timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable {
timeoutCheckingTask = eventLoopThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
Option(self).foreach(_.send(ExpireDeadHosts))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not part of this change, but can we change this to

if (self != null) {
  self.send(ExpireDeadHosts)
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote this line because I was trying to avoid writing the following 3 lines code:

var _self = self
if (_self != null) {
  _self.send(ExpireDeadHosts)
}

self is a method actually. In your code, the second self may return null if it's stopping.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah ok that makes sense.

@rxin
Copy link
Contributor

rxin commented Apr 28, 2015

LGTM.

We might want to instrument EventLoop to log warnings or errors if the rate of messages going in is much larger than the rate of processing.

@rxin
Copy link
Contributor

rxin commented Apr 28, 2015

I've merged this. Thanks.

@asfgit asfgit closed this in 874a2ca Apr 28, 2015
@zsxwing zsxwing deleted the SPARK-7174 branch April 30, 2015 18:30
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request May 14, 2015
…ved` to another thread

`HeartbeatReceiver` will call `TaskScheduler.executorHeartbeatReceived`, which is a blocking operation because `TaskScheduler.executorHeartbeatReceived` will call

```Scala
    blockManagerMaster.driverEndpoint.askWithReply[Boolean](
      BlockManagerHeartbeat(blockManagerId), 600 seconds)
```

finally. Even if it asks from a local Actor, it may block the current Akka thread. E.g., the reply may be dispatched to the same thread of the ask operation. So the reply cannot be processed. An extreme case is setting the thread number of Akka dispatch thread pool to 1.

jstack log:

```
"sparkDriver-akka.actor.default-dispatcher-14" daemon prio=10 tid=0x00007f2a8c02d000 nid=0x725 waiting on condition [0x00007f2b1d6d0000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000006197a0868> (a scala.concurrent.impl.Promise$CompletionLatch)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1033)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
	at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
	at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
	at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
	at scala.concurrent.Await$.result(package.scala:107)
	at org.apache.spark.rpc.RpcEndpointRef.askWithReply(RpcEnv.scala:355)
	at org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:169)
	at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:367)
	at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1.applyOrElse(HeartbeatReceiver.scala:103)
	at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:182)
	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:128)
	at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:203)
	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:127)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
	at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:94)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

This PR moved this blocking operation to a separated thread.

Author: zsxwing <zsxwing@gmail.com>

Closes apache#5723 from zsxwing/SPARK-7174 and squashes the following commits:

98bfe48 [zsxwing] Use a single thread for checking timeout and reporting executorHeartbeatReceived
5b3b545 [zsxwing] Move calling `TaskScheduler.executorHeartbeatReceived` to another thread to avoid blocking the Akka thread pool
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015
…ved` to another thread

`HeartbeatReceiver` will call `TaskScheduler.executorHeartbeatReceived`, which is a blocking operation because `TaskScheduler.executorHeartbeatReceived` will call

```Scala
    blockManagerMaster.driverEndpoint.askWithReply[Boolean](
      BlockManagerHeartbeat(blockManagerId), 600 seconds)
```

finally. Even if it asks from a local Actor, it may block the current Akka thread. E.g., the reply may be dispatched to the same thread of the ask operation. So the reply cannot be processed. An extreme case is setting the thread number of Akka dispatch thread pool to 1.

jstack log:

```
"sparkDriver-akka.actor.default-dispatcher-14" daemon prio=10 tid=0x00007f2a8c02d000 nid=0x725 waiting on condition [0x00007f2b1d6d0000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000006197a0868> (a scala.concurrent.impl.Promise$CompletionLatch)
	at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedNanos(AbstractQueuedSynchronizer.java:1033)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1326)
	at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
	at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
	at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
	at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
	at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
	at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
	at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
	at scala.concurrent.Await$.result(package.scala:107)
	at org.apache.spark.rpc.RpcEndpointRef.askWithReply(RpcEnv.scala:355)
	at org.apache.spark.scheduler.DAGScheduler.executorHeartbeatReceived(DAGScheduler.scala:169)
	at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:367)
	at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1.applyOrElse(HeartbeatReceiver.scala:103)
	at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$processMessage(AkkaRpcEnv.scala:182)
	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$4.apply$mcV$sp(AkkaRpcEnv.scala:128)
	at org.apache.spark.rpc.akka.AkkaRpcEnv.org$apache$spark$rpc$akka$AkkaRpcEnv$$safelyCall(AkkaRpcEnv.scala:203)
	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1$$anonfun$receiveWithLogging$1.applyOrElse(AkkaRpcEnv.scala:127)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
	at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
	at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
	at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
	at org.apache.spark.rpc.akka.AkkaRpcEnv$$anonfun$actorRef$lzycompute$1$1$$anon$1.aroundReceive(AkkaRpcEnv.scala:94)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
	at akka.dispatch.Mailbox.run(Mailbox.scala:220)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

This PR moved this blocking operation to a separated thread.

Author: zsxwing <zsxwing@gmail.com>

Closes apache#5723 from zsxwing/SPARK-7174 and squashes the following commits:

98bfe48 [zsxwing] Use a single thread for checking timeout and reporting executorHeartbeatReceived
5b3b545 [zsxwing] Move calling `TaskScheduler.executorHeartbeatReceived` to another thread to avoid blocking the Akka thread pool
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants