[SPARK-32738][CORE] Should reduce the number of active threads if fatal error happens in `Inbox.process`

### What changes were proposed in this pull request?

Message processing for a `ThreadSafeRpcEndpoint` is controlled by `numActiveThreads` in `Inbox`. Currently, if a fatal error happens during `Inbox.process`, `numActiveThreads` is not decremented. Other threads then cannot process messages in that inbox, which causes the endpoint to "hang". For other types of endpoints, `numActiveThreads` should also be kept correct.

This problem is more serious in earlier Spark 2.x versions, since the driver, executor and block manager endpoints are all thread-safe endpoints there.

To fix this, we should reduce the number of active threads if a fatal error happens in `Inbox.process`.
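
The failure mode and the fix can be illustrated with a toy model. The sketch below is a hypothetical, heavily simplified stand-in: `MiniInbox`, `MiniInboxDemo`, and the `decrementOnFatal` flag are invented for illustration and are not part of Spark. It only mimics how a counter gates entry for a thread-safe endpoint.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical, simplified stand-in for an inbox; NOT Spark's Inbox class.
final class MiniInbox(handler: String => Unit, decrementOnFatal: Boolean) {
  private val messages = mutable.Queue.empty[String]
  private var numActiveThreads = 0

  def post(msg: String): Unit = synchronized { messages.enqueue(msg) }
  def activeThreads: Int = synchronized { numActiveThreads }

  /** Drains the queue and returns how many messages this call handled. */
  def process(): Int = {
    // Thread-safe endpoint: enter only if no other thread is active.
    var next: Option[String] = synchronized {
      if (numActiveThreads == 0 && messages.nonEmpty) {
        numActiveThreads += 1
        Some(messages.dequeue())
      } else None
    }
    var handled = 0
    while (next.isDefined) {
      try handler(next.get) catch {
        case NonFatal(_) => () // a real inbox would report this to the endpoint
        case fatal: Throwable =>
          // The idea of the fix: release the slot before rethrowing.
          if (decrementOnFatal) synchronized { numActiveThreads -= 1 }
          throw fatal
      }
      handled += 1
      next = synchronized {
        if (messages.nonEmpty) Some(messages.dequeue())
        else { numActiveThreads -= 1; None }
      }
    }
    handled
  }
}

object MiniInboxDemo {
  /** Posts a failing message, then a normal one.
    * Returns (active threads after the failure, messages handled afterwards). */
  def run(decrementOnFatal: Boolean): (Int, Int) = {
    val inbox = new MiniInbox(
      msg => if (msg == "boom") throw new OutOfMemoryError("simulated"),
      decrementOnFatal)
    inbox.post("boom")
    try inbox.process() catch { case _: OutOfMemoryError => () }
    val active = inbox.activeThreads
    inbox.post("hi")
    (active, inbox.process())
  }
}
```

Under these assumptions, `MiniInboxDemo.run(decrementOnFatal = false)` leaves the counter at 1, so the later message is never handled, while `MiniInboxDemo.run(decrementOnFatal = true)` returns the counter to 0 and the next `process()` call handles the message.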

### Why are the changes needed?

`numActiveThreads` is not correct after a fatal error happens, which causes the problem described above.
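
For readers unsure which errors count as "fatal" here: the patched code matches `NonFatal(e)` first and treats every other `Throwable` as fatal. A minimal sketch of that classification follows, using the standard `scala.util.control.NonFatal` extractor; the `FatalClassification` object and its `classify` helper are hypothetical names used only for this example.

```scala
import scala.util.control.NonFatal

// Hypothetical helper, for illustration only.
object FatalClassification {
  // Same pattern order as the patch: NonFatal first, then any other Throwable.
  def classify(t: Throwable): String = t match {
    case NonFatal(_)  => "non-fatal"
    case _: Throwable => "fatal"
  }
}
```

With this helper, a `RuntimeException` is classified as non-fatal, whereas an `OutOfMemoryError` (the error used in the new test), a `LinkageError`, or an `InterruptedException` falls through to the fatal branch.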

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a new test.

Closes #29580 from wzhfy/deal_with_fatal_error.

Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
wzhfy committed Sep 15, 2020
1 parent fe6ff15 commit 589061c
Showing 2 changed files with 33 additions and 0 deletions.
20 changes: 20 additions & 0 deletions core/src/main/scala/org/apache/spark/rpc/netty/Inbox.scala
@@ -200,6 +200,16 @@ private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)
   * Calls action closure, and calls the endpoint's onError function in the case of exceptions.
   */
  private def safelyCall(endpoint: RpcEndpoint)(action: => Unit): Unit = {
    def dealWithFatalError(fatal: Throwable): Unit = {
      inbox.synchronized {
        assert(numActiveThreads > 0, "The number of active threads should be positive.")
        // Should reduce the number of active threads before throw the error.
        numActiveThreads -= 1
      }
      logError(s"An error happened while processing message in the inbox for $endpointName", fatal)
      throw fatal
    }

    try action catch {
      case NonFatal(e) =>
        try endpoint.onError(e) catch {
@@ -209,8 +219,18 @@ private[netty] class Inbox(val endpointName: String, val endpoint: RpcEndpoint)
            } else {
              logError("Ignoring error", ee)
            }
          case fatal: Throwable =>
            dealWithFatalError(fatal)
        }
      case fatal: Throwable =>
        dealWithFatalError(fatal)
    }
  }

  // exposed only for testing
  def getNumActiveThreads: Int = {
    inbox.synchronized {
      inbox.numActiveThreads
    }
  }
}
13 changes: 13 additions & 0 deletions core/src/test/scala/org/apache/spark/rpc/netty/InboxSuite.scala
@@ -136,4 +136,17 @@ class InboxSuite extends SparkFunSuite {

    endpoint.verifySingleOnNetworkErrorMessage(cause, remoteAddress)
  }

  test("SPARK-32738: should reduce the number of active threads when fatal error happens") {
    val endpoint = mock(classOf[TestRpcEndpoint])
    when(endpoint.receive).thenThrow(new OutOfMemoryError())

    val dispatcher = mock(classOf[Dispatcher])
    val inbox = new Inbox("name", endpoint)
    inbox.post(OneWayMessage(null, "hi"))
    intercept[OutOfMemoryError] {
      inbox.process(dispatcher)
    }
    assert(inbox.getNumActiveThreads == 0)
  }
}
