Skip to content

Commit

Permalink
Do not allow MQTT 5 user properties to overwrite special headers
Browse files Browse the repository at this point in the history
  • Loading branch information
dimabarbul committed Oct 4, 2023
1 parent dae3369 commit efddf04
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
Expand All @@ -41,6 +44,7 @@
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.TransformationFailure;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.TransformationResult;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.TransformationSuccess;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.publish.UserProperty;
import org.eclipse.ditto.connectivity.service.placeholders.ConnectivityPlaceholders;
import org.eclipse.ditto.placeholders.PlaceholderFactory;

Expand All @@ -53,6 +57,13 @@
@NotThreadSafe
final class MqttPublishToExternalMessageTransformer {

private static final Set<String> KNOWN_MQTT_HEADER_NAMES = Stream.concat(
MqttHeader.getHeaderNames().stream(),
Stream.of(DittoHeaderDefinition.CONTENT_TYPE.getKey(),
DittoHeaderDefinition.CORRELATION_ID.getKey(),
DittoHeaderDefinition.REPLY_TO.getKey())
).collect(Collectors.toSet());

private final String sourceAddress;
private final Source connectionSource;
@Nullable private final EnforcementFilterFactory<Map<String, String>, Signal<?>> headerEnforcementFilterFactory;
Expand Down Expand Up @@ -174,11 +185,16 @@ private static Map<String, String> getHeadersAsMap(final GenericMqttPublish gene
.ifPresent(contentType -> result.put(DittoHeaderDefinition.CONTENT_TYPE.getKey(), contentType));

genericMqttPublish.userProperties()
.filter(MqttPublishToExternalMessageTransformer::notKnownMqttHeader)
.forEach(userProperty -> result.put(userProperty.name(), userProperty.value()));

return result;
}

private static boolean notKnownMqttHeader(UserProperty userProperty) {
return !KNOWN_MQTT_HEADER_NAMES.contains(userProperty.name());
}

private static String getTopicAsString(final GenericMqttPublish genericMqttPublish) {
return String.valueOf(genericMqttPublish.getTopic());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,4 +335,47 @@ public void transformMqtt5PublishWithThrownDittoRuntimeExceptionReturnsTransform
.hasCauseInstanceOf(UnresolvedPlaceholderException.class);
}

@Test
public void transformMqtt5PublishWithKnownHeadersAsUserPropertiesReturnsExpectedTransformationSuccess() {
final var underTest =
MqttPublishToExternalMessageTransformer.newInstance(SOURCE_ADDRESS, SOURCE_WITHOUT_ENFORCEMENT);
final var userPropertyValue = "user property";

final var transformationResult = underTest.transform(GenericMqttPublish.ofMqtt5Publish(Mqtt5Publish.builder()
.topic(MQTT_TOPIC)
.qos(MQTT_QOS)
.retain(RETAIN)
.correlationData(ByteBufferUtils.fromUtf8String(CORRELATION_ID))
.responseTopic(RESPONSE_TOPIC)
.contentType(CONTENT_TYPE)
.userProperties(Mqtt5UserProperties.of(Stream.concat(
MqttHeader.getHeaderNames()
.stream()
.map(specialHeaderName -> Mqtt5UserProperty.of(specialHeaderName, userPropertyValue)),
Stream.of(
Mqtt5UserProperty.of(DittoHeaderDefinition.CONTENT_TYPE.getKey(), userPropertyValue),
Mqtt5UserProperty.of(DittoHeaderDefinition.REPLY_TO.getKey(), userPropertyValue),
Mqtt5UserProperty.of(DittoHeaderDefinition.CORRELATION_ID.getKey(), userPropertyValue))
).collect(Collectors.toList())))
.payload(PAYLOAD)
.build()));

assertThat(transformationResult.getSuccessValueOrThrow())
.isEqualTo(ExternalMessageFactory.newExternalMessageBuilder(
Stream.of(
Map.entry(MqttHeader.MQTT_TOPIC.getName(), MQTT_TOPIC.toString()),
Map.entry(MqttHeader.MQTT_QOS.getName(), String.valueOf(MQTT_QOS.getCode())),
Map.entry(MqttHeader.MQTT_RETAIN.getName(), String.valueOf(RETAIN)),
Map.entry(DittoHeaderDefinition.CORRELATION_ID.getKey(), CORRELATION_ID),
Map.entry(DittoHeaderDefinition.REPLY_TO.getKey(), RESPONSE_TOPIC.toString()),
Map.entry(DittoHeaderDefinition.CONTENT_TYPE.getKey(), CONTENT_TYPE)
).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)))
.withTextAndBytes(ByteBufferUtils.toUtf8String(PAYLOAD), PAYLOAD)
.withAuthorizationContext(AUTHORIZATION_CONTEXT)
.withSourceAddress(SOURCE_ADDRESS)
.withHeaderMapping(HEADER_MAPPING)
.withPayloadMapping(PAYLOAD_MAPPING)
.build());
}

}

0 comments on commit efddf04

Please sign in to comment.