From 3150759806b17f3c6d2a59b48cc801d7bc88a05d Mon Sep 17 00:00:00 2001 From: jbertram Date: Tue, 13 Dec 2016 15:05:09 -0600 Subject: [PATCH 1/6] ARTEMIS-880 use built-in prefixing for STOMP --- .../impl/netty/TransportConstants.java | 10 ---- .../core/protocol/stomp/StompConnection.java | 24 ++------- .../protocol/stomp/StompProtocolManager.java | 12 ++--- .../core/protocol/stomp/StompSession.java | 4 +- .../stomp/VersionedStompFrameHandler.java | 53 ++++++++----------- .../artemis/core/server/ServerSession.java | 33 ++++++++++++ .../core/server/impl/ServerSessionImpl.java | 9 ++-- .../tests/integration/stomp/StompTest.java | 11 ++-- 8 files changed, 78 insertions(+), 78 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java index a8e613e4063..14efb79011a 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/TransportConstants.java @@ -203,14 +203,6 @@ public class TransportConstants { public static final String STOMP_MIN_LARGE_MESSAGE_SIZE = "stomp-min-large-message-size"; - public static final String STOMP_ANYCAST_PREFIX = "stompAnycastPrefix"; - - public static final String DEFAULT_STOMP_ANYCAST_PREFIX = ""; - - public static final String STOMP_MULTICAST_PREFIX = "stompMulticastPrefix"; - - public static final String DEFAULT_STOMP_MULTICAST_PREFIX = ""; - public static final String NETTY_CONNECT_TIMEOUT = "connect-timeout-millis"; public static final int DEFAULT_NETTY_CONNECT_TIMEOUT = -1; @@ -250,8 +242,6 @@ public class TransportConstants { allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION); allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMERS_CREDIT); allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE); - allowableAcceptorKeys.add(TransportConstants.STOMP_ANYCAST_PREFIX); - allowableAcceptorKeys.add(TransportConstants.STOMP_MULTICAST_PREFIX); allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL); allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL_MAX); allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL_MIN); diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java index 3f148f31802..5dafe6075ca 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java @@ -90,10 +90,6 @@ public final class StompConnection implements RemotingConnection { private final int minLargeMessageSize; - private final String anycastPrefix; - - private final String multicastPrefix; - private StompVersions version; private VersionedStompFrameHandler frameHandler; @@ -168,8 +164,6 @@ public boolean hasBytes() { this.enableMessageID = ConfigurationHelper.getBooleanProperty(TransportConstants.STOMP_ENABLE_MESSAGE_ID, false, acceptorUsed.getConfiguration()); this.minLargeMessageSize = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, acceptorUsed.getConfiguration()); - this.anycastPrefix = ConfigurationHelper.getStringProperty(TransportConstants.STOMP_ANYCAST_PREFIX, TransportConstants.DEFAULT_STOMP_ANYCAST_PREFIX, acceptorUsed.getConfiguration()); - this.multicastPrefix = ConfigurationHelper.getStringProperty(TransportConstants.STOMP_MULTICAST_PREFIX, TransportConstants.DEFAULT_STOMP_MULTICAST_PREFIX, acceptorUsed.getConfiguration()); } @Override @@ -255,14 +249,14 @@ public synchronized boolean checkDataReceived() { // TODO this should take a type - send or receive so it knows whether to check the address or the queue public void checkDestination(String destination) throws ActiveMQStompException { - if (!manager.destinationExists(destination)) { + if (!manager.destinationExists(getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)).toString())) { throw BUNDLE.destinationNotExist(destination).setHandler(frameHandler); } } public boolean autoCreateDestinationIfPossible(String queue, RoutingType routingType) throws ActiveMQStompException { boolean result = false; - ServerSession session = getSession().getSession(); + ServerSession session = getSession().getCoreSession(); try { if (manager.getServer().getAddressInfo(SimpleString.toSimpleString(queue)) == null) { @@ -291,9 +285,9 @@ public boolean autoCreateDestinationIfPossible(String queue, RoutingType routing } public void checkRoutingSemantics(String destination, RoutingType routingType) throws ActiveMQStompException { - Set actualDeliveryModesOfAddres = manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination)).getRoutingTypes(); - if (routingType != null && !actualDeliveryModesOfAddres.contains(routingType)) { - throw BUNDLE.illegalSemantics(routingType.toString(), actualDeliveryModesOfAddres.toString()); + Set actualDeliveryModesOfAddress = manager.getServer().getAddressInfo(getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination))).getRoutingTypes(); + if (routingType != null && !actualDeliveryModesOfAddress.contains(routingType)) { + throw BUNDLE.illegalSemantics(routingType.toString(), actualDeliveryModesOfAddress.toString()); } } @@ -757,14 +751,6 @@ public int getMinLargeMessageSize() { return minLargeMessageSize; } - public String getAnycastPrefix() { - return anycastPrefix; - } - - public String getMulticastPrefix() { - return multicastPrefix; - } - public StompProtocolManager getManager() { return manager; } diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java index aba363434e3..54339a4f78e 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java @@ -261,9 +261,9 @@ public void run() { StompSession session = sessions.remove(connection.getID()); if (session != null) { try { - session.getSession().stop(); - session.getSession().rollback(true); - session.getSession().close(false); + session.getCoreSession().stop(); + session.getCoreSession().rollback(true); + session.getCoreSession().close(false); } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorCleaningStompConn(e); } @@ -274,7 +274,7 @@ public void run() { while (iterator.hasNext()) { Map.Entry entry = iterator.next(); if (entry.getValue().getConnection() == connection) { - ServerSession serverSession = entry.getValue().getSession(); + ServerSession serverSession = entry.getValue().getCoreSession(); try { serverSession.rollback(true); serverSession.close(false); @@ -355,7 +355,7 @@ public void commitTransaction(StompConnection connection, String txID) throws Ex throw new ActiveMQStompException(connection, "No transaction started: " + txID); } transactedSessions.remove(txID); - session.getSession().commit(); + session.getCoreSession().commit(); } public void abortTransaction(StompConnection connection, String txID) throws Exception { @@ -364,7 +364,7 @@ public void abortTransaction(StompConnection connection, String txID) throws Exc throw new ActiveMQStompException(connection, "No transaction started: " + txID); } transactedSessions.remove(txID); - session.getSession().rollback(false); + session.getCoreSession().rollback(false); } // Inner classes ------------------------------------------------- diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java index 11a80facb7e..c92b8236da3 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java @@ -90,7 +90,7 @@ void setServerSession(ServerSession session) { this.session = session; } - public ServerSession getSession() { + public ServerSession getCoreSession() { return session; } @@ -287,7 +287,7 @@ public void addSubscription(long consumerID, receiveCredits = -1; } - Set routingTypes = manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination)).getRoutingTypes(); + Set routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(SimpleString.toSimpleString(destination))).getRoutingTypes(); if (routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST)) { // subscribes to a topic pubSub = true; diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index cdd9e505748..51e3b992567 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -236,25 +236,25 @@ public StompFrame onAbort(StompFrame request) { return response; } - public StompFrame onSubscribe(StompFrame request) { + public StompFrame onSubscribe(StompFrame frame) { StompFrame response = null; - String destination = getDestination(request); - - String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR); - String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE); - String id = request.getHeader(Stomp.Headers.Subscribe.ID); - String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME); - if (durableSubscriptionName == null) { - durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME); - } - RoutingType routingType = getRoutingType(request.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE), request.getHeader(Headers.Subscribe.DESTINATION)); - boolean noLocal = false; + try { + String destination = getDestination(frame); - if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) { - noLocal = Boolean.parseBoolean(request.getHeader(Stomp.Headers.Subscribe.NO_LOCAL)); - } + String selector = frame.getHeader(Stomp.Headers.Subscribe.SELECTOR); + String ack = frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE); + String id = frame.getHeader(Stomp.Headers.Subscribe.ID); + String durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME); + if (durableSubscriptionName == null) { + durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME); + } + RoutingType routingType = getRoutingType(frame.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE), frame.getHeader(Headers.Subscribe.DESTINATION)); + boolean noLocal = false; + + if (frame.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) { + noLocal = Boolean.parseBoolean(frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL)); + } - try { connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType); } catch (ActiveMQStompException e) { response = e.getFrame(); @@ -264,14 +264,7 @@ public StompFrame onSubscribe(StompFrame request) { } public String getDestination(StompFrame request) { - String destination = request.getHeader(Headers.Subscribe.DESTINATION); - if (connection.getMulticastPrefix().length() > 0 && destination.startsWith(connection.getMulticastPrefix())) { - destination = destination.substring(connection.getMulticastPrefix().length()); - } else if (connection.getAnycastPrefix().length() > 0 && destination.startsWith(connection.getAnycastPrefix())) { - destination = destination.substring(connection.getAnycastPrefix().length()); - } - - return destination; + return request.getHeader(Headers.Subscribe.DESTINATION); } public StompFrame postprocess(StompFrame request) { @@ -344,17 +337,13 @@ public void onError(ActiveMQStompException e) { connection.destroy(); } - private RoutingType getRoutingType(String typeHeader, String destination) { + private RoutingType getRoutingType(String typeHeader, String destination) throws ActiveMQStompException { // null is valid to return here so we know when the user didn't provide any routing info - RoutingType routingType = null; + RoutingType routingType; if (typeHeader != null) { routingType = RoutingType.valueOf(typeHeader); - } else if (destination != null && !connection.getAnycastPrefix().equals(connection.getMulticastPrefix())) { - if (connection.getMulticastPrefix().length() > 0 && destination.startsWith(connection.getMulticastPrefix())) { - routingType = RoutingType.MULTICAST; - } else if (connection.getAnycastPrefix().length() > 0 && destination.startsWith(connection.getAnycastPrefix())) { - routingType = RoutingType.ANYCAST; - } + } else { + routingType = connection.getSession().getCoreSession().getAddressAndRoutingType(SimpleString.toSimpleString(destination), null).getB(); } return routingType; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java index 9559d74b480..fb3bd22af1f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Set; +import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.message.impl.MessageInternal; import org.apache.activemq.artemis.core.persistence.OperationContext; @@ -250,4 +251,36 @@ void createSharedQueue(SimpleString address, SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception; AddressInfo getAddress(SimpleString address); + + /** + * Strip the prefix (if it exists) from the address based on the prefixes provided to the ServerSession constructor. + * + * @param address the address to inspect + * @return the canonical (i.e. non-prefixed) address name + */ + SimpleString removePrefix(SimpleString address); + + /** + * Get the canonical (i.e. non-prefixed) address and the corresponding routing-type. + * + * @param address the address to inspect + * @param defaultRoutingType the {@code org.apache.activemq.artemis.core.server.RoutingType} to return if no prefix + * match is found. + * @return a {@code org.apache.activemq.artemis.api.core.Pair} representing the canonical (i.e. non-prefixed) address + * name and the {@code org.apache.activemq.artemis.core.server.RoutingType} corresponding to the that prefix. + */ + Pair getAddressAndRoutingType(SimpleString address, RoutingType defaultRoutingType); + + /** + * Get the canonical (i.e. non-prefixed) address and the corresponding routing-type. + * + * @param address the address to inspect + * @param defaultRoutingTypes a the {@code java.util.Set} of {@code org.apache.activemq.artemis.core.server.RoutingType} + * objects to return if no prefix match is found. + * @return a {@code org.apache.activemq.artemis.api.core.Pair} representing the canonical (i.e. non-prefixed) address + * name and the {@code java.util.Set} of {@code org.apache.activemq.artemis.core.server.RoutingType} objects + * corresponding to the that prefix. + */ + Pair> getAddressAndRoutingTypes(SimpleString address, + Set defaultRoutingTypes); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index 49cf471b55a..d622f5a548d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -1726,14 +1726,16 @@ public List getInTXMessagesForConsumer(long consumerId) { } } - private SimpleString removePrefix(SimpleString address) { + @Override + public SimpleString removePrefix(SimpleString address) { if (prefixEnabled) { return PrefixUtil.getAddress(address, prefixes); } return address; } - private Pair getAddressAndRoutingType(SimpleString address, + @Override + public Pair getAddressAndRoutingType(SimpleString address, RoutingType defaultRoutingType) { if (prefixEnabled) { return PrefixUtil.getAddressAndRoutingType(address, defaultRoutingType, prefixes); @@ -1741,7 +1743,8 @@ private Pair getAddressAndRoutingType(SimpleString ad return new Pair<>(address, defaultRoutingType); } - private Pair> getAddressAndRoutingTypes(SimpleString address, + @Override + public Pair> getAddressAndRoutingTypes(SimpleString address, Set defaultRoutingTypes) { if (prefixEnabled) { return PrefixUtil.getAddressAndRoutingTypes(address, defaultRoutingTypes, prefixes); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 60ce16878f8..c0e0d95c064 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -1306,7 +1306,7 @@ public void testPrefix(final String prefix, final RoutingType routingType, final final String ADDRESS = UUID.randomUUID().toString(); final String PREFIXED_ADDRESS = prefix + ADDRESS; String param = routingType.toString(); - String urlParam = "stomp" + param.substring(0, 1) + param.substring(1).toLowerCase() + "Prefix"; + String urlParam = param.toLowerCase() + "Prefix"; server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + "&" + urlParam + "=" + prefix).start(); conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); conn.connect(defUser, defPass); @@ -1329,9 +1329,9 @@ public void testPrefix(final String prefix, final RoutingType routingType, final AddressInfo addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS)); assertNotNull("No address was created with the name " + ADDRESS, addressInfo); - Set deliveryModest = new HashSet<>(); - deliveryModest.add(RoutingType.valueOf(param)); - assertEquals(deliveryModest, addressInfo.getRoutingTypes()); + Set routingTypes = new HashSet<>(); + routingTypes.add(RoutingType.valueOf(param)); + assertEquals(routingTypes, addressInfo.getRoutingTypes()); conn.disconnect(); } @@ -1360,8 +1360,7 @@ public void testPrefixedSendAndRecieve(final String prefix, RoutingType routingT int port = 61614; final String ADDRESS = UUID.randomUUID().toString(); final String PREFIXED_ADDRESS = prefix + ADDRESS; - String param = routingType.toString(); - String urlParam = "stomp" + param.substring(0, 1) + param.substring(1).toLowerCase() + "Prefix"; + String urlParam = routingType.toString().toLowerCase() + "Prefix"; server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + "&" + urlParam + "=" + prefix).start(); conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port); conn.connect(defUser, defPass); From 2290ab40c3d0b131d3a9d1b63e058d9860574c4b Mon Sep 17 00:00:00 2001 From: jbertram Date: Tue, 13 Dec 2016 17:58:10 -0600 Subject: [PATCH 2/6] Fix bug in createQueue invocation chain --- .../artemis/core/management/impl/ActiveMQServerControlImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java index ef8b9a898ef..382b3e364d3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java @@ -646,7 +646,7 @@ public void createQueue(final String address, final String name) throws Exceptio @Override public void createQueue(final String address, final String name, final String routingType) throws Exception { - createQueue(address, name, routingType, true); + createQueue(address, name, true, routingType); } @Override From 30b1335614f020e6d8051b6a3b5dd68ded5792bd Mon Sep 17 00:00:00 2001 From: jbertram Date: Tue, 13 Dec 2016 17:59:09 -0600 Subject: [PATCH 3/6] ARTEMIS-877 STOMP tests + routing header --- .../stomp/VersionedStompFrameHandler.java | 3 + .../tests/integration/stomp/StompTest.java | 70 +++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java index 51e3b992567..02facd6e269 100644 --- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java +++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java @@ -178,6 +178,9 @@ public StompFrame onSend(StompFrame frame) { long timestamp = System.currentTimeMillis(); ServerMessageImpl message = connection.createServerMessage(); + if (routingType != null) { + message.putByteProperty(Message.HDR_ROUTING_TYPE, routingType.getType()); + } message.setTimestamp(timestamp); message.setAddress(SimpleString.toSimpleString(destination)); StompUtils.copyStandardHeadersFromFrameToMessage(frame, message); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index c0e0d95c064..06e35631007 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -40,12 +40,14 @@ import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.protocol.stomp.Stomp; import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; @@ -1472,4 +1474,72 @@ public void testInvokeOperationFromStomp() throws Exception { conn.disconnect(); } + + @Test + public void testAnycastMessageRoutingExclusivity() throws Exception { + conn.connect(defUser, defPass); + + final String addressA = "addressA"; + final String queueA = "queueA"; + final String queueB = "queueB"; + final String queueC = "queueC"; + + ActiveMQServer activeMQServer = server.getActiveMQServer(); + ActiveMQServerControl serverControl = server.getActiveMQServer().getActiveMQServerControl(); + serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString()); + serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString()); + serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString()); + + send(conn, addressA, null, "Hello World!", true, RoutingType.ANYCAST); + + assertEquals(1, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); + assertEquals(0, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount()); + } + + @Test + public void testMulticastMessageRoutingExclusivity() throws Exception { + conn.connect(defUser, defPass); + + final String addressA = "addressA"; + final String queueA = "queueA"; + final String queueB = "queueB"; + final String queueC = "queueC"; + + ActiveMQServer activeMQServer = server.getActiveMQServer(); + ActiveMQServerControl serverControl = server.getActiveMQServer().getActiveMQServerControl(); + serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString()); + serverControl.createQueue(addressA, queueB, RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString()); + + send(conn, addressA, null, "Hello World!", true, RoutingType.MULTICAST); + + assertEquals(0, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount()); + assertEquals(2, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); + } + + @Test + public void testAmbiguousMessageRouting() throws Exception { + conn.connect(defUser, defPass); + + final String addressA = "addressA"; + final String queueA = "queueA"; + final String queueB = "queueB"; + final String queueC = "queueC"; + final String queueD = "queueD"; + + ActiveMQServer activeMQServer = server.getActiveMQServer(); + ActiveMQServerControl serverControl = server.getActiveMQServer().getActiveMQServerControl(); + serverControl.createAddress(addressA, RoutingType.ANYCAST.toString() + "," + RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, queueA, RoutingType.ANYCAST.toString()); + serverControl.createQueue(addressA, queueB, RoutingType.ANYCAST.toString()); + serverControl.createQueue(addressA, queueC, RoutingType.MULTICAST.toString()); + serverControl.createQueue(addressA, queueD, RoutingType.MULTICAST.toString()); + + send(conn, addressA, null, "Hello World!", true); + + assertEquals(1, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); + assertEquals(2, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount()); + } } From bdc2abf308cdc849897dd65679fc6647641b8621 Mon Sep 17 00:00:00 2001 From: jbertram Date: Tue, 13 Dec 2016 20:45:48 -0600 Subject: [PATCH 4/6] ARTEMIS-780 fix JmsProducerTest --- .../artemis/tests/integration/jms/JmsProducerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/JmsProducerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/JmsProducerTest.java index 07ef73c941e..d25f41371cb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/JmsProducerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/JmsProducerTest.java @@ -30,6 +30,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.client.impl.ClientSessionImpl; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.client.ActiveMQJMSContext; import org.apache.activemq.artemis.jms.client.ActiveMQSession; @@ -95,7 +96,7 @@ public void testDisMsgID() { @Test public void multipleSendsUsingSetters() throws Exception { - server.createQueue(SimpleString.toSimpleString("q1"), SimpleString.toSimpleString("q1"), null, true, false); + server.createQueue(SimpleString.toSimpleString("q1"), RoutingType.ANYCAST, SimpleString.toSimpleString("q1"), null, true, false); Queue q1 = context.createQueue("q1"); From 8263ef2f98949b34117caad9997a42ca66165429 Mon Sep 17 00:00:00 2001 From: jbertram Date: Tue, 13 Dec 2016 20:58:08 -0600 Subject: [PATCH 5/6] ARTEMIS-780 fix ReplicationWithDivertTest --- .../integration/divert/ReplicationWithDivertTest.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/ReplicationWithDivertTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/ReplicationWithDivertTest.java index 40372086e2d..44b5ecf632f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/ReplicationWithDivertTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/divert/ReplicationWithDivertTest.java @@ -32,6 +32,7 @@ import org.apache.activemq.artemis.core.config.CoreQueueConfiguration; import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.jms.client.ActiveMQConnection; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; @@ -98,14 +99,14 @@ public void setUp() throws Exception { backupConfig = createDefaultInVMConfig().setBindingsDirectory(getBindingsDir(0, true)). setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)). setLargeMessagesDirectory(getLargeMessagesDir(0, true)); - backupConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(SOURCE_QUEUE).setName(SOURCE_QUEUE)); - backupConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(TARGET_QUEUE).setName(TARGET_QUEUE)); + backupConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(SOURCE_QUEUE).setName(SOURCE_QUEUE).setRoutingType(RoutingType.ANYCAST)); + backupConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(TARGET_QUEUE).setName(TARGET_QUEUE).setRoutingType(RoutingType.ANYCAST)); DivertConfiguration divertConfiguration = new DivertConfiguration().setName("Test").setAddress(SOURCE_QUEUE).setForwardingAddress(TARGET_QUEUE).setRoutingName("Test"); liveConfig = createDefaultInVMConfig(); - liveConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(SOURCE_QUEUE).setName(SOURCE_QUEUE).setDurable(true)); - liveConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(TARGET_QUEUE).setName(TARGET_QUEUE).setDurable(true)); + liveConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(SOURCE_QUEUE).setName(SOURCE_QUEUE).setDurable(true).setRoutingType(RoutingType.ANYCAST)); + liveConfig.addQueueConfiguration(new CoreQueueConfiguration().setAddress(TARGET_QUEUE).setName(TARGET_QUEUE).setDurable(true).setRoutingType(RoutingType.ANYCAST)); liveConfig.addDivertConfiguration(divertConfiguration); backupConfig.addDivertConfiguration(divertConfiguration); From ba1c83e1ed6f974d4f96a6a04aea16feb3c1de85 Mon Sep 17 00:00:00 2001 From: jbertram Date: Tue, 13 Dec 2016 21:08:36 -0600 Subject: [PATCH 6/6] Fix BackupAuthenticationTest --- .../activemq/artemis/core/server/impl/ActiveMQServerImpl.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 097118fea68..3d8b9a99173 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -601,7 +601,9 @@ public void interrupBackupThread(NodeManager nodeManagerInUse) throws Interrupte } public void resetNodeManager() throws Exception { - nodeManager.stop(); + if (nodeManager != null) { + nodeManager.stop(); + } nodeManager = createNodeManager(configuration.getJournalLocation(), true); }