Skip to content

Commit

Permalink
spring-projectsGH-550: master to 2.2; fix tangles
Browse files Browse the repository at this point in the history
Fixes spring-projects#550

- package tangle `....listener` and `....listener.config` - remove config package
- class tangles between `ContainerProperties` and the listener containers
  - AckMode moved to properties
  - Error handler setters moved from properties to containers
  • Loading branch information
garyrussell committed Apr 3, 2018
1 parent d408c27 commit 64dd43f
Show file tree
Hide file tree
Showing 29 changed files with 292 additions and 242 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=2.1.5.BUILD-SNAPSHOT
version=2.2.0.BUILD-SNAPSHOT
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.GenericErrorHandler;
import org.springframework.kafka.listener.adapter.RecordFilterStrategy;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.converter.MessageConverter;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.support.RetryTemplate;
Expand All @@ -51,6 +52,8 @@ public abstract class AbstractKafkaListenerContainerFactory<C extends AbstractMe

private final ContainerProperties containerProperties = new ContainerProperties((Pattern) null);

private GenericErrorHandler<?> errorHandler;

private ConsumerFactory<K, V> consumerFactory;

private Boolean autoStartup;
Expand Down Expand Up @@ -191,6 +194,24 @@ public void setReplyTemplate(KafkaTemplate<K, V> replyTemplate) {
this.replyTemplate = replyTemplate;
}

/**
* Set the error handler to call when the listener throws an exception.
* @param errorHandler the error handler.
* @since 2.2
*/
public void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}

/**
* Set the batch error handler to call when the listener throws an exception.
* @param errorHandler the error handler.
* @since 2.2
*/
public void setBatchErrorHandler(BatchErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}

/**
* Obtain the properties template for this factory - set properties as needed
* and they will be copied to a final properties instance for the endpoint.
Expand Down Expand Up @@ -274,11 +295,8 @@ protected void initializeContainer(C instance) {
if (this.containerProperties.getAckTime() > 0) {
properties.setAckTime(this.containerProperties.getAckTime());
}
if (this.containerProperties.getGenericErrorHandler() instanceof BatchErrorHandler) {
properties.setBatchErrorHandler((BatchErrorHandler) this.containerProperties.getGenericErrorHandler());
}
else {
properties.setErrorHandler((ErrorHandler) this.containerProperties.getGenericErrorHandler());
if (this.errorHandler != null) {
instance.setGenericErrorHandler(this.errorHandler);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2016 the original author or authors.
* Copyright 2014-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,7 +19,7 @@
import java.util.Collection;

import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.TopicPartitionInitialOffset;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

Expand All @@ -56,57 +55,6 @@ public abstract class AbstractMessageListenerContainer<K, V>

protected final Log logger = LogFactory.getLog(this.getClass()); // NOSONAR

/**
* The offset commit behavior enumeration.
*/
public enum AckMode {

/**
* Commit after each record is processed by the listener.
*/
RECORD,

/**
* Commit whatever has already been processed before the next poll.
*/
BATCH,

/**
* Commit pending updates after
* {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.
*/
TIME,

/**
* Commit pending updates after
* {@link ContainerProperties#setAckCount(int) ackCount} has been
* exceeded.
*/
COUNT,

/**
* Commit pending updates after
* {@link ContainerProperties#setAckCount(int) ackCount} has been
* exceeded or after {@link ContainerProperties#setAckTime(long)
* ackTime} has elapsed.
*/
COUNT_TIME,

/**
* User takes responsibility for acks using an
* {@link AcknowledgingMessageListener}.
*/
MANUAL,

/**
* User takes responsibility for acks using an
* {@link AcknowledgingMessageListener}. The consumer
* immediately processes the commit.
*/
MANUAL_IMMEDIATE,

}

protected final ConsumerFactory<K, V> consumerFactory; // NOSONAR (final)

private final ContainerProperties containerProperties;
Expand All @@ -117,6 +65,8 @@ public enum AckMode {

private ApplicationEventPublisher applicationEventPublisher;

private GenericErrorHandler<?> errorHandler;

private boolean autoStartup = true;

private int phase = DEFAULT_PHASE;
Expand Down Expand Up @@ -168,12 +118,6 @@ else if (containerProperties.getTopicPattern() != null) {
if (this.containerProperties.getConsumerRebalanceListener() == null) {
this.containerProperties.setConsumerRebalanceListener(createSimpleLoggingConsumerRebalanceListener());
}
if (containerProperties.getGenericErrorHandler() instanceof BatchErrorHandler) {
this.containerProperties.setBatchErrorHandler((BatchErrorHandler) containerProperties.getGenericErrorHandler());
}
else {
this.containerProperties.setErrorHandler((ErrorHandler) containerProperties.getGenericErrorHandler());
}
}

@Override
Expand All @@ -194,6 +138,42 @@ public ApplicationEventPublisher getApplicationEventPublisher() {
return this.applicationEventPublisher;
}

/**
* Set the error handler to call when the listener throws an exception.
* @param errorHandler the error handler.
* @since 2.2
*/
public void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}

/**
* Set the error handler to call when the listener throws an exception.
* @param errorHandler the error handler.
* @since 2.2
*/
public void setGenericErrorHandler(GenericErrorHandler<?> errorHandler) {
this.errorHandler = errorHandler;
}

/**
* Set the batch error handler to call when the listener throws an exception.
* @param errorHandler the error handler.
* @since 2.2
*/
public void setBatchErrorHandler(BatchErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}

