Skip to content

Commit

Permalink
ARTEMIS-3638 Support MQTT 5
Browse files Browse the repository at this point in the history
MQTT 5 is an OASIS standard which debuted in March 2019. It boasts
numerous improvments over its predecessor (i.e. MQTT 3.1.1) which will
benefit users. These improvements are summarized in the specification
at:
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901293

The specification describes all the behavior necessary for a client or
server to conform. The spec is highlighted with special "normative"
conformance statements which distill the descriptions into concise
terms. The specification provides a helpful summary of all these
statements. See:
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901292

This commit implements all of the mandatory elements from the
specification and provides tests which are identified using the
corresponding normative conformance statement. All normative
conformance statements either have an explicit test or are noted in
comments with an explanation of why an explicit test doesn't exist. See
org.apache.activemq.artemis.tests.integration.mqtt5 for all those
details.

This commit also includes documentation about how to configure
everything related to the new MQTT 5 features.
  • Loading branch information
jbertram authored and clebertsuconic committed Feb 4, 2022
1 parent 605079d commit 8063110
Show file tree
Hide file tree
Showing 107 changed files with 9,486 additions and 605 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ public static void clearBuffer() {
}
}

// TODO look at replacing this with io.netty.buffer.ByteBufUtil.utf8Bytes(java.lang.CharSequence)
public static int calculateUTFSize(final String str) {
int calculatedLen = 0;
for (int i = 0, stringLength = str.length(); i < stringLength; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public static void assertTrue(String failureMessage, Condition condition, final
assertTrue(failureMessage, condition, duration, SLEEP_MILLIS);
}

public static void assertTrue(Condition condition, final long duration, final long sleep) throws Exception {
public static void assertTrue(Condition condition, final long duration, final long sleep) {
assertTrue(DEFAULT_FAILURE_MESSAGE, condition, duration, sleep);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,9 @@ public static String getDefaultHapolicyBackupStrategy() {
// Whether or not to report Netty pool metrics
private static final boolean DEFAULT_NETTY_POOL_METRICS = false;

// How often (in ms) to scan for expired MQTT sessions
private static long DEFAULT_MQTT_SESSION_SCAN_INTERVAL = 500;

/**
* If true then the ActiveMQ Artemis Server will make use of any Protocol Managers that are in available on the classpath. If false then only the core protocol will be available, unless in Embedded mode where users can inject their own Protocol Managers.
*/
Expand Down Expand Up @@ -1762,4 +1765,11 @@ public static int getDefaultBridgeConcurrency() {
public static Boolean getDefaultNettyPoolMetrics() {
return DEFAULT_NETTY_POOL_METRICS;
}

/**
* How often (in ms) to scan for expired MQTT sessions
*/
public static long getMqttSessionScanInterval() {
return DEFAULT_MQTT_SESSION_SCAN_INTERVAL;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,11 +252,6 @@ private ConnectionEntry internalConnectionEntry(Connection remotingConnection, b
return entry;
}

@Override
public void removeHandler(String name) {

}

@Override
public void handleBuffer(RemotingConnection connection, ActiveMQBuffer buffer) {
ActiveMQProtonRemotingConnection protonConnection = (ActiveMQProtonRemotingConnection) connection;
Expand Down
5 changes: 5 additions & 0 deletions artemis-protocols/artemis-mqtt-protocol/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.javafaker</groupId>
<artifactId>javafaker</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,13 @@ public class MQTTConnection implements RemotingConnection {
private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<>();

private final List<CloseListener> closeListeners = new CopyOnWriteArrayList<>();

private Subject subject;

private int receiveMaximum = -1;

private String protocolVersion;

public MQTTConnection(Connection transportConnection) throws Exception {
this.transportConnection = transportConnection;
this.creationTime = System.currentTimeMillis();
Expand Down Expand Up @@ -189,7 +194,6 @@ public Future asyncFail(ActiveMQException me) {

@Override
public void destroy() {
//TODO(mtaylor) ensure this properly destroys this connection.
destroyed = true;
disconnect(false);
}
Expand Down Expand Up @@ -282,7 +286,7 @@ public Subject getSubject() {
*/
@Override
public String getProtocolName() {
return MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME;
return MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME + (protocolVersion != null ? protocolVersion : "");
}

/**
Expand Down Expand Up @@ -310,4 +314,15 @@ public String getTransportLocalAddress() {
return getTransportConnection().getLocalAddress();
}

public int getReceiveMaximum() {
return receiveMaximum;
}

public void setReceiveMaximum(int maxReceive) {
this.receiveMaximum = maxReceive;
}

public void setProtocolVersion(String protocolVersion) {
this.protocolVersion = protocolVersion;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,105 +19,161 @@

import java.util.UUID;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttVersion;
import io.netty.util.CharsetUtil;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.impl.ServerSessionImpl;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.jboss.logging.Logger;

import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.ASSIGNED_CLIENT_IDENTIFIER;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.MAXIMUM_PACKET_SIZE;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.RECEIVE_MAXIMUM;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SERVER_KEEP_ALIVE;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.TOPIC_ALIAS_MAXIMUM;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.WILL_DELAY_INTERVAL;

/**
* MQTTConnectionManager is responsible for handle Connect and Disconnect packets and any resulting behaviour of these
* events.
*/
public class MQTTConnectionManager {

private MQTTSession session;

private MQTTLogger log = MQTTLogger.LOGGER;

private boolean isWill = false;

private ByteBuf willMessage;
private static final Logger logger = Logger.getLogger(MQTTConnectionManager.class);

private String willTopic;

private int willQoSLevel;

private boolean willRetain;
private MQTTSession session;

public MQTTConnectionManager(MQTTSession session) {
this.session = session;
MQTTFailureListener failureListener = new MQTTFailureListener(this);
session.getConnection().addFailureListener(failureListener);
}

/**
* Handles the connect packet. See spec for details on each of parameters.
*/
void connect(String cId,
String username,
String password,
boolean will,
byte[] willMessage,
String willTopic,
boolean willRetain,
int willQosLevel,
boolean cleanSession, String validatedUser) throws Exception {
String clientId = validateClientId(cId, cleanSession);
if (clientId == null) {
session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
session.getProtocolHandler().disconnect(true);
void connect(MqttConnectMessage connect, String validatedUser) throws Exception {
int packetVersion = connect.variableHeader().version();
if (packetVersion == MqttVersion.MQTT_5.protocolLevel()) {
session.set5(true);
session.getConnection().setProtocolVersion(Byte.toString(MqttVersion.MQTT_5.protocolLevel()));
String authenticationMethod = MQTTUtil.getProperty(String.class, connect.variableHeader().properties(), AUTHENTICATION_METHOD);

if (authenticationMethod != null) {
session.getProtocolHandler().sendConnack(MQTTReasonCodes.BAD_AUTHENTICATION_METHOD);
disconnect(true);
return;
}
}

String password = connect.payload().passwordInBytes() == null ? null : new String( connect.payload().passwordInBytes(), CharsetUtil.UTF_8);
String username = connect.payload().userName();

// the Netty codec uses "CleanSession" for both 3.1.1 "clean session" and 5 "clean start" which have slightly different semantics
boolean cleanStart = connect.variableHeader().isCleanSession();

Pair<String, Boolean> clientIdValidation = validateClientId(connect.payload().clientIdentifier(), cleanStart);
if (clientIdValidation == null) {
// this represents an invalid client ID for MQTT 5 clients
session.getProtocolHandler().sendConnack(MQTTReasonCodes.CLIENT_IDENTIFIER_NOT_VALID);
disconnect(true);
return;
} else if (clientIdValidation.getA() == null) {
// this represents an invalid client ID for MQTT 3.x clients
session.getProtocolHandler().sendConnack(MQTTReasonCodes.IDENTIFIER_REJECTED_3);
disconnect(true);
return;
}
String clientId = clientIdValidation.getA();
boolean assignedClientId = clientIdValidation.getB();

boolean sessionPresent = session.getProtocolManager().getSessionStates().containsKey(clientId);
MQTTSessionState sessionState = getSessionState(clientId);
synchronized (sessionState) {
session.setSessionState(sessionState);
session.getConnection().setClientID(clientId);
sessionState.setFailed(false);
ServerSessionImpl serverSession = createServerSession(username, password, validatedUser);
serverSession.start();
ServerSessionImpl internalServerSession = createServerSession(username, password, validatedUser);
internalServerSession.disableSecurity();
internalServerSession.start();
session.setServerSession(serverSession, internalServerSession);

if (cleanSession) {
if (cleanStart) {
/* [MQTT-3.1.2-6] If CleanSession is set to 1, the Client and Server MUST discard any previous Session and
* start a new one. This Session lasts as long as the Network Connection. State data associated with this Session
* MUST NOT be reused in any subsequent Session */
session.clean();
session.setClean(true);
}

if (will) {
isWill = true;
this.willMessage = ByteBufAllocator.DEFAULT.buffer(willMessage.length);
this.willMessage.writeBytes(willMessage);
this.willQoSLevel = willQosLevel;
this.willRetain = willRetain;
this.willTopic = willTopic;
if (connect.variableHeader().isWillFlag()) {
session.getState().setWill(true);
byte[] willMessage = connect.payload().willMessageInBytes();
session.getState().setWillMessage(ByteBufAllocator.DEFAULT.buffer(willMessage.length).writeBytes(willMessage));
session.getState().setWillQoSLevel(connect.variableHeader().willQos());
session.getState().setWillRetain(connect.variableHeader().isWillRetain());
session.getState().setWillTopic(connect.payload().willTopic());

if (session.is5()) {
MqttProperties willProperties = connect.payload().willProperties();
if (willProperties != null) {
MqttProperties.MqttProperty willDelayInterval = willProperties.getProperty(WILL_DELAY_INTERVAL.value());
if (willDelayInterval != null) {
session.getState().setWillDelayInterval(( int) willDelayInterval.value());
}
}
}
}

MqttProperties connackProperties;
if (session.is5()) {
session.getConnection().setReceiveMaximum(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), RECEIVE_MAXIMUM, -1));

sessionState.setClientSessionExpiryInterval(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), SESSION_EXPIRY_INTERVAL, 0));
sessionState.setClientMaxPacketSize(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), MAXIMUM_PACKET_SIZE, 0));
sessionState.setClientTopicAliasMaximum(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), TOPIC_ALIAS_MAXIMUM));

connackProperties = getConnackProperties(clientId, assignedClientId);
} else {
connackProperties = MqttProperties.NO_PROPERTIES;
}

session.getConnection().setConnected(true);
session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_ACCEPTED, sessionPresent && !cleanSession, MqttProperties.NO_PROPERTIES);
session.getProtocolHandler().sendConnack(MQTTReasonCodes.SUCCESS, sessionPresent && !cleanStart, connackProperties);
// ensure we don't publish before the CONNACK
session.start();
}
}

/**
* Creates an internal Server Session.
*
* @param username
* @param password
* @return
* @throws Exception
*/
private MqttProperties getConnackProperties(String clientId, boolean assignedClientId) {
MqttProperties connackProperties = new MqttProperties();

if (assignedClientId) {
connackProperties.add(new MqttProperties.StringProperty(ASSIGNED_CLIENT_IDENTIFIER.value(), clientId));
}

if (this.session.getProtocolManager().getTopicAliasMaximum() != -1) {
connackProperties.add(new MqttProperties.IntegerProperty(TOPIC_ALIAS_MAXIMUM.value(), this.session.getProtocolManager().getTopicAliasMaximum()));
}

if (this.session.isUsingServerKeepAlive()) {
connackProperties.add(new MqttProperties.IntegerProperty(SERVER_KEEP_ALIVE.value(), this.session.getProtocolManager().getServerKeepAlive()));
}

if (this.session.getProtocolManager().getMaximumPacketSize() != -1) {
connackProperties.add(new MqttProperties.IntegerProperty(MAXIMUM_PACKET_SIZE.value(), this.session.getProtocolManager().getMaximumPacketSize()));
}

return connackProperties;
}

ServerSessionImpl createServerSession(String username, String password, String validatedUser) throws Exception {
String id = UUIDGenerator.getInstance().generateStringUUID();
ActiveMQServer server = session.getServer();
Expand All @@ -144,19 +200,15 @@ void disconnect(boolean failure) {
return;
}

synchronized (session.getSessionState()) {
synchronized (session.getState()) {
try {
if (isWill && failure) {
session.getMqttPublishManager().sendInternal(0, willTopic, willQoSLevel, willMessage, willRetain, true);
}
session.stop();
session.stop(failure);
session.getConnection().destroy();
} catch (Exception e) {
log.error("Error disconnecting client: " + e.getMessage());
MQTTLogger.LOGGER.errorDisconnectingClient(e);
} finally {
if (session.getSessionState() != null) {
session.getSessionState().setAttached(false);
String clientId = session.getSessionState().getClientId();
if (session.getState() != null) {
String clientId = session.getState().getClientId();
/**
* ensure that the connection for the client ID matches *this* connection otherwise we could remove the
* entry for the client who "stole" this client ID via [MQTT-3.1.4-2]
Expand All @@ -173,10 +225,12 @@ private synchronized MQTTSessionState getSessionState(String clientId) {
return session.getProtocolManager().getSessionState(clientId);
}

private String validateClientId(String clientId, boolean cleanSession) {
private Pair<String, Boolean> validateClientId(String clientId, boolean cleanSession) {
Boolean assigned = Boolean.FALSE;
if (clientId == null || clientId.isEmpty()) {
// [MQTT-3.1.3-7] [MQTT-3.1.3-6] If client does not specify a client ID and clean session is set to 1 create it.
if (cleanSession) {
assigned = Boolean.TRUE;
clientId = UUID.randomUUID().toString();
} else {
// [MQTT-3.1.3-8] Return ID rejected and disconnect if clean session = false and client id is null
Expand All @@ -186,10 +240,14 @@ private String validateClientId(String clientId, boolean cleanSession) {
MQTTConnection connection = session.getProtocolManager().addConnectedClient(clientId, session.getConnection());

if (connection != null) {
MQTTSession existingSession = session.getProtocolManager().getSessionState(clientId).getSession();
if (session.is5()) {
existingSession.getProtocolHandler().sendDisconnect(MQTTReasonCodes.SESSION_TAKEN_OVER);
}
// [MQTT-3.1.4-2] If the client ID represents a client already connected to the server then the server MUST disconnect the existing client
connection.disconnect(false);
existingSession.getConnectionManager().disconnect(false);
}
}
return clientId;
return new Pair<>(clientId, assigned);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public MQTTFailureListener(MQTTConnectionManager connectionManager) {

@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
connectionManager.disconnect(true);
connectionFailed(exception, failedOver, null);
}

@Override
Expand Down

0 comments on commit 8063110

Please sign in to comment.