Skip to content
Permalink
Browse files
ARTEMIS-2692 refactor queue creation
This commit does the following:
- Deprecates existing overloaded createQueue, createSharedQueue,
  createTemporaryQueue, & updateQueue methods for ClientSession,
  ServerSession, ActiveMQServer, & ActiveMQServerControl where
  applicable.
- Deprecates QueueAttributes, QueueConfig, & CoreQueueConfiguration.
- Deprecates existing overloaded constructors for QueueImpl.
- Implements QueueConfiguration with JavaDoc to be the single,
  centralized configuration object for both client-side and broker-side
  queue creation including methods to convert to & from JSON for use in
  the management API.
- Implements new createQueue, createSharedQueue & updateQueue methods
  with JavaDoc for ClientSession, ServerSession, ActiveMQServer, &
  ActiveMQServerControl as well as a new constructor for QueueImpl all
  using the new QueueConfiguration object.
- Changes all internal broker code to use the new methods.
  • Loading branch information
jbertram committed Apr 13, 2020
1 parent 0dfbfaa commit 2efa44daf52994790737e21ee29ae830f8f0a12c
Showing 76 changed files with 2,607 additions and 633 deletions.
@@ -41,6 +41,7 @@
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@@ -415,7 +416,7 @@ private void oldBinding() throws Exception {
ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString(queueName));

if (!queueQuery.isExists()) {
session.createQueue(address, routingType, queueName, filter, true);
session.createQueue(new QueueConfiguration(queueName).setAddress(address).setRoutingType(routingType).setFilterString(filter));
if (logger.isDebugEnabled()) {
logger.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
}
@@ -457,7 +458,7 @@ private void bindQueue() throws Exception {
ClientSession.QueueQuery queueQuery = session.queueQuery(new SimpleString(queueName));

if (!queueQuery.isExists()) {
session.createQueue(address, RoutingType.valueOf(routingType), queueName, filter, true);
session.createQueue(new QueueConfiguration(queueName).setAddress(address).setRoutingType(RoutingType.valueOf(routingType)).setFilterString(filter));
if (logger.isDebugEnabled()) {
logger.debug("Binding queue(name=" + queueName + ", address=" + address + ", filter=" + filter + ")");
}
@@ -76,6 +76,10 @@
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-json_1.0_spec</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -43,23 +43,39 @@ public static String toParameterisedAddress(String address, Map<String, String>
}

private final SimpleString address;
private final QueueAttributes queueAttributes;
private final QueueConfiguration queueConfiguration;

public SimpleString getAddress() {
return address;
}

@Deprecated
public QueueAttributes getQueueAttributes() {
return queueAttributes;
return QueueAttributes.fromQueueConfiguration(queueConfiguration);
}

public QueueConfiguration getQueueConfiguration() {
return queueConfiguration;
}

@Deprecated
public ParameterisedAddress(SimpleString address, QueueAttributes queueAttributes) {
this.address = address;
this.queueAttributes = queueAttributes;
this.queueConfiguration = queueAttributes.toQueueConfiguration();
}

public ParameterisedAddress(SimpleString address, QueueConfiguration queueConfiguration) {
this.address = address;
this.queueConfiguration = queueConfiguration;
}

@Deprecated
public ParameterisedAddress(String address, QueueAttributes queueAttributes) {
this(SimpleString.toSimpleString(address), queueAttributes);
this(SimpleString.toSimpleString(address), queueAttributes.toQueueConfiguration());
}

public ParameterisedAddress(String address, QueueConfiguration queueConfiguration) {
this(SimpleString.toSimpleString(address), queueConfiguration);
}

public ParameterisedAddress(SimpleString address) {
@@ -70,21 +86,21 @@ public ParameterisedAddress(String address) {
int index = address.indexOf('?');
if (index == -1) {
this.address = SimpleString.toSimpleString(address);
this.queueAttributes = null;
this.queueConfiguration = null;
} else {
this.address = SimpleString.toSimpleString(address.substring(0, index));
QueueAttributes queueAttributes = new QueueAttributes();
QueueConfiguration queueConfiguration = new QueueConfiguration(address);
try {
parseQuery(address).forEach(queueAttributes::set);
parseQuery(address).forEach(queueConfiguration::set);
} catch (URISyntaxException use) {
throw new IllegalArgumentException("Malformed parameters in address " + address);
}
this.queueAttributes = queueAttributes;
this.queueConfiguration = queueConfiguration;
}
}

public boolean isParameterised() {
return this.queueAttributes != null;
return this.queueConfiguration != null;
}

public static boolean isParameterised(String address) {
@@ -19,6 +19,7 @@

import java.io.Serializable;

@Deprecated
public class QueueAttributes implements Serializable {

public static final String ROUTING_TYPE = "routing-type";
@@ -106,6 +107,56 @@ public void set(String key, String value) {
}
}

public QueueConfiguration toQueueConfiguration() {
return new QueueConfiguration("")
.setDurable(this.getDurable())
.setRoutingType(this.getRoutingType())
.setExclusive(this.getExclusive())
.setRingSize(this.getRingSize())
.setGroupRebalance(this.getGroupRebalance())
.setNonDestructive(this.getNonDestructive())
.setLastValue(this.getLastValue())
.setFilterString(this.getFilterString())
.setMaxConsumers(this.getMaxConsumers())
.setPurgeOnNoConsumers(this.getPurgeOnNoConsumers())
.setConsumersBeforeDispatch(this.getConsumersBeforeDispatch())
.setDelayBeforeDispatch(this.getDelayBeforeDispatch())
.setGroupBuckets(this.getGroupBuckets())
.setGroupFirstKey(this.getGroupFirstKey())
.setLastValueKey(this.getLastValueKey())
.setConsumerPriority(this.getConsumerPriority())
.setAutoDelete(this.getAutoDelete())
.setAutoDeleteMessageCount(this.getAutoDeleteMessageCount())
.setAutoDeleteDelay(this.getAutoDeleteDelay());
}

public static QueueAttributes fromQueueConfiguration(QueueConfiguration queueConfiguration) {
if (queueConfiguration == null) {
return null;
} else {
return new QueueAttributes()
.setDurable(queueConfiguration.isDurable())
.setRoutingType(queueConfiguration.getRoutingType())
.setExclusive(queueConfiguration.isExclusive())
.setRingSize(queueConfiguration.getRingSize())
.setGroupRebalance(queueConfiguration.isGroupRebalance())
.setNonDestructive(queueConfiguration.isNonDestructive())
.setLastValue(queueConfiguration.isLastValue())
.setFilterString(queueConfiguration.getFilterString())
.setMaxConsumers(queueConfiguration.getMaxConsumers())
.setPurgeOnNoConsumers(queueConfiguration.isPurgeOnNoConsumers())
.setConsumersBeforeDispatch(queueConfiguration.getConsumersBeforeDispatch())
.setDelayBeforeDispatch(queueConfiguration.getDelayBeforeDispatch())
.setGroupBuckets(queueConfiguration.getGroupBuckets())
.setGroupFirstKey(queueConfiguration.getGroupFirstKey())
.setLastValueKey(queueConfiguration.getLastValueKey())
.setConsumerPriority(queueConfiguration.getConsumerPriority())
.setAutoDelete(queueConfiguration.isAutoDelete())
.setAutoDeleteDelay(queueConfiguration.getAutoDeleteDelay())
.setAutoDeleteMessageCount(queueConfiguration.getAutoDeleteMessageCount());
}
}

public RoutingType getRoutingType() {
return routingType;
}

0 comments on commit 2efa44d

Please sign in to comment.