Skip to content

Commit

Permalink
ARTEMIS-4520 JMSContext.acknowledge() doesn't work if last message re…
Browse files Browse the repository at this point in the history
…ceived is null
  • Loading branch information
jbertram authored and clebertsuconic committed Dec 1, 2023
1 parent 3bdef0e commit c858323
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void setMessageListener(MessageListener listener) throws JMSRuntimeExcept
@Override
public Message receive() {
try {
return context.setLastMessage(this, consumer.receive());
return context.setLastMessage(consumer.receive());
} catch (JMSException e) {
throw JmsExceptionUtils.convertToRuntimeException(e);
}
Expand All @@ -72,7 +72,7 @@ public Message receive() {
@Override
public Message receive(long timeout) {
try {
return context.setLastMessage(this, consumer.receive(timeout));
return context.setLastMessage(consumer.receive(timeout));
} catch (JMSException e) {
throw JmsExceptionUtils.convertToRuntimeException(e);
}
Expand All @@ -81,7 +81,7 @@ public Message receive(long timeout) {
@Override
public Message receiveNoWait() {
try {
return context.setLastMessage(this, consumer.receiveNoWait());
return context.setLastMessage(consumer.receiveNoWait());
} catch (JMSException e) {
throw JmsExceptionUtils.convertToRuntimeException(e);
}
Expand All @@ -100,7 +100,7 @@ public void close() {
public <T> T receiveBody(Class<T> c) {
try {
Message message = consumer.receive();
context.setLastMessage(ActiveMQJMSConsumer.this, message);
context.setLastMessage(message);
return message == null ? null : message.getBody(c);
} catch (JMSException e) {
throw JmsExceptionUtils.convertToRuntimeException(e);
Expand All @@ -111,7 +111,7 @@ public <T> T receiveBody(Class<T> c) {
public <T> T receiveBody(Class<T> c, long timeout) {
try {
Message message = consumer.receive(timeout);
context.setLastMessage(ActiveMQJMSConsumer.this, message);
context.setLastMessage(message);
return message == null ? null : message.getBody(c);
} catch (JMSException e) {
throw JmsExceptionUtils.convertToRuntimeException(e);
Expand All @@ -122,7 +122,7 @@ public <T> T receiveBody(Class<T> c, long timeout) {
public <T> T receiveBodyNoWait(Class<T> c) {
try {
Message message = consumer.receiveNoWait();
context.setLastMessage(ActiveMQJMSConsumer.this, message);
context.setLastMessage(message);
return message == null ? null : message.getBody(c);
} catch (JMSException e) {
throw JmsExceptionUtils.convertToRuntimeException(e);
Expand All @@ -139,7 +139,7 @@ final class MessageListenerWrapper implements MessageListener {

@Override
public void onMessage(Message message) {
context.setLastMessage(ActiveMQJMSConsumer.this, message);
context.setLastMessage(message);

context.getThreadAwareContext().setCurrentThread(false);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -582,8 +582,8 @@ private synchronized void checkAutoStart() throws JMSException {
/**
* this is to ensure Context.acknowledge would work on ClientACK
*/
Message setLastMessage(final JMSConsumer consumer, final Message lastMessageReceived) {
if (sessionMode == CLIENT_ACKNOWLEDGE) {
Message setLastMessage(final Message lastMessageReceived) {
if (sessionMode == CLIENT_ACKNOWLEDGE && lastMessageReceived != null) {
lastMessagesWaitingAck = lastMessageReceived;
}
return lastMessageReceived;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,50 @@ public void recoverAckTest() throws Exception {
}
}

@Test
public void receiveNullAckTest() throws Exception {
// Create JMSContext with CLIENT_ACKNOWLEDGE

try (JMSContext context = cf.createContext(JMSContext.CLIENT_ACKNOWLEDGE)) {
int numMessages = 10;

TextMessage textMessage = null;

// Create JMSConsumer from JMSContext
JMSConsumer consumer = context.createConsumer(queue1);

// Create JMSProducer from JMSContext
JMSProducer producer = context.createProducer();

// send messages
for (int i = 0; i < numMessages; i++) {
String message = "text message " + i;
textMessage = context.createTextMessage(message);
textMessage.setStringProperty("COM_SUN_JMS_TESTNAME", "recoverAckTest" + i);
producer.send(queue1, textMessage);
}

// receive messages but do not acknowledge
for (int i = 0; i < numMessages; i++) {
textMessage = (TextMessage) consumer.receive(5000);
assertNotNull(textMessage);
}

assertNull(consumer.receiveNoWait());

// Acknowledge all messages
context.acknowledge();
}

// doing this check with another context / consumer to make sure it was acked.
try (JMSContext context = cf.createContext(JMSContext.CLIENT_ACKNOWLEDGE)) {
// Create JMSConsumer from JMSContext
JMSConsumer consumer = context.createConsumer(queue1);

assertNull(consumer.receiveNoWait());
}
}

@Test
public void bytesMessage() throws Exception {
context = cf.createContext();
Expand Down

0 comments on commit c858323

Please sign in to comment.