From da3918c3bc54c136a350c5f64dfa8462121df8d9 Mon Sep 17 00:00:00 2001 From: Alex Brui Date: Mon, 2 Dec 2019 07:20:53 +0300 Subject: [PATCH 1/3] [broker-26] fix connect options for connect in packet with mqtt 3.1.1 version --- .../AbstractMqttClientReleaseHandler.java | 2 +- .../packet/in/ConnectInPacketHandler.java | 16 ++- .../network/packet/in/ConnectInPacket.java | 11 +- .../network/packet/in/MqttReadablePacket.java | 4 + .../ConnectSubscribePublishTest.groovy | 12 +- .../test/integration/ConnectionTest.groovy | 123 +++++++++++++++--- .../IntegrationSpecification.groovy | 34 ++++- .../test/integration/PublishRetryTest.groovy | 8 +- .../service/ClientIdRegistryTest.groovy | 2 +- .../service/MqttSessionServiceTest.groovy | 2 +- .../service/SubscribtionServiceTest.groovy | 2 +- 11 files changed, 174 insertions(+), 42 deletions(-) diff --git a/src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java b/src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java index 63d28824..96ad5094 100644 --- a/src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java +++ b/src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java @@ -34,7 +34,7 @@ public abstract class AbstractMqttClientReleaseHandler clientIdRegistry.register(newClientId) .map(ifTrue(newClientId, client::setClientId))); diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/in/ConnectInPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/in/ConnectInPacket.java index 26e0db87..5930c932 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/in/ConnectInPacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/in/ConnectInPacket.java @@ -1,10 +1,10 @@ package com.ss.mqtt.broker.network.packet.in; import com.ss.mqtt.broker.exception.ConnectionRejectException; -import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode; import com.ss.mqtt.broker.model.MqttPropertyConstants; import com.ss.mqtt.broker.model.MqttVersion; import com.ss.mqtt.broker.model.PacketProperty; +import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode; import com.ss.mqtt.broker.network.MqttConnection; import com.ss.mqtt.broker.network.packet.PacketType; import com.ss.mqtt.broker.util.DebugUtils; @@ -281,6 +281,15 @@ protected void readVariableHeader(@NotNull MqttConnection connection, @NotNull B hasUserName = NumberUtils.isSetBit(flags, 7); hasPassword = NumberUtils.isSetBit(flags, 6); + + if (mqttVersion.ordinal() < MqttVersion.MQTT_5.ordinal()) { + + // for mqtt < 5 we cannot have password without user + if (!hasUserName && hasPassword) { + throw new ConnectionRejectException(ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD); + } + } + willFlag = NumberUtils.isSetBit(flags, 2); keepAlive = readUnsignedShort(buffer); } diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/in/MqttReadablePacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/in/MqttReadablePacket.java index cb7d3530..3d97d67d 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/in/MqttReadablePacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/in/MqttReadablePacket.java @@ -29,6 +29,10 @@ public abstract class MqttReadablePacket extends AbstractReadablePacket { + static { + DebugUtils.registerIncludedFields("userProperties"); + } + @Getter @RequiredArgsConstructor private static class Utf8Decoder { diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy index 1637f9c7..c6a73d8c 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy @@ -14,9 +14,9 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { def "publisher should publish message QoS 0"() { given: def received = new AtomicReference() - def subscriber = buildClient() + def subscriber = buildMqtt5Client() def subscriberId = subscriber.getConfig().clientIdentifier.get()toString() - def publisher = buildClient() + def publisher = buildMqtt5Client() when: subscriber.connect().join() publisher.connect().join() @@ -47,9 +47,9 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { def "publisher should publish message QoS 1"() { given: def received = new AtomicReference() - def subscriber = buildClient() + def subscriber = buildMqtt5Client() def subscriberId = subscriber.getConfig().clientIdentifier.get()toString() - def publisher = buildClient() + def publisher = buildMqtt5Client() when: subscriber.connect().join() @@ -81,9 +81,9 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { def "publisher should publish message QoS 2"() { given: def received = new AtomicReference() - def subscriber = buildClient() + def subscriber = buildMqtt5Client() def subscriberId = subscriber.getConfig().clientIdentifier.get()toString() - def publisher = buildClient() + def publisher = buildMqtt5Client() when: subscriber.connect().join() diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectionTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectionTest.groovy index 4bd8bc1d..f46cddc3 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectionTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectionTest.groovy @@ -1,21 +1,36 @@ package com.ss.mqtt.broker.test.integration -import com.hivemq.client.mqtt.MqttClient +import com.hivemq.client.mqtt.mqtt3.exceptions.Mqtt3ConnAckException +import com.hivemq.client.mqtt.mqtt3.message.connect.connack.Mqtt3ConnAckReturnCode import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5ConnAckException import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode import com.ss.mqtt.broker.model.MqttPropertyConstants -import org.springframework.beans.factory.annotation.Autowired +import com.ss.mqtt.broker.model.QoS +import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode +import com.ss.mqtt.broker.network.packet.in.ConnectAckInPacket +import com.ss.mqtt.broker.network.packet.out.Connect311OutPacket +import com.ss.rlib.common.util.ArrayUtils +import java.nio.charset.StandardCharsets import java.util.concurrent.CompletionException class ConnectionTest extends IntegrationSpecification { - @Autowired - InetSocketAddress deviceNetworkAddress + def "client should connect to broker without user and pass using mqtt 3.1.1"() { + given: + def client = buildMqtt311Client() + when: + def result = client.connect().join() + then: + result.returnCode == Mqtt3ConnAckReturnCode.SUCCESS + !result.sessionPresent + cleanup: + client.disconnect().join() + } - def "subscriber should connect to broker without user and pass"() { + def "client should connect to broker without user and pass using mqtt 5"() { given: - def client = buildClient() + def client = buildMqtt5Client() when: def result = client.connect().join() then: @@ -32,9 +47,21 @@ class ConnectionTest extends IntegrationSpecification { client.disconnect().join() } - def "subscriber should connect to broker with user and pass"() { + def "client should connect to broker with user and pass using mqtt 3.1.1"() { given: - def client = buildClient('1') + def client = buildMqtt311Client() + when: + def result = connectWith(client, 'user1', 'password') + then: + result.returnCode == Mqtt3ConnAckReturnCode.SUCCESS + !result.sessionPresent + cleanup: + client.disconnect().join() + } + + def "client should connect to broker with user and pass using mqtt 5"() { + given: + def client = buildMqtt5Client() when: def result = connectWith(client, 'user1', 'password') then: @@ -51,9 +78,20 @@ class ConnectionTest extends IntegrationSpecification { client.disconnect().join() } - def "subscriber should connect to broker without providing a client id"() { + def "client should not connect to broker without providing a client id using mqtt 3.1.1"() { + given: + def client = buildMqtt311Client("") + when: + client.connect().join() + then: + def ex = thrown CompletionException + def cause = ex.cause as Mqtt3ConnAckException + cause.mqttMessage.returnCode == Mqtt3ConnAckReturnCode.IDENTIFIER_REJECTED + } + + def "client should connect to broker without providing a client id using mqtt 5"() { given: - def client = buildClient("") + def client = buildMqtt5Client("") when: def result = client.connect().join() then: @@ -64,15 +102,22 @@ class ConnectionTest extends IntegrationSpecification { client.disconnect().join() } - def "subscriber should not connect to broker with invalid client id"(String clientId) { + def "client should not connect to broker with invalid client id using mqtt 3.1.1"(String clientId) { + given: + def client = buildMqtt311Client(clientId) + when: + client.connect().join() + then: + def ex = thrown CompletionException + def cause = ex.cause as Mqtt3ConnAckException + cause.mqttMessage.returnCode == Mqtt3ConnAckReturnCode.IDENTIFIER_REJECTED + where: + clientId << ["!@#!@*()^&"] + } + + def "client should not connect to broker with invalid client id using mqtt 5"(String clientId) { given: - def client = MqttClient.builder() - .identifier(clientId) - .serverHost(deviceNetworkAddress.getHostName()) - .serverPort(deviceNetworkAddress.getPort()) - .useMqttVersion5() - .build() - .toAsync() + def client = buildMqtt5Client(clientId) when: client.connect().join() then: @@ -83,11 +128,47 @@ class ConnectionTest extends IntegrationSpecification { clientId << ["!@#!@*()^&"] } - def "subscriber should not connect to broker with wrong pass"() { + def "client should not connect to broker with wrong pass using mqtt 3.1.1"() { + given: + def client = buildMqtt311Client() + when: + connectWith(client, "user", "wrongPassword") + then: + def ex = thrown CompletionException + def cause = ex.cause as Mqtt3ConnAckException + cause.mqttMessage.returnCode == Mqtt3ConnAckReturnCode.BAD_USER_NAME_OR_PASSWORD + } + + def "client should not connect to broker without username and with pass using mqtt 3.1.1"() { + given: + def client = buildMqtt311MockClient() + def clientId = generateClientId() + when: + + client.connect() + client.send(new Connect311OutPacket( + "", + "", + clientId, + "wrongPassword".getBytes(StandardCharsets.UTF_8), + ArrayUtils.EMPTY_BYTE_ARRAY, + QoS.AT_MOST_ONCE, + keepAlive, + false, + false + )) + + def connectAck = client.readNext() as ConnectAckInPacket + + then: + connectAck.reasonCode == ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD + } + + def "client should not connect to broker with wrong pass using mqtt 5"() { given: - def client = buildClient() + def client = buildMqtt5Client() when: - connectWith(client, 'user', 'wrongPassword') + connectWith(client, "user", "wrongPassword") then: def ex = thrown CompletionException def cause = ex.cause as Mqtt5ConnAckException diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/IntegrationSpecification.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/IntegrationSpecification.groovy index 7c36e7e4..07b50195 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/IntegrationSpecification.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/IntegrationSpecification.groovy @@ -1,6 +1,7 @@ package com.ss.mqtt.broker.test.integration import com.hivemq.client.mqtt.MqttClient +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient import com.ss.mqtt.broker.config.MqttConnectionConfig import com.ss.mqtt.broker.model.MqttPropertyConstants @@ -9,7 +10,6 @@ import com.ss.mqtt.broker.network.MqttConnection import com.ss.mqtt.broker.test.integration.config.MqttBrokerTestConfig import com.ss.mqtt.broker.test.mock.MqttMockClient import org.springframework.beans.factory.annotation.Autowired -import org.springframework.context.annotation.Bean import org.springframework.test.context.ContextConfiguration import spock.lang.Specification @@ -32,12 +32,26 @@ class IntegrationSpecification extends Specification { @Autowired MqttConnectionConfig deviceConnectionConfig - - def buildClient() { - return buildClient(generateClientId()) + + def buildMqtt311Client() { + return buildMqtt311Client(generateClientId()) + } + + def buildMqtt5Client() { + return buildMqtt5Client(generateClientId()) } - def buildClient(String clientId) { + def buildMqtt311Client(String clientId) { + return MqttClient.builder() + .identifier(clientId) + .serverHost(deviceNetworkAddress.getHostName()) + .serverPort(deviceNetworkAddress.getPort()) + .useMqttVersion3() + .build() + .toAsync() + } + + def buildMqtt5Client(String clientId) { return MqttClient.builder() .identifier(clientId) .serverHost(deviceNetworkAddress.getHostName()) @@ -55,6 +69,16 @@ class IntegrationSpecification extends Specification { return prefix + "_" + idGenerator.incrementAndGet() } + def connectWith(Mqtt3AsyncClient client, String user, String pass) { + return client.connectWith() + .simpleAuth() + .username(user) + .password(pass.getBytes(encoding)) + .applySimpleAuth() + .send() + .join() + } + def connectWith(Mqtt5AsyncClient client, String user, String pass) { return client.connectWith() .simpleAuth() diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/PublishRetryTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/PublishRetryTest.groovy index 8b906e44..83653aa7 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/PublishRetryTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/PublishRetryTest.groovy @@ -30,7 +30,7 @@ class PublishRetryTest extends IntegrationSpecification { def "mqtt 3.1.1 client should be generate session with one pending QoS 1 packet"() { given: - def publisher = buildClient() + def publisher = buildMqtt5Client() def subscriber = buildMqtt311MockClient() def subscriberId = generateClientId() when: @@ -91,7 +91,7 @@ class PublishRetryTest extends IntegrationSpecification { def "mqtt 5 client should be generate session with one pending QoS 1 packet"() { given: - def publisher = buildClient() + def publisher = buildMqtt5Client() def subscriber = buildMqtt5MockClient() def subscriberId = generateClientId() when: @@ -152,7 +152,7 @@ class PublishRetryTest extends IntegrationSpecification { def "mqtt 3.1.1 client should be generate session with one pending QoS 2 packet"() { given: - def publisher = buildClient() + def publisher = buildMqtt5Client() def subscriber = buildMqtt311MockClient() def subscriberId = generateClientId() when: @@ -234,7 +234,7 @@ class PublishRetryTest extends IntegrationSpecification { def "mqtt 5 client should be generate session with one pending QoS 2 packet"() { given: - def publisher = buildClient() + def publisher = buildMqtt5Client() def subscriber = buildMqtt5MockClient() def subscriberId = generateClientId() when: diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/ClientIdRegistryTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/ClientIdRegistryTest.groovy index 7a02232f..60e269e5 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/ClientIdRegistryTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/ClientIdRegistryTest.groovy @@ -109,7 +109,7 @@ class ClientIdRegistryTest extends IntegrationSpecification { def "subscriber should register its client id on connect and unregister on disconnect"() { given: def clientId = clientIdRegistry.generate().block() - def client = buildClient(clientId) + def client = buildMqtt5Client(clientId) when: def result = client.connect().join() then: diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/MqttSessionServiceTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/MqttSessionServiceTest.groovy index 57c9bd1d..01469c60 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/MqttSessionServiceTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/MqttSessionServiceTest.groovy @@ -21,7 +21,7 @@ class MqttSessionServiceTest extends IntegrationSpecification { def "subscriber should create and re-use mqtt session"() { given: def clientId = clientIdRegistry.generate().block() - def client = buildClient(clientId) + def client = buildMqtt5Client(clientId) when: def shouldNoSession = mqttSessionService.restore(clientId).block() def result = client.connect().join() diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/SubscribtionServiceTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/SubscribtionServiceTest.groovy index b521cd36..7ff1436a 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/SubscribtionServiceTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/SubscribtionServiceTest.groovy @@ -24,7 +24,7 @@ class SubscribtionServiceTest extends IntegrationSpecification { def "should clear/restore topic subscribers after disconnect/reconnect"() { given: def clientId = clientIdRegistry.generate().block() - def subscriber = buildClient(clientId) + def subscriber = buildMqtt5Client(clientId) when: connectAndSubscribe(subscriber, true, topicFilter) def matches = subscriptionService.topicSubscribers.matches(TopicName.from(topicFilter)) From ebb468fbb3faef6f0bc99107c34826c17df8924c Mon Sep 17 00:00:00 2001 From: Alex Brui Date: Mon, 2 Dec 2019 09:28:39 +0300 Subject: [PATCH 2/3] [broker-26] add additional publish/subscribe tests for mqtt 3.1.1 --- .../ConnectSubscribePublishTest.groovy | 132 +++++++++++++++++- 1 file changed, 129 insertions(+), 3 deletions(-) diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy index c6a73d8c..81435f16 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy @@ -1,6 +1,10 @@ package com.ss.mqtt.broker.test.integration import com.hivemq.client.mqtt.datatypes.MqttQos +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient +import com.hivemq.client.mqtt.mqtt3.message.Mqtt3MessageType +import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish +import com.hivemq.client.mqtt.mqtt3.message.subscribe.suback.Mqtt3SubAckReturnCode import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient import com.hivemq.client.mqtt.mqtt5.message.Mqtt5MessageType import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator @@ -11,7 +15,40 @@ import java.util.concurrent.atomic.AtomicReference class ConnectSubscribePublishTest extends IntegrationSpecification { - def "publisher should publish message QoS 0"() { + def "publisher should publish message QoS 0 using mqtt 3.1.1"() { + given: + def received = new AtomicReference() + def subscriber = buildMqtt311Client() + def subscriberId = subscriber.getConfig().clientIdentifier.get()toString() + def publisher = buildMqtt311Client() + when: + subscriber.connect().join() + publisher.connect().join() + + def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received) + def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE) + + Thread.sleep(100) + then: + noExceptionThrown() + + subscribeResult != null + subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_0) + subscribeResult.type == Mqtt3MessageType.SUBACK + + publishResult != null + publishResult.qos == MqttQos.AT_MOST_ONCE + publishResult.type == Mqtt3MessageType.PUBLISH + + received.get() != null + received.get().qos == MqttQos.AT_MOST_ONCE + received.get().type == Mqtt3MessageType.PUBLISH + cleanup: + subscriber.disconnect() + publisher.disconnect() + } + + def "publisher should publish message QoS 0 using mqtt 5"() { given: def received = new AtomicReference() def subscriber = buildMqtt5Client() @@ -44,7 +81,40 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { publisher.disconnect() } - def "publisher should publish message QoS 1"() { + def "publisher should publish message QoS 1 using mqtt 3.1.1"() { + given: + def received = new AtomicReference() + def subscriber = buildMqtt311Client() + def subscriberId = subscriber.getConfig().clientIdentifier.get()toString() + def publisher = buildMqtt311Client() + when: + subscriber.connect().join() + publisher.connect().join() + + def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received) + def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE) + + Thread.sleep(100) + then: + noExceptionThrown() + + subscribeResult != null + subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_1) + subscribeResult.type == Mqtt3MessageType.SUBACK + + publishResult != null + publishResult.qos == MqttQos.AT_LEAST_ONCE + publishResult.type == Mqtt3MessageType.PUBLISH + + received.get() != null + received.get().qos == MqttQos.AT_LEAST_ONCE + received.get().type == Mqtt3MessageType.PUBLISH + cleanup: + subscriber.disconnect() + publisher.disconnect() + } + + def "publisher should publish message QoS 1 using mqtt 5"() { given: def received = new AtomicReference() def subscriber = buildMqtt5Client() @@ -78,7 +148,40 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { publisher.disconnect().join() } - def "publisher should publish message QoS 2"() { + def "publisher should publish message QoS 2 using mqtt 3.1.1"() { + given: + def received = new AtomicReference() + def subscriber = buildMqtt311Client() + def subscriberId = subscriber.getConfig().clientIdentifier.get()toString() + def publisher = buildMqtt311Client() + when: + subscriber.connect().join() + publisher.connect().join() + + def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.EXACTLY_ONCE, received) + def publishResult = publish(publisher, subscriberId, MqttQos.EXACTLY_ONCE) + + Thread.sleep(100) + then: + noExceptionThrown() + + subscribeResult != null + subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_2) + subscribeResult.type == Mqtt3MessageType.SUBACK + + publishResult != null + publishResult.qos == MqttQos.EXACTLY_ONCE + publishResult.type == Mqtt3MessageType.PUBLISH + + received.get() != null + received.get().qos == MqttQos.EXACTLY_ONCE + received.get().type == Mqtt3MessageType.PUBLISH + cleanup: + subscriber.disconnect() + publisher.disconnect() + } + + def "publisher should publish message QoS 2 using mqtt 5"() { given: def received = new AtomicReference() def subscriber = buildMqtt5Client() @@ -135,4 +238,27 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { .send() .join() } + + def publish(Mqtt3AsyncClient publisher, String subscriberId, MqttQos qos) { + return publisher.publishWith() + .topic("test/$subscriberId") + .qos(qos) + .payload(publishPayload) + .send() + .join() + } + + def subscribe( + Mqtt3AsyncClient subscriber, + String subscriberId, + MqttQos qos, + AtomicReference received + ) { + return subscriber.subscribeWith() + .topicFilter("test/$subscriberId") + .qos(qos) + .callback({ publish -> received.set(publish) }) + .send() + .join() + } } From b79e1a5352e0daf5c6cc503b22b9dc130af3faa1 Mon Sep 17 00:00:00 2001 From: Alex Brui Date: Sat, 14 Dec 2019 16:22:33 +0300 Subject: [PATCH 3/3] [broker-26] fixes for code review --- .../network/packet/in/ConnectInPacket.java | 9 +++------ .../ConnectSubscribePublishTest.groovy | 20 +++++++++---------- 2 files changed, 13 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/in/ConnectInPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/in/ConnectInPacket.java index 5930c932..b0494d20 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/in/ConnectInPacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/in/ConnectInPacket.java @@ -282,12 +282,9 @@ protected void readVariableHeader(@NotNull MqttConnection connection, @NotNull B hasUserName = NumberUtils.isSetBit(flags, 7); hasPassword = NumberUtils.isSetBit(flags, 6); - if (mqttVersion.ordinal() < MqttVersion.MQTT_5.ordinal()) { - - // for mqtt < 5 we cannot have password without user - if (!hasUserName && hasPassword) { - throw new ConnectionRejectException(ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD); - } + // for mqtt < 5 we cannot have password without user + if (mqttVersion.ordinal() < MqttVersion.MQTT_5.ordinal() && !hasUserName && hasPassword) { + throw new ConnectionRejectException(ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD); } willFlag = NumberUtils.isSetBit(flags, 2); diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy index 19e5c5ff..bf836053 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy @@ -44,8 +44,8 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { received.get().qos == MqttQos.AT_MOST_ONCE received.get().type == Mqtt3MessageType.PUBLISH cleanup: - subscriber.disconnect() - publisher.disconnect() + subscriber.disconnect().join() + publisher.disconnect().join() } def "publisher should publish message QoS 0 using mqtt 5"() { @@ -77,8 +77,8 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { received.get().qos == MqttQos.AT_MOST_ONCE received.get().type == Mqtt5MessageType.PUBLISH cleanup: - subscriber.disconnect() - publisher.disconnect() + subscriber.disconnect().join() + publisher.disconnect().join() } def "publisher should publish message QoS 1 using mqtt 3.1.1"() { @@ -110,8 +110,8 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { received.get().qos == MqttQos.AT_LEAST_ONCE received.get().type == Mqtt3MessageType.PUBLISH cleanup: - subscriber.disconnect() - publisher.disconnect() + subscriber.disconnect().join() + publisher.disconnect().join() } def "publisher should publish message QoS 1 using mqtt 5"() { @@ -177,8 +177,8 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { received.get().qos == MqttQos.EXACTLY_ONCE received.get().type == Mqtt3MessageType.PUBLISH cleanup: - subscriber.disconnect() - publisher.disconnect() + subscriber.disconnect().join() + publisher.disconnect().join() } def "publisher should publish message QoS 2 using mqtt 5"() { @@ -211,8 +211,8 @@ class ConnectSubscribePublishTest extends IntegrationSpecification { received.get().qos == MqttQos.EXACTLY_ONCE received.get().type == Mqtt5MessageType.PUBLISH cleanup: - subscriber.disconnect() - publisher.disconnect() + subscriber.disconnect().join() + publisher.disconnect().join() } def publish(Mqtt5AsyncClient publisher, String subscriberId, MqttQos qos) {