Skip to content

Commit

Permalink
ARTEMIS-966 MQTT subscription state isn't durable
Browse files Browse the repository at this point in the history
Durable subscrption state is part of the MQTT specification which has
not been supported until now. This functionality is implemented via an
internal last-value queue. When an MQTT client creates, updates, or
adds a subscription a message using the client-ID as the last-value is
sent to the internal queue. When the broker restarts this data is read
from the queue and populates the in-memory MQTT data-structures.
Therefore subscribers can reconnect and resume their session's
subscriptions without have to manually resubscribe.

MQTT state is now managed centrally per-broker rather than in the
MQTTProtocolManager since there is one instance of MQTTProtocolManager
for each acceptor allowing MQTT connections. Managing state per acceptor
would allow odd behavior with clients connecting to different acceptors
with the same client ID.

The subscriptions are serialized as raw bytes with a "version" byte for
potential future use, but I intentionally avoided adding complex
scaffolding to support multiple versions. We can add that complexity
later if necessary.

Some tests needed to be changed since instantiating an MQTT protocol
manager now creates an internal queue. A handful of tests assume that no
queues will exist other than the ones they create themselves. I updated
the main test super-class so that an MQTT protocol manager is not
automatically instantiated when configuring a broker for in-vm support.
  • Loading branch information
jbertram authored and gemmellr committed Sep 13, 2023
1 parent e7a27f0 commit af2672e
Show file tree
Hide file tree
Showing 36 changed files with 810 additions and 225 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public static int randomPositiveInt() {
return Math.abs(RandomUtil.randomInt());
}

public static Integer randomPositiveIntOrNull() {
Integer random = RandomUtil.randomInt();
return random % 5 == 0 ? null : Math.abs(random);
}

