Skip to content

Commit

Permalink
[SPARK-28778][MESOS] Fixed executors advertised address when running …
Browse files Browse the repository at this point in the history
…in virtual network

### What changes were proposed in this pull request?
Resolves [SPARK-28778: Shuffle jobs fail due to incorrect advertised address when running in a virtual network on Mesos](https://issues.apache.org/jira/browse/SPARK-28778).

This patch fixes a bug which occurs when shuffle jobs are launched by Mesos in a virtual network. The Mesos scheduler sets the executor `--hostname` parameter to `0.0.0.0` when `spark.mesos.network.name` is provided. This makes executors use `0.0.0.0` as their advertised address, and, in the presence of shuffle, executors fail to fetch shuffle blocks from each other because `0.0.0.0` is not a reachable origin address. When a virtual network is used, the hostname or IP address is not known upfront; it is assigned to a container at its start time, so the executor process needs to advertise the correct, dynamically assigned address to be reachable by other executors.

Changes:
- added a fallback to `Utils.localHostName()` in Spark Executors when `--hostname` is not provided
- removed setting executor address to `0.0.0.0` from Mesos scheduler
- refactored the code related to building executor command in Mesos scheduler
- added network configuration support to Docker containerizer
- added unit tests

### Why are the changes needed?
The bug described above prevents Mesos users from running any job that involves shuffle when a virtual network is used: executors advertise an incorrect address and therefore cannot fetch shuffle blocks from each other.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
- added a unit test to `MesosCoarseGrainedSchedulerBackendSuite` which verifies the absence of the `--hostname` parameter when `spark.mesos.network.name` is provided and its presence otherwise
- added a unit test to `MesosSchedulerBackendUtilSuite` which verifies that `MesosSchedulerBackendUtil.buildContainerInfo` sets network-related properties for the Docker containerizer
- unit tests from this repo launched with profiles: `./build/mvn test -Pmesos -Pnetlib-lgpl -Psparkr -Phive -Phive-thriftserver`, build log attached: [mvn.test.log](https://github.com/apache/spark/files/3516891/mvn.test.log)
- integration tests from [DCOS Spark repo](https://github.com/mesosphere/spark-build), more specifically - [test_spark_cni.py](https://github.com/mesosphere/spark-build/blob/master/tests/test_spark_cni.py) which runs a specific [shuffle job](https://github.com/mesosphere/spark-build/blob/master/tests/jobs/scala/src/main/scala/ShuffleApp.scala) and verifies its successful completion, Mesos task network configuration, and IP addresses for both Mesos and Docker containerizers

Closes #25500 from akirillov/DCOS-45840-fix-advertised-ip-in-virtual-networks.

Authored-by: Anton Kirillov <akirillov@mesosophere.io>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
  • Loading branch information
Anton Kirillov authored and dongjoon-hyun committed Aug 24, 2019
1 parent 573b1cb commit f17f1d0
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 38 deletions.
Expand Up @@ -355,8 +355,12 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
}
}

if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
appId == null) {
if (hostname == null) {
hostname = Utils.localHostName()
log.info(s"Executor hostname is not provided, will use '$hostname' to advertise itself")
}

if (driverUrl == null || executorId == null || cores <= 0 || appId == null) {
printUsageAndExit(classNameForEntry)
}

Expand Down
Expand Up @@ -268,27 +268,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
.getOrElse {
throw new SparkException(s"Executor Spark home `$EXECUTOR_HOME` is not set!")
}
val runScript = new File(executorSparkHome, "./bin/spark-class").getPath
command.setValue(
"%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
.format(prefixEnv, runScript) +
s" --driver-url $driverURL" +
s" --executor-id $taskId" +
s" --hostname ${executorHostname(offer)}" +
s" --cores $numCores" +
s" --app-id $appId")
val executable = new File(executorSparkHome, "./bin/spark-class").getPath
val runScript = "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
.format(prefixEnv, executable)

command.setValue(buildExecutorCommand(runScript, taskId, numCores, offer))
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.get.split('/').last.split('.').head
command.setValue(
s"cd $basename*; $prefixEnv " +
"./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
s" --driver-url $driverURL" +
s" --executor-id $taskId" +
s" --hostname ${executorHostname(offer)}" +
s" --cores $numCores" +
s" --app-id $appId")
val runScript = s"cd $basename*; $prefixEnv " +
"./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend"

command.setValue(buildExecutorCommand(runScript, taskId, numCores, offer))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get).setCache(useFetcherCache))
}

Expand All @@ -297,6 +289,28 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
command.build()
}

/**
 * Assembles the complete executor launch command line.
 *
 * The driver URL, executor id, core count and application id are appended to `runScript`.
 * The `--hostname` option (taken from the offer) is appended only when `NETWORK_NAME`
 * is absent from the configuration.
 *
 * @param runScript command prefix that starts the executor backend
 * @param taskId value passed as `--executor-id`
 * @param numCores value passed as `--cores`
 * @param offer offer whose hostname is used for `--hostname`
 * @return the full command string
 */
private def buildExecutorCommand(
    runScript: String, taskId: String, numCores: Int, offer: Offer): String = {

  val baseCommand = s"$runScript --driver-url $driverURL --executor-id $taskId" +
    s" --cores $numCores --app-id $appId"

  // The hostname flag is emitted only if no network name is present in the configuration.
  val hostnameArg = sc.conf.get(NETWORK_NAME) match {
    case None => s" --hostname ${offer.getHostname}"
    case Some(_) => ""
  }

  baseCommand + hostnameArg
}

