Skip to content

Commit

Permalink
ARTEMIS-4512 JMS q consumer can wrongly connect to multicast queue
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram authored and clebertsuconic committed Nov 28, 2023
1 parent 6597f02 commit 3bdef0e
Show file tree
Hide file tree
Showing 9 changed files with 64 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public QueueQueryImpl(final boolean durable,
final SimpleString name,
final boolean exists,
final boolean autoCreateQueues) {
this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, -1, false, false, RoutingType.MULTICAST);
this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, -1, false, false, null);
}

public QueueQueryImpl(final boolean durable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@
*/
package org.apache.activemq.artemis.utils;

import java.lang.invoke.MethodHandles;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import static org.apache.activemq.artemis.api.core.ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSession.AddressQuery;
import org.apache.activemq.artemis.api.core.client.ClientSession.QueueQuery;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;

import static org.apache.activemq.artemis.api.core.ActiveMQExceptionType.QUEUE_DOES_NOT_EXIST;

/**
* Utility class to create queues 'automatically'.
Expand Down Expand Up @@ -58,6 +61,13 @@ public static void autoCreateQueue(ClientSession session, SimpleString destAddr
} else {
throw new ActiveMQException("Destination " + destAddress + " does not exist", QUEUE_DOES_NOT_EXIST);
}
} else {
QueueQuery queueQueryResult = session.queueQuery(queueName);
// the routing type might be null if the server is very old in which case we default to the old behavior
RoutingType routingType = queueQueryResult.getRoutingType();
if (routingType != null && routingType != RoutingType.ANYCAST && !CompositeAddress.isFullyQualified(destAddress)) {
throw new ActiveMQException("Destination " + destAddress + " does not support JMS queue semantics", QUEUE_DOES_NOT_EXIST);
}
}
}

Expand All @@ -70,7 +80,7 @@ public static void autoCreateQueue(ClientSession session, SimpleString destAddr
* @param filter to apply on the queue
* @param durable if queue is durable
*/
public static void setRequiredQueueConfigurationIfNotSet(QueueConfiguration queueConfiguration, ClientSession.AddressQuery addressQuery, RoutingType routingType, SimpleString filter, boolean durable) {
public static void setRequiredQueueConfigurationIfNotSet(QueueConfiguration queueConfiguration, AddressQuery addressQuery, RoutingType routingType, SimpleString filter, boolean durable) {
if (queueConfiguration.getRoutingType() == null) {
queueConfiguration.setRoutingType(routingType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,10 @@ public RoutingType getDefaultRoutingType(SimpleString address) {
return manager.getServer().getAddressSettingsRepository().getMatch(address.toString()).getDefaultAddressRoutingType();
}

public RoutingType getRoutingTypeFromPrefix(SimpleString address, RoutingType defaultRoutingType) {
return serverSession.getRoutingTypeFromPrefix(address, defaultRoutingType);
}

public void check(SimpleString address, CheckType checkType, SecurityAuth session) throws Exception {
manager.getServer().getSecurityStore().check(address, checkType, session);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ protected RoutingType getRoutingType(Symbol[] symbols, SimpleString address) {
}

private RoutingType getDefaultRoutingType(SimpleString address) {
RoutingType defaultRoutingType = sessionSPI.getDefaultRoutingType(address);
RoutingType defaultRoutingType = sessionSPI.getRoutingTypeFromPrefix(address, sessionSPI.getDefaultRoutingType(address));
return defaultRoutingType == null ? ActiveMQDefaultConfiguration.getDefaultRoutingType() : defaultRoutingType;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1838,7 +1838,8 @@ public AutoCreateResult checkAutoCreate(final QueueConfiguration queueConfig) th
}

if (queueConfig.getRoutingType() == RoutingType.ANYCAST || queueConfig.isFqqn()) {
if (server.locateQueue(unPrefixedQueue) == null) {
Queue q = server.locateQueue(unPrefixedQueue);
if (q == null) {
// The queue doesn't exist.
Bindings bindings = server.getPostOffice().lookupBindingsForAddress(unPrefixedAddress);
if (bindings != null && bindings.hasLocalBinding() && !queueConfig.isFqqn()) {
Expand All @@ -1858,7 +1859,13 @@ public AutoCreateResult checkAutoCreate(final QueueConfiguration queueConfig) th
}
} else {
// The queue exists.
result = AutoCreateResult.EXISTED;
if (q.getRoutingType() != RoutingType.ANYCAST && !queueConfig.isFqqn()) {
// The queue exists, but it does not support the requested routing type, and it's not FQQN.
return AutoCreateResult.NOT_FOUND;
} else {
// The queue exists, and it supports the requested routing type or it's FQQN so it doesn't matter.
result = AutoCreateResult.EXISTED;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,36 @@ private void testDeliveryMode(Connection connection1, Connection connection2) th
}
}

@Test(timeout = 30000)
public void testQueueRoutingTypeMismatchCore() throws Exception {
testQueueRoutingTypeMismatch(createCoreConnection());
}

@Test(timeout = 30000)
public void testQueueRoutingTypeMismatchOpenWire() throws Exception {
testQueueRoutingTypeMismatch(createOpenWireConnection());
}

@Test(timeout = 30000)
public void testQueueRoutingTypeMismatchAMQP() throws Exception {
testQueueRoutingTypeMismatch(createConnection());
}

private void testQueueRoutingTypeMismatch(Connection connection) throws Exception {
server.getAddressSettingsRepository().getMatch("#").setAutoCreateQueues(false).setAutoCreateAddresses(false);
String name = getTopicName();
server.createQueue(new QueueConfiguration(name).setAddress(name).setRoutingType(RoutingType.MULTICAST).setAutoCreateAddress(true));
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
session.createConsumer(session.createQueue(name));
fail("Should have thrown a JMSException!");
} catch (JMSException e) {
// expected
} finally {
connection.close();
}
}

@Test(timeout = 30000)
public void testPriorityAMQPProducerCoreConsumer() throws Exception {
Connection connection = createConnection(); //AMQP
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ under the License.
</anycast>
</address>
<address name="target">
<multicast>
<anycast>
<queue name="target"/>
</multicast>
</anycast>
</address>
</addresses>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ under the License.
</anycast>
</address>
<address name="target">
<multicast>
<anycast>
<queue name="target"/>
</multicast>
</anycast>
</address>
</addresses>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,8 +218,8 @@ public void testSendMessageUsingCurrentLogonUser() throws Exception {
createQueueCommand.setUser(SERVER_ADMIN_USERNAME);
createQueueCommand.setPassword(SERVER_ADMIN_PASSWORD);
createQueueCommand.setName(queueName);
createQueueCommand.setMulticast(true);
createQueueCommand.setAnycast(false);
createQueueCommand.setMulticast(false);
createQueueCommand.setAnycast(true);
createQueueCommand.setAutoCreateAddress(true);
createQueueCommand.execute(new ActionContext());

Expand Down

0 comments on commit 3bdef0e

Please sign in to comment.