diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/BaseRabbitListenerContainerFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/BaseRabbitListenerContainerFactory.java index 339c7e08b2..51e0102027 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/BaseRabbitListenerContainerFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/config/BaseRabbitListenerContainerFactory.java @@ -17,6 +17,7 @@ package org.springframework.amqp.rabbit.config; import java.util.Arrays; +import java.util.function.Function; import org.aopalliance.aop.Advice; @@ -26,6 +27,7 @@ import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpoint; import org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener; +import org.springframework.amqp.rabbit.listener.adapter.ReplyPostProcessor; import org.springframework.amqp.utils.JavaUtils; import org.springframework.lang.Nullable; import org.springframework.retry.RecoveryCallback; @@ -54,6 +56,8 @@ public abstract class BaseRabbitListenerContainerFactory replyPostProcessorProvider; + @Override public abstract C createListenerContainer(RabbitListenerEndpoint endpoint); @@ -108,6 +112,17 @@ public void setReplyRecoveryCallback(RecoveryCallback recoveryCallback) { this.recoveryCallback = recoveryCallback; } + /** + * Set a function to provide a reply post processor; it will be used if there is no + * replyPostProcessor on the rabbit listener annotation. The input parameter is the + * listener id. + * @param replyPostProcessorProvider the post processor. + * @since 3.0 + */ + public void setReplyPostProcessorProvider(Function replyPostProcessorProvider) { + this.replyPostProcessorProvider = replyPostProcessorProvider; + } + protected void applyCommonOverrides(@Nullable RabbitListenerEndpoint endpoint, C instance) { if (endpoint != null) { // endpoint settings overriding default factory settings JavaUtils.INSTANCE @@ -130,6 +145,11 @@ protected void applyCommonOverrides(@Nullable RabbitListenerEndpoint endpoint, C .acceptIfNotNull(endpoint.getReplyPostProcessor(), messageListener::setReplyPostProcessor) .acceptIfNotNull(endpoint.getReplyContentType(), messageListener::setReplyContentType); messageListener.setConverterWinsContentType(endpoint.isConverterWinsContentType()); + if (endpoint.getReplyPostProcessor() == null && this.replyPostProcessorProvider != null) { + JavaUtils.INSTANCE + .acceptIfNotNull(this.replyPostProcessorProvider.apply(endpoint.getId()), + messageListener::setReplyPostProcessor); + } } } } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java index 6eec2a3162..af286a88cd 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/EnableRabbitIntegrationTests.java @@ -237,6 +237,9 @@ public class EnableRabbitIntegrationTests extends NeedsManagementTests { @Autowired private MultiListenerValidatedJsonBean multiValidated; + @Autowired + private ReplyPostProcessor rpp; + @BeforeAll public static void setUp() { System.setProperty(RabbitListenerAnnotationBeanPostProcessor.RABBIT_EMPTY_STRING_ARGUMENTS_PROPERTY, @@ -310,6 +313,8 @@ public void autoStart() { this.registry.start(); assertThat(listenerContainer.isRunning()).isTrue(); listenerContainer.stop(); + assertThat(listenerContainer.getMessageListener()).extracting("replyPostProcessor") + .isSameAs(this.rpp); } @Test @@ -1690,14 +1695,22 @@ public SimpleMessageListenerContainer factoryCreatedContainerNoListener( } @Bean - public SimpleRabbitListenerContainerFactory rabbitAutoStartFalseListenerContainerFactory() { + public SimpleRabbitListenerContainerFactory rabbitAutoStartFalseListenerContainerFactory( + ReplyPostProcessor rpp) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(rabbitConnectionFactory()); factory.setReceiveTimeout(10L); factory.setAutoStartup(false); + factory.setReplyPostProcessorProvider(id -> rpp); return factory; } + @Bean + ReplyPostProcessor rpp() { + return (in, out) -> out; + } + @Bean public SimpleRabbitListenerContainerFactory jsonListenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); diff --git a/src/reference/asciidoc/amqp.adoc b/src/reference/asciidoc/amqp.adoc index 67a3d8511e..0375aebd5e 100644 --- a/src/reference/asciidoc/amqp.adoc +++ b/src/reference/asciidoc/amqp.adoc @@ -3144,6 +3144,20 @@ public ReplyPostProcessor echoCustomHeader() { ---- ==== +Starting with version 3.0, you can configure the post processor on the container factory instead of on the annotation. + +==== +[source, java] +---- +factory.setReplyPostProcessorProvider(id -> (req, resp) -> { + resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader")); + return resp; +}); +---- +==== + +The annotation will supersede the factory setting. + The `@SendTo` value is assumed as a reply `exchange` and `routingKey` pair that follows the `exchange/routingKey` pattern, where one of those parts can be omitted. The valid values are as follows: diff --git a/src/reference/asciidoc/whats-new.adoc b/src/reference/asciidoc/whats-new.adoc index 912926c37b..a55a2d40c5 100644 --- a/src/reference/asciidoc/whats-new.adoc +++ b/src/reference/asciidoc/whats-new.adoc @@ -37,7 +37,10 @@ When setting the container factory `consumerBatchEnabled` to `true`, the `batchL See <> for more infoprmation. `MessageConverter` s can now return `Optional.empty()` for a null value; this is currently implemented by the `Jackson2JsonMessageConverter`. -See <> for more information. +See <> for more information + +You can now configure a `ReplyPostProcessor` via the container factory rather than via a property on `@RabbitListener`. +See <> for more information. ==== Connection Factory Changes