protected def driverURL: String = {
if (conf.contains(IS_TESTING)) {
"driverURL"
Expand Down Expand Up @@ -778,15 +792,6 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
/** Adds up the number of task IDs held by every entry in `slaves`. */
private def numExecutors(): Int = {
  slaves.values.foldLeft(0) { (count, slave) => count + slave.taskIDs.size }
}

/**
 * Chooses the value handed to the executor's `--hostname` option for the given offer:
 * `0.0.0.0` when `NETWORK_NAME` is configured, the offer's hostname otherwise.
 */
private def executorHostname(offer: Offer): String = {
  sc.conf.get(NETWORK_NAME) match {
    // The agent's IP is not visible in a CNI container, so we bind to 0.0.0.0
    case Some(_) => "0.0.0.0"
    case None => offer.getHostname
  }
}
}

private class Slave(val hostname: String) {
Expand Down
Expand Up @@ -152,7 +152,9 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
.getOrElse(List.empty)

if (containerType == ContainerInfo.Type.DOCKER) {
containerInfo.setDocker(dockerInfo(image, forcePullImage, portMaps, params))
containerInfo.setDocker(
dockerInfo(image, forcePullImage, portMaps, params, conf.get(NETWORK_NAME))
)
} else {
containerInfo.setMesos(mesosInfo(image, forcePullImage))
}
Expand Down Expand Up @@ -262,13 +264,24 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
image: String,
forcePullImage: Boolean,
portMaps: List[ContainerInfo.DockerInfo.PortMapping],
params: List[Parameter]): DockerInfo = {
params: List[Parameter],
networkName: Option[String]): DockerInfo = {
val dockerBuilder = ContainerInfo.DockerInfo.newBuilder()
.setImage(image)
.setForcePullImage(forcePullImage)
portMaps.foreach(dockerBuilder.addPortMappings(_))
params.foreach(dockerBuilder.addParameters(_))

networkName.foreach { net =>
val network = Parameter.newBuilder()
.setKey("net")
.setValue(net)
.build()

dockerBuilder.setNetwork(DockerInfo.Network.USER)
dockerBuilder.addParameters(network)
}

dockerBuilder.build
}

Expand Down
Expand Up @@ -587,6 +587,30 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(networkInfos.get(0).getLabels.getLabels(1).getValue == "val2")
}

test("SPARK-28778 '--hostname' shouldn't be set for executor when virtual network is enabled") {
  setBackend()
  val (mem, cpu) = (backend.executorMemory(sc), 4)
  val offer = createOffer("o1", "s1", mem, cpu)

  // Builds the executor command with whichever backend is currently set up and reports
  // whether the hostname option appears in it. Declared as a `def` so that `backend` is
  // read again on every call.
  def commandHasHostname: Boolean =
    backend.createCommand(offer, cpu, "test").getValue.contains("--hostname")

  // No network name configured: the option must be present.
  assert(commandHasHostname)
  sc.stop()

  // Same expectation when an executor URI is configured.
  setBackend(Map("spark.executor.uri" -> "hdfs://test/executor.jar"))
  assert(commandHasHostname)
  sc.stop()

  // Network name configured: the option must be absent.
  setBackend(Map("spark.mesos.network.name" -> "test"))
  assert(!commandHasHostname)
  sc.stop()

  // Network name together with an executor URI: still absent.
  setBackend(Map(
    "spark.mesos.network.name" -> "test",
    "spark.executor.uri" -> "hdfs://test/executor.jar"
  ))
  assert(!commandHasHostname)
  sc.stop()
}

test("supports spark.scheduler.minRegisteredResourcesRatio") {
val expectedCores = 1
setBackend(Map(
Expand Down
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.scheduler.cluster.mesos

import org.apache.mesos.Protos.ContainerInfo.DockerInfo

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.mesos.config

Expand All @@ -31,7 +33,7 @@ class MesosSchedulerBackendUtilSuite extends SparkFunSuite {
conf)
val params = containerInfo.getDocker.getParametersList

assert(params.size() == 0)
assert(params.size() === 0)
}

test("ContainerInfo parses docker parameters") {
Expand All @@ -42,12 +44,28 @@ class MesosSchedulerBackendUtilSuite extends SparkFunSuite {
val containerInfo = MesosSchedulerBackendUtil.buildContainerInfo(
conf)
val params = containerInfo.getDocker.getParametersList
assert(params.size() == 3)
assert(params.get(0).getKey == "a")
assert(params.get(0).getValue == "1")
assert(params.get(1).getKey == "b")
assert(params.get(1).getValue == "2")
assert(params.get(2).getKey == "c")
assert(params.get(2).getValue == "3")
assert(params.size() === 3)
assert(params.get(0).getKey === "a")
assert(params.get(0).getValue === "1")
assert(params.get(1).getKey === "b")
assert(params.get(1).getValue === "2")
assert(params.get(2).getKey === "c")
assert(params.get(2).getValue === "3")
}

test("SPARK-28778 ContainerInfo respects Docker network configuration") {
  val networkName = "test"
  // Docker containerizer with an image and a network name configured.
  val conf = new SparkConf()
    .set(config.CONTAINERIZER, "docker")
    .set(config.EXECUTOR_DOCKER_IMAGE, "image")
    .set(config.NETWORK_NAME, networkName)

  val docker = MesosSchedulerBackendUtil.buildContainerInfo(conf).getDocker

  // The Docker info must use the USER network mode ...
  assert(docker.getNetwork === DockerInfo.Network.USER)

  // ... and carry exactly one parameter, `net`, holding the configured network name.
  val dockerParams = docker.getParametersList
  assert(dockerParams.size() === 1)
  val netParam = dockerParams.get(0)
  assert(netParam.getKey === "net")
  assert(netParam.getValue === networkName)
}
}

0 comments on commit f17f1d0

Please sign in to comment.