Browse files

Merge pull request #21 from garyrussell/AMQP-221

* AMQP-221:
  AMQP-221 Support Dead Letter Queues
  • Loading branch information...
2 parents 4a2e08e + 5526a91 commit 8332a77b7db84a71d3efe5fc697bd4bf3f5c20df Oleg Zhurakousky committed Apr 9, 2012
View
41 ...g-amqp-core/src/main/java/org/springframework/amqp/AmqpRejectAndDontRequeueException.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.amqp;
+
+/**
+ * Exception for listener implementations used to indicate the
+ * basic.reject will be sent with requeue=false in order to enable
+ * features such as DLQ.
+ * @author Gary Russell
+ * @since 1.0.1
+ *
+ */
+@SuppressWarnings("serial")
+public class AmqpRejectAndDontRequeueException extends AmqpException {
+
+ public AmqpRejectAndDontRequeueException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public AmqpRejectAndDontRequeueException(String message) {
+ super(message);
+ }
+
+ public AmqpRejectAndDontRequeueException(Throwable cause) {
+ super(cause);
+ }
+
+}
View
2 spring-amqp-parent/pom.xml
@@ -20,7 +20,7 @@
<org.mockito.version>1.8.4</org.mockito.version>
<org.codehaus.jackson.version>1.4.3</org.codehaus.jackson.version>
<org.erlang.otp.version>1.5.3</org.erlang.otp.version>
- <com.rabbitmq.version>2.7.1</com.rabbitmq.version>
+ <com.rabbitmq.version>2.8.1</com.rabbitmq.version>
<org.springframework.version>3.0.5.RELEASE</org.springframework.version>
</properties>
<licenses>
View
9 ...-rabbit/src/main/java/org/springframework/amqp/rabbit/config/ListenerContainerParser.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2010-2011 the original author or authors.
+ * Copyright 2010-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -33,6 +33,7 @@
/**
* @author Mark Fisher
+ * @author Gary Russell
* @since 1.0
*/
class ListenerContainerParser implements BeanDefinitionParser {
@@ -83,6 +84,7 @@
private static final String ADVICE_CHAIN_ATTRIBUTE = "advice-chain";
+ private static final String REQUEUE_REJECTED_ATTRIBUTE = "requeue-rejected";
public BeanDefinition parse(Element element, ParserContext parserContext) {
CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(),
@@ -243,6 +245,11 @@ private BeanDefinition parseContainer(Element listenerEle, Element containerEle,
containerDef.getPropertyValues().add("txSize", new TypedStringValue(transactionSize));
}
+ String requeueRejected = containerEle.getAttribute(REQUEUE_REJECTED_ATTRIBUTE);
+ if (StringUtils.hasText(requeueRejected)) {
+ containerDef.getPropertyValues().add("defaultRequeueRejected", new TypedStringValue(requeueRejected));
+ }
+
String phase = containerEle.getAttribute(PHASE_ATTRIBUTE);
if (StringUtils.hasText(phase)) {
containerDef.getPropertyValues().add("phase", phase);
View
31 ...-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumer.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2002-2011 the original author or authors.
+ * Copyright 2002-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
@@ -25,6 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.AmqpException;
+import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
@@ -46,6 +47,7 @@
*
* @author Mark Pollack
* @author Dave Syer
+ * @author Gary Russell
*
*/
public class BlockingQueueConsumer {
@@ -80,20 +82,35 @@
private Set<Long> deliveryTags = new LinkedHashSet<Long>();
+ private final boolean defaultRequeuRejected;
+
/**
* Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker
- * until it is started.
+ * until it is started. RequeueRejected defaults to true.
*/
public BlockingQueueConsumer(ConnectionFactory connectionFactory,
MessagePropertiesConverter messagePropertiesConverter,
ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
boolean transactional, int prefetchCount, String... queues) {
+ this(connectionFactory, messagePropertiesConverter, activeObjectCounter,
+ acknowledgeMode, transactional, prefetchCount, true, queues);
+ }
+
+ /**
+ * Create a consumer. The consumer must not attempt to use the connection factory or communicate with the broker
+ * until it is started.
+ */
+ public BlockingQueueConsumer(ConnectionFactory connectionFactory,
+ MessagePropertiesConverter messagePropertiesConverter,
+ ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
+ boolean transactional, int prefetchCount, boolean defaultRequeueRejected, String... queues) {
this.connectionFactory = connectionFactory;
this.messagePropertiesConverter = messagePropertiesConverter;
this.activeObjectCounter = activeObjectCounter;
this.acknowledgeMode = acknowledgeMode;
this.transactional = transactional;
this.prefetchCount = prefetchCount;
+ this.defaultRequeuRejected = defaultRequeueRejected;
this.queues = queues;
}
@@ -315,9 +332,17 @@ public void rollbackOnExceptionIfNecessary(Throwable ex) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("Rejecting messages");
}
+ boolean shouldRequeue = this.defaultRequeuRejected;
+ Throwable t = ex;
+ while (shouldRequeue && t != null) {
+ if (t instanceof AmqpRejectAndDontRequeueException) {
+ shouldRequeue = false;
+ }
+ t = t.getCause();
+ }
for (Long deliveryTag : deliveryTags) {
// With newer RabbitMQ brokers could use basicNack here...
- channel.basicReject(deliveryTag, true);
+ channel.basicReject(deliveryTag, shouldRequeue);
}
if (transactional) {
// Need to commit the reject (=nack)
View
18 ...rc/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java
@@ -23,6 +23,7 @@
import org.aopalliance.aop.Advice;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpIllegalStateException;
+import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
@@ -94,6 +95,8 @@
private volatile MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
+ private volatile boolean defaultRequeueRejected = true;
+
public static interface ContainerDelegate {
void invokeListener(Channel channel, Message message) throws Exception;
}
@@ -219,6 +222,19 @@ public void setMessagePropertiesConverter(MessagePropertiesConverter messageProp
}
/**
+ * Determines the default behavior when a message is rejected, for example because the listener
+ * threw an exception. When true, messages will be requeued, when false, they will not. For
+ * versions of Rabbit that support dead-lettering, the message must not be requeued in order
+ * to be sent to the dead letter exchange. Setting to false causes all rejections to not
+ * be requeued. When true, the default can be overridden by the listener throwing an
+ * {@link AmqpRejectAndDontRequeueException}. Default true.
+ * @param defaultRequeueRejected
+ */
+ public void setDefaultRequeueRejected(boolean defaultRequeueRejected) {
+ this.defaultRequeueRejected = defaultRequeueRejected;
+ }
+
+ /**
* Avoid the possibility of not configuring the CachingConnectionFactory in sync with the number of concurrent
* consumers.
*/
@@ -378,7 +394,7 @@ protected BlockingQueueConsumer createBlockingQueueConsumer() {
// didn't get an ack for delivered messages
int actualPrefetchCount = prefetchCount > txSize ? prefetchCount : txSize;
consumer = new BlockingQueueConsumer(getConnectionFactory(), this.messagePropertiesConverter, cancellationLock,
- getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount, queues);
+ getAcknowledgeMode(), isChannelTransacted(), actualPrefetchCount, this.defaultRequeueRejected, queues);
return consumer;
}
View
9 ...ng-rabbit/src/main/resources/org/springframework/amqp/rabbit/config/spring-rabbit-1.0.xsd
@@ -480,6 +480,15 @@
]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="requeue-rejected" type="xsd:string">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[
+ Tells the container the default requeue behavior when rejecting messages. Default is 'true' meaning messages
+ will be requeued, unless the listener signals not to by throwing an AmqpRejectAndDontRequeueException. When
+ set to false, messages will never be requeued.
+ ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
<xsd:attribute name="phase" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[
View
11 ...it/src/test/java/org/springframework/amqp/rabbit/config/ListenerContainerParserTests.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2010-2011 the original author or authors.
+ * Copyright 2010-2012 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -41,6 +41,7 @@
/**
* @author Mark Fisher
+ * @author Gary Russell
*/
public class ListenerContainerParserTests {
@@ -84,6 +85,14 @@ public void testParseWithAdviceChain() throws Exception {
public void testParseWithDefaults() throws Exception {
SimpleMessageListenerContainer container = beanFactory.getBean("container4", SimpleMessageListenerContainer.class);
assertEquals(1, ReflectionTestUtils.getField(container, "concurrentConsumers"));
+ assertEquals(true, ReflectionTestUtils.getField(container, "defaultRequeueRejected"));
+ }
+
+ @Test
+ public void testParseWithDefaultQueueRejectedFalse() throws Exception {
+ SimpleMessageListenerContainer container = beanFactory.getBean("container5", SimpleMessageListenerContainer.class);
+ assertEquals(1, ReflectionTestUtils.getField(container, "concurrentConsumers"));
+ assertEquals(false, ReflectionTestUtils.getField(container, "defaultRequeueRejected"));
}
static class TestBean {
View
125 ...it/src/test/java/org/springframework/amqp/rabbit/listener/BlockingQueueConsumerTests.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2002-2012 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.amqp.rabbit.listener;
+
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.springframework.amqp.AmqpRejectAndDontRequeueException;
+import org.springframework.amqp.core.AcknowledgeMode;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
+import org.springframework.beans.DirectFieldAccessor;
+
+import com.rabbitmq.client.Channel;
+
+/**
+ * @author Gary Russell
+ * @since 1.0.1
+ *
+ */
+public class BlockingQueueConsumerTests {
+
+ @Test
+ public void testRequeue() throws Exception {
+ Exception ex = new RuntimeException();
+ testRequeueOrNot(ex, true);
+ }
+
+ @Test
+ public void testRequeueNullException() throws Exception {
+ Exception ex = null;
+ testRequeueOrNot(ex, true);
+ }
+
+ @Test
+ public void testDontRequeue() throws Exception {
+ Exception ex = new AmqpRejectAndDontRequeueException("fail");
+ testRequeueOrNot(ex, false);
+ }
+
+ @Test
+ public void testDontRequeueNested() throws Exception {
+ Exception ex = new RuntimeException(
+ new RuntimeException(new AmqpRejectAndDontRequeueException(
+ "fail")));
+ testRequeueOrNot(ex, false);
+ }
+
+ @Test
+ public void testRequeueDefaultNot() throws Exception {
+ Exception ex = new RuntimeException();
+ testRequeueOrNotDefaultNot(ex, false);
+ }
+
+ @Test
+ public void testRequeueNullExceptionDefaultNot() throws Exception {
+ Exception ex = null;
+ testRequeueOrNotDefaultNot(ex, false);
+ }
+
+ @Test
+ public void testDontRequeueDefaultNot() throws Exception {
+ Exception ex = new AmqpRejectAndDontRequeueException("fail");
+ testRequeueOrNotDefaultNot(ex, false);
+ }
+
+ @Test
+ public void testDontRequeueNestedDefaultNot() throws Exception {
+ Exception ex = new RuntimeException(
+ new RuntimeException(new AmqpRejectAndDontRequeueException(
+ "fail")));
+ testRequeueOrNotDefaultNot(ex, false);
+ }
+
+ private void testRequeueOrNot(Exception ex, boolean requeue)
+ throws Exception, IOException {
+ ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
+ Channel channel = mock(Channel.class);
+ BlockingQueueConsumer blockingQueueConsumer = new BlockingQueueConsumer(connectionFactory,
+ new DefaultMessagePropertiesConverter(), new ActiveObjectCounter<BlockingQueueConsumer>(),
+ AcknowledgeMode.AUTO, true, 1, "testQ");
+ testRequeueOrNotGuts(ex, requeue, channel, blockingQueueConsumer);
+ }
+
+ private void testRequeueOrNotDefaultNot(Exception ex, boolean requeue)
+ throws Exception, IOException {
+ ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
+ Channel channel = mock(Channel.class);
+ BlockingQueueConsumer blockingQueueConsumer = new BlockingQueueConsumer(connectionFactory,
+ new DefaultMessagePropertiesConverter(), new ActiveObjectCounter<BlockingQueueConsumer>(),
+ AcknowledgeMode.AUTO, true, 1, false, "testQ");
+ testRequeueOrNotGuts(ex, requeue, channel, blockingQueueConsumer);
+ }
+
+ private void testRequeueOrNotGuts(Exception ex, boolean requeue,
+ Channel channel, BlockingQueueConsumer blockingQueueConsumer)
+ throws Exception, IOException {
+ DirectFieldAccessor dfa = new DirectFieldAccessor(blockingQueueConsumer);
+ dfa.setPropertyValue("channel", channel);
+ Set<Long> deliveryTags = new HashSet<Long>();
+ deliveryTags.add(1L);
+ dfa.setPropertyValue("deliveryTags", deliveryTags);
+ blockingQueueConsumer.rollbackOnExceptionIfNecessary(ex);
+ Mockito.verify(channel).basicReject(1L, requeue);
+ }
+
+}
View
4 ...resources/org/springframework/amqp/rabbit/config/ListenerContainerParserTests-context.xml
@@ -28,6 +28,10 @@
<rabbit:listener id="testListener" queues="foo" ref="testBean" method="handle"/>
</rabbit:listener-container>
+ <rabbit:listener-container id="container5" connection-factory="connectionFactory" requeue-rejected="false">
+ <rabbit:listener id="testListener" queues="foo" ref="testBean" method="handle"/>
+ </rabbit:listener-container>
+
<util:list id="adviceChain">
<bean class="org.springframework.amqp.rabbit.config.ListenerContainerParserTests$TestAdvice"/>
<bean class="org.springframework.amqp.rabbit.config.ListenerContainerParserTests$TestAdvice"/>

0 comments on commit 8332a77

Please sign in to comment.