From aae549331fb96fb8e26998581974458eccf2b8ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Andr=C3=A9=20Pearce?= Date: Fri, 6 Jul 2018 07:44:15 +0100 Subject: [PATCH] ARTEMIS-856 - Support consumersBeforeDispatch and delayBeforeDispatch https://issues.apache.org/jira/browse/ARTEMIS-856 This is equivalent to consumersBeforeDispatchStarts and timeBeforeDispatchStarts in ActiveMQ 5.x http://activemq.apache.org/message-groups.html This is addressing one of the items on the artemis roadmap: http://activemq.apache.org/activemq-artemis-roadmap.html --- .../utils/AtomicBooleanFieldUpdater.java | 154 ++++++++++++ .../config/ActiveMQDefaultConfiguration.java | 12 + .../management/ActiveMQServerControl.java | 61 +++++ .../core/config/CoreQueueConfiguration.java | 38 ++- .../impl/FileConfigurationParser.java | 20 +- .../impl/ActiveMQServerControlImpl.java | 33 ++- .../core/persistence/QueueBindingInfo.java | 8 + .../AbstractJournalStorageManager.java | 2 +- .../codec/PersistentQueueBindingEncoding.java | 48 +++- .../artemis/core/postoffice/PostOffice.java | 2 + .../core/postoffice/impl/PostOfficeImpl.java | 10 + .../artemis/core/server/ActiveMQServer.java | 27 +++ .../activemq/artemis/core/server/Queue.java | 14 ++ .../artemis/core/server/QueueConfig.java | 39 ++- .../core/server/impl/ActiveMQServerImpl.java | 125 ++++++++-- .../core/server/impl/LastValueQueue.java | 4 +- .../server/impl/PostOfficeJournalLoader.java | 2 + .../core/server/impl/QueueFactoryImpl.java | 6 +- .../artemis/core/server/impl/QueueImpl.java | 120 +++++++++- .../core/settings/impl/AddressSettings.java | 70 +++++- .../schema/artemis-configuration.xsd | 20 ++ .../config/impl/FileConfigurationTest.java | 10 +- .../impl/ScheduledDeliveryHandlerTest.java | 35 +++ .../test/resources/artemis-configuration.xsd | 20 ++ .../jms/client/ConsumerDelayDispatchTest.java | 223 ++++++++++++++++++ .../ActiveMQServerControlUsingCoreTest.java | 10 + .../persistence/QueueConfigRestartTest.java | 46 ++++ .../unit/core/postoffice/impl/FakeQueue.java | 35 +++ .../server/impl/fakes/FakePostOffice.java | 2 + 29 files changed, 1145 insertions(+), 51 deletions(-) create mode 100644 artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AtomicBooleanFieldUpdater.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AtomicBooleanFieldUpdater.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AtomicBooleanFieldUpdater.java new file mode 100644 index 000000000000..f0a664ade828 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/AtomicBooleanFieldUpdater.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.utils; + +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import sun.reflect.CallerSensitive; + +public class AtomicBooleanFieldUpdater { + + /** + * Creates and returns an updater for objects with the given field. + * The Class argument is needed to check that reflective types and + * generic types match. + * + * @param tclass the class of the objects holding the field + * @param fieldName the name of the field to be updated + * @param the type of instances of tclass + * @return the updater + * @throws IllegalArgumentException if the field is not a + * volatile long type + * @throws RuntimeException with a nested reflection-based + * exception if the class does not hold field or is the wrong type, + * or the field is inaccessible to the caller according to Java language + * access control + */ + @CallerSensitive + public static AtomicBooleanFieldUpdater newUpdater(Class tclass, String fieldName) { + return new AtomicBooleanFieldUpdater<>(AtomicLongFieldUpdater.newUpdater(tclass, fieldName)); + } + + private static long toLong(boolean value) { + return value ? 1 : 0; + } + + private static boolean toBoolean(long value) { + return value != 0; + } + + private final AtomicLongFieldUpdater atomicLongFieldUpdater; + + /** + * Protected do-nothing constructor for use by subclasses. + */ + protected AtomicBooleanFieldUpdater(AtomicLongFieldUpdater atomicLongFieldUpdater) { + this.atomicLongFieldUpdater = atomicLongFieldUpdater; + } + + /** + * Atomically sets the field of the given object managed by this updater + * to the given updated value if the current value {@code ==} the + * expected value. This method is guaranteed to be atomic with respect to + * other calls to {@code compareAndSet} and {@code set}, but not + * necessarily with respect to other changes in the field. + * + * @param obj An object whose field to conditionally set + * @param expect the expected value + * @param update the new value + * @return {@code true} if successful + * @throws ClassCastException if {@code obj} is not an instance + * of the class possessing the field established in the constructor + */ + public boolean compareAndSet(T obj, boolean expect, boolean update) { + return atomicLongFieldUpdater.compareAndSet(obj, toLong(expect), toLong(update)); + } + + + + /** + * Atomically sets the field of the given object managed by this updater + * to the given updated value if the current value {@code ==} the + * expected value. This method is guaranteed to be atomic with respect to + * other calls to {@code compareAndSet} and {@code set}, but not + * necessarily with respect to other changes in the field. + * + *

