diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md
index c78459cd27d1..b63f69f00e13 100644
--- a/docs/streaming-kafka-0-10-integration.md
+++ b/docs/streaming-kafka-0-10-integration.md
@@ -315,3 +315,35 @@ As with any Spark applications, `spark-submit` is used to launch your applicatio
For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide).
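+
+For example, a minimal `build.sbt` along these lines keeps the jars already provided by a Spark installation out of the application JAR (a sketch; `sparkVersion` is a placeholder for your Spark release):
+
+```scala
+// Sketch of a build.sbt: spark-core and spark-streaming are "provided" by the
+// Spark installation, while the Kafka integration is bundled into the JAR.
+val sparkVersion = "2.4.0" // placeholder: match your Spark installation
+
+libraryDependencies ++= Seq(
+  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
+  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
+  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
+)
+```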
+### Security
+
+See [Structured Streaming Security](structured-streaming-kafka-integration.html#security).
+
+#### Additional Caveats
+
+- A native Kafka sink is not available for DStreams, so the delegation token is used only on the consumer side (see the launch sketch below).
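+
+As a sketch of how the token is obtained on the consumer side (the `--conf` key reflects the token provider configuration at the time of writing; see the linked security page for the authoritative options, and treat the class and JAR names as placeholders):
+
+```bash
+./bin/spark-submit \
+  --principal user@EXAMPLE.COM \
+  --keytab /path/to/user.keytab \
+  --conf spark.kafka.bootstrap.servers=broker1:9092,broker2:9092 \
+  --class org.example.StreamingApp \
+  streaming-app.jar
+```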
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
index f24001f4ae3a..062ce9a57994 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala
@@ -28,6 +28,7 @@ import scala.util.control.NonFatal
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
+import org.apache.spark.kafka010.KafkaConfigUpdater
private[kafka010] object CachedKafkaProducer extends Logging {
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
index dfdafce3c053..2326619b9d83 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
@@ -25,6 +25,8 @@ import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition
+import org.apache.spark.kafka010.KafkaConfigUpdater
+
/**
* Subscribe allows you to subscribe to a fixed collection of topics.
* SubscribePattern allows you to use a regex to specify topics of interest.
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
index a0255a1ad219..83bf4b18f0ff 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
import org.apache.spark.internal.Logging
+import org.apache.spark.kafka010.KafkaConfigUpdater
import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.util.UninterruptibleThread
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
deleted file mode 100644
index a11d54f992bd..000000000000
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import org.apache.hadoop.security.UserGroupInformation
-import org.apache.kafka.common.security.scram.ScramLoginModule
-
-import org.apache.spark.SparkConf
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
-import org.apache.spark.kafka010.KafkaTokenUtil
-
-private[kafka010] object KafkaSecurityHelper extends Logging {
- def isTokenAvailable(): Boolean = {
- UserGroupInformation.getCurrentUser().getCredentials.getToken(
- KafkaTokenUtil.TOKEN_SERVICE) != null
- }
-
- def getTokenJaasParams(sparkConf: SparkConf): String = {
- val token = UserGroupInformation.getCurrentUser().getCredentials.getToken(
- KafkaTokenUtil.TOKEN_SERVICE)
- val username = new String(token.getIdentifier)
- val password = new String(token.getPassword)
-
- val loginModuleName = classOf[ScramLoginModule].getName
- val params =
- s"""
- |$loginModuleName required
- | tokenauth=true
- | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}"
- | username="$username"
- | password="$password";
- """.stripMargin.replace("\n", "")
- logDebug(s"Scram JAAS params: ${params.replaceAll("password=\".*\"", "password=\"[hidden]\"")}")
-
- params
- }
-}
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index a1395731e5cf..c7c0d352c915 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
import org.apache.spark.internal.Logging
+import org.apache.spark.kafka010.KafkaConfigUpdater
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
deleted file mode 100644
index d908bbfc2c5f..000000000000
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.kafka010
-
-import org.apache.spark.{SparkConf, SparkFunSuite}
-
-class KafkaSecurityHelperSuite extends SparkFunSuite with KafkaDelegationTokenTest {
- test("isTokenAvailable without token should return false") {
- assert(!KafkaSecurityHelper.isTokenAvailable())
- }
-
- test("isTokenAvailable with token should return true") {
- addTokenToUGI()
-
- assert(KafkaSecurityHelper.isTokenAvailable())
- }
-
- test("getTokenJaasParams with token should return scram module") {
- addTokenToUGI()
-
- val jaasParams = KafkaSecurityHelper.getTokenJaasParams(new SparkConf())
-
- assert(jaasParams.contains("ScramLoginModule required"))
- assert(jaasParams.contains("tokenauth=true"))
- assert(jaasParams.contains(tokenId))
- assert(jaasParams.contains(tokenPassword))
- }
-}
diff --git a/external/kafka-0-10-token-provider/pom.xml b/external/kafka-0-10-token-provider/pom.xml
index b2abcd909958..40ef1f71e021 100644
--- a/external/kafka-0-10-token-provider/pom.xml
+++ b/external/kafka-0-10-token-provider/pom.xml
@@ -52,6 +52,11 @@
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-tags_${scala.binary.version}</artifactId>
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala
similarity index 89%
rename from external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
rename to external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala
index 978dfe6cfbd6..d24eb4ae1952 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.kafka010
+package org.apache.spark.kafka010
import java.{util => ju}
@@ -26,12 +26,11 @@ import org.apache.kafka.common.config.SaslConfigs
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Kafka
-import org.apache.spark.kafka010.KafkaTokenUtil
/**
* Class to conveniently update Kafka config params, while logging the changes
*/
-private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object])
+private[spark] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object])
extends Logging {
private val map = new ju.HashMap[String, Object](kafkaParams.asJava)
@@ -58,9 +57,9 @@ private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map
// configuration.
if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
logDebug("JVM global security configuration detected, using it for login.")
- } else if (KafkaSecurityHelper.isTokenAvailable()) {
+ } else if (KafkaTokenUtil.isTokenAvailable()) {
logDebug("Delegation token detected, using it for login.")
- val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
+ val jaasParams = KafkaTokenUtil.getTokenJaasParams(SparkEnv.get.conf)
set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
require(mechanism.startsWith("SCRAM"),
diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
index 574d58bf2f40..e5604f2b14cf 100644
--- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
+++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.JaasContext
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
+import org.apache.kafka.common.security.scram.ScramLoginModule
import org.apache.kafka.common.security.token.delegation.DelegationToken
import org.apache.spark.SparkConf
@@ -154,7 +155,7 @@ private[spark] object KafkaTokenUtil extends Logging {
}
}
- private[kafka010] def getKeytabJaasParams(sparkConf: SparkConf): String = {
+ private def getKeytabJaasParams(sparkConf: SparkConf): String = {
val params =
s"""
|${getKrb5LoginModuleName} required
@@ -167,7 +168,7 @@ private[spark] object KafkaTokenUtil extends Logging {
params
}
- def getTicketCacheJaasParams(sparkConf: SparkConf): String = {
+ private def getTicketCacheJaasParams(sparkConf: SparkConf): String = {
val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)
require(serviceName.nonEmpty, "Kerberos service name must be defined")
@@ -208,4 +209,31 @@
dateFormat.format(tokenInfo.maxTimestamp)))
}
}
+
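+  /** Returns true when a Kafka delegation token is present in the current user's UGI credentials. */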
+ def isTokenAvailable(): Boolean = {
+ UserGroupInformation.getCurrentUser().getCredentials.getToken(
+ KafkaTokenUtil.TOKEN_SERVICE) != null
+ }
+
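+  /** Builds the SCRAM JAAS parameter string from the delegation token stored in the current user's UGI credentials. */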
+ def getTokenJaasParams(sparkConf: SparkConf): String = {
+ val token = UserGroupInformation.getCurrentUser().getCredentials.getToken(
+ KafkaTokenUtil.TOKEN_SERVICE)
+ val username = new String(token.getIdentifier)
+ val password = new String(token.getPassword)
+
+ val loginModuleName = classOf[ScramLoginModule].getName
+ val params =
+ s"""
+ |$loginModuleName required
+ | tokenauth=true
+ | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}"
+ | username="$username"
+ | password="$password";
+ """.stripMargin.replace("\n", "")
+ logDebug(s"Scram JAAS params: ${params.replaceAll("password=\".*\"", "password=\"[hidden]\"")}")
+
+ params
+ }
}
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala
similarity index 98%
rename from external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala
rename to external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala
index 25ccca3cb984..538486b9912a 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala
+++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.kafka010
+package org.apache.spark.kafka010
import org.apache.kafka.common.config.SaslConfigs
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenTest.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala
similarity index 97%
rename from external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenTest.scala
rename to external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala
index d0cefc46c56f..bd9b87374dde 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenTest.scala
+++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.sql.kafka010
+package org.apache.spark.kafka010
import java.{util => ju}
import javax.security.auth.login.{AppConfigurationEntry, Configuration}
@@ -26,7 +26,6 @@ import org.mockito.Mockito.mock
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
-import org.apache.spark.kafka010.KafkaTokenUtil
import org.apache.spark.kafka010.KafkaTokenUtil.KafkaDelegationTokenIdentifier
/**
diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
index 5da626056ba4..0a5af1d751bc 100644
--- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
+++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala
@@ -17,20 +17,17 @@
package org.apache.spark.kafka010
-import java.{util => ju}
import java.security.PrivilegedExceptionAction
-import javax.security.auth.login.{AppConfigurationEntry, Configuration}
import org.apache.hadoop.security.UserGroupInformation
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL}
-import org.scalatest.BeforeAndAfterEach
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config._
-class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
+class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest {
private val bootStrapServers = "127.0.0.1:0"
private val trustStoreLocation = "/path/to/trustStore"
private val trustStorePassword = "trustStoreSecret"
@@ -42,44 +39,11 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
private var sparkConf: SparkConf = null
- private class KafkaJaasConfiguration extends Configuration {
- val entry =
- new AppConfigurationEntry(
- "DummyModule",
- AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
- ju.Collections.emptyMap[String, Object]()
- )
-
- override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
- if (name.equals("KafkaClient")) {
- Array(entry)
- } else {
- null
- }
- }
- }
-
override def beforeEach(): Unit = {
super.beforeEach()
sparkConf = new SparkConf()
}
- override def afterEach(): Unit = {
- try {
- resetGlobalConfig()
- } finally {
- super.afterEach()
- }
- }
-
- private def setGlobalKafkaClientConfig(): Unit = {
- Configuration.setConfiguration(new KafkaJaasConfiguration)
- }
-
- private def resetGlobalConfig(): Unit = {
- Configuration.setConfiguration(null)
- }
-
test("checkProxyUser with proxy current user should throw exception") {
val realUser = UserGroupInformation.createUserForTesting("realUser", Array())
UserGroupInformation.createProxyUserForTesting("proxyUser", realUser, Array()).doAs(
@@ -229,4 +193,25 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(KafkaTokenUtil.isGlobalJaasConfigurationProvided)
}
+
+ test("isTokenAvailable without token should return false") {
+ assert(!KafkaTokenUtil.isTokenAvailable())
+ }
+
+ test("isTokenAvailable with token should return true") {
+ addTokenToUGI()
+
+ assert(KafkaTokenUtil.isTokenAvailable())
+ }
+
+ test("getTokenJaasParams with token should return scram module") {
+ addTokenToUGI()
+
+ val jaasParams = KafkaTokenUtil.getTokenJaasParams(new SparkConf())
+
+ assert(jaasParams.contains("ScramLoginModule required"))
+ assert(jaasParams.contains("tokenauth=true"))
+ assert(jaasParams.contains(tokenId))
+ assert(jaasParams.contains(tokenPassword))
+ }
}
diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml
index 333572e99b1c..f78bdace3523 100644
--- a/external/kafka-0-10/pom.xml
+++ b/external/kafka-0-10/pom.xml
@@ -34,6 +34,11 @@
  <url>http://spark.apache.org/</url>

  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-token-provider-kafka-0-10_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_${scala.binary.version}</artifactId>
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index 07960d14b0bf..3e32b592b3a3 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition
import org.apache.spark.internal.Logging
+import org.apache.spark.kafka010.KafkaConfigUpdater
/**
* Choice of how to create and configure underlying Kafka Consumers on driver and executors.
@@ -54,6 +55,15 @@ abstract class ConsumerStrategy[K, V] {
* checkpoint.
*/
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V]
+
+ /**
+ * Updates the Kafka parameters with security settings if needed.
+ * Defined here to hide internals and avoid code duplication, since every strategy uses it.
+ */
+ protected def setAuthenticationConfigIfNeeded(kafkaParams: ju.Map[String, Object]) =
+ KafkaConfigUpdater("source", kafkaParams.asScala.toMap)
+ .setAuthenticationConfigIfNeeded()
+ .build()
}
/**
@@ -78,7 +88,8 @@ private case class Subscribe[K, V](
def executorKafkaParams: ju.Map[String, Object] = kafkaParams
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
- val consumer = new KafkaConsumer[K, V](kafkaParams)
+ val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
+ val consumer = new KafkaConsumer[K, V](updatedKafkaParams)
consumer.subscribe(topics)
val toSeek = if (currentOffsets.isEmpty) {
offsets
@@ -134,7 +145,8 @@ private case class SubscribePattern[K, V](
def executorKafkaParams: ju.Map[String, Object] = kafkaParams
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
- val consumer = new KafkaConsumer[K, V](kafkaParams)
+ val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
+ val consumer = new KafkaConsumer[K, V](updatedKafkaParams)
consumer.subscribe(pattern, new NoOpConsumerRebalanceListener())
val toSeek = if (currentOffsets.isEmpty) {
offsets
@@ -186,7 +198,8 @@ private case class Assign[K, V](
def executorKafkaParams: ju.Map[String, Object] = kafkaParams
def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
- val consumer = new KafkaConsumer[K, V](kafkaParams)
+ val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams)
+ val consumer = new KafkaConsumer[K, V](updatedKafkaParams)
consumer.assign(topicPartitions)
val toSeek = if (currentOffsets.isEmpty) {
offsets
diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
index 68c5fe9ab066..142e946188ac 100644
--- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
+++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala
@@ -19,11 +19,14 @@ package org.apache.spark.streaming.kafka010
import java.{util => ju}
+import scala.collection.JavaConverters._
+
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
+import org.apache.spark.kafka010.KafkaConfigUpdater
private[kafka010] sealed trait KafkaDataConsumer[K, V] {
/**
@@ -109,7 +112,11 @@
/** Create a KafkaConsumer to fetch records for `topicPartition` */
private def createConsumer: KafkaConsumer[K, V] = {
- val c = new KafkaConsumer[K, V](kafkaParams)
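+    // Merge the delegation token / JAAS login settings into the user-supplied params when present.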
+ val updatedKafkaParams = KafkaConfigUpdater("executor", kafkaParams.asScala.toMap)
+ .setAuthenticationConfigIfNeeded()
+ .build()
+ val c = new KafkaConsumer[K, V](updatedKafkaParams)
val topics = ju.Arrays.asList(topicPartition)
c.assign(topics)
c