Skip to content
Permalink
Browse files
ARTEMIS-3774 support user properties on MQTT will message
  • Loading branch information
jbertram authored and clebertsuconic committed Apr 13, 2022
1 parent 9d94348 commit a6abf68ba5826c1bd217ac1a26abe9811cf509ce
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 0 deletions.
@@ -18,6 +18,7 @@
package org.apache.activemq.artemis.core.protocol.mqtt;

import java.util.UUID;
import java.util.List;

import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
@@ -126,6 +127,10 @@ void connect(MqttConnectMessage connect, String validatedUser) throws Exception
if (willDelayInterval != null) {
session.getState().setWillDelayInterval(( int) willDelayInterval.value());
}
List<? extends MqttProperties.MqttProperty> userProperties = willProperties.getProperties(MqttProperties.MqttPropertyType.USER_PROPERTY.value());
if (userProperties != null) {
session.getState().setWillUserProperties(userProperties);
}
}
}
}
@@ -20,6 +20,7 @@
import java.util.UUID;

import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
@@ -252,12 +253,22 @@ public void setUsingServerKeepAlive(boolean usingServerKeepAlive) {

public void sendWillMessage() {
try {
MqttProperties properties;
if (state.getWillUserProperties() == null) {
properties = MqttProperties.NO_PROPERTIES;
} else {
properties = new MqttProperties();
for (MqttProperties.MqttProperty userProperty : state.getWillUserProperties()) {
properties.add(userProperty);
}
}
MqttPublishMessage publishMessage = MqttMessageBuilders.publish()
.messageId(0)
.qos(MqttQoS.valueOf(state.getWillQoSLevel()))
.retained(state.isWillRetain())
.topicName(state.getWillTopic())
.payload(state.getWillMessage())
.properties(properties)
.build();
logger.debugf("%s sending will message: %s", this, publishMessage);
getMqttPublishManager().sendToQueue(publishMessage, true);
@@ -30,6 +30,7 @@
import java.util.regex.Pattern;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
@@ -75,6 +76,8 @@ public class MQTTSessionState {

private long willDelayInterval = 0;

private List<? extends MqttProperties.MqttProperty> willUserProperties;

private boolean willSent = false;

private boolean failed = false;
@@ -278,6 +281,14 @@ public void setWillDelayInterval(long willDelayInterval) {
this.willDelayInterval = willDelayInterval;
}

public void setWillUserProperties(List<? extends MqttProperties.MqttProperty> userProperties) {
this.willUserProperties = userProperties;
}

public List<? extends MqttProperties.MqttProperty> getWillUserProperties() {
return willUserProperties;
}

public boolean isWillSent() {
return willSent;
}
@@ -22,14 +22,20 @@
import javax.jms.Message;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.UserProperty;
import org.jboss.logging.Logger;
import org.junit.Assume;
import org.junit.Test;
@@ -110,4 +116,52 @@ public void messageArrived(String topic, MqttMessage message) throws Exception {
consumer.disconnect();
consumer.close();
}

/*
* There is no normative statement in the spec about supporting user properties on will messages, but it is implied
* in various places.
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testWillMessageProperties() throws Exception {
final byte[] WILL = RandomUtil.randomBytes();
final String[][] properties = new String[10][2];
for (String[] property : properties) {
property[0] = RandomUtil.randomString();
property[1] = RandomUtil.randomString();
}

// consumer of the will message
MqttClient client1 = createPahoClient("willConsumer");
CountDownLatch latch = new CountDownLatch(1);
client1.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) {
int i = 0;
for (UserProperty property : message.getProperties().getUserProperties()) {
assertEquals(properties[i][0], property.getKey());
assertEquals(properties[i][1], property.getValue());
i++;
}
latch.countDown();
}
});
client1.connect();
client1.subscribe("/topic/foo", 1);

// consumer to generate the will
MqttClient client2 = createPahoClient("willGenerator");
MqttProperties willMessageProperties = new MqttProperties();
List<UserProperty> userProperties = new ArrayList<>();
for (String[] property : properties) {
userProperties.add(new UserProperty(property[0], property[1]));
}
willMessageProperties.setUserProperties(userProperties);
MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
.will("/topic/foo", new MqttMessage(WILL))
.build();
options.setWillMessageProperties(willMessageProperties);
client2.connect(options);
client2.disconnectForcibly(0, 0, false);
assertTrue(latch.await(2, TimeUnit.SECONDS));
}
}

0 comments on commit a6abf68

Please sign in to comment.