[SPARK-19606][MESOS] Support constraints in spark-dispatcher
As discussed in SPARK-19606, this adds a new config property named "spark.mesos.driver.constraints" for constraining drivers running on a Mesos cluster

A corresponding unit test was added; the change was also tested manually on a Mesos cluster
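For context, a driver submitted to the dispatcher in cluster mode might carry the new property like this (an illustrative sketch; the master URL and attribute value are hypothetical):

```scala
// Illustrative sketch (hypothetical dispatcher URL and agent attribute): in cluster
// mode this property travels with the driver description and is evaluated by the
// dispatcher against each resource offer's attributes.
val conf = new org.apache.spark.SparkConf()
  .setMaster("mesos://dispatcher-host:7077")
  .set("spark.mesos.driver.constraints", "os:centos7")
```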

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Paul Mackles <pmackles@adobe.com>

Closes #19543 from pmackles/SPARK-19606.

(cherry picked from commit b3f9dbf)
Signed-off-by: Felix Cheung <felixcheung@apache.org>
Paul Mackles authored and Felix Cheung committed Nov 12, 2017
1 parent 4ef0bef commit f6ee3d9
Showing 5 changed files with 97 additions and 15 deletions.
17 changes: 15 additions & 2 deletions docs/running-on-mesos.md
@@ -260,7 +260,10 @@ resource offers will be accepted.
conf.set("spark.mesos.constraints", "os:centos7;us-east-1:false")
{% endhighlight %}

-For example, Let's say `spark.mesos.constraints` is set to `os:centos7;us-east-1:false`, then the resource offers will be checked to see if they meet both these constraints and only then will be accepted to start new executors.
+For example, let's say `spark.mesos.constraints` is set to `os:centos7;us-east-1:false`; the resource offers will
+be checked to see if they meet both of these constraints, and only then will they be accepted to start new executors.

+To constrain where driver tasks are run, use `spark.mesos.driver.constraints`.
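For example, to restrict drivers to agents that advertise a matching attribute (the `os:centos7` value here is purely illustrative and assumes such an attribute is configured on the agents):

{% highlight scala %}
conf.set("spark.mesos.driver.constraints", "os:centos7")
{% endhighlight %}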

# Mesos Docker Support

@@ -442,7 +445,9 @@ See the [configuration page](configuration.html) for information on Spark config
<td><code>spark.mesos.constraints</code></td>
<td>(none)</td>
<td>
-Attribute based constraints on mesos resource offers. By default, all resource offers will be accepted. Refer to <a href="http://mesos.apache.org/documentation/attributes-resources/">Mesos Attributes & Resources</a> for more information on attributes.
+Attribute-based constraints on Mesos resource offers. By default, all resource offers will be accepted. This setting
+applies only to executors. Refer to <a href="http://mesos.apache.org/documentation/attributes-resources/">Mesos
+Attributes & Resources</a> for more information on attributes.
<ul>
<li>Scalar constraints are matched with "less than or equal" semantics, i.e. the value in the constraint must be less than or equal to the value in the resource offer.</li>
<li>Range constraints are matched with "contains" semantics, i.e. the value in the constraint must be within the resource offer's value.</li>
@@ -452,6 +457,14 @@ See the [configuration page](configuration.html) for information on Spark config
</ul>
</td>
</tr>
+<tr>
+  <td><code>spark.mesos.driver.constraints</code></td>
+  <td>(none)</td>
+  <td>
+    Same as <code>spark.mesos.constraints</code> except applied to drivers when launched through the dispatcher. By default,
+    all offers with sufficient resources will be accepted.
+  </td>
+</tr>
<tr>
<td><code>spark.mesos.containerizer</code></td>
<td><code>docker</code></td>
resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -56,4 +56,10 @@ package object config {
.stringConf
.createOptional

+  private[spark] val DRIVER_CONSTRAINTS =
+    ConfigBuilder("spark.mesos.driver.constraints")
+      .doc("Attribute based constraints on mesos resource offers. Applied by the dispatcher " +
+        "when launching drivers. Default is to accept all offers with sufficient resources.")
+      .stringConf
+      .createWithDefault("")
}
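Because the config defaults to the empty string, an unset value parses to an empty constraint map, so every offer with sufficient resources matches. A minimal sketch of the constraint-string format (assumed from the docs above; the real parsing is `parseConstraintString` in `MesosSchedulerUtils`, and this sketch ignores value-less attributes):

```scala
// Sketch only: "c1:a;c2:b" -> Map(c1 -> Set(a), c2 -> Set(b)); "" -> Map().
// Comma-separated values per attribute are assumed, e.g. "zone:us-east-1a,us-east-1b".
def parseConstraints(constraints: String): Map[String, Set[String]] =
  constraints.split(";").filter(_.nonEmpty).map { pair =>
    val Array(name, values) = pair.split(":", 2)
    name -> values.split(",").filter(_.nonEmpty).toSet
  }.toMap
```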
resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -509,9 +509,10 @@ private[spark] class MesosClusterScheduler(

private class ResourceOffer(
val offer: Offer,
-var remainingResources: JList[Resource]) {
+var remainingResources: JList[Resource],
+var attributes: JList[Attribute]) {
override def toString(): String = {
s"Offer id: ${offer.getId}, resources: ${remainingResources}"
s"Offer id: ${offer.getId}, resources: ${remainingResources}, attributes: ${attributes}"
}
}

@@ -549,10 +550,14 @@
for (submission <- candidates) {
val driverCpu = submission.cores
val driverMem = submission.mem
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
val driverConstraints =
parseConstraintString(submission.conf.get(config.DRIVER_CONSTRAINTS))
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem, " +
s"driverConstraints: $driverConstraints")
val offerOption = currentOffers.find { offer =>
getResource(offer.remainingResources, "cpus") >= driverCpu &&
getResource(offer.remainingResources, "mem") >= driverMem
getResource(offer.remainingResources, "mem") >= driverMem &&
matchesAttributeRequirements(driverConstraints, toAttributeMap(offer.attributes))
}
if (offerOption.isEmpty) {
logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
@@ -595,7 +600,7 @@
val currentTime = new Date()

val currentOffers = offers.asScala.map {
-offer => new ResourceOffer(offer, offer.getResourcesList)
+offer => new ResourceOffer(offer, offer.getResourcesList, offer.getAttributesList)
}.toList

stateLock.synchronized {
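Taken together, a driver now launches only on an offer whose attributes satisfy its constraints. A simplified, self-contained sketch of that predicate over plain string maps (the real `matchesAttributeRequirements` in `MesosSchedulerUtils` works on Mesos protobuf attribute types):

```scala
// Sketch: every constrained attribute must be present on the offer, and if the
// constraint lists values, the offer's value must be one of them.
def matches(
    constraints: Map[String, Set[String]],
    offerAttributes: Map[String, String]): Boolean =
  constraints.forall { case (name, required) =>
    offerAttributes.get(name).exists(v => required.isEmpty || required.contains(v))
  }

// Mirrors the unit test below:
// matches(Map("c1" -> Set("x")), Map("c1" -> "a"))            == false (offer declined)
// matches(Map.empty[String, Set[String]], Map("c1" -> "a"))   == true  (no constraints)
// matches(Map("c1" -> Set("a"), "c2" -> Set("b")),
//         Map("c1" -> "a", "c2" -> "b"))                      == true  (offer accepted)
```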
resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -248,6 +248,53 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
assert(networkInfos.get(0).getName == "test-network-name")
}

test("accept/decline offers with driver constraints") {
setScheduler()

val mem = 1000
val cpu = 1
val s2Attributes = List(Utils.createTextAttribute("c1", "a"))
val s3Attributes = List(
Utils.createTextAttribute("c1", "a"),
Utils.createTextAttribute("c2", "b"))
val offers = List(
Utils.createOffer("o1", "s1", mem, cpu, None, 0),
Utils.createOffer("o2", "s2", mem, cpu, None, 0, s2Attributes),
Utils.createOffer("o3", "s3", mem, cpu, None, 0, s3Attributes))

def submitDriver(driverConstraints: String): Unit = {
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", mem, cpu, true,
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test",
config.DRIVER_CONSTRAINTS.key -> driverConstraints),
"s1",
new Date()))
assert(response.success)
}

submitDriver("c1:x")
scheduler.resourceOffers(driver, offers.asJava)
offers.foreach(o => Utils.verifyTaskNotLaunched(driver, o.getId.getValue))

submitDriver("c1:y;c2:z")
scheduler.resourceOffers(driver, offers.asJava)
offers.foreach(o => Utils.verifyTaskNotLaunched(driver, o.getId.getValue))

submitDriver("")
scheduler.resourceOffers(driver, offers.asJava)
Utils.verifyTaskLaunched(driver, "o1")

submitDriver("c1:a")
scheduler.resourceOffers(driver, offers.asJava)
Utils.verifyTaskLaunched(driver, "o2")

submitDriver("c1:a;c2:b")
scheduler.resourceOffers(driver, offers.asJava)
Utils.verifyTaskLaunched(driver, "o3")
}

test("can kill supervised drivers") {
val conf = new SparkConf()
conf.setMaster("mesos://localhost:5050")
resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -40,12 +40,13 @@ object Utils {
.build()

def createOffer(
-  offerId: String,
-  slaveId: String,
-  mem: Int,
-  cpus: Int,
-  ports: Option[(Long, Long)] = None,
-  gpus: Int = 0): Offer = {
+    offerId: String,
+    slaveId: String,
+    mem: Int,
+    cpus: Int,
+    ports: Option[(Long, Long)] = None,
+    gpus: Int = 0,
+    attributes: List[Attribute] = List.empty): Offer = {
val builder = Offer.newBuilder()
builder.addResourcesBuilder()
.setName("mem")
@@ -60,7 +61,7 @@
.setName("ports")
.setType(Value.Type.RANGES)
.setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder()
-  .setBegin(resourcePorts._1).setEnd(resourcePorts._2).build()))
+    .setBegin(resourcePorts._1).setEnd(resourcePorts._2).build()))
}
if (gpus > 0) {
builder.addResourcesBuilder()
@@ -70,9 +71,10 @@
}
builder.setId(createOfferId(offerId))
.setFrameworkId(FrameworkID.newBuilder()
.setValue("f1"))
.setValue("f1"))
.setSlaveId(SlaveID.newBuilder().setValue(slaveId))
.setHostname(s"host${slaveId}")
+  .addAllAttributes(attributes.asJava)
.build()
}

@@ -99,4 +101,13 @@
def createTaskId(taskId: String): TaskID = {
TaskID.newBuilder().setValue(taskId).build()
}

+  def createTextAttribute(name: String, value: String): Attribute = {
+    Attribute.newBuilder()
+      .setName(name)
+      .setType(Value.Type.TEXT)
+      .setText(Value.Text.newBuilder().setValue(value))
+      .build()
+  }
}
