Skip to content
Permalink
Browse files
ARTEMIS-3777 fix MQTT request/response + nolocal
Removing the connection ID property from the actual *message* breaks the
nolocal functionality. Removing the property isn't necessary in the
first place so this commit reomves that code.
  • Loading branch information
jbertram authored and clebertsuconic committed Apr 18, 2022
1 parent 1e13979 commit 7d11cf81ba0c89934eb48ec2653a5ee79e485e7e
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 4 deletions.
@@ -45,7 +45,6 @@
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.jboss.logging.Logger;

import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE;
@@ -395,9 +394,6 @@ private boolean publishToClient(int messageId, ICoreMessage message, int deliver
isRetain = false;
}

// [MQTT-3.8.3-3] remove property used for no-local implementation
message.removeProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);

if (session.getState().getClientTopicAliasMaximum() != null) {
Integer alias = session.getState().getServerTopicAlias(address);
if (alias == null) {
@@ -17,6 +17,7 @@

package org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -34,6 +35,7 @@
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.eclipse.paho.mqttv5.common.MqttSubscription;
import org.eclipse.paho.mqttv5.common.packet.MqttProperties;
import org.eclipse.paho.mqttv5.common.packet.MqttSubAck;
import org.jboss.logging.Logger;
import org.junit.Test;
@@ -82,6 +84,75 @@ public void testSubscribeNoLocal() throws Exception {
client.close();
}

/*
* [MQTT-3.8.3-3]
*
* This test was adapted from Test.test_request_response in client_test5.py at https://github.com/eclipse/paho.mqtt.testing/tree/master/interoperability
*
* It involves 2 clients subscribing to and performing a request/response on the same topic so it's imperative they
* don't receive the messages that they send themselves.
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testRequestResponseNoLocal() throws Exception {
final String TOPIC = RandomUtil.randomString();
final String REQUEST = "request";
final String RESPONSE = "response";
final CountDownLatch aclientLatch = new CountDownLatch(2);
final CountDownLatch bclientLatch = new CountDownLatch(1);

MqttClient aclient = createPahoClient("aclientid");
aclient.connect();
aclient.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
assertEquals(RESPONSE, new String(message.getPayload()));
aclientLatch.countDown();
}
});

MqttClient bclient = createPahoClient("bclientid");
bclient.connect();
bclient.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
assertEquals(REQUEST, new String(message.getPayload()));
bclientLatch.countDown();
MqttMessage m = new MqttMessage();
m.setPayload(RESPONSE.getBytes(StandardCharsets.UTF_8));
m.setQos(1);
MqttProperties properties = new MqttProperties();
properties.setResponseTopic(TOPIC);
properties.setCorrelationData("334".getBytes(StandardCharsets.UTF_8));
m.setProperties(properties);
bclient.publish(TOPIC, m);
}
});

MqttSubscription sub = new MqttSubscription(TOPIC, 2);
sub.setNoLocal(true);
aclient.subscribe(new MqttSubscription[]{sub});
bclient.subscribe(new MqttSubscription[]{sub});

MqttMessage m = new MqttMessage();
m.setPayload(REQUEST.getBytes(StandardCharsets.UTF_8));
m.setQos(1);
MqttProperties properties = new MqttProperties();
properties.setResponseTopic(TOPIC);
properties.setCorrelationData("334".getBytes(StandardCharsets.UTF_8));
m.setProperties(properties);
aclient.publish(TOPIC, m);

assertTrue(bclientLatch.await(2, TimeUnit.SECONDS));

Wait.assertEquals(1L, () -> aclientLatch.getCount(), 2000, 100);
assertFalse(aclientLatch.await(2, TimeUnit.SECONDS));

aclient.disconnect();
aclient.close();
bclient.disconnect();
bclient.close();
}

/*
* [MQTT-3.8.4-1] When the Server receives a SUBSCRIBE packet from a Client, the Server MUST respond with a SUBACK packet.
*/

0 comments on commit 7d11cf8

Please sign in to comment.