From 513b7826a440eecbfa0f3d7000360186e35bc43d Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Wed, 13 Dec 2023 10:36:11 -0600 Subject: [PATCH] ARTEMIS-4532 MQTT-to-core wildcard conversion is broken Currently when an MQTT topic filter contains characters from the configured wildcard syntax the conversion to/from this syntax breaks. For example, when using the default wildcard syntax if an MQTT topic filter contains a . the conversion from the MQTT wildcard syntax to the core wildcard syntax and back will result in the `.` being replaced with a `/.`. This commit fixes that plus a few other things... - Implements proper conversions to/from one WildcardConfiguration to another. - Refactors the MQTT code which invokes these conversion methods. This includes simplifying a lot of test code. - Adds lots of tests for everything. - Clarifies some variable naming to better distinguish between core and MQTT. --- .../protocol/mqtt/MQTTPublishManager.java | 16 +-- .../mqtt/MQTTRetainMessageManager.java | 4 +- .../core/protocol/mqtt/MQTTSessionState.java | 4 +- .../mqtt/MQTTSubscriptionManager.java | 51 ++----- .../artemis/core/protocol/mqtt/MQTTUtil.java | 110 ++++++++++----- .../core/protocol/mqtt/MQTTUtilTest.java | 110 ++++++++++++++- .../protocol/openwire/util/OpenWireUtil.java | 10 +- .../core/config/WildcardConfiguration.java | 130 +++++++++++++++--- .../config/WildcardConfigurationTest.java | 124 +++++++++++++++++ docs/user-manual/mqtt.adoc | 20 ++- docs/user-manual/versions.adoc | 22 +++ .../tests/integration/mqtt/MQTTTest.java | 8 +- .../integration/mqtt/MQTTTestSupport.java | 3 +- .../mqtt/MqttWildCardSubAutoCreateTest.java | 6 +- .../mqtt/PahoMQTTQOS2SecurityTest.java | 3 +- .../tests/integration/mqtt5/MQTT5Test.java | 40 ++++-- .../integration/mqtt5/MQTT5TestSupport.java | 41 ++---- .../mqtt5/spec/ControlPacketFormatTests.java | 25 ++-- .../mqtt5/spec/MessageReceiptTests.java | 5 +- .../integration/mqtt5/spec/QoSTests.java | 60 ++++---- .../spec/controlpackets/ConnectTests.java | 4 +- .../spec/controlpackets/PublishTests.java | 64 +++++---- .../PublishTestsWithSecurity.java | 6 +- .../SubscribeTestsWithSecurity.java | 6 +- .../CertificateAuthenticationSslTests.java | 9 +- 25 files changed, 641 insertions(+), 240 deletions(-) create mode 100644 artemis-server/src/test/java/org/apache/activemq/artemis/core/config/WildcardConfigurationTest.java diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index 5c79a53b433..eec146287de 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -216,7 +216,7 @@ void sendToQueue(MqttPublishMessage message, boolean internal) throws Exception } } } - String coreAddress = MQTTUtil.convertMqttTopicFilterToCore(topic, session.getWildcardConfiguration()); + String coreAddress = MQTTUtil.getCoreAddressFromMqttTopic(topic, session.getWildcardConfiguration()); SimpleString address = SimpleString.toSimpleString(coreAddress, session.getCoreMessageObjectPools().getAddressStringSimpleStringPool()); Message serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, address, message); int qos = message.fixedHeader().qosLevel().value(); @@ -392,7 +392,7 @@ void handlePubAck(int messageId) throws Exception { } private boolean publishToClient(int messageId, ICoreMessage message, int deliveryCount, int qos, long consumerId) throws Exception { - String address = MQTTUtil.convertCoreAddressToMqttTopicFilter(message.getAddress() == null ? "" : message.getAddress(), session.getWildcardConfiguration()); + String topic = MQTTUtil.getMqttTopicFromCoreAddress(message.getAddress() == null ? "" : message.getAddress(), session.getWildcardConfiguration()); ByteBuf payload; switch (message.getType()) { @@ -418,29 +418,29 @@ private boolean publishToClient(int messageId, ICoreMessage message, int deliver if (session.getVersion() == MQTTVersion.MQTT_5) { if (!isRetain && message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY)) { - MqttTopicSubscription sub = session.getState().getSubscription(message.getAddress()); + MqttTopicSubscription sub = session.getState().getSubscription(topic); if (sub != null && sub.option().isRetainAsPublished()) { isRetain = true; } } if (session.getState().getClientTopicAliasMaximum() != null) { - Integer alias = session.getState().getServerTopicAlias(address); + Integer alias = session.getState().getServerTopicAlias(topic); if (alias == null) { - alias = session.getState().addServerTopicAlias(address); + alias = session.getState().addServerTopicAlias(topic); if (alias != null) { mqttProperties.add(new MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), alias)); } } else { mqttProperties.add(new MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), alias)); - address = ""; + topic = ""; } } } - int remainingLength = MQTTUtil.calculateRemainingLength(address, mqttProperties, payload); + int remainingLength = MQTTUtil.calculateRemainingLength(topic, mqttProperties, payload); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qos), isRetain, remainingLength); - MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(address, messageId, mqttProperties); + MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, messageId, mqttProperties); MqttPublishMessage publish = new MqttPublishMessage(header, varHeader, payload); int maxSize = session.getState().getClientMaxPacketSize(); diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java index 26be4bc53fc..b04f09f7847 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java @@ -48,7 +48,7 @@ public MQTTRetainMessageManager(MQTTSession session) { * the retained queue and the previous retain message consumed to remove it from the queue. */ void handleRetainedMessage(Message messageParameter, String address, boolean reset, Transaction tx) throws Exception { - SimpleString retainAddress = new SimpleString(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, address, session.getWildcardConfiguration())); + String retainAddress = MQTTUtil.getCoreRetainAddressFromMqttTopic(address, session.getWildcardConfiguration()); Queue queue = session.getServer().locateQueue(retainAddress); if (queue == null) { @@ -65,7 +65,7 @@ void handleRetainedMessage(Message messageParameter, String address, boolean res void addRetainedMessagesToQueue(Queue queue, String address) throws Exception { // The address filter that matches all retained message queues. - String retainAddress = MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, address, session.getWildcardConfiguration()); + String retainAddress = MQTTUtil.getCoreRetainAddressFromMqttTopic(address, session.getWildcardConfiguration()); BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(new SimpleString(retainAddress)); // Iterate over all matching retain queues and add the queue diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java index 2570cabcd60..ad354345ecc 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java @@ -199,7 +199,7 @@ public Collection> getSubscriptionsPlusID() public boolean addSubscription(MqttTopicSubscription subscription, WildcardConfiguration wildcardConfiguration, Integer subscriptionIdentifier) throws Exception { // synchronized to prevent race with removeSubscription synchronized (subscriptions) { - addressMessageMap.putIfAbsent(MQTTUtil.convertMqttTopicFilterToCore(subscription.topicName(), wildcardConfiguration).toString(), new ConcurrentHashMap<>()); + addressMessageMap.putIfAbsent(MQTTUtil.getCoreAddressFromMqttTopic(subscription.topicName(), wildcardConfiguration), new ConcurrentHashMap<>()); Pair existingSubscription = subscriptions.get(subscription.topicName()); if (existingSubscription != null) { @@ -237,7 +237,7 @@ public Pair getSubscriptionPlusID(String address } public List getMatchingSubscriptionIdentifiers(String address) { - address = MQTTUtil.convertCoreAddressToMqttTopicFilter(address, session.getServer().getConfiguration().getWildcardConfiguration()); + address = MQTTUtil.getMqttTopicFromCoreAddress(address, session.getServer().getConfiguration().getWildcardConfiguration()); List result = null; for (Pair pair : subscriptions.values()) { Pattern pattern = Match.createPattern(pair.getA().topicName(), MQTTUtil.MQTT_WILDCARD, true); diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index 5ca6679dc08..e66c880d192 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -28,7 +28,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.FilterConstants; -import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; @@ -40,7 +39,6 @@ import org.apache.activemq.artemis.utils.CompositeAddress; import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.DOLLAR; -import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.SLASH; import static org.apache.activemq.artemis.reader.MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING; public class MQTTSubscriptionManager { @@ -106,11 +104,10 @@ synchronized void start() throws Exception { private void addSubscription(MqttTopicSubscription subscription, Integer subscriptionIdentifier, boolean initialStart) throws Exception { String rawTopicName = CompositeAddress.extractAddressName(subscription.topicName()); String parsedTopicName = MQTTUtil.decomposeSharedSubscriptionTopicFilter(rawTopicName).getB(); - int qos = subscription.qualityOfService().value(); - String coreAddress = MQTTUtil.convertMqttTopicFilterToCore(parsedTopicName, session.getWildcardConfiguration()); - String coreQueue = getQueueNameForTopic(rawTopicName).toString(); - Queue q = createQueueForSubscription(coreAddress, coreQueue); + Queue q = createQueueForSubscription(rawTopicName, parsedTopicName); + + int qos = subscription.qualityOfService().value(); try { if (initialStart) { @@ -140,16 +137,6 @@ private void addSubscription(MqttTopicSubscription subscription, Integer subscri } } - private String parseTopicName(String rawTopicName) { - String parsedTopicName = rawTopicName; - - // if using a shared subscription then parse - if (rawTopicName.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) { - parsedTopicName = rawTopicName.substring(rawTopicName.indexOf(SLASH, rawTopicName.indexOf(SLASH) + 1) + 1); - } - return parsedTopicName; - } - synchronized void stop() throws Exception { for (ServerConsumer consumer : consumers.values()) { consumer.setStarted(false); @@ -159,13 +146,16 @@ synchronized void stop() throws Exception { } } - private Queue createQueueForSubscription(String address, String queueName) throws Exception { + private Queue createQueueForSubscription(String rawTopicName, String parsedTopicName) throws Exception { + String coreAddress = MQTTUtil.getCoreAddressFromMqttTopic(parsedTopicName, session.getWildcardConfiguration()); + String coreQueue = MQTTUtil.getCoreQueueFromMqttTopic(rawTopicName, session.getState().getClientId(), session.getWildcardConfiguration()); + // check to see if a subscription queue already exists. - Queue q = session.getServer().locateQueue(queueName); + Queue q = session.getServer().locateQueue(coreQueue); // The queue does not exist so we need to create it. if (q == null) { - SimpleString sAddress = SimpleString.toSimpleString(address); + SimpleString sAddress = SimpleString.toSimpleString(coreAddress); // Check we can auto create queues. BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(sAddress); @@ -182,7 +172,7 @@ private Queue createQueueForSubscription(String address, String queueName) throw addressInfo = session.getServerSession().createAddress(sAddress, RoutingType.MULTICAST, true); } - return findOrCreateQueue(bindingQueryResult, addressInfo, queueName); + return findOrCreateQueue(bindingQueryResult, addressInfo, coreQueue); } return q; } @@ -233,13 +223,13 @@ private SimpleString getMessageFilter(SimpleString addressName) { } } - private void createConsumerForSubscriptionQueue(Queue queue, String topic, int qos, boolean noLocal, Long existingConsumerId) throws Exception { + private void createConsumerForSubscriptionQueue(Queue queue, String topicFilter, int qos, boolean noLocal, Long existingConsumerId) throws Exception { long cid = existingConsumerId != null ? existingConsumerId : session.getServer().getStorageManager().generateID(); // for noLocal support we use the MQTT *client id* rather than the connection ID, but we still use the existing property name ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue.getName(), noLocal ? SimpleString.toSimpleString(CONNECTION_ID_PROPERTY_NAME_STRING + " <> '" + session.getState().getClientId() + "'") : null, false, false, -1); - ServerConsumer existingConsumer = consumers.put(MQTTUtil.decomposeSharedSubscriptionTopicFilter(topic).getB(), consumer); + ServerConsumer existingConsumer = consumers.put(topicFilter, consumer); if (existingConsumer != null) { existingConsumer.setStarted(false); existingConsumer.close(false); @@ -257,7 +247,7 @@ short[] removeSubscriptions(List topics, boolean enforceSecurity) throws synchronized (state) { reasonCodes = new short[topics.size()]; for (int i = 0; i < topics.size(); i++) { - if (session.getState().getSubscription(topics.get(i)) == null) { + if (state.getSubscription(topics.get(i)) == null) { reasonCodes[i] = MQTTReasonCodes.NO_SUBSCRIPTION_EXISTED; continue; } @@ -265,14 +255,14 @@ short[] removeSubscriptions(List topics, boolean enforceSecurity) throws short reasonCode = MQTTReasonCodes.SUCCESS; try { - session.getState().removeSubscription(topics.get(i)); + state.removeSubscription(topics.get(i)); ServerConsumer removed = consumers.remove(MQTTUtil.decomposeSharedSubscriptionTopicFilter(topics.get(i)).getB()); if (removed != null) { removed.close(false); consumerQoSLevels.remove(removed.getID()); } - SimpleString internalQueueName = SimpleString.toSimpleString(getQueueNameForTopic(topics.get(i))); + SimpleString internalQueueName = SimpleString.toSimpleString(MQTTUtil.getCoreQueueFromMqttTopic(topics.get(i), state.getClientId(), session.getServer().getConfiguration().getWildcardConfiguration())); Queue queue = session.getServer().locateQueue(internalQueueName); if (queue != null) { if (queue.isConfigurationManaged()) { @@ -296,17 +286,6 @@ short[] removeSubscriptions(List topics, boolean enforceSecurity) throws return reasonCodes; } - private String getQueueNameForTopic(String topic) { - String queueName; - if (MQTTUtil.isSharedSubscription(topic)) { - Pair decomposed = MQTTUtil.decomposeSharedSubscriptionTopicFilter(topic); - queueName = decomposed.getA().concat(".").concat(decomposed.getB()); - } else { - queueName = session.getState().getClientId().concat(".").concat(topic); - } - return MQTTUtil.convertMqttTopicFilterToCore(queueName, session.getWildcardConfiguration()); - } - /** * As per MQTT Spec. Subscribes this client to a number of MQTT topics. * diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java index 976e958005e..73bdd8d06a5 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java @@ -53,6 +53,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.lang.invoke.MethodHandles; +import java.util.Objects; import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE; import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CORRELATION_DATA; @@ -123,7 +124,7 @@ public class MQTTUtil { public static final int TWO_BYTE_INT_MAX = Integer.decode("0xFFFF"); // 65_535 - // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901011 + // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901011 public static final int VARIABLE_BYTE_INT_MAX = 268_435_455; public static final int MAX_PACKET_SIZE = VARIABLE_BYTE_INT_MAX; @@ -138,26 +139,70 @@ public class MQTTUtil { public static final int DEFAULT_MAXIMUM_PACKET_SIZE = MAX_PACKET_SIZE; - public static String convertMqttTopicFilterToCore(String filter, WildcardConfiguration wildcardConfiguration) { - return convertMqttTopicFilterToCore(null, filter, wildcardConfiguration); - } + public static final WildcardConfiguration MQTT_WILDCARD = new WildcardConfiguration().setDelimiter(SLASH).setAnyWords(HASH).setSingleWord(PLUS); - public static String convertMqttTopicFilterToCore(String prefixToAdd, String filter, WildcardConfiguration wildcardConfiguration) { - if (filter == null) { - return ""; - } + /** + * This method takes the MQTT-related input and translates it into the proper name for a core subscription queue. The + * {@code topicFilter} may be either for a shared subscription in the format {@code $share//} + * or a normal MQTT topic filter (e.g. {@code a/b/#}, {@code a/+/c}, {@code a/b/c}, etc.). + * + * @param topicFilter the MQTT topic filter + * @param clientId the MQTT client ID, used for normal (i.e. non-shared) subscriptions + * @param wildcardConfiguration the {@code WildcardConfiguration} governing the core broker + * @return the name of the core subscription queue based on the input + */ + public static String getCoreQueueFromMqttTopic(String topicFilter, String clientId, WildcardConfiguration wildcardConfiguration) { + Objects.requireNonNull(topicFilter, "MQTT topic filter must not be null"); + Objects.requireNonNull(wildcardConfiguration, "Broker wildcard configuration must not be null"); - String converted = MQTT_WILDCARD.convert(filter, wildcardConfiguration); - if (prefixToAdd != null) { - converted = prefixToAdd + converted; + if (isSharedSubscription(topicFilter)) { + Pair decomposed = decomposeSharedSubscriptionTopicFilter(topicFilter); + return new StringBuilder().append(decomposed.getA()).append(".").append(getCoreAddressFromMqttTopic(decomposed.getB(), wildcardConfiguration)).toString(); + } else { + Objects.requireNonNull(clientId, "MQTT client ID must not be null"); + return new StringBuilder().append(clientId).append(".").append(getCoreAddressFromMqttTopic(topicFilter, wildcardConfiguration)).toString(); } - return converted; } - public static String convertCoreAddressToMqttTopicFilter(String address, WildcardConfiguration wildcardConfiguration) { - if (address == null) { - return ""; - } + /** + * This method takes the MQTT-related input and translates it into the proper name for a core address. The + * {@code topicFilter} must be normal (i.e. non-shared). It should not be in the format + * {@code $share//}. + * + * @param topicFilter the MQTT topic filter + * @param wildcardConfiguration the {@code WildcardConfiguration} governing the core broker + * @return the name of the core addres based on the input + */ + public static String getCoreAddressFromMqttTopic(String topicFilter, WildcardConfiguration wildcardConfiguration) { + Objects.requireNonNull(topicFilter, "MQTT topic filter must not be null"); + Objects.requireNonNull(wildcardConfiguration, "Broker wildcard configuration must not be null"); + + return MQTT_WILDCARD.convert(topicFilter, wildcardConfiguration); + } + + /** + * This is exactly the same as {@link #getCoreAddressFromMqttTopic(String, WildcardConfiguration)} except that it + * also prefixes the return with + * {@link org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil#MQTT_RETAIN_ADDRESS_PREFIX} + * + * @param topicFilter the MQTT topic filter + * @param wildcardConfiguration the {@code WildcardConfiguration} governing the core broker + * @return the name of the core address based on the input, stripping + * {@link org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil#MQTT_RETAIN_ADDRESS_PREFIX} if it exists + */ + public static String getCoreRetainAddressFromMqttTopic(String topicFilter, WildcardConfiguration wildcardConfiguration) { + return MQTT_RETAIN_ADDRESS_PREFIX + getCoreAddressFromMqttTopic(topicFilter, wildcardConfiguration); + } + + /** + * + * @param address the core address + * @param wildcardConfiguration the {@code WildcardConfiguration} governing the core broker + * @return the name of the MQTT topic based on the input + */ + public static String getMqttTopicFromCoreAddress(String address, WildcardConfiguration wildcardConfiguration) { + Objects.requireNonNull(address, "Address must not be null"); + Objects.requireNonNull(wildcardConfiguration, "Broker wildcard configuration must not be null"); if (address.startsWith(MQTT_RETAIN_ADDRESS_PREFIX)) { address = address.substring(MQTT_RETAIN_ADDRESS_PREFIX.length()); @@ -166,16 +211,6 @@ public static String convertCoreAddressToMqttTopicFilter(String address, Wildcar return wildcardConfiguration.convert(address, MQTT_WILDCARD); } - public static class MQTTWildcardConfiguration extends WildcardConfiguration { - public MQTTWildcardConfiguration() { - setDelimiter(SLASH); - setSingleWord(PLUS); - setAnyWords(HASH); - } - } - - public static final WildcardConfiguration MQTT_WILDCARD = new MQTTWildcardConfiguration(); - private static ICoreMessage createServerMessage(MQTTSession session, SimpleString address, MqttPublishMessage mqttPublishMessage) { long id = session.getServer().getStorageManager().generateID(); @@ -530,25 +565,30 @@ public static T getProperty(Class type, MqttProperties properties, MqttPr return defaultReturnValue == null ? null : defaultReturnValue; } - - - /* - * MQTT shared subscriptions are specified with the syntax from - * https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250: - * $share// - * This method takes this syntax and returns the shareName and the topicFilter. + /** + * MQTT shared subscriptions are specified with + * this syntax. + * + * @param topicFilter String in the format {@code $share//} + * @return {@code Pair} with {@code shareName} and {@code topicFilter} respectively or {@code null} + * and {@code topicFilter} if not in the shared-subscription format. */ public static Pair decomposeSharedSubscriptionTopicFilter(String topicFilter) { if (isSharedSubscription(topicFilter)) { int prefix = SHARED_SUBSCRIPTION_PREFIX.length(); String shareName = topicFilter.substring(prefix, topicFilter.indexOf(SLASH, prefix)); String parsedTopicName = topicFilter.substring(topicFilter.indexOf(SLASH, prefix) + 1); - return new Pair(shareName, parsedTopicName); + return new Pair<>(shareName, parsedTopicName); } else { - return new Pair(null, topicFilter); + return new Pair<>(null, topicFilter); } } + /** + * + * @param topicFilter the topic filter + * @return {@code true} if the input starts with {@code $share/}, {@code false} otherwise + */ public static boolean isSharedSubscription(String topicFilter) { if (topicFilter.startsWith(SHARED_SUBSCRIPTION_PREFIX)) { return true; diff --git a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtilTest.java b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtilTest.java index 910b24851ea..d24666e842a 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtilTest.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtilTest.java @@ -18,13 +18,14 @@ package org.apache.activemq.artemis.core.protocol.mqtt; import org.apache.activemq.artemis.api.core.Pair; +import org.apache.activemq.artemis.core.config.WildcardConfiguration; import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; public class MQTTUtilTest { - @Test public void testDecompose() { String shareName = RandomUtil.randomString(); @@ -34,4 +35,111 @@ public void testDecompose() { assertEquals(shareName, decomposed.getA()); assertEquals(topicFilter, decomposed.getB()); } + + @Test + public void testGetCoreQueueFromMqttTopic() { + assertThrows(NullPointerException.class, () -> MQTTUtil.getCoreQueueFromMqttTopic(null, null, null)); + assertThrows(NullPointerException.class, () -> MQTTUtil.getCoreQueueFromMqttTopic(null, null, new WildcardConfiguration())); + + assertThrows(NullPointerException.class, () -> MQTTUtil.getCoreQueueFromMqttTopic("", null, null)); + assertThrows(NullPointerException.class, () -> MQTTUtil.getCoreQueueFromMqttTopic("", null, new WildcardConfiguration())); + + assertThrows(NullPointerException.class, () -> MQTTUtil.getCoreQueueFromMqttTopic("", "", null)); + + assertThrows(NullPointerException.class, () -> MQTTUtil.getCoreQueueFromMqttTopic(null, "", null)); + assertThrows(NullPointerException.class, () -> MQTTUtil.getCoreQueueFromMqttTopic(null, "", new WildcardConfiguration())); + + final String clientId = RandomUtil.randomString().replace("-", ""); + + WildcardConfiguration defaultWildCardConfig = new WildcardConfiguration(); + assertEquals(clientId + ".a.b.c", MQTTUtil.getCoreQueueFromMqttTopic("a/b/c", clientId, defaultWildCardConfig)); + assertEquals(clientId + ".a.*.c", MQTTUtil.getCoreQueueFromMqttTopic("a/+/c", clientId, defaultWildCardConfig)); + assertEquals(clientId + ".a.*.#", MQTTUtil.getCoreQueueFromMqttTopic("a/+/#", clientId, defaultWildCardConfig)); + assertEquals(clientId + ".1\\.0.device", MQTTUtil.getCoreQueueFromMqttTopic("1.0/device", clientId, defaultWildCardConfig)); + assertEquals(clientId + ".*", MQTTUtil.getCoreQueueFromMqttTopic("+", clientId, defaultWildCardConfig)); + assertEquals(clientId + "..", MQTTUtil.getCoreQueueFromMqttTopic("/", clientId, defaultWildCardConfig)); + assertEquals(clientId + ".#", MQTTUtil.getCoreQueueFromMqttTopic("#", clientId, defaultWildCardConfig)); + + WildcardConfiguration customWildCardConfig = new WildcardConfiguration().setDelimiter('|').setSingleWord('$').setAnyWords('!'); + assertEquals(clientId + ".a|b|c", MQTTUtil.getCoreQueueFromMqttTopic("a/b/c", clientId, customWildCardConfig)); + assertEquals(clientId + ".a|$|c", MQTTUtil.getCoreQueueFromMqttTopic("a/+/c", clientId, customWildCardConfig)); + assertEquals(clientId + ".a|$|!", MQTTUtil.getCoreQueueFromMqttTopic("a/+/#", clientId, customWildCardConfig)); + assertEquals(clientId + ".1.0|device", MQTTUtil.getCoreQueueFromMqttTopic("1.0/device", clientId, customWildCardConfig)); + assertEquals(clientId + ".$", MQTTUtil.getCoreQueueFromMqttTopic("+", clientId, customWildCardConfig)); + assertEquals(clientId + ".|", MQTTUtil.getCoreQueueFromMqttTopic("/", clientId, customWildCardConfig)); + assertEquals(clientId + ".!", MQTTUtil.getCoreQueueFromMqttTopic("#", clientId, customWildCardConfig)); + } + + @Test + public void testGetCoreQueueFromMqttTopicWithSharedSubscription() { + final String clientId = RandomUtil.randomString().replace("-", ""); + + WildcardConfiguration defaultWildCardConfig = new WildcardConfiguration(); + assertEquals("shareName.a.b.c", MQTTUtil.getCoreQueueFromMqttTopic("$share/shareName/a/b/c", clientId, defaultWildCardConfig)); + + WildcardConfiguration customWildCardConfig = new WildcardConfiguration().setDelimiter('|').setSingleWord('$').setAnyWords('!'); + assertEquals("shareName.a|b|c", MQTTUtil.getCoreQueueFromMqttTopic("$share/shareName/a/b/c", clientId, customWildCardConfig)); + + } + + @Test + public void testGetCoreAddressFromMqttTopic() { + assertThrows(NullPointerException.class, () -> MQTTUtil.getCoreAddressFromMqttTopic(null, null)); + assertThrows(NullPointerException.class, () -> MQTTUtil.getCoreAddressFromMqttTopic(null, new WildcardConfiguration())); + assertThrows(NullPointerException.class, () -> MQTTUtil.getCoreAddressFromMqttTopic("", null)); + + WildcardConfiguration defaultWildCardConfig = new WildcardConfiguration(); + assertEquals("a.b.c", MQTTUtil.getCoreAddressFromMqttTopic("a/b/c", defaultWildCardConfig)); + assertEquals("a.*.c", MQTTUtil.getCoreAddressFromMqttTopic("a/+/c", defaultWildCardConfig)); + assertEquals("a.*.#", MQTTUtil.getCoreAddressFromMqttTopic("a/+/#", defaultWildCardConfig)); + assertEquals("1\\.0.device", MQTTUtil.getCoreAddressFromMqttTopic("1.0/device", defaultWildCardConfig)); + assertEquals("*", MQTTUtil.getCoreAddressFromMqttTopic("+", defaultWildCardConfig)); + assertEquals(".", MQTTUtil.getCoreAddressFromMqttTopic("/", defaultWildCardConfig)); + assertEquals("#", MQTTUtil.getCoreAddressFromMqttTopic("#", defaultWildCardConfig)); + + WildcardConfiguration customWildCardConfig = new WildcardConfiguration().setDelimiter('|').setSingleWord('$').setAnyWords('!'); + assertEquals("a|b|c", MQTTUtil.getCoreAddressFromMqttTopic("a/b/c", customWildCardConfig)); + assertEquals("a|$|c", MQTTUtil.getCoreAddressFromMqttTopic("a/+/c", customWildCardConfig)); + assertEquals("a|$|!", MQTTUtil.getCoreAddressFromMqttTopic("a/+/#", customWildCardConfig)); + assertEquals("1.0|device", MQTTUtil.getCoreAddressFromMqttTopic("1.0/device", customWildCardConfig)); + assertEquals("$", MQTTUtil.getCoreAddressFromMqttTopic("+", customWildCardConfig)); + assertEquals("|", MQTTUtil.getCoreAddressFromMqttTopic("/", customWildCardConfig)); + assertEquals("!", MQTTUtil.getCoreAddressFromMqttTopic("#", customWildCardConfig)); + } + + @Test + public void testGetCoreRetainAddressFromMqttTopic() { + assertThrows(NullPointerException.class, () -> MQTTUtil.getCoreRetainAddressFromMqttTopic(null, null)); + assertThrows(NullPointerException.class, () -> MQTTUtil.getCoreRetainAddressFromMqttTopic(null, new WildcardConfiguration())); + assertThrows(NullPointerException.class, () -> MQTTUtil.getCoreRetainAddressFromMqttTopic("", null)); + + final String retainPrefix = "$sys.mqtt.retain."; + WildcardConfiguration defaultWildCardConfig = new WildcardConfiguration(); + assertEquals(retainPrefix + "a.b.c", MQTTUtil.getCoreRetainAddressFromMqttTopic("a/b/c", defaultWildCardConfig)); + } + + @Test + public void testGetMqttTopicFromCoreAddress() { + assertThrows(NullPointerException.class, () -> MQTTUtil.getMqttTopicFromCoreAddress(null, null)); + assertThrows(NullPointerException.class, () -> MQTTUtil.getMqttTopicFromCoreAddress(null, new WildcardConfiguration())); + assertThrows(NullPointerException.class, () -> MQTTUtil.getMqttTopicFromCoreAddress("", null)); + + WildcardConfiguration defaultWildCardConfig = new WildcardConfiguration(); + assertEquals("a/b/c", MQTTUtil.getMqttTopicFromCoreAddress("a.b.c", defaultWildCardConfig)); + assertEquals("a/+/c", MQTTUtil.getMqttTopicFromCoreAddress("a.*.c", defaultWildCardConfig)); + assertEquals("a/+/#", MQTTUtil.getMqttTopicFromCoreAddress("a.*.#", defaultWildCardConfig)); + assertEquals("1.0/device", MQTTUtil.getMqttTopicFromCoreAddress("1\\.0.device", defaultWildCardConfig)); + assertEquals("+", MQTTUtil.getMqttTopicFromCoreAddress("*", defaultWildCardConfig)); + assertEquals("/", MQTTUtil.getMqttTopicFromCoreAddress(".", defaultWildCardConfig)); + assertEquals("#", MQTTUtil.getMqttTopicFromCoreAddress("#", defaultWildCardConfig)); + + WildcardConfiguration customWildCardConfig = new WildcardConfiguration().setDelimiter('|').setSingleWord('$').setAnyWords('!'); + assertEquals("a/b/c", MQTTUtil.getMqttTopicFromCoreAddress("a|b|c", customWildCardConfig)); + assertEquals("a/+/c", MQTTUtil.getMqttTopicFromCoreAddress("a|$|c", customWildCardConfig)); + assertEquals("a/+/#", MQTTUtil.getMqttTopicFromCoreAddress("a|$|!", customWildCardConfig)); + assertEquals("1.0/device", MQTTUtil.getMqttTopicFromCoreAddress("1.0|device", customWildCardConfig)); + assertEquals("+", MQTTUtil.getMqttTopicFromCoreAddress("$", customWildCardConfig)); + assertEquals("/", MQTTUtil.getMqttTopicFromCoreAddress("|", customWildCardConfig)); + assertEquals("#", MQTTUtil.getMqttTopicFromCoreAddress("!", customWildCardConfig)); + } } diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java index 197e130c674..074e729dd05 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java @@ -28,15 +28,7 @@ public class OpenWireUtil { - public static class OpenWireWildcardConfiguration extends WildcardConfiguration { - public OpenWireWildcardConfiguration() { - setDelimiter('.'); - setSingleWord('*'); - setAnyWords('>'); - } - } - - public static final WildcardConfiguration OPENWIRE_WILDCARD = new OpenWireWildcardConfiguration(); + public static final WildcardConfiguration OPENWIRE_WILDCARD = new WildcardConfiguration().setDelimiter('.').setAnyWords('>').setSingleWord('*'); public static final String SELECTOR_AWARE_OPTION = "selectorAware"; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java index bdaf36ef17f..3ee2f7b6860 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java @@ -28,6 +28,8 @@ public class WildcardConfiguration implements Serializable { static final char DELIMITER = '.'; + static final char ESCAPE = '\\'; + boolean routingEnabled = true; char singleWord = SINGLE_WORD; @@ -42,19 +44,33 @@ public class WildcardConfiguration implements Serializable { String delimiterString = String.valueOf(delimiter); + String escapeString = String.valueOf(ESCAPE); + @Override public boolean equals(Object o) { - if (this == o) return true; - if (!(o instanceof WildcardConfiguration)) return false; + if (this == o) { + return true; + } + if (!(o instanceof WildcardConfiguration)) { + return false; + } WildcardConfiguration that = (WildcardConfiguration) o; - if (routingEnabled != that.routingEnabled) return false; - if (singleWord != that.singleWord) return false; - if (anyWords != that.anyWords) return false; - return delimiter == that.delimiter; - + if (routingEnabled != that.routingEnabled) { + return false; + } + if (singleWord != that.singleWord) { + return false; + } + if (anyWords != that.anyWords) { + return false; + } + if (delimiter != that.delimiter) { + return false; + } + return true; } @Override @@ -80,8 +96,9 @@ public boolean isRoutingEnabled() { return routingEnabled; } - public void setRoutingEnabled(boolean routingEnabled) { + public WildcardConfiguration setRoutingEnabled(boolean routingEnabled) { this.routingEnabled = routingEnabled; + return this; } public char getAnyWords() { @@ -93,9 +110,10 @@ public String getAnyWordsString() { } - public void setAnyWords(char anyWords) { + public WildcardConfiguration setAnyWords(char anyWords) { this.anyWords = anyWords; this.anyWordsString = String.valueOf(anyWords); + return this; } public char getDelimiter() { @@ -106,9 +124,10 @@ public String getDelimiterString() { return delimiterString; } - public void setDelimiter(char delimiter) { + public WildcardConfiguration setDelimiter(char delimiter) { this.delimiter = delimiter; this.delimiterString = String.valueOf(delimiter); + return this; } public char getSingleWord() { @@ -119,19 +138,94 @@ public String getSingleWordString() { return singleWordString; } - public void setSingleWord(char singleWord) { + public WildcardConfiguration setSingleWord(char singleWord) { this.singleWord = singleWord; this.singleWordString = String.valueOf(singleWord); + return this; } - public String convert(String filter, WildcardConfiguration to) { - if (this.equals(to)) { - return filter; + /** + * Convert the input from this WildcardConfiguration into the specified WildcardConfiguration. + * + * If the input already contains characters defined in the target WildcardConfiguration then those characters will + * be escaped and preserved as such in the returned String. That said, wildcard characters which are the same + * between the two configurations will not be escaped + * + * If the input already contains escaped characters defined in this WildcardConfiguration then those characters will + * be unescaped after conversion and restored in the returned String. + * + * @param input the String to convert + * @param target the WildcardConfiguration to convert the input into + * @return the converted String + */ + public String convert(final String input, final WildcardConfiguration target) { + if (this.equals(target)) { + return input; } else { - return filter - .replace(getDelimiter(), to.getDelimiter()) - .replace(getSingleWord(), to.getSingleWord()) - .replace(getAnyWords(), to.getAnyWords()); + boolean escaped = isEscaped(input); + StringBuilder result; + if (!escaped) { + result = new StringBuilder(target.escape(input, this)); + } else { + result = new StringBuilder(input); + } + replaceChar(result, getDelimiter(), target.getDelimiter()); + replaceChar(result, getSingleWord(), target.getSingleWord()); + replaceChar(result, getAnyWords(), target.getAnyWords()); + if (escaped) { + return unescape(result.toString()); + } else { + return result.toString(); + } + } + } + + private String escape(final String input, WildcardConfiguration from) { + String result = input.replace(escapeString, escapeString + escapeString); + if (delimiter != from.getDelimiter()) { + result = result.replace(getDelimiterString(), escapeString + getDelimiterString()); + } + if (singleWord != from.getSingleWord()) { + result = result.replace(getSingleWordString(), escapeString + getSingleWordString()); + } + if (anyWords != from.getAnyWords()) { + result = result.replace(getAnyWordsString(), escapeString + getAnyWordsString()); + } + return result; + } + + private String unescape(final String input) { + return input + .replace(escapeString + escapeString, escapeString) + .replace(ESCAPE + getDelimiterString(), getDelimiterString()) + .replace(ESCAPE + getSingleWordString(), getSingleWordString()) + .replace(ESCAPE + getAnyWordsString(), getAnyWordsString()); + } + + private boolean isEscaped(final String input) { + for (int i = 0; i < input.length() - 1; i++) { + if (input.charAt(i) == ESCAPE && (input.charAt(i + 1) == getDelimiter() || input.charAt(i + 1) == getSingleWord() || input.charAt(i + 1) == getAnyWords())) { + return true; + } + } + return false; + } + + /** + * This will replace one character with another while ignoring escaped characters (i.e. those proceeded with '\'). + * + * @param result the final result of the replacement + * @param replace the character to replace + * @param replacement the replacement character to use + */ + private void replaceChar(StringBuilder result, char replace, char replacement) { + if (replace == replacement) { + return; + } + for (int i = 0; i < result.length(); i++) { + if (result.charAt(i) == replace && (i == 0 || result.charAt(i - 1) != ESCAPE)) { + result.setCharAt(i, replacement); + } } } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/WildcardConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/WildcardConfigurationTest.java new file mode 100644 index 00000000000..394541b558e --- /dev/null +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/WildcardConfigurationTest.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.config; + +import org.apache.activemq.artemis.utils.RandomUtil; +import org.junit.Assert; +import org.junit.Test; + +public class WildcardConfigurationTest extends Assert { + + private static final WildcardConfiguration MQTT_WILDCARD = new WildcardConfiguration().setDelimiter('/').setAnyWords('#').setSingleWord('+'); + private static final WildcardConfiguration DEFAULT_WILDCARD = new WildcardConfiguration(); + + @Test + public void testDefaultWildcard() { + assertEquals('.', DEFAULT_WILDCARD.getDelimiter()); + assertEquals('*', DEFAULT_WILDCARD.getSingleWord()); + assertEquals('#', DEFAULT_WILDCARD.getAnyWords()); + } + + @Test + public void testToFromCoreMQTT() { + testToFromCoreMQTT("foo.foo", "foo/foo"); + testToFromCoreMQTT("foo.*.foo", "foo/+/foo"); + testToFromCoreMQTT("foo.#", "foo/#"); + testToFromCoreMQTT("foo.*.foo.#", "foo/+/foo/#"); + testToFromCoreMQTT("foo\\.foo.foo", "foo.foo/foo"); + } + + private void testToFromCoreMQTT(String coreAddress, String mqttTopicFilter) { + assertEquals(coreAddress, MQTT_WILDCARD.convert(mqttTopicFilter, DEFAULT_WILDCARD)); + assertEquals(mqttTopicFilter, DEFAULT_WILDCARD.convert(coreAddress, MQTT_WILDCARD)); + } + + @Test + public void testEquality() { + WildcardConfiguration a = new WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c'); + WildcardConfiguration b = new WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c'); + + assertEquals(a, b); + assertEquals(b, a); + assertEquals(a.hashCode(), b.hashCode()); + + String toConvert = RandomUtil.randomString(); + assertSame(toConvert, a.convert(toConvert, b)); + assertSame(toConvert, a.convert(toConvert, a)); + } + + @Test + public void testEqualityNegative() { + WildcardConfiguration a; + WildcardConfiguration b; + + // none equal + a = new WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c'); + b = new WildcardConfiguration().setDelimiter('x').setAnyWords('y').setSingleWord('z'); + + assertNotEquals(a, b); + assertNotEquals(b, a); + assertNotEquals(a.hashCode(), b.hashCode()); + + // only delimiter equal + a = new WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c'); + b = new WildcardConfiguration().setDelimiter('a').setAnyWords('y').setSingleWord('z'); + + assertNotEquals(a, b); + assertNotEquals(b, a); + assertNotEquals(a.hashCode(), b.hashCode()); + + // only anyWords equal + a = new WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c'); + b = new WildcardConfiguration().setDelimiter('x').setAnyWords('b').setSingleWord('z'); + + assertNotEquals(a, b); + assertNotEquals(b, a); + assertNotEquals(a.hashCode(), b.hashCode()); + + // only singleWord equal + a = new WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c'); + b = new WildcardConfiguration().setDelimiter('x').setAnyWords('y').setSingleWord('c'); + + assertNotEquals(a, b); + assertNotEquals(b, a); + assertNotEquals(a.hashCode(), b.hashCode()); + + // only delimiter not equal + a = new WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c'); + b = new WildcardConfiguration().setDelimiter('x').setAnyWords('b').setSingleWord('c'); + + assertNotEquals(a, b); + assertNotEquals(b, a); + assertNotEquals(a.hashCode(), b.hashCode()); + + // only anyWords not equal + a = new WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c'); + b = new WildcardConfiguration().setDelimiter('a').setAnyWords('y').setSingleWord('c'); + + assertNotEquals(a, b); + assertNotEquals(b, a); + assertNotEquals(a.hashCode(), b.hashCode()); + + // only singleWord not equal + a = new WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c'); + b = new WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('z'); + + assertNotEquals(a, b); + assertNotEquals(b, a); + assertNotEquals(a.hashCode(), b.hashCode()); + } +} \ No newline at end of file diff --git a/docs/user-manual/mqtt.adoc b/docs/user-manual/mqtt.adoc index 0c02c47969d..8b74af49bc8 100644 --- a/docs/user-manual/mqtt.adoc +++ b/docs/user-manual/mqtt.adoc @@ -129,7 +129,8 @@ If you perform some custom validation of the client ID you can reject the client == Wildcard subscriptions -MQTT addresses are hierarchical much like a file system, and they use a special character (i.e. `/` by default) to separate hierarchical levels. +MQTT defines a special wildcard syntax for topic filters. This definition is found in section 4.7.1 of both the http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718107[3.1.1] and https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901242[5] specs. +MQTT topics are hierarchical much like a file system, and they use a special character (i.e. `/` by default) to separate hierarchical levels. Subscribers are able to subscribe to specific topics or to whole branches of a hierarchy. To subscribe to branches of an address hierarchy a subscriber can use wild cards. @@ -147,9 +148,20 @@ This can be useful, but should be done so with care since it has significant per Matches a single level in the address hierarchy. For example `/uk/+/stores` would match `/uk/newcastle/stores` but not `/uk/cities/newcastle/stores`. -These MQTT-specific wildcards are automatically _translated_ into the wildcard syntax used by ActiveMQ Artemis. -These wildcards are configurable. -See the xref:wildcard-syntax.adoc#customizing-the-syntax[Wildcard Syntax] chapter for details about how to configure custom wildcards. +This is _close_ to the default xref:wildcard-syntax.adoc#wildcard-syntax[wildcard syntax], but not exactly the same. +Therefore, some conversion is necessary. +This conversion isn't free so *if you want the best MQTT performance* use `broker.xml` to configure the wildcard syntax to match MQTT's, e.g.: + +[,xml] +---- + + / + # + * + +---- + +Of course, changing the default syntax also means other clients on other protocols will need to follow this same syntax as well as the `match` values of your `address-setting` configuration elements. == Web Sockets diff --git a/docs/user-manual/versions.adoc b/docs/user-manual/versions.adoc index e0be84f7c3a..610d681c38c 100644 --- a/docs/user-manual/versions.adoc +++ b/docs/user-manual/versions.adoc @@ -12,6 +12,28 @@ NOTE: If the upgrade spans multiple versions then the steps from *each* version NOTE: Follow the general upgrade procedure outlined in the xref:upgrading.adoc#upgrading-the-broker[Upgrading the Broker] chapter in addition to any version-specific upgrade instructions outlined here. +== 2.33.0 + +https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&version=...[Full release notes] + +=== Highlights + +* highlight 1 +* highlight 2 + +=== Upgrading from 2.32.0 + +* Due to https://issues.apache.org/jira/browse/ARTEMIS-4532[ARTEMIS-4532] the names of addresses and queues related to MQTT topics and subscriptions respectively may change. +This will only impact you if *both* of the following are true: ++ +. The broker is configured to use a xref:wildcard-syntax.adoc[wildcard syntax] which _doesn't match_ the xref:mqtt.adoc#wildcard-syntax[MQTT wildcard syntax] (e.g. the default wildcard syntax). +. You are using characters from the broker's wildcard syntax in your MQTT topic name or filter. +For example, if you were using the default wildcard syntax and an MQTT topic named `1.0/group/device`. +The dot (`.`) character here is part of the broker's wildcard syntax, and it is being used in the name of an MQTT topic. ++ +In this case the characters from the broker's wildcard syntax that do not match the characters in the MQTT wildcard syntax will be escaped with a backslash (i.e. `\`). +To avoid this conversion you can configure the broker to use the MQTT wildcard syntax or change the name of the MQTT topic name or filter. + == 2.32.0 https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&version=12353769[Full release notes] diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java index 91ba3d732de..4575e29aa73 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java @@ -1903,10 +1903,10 @@ public void testClientDisconnectedOnMaxConsumerLimitReached() throws Exception { Exception peerDisconnectedException = null; try { String clientId = "test.client"; - String coreAddress = MQTTUtil.convertMqttTopicFilterToCore("foo/bar", server.getConfiguration().getWildcardConfiguration()); + String coreAddress = MQTTUtil.getCoreAddressFromMqttTopic("foo/bar", server.getConfiguration().getWildcardConfiguration()); Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}; - getServer().createQueue(new QueueConfiguration(new SimpleString(clientId + "." + coreAddress)).setAddress(coreAddress).setRoutingType(RoutingType.MULTICAST).setDurable(false).setTemporary(true).setMaxConsumers(0)); + getServer().createQueue(new QueueConfiguration(MQTTUtil.getCoreQueueFromMqttTopic("foo/bar", clientId, server.getConfiguration().getWildcardConfiguration())).setAddress(coreAddress).setRoutingType(RoutingType.MULTICAST).setDurable(false).setTemporary(true).setMaxConsumers(0)); MQTT mqtt = createMQTTConnection(); mqtt.setClientId(clientId); @@ -2151,11 +2151,11 @@ public void autoDestroyAddress() throws Exception { @Test(timeout = 60 * 1000) public void testAutoDeleteRetainedQueue() throws Exception { final String TOPIC = "/abc/123"; - final String RETAINED_QUEUE = MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, TOPIC, server.getConfiguration().getWildcardConfiguration()); + final String RETAINED_QUEUE = MQTTUtil.getCoreRetainAddressFromMqttTopic(TOPIC, server.getConfiguration().getWildcardConfiguration()); final MQTTClientProvider publisher = getMQTTClientProvider(); final MQTTClientProvider subscriber = getMQTTClientProvider(); - server.getAddressSettingsRepository().addMatch(MQTTUtil.convertMqttTopicFilterToCore("#", server.getConfiguration().getWildcardConfiguration()), new AddressSettings().setExpiryDelay(500L).setAutoDeleteQueues(true).setAutoDeleteAddresses(true)); + server.getAddressSettingsRepository().addMatch(MQTTUtil.getCoreAddressFromMqttTopic("#", server.getConfiguration().getWildcardConfiguration()), new AddressSettings().setExpiryDelay(500L).setAutoDeleteQueues(true).setAutoDeleteAddresses(true)); initializeConnection(publisher); initializeConnection(subscriber); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java index 491c065fe38..a15360f115a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java @@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil; import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.security.Role; @@ -201,7 +202,7 @@ protected void configureBrokerSecurity(ActiveMQServer server) { value.add(new Role("browser", false, false, false, false, false, false, false, true, false, false)); value.add(new Role("guest", false, true, false, false, false, false, false, true, false, false)); value.add(new Role("full", true, true, true, true, true, true, true, true, true, true)); - securityRepository.addMatch(getQueueName(), value); + securityRepository.addMatch(MQTTUtil.getCoreAddressFromMqttTopic(getQueueName(), server.getConfiguration().getWildcardConfiguration()), value); server.getConfiguration().setSecurityEnabled(true); } else { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java index 8a6341a8b07..01d2e63b7f8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java @@ -81,8 +81,8 @@ public void testWildcardSubAutoCreateDoesNotPageToWildcardAddress() throws Excep String subscriberId = UUID.randomUUID().toString(); String senderId = UUID.randomUUID().toString(); - String subscribeTo = "A.*"; - String publishTo = "A.a"; + String subscribeTo = "A/+"; + String publishTo = "A/a"; subscriber = createMqttClient(subscriberId); subscriber.subscribe(subscribeTo, 2); @@ -93,7 +93,7 @@ public void testWildcardSubAutoCreateDoesNotPageToWildcardAddress() throws Excep sender.publish(publishTo, UUID.randomUUID().toString().getBytes(), 2, false); sender.publish(publishTo, UUID.randomUUID().toString().getBytes(), 2, false); - assertTrue(server.getPagingManager().getPageStore(new SimpleString(subscribeTo)).isPaging()); + assertTrue(server.getPagingManager().getPageStore(new SimpleString(MQTTUtil.getCoreAddressFromMqttTopic(subscribeTo, server.getConfiguration().getWildcardConfiguration()))).isPaging()); subscriber = createMqttClient(subscriberId); subscriber.subscribe(subscribeTo, 2); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java index b13b2b5ef4c..0af994e89c2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.mqtt; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil; import org.apache.activemq.artemis.core.security.Role; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; @@ -53,7 +54,7 @@ protected void configureBrokerSecurity(ActiveMQServer server) { HashSet value = new HashSet<>(); value.add(new Role("addressOnly", true, true, true, true, false, false, false, false, true, true)); - securityRepository.addMatch(getQueueName(), value); + securityRepository.addMatch(MQTTUtil.getCoreAddressFromMqttTopic(getQueueName(), server.getConfiguration().getWildcardConfiguration()), value); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java index 247fdb68ae7..05f7eb9b315 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; @@ -83,6 +84,27 @@ public void messageArrived(String topic, MqttMessage message) { assertTrue(latch.await(500, TimeUnit.MILLISECONDS)); } + @Test(timeout = DEFAULT_TIMEOUT) + public void testTopicNameEscape() throws Exception { + final String topic = "foo1.0/bar/baz"; + AtomicReference receivedTopic = new AtomicReference<>(); + + MqttClient subscriber = createPahoClient("subscriber"); + subscriber.connect(); + subscriber.setCallback(new DefaultMqttCallback() { + @Override + public void messageArrived(String t, MqttMessage message) { + receivedTopic.set(t); + } + }); + subscriber.subscribe(topic, AT_LEAST_ONCE); + + MqttClient producer = createPahoClient("producer"); + producer.connect(); + producer.publish(topic, "myMessage".getBytes(StandardCharsets.UTF_8), 1, false); + Wait.assertEquals(topic, receivedTopic::get, 500, 50); + } + /* * Ensure that the broker adds a timestamp on the message when sending via MQTT */ @@ -333,7 +355,7 @@ public void testSharedSubscriptionsWithSameName() throws Exception { consumer1.subscribe(SHARED_SUB1, 1); assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC1))); - Queue q1 = getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME); + Queue q1 = getSharedSubscriptionQueue(SHARED_SUB1); assertNotNull(q1); assertEquals(TOPIC1, q1.getAddress().toString()); assertEquals(1, q1.getConsumerCount()); @@ -344,7 +366,7 @@ public void testSharedSubscriptionsWithSameName() throws Exception { consumer2.subscribe(SHARED_SUB2, 1); assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC2))); - Queue q2 = getSubscriptionQueue(TOPIC2, "consumer2", SUB_NAME); + Queue q2 = getSharedSubscriptionQueue(SHARED_SUB2); assertNotNull(q2); assertEquals(TOPIC2, q2.getAddress().toString()); assertEquals(1, q2.getConsumerCount()); @@ -360,10 +382,10 @@ public void testSharedSubscriptionsWithSameName() throws Exception { assertTrue(ackLatch2.await(2, TimeUnit.SECONDS)); consumer1.unsubscribe(SHARED_SUB1); - assertNull(getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME)); + assertNull(getSharedSubscriptionQueue(SHARED_SUB1)); consumer2.unsubscribe(SHARED_SUB2); - assertNull(getSubscriptionQueue(TOPIC2, "consumer2", SUB_NAME)); + assertNull(getSharedSubscriptionQueue(SHARED_SUB2)); consumer1.disconnect(); consumer1.close(); @@ -388,13 +410,13 @@ public void testSharedSubscriptionsWithSameName2() throws Exception { consumer.subscribe(SHARED_SUBS, new int[]{1, 1}); assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC1))); - Queue q1 = getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME); + Queue q1 = getSharedSubscriptionQueue(SHARED_SUBS[0]); assertNotNull(q1); assertEquals(TOPIC1, q1.getAddress().toString()); assertEquals(1, q1.getConsumerCount()); assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC2))); - Queue q2 = getSubscriptionQueue(TOPIC2, "consumer1", SUB_NAME); + Queue q2 = getSharedSubscriptionQueue(SHARED_SUBS[1]); assertNotNull(q2); assertEquals(TOPIC2, q2.getAddress().toString()); assertEquals(1, q2.getConsumerCount()); @@ -409,8 +431,8 @@ public void testSharedSubscriptionsWithSameName2() throws Exception { assertTrue(ackLatch.await(2, TimeUnit.SECONDS)); consumer.unsubscribe(SHARED_SUBS); - assertNull(getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME)); - assertNull(getSubscriptionQueue(TOPIC2, "consumer1", SUB_NAME)); + assertNull(getSharedSubscriptionQueue(SHARED_SUBS[0])); + assertNull(getSharedSubscriptionQueue(SHARED_SUBS[1])); consumer.disconnect(); consumer.close(); @@ -644,7 +666,7 @@ public void testSubscriptionQueueName() throws Exception { MqttClient client = createPahoClient(clientID); client.connect(); client.subscribe(topic, 1); - Wait.assertTrue(() -> server.locateQueue(SimpleString.toSimpleString(clientID.concat(".").concat(topic.replace('/', '.')))) != null, 2000, 100); + Wait.assertTrue(() -> getSubscriptionQueue(topic, clientID) != null, 2000, 100); client.disconnect(); client.close(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java index a4f106a2fda..e2cd5e66b97 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java @@ -37,11 +37,10 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.config.Configuration; -import org.apache.activemq.artemis.core.postoffice.Binding; -import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil; import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.security.Role; @@ -107,7 +106,7 @@ protected MqttAsyncClient createAsyncPahoClient(String clientId) throws MqttExce return new MqttAsyncClient(TCP + "://localhost:" + (isUseSsl() ? getSslPort() : getPort()), clientId, new MemoryPersistence()); } - private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); protected static final long DEFAULT_TIMEOUT = 300000; protected ActiveMQServer server; @@ -345,40 +344,16 @@ public MQTTProtocolManager getProtocolManager() { return null; } - protected Queue getSubscriptionQueue(String TOPIC) { - try { - Object[] array = server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(TOPIC)).getBindings().toArray(); - if (array.length == 0) { - return null; - } else { - return ((LocalQueueBinding)array[0]).getQueue(); - } - } catch (Exception e) { - e.printStackTrace(); - return null; - } + protected Queue getSharedSubscriptionQueue(String mqttTopicFilter) { + return getSubscriptionQueue(mqttTopicFilter, null); } - protected Queue getSubscriptionQueue(String TOPIC, String clientId) { - return getSubscriptionQueue(TOPIC, clientId, null); + protected Queue getSubscriptionQueue(String mqttTopicFilter, String clientId) { + return server.locateQueue(MQTTUtil.getCoreQueueFromMqttTopic(mqttTopicFilter, clientId, server.getConfiguration().getWildcardConfiguration())); } - protected Queue getSubscriptionQueue(String TOPIC, String clientId, String sharedSubscriptionName) { - try { - for (Binding b : server.getPostOffice().getMatchingBindings(SimpleString.toSimpleString(TOPIC))) { - if (sharedSubscriptionName != null) { - if (((LocalQueueBinding)b).getQueue().getName().startsWith(SimpleString.toSimpleString(sharedSubscriptionName))) { - return ((LocalQueueBinding)b).getQueue(); - } - } else if (((LocalQueueBinding)b).getQueue().getName().startsWith(SimpleString.toSimpleString(clientId))) { - return ((LocalQueueBinding)b).getQueue(); - } - } - return null; - } catch (Exception e) { - e.printStackTrace(); - return null; - } + protected Queue getRetainedMessageQueue(String mqttTopicFilter) { + return server.locateQueue(MQTTUtil.getCoreRetainAddressFromMqttTopic(mqttTopicFilter, server.getConfiguration().getWildcardConfiguration())); } protected void setAcceptorProperty(String property) throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/ControlPacketFormatTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/ControlPacketFormatTests.java index 3340df263de..2113fb194a3 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/ControlPacketFormatTests.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/ControlPacketFormatTests.java @@ -55,10 +55,11 @@ public class ControlPacketFormatTests extends MQTT5TestSupport { @Test(timeout = DEFAULT_TIMEOUT) public void testPacketIdQoSZero() throws Exception { final String TOPIC = this.getTopicName(); + final String CONSUMER_CLIENT_ID = "consumer"; final int MESSAGE_COUNT = 100; final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT); - MqttClient consumer = createPahoClient("consumer"); + MqttClient consumer = createPahoClient(CONSUMER_CLIENT_ID); consumer.setCallback(new DefaultMqttCallback() { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { @@ -75,7 +76,7 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { for (int i = 0; i < MESSAGE_COUNT; i++) { producer.publish(TOPIC, ("foo" + i).getBytes(), 0, false); } - Wait.assertEquals(MESSAGE_COUNT, () -> getSubscriptionQueue(TOPIC).getMessagesAdded()); + Wait.assertEquals(MESSAGE_COUNT, () -> getSubscriptionQueue(TOPIC, CONSUMER_CLIENT_ID).getMessagesAdded()); producer.disconnect(); producer.close(); @@ -111,15 +112,15 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { }); consumer.connect(); consumer.subscribe(TOPIC, 2); - Wait.assertTrue(() -> getSubscriptionQueue(TOPIC) != null); - Wait.assertEquals(1, () -> getSubscriptionQueue(TOPIC).getConsumerCount()); + Wait.assertTrue(() -> getSubscriptionQueue(TOPIC, CONSUMER_ID) != null); + Wait.assertEquals(1, () -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getConsumerCount()); MqttClient producer = createPahoClient("producer"); producer.connect(); for (int i = 0; i < MESSAGE_COUNT; i++) { producer.publish(TOPIC, ("foo" + i).getBytes(), (RandomUtil.randomPositiveInt() % 2) + 1, false); } - Wait.assertEquals(MESSAGE_COUNT, () -> getSubscriptionQueue(TOPIC).getMessagesAdded()); + Wait.assertEquals(MESSAGE_COUNT, () -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessagesAdded()); producer.disconnect(); producer.close(); @@ -173,7 +174,8 @@ public void testPacketIdPubAckQoS2() throws Exception { final String TOPIC = this.getTopicName(); final CountDownLatch latch = new CountDownLatch(1); - MqttClient consumer = createPahoClient("consumer"); + final String CONSUMER_ID = "consumer"; + MqttClient consumer = createPahoClient(CONSUMER_ID); consumer.setCallback(new DefaultMqttCallback() { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { @@ -186,11 +188,11 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { MqttClient producer = createPahoClient("producer"); producer.connect(); producer.publish(TOPIC, "foo".getBytes(StandardCharsets.UTF_8), 2, false); - Wait.assertEquals((long) 1, () -> getSubscriptionQueue(TOPIC).getMessagesAdded(), 2000, 100); + Wait.assertEquals((long) 1, () -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessagesAdded(), 2000, 100); producer.disconnect(); producer.close(); - Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC).getMessagesAcknowledged(), 15000, 100); + Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessagesAcknowledged(), 15000, 100); assertTrue(latch.await(15, TimeUnit.SECONDS)); Wait.assertFalse(() -> failed.get(), 2000, 100); Wait.assertEquals(8, () -> packetCount.get()); @@ -243,7 +245,8 @@ public void testPacketIdPubAckQoS1() throws Exception { final String TOPIC = this.getTopicName(); final CountDownLatch latch = new CountDownLatch(1); - MqttClient consumer = createPahoClient("consumer"); + final String CONSUMER_ID = "consumer"; + MqttClient consumer = createPahoClient(CONSUMER_ID); consumer.setCallback(new DefaultMqttCallback() { @Override public void messageArrived(String topic, MqttMessage message) throws Exception { @@ -256,11 +259,11 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { MqttClient producer = createPahoClient("producer"); producer.connect(); producer.publish(TOPIC, "foo".getBytes(StandardCharsets.UTF_8), 1, false); - Wait.assertEquals((long) 1, () -> getSubscriptionQueue(TOPIC).getMessagesAdded(), 2000, 100); + Wait.assertEquals((long) 1, () -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessagesAdded(), 2000, 100); producer.disconnect(); producer.close(); - Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC).getMessagesAcknowledged(), 15000, 100); + Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessagesAcknowledged(), 15000, 100); assertTrue(latch.await(15, TimeUnit.SECONDS)); Wait.assertFalse(() -> failed.get(), 2000, 100); Wait.assertEquals(4, () -> packetCount.get()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/MessageReceiptTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/MessageReceiptTests.java index 35b1dabeed3..3798edf8a1c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/MessageReceiptTests.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/MessageReceiptTests.java @@ -46,12 +46,13 @@ public class MessageReceiptTests extends MQTT5TestSupport { @Test(timeout = DEFAULT_TIMEOUT) public void testMessageReceipt() throws Exception { final String TOPIC = RandomUtil.randomString(); + final String CONSUMER_ID = "consumer"; final int CONSUMER_COUNT = 25; final MqttClient[] consumers = new MqttClient[CONSUMER_COUNT]; final CountDownLatch latch = new CountDownLatch(CONSUMER_COUNT); for (int i = 0; i < CONSUMER_COUNT; i++) { - MqttClient consumer = createPahoClient(RandomUtil.randomString()); + MqttClient consumer = createPahoClient(CONSUMER_ID + i); consumers[i] = consumer; consumer.connect(); int finalI = i; @@ -75,7 +76,7 @@ public void messageArrived(String incomingTopic, MqttMessage message) throws Exc Wait.assertEquals((long) CONSUMER_COUNT, () -> { int totalMessagesAdded = 0; for (int i = 0; i < CONSUMER_COUNT; i++) { - totalMessagesAdded += getSubscriptionQueue(TOPIC + i).getMessagesAdded(); + totalMessagesAdded += getSubscriptionQueue(TOPIC + i, CONSUMER_ID + i).getMessagesAdded(); } return totalMessagesAdded; }, 2000, 100); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.java index e28c6cf54c2..9e307b2de4f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.java @@ -101,14 +101,15 @@ public void messageArrived(String incomingTopic, MqttMessage message) throws Exc @Test(timeout = DEFAULT_TIMEOUT) public void testQoS1PubAck() throws Exception { final String TOPIC = RandomUtil.randomString(); + final String CONSUMER_ID = "consumer"; final CountDownLatch ackLatch = new CountDownLatch(1); final AtomicInteger packetId = new AtomicInteger(); MQTTInterceptor incomingInterceptor = (packet, connection) -> { if (packet.fixedHeader().messageType() == MqttMessageType.PUBACK) { // ensure the message is still in the queue before we get the ack from the client - assertEquals(1, getSubscriptionQueue(TOPIC).getMessageCount()); - assertEquals(1, getSubscriptionQueue(TOPIC).getDeliveringCount()); + assertEquals(1, getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessageCount()); + assertEquals(1, getSubscriptionQueue(TOPIC, CONSUMER_ID).getDeliveringCount()); // ensure the ids match so we know this is the "corresponding" PUBACK for the previous PUBLISH assertEquals(packetId.get(), ((MqttPubReplyMessageVariableHeader)packet.variableHeader()).messageId()); @@ -129,7 +130,7 @@ public void testQoS1PubAck() throws Exception { server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor); final CountDownLatch latch = new CountDownLatch(1); - MqttClient consumer = createPahoClient("consumer"); + MqttClient consumer = createPahoClient(CONSUMER_ID); consumer.connect(); consumer.setCallback(new LatchedMqttCallback(latch)); consumer.subscribe(TOPIC, 1); @@ -142,8 +143,8 @@ public void testQoS1PubAck() throws Exception { assertTrue(ackLatch.await(2, TimeUnit.SECONDS)); assertTrue(latch.await(2, TimeUnit.SECONDS)); - assertEquals(0, getSubscriptionQueue(TOPIC).getMessageCount()); - assertEquals(0, getSubscriptionQueue(TOPIC).getDeliveringCount()); + assertEquals(0, getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessageCount()); + assertEquals(0, getSubscriptionQueue(TOPIC, CONSUMER_ID).getDeliveringCount()); consumer.disconnect(); consumer.close(); } @@ -241,14 +242,15 @@ public void testQoS1PubAckId() throws Exception { @Test(timeout = DEFAULT_TIMEOUT) public void testQoS2PubRec() throws Exception { final String TOPIC = RandomUtil.randomString(); + final String CONSUMER_ID = "consumer"; final CountDownLatch ackLatch = new CountDownLatch(1); final AtomicInteger packetId = new AtomicInteger(); MQTTInterceptor incomingInterceptor = (packet, connection) -> { if (packet.fixedHeader().messageType() == MqttMessageType.PUBREC) { // ensure the message is still in the queue before we get the ack from the client - assertEquals(1, getSubscriptionQueue(TOPIC).getMessageCount()); - assertEquals(1, getSubscriptionQueue(TOPIC).getDeliveringCount()); + assertEquals(1, getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessageCount()); + assertEquals(1, getSubscriptionQueue(TOPIC, CONSUMER_ID).getDeliveringCount()); // ensure the ids match so we know this is the "corresponding" PUBREC for the previous PUBLISH assertEquals(packetId.get(), ((MqttPubReplyMessageVariableHeader)packet.variableHeader()).messageId()); @@ -269,7 +271,7 @@ public void testQoS2PubRec() throws Exception { server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor); final CountDownLatch latch = new CountDownLatch(1); - MqttClient consumer = createPahoClient("consumer"); + MqttClient consumer = createPahoClient(CONSUMER_ID); consumer.connect(); consumer.setCallback(new LatchedMqttCallback(latch)); consumer.subscribe(TOPIC, 2); @@ -282,8 +284,8 @@ public void testQoS2PubRec() throws Exception { assertTrue(ackLatch.await(2, TimeUnit.SECONDS)); assertTrue(latch.await(2, TimeUnit.SECONDS)); - assertEquals(0, getSubscriptionQueue(TOPIC).getMessageCount()); - assertEquals(0, getSubscriptionQueue(TOPIC).getDeliveringCount()); + assertEquals(0, getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessageCount()); + assertEquals(0, getSubscriptionQueue(TOPIC, CONSUMER_ID).getDeliveringCount()); consumer.disconnect(); consumer.close(); } @@ -348,7 +350,7 @@ public void testQoS2PubRelId() throws Exception { @Test(timeout = DEFAULT_TIMEOUT) public void testQoS2PubRel() throws Exception { final String TOPIC = RandomUtil.randomString(); - final String CONSUMER_CLIENT_ID = "consumer"; + final String CONSUMER_ID = "consumer"; final CountDownLatch ackLatch = new CountDownLatch(1); final AtomicInteger packetId = new AtomicInteger(); @@ -356,8 +358,8 @@ public void testQoS2PubRel() throws Exception { if (packet.fixedHeader().messageType() == MqttMessageType.PUBCOMP) { try { // ensure the message is still in the management queue before we get the PUBCOMP from the client - Wait.assertEquals(1L, () -> server.locateQueue(MQTTUtil.MANAGEMENT_QUEUE_PREFIX + CONSUMER_CLIENT_ID).getMessageCount(), 2000, 100); - Wait.assertEquals(1L, () -> server.locateQueue(MQTTUtil.MANAGEMENT_QUEUE_PREFIX + CONSUMER_CLIENT_ID).getDeliveringCount(), 2000, 100); + Wait.assertEquals(1L, () -> server.locateQueue(MQTTUtil.MANAGEMENT_QUEUE_PREFIX + CONSUMER_ID).getMessageCount(), 2000, 100); + Wait.assertEquals(1L, () -> server.locateQueue(MQTTUtil.MANAGEMENT_QUEUE_PREFIX + CONSUMER_ID).getDeliveringCount(), 2000, 100); } catch (Exception e) { return false; } @@ -381,7 +383,7 @@ public void testQoS2PubRel() throws Exception { server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor); final CountDownLatch latch = new CountDownLatch(1); - MqttClient consumer = createPahoClient(CONSUMER_CLIENT_ID); + MqttClient consumer = createPahoClient(CONSUMER_ID); consumer.connect(); consumer.setCallback(new LatchedMqttCallback(latch)); consumer.subscribe(TOPIC, 2); @@ -394,8 +396,8 @@ public void testQoS2PubRel() throws Exception { assertTrue(ackLatch.await(2, TimeUnit.SECONDS)); assertTrue(latch.await(2, TimeUnit.SECONDS)); - assertEquals(0, getSubscriptionQueue(TOPIC).getMessageCount()); - assertEquals(0, getSubscriptionQueue(TOPIC).getDeliveringCount()); + assertEquals(0, getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessageCount()); + assertEquals(0, getSubscriptionQueue(TOPIC, CONSUMER_ID).getDeliveringCount()); consumer.disconnect(); consumer.close(); } @@ -412,6 +414,7 @@ public void testQoS2PubRel() throws Exception { @Test(timeout = DEFAULT_TIMEOUT) public void testQoS2WithExpiration() throws Exception { final String TOPIC = "myTopic"; + final String CONSUMER_ID = "consumer"; final CountDownLatch ackLatch = new CountDownLatch(1); final CountDownLatch expireRefsLatch = new CountDownLatch(1); final long messageExpiryInterval = 2; @@ -419,12 +422,12 @@ public void testQoS2WithExpiration() throws Exception { MQTTInterceptor incomingInterceptor = (packet, connection) -> { if (packet.fixedHeader().messageType() == MqttMessageType.PUBREC) { // ensure the message is still in the queue before we get the PUBREC from the client - assertEquals(1, getSubscriptionQueue(TOPIC).getMessageCount()); - assertEquals(1, getSubscriptionQueue(TOPIC).getDeliveringCount()); + assertEquals(1, getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessageCount()); + assertEquals(1, getSubscriptionQueue(TOPIC, CONSUMER_ID).getDeliveringCount()); try { // ensure enough time has passed for the message to expire Thread.sleep(messageExpiryInterval * 1500); - getSubscriptionQueue(TOPIC).expireReferences(expireRefsLatch::countDown); + getSubscriptionQueue(TOPIC, CONSUMER_ID).expireReferences(expireRefsLatch::countDown); assertTrue(expireRefsLatch.await(2, TimeUnit.SECONDS)); } catch (InterruptedException e) { e.printStackTrace(); @@ -438,7 +441,7 @@ public void testQoS2WithExpiration() throws Exception { server.getRemotingService().addIncomingInterceptor(incomingInterceptor); final CountDownLatch latch = new CountDownLatch(1); - MqttClient consumer = createPahoClient("consumer"); + MqttClient consumer = createPahoClient(CONSUMER_ID); consumer.connect(); consumer.setCallback(new DefaultMqttCallback() { @Override @@ -462,9 +465,9 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { assertTrue(ackLatch.await(messageExpiryInterval * 2, TimeUnit.SECONDS)); assertTrue(latch.await(messageExpiryInterval * 2, TimeUnit.SECONDS)); - Wait.assertEquals(0, () -> getSubscriptionQueue(TOPIC).getMessageCount()); - Wait.assertEquals(0, () -> getSubscriptionQueue(TOPIC).getDeliveringCount()); - Wait.assertEquals(0, () -> getSubscriptionQueue(TOPIC).getMessagesExpired()); + Wait.assertEquals(0, () -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessageCount()); + Wait.assertEquals(0, () -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getDeliveringCount()); + Wait.assertEquals(0, () -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessagesExpired()); consumer.disconnect(); consumer.close(); } @@ -628,7 +631,10 @@ public void testQoS2PubCompId() throws Exception { @Test(timeout = DEFAULT_TIMEOUT) public void testQoS2WithExpiration2() throws Exception { final String TOPIC = "myTopic"; - server.createQueue(new QueueConfiguration(RandomUtil.randomString()).setAddress(TOPIC).setRoutingType(RoutingType.MULTICAST)); + final String CONSUMER_ID = "consumer"; + server.createQueue(new QueueConfiguration(MQTTUtil.getCoreQueueFromMqttTopic(TOPIC, CONSUMER_ID, server.getConfiguration().getWildcardConfiguration())) + .setAddress(MQTTUtil.getCoreAddressFromMqttTopic(TOPIC, server.getConfiguration().getWildcardConfiguration())) + .setRoutingType(RoutingType.MULTICAST)); final CountDownLatch ackLatch = new CountDownLatch(1); final CountDownLatch expireRefsLatch = new CountDownLatch(1); final long messageExpiryInterval = 1; @@ -636,11 +642,11 @@ public void testQoS2WithExpiration2() throws Exception { MQTTInterceptor outgoingInterceptor = (packet, connection) -> { if (packet.fixedHeader().messageType() == MqttMessageType.PUBREC) { // ensure the message is in the queue before trying to expire - Wait.assertTrue(() -> getSubscriptionQueue(TOPIC).getMessageCount() == 1, 2000, 100); + Wait.assertTrue(() -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessageCount() == 1, 2000, 100); try { // ensure enough time has passed for the message to expire Thread.sleep(messageExpiryInterval * 1500); - getSubscriptionQueue(TOPIC).expireReferences(expireRefsLatch::countDown); + getSubscriptionQueue(TOPIC, CONSUMER_ID).expireReferences(expireRefsLatch::countDown); assertTrue(expireRefsLatch.await(2, TimeUnit.SECONDS)); } catch (InterruptedException e) { e.printStackTrace(); @@ -666,6 +672,6 @@ public void testQoS2WithExpiration2() throws Exception { producer.close(); assertTrue(ackLatch.await(messageExpiryInterval * 2, TimeUnit.SECONDS)); - Wait.assertEquals(1, () -> getSubscriptionQueue(TOPIC).getMessagesExpired()); + Wait.assertEquals(1, () -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessagesExpired()); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java index 6cea9dd86df..612882525db 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java @@ -480,13 +480,13 @@ public void testMaxPacketSize() throws Exception { producer.publish(TOPIC, bytes, 2, false); producer.disconnect(); producer.close(); - Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC).getMessagesAdded(), 2000, 100); + Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessagesAdded(), 2000, 100); // the client should *not* receive the message assertFalse(latch.await(2, TimeUnit.SECONDS)); // the broker should acknowledge the message since it exceeded the client's max packet size - Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC).getMessagesAcknowledged(), 2000, 100); + Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessagesAcknowledged(), 2000, 100); consumer.disconnect(); consumer.close(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java index 655d5495edf..caa34d47c71 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets; +import java.lang.invoke.MethodHandles; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; @@ -29,7 +30,6 @@ import org.apache.activemq.artemis.api.core.QueueConfiguration; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor; -import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil; import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport; import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.tests.util.Wait; @@ -48,7 +48,6 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.invoke.MethodHandles; /** * Fulfilled by client or Netty codec (i.e. not tested here): @@ -260,12 +259,14 @@ public void testRetainFlagWithEmptyMessage() throws Exception { final String CONSUMER_ID = RandomUtil.randomString(); final String TOPIC = this.getTopicName(); + assertNull(getRetainedMessageQueue(TOPIC)); + MqttClient producer = createPahoClient("producer"); producer.connect(); // send first retained message producer.publish(TOPIC, "retain1".getBytes(), 2, true); - Wait.assertTrue(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100); + Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount() == 1, 2000, 100); // send second retained message; should *remove* the first producer.publish(TOPIC, new byte[0], 2, true); @@ -273,7 +274,7 @@ public void testRetainFlagWithEmptyMessage() throws Exception { producer.disconnect(); producer.close(); - Wait.assertTrue(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 0, 2000, 100); + Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount() == 0, 2000, 100); final CountDownLatch latch = new CountDownLatch(1); MqttClient consumer = createPahoClient(CONSUMER_ID); @@ -302,12 +303,14 @@ public void testRetainFlagFalse() throws Exception { final String RETAINED_PAYLOAD = RandomUtil.randomString(); final String UNRETAINED_PAYLOAD = RandomUtil.randomString(); + assertNull(getRetainedMessageQueue(TOPIC)); + MqttClient producer = createPahoClient("producer"); producer.connect(); // send retained message producer.publish(TOPIC, RETAINED_PAYLOAD.getBytes(), 2, true); - Wait.assertTrue(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 1000, 100); + Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount() == 1, 1000, 100); // send an unretained message; should *not* remove the existing retained message producer.publish(TOPIC, UNRETAINED_PAYLOAD.getBytes(), 2, false); @@ -315,7 +318,7 @@ public void testRetainFlagFalse() throws Exception { producer.disconnect(); producer.close(); - Wait.assertFalse(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() > 1, 1000, 100); + Wait.assertFalse(() -> getRetainedMessageQueue(TOPIC).getMessageCount() > 1, 1000, 100); final CountDownLatch latch = new CountDownLatch(1); MqttClient consumer = createPahoClient(CONSUMER_ID); @@ -395,13 +398,17 @@ public void internalTestRetainHandlingZero(boolean filter, int subscriptionCount retainedPayloads[i] = RandomUtil.randomString(); } + for (int i = 0; i < SUBSCRIPTION_COUNT; i++) { + assertNull(getRetainedMessageQueue(topicNames[i])); + } + // send retained messages MqttClient producer = createPahoClient("producer"); producer.connect(); for (int i = 0; i < SUBSCRIPTION_COUNT; i++) { final String topicName = topicNames[i]; producer.publish(topicName, retainedPayloads[i].getBytes(), 2, true); - Wait.assertTrue(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, topicName, server.getConfiguration().getWildcardConfiguration())).getMessageCount() == 1, 2000, 100); + Wait.assertTrue(() -> getRetainedMessageQueue(topicName).getMessageCount() == 1, 2000, 100); } producer.disconnect(); producer.close(); @@ -458,12 +465,14 @@ public void testRetainHandlingOne() throws Exception { final String CONSUMER_ID = RandomUtil.randomString(); final String TOPIC = this.getTopicName(); + assertNull(getRetainedMessageQueue(TOPIC)); + // send retained messages MqttClient producer = createPahoClient("producer"); producer.connect(); producer.publish(TOPIC, "retained".getBytes(), 2, true); - Wait.assertTrue(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, TOPIC, server.getConfiguration().getWildcardConfiguration())).getMessageCount() == 1, 2000, 100); + Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount() == 1, 2000, 100); producer.disconnect(); producer.close(); @@ -491,7 +500,7 @@ public void messageArrived(String topic, MqttMessage message) { assertTrue(latch.await(2, TimeUnit.SECONDS)); // ensure the retained message has been successfully acknowledge and removed from the subscription queue - Wait.assertTrue(() -> getSubscriptionQueue(TOPIC).getMessageCount() == 0, 2000, 100); + Wait.assertTrue(() -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessageCount() == 0, 2000, 100); consumer.disconnect(); @@ -522,11 +531,13 @@ public void testRetainHandlingTwo() throws Exception { final String CONSUMER_ID = RandomUtil.randomString(); final String TOPIC = this.getTopicName(); + assertNull(getRetainedMessageQueue(TOPIC)); + // send first retained message MqttClient producer = createPahoClient("producer"); producer.connect(); producer.publish(TOPIC, "retained".getBytes(), 2, true); - Wait.assertTrue(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100); + Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount() == 1, 2000, 100); producer.disconnect(); producer.close(); @@ -578,11 +589,13 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { subscription.setRetainAsPublished(false); consumer.subscribe(new MqttSubscription[]{subscription}); + assertNull(getRetainedMessageQueue(TOPIC)); + // send retained message MqttClient producer = createPahoClient("producer"); producer.connect(); producer.publish(TOPIC, "retained".getBytes(), 2, true); - Wait.assertTrue(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100); + Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount() == 1, 2000, 100); producer.disconnect(); producer.close(); @@ -627,11 +640,13 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { subscription.setRetainAsPublished(true); consumer.subscribe(new MqttSubscription[]{subscription}); + assertNull(getRetainedMessageQueue(TOPIC)); + // send retained message MqttClient producer = createPahoClient("producer"); producer.connect(); producer.publish(TOPIC, "retained".getBytes(), 2, true); - Wait.assertTrue(() -> server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100); + Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount() == 1, 2000, 100); producer.disconnect(); producer.close(); @@ -821,9 +836,9 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { producer.disconnect(); producer.close(); - Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC).getMessageCount(), 1000, 100); + Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessageCount(), 1000, 100); Wait.assertEquals(1L, () -> server.locateQueue("EXPIRY").getMessageCount(), 3000, 100); - Wait.assertEquals(0L, () -> getSubscriptionQueue(TOPIC).getMessageCount(), 1000, 100); + Wait.assertEquals(0L, () -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessageCount(), 1000, 100); consumer.connect(options); assertFalse(latch.await(1, TimeUnit.SECONDS)); @@ -874,7 +889,7 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { producer.disconnect(); producer.close(); - Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC).getMessageCount(), 500, 100); + Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessageCount(), 500, 100); Thread.sleep(SLEEP); @@ -1571,7 +1586,8 @@ public void testReceiveMaximum() throws Exception { final String TOPIC = this.getTopicName(); final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT); - MqttAsyncClient consumer = createAsyncPahoClient(RandomUtil.randomString()); + final String CONSUMER_ID = "consumer"; + MqttAsyncClient consumer = createAsyncPahoClient(CONSUMER_ID); MqttConnectionOptions options = new MqttConnectionOptions(); options.setReceiveMaximum(RECEIVE_MAXIMUM); consumer.connect(options).waitForCompletion(); @@ -1589,11 +1605,11 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { for (int i = 0; i < MESSAGE_COUNT; i++) { producer.publish(TOPIC, "foo".getBytes(StandardCharsets.UTF_8), (RandomUtil.randomPositiveInt() % 2) + 1, false); } - Wait.assertEquals((long) MESSAGE_COUNT, () -> getSubscriptionQueue(TOPIC).getMessagesAdded(), 2000, 100); + Wait.assertEquals((long) MESSAGE_COUNT, () -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessagesAdded(), 2000, 100); producer.disconnect(); producer.close(); - Wait.assertEquals(0L, () -> getSubscriptionQueue(TOPIC).getMessageCount(), 15000, 100); + Wait.assertEquals(0L, () -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessageCount(), 15000, 100); assertTrue(latch.await(15, TimeUnit.SECONDS)); assertFalse(failed.get()); consumer.disconnect(); @@ -1633,7 +1649,8 @@ public void testReceiveMaximumQoS0() throws Exception { server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor); final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT); - MqttAsyncClient consumer = createAsyncPahoClient(RandomUtil.randomString()); + final String CONSUMER_ID = "consumer"; + MqttAsyncClient consumer = createAsyncPahoClient(CONSUMER_ID); MqttConnectionOptions options = new MqttConnectionOptions(); options.setReceiveMaximum(RECEIVE_MAXIMUM); consumer.connect(options).waitForCompletion(); @@ -1651,11 +1668,11 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { for (int i = 0; i < MESSAGE_COUNT; i++) { producer.publish(TOPIC, ("foo" + i).getBytes(StandardCharsets.UTF_8), 0, false); } - Wait.assertEquals((long) MESSAGE_COUNT, () -> getSubscriptionQueue(TOPIC).getMessagesAdded(), 2000, 100); + Wait.assertEquals((long) MESSAGE_COUNT, () -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessagesAdded(), 2000, 100); producer.disconnect(); producer.close(); - Wait.assertEquals(0L, () -> getSubscriptionQueue(TOPIC).getMessageCount(), 8000, 100); + Wait.assertEquals(0L, () -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessageCount(), 8000, 100); assertTrue(latch.await(8, TimeUnit.SECONDS)); assertTrue(succeeded.get()); consumer.disconnect(); @@ -1675,6 +1692,7 @@ public void testPacketDelayReceiveMaximum() throws Exception { final int MESSAGE_COUNT = 2; final int RECEIVE_MAXIMUM = 1; final String TOPIC = this.getTopicName(); + final String CONSUMER_ID = "consumer"; final AtomicBoolean messageArrived = new AtomicBoolean(false); MQTTInterceptor outgoingInterceptor = (packet, connection) -> { @@ -1686,7 +1704,7 @@ public void testPacketDelayReceiveMaximum() throws Exception { server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor); final CountDownLatch latch = new CountDownLatch(1); - MqttClient consumer = createPahoClient("consumer"); + MqttClient consumer = createPahoClient(CONSUMER_ID); MqttConnectionOptions options = new MqttConnectionOptions(); options.setReceiveMaximum(RECEIVE_MAXIMUM); options.setKeepAliveInterval(2); @@ -1705,7 +1723,7 @@ public void messageArrived(String topic, MqttMessage message) throws Exception { for (int i = 0; i < MESSAGE_COUNT; i++) { producer.publish(TOPIC, "foo".getBytes(StandardCharsets.UTF_8), 2, false); } - Wait.assertEquals((long) MESSAGE_COUNT, () -> getSubscriptionQueue(TOPIC).getMessagesAdded(), 2000, 100); + Wait.assertEquals((long) MESSAGE_COUNT, () -> getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessagesAdded(), 2000, 100); producer.disconnect(); producer.close(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java index a00c30a9f14..e2e69612505 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java @@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes; +import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil; import org.apache.activemq.artemis.core.security.CheckType; import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport; import org.apache.activemq.artemis.tests.util.RandomUtil; @@ -76,6 +77,7 @@ public void testCreateAddressAuthorizationFailure() throws Exception { @Test(timeout = DEFAULT_TIMEOUT) public void testSendAuthorizationFailure() throws Exception { final String CLIENT_ID = "publisher"; + final String TOPIC = "/foo"; final CountDownLatch latch = new CountDownLatch(1); MqttConnectionOptions options = new MqttConnectionOptionsBuilder() .username(createAddressUser) @@ -91,7 +93,7 @@ public void testSendAuthorizationFailure() throws Exception { }); try { - client.publish("/foo", new byte[0], 2, false); + client.publish(TOPIC, new byte[0], 2, false); fail("Publishing should have failed with a security problem"); } catch (MqttException e) { assertEquals(MQTTReasonCodes.NOT_AUTHORIZED, (byte) e.getReasonCode()); @@ -103,7 +105,7 @@ public void testSendAuthorizationFailure() throws Exception { assertFalse(client.isConnected()); - Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString(".foo")) != null, 2000, 100); + Wait.assertTrue(() -> server.getAddressInfo(SimpleString.toSimpleString(MQTTUtil.getCoreAddressFromMqttTopic(TOPIC, server.getConfiguration().getWildcardConfiguration()))) != null, 2000, 100); } @Test(timeout = DEFAULT_TIMEOUT) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTestsWithSecurity.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTestsWithSecurity.java index 9eaad18e9b0..14d7de37b0a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTestsWithSecurity.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTestsWithSecurity.java @@ -92,17 +92,17 @@ public void testAuthorizationSuccess() throws Exception { @Test(timeout = DEFAULT_TIMEOUT) public void testSubscriptionQueueRemoved() throws Exception { - final String CLIENT_ID = "consumer"; + final String CONSUMER_ID = "consumer"; MqttConnectionOptions options = new MqttConnectionOptionsBuilder() .username(noDeleteUser) .password(noDeletePass.getBytes(StandardCharsets.UTF_8)) .build(); - MqttClient client = createPahoClient(CLIENT_ID); + MqttClient client = createPahoClient(CONSUMER_ID); client.connect(options); client.subscribe(getTopicName(), 0).waitForCompletion(); client.disconnect(); - Wait.assertTrue(() -> getSubscriptionQueue(getTopicName()) == null, 2000, 100); + Wait.assertTrue(() -> getSubscriptionQueue(getTopicName(), CONSUMER_ID) == null, 2000, 100); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/ssl/CertificateAuthenticationSslTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/ssl/CertificateAuthenticationSslTests.java index d1514b0076a..2f06512d2d8 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/ssl/CertificateAuthenticationSslTests.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/ssl/CertificateAuthenticationSslTests.java @@ -92,11 +92,12 @@ protected void configureBrokerSecurity(ActiveMQServer server) { */ @Test(timeout = DEFAULT_TIMEOUT) public void testSimpleSendReceive() throws Exception { - String topic = RandomUtil.randomString(); + final String topic = RandomUtil.randomString(); + final String clientId = "subscriber"; byte[] body = RandomUtil.randomBytes(32); CountDownLatch latch = new CountDownLatch(1); - MqttClient subscriber = createPahoClient(protocol,"subscriber"); + MqttClient subscriber = createPahoClient(protocol, clientId); subscriber.connect(getSslMqttConnectOptions()); subscriber.setCallback(new DefaultMqttCallback() { @Override @@ -107,8 +108,8 @@ public void messageArrived(String topic, MqttMessage message) { }); subscriber.subscribe(topic, AT_LEAST_ONCE); - Wait.assertTrue(() -> getSubscriptionQueue(topic) != null, 2000, 100); - Wait.assertEquals(1, () -> getSubscriptionQueue(topic).getConsumerCount(), 2000, 100); + Wait.assertTrue(() -> getSubscriptionQueue(topic, clientId) != null, 2000, 100); + Wait.assertEquals(1, () -> getSubscriptionQueue(topic, clientId).getConsumerCount(), 2000, 100); MqttClient producer = createPahoClient(protocol,"producer"); producer.connect(getSslMqttConnectOptions());