diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index ee190c62d42..b0b3cc8f4e1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -807,7 +807,7 @@ public String createQueue(String address, boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception { AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(address == null ? name : address); - return createQueue(address, routingType, name, filterStr, durable, addressSettings.getDefaultMaxConsumers(), addressSettings.isDefaultPurgeOnNoConsumers(), addressSettings.isDefaultExclusiveQueue(), addressSettings.isDefaultLastValueQueue(), addressSettings.getDefaultConsumersBeforeDispatch(), addressSettings.getDefaultDelayBeforeDispatch(), addressSettings.isAutoCreateAddresses()); + return createQueue(address, routingType, name, filterStr, durable, maxConsumers, purgeOnNoConsumers, addressSettings.isDefaultExclusiveQueue(), addressSettings.isDefaultLastValueQueue(), addressSettings.getDefaultConsumersBeforeDispatch(), addressSettings.getDefaultDelayBeforeDispatch(), autoCreateAddress); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index f01e0c7639a..a71a862b5ab 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1766,7 +1766,7 @@ public void createSharedQueue(final SimpleString address, boolean exclusive, boolean lastValue) throws Exception { AddressSettings as = getAddressSettingsRepository().getMatch(address == null ? name.toString() : address.toString()); - createSharedQueue(address, routingType, name, filterString, user, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch()); + createSharedQueue(address, routingType, name, filterString, user, durable, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch()); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 265621741da..f7d1d3b27fb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -186,6 +186,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final List consumerList = new CopyOnWriteArrayList<>(); + private int getConsumerListSize() { + return consumerList.size() + (redistributor != null ? 1 : 0); + } + + private ConsumerHolder getConsumerList(int i) { + if (i == consumerList.size()) { + return redistributor; + } else { + return consumerList.get(i); + } + } + private final ScheduledDeliveryHandler scheduledDeliveryHandler; private AtomicLong messagesAdded = new AtomicLong(0); @@ -321,6 +333,10 @@ public String debug() { out.println("consumer: " + holder.consumer.debug()); } + if (redistributor != null) { + out.println("Redistributor::" + redistributor); + } + for (MessageReference reference : intermediateMessageReferences) { out.print("Intermediate reference:" + reference); } @@ -544,6 +560,7 @@ public boolean isExclusive() { @Override public synchronized void setExclusive(boolean exclusive) { + new Exception("exclusive set at " + exclusive).printStackTrace(); this.exclusive = exclusive; } @@ -1077,7 +1094,8 @@ public void removeConsumer(final Consumer consumer) { private boolean checkConsumerDirectDeliver() { boolean supports = true; - for (ConsumerHolder consumerCheck : consumerList) { + for (int i = 0; i < getConsumerListSize(); i++) { + ConsumerHolder consumerCheck = getConsumerList(i); if (!consumerCheck.consumer.supportsDirectDelivery()) { supports = false; } @@ -2157,7 +2175,8 @@ public synchronized int changeReferencesPriority(final Filter filter, final byte @Override public synchronized void resetAllIterators() { - for (ConsumerHolder holder : this.consumerList) { + for (int i = 0; i < getConsumerListSize(); i++) { + ConsumerHolder holder = getConsumerList(i); if (holder.iter != null) { holder.iter.close(); } @@ -2382,24 +2401,23 @@ private void deliver() { } ConsumerHolder holder; - if (redistributor == null) { + if (endPos < 0 || consumersChanged) { + consumersChanged = false; - if (endPos < 0 || consumersChanged) { - consumersChanged = false; + size = getConsumerListSize(); - size = consumerList.size(); + endPos = pos - 1; - endPos = pos - 1; - - if (endPos < 0) { - endPos = size - 1; - noDelivery = 0; - } + if (endPos < 0) { + endPos = size - 1; + noDelivery = 0; } + } - holder = consumerList.get(pos); - } else { + if (!canDispatch() && redistributor != null) { holder = redistributor; + } else { + holder = getConsumerList(exclusive ? 0 : pos); } Consumer consumer = holder.consumer; @@ -2446,10 +2464,6 @@ private void deliver() { } } - if (exclusive && redistributor == null) { - consumer = consumerList.get(0).consumer; - } - HandleStatus status = handle(ref, consumer); if (status == HandleStatus.HANDLED) { @@ -2987,13 +3001,13 @@ private boolean deliverDirect(final MessageReference ref) { int startPos = pos; - int size = consumerList.size(); + int size = getConsumerListSize(); while (true) { - ConsumerHolder holder; - if (redistributor == null) { - holder = consumerList.get(pos); - } else { + + ConsumerHolder holder = getConsumerList(exclusive ? 0 : pos); + if (!canDispatch() && redistributor != null) { + // if you can't dispatch, the only possible one is the redistributor holder = redistributor; } @@ -3013,10 +3027,6 @@ private boolean deliverDirect(final MessageReference ref) { } } - if (exclusive && redistributor == null) { - consumer = consumerList.get(0).consumer; - } - // Only move onto the next position if the consumer on the current position was used. if (!exclusive && groupConsumer == null) { pos++; @@ -3123,6 +3133,9 @@ private List cloneConsumersList() { synchronized (this) { consumerListClone = new ArrayList<>(consumerList); + if (redistributor != null) { + consumerListClone.add(redistributor); + } } return consumerListClone; }