[SPARK-26254][CORE] Extract Hive + Kafka dependencies from Core.
## What changes were proposed in this pull request?

Core currently carries awkward `provided`-scope dependencies for the following:
* Hive
* Kafka

This PR extracts them out of core. It contains the following changes:
* Token providers are now loaded via Java's `ServiceLoader` mechanism (see the sketch after this list)
* Hive token provider moved to hive project
* Kafka token provider extracted into a new project
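
As a rough sketch of the new mechanism (not the exact Spark code; the `TokenProvider` stub, `MyTokenProvider`, and the `com.example` names below are illustrative only), an implementation registers itself in a `META-INF/services` resource named after the provider interface, and the manager then discovers all registered implementations with `java.util.ServiceLoader`:

```scala
import java.util.ServiceLoader
import scala.collection.mutable

// Illustrative stand-in for org.apache.spark.deploy.security.HadoopDelegationTokenProvider.
trait TokenProvider {
  def serviceName: String
}

// A provider registers itself by listing its fully qualified class name in a
// classpath resource named META-INF/services/<fully.qualified.ProviderInterface>,
// e.g. a line "com.example.MyTokenProvider" in that file.
class MyTokenProvider extends TokenProvider {
  override def serviceName: String = "example"
}

object ProviderDiscovery {
  // Discover every registered implementation; a provider whose construction
  // fails is skipped so that it cannot break the remaining providers.
  def loadProviders(classLoader: ClassLoader): Map[String, TokenProvider] = {
    val loader = ServiceLoader.load(classOf[TokenProvider], classLoader)
    val providers = mutable.ArrayBuffer[TokenProvider]()
    val iterator = loader.iterator
    while (iterator.hasNext) {
      try {
        providers += iterator.next()
      } catch {
        case _: Throwable => // skip the broken provider and keep going
      }
    }
    providers.map(p => p.serviceName -> p).toMap
  }
}
```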

## How was this patch tested?

Existing and newly added unit tests.
Additionally tested on a cluster.

Closes #23499 from gaborgsomogyi/SPARK-26254.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
gaborgsomogyi authored and Marcelo Vanzin committed Jan 25, 2019
1 parent b484490 commit 773efed
Showing 26 changed files with 336 additions and 155 deletions.
40 changes: 0 additions & 40 deletions core/pom.xml
@@ -385,46 +385,6 @@
<artifactId>commons-crypto</artifactId>
</dependency>

<!--
The following dependencies are depended upon in HiveCredentialProvider, but are only executed if Hive is enabled in
the user's Hadoop configuration. So in order to prevent spark-core from depending on Hive, these deps have been
placed in the "provided" scope, rather than the "compile" scope, and NoClassDefFoundError exceptions are handled
when the user has not explicitly compiled with the Hive module.
-->
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-exec</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>${hive.group}</groupId>
<artifactId>hive-metastore</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libfb303</artifactId>
<scope>provided</scope>
</dependency>

<!--
The following Kafka dependency is used to obtain delegation tokens.
In order to prevent spark-core from depending on Kafka, this dependency has been placed in the
"provided" scope, rather than the "compile" scope, and NoClassDefFoundError exceptions are
handled when the user uses neither the spark-streaming-kafka nor the spark-sql-kafka module.
-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<scope>provided</scope>
</dependency>

</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -0,0 +1,2 @@
org.apache.spark.deploy.security.HadoopFSDelegationTokenProvider
org.apache.spark.deploy.security.HBaseDelegationTokenProvider
@@ -21,6 +21,7 @@ import scala.reflect.runtime.universe
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.token.{Token, TokenIdentifier}

@@ -36,6 +37,7 @@ private[security] class HBaseDelegationTokenProvider
override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
fileSystems: Set[FileSystem],
creds: Credentials): Option[Long] = {
try {
val mirror = universe.runtimeMirror(Utils.getContextOrSparkClassLoader)
@@ -20,8 +20,11 @@ package org.apache.spark.deploy.security
import java.io.File
import java.net.URI
import java.security.PrivilegedExceptionAction
import java.util.ServiceLoader
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@@ -33,7 +36,7 @@ import org.apache.spark.internal.config._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
import org.apache.spark.ui.UIUtils
import org.apache.spark.util.ThreadUtils
import org.apache.spark.util.{ThreadUtils, Utils}

/**
* Manager for delegation tokens in a Spark application.
@@ -132,7 +135,7 @@ private[spark] class HadoopDelegationTokenManager(
def obtainDelegationTokens(creds: Credentials): Long = {
delegationTokenProviders.values.flatMap { provider =>
if (provider.delegationTokensRequired(sparkConf, hadoopConf)) {
provider.obtainDelegationTokens(hadoopConf, sparkConf, creds)
provider.obtainDelegationTokens(hadoopConf, sparkConf, fileSystemsToAccess(), creds)
} else {
logDebug(s"Service ${provider.serviceName} does not require a token." +
s" Check your configuration to see if security is disabled or not.")
@@ -244,29 +247,24 @@ private[spark] class HadoopDelegationTokenManager(
}

private def loadProviders(): Map[String, HadoopDelegationTokenProvider] = {
val providers = Seq(
new HadoopFSDelegationTokenProvider(
() => HadoopDelegationTokenManager.this.fileSystemsToAccess())) ++
safeCreateProvider(new HiveDelegationTokenProvider) ++
safeCreateProvider(new HBaseDelegationTokenProvider) ++
safeCreateProvider(new KafkaDelegationTokenProvider)
val loader = ServiceLoader.load(classOf[HadoopDelegationTokenProvider],
Utils.getContextOrSparkClassLoader)
val providers = mutable.ArrayBuffer[HadoopDelegationTokenProvider]()

val iterator = loader.iterator
while (iterator.hasNext) {
try {
providers += iterator.next
} catch {
case t: Throwable =>
logDebug(s"Failed to load built in provider.", t)
}
}

// Filter out providers for which spark.security.credentials.{service}.enabled is false.
providers
.filter { p => isServiceEnabled(p.serviceName) }
.map { p => (p.serviceName, p) }
.toMap
}

private def safeCreateProvider(
createFn: => HadoopDelegationTokenProvider): Option[HadoopDelegationTokenProvider] = {
try {
Some(createFn)
} catch {
case t: Throwable =>
logDebug(s"Failed to load built in provider.", t)
None
}
}

}
@@ -18,6 +18,7 @@
package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf
@@ -43,11 +44,13 @@ private[spark] trait HadoopDelegationTokenProvider {
* Obtain delegation tokens for this service and get the time of the next renewal.
* @param hadoopConf Configuration of current Hadoop Compatible system.
* @param creds Credentials to add tokens and security keys to.
* @param fileSystems List of file systems for which to obtain delegation tokens.
* @return If the returned tokens are renewable and can be renewed, return the time of the next
* renewal, otherwise None should be returned.
*/
def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
fileSystems: Set[FileSystem],
creds: Credentials): Option[Long]
}
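
For reference, a provider implementing the updated trait now receives the file systems explicitly rather than through a captured callback. A minimal hypothetical implementation (the package placement, class name, and service name below are illustrative only, not part of this commit) could look like:

```scala
package org.apache.spark.deploy.security

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

import org.apache.spark.SparkConf

// Hypothetical provider showing the new obtainDelegationTokens signature.
// Placed in the same package for illustration because the trait is private[spark].
private class ExampleDelegationTokenProvider extends HadoopDelegationTokenProvider {

  override def serviceName: String = "example"

  override def delegationTokensRequired(
      sparkConf: SparkConf,
      hadoopConf: Configuration): Boolean = true

  override def obtainDelegationTokens(
      hadoopConf: Configuration,
      sparkConf: SparkConf,
      fileSystems: Set[FileSystem],
      creds: Credentials): Option[Long] = {
    // Tokens for the file systems handed in by the manager would be added to
    // `creds` here; returning None signals that the tokens are not renewable.
    None
  }
}
```

Such a provider would also have to be listed in the corresponding `META-INF/services` resource to be picked up by the manager, as sketched earlier.
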
@@ -31,7 +31,7 @@ import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._

private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: () => Set[FileSystem])
private[deploy] class HadoopFSDelegationTokenProvider
extends HadoopDelegationTokenProvider with Logging {

// This tokenRenewalInterval will be set in the first call to obtainDelegationTokens.
@@ -44,14 +44,14 @@ private[deploy] class HadoopFSDelegationTokenProvider(fileSystems: () => Set[FileSystem])
override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
fileSystems: Set[FileSystem],
creds: Credentials): Option[Long] = {
try {
val fsToGetTokens = fileSystems()
val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fsToGetTokens, creds)
val fetchCreds = fetchDelegationTokens(getTokenRenewer(hadoopConf), fileSystems, creds)

// Get the token renewal interval if it is not set. It will only be called once.
if (tokenRenewalInterval == null) {
tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf, fsToGetTokens)
tokenRenewalInterval = getTokenRenewalInterval(hadoopConf, sparkConf, fileSystems)
}

// Get the time of next renewal.
@@ -0,0 +1 @@
org.apache.spark.deploy.security.ExceptionThrowingDelegationTokenProvider
@@ -17,99 +17,57 @@

package org.apache.spark.deploy.security

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.security.Credentials

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.util.Utils

private class ExceptionThrowingDelegationTokenProvider extends HadoopDelegationTokenProvider {
ExceptionThrowingDelegationTokenProvider.constructed = true
throw new IllegalArgumentException

override def serviceName: String = "throw"

override def delegationTokensRequired(
sparkConf: SparkConf,
hadoopConf: Configuration): Boolean = throw new IllegalArgumentException

override def obtainDelegationTokens(
hadoopConf: Configuration,
sparkConf: SparkConf,
fileSystems: Set[FileSystem],
creds: Credentials): Option[Long] = throw new IllegalArgumentException
}

private object ExceptionThrowingDelegationTokenProvider {
var constructed = false
}

class HadoopDelegationTokenManagerSuite extends SparkFunSuite {
private val hadoopConf = new Configuration()

test("default configuration") {
ExceptionThrowingDelegationTokenProvider.constructed = false
val manager = new HadoopDelegationTokenManager(new SparkConf(false), hadoopConf, null)
assert(manager.isProviderLoaded("hadoopfs"))
assert(manager.isProviderLoaded("hbase"))
assert(manager.isProviderLoaded("hive"))
assert(manager.isProviderLoaded("kafka"))
// This checks that providers are loaded independently and they have no effect on each other
assert(ExceptionThrowingDelegationTokenProvider.constructed)
assert(!manager.isProviderLoaded("throw"))
}

test("disable hive credential provider") {
val sparkConf = new SparkConf(false).set("spark.security.credentials.hive.enabled", "false")
test("disable hadoopfs credential provider") {
val sparkConf = new SparkConf(false).set("spark.security.credentials.hadoopfs.enabled", "false")
val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf, null)
assert(manager.isProviderLoaded("hadoopfs"))
assert(manager.isProviderLoaded("hbase"))
assert(!manager.isProviderLoaded("hive"))
assert(manager.isProviderLoaded("kafka"))
assert(!manager.isProviderLoaded("hadoopfs"))
}

test("using deprecated configurations") {
val sparkConf = new SparkConf(false)
.set("spark.yarn.security.tokens.hadoopfs.enabled", "false")
.set("spark.yarn.security.credentials.hive.enabled", "false")
val manager = new HadoopDelegationTokenManager(sparkConf, hadoopConf, null)
assert(!manager.isProviderLoaded("hadoopfs"))
assert(manager.isProviderLoaded("hbase"))
assert(!manager.isProviderLoaded("hive"))
assert(manager.isProviderLoaded("kafka"))
}

test("SPARK-23209: obtain tokens when Hive classes are not available") {
// This test needs a custom class loader to hide Hive classes which are in the classpath.
// Because the manager code loads the Hive provider directly instead of using reflection, we
// need to drive the test through the custom class loader so a new copy that cannot find
// Hive classes is loaded.
val currentLoader = Thread.currentThread().getContextClassLoader()
val noHive = new ClassLoader() {
override def loadClass(name: String, resolve: Boolean): Class[_] = {
if (name.startsWith("org.apache.hive") || name.startsWith("org.apache.hadoop.hive")) {
throw new ClassNotFoundException(name)
}

val prefixBlacklist = Seq("java", "scala", "com.sun.", "sun.")
if (prefixBlacklist.exists(name.startsWith(_))) {
return currentLoader.loadClass(name)
}

val found = findLoadedClass(name)
if (found != null) {
return found
}

val classFileName = name.replaceAll("\\.", "/") + ".class"
val in = currentLoader.getResourceAsStream(classFileName)
if (in != null) {
val bytes = IOUtils.toByteArray(in)
return defineClass(name, bytes, 0, bytes.length)
}

throw new ClassNotFoundException(name)
}
}

Utils.withContextClassLoader(noHive) {
val test = noHive.loadClass(NoHiveTest.getClass.getName().stripSuffix("$"))
test.getMethod("runTest").invoke(null)
}
}
}

/** Test code for SPARK-23209 to avoid using too much reflection above. */
private object NoHiveTest {

def runTest(): Unit = {
try {
val manager = new HadoopDelegationTokenManager(new SparkConf(), new Configuration(), null)
require(!manager.isProviderLoaded("hive"))
} catch {
case e: Throwable =>
// Throw a better exception in case the test fails, since there may be a lot of nesting.
var cause = e
while (cause.getCause() != null) {
cause = cause.getCause()
}
throw cause
}
}

}
5 changes: 5 additions & 0 deletions external/kafka-0-10-sql/pom.xml
@@ -35,6 +35,11 @@
<url>http://spark.apache.org/</url>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-token-provider-kafka-0-10_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
@@ -24,9 +24,9 @@ import scala.collection.JavaConverters._
import org.apache.kafka.common.config.SaslConfigs

import org.apache.spark.SparkEnv
import org.apache.spark.deploy.security.KafkaTokenUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Kafka
import org.apache.spark.kafka010.KafkaTokenUtil

/**
* Class to conveniently update Kafka config params, while logging the changes
@@ -21,9 +21,9 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.common.security.scram.ScramLoginModule

import org.apache.spark.SparkConf
import org.apache.spark.deploy.security.KafkaTokenUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.kafka010.KafkaTokenUtil

private[kafka010] object KafkaSecurityHelper extends Logging {
def isTokenAvailable(): Boolean = {
@@ -26,8 +26,8 @@ import org.mockito.Mockito.mock
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
import org.apache.spark.deploy.security.KafkaTokenUtil
import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier
import org.apache.spark.kafka010.KafkaTokenUtil
import org.apache.spark.kafka010.KafkaTokenUtil.KafkaDelegationTokenIdentifier

/**
* This is a trait which provides functionalities for Kafka delegation token related test suites.
