diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 292cfc1e520..8afe3795375 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -21,6 +21,7 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -29,6 +30,7 @@ import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; @@ -187,7 +189,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // used to control if we should recalculate certain positions inside deliverAsync private volatile boolean consumersChanged = true; - private final List consumerList = new CopyOnWriteArrayList<>(); + private final List consumers = new CopyOnWriteArrayList<>(); private final ScheduledDeliveryHandler scheduledDeliveryHandler; @@ -226,7 +228,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final SimpleString address; - private ConsumerHolder redistributor; + private Redistributor redistributor; private ScheduledFuture redistributorFuture; @@ -236,8 +238,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private volatile long consumerRemovedTimestamp = -1; - private final Set consumerSet = new HashSet<>(); - private final Map groups = new HashMap<>(); private volatile SimpleString expiryAddress; @@ -292,6 +292,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private volatile boolean nonDestructive; + + private volatile LinkedListIterator messageIterator; + + private synchronized void resetIterator() { + if (messageIterator != null) { + messageIterator.close(); + } + messageIterator = null; + } + /** * This is to avoid multi-thread races on calculating direct delivery, * to guarantee ordering will be always be correct @@ -327,8 +337,8 @@ public String debug() { out.println("queueMemorySize=" + queueMemorySize); - for (ConsumerHolder holder : consumerList) { - out.println("consumer: " + holder.consumer.debug()); + for (Consumer consumer : consumers) { + out.println("consumer: " + consumer.debug()); } for (MessageReference reference : intermediateMessageReferences) { @@ -1034,9 +1044,19 @@ public void addConsumer(final Consumer consumer) throws Exception { cancelRedistributor(); - consumerList.add(new ConsumerHolder(consumer)); + if (!consumers.contains(consumer)) { + boolean sameFilter = false; + for (Consumer consumer1 : consumers) { + if (Objects.equals(consumer1.getFilter(), consumer.getFilter())) { + sameFilter = true; + break; + } + } + if (!sameFilter) { + resetAllIterators(); + } - if (consumerSet.add(consumer)) { + consumers.add(consumer); int currentConsumerCount = consumersCount.incrementAndGet(); if (delayBeforeDispatch >= 0) { dispatchStartTimeUpdater.compareAndSet(this,-1, delayBeforeDispatch + System.currentTimeMillis()); @@ -1067,23 +1087,13 @@ public void removeConsumer(final Consumer consumer) { synchronized (this) { consumersChanged = true; - for (ConsumerHolder holder : consumerList) { - if (holder.consumer == consumer) { - if (holder.iter != null) { - holder.iter.close(); - } - consumerList.remove(holder); - break; - } - } - this.supportsDirectDeliver = checkConsumerDirectDeliver(); - if (pos > 0 && pos >= consumerList.size()) { - pos = consumerList.size() - 1; + if (pos > 0 && pos >= consumers.size()) { + pos = consumers.size() - 1; } - if (consumerSet.remove(consumer)) { + if (consumers.remove(consumer)) { int currentConsumerCount = consumersCount.decrementAndGet(); consumerRemovedTimestampUpdater.set(this, System.currentTimeMillis()); boolean stopped = dispatchingUpdater.compareAndSet(this, BooleanUtil.toInt(true), BooleanUtil.toInt(currentConsumerCount != 0)); @@ -1124,13 +1134,13 @@ public void removeConsumer(final Consumer consumer) { private boolean checkConsumerDirectDeliver() { boolean supports = true; - for (ConsumerHolder consumerCheck : consumerList) { - if (!consumerCheck.consumer.supportsDirectDelivery()) { + for (Consumer consumer : consumers) { + if (!consumer.supportsDirectDelivery()) { supports = false; } } if (redistributor != null) { - if (!redistributor.consumer.supportsDirectDelivery()) { + if (!redistributor.supportsDirectDelivery()) { supports = false; } } @@ -1147,7 +1157,7 @@ public synchronized void addRedistributor(final long delay) { } if (delay > 0) { - if (consumerSet.isEmpty()) { + if (consumers.isEmpty()) { DelayedAddRedistributor dar = new DelayedAddRedistributor(executor); redistributorFuture = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS); @@ -1168,7 +1178,7 @@ private void clearRedistributorFuture() { @Override public synchronized void cancelRedistributor() throws Exception { if (redistributor != null) { - redistributor.consumer.stop(); + redistributor.stop(); redistributor = null; } @@ -1194,7 +1204,7 @@ public long getConsumerRemovedTimestamp() { @Override public synchronized Set getConsumers() { - return new HashSet<>(consumerSet); + return new HashSet<>(consumers); } @Override @@ -1219,8 +1229,7 @@ public synchronized int getGroupCount() { @Override public boolean hasMatchingConsumer(final Message message) { - for (ConsumerHolder holder : consumerList) { - Consumer consumer = holder.consumer; + for (Consumer consumer : consumers) { if (consumer instanceof Redistributor) { continue; @@ -1367,14 +1376,14 @@ public synchronized List getScheduledMessages() { @Override public Map> getDeliveringMessages() { - List consumerListClone = cloneConsumersList(); + Collection consumers = cloneConsumersList(); Map> mapReturn = new HashMap<>(); - for (ConsumerHolder holder : consumerListClone) { - List msgs = holder.consumer.getDeliveringMessages(); + for (Consumer consumer : consumers) { + List msgs = consumer.getDeliveringMessages(); if (msgs != null && msgs.size() > 0) { - mapReturn.put(holder.consumer.toManagementString(), msgs); + mapReturn.put(consumer.toManagementString(), msgs); } } @@ -1856,8 +1865,8 @@ public void deleteQueue(boolean removeConsumers) throws Exception { postOffice.removeBinding(name, tx, true); if (removeConsumers) { - for (ConsumerHolder consumerHolder : consumerList) { - consumerHolder.consumer.disconnect(); + for (Consumer consumer : consumers) { + consumer.disconnect(); } } @@ -2228,12 +2237,7 @@ public synchronized int changeReferencesPriority(final Filter filter, final byte @Override public synchronized void resetAllIterators() { - for (ConsumerHolder holder : this.consumerList) { - holder.resetIterator(); - } - if (redistributor != null) { - redistributor.resetIterator(); - } + resetIterator(); } @Override @@ -2452,13 +2456,13 @@ private void deliver() { break; } - ConsumerHolder holder; + Consumer consumer; if (redistributor == null) { if (endPos < 0 || consumersChanged) { consumersChanged = false; - size = consumerList.size(); + size = consumers.size(); endPos = pos - 1; @@ -2467,21 +2471,19 @@ private void deliver() { noDelivery = 0; } } - - holder = consumerList.get(pos); + consumer = consumers.get(pos); } else { - holder = redistributor; + consumer = redistributor; } - Consumer consumer = holder.consumer; Consumer groupConsumer = null; - if (holder.iter == null) { - holder.iter = messageReferences.iterator(); + if (messageIterator == null) { + messageIterator = messageReferences.iterator(); } - if (holder.iter.hasNext()) { - ref = holder.iter.next(); + if (messageIterator.hasNext()) { + ref = messageIterator.next(); } else { ref = null; } @@ -2492,7 +2494,7 @@ private void deliver() { if (logger.isTraceEnabled()) { logger.trace("Reference " + ref + " being expired"); } - removeMessageReference(holder, ref); + removeMessageReference(ref); @@ -2518,7 +2520,7 @@ private void deliver() { } if (exclusive && redistributor == null) { - consumer = consumerList.get(0).consumer; + consumer = consumers.get(0); } HandleStatus status = handle(ref, consumer); @@ -2529,7 +2531,7 @@ private void deliver() { handledconsumer = consumer; - removeMessageReference(holder, ref); + removeMessageReference(ref); if (redistributor == null) { handleMessageGroup(ref, consumer, groupConsumer, groupID); @@ -2538,7 +2540,7 @@ private void deliver() { handled++; } else if (status == HandleStatus.BUSY) { try { - holder.iter.repeat(); + messageIterator.repeat(); } catch (NoSuchElementException e) { // this could happen if there was an exception on the queue handling // and it returned BUSY because of that exception @@ -2596,9 +2598,9 @@ private void deliver() { checkDepage(); } - protected void removeMessageReference(ConsumerHolder holder, MessageReference ref) { + protected void removeMessageReference(MessageReference ref) { if (!nonDestructive) { - holder.iter.remove(); + messageIterator.remove(); refRemoved(ref); } } @@ -2727,16 +2729,17 @@ private void depage(final boolean scheduleExpiry) { private void internalAddRedistributor(final ArtemisExecutor executor) { // create the redistributor only once if there are no local consumers - if (consumerSet.isEmpty() && redistributor == null) { + if (consumers.isEmpty() && redistributor == null) { if (logger.isTraceEnabled()) { logger.trace("QueueImpl::Adding redistributor on queue " + this.toString()); } + resetAllIterators(); - redistributor = (new ConsumerHolder(new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE))); + redistributor = new Redistributor(this, storageManager, postOffice, executor, QueueImpl.REDISTRIBUTOR_BATCH_SIZE); consumersChanged = true; - redistributor.consumer.start(); + redistributor.start(); deliverAsync(); } @@ -3082,18 +3085,16 @@ private boolean deliverDirect(final MessageReference ref) { int startPos = pos; - int size = consumerList.size(); + int size = consumers.size(); while (true) { - ConsumerHolder holder; + Consumer consumer; if (redistributor == null) { - holder = consumerList.get(pos); + consumer = consumers.get(pos); } else { - holder = redistributor; + consumer = redistributor; } - Consumer consumer = holder.consumer; - Consumer groupConsumer = null; // If a group id is set, then this overrides the consumer chosen round-robin @@ -3109,7 +3110,7 @@ private boolean deliverDirect(final MessageReference ref) { } if (exclusive && redistributor == null) { - consumer = consumerList.get(0).consumer; + consumer = consumers.get(0); } // Only move onto the next position if the consumer on the current position was used. @@ -3225,12 +3226,12 @@ private synchronized HandleStatus handle(final MessageReference reference, final return status; } - private List cloneConsumersList() { - List consumerListClone; + private List cloneConsumersList() { + List consumerListClone; synchronized (this) { if (redistributor == null) { - consumerListClone = new ArrayList<>(consumerList); + consumerListClone = new ArrayList<>(consumers); } else { consumerListClone = Collections.singletonList(redistributor); } @@ -3384,28 +3385,6 @@ public void onError(int errorCode, String errorMessage) { } - // Inner classes - // -------------------------------------------------------------------------- - - protected static class ConsumerHolder { - - ConsumerHolder(final T consumer) { - this.consumer = consumer; - } - - final T consumer; - - LinkedListIterator iter; - - private void resetIterator() { - if (iter != null) { - iter.close(); - } - iter = null; - } - - } - private class DelayedAddRedistributor implements Runnable { private final ArtemisExecutor executor1;