
[SPARK-21225][CORE] Considering CPUS_PER_TASK when allocating task slots for each WorkerOffer #18435

Closed
wants to merge 1 commit into apache:master from JackYangzg:motifyTaskCoreDisp

Conversation

@JackYangzg commented Jun 27, 2017

JIRA Issue: https://issues.apache.org/jira/browse/SPARK-21225
In the function "resourceOffers", a variable "tasks" is declared to store the tasks that have been allocated an executor. It is declared like this:
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
But this code only considers the situation of one task per core. If the user sets "spark.task.cpus" to 2 or 3, it does not need that much memory. It can be modified as follows:
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
Another benefit of this change is that it makes it easier to understand how tasks are allocated to offers.

@@ -345,7 +345,7 @@ private[spark] class TaskSchedulerImpl(

     val shuffledOffers = shuffleOffers(filteredOffers)
     // Build a list of tasks to assign to each worker.
-    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
+    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](Math.ceil(o.cores * 1.0 / CPUS_PER_TASK).toInt))

Member

Please use math.ceil(o.cores.toDouble / CPUS_PER_TASK).toInt
How much difference can this make? You're saving at most about 8 bytes × the number of cores per offer.

Author


I have modified it to use math.ceil(o.cores.toDouble / CPUS_PER_TASK).toInt,
and each offer can save o.cores * (1 - 1/CPUS_PER_TASK) * length(TaskDescription) bytes.

@JackYangzg force-pushed the motifyTaskCoreDisp branch 2 times, most recently from be633c6 to 0074c3b on June 27, 2017 12:39
@JackYangzg
Author

I have modified it to use math.ceil(o.cores.toDouble / CPUS_PER_TASK).toInt,
and each offer can save o.cores * (1 - 1/CPUS_PER_TASK) * length(TaskDescription) bytes.

@srowen
Member

srowen commented Jun 27, 2017

It doesn't save a TaskDescription's size; it saves the size of a reference only. That's why I'm not sure this is worthwhile, but at the same time, it doesn't really hurt.
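
To illustrate the point, a minimal sketch (TaskDesc is a hypothetical stand-in for Spark's TaskDescription): ArrayBuffer's initialSize argument preallocates a backing array of object references, roughly 8 bytes each on a typical 64-bit JVM, not the objects themselves.

```scala
import scala.collection.mutable.ArrayBuffer

object InitialSizeSketch {
  // Hypothetical stand-in for Spark's TaskDescription.
  final case class TaskDesc(id: Long)

  def main(args: Array[String]): Unit = {
    val cores = 8
    val cpusPerTask = 4

    // Old sizing: preallocates `cores` reference slots up front.
    val oversized = new ArrayBuffer[TaskDesc](cores)
    // New sizing: at most cores / cpusPerTask tasks can be launched per offer.
    val rightSized = new ArrayBuffer[TaskDesc](cores / cpusPerTask)

    // Both buffers start empty; only the preallocated capacity differs,
    // i.e. (cores - cores / cpusPerTask) = 6 unused reference slots, ~8 bytes each.
    println(s"oversized: ${oversized.length}, rightSized: ${rightSized.length}")
  }
}
```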

@JackYangzg
Author

Yes, it only saves a reference.
I have read the related code, and it only affects this code:

    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task  // the only place tasks(i) is appended to
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToRunningTaskIds(execId).add(tid)
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } // catch block elided in this excerpt
    }

Obviously, it's safe.

@jerryshao
Contributor

jerryshao commented Jun 28, 2017

From my understanding, this looks like a bug: we didn't consider the CPUS_PER_TASK configuration. Rather than saving memory, I think this PR is more about fixing a bug. As for saving memory, yes it does, but I don't think performance will be affected much.

@jerryshao
Contributor

Besides, is it enough to use o.cores / CPUS_PER_TASK? I'm not sure why we need to use ceil. For example, if we have 10 cores in a worker offer and CPUS_PER_TASK is 3, then 3 slots should be enough. Please correct me if I'm wrong.
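
A minimal sketch of the arithmetic in this example (the cores and CPUS_PER_TASK values are taken from the comment above):

```scala
object SlotSizing {
  def main(args: Array[String]): Unit = {
    val cores = 10
    val cpusPerTask = 3

    // Integer division floors: 10 / 3 = 3 slots.
    val floorSlots = cores / cpusPerTask
    // Ceiling rounds up: ceil(10 / 3.0) = 4 slots.
    val ceilSlots = math.ceil(cores.toDouble / cpusPerTask).toInt

    // The scheduler only launches a task while availableCpus(i) >= CPUS_PER_TASK,
    // so the leftover core (10 - 3 * 3 = 1) can never host a task; the fourth
    // slot from ceil would always stay empty.
    println(s"floor = $floorSlots, ceil = $ceilSlots")
  }
}
```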

@JackYangzg
Author

@jerryshao I agree with your point; it is enough to use o.cores / CPUS_PER_TASK instead. As in the example you list, the one leftover core is not enough to allocate to a task.
In fact, writing it as
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
makes it easy to understand how offers are allocated to tasks.
@srowen

@jerryshao
Contributor

Can you please change your PR title and description to reflect the real issue here?

@JackYangzg changed the title [SPARK-21225][CORE] decrease the Mem using for variable 'tasks' in fu… [SPARK-21225][CORE] make it easy understand for offering resources for tasks and saving Mem Jun 28, 2017
@JackYangzg
Author

@jerryshao Thank you, I have changed it.

@SparkQA

SparkQA commented Jun 28, 2017

Test build #3816 has finished for PR 18435 at commit b745dab.

  • This patch fails due to an unknown error code, -10.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JackYangzg
Author

JackYangzg commented Jun 29, 2017

From the Jenkins output, the exception is caused by the loss of the file "target/unit-tests.log". I think it was not caused by this PR, and all the tests look like they passed. Can you restart the test for me? Thank you @srowen

@jerryshao
Contributor

@jiangxb1987 @cloud-fan Can you please review this JIRA? The changes should be safe from my understanding.

Also, can we change the title to: Considering CPUS_PER_TASK when allocating task slots for each WorkerOffer. I think that reflects the issue here more clearly.

@JackYangzg changed the title [SPARK-21225][CORE] make it easy understand for offering resources for tasks and saving Mem [SPARK-21225][CORE] Considering CPUS_PER_TASK when allocating task slots for each WorkerOffer Jun 29, 2017
@JackYangzg
Author

@jerryshao Ok

@srowen
Member

srowen commented Jun 29, 2017

This is only changing the default size of a collection, so I think it's safe to merge.
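
A minimal sketch of why only the default size is at stake: the constructor argument is just a capacity hint, and appending past it simply grows the backing array, so even a too-small hint cannot affect correctness.

```scala
import scala.collection.mutable.ArrayBuffer

object CapacityHintSketch {
  def main(args: Array[String]): Unit = {
    // Deliberately small capacity hint.
    val buf = new ArrayBuffer[Int](2)

    // Appending past the hint triggers a resize, not an error.
    (1 to 10).foreach(buf += _)

    assert(buf.length == 10) // correctness is unaffected by the hint
    println(buf.mkString(", "))
  }
}
```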

@SparkQA

SparkQA commented Jun 29, 2017

Test build #3819 has finished for PR 18435 at commit b745dab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

LGTM, merging to master!

@asfgit closed this in 29bd251 Jun 29, 2017
robert3005 pushed a commit to palantir/spark that referenced this pull request Jun 29, 2017
[SPARK-21225][CORE] Considering CPUS_PER_TASK when allocating task slots for each WorkerOffer


Author: 杨治国10192065 <yang.zhiguo@zte.com.cn>

Closes apache#18435 from JackYangzg/motifyTaskCoreDisp.
@JackYangzg deleted the motifyTaskCoreDisp branch June 30, 2017 03:11