-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-19147][CORE] Gracefully handle error in task after executor is stopped #25759
Conversation
Wait, why would we be creating a new client after the executor is shut down? |
1c1011d
to
b2510fb
Compare
Here is the scene, when the executor is killed before shuffle process, but the task was created and ready to fetch blocks. Then, the NPE will occur if task try to create a new client. |
// and transform it to InterruptedException which can be processed by Executor. | ||
// See SPARK-19147 | ||
if (workerGroup == null) { | ||
throw new InterruptedException(e.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is still going to generate an exception in the logs, no? should it just be a log warning?
This is I think too indirect. Why not throw IllegalStateException
in createClient
instead in this case and catch for it specifically?
Also please improve the title of this PR |
@srowen thanks for the comments, I'll update the pr later. |
@colinmjj, also please fill other items in PR description. |
Test build #4871 has finished for PR 25759 at commit
|
b2510fb
to
4add69b
Compare
@srowen The patch is updated, for the exception from task after executor.stop, add exception process to deal with it. |
try { | ||
// NullPointerException occurred if factory.createClient() after factory.close() | ||
factory.createClient(TestUtils.getLocalHost(), server1.getPort()); | ||
} catch (Exception e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just catch NullPointerException and ignore it?
But, maybe createClient should throw a better exception to begin with?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test shows exception is occurred if TransportClientFactory.createClient() called after TransportClientFactory.close().
Agree to throw a better exception and the patch is updated. There should be an IOException now.
// The exception will be thrown from the task becauseof the unexpected status, | ||
// see: SPARK-19147, here is to process the exception after executor.stop | ||
// as the excepted exception. | ||
case t: Throwable if !isLocal && env.isStopped => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CC @jerryshao or @squito perhaps
My question is, if the executor is shut down, can you even report metrics etc, or is it meaningful?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you might succeed, as this will race against the stopping the executor. But you're very likely to trigger more exceptions from execBackend.statusUpdate
, so it probably doesn't make sense to try, especially if the whole point of this change is to cut down on scary error msgs during shutdown.
btw I think env.isStopped
will need to be volatile
for this to work reliably.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@srowen @squito , thanks for the comments, I check the code again and make clearly how metrics & heartbeat work. You're right, report metrics is meaningless after executor.close(), because heartbeat won't work.
Update the pr and the exception will be processed in "case t: Throwable =>" part with log only.
@@ -246,6 +246,46 @@ class ExecutorSuite extends SparkFunSuite | |||
heartbeatZeroAccumulatorUpdateTest(false) | |||
} | |||
|
|||
test("SPARK-19147: Gracefully handle error in task after executor is stopped") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The purpose of the change is really to avoid stack traces, not assert about the particular error. Would this pass even before this change? I'm just wondering if this is worth testing.
What may be worth testing is whether metrics are updated, which is the real possible behavior change here? or would they already be reported in case of an error?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove the case after I make clearly how metrics works, thanks for review.
8691dc1
to
cb21ae5
Compare
case _: NotSerializableException => | ||
// t is not serializable so just send the stacktrace | ||
val ef = new ExceptionFailure(t, accUpdates, false).withAccums(accums) | ||
if (env.isStopped) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is looking OK overall, to me. You might be able to avoid most of the diff due to indentation by only adding a single case:
case t: Throwable if env.isStopped =>
logError(...)
case t: Throwable =>
// unchanged
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
cb21ae5
to
8fa404c
Compare
Test build #4876 has finished for PR 25759 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM.
Merged to master |
Hi, All. |
Although it's 'just' a cosmetic issue with how errors are logged in a sort of corner case, it's also a minor change. I'm fine if you want to back-port to 2.4. |
Thanks, @srowen . I'll backport this to branch-2.4. |
… stopped ### What changes were proposed in this pull request? TransportClientFactory.createClient() is called by task and TransportClientFactory.close() is called by executor. When stop the executor, close() will set workerGroup = null, NPE will occur in createClient which generate many exception in log. For exception occurs after close(), treated it as an expected Exception and transform it to InterruptedException which can be processed by Executor. ### Why are the changes needed? The change can reduce the exception stack trace in log file, and user won't be confused by these excepted exception. ### Does this PR introduce any user-facing change? N/A ### How was this patch tested? New tests are added in TransportClientFactorySuite and ExecutorSuite Closes #25759 from colinmjj/spark-19147. Authored-by: colinma <colinma@tencent.com> Signed-off-by: Sean Owen <sean.owen@databricks.com> (cherry picked from commit 076186e) Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
What changes were proposed in this pull request?
TransportClientFactory.createClient() is called by task and TransportClientFactory.close() is called by executor.
When stop the executor, close() will set workerGroup = null, NPE will occur in createClient which generate many exception in log.
For exception occurs after close(), treated it as an expected Exception
and transform it to InterruptedException which can be processed by Executor.
Why are the changes needed?
The change can reduce the exception stack trace in log file, and user won't be confused by these excepted exception.
Does this PR introduce any user-facing change?
N/A
How was this patch tested?
New tests are added in TransportClientFactorySuite and ExecutorSuite