Skip to content

Commit

Permalink
ARTEMIS-4410 - process deliveries before removing consumer on session…
Browse files Browse the repository at this point in the history
… close, ensure strict order for a single consumer
  • Loading branch information
gtully authored and gemmellr committed Sep 1, 2023
1 parent 60ac0f3 commit b11945e
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -573,8 +573,6 @@ public synchronized void close(final boolean failed) throws Exception {
del.finish();
}

removeItself();

List<MessageReference> refs = cancelRefs(failed, false, null);

Transaction tx = new TransactionImpl(storageManager);
Expand All @@ -587,6 +585,9 @@ public synchronized void close(final boolean failed) throws Exception {

tx.rollback();

// started is false, leaving remove till after cancel ensures order for a single exclusive consumer
removeItself();

addLingerRefs();

if (!browseOnly) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@

import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
Expand All @@ -50,21 +53,29 @@
public class JMSOrderTest extends JMSTestBase {

String protocol;

boolean exclusive;
ConnectionFactory protocolCF;

public JMSOrderTest(String protocol) {
public JMSOrderTest(String protocol, boolean exclusive) {
this.protocol = protocol;
this.exclusive = exclusive;
}

@Before
public void setupCF() {
protocolCF = createConnectionFactory(protocol, "tcp://localhost:61616");
}

@Parameterized.Parameters(name = "protocol={0}")
@Parameterized.Parameters(name = "protocol={0}&exclusive={1}")
public static Collection getParameters() {
return Arrays.asList(new Object[][]{{"AMQP"}, {"OPENWIRE"}, {"CORE"}});
return Arrays.asList(new Object[][]{{"AMQP", true}, {"AMQP", false}, {"OPENWIRE", true}, {"OPENWIRE", false}, {"CORE", true}, {"CORE", false}});
}

@Override
protected void extraServerConfig(ActiveMQServer server) {
if (exclusive) {
server.getConfiguration().getAddressSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")).setDefaultExclusiveQueue(true));
}
}

protected void sendToAmqQueue(int count) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import java.util.Map;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -46,7 +49,9 @@ public void setUp() throws Exception {
protected void configureAddressSettings(Map<String, AddressSettings> addressSettingsMap) {
super.configureAddressSettings(addressSettingsMap);
// force send to dlq early
addressSettingsMap.get("#").setMaxDeliveryAttempts(2);
addressSettingsMap.put("exampleQueue", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(2));
// force send to dlq late
addressSettingsMap.put("exampleQueueTwo", new AddressSettings().setAutoCreateQueues(false).setAutoCreateAddresses(false).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")).setAutoCreateAddresses(true).setMaxDeliveryAttempts(4000));
}

@Test(timeout = 60_000)
Expand Down Expand Up @@ -93,4 +98,72 @@ public void testConsumerSingleMessageLoop() throws Exception {
}
}
}

@Test(timeout = 60_000)
public void testExclusiveConsumerOrderOnReconnectionLargePrefetch() throws Exception {
Connection exConn = null;

SimpleString durableQueue = new SimpleString("exampleQueueTwo");
this.server.createQueue(new QueueConfiguration(durableQueue).setRoutingType(RoutingType.ANYCAST).setExclusive(true));

try {
ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
exFact.setWatchTopicAdvisories(false);

ActiveMQPrefetchPolicy prefetchPastMaxDeliveriesInLoop = new ActiveMQPrefetchPolicy();
prefetchPastMaxDeliveriesInLoop.setAll(2000);
exFact.setPrefetchPolicy(prefetchPastMaxDeliveriesInLoop);

RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(4000);
exFact.setRedeliveryPolicy(redeliveryPolicy);

Queue queue = new ActiveMQQueue("exampleQueueTwo");

exConn = exFact.createConnection();

exConn.start();

Session session = exConn.createSession(true, Session.AUTO_ACKNOWLEDGE);

MessageProducer producer = session.createProducer(queue);

TextMessage message = session.createTextMessage("This is a text message");

int numMessages = 2000;
for (int i = 0; i < numMessages; i++) {
message.setIntProperty("SEQ", i);
producer.send(message);
}
session.commit();
exConn.close();

final int batch = 100;
for (int i = 0; i < numMessages; i += batch) {
// connection per batch
exConn = exFact.createConnection();
exConn.start();

session = exConn.createSession(true, Session.SESSION_TRANSACTED);

MessageConsumer messageConsumer = session.createConsumer(queue);
TextMessage messageReceived = null;
for (int j = 0; j < batch; j++) { // a small batch
messageReceived = (TextMessage) messageConsumer.receive(5000);
Assert.assertNotNull("null @ i=" + i, messageReceived);
Assert.assertEquals(i + j, messageReceived.getIntProperty("SEQ"));

assertEquals("This is a text message", messageReceived.getText());
}
session.commit();

((FailoverTransport)((org.apache.activemq.ActiveMQConnection)exConn).getTransport().narrow(FailoverTransport.class)).stop();
exConn.close();
}
} finally {
if (exConn != null) {
exConn.close();
}
}
}
}

0 comments on commit b11945e

Please sign in to comment.