KAFKA-6180; Add a Validator for NonNull configurations and remove redundant null checks on lists (#4188)
lahabana authored and hachikuji committed Jan 25, 2018
1 parent 52191c3 commit aa42a11
Showing 13 changed files with 84 additions and 46 deletions.
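The recurring pattern across these files: list-typed configs that previously defaulted to `null` (or `""`) now default to an empty list and are guarded by the new `ConfigDef.NonNullValidator`, so callers no longer need to null-check `getList()` results. A minimal before/after sketch of the `define` call (the key name and doc string here are illustrative, not a specific Kafka setting):

```java
import java.util.Collections;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class NonNullDefineSketch {
    public static void main(String[] args) {
        // Before: a null default, so getList() could return null and every
        // call site had to guard against it.
        ConfigDef before = new ConfigDef()
                .define("my.interceptor.classes", Type.LIST, null,
                        Importance.LOW, "Illustrative doc string.");

        // After: an empty-list default plus NonNullValidator means getList()
        // always yields a (possibly empty) list.
        ConfigDef after = new ConfigDef()
                .define("my.interceptor.classes", Type.LIST, Collections.emptyList(),
                        new ConfigDef.NonNullValidator(),
                        Importance.LOW, "Illustrative doc string.");
    }
}
```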
@@ -257,6 +257,8 @@ public class ConsumerConfig extends AbstractConfig {
static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
+ Collections.emptyList(),
+ new ConfigDef.NonNullValidator(),
Importance.HIGH,
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC)
@@ -273,6 +275,7 @@ public class ConsumerConfig extends AbstractConfig {
.define(PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
Type.LIST,
Collections.singletonList(RangeAssignor.class),
+ new ConfigDef.NonNullValidator(),
Importance.MEDIUM,
PARTITION_ASSIGNMENT_STRATEGY_DOC)
.define(METADATA_MAX_AGE_CONFIG,
@@ -382,7 +385,8 @@ public class ConsumerConfig extends AbstractConfig {
CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
.define(METRIC_REPORTER_CLASSES_CONFIG,
Type.LIST,
"",
Collections.emptyList(),
new ConfigDef.NonNullValidator(),
Importance.LOW,
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
.define(KEY_DESERIALIZER_CLASS_CONFIG,
@@ -407,7 +411,8 @@ public class ConsumerConfig extends AbstractConfig {
CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC)
.define(INTERCEPTOR_CLASSES_CONFIG,
Type.LIST,
- null,
+ Collections.emptyList(),
+ new ConfigDef.NonNullValidator(),
Importance.LOW,
INTERCEPTOR_CLASSES_DOC)
.define(MAX_POLL_RECORDS_CONFIG,
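What the validator buys at runtime: an absent config falls back to the empty-list default, while an explicit null is rejected at parse time instead of surfacing later as a NullPointerException. A small sketch using the public `ConfigDef.parse` API (the key `"my.list"` is made up):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;

public class NonNullParseSketch {
    public static void main(String[] args) {
        ConfigDef def = new ConfigDef().define("my.list", Type.LIST, Collections.emptyList(),
                new ConfigDef.NonNullValidator(), Importance.LOW, "Illustrative list config.");

        // Absent key: the empty-list default applies.
        Map<String, Object> parsed = def.parse(new HashMap<String, Object>());
        System.out.println(parsed.get("my.list")); // []

        // Explicit null: rejected up front by NonNullValidator
        // (per the new testValidators case below).
        Map<String, Object> props = new HashMap<>();
        props.put("my.list", null);
        try {
            def.parse(props);
        } catch (ConfigException e) {
            System.out.println(e.getMessage()); // mentions "entry must be non null"
        }
    }
}
```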
@@ -63,6 +63,7 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
+ import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -683,7 +684,7 @@ private KafkaConsumer(ConsumerConfig config,
userProvidedConfigs.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId);
List<ConsumerInterceptor<K, V>> interceptorList = (List) (new ConsumerConfig(userProvidedConfigs, false)).getConfiguredInstances(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
ConsumerInterceptor.class);
- this.interceptors = interceptorList.isEmpty() ? null : new ConsumerInterceptors<>(interceptorList);
+ this.interceptors = new ConsumerInterceptors<>(interceptorList);
if (keyDeserializer == null) {
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
@@ -815,7 +816,7 @@ private KafkaConsumer(ConsumerConfig config,
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
this.fetcher = fetcher;
- this.interceptors = interceptors;
+ this.interceptors = Objects.requireNonNull(interceptors);
this.time = time;
this.client = client;
this.metrics = metrics;
@@ -1122,10 +1123,7 @@ public ConsumerRecords<K, V> poll(long timeout) {
if (fetcher.sendFetches() > 0 || client.hasPendingRequests())
client.pollNoWakeup();

- if (this.interceptors == null)
- return new ConsumerRecords<>(records);
- else
- return this.interceptors.onConsume(new ConsumerRecords<>(records));
+ return this.interceptors.onConsume(new ConsumerRecords<>(records));
}

long elapsed = time.milliseconds() - start;
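The consumer-side theme of this commit: instead of a nullable `ConsumerInterceptors` field with a branch at every use, the field is always non-null and an empty interceptor chain simply passes records through. A standalone sketch of that null-object idea (simplified stand-in types, not the actual Kafka classes):

```java
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Simplified stand-ins for ConsumerInterceptor/ConsumerInterceptors, showing
// why an always-present chain removes null checks at call sites.
interface RecordsInterceptor<R> {
    R onConsume(R records);
}

final class InterceptorChain<R> {
    private final List<RecordsInterceptor<R>> interceptors;

    InterceptorChain(List<RecordsInterceptor<R>> interceptors) {
        // Mirrors the Objects.requireNonNull added in this commit.
        this.interceptors = Objects.requireNonNull(interceptors);
    }

    static <R> InterceptorChain<R> empty() {
        return new InterceptorChain<>(Collections.<RecordsInterceptor<R>>emptyList());
    }

    R onConsume(R records) {
        // With an empty chain this loop is a no-op and records pass through,
        // so poll() can call it unconditionally.
        for (RecordsInterceptor<R> interceptor : interceptors)
            records = interceptor.onConsume(records);
        return records;
    }
}
```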
@@ -378,7 +378,7 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
userProvidedConfigs.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class);
- this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
+ this.interceptors = new ProducerInterceptors<>(interceptorList);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
@@ -780,7 +780,7 @@ public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// intercept the record, which can be potentially modified; this method does not throw exceptions
- ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
+ ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
return doSend(interceptedRecord, callback);
}

@@ -822,7 +822,7 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
// producer callback will make sure to call both 'callback' and interceptor callback
- Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
+ Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
@@ -842,29 +842,24 @@ private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
- if (this.interceptors != null)
- this.interceptors.onSendError(record, tp, e);
+ this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
- if (this.interceptors != null)
- this.interceptors.onSendError(record, tp, e);
+ this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (BufferExhaustedException e) {
this.errors.record();
this.metrics.sensor("buffer-exhausted-records").record();
- if (this.interceptors != null)
- this.interceptors.onSendError(record, tp, e);
+ this.interceptors.onSendError(record, tp, e);
throw e;
} catch (KafkaException e) {
this.errors.record();
- if (this.interceptors != null)
- this.interceptors.onSendError(record, tp, e);
+ this.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
- if (this.interceptors != null)
- this.interceptors.onSendError(record, tp, e);
+ this.interceptors.onSendError(record, tp, e);
throw e;
}
}
@@ -1198,14 +1193,8 @@ private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, TopicPartition tp) {
}

public void onCompletion(RecordMetadata metadata, Exception exception) {
- if (this.interceptors != null) {
- if (metadata == null) {
- this.interceptors.onAcknowledgement(new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP,
- Long.valueOf(-1L), -1, -1), exception);
- } else {
- this.interceptors.onAcknowledgement(metadata, exception);
- }
- }
+ metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, Long.valueOf(-1L), -1, -1);
+ this.interceptors.onAcknowledgement(metadata, exception);
if (this.userCallback != null)
this.userCallback.onCompletion(metadata, exception);
}
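On the producer side the same simplification applies to `InterceptorCallback`: the chain is assumed non-null, so the only remaining branch normalizes a null `RecordMetadata` into the "-1 everywhere" placeholder before notifying interceptors. A compact sketch of that shape (simplified generic types, not the Kafka classes):

```java
import java.util.Objects;

// Simplified InterceptorCallback shape: normalize the nullable input,
// then delegate unconditionally to the always-present chain.
final class AckCallback<M> {
    interface AckChain<M> {
        void onAcknowledgement(M metadata, Exception exception);
    }

    private final AckChain<M> chain;   // never null after this commit
    private final M placeholder;       // stands in for the "-1 everywhere" RecordMetadata

    AckCallback(AckChain<M> chain, M placeholder) {
        this.chain = Objects.requireNonNull(chain);
        this.placeholder = placeholder;
    }

    void onCompletion(M metadata, Exception exception) {
        M normalized = metadata != null ? metadata : placeholder;
        chain.onAcknowledgement(normalized, exception);
    }
}
```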
@@ -25,6 +25,7 @@
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;

+ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -221,7 +222,7 @@ public class ProducerConfig extends AbstractConfig {
"Note that transactions requires a cluster of at least three brokers by default what is the recommended setting for production; for development you can change this, by adjusting broker setting `transaction.state.log.replication.factor`.";

static {
- CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+ CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
.define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
.define(RETRIES_CONFIG, Type.INT, 0, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
.define(ACKS_CONFIG,
@@ -273,7 +274,8 @@ public class ProducerConfig extends AbstractConfig {
CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
.define(METRIC_REPORTER_CLASSES_CONFIG,
Type.LIST,
"",
Collections.emptyList(),
new ConfigDef.NonNullValidator(),
Importance.LOW,
CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
.define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
@@ -302,7 +304,8 @@ public class ProducerConfig extends AbstractConfig {
Importance.MEDIUM, PARTITIONER_CLASS_DOC)
.define(INTERCEPTOR_CLASSES_CONFIG,
Type.LIST,
- null,
+ Collections.emptyList(),
+ new ConfigDef.NonNullValidator(),
Importance.LOW,
INTERCEPTOR_CLASSES_DOC)
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
@@ -289,9 +289,7 @@ public <T> List<T> getConfiguredInstances(String key, Class<T> t) {
*/
public <T> List<T> getConfiguredInstances(String key, Class<T> t, Map<String, Object> configOverrides) {
List<String> klasses = getList(key);
- List<T> objects = new ArrayList<T>();
- if (klasses == null)
- return objects;
+ List<T> objects = new ArrayList<>();
Map<String, Object> configPairs = originals();
configPairs.putAll(configOverrides);
for (Object klass : klasses) {
@@ -937,6 +937,35 @@ public String toString() {
}
}

+ public static class NonNullValidator implements Validator {
+ @Override
+ public void ensureValid(String name, Object value) {
+ if (value == null) {
+ // Pass in the string null to avoid the findbugs warning
+ throw new ConfigException(name, "null", "entry must be non null");
+ }
+ }
+ }
+
+ public static class CompositeValidator implements Validator {
+ private final List<Validator> validators;
+
+ private CompositeValidator(List<Validator> validators) {
+ this.validators = Collections.unmodifiableList(validators);
+ }
+
+ public static CompositeValidator of(Validator... validators) {
+ return new CompositeValidator(Arrays.asList(validators));
+ }
+
+ @Override
+ public void ensureValid(String name, Object value) {
+ for (Validator validator: validators) {
+ validator.ensureValid(name, value);
+ }
+ }
+ }
+
public static class NonEmptyString implements Validator {

@Override
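`CompositeValidator` runs its validators in order, and since `ensureValid` throws on the first violation, a null check can be stacked in front of any existing validator; that is exactly how `ConnectorConfig` uses it further down. A usage sketch (the key `"mode"` and its allowed values are made up):

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigDef.ValidString;

public class CompositeValidatorSketch {
    public static void main(String[] args) {
        ConfigDef def = new ConfigDef().define(
                "mode",
                Type.STRING,
                "fast",
                ConfigDef.CompositeValidator.of(
                        new ConfigDef.NonNullValidator(),  // rejects null first...
                        ValidString.in("fast", "safe")),   // ...then checks allowed values
                Importance.MEDIUM,
                "Illustrative config key, not a real Kafka setting.");
    }
}
```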
@@ -89,12 +89,12 @@ public void configure(Map<String, ?> configs) throws KafkaException {

@SuppressWarnings("unchecked")
List<String> cipherSuitesList = (List<String>) configs.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG);
- if (cipherSuitesList != null)
+ if (cipherSuitesList != null && !cipherSuitesList.isEmpty())
this.cipherSuites = cipherSuitesList.toArray(new String[cipherSuitesList.size()]);

@SuppressWarnings("unchecked")
List<String> enabledProtocolsList = (List<String>) configs.get(SslConfigs.SSL_ENABLED_PROTOCOLS_CONFIG);
- if (enabledProtocolsList != null)
+ if (enabledProtocolsList != null && !enabledProtocolsList.isEmpty())
this.enabledProtocols = enabledProtocolsList.toArray(new String[enabledProtocolsList.size()]);

String endpointIdentification = (String) configs.get(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG);
@@ -54,6 +54,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+ import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Matcher;
@@ -464,6 +465,7 @@ public static <T> String join(T[] strs, String separator) {
* @return The string representation.
*/
public static <T> String join(Collection<T> list, String separator) {
+ Objects.requireNonNull(list);
StringBuilder sb = new StringBuilder();
Iterator<T> iter = list.iterator();
while (iter.hasNext()) {
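The `Objects.requireNonNull(list)` added to `Utils.join` moves the failure to the method boundary instead of an opaque NullPointerException deep inside the iteration. A simplified sketch of the resulting contract (not the exact Kafka implementation):

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.Objects;

public class JoinSketch {
    // Simplified join with the fail-fast precondition from this commit.
    static <T> String join(Iterable<T> items, String separator) {
        Objects.requireNonNull(items); // fail here, with a clear stack frame
        StringBuilder sb = new StringBuilder();
        Iterator<T> iter = items.iterator();
        while (iter.hasNext()) {
            sb.append(iter.next());
            if (iter.hasNext())
                sb.append(separator);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(join(Arrays.asList("b", "c", "d"), ",")); // b,c,d
    }
}
```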
@@ -1615,7 +1615,7 @@ private KafkaConsumer<String, String> newConsumer(Time time,

OffsetResetStrategy autoResetStrategy = OffsetResetStrategy.EARLIEST;
List<PartitionAssignor> assignors = Arrays.asList(assignor);
- ConsumerInterceptors<String, String> interceptors = null;
+ ConsumerInterceptors<String, String> interceptors = new ConsumerInterceptors<>(Collections.<ConsumerInterceptor<String, String>>emptyList());

Metrics metrics = new Metrics();
ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricGroupPrefix);
@@ -26,6 +26,7 @@
import org.junit.Test;

import java.util.Arrays;
+ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -50,6 +51,21 @@ public void testConfiguredInstances() {
testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
}

+ @Test
+ public void testEmptyList() {
+ AbstractConfig conf;
+ ConfigDef configDef = new ConfigDef().define("a", Type.LIST, "", new ConfigDef.NonNullValidator(), Importance.HIGH, "doc");
+
+ conf = new AbstractConfig(configDef, Collections.emptyMap());
+ assertEquals(Collections.emptyList(), conf.getList("a"));
+
+ conf = new AbstractConfig(configDef, Collections.singletonMap("a", ""));
+ assertEquals(Collections.emptyList(), conf.getList("a"));
+
+ conf = new AbstractConfig(configDef, Collections.singletonMap("a", "b,c,d"));
+ assertEquals(Arrays.asList("b", "c", "d"), conf.getList("a"));
+ }
+
@Test
public void testOriginalsWithPrefix() {
Properties props = new Properties();
Expand Down
@@ -158,6 +158,8 @@ public void testValidators() {
testValidators(Type.STRING, ValidString.in("good", "values", "default"), "default",
new Object[]{"good", "values", "default"}, new Object[]{"bad", "inputs", null});
testValidators(Type.LIST, ConfigDef.ValidList.in("1", "2", "3"), "1", new Object[]{"1", "2", "3"}, new Object[]{"4", "5", "6"});
+ testValidators(Type.STRING, new ConfigDef.NonNullValidator(), "a", new Object[]{"abb"}, new Object[] {null});
+ testValidators(Type.STRING, ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), ValidString.in("a", "b")), "a", new Object[]{"a", "b"}, new Object[] {null, -1, "c"});
}

@Test
Expand Down
@@ -101,16 +101,15 @@ public static ConfigDef configDef() {
.define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY)
.define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, 4, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
.define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, 5, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
- .define(TRANSFORMS_CONFIG, Type.LIST, null, new ConfigDef.Validator() {
+ .define(TRANSFORMS_CONFIG, Type.LIST, Collections.emptyList(), ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), new ConfigDef.Validator() {
@Override
public void ensureValid(String name, Object value) {
- if (value == null) return;
final List<String> transformAliases = (List<String>) value;
if (transformAliases.size() > new HashSet<>(transformAliases).size()) {
throw new ConfigException(name, value, "Duplicate alias provided.");
}
}
- }, Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, 6, Width.LONG, TRANSFORMS_DISPLAY);
+ }), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, 6, Width.LONG, TRANSFORMS_DISPLAY);
}

public ConnectorConfig(Plugins plugins) {
@@ -139,9 +138,6 @@ public Object get(String key) {
*/
public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
- if (transformAliases == null || transformAliases.isEmpty()) {
- return Collections.emptyList();
- }

final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
for (String alias : transformAliases) {
