diff --git a/build.gradle b/build.gradle index 3a54e9b6..24ca5be4 100644 --- a/build.gradle +++ b/build.gradle @@ -32,7 +32,7 @@ allprojects { ext { annotationVersion = "17.0.0" - rlibVersion = "9.5.0" + rlibVersion = "9.6.0" lombokVersion = '1.18.4' springbootVersion = '2.2.0.RELEASE' springVersion = '5.1.6.RELEASE' diff --git a/src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java b/src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java index 2e21cde1..d1189ba9 100644 --- a/src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java +++ b/src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java @@ -1,13 +1,21 @@ package com.ss.mqtt.broker.config; +import com.ss.mqtt.broker.handler.client.DefaultMqttClientReleaseHandler; +import com.ss.mqtt.broker.handler.client.MqttClientReleaseHandler; import com.ss.mqtt.broker.handler.packet.in.*; +import com.ss.mqtt.broker.handler.publish.in.PublishInHandler; +import com.ss.mqtt.broker.handler.publish.in.Qos0PublishInHandler; +import com.ss.mqtt.broker.handler.publish.in.Qos1PublishInHandler; +import com.ss.mqtt.broker.handler.publish.in.Qos2PublishInHandler; +import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler; +import com.ss.mqtt.broker.handler.publish.out.Qos0PublishOutHandler; +import com.ss.mqtt.broker.handler.publish.out.Qos1PublishOutHandler; +import com.ss.mqtt.broker.handler.publish.out.Qos2PublishOutHandler; import com.ss.mqtt.broker.model.MqttPropertyConstants; import com.ss.mqtt.broker.model.QoS; import com.ss.mqtt.broker.network.MqttConnection; -import com.ss.mqtt.broker.handler.client.MqttClientReleaseHandler; -import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient; import com.ss.mqtt.broker.network.client.DeviceMqttClient; -import com.ss.mqtt.broker.handler.client.DeviceMqttClientReleaseHandler; +import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient; import com.ss.mqtt.broker.network.packet.PacketType; import com.ss.mqtt.broker.service.*; import com.ss.mqtt.broker.service.impl.*; @@ -86,29 +94,33 @@ private interface ChannelFactory extends @NotNull ClientIdRegistry clientIdRegistry, @NotNull SubscriptionService subscriptionService, @NotNull PublishingService publishingService, - @NotNull MqttSessionService mqttSessionService + @NotNull MqttSessionService mqttSessionService, + @NotNull PublishRetryService publishRetryService ) { var handlers = new PacketInHandler[PacketType.INVALID.ordinal()]; handlers[PacketType.CONNECT.ordinal()] = new ConnectInPacketHandler( clientIdRegistry, authenticationService, - mqttSessionService + mqttSessionService, + publishRetryService ); handlers[PacketType.SUBSCRIBE.ordinal()] = new SubscribeInPacketHandler(subscriptionService); handlers[PacketType.UNSUBSCRIBE.ordinal()] = new UnsubscribeInPacketHandler(subscriptionService); handlers[PacketType.PUBLISH.ordinal()] = new PublishInPacketHandler(publishingService); handlers[PacketType.DISCONNECT.ordinal()] = new DisconnetInPacketHandler(); + handlers[PacketType.PUBLISH_ACK.ordinal()] = new PublishAckInPacketHandler(); return handlers; } @Bean - @NotNull MqttClientReleaseHandler deviceMqttClientReleaseHandler( + @NotNull MqttClientReleaseHandler defaultMqttClientReleaseHandler( @NotNull ClientIdRegistry clientIdRegistry, - @NotNull MqttSessionService mqttSessionService + @NotNull MqttSessionService mqttSessionService, + @NotNull PublishRetryService publishRetryService ) { - return new DeviceMqttClientReleaseHandler(clientIdRegistry, mqttSessionService); + return new DefaultMqttClientReleaseHandler(clientIdRegistry, mqttSessionService, publishRetryService); } @Bean @@ -130,6 +142,14 @@ private interface ChannelFactory extends ); } + @Bean + @NotNull PublishRetryService publishRetryService() { + return new DefaultPublishRetryService( + env.getProperty("publish.pending.check.interval", int.class, 60 * 1000), + env.getProperty("publish.retry.interval", int.class, 60 * 1000) + ); + } + @Bean @NotNull InetSocketAddress deviceNetworkAddress( @NotNull ServerNetwork<@NotNull MqttConnection> deviceNetwork, @@ -150,8 +170,29 @@ private interface ChannelFactory extends } @Bean - @NotNull PublishingService publishingService(@NotNull SubscriptionService subscriptionService) { - return new SimplePublishingService(subscriptionService); + @NotNull PublishOutHandler[] publishOutHandlers() { + return new PublishOutHandler[] { + new Qos0PublishOutHandler(), + new Qos1PublishOutHandler(), + new Qos2PublishOutHandler(), + }; + } + + @Bean + @NotNull PublishInHandler[] publishInHandlers( + @NotNull SubscriptionService subscriptionService, + @NotNull PublishOutHandler[] publishOutHandlers + ) { + return new PublishInHandler[] { + new Qos0PublishInHandler(subscriptionService, publishOutHandlers), + new Qos1PublishInHandler(subscriptionService, publishOutHandlers), + new Qos2PublishInHandler(subscriptionService, publishOutHandlers), + }; + } + + @Bean + @NotNull PublishingService publishingService(@NotNull PublishInHandler[] publishInHandlers) { + return new DefaultPublishingService(publishInHandlers); } @Bean diff --git a/src/main/java/com/ss/mqtt/broker/config/MqttConnectionConfig.java b/src/main/java/com/ss/mqtt/broker/config/MqttConnectionConfig.java index a355035e..4a449066 100644 --- a/src/main/java/com/ss/mqtt/broker/config/MqttConnectionConfig.java +++ b/src/main/java/com/ss/mqtt/broker/config/MqttConnectionConfig.java @@ -1,7 +1,9 @@ package com.ss.mqtt.broker.config; import com.ss.mqtt.broker.model.QoS; +import lombok.Data; import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.RequiredArgsConstructor; import org.jetbrains.annotations.NotNull; diff --git a/src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt311PacketOutFactory.java b/src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt311PacketOutFactory.java index 95c4718e..3f1bb032 100644 --- a/src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt311PacketOutFactory.java +++ b/src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt311PacketOutFactory.java @@ -30,7 +30,7 @@ public class Mqtt311PacketOutFactory extends MqttPacketOutFactory { } @Override - public @NotNull MqttWritablePacket newPublish( + public @NotNull PublishOutPacket newPublish( @NotNull MqttClient client, int packetId, @NotNull QoS qos, diff --git a/src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt5PacketOutFactory.java b/src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt5PacketOutFactory.java index 2706bc2c..f6bda5f3 100644 --- a/src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt5PacketOutFactory.java +++ b/src/main/java/com/ss/mqtt/broker/factory/packet/out/Mqtt5PacketOutFactory.java @@ -44,7 +44,7 @@ public class Mqtt5PacketOutFactory extends Mqtt311PacketOutFactory { } @Override - public @NotNull MqttWritablePacket newPublish( + public @NotNull PublishOutPacket newPublish( @NotNull MqttClient client, int packetId, @NotNull QoS qos, diff --git a/src/main/java/com/ss/mqtt/broker/factory/packet/out/MqttPacketOutFactory.java b/src/main/java/com/ss/mqtt/broker/factory/packet/out/MqttPacketOutFactory.java index 67c321ed..285a4cff 100644 --- a/src/main/java/com/ss/mqtt/broker/factory/packet/out/MqttPacketOutFactory.java +++ b/src/main/java/com/ss/mqtt/broker/factory/packet/out/MqttPacketOutFactory.java @@ -5,6 +5,7 @@ import com.ss.mqtt.broker.model.reason.code.*; import com.ss.mqtt.broker.network.client.MqttClient; import com.ss.mqtt.broker.network.packet.out.MqttWritablePacket; +import com.ss.mqtt.broker.network.packet.out.PublishOutPacket; import com.ss.rlib.common.util.ArrayUtils; import com.ss.rlib.common.util.StringUtils; import com.ss.rlib.common.util.array.Array; @@ -76,7 +77,7 @@ public abstract class MqttPacketOutFactory { } - public @NotNull MqttWritablePacket newPublish( + public @NotNull PublishOutPacket newPublish( @NotNull MqttClient client, int packetId, @NotNull QoS qos, @@ -101,7 +102,7 @@ public abstract class MqttPacketOutFactory { ); } - public abstract @NotNull MqttWritablePacket newPublish( + public abstract @NotNull PublishOutPacket newPublish( @NotNull MqttClient client, int packetId, @NotNull QoS qos, diff --git a/src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java b/src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java index 1ea5b3db..eeecefa7 100644 --- a/src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java +++ b/src/main/java/com/ss/mqtt/broker/handler/client/AbstractMqttClientReleaseHandler.java @@ -4,6 +4,7 @@ import com.ss.mqtt.broker.network.client.AbstractMqttClient; import com.ss.mqtt.broker.service.ClientIdRegistry; import com.ss.mqtt.broker.service.MqttSessionService; +import com.ss.mqtt.broker.service.PublishRetryService; import com.ss.rlib.common.util.StringUtils; import lombok.RequiredArgsConstructor; import lombok.extern.log4j.Log4j2; @@ -17,6 +18,7 @@ public abstract class AbstractMqttClientReleaseHandler release(@NotNull UnsafeMqttClient client) { @@ -25,6 +27,7 @@ public abstract class AbstractMqttClientReleaseHandler releaseImpl(@NotNull T client) { + publishRetryService.unregister(client); var clientId = client.getClientId(); client.setClientId(StringUtils.EMPTY); diff --git a/src/main/java/com/ss/mqtt/broker/handler/client/DefaultMqttClientReleaseHandler.java b/src/main/java/com/ss/mqtt/broker/handler/client/DefaultMqttClientReleaseHandler.java new file mode 100644 index 00000000..236193d0 --- /dev/null +++ b/src/main/java/com/ss/mqtt/broker/handler/client/DefaultMqttClientReleaseHandler.java @@ -0,0 +1,18 @@ +package com.ss.mqtt.broker.handler.client; + +import com.ss.mqtt.broker.network.client.DeviceMqttClient; +import com.ss.mqtt.broker.service.ClientIdRegistry; +import com.ss.mqtt.broker.service.MqttSessionService; +import com.ss.mqtt.broker.service.PublishRetryService; +import org.jetbrains.annotations.NotNull; + +public class DefaultMqttClientReleaseHandler extends AbstractMqttClientReleaseHandler { + + public DefaultMqttClientReleaseHandler( + @NotNull ClientIdRegistry clientIdRegistry, + @NotNull MqttSessionService sessionService, + @NotNull PublishRetryService publishRetryService + ) { + super(clientIdRegistry, sessionService, publishRetryService); + } +} diff --git a/src/main/java/com/ss/mqtt/broker/handler/client/DeviceMqttClientReleaseHandler.java b/src/main/java/com/ss/mqtt/broker/handler/client/DeviceMqttClientReleaseHandler.java deleted file mode 100644 index a8232259..00000000 --- a/src/main/java/com/ss/mqtt/broker/handler/client/DeviceMqttClientReleaseHandler.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.ss.mqtt.broker.handler.client; - -import com.ss.mqtt.broker.network.client.DeviceMqttClient; -import com.ss.mqtt.broker.service.ClientIdRegistry; -import com.ss.mqtt.broker.service.MqttSessionService; -import org.jetbrains.annotations.NotNull; - -public class DeviceMqttClientReleaseHandler extends AbstractMqttClientReleaseHandler { - - public DeviceMqttClientReleaseHandler( - @NotNull ClientIdRegistry clientIdRegistry, - @NotNull MqttSessionService sessionService - ) { - super(clientIdRegistry, sessionService); - } -} diff --git a/src/main/java/com/ss/mqtt/broker/handler/packet/in/ConnectInPacketHandler.java b/src/main/java/com/ss/mqtt/broker/handler/packet/in/ConnectInPacketHandler.java index 0b5b22a9..0d21406b 100644 --- a/src/main/java/com/ss/mqtt/broker/handler/packet/in/ConnectInPacketHandler.java +++ b/src/main/java/com/ss/mqtt/broker/handler/packet/in/ConnectInPacketHandler.java @@ -13,6 +13,7 @@ import com.ss.mqtt.broker.service.AuthenticationService; import com.ss.mqtt.broker.service.ClientIdRegistry; import com.ss.mqtt.broker.service.MqttSessionService; +import com.ss.mqtt.broker.service.PublishRetryService; import com.ss.rlib.common.util.StringUtils; import lombok.RequiredArgsConstructor; import org.jetbrains.annotations.NotNull; @@ -21,9 +22,10 @@ @RequiredArgsConstructor public class ConnectInPacketHandler extends AbstractPacketHandler { - private final ClientIdRegistry clientIdRegistry; - private final AuthenticationService authenticationService; - private final MqttSessionService mqttSessionService; + private final @NotNull ClientIdRegistry clientIdRegistry; + private final @NotNull AuthenticationService authenticationService; + private final @NotNull MqttSessionService mqttSessionService; + private final @NotNull PublishRetryService publishRetryService; @Override protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull ConnectInPacket packet) { @@ -135,6 +137,8 @@ private Mono onConnected( packet.getReceiveMax() )); + publishRetryService.register(client); + return Mono.just(Boolean.TRUE); } diff --git a/src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishAckInPacketHandler.java b/src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishAckInPacketHandler.java new file mode 100644 index 00000000..e3313ddc --- /dev/null +++ b/src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishAckInPacketHandler.java @@ -0,0 +1,15 @@ +package com.ss.mqtt.broker.handler.packet.in; + +import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient; +import com.ss.mqtt.broker.network.packet.in.PublishAckInPacket; +import lombok.RequiredArgsConstructor; +import org.jetbrains.annotations.NotNull; + +@RequiredArgsConstructor +public class PublishAckInPacketHandler extends AbstractPacketHandler { + + @Override + protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull PublishAckInPacket packet) { + client.getSession().updatePendingPacket(client, packet); + } +} diff --git a/src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishInPacketHandler.java b/src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishInPacketHandler.java index ab1dd3ea..027074a3 100644 --- a/src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishInPacketHandler.java +++ b/src/main/java/com/ss/mqtt/broker/handler/packet/in/PublishInPacketHandler.java @@ -13,13 +13,6 @@ public class PublishInPacketHandler extends AbstractPacketHandler publishes) { + + var currentTime = System.currentTimeMillis(); + var array = publishes.array(); + + for (int i = 0, length = publishes.size(); i < length; i++) { + + var pendingPublish = array[i]; + + var publish = pendingPublish.publish; + var messageExpiryInterval = publish.getMessageExpiryInterval(); + + if (messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED || + messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_INFINITY) { + continue; + } + + var expiredTime = pendingPublish.registeredTime + (messageExpiryInterval * 1000); + + if (expiredTime < currentTime) { + log.debug("Remove pending publish {} by expiration reason", publish); + publishes.fastRemove(i); + i--; + length--; + } + } + } + private final @NotNull String clientId; + private final @NotNull ConcurrentArray pendingPublishes; + private final @NotNull AtomicInteger packetIdGenerator; private volatile @Getter @Setter long expirationTime = -1; + public DefaultMqttSession(@NotNull String clientId) { + this.clientId = clientId; + this.pendingPublishes = ConcurrentArray.ofType(PendingPublish.class); + this.packetIdGenerator = new AtomicInteger(0); + } + + @Override + public int nextPacketId() { + + var nextId = packetIdGenerator.incrementAndGet(); + + if (nextId >= MqttPropertyConstants.MAXIMUM_PACKET_ID) { + packetIdGenerator.compareAndSet(nextId, 0); + return nextPacketId(); + } + + return nextId; + } + @Override public @NotNull String getClientId() { return clientId; } + + @Override + public void registerPendingPublish( + @NotNull PublishInPacket publish, + @NotNull PendingPacketHandler handler, + int packetId + ) { + + var currentTime = System.currentTimeMillis(); + var pendingPublish = new PendingPublish(publish, handler, currentTime, packetId, currentTime); + + pendingPublishes.runInWriteLock(pendingPublish, Array::add); + } + + @Override + public boolean hasPendingPackets() { + return !pendingPublishes.isEmpty(); + } + + @Override + public void removeExpiredPackets() { + if (!pendingPublishes.isEmpty()) { + pendingPublishes.runInWriteLock(DefaultMqttSession::removeExpiredPackets); + } + } + + @Override + public void resendPendingPacketsAsync(@NotNull MqttClient client, int retryInterval) { + var currentTime = System.currentTimeMillis(); + var stamp = pendingPublishes.readLock(); + try { + + for (var pendingPublish : pendingPublishes) { + + if (currentTime - pendingPublish.lastAttemptTime <= retryInterval) { + continue; + } + + log.debug("Re-try to send publish {}", pendingPublish.publish); + + pendingPublish.lastAttemptTime = currentTime; + pendingPublish.handler.retryAsync(client, pendingPublish.publish, pendingPublish.packetId); + } + + } finally { + pendingPublishes.readUnlock(stamp); + } + } + + @Override + public void updatePendingPacket( + @NotNull MqttClient client, + @NotNull HasPacketId response + ) { + + var packetId = response.getPacketId(); + var pendingPublish = pendingPublishes.findAnyConvertedToIntInReadLock( + packetId, + pending -> pending.publish.getPacketId(), + (id, targetId) -> id == targetId + ); + + if (pendingPublish == null) { + log.warn("Not found pending publish for client {} by received packet {}", clientId, response); + return; + } + + var shouldBeRemoved = pendingPublish.handler.handleResponse(client, ClassUtils.unsafeNNCast(response)); + + if (shouldBeRemoved) { + pendingPublishes.runInWriteLock(pendingPublish, Array::fastRemove); + } + } + + @Override + public void clear() { + pendingPublishes.runInWriteLock(Collection::clear); + } } diff --git a/src/main/java/com/ss/mqtt/broker/network/client/AbstractMqttClient.java b/src/main/java/com/ss/mqtt/broker/network/client/AbstractMqttClient.java index 641fc637..d5d2aaf2 100644 --- a/src/main/java/com/ss/mqtt/broker/network/client/AbstractMqttClient.java +++ b/src/main/java/com/ss/mqtt/broker/network/client/AbstractMqttClient.java @@ -24,7 +24,6 @@ @Getter @Log4j2 @ToString(of = "clientId") -@EqualsAndHashCode(of = "clientId") public abstract class AbstractMqttClient implements UnsafeMqttClient { protected final @NotNull MqttConnection connection; diff --git a/src/main/java/com/ss/mqtt/broker/network/client/MqttClient.java b/src/main/java/com/ss/mqtt/broker/network/client/MqttClient.java index e7b051b9..6ef2f3c5 100644 --- a/src/main/java/com/ss/mqtt/broker/network/client/MqttClient.java +++ b/src/main/java/com/ss/mqtt/broker/network/client/MqttClient.java @@ -8,6 +8,7 @@ import com.ss.mqtt.broker.network.packet.in.MqttReadablePacket; import com.ss.mqtt.broker.network.packet.out.MqttWritablePacket; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import reactor.core.publisher.Mono; public interface MqttClient { @@ -30,7 +31,7 @@ void configure( void setClientId(@NotNull String clientId); - void setSession(@NotNull MqttSession session); + void setSession(@Nullable MqttSession session); void reject(@NotNull ConnectAckReasonCode reasonCode); @@ -41,7 +42,7 @@ void configure( @NotNull MqttConnectionConfig getConnectionConfig(); @NotNull String getClientId(); - @NotNull MqttSession getSession(); + @Nullable MqttSession getSession(); int getKeepAlive(); int getMaximumPacketSize(); diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/HasPacketId.java b/src/main/java/com/ss/mqtt/broker/network/packet/HasPacketId.java new file mode 100644 index 00000000..e39490c9 --- /dev/null +++ b/src/main/java/com/ss/mqtt/broker/network/packet/HasPacketId.java @@ -0,0 +1,8 @@ +package com.ss.mqtt.broker.network.packet; + +import com.ss.rlib.network.packet.Packet; + +public interface HasPacketId extends Packet { + + int getPacketId(); +} diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishAckInPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishAckInPacket.java index 72ffbd79..6fabaf35 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishAckInPacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishAckInPacket.java @@ -4,6 +4,7 @@ import com.ss.mqtt.broker.model.PacketProperty; import com.ss.mqtt.broker.model.reason.code.PublishAckReasonCode; import com.ss.mqtt.broker.network.MqttConnection; +import com.ss.mqtt.broker.network.packet.HasPacketId; import com.ss.mqtt.broker.network.packet.PacketType; import com.ss.rlib.common.util.StringUtils; import lombok.Getter; @@ -17,7 +18,7 @@ * Publish acknowledgment (QoS 1). */ @Getter -public class PublishAckInPacket extends MqttReadablePacket { +public class PublishAckInPacket extends MqttReadablePacket implements HasPacketId { private static final int PACKET_TYPE = PacketType.PUBLISH_ACK.ordinal(); @@ -67,11 +68,16 @@ protected void readVariableHeader(@NotNull MqttConnection connection, @NotNull B packetId = readUnsignedShort(buffer); // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901123 - if (connection.isSupported(MqttVersion.MQTT_5)) { + if (connection.isSupported(MqttVersion.MQTT_5) && buffer.hasRemaining()) { reasonCode = PublishAckReasonCode.of(readUnsignedByte(buffer)); } } + @Override + protected boolean isPropertiesSupported(@NotNull MqttConnection connection, @NotNull ByteBuffer buffer) { + return connection.isSupported(MqttVersion.MQTT_5) && buffer.hasRemaining(); + } + @Override protected @NotNull Set getAvailableProperties() { return AVAILABLE_PROPERTIES; diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishCompleteInPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishCompleteInPacket.java index 1f0c7cfc..37aa05ea 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishCompleteInPacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishCompleteInPacket.java @@ -4,6 +4,7 @@ import com.ss.mqtt.broker.model.PacketProperty; import com.ss.mqtt.broker.model.reason.code.PublishCompletedReasonCode; import com.ss.mqtt.broker.network.MqttConnection; +import com.ss.mqtt.broker.network.packet.HasPacketId; import com.ss.mqtt.broker.network.packet.PacketType; import com.ss.rlib.common.util.StringUtils; import lombok.Getter; @@ -17,7 +18,7 @@ * Publish complete (QoS 2 delivery part 3). */ @Getter -public class PublishCompleteInPacket extends MqttReadablePacket { +public class PublishCompleteInPacket extends MqttReadablePacket implements HasPacketId { private static final byte PACKET_TYPE = (byte) PacketType.PUBLISH_COMPLETED.ordinal(); diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishInPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishInPacket.java index f1a0dc9b..6cb99c9e 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishInPacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishInPacket.java @@ -265,7 +265,7 @@ public class PublishInPacket extends MqttReadablePacket { private @NotNull byte[] correlationData; - private long messageExpiryInterval = MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_DEFAULT; + private long messageExpiryInterval = MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED; private int topicAlias = MqttPropertyConstants.TOPIC_ALIAS_DEFAULT; private boolean payloadFormatIndicator = MqttPropertyConstants.PAYLOAD_FORMAT_INDICATOR_DEFAULT; @@ -315,8 +315,8 @@ protected void applyProperty(@NotNull PacketProperty property, long value) { topicAlias = NumberUtils.validate( (int) value, MqttPropertyConstants.TOPIC_ALIAS_MIN, - MqttPropertyConstants.TOPIC_ALIAS_MAX) - ; + MqttPropertyConstants.TOPIC_ALIAS_MAX + ); break; case MESSAGE_EXPIRY_INTERVAL: messageExpiryInterval = value; diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishReceivedInPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishReceivedInPacket.java index b09aea31..94e45333 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishReceivedInPacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishReceivedInPacket.java @@ -4,6 +4,7 @@ import com.ss.mqtt.broker.model.PacketProperty; import com.ss.mqtt.broker.model.reason.code.PublishReceivedReasonCode; import com.ss.mqtt.broker.network.MqttConnection; +import com.ss.mqtt.broker.network.packet.HasPacketId; import com.ss.mqtt.broker.network.packet.PacketType; import com.ss.rlib.common.util.StringUtils; import lombok.Getter; @@ -17,7 +18,7 @@ * Publish received (QoS 2 delivery part 1). */ @Getter -public class PublishReceivedInPacket extends MqttReadablePacket { +public class PublishReceivedInPacket extends MqttReadablePacket implements HasPacketId { private static final byte PACKET_TYPE = (byte) PacketType.PUBLISH_RECEIVED.ordinal(); diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishReleaseInPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishReleaseInPacket.java index b7687af1..006afabc 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishReleaseInPacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/in/PublishReleaseInPacket.java @@ -4,6 +4,7 @@ import com.ss.mqtt.broker.model.PacketProperty; import com.ss.mqtt.broker.model.reason.code.PublishReleaseReasonCode; import com.ss.mqtt.broker.network.MqttConnection; +import com.ss.mqtt.broker.network.packet.HasPacketId; import com.ss.mqtt.broker.network.packet.PacketType; import com.ss.rlib.common.util.StringUtils; import lombok.Getter; @@ -17,7 +18,7 @@ * Publish release (QoS 2 delivery part 2). */ @Getter -public class PublishReleaseInPacket extends MqttReadablePacket { +public class PublishReleaseInPacket extends MqttReadablePacket implements HasPacketId { private static final byte PACKET_TYPE = (byte) PacketType.PUBLISH_RELEASED.ordinal(); diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/out/Publish311OutPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/out/Publish311OutPacket.java index a247d253..2934a40f 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/out/Publish311OutPacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/out/Publish311OutPacket.java @@ -2,16 +2,12 @@ import com.ss.mqtt.broker.model.QoS; import com.ss.mqtt.broker.network.client.MqttClient; -import com.ss.mqtt.broker.network.packet.PacketType; import org.jetbrains.annotations.NotNull; import java.nio.ByteBuffer; -public class Publish311OutPacket extends MqttWritablePacket { +public class Publish311OutPacket extends PublishOutPacket { - private static final byte PACKET_TYPE = (byte) PacketType.PUBLISH.ordinal(); - - private final int packetId; private final boolean retained; private final boolean duplicate; private final @NotNull QoS qos; @@ -27,11 +23,10 @@ public Publish311OutPacket( @NotNull String topicName, @NotNull byte[] payload ) { - super(client); + super(client, packetId); this.qos = qos; this.retained = retained; this.duplicate = duplicate; - this.packetId = packetId; this.payload = payload; this.topicName = topicName; } @@ -57,11 +52,6 @@ protected byte getPacketFlags() { return info; } - @Override - protected byte getPacketType() { - return PACKET_TYPE; - } - @Override protected void writeVariableHeader(@NotNull ByteBuffer buffer) { // http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc384800412 diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/out/Publish5OutPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/out/Publish5OutPacket.java index 6e81bfb8..1c97de8a 100644 --- a/src/main/java/com/ss/mqtt/broker/network/packet/out/Publish5OutPacket.java +++ b/src/main/java/com/ss/mqtt/broker/network/packet/out/Publish5OutPacket.java @@ -135,7 +135,7 @@ public class Publish5OutPacket extends Publish311OutPacket { ); - private final int topciAlias; + private final int topicAlias; private final boolean stringPayload; private final @NotNull String responseTopic; private final @NotNull byte[] correlationData; @@ -148,7 +148,7 @@ public Publish5OutPacket( boolean retained, boolean duplicate, @NotNull String topicName, - int topciAlias, + int topicAlias, @NotNull byte[] payload, boolean stringPayload, @NotNull String responseTopic, @@ -156,7 +156,7 @@ public Publish5OutPacket( @NotNull Array userProperties ) { super(client, packetId, qos, retained, duplicate, topicName, payload); - this.topciAlias = topciAlias; + this.topicAlias = topicAlias; this.stringPayload = stringPayload; this.responseTopic = responseTopic; this.correlationData = correlationData; @@ -176,9 +176,9 @@ protected void writeProperties(@NotNull ByteBuffer buffer) { writeProperty(buffer, PacketProperty.MESSAGE_EXPIRY_INTERVAL, 0, - MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_DEFAULT + MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED ); - writeProperty(buffer, PacketProperty.TOPIC_ALIAS, topciAlias, MqttPropertyConstants.TOPIC_ALIAS_DEFAULT); + writeProperty(buffer, PacketProperty.TOPIC_ALIAS, topicAlias, MqttPropertyConstants.TOPIC_ALIAS_DEFAULT); writeNotEmptyProperty(buffer, PacketProperty.RESPONSE_TOPIC, responseTopic); writeNotEmptyProperty(buffer, PacketProperty.CORRELATION_DATA, correlationData); writeStringPairProperties(buffer, PacketProperty.USER_PROPERTY, userProperties); diff --git a/src/main/java/com/ss/mqtt/broker/network/packet/out/PublishOutPacket.java b/src/main/java/com/ss/mqtt/broker/network/packet/out/PublishOutPacket.java new file mode 100644 index 00000000..0cf9d127 --- /dev/null +++ b/src/main/java/com/ss/mqtt/broker/network/packet/out/PublishOutPacket.java @@ -0,0 +1,24 @@ +package com.ss.mqtt.broker.network.packet.out; + +import com.ss.mqtt.broker.network.client.MqttClient; +import com.ss.mqtt.broker.network.packet.HasPacketId; +import com.ss.mqtt.broker.network.packet.PacketType; +import lombok.Getter; +import org.jetbrains.annotations.NotNull; + +public abstract class PublishOutPacket extends MqttWritablePacket implements HasPacketId { + + private static final byte PACKET_TYPE = (byte) PacketType.PUBLISH.ordinal(); + + protected final @Getter int packetId; + + public PublishOutPacket(@NotNull MqttClient client, int packetId) { + super(client); + this.packetId = packetId; + } + + @Override + protected byte getPacketType() { + return PACKET_TYPE; + } +} diff --git a/src/main/java/com/ss/mqtt/broker/service/PublishRetryService.java b/src/main/java/com/ss/mqtt/broker/service/PublishRetryService.java new file mode 100644 index 00000000..6e423bf0 --- /dev/null +++ b/src/main/java/com/ss/mqtt/broker/service/PublishRetryService.java @@ -0,0 +1,13 @@ +package com.ss.mqtt.broker.service; + +import com.ss.mqtt.broker.network.client.MqttClient; +import org.jetbrains.annotations.NotNull; + +public interface PublishRetryService { + + void register(@NotNull MqttClient client); + + void unregister(@NotNull MqttClient client); + + boolean exist(@NotNull String clientId); +} diff --git a/src/main/java/com/ss/mqtt/broker/service/PublishingService.java b/src/main/java/com/ss/mqtt/broker/service/PublishingService.java index 2a1bca6a..28aa887b 100644 --- a/src/main/java/com/ss/mqtt/broker/service/PublishingService.java +++ b/src/main/java/com/ss/mqtt/broker/service/PublishingService.java @@ -1,20 +1,10 @@ package com.ss.mqtt.broker.service; -import com.ss.mqtt.broker.model.reason.code.PublishAckReasonCode; +import com.ss.mqtt.broker.network.client.MqttClient; import com.ss.mqtt.broker.network.packet.in.PublishInPacket; import org.jetbrains.annotations.NotNull; -/** - * Publishing service - */ public interface PublishingService { - /** - * Sends publish packet to all subscribers - * - * @param publish publish packet to send - * @return publish ack reason code - */ - @NotNull PublishAckReasonCode publish(@NotNull PublishInPacket publish); - + void publish(@NotNull MqttClient client, @NotNull PublishInPacket publish); } diff --git a/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPublishRetryService.java b/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPublishRetryService.java new file mode 100644 index 00000000..1dc966ff --- /dev/null +++ b/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPublishRetryService.java @@ -0,0 +1,93 @@ +package com.ss.mqtt.broker.service.impl; + +import com.ss.mqtt.broker.network.client.MqttClient; +import com.ss.mqtt.broker.service.PublishRetryService; +import com.ss.rlib.common.concurrent.util.ThreadUtils; +import com.ss.rlib.common.util.array.Array; +import com.ss.rlib.common.util.array.ArrayFactory; +import com.ss.rlib.common.util.array.ConcurrentArray; +import lombok.extern.log4j.Log4j2; +import org.jetbrains.annotations.NotNull; + +import java.io.Closeable; + +@Log4j2 +public class DefaultPublishRetryService implements PublishRetryService, Closeable { + + private final @NotNull ConcurrentArray registeredClients; + private final @NotNull Thread checkThread; + + private final int checkInterval; + private final int retryInterval; + + private volatile boolean closed; + + public DefaultPublishRetryService(int checkInterval, int retryInterval) { + this.checkInterval = checkInterval; + this.retryInterval = retryInterval; + this.registeredClients = ConcurrentArray.ofType(MqttClient.class); + this.checkThread = new Thread(this::checkPendingPackets, "DefaultPublishRetryService-Check"); + this.checkThread.setDaemon(true); + this.checkThread.start(); + } + + @Override + public void register(@NotNull MqttClient client) { + registeredClients.runInWriteLock(client, Array::add); + } + + @Override + public void unregister(@NotNull MqttClient client) { + registeredClients.runInWriteLock(client, Array::fastRemove); + } + + @Override + public boolean exist(@NotNull String clientId) { + return registeredClients.anyMatchInReadLock(clientId, (id, client) -> id.equals(client.getClientId())); + } + + private void checkPendingPackets() { + + var toCheck = ArrayFactory.newArray(MqttClient.class); + var toUnregister = ArrayFactory.newArray(MqttClient.class); + + while (!closed) { + + toCheck.clear(); + toUnregister.clear(); + + ThreadUtils.sleep(checkInterval); + + registeredClients.runInReadLock(toCheck, Array::copyTo); + + log.debug("Check pending packets in {} client(s)", toCheck.size()); + + for (var client : toCheck) { + + var session = client.getSession(); + + // if session is null it means that client was closed + if (session == null) { + toUnregister.add(client); + continue; + } + + session.removeExpiredPackets(); + session.resendPendingPacketsAsync(client, retryInterval); + } + + if (toUnregister.isEmpty()) { + continue; + } + + // unregister closed clients + registeredClients.runInWriteLock(toUnregister, Array::fastRemoveAll); + } + } + + @Override + public void close() { + closed = true; + checkThread.interrupt(); + } +} diff --git a/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPublishingService.java b/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPublishingService.java new file mode 100644 index 00000000..17ec06c5 --- /dev/null +++ b/src/main/java/com/ss/mqtt/broker/service/impl/DefaultPublishingService.java @@ -0,0 +1,21 @@ +package com.ss.mqtt.broker.service.impl; + +import com.ss.mqtt.broker.handler.publish.in.PublishInHandler; +import com.ss.mqtt.broker.model.Subscriber; +import com.ss.mqtt.broker.model.reason.code.PublishAckReasonCode; +import com.ss.mqtt.broker.network.client.MqttClient; +import com.ss.mqtt.broker.network.packet.in.PublishInPacket; +import com.ss.mqtt.broker.service.PublishingService; +import lombok.RequiredArgsConstructor; +import org.jetbrains.annotations.NotNull; + +@RequiredArgsConstructor +public class DefaultPublishingService implements PublishingService { + + private final PublishInHandler @NotNull [] publishInHandlers; + + @Override + public void publish(@NotNull MqttClient client, @NotNull PublishInPacket publish) { + publishInHandlers[publish.getQos().ordinal()].handle(client, publish); + } +} diff --git a/src/main/java/com/ss/mqtt/broker/service/impl/InMemoryMqttSessionService.java b/src/main/java/com/ss/mqtt/broker/service/impl/InMemoryMqttSessionService.java index e7fe6002..699c586c 100644 --- a/src/main/java/com/ss/mqtt/broker/service/impl/InMemoryMqttSessionService.java +++ b/src/main/java/com/ss/mqtt/broker/service/impl/InMemoryMqttSessionService.java @@ -1,6 +1,7 @@ package com.ss.mqtt.broker.service.impl; import com.ss.mqtt.broker.model.MqttSession; +import com.ss.mqtt.broker.model.MqttSession.UnsafeMqttSession; import com.ss.mqtt.broker.model.impl.DefaultMqttSession; import com.ss.mqtt.broker.service.MqttSessionService; import com.ss.rlib.common.concurrent.util.ThreadUtils; @@ -18,7 +19,7 @@ @Log4j2 public class InMemoryMqttSessionService implements MqttSessionService, Closeable { - private final @NotNull ConcurrentObjectDictionary storedSession; + private final @NotNull ConcurrentObjectDictionary storedSession; private final @NotNull Thread cleanThread; private final int cleanInterval; @@ -63,12 +64,16 @@ public InMemoryMqttSessionService(int cleanInterval) { } @Override - public @NotNull Mono store(@NotNull String clientId, @NotNull MqttSession session, long expiryInterval) { + public @NotNull Mono store( + @NotNull String clientId, + @NotNull MqttSession session, + long expiryInterval + ) { - var unsafe = (MqttSession.UnsafeMqttSession) session; + var unsafe = (UnsafeMqttSession) session; unsafe.setExpirationTime(System.currentTimeMillis() + (expiryInterval * 1000)); - storedSession.runInWriteLock(clientId, session, ObjectDictionary::put); + storedSession.runInWriteLock(clientId, unsafe, ObjectDictionary::put); log.debug("Stored session for client {}", clientId); @@ -77,8 +82,8 @@ public InMemoryMqttSessionService(int cleanInterval) { private void cleanup() { - var toCheck = ArrayFactory.newArray(MqttSession.class); - var toRemove = ArrayFactory.newArray(MqttSession.class); + var toCheck = ArrayFactory.newArray(UnsafeMqttSession.class); + var toRemove = ArrayFactory.newArray(UnsafeMqttSession.class); while (!closed) { @@ -110,13 +115,18 @@ private void cleanup() { // if we already have new session under the same client id if (removed != null && removed != session) { dictionary.put(session.getClientId(), removed); + } else if (removed != null) { + removed.clear(); } } }); } } - private boolean findToRemove(@NotNull Array toCheck, @NotNull Array toRemove) { + private boolean findToRemove( + @NotNull Array toCheck, + @NotNull Array toRemove + ) { var currentTime = System.currentTimeMillis(); diff --git a/src/main/java/com/ss/mqtt/broker/service/impl/SimplePublishingService.java b/src/main/java/com/ss/mqtt/broker/service/impl/SimplePublishingService.java deleted file mode 100644 index 3f764473..00000000 --- a/src/main/java/com/ss/mqtt/broker/service/impl/SimplePublishingService.java +++ /dev/null @@ -1,59 +0,0 @@ -package com.ss.mqtt.broker.service.impl; - -import com.ss.mqtt.broker.model.reason.code.PublishAckReasonCode; -import com.ss.mqtt.broker.model.Subscriber; -import com.ss.mqtt.broker.network.packet.in.PublishInPacket; -import com.ss.mqtt.broker.service.PublishingService; -import com.ss.mqtt.broker.service.SubscriptionService; -import lombok.RequiredArgsConstructor; -import org.jetbrains.annotations.NotNull; - -/** - * Simple publishing service - */ -@RequiredArgsConstructor -public class SimplePublishingService implements PublishingService { - - private final @NotNull SubscriptionService subscriptionService; - - private static @NotNull PublishAckReasonCode send( - @NotNull Subscriber subscriber, - @NotNull PublishInPacket publish - ) { - - var mqttClient = subscriber.getMqttClient(); - mqttClient.send(mqttClient.getPacketOutFactory().newPublish( - mqttClient, - publish.getPacketId(), - publish.getQos(), - publish.isRetained(), - publish.isDuplicate(), - publish.getTopicName(), - publish.getTopicAlias(), - publish.getPayload(), - publish.isPayloadFormatIndicator(), - publish.getResponseTopic(), - publish.getCorrelationData(), - publish.getUserProperties() - )); - - // TODO this reason code only for QoS 0 - return PublishAckReasonCode.SUCCESS; - } - - @Override - public @NotNull PublishAckReasonCode publish(@NotNull PublishInPacket publish) { - - var subscribers = subscriptionService.getSubscribers(publish.getTopicName()); - - if (subscribers.isEmpty()) { - return PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS; - } - - var success = subscribers.stream() - .map(subscriber -> send(subscriber, publish)) - .allMatch(ackReasonCode -> ackReasonCode.equals(PublishAckReasonCode.SUCCESS)); - - return success ? PublishAckReasonCode.SUCCESS : PublishAckReasonCode.UNSPECIFIED_ERROR; - } -} diff --git a/src/test/groovy/com/ss/mqtt/broker/test/UnitSpecification.groovy b/src/test/groovy/com/ss/mqtt/broker/test/UnitSpecification.groovy new file mode 100644 index 00000000..1faebe21 --- /dev/null +++ b/src/test/groovy/com/ss/mqtt/broker/test/UnitSpecification.groovy @@ -0,0 +1,6 @@ +package com.ss.mqtt.broker.test + +import spock.lang.Specification + +class UnitSpecification extends Specification { +} diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy index e5b26aaf..5bb5d41b 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectSubscribePublishTest.groovy @@ -6,11 +6,13 @@ import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PayloadFormatIndicator import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAckReasonCode -class ConnectSubscribePublishTest extends MqttBrokerTest { +import java.util.concurrent.atomic.AtomicReference + +class ConnectSubscribePublishTest extends IntegrationSpecification { - def "publisher should publish message to broker"() { + def "publisher should publish message QoS 0"() { given: - Mqtt5Publish receivedMessage = null + def received = new AtomicReference() def subscriber = buildClient() def publisher = buildClient() when: @@ -20,7 +22,7 @@ class ConnectSubscribePublishTest extends MqttBrokerTest { def subscribeResult = subscriber.subscribeWith() .topicFilter(topicFilter) .qos(MqttQos.AT_MOST_ONCE) - .callback({ publish -> receivedMessage = publish }) + .callback({ publish -> received.set(publish) }) .send() .join() @@ -45,10 +47,57 @@ class ConnectSubscribePublishTest extends MqttBrokerTest { publishResult.publish.type == Mqtt5MessageType.PUBLISH publishResult.publish.topic.levels.join("/") == topicFilter - receivedMessage != null - receivedMessage.qos == MqttQos.AT_MOST_ONCE - receivedMessage.type == Mqtt5MessageType.PUBLISH - receivedMessage.topic.levels.join("/") == topicFilter + received.get() != null + received.get().qos == MqttQos.AT_MOST_ONCE + received.get().type == Mqtt5MessageType.PUBLISH + received.get().topic.levels.join("/") == topicFilter + cleanup: + subscriber.disconnect() + publisher.disconnect() + } + + def "publisher should publish message QoS 1"() { + given: + def received = new AtomicReference() + def subscriber = buildClient() + def publisher = buildClient() + when: + + subscriber.connect().join() + publisher.connect().join() + + def subscribeResult = subscriber.subscribeWith() + .topicFilter(topicFilter) + .qos(MqttQos.AT_LEAST_ONCE) + .callback({ publish -> received.set(publish) }) + .send() + .join() + + def publishResult = publisher.publishWith() + .topic(topicFilter) + .qos(MqttQos.AT_LEAST_ONCE) + .payload(publishPayload) + .payloadFormatIndicator(Mqtt5PayloadFormatIndicator.UTF_8) + .send() + .join() + + Thread.sleep(500) + then: + noExceptionThrown() + + subscribeResult != null + subscribeResult.reasonCodes.contains(Mqtt5SubAckReasonCode.GRANTED_QOS_1) + subscribeResult.type == Mqtt5MessageType.SUBACK + + publishResult != null + publishResult.publish.qos == MqttQos.AT_LEAST_ONCE + publishResult.publish.type == Mqtt5MessageType.PUBLISH + publishResult.publish.topic.levels.join("/") == topicFilter + + received.get() != null + received.get().qos == MqttQos.AT_LEAST_ONCE + received.get().type == Mqtt5MessageType.PUBLISH + received.get().topic.levels.join("/") == topicFilter cleanup: subscriber.disconnect() publisher.disconnect() diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectionTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectionTest.groovy index 628a8dd1..5fac53f2 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectionTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/ConnectionTest.groovy @@ -8,7 +8,7 @@ import org.springframework.beans.factory.annotation.Autowired import java.util.concurrent.CompletionException -class ConnectionTest extends MqttBrokerTest { +class ConnectionTest extends IntegrationSpecification { @Autowired InetSocketAddress deviceNetworkAddress diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/MqttBrokerTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/IntegrationSpecification.groovy similarity index 96% rename from src/test/groovy/com/ss/mqtt/broker/test/integration/MqttBrokerTest.groovy rename to src/test/groovy/com/ss/mqtt/broker/test/integration/IntegrationSpecification.groovy index 48c74557..d47e4b10 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/MqttBrokerTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/IntegrationSpecification.groovy @@ -10,7 +10,7 @@ import spock.lang.Specification import java.nio.charset.StandardCharsets @ContextConfiguration(classes = MqttBrokerTestConfig) -class MqttBrokerTest extends Specification { +class IntegrationSpecification extends Specification { @Autowired InetSocketAddress deviceNetworkAddress diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/ClientIdRegistryTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/ClientIdRegistryTest.groovy index 69557bb2..7a02232f 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/ClientIdRegistryTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/ClientIdRegistryTest.groovy @@ -2,11 +2,11 @@ package com.ss.mqtt.broker.test.integration.service import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCode import com.ss.mqtt.broker.service.ClientIdRegistry -import com.ss.mqtt.broker.test.integration.MqttBrokerTest +import com.ss.mqtt.broker.test.integration.IntegrationSpecification import com.ss.rlib.common.util.StringUtils import org.springframework.beans.factory.annotation.Autowired -class ClientIdRegistryTest extends MqttBrokerTest { +class ClientIdRegistryTest extends IntegrationSpecification { @Autowired ClientIdRegistry clientIdRegistry diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/MqttSessionServiceTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/MqttSessionServiceTest.groovy index e9a6f9b9..57c9bd1d 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/MqttSessionServiceTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/MqttSessionServiceTest.groovy @@ -4,10 +4,10 @@ import com.hivemq.client.mqtt.mqtt5.message.connect.connack.Mqtt5ConnAckReasonCo import com.ss.mqtt.broker.config.MqttConnectionConfig import com.ss.mqtt.broker.service.ClientIdRegistry import com.ss.mqtt.broker.service.MqttSessionService -import com.ss.mqtt.broker.test.integration.MqttBrokerTest +import com.ss.mqtt.broker.test.integration.IntegrationSpecification import org.springframework.beans.factory.annotation.Autowired -class MqttSessionServiceTest extends MqttBrokerTest { +class MqttSessionServiceTest extends IntegrationSpecification { @Autowired ClientIdRegistry clientIdRegistry diff --git a/src/test/groovy/com/ss/mqtt/broker/test/integration/service/PublishRetryServiceTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/PublishRetryServiceTest.groovy new file mode 100644 index 00000000..c44488f3 --- /dev/null +++ b/src/test/groovy/com/ss/mqtt/broker/test/integration/service/PublishRetryServiceTest.groovy @@ -0,0 +1,105 @@ +package com.ss.mqtt.broker.test.integration.service + +import com.ss.mqtt.broker.model.MqttSession +import com.ss.mqtt.broker.network.client.MqttClient +import com.ss.mqtt.broker.network.packet.in.PublishInPacket +import com.ss.mqtt.broker.service.MqttSessionService +import com.ss.mqtt.broker.service.PublishRetryService +import com.ss.mqtt.broker.test.integration.IntegrationSpecification +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Value + +import java.util.concurrent.atomic.AtomicInteger + +class PublishRetryServiceTest extends IntegrationSpecification { + + @Autowired + PublishRetryService publishRetryService + + @Autowired + MqttSessionService mqttSessionService + + @Value('${publish.pending.check.interval}') + int checkInterval + + @Value('${publish.retry.interval}') + int retryInterval + + def "client should be presented in re-try service after connecting"() { + given: + def client = buildClient() + def config = client.getConfig() + def identifier = config.getClientIdentifier() + when: + client.connect().join() + then: + noExceptionThrown() + publishRetryService.exist(identifier.get().toString()) + when: + client.disconnect().join() + Thread.sleep(500) + then: + noExceptionThrown() + !publishRetryService.exist(identifier.get().toString()) + } + + def "service should check expired and pending messages from time to time"() { + given: + + def session = Mock(MqttSession) + def client = Stub(MqttClient) { + getSession() >> session + } + + when: + publishRetryService.register(client) + Thread.sleep(checkInterval) + then: + 1 * session.removeExpiredPackets() + 1 * session.resendPendingPacketsAsync(client, retryInterval) + cleanup: + publishRetryService.unregister(client) + } + + def "service should remove expired and retry pending messages from time to time"() { + given: + + def session = mqttSessionService.create(UUID.randomUUID().toString()).block() + def retryAttempts = new AtomicInteger() + + def handler = Stub(MqttSession.PendingPacketHandler) { + retryAsync(_,_,_) >> { + retryAttempts.incrementAndGet() + } + } + def client = Stub(MqttClient) { + getSession() >> session + } + + publishRetryService.register(client) + when: + def publish = Stub(PublishInPacket) { + getMessageExpiryInterval() >> 1L + } + + session.registerPendingPublish(publish, handler, 1) + + Thread.sleep(1000 + checkInterval) + then: + retryAttempts.get() == 0 + !session.hasPendingPackets() + when: + publish = Stub(PublishInPacket) { + getMessageExpiryInterval() >> 1000L + } + + session.registerPendingPublish(publish, handler, 2) + + Thread.sleep(retryInterval + checkInterval) + then: + retryAttempts.get() == 1 + session.hasPendingPackets() + cleanup: + publishRetryService.unregister(client) + } +} diff --git a/src/test/groovy/com/ss/mqtt/broker/test/network/BasePacketTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/network/NetworkUnitSpecification.groovy similarity index 86% rename from src/test/groovy/com/ss/mqtt/broker/test/network/BasePacketTest.groovy rename to src/test/groovy/com/ss/mqtt/broker/test/network/NetworkUnitSpecification.groovy index bfec4cbe..c0462dd9 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/network/BasePacketTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/network/NetworkUnitSpecification.groovy @@ -9,6 +9,7 @@ import com.ss.mqtt.broker.model.reason.code.UnsubscribeAckReasonCode import com.ss.mqtt.broker.network.MqttConnection import com.ss.mqtt.broker.network.client.MqttClient +import com.ss.mqtt.broker.test.UnitSpecification import com.ss.rlib.common.util.array.Array import com.ss.rlib.common.util.array.ArrayFactory import com.ss.rlib.common.util.array.IntegerArray @@ -17,7 +18,7 @@ import spock.lang.Specification import java.nio.charset.StandardCharsets -class BasePacketTest extends Specification { +class NetworkUnitSpecification extends UnitSpecification { public static final keepAliveEnabled = true public static final sessionsEnabled = true @@ -96,12 +97,12 @@ class BasePacketTest extends Specification { getConfig() >> mqttConnectionConfig getClient() >> Stub(MqttClient.UnsafeMqttClient) { getConnectionConfig() >> mqttConnectionConfig - getSessionExpiryInterval() >> BasePacketTest.sessionExpiryInterval - getReceiveMax() >> BasePacketTest.receiveMaximum - getMaximumPacketSize() >> BasePacketTest.maximumPacketSize + getSessionExpiryInterval() >> NetworkUnitSpecification.sessionExpiryInterval + getReceiveMax() >> NetworkUnitSpecification.receiveMaximum + getMaximumPacketSize() >> NetworkUnitSpecification.maximumPacketSize getClientId() >> clientId getKeepAlive() >> serverKeepAlive - getTopicAliasMaximum() >> BasePacketTest.topicAliasMaximum + getTopicAliasMaximum() >> NetworkUnitSpecification.topicAliasMaximum } } @@ -112,12 +113,12 @@ class BasePacketTest extends Specification { getConfig() >> mqttConnectionConfig getClient() >> Stub(MqttClient.UnsafeMqttClient) { getConnectionConfig() >> mqttConnectionConfig - getSessionExpiryInterval() >> BasePacketTest.sessionExpiryInterval - getReceiveMax() >> BasePacketTest.receiveMaximum - getMaximumPacketSize() >> BasePacketTest.maximumPacketSize + getSessionExpiryInterval() >> NetworkUnitSpecification.sessionExpiryInterval + getReceiveMax() >> NetworkUnitSpecification.receiveMaximum + getMaximumPacketSize() >> NetworkUnitSpecification.maximumPacketSize getClientId() >> clientId getKeepAlive() >> serverKeepAlive - getTopicAliasMaximum() >> BasePacketTest.topicAliasMaximum + getTopicAliasMaximum() >> NetworkUnitSpecification.topicAliasMaximum } } } diff --git a/src/test/groovy/com/ss/mqtt/broker/test/network/in/BaseInPacketTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/network/in/BaseInPacketTest.groovy index 78e98c5f..ac35361f 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/network/in/BaseInPacketTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/network/in/BaseInPacketTest.groovy @@ -1,6 +1,6 @@ package com.ss.mqtt.broker.test.network.in -import com.ss.mqtt.broker.test.network.BasePacketTest +import com.ss.mqtt.broker.test.network.NetworkUnitSpecification -class BaseInPacketTest extends BasePacketTest { +class BaseInPacketTest extends NetworkUnitSpecification { } diff --git a/src/test/groovy/com/ss/mqtt/broker/test/network/in/PublishInPacketTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/network/in/PublishInPacketTest.groovy index 65e07d41..b40491ab 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/network/in/PublishInPacketTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/network/in/PublishInPacketTest.groovy @@ -36,7 +36,7 @@ class PublishInPacketTest extends BaseInPacketTest { packet.payload == publishPayload packet.packetId == packetId packet.userProperties == Array.empty() - packet.messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_DEFAULT + packet.messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED packet.topicAlias == MqttPropertyConstants.TOPIC_ALIAS_DEFAULT packet.payloadFormatIndicator == MqttPropertyConstants.PAYLOAD_FORMAT_INDICATOR_DEFAULT } @@ -105,7 +105,7 @@ class PublishInPacketTest extends BaseInPacketTest { packet.payload == publishPayload packet.packetId == packetId packet.userProperties == Array.empty() - packet.messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_DEFAULT + packet.messageExpiryInterval == MqttPropertyConstants.MESSAGE_EXPIRY_INTERVAL_UNDEFINED packet.topicAlias == MqttPropertyConstants.TOPIC_ALIAS_DEFAULT packet.payloadFormatIndicator == MqttPropertyConstants.PAYLOAD_FORMAT_INDICATOR_DEFAULT } diff --git a/src/test/groovy/com/ss/mqtt/broker/test/network/out/BaseOutPacketTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/network/out/BaseOutPacketTest.groovy index 7bd7fcc8..51b50748 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/network/out/BaseOutPacketTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/network/out/BaseOutPacketTest.groovy @@ -1,30 +1,30 @@ package com.ss.mqtt.broker.test.network.out import com.ss.mqtt.broker.network.client.MqttClient -import com.ss.mqtt.broker.test.network.BasePacketTest +import com.ss.mqtt.broker.test.network.NetworkUnitSpecification import spock.lang.Shared -class BaseOutPacketTest extends BasePacketTest { +class BaseOutPacketTest extends NetworkUnitSpecification { @Shared MqttClient mqtt5Client = Stub(MqttClient.UnsafeMqttClient) { getConnectionConfig() >> mqttConnectionConfig - getSessionExpiryInterval() >> BasePacketTest.sessionExpiryInterval - getReceiveMax() >> BasePacketTest.receiveMaximum - getMaximumPacketSize() >> BasePacketTest.maximumPacketSize + getSessionExpiryInterval() >> NetworkUnitSpecification.sessionExpiryInterval + getReceiveMax() >> NetworkUnitSpecification.receiveMaximum + getMaximumPacketSize() >> NetworkUnitSpecification.maximumPacketSize getClientId() >> clientId getKeepAlive() >> serverKeepAlive - getTopicAliasMaximum() >> BasePacketTest.topicAliasMaximum + getTopicAliasMaximum() >> NetworkUnitSpecification.topicAliasMaximum } @Shared MqttClient mqtt311Client = Stub(MqttClient.UnsafeMqttClient) { getConnectionConfig() >> mqttConnectionConfig - getSessionExpiryInterval() >> BasePacketTest.sessionExpiryInterval - getReceiveMax() >> BasePacketTest.receiveMaximum - getMaximumPacketSize() >> BasePacketTest.maximumPacketSize + getSessionExpiryInterval() >> NetworkUnitSpecification.sessionExpiryInterval + getReceiveMax() >> NetworkUnitSpecification.receiveMaximum + getMaximumPacketSize() >> NetworkUnitSpecification.maximumPacketSize getClientId() >> clientId getKeepAlive() >> serverKeepAlive - getTopicAliasMaximum() >> BasePacketTest.topicAliasMaximum + getTopicAliasMaximum() >> NetworkUnitSpecification.topicAliasMaximum } } diff --git a/src/test/groovy/com/ss/mqtt/broker/test/util/MqttDataUtilsTest.groovy b/src/test/groovy/com/ss/mqtt/broker/test/util/MqttDataUtilsTest.groovy index 21bff93f..a73b3736 100644 --- a/src/test/groovy/com/ss/mqtt/broker/test/util/MqttDataUtilsTest.groovy +++ b/src/test/groovy/com/ss/mqtt/broker/test/util/MqttDataUtilsTest.groovy @@ -1,11 +1,11 @@ package com.ss.mqtt.broker.test.util +import com.ss.mqtt.broker.test.UnitSpecification import com.ss.mqtt.broker.util.MqttDataUtils -import spock.lang.Specification import java.nio.ByteBuffer -class MqttDataUtilsTest extends Specification { +class MqttDataUtilsTest extends UnitSpecification { def "should write integer to MQTT multi byte integer successful"(int value, int expectedBytes) { given: diff --git a/src/test/resources/application-test.properties b/src/test/resources/application-test.properties index 8e5b4492..cd5527f3 100644 --- a/src/test/resources/application-test.properties +++ b/src/test/resources/application-test.properties @@ -1,2 +1,5 @@ authentication.allow.anonymous=true credentials.source.file.name=credentials-test + +publish.pending.check.interval=200 +publish.retry.interval=1500