[SPARK-19606][MESOS] Support constraints in spark-dispatcher #19543

Closed · wants to merge 6 commits
17 changes: 15 additions & 2 deletions docs/running-on-mesos.md
@@ -263,7 +263,10 @@ resource offers will be accepted.
conf.set("spark.mesos.constraints", "os:centos7;us-east-1:false")
{% endhighlight %}

For example, let's say `spark.mesos.constraints` is set to `os:centos7;us-east-1:false`. The resource offers will then be checked against both constraints, and only offers that satisfy them will be accepted to start new executors.

To constrain where driver tasks are run, use `spark.mesos.driver.constraints`.
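For instance, a driver can be pinned to CentOS 7 agents (an illustrative attribute value; the constraint syntax is the same as for `spark.mesos.constraints`):

{% highlight scala %}
conf.set("spark.mesos.driver.constraints", "os:centos7")
{% endhighlight %}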

# Mesos Docker Support

@@ -447,7 +450,9 @@ See the [configuration page](configuration.html) for information on Spark config
<td><code>spark.mesos.constraints</code></td>
<td>(none)</td>
<td>
Attribute-based constraints on Mesos resource offers. By default, all resource offers will be accepted. This setting applies only to executors. Refer to <a href="http://mesos.apache.org/documentation/attributes-resources/">Mesos Attributes & Resources</a> for more information on attributes.
<ul>
<li>Scalar constraints are matched with "less than or equal" semantics, i.e., the value in the constraint must be less than or equal to the value in the resource offer.</li>
<li>Range constraints are matched with "contains" semantics, i.e., the value in the constraint must fall within the resource offer's range.</li>
@@ -457,6 +462,14 @@ See the [configuration page](configuration.html) for information on Spark config
</ul>
</td>
</tr>
<tr>
<td><code>spark.mesos.driver.constraints</code></td>
<td>(none)</td>
<td>
Same as <code>spark.mesos.constraints</code> except applied to drivers when launched through the dispatcher. By default,
all offers with sufficient resources will be accepted.
</td>
</tr>
<tr>
<td><code>spark.mesos.containerizer</code></td>
<td><code>docker</code></td>
@@ -122,4 +122,11 @@ package object config {
"Example: key1:val1,key2:val2")
.stringConf
.createOptional

private[spark] val DRIVER_CONSTRAINTS =
ConfigBuilder("spark.mesos.driver.constraints")
.doc("Attribute based constraints on mesos resource offers. Applied by the dispatcher " +
"when launching drivers. Default is to accept all offers with sufficient resources.")
.stringConf
.createWithDefault("")
}

Member (felixcheung) commented:

should this default to ""? looks like it might still match something

Contributor replied:

@felixcheung I think it's okay. There's a utility function, parseConstraintString, which parses the input string into a Map. If the string is empty, it returns an empty Map:

def parseConstraintString(constraintsVal: String): Map[String, Set[String]] = {

This is also consistent with how the executor constraint string is parsed.
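As an illustration of the behavior described in that reply (the results below are assumptions based on the `attr:value;attr:value` constraint syntax documented above):

parseConstraintString("")            // Map(), no attribute requirements
parseConstraintString("os:centos7")  // Map("os" -> Set("centos7"))
parseConstraintString("c1:a;c2:b")   // Map("c1" -> Set("a"), "c2" -> Set("b"))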
@@ -556,9 +556,10 @@ private[spark] class MesosClusterScheduler(

private class ResourceOffer(
    val offer: Offer,
    var remainingResources: JList[Resource],
    var attributes: JList[Attribute]) {
  override def toString(): String = {
    s"Offer id: ${offer.getId}, resources: ${remainingResources}, attributes: ${attributes}"
  }
}

@@ -601,10 +602,14 @@
for (submission <- candidates) {
val driverCpu = submission.cores
val driverMem = submission.mem
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
val driverConstraints =
parseConstraintString(submission.conf.get(config.DRIVER_CONSTRAINTS))
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem, " +
s"driverConstraints: $driverConstraints")
val offerOption = currentOffers.find { offer =>
getResource(offer.remainingResources, "cpus") >= driverCpu &&
getResource(offer.remainingResources, "mem") >= driverMem
getResource(offer.remainingResources, "mem") >= driverMem &&
matchesAttributeRequirements(driverConstraints, toAttributeMap(offer.attributes))
}
if (offerOption.isEmpty) {
logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
@@ -652,7 +657,7 @@
val currentTime = new Date()

val currentOffers = offers.asScala.map {
offer => new ResourceOffer(offer, offer.getResourcesList, offer.getAttributesList)
}.toList

stateLock.synchronized {
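The driver-launch code above gates each offer on matchesAttributeRequirements from MesosSchedulerUtils. A minimal sketch of the text-attribute case, using a hypothetical simplified helper (the real utility also handles SCALAR, RANGES, and SET attribute types):

// Hypothetical simplification: an offer matches when every constraint key is
// present among the offer's attributes and, when the constraint lists values,
// the offer's value is one of them.
def matchesTextConstraints(
    constraints: Map[String, Set[String]],
    offerAttributes: Map[String, String]): Boolean = {
  constraints.forall { case (name, requiredValues) =>
    offerAttributes.get(name).exists { offerValue =>
      requiredValues.isEmpty || requiredValues.contains(offerValue)
    }
  }
}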
@@ -254,6 +254,53 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
}

test("accept/decline offers with driver constraints") {
setScheduler()

val mem = 1000
val cpu = 1
val s2Attributes = List(Utils.createTextAttribute("c1", "a"))
val s3Attributes = List(
Utils.createTextAttribute("c1", "a"),
Utils.createTextAttribute("c2", "b"))
val offers = List(
Utils.createOffer("o1", "s1", mem, cpu, None, 0),
Utils.createOffer("o2", "s2", mem, cpu, None, 0, s2Attributes),
Utils.createOffer("o3", "s3", mem, cpu, None, 0, s3Attributes))

def submitDriver(driverConstraints: String): Unit = {
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", mem, cpu, true,
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test",
config.DRIVER_CONSTRAINTS.key -> driverConstraints),
"s1",
new Date()))
assert(response.success)
}

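// No agent advertises c1=x (s1 has no attributes; s2 and s3 carry c1=a), so every offer is declined.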
submitDriver("c1:x")
scheduler.resourceOffers(driver, offers.asJava)
offers.foreach(o => Utils.verifyTaskNotLaunched(driver, o.getId.getValue))

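// s2 and s3 carry c1, and s3 carries c2, but the values (y, z) match neither a nor b, so nothing launches.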
submitDriver("c1:y;c2:z")
scheduler.resourceOffers(driver, offers.asJava)
offers.foreach(o => Utils.verifyTaskNotLaunched(driver, o.getId.getValue))

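// An empty constraint string imposes no attribute requirements, so the first offer with enough resources (o1) is used.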
submitDriver("")
scheduler.resourceOffers(driver, offers.asJava)
Utils.verifyTaskLaunched(driver, "o1")

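// o1 lacks the c1 attribute; o2 is the first offer carrying c1=a.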
submitDriver("c1:a")
scheduler.resourceOffers(driver, offers.asJava)
Utils.verifyTaskLaunched(driver, "o2")

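// Only s3 satisfies both c1=a and c2=b, so the driver lands on o3.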
submitDriver("c1:a;c2:b")
scheduler.resourceOffers(driver, offers.asJava)
Utils.verifyTaskLaunched(driver, "o3")
}

test("supports spark.mesos.driver.labels") {
setScheduler()

@@ -43,12 +43,13 @@ object Utils {
.build()

def createOffer(
    offerId: String,
    slaveId: String,
    mem: Int,
    cpus: Int,
    ports: Option[(Long, Long)] = None,
    gpus: Int = 0,
    attributes: List[Attribute] = List.empty): Offer = {
val builder = Offer.newBuilder()
builder.addResourcesBuilder()
.setName("mem")
@@ -63,7 +64,7 @@
.setName("ports")
.setType(Value.Type.RANGES)
.setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder()
.setBegin(resourcePorts._1).setEnd(resourcePorts._2).build()))
}
if (gpus > 0) {
builder.addResourcesBuilder()
@@ -73,9 +74,10 @@
}
builder.setId(createOfferId(offerId))
.setFrameworkId(FrameworkID.newBuilder()
.setValue("f1"))
.setValue("f1"))
.setSlaveId(SlaveID.newBuilder().setValue(slaveId))
.setHostname(s"host${slaveId}")
.addAllAttributes(attributes.asJava)
.build()
}

@@ -125,7 +127,7 @@
.getVariablesList
.asScala
assert(envVars
.count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env vars
val variableOne = envVars.filter(_.getName == "SECRET_ENV_KEY").head
assert(variableOne.getSecret.isInitialized)
assert(variableOne.getSecret.getType == Secret.Type.REFERENCE)
@@ -154,7 +156,7 @@
.getVariablesList
.asScala
assert(envVars
.count(!_.getName.startsWith("SPARK_")) == 2) // user-defined secret env vars
val variableOne = envVars.filter(_.getName == "USER").head
assert(variableOne.getSecret.isInitialized)
assert(variableOne.getSecret.getType == Secret.Type.VALUE)
@@ -212,4 +214,13 @@
assert(secretVolTwo.getSource.getSecret.getValue.getData ==
ByteString.copyFrom("password".getBytes))
}

def createTextAttribute(name: String, value: String): Attribute = {
Attribute.newBuilder()
.setName(name)
.setType(Value.Type.TEXT)
.setText(Value.Text.newBuilder().setValue(value))
.build()
}
}