[SPARK-39853][CORE] Support stage level task resource profile for standalone cluster when dynamic allocation disabled #37268
Conversation
Can one of the admins verify this patch?
cc @Ngone51 , could you please review this PR? Thanks.
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
cc @Ngone51 , code refactored (add a new
/**
 * Target executor's resource profile id, used for schedule.
 */
override def targetExecutorRpId: Int = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
I think we should override `_id` instead so that `ResourceProfile.getNextProfileId` doesn't increase for task resource profiles.
I am not sure about this, since `TaskResourceProfile` is also a special `ResourceProfile` and `_id` is used to identify different `ResourceProfile`s.
@@ -388,14 +388,16 @@ private[spark] class TaskSchedulerImpl(
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    val taskSetRpID = taskSet.taskSet.resourceProfileId
    val prof = sc.resourceProfileManager.resourceProfileFromId(taskSetRpID)
    val targetExecutorRpID = prof.targetExecutorRpId
I think once we do https://github.com/apache/spark/pull/37268/files#r934220131, we'd no longer need this.
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
We should also update
or we could also extend
Can we please add more of a high level design/overview here?
Why is this only being added for standalone mode?
Also I assume the intent here is to reuse executors since there's no dynamic allocation - please see #33941
/**
 * Target executor's resource profile id, used for schedule.
 */
def targetExecutorRpId: Int = id
I don't understand what this is and how it is different from the id. This needs more explanation.
`id` is an identifier for a `ResourceProfile`; `targetExecutorRpId` describes which executors `TaskScheduler` shall assign tasks to. Before this PR, `TaskScheduler` did exact id matching between the task rp id and the executor rp id, so `id` was enough.
But if we want to share/reuse executors, `TaskScheduler` will need to schedule tasks to different/compatible executors, so the task's RP id and the executor's RP id may not be the same in that case. The `targetExecutorRpId` here tries to describe which executors to assign tasks with this `ResourceProfile` to; it could be different from `id`.
Pretty much like a compatible resource profile id in concept.
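The distinction between `id` and `targetExecutorRpId` can be sketched with a toy model (illustrative class names only, not Spark's actual `ResourceProfile` hierarchy):

```scala
// Toy model of the proposed API (illustrative names, not Spark's real classes).
object DefaultIds {
  val DEFAULT_RESOURCE_PROFILE_ID = 0
}

class ResourceProfileModel(val id: Int) {
  // By default, tasks of a profile are only scheduled on executors
  // that were created with exactly the same profile id.
  def targetExecutorRpId: Int = id
}

class TaskResourceProfileModel(id: Int) extends ResourceProfileModel(id) {
  // Task-only profiles keep their unique id but target executors
  // launched with the default resource profile.
  override def targetExecutorRpId: Int = DefaultIds.DEFAULT_RESOURCE_PROFILE_ID
}
```

Here a task-only profile keeps a unique `id` for identification while pointing scheduling at default-profile executors.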
This assumes the resource profile has to know there is a TaskResourceProfile. This also seems to limit you to one compatible executor resource profile? Why can't it match any number of compatible executors? I guess with the current implementation you limit it to dynamic allocation off, so that is why you can, but I don't want to make API changes based on that limit.
I don't think we have an actual task RP id and executor RP id for now; they're all still resource profile ids.
Would overriding `_id` as `_id = DEFAULT_RESOURCE_PROFILE_ID` in `TaskResourceProfile` help get rid of this API?
(In this way, the task is still limited to one executor resource profile. But it doesn't mean dynamic allocation has to be turned off, right?)
If we really want to achieve "match on any number of compatible executors" (which I think is a good direction), I think we need to completely separate the resource profile id into a task resource requirement id and an executor resource requirement id.
While that sounds like an interesting workaround, the id is public, so if I create a task resource profile that has the same id as another one, that seems odd, right? Also I think that would break the UI, or at least make it funny without other changes.
What if we pass `ResourceProfile.taskResources` to `TaskSet` and pass `ResourceProfile.executorResources` to `WorkerOffer` directly? It seems like we would no longer depend on the resource profile id for scheduling this way. cc @ivoson @tgravescs
Thanks @Ngone51
I think that's pretty much the idea of sharing/reusing executors with the policy: share any executors that can fulfill the task's resource requests. The problem may be whether we want to let users specify the reuse policy.
How about we narrow down the scenario in this PR: schedule tasks with `TaskResourceProfile` to executors with `DEFAULT_RESOURCE_PROFILE` directly when dynamic allocation is off (without checking the rp id)?
Even though the idea is pretty much like reusing compatible executors, it's much simpler in this case. And we can still leave SPARK-36699 to handle the API change; we don't introduce the API change in this PR.
What do you think? @tgravescs @Ngone51
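The narrowed rule proposed here can be sketched as a small self-contained predicate (hypothetical names, not the actual `TaskSchedulerImpl` code):

```scala
// Hypothetical sketch of the narrowed matching rule (illustrative names only).
sealed trait RpModel { def id: Int }
case class NormalRp(id: Int) extends RpModel
case class TaskRp(id: Int) extends RpModel

val DefaultRpId = 0

def canBeScheduled(taskSetRp: RpModel, executorRpId: Int,
    dynamicAllocationEnabled: Boolean): Boolean = taskSetRp match {
  // Narrowed case: task-only profiles ride on default-profile executors
  // when dynamic allocation is off, regardless of their own rp id.
  case TaskRp(_) if !dynamicAllocationEnabled => executorRpId == DefaultRpId
  // Otherwise, fall back to exact resource-profile-id matching.
  case _ => taskSetRp.id == executorRpId
}
```

This keeps exact-id matching as the general rule and only special-cases task-only profiles when dynamic allocation is off.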
I'm fine with narrowing the case here, but I don't want the public API affected. So if you just mean hardcoding it in the scheduler or something, that might be fine. We still need to make sure that if a user passes TaskResourceProfile, we error out if it's not this specific case.
Yes, we can limit the use case of `TaskResourceProfile`. I'll make the change and update the PR soon. Thanks.
Even though the idea is pretty much like reusing compatible executors, it's much simpler in this case. And we can still leave SPARK-36699 to handle the API change; we don't introduce the API change in this PR.
SGTM! Thanks @ivoson
Hi @tgravescs , thanks for your feedback. You are right, the idea here is to reuse executors with
This PR introduces a new special
so I would like to see the issue or this PR description have much more details about design, API, and its behavior.
This is not true, this is very much a user-impacting API. Also, the issue I linked to talked about the policy for reusing executors. What is the proposal for that here? Docs will need to be updated to explain the new API and behavior.
Force-pushed b97e384 to 835638f
Thanks @tgravescs , will update the doc and then we can discuss.
Updated the PR's description with more details, please take a look and share your thoughts. cc @tgravescs @Ngone51 Proposed new API changes in the description based on previous comments; will work on the code change after we come to an agreement. Thanks.
thanks for adding more details.
It seems a little bit odd to ask the ResourceProfile to give you compatible other ResourceProfiles. This feels like it should be in the ResourceProfileManager, which knows about all the ResourceProfiles. I guess that is why you pass in the ResourceProfileManager here? Is the intention that the user could explicitly set which ResourceProfiles it's compatible with? If so, I definitely would want a way to not have to specify it. The other issue raised that wasn't addressed was the reuse policy. I guess in this case we are limiting the executor profile to 1 because we don't have dynamic allocation, so one could argue that if you use a task resource request with that, you know what you get. Which I am fine with, but we need to be clear that it might very well waste resources. Also, if the intent is to not support TaskResourceProfile with dynamic allocation, I think we should throw an exception if anyone uses it with the dynamic allocation config on.
Yes, exactly. I put the
As mentioned above, in this case we will only have
Thanks @tgravescs for your feedback. Does this behavior change make sense?
Yeah, so to me I think it makes more sense to put it into the ResourceProfileManager API to say "find me compatible ones", but at the same time, now that I said it, we may want the user to be able to specify something via the ResourceProfile. That is where the reuse policy could be specified, for instance, or if the user really wanted to limit it to exactly one executor resource profile, that might be nice. I need to think about it a bit more.
 * normal [[ResourceProfile]]
 * @return This ResourceProfileBuilder
 */
def taskOnly(isTaskResourceProfile: Boolean = true): this.type = {
Why do we still need `isTaskResourceProfile` since the function is already named `taskOnly()`?
@tgravescs This introduces the new API but doesn't affect the existing ones. Are you good with it?
Why do we still need `isTaskResourceProfile` since the function is already named `taskOnly()`?

@Ngone51 Just wanted to provide a method so the user can override the property `isTaskResourceProfile` to reuse the resource profile builder to create a normal `ResourceProfile`, since there are existing methods to override/clean up resource requests: `clearExecutorResourceRequests`.
Do you think it is necessary? Or any other suggestions?
I don't think it's necessary. To me, the name `taskOnly()` implies `isTaskResourceProfile=true`. So setting `isTaskResourceProfile=false` is very weird.
Thanks. Removed the parameter.
If the user hasn't specified executor requirements it can just be task-only requirements.
Sounds good to me. And we can make `TaskResourceProfile` internal for now.
With dynamic allocation enabled, the user can still create a ResourceProfile without specifying executor requirements, and request new executors for this rp.
Is the cluster manager able to launch the new executor if there are no executor requirements (e.g., no memory, no CPU) specified? @ivoson
In this PR, we add a new TaskResourceProfile, which is limited to some scenarios (e.g. dynamic allocation off) and will reuse executors. So we may want the user to explicitly specify that it's a TaskResourceProfile to avoid misunderstanding.

Why does the user have to explicitly set it? Isn't it implicit if they don't specify executor resources and call build()? I guess you could argue you could perhaps give them a better error message if that isn't really what they intended, but that likely still becomes an issue with dynamic allocation support. I'm fine with TaskResourceProfile being public as a shortcut after building, especially if it makes the scheduler code cleaner to check the instance type.
The only difference I can see between this and dynamic allocation is that with dynamic allocation, if you don't specify the executor resources, it defaults to using the initial resources, but it gets a new executor based on the current implementation behavior. If we support reusing executors with dynamic allocation, that restriction goes away and I would expect it to find any executor that would fit - or, like the linked issue, we would specify some sort of reuse policy where the user could indicate which type of executors to reuse. Is there some other use case you had in mind?
Please don't force push the code, it makes reviewing what changed much harder.
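The "implicit" behavior suggested above (build a task-only profile whenever no executor requests were set) can be modeled with a toy builder. Names are hypothetical, not Spark's actual `ResourceProfileBuilder`:

```scala
// Toy builder modeling the implicit task-only behavior (illustrative names).
class ProfileM(val taskCpus: Option[Int], val executorCores: Option[Int])
class TaskOnlyProfileM(taskCpus: Option[Int]) extends ProfileM(taskCpus, None)

class BuilderM {
  private var taskCpus: Option[Int] = None
  private var executorCores: Option[Int] = None
  def cpusPerTask(n: Int): this.type = { taskCpus = Some(n); this }
  def executorCoresReq(n: Int): this.type = { executorCores = Some(n); this }
  // build() yields a task-only profile when no executor requests were given,
  // so the user never has to set an explicit flag.
  def build(): ProfileM =
    if (executorCores.isEmpty) new TaskOnlyProfileM(taskCpus)
    else new ProfileM(taskCpus, executorCores)
}
```

The scheduler could then check the instance type rather than a user-set flag.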
Please don't force push the code, it makes reviewing what changed much harder.

Thanks. Will avoid force pushes in the future.

but that likely still becomes an issue with dynamic allocation support.

Yes, I agree. My previous concern was that we would limit `TaskResourceProfile` to dynamic allocation off; an exception would be thrown if a user builds a `TaskResourceProfile` when dynamic allocation is enabled. This would introduce a behavior change.
If we don't have that limit, then we need to think about the policy for scheduling tasks with `TaskResourceProfile` when dynamic allocation is enabled.
cc @Ngone51 @tgravescs What do you think? Shall we support dynamic allocation as well here?
Co-authored-by: wuyi <yi.wu@databricks.com>
…scala Co-authored-by: wuyi <yi.wu@databricks.com>
…scala Co-authored-by: wuyi <yi.wu@databricks.com>
Gentle ping @tgravescs @Ngone51 . Could you please help review this PR when you have time? Thanks.
Sorry, might be next Monday before I get a chance.
Mostly looks good. We need to update the docs like:
https://github.com/apache/spark/blob/master/docs/configuration.md#stage-level-scheduling-overview
It says "the current implementation acquires new executors for each ResourceProfile created"
Also should update https://github.com/apache/spark/blob/master/docs/spark-standalone.md#stage-level-scheduling-overview
Many thanks @tgravescs . The last commit updated the docs. Please help review. Thanks. @tgravescs @Ngone51
// exception or in a real application. Otherwise in all other testing scenarios we want
// to skip throwing the exception so that we can test in other modes to make testing easier.
if ((notRunningUnitTests || testExceptionThrown) &&
if (rp.isInstanceOf[TaskResourceProfile] && !dynamicEnabled) {
Does it mean `TaskResourceProfile` can be used when dynamic allocation is enabled? And in that case, `TaskResourceProfile` seems to never meet the requirement (i.e. `taskRpId == executorRpId`) in `canBeScheduled()` (except for the first created `TaskResourceProfile`), right?
Hey @Ngone51, thanks for the feedback. For your concerns:

- `TaskResourceProfile` can be used when dynamic allocation is enabled.
- When dynamic allocation is enabled, `TaskResourceProfile` will be treated as a normal `ResourceProfile` with no specific executor resource requirements, and the dynamic allocation manager will also request executors for the `TaskResourceProfile`, so we'll have executors matching the same resource profile id. The behavior will be the same as what we have in the master branch.
Yes, it can be used with dynamic allocation; in that case it uses the default resource profile's executor resources, but it must acquire new executors. The TaskResourceProfile gets a unique rp id just like a standard resource profile, and it should go through the same path to get executors via dynamic allocation like a normal ResourceProfile (i.e. stage submitted kicks off). Is there something I'm not thinking about here?
...default resource profile executor resources but it must acquire new executors.
So it should be the default resource profile executor resources but not the default rp id? Then, it makes sense to me.
Yes, when dynamic allocation is enabled, it is just like a normal resource profile with a unique id, requesting executors based on default executor resources requirement.
 * when dynamic allocation is disabled, tasks will be scheduled to executors with default resource
 * profile based on task resources described by this task resource profile.
 * And when dynamic allocation is enabled, will require new executors for this profile base on
 * default build-in executor resources and assign tasks by resource profile id.
should say something like: based on the default executor resources requested at startup and assign tasks only on executors created with this resource profile.
Thanks, done.
 * with executorRpId.
 *
 * Here are the rules:
 * 1. Tasks with [[TaskResourceProfile]] can be scheduled to executors with
This makes it sound like that is the only time TaskResourceProfile is used; perhaps say "dynamic allocation disabled and tasks with TaskResourceProfile...", or possibly add another rule covering dynamic allocation enabled with TaskResourceProfile.
Makes sense. Changed the wording in the latest commit.
Co-authored-by: Thomas Graves <tgraves@apache.org>
@tgravescs do you have any other concerns?
Thanks, merged to master!
Thanks @tgravescs and @Ngone51
…n cluster when dynamic allocation disabled

### What changes were proposed in this pull request?
This PR is a follow-up of #37268 which supports stage level task resource profile for standalone cluster when dynamic allocation disabled. This PR enables stage-level task resource profile for yarn cluster.

### Why are the changes needed?
Users who work on spark ML/DL cases running on Yarn would expect the stage-level task resource profile feature.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
The current tests of #37268 can also cover this PR since both yarn and standalone cluster share the same TaskSchedulerImpl class which implements this feature. Apart from that, modified the existing test to cover yarn cluster. I also performed some manual tests which have been updated in the comments.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43030 from wbo4958/yarn-task-resoure-profile.

Authored-by: Bobby Wang <wbo4958@gmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
… cluster when dynamic allocation disabled

### What changes were proposed in this pull request?
This PR is a follow-up of #37268 which supports stage-level task resource profile for standalone cluster when dynamic allocation is disabled. This PR enables stage-level task resource profile for the Kubernetes cluster.

### Why are the changes needed?
Users who work on spark ML/DL cases running on Kubernetes would expect the stage-level task resource profile feature.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
The current tests of #37268 can also cover this PR since both Kubernetes and standalone cluster share the same TaskSchedulerImpl class which implements this feature. Apart from that, modified the existing test to cover the Kubernetes cluster. I also performed some manual tests which have been updated in the comments.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43323 from wbo4958/k8s-stage-level.

Authored-by: Bobby Wang <wbo4958@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
(cherry picked from commit 632eabd)
Signed-off-by: Thomas Graves <tgraves@apache.org>
What changes were proposed in this pull request?
Currently stage level scheduling works for yarn/k8s/standalone cluster when dynamic allocation is enabled, and spark app will acquire executors with different resource profiles and assign tasks to executors with the same resource profile id.
This PR proposes to add stage level scheduling when dynamic allocation is off. In this case, the spark app will only have executors with the default resource profile, but different `Stages` can still customize their task resource requests, which should be compatible with the default resource profile's executor resources. All these `Stages` with different task resource requests will reuse/share the same set of executors with the default resource profile.

This PR proposes to:

1. Add a new class of `ResourceProfile`: `TaskResourceProfile`, which can be used to describe different task resource requests when dynamic allocation is off. Tasks bound to this `TaskResourceProfile` will reuse executors with the default resource profile. An `Exception` should be thrown if executors with the default resource profile cannot fulfill the task resource requests.
2. `DAGScheduler` and `TaskScheduler` will schedule tasks with a customized `ResourceProfile` based on resource profile type and resource profile id; taskSets with `TaskResourceProfile` can be scheduled to executors with `DEFAULT_RESOURCE_PROFILE_ID`, and other taskSets can be scheduled to executors with exactly the same resource profile id.

Why are the changes needed?
When dynamic allocation is disabled, we can also leverage stage level scheduling to customize task resource requests for different stages.

Does this PR introduce any user-facing change?
Spark users can specify `TaskResourceProfile` to customize task resource requests for different stages when dynamic allocation is off.

How was this patch tested?
New UTs added.
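The validation rule described above (throw when default executors cannot fulfill a task-only profile's requests) can be sketched with toy types; these names are illustrative, not Spark's actual implementation:

```scala
// Illustrative fulfillment check: when dynamic allocation is off, a
// task-only profile must fit within the default executors' resources,
// otherwise its tasks could never be scheduled anywhere.
case class ExecutorResources(cores: Int, memoryMb: Long)
case class TaskRequests(cpus: Int)

def validateTaskProfile(defaultExec: ExecutorResources, req: TaskRequests): Unit = {
  // Mirrors the rule that an Exception is thrown on unsatisfiable requests.
  require(req.cpus <= defaultExec.cores,
    s"Task needs ${req.cpus} cpus but default executors only have ${defaultExec.cores} cores")
}
```

A real check would also cover custom task resources such as GPUs, not just CPU counts.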