From aa42a11dfd99ee9ab24d2e9a7521ef1c97ae1ff4 Mon Sep 17 00:00:00 2001
From: Charly Molter
Date: Thu, 25 Jan 2018 05:06:44 +0000
Subject: [PATCH] KAFKA-6180; Add a Validator for NonNull configurations and
 remove redundant null checks on lists (#4188)

---
 .../clients/consumer/ConsumerConfig.java      |  9 ++++--
 .../kafka/clients/consumer/KafkaConsumer.java | 10 +++---
 .../kafka/clients/producer/KafkaProducer.java | 31 ++++++-------------
 .../clients/producer/ProducerConfig.java      |  9 ++++--
 .../kafka/common/config/AbstractConfig.java   |  4 +--
 .../apache/kafka/common/config/ConfigDef.java | 29 +++++++++++++++++
 .../kafka/common/security/ssl/SslFactory.java |  4 +--
 .../org/apache/kafka/common/utils/Utils.java  |  2 ++
 .../clients/consumer/KafkaConsumerTest.java   |  2 +-
 .../common/config/AbstractConfigTest.java     | 16 ++++++++++
 .../kafka/common/config/ConfigDefTest.java    |  2 ++
 .../connect/runtime/ConnectorConfig.java      |  8 ++---
 .../main/scala/kafka/server/KafkaConfig.scala |  4 +--
 13 files changed, 84 insertions(+), 46 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 3fe58d7576ad..72e496cbd469 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -257,6 +257,8 @@ public class ConsumerConfig extends AbstractConfig {
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
                                         Type.LIST,
+                                        Collections.emptyList(),
+                                        new ConfigDef.NonNullValidator(),
                                         Importance.HIGH,
                                         CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
                                 .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
@@ -273,6 +275,7 @@ public class ConsumerConfig extends AbstractConfig {
                                 .define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                                         Type.LIST,
                                         Collections.singletonList(RangeAssignor.class),
+                                        new ConfigDef.NonNullValidator(),
                                         Importance.MEDIUM,
                                         PARTITION_ASSIGNMENT_STRATEGY_DOC)
                                 .define(METADATA_MAX_AGE_CONFIG,
@@ -382,7 +385,8 @@ public class ConsumerConfig extends AbstractConfig {
                                         CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
                                 .define(METRIC_REPORTER_CLASSES_CONFIG,
                                         Type.LIST,
-                                        "",
+                                        Collections.emptyList(),
+                                        new ConfigDef.NonNullValidator(),
                                         Importance.LOW,
                                         CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
                                 .define(KEY_DESERIALIZER_CLASS_CONFIG,
@@ -407,7 +411,8 @@ public class ConsumerConfig extends AbstractConfig {
                                         CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
                                 .define(INTERCEPTOR_CLASSES_CONFIG,
                                         Type.LIST,
-                                        null,
+                                        Collections.emptyList(),
+                                        new ConfigDef.NonNullValidator(),
                                         Importance.LOW,
                                         INTERCEPTOR_CLASSES_DOC)
                                 .define(MAX_POLL_RECORDS_CONFIG,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 08877c9d7d3e..0bbbcf1e60f6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -63,6 +63,7 @@
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -683,7 +684,7 @@ private KafkaConsumer(ConsumerConfig config,
             userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
             List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs, false)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
                     ConsumerInterceptor.class);
-            this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
+            this.interceptors = new ConsumerInterceptors<>(interceptorList);
             if (keyDeserializer == null) {
                 this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                         Deserializer.class);
@@ -815,7 +816,7 @@ private KafkaConsumer(ConsumerConfig config,
         this.keyDeserializer = keyDeserializer;
         this.valueDeserializer = valueDeserializer;
         this.fetcher = fetcher;
-        this.interceptors = interceptors;
+        this.interceptors = Objects.requireNonNull(interceptors);
         this.time = time;
         this.client = client;
         this.metrics = metrics;
@@ -1122,10 +1123,7 @@ public ConsumerRecords<K, V> poll(long timeout) {
                     if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
                         client.pollNoWakeup();

-                    if (this.interceptors == null)
-                        return new ConsumerRecords<>(records);
-                    else
-                        return this.interceptors.onConsume(new ConsumerRecords<>(records));
+                    return this.interceptors.onConsume(new ConsumerRecords<>(records));
                 }

                 long elapsed = time.milliseconds() - start;
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 4e67fe8323e1..5fc9a1b9b38b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -378,7 +378,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
             userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
             List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                     ProducerInterceptor.class);
-            this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
+            this.interceptors = new ProducerInterceptors<>(interceptorList);
             ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
             this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
             this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
@@ -780,7 +780,7 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
     @Override
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
         // intercept the record, which can be potentially modified; this method does not throw exceptions
-        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
+        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
         return doSend(interceptedRecord, callback);
     }

@@ -822,7 +822,7 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
             long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
             // producer callback will make sure to call both 'callback' and interceptor callback
-            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
+            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

             if (transactionManager != null && transactionManager.isTransactional())
                 transactionManager.maybeAddPartitionToTransaction(tp);
@@ -842,29 +842,24 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
             if (callback != null)
                 callback.onCompletion(null, e);
             this.errors.record();
-            if (this.interceptors != null)
-                this.interceptors.onSendError(record, tp, e);
+            this.interceptors.onSendError(record, tp, e);
             return new FutureFailure(e);
         } catch (InterruptedException e) {
             this.errors.record();
-            if (this.interceptors != null)
-                this.interceptors.onSendError(record, tp, e);
+            this.interceptors.onSendError(record, tp, e);
             throw new InterruptException(e);
         } catch (BufferExhaustedException e) {
             this.errors.record();
             this.metrics.sensor("buffer-exhausted-records").record();
-            if (this.interceptors != null)
-                this.interceptors.onSendError(record, tp, e);
+            this.interceptors.onSendError(record, tp, e);
             throw e;
         } catch (KafkaException e) {
             this.errors.record();
-            if (this.interceptors != null)
-                this.interceptors.onSendError(record, tp, e);
+            this.interceptors.onSendError(record, tp, e);
             throw e;
         } catch (Exception e) {
             // we notify interceptor about all exceptions, since onSend is called before anything else in this method
-            if (this.interceptors != null)
-                this.interceptors.onSendError(record, tp, e);
+            this.interceptors.onSendError(record, tp, e);
             throw e;
         }
     }
@@ -1198,14 +1193,8 @@ private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, TopicPartition tp) {
         }

         public void onCompletion(RecordMetadata metadata, Exception exception) {
-            if (this.interceptors != null) {
-                if (metadata == null) {
-                    this.interceptors.onAcknowledgement(new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP,
-                            Long.valueOf(-1L), -1, -1), exception);
-                } else {
-                    this.interceptors.onAcknowledgement(metadata, exception);
-                }
-            }
+            metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1);
+            this.interceptors.onAcknowledgement(metadata, exception);
             if (this.userCallback != null)
                 this.userCallback.onCompletion(metadata, exception);
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 0631814cda9c..6428dc42d491 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serializer;

+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
@@ -221,7 +222,7 @@ public class ProducerConfig extends AbstractConfig {
             "Note that transactions requires a cluster of at least three brokers by default what is the recommended setting for production; for development you can change this, by adjusting broker setting `transaction.state.log.replication.factor`.";

     static {
-        CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+        CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
                                 .define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
                                 .define(ACKS_CONFIG,
@@ -273,7 +274,8 @@ public class ProducerConfig extends AbstractConfig {
                                         CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
                                 .define(METRIC_REPORTER_CLASSES_CONFIG,
                                         Type.LIST,
-                                        "",
+                                        Collections.emptyList(),
+                                        new ConfigDef.NonNullValidator(),
                                         Importance.LOW,
                                         CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
                                 .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
@@ -302,7 +304,8 @@ public class ProducerConfig extends AbstractConfig {
                                         Importance.MEDIUM, PARTITIONER_CLASS_DOC)
                                 .define(INTERCEPTOR_CLASSES_CONFIG,
                                         Type.LIST,
-                                        null,
+                                        Collections.emptyList(),
+                                        new ConfigDef.NonNullValidator(),
                                         Importance.LOW,
                                         INTERCEPTOR_CLASSES_DOC)
                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index 61a5798eeb4a..9e32074defb3 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -289,9 +289,7 @@ public <T> List<T> getConfiguredInstances(String key, Class<T> t) {
      */
     public <T> List<T> getConfiguredInstances(String key, Class<T> t, Map<String, Object> configOverrides) {
         List<String> klasses = getList(key);
-        List<T> objects = new ArrayList<T>();
-        if (klasses == null)
-            return objects;
+        List<T> objects = new ArrayList<>();
         Map<String, Object> configPairs = originals();
         configPairs.putAll(configOverrides);
         for (Object klass : klasses) {
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 3340ab3d656f..30802984cac9 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -937,6 +937,35 @@ public String toString() {
         }
     }

+    public static class NonNullValidator implements Validator {
+        @Override
+        public void ensureValid(String name, Object value) {
+            if (value == null) {
+                // Pass in the string null to avoid the findbugs warning
+                throw new ConfigException(name, "null", "entry must be non null");
+            }
+        }
+    }
+
+    public static class CompositeValidator implements Validator {
+        private final List<Validator> validators;
+
+        private CompositeValidator(List<Validator> validators) {
+            this.validators = Collections.unmodifiableList(validators);
+        }
+
+        public static CompositeValidator of(Validator... validators) {
+            return new CompositeValidator(Arrays.asList(validators));
+        }
+
+        @Override
+        public void ensureValid(String name, Object value) {
+            for (Validator validator: validators) {
+                validator.ensureValid(name, value);
+            }
+        }
+    }
+
     public static class NonEmptyString implements Validator {

         @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index 285582cc58cc..0d1fbf95dc5f 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -89,12 +89,12 @@ public void configure(Map<String, ?> configs) throws KafkaException {

         @SuppressWarnings("unchecked")
         List<String> cipherSuitesList = (List<String>) configs.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG);
-        if (cipherSuitesList != null)
+        if (cipherSuitesList != null && !cipherSuitesList.isEmpty())
             this.cipherSuites = cipherSuitesList.toArray(new String[cipherSuitesList.size()]);

         @SuppressWarnings("unchecked")
         List<String> enabledProtocolsList = (List<String>) configs.get(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG);
-        if (enabledProtocolsList != null)
+        if (enabledProtocolsList != null && !enabledProtocolsList.isEmpty())
             this.enabledProtocols = enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]);

         String endpointIdentification = (String) configs.get(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index 8d8f118981ab..9da382237723 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -54,6 +54,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Matcher;
@@ -464,6 +465,7 @@ public static <T> String join(T[] strs, String separator) {
      * @return The string representation.
      */
     public static <T> String join(Collection<T> list, String separator) {
+        Objects.requireNonNull(list);
         StringBuilder sb = new StringBuilder();
         Iterator<T> iter = list.iterator();
         while (iter.hasNext()) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index ab682d6222a7..a827168039ec 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1615,7 +1615,7 @@ private KafkaConsumer<String, String> newConsumer(Time time,
         OffsetResetStrategy autoResetStrategy = OffsetResetStrategy.EARLIEST;
         List<PartitionAssignor> assignors = Arrays.asList(assignor);
-        ConsumerInterceptors<String, String> interceptors = null;
+        ConsumerInterceptors<String, String> interceptors = new ConsumerInterceptors<>(Collections.<ConsumerInterceptor<String, String>>emptyList());

         Metrics metrics = new Metrics();
         ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricGroupPrefix);
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index 9e2117975a95..2e157152bccb 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -26,6 +26,7 @@
 import org.junit.Test;

 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -50,6 +51,21 @@ public void testConfiguredInstances() {
         testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
     }

+    @Test
+    public void testEmptyList() {
+        AbstractConfig conf;
+        ConfigDef configDef = new ConfigDef().define("a", Type.LIST, "", new ConfigDef.NonNullValidator(), Importance.HIGH, "doc");
+
+        conf = new AbstractConfig(configDef, Collections.emptyMap());
+        assertEquals(Collections.emptyList(), conf.getList("a"));
+
+        conf = new AbstractConfig(configDef, Collections.singletonMap("a", ""));
+        assertEquals(Collections.emptyList(), conf.getList("a"));
+
+        conf = new AbstractConfig(configDef, Collections.singletonMap("a", "b,c,d"));
+        assertEquals(Arrays.asList("b", "c", "d"), conf.getList("a"));
+    }
+
     @Test
     public void testOriginalsWithPrefix() {
         Properties props = new Properties();
diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
index ed4997dea4ba..602147b31143 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java
@@ -158,6 +158,8 @@ public void testValidators() {
         testValidators(Type.STRING, ValidString.in("good", "values", "default"), "default",
                 new Object[]{"good", "values", "default"}, new Object[]{"bad", "inputs", null});
         testValidators(Type.LIST, ConfigDef.ValidList.in("1", "2", "3"), "1", new Object[]{"1", "2", "3"},
                 new Object[]{"4", "5", "6"});
+        testValidators(Type.STRING, new ConfigDef.NonNullValidator(), "a", new Object[]{"abb"}, new Object[] {null});
+        testValidators(Type.STRING, ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ValidString.in("a", "b")), "a", new Object[]{"a", "b"}, new Object[] {null, -1, "c"});
     }

     @Test
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 0f8c39088eb8..e63d1002a92a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -101,16 +101,15 @@ public static ConfigDef configDef() {
                 .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY)
                 .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, 4, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
                 .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, 5, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
-                .define(TRANSFORMS_CONFIG, Type.LIST, null, new ConfigDef.Validator() {
+                .define(TRANSFORMS_CONFIG, Type.LIST, Collections.emptyList(), ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), new ConfigDef.Validator() {
                     @Override
                     public void ensureValid(String name, Object value) {
-                        if (value == null) return;
                         final List<String> transformAliases = (List<String>) value;
                         if (transformAliases.size() > new HashSet<>(transformAliases).size()) {
                             throw new ConfigException(name, value, "Duplicate alias provided.");
                         }
                     }
-                }, Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, 6, Width.LONG, TRANSFORMS_DISPLAY);
+                }), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, 6, Width.LONG, TRANSFORMS_DISPLAY);
     }

     public ConnectorConfig(Plugins plugins) {
@@ -139,9 +138,6 @@ public Object get(String key) {
      */
     public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
         final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
-        if (transformAliases == null || transformAliases.isEmpty()) {
-            return Collections.emptyList();
-        }

         final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 47f13f66e775..fba186c21c52 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -18,7 +18,7 @@
 package kafka.server

 import java.util
-import java.util.Properties
+import java.util.{Collections, Properties}

 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1}
 import kafka.cluster.EndPoint
@@ -894,7 +894,7 @@ object KafkaConfig {
       .define(SslEndpointIdentificationAlgorithmProp, STRING, null, LOW, SslEndpointIdentificationAlgorithmDoc)
       .define(SslSecureRandomImplementationProp, STRING, null, LOW, SslSecureRandomImplementationDoc)
       .define(SslClientAuthProp, STRING, Defaults.SslClientAuth, in(Defaults.SslClientAuthRequired, Defaults.SslClientAuthRequested, Defaults.SslClientAuthNone), MEDIUM, SslClientAuthDoc)
-      .define(SslCipherSuitesProp, LIST, null, MEDIUM, SslCipherSuitesDoc)
+      .define(SslCipherSuitesProp, LIST, Collections.emptyList(), MEDIUM, SslCipherSuitesDoc)

       /** ********* Sasl Configuration ****************/
       .define(SaslMechanismInterBrokerProtocolProp, STRING, Defaults.SaslMechanismInterBrokerProtocol, MEDIUM, SaslMechanismInterBrokerProtocolDoc)
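
For reference, a minimal usage sketch of the two validators introduced above. It is not part of the patch: the config key "my.list", the allowed values, and the class name are illustrative only. The idea it demonstrates is the one the patch relies on: an empty-list default keeps an unset LIST config valid, NonNullValidator rejects an explicitly supplied null, and CompositeValidator.of(...) chains it in front of an existing validator such as ValidList.

import java.util.Collections;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

public class NonNullValidatorSketch {
    public static void main(String[] args) {
        // "my.list" and the allowed values "a", "b", "c" are made up for this sketch.
        ConfigDef def = new ConfigDef()
                .define("my.list", ConfigDef.Type.LIST, Collections.emptyList(),
                        ConfigDef.CompositeValidator.of(
                                new ConfigDef.NonNullValidator(),
                                ConfigDef.ValidList.in("a", "b", "c")),
                        ConfigDef.Importance.LOW, "Illustrative list config.");

        // Unset key: falls back to the empty-list default, which both validators accept.
        AbstractConfig unset = new AbstractConfig(def, Collections.emptyMap());
        System.out.println(unset.getList("my.list")); // []

        // Explicit null: NonNullValidator throws a ConfigException; without it,
        // a validator like ValidList would be handed a raw null.
        try {
            new AbstractConfig(def, Collections.singletonMap("my.list", null));
        } catch (ConfigException e) {
            System.out.println(e.getMessage()); // "... entry must be non null"
        }
    }
}

This is the same composition the patch applies to Connect's TRANSFORMS_CONFIG above, where CompositeValidator.of(...) chains the NonNullValidator with the Connect-specific duplicate-alias check, letting the validator body drop its own null guard.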