Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ private[spark] class MesosSchedulerBackend(

// The listener bus to publish executor added/removed events.
val listenerBus = sc.listenerBus

private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1)

@volatile var appId: String = _

Expand Down Expand Up @@ -139,7 +141,7 @@ private[spark] class MesosSchedulerBackend(
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder()
.setValue(scheduler.CPUS_PER_TASK).build())
.setValue(mesosExecutorCores).build())
.build()
val memory = Resource.newBuilder()
.setName("mem")
Expand Down Expand Up @@ -220,10 +222,9 @@ private[spark] class MesosSchedulerBackend(
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
// TODO(pwendell): Should below be 1 + scheduler.CPUS_PER_TASK?
(mem >= MemoryUtils.calculateTotalMemory(sc) &&
// need at least 1 for executor, 1 for task
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this comment still valid?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrewor14 I've missed it. It doesn't need

cpus >= 2 * scheduler.CPUS_PER_TASK) ||
cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)) ||
(slaveIdsWithExecutors.contains(slaveId) &&
cpus >= scheduler.CPUS_PER_TASK)
}
Expand All @@ -232,10 +233,9 @@ private[spark] class MesosSchedulerBackend(
val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
getResource(o.getResourcesList, "cpus").toInt
} else {
// If the executor doesn't exist yet, subtract CPU for executor
// TODO(pwendell): Should below just subtract "1"?
getResource(o.getResourcesList, "cpus").toInt -
scheduler.CPUS_PER_TASK
// If the Mesos executor has not been started on this slave yet, set aside a few
// cores for the Mesos executor by offering fewer cores to the Spark executor
(getResource(o.getResourcesList, "cpus") - mesosExecutorCores).toInt
}
new WorkerOffer(
o.getSlaveId.getValue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Mo
expectedWorkerOffers.append(new WorkerOffer(
mesosOffers.get(0).getSlaveId.getValue,
mesosOffers.get(0).getHostname,
2
(minCpu - backend.mesosExecutorCores).toInt
))
expectedWorkerOffers.append(new WorkerOffer(
mesosOffers.get(2).getSlaveId.getValue,
mesosOffers.get(2).getHostname,
2
(minCpu - backend.mesosExecutorCores).toInt
))
val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
Expand Down
10 changes: 10 additions & 0 deletions docs/running-on-mesos.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,16 @@ See the [configuration page](configuration.html) for information on Spark config
Note that total amount of cores the executor will request in total will not exceed the spark.cores.max setting.
</td>
</tr>
<tr>
<td><code>spark.mesos.mesosExecutor.cores</code></td>
<td>1.0</td>
<td>
(Fine-grained mode only) Number of cores to give each Mesos executor. This does not
include the cores used to run the Spark tasks. In other words, even if no Spark task
is being run, each Mesos executor will occupy the number of cores configured here.
The value can be a floating point number.
</td>
</tr>
<tr>
<td><code>spark.mesos.executor.home</code></td>
<td>driver side <code>SPARK_HOME</code></td>
Expand Down