
[SPARK-18886][CORE] Make Locality wait time measure resource under utilization due to delay scheduling. #27207

Closed
wants to merge 15 commits

Conversation

bmarcott
Contributor

@bmarcott bmarcott commented Jan 15, 2020

What changes were proposed in this pull request?

Delay scheduling (see http://elmeleegy.com/khaled/papers/delay_scheduling.pdf) is an optimization that sacrifices fairness for data locality in order to improve cluster and workload throughput.

One useful definition of "delay" here is how much time has passed since the TaskSet was using its fair share of resources.

However it is impractical to calculate this delay, as it would require running simulations assuming no delay scheduling. Tasks would be run in different orders with different run times.

Currently the heuristic used to estimate this delay is the time since a task was last launched for a TaskSet. The problem is that it essentially does not account for resource utilization, potentially leaving the cluster heavily underutilized.

This PR modifies the heuristic in an attempt to move closer to the useful definition of delay above.
The newly proposed delay is the time since a TaskSet last launched a task and did not reject any resources due to delay scheduling when offered its "fair share".

See the last comments of #26696 for more discussion.
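As an illustration only (not the code in this patch), here is a minimal, self-contained sketch of the bookkeeping this heuristic implies, with simplified names:

```scala
import scala.collection.mutable

// Toy model of the heuristic, not the actual Spark classes: the locality wait
// "clock" for a TaskSet restarts only when a task is launched while no resources
// have been rejected due to delay scheduling since the last full resource offer.
object DelayTimerSketch {
  // keyed by stage id; true if there have been no delay-scheduling rejects
  // since the last "full" (all-free-resources) offer
  private val noDelayScheduleRejects = mutable.HashMap.empty[Int, Boolean]
  private val timerStartMillis = mutable.HashMap.empty[Int, Long]

  def recordOffer(
      stageId: Int,
      isAllFreeResources: Boolean,
      launchedAnyTask: Boolean,
      hadDelayScheduleReject: Boolean): Unit = {
    if (isAllFreeResources) {
      // a full offer opens a fresh observation window
      noDelayScheduleRejects(stageId) = !hadDelayScheduleReject
    } else if (hadDelayScheduleReject) {
      noDelayScheduleRejects(stageId) = false
    }
    if (launchedAnyTask && noDelayScheduleRejects.getOrElse(stageId, true)) {
      // the "delay" compared against spark.locality.wait is measured from here
      timerStartMillis(stageId) = System.currentTimeMillis()
    }
  }
}
```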

Why are the changes needed?

The cluster can become heavily underutilized, as described in SPARK-18886.

How was this patch tested?

TaskSchedulerImplSuite

@cloud-fan
@tgravescs
@squito

@cloud-fan
Contributor

ok to test

// keyed by task set stage id
// value is true if there have been no resources rejected due to delay scheduling
// since the last "full" resource offer
private val noDelayScheduleRejects = new mutable.HashMap[Int, Boolean]()
Contributor

can we ask each TSM to track it? We can pass the isAllFreeResources parameter to TSM when calling resourceOffer

Contributor Author

What do you see as the pros/cons of keeping this map here vs. putting it in the TSM?
The TaskSchedulerImpl is the only one setting/updating the variable, based on knowledge only it has (whether there was a delay-schedule reject).

@SparkQA

SparkQA commented Jan 15, 2020

Test build #116771 has finished for PR 27207 at commit d52de6b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

So please update the description with information from the other PR. The description should basically give a high-level design of this approach, with enough detail that someone can read the code and confirm it is doing what you are proposing: what cases it covers and what cases you know it doesn't.

So the case you mentioned it doesn't cover:

The case I am referring to is: imagine you have 2 resources and an "all resource offer" is scheduled every second. When TSM1 is submitted, it'll also get an "all resource offer", and assume it rejects both, causing a preexisting TSM2 to utilize them. Assume those 2 tasks finish, and the freed resources are offered one by one to TSM1, which accepts both, all within 1 second (before any "all resource offer"). This should reset the timer, but it won't in the implementation.

So the issue here is that we aren't really tracking when all resources are used; we are proxying that.
To really calculate the free slots, though, is pretty complex when you take blacklisting into account (we have both application- and taskset-level blacklisting).
I'm kind of thinking at this point the above case is ok: it favors not delaying, and it will be fixed up on the next "all resource offer".

One thing I don't think I like is that if you are fully scheduled, we keep trying to schedule "all resources", but if there are no free resources, then we continue to reset the timer. This means it takes a long time to fall back in the case where you have multiple tasksets, the first taskset rejects an offer, the second one takes it, and the tasks finish at a rate such that you get an all-resources offer in between the task finishes. In this scenario the first taskset can get starved. We would perhaps need to track this separately.

I need to take another walk through all the scenarios as well.

} while (launchedTaskAtCurrentMaxLocality)
}

if (!hadAnyDelaySchedulingReject) {
Contributor

nit - I think the variable names are a bit confusing because you have noDelay... and then hadAnyDelay...; I think it would be easier to read if they were consistent.

@bmarcott
Contributor Author

bmarcott commented Jan 16, 2020

@tgravescs
Thanks for the comments.

so please update the description with information from the other PR

Which one of my snippets from the previous PR was most clear to you? I can put that one in the description.

One thing I don't think I like....

Really good point on this scenario. It's bad even in a variant of the scenario you described where the all-resource offer has only 1 executor and the first taskset accepts it.
Let me know if you have any good ideas here 😉

Is it OK if I do follow-up changes, such as variable names, unit tests, and other backend schedulers, only once we iron out the problematic scenarios?

@tgravescs
Contributor

Yes, it's fine not to do any minor changes until we decide on the design.
I'm not so worried about the 1-executor case if the first taskset takes it, because that one should be higher priority and that should work as expected: when there isn't anywhere to put the tasks, have them wait a bit to try to get locality. It's the case with the fair scheduler, where the taskset that isn't the highest priority can get starved, that I'm more concerned with.

I wonder if we can add a separate tracking/check in TaskSchedulerImpl that sees whether a taskset keeps rejecting the non-all-resource offers but then resets on the all-resource offers; after some number of those, we stop resetting it. Thoughts on that?

@bmarcott
Contributor Author

bmarcott commented Jan 22, 2020

That may be reasonable to do, but I'd like to avoid adding more tracking/accounting if possible; I already don't like the boolean map I added.
What do you think about adding back the old condition of "must have launched a task"?
So the new condition for reset would be: must have no rejects and launch a task on an all-resource offer.
There is still starvation, but it should be no more starvation than with the current master code, I believe.

@cloud-fan
Contributor

I'm fine with a non-perfect solution as long as it's no worse than the existing one and fixes the locality wait problem.

@tgravescs
Contributor

So the conditions would be like:

  1. offer 1 resource that was rejected - no timer reset
  2. offer all resources with no rejects and launched a task - timer is reset
  3. offer 1 resource, no reject - timer is reset
  4. offer 1 resource that was rejected - no timer reset
  5. offer 1 resource, no reject - no timer reset, because the previous offer was rejected

I think that makes sense, it definitely addresses the case I was talking about with 2 task sets.

I was also wondering about leaving the old logic in there but configured off by default. While I don't like it, it would be safer in case we haven't thought of some corner case, and the user could turn it back on if necessary. Thoughts?

@bmarcott
Contributor Author

@cloud-fan
thanks for the input

@tgravescs
yep, that sequence and explanation match my understanding

I was also wondering about leaving the old logic in there but configured off by default. While I don't like it, it would be safer if we haven't thought of some corner case and user could turn it back on if necessary, thoughts?

I'm not opposed to adding a new config that disables the new way and enables the old way, but is this standard practice in Spark for riskier changes?

I would feel more comfortable knowing there was a fallback too ;)

@tgravescs
Contributor

It's a case-by-case basis. While we don't like adding more configs and code to maintain, for a somewhat risky change with all the corner cases and different ways to run, I think it's warranted. Actually, what we can do is leave the config undocumented, default to the new algorithm, and put a deprecation message by it so we can remove it in, say, 3.1 if we want.

@tgravescs
Contributor

@bmarcott will you be able to update this?

@bmarcott
Contributor Author

yea, I'll try to take a look this weekend. Thanks for checking in.

@bmarcott
Contributor Author

bmarcott commented Feb 2, 2020

@tgravescs
I updated the PR with the new config to switch to the legacy behavior, as well as some doc/variable renaming.
I also updated the description of this PR.

After reading more through SchedulerBackend impls:

  1. I decided to assume resource offers are full (all free resources) by default, to match previous behavior for any SchedulerBackends not touched in this PR.
  2. The only backend I found that doesn't use CoarseGrainedSchedulerBackend is MesosFineGrainedSchedulerBackend; I decided not to touch it since it is deprecated, and from its design it looks like "all free resources" can't be tracked.

Test updates are still needed, but wanted to get your feedback first.
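For illustration, a hedged sketch of what that default means for backends; the types here are toy stand-ins rather than the real Spark classes, and only updated backends pass false for their single-executor, event-driven offers:

```scala
// Toy stand-ins for the real Spark types, for illustration only.
case class WorkerOffer(executorId: String, host: String, cores: Int)

trait TaskSchedulerLike {
  // defaults to a "full" offer, so backends not touched by the PR keep their old behavior
  def resourceOffers(
      offers: IndexedSeq[WorkerOffer],
      isAllFreeResources: Boolean = true): Seq[Seq[String]]
}

class BackendSketch(scheduler: TaskSchedulerLike, executors: Map[String, WorkerOffer]) {
  // periodic path: offer every executor's free resources at once
  def makeOffersForAll(): Unit =
    scheduler.resourceOffers(executors.values.toIndexedSeq, isAllFreeResources = true)

  // event-driven path: a single executor just freed resources (e.g. a task finished)
  def makeOffersFor(executorId: String): Unit =
    executors.get(executorId).foreach { offer =>
      scheduler.resourceOffers(IndexedSeq(offer), isAllFreeResources = false)
    }
}
```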

@SparkQA

SparkQA commented Feb 2, 2020

Test build #117728 has finished for PR 27207 at commit e0ac12e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 2, 2020

Test build #117727 has finished for PR 27207 at commit 2924c5b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@Ngone51 Ngone51 left a comment


This makes me feel that we're trying to put cluster utilization before data locality while assigning resources. Right?

@bmarcott
Contributor Author

bmarcott commented Feb 4, 2020

@Ngone51 thanks for taking a look.
I wouldn't say it puts one thing before another. The idea is to move closer to what a reasonable definition of scheduling delay (locality.wait time) is: how long you want to sacrifice "fairness" (using your fair share of resources) in favor of data locality.

I'll do the touch ups you mentioned after there is enough support on the design.

@cloud-fan
Contributor

The newly proposed delay is the time since a TaskSet last launched a task and did not reject any resources due to delay scheduling when offered its "fair share".

Thanks for updating the PR description! The proposal makes sense to me.

Is it possible to centralize the delay scheduling code? Now it's in both TaskSchedulerImpl and TaskSetManager, which makes it a bit hard to understand as you need to think about the interactions between them.

@tgravescs
Contributor

I'll take a closer look later. I think it's fine not to support the Mesos fine-grained scheduler.

Contributor Author

@bmarcott bmarcott left a comment


I appreciate everyone's feedback. I've updated some nits locally and will follow up on tests later.

@@ -469,8 +486,10 @@ private[spark] class TaskSetManager(
extraResources,
serializedTask)
}
(taskDescription,
taskDescription.isEmpty && maxLocality == TaskLocality.ANY && pendingTasks.all.nonEmpty)
Contributor Author

👍
What are your thoughts on this:
Instead of making assumptions about what taskDescription.isEmpty means, maybe it'd be better to pass maxLocality into dequeueTask and then change its logic near the bottom to be something like:

    if (TaskLocality.isAllowed(allowedLocality, TaskLocality.ANY)) {
      dequeue(pendingTaskSetToUse.all).foreach { index =>
        return Some((index, TaskLocality.ANY, speculative))
      }
    } else if (maxLocality == TaskLocality.ANY && pendingTasks.all.nonEmpty) {
      hasReject = true
    }

@bmarcott
Contributor Author

bmarcott commented Feb 10, 2020

@cloud-fan

Is it possible to centralize the delay scheduling code? Now it's in both TaskSchedulerImpl and TaskSetManager, which makes it a bit hard to understand as you need to think about the interactions between them.

I am not sure of a good way to centralize this because:

  1. The TSM is called multiple times with various offers and we need to keep track of what happened across those calls
  2. TSM today doesn't differentiate whether it didn't launch a task due to blacklisting or due to delay scheduling, hence the new boolean returned (see the sketch below).
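As a toy sketch of point 2 (simplified stand-in types, not the real TaskSetManager signature), the idea is that the extra boolean flags a reject that happened purely because of delay scheduling:

```scala
// Illustration only: alongside the (optional) launched task, the TSM now reports
// whether this offer was rejected specifically because of delay scheduling, as
// opposed to blacklisting or simply having no pending work.
object ResourceOfferSketch {
  object TaskLocality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
  }
  final case class TaskDescription(taskId: Long)

  def resourceOffer(
      dequeued: Option[TaskDescription],
      maxLocality: TaskLocality.Value,
      hasPendingTasks: Boolean): (Option[TaskDescription], Boolean) = {
    // "delay scheduling reject": nothing was launched even at the widest allowed
    // locality level (ANY) although work is still pending
    val delayScheduleReject =
      dequeued.isEmpty && maxLocality == TaskLocality.ANY && hasPendingTasks
    (dequeued, delayScheduleReject)
  }
}
```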

@SparkQA

SparkQA commented Feb 10, 2020

Test build #118111 has finished for PR 27207 at commit 61671b3.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 10, 2020

Test build #118113 has finished for PR 27207 at commit f341ebb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 11, 2020

Test build #118214 has finished for PR 27207 at commit e39680b.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 11, 2020

Test build #118223 has finished for PR 27207 at commit 894bebb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 3, 2020

Test build #120771 has finished for PR 27207 at commit 24c8ad9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

@bmarcott I accepted your LinkedIn request, but we can discuss here as well. What manual testing have you done? I was hoping to do some myself but haven't had time yet. We can continue to test after it's merged in.

@bmarcott
Contributor Author

bmarcott commented Apr 4, 2020

I have not tested this manually yet, nor have I played around with using multiple executors locally before.
Maybe I can just use spark standalone: https://spark.apache.org/docs/latest/spark-standalone.html

@bmarcott
Contributor Author

bmarcott commented Apr 6, 2020

Running into problems running standalone; posted a question on Stack Overflow.

...updated...
I was using the wrong port. Answered my own question.

@tgravescs
Contributor

@bmarcott did you get it to work on standalone cluster?

@bmarcott
Contributor Author

bmarcott commented Apr 8, 2020

@tgravescs I did set up everything and can run jobs. Now I am trying to create a use case where the tasks prefer a particular executor. Do you have any ideas on a way to easily repro this issue?

@cloud-fan
Contributor

Can you create a custom RDD which generates random data and set the preferred location to one executor?
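A hedged sketch of that suggestion (illustrative names; it assumes the "executor_&lt;host&gt;_&lt;executorId&gt;" string form that Spark's TaskLocation parsing treats as an executor-level preference, while a bare hostname would give only node-level preference):

```scala
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Every partition claims the same preferred location, so delay scheduling will try to
// pack all tasks onto one executor unless the locality wait expires.
case class SkewedPartition(index: Int) extends Partition

class SkewedLocalityRDD(sc: SparkContext, numParts: Int, preferredLocation: String)
  extends RDD[Int](sc, Nil) {

  override protected def getPartitions: Array[Partition] =
    Array.tabulate[Partition](numParts)(SkewedPartition(_))

  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(preferredLocation) // e.g. "executor_myhost_0" to prefer executor 0 on host "myhost"

  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    Thread.sleep(1000) // mimic the 1-second tasks used in the manual test below
    Iterator.single(split.index)
  }
}

// usage sketch: new SkewedLocalityRDD(sc, 1000, "executor_myhost_0").count()
```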

@bmarcott
Contributor Author

bmarcott commented Apr 9, 2020

Thanks @cloud-fan
That is the approach I was going to start with.
In the wild/production I hit this issue with a join, but I forget the exact reason/context.

@bmarcott
Contributor Author

bmarcott commented Apr 9, 2020

First manual test looks good. I ran a simple app which creates 1000 partitions, all preferring executor 0.
Each task sleeps 1 second. Here is the app's code.
The new code utilized all executors, whereas with the legacy flag enabled, only PROCESS_LOCAL tasks were run, making it much slower.

Run with the new code:
../../projects/apache/spark/bin/spark-submit --class "TestLocalityWait" --master spark://localhost:7077 --conf spark.executor.instances=4 --conf spark.executor.cores=2 target/scala-2.12/simple-project_2.12-1.0.jar

Processed 326 partitions in 35 seconds:
[screenshot]

Many tasks run at ANY locality level:
[screenshot]


Run with legacy flag set to true:
../../projects/apache/spark/bin/spark-submit --class "TestLocalityWait" --master spark://localhost:7077 --conf spark.executor.instances=4 --conf spark.executor.cores=2 --conf spark.locality.wait.legacyResetOnTaskLaunch=true target/scala-2.12/simple-project_2.12-1.0.jar

Processed 146 partitions in 1.3 min:
image

All tasks run at PROCESS_LOCAL locality level
[screenshot]

@cloud-fan
Contributor

cloud-fan commented Apr 9, 2020

Thanks for the manual testing! Great job!

Merging to master!

@cloud-fan cloud-fan closed this in 8b48629 Apr 9, 2020
@cloud-fan
Contributor

It conflicts with 3.0, can you send a new PR for 3.0?

@dongjoon-hyun
Member

dongjoon-hyun commented Apr 10, 2020

Hi, all.
The last test run was 7 days ago. This causes a UT failure on all master Jenkins jobs, in a test which was added by another PR. I made a follow-up to recover the master branch.

@cloud-fan
Contributor

thanks for fixing!

@bmarcott
Contributor Author

bmarcott commented Apr 10, 2020

@cloud-fan I thought 3.0 was already branched and frozen? I will open a PR for it if I am given the go-ahead.
Thanks for all the review and feedback along the way (@tgravescs as well!)

I am doing further manual testing and finding that either I'm testing wrong, or locality wait isn't being respected even when I set spark.locality.wait higher. Looking into it... (there may be a several-day delay)
I believe I may need to add a special case for when there is an all-resource offer with 0 offers (there will be no launched task, so no reset).

@cloud-fan
Contributor

This is a high-value perf fix, but in hindsight it seems too risky for 3.0. @tgravescs are you OK with having this fix in master only?

@tgravescs
Contributor

yeah at this point I'm fine with leaving it in master only.

@dongjoon-hyun
Member

Hi, @cloud-fan. If the decision is final, could you resolve SPARK-18886 with the fixed version, 3.1.0? The JIRA issue is still open. Thanks.

@cloud-fan
Contributor

done

sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020
…ilization due to delay scheduling

Closes apache#27207 from bmarcott/nmarcott-fulfill-slots-2.

Authored-by: Nicholas Marcott <481161+bmarcott@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
asfgit pushed a commit that referenced this pull request Apr 22, 2020
… no task was launched

### What changes were proposed in this pull request?
Remove the requirement to launch a task in order to reset the locality wait timer.

### Why are the changes needed?
Recently #27207 was merged, but contained a bug which leads to undesirable behavior.

The crux of the issue is that single resource offers couldn't reset the timer if there had been a previous reject followed by an allResourceOffer with no available resources.
This led to a problem where, once the locality level reached ANY, single resource offers were all accepted, leaving allResourceOffers with no resources to utilize (hence no task being launched on an all-resource offer -> no timer reset). The task set manager would be stuck at the ANY locality level.

Noting down here the downsides of using the reset conditions below, in case we want to follow up.
As this is quite complex, I could easily be missing something, so please comment/respond if you have more bad-behavior scenarios or find something wrong here:
The format is:

> **Reset condition**
>  - the unwanted side effect
>      - the cause/use case

Below references to locality increase/decrease mean:
```
PROCESS_LOCAL, NODE_LOCAL ... .. ANY
    ------ locality decrease --->
   <----- locality increase -----
```

**Task launch:**
- locality decrease:
   - Blacklisting, FAIR/FIFO scheduling, or task resource requirements can minimize tasks launched
 - locality increase:
   - single task launch decreases locality despite many tasks remaining

**No delay schedule reject since last allFreeResource offer**
- locality decrease:
   - locality wait less than the allFreeResource offer frequency, which occurs at least once per second
- locality increase:
   - single resource (or none) not rejected despite many tasks remaining (other lower priority tasks utilizing resources)

**Current impl - No delay schedule reject since last (allFreeResource offer + task launch)**
- locality decrease:
  - all from above
- locality increase:
   - single resource accepted and task launched despite many tasks remaining

The current impl is an improvement on the legacy one (task launch) in that the unintended locality decrease case is similar, and the unintended locality increase case only occurs when the cluster is fully utilized.

For the locality increase cases, perhaps a config which specifies a certain % of tasks in a taskset to finish before resetting locality levels would be helpful.

**If** that was considered a good approach then perhaps removing the task launch as a requirement would eliminate most of the downsides listed above.
Lemme know if you have more ideas for eliminating locality increase downside of **No delay schedule reject since last allFreeResource offer**

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
TaskSchedulerImplSuite

Also manually tested similar to how I tested in #27207 using [this simple app](https://github.com/bmarcott/spark-test-apps/blob/master/src/main/scala/TestLocalityWait.scala).

With the new changes, given locality wait of 10s the behavior is generally:
10 seconds of locality being respected, followed by a single full utilization of resources using ANY locality level, followed by 10 seconds of locality being respected, and so on

If the legacy flag is enabled (spark.locality.wait.legacyResetOnTaskLaunch=true), the behavior is only scheduling PROCESS_LOCAL tasks (only utilizing a single executor)

cloud-fan
tgravescs

Closes #28188 from bmarcott/nmarcott-locality-fix.

Authored-by: Nicholas Marcott <481161+bmarcott@users.noreply.github.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
}
}

/**
* Called by cluster manager to offer resources on slaves. We respond by asking our active task
* sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
* that tasks are balanced across the cluster.
Member

Can we update the description of this function and explain the parameter "isAllFreeResources"?

Contributor Author

Yes, I can add something like: if true, then the parameter offers contains all workers and their free resources. See the delay scheduling comments in the class description.
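For example, a possible shape for that doc, sketched against the signature discussed below (wording adapted from the comment above, not necessarily the exact text that was merged):

```scala
/**
 * ...existing description above...
 *
 * @param offers free resources currently available on a set of workers
 * @param isAllFreeResources if true, `offers` contains all workers and all of their free
 *                           resources (a "full" offer); if false, it is a partial,
 *                           event-driven offer (e.g. a single executor freeing resources).
 *                           See the delay scheduling comments in the class description.
 */
def resourceOffers(
    offers: IndexedSeq[WorkerOffer],
    isAllFreeResources: Boolean = true): Seq[Seq[TaskDescription]]
```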

Contributor

If there are changes suggested and required, please file a separate new JIRA for this and link them. This PR has been merged and we have had too many followups at this point.

def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
def resourceOffers(
offers: IndexedSeq[WorkerOffer],
isAllFreeResources: Boolean = true): Seq[Seq[TaskDescription]] = synchronized {
Member

Do we need to set the default value?

Contributor Author

I think I originally did this to not break the API and to maintain something closer to the previous behavior for callers who hadn't migrated to setting it to false.
Lemme know if this is the wrong approach.

otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
…ilization due to delay scheduling
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
… no task was launched