[SPARK-13279] Remove unnecessary duplicate check in addPendingTask fu… #11167
I don't quite see why this solves a lock problem. Should this be a set?
As you can see from the jstack of the driver (http://pastebin.com/m8CP6VMv), the dag-scheduler-event-loop thread has taken a lock and is spending a lot of time in the addPendingTask function. For each task added, it iterates over the list of tasks to check for duplicates, which makes adding all the tasks an O(n^2) operation; when the number of tasks is huge, it takes more than 5 minutes. As mentioned in the comment, addPendingTask does not really need to check for duplicates because dequeueTaskFromList will skip already-running tasks. If we remove the duplicate check from addPendingTask, the lock is held only for a very short time and things work fine. We cannot make this a set because we treat the list of pending tasks as a stack (see https://github.com/sitalkedia/spark/blob/fix_stuck_driver/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L113 ). Please note that this is a regression from Spark 1.5 and was introduced in [SPARK-11163].
OK, but the JIRA describes a deadlock, or at least that's how it reads.
Sorry, I did not make that clear. It's not a deadlock; the lock is just held for a very long time.
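To make the cost argument above concrete, here is a minimal sketch, not the actual TaskSetManager code. The object name `PendingListSketch` and the helper `fill` are hypothetical; only the `contains` check and the `list += index` append mirror what the discussion describes.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch; names other than the contains/append logic are illustrative.
object PendingListSketch {
  // Behaviour with the duplicate check: each call scans the whole buffer,
  // so adding n tasks costs O(n^2) comparisons in total.
  def addWithCheck(list: ArrayBuffer[Int], index: Int): Unit = {
    if (!list.contains(index)) {
      list += index
    }
  }

  // Behaviour without the check: an amortised O(1) append.
  // Duplicates may end up in the buffer and must be tolerated by the consumer.
  def addWithoutCheck(list: ArrayBuffer[Int], index: Int): Unit = {
    list += index
  }

  // Builds a pending list holding indices 0 until numTasks using the given add function.
  def fill(numTasks: Int, add: (ArrayBuffer[Int], Int) => Unit): ArrayBuffer[Int] = {
    val list = new ArrayBuffer[Int]
    var i = 0
    while (i < numTasks) {
      add(list, i)
      i += 1
    }
    list
  }
}
```

For distinct task indices both variants produce the same buffer; the difference is only the time spent while the lock is held.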
/**
 * Add a task to all the pending-task lists that it should be on.
 * Note that it's okay if we add a task to the same queue twice because
 * dequeueTaskFromList will skip already-running tasks.
Can you move your new comment to the comment above pendingTasksForExecutor? I'd change that comment to say something like:
// Set of pending tasks for each executor. These collections are actually
// treated as stacks, in which new tasks are added to the end of the
// ArrayBuffer and removed from the end. This makes it faster to detect
// tasks that repeatedly fail because whenever a task fails, it is put
// back at the head of the stack. These collections may contain duplicates
// for two reasons:
// (1) Tasks are only removed lazily; when a task is launched, it remains
//     in all the pending lists except the one that it was launched from.
// (2) Tasks may be re-added to these lists multiple times as a result
//     of failures.
// Duplicates are handled in dequeueTaskFromList, which ensures that a
// task hasn't already started running before launching it.
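The lazy handling of duplicates described in that comment can be sketched roughly as follows. This is a simplified illustration, not the real dequeueTaskFromList: the object name `DequeueSketch` and the `alreadyStarted` predicate are assumptions standing in for whatever bookkeeping the scheduler actually uses.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch of stack-style dequeueing with lazy duplicate removal.
object DequeueSketch {
  // Pops entries from the end of the buffer (the top of the stack) and
  // returns the first index that has not already started running.
  // Stale or duplicate entries are simply discarded along the way.
  def dequeue(list: ArrayBuffer[Int], alreadyStarted: Int => Boolean): Option[Int] = {
    while (list.nonEmpty) {
      val index = list.remove(list.length - 1)
      if (!alreadyStarted(index)) {
        return Some(index)
      }
    }
    None
  }
}
```

Because the consumer filters on the way out, the producer side can append without checking, which is why the duplicate check in addPendingTask is unnecessary.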
Thanks for fixing this! You're right that the change you linked to is broken; when I removed the readding parameter, I should have killed the if-statement. I updated the JIRA description to describe this problem somewhat better and link to the offending change. This should be merged into 1.6 when it's ready since it's a regression.
f6b49de to 2302c9d
2302c9d to 3fe1af8
Thanks @kayousterhout. I updated the comment accordingly.
Jenkins, test this please
I just merged this but then realized Jenkins hadn't run yet... reverted the pushed version but will re-merge when Jenkins passes
I reverted this since it hasn't passed tests yet.
@sitalkedia can you re-open this? (It was closed automatically when inadvertently merged.) I don't think we can test it unless it's open
@kayousterhout I am not seeing an option to reopen this pull request. Should I create a new pull request?
That would be great, thanks!! Sorry for the trouble caused by my rushed merging!
No problem. Created a new pull request #11175
-      if (!list.contains(index)) {
-        list += index
-      }
+      list += index
     }
Sorry for not getting to this party until after the merge, but it strikes me that addTo is now only making a negative contribution toward understanding the code -- mostly because the "...if it's not already there" comment is now wrong. I don't see why the handful of uses of addTo shouldn't just be replaced with, e.g.:
pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
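For readers unfamiliar with the idiom, here is a small self-contained sketch of how `getOrElseUpdate` followed by `+=` behaves on a mutable map of buffers. The object name `PendingTasksSketch`, the `addPending` helper, and the explicit map parameter are illustrative assumptions, not part of the PR.

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

// Hypothetical sketch of the one-line replacement for addTo.
object PendingTasksSketch {
  // Looks up the buffer for executorId, creating and registering an empty
  // one if the key is absent, then appends index without a duplicate check.
  def addPending(
      pendingTasksForExecutor: HashMap[String, ArrayBuffer[Int]],
      executorId: String,
      index: Int): Unit = {
    pendingTasksForExecutor.getOrElseUpdate(executorId, new ArrayBuffer[Int]) += index
  }
}
```

The second argument of `getOrElseUpdate` is passed by name, so a new buffer is only allocated when the executor has no entry yet.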
You're in luck because I accidentally merged before Jenkins got to it, so we reverted it pending tests. There's a new PR here: #11175
Would you mind sticking this comment there? Agree with your sentiment.
Updated the PR (#11175) according to @markhamstra's comment.