Skip to content

Commit

Permalink
ARTEMIS-2007 - refactor to make use of existing refCountForConsumers …
Browse files Browse the repository at this point in the history
…for tracking consumer count and remove need for volatile redistributor
  • Loading branch information
gtully committed Sep 24, 2021
1 parent 28a1045 commit 72cfda6
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 80 deletions.
Expand Up @@ -21,7 +21,8 @@
public interface ResettableIterator<E> extends Iterator<E> {

/**
* Resets the iterator so you can re-iterate over all elements.
* Resets the iterator so that you can iterate over all elements from your current position.
* Your current position (when reached again) signals the end of iteration as if the collection is circular.
*/
void reset();
}
Expand Up @@ -178,14 +178,12 @@ default void setMirrorController(boolean mirrorController) {
}

/**
* This will set a reference counter for every consumer present on the queue.
* This will hold a reference counter for every consumer present on the queue.
* The ReferenceCounter will know what to do when the counter became zeroed.
* This is used to control what to do with temporary queues, especially
* on shared subscriptions where the queue needs to be deleted when all the
* consumers are closed.
*/
void setConsumersRefCount(ReferenceCounter referenceCounter);

ReferenceCounter getConsumersRefCount();

/* Called when a message is cancelled back into the queue */
Expand Down
Expand Up @@ -3805,12 +3805,6 @@ public Queue createQueue(final QueueConfiguration queueConfiguration, boolean ig

final Queue queue = queueFactory.createQueueWith(queueConfiguration, pagingManager);

if (queueConfiguration.isTransient()) {
queue.setConsumersRefCount(new TransientQueueManagerImpl(this, queue.getName()));
} else {
queue.setConsumersRefCount(new QueueManagerImpl(this, queue.getName()));
}

final QueueBinding localQueueBinding = new LocalQueueBinding(queue.getAddress(), queue, nodeManager.getNodeId());

long txID = 0;
Expand Down
Expand Up @@ -51,7 +51,6 @@
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.PostOfficeImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.NodeManager;
Expand Down Expand Up @@ -162,7 +161,6 @@ public void initQueues(Map<Long, QueueBindingInfo> queueBindingInfosMap,
.setRingSize(queueBindingInfo.getRingSize()),
pagingManager);

queue.setConsumersRefCount(new QueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName()));

