
[SPARK-22683][CORE] Add a executorAllocationRatio parameter to throttle the parallelism of the dynamic allocation #19881

Closed · wants to merge 9 commits

Conversation

jcuquemelle (Contributor) commented Dec 4, 2017

What changes were proposed in this pull request?

By default, dynamic allocation requests enough executors to maximize
parallelism according to the number of tasks to process. While this minimizes job
latency, with small tasks this can waste a lot of resources due to
executor allocation overhead, as some executors might not even do any work.
This setting allows specifying a ratio that reduces the number of
target executors w.r.t. full parallelism.

The number of executors computed with this setting is still bounded by
spark.dynamicAllocation.maxExecutors and spark.dynamicAllocation.minExecutors.
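
For reference, a minimal sketch of the computation being described, with the bounds made explicit (the function name and parameter list are illustrative, not the exact Spark code; in the actual code the min/max bounds are enforced when executors are requested, not when the target is computed):

    import scala.math.{ceil, max, min}

    // Illustrative sketch: how a ratio throttles the dynamic-allocation target.
    // Parameter names mirror the Spark settings they stand in for.
    def targetExecutors(
        pendingAndRunningTasks: Int,
        executorCores: Int,   // spark.executor.cores
        taskCpus: Int,        // spark.task.cpus
        ratio: Double,        // the proposed ratio, in (0, 1.0]
        minExecutors: Int,    // spark.dynamicAllocation.minExecutors
        maxExecutors: Int     // spark.dynamicAllocation.maxExecutors
    ): Int = {
      val slotsPerExecutor = executorCores / taskCpus // task slots per executor
      val fullParallelism = ceil(pendingAndRunningTasks * ratio / slotsPerExecutor).toInt
      max(minExecutors, min(maxExecutors, fullParallelism))
    }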

How was this patch tested?

Unit tests, plus runs on various actual workloads on a YARN cluster.

srowen (Member) commented Dec 4, 2017

Please see JIRA. I don't think this is worth doing.

conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)

private val tasksPerExecutorSlot = conf.getInt("spark.dynamicAllocation.tasksPerExecutorSlot", 1)
Contributor:

I think we should change the name of this config because Spark doesn't have the concept of slots, and it could be confusing to users who might expect exactly x tasks to be processed on each executor. I am thinking more along the lines of spark.dynamicAllocation.maxExecutorsPerStageDivisor = (max # of executors based on the # of tasks required for that stage) divided by this number. I'm open to other config names here, though.

I think we would also need to define its interaction with spark.dynamicAllocation.maxExecutors, as well as how it behaves as the number of running and yet-to-run tasks changes.

conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)

private val tasksPerExecutorSlot = conf.getInt("spark.dynamicAllocation.tasksPerExecutorSlot", 1)

private val tasksPerExecutor = tasksPerExecutorSlot * taskSlotPerExecutor
Contributor:

Since we aren't using the concept of slots, I think we should leave tasksPerExecutor alone and put this functionality into maxNumExecutorsNeeded().
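
A rough sketch of that suggestion, assuming the surrounding ExecutorAllocationManager members (listener, tasksPerExecutor) and a hypothetical divisor value standing in for whatever the config ends up being named:

    // Hypothetical sketch only: apply the throttle inside maxNumExecutorsNeeded()
    // instead of redefining tasksPerExecutor.
    private def maxNumExecutorsNeeded(): Int = {
      val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
      math.ceil(numRunningOrPendingTasks.toDouble / (tasksPerExecutor * divisor)).toInt
    }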

tgravescs (Contributor):

ping @jcuquemelle, can you update this?

jcuquemelle (Contributor Author):

Sorry, I didn't see the ping, I will have a look shortly.

jcuquemelle force-pushed the AddTaskPerExecutorSlot branch 2 times, most recently from 2abd46f to 56c3f43 on March 12, 2018 09:30
jcuquemelle (Contributor Author):

The new semantics (throttling w.r.t. the max possible parallelism) is actually simpler to understand. I'm proposing another name that doesn't have any ambiguity with the existing maxExecutors param, but I'm open to any other name proposal.

tgravescs (Contributor):

jenkins, test this please

SparkQA commented Mar 12, 2018

Test build #88180 has finished for PR 19881 at commit 56c3f43.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

tgravescs (Contributor):

@jcuquemelle please fix the style

<td>
By default, the dynamic allocation will request enough executors to maximize the
parallelism according to the number of tasks to process. While this minimizes the
latency of the job, with small tasks this setting wastes a lot of resources due to
Contributor:

can waste.

Contributor Author:

done

executor allocation overhead, as some executor might not even do any work.
This setting allows to set a divisor that will be used to reduce the number of
executors w.r.t. full parallelism
Defaults to 1.0
Contributor:

I think we should define that maxExecutors trumps this setting.

If I have 10000 tasks and a divisor of 2, I would expect 5000 executors, but if maxExecutors is 1000, that is all I get.

We should add a test for this interaction as well.
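
The interaction in that example works out as follows (plain Scala arithmetic, not the actual suite code; one task slot per executor assumed):

    val pendingTasks = 10000
    val tasksPerExecutor = 1
    val divisor = 2.0
    val maxExecutors = 1000

    val uncapped = math.ceil(pendingTasks / (tasksPerExecutor * divisor)).toInt // 5000
    val target = math.min(uncapped, maxExecutors)                               // 1000: maxExecutors wins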

Contributor Author:

Done

latency of the job, with small tasks this setting wastes a lot of resources due to
executor allocation overhead, as some executor might not even do any work.
This setting allows to set a divisor that will be used to reduce the number of
executors w.r.t. full parallelism
Contributor:

add a period at the end of "parallelism"

Contributor Author:

done

@@ -1795,6 +1796,19 @@ Apart from these, the following properties are also available, and may be useful
Lower bound for the number of executors if dynamic allocation is enabled.
</td>
</tr>
<tr>
<td><code>spark.dynamicAllocation.fullParallelismDivisor</code></td>
Contributor:

Naming configs is really hard; there are lots of different opinions on it, and in the end someone is going to be confused, so I need to think about this some more. I see the reason to use Parallelism here rather than maxExecutors (maxExecutorsDivisor could be confusing if people think it applies to the maxExecutors config), but I also think parallelism would be confused with spark.default.parallelism; it's not defining the number of tasks but the number of executors to allocate based on the parallelism. Another one I thought of is executorAllocationDivisor. I'll think about it some more and get back.

jcuquemelle (Contributor Author) commented Mar 20, 2018:

How about something like fullAllocationDivisor or fullExecutorAllocationDivisor? I think the naming should reflect the fact that it is a divisor w.r.t. the full possible parallelism / number of executors.

Contributor:

Sorry I didn't get back to this earlier; I think fullExecutorAllocationDivisor would be fine.

Contributor Author:

Done

felixcheung (Member) left a comment:

could you update the PR title and description to fit the new approach?

@jcuquemelle jcuquemelle changed the title [SPARK-22683][CORE] Add tasksPerExecutorSlot parameter [SPARK-22683][CORE] Add a fullExecutorAllocationDivisor parameter to throttle the parallelism of the dynamic allocation Mar 21, 2018
jcuquemelle (Contributor Author):

@felixcheung: updated PR title and description

tgravescs (Contributor):

jenkins, test this please

tgravescs (Contributor):

jenkins, ok to test

SparkQA commented Mar 21, 2018

Test build #88472 has finished for PR 19881 at commit a40d160.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

var manager = sc.executorAllocationManager.get
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 20)))
for (i <- 0 to 5) {
  addExecutors(manager)
Contributor:

this loop isn't really needed, right? All we are checking is the target, not the number to add?

Contributor Author:

If we want to check the capping by max/min executors, we need to actually try to add executors. The max/min capping does not occur during the computation of the target number of executors, but at the time they are added.
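
A simplified sketch of that ordering (illustrative only; the real addExecutors also ramps the target up gradually rather than jumping straight to the bound, and the field names are assumed from the surrounding class):

    // The target from maxNumExecutorsNeeded() is unbounded; the min/max clamp
    // is applied here, when executors are actually requested.
    private def addExecutors(maxNumExecutorsNeeded: Int): Int = {
      val oldTarget = numExecutorsTarget
      numExecutorsTarget = math.max(
        math.min(maxNumExecutorsNeeded, maxNumExecutors), minNumExecutors)
      numExecutorsTarget - oldTarget // delta actually requested
    }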

Contributor:

ok

tgravescs (Contributor):

Just a minor comment about the test; otherwise looks good.

@@ -116,9 +120,12 @@ private[spark] class ExecutorAllocationManager(
// TODO: The default value of 1 for spark.executor.cores works right now because dynamic
// allocation is only supported for YARN and the default number of cores per executor in YARN is
// 1, but it might need to be attained differently for different cluster managers
private val tasksPerExecutor =
private val tasksPerExecutorForFullParallelism =
Contributor:

We don't really need this variable now, can we just remove it?

jcuquemelle (Contributor Author) commented Mar 22, 2018:

It is used in two places: one to validate arguments and the other to actually compute the target number of executors. If I remove this variable, I will need to either store spark.executor.cores and spark.task.cpus instead, or fetch them each time we validate or compute the target number of executors.

Contributor Author:

@jiangxb1987, do you agree with my comment, or do you still want me to remove the variable?

Contributor:

I was originally thinking we might avoid introducing the concept of tasksPerExecutorForFullParallelism and rather only have executorCores and taskCPUs, but I don't have a strong opinion on that.

Contributor Author:

This is not exposed; it is merely a more precise description of the actual computation. I just wanted to state more clearly that the existing default behavior maximizes parallelism.

Contributor:

ok

tgravescs (Contributor):

+1

jiangxb1987 (Contributor):

cc @rxin

tgravescs (Contributor):

I'll leave this a bit longer, but then I'm going to merge it later today.

rxin (Contributor) commented Mar 28, 2018 via email

tgravescs (Contributor):

Yes, we can wait another day or so if you are looking at it, though this discussion has been going on for a long time now; if you have a better name suggestion, let us know. No other configs have a "divisor" suffix.

jcuquemelle (Contributor Author):

@rxin, can we merge this PR?

rxin (Contributor) commented Apr 6, 2018

Maybe instead of "divisor", we could just have a "rate" or "factor" that can be a floating-point value, and use multiplication rather than division? This way people can also make it even more aggressive.

jcuquemelle (Contributor Author) commented Apr 6, 2018

@rxin: making it more aggressive must be forbidden, because a setting of 1.0 already gives enough executors that, if executor provisioning were perfect (e.g. all executors were available at the same time) and the mapping of tasks to executors were optimal, each executor core (or taskSlot, as in the original naming) would process exactly one task. If you ask for more executors, you can be sure they will be wasted.
This is why a divisor semantics felt natural: it implies the parameter can only be used to reduce parallelism.
How about fullParallelismThrottlingRate?
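
A worked example of the argument (hypothetical numbers):

    val tasks = 1000
    val executorCores = 4 // spark.executor.cores
    val taskCpus = 1      // spark.task.cpus
    val slotsPerExecutor = executorCores / taskCpus // 4 task slots per executor

    // Full parallelism: one task per slot under perfect provisioning and scheduling.
    val fullParallelism = math.ceil(tasks.toDouble / slotsPerExecutor).toInt // 250

    // Requesting more than 250 executors leaves slots that can never all be
    // filled, so only reducing (dividing) the target can ever be useful.
    val halved = math.ceil(fullParallelism / 2.0).toInt // 125 with divisor 2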

conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)

private val fullExecutorAllocationDivisor =
conf.getDouble("spark.dynamicAllocation.fullExecutorAllocationDivisor", 1.0)
Contributor:

Forgot about this earlier, but this should really be a config constant similar to DYN_ALLOCATION_MIN_EXECUTORS.
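
A sketch of what that would look like in the org.apache.spark.internal.config package object, next to DYN_ALLOCATION_MIN_EXECUTORS (the constant name is assumed, using the then-current config key):

    private[spark] val DYN_ALLOCATION_FULL_EXECUTOR_ALLOCATION_DIVISOR =
      ConfigBuilder("spark.dynamicAllocation.fullExecutorAllocationDivisor")
        .doubleConf
        .createWithDefault(1.0)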

tgravescs (Contributor):

@rxin I assume you are just trying to avoid "divisor" since it's not used anywhere else? As @jcuquemelle stated, I don't see a use case for making this more aggressive; if you have one, please let us know, but otherwise it just wastes resources.

Personally I still like divisor because that is what you are doing. I don't think the fact that it's not in any other configs is a good reason not to use it. Looking around, I don't see any public configs with "factor" in the name either. I am not fond of "rate" because it's not a rate (i.e. how quickly or slowly you are allocating); it's a limit on the max number of executors.

I also think it's more natural for people to think of this as a divisor rather than a multiplier: if I want 1/2 of the executors, I divide by 2. I think we should name it based on what is most likely to be understood by the end user.

rxin (Contributor) commented Apr 9, 2018

SGTM on divisor.

Do we need "full" there in the config?

tgravescs (Contributor):

No, we don't strictly need it in the name; the reasoning behind it was to indicate that this is a divisor relative to having fully allocated executors for all the tasks, i.e. running at full parallelism.
Are you suggesting we just use spark.dynamicAllocation.executorAllocationDivisor? Other names thrown around were like maxExecutorAllocationDivisor. One thing we were trying to avoid is confusing it with the maxExecutors config. Opinions?

rxin (Contributor) commented Apr 10, 2018

I thought about this more, and I actually think something like this makes more sense: executorAllocationRatio. Basically it is just a ratio that determines how aggressively we want Spark to request executors. A ratio of 1.0 means fill up everything; a ratio of 0.5 means request only half of the executors.

What do you think?
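
As a usage sketch under the name just proposed, a user who wants only half of the executors that full parallelism implies could set:

    import org.apache.spark.SparkConf

    // Request half the executors full parallelism would imply,
    // still bounded by minExecutors/maxExecutors:
    val conf = new SparkConf()
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.executorAllocationRatio", "0.5")
      .set("spark.dynamicAllocation.minExecutors", "1")
      .set("spark.dynamicAllocation.maxExecutors", "1000")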

tgravescs (Contributor):

I'm fine with that

jcuquemelle (Contributor Author):

OK, I will quickly make the change.
Thanks for the proposals.

rxin (Contributor) commented Apr 17, 2018

Thanks @jcuquemelle

SparkQA commented Apr 17, 2018

Test build #89467 has finished for PR 19881 at commit 15732ab.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jcuquemelle jcuquemelle changed the title [SPARK-22683][CORE] Add a fullExecutorAllocationDivisor parameter to throttle the parallelism of the dynamic allocation [SPARK-22683][CORE] Add a executorAllocationRatio parameter to throttle the parallelism of the dynamic allocation Apr 18, 2018
@@ -26,7 +26,10 @@ import scala.util.control.{ControlThrowable, NonFatal}
import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
import org.apache.spark.internal.config.{
Contributor:

I would just make this import org.apache.spark.internal.config._

Contributor Author:

Done

@@ -1751,6 +1751,7 @@ Apart from these, the following properties are also available, and may be useful
<code>spark.dynamicAllocation.minExecutors</code>,
<code>spark.dynamicAllocation.maxExecutors</code>, and
<code>spark.dynamicAllocation.initialExecutors</code>
<code>spark.dynamicAllocation.fullExecutorAllocationDivisor</code>
Contributor:

needs to be changed to executorAllocationRatio

Contributor Author:

Done, missed that one, sorry :-)

…executors

Let's say an executor has spark.executor.cores / spark.task.cpus taskSlots.

The current dynamic allocation policy allocates enough executors
to have each taskSlot execute a single task, which wastes resources when
tasks are small relative to the executor allocation overhead. By adding
tasksPerExecutorSlot, it is made possible to specify how many tasks
a single slot should ideally execute, to mitigate the overhead of executor
allocation.
This allows for a different semantic, which yields a simpler explanation
and allows treating this parameter as a double for finer control.

Unit tests have been updated to actually test the number of executors and
have been refactored.
SparkQA commented Apr 23, 2018

Test build #89712 has finished for PR 19881 at commit 3b1dddc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

tgravescs (Contributor):

+1

asfgit closed this in 55c4ca8 on Apr 24, 2018