5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -46,6 +46,7 @@ import org.apache.spark._
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.rest._
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.scheduler.{KerberosUser, KerberosUtil}
import org.apache.spark.util._

/**
@@ -151,9 +152,13 @@ object SparkSubmit extends CommandLineUtils {
val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

def doRunMain(): Unit = {
if (args.principal != null && args.keytab != null) {
KerberosUser.securize(args.principal, args.keytab)
Contributor:
This will cause multiple UGI.loginUserFromKeytab calls (in the YARN case it already happens in SparkSubmit.prepareSubmitEnvironment), which causes various issues.
An application must call UGI.loginUserFromKeytab only once; with more than one call, random things fail (DFS client, metastore, etc.) because of the way the loginUser is cached/used and, unfortunately, how Hadoop IPC handles renewal.
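As an illustration of that constraint (not part of the original review comment), here is a minimal sketch that guards the keytab login so it runs at most once per JVM; the KerberosLoginGuard helper and its flag are hypothetical, not code from this PR:

```scala
// Hypothetical helper (not from this PR): ensure UGI.loginUserFromKeytab is
// invoked at most once per JVM, per the concern above.
import org.apache.hadoop.security.UserGroupInformation

object KerberosLoginGuard {
  @volatile private var loggedIn = false

  def loginOnce(principal: String, keytab: String): Unit = synchronized {
    // Skip the login if this JVM already authenticated from a keytab
    // (e.g. via SparkSubmit.prepareSubmitEnvironment in the YARN case).
    if (!loggedIn && !UserGroupInformation.isLoginKeytabBased) {
      UserGroupInformation.loginUserFromKeytab(principal, keytab)
      loggedIn = true
    }
  }
}
```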

}
if (args.proxyUser != null) {
val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
UserGroupInformation.getCurrentUser())
KerberosUtil.proxyUser = Option(proxyUser)
try {
proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
override def run(): Unit = {
@@ -257,10 +257,6 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
"either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
}
}

if (proxyUser != null && principal != null) {
SparkSubmit.printErrorAndExit("Only one of --proxy-user or --principal can be provided.")
}
}

private def validateKillArguments(): Unit = {
@@ -1006,7 +1006,7 @@ class DAGScheduler(
runningStages -= stage
return
}

val tokens = KerberosUtil.getHadoopDelegationTokens
Contributor:
See my comment below about how current Spark (on YARN) handles security: https://github.com/apache/spark/pull/16788/files#r105319503

@tgravescs or @vanzin can correct me if I am wrong (in case I am misremembering):
on a secure HDFS it is not necessary to provide a principal/keytab if the job finishes before token renewal is needed.

Given the above, the call chain in KerberosUtil.getHadoopDelegationTokens will throw an exception when they are missing and UGI security is enabled.
I am not sure whether this is a requirement on Mesos, but it is not on YARN.
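To illustrate that point (a sketch only, not a proposed patch for this PR), the token fetch at this call site could be made best-effort, so a secure-HDFS job submitted without --principal/--keytab still runs if it finishes before renewal is needed:

```scala
// Illustrative sketch (not the PR's code): fetch delegation tokens only when
// security is on, and fall back to the existing credentials if the fetch fails.
val tokens: Option[Array[Byte]] =
  if (UserGroupInformation.isSecurityEnabled()) {
    try {
      Some(KerberosUtil.getHadoopDelegationTokens)
    } catch {
      case e: Exception =>
        logWarning("Could not obtain Hadoop delegation tokens; " +
          "continuing with existing credentials", e)
        None
    }
  } else {
    None
  }
```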

val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
@@ -1018,7 +1018,7 @@
stage.pendingPartitions += id
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
Option(sc.applicationId), sc.applicationAttemptId, Option(tokens))
}

case stage: ResultStage =>
@@ -1028,7 +1028,7 @@
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId, Option(tokens))
}
Contributor:
The current Spark model on YARN manages tokens out of band from the actual tasks (unlike Tez/MR, IIRC, where the execution model itself is different). The tasks themselves do not propagate credentials; the executors update their credentials directly based on driver updates.

This allows very long-running Spark tasks (for example > 24 hours) to run, which per-task tokens might not allow.
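For context, a minimal sketch of that out-of-band pattern (the helper name, credentials file location, and refresh interval are assumptions for illustration, not Spark's actual implementation): each executor periodically reloads credentials that the driver has refreshed, instead of receiving tokens inside every task.

```scala
// Hypothetical executor-side refresher: periodically merge credentials that
// the driver has written out, so long-running tasks keep valid tokens without
// shipping tokens inside each task.
import java.util.concurrent.{Executors, TimeUnit}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.security.{Credentials, UserGroupInformation}

def scheduleCredentialRefresh(credentialsFile: Path, hadoopConf: Configuration): Unit = {
  val scheduler = Executors.newSingleThreadScheduledExecutor()
  scheduler.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = {
      val fs = credentialsFile.getFileSystem(hadoopConf)
      val in = fs.open(credentialsFile)
      try {
        val creds = new Credentials()
        creds.readTokenStorageStream(in)
        // Merging (rather than replacing) keeps tokens the UGI already holds.
        UserGroupInformation.getCurrentUser.addCredentials(creds)
      } finally {
        in.close()
      }
    }
  }, 1, 1, TimeUnit.HOURS)
}
```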

}
} catch {
30 changes: 30 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/KerberosFunction.scala
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.spark.internal.Logging

object KerberosFunction extends Logging {

def executeSecure[U, T](tokens: Option[Array[Byte]], funct: (U => T), inputParameters: U): T = {
if (tokens.isDefined) {
KerberosUtil.useTokenAuth(tokens.get)
}
funct(inputParameters)
}
}
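As a reading aid (not part of the diff), a minimal usage sketch of executeSecure with placeholder values; Task.scala below calls it the same way with runTask and the task context:

```scala
// Illustrative call: when tokens are present they are installed via
// KerberosUtil.useTokenAuth, then the supplied function runs.
val length: Int = KerberosFunction.executeSecure(
  tokens = None,                    // Option[Array[Byte]] of serialized tokens
  funct = (s: String) => s.length,  // any U => T function
  inputParameters = "spark")
```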
33 changes: 33 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/KerberosUser.scala
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.{SparkConf, SparkEnv, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging

object KerberosUser extends Logging {

def securize (principal: String, keytab: String) : Unit = {
val hadoopConf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
hadoopConf.set("hadoop.security.authentication", "Kerberos")
UserGroupInformation.setConfiguration(hadoopConf)
UserGroupInformation.loginUserFromKeytab(principal, keytab)
}
Contributor:
This method is duplicated in KerberosUtil?

}
108 changes: 108 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/KerberosUtil.scala
@@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.Master
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.{SparkConf, SparkEnv, SparkException}
import org.apache.spark.internal.Logging

object KerberosUtil extends Logging {
var proxyUser : Option[UserGroupInformation] = None
def securize (principal: String, keytab: String) : Unit = {
val hadoopConf = SparkHadoopUtil.get.newConfiguration(new SparkConf())
hadoopConf.set("hadoop.security.authentication", "Kerberos")
UserGroupInformation.setConfiguration(hadoopConf)
UserGroupInformation.loginUserFromKeytab(principal, keytab)
}


def getHadoopDelegationTokens : Array[Byte] = {
Contributor:
Given how frequently this method is invoked (and from where), how expensive is it?
+CC @tgravescs, @vanzin

val ugi = proxyUser match {
case Some(user) => user
case None => UserGroupInformation.getLoginUser
}
val principal = ugi.getUserName
val hadoopConf = SparkHadoopUtil.get.conf
val namenodes = Set(FileSystem.get(hadoopConf).getHomeDirectory())
logInfo(s"Found these HDFS namenodes: $namenodes")
val ugiCreds = ugi.getCredentials
ugi.doAs(new PrivilegedExceptionAction[Unit] {
override def run() = {
// use the job principal itself to renew the tokens
obtainTokensForNamenodes(namenodes, hadoopConf, ugiCreds, Some(principal))
}
})
// write tokens into a memory file to transfer it to the executors
val tokenBuf = new java.io.ByteArrayOutputStream(1024 * 1024)
ugiCreds.writeTokenStorageToStream(new java.io.DataOutputStream(tokenBuf))
logDebug(s"Wrote ${tokenBuf.size()} bytes of token data")

hadoopConf.set("hadoop.security.authentication", "Kerberos")
tokenBuf.toByteArray
}
def obtainTokensForNamenodes(
paths: Set[Path],
conf: Configuration,
creds: Credentials,
renewer: Option[String] = None
): Unit = {
if (UserGroupInformation.isSecurityEnabled()) {
val delegTokenRenewer = renewer.getOrElse(getTokenRenewer(conf))
paths.foreach { dst =>
val dstFs = dst.getFileSystem(conf)
logInfo("getting token for namenode: " + dst)
dstFs.addDelegationTokens(delegTokenRenewer, creds)
}
}

}
def getTokenRenewer(conf: Configuration): String = {
val delegTokenRenewer = Master.getMasterPrincipal(conf)
logDebug("delegation token renewer is: " + delegTokenRenewer)
if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
val errorMessage = "Can't get Master Kerberos principal for use as renewer"
logError(errorMessage)
throw new SparkException(errorMessage)
}
delegTokenRenewer
}

def useTokenAuth(tokens: Array[Byte]) {
val sparkConf = SparkEnv.get.conf
logInfo(s"Found delegation tokens of ${tokens.length} bytes")

// configure to use tokens for HDFS login
val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
hadoopConf.set("hadoop.security.authentication", "Token")
UserGroupInformation.setConfiguration(hadoopConf)

// decode tokens and add them to the credentials
val creds = UserGroupInformation.getCurrentUser.getCredentials
val tokensBuf = new java.io.ByteArrayInputStream(tokens)
creds.readTokenStorageStream(new java.io.DataInputStream(tokensBuf))
UserGroupInformation.getCurrentUser.addCredentials(creds)
}

}
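To tie the two halves of this file together (a sketch, not part of the diff): the driver serializes delegation tokens when it submits tasks, and the executor installs them before the task body touches HDFS.

```scala
// Driver side: obtain and serialize delegation tokens for the current user.
val tokenBytes: Array[Byte] = KerberosUtil.getHadoopDelegationTokens
// ...tokenBytes travel to the executor inside the serialized Task (see the
// Task/ResultTask/ShuffleMapTask changes below)...
// Executor side: switch the UGI to token-based auth before running the task.
KerberosUtil.useTokenAuth(tokenBytes)
```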

@@ -60,9 +60,10 @@ private[spark] class ResultTask[T, U](
serializedTaskMetrics: Array[Byte],
jobId: Option[Int] = None,
appId: Option[String] = None,
appAttemptId: Option[String] = None)
appAttemptId: Option[String] = None,
tokens: Option[Array[Byte]] = None)
extends Task[U](stageId, stageAttemptId, partition.index, localProperties, serializedTaskMetrics,
jobId, appId, appAttemptId)
jobId, appId, appAttemptId, tokens)
Contributor:
The changes to both *Task.scala files change the security model not just for Mesos but also for YARN, and they are incompatible with the existing public API (credential managers, etc.), unless this PR intends to overhaul security in Spark for all cluster managers.

with Serializable {

@transient private[this] val preferredLocs: Seq[TaskLocation] = {
@@ -60,9 +60,10 @@ private[spark] class ShuffleMapTask(
serializedTaskMetrics: Array[Byte],
jobId: Option[Int] = None,
appId: Option[String] = None,
appAttemptId: Option[String] = None)
appAttemptId: Option[String] = None,
tokens: Option[Array[Byte]] = None)
extends Task[MapStatus](stageId, stageAttemptId, partition.index, localProperties,
serializedTaskMetrics, jobId, appId, appAttemptId)
serializedTaskMetrics, jobId, appId, appAttemptId, tokens)
with Logging {

/** A constructor used only in test suites. This does not require passing in an RDD. */
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -65,7 +65,8 @@ private[spark] abstract class Task[T](
SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array(),
val jobId: Option[Int] = None,
val appId: Option[String] = None,
val appAttemptId: Option[String] = None) extends Serializable {
val appAttemptId: Option[String] = None,
val tokens: Option[Array[Byte]] = None) extends Serializable {

@transient lazy val metrics: TaskMetrics =
SparkEnv.get.closureSerializer.newInstance().deserialize(ByteBuffer.wrap(serializedTaskMetrics))
@@ -110,7 +111,7 @@ private[spark] abstract class Task[T](
Option(attemptNumber)).setCurrentContext()

try {
runTask(context)
KerberosFunction.executeSecure(tokens, runTask, context)
} catch {
case e: Throwable =>
// Catch all errors; run task failure callbacks, and rethrow the exception.
@@ -389,11 +389,10 @@ class SparkSession private(
Dataset.ofRows(self, LogicalRelation(baseRelation))
}

/* ------------------------------- *
| Methods for creating DataSets |
* ------------------------------- */

/**
/* ------------------------------- *
| Methods for creating DataSets |
* ------------------------------- */
/**
* :: Experimental ::
* Creates a [[Dataset]] from a local Seq of data of a given type. This method requires an
* encoder (to convert a JVM object of type `T` to and from the internal Spark SQL representation)