Skip to content

Commit

Permalink
Added reason string to CONNACK (moquette-io#713)
Browse files Browse the repository at this point in the history
  • Loading branch information
andsel committed Jun 3, 2023
1 parent b5603e4 commit 5cfe90f
Showing 1 changed file with 35 additions and 6 deletions.
41 changes: 35 additions & 6 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.*;
import io.netty.handler.codec.mqtt.MqttMessageBuilders.ConnAckPropertiesBuilder;
import io.netty.handler.timeout.IdleStateHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -40,7 +41,9 @@
import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE;
import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION;
Expand Down Expand Up @@ -191,7 +194,14 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) {
}

if (!login(msg, clientId)) {
abortConnection(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
if (isProtocolVersion(msg, MqttVersion.MQTT_5)) {
final ConnAckPropertiesBuilder builder = prepareConnAckPropertiesBuilder(false, clientId);
builder.reasonString("User credentials provided are not recognized as valid");
abortConnectionV5(CONNECTION_REFUSED_BAD_USERNAME_OR_PASSWORD, builder);
} else {
abortConnection(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
}

channel.close().addListener(CLOSE_ON_FAILURE);
return PostOffice.RouteResult.failed(clientId);
}
Expand Down Expand Up @@ -227,7 +237,13 @@ private void executeConnect(MqttConnectMessage msg, String clientId, boolean ser
bindedSession = result.session;
} catch (SessionCorruptedException scex) {
LOG.warn("MQTT session for client ID {} cannot be created", clientId);
abortConnection(CONNECTION_REFUSED_SERVER_UNAVAILABLE);
if (isProtocolVersion(msg, MqttVersion.MQTT_5)) {
final ConnAckPropertiesBuilder builder = prepareConnAckPropertiesBuilder(false, clientId);
builder.reasonString("Error creating the session, retry later");
abortConnectionV5(CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID, builder);
} else {
abortConnection(CONNECTION_REFUSED_SERVER_UNAVAILABLE);
}
return;
}
NettyUtils.clientID(channel, clientId);
Expand Down Expand Up @@ -288,21 +304,25 @@ public void operationComplete(ChannelFuture future) throws Exception {
}

private MqttProperties prepareConnAckProperties(boolean serverGeneratedClientId, String clientId) {
final MqttMessageBuilders.ConnAckPropertiesBuilder builder = new MqttMessageBuilders.ConnAckPropertiesBuilder();
return prepareConnAckPropertiesBuilder(serverGeneratedClientId, clientId).build();
}

private ConnAckPropertiesBuilder prepareConnAckPropertiesBuilder(boolean serverGeneratedClientId, String clientId) {
final ConnAckPropertiesBuilder builder = new ConnAckPropertiesBuilder();
// default maximumQos is 2, [MQTT-3.2.2-10]
// unlimited maximumPacketSize inside however the protocol limit
if (serverGeneratedClientId) {
builder.assignedClientId(clientId);
}

return builder
builder
.sessionExpiryInterval(BrokerConstants.INFINITE_SESSION_EXPIRY)
.receiveMaximum(INFLIGHT_WINDOW_SIZE)
.retainAvailable(true)
.wildcardSubscriptionAvailable(true)
.subscriptionIdentifiersAvailable(false)
.sharedSubscriptionAvailable(false)
.build();
.sharedSubscriptionAvailable(false);
return builder;
}

private void setupInflightResender(Channel channel) {
Expand Down Expand Up @@ -345,6 +365,15 @@ private void abortConnection(MqttConnectReturnCode returnCode) {
channel.close().addListener(CLOSE_ON_FAILURE);
}

private void abortConnectionV5(MqttConnectReturnCode returnCode, ConnAckPropertiesBuilder propertiesBuilder) {
MqttConnAckMessage badProto = MqttMessageBuilders.connAck()
.returnCode(returnCode)
.properties(propertiesBuilder.build())
.sessionPresent(false).build();
channel.writeAndFlush(badProto).addListener(FIRE_EXCEPTION_ON_FAILURE);
channel.close().addListener(CLOSE_ON_FAILURE);
}

private boolean login(MqttConnectMessage msg, final String clientId) {
// handle user authentication
if (msg.variableHeader().hasUserName()) {
Expand Down

0 comments on commit 5cfe90f

Please sign in to comment.