[SPARK-30448][Core] accelerator aware scheduling enforce cores as limiting resource #27138
Conversation
Test build #116322 has finished for PR 27138 at commit
@@ -2818,12 +2825,28 @@ object SparkContext extends Logging {
      // multiple executor resources.
      val resourceNumSlots = Math.floor(execAmount * taskReq.numParts / taskReq.amount).toInt
      if (resourceNumSlots < numSlots) {
        if (shouldCheckExecCores) {
          throw new IllegalArgumentException("The number of slots on an executor has to be " +
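The slot computation in the diff above can be illustrated with a small standalone sketch. The object and method names here are hypothetical; only the formula (`execAmount * taskReq.numParts / taskReq.amount`, floored) follows the diff, where `numParts` accounts for fractional per-task resource requests:

```scala
// Hypothetical sketch of the per-resource slot computation shown in the
// diff above; not the actual SparkContext code.
object SlotCheckSketch {
  // Slots a single resource allows per executor. In the real code the
  // per-task requirement is taskReq.amount / taskReq.numParts, so the
  // slot count is execAmount * numParts / amount, floored.
  def resourceNumSlots(execAmount: Long, taskAmount: Long, numParts: Int): Int =
    math.floor(execAmount.toDouble * numParts / taskAmount).toInt

  def main(args: Array[String]): Unit = {
    // 4 GPUs per executor, 1 GPU per task -> 4 slots
    assert(resourceNumSlots(4, 1, 1) == 4)
    // 4 GPUs per executor, 2 GPUs per task -> 2 slots
    assert(resourceNumSlots(4, 2, 1) == 2)
    // Fractional request: amount 1 split into 2 parts = 0.5 GPU per task -> 8 slots
    assert(resourceNumSlots(4, 1, 2) == 8)
    println("ok")
  }
}
```

If this per-resource slot count is lower than the cpu-based slot count, the resource (not cores) is limiting, which is the condition the exception guards against.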
only throw this exception when dynamic allocation is enabled?
I think it's safer to always require it, just in case there are other places in the code that use cores and task cpus to determine slots. I know in doing the stage-level scheduling work there were a bunch of places that did this, but I would have to go back through to see if they only apply during dynamic allocation.
Actually, one example of this is #27126
sounds good
@@ -911,7 +927,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventually {
      """{"name": "gpu","addresses":["0", "1", "2", "3", "4", "5", "6", "7", "8"]}""")

      val conf = new SparkConf()
-       .setMaster("local-cluster[3, 3, 1024]")
+       .setMaster("local-cluster[3, 1, 1024]")
IIUC we require 3 cores on each executor in this case.
The test passes with it like this. The requirement looks like it's 3 executors, not 3 tasks per executor, and it waits for all executors to be up.
ah yes
LGTM
Test build #116323 has finished for PR 27138 at commit
retest this please
Test build #116327 has finished for PR 27138 at commit
merged this, thanks @jiangxb1987
What changes were proposed in this pull request?
This PR makes sure cores is the limiting resource when using accelerator-aware scheduling, and fixes a few issues with SparkContext.checkResourcesPerTask.
For the first version of accelerator-aware scheduling (SPARK-27495), the SPIP had a condition that we could support dynamic allocation because we were going to have a strict requirement that we don't waste any resources. This meant that the number of slots each executor has could be calculated from the number of cores and task cpus, just as is done today.
Somewhere along the line of development we relaxed that and now only warn when we are wasting resources. This breaks the dynamic allocation logic if the limiting resource is no longer cores, because dynamic allocation uses the cores and task cpus to calculate the number of executors it needs. This means we will request fewer executors than we really need to run everything. We have to enforce that cores is always the limiting resource, so we should throw if it's not.
The only issue with enforcing this is on cluster managers (standalone and Mesos coarse-grained) where we don't know the executor cores up front by default: the spark.executor.cores config defaults to 1, but when the executor is started it gets all the cores of the Worker by default. So we have to add logic specifically to handle that case, and since we can't enforce the requirement there, we can only warn when dynamic allocation is enabled for those.
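As a hedged illustration of the dynamic-allocation problem described above (the object, helper names, and numbers are made up for this sketch; only the idea that executor requests are derived from cpu slots comes from the description):

```scala
// Hypothetical sketch: dynamic allocation estimates needed executors from
// cpu slots only, so if another resource (e.g. GPUs) is the real limit,
// it under-requests executors.
object LimitingResourceSketch {
  def cpuSlotsPerExecutor(executorCores: Int, taskCpus: Int): Int =
    executorCores / taskCpus

  def gpuSlotsPerExecutor(executorGpus: Int, taskGpus: Int): Int =
    executorGpus / taskGpus

  def main(args: Array[String]): Unit = {
    // 4 cores with 1 cpu per task -> 4 cpu slots per executor,
    // but only 1 GPU with 1 GPU per task -> 1 real slot per executor.
    val cpuSlots = cpuSlotsPerExecutor(4, 1)
    val gpuSlots = gpuSlotsPerExecutor(1, 1)
    val pendingTasks = 8

    // Dynamic allocation sizes the request from cpu slots...
    val requested = math.ceil(pendingTasks.toDouble / cpuSlots).toInt
    // ...but the GPU limit determines how many executors are really needed.
    val needed = math.ceil(pendingTasks.toDouble / gpuSlots).toInt

    assert(requested == 2) // under-request: only 2 executors asked for
    assert(needed == 8)    // 8 executors actually needed to run all tasks
    println("ok")
  }
}
```

Enforcing that cores is the limiting resource makes the cpu-slot estimate equal to the true slot count, so the two numbers above can never diverge.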
Why are the changes needed?
Bug in dynamic allocation if cores is not the limiting resource, and the warnings were not correct.
Does this PR introduce any user-facing change?
no
How was this patch tested?
Unit tests added, and manually tested the conditions in local mode, local-cluster mode, standalone mode, and YARN.