Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ allprojects {

ext {
annotationVersion = "17.0.0"
rlibVersion = "9.5.0"
rlibVersion = "9.6.0"
lombokVersion = '1.18.4'
springbootVersion = '2.2.0.RELEASE'
springVersion = '5.1.6.RELEASE'
Expand Down
61 changes: 51 additions & 10 deletions src/main/java/com/ss/mqtt/broker/config/MqttBrokerConfig.java
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
package com.ss.mqtt.broker.config;

import com.ss.mqtt.broker.handler.client.DefaultMqttClientReleaseHandler;
import com.ss.mqtt.broker.handler.client.MqttClientReleaseHandler;
import com.ss.mqtt.broker.handler.packet.in.*;
import com.ss.mqtt.broker.handler.publish.in.PublishInHandler;
import com.ss.mqtt.broker.handler.publish.in.Qos0PublishInHandler;
import com.ss.mqtt.broker.handler.publish.in.Qos1PublishInHandler;
import com.ss.mqtt.broker.handler.publish.in.Qos2PublishInHandler;
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
import com.ss.mqtt.broker.handler.publish.out.Qos0PublishOutHandler;
import com.ss.mqtt.broker.handler.publish.out.Qos1PublishOutHandler;
import com.ss.mqtt.broker.handler.publish.out.Qos2PublishOutHandler;
import com.ss.mqtt.broker.model.MqttPropertyConstants;
import com.ss.mqtt.broker.model.QoS;
import com.ss.mqtt.broker.network.MqttConnection;
import com.ss.mqtt.broker.handler.client.MqttClientReleaseHandler;
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
import com.ss.mqtt.broker.network.client.DeviceMqttClient;
import com.ss.mqtt.broker.handler.client.DeviceMqttClientReleaseHandler;
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
import com.ss.mqtt.broker.network.packet.PacketType;
import com.ss.mqtt.broker.service.*;
import com.ss.mqtt.broker.service.impl.*;
Expand Down Expand Up @@ -86,29 +94,33 @@ private interface ChannelFactory extends
@NotNull ClientIdRegistry clientIdRegistry,
@NotNull SubscriptionService subscriptionService,
@NotNull PublishingService publishingService,
@NotNull MqttSessionService mqttSessionService
@NotNull MqttSessionService mqttSessionService,
@NotNull PublishRetryService publishRetryService
) {

var handlers = new PacketInHandler[PacketType.INVALID.ordinal()];
handlers[PacketType.CONNECT.ordinal()] = new ConnectInPacketHandler(
clientIdRegistry,
authenticationService,
mqttSessionService
mqttSessionService,
publishRetryService
);
handlers[PacketType.SUBSCRIBE.ordinal()] = new SubscribeInPacketHandler(subscriptionService);
handlers[PacketType.UNSUBSCRIBE.ordinal()] = new UnsubscribeInPacketHandler(subscriptionService);
handlers[PacketType.PUBLISH.ordinal()] = new PublishInPacketHandler(publishingService);
handlers[PacketType.DISCONNECT.ordinal()] = new DisconnetInPacketHandler();
handlers[PacketType.PUBLISH_ACK.ordinal()] = new PublishAckInPacketHandler();

return handlers;
}

@Bean
@NotNull MqttClientReleaseHandler deviceMqttClientReleaseHandler(
@NotNull MqttClientReleaseHandler defaultMqttClientReleaseHandler(
@NotNull ClientIdRegistry clientIdRegistry,
@NotNull MqttSessionService mqttSessionService
@NotNull MqttSessionService mqttSessionService,
@NotNull PublishRetryService publishRetryService
) {
return new DeviceMqttClientReleaseHandler(clientIdRegistry, mqttSessionService);
return new DefaultMqttClientReleaseHandler(clientIdRegistry, mqttSessionService, publishRetryService);
}

@Bean
Expand All @@ -130,6 +142,14 @@ private interface ChannelFactory extends
);
}

@Bean
@NotNull PublishRetryService publishRetryService() {
return new DefaultPublishRetryService(
env.getProperty("publish.pending.check.interval", int.class, 60 * 1000),
env.getProperty("publish.retry.interval", int.class, 60 * 1000)
);
}

@Bean
@NotNull InetSocketAddress deviceNetworkAddress(
@NotNull ServerNetwork<@NotNull MqttConnection> deviceNetwork,
Expand All @@ -150,8 +170,29 @@ private interface ChannelFactory extends
}

