diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/ConnectInMqttInMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/ConnectInMqttInMessageHandler.java index fcd4cef3..40d17c41 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/ConnectInMqttInMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/ConnectInMqttInMessageHandler.java @@ -1,13 +1,13 @@ package javasabr.mqtt.service.message.handler.impl; import static javasabr.mqtt.base.util.ReactorUtils.ifTrue; -import static javasabr.mqtt.model.MqttProperties.MAXIMUM_MESSAGE_SIZE_UNDEFINED; -import static javasabr.mqtt.model.MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_UNDEFINED; +import static javasabr.mqtt.model.MqttProperties.MAXIMUM_MESSAGE_SIZE_IS_NOT_SET; +import static javasabr.mqtt.model.MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET; import static javasabr.mqtt.model.MqttProperties.SERVER_KEEP_ALIVE_DISABLED; import static javasabr.mqtt.model.MqttProperties.SESSION_EXPIRY_INTERVAL_DISABLED; -import static javasabr.mqtt.model.MqttProperties.SESSION_EXPIRY_INTERVAL_UNDEFINED; +import static javasabr.mqtt.model.MqttProperties.SESSION_EXPIRY_INTERVAL_IS_NOT_SET; import static javasabr.mqtt.model.MqttProperties.TOPIC_ALIAS_MAXIMUM_DISABLED; -import static javasabr.mqtt.model.MqttProperties.TOPIC_ALIAS_MAXIMUM_UNDEFINED; +import static javasabr.mqtt.model.MqttProperties.TOPIC_ALIAS_MAXIMUM_IS_NOT_SET; import static javasabr.mqtt.model.reason.code.ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD; import static javasabr.mqtt.model.reason.code.ConnectAckReasonCode.CLIENT_IDENTIFIER_NOT_VALID; @@ -143,22 +143,22 @@ private void resolveClientConnectionConfig(MqttClient.UnsafeMqttClient client, C ? packet.sessionExpiryInterval() : SESSION_EXPIRY_INTERVAL_DISABLED; - if (sessionExpiryInterval == SESSION_EXPIRY_INTERVAL_UNDEFINED) { + if (sessionExpiryInterval == SESSION_EXPIRY_INTERVAL_IS_NOT_SET) { sessionExpiryInterval = serverConfig.defaultSessionExpiryInterval(); } // select result receive max - int receiveMaxPublishes = packet.receiveMaxPublishes() == RECEIVE_MAXIMUM_PUBLISHES_UNDEFINED + int receiveMaxPublishes = packet.receiveMaxPublishes() == RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET ? serverConfig.receiveMaxPublishes() : Math.min(packet.receiveMaxPublishes(), serverConfig.receiveMaxPublishes()); // select result maximum packet size - var maximumPacketSize = packet.maxPacketSize() == MAXIMUM_MESSAGE_SIZE_UNDEFINED + var maximumPacketSize = packet.maxPacketSize() == MAXIMUM_MESSAGE_SIZE_IS_NOT_SET ? serverConfig.maxMessageSize() : Math.min(packet.maxPacketSize(), serverConfig.maxMessageSize()); // select result topic alias maximum - var topicAliasMaxValue = packet.topicAliasMaxValue() == TOPIC_ALIAS_MAXIMUM_UNDEFINED + var topicAliasMaxValue = packet.topicAliasMaxValue() == TOPIC_ALIAS_MAXIMUM_IS_NOT_SET ? TOPIC_ALIAS_MAXIMUM_DISABLED : Math.min(packet.topicAliasMaxValue(), serverConfig.topicAliasMaxValue()); diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandler.java index 7e3c325a..2c510be5 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandler.java @@ -204,7 +204,7 @@ private void handleInvalidTopicAlias(ExternalMqttClient client) { private void handleInvalidPayloadFormat(ExternalMqttClient client) { MqttOutMessage response = messageOutFactoryService .resolveFactory(client) - .newDisconnect(client, DisconnectReasonCode.PROTOCOL_ERROR, MqttProtocolErrors.INVALID_PAYLOAD_FORMAT); + .newDisconnect(client, DisconnectReasonCode.PROTOCOL_ERROR, MqttProtocolErrors.PROVIDED_INVALID_PAYLOAD_FORMAT); client.closeWithReason(response); } @@ -218,7 +218,7 @@ private void handleInvalidResponseTopicName(ExternalMqttClient client) { private void handleInvalidMessageExpiryInterval(ExternalMqttClient client) { MqttOutMessage response = messageOutFactoryService .resolveFactory(client) - .newDisconnect(client, DisconnectReasonCode.PROTOCOL_ERROR, MqttProtocolErrors.INVALID_MESSAGE_EXPIRY_INTERVAL); + .newDisconnect(client, DisconnectReasonCode.PROTOCOL_ERROR, MqttProtocolErrors.PROVIDED_INVALID_MESSAGE_EXPIRY_INTERVAL); client.closeWithReason(response); } diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt311MessageOutFactory.java b/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt311MessageOutFactory.java index 2dd44c35..67c525bb 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt311MessageOutFactory.java +++ b/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt311MessageOutFactory.java @@ -111,10 +111,10 @@ public MqttOutMessage newDisconnect( @Override public MqttOutMessage newAuthenticate( AuthenticateReasonCode reasonCode, - String authenticateMethod, - byte[] authenticateData, - Array userProperties, - String reason) { + @Nullable String reason, + @Nullable String authenticateMethod, + byte @Nullable [] authenticateData, + Array userProperties) { throw new UnsupportedOperationException(); } diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt5MessageOutFactory.java b/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt5MessageOutFactory.java index d22b9ed2..22a7758b 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt5MessageOutFactory.java +++ b/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/Mqtt5MessageOutFactory.java @@ -141,11 +141,16 @@ public MqttOutMessage newDisconnect( @Override public MqttOutMessage newAuthenticate( AuthenticateReasonCode reasonCode, - String authenticateMethod, - byte[] authenticateData, - Array userProperties, - String reason) { - return new AuthenticationMqtt5OutMessage(userProperties, reasonCode, reason, authenticateMethod, authenticateData); + @Nullable String reason, + @Nullable String authenticateMethod, + byte @Nullable [] authenticateData, + Array userProperties) { + return new AuthenticationMqtt5OutMessage( + reasonCode, + reason, + authenticateMethod, + authenticateData, + userProperties); } @Override diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/MqttMessageOutFactory.java b/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/MqttMessageOutFactory.java index 23d301d8..55b058b5 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/MqttMessageOutFactory.java +++ b/core-service/src/main/java/javasabr/mqtt/service/message/out/factory/MqttMessageOutFactory.java @@ -201,20 +201,21 @@ public MqttOutMessage newDisconnect( public abstract MqttOutMessage newAuthenticate( AuthenticateReasonCode reasonCode, - String authenticateMethod, - byte[] authenticateData, - Array userProperties, - String reason); + @Nullable String reason, + @Nullable String authenticateMethod, + byte @Nullable [] authenticateData, + Array userProperties); public MqttOutMessage newAuthenticate( AuthenticateReasonCode reasonCode, - String authenticateMethod, - byte[] authenticateData) { + @Nullable String authenticateMethod, + byte @Nullable [] authenticateData) { return newAuthenticate( reasonCode, + null, authenticateMethod, - authenticateData, EMPTY_USER_PROPERTIES, - StringUtils.EMPTY); + authenticateData, + EMPTY_USER_PROPERTIES); } public abstract MqttOutMessage newPingRequest(); diff --git a/core-service/src/test/groovy/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandlerTest.groovy b/core-service/src/test/groovy/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandlerTest.groovy index 08ef2b68..fba7f28f 100644 --- a/core-service/src/test/groovy/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandlerTest.groovy +++ b/core-service/src/test/groovy/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandlerTest.groovy @@ -221,7 +221,7 @@ class PublishMqttInMessageHandlerTest extends IntegrationServiceSpecification { then: def disconnectReason = mqttClient.nextSentMessage(DisconnectMqtt5OutMessage) disconnectReason.reasonCode() == DisconnectReasonCode.PROTOCOL_ERROR - disconnectReason.reason() == MqttProtocolErrors.INVALID_PAYLOAD_FORMAT + disconnectReason.reason() == MqttProtocolErrors.PROVIDED_INVALID_PAYLOAD_FORMAT disconnectReason.serverReference() == null } @@ -244,7 +244,7 @@ class PublishMqttInMessageHandlerTest extends IntegrationServiceSpecification { then: def disconnectReason = mqttClient.nextSentMessage(DisconnectMqtt5OutMessage) disconnectReason.reasonCode() == DisconnectReasonCode.PROTOCOL_ERROR - disconnectReason.reason() == MqttProtocolErrors.INVALID_MESSAGE_EXPIRY_INTERVAL + disconnectReason.reason() == MqttProtocolErrors.PROVIDED_INVALID_MESSAGE_EXPIRY_INTERVAL disconnectReason.serverReference() == null } diff --git a/model/src/main/java/javasabr/mqtt/model/MqttProperties.java b/model/src/main/java/javasabr/mqtt/model/MqttProperties.java index bea85eca..774c498b 100644 --- a/model/src/main/java/javasabr/mqtt/model/MqttProperties.java +++ b/model/src/main/java/javasabr/mqtt/model/MqttProperties.java @@ -11,16 +11,16 @@ public interface MqttProperties { long SESSION_EXPIRY_INTERVAL_DEFAULT = 120; long SESSION_EXPIRY_INTERVAL_MIN = 0; long SESSION_EXPIRY_INTERVAL_INFINITY = 0xFFFFFFFFL; - long SESSION_EXPIRY_INTERVAL_UNDEFINED = -1; + long SESSION_EXPIRY_INTERVAL_IS_NOT_SET = -1; - int RECEIVE_MAXIMUM_PUBLISHES_UNDEFINED = -1; + int RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET = 0; int RECEIVE_MAXIMUM_PUBLISHES_MIN = 1; - int RECEIVE_MAXIMUM_PUBLISHES_DEFAULT = 10; int RECEIVE_MAXIMUM_PUBLISHES_MAX = 0xFFFF; + int RECEIVE_MAXIMUM_PUBLISHES_DEFAULT = RECEIVE_MAXIMUM_PUBLISHES_MAX; - int MAXIMUM_MESSAGE_SIZE_UNDEFINED = -1; + int MAXIMUM_MESSAGE_SIZE_IS_NOT_SET = -1; int MAXIMUM_MESSAGE_SIZE_DEFAULT = 3074; - int MAXIMUM_MESSAGE_SIZE_MIN = 128; + int MAXIMUM_MESSAGE_SIZE_MIN = 64; int MAXIMUM_MESSAGE_SIZE_MAX = MAXIMUM_PROTOCOL_MESSAGE_SIZE; int MAXIMUM_STRING_LENGTH = 1024; @@ -31,10 +31,10 @@ public interface MqttProperties { long MESSAGE_EXPIRY_INTERVAL_INFINITY = 0; long MESSAGE_EXPIRY_INTERVAL_MIN = 0; - int TOPIC_ALIAS_MAXIMUM_UNDEFINED = -1; + int TOPIC_ALIAS_MAXIMUM_IS_NOT_SET = 0; int TOPIC_ALIAS_MAXIMUM_DISABLED = 0; - int SERVER_KEEP_ALIVE_UNDEFINED = -1; + int SERVER_KEEP_ALIVE_IS_NOT_SET = -1; int SERVER_KEEP_ALIVE_DISABLED = 0; int SERVER_KEEP_ALIVE_DEFAULT = 0; int SERVER_KEEP_ALIVE_MIN = 0; @@ -52,12 +52,18 @@ public interface MqttProperties { int MESSAGE_ID_IS_NOT_SET = 0; - boolean SESSIONS_ENABLED_DEFAULT = true; - boolean KEEP_ALIVE_ENABLED_DEFAULT = false; - boolean RETAIN_AVAILABLE_DEFAULT = false; - boolean WILDCARD_SUBSCRIPTION_AVAILABLE_DEFAULT = false; - boolean SHARED_SUBSCRIPTION_AVAILABLE_DEFAULT = false; + boolean RETAIN_AVAILABLE_DEFAULT = true; + int RETAIN_AVAILABLE_IS_NOT_SET = -1; + + boolean WILDCARD_SUBSCRIPTION_AVAILABLE_DEFAULT = true; + int WILDCARD_SUBSCRIPTION_AVAILABLE_IS_NOT_SET = -1; + boolean SUBSCRIPTION_IDENTIFIER_AVAILABLE_DEFAULT = true; + int SUBSCRIPTION_IDENTIFIER_AVAILABLE_IS_NOT_SET = -1; - int PACKET_ID_FOR_QOS_0 = 0; + boolean SHARED_SUBSCRIPTION_AVAILABLE_DEFAULT = true; + int SHARED_SUBSCRIPTION_AVAILABLE_IS_NOT_SET = -1; + + boolean SESSIONS_ENABLED_DEFAULT = true; + boolean KEEP_ALIVE_ENABLED_DEFAULT = false; } diff --git a/model/src/main/java/javasabr/mqtt/model/MqttProtocolErrors.java b/model/src/main/java/javasabr/mqtt/model/MqttProtocolErrors.java index f001ef85..6b51de39 100644 --- a/model/src/main/java/javasabr/mqtt/model/MqttProtocolErrors.java +++ b/model/src/main/java/javasabr/mqtt/model/MqttProtocolErrors.java @@ -4,8 +4,20 @@ public interface MqttProtocolErrors { String NO_ANY_TOPIC_FILTERS = "Not provided any information about 'Topic Filters'"; String NO_ANY_TOPIC_NANE = "Not provided any information about TopicName"; //String INVALID_TOPIC_ALIAS = "Provided invalid TopicAlias"; - String INVALID_PAYLOAD_FORMAT = "Provided invalid PayloadFormat"; - String INVALID_MESSAGE_EXPIRY_INTERVAL = "Provided invalid MessageExpiryInterval"; + + String PROVIDED_INVALID_PAYLOAD_FORMAT = "Provided invalid PayloadFormat"; + String PROVIDED_INVALID_MESSAGE_EXPIRY_INTERVAL = "Provided invalid MessageExpiryInterval"; + String PROVIDED_INVALID_SESSION_EXPIRY_INTERVAL = "Provided invalid 'Session Expiry Interval'"; + String PROVIDED_INVALID_RECEIVED_MAX_PUBLISHES = "Provided invalid 'Receive Maximum'"; + String PROVIDED_INVALID_MAX_QOS = "Provided invalid 'Maximum QoS'"; + String PROVIDED_INVALID_RETAIN_AVAILABLE = "Provided invalid 'Retain Available'"; + String PROVIDED_INVALID_MAX_MESSAGE_SIZE = "Provided invalid 'Maximum Packet Size'"; + String PROVIDED_INVALID_TOPIC_ALIAS_MAX = "Provided invalid 'Topic Alias Maximum'"; + String PROVIDED_INVALID_WILDCARD_SUBSCRIPTION_AVAILABLE = "Provided invalid 'Wildcard Subscription Available'"; + String PROVIDED_INVALID_SUBSCRIPTION_IDENTIFIERS_AVAILABLE = "Provided invalid 'Subscription Identifiers Available'"; + String PROVIDED_INVALID_SHARED_SUBSCRIPTION_AVAILABLE = "Provided invalid 'Shared Subscription Available'"; + String PROVIDED_INVALID_SERVER_KEEP_ALIVE = "Provided invalid 'Server Keep Alive'"; + String INVALID_RESPONSE_TOPIC_NAME = "Provided invalid ResponseTopicName"; String UNSUPPORTED_QOS_OR_RETAIN_HANDLING = "Provided unsupported 'QoS' or 'RetainHandling'"; String MISSED_REQUIRED_MESSAGE_ID = "'Packet Identifier' must be presented'"; diff --git a/model/src/main/java/javasabr/mqtt/model/reason/code/AuthenticateReasonCode.java b/model/src/main/java/javasabr/mqtt/model/reason/code/AuthenticateReasonCode.java index e7849434..0e373403 100644 --- a/model/src/main/java/javasabr/mqtt/model/reason/code/AuthenticateReasonCode.java +++ b/model/src/main/java/javasabr/mqtt/model/reason/code/AuthenticateReasonCode.java @@ -1,52 +1,40 @@ package javasabr.mqtt.model.reason.code; -import java.util.stream.Stream; -import javasabr.rlib.common.util.ObjectUtils; +import javasabr.rlib.common.util.NumberedEnum; +import javasabr.rlib.common.util.NumberedEnumMap; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.experimental.Accessors; +@Getter +@Accessors @RequiredArgsConstructor -public enum AuthenticateReasonCode { +public enum AuthenticateReasonCode implements NumberedEnum, ReasonCode { /** * Authentication is successful. Server. */ - SUCCESS((byte) 0x00), + SUCCESS(0x00), /** * Continue the authentication with another step. Client or Server. */ - CONTINUE_AUTHENTICATION((byte) 0x18), + CONTINUE_AUTHENTICATION(0x18), /** * Initiate a re-authentication. Client. */ - RE_AUTHENTICATE((byte) 0x19); + RE_AUTHENTICATE(0x19); - private static final AuthenticateReasonCode[] VALUES; + private static final NumberedEnumMap NUMBERED_MAP = + new NumberedEnumMap<>(AuthenticateReasonCode.class); - static { - - var maxId = Stream - .of(values()) - .mapToInt(AuthenticateReasonCode::value) - .max() - .orElse(0); - - var values = new AuthenticateReasonCode[maxId + 1]; - - for (var value : values()) { - values[value.value] = value; - } - - VALUES = values; + public static AuthenticateReasonCode ofCode(int code) { + return NUMBERED_MAP.require(code); } - public static AuthenticateReasonCode of(int index) { - return ObjectUtils.notNull( - VALUES[index], - index, - arg -> new IndexOutOfBoundsException("Doesn't support reason code: " + arg)); - } + private final int code; - private @Getter - final byte value; + @Override + public int number() { + return code; + } } diff --git a/model/src/main/java/javasabr/mqtt/model/reason/code/ConnectAckReasonCode.java b/model/src/main/java/javasabr/mqtt/model/reason/code/ConnectAckReasonCode.java index 474e9f3e..4e5b5879 100644 --- a/model/src/main/java/javasabr/mqtt/model/reason/code/ConnectAckReasonCode.java +++ b/model/src/main/java/javasabr/mqtt/model/reason/code/ConnectAckReasonCode.java @@ -1,156 +1,134 @@ package javasabr.mqtt.model.reason.code; -import java.util.stream.Stream; -import javasabr.rlib.common.util.ObjectUtils; +import javasabr.rlib.common.util.NumberedEnum; +import javasabr.rlib.common.util.NumberedEnumMap; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.experimental.Accessors; +@Getter +@Accessors @RequiredArgsConstructor -public enum ConnectAckReasonCode { +public enum ConnectAckReasonCode implements NumberedEnum { /** * The Connection is accepted. */ - SUCCESS((byte) 0x00, (byte) 0x00), + SUCCESS(0x00, 0x00), // WITH REASONS BELOW SERVER MUST CLOSE CONNECTION - /** * The Server does not wish to reveal the reason for the failure, or none of the other Reason Codes apply. */ - UNSPECIFIED_ERROR((byte) 0x01, (byte) 0x80), + UNSPECIFIED_ERROR(0x01, 0x80), /** * Data within the CONNECT packet could not be correctly parsed. */ - MALFORMED_PACKET((byte) 0x01, (byte) 0x81), + MALFORMED_PACKET(0x01, 0x81), /** * Data in the CONNECT packet does not conform to this specification. */ - PROTOCOL_ERROR((byte) 0x01, (byte) 0x82), + PROTOCOL_ERROR(0x01, 0x82), /** * The CONNECT is valid but is not accepted by this Server. */ - IMPLEMENTATION_SPECIFIC_ERROR((byte) 0x01, (byte) 0x83), + IMPLEMENTATION_SPECIFIC_ERROR(0x01, 0x83), /** * The Server does not support the version of the MQTT protocol requested by the Client */ - UNSUPPORTED_PROTOCOL_VERSION((byte) 0x01, (byte) 0x84), + UNSUPPORTED_PROTOCOL_VERSION(0x01, 0x84), /** * The Client Identifier is a valid string but is not allowed by the Server. */ - CLIENT_IDENTIFIER_NOT_VALID((byte) 0x02, (byte) 0x85), + CLIENT_IDENTIFIER_NOT_VALID(0x02, 0x85), /** * The Server does not accept the User Name or Password specified by the Client */ - BAD_USER_NAME_OR_PASSWORD((byte) 0x04, (byte) 0x86), + BAD_USER_NAME_OR_PASSWORD(0x04, 0x86), /** * The Client is not authorized to connect. */ - NOT_AUTHORIZED((byte) 0x05, (byte) 0x87), + NOT_AUTHORIZED(0x05, 0x87), /** * The MQTT Server is not available */ - SERVER_UNAVAILABLE((byte) 0x03, (byte) 0x88), + SERVER_UNAVAILABLE(0x03, 0x88), /** * The Server is busy. Try again later. */ - SERVER_BUSY((byte) 0x01, (byte) 0x89), + SERVER_BUSY(0x01, 0x89), /** * This Client has been banned by administrative action. Contact the server administrator. */ - BANNED((byte) 0x01, (byte) 0x8A), + BANNED(0x01, 0x8A), /** * The authentication method is not supported or does not match the authentication method currently in use. */ - BAD_AUTHENTICATION_METHOD((byte) 0x04, (byte) 0x8C), + BAD_AUTHENTICATION_METHOD(0x04, 0x8C), /** * The Will Topic Name is not malformed, but is not accepted by this Server. */ - TOPIC_NAME_INVALID((byte) 0x01, (byte) 0x90), + TOPIC_NAME_INVALID(0x01, 0x90), /** * The CONNECT packet exceeded the maximum permissible size. */ - PACKET_TOO_LARGE((byte) 0x01, (byte) 0x95), + PACKET_TOO_LARGE(0x01, 0x95), /** * An implementation or administrative imposed limit has been exceeded. */ - QUOTA_EXCEEDED((byte) 0x01, (byte) 0x97), + QUOTA_EXCEEDED(0x01, 0x97), /** * The Will Payload does not match the specified Payload Format Indicator. */ - PAYLOAD_FORMAT_INVALID((byte) 0x01, (byte) 0x99), + PAYLOAD_FORMAT_INVALID(0x01, 0x99), /** * The Server does not support retained messages, and Will Retain was set to 1. */ - RETAIN_NOT_SUPPORTED((byte) 0x01, (byte) 0x9A), + RETAIN_NOT_SUPPORTED(0x01, 0x9A), /** * The Server does not support the QoS set in Will QoS. */ - QOS_NOT_SUPPORTED((byte) 0x01, (byte) 0x9B), + QOS_NOT_SUPPORTED(0x01, 0x9B), /** * The Client should temporarily use another server. */ - USE_ANOTHER_SERVER((byte) 0x01, (byte) 0x9C), + USE_ANOTHER_SERVER(0x01, 0x9C), /** * The Client should permanently use another server. */ - SERVER_MOVED((byte) 0x01, (byte) 0x9D), + SERVER_MOVED(0x01, 0x9D), /** * The connection rate limit has been exceeded. */ - CONNECTION_RATE_EXCEEDED((byte) 0x01, (byte) 0x9F); - - private static final ConnectAckReasonCode[] MQTT_5_VALUES; - - static { - - var maxId = Stream - .of(values()) - .mapToInt(ConnectAckReasonCode::mqtt5) - .map(value -> Byte.toUnsignedInt((byte) value)) - .max() - .orElse(0); + CONNECTION_RATE_EXCEEDED(0x01, 0x9F); - var values = new ConnectAckReasonCode[maxId + 1]; + private static final NumberedEnumMap MQTT5_NUMBERED_MAP = + new NumberedEnumMap<>(ConnectAckReasonCode.class); - for (var value : values()) { - values[Byte.toUnsignedInt(value.mqtt5)] = value; - } - - MQTT_5_VALUES = values; + public static ConnectAckReasonCode ofCode(boolean mqtt5, int reasonCode) { + return mqtt5 ? ofMqtt5Code(reasonCode) : ofMqtt311Code(reasonCode); } - public static ConnectAckReasonCode of(boolean mqtt5, int reasonCode) { - return mqtt5 ? ofMqtt5(reasonCode) : ofMqtt311(reasonCode); + public static ConnectAckReasonCode ofMqtt311Code(int code) { + return switch (code) { + case 0x00 -> SUCCESS; + case 0x01 -> UNSUPPORTED_PROTOCOL_VERSION; + case 0x02 -> CLIENT_IDENTIFIER_NOT_VALID; + case 0x03 -> SERVER_UNAVAILABLE; + case 0x04 -> BAD_USER_NAME_OR_PASSWORD; + case 0x05 -> NOT_AUTHORIZED; + default -> throw new IllegalArgumentException("Unsupported reason code: " + code); + }; } - public static ConnectAckReasonCode ofMqtt311(int reasonCode) { - switch (reasonCode) { - case 0x00: - return SUCCESS; - case 0x01: - return UNSUPPORTED_PROTOCOL_VERSION; - case 0x02: - return CLIENT_IDENTIFIER_NOT_VALID; - case 0x03: - return SERVER_UNAVAILABLE; - case 0x04: - return BAD_USER_NAME_OR_PASSWORD; - case 0x05: - return NOT_AUTHORIZED; - default: - throw new IllegalArgumentException("Unsupported reason code: " + reasonCode); - } + public static ConnectAckReasonCode ofMqtt5Code(int code) { + return MQTT5_NUMBERED_MAP.require(code); } - public static ConnectAckReasonCode ofMqtt5(int reasonCode) { - return ObjectUtils.notNull( - MQTT_5_VALUES[reasonCode], - reasonCode, - arg -> new IndexOutOfBoundsException("Doesn't support reason code: " + arg)); - } + private final int mqtt311; + private final int mqtt5; - @Getter - private final byte mqtt311; - @Getter - private final byte mqtt5; + @Override + public int number() { + return mqtt5; + } } diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/AuthenticationMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/AuthenticationMqttInMessage.java index 28a859b9..fad21d90 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/in/AuthenticationMqttInMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/in/AuthenticationMqttInMessage.java @@ -3,27 +3,32 @@ import java.nio.ByteBuffer; import java.util.EnumSet; import java.util.Set; +import javasabr.mqtt.base.util.DebugUtils; import javasabr.mqtt.model.MqttMessageProperty; import javasabr.mqtt.model.message.MqttMessageType; import javasabr.mqtt.model.reason.code.AuthenticateReasonCode; import javasabr.mqtt.network.MqttConnection; -import javasabr.rlib.common.util.ArrayUtils; -import javasabr.rlib.common.util.StringUtils; import lombok.AccessLevel; import lombok.Getter; import lombok.experimental.Accessors; import lombok.experimental.FieldDefaults; +import org.jspecify.annotations.Nullable; /** * Authentication exchange. */ @Getter -@Accessors(fluent = true) -@FieldDefaults(level = AccessLevel.PRIVATE) +@Accessors +@FieldDefaults(level = AccessLevel.PROTECTED) public class AuthenticationMqttInMessage extends MqttInMessage { + public static final byte MESSAGE_FLAGS = 0b0000_0000; private static final byte MESSAGE_TYPE = (byte) MqttMessageType.AUTHENTICATION.ordinal(); + static { + DebugUtils.registerIncludedFields("reasonCode"); + } + private static final Set AVAILABLE_PROPERTIES = EnumSet.of( /* Followed by a UTF-8 Encoded String containing the name of the authentication method. It is a Protocol @@ -58,17 +63,16 @@ public class AuthenticationMqttInMessage extends MqttInMessage { AuthenticateReasonCode reasonCode; // properties + @Nullable String reason; + @Nullable String authenticationMethod; - byte[] authenticationData; + byte @Nullable [] authenticationData; public AuthenticationMqttInMessage(byte messageFlags) { super(messageFlags); this.reasonCode = AuthenticateReasonCode.SUCCESS; - this.reason = StringUtils.EMPTY; - this.authenticationMethod = StringUtils.EMPTY; - this.authenticationData = ArrayUtils.EMPTY_BYTE_ARRAY; } @Override @@ -76,10 +80,20 @@ public byte messageType() { return MESSAGE_TYPE; } + @Override + public String name() { + return MqttMessageType.AUTHENTICATION.name(); + } + + @Override + protected boolean validMessageFlags(byte messageFlags) { + return messageFlags == MESSAGE_FLAGS; + } + @Override protected void readVariableHeader(MqttConnection connection, ByteBuffer buffer) { // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901219 - reasonCode = AuthenticateReasonCode.of(readByteUnsigned(buffer)); + reasonCode = AuthenticateReasonCode.ofCode(readByteUnsigned(buffer)); } @Override @@ -90,7 +104,12 @@ protected Set availableProperties() { @Override protected void applyProperty(MqttMessageProperty property, byte[] value) { switch (property) { - case AUTHENTICATION_DATA -> authenticationData = value; + case AUTHENTICATION_DATA -> { + if (authenticationData != null) { + alreadyPresentedProperty(property); + } + authenticationData = value; + } default -> unsupportedProperty(property); } } @@ -98,8 +117,18 @@ protected void applyProperty(MqttMessageProperty property, byte[] value) { @Override protected void applyProperty(MqttMessageProperty property, String value) { switch (property) { - case REASON_STRING -> reason = value; - case AUTHENTICATION_METHOD -> authenticationMethod = value; + case REASON_STRING -> { + if (reason != null) { + alreadyPresentedProperty(property); + } + reason = value; + } + case AUTHENTICATION_METHOD -> { + if (authenticationMethod != null) { + alreadyPresentedProperty(property); + } + authenticationMethod = value; + } default -> unsupportedProperty(property); } } diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/ConnectAckMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/ConnectAckMqttInMessage.java index 320839aa..ac992cee 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/in/ConnectAckMqttInMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/in/ConnectAckMqttInMessage.java @@ -5,29 +5,29 @@ import java.util.Set; import javasabr.mqtt.model.MqttMessageProperty; import javasabr.mqtt.model.MqttProperties; +import javasabr.mqtt.model.MqttProtocolErrors; import javasabr.mqtt.model.MqttVersion; import javasabr.mqtt.model.QoS; -import javasabr.mqtt.model.data.type.StringPair; +import javasabr.mqtt.model.exception.MalformedProtocolMqttException; import javasabr.mqtt.model.message.MqttMessageType; import javasabr.mqtt.model.reason.code.ConnectAckReasonCode; import javasabr.mqtt.network.MqttConnection; -import javasabr.rlib.collections.array.MutableArray; -import javasabr.rlib.common.util.ArrayUtils; -import javasabr.rlib.common.util.NumberUtils; -import javasabr.rlib.common.util.StringUtils; +import javasabr.mqtt.network.util.MqttDataUtils; import lombok.AccessLevel; import lombok.Getter; import lombok.experimental.Accessors; import lombok.experimental.FieldDefaults; +import org.jspecify.annotations.Nullable; /** * Acknowledge connection request. */ @Getter -@Accessors(fluent = true) +@Accessors @FieldDefaults(level = AccessLevel.PRIVATE) public class ConnectAckMqttInMessage extends MqttInMessage { + public static final byte MESSAGE_FLAGS = 0b0000_0000; private static final byte MESSAGE_TYPE = (byte) MqttMessageType.CONNECT_ACK.ordinal(); private static final Set AVAILABLE_PROPERTIES = EnumSet.of( @@ -240,7 +240,9 @@ public class ConnectAckMqttInMessage extends MqttInMessage { * it MUST then close the Network Connection */ ConnectAckReasonCode reasonCode; - QoS maximumQos; + + @Nullable + QoS maxQos; /** * The Session Present flag informs the Client whether the Server is using Session State from a previous connection @@ -251,12 +253,17 @@ public class ConnectAckMqttInMessage extends MqttInMessage { boolean sessionPresent; // properties + @Nullable String assignedClientId; + @Nullable String reason; + @Nullable String responseInformation; - String authenticationMethod; + @Nullable String serverReference; - byte[] authenticationData; + @Nullable + String authenticationMethod; + byte @Nullable [] authenticationData; long sessionExpiryInterval; @@ -265,31 +272,23 @@ public class ConnectAckMqttInMessage extends MqttInMessage { int topicAliasMaxValue; int serverKeepAlive; - boolean retainAvailable; - boolean wildcardSubscriptionAvailable; - boolean sharedSubscriptionAvailable; - boolean subscriptionIdAvailable; + int retainAvailable; + int wildcardSubscriptionAvailable; + int subscriptionIdAvailable; + int sharedSubscriptionAvailable; public ConnectAckMqttInMessage(byte messageFlags) { super(messageFlags); - this.userProperties = MutableArray.ofType(StringPair.class); this.reasonCode = ConnectAckReasonCode.SUCCESS; - this.maximumQos = QoS.EXACTLY_ONCE; - this.retainAvailable = MqttProperties.RETAIN_AVAILABLE_DEFAULT; - this.assignedClientId = StringUtils.EMPTY; - this.reason = StringUtils.EMPTY; - this.sharedSubscriptionAvailable = MqttProperties.SHARED_SUBSCRIPTION_AVAILABLE_DEFAULT; - this.wildcardSubscriptionAvailable = MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE_DEFAULT; - this.subscriptionIdAvailable = MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE_DEFAULT; - this.responseInformation = StringUtils.EMPTY; - this.serverReference = StringUtils.EMPTY; - this.authenticationMethod = StringUtils.EMPTY; - this.authenticationData = ArrayUtils.EMPTY_BYTE_ARRAY; - this.serverKeepAlive = MqttProperties.SERVER_KEEP_ALIVE_UNDEFINED; - this.maxMessageSize = MqttProperties.MAXIMUM_MESSAGE_SIZE_UNDEFINED; - this.sessionExpiryInterval = MqttProperties.SESSION_EXPIRY_INTERVAL_UNDEFINED; - this.topicAliasMaxValue = MqttProperties.TOPIC_ALIAS_MAXIMUM_UNDEFINED; - this.receiveMaxPublishes = MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_UNDEFINED; + this.sessionExpiryInterval = MqttProperties.SESSION_EXPIRY_INTERVAL_IS_NOT_SET; + this.receiveMaxPublishes = MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET; + this.retainAvailable = MqttProperties.RETAIN_AVAILABLE_IS_NOT_SET; + this.maxMessageSize = MqttProperties.MAXIMUM_MESSAGE_SIZE_IS_NOT_SET; + this.topicAliasMaxValue = MqttProperties.TOPIC_ALIAS_MAXIMUM_IS_NOT_SET; + this.wildcardSubscriptionAvailable = MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE_IS_NOT_SET; + this.subscriptionIdAvailable = MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE_IS_NOT_SET; + this.sharedSubscriptionAvailable = MqttProperties.SHARED_SUBSCRIPTION_AVAILABLE_IS_NOT_SET; + this.serverKeepAlive = MqttProperties.SERVER_KEEP_ALIVE_IS_NOT_SET; } @Override @@ -297,11 +296,22 @@ public byte messageType() { return MESSAGE_TYPE; } + @Override + public String name() { + return MqttMessageType.CONNECT_ACK.name(); + } + + @Override + protected boolean validMessageFlags(byte messageFlags) { + return messageFlags == MESSAGE_FLAGS; + } + @Override protected void readVariableHeader(MqttConnection connection, ByteBuffer buffer) { // http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718035 - sessionPresent = readByteUnsigned(buffer) == 1; - reasonCode = ConnectAckReasonCode.of(connection.isSupported(MqttVersion.MQTT_5), readByteUnsigned(buffer)); + int connectAckFlags = readByteUnsigned(buffer); + sessionPresent = (connectAckFlags & 0b0000_0001) != 0; + reasonCode = ConnectAckReasonCode.ofCode(connection.isSupported(MqttVersion.MQTT_5), readByteUnsigned(buffer)); } @Override @@ -312,7 +322,12 @@ protected Set availableProperties() { @Override protected void applyProperty(MqttMessageProperty property, byte[] value) { switch (property) { - case AUTHENTICATION_DATA -> authenticationData = value; + case AUTHENTICATION_DATA -> { + if (authenticationData != null) { + alreadyPresentedProperty(property); + } + authenticationData = value; + } default -> unsupportedProperty(property); } } @@ -320,11 +335,36 @@ protected void applyProperty(MqttMessageProperty property, byte[] value) { @Override protected void applyProperty(MqttMessageProperty property, String value) { switch (property) { - case REASON_STRING -> reason = value; - case ASSIGNED_CLIENT_IDENTIFIER -> assignedClientId = value; - case RESPONSE_INFORMATION -> responseInformation = value; - case AUTHENTICATION_METHOD -> authenticationMethod = value; - case SERVER_REFERENCE -> serverReference = value; + case ASSIGNED_CLIENT_IDENTIFIER ->{ + if (assignedClientId != null) { + alreadyPresentedProperty(property); + } + assignedClientId = value; + } + case REASON_STRING -> { + if (reason != null) { + alreadyPresentedProperty(property); + } + reason = value; + } + case RESPONSE_INFORMATION -> { + if (responseInformation != null) { + alreadyPresentedProperty(property); + } + responseInformation = value; + } + case AUTHENTICATION_METHOD -> { + if (authenticationMethod != null) { + alreadyPresentedProperty(property); + } + authenticationMethod = value; + } + case SERVER_REFERENCE -> { + if (serverReference != null) { + alreadyPresentedProperty(property); + } + serverReference = value; + } default -> unsupportedProperty(property); } } @@ -332,31 +372,89 @@ protected void applyProperty(MqttMessageProperty property, String value) { @Override protected void applyProperty(MqttMessageProperty property, long value) { switch (property) { - case WILDCARD_SUBSCRIPTION_AVAILABLE -> wildcardSubscriptionAvailable = NumberUtils.toBoolean(value); - case SHARED_SUBSCRIPTION_AVAILABLE -> sharedSubscriptionAvailable = NumberUtils.toBoolean(value); - case SUBSCRIPTION_IDENTIFIER_AVAILABLE -> subscriptionIdAvailable = NumberUtils.toBoolean(value); - case RETAIN_AVAILABLE -> retainAvailable = NumberUtils.toBoolean(value); - case RECEIVE_MAXIMUM_PUBLISHES -> receiveMaxPublishes = (int) NumberUtils.validate( - value, - MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_MIN, - MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_MAX); - case MAXIMUM_QOS -> maximumQos = QoS.ofCode((int) value); - case SERVER_KEEP_ALIVE -> serverKeepAlive = NumberUtils.validate( - (int) value, - MqttProperties.SERVER_KEEP_ALIVE_MIN, - MqttProperties.SERVER_KEEP_ALIVE_MAX); - case TOPIC_ALIAS_MAXIMUM -> topicAliasMaxValue = NumberUtils.validate( - (int) value, - MqttProperties.TOPIC_ALIAS_MIN, - MqttProperties.TOPIC_ALIAS_MAX); - case SESSION_EXPIRY_INTERVAL -> sessionExpiryInterval = NumberUtils.validate( - value, - MqttProperties.SESSION_EXPIRY_INTERVAL_MIN, - MqttProperties.SESSION_EXPIRY_INTERVAL_INFINITY); - case MAXIMUM_MESSAGE_SIZE -> maxMessageSize = NumberUtils.validate( - (int) value, - MqttProperties.MAXIMUM_MESSAGE_SIZE_MIN, - MqttProperties.MAXIMUM_MESSAGE_SIZE_MAX); + case WILDCARD_SUBSCRIPTION_AVAILABLE -> { + if (wildcardSubscriptionAvailable != MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE_IS_NOT_SET) { + alreadyPresentedProperty(property); + } else if (!MqttDataUtils.isValidBoolean(value)) { + throw new MalformedProtocolMqttException(MqttProtocolErrors.PROVIDED_INVALID_WILDCARD_SUBSCRIPTION_AVAILABLE); + } + wildcardSubscriptionAvailable = (int) value; + } + case SHARED_SUBSCRIPTION_AVAILABLE -> { + if (sharedSubscriptionAvailable != MqttProperties.SHARED_SUBSCRIPTION_AVAILABLE_IS_NOT_SET) { + alreadyPresentedProperty(property); + } else if (!MqttDataUtils.isValidBoolean(value)) { + throw new MalformedProtocolMqttException(MqttProtocolErrors.PROVIDED_INVALID_SHARED_SUBSCRIPTION_AVAILABLE); + } + sharedSubscriptionAvailable = (int) value; + } + case SUBSCRIPTION_IDENTIFIER_AVAILABLE -> { + if (subscriptionIdAvailable != MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE_IS_NOT_SET) { + alreadyPresentedProperty(property); + } else if (!MqttDataUtils.isValidBoolean(value)) { + throw new MalformedProtocolMqttException(MqttProtocolErrors.PROVIDED_INVALID_SUBSCRIPTION_IDENTIFIERS_AVAILABLE); + } + subscriptionIdAvailable = (int) value; + } + case RETAIN_AVAILABLE -> { + if (retainAvailable != MqttProperties.RETAIN_AVAILABLE_IS_NOT_SET) { + alreadyPresentedProperty(property); + } else if (!MqttDataUtils.isValidBoolean(value)) { + throw new MalformedProtocolMqttException(MqttProtocolErrors.PROVIDED_INVALID_RETAIN_AVAILABLE); + } + retainAvailable = (int) value; + } + case RECEIVE_MAXIMUM_PUBLISHES -> { + if (receiveMaxPublishes != MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET) { + alreadyPresentedProperty(property); + } else if (value < MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_MIN + || value > MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_MAX) { + throw new MalformedProtocolMqttException(MqttProtocolErrors.PROVIDED_INVALID_RECEIVED_MAX_PUBLISHES); + } + receiveMaxPublishes = (int) value; + } + case MAXIMUM_QOS -> { + if (maxQos != null) { + alreadyPresentedProperty(property); + } else if (value < QoS.AT_LEAST_ONCE.level() || value > QoS.EXACTLY_ONCE.level()) { + throw new MalformedProtocolMqttException(MqttProtocolErrors.PROVIDED_INVALID_MAX_QOS); + } + maxQos = QoS.ofCode((int) value); + } + case SERVER_KEEP_ALIVE -> { + if (serverKeepAlive != MqttProperties.SERVER_KEEP_ALIVE_IS_NOT_SET) { + alreadyPresentedProperty(property); + } else if (value < MqttProperties.SERVER_KEEP_ALIVE_MIN || value > MqttProperties.SERVER_KEEP_ALIVE_MAX) { + throw new MalformedProtocolMqttException(MqttProtocolErrors.PROVIDED_INVALID_SERVER_KEEP_ALIVE); + } + serverKeepAlive = (int) value; + } + case TOPIC_ALIAS_MAXIMUM -> { + if (topicAliasMaxValue != MqttProperties.TOPIC_ALIAS_MAXIMUM_IS_NOT_SET) { + alreadyPresentedProperty(property); + } else if (value > MqttProperties.TOPIC_ALIAS_MAX) { + throw new MalformedProtocolMqttException(MqttProtocolErrors.PROVIDED_INVALID_TOPIC_ALIAS_MAX); + } + topicAliasMaxValue = (int) value; + } + case SESSION_EXPIRY_INTERVAL -> { + if (sessionExpiryInterval != MqttProperties.MESSAGE_EXPIRY_INTERVAL_IS_NOT_SET) { + alreadyPresentedProperty(property); + } else if (value < MqttProperties.SESSION_EXPIRY_INTERVAL_MIN + || value > MqttProperties.SESSION_EXPIRY_INTERVAL_INFINITY) { + throw new MalformedProtocolMqttException(MqttProtocolErrors.PROVIDED_INVALID_SESSION_EXPIRY_INTERVAL); + } + sessionExpiryInterval = value; + } + case MAXIMUM_MESSAGE_SIZE -> { + if (maxMessageSize != MqttProperties.MAXIMUM_MESSAGE_SIZE_IS_NOT_SET) { + alreadyPresentedProperty(property); + } else if (value < MqttProperties.MAXIMUM_MESSAGE_SIZE_MIN + || value > MqttProperties.MAXIMUM_MESSAGE_SIZE_MAX) { + throw new MalformedProtocolMqttException(MqttProtocolErrors.PROVIDED_INVALID_MAX_MESSAGE_SIZE); + } + maxMessageSize = (int) value; + } default -> unsupportedProperty(property); } } diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/ConnectMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/ConnectMqttInMessage.java index ccbdcb9b..1184e0d6 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/in/ConnectMqttInMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/in/ConnectMqttInMessage.java @@ -214,10 +214,10 @@ public class ConnectMqttInMessage extends MqttInMessage { String authenticationMethod = StringUtils.EMPTY; byte[] authenticationData = ArrayUtils.EMPTY_BYTE_ARRAY; - long sessionExpiryInterval = MqttProperties.SESSION_EXPIRY_INTERVAL_UNDEFINED; - int receiveMaxPublishes = MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_UNDEFINED; - int maxPacketSize = MqttProperties.MAXIMUM_MESSAGE_SIZE_UNDEFINED; - int topicAliasMaxValue = MqttProperties.TOPIC_ALIAS_MAXIMUM_UNDEFINED; + long sessionExpiryInterval = MqttProperties.SESSION_EXPIRY_INTERVAL_IS_NOT_SET; + int receiveMaxPublishes = MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET; + int maxPacketSize = MqttProperties.MAXIMUM_MESSAGE_SIZE_IS_NOT_SET; + int topicAliasMaxValue = MqttProperties.TOPIC_ALIAS_MAXIMUM_IS_NOT_SET; boolean requestResponseInformation = false; boolean requestProblemInformation = false; diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/AuthenticationMqtt5OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/AuthenticationMqtt5OutMessage.java index a65651b7..64eb9618 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/AuthenticationMqtt5OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/AuthenticationMqtt5OutMessage.java @@ -12,6 +12,7 @@ import lombok.AccessLevel; import lombok.RequiredArgsConstructor; import lombok.experimental.FieldDefaults; +import org.jspecify.annotations.Nullable; /** * Authentication exchange. @@ -53,14 +54,16 @@ public class AuthenticationMqtt5OutMessage extends MqttOutMessage { */ MqttMessageProperty.USER_PROPERTY); - Array userProperties; - AuthenticateReasonCode reasonCode; + @Nullable String reason; + @Nullable String authenticateMethod; - byte[] authenticateData; + byte @Nullable [] authenticateData; + + Array userProperties; @Override protected byte messageType() { @@ -70,7 +73,7 @@ protected byte messageType() { @Override protected void writeVariableHeader(MqttConnection connection, ByteBuffer buffer) { // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901219 - writeByte(buffer, reasonCode.value()); + writeByte(buffer, reasonCode.code()); } @Override diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/ConnectAckMqtt311OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/ConnectAckMqtt311OutMessage.java index 3627af63..92a18b8d 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/ConnectAckMqtt311OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/ConnectAckMqtt311OutMessage.java @@ -56,6 +56,6 @@ protected void writeVariableHeader(MqttConnection connection, ByteBuffer buffer) } protected byte reasonCodeValue() { - return reasonCode.mqtt311(); + return (byte) reasonCode.mqtt311(); } } diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/ConnectAckMqtt5OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/ConnectAckMqtt5OutMessage.java index cb282336..f53180a7 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/ConnectAckMqtt5OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/ConnectAckMqtt5OutMessage.java @@ -13,6 +13,7 @@ import javasabr.rlib.collections.array.Array; import lombok.AccessLevel; import lombok.experimental.FieldDefaults; +import org.jspecify.annotations.Nullable; /** * Connect acknowledgment. @@ -230,14 +231,18 @@ public class ConnectAckMqtt5OutMessage extends ConnectAckMqtt311OutMessage { String clientId; + @Nullable + String requestedClientId; + @Nullable String responseInformation; + @Nullable String reason; + @Nullable String serverReference; - + @Nullable String authenticationMethod; - byte[] authenticationData; + byte @Nullable [] authenticationData; - String requestedClientId; long requestedSessionExpiryInterval; int requestedKeepAlive; int requestedReceiveMax; @@ -248,15 +253,15 @@ public ConnectAckMqtt5OutMessage( ConnectAckReasonCode reasonCode, boolean sessionPresent, String clientId, - String requestedClientId, + @Nullable String requestedClientId, long requestedSessionExpiryInterval, int requestedKeepAlive, int requestedReceiveMaxPublishes, - String reason, - String serverReference, - String responseInformation, - String authenticationMethod, - byte[] authenticationData, + @Nullable String reason, + @Nullable String serverReference, + @Nullable String responseInformation, + @Nullable String authenticationMethod, + byte @Nullable [] authenticationData, Array userProperties) { super(reasonCode, sessionPresent); this.clientId = clientId; @@ -279,7 +284,7 @@ public int expectedLength(MqttConnection connection) { @Override protected byte reasonCodeValue() { - return reasonCode.mqtt5(); + return (byte) reasonCode.mqtt5(); } @Override @@ -312,8 +317,14 @@ protected void writeProperties(MqttConnection connection, ByteBuffer buffer) { MqttMessageProperty.SESSION_EXPIRY_INTERVAL, connectionConfig.sessionExpiryInterval(), requestedSessionExpiryInterval); - writeProperty(buffer, MqttMessageProperty.ASSIGNED_CLIENT_IDENTIFIER, clientId, requestedClientId); - writeProperty(buffer, MqttMessageProperty.RECEIVE_MAXIMUM_PUBLISHES, connectionConfig.receiveMaxPublishes(), requestedReceiveMax); + if (requestedClientId != null) { + writeProperty(buffer, MqttMessageProperty.ASSIGNED_CLIENT_IDENTIFIER, clientId, requestedClientId); + } + writeProperty( + buffer, + MqttMessageProperty.RECEIVE_MAXIMUM_PUBLISHES, + connectionConfig.receiveMaxPublishes(), + requestedReceiveMax); writeProperty( buffer, MqttMessageProperty.MAXIMUM_MESSAGE_SIZE, @@ -339,6 +350,10 @@ protected void writeProperties(MqttConnection connection, ByteBuffer buffer) { MqttMessageProperty.SHARED_SUBSCRIPTION_AVAILABLE, connectionConfig.sharedSubscriptionAvailable(), MqttProperties.SHARED_SUBSCRIPTION_AVAILABLE_DEFAULT); - writeProperty(buffer, MqttMessageProperty.SERVER_KEEP_ALIVE, connectionConfig.keepAlive(), requestedKeepAlive); + writeProperty( + buffer, + MqttMessageProperty.SERVER_KEEP_ALIVE, + connectionConfig.keepAlive(), + requestedKeepAlive); } } diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/ConnectMqtt5OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/ConnectMqtt5OutMessage.java index 1277c38f..aece8567 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/ConnectMqtt5OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/ConnectMqtt5OutMessage.java @@ -207,10 +207,10 @@ public ConnectMqtt5OutMessage(String clientId, int keepAlive) { Array.empty(StringPair.class), StringUtils.EMPTY, ArrayUtils.EMPTY_BYTE_ARRAY, - MqttProperties.SESSION_EXPIRY_INTERVAL_UNDEFINED, - MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_UNDEFINED, - MqttProperties.MAXIMUM_MESSAGE_SIZE_UNDEFINED, - MqttProperties.TOPIC_ALIAS_MAXIMUM_UNDEFINED, + MqttProperties.SESSION_EXPIRY_INTERVAL_IS_NOT_SET, + MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET, + MqttProperties.MAXIMUM_MESSAGE_SIZE_IS_NOT_SET, + MqttProperties.TOPIC_ALIAS_MAXIMUM_IS_NOT_SET, false, false); } @@ -282,22 +282,22 @@ protected void writeProperties(MqttConnection connection, ByteBuffer buffer) { writeNotEmptyProperty(buffer, MqttMessageProperty.AUTHENTICATION_DATA, authenticationData); writeProperty(buffer, MqttMessageProperty.REQUEST_RESPONSE_INFORMATION, requestResponseInformation, false); writeProperty(buffer, MqttMessageProperty.REQUEST_PROBLEM_INFORMATION, requestProblemInformation, false); - writeProperty(buffer, MqttMessageProperty.RECEIVE_MAXIMUM_PUBLISHES, receiveMax, MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_UNDEFINED); + writeProperty(buffer, MqttMessageProperty.RECEIVE_MAXIMUM_PUBLISHES, receiveMax, MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET); writeProperty( buffer, MqttMessageProperty.TOPIC_ALIAS_MAXIMUM, topicAliasMaximum, - MqttProperties.TOPIC_ALIAS_MAXIMUM_UNDEFINED); + MqttProperties.TOPIC_ALIAS_MAXIMUM_IS_NOT_SET); writeProperty( buffer, MqttMessageProperty.SESSION_EXPIRY_INTERVAL, sessionExpiryInterval, - MqttProperties.SESSION_EXPIRY_INTERVAL_UNDEFINED); + MqttProperties.SESSION_EXPIRY_INTERVAL_IS_NOT_SET); writeProperty( buffer, MqttMessageProperty.MAXIMUM_MESSAGE_SIZE, maximumPacketSize, - MqttProperties.MAXIMUM_MESSAGE_SIZE_UNDEFINED); + MqttProperties.MAXIMUM_MESSAGE_SIZE_IS_NOT_SET); } protected void writeWillProperties(ByteBuffer buffer) {} diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/DisconnectMqtt5OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/DisconnectMqtt5OutMessage.java index a09612d7..2390bd48 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/DisconnectMqtt5OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/DisconnectMqtt5OutMessage.java @@ -84,7 +84,7 @@ protected void writeProperties(MqttConnection connection, ByteBuffer buffer) { writeStringPairProperties(buffer, MqttMessageProperty.USER_PROPERTY, userProperties); writeNotEmptyProperty(buffer, MqttMessageProperty.REASON_STRING, reason); writeNotEmptyProperty(buffer, MqttMessageProperty.SERVER_REFERENCE, serverReference); - if (sessionExpiryInterval != MqttProperties.SESSION_EXPIRY_INTERVAL_UNDEFINED) { + if (sessionExpiryInterval != MqttProperties.SESSION_EXPIRY_INTERVAL_IS_NOT_SET) { writeProperty( buffer, MqttMessageProperty.SESSION_EXPIRY_INTERVAL, diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/MqttOutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/MqttOutMessage.java index f8da16bd..cbb0fcb8 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/MqttOutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/MqttOutMessage.java @@ -127,7 +127,6 @@ public void writeNotEmptyProperty(ByteBuffer buffer, MqttMessageProperty propert } } - public void writeNotEmptyProperty(ByteBuffer buffer, MqttMessageProperty property, byte @Nullable [] value) { if (value != null && value.length > 0) { writeProperty(buffer, property, value); diff --git a/network/src/main/java/javasabr/mqtt/network/util/MqttDataUtils.java b/network/src/main/java/javasabr/mqtt/network/util/MqttDataUtils.java index 7a59d3f2..5364c546 100644 --- a/network/src/main/java/javasabr/mqtt/network/util/MqttDataUtils.java +++ b/network/src/main/java/javasabr/mqtt/network/util/MqttDataUtils.java @@ -90,4 +90,8 @@ public static String toUnsignedBinary(byte value) { } return "0b" + binary.substring(0, 4) + "_" + binary.substring(4); } + + public static boolean isValidBoolean(long value) { + return value == 0 || value == 1; + } } diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/in/AuthenticationMqttInMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/in/AuthenticationMqttInMessageTest.groovy index b6fb96a7..eebf2f0d 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/in/AuthenticationMqttInMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/in/AuthenticationMqttInMessageTest.groovy @@ -1,12 +1,14 @@ package javasabr.mqtt.network.message.in import javasabr.mqtt.model.MqttMessageProperty +import javasabr.mqtt.model.exception.MalformedProtocolMqttException +import javasabr.mqtt.model.message.MqttMessageType import javasabr.mqtt.model.reason.code.AuthenticateReasonCode import javasabr.rlib.common.util.BufferUtils class AuthenticationMqttInMessageTest extends BaseMqttInMessageTest { - def "should read packet correctly as mqtt 5.0"() { + def "should read message correctly as MQTT 5.0"() { given: def propertiesBuffer = BufferUtils.prepareBuffer(512) { it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, authMethod) @@ -15,59 +17,148 @@ class AuthenticationMqttInMessageTest extends BaseMqttInMessageTest { it.putProperty(MqttMessageProperty.USER_PROPERTY, userProperties) } def dataBuffer = BufferUtils.prepareBuffer(512) { - it.put(AuthenticateReasonCode.SUCCESS.value) + it.put(AuthenticateReasonCode.SUCCESS) it.putMbi(propertiesBuffer.limit()) it.put(propertiesBuffer) } when: - def packet = new AuthenticationMqttInMessage(0b1111_0000 as byte) - def result = packet.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) + def inMessage = new AuthenticationMqttInMessage(AuthenticationMqttInMessage.MESSAGE_FLAGS) + def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) then: result - packet.reasonCode == AuthenticateReasonCode.SUCCESS - packet.authenticationMethod == authMethod - packet.authenticationData == authData - packet.reason == reasonString - packet.userProperties() == userProperties + inMessage.reasonCode() == AuthenticateReasonCode.SUCCESS + inMessage.authenticationMethod() == authMethod + inMessage.authenticationData() == authData + inMessage.reason() == reasonString + inMessage.userProperties() == userProperties when: - propertiesBuffer = BufferUtils.prepareBuffer(512) { + def propertiesBuffer2 = BufferUtils.prepareBuffer(512) { it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, authMethod) it.putProperty(MqttMessageProperty.REASON_STRING, reasonString) it.putProperty(MqttMessageProperty.USER_PROPERTY, userProperties) it.putProperty(MqttMessageProperty.AUTHENTICATION_DATA, authData) } - dataBuffer = BufferUtils.prepareBuffer(512) { - it.put(AuthenticateReasonCode.CONTINUE_AUTHENTICATION.value) - it.putMbi(propertiesBuffer.limit()) - it.put(propertiesBuffer) + def dataBuffer2 = BufferUtils.prepareBuffer(512) { + it.put(AuthenticateReasonCode.CONTINUE_AUTHENTICATION) + it.putMbi(propertiesBuffer2.limit()) + it.put(propertiesBuffer2) } - packet = new AuthenticationMqttInMessage(0b1111_0000 as byte) - result = packet.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) + def inMessage2 = new AuthenticationMqttInMessage(AuthenticationMqttInMessage.MESSAGE_FLAGS) + def result2 = inMessage2.read(defaultMqtt5Connection, dataBuffer2, dataBuffer2.limit()) then: - result - packet.reasonCode == AuthenticateReasonCode.CONTINUE_AUTHENTICATION - packet.authenticationMethod == authMethod - packet.authenticationData == authData - packet.reason == reasonString - packet.userProperties() == userProperties + result2 + inMessage2.reasonCode() == AuthenticateReasonCode.CONTINUE_AUTHENTICATION + inMessage2.authenticationMethod() == authMethod + inMessage2.authenticationData() == authData + inMessage2.reason() == reasonString + inMessage2.userProperties() == userProperties when: - propertiesBuffer = BufferUtils.prepareBuffer(512) { + def propertiesBuffer3 = BufferUtils.prepareBuffer(512) { it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, authMethod) it.putProperty(MqttMessageProperty.AUTHENTICATION_DATA, authData) } - dataBuffer = BufferUtils.prepareBuffer(512) { - it.put(AuthenticateReasonCode.CONTINUE_AUTHENTICATION.value) + def dataBuffer3 = BufferUtils.prepareBuffer(512) { + it.put(AuthenticateReasonCode.CONTINUE_AUTHENTICATION) + it.putMbi(propertiesBuffer3.limit()) + it.put(propertiesBuffer3) + } + def inMessage3 = new AuthenticationMqttInMessage(AuthenticationMqttInMessage.MESSAGE_FLAGS) + def result3 = inMessage3.read(defaultMqtt5Connection, dataBuffer3, dataBuffer3.limit()) + then: + result3 + inMessage3.reasonCode() == AuthenticateReasonCode.CONTINUE_AUTHENTICATION + inMessage3.authenticationMethod() == authMethod + inMessage3.authenticationData() == authData + inMessage3.reason() == null + inMessage3.userProperties() == MqttInMessage.EMPTY_USER_PROPERTIES + when: + def dataBuffer4 = BufferUtils.prepareBuffer(512) { + it.put(AuthenticateReasonCode.SUCCESS) + it.putMbi(0) + } + def inMessage4 = new AuthenticationMqttInMessage(AuthenticationMqttInMessage.MESSAGE_FLAGS) + def result4 = inMessage4.read(defaultMqtt5Connection, dataBuffer3, dataBuffer3.limit()) + then: + result4 + inMessage4.reasonCode() == AuthenticateReasonCode.SUCCESS + inMessage4.authenticationMethod() == null + inMessage4.authenticationData() == null + inMessage4.reason() == null + inMessage4.userProperties() == MqttInMessage.EMPTY_USER_PROPERTIES + } + + def "should not allow to put auth method 2 times"() { + given: + def propertiesBuffer = BufferUtils.prepareBuffer(512) { + it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, "method1") + it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, "method2") + } + def dataBuffer = BufferUtils.prepareBuffer(512) { + it.put(AuthenticateReasonCode.SUCCESS) it.putMbi(propertiesBuffer.limit()) it.put(propertiesBuffer) } - packet = new AuthenticationMqttInMessage(0b1111_0000 as byte) - result = packet.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) + when: + def inMessage = new AuthenticationMqttInMessage(AuthenticationMqttInMessage.MESSAGE_FLAGS) + def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) then: - result - packet.reasonCode == AuthenticateReasonCode.CONTINUE_AUTHENTICATION - packet.authenticationMethod == authMethod - packet.authenticationData == authData - packet.reason == "" - packet.userProperties() == MqttInMessage.EMPTY_USER_PROPERTIES + !result + inMessage.exception() instanceof MalformedProtocolMqttException + inMessage.exception().message == "Property:[$MqttMessageProperty.AUTHENTICATION_METHOD] is already presented in message:[$MqttMessageType.AUTHENTICATION]" + } + + def "should not allow to put auth data 2 times"() { + given: + def propertiesBuffer = BufferUtils.prepareBuffer(512) { + it.putProperty(MqttMessageProperty.AUTHENTICATION_DATA, "data1".bytes) + it.putProperty(MqttMessageProperty.AUTHENTICATION_DATA, "data2".bytes) + } + def dataBuffer = BufferUtils.prepareBuffer(512) { + it.put(AuthenticateReasonCode.SUCCESS) + it.putMbi(propertiesBuffer.limit()) + it.put(propertiesBuffer) + } + when: + def inMessage = new AuthenticationMqttInMessage(AuthenticationMqttInMessage.MESSAGE_FLAGS) + def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) + then: + !result + inMessage.exception() instanceof MalformedProtocolMqttException + inMessage.exception().message == "Property:[$MqttMessageProperty.AUTHENTICATION_DATA] is already presented in message:[$MqttMessageType.AUTHENTICATION]" + } + + def "should not allow to put reason 2 times"() { + given: + def propertiesBuffer = BufferUtils.prepareBuffer(512) { + it.putProperty(MqttMessageProperty.REASON_STRING, "reason1") + it.putProperty(MqttMessageProperty.REASON_STRING, "reason2") + } + def dataBuffer = BufferUtils.prepareBuffer(512) { + it.put(AuthenticateReasonCode.SUCCESS) + it.putMbi(propertiesBuffer.limit()) + it.put(propertiesBuffer) + } + when: + def inMessage = new AuthenticationMqttInMessage(AuthenticationMqttInMessage.MESSAGE_FLAGS) + def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) + then: + !result + inMessage.exception() instanceof MalformedProtocolMqttException + inMessage.exception().message == "Property:[$MqttMessageProperty.REASON_STRING] is already presented in message:[$MqttMessageType.AUTHENTICATION]" + } + + def "should not allow invalid message flags"() { + given: + def dataBuffer = BufferUtils.prepareBuffer(512) { + it.put(AuthenticateReasonCode.SUCCESS) + it.putMbi(0) + } + when: + def inMessage = new AuthenticationMqttInMessage(0b0101_0101 as byte) + def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) + then: + !result + inMessage.exception() instanceof MalformedProtocolMqttException + inMessage.exception().message == "Unexpected message flags:[0b0101_0101] in message:[$MqttMessageType.AUTHENTICATION]" } } diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/in/ConnectAckMqttInMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/in/ConnectAckMqttInMessageTest.groovy index 1434bc43..fff76d99 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/in/ConnectAckMqttInMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/in/ConnectAckMqttInMessageTest.groovy @@ -2,45 +2,49 @@ package javasabr.mqtt.network.message.in import javasabr.mqtt.model.MqttMessageProperty import javasabr.mqtt.model.MqttProperties +import javasabr.mqtt.model.MqttProtocolErrors import javasabr.mqtt.model.QoS +import javasabr.mqtt.model.exception.MalformedProtocolMqttException +import javasabr.mqtt.model.message.MqttMessageType import javasabr.mqtt.model.reason.code.ConnectAckReasonCode -import javasabr.rlib.common.util.ArrayUtils +import javasabr.mqtt.model.reason.code.PublishAckReasonCode import javasabr.rlib.common.util.BufferUtils +import javasabr.rlib.common.util.NumberUtils class ConnectAckMqttInMessageTest extends BaseMqttInMessageTest { - def "should read packet correctly as mqtt 3.1.1"() { + def "should read message correctly as MQTT 3.1.1"() { given: def dataBuffer = BufferUtils.prepareBuffer(512) { it.putBoolean(sessionPresent) - it.put(ConnectAckReasonCode.NOT_AUTHORIZED.mqtt311) + it.putByte(ConnectAckReasonCode.NOT_AUTHORIZED.mqtt311()) } when: - def packet = new ConnectAckMqttInMessage(0b0010_0000 as byte) - def result = packet.read(defaultMqtt311Connection, dataBuffer, dataBuffer.limit()) + def inMessage = new ConnectAckMqttInMessage(ConnectAckMqttInMessage.MESSAGE_FLAGS) + def result = inMessage.read(defaultMqtt311Connection, dataBuffer, dataBuffer.limit()) then: result - packet.reasonCode == ConnectAckReasonCode.NOT_AUTHORIZED - packet.sessionPresent == sessionPresent - packet.serverReference == "" - packet.reason == "" - packet.assignedClientId == "" - packet.authenticationData == ArrayUtils.EMPTY_BYTE_ARRAY - packet.authenticationMethod == "" - packet.maximumQos == QoS.EXACTLY_ONCE - packet.retainAvailable == MqttProperties.RETAIN_AVAILABLE_DEFAULT - packet.sharedSubscriptionAvailable == MqttProperties.SHARED_SUBSCRIPTION_AVAILABLE_DEFAULT - packet.wildcardSubscriptionAvailable == MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE_DEFAULT - packet.subscriptionIdAvailable == MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE_DEFAULT - packet.responseInformation == "" - packet.maxMessageSize == MqttProperties.MAXIMUM_MESSAGE_SIZE_UNDEFINED - packet.serverKeepAlive == MqttProperties.SERVER_KEEP_ALIVE_UNDEFINED - packet.sessionExpiryInterval == MqttProperties.SESSION_EXPIRY_INTERVAL_UNDEFINED - packet.topicAliasMaxValue == MqttProperties.TOPIC_ALIAS_MAXIMUM_UNDEFINED - packet.receiveMaxPublishes == MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_UNDEFINED + inMessage.reasonCode() == ConnectAckReasonCode.NOT_AUTHORIZED + inMessage.sessionPresent() == sessionPresent + inMessage.serverReference() == null + inMessage.reason() == null + inMessage.assignedClientId() == null + inMessage.authenticationData() == null + inMessage.authenticationMethod() == null + inMessage.responseInformation() == null + inMessage.maxQos() == null + inMessage.retainAvailable() == MqttProperties.RETAIN_AVAILABLE_IS_NOT_SET + inMessage.sharedSubscriptionAvailable() == MqttProperties.SHARED_SUBSCRIPTION_AVAILABLE_IS_NOT_SET + inMessage.wildcardSubscriptionAvailable() == MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE_IS_NOT_SET + inMessage.subscriptionIdAvailable() == MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE_IS_NOT_SET + inMessage.maxMessageSize() == MqttProperties.MAXIMUM_MESSAGE_SIZE_IS_NOT_SET + inMessage.serverKeepAlive() == MqttProperties.SERVER_KEEP_ALIVE_IS_NOT_SET + inMessage.sessionExpiryInterval() == MqttProperties.SESSION_EXPIRY_INTERVAL_IS_NOT_SET + inMessage.topicAliasMaxValue() == MqttProperties.TOPIC_ALIAS_MAXIMUM_IS_NOT_SET + inMessage.receiveMaxPublishes() == MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET } - def "should read packet correctly as mqtt 5.0"() { + def "should read message correctly as MQTT 5.0"() { given: def propertiesBuffer = BufferUtils.prepareBuffer(512) { it.putProperty(MqttMessageProperty.REASON_STRING, reasonString) @@ -48,7 +52,7 @@ class ConnectAckMqttInMessageTest extends BaseMqttInMessageTest { it.putProperty(MqttMessageProperty.ASSIGNED_CLIENT_IDENTIFIER, mqtt311ClientId) it.putProperty(MqttMessageProperty.AUTHENTICATION_DATA, authData) it.putProperty(MqttMessageProperty.AUTHENTICATION_METHOD, authMethod) - it.putProperty(MqttMessageProperty.MAXIMUM_MESSAGE_SIZE, maxPacketSize) + it.putProperty(MqttMessageProperty.MAXIMUM_MESSAGE_SIZE, maxMessageSize) it.putProperty(MqttMessageProperty.MAXIMUM_QOS, QoS.AT_LEAST_ONCE.ordinal()) it.putProperty(MqttMessageProperty.RECEIVE_MAXIMUM_PUBLISHES, receiveMaxPublishes) it.putProperty(MqttMessageProperty.RETAIN_AVAILABLE, retainAvailable) @@ -62,53 +66,141 @@ class ConnectAckMqttInMessageTest extends BaseMqttInMessageTest { } def dataBuffer = BufferUtils.prepareBuffer(512) { it.putBoolean(sessionPresent) - it.put(ConnectAckReasonCode.PAYLOAD_FORMAT_INVALID.mqtt5) + it.putByte(ConnectAckReasonCode.PAYLOAD_FORMAT_INVALID.mqtt5()) it.putMbi(propertiesBuffer.limit()) it.put(propertiesBuffer) } when: - def packet = new ConnectAckMqttInMessage(0b0010_0000 as byte) - def result = packet.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) + def inMessage = new ConnectAckMqttInMessage(ConnectAckMqttInMessage.MESSAGE_FLAGS) + def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) then: result - packet.reasonCode == ConnectAckReasonCode.PAYLOAD_FORMAT_INVALID - packet.sessionPresent == sessionPresent - packet.serverReference == serverReference - packet.reason == reasonString - packet.assignedClientId == mqtt311ClientId - packet.authenticationData == authData - packet.authenticationMethod == authMethod - packet.maxMessageSize == maxPacketSize - packet.maximumQos == QoS.AT_LEAST_ONCE - packet.receiveMaxPublishes == receiveMaxPublishes - packet.retainAvailable == retainAvailable - packet.responseInformation == responseInformation - packet.serverKeepAlive == serverKeepAlive - packet.sessionExpiryInterval == sessionExpiryInterval - packet.sharedSubscriptionAvailable == sharedSubscriptionAvailable - packet.wildcardSubscriptionAvailable == wildcardSubscriptionAvailable - packet.subscriptionIdAvailable == subscriptionIdAvailable - packet.topicAliasMaxValue == topicAliasMaxValue - + inMessage.reasonCode() == ConnectAckReasonCode.PAYLOAD_FORMAT_INVALID + inMessage.sessionPresent() == sessionPresent + inMessage.serverReference() == serverReference + inMessage.reason() == reasonString + inMessage.assignedClientId() == mqtt311ClientId + inMessage.authenticationData() == authData + inMessage.authenticationMethod() == authMethod + inMessage.maxMessageSize() == maxMessageSize + inMessage.maxQos() == QoS.AT_LEAST_ONCE + inMessage.receiveMaxPublishes() == receiveMaxPublishes + inMessage.responseInformation() == responseInformation + inMessage.serverKeepAlive() == serverKeepAlive + inMessage.sessionExpiryInterval() == sessionExpiryInterval + NumberUtils.toBoolean(inMessage.sharedSubscriptionAvailable()) == sharedSubscriptionAvailable + NumberUtils.toBoolean(inMessage.wildcardSubscriptionAvailable()) == wildcardSubscriptionAvailable + NumberUtils.toBoolean(inMessage.subscriptionIdAvailable()) == subscriptionIdAvailable + NumberUtils.toBoolean(inMessage.retainAvailable()) == retainAvailable when: - propertiesBuffer = BufferUtils.prepareBuffer(512) { + def propertiesBuffer2 = BufferUtils.prepareBuffer(512) { it.putProperty(MqttMessageProperty.SHARED_SUBSCRIPTION_AVAILABLE, sharedSubscriptionAvailable) it.putProperty(MqttMessageProperty.WILDCARD_SUBSCRIPTION_AVAILABLE, wildcardSubscriptionAvailable) it.putProperty(MqttMessageProperty.SUBSCRIPTION_IDENTIFIER_AVAILABLE, subscriptionIdAvailable) } - dataBuffer = BufferUtils.prepareBuffer(512) { + def dataBuffer2 = BufferUtils.prepareBuffer(512) { it.putBoolean(sessionPresent) - it.put(ConnectAckReasonCode.PACKET_TOO_LARGE.mqtt5) + it.putByte(ConnectAckReasonCode.PACKET_TOO_LARGE.mqtt5()) + it.putMbi(propertiesBuffer2.limit()) + it.put(propertiesBuffer2) + } + def inMessage2 = new ConnectAckMqttInMessage(ConnectAckMqttInMessage.MESSAGE_FLAGS) + def result2 = inMessage2.read(defaultMqtt5Connection, dataBuffer2, dataBuffer2.limit()) + then: + result2 + inMessage2.reasonCode() == ConnectAckReasonCode.PACKET_TOO_LARGE + NumberUtils.toBoolean(inMessage2.sharedSubscriptionAvailable()) == sharedSubscriptionAvailable + NumberUtils.toBoolean(inMessage2.wildcardSubscriptionAvailable()) == wildcardSubscriptionAvailable + NumberUtils.toBoolean(inMessage2.subscriptionIdAvailable()) == subscriptionIdAvailable + } + + def "should not allow duplicated properties in message"(MqttMessageProperty property, Object value) { + given: + def propertiesBuffer = BufferUtils.prepareBuffer(512) { + it.putProperty(property, value) + it.putProperty(property, value) + } + def dataBuffer = BufferUtils.prepareBuffer(512) { + it.putBoolean(false) + it.putByte(ConnectAckReasonCode.SUCCESS.mqtt5()) it.putMbi(propertiesBuffer.limit()) it.put(propertiesBuffer) } - packet = new ConnectAckMqttInMessage(0b0010_0000 as byte) - result = packet.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) + when: + def inMessage = new ConnectAckMqttInMessage(ConnectAckMqttInMessage.MESSAGE_FLAGS) + def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) then: - result - packet.reasonCode == ConnectAckReasonCode.PACKET_TOO_LARGE - packet.sharedSubscriptionAvailable == sharedSubscriptionAvailable - packet.wildcardSubscriptionAvailable == wildcardSubscriptionAvailable - packet.subscriptionIdAvailable == subscriptionIdAvailable + !result + inMessage.exception() instanceof MalformedProtocolMqttException + inMessage.exception().message == "Property:[$property] is already presented in message:[$MqttMessageType.CONNECT_ACK]" + where: + property | value + MqttMessageProperty.AUTHENTICATION_DATA | authData + MqttMessageProperty.ASSIGNED_CLIENT_IDENTIFIER | mqtt5ClientId + MqttMessageProperty.REASON_STRING | reasonString + MqttMessageProperty.RESPONSE_INFORMATION | responseInformation + MqttMessageProperty.AUTHENTICATION_METHOD | authMethod + MqttMessageProperty.SERVER_REFERENCE | serverReference + MqttMessageProperty.WILDCARD_SUBSCRIPTION_AVAILABLE | wildcardSubscriptionAvailable + MqttMessageProperty.SHARED_SUBSCRIPTION_AVAILABLE | sharedSubscriptionAvailable + MqttMessageProperty.SUBSCRIPTION_IDENTIFIER_AVAILABLE | subscriptionIdAvailable + MqttMessageProperty.RETAIN_AVAILABLE | retainAvailable + MqttMessageProperty.RECEIVE_MAXIMUM_PUBLISHES | receiveMaxPublishes + MqttMessageProperty.MAXIMUM_QOS | QoS.EXACTLY_ONCE.level() + MqttMessageProperty.SERVER_KEEP_ALIVE | serverKeepAlive + MqttMessageProperty.TOPIC_ALIAS_MAXIMUM | topicAliasMaxValue + MqttMessageProperty.SESSION_EXPIRY_INTERVAL | sessionExpiryInterval + MqttMessageProperty.MAXIMUM_MESSAGE_SIZE | maxMessageSize + } + + def "should validate invalid properties in message"(MqttMessageProperty property, Object value, String expectedMessage) { + given: + def propertiesBuffer = BufferUtils.prepareBuffer(512) { + it.putProperty(property, value) + } + def dataBuffer = BufferUtils.prepareBuffer(512) { + it.putBoolean(false) + it.putByte(ConnectAckReasonCode.SUCCESS.mqtt5()) + it.putMbi(propertiesBuffer.limit()) + it.put(propertiesBuffer) + } + when: + def inMessage = new ConnectAckMqttInMessage(ConnectAckMqttInMessage.MESSAGE_FLAGS) + def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) + then: + !result + inMessage.exception() instanceof MalformedProtocolMqttException + inMessage.exception().message == expectedMessage + where: + property | value | expectedMessage + MqttMessageProperty.WILDCARD_SUBSCRIPTION_AVAILABLE | 2 | MqttProtocolErrors.PROVIDED_INVALID_WILDCARD_SUBSCRIPTION_AVAILABLE + MqttMessageProperty.WILDCARD_SUBSCRIPTION_AVAILABLE | -1 | MqttProtocolErrors.PROVIDED_INVALID_WILDCARD_SUBSCRIPTION_AVAILABLE + MqttMessageProperty.SHARED_SUBSCRIPTION_AVAILABLE | 3 | MqttProtocolErrors.PROVIDED_INVALID_SHARED_SUBSCRIPTION_AVAILABLE + MqttMessageProperty.SHARED_SUBSCRIPTION_AVAILABLE | -2 | MqttProtocolErrors.PROVIDED_INVALID_SHARED_SUBSCRIPTION_AVAILABLE + MqttMessageProperty.SUBSCRIPTION_IDENTIFIER_AVAILABLE | 4 | MqttProtocolErrors.PROVIDED_INVALID_SUBSCRIPTION_IDENTIFIERS_AVAILABLE + MqttMessageProperty.SUBSCRIPTION_IDENTIFIER_AVAILABLE | -3 | MqttProtocolErrors.PROVIDED_INVALID_SUBSCRIPTION_IDENTIFIERS_AVAILABLE + MqttMessageProperty.RETAIN_AVAILABLE | 5 | MqttProtocolErrors.PROVIDED_INVALID_RETAIN_AVAILABLE + MqttMessageProperty.RETAIN_AVAILABLE | -4 | MqttProtocolErrors.PROVIDED_INVALID_RETAIN_AVAILABLE + MqttMessageProperty.RECEIVE_MAXIMUM_PUBLISHES | 0 | MqttProtocolErrors.PROVIDED_INVALID_RECEIVED_MAX_PUBLISHES + MqttMessageProperty.MAXIMUM_QOS | 3 | MqttProtocolErrors.PROVIDED_INVALID_MAX_QOS + MqttMessageProperty.MAXIMUM_QOS | -1 | MqttProtocolErrors.PROVIDED_INVALID_MAX_QOS + MqttMessageProperty.MAXIMUM_MESSAGE_SIZE | 1 | MqttProtocolErrors.PROVIDED_INVALID_MAX_MESSAGE_SIZE + MqttMessageProperty.MAXIMUM_MESSAGE_SIZE | 300 * 1024 * 1024 | MqttProtocolErrors.PROVIDED_INVALID_MAX_MESSAGE_SIZE + } + + def "should not allow invalid message flags"() { + given: + def dataBuffer = BufferUtils.prepareBuffer(512) { + it.putShort(messageId) + it.put(PublishAckReasonCode.SUCCESS) + it.putMbi(0) + } + when: + def inMessage = new ConnectAckMqttInMessage(0b0101_0101 as byte) + def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) + then: + !result + inMessage.exception() instanceof MalformedProtocolMqttException + inMessage.exception().message == "Unexpected message flags:[0b0101_0101] in message:[$MqttMessageType.CONNECT_ACK]" } } diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/in/ConnectMqttInMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/in/ConnectMqttInMessageTest.groovy index 023cc30e..41597193 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/in/ConnectMqttInMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/in/ConnectMqttInMessageTest.groovy @@ -38,7 +38,7 @@ class ConnectMqttInMessageTest extends BaseMqttInMessageTest { def propertiesBuffer = BufferUtils.prepareBuffer(512) { it.putProperty(MqttMessageProperty.SESSION_EXPIRY_INTERVAL, sessionExpiryInterval) it.putProperty(MqttMessageProperty.RECEIVE_MAXIMUM_PUBLISHES, receiveMaxPublishes) - it.putProperty(MqttMessageProperty.MAXIMUM_MESSAGE_SIZE, maxPacketSize) + it.putProperty(MqttMessageProperty.MAXIMUM_MESSAGE_SIZE, maxMessageSize) it.putProperty(MqttMessageProperty.TOPIC_ALIAS_MAXIMUM, topicAliasMaxValue) it.putProperty(MqttMessageProperty.REQUEST_RESPONSE_INFORMATION, requestResponseInformation ? 1 : 0) it.putProperty(MqttMessageProperty.REQUEST_PROBLEM_INFORMATION, requestProblemInformation ? 1 : 0) @@ -67,7 +67,7 @@ class ConnectMqttInMessageTest extends BaseMqttInMessageTest { packet.authenticationData() == authData packet.clientId() == mqtt311ClientId packet.mqttVersion() == MqttVersion.MQTT_5 - packet.maxPacketSize() == maxPacketSize + packet.maxPacketSize() == maxMessageSize packet.password() == userPassword packet.username() == userName packet.topicAliasMaxValue() == topicAliasMaxValue diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/in/PublishAckMqttInMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/in/PublishAckMqttInMessageTest.groovy index f82a2a00..e7e1f929 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/in/PublishAckMqttInMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/in/PublishAckMqttInMessageTest.groovy @@ -87,15 +87,10 @@ class PublishAckMqttInMessageTest extends BaseMqttInMessageTest { def "should not allow invalid message flags"() { given: - def propertiesBuffer = BufferUtils.prepareBuffer(512) { - it.putProperty(MqttMessageProperty.REASON_STRING, "reason2") - it.putProperty(MqttMessageProperty.REASON_STRING, "reason1") - } def dataBuffer = BufferUtils.prepareBuffer(512) { it.putShort(messageId) it.put(PublishAckReasonCode.SUCCESS) - it.putMbi(propertiesBuffer.limit()) - it.put(propertiesBuffer) + it.putMbi(0) } when: def inMessage = new PublishAckMqttInMessage(0b0101_0101 as byte) diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/in/PublishMqttInMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/in/PublishMqttInMessageTest.groovy index c91ecd2c..abc1f1c7 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/in/PublishMqttInMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/in/PublishMqttInMessageTest.groovy @@ -9,8 +9,6 @@ import javasabr.mqtt.model.message.MqttMessageType import javasabr.rlib.collections.array.IntArray import javasabr.rlib.common.util.BufferUtils -import java.nio.charset.StandardCharsets - class PublishMqttInMessageTest extends BaseMqttInMessageTest { def "should read message correctly as MQTT 3.1.1"() { @@ -103,7 +101,7 @@ class PublishMqttInMessageTest extends BaseMqttInMessageTest { message.payloadFormat() == PayloadFormat.UNDEFINED } - def "should not read invalid message as MQTT 5.0"() { + def "should not allow to send unexpected property"() { given: def propertiesBuffer = BufferUtils.prepareBuffer(512) { it.putProperty(MqttMessageProperty.SERVER_KEEP_ALIVE, 1) @@ -122,77 +120,32 @@ class PublishMqttInMessageTest extends BaseMqttInMessageTest { !successful inMessage.exception() instanceof MalformedProtocolMqttException inMessage.exception().message == "Property:[$MqttMessageProperty.SERVER_KEEP_ALIVE] is not available for message:[$MqttMessageType.PUBLISH]" - when: 'use 2 times topic alias' - def propertiesBuffer2 = BufferUtils.prepareBuffer(512) { - it.putProperty(MqttMessageProperty.TOPIC_ALIAS, 55) - it.putProperty(MqttMessageProperty.TOPIC_ALIAS, 55) - } - def dataBuffer2 = BufferUtils.prepareBuffer(512) { - it.putString(publishTopic.toString()) - it.putShort(messageId) - it.putMbi(propertiesBuffer2.limit()) - it.put(propertiesBuffer2) - it.put(publishPayload) - } - def inMessage2 = new PublishMqttInMessage(0b0110_0011 as byte) - def successful2 = inMessage2.read(defaultMqtt5Connection, dataBuffer2, dataBuffer2.limit()) - then: - !successful2 - inMessage2.exception() instanceof MalformedProtocolMqttException - inMessage2.exception().message == "Property:[$MqttMessageProperty.TOPIC_ALIAS] is already presented in message:[$MqttMessageType.PUBLISH]" - when: 'use 2 times response topic' - def propertiesBuffer3 = BufferUtils.prepareBuffer(512) { - it.putProperty(MqttMessageProperty.RESPONSE_TOPIC, "topic1") - it.putProperty(MqttMessageProperty.RESPONSE_TOPIC, "topic1") - } - def dataBuffer3 = BufferUtils.prepareBuffer(512) { - it.putString(publishTopic.toString()) - it.putShort(messageId) - it.putMbi(propertiesBuffer3.limit()) - it.put(propertiesBuffer3) - it.put(publishPayload) - } - def inMessage3 = new PublishMqttInMessage(0b0110_0011 as byte) - def successful3 = inMessage3.read(defaultMqtt5Connection, dataBuffer3, dataBuffer3.limit()) - then: - !successful3 - inMessage3.exception() instanceof MalformedProtocolMqttException - inMessage3.exception().message == "Property:[$MqttMessageProperty.RESPONSE_TOPIC] is already presented in message:[$MqttMessageType.PUBLISH]" - when: 'use 2 times content type' - def propertiesBuffer4 = BufferUtils.prepareBuffer(512) { - it.putProperty(MqttMessageProperty.CONTENT_TYPE, "json") - it.putProperty(MqttMessageProperty.CONTENT_TYPE, "json") - } - def dataBuffer4 = BufferUtils.prepareBuffer(512) { - it.putString(publishTopic.toString()) - it.putShort(messageId) - it.putMbi(propertiesBuffer4.limit()) - it.put(propertiesBuffer4) - it.put(publishPayload) - } - def inMessage4 = new PublishMqttInMessage(0b0110_0011 as byte) - def successful4 = inMessage4.read(defaultMqtt5Connection, dataBuffer4, dataBuffer4.limit()) - then: - !successful4 - inMessage4.exception() instanceof MalformedProtocolMqttException - inMessage4.exception().message == "Property:[$MqttMessageProperty.CONTENT_TYPE] is already presented in message:[$MqttMessageType.PUBLISH]" - when: 'use 2 times correlation data' - def propertiesBuffer5 = BufferUtils.prepareBuffer(512) { - it.putProperty(MqttMessageProperty.CORRELATION_DATA, "data".getBytes(StandardCharsets.UTF_8)) - it.putProperty(MqttMessageProperty.CORRELATION_DATA, "data".getBytes(StandardCharsets.UTF_8)) + } + + def "should not allow duplicated properties in message"(MqttMessageProperty property, Object value) { + given: + def propertiesBuffer = BufferUtils.prepareBuffer(512) { + it.putProperty(property, value) + it.putProperty(property, value) } - def dataBuffer5 = BufferUtils.prepareBuffer(512) { + def dataBuffer = BufferUtils.prepareBuffer(512) { it.putString(publishTopic.toString()) it.putShort(messageId) - it.putMbi(propertiesBuffer5.limit()) - it.put(propertiesBuffer5) - it.put(publishPayload) + it.putMbi(propertiesBuffer.limit()) + it.put(propertiesBuffer) } - def inMessage5 = new PublishMqttInMessage(0b0110_0011 as byte) - def successful5 = inMessage5.read(defaultMqtt5Connection, dataBuffer5, dataBuffer5.limit()) + when: + def inMessage = new PublishMqttInMessage(0b0110_0011 as byte) + def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) then: - !successful5 - inMessage5.exception() instanceof MalformedProtocolMqttException - inMessage5.exception().message == "Property:[$MqttMessageProperty.CORRELATION_DATA] is already presented in message:[$MqttMessageType.PUBLISH]" + !result + inMessage.exception() instanceof MalformedProtocolMqttException + inMessage.exception().message == "Property:[$property] is already presented in message:[$MqttMessageType.PUBLISH]" + where: + property | value + MqttMessageProperty.TOPIC_ALIAS | topicAlias + MqttMessageProperty.RESPONSE_TOPIC | responseTopic.rawTopic() + MqttMessageProperty.CONTENT_TYPE | contentType + MqttMessageProperty.CORRELATION_DATA | correlationData } } diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/AuthenticationMqtt5OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/AuthenticationMqtt5OutMessageTest.groovy index ed750305..8db2f024 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/out/AuthenticationMqtt5OutMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/AuthenticationMqtt5OutMessageTest.groovy @@ -1,31 +1,40 @@ package javasabr.mqtt.network.message.out +import javasabr.mqtt.model.message.MqttMessageType import javasabr.mqtt.model.reason.code.AuthenticateReasonCode import javasabr.mqtt.network.message.in.AuthenticationMqttInMessage import javasabr.rlib.common.util.BufferUtils +import javasabr.rlib.common.util.NumberUtils class AuthenticationMqtt5OutMessageTest extends BaseMqttOutMessageTest { - def "should write packet correctly"() { + def "should write message correctly"() { given: - def packet = new AuthenticationMqtt5OutMessage( - userProperties, + def outMessage = new AuthenticationMqtt5OutMessage( AuthenticateReasonCode.CONTINUE_AUTHENTICATION, reasonString, authMethod, - authData,) + authData, + userProperties) + when: + def typeAndFlags = outMessage.messageTypeAndFlags() + byte type = NumberUtils.getHighByteBits(typeAndFlags); + byte info = NumberUtils.getLowByteBits(typeAndFlags); + then: + MqttMessageType.fromByte(type) == MqttMessageType.AUTHENTICATION + info == AuthenticationMqttInMessage.MESSAGE_FLAGS when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(defaultMqtt5Connection, it) + outMessage.write(defaultMqtt5Connection, it) } - def reader = new AuthenticationMqttInMessage(0b1111_0000 as byte) + def reader = new AuthenticationMqttInMessage(info) def result = reader.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) then: result - reader.reasonCode == AuthenticateReasonCode.CONTINUE_AUTHENTICATION - reader.authenticationMethod == authMethod - reader.authenticationData == authData - reader.reason == reasonString + reader.reasonCode() == AuthenticateReasonCode.CONTINUE_AUTHENTICATION + reader.reason() == reasonString + reader.authenticationMethod() == authMethod + reader.authenticationData() == authData reader.userProperties() == userProperties } } diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectAckMqtt311OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectAckMqtt311OutMessageTest.groovy index 5745e55a..a3a27df1 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectAckMqtt311OutMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectAckMqtt311OutMessageTest.groovy @@ -1,43 +1,52 @@ package javasabr.mqtt.network.message.out import javasabr.mqtt.model.MqttProperties +import javasabr.mqtt.model.message.MqttMessageType import javasabr.mqtt.model.reason.code.ConnectAckReasonCode import javasabr.mqtt.network.message.in.ConnectAckMqttInMessage -import javasabr.rlib.common.util.ArrayUtils +import javasabr.mqtt.network.message.in.PublishAckMqttInMessage import javasabr.rlib.common.util.BufferUtils +import javasabr.rlib.common.util.NumberUtils class ConnectAckMqtt311OutMessageTest extends BaseMqttOutMessageTest { - def "should write packet correctly"() { + def "should write message correctly"() { given: - def packet = new ConnectAckMqtt311OutMessage( + def outMessage = new ConnectAckMqtt311OutMessage( ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD, sessionPresent) + when: + def typeAndFlags = outMessage.messageTypeAndFlags() + byte type = NumberUtils.getHighByteBits(typeAndFlags); + byte info = NumberUtils.getLowByteBits(typeAndFlags); + then: + MqttMessageType.fromByte(type) == MqttMessageType.CONNECT_ACK + info == PublishAckMqttInMessage.MESSAGE_FLAGS when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(defaultMqtt311Connection, it) + outMessage.write(defaultMqtt311Connection, it) } - def reader = new ConnectAckMqttInMessage(0b0010_0000 as byte) + def reader = new ConnectAckMqttInMessage(info) def result = reader.read(defaultMqtt311Connection, dataBuffer, dataBuffer.limit()) then: result reader.reasonCode() == ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD reader.sessionPresent() == sessionPresent - reader.assignedClientId() == "" - reader.reason() == "" + reader.assignedClientId() == null + reader.reason() == null + reader.responseInformation() == null + reader.serverReference() == null + reader.authenticationMethod() == null + reader.authenticationData() == null + reader.wildcardSubscriptionAvailable() == MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE_IS_NOT_SET + reader.subscriptionIdAvailable() == MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE_IS_NOT_SET + reader.sharedSubscriptionAvailable() == MqttProperties.SHARED_SUBSCRIPTION_AVAILABLE_IS_NOT_SET + reader.retainAvailable() == MqttProperties.RETAIN_AVAILABLE_IS_NOT_SET + reader.topicAliasMaxValue() == MqttProperties.TOPIC_ALIAS_MAXIMUM_IS_NOT_SET + reader.serverKeepAlive() == MqttProperties.SERVER_KEEP_ALIVE_IS_NOT_SET + reader.receiveMaxPublishes() == MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_IS_NOT_SET + reader.sessionExpiryInterval() == MqttProperties.SESSION_EXPIRY_INTERVAL_IS_NOT_SET + reader.maxMessageSize() == MqttProperties.MAXIMUM_MESSAGE_SIZE_IS_NOT_SET reader.userProperties() == MqttOutMessage.EMPTY_USER_PROPERTIES - reader.retainAvailable() == MqttProperties.RETAIN_AVAILABLE_DEFAULT - reader.wildcardSubscriptionAvailable() == MqttProperties.WILDCARD_SUBSCRIPTION_AVAILABLE_DEFAULT - reader.subscriptionIdAvailable() == MqttProperties.SUBSCRIPTION_IDENTIFIER_AVAILABLE_DEFAULT - reader.sharedSubscriptionAvailable() == MqttProperties.SHARED_SUBSCRIPTION_AVAILABLE_DEFAULT - reader.responseInformation() == "" - reader.serverReference() == "" - reader.authenticationData() == ArrayUtils.EMPTY_BYTE_ARRAY - reader.authenticationMethod() == "" - reader.topicAliasMaxValue() == MqttProperties.TOPIC_ALIAS_MAXIMUM_UNDEFINED - reader.serverKeepAlive() == MqttProperties.SERVER_KEEP_ALIVE_UNDEFINED - reader.receiveMaxPublishes() == MqttProperties.RECEIVE_MAXIMUM_PUBLISHES_UNDEFINED - reader.sessionExpiryInterval() == MqttProperties.SESSION_EXPIRY_INTERVAL_UNDEFINED - reader.maxMessageSize() == MqttProperties.MAXIMUM_MESSAGE_SIZE_UNDEFINED } } diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectAckMqtt5OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectAckMqtt5OutMessageTest.groovy index c0fed0cf..8d94d698 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectAckMqtt5OutMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectAckMqtt5OutMessageTest.groovy @@ -1,27 +1,37 @@ package javasabr.mqtt.network.message.out + import javasabr.mqtt.model.MqttVersion +import javasabr.mqtt.model.QoS +import javasabr.mqtt.model.message.MqttMessageType import javasabr.mqtt.model.reason.code.ConnectAckReasonCode import javasabr.mqtt.network.message.in.ConnectAckMqttInMessage +import javasabr.mqtt.network.message.in.PublishAckMqttInMessage import javasabr.rlib.common.util.BufferUtils +import javasabr.rlib.common.util.NumberUtils class ConnectAckMqtt5OutMessageTest extends BaseMqttOutMessageTest { def "should write message correctly"() { given: + def serverConnectionConfig = defaultServerConnectionConfig() + .withSubscriptionIdAvailable(false) + .withSharedSubscriptionAvailable(false) + .withWildcardSubscriptionAvailable(false) + .withRetainAvailable(false) def clientConfig = clientConnectionConfig( - defaultServerConnectionConfig(), - maxQos, + serverConnectionConfig, + QoS.EXACTLY_ONCE, MqttVersion.MQTT_5, 240, 250, - maxPacketSize, + maxMessageSize, 300, 30, false, false); def connection = mqttConnection( - defaultServerConnectionConfig(), + serverConnectionConfig, clientConfig, mqtt5ClientId) def requestedClientId = "-1" @@ -31,7 +41,7 @@ class ConnectAckMqtt5OutMessageTest extends BaseMqttOutMessageTest { def outMessage = new ConnectAckMqtt5OutMessage( ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD, sessionPresent, - mqtt311ClientId, + mqtt5ClientId, requestedClientId, requestedSessionExpireInterval, requestedKeepAlive, @@ -42,31 +52,38 @@ class ConnectAckMqtt5OutMessageTest extends BaseMqttOutMessageTest { authMethod, authData, userProperties) + when: + def typeAndFlags = outMessage.messageTypeAndFlags() + byte type = NumberUtils.getHighByteBits(typeAndFlags); + byte info = NumberUtils.getLowByteBits(typeAndFlags); + then: + MqttMessageType.fromByte(type) == MqttMessageType.CONNECT_ACK + info == PublishAckMqttInMessage.MESSAGE_FLAGS when: def dataBuffer = BufferUtils.prepareBuffer(512) { outMessage.write(connection, it) } - def inMessage = new ConnectAckMqttInMessage(0b0010_0000 as byte) + def inMessage = new ConnectAckMqttInMessage(info) def result = inMessage.read(connection, dataBuffer, dataBuffer.limit()) then: result inMessage.reasonCode() == ConnectAckReasonCode.BAD_USER_NAME_OR_PASSWORD inMessage.sessionPresent() == sessionPresent - inMessage.retainAvailable() == retainAvailable inMessage.sessionExpiryInterval() == 240 inMessage.receiveMaxPublishes() == 250 - inMessage.maxMessageSize() == maxPacketSize - inMessage.assignedClientId() == mqtt311ClientId + inMessage.maxMessageSize() == maxMessageSize + inMessage.assignedClientId() == mqtt5ClientId inMessage.topicAliasMaxValue() == 300 inMessage.reason() == reasonString inMessage.userProperties() == userProperties - inMessage.wildcardSubscriptionAvailable() == wildcardSubscriptionAvailable - inMessage.subscriptionIdAvailable() == subscriptionIdAvailable - inMessage.sharedSubscriptionAvailable() == sharedSubscriptionAvailable inMessage.serverKeepAlive() == 30 inMessage.responseInformation() == responseInformation inMessage.serverReference() == serverReference inMessage.authenticationData() == authData inMessage.authenticationMethod() == authMethod + !NumberUtils.toBoolean(inMessage.wildcardSubscriptionAvailable()) + !NumberUtils.toBoolean(inMessage.subscriptionIdAvailable()) + !NumberUtils.toBoolean(inMessage.sharedSubscriptionAvailable()) + !NumberUtils.toBoolean(inMessage.retainAvailable()) } } diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectMqtt5OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectMqtt5OutMessageTest.groovy index e332bbf0..647ed5cd 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectMqtt5OutMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/ConnectMqtt5OutMessageTest.groovy @@ -7,9 +7,9 @@ import javasabr.rlib.common.util.BufferUtils class ConnectMqtt5OutMessageTest extends BaseMqttOutMessageTest { - def "should write packet correctly"() { + def "should write message correctly"() { given: - def packet = new ConnectMqtt5OutMessage( + def outMessage = new ConnectMqtt5OutMessage( userName, "", mqtt311ClientId, @@ -24,13 +24,13 @@ class ConnectMqtt5OutMessageTest extends BaseMqttOutMessageTest { authData, sessionExpiryInterval, receiveMaxPublishes, - maxPacketSize, + maxMessageSize, topicAliasMaxValue, requestResponseInformation, requestProblemInformation) when: def dataBuffer = BufferUtils.prepareBuffer(512) { - packet.write(defaultMqtt5Connection, it) + outMessage.write(defaultMqtt5Connection, it) } def reader = new ConnectMqttInMessage(0b0001_0000 as byte) def result = reader.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) @@ -47,7 +47,7 @@ class ConnectMqtt5OutMessageTest extends BaseMqttOutMessageTest { reader.authenticationData() == authData reader.sessionExpiryInterval() == sessionExpiryInterval reader.receiveMaxPublishes() == receiveMaxPublishes - reader.maxPacketSize() == maxPacketSize + reader.maxPacketSize() == maxMessageSize reader.topicAliasMaxValue() == topicAliasMaxValue reader.requestResponseInformation() == requestResponseInformation reader.requestProblemInformation() == requestProblemInformation diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishAckMqtt311OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishAckMqtt311OutMessageTest.groovy index a3d153bc..ad97fc87 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishAckMqtt311OutMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishAckMqtt311OutMessageTest.groovy @@ -17,6 +17,7 @@ class PublishAckMqtt311OutMessageTest extends BaseMqttOutMessageTest { byte info = NumberUtils.getLowByteBits(typeAndFlags); then: MqttMessageType.fromByte(type) == MqttMessageType.PUBLISH_ACK + info == PublishAckMqttInMessage.MESSAGE_FLAGS when: def dataBuffer = BufferUtils.prepareBuffer(512) { outMessage.write(defaultMqtt311Connection, it) diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishAckMqtt5OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishAckMqtt5OutMessageTest.groovy index 1f6df33f..39a9083b 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishAckMqtt5OutMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishAckMqtt5OutMessageTest.groovy @@ -21,6 +21,7 @@ class PublishAckMqtt5OutMessageTest extends BaseMqttOutMessageTest { byte info = NumberUtils.getLowByteBits(typeAndFlags); then: MqttMessageType.fromByte(type) == MqttMessageType.PUBLISH_ACK + info == PublishAckMqttInMessage.MESSAGE_FLAGS when: def dataBuffer = BufferUtils.prepareBuffer(512) { outMessage.write(defaultMqtt5Connection, it) diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishCompleteMqtt311OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishCompleteMqtt311OutMessageTest.groovy index d3556c7d..ebcb6abd 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishCompleteMqtt311OutMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishCompleteMqtt311OutMessageTest.groovy @@ -17,11 +17,12 @@ class PublishCompleteMqtt311OutMessageTest extends BaseMqttOutMessageTest { byte info = NumberUtils.getLowByteBits(typeAndFlags); then: MqttMessageType.fromByte(type) == MqttMessageType.PUBLISH_COMPLETE + info == PublishCompleteMqttInMessage.MESSAGE_FLAGS when: def dataBuffer = BufferUtils.prepareBuffer(512) { outMessage.write(defaultMqtt311Connection, it) } - def reader = new PublishCompleteMqttInMessage(0 as byte) + def reader = new PublishCompleteMqttInMessage(info) def result = reader.read(defaultMqtt311Connection, dataBuffer, dataBuffer.limit()) then: result diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishCompleteMqtt5OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishCompleteMqtt5OutMessageTest.groovy index 6a50655b..5eea2268 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishCompleteMqtt5OutMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishCompleteMqtt5OutMessageTest.groovy @@ -21,11 +21,12 @@ class PublishCompleteMqtt5OutMessageTest extends BaseMqttOutMessageTest { byte info = NumberUtils.getLowByteBits(typeAndFlags); then: MqttMessageType.fromByte(type) == MqttMessageType.PUBLISH_COMPLETE + info == PublishCompleteMqttInMessage.MESSAGE_FLAGS when: def dataBuffer = BufferUtils.prepareBuffer(512) { outMessage.write(defaultMqtt5Connection, it) } - def reader = new PublishCompleteMqttInMessage(0 as byte) + def reader = new PublishCompleteMqttInMessage(info) def result = reader.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) then: result diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishReceivedMqtt311OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishReceivedMqtt311OutMessageTest.groovy index 2464df47..027738f2 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishReceivedMqtt311OutMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishReceivedMqtt311OutMessageTest.groovy @@ -18,11 +18,12 @@ class PublishReceivedMqtt311OutMessageTest extends BaseMqttOutMessageTest { byte info = NumberUtils.getLowByteBits(typeAndFlags); then: MqttMessageType.fromByte(type) == MqttMessageType.PUBLISH_RECEIVED + info == PublishReceivedMqttInMessage.MESSAGE_FLAGS when: def dataBuffer = BufferUtils.prepareBuffer(512) { outMessage.write(defaultMqtt311Connection, it) } - def reader = new PublishReceivedMqttInMessage(0 as byte) + def reader = new PublishReceivedMqttInMessage(info) def result = reader.read(defaultMqtt311Connection, dataBuffer, dataBuffer.limit()) then: result diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishReceivedMqtt5OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishReceivedMqtt5OutMessageTest.groovy index e83d2d07..1e1d316f 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishReceivedMqtt5OutMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishReceivedMqtt5OutMessageTest.groovy @@ -21,11 +21,12 @@ class PublishReceivedMqtt5OutMessageTest extends BaseMqttOutMessageTest { byte info = NumberUtils.getLowByteBits(typeAndFlags); then: MqttMessageType.fromByte(type) == MqttMessageType.PUBLISH_RECEIVED + info == PublishReceivedMqttInMessage.MESSAGE_FLAGS when: def dataBuffer = BufferUtils.prepareBuffer(512) { outMessage.write(defaultMqtt5Connection, it) } - def reader = new PublishReceivedMqttInMessage(0 as byte) + def reader = new PublishReceivedMqttInMessage(info) def result = reader.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) then: result diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishReleaseMqtt311OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishReleaseMqtt311OutMessageTest.groovy index acdec541..b606e4ea 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishReleaseMqtt311OutMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishReleaseMqtt311OutMessageTest.groovy @@ -17,11 +17,12 @@ class PublishReleaseMqtt311OutMessageTest extends BaseMqttOutMessageTest { byte info = NumberUtils.getLowByteBits(typeAndFlags); then: MqttMessageType.fromByte(type) == MqttMessageType.PUBLISH_RELEASE + info == PublishReleaseMqttInMessage.MESSAGE_FLAGS when: def dataBuffer = BufferUtils.prepareBuffer(512) { outMessage.write(defaultMqtt311Connection, it) } - def reader = new PublishReleaseMqttInMessage(0b0000_0010 as byte) + def reader = new PublishReleaseMqttInMessage(info) def result = reader.read(defaultMqtt311Connection, dataBuffer, dataBuffer.limit()) then: result diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishReleaseMqtt5OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishReleaseMqtt5OutMessageTest.groovy index edb40e65..23c1973b 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishReleaseMqtt5OutMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/PublishReleaseMqtt5OutMessageTest.groovy @@ -21,11 +21,12 @@ class PublishReleaseMqtt5OutMessageTest extends BaseMqttOutMessageTest { byte info = NumberUtils.getLowByteBits(typeAndFlags); then: MqttMessageType.fromByte(type) == MqttMessageType.PUBLISH_RELEASE + info == PublishReleaseMqttInMessage.MESSAGE_FLAGS when: def dataBuffer = BufferUtils.prepareBuffer(512) { outMessage.write(defaultMqtt5Connection, it) } - def reader = new PublishReleaseMqttInMessage(0b0000_0010 as byte) + def reader = new PublishReleaseMqttInMessage(info) def result = reader.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) then: result diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/SubscribeAckMqtt311OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/SubscribeAckMqtt311OutMessageTest.groovy index 3790d017..b723a097 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/out/SubscribeAckMqtt311OutMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/SubscribeAckMqtt311OutMessageTest.groovy @@ -17,6 +17,7 @@ class SubscribeAckMqtt311OutMessageTest extends BaseMqttOutMessageTest { byte info = NumberUtils.getLowByteBits(typeAndFlags); then: MqttMessageType.fromByte(type) == MqttMessageType.SUBSCRIBE_ACK + info == SubscribeAckMqttInMessage.MESSAGE_FLAGS when: def dataBuffer = BufferUtils.prepareBuffer(512) { outMessage.write(defaultMqtt311Connection, it) diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/SubscribeAckMqtt5OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/SubscribeAckMqtt5OutMessageTest.groovy index d8d82bca..e972a311 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/out/SubscribeAckMqtt5OutMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/SubscribeAckMqtt5OutMessageTest.groovy @@ -20,6 +20,7 @@ class SubscribeAckMqtt5OutMessageTest extends BaseMqttOutMessageTest { byte info = NumberUtils.getLowByteBits(typeAndFlags); then: MqttMessageType.fromByte(type) == MqttMessageType.SUBSCRIBE_ACK + info == SubscribeAckMqttInMessage.MESSAGE_FLAGS when: def dataBuffer = BufferUtils.prepareBuffer(512) { outMessage.write(defaultMqtt5Connection, it) diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/SubscribeMqtt311OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/SubscribeMqtt311OutMessageTest.groovy index 6c2eff24..def889be 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/out/SubscribeMqtt311OutMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/SubscribeMqtt311OutMessageTest.groovy @@ -32,11 +32,12 @@ class SubscribeMqtt311OutMessageTest extends BaseMqttOutMessageTest { byte info = NumberUtils.getLowByteBits(typeAndFlags); then: MqttMessageType.fromByte(type) == MqttMessageType.SUBSCRIBE + info == SubscribeMqttInMessage.MESSAGE_FLAGS when: def dataBuffer = BufferUtils.prepareBuffer(512) { outMessage.write(defaultMqtt311Connection, it) } - def inMessage = new SubscribeMqttInMessage(0b0000_0010 as byte) + def inMessage = new SubscribeMqttInMessage(info) def result = inMessage.read(defaultMqtt311Connection, dataBuffer, dataBuffer.limit()) then: result diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/SubscribeMqtt5OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/SubscribeMqtt5OutMessageTest.groovy index 018c093a..3f720e36 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/out/SubscribeMqtt5OutMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/SubscribeMqtt5OutMessageTest.groovy @@ -46,11 +46,12 @@ class SubscribeMqtt5OutMessageTest extends BaseMqttOutMessageTest { byte info = NumberUtils.getLowByteBits(typeAndFlags); then: MqttMessageType.fromByte(type) == MqttMessageType.SUBSCRIBE + info == SubscribeMqttInMessage.MESSAGE_FLAGS when: def dataBuffer = BufferUtils.prepareBuffer(512) { outMessage.write(defaultMqtt5Connection, it) } - def inMessage = new SubscribeMqttInMessage(0b0000_0010 as byte) + def inMessage = new SubscribeMqttInMessage(info) def result = inMessage.read(defaultMqtt5Connection, dataBuffer, dataBuffer.limit()) then: result diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/UnsubscribeAckMqtt311OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/UnsubscribeAckMqtt311OutMessageTest.groovy index e8abc940..67794210 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/out/UnsubscribeAckMqtt311OutMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/UnsubscribeAckMqtt311OutMessageTest.groovy @@ -17,6 +17,7 @@ class UnsubscribeAckMqtt311OutMessageTest extends BaseMqttOutMessageTest { byte info = NumberUtils.getLowByteBits(typeAndFlags); then: MqttMessageType.fromByte(type) == MqttMessageType.UNSUBSCRIBE_ACK + info == UnsubscribeAckMqttInMessage.MESSAGE_FLAGS when: def dataBuffer = BufferUtils.prepareBuffer(512) { outMessage.write(defaultMqtt311Connection, it) diff --git a/network/src/test/groovy/javasabr/mqtt/network/message/out/UnsubscribeAckMqtt5OutMessageTest.groovy b/network/src/test/groovy/javasabr/mqtt/network/message/out/UnsubscribeAckMqtt5OutMessageTest.groovy index 52ce7e89..fc0840e8 100644 --- a/network/src/test/groovy/javasabr/mqtt/network/message/out/UnsubscribeAckMqtt5OutMessageTest.groovy +++ b/network/src/test/groovy/javasabr/mqtt/network/message/out/UnsubscribeAckMqtt5OutMessageTest.groovy @@ -20,6 +20,7 @@ class UnsubscribeAckMqtt5OutMessageTest extends BaseMqttOutMessageTest { byte info = NumberUtils.getLowByteBits(typeAndFlags); then: MqttMessageType.fromByte(type) == MqttMessageType.UNSUBSCRIBE_ACK + info == UnsubscribeAckMqttInMessage.MESSAGE_FLAGS when: def dataBuffer = BufferUtils.prepareBuffer(512) { outMessage.write(defaultMqtt5Connection, it) diff --git a/network/src/testFixtures/groovy/javasabr/mqtt/network/NetworkUnitSpecification.groovy b/network/src/testFixtures/groovy/javasabr/mqtt/network/NetworkUnitSpecification.groovy index 9fbc13d4..0644fc3d 100644 --- a/network/src/testFixtures/groovy/javasabr/mqtt/network/NetworkUnitSpecification.groovy +++ b/network/src/testFixtures/groovy/javasabr/mqtt/network/NetworkUnitSpecification.groovy @@ -43,7 +43,7 @@ class NetworkUnitSpecification extends UnitSpecification { public static final messageExpiryInterval = 60 public static final topicAlias = 252 public static final receiveMaxPublishes = 10 - public static final maxPacketSize = 1024 + public static final maxMessageSize = 1024 public static final maxStringLength = 256 public static final maxBinarySize = 1024 public static final maxTopicLevels = 10 @@ -127,7 +127,7 @@ class NetworkUnitSpecification extends UnitSpecification { MqttServerConnectionConfig defaultServerConnectionConfig() { return serverConnectionConfig( maxQos, - maxPacketSize, + maxMessageSize, maxStringLength, maxBinarySize, maxTopicLevels, @@ -150,7 +150,7 @@ class NetworkUnitSpecification extends UnitSpecification { MqttVersion.MQTT_3_1_1, sessionExpiryInterval, receiveMaxPublishes, - maxPacketSize, + maxMessageSize, topicAliasMaxValue, keepAlive, false, @@ -164,7 +164,7 @@ class NetworkUnitSpecification extends UnitSpecification { MqttVersion.MQTT_5, sessionExpiryInterval, receiveMaxPublishes, - maxPacketSize, + maxMessageSize, topicAliasMaxValue, keepAlive, false,