Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-856 - Support consumersBeforeDispatch and delayBeforeDispatch #2175

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils;

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import sun.reflect.CallerSensitive;

public class AtomicBooleanFieldUpdater<T> {

/**
* Creates and returns an updater for objects with the given field.
* The Class argument is needed to check that reflective types and
* generic types match.
*
* @param tclass the class of the objects holding the field
* @param fieldName the name of the field to be updated
* @param <U> the type of instances of tclass
* @return the updater
* @throws IllegalArgumentException if the field is not a
* volatile long type
* @throws RuntimeException with a nested reflection-based
* exception if the class does not hold field or is the wrong type,
* or the field is inaccessible to the caller according to Java language
* access control
*/
@CallerSensitive
public static <U> AtomicBooleanFieldUpdater<U> newUpdater(Class<U> tclass, String fieldName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that is a sad that JDK isn't providing this class with a native implementation, but I would use directly AtomicIntegerFiledUpdater or AtomicReferenceFieldUpdater with Boolean values instead.
The reason is that there is an implicit behaviour of any SomethingFiledUpdater that can't be enforced here: the declaration of a volatile Something.
Instead, the real purpose of this class is to treat integer values like boolean ones so I suppose that using a plain statitc util class to convert boolean<->integer will make it simple while avoiding to provide a more complex concurrent util class.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@franz1981 isn't that too much nit picking? :)

I like the work as is.. as anything in life.. it can always be improved.. but the excellent is the enemy of the very good :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@franz1981 using Boolean you might as well then just use atomic boolean, which you commented eadlier on and rightly so to use atomic updater. As the point is to save having object reference overheads.

The point of the class is just to make it reusable if else where you want else you end up with lots of dupe code just for all the int to bool logic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree about the reasons but given that the JVM doesn't trust final fields you won't get the same performance you would have just using the int updater. Given that, I suppose that everything that let it works is ok for me: the PR is well done as always :)

return new AtomicBooleanFieldUpdater<>(AtomicIntegerFieldUpdater.newUpdater(tclass, fieldName));
}

private static int toInt(boolean value) {
return value ? 1 : 0;
}

private static boolean toBoolean(int value) {
return value != 0;
}

private final AtomicIntegerFieldUpdater<T> atomicIntegerFieldUpdater;

/**
* Protected do-nothing constructor for use by subclasses.
*/
protected AtomicBooleanFieldUpdater(AtomicIntegerFieldUpdater<T> atomicIntegerFieldUpdater) {
this.atomicIntegerFieldUpdater = atomicIntegerFieldUpdater;
}

/**
* Atomically sets the field of the given object managed by this updater
* to the given updated value if the current value {@code ==} the
* expected value. This method is guaranteed to be atomic with respect to
* other calls to {@code compareAndSet} and {@code set}, but not
* necessarily with respect to other changes in the field.
*
* @param obj An object whose field to conditionally set
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful
* @throws ClassCastException if {@code obj} is not an instance
* of the class possessing the field established in the constructor
*/
public boolean compareAndSet(T obj, boolean expect, boolean update) {
return atomicIntegerFieldUpdater.compareAndSet(obj, toInt(expect), toInt(update));
}



/**
* Atomically sets the field of the given object managed by this updater
* to the given updated value if the current value {@code ==} the
* expected value. This method is guaranteed to be atomic with respect to
* other calls to {@code compareAndSet} and {@code set}, but not
* necessarily with respect to other changes in the field.
*
* <p><a href="package-summary.html#weakCompareAndSet">May fail
* spuriously and does not provide ordering guarantees</a>, so is
* only rarely an appropriate alternative to {@code compareAndSet}.
*
* @param obj An object whose field to conditionally set
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful
* @throws ClassCastException if {@code obj} is not an instance
* of the class possessing the field established in the constructor
*/
public boolean weakCompareAndSet(T obj, boolean expect, boolean update) {
return atomicIntegerFieldUpdater.weakCompareAndSet(obj, toInt(expect), toInt(update));
}

/**
* Sets the field of the given object managed by this updater to the
* given updated value. This operation is guaranteed to act as a volatile
* store with respect to subsequent invocations of {@code compareAndSet}.
*
* @param obj An object whose field to set
* @param newValue the new value
*/
public void set(T obj, boolean newValue) {
atomicIntegerFieldUpdater.set(obj, toInt(newValue));
}

