From 7026056fd63d739683a686da63fd03c09f5c8894 Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Thu, 13 Dec 2018 19:26:19 +0100
Subject: [PATCH] [SPARK-26371][SS][TESTS] Increase kafka ConfigUpdater test coverage.

---
 external/kafka-0-10-sql/pom.xml                |   5 +
 .../sql/kafka010/KafkaConfigUpdater.scala      |  74 +++++++
 .../sql/kafka010/KafkaSourceProvider.scala     |  52 +----
 .../kafka010/KafkaConfigUpdaterSuite.scala     | 186 ++++++++++++++++++
 4 files changed, 268 insertions(+), 49 deletions(-)
 create mode 100644 external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
 create mode 100644 external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala

diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml
index de8731c4b774b..1c77906f43b17 100644
--- a/external/kafka-0-10-sql/pom.xml
+++ b/external/kafka-0-10-sql/pom.xml
@@ -106,6 +106,11 @@
       <version>${jetty.version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.scalacheck</groupId>
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
new file mode 100644
index 0000000000000..c28a62f07d5fc
--- /dev/null
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.common.config.SaslConfigs
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.deploy.security.KafkaTokenUtil
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.Kafka
+
+/**
+ * Class to conveniently update Kafka config params, while logging the changes.
+ */
+private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, String])
+    extends Logging {
+  private val map = new ju.HashMap[String, Object](kafkaParams.asJava)
+
+  def set(key: String, value: Object): this.type = {
+    map.put(key, value)
+    logDebug(s"$module: Set $key to $value, earlier value: ${kafkaParams.getOrElse(key, "")}")
+    this
+  }
+
+  def setIfUnset(key: String, value: Object): KafkaConfigUpdater = {
+    if (!map.containsKey(key)) {
+      map.put(key, value)
+      logDebug(s"$module: Set $key to $value")
+    }
+    this
+  }
+
+  def setAuthenticationConfigIfNeeded(): KafkaConfigUpdater = {
+    // There are multiple ways to log in; they are tried in the following order:
+    // - JVM global security provided -> try to log in with the JVM global security
+    //   configuration, which can be set for example with 'java.security.auth.login.config'.
+    //   No additional parameter is needed for this.
+    // - Token is provided -> try to log in with the SCRAM module using Kafka's dynamic
+    //   JAAS configuration.
+    if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
+      logDebug("JVM global security configuration detected, using it for login.")
+    } else if (KafkaSecurityHelper.isTokenAvailable()) {
+      logDebug("Delegation token detected, using it for login.")
+      val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
+      set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
+      val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
+      require(mechanism.startsWith("SCRAM"),
+        "Delegation token works only with SCRAM mechanism.")
+      set(SaslConfigs.SASL_MECHANISM, mechanism)
+    }
+    this
+  }
+
+  def build(): ju.Map[String, Object] = map
+}
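For context, a minimal sketch of how the extracted class is meant to be chained, mirroring the kafkaParamsForDriver call site changed below. The module name, bootstrap server, and the auto.offset.reset default are illustrative only, and since KafkaConfigUpdater is private[kafka010] a caller has to live in the org.apache.spark.sql.kafka010 package:

    package org.apache.spark.sql.kafka010

    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.ByteArrayDeserializer

    object KafkaConfigUpdaterExample {
      def main(args: Array[String]): Unit = {
        val deserializer = classOf[ByteArrayDeserializer].getName
        // Overwrite the deserializers unconditionally, fill a default only when the
        // user has not supplied one, then add authentication config if needed.
        val updatedParams =
          KafkaConfigUpdater("example", Map("bootstrap.servers" -> "broker:9092"))
            .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserializer)
            .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
            .setIfUnset(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
            .setAuthenticationConfigIfNeeded()
            .build()
        println(updatedParams)
      }
    }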
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 6a0c2088ac3d1..ddd629b7ba716 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -24,13 +24,9 @@ import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
 
-import org.apache.spark.SparkEnv
-import org.apache.spark.deploy.security.KafkaTokenUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
 import org.apache.spark.sql.execution.streaming.{Sink, Source}
 import org.apache.spark.sql.sources._
@@ -485,7 +481,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   }
 
   def kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]): ju.Map[String, Object] =
-    ConfigUpdater("source", specifiedKafkaParams)
+    KafkaConfigUpdater("source", specifiedKafkaParams)
       .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
       .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
 
@@ -508,7 +504,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
   def kafkaParamsForExecutors(
       specifiedKafkaParams: Map[String, String],
       uniqueGroupId: String): ju.Map[String, Object] =
-    ConfigUpdater("executor", specifiedKafkaParams)
+    KafkaConfigUpdater("executor", specifiedKafkaParams)
       .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
       .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)
 
@@ -539,48 +535,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
     s"${groupIdPrefix}-${UUID.randomUUID}-${metadataPath.hashCode}"
   }
 
-  /** Class to conveniently update Kafka config params, while logging the changes */
-  private case class ConfigUpdater(module: String, kafkaParams: Map[String, String]) {
-    private val map = new ju.HashMap[String, Object](kafkaParams.asJava)
-
-    def set(key: String, value: Object): this.type = {
-      map.put(key, value)
-      logDebug(s"$module: Set $key to $value, earlier value: ${kafkaParams.getOrElse(key, "")}")
-      this
-    }
-
-    def setIfUnset(key: String, value: Object): ConfigUpdater = {
-      if (!map.containsKey(key)) {
-        map.put(key, value)
-        logDebug(s"$module: Set $key to $value")
-      }
-      this
-    }
-
-    def setAuthenticationConfigIfNeeded(): ConfigUpdater = {
-      // There are multiple possibilities to log in and applied in the following order:
-      // - JVM global security provided -> try to log in with JVM global security configuration
-      //   which can be configured for example with 'java.security.auth.login.config'.
-      //   For this no additional parameter needed.
-      // - Token is provided -> try to log in with scram module using kafka's dynamic JAAS
-      //   configuration.
-      if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
-        logDebug("JVM global security configuration detected, using it for login.")
-      } else if (KafkaSecurityHelper.isTokenAvailable()) {
-        logDebug("Delegation token detected, using it for login.")
-        val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
-        set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
-        val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
-        require(mechanism.startsWith("SCRAM"),
-          "Delegation token works only with SCRAM mechanism.")
-        set(SaslConfigs.SASL_MECHANISM, mechanism)
-      }
-      this
-    }
-
-    def build(): ju.Map[String, Object] = map
-  }
-
   private[kafka010] def kafkaParamsForProducer(
       parameters: Map[String, String]): ju.Map[String, Object] = {
     val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
@@ -598,7 +552,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
 
     val specifiedKafkaParams = convertToSpecifiedParams(parameters)
 
-    ConfigUpdater("executor", specifiedKafkaParams)
+    KafkaConfigUpdater("executor", specifiedKafkaParams)
       .set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
       .set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
       .setAuthenticationConfigIfNeeded()
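The login-order comment in KafkaConfigUpdater (and in the code removed above) refers to a JAAS login configuration installed for the whole JVM, typically supplied with -Djava.security.auth.login.config=<path>. As a rough, self-contained approximation of what "JVM global security provided" means (an illustrative sketch, not the actual KafkaTokenUtil.isGlobalJaasConfigurationProvided implementation):

    import javax.security.auth.login.Configuration

    import scala.util.Try

    object GlobalJaasCheckExample {
      // True when the JVM-wide JAAS configuration defines a "KafkaClient" entry,
      // the entry name the test suite below also uses. Configuration.getConfiguration
      // throws when no login configuration can be located at all, hence the Try.
      def hasGlobalKafkaClientEntry: Boolean = Try {
        Configuration.getConfiguration.getAppConfigurationEntry("KafkaClient") != null
      }.getOrElse(false)

      def main(args: Array[String]): Unit = {
        println(s"Global KafkaClient JAAS entry present: $hasGlobalKafkaClientEntry")
      }
    }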
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala
new file mode 100644
index 0000000000000..1e3840eae551b
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.UUID
+import javax.security.auth.login.{AppConfigurationEntry, Configuration}
+
+import org.apache.hadoop.security.{Credentials, UserGroupInformation}
+import org.apache.hadoop.security.token.Token
+import org.apache.kafka.common.config.SaslConfigs
+import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfterEach
+
+import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
+import org.apache.spark.deploy.security.KafkaTokenUtil
+import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier
+import org.apache.spark.internal.config._
+
+class KafkaConfigUpdaterSuite extends SparkFunSuite with BeforeAndAfterEach {
+  private val testModule = "testModule"
+  private val testKey = "testKey"
+  private val testValue = "testValue"
+  private val otherTestValue = "otherTestValue"
+  private val tokenId = "tokenId" + UUID.randomUUID().toString
+  private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString
+
+  private class KafkaJaasConfiguration extends Configuration {
+    val entry =
+      new AppConfigurationEntry(
+        "DummyModule",
+        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+        ju.Collections.emptyMap[String, Object]()
+      )
+
+    override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
+      if (name.equals("KafkaClient")) {
+        Array(entry)
+      } else {
+        null
+      }
+    }
+  }
+
+  override def afterEach(): Unit = {
+    try {
+      resetGlobalConfig()
+      resetUGI()
+      resetSparkEnv()
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  private def setGlobalKafkaClientConfig(): Unit = {
+    Configuration.setConfiguration(new KafkaJaasConfiguration)
+  }
+
+  private def resetGlobalConfig(): Unit = {
+    Configuration.setConfiguration(null)
+  }
+
+  private def addTokenToUGI(): Unit = {
+    val token = new Token[KafkaDelegationTokenIdentifier](
+      tokenId.getBytes,
+      tokenPassword.getBytes,
+      KafkaTokenUtil.TOKEN_KIND,
+      KafkaTokenUtil.TOKEN_SERVICE
+    )
+    val creds = new Credentials()
+    creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token)
+    UserGroupInformation.getCurrentUser.addCredentials(creds)
+  }
+
+  private def resetUGI(): Unit = {
+    UserGroupInformation.setLoginUser(null)
+  }
+
+  private def setSparkEnv(settings: Traversable[(String, String)]): Unit = {
+    val conf = new SparkConf().setAll(settings)
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+  }
+
+  private def resetSparkEnv(): Unit = {
+    SparkEnv.set(null)
+  }
+
+  test("set should always set value") {
+    val params = Map.empty[String, String]
+
+    val updatedParams = KafkaConfigUpdater(testModule, params)
+      .set(testKey, testValue)
+      .build()
+
+    assert(updatedParams.size() === 1)
+    assert(updatedParams.get(testKey) === testValue)
+  }
+
+  test("setIfUnset without existing key should set value") {
+    val params = Map.empty[String, String]
+
+    val updatedParams = KafkaConfigUpdater(testModule, params)
+      .setIfUnset(testKey, testValue)
+      .build()
+
+    assert(updatedParams.size() === 1)
+    assert(updatedParams.get(testKey) === testValue)
+  }
+
+  test("setIfUnset with existing key should not set value") {
+    val params = Map[String, String](testKey -> testValue)
+
+    val updatedParams = KafkaConfigUpdater(testModule, params)
+      .setIfUnset(testKey, otherTestValue)
+      .build()
+
+    assert(updatedParams.size() === 1)
+    assert(updatedParams.get(testKey) === testValue)
+  }
+
+  test("setAuthenticationConfigIfNeeded with global security should not set values") {
+    val params = Map.empty[String, String]
+    setGlobalKafkaClientConfig()
+
+    val updatedParams = KafkaConfigUpdater(testModule, params)
+      .setAuthenticationConfigIfNeeded()
+      .build()
+
+    assert(updatedParams.size() === 0)
+  }
+
+  test("setAuthenticationConfigIfNeeded with token should set values") {
+    val params = Map.empty[String, String]
+    setSparkEnv(Map.empty)
+    addTokenToUGI()
+
+    val updatedParams = KafkaConfigUpdater(testModule, params)
+      .setAuthenticationConfigIfNeeded()
+      .build()
+
+    assert(updatedParams.size() === 2)
+    assert(updatedParams.containsKey(SaslConfigs.SASL_JAAS_CONFIG))
+    assert(updatedParams.get(SaslConfigs.SASL_MECHANISM) === "SCRAM-SHA-512")
+  }
+
+  test("setAuthenticationConfigIfNeeded with token and invalid mechanism should throw exception") {
+    val params = Map.empty[String, String]
+    setSparkEnv(Map[String, String](Kafka.TOKEN_SASL_MECHANISM.key -> "INVALID"))
+    addTokenToUGI()
+
+    val e = intercept[IllegalArgumentException] {
+      KafkaConfigUpdater(testModule, params)
+        .setAuthenticationConfigIfNeeded()
+        .build()
+    }
+
+    assert(e.getMessage.contains("Delegation token works only with SCRAM mechanism."))
+  }
+
+  test("setAuthenticationConfigIfNeeded without security should not set values") {
+    val params = Map.empty[String, String]
+
+    val updatedParams = KafkaConfigUpdater(testModule, params)
+      .setAuthenticationConfigIfNeeded()
+      .build()
+
+    assert(updatedParams.size() === 0)
+  }
+}
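As a usage note, the new suite should be runnable on its own. Assuming the usual sbt project id for this module (sql-kafka-0-10, an assumption worth checking against SparkBuild.scala), something like:

    build/sbt "sql-kafka-0-10/testOnly org.apache.spark.sql.kafka010.KafkaConfigUpdaterSuite"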