Skip to content

Commit

Permalink
Do not include known MQTT headers in outgoing message based on featur…
Browse files Browse the repository at this point in the history
…e toggle
  • Loading branch information
dimabarbul committed Oct 5, 2023
1 parent efddf04 commit 94bbc67
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ public final class FeatureToggle {
*/
public static final String HISTORICAL_APIS_ENABLED = "ditto.devops.feature.historical-apis-enabled";

/**
* System property name of the property defining whether the known MQTT headers (e.g., mqtt.topic) are preserved in outgoing message.
* @since 3.4.0
*/
public static final String PRESERVE_KNOWN_MQTT_HEADERS_ENABLED = "ditto.devops.feature.preserve-known-mqtt-headers-enabled";

/**
* Resolves the system property {@value MERGE_THINGS_ENABLED}.
*/
Expand All @@ -54,6 +60,11 @@ public final class FeatureToggle {
*/
private static final boolean IS_HISTORICAL_APIS_ENABLED = resolveProperty(HISTORICAL_APIS_ENABLED);

/**
* Resolves the system property {@value PRESERVE_KNOWN_MQTT_HEADERS_ENABLED}.
*/
private static final boolean IS_PRESERVE_KNOWN_MQTT_HEADERS_ENABLED = resolveProperty(PRESERVE_KNOWN_MQTT_HEADERS_ENABLED);

private static boolean resolveProperty(final String propertyName) {
final String propertyValue = System.getProperty(propertyName, Boolean.TRUE.toString());
return !Boolean.FALSE.toString().equalsIgnoreCase(propertyValue);
Expand Down Expand Up @@ -143,4 +154,15 @@ public static DittoHeaders checkHistoricalApiAccessFeatureEnabled(final String s
public static boolean isHistoricalApiAccessFeatureEnabled() {
return IS_HISTORICAL_APIS_ENABLED;
}

/**
* Returns whether the known MQTT headers are preserved in outgoing message based on the system property
* {@value PRESERVE_KNOWN_MQTT_HEADERS_ENABLED}.
*
* @return whether the known MQTT headers are preserved or not.
* @since 3.4.0
*/
public static boolean isPreserveKnownMqttHeadersFeatureEnabled() {
return IS_PRESERVE_KNOWN_MQTT_HEADERS_ENABLED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ public static List<String> getHeaderNames(final MqttVersion mqttVersion) {
.toList();
}

/**
* @return list of header names that are available in at least one MQTT version
*/
public static List<String> getAllHeaderNames() {
return Arrays.stream(values()).map(MqttHeader::getName).toList();
}

/**
* @param mqttVersion MQTT version to check
* @return true if the header is available in provided MQTT version otherwise false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
final class MqttPublishToExternalMessageTransformer {

private static final Set<String> KNOWN_MQTT_HEADER_NAMES = Stream.concat(
MqttHeader.getHeaderNames().stream(),
MqttHeader.getAllHeaderNames().stream(),
Stream.of(DittoHeaderDefinition.CONTENT_TYPE.getKey(),
DittoHeaderDefinition.CORRELATION_ID.getKey(),
DittoHeaderDefinition.REPLY_TO.getKey())
Expand Down Expand Up @@ -185,16 +185,12 @@ private static Map<String, String> getHeadersAsMap(final GenericMqttPublish gene
.ifPresent(contentType -> result.put(DittoHeaderDefinition.CONTENT_TYPE.getKey(), contentType));

genericMqttPublish.userProperties()
.filter(MqttPublishToExternalMessageTransformer::notKnownMqttHeader)
.filter(userProperty -> !KNOWN_MQTT_HEADER_NAMES.contains(userProperty.name()))
.forEach(userProperty -> result.put(userProperty.name(), userProperty.value()));

return result;
}

private static boolean notKnownMqttHeader(UserProperty userProperty) {
return !KNOWN_MQTT_HEADER_NAMES.contains(userProperty.name());
}

private static String getTopicAsString(final GenericMqttPublish genericMqttPublish) {
return String.valueOf(genericMqttPublish.getTopic());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

import org.eclipse.ditto.base.model.common.CharsetDeterminer;
import org.eclipse.ditto.base.model.common.ConditionChecker;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.signals.FeatureToggle;
import org.eclipse.ditto.connectivity.api.ExternalMessage;
import org.eclipse.ditto.connectivity.model.mqtt.IllegalMessageExpiryIntervalSecondsException;
import org.eclipse.ditto.connectivity.model.mqtt.MessageExpiryInterval;
Expand All @@ -49,19 +51,14 @@
@Immutable
final class ExternalMessageToMqttPublishTransformer {

/*
* Actually it would be correct to also include MqttHeader.MQTT_TOPIC,
* MqttHeader.MQTT_QOS and MqttHeader.MQTT_RETAIN in the set of the known
* MQTT header names because they are already dedicated properties in the
* MQTT Publish.
* However, as the named headers were included in user properties
* up to the present, there might be users who rely on their presence.
* Excluding the headers from user properties would then break
* functionality.
*/
private static final Set<String> KNOWN_MQTT_HEADER_NAMES = Set.of(DittoHeaderDefinition.CORRELATION_ID.getKey(),
ExternalMessage.REPLY_TO_HEADER,
ExternalMessage.CONTENT_TYPE_HEADER);
private static final Set<String> KNOWN_MQTT_HEADER_NAMES = Stream.concat(
Stream.of(DittoHeaderDefinition.CORRELATION_ID.getKey(),
ExternalMessage.REPLY_TO_HEADER,
ExternalMessage.CONTENT_TYPE_HEADER),
FeatureToggle.isPreserveKnownMqttHeadersFeatureEnabled() ?
Stream.empty() :
MqttHeader.getAllHeaderNames().stream())
.collect(Collectors.toSet());

private ExternalMessageToMqttPublishTransformer() {
throw new AssertionError();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,16 @@ public void getHeaderNamesReturnsExpectedHeadersForMqtt5() {

softly.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
}

@Test
public void getAllHeaderNamesReturnsExpectedHeaders() {
final var expected = List.of(
MqttHeader.MQTT_TOPIC.getName(),
MqttHeader.MQTT_QOS.getName(),
MqttHeader.MQTT_RETAIN.getName(),
MqttHeader.MQTT_MESSAGE_EXPIRY_INTERVAL.getName());
final var actual = MqttHeader.getAllHeaderNames();

softly.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ public void consumeFromTopicAndRetrieveConnectionMetrics() {
testKit.expectNoMessage();
final var modifyThing = commandForwarder.expectMsgClass(ModifyThing.class);
assertThat(modifyThing.getDittoHeaders())
.doesNotContainKeys(MqttHeader.getHeaderNames(MqttVersion.MQTT_5_0).toArray(String[]::new));
.doesNotContainKeys(MqttHeader.getAllHeaderNames().toArray(String[]::new));

underTest.tell(RetrieveConnectionMetrics.of(CONNECTION_ID, dittoHeadersWithCorrelationId), testKit.getRef());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.junit.Test;
import org.mockito.Mockito;

import com.hivemq.client.mqtt.MqttVersion;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.datatypes.MqttTopic;
import com.hivemq.client.mqtt.mqtt3.message.publish.Mqtt3Publish;
Expand Down Expand Up @@ -341,22 +342,23 @@ public void transformMqtt5PublishWithKnownHeadersAsUserPropertiesReturnsExpected
MqttPublishToExternalMessageTransformer.newInstance(SOURCE_ADDRESS, SOURCE_WITHOUT_ENFORCEMENT);
final var userPropertyValue = "user property";

var userProperties = Stream.concat(
MqttHeader.getHeaderNames(MqttVersion.MQTT_5_0)
.stream()
.map(specialHeaderName -> Mqtt5UserProperty.of(specialHeaderName, userPropertyValue)),
Stream.of(
Mqtt5UserProperty.of(DittoHeaderDefinition.CONTENT_TYPE.getKey(), userPropertyValue),
Mqtt5UserProperty.of(DittoHeaderDefinition.REPLY_TO.getKey(), userPropertyValue),
Mqtt5UserProperty.of(DittoHeaderDefinition.CORRELATION_ID.getKey(), userPropertyValue))
).collect(Collectors.toList());
final var transformationResult = underTest.transform(GenericMqttPublish.ofMqtt5Publish(Mqtt5Publish.builder()
.topic(MQTT_TOPIC)
.qos(MQTT_QOS)
.retain(RETAIN)
.correlationData(ByteBufferUtils.fromUtf8String(CORRELATION_ID))
.responseTopic(RESPONSE_TOPIC)
.contentType(CONTENT_TYPE)
.userProperties(Mqtt5UserProperties.of(Stream.concat(
MqttHeader.getHeaderNames()
.stream()
.map(specialHeaderName -> Mqtt5UserProperty.of(specialHeaderName, userPropertyValue)),
Stream.of(
Mqtt5UserProperty.of(DittoHeaderDefinition.CONTENT_TYPE.getKey(), userPropertyValue),
Mqtt5UserProperty.of(DittoHeaderDefinition.REPLY_TO.getKey(), userPropertyValue),
Mqtt5UserProperty.of(DittoHeaderDefinition.CORRELATION_ID.getKey(), userPropertyValue))
).collect(Collectors.toList())))
.userProperties(Mqtt5UserProperties.of(userProperties))
.payload(PAYLOAD)
.build()));

Expand Down
4 changes: 4 additions & 0 deletions internal/utils/config/src/main/resources/ditto-devops.conf
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,9 @@ ditto.devops {
// enables/disables the historical API access feature
historical-apis-enabled = true
historical-apis-enabled = ${?DITTO_DEVOPS_FEATURE_HISTORICAL_APIS_ENABLED}

// enables/disables the preserving known MQTT headers in outgoing message feature
preserve-known-mqtt-headers-enabled = true
preserve-known-mqtt-headers-enabled = ${?DITTO_DEVOPS_FEATURE_PRESERVE_KNOWN_MQTT_HEADERS_ENABLED}
}
}

0 comments on commit 94bbc67

Please sign in to comment.