From e8f43b995e1917dcffc0dca4e9a47e066a1e8ef1 Mon Sep 17 00:00:00 2001 From: Andrea Selva Date: Sun, 29 Jan 2023 11:31:19 +0100 Subject: [PATCH] Adds CONNACK properties, in particular return which capabilities are actually enabled (#709) * Added CONNACK capabilities * Extracted common shared code in abstract superclass * Fixed empty clientId management for MQTT5 * Added checks for majority of CONNACK properties, in particular assignedClientId * Introduce isProtocolVersion, and redefine isNotProtocolVersion --- .../java/io/moquette/BrokerConstants.java | 1 + .../io/moquette/broker/MQTTConnection.java | 77 +++++++++++++++---- .../mqtt5/AbstractServerIntegrationTest.java | 59 ++++++++++++++ .../integration/mqtt5/ConnectAckTest.java | 71 +++++++++++++++++ .../integration/mqtt5/ConnectTest.java | 57 ++------------ .../java/io/moquette/testclient/Client.java | 9 ++- 6 files changed, 202 insertions(+), 72 deletions(-) create mode 100644 broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java create mode 100644 broker/src/test/java/io/moquette/integration/mqtt5/ConnectAckTest.java diff --git a/broker/src/main/java/io/moquette/BrokerConstants.java b/broker/src/main/java/io/moquette/BrokerConstants.java index 7cb58627c..526e480a0 100644 --- a/broker/src/main/java/io/moquette/BrokerConstants.java +++ b/broker/src/main/java/io/moquette/BrokerConstants.java @@ -96,6 +96,7 @@ public final class BrokerConstants { public static final int FLIGHT_BEFORE_RESEND_MS = 5_000; public static final int INFLIGHT_WINDOW_SIZE = 10; + public static final int INFINITE_SESSION_EXPIRY = 0xFFFFFFFF; private BrokerConstants() { } diff --git a/broker/src/main/java/io/moquette/broker/MQTTConnection.java b/broker/src/main/java/io/moquette/broker/MQTTConnection.java index ae66f3f8e..1f73702f9 100644 --- a/broker/src/main/java/io/moquette/broker/MQTTConnection.java +++ b/broker/src/main/java/io/moquette/broker/MQTTConnection.java @@ -15,6 +15,7 @@ */ package io.moquette.broker; +import io.moquette.BrokerConstants; import io.moquette.broker.subscriptions.Topic; import io.moquette.broker.security.IAuthenticator; import io.netty.buffer.ByteBuf; @@ -29,16 +30,23 @@ import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; -import java.util.*; +import java.util.List; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE; import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE; import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE; -import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE; +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION; import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from; -import static io.netty.handler.codec.mqtt.MqttQoS.*; +import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE; +import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; final class MQTTConnection { @@ -151,22 +159,28 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) { return PostOffice.RouteResult.failed(clientId); } final boolean cleanSession = msg.variableHeader().isCleanSession(); + final boolean serverGeneratedClientId; if (clientId == null || clientId.length() == 0) { - if (!brokerConfig.isAllowZeroByteClientId()) { - LOG.info("Broker doesn't permit MQTT empty client ID. Username: {}", username); - abortConnection(CONNECTION_REFUSED_IDENTIFIER_REJECTED); - return PostOffice.RouteResult.failed(clientId); - } + if (isNotProtocolVersion(msg, MqttVersion.MQTT_5)) { + if (!brokerConfig.isAllowZeroByteClientId()) { + LOG.info("Broker doesn't permit MQTT empty client ID. Username: {}", username); + abortConnection(CONNECTION_REFUSED_IDENTIFIER_REJECTED); + return PostOffice.RouteResult.failed(clientId); + } - if (!cleanSession) { - LOG.info("MQTT client ID cannot be empty for persistent session. Username: {}", username); - abortConnection(CONNECTION_REFUSED_IDENTIFIER_REJECTED); - return PostOffice.RouteResult.failed(clientId); + if (!cleanSession) { + LOG.info("MQTT client ID cannot be empty for persistent session. Username: {}", username); + abortConnection(CONNECTION_REFUSED_IDENTIFIER_REJECTED); + return PostOffice.RouteResult.failed(clientId); + } } // Generating client id. clientId = UUID.randomUUID().toString().replace("-", ""); + serverGeneratedClientId = true; LOG.debug("Client has connected with integration generated id: {}, username: {}", clientId, username); + } else { + serverGeneratedClientId = false; } if (!login(msg, clientId)) { @@ -177,7 +191,7 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) { final String sessionId = clientId; return postOffice.routeCommand(clientId, "CONN", () -> { - executeConnect(msg, sessionId); + executeConnect(msg, sessionId, serverGeneratedClientId); return null; }); } @@ -185,7 +199,7 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) { /** * Invoked by the Session's event loop. * */ - private void executeConnect(MqttConnectMessage msg, String clientId) { + private void executeConnect(MqttConnectMessage msg, String clientId, boolean serverGeneratedClientId) { final SessionRegistry.SessionCreationResult result; try { LOG.trace("Binding MQTTConnection to session"); @@ -200,11 +214,18 @@ private void executeConnect(MqttConnectMessage msg, String clientId) { NettyUtils.clientID(channel, clientId); final boolean msgCleanSessionFlag = msg.variableHeader().isCleanSession(); + // [MQTT-3.2.2-2, MQTT-3.2.2-3, MQTT-3.2.2-6] boolean isSessionAlreadyPresent = !msgCleanSessionFlag && result.alreadyStored; final String clientIdUsed = clientId; - final MqttConnAckMessage ackMessage = MqttMessageBuilders.connAck() + final MqttMessageBuilders.ConnAckBuilder connAckBuilder = MqttMessageBuilders.connAck() .returnCode(CONNECTION_ACCEPTED) - .sessionPresent(isSessionAlreadyPresent).build(); + .sessionPresent(isSessionAlreadyPresent); + if (isProtocolVersion(msg, MqttVersion.MQTT_5)) { + // set properties for MQTT 5 + final MqttProperties ackProperties = prepareConnAckProperties(serverGeneratedClientId, clientId); + connAckBuilder.properties(ackProperties); + } + final MqttConnAckMessage ackMessage = connAckBuilder.build(); channel.writeAndFlush(ackMessage).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { @@ -243,6 +264,24 @@ public void operationComplete(ChannelFuture future) throws Exception { }); } + private MqttProperties prepareConnAckProperties(boolean serverGeneratedClientId, String clientId) { + final MqttMessageBuilders.ConnAckPropertiesBuilder builder = new MqttMessageBuilders.ConnAckPropertiesBuilder(); + // default maximumQos is 2, [MQTT-3.2.2-10] + // unlimited maximumPacketSize inside however the protocol limit + if (serverGeneratedClientId) { + builder.assignedClientId(clientId); + } + + return builder + .sessionExpiryInterval(BrokerConstants.INFINITE_SESSION_EXPIRY) + .receiveMaximum(INFLIGHT_WINDOW_SIZE) + .retainAvailable(true) + .wildcardSubscriptionAvailable(true) + .subscriptionIdentifiersAvailable(false) + .sharedSubscriptionAvailable(false) + .build(); + } + private void setupInflightResender(Channel channel) { channel.pipeline() .addFirst("inflightResender", new InflightResender(5_000, TimeUnit.MILLISECONDS)); @@ -268,7 +307,11 @@ private void setIdleTime(ChannelPipeline pipeline, int idleTime) { } private boolean isNotProtocolVersion(MqttConnectMessage msg, MqttVersion version) { - return msg.variableHeader().version() != version.protocolLevel(); + return !isProtocolVersion(msg, version); + } + + private boolean isProtocolVersion(MqttConnectMessage msg, MqttVersion version) { + return msg.variableHeader().version() == version.protocolLevel(); } private void abortConnection(MqttConnectReturnCode returnCode) { diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java new file mode 100644 index 000000000..413c851e3 --- /dev/null +++ b/broker/src/test/java/io/moquette/integration/mqtt5/AbstractServerIntegrationTest.java @@ -0,0 +1,59 @@ +package io.moquette.integration.mqtt5; + +import io.moquette.broker.Server; +import io.moquette.broker.config.IConfig; +import io.moquette.broker.config.MemoryConfig; +import io.moquette.integration.IntegrationUtils; +import io.moquette.testclient.Client; +import org.awaitility.Awaitility; +import org.awaitility.Durations; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Properties; + +abstract class AbstractServerIntegrationTest { + Server broker; + IConfig config; + + @TempDir + Path tempFolder; + private String dbPath; + + Client lowLevelClient; + + abstract String clientName(); + + protected void startServer(String dbPath) throws IOException { + broker = new Server(); + final Properties configProps = IntegrationUtils.prepareTestProperties(dbPath); + config = new MemoryConfig(configProps); + broker.startServer(config); + } + + @BeforeAll + public static void beforeTests() { + Awaitility.setDefaultTimeout(Durations.ONE_SECOND); + } + + @BeforeEach + public void setUp() throws Exception { + dbPath = IntegrationUtils.tempH2Path(tempFolder); + startServer(dbPath); + + lowLevelClient = new Client("localhost").clientId(clientName()); + } + + @AfterEach + public void tearDown() throws Exception { + stopServer(); + } + + private void stopServer() { + broker.stopServer(); + } +} diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectAckTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectAckTest.java new file mode 100644 index 000000000..cec940548 --- /dev/null +++ b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectAckTest.java @@ -0,0 +1,71 @@ +package io.moquette.integration.mqtt5; + +import io.moquette.BrokerConstants; +import io.moquette.testclient.Client; +import io.netty.handler.codec.mqtt.MqttConnAckMessage; +import io.netty.handler.codec.mqtt.MqttConnectReturnCode; +import io.netty.handler.codec.mqtt.MqttProperties; +import io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +class ConnectAckTest extends AbstractServerIntegrationTest { + private static final Logger LOG = LoggerFactory.getLogger(ConnectAckTest.class); + private MqttConnAckMessage connAck; + + @Override + String clientName() { + return "client"; + } + + @BeforeEach + public void setUp() throws Exception { + super.setUp(); + + connAck = lowLevelClient.connectV5(); + assertEquals(MqttConnectReturnCode.CONNECTION_ACCEPTED, connAck.variableHeader().connectReturnCode(), "Client connected"); + } + + private void verifyProperty(MqttPropertyType propertyType, MqttProperties props, T expectedValue, String comment) { + final MqttProperties.MqttProperty property = props.getProperty(propertyType.value()); + assertEquals(expectedValue, property.value(), comment); + } + private void verifyNotSet(MqttPropertyType propertyType, MqttProperties props, String message) { + assertNull(props.getProperty(propertyType.value()), message); + } + + @Test + public void testAckResponseProperties() { + final MqttProperties ackProps = connAck.variableHeader().properties(); + verifyProperty(MqttPropertyType.SESSION_EXPIRY_INTERVAL, ackProps, BrokerConstants.INFINITE_SESSION_EXPIRY, "Session expiry is infinite"); + verifyProperty(MqttPropertyType.RECEIVE_MAXIMUM, ackProps, INFLIGHT_WINDOW_SIZE, "Receive maximum property must equals flight window size"); + verifyNotSet(MqttPropertyType.MAXIMUM_QOS, ackProps, "Maximum QoS is not set => QoS 2 ready"); + verifyProperty(MqttPropertyType.RETAIN_AVAILABLE, ackProps, 1, "Retain feature is available"); + verifyNotSet(MqttPropertyType.MAXIMUM_PACKET_SIZE, ackProps, "Maximum packet size is the one defined by specs"); + verifyProperty(MqttPropertyType.TOPIC_ALIAS_MAXIMUM, ackProps, 0, "No topic alias available"); + verifyProperty(MqttPropertyType.WILDCARD_SUBSCRIPTION_AVAILABLE, ackProps, 1, "Wildcard subscription feature is available"); + verifyProperty(MqttPropertyType.SUBSCRIPTION_IDENTIFIER_AVAILABLE, ackProps, 0, "Subscription feature is NOT available"); + verifyProperty(MqttPropertyType.SHARED_SUBSCRIPTION_AVAILABLE, ackProps, 0, "Shared subscription feature is NOT available"); + verifyNotSet(MqttPropertyType.AUTHENTICATION_METHOD, ackProps, "No auth method available"); + verifyNotSet(MqttPropertyType.AUTHENTICATION_DATA, ackProps, "No auth data available"); + } + + @Test + public void testAssignedClientIdentifier() { + Client unnamedClient = new Client("localhost").clientId(""); + connAck = unnamedClient.connectV5(); + assertEquals(MqttConnectReturnCode.CONNECTION_ACCEPTED, connAck.variableHeader().connectReturnCode(), "Client connected"); + final MqttProperties ackProps = connAck.variableHeader().properties(); + final MqttProperties.MqttProperty property = ackProps.getProperty(MqttPropertyType.ASSIGNED_CLIENT_IDENTIFIER.value()); + final int clientServerGeneratedSize = 32; + assertEquals(clientServerGeneratedSize, property.value().length(), "Server assigned client ID"); + + } +} diff --git a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java index ea6bd4d15..c2f6be487 100644 --- a/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java +++ b/broker/src/test/java/io/moquette/integration/mqtt5/ConnectTest.java @@ -5,77 +5,30 @@ import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAck; import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; -import io.moquette.broker.Server; -import io.moquette.broker.config.IConfig; -import io.moquette.broker.config.MemoryConfig; -import io.moquette.integration.IntegrationUtils; import io.moquette.testclient.Client; import io.netty.handler.codec.mqtt.MqttConnAckMessage; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; -import io.netty.handler.codec.mqtt.MqttMessage; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttQoS; -import io.netty.handler.codec.mqtt.MqttSubAckMessage; import org.awaitility.Awaitility; -import org.awaitility.Durations; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.nio.file.Path; -import java.util.Optional; -import java.util.Properties; import java.util.concurrent.TimeUnit; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; -public class ConnectTest { +class ConnectTest extends AbstractServerIntegrationTest { private static final Logger LOG = LoggerFactory.getLogger(ConnectTest.class); - Server broker; - IConfig config; - - @TempDir - Path tempFolder; - private String dbPath; - private Client lowLevelClient; - - protected void startServer(String dbPath) throws IOException { - broker = new Server(); - final Properties configProps = IntegrationUtils.prepareTestProperties(dbPath); - config = new MemoryConfig(configProps); - broker.startServer(config); - } - - @BeforeAll - public static void beforeTests() { - Awaitility.setDefaultTimeout(Durations.ONE_SECOND); - } - - @BeforeEach - public void setUp() throws Exception { - dbPath = IntegrationUtils.tempH2Path(tempFolder); - startServer(dbPath); - - lowLevelClient = new Client("localhost").clientId("subscriber"); - } - - @AfterEach - public void tearDown() throws Exception { - stopServer(); - } - - private void stopServer() { - broker.stopServer(); + @Override + String clientName() { + return "subscriber"; } @Test @@ -102,7 +55,7 @@ public void sendConnectOnDisconnectedConnection() { lowLevelClient.connectV5(); fail("Connect on Disconnected TCP socket can't happen"); } catch (RuntimeException rex) { - assertEquals("Cannot receive ConnAck in 200 ms", rex.getMessage()); + assertEquals("Cannot receive ConnAck in 2 s", rex.getMessage()); } } diff --git a/broker/src/test/java/io/moquette/testclient/Client.java b/broker/src/test/java/io/moquette/testclient/Client.java index 95fbf906c..e98174c40 100644 --- a/broker/src/test/java/io/moquette/testclient/Client.java +++ b/broker/src/test/java/io/moquette/testclient/Client.java @@ -135,8 +135,11 @@ public void connect() { } public MqttConnAckMessage connectV5() { - MqttConnectMessage connectMessage = MqttMessageBuilders.connect().protocolVersion(MqttVersion.MQTT_5) - .clientId(clientId) + final MqttMessageBuilders.ConnectBuilder builder = MqttMessageBuilders.connect().protocolVersion(MqttVersion.MQTT_5); + if (clientId != null) { + builder.clientId(clientId); + } + MqttConnectMessage connectMessage = builder .keepAlive(2) // secs .willFlag(false) .willQoS(MqttQoS.AT_MOST_ONCE) @@ -166,7 +169,7 @@ private MqttConnAckMessage doConnect(MqttConnectMessage connectMessage) { } if (waitElapsed) { - throw new RuntimeException("Cannot receive ConnAck in 200 ms"); + throw new RuntimeException("Cannot receive ConnAck in 2 s"); } final MqttMessage connAckMessage = this.receivedMsg.get();