[SPARK-5158] [core] [security] Spark standalone mode can authenticate against a Kerberos-secured Hadoop cluster #4106

Closed
SparkHadoopUtil.scala
@@ -39,9 +39,7 @@ import scala.collection.JavaConversions._
* Contains util methods to interact with Hadoop from Spark.
*/
@DeveloperApi
class SparkHadoopUtil extends Logging {
val conf: Configuration = newConfiguration(new SparkConf())
UserGroupInformation.setConfiguration(conf)
abstract class SparkHadoopUtil extends Logging {

/**
* Runs the given function with a Hadoop UserGroupInformation as a thread local variable
@@ -54,13 +52,16 @@ class SparkHadoopUtil extends Logging {
def runAsSparkUser(func: () => Unit) {
val user = Utils.getCurrentUserName()
logDebug("running as user: " + user)
val ugi = UserGroupInformation.createRemoteUser(user)
val ugi = getAuthenticatedUgiForSparkUser(user)
transferCredentials(UserGroupInformation.getCurrentUser(), ugi)
ugi.doAs(new PrivilegedExceptionAction[Unit] {
def run: Unit = func()
})
}

// Log in as the Spark user, if necessary.
def loginAsSparkUser() {}

def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
for (token <- source.getTokens()) {
dest.addToken(token)
@@ -121,6 +122,8 @@ class SparkHadoopUtil extends Logging {
UserGroupInformation.loginUserFromKeytab(principalName, keytabFilename)
}

protected def getAuthenticatedUgiForSparkUser(user: String): UserGroupInformation

/**
* Returns a function that can be called to find Hadoop FileSystem bytes read. If
* getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
@@ -205,7 +208,7 @@ class SparkHadoopUtil extends Logging {

Contributor Author:
Whoops, this doesn't have to be lazy

object SparkHadoopUtil {

private val hadoop = {
private lazy val hadoop = {
val yarnMode = java.lang.Boolean.valueOf(
System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
if (yarnMode) {
@@ -217,7 +220,7 @@ object SparkHadoopUtil {
case e: Exception => throw new SparkException("Unable to load YARN support", e)
}
} else {
new SparkHadoopUtil
new StandaloneSparkHadoopUtil
}
}

StandaloneSparkHadoopUtil.scala (new file)
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy

import java.security.PrivilegedAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.{SparkConf, SparkEnv, Logging}
import sun.security.jgss.krb5.Krb5InitCredential

import scala.sys.process.Process

private[spark] class StandaloneSparkHadoopUtil extends SparkHadoopUtil {

val hadoopSecurityAuthenticationKey = "spark.hadoop.security.authentication"
val principalKey = "spark.hadoop.dfs.namenode.kerberos.principal"
val keytabKey = "spark.hadoop.dfs.namenode.keytab.file"
val securityEnabledButKeyNotDefined = "Hadoop security was enabled, but %s " +
"was not set in the Spark configuration."

// Lazily evaluated upon invoking loginAsSparkUserAndReturnUGI()
lazy val loggedInUser: UserGroupInformation = {
val authenticationType = SparkEnv.get.conf.get(hadoopSecurityAuthenticationKey, "simple")
if (authenticationType.equalsIgnoreCase("kerberos")) {
logInfo("Setting up kerberos to authenticate")
SparkEnv.get.conf.getOption(principalKey) match {
case Some(principal) =>
SparkEnv.get.conf.getOption(keytabKey) match {
case Some(keytab) =>
UserGroupInformation.setConfiguration(newConfiguration(SparkEnv.get.conf))
loginUserFromKeytab(principal, keytab)
UserGroupInformation.getLoginUser()
case None =>
val errorMsg = securityEnabledButKeyNotDefined.format(keytabKey)
logError(errorMsg)
throw new IllegalStateException(errorMsg)
}
case None =>
val errorMsg = securityEnabledButKeyNotDefined.format(principalKey)
logError(errorMsg)
throw new IllegalStateException(errorMsg)
}
} else {
logInfo("Not using Kerberos to authenticate to Hadoop.")
UserGroupInformation.getCurrentUser()
}
}
override def getAuthenticatedUgiForSparkUser(user: String): UserGroupInformation = {
UserGroupInformation.createProxyUser(user, loginAsSparkUserAndReturnUGI())
}

override def loginAsSparkUser() {
loginAsSparkUserAndReturnUGI()
}

private def loginAsSparkUserAndReturnUGI(): UserGroupInformation = loggedInUser

override def newConfiguration(sparkConf: SparkConf): Configuration = {
val originalConf = super.newConfiguration(sparkConf)
originalConf.set("hadoop.security.authentication",
sparkConf.get(hadoopSecurityAuthenticationKey, "simple"))
originalConf
}

}
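The new class only consumes configuration, so for context here is a minimal, hypothetical sketch of the driver-side settings that would exercise it in standalone mode. The three spark.hadoop.* keys are exactly the ones read above; the master URL, application name, principal, and keytab path are placeholder values, not part of this patch.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical standalone-mode configuration; host names and paths are placeholders.
val conf = new SparkConf()
  .setMaster("spark://master.example.com:7077")
  .setAppName("kerberos-hdfs-example")
  .set("spark.hadoop.security.authentication", "kerberos")
  .set("spark.hadoop.dfs.namenode.kerberos.principal", "spark/worker.example.com@EXAMPLE.COM")
  .set("spark.hadoop.dfs.namenode.keytab.file", "/etc/security/keytabs/spark.keytab")
val sc = new SparkContext(conf)

With settings like these, the lazy loggedInUser above calls loginUserFromKeytab the first time loginAsSparkUser() or getAuthenticatedUgiForSparkUser(...) is invoked; with the default "simple" authentication it simply returns the current user.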
CoarseGrainedExecutorBackend.scala
@@ -125,39 +125,39 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {

SignalLogger.register(log)

SparkHadoopUtil.get.runAsSparkUser { () =>
// Debug code
Utils.checkHost(hostname)

// Bootstrap to fetch the driver's Spark properties.
val executorConf = new SparkConf
val port = executorConf.getInt("spark.executor.port", 0)
val (fetcher, _) = AkkaUtils.createActorSystem(
"driverPropsFetcher",
hostname,
port,
executorConf,
new SecurityManager(executorConf))
val driver = fetcher.actorSelection(driverUrl)
val timeout = AkkaUtils.askTimeout(executorConf)
val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]] ++
Seq[(String, String)](("spark.app.id", appId))
fetcher.shutdown()

// Create SparkEnv using properties we fetched from the driver.
val driverConf = new SparkConf()
for ((key, value) <- props) {
// this is required for SSL in standalone mode
if (SparkConf.isExecutorStartupConf(key)) {
driverConf.setIfMissing(key, value)
} else {
driverConf.set(key, value)
}
// Debug code
Utils.checkHost(hostname)

// Bootstrap to fetch the driver's Spark properties.
val executorConf = new SparkConf
val port = executorConf.getInt("spark.executor.port", 0)
val (fetcher, _) = AkkaUtils.createActorSystem(
"driverPropsFetcher",
hostname,
port,
executorConf,
new SecurityManager(executorConf))
val driver = fetcher.actorSelection(driverUrl)
val timeout = AkkaUtils.askTimeout(executorConf)
val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]] ++
Seq[(String, String)](("spark.app.id", appId))
fetcher.shutdown()

// Create SparkEnv using properties we fetched from the driver.
val driverConf = new SparkConf()
for ((key, value) <- props) {
// this is required for SSL in standalone mode
if (SparkConf.isExecutorStartupConf(key)) {
driverConf.setIfMissing(key, value)
} else {
driverConf.set(key, value)
}
Contributor Author:
Please double check my merge here.

val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, isLocal = false)
}
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, isLocal = false)

SparkHadoopUtil.get.runAsSparkUser { () =>
// SparkEnv sets spark.driver.port so it shouldn't be 0 anymore.
val boundPort = env.conf.getInt("spark.executor.port", 0)
assert(boundPort != 0)
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -196,6 +196,7 @@ class HadoopRDD[K, V](
val jobConf = getJobConf()
// add the credentials here as this can be called before SparkContext initialized
SparkHadoopUtil.get.addCredentials(jobConf)
SparkHadoopUtil.get.loginAsSparkUser()
val inputFormat = getInputFormat(jobConf)
if (inputFormat.isInstanceOf[Configurable]) {
inputFormat.asInstanceOf[Configurable].setConf(jobConf)
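For context, a hypothetical sketch of the read path this one-line addition sits on. It assumes the SparkContext sc from the configuration sketch earlier and a placeholder HDFS URI: when a partition of a Hadoop-backed RDD is computed, the code above now logs in as the Spark user before the InputFormat is created.

// Hypothetical read from a Kerberos-secured HDFS.
val lines = sc.textFile("hdfs://namenode.example.com:8020/data/events.log")
println(lines.count())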
PairRDDFunctions.scala
@@ -1043,6 +1043,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
throw new SparkException("Output value class not set")
}
SparkHadoopUtil.get.addCredentials(hadoopConf)
SparkHadoopUtil.get.loginAsSparkUser()

logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
valueClass.getSimpleName + ")")
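The corresponding write path, again as a hypothetical sketch continuing the same SparkContext: saveAsTextFile funnels into the saveAsHadoopFile code shown above, so the login now also happens before output is written to a secured cluster. The output URI is a placeholder.

import org.apache.spark.SparkContext._ // pair-RDD implicits for reduceByKey

// Hypothetical write to a Kerberos-secured HDFS.
val counts = lines.flatMap(_.split("\\s+")).map(word => (word, 1L)).reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://namenode.example.com:8020/output/word-counts")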
YarnSparkHadoopUtil.scala
@@ -42,6 +42,8 @@ import org.apache.spark.util.Utils
* Contains util methods to interact with Hadoop from spark.
*/
class YarnSparkHadoopUtil extends SparkHadoopUtil {
val sparkConf = new SparkConf()
UserGroupInformation.setConfiguration(newConfiguration(sparkConf))

override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) {
dest.addCredentials(source.getCredentials())
@@ -71,6 +73,12 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
UserGroupInformation.getCurrentUser().addCredentials(creds)
}

override def getAuthenticatedUgiForSparkUser(user: String): UserGroupInformation = {
val ugi = UserGroupInformation.createRemoteUser(user)
transferCredentials(UserGroupInformation.getCurrentUser, ugi)
ugi
}

override def addSecretKeyToUserCredentials(key: String, secret: String) {
val creds = new Credentials()
creds.addSecretKey(new Text(key), secret.getBytes("utf-8"))