From 82eff3061c696c3f97b6da83be476f9a5e096861 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 18 Nov 2015 09:36:40 +0800 Subject: [PATCH 1/4] Add label expression support for AM --- .../org/apache/spark/deploy/yarn/Client.scala | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index a3f33d80184a3..cb81eaccade1e 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -225,7 +225,26 @@ private[spark] class Client( val capability = Records.newRecord(classOf[Resource]) capability.setMemory(args.amMemory + amMemoryOverhead) capability.setVirtualCores(args.amCores) - appContext.setResource(capability) + + val amRequest = Records.newRecord(classOf[ResourceRequest]) + amRequest.setResourceName(ResourceRequest.ANY) + amRequest.setPriority(Priority.newInstance(0)) + amRequest.setCapability(capability) + amRequest.setNumContainers(1) + amRequest.setRelaxLocality(true) + if (sparkConf.contains("spark.yarn.am.nodeLabelExpression")) { + try { + val amLabelExpression = sparkConf.get("spark.yarn.am.nodeLabelExpression") + val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String]) + method.invoke(amRequest, amLabelExpression) + } catch { + case e: NoSuchMethodException => + logWarning("Ignoring spark.yarn.am.nodeLabelExpression because the version " + + "of YARN does not support it") + } + } + appContext.setAMContainerResourceRequest(amRequest) + appContext } From 1abbe52b4e815a647d9e23362fc3313cf70d9ea0 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 18 Nov 2015 15:51:42 +0800 Subject: [PATCH 2/4] Add doc --- docs/running-on-yarn.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index db6bfa69ee0fe..925a1e0ba6fcf 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -326,6 +326,15 @@ If you need a reference to the proper location to put log files in the YARN so t Otherwise, the client process will exit after submission. + + spark.yarn.am.nodeLabelExpression + (none) + + A YARN node label expression that restricts the set of nodes AM will be scheduled on. + Only versions of YARN greater than or equal to 2.6 support node label expressions, so when + running against earlier versions, this property will be ignored. + + spark.yarn.executor.nodeLabelExpression (none) From ea1564f27c1995c8b8de4b61f6d0ae969141df0c Mon Sep 17 00:00:00 2001 From: jerryshao Date: Wed, 18 Nov 2015 16:53:58 +0800 Subject: [PATCH 3/4] Fix compilation error --- .../org/apache/spark/deploy/yarn/Client.scala | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index cb81eaccade1e..88d3998d20ad1 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -226,24 +226,30 @@ private[spark] class Client( capability.setMemory(args.amMemory + amMemoryOverhead) capability.setVirtualCores(args.amCores) - val amRequest = Records.newRecord(classOf[ResourceRequest]) - amRequest.setResourceName(ResourceRequest.ANY) - amRequest.setPriority(Priority.newInstance(0)) - amRequest.setCapability(capability) - amRequest.setNumContainers(1) - amRequest.setRelaxLocality(true) if (sparkConf.contains("spark.yarn.am.nodeLabelExpression")) { try { + val amRequest = Records.newRecord(classOf[ResourceRequest]) + amRequest.setResourceName(ResourceRequest.ANY) + amRequest.setPriority(Priority.newInstance(0)) + amRequest.setCapability(capability) + amRequest.setNumContainers(1) + amRequest.setRelaxLocality(true) val amLabelExpression = sparkConf.get("spark.yarn.am.nodeLabelExpression") val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String]) method.invoke(amRequest, amLabelExpression) + + val setResourceRequestMethod = + appContext.getClass.getMethod("setAMContainerResourceRequest", classOf[ResourceRequest]) + setResourceRequestMethod.invoke(appContext, amRequest) } catch { case e: NoSuchMethodException => logWarning("Ignoring spark.yarn.am.nodeLabelExpression because the version " + "of YARN does not support it") + appContext.setResource(capability) } + } else { + appContext.setResource(capability) } - appContext.setAMContainerResourceRequest(amRequest) appContext } From 6f6d1c55167afe113f6f571d790ca9bd2d92d224 Mon Sep 17 00:00:00 2001 From: jerryshao Date: Fri, 20 Nov 2015 16:13:42 +0800 Subject: [PATCH 4/4] Remove unnecessary parameter --- yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 88d3998d20ad1..59db835712ce3 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -233,7 +233,6 @@ private[spark] class Client( amRequest.setPriority(Priority.newInstance(0)) amRequest.setCapability(capability) amRequest.setNumContainers(1) - amRequest.setRelaxLocality(true) val amLabelExpression = sparkConf.get("spark.yarn.am.nodeLabelExpression") val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String]) method.invoke(amRequest, amLabelExpression)