Skip to content

Commit

Permalink
ARTEMIS-4259 JMS consumer + FQQN + selector not working
Browse files Browse the repository at this point in the history
co-authored with Justin Bertram
  • Loading branch information
clebertsuconic committed Jun 7, 2023
1 parent f03b775 commit cea9ff6
Show file tree
Hide file tree
Showing 7 changed files with 524 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -840,26 +840,15 @@ private ActiveMQMessageConsumer createConsumer(final ActiveMQDestination dest,
throw new RuntimeException("Subscription name cannot be null for durable topic consumer");
// Non durable sub

queueName = new SimpleString(UUID.randomUUID().toString());

if (!CompositeAddress.isFullyQualified(dest.getAddress())) {
createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response);
if (CompositeAddress.isFullyQualified(dest.getAddress())) {
queueName = createFQQNSubscription(dest, coreFilterString, response);
} else {
if (!response.isExists() || !response.getQueueNames().contains(AutoCreateUtil.getCoreQueueName(session, dest.getSimpleAddress()))) {
if (response.isAutoCreateQueues()) {
try {
createQueue(dest, RoutingType.MULTICAST, dest.getSimpleAddress(), null, true, true, response);
} catch (ActiveMQQueueExistsException e) {
// The queue was created by another client/admin between the query check and send create queue packet
}
} else {
throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist");
}
}
queueName = CompositeAddress.extractQueueName(dest.getSimpleAddress());
queueName = new SimpleString(UUID.randomUUID().toString());
createTemporaryQueue(dest, RoutingType.MULTICAST, queueName, coreFilterString, response);
}

consumer = createClientConsumer(dest, queueName, null);
consumer = createClientConsumer(dest, queueName, coreFilterString);
autoDeleteQueueName = queueName;
} else {
// Durable sub
Expand Down Expand Up @@ -928,6 +917,38 @@ private ActiveMQMessageConsumer createConsumer(final ActiveMQDestination dest,
}
}

// This method is for the actual queue creation on the Multicast queue / subscription
private SimpleString createFQQNSubscription(ActiveMQDestination dest,
SimpleString coreFilterString,
AddressQuery response) throws ActiveMQException, JMSException {
SimpleString queueName;
queueName = CompositeAddress.extractQueueName(dest.getSimpleAddress());
if (!response.isExists() || !response.getQueueNames().contains(AutoCreateUtil.getCoreQueueName(session, dest.getSimpleAddress()))) {
if (response.isAutoCreateQueues()) {
try {
createQueue(dest, RoutingType.MULTICAST, dest.getSimpleAddress(), coreFilterString, true, true, response);
return queueName;
} catch (ActiveMQQueueExistsException e) {
// The queue was created by another client/admin between the query check and send create queue packet
// on this case we will switch to the regular verification to validate the coreFilterString
}
} else {
throw new InvalidDestinationException("Destination " + dest.getName() + " does not exist");
}
}

QueueQuery queueQuery = session.queueQuery(queueName);

if (!queueQuery.isExists()) {
throw new InvalidDestinationException("Destination " + queueName + " does not exist");
}

if (coreFilterString != null && queueQuery.getFilterString() != null && !coreFilterString.equals(queueQuery.getFilterString())) {
throw new JMSException(queueName + " filter mismatch [" + coreFilterString + "] is different than existing filter [" + queueQuery.getFilterString() + "]");
}
return queueName;
}

