Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import javasabr.mqtt.service.ClientIdRegistry
import javasabr.mqtt.service.session.MqttSessionService
import org.springframework.beans.factory.annotation.Autowired

class MqttSessionServiceTest extends IntegrationSpecification {
class MqttNetworkSessionServiceTest extends IntegrationSpecification {

@Autowired
ClientIdRegistry clientIdRegistry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import spock.lang.Unroll

import java.util.concurrent.CompletionException

class SubscribtionServiceTest extends IntegrationSpecification {
class SubscriptionServiceTest extends IntegrationSpecification {

@Autowired
ClientIdRegistry clientIdRegistry
Expand All @@ -38,11 +38,11 @@ class SubscribtionServiceTest extends IntegrationSpecification {
.findSubscribers(topicName)
then: "should find the subscriber"
subscribers.size() == 1
subscribers.get(0).owner() instanceof MqttClient
subscribers.get(0).user() instanceof MqttClient
when:
def matchedSubscriber = subscribers.get(0)
def subscription = matchedSubscriber.subscription()
def owner = matchedSubscriber.owner() as MqttClient
def owner = matchedSubscriber.user() as MqttClient
then:
owner.clientId() == clientId
subscription.topicFilter().rawTopic() == topicFilter
Expand All @@ -61,11 +61,11 @@ class SubscribtionServiceTest extends IntegrationSpecification {
.findSubscribers(topicName)
then: "should find the reconnected subscriber"
subscribers3.size() == 1
subscribers3.get(0).owner() instanceof MqttClient
subscribers3.get(0).user() instanceof MqttClient
when:
matchedSubscriber = subscribers3.get(0)
subscription = matchedSubscriber.subscription()
owner = matchedSubscriber.owner() as MqttClient
owner = matchedSubscriber.user() as MqttClient
then:
owner.clientId() == clientId
subscription.topicFilter().rawTopic() == topicFilter
Expand Down Expand Up @@ -147,8 +147,8 @@ class SubscribtionServiceTest extends IntegrationSpecification {
def subscribers = subscriptionService.findSubscribers(TopicName.valueOf(topicName))
then:
subscribers.size() == targetCount
(subscribers[0].owner() as MqttClient).clientId() == clientId1
(subscribers[1].owner() as MqttClient).clientId() == clientId2
(subscribers[0].user() as MqttClient).clientId() == clientId1
(subscribers[1].user() as MqttClient).clientId() == clientId2
cleanup:
subscriber1.disconnect().join()
subscriber2.disconnect().join()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import javasabr.mqtt.model.topic.TopicFilter;
import javasabr.mqtt.model.topic.TopicName;
import javasabr.mqtt.network.MqttClient;
import javasabr.mqtt.network.session.MqttSession;
import javasabr.mqtt.network.session.MqttNetworkSession;
import javasabr.rlib.collections.array.Array;
import javasabr.rlib.collections.array.MutableArray;

Expand All @@ -32,7 +32,7 @@ default Array<SingleSubscriber> findSubscribers(TopicName topicName) {
* @param subscriptions the list of request to subscribe topics
* @return array of subscribe ack reason codes
*/
Array<SubscribeAckReasonCode> subscribe(MqttClient client, MqttSession session, Array<Subscription> subscriptions);
Array<SubscribeAckReasonCode> subscribe(MqttClient client, MqttNetworkSession session, Array<Subscription> subscriptions);

/**
* Removes MQTT client from listening to the topics.
Expand All @@ -41,9 +41,9 @@ default Array<SingleSubscriber> findSubscribers(TopicName topicName) {
* @param topicFilters topic filters
* @return array of unsubscribe ack reason codes
*/
Array<UnsubscribeAckReasonCode> unsubscribe(MqttClient client, MqttSession session, Array<TopicFilter> topicFilters);
Array<UnsubscribeAckReasonCode> unsubscribe(MqttClient client, MqttNetworkSession session, Array<TopicFilter> topicFilters);

void cleanSubscriptions(MqttClient client, MqttSession session);
void cleanSubscriptions(MqttClient client, MqttNetworkSession session);

void restoreSubscriptions(MqttClient client, MqttSession session);
void restoreSubscriptions(MqttClient client, MqttNetworkSession session);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import javasabr.mqtt.network.MqttClient.UnsafeMqttClient;
import javasabr.mqtt.network.handler.MqttClientReleaseHandler;
import javasabr.mqtt.network.impl.AbstractMqttClient;
import javasabr.mqtt.network.session.MqttSession;
import javasabr.mqtt.network.session.MqttNetworkSession;
import javasabr.mqtt.service.ClientIdRegistry;
import javasabr.mqtt.service.SubscriptionService;
import javasabr.mqtt.service.session.MqttSessionService;
Expand Down Expand Up @@ -43,7 +43,7 @@ protected Mono<?> releaseImpl(T client) {
return Mono.empty();
}

MqttSession session = client.session();
MqttNetworkSession session = client.session();
Mono<?> asyncActions = null;

if (session != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ protected void processReceivedValidMessage(
"[%s] Received from client valid message:[%s] %s"::formatted);

try {
MqttInMessageHandler messageHandler = inMessageHandlers[mqttInMessage.messageType()];
MqttInMessageHandler messageHandler = inMessageHandlers[mqttInMessage.messageTypeId()];
//noinspection DataFlowIssue
messageHandler.processValidMessage(connection, mqttInMessage);
} catch (IndexOutOfBoundsException | NullPointerException ex) {
Expand All @@ -97,7 +97,7 @@ protected void processReceivedInvalidMessage(
"[%s] Received from client invalid message:[%s] %s"::formatted);

try {
MqttInMessageHandler messageHandler = inMessageHandlers[mqttInMessage.messageType()];
MqttInMessageHandler messageHandler = inMessageHandlers[mqttInMessage.messageTypeId()];
//noinspection DataFlowIssue
messageHandler.processInvalidMessage(connection, mqttInMessage);
} catch (IndexOutOfBoundsException | NullPointerException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import javasabr.mqtt.model.topic.TopicFilter;
import javasabr.mqtt.model.topic.TopicName;
import javasabr.mqtt.network.MqttClient;
import javasabr.mqtt.network.session.MqttSession;
import javasabr.mqtt.network.session.MqttNetworkSession;
import javasabr.mqtt.service.SubscriptionService;
import javasabr.rlib.collections.array.Array;
import javasabr.rlib.collections.array.ArrayFactory;
Expand All @@ -40,7 +40,7 @@ public InMemorySubscriptionService() {
@Override
public MqttClient resolveClient(Subscriber subscriber) {
if (subscriber instanceof SingleSubscriber single) {
return (MqttClient) single.owner();
return (MqttClient) single.user();
}
throw new IllegalArgumentException("Unexpected subscriber: " + subscriber);
}
Expand All @@ -55,7 +55,7 @@ public Array<SingleSubscriber> findSubscribersTo(MutableArray<SingleSubscriber>
@Override
public Array<SubscribeAckReasonCode> subscribe(
MqttClient client,
MqttSession session,
MqttNetworkSession session,
Array<Subscription> subscriptions) {

MutableArray<SubscribeAckReasonCode> subscribeResults = ArrayFactory.mutableArray(
Expand All @@ -69,7 +69,7 @@ public Array<SubscribeAckReasonCode> subscribe(
return subscribeResults;
}

private SubscribeAckReasonCode addSubscription(MqttClient client, MqttSession session, Subscription subscription) {
private SubscribeAckReasonCode addSubscription(MqttClient client, MqttNetworkSession session, Subscription subscription) {
MqttClientConnectionConfig connectionConfig = client.connectionConfig();
TopicFilter topicFilter = subscription.topicFilter();
if (topicFilter.isInvalid()) {
Expand All @@ -91,7 +91,7 @@ private SubscribeAckReasonCode addSubscription(MqttClient client, MqttSession se
@Override
public Array<UnsubscribeAckReasonCode> unsubscribe(
MqttClient client,
MqttSession session,
MqttNetworkSession session,
Array<TopicFilter> topicFilters) {

MutableArray<UnsubscribeAckReasonCode> unsubscribeResults = ArrayFactory.mutableArray(
Expand All @@ -105,7 +105,7 @@ public Array<UnsubscribeAckReasonCode> unsubscribe(
return unsubscribeResults;
}

private UnsubscribeAckReasonCode removeSubscription(MqttClient client, MqttSession session, TopicFilter topicFilter) {
private UnsubscribeAckReasonCode removeSubscription(MqttClient client, MqttNetworkSession session, TopicFilter topicFilter) {
if (topicFilter.isInvalid()) {
return UnsubscribeAckReasonCode.TOPIC_FILTER_INVALID;
} else if (subscriberTree.unsubscribe(client, topicFilter)) {
Expand All @@ -119,7 +119,7 @@ private UnsubscribeAckReasonCode removeSubscription(MqttClient client, MqttSessi
}

@Override
public void cleanSubscriptions(MqttClient client, MqttSession session) {
public void cleanSubscriptions(MqttClient client, MqttNetworkSession session) {
Array<Subscription> subscriptions = session
.activeSubscriptions()
.subscriptions();
Expand All @@ -129,7 +129,7 @@ public void cleanSubscriptions(MqttClient client, MqttSession session) {
}

@Override
public void restoreSubscriptions(MqttClient client, MqttSession session) {
public void restoreSubscriptions(MqttClient client, MqttNetworkSession session) {
Array<Subscription> subscriptions = session
.activeSubscriptions()
.subscriptions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import javasabr.mqtt.network.MqttConnection;
import javasabr.mqtt.network.message.in.MqttInMessage;
import javasabr.mqtt.network.message.out.MqttOutMessage;
import javasabr.mqtt.network.session.MqttSession;
import javasabr.mqtt.network.session.MqttNetworkSession;
import javasabr.mqtt.network.util.ExtraErrorReasons;
import javasabr.mqtt.service.MessageOutFactoryService;
import javasabr.mqtt.service.message.handler.MqttInMessageHandler;
Expand Down Expand Up @@ -47,7 +47,7 @@ public void processValidMessage(MqttConnection connection, MqttInMessage mqttInM
C castedClient = expectedClient.cast(client);
M castedMessage = expectedNetworkPacket.cast(mqttInMessage);
if (requireSession()) {
MqttSession session = client.session();
MqttNetworkSession session = client.session();
if (session == null) {
log.warning(client.clientId(), "[%s] Session is already closed"::formatted);
handleSessionIsAlreadyClosed(client);
Expand All @@ -72,7 +72,7 @@ public void processInvalidMessage(MqttConnection connection, MqttInMessage mqttI
C castedClient = expectedClient.cast(client);
M castedMessage = expectedNetworkPacket.cast(mqttInMessage);
if (requireSession()) {
MqttSession session = client.session();
MqttNetworkSession session = client.session();
if (session == null) {
log.warning(client.clientId(), "[%s] Session is already closed"::formatted);
handleSessionIsAlreadyClosed(client);
Expand All @@ -86,7 +86,7 @@ public void processInvalidMessage(MqttConnection connection, MqttInMessage mqttI

protected void processValidMessage(MqttConnection connection, C client, M message) {}

protected void processValidMessage(MqttConnection connection, C client, MqttSession session, M message) {}
protected void processValidMessage(MqttConnection connection, C client, MqttNetworkSession session, M message) {}

protected boolean processInvalidMessage(MqttConnection connection, C client, M message) {
Exception exception = message.exception();
Expand All @@ -97,7 +97,7 @@ protected boolean processInvalidMessage(MqttConnection connection, C client, M m
return false;
}

protected boolean processInvalidMessage(MqttConnection connection, C client, MqttSession session, M message) {
protected boolean processInvalidMessage(MqttConnection connection, C client, MqttNetworkSession session, M message) {
Exception exception = message.exception();
if (exception instanceof MalformedProtocolMqttException) {
malformedProtocolError(connection, client, exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import javasabr.mqtt.network.impl.ExternalMqttClient;
import javasabr.mqtt.network.message.in.ConnectMqttInMessage;
import javasabr.mqtt.network.message.out.MqttOutMessage;
import javasabr.mqtt.network.session.MqttSession;
import javasabr.mqtt.network.session.MqttNetworkSession;
import javasabr.mqtt.service.AuthenticationService;
import javasabr.mqtt.service.ClientIdRegistry;
import javasabr.mqtt.service.MessageOutFactoryService;
Expand Down Expand Up @@ -178,7 +178,7 @@ private void resolveClientConnectionConfig(MqttClient.UnsafeMqttClient client, C
private Mono<Boolean> onConnected(
MqttClient.UnsafeMqttClient client,
ConnectMqttInMessage packet,
MqttSession session,
MqttNetworkSession session,
boolean sessionRestored) {

MqttConnection connection = client.connection();
Expand Down Expand Up @@ -211,7 +211,7 @@ private Mono<Boolean> onConnected(
.thenApply(result -> onSentConnAck(client, session, result)));
}

private boolean onSentConnAck(MqttClient.UnsafeMqttClient client, MqttSession session, boolean result) {
private boolean onSentConnAck(MqttClient.UnsafeMqttClient client, MqttNetworkSession session, boolean result) {

if (!result) {
log.warning(client.clientId(), "Was issue with sending conn ack packet to client:[%s]"::formatted);
Expand All @@ -226,7 +226,7 @@ private boolean onSentConnAck(MqttClient.UnsafeMqttClient client, MqttSession se
protected boolean processInvalidMessage(
MqttConnection connection,
ExternalMqttClient client,
MqttSession session,
MqttNetworkSession session,
ConnectMqttInMessage message) {
Exception exception = message.exception();
if (exception instanceof ConnectionRejectException cre) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import javasabr.mqtt.network.MqttConnection;
import javasabr.mqtt.network.impl.ExternalMqttClient;
import javasabr.mqtt.network.message.in.DisconnectMqttInMessage;
import javasabr.mqtt.network.session.MqttSession;
import javasabr.mqtt.network.session.MqttNetworkSession;
import javasabr.mqtt.service.MessageOutFactoryService;
import lombok.CustomLog;

Expand All @@ -25,7 +25,7 @@ public MqttMessageType messageType() {
protected void processValidMessage(
MqttConnection connection,
ExternalMqttClient client,
MqttSession session,
MqttNetworkSession session,
DisconnectMqttInMessage message) {
DisconnectReasonCode reasonCode = message.reasonCode();
if (reasonCode == DisconnectReasonCode.NORMAL_DISCONNECTION) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package javasabr.mqtt.service.message.handler.impl;

import javasabr.mqtt.model.TrackableMessage;
import javasabr.mqtt.model.message.TrackableMqttMessage;
import javasabr.mqtt.network.MqttConnection;
import javasabr.mqtt.network.impl.ExternalMqttClient;
import javasabr.mqtt.network.message.in.MqttInMessage;
import javasabr.mqtt.network.session.MqttSession;
import javasabr.mqtt.network.session.MqttNetworkSession;
import javasabr.mqtt.service.MessageOutFactoryService;

public abstract class PendingOutResponseMqttInMessageHandler<P extends MqttInMessage & TrackableMessage>
public abstract class PendingOutResponseMqttInMessageHandler<P extends MqttInMessage & TrackableMqttMessage>
extends AbstractMqttInMessageHandler<ExternalMqttClient, P> {

protected PendingOutResponseMqttInMessageHandler(
Expand All @@ -20,7 +20,7 @@ protected PendingOutResponseMqttInMessageHandler(
protected void processValidMessage(
MqttConnection connection,
ExternalMqttClient client,
MqttSession session,
MqttNetworkSession session,
P message) {
session.updateOutPendingPacket(client, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import javasabr.mqtt.network.impl.ExternalMqttClient;
import javasabr.mqtt.network.message.in.PublishMqttInMessage;
import javasabr.mqtt.network.message.out.MqttOutMessage;
import javasabr.mqtt.network.session.MqttSession;
import javasabr.mqtt.network.session.MqttNetworkSession;
import javasabr.mqtt.service.MessageOutFactoryService;
import javasabr.mqtt.service.PublishReceivingService;
import javasabr.mqtt.service.TopicService;
Expand Down Expand Up @@ -50,7 +50,7 @@ public MqttMessageType messageType() {
protected void processValidMessage(
MqttConnection connection,
ExternalMqttClient client,
MqttSession session,
MqttNetworkSession session,
PublishMqttInMessage publishMessage) {

if (!validateBaseFields(connection, client, publishMessage)) {
Expand Down Expand Up @@ -224,7 +224,7 @@ private void handleInvalidMessageExpiryInterval(ExternalMqttClient client) {

private void handleInvalidTopicName(
ExternalMqttClient client,
MqttSession session,
MqttNetworkSession session,
PublishMqttInMessage publishMessage) {
int messagedId = publishMessage.messageId();
MqttOutMessage response = messageOutFactoryService
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import javasabr.mqtt.network.MqttConnection;
import javasabr.mqtt.network.impl.ExternalMqttClient;
import javasabr.mqtt.network.message.in.PublishReleaseMqttInMessage;
import javasabr.mqtt.network.session.MqttSession;
import javasabr.mqtt.network.session.MqttNetworkSession;
import javasabr.mqtt.service.MessageOutFactoryService;
import lombok.AccessLevel;
import lombok.CustomLog;
Expand All @@ -32,7 +32,7 @@ public MqttMessageType messageType() {
protected void processValidMessage(
MqttConnection connection,
ExternalMqttClient client,
MqttSession session,
MqttNetworkSession session,
PublishReleaseMqttInMessage releaseMessage) {

int messageId = releaseMessage.messageId();
Expand Down
Loading
Loading