
[SPARK-35022][CORE] Task Scheduling Plugin in Spark #32136

Closed · wants to merge 8 commits

Conversation

@viirya (Member) commented Apr 12, 2021

What changes were proposed in this pull request?

This patch proposes to add a plugin API for providing scheduling suggestions to the Spark task scheduler.

Design doc: https://docs.google.com/document/d/1wfEaAZA7t02P6uBH4F3NGuH_qjK5e4X05v1E5pWNhlQ/edit?usp=sharing

Why are the changes needed?

The Spark scheduler assigns tasks to executors in an arbitrary (maybe not an accurate description, but I cannot find a better term here) way; the scheduler decides placement by itself. Although there is locality configuration, the configuration is used for data locality purposes. Generally we cannot suggest to the scheduler where a task should be scheduled. Normally this is not a problem because most tasks are executor-agnostic.

But for special tasks, for example stateful tasks in Structured Streaming, the state store is maintained on the executor side. Changing a task's location means reloading checkpoint data from the last batch. This hurts performance and also imposes some limitations when we want to implement certain features in Structured Streaming. We need an API to tell the Spark scheduler how to schedule tasks.

Does this PR introduce any user-facing change?

No. This API should be developer-only, and for now it is Spark-internal only.

How was this patch tested?

Unit test.

@viirya (Member Author) commented Apr 12, 2021

This may need more test cases, but I'd like to ask for the community's opinion first.

cc @cloud-fan @Ngone51

@github-actions bot added the CORE label Apr 12, 2021
@SparkQA commented Apr 12, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41825/

@SparkQA commented Apr 12, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41825/

@SparkQA commented Apr 13, 2021

Test build #137245 has finished for PR 32136 at commit ae9a8cb.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member)

cc @mridulm and @tgravescs FYI

@hvanhovell (Contributor)

@viirya why is the current locality scheduling mechanism not good enough? What problem are you trying to fix?

@viirya (Member Author) commented Apr 13, 2021

#30812 is a previous attempt to use the locality mechanism to stabilize state store locations. Basically, what I want to do is avoid Spark scheduling streaming tasks that use a state store (let me call them stateful tasks) to arbitrary executors. In short, that wastes resources on state stores and costs extra time restoring state stores on different executors.

For this use case, the current locality mechanism seems like a hacky approach, as we can only blindly assign stateful tasks to executors evenly. We do not know whether the assignment makes sense to the scheduler. It makes me think that we may need an API we can use to provide scheduling suggestions.

@cloud-fan (Contributor)

avoid Spark scheduling streaming tasks that use a state store (let me call them stateful tasks) to arbitrary executors.

I don't think we can guarantee it. It's a best effort, and tasks should be able to run on any executor, though tasks can have preferred executors (locality). Otherwise, we need to revisit many design decisions like how to avoid infinite waits, how to auto-scale, etc.

the current locality mechanism seems like a hacky approach, as we can only blindly assign stateful tasks to executors evenly.

Can you elaborate? If it's a problem of delay scheduling let's fix it instead.

/**
 * This trait provides a plugin interface for suggesting task scheduling to Spark
 * scheduler.
 */
private[spark] trait TaskSchedulingPlugin {
Contributor

Why is this private to Spark if people are supposed to implement it?

Contributor

This should be marked as a developer API.

Member Author

I'm not sure yet whether we want it to be open for implementation outside Spark, so it is private for now. It can become public if we have a consensus.

Contributor

If it is not to be exposed, then why make it a plugin? Isn't the risk that you are now overgeneralizing something for a non-existent benefit?

Contributor

Can't we add a locality constraint that forces you to schedule on an executor instead?

Member Author

As I replied in the doc too, locality is static during the assignment of a task set. I also considered this option, e.g. dynamically changing locality during task scheduling, but I think the change would be larger and more likely to affect existing code/apps.

Member Author

If it is not to be exposed, then why make it a plugin? Isn't the risk that you are now overgeneralizing something for a non-existent benefit?

I agree the class name might be confusing now. I thought of it as a plugin, but as it stands it is not a plugin you can plug into Spark; it is a private API.

Contributor

I'm not sure yet whether we want it to be open for implementation outside Spark.

IMHO I'm thinking the opposite on this. I'm not sure we want to add an implementation to the Spark codebase unless the plugin implementation proves that it works well for all possible cases.

I'd rather be OK with letting Spark users (including us) be hackers and customize the scheduler "at their own risk", but I have to be conservative if we want to change the behavior of Spark. If changing Spark's default behavior to cope with state is the final goal, I think the behavioral change itself is a major one requiring a SPIP, not something we can simply go after based on a POC of the happy case.

@tgravescs (Contributor)

I would like to see more of an overview and design details. I think the idea of having something here is good, because some people may want to cluster tasks while others might want to spread them, and you might want to place them based on hardware or something else. I want to understand how flexible the plugin API you are proposing is.

I just saw https://docs.google.com/document/d/1wfEaAZA7t02P6uBH4F3NGuH_qjK5e4X05v1E5pWNhlQ/edit# which has a few details. It would be good to link it from the description.

questions:

  1. What happens with locality? It looks like this is plugged in after locality; are you disabling locality then, or does your use case not have any? If we create a plugin to choose the location, I wouldn't necessarily want locality to take effect.
  2. @param tasks The full list of tasks => is this all tasks even if they are done? Would you want to know which ones are already running or which ones have succeeded?
  3. This is being called from a synchronized block; at the very least we need to document that better, along with the effects it could have on scheduling time.
  4. It looks like your plugin runs before blacklisting; is this really what we want, or would the plugin like to know about it to make a better decision?
  5. How does this interact with barrier scheduling, where it resets things if it doesn't get scheduled?

I would like to see how this applies to other use cases I mentioned above before putting this in.

@Ngone51 (Member) left a comment

Although there is locality configuration, the configuration is used for data locality purposes

a) In the case of streaming workloads, I think the locality info here is about the state store instead of data. e.g.,

override def getPreferredLocations(partition: Partition): Seq[String] = {
  val stateStoreProviderId = getStateProviderId(partition)
  storeCoordinator.flatMap(_.getLocation(stateStoreProviderId)).toSeq
}

So I think that locality preference scheduling (or delay scheduling) would also apply to it (the state store location).

b) That being said, I actually had the same concern when I touched the streaming code, because I know that delay scheduling doesn't guarantee that the final scheduling location is the preferred location provided by the task. So the cost of reloading the state store could potentially still exist.

I want to make sure first which case you're trying to fix here: a or b?

@viirya (Member Author) commented Apr 13, 2021

I don't think we can guarantee it. It's a best effort, and tasks should be able to run on any executor, though tasks can have preferred executors (locality). Otherwise, we need to revisit many design decisions like how to avoid infinite waits, how to auto-scale, etc.

avoid Spark scheduling streaming tasks that use a state store (let me call them stateful tasks) to arbitrary executors.

Sorry, I think this sentence is misleading. I don't mean to break the "tasks should be able to run on any executor" design; this API doesn't break it. What we want is to be able to set constraints/conditions on choosing which task gets scheduled to an executor. So basically tasks are still able to run on any executor; it is just that, for some purposes, we need a particular executor to pick up a particular task.

Can you elaborate? If it's a problem of delay scheduling let's fix it instead.

In #30812, @zsxwing, @HeartSaVioR and I had a long discussion around using locality for stateful tasks. You can see my original approach was to use locality, but overall it was considered too hacky, and now I share their points. You can catch up on the comments there.

Basically I think the problem is that, for a stateful job, we want to distribute tasks evenly to all executors and keep the executor-task mapping relatively stable. With locality, we can only assign tasks to executors blindly. For example, the scheduler knows more about executor capacity and which executors should be assigned tasks, but in SS we don't have such info (and should not have it either).

@SparkQA commented Apr 13, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41887/

@SparkQA commented Apr 13, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/41887/

@SparkQA commented Apr 14, 2021

Test build #137307 has finished for PR 32136 at commit 3097e73.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya (Member Author) commented Apr 14, 2021

a) In the case of streaming workloads, I think the locality info here is about the state store instead of data. e.g.,
So I think that locality preference scheduling (or delay scheduling) would also apply to it (the state store location).

b) That being said, I actually had the same concern when I touched the streaming code, because I know that delay scheduling doesn't guarantee that the final scheduling location is the preferred location provided by the task. So the cost of reloading the state store could potentially still exist.

Let me clarify the difference between a and b. So (a) looks like using locality for state store location and (b) is that locality cannot guarantee actual location. Right? Please let me know if I misunderstand.

I have tried to use locality for tasks with state stores in #30812. As you know (from the code snippet), SS actually already uses locality for the state store location. However it has a few problems: 1. it uses the previous state store location as locality, so if there is no previous location info, we still let Spark pick an executor arbitrarily. 2. It depends on the initially chosen executor-state store mapping. So if Spark chooses a sub-optimal mapping, locality doesn't work well for later batches. 3. Forcibly assigning state stores to executors can possibly lead to unreasonable scheduling decisions. For example, we don't know if the executor satisfies the resource requirements.

@viirya (Member Author) commented Apr 14, 2021

I just saw https://docs.google.com/document/d/1wfEaAZA7t02P6uBH4F3NGuH_qjK5e4X05v1E5pWNhlQ/edit# which has a few details. It would be good to link it from the description.

Linked it from the description. Thanks for the reminder.

questions:

1. What happens with locality? It looks like this is plugged in after locality; are you disabling locality then, or does your use case not have any? If we create a plugin to choose the location, I wouldn't necessarily want locality to take effect.

For now it still respects locality. My intention is not to interfere with the scheduler too much. For each locality level, the scheduler tries to pick one task from the list of tasks for that locality level. The API jumps in at that moment and lets the scheduler know which tasks are most preferred on the executor. If users don't want locality to take effect, that is doable by disabling the locality configs. Otherwise, from the API perspective, if it doesn't want a particular task to be on an executor, it can also let the scheduler know (i.e., by not including it in the returned list).

2. @param tasks The full list of tasks => is this all tasks even if they are done? Would you want to know which ones are already running or which ones have succeeded?

We may not need to know. The parameter can be discussed. As taskIndexes are indexes into the full task list, it is easier for us to get a task from its index. It could instead be just the subset of tasks that the passed-in task indexes point to, if that is preferred.

3. This is being called from a synchronized block; at the very least we need to document that better, along with the effects it could have on scheduling time.

Yeah, I thought about it but forgot to add the comment. I will do it in the next commit.

4. It looks like your plugin runs before blacklisting; is this really what we want, or would the plugin like to know about it to make a better decision?

Good point. I think the reason it runs before blacklisting is that it is easier to fit into the current logic and seems safer. Currently the scheduler iterates over each task in the list and then checks the blacklist before picking it up. If I let the API get the list after blacklisting, it seems it might be a larger change to the dequeue logic.

5. How does this interact with barrier scheduling, where it resets things if it doesn't get scheduled?

IIUC, the plugin does not affect or change how the scheduler acts on barrier tasks. In the current dequeue logic, the scheduler doesn't behave differently for barrier tasks versus general tasks. Currently, if the scheduler cannot schedule all barrier tasks at once, it resets the assigned resource offers. That is the same with the plugin.
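
To make the shape of the hook being discussed more concrete, here is a minimal sketch of what such an interface could look like. The trait name matches the diff excerpt earlier in the review, but the method name, parameters, type parameter, and the example implementation are assumptions drawn from this discussion, not the actual code under review.

// Hypothetical sketch only: the signature is inferred from the discussion above
// (full task list plus candidate indexes in, preferred indexes out) and is not the
// code proposed in this PR.
trait TaskSchedulingPlugin[T] {
  // Given one executor's offer, the task set's full task list, and the indexes of the
  // candidate tasks at the current locality level, return the candidate indexes this
  // plugin prefers for that executor, in order of preference. Omitting an index asks
  // the scheduler not to pick that task here.
  def rankTasks(
      executorId: String,
      host: String,
      tasks: Seq[T],
      taskIndexes: Seq[Int]): Seq[Int]
}

// A trivial, purely illustrative implementation that spreads tasks across executors by
// hashing the executor id into a slot and keeping only the task indexes in that slot.
class SpreadEvenlyPlugin[T](numExecutors: Int) extends TaskSchedulingPlugin[T] {
  override def rankTasks(
      executorId: String,
      host: String,
      tasks: Seq[T],
      taskIndexes: Seq[Int]): Seq[Int] = {
    val slots = math.max(numExecutors, 1)
    val slot = ((executorId.hashCode % slots) + slots) % slots
    taskIndexes.filter(i => i % slots == slot)
  }
}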

@Ngone51 (Member) commented Apr 14, 2021

... So (a) looks like using locality for state store location and (b) is that locality cannot guarantee actual location. Right?

Yes.

  1. it uses the previous state store location as locality, so if there is no previous location info, we still let Spark pick an executor arbitrarily.

So, how would the plugin help when there's no previous location info?

  2. It depends on the initially chosen executor-state store mapping. So if Spark chooses a sub-optimal mapping, locality doesn't work well for later batches.

I think this matches my point in case b above.

But looking at the code, it seems the plugin is still applied after locality scheduling?

  3. Forcibly assigning state stores to executors can possibly lead to unreasonable scheduling decisions. For example, we don't know if the executor satisfies the resource requirements.

I don't get this. Do you mean some executors may not be suitable for having a state store?

@cloud-fan (Contributor)

Correct me if I'm wrong: Spark tries its best to schedule SS tasks on executors that have existing state store data. This is already the case and is implemented via preferred locations. The problem we are solving here is the first micro-batch, where there is no existing state store data and we want to schedule the tasks of the first micro-batch evenly across the cluster. This is to avoid future skew where many SS tasks run on very few executors.

@viirya (Member Author) commented Apr 14, 2021

So, how would the plugin help when there's no previous location info?

For example, in the plugin implementation, it can distribute tasks to executors more evenly.

I think this matches my point in case b above.

But looking at the code, it seems the plugin is still applied after locality scheduling?

Locality still works. Generally, this plugin API doesn't break locality if it is set.

  3. Forcibly assigning state stores to executors can possibly lead to unreasonable scheduling decisions. For example, we don't know if the executor satisfies the resource requirements.

I don't get this. Do you mean some executors may not be suitable for having a state store?

For example, an executor may not be capable of running the task. If we blindly assign a stateful task to an executor, we don't actually know whether the executor is capable of running it; only the scheduler knows that. I think this is the major point discussed in my previous PR.

@Ngone51 (Member) commented Apr 14, 2021

...it can distribute tasks to executors more evenly.

I think the scheduler already distributes tasks evenly when there's no locality preference as we'll shuffle the executors before scheduling:

val shuffledOffers = shuffleOffers(filteredOffers)

Doesn't it work?

@viirya (Member Author) commented Apr 14, 2021

I think the scheduler already distributes tasks evenly when there's no locality preference as we'll shuffle the executors before scheduling:

Doesn't it work?

For the code snippet, doesn't it depend on whether all executors are available at the moment of making offers? It seems unreliable due to issues like race conditions. For example, when running an SS job with state stores, it is easy to see the initial tasks scheduled to only a subset of the executors.

@Ngone51 (Member) commented Apr 15, 2021

For the code snippet, doesn't it depend on whether all executors are available at the moment of making offers?

Yes, that's true. But normally, even in the case of offering a single resource that was released by a single task, it seems unlikely that tasks would be scheduled unevenly unless resources are really scarce.

For example, when running an SS job with state stores, it is easy to see the initial tasks scheduled to only a subset of the executors.

Do you have logs related to the scheduling? I'd like to see how it happens.

@viirya (Member Author) commented Apr 15, 2021

Yes, that's true. But normally, even in the case of offering a single resource that was released by a single task, it seems unlikely that tasks would be scheduled unevenly unless resources are really scarce.

As I saw in previous tests, whether all tasks are evenly distributed to all executors is a matter of chance. Sometimes they are, but sometimes only some of the executors get tasks in the first batch.

Do you have logs related to the scheduling? I'd like to see how it happens.

I don't have logs right now. It was an ordinary, simple SS job reading from Kafka.

@viirya (Member Author) commented Apr 16, 2021

Correct me if I'm wrong: Spark tries its best to schedule SS tasks on executors that have existing state store data. This is already the case and is implemented via preferred locations. The problem we are solving here is the first micro-batch, where there is no existing state store data and we want to schedule the tasks of the first micro-batch evenly across the cluster. This is to avoid future skew where many SS tasks run on very few executors.

That is correct. However, even for micro-batches after the first, we currently use preferred locations plus a non-trivial locality config (e.g., 10h) to force Spark to schedule tasks to their previous locations. I think this is not flexible because locality is a global setting: a non-trivial locality config might cause sub-optimal results for other stages, and it requires end users to set it. It makes me feel that it is not a user-friendly approach.
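
For context, the workaround described above is essentially a global knob; something like the following (a sketch, not code from this PR; the app name is hypothetical) affects every stage in the application, not just the stateful ones:

import org.apache.spark.sql.SparkSession

// Making the locality wait effectively unbounded so the scheduler (almost) always honors
// the state store's preferred location. Because spark.locality.wait is global, this also
// delays scheduling for stages that have nothing to do with state stores.
val spark = SparkSession.builder()
  .appName("streaming-with-sticky-state-stores") // hypothetical app name
  .config("spark.locality.wait", "10h")          // default is 3s
  .getOrCreate()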

@SparkQA commented Apr 16, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/42023/

@viirya (Member Author) commented May 24, 2021

BTW, @Ngone51, is dynamic allocation required for stage-level scheduling? At least that is what I got from reading the code. But it seems dynamic allocation has some issues (or does not work well?) with SS (e.g. SPARK-24815)? Right? @HeartSaVioR

@Ngone51 (Member) commented May 24, 2021

BTW, @Ngone51, is dynamic allocation required for stage-level scheduling?

It's required for the classic use case (when you really need to change executor resources dynamically), but not in this case.

In this case, we only leverage the task scheduling aspect of stage-level scheduling. Thus, adding a task request for the state store resource alone should be enough.

@tgravescs (Contributor)

Thanks for explaining some of the SS-specific things.

The stage-level scheduling changes you are proposing are definitely not what it does now, and we would need a few features we were wanting to do anyway, like reusing existing containers, but that is all doable; it just needs to be implemented. It really sounds like this would be a special stage-level scheduling feature that is SS state store specific, because the user would not specify an exact requirement, just that tasks must be scheduled on executors with their specific state store. This means that if an executor goes down, something has to wait for another executor to start up the task-specific state store - what is going to do that in this scenario? Or do you wait a certain period and schedule it anywhere?

It seems like this feature could exist outside of creating a new ResourceProfile with the stage-level scheduling APIs, and the user should be able to specify this option, which would only work with StateStoreRDD. Is it useful outside of that? I don't see how, unless we added another plugin point for the executor to report back an arbitrary resource and then came up with some API it could call to map a taskId to a resourceId reported back to it.

To follow the existing custom resource management, the state store resource might be maintained as ("statestore" -> list of statestore ids) as a part of the custom resource.

I think this would mean the scheduler would need some specific logic to match a task id to a state store id, right? Otherwise stage-level scheduling would schedule a task on anything in that list, which at that point makes the list not relevant if Spark knows how to do some sort of mapping.

@viirya (Member Author) commented May 24, 2021

It's required for the classic use case (when you really need to change executor resources dynamically), but not in this case.

In this case, we only leverage the task scheduling aspect of stage-level scheduling. Thus, adding a task request for the state store resource alone should be enough.

Good to know that. So, does it mean I can remove the dynamic allocation check for our case without affecting classic stage-level scheduling?

This means that if an executor goes down, something has to wait for another executor to start up the task-specific state store - what is going to do that in this scenario? Or do you wait a certain period and schedule it anywhere?

I think this could be configurable. Users could configure it to schedule anywhere and load from checkpointed data, or alternatively just fail the streaming app, after a certain period.

I think this would mean the scheduler would need some specific logic to match a task id to a state store id, right? Otherwise stage-level scheduling would schedule a task on anything in that list, which at that point makes the list not relevant if Spark knows how to do some sort of mapping.

Hmm, @Ngone51, is that true? For other resources like GPUs it makes sense, but in this case we need a specific (task id)-(resource id, e.g. state store id, PVC claim name, etc.) binding.

@Ngone51 (Member) commented May 25, 2021

So, does it mean I can remove the dynamic allocation check for our case without affecting classic stage-level scheduling?

Which check are you referring to?

This means that if an executor goes down, something has to wait for another executor to start up the task-specific state store - what is going to do that in this scenario? Or do you wait a certain period and schedule it anywhere?

I think this could be configurable. Users could configure it to schedule anywhere and load from checkpointed data, or alternatively just fail the streaming app, after a certain period.

Yes. Besides, in the Spark default use case, we can also move the state store resources to other active executors (ExecutorData). And tasks would just reconstruct them at the executor later.

I think this would mean the scheduler would need some specific logic to match a task id to a state store id, right? Otherwise stage-level scheduling would schedule a task on anything in that list, which at that point makes the list not relevant if Spark knows how to do some sort of mapping.

Hmm, @Ngone51, is that true? For other resources like GPUs it makes sense, but in this case we need a specific (task id)-(resource id, e.g. state store id, PVC claim name, etc.) binding.

That's true. We need the mapping. I thought that using the existing task info (e.g., partitionId) would be enough to match the right state store, but that looks wrong. We'd have to add extra info for the mapping.

which at that point makes the list not relevant if Spark knows how to do some sort of mapping.

@tgravescs yes, the target might be achieved directly with the mapping. But I think that would be the last choice, as the community wants to introduce fewer invasive changes when working across modules. That's the reason @viirya proposed the plugin APIs first, and why we're now discussing the possibility of reusing an existing feature - stage-level scheduling.
But if we still have to introduce many invasive changes (e.g., the mapping) even when reusing stage-level scheduling, I think we should revisit our decision.

It seems like this feature could exist outside of creating a new ResourceProfile with the stage-level scheduling APIs, and the user should be able to specify this option, which would only work with StateStoreRDD. Is it useful outside of that? I don't see how, unless we added another plugin point for the executor to report back an arbitrary resource and then came up with some API it could call to map a taskId to a resourceId reported back to it.

I think it's only used by StateStoreRDD so far. BTW, it seems it is not possible for end users to specify the resource request by themselves, as streaming uses the DataFrame API and StateStoreRDD is hidden behind it.

@viirya (Member Author) commented May 25, 2021

Which check are you referring to?

There is an assertion that dynamic allocation should be enabled under stage-level scheduling. I mean, if we remove that assertion, will it affect normal cases of stage-level scheduling?

@tgravescs yes, the target might be achieved directly with the mapping. But I think that would be the last choice, as the community wants to introduce fewer invasive changes when working across modules. That's the reason @viirya proposed the plugin APIs first, and why we're now discussing the possibility of reusing an existing feature - stage-level scheduling.
But if we still have to introduce many invasive changes (e.g., the mapping) even when reusing stage-level scheduling, I think we should revisit our decision.

That's correct, as this is one major point behind this API. It is proposed to stay as separate as possible so that it does not affect Spark outside of the SS code path. Personally I'd prefer a separate design. You may know more about how many invasive changes we would need to support the use case in stage-level scheduling. Let me know if you need to revisit the decision so I can follow up on the direction. I'm willing to continue with this API, or to turn to stage-level scheduling with some changes if you think that is better.

@Ngone51 (Member) commented Jun 3, 2021

(Sorry about the late reply.)

There is an assertion that dynamic allocation should be enabled under stage-level scheduling. I mean, if we remove that assertion, will it affect normal cases of stage-level scheduling?

Could you post the code snippet?

--

I'm thinking about how to establish the "mapping" with stage-level scheduling. My current idea is:
Add a new type of task location, e.g., StateStoreTaskLocation(host, executorId, StateStoreProviderId), and let BaseStateStoreRDD.getPreferredLocations return it as a string. Then, the TaskSetManager could establish the "mapping" while building the pending task list, probably using a map (from task index to its StateStoreProviderId) to store it.
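
A rough sketch of the idea (names and string encoding are illustrative assumptions from this discussion, not existing Spark classes) could look like this:

// Hypothetical sketch: a task location that also carries the state store provider the
// task is bound to. It would be encoded into the preferred-location string returned by
// getPreferredLocations and decoded again by the TaskSetManager while it builds the
// pending task lists, yielding a map from task index to StateStoreProviderId.
case class StateStoreTaskLocation(
    host: String,
    executorId: String,
    stateStoreProviderId: String) {
  override def toString: String =
    s"statestore_${host}_${executorId}_$stateStoreProviderId"
}

object StateStoreTaskLocation {
  // Parse the encoded preferred-location string back into its parts.
  def parse(loc: String): Option[StateStoreTaskLocation] = loc.split("_", 4) match {
    case Array("statestore", host, execId, providerId) =>
      Some(StateStoreTaskLocation(host, execId, providerId))
    case _ => None
  }
}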

@tgravescs @viirya WDYT?

@tgravescs (Contributor)

But if we still have to introduce many invasive changes (e.g., the mapping) even when reusing stage-level scheduling, I think we should revisit our decision.

I agree with this; it's a matter of coming up with the right design to solve this problem and possibly others (in the case of a plugin). If the alternatives we discuss become too complex, we should drop them. But we should have the discussion, like we are doing.

BTW, it seems it is not possible for end users to specify the resource request by themselves, as streaming uses the DataFrame API and StateStoreRDD is hidden behind it.

If the user can't specify it themselves with the stage-level API, are you saying Spark would do it internally for the user?

There is an assertion that dynamic allocation should be enabled under stage-level scheduling. I mean, if we remove that assertion, will it affect normal cases of stage-level scheduling?

We can relax the requirement if something like this is specified. If we were to allow new ResourceProfiles to fit into existing containers, the requirement would be relaxed for that case as well. We just need to make sure it's clear to the user, so jobs don't hang waiting on containers they will never get.

I'm thinking about how to establish the "mapping" with stage-level scheduling. My current idea is:
Add a new type of task location, e.g., StateStoreTaskLocation(host, executorId, StateStoreProviderId), and let BaseStateStoreRDD.getPreferredLocations return it as a string. Then, the TaskSetManager could establish the "mapping" while building the pending task list.

So essentially this is extending the locality feature, and then the only thing you would need in the stage-level scheduling API is the ability to say "use this new locality algorithm for this stage"?

@Ngone51 (Member) commented Jun 4, 2021

If the user can't specify it themselves with the stage-level API, are you saying Spark would do it internally for the user?

Yes. And we can add a new conf for users to control the behavior.

So essentially this is extending the locality feature, and then the only thing you would need in the stage-level scheduling API is the ability to say "use this new locality algorithm for this stage"?

Yes. Thinking about it a bit more: we probably don't even need the stage-level scheduling API. After knowing the "mapping", we can use it directly in resourcesMeetTaskRequirements. The "mapping" is actually a hard-coded task requirement, and using the stage-level scheduling API to specify that requirement looks redundant and unnecessary.
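
To illustrate the idea, here is a simplified sketch of such a check; the real resourcesMeetTaskRequirements in TaskSchedulerImpl has a different signature, and the maps below are hypothetical stand-ins for the "mapping" discussed here.

// Hypothetical, simplified sketch of the check: a task bound to a state store is only
// satisfied by an offer from an executor that currently hosts that state store; tasks
// without a binding can run anywhere.
def stateStoreRequirementMet(
    taskIndex: Int,
    taskToStateStore: Map[Int, String],            // task index -> state store provider id
    executorStateStores: Map[String, Set[String]], // executor id -> hosted provider ids
    offerExecutorId: String): Boolean = {
  taskToStateStore.get(taskIndex) match {
    case Some(providerId) =>
      executorStateStores.getOrElse(offerExecutorId, Set.empty).contains(providerId)
    case None => true
  }
}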

@viirya (Member Author) commented Jun 9, 2021

Could you post the code snippet?

E.g.,

ResourceProfileManager {
  private[spark] def isSupported(rp: ResourceProfile): Boolean = {
    ...
    val YarnOrK8sNotDynAllocAndNotDefaultProfile =
      isNotDefaultProfile && (isYarn || isK8s) && !dynamicEnabled  // <= Remove it?
    ...
  }
}

Add a new type of task location, e.g., StateStoreTaskLocation(host, executorId, StateStoreProviderId), and let BaseStateStoreRDD.getPreferredLocations return it as a string. Then, the TaskSetManager could establish the "mapping" while building the pending task list.

Isn't the mapping still executor id <-> state store? The executor id could change due to executor loss. A more robust mapping, e.g. for our use case, might be PVC id <-> state store.

I agree with this; it's a matter of coming up with the right design to solve this problem and possibly others (in the case of a plugin). If the alternatives we discuss become too complex, we should drop them. But we should have the discussion, like we are doing.

Yes, agreed. I appreciate the discussion.

@Ngone51 (Member) commented Jun 9, 2021

Isn't the mapping still executor id <-> state store? The executor id could change due to executor loss. A more robust mapping, e.g. for our use case, might be PVC id <-> state store.

The mapping between executor id and state store is definitely needed. And it can be achieved with the existing framework - ExecutorData.resourcesInfo - so it won't be a problem. But the "mapping" between tasks and the specific resources (the state store in this case) is a new requirement that we have to add.

As mentioned above, we probably don't even need the stage-level scheduling API if we follow the StateStoreTaskLocation solution. And then the isSupported check also won't be a problem.

@Ngone51 (Member) commented Jun 9, 2021

@tgravescs what's your opinion on the StateStoreTaskLocation proposal?

@viirya (Member Author) commented Jun 9, 2021

But the "mapping" between tasks and the specific resources (the state store in this case) is a new requirement that we have to add.

Hm? We don't need a mapping between task <-> state store. Do you mean PVC id <-> task (i.e. state store)?

@viirya (Member Author) commented Jun 9, 2021

Yes. Thinking about it a bit more: we probably don't even need the stage-level scheduling API. After knowing the "mapping", we can use it directly in resourcesMeetTaskRequirements. The "mapping" is actually a hard-coded task requirement, and using the stage-level scheduling API to specify that requirement looks redundant and unnecessary.

That makes sense. So let me rephrase it, and correct me if I misunderstand.

Basically, we introduce a new task location type, StateStoreTaskLocation. RDDs using a state store return this kind of task location as their preferred locations.

When TaskSetManager builds the pending task list, it could establish a mapping from the locations. The mapping could be between specific resources (e.g. PVC) and task (i.e. state store). resourcesMeetTaskRequirements then uses the mapping directly to schedule tasks.

@viirya (Member Author) commented Jun 10, 2021

BTW, StateStoreTaskLocation is not an accurate name for the purpose. Actually, we cannot schedule a task to the specific location of a state store. The task-state store relation is fixed at the beginning. Maybe ResourceLocation? It means the task prefers a location with a specific resource (e.g. PVC).

@Ngone51 (Member) commented Jun 10, 2021

The mapping could be between specific resources (e.g. PVC) and task (i.e. state store).

Your rephrasing looks good except for one point here: "task (i.e. state store)"? Do you mean a task is a kind of state store? Is it a typo? I actually expect it to be a mapping between PVC and task id.

Actually, we cannot schedule a task to the specific location of a state store.

I don't understand this. I assume each state store must be bound to a specific location. Why can't we schedule the task?

Maybe ResourceLocation? It means the task prefers a location with a specific resource (e.g. PVC).

ResourceLocation sounds too general. Maybe RequiredResourceLocation?

@viirya (Member Author) commented Jun 10, 2021

Your rephrasing looks good except for one point here: "task (i.e. state store)"? Do you mean a task is a kind of state store? Is it a typo? I actually expect it to be a mapping between PVC and task id.

A specific state store is bound to a task, e.g. task 0 is bound to state store 0. This cannot be changed. But where the task is scheduled can generally change; this is the current situation for the HDFS-backed state store. In other words, the task and its state store move together if Spark schedules the task to a different executor.

So the mapping between PVC and task id also implies a mapping between the PVC and the task's state store. That is why I added "i.e." there. Sorry for any confusion.

I don't understand this. I assume each state store must be bound to a specific location. Why can't we schedule the task?

For the current HDFS-backed state store, the location can change: it happens when Spark schedules the task with the state store to a new executor.

Once it is moved to a different executor, Spark reloads the checkpointed data from HDFS to reconstruct the state store on the new executor (location).

For the resource-specific case (e.g. PVC), the location is generally fixed, because it is bound to a specific resource on the executor. But in cases like executor loss where the resource is re-mountable, Spark can schedule the task with the state store to a new executor that has the resource re-mounted.

ResourceLocation sounds too general. Maybe RequiredResourceLocation?

RequiredResourceLocation sounds good to me.

@Ngone51 (Member) commented Jun 10, 2021

I see.

And to ensure we're on the same page - for the RequiredResourceLocation, how would you provide the PVC info there? IIUC, you want to put the PVC info there, right?

@viirya (Member Author) commented Jun 10, 2021

And to ensure we're on the same page - for the RequiredResourceLocation, how would you provide the PVC info there? IIUC, you want to put the PVC info there, right?

Yeah, I have not tried it yet as we are still in the discussion phase. But my idea is to retrieve the PVC info from the scheduler backend (K8s) when it retrieves executor info. I guess it doesn't return such info now.

So in the state store RDD, when preparing preferred locations, it queries the scheduler backend (if it is K8s) to get the PVC info and fills it into RequiredResourceLocation. RequiredResourceLocation should be general enough, so the resource requirement might be a map like resource -> resource id.

On the task scheduler side, while scheduling the task set, it looks at the required resources in the location to decide whether the task requirements are met.
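
A sketch of that flow, with every name here being an assumption from this discussion rather than an existing Spark API:

// Hypothetical sketch of the flow described above, not an existing Spark API.
// A preferred location that also carries required resources, e.g. "pvc" -> claim name.
case class RequiredResourceLocation(
    host: String,
    requiredResources: Map[String, String])

// Stand-in for the scheduler backend (e.g. K8s) reporting which PVC an executor has mounted.
trait BackendResourceInfo {
  def pvcForExecutor(executorId: String): Option[String]
}

// In the state store RDD's getPreferredLocations, the PVC bound to this partition's state
// store would be looked up and encoded as a required resource; the task scheduler would
// then only consider offers whose executor carries that resource.
def preferredLocationFor(
    host: String,
    executorId: String,
    backend: BackendResourceInfo): Seq[RequiredResourceLocation] = {
  backend.pvcForExecutor(executorId).toSeq.map { pvc =>
    RequiredResourceLocation(host, Map("pvc" -> pvc))
  }
}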

Sounds okay?

@Ngone51 (Member) commented Jun 11, 2021

sgtm.

@mridulm (Contributor) commented Jun 11, 2021

I am trying to catch up on this discussion, and it is a very long thread already :-) Thanks for all the discussion!
Can we update the doc, @viirya, given that there seems to be some consensus developing?
Based on that, I will revisit the comments to better understand the rationale for the various conclusions.

@tgravescs (Contributor)

Yeah, it would be great to have a summary. Also, please describe exactly the flow when an executor is lost and how everything is updated.

@viirya (Member Author) commented Jun 11, 2021

@mridulm @tgravescs Yeah, I will update the doc. Thanks for the discussion!

@viirya (Member Author) commented Jun 18, 2021

Note that I just updated the doc.

@viirya (Member Author) commented Jun 18, 2021

Let me close this as we have some conclusions.

@viirya closed this Jun 18, 2021
@tgravescs (Contributor)

Is the plan to continue the discussion in the doc and JIRA then?

@viirya (Member Author) commented Jun 18, 2021

Yeah, we can continue the discussion in the JIRA if needed.

@viirya (Member Author) commented Jun 18, 2021

Oh, we can continue the discussion here too if you prefer. I just think that since we are not going to review the code here, it may be good to close the PR.

@viirya deleted the SPARK-35022 branch December 27, 2023 18:25