[SPARK-22683][CORE] Add an executorAllocationRatio parameter to throttle the parallelism of the dynamic allocation #19881

Closed
wants to merge 9 commits
@@ -26,7 +26,7 @@ import scala.util.control.{ControlThrowable, NonFatal}
import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.{DYN_ALLOCATION_MAX_EXECUTORS, DYN_ALLOCATION_MIN_EXECUTORS}
+import org.apache.spark.internal.config._
import org.apache.spark.metrics.source.Source
import org.apache.spark.scheduler._
import org.apache.spark.storage.BlockManagerMaster
@@ -69,6 +69,10 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
* spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
* spark.dynamicAllocation.initialExecutors - Number of executors to start with
*
 * spark.dynamicAllocation.executorAllocationRatio -
 *   Used to reduce the parallelism requested by dynamic allocation, which can waste
 *   resources when tasks are small
*
* spark.dynamicAllocation.schedulerBacklogTimeout (M) -
* If there are backlogged tasks for this duration, add new executors
*
@@ -116,9 +120,12 @@ private[spark] class ExecutorAllocationManager(
// TODO: The default value of 1 for spark.executor.cores works right now because dynamic
// allocation is only supported for YARN and the default number of cores per executor in YARN is
// 1, but it might need to be attained differently for different cluster managers
-  private val tasksPerExecutor =
+  private val tasksPerExecutorForFullParallelism =
Contributor:
We don't really need this variable now, can we just remove it?

Contributor Author (@jcuquemelle, Mar 22, 2018):
It is used in two places: one to validate arguments, the other to actually compute the target number of executors. If I remove this variable, I will need to either store spark.executor.cores and spark.task.cpus instead, or fetch them each time we validate or compute the target number of executors.

Contributor Author:
@jiangxb1987, do you agree with my comment, or do you still want me to remove the variable?

Contributor:
I was originally thinking we might avoid introducing the concept tasksPerExecutorForFullParallelism and instead only have executorCores and taskCPUs, but I don't have a strong opinion on that.

Contributor Author:
This is not exposed; it is merely a more precise description of the actual computation. I just wanted to state more clearly that the existing default behavior maximizes parallelism.

Contributor:
ok

     conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)

+  private val executorAllocationRatio =
+    conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)

   validateSettings()

   // Number of executors to add in the next round
@@ -209,8 +216,13 @@ private[spark] class ExecutorAllocationManager(
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
if (tasksPerExecutor == 0) {
throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.")
if (tasksPerExecutorForFullParallelism == 0) {
throw new SparkException("spark.executor.cores must not be < spark.task.cpus.")
}

if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
throw new SparkException(
"spark.dynamicAllocation.executorAllocationRatio must be > 0 and <= 1.0")
}
}

@@ -273,7 +285,9 @@ private[spark] class ExecutorAllocationManager(
*/
private def maxNumExecutorsNeeded(): Int = {
val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
-    (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
+    math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
+              tasksPerExecutorForFullParallelism)
+      .toInt
}
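
To see what the new formula changes, here is a small self-contained sketch with made-up numbers (8 cores per executor, 2 CPUs per task, 100 pending tasks — none of these values come from the PR):

```scala
object AllocationRatioExample {
  def main(args: Array[String]): Unit = {
    // Each executor can run executorCores / taskCpus tasks simultaneously.
    val tasksPerExecutorForFullParallelism = 8 / 2 // = 4
    val numRunningOrPendingTasks = 100

    // Old formula (effectively a ratio of 1.0): one task slot per pending task.
    val oldTarget = (numRunningOrPendingTasks + tasksPerExecutorForFullParallelism - 1) /
      tasksPerExecutorForFullParallelism
    println(s"old target: $oldTarget") // 25

    // New formula with executorAllocationRatio = 0.5: request roughly half.
    val executorAllocationRatio = 0.5
    val newTarget = math.ceil(
      numRunningOrPendingTasks * executorAllocationRatio /
        tasksPerExecutorForFullParallelism).toInt
    println(s"new target: $newTarget") // ceil(12.5) = 13
  }
}
```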

private def totalRunningTasks(): Int = synchronized {
@@ -126,6 +126,10 @@ package object config {
   private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
     ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.createWithDefault(Int.MaxValue)

+  private[spark] val DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO =
+    ConfigBuilder("spark.dynamicAllocation.executorAllocationRatio")
+      .doubleConf.createWithDefault(1.0)

   private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait")
     .timeConf(TimeUnit.MILLISECONDS)
     .createWithDefaultString("3s")
@@ -145,6 +145,39 @@ class ExecutorAllocationManagerSuite
assert(numExecutorsToAdd(manager) === 1)
}

def testAllocationRatio(cores: Int, divisor: Double, expected: Int): Unit = {
val conf = new SparkConf()
.setMaster("myDummyLocalExternalClusterManager")
.setAppName("test-executor-allocation-manager")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.testing", "true")
.set("spark.dynamicAllocation.maxExecutors", "15")
.set("spark.dynamicAllocation.minExecutors", "3")
.set("spark.dynamicAllocation.executorAllocationRatio", divisor.toString)
.set("spark.executor.cores", cores.toString)
val sc = new SparkContext(conf)
contexts += sc
val manager = sc.executorAllocationManager.get
post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 20)))
for (i <- 0 to 5) {
addExecutors(manager)
Contributor:
This loop isn't really needed, right? All we are checking is the target, not the number to add?

Contributor Author:
If we want to check the capping by max/min executors, we need to actually try to add executors. The max/min capping does not occur during the computation of the target number of executors, but at the time they are added.

Contributor:
ok

}
assert(numExecutorsTarget(manager) === expected)
sc.stop()
}

test("executionAllocationRatio is correctly handled") {
testAllocationRatio(1, 0.5, 10)
testAllocationRatio(1, 1.0/3.0, 7)
testAllocationRatio(2, 1.0/3.0, 4)
testAllocationRatio(1, 0.385, 8)

// max/min executors capping
testAllocationRatio(1, 1.0, 15) // should be 20 but capped by max
testAllocationRatio(4, 1.0/3.0, 3) // should be 2 but elevated by min
}
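
As a quick cross-check of the expected values above (a standalone sketch, not part of the suite): the submitted stage has 20 tasks, so the raw target is ceil(20 * ratio / cores), clamped afterwards to the configured [minExecutors, maxExecutors] = [3, 15]:

```scala
def expectedTarget(cores: Int, ratio: Double): Int = {
  val raw = math.ceil(20 * ratio / cores).toInt // target before capping
  math.min(15, math.max(3, raw))                // min/max executor capping
}

assert(expectedTarget(1, 0.5) == 10)        // ceil(10.0) = 10
assert(expectedTarget(1, 1.0 / 3.0) == 7)   // ceil(6.67) = 7
assert(expectedTarget(2, 1.0 / 3.0) == 4)   // ceil(3.33) = 4
assert(expectedTarget(1, 0.385) == 8)       // ceil(7.7)  = 8
assert(expectedTarget(1, 1.0) == 15)        // 20, capped by maxExecutors
assert(expectedTarget(4, 1.0 / 3.0) == 3)   // 2, raised by minExecutors
```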


test("add executors capped by num pending tasks") {
sc = createSparkContext(0, 10, 0)
val manager = sc.executorAllocationManager.get
18 changes: 18 additions & 0 deletions docs/configuration.md
@@ -1753,6 +1753,7 @@ Apart from these, the following properties are also available, and may be useful
    <code>spark.dynamicAllocation.minExecutors</code>,
    <code>spark.dynamicAllocation.maxExecutors</code>,
    <code>spark.dynamicAllocation.initialExecutors</code>, and
    <code>spark.dynamicAllocation.executorAllocationRatio</code>
</td>
</tr>
<tr>
@@ -1797,6 +1798,23 @@ Apart from these, the following properties are also available, and may be useful
Lower bound for the number of executors if dynamic allocation is enabled.
</td>
</tr>
<tr>
<td><code>spark.dynamicAllocation.executorAllocationRatio</code></td>
<td>1</td>
<td>
    By default, dynamic allocation requests enough executors to maximize
    parallelism according to the number of tasks to process. While this minimizes the
    latency of the job, with small tasks this can waste a lot of resources due to
    executor allocation overhead, as some executors might not even do any work.
    This setting is a ratio that reduces the number of executors with respect to
    full parallelism: it defaults to 1.0 for maximum parallelism, while 0.5 divides
    the target number of executors by 2.
    The target number of executors computed by dynamic allocation can still be
    overridden by the <code>spark.dynamicAllocation.minExecutors</code> and
    <code>spark.dynamicAllocation.maxExecutors</code> settings.
</td>
</tr>
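
As an illustration of how this might be used (the values below are made up, not from the PR): a job with many short tasks can trade some latency for fewer executors by setting the ratio below 1.0.

```scala
import org.apache.spark.SparkConf

// Illustrative dynamic-allocation settings for a job with many small tasks.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "100")
  // Request only half the executors needed for full parallelism.
  .set("spark.dynamicAllocation.executorAllocationRatio", "0.5")
```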
<tr>
<td><code>spark.dynamicAllocation.schedulerBacklogTimeout</code></td>
<td>1s</td>