Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
import javasabr.mqtt.model.MqttProperties;
import javasabr.mqtt.model.MqttServerConnectionConfig;
import javasabr.mqtt.model.QoS;
import javasabr.mqtt.network.MqttClientFactory;
import javasabr.mqtt.network.MqttConnection;
import javasabr.mqtt.network.MqttConnectionFactory;
import javasabr.mqtt.network.handler.MqttClientReleaseHandler;
import javasabr.mqtt.network.impl.ExternalMqttClient;
import javasabr.mqtt.network.handler.NetworkMqttUserReleaseHandler;
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
import javasabr.mqtt.network.user.NetworkMqttUserFactory;
import javasabr.mqtt.service.AuthenticationService;
import javasabr.mqtt.service.ClientIdRegistry;
import javasabr.mqtt.service.ConnectionService;
Expand All @@ -19,14 +19,14 @@
import javasabr.mqtt.service.PublishReceivingService;
import javasabr.mqtt.service.SubscriptionService;
import javasabr.mqtt.service.TopicService;
import javasabr.mqtt.service.handler.client.ExternalMqttClientReleaseHandler;
import javasabr.mqtt.service.handler.client.ExternalNetworkMqttUserReleaseHandler;
import javasabr.mqtt.service.impl.DefaultConnectionService;
import javasabr.mqtt.service.impl.DefaultMessageOutFactoryService;
import javasabr.mqtt.service.impl.DefaultMqttConnectionFactory;
import javasabr.mqtt.service.impl.DefaultPublishDeliveringService;
import javasabr.mqtt.service.impl.DefaultPublishReceivingService;
import javasabr.mqtt.service.impl.DefaultTopicService;
import javasabr.mqtt.service.impl.ExternalMqttClientFactory;
import javasabr.mqtt.service.impl.ExternalNetworkMqttUserFactory;
import javasabr.mqtt.service.impl.FileCredentialsSource;
import javasabr.mqtt.service.impl.InMemoryClientIdRegistry;
import javasabr.mqtt.service.impl.InMemorySubscriptionService;
Expand Down Expand Up @@ -195,7 +195,7 @@ MqttInMessageHandler unsubscribeMqttInMessageHandler(

@Bean
ConnectionService externalMqttConnectionService(Collection<? extends MqttInMessageHandler> inMessageHandlers) {
return new DefaultConnectionService(ExternalMqttClient.class, inMessageHandlers);
return new DefaultConnectionService(ExternalNetworkMqttUser.class, inMessageHandlers);
}

@Bean
Expand Down Expand Up @@ -265,11 +265,11 @@ PublishReceivingService publishReceivingService(
}

@Bean
MqttClientReleaseHandler externalMqttClientReleaseHandler(
NetworkMqttUserReleaseHandler externalMqttClientReleaseHandler(
ClientIdRegistry clientIdRegistry,
MqttSessionService sessionService,
SubscriptionService subscriptionService) {
return new ExternalMqttClientReleaseHandler(clientIdRegistry, sessionService, subscriptionService);
return new ExternalNetworkMqttUserReleaseHandler(clientIdRegistry, sessionService, subscriptionService);
}

@Bean
Expand Down Expand Up @@ -352,16 +352,16 @@ ServerNetworkConfig externalNetworkConfig(
}

@Bean
MqttClientFactory externalClientFactory(MqttClientReleaseHandler externalMqttClientReleaseHandler) {
return new ExternalMqttClientFactory(externalMqttClientReleaseHandler);
NetworkMqttUserFactory externalClientFactory(NetworkMqttUserReleaseHandler externalNetworkMqttUserReleaseHandler) {
return new ExternalNetworkMqttUserFactory(externalNetworkMqttUserReleaseHandler);
}

@Bean
MqttConnectionFactory externalConnectionFactory(
MqttServerConnectionConfig externalServerConnectionConfig,
MqttClientFactory externalClientFactory,
NetworkMqttUserFactory mqttUserFactory,
@Value("${mqtt.external.connection.max.packets.by.read:100}") int maxPacketsByRead) {
return new DefaultMqttConnectionFactory(externalServerConnectionConfig, externalClientFactory, maxPacketsByRead);
return new DefaultMqttConnectionFactory(externalServerConnectionConfig, mqttUserFactory, maxPacketsByRead);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import javasabr.mqtt.model.MqttServerConnectionConfig
import javasabr.mqtt.model.MqttVersion
import javasabr.mqtt.network.MqttConnection
import javasabr.mqtt.network.MqttMockClient
import javasabr.mqtt.network.user.ConfigurableNetworkMqttUser
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig
import spock.lang.Specification
Expand All @@ -18,8 +19,6 @@ import java.nio.charset.StandardCharsets
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference

import static javasabr.mqtt.network.MqttClient.UnsafeMqttClient

@SpringJUnitConfig(classes = MqttBrokerTestConfig)
class IntegrationSpecification extends Specification {

Expand Down Expand Up @@ -153,7 +152,7 @@ class IntegrationSpecification extends Specification {
isSupported(MqttVersion.MQTT_3_1_1) >> true
serverConnectionConfig() >> serverConnConfig
clientConnectionConfig() >> clientConnConfig
client() >> Stub(UnsafeMqttClient) {
user() >> Stub(ConfigurableNetworkMqttUser) {
connectionConfig() >> clientConnConfig
connection() >> connectionRef.get()
clientId() >> clientId
Expand Down Expand Up @@ -181,7 +180,7 @@ class IntegrationSpecification extends Specification {
isSupported(MqttVersion.MQTT_3_1_1) >> true
serverConnectionConfig() >> serverConnConfig
clientConnectionConfig() >> clientConnConfig
client() >> Stub(UnsafeMqttClient) {
user() >> Stub(ConfigurableNetworkMqttUser) {
connectionConfig() >> clientConnConfig
connection() >> connectionRef.get()
clientId() >> clientId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.hivemq.client.mqtt.datatypes.MqttQos
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5SubAckException
import javasabr.mqtt.broker.application.IntegrationSpecification
import javasabr.mqtt.model.topic.TopicName
import javasabr.mqtt.network.MqttClient
import javasabr.mqtt.network.user.NetworkMqttUser
import javasabr.mqtt.service.ClientIdRegistry
import javasabr.mqtt.service.impl.InMemorySubscriptionService
import org.springframework.beans.factory.annotation.Autowired
Expand Down Expand Up @@ -38,11 +38,11 @@ class SubscriptionServiceTest extends IntegrationSpecification {
.findSubscribers(topicName)
then: "should find the subscriber"
subscribers.size() == 1
subscribers.get(0).user() instanceof MqttClient
subscribers.get(0).user() instanceof NetworkMqttUser
when:
def matchedSubscriber = subscribers.get(0)
def subscription = matchedSubscriber.subscription()
def owner = matchedSubscriber.user() as MqttClient
def owner = matchedSubscriber.user() as NetworkMqttUser
then:
owner.clientId() == clientId
subscription.topicFilter().rawTopic() == topicFilter
Expand All @@ -61,11 +61,11 @@ class SubscriptionServiceTest extends IntegrationSpecification {
.findSubscribers(topicName)
then: "should find the reconnected subscriber"
subscribers3.size() == 1
subscribers3.get(0).user() instanceof MqttClient
subscribers3.get(0).user() instanceof NetworkMqttUser
when:
matchedSubscriber = subscribers3.get(0)
subscription = matchedSubscriber.subscription()
owner = matchedSubscriber.user() as MqttClient
owner = matchedSubscriber.user() as NetworkMqttUser
then:
owner.clientId() == clientId
subscription.topicFilter().rawTopic() == topicFilter
Expand Down Expand Up @@ -147,8 +147,8 @@ class SubscriptionServiceTest extends IntegrationSpecification {
def subscribers = subscriptionService.findSubscribers(TopicName.valueOf(topicName))
then:
subscribers.size() == targetCount
(subscribers[0].user() as MqttClient).clientId() == clientId1
(subscribers[1].user() as MqttClient).clientId() == clientId2
(subscribers[0].user() as NetworkMqttUser).clientId() == clientId1
(subscribers[1].user() as NetworkMqttUser).clientId() == clientId2
cleanup:
subscriber1.disconnect().join()
subscriber2.disconnect().join()
Expand Down
2 changes: 1 addition & 1 deletion application/src/test/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
</Console>
</Appenders>
<Loggers>
<Logger name="javasabr.mqtt.network.impl.AbstractMqttClient" level="DEBUG" additivity="false">
<Logger name="javasabr.mqtt.network.impl.AbstractNetworkMqttUser" level="DEBUG" additivity="false">
<AppenderRef ref="BrokerConsoleTest"/>
</Logger>
<Logger name="javasabr.mqtt.service.impl.DefaultConnectionService" level="DEBUG" additivity="false">
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package javasabr.mqtt.service;

import javasabr.mqtt.network.MqttClient;
import javasabr.mqtt.network.MqttConnection;
import javasabr.mqtt.network.user.NetworkMqttUser;
import javasabr.mqtt.service.message.out.factory.MqttMessageOutFactory;

public interface MessageOutFactoryService {

MqttMessageOutFactory resolveFactory(MqttClient client);
MqttMessageOutFactory resolveFactory(NetworkMqttUser user);

MqttMessageOutFactory resolveFactory(MqttConnection connection);
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package javasabr.mqtt.service;

import javasabr.mqtt.model.publishing.Publish;
import javasabr.mqtt.network.MqttClient;
import javasabr.mqtt.network.user.NetworkMqttUser;

public interface PublishReceivingService {

void processPublish(MqttClient client, Publish publish);
void processPublish(NetworkMqttUser user, Publish publish);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
import javasabr.mqtt.model.subscription.Subscription;
import javasabr.mqtt.model.topic.TopicFilter;
import javasabr.mqtt.model.topic.TopicName;
import javasabr.mqtt.network.MqttClient;
import javasabr.mqtt.network.session.MqttNetworkSession;
import javasabr.mqtt.network.MqttNetworkSession;
import javasabr.mqtt.network.user.NetworkMqttUser;
import javasabr.rlib.collections.array.Array;
import javasabr.rlib.collections.array.MutableArray;

Expand All @@ -17,7 +17,7 @@
*/
public interface SubscriptionService {

MqttClient resolveClient(Subscriber subscriber);
NetworkMqttUser resolveClient(Subscriber subscriber);

default Array<SingleSubscriber> findSubscribers(TopicName topicName) {
return findSubscribersTo(MutableArray.ofType(SingleSubscriber.class), topicName);
Expand All @@ -28,22 +28,22 @@ default Array<SingleSubscriber> findSubscribers(TopicName topicName) {
/**
* Subscribes MQTT client to listen to topics.
*
* @param client MQTT client which requests subscriptions
* @param user MQTT client which requests subscriptions
* @param subscriptions the list of request to subscribe topics
* @return array of subscribe ack reason codes
*/
Array<SubscribeAckReasonCode> subscribe(MqttClient client, MqttNetworkSession session, Array<Subscription> subscriptions);
Array<SubscribeAckReasonCode> subscribe(NetworkMqttUser user, MqttNetworkSession session, Array<Subscription> subscriptions);

/**
* Removes MQTT client from listening to the topics.
*
* @param client MQTT client to be removed
* @param user MQTT client to be removed
* @param topicFilters topic filters
* @return array of unsubscribe ack reason codes
*/
Array<UnsubscribeAckReasonCode> unsubscribe(MqttClient client, MqttNetworkSession session, Array<TopicFilter> topicFilters);
Array<UnsubscribeAckReasonCode> unsubscribe(NetworkMqttUser user, MqttNetworkSession session, Array<TopicFilter> topicFilters);

void cleanSubscriptions(MqttClient client, MqttNetworkSession session);
void cleanSubscriptions(NetworkMqttUser user, MqttNetworkSession session);

void restoreSubscriptions(MqttClient client, MqttNetworkSession session);
void restoreSubscriptions(NetworkMqttUser user, MqttNetworkSession session);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@

import javasabr.mqtt.model.topic.TopicFilter;
import javasabr.mqtt.model.topic.TopicName;
import javasabr.mqtt.network.MqttClient;
import javasabr.mqtt.network.user.NetworkMqttUser;

public interface TopicService {

TopicFilter createTopicFilter(MqttClient client, String rawTopicFilter);
TopicFilter createTopicFilter(NetworkMqttUser user, String rawTopicFilter);

boolean isValidTopicFilter(MqttClient client, String rawTopicFilter);
boolean isValidTopicFilter(NetworkMqttUser user, String rawTopicFilter);

TopicName createTopicName(MqttClient client, String rawTopicName);
TopicName createTopicName(NetworkMqttUser user, String rawTopicName);
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package javasabr.mqtt.service.handler.client;

import javasabr.mqtt.model.MqttClientConnectionConfig;
import javasabr.mqtt.network.MqttClient.UnsafeMqttClient;
import javasabr.mqtt.network.handler.MqttClientReleaseHandler;
import javasabr.mqtt.network.impl.AbstractMqttClient;
import javasabr.mqtt.network.session.MqttNetworkSession;
import javasabr.mqtt.network.MqttNetworkSession;
import javasabr.mqtt.network.handler.NetworkMqttUserReleaseHandler;
import javasabr.mqtt.network.impl.AbstractNetworkMqttUser;
import javasabr.mqtt.network.user.ConfigurableNetworkMqttUser;
import javasabr.mqtt.service.ClientIdRegistry;
import javasabr.mqtt.service.SubscriptionService;
import javasabr.mqtt.service.session.MqttSessionService;
Expand All @@ -18,40 +18,40 @@
@CustomLog
@RequiredArgsConstructor
@FieldDefaults(level = AccessLevel.PRIVATE, makeFinal = true)
public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttClient> implements
MqttClientReleaseHandler {
public abstract class AbstractNetworkMqttUserReleaseHandler<T extends AbstractNetworkMqttUser> implements
NetworkMqttUserReleaseHandler {

ClientIdRegistry clientIdRegistry;
MqttSessionService sessionService;
SubscriptionService subscriptionService;

@Override
public Mono<?> release(UnsafeMqttClient client) {
var clientId = client.clientId();
public Mono<?> release(ConfigurableNetworkMqttUser user) {
var clientId = user.clientId();
//noinspection unchecked
return releaseImpl((T) client)
return releaseImpl((T) user)
.doOnNext(_ -> log.info(clientId, "[%s] Client was released"::formatted));
}

protected Mono<?> releaseImpl(T client) {
protected Mono<?> releaseImpl(T user) {

String clientId = client.clientId();
client.clientId(StringUtils.EMPTY);
String clientId = user.clientId();
user.clientId(StringUtils.EMPTY);

if (StringUtils.isEmpty(clientId)) {
log.warning(client.clientId(), "[%s] This client is already released or rejected"::formatted);
log.warning(user.clientId(), "[%s] This client is already released or rejected"::formatted);
return Mono.empty();
}

MqttNetworkSession session = client.session();
MqttNetworkSession session = user.session();
Mono<?> asyncActions = null;

if (session != null) {
subscriptionService.cleanSubscriptions(client, session);
MqttClientConnectionConfig connectionConfig = client.connectionConfig();
subscriptionService.cleanSubscriptions(user, session);
MqttClientConnectionConfig connectionConfig = user.connectionConfig();
if (connectionConfig.sessionsEnabled()) {
asyncActions = sessionService.store(clientId, session, connectionConfig.sessionExpiryInterval());
client.session(null);
user.session(null);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package javasabr.mqtt.service.handler.client;

import javasabr.mqtt.network.impl.ExternalMqttClient;
import javasabr.mqtt.network.impl.ExternalNetworkMqttUser;
import javasabr.mqtt.service.ClientIdRegistry;
import javasabr.mqtt.service.SubscriptionService;
import javasabr.mqtt.service.session.MqttSessionService;

public class ExternalMqttClientReleaseHandler extends AbstractMqttClientReleaseHandler<ExternalMqttClient> {
public class ExternalNetworkMqttUserReleaseHandler extends
AbstractNetworkMqttUserReleaseHandler<ExternalNetworkMqttUser> {

public ExternalMqttClientReleaseHandler(
public ExternalNetworkMqttUserReleaseHandler(
ClientIdRegistry clientIdRegistry,
MqttSessionService sessionService,
SubscriptionService subscriptionService) {
Expand Down
Loading
Loading