[SPARK-21219][Core] Task retry occurs on same executor due to race condition with blacklisting #18427
Conversation
…nding list and updating black list state.
Jenkins this is ok to test
Test build #78659 has finished for PR 18427 at commit
Please update the title to
Test build #78661 has finished for PR 18427 at commit
retest this please
Test build #79224 has finished for PR 18427 at commit
I think the fix is correct, left a few comments.
// the same executor that it was intended to be black listed from.
val conf = new SparkConf().
  set(config.BLACKLIST_ENABLED, true).
  set(config.MAX_TASK_ATTEMPTS_PER_EXECUTOR, 1)
The default value of config.MAX_TASK_ATTEMPTS_PER_EXECUTOR is 1, so we don't have to set it here.
Yes, I added it to make the test's configuration inputs more explicit, but I can remove it if it's a default that's unlikely to change.
val clock = new ManualClock
val mockListenerBus = mock(classOf[LiveListenerBus])
val blacklistTracker = new BlacklistTracker(mockListenerBus, conf, None, clock)
val taskSetManager = new TaskSetManager(sched, taskSet, 1, Some(blacklistTracker))
Why are we using SystemClock for taskSetManager?
It seems all the tests in this file are using ManualClock, so I was following convention here. This test doesn't validate anything specifically dependent on the clock/time.
// Simulate an out of memory error
val e = new OutOfMemoryError
taskSetManagerSpy.handleFailedTask(
  taskDesc.get.taskId, TaskState.FAILED, new ExceptionFailure(e, Seq()))
nit: ExceptionFailure is a case class, so you may use:
val e = ExceptionFailure("a", "b", Array(), "c", None)
taskSetManagerSpy.handleFailedTask(taskDesc.get.taskId, TaskState.FAILED, endReason)
Okay
Test build #79347 has finished for PR 18427 at commit
LGTM, cc @cloud-fan
LGTM, merging to master!
…ndition with blacklisting

There's a race condition in the current TaskSetManager where a failed task is added for retry (addPendingTask) and can asynchronously be assigned to an executor *prior* to the blacklist state update (updateBlacklistForFailedTask); the result is that the task might re-execute on the same executor. This is particularly problematic if the executor is shutting down, since the retry task immediately becomes a lost task (ExecutorLostFailure). Another side effect is that the actual failure reason gets obscured by the retry task, which never actually executed. There are sample logs showing the issue at https://issues.apache.org/jira/browse/SPARK-21219

The fix is to change the ordering of the addPendingTask and updateBlacklistForFailedTask calls in TaskSetManager.handleFailedTask.

Implemented a unit test that verifies the task is blacklisted before it is added to the pending task list. Ran the unit test without the fix and it fails; ran it with the fix and it passes.

Author: Eric Vandenberg <ericvandenberg@fb.com>

Closes apache#18427 from ericvandenbergfb/blacklistFix.
…ndition with blacklisting

What changes were proposed in this pull request?
This is a backport of the fix for SPARK-21219, already checked in as 96d58f2.

How was this patch tested?
Ran TaskSetManagerSuite tests locally.

Author: Eric Vandenberg <ericvandenberg@fb.com>

Closes #18604 from jsoltren/branch-2.2.
What changes were proposed in this pull request?
There's a race condition in the current TaskSetManager where a failed task is added for retry (addPendingTask) and can asynchronously be assigned to an executor prior to the blacklist state update (updateBlacklistForFailedTask); the result is that the task might re-execute on the same executor. This is particularly problematic if the executor is shutting down, since the retry task immediately becomes a lost task (ExecutorLostFailure). Another side effect is that the actual failure reason gets obscured by the retry task, which never actually executed. There are sample logs showing the issue at https://issues.apache.org/jira/browse/SPARK-21219
The fix is to change the ordering of the addPendingTask and updateBlacklistForFailedTask calls in TaskSetManager.handleFailedTask.
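To illustrate the reordering, here is a minimal self-contained sketch — not Spark's actual implementation; the class and method bodies below are simplified stand-ins that only borrow the Spark method names. The point is that the failing executor must be blacklisted before the task becomes schedulable again, so a concurrent scheduling pass cannot offer the retry back to the same executor in the window between the two calls.

```scala
import scala.collection.mutable

// Simplified stand-in for TaskSetManager, modeling only the call ordering
// that the fix changes; not the real Spark class.
class SketchTaskSetManager {
  val pendingTasks = mutable.Queue[Int]()
  val blacklistedExecutors = mutable.Set[String]()

  def updateBlacklistForFailedTask(exec: String): Unit =
    blacklistedExecutors += exec

  def addPendingTask(taskId: Int): Unit =
    pendingTasks += taskId

  // Fixed ordering: blacklist the failing executor FIRST, then re-add
  // the task as pending. With the old (reversed) ordering, a scheduler
  // thread could assign the pending retry before the blacklist updated.
  def handleFailedTask(taskId: Int, exec: String): Unit = {
    updateBlacklistForFailedTask(exec)
    addPendingTask(taskId)
  }
}
```

With this ordering, by the time the task is visible in pendingTasks, the blacklist already excludes the executor it failed on.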
How was this patch tested?
Implemented a unit test that verifies the task is blacklisted before it is added to the pending task list. Ran the unit test without the fix and it fails; ran it with the fix and it passes.
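A rough sketch of the verification idea follows. This is self-contained and hypothetical — the real test in TaskSetManagerSuite spies on TaskSetManager itself, whereas this stand-in simply records the order of the two calls so an assertion can check that the blacklist update happens first.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in handler that records call order, so a test can
// assert the blacklist update precedes re-adding the task as pending.
class RecordingHandler {
  val calls = ArrayBuffer[String]()

  def updateBlacklistForFailedTask(): Unit = calls += "updateBlacklist"

  def addPendingTask(): Unit = calls += "addPendingTask"

  def handleFailedTask(): Unit = {
    updateBlacklistForFailedTask() // must come first (the fix)
    addPendingTask()
  }
}
```

A test would invoke handleFailedTask() and assert that "updateBlacklist" appears in calls before "addPendingTask"; with the pre-fix ordering the assertion fails.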