From 88ccdfb961ec4adc82121b1ed44a198a4dbe4f89 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 17 Nov 2015 12:48:25 +0000 Subject: [PATCH 1/4] Remove principal.builder.class from client configs --- .../kafka/common/config/SslConfigs.java | 6 +++--- .../kafka/common/network/ChannelBuilders.java | 19 +++++++++++++++++++ .../network/PlaintextChannelBuilder.java | 5 +---- .../common/network/SaslChannelBuilder.java | 5 +---- .../common/network/SslChannelBuilder.java | 5 +---- .../clients/producer/KafkaProducerTest.java | 4 +--- .../kafka/common/network/SelectorTest.java | 4 +--- .../kafka/common/network/SslSelectorTest.java | 1 - .../common/network/SslTransportLayerTest.java | 5 +++-- 9 files changed, 30 insertions(+), 24 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java index ae4667a8a4da9..0f9607a7865b4 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java @@ -22,7 +22,8 @@ public class SslConfigs { */ public static final String PRINCIPAL_BUILDER_CLASS_CONFIG = "principal.builder.class"; - public static final String PRINCIPAL_BUILDER_CLASS_DOC = "principal builder to generate a java Principal. This config is optional for client."; + public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the PrincipalBuilder interface. " + + "Default is DefaultPrincipalBuilder."; public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS = "org.apache.kafka.common.security.auth.DefaultPrincipalBuilder"; public static final String SSL_PROTOCOL_CONFIG = "ssl.protocol"; @@ -97,8 +98,7 @@ public class SslConfigs { + "
  • ssl.client.auth=none This means client authentication is not needed."; public static void addClientSslSupport(ConfigDef config) { - config.define(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, ConfigDef.Type.CLASS, SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS, ConfigDef.Importance.LOW, SslConfigs.PRINCIPAL_BUILDER_CLASS_DOC) - .define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC) + config.define(SslConfigs.SSL_PROTOCOL_CONFIG, ConfigDef.Type.STRING, SslConfigs.DEFAULT_SSL_PROTOCOL, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROTOCOL_DOC) .define(SslConfigs.SSL_PROVIDER_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_PROVIDER_DOC) .define(SslConfigs.SSL_CIPHER_SUITES_CONFIG, ConfigDef.Type.LIST, null, ConfigDef.Importance.LOW, SslConfigs.SSL_CIPHER_SUITES_DOC) .define(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG, ConfigDef.Type.LIST, SslConfigs.DEFAULT_SSL_ENABLED_PROTOCOLS, ConfigDef.Importance.MEDIUM, SslConfigs.SSL_ENABLED_PROTOCOLS_DOC) diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java index 03c663d0f102d..669f269f91c2a 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java +++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java @@ -13,7 +13,11 @@ package org.apache.kafka.common.network; +import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.protocol.SecurityProtocol; +import org.apache.kafka.common.security.auth.DefaultPrincipalBuilder; +import org.apache.kafka.common.security.auth.PrincipalBuilder; +import org.apache.kafka.common.utils.Utils; import java.util.Map; @@ -57,6 +61,21 @@ public static ChannelBuilder create(SecurityProtocol securityProtocol, Mode mode return channelBuilder; } + /** + * Returns a configured `PrincipalBuilder`. + */ + static PrincipalBuilder createPrincipalBuilder(Map configs) { + // this is a server-only config so it will always be null on the client + Class principalBuilderClass = (Class) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG); + PrincipalBuilder principalBuilder; + if (principalBuilderClass == null) + principalBuilder = new DefaultPrincipalBuilder(); + else + principalBuilder = (PrincipalBuilder) Utils.newInstance(principalBuilderClass); + principalBuilder.configure(configs); + return principalBuilder; + } + private static void requireNonNullMode(Mode mode, SecurityProtocol securityProtocol) { if (mode == null) throw new IllegalArgumentException("`mode` must be non-null if `securityProtocol` is `" + securityProtocol + "`"); diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java index bc1536aa98b1d..f0af9351ae4a5 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java @@ -15,9 +15,7 @@ import java.nio.channels.SelectionKey; import java.util.Map; -import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.security.auth.PrincipalBuilder; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.KafkaException; import org.slf4j.Logger; @@ -31,8 +29,7 @@ public class PlaintextChannelBuilder implements ChannelBuilder { public void configure(Map configs) throws KafkaException { try { this.configs = configs; - this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG)); - this.principalBuilder.configure(this.configs); + principalBuilder = ChannelBuilders.createPrincipalBuilder(configs); } catch (Exception e) { throw new KafkaException(e); } diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java index 75e3fcad4f3dc..e4765c144705f 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java @@ -27,8 +27,6 @@ import org.apache.kafka.common.security.authenticator.SaslServerAuthenticator; import org.apache.kafka.common.security.ssl.SslFactory; import org.apache.kafka.common.protocol.SecurityProtocol; -import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.KafkaException; import org.slf4j.Logger; @@ -57,8 +55,7 @@ public void configure(Map configs) throws KafkaException { try { this.configs = configs; this.loginManager = LoginManager.acquireLoginManager(loginType, configs); - this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG)); - this.principalBuilder.configure(configs); + this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs); String defaultRealm; try { diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java index 9a7ba0c42ea9d..b546174fdf1ac 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java @@ -19,8 +19,6 @@ import org.apache.kafka.common.security.auth.PrincipalBuilder; import org.apache.kafka.common.security.ssl.SslFactory; -import org.apache.kafka.common.config.SslConfigs; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.common.KafkaException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,8 +39,7 @@ public void configure(Map configs) throws KafkaException { this.configs = configs; this.sslFactory = new SslFactory(mode); this.sslFactory.configure(this.configs); - this.principalBuilder = (PrincipalBuilder) Utils.newInstance((Class) configs.get(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG)); - this.principalBuilder.configure(this.configs); + this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs); } catch (Exception e) { throw new KafkaException(e); } diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java index b96a5f768008a..11302254da769 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java @@ -18,7 +18,6 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.test.MockMetricsReporter; import org.apache.kafka.test.MockSerializer; @@ -54,12 +53,11 @@ public void testConstructorFailureCloseResource() { @Test public void testSerializerClose() throws Exception { - Map configs = new HashMap(); + Map configs = new HashMap<>(); configs.put(ProducerConfig.CLIENT_ID_CONFIG, "testConstructorClose"); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); configs.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName()); configs.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL); - configs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); final int oldInitCount = MockSerializer.INIT_COUNT.get(); final int oldCloseCount = MockSerializer.CLOSE_COUNT.get(); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 8ce02988c9ec1..18fd080a2ef6a 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -24,7 +24,6 @@ import java.util.*; import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.config.SslConfigs; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; @@ -48,8 +47,7 @@ public class SelectorTest { @Before public void setup() throws Exception { - Map configs = new HashMap(); - configs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); + Map configs = new HashMap<>(); this.server = new EchoServer(configs); this.server.start(); this.time = new MockTime(); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java index 94c5654e2e88b..a442ea00049da 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java @@ -52,7 +52,6 @@ public void setup() throws Exception { this.server.start(); this.time = new MockTime(); sslClientConfigs = TestSslUtils.createSslConfig(false, false, Mode.SERVER, trustStoreFile, "client"); - sslClientConfigs.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); this.channelBuilder = new SslChannelBuilder(Mode.CLIENT); this.channelBuilder.configure(sslClientConfigs); diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java index 282ff8b9cf7bd..2b5d26b50bd7d 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java @@ -464,11 +464,12 @@ private static class CertStores { Mode mode = server ? Mode.SERVER : Mode.CLIENT; File truststoreFile = File.createTempFile(name + "TS", ".jks"); sslConfig = TestSslUtils.createSslConfig(!server, true, mode, truststoreFile, name); - sslConfig.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); + if (server) + sslConfig.put(SslConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, Class.forName(SslConfigs.DEFAULT_PRINCIPAL_BUILDER_CLASS)); } private Map getTrustingConfig(CertStores truststoreConfig) { - Map config = new HashMap(sslConfig); + Map config = new HashMap<>(sslConfig); config.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)); config.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)); config.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, truststoreConfig.sslConfig.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG)); From ce2c307bae8da40bed4eb53cc209c65973b72331 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 17 Nov 2015 12:52:35 +0000 Subject: [PATCH 2/4] Mark the `PrincipalBuilder` interface as `Unstable` I think it would be good to use it for SASL too and that may require the interface to be changed. --- checkstyle/import-control.xml | 1 + .../org/apache/kafka/common/security/auth/PrincipalBuilder.java | 2 ++ 2 files changed, 3 insertions(+) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 204bc60bfa5db..e221dce633b4f 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -66,6 +66,7 @@ + diff --git a/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java b/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java index 99b6d21aff3a2..75e18555e331b 100644 --- a/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.security.auth; +import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.network.TransportLayer; import org.apache.kafka.common.network.Authenticator; import org.apache.kafka.common.KafkaException; @@ -28,6 +29,7 @@ /* * PrincipalBuilder for Authenticator */ +@InterfaceStability.Unstable public interface PrincipalBuilder extends Configurable { /** From 70249b1502e32c0ea484e398b318545a500ecb12 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 17 Nov 2015 12:53:44 +0000 Subject: [PATCH 3/4] Mention in docs for `PRINCIPAL_BUILDER_CLASS_DOC` that it's only used for `SecurityProtocol.SSL` --- .../main/java/org/apache/kafka/common/config/SslConfigs.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java index 0f9607a7865b4..a893b754cceed 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java +++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java @@ -22,7 +22,8 @@ public class SslConfigs { */ public static final String PRINCIPAL_BUILDER_CLASS_CONFIG = "principal.builder.class"; - public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the PrincipalBuilder interface. " + + public static final String PRINCIPAL_BUILDER_CLASS_DOC = "The fully qualified name of a class that implements the PrincipalBuilder interface, " + + "which is currently used to build the Principal for connections with the SSL SecurityProtocol. " + "Default is DefaultPrincipalBuilder."; public static final String DEFAULT_PRINCIPAL_BUILDER_CLASS = "org.apache.kafka.common.security.auth.DefaultPrincipalBuilder"; From 33a2faba753875336257d66fc6c36513f9c37f41 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 17 Nov 2015 15:52:15 +0000 Subject: [PATCH 4/4] Pass `null` as `principalBuilder` to SASL authenticators as they don't use it As suggested by Jun. --- .../apache/kafka/common/network/SaslChannelBuilder.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java index e4765c144705f..86ac779a4d68f 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.security.JaasUtils; -import org.apache.kafka.common.security.auth.PrincipalBuilder; import org.apache.kafka.common.security.kerberos.KerberosShortNamer; import org.apache.kafka.common.security.kerberos.LoginManager; import org.apache.kafka.common.security.authenticator.SaslClientAuthenticator; @@ -40,7 +39,6 @@ public class SaslChannelBuilder implements ChannelBuilder { private final LoginType loginType; private LoginManager loginManager; - private PrincipalBuilder principalBuilder; private SslFactory sslFactory; private Map configs; private KerberosShortNamer kerberosShortNamer; @@ -55,7 +53,6 @@ public void configure(Map configs) throws KafkaException { try { this.configs = configs; this.loginManager = LoginManager.acquireLoginManager(loginType, configs); - this.principalBuilder = ChannelBuilders.createPrincipalBuilder(configs); String defaultRealm; try { @@ -87,7 +84,8 @@ public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize else authenticator = new SaslClientAuthenticator(id, loginManager.subject(), loginManager.serviceName(), socketChannel.socket().getInetAddress().getHostName()); - authenticator.configure(transportLayer, this.principalBuilder, this.configs); + // Both authenticators don't use `PrincipalBuilder`, so we pass `null` for now. Reconsider if this changes. + authenticator.configure(transportLayer, null, this.configs); return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize); } catch (Exception e) { log.info("Failed to create channel due to ", e); @@ -96,7 +94,6 @@ public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize } public void close() { - this.principalBuilder.close(); this.loginManager.release(); }