@Bean
@NotNull PublishingService publishingService(@NotNull SubscriptionService subscriptionService) {
return new SimplePublishingService(subscriptionService);
@NotNull PublishOutHandler[] publishOutHandlers() {
return new PublishOutHandler[] {
new Qos0PublishOutHandler(),
new Qos1PublishOutHandler(),
new Qos2PublishOutHandler(),
};
}

@Bean
@NotNull PublishInHandler[] publishInHandlers(
@NotNull SubscriptionService subscriptionService,
@NotNull PublishOutHandler[] publishOutHandlers
) {
return new PublishInHandler[] {
new Qos0PublishInHandler(subscriptionService, publishOutHandlers),
new Qos1PublishInHandler(subscriptionService, publishOutHandlers),
new Qos2PublishInHandler(subscriptionService, publishOutHandlers),
};
}

@Bean
@NotNull PublishingService publishingService(@NotNull PublishInHandler[] publishInHandlers) {
return new DefaultPublishingService(publishInHandlers);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.ss.mqtt.broker.config;

import com.ss.mqtt.broker.model.QoS;
import lombok.Data;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.RequiredArgsConstructor;
import org.jetbrains.annotations.NotNull;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class Mqtt311PacketOutFactory extends MqttPacketOutFactory {
}

@Override
public @NotNull MqttWritablePacket newPublish(
public @NotNull PublishOutPacket newPublish(
@NotNull MqttClient client,
int packetId,
@NotNull QoS qos,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class Mqtt5PacketOutFactory extends Mqtt311PacketOutFactory {
}

@Override
public @NotNull MqttWritablePacket newPublish(
public @NotNull PublishOutPacket newPublish(
@NotNull MqttClient client,
int packetId,
@NotNull QoS qos,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.ss.mqtt.broker.model.reason.code.*;
import com.ss.mqtt.broker.network.client.MqttClient;
import com.ss.mqtt.broker.network.packet.out.MqttWritablePacket;
import com.ss.mqtt.broker.network.packet.out.PublishOutPacket;
import com.ss.rlib.common.util.ArrayUtils;
import com.ss.rlib.common.util.StringUtils;
import com.ss.rlib.common.util.array.Array;
Expand Down Expand Up @@ -76,7 +77,7 @@ public abstract class MqttPacketOutFactory {
}


public @NotNull MqttWritablePacket newPublish(
public @NotNull PublishOutPacket newPublish(
@NotNull MqttClient client,
int packetId,
@NotNull QoS qos,
Expand All @@ -101,7 +102,7 @@ public abstract class MqttPacketOutFactory {
);
}

public abstract @NotNull MqttWritablePacket newPublish(
public abstract @NotNull PublishOutPacket newPublish(
@NotNull MqttClient client,
int packetId,
@NotNull QoS qos,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.ss.mqtt.broker.network.client.AbstractMqttClient;
import com.ss.mqtt.broker.service.ClientIdRegistry;
import com.ss.mqtt.broker.service.MqttSessionService;
import com.ss.mqtt.broker.service.PublishRetryService;
import com.ss.rlib.common.util.StringUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
Expand All @@ -17,6 +18,7 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli

private final @NotNull ClientIdRegistry clientIdRegistry;
private final @NotNull MqttSessionService sessionService;
private final @NotNull PublishRetryService publishRetryService;

@Override
public @NotNull Mono<?> release(@NotNull UnsafeMqttClient client) {
Expand All @@ -25,6 +27,7 @@ public abstract class AbstractMqttClientReleaseHandler<T extends AbstractMqttCli
}

protected @NotNull Mono<?> releaseImpl(@NotNull T client) {
publishRetryService.unregister(client);

var clientId = client.getClientId();
client.setClientId(StringUtils.EMPTY);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.ss.mqtt.broker.handler.client;

import com.ss.mqtt.broker.network.client.DeviceMqttClient;
import com.ss.mqtt.broker.service.ClientIdRegistry;
import com.ss.mqtt.broker.service.MqttSessionService;
import com.ss.mqtt.broker.service.PublishRetryService;
import org.jetbrains.annotations.NotNull;

public class DefaultMqttClientReleaseHandler extends AbstractMqttClientReleaseHandler<DeviceMqttClient> {

public DefaultMqttClientReleaseHandler(
@NotNull ClientIdRegistry clientIdRegistry,
@NotNull MqttSessionService sessionService,
@NotNull PublishRetryService publishRetryService
) {
super(clientIdRegistry, sessionService, publishRetryService);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.ss.mqtt.broker.service.AuthenticationService;
import com.ss.mqtt.broker.service.ClientIdRegistry;
import com.ss.mqtt.broker.service.MqttSessionService;
import com.ss.mqtt.broker.service.PublishRetryService;
import com.ss.rlib.common.util.StringUtils;
import lombok.RequiredArgsConstructor;
import org.jetbrains.annotations.NotNull;
Expand All @@ -21,9 +22,10 @@
@RequiredArgsConstructor
public class ConnectInPacketHandler extends AbstractPacketHandler<UnsafeMqttClient, ConnectInPacket> {

private final ClientIdRegistry clientIdRegistry;
private final AuthenticationService authenticationService;
private final MqttSessionService mqttSessionService;
private final @NotNull ClientIdRegistry clientIdRegistry;
private final @NotNull AuthenticationService authenticationService;
private final @NotNull MqttSessionService mqttSessionService;
private final @NotNull PublishRetryService publishRetryService;

@Override
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull ConnectInPacket packet) {
Expand Down Expand Up @@ -135,6 +137,8 @@ private Mono<Boolean> onConnected(
packet.getReceiveMax()
));

publishRetryService.register(client);

return Mono.just(Boolean.TRUE);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.ss.mqtt.broker.handler.packet.in;

import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
import com.ss.mqtt.broker.network.packet.in.PublishAckInPacket;
import lombok.RequiredArgsConstructor;
import org.jetbrains.annotations.NotNull;

@RequiredArgsConstructor
public class PublishAckInPacketHandler extends AbstractPacketHandler<UnsafeMqttClient, PublishAckInPacket> {

@Override
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull PublishAckInPacket packet) {
client.getSession().updatePendingPacket(client, packet);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,6 @@ public class PublishInPacketHandler extends AbstractPacketHandler<UnsafeMqttClie

@Override
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull PublishInPacket packet) {

var ackReasonCode = publishingService.publish(packet);

client.send(client.getPacketOutFactory().newPublishAck(
client,
packet.getPacketId(),
ackReasonCode
));
publishingService.publish(client, packet);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.ss.mqtt.broker.handler.publish.in;

import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
import com.ss.mqtt.broker.model.QoS;
import com.ss.mqtt.broker.service.SubscriptionService;
import lombok.RequiredArgsConstructor;
import org.jetbrains.annotations.NotNull;

@RequiredArgsConstructor
abstract class AbstractPublishInHandler implements PublishInHandler {

protected final @NotNull SubscriptionService subscriptionService;
protected final @NotNull PublishOutHandler[] publishOutHandlers;

protected @NotNull PublishOutHandler publishOutHandler(@NotNull QoS qos) {
return publishOutHandlers[qos.ordinal()];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.ss.mqtt.broker.handler.publish.in;

import com.ss.mqtt.broker.network.client.MqttClient;
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
import org.jetbrains.annotations.NotNull;

/**
* Interface to handle incoming publish packets.
*/
public interface PublishInHandler {

void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.ss.mqtt.broker.handler.publish.in;

import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
import com.ss.mqtt.broker.network.client.MqttClient;
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
import com.ss.mqtt.broker.service.SubscriptionService;
import org.jetbrains.annotations.NotNull;

public class Qos0PublishInHandler extends AbstractPublishInHandler {

public Qos0PublishInHandler(
@NotNull SubscriptionService subscriptionService,
@NotNull PublishOutHandler[] publishOutHandlers
) {
super(subscriptionService, publishOutHandlers);
}

@Override
public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet) {

var subscribers = subscriptionService.getSubscribers(packet.getTopicName());

for (var subscriber : subscribers) {
publishOutHandler(subscriber.getQos()).handle(packet, subscriber);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package com.ss.mqtt.broker.handler.publish.in;

import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
import com.ss.mqtt.broker.model.reason.code.PublishAckReasonCode;
import com.ss.mqtt.broker.network.client.MqttClient;
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
import com.ss.mqtt.broker.service.SubscriptionService;
import org.jetbrains.annotations.NotNull;

public class Qos1PublishInHandler extends AbstractPublishInHandler {

public Qos1PublishInHandler(
@NotNull SubscriptionService subscriptionService,
@NotNull PublishOutHandler[] publishOutHandlers
) {
super(subscriptionService, publishOutHandlers);
}

@Override
public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet) {

var subscribers = subscriptionService.getSubscribers(packet.getTopicName());

for (var subscriber : subscribers) {
publishOutHandler(subscriber.getQos()).handle(packet, subscriber);
}

var reasonCode = subscribers.isEmpty() ?
PublishAckReasonCode.NO_MATCHING_SUBSCRIBERS : PublishAckReasonCode.SUCCESS;

var ackPacket = client.getPacketOutFactory().newPublishAck(
client,
packet.getPacketId(),
reasonCode
);

client.send(ackPacket);
}
}
Loading