Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
client.setClientId(StringUtils.EMPTY);

if (StringUtils.isEmpty(clientId)) {
log.warn("This client {} is already released", client);
log.warn("This client {} is already released or rejected", client);
return Mono.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@
import com.ss.mqtt.broker.exception.ConnectionRejectException;
import com.ss.mqtt.broker.exception.MalformedPacketMqttException;
import com.ss.mqtt.broker.model.MqttSession;
import com.ss.mqtt.broker.model.MqttVersion;
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode;
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
import com.ss.mqtt.broker.network.packet.in.ConnectInPacket;
import com.ss.mqtt.broker.service.*;
import com.ss.mqtt.broker.service.AuthenticationService;
import com.ss.mqtt.broker.service.ClientIdRegistry;
import com.ss.mqtt.broker.service.MqttSessionService;
import com.ss.mqtt.broker.service.SubscriptionService;
import com.ss.rlib.common.util.StringUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
Expand Down Expand Up @@ -53,6 +57,16 @@ protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull ConnectInPa
return clientIdRegistry.register(requestedClientId)
.map(ifTrue(requestedClientId, client::setClientId));
} else {

var mqttVersion = client
.getConnection()
.getMqttVersion();

// we can't assign generated client if for mqtt version less than 5
if (mqttVersion.ordinal() < MqttVersion.MQTT_5.ordinal()) {
return Mono.just(false);
}

return clientIdRegistry.generate()
.flatMap(newClientId -> clientIdRegistry.register(newClientId)
.map(ifTrue(newClientId, client::setClientId)));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package com.ss.mqtt.broker.network.packet.in;

import com.ss.mqtt.broker.exception.ConnectionRejectException;
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode;
import com.ss.mqtt.broker.model.MqttPropertyConstants;
import com.ss.mqtt.broker.model.MqttVersion;
import com.ss.mqtt.broker.model.PacketProperty;
import com.ss.mqtt.broker.model.reason.code.ConnectAckReasonCode;
import com.ss.mqtt.broker.network.MqttConnection;
import com.ss.mqtt.broker.network.packet.PacketType;
import com.ss.mqtt.broker.util.DebugUtils;
Expand Down Expand Up @@ -281,6 +281,12 @@ protected void readVariableHeader(@NotNull MqttConnection connection, @NotNull B

hasUserName = NumberUtils.isSetBit(flags, 7);
hasPassword = NumberUtils.isSetBit(flags, 6);

// for mqtt < 5 we cannot have password without user
if (mqttVersion.ordinal() < MqttVersion.MQTT_5.ordinal() && !hasUserName && hasPassword) {
throw new ConnectionRejectException(ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD);
}

willFlag = NumberUtils.isSetBit(flags, 2);
keepAlive = readUnsignedShort(buffer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@

public abstract class MqttReadablePacket extends AbstractReadablePacket<MqttConnection> {

static {
DebugUtils.registerIncludedFields("userProperties");
}

@Getter
@RequiredArgsConstructor
private static class Utf8Decoder {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package com.ss.mqtt.broker.test.integration

import com.hivemq.client.mqtt.datatypes.MqttQos
import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient
import com.hivemq.client.mqtt.mqtt3.message.Mqtt3MessageType
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish
import com.hivemq.client.mqtt.mqtt3.message.subscribe.suback.Mqtt3SubAckReturnCode
import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient
import com.hivemq.client.mqtt.mqtt5.message.Mqtt5MessageType
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator
Expand All @@ -11,53 +15,119 @@ import java.util.concurrent.atomic.AtomicReference

class ConnectSubscribePublishTest extends IntegrationSpecification {

def "publisher should publish message QoS 0"() {
def "publisher should publish message QoS 0 using mqtt 3.1.1"() {
given:
def received = new AtomicReference<Mqtt5Publish>()
def subscriber = buildClient()
def received = new AtomicReference<Mqtt3Publish>()
def subscriber = buildMqtt311Client()
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
def publisher = buildClient()
def publisher = buildMqtt311Client()
when:
subscriber.connect().join()
publisher.connect().join()

def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received)
def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE)

Thread.sleep(100)
then:
noExceptionThrown()

subscribeResult != null
subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_0)
subscribeResult.type == Mqtt3MessageType.SUBACK

publishResult != null
publishResult.qos == MqttQos.AT_MOST_ONCE
publishResult.type == Mqtt3MessageType.PUBLISH

received.get() != null
received.get().qos == MqttQos.AT_MOST_ONCE
received.get().type == Mqtt3MessageType.PUBLISH
cleanup:
subscriber.disconnect().join()
publisher.disconnect().join()
}

def "publisher should publish message QoS 0 using mqtt 5"() {
given:
def received = new AtomicReference<Mqtt5Publish>()
def subscriber = buildMqtt5Client()
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
def publisher = buildMqtt5Client()
when:
subscriber.connect().join()
publisher.connect().join()

def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_MOST_ONCE, received)
def publishResult = publish(publisher, subscriberId, MqttQos.AT_MOST_ONCE)

Thread.sleep(100)
then:
noExceptionThrown()

subscribeResult != null
subscribeResult.reasonCodes.contains(Mqtt5SubAckReasonCode.GRANTED_QOS_0)
subscribeResult.type == Mqtt5MessageType.SUBACK

publishResult != null
publishResult.publish.qos == MqttQos.AT_MOST_ONCE
publishResult.publish.type == Mqtt5MessageType.PUBLISH

received.get() != null
received.get().qos == MqttQos.AT_MOST_ONCE
received.get().type == Mqtt5MessageType.PUBLISH
cleanup:
subscriber.disconnect()
publisher.disconnect()
subscriber.disconnect().join()
publisher.disconnect().join()
}

def "publisher should publish message QoS 1"() {
def "publisher should publish message QoS 1 using mqtt 3.1.1"() {
given:
def received = new AtomicReference<Mqtt5Publish>()
def subscriber = buildClient()
def received = new AtomicReference<Mqtt3Publish>()
def subscriber = buildMqtt311Client()
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
def publisher = buildClient()
def publisher = buildMqtt311Client()
when:

subscriber.connect().join()
publisher.connect().join()

def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received)
def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE)

Thread.sleep(100)
then:
noExceptionThrown()

subscribeResult != null
subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_1)
subscribeResult.type == Mqtt3MessageType.SUBACK

publishResult != null
publishResult.qos == MqttQos.AT_LEAST_ONCE
publishResult.type == Mqtt3MessageType.PUBLISH

received.get() != null
received.get().qos == MqttQos.AT_LEAST_ONCE
received.get().type == Mqtt3MessageType.PUBLISH
cleanup:
subscriber.disconnect().join()
publisher.disconnect().join()
}

def "publisher should publish message QoS 1 using mqtt 5"() {
given:
def received = new AtomicReference<Mqtt5Publish>()
def subscriber = buildMqtt5Client()
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
def publisher = buildMqtt5Client()
when:

subscriber.connect().join()
publisher.connect().join()

def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.AT_LEAST_ONCE, received)
def publishResult = publish(publisher, subscriberId, MqttQos.AT_LEAST_ONCE)

Thread.sleep(100)
then:
noExceptionThrown()
Expand All @@ -69,7 +139,7 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
publishResult != null
publishResult.publish.qos == MqttQos.AT_LEAST_ONCE
publishResult.publish.type == Mqtt5MessageType.PUBLISH

received.get() != null
received.get().qos == MqttQos.AT_LEAST_ONCE
received.get().type == Mqtt5MessageType.PUBLISH
Expand All @@ -78,17 +148,50 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
publisher.disconnect().join()
}