/**
* Get the configured error handler.
* @return the error handler.
* @since 2.2
*/
protected GenericErrorHandler<?> getGenericErrorHandler() {
return this.errorHandler;
}

@Override
public boolean isAutoStartup() {
return this.autoStartup;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.kafka.common.TopicPartition;

import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -161,6 +160,7 @@ protected void doStart() {
container.setApplicationEventPublisher(getApplicationEventPublisher());
}
container.setClientIdSuffix("-" + i);
container.setGenericErrorHandler(getGenericErrorHandler());
container.start();
this.containers.add(container);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package org.springframework.kafka.listener.config;
package org.springframework.kafka.listener;

import java.util.Arrays;
import java.util.LinkedHashSet;
Expand All @@ -24,11 +24,6 @@
import org.apache.kafka.clients.consumer.OffsetCommitCallback;

import org.springframework.core.task.AsyncListenableTaskExecutor;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
import org.springframework.kafka.listener.BatchErrorHandler;
import org.springframework.kafka.listener.ErrorHandler;
import org.springframework.kafka.listener.GenericErrorHandler;
import org.springframework.kafka.support.LogIfLevelEnabled;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.scheduling.TaskScheduler;
Expand All @@ -46,6 +41,57 @@
*/
public class ContainerProperties {

/**
* The offset commit behavior enumeration.
*/
public enum AckMode {

/**
* Commit after each record is processed by the listener.
*/
RECORD,

/**
* Commit whatever has already been processed before the next poll.
*/
BATCH,

/**
* Commit pending updates after
* {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.
*/
TIME,

/**
* Commit pending updates after
* {@link ContainerProperties#setAckCount(int) ackCount} has been
* exceeded.
*/
COUNT,

/**
* Commit pending updates after
* {@link ContainerProperties#setAckCount(int) ackCount} has been
* exceeded or after {@link ContainerProperties#setAckTime(long)
* ackTime} has elapsed.
*/
COUNT_TIME,

/**
* User takes responsibility for acks using an
* {@link AcknowledgingMessageListener}.
*/
MANUAL,

/**
* User takes responsibility for acks using an
* {@link AcknowledgingMessageListener}. The consumer
* immediately processes the commit.
*/
MANUAL_IMMEDIATE,

}

private static final long DEFAULT_POLL_TIMEOUT = 1000L;

private static final int DEFAULT_SHUTDOWN_TIMEOUT = 10000;
Expand Down Expand Up @@ -82,7 +128,7 @@ public class ContainerProperties {
* {@link org.springframework.kafka.listener.AcknowledgingMessageListener}.
* </ul>
*/
private AbstractMessageListenerContainer.AckMode ackMode = AckMode.BATCH;
private AckMode ackMode = AckMode.BATCH;

/**
* The number of outstanding record count after which offsets should be
Expand Down Expand Up @@ -114,11 +160,6 @@ public class ContainerProperties {
*/
private AsyncListenableTaskExecutor consumerTaskExecutor;

/**
* The error handler to call when the listener throws an exception.
*/
private GenericErrorHandler<?> errorHandler;

/**
* The timeout for shutting down the container. This is the maximum amount of
* time that the invocation to {@code #stop(Runnable)} will block for, before
Expand Down Expand Up @@ -209,7 +250,7 @@ public void setMessageListener(Object messageListener) {
* </ul>
* @param ackMode the {@link AckMode}; default BATCH.
*/
public void setAckMode(AbstractMessageListenerContainer.AckMode ackMode) {
public void setAckMode(AckMode ackMode) {
Assert.notNull(ackMode, "'ackMode' cannot be null");
this.ackMode = ackMode;
}
Expand Down Expand Up @@ -243,22 +284,6 @@ public void setAckTime(long ackTime) {
this.ackTime = ackTime;
}

/**
* Set the error handler to call when the listener throws an exception.
* @param errorHandler the error handler.
*/
public void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}

/**
* Set the batch error handler to call when the listener throws an exception.
* @param errorHandler the error handler.
*/
public void setBatchErrorHandler(BatchErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}

/**
* Set the executor for threads that poll the consumer.
* @param consumerTaskExecutor the executor
Expand Down Expand Up @@ -355,7 +380,7 @@ public TopicPartitionInitialOffset[] getTopicPartitions() {
return this.topicPartitions;
}

public AbstractMessageListenerContainer.AckMode getAckMode() {
public AckMode getAckMode() {
return this.ackMode;
}

Expand All @@ -379,10 +404,6 @@ public AsyncListenableTaskExecutor getConsumerTaskExecutor() {
return this.consumerTaskExecutor;
}

public GenericErrorHandler<?> getGenericErrorHandler() {
return this.errorHandler;
}

public long getShutdownTimeout() {
return this.shutdownTimeout;
}
Expand Down Expand Up @@ -542,7 +563,6 @@ public String toString() {
+ ", pollTimeout=" + this.pollTimeout
+ (this.consumerTaskExecutor != null
? ", consumerTaskExecutor=" + this.consumerTaskExecutor : "")
+ (this.errorHandler != null ? ", errorHandler=" + this.errorHandler : "")
+ ", shutdownTimeout=" + this.shutdownTimeout
+ (this.consumerRebalanceListener != null
? ", consumerRebalanceListener=" + this.consumerRebalanceListener : "")
Expand Down

0 comments on commit 64dd43f

Please sign in to comment.