Skip to content

Commit

Permalink
[MQTT 5] Avoid to pub retries on timeout (moquette-io#697)
Browse files Browse the repository at this point in the history
This PR stores the MQTT version into the Session instance and use that to keep the existing behavior for inflight resends (happening on a timeout basis on ACK received) in case the version is MQTT 3.1 or MQTT 3.1.1.
When the version of the Session is MQTT 5 it removes the resend on PUB ACK timeouts and switch to send only in case the same client reconnects with cleanStart = 0 and there is any peding publishes in the flight zone to get acknowledged.

To test this use the raw Client has been extended, so now can also subscribe and collect publish messages.
  • Loading branch information
andsel committed Feb 3, 2023
1 parent 140696c commit 319f9b7
Show file tree
Hide file tree
Showing 9 changed files with 341 additions and 53 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/maven_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ name: Java CI with Maven

on:
push:
branches: [ main ]
branches: [ main, mqtt5_development ]
pull_request:
branches: [ main ]
branches: [ main, mqtt5_development ]

jobs:
test:
Expand Down
8 changes: 8 additions & 0 deletions broker/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
https://github.com/netty/netty/blob/netty-4.1.85.Final/pom.xml#L594 -->
<netty.tcnative.version>2.0.54.Final</netty.tcnative.version>
<paho.version>1.2.5</paho.version>
<hivemqclient.version>1.3.0</hivemqclient.version>
<h2.version>2.1.212</h2.version>
</properties>

Expand Down Expand Up @@ -146,6 +147,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.hivemq</groupId>
<artifactId>hivemq-mqtt-client</artifactId>
<version>${hivemqclient.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.eclipse.jetty.websocket</groupId>
<artifactId>websocket-client</artifactId>
Expand Down
12 changes: 9 additions & 3 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,10 @@ PostOffice.RouteResult processConnect(MqttConnectMessage msg) {
final String username = payload.userName();
LOG.trace("Processing CONNECT message. CId: {} username: {}", clientId, username);

if (isNotProtocolVersion(msg, MqttVersion.MQTT_3_1) && isNotProtocolVersion(msg, MqttVersion.MQTT_3_1_1)) {
if (isNotProtocolVersion(msg, MqttVersion.MQTT_3_1) &&
isNotProtocolVersion(msg, MqttVersion.MQTT_3_1_1) &&
isNotProtocolVersion(msg, MqttVersion.MQTT_5)
) {
LOG.warn("MQTT protocol version is not valid. CId: {}", clientId);
abortConnection(CONNECTION_REFUSED_UNACCEPTABLE_PROTOCOL_VERSION);
return PostOffice.RouteResult.failed(clientId);
Expand Down Expand Up @@ -236,11 +239,14 @@ public void operationComplete(ChannelFuture future) throws Exception {
// OK continue with sending queued messages and normal flow

if (result.mode == SessionRegistry.CreationModeEnum.REOPEN_EXISTING) {
result.session.sendQueuedMessagesWhileOffline();
result.session.reconnectSession();
}

initializeKeepAliveTimeout(channel, msg, clientIdUsed);
setupInflightResender(channel);
if (isNotProtocolVersion(msg, MqttVersion.MQTT_5)) {
// In MQTT5 MQTT-4.4.0-1 avoid retries messages on timer base.
setupInflightResender(channel);
}

postOffice.dispatchConnection(msg);
LOG.trace("dispatch connection: {}", msg);
Expand Down
87 changes: 60 additions & 27 deletions broker/src/main/java/io/moquette/broker/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,20 @@
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

class Session {

Expand Down Expand Up @@ -95,22 +103,25 @@ static final class Will {
private MQTTConnection mqttConnection;
private final Set<Subscription> subscriptions = new HashSet<>();
private final Map<Integer, SessionRegistry.EnqueuedMessage> inflightWindow = new HashMap<>();
// used only in MQTT3 where resends are done on timeout of ACKs.
private final DelayQueue<InFlightPacket> inflightTimeouts = new DelayQueue<>();
private final Map<Integer, MqttPublishMessage> qos2Receiving = new HashMap<>();
private final AtomicInteger inflightSlots = new AtomicInteger(INFLIGHT_WINDOW_SIZE); // this should be configurable
private final boolean resendInflightOnTimeout;

Session(String clientId, boolean clean, Will will, SessionMessageQueue<SessionRegistry.EnqueuedMessage> sessionQueue) {
this(clientId, clean, sessionQueue);
Session(String clientId, boolean clean, MqttVersion protocolVersion, Will will, SessionMessageQueue<SessionRegistry.EnqueuedMessage> sessionQueue) {
this(clientId, clean, protocolVersion, sessionQueue);
this.will = will;
}

Session(String clientId, boolean clean, SessionMessageQueue<SessionRegistry.EnqueuedMessage> sessionQueue) {
Session(String clientId, boolean clean, MqttVersion protocolVersion, SessionMessageQueue<SessionRegistry.EnqueuedMessage> sessionQueue) {
if (sessionQueue == null) {
throw new IllegalArgumentException("sessionQueue parameter can't be null");
}
this.clientId = clientId;
this.clean = clean;
this.sessionQueue = sessionQueue;
this.resendInflightOnTimeout = protocolVersion != MqttVersion.MQTT_5;
}

void update(boolean clean, Will will) {
Expand Down Expand Up @@ -211,7 +222,9 @@ public void processPubRec(int pubRecPacketId) {
return;
}
inflightWindow.put(pubRecPacketId, new SessionRegistry.PubRelMarker());
inflightTimeouts.add(new InFlightPacket(pubRecPacketId, FLIGHT_BEFORE_RESEND_MS));
if (resendInflightOnTimeout) {
inflightTimeouts.add(new InFlightPacket(pubRecPacketId, FLIGHT_BEFORE_RESEND_MS));
}
MqttMessage pubRel = MQTTConnection.pubrel(pubRecPacketId);
mqttConnection.sendIfWritableElseDrop(pubRel);

Expand Down Expand Up @@ -280,7 +293,9 @@ private void sendPublishQos1(Topic topic, MqttQoS qos, ByteBuf payload, boolean
old.release();
inflightSlots.incrementAndGet();
}
inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));
if (resendInflightOnTimeout) {
inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));
}

MqttPublishMessage publishMsg = MQTTConnection.createNotRetainedPublishMessage(topic.toString(), qos,
payload, packetId);
Expand Down Expand Up @@ -314,8 +329,9 @@ private void sendPublishQos2(Topic topic, MqttQoS qos, ByteBuf payload, boolean
old.release();
inflightSlots.incrementAndGet();
}
inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));

if (resendInflightOnTimeout) {
inflightTimeouts.add(new InFlightPacket(packetId, FLIGHT_BEFORE_RESEND_MS));
}
MqttPublishMessage publishMsg = MQTTConnection.createNotRetainedPublishMessage(topic.toString(), qos,
payload, packetId);
localMqttConnectionRef.sendPublish(publishMsg);
Expand All @@ -337,7 +353,7 @@ private boolean canSkipQueue(MQTTConnection localMqttConnectionRef) {
localMqttConnectionRef.channel.isWritable();
}

private boolean inflighHasSlotsAndConnectionIsUp() {
private boolean inflightHasSlotsAndConnectionIsUp() {
return inflightSlots.get() > 0 &&
connected() &&
mqttConnection.channel.isWritable();
Expand All @@ -361,20 +377,30 @@ public void flushAllQueuedMessages() {
}

public void resendInflightNotAcked() {
Collection<InFlightPacket> expired = new ArrayList<>(INFLIGHT_WINDOW_SIZE);
inflightTimeouts.drainTo(expired);
Collection<Integer> nonAckPacketIds;
if (resendInflightOnTimeout) {
// MQTT3 behavior, resend on timeout
Collection<InFlightPacket> expired = new ArrayList<>(INFLIGHT_WINDOW_SIZE);
inflightTimeouts.drainTo(expired);
nonAckPacketIds = expired.stream().map(p -> p.packetId).collect(Collectors.toList());
} else {
// MQTT5 behavior resend only not acked present in reopened session.
nonAckPacketIds = inflightWindow.keySet();
}

debugLogPacketIds(expired);
debugLogPacketIds(nonAckPacketIds);

for (InFlightPacket notAckPacketId : expired) {
final SessionRegistry.EnqueuedMessage msg = inflightWindow.get(notAckPacketId.packetId);
for (Integer notAckPacketId : nonAckPacketIds) {
final SessionRegistry.EnqueuedMessage msg = inflightWindow.get(notAckPacketId);
if (msg == null) {
// Already acked...
continue;
}
if (msg instanceof SessionRegistry.PubRelMarker) {
MqttMessage pubRel = MQTTConnection.pubrel(notAckPacketId.packetId);
inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS));
MqttMessage pubRel = MQTTConnection.pubrel(notAckPacketId);
if (resendInflightOnTimeout) {
inflightTimeouts.add(new InFlightPacket(notAckPacketId, FLIGHT_BEFORE_RESEND_MS));
}
mqttConnection.sendIfWritableElseDrop(pubRel);
} else {
final SessionRegistry.PublishedMessage pubMsg = (SessionRegistry.PublishedMessage) msg;
Expand All @@ -383,34 +409,36 @@ public void resendInflightNotAcked() {
final ByteBuf payload = pubMsg.payload;
// message fetched from map, but not removed from map. No need to duplicate or release.
MqttPublishMessage publishMsg = publishNotRetainedDuplicated(notAckPacketId, topic, qos, payload);
inflightTimeouts.add(new InFlightPacket(notAckPacketId.packetId, FLIGHT_BEFORE_RESEND_MS));
if (resendInflightOnTimeout) {
inflightTimeouts.add(new InFlightPacket(notAckPacketId, FLIGHT_BEFORE_RESEND_MS));
}
mqttConnection.sendPublish(publishMsg);
}
}
}

private void debugLogPacketIds(Collection<InFlightPacket> expired) {
if (!LOG.isDebugEnabled() || expired.isEmpty()) {
private void debugLogPacketIds(Collection<Integer> packetIds) {
if (!LOG.isDebugEnabled() || packetIds.isEmpty()) {
return;
}

StringBuilder sb = new StringBuilder();
for (InFlightPacket packet : expired) {
sb.append(packet.packetId).append(", ");
for (Integer packetId : packetIds) {
sb.append(packetId).append(", ");
}
LOG.debug("Resending {} in flight packets [{}]", expired.size(), sb);
LOG.debug("Resending {} in flight packets [{}]", packetIds.size(), sb);
}

private MqttPublishMessage publishNotRetainedDuplicated(InFlightPacket notAckPacketId, Topic topic, MqttQoS qos,
private MqttPublishMessage publishNotRetainedDuplicated(int packetId, Topic topic, MqttQoS qos,
ByteBuf payload) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, true, qos, false, 0);
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic.toString(), notAckPacketId.packetId);
MqttPublishVariableHeader varHeader = new MqttPublishVariableHeader(topic.toString(), packetId);
return new MqttPublishMessage(fixedHeader, varHeader, payload);
}

private void drainQueueToConnection() {
// consume the queue
while (!sessionQueue.isEmpty() && inflighHasSlotsAndConnectionIsUp()) {
while (!sessionQueue.isEmpty() && inflightHasSlotsAndConnectionIsUp()) {
final SessionRegistry.EnqueuedMessage msg = sessionQueue.dequeue();
if (msg == null) {
// Our message was already fetched by another Thread.
Expand All @@ -425,7 +453,9 @@ private void drainQueueToConnection() {
old.release();
inflightSlots.incrementAndGet();
}
inflightTimeouts.add(new InFlightPacket(sendPacketId, FLIGHT_BEFORE_RESEND_MS));
if (resendInflightOnTimeout) {
inflightTimeouts.add(new InFlightPacket(sendPacketId, FLIGHT_BEFORE_RESEND_MS));
}
final SessionRegistry.PublishedMessage msgPub = (SessionRegistry.PublishedMessage) msg;
MqttPublishMessage publishMsg = MQTTConnection.createNotRetainedPublishMessage(
msgPub.topic.toString(),
Expand All @@ -441,8 +471,11 @@ public void writabilityChanged() {
drainQueueToConnection();
}

public void sendQueuedMessagesWhileOffline() {
public void reconnectSession() {
LOG.trace("Republishing all saved messages for session {}", this);
resendInflightNotAcked();

// send queued messages while offline
drainQueueToConnection();
}

Expand Down
10 changes: 7 additions & 3 deletions broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -133,7 +134,7 @@ private void recreateSessionPool() {
if (queueRepository.containsQueue(clientId)) {
final SessionMessageQueue<EnqueuedMessage> persistentQueue = queueRepository.getOrCreateQueue(clientId);
queues.remove(clientId);
Session rehydrated = new Session(clientId, false, persistentQueue);
Session rehydrated = new Session(clientId, false, MqttVersion.MQTT_3_1, persistentQueue);
pool.put(clientId, rehydrated);
}
}
Expand Down Expand Up @@ -234,11 +235,14 @@ private Session createNewSession(MqttConnectMessage msg, String clientId) {
} else {
queue = new InMemoryQueue();
}

final MqttVersion mqttVersion = Utils.versionFromConnect(msg);

if (msg.variableHeader().isWillFlag()) {
final Session.Will will = createWill(msg);
newSession = new Session(clientId, clean, will, queue);
newSession = new Session(clientId, clean, mqttVersion, will, queue);
} else {
newSession = new Session(clientId, clean, queue);
newSession = new Session(clientId, clean, mqttVersion, queue);
}

newSession.markConnecting();
Expand Down
7 changes: 7 additions & 0 deletions broker/src/main/java/io/moquette/broker/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
package io.moquette.broker;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttVersion;

import java.util.Map;

/**
Expand Down Expand Up @@ -46,6 +49,10 @@ public static byte[] readBytesAndRewind(ByteBuf payload) {
return payloadContent;
}

public static MqttVersion versionFromConnect(MqttConnectMessage msg) {
return MqttVersion.fromProtocolNameAndLevel(msg.variableHeader().name(), (byte) msg.variableHeader().version());
}

private Utils() {
}
}
3 changes: 2 additions & 1 deletion broker/src/test/java/io/moquette/broker/SessionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttVersion;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -27,7 +28,7 @@ public class SessionTest {
public void setUp() {
testChannel = new EmbeddedChannel();
queuedMessages = new InMemoryQueue();
client = new Session(CLIENT_ID, true, null, queuedMessages);
client = new Session(CLIENT_ID, true, MqttVersion.MQTT_3_1, null, queuedMessages);
createConnection(client);
}

Expand Down
Loading

0 comments on commit 319f9b7

Please sign in to comment.