def "publisher should publish message QoS 2"() {
def "publisher should publish message QoS 2 using mqtt 3.1.1"() {
given:
def received = new AtomicReference<Mqtt3Publish>()
def subscriber = buildMqtt311Client()
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
def publisher = buildMqtt311Client()
when:
subscriber.connect().join()
publisher.connect().join()

def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.EXACTLY_ONCE, received)
def publishResult = publish(publisher, subscriberId, MqttQos.EXACTLY_ONCE)

Thread.sleep(100)
then:
noExceptionThrown()

subscribeResult != null
subscribeResult.returnCodes.contains(Mqtt3SubAckReturnCode.SUCCESS_MAXIMUM_QOS_2)
subscribeResult.type == Mqtt3MessageType.SUBACK

publishResult != null
publishResult.qos == MqttQos.EXACTLY_ONCE
publishResult.type == Mqtt3MessageType.PUBLISH

received.get() != null
received.get().qos == MqttQos.EXACTLY_ONCE
received.get().type == Mqtt3MessageType.PUBLISH
cleanup:
subscriber.disconnect().join()
publisher.disconnect().join()
}

def "publisher should publish message QoS 2 using mqtt 5"() {
given:
def received = new AtomicReference<Mqtt5Publish>()
def subscriber = buildClient()
def subscriber = buildMqtt5Client()
def subscriberId = subscriber.getConfig().clientIdentifier.get().toString()
def publisher = buildClient()
def publisher = buildMqtt5Client()
when:

subscriber.connect().join()
publisher.connect().join()

def subscribeResult = subscribe(subscriber, subscriberId, MqttQos.EXACTLY_ONCE, received)
def publishResult = publish(publisher, subscriberId, MqttQos.EXACTLY_ONCE)

Expand All @@ -108,8 +211,8 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
received.get().qos == MqttQos.EXACTLY_ONCE
received.get().type == Mqtt5MessageType.PUBLISH
cleanup:
subscriber.disconnect()
publisher.disconnect()
subscriber.disconnect().join()
publisher.disconnect().join()
}

def publish(Mqtt5AsyncClient publisher, String subscriberId, MqttQos qos) {
Expand All @@ -135,4 +238,27 @@ class ConnectSubscribePublishTest extends IntegrationSpecification {
.send()
.join()
}

def publish(Mqtt3AsyncClient publisher, String subscriberId, MqttQos qos) {
return publisher.publishWith()
.topic("test/$subscriberId")
.qos(qos)
.payload(publishPayload)
.send()
.join()
}

def subscribe(
Mqtt3AsyncClient subscriber,
String subscriberId,
MqttQos qos,
AtomicReference<Mqtt3Publish> received
) {
return subscriber.subscribeWith()
.topicFilter("test/$subscriberId")
.qos(qos)
.callback({ publish -> received.set(publish) })
.send()
.join()
}
}
Loading