Skip to content
Permalink
Browse files
This closes #4180
  • Loading branch information
jbertram committed Aug 17, 2022
2 parents aa11a82 + 682f505 commit 05b3879cba9bb295a1ddcde7e069ae04f1c8383e
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 5 deletions.
@@ -80,4 +80,8 @@ public interface MQTTLogger extends BasicLogger {
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 834006, value = "Failed to publish MQTT message: {0}.", format = Message.Format.MESSAGE_FORMAT)
void failedToPublishMqttMessage(String exceptionMessage, @Cause Throwable t);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 834007, value = "Authorization failure sending will message: {0}", format = Message.Format.MESSAGE_FORMAT)
void authorizationFailureSendingWillMessage(String message);
}
@@ -214,11 +214,7 @@ void sendToQueue(MqttPublishMessage message, boolean internal) throws Exception

Transaction tx = session.getServerSession().newTransaction();
try {
if (internal) {
session.getServer().getPostOffice().route(serverMessage, tx, true);
} else {
session.getServerSession().send(tx, serverMessage, true, false);
}
session.getServerSession().send(tx, serverMessage, true, false);

if (message.fixedHeader().isRetain()) {
ByteBuf payload = message.payload();
@@ -228,6 +224,9 @@ void sendToQueue(MqttPublishMessage message, boolean internal) throws Exception
tx.commit();
} catch (ActiveMQSecurityException e) {
tx.rollback();
if (internal) {
throw e;
}
if (session.getVersion() == MQTTVersion.MQTT_5) {
sendMessageAck(internal, qos, messageId, MQTTReasonCodes.NOT_AUTHORIZED);
return;
@@ -25,6 +25,7 @@
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -277,6 +278,8 @@ public void sendWillMessage() {
getMqttPublishManager().sendToQueue(publishMessage, true);
state.setWillSent(true);
state.setWillMessage(null);
} catch (ActiveMQSecurityException e) {
MQTTLogger.LOGGER.authorizationFailureSendingWillMessage(e.getMessage());
} catch (Exception e) {
MQTTLogger.LOGGER.errorSendingWillMessage(e);
}
@@ -18,13 +18,17 @@
package org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
import org.eclipse.paho.mqttv5.common.MqttException;
import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.jboss.logging.Logger;
import org.junit.Test;

@@ -83,4 +87,47 @@ public void testAuthorizationSuccess() throws Exception {

client.isConnected();
}

@Test(timeout = DEFAULT_TIMEOUT)
public void testWillAuthorizationSuccess() throws Exception {
internalTestWillAuthorization(fullUser, fullPass, true);
}

@Test(timeout = DEFAULT_TIMEOUT)
public void testWillAuthorizationFailure() throws Exception {
internalTestWillAuthorization(noprivUser, noprivPass, false);
}

private void internalTestWillAuthorization(String username, String password, boolean succeed) throws Exception {
final byte[] WILL = RandomUtil.randomBytes();
final String TOPIC = RandomUtil.randomString();

// consumer of the will message
MqttClient client1 = createPahoClient("willConsumer");
CountDownLatch latch = new CountDownLatch(1);
client1.setCallback(new DefaultMqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) {
latch.countDown();
}
});
MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
.username(fullUser)
.password(fullPass.getBytes(StandardCharsets.UTF_8))
.build();
client1.connect(options);
client1.subscribe(TOPIC, 1);

// consumer to generate the will
MqttClient client2 = createPahoClient("willGenerator");
options = new MqttConnectionOptionsBuilder()
.username(username)
.password(password.getBytes(StandardCharsets.UTF_8))
.will(TOPIC, new MqttMessage(WILL))
.build();
client2.connect(options);
client2.disconnectForcibly(0, 0, false);

assertEquals(succeed, latch.await(2, TimeUnit.SECONDS));
}
}

0 comments on commit 05b3879

Please sign in to comment.