Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14962: Trim whitespace from ACL configuration #13670

Merged
merged 3 commits into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ public Boolean getBoolean(String key) {
}

public String getString(String key) {
return (String) get(key);
final String res = (String) get(key);
return res == null ? res : res.trim();
}

public ConfigDef.Type typeOf(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,10 @@ public class AbstractConfigTest {

@Test
public void testConfiguredInstances() {
testValidInputs(" ");
testValidInputs("");
testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter");
testValidInputs(" org.apache.kafka.common.metrics.FakeMetricsReporter ");
testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter, org.apache.kafka.common.metrics.FakeMetricsReporter");
testInvalidInputs(",");
testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ object AclAuthorizer {

private[authorizer] def zkClientConfigFromKafkaConfigAndMap(kafkaConfig: KafkaConfig, configMap: mutable.Map[String, _<:Any]): ZKClientConfig = {
val zkSslClientEnable = configMap.get(AclAuthorizer.configPrefix + KafkaConfig.ZkSslClientEnableProp).
map(_.toString).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean
map(_.toString.trim).getOrElse(kafkaConfig.zkSslClientEnable.toString).toBoolean
if (!zkSslClientEnable)
new ZKClientConfig
else {
Expand All @@ -109,9 +109,9 @@ object AclAuthorizer {
configMap.get(AclAuthorizer.configPrefix + kafkaProp).foreach { prefixedValue =>
zkClientConfig.setProperty(sysProp,
if (kafkaProp == KafkaConfig.ZkSslEndpointIdentificationAlgorithmProp)
(prefixedValue.toString.toUpperCase == "HTTPS").toString
(prefixedValue.toString.trim.toUpperCase == "HTTPS").toString
else
prefixedValue.toString)
prefixedValue.toString.trim)
}
}
zkClientConfig
Expand Down Expand Up @@ -185,22 +185,22 @@ class AclAuthorizer extends Authorizer with Logging {
override def configure(javaConfigs: util.Map[String, _]): Unit = {
val configs = javaConfigs.asScala
val props = new java.util.Properties()
configs.forKeyValue { (key, value) => props.put(key, value.toString) }
configs.forKeyValue { (key, value) => props.put(key, value.toString.trim) }

superUsers = configs.get(AclAuthorizer.SuperUsersProp).collect {
case str: String if str.nonEmpty => str.split(";").map(s => SecurityUtils.parseKafkaPrincipal(s.trim)).toSet
}.getOrElse(Set.empty[KafkaPrincipal])

shouldAllowEveryoneIfNoAclIsFound = configs.get(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.toBoolean)
shouldAllowEveryoneIfNoAclIsFound = configs.get(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp).exists(_.toString.trim.toBoolean)

// Use `KafkaConfig` in order to get the default ZK config values if not present in `javaConfigs`. Note that this
// means that `KafkaConfig.zkConnect` must always be set by the user (even if `AclAuthorizer.ZkUrlProp` is also
// set).
val kafkaConfig = KafkaConfig.fromProps(props, doLog = false)
val zkUrl = configs.get(AclAuthorizer.ZkUrlProp).map(_.toString).getOrElse(kafkaConfig.zkConnect)
val zkConnectionTimeoutMs = configs.get(AclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs)
val zkSessionTimeOutMs = configs.get(AclAuthorizer.ZkSessionTimeOutProp).map(_.toString.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs)
val zkMaxInFlightRequests = configs.get(AclAuthorizer.ZkMaxInFlightRequests).map(_.toString.toInt).getOrElse(kafkaConfig.zkMaxInFlightRequests)
val zkUrl = configs.get(AclAuthorizer.ZkUrlProp).map(_.toString.trim).getOrElse(kafkaConfig.zkConnect)
val zkConnectionTimeoutMs = configs.get(AclAuthorizer.ZkConnectionTimeOutProp).map(_.toString.trim.toInt).getOrElse(kafkaConfig.zkConnectionTimeoutMs)
val zkSessionTimeOutMs = configs.get(AclAuthorizer.ZkSessionTimeOutProp).map(_.toString.trim.toInt).getOrElse(kafkaConfig.zkSessionTimeoutMs)
val zkMaxInFlightRequests = configs.get(AclAuthorizer.ZkMaxInFlightRequests).map(_.toString.trim.toInt).getOrElse(kafkaConfig.zkMaxInFlightRequests)

val zkClientConfig = AclAuthorizer.zkClientConfigFromKafkaConfigAndMap(kafkaConfig, configs)
val time = Time.SYSTEM
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,24 @@ class AuthorizerTest extends QuorumTestHarness with BaseAuthorizerTest {
}
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array(KRAFT, ZK))
def testAclConfigWithWhitespace(quorum: String): Unit = {
val props = properties
props.put(AclAuthorizer.AllowEveryoneIfNoAclIsFoundProp, " true")
// replace all property values with leading & trailing whitespaces
props.replaceAll((_,v) => " " + v + " ")
val cfg = KafkaConfig.fromProps(props)
var testAuthorizer: Authorizer = null
try {
testAuthorizer = createAuthorizer(cfg.originals)
assertTrue(authorize(testAuthorizer, requestContext, READ, resource),
"when acls = null or [], authorizer should allow op with allow.everyone = true.")
} finally {
testAuthorizer.close()
}
}

@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
@ValueSource(strings = Array(KRAFT, ZK))
def testAclManagementAPIs(quorum: String): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void configure(Map<String, ?> configs) {
AuthorizationResult defaultResult = getDefaultResult(configs);
int nodeId;
try {
nodeId = Integer.parseInt(configs.get("node.id").toString());
nodeId = Integer.parseInt(configs.get("node.id").toString().trim());
} catch (Exception e) {
nodeId = -1;
}
Expand Down Expand Up @@ -204,6 +204,6 @@ static Set<String> getConfiguredSuperUsers(Map<String, ?> configs) {
static AuthorizationResult getDefaultResult(Map<String, ?> configs) {
Object configValue = configs.get(ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG);
if (configValue == null) return DENIED;
return Boolean.valueOf(configValue.toString()) ? ALLOWED : DENIED;
return Boolean.parseBoolean(configValue.toString().trim()) ? ALLOWED : DENIED;
}
}