Skip to content

Commit

Permalink
Add MQTT version information to MQTT headers
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitriy Barbul <dimabarbul@gmail.com>
  • Loading branch information
dimabarbul committed Sep 20, 2023
1 parent 9d0011a commit 2114122
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

import org.apache.pekko.actor.ActorSystem;

import com.hivemq.client.mqtt.MqttVersion;

/**
* Connection specification for Mqtt 3.1.1 protocol.
*/
Expand Down Expand Up @@ -150,12 +152,12 @@ private boolean containsOnlyAllowedTargetMappings(final HeaderMapping headerMapp
}

private static void checkIfKeyIsAllowed(final String key, final DittoHeaders dittoHeaders) {
if (!MqttHeader.getHeaderNames().contains(key)) {
if (!MqttHeader.getHeaderNames(MqttVersion.MQTT_3_1_1).contains(key)) {
final String message = String.format("The header '%s' is not allowed in MQTT 3.1.1 target header mapping.",
key);
final String description = String.format(
"The following headers are allowed and are directly applied to the published MQTT message: %s",
MqttHeader.getHeaderNames());
MqttHeader.getHeaderNames(MqttVersion.MQTT_3_1_1));
throw ConnectionConfigurationInvalidException
.newBuilder(message)
.description(description)
Expand Down Expand Up @@ -196,12 +198,12 @@ public String getPrefix() {

@Override
public List<String> getSupportedNames() {
return MqttHeader.getHeaderNames();
return MqttHeader.getHeaderNames(MqttVersion.MQTT_3_1_1);
}

@Override
public boolean supports(final String name) {
return MqttHeader.getHeaderNames().contains(name);
return MqttHeader.getHeaderNames(MqttVersion.MQTT_3_1_1).contains(name);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,34 @@
import java.util.Arrays;
import java.util.List;

import com.hivemq.client.mqtt.MqttVersion;

/**
* Defines well-known MQTT properties that should be extracted from consumed mqtt messages and made available for
* source header mappings. E.g. the mqtt topic on which the message was received is available in the header
* {@code mqtt.topic}.
*/
public enum MqttHeader {

MQTT_TOPIC("mqtt.topic"),
MQTT_QOS("mqtt.qos"),
MQTT_RETAIN("mqtt.retain"),
MQTT_MESSAGE_EXPIRY_INTERVAL("mqtt.message-expiry-interval");
MQTT_TOPIC("mqtt.topic", MqttVersion.MQTT_3_1_1),
MQTT_QOS("mqtt.qos", MqttVersion.MQTT_3_1_1),
MQTT_RETAIN("mqtt.retain", MqttVersion.MQTT_3_1_1),
MQTT_MESSAGE_EXPIRY_INTERVAL("mqtt.message-expiry-interval", MqttVersion.MQTT_5_0);

private final String name;

/**
* MQTT version where the header was introduced.
*/
private final MqttVersion mqttVersion;

/**
* @param name the header name to be used in source header mappings
* @param mqttVersion MQTT version where the header was introduced
*/
MqttHeader(final String name) {
MqttHeader(final String name, final MqttVersion mqttVersion) {
this.name = name;
this.mqttVersion = mqttVersion;
}

/**
Expand All @@ -44,10 +53,22 @@ public String getName() {
}

/**
* @return list of default header names used for mqtt sources
@param mqttVersion MQTT version to get headers for
* @return list of header names that are available in provided MQTT version
*/
public static List<String> getHeaderNames(final MqttVersion mqttVersion) {
return Arrays.stream(values())
.filter(value -> value.isAvailableInMqttVersion(mqttVersion))
.map(MqttHeader::getName)
.toList();
}

/**
* @param mqttVersion MQTT version to check
* @return true if the header is available in provided MQTT version otherwise false
*/
public static List<String> getHeaderNames() {
return Arrays.stream(values()).map(MqttHeader::getName).toList();
private boolean isAvailableInMqttVersion(final MqttVersion mqttVersion) {
return this.mqttVersion.compareTo(mqttVersion) <= 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,14 @@ public void testInvalidSourceMappingValue() {
testSourceMapping(invalidHeaderMapping, "header:invalid");
}

@Test
public void testInvalidSourceMappingValueIfMqtt5HeaderIsUsed() {
final HeaderMapping invalidHeaderMapping = ConnectivityModelFactory.newHeaderMapping(Map.of(
"timeout", "{{ header:mqtt.message-expiry-interval }}"
));
testSourceMapping(invalidHeaderMapping, "header:mqtt.message-expiry-interval");
}

@Test
public void testValidSourceMappingKeys() {
final HeaderMapping validHeaderMapping = ConnectivityModelFactory.newHeaderMapping(Map.of(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2023 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging.mqtt;

import org.assertj.core.api.JUnitSoftAssertions;
import org.junit.Rule;
import org.junit.Test;

import com.hivemq.client.mqtt.MqttVersion;

import java.util.List;

/**
* Unit test for {@link MqttHeader}
*/
public class MqttHeaderTest {

@Rule
public final JUnitSoftAssertions softly = new JUnitSoftAssertions();

@Test
public void getHeaderNamesReturnsExpectedHeadersForMqtt3() {
final var expected = List.of(
MqttHeader.MQTT_TOPIC.getName(),
MqttHeader.MQTT_QOS.getName(),
MqttHeader.MQTT_RETAIN.getName());
final var actual = MqttHeader.getHeaderNames(MqttVersion.MQTT_3_1_1);

softly.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
}

@Test
public void getHeaderNamesReturnsExpectedHeadersForMqtt5() {
final var expected = List.of(
MqttHeader.MQTT_TOPIC.getName(),
MqttHeader.MQTT_QOS.getName(),
MqttHeader.MQTT_RETAIN.getName(),
MqttHeader.MQTT_MESSAGE_EXPIRY_INTERVAL.getName());
final var actual = MqttHeader.getHeaderNames(MqttVersion.MQTT_5_0);

softly.assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
import org.mockito.MockedStatic;
import org.mockito.junit.MockitoJUnitRunner;

import com.hivemq.client.mqtt.MqttVersion;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.datatypes.MqttTopic;
import com.typesafe.config.ConfigFactory;
Expand Down Expand Up @@ -289,7 +290,7 @@ public void consumeFromTopicAndRetrieveConnectionMetrics() {
testKit.expectNoMessage();
final var modifyThing = commandForwarder.expectMsgClass(ModifyThing.class);
assertThat(modifyThing.getDittoHeaders())
.doesNotContainKeys(MqttHeader.getHeaderNames().toArray(String[]::new));
.doesNotContainKeys(MqttHeader.getHeaderNames(MqttVersion.MQTT_5_0).toArray(String[]::new));

underTest.tell(RetrieveConnectionMetrics.of(CONNECTION_ID, dittoHeadersWithCorrelationId), testKit.getRef());

Expand Down

0 comments on commit 2114122

Please sign in to comment.