Skip to content

Commit

Permalink
Run MqttClientActorIT tests for clean session as well
Browse files Browse the repository at this point in the history
  • Loading branch information
dimabarbul committed Nov 17, 2023
1 parent 83762ed commit c0e433e
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 30 deletions.
Expand Up @@ -20,6 +20,7 @@
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -76,14 +77,14 @@
@RunWith(Parameterized.class)
public final class MqttClientActorIT {

@Parameterized.Parameters(name = "MQTT version: {0}, separate publisher client: {1}")
@Parameterized.Parameters(name = "MQTT version: {0}, separate publisher client: {1}, clean session: {2}")
public static Collection<Object[]> parameters() {
return Arrays.asList(new Object[][] {
{ MqttVersion.MQTT_3_1_1, true },
{ MqttVersion.MQTT_3_1_1, false },
{ MqttVersion.MQTT_5_0, true },
{ MqttVersion.MQTT_5_0, false },
});
return Stream.of(MqttVersion.MQTT_3_1_1, MqttVersion.MQTT_5_0).flatMap(mqttVersion ->
Stream.of(true, false).flatMap(separatePublisherClient ->
Stream.of(true, false).map(cleanSession ->
new Object[] { mqttVersion, separatePublisherClient, cleanSession })))
.map(Object[].class::cast)
.toList();
}

@Parameterized.Parameter(0)
Expand All @@ -92,6 +93,9 @@ public static Collection<Object[]> parameters() {
@Parameterized.Parameter(1)
public static Boolean separatePublisherClient;

@Parameterized.Parameter(2)
public static Boolean cleanSession;

@ClassRule
public static final DittoTracingInitResource DITTO_TRACING_INIT_RESOURCE =
DittoTracingInitResource.disableDittoTracing();
Expand Down Expand Up @@ -182,14 +186,6 @@ public void testSingleTopic() {
expectMsg(CONNECTION_TIMEOUT, new Status.Success(BaseClientState.DISCONNECTED));

ensureAllMessagesAcknowledged();

// insertTestThings();

// underTest.tell(queryThings(200, null), getRef());

// final QueryThingsResponse response = expectMsgClass(QueryThingsResponse.class);

// assertThat(response.getSearchResult().getItems()).isEqualTo(expectedIds(4, 2, 0, 1, 3));
}};
}

