Skip to content
Permalink
Browse files

[SPARK-27496][CORE] Fatal errors should also be sent back to the sender

## What changes were proposed in this pull request?

When a fatal error (such as StackOverflowError) is thrown from "receiveAndReply", we should try our best to notify the sender. Otherwise, the sender will hang until timeout.

In addition, when a MessageLoop dies unexpectedly, it should resubmit a new one so that the Dispatcher keeps working.

## How was this patch tested?

New unit tests.

Closes #24396 from zsxwing/SPARK-27496.

Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 009059e)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
  • Loading branch information...
zsxwing authored and dongjoon-hyun committed Apr 22, 2019
1 parent a85ab12 commit 257abc476dae1a7af68d4e55db7c0afeea2bf831
@@ -224,7 +224,15 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
}
}
} catch {
case ie: InterruptedException => // exit
case _: InterruptedException => // exit
case t: Throwable =>
try {
// Re-submit a MessageLoop so that Dispatcher will still work if
// UncaughtExceptionHandler decides to not kill JVM.
threadpool.execute(new MessageLoop)
} finally {
throw t
}
}
}
}
@@ -106,7 +106,7 @@ private[netty] class Inbox(
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
} catch {
case NonFatal(e) =>
case e: Throwable =>
context.sendFailure(e)
// Throw the exception -- this exception will be caught by the safelyCall function.
// The endpoint's onError function will be called.
@@ -17,13 +17,20 @@

package org.apache.spark.rpc.netty

import java.util.concurrent.ExecutionException

import scala.concurrent.duration._

import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.mockito.MockitoSugar

import org.apache.spark._
import org.apache.spark.network.client.TransportClient
import org.apache.spark.rpc._

class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar with TimeLimits {

private implicit val signaler: Signaler = ThreadSignaler

override def createRpcEnv(
conf: SparkConf,
@@ -84,4 +91,48 @@ class NettyRpcEnvSuite extends RpcEnvSuite with MockitoSugar {
msg3,
RequestMessage(nettyEnv, client, msg3.serialize(nettyEnv)))
}

// Regression test for SPARK-27496: a fatal error (e.g. StackOverflowError) thrown from
// "receiveAndReply" must be sent back to the sender instead of leaving it to hang until
// timeout, and the Dispatcher must remain functional afterwards.
test("StackOverflowError should be sent back and Dispatcher should survive") {
  val dispatcherThreads = 2
  val sparkConf = new SparkConf
  val envConfig = RpcEnvConfig(
    sparkConf,
    "test",
    "localhost",
    "localhost",
    0,
    new SecurityManager(sparkConf),
    dispatcherThreads,
    clientMode = false)
  val remoteEnv = new NettyRpcEnvFactory().create(envConfig)
  // Endpoint that throws a fatal error for String messages but answers Int messages normally.
  remoteEnv.setupEndpoint("StackOverflowError", new RpcEndpoint {
    override val rpcEnv = remoteEnv

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      // scalastyle:off throwerror
      case msg: String => throw new StackOverflowError
      // scalastyle:on throwerror
      case num: Int => context.reply(num)
    }
  })

  val ref = env.setupEndpointRef(remoteEnv.address, "StackOverflowError")
  try {
    // Trigger `dispatcherThreads` StackOverflowErrors so each MessageLoop thread has
    // hit the fatal-error path (and must have been resubmitted) at least once.
    (0 until dispatcherThreads).foreach { _ =>
      val e = intercept[SparkException] {
        ref.askSync[String]("hello")
      }
      // The root cause is e.getCause.getCause because the error is boxed by Scala Promise.
      assert(e.getCause.isInstanceOf[ExecutionException])
      assert(e.getCause.getCause.isInstanceOf[StackOverflowError])
    }
    // The Dispatcher must still be alive and able to serve a normal request promptly.
    failAfter(10.seconds) {
      assert(ref.askSync[Int](100) === 100)
    }
  } finally {
    remoteEnv.shutdown()
    remoteEnv.awaitTermination()
  }
}
}

0 comments on commit 257abc4

Please sign in to comment.
You can’t perform that action at this time.