[SPARK-18967][SCHEDULER] compute locality levels even if delay = 0 #16376
Conversation
Before this change, with delay scheduling off, Spark would effectively ignore locality preferences for bulk scheduling. This change ensures that locality preferences are used when multiple offers are made simultaneously, and adds a test case for it.
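For readers skimming the thread, here is a minimal, self-contained sketch of the idea behind the fix. The object, enum, and parameter names below are illustrative stand-ins rather than the real Spark identifiers, and the real computeValidLocalityLevels also checks which executors and hosts are alive; the point is only that a locality level counts as valid whenever there are pending tasks at that level, regardless of whether its configured wait is 0.

```scala
import scala.collection.mutable.ArrayBuffer

object LocalityLevelsSketch {
  // Hypothetical stand-in for Spark's TaskLocality enumeration.
  object Locality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
  }
  import Locality._

  // A level is valid whenever there are pending tasks at that level; a
  // configured wait of 0 no longer disqualifies it (that was the old behavior).
  def computeValidLocalityLevels(
      hasExecutorLocalTasks: Boolean,
      hasHostLocalTasks: Boolean,
      hasNoPrefTasks: Boolean,
      hasRackLocalTasks: Boolean): Seq[Locality.Value] = {
    val levels = new ArrayBuffer[Locality.Value]
    if (hasExecutorLocalTasks) levels += PROCESS_LOCAL
    if (hasHostLocalTasks) levels += NODE_LOCAL
    if (hasNoPrefTasks) levels += NO_PREF
    if (hasRackLocalTasks) levels += RACK_LOCAL
    levels += ANY // ANY is always a valid level
    levels.toSeq
  }
}
```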
At first glance, this looks like the right change - but I might be missing something.

Test build #70494 has finished for PR 16376 at commit
It'd be nice to check whether the test case attached to SPARK-1937 still applies. Reading the description of that bug, it seems related not only to this but to other things you've been playing with recently.

Test build #70495 has finished for PR 16376 at commit
The change looks good to me, although I still want to make sure I understand it correctly. Before the change, a locality level was considered invalid if it had delay = 0. The patch makes such locality levels valid too, so during scheduling it gives us a chance to try the more local levels first, e.g. PROCESS_LOCAL and NODE_LOCAL; otherwise we'd just schedule the task on any available executor. Meanwhile, since the wait time for those more local levels is 0, we quickly fall back to ANY if a satisfying executor is not available, so delay = 0 is still enforced. Is this correct?
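To make that mechanism concrete, here is a simplified, self-contained sketch of the fallback behavior. It is a paraphrase, not Spark's actual getAllowedLocalityLevel (which also tracks a per-level last-launch time): the maximum allowed level advances past any level whose wait has expired, and a wait of 0 has always expired.

```scala
object AllowedLevelSketch {
  // Walk levels from most local to least local; stop at a level that still has
  // pending tasks and whose wait has not yet expired, otherwise keep advancing.
  def allowedLevel(
      levels: IndexedSeq[String],           // best-to-worst, ending in "ANY"
      waitsMs: IndexedSeq[Long],            // per-level delay-scheduling waits
      hasPendingTasksAt: String => Boolean, // are tasks still pending at this level?
      elapsedSinceLastLaunchMs: Long): String = {
    var i = 0
    while (i < levels.length - 1) {
      if (!hasPendingTasksAt(levels(i))) i += 1               // nothing left here
      else if (elapsedSinceLastLaunchMs >= waitsMs(i)) i += 1 // wait expired (always true if 0)
      else return levels(i)                                   // still within the wait: cap here
    }
    levels(i)
  }

  def main(args: Array[String]): Unit = {
    val levels = IndexedSeq("PROCESS_LOCAL", "NODE_LOCAL", "ANY")
    // With every wait at 0, the cap reaches ANY immediately, even with local tasks
    // pending, so a non-local offer can be used right away -- while more local levels
    // are still tried first when matching offers exist (see the scheduler-side toy below).
    println(allowedLevel(levels, IndexedSeq(0L, 0L, 0L), _ => true, elapsedSinceLastLaunchMs = 0L))
  }
}
```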
This looks good. Can you also update the comment on myLocalityLevels in TaskSetManager.scala to clarify what it's used for? It currently mentions delay scheduling, which is no longer true after this PR.
Similar to @vanzin, I was a little nervous about this change, so I tracked down the origin of it and why it was added as part of #892 (for SPARK-1937). It looks like the bug you've fixed was added towards the end of the review process for #892: it was added in the 2nd-to-last commit in that PR, apparently as a result of the June 10th comment beginning with "I've updated...". That comment implies this bug was introduced as a last-minute opportunistic performance improvement (and not as part of fixing the bug that PR was addressing). Specifically, that comment/commit changed TaskSchedulerImpl to iterate only through TSM.myLocalityLevels, rather than through all of the locality levels. So you could also fix this issue by undoing that change, but I think the approach you took here is more intuitive (and results in slightly better performance, since it does avoid iterating over levels for which nothing will happen, which is exactly what the original buggy change was intended to do).
```diff
@@ -338,7 +338,7 @@ private[spark] class TaskSchedulerImpl private[scheduler](
     }.getOrElse(offers)

     // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
```
you can remove this comment now that it's duplicated in the shuffleOffers method description
@lirui-intel yes, that's consistent with my understanding. The TaskSetManager still checks that it's not going beyond the currently-allowed locality level here, so non-zero locality waits will still be enforced, and the TaskSchedulerImpl still checks all of the locality levels in myLocalityLevels each time it has an offer, so it will still schedule non-local tasks when the wait is 0 and no local tasks can be scheduled. Also, my comment above may be helpful for understanding why this bug was originally introduced.
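A self-contained toy of the scheduler-side half of that explanation (names like Offer, Task, and schedule are made up for illustration and do not match TaskSchedulerImpl's real API): the outer loop goes over locality levels from best to worst for the whole batch of offers, which is why local matches win even when every wait is 0.

```scala
object BulkOfferSketch {
  case class Offer(execId: String, host: String)
  case class Task(id: Int, preferredExec: Option[String])

  // Locality of a (task, offer) pair in this toy: only PROCESS_LOCAL or ANY.
  def levelOf(task: Task, offer: Offer): String =
    if (task.preferredExec.contains(offer.execId)) "PROCESS_LOCAL" else "ANY"

  def schedule(offers: Seq[Offer], tasks: Seq[Task]): Map[Int, String] = {
    val assigned = scala.collection.mutable.Map[Int, String]()
    val remaining = scala.collection.mutable.Queue(tasks: _*)
    val usedOffers = scala.collection.mutable.Set[String]()
    // Try the whole batch of offers at PROCESS_LOCAL before falling back to ANY.
    for (maxLocality <- Seq("PROCESS_LOCAL", "ANY"); offer <- offers if !usedOffers(offer.execId)) {
      remaining.dequeueFirst(t => maxLocality == "ANY" || levelOf(t, offer) == maxLocality)
        .foreach { t => assigned(t.id) = offer.execId; usedOffers += offer.execId }
    }
    assigned.toMap
  }

  def main(args: Array[String]): Unit = {
    val offers = Seq(Offer("exec1", "hostA"), Offer("exec2", "hostB"))
    val tasks = Seq(Task(0, preferredExec = Some("exec2")), Task(1, preferredExec = None))
    // Task 0 lands on its preferred exec2 even though exec1 is offered first;
    // task 1 (no preference) takes exec1. Nothing waits, but locality is respected.
    println(schedule(offers, tasks))
  }
}
```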
@kayousterhout I see. Thanks for the explanations :)
Thanks for the feedback. I've updated the comment on myLocalityLevels. I also updated the tests slightly: to ensure that we're really testing no delay, the tests now use a manual clock which never advances. And to address @lirui-intel's concern, I also added a test that we still schedule at non-preferred locations immediately with delay scheduling off. (Somewhat obvious in the current implementation, since we always add ANY to the valid locality levels.)
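As a tiny illustration of why the frozen clock matters (reusing the AllowedLevelSketch toy from above rather than Spark's real test helpers, and with a hand-rolled clock stand-in instead of Spark's ManualClock): with the elapsed time pinned at 0, any fallback to ANY can only be explained by the zero waits, not by time quietly passing during the test.

```scala
object FrozenClockCheck {
  // Minimal stand-in for a manual clock; unlike a system clock it only moves
  // when the test calls advance(), which this check deliberately never does.
  class ManualClockSketch(private var now: Long = 0L) {
    def getTimeMillis: Long = now
    def advance(ms: Long): Unit = now += ms
  }

  def main(args: Array[String]): Unit = {
    val clock = new ManualClockSketch()
    val levels = IndexedSeq("PROCESS_LOCAL", "NODE_LOCAL", "ANY")
    val zeroWaits = IndexedSeq(0L, 0L, 0L)
    val cap = AllowedLevelSketch.allowedLevel(levels, zeroWaits, _ => true, clock.getTimeMillis)
    // The clock never advanced, yet the cap is already ANY -- purely because the waits are 0.
    assert(cap == "ANY")
    println(s"cap with a frozen clock and zero waits: $cap")
  }
}
```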
```scala
 * This allows a performance optimization, of skipping levels that aren't relevant (eg., skip
 * PROCESS_LOCAL if no tasks could be run PROCESS_LOCAL for the current set of executors).
 */
var myLocalityLevels = computeValidLocalityLevels()
```
As I was figuring out the purpose of this for what to put in the comment, I made a couple of observations:

1. For each executor we add or remove, it's an O(numExecutors) operation to update the locality levels, so overall it's O(numExecutors^2) to add a bunch. Minor on small clusters, but I wonder if this is an issue when you're using dynamic allocation and going up and down to 1000s of executors. It all happens while holding a lock on the TaskSchedulerImpl, too. (See the toy sketch after this list for the quadratic pattern.)

2. Though we recompute valid locality levels as executors come and go, we do not as tasks complete. That's not a problem -- as offers come in, we still go through the right task lists. But it does make me wonder whether this business of updating the locality levels for the current set of executors is useful, and whether we should instead just always use all levels.
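A toy illustration of the quadratic pattern in observation (1); nothing here is Spark code, the executor count is arbitrary, and the assumption is simply that each executorAdded event triggers a recomputation that scans every executor known so far.

```scala
object RecomputeCostSketch {
  def main(args: Array[String]): Unit = {
    var executors = Vector.empty[String]
    var totalScans = 0L
    for (i <- 1 to 1000) {
      executors = executors :+ s"exec-$i"
      totalScans += executors.size // each "recompute" scans every known executor
    }
    // 1 + 2 + ... + 1000 = 500500 scans for 1000 executors added one at a time.
    println(s"adding 1000 executors one by one cost $totalScans executor scans")
  }
}
```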
(1) does seem like an issue. I also mostly agree on (2), since the logic of avoiding unnecessarily waiting for delay timeouts is already handled (separately from the myLocalityLevels calculation) here. My only hesitation is that myLocalityLevels does allow avoiding the delay timeout in cases where tasks have constraints to run on executors that haven't been granted to the application, so that use case seems like it might merit keeping the code (also, if you agree, can you update the myLocalityLevels comment?). In any case I'd do this in a separate PR.
kayousterhout left a comment
LGTM. New test looks great!
Test build #70937 has finished for PR 16376 at commit

Test build #70936 has finished for PR 16376 at commit

Jenkins, retest this please

Test build #71084 has finished for PR 16376 at commit
```diff
     val sc: SparkContext,
     val maxTaskFailures: Int,
-    blacklistTrackerOpt: Option[BlacklistTracker],
+    val blacklistTrackerOpt: Option[BlacklistTracker],
```
Just noticed that this doesn't seem to be used publicly anywhere -- does it need to be a val? Should it be private val?
oh good point, this change is not necessary (must have been part of another change which I later reverted, sorry)
Doh, actually this is used in tests: I override createTaskSetManager to use my manual clock. Another alternative is to allow the clock to be passed in to the TaskSchedulerImpl constructor, but that ends up being a bigger code change.
I'll make it a private[scheduler] val, slightly tighter than the implicit private[spark] it would be otherwise.
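For anyone following along, a hedged sketch of that injection pattern: subclass the scheduler in the test and override its TaskSetManager factory so every manager gets the test-controlled clock. The constructor calls below are simplified and may not match the exact TaskSetManager/TaskSchedulerImpl signatures in the version you're reading; treat them as an assumption about the shape of the code, not a drop-in snippet.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{TaskSchedulerImpl, TaskSet, TaskSetManager}
import org.apache.spark.util.ManualClock

// Test-only subclass: every TaskSetManager it creates uses the manual clock,
// and the now-visible blacklistTrackerOpt is forwarded unchanged.
// (Constructor arguments are assumed/simplified for illustration.)
class ClockInjectingTaskScheduler(sc: SparkContext, clock: ManualClock)
  extends TaskSchedulerImpl(sc) {

  override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = {
    new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock)
  }
}
```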
Test build #71137 has finished for PR 16376 at commit
```scala
    }
  }

  test("With delay scheduling off, tasks can be run at any locality level immediately") {
```
Sorry, last thing: I just realized -- does this test need to first submit a local resource offer? That makes sure the local executor is considered alive; otherwise, PROCESS_LOCAL won't be in the set of allowed locality levels because of the code here: https://github.com/apache/spark/pull/16376/files#diff-bad3987c83bd22d46416d3dd9d208e76R966, which makes this test somewhat less effective, if I understand correctly.
Yes, you are absolutely right. I've updated the test and also added a check to make sure the tsm includes the lower locality levels.
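A hedged sketch of what that adjustment might look like. Here setupScheduler and taskSetWithPreference are hypothetical stand-ins for the suite's real helpers, and the exact method signatures are assumptions rather than quotes from the patch: the first offer marks the preferred executor as alive, the assertion on myLocalityLevels confirms the lower levels are actually in play, and the final offer shows a non-preferred executor is still used immediately.

```scala
import org.apache.spark.scheduler.{TaskLocality, WorkerOffer}

// All locality waits set to 0, clock never advanced (see earlier discussion).
val scheduler = setupScheduler("spark.locality.wait" -> "0") // hypothetical helper

// Offer the preferred executor once so the scheduler records it as alive;
// without this, PROCESS_LOCAL would never be a valid level for the task set.
scheduler.resourceOffers(IndexedSeq(WorkerOffer("exec1", "host1", 1)))

val taskSet = taskSetWithPreference(executor = "exec1", host = "host1") // hypothetical helper
scheduler.submitTasks(taskSet)
val tsm = scheduler.taskSetManagerForAttempt(taskSet.stageId, taskSet.stageAttemptId).get

// The lower (more local) levels should now be considered valid...
assert(tsm.myLocalityLevels.contains(TaskLocality.PROCESS_LOCAL))

// ...and an offer on a completely different executor is still used right away,
// because every locality wait is 0.
val launched = scheduler.resourceOffers(IndexedSeq(WorkerOffer("other", "host2", 1))).flatten
assert(launched.nonEmpty)
```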
Test build #71141 has finished for PR 16376 at commit

LGTM

LGTM, thanks @squito !

Test build #71156 has finished for PR 16376 at commit

Jenkins, retest this please

Test build #71223 has finished for PR 16376 at commit
@squito I just noticed this hasn't been merged. Is this good to go pending tests passing again?

Jenkins, retest this please

Test build #72477 has finished for PR 16376 at commit

Yes, I think this is ready (I just noticed a couple of minor nits on a fresh read, but no real changes).

Test build #72484 has finished for PR 16376 at commit
Awesome, always enthusiastic about fixing minor nits!! I merged this into master. I didn't merge it into 2.1, but I don't feel strongly about it.
What changes were proposed in this pull request?

Before this change, with delay scheduling off, Spark would effectively ignore locality preferences for bulk scheduling. With this change, locality preferences are used when multiple offers are made simultaneously.

How was this patch tested?

Test case added which fails without this change. All unit tests run via jenkins.

Author: Imran Rashid <irashid@cloudera.com>

Closes apache#16376 from squito/locality_without_delay.