From 996e5c30c647442eb944a30e91e4148aa2a88faf Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 17 Nov 2017 16:34:02 -0500 Subject: [PATCH] ARTEMIS-1517 Clarify SendACK versus confirmationWindowSize https://issues.apache.org/jira/browse/ARTEMIS-1517 --- .../api/core/client/ClientProducer.java | 4 +-- .../core/client/ActiveMQClientLogger.java | 6 ++++ .../core/client/impl/ClientProducerImpl.java | 28 +++++++++++++------ 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientProducer.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientProducer.java index c157fa30281..4faeba7cb9d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientProducer.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientProducer.java @@ -94,9 +94,9 @@ public interface ClientProducer extends AutoCloseable { /** * Sends a message to the specified address instead of the ClientProducer's address.
*
- * This message will be sent asynchronously. + * This message will be sent asynchronously as long as {@link ServerLocator#setConfirmationWindowSize(int)} was set. *

- * The handler will only get called if {@link ServerLocator#setConfirmationWindowSize(int) -1}. + * Notice that if no confirmationWindowsize is set * * @param address the address where the message will be sent * @param message the message to send diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java index 53943926ceb..a06e221bc39 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java @@ -397,6 +397,12 @@ public interface ActiveMQClientLogger extends BasicLogger { format = Message.Format.MESSAGE_FORMAT) void unableToCheckKQueueAvailability(@Cause Throwable e); + @LogMessage(level = Logger.Level.WARN) + @Message(id = 212074, value = "SendAcknowledgementHandler will not be asynchronous without setting up confirmation window size", + format = Message.Format.MESSAGE_FORMAT) + void confirmationNotSet(); + + @LogMessage(level = Logger.Level.ERROR) @Message(id = 214000, value = "Failed to call onMessage", format = Message.Format.MESSAGE_FORMAT) void onMessageError(@Cause Throwable e); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java index 5504093a06d..0ad1999316c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientProducerImpl.java @@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; +import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle; import org.apache.activemq.artemis.core.message.LargeBodyEncoder; import org.apache.activemq.artemis.spi.core.remoting.SessionContext; @@ -43,6 +44,8 @@ public class ClientProducerImpl implements ClientProducerInternal { private static final Logger logger = Logger.getLogger(ClientProducerImpl.class); + private static boolean confirmationNotSetLogged = false; + private final SimpleString address; private final ClientSessionInternal session; @@ -116,14 +119,14 @@ public SimpleString getAddress() { public void send(final Message msg) throws ActiveMQException { checkClosed(); - doSend(null, msg, null, false); + doSend(null, msg, null); } @Override public void send(final SimpleString address1, final Message msg) throws ActiveMQException { checkClosed(); - doSend(address1, msg, null, false); + doSend(address1, msg, null); } @Override @@ -138,10 +141,20 @@ public void send(SimpleString address1, checkClosed(); boolean confirmationWindowEnabled = session.isConfirmationWindowEnabled(); if (confirmationWindowEnabled) { - doSend(address1, message, handler, true); + doSend(address1, message, handler); } else { - doSend(address1, message, null, true); + doSend(address1, message, null); if (handler != null) { + if (logger.isDebugEnabled()) { + logger.debug("Handler was used on producing messages towards address " + address1.toString() + " however there is no confirmationWindowEnabled"); + } + + if (!confirmationNotSetLogged) { + // will log thisonly once + ActiveMQClientLogger.LOGGER.confirmationNotSet(); + } + + // if there is no confirmation enabled, we will at least call the handler after the sent is done session.scheduleConfirmation(handler, message); } } @@ -209,8 +222,7 @@ private void doCleanup() { private void doSend(SimpleString sendingAddress, final Message msgToSend, - final SendAcknowledgementHandler handler, - final boolean forceAsync) throws ActiveMQException { + final SendAcknowledgementHandler handler) throws ActiveMQException { if (sendingAddress == null) { sendingAddress = this.address; } @@ -253,8 +265,8 @@ private void doSend(SimpleString sendingAddress, } final boolean sendBlockingConfig = msg.isDurable() ? blockOnDurableSend : blockOnNonDurableSend; - final boolean forceAsyncOverride = handler != null; - final boolean sendBlocking = sendBlockingConfig && !forceAsyncOverride; + // if Handler != null, we will send non blocking + final boolean sendBlocking = sendBlockingConfig && handler == null; session.workDone();