Skip to content

Commit

Permalink
Support Spring Boot KafkaProperties
Browse files Browse the repository at this point in the history
 - If KafkaProperties for the KafkaAutoConfiguration is set, then use those properties for the KafkaMessageChannelBinder
 - For the KafkaProperties that have explicit default, override with the KafkaMessageChannelBinder defaults when the properties are not set by any of the property sources
 - Support the existing Kafka Producer/Consumer properties if they are set
 - Add tests

Resolves spring-cloud#73
  • Loading branch information
ilayaperumalg committed Feb 13, 2017
1 parent 8fafc26 commit 3ab636e
Show file tree
Hide file tree
Showing 6 changed files with 197 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,77 @@
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.env.EnvironmentPostProcessor;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;

/**
* An {@link EnvironmentPostProcessor} that sets some common configuration properties (log config etc.,) for Kafka
* binder.
* binder.
*
* @author Ilayaperumal Gopinathan
*/
public class KafkaBinderEnvironmentPostProcessor implements EnvironmentPostProcessor {

public final static String SPRING_KAFKA = "spring.kafka";

public final static String SPRING_KAFKA_PRODUCER = SPRING_KAFKA + ".producer";

public final static String SPRING_KAFKA_CONSUMER = SPRING_KAFKA + ".consumer";

public final static String SPRING_KAFKA_PRODUCER_KEY_SERIALIZER = SPRING_KAFKA_PRODUCER + "." + "keySerializer";

public final static String SPRING_KAFKA_PRODUCER_VALUE_SERIALIZER = SPRING_KAFKA_PRODUCER + "." + "valueSerializer";

public final static String SPRING_KAFKA_CONSUMER_KEY_DESERIALIZER = SPRING_KAFKA_CONSUMER + "." + "keyDeserializer";

public final static String SPRING_KAFKA_CONSUMER_VALUE_DESERIALIZER = SPRING_KAFKA_CONSUMER + "." + "valueDeserializer";

public final static String SPRING_KAFKA_BOOTSTRAP_SERVERS = SPRING_KAFKA + "." + "bootstrapServers";

@Override
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
Map<String, Object> propertiesToAdd = new HashMap<>();
propertiesToAdd.put("logging.pattern.console", "%d{ISO8601} %5p %t %c{2}:%L - %m%n");
propertiesToAdd.put("logging.level.org.I0Itec.zkclient", "ERROR");
propertiesToAdd.put("logging.level.kafka.server.KafkaConfig", "ERROR");
propertiesToAdd.put("logging.level.kafka.admin.AdminClient.AdminConfig", "ERROR");
environment.getPropertySources().addLast(new MapPropertySource("kafkaBinderLogConfig", propertiesToAdd));
Map<String, Object> logProperties = new HashMap<>();
logProperties.put("logging.pattern.console", "%d{ISO8601} %5p %t %c{2}:%L - %m%n");
logProperties.put("logging.level.org.I0Itec.zkclient", "ERROR");
logProperties.put("logging.level.kafka.server.KafkaConfig", "ERROR");
logProperties.put("logging.level.kafka.admin.AdminClient.AdminConfig", "ERROR");
environment.getPropertySources().addLast(new MapPropertySource("kafkaBinderLogConfig", logProperties));
Map<String, Object> binderConfig = new HashMap<>();
if (environment.getProperty(SPRING_KAFKA_PRODUCER_KEY_SERIALIZER) != null) {
binderConfig.put(SPRING_KAFKA_PRODUCER_KEY_SERIALIZER, environment.getProperty(SPRING_KAFKA_PRODUCER_KEY_SERIALIZER));
}
else {
binderConfig.put(SPRING_KAFKA_PRODUCER_KEY_SERIALIZER, ByteArraySerializer.class);
}
if (environment.getProperty(SPRING_KAFKA_PRODUCER_VALUE_SERIALIZER) != null) {
binderConfig.put(SPRING_KAFKA_PRODUCER_VALUE_SERIALIZER, environment.getProperty(SPRING_KAFKA_PRODUCER_VALUE_SERIALIZER));
}
else {
binderConfig.put(SPRING_KAFKA_PRODUCER_VALUE_SERIALIZER, ByteArraySerializer.class);
}
if (environment.getProperty(SPRING_KAFKA_CONSUMER_KEY_DESERIALIZER) != null) {
binderConfig.put(SPRING_KAFKA_CONSUMER_KEY_DESERIALIZER, environment.getProperty(SPRING_KAFKA_CONSUMER_KEY_DESERIALIZER));
}
else {
binderConfig.put(SPRING_KAFKA_CONSUMER_KEY_DESERIALIZER, ByteArrayDeserializer.class);
}
if (environment.getProperty(SPRING_KAFKA_CONSUMER_VALUE_DESERIALIZER) != null) {
binderConfig.put(SPRING_KAFKA_CONSUMER_VALUE_DESERIALIZER, environment.getProperty(SPRING_KAFKA_CONSUMER_VALUE_DESERIALIZER));
}
else {
binderConfig.put(SPRING_KAFKA_CONSUMER_VALUE_DESERIALIZER, ByteArrayDeserializer.class);
}
if (environment.getProperty(SPRING_KAFKA_BOOTSTRAP_SERVERS) != null) {
binderConfig.put(SPRING_KAFKA_BOOTSTRAP_SERVERS, environment.getProperty(SPRING_KAFKA_BOOTSTRAP_SERVERS));
}
else {
binderConfig.put(SPRING_KAFKA_BOOTSTRAP_SERVERS, "");
}
environment.getPropertySources().addLast(new MapPropertySource("kafkaBinderConfig", binderConfig));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2016 the original author or authors.
* Copyright 2014-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,6 +41,7 @@
import org.apache.kafka.common.utils.Utils;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderException;
Expand Down Expand Up @@ -110,6 +111,8 @@ public class KafkaMessageChannelBinder extends

private AdminUtilsOperation adminUtilsOperation;

private KafkaProperties kafkaProperties;

public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties) {
super(false, headersToMap(configurationProperties));
this.configurationProperties = configurationProperties;
Expand Down Expand Up @@ -148,6 +151,10 @@ public void setExtendedBindingProperties(KafkaExtendedBindingProperties extended
this.extendedBindingProperties = extendedBindingProperties;
}

public void setKafkaProperties(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}

@Override
public void onInit() throws Exception {

Expand Down Expand Up @@ -195,7 +202,7 @@ public KafkaProducerProperties getExtendedProducerProperties(String channelName)

@Override
protected MessageHandler createProducerMessageHandler(final String destination,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {

KafkaTopicUtils.validateTopicName(destination);
createTopicsIfAutoCreateEnabledAndAdminUtilsPresent(destination, producerProperties.getPartitionCount());
Expand All @@ -221,7 +228,7 @@ protected MessageHandler createProducerMessageHandler(final String destination,

@Override
protected String createProducerDestinationIfNecessary(String name,
ExtendedProducerProperties<KafkaProducerProperties> properties) {
ExtendedProducerProperties<KafkaProducerProperties> properties) {
if (this.logger.isInfoEnabled()) {
this.logger.info("Using kafka topic for outbound: " + name);
}
Expand All @@ -242,20 +249,27 @@ protected String createProducerDestinationIfNecessary(String name,
private DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
Map<String, Object> props = new HashMap<>();
if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
props.putAll(configurationProperties.getConfiguration());
}
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(producerProperties.getExtension().getBufferSize()));
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(producerProperties.getExtension().getBufferSize()));
props.put(ProducerConfig.ACKS_CONFIG, String.valueOf(this.configurationProperties.getRequiredAcks()));
props.put(ProducerConfig.LINGER_MS_CONFIG,
String.valueOf(producerProperties.getExtension().getBatchTimeout()));
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
producerProperties.getExtension().getCompressionType().toString());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
if (this.kafkaProperties != null) {
if (!this.kafkaProperties.getBootstrapServers().isEmpty()) {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaProperties.getBootstrapServers());
}
props.putAll(this.kafkaProperties.getProducer().buildProperties());
props.putAll(this.kafkaProperties.getProperties());
}
if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
props.putAll(configurationProperties.getConfiguration());
}
if (!ObjectUtils.isEmpty(producerProperties.getExtension().getConfiguration())) {
props.putAll(producerProperties.getExtension().getConfiguration());
}
Expand All @@ -264,7 +278,7 @@ private DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(

@Override
protected Collection<PartitionInfo> createConsumerDestinationIfNecessary(String name, String group,
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
KafkaTopicUtils.validateTopicName(name);
if (properties.getInstanceCount() == 0) {
throw new IllegalArgumentException("Instance count cannot be zero");
Expand Down Expand Up @@ -295,24 +309,20 @@ protected Collection<PartitionInfo> createConsumerDestinationIfNecessary(String
@Override
@SuppressWarnings("unchecked")
protected MessageProducer createConsumerEndpoint(String name, String group, Collection<PartitionInfo> destination,
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
boolean anonymous = !StringUtils.hasText(group);
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
Assert.isTrue(!anonymous || !consumerProperties.getExtension().isEnableDlq(),
"DLQ support is not available for anonymous subscriptions");
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
Map<String, Object> props = getConsumerConfig(anonymous, consumerGroup);
if (!ObjectUtils.isEmpty(properties.getExtension().getConfiguration())) {
props.putAll(properties.getExtension().getConfiguration());
}
ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
ConsumerFactory<?, ?> consumerFactory = createKafkaConsumerFactory(anonymous, consumerGroup, consumerProperties);
Collection<PartitionInfo> listenedPartitions = destination;
Assert.isTrue(!CollectionUtils.isEmpty(listenedPartitions), "A list of partitions must be provided");
final TopicPartitionInitialOffset[] topicPartitionInitialOffsets = getTopicPartitionInitialOffsets(
listenedPartitions);
final ContainerProperties containerProperties =
anonymous || properties.getExtension().isAutoRebalanceEnabled() ? new ContainerProperties(name)
anonymous || consumerProperties.getExtension().isAutoRebalanceEnabled() ? new ContainerProperties(name)
: new ContainerProperties(topicPartitionInitialOffsets);
int concurrency = Math.min(properties.getConcurrency(), listenedPartitions.size());
int concurrency = Math.min(consumerProperties.getConcurrency(), listenedPartitions.size());
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
new ConcurrentMessageListenerContainer(
consumerFactory, containerProperties) {
Expand All @@ -323,8 +333,8 @@ public void stop(Runnable callback) {
}
};
messageListenerContainer.setConcurrency(concurrency);
messageListenerContainer.getContainerProperties().setAckOnError(isAutoCommitOnError(properties));
if (!properties.getExtension().isAutoCommitOffset()) {
messageListenerContainer.getContainerProperties().setAckOnError(isAutoCommitOnError(consumerProperties));
if (!consumerProperties.getExtension().isAutoCommitOffset()) {
messageListenerContainer.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
}
if (this.logger.isDebugEnabled()) {
Expand All @@ -339,9 +349,9 @@ public void stop(Runnable callback) {
new KafkaMessageDrivenChannelAdapter<>(
messageListenerContainer);
kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory());
final RetryTemplate retryTemplate = buildRetryTemplate(properties);
final RetryTemplate retryTemplate = buildRetryTemplate(consumerProperties);
kafkaMessageDrivenChannelAdapter.setRetryTemplate(retryTemplate);
if (properties.getExtension().isEnableDlq()) {
if (consumerProperties.getExtension().isEnableDlq()) {
final String dlqTopic = "error." + name + "." + group;
initDlqProducer();
messageListenerContainer.getContainerProperties().setErrorHandler(new ErrorHandler() {
Expand Down Expand Up @@ -381,20 +391,30 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
return kafkaMessageDrivenChannelAdapter;
}

private Map<String, Object> getConsumerConfig(boolean anonymous, String consumerGroup) {
private ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
props.putAll(configurationProperties.getConfiguration());
}
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
anonymous ? "latest" : "earliest");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, anonymous ? "latest" : "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
return props;
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
if (this.kafkaProperties != null) {
if (!this.kafkaProperties.getBootstrapServers().isEmpty()) {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaProperties.getBootstrapServers());
}
props.putAll(this.kafkaProperties.getConsumer().buildProperties());
props.putAll(this.kafkaProperties.getProperties());
}
if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
props.putAll(configurationProperties.getConfiguration());
}
if (!ObjectUtils.isEmpty(consumerProperties.getExtension().getConfiguration())) {
props.putAll(consumerProperties.getExtension().getConfiguration());
}
return new DefaultKafkaConsumerFactory<>(props);
}

private boolean isAutoCommitOnError(ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
Expand Down Expand Up @@ -477,7 +497,7 @@ public Object doWithRetry(RetryContext context) throws RuntimeException {
catch (Exception e) {
String exceptionClass = e.getClass().getName();
if (exceptionClass.equals("kafka.common.TopicExistsException") ||
exceptionClass.equals("org.apache.kafka.common.errors.TopicExistsException")){
exceptionClass.equals("org.apache.kafka.common.errors.TopicExistsException")) {
if (logger.isWarnEnabled()) {
logger.warn("Attempt to create topic: " + topicName + ". Topic already exists.");
}
Expand Down Expand Up @@ -544,6 +564,13 @@ private synchronized void initDlqProducer() {
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
if (this.kafkaProperties != null) {
if (!this.kafkaProperties.getBootstrapServers().isEmpty()) {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaProperties.getBootstrapServers());
}
props.putAll(this.kafkaProperties.getProducer().buildProperties());
props.putAll(this.kafkaProperties.getProperties());
}
DefaultKafkaProducerFactory<byte[], byte[]> defaultKafkaProducerFactory =
new DefaultKafkaProducerFactory<>(props);
this.dlqProducer = defaultKafkaProducerFactory.createProducer();
Expand Down Expand Up @@ -571,8 +598,8 @@ private final class ProducerConfigurationMessageHandler extends KafkaProducerMes
private final DefaultKafkaProducerFactory<byte[], byte[]> producerFactory;

private ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String topic,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory) {
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory) {
super(kafkaTemplate);
setTopicExpression(new LiteralExpression(topic));
setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,8 +26,10 @@
*/
public class KafkaProducerProperties {

@Deprecated
private int bufferSize = 16384;

@Deprecated
private CompressionType compressionType = CompressionType.none;

private boolean sync;
Expand Down Expand Up @@ -80,6 +82,7 @@ public void setConfiguration(Map<String, String> configuration) {
public enum CompressionType {
none,
gzip,
snappy
snappy,
lz4
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2016 the original author or authors.
* Copyright 2015-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,6 +26,7 @@
import org.springframework.boot.autoconfigure.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
Expand Down Expand Up @@ -76,6 +77,9 @@ public class KafkaBinderConfiguration {
@Autowired
private ProducerListener producerListener;

@Autowired(required = false)
private KafkaProperties kafkaProperties;

@Autowired
private ApplicationContext context;

Expand All @@ -90,6 +94,7 @@ KafkaMessageChannelBinder kafkaMessageChannelBinder() {
kafkaMessageChannelBinder.setProducerListener(producerListener);
kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
kafkaMessageChannelBinder.setAdminUtilsOperation(adminUtilsOperation);
kafkaMessageChannelBinder.setKafkaProperties(kafkaProperties);
return kafkaMessageChannelBinder;
}

Expand Down

0 comments on commit 3ab636e

Please sign in to comment.