[SPARK-29994][CORE] Add WILDCARD task location #26633
Conversation
 * preferred locations to indicate that the task can be assigned to any host if it cannot get any
 * desired location immediately.
 */
private[spark] case class WildcardLocation() extends TaskLocation {
case object/object?
Yeah, we can use object here.
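The suggestion above can be sketched as follows. This is a hedged, simplified illustration (not the actual Spark source): since the wildcard carries no state, a case object gives a single shared instance instead of allocating a new WildcardLocation() each time.

```scala
// Simplified stand-in for Spark's TaskLocation trait (assumption for illustration).
sealed trait TaskLocation {
  def host: String
}

// A case object is a singleton with structural equality for free; no fields needed.
case object WildcardLocation extends TaskLocation {
  // "*" mirrors the string form this PR parses (assumption for illustration).
  override val host: String = "*"
}
```

With a case object, both `loc == WildcardLocation` and a bare `case WildcardLocation =>` pattern work without constructing instances.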
Test build #114270 has finished for PR 26633 at commit
Test build #114415 has finished for PR 26633 at commit
From my POV this is a shortcut that lets some tasks bypass the 3-second locality wait imposed by delay scheduling. This change looks good to me because it avoids changing the global locality wait time, so the influence can be restricted in a desired way. One concern I shall raise: how do we keep the WILDCARD locality from being used improperly, i.e., included in preferredLocations where the penalty of a locality miss is nontrivial? Also cc @squito @tgravescs
I don't think there is a need to restrict it. Every RDD should "know" its own locality preference as well as the penalty for a locality miss. If we ever needed to make sure the WILDCARD is being used properly, we would have to worry about whether other regular preferred locations are returned correctly and truly reflect their best possible locality choice.
LGTM
Could we please hold this PR for a couple more days to allow more eyes on it, since it's a critical improvement to Spark Core?
Sure, @jiangxb1987!
I'm not seeing how this is different from any other task in Spark. In my opinion the locality fallback in Spark is broken. I generally recommend people set the locality delay to 0 on the node side, because you can get very weird results where tasks wait way too long to be scheduled. On most networks these days it's better to just run the task somewhere than to wait for locality. I realize though there are other conditions this was added for, and I've never spent the time to go look at a proper solution for it. I'm assuming the intention is to just have LocalShuffledRowRDD always add it to the preferred locations? I'm a bit surprised that change isn't in here as well, as it seems relatively small and would show the use of it; maybe I'm wrong and it's large, which would make sense to split apart. Are there other specific use cases you have for this? I think this should be discussed more before going in.
Well, if we look at resource utilization, it may be better to wait for locality and save resources for other jobs/tasks. This is really a hard problem, and the default 3-second locality wait may not be optimal either. We can only know the optimal solution if we know what jobs/tasks will be submitted in the future. Is it possible to make this thing internal? E.g., do not document it publicly. This is not a perfect solution, but I'm afraid there is no perfect solution. This solution at least gives us an option: if locality is not that important for certain tasks, you can use WILDCARD to let Spark schedule your tasks on other hosts.
Thanks for the feedback, @tgravescs! This is a workaround. A complete solution would be to bring the current locality fallback to the task level instead of the node level, as I said in the previous comment. An RDD knows the importance of locality based on its job and/or data size and sets a wait time for itself. Setting the cluster/node-level wait time would definitely affect other workloads and is not a solution here. I could add the usage by LocalShuffledRowRDD here in this PR but was thinking it was a Spark SQL change and better put in a separate PR. I'm fine either way; it's one line of code change plus some code comments.
I am wondering whether you should return here as well, compare with spark/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala (lines 287 to 290 in 72a946c).
      case _ =>
    }
    pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
    if (loc == WildcardLocation) {
Nit: I would avoid this if by introducing a new function which gets the pendingTaskSetToAddTo and the resolveRacks flag, then handles the forHost and forRack part from these lines (spark/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala, lines 242 to 248 in 72a946c):

pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
if (resolveRacks) {
  sched.getRackForHost(loc.host).foreach { rack =>
    pendingTaskSetToAddTo.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index
  }
}

Then call this new function with the relevant case branches and add a new case for WildcardLocation. I think this way it would be easier to follow what happens where.
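The refactor described above can be sketched roughly like this. Names are approximate and the surrounding TaskSetManager machinery is replaced by minimal stand-ins, so this is a hedged sketch rather than the actual Spark code: the shared forHost/forRack bookkeeping moves into one helper that each relevant case branch calls.

```scala
import scala.collection.mutable.{ArrayBuffer, HashMap}

// Minimal stand-in for the pending-task bookkeeping in TaskSetManager (assumption).
class PendingTasks {
  val forHost = new HashMap[String, ArrayBuffer[Int]]
  val forRack = new HashMap[String, ArrayBuffer[Int]]
  val noPrefs = new ArrayBuffer[Int]
}

// Stand-in for sched.getRackForHost (assumption for illustration).
def rackForHost(host: String): Option[String] = Some("rack-of-" + host)

// The suggested helper: one place that handles both the forHost and forRack parts.
def addPendingForHost(pending: PendingTasks, host: String, index: Int,
                      resolveRacks: Boolean): Unit = {
  pending.forHost.getOrElseUpdate(host, new ArrayBuffer) += index
  if (resolveRacks) {
    rackForHost(host).foreach { rack =>
      pending.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index
    }
  }
}
```

Each host-bearing case branch would call addPendingForHost, while a separate case for WildcardLocation would add the index to noPrefs instead, removing the need for an if inside another branch.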
... you can match for the case object this way:

case e: WildcardLocation.type =>
  pendingTaskSetToAddTo.noPrefs += index
Does case WildcardLocation => work?
Yes it does. It is even better, as here nothing is needed from the matching object (which would be identical to WildcardLocation anyway).
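The two match styles discussed above can be compared in a self-contained sketch (types simplified; these are not the actual Spark classes): matching the singleton directly is the idiomatic form, while the typed pattern also compiles but binds a value that is never used.

```scala
// Simplified stand-ins for the TaskLocation hierarchy (assumption for illustration).
sealed trait TaskLocation
case object WildcardLocation extends TaskLocation
final case class HostTaskLocation(host: String) extends TaskLocation

// Idiomatic: match the singleton directly.
def describe(loc: TaskLocation): String = loc match {
  case WildcardLocation    => "noPrefs"
  case HostTaskLocation(h) => "host:" + h
}

// Also valid, but the .type annotation is redundant for a singleton.
def describeTyped(loc: TaskLocation): String = loc match {
  case _: WildcardLocation.type => "noPrefs"
  case HostTaskLocation(h)      => "host:" + h
}
```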
For a while I've been saying that we should set the locality wait to 0 in general (the biggest problem IMO is https://issues.apache.org/jira/browse/SPARK-18886). You will notice even Kay comments in the discussion there:
"Spark 3.0 might be a good chance to change that default."
Fixing the scheduler locality algorithm is definitely more changes. The locality delay to me should be a per-task delay: if a task doesn't get scheduled in 3 seconds, then fall back to the next locality level. Right now it waits for any task to not be scheduled for 3 seconds at that locality. I know Kay has an argument for the FairScheduler use case, but I don't know that I agree with that, or that it isn't handled by the per-task delay. If you really want your task to wait that long for locality you can simply set it higher. I'm not sure of the code changes required to make that change, though, or how ugly the code gets if we really wanted to leave the old way in there behind a config.

"But the problem is when we have fewer mappers (from the shuffle map stage) than the number of worker nodes, e.g., 5 vs. 10, and if we stick to the preferred locations, the LocalShuffledRowRDD will suffer from locality wait and be even slower than the original ShuffledRowRDD."

I'm not sure I follow this statement. If you have fewer mappers - let's say you have 5 and you have 10 worker nodes (assuming this is standalone mode - or do you mean executors?) - the 5 maps will run on 5 of those nodes. Your LocalShuffledRowRDD uses the map output locations as the preferred locations, so why wouldn't the scheduler schedule on those nodes? Are you saying the 10 worker nodes (not sure if you mean executors or workers?) are being used by others (either job or stage) and some might be busy, and the delay of waiting is more than just reading over the network? Is this case with dynamic allocation or not? It sounds to me like the normal case: you ran on some executors, you may not have the same executors when your reduce phase runs, so you are being delayed scheduling because you can't get node locality. I think you could have the same thing with ShuffledRowRDD with a small number of maps/reducers.
The issue I want to understand is why we are special-casing this one RDD for a performance improvement when, in my opinion, the majority of jobs would benefit from not having to wait for locality (as implemented today). Changing the default like Imran mentioned might be a good first step, and then fixing the algorithm would be the second, in my opinion. Do you think a default of 0 node locality would solve your problem? Obviously if a user does set it then it gets applied again, though.
Changing the default locality wait time to 0 (or whatever it is) is based on the assumption that no workloads have a serious penalty from a locality miss, because we are looking at shuffles only. There can be exceptions where locality does matter a lot and would be worth some wait time. Back to the local shuffle reader: I'll explain from the very beginning what it does.
Thanks for the explanation.
What are these cases? I'm sure there are some, but based on what myself and many others I've talked to have seen, it's the exception rather than the rule. It doesn't matter whether it's HDFS data or shuffle data. People are setting this to zero now anyway, so changing the default makes sense to me.
I don't follow this logic: how do you go from 200 output partitions to 40 tasks? I would expect 200 output partitions to have 200 tasks. It doesn't matter too much, as the main issue is your next sentence.
Now this part I understand. But it goes back to what I said before: I don't see how this is any different from any other RDD. About a month ago, we specifically had a job that was hitting this same thing during shuffle. We had 20 nodes, but only 1 node was being scheduled on, and it had a significant impact on the job time; we set the locality delay to 0 and worked around the issue. Another example, running on YARN: say I have 10 tasks reading HDFS data, I get 10 nodes, and 5 of those nodes actually have HDFS blocks on them. With locality turned on, those 5 nodes will be loaded up, and depending on how long they take, they could keep the other 5 tasks from running as quickly as they should. So what happens if these 5 mappers have very large data or skewed data? Is it better to skip locality? That is going to add to the network usage; who is to say I want that vs. waiting? It might be that the tasks take long enough that it actually does fall back to rack locality; it depends on the harmonics of when tasks finish and are scheduled. I'm willing to bet that in general having locality delay 0 is still more performant, which is why we go back to locality delay 0 as a default. If they actually need locality then they can turn it back on. Now, that will also affect your RDD here as well, but this seems like a very specific case.
What kind of performance impact do you see if you just don't set preferred locations at all in your RDD?
It's 200 tasks overall, but each mapper has 50. It's that simple, but it doesn't really matter.
It would be no different from ShuffledRowRDD, and then why would we bother with LocalShuffledRowRDD in the first place?
It is no different from any other RDD (as you mentioned). The only difference is that this RDD has a definitive "baseline" or "goal": it looks to perform no worse than a regular shuffle, and better if possible. For other RDDs, I can't say what the target is, or what performance impact is considered acceptable. Yes, setting the locality wait to 0 would solve the problem of LocalShuffledRowRDD perfectly, and the effect is equivalent to this PR's proposal on a single task set alone. This leaves us only one difference of opinion: the exceptions, which I choose not to disclose.
Please correct me if I'm wrong, but it seems to me that setting the locality wait time to 0 should be an answer to this case (and possibly many other use cases, too). On the other hand, it would cause regressions to other jobs/stages where task locality is critical. How about we accept the current PR as a temporary solution to work around the delay scheduling issue, so those RDDs that don't want to wait for perfect locality can just add the wildcard location?
Of course, it's worth a separate JIRA/PR to discuss changing the default value of delay scheduling.
I'm on the other side; I would rather see the default set to 0, which I think most people do anyway and which I believe will help a lot of other cases, than add extra one-off maintenance code here. But if others disagree, I'm OK with this; it just needs to be heavily documented as an internal-only developer API that should go away. Do we know how much performance difference this makes, and how often?
@@ -70,7 +80,9 @@ private[spark] object TaskLocation {
  def apply(str: String): TaskLocation = {
    val hstr = str.stripPrefix(inMemoryLocationTag)
    if (hstr.equals(str)) {
      if (str.startsWith(executorLocationTag)) {
      if (str == "*") {
        WildcardLocation
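The parsing rule in this diff can be shown in a self-contained sketch. This is a hedged simplification of TaskLocation.apply (the surrounding cases and tag handling are reduced to stand-ins): the string "*" maps to the wildcard before the executor/host checks run.

```scala
// Simplified stand-ins for the TaskLocation hierarchy (assumption for illustration).
sealed trait TaskLocation
case object WildcardLocation extends TaskLocation
final case class HostTaskLocation(host: String) extends TaskLocation
final case class ExecutorCacheTaskLocation(host: String, executorId: String) extends TaskLocation

val executorLocationTag = "executor_"

// Sketch of the parsing order added by this PR: wildcard first, then prefixes.
def parse(str: String): TaskLocation =
  if (str == "*") {
    WildcardLocation
  } else if (str.startsWith(executorLocationTag)) {
    val hostAndId = str.stripPrefix(executorLocationTag).split("_", 2)
    ExecutorCacheTaskLocation(hostAndId(0), hostAndId(1))
  } else {
    HostTaskLocation(str)
  }
```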
If this goes in, I would like to see documentation that this is a DeveloperAPI and a temporary workaround that will be removed once locality is fixed.
I agree the default setting change needs to happen in a bigger conversation, but if that conversation is going to happen, we shouldn't check this in until it has been had, in my opinion. I have not seen a real argument for why this RDD is different from any other. But if we fix the real issue with locality then it helps everything. The argument that it's a special version of ShuffledRowRDD and that sometimes you hit this locality issue doesn't convince me. I can hit the locality issue with ShuffledRowRDD, and I might not hit the issue with the LocalShuffledRowRDD. Why not change ShuffledRowRDD or HadoopRDD to use this as well, since I can hit the same issue there? The only argument I can see is limited scope, but at the same time, does it only turn this on when you hit the case described, with mappers < reducers and more executors than mappers? If it turns it on more than that, then one could argue you aren't following the semantics defined by Spark for locality wait. I don't see any concrete numbers here on performance impact, or how much this affects users, or why we should special-case this. If it had a huge impact then I could see why we would special-case it, but I haven't seen any evidence of that. Do we have any cases where this is seen in production? Is there a negative impact of the user just setting node locality wait = 0? Again, the main issue I have is that once it's introduced, anyone can use it in an RDD, so I consider it a public interface. You say it has limited impact and is only used by adaptive execution, but once introduced, nothing stops others from using it. Adding more people to get opinions.
Not sure how representative these "people" are. So let's bring the whole thing to dev list discussion.
I actually see it the other way. If we do see that the regular ShuffledRowRDD suffers from locality wait when it does happen to have a preferred location (because of satisfying ...). That said, this is a partial solution only. I'd like to see a complete fix as well, but I don't think we should go completely the other way by changing the default wait time to 0.
It seems we all agree that delay scheduling is problematic. And theoretically delay scheduling can be critical for some jobs (we can easily think of a custom RDD to prove this). Setting the locality wait to 0 by default is another topic. Even if we do it, people that run jobs that need delay scheduling still need to set the locality wait. For these users, we need this WILDCARD location feature to enable AQE. That said, changing the default locality wait to 0 doesn't solve the problem. It doesn't mean the locality wait is always 0; as long as it can be non-zero, we need to deal with it in AQE.
It would be good to go through all the places where ... Like this place, which returns a ... (spark/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala, lines 179 to 181 in 72a946c), which will later be used to create only one ...
Nobody has answered my questions above as to why this RDD should be treated differently and what the impact of this is. You just keep saying this is for adaptive scheduling. As far as I can see, this is purely another instance of https://issues.apache.org/jira/browse/SPARK-18886, and I don't see why we aren't using the same workaround, or really fixing the real issue.

I don't agree. HadoopRDD, for instance, knows its locality, but how important the locality is, is very user/cluster-specific. I don't see how the LocalShuffledRowRDD is any different. You are saying the user never cares about the locality on this; please explain to me why, and how it is different from HadoopRDD. If we were to turn this on for HadoopRDD, though, then we would essentially be bypassing the locality settings.

Again, why is AQE different? Let's say I really want my HadoopRDD to use locality, but then the ShuffledRDD hits this issue. As a user I can't just turn locality off for my ShuffledRDD, so what makes the LocalShuffledRowRDD any different? From what has been described here, this is a very particular case: you have more nodes and reducers than maps, and the maps finish very quickly (probably within 3 seconds). These are the same conditions under which other RDDs can hit the same issue.
You do, because you are doing a one-off hack here that Spark has to maintain, it adds a new public API, and we can't use it for anything else. Why not fix locality for all RDDs, as they can hit the same issue?
So what fix, exactly, are you talking about here?
@attilapiros Very good point. I'll go through all the references.
It is the same problem as https://issues.apache.org/jira/browse/SPARK-18886. It would be great if we could solve that problem first, but it seems there is no conclusion yet. There is one difference in ... Do we have an ETA for when we can resolve https://issues.apache.org/jira/browse/SPARK-18886? We can't remove locality wait, as nowadays we usually run many jobs on a Spark cluster. It's unclear to me what the best solution to it is. BTW, this feature won't be documented, and it's not that public to me. Users can only know it by reading the discussion here. We can still remove it later if https://issues.apache.org/jira/browse/SPARK-18886 is resolved. To me this is just a workaround to turn off delay scheduling for certain tasks instead of globally, which does have value.
Any more comments? This workaround makes sense to me while there is no perfect solution for https://issues.apache.org/jira/browse/SPARK-18886. This is also an important fix for turning on AQE by default in 3.0.
In my opinion the ideal thing is to fix SPARK-18886; it's the perfect time, this is a new major release, and this isn't something impacting production now, so we don't really need a "quick fix". I disagree with your comment that there is no perfect solution; no one here has tried, and really no one here has given me any metrics as to why this is so important a fix. But I realize that is a lot more change, so I'm OK with this going in as a temporary fix. Please update based on the comments made; I want to make sure this is clearly documented in the code as a hack that will go away and that no one else should use. Also, can someone give me any performance metrics? How much of a difference does the LocalShuffledRowRDD make?
Could someone help review my proposed solution for SPARK-18886 here: The idea is to only reset scheduling delay timers if allocated slots, based on the scheduling policy (FIFO vs FAIR), are fully utilized. |
@tgravescs Our benchmark comparing AQE with LSR (local shuffle reader) against AQE without LSR showed that before the locality wait fix, there were 2 queries with over 10% regressions, and after the fix, there was no regression and one query had an over 27% improvement. @attilapiros Thank you again for the careful review! I changed the code accordingly, and I also added javadoc stating the limited application and the experimental nature of the ...
Test build #114802 has finished for PR 26633 at commit
Test build #114823 has finished for PR 26633 at commit
retest this please
Test build #114863 has finished for PR 26633 at commit
Thanks for the performance stats. Obviously it's going to change some depending on job size and such; when you say 10% and 27%, what are the total job run times?
Are you sure that in your case the slowdown is even caused by SPARK-18886? Even when that is solved, you could still end up with one 3s wait for almost all tasks. I really do see how this can help, and I see why folks want this. But my hesitance is that we're going to start putting in these random changes to delay scheduling, which will make the code even harder to understand; users will end up with even more knobs to tune; and we may be stuck with this even after SPARK-18886, because it would still be a performance regression against this change. I agree with Tom's point: I don't see how we know that ignoring locality waits is right for just this one RDD but not for others. Though I want the default locality wait set to 0, I could see a cluster admin wanting to increase the locality wait because they know their cluster is very network-constrained. In fact this may be against the wishes of one particular Spark application, but still best for the cluster as a whole. In that case, you really might want a 3s wait on LocalShuffledRowRDD.
Thank you, @squito, for the feedback!
Actually, as I stated in previous comments, the only difference ...
I think you're comparing LocalShuffledRowRDD vs. ShuffledRowRDD; I'm comparing LocalShuffledRowRDD with this change vs. LocalShuffledRowRDD without it.
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
What changes were proposed in this pull request?
This PR adds a new WILDCARD task location that can match any host. This WILDCARD location can be used together with other regular locations in the list of preferred locations to indicate that the task can be assigned to any host/executor if none of the preferred locations is available.
Why are the changes needed?
This is motivated by the requirement from LocalShuffledRowRDD. When the number of initial mappers of LocalShuffledRowRDD is smaller than the number of worker nodes, it can cause serious regressions if short-running tasks all wait on their preferred locations while they could have otherwise finished quickly on non-preferred locations too.
We have a "locality wait time" configuration that allows a task set to downgrade its locality requirement after a certain time has passed. Yet this configuration affects all task sets in the scheduler, while tasks differ in the penalty of a locality miss. Thus, we need this finer-grained option for individual tasks to opt out of locality waits.
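The intended usage can be illustrated with a small sketch. The method and names below are assumptions for illustration, not the actual LocalShuffledRowRDD code: an RDD lists the mapper hosts first, then appends the "*" wildcard so the scheduler may place the task anywhere once none of the preferred hosts is immediately available.

```scala
// Hypothetical helper: build a preferred-locations list that falls back to
// the WILDCARD location introduced by this PR ("*" parses to WildcardLocation).
def preferredLocationsWithFallback(mapperHosts: Seq[String]): Seq[String] =
  mapperHosts :+ "*"  // real hosts first; wildcard last as the escape hatch
```

For example, mapper hosts Seq("h1", "h2") would yield Seq("h1", "h2", "*"), preferring h1 and h2 but allowing any host rather than waiting out the locality delay.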
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Added UT.