
[SPARK-31922][CORE] logDebug "RpcEnv already stopped" error on LocalSparkCluster shutdown #28746

Closed · wants to merge 8 commits

Conversation

@Ngone51 (Member) commented Jun 7, 2020

What changes were proposed in this pull request?

Catch the RpcEnvStoppedException and log it at debug level when stop is called on a LocalSparkCluster.

This PR also contains two small changes to fix potential related issues.

Why are the changes needed?

Currently, there is always an "RpcEnv already stopped" error when we exit spark-shell in local-cluster mode:

20/06/07 14:54:18 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.rpc.RpcEnvStoppedException: RpcEnv already stopped.
        at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:167)
        at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:150)
        at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:691)
        at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:253)
        at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:111)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:140)
        at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)

When we call stop on StandaloneSchedulerBackend, the backend first sends UnregisterApplication to the Master and then calls stop on the LocalSparkCluster immediately. On the other side, the Master sends messages to the Workers when it receives UnregisterApplication. However, the Worker's rpcEnv has already been stopped by the backend at that point. Therefore, the error appears when the Worker tries to handle those messages.

It's only an error on shutdown that users don't need to care about, so we can hide it in the debug log; this is also what we did previously in #18547.
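
As a rough illustration of the approach, a minimal sketch is shown below. It is conceptual only: it assumes code living inside Spark's own org.apache.spark.rpc package, where the internal Logging trait and the package-private RpcEnvStoppedException are visible, and the object and method names are hypothetical.

import org.apache.spark.internal.Logging
import org.apache.spark.rpc.RpcEnvStoppedException

object ShutdownLoggingSketch extends Logging {
  // Run a message-posting action; if the target RpcEnv has already been
  // stopped (expected while LocalSparkCluster is shutting down), record that
  // at debug level instead of letting it surface as an ERROR log.
  def postQuietlyOnShutdown(post: => Unit): Unit = {
    try {
      post
    } catch {
      case e: RpcEnvStoppedException => logDebug(e.getMessage)
    }
  }
}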

Does this PR introduce any user-facing change?

Yes, users will not see the error message after this PR.

How was this patch tested?

Tested manually.

// SPARK-31922: wait one more second before shutting down rpcEnvs of master and worker,
// in order to let the cluster have time to handle the `UnregisterApplication` message.
// Otherwise, we could hit "RpcEnv already stopped" error.
Thread.sleep(1000)
Ngone51 (Member, author):

The 1000ms comes from my test results; actually, 500ms could also work. I just want to be conservative while still avoiding an obvious pause when exiting spark-shell.

Contributor:

I don't think this is an ideal way to handle the issue; there can always be lags for various reasons, so the error message may still appear. Ideally, the Master side should know that the LocalSparkCluster is stopped and not output the "RpcEnv already stopped" error message, but I'm not sure how hard that is to do.

Contributor:

+1, can we have unregister happen synchronously for the local cluster? See Ngone51#3.

Ngone51 (Member, author):

Hi @gerashegalov, thank you for the PR, but I think it might not work. Syncing from the AppClient to the Master is not enough; we should also sync the events from the Master to the Workers after the Master receives the UnregisterApplication message. In Master.finishApplication, you can see that the Master not only replies to the AppClient but also sends messages to the Workers asynchronously, and the error happens exactly on the Worker. But if we go that way, I'm afraid we add too much special-case logic for the local cluster only, which I personally think is too much overhead.

Contributor:

Thanks for taking a look, @Ngone51. It's just a sample, but on my box the issue has not shown up after the PR, which makes sure the client is stopped only after finishApplication and reworks the shutdown logic. Agreed that perfecting it comes at a price for minimal returns, given it's a corner case.

Ngone51 (Member, author):

> on my box the issue has not shown up after the PR making sure the client is stopped only after finishApplication and reworking the shutdown logic

I guess the issue does not show up because askSync to the Master gives the Worker a little more time to handle the messages compared to send. But the fact that the Master sends messages to the Workers asynchronously hasn't changed, so you still cannot be sure that the messages have been handled by the Workers when stop is called.
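
(For reference, the contrast between the two call styles discussed here is sketched below. It assumes code inside the org.apache.spark package, where the internal RpcEndpointRef type is visible; the message type and the Boolean acknowledgement are hypothetical stand-ins, not the actual Master/Worker protocol.)

import org.apache.spark.rpc.RpcEndpointRef

object SendVsAskSketch {
  // Hypothetical message used only for illustration.
  case class HypotheticalUnregister(appId: String)

  def contrast(masterRef: RpcEndpointRef, appId: String): Unit = {
    // Fire-and-forget: returns immediately; the caller gets no signal that the
    // Master has processed the message, let alone the Workers.
    masterRef.send(HypotheticalUnregister(appId))

    // Blocking ask: waits for the Master's reply, which buys the Workers a
    // little extra time, but still says nothing about whether the Master's own
    // asynchronous messages to the Workers have been handled.
    val acked = masterRef.askSync[Boolean](HypotheticalUnregister(appId))
  }
}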

Contributor:

Makes sense, agreed. Since any timeout is either too long or provides no real guarantee, I would argue for a practical, modest improvement without timeouts.

@Ngone51 (Member, author) commented Jun 7, 2020

ping @jiangxb1987 @zsxwing @tgravescs Please take a look, thanks!

@SparkQA commented Jun 7, 2020

Test build #123600 has finished for PR 28746 at commit d46d4df.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -557,7 +557,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
   }
 
-  override def reviveOffers(): Unit = {
+  override def reviveOffers(): Unit = Utils.tryLogNonFatalError {
Ngone51 (Member, author):

This change fixes the failure of the test org.apache.spark.launcher.LauncherBackendSuite ("standalone/client: launcher handle"). After sleeping one more second, the application launched by the SparkLauncher now has a chance to submit tasks to the TaskScheduler and call reviveOffers on the SchedulerBackend. At the same time, the SparkLauncher asks the application to stop. Therefore, the SchedulerBackend may already have been stopped when it receives ReviveOffers messages, which would fail the entire application in the end.

So, I use Utils.tryLogNonFatalError to fix it, and I think this should be fine since we already use it at:

reviveThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError {
  Option(self).foreach(_.send(ReviveOffers))
}, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
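
For context, Utils.tryLogNonFatalError is essentially a log-and-swallow wrapper. A simplified, self-contained sketch of its behavior (not the verbatim Spark source) is:

import org.slf4j.{Logger, LoggerFactory}
import scala.util.control.NonFatal

object TryLogSketch {
  private val log: Logger = LoggerFactory.getLogger(getClass)

  // Run the block and log any non-fatal exception instead of letting it
  // propagate and fail the caller; fatal errors still propagate.
  def tryLogNonFatalError(block: => Unit): Unit = {
    try {
      block
    } catch {
      case NonFatal(t) =>
        log.error(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
    }
  }
}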

@SparkQA commented Jun 8, 2020

Test build #123612 has finished for PR 28746 at commit eba978e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

rpcEnv.shutdown()
rpcEnv.awaitTermination()
}}
rpcEnvArr.clear()
Contributor:

I doubt this is right; it seems that previously the masterRpcEnvs were cleared before the workerRpcEnvs.

Ngone51 (Member, author):

It only clears the array of rpcEnvs, so it should not matter?

@SparkQA commented Jun 8, 2020

Test build #123616 has finished for PR 28746 at commit 12bf24c.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987 (Contributor):

retest this please

@SparkQA commented Jun 10, 2020

Test build #123776 has finished for PR 28746 at commit 12bf24c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 15, 2020

Test build #124054 has started for PR 28746 at commit 99fad37.

@SparkQA commented Jun 15, 2020

Test build #124055 has started for PR 28746 at commit ddcadfd.

@Ngone51 (Member, author) commented Jun 15, 2020

What do you guys think about the latest solution? @jiangxb1987 @gerashegalov

@SparkQA commented Jun 15, 2020

Test build #124057 has started for PR 28746 at commit a3ed5c1.

@dongjoon-hyun (Member):

Retest this please.

var busyWorkers = workerRefs
while (busyWorkers.nonEmpty) {
  Thread.sleep(300)
  busyWorkers = busyWorkers.filterNot(_.askSync[Boolean](IsWorkerReadyToStop))
Member:

Thank you for working on this, @Ngone51. BTW, this seems to cause an infinite loop if a worker cannot respond properly for some reason. Can we use a finite for statement instead of a while statement to avoid any potential issues?

Ngone51 (Member, author):

Makes sense, thanks!
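
A bounded variant along the lines of that suggestion might look like the sketch below. The retry count and sleep interval are illustrative, and IsWorkerReadyToStop is simply the message from the snippet above, not a public API.

// Illustrative only: poll each worker a limited number of times so that an
// unresponsive worker cannot hang LocalSparkCluster.stop() forever.
val maxRetries = 10 // illustrative bound, not a value from the actual patch
var busyWorkers = workerRefs
var attempt = 0
while (busyWorkers.nonEmpty && attempt < maxRetries) {
  Thread.sleep(300)
  busyWorkers = busyWorkers.filterNot(_.askSync[Boolean](IsWorkerReadyToStop))
  attempt += 1
}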

// `ApplicationFinished`) from the Master before we shutdown the workers' rpcEnvs.
// Otherwise, we could hit "RpcEnv already stopped" error.
var busyWorkers = workerRefs
while (busyWorkers.nonEmpty) {
Contributor:

This is wrong; what if a worker doesn't reply and an RpcTimeoutException is thrown? Actually, I doubt whether it is worth making so many changes to avoid an error message that doesn't hurt anything.

Ngone51 (Member, author):

> Actually, I doubt whether it is worth making so many changes to avoid an error message that doesn't hurt anything.

Hmm... yet this is the approach with the fewest changes for a deterministic fix that I can think of.

Do you have any other ideas?

@SparkQA commented Jun 16, 2020

Test build #124084 has finished for PR 28746 at commit a3ed5c1.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jul 9, 2020

Test build #125484 has started for PR 28746 at commit 6cc556a.

@Ngone51 (Member, author) commented Jul 9, 2020

Hi all, I'm reviving this PR because I just found that another developer, @sarutak, has hit the same issue (SPARK-32236). It has less impact for end users but is still occasionally hit by developers, so I think it's still better to fix it.

And since we've already had a lot of discussion here, I think it may be better to continue here (thanks, sarutak!).

Based on all the above discussions, I've decided to move this PR back to the original solution, which simply sleeps one more second in LocalSparkCluster's stop. Both sarutak and I think it's the simplest compromise for the issue. On the other hand, as you know, every deterministic solution that @gerashegalov and I have tried introduces too much overhead.

The simple solution won't introduce any side effects. Besides, it works most of the time in practice, though it's not theoretically deterministic.

cc: @jiangxb1987 @dongjoon-hyun

@zsxwing (Member) commented Jul 9, 2020

@Ngone51 (Member, author) commented Jul 10, 2020

> How about changing this to a debug log like this?

Oh, I see. I believe we can also do that, following the change and discussions in #18547.

@SparkQA commented Jul 10, 2020

Test build #125624 has started for PR 28746 at commit 4c8ceec.

@SparkQA commented Jul 10, 2020

Test build #125619 has finished for PR 28746 at commit 1c5e441.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun (Member):

Retest this please.

@dongjoon-hyun (Member) left a comment:

Thanks, @Ngone51. This mitigates the current situation in a better way. BTW, could you adjust the PR title, since this doesn't completely fix the "RpcEnv already stopped" error? This PR still shows it in DEBUG-level logs, doesn't it?

@SparkQA commented Jul 12, 2020

Test build #125694 has finished for PR 28746 at commit 4c8ceec.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51 changed the title [SPARK-31922][CORE] Fix "RpcEnv already stopped" error when exit spark-shell with local-cluster mode to [SPARK-31922][CORE] logDebug "RpcEnv already stopped" error on LocalSparkCluster shutdown on Jul 13, 2020
@Ngone51 (Member, author) commented Jul 13, 2020

@dongjoon-hyun yea, updated. Thanks!

@SparkQA commented Jul 13, 2020

Test build #125743 has finished for PR 28746 at commit 59a0595.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51 (Member, author) commented Jul 21, 2020

Kindly ping @dongjoon-hyun @jiangxb1987 @zsxwing. Does it look OK now?

@dongjoon-hyun (Member):

Oops. Sorry, @Ngone51.

@dongjoon-hyun (Member) left a comment:

+1, LGTM. Merged to master. Thank you, @Ngone51 and all.

@dongjoon-hyun (Member):

Also, @zsxwing and @jiangxb1987: I skimmed the previous discussion and I believe @Ngone51 addressed all your concerns. If I missed something, please comment here and we can make a follow-up. Thanks!

@Ngone51 (Member, author) commented Jul 22, 2020

Thank you @dongjoon-hyun and all!

@zsxwing (Member) commented Jul 22, 2020

LGTM as well!

@gerashegalov (Contributor):

Confirmed that the fix is effective on my setup too. Thanks, @Ngone51!
