8 changes: 8 additions & 0 deletions docs/running-on-yarn.md
@@ -319,6 +319,14 @@ If you need a reference to the proper location to put log files in the YARN so t
running against earlier versions, this property will be ignored.
</td>
</tr>
<tr>
<td><code>spark.yarn.tags</code></td>
<td>(none)</td>
<td>
Comma-separated list of strings to pass through as YARN application tags appearing
in YARN ApplicationReports, which can be used for filtering when querying YARN apps.
</td>
</tr>
<tr>
<td><code>spark.yarn.keytab</code></td>
<td>(none)</td>
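As a quick illustration of the new property, a hypothetical application might set it like this (the tag values here are invented, not from the PR):

import org.apache.spark.SparkConf

// Hypothetical usage: attach two tags to a Spark-on-YARN application.
// YARN records these in the ApplicationReport, so tooling that queries the
// ResourceManager can filter applications by tag.
val conf = new SparkConf()
  .setAppName("tagged-app")
  .set("spark.yarn.tags", "nightly-etl,team-data")  // comma-separated; whitespace is trimmed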
21 changes: 21 additions & 0 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -163,6 +163,23 @@ private[spark] class Client(
    appContext.setQueue(args.amQueue)
    appContext.setAMContainerSpec(containerContext)
    appContext.setApplicationType("SPARK")
    sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
Contributor:

Not a big deal, but I'd flip this around and do something like:

Try(appContext.getClass().getMethod("setApplicationTags", classOf[java.util.Set[String]])).foreach { setTagsMethod =>
  ...
}

That way you print the log message only once, look up the method only once, and skip all the processing of the option if the method is not available.

Contributor:

BTW if you do this you're probably better off doing:

Try(...) match {
  case Success(method) => ...
  case Failure(e) => ...
}
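
Spelled out, the combined suggestion might look roughly like the following sketch. This is only an illustration of the reviewers' idea, not code from the PR; it assumes the surrounding Client context (appContext, sparkConf, CONF_SPARK_YARN_APPLICATION_TAGS, logWarning):

import scala.util.{Failure, Success, Try}

// Invert the flow: look up the method once, then process the option only on success.
Try(appContext.getClass().getMethod(
    "setApplicationTags", classOf[java.util.Set[String]])) match {
  case Success(setTagsMethod) =>
    sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
      .map(StringUtils.getTrimmedStringCollection(_))
      .filter(!_.isEmpty())
      .foreach { tags =>
        setTagsMethod.invoke(appContext, new java.util.HashSet[String](tags))
      }
  case Failure(_) =>
    // Warn only if tags were actually requested on a YARN version without support.
    if (sparkConf.contains(CONF_SPARK_YARN_APPLICATION_TAGS)) {
      logWarning(s"Ignoring $CONF_SPARK_YARN_APPLICATION_TAGS because this version of " +
        "YARN does not support it")
    }
}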

Contributor Author:

Thanks for the suggestion! I'm a bit rusty in Scala, so I could be misapplying some idioms here, but my intent on this line was to use foreach and friends only to access the "None or single element" of the Option, as described at http://www.scala-lang.org/api/current/index.html#scala.Option. The foreach here iterates only over the Option's single element, which happens to itself be a collection, rather than over the elements of the inner collection. So there shouldn't be any way to trigger multiple log statements or multiple reflection-based lookups of the method.

I also wanted to err on the side of minimizing behavioral changes for existing setups: if the Option is None, this monadic chaining avoids ever looking up the method, or even invoking any of the option-processing helpers like StringUtils.getTrimmedStringCollection.

I could add a note to make it clearer that the map/filter/foreach is on the Option, as opposed to the Collection, if that'd help.

Anyhow, I'll be happy to flip it around to start with the method lookup as you suggest if you prefer; I just wanted to hear your thoughts on this Option usage first.
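
A minimal standalone illustration of that point (not from the PR; the values are invented):

// foreach on an Option runs its body at most once, on the Option's single
// value, even when that value is itself a collection.
val someTags: Option[java.util.Collection[String]] =
  Some(java.util.Arrays.asList("tag1", "tag2"))
someTags.foreach { tags =>
  println(s"invoked once with ${tags.size} tags")  // prints exactly once
}

// With None, the chained map/filter/foreach are all no-ops, so nothing
// downstream (method lookup, string parsing) ever runs.
val noTags: Option[java.util.Collection[String]] = None
noTags.foreach(tags => println(s"never printed: $tags"))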

Contributor:

Ah, I see. setApplicationTags takes a Set[String], not a single string, so you're not calling it once per tag in the collection. My bad. Current code is fine.

      .map(StringUtils.getTrimmedStringCollection(_))
      .filter(!_.isEmpty())
      .foreach { tagCollection =>
        try {
          // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use
          // reflection to set it, printing a warning if a tag was specified but the YARN version
          // doesn't support it.
          val method = appContext.getClass().getMethod(
Contributor:

Add a comment here mentioning why this needs to be called via reflection

Contributor Author:

Done.

"setApplicationTags", classOf[java.util.Set[String]])
method.invoke(appContext, new java.util.HashSet[String](tagCollection))
} catch {
case e: NoSuchMethodException =>
logWarning(s"Ignoring $CONF_SPARK_YARN_APPLICATION_TAGS because this version of " +
"YARN does not support it")
}
}
Contributor:

Using reflection makes the code more difficult to read; it's generally not recommended.

sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match {
case Some(v) => appContext.setMaxAppAttempts(v)
case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
@@ -982,6 +999,10 @@ object Client extends Logging {
  // of the executors
  val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"

  // Comma-separated list of strings to pass through as YARN application tags appearing
  // in YARN ApplicationReports, which can be used for filtering when querying YARN.
  val CONF_SPARK_YARN_APPLICATION_TAGS = "spark.yarn.tags"

  // Staging directory is private! -> rwx--------
  val STAGING_DIR_PERMISSION: FsPermission =
    FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
36 changes: 36 additions & 0 deletions yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -29,8 +29,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.YarnClientApplication
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, Matchers}
@@ -170,6 +173,39 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
    cp should contain ("/remotePath/my1.jar")
  }

  test("configuration and args propagate through createApplicationSubmissionContext") {
Contributor:

Have you verified that this test passes both against versions of YARN that support app tags and against versions that do not?

Contributor Author:

Indeed I did before sending out the first PR:

$ sbt/sbt -Pyarn -Phadoop-2.3 -Phive -Phive-thriftserver yarn/test test-only
...
[info] Compiling 1 Scala source to /home/dhuo/github/dhuo_spark2/spark/yarn/target/scala-2.10/test-classes...
[info] YarnAllocatorSuite:
[info] - single container allocated (667 milliseconds)
[info] - some containers allocated (221 milliseconds)
[info] - receive more containers than requested (200 milliseconds)
[info] - decrease total requested executors (17 milliseconds)
[info] - decrease total requested executors to less than currently running (13 milliseconds)
[info] - kill executors (25 milliseconds)
[info] - lost executor removed from backend (26 milliseconds)
[info] - memory exceeded diagnostic regexes (5 milliseconds)
[info] YarnClusterSuite:
[info] - run Spark in yarn-client mode (14 seconds, 642 milliseconds)
[info] - run Spark in yarn-cluster mode (13 seconds, 292 milliseconds)
[info] - run Spark in yarn-cluster mode unsuccessfully (10 seconds, 150 milliseconds)
[info] - run Python application in yarn-client mode (16 seconds, 129 milliseconds)
[info] - run Python application in yarn-cluster mode (17 seconds, 323 milliseconds)
[info] - user class path first in client mode (13 seconds, 628 milliseconds)
[info] - user class path first in cluster mode (13 seconds, 120 milliseconds)
[info] ContainerPlacementStrategySuite:
[info] - allocate locality preferred containers with enough resource and no matched existed containers (838 milliseconds)
[info] - allocate locality preferred containers with enough resource and partially matched containers (21 milliseconds)
[info] - allocate locality preferred containers with limited resource and partially matched containers (16 milliseconds)
[info] - allocate locality preferred containers with fully matched containers (14 milliseconds)
[info] - allocate containers with no locality preference (16 milliseconds)
[info] YarnSparkHadoopUtilSuite:
[info] - shell script escaping (17 milliseconds)
[info] - Yarn configuration override (53 milliseconds)
[info] - test getApplicationAclsForYarn acls on (10 milliseconds)
[info] - test getApplicationAclsForYarn acls on and specify users (12 milliseconds)
[info] - test expandEnvironment result (4 milliseconds)
[info] - test getClassPathSeparator result (6 milliseconds)
[info] - check access nns empty (20 milliseconds)
[info] - check access nns unset (18 milliseconds)
[info] - check access nns (16 milliseconds)
[info] - check access nns space (17 milliseconds)
[info] - check access two nns (16 milliseconds)
[info] - check token renewer (206 milliseconds)
[info] - check token renewer default (32 milliseconds)
[info] ClientSuite:
[info] - default Yarn application classpath (29 milliseconds)
[info] - default MR application classpath (3 milliseconds)
[info] - resultant classpath for an application that defines a classpath for YARN (16 milliseconds)
[info] - resultant classpath for an application that defines a classpath for MR (11 milliseconds)
[info] - resultant classpath for an application that defines both classpaths, YARN and MR (9 milliseconds)
[info] - Local jar URIs (40 milliseconds)
[info] - Jar path propagation through SparkConf (115 milliseconds)
[info] - Cluster path translation (14 milliseconds)
[info] - configuration and args propagate through createApplicationSubmissionContext (8 milliseconds)
[info] ClientDistributedCacheManagerSuite:
[info] - test getFileStatus empty (24 milliseconds)
[info] - test getFileStatus cached (1 millisecond)
[info] - test addResource (10 milliseconds)
[info] - test addResource link null (3 milliseconds)
[info] - test addResource appmaster only (1 millisecond)
[info] - test addResource archive (7 milliseconds)
[info] ScalaTest
[info] Run completed in 2 minutes.
[info] Total number of tests run: 48
[info] Suites: completed 6, aborted 0
[info] Tests: succeeded 48, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[info] Passed: Total 48, Failed 0, Errors 0, Passed 48
[success] Total time: 136 s, completed Aug 7, 2015 6:47:43 PM

$ sbt/sbt -Pyarn -Phadoop-2.4 -Phive -Phive-thriftserver yarn/test
...
[info] YarnAllocatorSuite:
[info] - single container allocated (1 second, 508 milliseconds)
[info] - some containers allocated (163 milliseconds)
[info] - receive more containers than requested (160 milliseconds)
[info] - decrease total requested executors (26 milliseconds)
[info] - decrease total requested executors to less than currently running (13 milliseconds)
[info] - kill executors (96 milliseconds)
[info] - lost executor removed from backend (48 milliseconds)
[info] - memory exceeded diagnostic regexes (13 milliseconds)
[info] YarnClusterSuite:
[info] - run Spark in yarn-client mode (22 seconds, 554 milliseconds)
[info] - run Spark in yarn-cluster mode (15 seconds, 772 milliseconds)
[info] - run Spark in yarn-cluster mode unsuccessfully (10 seconds, 285 milliseconds)
[info] - run Python application in yarn-client mode (18 seconds, 747 milliseconds)
[info] - run Python application in yarn-cluster mode (16 seconds, 356 milliseconds)
[info] - user class path first in client mode (13 seconds, 841 milliseconds)
[info] - user class path first in cluster mode (14 seconds, 344 milliseconds)
[info] ContainerPlacementStrategySuite:
[info] - allocate locality preferred containers with enough resource and no matched existed containers (719 milliseconds)
[info] - allocate locality preferred containers with enough resource and partially matched containers (14 milliseconds)
[info] - allocate locality preferred containers with limited resource and partially matched containers (15 milliseconds)
[info] - allocate locality preferred containers with fully matched containers (14 milliseconds)
[info] - allocate containers with no locality preference (16 milliseconds)
[info] YarnSparkHadoopUtilSuite:
[info] - shell script escaping (18 milliseconds)
[info] - Yarn configuration override (58 milliseconds)
[info] - test getApplicationAclsForYarn acls on (11 milliseconds)
[info] - test getApplicationAclsForYarn acls on and specify users (17 milliseconds)
[info] - test expandEnvironment result (9 milliseconds)
[info] - test getClassPathSeparator result (5 milliseconds)
[info] - check access nns empty (22 milliseconds)
[info] - check access nns unset (21 milliseconds)
[info] - check access nns (22 milliseconds)
[info] - check access nns space (17 milliseconds)
[info] - check access two nns (18 milliseconds)
[info] - check token renewer (176 milliseconds)
[info] - check token renewer default (82 milliseconds)
[info] ClientSuite:
[info] - default Yarn application classpath (293 milliseconds)
[info] - default MR application classpath (5 milliseconds)
[info] - resultant classpath for an application that defines a classpath for YARN (50 milliseconds)
[info] - resultant classpath for an application that defines a classpath for MR (19 milliseconds)
[info] - resultant classpath for an application that defines both classpaths, YARN and MR (16 milliseconds)
[info] - Local jar URIs (87 milliseconds)
[info] - Jar path propagation through SparkConf (172 milliseconds)
[info] - Cluster path translation (12 milliseconds)
[info] - configuration and args propagate through createApplicationSubmissionContext (62 milliseconds)
[info] ClientDistributedCacheManagerSuite:
[info] - test getFileStatus empty (30 milliseconds)
[info] - test getFileStatus cached (0 milliseconds)
[info] - test addResource (12 milliseconds)
[info] - test addResource link null (3 milliseconds)
[info] - test addResource appmaster only (1 millisecond)
[info] - test addResource archive (14 milliseconds)
[info] ScalaTest
[info] Run completed in 2 minutes, 18 seconds.
[info] Total number of tests run: 48
[info] Suites: completed 6, aborted 0
[info] Tests: succeeded 48, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[info] Passed: Total 48, Failed 0, Errors 0, Passed 48
[success] Total time: 156 s, completed Aug 10, 2015 10:58:20 AM

Kicking off a new round of build/tests locally now after the changes.

Contributor Author:

Tests were green with both a 2.3 and 2.4 build. Thanks for the help reviewing!

Contributor Author:

Console output from my test runs here: https://gist.github.com/dennishuo/79212ba4f8a3bfb71227

    val conf = new Configuration()
    // When parsing tags, duplicates and leading/trailing whitespace should be removed.
    // Spaces between non-comma strings should be preserved as single tags. Empty strings may or
    // may not be removed depending on the version of Hadoop being used.
    val sparkConf = new SparkConf()
      .set(Client.CONF_SPARK_YARN_APPLICATION_TAGS, ",tag1, dup,tag2 , ,multi word , dup")
      .set("spark.yarn.maxAppAttempts", "42")
    val args = new ClientArguments(Array(
      "--name", "foo-test-app",
      "--queue", "staging-queue"), sparkConf)

    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
    val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
    val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])

    val client = new Client(args, conf, sparkConf)
    client.createApplicationSubmissionContext(
      new YarnClientApplication(getNewApplicationResponse, appContext),
      containerLaunchContext)

    appContext.getApplicationName should be ("foo-test-app")
    appContext.getQueue should be ("staging-queue")
    appContext.getAMContainerSpec should be (containerLaunchContext)
    appContext.getApplicationType should be ("SPARK")
    appContext.getClass.getMethods.filter(_.getName.equals("getApplicationTags")).foreach { method =>
      val tags = method.invoke(appContext).asInstanceOf[java.util.Set[String]]
      tags should contain allOf ("tag1", "dup", "tag2", "multi word")
      tags.filter(!_.isEmpty).size should be (4)
    }
    appContext.getMaxAppAttempts should be (42)
  }
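
For reference, a small sketch of why those assertions hold across Hadoop versions (not part of the diff; it assumes Hadoop's StringUtils, which the yarn module already uses):

import org.apache.hadoop.util.StringUtils

// getTrimmedStringCollection splits on commas and trims whitespace; whether
// it also drops empty entries varies by Hadoop version. Wrapping the result
// in a java.util.HashSet (as Client does) removes the duplicate "dup" either way.
val raw = ",tag1, dup,tag2 , ,multi word , dup"
val parsed = StringUtils.getTrimmedStringCollection(raw)
val tags = new java.util.HashSet[String](parsed)
// tags now contains "tag1", "dup", "tag2", "multi word", and possibly "".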

  object Fixtures {

    val knownDefYarnAppCP: Seq[String] =