From 255e5195a414f0f9e609cb579a2e8fdfea200be1 Mon Sep 17 00:00:00 2001
From: "Susan X. Huynh"
Date: Thu, 10 Aug 2017 11:19:51 -0700
Subject: [PATCH 1/4] [SPARK-21694][MESOS] Allow the user to pass network labels to CNI plugins via

---
 docs/running-on-mesos.md                              | 14 ++++++++++++++
 .../org/apache/spark/deploy/mesos/config.scala        |  8 ++++++++
 .../cluster/mesos/MesosSchedulerBackendUtil.scala     |  7 ++++++-
 .../cluster/mesos/MesosClusterSchedulerSuite.scala    |  9 +++++++--
 .../MesosCoarseGrainedSchedulerBackendSuite.scala     |  9 +++++++--
 5 files changed, 42 insertions(+), 5 deletions(-)

diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index ae3855084a650..a15d19842b5e8 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -537,6 +537,20 @@ See the [configuration page](configuration.html) for information on Spark config
     for more details.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.network.labels</code></td>
+  <td>(none)</td>
+  <td>
+    Pass network labels to CNI plugins. This is a comma-separated list
+    of key-value pairs, where each key-value pair has the format key:value.
+    Example:
+
+    <pre>key1=val1,key2=val2</pre>
+    See
+    the Mesos CNI docs
+    for more details.
+  </td>
+</tr>
 <tr>
   <td><code>spark.mesos.fetcherCache.enable</code></td>
   <td><code>false</code></td>
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index 6c8619e3c3c13..c1d3990124a94 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -70,4 +70,12 @@ package object config {
       "during a temporary disconnection, before tearing down all the executors.")
     .doubleConf
     .createWithDefault(0.0)
+
+  private [spark] val NETWORK_LABELS =
+    ConfigBuilder("spark.mesos.network.labels")
+      .doc("Network labels to pass to CNI plugins. This is a comma-separated list " +
+        "of key-value pairs, where each key-value pair has the format key:value. " +
+        "Example: key1=val1,key2=val2")
+      .stringConf
+      .createOptional
 }
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index fbcbc55099ec5..a9226de7074b5 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -21,6 +21,7 @@ import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Parameter, Vo
 import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo}
 
 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.internal.Logging
 
 /**
@@ -162,7 +163,11 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
     }
 
     conf.getOption("spark.mesos.network.name").map { name =>
-      val info = NetworkInfo.newBuilder().setName(name).build()
+      val networkLabels = MesosProtoUtils.mesosLabels(conf.get(NETWORK_LABELS).getOrElse(""))
+      val info = NetworkInfo.newBuilder()
+        .setName(name)
+        .setLabels(networkLabels)
+        .build()
       containerInfo.addNetworkInfos(info)
     }
 
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 0bb47906347d5..50bb501071509 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -222,7 +222,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
     assert(env.getOrElse("TEST_ENV", null) == "TEST_VAL")
   }
 
-  test("supports spark.mesos.network.name") {
+  test("supports spark.mesos.network.name and spark.mesos.network.labels") {
     setScheduler()
 
     val mem = 1000
@@ -233,7 +233,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
         command,
         Map("spark.mesos.executor.home" -> "test",
           "spark.app.name" -> "test",
-          "spark.mesos.network.name" -> "test-network-name"),
+          "spark.mesos.network.name" -> "test-network-name",
+          "spark.mesos.network.labels" -> "key1:val1,key2:val2"),
         "s1",
         new Date()))
 
@@ -246,6 +247,10 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
     val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList
     assert(networkInfos.size == 1)
     assert(networkInfos.get(0).getName == "test-network-name")
+    assert(networkInfos.get(0).getLabels.getLabels(0).getKey == "key1")
+    assert(networkInfos.get(0).getLabels.getLabels(0).getValue == "val1")
+    assert(networkInfos.get(0).getLabels.getLabels(1).getKey == "key2")
+    assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
   }
 
   test("supports spark.mesos.driver.labels") {
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index d9ff4a403ea36..605ed07b641a8 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -568,9 +568,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(launchedTasks.head.getLabels.equals(taskLabels))
   }
 
-  test("mesos supports spark.mesos.network.name") {
+  test("mesos supports spark.mesos.network.name and spark.mesos.network.labels") {
     setBackend(Map(
-      "spark.mesos.network.name" -> "test-network-name"
+      "spark.mesos.network.name" -> "test-network-name",
+      "spark.mesos.network.labels" -> "key1:val1,key2:val2"
     ))
 
     val (mem, cpu) = (backend.executorMemory(sc), 4)
@@ -582,6 +583,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     val networkInfos = launchedTasks.head.getContainer.getNetworkInfosList
     assert(networkInfos.size == 1)
     assert(networkInfos.get(0).getName == "test-network-name")
+    assert(networkInfos.get(0).getLabels.getLabels(0).getKey == "key1")
+    assert(networkInfos.get(0).getLabels.getLabels(0).getValue == "val1")
+    assert(networkInfos.get(0).getLabels.getLabels(1).getKey == "key2")
+    assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
   }
 
  test("supports spark.scheduler.minRegisteredResourcesRatio") {
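
Note: patch 1 relies on MesosProtoUtils.mesosLabels to turn the comma-separated key:value string into a Mesos Labels proto; that helper is used but not defined in this series. The sketch below only illustrates the expected shape of that conversion, under the "key1:val1,key2:val2" format documented above; the real helper may handle escaping and malformed pairs differently.

    import org.apache.mesos.Protos.{Label, Labels}

    // Minimal sketch: split on commas, then on the first colon, and build a
    // Mesos Labels proto. An empty string yields an empty Labels message,
    // mirroring conf.get(NETWORK_LABELS).getOrElse("").
    def toLabels(labelExpr: String): Labels = {
      val builder = Labels.newBuilder()
      labelExpr.split(",").filter(_.nonEmpty).foreach { pair =>
        val Array(key, value) = pair.split(":", 2)
        builder.addLabels(Label.newBuilder().setKey(key).setValue(value).build())
      }
      builder.build()
    }

    // toLabels("key1:val1,key2:val2") yields two Label entries, matching the
    // assertions added to both test suites in this patch.
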
From d261593a68fd5bd9d2527118eca7d2665570bb4e Mon Sep 17 00:00:00 2001
From: "Susan X. Huynh"
Date: Fri, 11 Aug 2017 15:10:11 -0700
Subject: [PATCH 2/4] Addressed review feedback: fixed the example in documentation; added 'NETWORK_NAME' to config object.

---
 docs/running-on-mesos.md                                   | 2 +-
 .../main/scala/org/apache/spark/deploy/mesos/config.scala  | 7 +++++++
 .../cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala | 2 +-
 .../cluster/mesos/MesosSchedulerBackendUtil.scala          | 4 ++--
 4 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index a15d19842b5e8..0e5a20c578db3 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -545,7 +545,7 @@ See the [configuration page](configuration.html) for information on Spark config
     of key-value pairs, where each key-value pair has the format key:value.
     Example:
 
-    <pre>key1=val1,key2=val2</pre>
+    <pre>key1:val1,key2:val2</pre>
     See
     the Mesos CNI docs
     for more details.
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index c1d3990124a94..e9a8f449934b7 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -71,6 +71,13 @@ package object config {
     .doubleConf
     .createWithDefault(0.0)
 
+  private [spark] val NETWORK_NAME =
+    ConfigBuilder("spark.mesos.network.name")
+      .doc("Attach containers to the given named network. If this job is launched " +
+        "in cluster mode, also launch the driver in the given named network.")
+      .stringConf
+      .createOptional
+
   private [spark] val NETWORK_LABELS =
     ConfigBuilder("spark.mesos.network.labels")
       .doc("Network labels to pass to CNI plugins. This is a comma-separated list " +
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index e6b09572121d6..b2daeaa8d2141 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -668,7 +668,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   }
 
   private def executorHostname(offer: Offer): String = {
-    if (sc.conf.getOption("spark.mesos.network.name").isDefined) {
+    if (sc.conf.get(NETWORK_NAME).isDefined) {
       // The agent's IP is not visible in a CNI container, so we bind to 0.0.0.0
       "0.0.0.0"
     } else {
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index a9226de7074b5..e5c1e801f2772 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -21,7 +21,7 @@ import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Parameter, Vo
 import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo}
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.mesos.config._
+import org.apache.spark.deploy.mesos.config.{NETWORK_LABELS, NETWORK_NAME}
 import org.apache.spark.internal.Logging
 
 /**
@@ -162,7 +162,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
       volumes.foreach(_.foreach(containerInfo.addVolumes(_)))
     }
 
-    conf.getOption("spark.mesos.network.name").map { name =>
+    conf.get(NETWORK_NAME).map { name =>
       val networkLabels = MesosProtoUtils.mesosLabels(conf.get(NETWORK_LABELS).getOrElse(""))
       val info = NetworkInfo.newBuilder()
         .setName(name)
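
Note: with the separator fixed in the docs, an application would opt in to both settings roughly as in the sketch below. The network name "my-cni-net" and the label values are placeholders, not anything this series requires; any CNI network configured on the Mesos agents can be used.

    import org.apache.spark.SparkConf

    // Hypothetical application configuration: attach executors (and, in cluster
    // mode, the driver) to a named CNI network and pass two labels to its plugins.
    val conf = new SparkConf()
      .set("spark.mesos.network.name", "my-cni-net")
      .set("spark.mesos.network.labels", "key1:val1,key2:val2")

The same two properties can also be supplied with --conf on spark-submit.
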
From dc09312a9d011e7d2d6c62c5b0ac7982284ab6aa Mon Sep 17 00:00:00 2001
From: "Susan X. Huynh"
Date: Fri, 11 Aug 2017 15:22:43 -0700
Subject: [PATCH 3/4] One more doc fix.

---
 .../src/main/scala/org/apache/spark/deploy/mesos/config.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index e9a8f449934b7..4b998d17798aa 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -82,7 +82,7 @@ package object config {
     ConfigBuilder("spark.mesos.network.labels")
       .doc("Network labels to pass to CNI plugins. This is a comma-separated list " +
         "of key-value pairs, where each key-value pair has the format key:value. " +
-        "Example: key1=val1,key2=val2")
+        "Example: key1:val1,key2:val2")
      .stringConf
      .createOptional
 }

From 38e3b4df33ac37c5e75224b607d3255ea8296f7b Mon Sep 17 00:00:00 2001
From: "Susan X. Huynh"
Date: Wed, 23 Aug 2017 15:13:17 -0700
Subject: [PATCH 4/4] Fixed style

---
 .../main/scala/org/apache/spark/deploy/mesos/config.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
index 4b998d17798aa..a5015b9243316 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala
@@ -56,7 +56,7 @@ package object config {
     .stringConf
     .createOptional
 
-  private [spark] val DRIVER_LABELS =
+  private[spark] val DRIVER_LABELS =
     ConfigBuilder("spark.mesos.driver.labels")
       .doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value " +
         "pairs should be separated by a colon, and commas used to list more than one." +
@@ -64,21 +64,21 @@ package object config {
     .stringConf
     .createOptional
 
-  private [spark] val DRIVER_FAILOVER_TIMEOUT =
+  private[spark] val DRIVER_FAILOVER_TIMEOUT =
     ConfigBuilder("spark.mesos.driver.failoverTimeout")
       .doc("Amount of time in seconds that the master will wait to hear from the driver, " +
         "during a temporary disconnection, before tearing down all the executors.")
       .doubleConf
       .createWithDefault(0.0)
 
-  private [spark] val NETWORK_NAME =
+  private[spark] val NETWORK_NAME =
     ConfigBuilder("spark.mesos.network.name")
      .doc("Attach containers to the given named network. If this job is launched " +
        "in cluster mode, also launch the driver in the given named network.")
      .stringConf
      .createOptional
 
-  private [spark] val NETWORK_LABELS =
+  private[spark] val NETWORK_LABELS =
     ConfigBuilder("spark.mesos.network.labels")
       .doc("Network labels to pass to CNI plugins. This is a comma-separated list " +
         "of key-value pairs, where each key-value pair has the format key:value. " +
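
Note: for reference, after the full series the NetworkInfo attached to a launched container should have roughly the shape built by hand below. This is an illustration using the placeholder values from the earlier sketch, not code from the patch; MesosSchedulerBackendUtil builds the equivalent from NETWORK_NAME and NETWORK_LABELS.

    import org.apache.mesos.Protos.{Label, Labels, NetworkInfo}

    // Expected shape for spark.mesos.network.name=my-cni-net and
    // spark.mesos.network.labels=key1:val1,key2:val2.
    val info = NetworkInfo.newBuilder()
      .setName("my-cni-net")
      .setLabels(Labels.newBuilder()
        .addLabels(Label.newBuilder().setKey("key1").setValue("val1"))
        .addLabels(Label.newBuilder().setKey("key2").setValue("val2")))
      .build()

Mesos hands these labels to the CNI plugins when it provisions the container's network.
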