From 73f9df224246e04163847419ef082278014c6a2a Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Thu, 18 Jan 2018 22:05:32 -0800
Subject: [PATCH] [SPARK-12963][CORE] NM host for driver end points

---
 .../org/apache/spark/ui/JettyUtils.scala       |  8 +++--
 .../spark/deploy/yarn/YarnRMClient.scala       |  4 +--
 .../spark/deploy/yarn/YarnClusterSuite.scala   | 34 ++++++++++++++++++-
 3 files changed, 40 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
index 0adeb4058b6e4..b039160522b03 100644
--- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
@@ -342,13 +342,15 @@ private[spark] object JettyUtils extends Logging {
         -1,
         -1,
         connectionFactories: _*)
+      connector.setHost(hostName)
       connector.setPort(port)
-      connector.start()
-
       // Currently we only use "SelectChannelConnector"
       // Limit the max acceptor number to 8 so that we don't waste a lot of threads
       connector.setAcceptQueueSize(math.min(connector.getAcceptors, 8))
-      connector.setHost(hostName)
+      // Done with connector configuration, start it
+
+      connector.start()
+
       // The number of selectors always equals to the number of acceptors
       minThreads += connector.getAcceptors * 2
 
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index c1ae12aabb8cc..6e4744915a257 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -28,8 +28,8 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
 import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.util.Utils
 
 /**
  * Handles registering and unregistering the application with the YARN ResourceManager.
@@ -71,7 +71,7 @@ private[spark] class YarnRMClient extends Logging {
 
     logInfo("Registering the ApplicationMaster")
     synchronized {
-      amClient.registerApplicationMaster(Utils.localHostName(), 0, trackingUrl)
+      amClient.registerApplicationMaster(sparkConf.get(DRIVER_BIND_ADDRESS), 0, trackingUrl)
       registered = true
     }
     new YarnAllocator(driverUrl, driverRef, conf, sparkConf, amClient, getAttemptId(), securityMgr,
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index a129be7c06b53..34f5e8bf9a9de 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -18,7 +18,6 @@ package org.apache.spark.deploy.yarn
 
 import java.io.File
-import java.net.URL
 import java.nio.charset.StandardCharsets
 import java.util.{HashMap => JHashMap}
 
@@ -136,6 +135,39 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, result)
   }
 
+  private def testClusterDriverBind(
+      uiEnabled: Boolean,
+      localHost: String,
+      localIp: String,
+      success: Boolean): Unit = {
+    val result = File.createTempFile("result", null, tempDir)
+    val finalState = runSpark(false, mainClassName(YarnClusterDriver.getClass),
+      appArgs = Seq(result.getAbsolutePath()),
+      extraConf = Map(
+        "spark.yarn.appMasterEnv.SPARK_LOCAL_HOSTNAME" -> localHost,
+        "spark.yarn.appMasterEnv.SPARK_LOCAL_IP" -> localIp,
+        "spark.ui.enabled" -> uiEnabled.toString
+      ))
+    if (success) {
+      checkResult(finalState, result, "success")
+    } else {
+      finalState should be (SparkAppHandle.State.FAILED)
+    }
+  }
+
+  test("yarn-cluster driver should be able to bind listeners to NM_HOST") {
+    testClusterDriverBind(uiEnabled = true, "$NM_HOST", "$NM_HOST", success = true)
+  }
+
+  private val unbindableIP = "1.1.1.1"
+  test("yarn-cluster driver works when SPARK_LOCAL_IP is invalid without UI") {
+    testClusterDriverBind(uiEnabled = false, "$NM_HOST", unbindableIP, success = true)
+  }
+
+  test("yarn-cluster driver fails when SPARK_LOCAL_IP is invalid with UI") {
+    testClusterDriverBind(uiEnabled = true, "$NM_HOST", unbindableIP, success = false)
+  }
+
   test("run Spark in yarn-client mode with additional jar") {
     testWithAddJar(true)
   }