Skip to content

Commit

Permalink
ARTEMIS-4370 update existing topic alias for MQTT 5 publisher
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram committed Jul 24, 2023
1 parent 04f29e0 commit 22e3b09
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,27 +189,31 @@ void sendToQueue(MqttPublishMessage message, boolean internal) throws Exception
String topic = message.variableHeader().topicName();
if (session.getVersion() == MQTTVersion.MQTT_5) {
Integer alias = MQTTUtil.getProperty(Integer.class, message.variableHeader().properties(), TOPIC_ALIAS);
Integer topicAliasMax = session.getProtocolManager().getTopicAliasMaximum();
if (alias != null) {
Integer topicAliasMax = session.getProtocolManager().getTopicAliasMaximum();
if (alias == 0) {
// [MQTT-3.3.2-8]
throw new DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
} else if (topicAliasMax != null && alias > topicAliasMax) {
// [MQTT-3.3.2-9]
throw new DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
} else {
topic = session.getState().getClientTopicAlias(alias);
if (topic == null) {
topic = message.variableHeader().topicName();
if (topic == null || topic.length() == 0) {
// using a topic alias with no matching topic in the state; potentially [MQTT-3.3.2-7]
throw new DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
}
session.getState().addClientTopicAlias(alias, topic);
}

String existingTopicMapping = session.getState().getClientTopicAlias(alias);
if (existingTopicMapping == null) {
if (topic == null || topic.length() == 0) {
// using a topic alias with no matching topic in the state; potentially [MQTT-3.3.2-7]
throw new DisconnectException(MQTTReasonCodes.TOPIC_ALIAS_INVALID);
}
logger.debug("Adding new alias {} for topic {}", alias, topic);
session.getState().putClientTopicAlias(alias, topic);
} else if (topic != null && topic.length() > 0) {
logger.debug("Modifying existing alias {}. New value: {}; old value: {}", alias, topic, existingTopicMapping);
session.getState().putClientTopicAlias(alias, topic);
} else {
logger.debug("Applying topic {} for alias {}", existingTopicMapping, alias);
topic = existingTopicMapping;
}
} else {
topic = message.variableHeader().topicName();
}
}
String coreAddress = MQTTUtil.convertMqttTopicFilterToCoreAddress(topic, session.getWildcardConfiguration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ public void setClientMaxPacketSize(int clientMaxPacketSize) {
this.clientMaxPacketSize = clientMaxPacketSize;
}

public void addClientTopicAlias(Integer alias, String topicName) {
public void putClientTopicAlias(Integer alias, String topicName) {
if (clientTopicAliases == null) {
clientTopicAliases = new HashMap<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1020,6 +1020,71 @@ public void messageArrived(String topic, MqttMessage message) throws Exception {
consumer.disconnect();
}

/*
* From section 3.3.2.3.4 of the MQTT 5 specification:
*
* A sender can modify the Topic Alias mapping by sending another PUBLISH in the same Network Connection with the
* same Topic Alias value and a different non-zero length Topic Name.
*/
@Test(timeout = DEFAULT_TIMEOUT)
public void testModifiedTopicAlias() throws Exception {
final String TOPIC_1 = this.getTopicName() + "1";
final String TOPIC_2 = this.getTopicName() + "2";

MqttClient consumer1 = createPahoClient("consumer1");
CountDownLatch latch1 = new CountDownLatch(1);
consumer1.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String payload = new String(message.getPayload());
if (payload.equals("first")) {
latch1.countDown();
}
}
});
consumer1.connect();
consumer1.subscribe(TOPIC_1, 1);

MqttClient consumer2 = createPahoClient("consumer2");
CountDownLatch latch2 = new CountDownLatch(1);
consumer2.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String payload = new String(message.getPayload());
if (payload.equals("second")) {
latch2.countDown();
}
}
});
consumer2.connect();
consumer2.subscribe(TOPIC_2, 1);

MqttClient producer = createPahoClient("producer");
producer.connect();

MqttProperties properties = new MqttProperties();
properties.setTopicAlias(1);
MqttMessage m = new MqttMessage();
m.setProperties(properties);
m.setQos(1);
m.setRetained(false);
m.setPayload("first".getBytes(StandardCharsets.UTF_8));
producer.publish(TOPIC_1, m);
m.setPayload("second".getBytes(StandardCharsets.UTF_8));
producer.publish(TOPIC_2, m);

producer.disconnect();
producer.close();

assertTrue(latch1.await(2, TimeUnit.SECONDS));
assertTrue(latch2.await(2, TimeUnit.SECONDS));

consumer1.disconnect();
consumer1.close();
consumer2.disconnect();
consumer2.close();
}

/*
* [MQTT-3.3.2-15] The Server MUST send the Response Topic unaltered to all subscribers receiving the Application
* Message.
Expand Down

0 comments on commit 22e3b09

Please sign in to comment.