From e3195d9b9e2f2019c0194b16a7b553cbaf5e6c77 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Tue, 18 Jul 2017 19:43:43 +0100 Subject: [PATCH 1/2] Revert "ARTEMIS-1290 QueueQuery add prefix on address" This reverts commit 44506f2258271953018a951e11c1c53588995d9f. --- .../artemis/core/server/QueueQueryResult.java | 4 --- .../core/server/impl/ServerSessionImpl.java | 11 +------- .../tests/integration/client/SessionTest.java | 28 ++----------------- 3 files changed, 3 insertions(+), 40 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java index cf88d6224cc..3fd818d7d38 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java @@ -138,8 +138,4 @@ public RoutingType getRoutingType() { public int getMaxConsumers() { return maxConsumers; } - - public void setAddress(SimpleString address) { - this.address = address; - } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index bd8c3952559..8e557d3b06f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -728,16 +728,7 @@ public void deleteQueue(final SimpleString queueToDelete) throws Exception { @Override public QueueQueryResult executeQueueQuery(final SimpleString name) throws Exception { - QueueQueryResult result = server.queueQuery(removePrefix(name)); - if (prefixEnabled) { - for (Map.Entry entry : prefixes.entrySet()) { - if (entry.getValue() == result.getRoutingType()) { - result.setAddress(entry.getKey().concat(result.getAddress())); - break; - } - } - } - return result; + return server.queueQuery(removePrefix(name)); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java index de2cc236cfc..9954a4e6177 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SessionTest.java @@ -21,9 +21,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; -import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ClientConsumer; import org.apache.activemq.artemis.api.core.client.ClientMessage; import org.apache.activemq.artemis.api.core.client.ClientProducer; @@ -34,7 +32,6 @@ import org.apache.activemq.artemis.api.core.client.SessionFailureListener; import org.apache.activemq.artemis.core.client.impl.ClientSessionFactoryInternal; import org.apache.activemq.artemis.core.client.impl.ClientSessionInternal; -import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -62,11 +59,8 @@ public class SessionTest extends ActiveMQTestBase { public void setUp() throws Exception { super.setUp(); - locator = createNettyNonHALocator(); - Configuration configuration = createDefaultNettyConfig(); - configuration.addAcceptorConfiguration("prefixed", "tcp://localhost:61617?multicastPrefix=multicast://;anycastPrefix=anycast://"); - server = createServer(configuration); - createServer(false); + locator = createInVMNonHALocator(); + server = createServer(false); server.start(); waitForServerToStart(server); } @@ -212,24 +206,6 @@ public void testQueueQuery() throws Exception { clientSession.close(); } - @Test - public void testQueueQueryWithAddressPrefix() throws Exception { - String address = new String("testAddress"); - - cf = ActiveMQClient.createServerLocator("tcp://localhost:61617").createSessionFactory(); - ClientSession clientSession = cf.createSession(false, true, true); - - clientSession.createQueue(address, RoutingType.ANYCAST, queueName + "1", false); - clientSession.createQueue(address, RoutingType.MULTICAST, queueName + "2", false); - - QueueQuery respA = clientSession.queueQuery(new SimpleString(queueName + "1")); - QueueQuery respM = clientSession.queueQuery(new SimpleString(queueName + "2")); - - Assert.assertEquals(new SimpleString("anycast://" + address), respA.getAddress()); - Assert.assertEquals(new SimpleString("multicast://" + address), respM.getAddress()); - clientSession.close(); - } - private void flushQueue() throws Exception { Queue queue = server.locateQueue(SimpleString.toSimpleString(queueName)); assertNotNull(queue); From d918f9759b190bf11c5358859f73281fc34ed263 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Tue, 18 Jul 2017 19:43:23 +0100 Subject: [PATCH 2/2] ARTEMIS-1290 QueueQuery add prefix on address 2 --- .../core/impl/wireformat/QueueAbstractPacket.java | 9 +++++++++ .../activemq/artemis/core/server/QueueQueryResult.java | 4 ++++ .../core/protocol/core/ServerSessionPacketHandler.java | 9 +++++++-- 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QueueAbstractPacket.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QueueAbstractPacket.java index 767cd0cfced..641d7cc7b92 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QueueAbstractPacket.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/QueueAbstractPacket.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.List; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; @@ -108,4 +109,12 @@ private static SimpleString jmsPrefixOf(SimpleString address) { public QueueAbstractPacket(byte type) { super(type); } + + public static SimpleString getOldPrefixedAddress(SimpleString address, RoutingType routingType) { + switch (routingType) { + case MULTICAST: return OLD_TOPIC_PREFIX.concat(address); + case ANYCAST: return OLD_QUEUE_PREFIX.concat(address); + default: return address; + } + } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java index 3fd818d7d38..cf88d6224cc 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java @@ -138,4 +138,8 @@ public RoutingType getRoutingType() { public int getMaxConsumers() { return maxConsumers; } + + public void setAddress(SimpleString address) { + this.address = address; + } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index bd979395c2e..847e18baca7 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -92,10 +92,10 @@ import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.spi.core.remoting.Connection; -import org.apache.activemq.artemis.utils.actors.Actor; -import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.apache.activemq.artemis.utils.SimpleFuture; import org.apache.activemq.artemis.utils.SimpleFutureImpl; +import org.apache.activemq.artemis.utils.actors.Actor; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.jboss.logging.Logger; import static org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl.CREATE_ADDRESS; @@ -384,6 +384,11 @@ private void slowPacketHandler(final Packet packet) { requiresResponse = true; SessionQueueQueryMessage request = (SessionQueueQueryMessage) packet; QueueQueryResult result = session.executeQueueQuery(request.getQueueName(remotingConnection.getClientVersion())); + + if (remotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) { + result.setAddress(SessionQueueQueryMessage.getOldPrefixedAddress(result.getAddress(), result.getRoutingType())); + } + if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V3)) { response = new SessionQueueQueryResponseMessage_V3(result); } else if (channel.supports(PacketImpl.SESS_QUEUEQUERY_RESP_V2)) {