Skip to content

Commit

Permalink
spring-projectsGH-1189: support Mono and Future
Browse files Browse the repository at this point in the history
* Support `Mono` and `Future`
* Support auto ack at async return scenario when manual commit
* Support `KafkaListenerErrorHandler`
* Add warn log if the container is not configured for out-of-order manual commit
* Add async return test in `BatchMessagingMessageListenerAdapterTests`
  and `MessagingMessageListenerAdapterTests`
* Add unit test async listener with `@SendTo` in `AsyncListenerTests`
* Add `async-returns.adoc` and `whats-new.adoc`
  • Loading branch information
Zhiyang.Wang1 authored and Wzy19930507 committed Jan 19, 2024
1 parent 2e59574 commit b44b4fb
Show file tree
Hide file tree
Showing 9 changed files with 527 additions and 17 deletions.
@@ -0,0 +1,31 @@
[[async-returns]]
= Asynchronous `@KafkaListener` Return Types

`@KafkaListener` (and `@KafkaHandler`) methods can be specified with asynchronous return types `CompletableFuture<?>` and `Mono<?>`, letting the reply be sent asynchronously.

[source, java]
----
@KafkaListener(id = "myListener", topics = "myTopic")
public CompletableFuture<String> listen(String data) {
...
CompletableFuture<String> future = new CompletableFuture<>();
future.complete("done");
return future;
}
----

[source, java]
----
@KafkaListener(id = "myListener", topics = "myTopic")
public Mono<Void> listen(String data) {
...
return Mono.empty();
}
----

IMPORTANT: The listener container factory must be configured with manual ack mode and async ack to enable out-of-order commits; instead, the asynchronous completion will ack or nack the message when the async operation completes.
When the async result is completed with an error, whether the message is recover or not depends on the container error handler.
If some exception occurs within the listener method that prevents creation of the async result object, you MUST catch that exception and return an appropriate return object that will cause the message to be ack or recover.

