Skip to content

Commit

Permalink
ARTEMIS-856 - Support consumersBeforeDispatch and delayBeforeDispatch
Browse files Browse the repository at this point in the history
https://issues.apache.org/jira/browse/ARTEMIS-856

This is equivalent to consumersBeforeDispatchStarts and timeBeforeDispatchStarts in ActiveMQ 5.x

http://activemq.apache.org/message-groups.html

This is addressing one of the items on the artemis roadmap: http://activemq.apache.org/activemq-artemis-roadmap.html
  • Loading branch information
michaelandrepearce committed Jul 10, 2018
1 parent ae29edf commit aae5493
Show file tree
Hide file tree
Showing 29 changed files with 1,145 additions and 51 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;

import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import sun.reflect.CallerSensitive;

public class AtomicBooleanFieldUpdater<T> {

/**
* Creates and returns an updater for objects with the given field.
* The Class argument is needed to check that reflective types and
* generic types match.
*
* @param tclass the class of the objects holding the field
* @param fieldName the name of the field to be updated
* @param <U> the type of instances of tclass
* @return the updater
* @throws IllegalArgumentException if the field is not a
* volatile long type
* @throws RuntimeException with a nested reflection-based
* exception if the class does not hold field or is the wrong type,
* or the field is inaccessible to the caller according to Java language
* access control
*/
@CallerSensitive
public static <U> AtomicBooleanFieldUpdater<U> newUpdater(Class<U> tclass, String fieldName) {
return new AtomicBooleanFieldUpdater<>(AtomicLongFieldUpdater.newUpdater(tclass, fieldName));
}

private static long toLong(boolean value) {
return value ? 1 : 0;
}

private static boolean toBoolean(long value) {
return value != 0;
}

private final AtomicLongFieldUpdater<T> atomicLongFieldUpdater;

/**
* Protected do-nothing constructor for use by subclasses.
*/
protected AtomicBooleanFieldUpdater(AtomicLongFieldUpdater<T> atomicLongFieldUpdater) {
this.atomicLongFieldUpdater = atomicLongFieldUpdater;
}

/**
* Atomically sets the field of the given object managed by this updater
* to the given updated value if the current value {@code ==} the
* expected value. This method is guaranteed to be atomic with respect to
* other calls to {@code compareAndSet} and {@code set}, but not
* necessarily with respect to other changes in the field.
*
* @param obj An object whose field to conditionally set
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful
* @throws ClassCastException if {@code obj} is not an instance
* of the class possessing the field established in the constructor
*/
public boolean compareAndSet(T obj, boolean expect, boolean update) {
return atomicLongFieldUpdater.compareAndSet(obj, toLong(expect), toLong(update));
}



/**
* Atomically sets the field of the given object managed by this updater
* to the given updated value if the current value {@code ==} the
* expected value. This method is guaranteed to be atomic with respect to
* other calls to {@code compareAndSet} and {@code set}, but not
* necessarily with respect to other changes in the field.
*
* <p><a href="package-summary.html#weakCompareAndSet">May fail
* spuriously and does not provide ordering guarantees</a>, so is
* only rarely an appropriate alternative to {@code compareAndSet}.
*
* @param obj An object whose field to conditionally set
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful
* @throws ClassCastException if {@code obj} is not an instance
* of the class possessing the field established in the constructor
*/
public boolean weakCompareAndSet(T obj, boolean expect, boolean update) {
return atomicLongFieldUpdater.weakCompareAndSet(obj, toLong(expect), toLong(update));
}

/**
* Sets the field of the given object managed by this updater to the
* given updated value. This operation is guaranteed to act as a volatile
* store with respect to subsequent invocations of {@code compareAndSet}.
*
* @param obj An object whose field to set
* @param newValue the new value
*/
public void set(T obj, boolean newValue) {
atomicLongFieldUpdater.set(obj, toLong(newValue));
}

/**
* Eventually sets the field of the given object managed by this
* updater to the given updated value.
*
* @param obj An object whose field to set
* @param newValue the new value
* @since 1.6
*/
public void lazySet(T obj, boolean newValue) {
atomicLongFieldUpdater.lazySet(obj, toLong(newValue));
}

/**
* Gets the current value held in the field of the given object managed
* by this updater.
*
* @param obj An object whose field to get
* @return the current value
*/
public boolean get(T obj) {
return toBoolean(atomicLongFieldUpdater.get(obj));
}

/**
* Atomically sets the field of the given object managed by this updater
* to the given value and returns the old value.
*
* @param obj An object whose field to get and set
* @param newValue the new value
* @return the previous value
*/
public boolean getAndSet(T obj, boolean newValue) {
return toBoolean(atomicLongFieldUpdater.getAndSet(obj, toLong(newValue)));
}

public String toString(T obj) {
return Boolean.toString(get(obj));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,10 @@ public static String getDefaultHapolicyBackupStrategy() {

public static final boolean DEFAULT_PURGE_ON_NO_CONSUMERS = false;

public static final int DEFAULT_CONSUMERS_BEFORE_DISPATCH = 0;

public static final long DEFAULT_DELAY_BEFORE_DISPATCH = -1;

public static final RoutingType DEFAULT_ROUTING_TYPE = RoutingType.MULTICAST;

public static final String DEFAULT_SYSTEM_PROPERTY_PREFIX = "brokerconfig.";
Expand Down Expand Up @@ -1302,6 +1306,14 @@ public static boolean getDefaultPurgeOnNoConsumers() {
return DEFAULT_PURGE_ON_NO_CONSUMERS;
}

public static int getDefaultConsumersBeforeDispatch() {
return DEFAULT_CONSUMERS_BEFORE_DISPATCH;
}

public static long getDefaultDelayBeforeDispatch() {
return DEFAULT_DELAY_BEFORE_DISPATCH;
}

public static String getInternalNamingPrefix() {
return DEFAULT_INTERNAL_NAMING_PREFIX;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,10 @@ void createQueue(@Parameter(name = "address", desc = "Address of the queue") Str
* @param durable is the queue durable?
* @param maxConsumers the maximum number of consumers allowed on this queue at any one time
* @param purgeOnNoConsumers delete this queue when the last consumer disconnects
* @param exclusive if the queue should route exclusively to one consumer
* @param lastValue use last-value semantics
* @param consumersBeforeDispatch number of consumers needed before dispatch can start
* @param delayBeforeDispatch delay to wait before dispatching if number of consumers before dispatch is not met
* @param autoCreateAddress create an address with default values should a matching address not be found
* @return a textual summary of the queue
* @throws Exception
Expand All @@ -593,8 +597,41 @@ String createQueue(@Parameter(name = "address", desc = "Address of the queue") S
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
@Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean purgeOnNoConsumers,
@Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") boolean exclusive,
@Parameter(name = "lastValue", desc = "Use last-value semantics") boolean lastValue,
@Parameter(name = "consumersBeforeDispatch", desc = "Number of consumers needed before dispatch can start") int consumersBeforeDispatch,
@Parameter(name = "delayBeforeDispatch", desc = "Delay to wait before dispatching if number of consumers before dispatch is not met") long delayBeforeDispatch,
@Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception;

/**
* Create a queue.
* <br>
* If {@code address} is {@code null} it will be defaulted to {@code name}.
* <br>
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
*
* @param address address to bind the queue to
* @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST}
* @param name name of the queue
* @param filterStr filter of the queue
* @param durable is the queue durable?
* @param maxConsumers the maximum number of consumers allowed on this queue at any one time
* @param purgeOnNoConsumers delete this queue when the last consumer disconnects
* @param autoCreateAddress create an address with default values should a matching address not be found
* @return a textual summary of the queue
* @throws Exception
*/
@Operation(desc = "Create a queue", impact = MBeanOperationInfo.ACTION)
String createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "filter", desc = "Filter of the queue") String filterStr,
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
@Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean purgeOnNoConsumers,
@Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception;


/**
* Update a queue.
*
Expand Down Expand Up @@ -651,6 +688,30 @@ String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String
@Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive,
@Parameter(name = "user", desc = "The user associated with this queue") String user) throws Exception;

/**
* Update a queue
*
* @param name name of the queue
* @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST}
* @param maxConsumers the maximum number of consumers allowed on this queue at any one time
* @param purgeOnNoConsumers delete this queue when the last consumer disconnects
* @param exclusive if the queue should route exclusively to one consumer
* @param consumersBeforeDispatch number of consumers needed before dispatch can start
* @param delayBeforeDispatch delay to wait before dispatching if number of consumers before dispatch is not met
* @param user the user associated with this queue
* @return
* @throws Exception
*/
@Operation(desc = "Update a queue", impact = MBeanOperationInfo.ACTION)
String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
@Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers,
@Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive,
@Parameter(name = "consumersBeforeDispatch", desc = "Number of consumers needed before dispatch can start") Integer consumersBeforeDispatch,
@Parameter(name = "delayBeforeDispatch", desc = "Delay to wait before dispatching if number of consumers before dispatch is not met") Long delayBeforeDispatch,
@Parameter(name = "user", desc = "The user associated with this queue") String user) throws Exception;

/**
* Deploy a durable queue.
* <br>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,16 @@ public class CoreQueueConfiguration implements Serializable {

private Boolean lastValue;

private Integer maxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
private Integer maxConsumers;

private Integer consumersBeforeDispatch;

private Long delayBeforeDispatch;

private Boolean purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers();

private RoutingType routingType = ActiveMQDefaultConfiguration.getDefaultRoutingType();

private boolean maxConsumerConfigured = false;

public CoreQueueConfiguration() {
}

Expand Down Expand Up @@ -78,13 +80,12 @@ public Boolean isLastValue() {
return lastValue;
}

public boolean isMaxConsumerConfigured() {
return maxConsumerConfigured;
public Integer getConsumersBeforeDispatch() {
return consumersBeforeDispatch;
}

public CoreQueueConfiguration setMaxConsumerConfigured(boolean maxConsumerConfigured) {
this.maxConsumerConfigured = maxConsumerConfigured;
return this;
public Long getDelayBeforeDispatch() {
return delayBeforeDispatch;
}

/**
Expand Down Expand Up @@ -127,6 +128,22 @@ public CoreQueueConfiguration setMaxConsumers(Integer maxConsumers) {
return this;
}

/**
* @param consumersBeforeDispatch for this queue, default is 0 (dispatch as soon as 1 consumer)
*/
public CoreQueueConfiguration setConsumersBeforeDispatch(Integer consumersBeforeDispatch) {
this.consumersBeforeDispatch = consumersBeforeDispatch;
return this;
}

/**
* @param delayBeforeDispatch for this queue, default is 0 (start dispatch with no delay)
*/
public CoreQueueConfiguration setDelayBeforeDispatch(Long delayBeforeDispatch) {
this.delayBeforeDispatch = delayBeforeDispatch;
return this;
}

/**
* @param purgeOnNoConsumers delete this queue when consumer count reaches 0, default is false
*/
Expand Down Expand Up @@ -157,7 +174,7 @@ public boolean getPurgeOnNoConsumers() {
return purgeOnNoConsumers;
}

public int getMaxConsumers() {
public Integer getMaxConsumers() {
return maxConsumers;
}

Expand All @@ -182,7 +199,6 @@ public int hashCode() {
result = prime * result + ((purgeOnNoConsumers == null) ? 0 : purgeOnNoConsumers.hashCode());
result = prime * result + ((exclusive == null) ? 0 : exclusive.hashCode());
result = prime * result + ((lastValue == null) ? 0 : lastValue.hashCode());
result = prime * result + (maxConsumerConfigured ? 1331 : 1337);
return result;
}

Expand All @@ -202,8 +218,6 @@ public boolean equals(Object obj) {
return false;
if (durable != other.durable)
return false;
if (maxConsumerConfigured != other.maxConsumerConfigured)
return false;
if (filterString == null) {
if (other.filterString != null)
return false;
Expand Down
Loading

0 comments on commit aae5493

Please sign in to comment.