apache#14 export spark ugi
hn5092 committed Jan 29, 2019
1 parent 5112cff commit 7ea7c38
Showing 4 changed files with 34 additions and 11 deletions.
core/src/main/scala/org/apache/spark/SparkEnv.scala (12 additions, 0 deletions)
@@ -25,6 +25,7 @@ import scala.collection.mutable
import scala.util.Properties

import com.google.common.collect.MapMaker
+import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
@@ -141,6 +142,17 @@ object SparkEnv extends Logging {
  private[spark] val driverSystemName = "sparkDriver"
  private[spark] val executorSystemName = "sparkExecutor"

+  var cachedUGI: UserGroupInformation = _
+
+  def setUGI(ugi: UserGroupInformation): Unit = {
+    cachedUGI = ugi
+    logInfo(s"Set UGI $ugi")
+  }
+
+  def getUGI(): UserGroupInformation = {
+    cachedUGI
+  }
+
  def set(e: SparkEnv) {
    env = e
  }
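Note on the hunk above: `runAsSparkUser` (changed below in SparkHadoopUtil.scala) now stores the UGI it creates via `SparkEnv.setUGI`, presumably so that other components can re-run work as that same user instead of rebuilding the UGI. A minimal sketch of how a caller might consume the cached value; `runAsCachedSparkUser` is a hypothetical helper, not part of this commit:

```scala
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.SparkEnv

object CachedUgiExample {
  // Run `body` as the UGI cached by SparkEnv.setUGI, falling back to the
  // caller's own user when nothing has been cached yet.
  def runAsCachedSparkUser[T](body: => T): T = {
    val ugi: UserGroupInformation = SparkEnv.getUGI()
    if (ugi != null) {
      ugi.doAs(new PrivilegedExceptionAction[T] {
        override def run(): T = body
      })
    } else {
      body
    }
  }
}
```

Because `cachedUGI` is a plain var on the `SparkEnv` object, callers should expect it to be null until `runAsSparkUser` has executed in the current process.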
core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala (9 additions, 3 deletions)

@@ -36,7 +36,7 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.{Token, TokenIdentifier}
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier

-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
@@ -61,7 +61,9 @@ class SparkHadoopUtil extends Logging {
   * do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems
   */
  def runAsSparkUser(func: () => Unit) {
-    createSparkUser().doAs(new PrivilegedExceptionAction[Unit] {
+    val ugi = createSparkUser()
+    SparkEnv.setUGI(ugi)
+    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      def run: Unit = func()
    })
  }
@@ -147,8 +149,12 @@ class SparkHadoopUtil extends Logging {
    UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
    val creds = deserialize(tokens)
    logInfo("Updating delegation tokens for current user.")
-    logDebug(s"Adding/updating delegation tokens ${dumpTokens(creds)}")
+    logInfo(s"Adding/updating delegation tokens ${dumpTokens(creds)}")
    addCurrentUserCredentials(creds)
+    val user = UserGroupInformation.getCurrentUser
+    logInfo(s"Spark user hash code: ${user.hashCode()}")
+    user.getTokens
+      .asScala.map(tokenToString).foreach(token => logInfo(token))
  }

  /**
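The extra logging added to addDelegationTokens walks the tokens attached to the current user's credentials. A standalone sketch of the same inspection, assuming only Hadoop's UserGroupInformation API (the commit itself formats tokens with SparkHadoopUtil's tokenToString and dumpTokens helpers):

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.security.UserGroupInformation

object TokenInspectionExample {
  // Summarize the delegation tokens attached to the current user's credentials.
  def currentUserTokenSummaries(): Seq[String] = {
    val user = UserGroupInformation.getCurrentUser
    user.getTokens.asScala.toSeq.map { token =>
      s"kind=${token.getKind}, service=${token.getService}"
    }
  }
}
```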
core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala (11 additions, 7 deletions)

@@ -18,7 +18,6 @@
package org.apache.spark.deploy.security

import scala.collection.JavaConverters._
-import scala.util.Try

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
@@ -52,7 +51,7 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
    if (tokenRenewalInterval == null) {
      tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf, fsToGetTokens)
    }
-
+    logInfo(s"Token renewal interval is $tokenRenewalInterval")
    // Get the time of next renewal.
    val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
      val nextRenewalDates = fetchCreds.getAllTokens.asScala
@@ -61,11 +60,12 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
          val identifier = token
            .decodeIdentifier()
            .asInstanceOf[AbstractDelegationTokenIdentifier]
+          logInfo(s"Token issue date is ${identifier.getIssueDate} for token $identifier")
          identifier.getIssueDate + interval
        }
      if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
    }
-
+    logInfo(s"Next token renewal date is $nextRenewalDate")
    nextRenewalDate
  }

@@ -115,13 +115,17 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
    val renewIntervals = creds.getAllTokens.asScala.filter {
      _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
    }.flatMap { token =>
-      Try {
+      var interval = 18 * 3600 * 1000L  // fallback: 18 hours, in milliseconds
+      try {
        val newExpiration = token.renew(hadoopConf)
        val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
-        val interval = newExpiration - identifier.getIssueDate
+        interval = newExpiration - identifier.getIssueDate
        logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
-        interval
-      }.toOption
+      } catch {
+        case throwable: Throwable =>
+          logInfo("Failed to renew token, using the fallback renewal interval", throwable)
+      }
+      List(interval)
    }
    if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
  }
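Taken together, the provider changes compute the next renewal as the earliest issueDate + renewalInterval across all delegation tokens, where the interval comes from a trial token.renew() call (or the fallback value when renewal fails). A toy calculation with made-up millisecond values, just to show the arithmetic:

```scala
object RenewalMath {
  def main(args: Array[String]): Unit = {
    // Hypothetical values; dates are epoch milliseconds, the interval is in milliseconds.
    val renewalIntervalMs = 18L * 3600 * 1000                // e.g. an 18-hour renewal interval
    val issueDatesMs = Seq(1548720000000L, 1548723600000L)   // issue dates of two tokens
    val nextRenewalMs = issueDatesMs.map(_ + renewalIntervalMs).min
    println(s"Next renewal due at epoch ms $nextRenewalMs")  // earliest issueDate + interval
  }
}
```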
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala (2 additions, 1 deletion)

@@ -128,6 +128,8 @@ private[spark] class CoarseGrainedExecutorBackend(
      logInfo(s"Received tokens of ${tokenBytes.length} bytes")
      if (SparkEnv.get.executorId != SparkContext.DRIVER_IDENTIFIER) {
        SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
+      } else {
+        logInfo("Skipping delegation token update on the driver.")
      }
}

@@ -186,7 +188,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
      userClassPath: Seq[URL]) {

    Utils.initDaemon(log)
-
    SparkHadoopUtil.get.runAsSparkUser { () =>
      // Debug code
      Utils.checkHost(hostname)
