diff --git a/src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java b/src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java index 63d28824..96ad5094 100644 --- a/src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java +++ b/src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java @@ -34,7 +34,7 @@ public abstract class AbstractMqttClientReleaseHandler clientIdRegistry.register(newClientId) .map(ifTrue(newClientId, client::setClientId))); diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/in/ConnectInPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/in/ConnectInPacket.java index 26e0db87..b0494d20 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/in/ConnectInPacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/in/ConnectInPacket.java @@ -1,10 +1,10 @@ package com.ss.mqtt.broker.network.packet.in; import com.ss.mqtt.broker.exception.ConnectionRejectException; -import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode; import com.ss.mqtt.broker.model.MqttPropertyConstants; import com.ss.mqtt.broker.model.MqttVersion; import com.ss.mqtt.broker.model.PacketProperty; +import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode; import com.ss.mqtt.broker.network.MqttConnection; import com.ss.mqtt.broker.network.packet.PacketType; import com.ss.mqtt.broker.util.DebugUtils; @@ -281,6 +281,12 @@ protected void readVariableHeader(@NotNull MqttConnection connection, @NotNull B hasUserName = NumberUtils.isSetBit(flags, 7); hasPassword = NumberUtils.isSetBit(flags, 6); + + // for mqtt < 5 we cannot have password without user + if (mqttVersion.ordinal() < MqttVersion.MQTT_5.ordinal() && !hasUserName && hasPassword) { + throw new ConnectionRejectException(ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD); + } + willFlag = NumberUtils.isSetBit(flags, 2); keepAlive = readUnsignedShort(buffer); } diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/in/MqttReadablePacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/in/MqttReadablePacket.java index cb7d3530..3d97d67d 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/in/MqttReadablePacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/in/MqttReadablePacket.java @@ -29,6 +29,10 @@ public abstract class MqttReadablePacket extends AbstractReadablePacket { + static { + DebugUtils.registerIncludedFields("userProperties"); + } + @Getter @RequiredArgsConstructor private static class Utf8Decoder { diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy index 2e27b36b..bf836053 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy @@ -1,6 +1,10 @@ package com.ss.mqtt.broker.test.integration import com.hivemq.client.mqtt.datatypes.MqttQos +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient +import com.hivemq.client.mqtt.mqtt3.message.Mqtt3MessageType +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish +import com.hivemq.client.mqtt.mqtt3.message.subscribe.suback.Mqtt3SubAckReturnCode import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient import com.hivemq.client.mqtt.mqtt5.message.Mqtt5MessageType import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator @@ -11,19 +15,52 @@ import java.util.concurrent.atomic.AtomicReference class ConnectSubscribePublishTest extends IntegrationSpecification { - def "publisher should publish message QoS 0"() { + def "publisher should publish message QoS 0 using mqtt 3.1.1"() { given: - def received = new AtomicReference() - def subscriber = buildClient() + def received = new AtomicReference() + def subscriber = buildMqtt311Client() def subscriberId = subscriber.getConfig().clientIdentifier.get().toString() - def publisher = buildClient() + def publisher = buildMqtt311Client() when: subscriber.connect().join() publisher.connect().join() + + def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received) + def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE) + + Thread.sleep(100) + then: + noExceptionThrown() + + subscribeResult != null + subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_0) + subscribeResult.type == Mqtt3MessageType.SUBACK + + publishResult != null + publishResult.qos == MqttQos.AT_MOST_ONCE + publishResult.type == Mqtt3MessageType.PUBLISH + + received.get() != null + received.get().qos == MqttQos.AT_MOST_ONCE + received.get().type == Mqtt3MessageType.PUBLISH + cleanup: + subscriber.disconnect().join() + publisher.disconnect().join() + } + def "publisher should publish message QoS 0 using mqtt 5"() { + given: + def received = new AtomicReference() + def subscriber = buildMqtt5Client() + def subscriberId = subscriber.getConfig().clientIdentifier.get().toString() + def publisher = buildMqtt5Client() + when: + subscriber.connect().join() + publisher.connect().join() + def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received) def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE) - + Thread.sleep(100) then: noExceptionThrown() @@ -31,33 +68,66 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { subscribeResult != null subscribeResult.reasonCodes.contains(Mqtt5SubAckReasonCode.GRANTED_QOS_0) subscribeResult.type == Mqtt5MessageType.SUBACK - + publishResult != null publishResult.publish.qos == MqttQos.AT_MOST_ONCE publishResult.publish.type == Mqtt5MessageType.PUBLISH - + received.get() != null received.get().qos == MqttQos.AT_MOST_ONCE received.get().type == Mqtt5MessageType.PUBLISH cleanup: - subscriber.disconnect() - publisher.disconnect() + subscriber.disconnect().join() + publisher.disconnect().join() } - def "publisher should publish message QoS 1"() { + def "publisher should publish message QoS 1 using mqtt 3.1.1"() { given: - def received = new AtomicReference() - def subscriber = buildClient() + def received = new AtomicReference() + def subscriber = buildMqtt311Client() def subscriberId = subscriber.getConfig().clientIdentifier.get().toString() - def publisher = buildClient() + def publisher = buildMqtt311Client() when: - subscriber.connect().join() publisher.connect().join() + + def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received) + def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE) + + Thread.sleep(100) + then: + noExceptionThrown() + + subscribeResult != null + subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_1) + subscribeResult.type == Mqtt3MessageType.SUBACK + + publishResult != null + publishResult.qos == MqttQos.AT_LEAST_ONCE + publishResult.type == Mqtt3MessageType.PUBLISH + + received.get() != null + received.get().qos == MqttQos.AT_LEAST_ONCE + received.get().type == Mqtt3MessageType.PUBLISH + cleanup: + subscriber.disconnect().join() + publisher.disconnect().join() + } + def "publisher should publish message QoS 1 using mqtt 5"() { + given: + def received = new AtomicReference() + def subscriber = buildMqtt5Client() + def subscriberId = subscriber.getConfig().clientIdentifier.get().toString() + def publisher = buildMqtt5Client() + when: + + subscriber.connect().join() + publisher.connect().join() + def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received) def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE) - + Thread.sleep(100) then: noExceptionThrown() @@ -69,7 +139,7 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { publishResult != null publishResult.publish.qos == MqttQos.AT_LEAST_ONCE publishResult.publish.type == Mqtt5MessageType.PUBLISH - + received.get() != null received.get().qos == MqttQos.AT_LEAST_ONCE received.get().type == Mqtt5MessageType.PUBLISH @@ -78,17 +148,50 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { publisher.disconnect().join() } - def "publisher should publish message QoS 2"() { + def "publisher should publish message QoS 2 using mqtt 3.1.1"() { + given: + def received = new AtomicReference() + def subscriber = buildMqtt311Client() + def subscriberId = subscriber.getConfig().clientIdentifier.get().toString() + def publisher = buildMqtt311Client() + when: + subscriber.connect().join() + publisher.connect().join() + + def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.EXACTLY_ONCE, received) + def publishResult = publish(publisher, subscriberId, MqttQos.EXACTLY_ONCE) + + Thread.sleep(100) + then: + noExceptionThrown() + + subscribeResult != null + subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_2) + subscribeResult.type == Mqtt3MessageType.SUBACK + + publishResult != null + publishResult.qos == MqttQos.EXACTLY_ONCE + publishResult.type == Mqtt3MessageType.PUBLISH + + received.get() != null + received.get().qos == MqttQos.EXACTLY_ONCE + received.get().type == Mqtt3MessageType.PUBLISH + cleanup: + subscriber.disconnect().join() + publisher.disconnect().join() + } + + def "publisher should publish message QoS 2 using mqtt 5"() { given: def received = new AtomicReference() - def subscriber = buildClient() + def subscriber = buildMqtt5Client() def subscriberId = subscriber.getConfig().clientIdentifier.get().toString() - def publisher = buildClient() + def publisher = buildMqtt5Client() when: subscriber.connect().join() publisher.connect().join() - + def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.EXACTLY_ONCE, received) def publishResult = publish(publisher, subscriberId, MqttQos.EXACTLY_ONCE) @@ -108,8 +211,8 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { received.get().qos == MqttQos.EXACTLY_ONCE received.get().type == Mqtt5MessageType.PUBLISH cleanup: - subscriber.disconnect() - publisher.disconnect() + subscriber.disconnect().join() + publisher.disconnect().join() } def publish(Mqtt5AsyncClient publisher, String subscriberId, MqttQos qos) { @@ -135,4 +238,27 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { .send() .join() } + + def publish(Mqtt3AsyncClient publisher, String subscriberId, MqttQos qos) { + return publisher.publishWith() + .topic("test/$subscriberId") + .qos(qos) + .payload(publishPayload) + .send() + .join() + } + + def subscribe( + Mqtt3AsyncClient subscriber, + String subscriberId, + MqttQos qos, + AtomicReference received + ) { + return subscriber.subscribeWith() + .topicFilter("test/$subscriberId") + .qos(qos) + .callback({ publish -> received.set(publish) }) + .send() + .join() + } } diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectionTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectionTest.groovy index 4bd8bc1d..f46cddc3 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectionTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectionTest.groovy @@ -1,21 +1,36 @@ package com.ss.mqtt.broker.test.integration -import com.hivemq.client.mqtt.MqttClient +import com.hivemq.client.mqtt.mqtt3.exceptions.Mqtt3ConnAckException +import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAckReturnCode import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5ConnAckException import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode import com.ss.mqtt.broker.model.MqttPropertyConstants -import org.springframework.beans.factory.annotation.Autowired +import com.ss.mqtt.broker.model.QoS +import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode +import com.ss.mqtt.broker.network.packet.in.ConnectAckInPacket +import com.ss.mqtt.broker.network.packet.out.Connect311OutPacket +import com.ss.rlib.common.util.ArrayUtils +import java.nio.charset.StandardCharsets import java.util.concurrent.CompletionException class ConnectionTest extends IntegrationSpecification { - @Autowired - InetSocketAddress deviceNetworkAddress + def "client should connect to broker without user and pass using mqtt 3.1.1"() { + given: + def client = buildMqtt311Client() + when: + def result = client.connect().join() + then: + result.returnCode == Mqtt3ConnAckReturnCode.SUCCESS + !result.sessionPresent + cleanup: + client.disconnect().join() + } - def "subscriber should connect to broker without user and pass"() { + def "client should connect to broker without user and pass using mqtt 5"() { given: - def client = buildClient() + def client = buildMqtt5Client() when: def result = client.connect().join() then: @@ -32,9 +47,21 @@ class ConnectionTest extends IntegrationSpecification { client.disconnect().join() } - def "subscriber should connect to broker with user and pass"() { + def "client should connect to broker with user and pass using mqtt 3.1.1"() { given: - def client = buildClient('1') + def client = buildMqtt311Client() + when: + def result = connectWith(client, 'user1', 'password') + then: + result.returnCode == Mqtt3ConnAckReturnCode.SUCCESS + !result.sessionPresent + cleanup: + client.disconnect().join() + } + + def "client should connect to broker with user and pass using mqtt 5"() { + given: + def client = buildMqtt5Client() when: def result = connectWith(client, 'user1', 'password') then: @@ -51,9 +78,20 @@ class ConnectionTest extends IntegrationSpecification { client.disconnect().join() } - def "subscriber should connect to broker without providing a client id"() { + def "client should not connect to broker without providing a client id using mqtt 3.1.1"() { + given: + def client = buildMqtt311Client("") + when: + client.connect().join() + then: + def ex = thrown CompletionException + def cause = ex.cause as Mqtt3ConnAckException + cause.mqttMessage.returnCode == Mqtt3ConnAckReturnCode.IDENTIFIER_REJECTED + } + + def "client should connect to broker without providing a client id using mqtt 5"() { given: - def client = buildClient("") + def client = buildMqtt5Client("") when: def result = client.connect().join() then: @@ -64,15 +102,22 @@ class ConnectionTest extends IntegrationSpecification { client.disconnect().join() } - def "subscriber should not connect to broker with invalid client id"(String clientId) { + def "client should not connect to broker with invalid client id using mqtt 3.1.1"(String clientId) { + given: + def client = buildMqtt311Client(clientId) + when: + client.connect().join() + then: + def ex = thrown CompletionException + def cause = ex.cause as Mqtt3ConnAckException + cause.mqttMessage.returnCode == Mqtt3ConnAckReturnCode.IDENTIFIER_REJECTED + where: + clientId << ["!@#!@*()^&"] + } + + def "client should not connect to broker with invalid client id using mqtt 5"(String clientId) { given: - def client = MqttClient.builder() - .identifier(clientId) - .serverHost(deviceNetworkAddress.getHostName()) - .serverPort(deviceNetworkAddress.getPort()) - .useMqttVersion5() - .build() - .toAsync() + def client = buildMqtt5Client(clientId) when: client.connect().join() then: @@ -83,11 +128,47 @@ class ConnectionTest extends IntegrationSpecification { clientId << ["!@#!@*()^&"] } - def "subscriber should not connect to broker with wrong pass"() { + def "client should not connect to broker with wrong pass using mqtt 3.1.1"() { + given: + def client = buildMqtt311Client() + when: + connectWith(client, "user", "wrongPassword") + then: + def ex = thrown CompletionException + def cause = ex.cause as Mqtt3ConnAckException + cause.mqttMessage.returnCode == Mqtt3ConnAckReturnCode.BAD_USER_NAME_OR_PASSWORD + } + + def "client should not connect to broker without username and with pass using mqtt 3.1.1"() { + given: + def client = buildMqtt311MockClient() + def clientId = generateClientId() + when: + + client.connect() + client.send(new Connect311OutPacket( + "", + "", + clientId, + "wrongPassword".getBytes(StandardCharsets.UTF_8), + ArrayUtils.EMPTY_BYTE_ARRAY, + QoS.AT_MOST_ONCE, + keepAlive, + false, + false + )) + + def connectAck = client.readNext() as ConnectAckInPacket + + then: + connectAck.reasonCode == ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD + } + + def "client should not connect to broker with wrong pass using mqtt 5"() { given: - def client = buildClient() + def client = buildMqtt5Client() when: - connectWith(client, 'user', 'wrongPassword') + connectWith(client, "user", "wrongPassword") then: def ex = thrown CompletionException def cause = ex.cause as Mqtt5ConnAckException diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/IntegrationSpecification.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/IntegrationSpecification.groovy index 7c36e7e4..07b50195 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/IntegrationSpecification.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/IntegrationSpecification.groovy @@ -1,6 +1,7 @@ package com.ss.mqtt.broker.test.integration import com.hivemq.client.mqtt.MqttClient +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient import com.ss.mqtt.broker.config.MqttConnectionConfig import com.ss.mqtt.broker.model.MqttPropertyConstants @@ -9,7 +10,6 @@ import com.ss.mqtt.broker.network.MqttConnection import com.ss.mqtt.broker.test.integration.config.MqttBrokerTestConfig import com.ss.mqtt.broker.test.mock.MqttMockClient import org.springframework.beans.factory.annotation.Autowired -import org.springframework.context.annotation.Bean import org.springframework.test.context.ContextConfiguration import spock.lang.Specification @@ -32,12 +32,26 @@ class IntegrationSpecification extends Specification { @Autowired MqttConnectionConfig deviceConnectionConfig - - def buildClient() { - return buildClient(generateClientId()) + + def buildMqtt311Client() { + return buildMqtt311Client(generateClientId()) + } + + def buildMqtt5Client() { + return buildMqtt5Client(generateClientId()) } - def buildClient(String clientId) { + def buildMqtt311Client(String clientId) { + return MqttClient.builder() + .identifier(clientId) + .serverHost(deviceNetworkAddress.getHostName()) + .serverPort(deviceNetworkAddress.getPort()) + .useMqttVersion3() + .build() + .toAsync() + } + + def buildMqtt5Client(String clientId) { return MqttClient.builder() .identifier(clientId) .serverHost(deviceNetworkAddress.getHostName()) @@ -55,6 +69,16 @@ class IntegrationSpecification extends Specification { return prefix + "_" + idGenerator.incrementAndGet() } + def connectWith(Mqtt3AsyncClient client, String user, String pass) { + return client.connectWith() + .simpleAuth() + .username(user) + .password(pass.getBytes(encoding)) + .applySimpleAuth() + .send() + .join() + } + def connectWith(Mqtt5AsyncClient client, String user, String pass) { return client.connectWith() .simpleAuth() diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/PublishRetryTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/PublishRetryTest.groovy index 8b906e44..83653aa7 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/PublishRetryTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/PublishRetryTest.groovy @@ -30,7 +30,7 @@ class PublishRetryTest extends IntegrationSpecification { def "mqtt 3.1.1 client should be generate session with one pending QoS 1 packet"() { given: - def publisher = buildClient() + def publisher = buildMqtt5Client() def subscriber = buildMqtt311MockClient() def subscriberId = generateClientId() when: @@ -91,7 +91,7 @@ class PublishRetryTest extends IntegrationSpecification { def "mqtt 5 client should be generate session with one pending QoS 1 packet"() { given: - def publisher = buildClient() + def publisher = buildMqtt5Client() def subscriber = buildMqtt5MockClient() def subscriberId = generateClientId() when: @@ -152,7 +152,7 @@ class PublishRetryTest extends IntegrationSpecification { def "mqtt 3.1.1 client should be generate session with one pending QoS 2 packet"() { given: - def publisher = buildClient() + def publisher = buildMqtt5Client() def subscriber = buildMqtt311MockClient() def subscriberId = generateClientId() when: @@ -234,7 +234,7 @@ class PublishRetryTest extends IntegrationSpecification { def "mqtt 5 client should be generate session with one pending QoS 2 packet"() { given: - def publisher = buildClient() + def publisher = buildMqtt5Client() def subscriber = buildMqtt5MockClient() def subscriberId = generateClientId() when: diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/ClientIdRegistryTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/ClientIdRegistryTest.groovy index 7a02232f..60e269e5 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/ClientIdRegistryTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/ClientIdRegistryTest.groovy @@ -109,7 +109,7 @@ class ClientIdRegistryTest extends IntegrationSpecification { def "subscriber should register its client id on connect and unregister on disconnect"() { given: def clientId = clientIdRegistry.generate().block() - def client = buildClient(clientId) + def client = buildMqtt5Client(clientId) when: def result = client.connect().join() then: diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/MqttSessionServiceTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/MqttSessionServiceTest.groovy index 57c9bd1d..01469c60 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/MqttSessionServiceTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/MqttSessionServiceTest.groovy @@ -21,7 +21,7 @@ class MqttSessionServiceTest extends IntegrationSpecification { def "subscriber should create and re-use mqtt session"() { given: def clientId = clientIdRegistry.generate().block() - def client = buildClient(clientId) + def client = buildMqtt5Client(clientId) when: def shouldNoSession = mqttSessionService.restore(clientId).block() def result = client.connect().join() diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/SubscribtionServiceTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/SubscribtionServiceTest.groovy index b521cd36..7ff1436a 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/SubscribtionServiceTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/SubscribtionServiceTest.groovy @@ -24,7 +24,7 @@ class SubscribtionServiceTest extends IntegrationSpecification { def "should clear/restore topic subscribers after disconnect/reconnect"() { given: def clientId = clientIdRegistry.generate().block() - def subscriber = buildClient(clientId) + def subscriber = buildMqtt5Client(clientId) when: connectAndSubscribe(subscriber, true, topicFilter) def matches = subscriptionService.topicSubscribers.matches(TopicName.from(topicFilter))