Skip to content

Commit

Permalink
AMQP-455: Pub. Confirms with Send and Receive
Browse files Browse the repository at this point in the history
JIRA: https://jira.spring.io/browse/AMQP-455

Support publisher confirms on `sendAndReceive` operations.
  • Loading branch information
garyrussell authored and artembilan committed Apr 27, 2015
1 parent 78e7ac9 commit fbd3fee
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 34 deletions.
Expand Up @@ -748,7 +748,7 @@ public <R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, Re
@Override
public <R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException {
return this.doReceiveAndReply(queueName, callback, replyToAddressCallback);
return doReceiveAndReply(queueName, callback, replyToAddressCallback);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -838,55 +838,98 @@ else if (channelLocallyTransacted) {

@Override
public Message sendAndReceive(final Message message) throws AmqpException {
return this.doSendAndReceive(this.exchange, this.routingKey, message);
return sendAndReceive(message, null);
}

public Message sendAndReceive(final Message message, CorrelationData correlationData) throws AmqpException {
return doSendAndReceive(this.exchange, this.routingKey, message, correlationData);
}

@Override
public Message sendAndReceive(final String routingKey, final Message message) throws AmqpException {
return this.doSendAndReceive(this.exchange, routingKey, message);
return sendAndReceive(routingKey, message, null);
}

public Message sendAndReceive(final String routingKey, final Message message, CorrelationData correlationData) throws AmqpException {
return doSendAndReceive(this.exchange, routingKey, message, correlationData);
}

@Override
public Message sendAndReceive(final String exchange, final String routingKey, final Message message)
throws AmqpException {
return this.doSendAndReceive(exchange, routingKey, message);
return sendAndReceive(exchange, routingKey, message, null);
}

public Message sendAndReceive(final String exchange, final String routingKey, final Message message, CorrelationData correlationData)
throws AmqpException {
return doSendAndReceive(exchange, routingKey, message, correlationData);
}

@Override
public Object convertSendAndReceive(final Object message) throws AmqpException {
return this.convertSendAndReceive(this.exchange, this.routingKey, message, null);
return convertSendAndReceive(message, (CorrelationData) null);
}

public Object convertSendAndReceive(final Object message, CorrelationData correlationData) throws AmqpException {
return convertSendAndReceive(this.exchange, this.routingKey, message, null, correlationData);
}

@Override
public Object convertSendAndReceive(final String routingKey, final Object message) throws AmqpException {
return this.convertSendAndReceive(this.exchange, routingKey, message, null);
return convertSendAndReceive(routingKey, message, (CorrelationData) null);
}

public Object convertSendAndReceive(final String routingKey, final Object message, CorrelationData correlationData)
throws AmqpException {
return convertSendAndReceive(this.exchange, routingKey, message, null, correlationData);
}

@Override
public Object convertSendAndReceive(final String exchange, final String routingKey, final Object message)
throws AmqpException {
return this.convertSendAndReceive(exchange, routingKey, message, null);
return convertSendAndReceive(exchange, routingKey, message, (CorrelationData) null);
}

@Override
public Object convertSendAndReceive(final Object message, final MessagePostProcessor messagePostProcessor) throws AmqpException {
return this.convertSendAndReceive(this.exchange, this.routingKey, message, messagePostProcessor);
public Object convertSendAndReceive(final String exchange, final String routingKey, final Object message,
CorrelationData correlationData) throws AmqpException {
return convertSendAndReceive(exchange, routingKey, message, null, correlationData);
}

@Override
public Object convertSendAndReceive(final String routingKey, final Object message, final MessagePostProcessor messagePostProcessor)
public Object convertSendAndReceive(final Object message, final MessagePostProcessor messagePostProcessor)
throws AmqpException {
return this.convertSendAndReceive(this.exchange, routingKey, message, messagePostProcessor);
return convertSendAndReceive(message, messagePostProcessor, null);
}

public Object convertSendAndReceive(final Object message, final MessagePostProcessor messagePostProcessor,
CorrelationData correlationData) throws AmqpException {
return convertSendAndReceive(this.exchange, this.routingKey, message, messagePostProcessor, correlationData);
}

@Override
public Object convertSendAndReceive(final String routingKey, final Object message,
final MessagePostProcessor messagePostProcessor) throws AmqpException {
return convertSendAndReceive(routingKey, message, messagePostProcessor, null);
}

public Object convertSendAndReceive(final String routingKey, final Object message, final MessagePostProcessor messagePostProcessor,
CorrelationData correlationData) throws AmqpException {
return convertSendAndReceive(this.exchange, routingKey, message, messagePostProcessor, correlationData);
}

@Override
public Object convertSendAndReceive(final String exchange, final String routingKey, final Object message,
final MessagePostProcessor messagePostProcessor) throws AmqpException {
return convertSendAndReceive(exchange, routingKey, message, messagePostProcessor, null);
}

public Object convertSendAndReceive(final String exchange, final String routingKey, final Object message,
final MessagePostProcessor messagePostProcessor, final CorrelationData correlationData) throws AmqpException {
Message requestMessage = convertMessageIfNecessary(message);
if (messagePostProcessor != null) {
requestMessage = messagePostProcessor.postProcessMessage(requestMessage);
}
Message replyMessage = this.doSendAndReceive(exchange, routingKey, requestMessage);
Message replyMessage = doSendAndReceive(exchange, routingKey, requestMessage, correlationData);
if (replyMessage == null) {
return null;
}
Expand All @@ -906,9 +949,11 @@ protected Message convertMessageIfNecessary(final Object object) {
* @param exchange the exchange name
* @param routingKey the routing key
* @param message the message to send
* @param correlationData the correlation data for confirms
* @return the message that is received in reply
*/
protected Message doSendAndReceive(final String exchange, final String routingKey, final Message message) {
protected Message doSendAndReceive(final String exchange, final String routingKey, final Message message,
CorrelationData correlationData) {
if (!this.evaluatedFastReplyTo) {
synchronized(this) {
if (!this.evaluatedFastReplyTo) {
Expand All @@ -917,14 +962,15 @@ protected Message doSendAndReceive(final String exchange, final String routingKe
}
}
if (this.replyAddress == null || this.usingFastReplyTo) {
return doSendAndReceiveWithTemporary(exchange, routingKey, message);
return doSendAndReceiveWithTemporary(exchange, routingKey, message, correlationData);
}
else {
return doSendAndReceiveWithFixed(exchange, routingKey, message);
return doSendAndReceiveWithFixed(exchange, routingKey, message, correlationData);
}
}

protected Message doSendAndReceiveWithTemporary(final String exchange, final String routingKey, final Message message) {
protected Message doSendAndReceiveWithTemporary(final String exchange, final String routingKey,
final Message message, final CorrelationData correlationData) {
return this.execute(new ChannelCallback<Message>() {

@Override
Expand Down Expand Up @@ -964,7 +1010,7 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
}
};
channel.basicConsume(replyTo, true, consumerTag, true, true, null, consumer);
doSend(channel, exchange, routingKey, message, null);
doSend(channel, exchange, routingKey, message, correlationData);
Message reply = (replyTimeout < 0) ? replyHandoff.take() : replyHandoff.poll(replyTimeout,
TimeUnit.MILLISECONDS);
channel.basicCancel(consumerTag);
Expand All @@ -973,7 +1019,8 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp
}, obtainTargetConnectionFactoryIfNecessary(this.sendConnectionFactorySelectorExpression, message));
}

protected Message doSendAndReceiveWithFixed(final String exchange, final String routingKey, final Message message) {
protected Message doSendAndReceiveWithFixed(final String exchange, final String routingKey, final Message message,
final CorrelationData correlationData) {
return this.execute(new ChannelCallback<Message>() {

@Override
Expand Down Expand Up @@ -1015,7 +1062,7 @@ public Message doInRabbit(Channel channel) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("Sending message with tag " + messageTag);
}
doSend(channel, exchange, routingKey, message, null);
doSend(channel, exchange, routingKey, message, correlationData);
LinkedBlockingQueue<Message> replyHandoff = pendingReply.getQueue();
Message reply = (replyTimeout < 0) ? replyHandoff.take() : replyHandoff.poll(replyTimeout,
TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -1051,7 +1098,7 @@ public T doWithRetry(RetryContext context) throws Exception {
}
}
else {
return this.doExecute(action, connectionFactory);
return doExecute(action, connectionFactory);
}
}

Expand Down Expand Up @@ -1106,11 +1153,7 @@ protected void doSend(Channel channel, String exchange, String routingKey, Messa
// try to send to configured routing key
routingKey = this.routingKey;
}
if (this.confirmCallback != null && channel instanceof PublisherCallbackChannel) {
PublisherCallbackChannel publisherCallbackChannel = (PublisherCallbackChannel) channel;
publisherCallbackChannel.addPendingConfirm(this, channel.getNextPublishSeqNo(),
new PendingConfirm(correlationData, System.currentTimeMillis()));
}
setupConfirm(channel, correlationData);
boolean mandatory = this.returnCallback != null &&
this.mandatoryExpression.getValue(this.evaluationContext, message, Boolean.class);
MessageProperties messageProperties = message.getMessageProperties();
Expand All @@ -1132,6 +1175,14 @@ protected void doSend(Channel channel, String exchange, String routingKey, Messa
}
}

private void setupConfirm(Channel channel, CorrelationData correlationData) {
if (this.confirmCallback != null && channel instanceof PublisherCallbackChannel) {
PublisherCallbackChannel publisherCallbackChannel = (PublisherCallbackChannel) channel;
publisherCallbackChannel.addPendingConfirm(this, channel.getNextPublishSeqNo(),
new PendingConfirm(correlationData, System.currentTimeMillis()));
}
}

/**
* Check whether the given Channel is locally transacted, that is, whether its transaction is managed by this
* template's Channel handling and not by an external transaction coordinator.
Expand Down
Expand Up @@ -61,6 +61,8 @@
import org.springframework.amqp.rabbit.connection.ChannelProxy;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ReturnCallback;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl;
import org.springframework.amqp.rabbit.test.BrokerRunning;
Expand Down Expand Up @@ -190,6 +192,36 @@ public Void doInRabbit(Channel channel) throws Exception {
verify(logger, never()).error(any());
}

@Test
public void testPublisherConfirmWithSendAndReceive() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<CorrelationData> confirmCD = new AtomicReference<CorrelationData>();
templateWithConfirmsEnabled.setConfirmCallback(new ConfirmCallback() {

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
confirmCD.set(correlationData);
latch.countDown();
}
});
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactoryWithConfirmsEnabled);
container.setQueueNames(ROUTE);
container.setMessageListener(new MessageListenerAdapter(new Object() {

@SuppressWarnings("unused")
public String handleMessage(String in) {
return in.toUpperCase();
}
}));
container.start();
CorrelationData correlationData = new CorrelationData("abc");
String result = (String) this.templateWithConfirmsEnabled.convertSendAndReceive(ROUTE, (Object) "message", correlationData);
container.stop();
assertEquals("MESSAGE", result);
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertEquals(correlationData, confirmCD.get());
}

@Test
public void testPublisherConfirmReceivedConcurrentThreads() throws Exception {
final CountDownLatch latch = new CountDownLatch(2);
Expand Down
Expand Up @@ -189,7 +189,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
RabbitTemplate template = new RabbitTemplate(new SingleConnectionFactory(mockConnectionFactory));
template.setReplyTimeout(1);
Message input = new Message("Hello, world!".getBytes(), new MessageProperties());
template.doSendAndReceiveWithTemporary("foo", "bar", input);
template.doSendAndReceiveWithTemporary("foo", "bar", input, null);
Envelope envelope = new Envelope(1, false, "foo", "bar");
// used to hang here because of the SynchronousQueue and doSendAndReceive() already exited
consumer.get().handleDelivery("foo", envelope, new AMQP.BasicProperties(), new byte[0]);
Expand Down
17 changes: 10 additions & 7 deletions src/reference/asciidoc/amqp.adoc
Expand Up @@ -537,19 +537,12 @@ For Publisher Confirms (aka Publisher Acknowledgements), the template requires a
Confirms are sent to to the client by it registering a `RabbitTemplate.ConfirmCallback` by calling `setConfirmCallback(ConfirmCallback callback)`.
The callback must implement this method:

NOTE: When a rabbit template send operation completes, the channel is closed; this would preclude the reception of confirms or returns in the case when the connection factory cache is full (when there is space in the cache, the channel is not physically closed and the returns/confirms will proceed as normal).
When the cache is full, the framework defers the close for up to 5 seconds, in order to allow time for the confirms/returns to be received.
When using confirms, the channel will be closed when the last confirm is received.
When using only returns, the channel will remain open for the full 5 seconds.
It is generally recommended to set the connection factory's `channelCacheSize` to a large enough value so that the channel on which a message is published is returned to the cache instead of being closed.

[source,java]
----
void confirm(CorrelationData correlationData, boolean ack, String cause);
----

The `CorrelationData` is an object supplied by the client when sending the original message.
This is described further in the next section.
The `ack` is true for an `ack` and false for a `nack`.
For `nack` s, the cause may contain a reason for the nack, if it is available when the `nack` is generated.
An example is when sending a message to a non-existent exchange.
Expand All @@ -558,6 +551,12 @@ In that case the broker closes the channel; the reason for the closure is includ

Only one `ConfirmCallback` is supported by a `RabbitTemplate`.

NOTE: When a rabbit template send operation completes, the channel is closed; this would preclude the reception of confirms or returns in the case when the connection factory cache is full (when there is space in the cache, the channel is not physically closed and the returns/confirms will proceed as normal).
When the cache is full, the framework defers the close for up to 5 seconds, in order to allow time for the confirms/returns to be received.
When using confirms, the channel will be closed when the last confirm is received.
When using only returns, the channel will remain open for the full 5 seconds.
It is generally recommended to set the connection factory's `channelCacheSize` to a large enough value so that the channel on which a message is published is returned to the cache instead of being closed.

[[template-messaging]]
===== Messaging integration

Expand Down Expand Up @@ -1480,6 +1479,10 @@ Similar request/reply methods are also available where the `MessageConverter` is
Those methods are named `convertSendAndReceive`.
See the Javadoc of `AmqpTemplate` for more detail.

Starting with _version 1.5_, each of the `sendAndReceive` method variants has an overloaded version that takes `CorrelationData`.
Together with a properly configured connection factory, this enables the receipt of publisher confirms for the send side of the operation.
See <<template-confirms>> for more information.

By default, a new temporary queue is used for each reply (but see <<direct-reply-to>>).
However, a single reply queue can be configured on the template, which can be more efficient, and also allows you to set arguments on that queue.
In this case, however, you must also provide a <reply-listener/> sub element.
Expand Down

0 comments on commit fbd3fee

Please sign in to comment.