Added support for accessing secured HDFS #265

Closed
wants to merge 2 commits
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -221,6 +221,11 @@ class SparkContext(
}
executorEnvs("SPARK_USER") = sparkUser

// Perform user authentication when Hadoop security is turned on
if (SparkHadoopUtil.get.isSecurityEnabled()) {
SparkHadoopUtil.get.doUserAuthentication(this)
}

// Create and start the scheduler
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
taskScheduler.start()
171 changes: 169 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -24,17 +24,25 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.{SparkContext, SparkException}
import org.apache.spark._

import scala.collection.JavaConversions._
import java.util.{Collection, TimerTask, Timer}
import java.io.{File, IOException}
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
import org.apache.hadoop.security.token.{TokenIdentifier, Token}
import org.apache.hadoop.fs.permission.FsPermission

/**
* Contains util methods to interact with Hadoop from Spark.
*/
class SparkHadoopUtil {
class SparkHadoopUtil extends Logging {
val conf: Configuration = newConfiguration()
UserGroupInformation.setConfiguration(conf)

val sparkConf = new SparkConf()

def runAsUser(user: String)(func: () => Unit) {
if (user != SparkContext.SPARK_UNKNOWN_USER) {
val ugi = UserGroupInformation.createRemoteUser(user)
@@ -75,6 +83,165 @@ class SparkHadoopUtil {

def getSecretKeyFromUserCredentials(key: String): Array[Byte] = { null }

/**
* Return whether Hadoop security is enabled or not.
*
* @return Whether Hadoop security is enabled or not
*/
def isSecurityEnabled(): Boolean = {
UserGroupInformation.isSecurityEnabled
}

/**
* Do user authentication when Hadoop security is turned on. Used by the driver.
*
* @param sc Spark context
*/
def doUserAuthentication(sc: SparkContext) {
getAuthenticationType match {
case "keytab" => {
// Authentication through a Kerberos keytab file. Necessary for
// long-running services like Shark/Spark Streaming.
scheduleKerberosRenewTask(sc)
}
case _ => {
// No extra authentication needed. Assume authentication was already done
// before Spark was launched, e.g., the user has already authenticated with
// Kerberos through kinit.
// Obtain a Hadoop delegation token and store it in a file.
// Add the token file so it gets downloaded by every slave node.
sc.addFile(initDelegationToken().toString)
}
}
}

/**
* Get a UserGroupInformation for the user to whom the task belongs.
*
* @param userName Name of the user to whom the task belongs
* @return UserGroupInformation for that user, with delegation tokens attached
*/
def getTaskUser(userName: String): UserGroupInformation = {
val ugi = UserGroupInformation.createRemoteUser(userName)
// Change the authentication method to Kerberos
ugi.setAuthenticationMethod(
UserGroupInformation.AuthenticationMethod.KERBEROS)
// Get and add Hadoop delegation tokens for the user
val iter = getDelegationTokens().iterator()
while (iter.hasNext) {
ugi.addToken(iter.next())
}

ugi
}

/**
* Get the type of Hadoop security authentication.
*
* @return Type of Hadoop security authentication
*/
private def getAuthenticationType: String = {
sparkConf.get("spark.hadoop.security.authentication")
Review comment: Should this not have a default value?
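A possible way to address this, sketched below and not part of the PR, would be to supply a default so that a missing setting simply falls through to the non-keytab branch of doUserAuthentication():

```scala
// Hypothetical fallback value "none": SparkConf.get(key) throws if the key is
// unset, whereas get(key, default) returns the default, which here lands in the
// catch-all branch of doUserAuthentication (no keytab login, token only).
private def getAuthenticationType: String = {
  sparkConf.get("spark.hadoop.security.authentication", "none")
}
```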

}

/**
* Schedule a timer task that periodically renews the Kerberos credential.
*
* @param sc Spark context
*/
private def scheduleKerberosRenewTask(sc: SparkContext) {
val kerberosRenewTimer = new Timer()
val kerberosRenewTimerTask = new TimerTask {
def run() {
try {
kerberosLoginFromKeytab
// Obtain a new Hadoop delegation token and store it in a file.
// Add the token file so it gets downloaded by every slave node.
sc.addFile(initDelegationToken().toString)
} catch {
case ioe: IOException => {
logError("Failed to login from Kerberos keytab", ioe)
}
}
}
}

val interval = sparkConf.getLong(
"spark.hadoop.security.kerberos.renewInterval", 21600000)
kerberosRenewTimer.schedule(kerberosRenewTimerTask, 0, interval)
logInfo("Scheduled timer task for renewing Kerberos credential")
}

/**
* Log the user in from a keytab file, loading the credentials from the keytab
* and principal given by the spark.hadoop.security.kerberos.* settings.
*/
private def kerberosLoginFromKeytab() {
val defaultKeytab = System.getProperty("user.home") + Path.SEPARATOR +
System.getProperty("user.name") + ".keytab"
val keytab = sparkConf.get(
"spark.hadoop.security.kerberos.keytab", defaultKeytab)
val principal = sparkConf.get(
"spark.hadoop.security.kerberos.principal", System.getProperty("user.name"))

// Keytab file not found
if (!new File(keytab).exists()) {
throw new IOException("Keytab file %s not found".format(keytab))
}

UserGroupInformation.loginUserFromKeytab(principal, keytab)
}

/**
* Obtain a Hadoop delegation token for the current user and store it in a local
* file; callers add this file to the SparkContext so executors can fetch it.
*
* @return URI of the token file
*/
private def initDelegationToken(): URI = {
val localFS = FileSystem.getLocal(conf)
// Store the token file under the user's home directory on the local file system
val tokenFile = new Path(localFS.getHomeDirectory, sparkConf.get(
"spark.hadoop.security.token.name", "spark.token"))
if (localFS.exists(tokenFile)) {
localFS.delete(tokenFile, false)
}

// Get a new token and write it to the given token file
val currentUser = UserGroupInformation.getCurrentUser
val fs = FileSystem.get(conf)
val token: Token[_ <: TokenIdentifier] =
fs.getDelegationToken(currentUser.getShortUserName)
.asInstanceOf[Token[_ <: TokenIdentifier]]
val cred = new Credentials()
cred.addToken(token.getService, token)
cred.writeTokenStorageFile(tokenFile, conf)
// Make sure the token file is readable only by the owner
localFS.setPermission(tokenFile, FsPermission.createImmutable(0400))

logInfo("Stored Hadoop delegation token for user %s to file %s".format(
currentUser.getShortUserName, tokenFile.toUri.toString))
tokenFile.toUri
}

/**
* Get delegation tokens from the token file added through SparkContext.addFile().
*
* @return Collection of delegation tokens
*/
private def getDelegationTokens(): Collection[Token[_ <: TokenIdentifier]] = {
// Get the token file added through SparkContext.addFile()
val source = new File(SparkFiles.get(sparkConf.get(
"spark.hadoop.security.token.name", "spark.token")))
if (source.exists()) {
val sourcePath = new Path("file://" + source.getAbsolutePath)
// Read credentials from the token file
Credentials.readTokenStorageFile(sourcePath, conf).getAllTokens
} else {
throw new IOException(
"Token file %s does not exist".format(source.getAbsolutePath))
}
}
}

object SparkHadoopUtil {
20 changes: 17 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -30,6 +30,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util.{AkkaUtils, Utils}
import java.security.PrivilegedExceptionAction

/**
* Spark executor used with Mesos, YARN, and the standalone scheduler.
@@ -173,7 +174,7 @@ private[spark] class Executor(
}
}

override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () =>
override def run() {
val startTime = System.currentTimeMillis()
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(replClassLoader)
@@ -188,7 +189,8 @@
try {
SparkEnv.set(env)
Accumulators.clear()
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
val (userName, taskFiles, taskJars, taskBytes) =
Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)

@@ -208,7 +210,19 @@

// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
val value = task.run(taskId.toInt)
var value: Any = None
if (SparkHadoopUtil.get.isSecurityEnabled()) {
// Get the user whom the task belongs to
val ugi = SparkHadoopUtil.get.getTaskUser(userName)
// Run the task as the user whom the task belongs to
ugi.doAs(new PrivilegedExceptionAction[Unit] {
def run(): Unit = {
value = task.run(taskId.toInt)
}
})
} else {
value = task.run(taskId.toInt)
}
val taskFinish = System.currentTimeMillis()

// If the task has been killed, let's fail it.
13 changes: 10 additions & 3 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -98,6 +98,7 @@ private[spark] object Task {
* Serialize a task and the current app dependencies (files and JARs added to the SparkContext)
*/
def serializeWithDependencies(
userName: String,
task: Task[_],
currentFiles: HashMap[String, Long],
currentJars: HashMap[String, Long],
@@ -107,6 +108,9 @@
val out = new FastByteArrayOutputStream(4096)
val dataOut = new DataOutputStream(out)

// Write the name of the user launching the task
dataOut.writeUTF(userName)

// Write currentFiles
dataOut.writeInt(currentFiles.size)
for ((name, timestamp) <- currentFiles) {
@@ -134,14 +138,17 @@
* and return the task itself as a serialized ByteBuffer. The caller can then update its
* ClassLoaders and deserialize the task.
*
* @return (taskFiles, taskJars, taskBytes)
* @return (userName, taskFiles, taskJars, taskBytes)
*/
def deserializeWithDependencies(serializedTask: ByteBuffer)
: (HashMap[String, Long], HashMap[String, Long], ByteBuffer) = {
: (String, HashMap[String, Long], HashMap[String, Long], ByteBuffer) = {

val in = new ByteBufferInputStream(serializedTask)
val dataIn = new DataInputStream(in)

// Read the name of the user launching the task
val userName = dataIn.readUTF()

// Read task's files
val taskFiles = new HashMap[String, Long]()
val numFiles = dataIn.readInt()
@@ -158,6 +165,6 @@

// Create a sub-buffer for the rest of the data, which is the serialized Task object
val subBuffer = serializedTask.slice() // ByteBufferInputStream will have read just up to task
(taskFiles, taskJars, subBuffer)
(userName, taskFiles, taskJars, subBuffer)
}
}
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -410,10 +410,11 @@ private[spark] class TaskSetManager(
lastLaunchTime = curTime
// Serialize and return the task
val startTime = clock.getTime()
val userName = System.getProperty("user.name")
// We rely on the DAGScheduler to catch non-serializable closures and RDDs, so in here
// we assume the task can be serialized without exceptions.
val serializedTask = Task.serializeWithDependencies(
task, sched.sc.addedFiles, sched.sc.addedJars, ser)
userName, task, sched.sc.addedFiles, sched.sc.addedJars, ser)
val timeTaken = clock.getTime() - startTime
addRunningTask(taskId)
logInfo("Serialized task %s:%d as %d bytes in %d ms".format(
35 changes: 35 additions & 0 deletions docs/configuration.md
@@ -586,6 +586,41 @@ Apart from these, the following properties are also available, and may be useful
Number of cores to allocate for each task.
</td>
</tr>
<tr>
<td>spark.hadoop.security.authentication</td>
<td>(none)</td>
<td>
Method used for authenticating the user when Hadoop security is turned on. Set it to keytab to enable keytab-based Kerberos login with periodic renewal; any other value assumes the user has already authenticated (e.g., through kinit) before Spark is launched. A Hadoop delegation token can be obtained only after the user is authenticated.
</td>
</tr>
<tr>
<td>spark.hadoop.security.kerberos.renewInterval</td>
<td>21600000</td>
<td>
Interval, in milliseconds, at which the Kerberos credential is automatically renewed when Hadoop security is turned on and keytab-based Kerberos authentication is used.
</td>
</tr>
<tr>
<td>spark.hadoop.security.kerberos.keytab</td>
<td>{Current login user name}.keytab under the home directory of the current login user</td>
<td>
Local path to the Kerberos keytab file. The keytab file is usually located on the gateway host of the Spark cluster.
</td>
</tr>
<tr>
<td>spark.hadoop.security.kerberos.principal</td>
<td>Current login user name</td>
<td>
Principal used for Kerberos login.
</td>
</tr>
<tr>
<td>spark.hadoop.security.token.name</td>
<td>spark.token</td>
<td>
Name of the file storing the Hadoop delegation token obtained by the driver. The file is created in the driver user's local home directory and distributed to the executors through SparkContext.addFile().
</td>
</tr>
</table>

## Viewing Spark Properties
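For reference, a minimal driver-side sketch of how the new properties could be set, assuming the property names introduced in this PR; the application name, keytab path, principal, and interval below are placeholder values:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Placeholder values; substitute your own keytab path, principal, and interval.
val conf = new SparkConf()
  .setAppName("SecureHdfsExample")
  .set("spark.hadoop.security.authentication", "keytab")
  .set("spark.hadoop.security.kerberos.keytab", "/home/alice/alice.keytab")
  .set("spark.hadoop.security.kerberos.principal", "alice@EXAMPLE.COM")
  .set("spark.hadoop.security.kerberos.renewInterval", "21600000") // 6 hours, in milliseconds

// With this patch, the driver authenticates during SparkContext construction when
// Hadoop security is enabled and ships a delegation token file to the executors.
val sc = new SparkContext(conf)
```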