diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index a42a928936a8c..e96c41a61b066 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -355,8 +355,12 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       }
     }
 
-    if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
-      appId == null) {
+    if (hostname == null) {
+      hostname = Utils.localHostName()
+      log.info(s"Executor hostname is not provided, will use '$hostname' to advertise itself")
+    }
+
+    if (driverUrl == null || executorId == null || cores <= 0 || appId == null) {
       printUsageAndExit(classNameForEntry)
     }
 
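Note on the hunk above: when `--hostname` is omitted, the executor backend now falls back to its own local hostname instead of failing argument validation. A minimal standalone sketch of that fallback, using `java.net.InetAddress` as a stand-in for Spark's internal `Utils.localHostName()`:

    // Sketch only: Utils.localHostName() is Spark-internal, InetAddress is a stand-in.
    var hostname: String = null // value when --hostname was not passed on the command line
    if (hostname == null) {
      hostname = java.net.InetAddress.getLocalHost.getHostName
      println(s"Executor hostname is not provided, will use '$hostname' to advertise itself")
    }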
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 89797159fac21..a54bca800a007 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -268,27 +268,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
         .getOrElse {
           throw new SparkException(s"Executor Spark home `$EXECUTOR_HOME` is not set!")
         }
-      val runScript = new File(executorSparkHome, "./bin/spark-class").getPath
-      command.setValue(
-        "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
-          .format(prefixEnv, runScript) +
-        s" --driver-url $driverURL" +
-        s" --executor-id $taskId" +
-        s" --hostname ${executorHostname(offer)}" +
-        s" --cores $numCores" +
-        s" --app-id $appId")
+      val executable = new File(executorSparkHome, "./bin/spark-class").getPath
+      val runScript = "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
+        .format(prefixEnv, executable)
+
+      command.setValue(buildExecutorCommand(runScript, taskId, numCores, offer))
     } else {
       // Grab everything to the first '.'. We'll use that and '*' to
       // glob the directory "correctly".
       val basename = uri.get.split('/').last.split('.').head
-      command.setValue(
-        s"cd $basename*; $prefixEnv " +
-        "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
-        s" --driver-url $driverURL" +
-        s" --executor-id $taskId" +
-        s" --hostname ${executorHostname(offer)}" +
-        s" --cores $numCores" +
-        s" --app-id $appId")
+      val runScript = s"cd $basename*; $prefixEnv " +
+        "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend"
+
+      command.setValue(buildExecutorCommand(runScript, taskId, numCores, offer))
 
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get).setCache(useFetcherCache))
     }
@@ -297,6 +289,28 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
     command.build()
   }
 
+  private def buildExecutorCommand(
+      runScript: String, taskId: String, numCores: Int, offer: Offer): String = {
+
+    val sb = new StringBuilder()
+      .append(runScript)
+      .append(" --driver-url ")
+      .append(driverURL)
+      .append(" --executor-id ")
+      .append(taskId)
+      .append(" --cores ")
+      .append(numCores)
+      .append(" --app-id ")
+      .append(appId)
+
+    if (sc.conf.get(NETWORK_NAME).isEmpty) {
+      sb.append(" --hostname ")
+      sb.append(offer.getHostname)
+    }
+
+    sb.toString()
+  }
+
   protected def driverURL: String = {
     if (conf.contains(IS_TESTING)) {
       "driverURL"
@@ -778,15 +792,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
   private def numExecutors(): Int = {
     slaves.values.map(_.taskIDs.size).sum
   }
-
-  private def executorHostname(offer: Offer): String = {
-    if (sc.conf.get(NETWORK_NAME).isDefined) {
-      // The agent's IP is not visible in a CNI container, so we bind to 0.0.0.0
-      "0.0.0.0"
-    } else {
-      offer.getHostname
-    }
-  }
 }
 
 private class Slave(val hostname: String) {
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index a217deb8b49d7..7b2f6a2535eda 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -152,7 +152,9 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
       .getOrElse(List.empty)
 
     if (containerType == ContainerInfo.Type.DOCKER) {
-      containerInfo.setDocker(dockerInfo(image, forcePullImage, portMaps, params))
+      containerInfo.setDocker(
+        dockerInfo(image, forcePullImage, portMaps, params, conf.get(NETWORK_NAME))
+      )
     } else {
       containerInfo.setMesos(mesosInfo(image, forcePullImage))
     }
@@ -262,13 +264,24 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
       image: String,
       forcePullImage: Boolean,
       portMaps: List[ContainerInfo.DockerInfo.PortMapping],
-      params: List[Parameter]): DockerInfo = {
+      params: List[Parameter],
+      networkName: Option[String]): DockerInfo = {
     val dockerBuilder = ContainerInfo.DockerInfo.newBuilder()
       .setImage(image)
       .setForcePullImage(forcePullImage)
     portMaps.foreach(dockerBuilder.addPortMappings(_))
     params.foreach(dockerBuilder.addParameters(_))
 
+    networkName.foreach { net =>
+      val network = Parameter.newBuilder()
+        .setKey("net")
+        .setValue(net)
+        .build()
+
+      dockerBuilder.setNetwork(DockerInfo.Network.USER)
+      dockerBuilder.addParameters(network)
+    }
+
     dockerBuilder.build
   }
 
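Why the refactor above: both launch paths (Spark-home based and URI based) previously interpolated `--hostname ${executorHostname(offer)}` inline, and the removed `executorHostname` helper substituted `0.0.0.0` under a CNI network because the agent's IP is not visible inside the container. The new `buildExecutorCommand` drops the flag entirely in that case, letting the executor self-resolve. A self-contained sketch of that logic, with the backend state (`driverURL`, `appId`, the configured network name) passed in as explicit parameters for illustration:

    // Sketch under the assumption that backend state is passed in explicitly;
    // in the patch these values come from the scheduler backend and SparkConf.
    def buildExecutorCommand(
        runScript: String,
        driverURL: String,
        taskId: String,
        numCores: Int,
        appId: String,
        agentHostname: String,
        networkName: Option[String]): String = {
      val sb = new StringBuilder()
        .append(runScript)
        .append(" --driver-url ").append(driverURL)
        .append(" --executor-id ").append(taskId)
        .append(" --cores ").append(numCores)
        .append(" --app-id ").append(appId)
      // With a virtual (CNI) network the agent hostname is not routable from
      // inside the container, so --hostname is omitted.
      if (networkName.isEmpty) {
        sb.append(" --hostname ").append(agentHostname)
      }
      sb.toString()
    }

    // Without a network name the flag is present; with one it is dropped:
    assert(buildExecutorCommand("run", "drv", "1", 4, "app", "agent-1", None).contains("--hostname"))
    assert(!buildExecutorCommand("run", "drv", "1", 4, "app", "agent-1", Some("cni-net")).contains("--hostname"))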
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 444a9d13bc077..f810da17e6c44 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -587,6 +587,30 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
   }
 
+  test("SPARK-28778 '--hostname' shouldn't be set for executor when virtual network is enabled") {
+    setBackend()
+    val (mem, cpu) = (backend.executorMemory(sc), 4)
+    val offer = createOffer("o1", "s1", mem, cpu)
+
+    assert(backend.createCommand(offer, cpu, "test").getValue.contains("--hostname"))
+    sc.stop()
+
+    setBackend(Map("spark.executor.uri" -> "hdfs://test/executor.jar"))
+    assert(backend.createCommand(offer, cpu, "test").getValue.contains("--hostname"))
+    sc.stop()
+
+    setBackend(Map("spark.mesos.network.name" -> "test"))
+    assert(!backend.createCommand(offer, cpu, "test").getValue.contains("--hostname"))
+    sc.stop()
+
+    setBackend(Map(
+      "spark.mesos.network.name" -> "test",
+      "spark.executor.uri" -> "hdfs://test/executor.jar"
+    ))
+    assert(!backend.createCommand(offer, cpu, "test").getValue.contains("--hostname"))
+    sc.stop()
+  }
+
   test("supports spark.scheduler.minRegisteredResourcesRatio") {
     val expectedCores = 1
     setBackend(Map(
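For reference, a driver-side configuration that would exercise the no-`--hostname` path covered by the test above. The master URL, network name, and image are placeholders, not values from the patch:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setMaster("mesos://zk://zk1:2181/mesos")                          // placeholder Mesos master URL
      .setAppName("cni-example")
      .set("spark.mesos.network.name", "my-cni-network")                 // triggers the new behavior
      .set("spark.mesos.executor.docker.image", "spark-executor:latest") // placeholder image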
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala
index 6b7ae900768ef..1fe1ecea4824e 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
+import org.apache.mesos.Protos.ContainerInfo.DockerInfo
+
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.mesos.config
 
@@ -31,7 +33,7 @@ class MesosSchedulerBackendUtilSuite extends SparkFunSuite {
       conf)
 
     val params = containerInfo.getDocker.getParametersList
-    assert(params.size() == 0)
+    assert(params.size() === 0)
   }
 
   test("ContainerInfo parses docker parameters") {
@@ -42,12 +44,28 @@ class MesosSchedulerBackendUtilSuite extends SparkFunSuite {
     val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(
       conf)
     val params = containerInfo.getDocker.getParametersList
-    assert(params.size() == 3)
-    assert(params.get(0).getKey == "a")
-    assert(params.get(0).getValue == "1")
-    assert(params.get(1).getKey == "b")
-    assert(params.get(1).getValue == "2")
-    assert(params.get(2).getKey == "c")
-    assert(params.get(2).getValue == "3")
+    assert(params.size() === 3)
+    assert(params.get(0).getKey === "a")
+    assert(params.get(0).getValue === "1")
+    assert(params.get(1).getKey === "b")
+    assert(params.get(1).getValue === "2")
+    assert(params.get(2).getKey === "c")
+    assert(params.get(2).getValue === "3")
+  }
+
+  test("SPARK-28778 ContainerInfo respects Docker network configuration") {
+    val networkName = "test"
+    val conf = new SparkConf()
+    conf.set(config.CONTAINERIZER, "docker")
+    conf.set(config.EXECUTOR_DOCKER_IMAGE, "image")
+    conf.set(config.NETWORK_NAME, networkName)
+
+    val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(conf)
+
+    assert(containerInfo.getDocker.getNetwork === DockerInfo.Network.USER)
+    val params = containerInfo.getDocker.getParametersList
+    assert(params.size() === 1)
+    assert(params.get(0).getKey === "net")
+    assert(params.get(0).getValue === networkName)
+  }
 }
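The `DockerInfo` assertions in the last test correspond to the builder calls the patch adds in `dockerInfo`. A sketch built directly against the Mesos protobufs, where the `net` parameter is forwarded by the Docker containerizer as `docker run --net=<name>`:

    import org.apache.mesos.Protos.Parameter
    import org.apache.mesos.Protos.ContainerInfo.DockerInfo

    val builder = DockerInfo.newBuilder()
      .setImage("image")
      .setForcePullImage(false)

    // Mirrors networkName.foreach { ... } from the patch, with "test" inlined.
    Option("test").foreach { net =>
      builder.setNetwork(DockerInfo.Network.USER) // user-defined network mode
      builder.addParameters(
        Parameter.newBuilder().setKey("net").setValue(net).build())
    }

    assert(builder.getNetwork == DockerInfo.Network.USER)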