From 6e22d999345af3616203a5da1112e36aef1b68a0 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Fri, 27 Mar 2015 15:33:23 -0700 Subject: [PATCH 1/2] [YARN] SPARK-6470. Add support for YARN node labels. --- docs/running-on-yarn.md | 9 ++++++ .../spark/deploy/yarn/YarnAllocator.scala | 31 ++++++++++++++++++- 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index 4fb4a90307ec8..51c1339165024 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -220,6 +220,15 @@ Most of the configs are the same for Spark on YARN as for other deployment modes Otherwise, the client process will exit after submission. + + spark.yarn.executor.nodeLabelExpression + (none) + + A YARN node label expression that restricts the set of nodes executors will be scheduled on. + Only versions of YARN greater than or equal to 2.6 support node label expressions, so when + running against earlier versions, this property will be ignored. + + # Launching Spark on YARN diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 88d68d5556162..97f05d263d314 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -117,6 +117,24 @@ private[yarn] class YarnAllocator( // For testing private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true) + private val labelExpression = sparkConf.getOption("spark.yarn.executor.nodeLabelExpression") + + // ContainerRequest constructor that can take a node label expression. We grab it through + // reflection because it's only available in later versions of YARN. + private val nodeLabelConstructor = labelExpression.flatMap { expr => + try { + Some(classOf[ContainerRequest].getConstructor(classOf[Resource], + classOf[Array[String]], classOf[Array[String]], classOf[Priority], classOf[Boolean], + classOf[String])) + } catch { + case e: NoSuchMethodException => { + logInfo(s"Node label expression $expr will be ignored because YARN version on classpath" + + " does not support it.") + None + } + } + } + def getNumExecutorsRunning: Int = numExecutorsRunning def getNumExecutorsFailed: Int = numExecutorsFailed @@ -211,7 +229,7 @@ private[yarn] class YarnAllocator( s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead") for (i <- 0 until missing) { - val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY) + val request = createContainerRequest(resource) amClient.addContainerRequest(request) val nodes = request.getNodes val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last @@ -230,6 +248,17 @@ private[yarn] class YarnAllocator( } } + /** + * Creates a container request, handling the reflection required to use YARN features that were + * added in recent versions. + */ + private def createContainerRequest(resource: Resource): ContainerRequest = { + nodeLabelConstructor.map { constructor => + constructor.newInstance(resource, null, null, RM_REQUEST_PRIORITY, true: java.lang.Boolean, + labelExpression.orNull) + }.getOrElse(new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)) + } + /** * Handle containers granted by the RM by launching executors on them. * From 6af87b9b8ab0cc0b79658d29a440e4c975d9b558 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Mon, 11 May 2015 10:08:05 -0700 Subject: [PATCH 2/2] Change info to warning --- .../scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 97f05d263d314..8a08f561a2df2 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -128,8 +128,8 @@ private[yarn] class YarnAllocator( classOf[String])) } catch { case e: NoSuchMethodException => { - logInfo(s"Node label expression $expr will be ignored because YARN version on classpath" + - " does not support it.") + logWarning(s"Node label expression $expr will be ignored because YARN version on" + + " classpath does not support it.") None } }