[SPARK-26371][SS][TESTS] Increase kafka ConfigUpdater test coverage.
gaborgsomogyi committed Dec 14, 2018
1 parent 6c1f7ba commit 7026056
Showing 4 changed files with 268 additions and 49 deletions.
5 changes: 5 additions & 0 deletions external/kafka-0-10-sql/pom.xml
@@ -106,6 +106,11 @@
      <version>${jetty.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.scalacheck</groupId>
      <artifactId>scalacheck_${scala.binary.version}</artifactId>
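The mockito-core test dependency added above backs the SparkEnv stubbing used by the new KafkaConfigUpdaterSuite at the end of this commit; a minimal sketch of that pattern, with wiring that is illustrative and not part of the commit itself:

import org.mockito.Mockito.{doReturn, mock}

import org.apache.spark.{SparkConf, SparkEnv}

// Stub SparkEnv.conf so code under test can read Spark configuration without a running context.
// This mirrors the setSparkEnv helper in KafkaConfigUpdaterSuite below.
val env = mock(classOf[SparkEnv])
doReturn(new SparkConf()).when(env).conf
SparkEnv.set(env)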
74 changes: 74 additions & 0 deletions external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdater.scala
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.kafka010

import java.{util => ju}

import scala.collection.JavaConverters._

import org.apache.kafka.common.config.SaslConfigs

import org.apache.spark.SparkEnv
import org.apache.spark.deploy.security.KafkaTokenUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.Kafka

/**
 * Class to conveniently update Kafka config params, while logging the changes.
 */
private[kafka010] case class KafkaConfigUpdater(module: String, kafkaParams: Map[String, String])
    extends Logging {
  private val map = new ju.HashMap[String, Object](kafkaParams.asJava)

  def set(key: String, value: Object): this.type = {
    map.put(key, value)
    logDebug(s"$module: Set $key to $value, earlier value: ${kafkaParams.getOrElse(key, "")}")
    this
  }

  def setIfUnset(key: String, value: Object): KafkaConfigUpdater = {
    if (!map.containsKey(key)) {
      map.put(key, value)
      logDebug(s"$module: Set $key to $value")
    }
    this
  }

  def setAuthenticationConfigIfNeeded(): KafkaConfigUpdater = {
    // There are multiple ways to log in, applied in the following order:
    // - JVM global security provided -> try to log in with the JVM global security
    //   configuration, which can be set for example with 'java.security.auth.login.config'.
    //   No additional parameters are needed for this.
    // - Token provided -> try to log in with the SCRAM module using Kafka's dynamic JAAS
    //   configuration.
    if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
      logDebug("JVM global security configuration detected, using it for login.")
    } else if (KafkaSecurityHelper.isTokenAvailable()) {
      logDebug("Delegation token detected, using it for login.")
      val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
      set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
      val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
      require(mechanism.startsWith("SCRAM"),
        "Delegation token works only with SCRAM mechanism.")
      set(SaslConfigs.SASL_MECHANISM, mechanism)
    }
    this
  }

  def build(): ju.Map[String, Object] = map
}
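For orientation, the fluent API above is what KafkaSourceProvider switches to below; a minimal usage sketch (the values are illustrative, and the caller is assumed to sit in the org.apache.spark.sql.kafka010 package since the class is package-private):

import org.apache.kafka.common.serialization.ByteArrayDeserializer

// set always overwrites, setIfUnset keeps an existing value, and
// setAuthenticationConfigIfNeeded adds SASL settings only when a delegation token is
// available (with a JVM global JAAS config it leaves the map untouched).
val updatedParams: java.util.Map[String, Object] =
  KafkaConfigUpdater("source", Map("bootstrap.servers" -> "localhost:9092"))
    .set("key.deserializer", classOf[ByteArrayDeserializer].getName)
    .setIfUnset("value.deserializer", classOf[ByteArrayDeserializer].getName)
    .setAuthenticationConfigIfNeeded()
    .build()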
52 changes: 3 additions & 49 deletions external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -24,13 +24,9 @@ import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}

-import org.apache.spark.SparkEnv
-import org.apache.spark.deploy.security.KafkaTokenUtil
import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config._
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
@@ -485,7 +481,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
  }

  def kafkaParamsForDriver(specifiedKafkaParams: Map[String, String]): ju.Map[String, Object] =
-    ConfigUpdater("source", specifiedKafkaParams)
+    KafkaConfigUpdater("source", specifiedKafkaParams)
      .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
      .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)

@@ -508,7 +504,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {
  def kafkaParamsForExecutors(
      specifiedKafkaParams: Map[String, String],
      uniqueGroupId: String): ju.Map[String, Object] =
-    ConfigUpdater("executor", specifiedKafkaParams)
+    KafkaConfigUpdater("executor", specifiedKafkaParams)
      .set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
      .set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserClassName)

@@ -539,48 +535,6 @@ private[kafka010] object KafkaSourceProvider extends Logging {
    s"${groupIdPrefix}-${UUID.randomUUID}-${metadataPath.hashCode}"
  }

-  /** Class to conveniently update Kafka config params, while logging the changes */
-  private case class ConfigUpdater(module: String, kafkaParams: Map[String, String]) {
-    private val map = new ju.HashMap[String, Object](kafkaParams.asJava)
-
-    def set(key: String, value: Object): this.type = {
-      map.put(key, value)
-      logDebug(s"$module: Set $key to $value, earlier value: ${kafkaParams.getOrElse(key, "")}")
-      this
-    }
-
-    def setIfUnset(key: String, value: Object): ConfigUpdater = {
-      if (!map.containsKey(key)) {
-        map.put(key, value)
-        logDebug(s"$module: Set $key to $value")
-      }
-      this
-    }
-
-    def setAuthenticationConfigIfNeeded(): ConfigUpdater = {
-      // There are multiple possibilities to log in and applied in the following order:
-      // - JVM global security provided -> try to log in with JVM global security configuration
-      //   which can be configured for example with 'java.security.auth.login.config'.
-      //   For this no additional parameter needed.
-      // - Token is provided -> try to log in with scram module using kafka's dynamic JAAS
-      //   configuration.
-      if (KafkaTokenUtil.isGlobalJaasConfigurationProvided) {
-        logDebug("JVM global security configuration detected, using it for login.")
-      } else if (KafkaSecurityHelper.isTokenAvailable()) {
-        logDebug("Delegation token detected, using it for login.")
-        val jaasParams = KafkaSecurityHelper.getTokenJaasParams(SparkEnv.get.conf)
-        set(SaslConfigs.SASL_JAAS_CONFIG, jaasParams)
-        val mechanism = SparkEnv.get.conf.get(Kafka.TOKEN_SASL_MECHANISM)
-        require(mechanism.startsWith("SCRAM"),
-          "Delegation token works only with SCRAM mechanism.")
-        set(SaslConfigs.SASL_MECHANISM, mechanism)
-      }
-      this
-    }
-
-    def build(): ju.Map[String, Object] = map
-  }

  private[kafka010] def kafkaParamsForProducer(
      parameters: Map[String, String]): ju.Map[String, Object] = {
    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
@@ -598,7 +552,7 @@ private[kafka010] object KafkaSourceProvider extends Logging {

    val specifiedKafkaParams = convertToSpecifiedParams(parameters)

-    ConfigUpdater("executor", specifiedKafkaParams)
+    KafkaConfigUpdater("executor", specifiedKafkaParams)
      .set(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serClassName)
      .set(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serClassName)
      .setAuthenticationConfigIfNeeded()
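The parameter maps assembled by KafkaSourceProvider are ultimately handed to Kafka clients; a hedged sketch of what driver-side consumer construction could look like, assuming a caller inside the org.apache.spark.sql.kafka010 package (the bootstrap address and wiring are illustrative, not taken from this commit):

import org.apache.kafka.clients.consumer.KafkaConsumer

// Illustrative only: kafkaParamsForDriver configures ByteArrayDeserializer for keys and values,
// so a byte-array consumer matches the resulting config map.
val driverParams =
  KafkaSourceProvider.kafkaParamsForDriver(Map("bootstrap.servers" -> "localhost:9092"))
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](driverParams)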
186 changes: 186 additions & 0 deletions external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaConfigUpdaterSuite.scala
@@ -0,0 +1,186 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.kafka010

import java.{util => ju}
import java.util.UUID
import javax.security.auth.login.{AppConfigurationEntry, Configuration}

import org.apache.hadoop.security.{Credentials, UserGroupInformation}
import org.apache.hadoop.security.token.Token
import org.apache.kafka.common.config.SaslConfigs
import org.mockito.Mockito._
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite}
import org.apache.spark.deploy.security.KafkaTokenUtil
import org.apache.spark.deploy.security.KafkaTokenUtil.KafkaDelegationTokenIdentifier
import org.apache.spark.internal.config._

class KafkaConfigUpdaterSuite extends SparkFunSuite with BeforeAndAfterEach {
  private val testModule = "testModule"
  private val testKey = "testKey"
  private val testValue = "testValue"
  private val otherTestValue = "otherTestValue"
  private val tokenId = "tokenId" + UUID.randomUUID().toString
  private val tokenPassword = "tokenPassword" + UUID.randomUUID().toString

  private class KafkaJaasConfiguration extends Configuration {
    val entry =
      new AppConfigurationEntry(
        "DummyModule",
        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
        ju.Collections.emptyMap[String, Object]()
      )

    override def getAppConfigurationEntry(name: String): Array[AppConfigurationEntry] = {
      if (name.equals("KafkaClient")) {
        Array(entry)
      } else {
        null
      }
    }
  }

  override def afterEach(): Unit = {
    try {
      resetGlobalConfig
      resetUGI
      resetSparkEnv
    } finally {
      super.afterEach()
    }
  }

  private def setGlobalKafkaClientConfig(): Unit = {
    Configuration.setConfiguration(new KafkaJaasConfiguration)
  }

  private def resetGlobalConfig: Unit = {
    Configuration.setConfiguration(null)
  }

  private def addTokenToUGI(): Unit = {
    val token = new Token[KafkaDelegationTokenIdentifier](
      tokenId.getBytes,
      tokenPassword.getBytes,
      KafkaTokenUtil.TOKEN_KIND,
      KafkaTokenUtil.TOKEN_SERVICE
    )
    val creds = new Credentials()
    creds.addToken(KafkaTokenUtil.TOKEN_SERVICE, token)
    UserGroupInformation.getCurrentUser.addCredentials(creds)
  }

  private def resetUGI: Unit = {
    UserGroupInformation.setLoginUser(null)
  }

  private def setSparkEnv(settings: Traversable[(String, String)]): Unit = {
    val conf = new SparkConf().setAll(settings)
    val env = mock(classOf[SparkEnv])
    doReturn(conf).when(env).conf
    SparkEnv.set(env)
  }

  private def resetSparkEnv: Unit = {
    SparkEnv.set(null)
  }

  test("set should always set value") {
    val params = Map.empty[String, String]

    val updatedParams = KafkaConfigUpdater(testModule, params)
      .set(testKey, testValue)
      .build()

    assert(updatedParams.size() === 1)
    assert(updatedParams.get(testKey) === testValue)
  }

  test("setIfUnset without existing key should set value") {
    val params = Map.empty[String, String]

    val updatedParams = KafkaConfigUpdater(testModule, params)
      .setIfUnset(testKey, testValue)
      .build()

    assert(updatedParams.size() === 1)
    assert(updatedParams.get(testKey) === testValue)
  }

  test("setIfUnset with existing key should not set value") {
    val params = Map[String, String](testKey -> testValue)

    val updatedParams = KafkaConfigUpdater(testModule, params)
      .setIfUnset(testKey, otherTestValue)
      .build()

    assert(updatedParams.size() === 1)
    assert(updatedParams.get(testKey) === testValue)
  }

  test("setAuthenticationConfigIfNeeded with global security should not set values") {
    val params = Map.empty[String, String]
    setGlobalKafkaClientConfig()

    val updatedParams = KafkaConfigUpdater(testModule, params)
      .setAuthenticationConfigIfNeeded()
      .build()

    assert(updatedParams.size() === 0)
  }

  test("setAuthenticationConfigIfNeeded with token should set values") {
    val params = Map.empty[String, String]
    setSparkEnv(Map.empty)
    addTokenToUGI()

    val updatedParams = KafkaConfigUpdater(testModule, params)
      .setAuthenticationConfigIfNeeded()
      .build()

    assert(updatedParams.size() === 2)
    assert(updatedParams.containsKey(SaslConfigs.SASL_JAAS_CONFIG))
    assert(updatedParams.get(SaslConfigs.SASL_MECHANISM) === "SCRAM-SHA-512")
  }

  test("setAuthenticationConfigIfNeeded with token and invalid mechanism should throw exception") {
    val params = Map.empty[String, String]
    setSparkEnv(Map[String, String](Kafka.TOKEN_SASL_MECHANISM.key -> "INVALID"))
    addTokenToUGI()

    val e = intercept[IllegalArgumentException] {
      KafkaConfigUpdater(testModule, params)
        .setAuthenticationConfigIfNeeded()
        .build()
    }

    assert(e.getMessage.contains("Delegation token works only with SCRAM mechanism."))
  }

  test("setAuthenticationConfigIfNeeded without security should not set values") {
    val params = Map.empty[String, String]

    val updatedParams = KafkaConfigUpdater(testModule, params)
      .setAuthenticationConfigIfNeeded()
      .build()

    assert(updatedParams.size() === 0)
  }
}
