SPARK-1051. On YARN, executors don't doAs submitting user
This reopens https://github.com/apache/incubator-spark/pull/538 against the new repo

Author: Sandy Ryza <sandy@cloudera.com>

Closes apache#29 from sryza/sandy-spark-1051 and squashes the following commits:

708ce49 [Sandy Ryza] SPARK-1051. doAs submitting user in YARN
sryza authored and tgravescs committed Feb 28, 2014
1 parent 5f419bf; commit 46dff34
Showing 5 changed files with 25 additions and 10 deletions.
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala (18 changes: 10 additions & 8 deletions)
@@ -25,6 +25,8 @@ import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.{SparkContext, SparkException}
 
+import scala.collection.JavaConversions._
+
 /**
  * Contains util methods to interact with Hadoop from Spark.
  */
@@ -33,15 +35,9 @@ class SparkHadoopUtil {
   UserGroupInformation.setConfiguration(conf)
 
   def runAsUser(user: String)(func: () => Unit) {
-    // if we are already running as the user intended there is no reason to do the doAs. It
-    // will actually break secure HDFS access as it doesn't fill in the credentials. Also if
-    // the user is UNKNOWN then we shouldn't be creating a remote unknown user
-    // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only
-    // in SparkContext.
-    val currentUser = Option(System.getProperty("user.name")).
-      getOrElse(SparkContext.SPARK_UNKNOWN_USER)
-    if (user != SparkContext.SPARK_UNKNOWN_USER && currentUser != user) {
+    if (user != SparkContext.SPARK_UNKNOWN_USER) {
       val ugi = UserGroupInformation.createRemoteUser(user)
+      transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
       ugi.doAs(new PrivilegedExceptionAction[Unit] {
         def run: Unit = func()
       })
@@ -50,6 +46,12 @@ class SparkHadoopUtil {
     }
   }
 
+  def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
+    for (token <- source.getTokens()) {
+      dest.addToken(token)
+    }
+  }
+
   /**
    * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop
    * subsystems.
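
The heart of the change is runAsUser above: rather than skipping the doAs whenever a local user name can be found, it now always impersonates the given user (unless that user is the unknown sentinel), and it first copies the current user's delegation tokens onto the fresh UGI, since createRemoteUser returns a UGI with no credentials of its own, which is what the removed comment identified as breaking secure HDFS access. A minimal standalone sketch of the same pattern, assuming only hadoop-common on the classpath ("alice" and the object name are hypothetical):

    import java.security.PrivilegedExceptionAction

    import org.apache.hadoop.security.UserGroupInformation

    import scala.collection.JavaConversions._

    object RunAsUserSketch {
      def main(args: Array[String]): Unit = {
        // A UGI for the submitting user; createRemoteUser attaches no credentials.
        val ugi = UserGroupInformation.createRemoteUser("alice")

        // Copy the login user's delegation tokens over, as transferCredentials
        // does above, so secure HDFS access keeps working inside the doAs.
        for (token <- UserGroupInformation.getCurrentUser().getTokens()) {
          ugi.addToken(token)
        }

        // Everything inside the action runs with alice's identity and tokens.
        ugi.doAs(new PrivilegedExceptionAction[Unit] {
          def run(): Unit =
            println("running as " + UserGroupInformation.getCurrentUser().getUserName())
        })
      }
    }

The remaining hunks wire this helper into the YARN side, starting with an ApplicationMaster:
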
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
 class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
@@ -67,6 +68,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 
   private var registered = false
 
+  private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
+    SparkContext.SPARK_UNKNOWN_USER)
+
   def run() {
     // Setup the directories so things go to yarn approved directories rather
     // then user specified and /tmp.
@@ -180,7 +184,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
-      override def run() {
+      override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
         var successed = false
         try {
           // Copy
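
Here the thread that reflectively invokes the user's main class wraps its entire body in runAsUser, so the driver code runs under the submitting user's UGI; an identical hunk appears in the second ApplicationMaster copy at the end of the commit. A rough self-contained sketch of that shape, with a simplified stand-in for SparkHadoopUtil.runAsUser and a hypothetical "<unknown>" sentinel:

    import java.security.PrivilegedExceptionAction

    import org.apache.hadoop.security.UserGroupInformation

    object AmThreadSketch {
      // Simplified stand-in for SparkHadoopUtil.runAsUser: doAs unless unknown.
      def runAsUser(user: String)(func: () => Unit): Unit = {
        if (user != "<unknown>") {
          val ugi = UserGroupInformation.createRemoteUser(user)
          ugi.doAs(new PrivilegedExceptionAction[Unit] {
            def run(): Unit = func()
          })
        } else {
          func()
        }
      }

      def main(args: Array[String]): Unit = {
        val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse("<unknown>")
        val t = new Thread {
          // The whole thread body, i.e. the user class's main(), runs in the doAs.
          override def run(): Unit = runAsUser(sparkUser) { () =>
            println("driver code here")
          }
        }
        t.start()
        t.join()
      }
    }

The next hunk, in ClientBase, is where SPARK_USER gets set in the first place:
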
@@ -272,6 +272,7 @@ trait ClientBase extends Logging {
     ClientBase.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
+    env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
 
     // Set the environment variables to be passed on to the Workers.
     distCacheMgr.setDistFilesEnv(env)
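
ClientBase supplies the other half of the round trip: the submitting client records its Hadoop short user name in the container launch environment, which is exactly where the ApplicationMaster's System.getenv("SPARK_USER") lookup above finds it. A small sketch of both halves together, with a mutable map standing in for the real launch environment and "<unknown>" again as a hypothetical sentinel:

    import org.apache.hadoop.security.UserGroupInformation

    import scala.collection.mutable

    object SparkUserPropagationSketch {
      def main(args: Array[String]): Unit = {
        // Client side, as in ClientBase: stash the submitter's short name next
        // to the other SPARK_* variables handed to the containers.
        val env = new mutable.HashMap[String, String]()
        env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()

        // AM side: read it back (from the real process environment there),
        // falling back to a sentinel when it was never set.
        val sparkUser = env.getOrElse("SPARK_USER", "<unknown>")
        println("ApplicationMaster would doAs: " + sparkUser)
      }
    }
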
@@ -28,6 +28,10 @@ import org.apache.hadoop.conf.Configuration
  */
 class YarnSparkHadoopUtil extends SparkHadoopUtil {
 
+  override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
+    dest.addCredentials(source.getCredentials())
+  }
+
   // Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
   override def isYarnMode(): Boolean = { true }
 
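
On YARN, transferCredentials is overridden to call UserGroupInformation.addCredentials, which moves the whole Credentials object at once, secret keys as well as delegation tokens, where the base implementation copies tokens one by one. A one-method sketch contrasting the two paths (the yarnMode flag is purely illustrative; in Spark the choice happens via the subclass override, not a flag):

    import org.apache.hadoop.security.UserGroupInformation

    import scala.collection.JavaConversions._

    object TransferCredentialsSketch {
      def transferCredentials(source: UserGroupInformation,
                              dest: UserGroupInformation,
                              yarnMode: Boolean): Unit = {
        if (yarnMode) {
          // YARN override: tokens and secret keys move together in one call.
          dest.addCredentials(source.getCredentials())
        } else {
          // Base implementation: delegation tokens only, one at a time.
          for (token <- source.getTokens()) {
            dest.addToken(token)
          }
        }
      }
    }

The final hunks apply the same ApplicationMaster changes to its second copy:
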
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
 
@@ -68,6 +69,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     math.max(args.numWorkers * 2, 3))
 
   private var registered = false
 
+  private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
+    SparkContext.SPARK_UNKNOWN_USER)
+
   def run() {
     // Setup the directories so things go to YARN approved directories rather
@@ -152,7 +156,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       false /* initialize */ ,
       Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
     val t = new Thread {
-      override def run() {
+      override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
         var successed = false
         try {
           // Copy
