diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/MqttClientActorIT.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/MqttClientActorIT.java index 5a3a239bd2..c4d87e4dc4 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/MqttClientActorIT.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/MqttClientActorIT.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019 Contributors to the Eclipse Foundation + * Copyright (c) 2023 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -16,6 +16,7 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.*; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -34,6 +35,7 @@ import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.connectivity.api.BaseClientState; import org.eclipse.ditto.connectivity.model.*; +import org.eclipse.ditto.connectivity.model.mqtt.IllegalSessionExpiryIntervalSecondsException; import org.eclipse.ditto.connectivity.model.mqtt.ReceiveMaximum; import org.eclipse.ditto.connectivity.model.mqtt.SessionExpiryInterval; import org.eclipse.ditto.connectivity.model.signals.commands.modify.CloseConnection; @@ -74,14 +76,22 @@ @RunWith(Parameterized.class) public final class MqttClientActorIT { - @Parameterized.Parameters(name = "MQTT version {0}") - public static Collection mqttVersions() { - return List.of(MqttVersion.MQTT_3_1_1, MqttVersion.MQTT_5_0); + @Parameterized.Parameters(name = "MQTT version: {0}, separate publisher client: {1}") + public static Collection parameters() { + return Arrays.asList(new Object[][] { + { MqttVersion.MQTT_3_1_1, true }, + { MqttVersion.MQTT_3_1_1, false }, + { MqttVersion.MQTT_5_0, true }, + { MqttVersion.MQTT_5_0, false }, + }); } - @Parameterized.Parameter + @Parameterized.Parameter(0) public static MqttVersion mqttVersion; + @Parameterized.Parameter(1) + public static Boolean separatePublisherClient; + @ClassRule public static final DittoTracingInitResource DITTO_TRACING_INIT_RESOURCE = DittoTracingInitResource.disableDittoTracing(); @@ -101,6 +111,15 @@ public static Collection mqttVersions() { // https://github.com/eclipse-ditto/ditto/issues/1767 private static final int TIMEOUT_BEFORE_PUBLISH = 3; private static final int MESSAGES_FROM_PREVIOUS_SESSION_TIMEOUT = 3; + private static final SessionExpiryInterval SESSION_EXPIRY_INTERVAL; + + static { + try { + SESSION_EXPIRY_INTERVAL = SessionExpiryInterval.of(Duration.ofSeconds(60)); + } catch (IllegalSessionExpiryIntervalSecondsException e) { + throw new RuntimeException(e); + } + } private static Config actorsTestConfig; @@ -131,13 +150,6 @@ private void cleanPreviousSession() { mqttClient.cleanSession(); } - /*private static DittoMongoClient provideClientWrapper() { - return MongoClientWrapper.getBuilder() - .connectionString( - "mongodb://" + MONGO_RESOURCE.getBindIp() + ":" + MONGO_RESOURCE.getPort() + "/testSearchDB") - .build(); - }*/ - @After public void after() { // if (mongoClient != null) { @@ -241,7 +253,7 @@ public void testPersistentSession() { final var underTest = getMqttClientActor(getConnection(new String[] { TOPIC_NAME })); // create session for ditto client ID final var dittoMqttClient = getMqttClient(CLIENT_ID_DITTO); - dittoMqttClient.connect(GenericMqttConnect.newInstance(false, KeepAliveInterval.defaultKeepAlive(), SessionExpiryInterval.defaultSessionExpiryInterval(), ReceiveMaximum.defaultReceiveMaximum())); + dittoMqttClient.connect(GenericMqttConnect.newInstance(false, KeepAliveInterval.defaultKeepAlive(), SESSION_EXPIRY_INTERVAL, ReceiveMaximum.defaultReceiveMaximum())); dittoMqttClient.subscribe(GenericMqttSubscribe.of(Set.of( GenericMqttSubscription.newInstance(MqttTopicFilter.of(TOPIC_NAME), MqttQos.EXACTLY_ONCE)))); dittoMqttClient.disconnect(); @@ -271,7 +283,7 @@ public void testPersistentSessionMessageFromTopicWhichIsNoLongerSubscribed() { final var underTest = getMqttClientActor(getConnection(new String[] { TOPIC_NAME })); // create session for ditto client ID with subscription to 2 topics final var dittoMqttClient = getMqttClient(CLIENT_ID_DITTO); - dittoMqttClient.connect(GenericMqttConnect.newInstance(false, KeepAliveInterval.defaultKeepAlive(), SessionExpiryInterval.defaultSessionExpiryInterval(), ReceiveMaximum.defaultReceiveMaximum())); + dittoMqttClient.connect(GenericMqttConnect.newInstance(false, KeepAliveInterval.defaultKeepAlive(), SESSION_EXPIRY_INTERVAL, ReceiveMaximum.defaultReceiveMaximum())); dittoMqttClient.subscribe(GenericMqttSubscribe.of(Set.of( GenericMqttSubscription.newInstance(MqttTopicFilter.of(TOPIC_NAME), MqttQos.EXACTLY_ONCE), GenericMqttSubscription.newInstance(MqttTopicFilter.of(ANOTHER_TOPIC_NAME), MqttQos.EXACTLY_ONCE)))); @@ -313,7 +325,7 @@ private static Connection getConnection(final String[]... sourcesTopics) { .specificConfig(Map.of( "clientId", CLIENT_ID_DITTO, "cleanSession", "false", - "separatePublisherClient", "false")) + "separatePublisherClient", String.valueOf(separatePublisherClient))) .setSources(Arrays.stream(sourcesTopics) .map(topics -> ConnectivityModelFactory.newSourceBuilder() .authorizationContext(AUTHORIZATION_CONTEXT) diff --git a/connectivity/service/src/test/resources/test.conf b/connectivity/service/src/test/resources/test.conf index f76e7247ef..703fb57f6c 100644 --- a/connectivity/service/src/test/resources/test.conf +++ b/connectivity/service/src/test/resources/test.conf @@ -310,9 +310,6 @@ ditto { } - tracing { - enabled = false - } } pekko { diff --git a/internal/utils/test/src/test/java/org/eclipse/ditto/internal/utils/test/docker/mosquitto/MosquittoContainerFactory.java b/internal/utils/test/src/test/java/org/eclipse/ditto/internal/utils/test/docker/mosquitto/MosquittoContainerFactory.java index c652c638ba..c3c0e11ad2 100644 --- a/internal/utils/test/src/test/java/org/eclipse/ditto/internal/utils/test/docker/mosquitto/MosquittoContainerFactory.java +++ b/internal/utils/test/src/test/java/org/eclipse/ditto/internal/utils/test/docker/mosquitto/MosquittoContainerFactory.java @@ -17,7 +17,7 @@ import java.util.Objects; import com.github.dockerjava.api.model.AccessMode; -import com.github.dockerjava.api.model.PortBinding; +import com.google.common.base.MoreObjects; import org.eclipse.ditto.internal.utils.test.docker.ContainerFactory; import com.github.dockerjava.api.command.CreateContainerCmd; @@ -59,10 +59,8 @@ static MosquittoContainerFactory of(final String configResourceName) { @Override protected CreateContainerCmd configureContainer(CreateContainerCmd createContainerCmd) { - final var hostConfig = HostConfig.newHostConfig() - .withBinds(new Bind(CONFIG_RESOURCES_PATH, new Volume(CONFIG_CONTAINER_PATH), AccessMode.ro)) - .withPortBindings(PortBinding.parse(String.valueOf(MOSQUITTO_INTERNAL_PORT))); - return createContainerCmd - .withHostConfig(hostConfig); + final var hostConfig = MoreObjects.firstNonNull(createContainerCmd.getHostConfig(), HostConfig.newHostConfig()) + .withBinds(new Bind(CONFIG_RESOURCES_PATH, new Volume(CONFIG_CONTAINER_PATH), AccessMode.ro)); + return createContainerCmd.withHostConfig(hostConfig); } }