/**
* Eventually sets the field of the given object managed by this
* updater to the given updated value.
*
* @param obj An object whose field to set
* @param newValue the new value
* @since 1.6
*/
public void lazySet(T obj, boolean newValue) {
atomicIntegerFieldUpdater.lazySet(obj, toInt(newValue));
}

/**
* Gets the current value held in the field of the given object managed
* by this updater.
*
* @param obj An object whose field to get
* @return the current value
*/
public boolean get(T obj) {
return toBoolean(atomicIntegerFieldUpdater.get(obj));
}

/**
* Atomically sets the field of the given object managed by this updater
* to the given value and returns the old value.
*
* @param obj An object whose field to get and set
* @param newValue the new value
* @return the previous value
*/
public boolean getAndSet(T obj, boolean newValue) {
return toBoolean(atomicIntegerFieldUpdater.getAndSet(obj, toInt(newValue)));
}

public String toString(T obj) {
return Boolean.toString(get(obj));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,10 @@ public static String getDefaultHapolicyBackupStrategy() {

public static final boolean DEFAULT_PURGE_ON_NO_CONSUMERS = false;

public static final int DEFAULT_CONSUMERS_BEFORE_DISPATCH = 0;

public static final long DEFAULT_DELAY_BEFORE_DISPATCH = -1;

public static final RoutingType DEFAULT_ROUTING_TYPE = RoutingType.MULTICAST;

public static final String DEFAULT_SYSTEM_PROPERTY_PREFIX = "brokerconfig.";
Expand Down Expand Up @@ -1302,6 +1306,14 @@ public static boolean getDefaultPurgeOnNoConsumers() {
return DEFAULT_PURGE_ON_NO_CONSUMERS;
}

public static int getDefaultConsumersBeforeDispatch() {
return DEFAULT_CONSUMERS_BEFORE_DISPATCH;
}

public static long getDefaultDelayBeforeDispatch() {
return DEFAULT_DELAY_BEFORE_DISPATCH;
}

public static String getInternalNamingPrefix() {
return DEFAULT_INTERNAL_NAMING_PREFIX;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,10 @@ void createQueue(@Parameter(name = "address", desc = "Address of the queue") Str
* @param durable is the queue durable?
* @param maxConsumers the maximum number of consumers allowed on this queue at any one time
* @param purgeOnNoConsumers delete this queue when the last consumer disconnects
* @param exclusive if the queue should route exclusively to one consumer
* @param lastValue use last-value semantics
* @param consumersBeforeDispatch number of consumers needed before dispatch can start
* @param delayBeforeDispatch delay to wait before dispatching if number of consumers before dispatch is not met
* @param autoCreateAddress create an address with default values should a matching address not be found
* @return a textual summary of the queue
* @throws Exception
Expand All @@ -593,8 +597,41 @@ String createQueue(@Parameter(name = "address", desc = "Address of the queue") S
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
@Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean purgeOnNoConsumers,
@Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") boolean exclusive,
@Parameter(name = "lastValue", desc = "Use last-value semantics") boolean lastValue,
@Parameter(name = "consumersBeforeDispatch", desc = "Number of consumers needed before dispatch can start") int consumersBeforeDispatch,
@Parameter(name = "delayBeforeDispatch", desc = "Delay to wait before dispatching if number of consumers before dispatch is not met") long delayBeforeDispatch,
@Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception;

/**
* Create a queue.
* <br>
* If {@code address} is {@code null} it will be defaulted to {@code name}.
* <br>
* This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException}) exception if the queue already exits.
*
* @param address address to bind the queue to
* @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST}
* @param name name of the queue
* @param filterStr filter of the queue
* @param durable is the queue durable?
* @param maxConsumers the maximum number of consumers allowed on this queue at any one time
* @param purgeOnNoConsumers delete this queue when the last consumer disconnects
* @param autoCreateAddress create an address with default values should a matching address not be found
* @return a textual summary of the queue
* @throws Exception
*/
@Operation(desc = "Create a queue", impact = MBeanOperationInfo.ACTION)
String createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "filter", desc = "Filter of the queue") String filterStr,
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
@Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean purgeOnNoConsumers,
@Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress) throws Exception;


/**
* Update a queue.
*
Expand Down Expand Up @@ -651,6 +688,30 @@ String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String
@Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive,
@Parameter(name = "user", desc = "The user associated with this queue") String user) throws Exception;

/**
* Update a queue
*
* @param name name of the queue
* @param routingType the routing type used for this address, {@code MULTICAST} or {@code ANYCAST}
* @param maxConsumers the maximum number of consumers allowed on this queue at any one time
* @param purgeOnNoConsumers delete this queue when the last consumer disconnects
* @param exclusive if the queue should route exclusively to one consumer
* @param consumersBeforeDispatch number of consumers needed before dispatch can start
* @param delayBeforeDispatch delay to wait before dispatching if number of consumers before dispatch is not met
* @param user the user associated with this queue
* @return
* @throws Exception
*/
@Operation(desc = "Update a queue", impact = MBeanOperationInfo.ACTION)
String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
@Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
@Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers,
@Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive,
@Parameter(name = "consumersBeforeDispatch", desc = "Number of consumers needed before dispatch can start") Integer consumersBeforeDispatch,
@Parameter(name = "delayBeforeDispatch", desc = "Delay to wait before dispatching if number of consumers before dispatch is not met") Long delayBeforeDispatch,
@Parameter(name = "user", desc = "The user associated with this queue") String user) throws Exception;

/**
* Deploy a durable queue.
* <br>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,16 @@ public class CoreQueueConfiguration implements Serializable {

private Boolean lastValue;

private Integer maxConsumers = ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers();
private Integer maxConsumers;

private Integer consumersBeforeDispatch;

private Long delayBeforeDispatch;

private Boolean purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers();

private RoutingType routingType = ActiveMQDefaultConfiguration.getDefaultRoutingType();

private boolean maxConsumerConfigured = false;

public CoreQueueConfiguration() {
}

Expand Down Expand Up @@ -78,13 +80,12 @@ public Boolean isLastValue() {
return lastValue;
}

public boolean isMaxConsumerConfigured() {
return maxConsumerConfigured;
public Integer getConsumersBeforeDispatch() {
return consumersBeforeDispatch;
}

public CoreQueueConfiguration setMaxConsumerConfigured(boolean maxConsumerConfigured) {
this.maxConsumerConfigured = maxConsumerConfigured;
return this;
public Long getDelayBeforeDispatch() {
return delayBeforeDispatch;
}

/**
Expand Down Expand Up @@ -127,6 +128,22 @@ public CoreQueueConfiguration setMaxConsumers(Integer maxConsumers) {
return this;
}

/**
* @param consumersBeforeDispatch for this queue, default is 0 (dispatch as soon as 1 consumer)
*/
public CoreQueueConfiguration setConsumersBeforeDispatch(Integer consumersBeforeDispatch) {
this.consumersBeforeDispatch = consumersBeforeDispatch;
return this;
}

/**
* @param delayBeforeDispatch for this queue, default is 0 (start dispatch with no delay)
*/
public CoreQueueConfiguration setDelayBeforeDispatch(Long delayBeforeDispatch) {
this.delayBeforeDispatch = delayBeforeDispatch;
return this;
}

/**
* @param purgeOnNoConsumers delete this queue when consumer count reaches 0, default is false
*/
Expand Down Expand Up @@ -157,7 +174,7 @@ public boolean getPurgeOnNoConsumers() {
return purgeOnNoConsumers;
}

public int getMaxConsumers() {
public Integer getMaxConsumers() {
return maxConsumers;
}

Expand All @@ -182,7 +199,6 @@ public int hashCode() {
result = prime * result + ((purgeOnNoConsumers == null) ? 0 : purgeOnNoConsumers.hashCode());
result = prime * result + ((exclusive == null) ? 0 : exclusive.hashCode());
result = prime * result + ((lastValue == null) ? 0 : lastValue.hashCode());
result = prime * result + (maxConsumerConfigured ? 1331 : 1337);
return result;
}

Expand All @@ -202,8 +218,6 @@ public boolean equals(Object obj) {
return false;
if (durable != other.durable)
return false;
if (maxConsumerConfigured != other.maxConsumerConfigured)
return false;
if (filterString == null) {
if (other.filterString != null)
return false;
Expand Down
Loading