diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 72123f2232532..02de13e6bffc5 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -25,6 +25,7 @@ import scala.collection.mutable
 import scala.util.Properties
 
 import com.google.common.collect.MapMaker
+import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.python.PythonWorkerFactory
@@ -141,6 +142,17 @@ object SparkEnv extends Logging {
   private[spark] val driverSystemName = "sparkDriver"
   private[spark] val executorSystemName = "sparkExecutor"
 
+  var cachedUGI: UserGroupInformation = _
+
+  def setUGI(ugi: UserGroupInformation): Unit = {
+    cachedUGI = ugi
+    logInfo(s"Set UGI $ugi")
+  }
+
+  def getUGI(): UserGroupInformation = {
+    cachedUGI
+  }
+
   def set(e: SparkEnv) {
     env = e
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index 4cc0063d010ef..a585208a6c4b8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -36,7 +36,7 @@ import org.apache.hadoop.security.{Credentials, UserGroupInformation}
 import org.apache.hadoop.security.token.{Token, TokenIdentifier}
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier
 
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
@@ -61,7 +61,9 @@ class SparkHadoopUtil extends Logging {
    * do a FileSystem.closeAllForUGI in order to avoid leaking Filesystems
    */
  def runAsSparkUser(func: () => Unit) {
-    createSparkUser().doAs(new PrivilegedExceptionAction[Unit] {
+    val ugi = createSparkUser()
+    SparkEnv.setUGI(ugi)
+    ugi.doAs(new PrivilegedExceptionAction[Unit] {
       def run: Unit = func()
     })
   }
@@ -147,8 +149,12 @@ class SparkHadoopUtil extends Logging {
     UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
     val creds = deserialize(tokens)
     logInfo("Updating delegation tokens for current user.")
-    logDebug(s"Adding/updating delegation tokens ${dumpTokens(creds)}")
+    logInfo(s"Adding/updating delegation tokens ${dumpTokens(creds)}")
     addCurrentUserCredentials(creds)
+    val user = UserGroupInformation.getCurrentUser
+    logInfo(s"Spark user hashcode : ${user.hashCode()}")
+    user.getTokens
+      .asScala.map(tokenToString).foreach(token => logInfo(token))
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
index 21ca669ea98f0..4af4ecc8aafea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/security/HadoopFSDelegationTokenProvider.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.deploy.security
 
 import scala.collection.JavaConverters._
-import scala.util.Try
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.FileSystem
@@ -52,7 +51,7 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
     if (tokenRenewalInterval == null) {
       tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf, fsToGetTokens)
     }
-
+    logInfo(s"Token renew interval is :$tokenRenewalInterval")
     // Get the time of next renewal.
     val nextRenewalDate = tokenRenewalInterval.flatMap { interval =>
       val nextRenewalDates = fetchCreds.getAllTokens.asScala
@@ -61,11 +60,12 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
           val identifier = token
             .decodeIdentifier()
             .asInstanceOf[AbstractDelegationTokenIdentifier]
+          logInfo(s"Current token renew interval is :${identifier.getIssueDate}, token $identifier")
           identifier.getIssueDate + interval
         }
       if (nextRenewalDates.isEmpty) None else Some(nextRenewalDates.min)
     }
-
+    logInfo(s"Next token renew interval is :$nextRenewalDate")
     nextRenewalDate
   }
 
@@ -115,13 +115,17 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: Configuration
       val renewIntervals = creds.getAllTokens.asScala.filter {
        _.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
       }.flatMap { token =>
-        Try {
+        var interval = 18 * 3600L
+        try {
          val newExpiration = token.renew(hadoopConf)
          val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
-          val interval = newExpiration - identifier.getIssueDate
+          interval = newExpiration - identifier.getIssueDate
          logInfo(s"Renewal interval is $interval for token ${token.getKind.toString}")
-          interval
-        }.toOption
+        } catch {
+          case throwable: Throwable =>
+            logInfo("Error for init Token", throwable)
+        }
+        List(interval)
       }
       if (renewIntervals.isEmpty) None else Some(renewIntervals.min)
     }
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 4afc63254ceba..907bc014af29a 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -128,6 +128,8 @@ private[spark] class CoarseGrainedExecutorBackend(
       logInfo(s"Received tokens of ${tokenBytes.length} bytes")
       if (SparkEnv.get.executorId != SparkContext.DRIVER_IDENTIFIER) {
         SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
+      } else {
+        logInfo("Skip update tokens with driver.")
       }
     }
 
@@ -186,7 +188,6 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       userClassPath: Seq[URL]) {
 
     Utils.initDaemon(log)
-
     SparkHadoopUtil.get.runAsSparkUser { () =>
       // Debug code
       Utils.checkHost(hostname)
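
A hedged usage sketch (not part of the patch): the hunks above add SparkEnv.setUGI/getUGI and cache the UGI created in runAsSparkUser. A caller that needs to re-enter that user context could look roughly like the snippet below; the helper name withCachedSparkUser and its call site are hypothetical, while doAs and PrivilegedExceptionAction are the same Hadoop APIs already used in runAsSparkUser.

import java.security.PrivilegedExceptionAction

import org.apache.spark.SparkEnv

// Run `body` as the Spark user cached by runAsSparkUser(), falling back to the
// current user when no UGI has been cached (e.g. on a driver that never called it).
def withCachedSparkUser[T](body: => T): T = {
  val ugi = SparkEnv.getUGI()
  if (ugi == null) {
    body
  } else {
    ugi.doAs(new PrivilegedExceptionAction[T] {
      override def run(): T = body
    })
  }
}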