Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 51 additions & 18 deletions yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ private[spark] class Client(
def stop(): Unit = {
launcherBackend.close()
yarnClient.stop()
reusedJVM = true
// Unset YARN mode system env variable, to allow switching between cluster types.
System.clearProperty("SPARK_YARN_MODE")
}
Expand Down Expand Up @@ -317,14 +318,35 @@ private[spark] class Client(
// and add them as local resources to the application master.
val fs = FileSystem.get(hadoopConf)
val dst = new Path(fs.getHomeDirectory(), appStagingDir)
val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
// Used to keep track of URIs added to the distributed cache. If the same URI is added
// multiple times, YARN will fail to launch containers for the app with an internal
// error.
val distributedUris = new HashSet[String]
obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
obtainTokenForHBase(sparkConf, hadoopConf, credentials)


def updateTokens(credentials: Credentials): Unit = {
val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)

obtainTokenForHiveMetastore(sparkConf, hadoopConf, credentials)
obtainTokenForHBase(sparkConf, hadoopConf, credentials)
}

// If this JVM hosted an yarn-client mode driver before, the credentials of the current user
// now has delegation tokens, which means Hadoop security code will look at that and not the
// keytab login. So we must re-login and get new tokens.
if (reusedJVM && loginFromKeytab && !isClusterMode) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @harishreedharan , do you plan on updating this patch?

If yes, I'm wondering why not do this in all cases, not just when a new context is created. The same code should work in both scenarios, right?

If not, should probably close the PR.

val tempUGI = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
tempUGI.doAs(new PrivilegedExceptionAction[Unit] {
override def run(): Unit = {
updateTokens(tempUGI.getCredentials)
}
})
credentials.addAll(tempUGI.getCredentials)
} else {
updateTokens(credentials)
}


val replication = sparkConf.getInt("spark.yarn.submit.file.replication",
fs.getDefaultReplication(dst)).toShort
Expand Down Expand Up @@ -558,20 +580,24 @@ private[spark] class Client(
private def getTokenRenewalInterval(stagingDirPath: Path): Long = {
// We cannot use the tokens generated above since those have renewer yarn. Trying to renew
// those will fail with an access control issue. So create new tokens with the logged in
// user as renewer.
val creds = new Credentials()
val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + stagingDirPath
YarnSparkHadoopUtil.get.obtainTokensForNamenodes(
nns, hadoopConf, creds, Some(sparkConf.get("spark.yarn.principal")))
val t = creds.getAllTokens.asScala
.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
.head
val newExpiration = t.renew(hadoopConf)
val identifier = new DelegationTokenIdentifier()
identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
val interval = newExpiration - identifier.getIssueDate
logInfo(s"Renewal Interval set to $interval")
interval
// user as renewer. Also, since renewal interval does not change, let's just stash it and not
// retry unless needed.
tokenRenewalInterval.getOrElse {
val creds = new Credentials()
val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + stagingDirPath
YarnSparkHadoopUtil.get.obtainTokensForNamenodes(
nns, hadoopConf, creds, Some(sparkConf.get("spark.yarn.principal")))
val t = creds.getAllTokens.asScala
.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
.head
val newExpiration = t.renew(hadoopConf)
val identifier = new DelegationTokenIdentifier()
identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
val interval = newExpiration - identifier.getIssueDate
logInfo(s"Renewal Interval set to $interval")
tokenRenewalInterval = Some(interval)
interval
}
}

/**
Expand Down Expand Up @@ -1097,6 +1123,13 @@ object Client extends Logging {
// Subdirectory where the user's python files (not archives) will be placed.
val LOCALIZED_PYTHON_DIR = "__pyfiles__"

// Is this a new Client instance that is running in the same JVM as an old one? If login is via
// keytab, then we must relogin - else token creation fails as Hadoop thinks it should use the
// tokens to talk to HDFS, and this can cause the app to fail.
@volatile var reusedJVM = false

@volatile var tokenRenewalInterval: Option[Long] = None

/**
* Find the user-defined Spark jar if configured, or return the jar containing this
* class if not.
Expand Down