From 48b9573c66d428722fab172290865b986573f764 Mon Sep 17 00:00:00 2001 From: "Otavio R. Piske" Date: Mon, 14 Aug 2017 19:12:39 +0200 Subject: [PATCH] Implement support for intercepting additional MQTT control packets Previously, only the PUBLISH packet was intercepted. This patch modifies the code to add support for the other incoming/outgoing MQTT control packets. --- .../protocol/mqtt/MQTTProtocolHandler.java | 25 +++++++++++++++++++ .../mqtt/example/InterceptorExample.java | 6 ++--- .../mqtt/example/SimpleMQTTInterceptor.java | 9 +++++++ .../resources/activemq/server0/broker.xml | 4 +++ 4 files changed, 41 insertions(+), 3 deletions(-) diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java index 7c14403b3bf..0c0be01f148 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java @@ -172,6 +172,8 @@ void handleConnect(MqttConnectMessage connect, ChannelHandlerContext ctx) throws String clientId = connect.payload().clientIdentifier(); session.getConnectionManager().connect(clientId, connect.payload().userName(), connect.payload().passwordInBytes(), connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession()); + + this.protocolManager.invokeIncoming(connect, this.connection); } void disconnect(boolean error) { @@ -183,6 +185,7 @@ void sendConnack(MqttConnectReturnCode returnCode) { MqttConnAckVariableHeader varHeader = new MqttConnAckVariableHeader(returnCode, true); MqttConnAckMessage message = new MqttConnAckMessage(fixedHeader, varHeader); + this.protocolManager.invokeOutgoing(message, this.connection); ctx.write(message); ctx.flush(); } @@ -225,30 +228,43 @@ void sendPublishProtocolControlMessage(int messageId, MqttMessageType messageTyp MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, qos, // Spec requires 01 in header for rel false, 0); MqttPubAckMessage rel = new MqttPubAckMessage(fixedHeader, MqttMessageIdVariableHeader.from(messageId)); + + this.protocolManager.invokeOutgoing(rel, this.connection); + ctx.write(rel); ctx.flush(); } void handlePuback(MqttPubAckMessage message) throws Exception { + this.protocolManager.invokeIncoming(message, this.connection); + session.getMqttPublishManager().handlePubAck(message.variableHeader().messageId()); } void handlePubrec(MqttMessage message) throws Exception { + this.protocolManager.invokeIncoming(message, this.connection); + int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); session.getMqttPublishManager().handlePubRec(messageId); } void handlePubrel(MqttMessage message) { + this.protocolManager.invokeIncoming(message, this.connection); + int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); session.getMqttPublishManager().handlePubRel(messageId); } void handlePubcomp(MqttMessage message) throws Exception { + this.protocolManager.invokeIncoming(message, this.connection); + int messageId = ((MqttMessageIdVariableHeader) message.variableHeader()).messageId(); session.getMqttPublishManager().handlePubComp(messageId); } void handleSubscribe(MqttSubscribeMessage message, ChannelHandlerContext ctx) throws Exception { + this.protocolManager.invokeIncoming(message, this.connection); + MQTTSubscriptionManager subscriptionManager = session.getSubscriptionManager(); int[] qos = subscriptionManager.addSubscriptions(message.payload().topicSubscriptions()); @@ -264,6 +280,8 @@ void handleSuback(MqttSubAckMessage message) { } void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception { + this.protocolManager.invokeIncoming(message, this.connection); + session.getSubscriptionManager().removeSubscriptions(message.payload().topics()); MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0); MqttUnsubAckMessage m = new MqttUnsubAckMessage(header, message.variableHeader()); @@ -273,10 +291,14 @@ void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception { } void handleUnsuback(MqttUnsubAckMessage message) { + this.protocolManager.invokeOutgoing(message, this.connection); + disconnect(true); } void handlePingreq(MqttMessage message, ChannelHandlerContext ctx) { + this.protocolManager.invokeIncoming(message, this.connection); + MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0)); MQTTUtil.logMessage(session.getSessionState(), pingResp, false); ctx.write(pingResp); @@ -288,6 +310,8 @@ void handlePingresp(MqttMessage message) { } void handleDisconnect(MqttMessage message) { + this.protocolManager.invokeIncoming(message, this.connection); + disconnect(false); } @@ -296,6 +320,7 @@ protected int send(int messageId, String topicName, int qosLevel, boolean isReta MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, redelivery, MqttQoS.valueOf(qosLevel), isRetain, 0); MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topicName, messageId); MqttMessage publish = new MqttPublishMessage(header, varHeader, payload); + this.protocolManager.invokeOutgoing(publish, connection); MQTTUtil.logMessage(session.getSessionState(), publish, false); diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java index 59265530313..4fb5abf5300 100644 --- a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java +++ b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/InterceptorExample.java @@ -41,20 +41,20 @@ public static void main(final String[] args) throws Exception { System.out.println("Connected to Artemis"); // Subscribe to a topic - Topic[] topics = {new Topic("mqtt/example/interceptor", QoS.AT_LEAST_ONCE)}; + Topic[] topics = {new Topic("mqtt/example/interceptor", QoS.EXACTLY_ONCE)}; connection.subscribe(topics); System.out.println("Subscribed to topics."); // Publish message String payload1 = "This is message 1"; - connection.publish("mqtt/example/interceptor", payload1.getBytes(), QoS.AT_LEAST_ONCE, false); + connection.publish("mqtt/example/interceptor", payload1.getBytes(), QoS.EXACTLY_ONCE, false); System.out.println("Sent message"); // Receive the sent message Message message1 = connection.receive(5, TimeUnit.SECONDS); - + String messagePayload = new String(message1.getPayload(), StandardCharsets.UTF_8); System.out.println("Received message: " + messagePayload); diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java index 677328c73bd..1b7b482e224 100644 --- a/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java +++ b/examples/features/standard/interceptor-client-mqtt/src/main/java/org/apache/activemq/artemis/mqtt/example/SimpleMQTTInterceptor.java @@ -19,6 +19,7 @@ import java.nio.charset.Charset; import io.netty.handler.codec.mqtt.MqttPublishMessage; +import io.netty.handler.codec.mqtt.MqttConnectMessage; import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor; @@ -36,7 +37,9 @@ public class SimpleMQTTInterceptor implements MQTTInterceptor { public boolean intercept(final MqttMessage mqttMessage, RemotingConnection connection) { System.out.println("MQTT Interceptor gets called "); + System.out.println("A MQTT control packet was intercepted " + mqttMessage.fixedHeader().messageType()); + // If you need to handle an specific packet type: if (mqttMessage instanceof MqttPublishMessage) { MqttPublishMessage message = (MqttPublishMessage) mqttMessage; @@ -49,6 +52,12 @@ public boolean intercept(final MqttMessage mqttMessage, RemotingConnection conne message.payload().setBytes(0, modifiedMessage.getBytes()); } + else { + if (mqttMessage instanceof MqttConnectMessage) { + MqttConnectMessage connectMessage = (MqttConnectMessage) mqttMessage; + System.out.println("A MQTT CONNECT control packet was intercepted " + connectMessage); + } + } // We return true which means "call next interceptor" (if there is one) or target. diff --git a/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml b/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml index f93a4045e83..9318e0ca3d2 100644 --- a/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml +++ b/examples/features/standard/interceptor-client-mqtt/src/main/resources/activemq/server0/broker.xml @@ -39,6 +39,10 @@ under the License. org.apache.activemq.artemis.mqtt.example.SimpleMQTTInterceptor + + org.apache.activemq.artemis.mqtt.example.SimpleMQTTInterceptor + + ./data/bindings ./data/journal