[SPARK-12967][Netty] Avoid NettyRpc error message during sparkContext shutdown #10881
Conversation
Test build #49924 has finished for PR 10881 at commit
@@ -2183,6 +2183,13 @@ object SparkContext extends Logging {
   }

+  /**
+   * Return SparkContext
+   */
+  def get(): SparkContext = {
Is this meant to be part of the public API or should this be private?
@holdenk, figured this may be a good utility to have in SparkContext alongside getOrCreate. Unless you think otherwise?
No, please avoid introducing new random APIs in an unrelated pull request.
Also, it would be great not to have the lower level depend on an upper level (e.g. network RPC depending on SparkContext).
@rxin Modified it to private. Any ideas on how to fix this without using SparkContext?
We could potentially just drop a message if it cannot be sent and log a warning?
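The "drop and warn" idea can be sketched as follows. This is an illustrative sketch, not the actual Spark source: the object name, the `stopped` flag, and the callback wiring are assumptions introduced here to show the shape of the approach.

```scala
// Illustrative sketch (not the Spark source) of dropping a message and
// logging a warning when the RPC layer has already been stopped, instead of
// propagating a failure with a full stack trace.
object DropAndWarnSketch {
  @volatile private var stopped = false

  def stop(): Unit = { stopped = true }

  // `onError` decides how a failed post is reported; callers pass a
  // warning-only callback so shutdown-time failures stay quiet.
  def postMessage(name: String, message: String,
                  onError: Throwable => Unit): Unit = {
    if (stopped) {
      onError(new IllegalStateException("RpcEnv already stopped."))
    } else {
      // ... deliver the message to the endpoint named `name` ...
    }
  }

  def main(args: Array[String]): Unit = {
    stop()
    postMessage("heartbeat", "ping",
      e => println(s"Message dropped. ${e.getMessage}"))
  }
}
```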
@@ -2183,6 +2183,13 @@ object SparkContext extends Logging {
   }

+  /**
+   * Return SparkContext
+   */
+  private[spark] def get(): SparkContext = {
I think it's best to just remove this get method. We should have the upper layer (i.e. sparkcontext) send an explicit signal via function calls to the lower layer (e.g. rpc).
@zsxwing might know more about whether this is needed at all.
Actually, there is already such an explicit signal: RpcEnv.shutdown. Previously, I was worried about the immature NettyRpcEnv, so the stack trace of the error is still logged to help debugging after RpcEnv is shut down. However, since NettyRpcEnv has matured, we can remove the stack trace.
@nishkamravi2 I suggest that you add a new exception, RpcEnvStopException, that extends IllegalStateException. Then you can use RpcEnvStopException in this line.
After that, there are two places that need to handle this exception and log only a simple message: if it's RpcEnvStopException, log a simple message.
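The suggestion above can be sketched as follows. The exception name `RpcEnvStopException` comes from the review; the surrounding object, flag, and method names are illustrative assumptions, not the actual Spark code.

```scala
// Sketch of the reviewer's suggestion: a dedicated exception type lets the
// caller distinguish the benign "RpcEnv already stopped" case from other
// IllegalStateExceptions and log only a simple message for it.
class RpcEnvStopException(message: String) extends IllegalStateException(message)

object StopExceptionSketch {
  private var stopped = true // assume the env has already been shut down

  def postMessage(message: String): Unit = {
    if (stopped) throw new RpcEnvStopException("RpcEnv already stopped.")
    // ... otherwise deliver the message ...
  }

  def send(message: String): Unit = {
    try {
      postMessage(message)
    } catch {
      case e: RpcEnvStopException =>
        // Benign shutdown case: a simple message, no stack trace.
        println(s"Message $message dropped. ${e.getMessage}")
      // Any other exception would still propagate with its full trace.
    }
  }
}
```

Because `RpcEnvStopException` is-an `IllegalStateException`, existing handlers that catch the broader type keep working unchanged.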
Thanks for the pointers @zsxwing. Makes sense to just log an error message. Instead of throwing an exception from postMessage and logging it in send, what if we just log the warning in postMessage directly?
If this is called from …
Ok, makes sense. Btw, we also want to mask the exception being thrown at line 161 (I see that in the logs as well).
@zsxwing Updated the PR after some testing. Most of the stack trace in the log was coming from the postToAll RPC for RemoteProcessDisconnected, so I have removed the stack trace from the callback.
Test build #49979 has finished for PR 10881 at commit
LGTM.
@@ -106,7 +106,7 @@ private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) extends Logging {
   val iter = endpoints.keySet().iterator()
   while (iter.hasNext) {
     val name = iter.next
-    postMessage(name, message, (e) => logWarning(s"Message $message dropped.", e))
+    postMessage(name, message, (e) => logWarning(s"Message $message dropped."))
Could you update the message as s"Message $message dropped. ${e.getMessage}"? It's worth adding the exception message here since there are two types of exceptions.
Looks pretty good. Will merge it once you update the warning message.
Updated the warning message. Thanks.
Test build #50166 has finished for PR 10881 at commit
@nishkamravi2 thanks, merging to master |
…ntext shutdown

If there's an RPC issue while sparkContext is alive but stopped (which would happen only when executing SparkContext.stop), log a warning instead. This is a common occurrence.

vanzin

Author: Nishkam Ravi <nishkamravi@gmail.com>
Author: nishkamravi2 <nishkamravi@gmail.com>

Closes apache#10881 from nishkamravi2/master_netty.

(cherry picked from commit bae3c9a)
This bug still exists in the latest Spark 1.6.2. How about merging it to branch-1.6? @nishkamravi2 @zsxwing
Could I know how to merge the updated code into my project to avoid this error?
@1236897 Just try
Actually, I face the issue that "RpcEnv already stopped" is thrown when I call sparkContext.stop at the end of the program. I added Spark 1.6 through the Maven POM. Could I know how to fix this issue?
@1236897 You can check out the Spark 1.6.2 tag and apply this patch. Then build Spark and use that build to submit your Spark application. You can still use the Spark Maven artifact to build your application.
@zsxwing Thank you for your reply, and sorry to disturb you; this project is very important to me, so let me describe what I need to do. First, check out Spark 1.6 from GitHub. Second, use git cherry-pick to apply this patch and compile the code through Eclipse. Lastly, add the output jar to my Maven repository and add that jar as a dependency to my project. Right?
No. Take a look at this page about how to build Spark: http://spark.apache.org/docs/1.6.2/building-spark.html
You don't need to. You can still use the official Maven artifact to build your application. However, you need to use the new
@1236897 If you don't want to build Spark, it's fine to just catch this special exception thrown from
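A caller-side workaround along those lines could look like the sketch below. It assumes the error surfaces as an IllegalStateException whose message contains "RpcEnv already stopped" (consistent with the error discussed in this thread); the `stopQuietly` helper is hypothetical, not a Spark API.

```scala
// Hypothetical helper: run a stop action and swallow only the benign
// shutdown-time "RpcEnv already stopped" error, letting everything else
// propagate normally.
object StopQuietly {
  def stopQuietly(stop: () => Unit): Unit = {
    try {
      stop()
    } catch {
      case e: IllegalStateException
          if e.getMessage != null &&
             e.getMessage.contains("RpcEnv already stopped") =>
        // Benign during shutdown: note it and move on.
        println(s"Ignoring shutdown-time RPC error: ${e.getMessage}")
    }
  }
}

// With a real SparkContext this would be used as:
//   StopQuietly.stopQuietly(() => sc.stop())
```

The guard on the message keeps the catch narrow, so unrelated IllegalStateExceptions are not accidentally suppressed.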
@zsxwing If I just ignore it, will that avoid my issue? I need to complete my job within 5 minutes, but the "RpcEnv already stopped" issue wastes a lot of time on disconnecting and pushes my job beyond what I expect.
Did you measure how long it takes to stop SparkContext? I don't think it will take several minutes.
I watch the monitoring page. My last step is a save-as-Parquet. The monitoring page shows the stage as completed, but the job keeps running; after a few minutes the job completes and throws the "RpcEnv already stopped" exception. That's the reason I assume the exception leads to the long RUNNING state instead of COMPLETE. PS: the job uses 40 executors in a 200-server environment.
@1236897 The Spark job status from RUNNING to COMPLETE happens before
Hi, @zsxwing. Is there any way to reproduce this issue?