diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index e0a32fb65cd51..fc33ad0ab7f17 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -39,9 +39,7 @@ import scala.collection.JavaConversions._
  * Contains util methods to interact with Hadoop from Spark.
  */
 @DeveloperApi
-class SparkHadoopUtil extends Logging {
-  val conf: Configuration = newConfiguration(new SparkConf())
-  UserGroupInformation.setConfiguration(conf)
+abstract class SparkHadoopUtil extends Logging {

   /**
    * Runs the given function with a Hadoop UserGroupInformation as a thread local variable
@@ -54,13 +52,16 @@ class SparkHadoopUtil extends Logging {
   def runAsSparkUser(func: () => Unit) {
     val user = Utils.getCurrentUserName()
     logDebug("running as user: " + user)
-    val ugi = UserGroupInformation.createRemoteUser(user)
+    val ugi = getAuthenticatedUgiForSparkUser(user)
     transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
     ugi.doAs(new PrivilegedExceptionAction[Unit] {
       def run: Unit = func()
     })
   }

+  // Log in as the Spark user, if necessary.
+  def loginAsSparkUser() {}
+
   def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
     for (token <- source.getTokens()) {
       dest.addToken(token)
@@ -121,6 +122,8 @@ class SparkHadoopUtil extends Logging {
     UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
   }

+  protected def getAuthenticatedUgiForSparkUser(user: String): UserGroupInformation
+
   /**
    * Returns a function that can be called to find Hadoop FileSystem bytes read. If
    * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
@@ -205,7 +208,7 @@ class SparkHadoopUtil extends Logging {

 object SparkHadoopUtil {

-  private val hadoop = {
+  private lazy val hadoop = {
     val yarnMode = java.lang.Boolean.valueOf(
       System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
     if (yarnMode) {
@@ -217,7 +220,7 @@ object SparkHadoopUtil {
         case e: Exception => throw new SparkException("Unable to load YARN support", e)
       }
     } else {
-      new SparkHadoopUtil
+      new StandaloneSparkHadoopUtil
     }
   }

diff --git a/core/src/main/scala/org/apache/spark/deploy/StandaloneSparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/StandaloneSparkHadoopUtil.scala
new file mode 100644
index 0000000000000..2611d8a84ca0a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/StandaloneSparkHadoopUtil.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.security.PrivilegedAction
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.security.UserGroupInformation
+import org.apache.spark.{SparkConf, SparkEnv, Logging}
+import sun.security.jgss.krb5.Krb5InitCredential
+
+import scala.sys.process.Process
+
+private[spark] class StandaloneSparkHadoopUtil extends SparkHadoopUtil {
+
+  val hadoopSecurityAuthenticationKey = "spark.hadoop.security.authentication"
+  val principalKey = "spark.hadoop.dfs.namenode.kerberos.principal"
+  val keytabKey = "spark.hadoop.dfs.namenode.keytab.file"
+  val securityEnabledButKeyNotDefined = "Hadoop security was enabled, but %s" +
+    " was not set in the Spark configuration."
+
+  // Lazily evaluated upon invoking loginAsSparkUserAndReturnUGI()
+  lazy val loggedInUser: UserGroupInformation = {
+    val authenticationType = SparkEnv.get.conf.get(hadoopSecurityAuthenticationKey, "simple")
+    if (authenticationType.equalsIgnoreCase("kerberos")) {
+      logInfo("Setting up kerberos to authenticate")
+      SparkEnv.get.conf.getOption(principalKey) match {
+        case Some(principal) =>
+          SparkEnv.get.conf.getOption(keytabKey) match {
+            case Some(keytab) =>
+              UserGroupInformation.setConfiguration(newConfiguration(SparkEnv.get.conf))
+              loginUserFromKeytab(principal, keytab)
+              UserGroupInformation.getLoginUser()
+            case None =>
+              val errorMsg = securityEnabledButKeyNotDefined.format(keytabKey)
+              logError(errorMsg)
+              throw new IllegalStateException(errorMsg)
+          }
+        case None =>
+          val errorMsg = securityEnabledButKeyNotDefined.format(principalKey)
+          logError(errorMsg)
+          throw new IllegalStateException(errorMsg)
+      }
+    } else {
+      logInfo("Not using Kerberos to authenticate to Hadoop.")
+      UserGroupInformation.getCurrentUser()
+    }
+  }
+  override def getAuthenticatedUgiForSparkUser(user: String): UserGroupInformation = {
+    UserGroupInformation.createProxyUser(user, loginAsSparkUserAndReturnUGI())
+  }
+
+  override def loginAsSparkUser() {
+    loginAsSparkUserAndReturnUGI()
+  }
+
+  private def loginAsSparkUserAndReturnUGI(): UserGroupInformation = loggedInUser
+
+  override def newConfiguration(sparkConf: SparkConf): Configuration = {
+    val originalConf = super.newConfiguration(sparkConf)
+    originalConf.set("hadoop.security.authentication",
+      sparkConf.get(hadoopSecurityAuthenticationKey, "simple"))
+    originalConf
+  }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index dd19e4947db1e..c2acc972cda90 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -125,39 +125,39 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {

     SignalLogger.register(log)

-    SparkHadoopUtil.get.runAsSparkUser { () =>
-      // Debug code
-      Utils.checkHost(hostname)
-
-      // Bootstrap to fetch the driver's Spark properties.
-      val executorConf = new SparkConf
-      val port = executorConf.getInt("spark.executor.port", 0)
-      val (fetcher, _) = AkkaUtils.createActorSystem(
-        "driverPropsFetcher",
-        hostname,
-        port,
-        executorConf,
-        new SecurityManager(executorConf))
-      val driver = fetcher.actorSelection(driverUrl)
-      val timeout = AkkaUtils.askTimeout(executorConf)
-      val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
-      val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]] ++
-        Seq[(String, String)](("spark.app.id", appId))
-      fetcher.shutdown()
-
-      // Create SparkEnv using properties we fetched from the driver.
-      val driverConf = new SparkConf()
-      for ((key, value) <- props) {
-        // this is required for SSL in standalone mode
-        if (SparkConf.isExecutorStartupConf(key)) {
-          driverConf.setIfMissing(key, value)
-        } else {
-          driverConf.set(key, value)
-        }
+    // Debug code
+    Utils.checkHost(hostname)
+
+    // Bootstrap to fetch the driver's Spark properties.
+    val executorConf = new SparkConf
+    val port = executorConf.getInt("spark.executor.port", 0)
+    val (fetcher, _) = AkkaUtils.createActorSystem(
+      "driverPropsFetcher",
+      hostname,
+      port,
+      executorConf,
+      new SecurityManager(executorConf))
+    val driver = fetcher.actorSelection(driverUrl)
+    val timeout = AkkaUtils.askTimeout(executorConf)
+    val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
+    val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]] ++
+      Seq[(String, String)](("spark.app.id", appId))
+    fetcher.shutdown()
+
+    // Create SparkEnv using properties we fetched from the driver.
+    val driverConf = new SparkConf()
+    for ((key, value) <- props) {
+      // this is required for SSL in standalone mode
+      if (SparkConf.isExecutorStartupConf(key)) {
+        driverConf.setIfMissing(key, value)
+      } else {
+        driverConf.set(key, value)
       }
-      val env = SparkEnv.createExecutorEnv(
-        driverConf, executorId, hostname, port, cores, isLocal = false)
+    }
+    val env = SparkEnv.createExecutorEnv(
+      driverConf, executorId, hostname, port, cores, isLocal = false)

+    SparkHadoopUtil.get.runAsSparkUser { () =>
       // SparkEnv sets spark.driver.port so it shouldn't be 0 anymore.
       val boundPort = env.conf.getInt("spark.executor.port", 0)
       assert(boundPort != 0)
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 486e86ce1bb19..a4abaff1443a6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -196,6 +196,7 @@ class HadoopRDD[K, V](
       val jobConf = getJobConf()
       // add the credentials here as this can be called before SparkContext initialized
       SparkHadoopUtil.get.addCredentials(jobConf)
+      SparkHadoopUtil.get.loginAsSparkUser()
       val inputFormat = getInputFormat(jobConf)
       if (inputFormat.isInstanceOf[Configurable]) {
         inputFormat.asInstanceOf[Configurable].setConf(jobConf)
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 955b42c3baaa1..76fe8e436f4eb 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -1043,6 +1043,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Output value class not set")
     }
     SparkHadoopUtil.get.addCredentials(hadoopConf)
+    SparkHadoopUtil.get.loginAsSparkUser()

     logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
       valueClass.getSimpleName + ")")
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 146b2c0f1a302..23946e4ba30e2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -42,6 +42,8 @@ import org.apache.spark.util.Utils
  * Contains util methods to interact with Hadoop from spark.
  */
 class YarnSparkHadoopUtil extends SparkHadoopUtil {
+  val sparkConf = new SparkConf()
+  UserGroupInformation.setConfiguration(newConfiguration(sparkConf))

   override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
     dest.addCredentials(source.getCredentials())
@@ -71,6 +73,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
     UserGroupInformation.getCurrentUser().addCredentials(creds)
   }

+  override def getAuthenticatedUgiForSparkUser(user: String): UserGroupInformation = {
+    val ugi = UserGroupInformation.createRemoteUser(user)
+    transferCredentials(UserGroupInformation.getCurrentUser, ugi)
+    ugi
+  }
+
   override def addSecretKeyToUserCredentials(key: String, secret: String) {
     val creds = new Credentials()
     creds.addSecretKey(new Text(key), secret.getBytes("utf-8"))
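
Note for reviewers trying the standalone Kerberos path: the configuration keys read by StandaloneSparkHadoopUtil above could be supplied through SparkConf (or spark-defaults.conf) roughly as in the sketch below. This is only an illustrative sketch; the principal and keytab path are hypothetical placeholders, not values taken from this patch.

    import org.apache.spark.SparkConf

    // Illustrative only: the principal and keytab path are placeholders.
    val conf = new SparkConf()
      .set("spark.hadoop.security.authentication", "kerberos")
      .set("spark.hadoop.dfs.namenode.kerberos.principal", "nn/_HOST@EXAMPLE.COM")
      .set("spark.hadoop.dfs.namenode.keytab.file", "/path/to/nn.keytab")
    // With these set, SparkHadoopUtil.get.loginAsSparkUser() performs the keytab login
    // the first time it is called, via the loggedInUser lazy val.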