Skip to content

Commit

Permalink
ARTEMIS-2686 Fix MQTT connect message rejection
Browse files Browse the repository at this point in the history
Initialize the session state with a default value to fix a NPE, when an incoming
MQTT interceptor rejects a MqttConnectMessage.
  • Loading branch information
brusdev authored and clebertsuconic committed Apr 8, 2020
1 parent a4f0e76 commit c361704
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 0 deletions.
Expand Up @@ -77,6 +77,8 @@ public MQTTSession(MQTTProtocolHandler protocolHandler,
subscriptionManager = new MQTTSubscriptionManager(this);
retainMessageManager = new MQTTRetainMessageManager(this);

state = MQTTSessionState.DEFAULT;

log.debug("SESSION CREATED: " + id);
}

Expand Down
Expand Up @@ -32,6 +32,8 @@

public class MQTTSessionState {

public static final MQTTSessionState DEFAULT = new MQTTSessionState(null);

private String clientId;

private final ConcurrentMap<String, MqttTopicSubscription> subscriptions = new ConcurrentHashMap<>();
Expand Down
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.activemq.artemis.tests.integration.mqtt.imported;

import java.util.concurrent.CountDownLatch;

import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import org.apache.activemq.artemis.api.core.ActiveMQException;
Expand Down Expand Up @@ -61,4 +64,40 @@ public boolean intercept(MqttMessage packet, RemotingConnection connection) thro
subscribeProvider.disconnect();
publishProvider.disconnect();
}

@Test(timeout = 60000)
public void testRejectedMqttConnectMessage() throws Exception {
CountDownLatch publishThreadReady = new CountDownLatch(1);

server.getRemotingService().addIncomingInterceptor((MQTTInterceptor) (packet, connection) -> {
if (packet.getClass() == MqttConnectMessage.class) {
return false;
} else {
return true;
}
});

Thread publishThread = new Thread(() -> {
MQTTClientProvider publishProvider = getMQTTClientProvider();

publishThreadReady.countDown();

try {
initializeConnection(publishProvider);
publishProvider.disconnect();
fail("The connection should be rejected!");
} catch (Exception ignore) {
}
});

publishThread.start();

publishThreadReady.await();

publishThread.join(3000);

if (publishThread.isAlive()) {
fail("The connection is stuck!");
}
}
}

0 comments on commit c361704

Please sign in to comment.