[SPARK-31922][CORE] logDebug "RpcEnv already stopped" error on LocalSparkCluster shutdown #28746

Closed · wants to merge 8 commits
Changes from 7 commits
LocalSparkCluster.scala
@@ -76,8 +76,8 @@ class LocalSparkCluster(
     logInfo("Shutting down local Spark cluster.")
     // Stop the workers before the master so they don't get upset that it disconnected
     workerRpcEnvs.foreach(_.shutdown())
-    masterRpcEnvs.foreach(_.shutdown())
     workerRpcEnvs.foreach(_.awaitTermination())
+    masterRpcEnvs.foreach(_.shutdown())
     masterRpcEnvs.foreach(_.awaitTermination())
     masterRpcEnvs.clear()
     workerRpcEnvs.clear()
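
For readers skimming the diff, the resulting shutdown order can be summarized in a short self-contained sketch. The Env trait and stopCluster below are illustrative stand-ins, not Spark APIs:

object ShutdownOrderSketch {
  // Minimal stand-in for an RpcEnv's shutdown/awaitTermination pair.
  trait Env {
    def shutdown(): Unit
    def awaitTermination(): Unit
  }

  // Same ordering as LocalSparkCluster.stop() after this change: worker envs
  // are shut down and fully terminated before the master envs are shut down.
  def stopCluster(workerEnvs: Seq[Env], masterEnvs: Seq[Env]): Unit = {
    workerEnvs.foreach(_.shutdown())
    workerEnvs.foreach(_.awaitTermination())
    masterEnvs.foreach(_.shutdown())
    masterEnvs.foreach(_.awaitTermination())
  }
}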
Dispatcher.scala
@@ -147,7 +147,14 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) exte
   /** Posts a one-way message. */
   def postOneWayMessage(message: RequestMessage): Unit = {
     postMessage(message.receiver.name, OneWayMessage(message.senderAddress, message.content),
-      (e) => throw e)
+      (e) => e match {
+        // SPARK-31922: in local cluster mode, there's always an RpcEnvStoppedException when
+        // stop is called, due to some asynchronous message handling. We catch the exception
+        // and log it at debug level to avoid a verbose error message when the user stops a
+        // local cluster in the Spark shell.
+        case re: RpcEnvStoppedException => logDebug(s"Message $message dropped. ${re.getMessage}")
+        case _ => throw e
+      })
   }
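
To make the new error handling concrete, here is a small self-contained sketch of the same pattern with illustrative names only (StoppedEnvException, postMessage, postOneWay are stand-ins, not the real Spark internals): the failure callback drops "env already stopped" errors at debug level and rethrows everything else.

object DropOnStoppedEnvSketch {
  final class StoppedEnvException(msg: String) extends IllegalStateException(msg)

  private def logDebug(msg: => String): Unit = println(s"DEBUG $msg")

  // Loosely mirrors how Dispatcher.postMessage hands delivery failures to a callback.
  private def postMessage(body: String, deliver: String => Unit, onError: Throwable => Unit): Unit =
    try deliver(body) catch { case e: Exception => onError(e) }

  def postOneWay(body: String, envStopped: Boolean): Unit =
    postMessage(
      body,
      msg => if (envStopped) throw new StoppedEnvException("RpcEnv already stopped.") else println(msg),
      {
        case e: StoppedEnvException => logDebug(s"Message $body dropped. ${e.getMessage}") // quiet drop
        case e => throw e // anything else still surfaces
      })

  def main(args: Array[String]): Unit = {
    postOneWay("hello", envStopped = false) // prints "hello"
    postOneWay("hello", envStopped = true)  // logged at debug level instead of thrown
  }
}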
CoarseGrainedSchedulerBackend.scala
@@ -560,7 +560,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
   }
 
-  override def reviveOffers(): Unit = {
+  override def reviveOffers(): Unit = Utils.tryLogNonFatalError {
     driverEndpoint.send(ReviveOffers)
   }

Ngone51 (Member, Author) commented on the changed line:

This change fixes the failure of the test org.apache.spark.launcher.LauncherBackendSuite "standalone/client: launcher handle". After sleeping one more second, the application launched by the SparkLauncher now has a chance to submit tasks to the TaskScheduler and call reviveOffers on the SchedulerBackend. At the same time, the SparkLauncher will ask the application to stop. Therefore, the SchedulerBackend could already have been stopped when it receives ReviveOffers messages, which would fail the entire application in the end.

So I use Utils.tryLogNonFatalError to fix it, and I think this should be fine since we already use it at:

reviveThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
  Option(self).foreach(_.send(ReviveOffers))
}, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
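
For context on the helper used above: Utils.tryLogNonFatalError runs a block and logs, rather than propagates, any non-fatal exception, so a ReviveOffers sent to an already-stopped endpoint no longer fails the application. A rough, simplified sketch of that behavior (not the verbatim Spark source, which logs through Spark's Logging trait) looks like this:

import scala.util.control.NonFatal

object TryLogNonFatalErrorSketch {
  // Simplified stand-in for org.apache.spark.util.Utils.tryLogNonFatalError:
  // run the block; if it throws a non-fatal exception, log it and move on.
  def tryLogNonFatalError(block: => Unit): Unit = {
    try {
      block
    } catch {
      case NonFatal(t) =>
        println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
    }
  }

  def main(args: Array[String]): Unit = {
    tryLogNonFatalError {
      throw new IllegalStateException("RpcEnv already stopped.")
    }
    println("still running") // reached: the exception was logged, not rethrown
  }
}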