From b21c04e7cf2881fedef711b7d798bdb59bf91a85 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Wed, 20 Jul 2016 13:24:09 +0100 Subject: [PATCH 1/3] ARTEMIS-335 Remove dead code from MQTTPublishManager --- .../core/protocol/mqtt/MQTTPublishManager.java | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java index fc61dd9fa9d..c985c0f3fde 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java @@ -90,16 +90,7 @@ boolean isManagementConsumer(ServerConsumer consumer) { } private int generateMqttId(int qos) { - if (qos == 1) { - return session.getSessionState().generateId(); - } - else { - Integer mqttid = session.getSessionState().generateId(); - if (mqttid == null) { - mqttid = (int) session.getServer().getStorageManager().generateID(); - } - return mqttid; - } + return session.getSessionState().generateId(); } /** From 69db92c82b53e3533fa35332bcdef568d9e31321 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Wed, 20 Jul 2016 13:21:53 +0100 Subject: [PATCH 2/3] ARTEMIS-641 Enable filtering on address in server consumer --- .../apache/activemq/artemis/api/core/FilterConstants.java | 6 ++++++ .../activemq/artemis/core/filter/impl/FilterImpl.java | 3 +++ 2 files changed, 9 insertions(+) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java index 0fbd35f6f85..27aa9b4889f 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/FilterConstants.java @@ -62,11 +62,17 @@ public final class FilterConstants { */ public static final SimpleString ACTIVEMQ_SIZE = new SimpleString("AMQSize"); + /** + * Name of the ActiveMQ Artemis Address header + */ + public static final SimpleString ACTIVEMQ_ADDRESS = new SimpleString("AMQAddress"); + /** * All ActiveMQ Artemis headers are prepended by this prefix. */ public static final SimpleString ACTIVEMQ_PREFIX = new SimpleString("AMQ"); + private FilterConstants() { // Utility class } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java index 77bf7c5eb8e..f807a183d12 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/filter/impl/FilterImpl.java @@ -169,6 +169,9 @@ else if (FilterConstants.ACTIVEMQ_EXPIRATION.equals(fieldName)) { else if (FilterConstants.ACTIVEMQ_SIZE.equals(fieldName)) { return msg.getEncodeSize(); } + else if (FilterConstants.ACTIVEMQ_ADDRESS.equals(fieldName)) { + return msg.getAddress(); + } else { return null; } From 24314874f41a41d63fa49825edcaf583df97daa5 Mon Sep 17 00:00:00 2001 From: Martyn Taylor Date: Wed, 20 Jul 2016 13:30:34 +0100 Subject: [PATCH 3/3] ARTEMIS-641 filter out management notifications in MQTT --- .../mqtt/MQTTSubscriptionManager.java | 20 +++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java index 954a1bd1f81..cbe64a6a533 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentMap; import io.netty.handler.codec.mqtt.MqttTopicSubscription; +import org.apache.activemq.artemis.api.core.FilterConstants; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.ServerConsumer; @@ -37,11 +38,27 @@ public class MQTTSubscriptionManager { private MQTTLogger log = MQTTLogger.LOGGER; + // We filter out Artemis managment messages and notifications + private SimpleString managementFilter; + public MQTTSubscriptionManager(MQTTSession session) { this.session = session; consumers = new ConcurrentHashMap<>(); consumerQoSLevels = new ConcurrentHashMap<>(); + + // Create filter string to ignore management messages + StringBuilder builder = new StringBuilder(); + builder.append("NOT (("); + builder.append(FilterConstants.ACTIVEMQ_ADDRESS); + builder.append(" = '"); + builder.append(session.getServer().getConfiguration().getManagementAddress()); + builder.append("') OR ("); + builder.append(FilterConstants.ACTIVEMQ_ADDRESS); + builder.append(" = '"); + builder.append(session.getServer().getConfiguration().getManagementNotificationAddress()); + builder.append("'))"); + managementFilter = new SimpleString(builder.toString()); } synchronized void start() throws Exception { @@ -85,8 +102,7 @@ private SimpleString createQueueForSubscription(String topic, int qos) throws Ex */ private void createConsumerForSubscriptionQueue(SimpleString queue, String topic, int qos) throws Exception { long cid = session.getServer().getStorageManager().generateID(); - - ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue, null, false, true, -1); + ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue, managementFilter, false, true, -1); consumer.setStarted(true); consumers.put(topic, consumer);