If a `KafkaListenerErrorHandler` is configured on a listener with an async return type, the error handler is invoked after a failure.
See xref:kafka/annotation-error-handling.adoc[Handling Exceptions] for more information about this error handler and its purpose.
Expand Up @@ -12,4 +12,10 @@ For changes in earlier version, see xref:appendix/change-history.adoc[Change His

A new `TransactionIdSuffixStrategy` interface was introduced to manage `transactional.id` suffix.
The default implementation is `DefaultTransactionIdSuffixStrategy` when setting `maxCache` greater than zero can reuse `transactional.id` within a specific range, otherwise suffixes will be generated on the fly by incrementing a counter.
See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.
See xref:kafka/transactions.adoc#transaction-id-suffix-fixed[Fixed TransactionIdSuffix] for more information.

[[x32-async-return]]
=== Async @KafkaListener Return

`@KafkaListener` (and `@KafkaHandler`) methods can now return asynchronous return types `CompletableFuture<?>` and `Mono<?>`.
See xref:kafka/receiving-messages/async-returns.adoc[Async Returns] for more information.
Expand Up @@ -147,8 +147,8 @@ private String getReplyTopic() {
}
String topic = destinations.length == 1 ? destinations[0] : "";
BeanFactory beanFactory = getBeanFactory();
if (beanFactory instanceof ConfigurableListableBeanFactory) {
topic = ((ConfigurableListableBeanFactory) beanFactory).resolveEmbeddedValue(topic);
if (beanFactory instanceof ConfigurableListableBeanFactory configurableListableBeanFactory) {
topic = configurableListableBeanFactory.resolveEmbeddedValue(topic);
if (topic != null) {
topic = resolve(topic);
}
Expand Down
Expand Up @@ -3395,6 +3395,11 @@ public void nack(Duration sleep) {
ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis();
}

@Override
public boolean isAsyncAcks() {
return !ListenerConsumer.this.containerProperties.isAsyncAcks();
}

@Override
public String toString() {
return "Acknowledgment for " + KafkaUtils.format(this.cRecord);
Expand Down Expand Up @@ -3493,6 +3498,11 @@ public void nack(int index, Duration sleep) {
processAcks(new ConsumerRecords<K, V>(newRecords));
}

@Override
public boolean isAsyncAcks() {
return !ListenerConsumer.this.containerProperties.isAsyncAcks();
}

@Override
public String toString() {
return "Acknowledgment for " + this.records;
Expand Down
Expand Up @@ -26,6 +26,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import org.apache.commons.logging.LogFactory;
Expand All @@ -47,6 +48,7 @@
import org.springframework.kafka.listener.ConsumerSeekAware;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.KafkaNull;
Expand All @@ -66,9 +68,12 @@
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

import reactor.core.publisher.Mono;

/**
* An abstract {@link org.springframework.kafka.listener.MessageListener} adapter
* providing the necessary infrastructure to extract the payload of a
Expand All @@ -95,6 +100,9 @@ public abstract class MessagingMessageListenerAdapter<K, V> implements ConsumerS
*/
protected static final Message<KafkaNull> NULL_MESSAGE = new GenericMessage<>(KafkaNull.INSTANCE); // NOSONAR

private static final boolean monoPresent =
ClassUtils.isPresent("reactor.core.publisher.Mono", MessageListener.class.getClassLoader());

private final Object bean;

protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); //NOSONAR
Expand Down Expand Up @@ -368,7 +376,7 @@ protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, C
try {
Object result = invokeHandler(records, acknowledgment, message, consumer);
if (result != null) {
handleResult(result, records, message);
handleResult(result, records, acknowledgment, consumer, message);
}
}
catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control
Expand Down Expand Up @@ -436,19 +444,58 @@ private RuntimeException checkAckArg(@Nullable Acknowledgment acknowledgment, Me
* response message to the SendTo topic.
* @param resultArg the result object to handle (never <code>null</code>)
* @param request the original request message
* @param acknowledgment the acknowledgment to manual ack
* @param consumer the consumer to handler error
* @param source the source data for the method invocation - e.g.
* {@code o.s.messaging.Message<?>}; may be null
*/
protected void handleResult(Object resultArg, Object request, Object source) {
protected void handleResult(Object resultArg, Object request, @Nullable Acknowledgment acknowledgment,
Consumer<?, ?> consumer, @Nullable Message<?> source) {

this.logger.debug(() -> "Listener method returned result [" + resultArg
+ "] - generating response message for it");
boolean isInvocationResult = resultArg instanceof InvocationResult;
Object result = isInvocationResult ? ((InvocationResult) resultArg).getResult() : resultArg;
String replyTopic = evaluateReplyTopic(request, source, resultArg);
Assert.state(replyTopic == null || this.replyTemplate != null,
"a KafkaTemplate is required to support replies");
sendResponse(result, replyTopic, source, isInvocationResult
? ((InvocationResult) resultArg).isMessageReturnType() : this.messageReturnType);
Object result;
boolean messageReturnType;
if (resultArg instanceof InvocationResult invocationResult) {
result = invocationResult.getResult();
messageReturnType = invocationResult.isMessageReturnType();
}
else {
result = resultArg;
messageReturnType = this.messageReturnType;
}
if (result instanceof CompletableFuture<?> completable) {
if (acknowledgment == null || acknowledgment.isAsyncAcks()) {
this.logger.warn("Container 'Acknowledgment' must be async ack for Future<?> return type; "
+ "otherwise the container will ack the message immediately");
}
completable.whenComplete((r, t) -> {
if (t == null) {
asyncSuccess(r, replyTopic, source, messageReturnType);
acknowledge(acknowledgment);
}
else {
asyncFailure(request, acknowledgment, consumer, t, source);
}
});
}
else if (monoPresent && result instanceof Mono<?> mono) {
if (acknowledgment == null || acknowledgment.isAsyncAcks()) {
this.logger.warn("Container 'Acknowledgment' must be async ack for Mono<?> return type; " +
"otherwise the container will ack the message immediately");
}
mono.subscribe(
r -> asyncSuccess(r, replyTopic, source, messageReturnType),
t -> asyncFailure(request, acknowledgment, consumer, t, source),
() -> acknowledge(acknowledgment)
);
}
else {
sendResponse(result, replyTopic, source, messageReturnType);
}
}

@Nullable
Expand Down Expand Up @@ -586,6 +633,37 @@ private void sendReplyForMessageSource(Object result, String topic, Message<?> s
this.replyTemplate.send(builder.build());
}

protected void asyncSuccess(@Nullable Object result, String replyTopic, Message<?> source, boolean returnTypeMessage) {
if (result == null) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("Async result is null, ignoring");
}
}
else {
sendResponse(result, replyTopic, source, returnTypeMessage);
}
}

protected void acknowledge(@Nullable Acknowledgment acknowledgment) {
if (acknowledgment != null) {
acknowledgment.acknowledge();
}
}

protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
Throwable t, Message<?> source) {
try {
handleException(request, acknowledgment, consumer, source,
new ListenerExecutionFailedException(createMessagingErrorMessage(
"Async Fail", source.getPayload()), t));
return;
}
catch (Exception ex) {
}
this.logger.error(t, "Future, Mono, or suspend function was completed with an exception for " + source);
acknowledge(acknowledgment);
}

protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer,
Message<?> message, ListenerExecutionFailedException e) {

Expand All @@ -596,7 +674,7 @@ protected void handleException(Object records, @Nullable Acknowledgment acknowle
}
Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment);
if (result != null) {
handleResult(result, records, message);
handleResult(result, records, acknowledgment, consumer, message);
}
}
catch (Exception ex) {
Expand Down
Expand Up @@ -81,4 +81,8 @@ default void nack(int index, Duration sleep) {
throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment");
}

default boolean isAsyncAcks() {
return false;
}

}

0 comments on commit b44b4fb

Please sign in to comment.