Skip to content
Permalink
Browse files

[SPARK-23209][core] Allow credential manager to work when Hive not av…

…ailable.

The JVM seems to be doing early binding of classes that the Hive provider
depends on, causing an error to be thrown before it was caught by the code
in the class.

The fix wraps the creation of the provider in a try..catch so that
the provider can be ignored when dependencies are missing.

Added a unit test (which fails without the fix), and also tested
that getting tokens still works in a real cluster.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #20399 from vanzin/SPARK-23209.

(cherry picked from commit b834446)
Signed-off-by: Imran Rashid <irashid@cloudera.com>
  • Loading branch information...
vanzin authored and squito committed Jan 29, 2018
1 parent 4386310 commit 75131ee867bca17ca3aade5f832f1d49b7cfcff5
@@ -64,9 +64,9 @@ private[spark] class HadoopDelegationTokenManager(
}

private def getDelegationTokenProviders: Map[String, HadoopDelegationTokenProvider] = {
val providers = List(new HadoopFSDelegationTokenProvider(fileSystems),
new HiveDelegationTokenProvider,
new HBaseDelegationTokenProvider)
val providers = Seq(new HadoopFSDelegationTokenProvider(fileSystems)) ++
safeCreateProvider(new HiveDelegationTokenProvider) ++
safeCreateProvider(new HBaseDelegationTokenProvider)

// Filter out providers for which spark.security.credentials.{service}.enabled is false.
providers
@@ -75,6 +75,17 @@ private[spark] class HadoopDelegationTokenManager(
.toMap
}

private def safeCreateProvider(
createFn: => HadoopDelegationTokenProvider): Option[HadoopDelegationTokenProvider] = {
try {
Some(createFn)
} catch {
case t: Throwable =>
logDebug(s"Failed to load built in provider.", t)
None
}
}

def isServiceEnabled(serviceName: String): Boolean = {
val key = providerEnabledConfig.format(serviceName)

@@ -17,6 +17,7 @@

package org.apache.spark.deploy.security

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials
@@ -110,7 +111,64 @@ class HadoopDelegationTokenManagerSuite extends SparkFunSuite with Matchers {
creds.getAllTokens.size should be (0)
}

test("SPARK-23209: obtain tokens when Hive classes are not available") {
// This test needs a custom class loader to hide Hive classes which are in the classpath.
// Because the manager code loads the Hive provider directly instead of using reflection, we
// need to drive the test through the custom class loader so a new copy that cannot find
// Hive classes is loaded.
val currentLoader = Thread.currentThread().getContextClassLoader()
val noHive = new ClassLoader() {
override def loadClass(name: String, resolve: Boolean): Class[_] = {
if (name.startsWith("org.apache.hive") || name.startsWith("org.apache.hadoop.hive")) {
throw new ClassNotFoundException(name)
}

if (name.startsWith("java") || name.startsWith("scala")) {
currentLoader.loadClass(name)
} else {
val classFileName = name.replaceAll("\\.", "/") + ".class"
val in = currentLoader.getResourceAsStream(classFileName)
if (in != null) {
val bytes = IOUtils.toByteArray(in)
defineClass(name, bytes, 0, bytes.length)
} else {
throw new ClassNotFoundException(name)
}
}
}
}

try {
Thread.currentThread().setContextClassLoader(noHive)
val test = noHive.loadClass(NoHiveTest.getClass.getName().stripSuffix("$"))
test.getMethod("runTest").invoke(null)
} finally {
Thread.currentThread().setContextClassLoader(currentLoader)
}
}

private[spark] def hadoopFSsToAccess(hadoopConf: Configuration): Set[FileSystem] = {
Set(FileSystem.get(hadoopConf))
}
}

/** Test code for SPARK-23209 to avoid using too much reflection above. */
private object NoHiveTest extends Matchers {

def runTest(): Unit = {
try {
val manager = new HadoopDelegationTokenManager(new SparkConf(), new Configuration(),
_ => Set())
manager.getServiceDelegationTokenProvider("hive") should be (None)
} catch {
case e: Throwable =>
// Throw a better exception in case the test fails, since there may be a lot of nesting.
var cause = e
while (cause.getCause() != null) {
cause = cause.getCause()
}
throw cause
}
}

}

0 comments on commit 75131ee

Please sign in to comment.
You can’t perform that action at this time.