diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc new file mode 100644 index 0000000000..658e24d497 --- /dev/null +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/kafka/receiving-messages/async-returns.adoc @@ -0,0 +1,31 @@ +[[async-returns]] += Asynchronous `@KafkaListener` Return Types + +`@KafkaListener` (and `@KafkaHandler`) methods can be specified with asynchronous return types `CompletableFuture` and `Mono`, letting the reply be sent asynchronously. + +[source, java] +---- +@KafkaListener(id = "myListener", topics = "myTopic") +public CompletableFuture listen(String data) { + ... + CompletableFuture future = new CompletableFuture<>(); + future.complete("done"); + return future; +} +---- + +[source, java] +---- +@KafkaListener(id = "myListener", topics = "myTopic") +public Mono listen(String data) { + ... + return Mono.empty(); +} +---- + +IMPORTANT: The listener container factory must be configured with manual ack mode and async ack to enable out-of-order commits; instead, the asynchronous completion will ack or nack the message when the async operation completes. +When the async result is completed with an error, whether the message is recover or not depends on the container error handler. +If some exception occurs within the listener method that prevents creation of the async result object, you MUST catch that exception and return an appropriate return object that will cause the message to be ack or recover. + +If a `KafkaListenerErrorHandler` is configured on a listener with an async return type, the error handler is invoked after a failure. +See xref:kafka/annotation-error-handling.adoc[Handling Exceptions] for more information about this error handler and its purpose. diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 75142f4fe0..cb8c764aa3 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -7,3 +7,9 @@ This section covers the changes made from version 3.1 to version 3.2. For changes in earlier version, see xref:appendix/change-history.adoc[Change History]. + +[[x32-async-return]] +=== Async @KafkaListener Return + +`@KafkaListener` (and `@KafkaHandler`) methods can now return asynchronous return types `CompletableFuture` and `Mono`. +See xref:kafka/receiving-messages/async-returns.adoc[Async Returns] for more information. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java index 28e190674e..5e27af27cd 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/MethodKafkaListenerEndpoint.java @@ -147,8 +147,8 @@ private String getReplyTopic() { } String topic = destinations.length == 1 ? destinations[0] : ""; BeanFactory beanFactory = getBeanFactory(); - if (beanFactory instanceof ConfigurableListableBeanFactory) { - topic = ((ConfigurableListableBeanFactory) beanFactory).resolveEmbeddedValue(topic); + if (beanFactory instanceof ConfigurableListableBeanFactory configurableListableBeanFactory) { + topic = configurableListableBeanFactory.resolveEmbeddedValue(topic); if (topic != null) { topic = resolve(topic); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index bd0523c1c5..d34970fb23 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -3395,6 +3395,11 @@ public void nack(Duration sleep) { ListenerConsumer.this.nackSleepDurationMillis = sleep.toMillis(); } + @Override + public boolean isAsyncAcks() { + return !ListenerConsumer.this.containerProperties.isAsyncAcks(); + } + @Override public String toString() { return "Acknowledgment for " + KafkaUtils.format(this.cRecord); @@ -3493,6 +3498,11 @@ public void nack(int index, Duration sleep) { processAcks(new ConsumerRecords(newRecords)); } + @Override + public boolean isAsyncAcks() { + return !ListenerConsumer.this.containerProperties.isAsyncAcks(); + } + @Override public String toString() { return "Acknowledgment for " + this.records; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java index 4bcbda9016..6b354eea7a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java @@ -26,6 +26,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import org.apache.commons.logging.LogFactory; @@ -47,6 +48,7 @@ import org.springframework.kafka.listener.ConsumerSeekAware; import org.springframework.kafka.listener.KafkaListenerErrorHandler; import org.springframework.kafka.listener.ListenerExecutionFailedException; +import org.springframework.kafka.listener.MessageListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.kafka.support.KafkaNull; @@ -66,9 +68,12 @@ import org.springframework.messaging.support.GenericMessage; import org.springframework.messaging.support.MessageBuilder; import org.springframework.util.Assert; +import org.springframework.util.ClassUtils; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; +import reactor.core.publisher.Mono; + /** * An abstract {@link org.springframework.kafka.listener.MessageListener} adapter * providing the necessary infrastructure to extract the payload of a @@ -95,6 +100,9 @@ public abstract class MessagingMessageListenerAdapter implements ConsumerS */ protected static final Message NULL_MESSAGE = new GenericMessage<>(KafkaNull.INSTANCE); // NOSONAR + private static final boolean monoPresent = + ClassUtils.isPresent("reactor.core.publisher.Mono", MessageListener.class.getClassLoader()); + private final Object bean; protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); //NOSONAR @@ -368,7 +376,7 @@ protected void invoke(Object records, @Nullable Acknowledgment acknowledgment, C try { Object result = invokeHandler(records, acknowledgment, message, consumer); if (result != null) { - handleResult(result, records, message); + handleResult(result, records, acknowledgment, consumer, message); } } catch (ListenerExecutionFailedException e) { // NOSONAR ex flow control @@ -436,19 +444,58 @@ private RuntimeException checkAckArg(@Nullable Acknowledgment acknowledgment, Me * response message to the SendTo topic. * @param resultArg the result object to handle (never null) * @param request the original request message + * @param acknowledgment the acknowledgment to manual ack + * @param consumer the consumer to handler error * @param source the source data for the method invocation - e.g. * {@code o.s.messaging.Message}; may be null */ - protected void handleResult(Object resultArg, Object request, Object source) { + protected void handleResult(Object resultArg, Object request, @Nullable Acknowledgment acknowledgment, + Consumer consumer, @Nullable Message source) { + this.logger.debug(() -> "Listener method returned result [" + resultArg + "] - generating response message for it"); - boolean isInvocationResult = resultArg instanceof InvocationResult; - Object result = isInvocationResult ? ((InvocationResult) resultArg).getResult() : resultArg; String replyTopic = evaluateReplyTopic(request, source, resultArg); Assert.state(replyTopic == null || this.replyTemplate != null, "a KafkaTemplate is required to support replies"); - sendResponse(result, replyTopic, source, isInvocationResult - ? ((InvocationResult) resultArg).isMessageReturnType() : this.messageReturnType); + Object result; + boolean messageReturnType; + if (resultArg instanceof InvocationResult invocationResult) { + result = invocationResult.getResult(); + messageReturnType = invocationResult.isMessageReturnType(); + } + else { + result = resultArg; + messageReturnType = this.messageReturnType; + } + if (result instanceof CompletableFuture completable) { + if (acknowledgment == null || acknowledgment.isAsyncAcks()) { + this.logger.warn("Container 'Acknowledgment' must be async ack for Future return type; " + + "otherwise the container will ack the message immediately"); + } + completable.whenComplete((r, t) -> { + if (t == null) { + asyncSuccess(r, replyTopic, source, messageReturnType); + acknowledge(acknowledgment); + } + else { + asyncFailure(request, acknowledgment, consumer, t, source); + } + }); + } + else if (monoPresent && result instanceof Mono mono) { + if (acknowledgment == null || acknowledgment.isAsyncAcks()) { + this.logger.warn("Container 'Acknowledgment' must be async ack for Mono return type; " + + "otherwise the container will ack the message immediately"); + } + mono.subscribe( + r -> asyncSuccess(r, replyTopic, source, messageReturnType), + t -> asyncFailure(request, acknowledgment, consumer, t, source), + () -> acknowledge(acknowledgment) + ); + } + else { + sendResponse(result, replyTopic, source, messageReturnType); + } } @Nullable @@ -586,6 +633,37 @@ private void sendReplyForMessageSource(Object result, String topic, Message s this.replyTemplate.send(builder.build()); } + protected void asyncSuccess(@Nullable Object result, String replyTopic, Message source, boolean returnTypeMessage) { + if (result == null) { + if (this.logger.isDebugEnabled()) { + this.logger.debug("Async result is null, ignoring"); + } + } + else { + sendResponse(result, replyTopic, source, returnTypeMessage); + } + } + + protected void acknowledge(@Nullable Acknowledgment acknowledgment) { + if (acknowledgment != null) { + acknowledgment.acknowledge(); + } + } + + protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgment, Consumer consumer, + Throwable t, Message source) { + try { + handleException(request, acknowledgment, consumer, source, + new ListenerExecutionFailedException(createMessagingErrorMessage( + "Async Fail", source.getPayload()), t)); + return; + } + catch (Exception ex) { + } + this.logger.error(t, "Future, Mono, or suspend function was completed with an exception for " + source); + acknowledge(acknowledgment); + } + protected void handleException(Object records, @Nullable Acknowledgment acknowledgment, Consumer consumer, Message message, ListenerExecutionFailedException e) { @@ -596,7 +674,7 @@ protected void handleException(Object records, @Nullable Acknowledgment acknowle } Object result = this.errorHandler.handleError(message, e, consumer, acknowledgment); if (result != null) { - handleResult(result, records, message); + handleResult(result, records, acknowledgment, consumer, message); } } catch (Exception ex) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java b/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java index 08c015fb4b..63278b4a70 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/Acknowledgment.java @@ -81,4 +81,8 @@ default void nack(int index, Duration sleep) { throw new UnsupportedOperationException("nack(index, sleep) is not supported by this Acknowledgment"); } + default boolean isAsyncAcks() { + return false; + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java new file mode 100644 index 0000000000..e8e1b5a972 --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/annotation/AsyncListenerTests.java @@ -0,0 +1,249 @@ +/* + * Copyright 2023 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.annotation; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.listener.KafkaListenerErrorHandler; +import org.springframework.kafka.test.EmbeddedKafkaBroker; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.messaging.handler.annotation.SendTo; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; + +import reactor.core.publisher.Mono; + +@SpringJUnitConfig +@DirtiesContext +@EmbeddedKafka(topics = { + AsyncListenerTests.FUTURE_TOPIC_1, AsyncListenerTests.FUTURE_TOPIC_BATCH_1, + AsyncListenerTests.MONO_TOPIC_1, AsyncListenerTests.MONO_TOPIC_BATCH_1, + AsyncListenerTests.SEND_TOPIC_1}, partitions = 1) +public class AsyncListenerTests { + + static final String FUTURE_TOPIC_1 = "future-topic-1"; + + static final String FUTURE_TOPIC_BATCH_1 = "future-topic-batch-1"; + + static final String MONO_TOPIC_1 = "mono-topic-1"; + + static final String MONO_TOPIC_BATCH_1 = "mono-topic-batch-1"; + + static final String SEND_TOPIC_1 = "send-topic-1"; + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Autowired + private Config config; + + @Test + public void testAsyncListener() throws Exception { + + kafkaTemplate.send(FUTURE_TOPIC_1, "foo-1"); + ConsumerRecord cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 0); + assertThat(cr1.value()).isEqualTo("FOO-1"); + kafkaTemplate.send(FUTURE_TOPIC_1, "bar-1"); + cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 1); + assertThat(cr1.value()).isEqualTo("bar-1_eh"); + + kafkaTemplate.send(FUTURE_TOPIC_BATCH_1, "foo-2"); + cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 2); + assertThat(cr1.value()).isEqualTo("1"); + kafkaTemplate.send(FUTURE_TOPIC_BATCH_1, "bar-2"); + cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 3); + assertThat(cr1.value()).isEqualTo("bar-2_beh"); + + kafkaTemplate.send(MONO_TOPIC_1, "foo-3"); + cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 4); + assertThat(cr1.value()).isEqualTo("FOO-3"); + kafkaTemplate.send(MONO_TOPIC_1, "bar-3"); + assertThat(config.latch1.await(10, TimeUnit.SECONDS)).isEqualTo(true); + cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 5); + assertThat(cr1.value()).isEqualTo("bar-3_eh"); + + + kafkaTemplate.send(MONO_TOPIC_BATCH_1, "foo-4"); + cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 6); + assertThat(cr1.value()).isEqualTo("1"); + kafkaTemplate.send(MONO_TOPIC_BATCH_1, "bar-4"); + assertThat(config.latch2.await(10, TimeUnit.SECONDS)).isEqualTo(true); + cr1 = kafkaTemplate.receive(SEND_TOPIC_1, 0, 7); + assertThat(cr1.value()).isEqualTo("bar-4_beh"); + } + + public static class Listener { + + private final AtomicBoolean future1 = new AtomicBoolean(true); + + private final AtomicBoolean futureBatch1 = new AtomicBoolean(true); + + private final AtomicBoolean mono1 = new AtomicBoolean(true); + + private final AtomicBoolean monoBatch1 = new AtomicBoolean(true); + + @KafkaListener(id = "future1", topics = FUTURE_TOPIC_1, errorHandler = "errorHandler") + @SendTo(SEND_TOPIC_1) + public CompletableFuture listen1(String foo) { + CompletableFuture future = new CompletableFuture<>(); + if (future1.getAndSet(false)) { + future.complete(foo.toUpperCase()); + } + else { + future.completeExceptionally(new RuntimeException("Future.exception()")); + } + return future; + } + + @KafkaListener(id = "futureBatch1", topics = FUTURE_TOPIC_BATCH_1, errorHandler = "errorBatchHandler") + @SendTo(SEND_TOPIC_1) + public CompletableFuture listen2(List foo) { + CompletableFuture future = new CompletableFuture<>(); + if (futureBatch1.getAndSet(false)) { + future.complete(String.valueOf(foo.size())); + } + else { + future.completeExceptionally(new RuntimeException("Future.exception(batch)")); + } + return future; + } + + @KafkaListener(id = "mono1", topics = MONO_TOPIC_1, errorHandler = "errorHandler") + @SendTo(SEND_TOPIC_1) + public Mono listen3(String bar) { + if (mono1.getAndSet(false)) { + return Mono.just(bar.toUpperCase()); + } + else { + return Mono.error(new RuntimeException("Mono.error()")); + } + } + + @KafkaListener(id = "monoBatch1", topics = MONO_TOPIC_BATCH_1, errorHandler = "errorBatchHandler") + @SendTo(SEND_TOPIC_1) + public Mono listen4(List bar) { + if (monoBatch1.getAndSet(false)) { + return Mono.just(String.valueOf(bar.size())); + } + else { + return Mono.error(new RuntimeException("Mono.error(batch)")); + } + } + + } + + @Configuration + @EnableKafka + public static class Config { + + private final CountDownLatch latch1 = new CountDownLatch(2); + + private final CountDownLatch latch2 = new CountDownLatch(2); + + @Autowired + private KafkaTemplate kafkaTemplate; + + @Bean + public Listener listener() { + return new Listener(); + } + + @Bean + public KafkaTemplate template(EmbeddedKafkaBroker embeddedKafka) { + KafkaTemplate template = new KafkaTemplate<>(producerFactory(embeddedKafka)); + template.setConsumerFactory(consumerFactory(embeddedKafka)); + return template; + } + + @Bean + public ProducerFactory producerFactory(EmbeddedKafkaBroker embeddedKafka) { + return new DefaultKafkaProducerFactory<>(producerConfigs(embeddedKafka)); + } + + @Bean + public Map producerConfigs(EmbeddedKafkaBroker embeddedKafka) { + return KafkaTestUtils.producerProps(embeddedKafka); + } + + @Bean + public KafkaListenerErrorHandler errorHandler() { + return (message, exception) -> { + latch1.countDown(); + return message.getPayload() + "_eh"; + }; + } + + @Bean + public KafkaListenerErrorHandler errorBatchHandler() { + return (message, exception) -> { + latch2.countDown(); + return message.getPayload() + "_beh"; + }; + } + + @Bean + public DefaultKafkaConsumerFactory consumerFactory( + EmbeddedKafkaBroker embeddedKafka) { + return new DefaultKafkaConsumerFactory<>(consumerConfigs(embeddedKafka)); + } + + @Bean + public Map consumerConfigs(EmbeddedKafkaBroker embeddedKafka) { + return KafkaTestUtils.consumerProps("test", "false", embeddedKafka); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaBatchListenerContainerFactory( + EmbeddedKafkaBroker embeddedKafka) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory(embeddedKafka)); + factory.setBatchListener(true); + factory.setReplyTemplate(kafkaTemplate); + return factory; + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( + EmbeddedKafkaBroker embeddedKafka) { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory(embeddedKafka)); + factory.setReplyTemplate(kafkaTemplate); + return factory; + } + + } + +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java index d19d1bca26..2ece2495b5 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2019 the original author or authors. + * Copyright 2019-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,10 +17,17 @@ package org.springframework.kafka.listener.adapter; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.CompletableFuture; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; @@ -39,6 +46,8 @@ import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; +import reactor.core.publisher.Mono; + /** * @author Gary Russell * @since 2.2.5 @@ -60,6 +69,41 @@ public void testKafkaNullInList(@Autowired KafkaListenerEndpointRegistry registr assertThat(foo.group).isEqualTo("test.group"); } + @SuppressWarnings("unchecked") + @Test + public void testFutureResult(@Autowired KafkaListenerEndpointRegistry registry, @Autowired Bar bar) { + + BatchMessagingMessageListenerAdapter adapter = + spy((BatchMessagingMessageListenerAdapter) registry + .getListenerContainer("bar").getContainerProperties().getMessageListener()); + KafkaUtils.setConsumerGroupId("test.group.future"); + List> list = new ArrayList<>(); + list.add(new ConsumerRecord<>("bar", 0, 0L, null, "future_1")); + list.add(new ConsumerRecord<>("bar", 0, 1L, null, "future_2")); + list.add(new ConsumerRecord<>("bar", 1, 0L, null, "future_3")); + adapter.onMessage(list, null, null); + assertThat(bar.group).isEqualTo("test.group.future"); + verify(adapter, times(1)).asyncSuccess(any(), any(), any(), anyBoolean()); + verify(adapter, times(1)).acknowledge(any()); + } + + @SuppressWarnings("unchecked") + @Test + public void testMonoResult(@Autowired KafkaListenerEndpointRegistry registry, @Autowired Baz baz) { + + BatchMessagingMessageListenerAdapter adapter = + spy((BatchMessagingMessageListenerAdapter) registry + .getListenerContainer("baz").getContainerProperties().getMessageListener()); + KafkaUtils.setConsumerGroupId("test.group.mono"); + List> list = new ArrayList<>(); + list.add(new ConsumerRecord<>("baz", 0, 0L, null, "mono_1")); + list.add(new ConsumerRecord<>("baz", 0, 1L, null, "mono_2")); + adapter.onMessage(list, null, null); + assertThat(baz.group).isEqualTo("test.group.mono"); + verify(adapter, times(1)).asyncSuccess(any(), any(), any(), anyBoolean()); + verify(adapter, times(1)).acknowledge(any()); + } + public static class Foo { public volatile String value = "someValue"; @@ -68,10 +112,38 @@ public static class Foo { @KafkaListener(id = "foo", topics = "foo", autoStartup = "false") public void listen(List list, @Header(KafkaHeaders.GROUP_ID) String groupId) { - list.forEach(s -> { - this.value = s; - }); + list.forEach(s -> this.value = s); + this.group = groupId; + } + + } + + public static class Bar { + + public volatile String group; + + @KafkaListener(id = "bar", topics = "bar", autoStartup = "false") + public CompletableFuture listen(List list, @Header(KafkaHeaders.GROUP_ID) String groupId) { + + this.group = groupId; + CompletableFuture future = new CompletableFuture<>(); + future.complete("processed: " + list.size()); + return future; + } + + } + + public static class Baz { + + public volatile String value = "someValue"; + + public volatile String group; + + @KafkaListener(id = "baz", topics = "baz", autoStartup = "false") + public Mono listen(List list, @Header(KafkaHeaders.GROUP_ID) String groupId) { + this.group = groupId; + return Mono.just(list.size()); } } @@ -85,11 +157,20 @@ public Foo foo() { return new Foo(); } + @Bean + public Bar bar() { + return new Bar(); + } + + @Bean + public Baz baz() { + return new Baz(); + } + @SuppressWarnings({ "rawtypes" }) @Bean public ConsumerFactory consumerFactory() { - ConsumerFactory consumerFactory = mock(ConsumerFactory.class); - return consumerFactory; + return mock(ConsumerFactory.class); } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -100,6 +181,7 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerCont factory.setBatchListener(true); return factory; } + } } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java index a20468b7b1..d8ed544380 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapterTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2021 the original author or authors. + * Copyright 2016-2023 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,11 +17,16 @@ package org.springframework.kafka.listener.adapter; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.BDDMockito.willReturn; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import java.lang.reflect.Method; +import java.util.concurrent.CompletableFuture; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.junit.jupiter.api.Test; @@ -33,6 +38,8 @@ import org.springframework.kafka.support.converter.RecordMessageConverter; import org.springframework.messaging.support.GenericMessage; +import reactor.core.publisher.Mono; + /** * @author Gary Russell * @since 1.1.2 @@ -66,6 +73,37 @@ public void onMessage(ConsumerRecord data, Acknowledgment acknow verify(converter).toMessage(cr, ack, null, String.class); } + @Test + public void testCompletableFutureReturn() throws NoSuchMethodException { + + Method method = getClass().getDeclaredMethod("future", String.class, Acknowledgment.class); + testAsyncResult(method, "bar"); + } + + @Test + public void testMonoReturn() throws NoSuchMethodException { + + Method method = getClass().getDeclaredMethod("mono", String.class, Acknowledgment.class); + testAsyncResult(method, "baz"); + } + + private void testAsyncResult(Method method, String topic) { + + KafkaListenerAnnotationBeanPostProcessor bpp = new KafkaListenerAnnotationBeanPostProcessor<>(); + RecordMessagingMessageListenerAdapter adapter = + spy(new RecordMessagingMessageListenerAdapter<>(this, method)); + adapter.setHandlerMethod( + new HandlerAdapter(bpp.getMessageHandlerMethodFactory().createInvocableHandlerMethod(this, method))); + ConsumerRecord cr = new ConsumerRecord<>(topic, 0, 0L, null, "foo"); + Acknowledgment ack = mock(Acknowledgment.class); + RecordMessageConverter converter = mock(RecordMessageConverter.class); + willReturn(new GenericMessage<>("foo")).given(converter).toMessage(cr, ack, null, String.class); + adapter.setMessageConverter(converter); + adapter.onMessage(cr, ack, null); + verify(adapter, times(1)).asyncSuccess(any(), any(), any(), anyBoolean()); + verify(adapter, times(1)).acknowledge(any()); + } + @Test void testMissingAck() throws NoSuchMethodException, SecurityException { KafkaListenerAnnotationBeanPostProcessor bpp = new KafkaListenerAnnotationBeanPostProcessor<>(); @@ -84,4 +122,16 @@ public void test(Acknowledgment ack) { } + public CompletableFuture future(String data, Acknowledgment ack) { + + CompletableFuture future = new CompletableFuture<>(); + future.complete("processed" + data); + return future; + } + + public Mono mono(String data, Acknowledgment ack) { + + return Mono.just(data); + } + }