May fail + * spuriously and does not provide ordering guarantees, so is + * only rarely an appropriate alternative to {@code compareAndSet}. + * + * @param obj An object whose field to conditionally set + * @param expect the expected value + * @param update the new value + * @return {@code true} if successful + * @throws ClassCastException if {@code obj} is not an instance + * of the class possessing the field established in the constructor + */ + public boolean weakCompareAndSet(T obj, boolean expect, boolean update) { + return atomicLongFieldUpdater.weakCompareAndSet(obj, toLong(expect), toLong(update)); + } + + /** + * Sets the field of the given object managed by this updater to the + * given updated value. This operation is guaranteed to act as a volatile + * store with respect to subsequent invocations of {@code compareAndSet}. + * + * @param obj An object whose field to set + * @param newValue the new value + */ + public void set(T obj, boolean newValue) { + atomicLongFieldUpdater.set(obj, toLong(newValue)); + } + + /** + * Eventually sets the field of the given object managed by this + * updater to the given updated value. + * + * @param obj An object whose field to set + * @param newValue the new value + * @since 1.6 + */ + public void lazySet(T obj, boolean newValue) { + atomicLongFieldUpdater.lazySet(obj, toLong(newValue)); + } + + /** + * Gets the current value held in the field of the given object managed + * by this updater. + * + * @param obj An object whose field to get + * @return the current value + */ + public boolean get(T obj) { + return toBoolean(atomicLongFieldUpdater.get(obj)); + } + + /** + * Atomically sets the field of the given object managed by this updater + * to the given value and returns the old value. + * + * @param obj An object whose field to get and set + * @param newValue the new value + * @return the previous value + */ + public boolean getAndSet(T obj, boolean newValue) { + return toBoolean(atomicLongFieldUpdater.getAndSet(obj, toLong(newValue))); + } + + public String toString(T obj) { + return Boolean.toString(get(obj)); + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java index 70ba3149ddf1..45397f386c93 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java @@ -473,6 +473,10 @@ public static String getDefaultHapolicyBackupStrategy() { public static final boolean DEFAULT_PURGE_ON_NO_CONSUMERS = false; + public static final int DEFAULT_CONSUMERS_BEFORE_DISPATCH = 0; + + public static final long DEFAULT_DELAY_BEFORE_DISPATCH = -1; + public static final RoutingType DEFAULT_ROUTING_TYPE = RoutingType.MULTICAST; public static final String DEFAULT_SYSTEM_PROPERTY_PREFIX = "brokerconfig."; @@ -1302,6 +1306,14 @@ public static boolean getDefaultPurgeOnNoConsumers() { return DEFAULT_PURGE_ON_NO_CONSUMERS; } + public static int getDefaultConsumersBeforeDispatch() { + return DEFAULT_CONSUMERS_BEFORE_DISPATCH; + } + + public static long getDefaultDelayBeforeDispatch() { + return DEFAULT_DELAY_BEFORE_DISPATCH; + } + public static String getInternalNamingPrefix() { return DEFAULT_INTERNAL_NAMING_PREFIX; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java index 234a2d514c87..6ce945c41134 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java @@ -581,6 +581,10 @@ void createQueue(@Parameter(name = "address", desc = "Address of the queue") Str * @param durable is the queue durable? * @param maxConsumers the maximum number of consumers allowed on this queue at any one time * @param purgeOnNoConsumers delete this queue when the last consumer disconnects + * @param exclusive if the queue should route exclusively to one consumer + * @param lastValue use last-value semantics + * @param consumersBeforeDispatch number of consumers needed before dispatch can start + * @param delayBeforeDispatch delay to wait before dispatching if number of consumers before dispatch is not met * @param autoCreateAddress create an address with default values should a matching address not be found * @return a textual summary of the queue * @throws Exception @@ -593,8 +597,41 @@ String createQueue(@Parameter(name = "address", desc = "Address of the queue") S @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable, @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers, @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean purgeOnNoConsumers, + @Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") boolean exclusive, + @Parameter(name = "lastValue", desc = "Use last-value semantics") boolean lastValue, + @Parameter(name = "consumersBeforeDispatch", desc = "Number of consumers needed before dispatch can start") int consumersBeforeDispatch, + @Parameter(name = "delayBeforeDispatch", desc = "Delay to wait before dispatching if number of consumers before dispatch is not met") long delayBeforeDispatch, @Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception; + /** + * Create a queue. + *
+ * If {@code address} is {@code null} it will be defaulted to {@code name}. + *
+ * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits. + * + * @param address address to bind the queue to + * @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST} + * @param name name of the queue + * @param filterStr filter of the queue + * @param durable is the queue durable? + * @param maxConsumers the maximum number of consumers allowed on this queue at any one time + * @param purgeOnNoConsumers delete this queue when the last consumer disconnects + * @param autoCreateAddress create an address with default values should a matching address not be found + * @return a textual summary of the queue + * @throws Exception + */ + @Operation(desc = "Create a queue", impact = MBeanOperationInfo.ACTION) + String createQueue(@Parameter(name = "address", desc = "Address of the queue") String address, + @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType, + @Parameter(name = "name", desc = "Name of the queue") String name, + @Parameter(name = "filter", desc = "Filter of the queue") String filterStr, + @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable, + @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers, + @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean purgeOnNoConsumers, + @Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception; + + /** * Update a queue. * @@ -651,6 +688,30 @@ String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String @Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive, @Parameter(name = "user", desc = "The user associated with this queue") String user) throws Exception; + /** + * Update a queue + * + * @param name name of the queue + * @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST} + * @param maxConsumers the maximum number of consumers allowed on this queue at any one time + * @param purgeOnNoConsumers delete this queue when the last consumer disconnects + * @param exclusive if the queue should route exclusively to one consumer + * @param consumersBeforeDispatch number of consumers needed before dispatch can start + * @param delayBeforeDispatch delay to wait before dispatching if number of consumers before dispatch is not met + * @param user the user associated with this queue + * @return + * @throws Exception + */ + @Operation(desc = "Update a queue", impact = MBeanOperationInfo.ACTION) + String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name, + @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType, + @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers, + @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers, + @Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive, + @Parameter(name = "consumersBeforeDispatch", desc = "Number of consumers needed before dispatch can start") Integer consumersBeforeDispatch, + @Parameter(name = "delayBeforeDispatch", desc = "Delay to wait before dispatching if number of consumers before dispatch is not met") Long delayBeforeDispatch, + @Parameter(name = "user", desc = "The user associated with this queue") String user) throws Exception; + /** * Deploy a durable queue. *
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java index f301b90e8e89..4fc5a21fe365 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java @@ -39,14 +39,16 @@ public class CoreQueueConfiguration implements Serializable { private Boolean lastValue; - private Integer maxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(); + private Integer maxConsumers; + + private Integer consumersBeforeDispatch; + + private Long delayBeforeDispatch; private Boolean purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(); private RoutingType routingType = ActiveMQDefaultConfiguration.getDefaultRoutingType(); - private boolean maxConsumerConfigured = false; - public CoreQueueConfiguration() { } @@ -78,13 +80,12 @@ public Boolean isLastValue() { return lastValue; } - public boolean isMaxConsumerConfigured() { - return maxConsumerConfigured; + public Integer getConsumersBeforeDispatch() { + return consumersBeforeDispatch; } - public CoreQueueConfiguration setMaxConsumerConfigured(boolean maxConsumerConfigured) { - this.maxConsumerConfigured = maxConsumerConfigured; - return this; + public Long getDelayBeforeDispatch() { + return delayBeforeDispatch; } /** @@ -127,6 +128,22 @@ public CoreQueueConfiguration setMaxConsumers(Integer maxConsumers) { return this; } + /** + * @param consumersBeforeDispatch for this queue, default is 0 (dispatch as soon as 1 consumer) + */ + public CoreQueueConfiguration setConsumersBeforeDispatch(Integer consumersBeforeDispatch) { + this.consumersBeforeDispatch = consumersBeforeDispatch; + return this; + } + + /** + * @param delayBeforeDispatch for this queue, default is 0 (start dispatch with no delay) + */ + public CoreQueueConfiguration setDelayBeforeDispatch(Long delayBeforeDispatch) { + this.delayBeforeDispatch = delayBeforeDispatch; + return this; + } + /** * @param purgeOnNoConsumers delete this queue when consumer count reaches 0, default is false */ @@ -157,7 +174,7 @@ public boolean getPurgeOnNoConsumers() { return purgeOnNoConsumers; } - public int getMaxConsumers() { + public Integer getMaxConsumers() { return maxConsumers; } @@ -182,7 +199,6 @@ public int hashCode() { result = prime * result + ((purgeOnNoConsumers == null) ? 0 : purgeOnNoConsumers.hashCode()); result = prime * result + ((exclusive == null) ? 0 : exclusive.hashCode()); result = prime * result + ((lastValue == null) ? 0 : lastValue.hashCode()); - result = prime * result + (maxConsumerConfigured ? 1331 : 1337); return result; } @@ -202,8 +218,6 @@ public boolean equals(Object obj) { return false; if (durable != other.durable) return false; - if (maxConsumerConfigured != other.maxConsumerConfigured) - return false; if (filterString == null) { if (other.filterString != null) return false; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java index 27badac4d021..dc110b0a1385 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java @@ -183,6 +183,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil { private static final String DEFAULT_EXCLUSIVE_NODE_NAME = "default-exclusive-queue"; + private static final String DEFAULT_CONSUMERS_BEFORE_DISPATCH = "default-consumers-before-dispatch"; + + private static final String DEFAULT_DELAY_BEFORE_DISPATCH = "default-delay-before-dispatch"; + private static final String REDISTRIBUTION_DELAY_NODE_NAME = "redistribution-delay"; private static final String SEND_TO_DLA_ON_NO_ROUTE = "send-to-dla-on-no-route"; @@ -1050,6 +1054,10 @@ protected Pair parseAddressSettings(final Node node) { addressSettings.setDefaultPurgeOnNoConsumers(XMLUtil.parseBoolean(child)); } else if (DEFAULT_MAX_CONSUMERS.equalsIgnoreCase(name)) { addressSettings.setDefaultMaxConsumers(XMLUtil.parseInt(child)); + } else if (DEFAULT_CONSUMERS_BEFORE_DISPATCH.equalsIgnoreCase(name)) { + addressSettings.setDefaultConsumersBeforeDispatch(XMLUtil.parseInt(child)); + } else if (DEFAULT_DELAY_BEFORE_DISPATCH.equalsIgnoreCase(name)) { + addressSettings.setDefaultDelayBeforeDispatch(XMLUtil.parseLong(child)); } else if (DEFAULT_QUEUE_ROUTING_TYPE.equalsIgnoreCase(name)) { String value = getTrimmedTextContent(child); Validators.ROUTING_TYPE.validate(DEFAULT_QUEUE_ROUTING_TYPE, value); @@ -1093,12 +1101,13 @@ protected CoreQueueConfiguration parseQueueConfiguration(final Node node) { String address = null; String filterString = null; boolean durable = true; - boolean maxConumserConfigured = false; - int maxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(); + Integer maxConsumers = null; boolean purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(); String user = null; Boolean exclusive = null; Boolean lastValue = null; + Integer consumersBeforeDispatch = null; + Long delayBeforeDispatch = null; NamedNodeMap attributes = node.getAttributes(); for (int i = 0; i < attributes.getLength(); i++) { @@ -1106,13 +1115,16 @@ protected CoreQueueConfiguration parseQueueConfiguration(final Node node) { if (item.getNodeName().equals("max-consumers")) { maxConsumers = Integer.parseInt(item.getNodeValue()); Validators.MAX_QUEUE_CONSUMERS.validate(name, maxConsumers); - maxConumserConfigured = true; } else if (item.getNodeName().equals("purge-on-no-consumers")) { purgeOnNoConsumers = Boolean.parseBoolean(item.getNodeValue()); } else if (item.getNodeName().equals("exclusive")) { exclusive = Boolean.parseBoolean(item.getNodeValue()); } else if (item.getNodeName().equals("last-value")) { lastValue = Boolean.parseBoolean(item.getNodeValue()); + } else if (item.getNodeName().equals("consumers-before-dispatch")) { + consumersBeforeDispatch = Integer.parseInt(item.getNodeValue()); + } else if (item.getNodeName().equals("delay-before-dispatch")) { + delayBeforeDispatch = Long.parseLong(item.getNodeValue()); } } @@ -1132,7 +1144,7 @@ protected CoreQueueConfiguration parseQueueConfiguration(final Node node) { } return new CoreQueueConfiguration().setAddress(address).setName(name).setFilterString(filterString).setDurable(durable).setMaxConsumers(maxConsumers).setPurgeOnNoConsumers(purgeOnNoConsumers).setUser(user) - .setExclusive(exclusive).setLastValue(lastValue).setMaxConsumerConfigured(maxConumserConfigured); + .setExclusive(exclusive).setLastValue(lastValue); } protected CoreAddressConfiguration parseAddressConfiguration(final Node node) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index 9ed7acdf3de5..ee190c62d42d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -806,6 +806,23 @@ public String createQueue(String address, int maxConsumers, boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception { + AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(address == null ? name : address); + return createQueue(address, routingType, name, filterStr, durable, addressSettings.getDefaultMaxConsumers(), addressSettings.isDefaultPurgeOnNoConsumers(), addressSettings.isDefaultExclusiveQueue(), addressSettings.isDefaultLastValueQueue(), addressSettings.getDefaultConsumersBeforeDispatch(), addressSettings.getDefaultDelayBeforeDispatch(), addressSettings.isAutoCreateAddresses()); + } + + @Override + public String createQueue(String address, + String routingType, + String name, + String filterStr, + boolean durable, + int maxConsumers, + boolean purgeOnNoConsumers, + boolean exclusive, + boolean lastValue, + int consumersBeforeDispatch, + long delayBeforeDispatch, + boolean autoCreateAddress) throws Exception { checkStarted(); clearIO(); @@ -816,7 +833,7 @@ public String createQueue(String address, filter = new SimpleString(filterStr); } - final Queue queue = server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, purgeOnNoConsumers, autoCreateAddress); + final Queue queue = server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), new SimpleString(name), filter, durable, false, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress); return QueueTextFormatter.Long.format(queue, new StringBuilder()).toString(); } catch (ActiveMQException e) { throw new IllegalStateException(e.getMessage()); @@ -851,12 +868,24 @@ public String updateQueue(String name, Boolean purgeOnNoConsumers, Boolean exclusive, String user) throws Exception { + return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user); + } + + @Override + public String updateQueue(String name, + String routingType, + Integer maxConsumers, + Boolean purgeOnNoConsumers, + Boolean exclusive, + Integer consumersBeforeDispatch, + Long delayBeforeDispatch, + String user) throws Exception { checkStarted(); clearIO(); try { - final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, maxConsumers, purgeOnNoConsumers, exclusive, user); + final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, user); if (queue == null) { throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(new SimpleString(name)); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java index 7e18311d9d57..ebc86fc1bb71 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java @@ -62,6 +62,14 @@ public interface QueueBindingInfo { void setLastValue(boolean lastValue); + int getConsumersBeforeDispatch(); + + void setConsumersBeforeDispatch(int consumersBeforeDispatch); + + long getDelayBeforeDispatch(); + + void setDelayBeforeDispatch(long delayBeforeDispatch); + byte getRoutingType(); void setRoutingType(byte routingType); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java index dc2e20bfd9ec..7c821a98f693 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java @@ -1275,7 +1275,7 @@ private void internalQueueBinding(boolean update, final long tx, final Binding b SimpleString filterString = filter == null ? null : filter.getFilterString(); - PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getRoutingType().getType()); + PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.getRoutingType().getType()); readLock(); try { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java index 2ab43965a4a0..11503878a865 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java @@ -50,6 +50,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin public boolean lastValue; + public int consumersBeforeDispatch; + + public long delayBeforeDispatch; + public byte routingType; public PersistentQueueBindingEncoding() { @@ -76,6 +80,10 @@ public String toString() { exclusive + ", lastValue=" + lastValue + + ", consumersBeforeDispatch=" + + consumersBeforeDispatch + + ", delayBeforeDispatch=" + + delayBeforeDispatch + ", routingType=" + routingType + "]"; @@ -90,6 +98,8 @@ public PersistentQueueBindingEncoding(final SimpleString name, final boolean purgeOnNoConsumers, final boolean exclusive, final boolean lastValue, + final int consumersBeforeDispatch, + final long delayBeforeDispatch, final byte routingType) { this.name = name; this.address = address; @@ -100,6 +110,8 @@ public PersistentQueueBindingEncoding(final SimpleString name, this.purgeOnNoConsumers = purgeOnNoConsumers; this.exclusive = exclusive; this.lastValue = lastValue; + this.consumersBeforeDispatch = consumersBeforeDispatch; + this.delayBeforeDispatch = delayBeforeDispatch; this.routingType = routingType; } @@ -195,6 +207,26 @@ public void setLastValue(boolean lastValue) { this.lastValue = lastValue; } + @Override + public int getConsumersBeforeDispatch() { + return consumersBeforeDispatch; + } + + @Override + public void setConsumersBeforeDispatch(int consumersBeforeDispatch) { + this.consumersBeforeDispatch = consumersBeforeDispatch; + } + + @Override + public long getDelayBeforeDispatch() { + return delayBeforeDispatch; + } + + @Override + public void setDelayBeforeDispatch(long delayBeforeDispatch) { + this.delayBeforeDispatch = delayBeforeDispatch; + } + @Override public byte getRoutingType() { return routingType; @@ -246,6 +278,16 @@ public void decode(final ActiveMQBuffer buffer) { } else { lastValue = ActiveMQDefaultConfiguration.getDefaultLastValue(); } + if (buffer.readableBytes() > 0) { + consumersBeforeDispatch = buffer.readInt(); + } else { + consumersBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(); + } + if (buffer.readableBytes() > 0) { + delayBeforeDispatch = buffer.readLong(); + } else { + delayBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(); + } } @Override @@ -260,6 +302,8 @@ public void encode(final ActiveMQBuffer buffer) { buffer.writeByte(routingType); buffer.writeBoolean(exclusive); buffer.writeBoolean(lastValue); + buffer.writeInt(consumersBeforeDispatch); + buffer.writeLong(delayBeforeDispatch); } @Override @@ -271,7 +315,9 @@ public int getEncodeSize() { DataConstants.SIZE_BOOLEAN + DataConstants.SIZE_BYTE + DataConstants.SIZE_BOOLEAN + - DataConstants.SIZE_BOOLEAN; + DataConstants.SIZE_BOOLEAN + + DataConstants.INT + + DataConstants.LONG; } private SimpleString createMetadata() { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java index d95526dcb2a5..ce1fcfd172d5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java @@ -69,6 +69,8 @@ QueueBinding updateQueue(SimpleString name, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, + Integer consumersBeforeDispatch, + Long delayBeforeDispatch, SimpleString user) throws Exception; List listQueuesForAddress(SimpleString address) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java index 21a75048c566..247b9ed9bfb1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java @@ -468,6 +468,8 @@ public QueueBinding updateQueue(SimpleString name, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, + Integer consumersBeforeDispatch, + Long delayBeforeDispatch, SimpleString user) throws Exception { synchronized (addressLock) { final QueueBinding queueBinding = (QueueBinding) addressManager.getBinding(name); @@ -512,6 +514,14 @@ public QueueBinding updateQueue(SimpleString name, changed = true; queue.setExclusive(exclusive); } + if (consumersBeforeDispatch != null && !consumersBeforeDispatch.equals(queue.getConsumersBeforeDispatch())) { + changed = true; + queue.setConsumersBeforeDispatch(consumersBeforeDispatch.intValue()); + } + if (delayBeforeDispatch != null && !delayBeforeDispatch.equals(queue.getDelayBeforeDispatch())) { + changed = true; + queue.setDelayBeforeDispatch(delayBeforeDispatch.longValue()); + } if (logger.isDebugEnabled()) { if (user == null && queue.getUser() != null) { logger.debug("Ignoring updating Queue to a NULL user"); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java index c2e4cbf2b169..33f9ae8fbd53 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java @@ -336,6 +336,10 @@ void createSharedQueue(SimpleString address, RoutingType routingType, SimpleStri void createSharedQueue(SimpleString address, RoutingType routingType, SimpleString name, SimpleString filterString, SimpleString user, boolean durable, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue) throws Exception; + void createSharedQueue(SimpleString address, RoutingType routingType, SimpleString name, SimpleString filterString, + SimpleString user, boolean durable, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue, + int consumersBeforeDispatch, long delayBeforeDispatch) throws Exception; + Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, boolean durable, boolean temporary) throws Exception; @@ -346,6 +350,10 @@ Queue createQueue(SimpleString address, RoutingType routingType, SimpleString qu boolean durable, boolean temporary, int maxConsumers, boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception; + Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, + boolean durable, boolean temporary, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, + boolean lastValue, int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoCreateAddress) throws Exception; + Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception; @@ -358,6 +366,11 @@ Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue, boolean autoCreateAddress) throws Exception; + Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, + SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, + Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue, Integer consumersBeforeDispatch, + Long delayBeforeDispatch, boolean autoCreateAddress) throws Exception; + Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue, boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception; @@ -366,6 +379,11 @@ Queue createQueue(SimpleString address, RoutingType routingType, SimpleString qu SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue, boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue, boolean autoCreateAddress) throws Exception; + Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, + SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue, + boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue, int consumersBeforeDispatch, + long delayBeforeDispatch, boolean autoCreateAddress) throws Exception; + @Deprecated Queue createQueue(SimpleString address, SimpleString queueName, SimpleString filter, boolean durable, boolean temporary) throws Exception; @@ -452,6 +470,15 @@ Queue updateQueue(String name, Boolean exclusive, String user) throws Exception; + Queue updateQueue(String name, + RoutingType routingType, + Integer maxConsumers, + Boolean purgeOnNoConsumers, + Boolean exclusive, + Integer consumersBeforeDispatch, + Long delayBeforeDispatch, + String user) throws Exception; + /* * add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will * replace any factories with the same protocol diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 0e16718af64f..70051c0cb993 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -68,6 +68,20 @@ public interface Queue extends Bindable,CriticalComponent { void setPurgeOnNoConsumers(boolean value); + int getConsumersBeforeDispatch(); + + void setConsumersBeforeDispatch(int consumersBeforeDispatch); + + long getDelayBeforeDispatch(); + + void setDelayBeforeDispatch(long delayBeforeDispatch); + + long getDispatchStartTime(); + + boolean isDispatching(); + + void setDispatching(boolean dispatching); + boolean isExclusive(); void setExclusive(boolean value); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java index 75f859d409b7..c83e08a7a9c8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java @@ -40,6 +40,8 @@ public final class QueueConfig { private final boolean exclusive; private final boolean lastValue; private final boolean purgeOnNoConsumers; + private final int consumersBeforeDispatch; + private final long delayBeforeDispatch; public static final class Builder { @@ -57,6 +59,8 @@ public static final class Builder { private boolean exclusive; private boolean lastValue; private boolean purgeOnNoConsumers; + private int consumersBeforeDispatch; + private long delayBeforeDispatch; private Builder(final long id, final SimpleString name) { this(id, name, name); @@ -77,6 +81,8 @@ private Builder(final long id, final SimpleString name, final SimpleString addre this.exclusive = ActiveMQDefaultConfiguration.getDefaultExclusive(); this.lastValue = ActiveMQDefaultConfiguration.getDefaultLastValue(); this.purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(); + this.consumersBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(); + this.delayBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(); validateState(); } @@ -138,6 +144,15 @@ public Builder lastValue(final boolean lastValue) { return this; } + public Builder consumersBeforeDispatch(final int consumersBeforeDispatch) { + this.consumersBeforeDispatch = consumersBeforeDispatch; + return this; + } + + public Builder delayBeforeDispatch(final long delayBeforeDispatch) { + this.delayBeforeDispatch = delayBeforeDispatch; + return this; + } public Builder purgeOnNoConsumers(final boolean purgeOnNoConsumers) { this.purgeOnNoConsumers = purgeOnNoConsumers; @@ -170,7 +185,7 @@ public QueueConfig build() { } else { pageSubscription = null; } - return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, purgeOnNoConsumers); + return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers); } } @@ -216,6 +231,8 @@ private QueueConfig(final long id, final int maxConsumers, final boolean exclusive, final boolean lastValue, + final int consumersBeforeDispatch, + final long delayBeforeDispatch, final boolean purgeOnNoConsumers) { this.id = id; this.address = address; @@ -231,6 +248,8 @@ private QueueConfig(final long id, this.exclusive = exclusive; this.lastValue = lastValue; this.maxConsumers = maxConsumers; + this.consumersBeforeDispatch = consumersBeforeDispatch; + this.delayBeforeDispatch = delayBeforeDispatch; } public long id() { @@ -289,6 +308,14 @@ public RoutingType deliveryMode() { return routingType; } + public int consumersBeforeDispatch() { + return consumersBeforeDispatch; + } + + public long delayBeforeDispatch() { + return delayBeforeDispatch; + } + @Override public boolean equals(Object o) { if (this == o) @@ -324,6 +351,12 @@ public boolean equals(Object o) { return false; if (purgeOnNoConsumers != that.purgeOnNoConsumers) return false; + if (consumersBeforeDispatch != that.consumersBeforeDispatch) + return false; + if (delayBeforeDispatch != that.delayBeforeDispatch) + return false; + if (purgeOnNoConsumers != that.purgeOnNoConsumers) + return false; return user != null ? user.equals(that.user) : that.user == null; } @@ -343,6 +376,8 @@ public int hashCode() { result = 31 * result + maxConsumers; result = 31 * result + (exclusive ? 1 : 0); result = 31 * result + (lastValue ? 1 : 0); + result = 31 * result + consumersBeforeDispatch; + result = 31 * result + Long.hashCode(delayBeforeDispatch); result = 31 * result + (purgeOnNoConsumers ? 1 : 0); return result; } @@ -363,6 +398,8 @@ public String toString() { + ", maxConsumers=" + maxConsumers + ", exclusive=" + exclusive + ", lastValue=" + lastValue + + ", consumersBeforeDispatch=" + consumersBeforeDispatch + + ", delayBeforeDispatch=" + delayBeforeDispatch + ", purgeOnNoConsumers=" + purgeOnNoConsumers + '}'; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 9359eccd2f62..84682b1e9bc8 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1663,6 +1663,23 @@ public Queue createQueue(final SimpleString address, return createQueue(address, routingType, queueName, filter, null, durable, temporary, false, maxConsumers, purgeOnNoConsumers, autoCreateAddress); } + @Override + public Queue createQueue(final SimpleString address, + final RoutingType routingType, + final SimpleString queueName, + final SimpleString filter, + final boolean durable, + final boolean temporary, + final int maxConsumers, + final boolean purgeOnNoConsumers, + final boolean exclusive, + final boolean lastValue, + final int consumersBeforeDispatch, + final long delayBeforeDispatch, + final boolean autoCreateAddress) throws Exception { + return createQueue(address, routingType, queueName, filter, null, durable, temporary, false, false, false, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress); + } + @Override @Deprecated public Queue createQueue(SimpleString address, @@ -1682,12 +1699,18 @@ public Queue createQueue(SimpleString address, @Override public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception { AddressSettings as = getAddressSettingsRepository().getMatch(addressInfo == null ? queueName.toString() : addressInfo.getName().toString()); - return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), autoCreateAddress); + return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress); } @Override public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue, boolean autoCreateAddress) throws Exception { - return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, autoCreateAddress); + AddressSettings as = getAddressSettingsRepository().getMatch(addressInfo == null ? queueName.toString() : addressInfo.getName().toString()); + return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress); + } + + @Override + public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue, Integer consumersBeforeDispatch, Long delayBeforeDispatch, boolean autoCreateAddress) throws Exception { + return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress); } @@ -1696,7 +1719,15 @@ public Queue createQueue(SimpleString address, RoutingType routingType, SimpleSt SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue, boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception { AddressSettings as = getAddressSettingsRepository().getMatch(address == null ? queueName.toString() : address.toString()); - return createQueue(address, routingType, queueName, filter, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), autoCreateAddress); + return createQueue(address, routingType, queueName, filter, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress); + } + + @Override + public Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter, + SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue, + boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue, boolean autoCreateAddress) throws Exception { + AddressSettings as = getAddressSettingsRepository().getMatch(address == null ? queueName.toString() : address.toString()); + return createQueue(address, routingType, queueName, filter, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress); } @@ -1732,6 +1763,23 @@ public void createSharedQueue(final SimpleString address, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue) throws Exception { + AddressSettings as = getAddressSettingsRepository().getMatch(address == null ? name.toString() : address.toString()); + createSharedQueue(address, routingType, name, filterString, user, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch()); + } + + @Override + public void createSharedQueue(final SimpleString address, + RoutingType routingType, + final SimpleString name, + final SimpleString filterString, + final SimpleString user, + boolean durable, + int maxConsumers, + boolean purgeOnNoConsumers, + boolean exclusive, + boolean lastValue, + int consumersBeforeDispatch, + long delayBeforeDispatch) throws Exception { //force the old contract about address if (address == null) { throw new NullPointerException("address can't be null!"); @@ -1745,7 +1793,7 @@ public void createSharedQueue(final SimpleString address, } } - final Queue queue = createQueue(address, routingType, name, filterString, user, durable, !durable, true, !durable, false, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, true); + final Queue queue = createQueue(address, routingType, name, filterString, user, durable, !durable, true, !durable, false, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, consumersBeforeDispatch, delayBeforeDispatch, true); if (!queue.getAddress().equals(address)) { throw ActiveMQMessageBundle.BUNDLE.queueSubscriptionBelongsToDifferentAddress(name); @@ -2541,21 +2589,22 @@ private void deployQueuesFromListCoreQueueConfiguration(List plugin.beforeCreateQueue(queueConfig) : null); @@ -2852,6 +2917,8 @@ public Queue createQueue(final SimpleString address, final boolean purgeOnNoConsumers, final boolean exclusive, final boolean lastValue, + final int consumersBeforeDispatch, + final long delayBeforeDispatch, final boolean autoCreateAddress) throws Exception { final QueueBinding binding = (QueueBinding) postOffice.getBinding(queueName); @@ -2893,7 +2960,20 @@ public Queue createQueue(final SimpleString address, throw ActiveMQMessageBundle.BUNDLE.invalidRoutingTypeForAddress(routingType, info.getName().toString(), info.getRoutingTypes()); } - final QueueConfig queueConfig = queueConfigBuilder.filter(filter).pagingManager(pagingManager).user(user).durable(durable).temporary(temporary).autoCreated(autoCreated).routingType(routingType).maxConsumers(maxConsumers).purgeOnNoConsumers(purgeOnNoConsumers).exclusive(exclusive).lastValue(lastValue).build(); + final QueueConfig queueConfig = queueConfigBuilder + .filter(filter) + .pagingManager(pagingManager) + .user(user) + .durable(durable) + .temporary(temporary) + .autoCreated(autoCreated).routingType(routingType) + .maxConsumers(maxConsumers) + .purgeOnNoConsumers(purgeOnNoConsumers) + .exclusive(exclusive) + .lastValue(lastValue) + .consumersBeforeDispatch(consumersBeforeDispatch) + .delayBeforeDispatch(delayBeforeDispatch) + .build(); callBrokerPlugins(hasBrokerPlugins() ? plugin -> plugin.beforeCreateQueue(queueConfig) : null); @@ -2963,14 +3043,27 @@ public Queue updateQueue(String name, return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, null, null); } + @Deprecated + @Override + public Queue updateQueue(String name, + RoutingType routingType, + Integer maxConsumers, + Boolean purgeOnNoConsumers, + Boolean exclusive, + String user) throws Exception { + return updateQueue(name, routingType, maxConsumers, purgeOnNoConsumers, exclusive, null, null, user); + } + @Override public Queue updateQueue(String name, RoutingType routingType, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, + Integer consumersBeforeDispatch, + Long delayBeforeDispatch, String user) throws Exception { - final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, maxConsumers, purgeOnNoConsumers, exclusive, SimpleString.toSimpleString(user)); + final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, maxConsumers, purgeOnNoConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, SimpleString.toSimpleString(user)); if (queueBinding != null) { final Queue queue = queueBinding.getQueue(); return queue; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java index fc965912f0c8..7c2ffeedd998 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java @@ -63,6 +63,8 @@ public LastValueQueue(final long persistenceID, final RoutingType routingType, final Integer maxConsumers, final Boolean exclusive, + final Integer consumersBeforeDispatch, + final Long delayBeforeDispatch, final Boolean purgeOnNoConsumers, final ScheduledExecutorService scheduledExecutor, final PostOffice postOffice, @@ -71,7 +73,7 @@ public LastValueQueue(final long persistenceID, final ArtemisExecutor executor, final ActiveMQServer server, final QueueFactory factory) { - super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory); + super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java index f9ec9647aa9d..59a318c0cfb2 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java @@ -153,6 +153,8 @@ public void initQueues(Map queueBindingInfosMap, .maxConsumers(queueBindingInfo.getMaxConsumers()) .exclusive(queueBindingInfo.isExclusive()) .lastValue(queueBindingInfo.isLastValue()) + .consumersBeforeDispatch(queueBindingInfo.getConsumersBeforeDispatch()) + .delayBeforeDispatch(queueBindingInfo.getDelayBeforeDispatch()) .routingType(RoutingType.getType(queueBindingInfo.getRoutingType())); final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build()); queue.setConsumersRefCount(new QueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName())); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java index 7f23d099dfe3..24b36e6111b5 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java @@ -74,9 +74,9 @@ public void setPostOffice(final PostOffice postOffice) { public Queue createQueueWith(final QueueConfig config) { final Queue queue; if (config.isLastValue()) { - queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); + queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); } else { - queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); + queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); } server.getCriticalAnalyzer().add(queue); @@ -102,7 +102,7 @@ public Queue createQueue(final long persistenceID, Queue queue; if (addressSettings.isDefaultLastValueQueue()) { - queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); + queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); } else { queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index fdb0ddd84bb3..bd5aad07a8bb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -39,6 +39,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQNullRefException; @@ -87,6 +88,7 @@ import org.apache.activemq.artemis.core.transaction.impl.BindingsTransactionImpl; import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.utils.AtomicBooleanFieldUpdater; import org.apache.activemq.artemis.utils.Env; import org.apache.activemq.artemis.utils.ReferenceCounter; import org.apache.activemq.artemis.utils.ReusableLatch; @@ -113,6 +115,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { protected static final int CRITICAL_CONSUMER = 3; private static final Logger logger = Logger.getLogger(QueueImpl.class); + private static final AtomicBooleanFieldUpdater dispatchingUpdater = AtomicBooleanFieldUpdater.newUpdater(QueueImpl.class, "dispatching"); + private static final AtomicLongFieldUpdater dispatchStartTimeUpdater = AtomicLongFieldUpdater.newUpdater(QueueImpl.class, "dispatchStartTime"); public static final int REDISTRIBUTOR_BATCH_SIZE = 100; @@ -270,6 +274,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { private final QueueFactory factory; + public volatile long dispatching = 0; + + public volatile long dispatchStartTime = -1; + + private int consumersBeforeDispatch = 0; + + private long delayBeforeDispatch = 0; + + /** * This is to avoid multi-thread races on calculating direct delivery, * to guarantee ordering will be always be correct @@ -415,6 +428,31 @@ public QueueImpl(final long id, final ArtemisExecutor executor, final ActiveMQServer server, final QueueFactory factory) { + this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, null, null, purgeOnNoConsumers, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory); + } + + public QueueImpl(final long id, + final SimpleString address, + final SimpleString name, + final Filter filter, + final PageSubscription pageSubscription, + final SimpleString user, + final boolean durable, + final boolean temporary, + final boolean autoCreated, + final RoutingType routingType, + final Integer maxConsumers, + final Boolean exclusive, + final Integer consumersBeforeDispatch, + final Long delayBeforeDispatch, + final Boolean purgeOnNoConsumers, + final ScheduledExecutorService scheduledExecutor, + final PostOffice postOffice, + final StorageManager storageManager, + final HierarchicalRepository addressSettingsRepository, + final ArtemisExecutor executor, + final ActiveMQServer server, + final QueueFactory factory) { super(server == null ? EmptyCriticalAnalyzer.getInstance() : server.getCriticalAnalyzer(), CRITICAL_PATHS); this.id = id; @@ -443,6 +481,10 @@ public QueueImpl(final long id, this.purgeOnNoConsumers = purgeOnNoConsumers == null ? ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers() : purgeOnNoConsumers; + this.consumersBeforeDispatch = consumersBeforeDispatch == null ? ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch() : consumersBeforeDispatch; + + this.delayBeforeDispatch = delayBeforeDispatch == null ? ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch() : delayBeforeDispatch; + this.postOffice = postOffice; this.storageManager = storageManager; @@ -506,6 +548,48 @@ public synchronized void setExclusive(boolean exclusive) { this.exclusive = exclusive; } + @Override + public int getConsumersBeforeDispatch() { + return consumersBeforeDispatch; + } + + @Override + public synchronized void setConsumersBeforeDispatch(int consumersBeforeDispatch) { + this.consumersBeforeDispatch = consumersBeforeDispatch; + } + + @Override + public long getDelayBeforeDispatch() { + return delayBeforeDispatch; + } + + @Override + public synchronized void setDelayBeforeDispatch(long delayBeforeDispatch) { + this.delayBeforeDispatch = delayBeforeDispatch; + } + + @Override + public long getDispatchStartTime() { + return dispatchStartTimeUpdater.get(this); + } + + @Override + public boolean isDispatching() { + return dispatchingUpdater.get(this); + } + + @Override + public synchronized void setDispatching(boolean dispatching) { + if (dispatchingUpdater.compareAndSet(this, !dispatching, dispatching)) { + if (dispatching) { + dispatchStartTimeUpdater.set(this, System.currentTimeMillis()); + } else { + dispatchStartTimeUpdater.set(this, -1); + } + } + } + + @Override public boolean isLastValue() { return false; @@ -874,6 +958,21 @@ private boolean internalFlushExecutor(long timeout, boolean log) { } } + private boolean canDispatch() { + boolean canDispatch = dispatchingUpdater.get(this); + if (canDispatch) { + return true; + } else { + long currentDispatchStartTime = dispatchStartTimeUpdater.get(this); + if (currentDispatchStartTime != -1 && currentDispatchStartTime < System.currentTimeMillis()) { + dispatchingUpdater.set(this, true); + return true; + } else { + return false; + } + } + } + @Override public void addConsumer(final Consumer consumer) throws Exception { if (logger.isDebugEnabled()) { @@ -883,7 +982,6 @@ public void addConsumer(final Consumer consumer) throws Exception { enterCritical(CRITICAL_CONSUMER); try { synchronized (this) { - if (maxConsumers != MAX_CONSUMERS_UNLIMITED && consumersCount.get() >= maxConsumers) { throw ActiveMQMessageBundle.BUNDLE.maxConsumerLimitReachedForQueue(address, name); } @@ -899,7 +997,15 @@ public void addConsumer(final Consumer consumer) throws Exception { consumerList.add(new ConsumerHolder(consumer)); if (consumerSet.add(consumer)) { - consumersCount.incrementAndGet(); + int currentConsumerCount = consumersCount.incrementAndGet(); + if (delayBeforeDispatch >= 0) { + dispatchStartTimeUpdater.compareAndSet(this,-1, delayBeforeDispatch + System.currentTimeMillis()); + } + if (currentConsumerCount >= consumersBeforeDispatch) { + if (dispatchingUpdater.compareAndSet(this, false, true)) { + dispatchStartTimeUpdater.set(this, System.currentTimeMillis()); + } + } } if (refCountForConsumers != null) { @@ -938,7 +1044,11 @@ public void removeConsumer(final Consumer consumer) { } if (consumerSet.remove(consumer)) { - consumersCount.decrementAndGet(); + int currentConsumerCount = consumersCount.decrementAndGet(); + boolean stopped = dispatchingUpdater.compareAndSet(this,true, currentConsumerCount != 0); + if (stopped) { + dispatchStartTimeUpdater.set(this, -1); + } } LinkedList groupsToRemove = null; @@ -2272,7 +2382,7 @@ private void deliver() { synchronized (this) { // Need to do these checks inside the synchronized - if (paused || consumerList.isEmpty()) { + if (paused || !canDispatch()) { return; } @@ -2871,7 +2981,7 @@ private boolean deliverDirect(final MessageReference ref) { // this would protect any eventual bug return false; } - if (paused || consumerList.isEmpty()) { + if (paused || !canDispatch()) { return false; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java index c7021fdc4fb3..6fc5019c17d3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java @@ -170,6 +170,10 @@ public class AddressSettings implements Mergeable, Serializable private Boolean defaultPurgeOnNoConsumers = null; + private Integer defaultConsumersBeforeDispatch = null; + + private Long defaultDelayBeforeDispatch = null; + private RoutingType defaultQueueRoutingType = null; private RoutingType defaultAddressRoutingType = null; @@ -214,6 +218,8 @@ public AddressSettings(AddressSettings other) { this.maxSizeBytesRejectThreshold = other.maxSizeBytesRejectThreshold; this.defaultMaxConsumers = other.defaultMaxConsumers; this.defaultPurgeOnNoConsumers = other.defaultPurgeOnNoConsumers; + this.defaultConsumersBeforeDispatch = other.defaultConsumersBeforeDispatch; + this.defaultDelayBeforeDispatch = other.defaultDelayBeforeDispatch; this.defaultQueueRoutingType = other.defaultQueueRoutingType; this.defaultAddressRoutingType = other.defaultAddressRoutingType; } @@ -328,6 +334,24 @@ public AddressSettings setDefaultMaxConsumers(Integer defaultMaxConsumers) { return this; } + public int getDefaultConsumersBeforeDispatch() { + return defaultConsumersBeforeDispatch != null ? defaultConsumersBeforeDispatch : ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(); + } + + public AddressSettings setDefaultConsumersBeforeDispatch(Integer defaultConsumersBeforeDispatch) { + this.defaultConsumersBeforeDispatch = defaultConsumersBeforeDispatch; + return this; + } + + public long getDefaultDelayBeforeDispatch() { + return defaultDelayBeforeDispatch != null ? defaultDelayBeforeDispatch : ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(); + } + + public AddressSettings setDefaultDelayBeforeDispatch(Long defaultDelayBeforeDispatch) { + this.defaultDelayBeforeDispatch = defaultDelayBeforeDispatch; + return this; + } + public boolean isDefaultPurgeOnNoConsumers() { return defaultPurgeOnNoConsumers != null ? defaultPurgeOnNoConsumers : ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(); } @@ -667,6 +691,18 @@ public void merge(final AddressSettings merged) { if (defaultAddressRoutingType == null) { defaultAddressRoutingType = merged.defaultAddressRoutingType; } + if (defaultExclusiveQueue == null) { + defaultExclusiveQueue = merged.defaultExclusiveQueue; + } + if (defaultLastValueQueue == null) { + defaultLastValueQueue = merged.defaultLastValueQueue; + } + if (defaultConsumersBeforeDispatch == null) { + defaultConsumersBeforeDispatch = merged.defaultConsumersBeforeDispatch; + } + if (defaultDelayBeforeDispatch == null) { + defaultDelayBeforeDispatch = merged.defaultDelayBeforeDispatch; + } } @Override @@ -767,6 +803,14 @@ public void decode(ActiveMQBuffer buffer) { if (buffer.readableBytes() > 0) { defaultExclusiveQueue = BufferHelper.readNullableBoolean(buffer); } + + if (buffer.readableBytes() > 0) { + defaultConsumersBeforeDispatch = BufferHelper.readNullableInteger(buffer); + } + + if (buffer.readableBytes() > 0) { + defaultDelayBeforeDispatch = BufferHelper.readNullableLong(buffer); + } } @Override @@ -805,7 +849,9 @@ public int getEncodeSize() { BufferHelper.sizeOfNullableBoolean(defaultPurgeOnNoConsumers) + DataConstants.SIZE_BYTE + DataConstants.SIZE_BYTE + - BufferHelper.sizeOfNullableBoolean(defaultExclusiveQueue); + BufferHelper.sizeOfNullableBoolean(defaultExclusiveQueue) + + BufferHelper.sizeOfNullableInteger(defaultConsumersBeforeDispatch) + + BufferHelper.sizeOfNullableLong(defaultDelayBeforeDispatch); } @Override @@ -882,6 +928,10 @@ public void encode(ActiveMQBuffer buffer) { BufferHelper.writeNullableBoolean(buffer, defaultExclusiveQueue); + BufferHelper.writeNullableInteger(buffer, defaultConsumersBeforeDispatch); + + BufferHelper.writeNullableLong(buffer, defaultDelayBeforeDispatch); + } /* (non-Javadoc) @@ -928,6 +978,8 @@ public int hashCode() { result = prime * result + ((defaultPurgeOnNoConsumers == null) ? 0 : defaultPurgeOnNoConsumers.hashCode()); result = prime * result + ((defaultQueueRoutingType == null) ? 0 : defaultQueueRoutingType.hashCode()); result = prime * result + ((defaultAddressRoutingType == null) ? 0 : defaultAddressRoutingType.hashCode()); + result = prime * result + ((defaultConsumersBeforeDispatch == null) ? 0 : defaultConsumersBeforeDispatch.hashCode()); + result = prime * result + ((defaultDelayBeforeDispatch == null) ? 0 : defaultDelayBeforeDispatch.hashCode()); return result; } @@ -1133,6 +1185,18 @@ public boolean equals(Object obj) { return false; } else if (!defaultAddressRoutingType.equals(other.defaultAddressRoutingType)) return false; + + if (defaultConsumersBeforeDispatch == null) { + if (other.defaultConsumersBeforeDispatch != null) + return false; + } else if (!defaultConsumersBeforeDispatch.equals(other.defaultConsumersBeforeDispatch)) + return false; + + if (defaultDelayBeforeDispatch == null) { + if (other.defaultDelayBeforeDispatch != null) + return false; + } else if (!defaultDelayBeforeDispatch.equals(other.defaultDelayBeforeDispatch)) + return false; return true; } @@ -1212,6 +1276,10 @@ public String toString() { defaultQueueRoutingType + ", defaultAddressRoutingType=" + defaultAddressRoutingType + + ", defaultConsumersBeforeDispatch=" + + defaultConsumersBeforeDispatch + + ", defaultDelayBeforeDispatch=" + + defaultDelayBeforeDispatch + "]"; } } diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd index 831b4cb8ddd3..e96923d0f436 100644 --- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd +++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd @@ -510,6 +510,8 @@ + + @@ -2802,6 +2804,22 @@ + + + + the default number of consumers needed before dispatch can start for queues under the address. + + + + + + + + the default delay to wait before dispatching if number of consumers before dispatch is not met for queues under the address. + + + + @@ -3119,6 +3137,8 @@ + + diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java index 4cdd11c477e1..8fcac204cdbc 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java @@ -423,7 +423,8 @@ private void verifyAddresses() { assertEquals("color='blue'", queueConfiguration.getFilterString()); assertEquals(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), queueConfiguration.getPurgeOnNoConsumers()); assertEquals("addr1", queueConfiguration.getAddress()); - assertEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), queueConfiguration.getMaxConsumers()); + // If null, then default will be taken from address-settings (which defaults to ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers()) + assertEquals(null, queueConfiguration.getMaxConsumers()); // Addr 1 Queue 2 queueConfiguration = addressConfiguration.getQueueConfigurations().get(1); @@ -431,7 +432,7 @@ private void verifyAddresses() { assertEquals("q2", queueConfiguration.getName()); assertTrue(queueConfiguration.isDurable()); assertEquals("color='green'", queueConfiguration.getFilterString()); - assertEquals(Queue.MAX_CONSUMERS_UNLIMITED, queueConfiguration.getMaxConsumers()); + assertEquals(Queue.MAX_CONSUMERS_UNLIMITED, queueConfiguration.getMaxConsumers().intValue()); assertFalse(queueConfiguration.getPurgeOnNoConsumers()); assertEquals("addr1", queueConfiguration.getAddress()); @@ -449,7 +450,7 @@ private void verifyAddresses() { assertEquals("q3", queueConfiguration.getName()); assertTrue(queueConfiguration.isDurable()); assertEquals("color='red'", queueConfiguration.getFilterString()); - assertEquals(10, queueConfiguration.getMaxConsumers()); + assertEquals(10, queueConfiguration.getMaxConsumers().intValue()); assertEquals(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), queueConfiguration.getPurgeOnNoConsumers()); assertEquals("addr2", queueConfiguration.getAddress()); @@ -459,7 +460,8 @@ private void verifyAddresses() { assertEquals("q4", queueConfiguration.getName()); assertTrue(queueConfiguration.isDurable()); assertNull(queueConfiguration.getFilterString()); - assertEquals(ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), queueConfiguration.getMaxConsumers()); + // If null, then default will be taken from address-settings (which defaults to ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers()) + assertEquals(null, queueConfiguration.getMaxConsumers()); assertTrue(queueConfiguration.getPurgeOnNoConsumers()); assertEquals("addr2", queueConfiguration.getAddress()); diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index f45a1ddce2aa..96de8c761be6 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -793,6 +793,41 @@ public void setPurgeOnNoConsumers(boolean value) { } + @Override + public int getConsumersBeforeDispatch() { + return 0; + } + + @Override + public void setConsumersBeforeDispatch(int consumersBeforeDispatch) { + + } + + @Override + public long getDelayBeforeDispatch() { + return 0; + } + + @Override + public void setDelayBeforeDispatch(long delayBeforeDispatch) { + + } + + @Override + public long getDispatchStartTime() { + return 0; + } + + @Override + public boolean isDispatching() { + return false; + } + + @Override + public void setDispatching(boolean dispatching) { + + } + @Override public void setMaxConsumer(int maxConsumers) { diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd index 41c881e6de21..30de90ba4174 100644 --- a/artemis-tools/src/test/resources/artemis-configuration.xsd +++ b/artemis-tools/src/test/resources/artemis-configuration.xsd @@ -491,6 +491,8 @@ + + @@ -2498,6 +2500,22 @@ + + + + the default number of consumers needed before dispatch can start for queues under the address. + + + + + + + + the default delay to wait before dispatching if number of consumers before dispatch is not met for queues under the address. + + + + @@ -2769,6 +2787,8 @@ + + diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java new file mode 100644 index 000000000000..4d2d195c5ecc --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.jms.client; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** + * Exclusive Test + */ +public class ConsumerDelayDispatchTest extends JMSTestBase { + + private SimpleString queueName = SimpleString.toSimpleString("jms.consumer.delay.queue"); + private SimpleString normalQueueName = SimpleString.toSimpleString("jms.noraml.queue"); + + private static final long DELAY_BEFORE_DISPATCH = 10000L; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, null, true, false, false, false, false, -1, false, true, false, 2, DELAY_BEFORE_DISPATCH, true); + server.createQueue(normalQueueName, RoutingType.ANYCAST, normalQueueName, null, null, true, false, false, false, false, -1, false, true, false, 0, -1, true); + + } + + + protected ConnectionFactory getCF() throws Exception { + return cf; + } + + @Test + public void testNoDelayOnDefault() throws Exception { + sendMessage(normalQueueName); + + ConnectionFactory fact = getCF(); + Connection connection = fact.createConnection(); + + try { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + connection.start(); + + Destination queue = session.createQueue(normalQueueName.toString()); + MessageConsumer consumer1 = session.createConsumer(queue); + + Assert.assertNotNull(receive(consumer1)); + } finally { + connection.close(); + } + } + + @Test + public void testDelayBeforeDispatch() throws Exception { + sendMessage(queueName); + + ConnectionFactory fact = getCF(); + Connection connection = fact.createConnection(); + + try { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + connection.start(); + + Destination queue = session.createQueue(queueName.toString()); + MessageConsumer consumer1 = session.createConsumer(queue); + + Assert.assertNull(receive(consumer1)); + Thread.sleep(DELAY_BEFORE_DISPATCH); + + Assert.assertNotNull(receive(consumer1)); + } finally { + connection.close(); + } + } + + @Test + public void testConsumersBeforeDispatch() throws Exception { + sendMessage(queueName); + + + ConnectionFactory fact = getCF(); + Connection connection = fact.createConnection(); + + try { + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + connection.start(); + Destination queue = session.createQueue(queueName.toString()); + + MessageConsumer consumer1 = session.createConsumer(queue); + + Assert.assertNull(receive(consumer1)); + + MessageConsumer consumer2 = session.createConsumer(queue); + + Assert.assertNotNull(receive(consumer1, consumer2)); + } finally { + connection.close(); + } + } + + + @Test + public void testContinueAndResetConsumer() throws Exception { + sendMessage(queueName); + + ConnectionFactory fact = getCF(); + Connection connection = fact.createConnection(); + + try { + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + connection.start(); + Destination queue = session.createQueue(queueName.toString()); + + MessageConsumer consumer1 = session.createConsumer(queue); + + Assert.assertNull(receive(consumer1)); + + MessageConsumer consumer2 = session.createConsumer(queue); + + Assert.assertNotNull(receive(consumer1, consumer2)); + + consumer2.close(); + + //Ensure that now dispatch is active, if we close a consumer, dispatching continues. + sendMessage(queueName); + + Assert.assertNotNull(receive(consumer1)); + + //Stop all consumers, which should reset dispatch rules. + consumer1.close(); + + //Ensure that once all consumers are stopped, that dispatch rules reset and wait for min consumers. + sendMessage(queueName); + + MessageConsumer consumer3 = session.createConsumer(queue); + + Assert.assertNull(receive(consumer3)); + + MessageConsumer consumer4 = session.createConsumer(queue); + + Assert.assertNotNull(receive(consumer3, consumer4)); + + + //Stop all consumers, which should reset dispatch rules. + consumer3.close(); + consumer4.close(); + + //Ensure that once all consumers are stopped, that dispatch rules reset and wait for delay. + sendMessage(queueName); + + MessageConsumer consumer5 = session.createConsumer(queue); + + Assert.assertNull(receive(consumer5)); + + Thread.sleep(DELAY_BEFORE_DISPATCH); + + Assert.assertNotNull(receive(consumer5)); + + } finally { + connection.close(); + } + } + + private Message receive(MessageConsumer consumer1) throws JMSException { + return consumer1.receive(1000); + } + + private Message receive(MessageConsumer consumer1, MessageConsumer consumer2) throws JMSException { + Message receivedMessage = receive(consumer1); + if (receivedMessage == null) { + receivedMessage = receive(consumer2); + } + return receivedMessage; + } + + public void sendMessage(SimpleString queue) throws Exception { + ConnectionFactory fact = getCF(); + Connection connection = fact.createConnection(); + try { + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + connection.start(); + + Destination destination = session.createQueue(queue.toString()); + MessageProducer producer = session.createProducer(destination); + + TextMessage message = session.createTextMessage(); + message.setText("Message"); + producer.send(message); + } finally { + connection.close(); + } + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java index ae24a45d2a37..2d78092a51c2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java @@ -155,6 +155,11 @@ public String updateQueue(@Parameter(name = "name", desc = "Name of the queue") return (String) proxy.invokeOperation("updateQueue", name, routingType, maxConsumers, purgeOnNoConsumers, exclusive, user); } + @Override + public String updateQueue(String name, String routingType, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, String user) throws Exception { + return null; + } + @Override public void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception { proxy.invokeOperation("deleteAddress", name); @@ -188,6 +193,11 @@ public void createQueue(String address,String name, String filter, boolean durab proxy.invokeOperation("createQueue", address, name, filter, durable, routingType); } + @Override + public String createQueue(String address, String routingType, String name, String filterStr, boolean durable, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue, int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoCreateAddress) throws Exception { + return null; + } + @Override public void createQueue(final String address, final String name, final boolean durable) throws Exception { proxy.invokeOperation("createQueue", address, name, durable); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java index e3b179b9106c..ac2ed61669a4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java @@ -105,6 +105,52 @@ public void testQueueConfigExclusiveAndRestart() throws Exception { Assert.assertTrue(queueBinding2.getQueue().isExclusive()); } + @Test + public void testQueueConfigConsumersBeforeDispatchAndRestart() throws Exception { + int consumersBeforeDispatch = 5; + ActiveMQServer server = createServer(true); + + server.start(); + + SimpleString address = new SimpleString("test.address"); + SimpleString queue = new SimpleString("test.queue"); + + server.createQueue(address, RoutingType.MULTICAST, queue, null, null, true, false, false, false,false, 10, true, true, true, consumersBeforeDispatch, -1, true); + + QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue); + Assert.assertEquals(consumersBeforeDispatch, queueBinding1.getQueue().getConsumersBeforeDispatch()); + + server.stop(); + + server.start(); + + QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue); + Assert.assertEquals(consumersBeforeDispatch, queueBinding1.getQueue().getConsumersBeforeDispatch()); + } + + @Test + public void testQueueConfigDelayBeforeDispatchAndRestart() throws Exception { + long delayBeforeDispatch = 5000L; + ActiveMQServer server = createServer(true); + + server.start(); + + SimpleString address = new SimpleString("test.address"); + SimpleString queue = new SimpleString("test.queue"); + + server.createQueue(address, RoutingType.MULTICAST, queue, null, null, true, false, false, false,false, 10, true, true, true, 0, delayBeforeDispatch, true); + + QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue); + Assert.assertEquals(delayBeforeDispatch, queueBinding1.getQueue().getDelayBeforeDispatch()); + + server.stop(); + + server.start(); + + QueueBinding queueBinding2 = (QueueBinding)server.getPostOffice().getBinding(queue); + Assert.assertEquals(delayBeforeDispatch, queueBinding1.getQueue().getDelayBeforeDispatch()); + } + @Test public void testQueueConfigUserAndRestart() throws Exception { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java index 192d7009d161..71ced7f35a8e 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java @@ -48,6 +48,41 @@ public void setPurgeOnNoConsumers(boolean value) { } + @Override + public int getConsumersBeforeDispatch() { + return 0; + } + + @Override + public void setConsumersBeforeDispatch(int consumersBeforeDispatch) { + + } + + @Override + public long getDelayBeforeDispatch() { + return 0; + } + + @Override + public void setDelayBeforeDispatch(long delayBeforeDispatch) { + + } + + @Override + public long getDispatchStartTime() { + return 0; + } + + @Override + public boolean isDispatching() { + return false; + } + + @Override + public void setDispatching(boolean dispatching) { + + } + @Override public boolean isExclusive() { // no-op diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java index 3f350847d85a..44e5823b2b73 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java @@ -49,6 +49,8 @@ public QueueBinding updateQueue(SimpleString name, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, + Integer consumersBeforeDispatch, + Long delayBeforeDispatch, SimpleString user) throws Exception { return null; }