Skip to content

Commit

Permalink
[SPARK-21694][MESOS] Allow the user to pass network labels to CNI plu…
Browse files Browse the repository at this point in the history
…gins via
  • Loading branch information
susanxhuynh committed Aug 10, 2017
1 parent 2d799d0 commit 255e519
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 5 deletions.
14 changes: 14 additions & 0 deletions docs/running-on-mesos.md
Expand Up @@ -537,6 +537,20 @@ See the [configuration page](configuration.html) for information on Spark config
for more details.
</td>
</tr>
<tr>
<td><code>spark.mesos.network.labels</code></td>
<td><code>(none)</code></td>
<td>
Pass network labels to CNI plugins. This is a comma-separated list
of key-value pairs, where each key-value pair has the format key:value.
Example:

<pre>key1=val1,key2=val2</pre>
See
<a href="http://mesos.apache.org/documentation/latest/cni/#mesos-meta-data-to-cni-plugins">the Mesos CNI docs</a>
for more details.
</td>
</tr>
<tr>
<td><code>spark.mesos.fetcherCache.enable</code></td>
<td><code>false</code></td>
Expand Down
Expand Up @@ -70,4 +70,12 @@ package object config {
"during a temporary disconnection, before tearing down all the executors.")
.doubleConf
.createWithDefault(0.0)

private [spark] val NETWORK_LABELS =
ConfigBuilder("spark.mesos.network.labels")
.doc("Network labels to pass to CNI plugins. This is a comma-separated list " +
"of key-value pairs, where each key-value pair has the format key:value. " +
"Example: key1=val1,key2=val2")
.stringConf
.createOptional
}
Expand Up @@ -21,6 +21,7 @@ import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Parameter, Vo
import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo}

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.internal.Logging

/**
Expand Down Expand Up @@ -162,7 +163,11 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
}

conf.getOption("spark.mesos.network.name").map { name =>
val info = NetworkInfo.newBuilder().setName(name).build()
val networkLabels = MesosProtoUtils.mesosLabels(conf.get(NETWORK_LABELS).getOrElse(""))
val info = NetworkInfo.newBuilder()
.setName(name)
.setLabels(networkLabels)
.build()
containerInfo.addNetworkInfos(info)
}

Expand Down
Expand Up @@ -222,7 +222,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
assert(env.getOrElse("TEST_ENV", null) == "TEST_VAL")
}

test("supports spark.mesos.network.name") {
test("supports spark.mesos.network.name and spark.mesos.network.labels") {
setScheduler()

val mem = 1000
Expand All @@ -233,7 +233,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
command,
Map("spark.mesos.executor.home" -> "test",
"spark.app.name" -> "test",
"spark.mesos.network.name" -> "test-network-name"),
"spark.mesos.network.name" -> "test-network-name",
"spark.mesos.network.labels" -> "key1:val1,key2:val2"),
"s1",
new Date()))

Expand All @@ -246,6 +247,10 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList
assert(networkInfos.size == 1)
assert(networkInfos.get(0).getName == "test-network-name")
assert(networkInfos.get(0).getLabels.getLabels(0).getKey == "key1")
assert(networkInfos.get(0).getLabels.getLabels(0).getValue == "val1")
assert(networkInfos.get(0).getLabels.getLabels(1).getKey == "key2")
assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
}

test("supports spark.mesos.driver.labels") {
Expand Down
Expand Up @@ -568,9 +568,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(launchedTasks.head.getLabels.equals(taskLabels))
}

test("mesos supports spark.mesos.network.name") {
test("mesos supports spark.mesos.network.name and spark.mesos.network.labels") {
setBackend(Map(
"spark.mesos.network.name" -> "test-network-name"
"spark.mesos.network.name" -> "test-network-name",
"spark.mesos.network.labels" -> "key1:val1,key2:val2"
))

val (mem, cpu) = (backend.executorMemory(sc), 4)
Expand All @@ -582,6 +583,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList
assert(networkInfos.size == 1)
assert(networkInfos.get(0).getName == "test-network-name")
assert(networkInfos.get(0).getLabels.getLabels(0).getKey == "key1")
assert(networkInfos.get(0).getLabels.getLabels(0).getValue == "val1")
assert(networkInfos.get(0).getLabels.getLabels(1).getKey == "key2")
assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
}

test("supports spark.scheduler.minRegisteredResourcesRatio") {
Expand Down

0 comments on commit 255e519

Please sign in to comment.