Commit 131d420

Change the code according to comments
Change-Id: I05b9271d44c196d72865aaf01570b65abcf7b9ce
jerryshao committed Jan 5, 2017
1 parent 0c1ec23 commit 131d420
Showing 6 changed files with 26 additions and 24 deletions.
10 changes: 5 additions & 5 deletions docs/running-on-yarn.md
@@ -479,12 +479,12 @@
Hadoop services issue *hadoop tokens* to grant access to the services and data.
Clients must first acquire tokens for the services they will access and pass them along with their
application as it is launched in the YARN cluster.

-For a Spark application to interact with HDFS, HBase and Hive, it must acquire the relevant tokens
+For a Spark application to interact with the Hadoop filesystem, HBase and Hive, it must acquire the relevant tokens
using the Kerberos credentials of the user launching the application
—that is, the principal whose identity will become that of the launched Spark application.

This is normally done at launch time: in a secure cluster Spark will automatically obtain a
-token for the cluster's HDFS filesystem, and potentially for HBase and Hive.
+token for the cluster's Hadoop filesystem, and potentially for HBase and Hive.

An HBase token will be obtained if HBase is on the classpath, the HBase configuration declares
the application is secure (i.e. `hbase-site.xml` sets `hbase.security.authentication` to `kerberos`),
@@ -494,7 +494,7 @@
Similarly, a Hive token will be obtained if Hive is on the classpath, its configuration
includes a URI of the metadata store in `hive.metastore.uris`, and
`spark.yarn.security.credentials.hive.enabled` is not set to `false`.
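For reference, the HBase condition above refers to a setting that looks like this in `hbase-site.xml` (a minimal excerpt of a secure-cluster configuration, not a complete file):

```xml
<property>
  <name>hbase.security.authentication</name>
  <value>kerberos</value>
</property>
```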

-If an application needs to interact with other secure HDFS clusters, then
+If an application needs to interact with other secure Hadoop filesystem clusters, then
the tokens needed to access these clusters must be explicitly requested at
launch time. This is done by listing them in the `spark.yarn.access.namenodes` property.
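As an illustration (host names hypothetical), `spark.yarn.access.namenodes` takes a comma-separated list of filesystem URIs, e.g. in `spark-defaults.conf`:

```
spark.yarn.access.namenodes hdfs://nn1.example.com:8020,hdfs://nn2.example.com:8020
```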

@@ -558,8 +558,8 @@
For Spark applications, the Oozie workflow must be set up for Oozie to request all tokens which
the application needs, including:

- The YARN resource manager.
-- The local HDFS filesystem.
-- Any remote HDFS filesystems used as a source or destination of I/O.
+- The local Hadoop filesystem.
+- Any remote Hadoop filesystems used as a source or destination of I/O.
- Hive —if used.
- HBase —if used.
- The YARN timeline server, if the application interacts with this.
2 changes: 1 addition & 1 deletion META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider
@@ -1,3 +1,3 @@
-org.apache.spark.deploy.yarn.security.FileSystemCredentialProvider
+org.apache.spark.deploy.yarn.security.HadoopFSCredentialProvider
org.apache.spark.deploy.yarn.security.HBaseCredentialProvider
org.apache.spark.deploy.yarn.security.HiveCredentialProvider
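These entries follow the standard `java.util.ServiceLoader` convention: the file is named after the fully-qualified interface, and each line names one implementation class. A minimal sketch of the discovery side (Spark's actual loading code is not part of this diff):

```scala
import java.util.ServiceLoader
import scala.collection.JavaConverters._

import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

object ProviderDiscoverySketch {
  def main(args: Array[String]): Unit = {
    // Load every class listed in
    // META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider.
    val providers = ServiceLoader.load(classOf[ServiceCredentialProvider]).asScala.toList
    providers.foreach(p => println(p.serviceName)) // e.g. hadoopfs, hbase, hive
  }
}
```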
ConfigurableCredentialManager.scala
@@ -31,7 +31,7 @@
import org.apache.spark.util.Utils
/**
* A ConfigurableCredentialManager to manage all the registered credential providers and offer
* APIs for other modules to obtain credentials as well as renewal time. By default
-* [[FileSystemCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will
+* [[HadoopFSCredentialProvider]], [[HiveCredentialProvider]] and [[HBaseCredentialProvider]] will
* be loaded if not explicitly disabled. Any plugged-in credential provider that wants to be
* managed by ConfigurableCredentialManager needs to implement the [[ServiceCredentialProvider]]
* interface and be listed in resources/META-INF/services so that it can be loaded by the ServiceLoader.
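The scaladoc above describes the plug-in contract. As a hedged sketch of what a third-party provider could look like (the package, class, and service name below are hypothetical, and the `obtainCredentials` signature is assumed to match the one visible further down in this diff):

```scala
package com.example.security // hypothetical package

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
import org.apache.spark.deploy.yarn.security.ServiceCredentialProvider

class MyServiceCredentialProvider extends ServiceCredentialProvider {

  // The name feeds the per-service toggle key, here
  // spark.yarn.security.credentials.myservice.enabled.
  override val serviceName: String = "myservice"

  override def obtainCredentials(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      creds: Credentials): Option[Long] = {
    // Fetch a delegation token from the external service, add it to `creds`,
    // and return the next renewal time in milliseconds (None if not renewable).
    None
  }
}
```

The class is then listed in `resources/META-INF/services/org.apache.spark.deploy.yarn.security.ServiceCredentialProvider` so the ServiceLoader can find it.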
HadoopFSCredentialProvider.scala
@@ -31,13 +31,14 @@
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._

-private[security] class FileSystemCredentialProvider
+private[security] class HadoopFSCredentialProvider
extends ServiceCredentialProvider with Logging {
// Token renewal interval, this value will be set in the first call,
-// if None means no token renewer specified, so cannot get token renewal interval.
+// if None means no token renewer specified or no token can be renewed,
+// so cannot get token renewal interval.
private var tokenRenewalInterval: Option[Long] = null

override val serviceName: String = "fs"
override val serviceName: String = "hadoopfs"

override def obtainCredentials(
hadoopConf: Configuration,
@@ -81,7 +82,8 @@ private[security] class FileSystemCredentialProvider

val renewIntervals = creds.getAllTokens.asScala.filter {
_.decodeIdentifier().isInstanceOf[AbstractDelegationTokenIdentifier]
-}.flatMap { token => Try {
+}.flatMap { token =>
+  Try {
val newExpiration = token.renew(hadoopConf)
val identifier = token.decodeIdentifier().asInstanceOf[AbstractDelegationTokenIdentifier]
val interval = newExpiration - identifier.getIssueDate
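The hunk above derives, for each token that could actually be renewed, the interval `newExpiration - identifier.getIssueDate`; presumably the smallest of these intervals becomes the renewal interval. A standalone sketch of that pattern with made-up numbers, independent of the Hadoop APIs:

```scala
import scala.util.Try

object RenewalIntervalSketch {
  def main(args: Array[String]): Unit = {
    // (issueDate, attempted renewal) pairs; the second token is not renewable.
    val tokens = Seq(
      (1000L, Try(87000L)),
      (2000L, Try[Long](throw new IllegalStateException("not renewable"))),
      (3000L, Try(90000L)))

    // Keep only the successful renewals, turn each into an interval, and take
    // the minimum so renewal fires before the earliest token expires.
    val interval: Option[Long] = tokens
      .flatMap { case (issued, renewed) => renewed.toOption.map(_ - issued) }
      .reduceLeftOption(_ min _)

    println(interval) // Some(86000)
  }
}
```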
ConfigurableCredentialManagerSuite.scala
@@ -48,7 +48,7 @@ class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers ...
test("Correctly load default credential providers") {
credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)

credentialManager.getServiceCredentialProvider("fs") should not be (None)
credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None)
credentialManager.getServiceCredentialProvider("hbase") should not be (None)
credentialManager.getServiceCredentialProvider("hive") should not be (None)
}
@@ -57,17 +57,17 @@ class ConfigurableCredentialManagerSuite extends SparkFunSuite with Matchers ...
sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false")
credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)

credentialManager.getServiceCredentialProvider("fs") should not be (None)
credentialManager.getServiceCredentialProvider("hadoopfs") should not be (None)
credentialManager.getServiceCredentialProvider("hbase") should not be (None)
credentialManager.getServiceCredentialProvider("hive") should be (None)
}

test("using deprecated configurations") {
sparkConf.set("spark.yarn.security.tokens.fs.enabled", "false")
sparkConf.set("spark.yarn.security.tokens.hadoopfs.enabled", "false")
sparkConf.set("spark.yarn.security.tokens.hive.enabled", "false")
credentialManager = new ConfigurableCredentialManager(sparkConf, hadoopConf)

credentialManager.getServiceCredentialProvider("fs") should be (None)
credentialManager.getServiceCredentialProvider("hadoopfs") should be (None)
credentialManager.getServiceCredentialProvider("hive") should be (None)
credentialManager.getServiceCredentialProvider("test") should not be (None)
credentialManager.getServiceCredentialProvider("hbase") should not be (None)
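One practical consequence of the `serviceName` rename exercised above: the per-service toggle keys are derived from the name, so the `fs` keys become `hadoopfs` keys in both spellings. For example (the first key is inferred from the pattern of the keys in these tests):

```
spark.yarn.security.credentials.hadoopfs.enabled false
# deprecated spelling, still honored per the "using deprecated configurations" test
spark.yarn.security.tokens.hadoopfs.enabled false
```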
HadoopFSCredentialProviderSuite.scala
@@ -23,30 +23,30 @@
import org.scalatest.{Matchers, PrivateMethodTester}

import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}

-class FileSystemCredentialProviderSuite
+class HadoopFSCredentialProviderSuite
extends SparkFunSuite
with PrivateMethodTester
with Matchers {
private val _getTokenRenewer = PrivateMethod[String]('getTokenRenewer)

private def getTokenRenewer(
-fsCredentialProvider: FileSystemCredentialProvider, conf: Configuration): String = {
+fsCredentialProvider: HadoopFSCredentialProvider, conf: Configuration): String = {
fsCredentialProvider invokePrivate _getTokenRenewer(conf)
}

-private var fsCredentialProvider: FileSystemCredentialProvider = null
+private var hadoopFsCredentialProvider: HadoopFSCredentialProvider = null

override def beforeAll() {
super.beforeAll()

-if (fsCredentialProvider == null) {
-  fsCredentialProvider = new FileSystemCredentialProvider()
+if (hadoopFsCredentialProvider == null) {
+  hadoopFsCredentialProvider = new HadoopFSCredentialProvider()
}
}

override def afterAll() {
-if (fsCredentialProvider != null) {
-  fsCredentialProvider = null
+if (hadoopFsCredentialProvider != null) {
+  hadoopFsCredentialProvider = null
}

super.afterAll()
@@ -56,15 +56,15 @@ class FileSystemCredentialProviderSuite
val hadoopConf = new Configuration()
hadoopConf.set("yarn.resourcemanager.address", "myrm:8033")
hadoopConf.set("yarn.resourcemanager.principal", "yarn/myrm:8032@SPARKTEST.COM")
-val renewer = getTokenRenewer(fsCredentialProvider, hadoopConf)
+val renewer = getTokenRenewer(hadoopFsCredentialProvider, hadoopConf)
renewer should be ("yarn/myrm:8032@SPARKTEST.COM")
}

test("check token renewer default") {
val hadoopConf = new Configuration()
val caught =
intercept[SparkException] {
-getTokenRenewer(fsCredentialProvider, hadoopConf)
+getTokenRenewer(hadoopFsCredentialProvider, hadoopConf)
}
assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
}