private ClientConsumer createClientConsumer(ActiveMQDestination destination, SimpleString queueName, SimpleString coreFilterString) throws ActiveMQException {
QueueAttributes queueAttributes = destination.getQueueAttributes() == null ? new QueueAttributes() : destination.getQueueAttributes();
int priority = queueAttributes.getConsumerPriority() == null ? ActiveMQDefaultConfiguration.getDefaultConsumerPriority() : queueAttributes.getConsumerPriority();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,11 +306,15 @@ public void createSharedVolatileQueue(SimpleString address,
}

public QueueQueryResult queueQuery(SimpleString queueName, RoutingType routingType, boolean autoCreate) throws Exception {
return queueQuery(queueName, routingType, autoCreate, null);
}

public QueueQueryResult queueQuery(SimpleString queueName, RoutingType routingType, boolean autoCreate, SimpleString filter) throws Exception {
QueueQueryResult queueQueryResult = serverSession.executeQueueQuery(queueName);

if (!queueQueryResult.isExists() && queueQueryResult.isAutoCreateQueues() && autoCreate) {
try {
serverSession.createQueue(new QueueConfiguration(queueName).setRoutingType(routingType).setAutoCreated(true));
serverSession.createQueue(new QueueConfiguration(queueName).setRoutingType(routingType).setFilterString(filter).setAutoCreated(true));
} catch (ActiveMQQueueExistsException e) {
// The queue may have been created by another thread in the mean time. Catch and do nothing.
}
Expand All @@ -321,7 +325,7 @@ public QueueQueryResult queueQuery(SimpleString queueName, RoutingType routingTy
if (queueQueryResult.isExists() && !queueQueryResult.isAutoCreated()) {
//if routingType is null we bypass the check
if (routingType != null && queueQueryResult.getRoutingType() != routingType) {
throw new IllegalStateException("Incorrect Routing Type for queue, expecting: " + routingType);
throw new IllegalStateException("Incorrect Routing Type for queue " + queueName + ", expecting: " + routingType + " while it had " + queueQueryResult.getRoutingType());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.Message;
Expand Down Expand Up @@ -1083,11 +1084,15 @@ public Consumer init(ProtonServerSenderContext senderContext) throws Exception {
shared = hasCapabilities(SHARED, source);
global = hasCapabilities(GLOBAL, source);

final boolean isFQQN;

//find out if we have an address made up of the address and queue name, if yes then set queue name
if (CompositeAddress.isFullyQualified(source.getAddress())) {
isFQQN = true;
addressToUse = SimpleString.toSimpleString(CompositeAddress.extractAddressName(source.getAddress()));
queueNameToUse = SimpleString.toSimpleString(CompositeAddress.extractQueueName(source.getAddress()));
} else {
isFQQN = false;
addressToUse = SimpleString.toSimpleString(source.getAddress());
}
//check to see if the client has defined how we act
Expand Down Expand Up @@ -1169,8 +1174,8 @@ public Consumer init(ProtonServerSenderContext senderContext) throws Exception {
supportedFilters.put(filter.getKey(), filter.getValue());
}

queue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST);
SimpleString simpleStringSelector = SimpleString.toSimpleString(selector);
queue = getMatchingQueue(queueNameToUse, addressToUse, RoutingType.MULTICAST, simpleStringSelector, isFQQN);

//if the address specifies a broker configured queue then we always use this, treat it as a queue
if (queue != null) {
Expand Down Expand Up @@ -1234,10 +1239,13 @@ public Consumer init(ProtonServerSenderContext senderContext) throws Exception {
}
} else {
if (queueNameToUse != null) {
//a queue consumer can receive from a multicast queue if it uses a fully qualified name
//setting routingType to null means do not check the routingType against the Queue's routing type.
routingTypeToUse = null;
SimpleString matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, null);
SimpleString matchingAnycastQueue;
QueueQueryResult result = sessionSPI.queueQuery(CompositeAddress.toFullyQualified(addressToUse, queueNameToUse), null, false, null);
if (result.isExists()) {
// if the queue exists and we're using FQQN then just ignore the routing-type
routingTypeToUse = null;
}
matchingAnycastQueue = getMatchingQueue(queueNameToUse, addressToUse, routingTypeToUse, null, false);
if (matchingAnycastQueue != null) {
queue = matchingAnycastQueue;
} else {
Expand Down Expand Up @@ -1284,15 +1292,19 @@ public Consumer init(ProtonServerSenderContext senderContext) throws Exception {
}


private SimpleString getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType) throws Exception {
private SimpleString getMatchingQueue(SimpleString queueName, SimpleString address, RoutingType routingType, SimpleString filter, boolean matchFilter) throws Exception {
if (queueName != null) {
QueueQueryResult result = sessionSPI.queueQuery(CompositeAddress.toFullyQualified(address, queueName), routingType, true);
QueueQueryResult result = sessionSPI.queueQuery(CompositeAddress.toFullyQualified(address, queueName), routingType, true, filter);
if (!result.isExists()) {
throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist");
} else {
if (!result.getAddress().equals(address)) {
throw new ActiveMQAMQPNotFoundException("Queue: '" + queueName + "' does not exist for address '" + address + "'");
}
if (matchFilter && filter != null && result.getFilterString() != null && !filter.equals(result.getFilterString())) {
throw new ActiveMQIllegalStateException("Queue: " + queueName + " filter mismatch [" + filter + "] is different than existing filter [" + result.getFilterString() + "]");

}
return sessionSPI.getMatchingQueue(address, queueName, routingType);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1786,7 +1786,9 @@ private void recoverOperationContext() {
}

private void clearupOperationContext() {
server.getStorageManager().clearContext();
if (server != null && server.getStorageManager() != null) {
server.getStorageManager().clearContext();
}
}

private Transaction lookupTX(TransactionId txID, AMQSession session) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void init(SlowConsumerDetectionListener slowConsumerDetectionListener, lo
if (openwireDestination.isTopic()) {
SimpleString queueName = createTopicSubscription(info.isDurable(), info.getClientId(), destinationName.toString(), info.getSubscriptionName(), selector, destinationName);

serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, null, info.getPriority(), info.isBrowser(), false, -1);
serverConsumer = session.getCoreSession().createConsumer(nativeId, queueName, CompositeAddress.isFullyQualified(destinationName.toString()) ? selector : null, info.getPriority(), info.isBrowser(), false, -1);
serverConsumer.setlowConsumerDetection(slowConsumerDetectionListener);
//only advisory topic consumers need this.
((ServerConsumerImpl)serverConsumer).setPreAcknowledge(preAck);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,8 @@ public void testTemporaryTopic() throws Exception {
@Test
public void testAutoCreateOnReconnect() throws Exception {
Connection connection = cf.createConnection();
runAfter(() -> ((ActiveMQConnectionFactory)cf).close());
runAfter(connection::close);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Expand Down

0 comments on commit cea9ff6

Please sign in to comment.