Skip to content

Commit

Permalink
Add support for MQTT message expiry interval
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitriy Barbul <dimabarbul@gmail.com>
  • Loading branch information
dimabarbul committed Sep 20, 2023
1 parent b80497d commit 9d0011a
Show file tree
Hide file tree
Showing 10 changed files with 444 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.model.mqtt;

import javax.annotation.Nullable;

/**
* This exception is thrown to indicate that the seconds of an MQTT
* message expiry interval is outside its allowed range.
*/
public final class IllegalMessageExpiryIntervalSecondsException extends Exception {

private static final long serialVersionUID = -566567001721859949L;

/**
* Constructs a {@code IllegalMessageExpiryIntervalSecondsException} for the specified detail message argument.
*
* @param detailMessage the detail message of the exception.
* @param cause the cause of the exception or {@code null} if unknown.
*/
IllegalMessageExpiryIntervalSecondsException(final String detailMessage, @Nullable final Throwable cause) {
super(detailMessage, cause);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.model.mqtt;

import java.util.Objects;
import java.util.OptionalLong;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

/**
* Representation of the MQTT 5 message expiry interval.
* The minimum seconds is {@value MIN_INTERVAL_SECONDS}.
* The maximum seconds is {@value MAX_INTERVAL_SECONDS}.
*/
@Immutable
public class MessageExpiryInterval {

public static final long MIN_INTERVAL_SECONDS = 1L;

public static final long MAX_INTERVAL_SECONDS = 4_294_967_295L;

@Nullable private final Long seconds;

private MessageExpiryInterval(@Nullable final Long seconds) {
this.seconds = seconds;
}

public static MessageExpiryInterval of(final long seconds)
throws IllegalMessageExpiryIntervalSecondsException {
if (seconds < MIN_INTERVAL_SECONDS || seconds > MAX_INTERVAL_SECONDS) {
throw new IllegalMessageExpiryIntervalSecondsException(
String.format("Expected message expiry interval seconds to be within [%d, %d] but it was <%d>.",
MIN_INTERVAL_SECONDS,
MAX_INTERVAL_SECONDS,
seconds),
null);
}

return new MessageExpiryInterval(seconds);
}

public static MessageExpiryInterval empty() {
return new MessageExpiryInterval(null);
}

public OptionalLong getAsOptionalLong() {
return seconds != null ?
OptionalLong.of(seconds) :
OptionalLong.empty();
}

@Override
public boolean equals(@Nullable final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final MessageExpiryInterval that = (MessageExpiryInterval) o;
return Objects.equals(seconds, that.seconds);
}

@Override
public int hashCode() {
return Objects.hash(seconds);
}

@Override
public String toString() {
return getClass().getSimpleName() + " [" +
"seconds=" + seconds +
"]";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.model.mqtt;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mutabilitydetector.unittesting.MutabilityAssert.assertInstancesOf;
import static org.mutabilitydetector.unittesting.MutabilityMatchers.areImmutable;

import org.junit.Test;

import nl.jqno.equalsverifier.EqualsVerifier;

/**
* Unit test for {@link MessageExpiryInterval}.
*/
public final class MessageExpiryIntervalTest {

@Test
public void assertImmutability() {
assertInstancesOf(MessageExpiryInterval.class, areImmutable());
}

@Test
public void testHashCodeAndEquals() {
EqualsVerifier.forClass(MessageExpiryInterval.class)
.usingGetClass()
.verify();
}

@Test
public void ofWithMaxValueReturnsExpected()
throws IllegalMessageExpiryIntervalSecondsException {
final MessageExpiryInterval underTest = MessageExpiryInterval.of(MessageExpiryInterval.MAX_INTERVAL_SECONDS);

assertThat(underTest.getAsOptionalLong()).hasValue(MessageExpiryInterval.MAX_INTERVAL_SECONDS);
}

@Test
public void ofWithNegativeOutOfBoundsValueThrowsException() {
final long negativeOutOfBoundsSeconds = MessageExpiryInterval.MIN_INTERVAL_SECONDS - 1;

assertThatExceptionOfType(IllegalMessageExpiryIntervalSecondsException.class)
.isThrownBy(() -> MessageExpiryInterval.of(negativeOutOfBoundsSeconds))
.withMessageEndingWith("but it was <%d>.", negativeOutOfBoundsSeconds)
.withNoCause();
}

@Test
public void ofWithPositiveOutOfBoundsValueThrowsException() {
final long positiveOutOfBoundsSeconds = MessageExpiryInterval.MAX_INTERVAL_SECONDS + 1;

assertThatExceptionOfType(IllegalMessageExpiryIntervalSecondsException.class)
.isThrownBy(() -> MessageExpiryInterval.of(positiveOutOfBoundsSeconds))
.withMessageEndingWith("but it was <%d>.", positiveOutOfBoundsSeconds)
.withNoCause();
}

@Test
public void emptyReturnsEmpty() {
final MessageExpiryInterval underTest = MessageExpiryInterval.empty();

assertThat(underTest.getAsOptionalLong()).isEmpty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public enum MqttHeader {

MQTT_TOPIC("mqtt.topic"),
MQTT_QOS("mqtt.qos"),
MQTT_RETAIN("mqtt.retain");
MQTT_RETAIN("mqtt.retain"),
MQTT_MESSAGE_EXPIRY_INTERVAL("mqtt.message-expiry-interval");

private final String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
Expand Down Expand Up @@ -158,6 +159,9 @@ private static Map<String, String> getHeadersAsMap(final GenericMqttPublish gene
result.put(MqttHeader.MQTT_QOS.getName(), getQosCodeAsString(genericMqttPublish));
result.put(MqttHeader.MQTT_RETAIN.getName(), getIsRetainAsString(genericMqttPublish));

getMessageExpiryIntervalAsString(genericMqttPublish)
.ifPresent(messageExpiryInterval -> result.put(MqttHeader.MQTT_MESSAGE_EXPIRY_INTERVAL.getName(), messageExpiryInterval));

genericMqttPublish.getCorrelationData()
.map(ByteBufferUtils::toUtf8String)
.ifPresent(correlationId -> result.put(DittoHeaderDefinition.CORRELATION_ID.getKey(), correlationId));
Expand Down Expand Up @@ -188,6 +192,13 @@ private static String getIsRetainAsString(final GenericMqttPublish genericMqttPu
return String.valueOf(genericMqttPublish.isRetain());
}

private static Optional<String> getMessageExpiryIntervalAsString(final GenericMqttPublish genericMqttPublish) {
final var messageExpiryInterval = genericMqttPublish.getMessageExpiryInterval().getAsOptionalLong();
return messageExpiryInterval.isPresent() ?
Optional.of(String.valueOf(messageExpiryInterval.getAsLong())) :
Optional.empty();
}

private ExternalMessage getExternalMessage(final GenericMqttPublish genericMqttPublish,
final Map<String, String> headers) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.LinkedHashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
Expand All @@ -27,6 +28,8 @@
import org.eclipse.ditto.base.model.common.ByteBufferUtils;
import org.eclipse.ditto.base.model.common.CharsetDeterminer;
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.connectivity.model.mqtt.IllegalMessageExpiryIntervalSecondsException;
import org.eclipse.ditto.connectivity.model.mqtt.MessageExpiryInterval;

import com.hivemq.client.mqtt.MqttVersion;
import com.hivemq.client.mqtt.datatypes.MqttQos;
Expand Down Expand Up @@ -109,6 +112,8 @@ public static Builder builder(final MqttTopic mqttTopic, final MqttQos mqttQos)
*/
public abstract boolean isRetain();

public abstract MessageExpiryInterval getMessageExpiryInterval();

public abstract Optional<ByteBuffer> getCorrelationData();

public abstract Optional<MqttTopic> getResponseTopic();
Expand Down Expand Up @@ -169,9 +174,16 @@ private static Mqtt3Publish getAsMqtt3Publish(final GenericMqttPublish genericMq
public abstract Mqtt5Publish getAsMqtt5Publish();

private static Mqtt5Publish getAsMqtt5Publish(final GenericMqttPublish genericMqttPublish) {
return Mqtt5Publish.builder()
var builder = Mqtt5Publish.builder()
.topic(genericMqttPublish.getTopic())
.qos(genericMqttPublish.getQos())
.qos(genericMqttPublish.getQos());

final var messageExpiryInterval = genericMqttPublish.getMessageExpiryInterval().getAsOptionalLong();
if (messageExpiryInterval.isPresent()) {
builder = builder.messageExpiryInterval(messageExpiryInterval.getAsLong());
}

return builder
.retain(genericMqttPublish.isRetain())
.payload(genericMqttPublish.getPayload().orElse(null))
.correlationData(genericMqttPublish.getCorrelationData().orElse(null))
Expand All @@ -196,6 +208,7 @@ public static final class Builder {
private final MqttTopic mqttTopic;
private final MqttQos mqttQos;
private boolean retain;
private MessageExpiryInterval messageExpiryInterval;
@Nullable private ByteBuffer payload;
@Nullable private ByteBuffer correlationData;
@Nullable private MqttTopic responseTopic;
Expand All @@ -206,6 +219,7 @@ private Builder(final MqttTopic mqttTopic, final MqttQos mqttQos) {
this.mqttTopic = ConditionChecker.checkNotNull(mqttTopic, "mqttTopic");
this.mqttQos = ConditionChecker.checkNotNull(mqttQos, "mqttQos");
retain = false;
messageExpiryInterval = MessageExpiryInterval.empty();
payload = null;
correlationData = null;
responseTopic = null;
Expand All @@ -224,6 +238,11 @@ public Builder retain(final boolean retain) {
return this;
}

public Builder messageExpiryInterval(final MessageExpiryInterval messageExpiryInterval) {
this.messageExpiryInterval = messageExpiryInterval;
return this;
}

/**
* Sets the optional payload.
*
Expand Down Expand Up @@ -275,6 +294,7 @@ private static final class FromScratchGenericMqttPublish extends GenericMqttPubl
private final MqttTopic mqttTopic;
private final MqttQos mqttQos;
private final boolean retain;
private final MessageExpiryInterval messageExpiryInterval;
@Nullable private final ByteBuffer payload;
@Nullable private final ByteBuffer correlationData;
@Nullable private final MqttTopic responseTopic;
Expand All @@ -285,6 +305,7 @@ private FromScratchGenericMqttPublish(final Builder builder) {
mqttTopic = builder.mqttTopic;
mqttQos = builder.mqttQos;
retain = builder.retain;
messageExpiryInterval = builder.messageExpiryInterval;
payload = builder.payload;
correlationData = builder.correlationData;
responseTopic = builder.responseTopic;
Expand All @@ -307,6 +328,11 @@ public boolean isRetain() {
return retain;
}

@Override
public MessageExpiryInterval getMessageExpiryInterval() {
return messageExpiryInterval;
}

@Override
public Optional<ByteBuffer> getCorrelationData() {
return Optional.ofNullable(correlationData);
Expand Down Expand Up @@ -368,6 +394,7 @@ public boolean equals(@Nullable final Object o) {
return Objects.equals(mqttTopic, that.mqttTopic) &&
mqttQos == that.mqttQos &&
retain == that.retain &&
Objects.equals(messageExpiryInterval, that.messageExpiryInterval) &&
Objects.equals(payload, that.payload) &&
Objects.equals(correlationData, that.correlationData) &&
Objects.equals(responseTopic, that.responseTopic) &&
Expand All @@ -380,6 +407,7 @@ public int hashCode() {
return Objects.hash(mqttTopic,
mqttQos,
retain,
messageExpiryInterval,
payload,
correlationData,
responseTopic,
Expand All @@ -393,6 +421,7 @@ public String toString() {
"mqttTopic=" + mqttTopic +
", mqttQos=" + mqttQos +
", retain=" + retain +
", messageExpiryInterval=" + messageExpiryInterval +
", payload=" + getPayloadAsHumanReadable().orElse(null) +
", correlationData=" + correlationData +
", responseTopic=" + responseTopic +
Expand Down Expand Up @@ -426,6 +455,11 @@ public boolean isRetain() {
return mqtt3Publish.isRetain();
}

@Override
public MessageExpiryInterval getMessageExpiryInterval() {
return MessageExpiryInterval.empty();
}

@Override
public Optional<ByteBuffer> getCorrelationData() {
return Optional.empty();
Expand Down Expand Up @@ -525,6 +559,20 @@ public boolean isRetain() {
return mqtt5Publish.isRetain();
}

@Override
public MessageExpiryInterval getMessageExpiryInterval() {
final var messageExpiryInterval = mqtt5Publish.getMessageExpiryInterval();
try {
return messageExpiryInterval.isPresent() ?
MessageExpiryInterval.of(messageExpiryInterval.getAsLong()) :
MessageExpiryInterval.empty();
} catch (IllegalMessageExpiryIntervalSecondsException e) {
// If we received message, assume message broker knows that it's correct, so don't drop it.
// But as we can't handle such values, fall back to empty message expiry interval.
return MessageExpiryInterval.empty();
}
}

@Override
public Optional<ByteBuffer> getCorrelationData() {
return mqtt5Publish.getCorrelationData();
Expand Down

0 comments on commit 9d0011a

Please sign in to comment.