Skip to content

Commit

Permalink
ARTEMIS-4200 configurable link-stealing for MQTT
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram committed Mar 10, 2023
1 parent 9b4204b commit 68c5bed
Show file tree
Hide file tree
Showing 6 changed files with 162 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -254,8 +254,11 @@ void handleConnect(MqttConnectMessage connect) throws Exception {
return;
}

MQTTConnection existingConnection = session.getProtocolManager().addConnectedClient(session.getConnection().getClientID(), session.getConnection());
disconnectExistingSession(existingConnection);
if (handleLinkStealing() == LinkStealingResult.NEW_LINK_DENIED) {
return;
} else {
protocolManager.addConnectedClient(session.getConnection().getClientID(), session.getConnection());
}

if (connection.getTransportConnection().getRouter() == null || !protocolManager.getRoutingHandler().route(connection, session, connect)) {
calculateKeepAlive(connect);
Expand Down Expand Up @@ -452,19 +455,49 @@ private boolean checkClientVersion() {
return true;
}

// [MQTT-3.1.4-2] If the client ID represents a client already connected to the server then the server MUST disconnect the existing client
private void disconnectExistingSession(MQTTConnection existingConnection) {
if (existingConnection != null) {
MQTTSession existingSession = session.getProtocolManager().getSessionState(session.getConnection().getClientID()).getSession();
if (existingSession != null) {
if (existingSession.getVersion() == MQTTVersion.MQTT_5) {
existingSession.getProtocolHandler().sendDisconnect(MQTTReasonCodes.SESSION_TAKEN_OVER);
/* The MQTT specification states:
*
* [MQTT-3.1.4-2] If the client ID represents a client already connected to the server then the server MUST
* disconnect the existing client
*
* However, this behavior is configurable via the "allowLinkStealing" acceptor URL property.
*/
private LinkStealingResult handleLinkStealing() {
final String clientID = session.getConnection().getClientID();
LinkStealingResult result;

if (protocolManager.isClientConnected(clientID)) {
MQTTConnection existingConnection = protocolManager.getConnectedClient(clientID);
if (protocolManager.isAllowLinkStealing()) {
MQTTSession existingSession = protocolManager.getSessionState(clientID).getSession();
if (existingSession != null) {
if (existingSession.getVersion() == MQTTVersion.MQTT_5) {
existingSession.getProtocolHandler().sendDisconnect(MQTTReasonCodes.SESSION_TAKEN_OVER);
}
existingSession.getConnectionManager().disconnect(false);
} else {
existingConnection.disconnect(false);
}
existingSession.getConnectionManager().disconnect(false);
logger.debug("Existing MQTT session from {} closed due to incoming session from {} with the same client ID: {}", existingConnection.getRemoteAddress(), connection.getRemoteAddress(), session.getConnection().getClientID());
result = LinkStealingResult.EXISTING_LINK_STOLEN;
} else {
existingConnection.disconnect(false);
if (session.getVersion() == MQTTVersion.MQTT_5) {
sendDisconnect(MQTTReasonCodes.UNSPECIFIED_ERROR);
}
logger.debug("Incoming MQTT session from {} closed due to existing session from {} with the same client ID: {}", connection.getRemoteAddress(), existingConnection.getRemoteAddress(), session.getConnection().getClientID());
/*
* Stopping the session here prevents the connection failure listener from inadvertently removing the
* existing session once the connection is disconnected.
*/
session.setStopped(true);
connection.disconnect(false);
result = LinkStealingResult.NEW_LINK_DENIED;
}
} else {
result = LinkStealingResult.NO_ACTION;
}

return result;
}

private Pair<Boolean, String> validateUser(String username, String password) throws Exception {
Expand Down Expand Up @@ -510,4 +543,8 @@ private boolean handleInvalidClientId() {
disconnect(true);
return false;
}

private enum LinkStealingResult {
EXISTING_LINK_STOLEN, NEW_LINK_DENIED, NO_ACTION;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ

private boolean closeMqttConnectionOnPublishAuthorizationFailure = true;

private boolean allowLinkStealing = true;

private final MQTTRoutingHandler routingHandler;

MQTTProtocolManager(ActiveMQServer server,
Expand Down Expand Up @@ -139,6 +141,14 @@ public void setCloseMqttConnectionOnPublishAuthorizationFailure(boolean closeMqt
this.closeMqttConnectionOnPublishAuthorizationFailure = closeMqttConnectionOnPublishAuthorizationFailure;
}

public boolean isAllowLinkStealing() {
return allowLinkStealing;
}

public void setAllowLinkStealing(boolean allowLinkStealing) {
this.allowLinkStealing = allowLinkStealing;
}

@Override
public void onNotification(Notification notification) {
if (!(notification.getType() instanceof CoreNotificationType))
Expand Down Expand Up @@ -348,6 +358,10 @@ public boolean isClientConnected(String clientId, MQTTConnection connection) {
return false;
}

public boolean isClientConnected(String clientId) {
return connectedClients.containsKey(clientId);
}

public void removeConnectedClient(String clientId) {
connectedClients.remove(clientId);
}
Expand All @@ -362,6 +376,10 @@ public MQTTConnection addConnectedClient(String clientId, MQTTConnection connect
return connectedClients.put(clientId, connection);
}

public MQTTConnection getConnectedClient(String clientId) {
return connectedClients.get(clientId);
}

public MQTTSessionState getSessionState(String clientId) {
/* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise create a new one. */
return sessionStates.computeIfAbsent(clientId, MQTTSessionState::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ boolean getStopped() {
return stopped;
}

void setStopped(boolean stopped) {
this.stopped = stopped;
}

boolean isClean() {
return clean;
}
Expand Down
14 changes: 14 additions & 0 deletions docs/user-manual/en/mqtt.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,20 @@ SSL/TLS is also available, e.g.:
Web browsers can then connect to `wss://<server>:8883` using a Web Socket to
send and receive MQTT messages.

## Link Stealing

The MQTT specifications define a behavior often referred to as "link stealing."
This means that whenever a new client connects with the same client ID as
another existing client then the existing client's session will be closed and
its network connection will be terminated.

In certain use-cases this behavior is not desired so it is configurable. The
URL parameter `allowLinkStealing` can be configured on the MQTT `acceptor` to
modify this behavior. By default `allowLinkStealing` is `true`. If it is set to
`false` then whenever a new client connects with the same client ID as another
existing client then the _new_ client's session will be closed and its network
connection will be terminated. In the case of MQTT 5 clients they will receive
a disconnect reason code of [`0x80` (i.e. "Unspecified error")](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901208).

## Automatic Subscription Clean-up

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.mqtt;

import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.tests.util.Wait;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.junit.Test;

public class MQTTDisabledLinkStealingTest extends MQTTTestSupport {

@Test(timeout = 60 * 1000)
public void testDisabledLinkStealing() throws Exception {
final String clientId = RandomUtil.randomString();
MQTT mqtt = createMQTTConnection(clientId, false);
mqtt.setKeepAlive((short) 2);

final BlockingConnection connection1 = mqtt.blockingConnection();
connection1.connect();

final BlockingConnection connection2 = mqtt.blockingConnection();
try {
connection2.connect();
fail("Should have thrown an exception on connect due to disabled link stealing");
} catch (Exception e) {
// ignore expected exception
}

assertTrue("Client no longer connected!", Wait.waitFor(() -> connection1.isConnected(), 3000, 200));
connection1.disconnect();
}

@Override
protected void addMQTTConnector() throws Exception {
server.getConfiguration().addAcceptorConfiguration("MQTT", "tcp://localhost:" + port + "?protocols=MQTT;anycastPrefix=anycast:;multicastPrefix=multicast:;allowLinkStealing=false");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -490,4 +490,30 @@ public void testAutoDeleteAddressWithWildcardSubscription() throws Exception {
consumer.disconnect();
consumer.close();
}

@Test(timeout = DEFAULT_TIMEOUT)
public void testConnectionStealingDisabled() throws Exception {
setAcceptorProperty("allowLinkStealing=false");
final String CLIENT_ID = RandomUtil.randomString();

MqttClient client = createPahoClient(CLIENT_ID);
client.connect();

MqttClient client2 = createPahoClient(CLIENT_ID);
try {
client2.connect();
fail("Should have thrown an exception on connect due to disabled link stealing");
} catch (Exception e) {
// ignore expected exception
}

// only 1 session should exist
Wait.assertEquals(1, () -> getSessionStates().size(), 2000, 100);
assertNotNull(getSessionStates().get(CLIENT_ID));

assertTrue(client.isConnected());

client.disconnect();
client.close();
}
}

0 comments on commit 68c5bed

Please sign in to comment.