
[SPARK-14228][CORE][YARN] Lost executor of RPC disassociated, and occurs exception: Could not find CoarseGrainedScheduler or it has been stopped #19741

Closed
wants to merge 5 commits

Conversation

devaraj-kavali

What changes were proposed in this pull request?

I see two instances where the exception occurs.

Instance 1:

17/11/10 15:49:32 ERROR util.Utils: Uncaught exception in thread driver-revive-thread
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
        at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:160)
        at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:140)
        at org.apache.spark.rpc.netty.NettyRpcEnv.send(NettyRpcEnv.scala:187)
        at org.apache.spark.rpc.netty.NettyRpcEndpointRef.send(NettyRpcEnv.scala:521)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(CoarseGrainedSchedulerBackend.scala:125)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anon$1$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(CoarseGrainedSchedulerBackend.scala:125)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anon$1$$anonfun$run$1.apply$mcV$sp(CoarseGrainedSchedulerBackend.scala:125)
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1344)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverEndpoint$$anon$1.run(CoarseGrainedSchedulerBackend.scala:124)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

In CoarseGrainedSchedulerBackend.scala, the driver-revive-thread starts in DriverEndpoint.onStart() and keeps sending ReviveOffers messages periodically until it is shut down as part of DriverEndpoint.onStop(). There is no coordination between shutting down the driver-revive-thread and unregistering the RpcEndpoint: the RpcEndpoint is unregistered first, and the driver-revive-thread only shuts down afterwards in DriverEndpoint.onStop(). In between, the driver-revive-thread may try to send a ReviveOffers message, which leads to the above exception.

To fix this issue, this PR moves the shutdown of the driver-revive-thread to CoarseGrainedSchedulerBackend.stop(), which executes before the DriverEndpoint is unregistered.
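A minimal, self-contained sketch of the ordering problem and the fix, assuming simplified stand-ins (ReviveThreadDemo, endpointRegistered and sendReviveOffers are illustrative names, not the actual Spark internals):

import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object ReviveThreadDemo {
  // Stand-in for the dispatcher's registration state of the DriverEndpoint.
  private val endpointRegistered = new AtomicBoolean(true)

  // Stand-in for the driver-revive-thread started in DriverEndpoint.onStart().
  private val reviveThread: ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor()

  private def sendReviveOffers(): Unit = {
    // In Spark this is a send of ReviveOffers to the driver endpoint; once the
    // endpoint is unregistered, the dispatcher raises
    // "Could not find CoarseGrainedScheduler".
    if (!endpointRegistered.get()) {
      throw new IllegalStateException("Could not find CoarseGrainedScheduler.")
    }
  }

  def onStart(): Unit = {
    reviveThread.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = sendReviveOffers()
    }, 0, 1, TimeUnit.SECONDS)
  }

  // Before the fix: the endpoint is unregistered first and the revive thread is
  // only shut down later in DriverEndpoint.onStop(), so a tick in between hits
  // the exception above.
  def stopBeforeFix(): Unit = {
    endpointRegistered.set(false) // RpcEndpoint unregister happens first
    reviveThread.shutdownNow()    // shutdown happens later, in onStop()
  }

  // After the fix: shut the revive thread down in
  // CoarseGrainedSchedulerBackend.stop(), which runs before the endpoint is
  // unregistered, closing the window.
  def stopAfterFix(): Unit = {
    reviveThread.shutdownNow()
    reviveThread.awaitTermination(10, TimeUnit.SECONDS)
    endpointRegistered.set(false)
  }
}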

Instance 2:

17/11/10 16:31:38 ERROR cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Error requesting driver to remove executor 1 for reason Executor for container container_1508535467865_0226_01_000002 exited because of a YARN event (e.g., pre-emption) and not because of an error in the running job.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
        at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:160)
        at org.apache.spark.rpc.netty.Dispatcher.postLocalMessage(Dispatcher.scala:135)
        at org.apache.spark.rpc.netty.NettyRpcEnv.ask(NettyRpcEnv.scala:229)
        at org.apache.spark.rpc.netty.NettyRpcEndpointRef.ask(NettyRpcEnv.scala:516)
        at org.apache.spark.rpc.RpcEndpointRef.ask(RpcEndpointRef.scala:63)
        at org.apache.spark.scheduler.cluster.YarnSchedulerBackend$YarnSchedulerEndpoint$$anonfun$receive$1.applyOrElse(YarnSchedulerBackend.scala:269)
        at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:117)
        at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:205)
        at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:101)
        at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:221)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Here the YarnSchedulerEndpoint tries to send a RemoveExecutor message to the driver endpoint after the YARN scheduler backend has stopped, which leads to the above exception. To avoid it, we could either:

  1. Add a condition (checking whether the service has stopped) before sending the remove-executor message
  2. Add a warning log message in the onFailure case when the service is already stopped

This PR chooses option 2), adding a log message in the onFailure case without the exception stack trace, since option 1) would require the check to run for every remove-executor message.
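As a rough, self-contained illustration of option 2) (not the exact diff in this PR), the failure handler can check a stopped flag and log a short warning without the stack trace; stopped, logWarning and logError below are assumed stand-ins for the corresponding members of YarnSchedulerBackend:

import java.util.concurrent.atomic.AtomicBoolean

object RemoveExecutorFailureSketch {
  // Stand-in for the backend's "service has stopped" flag.
  private val stopped = new AtomicBoolean(false)

  // Stand-ins for Spark's Logging methods.
  private def logWarning(msg: String): Unit = println(s"WARN $msg")
  private def logError(msg: String, e: Throwable): Unit = {
    println(s"ERROR $msg")
    e.printStackTrace()
  }

  // Called from the onFailure callback of the RemoveExecutor ask.
  def onRemoveExecutorFailure(executorId: String, e: Throwable): Unit = {
    if (stopped.get) {
      // The backend is shutting down, so the failure is expected; log it
      // briefly without the stack trace.
      logWarning(s"Attempt to remove executor $executorId after the backend stopped: ${e.getMessage}")
    } else {
      logError(s"Error requesting driver to remove executor $executorId", e)
    }
  }
}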

How was this patch tested?

I verified it manually; I don't see these exceptions with the PR changes.

@jiangxb1987
Contributor

cc @jerryshao

@jerryshao
Contributor

From my understanding, the above exception does no harm to the Spark application; it's just a threading corner case hit during stop, am I right?

@devaraj-kavali
Author

Thanks @jerryshao for looking into this.

From my understanding, the above exception does no harm to the Spark application; it's just a threading corner case hit during stop, am I right?

Yeah, it doesn't cause any functional problem, but these exceptions raise a suspicion in the user's mind that something went wrong with the Spark application, and users then start diagnosing/debugging the cause, which wastes a lot of time or creates a wrong impression. I think this should be fixed to avoid that suspicion/confusion.

@@ -268,8 +268,13 @@ private[spark] abstract class YarnSchedulerBackend(
logWarning(reason.toString)
driverEndpoint.ask[Boolean](r).onFailure {
Contributor

I wonder if using ask here is doing anything useful at all. Instead, using send would be cheaper, and the handler for RemoveExecutor can log whatever errors it runs into instead.

Then this could just be:

if (!stopped.get()) {
  driverEndpoint.send(...)
}

(And the driver endpoint will probably need a minor change too.)

Author

Thanks @vanzin for the review.

I think it is a good suggestion. There are other places where the RemoveExecutor message is sent using ask; are you suggesting changing those as well?

(And the driver endpoint will probably need a minor change too.)

You mean moving the case RemoveExecutor(executorId, reason) from receiveAndReply to receive?

Contributor

@jerryshao jerryshao Nov 16, 2017

I think when you use send, all the related things you mentioned above should be changed.

Contributor

IIRC there are two different RemoveExecutor messages in the code, as confusing as that may be.

But if this one is used in multiple places then it's probably not worth changing right now, unless you're up for verifying the return value is not needed in the other places.

Author

I see there are multiple places where this message is used, but all of them just log the failure. I'm thinking this logging may be useful to diagnose failures in some cases.

Contributor

@vanzin vanzin Nov 16, 2017

Can you point out a couple of those? I only see this RemoveExecutor being handled in CoarseGrainedSchedulerBackend.scala. You could just log any errors there (the RPC layer will already log any communication issues).

(All the references in the block manager code are for a different RemoveExecutor.)

Author

I see these places, other than CoarseGrainedSchedulerBackend.scala and the one in the PR:

RemoveExecutor(executorId, new ExecutorLossReason(reason))


Contributor

Yeah, as you said, there's only logging going on. There are only two things that can happen:

  • the handler throws an exception instead of replying
  • the RPC layer hits an error

The first can be logged at the handler (if not already logged by the RPC layer). The second is already logged by the RPC layer.

Author

That sounds fine; I will update the PR with the change from ask to send for RemoveExecutor.
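For reference, a rough sketch of the shape of the ask-to-send change discussed above, modeled loosely on Spark's RpcEndpoint split between receive and receiveAndReply; the simplified trait, message and handler bodies here are illustrative placeholders, not the actual Spark code:

// Fire-and-forget removal request; mirrors the shape of the real message.
case class RemoveExecutor(executorId: String, reason: String)

// Simplified stand-in for the RpcEndpoint contract.
trait EndpointSketch {
  // Messages delivered via send(...); no reply expected.
  def receive: PartialFunction[Any, Unit]
  // Messages delivered via ask(...); `reply` stands in for RpcCallContext.reply.
  def receiveAndReply(reply: Any => Unit): PartialFunction[Any, Unit]
}

class DriverEndpointSketch extends EndpointSketch {
  private def removeExecutor(executorId: String, reason: String): Unit = {
    // Clean up executor state; any errors can be logged here rather than
    // propagated back to the sender.
    println(s"Removing executor $executorId: $reason")
  }

  // After the change: RemoveExecutor is handled in receive, so callers use
  // send(...) and no Boolean reply is needed.
  override def receive: PartialFunction[Any, Unit] = {
    case RemoveExecutor(executorId, reason) => removeExecutor(executorId, reason)
  }

  // Before the change, the RemoveExecutor case lived here and replied with true.
  override def receiveAndReply(reply: Any => Unit): PartialFunction[Any, Unit] = {
    case _ => // other request/response messages
  }
}

On the sender side this pairs with the earlier suggestion of skipping the message entirely when the backend has already stopped, i.e. guarding driverEndpoint.send(RemoveExecutor(...)) with a check of the stopped flag.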

@vanzin
Contributor

vanzin commented Nov 15, 2017

ok to test

@SparkQA

SparkQA commented Nov 15, 2017

Test build #83908 has finished for PR 19741 at commit 4184efa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 15, 2017

Test build #83907 has finished for PR 19741 at commit 4184efa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 17, 2017

Test build #83947 has finished for PR 19741 at commit 1fae5ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 20, 2017

Test build #84007 has finished for PR 19741 at commit 930ba79.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor

vanzin commented Dec 4, 2017

retest this please

@SparkQA

SparkQA commented Dec 5, 2017

Test build #84445 has finished for PR 19741 at commit 930ba79.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor

vanzin commented Dec 5, 2017

@devaraj-kavali looks like you'll need to update the kubernetes backend now...

@SparkQA

SparkQA commented Dec 6, 2017

Test build #84511 has finished for PR 19741 at commit 5419cdd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@vanzin
Contributor

vanzin commented Dec 6, 2017

LGTM, merging to master.
