Permalink
Browse files

SPARK-1051. doAs submitting user in YARN

  • Loading branch information...
1 parent 5f419bf commit 708ce49978e4b409749d8ade0c8569db28fc2305 @sryza sryza committed Feb 6, 2014
View
18 core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -25,6 +25,8 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{SparkContext, SparkException}
+import scala.collection.JavaConversions._
+
/**
* Contains util methods to interact with Hadoop from Spark.
*/
@@ -33,15 +35,9 @@ class SparkHadoopUtil {
UserGroupInformation.setConfiguration(conf)
def runAsUser(user: String)(func: () => Unit) {
- // if we are already running as the user intended there is no reason to do the doAs. It
- // will actually break secure HDFS access as it doesn't fill in the credentials. Also if
- // the user is UNKNOWN then we shouldn't be creating a remote unknown user
- // (this is actually the path spark on yarn takes) since SPARK_USER is initialized only
- // in SparkContext.
- val currentUser = Option(System.getProperty("user.name")).
- getOrElse(SparkContext.SPARK_UNKNOWN_USER)
- if (user != SparkContext.SPARK_UNKNOWN_USER && currentUser != user) {
+ if (user != SparkContext.SPARK_UNKNOWN_USER) {
val ugi = UserGroupInformation.createRemoteUser(user)
+ transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
ugi.doAs(new PrivilegedExceptionAction[Unit] {
def run: Unit = func()
})
@@ -50,6 +46,12 @@ class SparkHadoopUtil {
}
}
+ def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
+ for (token <- source.getTokens()) {
+ dest.addToken(token)
+ }
+ }
+
/**
* Return an appropriate (subclass of) Configuration. Creating a config can initialize some Hadoop
* subsystems.
View
6 yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
@@ -67,6 +68,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
private var registered = false
+ private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
+ SparkContext.SPARK_UNKNOWN_USER)
+
def run() {
// Setup the directories so things go to yarn approved directories rather
// then user specified and /tmp.
@@ -180,7 +184,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
false /* initialize */ ,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
- override def run() {
+ override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
var successed = false
try {
// Copy
View
1 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -272,6 +272,7 @@ trait ClientBase extends Logging {
ClientBase.populateClasspath(yarnConf, sparkConf, log4jConfLocalRes != null, env)
env("SPARK_YARN_MODE") = "true"
env("SPARK_YARN_STAGING_DIR") = stagingDir
+ env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
// Set the environment variables to be passed on to the Workers.
distCacheMgr.setDistFilesEnv(env)
View
4 yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -28,6 +28,10 @@ import org.apache.hadoop.conf.Configuration
*/
class YarnSparkHadoopUtil extends SparkHadoopUtil {
+ override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
+ dest.addCredentials(source.getCredentials())
+ }
+
// Note that all params which start with SPARK are propagated all the way through, so if in yarn mode, this MUST be set to true.
override def isYarnMode(): Boolean = { true }
View
6 yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark.{SparkConf, SparkContext, Logging}
+import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
@@ -68,6 +69,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
math.max(args.numWorkers * 2, 3))
private var registered = false
+
+ private val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(
+ SparkContext.SPARK_UNKNOWN_USER)
def run() {
// Setup the directories so things go to YARN approved directories rather
@@ -152,7 +156,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
false /* initialize */ ,
Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]])
val t = new Thread {
- override def run() {
+ override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
var successed = false
try {
// Copy

0 comments on commit 708ce49

Please sign in to comment.