Skip to content

Commit

Permalink
should reduce active threads when fatal exception
Browse files Browse the repository at this point in the history
  • Loading branch information
wzhfy committed Aug 30, 2020
1 parent cfe012a commit eb8b0b3
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 0 deletions.
20 changes: 20 additions & 0 deletions core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,16 @@ private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)
* Calls action closure, and calls the endpoint's onError function in the case of exceptions.
*/
private def safelyCall(endpoint: RpcEndpoint)(action: => Unit): Unit = {
def dealWithFatalException(fatal: Throwable): Unit = {
inbox.synchronized {
// Should reduce the number of active threads before throw exception
if (numActiveThreads != 0) {
numActiveThreads -= 1
}
}
throw fatal
}

try action catch {
case NonFatal(e) =>
try endpoint.onError(e) catch {
Expand All @@ -209,8 +219,18 @@ private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)
} else {
logError("Ignoring error", ee)
}
case fatal =>
dealWithFatalException(fatal)
}
case fatal =>
dealWithFatalException(fatal)
}
}

// exposed only for testing
def getNumActiveThreads: Int = {
inbox.synchronized {
inbox.numActiveThreads
}
}
}
13 changes: 13 additions & 0 deletions core/src/test/scala/org/apache/spark/rpc/netty/InboxSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,17 @@ class InboxSuite extends SparkFunSuite {

endpoint.verifySingleOnNetworkErrorMessage(cause, remoteAddress)
}

test("SPARK-32738: should reduce active thread when fatal exception") {
val endpoint = mock(classOf[TestRpcEndpoint])
when(endpoint.receive).thenThrow(new OutOfMemoryError())

val dispatcher = mock(classOf[Dispatcher])
val inbox = new Inbox("name", endpoint)
inbox.post(OneWayMessage(null, "hi"))
intercept[OutOfMemoryError] {
inbox.process(dispatcher)
}
assert(inbox.getNumActiveThreads == 0)
}
}

0 comments on commit eb8b0b3

Please sign in to comment.