Skip to content
Permalink
Browse files
ARTEMIS-3702 auth failures don't adhere to MQTT spec
The commit includes the following changes:
 - Don't drop the connection on subscribe or publish authorization
failures for 3.1 clients.
 - Don't drop the connection on subscribe authorization failures for
3.1.1 clients.
 - Add configuration parameter to control behavior on publish
authorization failures for 3.1.1 clients (either disconnect or not).
  • Loading branch information
jbertram authored and clebertsuconic committed Mar 15, 2022
1 parent f7915a2 commit 2b5a25a10611366fd4f643587efcf2292b0f548f
Showing 9 changed files with 261 additions and 32 deletions.
@@ -58,9 +58,8 @@ public MQTTConnectionManager(MQTTSession session) {
}

void connect(MqttConnectMessage connect, String validatedUser) throws Exception {
int packetVersion = connect.variableHeader().version();
if (packetVersion == MqttVersion.MQTT_5.protocolLevel()) {
session.set5(true);
session.setVersion(MQTTVersion.getVersion(connect.variableHeader().version()));
if (session.getVersion() == MQTTVersion.MQTT_5) {
session.getConnection().setProtocolVersion(Byte.toString(MqttVersion.MQTT_5.protocolLevel()));
String authenticationMethod = MQTTUtil.getProperty(String.class, connect.variableHeader().properties(), AUTHENTICATION_METHOD);

@@ -121,7 +120,7 @@ void connect(MqttConnectMessage connect, String validatedUser) throws Exception
session.getState().setWillRetain(connect.variableHeader().isWillRetain());
session.getState().setWillTopic(connect.payload().willTopic());

if (session.is5()) {
if (session.getVersion() == MQTTVersion.MQTT_5) {
MqttProperties willProperties = connect.payload().willProperties();
if (willProperties != null) {
MqttProperties.MqttProperty willDelayInterval = willProperties.getProperty(WILL_DELAY_INTERVAL.value());
@@ -133,7 +132,7 @@ void connect(MqttConnectMessage connect, String validatedUser) throws Exception
}

MqttProperties connackProperties;
if (session.is5()) {
if (session.getVersion() == MQTTVersion.MQTT_5) {
session.getConnection().setReceiveMaximum(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), RECEIVE_MAXIMUM, -1));

sessionState.setClientSessionExpiryInterval(MQTTUtil.getProperty(Integer.class, connect.variableHeader().properties(), SESSION_EXPIRY_INTERVAL, 0));
@@ -241,7 +240,7 @@ private Pair<String, Boolean> validateClientId(String clientId, boolean cleanSes

if (connection != null) {
MQTTSession existingSession = session.getProtocolManager().getSessionState(clientId).getSession();
if (session.is5()) {
if (session.getVersion() == MQTTVersion.MQTT_5) {
existingSession.getProtocolHandler().sendDisconnect(MQTTReasonCodes.SESSION_TAKEN_OVER);
}
// [MQTT-3.1.4-2] If the client ID represents a client already connected to the server then the server MUST disconnect the existing client
@@ -99,7 +99,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
MqttMessage message = (MqttMessage) msg;

if (stopped) {
if (session.is5()) {
if (session.getVersion() == MQTTVersion.MQTT_5) {
sendDisconnect(MQTTReasonCodes.IMPLEMENTATION_SPECIFIC_ERROR);
}
disconnect(true);
@@ -109,7 +109,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
// Disconnect if Netty codec failed to decode the stream.
if (message.decoderResult().isFailure()) {
logger.debugf(message.decoderResult().cause(), "Disconnecting client due to message decoding failure.");
if (session.is5()) {
if (session.getVersion() == MQTTVersion.MQTT_5) {
sendDisconnect(MQTTReasonCodes.MALFORMED_PACKET);
}
disconnect(true);
@@ -186,7 +186,7 @@ public void act(MqttMessage message) {
}
} catch (Exception e) {
MQTTLogger.LOGGER.errorProcessingControlPacket(message.toString(), e);
if (session.is5()) {
if (session.getVersion() == MQTTVersion.MQTT_5) {
sendDisconnect(MQTTReasonCodes.IMPLEMENTATION_SPECIFIC_ERROR);
}
disconnect(true);
@@ -232,7 +232,7 @@ void handleConnect(MqttConnectMessage connect) throws Exception {
try {
validatedUser = session.getServer().validateUser(username, password, session.getConnection(), session.getProtocolManager().getSecurityDomain());
} catch (ActiveMQSecurityException e) {
if (session.is5()) {
if (session.getVersion() == MQTTVersion.MQTT_5) {
session.getProtocolHandler().sendConnack(MQTTReasonCodes.BAD_USER_NAME_OR_PASSWORD);
} else {
session.getProtocolHandler().sendConnack(MQTTReasonCodes.NOT_AUTHORIZED_3);
@@ -314,7 +314,7 @@ void sendDisconnect(byte reasonCode) {
}

void handlePublish(MqttPublishMessage message) throws Exception {
if (session.is5() && session.getProtocolManager().getMaximumPacketSize() != -1 && MQTTUtil.calculateMessageSize(message) > session.getProtocolManager().getMaximumPacketSize()) {
if (session.getVersion() == MQTTVersion.MQTT_5 && session.getProtocolManager().getMaximumPacketSize() != -1 && MQTTUtil.calculateMessageSize(message) > session.getProtocolManager().getMaximumPacketSize()) {
sendDisconnect(MQTTReasonCodes.PACKET_TOO_LARGE);
disconnect(true);
return;
@@ -353,7 +353,7 @@ void sendPublishProtocolControlMessage(int messageId, MqttMessageType messageTyp
MqttFixedHeader fixedHeader = new MqttFixedHeader(messageType, false, (messageType == MqttMessageType.PUBREL) ? MqttQoS.AT_LEAST_ONCE : MqttQoS.AT_MOST_ONCE, false, 0);

MqttMessageIdVariableHeader variableHeader;
if (session.is5()) {
if (session.getVersion() == MQTTVersion.MQTT_5) {
variableHeader = new MqttPubReplyMessageVariableHeader(messageId, reasonCode, MqttProperties.NO_PROPERTIES);
} else {
variableHeader = MqttMessageIdVariableHeader.from(messageId);
@@ -391,7 +391,7 @@ void handleUnsubscribe(MqttUnsubscribeMessage message) throws Exception {
short[] reasonCodes = session.getSubscriptionManager().removeSubscriptions(message.payload().topics());
MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
MqttUnsubAckMessage unsubAck;
if (session.is5()) {
if (session.getVersion() == MQTTVersion.MQTT_5) {
unsubAck = new MqttUnsubAckMessage(header, message.variableHeader(), new MqttUnsubAckPayload(reasonCodes));
} else {
unsubAck = new MqttUnsubAckMessage(header, message.variableHeader());
@@ -72,6 +72,8 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ

private int maximumPacketSize = MQTTUtil.DEFAULT_MAXIMUM_PACKET_SIZE;

private boolean closeMqttConnectionOnPublishAuthorizationFailure = true;

private final MQTTRoutingHandler routingHandler;

MQTTProtocolManager(ActiveMQServer server,
@@ -128,6 +130,14 @@ public MQTTProtocolManager setServerKeepAlive(int serverKeepAlive) {
return this;
}

public boolean isCloseMqttConnectionOnPublishAuthorizationFailure() {
return closeMqttConnectionOnPublishAuthorizationFailure;
}

public void setCloseMqttConnectionOnPublishAuthorizationFailure(boolean closeMqttConnectionOnPublishAuthorizationFailure) {
this.closeMqttConnectionOnPublishAuthorizationFailure = closeMqttConnectionOnPublishAuthorizationFailure;
}

@Override
public void onNotification(Notification notification) {
if (!(notification.getType() instanceof CoreNotificationType))
@@ -84,8 +84,11 @@ public class MQTTPublishManager {

private MQTTSessionState.OutboundStore outboundStore;

public MQTTPublishManager(MQTTSession session) {
private boolean closeMqttConnectionOnPublishAuthorizationFailure;

public MQTTPublishManager(MQTTSession session, boolean closeMqttConnectionOnPublishAuthorizationFailure) {
this.session = session;
this.closeMqttConnectionOnPublishAuthorizationFailure = closeMqttConnectionOnPublishAuthorizationFailure;
}

synchronized void start() {
@@ -173,7 +176,7 @@ protected void sendMessage(ICoreMessage message, ServerConsumer consumer, int de
void sendToQueue(MqttPublishMessage message, boolean internal) throws Exception {
synchronized (lock) {
String topic = message.variableHeader().topicName();
if (session.is5()) {
if (session.getVersion() == MQTTVersion.MQTT_5) {
Integer alias = MQTTUtil.getProperty(Integer.class, message.variableHeader().properties(), TOPIC_ALIAS);
Integer topicAliasMax = session.getProtocolManager().getTopicAliasMaximum();
if (alias != null) {
@@ -222,18 +225,40 @@ void sendToQueue(MqttPublishMessage message, boolean internal) throws Exception
tx.commit();
} catch (ActiveMQSecurityException e) {
tx.rollback();
if (session.is5()) {
if (session.getVersion() == MQTTVersion.MQTT_5) {
sendMessageAck(internal, qos, messageId, MQTTReasonCodes.NOT_AUTHORIZED);
return;
} else {
} else if (session.getVersion() == MQTTVersion.MQTT_3_1_1) {
/*
* For MQTT 3.x clients:
* For MQTT 3.1.1 clients:
*
* [MQTT-3.3.5-2] If a Server implementation does not authorize a PUBLISH to be performed by a Client;
* it has no way of informing that Client. It MUST either make a positive acknowledgement, according
* to the normal QoS rules, or close the Network Connection
*
* Throwing an exception here will ultimately close the connection. This is the default behavior.
*/
if (closeMqttConnectionOnPublishAuthorizationFailure) {
throw e;
} else {
if (logger.isDebugEnabled()) {
logger.debug("MQTT 3.1.1 client not authorized to publish message.");
}
}
} else {
/*
* For MQTT 3.1 clients:
*
* Note that if a server implementation does not authorize a PUBLISH to be made by a client, it has no
* way of informing that client. It must therefore make a positive acknowledgement, according to the
* normal QoS rules, and the client will *not* be informed that it was not authorized to publish the
* message.
*
* Log the failure since we have to just swallow it.
*/
throw e;
if (logger.isDebugEnabled()) {
logger.debug("MQTT 3.1 client not authorized to publish message.");
}
}
} catch (Throwable t) {
MQTTLogger.LOGGER.failedToPublishMqttMessage(t.getMessage(), t);
@@ -365,7 +390,7 @@ private boolean publishToClient(int messageId, ICoreMessage message, int deliver
boolean isRetain = message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY);
MqttProperties mqttProperties = getPublishProperties(message);

if (session.is5()) {
if (session.getVersion() == MQTTVersion.MQTT_5) {
if (session.getState().getSubscription(message.getAddress()) != null && !session.getState().getSubscription(message.getAddress()).option().isRetainAsPublished()) {
isRetain = false;
}
@@ -393,7 +418,7 @@ private boolean publishToClient(int messageId, ICoreMessage message, int deliver
MqttPublishMessage publish = new MqttPublishMessage(header, varHeader, payload);

int maxSize = session.getState().getClientMaxPacketSize();
if (session.is5() && maxSize != 0) {
if (session.getVersion() == MQTTVersion.MQTT_5 && maxSize != 0) {
int size = MQTTUtil.calculateMessageSize(publish);
if (size > maxSize) {
/*
@@ -65,7 +65,7 @@ public class MQTTSession {

private CoreMessageObjectPools coreMessageObjectPools = new CoreMessageObjectPools();

private boolean five = false;
private MQTTVersion version = null;

private boolean usingServerKeepAlive = false;

@@ -80,7 +80,7 @@ public MQTTSession(MQTTProtocolHandler protocolHandler,
this.connection = connection;

mqttConnectionManager = new MQTTConnectionManager(this);
mqttPublishManager = new MQTTPublishManager(this);
mqttPublishManager = new MQTTPublishManager(this, protocolManager.isCloseMqttConnectionOnPublishAuthorizationFailure());
sessionCallback = new MQTTSessionCallback(this, connection);
subscriptionManager = new MQTTSubscriptionManager(this);
retainMessageManager = new MQTTRetainMessageManager(this);
@@ -120,7 +120,7 @@ synchronized void stop(boolean failure) throws Exception {
state.setDisconnectedTime(System.currentTimeMillis());
}

if (is5()) {
if (getVersion() == MQTTVersion.MQTT_5) {
if (state.getClientSessionExpiryInterval() == 0) {
if (state.isWill() && failure) {
// If the session expires the will message must be sent no matter the will delay
@@ -234,12 +234,12 @@ public CoreMessageObjectPools getCoreMessageObjectPools() {
return coreMessageObjectPools;
}

public boolean is5() {
return five;
public void setVersion(MQTTVersion version) {
this.version = version;
}

public void set5(boolean five) {
this.five = five;
public MQTTVersion getVersion() {
return this.version;
}

public boolean isUsingServerKeepAlive() {
@@ -309,13 +309,30 @@ int[] addSubscriptions(List<MqttTopicSubscription> subscriptions, MqttProperties
qos[i] = subscriptions.get(i).qualityOfService().value();
} catch (ActiveMQSecurityException e) {
// user is not authorized to create subsription
if (session.is5()) {
if (session.getVersion() == MQTTVersion.MQTT_5) {
qos[i] = MQTTReasonCodes.NOT_AUTHORIZED;
} else {
} else if (session.getVersion() == MQTTVersion.MQTT_3_1_1) {
qos[i] = MQTTReasonCodes.UNSPECIFIED_ERROR;
} else {
/*
* For MQTT 3.1 clients:
*
* Note that if a server implementation does not authorize a SUBSCRIBE request to be made by a client,
* it has no way of informing that client. It must therefore make a positive acknowledgement with a
* SUBACK, and the client will not be informed that it was not authorized to subscribe.
*
*
* For MQTT 3.1.1 clients:
*
* The 3.1.1 spec doesn't directly address the situation where the server does not authorize a
* SUBSCRIBE. It really just says this:
*
* [MQTT-3.8.4-1] When the Server receives a SUBSCRIBE Packet from a Client, the Server MUST respond
* with a SUBACK Packet.
*/
qos[i] = subscriptions.get(i).qualityOfService().value();
}
}

}
return qos;
}
@@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.artemis.core.protocol.mqtt;

public enum MQTTVersion {

MQTT_3_1, MQTT_3_1_1, MQTT_5;

public int getVersion() {
switch (this) {
case MQTT_3_1:
return 3;
case MQTT_3_1_1:
return 4;
case MQTT_5:
return 5;
default:
return -1;
}
}

public static MQTTVersion getVersion(int version) {
switch (version) {
case 3:
return MQTT_3_1;
case 4:
return MQTT_3_1_1;
case 5:
return MQTT_5;
default:
return null;
}
}
}
@@ -262,4 +262,24 @@ response style authentication.
However, there are currently no challenge / response mechanisms implemented so if
a client passes the "Authentication Method" property in its `CONNECT` packet it will
receive a `CONNACK` with a reason code of `0x8C` (i.e. bad authentication method)
and the network connection will be closed.
and the network connection will be closed.

## Publish Authorization Failures

The MQTT 3.1.1 specification is ambiguous regarding the broker's behavior when
a `PUBLISH` packet fails due to a lack of authorization. In [section 3.3.5](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718042)
it says:

> If a Server implementation does not authorize a PUBLISH to be performed by a
> Client; it has no way of informing that Client. It MUST either make a positive
> acknowledgement, according to the normal QoS rules, or close the Network
> Connection
By default the broker will close the network connection. However if you'd rather
have the broker make a positive acknowledgement then set the URL parameter
`closeMqttConnectionOnPublishAuthorizationFailure` to `false` on the relevant
MQTT `acceptor` in `broker.xml`, e.g.:

```xml
<acceptor name="mqtt">tcp://0.0.0:1883?protocols=MQTT;closeMqttConnectionOnPublishAuthorizationFailure=false</acceptor>
```

0 comments on commit 2b5a25a

Please sign in to comment.