From 647e40fec8354cabf1ad26383d7237e8812d11ba Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Tue, 19 Feb 2019 14:44:21 +0100 Subject: [PATCH 1/3] [SPARK-27022][DSTREAMS] Add kafka delegation token support. --- .../sql/kafka010/CachedKafkaProducer.scala | 1 + .../spark/sql/kafka010/ConsumerStrategy.scala | 2 + .../sql/kafka010/KafkaDataConsumer.scala | 1 + .../sql/kafka010/KafkaSecurityHelper.scala | 53 ----------------- .../sql/kafka010/KafkaSourceProvider.scala | 1 + .../kafka010/KafkaSecurityHelperSuite.scala | 43 -------------- external/kafka-0-10-token-provider/pom.xml | 5 ++ .../spark}/kafka010/KafkaConfigUpdater.scala | 9 ++- .../spark/kafka010/KafkaTokenUtil.scala | 30 +++++++++- .../kafka010/KafkaConfigUpdaterSuite.scala | 2 +- .../kafka010/KafkaDelegationTokenTest.scala | 3 +- .../spark/kafka010/KafkaTokenUtilSuite.scala | 59 +++++++------------ external/kafka-0-10/pom.xml | 5 ++ .../streaming/kafka010/ConsumerStrategy.scala | 19 +++++- .../kafka010/KafkaDataConsumer.scala | 8 ++- 15 files changed, 94 insertions(+), 147 deletions(-) delete mode 100644 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala delete mode 100644 external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala rename external/{kafka-0-10-sql/src/main/scala/org/apache/spark/sql => kafka-0-10-token-provider/src/main/scala/org/apache/spark}/kafka010/KafkaConfigUpdater.scala (89%) rename external/{kafka-0-10-sql/src/test/scala/org/apache/spark/sql => kafka-0-10-token-provider/src/test/scala/org/apache/spark}/kafka010/KafkaConfigUpdaterSuite.scala (98%) rename external/{kafka-0-10-sql/src/test/scala/org/apache/spark/sql => kafka-0-10-token-provider/src/test/scala/org/apache/spark}/kafka010/KafkaDelegationTokenTest.scala (97%) diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala index f24001f4ae3a..062ce9a57994 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaProducer.scala @@ -28,6 +28,7 @@ import scala.util.control.NonFatal import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging +import org.apache.spark.kafka010.KafkaConfigUpdater private[kafka010] object CachedKafkaProducer extends Logging { diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala index dfdafce3c053..2326619b9d83 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala @@ -25,6 +25,8 @@ import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer} import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.common.TopicPartition +import org.apache.spark.kafka010.KafkaConfigUpdater + /** * Subscribe allows you to subscribe to a fixed collection of topics. * SubscribePattern allows you to use a regex to specify topics of interest. 
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala index a0255a1ad219..83bf4b18f0ff 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala @@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition import org.apache.spark.{SparkEnv, SparkException, TaskContext} import org.apache.spark.internal.Logging +import org.apache.spark.kafka010.KafkaConfigUpdater import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.util.UninterruptibleThread diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala deleted file mode 100644 index a11d54f992bd..000000000000 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelper.scala +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.kafka010 - -import org.apache.hadoop.security.UserGroupInformation -import org.apache.kafka.common.security.scram.ScramLoginModule - -import org.apache.spark.SparkConf -import org.apache.spark.internal.Logging -import org.apache.spark.internal.config._ -import org.apache.spark.kafka010.KafkaTokenUtil - -private[kafka010] object KafkaSecurityHelper extends Logging { - def isTokenAvailable(): Boolean = { - UserGroupInformation.getCurrentUser().getCredentials.getToken( - KafkaTokenUtil.TOKEN_SERVICE) != null - } - - def getTokenJaasParams(sparkConf: SparkConf): String = { - val token = UserGroupInformation.getCurrentUser().getCredentials.getToken( - KafkaTokenUtil.TOKEN_SERVICE) - val username = new String(token.getIdentifier) - val password = new String(token.getPassword) - - val loginModuleName = classOf[ScramLoginModule].getName - val params = - s""" - |$loginModuleName required - | tokenauth=true - | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}" - | username="$username" - | password="$password"; - """.stripMargin.replace("\n", "") - logDebug(s"Scram JAAS params: ${params.replaceAll("password=\".*\"", "password=\"[hidden]\"")}") - - params - } -} diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index a1395731e5cf..c7c0d352c915 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -27,6 +27,7 @@ import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer} import org.apache.spark.internal.Logging +import org.apache.spark.kafka010.KafkaConfigUpdater import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext} import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.sources._ diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala deleted file mode 100644 index d908bbfc2c5f..000000000000 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSecurityHelperSuite.scala +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.kafka010 - -import org.apache.spark.{SparkConf, SparkFunSuite} - -class KafkaSecurityHelperSuite extends SparkFunSuite with KafkaDelegationTokenTest { - test("isTokenAvailable without token should return false") { - assert(!KafkaSecurityHelper.isTokenAvailable()) - } - - test("isTokenAvailable with token should return true") { - addTokenToUGI() - - assert(KafkaSecurityHelper.isTokenAvailable()) - } - - test("getTokenJaasParams with token should return scram module") { - addTokenToUGI() - - val jaasParams = KafkaSecurityHelper.getTokenJaasParams(new SparkConf()) - - assert(jaasParams.contains("ScramLoginModule required")) - assert(jaasParams.contains("tokenauth=true")) - assert(jaasParams.contains(tokenId)) - assert(jaasParams.contains(tokenPassword)) - } -} diff --git a/external/kafka-0-10-token-provider/pom.xml b/external/kafka-0-10-token-provider/pom.xml index b2abcd909958..40ef1f71e021 100644 --- a/external/kafka-0-10-token-provider/pom.xml +++ b/external/kafka-0-10-token-provider/pom.xml @@ -52,6 +52,11 @@ kafka-clients ${kafka.version} + + org.mockito + mockito-core + test + org.apache.spark spark-tags_${scala.binary.version} diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala similarity index 89% rename from external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala rename to external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala index 978dfe6cfbd6..d24eb4ae1952 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala +++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaConfigUpdater.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql.kafka010 +package org.apache.spark.kafka010 import java.{util => ju} @@ -26,12 +26,11 @@ import org.apache.kafka.common.config.SaslConfigs import org.apache.spark.SparkEnv import org.apache.spark.internal.Logging import org.apache.spark.internal.config.Kafka -import org.apache.spark.kafka010.KafkaTokenUtil /** * Class to conveniently update Kafka config params, while logging the changes */ -private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object]) +private[spark] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, Object]) extends Logging { private val map = new ju.HashMap[String, Object](kafkaParams.asJava) @@ -58,9 +57,9 @@ private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map // configuration. 
if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) { logDebug("JVM global security configuration detected, using it for login.") - } else if (KafkaSecurityHelper.isTokenAvailable()) { + } else if (KafkaTokenUtil.isTokenAvailable()) { logDebug("Delegation token detected, using it for login.") - val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf) + val jaasParams = KafkaTokenUtil.getTokenJaasParams(SparkEnv.get.conf) set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams) val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM) require(mechanism.startsWith("SCRAM"), diff --git a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala index 574d58bf2f40..e5604f2b14cf 100644 --- a/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala +++ b/external/kafka-0-10-token-provider/src/main/scala/org/apache/spark/kafka010/KafkaTokenUtil.scala @@ -31,6 +31,7 @@ import org.apache.kafka.clients.admin.{AdminClient, CreateDelegationTokenOptions import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.security.JaasContext import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} +import org.apache.kafka.common.security.scram.ScramLoginModule import org.apache.kafka.common.security.token.delegation.DelegationToken import org.apache.spark.SparkConf @@ -154,7 +155,7 @@ private[spark] object KafkaTokenUtil extends Logging { } } - private[kafka010] def getKeytabJaasParams(sparkConf: SparkConf): String = { + private def getKeytabJaasParams(sparkConf: SparkConf): String = { val params = s""" |${getKrb5LoginModuleName} required @@ -167,7 +168,7 @@ private[spark] object KafkaTokenUtil extends Logging { params } - def getTicketCacheJaasParams(sparkConf: SparkConf): String = { + private def getTicketCacheJaasParams(sparkConf: SparkConf): String = { val serviceName = sparkConf.get(Kafka.KERBEROS_SERVICE_NAME) require(serviceName.nonEmpty, "Kerberos service name must be defined") @@ -208,4 +209,29 @@ private[spark] object KafkaTokenUtil extends Logging { dateFormat.format(tokenInfo.maxTimestamp))) } } + + def isTokenAvailable(): Boolean = { + UserGroupInformation.getCurrentUser().getCredentials.getToken( + KafkaTokenUtil.TOKEN_SERVICE) != null + } + + def getTokenJaasParams(sparkConf: SparkConf): String = { + val token = UserGroupInformation.getCurrentUser().getCredentials.getToken( + KafkaTokenUtil.TOKEN_SERVICE) + val username = new String(token.getIdentifier) + val password = new String(token.getPassword) + + val loginModuleName = classOf[ScramLoginModule].getName + val params = + s""" + |$loginModuleName required + | tokenauth=true + | serviceName="${sparkConf.get(Kafka.KERBEROS_SERVICE_NAME)}" + | username="$username" + | password="$password"; + """.stripMargin.replace("\n", "") + logDebug(s"Scram JAAS params: ${params.replaceAll("password=\".*\"", "password=\"[hidden]\"")}") + + params + } } diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala similarity index 98% rename from external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala rename to external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala 
index 25ccca3cb984..538486b9912a 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala +++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaConfigUpdaterSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql.kafka010 +package org.apache.spark.kafka010 import org.apache.kafka.common.config.SaslConfigs diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenTest.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala similarity index 97% rename from external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenTest.scala rename to external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala index d0cefc46c56f..bd9b87374dde 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaDelegationTokenTest.scala +++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaDelegationTokenTest.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql.kafka010 +package org.apache.spark.kafka010 import java.{util => ju} import javax.security.auth.login.{AppConfigurationEntry, Configuration} @@ -26,7 +26,6 @@ import org.mockito.Mockito.mock import org.scalatest.BeforeAndAfterEach import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite} -import org.apache.spark.kafka010.KafkaTokenUtil import org.apache.spark.kafka010.KafkaTokenUtil.KafkaDelegationTokenIdentifier /** diff --git a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala index 5da626056ba4..0a5af1d751bc 100644 --- a/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala +++ b/external/kafka-0-10-token-provider/src/test/scala/org/apache/spark/kafka010/KafkaTokenUtilSuite.scala @@ -17,20 +17,17 @@ package org.apache.spark.kafka010 -import java.{util => ju} import java.security.PrivilegedExceptionAction -import javax.security.auth.login.{AppConfigurationEntry, Configuration} import org.apache.hadoop.security.UserGroupInformation import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.common.config.SaslConfigs import org.apache.kafka.common.security.auth.SecurityProtocol.{SASL_PLAINTEXT, SASL_SSL, SSL} -import org.scalatest.BeforeAndAfterEach import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.internal.config._ -class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { +class KafkaTokenUtilSuite extends SparkFunSuite with KafkaDelegationTokenTest { private val bootStrapServers = "127.0.0.1:0" private val trustStoreLocation = "/path/to/trustStore" private val trustStorePassword = "trustStoreSecret" @@ -42,44 +39,11 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { private var sparkConf: SparkConf = null - private class KafkaJaasConfiguration extends Configuration { - val entry = - new AppConfigurationEntry( - "DummyModule", - AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, - ju.Collections.emptyMap[String, Object]() - ) - - override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = { - if (name.equals("KafkaClient")) { - Array(entry) - } else 
{ - null - } - } - } - override def beforeEach(): Unit = { super.beforeEach() sparkConf = new SparkConf() } - override def afterEach(): Unit = { - try { - resetGlobalConfig() - } finally { - super.afterEach() - } - } - - private def setGlobalKafkaClientConfig(): Unit = { - Configuration.setConfiguration(new KafkaJaasConfiguration) - } - - private def resetGlobalConfig(): Unit = { - Configuration.setConfiguration(null) - } - test("checkProxyUser with proxy current user should throw exception") { val realUser = UserGroupInformation.createUserForTesting("realUser", Array()) UserGroupInformation.createProxyUserForTesting("proxyUser", realUser, Array()).doAs( @@ -229,4 +193,25 @@ class KafkaTokenUtilSuite extends SparkFunSuite with BeforeAndAfterEach { assert(KafkaTokenUtil.isGlobalJaasConfigurationProvided) } + + test("isTokenAvailable without token should return false") { + assert(!KafkaTokenUtil.isTokenAvailable()) + } + + test("isTokenAvailable with token should return true") { + addTokenToUGI() + + assert(KafkaTokenUtil.isTokenAvailable()) + } + + test("getTokenJaasParams with token should return scram module") { + addTokenToUGI() + + val jaasParams = KafkaTokenUtil.getTokenJaasParams(new SparkConf()) + + assert(jaasParams.contains("ScramLoginModule required")) + assert(jaasParams.contains("tokenauth=true")) + assert(jaasParams.contains(tokenId)) + assert(jaasParams.contains(tokenPassword)) + } } diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml index 333572e99b1c..f78bdace3523 100644 --- a/external/kafka-0-10/pom.xml +++ b/external/kafka-0-10/pom.xml @@ -34,6 +34,11 @@ http://spark.apache.org/ + + org.apache.spark + spark-token-provider-kafka-0-10_${scala.binary.version} + ${project.version} + org.apache.spark spark-streaming_${scala.binary.version} diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala index 07960d14b0bf..3e32b592b3a3 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala @@ -27,6 +27,7 @@ import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.common.TopicPartition import org.apache.spark.internal.Logging +import org.apache.spark.kafka010.KafkaConfigUpdater /** * Choice of how to create and configure underlying Kafka Consumers on driver and executors. @@ -54,6 +55,15 @@ abstract class ConsumerStrategy[K, V] { * checkpoint. */ def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] + + /** + * Updates the parameters with security if needed. + * This function hides the internals and reduces code duplication because all strategies use it.
+ */ + protected def setAuthenticationConfigIfNeeded(kafkaParams: ju.Map[String, Object]) = + KafkaConfigUpdater("source", kafkaParams.asScala.toMap) + .setAuthenticationConfigIfNeeded() + .build() } /** @@ -78,7 +88,8 @@ private case class Subscribe[K, V]( def executorKafkaParams: ju.Map[String, Object] = kafkaParams def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = { - val consumer = new KafkaConsumer[K, V](kafkaParams) + val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams) + val consumer = new KafkaConsumer[K, V](updatedKafkaParams) consumer.subscribe(topics) val toSeek = if (currentOffsets.isEmpty) { offsets @@ -134,7 +145,8 @@ private case class SubscribePattern[K, V]( def executorKafkaParams: ju.Map[String, Object] = kafkaParams def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = { - val consumer = new KafkaConsumer[K, V](kafkaParams) + val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams) + val consumer = new KafkaConsumer[K, V](updatedKafkaParams) consumer.subscribe(pattern, new NoOpConsumerRebalanceListener()) val toSeek = if (currentOffsets.isEmpty) { offsets @@ -186,7 +198,8 @@ private case class Assign[K, V]( def executorKafkaParams: ju.Map[String, Object] = kafkaParams def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = { - val consumer = new KafkaConsumer[K, V](kafkaParams) + val updatedKafkaParams = setAuthenticationConfigIfNeeded(kafkaParams) + val consumer = new KafkaConsumer[K, V](updatedKafkaParams) consumer.assign(topicPartitions) val toSeek = if (currentOffsets.isEmpty) { offsets diff --git a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala index 68c5fe9ab066..142e946188ac 100644 --- a/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala +++ b/external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaDataConsumer.scala @@ -19,11 +19,14 @@ package org.apache.spark.streaming.kafka010 import java.{util => ju} +import scala.collection.JavaConverters._ + import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer} import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.spark.TaskContext import org.apache.spark.internal.Logging +import org.apache.spark.kafka010.KafkaConfigUpdater private[kafka010] sealed trait KafkaDataConsumer[K, V] { /** @@ -109,7 +112,10 @@ private[kafka010] class InternalKafkaConsumer[K, V]( /** Create a KafkaConsumer to fetch records for `topicPartition` */ private def createConsumer: KafkaConsumer[K, V] = { - val c = new KafkaConsumer[K, V](kafkaParams) + val updatedKafkaParams = KafkaConfigUpdater("executor", kafkaParams.asScala.toMap) + .setAuthenticationConfigIfNeeded() + .build() + val c = new KafkaConsumer[K, V](updatedKafkaParams) val topics = ju.Arrays.asList(topicPartition) c.assign(topics) c From 6978b57350e0db97ee310e1f8b96ea746f147739 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Tue, 5 Mar 2019 10:44:26 +0100 Subject: [PATCH 2/3] Add doc --- docs/streaming-kafka-0-10-integration.md | 81 +++++++++++++++++++++--- 1 file changed, 72 insertions(+), 9 deletions(-) diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md index c78459cd27d1..f35227d739dd 100644 --- a/docs/streaming-kafka-0-10-integration.md +++ 
b/docs/streaming-kafka-0-10-integration.md @@ -277,9 +277,79 @@ stream.foreachRDD(rdd -> { -### SSL / TLS -The new Kafka consumer [supports SSL](http://kafka.apache.org/documentation.html#security_ssl). To enable it, set kafkaParams appropriately before passing to `createDirectStream` / `createRDD`. Note that this only applies to communication between Spark and Kafka brokers; you are still responsible for separately [securing](security.html) Spark inter-node communication. +### Deploying + +As with any Spark applications, `spark-submit` is used to launch your application. + +For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). + +### Security + +Kafka 0.9.0.0 introduced several features that increase security in a cluster. For a detailed +description of these features, see [Kafka security docs](http://kafka.apache.org/documentation.html#security). + +It's worth noting that security is optional and turned off by default. + +Spark supports the following ways to authenticate against a Kafka cluster: +- **Delegation token (introduced in Kafka broker 1.1.0)** +- **JAAS login configuration** + +#### Delegation token + +This way, the application can be configured via Spark parameters and may not need a JAAS login +configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information +about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). + +The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` is set, +Spark considers the following login options, in order of preference: +- **JAAS login configuration**, see the example below. +- **Keytab file**, such as, + + ./bin/spark-submit \ + --keytab \ + --principal \ + --conf "spark.kafka.bootstrap.servers=" \ + ... + +- **Kerberos credential cache**, such as, + + ./bin/spark-submit \ + --conf "spark.kafka.bootstrap.servers=" \ + ... + +The Kafka delegation token provider can be turned off by setting `spark.security.credentials.kafka.enabled` to `false` (default: `true`). +Spark can be configured to use the following authentication protocols to obtain the token (the protocol must match the +Kafka broker configuration): +- **SASL SSL (default)** +- **SSL** +- **SASL PLAINTEXT (for testing)** + +After obtaining the delegation token successfully, Spark distributes it across nodes and renews it accordingly. +The delegation token uses the `SCRAM` login module for authentication, so the appropriate +`spark.kafka.sasl.token.mechanism` (default: `SCRAM-SHA-512`) has to be configured. This parameter +must also match the Kafka broker configuration. + +When a delegation token is available on an executor, it can be overridden with a JAAS login configuration. + +##### Caveats + +- Obtaining a delegation token for a proxy user is not yet supported ([KAFKA-6945](https://issues.apache.org/jira/browse/KAFKA-6945)). +- A Kafka native sink is not available, so the delegation token is used only on the consumer side.
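To put the pieces above together, here is a minimal, illustrative sketch of a DStreams application that relies on the delegation token. The broker addresses, topic and group id are placeholders, and the explicit `spark.kafka.sasl.token.mechanism` setting is only needed when the brokers use a mechanism other than the default `SCRAM-SHA-512`:

{% highlight scala %}
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010._

val conf = new SparkConf()
  .setAppName("KafkaDelegationTokenExample")
  // Setting the bootstrap servers triggers Spark's Kafka delegation token provider,
  // so no JAAS login configuration has to be shipped with the application.
  .set("spark.kafka.bootstrap.servers", "kafkaBroker1:9093,kafkaBroker2:9093")
  // Only needed when the brokers use a non-default SCRAM mechanism.
  .set("spark.kafka.sasl.token.mechanism", "SCRAM-SHA-512")

val ssc = new StreamingContext(conf, Seconds(10))

val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "kafkaBroker1:9093,kafkaBroker2:9093",
  ConsumerConfig.GROUP_ID_CONFIG -> "exampleGroup",
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  // The consumers talk to the secured listener; the token based SASL/SCRAM
  // JAAS configuration is injected automatically when a token is available.
  CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> "SASL_SSL"
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("exampleTopic"), kafkaParams)
)

stream.map(record => record.value).print()

ssc.start()
ssc.awaitTermination()
{% endhighlight %}

Submitted with a keytab or from a Kerberos credential cache as described above, the application needs no further consumer-side JAAS settings; Spark injects the token-based configuration into the Kafka parameters.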
+ +#### JAAS login configuration + +A JAAS login configuration must be placed on all nodes where Spark tries to access the Kafka cluster. +This provides the possibility to apply any custom authentication logic, at a higher maintenance cost. +This can be done in several ways. One possibility is to provide additional JVM parameters, such as, + + ./bin/spark-submit \ + --driver-java-options "-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \ + --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \ + ... + +#### SSL / TLS +The new Kafka consumer [supports SSL](http://kafka.apache.org/documentation.html#security_ssl). To enable it, set kafkaParams appropriately before passing to `createDirectStream` / `createRDD`. Note that this only applies to communication between Spark and Kafka brokers; you are still responsible for separately [securing](security.html) Spark inter-node communication.
@@ -308,10 +378,3 @@ kafkaParams.put("ssl.key.password", "test1234"); {% endhighlight %}
- -### Deploying - -As with any Spark applications, `spark-submit` is used to launch your application. - -For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). - From 07bb7165557c624c1b8b086377949e8949dfd192 Mon Sep 17 00:00:00 2001 From: Gabor Somogyi Date: Thu, 7 Mar 2019 15:05:27 +0100 Subject: [PATCH 3/3] Add link to Structured Streaming --- docs/streaming-kafka-0-10-integration.md | 88 +++++------------------- 1 file changed, 16 insertions(+), 72 deletions(-) diff --git a/docs/streaming-kafka-0-10-integration.md b/docs/streaming-kafka-0-10-integration.md index f35227d739dd..b63f69f00e13 100644 --- a/docs/streaming-kafka-0-10-integration.md +++ b/docs/streaming-kafka-0-10-integration.md @@ -277,80 +277,10 @@ stream.foreachRDD(rdd -> { -### Deploying - -As with any Spark applications, `spark-submit` is used to launch your application. - -For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). - -### Security - -Kafka 0.9.0.0 introduced several features that increases security in a cluster. For detailed -description about these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security). - -It's worth noting that security is optional and turned off by default. - -Spark supports the following ways to authenticate against Kafka cluster: -- **Delegation token (introduced in Kafka broker 1.1.0)** -- **JAAS login configuration** - -#### Delegation token - -This way the application can be configured via Spark parameters and may not need JAAS login -configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information -about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token). - -The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` is set, -Spark considers the following log in options, in order of preference: -- **JAAS login configuration**, please see example below. -- **Keytab file**, such as, - - ./bin/spark-submit \ - --keytab \ - --principal \ - --conf "spark.kafka.bootstrap.servers=" \ - ... - -- **Kerberos credential cache**, such as, - - ./bin/spark-submit \ - --conf "spark.kafka.bootstrap.servers=" \ - ... - -The Kafka delegation token provider can be turned off by setting `spark.security.credentials.kafka.enabled` to `false` (default: `true`). 
- -Spark can be configured to use the following authentication protocols to obtain token (it must match with -Kafka broker configuration): -- **SASL SSL (default)** -- **SSL** -- **SASL PLAINTEXT (for testing)** - -After obtaining delegation token successfully, Spark distributes it across nodes and renews it accordingly. -Delegation token uses `SCRAM` login module for authentication and because of that the appropriate -`spark.kafka.sasl.token.mechanism` (default: `SCRAM-SHA-512`) has to be configured. Also, this parameter -must match with Kafka broker configuration. - -When delegation token is available on an executor it can be overridden with JAAS login configuration. - -##### Caveats - -- Obtaining delegation token for proxy user is not yet supported ([KAFKA-6945](https://issues.apache.org/jira/browse/KAFKA-6945)). -- Kafka native sink is not available so delegation token used only on consumer side. - -#### JAAS login configuration - -JAAS login configuration must placed on all nodes where Spark tries to access Kafka cluster. -This provides the possibility to apply any custom authentication logic with a higher cost to maintain. -This can be done several ways. One possibility is to provide additional JVM parameters, such as, - - ./bin/spark-submit \ - --driver-java-options "-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \ - --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \ - ... - -#### SSL / TLS +### SSL / TLS The new Kafka consumer [supports SSL](http://kafka.apache.org/documentation.html#security_ssl). To enable it, set kafkaParams appropriately before passing to `createDirectStream` / `createRDD`. Note that this only applies to communication between Spark and Kafka brokers; you are still responsible for separately [securing](security.html) Spark inter-node communication. +
{% highlight scala %} @@ -378,3 +308,17 @@ kafkaParams.put("ssl.key.password", "test1234"); {% endhighlight %}
+ +### Deploying + +As with any Spark applications, `spark-submit` is used to launch your application. + +For Scala and Java applications, if you are using SBT or Maven for project management, then package `spark-streaming-kafka-0-10_{{site.SCALA_BINARY_VERSION}}` and its dependencies into the application JAR. Make sure `spark-core_{{site.SCALA_BINARY_VERSION}}` and `spark-streaming_{{site.SCALA_BINARY_VERSION}}` are marked as `provided` dependencies as those are already present in a Spark installation. Then use `spark-submit` to launch your application (see [Deploying section](streaming-programming-guide.html#deploying-applications) in the main programming guide). + +### Security + +See [Structured Streaming Security](structured-streaming-kafka-integration.html#security). + +##### Additional Caveats + +- A Kafka native sink is not available, so the delegation token is used only on the consumer side (see the sketch below).
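To make the last caveat concrete: without a native Kafka sink for DStreams, records are typically written back through a manually created `KafkaProducer` inside `foreachRDD`, and the delegation token does not cover that path, so producer-side security has to be configured explicitly. A minimal sketch, assuming `stream` is a `DStream[ConsumerRecord[String, String]]` obtained from `createDirectStream` and all broker, topic and security values are placeholders:

{% highlight scala %}
import java.util.Properties

import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

stream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafkaBroker1:9093")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
    // The delegation token is not applied on the producer path, so any SASL/SSL
    // settings must be given here or through a JAAS file on the executors.
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL")

    val producer = new KafkaProducer[String, String](props)
    try {
      records.foreach { record =>
        producer.send(new ProducerRecord[String, String]("outputTopic", record.value))
      }
    } finally {
      producer.close()
    }
  }
}
{% endhighlight %}

In practice the producer would be created once per executor and reused; the point here is only that its authentication is independent of the token obtained for the consumers.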