Support Spring Boot KafkaProperties
 - If KafkaProperties from Boot's KafkaAutoConfiguration are available, use them to configure the KafkaMessageChannelBinder
 - Where those KafkaProperties define explicit defaults of their own, override them with the KafkaMessageChannelBinder defaults when the property has not been set by any property source
 - Continue to honor the existing Kafka producer/consumer properties when they are set
 - Add tests

Resolves spring-cloud#73
ilayaperumalg committed Feb 15, 2017
1 parent dc0bc18 commit 7a822ce
Showing 6 changed files with 192 additions and 56 deletions.
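
For orientation, a minimal sketch of feeding broker settings through Boot's spring.kafka namespace, which the binder now honors. The application class and property values here are illustrative, not part of this commit:

import java.util.Collections;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MyStreamApplication {

    public static void main(String[] args) {
        SpringApplication app = new SpringApplication(MyStreamApplication.class);
        // Lowest-precedence defaults: application.yml, env vars, etc. still win,
        // and the binder now picks the value up via Boot's KafkaProperties.
        app.setDefaultProperties(Collections.<String, Object>singletonMap(
                "spring.kafka.bootstrap-servers", "broker1:9092,broker2:9092"));
        app.run(args);
    }
}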
KafkaProducerProperties.java
@@ -1,5 +1,5 @@
/*
* Copyright 2016 the original author or authors.
* Copyright 2016-2017 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,8 +26,10 @@
*/
public class KafkaProducerProperties {

@Deprecated
private int bufferSize = 16384;

@Deprecated
private CompressionType compressionType = CompressionType.none;

private boolean sync;
@@ -80,6 +82,7 @@ public void setConfiguration(Map<String, String> configuration) {
public enum CompressionType {
none,
gzip,
snappy
snappy,
lz4
}
}
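
The @Deprecated markers on bufferSize and compressionType above presumably steer users toward the Boot equivalents this commit starts honoring — spring.kafka.producer.batch-size (the binder maps bufferSize onto ProducerConfig.BATCH_SIZE_CONFIG) and spring.kafka.producer.compression-type — while the old properties keep working when set.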
KafkaBinderEnvironmentPostProcessor.java
@@ -19,26 +19,77 @@
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.env.EnvironmentPostProcessor;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;

/**
* An {@link EnvironmentPostProcessor} that sets some common configuration properties (log config etc.) for Kafka
* binder.
* binder.
*
* @author Ilayaperumal Gopinathan
*/
public class KafkaBinderEnvironmentPostProcessor implements EnvironmentPostProcessor {

public final static String SPRING_KAFKA = "spring.kafka";

public final static String SPRING_KAFKA_PRODUCER = SPRING_KAFKA + ".producer";

public final static String SPRING_KAFKA_CONSUMER = SPRING_KAFKA + ".consumer";

public final static String SPRING_KAFKA_PRODUCER_KEY_SERIALIZER = SPRING_KAFKA_PRODUCER + "." + "keySerializer";

public final static String SPRING_KAFKA_PRODUCER_VALUE_SERIALIZER = SPRING_KAFKA_PRODUCER + "." + "valueSerializer";

public final static String SPRING_KAFKA_CONSUMER_KEY_DESERIALIZER = SPRING_KAFKA_CONSUMER + "." + "keyDeserializer";

public final static String SPRING_KAFKA_CONSUMER_VALUE_DESERIALIZER = SPRING_KAFKA_CONSUMER + "." + "valueDeserializer";

public final static String SPRING_KAFKA_BOOTSTRAP_SERVERS = SPRING_KAFKA + "." + "bootstrapServers";

@Override
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
Map<String, Object> propertiesToAdd = new HashMap<>();
propertiesToAdd.put("logging.pattern.console", "%d{ISO8601} %5p %t %c{2}:%L - %m%n");
propertiesToAdd.put("logging.level.org.I0Itec.zkclient", "ERROR");
propertiesToAdd.put("logging.level.kafka.server.KafkaConfig", "ERROR");
propertiesToAdd.put("logging.level.kafka.admin.AdminClient.AdminConfig", "ERROR");
environment.getPropertySources().addLast(new MapPropertySource("kafkaBinderLogConfig", propertiesToAdd));
Map<String, Object> logProperties = new HashMap<>();
logProperties.put("logging.pattern.console", "%d{ISO8601} %5p %t %c{2}:%L - %m%n");
logProperties.put("logging.level.org.I0Itec.zkclient", "ERROR");
logProperties.put("logging.level.kafka.server.KafkaConfig", "ERROR");
logProperties.put("logging.level.kafka.admin.AdminClient.AdminConfig", "ERROR");
environment.getPropertySources().addLast(new MapPropertySource("kafkaBinderLogConfig", logProperties));
Map<String, Object> binderConfig = new HashMap<>();
if (environment.getProperty(SPRING_KAFKA_PRODUCER_KEY_SERIALIZER) != null) {
binderConfig.put(SPRING_KAFKA_PRODUCER_KEY_SERIALIZER, environment.getProperty(SPRING_KAFKA_PRODUCER_KEY_SERIALIZER));
}
else {
binderConfig.put(SPRING_KAFKA_PRODUCER_KEY_SERIALIZER, ByteArraySerializer.class);
}
if (environment.getProperty(SPRING_KAFKA_PRODUCER_VALUE_SERIALIZER) != null) {
binderConfig.put(SPRING_KAFKA_PRODUCER_VALUE_SERIALIZER, environment.getProperty(SPRING_KAFKA_PRODUCER_VALUE_SERIALIZER));
}
else {
binderConfig.put(SPRING_KAFKA_PRODUCER_VALUE_SERIALIZER, ByteArraySerializer.class);
}
if (environment.getProperty(SPRING_KAFKA_CONSUMER_KEY_DESERIALIZER) != null) {
binderConfig.put(SPRING_KAFKA_CONSUMER_KEY_DESERIALIZER, environment.getProperty(SPRING_KAFKA_CONSUMER_KEY_DESERIALIZER));
}
else {
binderConfig.put(SPRING_KAFKA_CONSUMER_KEY_DESERIALIZER, ByteArrayDeserializer.class);
}
if (environment.getProperty(SPRING_KAFKA_CONSUMER_VALUE_DESERIALIZER) != null) {
binderConfig.put(SPRING_KAFKA_CONSUMER_VALUE_DESERIALIZER, environment.getProperty(SPRING_KAFKA_CONSUMER_VALUE_DESERIALIZER));
}
else {
binderConfig.put(SPRING_KAFKA_CONSUMER_VALUE_DESERIALIZER, ByteArrayDeserializer.class);
}
if (environment.getProperty(SPRING_KAFKA_BOOTSTRAP_SERVERS) != null) {
binderConfig.put(SPRING_KAFKA_BOOTSTRAP_SERVERS, environment.getProperty(SPRING_KAFKA_BOOTSTRAP_SERVERS));
}
else {
binderConfig.put(SPRING_KAFKA_BOOTSTRAP_SERVERS, "");
}
environment.getPropertySources().addLast(new MapPropertySource("kafkaBinderConfig", binderConfig));
}
}
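
Two details about the post-processor above are easy to miss. First, an EnvironmentPostProcessor only takes effect if it is registered under META-INF/spring.factories in the binder jar (Boot's standard discovery mechanism); assuming the class sits in the binder's usual package, the entry would read:

org.springframework.boot.env.EnvironmentPostProcessor=\
org.springframework.cloud.stream.binder.kafka.KafkaBinderEnvironmentPostProcessor

Second, addLast(...) registers the kafkaBinderConfig property source at the lowest precedence, and each default is written only after environment.getProperty(...) shows the key unset, so the ByteArray (de)serializers and the empty bootstrap-servers value behave strictly as fallbacks that any user-supplied spring.kafka.* setting overrides.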
KafkaMessageChannelBinder.java
@@ -33,6 +33,7 @@
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.utils.Utils;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.BinderHeaders;
@@ -97,8 +98,10 @@ public class KafkaMessageChannelBinder extends

private final Map<String, Collection<PartitionInfo>> topicsInUse = new HashMap<>();

private KafkaProperties kafkaProperties;

public KafkaMessageChannelBinder(KafkaBinderConfigurationProperties configurationProperties,
KafkaTopicProvisioner provisioningProvider) {
KafkaTopicProvisioner provisioningProvider) {
super(false, headersToMap(configurationProperties), provisioningProvider);
this.configurationProperties = configurationProperties;
}
@@ -127,6 +130,10 @@ public void setProducerListener(ProducerListener<byte[], byte[]> producerListener) {
this.producerListener = producerListener;
}

public void setKafkaProperties(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}

Map<String, Collection<PartitionInfo>> getTopicsInUse() {
return this.topicsInUse;
}
@@ -143,7 +150,7 @@ public KafkaProducerProperties getExtendedProducerProperties(String channelName)

@Override
protected MessageHandler createProducerMessageHandler(final ProducerDestination destination,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) throws Exception {
final DefaultKafkaProducerFactory<byte[], byte[]> producerFB = getProducerFactory(producerProperties);
Collection<PartitionInfo> partitions = provisioningProvider.getPartitionsForTopic(producerProperties.getPartitionCount(),
new Callable<Collection<PartitionInfo>>() {
@@ -171,20 +178,27 @@ public Collection<PartitionInfo> call() throws Exception {
private DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(
ExtendedProducerProperties<KafkaProducerProperties> producerProperties) {
Map<String, Object> props = new HashMap<>();
if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
props.putAll(configurationProperties.getConfiguration());
}
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(producerProperties.getExtension().getBufferSize()));
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(producerProperties.getExtension().getBufferSize()));
props.put(ProducerConfig.ACKS_CONFIG, String.valueOf(this.configurationProperties.getRequiredAcks()));
props.put(ProducerConfig.LINGER_MS_CONFIG,
String.valueOf(producerProperties.getExtension().getBatchTimeout()));
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,
producerProperties.getExtension().getCompressionType().toString());
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
if (this.kafkaProperties != null) {
if (!this.kafkaProperties.getBootstrapServers().isEmpty()) {
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaProperties.getBootstrapServers());
}
props.putAll(this.kafkaProperties.getProducer().buildProperties());
props.putAll(this.kafkaProperties.getProperties());
}
if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
props.putAll(configurationProperties.getConfiguration());
}
if (!ObjectUtils.isEmpty(producerProperties.getExtension().getConfiguration())) {
props.putAll(producerProperties.getExtension().getConfiguration());
}
@@ -194,51 +208,42 @@ private DefaultKafkaProducerFactory<byte[], byte[]> getProducerFactory(
@Override
@SuppressWarnings("unchecked")
protected MessageProducer createConsumerEndpoint(final ConsumerDestination destination, final String group,
ExtendedConsumerProperties<KafkaConsumerProperties> properties) {

ExtendedConsumerProperties<KafkaConsumerProperties> extendedConsumerProperties) {
boolean anonymous = !StringUtils.hasText(group);
Assert.isTrue(!anonymous || !properties.getExtension().isEnableDlq(),
Assert.isTrue(!anonymous || !extendedConsumerProperties.getExtension().isEnableDlq(),
"DLQ support is not available for anonymous subscriptions");
String consumerGroup = anonymous ? "anonymous." + UUID.randomUUID().toString() : group;
Map<String, Object> props = getConsumerConfig(anonymous, consumerGroup);
if (!ObjectUtils.isEmpty(properties.getExtension().getConfiguration())) {
props.putAll(properties.getExtension().getConfiguration());
}
final ConsumerFactory<?, ?> consumerFactory = new DefaultKafkaConsumerFactory<>(props);
int partitionCount = properties.getInstanceCount() * properties.getConcurrency();

final ConsumerFactory<?, ?> consumerFactory = createKafkaConsumerFactory(anonymous, consumerGroup, extendedConsumerProperties);
int partitionCount = extendedConsumerProperties.getInstanceCount() * extendedConsumerProperties.getConcurrency();
Collection<PartitionInfo> allPartitions = provisioningProvider.getPartitionsForTopic(partitionCount,
new Callable<Collection<PartitionInfo>>() {
@Override
public Collection<PartitionInfo> call() throws Exception {
return consumerFactory.createConsumer().partitionsFor(destination.getName());
}
});

Collection<PartitionInfo> listenedPartitions;

if (properties.getExtension().isAutoRebalanceEnabled() ||
properties.getInstanceCount() == 1) {
if (extendedConsumerProperties.getExtension().isAutoRebalanceEnabled() ||
extendedConsumerProperties.getInstanceCount() == 1) {
listenedPartitions = allPartitions;
}
else {
listenedPartitions = new ArrayList<>();
for (PartitionInfo partition : allPartitions) {
// divide partitions across modules
if ((partition.partition() % properties.getInstanceCount()) == properties.getInstanceIndex()) {
if ((partition.partition() % extendedConsumerProperties.getInstanceCount()) == extendedConsumerProperties.getInstanceIndex()) {
listenedPartitions.add(partition);
}
}
}
this.topicsInUse.put(destination.getName(), listenedPartitions);

Assert.isTrue(!CollectionUtils.isEmpty(listenedPartitions), "A list of partitions must be provided");
final TopicPartitionInitialOffset[] topicPartitionInitialOffsets = getTopicPartitionInitialOffsets(
listenedPartitions);
final ContainerProperties containerProperties =
anonymous || properties.getExtension().isAutoRebalanceEnabled() ? new ContainerProperties(destination.getName())
anonymous || extendedConsumerProperties.getExtension().isAutoRebalanceEnabled() ? new ContainerProperties(destination.getName())
: new ContainerProperties(topicPartitionInitialOffsets);
int concurrency = Math.min(properties.getConcurrency(), listenedPartitions.size());
int concurrency = Math.min(extendedConsumerProperties.getConcurrency(), listenedPartitions.size());
final ConcurrentMessageListenerContainer<?, ?> messageListenerContainer =
new ConcurrentMessageListenerContainer(
consumerFactory, containerProperties) {
@@ -249,8 +254,8 @@ public void stop(Runnable callback) {
}
};
messageListenerContainer.setConcurrency(concurrency);
messageListenerContainer.getContainerProperties().setAckOnError(isAutoCommitOnError(properties));
if (!properties.getExtension().isAutoCommitOffset()) {
messageListenerContainer.getContainerProperties().setAckOnError(isAutoCommitOnError(extendedConsumerProperties));
if (!extendedConsumerProperties.getExtension().isAutoCommitOffset()) {
messageListenerContainer.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
}
if (this.logger.isDebugEnabled()) {
@@ -265,9 +270,9 @@ public void stop(Runnable callback) {
new KafkaMessageDrivenChannelAdapter<>(
messageListenerContainer);
kafkaMessageDrivenChannelAdapter.setBeanFactory(this.getBeanFactory());
final RetryTemplate retryTemplate = buildRetryTemplate(properties);
final RetryTemplate retryTemplate = buildRetryTemplate(extendedConsumerProperties);
kafkaMessageDrivenChannelAdapter.setRetryTemplate(retryTemplate);
if (properties.getExtension().isEnableDlq()) {
if (extendedConsumerProperties.getExtension().isEnableDlq()) {
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory = getProducerFactory(new ExtendedProducerProperties<>(new KafkaProducerProperties()));
final KafkaTemplate<byte[], byte[]> kafkaTemplate = new KafkaTemplate<>(producerFactory);
messageListenerContainer.getContainerProperties().setErrorHandler(new ErrorHandler() {
@@ -308,20 +313,30 @@ public void onSuccess(SendResult<byte[], byte[]> result) {
return kafkaMessageDrivenChannelAdapter;
}

private Map<String, Object> getConsumerConfig(boolean anonymous, String consumerGroup) {
private ConsumerFactory<?, ?> createKafkaConsumerFactory(boolean anonymous, String consumerGroup,
ExtendedConsumerProperties<KafkaConsumerProperties> consumerProperties) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
props.putAll(configurationProperties.getConfiguration());
}
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
anonymous ? "latest" : "earliest");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, anonymous ? "latest" : "earliest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
return props;
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configurationProperties.getKafkaConnectionString());
if (this.kafkaProperties != null) {
if (!this.kafkaProperties.getBootstrapServers().isEmpty()) {
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.kafkaProperties.getBootstrapServers());
}
props.putAll(this.kafkaProperties.getConsumer().buildProperties());
props.putAll(this.kafkaProperties.getProperties());
}
if (!ObjectUtils.isEmpty(configurationProperties.getConfiguration())) {
props.putAll(configurationProperties.getConfiguration());
}
if (!ObjectUtils.isEmpty(consumerProperties.getExtension().getConfiguration())) {
props.putAll(consumerProperties.getExtension().getConfiguration());
}
return new DefaultKafkaConsumerFactory<>(props);
}

private boolean isAutoCommitOnError(ExtendedConsumerProperties<KafkaConsumerProperties> properties) {
@@ -358,8 +373,8 @@ private final class ProducerConfigurationMessageHandler extends KafkaProducerMessageHandler
private final DefaultKafkaProducerFactory<byte[], byte[]> producerFactory;

private ProducerConfigurationMessageHandler(KafkaTemplate<byte[], byte[]> kafkaTemplate, String topic,
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory) {
ExtendedProducerProperties<KafkaProducerProperties> producerProperties,
DefaultKafkaProducerFactory<byte[], byte[]> producerFactory) {
super(kafkaTemplate);
setTopicExpression(new LiteralExpression(topic));
setBeanFactory(KafkaMessageChannelBinder.this.getBeanFactory());
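
The reordered putAll calls in getProducerFactory and the new createKafkaConsumerFactory establish one fixed precedence chain. A condensed sketch of that merge order, with helper and parameter names of my own (the binder inlines this logic); later sources overwrite earlier ones:

import java.util.HashMap;
import java.util.Map;

final class ConfigPrecedenceSketch {

    static Map<String, Object> merge(Map<String, Object> binderDefaults,
            Map<String, Object> bootKafkaProperties,
            Map<String, Object> binderConfiguration,
            Map<String, Object> bindingConfiguration) {
        Map<String, Object> props = new HashMap<>(binderDefaults); // hard-coded binder defaults
        props.putAll(bootKafkaProperties);   // spring.kafka.* via Boot's KafkaProperties
        props.putAll(binderConfiguration);   // spring.cloud.stream.kafka.binder.configuration.*
        props.putAll(bindingConfiguration);  // per-binding producer/consumer configuration
        return props;
    }
}

So a per-binding setting beats the binder-wide configuration, which beats anything inherited from Boot's spring.kafka namespace, which in turn beats the binder's hard-coded defaults.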
KafkaBinderConfiguration.java
@@ -26,6 +26,7 @@
import org.springframework.boot.autoconfigure.PropertyPlaceholderAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.binder.Binder;
import org.springframework.cloud.stream.binder.kafka.KafkaBinderHealthIndicator;
@@ -39,7 +40,6 @@
import org.springframework.cloud.stream.binder.kafka.properties.KafkaExtendedBindingProperties;
import org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner;
import org.springframework.cloud.stream.config.codec.kryo.KryoCodecAutoConfiguration;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Condition;
@@ -79,8 +79,8 @@ public class KafkaBinderConfiguration {
@Autowired
private ProducerListener producerListener;

@Autowired
private ApplicationContext context;
@Autowired(required = false)
private KafkaProperties kafkaProperties;

@Autowired (required = false)
private AdminUtilsOperation adminUtilsOperation;
@@ -97,6 +97,7 @@ KafkaMessageChannelBinder kafkaMessageChannelBinder() {
kafkaMessageChannelBinder.setCodec(this.codec);
kafkaMessageChannelBinder.setProducerListener(producerListener);
kafkaMessageChannelBinder.setExtendedBindingProperties(this.kafkaExtendedBindingProperties);
kafkaMessageChannelBinder.setKafkaProperties(kafkaProperties);
return kafkaMessageChannelBinder;
}

@@ -139,7 +140,7 @@ public boolean matches(ConditionContext conditionContext, AnnotatedTypeMetadata
return AppInfoParser.getVersion().startsWith("0.10");
}
}

static class Kafka09Present implements Condition {

@Override
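
Because kafkaProperties is injected with @Autowired(required = false), the field stays null whenever Boot's KafkaAutoConfiguration is not active, so setKafkaProperties(null) must be harmless — the null guards in getProducerFactory and createKafkaConsumerFactory uphold that. A hypothetical stand-alone check of the contract (not one of the tests added by this commit):

import java.util.HashMap;
import java.util.Map;

import org.springframework.boot.autoconfigure.kafka.KafkaProperties;

public class OptionalKafkaPropertiesSketch {

    public static void main(String[] args) {
        // A null KafkaProperties simulates running without KafkaAutoConfiguration.
        Map<String, Object> props = mergedProducerConfig(null);
        if (!Integer.valueOf(0).equals(props.get("retries"))) {
            throw new IllegalStateException("binder default was lost");
        }
        System.out.println("binder default survives without Boot KafkaProperties");
    }

    private static Map<String, Object> mergedProducerConfig(KafkaProperties kafkaProperties) {
        Map<String, Object> props = new HashMap<>();
        props.put("retries", 0); // binder default, mirroring ProducerConfig.RETRIES_CONFIG
        if (kafkaProperties != null) { // the same null guard the binder uses
            props.putAll(kafkaProperties.getProducer().buildProperties());
        }
        return props;
    }
}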
