Skip to content

Commit

Permalink
ARTEMIS-4241 paging + FQQN is broken
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram authored and brusdev committed Apr 17, 2023
1 parent 05f5af6 commit 6734813
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.SizeAwareMetric;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.LongHashSet;
Expand Down Expand Up @@ -393,7 +394,7 @@ public void reloadStores() throws Exception {
public void deletePageStore(final SimpleString storeName) throws Exception {
syncLock.readLock().lock();
try {
PagingStore store = stores.remove(storeName);
PagingStore store = stores.remove(CompositeAddress.extractAddressName(storeName));
if (store != null) {
store.stop();
store.destroy();
Expand All @@ -407,7 +408,8 @@ public void deletePageStore(final SimpleString storeName) throws Exception {
* This method creates a new store if not exist.
*/
@Override
public PagingStore getPageStore(final SimpleString storeName) throws Exception {
public PagingStore getPageStore(final SimpleString rawStoreName) throws Exception {
final SimpleString storeName = CompositeAddress.extractAddressName(rawStoreName);
if (managementAddress != null && storeName.startsWith(managementAddress)) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.RetryRule;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.junit.After;
Expand Down Expand Up @@ -1393,6 +1394,79 @@ public void testEmptyAddress() throws Exception {
}
}

@Test
public void testFqqn() throws Exception {
final SimpleString queue = RandomUtil.randomSimpleString();
SimpleString fqqn = CompositeAddress.toFullyQualified(ADDRESS, queue);
boolean persistentMessages = true;

clearDataRecreateServerDirs();

Configuration config = createDefaultInVMConfig().setJournalSyncNonTransactional(false);

server = createServer(true, config, PagingTest.PAGE_SIZE, PagingTest.PAGE_MAX, -1, -1);

server.start();

final int numberOfMessages = 1000;

locator = createInVMNonHALocator().setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setBlockOnAcknowledge(true);

sf = createSessionFactory(locator);

ClientSession session = sf.createSession(false, false, false);

session.createQueue(new QueueConfiguration(fqqn).setRoutingType(RoutingType.ANYCAST));

ClientProducer producer = session.createProducer(fqqn);

ClientMessage message = null;

byte[] body = new byte[MESSAGE_SIZE];

for (int i = 0; i < numberOfMessages; i++) {
message = session.createMessage(persistentMessages);

ActiveMQBuffer bodyLocal = message.getBodyBuffer();

bodyLocal.writeBytes(body);

message.putIntProperty(new SimpleString("id"), i);

producer.send(message);
if (i % 1000 == 0) {
session.commit();
}
}

session.commit();

Wait.assertTrue(server.getPagingManager().getPageStore(ADDRESS)::isPaging, 5000, 100);
assertEquals(ADDRESS, server.getPagingManager().getPageStore(ADDRESS).getAddress());

session.start();

ClientConsumer consumer = session.createConsumer(fqqn);

for (int i = 0; i < numberOfMessages; i++) {
message = consumer.receive(5000);
assertNotNull(message);
message.acknowledge();

assertEquals(i, message.getIntProperty("id").intValue());
if (i % 1000 == 0) {
session.commit();
}
}

session.commit();

Wait.assertFalse(server.getPagingManager().getPageStore(ADDRESS)::isPaging, 5000, 100);

server.getPagingManager().deletePageStore(fqqn);
assertFalse(Arrays.asList(server.getPagingManager().getStoreNames()).contains(ADDRESS));
}

@Test
public void testPurge() throws Exception {
clearDataRecreateServerDirs();
Expand Down

0 comments on commit 6734813

Please sign in to comment.