From 9d75d21578e0f2b3eed0004e1114cfad6c023024 Mon Sep 17 00:00:00 2001 From: Alex Brui Date: Tue, 19 Nov 2019 08:56:28 +0300 Subject: [PATCH 1/8] [broker-15] working on publishing with QoS 1 --- .../mqtt/broker/config/MqttBrokerConfig.java | 45 +++++++++-- .../packet/out/Mqtt311PacketOutFactory.java | 2 +- .../packet/out/Mqtt5PacketOutFactory.java | 2 +- .../packet/out/MqttPacketOutFactory.java | 5 +- .../packet/in/PublishAckInPacketHandler.java | 15 ++++ .../packet/in/PublishInPacketHandler.java | 9 +-- .../publish/in/AbstractPublishInHandler.java | 18 +++++ .../handler/publish/in/PublishInHandler.java | 13 ++++ .../publish/in/Qos0PublishInHandler.java | 27 +++++++ .../publish/in/Qos1PublishInHandler.java | 39 ++++++++++ .../publish/in/Qos2PublishInHandler.java | 22 ++++++ .../out/AbstractPublishOutHandler.java | 3 + .../publish/out/PublishOutHandler.java | 13 ++++ .../publish/out/Qos0PublishOutHandler.java | 32 ++++++++ .../publish/out/Qos1PublishOutHandler.java | 41 ++++++++++ .../publish/out/Qos2PublishOutHandler.java | 15 ++++ .../broker/model/MqttPropertyConstants.java | 4 + .../com/ss/mqtt/broker/model/MqttSession.java | 28 +++++++ .../com/ss/mqtt/broker/model/Subscriber.java | 3 - .../broker/model/impl/DefaultMqttSession.java | 78 ++++++++++++++++++- .../broker/network/packet/HasPacketId.java | 8 ++ .../network/packet/in/PublishAckInPacket.java | 3 +- .../packet/in/PublishCompleteInPacket.java | 3 +- .../packet/in/PublishReceivedInPacket.java | 3 +- .../packet/in/PublishReleaseInPacket.java | 3 +- .../packet/out/Publish311OutPacket.java | 14 +--- .../network/packet/out/Publish5OutPacket.java | 8 +- .../network/packet/out/PublishOutPacket.java | 24 ++++++ .../broker/service/PacketIdGenerator.java | 6 ++ .../broker/service/PublishingService.java | 14 +--- .../impl/DefaultPacketIdGenerator.java | 29 +++++++ .../impl/DefaultPublishingService.java | 21 +++++ .../service/impl/SimplePublishingService.java | 59 -------------- 33 files changed, 496 insertions(+), 113 deletions(-) create mode 100644 src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishAckInPacketHandler.java create mode 100644 src/main/java/com/ss/mqtt/broker/handler/publish/in/AbstractPublishInHandler.java create mode 100644 src/main/java/com/ss/mqtt/broker/handler/publish/in/PublishInHandler.java create mode 100644 src/main/java/com/ss/mqtt/broker/handler/publish/in/Qos0PublishInHandler.java create mode 100644 src/main/java/com/ss/mqtt/broker/handler/publish/in/Qos1PublishInHandler.java create mode 100644 src/main/java/com/ss/mqtt/broker/handler/publish/in/Qos2PublishInHandler.java create mode 100644 src/main/java/com/ss/mqtt/broker/handler/publish/out/AbstractPublishOutHandler.java create mode 100644 src/main/java/com/ss/mqtt/broker/handler/publish/out/PublishOutHandler.java create mode 100644 src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos0PublishOutHandler.java create mode 100644 src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos1PublishOutHandler.java create mode 100644 src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos2PublishOutHandler.java create mode 100644 src/main/java/com/ss/mqtt/broker/network/packet/HasPacketId.java create mode 100644 src/main/java/com/ss/mqtt/broker/network/packet/out/PublishOutPacket.java create mode 100644 src/main/java/com/ss/mqtt/broker/service/PacketIdGenerator.java create mode 100644 src/main/java/com/ss/mqtt/broker/service/impl/DefaultPacketIdGenerator.java create mode 100644 src/main/java/com/ss/mqtt/broker/service/impl/DefaultPublishingService.java delete mode 100644 src/main/java/com/ss/mqtt/broker/service/impl/SimplePublishingService.java diff --git a/src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java b/src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java index 2e21cde1..4970f82f 100644 --- a/src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java +++ b/src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java @@ -1,13 +1,21 @@ package com.ss.mqtt.broker.config; +import com.ss.mqtt.broker.handler.client.DeviceMqttClientReleaseHandler; +import com.ss.mqtt.broker.handler.client.MqttClientReleaseHandler; import com.ss.mqtt.broker.handler.packet.in.*; +import com.ss.mqtt.broker.handler.publish.in.PublishInHandler; +import com.ss.mqtt.broker.handler.publish.in.Qos0PublishInHandler; +import com.ss.mqtt.broker.handler.publish.in.Qos1PublishInHandler; +import com.ss.mqtt.broker.handler.publish.in.Qos2PublishInHandler; +import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler; +import com.ss.mqtt.broker.handler.publish.out.Qos0PublishOutHandler; +import com.ss.mqtt.broker.handler.publish.out.Qos1PublishOutHandler; +import com.ss.mqtt.broker.handler.publish.out.Qos2PublishOutHandler; import com.ss.mqtt.broker.model.MqttPropertyConstants; import com.ss.mqtt.broker.model.QoS; import com.ss.mqtt.broker.network.MqttConnection; -import com.ss.mqtt.broker.handler.client.MqttClientReleaseHandler; -import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient; import com.ss.mqtt.broker.network.client.DeviceMqttClient; -import com.ss.mqtt.broker.handler.client.DeviceMqttClientReleaseHandler; +import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient; import com.ss.mqtt.broker.network.packet.PacketType; import com.ss.mqtt.broker.service.*; import com.ss.mqtt.broker.service.impl.*; @@ -80,6 +88,11 @@ private interface ChannelFactory extends ); } + @Bean + @NotNull PacketIdGenerator packetIdGenerator() { + return new DefaultPacketIdGenerator(); + } + @Bean PacketInHandler @NotNull [] devicePacketHandlers( @NotNull AuthenticationService authenticationService, @@ -99,6 +112,7 @@ private interface ChannelFactory extends handlers[PacketType.UNSUBSCRIBE.ordinal()] = new UnsubscribeInPacketHandler(subscriptionService); handlers[PacketType.PUBLISH.ordinal()] = new PublishInPacketHandler(publishingService); handlers[PacketType.DISCONNECT.ordinal()] = new DisconnetInPacketHandler(); + handlers[PacketType.PUBLISH_ACK.ordinal()] = new PublishAckInPacketHandler(); return handlers; } @@ -150,8 +164,29 @@ private interface ChannelFactory extends } @Bean - @NotNull PublishingService publishingService(@NotNull SubscriptionService subscriptionService) { - return new SimplePublishingService(subscriptionService); + @NotNull PublishOutHandler[] publishOutHandlers(@NotNull PacketIdGenerator packetIdGenerator) { + return new PublishOutHandler[] { + new Qos0PublishOutHandler(), + new Qos1PublishOutHandler(packetIdGenerator), + new Qos2PublishOutHandler(), + }; + } + + @Bean + @NotNull PublishInHandler[] publishInHandlers( + @NotNull SubscriptionService subscriptionService, + @NotNull PublishOutHandler[] publishOutHandlers + ) { + return new PublishInHandler[] { + new Qos0PublishInHandler(subscriptionService, publishOutHandlers), + new Qos1PublishInHandler(subscriptionService, publishOutHandlers), + new Qos2PublishInHandler(subscriptionService, publishOutHandlers), + }; + } + + @Bean + @NotNull PublishingService publishingService(@NotNull PublishInHandler[] publishInHandlers) { + return new DefaultPublishingService(publishInHandlers); } @Bean diff --git a/src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt311PacketOutFactory.java b/src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt311PacketOutFactory.java index 95c4718e..3f1bb032 100644 --- a/src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt311PacketOutFactory.java +++ b/src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt311PacketOutFactory.java @@ -30,7 +30,7 @@ public class Mqtt311PacketOutFactory extends MqttPacketOutFactory { } @Override - public @NotNull MqttWritablePacket newPublish( + public @NotNull PublishOutPacket newPublish( @NotNull MqttClient client, int packetId, @NotNull QoS qos, diff --git a/src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt5PacketOutFactory.java b/src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt5PacketOutFactory.java index 2706bc2c..f6bda5f3 100644 --- a/src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt5PacketOutFactory.java +++ b/src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt5PacketOutFactory.java @@ -44,7 +44,7 @@ public class Mqtt5PacketOutFactory extends Mqtt311PacketOutFactory { } @Override - public @NotNull MqttWritablePacket newPublish( + public @NotNull PublishOutPacket newPublish( @NotNull MqttClient client, int packetId, @NotNull QoS qos, diff --git a/src/main/java/com/ss/mqtt/broker/factory/packet/out/MqttPacketOutFactory.java b/src/main/java/com/ss/mqtt/broker/factory/packet/out/MqttPacketOutFactory.java index 67c321ed..285a4cff 100644 --- a/src/main/java/com/ss/mqtt/broker/factory/packet/out/MqttPacketOutFactory.java +++ b/src/main/java/com/ss/mqtt/broker/factory/packet/out/MqttPacketOutFactory.java @@ -5,6 +5,7 @@ import com.ss.mqtt.broker.model.reason.code.*; import com.ss.mqtt.broker.network.client.MqttClient; import com.ss.mqtt.broker.network.packet.out.MqttWritablePacket; +import com.ss.mqtt.broker.network.packet.out.PublishOutPacket; import com.ss.rlib.common.util.ArrayUtils; import com.ss.rlib.common.util.StringUtils; import com.ss.rlib.common.util.array.Array; @@ -76,7 +77,7 @@ public abstract class MqttPacketOutFactory { } - public @NotNull MqttWritablePacket newPublish( + public @NotNull PublishOutPacket newPublish( @NotNull MqttClient client, int packetId, @NotNull QoS qos, @@ -101,7 +102,7 @@ public abstract class MqttPacketOutFactory { ); } - public abstract @NotNull MqttWritablePacket newPublish( + public abstract @NotNull PublishOutPacket newPublish( @NotNull MqttClient client, int packetId, @NotNull QoS qos, diff --git a/src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishAckInPacketHandler.java b/src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishAckInPacketHandler.java new file mode 100644 index 00000000..419f3ff3 --- /dev/null +++ b/src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishAckInPacketHandler.java @@ -0,0 +1,15 @@ +package com.ss.mqtt.broker.handler.packet.in; + +import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient; +import com.ss.mqtt.broker.network.packet.in.PublishAckInPacket; +import lombok.RequiredArgsConstructor; +import org.jetbrains.annotations.NotNull; + +@RequiredArgsConstructor +public class PublishAckInPacketHandler extends AbstractPacketHandler { + + @Override + protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull PublishAckInPacket packet) { + client.getSession().unregisterPendingPacket(client, packet); + } +} diff --git a/src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishInPacketHandler.java b/src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishInPacketHandler.java index ab1dd3ea..027074a3 100644 --- a/src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishInPacketHandler.java +++ b/src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishInPacketHandler.java @@ -13,13 +13,6 @@ public class PublishInPacketHandler extends AbstractPacketHandler { + + @NotNull PendingCallback EMPTY = (client, feedback) -> true; + + /** + * @return true of pending packet can be removed. + */ + boolean handle(@NotNull MqttClient client, @NotNull T feedback); } @NotNull String getClientId(); @@ -15,4 +31,16 @@ interface UnsafeMqttSession extends MqttSession { * @return the expiration time in ms or -1 if it should not be expired now. */ long getExpirationTime(); + + void registerPendingPublish(@NotNull PublishOutPacket publish); + + void registerPendingPublish( + @NotNull PublishOutPacket publish, + @NotNull MqttSession.PendingCallback callback + ); + + void unregisterPendingPacket( + @NotNull MqttClient client, + @NotNull T feedback + ); } diff --git a/src/main/java/com/ss/mqtt/broker/model/Subscriber.java b/src/main/java/com/ss/mqtt/broker/model/Subscriber.java index ce331a27..0efe2ab6 100644 --- a/src/main/java/com/ss/mqtt/broker/model/Subscriber.java +++ b/src/main/java/com/ss/mqtt/broker/model/Subscriber.java @@ -1,8 +1,5 @@ package com.ss.mqtt.broker.model; -import com.ss.mqtt.broker.model.QoS; -import com.ss.mqtt.broker.model.SubscribeRetainHandling; -import com.ss.mqtt.broker.model.SubscribeTopicFilter; import com.ss.mqtt.broker.network.client.MqttClient; import lombok.EqualsAndHashCode; import lombok.Getter; diff --git a/src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java b/src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java index 8cb12669..aadcce3b 100644 --- a/src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java +++ b/src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java @@ -1,22 +1,96 @@ package com.ss.mqtt.broker.model.impl; import com.ss.mqtt.broker.model.MqttSession.UnsafeMqttSession; +import com.ss.mqtt.broker.network.client.MqttClient; +import com.ss.mqtt.broker.network.packet.HasPacketId; +import com.ss.mqtt.broker.network.packet.in.MqttReadablePacket; +import com.ss.mqtt.broker.network.packet.out.PublishOutPacket; +import com.ss.rlib.common.util.ClassUtils; +import com.ss.rlib.common.util.array.Array; +import com.ss.rlib.common.util.array.ConcurrentArray; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.Setter; import lombok.ToString; +import lombok.extern.log4j.Log4j2; import org.jetbrains.annotations.NotNull; -@ToString -@RequiredArgsConstructor +import java.util.Collection; + +@Log4j2 +@ToString(of = "clientId") public class DefaultMqttSession implements UnsafeMqttSession { + @RequiredArgsConstructor + private static class PendingPublish { + private final PublishOutPacket publish; + private final PendingCallback callback; + private final long registeredTime; + } + private final @NotNull String clientId; + private final @NotNull ConcurrentArray> pendingPublishes; private volatile @Getter @Setter long expirationTime = -1; + public DefaultMqttSession(@NotNull String clientId) { + this.clientId = clientId; + this.pendingPublishes = ConcurrentArray.ofType(PendingPublish.class); + } + @Override public @NotNull String getClientId() { return clientId; } + + @Override + public void registerPendingPublish(@NotNull PublishOutPacket publish) { + pendingPublishes.runInWriteLock(publish, (array, packet) -> + array.add(new PendingPublish<>(packet, PendingCallback.EMPTY, System.currentTimeMillis()))); + } + + @Override + public void registerPendingPublish( + @NotNull PublishOutPacket publish, + @NotNull PendingCallback callback + ) { + // FIXME add new method to array + var stamp = pendingPublishes.writeLock(); + try { + pendingPublishes.add(new PendingPublish<>(publish, callback, System.currentTimeMillis())); + } finally { + pendingPublishes.writeUnlock(stamp); + } + } + + @Override + public void unregisterPendingPacket( + @NotNull MqttClient client, + @NotNull T feedback + ) { + + var packetId = feedback.getPacketId(); + + // FIXME add new method to array + var pendingPublish = pendingPublishes.findAnyInReadLock( + packetId, + (id, pending) -> id == pending.publish.getPacketId() + ); + + if (pendingPublish == null) { + log.warn("Not found pending publish for client {} by received packet {}", clientId, feedback); + return; + } + + var shouldBeRemoved = pendingPublish.callback.handle(client, ClassUtils.unsafeNNCast(feedback)); + + if (shouldBeRemoved) { + pendingPublishes.runInWriteLock(pendingPublish, Array::fastRemove); + } + } + + @Override + public void clear() { + pendingPublishes.runInWriteLock(Collection::clear); + } } diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/HasPacketId.java b/src/main/java/com/ss/mqtt/broker/network/packet/HasPacketId.java new file mode 100644 index 00000000..e39490c9 --- /dev/null +++ b/src/main/java/com/ss/mqtt/broker/network/packet/HasPacketId.java @@ -0,0 +1,8 @@ +package com.ss.mqtt.broker.network.packet; + +import com.ss.rlib.network.packet.Packet; + +public interface HasPacketId extends Packet { + + int getPacketId(); +} diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishAckInPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishAckInPacket.java index 72ffbd79..99c7be6b 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishAckInPacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishAckInPacket.java @@ -4,6 +4,7 @@ import com.ss.mqtt.broker.model.PacketProperty; import com.ss.mqtt.broker.model.reason.code.PublishAckReasonCode; import com.ss.mqtt.broker.network.MqttConnection; +import com.ss.mqtt.broker.network.packet.HasPacketId; import com.ss.mqtt.broker.network.packet.PacketType; import com.ss.rlib.common.util.StringUtils; import lombok.Getter; @@ -17,7 +18,7 @@ * Publish acknowledgment (QoS 1). */ @Getter -public class PublishAckInPacket extends MqttReadablePacket { +public class PublishAckInPacket extends MqttReadablePacket implements HasPacketId { private static final int PACKET_TYPE = PacketType.PUBLISH_ACK.ordinal(); diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishCompleteInPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishCompleteInPacket.java index 1f0c7cfc..37aa05ea 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishCompleteInPacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishCompleteInPacket.java @@ -4,6 +4,7 @@ import com.ss.mqtt.broker.model.PacketProperty; import com.ss.mqtt.broker.model.reason.code.PublishCompletedReasonCode; import com.ss.mqtt.broker.network.MqttConnection; +import com.ss.mqtt.broker.network.packet.HasPacketId; import com.ss.mqtt.broker.network.packet.PacketType; import com.ss.rlib.common.util.StringUtils; import lombok.Getter; @@ -17,7 +18,7 @@ * Publish complete (QoS 2 delivery part 3). */ @Getter -public class PublishCompleteInPacket extends MqttReadablePacket { +public class PublishCompleteInPacket extends MqttReadablePacket implements HasPacketId { private static final byte PACKET_TYPE = (byte) PacketType.PUBLISH_COMPLETED.ordinal(); diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishReceivedInPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishReceivedInPacket.java index b09aea31..94e45333 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishReceivedInPacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishReceivedInPacket.java @@ -4,6 +4,7 @@ import com.ss.mqtt.broker.model.PacketProperty; import com.ss.mqtt.broker.model.reason.code.PublishReceivedReasonCode; import com.ss.mqtt.broker.network.MqttConnection; +import com.ss.mqtt.broker.network.packet.HasPacketId; import com.ss.mqtt.broker.network.packet.PacketType; import com.ss.rlib.common.util.StringUtils; import lombok.Getter; @@ -17,7 +18,7 @@ * Publish received (QoS 2 delivery part 1). */ @Getter -public class PublishReceivedInPacket extends MqttReadablePacket { +public class PublishReceivedInPacket extends MqttReadablePacket implements HasPacketId { private static final byte PACKET_TYPE = (byte) PacketType.PUBLISH_RECEIVED.ordinal(); diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishReleaseInPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishReleaseInPacket.java index b7687af1..006afabc 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishReleaseInPacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishReleaseInPacket.java @@ -4,6 +4,7 @@ import com.ss.mqtt.broker.model.PacketProperty; import com.ss.mqtt.broker.model.reason.code.PublishReleaseReasonCode; import com.ss.mqtt.broker.network.MqttConnection; +import com.ss.mqtt.broker.network.packet.HasPacketId; import com.ss.mqtt.broker.network.packet.PacketType; import com.ss.rlib.common.util.StringUtils; import lombok.Getter; @@ -17,7 +18,7 @@ * Publish release (QoS 2 delivery part 2). */ @Getter -public class PublishReleaseInPacket extends MqttReadablePacket { +public class PublishReleaseInPacket extends MqttReadablePacket implements HasPacketId { private static final byte PACKET_TYPE = (byte) PacketType.PUBLISH_RELEASED.ordinal(); diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/out/Publish311OutPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/out/Publish311OutPacket.java index a247d253..2934a40f 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/out/Publish311OutPacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/out/Publish311OutPacket.java @@ -2,16 +2,12 @@ import com.ss.mqtt.broker.model.QoS; import com.ss.mqtt.broker.network.client.MqttClient; -import com.ss.mqtt.broker.network.packet.PacketType; import org.jetbrains.annotations.NotNull; import java.nio.ByteBuffer; -public class Publish311OutPacket extends MqttWritablePacket { +public class Publish311OutPacket extends PublishOutPacket { - private static final byte PACKET_TYPE = (byte) PacketType.PUBLISH.ordinal(); - - private final int packetId; private final boolean retained; private final boolean duplicate; private final @NotNull QoS qos; @@ -27,11 +23,10 @@ public Publish311OutPacket( @NotNull String topicName, @NotNull byte[] payload ) { - super(client); + super(client, packetId); this.qos = qos; this.retained = retained; this.duplicate = duplicate; - this.packetId = packetId; this.payload = payload; this.topicName = topicName; } @@ -57,11 +52,6 @@ protected byte getPacketFlags() { return info; } - @Override - protected byte getPacketType() { - return PACKET_TYPE; - } - @Override protected void writeVariableHeader(@NotNull ByteBuffer buffer) { // http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc384800412 diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/out/Publish5OutPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/out/Publish5OutPacket.java index 6e81bfb8..c5bf7b22 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/out/Publish5OutPacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/out/Publish5OutPacket.java @@ -135,7 +135,7 @@ public class Publish5OutPacket extends Publish311OutPacket { ); - private final int topciAlias; + private final int topicAlias; private final boolean stringPayload; private final @NotNull String responseTopic; private final @NotNull byte[] correlationData; @@ -148,7 +148,7 @@ public Publish5OutPacket( boolean retained, boolean duplicate, @NotNull String topicName, - int topciAlias, + int topicAlias, @NotNull byte[] payload, boolean stringPayload, @NotNull String responseTopic, @@ -156,7 +156,7 @@ public Publish5OutPacket( @NotNull Array userProperties ) { super(client, packetId, qos, retained, duplicate, topicName, payload); - this.topciAlias = topciAlias; + this.topicAlias = topicAlias; this.stringPayload = stringPayload; this.responseTopic = responseTopic; this.correlationData = correlationData; @@ -178,7 +178,7 @@ protected void writeProperties(@NotNull ByteBuffer buffer) { 0, MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_DEFAULT ); - writeProperty(buffer, PacketProperty.TOPIC_ALIAS, topciAlias, MqttPropertyConstants.TOPIC_ALIAS_DEFAULT); + writeProperty(buffer, PacketProperty.TOPIC_ALIAS, topicAlias, MqttPropertyConstants.TOPIC_ALIAS_DEFAULT); writeNotEmptyProperty(buffer, PacketProperty.RESPONSE_TOPIC, responseTopic); writeNotEmptyProperty(buffer, PacketProperty.CORRELATION_DATA, correlationData); writeStringPairProperties(buffer, PacketProperty.USER_PROPERTY, userProperties); diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/out/PublishOutPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/out/PublishOutPacket.java new file mode 100644 index 00000000..0cf9d127 --- /dev/null +++ b/src/main/java/com/ss/mqtt/broker/network/packet/out/PublishOutPacket.java @@ -0,0 +1,24 @@ +package com.ss.mqtt.broker.network.packet.out; + +import com.ss.mqtt.broker.network.client.MqttClient; +import com.ss.mqtt.broker.network.packet.HasPacketId; +import com.ss.mqtt.broker.network.packet.PacketType; +import lombok.Getter; +import org.jetbrains.annotations.NotNull; + +public abstract class PublishOutPacket extends MqttWritablePacket implements HasPacketId { + + private static final byte PACKET_TYPE = (byte) PacketType.PUBLISH.ordinal(); + + protected final @Getter int packetId; + + public PublishOutPacket(@NotNull MqttClient client, int packetId) { + super(client); + this.packetId = packetId; + } + + @Override + protected byte getPacketType() { + return PACKET_TYPE; + } +} diff --git a/src/main/java/com/ss/mqtt/broker/service/PacketIdGenerator.java b/src/main/java/com/ss/mqtt/broker/service/PacketIdGenerator.java new file mode 100644 index 00000000..140f1dba --- /dev/null +++ b/src/main/java/com/ss/mqtt/broker/service/PacketIdGenerator.java @@ -0,0 +1,6 @@ +package com.ss.mqtt.broker.service; + +public interface PacketIdGenerator { + + int nextPacketId(); +} diff --git a/src/main/java/com/ss/mqtt/broker/service/PublishingService.java b/src/main/java/com/ss/mqtt/broker/service/PublishingService.java index 2a1bca6a..28aa887b 100644 --- a/src/main/java/com/ss/mqtt/broker/service/PublishingService.java +++ b/src/main/java/com/ss/mqtt/broker/service/PublishingService.java @@ -1,20 +1,10 @@ package com.ss.mqtt.broker.service; -import com.ss.mqtt.broker.model.reason.code.PublishAckReasonCode; +import com.ss.mqtt.broker.network.client.MqttClient; import com.ss.mqtt.broker.network.packet.in.PublishInPacket; import org.jetbrains.annotations.NotNull; -/** - * Publishing service - */ public interface PublishingService { - /** - * Sends publish packet to all subscribers - * - * @param publish publish packet to send - * @return publish ack reason code - */ - @NotNull PublishAckReasonCode publish(@NotNull PublishInPacket publish); - + void publish(@NotNull MqttClient client, @NotNull PublishInPacket publish); } diff --git a/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPacketIdGenerator.java b/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPacketIdGenerator.java new file mode 100644 index 00000000..4179ded9 --- /dev/null +++ b/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPacketIdGenerator.java @@ -0,0 +1,29 @@ +package com.ss.mqtt.broker.service.impl; + +import com.ss.mqtt.broker.model.MqttPropertyConstants; +import com.ss.mqtt.broker.service.PacketIdGenerator; +import org.jetbrains.annotations.NotNull; + +import java.util.concurrent.atomic.AtomicInteger; + +public class DefaultPacketIdGenerator implements PacketIdGenerator { + + private final @NotNull AtomicInteger generator; + + public DefaultPacketIdGenerator() { + this.generator = new AtomicInteger(0); + } + + @Override + public int nextPacketId() { + + int nextId = generator.incrementAndGet(); + + if (nextId > MqttPropertyConstants.PACKET_ID_FOR_QOS_0) { + generator.compareAndSet(nextId, 0); + return nextPacketId(); + } + + return nextId; + } +} diff --git a/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPublishingService.java b/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPublishingService.java new file mode 100644 index 00000000..17ec06c5 --- /dev/null +++ b/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPublishingService.java @@ -0,0 +1,21 @@ +package com.ss.mqtt.broker.service.impl; + +import com.ss.mqtt.broker.handler.publish.in.PublishInHandler; +import com.ss.mqtt.broker.model.Subscriber; +import com.ss.mqtt.broker.model.reason.code.PublishAckReasonCode; +import com.ss.mqtt.broker.network.client.MqttClient; +import com.ss.mqtt.broker.network.packet.in.PublishInPacket; +import com.ss.mqtt.broker.service.PublishingService; +import lombok.RequiredArgsConstructor; +import org.jetbrains.annotations.NotNull; + +@RequiredArgsConstructor +public class DefaultPublishingService implements PublishingService { + + private final PublishInHandler @NotNull [] publishInHandlers; + + @Override + public void publish(@NotNull MqttClient client, @NotNull PublishInPacket publish) { + publishInHandlers[publish.getQos().ordinal()].handle(client, publish); + } +} diff --git a/src/main/java/com/ss/mqtt/broker/service/impl/SimplePublishingService.java b/src/main/java/com/ss/mqtt/broker/service/impl/SimplePublishingService.java deleted file mode 100644 index 3f764473..00000000 --- a/src/main/java/com/ss/mqtt/broker/service/impl/SimplePublishingService.java +++ /dev/null @@ -1,59 +0,0 @@ -package com.ss.mqtt.broker.service.impl; - -import com.ss.mqtt.broker.model.reason.code.PublishAckReasonCode; -import com.ss.mqtt.broker.model.Subscriber; -import com.ss.mqtt.broker.network.packet.in.PublishInPacket; -import com.ss.mqtt.broker.service.PublishingService; -import com.ss.mqtt.broker.service.SubscriptionService; -import lombok.RequiredArgsConstructor; -import org.jetbrains.annotations.NotNull; - -/** - * Simple publishing service - */ -@RequiredArgsConstructor -public class SimplePublishingService implements PublishingService { - - private final @NotNull SubscriptionService subscriptionService; - - private static @NotNull PublishAckReasonCode send( - @NotNull Subscriber subscriber, - @NotNull PublishInPacket publish - ) { - - var mqttClient = subscriber.getMqttClient(); - mqttClient.send(mqttClient.getPacketOutFactory().newPublish( - mqttClient, - publish.getPacketId(), - publish.getQos(), - publish.isRetained(), - publish.isDuplicate(), - publish.getTopicName(), - publish.getTopicAlias(), - publish.getPayload(), - publish.isPayloadFormatIndicator(), - publish.getResponseTopic(), - publish.getCorrelationData(), - publish.getUserProperties() - )); - - // TODO this reason code only for QoS 0 - return PublishAckReasonCode.SUCCESS; - } - - @Override - public @NotNull PublishAckReasonCode publish(@NotNull PublishInPacket publish) { - - var subscribers = subscriptionService.getSubscribers(publish.getTopicName()); - - if (subscribers.isEmpty()) { - return PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS; - } - - var success = subscribers.stream() - .map(subscriber -> send(subscriber, publish)) - .allMatch(ackReasonCode -> ackReasonCode.equals(PublishAckReasonCode.SUCCESS)); - - return success ? PublishAckReasonCode.SUCCESS : PublishAckReasonCode.UNSPECIFIED_ERROR; - } -} From 8116d5ca6091188b83ef48ac0429399f13ce02c8 Mon Sep 17 00:00:00 2001 From: Alex Brui Date: Tue, 19 Nov 2019 23:21:55 +0300 Subject: [PATCH 2/8] [broker-15] base implementation of publishing with QoS 1 --- build.gradle | 2 +- .../mqtt/broker/config/MqttBrokerConfig.java | 9 +-- .../publish/out/Qos1PublishOutHandler.java | 8 +-- .../broker/model/MqttPropertyConstants.java | 1 + .../com/ss/mqtt/broker/model/MqttSession.java | 2 + .../broker/model/impl/DefaultMqttSession.java | 35 +++++++---- .../network/packet/in/PublishAckInPacket.java | 7 ++- .../broker/service/PacketIdGenerator.java | 6 -- .../impl/DefaultPacketIdGenerator.java | 29 --------- .../impl/InMemoryMqttSessionService.java | 18 ++++-- .../ConnectSubscribePublishTest.groovy | 63 ++++++++++++++++--- 11 files changed, 107 insertions(+), 73 deletions(-) delete mode 100644 src/main/java/com/ss/mqtt/broker/service/PacketIdGenerator.java delete mode 100644 src/main/java/com/ss/mqtt/broker/service/impl/DefaultPacketIdGenerator.java diff --git a/build.gradle b/build.gradle index 3a54e9b6..24ca5be4 100644 --- a/build.gradle +++ b/build.gradle @@ -32,7 +32,7 @@ allprojects { ext { annotationVersion = "17.0.0" - rlibVersion = "9.5.0" + rlibVersion = "9.6.0" lombokVersion = '1.18.4' springbootVersion = '2.2.0.RELEASE' springVersion = '5.1.6.RELEASE' diff --git a/src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java b/src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java index 4970f82f..0d0b2f07 100644 --- a/src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java +++ b/src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java @@ -88,11 +88,6 @@ private interface ChannelFactory extends ); } - @Bean - @NotNull PacketIdGenerator packetIdGenerator() { - return new DefaultPacketIdGenerator(); - } - @Bean PacketInHandler @NotNull [] devicePacketHandlers( @NotNull AuthenticationService authenticationService, @@ -164,10 +159,10 @@ private interface ChannelFactory extends } @Bean - @NotNull PublishOutHandler[] publishOutHandlers(@NotNull PacketIdGenerator packetIdGenerator) { + @NotNull PublishOutHandler[] publishOutHandlers() { return new PublishOutHandler[] { new Qos0PublishOutHandler(), - new Qos1PublishOutHandler(packetIdGenerator), + new Qos1PublishOutHandler(), new Qos2PublishOutHandler(), }; } diff --git a/src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos1PublishOutHandler.java b/src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos1PublishOutHandler.java index c9828f56..6e246209 100644 --- a/src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos1PublishOutHandler.java +++ b/src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos1PublishOutHandler.java @@ -4,23 +4,22 @@ import com.ss.mqtt.broker.model.QoS; import com.ss.mqtt.broker.model.Subscriber; import com.ss.mqtt.broker.network.packet.in.PublishInPacket; -import com.ss.mqtt.broker.service.PacketIdGenerator; import lombok.RequiredArgsConstructor; import org.jetbrains.annotations.NotNull; @RequiredArgsConstructor public class Qos1PublishOutHandler extends AbstractPublishOutHandler { - private final @NotNull PacketIdGenerator packetIdGenerator; - @Override public void handle(@NotNull PublishInPacket packet, @NotNull Subscriber subscriber) { var client = subscriber.getMqttClient(); + var session = client.getSession(); var packetOutFactory = client.getPacketOutFactory(); + var publish = packetOutFactory.newPublish( client, - packetIdGenerator.nextPacketId(), + session.nextPacketId(), QoS.AT_LEAST_ONCE_DELIVERY, packet.isRetained(), packet.isDuplicate(), @@ -33,7 +32,6 @@ public void handle(@NotNull PublishInPacket packet, @NotNull Subscriber subscrib packet.getUserProperties() ); - var session = client.getSession(); session.registerPendingPublish(publish); client.send(publish); diff --git a/src/main/java/com/ss/mqtt/broker/model/MqttPropertyConstants.java b/src/main/java/com/ss/mqtt/broker/model/MqttPropertyConstants.java index e24ec02f..2786bfb1 100644 --- a/src/main/java/com/ss/mqtt/broker/model/MqttPropertyConstants.java +++ b/src/main/java/com/ss/mqtt/broker/model/MqttPropertyConstants.java @@ -5,6 +5,7 @@ public interface MqttPropertyConstants { QoS MAXIMUM_QOS_DEFAULT = QoS.EXACTLY_ONCE_DELIVERY; int MAXIMUM_PROTOCOL_PACKET_SIZE = 256 * 1024 * 1024; + int MAXIMUM_PACKET_ID = 0xFFFF; long SESSION_EXPIRY_INTERVAL_DISABLED = 0; long SESSION_EXPIRY_INTERVAL_DEFAULT = 120; diff --git a/src/main/java/com/ss/mqtt/broker/model/MqttSession.java b/src/main/java/com/ss/mqtt/broker/model/MqttSession.java index 9b01febd..44186ebb 100644 --- a/src/main/java/com/ss/mqtt/broker/model/MqttSession.java +++ b/src/main/java/com/ss/mqtt/broker/model/MqttSession.java @@ -27,6 +27,8 @@ interface PendingCallback { @NotNull String getClientId(); + int nextPacketId(); + /** * @return the expiration time in ms or -1 if it should not be expired now. */ diff --git a/src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java b/src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java index aadcce3b..403ac29b 100644 --- a/src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java +++ b/src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java @@ -1,5 +1,6 @@ package com.ss.mqtt.broker.model.impl; +import com.ss.mqtt.broker.model.MqttPropertyConstants; import com.ss.mqtt.broker.model.MqttSession.UnsafeMqttSession; import com.ss.mqtt.broker.network.client.MqttClient; import com.ss.mqtt.broker.network.packet.HasPacketId; @@ -16,6 +17,7 @@ import org.jetbrains.annotations.NotNull; import java.util.Collection; +import java.util.concurrent.atomic.AtomicInteger; @Log4j2 @ToString(of = "clientId") @@ -30,12 +32,27 @@ private static class PendingPublish { private final @NotNull String clientId; private final @NotNull ConcurrentArray> pendingPublishes; + private final @NotNull AtomicInteger packetIdGenerator; private volatile @Getter @Setter long expirationTime = -1; public DefaultMqttSession(@NotNull String clientId) { this.clientId = clientId; this.pendingPublishes = ConcurrentArray.ofType(PendingPublish.class); + this.packetIdGenerator = new AtomicInteger(0); + } + + @Override + public int nextPacketId() { + + var nextId = packetIdGenerator.incrementAndGet(); + + if (nextId >= MqttPropertyConstants.MAXIMUM_PACKET_ID) { + packetIdGenerator.compareAndSet(nextId, 0); + return nextPacketId(); + } + + return nextId; } @Override @@ -54,13 +71,10 @@ public void registerPendingPublish( @NotNull PublishOutPacket publish, @NotNull PendingCallback callback ) { - // FIXME add new method to array - var stamp = pendingPublishes.writeLock(); - try { - pendingPublishes.add(new PendingPublish<>(publish, callback, System.currentTimeMillis())); - } finally { - pendingPublishes.writeUnlock(stamp); - } + pendingPublishes.runInWriteLock( + new PendingPublish<>(publish, callback, System.currentTimeMillis()), + Array::add + ); } @Override @@ -70,11 +84,10 @@ public void unregisterPendingPacket ) { var packetId = feedback.getPacketId(); - - // FIXME add new method to array - var pendingPublish = pendingPublishes.findAnyInReadLock( + var pendingPublish = pendingPublishes.findAnyConvertedToIntInReadLock( packetId, - (id, pending) -> id == pending.publish.getPacketId() + pending -> pending.publish.getPacketId(), + (id, targetId) -> id == targetId ); if (pendingPublish == null) { diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishAckInPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishAckInPacket.java index 99c7be6b..6fabaf35 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishAckInPacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishAckInPacket.java @@ -68,11 +68,16 @@ protected void readVariableHeader(@NotNull MqttConnection connection, @NotNull B packetId = readUnsignedShort(buffer); // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901123 - if (connection.isSupported(MqttVersion.MQTT_5)) { + if (connection.isSupported(MqttVersion.MQTT_5) && buffer.hasRemaining()) { reasonCode = PublishAckReasonCode.of(readUnsignedByte(buffer)); } } + @Override + protected boolean isPropertiesSupported(@NotNull MqttConnection connection, @NotNull ByteBuffer buffer) { + return connection.isSupported(MqttVersion.MQTT_5) && buffer.hasRemaining(); + } + @Override protected @NotNull Set getAvailableProperties() { return AVAILABLE_PROPERTIES; diff --git a/src/main/java/com/ss/mqtt/broker/service/PacketIdGenerator.java b/src/main/java/com/ss/mqtt/broker/service/PacketIdGenerator.java deleted file mode 100644 index 140f1dba..00000000 --- a/src/main/java/com/ss/mqtt/broker/service/PacketIdGenerator.java +++ /dev/null @@ -1,6 +0,0 @@ -package com.ss.mqtt.broker.service; - -public interface PacketIdGenerator { - - int nextPacketId(); -} diff --git a/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPacketIdGenerator.java b/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPacketIdGenerator.java deleted file mode 100644 index 4179ded9..00000000 --- a/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPacketIdGenerator.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.ss.mqtt.broker.service.impl; - -import com.ss.mqtt.broker.model.MqttPropertyConstants; -import com.ss.mqtt.broker.service.PacketIdGenerator; -import org.jetbrains.annotations.NotNull; - -import java.util.concurrent.atomic.AtomicInteger; - -public class DefaultPacketIdGenerator implements PacketIdGenerator { - - private final @NotNull AtomicInteger generator; - - public DefaultPacketIdGenerator() { - this.generator = new AtomicInteger(0); - } - - @Override - public int nextPacketId() { - - int nextId = generator.incrementAndGet(); - - if (nextId > MqttPropertyConstants.PACKET_ID_FOR_QOS_0) { - generator.compareAndSet(nextId, 0); - return nextPacketId(); - } - - return nextId; - } -} diff --git a/src/main/java/com/ss/mqtt/broker/service/impl/InMemoryMqttSessionService.java b/src/main/java/com/ss/mqtt/broker/service/impl/InMemoryMqttSessionService.java index e7fe6002..44ad140b 100644 --- a/src/main/java/com/ss/mqtt/broker/service/impl/InMemoryMqttSessionService.java +++ b/src/main/java/com/ss/mqtt/broker/service/impl/InMemoryMqttSessionService.java @@ -1,6 +1,7 @@ package com.ss.mqtt.broker.service.impl; import com.ss.mqtt.broker.model.MqttSession; +import com.ss.mqtt.broker.model.MqttSession.UnsafeMqttSession; import com.ss.mqtt.broker.model.impl.DefaultMqttSession; import com.ss.mqtt.broker.service.MqttSessionService; import com.ss.rlib.common.concurrent.util.ThreadUtils; @@ -18,7 +19,7 @@ @Log4j2 public class InMemoryMqttSessionService implements MqttSessionService, Closeable { - private final @NotNull ConcurrentObjectDictionary storedSession; + private final @NotNull ConcurrentObjectDictionary storedSession; private final @NotNull Thread cleanThread; private final int cleanInterval; @@ -65,10 +66,10 @@ public InMemoryMqttSessionService(int cleanInterval) { @Override public @NotNull Mono store(@NotNull String clientId, @NotNull MqttSession session, long expiryInterval) { - var unsafe = (MqttSession.UnsafeMqttSession) session; + var unsafe = (UnsafeMqttSession) session; unsafe.setExpirationTime(System.currentTimeMillis() + (expiryInterval * 1000)); - storedSession.runInWriteLock(clientId, session, ObjectDictionary::put); + storedSession.runInWriteLock(clientId, unsafe, ObjectDictionary::put); log.debug("Stored session for client {}", clientId); @@ -77,8 +78,8 @@ public InMemoryMqttSessionService(int cleanInterval) { private void cleanup() { - var toCheck = ArrayFactory.newArray(MqttSession.class); - var toRemove = ArrayFactory.newArray(MqttSession.class); + var toCheck = ArrayFactory.newArray(UnsafeMqttSession.class); + var toRemove = ArrayFactory.newArray(UnsafeMqttSession.class); while (!closed) { @@ -110,13 +111,18 @@ private void cleanup() { // if we already have new session under the same client id if (removed != null && removed != session) { dictionary.put(session.getClientId(), removed); + } else if (removed != null) { + removed.clear(); } } }); } } - private boolean findToRemove(@NotNull Array toCheck, @NotNull Array toRemove) { + private boolean findToRemove( + @NotNull Array toCheck, + @NotNull Array toRemove + ) { var currentTime = System.currentTimeMillis(); diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy index e5b26aaf..f5436978 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy @@ -6,11 +6,13 @@ import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode +import java.util.concurrent.atomic.AtomicReference + class ConnectSubscribePublishTest extends MqttBrokerTest { - def "publisher should publish message to broker"() { + def "publisher should publish message QoS 0"() { given: - Mqtt5Publish receivedMessage = null + def received = new AtomicReference() def subscriber = buildClient() def publisher = buildClient() when: @@ -20,7 +22,7 @@ class ConnectSubscribePublishTest extends MqttBrokerTest { def subscribeResult = subscriber.subscribeWith() .topicFilter(topicFilter) .qos(MqttQos.AT_MOST_ONCE) - .callback({ publish -> receivedMessage = publish }) + .callback({ publish -> received.set(publish) }) .send() .join() @@ -45,10 +47,57 @@ class ConnectSubscribePublishTest extends MqttBrokerTest { publishResult.publish.type == Mqtt5MessageType.PUBLISH publishResult.publish.topic.levels.join("/") == topicFilter - receivedMessage != null - receivedMessage.qos == MqttQos.AT_MOST_ONCE - receivedMessage.type == Mqtt5MessageType.PUBLISH - receivedMessage.topic.levels.join("/") == topicFilter + received.get() != null + received.get().qos == MqttQos.AT_MOST_ONCE + received.get().type == Mqtt5MessageType.PUBLISH + received.get().topic.levels.join("/") == topicFilter + cleanup: + subscriber.disconnect() + publisher.disconnect() + } + + def "publisher should publish message QoS 1"() { + given: + def received = new AtomicReference() + def subscriber = buildClient() + def publisher = buildClient() + when: + + subscriber.connect().join() + publisher.connect().join() + + def subscribeResult = subscriber.subscribeWith() + .topicFilter(topicFilter) + .qos(MqttQos.AT_LEAST_ONCE) + .callback({ publish -> received.set(publish) }) + .send() + .join() + + def publishResult = publisher.publishWith() + .topic(topicFilter) + .qos(MqttQos.AT_LEAST_ONCE) + .payload(publishPayload) + .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) + .send() + .join() + + Thread.sleep(500) + then: + noExceptionThrown() + + subscribeResult != null + subscribeResult.reasonCodes.contains(Mqtt5SubAckReasonCode.GRANTED_QOS_1) + subscribeResult.type == Mqtt5MessageType.SUBACK + + publishResult != null + publishResult.publish.qos == MqttQos.AT_LEAST_ONCE + publishResult.publish.type == Mqtt5MessageType.PUBLISH + publishResult.publish.topic.levels.join("/") == topicFilter + + received.get() != null + received.get().qos == MqttQos.AT_LEAST_ONCE + received.get().type == Mqtt5MessageType.PUBLISH + received.get().topic.levels.join("/") == topicFilter cleanup: subscriber.disconnect() publisher.disconnect() From d36a5bc9543f5132d948f81250770266274f84c3 Mon Sep 17 00:00:00 2001 From: Alex Brui Date: Wed, 20 Nov 2019 00:50:23 +0300 Subject: [PATCH 3/8] [broker-15] implement message expiration and re-trying for QoS 1+ messages --- .../mqtt/broker/config/MqttBrokerConfig.java | 23 ++-- .../AbstractMqttClientReleaseHandler.java | 3 + .../DefaultMqttClientReleaseHandler.java | 18 ++++ .../DeviceMqttClientReleaseHandler.java | 16 --- .../packet/in/ConnectInPacketHandler.java | 10 +- .../publish/out/Qos0PublishOutHandler.java | 2 +- .../publish/out/Qos1PublishOutHandler.java | 47 ++++++-- .../broker/model/MqttPropertyConstants.java | 1 + .../com/ss/mqtt/broker/model/MqttSession.java | 27 ++--- .../broker/model/impl/DefaultMqttSession.java | 101 +++++++++++++----- .../broker/network/client/MqttClient.java | 5 +- .../broker/service/PublishRetryService.java | 11 ++ .../impl/DefaultPublishRetryService.java | 84 +++++++++++++++ 13 files changed, 268 insertions(+), 80 deletions(-) create mode 100644 src/main/java/com/ss/mqtt/broker/handler/client/DefaultMqttClientReleaseHandler.java delete mode 100644 src/main/java/com/ss/mqtt/broker/handler/client/DeviceMqttClientReleaseHandler.java create mode 100644 src/main/java/com/ss/mqtt/broker/service/PublishRetryService.java create mode 100644 src/main/java/com/ss/mqtt/broker/service/impl/DefaultPublishRetryService.java diff --git a/src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java b/src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java index 0d0b2f07..d1189ba9 100644 --- a/src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java +++ b/src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java @@ -1,6 +1,6 @@ package com.ss.mqtt.broker.config; -import com.ss.mqtt.broker.handler.client.DeviceMqttClientReleaseHandler; +import com.ss.mqtt.broker.handler.client.DefaultMqttClientReleaseHandler; import com.ss.mqtt.broker.handler.client.MqttClientReleaseHandler; import com.ss.mqtt.broker.handler.packet.in.*; import com.ss.mqtt.broker.handler.publish.in.PublishInHandler; @@ -94,14 +94,16 @@ private interface ChannelFactory extends @NotNull ClientIdRegistry clientIdRegistry, @NotNull SubscriptionService subscriptionService, @NotNull PublishingService publishingService, - @NotNull MqttSessionService mqttSessionService + @NotNull MqttSessionService mqttSessionService, + @NotNull PublishRetryService publishRetryService ) { var handlers = new PacketInHandler[PacketType.INVALID.ordinal()]; handlers[PacketType.CONNECT.ordinal()] = new ConnectInPacketHandler( clientIdRegistry, authenticationService, - mqttSessionService + mqttSessionService, + publishRetryService ); handlers[PacketType.SUBSCRIBE.ordinal()] = new SubscribeInPacketHandler(subscriptionService); handlers[PacketType.UNSUBSCRIBE.ordinal()] = new UnsubscribeInPacketHandler(subscriptionService); @@ -113,11 +115,12 @@ private interface ChannelFactory extends } @Bean - @NotNull MqttClientReleaseHandler deviceMqttClientReleaseHandler( + @NotNull MqttClientReleaseHandler defaultMqttClientReleaseHandler( @NotNull ClientIdRegistry clientIdRegistry, - @NotNull MqttSessionService mqttSessionService + @NotNull MqttSessionService mqttSessionService, + @NotNull PublishRetryService publishRetryService ) { - return new DeviceMqttClientReleaseHandler(clientIdRegistry, mqttSessionService); + return new DefaultMqttClientReleaseHandler(clientIdRegistry, mqttSessionService, publishRetryService); } @Bean @@ -139,6 +142,14 @@ private interface ChannelFactory extends ); } + @Bean + @NotNull PublishRetryService publishRetryService() { + return new DefaultPublishRetryService( + env.getProperty("publish.pending.check.interval", int.class, 60 * 1000), + env.getProperty("publish.retry.interval", int.class, 60 * 1000) + ); + } + @Bean @NotNull InetSocketAddress deviceNetworkAddress( @NotNull ServerNetwork<@NotNull MqttConnection> deviceNetwork, diff --git a/src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java b/src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java index 1ea5b3db..eeecefa7 100644 --- a/src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java +++ b/src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java @@ -4,6 +4,7 @@ import com.ss.mqtt.broker.network.client.AbstractMqttClient; import com.ss.mqtt.broker.service.ClientIdRegistry; import com.ss.mqtt.broker.service.MqttSessionService; +import com.ss.mqtt.broker.service.PublishRetryService; import com.ss.rlib.common.util.StringUtils; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; @@ -17,6 +18,7 @@ public abstract class AbstractMqttClientReleaseHandler release(@NotNull UnsafeMqttClient client) { @@ -25,6 +27,7 @@ public abstract class AbstractMqttClientReleaseHandler releaseImpl(@NotNull T client) { + publishRetryService.unregister(client); var clientId = client.getClientId(); client.setClientId(StringUtils.EMPTY); diff --git a/src/main/java/com/ss/mqtt/broker/handler/client/DefaultMqttClientReleaseHandler.java b/src/main/java/com/ss/mqtt/broker/handler/client/DefaultMqttClientReleaseHandler.java new file mode 100644 index 00000000..236193d0 --- /dev/null +++ b/src/main/java/com/ss/mqtt/broker/handler/client/DefaultMqttClientReleaseHandler.java @@ -0,0 +1,18 @@ +package com.ss.mqtt.broker.handler.client; + +import com.ss.mqtt.broker.network.client.DeviceMqttClient; +import com.ss.mqtt.broker.service.ClientIdRegistry; +import com.ss.mqtt.broker.service.MqttSessionService; +import com.ss.mqtt.broker.service.PublishRetryService; +import org.jetbrains.annotations.NotNull; + +public class DefaultMqttClientReleaseHandler extends AbstractMqttClientReleaseHandler { + + public DefaultMqttClientReleaseHandler( + @NotNull ClientIdRegistry clientIdRegistry, + @NotNull MqttSessionService sessionService, + @NotNull PublishRetryService publishRetryService + ) { + super(clientIdRegistry, sessionService, publishRetryService); + } +} diff --git a/src/main/java/com/ss/mqtt/broker/handler/client/DeviceMqttClientReleaseHandler.java b/src/main/java/com/ss/mqtt/broker/handler/client/DeviceMqttClientReleaseHandler.java deleted file mode 100644 index a8232259..00000000 --- a/src/main/java/com/ss/mqtt/broker/handler/client/DeviceMqttClientReleaseHandler.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.ss.mqtt.broker.handler.client; - -import com.ss.mqtt.broker.network.client.DeviceMqttClient; -import com.ss.mqtt.broker.service.ClientIdRegistry; -import com.ss.mqtt.broker.service.MqttSessionService; -import org.jetbrains.annotations.NotNull; - -public class DeviceMqttClientReleaseHandler extends AbstractMqttClientReleaseHandler { - - public DeviceMqttClientReleaseHandler( - @NotNull ClientIdRegistry clientIdRegistry, - @NotNull MqttSessionService sessionService - ) { - super(clientIdRegistry, sessionService); - } -} diff --git a/src/main/java/com/ss/mqtt/broker/handler/packet/in/ConnectInPacketHandler.java b/src/main/java/com/ss/mqtt/broker/handler/packet/in/ConnectInPacketHandler.java index 0b5b22a9..0d21406b 100644 --- a/src/main/java/com/ss/mqtt/broker/handler/packet/in/ConnectInPacketHandler.java +++ b/src/main/java/com/ss/mqtt/broker/handler/packet/in/ConnectInPacketHandler.java @@ -13,6 +13,7 @@ import com.ss.mqtt.broker.service.AuthenticationService; import com.ss.mqtt.broker.service.ClientIdRegistry; import com.ss.mqtt.broker.service.MqttSessionService; +import com.ss.mqtt.broker.service.PublishRetryService; import com.ss.rlib.common.util.StringUtils; import lombok.RequiredArgsConstructor; import org.jetbrains.annotations.NotNull; @@ -21,9 +22,10 @@ @RequiredArgsConstructor public class ConnectInPacketHandler extends AbstractPacketHandler { - private final ClientIdRegistry clientIdRegistry; - private final AuthenticationService authenticationService; - private final MqttSessionService mqttSessionService; + private final @NotNull ClientIdRegistry clientIdRegistry; + private final @NotNull AuthenticationService authenticationService; + private final @NotNull MqttSessionService mqttSessionService; + private final @NotNull PublishRetryService publishRetryService; @Override protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull ConnectInPacket packet) { @@ -135,6 +137,8 @@ private Mono onConnected( packet.getReceiveMax() )); + publishRetryService.register(client); + return Mono.just(Boolean.TRUE); } diff --git a/src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos0PublishOutHandler.java b/src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos0PublishOutHandler.java index b82bda8e..7a2e2a51 100644 --- a/src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos0PublishOutHandler.java +++ b/src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos0PublishOutHandler.java @@ -19,7 +19,7 @@ public void handle(@NotNull PublishInPacket packet, @NotNull Subscriber subscrib MqttPropertyConstants.PACKET_ID_FOR_QOS_0, QoS.AT_MOST_ONCE_DELIVERY, packet.isRetained(), - packet.isDuplicate(), + false, packet.getTopicName(), MqttPropertyConstants.TOPIC_ALIAS_NOT_SET, packet.getPayload(), diff --git a/src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos1PublishOutHandler.java b/src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos1PublishOutHandler.java index 6e246209..1d628f52 100644 --- a/src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos1PublishOutHandler.java +++ b/src/main/java/com/ss/mqtt/broker/handler/publish/out/Qos1PublishOutHandler.java @@ -1,28 +1,39 @@ package com.ss.mqtt.broker.handler.publish.out; import com.ss.mqtt.broker.model.MqttPropertyConstants; +import com.ss.mqtt.broker.model.MqttSession; import com.ss.mqtt.broker.model.QoS; import com.ss.mqtt.broker.model.Subscriber; +import com.ss.mqtt.broker.network.client.MqttClient; +import com.ss.mqtt.broker.network.packet.HasPacketId; import com.ss.mqtt.broker.network.packet.in.PublishInPacket; import lombok.RequiredArgsConstructor; import org.jetbrains.annotations.NotNull; @RequiredArgsConstructor -public class Qos1PublishOutHandler extends AbstractPublishOutHandler { +public class Qos1PublishOutHandler extends AbstractPublishOutHandler implements MqttSession.PendingPacketHandler { @Override public void handle(@NotNull PublishInPacket packet, @NotNull Subscriber subscriber) { var client = subscriber.getMqttClient(); var session = client.getSession(); - var packetOutFactory = client.getPacketOutFactory(); - var publish = packetOutFactory.newPublish( + // it means this client was already closed + if (session == null) { + return; + } + + var packetId = session.nextPacketId(); + session.registerPendingPublish(packet, this, packetId); + + var packetOutFactory = client.getPacketOutFactory(); + client.send(packetOutFactory.newPublish( client, - session.nextPacketId(), + packetId, QoS.AT_LEAST_ONCE_DELIVERY, packet.isRetained(), - packet.isDuplicate(), + false, packet.getTopicName(), MqttPropertyConstants.TOPIC_ALIAS_NOT_SET, packet.getPayload(), @@ -30,10 +41,30 @@ public void handle(@NotNull PublishInPacket packet, @NotNull Subscriber subscrib packet.getResponseTopic(), packet.getCorrelationData(), packet.getUserProperties() - ); + )); + } - session.registerPendingPublish(publish); + @Override + public boolean handleResponse(@NotNull MqttClient client, @NotNull HasPacketId response) { + return true; + } - client.send(publish); + @Override + public void retryAsync(@NotNull MqttClient client, @NotNull PublishInPacket packet, int packetId) { + var packetOutFactory = client.getPacketOutFactory(); + client.send(packetOutFactory.newPublish( + client, + packetId, + QoS.AT_LEAST_ONCE_DELIVERY, + packet.isRetained(), + true, + packet.getTopicName(), + MqttPropertyConstants.TOPIC_ALIAS_NOT_SET, + packet.getPayload(), + packet.isPayloadFormatIndicator(), + packet.getResponseTopic(), + packet.getCorrelationData(), + packet.getUserProperties() + )); } } diff --git a/src/main/java/com/ss/mqtt/broker/model/MqttPropertyConstants.java b/src/main/java/com/ss/mqtt/broker/model/MqttPropertyConstants.java index 2786bfb1..04138cee 100644 --- a/src/main/java/com/ss/mqtt/broker/model/MqttPropertyConstants.java +++ b/src/main/java/com/ss/mqtt/broker/model/MqttPropertyConstants.java @@ -26,6 +26,7 @@ public interface MqttPropertyConstants { boolean PAYLOAD_FORMAT_INDICATOR_DEFAULT = false; long MESSAGE_EXPIRY_INTERVAL_DEFAULT = 0; + long MESSAGE_EXPIRY_INTERVAL_UNDEFINED = -1; int TOPIC_ALIAS_MAXIMUM_UNDEFINED = -1; int TOPIC_ALIAS_MAXIMUM_DISABLED = 0; diff --git a/src/main/java/com/ss/mqtt/broker/model/MqttSession.java b/src/main/java/com/ss/mqtt/broker/model/MqttSession.java index 44186ebb..8fc35640 100644 --- a/src/main/java/com/ss/mqtt/broker/model/MqttSession.java +++ b/src/main/java/com/ss/mqtt/broker/model/MqttSession.java @@ -2,8 +2,7 @@ import com.ss.mqtt.broker.network.client.MqttClient; import com.ss.mqtt.broker.network.packet.HasPacketId; -import com.ss.mqtt.broker.network.packet.in.MqttReadablePacket; -import com.ss.mqtt.broker.network.packet.out.PublishOutPacket; +import com.ss.mqtt.broker.network.packet.in.PublishInPacket; import org.jetbrains.annotations.NotNull; public interface MqttSession { @@ -15,14 +14,14 @@ interface UnsafeMqttSession extends MqttSession { void clear(); } - interface PendingCallback { - - @NotNull PendingCallback EMPTY = (client, feedback) -> true; + interface PendingPacketHandler { /** - * @return true of pending packet can be removed. + * @return true if pending packet can be removed. */ - boolean handle(@NotNull MqttClient client, @NotNull T feedback); + boolean handleResponse(@NotNull MqttClient client, @NotNull HasPacketId response); + + void retryAsync(@NotNull MqttClient client, @NotNull PublishInPacket packet, int packetId); } @NotNull String getClientId(); @@ -34,15 +33,9 @@ interface PendingCallback { */ long getExpirationTime(); - void registerPendingPublish(@NotNull PublishOutPacket publish); - - void registerPendingPublish( - @NotNull PublishOutPacket publish, - @NotNull MqttSession.PendingCallback callback - ); + void removeExpiredPackets(); + void resendPendingPacketsAsync(@NotNull MqttClient client, int retryInterval); - void unregisterPendingPacket( - @NotNull MqttClient client, - @NotNull T feedback - ); + void registerPendingPublish(@NotNull PublishInPacket publish, @NotNull PendingPacketHandler handler, int packetId); + void unregisterPendingPacket(@NotNull MqttClient client, @NotNull HasPacketId response); } diff --git a/src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java b/src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java index 403ac29b..1482294b 100644 --- a/src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java +++ b/src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java @@ -4,15 +4,11 @@ import com.ss.mqtt.broker.model.MqttSession.UnsafeMqttSession; import com.ss.mqtt.broker.network.client.MqttClient; import com.ss.mqtt.broker.network.packet.HasPacketId; -import com.ss.mqtt.broker.network.packet.in.MqttReadablePacket; -import com.ss.mqtt.broker.network.packet.out.PublishOutPacket; +import com.ss.mqtt.broker.network.packet.in.PublishInPacket; import com.ss.rlib.common.util.ClassUtils; import com.ss.rlib.common.util.array.Array; import com.ss.rlib.common.util.array.ConcurrentArray; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.Setter; -import lombok.ToString; +import lombok.*; import lombok.extern.log4j.Log4j2; import org.jetbrains.annotations.NotNull; @@ -21,17 +17,22 @@ @Log4j2 @ToString(of = "clientId") +@EqualsAndHashCode(of = "clientId") public class DefaultMqttSession implements UnsafeMqttSession { - @RequiredArgsConstructor - private static class PendingPublish { - private final PublishOutPacket publish; - private final PendingCallback callback; + @AllArgsConstructor + private static class PendingPublish { + + private final @NotNull PublishInPacket publish; + private final @NotNull PendingPacketHandler handler; private final long registeredTime; + private final int packetId; + + private volatile long lastAttemptTime; } private final @NotNull String clientId; - private final @NotNull ConcurrentArray> pendingPublishes; + private final @NotNull ConcurrentArray pendingPublishes; private final @NotNull AtomicInteger packetIdGenerator; private volatile @Getter @Setter long expirationTime = -1; @@ -61,29 +62,75 @@ public int nextPacketId() { } @Override - public void registerPendingPublish(@NotNull PublishOutPacket publish) { - pendingPublishes.runInWriteLock(publish, (array, packet) -> - array.add(new PendingPublish<>(packet, PendingCallback.EMPTY, System.currentTimeMillis()))); + public void registerPendingPublish( + @NotNull PublishInPacket publish, + @NotNull PendingPacketHandler handler, + int packetId + ) { + + var currentTime = System.currentTimeMillis(); + var pendingPublish = new PendingPublish(publish, handler, currentTime, packetId, currentTime); + + pendingPublishes.runInWriteLock(pendingPublish, Array::add); } @Override - public void registerPendingPublish( - @NotNull PublishOutPacket publish, - @NotNull PendingCallback callback - ) { - pendingPublishes.runInWriteLock( - new PendingPublish<>(publish, callback, System.currentTimeMillis()), - Array::add - ); + public void removeExpiredPackets() { + + if (pendingPublishes.isEmpty()) { + return; + } + + pendingPublishes.runInWriteLock(publishes -> { + + var currentTime = System.currentTimeMillis(); + var array = publishes.array(); + + for (int i = 0, length = publishes.size(); i < length; i++) { + + var pendingPublish = array[i]; + + var publish = pendingPublish.publish; + var messageExpiryInterval = publish.getMessageExpiryInterval(); + + if (messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED) { + continue; + } + + var expiredTime = pendingPublish.registeredTime + (messageExpiryInterval * 1000); + + if (expiredTime < currentTime) { + publishes.fastRemove(i); + i--; + length--; + } + } + }); + } + + @Override + public void resendPendingPacketsAsync(@NotNull MqttClient client, int retryInterval) { + var currentTime = System.currentTimeMillis(); + var stamp = pendingPublishes.readLock(); + try { + for (var pendingPublish : pendingPublishes) { + if (currentTime - pendingPublish.lastAttemptTime > retryInterval) { + pendingPublish.lastAttemptTime = currentTime; + pendingPublish.handler.retryAsync(client, pendingPublish.publish, pendingPublish.packetId); + } + } + } finally { + pendingPublishes.readUnlock(stamp); + } } @Override - public void unregisterPendingPacket( + public void unregisterPendingPacket( @NotNull MqttClient client, - @NotNull T feedback + @NotNull HasPacketId response ) { - var packetId = feedback.getPacketId(); + var packetId = response.getPacketId(); var pendingPublish = pendingPublishes.findAnyConvertedToIntInReadLock( packetId, pending -> pending.publish.getPacketId(), @@ -91,11 +138,11 @@ public void unregisterPendingPacket ); if (pendingPublish == null) { - log.warn("Not found pending publish for client {} by received packet {}", clientId, feedback); + log.warn("Not found pending publish for client {} by received packet {}", clientId, response); return; } - var shouldBeRemoved = pendingPublish.callback.handle(client, ClassUtils.unsafeNNCast(feedback)); + var shouldBeRemoved = pendingPublish.handler.handleResponse(client, ClassUtils.unsafeNNCast(response)); if (shouldBeRemoved) { pendingPublishes.runInWriteLock(pendingPublish, Array::fastRemove); diff --git a/src/main/java/com/ss/mqtt/broker/network/client/MqttClient.java b/src/main/java/com/ss/mqtt/broker/network/client/MqttClient.java index e7b051b9..6ef2f3c5 100644 --- a/src/main/java/com/ss/mqtt/broker/network/client/MqttClient.java +++ b/src/main/java/com/ss/mqtt/broker/network/client/MqttClient.java @@ -8,6 +8,7 @@ import com.ss.mqtt.broker.network.packet.in.MqttReadablePacket; import com.ss.mqtt.broker.network.packet.out.MqttWritablePacket; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; public interface MqttClient { @@ -30,7 +31,7 @@ void configure( void setClientId(@NotNull String clientId); - void setSession(@NotNull MqttSession session); + void setSession(@Nullable MqttSession session); void reject(@NotNull ConnectAckReasonCode reasonCode); @@ -41,7 +42,7 @@ void configure( @NotNull MqttConnectionConfig getConnectionConfig(); @NotNull String getClientId(); - @NotNull MqttSession getSession(); + @Nullable MqttSession getSession(); int getKeepAlive(); int getMaximumPacketSize(); diff --git a/src/main/java/com/ss/mqtt/broker/service/PublishRetryService.java b/src/main/java/com/ss/mqtt/broker/service/PublishRetryService.java new file mode 100644 index 00000000..df8f18fd --- /dev/null +++ b/src/main/java/com/ss/mqtt/broker/service/PublishRetryService.java @@ -0,0 +1,11 @@ +package com.ss.mqtt.broker.service; + +import com.ss.mqtt.broker.network.client.MqttClient; +import org.jetbrains.annotations.NotNull; + +public interface PublishRetryService { + + void register(@NotNull MqttClient client); + + void unregister(@NotNull MqttClient client); +} diff --git a/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPublishRetryService.java b/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPublishRetryService.java new file mode 100644 index 00000000..299d84da --- /dev/null +++ b/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPublishRetryService.java @@ -0,0 +1,84 @@ +package com.ss.mqtt.broker.service.impl; + +import com.ss.mqtt.broker.network.client.MqttClient; +import com.ss.mqtt.broker.service.PublishRetryService; +import com.ss.rlib.common.concurrent.util.ThreadUtils; +import com.ss.rlib.common.util.array.Array; +import com.ss.rlib.common.util.array.ArrayFactory; +import com.ss.rlib.common.util.array.ConcurrentArray; +import org.jetbrains.annotations.NotNull; + +import java.io.Closeable; + +public class DefaultPublishRetryService implements PublishRetryService, Closeable { + + private final @NotNull ConcurrentArray registeredClients; + private final @NotNull Thread checkThread; + + private final int checkInterval; + private final int retryInterval; + + private volatile boolean closed; + + public DefaultPublishRetryService(int checkInterval, int retryInterval) { + this.checkInterval = checkInterval; + this.retryInterval = retryInterval; + this.registeredClients = ConcurrentArray.ofType(MqttClient.class); + this.checkThread = new Thread(this::checkPendingPackets, "DefaultPublishRetryService-Check"); + this.checkThread.setDaemon(true); + this.checkThread.start(); + } + + @Override + public void register(@NotNull MqttClient client) { + registeredClients.runInWriteLock(client, Array::add); + } + + @Override + public void unregister(@NotNull MqttClient client) { + registeredClients.runInWriteLock(client, Array::fastRemove); + } + + private void checkPendingPackets() { + + var toCheck = ArrayFactory.newArray(MqttClient.class); + var toUnregister = ArrayFactory.newArray(MqttClient.class); + + while (!closed) { + + toCheck.clear(); + toUnregister.clear(); + + ThreadUtils.sleep(checkInterval); + + registeredClients.runInReadLock(toCheck, Array::copyTo); + + for (var client : toCheck) { + + var session = client.getSession(); + + // if session is null it means that client was closed + if (session == null) { + toUnregister.add(client); + continue; + } + + session.removeExpiredPackets(); + session.resendPendingPacketsAsync(client, retryInterval); + } + + if (toUnregister.isEmpty()) { + return; + } + + // unregister closed clients + registeredClients.runInWriteLock(toUnregister, Array::fastRemoveAll); + } + } + + @Override + public void close() { + closed = true; + checkThread.interrupt(); + } +} From 0ffaf7b4f37f278a92dc9d70c6939cc11c75515b Mon Sep 17 00:00:00 2001 From: Alex Brui Date: Thu, 21 Nov 2019 09:48:30 +0300 Subject: [PATCH 4/8] [broker-15] working on test coverage --- .../broker/config/MqttConnectionConfig.java | 5 + .../packet/in/PublishAckInPacketHandler.java | 2 +- .../broker/model/MqttPropertyConstants.java | 2 +- .../com/ss/mqtt/broker/model/MqttSession.java | 4 +- .../broker/model/impl/DefaultMqttSession.java | 24 +++- .../network/packet/in/PublishInPacket.java | 6 +- .../network/packet/out/Publish5OutPacket.java | 2 +- .../broker/service/PublishRetryService.java | 2 + .../impl/DefaultPublishRetryService.java | 11 +- .../mqtt/broker/test/UnitSpecification.groovy | 6 + .../ConnectSubscribePublishTest.groovy | 2 +- .../test/integration/ConnectionTest.groovy | 2 +- ...groovy => IntegrationSpecification.groovy} | 2 +- .../service/ClientIdRegistryTest.groovy | 4 +- .../service/MqttSessionServiceTest.groovy | 4 +- .../service/PublishRetryServiceTest.groovy | 105 ++++++++++++++++++ ...groovy => NetworkUnitSpecification.groovy} | 19 ++-- .../test/network/in/BaseInPacketTest.groovy | 4 +- .../network/in/PublishInPacketTest.groovy | 2 +- .../test/network/out/BaseOutPacketTest.groovy | 20 ++-- .../broker/test/util/MqttDataUtilsTest.groovy | 4 +- .../resources/application-test.properties | 3 + 22 files changed, 191 insertions(+), 44 deletions(-) create mode 100644 src/test/groovy/com/ss/mqtt/broker/test/UnitSpecification.groovy rename src/test/groovy/com/ss/mqtt/broker/test/integration/{MqttBrokerTest.groovy => IntegrationSpecification.groovy} (96%) create mode 100644 src/test/groovy/com/ss/mqtt/broker/test/integration/service/PublishRetryServiceTest.groovy rename src/test/groovy/com/ss/mqtt/broker/test/network/{BasePacketTest.groovy => NetworkUnitSpecification.groovy} (86%) diff --git a/src/main/java/com/ss/mqtt/broker/config/MqttConnectionConfig.java b/src/main/java/com/ss/mqtt/broker/config/MqttConnectionConfig.java index a355035e..3287563f 100644 --- a/src/main/java/com/ss/mqtt/broker/config/MqttConnectionConfig.java +++ b/src/main/java/com/ss/mqtt/broker/config/MqttConnectionConfig.java @@ -1,7 +1,9 @@ package com.ss.mqtt.broker.config; import com.ss.mqtt.broker.model.QoS; +import lombok.Data; import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.RequiredArgsConstructor; import org.jetbrains.annotations.NotNull; @@ -24,4 +26,7 @@ public class MqttConnectionConfig { private final boolean wildcardSubscriptionAvailable; private final boolean subscriptionIdAvailable; private final boolean sharedSubscriptionAvailable; + + + @NoArgsConstructor @Data class MyClass { final String field = "";} } diff --git a/src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishAckInPacketHandler.java b/src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishAckInPacketHandler.java index 419f3ff3..e3313ddc 100644 --- a/src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishAckInPacketHandler.java +++ b/src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishAckInPacketHandler.java @@ -10,6 +10,6 @@ public class PublishAckInPacketHandler extends AbstractPacketHandler retryInterval) { - pendingPublish.lastAttemptTime = currentTime; - pendingPublish.handler.retryAsync(client, pendingPublish.publish, pendingPublish.packetId); + + if (currentTime - pendingPublish.lastAttemptTime <= retryInterval) { + continue; } + + log.debug("Re-try to send publish {}", pendingPublish.publish); + + pendingPublish.lastAttemptTime = currentTime; + pendingPublish.handler.retryAsync(client, pendingPublish.publish, pendingPublish.packetId); } + } finally { pendingPublishes.readUnlock(stamp); } } @Override - public void unregisterPendingPacket( + public void updatePendingPacket( @NotNull MqttClient client, @NotNull HasPacketId response ) { diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishInPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishInPacket.java index f1a0dc9b..6cb99c9e 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishInPacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishInPacket.java @@ -265,7 +265,7 @@ public class PublishInPacket extends MqttReadablePacket { private @NotNull byte[] correlationData; - private long messageExpiryInterval = MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_DEFAULT; + private long messageExpiryInterval = MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED; private int topicAlias = MqttPropertyConstants.TOPIC_ALIAS_DEFAULT; private boolean payloadFormatIndicator = MqttPropertyConstants.PAYLOAD_FORMAT_INDICATOR_DEFAULT; @@ -315,8 +315,8 @@ protected void applyProperty(@NotNull PacketProperty property, long value) { topicAlias = NumberUtils.validate( (int) value, MqttPropertyConstants.TOPIC_ALIAS_MIN, - MqttPropertyConstants.TOPIC_ALIAS_MAX) - ; + MqttPropertyConstants.TOPIC_ALIAS_MAX + ); break; case MESSAGE_EXPIRY_INTERVAL: messageExpiryInterval = value; diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/out/Publish5OutPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/out/Publish5OutPacket.java index c5bf7b22..1c97de8a 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/out/Publish5OutPacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/out/Publish5OutPacket.java @@ -176,7 +176,7 @@ protected void writeProperties(@NotNull ByteBuffer buffer) { writeProperty(buffer, PacketProperty.MESSAGE_EXPIRY_INTERVAL, 0, - MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_DEFAULT + MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED ); writeProperty(buffer, PacketProperty.TOPIC_ALIAS, topicAlias, MqttPropertyConstants.TOPIC_ALIAS_DEFAULT); writeNotEmptyProperty(buffer, PacketProperty.RESPONSE_TOPIC, responseTopic); diff --git a/src/main/java/com/ss/mqtt/broker/service/PublishRetryService.java b/src/main/java/com/ss/mqtt/broker/service/PublishRetryService.java index df8f18fd..6e423bf0 100644 --- a/src/main/java/com/ss/mqtt/broker/service/PublishRetryService.java +++ b/src/main/java/com/ss/mqtt/broker/service/PublishRetryService.java @@ -8,4 +8,6 @@ public interface PublishRetryService { void register(@NotNull MqttClient client); void unregister(@NotNull MqttClient client); + + boolean exist(@NotNull String clientId); } diff --git a/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPublishRetryService.java b/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPublishRetryService.java index 299d84da..1dc966ff 100644 --- a/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPublishRetryService.java +++ b/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPublishRetryService.java @@ -6,10 +6,12 @@ import com.ss.rlib.common.util.array.Array; import com.ss.rlib.common.util.array.ArrayFactory; import com.ss.rlib.common.util.array.ConcurrentArray; +import lombok.extern.log4j.Log4j2; import org.jetbrains.annotations.NotNull; import java.io.Closeable; +@Log4j2 public class DefaultPublishRetryService implements PublishRetryService, Closeable { private final @NotNull ConcurrentArray registeredClients; @@ -39,6 +41,11 @@ public void unregister(@NotNull MqttClient client) { registeredClients.runInWriteLock(client, Array::fastRemove); } + @Override + public boolean exist(@NotNull String clientId) { + return registeredClients.anyMatchInReadLock(clientId, (id, client) -> id.equals(client.getClientId())); + } + private void checkPendingPackets() { var toCheck = ArrayFactory.newArray(MqttClient.class); @@ -53,6 +60,8 @@ private void checkPendingPackets() { registeredClients.runInReadLock(toCheck, Array::copyTo); + log.debug("Check pending packets in {} client(s)", toCheck.size()); + for (var client : toCheck) { var session = client.getSession(); @@ -68,7 +77,7 @@ private void checkPendingPackets() { } if (toUnregister.isEmpty()) { - return; + continue; } // unregister closed clients diff --git a/src/test/groovy/com/ss/mqtt/broker/test/UnitSpecification.groovy b/src/test/groovy/com/ss/mqtt/broker/test/UnitSpecification.groovy new file mode 100644 index 00000000..1faebe21 --- /dev/null +++ b/src/test/groovy/com/ss/mqtt/broker/test/UnitSpecification.groovy @@ -0,0 +1,6 @@ +package com.ss.mqtt.broker.test + +import spock.lang.Specification + +class UnitSpecification extends Specification { +} diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy index f5436978..5bb5d41b 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy @@ -8,7 +8,7 @@ import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCo import java.util.concurrent.atomic.AtomicReference -class ConnectSubscribePublishTest extends MqttBrokerTest { +class ConnectSubscribePublishTest extends IntegrationSpecification { def "publisher should publish message QoS 0"() { given: diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectionTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectionTest.groovy index 628a8dd1..5fac53f2 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectionTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectionTest.groovy @@ -8,7 +8,7 @@ import org.springframework.beans.factory.annotation.Autowired import java.util.concurrent.CompletionException -class ConnectionTest extends MqttBrokerTest { +class ConnectionTest extends IntegrationSpecification { @Autowired InetSocketAddress deviceNetworkAddress diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/MqttBrokerTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/IntegrationSpecification.groovy similarity index 96% rename from src/test/groovy/com/ss/mqtt/broker/test/integration/MqttBrokerTest.groovy rename to src/test/groovy/com/ss/mqtt/broker/test/integration/IntegrationSpecification.groovy index 48c74557..d47e4b10 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/MqttBrokerTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/IntegrationSpecification.groovy @@ -10,7 +10,7 @@ import spock.lang.Specification import java.nio.charset.StandardCharsets @ContextConfiguration(classes = MqttBrokerTestConfig) -class MqttBrokerTest extends Specification { +class IntegrationSpecification extends Specification { @Autowired InetSocketAddress deviceNetworkAddress diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/ClientIdRegistryTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/ClientIdRegistryTest.groovy index 69557bb2..7a02232f 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/ClientIdRegistryTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/ClientIdRegistryTest.groovy @@ -2,11 +2,11 @@ package com.ss.mqtt.broker.test.integration.service import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode import com.ss.mqtt.broker.service.ClientIdRegistry -import com.ss.mqtt.broker.test.integration.MqttBrokerTest +import com.ss.mqtt.broker.test.integration.IntegrationSpecification import com.ss.rlib.common.util.StringUtils import org.springframework.beans.factory.annotation.Autowired -class ClientIdRegistryTest extends MqttBrokerTest { +class ClientIdRegistryTest extends IntegrationSpecification { @Autowired ClientIdRegistry clientIdRegistry diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/MqttSessionServiceTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/MqttSessionServiceTest.groovy index e9a6f9b9..57c9bd1d 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/MqttSessionServiceTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/MqttSessionServiceTest.groovy @@ -4,10 +4,10 @@ import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCo import com.ss.mqtt.broker.config.MqttConnectionConfig import com.ss.mqtt.broker.service.ClientIdRegistry import com.ss.mqtt.broker.service.MqttSessionService -import com.ss.mqtt.broker.test.integration.MqttBrokerTest +import com.ss.mqtt.broker.test.integration.IntegrationSpecification import org.springframework.beans.factory.annotation.Autowired -class MqttSessionServiceTest extends MqttBrokerTest { +class MqttSessionServiceTest extends IntegrationSpecification { @Autowired ClientIdRegistry clientIdRegistry diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/PublishRetryServiceTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/PublishRetryServiceTest.groovy new file mode 100644 index 00000000..c44488f3 --- /dev/null +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/PublishRetryServiceTest.groovy @@ -0,0 +1,105 @@ +package com.ss.mqtt.broker.test.integration.service + +import com.ss.mqtt.broker.model.MqttSession +import com.ss.mqtt.broker.network.client.MqttClient +import com.ss.mqtt.broker.network.packet.in.PublishInPacket +import com.ss.mqtt.broker.service.MqttSessionService +import com.ss.mqtt.broker.service.PublishRetryService +import com.ss.mqtt.broker.test.integration.IntegrationSpecification +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Value + +import java.util.concurrent.atomic.AtomicInteger + +class PublishRetryServiceTest extends IntegrationSpecification { + + @Autowired + PublishRetryService publishRetryService + + @Autowired + MqttSessionService mqttSessionService + + @Value('${publish.pending.check.interval}') + int checkInterval + + @Value('${publish.retry.interval}') + int retryInterval + + def "client should be presented in re-try service after connecting"() { + given: + def client = buildClient() + def config = client.getConfig() + def identifier = config.getClientIdentifier() + when: + client.connect().join() + then: + noExceptionThrown() + publishRetryService.exist(identifier.get().toString()) + when: + client.disconnect().join() + Thread.sleep(500) + then: + noExceptionThrown() + !publishRetryService.exist(identifier.get().toString()) + } + + def "service should check expired and pending messages from time to time"() { + given: + + def session = Mock(MqttSession) + def client = Stub(MqttClient) { + getSession() >> session + } + + when: + publishRetryService.register(client) + Thread.sleep(checkInterval) + then: + 1 * session.removeExpiredPackets() + 1 * session.resendPendingPacketsAsync(client, retryInterval) + cleanup: + publishRetryService.unregister(client) + } + + def "service should remove expired and retry pending messages from time to time"() { + given: + + def session = mqttSessionService.create(UUID.randomUUID().toString()).block() + def retryAttempts = new AtomicInteger() + + def handler = Stub(MqttSession.PendingPacketHandler) { + retryAsync(_,_,_) >> { + retryAttempts.incrementAndGet() + } + } + def client = Stub(MqttClient) { + getSession() >> session + } + + publishRetryService.register(client) + when: + def publish = Stub(PublishInPacket) { + getMessageExpiryInterval() >> 1L + } + + session.registerPendingPublish(publish, handler, 1) + + Thread.sleep(1000 + checkInterval) + then: + retryAttempts.get() == 0 + !session.hasPendingPackets() + when: + publish = Stub(PublishInPacket) { + getMessageExpiryInterval() >> 1000L + } + + session.registerPendingPublish(publish, handler, 2) + + Thread.sleep(retryInterval + checkInterval) + then: + retryAttempts.get() == 1 + session.hasPendingPackets() + cleanup: + publishRetryService.unregister(client) + } +} diff --git a/src/test/groovy/com/ss/mqtt/broker/test/network/BasePacketTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/network/NetworkUnitSpecification.groovy similarity index 86% rename from src/test/groovy/com/ss/mqtt/broker/test/network/BasePacketTest.groovy rename to src/test/groovy/com/ss/mqtt/broker/test/network/NetworkUnitSpecification.groovy index bfec4cbe..c0462dd9 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/network/BasePacketTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/network/NetworkUnitSpecification.groovy @@ -9,6 +9,7 @@ import com.ss.mqtt.broker.model.reason.code.UnsubscribeAckReasonCode import com.ss.mqtt.broker.network.MqttConnection import com.ss.mqtt.broker.network.client.MqttClient +import com.ss.mqtt.broker.test.UnitSpecification import com.ss.rlib.common.util.array.Array import com.ss.rlib.common.util.array.ArrayFactory import com.ss.rlib.common.util.array.IntegerArray @@ -17,7 +18,7 @@ import spock.lang.Specification import java.nio.charset.StandardCharsets -class BasePacketTest extends Specification { +class NetworkUnitSpecification extends UnitSpecification { public static final keepAliveEnabled = true public static final sessionsEnabled = true @@ -96,12 +97,12 @@ class BasePacketTest extends Specification { getConfig() >> mqttConnectionConfig getClient() >> Stub(MqttClient.UnsafeMqttClient) { getConnectionConfig() >> mqttConnectionConfig - getSessionExpiryInterval() >> BasePacketTest.sessionExpiryInterval - getReceiveMax() >> BasePacketTest.receiveMaximum - getMaximumPacketSize() >> BasePacketTest.maximumPacketSize + getSessionExpiryInterval() >> NetworkUnitSpecification.sessionExpiryInterval + getReceiveMax() >> NetworkUnitSpecification.receiveMaximum + getMaximumPacketSize() >> NetworkUnitSpecification.maximumPacketSize getClientId() >> clientId getKeepAlive() >> serverKeepAlive - getTopicAliasMaximum() >> BasePacketTest.topicAliasMaximum + getTopicAliasMaximum() >> NetworkUnitSpecification.topicAliasMaximum } } @@ -112,12 +113,12 @@ class BasePacketTest extends Specification { getConfig() >> mqttConnectionConfig getClient() >> Stub(MqttClient.UnsafeMqttClient) { getConnectionConfig() >> mqttConnectionConfig - getSessionExpiryInterval() >> BasePacketTest.sessionExpiryInterval - getReceiveMax() >> BasePacketTest.receiveMaximum - getMaximumPacketSize() >> BasePacketTest.maximumPacketSize + getSessionExpiryInterval() >> NetworkUnitSpecification.sessionExpiryInterval + getReceiveMax() >> NetworkUnitSpecification.receiveMaximum + getMaximumPacketSize() >> NetworkUnitSpecification.maximumPacketSize getClientId() >> clientId getKeepAlive() >> serverKeepAlive - getTopicAliasMaximum() >> BasePacketTest.topicAliasMaximum + getTopicAliasMaximum() >> NetworkUnitSpecification.topicAliasMaximum } } } diff --git a/src/test/groovy/com/ss/mqtt/broker/test/network/in/BaseInPacketTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/network/in/BaseInPacketTest.groovy index 78e98c5f..ac35361f 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/network/in/BaseInPacketTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/network/in/BaseInPacketTest.groovy @@ -1,6 +1,6 @@ package com.ss.mqtt.broker.test.network.in -import com.ss.mqtt.broker.test.network.BasePacketTest +import com.ss.mqtt.broker.test.network.NetworkUnitSpecification -class BaseInPacketTest extends BasePacketTest { +class BaseInPacketTest extends NetworkUnitSpecification { } diff --git a/src/test/groovy/com/ss/mqtt/broker/test/network/in/PublishInPacketTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/network/in/PublishInPacketTest.groovy index 65e07d41..3ce976f1 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/network/in/PublishInPacketTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/network/in/PublishInPacketTest.groovy @@ -36,7 +36,7 @@ class PublishInPacketTest extends BaseInPacketTest { packet.payload == publishPayload packet.packetId == packetId packet.userProperties == Array.empty() - packet.messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_DEFAULT + packet.messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED packet.topicAlias == MqttPropertyConstants.TOPIC_ALIAS_DEFAULT packet.payloadFormatIndicator == MqttPropertyConstants.PAYLOAD_FORMAT_INDICATOR_DEFAULT } diff --git a/src/test/groovy/com/ss/mqtt/broker/test/network/out/BaseOutPacketTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/network/out/BaseOutPacketTest.groovy index 7bd7fcc8..51b50748 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/network/out/BaseOutPacketTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/network/out/BaseOutPacketTest.groovy @@ -1,30 +1,30 @@ package com.ss.mqtt.broker.test.network.out import com.ss.mqtt.broker.network.client.MqttClient -import com.ss.mqtt.broker.test.network.BasePacketTest +import com.ss.mqtt.broker.test.network.NetworkUnitSpecification import spock.lang.Shared -class BaseOutPacketTest extends BasePacketTest { +class BaseOutPacketTest extends NetworkUnitSpecification { @Shared MqttClient mqtt5Client = Stub(MqttClient.UnsafeMqttClient) { getConnectionConfig() >> mqttConnectionConfig - getSessionExpiryInterval() >> BasePacketTest.sessionExpiryInterval - getReceiveMax() >> BasePacketTest.receiveMaximum - getMaximumPacketSize() >> BasePacketTest.maximumPacketSize + getSessionExpiryInterval() >> NetworkUnitSpecification.sessionExpiryInterval + getReceiveMax() >> NetworkUnitSpecification.receiveMaximum + getMaximumPacketSize() >> NetworkUnitSpecification.maximumPacketSize getClientId() >> clientId getKeepAlive() >> serverKeepAlive - getTopicAliasMaximum() >> BasePacketTest.topicAliasMaximum + getTopicAliasMaximum() >> NetworkUnitSpecification.topicAliasMaximum } @Shared MqttClient mqtt311Client = Stub(MqttClient.UnsafeMqttClient) { getConnectionConfig() >> mqttConnectionConfig - getSessionExpiryInterval() >> BasePacketTest.sessionExpiryInterval - getReceiveMax() >> BasePacketTest.receiveMaximum - getMaximumPacketSize() >> BasePacketTest.maximumPacketSize + getSessionExpiryInterval() >> NetworkUnitSpecification.sessionExpiryInterval + getReceiveMax() >> NetworkUnitSpecification.receiveMaximum + getMaximumPacketSize() >> NetworkUnitSpecification.maximumPacketSize getClientId() >> clientId getKeepAlive() >> serverKeepAlive - getTopicAliasMaximum() >> BasePacketTest.topicAliasMaximum + getTopicAliasMaximum() >> NetworkUnitSpecification.topicAliasMaximum } } diff --git a/src/test/groovy/com/ss/mqtt/broker/test/util/MqttDataUtilsTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/util/MqttDataUtilsTest.groovy index 21bff93f..a73b3736 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/util/MqttDataUtilsTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/util/MqttDataUtilsTest.groovy @@ -1,11 +1,11 @@ package com.ss.mqtt.broker.test.util +import com.ss.mqtt.broker.test.UnitSpecification import com.ss.mqtt.broker.util.MqttDataUtils -import spock.lang.Specification import java.nio.ByteBuffer -class MqttDataUtilsTest extends Specification { +class MqttDataUtilsTest extends UnitSpecification { def "should write integer to MQTT multi byte integer successful"(int value, int expectedBytes) { given: diff --git a/src/test/resources/application-test.properties b/src/test/resources/application-test.properties index 8e5b4492..cd5527f3 100644 --- a/src/test/resources/application-test.properties +++ b/src/test/resources/application-test.properties @@ -1,2 +1,5 @@ authentication.allow.anonymous=true credentials.source.file.name=credentials-test + +publish.pending.check.interval=200 +publish.retry.interval=1500 From 34693cd281ab58891c3185d93e4e674a844c7a61 Mon Sep 17 00:00:00 2001 From: Alex Brui Date: Thu, 21 Nov 2019 09:50:00 +0300 Subject: [PATCH 5/8] [broker-15] remove missed file --- .../java/com/ss/mqtt/broker/config/MqttConnectionConfig.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/main/java/com/ss/mqtt/broker/config/MqttConnectionConfig.java b/src/main/java/com/ss/mqtt/broker/config/MqttConnectionConfig.java index 3287563f..4a449066 100644 --- a/src/main/java/com/ss/mqtt/broker/config/MqttConnectionConfig.java +++ b/src/main/java/com/ss/mqtt/broker/config/MqttConnectionConfig.java @@ -26,7 +26,4 @@ public class MqttConnectionConfig { private final boolean wildcardSubscriptionAvailable; private final boolean subscriptionIdAvailable; private final boolean sharedSubscriptionAvailable; - - - @NoArgsConstructor @Data class MyClass { final String field = "";} } From 3bd10762b247c9e64d006d6fc8cbef4e315105eb Mon Sep 17 00:00:00 2001 From: Alex Brui Date: Thu, 21 Nov 2019 20:56:46 +0300 Subject: [PATCH 6/8] [broker-15] small fixes --- .../com/ss/mqtt/broker/network/client/AbstractMqttClient.java | 1 - .../ss/mqtt/broker/test/network/in/PublishInPacketTest.groovy | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/java/com/ss/mqtt/broker/network/client/AbstractMqttClient.java b/src/main/java/com/ss/mqtt/broker/network/client/AbstractMqttClient.java index 641fc637..d5d2aaf2 100644 --- a/src/main/java/com/ss/mqtt/broker/network/client/AbstractMqttClient.java +++ b/src/main/java/com/ss/mqtt/broker/network/client/AbstractMqttClient.java @@ -24,7 +24,6 @@ @Getter @Log4j2 @ToString(of = "clientId") -@EqualsAndHashCode(of = "clientId") public abstract class AbstractMqttClient implements UnsafeMqttClient { protected final @NotNull MqttConnection connection; diff --git a/src/test/groovy/com/ss/mqtt/broker/test/network/in/PublishInPacketTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/network/in/PublishInPacketTest.groovy index 3ce976f1..b40491ab 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/network/in/PublishInPacketTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/network/in/PublishInPacketTest.groovy @@ -105,7 +105,7 @@ class PublishInPacketTest extends BaseInPacketTest { packet.payload == publishPayload packet.packetId == packetId packet.userProperties == Array.empty() - packet.messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_DEFAULT + packet.messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED packet.topicAlias == MqttPropertyConstants.TOPIC_ALIAS_DEFAULT packet.payloadFormatIndicator == MqttPropertyConstants.PAYLOAD_FORMAT_INDICATOR_DEFAULT } From 510d5274ff68e646dc52e507c8d73bac5d099c20 Mon Sep 17 00:00:00 2001 From: Alex Brui Date: Fri, 22 Nov 2019 08:50:39 +0300 Subject: [PATCH 7/8] [broker-15] fix for code review --- .../broker/model/MqttPropertyConstants.java | 1 - .../broker/model/impl/DefaultMqttSession.java | 61 +++++++++---------- .../impl/InMemoryMqttSessionService.java | 6 +- 3 files changed, 35 insertions(+), 33 deletions(-) diff --git a/src/main/java/com/ss/mqtt/broker/model/MqttPropertyConstants.java b/src/main/java/com/ss/mqtt/broker/model/MqttPropertyConstants.java index ca3da5b6..436cdf51 100644 --- a/src/main/java/com/ss/mqtt/broker/model/MqttPropertyConstants.java +++ b/src/main/java/com/ss/mqtt/broker/model/MqttPropertyConstants.java @@ -52,5 +52,4 @@ public interface MqttPropertyConstants { boolean SUBSCRIPTION_IDENTIFIER_AVAILABLE_DEFAULT = false; int PACKET_ID_FOR_QOS_0 = 0; - int PACKET_ID_MAX = 0xFFFF; } diff --git a/src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java b/src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java index 10be0d8d..684cdf00 100644 --- a/src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java +++ b/src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java @@ -81,38 +81,9 @@ public boolean hasPendingPackets() { @Override public void removeExpiredPackets() { - - if (pendingPublishes.isEmpty()) { - return; + if (!pendingPublishes.isEmpty()) { + pendingPublishes.runInWriteLock(this::removeExpiredPackets); } - - pendingPublishes.runInWriteLock(publishes -> { - - var currentTime = System.currentTimeMillis(); - var array = publishes.array(); - - for (int i = 0, length = publishes.size(); i < length; i++) { - - var pendingPublish = array[i]; - - var publish = pendingPublish.publish; - var messageExpiryInterval = publish.getMessageExpiryInterval(); - - if (messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED || - messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_INFINITY) { - continue; - } - - var expiredTime = pendingPublish.registeredTime + (messageExpiryInterval * 1000); - - if (expiredTime < currentTime) { - log.debug("Remove pending publish {} by expiration reason", publish); - publishes.fastRemove(i); - i--; - length--; - } - } - }); } @Override @@ -167,4 +138,32 @@ public void updatePendingPacket( public void clear() { pendingPublishes.runInWriteLock(Collection::clear); } + + private void removeExpiredPackets(@NotNull Array publishes) { + + var currentTime = System.currentTimeMillis(); + var array = publishes.array(); + + for (int i = 0, length = publishes.size(); i < length; i++) { + + var pendingPublish = array[i]; + + var publish = pendingPublish.publish; + var messageExpiryInterval = publish.getMessageExpiryInterval(); + + if (messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED || + messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_INFINITY) { + continue; + } + + var expiredTime = pendingPublish.registeredTime + (messageExpiryInterval * 1000); + + if (expiredTime < currentTime) { + log.debug("Remove pending publish {} by expiration reason", publish); + publishes.fastRemove(i); + i--; + length--; + } + } + } } diff --git a/src/main/java/com/ss/mqtt/broker/service/impl/InMemoryMqttSessionService.java b/src/main/java/com/ss/mqtt/broker/service/impl/InMemoryMqttSessionService.java index 44ad140b..699c586c 100644 --- a/src/main/java/com/ss/mqtt/broker/service/impl/InMemoryMqttSessionService.java +++ b/src/main/java/com/ss/mqtt/broker/service/impl/InMemoryMqttSessionService.java @@ -64,7 +64,11 @@ public InMemoryMqttSessionService(int cleanInterval) { } @Override - public @NotNull Mono store(@NotNull String clientId, @NotNull MqttSession session, long expiryInterval) { + public @NotNull Mono store( + @NotNull String clientId, + @NotNull MqttSession session, + long expiryInterval + ) { var unsafe = (UnsafeMqttSession) session; unsafe.setExpirationTime(System.currentTimeMillis() + (expiryInterval * 1000)); From 269e264e07482d71d0663e6d7082b5103ff6b034 Mon Sep 17 00:00:00 2001 From: Alex Brui Date: Sat, 23 Nov 2019 10:26:02 +0300 Subject: [PATCH 8/8] [broker-15] fix for code review --- .../broker/model/impl/DefaultMqttSession.java | 58 +++++++++---------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java b/src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java index 684cdf00..f59501e0 100644 --- a/src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java +++ b/src/main/java/com/ss/mqtt/broker/model/impl/DefaultMqttSession.java @@ -31,6 +31,34 @@ private static class PendingPublish { private volatile long lastAttemptTime; } + private static void removeExpiredPackets(@NotNull Array publishes) { + + var currentTime = System.currentTimeMillis(); + var array = publishes.array(); + + for (int i = 0, length = publishes.size(); i < length; i++) { + + var pendingPublish = array[i]; + + var publish = pendingPublish.publish; + var messageExpiryInterval = publish.getMessageExpiryInterval(); + + if (messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED || + messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_INFINITY) { + continue; + } + + var expiredTime = pendingPublish.registeredTime + (messageExpiryInterval * 1000); + + if (expiredTime < currentTime) { + log.debug("Remove pending publish {} by expiration reason", publish); + publishes.fastRemove(i); + i--; + length--; + } + } + } + private final @NotNull String clientId; private final @NotNull ConcurrentArray pendingPublishes; private final @NotNull AtomicInteger packetIdGenerator; @@ -82,7 +110,7 @@ public boolean hasPendingPackets() { @Override public void removeExpiredPackets() { if (!pendingPublishes.isEmpty()) { - pendingPublishes.runInWriteLock(this::removeExpiredPackets); + pendingPublishes.runInWriteLock(DefaultMqttSession::removeExpiredPackets); } } @@ -138,32 +166,4 @@ public void updatePendingPacket( public void clear() { pendingPublishes.runInWriteLock(Collection::clear); } - - private void removeExpiredPackets(@NotNull Array publishes) { - - var currentTime = System.currentTimeMillis(); - var array = publishes.array(); - - for (int i = 0, length = publishes.size(); i < length; i++) { - - var pendingPublish = array[i]; - - var publish = pendingPublish.publish; - var messageExpiryInterval = publish.getMessageExpiryInterval(); - - if (messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED || - messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_INFINITY) { - continue; - } - - var expiredTime = pendingPublish.registeredTime + (messageExpiryInterval * 1000); - - if (expiredTime < currentTime) { - log.debug("Remove pending publish {} by expiration reason", publish); - publishes.fastRemove(i); - i--; - length--; - } - } - } }