public static ActiveMQBuffer randomBuffer(final int size, final long... data) {
ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(size + 8 * data.length);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1270,9 +1270,17 @@ public CoreMessage toCore(CoreMessageObjectPools coreMessageObjectPools) {
public String toString() {
try {
final TypedProperties properties = getProperties();
return "CoreMessage[messageID=" + messageID + ",durable=" + isDurable() + ",userID=" + getUserID() + ",priority=" + this.getPriority() +
", timestamp=" + toDate(getTimestamp()) + ",expiration=" + toDate(getExpiration()) +
", durable=" + durable + ", address=" + getAddress() + ",size=" + getPersistentSize() + ",properties=" + properties + "]@" + System.identityHashCode(this);
return "CoreMessage[messageID=" + messageID +
", durable=" + isDurable() +
", userID=" + getUserID() +
", priority=" + this.getPriority() +
", timestamp=" + toDate(getTimestamp()) +
", expiration=" + toDate(getExpiration()) +
", durable=" + durable +
", address=" + getAddress() +
", size=" + getPersistentSize() +
", properties=" + properties +
"]@" + System.identityHashCode(this);
} catch (Throwable e) {
logger.warn("Error creating String for message: ", e);
return "ServerMessage[messageID=" + messageID + "]";
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.protocol.mqtt;

import org.apache.activemq.artemis.logs.BundleFactory;
import org.apache.activemq.artemis.logs.annotation.LogBundle;
import org.apache.activemq.artemis.logs.annotation.Message;

/**
* Logger Code 85
*/
@LogBundle(projectCode = "AMQ", regexID = "85[0-9]{4}")
public interface MQTTBundle {

MQTTBundle BUNDLE = BundleFactory.newBundle(MQTTBundle.class);

@Message(id = 850000, value = "Unable to store MQTT state within given timeout: {}ms")
IllegalStateException unableToStoreMqttState(long timeout);
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void connect(MqttConnectMessage connect, String validatedUser, String username,
boolean cleanStart = connect.variableHeader().isCleanSession();

String clientId = session.getConnection().getClientID();
boolean sessionPresent = session.getProtocolManager().getSessionStates().containsKey(clientId);
boolean sessionPresent = session.getStateManager().getSessionStates().containsKey(clientId);
MQTTSessionState sessionState = getSessionState(clientId);
synchronized (sessionState) {
session.setSessionState(sessionState);
Expand Down Expand Up @@ -120,6 +120,7 @@ void connect(MqttConnectMessage connect, String validatedUser, String username,

connackProperties = getConnackProperties();
} else {
sessionState.setClientSessionExpiryInterval(session.getProtocolManager().getDefaultMqttSessionExpiryInterval());
connackProperties = MqttProperties.NO_PROPERTIES;
}

Expand Down Expand Up @@ -193,15 +194,15 @@ void disconnect(boolean failure) {
* ensure that the connection for the client ID matches *this* connection otherwise we could remove the
* entry for the client who "stole" this client ID via [MQTT-3.1.4-2]
*/
if (clientId != null && session.getProtocolManager().isClientConnected(clientId, session.getConnection())) {
session.getProtocolManager().removeConnectedClient(clientId);
if (clientId != null && session.getStateManager().isClientConnected(clientId, session.getConnection())) {
session.getStateManager().removeConnectedClient(clientId);
}
}
}
}
}

private synchronized MQTTSessionState getSessionState(String clientId) {
return session.getProtocolManager().getSessionState(clientId);
private synchronized MQTTSessionState getSessionState(String clientId) throws Exception {
return session.getStateManager().getSessionState(clientId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,7 @@ public interface MQTTLogger {

@LogMessage(id = 834007, value = "Authorization failure sending will message: {}", level = LogMessage.Level.ERROR)
void authorizationFailureSendingWillMessage(String message);

@LogMessage(id = 834008, value = "Failed to remove session state for client with ID: {}", level = LogMessage.Level.ERROR)
void failedToRemoveSessionState(String clientID, Exception e);
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@
import io.netty.util.ReferenceCountUtil;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.InvalidClientIdException;
import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.DisconnectException;
import org.apache.activemq.artemis.core.protocol.mqtt.exceptions.InvalidClientIdException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
Expand All @@ -57,6 +57,7 @@
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.AUTHENTICATION_DATA;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.AUTHENTICATION_METHOD;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SESSION_EXPIRY_INTERVAL;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SUBSCRIPTION_IDENTIFIER;

/**
* This class is responsible for receiving and sending MQTT packets, delegating behaviour to one of the
Expand Down Expand Up @@ -257,7 +258,7 @@ void handleConnect(MqttConnectMessage connect) throws Exception {
if (handleLinkStealing() == LinkStealingResult.NEW_LINK_DENIED) {
return;
} else {
protocolManager.addConnectedClient(session.getConnection().getClientID(), session.getConnection());
protocolManager.getStateManager().addConnectedClient(session.getConnection().getClientID(), session.getConnection());
}

if (connection.getTransportConnection().getRouter() == null || !protocolManager.getRoutingHandler().route(connection, session, connect)) {
Expand Down Expand Up @@ -377,15 +378,16 @@ void handlePubcomp(MqttMessage message) throws Exception {
}

void handleSubscribe(MqttSubscribeMessage message) throws Exception {
int[] qos = session.getSubscriptionManager().addSubscriptions(message.payload().topicSubscriptions(), message.idAndPropertiesVariableHeader().properties());
Integer subscriptionIdentifier = MQTTUtil.getProperty(Integer.class, message.idAndPropertiesVariableHeader().properties(), SUBSCRIPTION_IDENTIFIER, null);
int[] qos = session.getSubscriptionManager().addSubscriptions(message.payload().topicSubscriptions(), subscriptionIdentifier);
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttMessageIdAndPropertiesVariableHeader variableHeader = new MqttMessageIdAndPropertiesVariableHeader(message.variableHeader().messageId(), MqttProperties.NO_PROPERTIES);
MqttSubAckMessage subAck = new MqttSubAckMessage(header, variableHeader, new MqttSubAckPayload(qos));
sendToClient(subAck);
}

void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception {
short[] reasonCodes = session.getSubscriptionManager().removeSubscriptions(message.payload().topics());
short[] reasonCodes = session.getSubscriptionManager().removeSubscriptions(message.payload().topics(), true);
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttUnsubAckMessage unsubAck;
if (session.getVersion() == MQTTVersion.MQTT_5) {
Expand Down Expand Up @@ -462,14 +464,14 @@ private boolean checkClientVersion() {
*
* However, this behavior is configurable via the "allowLinkStealing" acceptor URL property.
*/
private LinkStealingResult handleLinkStealing() {
private LinkStealingResult handleLinkStealing() throws Exception {
final String clientID = session.getConnection().getClientID();
LinkStealingResult result;

if (protocolManager.isClientConnected(clientID)) {
MQTTConnection existingConnection = protocolManager.getConnectedClient(clientID);
if (protocolManager.getStateManager().isClientConnected(clientID)) {
MQTTConnection existingConnection = protocolManager.getStateManager().getConnectedClient(clientID);
if (protocolManager.isAllowLinkStealing()) {
MQTTSession existingSession = protocolManager.getSessionState(clientID).getSession();
MQTTSession existingSession = protocolManager.getStateManager().getSessionState(clientID).getSession();
if (existingSession != null) {
if (existingSession.getVersion() == MQTTVersion.MQTT_5) {
existingSession.getProtocolHandler().sendDisconnect(MQTTReasonCodes.SESSION_TAKEN_OVER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@
*/
package org.apache.activemq.artemis.core.protocol.mqtt;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand All @@ -36,6 +34,7 @@
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
Expand All @@ -47,7 +46,6 @@
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;

public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInterceptor, MQTTConnection, MQTTRoutingHandler> implements NotificationListener {

Expand All @@ -60,9 +58,6 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
private final List<MQTTInterceptor> incomingInterceptors = new ArrayList<>();
private final List<MQTTInterceptor> outgoingInterceptors = new ArrayList<>();

private final Map<String, MQTTConnection> connectedClients = new ConcurrentHashMap<>();
private final Map<String, MQTTSessionState> sessionStates = new ConcurrentHashMap<>();

private int defaultMqttSessionExpiryInterval = -1;

private int topicAliasMaximum = MQTTUtil.DEFAULT_TOPIC_ALIAS_MAX;
Expand All @@ -79,13 +74,23 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ

private final MQTTRoutingHandler routingHandler;

private MQTTStateManager sessionStateManager;

MQTTProtocolManager(ActiveMQServer server,
List<BaseInterceptor> incomingInterceptors,
List<BaseInterceptor> outgoingInterceptors) {
List<BaseInterceptor> outgoingInterceptors) throws Exception {
this.server = server;
this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
server.getManagementService().addNotificationListener(this);
routingHandler = new MQTTRoutingHandler(server);
sessionStateManager = MQTTStateManager.getInstance(server);
server.registerActivateCallback(new CleaningActivateCallback() {
@Override
public void deActivate() {
MQTTStateManager.removeInstance(server);
sessionStateManager = null;
}
});
}

public int getDefaultMqttSessionExpiryInterval() {
Expand Down Expand Up @@ -176,7 +181,7 @@ public void onNotification(Notification notification) {
* in the SESSION_CREATED notification, you need to close this connection.
* Avoid consumers with the same client ID in the cluster appearing at different nodes at the same time.
*/
MQTTConnection mqttConnection = connectedClients.get(clientId);
MQTTConnection mqttConnection = sessionStateManager.getConnectedClients().get(clientId);
if (mqttConnection != null) {
mqttConnection.destroy();
}
Expand All @@ -197,39 +202,6 @@ public void updateInterceptors(List incoming, List outgoing) {
this.outgoingInterceptors.addAll(getFactory().filterInterceptors(outgoing));
}

public void scanSessions() {
List<String> toRemove = new ArrayList();
for (Map.Entry<String, MQTTSessionState> entry : sessionStates.entrySet()) {
MQTTSessionState state = entry.getValue();
logger.debug("Inspecting session: {}", state);
int sessionExpiryInterval = getSessionExpiryInterval(state);
if (!state.isAttached() && sessionExpiryInterval > 0 && state.getDisconnectedTime() + (sessionExpiryInterval * 1000) < System.currentTimeMillis()) {
toRemove.add(entry.getKey());
}
if (state.isWill() && !state.isAttached() && state.isFailed() && state.getWillDelayInterval() > 0 && state.getDisconnectedTime() + (state.getWillDelayInterval() * 1000) < System.currentTimeMillis()) {
state.getSession().sendWillMessage();
}
}

for (String key : toRemove) {
logger.debug("Removing state for session: {}", key);
MQTTSessionState state = removeSessionState(key);
if (state != null && state.isWill() && !state.isAttached() && state.isFailed()) {
state.getSession().sendWillMessage();
}
}
}

private int getSessionExpiryInterval(MQTTSessionState state) {
int sessionExpiryInterval;
if (state.getClientSessionExpiryInterval() == 0) {
sessionExpiryInterval = getDefaultMqttSessionExpiryInterval();
} else {
sessionExpiryInterval = state.getClientSessionExpiryInterval();
}
return sessionExpiryInterval;
}

@Override
public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
try {
Expand Down Expand Up @@ -348,56 +320,7 @@ public String invokeOutgoing(MqttMessage mqttMessage, MQTTConnection connection)
return super.invokeInterceptors(this.outgoingInterceptors, mqttMessage, connection);
}

public boolean isClientConnected(String clientId, MQTTConnection connection) {
MQTTConnection connectedConn = connectedClients.get(clientId);

if (connectedConn != null) {
return connectedConn.equals(connection);
}

return false;
}

public boolean isClientConnected(String clientId) {
return connectedClients.containsKey(clientId);
}

public void removeConnectedClient(String clientId) {
connectedClients.remove(clientId);
}

/**
* @param clientId
* @param connection
* @return the {@code MQTTConnection} that the added connection replaced or null if there was no previous entry for
* the {@code clientId}
*/
public MQTTConnection addConnectedClient(String clientId, MQTTConnection connection) {
return connectedClients.put(clientId, connection);
}

public MQTTConnection getConnectedClient(String clientId) {
return connectedClients.get(clientId);
}

public MQTTSessionState getSessionState(String clientId) {
/* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise create a new one. */
return sessionStates.computeIfAbsent(clientId, MQTTSessionState::new);
}

public MQTTSessionState removeSessionState(String clientId) {
if (clientId == null) {
return null;
}
return sessionStates.remove(clientId);
}

public Map<String, MQTTSessionState> getSessionStates() {
return new HashMap<>(sessionStates);
}

/** For DEBUG only */
public Map<String, MQTTConnection> getConnectedClients() {
return connectedClients;
public MQTTStateManager getStateManager() {
return sessionStateManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void run() {
if (protocolHandler != null) {
protocolHandler.getProtocolMap().values().forEach(m -> {
if (m instanceof MQTTProtocolManager) {
((MQTTProtocolManager)m).scanSessions();
((MQTTProtocolManager)m).getStateManager().scanSessions();
}
});
}
Expand Down

0 comments on commit af2672e

Please sign in to comment.