Expand Down Expand Up @@ -251,12 +247,15 @@ public void testMultipleSources() {
public void testPersistentSession() {
new TestKit(actorSystem) {{
final var underTest = getMqttClientActor(getConnection(new String[] { TOPIC_NAME }));
// create session for ditto client ID
final var dittoMqttClient = getMqttClient(CLIENT_ID_DITTO);
dittoMqttClient.connect(GenericMqttConnect.newInstance(false, KeepAliveInterval.defaultKeepAlive(), SESSION_EXPIRY_INTERVAL, ReceiveMaximum.defaultReceiveMaximum()));
dittoMqttClient.subscribe(GenericMqttSubscribe.of(Set.of(
GenericMqttSubscription.newInstance(MqttTopicFilter.of(TOPIC_NAME), MqttQos.EXACTLY_ONCE))));
dittoMqttClient.disconnect();

// create session
System.out.println("Connecting to create session...");
underTest.tell(OpenConnection.of(CONNECTION_ID, DittoHeaders.empty()), getRef());
expectMsg(CONNECTION_TIMEOUT, new Status.Success(BaseClientState.CONNECTED));
System.out.println("Connected");
underTest.tell(CloseConnection.of(CONNECTION_ID, DittoHeaders.empty()), getRef());
expectMsg(CONNECTION_TIMEOUT, new Status.Success(BaseClientState.DISCONNECTED));
System.out.println("Disconnected");

final var mqttClient = getMqttClient(name.getMethodName());
mqttClient.connect();
Expand All @@ -266,8 +265,7 @@ public void testPersistentSession() {
underTest.tell(OpenConnection.of(CONNECTION_ID, DittoHeaders.empty()), getRef());
expectMsg(CONNECTION_TIMEOUT, new Status.Success(BaseClientState.CONNECTED));

expectMergeThingMessage(commandForwarderProbe, "key", "test");
commandForwarderProbe.expectNoMessage(NO_MESSAGE_TIMEOUT);
expectMergeThingMessageIfNotCleanSession(commandForwarderProbe, "key", "test");

underTest.tell(CloseConnection.of(CONNECTION_ID, DittoHeaders.empty()), getRef());
expectMsg(CONNECTION_TIMEOUT, new Status.Success(BaseClientState.DISCONNECTED));
Expand All @@ -277,7 +275,6 @@ public void testPersistentSession() {
}

@Test
@Ignore("https://github.com/eclipse-ditto/ditto/issues/1767")
public void testPersistentSessionMessageFromTopicWhichIsNoLongerSubscribed() {
new TestKit(actorSystem) {{
final var underTest = getMqttClientActor(getConnection(new String[] { TOPIC_NAME }));
Expand All @@ -298,8 +295,7 @@ public void testPersistentSessionMessageFromTopicWhichIsNoLongerSubscribed() {
underTest.tell(OpenConnection.of(CONNECTION_ID, DittoHeaders.empty()), getRef());
expectMsg(CONNECTION_TIMEOUT, new Status.Success(BaseClientState.CONNECTED));

expectMergeThingMessage(commandForwarderProbe, "key", "test");
commandForwarderProbe.expectNoMessage(NO_MESSAGE_TIMEOUT);
expectMergeThingMessageIfNotCleanSession(commandForwarderProbe, "key", "test");

underTest.tell(CloseConnection.of(CONNECTION_ID, DittoHeaders.empty()), getRef());
expectMsg(CONNECTION_TIMEOUT, new Status.Success(BaseClientState.DISCONNECTED));
Expand All @@ -323,8 +319,8 @@ private static Connection getConnection(final String[]... sourcesTopics) {
ConnectivityStatus.CLOSED,
"tcp://" + MOSQUITTO_RESOURCE.getBindIp() + ":" + MOSQUITTO_RESOURCE.getPort())
.specificConfig(Map.of(
"clientId", CLIENT_ID_DITTO,
"cleanSession", "false",
"clientId", getDittoClientId(),
"cleanSession", String.valueOf(cleanSession),
"separatePublisherClient", String.valueOf(separatePublisherClient)))
.setSources(Arrays.stream(sourcesTopics)
.map(topics -> ConnectivityModelFactory.newSourceBuilder()
Expand Down Expand Up @@ -371,6 +367,14 @@ private static GenericBlockingMqttClient getMqttClient(final String clientId) {
.build();
}

private void expectMergeThingMessageIfNotCleanSession(final TestProbe testProbe, final String key, final String value) {
if (!cleanSession) {
expectMergeThingMessage(testProbe, key, value);
}

testProbe.expectNoMsg(NO_MESSAGE_TIMEOUT);
}

private void expectMergeThingMessage(final TestProbe testProbe, final String key, final String value) {
final var command = testProbe.expectMsgClass(COMMAND_TIMEOUT, MergeThing.class);
softly.assertThat((CharSequence) command.getEntityId())
Expand Down
2 changes: 2 additions & 0 deletions connectivity/service/src/test/resources/mosquitto.conf
@@ -1,3 +1,5 @@
listener 1883
persistence true
allow_anonymous true

log_type all
1 change: 0 additions & 1 deletion connectivity/service/src/test/resources/test.conf
Expand Up @@ -309,7 +309,6 @@ ditto {
}

}

}

pekko {
Expand Down
Expand Up @@ -12,7 +12,6 @@
*/
package org.eclipse.ditto.internal.utils.test.docker;

import java.io.Closeable;
import java.io.IOException;

import javax.annotation.Nullable;
Expand Down
Expand Up @@ -34,6 +34,7 @@ final class MosquittoContainerFactory extends ContainerFactory {
private static final String MOSQUITTO_VERSION = "2.0.18";
private static final int MOSQUITTO_INTERNAL_PORT = 1883;
private static final String CONFIG_CONTAINER_PATH = "/mosquitto/config/mosquitto.conf";

private final String CONFIG_RESOURCES_PATH;

private MosquittoContainerFactory(final String configResourceName, final String mosquittoVersion) {
Expand Down

0 comments on commit c0e433e

Please sign in to comment.