7 changes: 7 additions & 0 deletions docs/streaming-kafka-0-10-integration.md
@@ -315,3 +315,10 @@ As with any Spark applications, `spark-submit` is used to launch your application.

For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
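
For example, a minimal sbt sketch of such a setup could look like the following (the version is a placeholder — substitute the Spark and Scala versions you actually build against):

```scala
// build.sbt sketch: only the Kafka integration goes into the application JAR;
// core and streaming ship with the Spark installation, so they are marked "provided".
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.0",
  "org.apache.spark" %% "spark-core"      % "2.4.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.4.0" % "provided"
)
```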

### Security

See [Structured Streaming Security](structured-streaming-kafka-integration.html#security).

##### Additional Caveats

- The Kafka native sink is not available, so delegation tokens are used only on the consumer side (as sketched below).
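
A minimal consumer-side sketch (broker address, consumer group, and topic are placeholders; it assumes the delegation token related settings described in the linked security section are already in place):

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// Sketch: a direct stream over SASL_SSL. When Spark has obtained a Kafka delegation
// token, it is applied on this (consumer) side only; there is no token-aware sink.
val conf = new SparkConf().setAppName("KafkaDelegationTokenExample") // placeholder app name
val ssc = new StreamingContext(conf, Seconds(5))

val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "broker1:9093",         // placeholder broker
  ConsumerConfig.GROUP_ID_CONFIG -> "example-group",                 // placeholder group
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  "security.protocol" -> "SASL_SSL"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("topicA"), kafkaParams))
```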
@@ -28,6 +28,7 @@ import scala.util.control.NonFatal

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.KafkaConfigUpdater

private[kafka010] object CachedKafkaProducer extends Logging {

@@ -25,6 +25,8 @@ import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition

import org.apache.spark.kafka010.KafkaConfigUpdater

/**
* Subscribe allows you to subscribe to a fixed collection of topics.
* SubscribePattern allows you to use a regex to specify topics of interest.
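
For orientation, a hedged sketch of how these two strategies surface as options on the Structured Streaming Kafka reader (servers and topic names are placeholders):

```scala
import org.apache.spark.sql.SparkSession

// Sketch of the two strategies on the Structured Streaming reader
// (bootstrap servers and topic names are placeholders).
val spark = SparkSession.builder().appName("ConsumerStrategyExample").getOrCreate()

// Subscribe: a fixed collection of topics
val fixedTopics = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "topicA,topicB")
  .load()

// SubscribePattern: a regex describing the topics of interest
val byPattern = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribePattern", "topic.*")
  .load()
```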
@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition

import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.KafkaConfigUpdater
import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.util.UninterruptibleThread

This file was deleted.

@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}

import org.apache.spark.internal.Logging
import org.apache.spark.kafka010.KafkaConfigUpdater
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._

This file was deleted.

5 changes: 5 additions & 0 deletions external/kafka-0-10-token-provider/pom.xml
@@ -52,6 +52,11 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-tags_${scala.binary.version}</artifactId>
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.kafka010
package org.apache.spark.kafka010

import java.{util => ju}

@@ -26,12 +26,11 @@ import org.apache.kafka.common.config.SaslConfigs
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Kafka
import org.apache.spark.kafka010.KafkaTokenUtil

/**
* Class to conveniently update Kafka config params, while logging the changes
*/
private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object])
private[spark] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object])
extends Logging {
private val map = new ju.HashMap[String, Object](kafkaParams.asJava)

@@ -58,9 +57,9 @@ private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map
// configuration.
if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
logDebug("JVM global security configuration detected, using it for login.")
} else if (KafkaSecurityHelper.isTokenAvailable()) {
} else if (KafkaTokenUtil.isTokenAvailable()) {
logDebug("Delegation token detected, using it for login.")
val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
val jaasParams = KafkaTokenUtil.getTokenJaasParams(SparkEnv.get.conf)
set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
require(mechanism.startsWith("SCRAM"),
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
import org.apache.kafka.common.security.scram.ScramLoginModule
import org.apache.kafka.common.security.token.delegation.DelegationToken

import org.apache.spark.SparkConf
@@ -154,7 +155,7 @@ private[spark] object KafkaTokenUtil extends Logging {
}
}

private[kafka010] def getKeytabJaasParams(sparkConf: SparkConf): String = {
private def getKeytabJaasParams(sparkConf: SparkConf): String = {
val params =
s"""
|${getKrb5LoginModuleName} required
@@ -167,7 +168,7 @@
params
}

def getTicketCacheJaasParams(sparkConf: SparkConf): String = {
private def getTicketCacheJaasParams(sparkConf: SparkConf): String = {
val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)
require(serviceName.nonEmpty, "Kerberos service name must be defined")

@@ -208,4 +209,29 @@
dateFormat.format(tokenInfo.maxTimestamp)))
}
}

def isTokenAvailable(): Boolean = {
UserGroupInformation.getCurrentUser().getCredentials.getToken(
KafkaTokenUtil.TOKEN_SERVICE) != null
}

def getTokenJaasParams(sparkConf: SparkConf): String = {
val token = UserGroupInformation.getCurrentUser().getCredentials.getToken(
KafkaTokenUtil.TOKEN_SERVICE)
val username = new String(token.getIdentifier)
val password = new String(token.getPassword)

val loginModuleName = classOf[ScramLoginModule].getName
val params =
s"""
|$loginModuleName required
| tokenauth=true
| serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}"
| username="$username"
| password="$password";
""".stripMargin.replace("\n", "")
logDebug(s"Scram JAAS params: ${params.replaceAll("password=\".*\"", "password=\"[hidden]\"")}")

params
}
}
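
As an illustrative sketch of the intended call pattern (the helper name `withTokenAuth` is invented here; `KafkaConfigUpdater.setAuthenticationConfigIfNeeded` above is the real integration point):

```scala
import org.apache.kafka.common.config.SaslConfigs

import org.apache.spark.SparkEnv
import org.apache.spark.kafka010.KafkaTokenUtil

// Illustrative only: mirror the KafkaConfigUpdater logic — if a delegation token is
// present in the current user's credentials, attach the SCRAM JAAS parameters.
def withTokenAuth(baseParams: Map[String, Object]): Map[String, Object] = {
  if (KafkaTokenUtil.isTokenAvailable()) {
    baseParams + (SaslConfigs.SASL_JAAS_CONFIG ->
      KafkaTokenUtil.getTokenJaasParams(SparkEnv.get.conf))
  } else {
    baseParams
  }
}
```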
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.kafka010
package org.apache.spark.kafka010

import org.apache.kafka.common.config.SaslConfigs

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.kafka010
package org.apache.spark.kafka010

import java.{util => ju}
import javax.security.auth.login.{AppConfigurationEntry, Configuration}
@@ -26,7 +26,6 @@ import org.mockito.Mockito.mock
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
import org.apache.spark.kafka010.KafkaTokenUtil
import org.apache.spark.kafka010.KafkaTokenUtil.KafkaDelegationTokenIdentifier

/**
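
The suite below mixes in a shared `KafkaDelegationTokenTest` helper for `addTokenToUGI()`, `tokenId`, and `tokenPassword`. A rough sketch of what such a helper could look like — the trait name, identifier values, and the `TOKEN_KIND` constant are assumptions, not the PR's exact code:

```scala
import java.util.UUID

import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.Token

import org.apache.spark.kafka010.KafkaTokenUtil
import org.apache.spark.kafka010.KafkaTokenUtil.KafkaDelegationTokenIdentifier

// Rough sketch: register a dummy Kafka delegation token with the current UGI so that
// KafkaTokenUtil.isTokenAvailable()/getTokenJaasParams() can find it during tests.
trait KafkaDelegationTokenTestSketch {
  protected val tokenId: String = "tokenId" + UUID.randomUUID().toString
  protected val tokenPassword: String = "tokenPassword" + UUID.randomUUID().toString

  protected def addTokenToUGI(): Unit = {
    val token = new Token[KafkaDelegationTokenIdentifier](
      tokenId.getBytes,
      tokenPassword.getBytes,
      KafkaTokenUtil.TOKEN_KIND,       // assumed to exist alongside TOKEN_SERVICE
      KafkaTokenUtil.TOKEN_SERVICE)
    val creds = new Credentials()
    creds.addToken(token.getService, token)
    UserGroupInformation.getCurrentUser.addCredentials(creds)
  }
}
```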
@@ -17,20 +17,17 @@

package org.apache.spark.kafka010

import java.{util => ju}
import java.security.PrivilegedExceptionAction
import javax.security.auth.login.{AppConfigurationEntry, Configuration}

import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config._

class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
private val bootStrapServers = "127.0.0.1:0"
private val trustStoreLocation = "/path/to/trustStore"
private val trustStorePassword = "trustStoreSecret"
@@ -42,44 +39,11 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {

private var sparkConf: SparkConf = null

private class KafkaJaasConfiguration extends Configuration {
val entry =
new AppConfigurationEntry(
"DummyModule",
AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
ju.Collections.emptyMap[String, Object]()
)

override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
if (name.equals("KafkaClient")) {
Array(entry)
} else {
null
}
}
}

override def beforeEach(): Unit = {
super.beforeEach()
sparkConf = new SparkConf()
}

override def afterEach(): Unit = {
try {
resetGlobalConfig()
} finally {
super.afterEach()
}
}

private def setGlobalKafkaClientConfig(): Unit = {
Configuration.setConfiguration(new KafkaJaasConfiguration)
}

private def resetGlobalConfig(): Unit = {
Configuration.setConfiguration(null)
}

test("checkProxyUser with proxy current user should throw exception") {
val realUser = UserGroupInformation.createUserForTesting("realUser", Array())
UserGroupInformation.createProxyUserForTesting("proxyUser", realUser, Array()).doAs(
@@ -229,4 +193,25 @@

assert(KafkaTokenUtil.isGlobalJaasConfigurationProvided)
}

test("isTokenAvailable without token should return false") {
assert(!KafkaTokenUtil.isTokenAvailable())
}

test("isTokenAvailable with token should return true") {
addTokenToUGI()

assert(KafkaTokenUtil.isTokenAvailable())
}

test("getTokenJaasParams with token should return scram module") {
addTokenToUGI()

val jaasParams = KafkaTokenUtil.getTokenJaasParams(new SparkConf())

assert(jaasParams.contains("ScramLoginModule required"))
assert(jaasParams.contains("tokenauth=true"))
assert(jaasParams.contains(tokenId))
assert(jaasParams.contains(tokenPassword))
}
}
5 changes: 5 additions & 0 deletions external/kafka-0-10/pom.xml
@@ -34,6 +34,11 @@
<url>http://spark.apache.org/</url>

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-token-provider-kafka-0-10_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>