Skip to content

Commit

Permalink
ARTEMIS-4532 MQTT-to-core wildcard conversion is broken
Browse files Browse the repository at this point in the history
Currently when an MQTT topic filter contains characters from the
configured wildcard syntax the conversion to/from this syntax breaks.

For example, when using the default wildcard syntax if an MQTT topic
filter contains a . the conversion from the MQTT wildcard syntax to the
core wildcard syntax and back will result in the `.` being replaced with
a `/.`.

This commit fixes that plus a few other things...

 - Implements proper conversions to/from one WildcardConfiguration to
   another.
 - Refactors the MQTT code which invokes these conversion methods. This
   includes simplifying a lot of test code.
 - Adds lots of tests for everything.
 - Clarifies some variable naming to better distinguish between core and
   MQTT.
  • Loading branch information
jbertram authored and gemmellr committed Jan 29, 2024
1 parent ddac006 commit 513b782
Show file tree
Hide file tree
Showing 25 changed files with 641 additions and 240 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ void sendToQueue(MqttPublishMessage message, boolean internal) throws Exception
}
}
}
String coreAddress = MQTTUtil.convertMqttTopicFilterToCore(topic, session.getWildcardConfiguration());
String coreAddress = MQTTUtil.getCoreAddressFromMqttTopic(topic, session.getWildcardConfiguration());
SimpleString address = SimpleString.toSimpleString(coreAddress, session.getCoreMessageObjectPools().getAddressStringSimpleStringPool());
Message serverMessage = MQTTUtil.createServerMessageFromByteBuf(session, address, message);
int qos = message.fixedHeader().qosLevel().value();
Expand Down Expand Up @@ -392,7 +392,7 @@ void handlePubAck(int messageId) throws Exception {
}

private boolean publishToClient(int messageId, ICoreMessage message, int deliveryCount, int qos, long consumerId) throws Exception {
String address = MQTTUtil.convertCoreAddressToMqttTopicFilter(message.getAddress() == null ? "" : message.getAddress(), session.getWildcardConfiguration());
String topic = MQTTUtil.getMqttTopicFromCoreAddress(message.getAddress() == null ? "" : message.getAddress(), session.getWildcardConfiguration());

ByteBuf payload;
switch (message.getType()) {
Expand All @@ -418,29 +418,29 @@ private boolean publishToClient(int messageId, ICoreMessage message, int deliver

if (session.getVersion() == MQTTVersion.MQTT_5) {
if (!isRetain && message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY)) {
MqttTopicSubscription sub = session.getState().getSubscription(message.getAddress());
MqttTopicSubscription sub = session.getState().getSubscription(topic);
if (sub != null && sub.option().isRetainAsPublished()) {
isRetain = true;
}
}

if (session.getState().getClientTopicAliasMaximum() != null) {
Integer alias = session.getState().getServerTopicAlias(address);
Integer alias = session.getState().getServerTopicAlias(topic);
if (alias == null) {
alias = session.getState().addServerTopicAlias(address);
alias = session.getState().addServerTopicAlias(topic);
if (alias != null) {
mqttProperties.add(new MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), alias));
}
} else {
mqttProperties.add(new MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), alias));
address = "";
topic = "";
}
}
}

int remainingLength = MQTTUtil.calculateRemainingLength(address, mqttProperties, payload);
int remainingLength = MQTTUtil.calculateRemainingLength(topic, mqttProperties, payload);
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qos), isRetain, remainingLength);
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(address, messageId, mqttProperties);
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic, messageId, mqttProperties);
MqttPublishMessage publish = new MqttPublishMessage(header, varHeader, payload);

