Skip to content

Commit

Permalink
ARTEMIS-4421 Page counters should work before page rebuild is done
Browse files Browse the repository at this point in the history
  • Loading branch information
clebertsuconic committed Sep 10, 2023
1 parent 4b8c719 commit 263a44e
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1234,6 +1234,9 @@ final class MutableLong {

if (sub != null) {
sub.getCounter().loadValue(record.id, encoding.getValue(), encoding.getPersistentSize());
if (encoding.getValue() > 0) {
sub.notEmpty();
}
} else {
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingPage(encoding.getQueueID());
messageJournal.tryAppendDeleteRecord(record.id, this::recordNotFoundCallback, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
*/
package org.apache.activemq.artemis.tests.integration.paging;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.transaction.xa.Xid;

import java.lang.invoke.MethodHandles;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
Expand All @@ -27,6 +30,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
Expand All @@ -37,13 +41,13 @@
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.After;
import org.junit.Assert;
Expand Down Expand Up @@ -481,6 +485,45 @@ public void testCommitCounter() throws Exception {

}


@Test
public void testSendNoRebuild() throws Exception {
Queue queue = server.createQueue(new QueueConfiguration(new SimpleString("A1")).setRoutingType(RoutingType.ANYCAST));

queue.getPagingStore().startPaging();

PageSubscriptionCounter counter = locateCounter(queue);

ConnectionFactory cf = CFUtil.createConnectionFactory("core", "tcp://localhost:61616");
try (Connection connection = cf.createConnection()) {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue("A1"));
for (int i = 0; i < 3000; i++) {
producer.send(session.createTextMessage("i" + i));
}
session.commit();
}

server.stop();

server = newActiveMQServer();

server.setRebuildCounters(false);

server.start();

queue = server.locateQueue(new SimpleString("A1"));

assertNotNull(queue);

counter = locateCounter(queue);

logger.debug("Counter:: {}", queue.getMessageCount());

Wait.assertEquals(3000, counter::getValue);
Wait.assertEquals(3000L, queue::getMessageCount, 1000, 100);
}

@Override
@Before
public void setUp() throws Exception {
Expand All @@ -495,9 +538,9 @@ private ActiveMQServer newActiveMQServer() throws Exception {

OperationContextImpl.clearContext();

ActiveMQServer server = super.createServer(true, false);
ActiveMQServer server = super.createServer(true, true);

AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024);
AddressSettings defaultSetting = new AddressSettings().setPageSizeBytes(10 * 1024).setMaxSizeBytes(20 * 1024).setMaxReadPageMessages(10);

server.getAddressSettingsRepository().addMatch("#", defaultSetting);

Expand Down

0 comments on commit 263a44e

Please sign in to comment.