[SPARK-22256] - Introduce spark.mesos.driver.memoryOverhead
Paul Mackles committed Apr 9, 2018
1 parent 8d40a79 commit 1197c0b
Showing 5 changed files with 73 additions and 8 deletions.
9 changes: 9 additions & 0 deletions docs/running-on-mesos.md
@@ -435,6 +435,15 @@ See the [configuration page](configuration.html) for information on Spark config
the final overhead will be this value.
</td>
</tr>
<tr>
<td><code>spark.mesos.driver.memoryOverhead</code></td>
<td>driver memory * 0.10, with minimum of 384</td>
<td>
The amount of additional memory, specified in MB, to be allocated to the driver. By default,
the overhead will be the larger of either 384 or 10% of <code>spark.driver.memory</code>. If set,
the final overhead will be this value. Only applies to cluster mode.
</td>
</tr>
<tr>
<td><code>spark.mesos.uris</code></td>
<td>(none)</td>
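For context on the spark.mesos.driver.memoryOverhead entry added above: like the executor
overhead, it is an extra allocation on top of spark.driver.memory, and it only takes effect
when the dispatcher launches the driver in cluster mode. A minimal, hypothetical sketch of
supplying the setting when building a submission's configuration follows; the 4g/512 values
and the use of SparkConf here are illustrative only (in practice the property would usually
be passed with --conf at spark-submit time).

    import org.apache.spark.SparkConf

    // Illustrative only: request a fixed 512 MB of container overhead for the driver
    // instead of the default of max(384, 0.10 * spark.driver.memory).
    val conf = new SparkConf()
      .set("spark.driver.memory", "4g")
      .set("spark.mesos.driver.memoryOverhead", "512") // MB, cluster mode only
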
@@ -129,4 +129,12 @@ package object config {
"when launching drivers. Default is to accept all offers with sufficient resources.")
.stringConf
.createWithDefault("")

private[spark] val DRIVER_MEMORY_OVERHEAD =
ConfigBuilder("spark.mesos.driver.memoryOverhead")
.doc("The amount of additional memory, specified in MB, to be allocated to the driver. " +
"By default, the overhead will be the larger of either 384 or 10% of spark.driver.memory. " +
"Only applies to cluster mode.")
.intConf
.createOptional
}
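
One detail worth noting about the DRIVER_MEMORY_OVERHEAD entry above: it is built with
createOptional rather than createWithDefault, so reading it yields an Option[Int], and the
documented fallback of max(384, 10% of driver memory) is applied by the caller (the
driverContainerMemory helper added below), not by the entry itself. A rough sketch of that
resolution, where maybeOverheadMb stands in for conf.get(DRIVER_MEMORY_OVERHEAD) and
driverMemMb for the requested driver memory (both names are hypothetical):

    // Sketch only: how the optional overhead resolves to a concrete value in MB.
    def resolveOverheadMb(maybeOverheadMb: Option[Int], driverMemMb: Int): Int =
      maybeOverheadMb.getOrElse(math.max(384, (0.10 * driverMemMb).toInt))
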
@@ -568,7 +568,7 @@ private[spark] class MesosClusterScheduler(
val (remainingResources, cpuResourcesToUse) =
partitionResources(offer.remainingResources, "cpus", desc.cores)
val (finalResources, memResourcesToUse) =
partitionResources(remainingResources.asJava, "mem", desc.mem)
partitionResources(remainingResources.asJava, "mem", driverContainerMemory(desc))
offer.remainingResources = finalResources.asJava

val appName = desc.conf.get("spark.app.name")
@@ -600,7 +600,7 @@
tasks: mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]): Unit = {
for (submission <- candidates) {
val driverCpu = submission.cores
val driverMem = submission.mem
val driverMem = driverContainerMemory(submission)
val driverConstraints =
parseConstraintString(submission.conf.get(config.DRIVER_CONSTRAINTS))
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem, " +
@@ -36,6 +36,7 @@ import org.apache.mesos.protobuf.{ByteString, GeneratedMessageV3}

import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.TaskState
import org.apache.spark.deploy.mesos.{config, MesosDriverDescription}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.util.Utils
@@ -404,6 +405,19 @@ trait MesosSchedulerUtils extends Logging {
sc.executorMemory
}

/**
* Return the amount of memory to allocate to each driver, taking into account
* container overheads.
*
* @param driverDesc used to get driver memory
* @return the driver memory plus spark.mesos.driver.memoryOverhead when it is set; otherwise
* the driver memory plus the larger of (MEMORY_OVERHEAD_FRACTION * driver memory)
* and MEMORY_OVERHEAD_MINIMUM
*/
def driverContainerMemory(driverDesc: MesosDriverDescription): Int = {
val defaultMem = math.max(MEMORY_OVERHEAD_FRACTION * driverDesc.mem, MEMORY_OVERHEAD_MINIMUM)
driverDesc.conf.get(config.DRIVER_MEMORY_OVERHEAD).getOrElse(defaultMem.toInt) + driverDesc.mem
}

def setupUris(uris: String,
builder: CommandInfo.Builder,
useFetcherCache: Boolean = false): Unit = {
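To make the arithmetic in driverContainerMemory concrete, here is a small worked example.
The 1000 MB driver memory is illustrative; the 10% fraction and 384 MB minimum follow the
documentation added earlier in this change (MEMORY_OVERHEAD_FRACTION and
MEMORY_OVERHEAD_MINIMUM in this trait):

    // Worked example, values illustrative:
    //   driver memory = 1000 MB, spark.mesos.driver.memoryOverhead not set
    //     defaultMem = max(0.1 * 1000, 384) = 384
    //     container  = 1000 + 384 = 1384 MB   (the figure asserted in the new test below)
    //   driver memory = 1000 MB, spark.mesos.driver.memoryOverhead = 512
    //     container  = 1000 + 512 = 1512 MB
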
@@ -104,7 +104,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 1200, 1.5, true,
command,
Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")),
Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test"),
("spark.mesos.driver.memoryOverhead", "0")),
"s1",
new Date()))
assert(response.success)
@@ -199,6 +200,33 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
})
}

test("supports spark.mesos.driver.memoryOverhead") {
setScheduler()

val mem = 1000
val cpu = 1

val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", mem, cpu, true,
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test"),
"s1",
new Date()))
assert(response.success)

val offer = Utils.createOffer("o1", "s1", mem*2, cpu)
scheduler.resourceOffers(driver, List(offer).asJava)
val tasks = Utils.verifyTaskLaunched(driver, "o1")
// expected: mem (1000) + default overhead max(0.1 * 1000, 384) = 1384.0
val taskMem = tasks.head.getResourcesList
.asScala
.filter(_.getName.equals("mem"))
.map(_.getScalar.getValue)
.head
assert(1384.0 === taskMem)
}

test("supports spark.mesos.driverEnv.*") {
setScheduler()

@@ -210,7 +238,9 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test",
"spark.mesos.driverEnv.TEST_ENV" -> "TEST_VAL"),
"spark.mesos.driverEnv.TEST_ENV" -> "TEST_VAL",
"spark.mesos.driver.memoryOverhead" -> "0"
),
"s1",
new Date()))
assert(response.success)
@@ -235,7 +265,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test",
"spark.mesos.network.name" -> "test-network-name",
"spark.mesos.network.labels" -> "key1:val1,key2:val2"),
"spark.mesos.network.labels" -> "key1:val1,key2:val2",
"spark.mesos.driver.memoryOverhead" -> "0"),
"s1",
new Date()))

@@ -274,7 +305,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test",
config.DRIVER_CONSTRAINTS.key -> driverConstraints),
config.DRIVER_CONSTRAINTS.key -> driverConstraints,
"spark.mesos.driver.memoryOverhead" -> "0"),
"s1",
new Date()))
assert(response.success)
@@ -312,7 +344,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test",
"spark.mesos.driver.labels" -> "key:value"),
"spark.mesos.driver.labels" -> "key:value",
"spark.mesos.driver.memoryOverhead" -> "0"),
"s1",
new Date()))

@@ -423,7 +456,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
true,
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test") ++
"spark.app.name" -> "test",
"spark.mesos.driver.memoryOverhead" -> "0") ++
addlSparkConfVars,
"s1",
new Date())