int maxSize = session.getState().getClientMaxPacketSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public MQTTRetainMessageManager(MQTTSession session) {
* the retained queue and the previous retain message consumed to remove it from the queue.
*/
void handleRetainedMessage(Message messageParameter, String address, boolean reset, Transaction tx) throws Exception {
SimpleString retainAddress = new SimpleString(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, address, session.getWildcardConfiguration()));
String retainAddress = MQTTUtil.getCoreRetainAddressFromMqttTopic(address, session.getWildcardConfiguration());

Queue queue = session.getServer().locateQueue(retainAddress);
if (queue == null) {
Expand All @@ -65,7 +65,7 @@ void handleRetainedMessage(Message messageParameter, String address, boolean res

void addRetainedMessagesToQueue(Queue queue, String address) throws Exception {
// The address filter that matches all retained message queues.
String retainAddress = MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, address, session.getWildcardConfiguration());
String retainAddress = MQTTUtil.getCoreRetainAddressFromMqttTopic(address, session.getWildcardConfiguration());
BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(new SimpleString(retainAddress));

// Iterate over all matching retain queues and add the queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public Collection<Pair<MqttTopicSubscription, Integer>> getSubscriptionsPlusID()
public boolean addSubscription(MqttTopicSubscription subscription, WildcardConfiguration wildcardConfiguration, Integer subscriptionIdentifier) throws Exception {
// synchronized to prevent race with removeSubscription
synchronized (subscriptions) {
addressMessageMap.putIfAbsent(MQTTUtil.convertMqttTopicFilterToCore(subscription.topicName(), wildcardConfiguration).toString(), new ConcurrentHashMap<>());
addressMessageMap.putIfAbsent(MQTTUtil.getCoreAddressFromMqttTopic(subscription.topicName(), wildcardConfiguration), new ConcurrentHashMap<>());

Pair<MqttTopicSubscription, Integer> existingSubscription = subscriptions.get(subscription.topicName());
if (existingSubscription != null) {
Expand Down Expand Up @@ -237,7 +237,7 @@ public Pair<MqttTopicSubscription, Integer> getSubscriptionPlusID(String address
}

public List<Integer> getMatchingSubscriptionIdentifiers(String address) {
address = MQTTUtil.convertCoreAddressToMqttTopicFilter(address, session.getServer().getConfiguration().getWildcardConfiguration());
address = MQTTUtil.getMqttTopicFromCoreAddress(address, session.getServer().getConfiguration().getWildcardConfiguration());
List<Integer> result = null;
for (Pair<MqttTopicSubscription, Integer> pair : subscriptions.values()) {
Pattern pattern = Match.createPattern(pair.getA().topicName(), MQTTUtil.MQTT_WILDCARD, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.FilterConstants;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
Expand All @@ -40,7 +39,6 @@
import org.apache.activemq.artemis.utils.CompositeAddress;

import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.DOLLAR;
import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.SLASH;
import static org.apache.activemq.artemis.reader.MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING;

public class MQTTSubscriptionManager {
Expand Down Expand Up @@ -106,11 +104,10 @@ synchronized void start() throws Exception {
private void addSubscription(MqttTopicSubscription subscription, Integer subscriptionIdentifier, boolean initialStart) throws Exception {
String rawTopicName = CompositeAddress.extractAddressName(subscription.topicName());
String parsedTopicName = MQTTUtil.decomposeSharedSubscriptionTopicFilter(rawTopicName).getB();
int qos = subscription.qualityOfService().value();
String coreAddress = MQTTUtil.convertMqttTopicFilterToCore(parsedTopicName, session.getWildcardConfiguration());
String coreQueue = getQueueNameForTopic(rawTopicName).toString();

Queue q = createQueueForSubscription(coreAddress, coreQueue);
Queue q = createQueueForSubscription(rawTopicName, parsedTopicName);

int qos = subscription.qualityOfService().value();

try {
if (initialStart) {
Expand Down Expand Up @@ -140,16 +137,6 @@ private void addSubscription(MqttTopicSubscription subscription, Integer subscri
}
}

private String parseTopicName(String rawTopicName) {
String parsedTopicName = rawTopicName;

// if using a shared subscription then parse
if (rawTopicName.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) {
parsedTopicName = rawTopicName.substring(rawTopicName.indexOf(SLASH, rawTopicName.indexOf(SLASH) + 1) + 1);
}
return parsedTopicName;
}

synchronized void stop() throws Exception {
for (ServerConsumer consumer : consumers.values()) {
consumer.setStarted(false);
Expand All @@ -159,13 +146,16 @@ synchronized void stop() throws Exception {
}
}

private Queue createQueueForSubscription(String address, String queueName) throws Exception {
private Queue createQueueForSubscription(String rawTopicName, String parsedTopicName) throws Exception {
String coreAddress = MQTTUtil.getCoreAddressFromMqttTopic(parsedTopicName, session.getWildcardConfiguration());
String coreQueue = MQTTUtil.getCoreQueueFromMqttTopic(rawTopicName, session.getState().getClientId(), session.getWildcardConfiguration());

// check to see if a subscription queue already exists.
Queue q = session.getServer().locateQueue(queueName);
Queue q = session.getServer().locateQueue(coreQueue);

// The queue does not exist so we need to create it.
if (q == null) {
SimpleString sAddress = SimpleString.toSimpleString(address);
SimpleString sAddress = SimpleString.toSimpleString(coreAddress);

// Check we can auto create queues.
BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(sAddress);
Expand All @@ -182,7 +172,7 @@ private Queue createQueueForSubscription(String address, String queueName) throw
addressInfo = session.getServerSession().createAddress(sAddress,
RoutingType.MULTICAST, true);
}
return findOrCreateQueue(bindingQueryResult, addressInfo, queueName);
return findOrCreateQueue(bindingQueryResult, addressInfo, coreQueue);
}
return q;
}
Expand Down Expand Up @@ -233,13 +223,13 @@ private SimpleString getMessageFilter(SimpleString addressName) {
}
}

private void createConsumerForSubscriptionQueue(Queue queue, String topic, int qos, boolean noLocal, Long existingConsumerId) throws Exception {
private void createConsumerForSubscriptionQueue(Queue queue, String topicFilter, int qos, boolean noLocal, Long existingConsumerId) throws Exception {
long cid = existingConsumerId != null ? existingConsumerId : session.getServer().getStorageManager().generateID();

// for noLocal support we use the MQTT *client id* rather than the connection ID, but we still use the existing property name
ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue.getName(), noLocal ? SimpleString.toSimpleString(CONNECTION_ID_PROPERTY_NAME_STRING + " <> '" + session.getState().getClientId() + "'") : null, false, false, -1);

ServerConsumer existingConsumer = consumers.put(MQTTUtil.decomposeSharedSubscriptionTopicFilter(topic).getB(), consumer);
ServerConsumer existingConsumer = consumers.put(topicFilter, consumer);
if (existingConsumer != null) {
existingConsumer.setStarted(false);
existingConsumer.close(false);
Expand All @@ -257,22 +247,22 @@ short[] removeSubscriptions(List<String> topics, boolean enforceSecurity) throws
synchronized (state) {
reasonCodes = new short[topics.size()];
for (int i = 0; i < topics.size(); i++) {
if (session.getState().getSubscription(topics.get(i)) == null) {
if (state.getSubscription(topics.get(i)) == null) {
reasonCodes[i] = MQTTReasonCodes.NO_SUBSCRIPTION_EXISTED;
continue;
}

short reasonCode = MQTTReasonCodes.SUCCESS;

try {
session.getState().removeSubscription(topics.get(i));
state.removeSubscription(topics.get(i));
ServerConsumer removed = consumers.remove(MQTTUtil.decomposeSharedSubscriptionTopicFilter(topics.get(i)).getB());
if (removed != null) {
removed.close(false);
consumerQoSLevels.remove(removed.getID());
}

SimpleString internalQueueName = SimpleString.toSimpleString(getQueueNameForTopic(topics.get(i)));
SimpleString internalQueueName = SimpleString.toSimpleString(MQTTUtil.getCoreQueueFromMqttTopic(topics.get(i), state.getClientId(), session.getServer().getConfiguration().getWildcardConfiguration()));
Queue queue = session.getServer().locateQueue(internalQueueName);
if (queue != null) {
if (queue.isConfigurationManaged()) {
Expand All @@ -296,17 +286,6 @@ short[] removeSubscriptions(List<String> topics, boolean enforceSecurity) throws
return reasonCodes;
}

private String getQueueNameForTopic(String topic) {
String queueName;
if (MQTTUtil.isSharedSubscription(topic)) {
Pair<String, String> decomposed = MQTTUtil.decomposeSharedSubscriptionTopicFilter(topic);
queueName = decomposed.getA().concat(".").concat(decomposed.getB());
} else {
queueName = session.getState().getClientId().concat(".").concat(topic);
}
return MQTTUtil.convertMqttTopicFilterToCore(queueName, session.getWildcardConfiguration());
}

/**
* As per MQTT Spec. Subscribes this client to a number of MQTT topics.
*
Expand Down
Loading

0 comments on commit 513b782

Please sign in to comment.