From 70f610f855c3fd2b3ea9b0a44f6015162f555f87 Mon Sep 17 00:00:00 2001
From: Hari Shreedharan
Date: Fri, 20 Nov 2015 15:42:01 -0800
Subject: [PATCH] [SPARK-11662] [YARN] In client mode, make sure we re-login
 before attempting to create new delegation tokens if a new SparkContext is
 created within the same application.

Since Hadoop gives precedence to delegation tokens, we must make sure we log in
as a different user, get new tokens, and replace the old ones in the current
user's credential cache; otherwise we would not be able to get new ones.
---
 .../org/apache/spark/deploy/yarn/Client.scala | 69 ++++++++++++++-----
 1 file changed, 51 insertions(+), 18 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index ba799884f5689..cbef3a7f281cc 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -104,6 +104,7 @@ private[spark] class Client(
   def stop(): Unit = {
     launcherBackend.close()
     yarnClient.stop()
+    reusedJVM = true
     // Unset YARN mode system env variable, to allow switching between cluster types.
     System.clearProperty("SPARK_YARN_MODE")
   }
@@ -317,14 +318,35 @@ private[spark] class Client(
     // and add them as local resources to the application master.
     val fs = FileSystem.get(hadoopConf)
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
-    val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
-    YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
     // Used to keep track of URIs added to the distributed cache. If the same URI is added
     // multiple times, YARN will fail to launch containers for the app with an internal
     // error.
     val distributedUris = new HashSet[String]
-    obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
-    obtainTokenForHBase(sparkConf, hadoopConf, credentials)
+
+
+    def updateTokens(credentials: Credentials): Unit = {
+      val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
+      YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
+
+      obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
+      obtainTokenForHBase(sparkConf, hadoopConf, credentials)
+    }
+
+    // If this JVM hosted a yarn-client mode driver before, the credentials of the current user
+    // now hold delegation tokens, which means the Hadoop security code will use those and not the
+    // keytab login. So we must re-login and get new tokens.
+    if (reusedJVM && loginFromKeytab && !isClusterMode) {
+      val tempUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
+      tempUGI.doAs(new PrivilegedExceptionAction[Unit] {
+        override def run(): Unit = {
+          updateTokens(tempUGI.getCredentials)
+        }
+      })
+      credentials.addAll(tempUGI.getCredentials)
+    } else {
+      updateTokens(credentials)
+    }
+
 
     val replication = sparkConf.getInt("spark.yarn.submit.file.replication",
       fs.getDefaultReplication(dst)).toShort
@@ -558,20 +580,24 @@ private[spark] class Client(
   private def getTokenRenewalInterval(stagingDirPath: Path): Long = {
     // We cannot use the tokens generated above since those have renewer yarn. Trying to renew
     // those will fail with an access control issue. So create new tokens with the logged in
-    // user as renewer.
-    val creds = new Credentials()
-    val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + stagingDirPath
-    YarnSparkHadoopUtil.get.obtainTokensForNamenodes(
-      nns, hadoopConf, creds, Some(sparkConf.get("spark.yarn.principal")))
-    val t = creds.getAllTokens.asScala
-      .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
-      .head
-    val newExpiration = t.renew(hadoopConf)
-    val identifier = new DelegationTokenIdentifier()
-    identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
-    val interval = newExpiration - identifier.getIssueDate
-    logInfo(s"Renewal Interval set to $interval")
-    interval
+    // user as renewer. Also, since the renewal interval does not change, stash it so we do not
+    // have to compute it again unless needed.
+    tokenRenewalInterval.getOrElse {
+      val creds = new Credentials()
+      val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + stagingDirPath
+      YarnSparkHadoopUtil.get.obtainTokensForNamenodes(
+        nns, hadoopConf, creds, Some(sparkConf.get("spark.yarn.principal")))
+      val t = creds.getAllTokens.asScala
+        .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
+        .head
+      val newExpiration = t.renew(hadoopConf)
+      val identifier = new DelegationTokenIdentifier()
+      identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
+      val interval = newExpiration - identifier.getIssueDate
+      logInfo(s"Renewal Interval set to $interval")
+      tokenRenewalInterval = Some(interval)
+      interval
+    }
   }
 
   /**
@@ -1097,6 +1123,13 @@ object Client extends Logging {
   // Subdirectory where the user's python files (not archives) will be placed.
   val LOCALIZED_PYTHON_DIR = "__pyfiles__"
 
+  // Is this a new Client instance that is running in the same JVM as an old one? If login is via
+  // keytab, then we must re-login; otherwise token creation fails, since Hadoop thinks it should
+  // use the existing tokens to talk to HDFS, and this can cause the app to fail.
+  @volatile var reusedJVM = false
+
+  @volatile var tokenRenewalInterval: Option[Long] = None
+
   /**
    * Find the user-defined Spark jar if configured, or return the jar containing this
    * class if not.
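A minimal, self-contained sketch of the re-login pattern the second hunk implements: log in from
the keytab into a fresh UGI so that stale delegation tokens in the current user's credentials do
not shadow the Kerberos login, obtain new tokens under doAs, then merge them back into the
credentials shipped with the application. ReloginSketch, refreshTokens, and the obtainTokens
callback are illustrative names introduced here and stand in for Client's namenode/Hive/HBase
token helpers; they are not part of the patch.

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.{Credentials, UserGroupInformation}

// Hypothetical standalone illustration of the re-login pattern used in Client.scala above.
object ReloginSketch {
  // Log in from the keytab into a fresh UGI, obtain new delegation tokens under doAs,
  // and merge them into the credentials that will be shipped with the application.
  // `obtainTokens` stands in for the HDFS/Hive/HBase token acquisition helpers.
  def refreshTokens(
      principal: String,
      keytab: String,
      targetCreds: Credentials)(obtainTokens: Credentials => Unit): Unit = {
    // A fresh UGI carries no old delegation tokens, so Hadoop falls back to the Kerberos
    // login from the keytab when new tokens are requested.
    val freshUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
    freshUgi.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = obtainTokens(freshUgi.getCredentials)
    })
    // Replace the old tokens in the caller's credential cache with the freshly obtained ones.
    targetCreds.addAll(freshUgi.getCredentials)
  }
}

A caller would pass the same principal and keytab configured via spark.yarn.principal and
spark.yarn.keytab, plus a callback that fills in HDFS (and, if configured, Hive/HBase) tokens,
mirroring the updateTokens helper the patch introduces.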