diff --git a/application/src/test/groovy/javasabr/mqtt/broker/application/service/MqttSessionServiceTest.groovy b/application/src/test/groovy/javasabr/mqtt/broker/application/service/MqttNetworkSessionServiceTest.groovy similarity index 95% rename from application/src/test/groovy/javasabr/mqtt/broker/application/service/MqttSessionServiceTest.groovy rename to application/src/test/groovy/javasabr/mqtt/broker/application/service/MqttNetworkSessionServiceTest.groovy index cf362778..fe5f654f 100644 --- a/application/src/test/groovy/javasabr/mqtt/broker/application/service/MqttSessionServiceTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/broker/application/service/MqttNetworkSessionServiceTest.groovy @@ -6,7 +6,7 @@ import javasabr.mqtt.service.ClientIdRegistry import javasabr.mqtt.service.session.MqttSessionService import org.springframework.beans.factory.annotation.Autowired -class MqttSessionServiceTest extends IntegrationSpecification { +class MqttNetworkSessionServiceTest extends IntegrationSpecification { @Autowired ClientIdRegistry clientIdRegistry diff --git a/application/src/test/groovy/javasabr/mqtt/broker/application/service/SubscribtionServiceTest.groovy b/application/src/test/groovy/javasabr/mqtt/broker/application/service/SubscriptionServiceTest.groovy similarity index 94% rename from application/src/test/groovy/javasabr/mqtt/broker/application/service/SubscribtionServiceTest.groovy rename to application/src/test/groovy/javasabr/mqtt/broker/application/service/SubscriptionServiceTest.groovy index cd544d96..fe36ef53 100644 --- a/application/src/test/groovy/javasabr/mqtt/broker/application/service/SubscribtionServiceTest.groovy +++ b/application/src/test/groovy/javasabr/mqtt/broker/application/service/SubscriptionServiceTest.groovy @@ -12,7 +12,7 @@ import spock.lang.Unroll import java.util.concurrent.CompletionException -class SubscribtionServiceTest extends IntegrationSpecification { +class SubscriptionServiceTest extends IntegrationSpecification { @Autowired ClientIdRegistry clientIdRegistry @@ -38,11 +38,11 @@ class SubscribtionServiceTest extends IntegrationSpecification { .findSubscribers(topicName) then: "should find the subscriber" subscribers.size() == 1 - subscribers.get(0).owner() instanceof MqttClient + subscribers.get(0).user() instanceof MqttClient when: def matchedSubscriber = subscribers.get(0) def subscription = matchedSubscriber.subscription() - def owner = matchedSubscriber.owner() as MqttClient + def owner = matchedSubscriber.user() as MqttClient then: owner.clientId() == clientId subscription.topicFilter().rawTopic() == topicFilter @@ -61,11 +61,11 @@ class SubscribtionServiceTest extends IntegrationSpecification { .findSubscribers(topicName) then: "should find the reconnected subscriber" subscribers3.size() == 1 - subscribers3.get(0).owner() instanceof MqttClient + subscribers3.get(0).user() instanceof MqttClient when: matchedSubscriber = subscribers3.get(0) subscription = matchedSubscriber.subscription() - owner = matchedSubscriber.owner() as MqttClient + owner = matchedSubscriber.user() as MqttClient then: owner.clientId() == clientId subscription.topicFilter().rawTopic() == topicFilter @@ -147,8 +147,8 @@ class SubscribtionServiceTest extends IntegrationSpecification { def subscribers = subscriptionService.findSubscribers(TopicName.valueOf(topicName)) then: subscribers.size() == targetCount - (subscribers[0].owner() as MqttClient).clientId() == clientId1 - (subscribers[1].owner() as MqttClient).clientId() == clientId2 + (subscribers[0].user() as MqttClient).clientId() == clientId1 + (subscribers[1].user() as MqttClient).clientId() == clientId2 cleanup: subscriber1.disconnect().join() subscriber2.disconnect().join() diff --git a/core-service/src/main/java/javasabr/mqtt/service/SubscriptionService.java b/core-service/src/main/java/javasabr/mqtt/service/SubscriptionService.java index 30be4e03..3286984a 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/SubscriptionService.java +++ b/core-service/src/main/java/javasabr/mqtt/service/SubscriptionService.java @@ -8,7 +8,7 @@ import javasabr.mqtt.model.topic.TopicFilter; import javasabr.mqtt.model.topic.TopicName; import javasabr.mqtt.network.MqttClient; -import javasabr.mqtt.network.session.MqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; import javasabr.rlib.collections.array.Array; import javasabr.rlib.collections.array.MutableArray; @@ -32,7 +32,7 @@ default Array findSubscribers(TopicName topicName) { * @param subscriptions the list of request to subscribe topics * @return array of subscribe ack reason codes */ - Array subscribe(MqttClient client, MqttSession session, Array subscriptions); + Array subscribe(MqttClient client, MqttNetworkSession session, Array subscriptions); /** * Removes MQTT client from listening to the topics. @@ -41,9 +41,9 @@ default Array findSubscribers(TopicName topicName) { * @param topicFilters topic filters * @return array of unsubscribe ack reason codes */ - Array unsubscribe(MqttClient client, MqttSession session, Array topicFilters); + Array unsubscribe(MqttClient client, MqttNetworkSession session, Array topicFilters); - void cleanSubscriptions(MqttClient client, MqttSession session); + void cleanSubscriptions(MqttClient client, MqttNetworkSession session); - void restoreSubscriptions(MqttClient client, MqttSession session); + void restoreSubscriptions(MqttClient client, MqttNetworkSession session); } diff --git a/core-service/src/main/java/javasabr/mqtt/service/handler/client/AbstractMqttClientReleaseHandler.java b/core-service/src/main/java/javasabr/mqtt/service/handler/client/AbstractMqttClientReleaseHandler.java index 393d0f02..992e0e27 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/handler/client/AbstractMqttClientReleaseHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/handler/client/AbstractMqttClientReleaseHandler.java @@ -4,7 +4,7 @@ import javasabr.mqtt.network.MqttClient.UnsafeMqttClient; import javasabr.mqtt.network.handler.MqttClientReleaseHandler; import javasabr.mqtt.network.impl.AbstractMqttClient; -import javasabr.mqtt.network.session.MqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; import javasabr.mqtt.service.ClientIdRegistry; import javasabr.mqtt.service.SubscriptionService; import javasabr.mqtt.service.session.MqttSessionService; @@ -43,7 +43,7 @@ protected Mono releaseImpl(T client) { return Mono.empty(); } - MqttSession session = client.session(); + MqttNetworkSession session = client.session(); Mono asyncActions = null; if (session != null) { diff --git a/core-service/src/main/java/javasabr/mqtt/service/impl/DefaultConnectionService.java b/core-service/src/main/java/javasabr/mqtt/service/impl/DefaultConnectionService.java index e1928f56..7ad86fb6 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/impl/DefaultConnectionService.java +++ b/core-service/src/main/java/javasabr/mqtt/service/impl/DefaultConnectionService.java @@ -73,7 +73,7 @@ protected void processReceivedValidMessage( "[%s] Received from client valid message:[%s] %s"::formatted); try { - MqttInMessageHandler messageHandler = inMessageHandlers[mqttInMessage.messageType()]; + MqttInMessageHandler messageHandler = inMessageHandlers[mqttInMessage.messageTypeId()]; //noinspection DataFlowIssue messageHandler.processValidMessage(connection, mqttInMessage); } catch (IndexOutOfBoundsException | NullPointerException ex) { @@ -97,7 +97,7 @@ protected void processReceivedInvalidMessage( "[%s] Received from client invalid message:[%s] %s"::formatted); try { - MqttInMessageHandler messageHandler = inMessageHandlers[mqttInMessage.messageType()]; + MqttInMessageHandler messageHandler = inMessageHandlers[mqttInMessage.messageTypeId()]; //noinspection DataFlowIssue messageHandler.processInvalidMessage(connection, mqttInMessage); } catch (IndexOutOfBoundsException | NullPointerException ex) { diff --git a/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java b/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java index 54591082..b5398fd6 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java +++ b/core-service/src/main/java/javasabr/mqtt/service/impl/InMemorySubscriptionService.java @@ -15,7 +15,7 @@ import javasabr.mqtt.model.topic.TopicFilter; import javasabr.mqtt.model.topic.TopicName; import javasabr.mqtt.network.MqttClient; -import javasabr.mqtt.network.session.MqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; import javasabr.mqtt.service.SubscriptionService; import javasabr.rlib.collections.array.Array; import javasabr.rlib.collections.array.ArrayFactory; @@ -40,7 +40,7 @@ public InMemorySubscriptionService() { @Override public MqttClient resolveClient(Subscriber subscriber) { if (subscriber instanceof SingleSubscriber single) { - return (MqttClient) single.owner(); + return (MqttClient) single.user(); } throw new IllegalArgumentException("Unexpected subscriber: " + subscriber); } @@ -55,7 +55,7 @@ public Array findSubscribersTo(MutableArray @Override public Array subscribe( MqttClient client, - MqttSession session, + MqttNetworkSession session, Array subscriptions) { MutableArray subscribeResults = ArrayFactory.mutableArray( @@ -69,7 +69,7 @@ public Array subscribe( return subscribeResults; } - private SubscribeAckReasonCode addSubscription(MqttClient client, MqttSession session, Subscription subscription) { + private SubscribeAckReasonCode addSubscription(MqttClient client, MqttNetworkSession session, Subscription subscription) { MqttClientConnectionConfig connectionConfig = client.connectionConfig(); TopicFilter topicFilter = subscription.topicFilter(); if (topicFilter.isInvalid()) { @@ -91,7 +91,7 @@ private SubscribeAckReasonCode addSubscription(MqttClient client, MqttSession se @Override public Array unsubscribe( MqttClient client, - MqttSession session, + MqttNetworkSession session, Array topicFilters) { MutableArray unsubscribeResults = ArrayFactory.mutableArray( @@ -105,7 +105,7 @@ public Array unsubscribe( return unsubscribeResults; } - private UnsubscribeAckReasonCode removeSubscription(MqttClient client, MqttSession session, TopicFilter topicFilter) { + private UnsubscribeAckReasonCode removeSubscription(MqttClient client, MqttNetworkSession session, TopicFilter topicFilter) { if (topicFilter.isInvalid()) { return UnsubscribeAckReasonCode.TOPIC_FILTER_INVALID; } else if (subscriberTree.unsubscribe(client, topicFilter)) { @@ -119,7 +119,7 @@ private UnsubscribeAckReasonCode removeSubscription(MqttClient client, MqttSessi } @Override - public void cleanSubscriptions(MqttClient client, MqttSession session) { + public void cleanSubscriptions(MqttClient client, MqttNetworkSession session) { Array subscriptions = session .activeSubscriptions() .subscriptions(); @@ -129,7 +129,7 @@ public void cleanSubscriptions(MqttClient client, MqttSession session) { } @Override - public void restoreSubscriptions(MqttClient client, MqttSession session) { + public void restoreSubscriptions(MqttClient client, MqttNetworkSession session) { Array subscriptions = session .activeSubscriptions() .subscriptions(); diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/AbstractMqttInMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/AbstractMqttInMessageHandler.java index bba9fbc0..7e33c717 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/AbstractMqttInMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/AbstractMqttInMessageHandler.java @@ -6,7 +6,7 @@ import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.message.in.MqttInMessage; import javasabr.mqtt.network.message.out.MqttOutMessage; -import javasabr.mqtt.network.session.MqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; import javasabr.mqtt.network.util.ExtraErrorReasons; import javasabr.mqtt.service.MessageOutFactoryService; import javasabr.mqtt.service.message.handler.MqttInMessageHandler; @@ -47,7 +47,7 @@ public void processValidMessage(MqttConnection connection, MqttInMessage mqttInM C castedClient = expectedClient.cast(client); M castedMessage = expectedNetworkPacket.cast(mqttInMessage); if (requireSession()) { - MqttSession session = client.session(); + MqttNetworkSession session = client.session(); if (session == null) { log.warning(client.clientId(), "[%s] Session is already closed"::formatted); handleSessionIsAlreadyClosed(client); @@ -72,7 +72,7 @@ public void processInvalidMessage(MqttConnection connection, MqttInMessage mqttI C castedClient = expectedClient.cast(client); M castedMessage = expectedNetworkPacket.cast(mqttInMessage); if (requireSession()) { - MqttSession session = client.session(); + MqttNetworkSession session = client.session(); if (session == null) { log.warning(client.clientId(), "[%s] Session is already closed"::formatted); handleSessionIsAlreadyClosed(client); @@ -86,7 +86,7 @@ public void processInvalidMessage(MqttConnection connection, MqttInMessage mqttI protected void processValidMessage(MqttConnection connection, C client, M message) {} - protected void processValidMessage(MqttConnection connection, C client, MqttSession session, M message) {} + protected void processValidMessage(MqttConnection connection, C client, MqttNetworkSession session, M message) {} protected boolean processInvalidMessage(MqttConnection connection, C client, M message) { Exception exception = message.exception(); @@ -97,7 +97,7 @@ protected boolean processInvalidMessage(MqttConnection connection, C client, M m return false; } - protected boolean processInvalidMessage(MqttConnection connection, C client, MqttSession session, M message) { + protected boolean processInvalidMessage(MqttConnection connection, C client, MqttNetworkSession session, M message) { Exception exception = message.exception(); if (exception instanceof MalformedProtocolMqttException) { malformedProtocolError(connection, client, exception); diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/ConnectInMqttInMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/ConnectInMqttInMessageHandler.java index 40d17c41..136946a2 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/ConnectInMqttInMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/ConnectInMqttInMessageHandler.java @@ -22,7 +22,7 @@ import javasabr.mqtt.network.impl.ExternalMqttClient; import javasabr.mqtt.network.message.in.ConnectMqttInMessage; import javasabr.mqtt.network.message.out.MqttOutMessage; -import javasabr.mqtt.network.session.MqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; import javasabr.mqtt.service.AuthenticationService; import javasabr.mqtt.service.ClientIdRegistry; import javasabr.mqtt.service.MessageOutFactoryService; @@ -178,7 +178,7 @@ private void resolveClientConnectionConfig(MqttClient.UnsafeMqttClient client, C private Mono onConnected( MqttClient.UnsafeMqttClient client, ConnectMqttInMessage packet, - MqttSession session, + MqttNetworkSession session, boolean sessionRestored) { MqttConnection connection = client.connection(); @@ -211,7 +211,7 @@ private Mono onConnected( .thenApply(result -> onSentConnAck(client, session, result))); } - private boolean onSentConnAck(MqttClient.UnsafeMqttClient client, MqttSession session, boolean result) { + private boolean onSentConnAck(MqttClient.UnsafeMqttClient client, MqttNetworkSession session, boolean result) { if (!result) { log.warning(client.clientId(), "Was issue with sending conn ack packet to client:[%s]"::formatted); @@ -226,7 +226,7 @@ private boolean onSentConnAck(MqttClient.UnsafeMqttClient client, MqttSession se protected boolean processInvalidMessage( MqttConnection connection, ExternalMqttClient client, - MqttSession session, + MqttNetworkSession session, ConnectMqttInMessage message) { Exception exception = message.exception(); if (exception instanceof ConnectionRejectException cre) { diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/DisconnectMqttInMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/DisconnectMqttInMessageHandler.java index acbfbadf..32cb6b91 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/DisconnectMqttInMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/DisconnectMqttInMessageHandler.java @@ -5,7 +5,7 @@ import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.impl.ExternalMqttClient; import javasabr.mqtt.network.message.in.DisconnectMqttInMessage; -import javasabr.mqtt.network.session.MqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; import javasabr.mqtt.service.MessageOutFactoryService; import lombok.CustomLog; @@ -25,7 +25,7 @@ public MqttMessageType messageType() { protected void processValidMessage( MqttConnection connection, ExternalMqttClient client, - MqttSession session, + MqttNetworkSession session, DisconnectMqttInMessage message) { DisconnectReasonCode reasonCode = message.reasonCode(); if (reasonCode == DisconnectReasonCode.NORMAL_DISCONNECTION) { diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PendingOutResponseMqttInMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PendingOutResponseMqttInMessageHandler.java index 43cc8101..0a632138 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PendingOutResponseMqttInMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PendingOutResponseMqttInMessageHandler.java @@ -1,13 +1,13 @@ package javasabr.mqtt.service.message.handler.impl; -import javasabr.mqtt.model.TrackableMessage; +import javasabr.mqtt.model.message.TrackableMqttMessage; import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.impl.ExternalMqttClient; import javasabr.mqtt.network.message.in.MqttInMessage; -import javasabr.mqtt.network.session.MqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; import javasabr.mqtt.service.MessageOutFactoryService; -public abstract class PendingOutResponseMqttInMessageHandler

+public abstract class PendingOutResponseMqttInMessageHandler

extends AbstractMqttInMessageHandler { protected PendingOutResponseMqttInMessageHandler( @@ -20,7 +20,7 @@ protected PendingOutResponseMqttInMessageHandler( protected void processValidMessage( MqttConnection connection, ExternalMqttClient client, - MqttSession session, + MqttNetworkSession session, P message) { session.updateOutPendingPacket(client, message); } diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandler.java index 2c510be5..0fed4ff6 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishMqttInMessageHandler.java @@ -15,7 +15,7 @@ import javasabr.mqtt.network.impl.ExternalMqttClient; import javasabr.mqtt.network.message.in.PublishMqttInMessage; import javasabr.mqtt.network.message.out.MqttOutMessage; -import javasabr.mqtt.network.session.MqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; import javasabr.mqtt.service.MessageOutFactoryService; import javasabr.mqtt.service.PublishReceivingService; import javasabr.mqtt.service.TopicService; @@ -50,7 +50,7 @@ public MqttMessageType messageType() { protected void processValidMessage( MqttConnection connection, ExternalMqttClient client, - MqttSession session, + MqttNetworkSession session, PublishMqttInMessage publishMessage) { if (!validateBaseFields(connection, client, publishMessage)) { @@ -224,7 +224,7 @@ private void handleInvalidMessageExpiryInterval(ExternalMqttClient client) { private void handleInvalidTopicName( ExternalMqttClient client, - MqttSession session, + MqttNetworkSession session, PublishMqttInMessage publishMessage) { int messagedId = publishMessage.messageId(); MqttOutMessage response = messageOutFactoryService diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishReleaseMqttInMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishReleaseMqttInMessageHandler.java index 843e2efb..961779e6 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishReleaseMqttInMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/PublishReleaseMqttInMessageHandler.java @@ -8,7 +8,7 @@ import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.impl.ExternalMqttClient; import javasabr.mqtt.network.message.in.PublishReleaseMqttInMessage; -import javasabr.mqtt.network.session.MqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; import javasabr.mqtt.service.MessageOutFactoryService; import lombok.AccessLevel; import lombok.CustomLog; @@ -32,7 +32,7 @@ public MqttMessageType messageType() { protected void processValidMessage( MqttConnection connection, ExternalMqttClient client, - MqttSession session, + MqttNetworkSession session, PublishReleaseMqttInMessage releaseMessage) { int messageId = releaseMessage.messageId(); diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/SubscribeMqttInMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/SubscribeMqttInMessageHandler.java index 0a43ac66..2b6e4519 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/SubscribeMqttInMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/SubscribeMqttInMessageHandler.java @@ -18,7 +18,7 @@ import javasabr.mqtt.network.impl.ExternalMqttClient; import javasabr.mqtt.network.message.in.SubscribeMqttInMessage; import javasabr.mqtt.network.message.out.MqttOutMessage; -import javasabr.mqtt.network.session.MqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; import javasabr.mqtt.service.MessageOutFactoryService; import javasabr.mqtt.service.SubscriptionService; import javasabr.mqtt.service.TopicService; @@ -59,7 +59,7 @@ public MqttMessageType messageType() { protected void processValidMessage( MqttConnection connection, ExternalMqttClient client, - MqttSession session, + MqttNetworkSession session, SubscribeMqttInMessage subscribeMessage) { MqttClientConnectionConfig connectionConfig = client.connectionConfig(); @@ -145,7 +145,7 @@ private void handleMessageIdIsInUse( private void handleSubscriptionIdNotSupported( ExternalMqttClient client, - MqttSession session, + MqttNetworkSession session, SubscribeMqttInMessage subscribeMessage) { Array subscribeResults = Array.repeated( SubscribeAckReasonCode.SUBSCRIPTION_IDENTIFIERS_NOT_SUPPORTED, @@ -162,7 +162,7 @@ private void handleSubscriptionIdNotSupported( private void sendSubscribeResults( ExternalMqttClient client, - MqttSession session, + MqttNetworkSession session, SubscribeMqttInMessage subscribeMessage, Array subscribeResults) { int messageId = subscribeMessage.messageId(); diff --git a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/UnsubscribeMqttInMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/UnsubscribeMqttInMessageHandler.java index ece47b93..a8d873fb 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/UnsubscribeMqttInMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/message/handler/impl/UnsubscribeMqttInMessageHandler.java @@ -8,7 +8,7 @@ import javasabr.mqtt.network.impl.ExternalMqttClient; import javasabr.mqtt.network.message.in.UnsubscribeMqttInMessage; import javasabr.mqtt.network.message.out.MqttOutMessage; -import javasabr.mqtt.network.session.MqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; import javasabr.mqtt.service.MessageOutFactoryService; import javasabr.mqtt.service.SubscriptionService; import javasabr.mqtt.service.TopicService; @@ -44,7 +44,7 @@ public MqttMessageType messageType() { protected void processValidMessage( MqttConnection connection, ExternalMqttClient client, - MqttSession session, + MqttNetworkSession session, UnsubscribeMqttInMessage unsubscribeMessage) { int messageId = unsubscribeMessage.messageId(); diff --git a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/AbstractMqttPublishInMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/AbstractMqttPublishInMessageHandler.java index ef2ee5f6..44899c46 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/AbstractMqttPublishInMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/AbstractMqttPublishInMessageHandler.java @@ -6,7 +6,7 @@ import javasabr.mqtt.model.topic.TopicName; import javasabr.mqtt.network.MqttClient; import javasabr.mqtt.network.message.out.MqttOutMessage; -import javasabr.mqtt.network.session.MqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; import javasabr.mqtt.service.MessageOutFactoryService; import javasabr.mqtt.service.PublishDeliveringService; import javasabr.mqtt.service.SubscriptionService; @@ -37,7 +37,7 @@ public void handle(MqttClient client, Publish publish) { return; } C expectedClient = expectedClientType.cast(client); - MqttSession session = expectedClient.session(); + MqttNetworkSession session = expectedClient.session(); if (session == null) { log.warning(client.clientId(), "[%s] Session is already closed"::formatted); return; @@ -47,11 +47,11 @@ public void handle(MqttClient client, Publish publish) { } } - protected boolean validateImpl(C client, MqttSession session, Publish publish) { + protected boolean validateImpl(C client, MqttNetworkSession session, Publish publish) { return true; } - protected void handleImpl(C client, MqttSession session, Publish publish) { + protected void handleImpl(C client, MqttNetworkSession session, Publish publish) { TopicName topicName = publish.topicName(); Array subscribers = subscriptionService.findSubscribers(topicName); if (subscribers.isEmpty()) { @@ -82,17 +82,17 @@ protected void handleImpl(C client, MqttSession session, Publish publish) { } } - protected void handleNoMatchedSubscribers(C client, MqttSession session, Publish publish) {} + protected void handleNoMatchedSubscribers(C client, MqttNetworkSession session, Publish publish) {} protected void handleSuccess( C client, - MqttSession session, + MqttNetworkSession session, Publish publish, int matchedSubscribers) {} protected void handleError( C client, - MqttSession session, + MqttNetworkSession session, Publish publish, PublishHandlingResult handlingResult) {} @@ -105,7 +105,7 @@ protected PublishHandlingResult checkSubscriber( protected PublishHandlingResult startDelivering( C client, - MqttSession session, + MqttNetworkSession session, Publish publish, SingleSubscriber subscriber) { return publishDeliveringService.startDelivering(publish, subscriber); @@ -117,7 +117,7 @@ protected void sendFeedback(C client, MqttOutMessage response) { protected void sendFeedback( C client, - MqttSession session, + MqttNetworkSession session, MqttOutMessage response, int messageId) { MessageTacker messageTacker = session.inMessageTracker(); diff --git a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/PersistedMqttPublishOutMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/PersistedMqttPublishOutMessageHandler.java index 8d72062d..a06692fc 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/PersistedMqttPublishOutMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/PersistedMqttPublishOutMessageHandler.java @@ -1,12 +1,12 @@ package javasabr.mqtt.service.publish.handler.impl; import javasabr.mqtt.model.MqttProperties; -import javasabr.mqtt.model.TrackableMessage; +import javasabr.mqtt.model.message.TrackableMqttMessage; import javasabr.mqtt.model.publishing.Publish; import javasabr.mqtt.network.MqttClient; import javasabr.mqtt.network.impl.ExternalMqttClient; -import javasabr.mqtt.network.session.MqttSession; -import javasabr.mqtt.network.session.MqttSession.PendingMessageHandler; +import javasabr.mqtt.network.session.MqttNetworkSession; +import javasabr.mqtt.network.session.MqttNetworkSession.PendingMessageHandler; import javasabr.mqtt.service.MessageOutFactoryService; import javasabr.mqtt.service.SubscriptionService; import javasabr.mqtt.service.publish.handler.PublishHandlingResult; @@ -26,7 +26,7 @@ protected PersistedMqttPublishOutMessageHandler( super(ExternalMqttClient.class, subscriptionService, messageOutFactoryService); this.pendingMessageHandler = new PendingMessageHandler() { @Override - public boolean handleResponse(MqttClient client, TrackableMessage response) { + public boolean handleResponse(MqttClient client, TrackableMqttMessage response) { return handleReceivedResponse(client, response); } @Override @@ -39,13 +39,13 @@ public void resend(MqttClient client, Publish publish) { @Nullable @Override protected Publish reconstruct(MqttClient client, Publish original) { - MqttSession session = client.session(); + MqttNetworkSession session = client.session(); if (session == null) { return null; } return original.with( // generate new uniq packet id per client - session.nextMessageId(), + session.generateMessageId(), qos(), false, MqttProperties.TOPIC_ALIAS_NOT_SET); @@ -54,7 +54,7 @@ protected Publish reconstruct(MqttClient client, Publish original) { @Override protected PublishHandlingResult handleImpl(Publish publish, ExternalMqttClient client) { - MqttSession session = client.session(); + MqttNetworkSession session = client.session(); if (session == null) { return PublishHandlingResult.SKIPPED; } @@ -67,7 +67,7 @@ protected PublishHandlingResult handleImpl(Publish publish, ExternalMqttClient c return PublishHandlingResult.SUCCESS; } - protected boolean handleReceivedResponse(MqttClient client, TrackableMessage response) { + protected boolean handleReceivedResponse(MqttClient client, TrackableMqttMessage response) { return false; } diff --git a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos0MqttPublishInMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos0MqttPublishInMessageHandler.java index 89c2e908..c4e343dc 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos0MqttPublishInMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos0MqttPublishInMessageHandler.java @@ -7,7 +7,7 @@ import javasabr.mqtt.model.reason.code.DisconnectReasonCode; import javasabr.mqtt.network.impl.ExternalMqttClient; import javasabr.mqtt.network.message.out.MqttOutMessage; -import javasabr.mqtt.network.session.MqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; import javasabr.mqtt.service.MessageOutFactoryService; import javasabr.mqtt.service.PublishDeliveringService; import javasabr.mqtt.service.SubscriptionService; @@ -27,7 +27,7 @@ public QoS qos() { } @Override - protected boolean validateImpl(ExternalMqttClient client, MqttSession session, Publish publish) { + protected boolean validateImpl(ExternalMqttClient client, MqttNetworkSession session, Publish publish) { int messageId = publish.messageId(); if (messageId != MqttProperties.MESSAGE_ID_IS_NOT_SET) { handleNotExpectedMessageId(client); diff --git a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishInMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishInMessageHandler.java index 86e1a9d6..57248c72 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishInMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishInMessageHandler.java @@ -8,7 +8,7 @@ import javasabr.mqtt.model.session.TrackedMessageMeta; import javasabr.mqtt.network.impl.ExternalMqttClient; import javasabr.mqtt.network.message.out.MqttOutMessage; -import javasabr.mqtt.network.session.MqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; import javasabr.mqtt.service.MessageOutFactoryService; import javasabr.mqtt.service.PublishDeliveringService; import javasabr.mqtt.service.SubscriptionService; @@ -34,7 +34,7 @@ public QoS qos() { } @Override - protected boolean validateImpl(ExternalMqttClient client, MqttSession session, Publish publish) { + protected boolean validateImpl(ExternalMqttClient client, MqttNetworkSession session, Publish publish) { if (!super.validateImpl(client, session, publish)) { return false; } @@ -55,7 +55,7 @@ protected boolean validateImpl(ExternalMqttClient client, MqttSession session, P @Override protected void handleNoMatchedSubscribers( ExternalMqttClient client, - MqttSession session, + MqttNetworkSession session, Publish publish) { super.handleNoMatchedSubscribers(client, session, publish); int messageId = publish.messageId(); @@ -68,7 +68,7 @@ protected void handleNoMatchedSubscribers( @Override protected void handleSuccess( ExternalMqttClient client, - MqttSession session, + MqttNetworkSession session, Publish publish, int matchedSubscribers) { super.handleSuccess(client, session, publish, matchedSubscribers); @@ -82,7 +82,7 @@ protected void handleSuccess( @Override protected void handleError( ExternalMqttClient client, - MqttSession session, + MqttNetworkSession session, Publish publish, PublishHandlingResult handlingResult) { super.handleError(client, session, publish, handlingResult); diff --git a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishOutMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishOutMessageHandler.java index b0681508..428c36fa 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishOutMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos1MqttPublishOutMessageHandler.java @@ -1,7 +1,7 @@ package javasabr.mqtt.service.publish.handler.impl; import javasabr.mqtt.model.QoS; -import javasabr.mqtt.model.TrackableMessage; +import javasabr.mqtt.model.message.TrackableMqttMessage; import javasabr.mqtt.network.MqttClient; import javasabr.mqtt.network.message.in.PublishAckMqttInMessage; import javasabr.mqtt.service.MessageOutFactoryService; @@ -21,7 +21,7 @@ public QoS qos() { } @Override - protected boolean handleReceivedResponse(MqttClient client, TrackableMessage response) { + protected boolean handleReceivedResponse(MqttClient client, TrackableMqttMessage response) { if (!(response instanceof PublishAckMqttInMessage)) { throw new IllegalStateException("Unexpected response: " + response); } diff --git a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishInMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishInMessageHandler.java index b0571897..f092f0ad 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishInMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishInMessageHandler.java @@ -2,8 +2,8 @@ import javasabr.mqtt.model.MqttUser; import javasabr.mqtt.model.QoS; -import javasabr.mqtt.model.TrackableMessage; import javasabr.mqtt.model.message.MqttMessageType; +import javasabr.mqtt.model.message.TrackableMqttMessage; import javasabr.mqtt.model.publishing.Publish; import javasabr.mqtt.model.reason.code.PublishCompletedReasonCode; import javasabr.mqtt.model.reason.code.PublishReceivedReasonCode; @@ -15,7 +15,7 @@ import javasabr.mqtt.network.impl.ExternalMqttClient; import javasabr.mqtt.network.message.in.PublishReleaseMqttInMessage; import javasabr.mqtt.network.message.out.MqttOutMessage; -import javasabr.mqtt.network.session.MqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; import javasabr.mqtt.service.MessageOutFactoryService; import javasabr.mqtt.service.PublishDeliveringService; import javasabr.mqtt.service.SubscriptionService; @@ -44,7 +44,7 @@ public QoS qos() { } @Override - protected boolean validateImpl(ExternalMqttClient client, MqttSession session, Publish publish) { + protected boolean validateImpl(ExternalMqttClient client, MqttNetworkSession session, Publish publish) { if (!super.validateImpl(client, session, publish)) { return false; } @@ -68,7 +68,7 @@ protected boolean validateImpl(ExternalMqttClient client, MqttSession session, P @Override protected void handleNoMatchedSubscribers( ExternalMqttClient client, - MqttSession session, + MqttNetworkSession session, Publish publish) { super.handleNoMatchedSubscribers(client, session, publish); var reasonCode = PublishReceivedReasonCode.NO_MATCHING_SUBSCRIBERS; @@ -81,7 +81,7 @@ protected void handleNoMatchedSubscribers( @Override protected void handleSuccess( ExternalMqttClient client, - MqttSession session, + MqttNetworkSession session, Publish publish, int matchedSubscribers) { super.handleSuccess(client, session, publish, matchedSubscribers); @@ -92,7 +92,7 @@ protected void handleSuccess( .newPublishReceived(publish.messageId(), PublishReceivedReasonCode.SUCCESS)); } - private void updateSessionState(MqttSession session, Publish publish, PublishReceivedReasonCode reasonCode) { + private void updateSessionState(MqttNetworkSession session, Publish publish, PublishReceivedReasonCode reasonCode) { // store response reason code for duplicated publishes MessageTacker messageTacker = session.inMessageTracker(); messageTacker.update(publish.messageId(), MqttMessageType.PUBLISH, reasonCode); @@ -104,7 +104,7 @@ private void updateSessionState(MqttSession session, Publish publish, PublishRec @Override protected void handleError( ExternalMqttClient client, - MqttSession session, + MqttNetworkSession session, Publish publish, PublishHandlingResult handlingResult) { super.handleError(client, session, publish, handlingResult); @@ -139,9 +139,9 @@ private void handleMessageIdIsInUse(ExternalMqttClient client, int messageId) { .newPublishReceived(messageId, PublishReceivedReasonCode.PACKET_IDENTIFIER_IN_USE)); } - private boolean handleReceivedTrackableMessage(MqttUser user, Object object, TrackableMessage message) { + private boolean handleReceivedTrackableMessage(MqttUser user, Object object, TrackableMqttMessage message) { ExternalMqttClient client = (ExternalMqttClient) user; - MqttSession session = (MqttSession) object; + MqttNetworkSession session = (MqttNetworkSession) object; int messageId = message.messageId(); MessageTacker messageTacker = session.inMessageTracker(); diff --git a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishOutMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishOutMessageHandler.java index 1b104bc3..f5c81ae9 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishOutMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/Qos2MqttPublishOutMessageHandler.java @@ -3,7 +3,7 @@ import static javasabr.mqtt.model.reason.code.PublishReleaseReasonCode.SUCCESS; import javasabr.mqtt.model.QoS; -import javasabr.mqtt.model.TrackableMessage; +import javasabr.mqtt.model.message.TrackableMqttMessage; import javasabr.mqtt.network.MqttClient; import javasabr.mqtt.network.message.in.PublishCompleteMqttInMessage; import javasabr.mqtt.network.message.in.PublishReceivedMqttInMessage; @@ -24,7 +24,7 @@ public QoS qos() { } @Override - protected boolean handleReceivedResponse(MqttClient client, TrackableMessage response) { + protected boolean handleReceivedResponse(MqttClient client, TrackableMqttMessage response) { if (response instanceof PublishReceivedMqttInMessage) { client.send(messageOutFactoryService .resolveFactory(client) diff --git a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/TrackableMqttPublishInMessageHandler.java b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/TrackableMqttPublishInMessageHandler.java index 170db476..33dc935e 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/TrackableMqttPublishInMessageHandler.java +++ b/core-service/src/main/java/javasabr/mqtt/service/publish/handler/impl/TrackableMqttPublishInMessageHandler.java @@ -8,7 +8,7 @@ import javasabr.mqtt.model.session.MessageTacker; import javasabr.mqtt.network.MqttClient; import javasabr.mqtt.network.message.out.MqttOutMessage; -import javasabr.mqtt.network.session.MqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; import javasabr.mqtt.service.MessageOutFactoryService; import javasabr.mqtt.service.PublishDeliveringService; import javasabr.mqtt.service.SubscriptionService; @@ -25,7 +25,7 @@ public TrackableMqttPublishInMessageHandler( } @Override - protected boolean validateImpl(C client, MqttSession session, Publish publish) { + protected boolean validateImpl(C client, MqttNetworkSession session, Publish publish) { int messagedId = publish.messageId(); if (messagedId == MqttProperties.MESSAGE_ID_IS_NOT_SET) { handleMissedMessageId(client); @@ -35,7 +35,7 @@ protected boolean validateImpl(C client, MqttSession session, Publish publish) { } @Override - protected void handleImpl(C client, MqttSession session, Publish publish) { + protected void handleImpl(C client, MqttNetworkSession session, Publish publish) { MessageTacker messageTacker = session.inMessageTracker(); messageTacker.add(publish.messageId(), MqttMessageType.PUBLISH); super.handleImpl(client, session, publish); diff --git a/core-service/src/main/java/javasabr/mqtt/service/session/MqttSessionService.java b/core-service/src/main/java/javasabr/mqtt/service/session/MqttSessionService.java index 2cf0092f..394f2d8d 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/session/MqttSessionService.java +++ b/core-service/src/main/java/javasabr/mqtt/service/session/MqttSessionService.java @@ -1,13 +1,13 @@ package javasabr.mqtt.service.session; -import javasabr.mqtt.network.session.MqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; import reactor.core.publisher.Mono; public interface MqttSessionService { - Mono restore(String clientId); + Mono restore(String clientId); - Mono create(String clientId); + Mono create(String clientId); - Mono store(String clientId, MqttSession session, long expiryInterval); + Mono store(String clientId, MqttNetworkSession session, long expiryInterval); } diff --git a/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryMqttSession.java b/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryMqttNetworkSession.java similarity index 92% rename from core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryMqttSession.java rename to core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryMqttNetworkSession.java index 8206cfff..b66d2d88 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryMqttSession.java +++ b/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryMqttNetworkSession.java @@ -3,14 +3,14 @@ import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; import javasabr.mqtt.model.MqttProperties; -import javasabr.mqtt.model.TrackableMessage; +import javasabr.mqtt.model.message.TrackableMqttMessage; import javasabr.mqtt.model.publishing.Publish; import javasabr.mqtt.model.session.ActiveSubscriptions; import javasabr.mqtt.model.session.MessageTacker; import javasabr.mqtt.model.session.ProcessingPublishes; import javasabr.mqtt.model.session.TopicNameMapping; import javasabr.mqtt.network.MqttClient; -import javasabr.mqtt.network.session.MqttSession.UnsafeMqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession.UnsafeMqttNetworkSession; import javasabr.rlib.collections.array.ArrayFactory; import javasabr.rlib.collections.array.LockableArray; import lombok.AccessLevel; @@ -27,7 +27,7 @@ @EqualsAndHashCode(of = "clientId") @Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PRIVATE) -public class InMemoryMqttSession implements UnsafeMqttSession { +public class InMemoryMqttNetworkSession implements UnsafeMqttNetworkSession { private record PendingPublish(Publish publish, PendingMessageHandler handler) {} @@ -43,7 +43,7 @@ private static void registerPublish( private static void updatePendingPacket( MqttClient client, - TrackableMessage response, + TrackableMqttMessage response, LockableArray pendingPublishes, String clientId) { @@ -93,7 +93,7 @@ private static void updatePendingPacket( @Setter volatile long expirationTime = -1; - public InMemoryMqttSession(String clientId) { + public InMemoryMqttNetworkSession(String clientId) { this.clientId = clientId; this.pendingOutPublishes = ArrayFactory.stampedLockBasedArray(PendingPublish.class); this.messageIdGenerator = new AtomicInteger(0); @@ -106,13 +106,13 @@ public InMemoryMqttSession(String clientId) { } @Override - public int nextMessageId() { + public int generateMessageId() { int nextId = messageIdGenerator.incrementAndGet(); if (nextId >= MqttProperties.MAXIMUM_PACKET_ID) { messageIdGenerator.compareAndSet(nextId, 0); - return nextMessageId(); + return generateMessageId(); } return nextId; @@ -160,7 +160,7 @@ public void resendPendingPackets(MqttClient client) { } @Override - public void updateOutPendingPacket(MqttClient client, TrackableMessage response) { + public void updateOutPendingPacket(MqttClient client, TrackableMqttMessage response) { updatePendingPacket(client, response, pendingOutPublishes, clientId); } diff --git a/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryMqttSessionService.java b/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryMqttSessionService.java index 0517c751..359020d9 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryMqttSessionService.java +++ b/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryMqttSessionService.java @@ -1,8 +1,8 @@ package javasabr.mqtt.service.session.impl; import java.io.Closeable; -import javasabr.mqtt.network.session.MqttSession; -import javasabr.mqtt.network.session.MqttSession.UnsafeMqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; +import javasabr.mqtt.network.session.MqttNetworkSession.UnsafeMqttNetworkSession; import javasabr.mqtt.service.session.MqttSessionService; import javasabr.rlib.collections.array.ArrayFactory; import javasabr.rlib.collections.array.MutableArray; @@ -20,7 +20,7 @@ @FieldDefaults(level = AccessLevel.PRIVATE) public class InMemoryMqttSessionService implements MqttSessionService, Closeable { - final LockableRefToRefDictionary storedSession; + final LockableRefToRefDictionary storedSession; final Thread cleanThread; final int cleanIntervalInMs; @@ -36,9 +36,9 @@ public InMemoryMqttSessionService(int cleanIntervalInMs) { } @Override - public Mono restore(String clientId) { + public Mono restore(String clientId) { - UnsafeMqttSession session = storedSession + UnsafeMqttNetworkSession session = storedSession .operations() .getInWriteLock(clientId, MutableRefToRefDictionary::remove); @@ -53,9 +53,9 @@ public Mono restore(String clientId) { } @Override - public Mono create(String clientId) { + public Mono create(String clientId) { - UnsafeMqttSession session = storedSession + UnsafeMqttNetworkSession session = storedSession .operations() .getInWriteLock(clientId, MutableRefToRefDictionary::remove); @@ -65,13 +65,13 @@ public Mono create(String clientId) { log.debug(clientId, "Created new session for client:[%s]"::formatted); - return Mono.just(new InMemoryMqttSession(clientId)); + return Mono.just(new InMemoryMqttNetworkSession(clientId)); } @Override - public Mono store(String clientId, MqttSession session, long expiryInterval) { + public Mono store(String clientId, MqttNetworkSession session, long expiryInterval) { - var unsafe = (UnsafeMqttSession) session; + var unsafe = (UnsafeMqttNetworkSession) session; unsafe.expirationTime(System.currentTimeMillis() + (expiryInterval * 1000)); unsafe.onPersisted(); @@ -86,8 +86,8 @@ public Mono store(String clientId, MqttSession session, long expiryInte private void cleanup() { - var toCheck = ArrayFactory.mutableArray(UnsafeMqttSession.class); - var toRemove = ArrayFactory.mutableArray(UnsafeMqttSession.class); + var toCheck = ArrayFactory.mutableArray(UnsafeMqttNetworkSession.class); + var toRemove = ArrayFactory.mutableArray(UnsafeMqttNetworkSession.class); while (!closed) { ThreadUtils.sleep(cleanIntervalInMs); @@ -110,15 +110,15 @@ private void cleanup() { } private static void removeExpiredSessions( - LockableRefToRefDictionary sessions, - MutableArray expired) { + LockableRefToRefDictionary sessions, + MutableArray expired) { long time = System.currentTimeMillis(); - for (UnsafeMqttSession session : expired) { + for (UnsafeMqttNetworkSession session : expired) { if (session.expirationTime() <= time) { continue; } - UnsafeMqttSession removed = sessions.remove(session.clientId()); + UnsafeMqttNetworkSession removed = sessions.remove(session.clientId()); log.debug(session.clientId(), "Removed expired session for client:[%]"::formatted); // if we already have new session under the same client id @@ -130,11 +130,11 @@ private static void removeExpiredSessions( } } - private boolean findToRemove(MutableArray toCheck, MutableArray toRemove) { + private boolean findToRemove(MutableArray toCheck, MutableArray toRemove) { var currentTime = System.currentTimeMillis(); - for (UnsafeMqttSession session : toCheck) { + for (UnsafeMqttNetworkSession session : toCheck) { if (session.expirationTime() > currentTime) { toRemove.add(session); } diff --git a/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryProcessingPublishes.java b/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryProcessingPublishes.java index 8c8b414c..37ee69b7 100644 --- a/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryProcessingPublishes.java +++ b/core-service/src/main/java/javasabr/mqtt/service/session/impl/InMemoryProcessingPublishes.java @@ -2,12 +2,12 @@ import java.util.concurrent.locks.StampedLock; import javasabr.mqtt.model.MqttUser; -import javasabr.mqtt.model.TrackableMessage; +import javasabr.mqtt.model.message.TrackableMqttMessage; import javasabr.mqtt.model.publishing.Publish; import javasabr.mqtt.model.session.ProcessingPublishes; import javasabr.mqtt.model.session.PublishRetryer; import javasabr.mqtt.model.session.TrackableMessageCallback; -import javasabr.mqtt.network.session.MqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; import javasabr.rlib.collections.dictionary.DictionaryFactory; import javasabr.rlib.collections.dictionary.MutableIntToRefDictionary; import lombok.AccessLevel; @@ -18,11 +18,11 @@ public class InMemoryProcessingPublishes implements ProcessingPublishes { record InProcessPublish(Publish publish, TrackableMessageCallback callback, PublishRetryer retryer) {} - MqttSession session; + MqttNetworkSession session; MutableIntToRefDictionary processing; StampedLock lock; - public InMemoryProcessingPublishes(MqttSession session) { + public InMemoryProcessingPublishes(MqttNetworkSession session) { this.session = session; this.processing = DictionaryFactory.mutableIntToRefDictionary(); this.lock = new StampedLock(); @@ -43,7 +43,7 @@ public void register(Publish publish, TrackableMessageCallback callback, Publish } @Override - public boolean apply(MqttUser user, TrackableMessage message) { + public boolean apply(MqttUser user, TrackableMqttMessage message) { long stamp = lock.writeLock(); try { InProcessPublish inProcessPublish = processing.get(message.messageId()); @@ -61,7 +61,7 @@ public boolean apply(MqttUser user, TrackableMessage message) { } @Override - public boolean remove(TrackableMessage message) { + public boolean remove(TrackableMqttMessage message) { long stamp = lock.writeLock(); try { InProcessPublish inProcessPublish = processing.remove(message.messageId()); diff --git a/core-service/src/test/groovy/javasabr/mqtt/service/message/handler/impl/UnsubscribeMqttInMessageHandlerTest.groovy b/core-service/src/test/groovy/javasabr/mqtt/service/message/handler/impl/UnsubscribeMqttInMessageHandlerTest.groovy index 4e4925d0..33a37300 100644 --- a/core-service/src/test/groovy/javasabr/mqtt/service/message/handler/impl/UnsubscribeMqttInMessageHandlerTest.groovy +++ b/core-service/src/test/groovy/javasabr/mqtt/service/message/handler/impl/UnsubscribeMqttInMessageHandlerTest.groovy @@ -113,7 +113,7 @@ class UnsubscribeMqttInMessageHandlerTest extends IntegrationServiceSpecificatio then: subscribers1.isEmpty() subscribers2.isEmpty() - subscribers3.size() == 1 && subscribers3.first().owner() == mqttClient + subscribers3.size() == 1 && subscribers3.first().user() == mqttClient } def "should close connection by reason MQTT protocol error"() { diff --git a/model/src/main/java/javasabr/mqtt/model/MqttUser.java b/model/src/main/java/javasabr/mqtt/model/MqttUser.java index f7f686ed..7c7913d3 100644 --- a/model/src/main/java/javasabr/mqtt/model/MqttUser.java +++ b/model/src/main/java/javasabr/mqtt/model/MqttUser.java @@ -1,3 +1,17 @@ package javasabr.mqtt.model; -public interface MqttUser {} +import javasabr.mqtt.model.session.MqttSession; +import org.jspecify.annotations.Nullable; + +public interface MqttUser { + + String clientId(); + + @Nullable + String userName(); + + String ipAddress(); + + @Nullable + MqttSession session(); +} diff --git a/model/src/main/java/javasabr/mqtt/model/TrackableMessage.java b/model/src/main/java/javasabr/mqtt/model/TrackableMessage.java deleted file mode 100644 index d406861e..00000000 --- a/model/src/main/java/javasabr/mqtt/model/TrackableMessage.java +++ /dev/null @@ -1,6 +0,0 @@ -package javasabr.mqtt.model; - -public interface TrackableMessage { - - int messageId(); -} diff --git a/model/src/main/java/javasabr/mqtt/model/message/MqttMessage.java b/model/src/main/java/javasabr/mqtt/model/message/MqttMessage.java new file mode 100644 index 00000000..30c217da --- /dev/null +++ b/model/src/main/java/javasabr/mqtt/model/message/MqttMessage.java @@ -0,0 +1,11 @@ +package javasabr.mqtt.model.message; + +import javasabr.mqtt.model.data.type.StringPair; +import javasabr.rlib.collections.array.Array; + +public interface MqttMessage { + + Array EMPTY_USER_PROPERTIES = Array.empty(StringPair.class); + + MqttMessageType messageType(); +} diff --git a/model/src/main/java/javasabr/mqtt/model/message/TrackableMqttMessage.java b/model/src/main/java/javasabr/mqtt/model/message/TrackableMqttMessage.java new file mode 100644 index 00000000..74456694 --- /dev/null +++ b/model/src/main/java/javasabr/mqtt/model/message/TrackableMqttMessage.java @@ -0,0 +1,6 @@ +package javasabr.mqtt.model.message; + +public interface TrackableMqttMessage extends MqttMessage { + + int messageId(); +} diff --git a/model/src/main/java/javasabr/mqtt/model/publishing/Publish.java b/model/src/main/java/javasabr/mqtt/model/publishing/Publish.java index 33412d4d..56e30ebb 100644 --- a/model/src/main/java/javasabr/mqtt/model/publishing/Publish.java +++ b/model/src/main/java/javasabr/mqtt/model/publishing/Publish.java @@ -4,8 +4,8 @@ import javasabr.mqtt.model.MqttProperties; import javasabr.mqtt.model.PayloadFormat; import javasabr.mqtt.model.QoS; -import javasabr.mqtt.model.TrackableMessage; import javasabr.mqtt.model.data.type.StringPair; +import javasabr.mqtt.model.message.MqttMessage; import javasabr.mqtt.model.topic.TopicName; import javasabr.rlib.collections.array.Array; import javasabr.rlib.collections.array.IntArray; @@ -25,9 +25,7 @@ public record Publish( long messageExpiryInterval, int topicAlias, PayloadFormat payloadFormat, - Array userProperties) implements TrackableMessage { - - private static final Array EMPTY_USER_PROPERTIES = Array.empty(StringPair.class); + Array userProperties) { static { DebugUtils.registerIncludedFields("topicName", "messageId", "qos", "topicAlias", "payloadFormat"); @@ -47,8 +45,8 @@ public static Publish minimal(int messageId, QoS qos, TopicName topicName, byte[ null, MqttProperties.MESSAGE_EXPIRY_INTERVAL_IS_NOT_SET, MqttProperties.TOPIC_ALIAS_NOT_SET, - PayloadFormat.BINARY, - EMPTY_USER_PROPERTIES); + PayloadFormat.BINARY, + MqttMessage.EMPTY_USER_PROPERTIES); } public static Publish minimal(QoS qos, TopicName topicName, byte[] payload) { diff --git a/model/src/main/java/javasabr/mqtt/model/session/MqttSession.java b/model/src/main/java/javasabr/mqtt/model/session/MqttSession.java new file mode 100644 index 00000000..273428d6 --- /dev/null +++ b/model/src/main/java/javasabr/mqtt/model/session/MqttSession.java @@ -0,0 +1,23 @@ +package javasabr.mqtt.model.session; + +public interface MqttSession { + + String clientId(); + + int generateMessageId(); + + /** + * @return the expiration time in ms or -1 if it should not be expired now. + */ + long expirationTime(); + + MessageTacker inMessageTracker(); + MessageTacker outMessageTracker(); + + ProcessingPublishes inProcessingPublishes(); + ProcessingPublishes outProcessingPublishes(); + + ActiveSubscriptions activeSubscriptions(); + TopicNameMapping topicNameMapping(); + +} diff --git a/model/src/main/java/javasabr/mqtt/model/session/ProcessingPublishes.java b/model/src/main/java/javasabr/mqtt/model/session/ProcessingPublishes.java index ff0b0ebb..65fb722f 100644 --- a/model/src/main/java/javasabr/mqtt/model/session/ProcessingPublishes.java +++ b/model/src/main/java/javasabr/mqtt/model/session/ProcessingPublishes.java @@ -1,7 +1,7 @@ package javasabr.mqtt.model.session; import javasabr.mqtt.model.MqttUser; -import javasabr.mqtt.model.TrackableMessage; +import javasabr.mqtt.model.message.TrackableMqttMessage; import javasabr.mqtt.model.publishing.Publish; public interface ProcessingPublishes { @@ -11,10 +11,10 @@ public interface ProcessingPublishes { /** * @return true if was found some callback for this message */ - boolean apply(MqttUser user, TrackableMessage message); + boolean apply(MqttUser user, TrackableMqttMessage message); /** * @return true if was found some callback for this message */ - boolean remove(TrackableMessage message); + boolean remove(TrackableMqttMessage message); } diff --git a/model/src/main/java/javasabr/mqtt/model/session/TrackableMessageCallback.java b/model/src/main/java/javasabr/mqtt/model/session/TrackableMessageCallback.java index 61cbb946..cb2d9861 100644 --- a/model/src/main/java/javasabr/mqtt/model/session/TrackableMessageCallback.java +++ b/model/src/main/java/javasabr/mqtt/model/session/TrackableMessageCallback.java @@ -1,12 +1,12 @@ package javasabr.mqtt.model.session; import javasabr.mqtt.model.MqttUser; -import javasabr.mqtt.model.TrackableMessage; +import javasabr.mqtt.model.message.TrackableMqttMessage; public interface TrackableMessageCallback { /** * @return true if this handler should be de-register */ - boolean accept(MqttUser owner, Object session, TrackableMessage message); + boolean accept(MqttUser owner, Object session, TrackableMqttMessage message); } diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/SharedSubscriber.java b/model/src/main/java/javasabr/mqtt/model/subscriber/SharedSubscriber.java index dd5a4943..9380d655 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/SharedSubscriber.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/SharedSubscriber.java @@ -2,7 +2,7 @@ import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; -import javasabr.mqtt.model.subscription.SubscriptionOwner; +import javasabr.mqtt.model.MqttUser; import javasabr.mqtt.model.topic.SharedTopicFilter; import javasabr.rlib.collections.array.Array; import javasabr.rlib.collections.array.ArrayFactory; @@ -41,13 +41,13 @@ public void addSubscriber(SingleSubscriber subscriber) { .inWriteLock(subscriber, Collection::add); } - public boolean removeSubscriberWithOwner(SubscriptionOwner owner) { + public boolean removeSubscriberWithUser(MqttUser user) { if (subscribers.isEmpty()) { return false; } long stamp = subscribers.writeLock(); try { - int index = subscribers.indexOf(SingleSubscriber::owner, owner); + int index = subscribers.indexOf(SingleSubscriber::user, user); if (index >= 0) { subscribers.remove(index); return true; diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/SingleSubscriber.java b/model/src/main/java/javasabr/mqtt/model/subscriber/SingleSubscriber.java index 2bada235..d4320e71 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/SingleSubscriber.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/SingleSubscriber.java @@ -1,11 +1,11 @@ package javasabr.mqtt.model.subscriber; import com.fasterxml.jackson.annotation.JsonValue; +import javasabr.mqtt.model.MqttUser; import javasabr.mqtt.model.QoS; import javasabr.mqtt.model.subscription.Subscription; -import javasabr.mqtt.model.subscription.SubscriptionOwner; -public record SingleSubscriber(SubscriptionOwner owner, Subscription subscription) implements Subscriber { +public record SingleSubscriber(MqttUser user, Subscription subscription) implements Subscriber { @Override public SingleSubscriber resolveSingle() { @@ -19,6 +19,6 @@ public QoS qos() { @JsonValue @Override public String toString() { - return "[" + owner + "]->[" + subscription.topicFilter().rawTopic() + "|" + subscription.qos().level() + "]"; + return "[" + user + "]->[" + subscription.topicFilter().rawTopic() + "|" + subscription.qos().level() + "]"; } } diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/Subscriber.java b/model/src/main/java/javasabr/mqtt/model/subscriber/Subscriber.java index 54dd458b..744b393d 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/Subscriber.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/Subscriber.java @@ -1,14 +1,14 @@ package javasabr.mqtt.model.subscriber; -import javasabr.mqtt.model.subscription.SubscriptionOwner; +import javasabr.mqtt.model.MqttUser; public sealed interface Subscriber permits SingleSubscriber, SharedSubscriber { /** * Resolves the owner of a subscription to send a publishing. */ - default SubscriptionOwner resolveOwner() { - return resolveSingle().owner(); + default MqttUser resolveUser() { + return resolveSingle().user(); } /** diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java index ac37126f..307db58c 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/ConcurrentSubscriberTree.java @@ -1,8 +1,8 @@ package javasabr.mqtt.model.subscriber.tree; +import javasabr.mqtt.model.MqttUser; import javasabr.mqtt.model.subscriber.SingleSubscriber; import javasabr.mqtt.model.subscription.Subscription; -import javasabr.mqtt.model.subscription.SubscriptionOwner; import javasabr.mqtt.model.topic.TopicFilter; import javasabr.mqtt.model.topic.TopicName; import javasabr.rlib.collections.array.Array; @@ -22,12 +22,12 @@ public ConcurrentSubscriberTree() { } @Nullable - public SingleSubscriber subscribe(SubscriptionOwner owner, Subscription subscription) { - return rootNode.subscribe(0, owner, subscription, subscription.topicFilter()); + public SingleSubscriber subscribe(MqttUser user, Subscription subscription) { + return rootNode.subscribe(0, user, subscription, subscription.topicFilter()); } - public boolean unsubscribe(SubscriptionOwner owner, TopicFilter topicFilter) { - return rootNode.unsubscribe(0, owner, topicFilter); + public boolean unsubscribe(MqttUser user, TopicFilter topicFilter) { + return rootNode.unsubscribe(0, user, topicFilter); } public Array matches(TopicName topicName) { diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java index cb6ab285..4a6579c2 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberNode.java @@ -2,10 +2,10 @@ import java.util.function.Supplier; import javasabr.mqtt.base.util.DebugUtils; +import javasabr.mqtt.model.MqttUser; import javasabr.mqtt.model.subscriber.SingleSubscriber; import javasabr.mqtt.model.subscriber.Subscriber; import javasabr.mqtt.model.subscription.Subscription; -import javasabr.mqtt.model.subscription.SubscriptionOwner; import javasabr.mqtt.model.topic.TopicFilter; import javasabr.mqtt.model.topic.TopicName; import javasabr.rlib.collections.array.ArrayFactory; @@ -39,7 +39,7 @@ class SubscriberNode extends SubscriberTreeBase { * @return the previous subscription from the same owner */ @Nullable - public SingleSubscriber subscribe(int level, SubscriptionOwner owner, Subscription subscription, TopicFilter topicFilter) { + public SingleSubscriber subscribe(int level, MqttUser owner, Subscription subscription, TopicFilter topicFilter) { if (level == topicFilter.levelsCount()) { return addSubscriber(getOrCreateSubscribers(), owner, subscription, topicFilter); } @@ -47,7 +47,7 @@ public SingleSubscriber subscribe(int level, SubscriptionOwner owner, Subscripti return childNode.subscribe(level + 1, owner, subscription, topicFilter); } - public boolean unsubscribe(int level, SubscriptionOwner owner, TopicFilter topicFilter) { + public boolean unsubscribe(int level, MqttUser owner, TopicFilter topicFilter) { if (level == topicFilter.levelsCount()) { return removeSubscriber(subscribers(), owner, topicFilter); } @@ -114,7 +114,6 @@ private SubscriberNode getOrCreateChildNode(String segment) { } stamp = childNodes.writeLock(); try { - //noinspection DataFlowIssue return childNodes.getOrCompute(segment, SUBSCRIBER_NODE_FACTORY); } finally { childNodes.writeUnlock(stamp); diff --git a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java index 5553f8fe..972b696b 100644 --- a/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java +++ b/model/src/main/java/javasabr/mqtt/model/subscriber/tree/SubscriberTreeBase.java @@ -1,12 +1,12 @@ package javasabr.mqtt.model.subscriber.tree; import java.util.Objects; +import javasabr.mqtt.model.MqttUser; import javasabr.mqtt.model.QoS; import javasabr.mqtt.model.subscriber.SharedSubscriber; import javasabr.mqtt.model.subscriber.SingleSubscriber; import javasabr.mqtt.model.subscriber.Subscriber; import javasabr.mqtt.model.subscription.Subscription; -import javasabr.mqtt.model.subscription.SubscriptionOwner; import javasabr.mqtt.model.topic.SharedTopicFilter; import javasabr.mqtt.model.topic.TopicFilter; import javasabr.rlib.collections.array.LockableArray; @@ -21,22 +21,22 @@ abstract class SubscriberTreeBase { /** - * @return previous subscriber with the same owner + * @return previous subscriber with the same user */ @Nullable protected static SingleSubscriber addSubscriber( LockableArray subscribers, - SubscriptionOwner owner, + MqttUser user, Subscription subscription, TopicFilter topicFilter) { long stamp = subscribers.writeLock(); try { if (topicFilter instanceof SharedTopicFilter stf) { - addSharedSubscriber(subscribers, owner, subscription, stf); + addSharedSubscriber(subscribers, user, subscription, stf); return null; } else { - SingleSubscriber previous = removePreviousIfExist(subscribers, owner); - subscribers.add(new SingleSubscriber(owner, subscription)); + SingleSubscriber previous = removePreviousIfExist(subscribers, user); + subscribers.add(new SingleSubscriber(user, subscription)); return previous; } } finally { @@ -47,8 +47,8 @@ protected static SingleSubscriber addSubscriber( @Nullable private static SingleSubscriber removePreviousIfExist( LockableArray subscribers, - SubscriptionOwner owner) { - int index = subscribers.indexOf(Subscriber::resolveOwner, owner); + MqttUser user) { + int index = subscribers.indexOf(Subscriber::resolveUser, user); if (index < 0) { return null; } @@ -59,7 +59,7 @@ private static SingleSubscriber removePreviousIfExist( private static void addSharedSubscriber( LockableArray subscribers, - SubscriptionOwner owner, + MqttUser user, Subscription subscription, SharedTopicFilter sharedTopicFilter) { @@ -73,7 +73,7 @@ private static void addSharedSubscriber( subscribers.add(sharedSubscriber); } - sharedSubscriber.addSubscriber(new SingleSubscriber(owner, subscription)); + sharedSubscriber.addSubscriber(new SingleSubscriber(user, subscription)); } protected static void appendSubscribersTo(MutableArray result, SubscriberNode subscriberNode) { @@ -96,7 +96,7 @@ protected static void appendSubscribersTo(MutableArray result, protected static boolean removeSubscriber( @Nullable LockableArray subscribers, - SubscriptionOwner owner, + MqttUser user, TopicFilter topicFilter) { if (subscribers == null) { return false; @@ -104,9 +104,9 @@ protected static boolean removeSubscriber( long stamp = subscribers.writeLock(); try { if (topicFilter instanceof SharedTopicFilter stf) { - return removeSharedSubscriber(subscribers, owner, stf); + return removeSharedSubscriber(subscribers, user, stf); } else { - int index = subscribers.indexOf(Subscriber::resolveOwner, owner); + int index = subscribers.indexOf(Subscriber::resolveUser, user); if (index >= 0) { subscribers.remove(index); return true; @@ -120,14 +120,14 @@ protected static boolean removeSubscriber( private static boolean removeSharedSubscriber( LockableArray subscribers, - SubscriptionOwner owner, + MqttUser user, SharedTopicFilter sharedTopicFilter) { String group = sharedTopicFilter.shareName(); SharedSubscriber sharedSubscriber = (SharedSubscriber) subscribers .iterations() .findAny(group, SubscriberTreeBase::isSharedSubscriberWithGroup); if (sharedSubscriber != null) { - boolean removed = sharedSubscriber.removeSubscriberWithOwner(owner); + boolean removed = sharedSubscriber.removeSubscriberWithUser(user); if (sharedSubscriber.isEmpty()) { // if it was last member subscribers.remove(sharedSubscriber); @@ -144,7 +144,7 @@ private static boolean isSharedSubscriberWithGroup(Subscriber subscriber, String private static boolean removeDuplicateWithLowerQoS( MutableArray result, SingleSubscriber candidate) { - int found = result.indexOf(SingleSubscriber::owner, candidate.owner()); + int found = result.indexOf(SingleSubscriber::user, candidate.user()); if (found == -1) { return true; } diff --git a/model/src/main/java/javasabr/mqtt/model/subscription/SubscriptionOwner.java b/model/src/main/java/javasabr/mqtt/model/subscription/SubscriptionOwner.java deleted file mode 100644 index 813d1a58..00000000 --- a/model/src/main/java/javasabr/mqtt/model/subscription/SubscriptionOwner.java +++ /dev/null @@ -1,3 +0,0 @@ -package javasabr.mqtt.model.subscription; - -public interface SubscriptionOwner {} diff --git a/model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy b/model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy index 3bfa3bd5..cfb17623 100644 --- a/model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy +++ b/model/src/test/groovy/javasabr/mqtt/model/topic/tree/SubscriberTreeTest.groovy @@ -1,13 +1,13 @@ package javasabr.mqtt.model.topic.tree import javasabr.mqtt.model.MqttProperties +import javasabr.mqtt.model.MqttUser import javasabr.mqtt.model.QoS import javasabr.mqtt.model.SubscribeRetainHandling import javasabr.mqtt.model.subscriber.SingleSubscriber import javasabr.mqtt.model.subscriber.tree.ConcurrentSubscriberTree import javasabr.mqtt.model.subscription.Subscription -import javasabr.mqtt.model.subscription.SubscriptionOwner -import javasabr.mqtt.model.subscription.TestSubscriptionOwner +import javasabr.mqtt.model.subscription.TestMqttUser import javasabr.mqtt.model.topic.SharedTopicFilter import javasabr.mqtt.model.topic.TopicFilter import javasabr.mqtt.model.topic.TopicName @@ -17,19 +17,19 @@ class SubscriberTreeTest extends UnitSpecification { def "should match simple topic correctly"( List subscriptions, - List owners, + List users, String topicName, - List expectedOwners) { + List expectedUsers) { given: ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() subscriptions.eachWithIndex { Subscription subscription, int i -> - subscriberTree.subscribe(owners.get(i), subscription) + subscriberTree.subscribe(users.get(i), subscription) } when: def found = subscriberTree.matches(TopicName.valueOf(topicName)) - .collect { it.resolveOwner() } + .collect { it.resolveUser() } then: - found ==~ expectedOwners + found ==~ expectedUsers where: topicName << [ "/topic/segment1", @@ -62,62 +62,62 @@ class SubscriberTreeTest extends UnitSpecification { makeSubscription("/topic/segment3") ] ] - owners << [ + users << [ [ - makeOwner("id1"), - makeOwner("id2"), - makeOwner("id3"), - makeOwner("id4"), - makeOwner("id5") + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id4"), + makeUser("id5") ], [ - makeOwner("id1"), - makeOwner("id2"), - makeOwner("id3"), - makeOwner("id4"), - makeOwner("id5"), - makeOwner("id6"), - makeOwner("id7") + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id4"), + makeUser("id5"), + makeUser("id6"), + makeUser("id7") ], [ - makeOwner("id1"), - makeOwner("id2"), - makeOwner("id3"), - makeOwner("id3"), - makeOwner("id3"), - makeOwner("id4") + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id3"), + makeUser("id3"), + makeUser("id4") ] ] - expectedOwners << [ + expectedUsers << [ [ - makeOwner("id1") + makeUser("id1") ], [ - makeOwner("id2"), - makeOwner("id5") + makeUser("id2"), + makeUser("id5") ], [ - makeOwner("id3"), - makeOwner("id4") + makeUser("id3"), + makeUser("id4") ] ] } def "should match single wildcard topic correctly"( List subscriptions, - List owners, + List users, String topicName, - List expectedOwners) { + List expectedUsers) { given: ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() subscriptions.eachWithIndex { Subscription subscription, int i -> - subscriberTree.subscribe(owners.get(i), subscription) + subscriberTree.subscribe(users.get(i), subscription) } when: def found = subscriberTree.matches(TopicName.valueOf(topicName)) - .collect { it.resolveOwner() } + .collect { it.resolveUser() } then: - found ==~ expectedOwners + found ==~ expectedUsers where: topicName << [ "/topic/segment1", @@ -156,73 +156,73 @@ class SubscriberTreeTest extends UnitSpecification { makeSubscription("/topic2/+") ] ] - owners << [ - [ - makeOwner("id1"), - makeOwner("id2"), - makeOwner("id3"), - makeOwner("id4"), - makeOwner("id5"), - makeOwner("id6"), - makeOwner("id7"), - makeOwner("id8") + users << [ + [ + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id4"), + makeUser("id5"), + makeUser("id6"), + makeUser("id7"), + makeUser("id8") ], [ - makeOwner("id1"), - makeOwner("id2"), - makeOwner("id3"), - makeOwner("id4"), - makeOwner("id5"), - makeOwner("id6"), - makeOwner("id7"), - makeOwner("id8") + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id4"), + makeUser("id5"), + makeUser("id6"), + makeUser("id7"), + makeUser("id8") ], [ - makeOwner("id1"), - makeOwner("id2"), - makeOwner("id3"), - makeOwner("id4"), - makeOwner("id5"), - makeOwner("id6"), - makeOwner("id7"), - makeOwner("id8") + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id4"), + makeUser("id5"), + makeUser("id6"), + makeUser("id7"), + makeUser("id8") ] ] - expectedOwners << [ + expectedUsers << [ [ - makeOwner("id1"), - makeOwner("id2"), - makeOwner("id3"), - makeOwner("id4") + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id4") ], [ - makeOwner("id2"), - makeOwner("id4"), - makeOwner("id5"), - makeOwner("id7") + makeUser("id2"), + makeUser("id4"), + makeUser("id5"), + makeUser("id7") ], [ - makeOwner("id2"), - makeOwner("id4") + makeUser("id2"), + makeUser("id4") ] ] } def "should match multi wildcard topic correctly"( List subscriptions, - List owners, + List users, String topicName, - List expectedOwners) { + List expectedUsers) { given: ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() subscriptions.eachWithIndex { Subscription subscription, int i -> - subscriberTree.subscribe(owners.get(i), subscription) + subscriberTree.subscribe(users.get(i), subscription) } when: def found = subscriberTree.matches(TopicName.valueOf(topicName)) - .collect { it.resolveOwner() } + .collect { it.resolveUser() } then: - found ==~ expectedOwners + found ==~ expectedUsers where: topicName << [ "/topic/segment1/segment2", @@ -264,73 +264,73 @@ class SubscriberTreeTest extends UnitSpecification { makeSubscription("/topic/segment3/#") ] ] - owners << [ - [ - makeOwner("id1"), - makeOwner("id2"), - makeOwner("id3"), - makeOwner("id4"), - makeOwner("id5"), - makeOwner("id6"), - makeOwner("id7"), - makeOwner("id8"), - makeOwner("id9") + users << [ + [ + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id4"), + makeUser("id5"), + makeUser("id6"), + makeUser("id7"), + makeUser("id8"), + makeUser("id9") ], [ - makeOwner("id1"), - makeOwner("id2"), - makeOwner("id3"), - makeOwner("id4"), - makeOwner("id5"), - makeOwner("id6"), - makeOwner("id7"), - makeOwner("id8"), - makeOwner("id9") + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id4"), + makeUser("id5"), + makeUser("id6"), + makeUser("id7"), + makeUser("id8"), + makeUser("id9") ], [ - makeOwner("id1"), - makeOwner("id2"), - makeOwner("id3"), - makeOwner("id4"), - makeOwner("id5"), - makeOwner("id6"), - makeOwner("id7"), - makeOwner("id8"), - makeOwner("id9") + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id4"), + makeUser("id5"), + makeUser("id6"), + makeUser("id7"), + makeUser("id8"), + makeUser("id9") ] ] - expectedOwners << [ + expectedUsers << [ [ - makeOwner("id1"), - makeOwner("id2"), - makeOwner("id3"), - makeOwner("id4"), - makeOwner("id5") + makeUser("id1"), + makeUser("id2"), + makeUser("id3"), + makeUser("id4"), + makeUser("id5") ], [ - makeOwner("id8"), - makeOwner("id9"), - makeOwner("id3"), - makeOwner("id4"), - makeOwner("id5") + makeUser("id8"), + makeUser("id9"), + makeUser("id3"), + makeUser("id4"), + makeUser("id5") ], [ - makeOwner("id3"), - makeOwner("id4"), - makeOwner("id5") + makeUser("id3"), + makeUser("id4"), + makeUser("id5") ] ] } def "should choose strongest QoS when the same subscriber has several matches"( List subscriptions, - List owners, + List users, String topicName, List expectedSubscribers) { given: ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() subscriptions.eachWithIndex { Subscription subscription, int i -> - subscriberTree.subscribe(owners.get(i), subscription) + subscriberTree.subscribe(users.get(i), subscription) } when: def found = subscriberTree.matches(TopicName.valueOf(topicName)) @@ -377,56 +377,56 @@ class SubscriberTreeTest extends UnitSpecification { makeSubscription("/topic/#", 0) ] ] - owners << [ - [ - makeOwner("id1"), - makeOwner("id1"), - makeOwner("id1"), - makeOwner("id2"), - makeOwner("id2"), - makeOwner("id2"), - makeOwner("id3"), - makeOwner("id3"), - makeOwner("id3") + users << [ + [ + makeUser("id1"), + makeUser("id1"), + makeUser("id1"), + makeUser("id2"), + makeUser("id2"), + makeUser("id2"), + makeUser("id3"), + makeUser("id3"), + makeUser("id3") ], [ - makeOwner("id1"), - makeOwner("id1"), - makeOwner("id1"), - makeOwner("id2"), - makeOwner("id2"), - makeOwner("id2"), - makeOwner("id3"), - makeOwner("id3"), - makeOwner("id3") + makeUser("id1"), + makeUser("id1"), + makeUser("id1"), + makeUser("id2"), + makeUser("id2"), + makeUser("id2"), + makeUser("id3"), + makeUser("id3"), + makeUser("id3") ], [ - makeOwner("id1"), - makeOwner("id1"), - makeOwner("id1"), - makeOwner("id2"), - makeOwner("id2"), - makeOwner("id2"), - makeOwner("id3"), - makeOwner("id3"), - makeOwner("id3") + makeUser("id1"), + makeUser("id1"), + makeUser("id1"), + makeUser("id2"), + makeUser("id2"), + makeUser("id2"), + makeUser("id3"), + makeUser("id3"), + makeUser("id3") ] ] expectedSubscribers << [ [ - new SingleSubscriber(makeOwner("id1"), makeSubscription("/topic/segment1/segment2", 2)), - new SingleSubscriber(makeOwner("id2"), makeSubscription("/topic/segment1/#", 1)), - new SingleSubscriber(makeOwner("id3"), makeSubscription("/topic/#", 0)), + new SingleSubscriber(makeUser("id1"), makeSubscription("/topic/segment1/segment2", 2)), + new SingleSubscriber(makeUser("id2"), makeSubscription("/topic/segment1/#", 1)), + new SingleSubscriber(makeUser("id3"), makeSubscription("/topic/#", 0)), ], [ - new SingleSubscriber(makeOwner("id1"), makeSubscription("/topic/#", 0)), - new SingleSubscriber(makeOwner("id2"), makeSubscription("/topic/#", 0)), - new SingleSubscriber(makeOwner("id3"), makeSubscription("/topic/#", 0)), + new SingleSubscriber(makeUser("id1"), makeSubscription("/topic/#", 0)), + new SingleSubscriber(makeUser("id2"), makeSubscription("/topic/#", 0)), + new SingleSubscriber(makeUser("id3"), makeSubscription("/topic/#", 0)), ], [ - new SingleSubscriber(makeOwner("id1"), makeSubscription("/topic/#", 0)), - new SingleSubscriber(makeOwner("id2"), makeSubscription("/topic/#", 0)), - new SingleSubscriber(makeOwner("id3"), makeSubscription("/topic/segment2/#", 1)), + new SingleSubscriber(makeUser("id1"), makeSubscription("/topic/#", 0)), + new SingleSubscriber(makeUser("id2"), makeSubscription("/topic/#", 0)), + new SingleSubscriber(makeUser("id3"), makeSubscription("/topic/segment2/#", 1)), ] ] } @@ -436,26 +436,26 @@ class SubscriberTreeTest extends UnitSpecification { def group1 = ["id1", "id2", "id3", "id4", "id5"] def group2 = ["id6", "id7", "id8", "id9", "id10"] ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() - subscriberTree.subscribe(makeOwner("id1"), makeSharedSubscription('$share/group1/topic/name1')) - subscriberTree.subscribe(makeOwner("id2"), makeSharedSubscription('$share/group1/topic/name1')) - subscriberTree.subscribe(makeOwner("id3"), makeSharedSubscription('$share/group1/topic/name1')) - subscriberTree.subscribe(makeOwner("id4"), makeSharedSubscription('$share/group1/topic/name1')) - subscriberTree.subscribe(makeOwner("id5"), makeSharedSubscription('$share/group1/topic/name1')) - subscriberTree.subscribe(makeOwner("id6"), makeSharedSubscription('$share/group2/topic/name1')) - subscriberTree.subscribe(makeOwner("id7"), makeSharedSubscription('$share/group2/topic/name1')) - subscriberTree.subscribe(makeOwner("id8"), makeSharedSubscription('$share/group2/topic/name1')) - subscriberTree.subscribe(makeOwner("id9"), makeSharedSubscription('$share/group2/topic/name1')) - subscriberTree.subscribe(makeOwner("id10"), makeSharedSubscription('$share/group2/topic/name1')) + subscriberTree.subscribe(makeUser("id1"), makeSharedSubscription('$share/group1/topic/name1')) + subscriberTree.subscribe(makeUser("id2"), makeSharedSubscription('$share/group1/topic/name1')) + subscriberTree.subscribe(makeUser("id3"), makeSharedSubscription('$share/group1/topic/name1')) + subscriberTree.subscribe(makeUser("id4"), makeSharedSubscription('$share/group1/topic/name1')) + subscriberTree.subscribe(makeUser("id5"), makeSharedSubscription('$share/group1/topic/name1')) + subscriberTree.subscribe(makeUser("id6"), makeSharedSubscription('$share/group2/topic/name1')) + subscriberTree.subscribe(makeUser("id7"), makeSharedSubscription('$share/group2/topic/name1')) + subscriberTree.subscribe(makeUser("id8"), makeSharedSubscription('$share/group2/topic/name1')) + subscriberTree.subscribe(makeUser("id9"), makeSharedSubscription('$share/group2/topic/name1')) + subscriberTree.subscribe(makeUser("id10"), makeSharedSubscription('$share/group2/topic/name1')) when: def matched = subscriberTree .matches(TopicName.valueOf("topic/name1")) - .collect { it.owner().toString() } + .collect { it.user().toString() } then: matched.size() == 2 when: def matched2 = subscriberTree .matches(TopicName.valueOf("topic/name1")) - .collect { it.owner().toString() } + .collect { it.user().toString() } then: matched2.size() == 2 matched2 != matched @@ -469,33 +469,33 @@ class SubscriberTreeTest extends UnitSpecification { def "should subscribe and unsubscribe simple topic correctly correctly"() { given: ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() - subscriberTree.subscribe(makeOwner("id1"), makeSubscription('topic/name1')) - subscriberTree.subscribe(makeOwner("id2"), makeSubscription('topic/name1')) - subscriberTree.subscribe(makeOwner("id3"), makeSubscription('topic/name1')) + subscriberTree.subscribe(makeUser("id1"), makeSubscription('topic/name1')) + subscriberTree.subscribe(makeUser("id2"), makeSubscription('topic/name1')) + subscriberTree.subscribe(makeUser("id3"), makeSubscription('topic/name1')) when: def matched = subscriberTree .matches(TopicName.valueOf("topic/name1")) - .collect { it.owner().toString() } + .collect { it.user().toString() } .toSet() then: matched.size() == 3 when: - def id2WasUnsubscribed = subscriberTree.unsubscribe(makeOwner("id2"), TopicFilter.valueOf('topic/name1')) - def id3WasUnsubscribed = subscriberTree.unsubscribe(makeOwner("id3"), TopicFilter.valueOf('topic/name1')) + def id2WasUnsubscribed = subscriberTree.unsubscribe(makeUser("id2"), TopicFilter.valueOf('topic/name1')) + def id3WasUnsubscribed = subscriberTree.unsubscribe(makeUser("id3"), TopicFilter.valueOf('topic/name1')) matched = subscriberTree .matches(TopicName.valueOf("topic/name1")) - .collect { it.owner().toString() } + .collect { it.user().toString() } .toSet() then: matched.size() == 1 id2WasUnsubscribed id3WasUnsubscribed when: - def id1WasUnsubscribed = subscriberTree.unsubscribe(makeOwner("id1"), TopicFilter.valueOf('topic/name1')) - id3WasUnsubscribed = subscriberTree.unsubscribe(makeOwner("id3"), TopicFilter.valueOf('topic/name1')) + def id1WasUnsubscribed = subscriberTree.unsubscribe(makeUser("id1"), TopicFilter.valueOf('topic/name1')) + id3WasUnsubscribed = subscriberTree.unsubscribe(makeUser("id3"), TopicFilter.valueOf('topic/name1')) matched = subscriberTree .matches(TopicName.valueOf("topic/name1")) - .collect { it.owner().toString() } + .collect { it.user().toString() } .toSet() then: matched.size() == 0 @@ -506,33 +506,33 @@ class SubscriberTreeTest extends UnitSpecification { def "should subscribe and unsubscribe shared topic correctly correctly"() { given: ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() - subscriberTree.subscribe(makeOwner("id1"), makeSharedSubscription('$share/group1/topic/name1')) - subscriberTree.subscribe(makeOwner("id2"), makeSharedSubscription('$share/group1/topic/name1')) - subscriberTree.subscribe(makeOwner("id3"), makeSharedSubscription('$share/group1/topic/name1')) + subscriberTree.subscribe(makeUser("id1"), makeSharedSubscription('$share/group1/topic/name1')) + subscriberTree.subscribe(makeUser("id2"), makeSharedSubscription('$share/group1/topic/name1')) + subscriberTree.subscribe(makeUser("id3"), makeSharedSubscription('$share/group1/topic/name1')) when: def matched = subscriberTree .matches(TopicName.valueOf("topic/name1")) - .collect { it.owner().toString() } + .collect { it.user().toString() } .toSet() then: matched.size() == 1 when: - def id2WasUnsubscribed = subscriberTree.unsubscribe(makeOwner("id2"), SharedTopicFilter.valueOf('$share/group1/topic/name1')) - def id3WasUnsubscribed = subscriberTree.unsubscribe(makeOwner("id3"), SharedTopicFilter.valueOf('$share/group1/topic/name1')) + def id2WasUnsubscribed = subscriberTree.unsubscribe(makeUser("id2"), SharedTopicFilter.valueOf('$share/group1/topic/name1')) + def id3WasUnsubscribed = subscriberTree.unsubscribe(makeUser("id3"), SharedTopicFilter.valueOf('$share/group1/topic/name1')) matched = subscriberTree .matches(TopicName.valueOf("topic/name1")) - .collect { it.owner().toString() } + .collect { it.user().toString() } .toSet() then: matched.size() == 1 id2WasUnsubscribed id3WasUnsubscribed when: - def id1WasUnsubscribed = subscriberTree.unsubscribe(makeOwner("id1"), SharedTopicFilter.valueOf('$share/group1/topic/name1')) - id3WasUnsubscribed = subscriberTree.unsubscribe(makeOwner("id3"), SharedTopicFilter.valueOf('$share/group1/topic/name1')) + def id1WasUnsubscribed = subscriberTree.unsubscribe(makeUser("id1"), SharedTopicFilter.valueOf('$share/group1/topic/name1')) + id3WasUnsubscribed = subscriberTree.unsubscribe(makeUser("id3"), SharedTopicFilter.valueOf('$share/group1/topic/name1')) matched = subscriberTree .matches(TopicName.valueOf("topic/name1")) - .collect { it.owner().toString() } + .collect { it.user().toString() } .toSet() then: matched.size() == 0 @@ -543,11 +543,11 @@ class SubscriberTreeTest extends UnitSpecification { def "should replace the same subscriptions"() { given: ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() - def owner1 = makeOwner("id1") + def owner1 = makeUser("id1") def originalSub = makeSubscription('topic/name1') def replacementSub = makeSubscription('topic/name1') - subscriberTree.subscribe(makeOwner("id2"), makeSubscription('topic/name1')) - subscriberTree.subscribe(makeOwner("id3"), makeSubscription('topic/name1')) + subscriberTree.subscribe(makeUser("id2"), makeSubscription('topic/name1')) + subscriberTree.subscribe(makeUser("id3"), makeSubscription('topic/name1')) when: def previous = subscriberTree.subscribe(owner1, originalSub) def matched = subscriberTree @@ -571,8 +571,8 @@ class SubscriberTreeTest extends UnitSpecification { def "should extend shared subscription group on multiply subscribing by the same topic"() { given: ConcurrentSubscriberTree subscriberTree = new ConcurrentSubscriberTree() - def owner1 = makeOwner("id1") - def owner2 = makeOwner("id2") + def owner1 = makeUser("id1") + def owner2 = makeUser("id2") subscriberTree.subscribe(owner1, makeSharedSubscription('$share/group1/topic/name1')) subscriberTree.subscribe(owner2, makeSharedSubscription('$share/group1/topic/name1')) when: @@ -581,21 +581,21 @@ class SubscriberTreeTest extends UnitSpecification { .toSet() then: matched.size() == 1 - matched.first().owner() == owner2 + matched.first().user() == owner2 when: matched = subscriberTree .matches(TopicName.valueOf("topic/name1")) .toSet() then: matched.size() == 1 - matched.first().owner() == owner1 + matched.first().user() == owner1 when: matched = subscriberTree .matches(TopicName.valueOf("topic/name1")) .toSet() then: matched.size() == 1 - matched.first().owner() == owner2 + matched.first().user() == owner2 when: subscriberTree.subscribe(owner1, makeSharedSubscription('$share/group1/topic/name1')) matched = subscriberTree @@ -603,25 +603,25 @@ class SubscriberTreeTest extends UnitSpecification { .toSet() then: matched.size() == 1 - matched.first().owner() == owner2 + matched.first().user() == owner2 when: matched = subscriberTree .matches(TopicName.valueOf("topic/name1")) .toSet() then: matched.size() == 1 - matched.first().owner() == owner1 + matched.first().user() == owner1 when: matched = subscriberTree .matches(TopicName.valueOf("topic/name1")) .toSet() then: matched.size() == 1 - matched.first().owner() == owner1 + matched.first().user() == owner1 } - static def makeOwner(String id) { - return new TestSubscriptionOwner(id) + static def makeUser(String id) { + return new TestMqttUser(id) } static def makeSubscription(String topicFilter) { diff --git a/model/src/testFixtures/groovy/javasabr/mqtt/model/subscription/TestMqttUser.groovy b/model/src/testFixtures/groovy/javasabr/mqtt/model/subscription/TestMqttUser.groovy new file mode 100644 index 00000000..2946ac61 --- /dev/null +++ b/model/src/testFixtures/groovy/javasabr/mqtt/model/subscription/TestMqttUser.groovy @@ -0,0 +1,34 @@ +package javasabr.mqtt.model.subscription + +import com.fasterxml.jackson.annotation.JsonValue +import javasabr.mqtt.model.MqttUser +import javasabr.mqtt.model.session.MqttSession + +record TestMqttUser(String id) implements MqttUser { + + @Override + String clientId() { + return id + } + + @JsonValue + @Override + String toString() { + return id + } + + @Override + String userName() { + return null + } + + @Override + String ipAddress() { + return "localhost" + } + + @Override + MqttSession session() { + return null + } +} diff --git a/model/src/testFixtures/groovy/javasabr/mqtt/model/subscription/TestSubscriptionOwner.groovy b/model/src/testFixtures/groovy/javasabr/mqtt/model/subscription/TestSubscriptionOwner.groovy deleted file mode 100644 index c9dde7ca..00000000 --- a/model/src/testFixtures/groovy/javasabr/mqtt/model/subscription/TestSubscriptionOwner.groovy +++ /dev/null @@ -1,12 +0,0 @@ -package javasabr.mqtt.model.subscription - -import com.fasterxml.jackson.annotation.JsonValue - -record TestSubscriptionOwner(String id) implements SubscriptionOwner { - - @JsonValue - @Override - String toString() { - return id - } -} \ No newline at end of file diff --git a/network/src/main/java/javasabr/mqtt/network/MqttClient.java b/network/src/main/java/javasabr/mqtt/network/MqttClient.java index 52c5dbbc..c6f0720f 100644 --- a/network/src/main/java/javasabr/mqtt/network/MqttClient.java +++ b/network/src/main/java/javasabr/mqtt/network/MqttClient.java @@ -3,14 +3,13 @@ import java.util.concurrent.CompletableFuture; import javasabr.mqtt.model.MqttClientConnectionConfig; import javasabr.mqtt.model.MqttUser; -import javasabr.mqtt.model.subscription.SubscriptionOwner; import javasabr.mqtt.network.message.out.ConnectAckMqtt311OutMessage; import javasabr.mqtt.network.message.out.MqttOutMessage; -import javasabr.mqtt.network.session.MqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; import org.jspecify.annotations.Nullable; import reactor.core.publisher.Mono; -public interface MqttClient extends SubscriptionOwner, MqttUser { +public interface MqttClient extends MqttUser { interface UnsafeMqttClient extends MqttClient { @@ -18,17 +17,16 @@ interface UnsafeMqttClient extends MqttClient { void clientId(String clientId); - void session(@Nullable MqttSession session); + void session(@Nullable MqttNetworkSession session); void reject(ConnectAckMqtt311OutMessage connectAsk); Mono release(); } - String clientId(); - - @Nullable - MqttSession session(); + @Nullable + @Override + MqttNetworkSession session(); MqttClientConnectionConfig connectionConfig(); diff --git a/network/src/main/java/javasabr/mqtt/network/impl/AbstractMqttClient.java b/network/src/main/java/javasabr/mqtt/network/impl/AbstractMqttClient.java index 25caa187..b26df952 100644 --- a/network/src/main/java/javasabr/mqtt/network/impl/AbstractMqttClient.java +++ b/network/src/main/java/javasabr/mqtt/network/impl/AbstractMqttClient.java @@ -9,7 +9,7 @@ import javasabr.mqtt.network.handler.MqttClientReleaseHandler; import javasabr.mqtt.network.message.out.ConnectAckMqtt311OutMessage; import javasabr.mqtt.network.message.out.MqttOutMessage; -import javasabr.mqtt.network.session.MqttSession; +import javasabr.mqtt.network.session.MqttNetworkSession; import lombok.AccessLevel; import lombok.CustomLog; import lombok.Getter; @@ -35,11 +35,13 @@ public abstract class AbstractMqttClient implements UnsafeMqttClient { @Setter volatile String clientId; + @Setter + volatile String userName; @Setter @Getter @Nullable - volatile MqttSession session; + volatile MqttNetworkSession session; public AbstractMqttClient(MqttConnection connection, MqttClientReleaseHandler releaseHandler) { this.connection = connection; @@ -48,6 +50,11 @@ public AbstractMqttClient(MqttConnection connection, MqttClientReleaseHandler re this.clientId = connection.remoteAddress(); } + @Override + public String ipAddress() { + return connection.remoteAddress(); + } + @Override public void send(MqttOutMessage message) { log.debug(clientId, message.name(), message, "[%s] Send to client packet:[%s] %s"::formatted); diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/AuthenticationMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/AuthenticationMqttInMessage.java index fad21d90..8461d686 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/in/AuthenticationMqttInMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/in/AuthenticationMqttInMessage.java @@ -76,15 +76,15 @@ public AuthenticationMqttInMessage(byte messageFlags) { } @Override - public byte messageType() { + public byte messageTypeId() { return MESSAGE_TYPE; } @Override - public String name() { - return MqttMessageType.AUTHENTICATION.name(); + public MqttMessageType messageType() { + return MqttMessageType.AUTHENTICATION; } - + @Override protected boolean validMessageFlags(byte messageFlags) { return messageFlags == MESSAGE_FLAGS; diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/ConnectAckMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/ConnectAckMqttInMessage.java index ac992cee..839b8b45 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/in/ConnectAckMqttInMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/in/ConnectAckMqttInMessage.java @@ -292,13 +292,13 @@ public ConnectAckMqttInMessage(byte messageFlags) { } @Override - public byte messageType() { + public byte messageTypeId() { return MESSAGE_TYPE; } @Override - public String name() { - return MqttMessageType.CONNECT_ACK.name(); + public MqttMessageType messageType() { + return MqttMessageType.CONNECT_ACK; } @Override diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/ConnectMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/ConnectMqttInMessage.java index 1184e0d6..d41e86bb 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/in/ConnectMqttInMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/in/ConnectMqttInMessage.java @@ -226,10 +226,15 @@ public ConnectMqttInMessage(byte messageFlags) { } @Override - public byte messageType() { + public byte messageTypeId() { return MESSAGE_TYPE; } + @Override + public MqttMessageType messageType() { + return MqttMessageType.CONNECT; + } + @Override protected void readVariableHeader(MqttConnection connection, ByteBuffer buffer) { var connectionConfig = connection.serverConnectionConfig(); diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/DisconnectMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/DisconnectMqttInMessage.java index 568bc33b..de7225bd 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/in/DisconnectMqttInMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/in/DisconnectMqttInMessage.java @@ -79,10 +79,15 @@ public DisconnectMqttInMessage(byte messageFlags) { } @Override - public byte messageType() { + public byte messageTypeId() { return MESSAGE_TYPE; } + @Override + public MqttMessageType messageType() { + return MqttMessageType.DISCONNECT; + } + @Override protected void readImpl(MqttConnection connection, ByteBuffer buffer) { this.sessionExpiryInterval = connection diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/MqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/MqttInMessage.java index 85eb3818..e5d72ba0 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/in/MqttInMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/in/MqttInMessage.java @@ -16,7 +16,7 @@ import javasabr.mqtt.model.exception.ConnectionRejectException; import javasabr.mqtt.model.exception.MalformedProtocolMqttException; import javasabr.mqtt.model.exception.MqttException; -import javasabr.mqtt.model.message.MqttMessageType; +import javasabr.mqtt.model.message.MqttMessage; import javasabr.mqtt.model.reason.code.ConnectAckReasonCode; import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.util.MqttDataUtils; @@ -32,7 +32,8 @@ @Accessors(fluent = true, chain = false) @FieldDefaults(level = AccessLevel.PROTECTED) -public abstract class MqttInMessage extends AbstractReadableNetworkPacket { +public abstract class MqttInMessage extends AbstractReadableNetworkPacket + implements MqttMessage { static { DebugUtils.registerIncludedFields("userProperties"); @@ -80,7 +81,7 @@ protected boolean validMessageFlags(byte messageFlags) { return true; } - public abstract byte messageType(); + public abstract byte messageTypeId(); public Array userProperties() { return userProperties == null ? EMPTY_USER_PROPERTIES : userProperties; @@ -273,7 +274,7 @@ protected void alreadyPresentedProperty(MqttMessageProperty property) { @Override public String name() { - return MqttMessageType.fromByte(messageType()).name(); + return messageType().name(); } @Override diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/PingRequestMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/PingRequestMqttInMessage.java index c79b72b6..8fa8a6de 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/in/PingRequestMqttInMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/in/PingRequestMqttInMessage.java @@ -14,7 +14,12 @@ public PingRequestMqttInMessage(byte messageFlags) { } @Override - public byte messageType() { + public byte messageTypeId() { return MESSAGE_TYPE; } + + @Override + public MqttMessageType messageType() { + return MqttMessageType.PING_REQUEST; + } } diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/PingResponseMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/PingResponseMqttInMessage.java index 436909fc..ed2f6b93 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/in/PingResponseMqttInMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/in/PingResponseMqttInMessage.java @@ -14,7 +14,12 @@ public PingResponseMqttInMessage(byte messageFlags) { } @Override - public byte messageType() { + public byte messageTypeId() { return MESSAGE_TYPE; } + + @Override + public MqttMessageType messageType() { + return MqttMessageType.PING_RESPONSE; + } } diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/PublishAckMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/PublishAckMqttInMessage.java index 3df0ba6d..902ac5c7 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/in/PublishAckMqttInMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/in/PublishAckMqttInMessage.java @@ -3,15 +3,15 @@ import java.util.EnumSet; import java.util.Set; import javasabr.mqtt.model.MqttMessageProperty; -import javasabr.mqtt.model.TrackableMessage; import javasabr.mqtt.model.message.MqttMessageType; +import javasabr.mqtt.model.message.TrackableMqttMessage; import javasabr.mqtt.model.reason.code.PublishAckReasonCode; /** * Publish acknowledgment (QoS 1). */ public class PublishAckMqttInMessage extends PublishControlMqttInMessage - implements TrackableMessage { + implements TrackableMqttMessage { private static final int MESSAGE_TYPE = MqttMessageType.PUBLISH_ACK.ordinal(); @@ -41,15 +41,15 @@ public PublishAckMqttInMessage(byte messageFlags) { } @Override - public byte messageType() { + public byte messageTypeId() { return (byte) MESSAGE_TYPE; } @Override - public String name() { - return MqttMessageType.PUBLISH_ACK.name(); + public MqttMessageType messageType() { + return MqttMessageType.PUBLISH_ACK; } - + @Override protected PublishAckReasonCode defaultReasonCode() { return PublishAckReasonCode.SUCCESS; diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/PublishCompleteMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/PublishCompleteMqttInMessage.java index 80e4e787..3827f2ff 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/in/PublishCompleteMqttInMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/in/PublishCompleteMqttInMessage.java @@ -3,15 +3,15 @@ import java.util.EnumSet; import java.util.Set; import javasabr.mqtt.model.MqttMessageProperty; -import javasabr.mqtt.model.TrackableMessage; import javasabr.mqtt.model.message.MqttMessageType; +import javasabr.mqtt.model.message.TrackableMqttMessage; import javasabr.mqtt.model.reason.code.PublishCompletedReasonCode; /** * Publish complete (QoS 2 delivery part 3). */ public class PublishCompleteMqttInMessage extends PublishControlMqttInMessage - implements TrackableMessage { + implements TrackableMqttMessage { private static final byte MESSAGE_TYPE = (byte) MqttMessageType.PUBLISH_COMPLETE.ordinal(); @@ -41,13 +41,13 @@ public PublishCompleteMqttInMessage(byte messageFlags) { } @Override - public byte messageType() { + public byte messageTypeId() { return MESSAGE_TYPE; } @Override - public String name() { - return MqttMessageType.PUBLISH_COMPLETE.name(); + public MqttMessageType messageType() { + return MqttMessageType.PUBLISH_COMPLETE; } @Override diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/PublishControlMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/PublishControlMqttInMessage.java index c042c368..87a359b7 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/in/PublishControlMqttInMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/in/PublishControlMqttInMessage.java @@ -4,7 +4,7 @@ import javasabr.mqtt.base.util.DebugUtils; import javasabr.mqtt.model.MqttMessageProperty; import javasabr.mqtt.model.MqttVersion; -import javasabr.mqtt.model.TrackableMessage; +import javasabr.mqtt.model.message.TrackableMqttMessage; import javasabr.mqtt.model.reason.code.ReasonCode; import javasabr.mqtt.network.MqttConnection; import lombok.AccessLevel; @@ -17,7 +17,7 @@ @Accessors @FieldDefaults(level = AccessLevel.PROTECTED) public abstract class PublishControlMqttInMessage extends TrackableMqttInMessage - implements TrackableMessage { + implements TrackableMqttMessage { public static final byte MESSAGE_FLAGS = 0b0000_0000; diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/PublishMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/PublishMqttInMessage.java index c049b9dc..4b5c5005 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/in/PublishMqttInMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/in/PublishMqttInMessage.java @@ -290,15 +290,15 @@ public PublishMqttInMessage(byte messageFlags) { } @Override - public byte messageType() { + public byte messageTypeId() { return MESSAGE_TYPE; } @Override - public String name() { - return MqttMessageType.PUBLISH.name(); + public MqttMessageType messageType() { + return MqttMessageType.PUBLISH; } - + @Override protected void readVariableHeader(MqttConnection connection, ByteBuffer buffer) { MqttClientConnectionConfig connectionConfig = connection.clientConnectionConfig(); diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/PublishReceivedMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/PublishReceivedMqttInMessage.java index b4062216..d25cfd98 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/in/PublishReceivedMqttInMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/in/PublishReceivedMqttInMessage.java @@ -3,15 +3,15 @@ import java.util.EnumSet; import java.util.Set; import javasabr.mqtt.model.MqttMessageProperty; -import javasabr.mqtt.model.TrackableMessage; import javasabr.mqtt.model.message.MqttMessageType; +import javasabr.mqtt.model.message.TrackableMqttMessage; import javasabr.mqtt.model.reason.code.PublishReceivedReasonCode; /** * Publish received (QoS 2 delivery part 1). */ public class PublishReceivedMqttInMessage extends PublishControlMqttInMessage - implements TrackableMessage { + implements TrackableMqttMessage { private static final byte MESSAGE_TYPE = (byte) MqttMessageType.PUBLISH_RECEIVED.ordinal(); @@ -41,15 +41,15 @@ public PublishReceivedMqttInMessage(byte messageFlags) { } @Override - public byte messageType() { + public byte messageTypeId() { return MESSAGE_TYPE; } @Override - public String name() { - return MqttMessageType.PUBLISH_RECEIVED.name(); + public MqttMessageType messageType() { + return MqttMessageType.PUBLISH_RECEIVED; } - + @Override protected PublishReceivedReasonCode defaultReasonCode() { return PublishReceivedReasonCode.SUCCESS; diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/PublishReleaseMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/PublishReleaseMqttInMessage.java index 791df543..39ce7d0c 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/in/PublishReleaseMqttInMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/in/PublishReleaseMqttInMessage.java @@ -3,15 +3,15 @@ import java.util.EnumSet; import java.util.Set; import javasabr.mqtt.model.MqttMessageProperty; -import javasabr.mqtt.model.TrackableMessage; import javasabr.mqtt.model.message.MqttMessageType; +import javasabr.mqtt.model.message.TrackableMqttMessage; import javasabr.mqtt.model.reason.code.PublishReleaseReasonCode; /** * Publish release (QoS 2 delivery part 2). */ public class PublishReleaseMqttInMessage extends PublishControlMqttInMessage - implements TrackableMessage { + implements TrackableMqttMessage { public static final byte MESSAGE_FLAGS = 0b0000_0010; private static final byte MESSAGE_TYPE = (byte) MqttMessageType.PUBLISH_RELEASE.ordinal(); @@ -42,15 +42,15 @@ public PublishReleaseMqttInMessage(byte messageFlags) { } @Override - public byte messageType() { + public byte messageTypeId() { return MESSAGE_TYPE; } @Override - public String name() { - return MqttMessageType.PUBLISH_RELEASE.name(); + public MqttMessageType messageType() { + return MqttMessageType.PUBLISH_RELEASE; } - + @Override protected boolean validMessageFlags(byte messageFlags) { return messageFlags == MESSAGE_FLAGS; diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/SubscribeAckMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/SubscribeAckMqttInMessage.java index ad49aea9..d7f2dfd8 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/in/SubscribeAckMqttInMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/in/SubscribeAckMqttInMessage.java @@ -66,15 +66,15 @@ public SubscribeAckMqttInMessage(byte messageFlags) { } @Override - public byte messageType() { + public byte messageTypeId() { return MESSAGE_TYPE; } @Override - public String name() { - return MqttMessageType.SUBSCRIBE_ACK.name(); + public MqttMessageType messageType() { + return MqttMessageType.SUBSCRIBE_ACK; } - + @Override protected boolean validMessageFlags(byte messageFlags) { return messageFlags == MESSAGE_FLAGS; diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/SubscribeMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/SubscribeMqttInMessage.java index c5d43b68..0941300d 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/in/SubscribeMqttInMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/in/SubscribeMqttInMessage.java @@ -70,13 +70,13 @@ public SubscribeMqttInMessage(byte info) { } @Override - public byte messageType() { + public byte messageTypeId() { return MESSAGE_TYPE; } @Override - public String name() { - return MqttMessageType.SUBSCRIBE.name(); + public MqttMessageType messageType() { + return MqttMessageType.SUBSCRIBE; } @Override diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/UnsubscribeAckMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/UnsubscribeAckMqttInMessage.java index fa738721..632d262b 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/in/UnsubscribeAckMqttInMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/in/UnsubscribeAckMqttInMessage.java @@ -68,13 +68,13 @@ public UnsubscribeAckMqttInMessage(byte messageFlags) { } @Override - public byte messageType() { + public byte messageTypeId() { return MESSAGE_TYPE; } @Override - public String name() { - return MqttMessageType.UNSUBSCRIBE_ACK.name(); + public MqttMessageType messageType() { + return MqttMessageType.UNSUBSCRIBE_ACK; } @Override diff --git a/network/src/main/java/javasabr/mqtt/network/message/in/UnsubscribeMqttInMessage.java b/network/src/main/java/javasabr/mqtt/network/message/in/UnsubscribeMqttInMessage.java index 5e3d6956..8d48ef9e 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/in/UnsubscribeMqttInMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/in/UnsubscribeMqttInMessage.java @@ -49,10 +49,15 @@ public UnsubscribeMqttInMessage(byte messageFlags) { } @Override - public byte messageType() { + public byte messageTypeId() { return MESSAGE_TYPE; } + @Override + public MqttMessageType messageType() { + return MqttMessageType.UNSUBSCRIBE; + } + @Override protected boolean validMessageFlags(byte messageFlags) { return messageFlags == 0b0000_0010; diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/AuthenticationMqtt5OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/AuthenticationMqtt5OutMessage.java index 64eb9618..228d205f 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/AuthenticationMqtt5OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/AuthenticationMqtt5OutMessage.java @@ -66,10 +66,15 @@ public class AuthenticationMqtt5OutMessage extends MqttOutMessage { Array userProperties; @Override - protected byte messageType() { + protected byte messageTypeId() { return MESSAGE_TYPE; } + @Override + public MqttMessageType messageType() { + return MqttMessageType.AUTHENTICATION; + } + @Override protected void writeVariableHeader(MqttConnection connection, ByteBuffer buffer) { // https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901219 diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/ConnectAckMqtt311OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/ConnectAckMqtt311OutMessage.java index 92a18b8d..8a7c46a9 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/ConnectAckMqtt311OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/ConnectAckMqtt311OutMessage.java @@ -39,10 +39,15 @@ public class ConnectAckMqtt311OutMessage extends MqttOutMessage { boolean sessionPresent; @Override - protected byte messageType() { + protected byte messageTypeId() { return MESSAGE_TYPE; } + @Override + public MqttMessageType messageType() { + return MqttMessageType.CONNECT_ACK; + } + @Override public int expectedLength(MqttConnection connection) { return PACKET_ID_SIZE; diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/ConnectMqtt311OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/ConnectMqtt311OutMessage.java index 1e6f61ef..b1386763 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/ConnectMqtt311OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/ConnectMqtt311OutMessage.java @@ -52,10 +52,15 @@ protected MqttVersion mqttVersion() { } @Override - protected byte messageType() { + protected byte messageTypeId() { return MESSAGE_TYPE; } + @Override + public MqttMessageType messageType() { + return MqttMessageType.CONNECT; + } + @Override protected void writeVariableHeader(MqttConnection connection, ByteBuffer buffer) { MqttVersion mqttVersion = mqttVersion(); diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/DisconnectMqtt311OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/DisconnectMqtt311OutMessage.java index 8bbe7aa3..3664d6c7 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/DisconnectMqtt311OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/DisconnectMqtt311OutMessage.java @@ -16,7 +16,12 @@ public int expectedLength(MqttConnection connection) { } @Override - protected byte messageType() { + protected byte messageTypeId() { return MESSAGE_TYPE; } + + @Override + public MqttMessageType messageType() { + return MqttMessageType.DISCONNECT; + } } diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/MqttOutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/MqttOutMessage.java index cbb0fcb8..5b08d3eb 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/MqttOutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/MqttOutMessage.java @@ -5,6 +5,7 @@ import javasabr.mqtt.base.util.DebugUtils; import javasabr.mqtt.model.MqttMessageProperty; import javasabr.mqtt.model.data.type.StringPair; +import javasabr.mqtt.model.message.MqttMessage; import javasabr.mqtt.network.MqttConnection; import javasabr.mqtt.network.util.MqttDataUtils; import javasabr.rlib.collections.array.Array; @@ -14,12 +15,11 @@ import org.jspecify.annotations.Nullable; @RequiredArgsConstructor -public abstract class MqttOutMessage extends AbstractWritableNetworkPacket { - - public static final Array EMPTY_USER_PROPERTIES = Array.empty(StringPair.class); - - private static final ThreadLocal LOCAL_BUFFER = ThreadLocal.withInitial(() -> ByteBuffer.allocate( - 1024 * 1024)); +public abstract class MqttOutMessage extends AbstractWritableNetworkPacket + implements MqttMessage { + + private static final ThreadLocal LOCAL_BUFFER = ThreadLocal + .withInitial(() -> ByteBuffer.allocate(1024 * 1024)); protected static final int PACKET_ID_SIZE = 2; @@ -45,12 +45,12 @@ protected boolean isPropertiesSupported(MqttConnection connection) { protected void writeProperties(MqttConnection connection, ByteBuffer buffer) {} public final int messageTypeAndFlags() { - byte type = messageType(); + byte type = messageTypeId(); byte controlFlags = messageFlags(); return NumberUtils.setHighByteBits(controlFlags, type); } - protected byte messageType() { + protected byte messageTypeId() { throw new UnsupportedOperationException(); } diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/PingRequestMqtt311OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/PingRequestMqtt311OutMessage.java index 1c6c0a17..c513ea2d 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/PingRequestMqtt311OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/PingRequestMqtt311OutMessage.java @@ -10,7 +10,12 @@ public class PingRequestMqtt311OutMessage extends MqttOutMessage { private static final byte MESSAGE_TYPE = (byte) MqttMessageType.PING_REQUEST.ordinal(); @Override - protected byte messageType() { + protected byte messageTypeId() { return MESSAGE_TYPE; } + + @Override + public MqttMessageType messageType() { + return MqttMessageType.PING_REQUEST; + } } diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/PingResponseMqtt311OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/PingResponseMqtt311OutMessage.java index 99c338fc..1aed8ddd 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/PingResponseMqtt311OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/PingResponseMqtt311OutMessage.java @@ -10,7 +10,12 @@ public class PingResponseMqtt311OutMessage extends MqttOutMessage { private static final byte MESSAGE_TYPE = (byte) MqttMessageType.PING_RESPONSE.ordinal(); @Override - protected byte messageType() { + protected byte messageTypeId() { return MESSAGE_TYPE; } + + @Override + public MqttMessageType messageType() { + return MqttMessageType.PING_RESPONSE; + } } diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/PublishAckMqtt311OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/PublishAckMqtt311OutMessage.java index b2cb0b3d..3a932eae 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/PublishAckMqtt311OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/PublishAckMqtt311OutMessage.java @@ -20,7 +20,12 @@ public int expectedLength(MqttConnection connection) { } @Override - protected byte messageType() { + protected byte messageTypeId() { return MESSAGE_TYPE; } + + @Override + public MqttMessageType messageType() { + return MqttMessageType.PUBLISH_ACK; + } } diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/PublishCompleteMqtt311OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/PublishCompleteMqtt311OutMessage.java index cb9690ae..b4ce3fc2 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/PublishCompleteMqtt311OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/PublishCompleteMqtt311OutMessage.java @@ -23,7 +23,12 @@ public int expectedLength(MqttConnection connection) { } @Override - protected byte messageType() { + protected byte messageTypeId() { return MESSAGE_TYPE; } + + @Override + public MqttMessageType messageType() { + return MqttMessageType.PUBLISH_COMPLETE; + } } diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/PublishMqtt311OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/PublishMqtt311OutMessage.java index e9f074fe..a09159a6 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/PublishMqtt311OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/PublishMqtt311OutMessage.java @@ -45,10 +45,15 @@ public PublishMqtt311OutMessage( } @Override - protected byte messageType() { + protected byte messageTypeId() { return MESSAGE_TYPE; } + @Override + public MqttMessageType messageType() { + return MqttMessageType.PUBLISH; + } + @Override public int expectedLength(MqttConnection connection) { return 7 + payload.length; diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/PublishReceivedMqtt311OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/PublishReceivedMqtt311OutMessage.java index a1cd0332..951677a3 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/PublishReceivedMqtt311OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/PublishReceivedMqtt311OutMessage.java @@ -23,7 +23,12 @@ public int expectedLength(MqttConnection connection) { } @Override - protected byte messageType() { + protected byte messageTypeId() { return MESSAGE_TYPE; } + + @Override + public MqttMessageType messageType() { + return MqttMessageType.PUBLISH_RECEIVED; + } } diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/PublishReleaseMqtt311OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/PublishReleaseMqtt311OutMessage.java index 2bf88f76..0b607726 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/PublishReleaseMqtt311OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/PublishReleaseMqtt311OutMessage.java @@ -19,10 +19,15 @@ public class PublishReleaseMqtt311OutMessage extends MqttOutMessage { int messageId; @Override - protected byte messageType() { + protected byte messageTypeId() { return MESSAGE_TYPE; } + @Override + public MqttMessageType messageType() { + return MqttMessageType.PUBLISH_RELEASE; + } + @Override protected byte messageFlags() { return 0b0000_0010; @@ -38,4 +43,5 @@ protected void writeVariableHeader(MqttConnection connection, ByteBuffer buffer) // http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718055 writeShort(buffer, messageId); } + } diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/SubscribeAckMqtt311OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/SubscribeAckMqtt311OutMessage.java index 2982074d..ed099a73 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/SubscribeAckMqtt311OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/SubscribeAckMqtt311OutMessage.java @@ -15,7 +15,7 @@ * Subscribe acknowledgement. */ @Getter -@Accessors(fluent = true) +@Accessors @FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) public class SubscribeAckMqtt311OutMessage extends TrackableMqttOutMessage { @@ -41,10 +41,15 @@ public int expectedLength(MqttConnection connection) { } @Override - protected byte messageType() { + protected byte messageTypeId() { return MESSAGE_TYPE; } + @Override + public MqttMessageType messageType() { + return MqttMessageType.SUBSCRIBE_ACK; + } + @Override protected void writePayload(MqttConnection connection, ByteBuffer buffer) { // http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718071 diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/SubscribeAckMqtt5OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/SubscribeAckMqtt5OutMessage.java index fb02a18e..ff1e4f25 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/SubscribeAckMqtt5OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/SubscribeAckMqtt5OutMessage.java @@ -18,7 +18,7 @@ * Subscribe acknowledgement. */ @Getter -@Accessors(fluent = true) +@Accessors @FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) public class SubscribeAckMqtt5OutMessage extends SubscribeAckMqtt311OutMessage { diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/SubscribeMqtt311OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/SubscribeMqtt311OutMessage.java index cd041153..79461fcc 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/SubscribeMqtt311OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/SubscribeMqtt311OutMessage.java @@ -25,10 +25,15 @@ public SubscribeMqtt311OutMessage(int messageId, Array subscriptio } @Override - protected byte messageType() { + protected byte messageTypeId() { return MESSAGE_TYPE; } + @Override + public MqttMessageType messageType() { + return MqttMessageType.SUBSCRIBE; + } + @Override protected byte messageFlags() { return 0b0000_0010; diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/UnsubscribeAckMqtt311OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/UnsubscribeAckMqtt311OutMessage.java index 5d953fbc..5611de81 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/UnsubscribeAckMqtt311OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/UnsubscribeAckMqtt311OutMessage.java @@ -11,7 +11,7 @@ * Unsubscribe acknowledgement. */ @Getter -@Accessors(fluent = true) +@Accessors @FieldDefaults(level = AccessLevel.PROTECTED, makeFinal = true) public class UnsubscribeAckMqtt311OutMessage extends TrackableMqttOutMessage { @@ -27,7 +27,12 @@ public int expectedLength(MqttConnection connection) { } @Override - protected byte messageType() { + protected byte messageTypeId() { return MESSAGE_TYPE; } + + @Override + public MqttMessageType messageType() { + return MqttMessageType.UNSUBSCRIBE_ACK; + } } diff --git a/network/src/main/java/javasabr/mqtt/network/message/out/UnsubscribeAckMqtt5OutMessage.java b/network/src/main/java/javasabr/mqtt/network/message/out/UnsubscribeAckMqtt5OutMessage.java index f9223f82..1962373d 100644 --- a/network/src/main/java/javasabr/mqtt/network/message/out/UnsubscribeAckMqtt5OutMessage.java +++ b/network/src/main/java/javasabr/mqtt/network/message/out/UnsubscribeAckMqtt5OutMessage.java @@ -17,7 +17,7 @@ * Unsubscribe acknowledgement. */ @Getter -@Accessors(fluent = true) +@Accessors @FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true) public class UnsubscribeAckMqtt5OutMessage extends UnsubscribeAckMqtt311OutMessage { diff --git a/network/src/main/java/javasabr/mqtt/network/session/MqttNetworkSession.java b/network/src/main/java/javasabr/mqtt/network/session/MqttNetworkSession.java new file mode 100644 index 00000000..fe04c2ef --- /dev/null +++ b/network/src/main/java/javasabr/mqtt/network/session/MqttNetworkSession.java @@ -0,0 +1,40 @@ +package javasabr.mqtt.network.session; + +import javasabr.mqtt.model.message.TrackableMqttMessage; +import javasabr.mqtt.model.publishing.Publish; +import javasabr.mqtt.model.session.MqttSession; +import javasabr.mqtt.network.MqttClient; + +public interface MqttNetworkSession extends MqttSession { + + interface UnsafeMqttNetworkSession extends MqttNetworkSession { + + void expirationTime(long expirationTime); + + void clear(); + + void onPersisted(); + + void onRestored(); + } + + interface PendingMessageHandler { + + /** + * @return true if pending packet can be removed. + */ + boolean handleResponse(MqttClient client, TrackableMqttMessage response); + + default void resend(MqttClient client, Publish publish) {} + } + + void resendPendingPackets(MqttClient client); + + boolean hasOutPending(); + + boolean hasOutPending(int messageId); + + void registerOutPublish(Publish publish, PendingMessageHandler handler); + + void updateOutPendingPacket(MqttClient client, TrackableMqttMessage response); +} diff --git a/network/src/main/java/javasabr/mqtt/network/session/MqttSession.java b/network/src/main/java/javasabr/mqtt/network/session/MqttSession.java deleted file mode 100644 index 2ed3cbab..00000000 --- a/network/src/main/java/javasabr/mqtt/network/session/MqttSession.java +++ /dev/null @@ -1,63 +0,0 @@ -package javasabr.mqtt.network.session; - -import javasabr.mqtt.model.TrackableMessage; -import javasabr.mqtt.model.publishing.Publish; -import javasabr.mqtt.model.session.ActiveSubscriptions; -import javasabr.mqtt.model.session.MessageTacker; -import javasabr.mqtt.model.session.ProcessingPublishes; -import javasabr.mqtt.model.session.TopicNameMapping; -import javasabr.mqtt.network.MqttClient; - -public interface MqttSession { - - interface UnsafeMqttSession extends MqttSession { - - void expirationTime(long expirationTime); - - void clear(); - - void onPersisted(); - - void onRestored(); - } - - interface PendingMessageHandler { - - /** - * @return true if pending packet can be removed. - */ - boolean handleResponse(MqttClient client, TrackableMessage response); - - default void resend(MqttClient client, Publish publish) {} - } - - String clientId(); - - int nextMessageId(); - - /** - * @return the expiration time in ms or -1 if it should not be expired now. - */ - long expirationTime(); - - void resendPendingPackets(MqttClient client); - - MessageTacker inMessageTracker(); - MessageTacker outMessageTracker(); - - ProcessingPublishes inProcessingPublishes(); - ProcessingPublishes outProcessingPublishes(); - - ActiveSubscriptions activeSubscriptions(); - - TopicNameMapping topicNameMapping(); - - boolean hasOutPending(); - - - boolean hasOutPending(int messageId); - - void registerOutPublish(Publish publish, PendingMessageHandler handler); - - void updateOutPendingPacket(MqttClient client, TrackableMessage response); -} diff --git a/network/src/testFixtures/groovy/javasabr/mqtt/network/SpecificationNetworkExtensions.groovy b/network/src/testFixtures/groovy/javasabr/mqtt/network/SpecificationNetworkExtensions.groovy index 86f7d7bc..b4d08aba 100644 --- a/network/src/testFixtures/groovy/javasabr/mqtt/network/SpecificationNetworkExtensions.groovy +++ b/network/src/testFixtures/groovy/javasabr/mqtt/network/SpecificationNetworkExtensions.groovy @@ -3,6 +3,7 @@ package javasabr.mqtt.network import javasabr.mqtt.model.MqttMessageProperty import javasabr.mqtt.model.data.type.MqttDataType import javasabr.mqtt.model.data.type.StringPair +import javasabr.mqtt.model.message.MqttMessageType import javasabr.mqtt.model.reason.code.ReasonCode import javasabr.mqtt.network.message.out.MqttOutMessage import javasabr.mqtt.network.util.MqttDataUtils @@ -18,6 +19,11 @@ class SpecificationNetworkExtensions extends Specification { @Override protected void writeImpl(MqttConnection connection, ByteBuffer buffer) {} + + @Override + MqttMessageType messageType() { + return MqttMessageType.PUBLISH + } } static ByteBuffer putByte(ByteBuffer self, int value) {