if (queueBindingInfo.getQueueStatusEncodings() != null) {
for (QueueStatusEncoding encoding : queueBindingInfo.getQueueStatusEncodings()) {
Expand Down
Expand Up @@ -179,7 +179,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {

protected final PageSubscription pageSubscription;

private ReferenceCounter refCountForConsumers;
private final ReferenceCounter refCountForConsumers;

private final PageIterator pageIterator;

Expand Down Expand Up @@ -218,17 +218,17 @@ private void checkIDSupplier(NodeStore<MessageReference> nodeStore) {

protected final ScheduledDeliveryHandler scheduledDeliveryHandler;

private AtomicLong messagesAdded = new AtomicLong(0);
private final AtomicLong messagesAdded = new AtomicLong(0);

private AtomicLong messagesAcknowledged = new AtomicLong(0);
private final AtomicLong messagesAcknowledged = new AtomicLong(0);

private AtomicLong ackAttempts = new AtomicLong(0);
private final AtomicLong ackAttempts = new AtomicLong(0);

private AtomicLong messagesExpired = new AtomicLong(0);
private final AtomicLong messagesExpired = new AtomicLong(0);

private AtomicLong messagesKilled = new AtomicLong(0);
private final AtomicLong messagesKilled = new AtomicLong(0);

private AtomicLong messagesReplaced = new AtomicLong(0);
private final AtomicLong messagesReplaced = new AtomicLong(0);

private boolean paused;

Expand Down Expand Up @@ -261,8 +261,8 @@ private void checkIDSupplier(NodeStore<MessageReference> nodeStore) {

private final SimpleString address;

// redistributor goes in the consumers list, this signals its presence and allows for easy comparison/check
private volatile ConsumerHolder<Redistributor> redistributor;
// redistributor singleton goes in the consumers list
private ConsumerHolder<Redistributor> redistributor;

private ScheduledFuture<?> redistributorFuture;

Expand Down Expand Up @@ -634,6 +634,7 @@ public QueueImpl(final QueueConfiguration queueConfiguration,
this.id = queueConfiguration.getId();

this.address = queueConfiguration.getAddress();
this.refCountForConsumers = queueConfiguration.isTransient() ? new TransientQueueManagerImpl(server, queueConfiguration.getName()) : new QueueManagerImpl(server, queueConfiguration.getName());

this.addressInfo = postOffice == null ? null : postOffice.getAddressInfo(address);

Expand Down Expand Up @@ -861,13 +862,6 @@ public void routeWithAck(Message message, RoutingContext context) {
}

// Queue implementation ----------------------------------------------------------------------------------------
@Override
public synchronized void setConsumersRefCount(final ReferenceCounter referenceCounter) {
if (refCountForConsumers == null) {
this.refCountForConsumers = referenceCounter;
}
}

@Override
public ReferenceCounter getConsumersRefCount() {
return refCountForConsumers;
Expand Down Expand Up @@ -1442,13 +1436,8 @@ public void addConsumer(final Consumer consumer) throws Exception {
if (delayBeforeDispatch >= 0) {
dispatchStartTimeUpdater.compareAndSet(this,-1, delayBeforeDispatch + System.currentTimeMillis());
}

}

if (refCountForConsumers != null) {
refCountForConsumers.increment();
}

}
}
}
Expand Down Expand Up @@ -1485,7 +1474,7 @@ public void removeConsumer(final Consumer consumer) {

if (consumerRemoved) {
consumerRemovedTimestampUpdater.set(this, System.currentTimeMillis());
if (getConsumerCount() == 0) {
if (refCountForConsumers.decrement() == 0) {
stopDispatch();
}
}
Expand All @@ -1496,11 +1485,6 @@ public void removeConsumer(final Consumer consumer) {

groups.removeIf(consumer::equals);


if (refCountForConsumers != null) {
refCountForConsumers.decrement();
}

}
}
}
Expand Down Expand Up @@ -1557,7 +1541,7 @@ private void clearRedistributorFuture() {
@Override
public synchronized void cancelRedistributor() {
clearRedistributorFuture();

hasUnMatchedPending = false;
if (redistributor != null) {
try {
redistributor.consumer.stop();
Expand All @@ -1572,18 +1556,7 @@ public synchronized void cancelRedistributor() {

@Override
public int getConsumerCount() {
// we don't want to count the redistributor, it is an internal transient entry in the consumer list
if (redistributor != null) {
synchronized (this) {
final int size = consumers.size();
if (size > 0 && redistributor != null) {
return size - 1;
} else {
return size;
}
}
}
return consumers.size();
return refCountForConsumers.getCount();
}

@Override
Expand Down Expand Up @@ -3014,7 +2987,7 @@ private boolean deliver() {
synchronized (this) {

// Need to do these checks inside the synchronized
if (isPaused() || !canDispatch() && redistributor == null) {
if (isPaused() || !canDispatch()) {
return false;
}

Expand Down Expand Up @@ -3082,9 +3055,7 @@ private boolean deliver() {
numNoMatch = 0;
numAttempts = 0;

if (consumer != redistributor) {
ref = handleMessageGroup(ref, consumer, groupConsumer, groupID);
}
ref = handleMessageGroup(ref, consumer, groupConsumer, groupID);

deliveriesInTransit.countUp();

Expand Down Expand Up @@ -3118,7 +3089,7 @@ private boolean deliver() {
consumers.reset();
numNoMatch++;
// every attempt resulted in noMatch for number of consumers means we tried all consumers for a single message
if (numNoMatch == numAttempts && numAttempts == consumers.size()) {
if (numNoMatch == numAttempts && numAttempts == consumers.size() && redistributor == null) {
hasUnMatchedPending = true;
// one hit of unmatched message is enough, no need to reset counters
}
Expand Down Expand Up @@ -3753,7 +3724,7 @@ private boolean deliver(final MessageReference ref) {
if (!supportsDirectDeliver) {
return false;
}
if (isPaused() || !canDispatch() && redistributor == null) {
if (isPaused() || !canDispatch()) {
return false;
}

Expand All @@ -3777,12 +3748,7 @@ private boolean deliver(final MessageReference ref) {

HandleStatus status = handle(ref, consumer);
if (status == HandleStatus.HANDLED) {
final MessageReference reference;
if (consumer != redistributor) {
reference = handleMessageGroup(ref, consumer, groupConsumer, groupID);
} else {
reference = ref;
}
final MessageReference reference = handleMessageGroup(ref, consumer, groupConsumer, groupID);

incrementMesssagesAdded();

Expand All @@ -3793,7 +3759,7 @@ private boolean deliver(final MessageReference ref) {
return true;
}

if (redistributor != null || groupConsumer != null) {
if (groupConsumer != null) {
break;
}
}
Expand Down
Expand Up @@ -39,6 +39,9 @@ public void addTest() {
assertFalse(queueConsumers.hasNext());

queueConsumers.add(testPriority);
// not visible till reset
assertFalse(queueConsumers.hasNext());

queueConsumers.reset();
assertTrue(queueConsumers.hasNext());

Expand Down Expand Up @@ -109,6 +112,31 @@ public void roundRobinTest() {

}

@Test
public void roundRobinEqualPriorityResetTest() {
queueConsumers.add(new TestPriority("A", 0));
queueConsumers.add(new TestPriority("B", 0));
queueConsumers.add(new TestPriority("C", 0));
queueConsumers.reset();
assertTrue(queueConsumers.hasNext());

assertEquals("A", queueConsumers.next().getName());

//Reset iterator should mark start as current position
queueConsumers.reset();
assertTrue(queueConsumers.hasNext());
assertEquals("B", queueConsumers.next().getName());

assertTrue(queueConsumers.hasNext());
assertEquals("C", queueConsumers.next().getName());

//Expect another A as after reset, we started at B so after A, we then expect the next level
assertTrue(queueConsumers.hasNext());
assertEquals("A", queueConsumers.next().getName());

//We have iterated all.
assertFalse(queueConsumers.hasNext());
}



Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.junit.Assert;
import org.junit.Test;
Expand All @@ -50,6 +51,8 @@ public void deleteAllPagedReferencesTest() throws Exception {
PageSubscription pageSubscription = Mockito.mock(PageSubscription.class);
ExecutorService executorService = Executors.newSingleThreadExecutor();
StorageManager storageManager = Mockito.mock(StorageManager.class);
ActiveMQServer server = Mockito.mock(ActiveMQServer.class);
ExecutorFactory executorFactory = Mockito.mock(ExecutorFactory.class);

final int flushLimit = 100;
final int pagedReferences = 5 * flushLimit;
Expand All @@ -76,10 +79,13 @@ public void deleteAllPagedReferencesTest() throws Exception {
return null;
}).when(storageManager).afterCompleteOperations(Mockito.any(IOCallback.class));

// Mock server
Mockito.doReturn(executorFactory).when(server).getExecutorFactory();

QueueImpl queue = new QueueImpl(0, address, address, null, null, pageSubscription, null, false,
false, false, Mockito.mock(ScheduledExecutorService.class),
Mockito.mock(PostOffice.class), storageManager, null,
Mockito.mock(ArtemisExecutor.class), Mockito.mock(ActiveMQServer.class),
Mockito.mock(ArtemisExecutor.class), server,
Mockito.mock(QueueFactory.class));

Mockito.doReturn(queue).when(pageSubscription).getQueue();
Expand Down
Expand Up @@ -1129,11 +1129,6 @@ public long getRingSize() {
return 0;
}

@Override
public void setConsumersRefCount(ReferenceCounter referenceCounter) {

}

@Override
public ReferenceCounter getConsumersRefCount() {
return null;
Expand Down
Expand Up @@ -351,7 +351,7 @@ public void testForceDuplicationOnBindings() throws Exception {
// Forcing a situation where the server would unexpectedly create a duplicated queue. The server should still start normally
LocalQueueBinding newBinding = new LocalQueueBinding(QUEUE,
new QueueImpl(queueID, QUEUE, QUEUE, null, null, true, false,
false, null, null, null, null, null, null, null),
false, null, null, null, null, null, server, null),
server.getNodeID());
server.getStorageManager().addQueueBinding(txID, newBinding);
server.getStorageManager().commitBindings(txID);
Expand Down
Expand Up @@ -267,11 +267,6 @@ public int retryMessages(Filter filter) throws Exception {
return 0;
}

@Override
public void setConsumersRefCount(ReferenceCounter referenceCounter) {

}

@Override
public void setInternalQueue(boolean internalQueue) {
// no-op
Expand Down

0 comments on commit 72cfda6

Please sign in to comment.