Skip to content

Commit

Permalink
Adds CONNACK properties, in particular return which capabilities are …
Browse files Browse the repository at this point in the history
…actually enabled (moquette-io#709)

* Added CONNACK capabilities
* Extracted common shared code in abstract superclass
* Fixed empty clientId management for MQTT5
* Added checks for majority of CONNACK properties, in particular assignedClientId
* Introduce isProtocolVersion, and redefine isNotProtocolVersion
  • Loading branch information
andsel committed Jan 29, 2023
1 parent a4b5ee9 commit e8f43b9
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 72 deletions.
1 change: 1 addition & 0 deletions broker/src/main/java/io/moquette/BrokerConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public final class BrokerConstants {

public static final int FLIGHT_BEFORE_RESEND_MS = 5_000;
public static final int INFLIGHT_WINDOW_SIZE = 10;
public static final int INFINITE_SESSION_EXPIRY = 0xFFFFFFFF;

private BrokerConstants() {
}
Expand Down
77 changes: 60 additions & 17 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.moquette.broker;

import io.moquette.BrokerConstants;
import io.moquette.broker.subscriptions.Topic;
import io.moquette.broker.security.IAuthenticator;
import io.netty.buffer.ByteBuf;
Expand All @@ -29,16 +30,23 @@
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.*;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE;
import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE;
import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION;
import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
import static io.netty.handler.codec.mqtt.MqttQoS.*;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;

final class MQTTConnection {

Expand Down Expand Up @@ -151,22 +159,28 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) {
return PostOffice.RouteResult.failed(clientId);
}
final boolean cleanSession = msg.variableHeader().isCleanSession();
final boolean serverGeneratedClientId;
if (clientId == null || clientId.length() == 0) {
if (!brokerConfig.isAllowZeroByteClientId()) {
LOG.info("Broker doesn't permit MQTT empty client ID. Username: {}", username);
abortConnection(CONNECTION_REFUSED_IDENTIFIER_REJECTED);
return PostOffice.RouteResult.failed(clientId);
}
if (isNotProtocolVersion(msg, MqttVersion.MQTT_5)) {
if (!brokerConfig.isAllowZeroByteClientId()) {
LOG.info("Broker doesn't permit MQTT empty client ID. Username: {}", username);
abortConnection(CONNECTION_REFUSED_IDENTIFIER_REJECTED);
return PostOffice.RouteResult.failed(clientId);
}

if (!cleanSession) {
LOG.info("MQTT client ID cannot be empty for persistent session. Username: {}", username);
abortConnection(CONNECTION_REFUSED_IDENTIFIER_REJECTED);
return PostOffice.RouteResult.failed(clientId);
if (!cleanSession) {
LOG.info("MQTT client ID cannot be empty for persistent session. Username: {}", username);
abortConnection(CONNECTION_REFUSED_IDENTIFIER_REJECTED);
return PostOffice.RouteResult.failed(clientId);
}
}

// Generating client id.
clientId = UUID.randomUUID().toString().replace("-", "");
serverGeneratedClientId = true;
LOG.debug("Client has connected with integration generated id: {}, username: {}", clientId, username);
} else {
serverGeneratedClientId = false;
}

if (!login(msg, clientId)) {
Expand All @@ -177,15 +191,15 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) {

final String sessionId = clientId;
return postOffice.routeCommand(clientId, "CONN", () -> {
executeConnect(msg, sessionId);
executeConnect(msg, sessionId, serverGeneratedClientId);
return null;
});
}

/**
* Invoked by the Session's event loop.
* */
private void executeConnect(MqttConnectMessage msg, String clientId) {
private void executeConnect(MqttConnectMessage msg, String clientId, boolean serverGeneratedClientId) {
final SessionRegistry.SessionCreationResult result;
try {
LOG.trace("Binding MQTTConnection to session");
Expand All @@ -200,11 +214,18 @@ private void executeConnect(MqttConnectMessage msg, String clientId) {
NettyUtils.clientID(channel, clientId);

final boolean msgCleanSessionFlag = msg.variableHeader().isCleanSession();
// [MQTT-3.2.2-2, MQTT-3.2.2-3, MQTT-3.2.2-6]
boolean isSessionAlreadyPresent = !msgCleanSessionFlag && result.alreadyStored;
final String clientIdUsed = clientId;
final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck()
final MqttMessageBuilders.ConnAckBuilder connAckBuilder = MqttMessageBuilders.connAck()
.returnCode(CONNECTION_ACCEPTED)
.sessionPresent(isSessionAlreadyPresent).build();
.sessionPresent(isSessionAlreadyPresent);
if (isProtocolVersion(msg, MqttVersion.MQTT_5)) {
// set properties for MQTT 5
final MqttProperties ackProperties = prepareConnAckProperties(serverGeneratedClientId, clientId);
connAckBuilder.properties(ackProperties);
}
final MqttConnAckMessage ackMessage = connAckBuilder.build();
channel.writeAndFlush(ackMessage).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Expand Down Expand Up @@ -243,6 +264,24 @@ public void operationComplete(ChannelFuture future) throws Exception {
});
}

private MqttProperties prepareConnAckProperties(boolean serverGeneratedClientId, String clientId) {
final MqttMessageBuilders.ConnAckPropertiesBuilder builder = new MqttMessageBuilders.ConnAckPropertiesBuilder();
// default maximumQos is 2, [MQTT-3.2.2-10]
// unlimited maximumPacketSize inside however the protocol limit
if (serverGeneratedClientId) {
builder.assignedClientId(clientId);
}

return builder
.sessionExpiryInterval(BrokerConstants.INFINITE_SESSION_EXPIRY)
.receiveMaximum(INFLIGHT_WINDOW_SIZE)
.retainAvailable(true)
.wildcardSubscriptionAvailable(true)
.subscriptionIdentifiersAvailable(false)
.sharedSubscriptionAvailable(false)
.build();
}

private void setupInflightResender(Channel channel) {
channel.pipeline()
.addFirst("inflightResender", new InflightResender(5_000, TimeUnit.MILLISECONDS));
Expand All @@ -268,7 +307,11 @@ private void setIdleTime(ChannelPipeline pipeline, int idleTime) {
}

private boolean isNotProtocolVersion(MqttConnectMessage msg, MqttVersion version) {
return msg.variableHeader().version() != version.protocolLevel();
return !isProtocolVersion(msg, version);
}

private boolean isProtocolVersion(MqttConnectMessage msg, MqttVersion version) {
return msg.variableHeader().version() == version.protocolLevel();
}

private void abortConnection(MqttConnectReturnCode returnCode) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package io.moquette.integration.mqtt5;

import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
import io.moquette.integration.IntegrationUtils;
import io.moquette.testclient.Client;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Properties;

abstract class AbstractServerIntegrationTest {
Server broker;
IConfig config;

@TempDir
Path tempFolder;
private String dbPath;

Client lowLevelClient;

abstract String clientName();

protected void startServer(String dbPath) throws IOException {
broker = new Server();
final Properties configProps = IntegrationUtils.prepareTestProperties(dbPath);
config = new MemoryConfig(configProps);
broker.startServer(config);
}

@BeforeAll
public static void beforeTests() {
Awaitility.setDefaultTimeout(Durations.ONE_SECOND);
}

@BeforeEach
public void setUp() throws Exception {
dbPath = IntegrationUtils.tempH2Path(tempFolder);
startServer(dbPath);

lowLevelClient = new Client("localhost").clientId(clientName());
}

@AfterEach
public void tearDown() throws Exception {
stopServer();
}

private void stopServer() {
broker.stopServer();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.moquette.integration.mqtt5;

import io.moquette.BrokerConstants;
import io.moquette.testclient.Client;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;

class ConnectAckTest extends AbstractServerIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(ConnectAckTest.class);
private MqttConnAckMessage connAck;

@Override
String clientName() {
return "client";
}

@BeforeEach
public void setUp() throws Exception {
super.setUp();

connAck = lowLevelClient.connectV5();
assertEquals(MqttConnectReturnCode.CONNECTION_ACCEPTED, connAck.variableHeader().connectReturnCode(), "Client connected");
}

private <T> void verifyProperty(MqttPropertyType propertyType, MqttProperties props, T expectedValue, String comment) {
final MqttProperties.MqttProperty<Integer> property = props.getProperty(propertyType.value());
assertEquals(expectedValue, property.value(), comment);
}
private void verifyNotSet(MqttPropertyType propertyType, MqttProperties props, String message) {
assertNull(props.getProperty(propertyType.value()), message);
}

@Test
public void testAckResponseProperties() {
final MqttProperties ackProps = connAck.variableHeader().properties();
verifyProperty(MqttPropertyType.SESSION_EXPIRY_INTERVAL, ackProps, BrokerConstants.INFINITE_SESSION_EXPIRY, "Session expiry is infinite");
verifyProperty(MqttPropertyType.RECEIVE_MAXIMUM, ackProps, INFLIGHT_WINDOW_SIZE, "Receive maximum property must equals flight window size");
verifyNotSet(MqttPropertyType.MAXIMUM_QOS, ackProps, "Maximum QoS is not set => QoS 2 ready");
verifyProperty(MqttPropertyType.RETAIN_AVAILABLE, ackProps, 1, "Retain feature is available");
verifyNotSet(MqttPropertyType.MAXIMUM_PACKET_SIZE, ackProps, "Maximum packet size is the one defined by specs");
verifyProperty(MqttPropertyType.TOPIC_ALIAS_MAXIMUM, ackProps, 0, "No topic alias available");
verifyProperty(MqttPropertyType.WILDCARD_SUBSCRIPTION_AVAILABLE, ackProps, 1, "Wildcard subscription feature is available");
verifyProperty(MqttPropertyType.SUBSCRIPTION_IDENTIFIER_AVAILABLE, ackProps, 0, "Subscription feature is NOT available");
verifyProperty(MqttPropertyType.SHARED_SUBSCRIPTION_AVAILABLE, ackProps, 0, "Shared subscription feature is NOT available");
verifyNotSet(MqttPropertyType.AUTHENTICATION_METHOD, ackProps, "No auth method available");
verifyNotSet(MqttPropertyType.AUTHENTICATION_DATA, ackProps, "No auth data available");
}

@Test
public void testAssignedClientIdentifier() {
Client unnamedClient = new Client("localhost").clientId("");
connAck = unnamedClient.connectV5();
assertEquals(MqttConnectReturnCode.CONNECTION_ACCEPTED, connAck.variableHeader().connectReturnCode(), "Client connected");
final MqttProperties ackProps = connAck.variableHeader().properties();
final MqttProperties.MqttProperty<String> property = ackProps.getProperty(MqttPropertyType.ASSIGNED_CLIENT_IDENTIFIER.value());
final int clientServerGeneratedSize = 32;
assertEquals(clientServerGeneratedSize, property.value().length(), "Server assigned client ID");

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,77 +5,30 @@
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck;
import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import io.moquette.broker.Server;
import io.moquette.broker.config.IConfig;
import io.moquette.broker.config.MemoryConfig;
import io.moquette.integration.IntegrationUtils;
import io.moquette.testclient.Client;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import org.awaitility.Awaitility;
import org.awaitility.Durations;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

public class ConnectTest {
class ConnectTest extends AbstractServerIntegrationTest {
private static final Logger LOG = LoggerFactory.getLogger(ConnectTest.class);

Server broker;
IConfig config;

@TempDir
Path tempFolder;
private String dbPath;
private Client lowLevelClient;

protected void startServer(String dbPath) throws IOException {
broker = new Server();
final Properties configProps = IntegrationUtils.prepareTestProperties(dbPath);
config = new MemoryConfig(configProps);
broker.startServer(config);
}

@BeforeAll
public static void beforeTests() {
Awaitility.setDefaultTimeout(Durations.ONE_SECOND);
}

@BeforeEach
public void setUp() throws Exception {
dbPath = IntegrationUtils.tempH2Path(tempFolder);
startServer(dbPath);

lowLevelClient = new Client("localhost").clientId("subscriber");
}

@AfterEach
public void tearDown() throws Exception {
stopServer();
}

private void stopServer() {
broker.stopServer();
@Override
String clientName() {
return "subscriber";
}

@Test
Expand All @@ -102,7 +55,7 @@ public void sendConnectOnDisconnectedConnection() {
lowLevelClient.connectV5();
fail("Connect on Disconnected TCP socket can't happen");
} catch (RuntimeException rex) {
assertEquals("Cannot receive ConnAck in 200 ms", rex.getMessage());
assertEquals("Cannot receive ConnAck in 2 s", rex.getMessage());
}
}

Expand Down
Loading

0 comments on commit e8f43b9

Please sign in to comment.