Skip to content

Commit

Permalink
KAFKA-15053: Use case insensitive validator for security.protocol con…
Browse files Browse the repository at this point in the history
…fig (#13831)

Fixed a regression described in KAFKA-15053 that security.protocol only allows uppercase values like PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL. With this fix, both lower case and upper case values will be supported (e.g. PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL, plaintext, ssl, sasl_plaintext, sasl_ssl)

Reviewers: Chris Egerton <chrise@aiven.io>, Divij Vaidya <diviv@amazon.com>
  • Loading branch information
bogao007 authored and divijvaidya committed Jun 29, 2023
1 parent cdafa2f commit a25cc75
Show file tree
Hide file tree
Showing 16 changed files with 81 additions and 11 deletions.
Expand Up @@ -223,7 +223,8 @@ public class AdminClientConfig extends AbstractConfig {
.define(SECURITY_PROTOCOL_CONFIG,
Type.STRING,
DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(SecurityProtocol.class)),
ConfigDef.CaseInsensitiveValidString
.in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
Expand Down
Expand Up @@ -586,7 +586,8 @@ public class ConsumerConfig extends AbstractConfig {
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(SecurityProtocol.class)),
ConfigDef.CaseInsensitiveValidString
.in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
Expand Down
Expand Up @@ -458,7 +458,8 @@ public class ProducerConfig extends AbstractConfig {
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(SecurityProtocol.class)),
ConfigDef.CaseInsensitiveValidString
.in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(SECURITY_PROVIDERS_CONFIG,
Expand Down
Expand Up @@ -19,6 +19,7 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
Expand All @@ -27,6 +28,7 @@

import java.util.Arrays;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

Expand Down Expand Up @@ -145,4 +147,15 @@ public void testInvalidSecurityProtocol() {
ConfigException ce = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}

@Test
public void testCaseInsensitiveSecurityProtocol() {
final String saslSslLowerCase = SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
final Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslSslLowerCase);
final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
assertEquals(saslSslLowerCase, consumerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
}
Expand Up @@ -18,12 +18,14 @@

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -95,4 +97,15 @@ public void testInvalidSecurityProtocol() {
ConfigException ce = assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}

@Test
public void testCaseInsensitiveSecurityProtocol() {
final String saslSslLowerCase = SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
final Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslSslLowerCase);
final ProducerConfig producerConfig = new ProducerConfig(configs);
assertEquals(saslSslLowerCase, producerConfig.originals().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
}
Expand Up @@ -29,7 +29,7 @@
import java.util.Map;
import java.util.HashMap;

import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import static org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;

/** Configuration required for MirrorClient to talk to a given target cluster.
* <p>
Expand Down
Expand Up @@ -29,7 +29,7 @@
import org.apache.kafka.connect.runtime.ConnectorConfig;
import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import static org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;

import java.util.Map;
import java.util.HashMap;
Expand Down
Expand Up @@ -41,7 +41,7 @@
import java.util.Collections;
import java.util.stream.Collectors;

import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import static org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;

/** Top-level config describing replication flows between multiple Kafka clusters.
*
Expand Down
Expand Up @@ -19,9 +19,11 @@
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.test.MockMetricsReporter;
import org.junit.jupiter.api.Test;

import java.util.Locale;
import java.util.Map;
import java.util.HashMap;

Expand Down Expand Up @@ -163,6 +165,14 @@ public void testInvalidSecurityProtocol() {
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}

@Test
public void testCaseInsensitiveSecurityProtocol() {
final String saslSslLowerCase = SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
final TestMirrorConnectorConfig config = new TestMirrorConnectorConfig(makeProps(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslSslLowerCase));
assertEquals(saslSslLowerCase, config.originalsStrings().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}

@Test
@SuppressWarnings("deprecation")
public void testMetricsReporters() {
Expand Down
Expand Up @@ -25,8 +25,10 @@
import org.apache.kafka.common.config.ConfigData;
import org.apache.kafka.common.metrics.FakeMetricsReporter;

import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.jupiter.api.Test;

import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.Collections;
Expand Down Expand Up @@ -361,6 +363,14 @@ public void testClientInvalidSecurityProtocol() {
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}

@Test
public void testCaseInsensitiveSecurityProtocol() {
final String saslSslLowerCase = SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
final MirrorClientConfig config = new MirrorClientConfig(makeProps(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslSslLowerCase));
assertEquals(saslSslLowerCase, config.originalsStrings().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}

@Test
public void testAllConfigNames() {
MirrorMakerConfig mirrorConfig = new MirrorMakerConfig(makeProps(
Expand Down
Expand Up @@ -44,7 +44,7 @@

import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
import static org.apache.kafka.common.config.ConfigDef.Range.between;
import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
import static org.apache.kafka.common.config.ConfigDef.CaseInsensitiveValidString.in;
import static org.apache.kafka.common.utils.Utils.enumOptions;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.PARTITIONS_VALIDATOR;
import static org.apache.kafka.connect.runtime.TopicCreationConfig.REPLICATION_FACTOR_VALIDATOR;
Expand Down Expand Up @@ -317,7 +317,7 @@ private static ConfigDef config(Crypto crypto) {
.define(EXACTLY_ONCE_SOURCE_SUPPORT_CONFIG,
ConfigDef.Type.STRING,
EXACTLY_ONCE_SOURCE_SUPPORT_DEFAULT,
ConfigDef.CaseInsensitiveValidString.in(enumOptions(ExactlyOnceSourceSupport.class)),
in(enumOptions(ExactlyOnceSourceSupport.class)),
ConfigDef.Importance.HIGH,
EXACTLY_ONCE_SOURCE_SUPPORT_DOC)
.define(CommonClientConfigs.METADATA_MAX_AGE_CONFIG,
Expand Down
Expand Up @@ -19,6 +19,7 @@

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.junit.Test;

import javax.crypto.KeyGenerator;
Expand All @@ -30,6 +31,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -408,6 +410,16 @@ public void testInvalidSecurityProtocol() {
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}

@Test
public void testCaseInsensitiveSecurityProtocol() {
final String saslSslLowerCase = SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
final Map<String, String> configs = configs();
configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslSslLowerCase);
final DistributedConfig distributedConfig = new DistributedConfig(configs);
assertEquals(saslSslLowerCase, distributedConfig.originalsStrings()
.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}

@Test
public void shouldIdentifyNeedForTransactionalLeader() {
Map<String, String> workerProps = configs();
Expand Down
Expand Up @@ -229,7 +229,7 @@ object BrokerApiVersionsCommand {
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
ConfigDef.Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(classOf[SecurityProtocol]):_*),
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(classOf[SecurityProtocol]):_*),
ConfigDef.Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(
Expand Down
2 changes: 1 addition & 1 deletion docs/security.html
Expand Up @@ -66,7 +66,7 @@ <h3 class="anchor-heading"><a id="listener_configuration" class="anchor-link"></
<pre class="line-numbers"><code class="language-text">listener.security.protocol.map=CLIENT:SSL,BROKER:PLAINTEXT</code></pre>
<p>Possible options for the security protocol are given below:</p>
<p>Possible options (case-insensitive) for the security protocol are given below:</p>
<ol>
<li>PLAINTEXT</li>
<li>SSL</li>
Expand Down
Expand Up @@ -888,7 +888,7 @@ public class StreamsConfig extends AbstractConfig {
.define(SECURITY_PROTOCOL_CONFIG,
Type.STRING,
CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
in(Utils.enumOptions(SecurityProtocol.class)),
ConfigDef.CaseInsensitiveValidString.in(Utils.enumOptions(SecurityProtocol.class)),
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.define(TASK_TIMEOUT_MS_CONFIG,
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
Expand Down Expand Up @@ -1287,6 +1288,14 @@ public void shouldUseDefaultStateStoreCacheMaxBytesConfigWhenNoConfigIsSet() {
assertEquals(getTotalCacheSize(config), 10 * 1024 * 1024);
}

@Test
public void testCaseInsensitiveSecurityProtocol() {
final String saslSslLowerCase = SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, saslSslLowerCase);
final StreamsConfig config = new StreamsConfig(props);
assertEquals(saslSslLowerCase, config.originalsStrings().get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}

@Test
public void testInvalidSecurityProtocol() {
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "abc");
Expand Down

0 comments on commit a25cc75

Please sign in to comment.