Skip to content

Commit

Permalink
SPARK-4585. Spark dynamic executor allocation should use minExecutors…
Browse files Browse the repository at this point in the history
… as...

... initial number

Author: Sandy Ryza <sandy@cloudera.com>

Closes #4051 from sryza/sandy-spark-4585 and squashes the following commits:

d1dd039 [Sandy Ryza] Add spark.dynamicAllocation.initialNumExecutors and make min and max not required
b7c59dc [Sandy Ryza] SPARK-4585. Spark dynamic executor allocation should use minExecutors as initial number
  • Loading branch information
sryza authored and Andrew Or committed Feb 2, 2015
1 parent c081b21 commit b2047b5
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ import org.apache.spark.scheduler._
* spark.dynamicAllocation.enabled - Whether this feature is enabled
* spark.dynamicAllocation.minExecutors - Lower bound on the number of executors
* spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors
* spark.dynamicAllocation.initialExecutors - Number of executors to start with
*
* spark.dynamicAllocation.schedulerBacklogTimeout (M) -
* If there are backlogged tasks for this duration, add new executors
Expand All @@ -70,9 +71,10 @@ private[spark] class ExecutorAllocationManager(

import ExecutorAllocationManager._

// Lower and upper bounds on the number of executors. These are required.
private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", -1)
private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", -1)
// Lower and upper bounds on the number of executors.
private val minNumExecutors = conf.getInt("spark.dynamicAllocation.minExecutors", 0)
private val maxNumExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors",
Integer.MAX_VALUE)

// How long there must be backlogged tasks for before an addition is triggered
private val schedulerBacklogTimeout = conf.getLong(
Expand Down Expand Up @@ -132,10 +134,10 @@ private[spark] class ExecutorAllocationManager(
*/
private def validateSettings(): Unit = {
if (minNumExecutors < 0 || maxNumExecutors < 0) {
throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be set!")
throw new SparkException("spark.dynamicAllocation.{min/max}Executors must be positive!")
}
if (minNumExecutors == 0 || maxNumExecutors == 0) {
throw new SparkException("spark.dynamicAllocation.{min/max}Executors cannot be 0!")
if (maxNumExecutors == 0) {
throw new SparkException("spark.dynamicAllocation.maxExecutors cannot be 0!")
}
if (minNumExecutors > maxNumExecutors) {
throw new SparkException(s"spark.dynamicAllocation.minExecutors ($minNumExecutors) must " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,23 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
import ExecutorAllocationManagerSuite._

test("verify min/max executors") {
// No min or max
val conf = new SparkConf()
.setMaster("local")
.setAppName("test-executor-allocation-manager")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.testing", "true")
intercept[SparkException] { new SparkContext(conf) }
SparkEnv.get.stop() // cleanup the created environment
SparkContext.clearActiveContext()
val sc0 = new SparkContext(conf)
assert(sc0.executorAllocationManager.isDefined)
sc0.stop()

// Only min
val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "1")
// Min < 0
val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "-1")
intercept[SparkException] { new SparkContext(conf1) }
SparkEnv.get.stop()
SparkContext.clearActiveContext()

// Only max
val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "2")
// Max < 0
val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "-1")
intercept[SparkException] { new SparkContext(conf2) }
SparkEnv.get.stop()
SparkContext.clearActiveContext()
Expand Down
20 changes: 14 additions & 6 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -1098,24 +1098,32 @@ Apart from these, the following properties are also available, and may be useful
available on YARN mode. For more detail, see the description
<a href="job-scheduling.html#dynamic-resource-allocation">here</a>.
<br><br>
This requires the following configurations to be set:
This requires <code>spark.shuffle.service.enabled</code> to be set.
The following configurations are also relevant:
<code>spark.dynamicAllocation.minExecutors</code>,
<code>spark.dynamicAllocation.maxExecutors</code>, and
<code>spark.shuffle.service.enabled</code>
<code>spark.dynamicAllocation.initialExecutors</code>
</td>
</tr>
<tr>
<td><code>spark.dynamicAllocation.minExecutors</code></td>
<td>(none)</td>
<td>0</td>
<td>
Lower bound for the number of executors if dynamic allocation is enabled (required).
Lower bound for the number of executors if dynamic allocation is enabled.
</td>
</tr>
<tr>
<td><code>spark.dynamicAllocation.maxExecutors</code></td>
<td>(none)</td>
<td>Integer.MAX_VALUE</td>
<td>
Upper bound for the number of executors if dynamic allocation is enabled.
</td>
</tr>
<tr>
<td><code>spark.dynamicAllocation.maxExecutors</code></td>
<td><code>spark.dynamicAllocation.minExecutors</code></td>
<td>
Upper bound for the number of executors if dynamic allocation is enabled (required).
Initial number of executors to run if dynamic allocation is enabled.
</td>
</tr>
<tr>
Expand Down
9 changes: 4 additions & 5 deletions docs/job-scheduling.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,10 @@ scheduling while sharing cluster resources efficiently.
### Configuration and Setup

All configurations used by this feature live under the `spark.dynamicAllocation.*` namespace.
To enable this feature, your application must set `spark.dynamicAllocation.enabled` to `true` and
provide lower and upper bounds for the number of executors through
`spark.dynamicAllocation.minExecutors` and `spark.dynamicAllocation.maxExecutors`. Other relevant
configurations are described on the [configurations page](configuration.html#dynamic-allocation)
and in the subsequent sections in detail.
To enable this feature, your application must set `spark.dynamicAllocation.enabled` to `true`.
Other relevant configurations are described on the
[configurations page](configuration.html#dynamic-allocation) and in the subsequent sections in
detail.

Additionally, your application must use an external shuffle service. The purpose of the service is
to preserve the shuffle files written by executors so the executors can be safely removed (more
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,23 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
.orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p)))
.orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES"))
.orNull
// If dynamic allocation is enabled, start at the max number of executors
// If dynamic allocation is enabled, start at the configured initial number of executors.
// Default to minExecutors if no initialExecutors is set.
if (isDynamicAllocationEnabled) {
val minExecutorsConf = "spark.dynamicAllocation.minExecutors"
val initialExecutorsConf = "spark.dynamicAllocation.initialExecutors"
val maxExecutorsConf = "spark.dynamicAllocation.maxExecutors"
if (!sparkConf.contains(maxExecutorsConf)) {
val minNumExecutors = sparkConf.getInt(minExecutorsConf, 0)
val initialNumExecutors = sparkConf.getInt(initialExecutorsConf, minNumExecutors)
val maxNumExecutors = sparkConf.getInt(maxExecutorsConf, Integer.MAX_VALUE)

// If defined, initial executors must be between min and max
if (initialNumExecutors < minNumExecutors || initialNumExecutors > maxNumExecutors) {
throw new IllegalArgumentException(
s"$maxExecutorsConf must be set if dynamic allocation is enabled!")
s"$initialExecutorsConf must be between $minExecutorsConf and $maxNumExecutors!")
}
numExecutors = sparkConf.get(maxExecutorsConf).toInt

numExecutors = initialNumExecutors
}
}

Expand Down

0 comments on commit b2047b5

Please sign in to comment.