From 2253bd4694c78cd66e9c168b1f19c387c4993a4e Mon Sep 17 00:00:00 2001 From: Juergen Fickel Date: Tue, 31 May 2022 09:29:15 +0200 Subject: [PATCH] Switched to generic implementation of MQTT with backpressure. * Legacy MQTT implementation still has to be removed after testing. * Refactored `GenericMqttClientFactory` to be a utility class instead of an interface because it did not make sense to have an instance of this factory. This entailed some static mocking in `GenericMqttClientActorTest` and `ConnectionTesterTest`. * Provide `MqttSpecificConfig` to `HiveMqttClientProperties` because this config is required in `GenericMqttActor` anyway. This prevents duplicate creation of `MqttSpecificConfig`. * Implemented possibility of reconnecting of consumer client to `GenericMqttActor`. Signed-off-by: Juergen Fickel --- .../DefaultClientActorPropsFactory.java | 66 ++-- .../messaging/mqtt/MqttSpecificConfig.java | 10 + .../messaging/mqtt/ReconnectDelay.java | 94 ++++++ .../mqtt/hivemq/ConnectionTester.java | 20 +- .../mqtt/hivemq/GenericMqttClientActor.java | 110 ++++++- .../client/DefaultGenericMqttClient.java | 12 +- .../DefaultGenericMqttClientFactory.java | 192 ----------- .../mqtt/hivemq/client/GenericMqttClient.java | 18 ++ .../client/GenericMqttClientFactory.java | 163 +++++++++- .../client/HiveMqttClientProperties.java | 15 +- .../HiveMqttClientPropertiesStepBuilder.java | 19 +- .../consuming/MqttClientActorControl.java | 25 -- .../hivemq/consuming/MqttConsumerActor.java | 2 +- .../consuming/ReconnectConsumerClient.java | 79 +++++ .../mqtt/MqttSpecificConfigTest.java | 6 +- .../messaging/mqtt/ReconnectDelayTest.java | 84 +++++ .../mqtt/hivemq/ConnectionTesterTest.java | 45 +-- .../hivemq/GenericMqttClientActorTest.java | 302 ++++++++++-------- .../client/DefaultGenericMqttClientTest.java | 54 ++++ .../client/HiveMqttClientFactoryTest.java | 10 +- .../client/HiveMqttClientPropertiesTest.java | 7 + ...PublishingClientIdentifierFactoryTest.java | 2 + ...ubscribingClientIdentifierFactoryTest.java | 2 + .../consuming/MqttConsumerActorTest.java | 2 +- .../ReconnectConsumerClientTest.java | 59 ++++ 25 files changed, 939 insertions(+), 459 deletions(-) create mode 100644 connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/ReconnectDelay.java delete mode 100644 connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/DefaultGenericMqttClientFactory.java delete mode 100644 connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/consuming/MqttClientActorControl.java create mode 100644 connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/consuming/ReconnectConsumerClient.java create mode 100644 connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/ReconnectDelayTest.java create mode 100644 connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/consuming/ReconnectConsumerClientTest.java diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/DefaultClientActorPropsFactory.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/DefaultClientActorPropsFactory.java index 5840bb2505..dd414f12b5 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/DefaultClientActorPropsFactory.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/DefaultClientActorPropsFactory.java @@ -17,12 +17,10 @@ import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.connectivity.model.Connection; -import org.eclipse.ditto.connectivity.model.ConnectionType; import org.eclipse.ditto.connectivity.service.messaging.amqp.AmqpClientActor; import org.eclipse.ditto.connectivity.service.messaging.httppush.HttpPushClientActor; import org.eclipse.ditto.connectivity.service.messaging.kafka.KafkaClientActor; -import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.HiveMqtt3ClientActor; -import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.HiveMqtt5ClientActor; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.GenericMqttClientActor; import org.eclipse.ditto.connectivity.service.messaging.rabbitmq.RabbitMQClientActor; import com.typesafe.config.Config; @@ -55,43 +53,41 @@ public static DefaultClientActorPropsFactory getInstance() { } @Override - public Props getActorPropsForType(final Connection connection, final ActorRef proxyActor, + public Props getActorPropsForType(final Connection connection, + final ActorRef proxyActor, final ActorRef connectionActor, final ActorSystem actorSystem, final DittoHeaders dittoHeaders, final Config connectivityConfigOverwrites) { - final ConnectionType connectionType = connection.getConnectionType(); - final Props result; - switch (connectionType) { - case AMQP_091: - result = RabbitMQClientActor.props(connection, proxyActor, connectionActor, dittoHeaders, - connectivityConfigOverwrites); - break; - case AMQP_10: - result = AmqpClientActor.props(connection, proxyActor, connectionActor, connectivityConfigOverwrites, - actorSystem, dittoHeaders); - break; - case MQTT: - result = HiveMqtt3ClientActor.props(connection, proxyActor, connectionActor, dittoHeaders, - connectivityConfigOverwrites); - break; - case MQTT_5: - result = HiveMqtt5ClientActor.props(connection, proxyActor, connectionActor, dittoHeaders, - connectivityConfigOverwrites); - break; - case KAFKA: - result = KafkaClientActor.props(connection, proxyActor, connectionActor, dittoHeaders, - connectivityConfigOverwrites); - break; - case HTTP_PUSH: - result = HttpPushClientActor.props(connection, proxyActor, connectionActor, dittoHeaders, - connectivityConfigOverwrites); - break; - default: - throw new IllegalArgumentException("ConnectionType <" + connectionType + "> is not supported."); - } - return result; + return switch (connection.getConnectionType()) { + case AMQP_091 -> RabbitMQClientActor.props(connection, + proxyActor, + connectionActor, + dittoHeaders, + connectivityConfigOverwrites); + case AMQP_10 -> AmqpClientActor.props(connection, + proxyActor, + connectionActor, + connectivityConfigOverwrites, + actorSystem, + dittoHeaders); + case MQTT, MQTT_5 -> GenericMqttClientActor.props(connection, + proxyActor, + connectionActor, + dittoHeaders, + connectivityConfigOverwrites); + case KAFKA -> KafkaClientActor.props(connection, + proxyActor, + connectionActor, + dittoHeaders, + connectivityConfigOverwrites); + case HTTP_PUSH -> HttpPushClientActor.props(connection, + proxyActor, + connectionActor, + dittoHeaders, + connectivityConfigOverwrites); + }; } } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/MqttSpecificConfig.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/MqttSpecificConfig.java index f84604e5bb..5ba36997b6 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/MqttSpecificConfig.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/MqttSpecificConfig.java @@ -109,10 +109,20 @@ public boolean separatePublisherClient() { /** * @return how long to wait before reconnect a consumer client for redelivery. */ + // TODO jff delete as soon as unused. public Duration getReconnectForDeliveryDelay() { return specificConfig.getDuration(RECONNECT_FOR_REDELIVERY_DELAY); } + /** + * Returns the delay how long to wait before reconnecting a consumer client for redelivery. + * + * @return the reconnect delay. + */ + public ReconnectDelay getReconnectForDeliveryDelayNg() { + return ReconnectDelay.ofOrLowerBoundary(specificConfig.getDuration(RECONNECT_FOR_REDELIVERY_DELAY)); + } + /** * @return the optional clientId which should be used by the MQTT client when connecting to the MQTT broker. */ diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/ReconnectDelay.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/ReconnectDelay.java new file mode 100644 index 0000000000..54da2984f9 --- /dev/null +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/ReconnectDelay.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2022 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.connectivity.service.messaging.mqtt; + +import java.time.Duration; +import java.util.Objects; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; + +import org.eclipse.ditto.base.model.common.ConditionChecker; + +/** + * Delay how long to wait before reconnecting an MQTT client. + * This delay is guaranteed + */ +@Immutable +public final class ReconnectDelay implements Comparable { + + /** + * The lower boundary a {@code ReconnectDelay} a never falls below. + */ + static final Duration LOWER_BOUNDARY = Duration.ofSeconds(1L); + + private final Duration duration; + + private ReconnectDelay(final Duration duration) { + this.duration = duration; + } + + /** + * Returns an instance of {@code ReconnectDelay} for the specified {@code Duration} argument or + * {@link #LOWER_BOUNDARY} if the argument is less than the lower boundary. + * + * @param duration the duration of the returned {@code ReconnectDelay}. + * @return the instance. + * @throws NullPointerException if {@code duration} is {@code null}. + */ + static ReconnectDelay ofOrLowerBoundary(final Duration duration) { + final Duration durationWithinLowerBoundary; + if (0 < LOWER_BOUNDARY.compareTo(ConditionChecker.checkNotNull(duration, "duration"))) { + durationWithinLowerBoundary = LOWER_BOUNDARY; + } else { + durationWithinLowerBoundary = duration; + } + return new ReconnectDelay(durationWithinLowerBoundary); + } + + public Duration getDuration() { + return duration; + } + + @Override + public int compareTo(final ReconnectDelay o) { + ConditionChecker.checkNotNull(o, "o"); + return duration.compareTo(o.getDuration()); + } + + @Override + public boolean equals(@Nullable final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final var that = (ReconnectDelay) o; + return Objects.equals(duration, that.duration); + } + + @Override + public int hashCode() { + return Objects.hash(duration); + } + + /** + * @return the string representation of the wrapped {@code Duration}. + */ + @Override + public String toString() { + return duration.toString(); + } + +} diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/ConnectionTester.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/ConnectionTester.java index b0b30bebd4..2325da416e 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/ConnectionTester.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/ConnectionTester.java @@ -31,18 +31,18 @@ import org.eclipse.ditto.connectivity.service.messaging.ChildActorNanny; import org.eclipse.ditto.connectivity.service.messaging.ConnectivityStatusResolver; import org.eclipse.ditto.connectivity.service.messaging.mqtt.KeepAliveInterval; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.AllSubscriptionsFailedException; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.GenericMqttClient; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.GenericMqttClientFactory; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.HiveMqttClientProperties; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.MqttSubscribeException; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.SomeSubscriptionsFailedException; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.SubscriptionStatus; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.consuming.MqttConsumerActor; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.connect.GenericMqttConnect; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.publishing.MqttPublisherActor; -import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.AllSubscriptionsFailedException; -import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.MqttSubscribeException; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.subscribing.MqttSubscriber; -import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.SomeSubscriptionsFailedException; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.subscribing.SubscribeResult; -import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.SubscriptionStatus; import org.eclipse.ditto.connectivity.service.util.ConnectivityMdcEntryKey; import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory; import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLogger; @@ -61,7 +61,6 @@ @NotThreadSafe final class ConnectionTester { - private final GenericMqttClientFactory genericMqttClientFactory; private final HiveMqttClientProperties hiveMqttClientProperties; private final Sink inboundMappingSink; private final ConnectivityStatusResolver connectivityStatusResolver; @@ -70,7 +69,6 @@ final class ConnectionTester { private final ThreadSafeDittoLogger logger; private ConnectionTester(final Builder builder) { - genericMqttClientFactory = checkNotNull(builder.genericMqttClientFactory, "genericMqttClientFactory"); hiveMqttClientProperties = checkNotNull(builder.hiveMqttClientProperties, "hiveMqttClientProperties"); inboundMappingSink = checkNotNull(builder.inboundMappingSink, "inboundMappingSink"); connectivityStatusResolver = checkNotNull(builder.connectivityStatusResolver, "connectivityStatusResolver"); @@ -110,7 +108,7 @@ CompletionStage testConnection() { private CompletionStage tryToGetGenericMqttClient() { try { return CompletableFuture.completedFuture( - genericMqttClientFactory.getGenericMqttClientForConnectionTesting(hiveMqttClientProperties) + GenericMqttClientFactory.getGenericMqttClientForConnectionTesting(hiveMqttClientProperties) ); } catch (final Exception e) { return CompletableFuture.failedFuture(e); @@ -263,7 +261,6 @@ private static Throwable unwrapCauseIfCompletionException(final Throwable throwa @NotThreadSafe static final class Builder { - private GenericMqttClientFactory genericMqttClientFactory; private HiveMqttClientProperties hiveMqttClientProperties; private Sink inboundMappingSink; private ConnectivityStatusResolver connectivityStatusResolver; @@ -280,13 +277,6 @@ private Builder() { correlationId = null; } - Builder withGenericMqttClientFactory( - final GenericMqttClientFactory genericMqttClientFactory - ) { - this.genericMqttClientFactory = checkNotNull(genericMqttClientFactory, "genericMqttClientFactory"); - return this; - } - Builder withHiveMqttClientProperties(final HiveMqttClientProperties hiveMqttClientProperties) { this.hiveMqttClientProperties = checkNotNull(hiveMqttClientProperties, "hiveMqttClientProperties"); return this; diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/GenericMqttClientActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/GenericMqttClientActor.java index 5ac5d3e419..9cfaf07748 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/GenericMqttClientActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/GenericMqttClientActor.java @@ -18,19 +18,23 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; import javax.annotation.Nullable; import org.eclipse.ditto.base.model.common.ConditionChecker; import org.eclipse.ditto.base.model.headers.DittoHeaders; +import org.eclipse.ditto.connectivity.api.BaseClientState; import org.eclipse.ditto.connectivity.model.Connection; import org.eclipse.ditto.connectivity.model.signals.commands.modify.TestConnection; import org.eclipse.ditto.connectivity.service.config.MqttConfig; import org.eclipse.ditto.connectivity.service.messaging.BaseClientActor; +import org.eclipse.ditto.connectivity.service.messaging.BaseClientData; import org.eclipse.ditto.connectivity.service.messaging.backoff.RetryTimeoutStrategy; import org.eclipse.ditto.connectivity.service.messaging.internal.ClientConnected; import org.eclipse.ditto.connectivity.service.messaging.internal.ClientDisconnected; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.MqttSpecificConfig; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.ClientRole; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.GenericMqttClient; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.GenericMqttClientDisconnectedListener; @@ -38,6 +42,7 @@ import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.HiveMqttClientProperties; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.NoMqttConnectionException; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.consuming.MqttConsumerActor; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.consuming.ReconnectConsumerClient; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.publishing.MqttPublisherActor; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.subscribing.MqttSubscriber; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.subscribing.SubscribeResult; @@ -54,6 +59,7 @@ import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.Status; +import akka.japi.pf.FSMStateFunctionBuilder; import akka.pattern.Patterns; import akka.stream.javadsl.Keep; import akka.stream.javadsl.Sink; @@ -65,12 +71,13 @@ /** * Actor for handling connection to an MQTT broker for protocol versions 3 or 5. */ -final class GenericMqttClientActor extends BaseClientActor { +public final class GenericMqttClientActor extends BaseClientActor { private final MqttConfig mqttConfig; + private final MqttSpecificConfig mqttSpecificConfig; - private final GenericMqttClientFactory genericMqttClientFactory; @Nullable private GenericMqttClient genericMqttClient; + private final AtomicBoolean automaticReconnect; @Nullable private ActorRef publishingActorRef; private final List mqttConsumerActorRefs; @@ -79,8 +86,7 @@ private GenericMqttClientActor(final Connection connection, final ActorRef proxyActor, final ActorRef connectionActor, final DittoHeaders dittoHeaders, - final Config connectivityConfigOverwrites, - final GenericMqttClientFactory genericMqttClientFactory) { + final Config connectivityConfigOverwrites) { super(connection, proxyActor, connectionActor, dittoHeaders, connectivityConfigOverwrites); @@ -88,8 +94,10 @@ private GenericMqttClientActor(final Connection connection, final var connectionConfig = connectivityConfig.getConnectionConfig(); mqttConfig = connectionConfig.getMqttConfig(); - this.genericMqttClientFactory = genericMqttClientFactory; + mqttSpecificConfig = MqttSpecificConfig.fromConnection(connection, mqttConfig); + genericMqttClient = null; + automaticReconnect = new AtomicBoolean(true); publishingActorRef = null; mqttConsumerActorRefs = new ArrayList<>(); } @@ -109,16 +117,70 @@ public static Props props(final Connection mqttConnection, final ActorRef proxyActor, final ActorRef connectionActor, final DittoHeaders dittoHeaders, - final Config connectivityConfigOverwrites, - final GenericMqttClientFactory genericMqttClientFactory) { + final Config connectivityConfigOverwrites) { return Props.create(GenericMqttClientActor.class, ConditionChecker.checkNotNull(mqttConnection, "mqttConnection"), ConditionChecker.checkNotNull(proxyActor, "proxyActor"), ConditionChecker.checkNotNull(connectionActor, "connectionActor"), ConditionChecker.checkNotNull(dittoHeaders, "dittoHeaders"), - ConditionChecker.checkNotNull(connectivityConfigOverwrites, "connectivityConfigOverwrites"), - ConditionChecker.checkNotNull(genericMqttClientFactory, "genericMqttClientFactory")); + ConditionChecker.checkNotNull(connectivityConfigOverwrites, "connectivityConfigOverwrites")); + } + + @Override + protected FSMStateFunctionBuilder inConnectingState() { + final FSMStateFunctionBuilder result; + if (isReconnectForRedelivery()) { + result = super.inConnectingState() + .event(ReconnectConsumerClient.class, this::scheduleConsumerClientReconnect) + .eventEquals(Control.RECONNECT_CONSUMER_CLIENT, this::reconnectConsumerClient); + } else { + result = super.inConnectingState(); + } + return result; + } + + private boolean isReconnectForRedelivery() { + return mqttSpecificConfig.reconnectForRedelivery(); + } + + private State scheduleConsumerClientReconnect( + final ReconnectConsumerClient reconnectConsumerClient, + final BaseClientData baseClientData + ) { + final var trigger = Control.RECONNECT_CONSUMER_CLIENT; + if (isTimerActive(trigger.name())) { + logger.debug("Timer <{}> is active, thus not scheduling reconnecting consumer client again.", + trigger.name()); + } else { + final var reconnectForRedeliveryDelay = reconnectConsumerClient.getReconnectDelay(); + logger.info("Scheduling reconnecting of consumer client in <{}>.", reconnectForRedeliveryDelay); + startSingleTimer(trigger.name(), trigger, reconnectForRedeliveryDelay.getDuration()); + } + return stay(); + } + + private State reconnectConsumerClient( + final Control control, + final BaseClientData baseClientData) { + if (null != genericMqttClient) { + enableAutomaticReconnect(); + genericMqttClient.disconnectClientRole(ClientRole.CONSUMER); + } + return stay(); + } + + @Override + protected FSMStateFunctionBuilder inConnectedState() { + final FSMStateFunctionBuilder result; + if (isReconnectForRedelivery()) { + result = super.inConnectedState() + .event(ReconnectConsumerClient.class, this::scheduleConsumerClientReconnect) + .eventEquals(Control.RECONNECT_CONSUMER_CLIENT, this::reconnectConsumerClient); + } else { + result = super.inConnectedState(); + } + return result; } @Override @@ -146,6 +208,7 @@ private Try tryToGetHiveMqttClientPropertiesForConnect return new Success<>(HiveMqttClientProperties.builder() .withMqttConnection(connection) .withConnectivityConfig(connectivityConfig()) + .withMqttSpecificConfig(mqttSpecificConfig) .withSshTunnelStateSupplier(this::getSshTunnelState) .withConnectionLogger(connectionLogger) .withActorUuid(actorUuid) @@ -159,7 +222,6 @@ private ConnectionTester getConnectionTester(final HiveMqttClientProperties hive final TestConnection testConnectionCmd) { return ConnectionTester.builder() - .withGenericMqttClientFactory(genericMqttClientFactory) .withHiveMqttClientProperties(hiveMqttClientProperties) .withInboundMappingSink(getInboundMappingSink()) .withConnectivityStatusResolver(connectivityStatusResolver) @@ -174,6 +236,7 @@ protected void cleanupResourcesForConnection() { mqttConsumerActorRefs.forEach(this::stopChildActor); stopChildActor(publishingActorRef); if (null != genericMqttClient) { + disableAutomaticReconnect(); genericMqttClient.disconnect(); } @@ -182,12 +245,17 @@ protected void cleanupResourcesForConnection() { mqttConsumerActorRefs.clear(); } + private void disableAutomaticReconnect() { + automaticReconnect.set(false); + } + @Override protected void doConnectClient(final Connection connection, @Nullable final ActorRef origin) { if (null == genericMqttClient) { - genericMqttClient = genericMqttClientFactory.getProductiveGenericMqttClient( + genericMqttClient = GenericMqttClientFactory.getProductiveGenericMqttClient( getHiveMqttClientPropertiesOrThrow(connection) ); + enableAutomaticReconnect(); } Patterns.pipe( genericMqttClient.connect().thenApply(aVoid -> MqttClientConnected.of(origin)), @@ -200,6 +268,7 @@ private HiveMqttClientProperties getHiveMqttClientPropertiesOrThrow(final Connec return HiveMqttClientProperties.builder() .withMqttConnection(connection) .withConnectivityConfig(connectivityConfig()) + .withMqttSpecificConfig(mqttSpecificConfig) .withSshTunnelStateSupplier(this::getSshTunnelState) .withConnectionLogger(connectionLogger) .withActorUuid(actorUuid) @@ -248,7 +317,7 @@ private GenericMqttClientDisconnectedListener getClientDisconnectedListener() { mqttClientReconnector.reconnect(false); } else { final var mqttDisconnectSource = context.getSource(); - final var reconnect = connection().isFailoverEnabled(); + final var reconnect = isReconnect(); final var reconnectDelayMillis = getReconnectDelayMillis(retryTimeoutStrategy, mqttDisconnectSource); logger.info("Client <{}> disconnected by <{}>.", clientId, mqttDisconnectSource); if (reconnect) { @@ -274,6 +343,11 @@ private static boolean isMqttClientInConnectingState(final MqttClientConfig mqtt return MqttClientState.CONNECTING == mqttClientConfig.getState(); } + private boolean isReconnect() { + final var connection = connection(); + return connection.isFailoverEnabled() && automaticReconnect.get(); + } + private long getReconnectDelayMillis(final RetryTimeoutStrategy retryTimeoutStrategy, final MqttDisconnectSource mqttDisconnectSource) { @@ -289,6 +363,10 @@ private long getReconnectDelayMillis(final RetryTimeoutStrategy retryTimeoutStra return result; } + private void enableAutomaticReconnect() { + automaticReconnect.set(true); + } + private ExecutionContextExecutor getContextDispatcher() { final var actorContext = getContext(); return actorContext.getDispatcher(); @@ -300,6 +378,7 @@ protected void doDisconnectClient(final Connection connection, final boolean shutdownAfterDisconnect) { if (null != genericMqttClient) { + disableAutomaticReconnect(); Patterns.pipe( genericMqttClient.disconnect() .thenApply(aVoid -> ClientDisconnected.of(origin, shutdownAfterDisconnect)), @@ -417,9 +496,16 @@ protected Stream getSourceAddresses() { public void postStop() { logger.info("Actor stopped, stopping clients."); if (null != genericMqttClient) { + disableAutomaticReconnect(); genericMqttClient.disconnect(); } super.postStop(); } + private enum Control { + + RECONNECT_CONSUMER_CLIENT; + + } + } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/DefaultGenericMqttClient.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/DefaultGenericMqttClient.java index 3ebf97b339..1702295698 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/DefaultGenericMqttClient.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/DefaultGenericMqttClient.java @@ -120,7 +120,17 @@ private CompletableFuture disconnectSubscribingClient() { @Override public CompletionStage disconnect() { - return CompletableFuture.allOf(disconnectPublishingClient(), disconnectSubscribingClient()); + return disconnectClientRole(ClientRole.CONSUMER_PUBLISHER); + } + + @Override + public CompletionStage disconnectClientRole(final ClientRole clientRole) { + return switch (checkNotNull(clientRole, "clientRole")) { + case CONSUMER -> disconnectSubscribingClient(); + case PUBLISHER -> disconnectPublishingClient(); + case CONSUMER_PUBLISHER -> + CompletableFuture.allOf(disconnectPublishingClient(), disconnectSubscribingClient()); + }; } private CompletableFuture disconnectPublishingClient() { diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/DefaultGenericMqttClientFactory.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/DefaultGenericMqttClientFactory.java deleted file mode 100644 index bf2e56b5c6..0000000000 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/DefaultGenericMqttClientFactory.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Copyright (c) 2022 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0 - * - * SPDX-License-Identifier: EPL-2.0 - */ -package org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client; - -import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull; - -import javax.annotation.concurrent.Immutable; - -import org.eclipse.ditto.connectivity.model.Connection; -import org.eclipse.ditto.connectivity.model.ConnectionType; -import org.eclipse.ditto.connectivity.service.messaging.mqtt.MqttSpecificConfig; - -import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; -import com.hivemq.client.mqtt.mqtt3.Mqtt3RxClient; -import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; -import com.hivemq.client.mqtt.mqtt5.Mqtt5RxClient; - -/** - * Default implementation for {@link GenericMqttClientFactory}. - */ -@Immutable -final class DefaultGenericMqttClientFactory implements GenericMqttClientFactory { - - private DefaultGenericMqttClientFactory() { - super(); - } - - /** - * Returns a new instance of {@code DefaultGenericMqttClientFactory}. - * - * @return the instance. - */ - static DefaultGenericMqttClientFactory newInstance() { - return new DefaultGenericMqttClientFactory(); - } - - @Override - public GenericMqttClient getProductiveGenericMqttClient(final HiveMqttClientProperties hiveMqttClientProperties) { - return getGenericMqttClient(hiveMqttClientProperties, false); - } - - private static GenericMqttClient getGenericMqttClient(final HiveMqttClientProperties hiveMqttClientProperties, - final boolean forcefullyDisableLastWill) { - - checkNotNull(hiveMqttClientProperties, "hiveMqttClientProperties"); - final GenericMqttClient result; - final var factory = new FactoryImplementation(hiveMqttClientProperties, forcefullyDisableLastWill); - if (isMqtt3ProtocolVersion(hiveMqttClientProperties.getMqttConnection())) { - result = factory.getGenericMqttClientForMqtt3(); - } else { - result = factory.getGenericMqttClientForMqtt5(); - } - return result; - } - - private static boolean isMqtt3ProtocolVersion(final Connection mqttConnection) { - return ConnectionType.MQTT == mqttConnection.getConnectionType(); - } - - @Override - public GenericMqttClient getGenericMqttClientForConnectionTesting( - final HiveMqttClientProperties hiveMqttClientProperties - ) { - return getGenericMqttClient(hiveMqttClientProperties, true); - } - - private static final class FactoryImplementation { - - private final HiveMqttClientProperties hiveMqttClientProperties; - private final MqttSpecificConfig mqttSpecificConfig; - private final MqttClientIdentifierFactory subscribingClientIdFactory; - private final MqttClientIdentifierFactory publishingClientIdFactory; - private final boolean forcefullyDisableLastWill; - - private FactoryImplementation(final HiveMqttClientProperties hiveMqttClientProperties, - final boolean forcefullyDisableLastWill) { - - this.hiveMqttClientProperties = hiveMqttClientProperties; - mqttSpecificConfig = hiveMqttClientProperties.getMqttSpecificConfig(); - subscribingClientIdFactory = MqttClientIdentifierFactory.forSubscribingClient(hiveMqttClientProperties); - publishingClientIdFactory = MqttClientIdentifierFactory.forPublishingClient(hiveMqttClientProperties); - this.forcefullyDisableLastWill = forcefullyDisableLastWill; - } - - private GenericMqttClient getGenericMqttClientForMqtt3() { - final BaseGenericMqttSubscribingClient subscribingClient; - final BaseGenericMqttPublishingClient publishingClient; - if (mqttSpecificConfig.separatePublisherClient()) { - - // Create separate HiveMQ MQTT client instance for subscribing client and publishing client. - subscribingClient = getSubscribingClientForMqtt3(); - publishingClient = getPublishingClientForMqtt3(); - } else { - - // Re-use same HiveMQ MQTT client instance for subscribing client and publishing client. - final var clientRole = ClientRole.CONSUMER_PUBLISHER; - final var mqtt3Client = HiveMqttClientFactory.getMqtt3Client(hiveMqttClientProperties, - subscribingClientIdFactory.getMqttClientIdentifier(), - !forcefullyDisableLastWill, - clientRole); - subscribingClient = BaseGenericMqttSubscribingClient.ofMqtt3RxClient(mqtt3Client.toRx(), clientRole); - publishingClient = - BaseGenericMqttPublishingClient.ofMqtt3AsyncClient(mqtt3Client.toAsync(), clientRole); - } - return DefaultGenericMqttClient.newInstance(subscribingClient, publishingClient, hiveMqttClientProperties); - } - - private BaseGenericMqttSubscribingClient getSubscribingClientForMqtt3() { - final var clientRole = ClientRole.CONSUMER; - return BaseGenericMqttSubscribingClient.ofMqtt3RxClient( - HiveMqttClientFactory.getMqtt3Client(hiveMqttClientProperties, - subscribingClientIdFactory.getMqttClientIdentifier(), - false, - clientRole - ).toRx(), - clientRole - ); - } - - private BaseGenericMqttPublishingClient getPublishingClientForMqtt3() { - final var clientRole = ClientRole.PUBLISHER; - return BaseGenericMqttPublishingClient.ofMqtt3AsyncClient( - HiveMqttClientFactory.getMqtt3Client(hiveMqttClientProperties, - publishingClientIdFactory.getMqttClientIdentifier(), - !forcefullyDisableLastWill, - clientRole - ).toAsync(), - clientRole - ); - } - - private GenericMqttClient getGenericMqttClientForMqtt5() { - final BaseGenericMqttSubscribingClient subscribingClient; - final BaseGenericMqttPublishingClient publishingClient; - if (mqttSpecificConfig.separatePublisherClient()) { - - // Create separate HiveMQ MQTT client instance for subscribing client and publishing client. - subscribingClient = getSubscribingClientForMqtt5(); - publishingClient = getPublishingClientForMqtt5(); - } else { - - // Re-use same HiveMQ MQTT client instance for subscribing client and publishing client. - final var clientRole = ClientRole.CONSUMER_PUBLISHER; - final var mqtt5Client = HiveMqttClientFactory.getMqtt5Client(hiveMqttClientProperties, - subscribingClientIdFactory.getMqttClientIdentifier(), - !forcefullyDisableLastWill, - clientRole); - subscribingClient = BaseGenericMqttSubscribingClient.ofMqtt5RxClient(mqtt5Client.toRx(), clientRole); - publishingClient = - BaseGenericMqttPublishingClient.ofMqtt5AsyncClient(mqtt5Client.toAsync(), clientRole); - } - return DefaultGenericMqttClient.newInstance(subscribingClient, publishingClient, hiveMqttClientProperties); - } - - private BaseGenericMqttSubscribingClient getSubscribingClientForMqtt5() { - final var clientRole = ClientRole.CONSUMER; - return BaseGenericMqttSubscribingClient.ofMqtt5RxClient( - HiveMqttClientFactory.getMqtt5Client(hiveMqttClientProperties, - subscribingClientIdFactory.getMqttClientIdentifier(), - false, - clientRole - ).toRx(), - clientRole - ); - } - - private BaseGenericMqttPublishingClient getPublishingClientForMqtt5( - ) { - final var clientRole = ClientRole.PUBLISHER; - return BaseGenericMqttPublishingClient.ofMqtt5AsyncClient( - HiveMqttClientFactory.getMqtt5Client(hiveMqttClientProperties, - publishingClientIdFactory.getMqttClientIdentifier(), - !forcefullyDisableLastWill, - clientRole - ).toAsync(), - clientRole - ); - } - - } - -} diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/GenericMqttClient.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/GenericMqttClient.java index f3afde2232..e72d421202 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/GenericMqttClient.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/GenericMqttClient.java @@ -39,4 +39,22 @@ public interface GenericMqttClient */ CompletionStage connect(); + /** + * Disconnects the specified role of this client. + * Based on configuration this may trigger an automatic reconnect of that role. + *

+ * It might happen, that more roles than the specified one will be disconnected, for example, if configuration + * states that no separate publisher client should be used because in this case consuming and publishing is + * performed by the same HiveMQ MQTT client. + * + * @param clientRole the role of the client to be disconnected. + * @return a CompletionStage which + *

    + *
  • completes when the client role was successfully disconnected or
  • + *
  • completes exceptionally if the client role did not disconnect gracefully
  • + *
+ * @throws NullPointerException if {@code clientRole} is {@code null}. + */ + CompletionStage disconnectClientRole(ClientRole clientRole); + } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/GenericMqttClientFactory.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/GenericMqttClientFactory.java index 6f6e2da5de..b0ba60a1e6 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/GenericMqttClientFactory.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/GenericMqttClientFactory.java @@ -12,10 +12,28 @@ */ package org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client; +import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull; + +import javax.annotation.concurrent.Immutable; + +import org.eclipse.ditto.connectivity.model.Connection; +import org.eclipse.ditto.connectivity.model.ConnectionType; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.MqttSpecificConfig; + +import com.hivemq.client.mqtt.mqtt3.Mqtt3AsyncClient; +import com.hivemq.client.mqtt.mqtt3.Mqtt3RxClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5AsyncClient; +import com.hivemq.client.mqtt.mqtt5.Mqtt5RxClient; + /** * Factory for creating instances of {@link GenericMqttClient}. */ -public interface GenericMqttClientFactory { +@Immutable +public final class GenericMqttClientFactory { + + private GenericMqttClientFactory() { + throw new AssertionError(); + } /** * Returns an instance of {@link GenericMqttClient} for use in production. @@ -24,7 +42,27 @@ public interface GenericMqttClientFactory { * @return the new {@code GenericMqttClient}. * @throws NullPointerException if {@code hiveMqttClientProperties} is {@code null}. */ - GenericMqttClient getProductiveGenericMqttClient(HiveMqttClientProperties hiveMqttClientProperties); + public static GenericMqttClient getProductiveGenericMqttClient(final HiveMqttClientProperties hiveMqttClientProperties) { + return getGenericMqttClient(hiveMqttClientProperties, false); + } + + private static GenericMqttClient getGenericMqttClient(final HiveMqttClientProperties hiveMqttClientProperties, + final boolean forcefullyDisableLastWill) { + + checkNotNull(hiveMqttClientProperties, "hiveMqttClientProperties"); + final GenericMqttClient result; + final var factory = new FactoryImplementation(hiveMqttClientProperties, forcefullyDisableLastWill); + if (isMqtt3ProtocolVersion(hiveMqttClientProperties.getMqttConnection())) { + result = factory.getGenericMqttClientForMqtt3(); + } else { + result = factory.getGenericMqttClientForMqtt5(); + } + return result; + } + + private static boolean isMqtt3ProtocolVersion(final Connection mqttConnection) { + return ConnectionType.MQTT == mqttConnection.getConnectionType(); + } /** * Returns an instance of {@link GenericMqttClient} for testing a connection. @@ -33,6 +71,125 @@ public interface GenericMqttClientFactory { * @return the new {@code GenericMqttClient}. * @throws NullPointerException if {@code hiveMqttClientProperties} is {@code null}. */ - GenericMqttClient getGenericMqttClientForConnectionTesting(HiveMqttClientProperties hiveMqttClientProperties); + public static GenericMqttClient getGenericMqttClientForConnectionTesting( + final HiveMqttClientProperties hiveMqttClientProperties + ) { + return getGenericMqttClient(hiveMqttClientProperties, true); + } + + private static final class FactoryImplementation { + + private final HiveMqttClientProperties hiveMqttClientProperties; + private final MqttSpecificConfig mqttSpecificConfig; + private final MqttClientIdentifierFactory subscribingClientIdFactory; + private final MqttClientIdentifierFactory publishingClientIdFactory; + private final boolean forcefullyDisableLastWill; + + private FactoryImplementation(final HiveMqttClientProperties hiveMqttClientProperties, + final boolean forcefullyDisableLastWill) { + + this.hiveMqttClientProperties = hiveMqttClientProperties; + mqttSpecificConfig = hiveMqttClientProperties.getMqttSpecificConfig(); + subscribingClientIdFactory = MqttClientIdentifierFactory.forSubscribingClient(hiveMqttClientProperties); + publishingClientIdFactory = MqttClientIdentifierFactory.forPublishingClient(hiveMqttClientProperties); + this.forcefullyDisableLastWill = forcefullyDisableLastWill; + } + + private GenericMqttClient getGenericMqttClientForMqtt3() { + final BaseGenericMqttSubscribingClient subscribingClient; + final BaseGenericMqttPublishingClient publishingClient; + if (mqttSpecificConfig.separatePublisherClient()) { + + // Create separate HiveMQ MQTT client instance for subscribing client and publishing client. + subscribingClient = getSubscribingClientForMqtt3(); + publishingClient = getPublishingClientForMqtt3(); + } else { + + // Re-use same HiveMQ MQTT client instance for subscribing client and publishing client. + final var clientRole = ClientRole.CONSUMER_PUBLISHER; + final var mqtt3Client = HiveMqttClientFactory.getMqtt3Client(hiveMqttClientProperties, + subscribingClientIdFactory.getMqttClientIdentifier(), + !forcefullyDisableLastWill, + clientRole); + subscribingClient = BaseGenericMqttSubscribingClient.ofMqtt3RxClient(mqtt3Client.toRx(), clientRole); + publishingClient = + BaseGenericMqttPublishingClient.ofMqtt3AsyncClient(mqtt3Client.toAsync(), clientRole); + } + return DefaultGenericMqttClient.newInstance(subscribingClient, publishingClient, hiveMqttClientProperties); + } + + private BaseGenericMqttSubscribingClient getSubscribingClientForMqtt3() { + final var clientRole = ClientRole.CONSUMER; + return BaseGenericMqttSubscribingClient.ofMqtt3RxClient( + HiveMqttClientFactory.getMqtt3Client(hiveMqttClientProperties, + subscribingClientIdFactory.getMqttClientIdentifier(), + false, + clientRole + ).toRx(), + clientRole + ); + } + + private BaseGenericMqttPublishingClient getPublishingClientForMqtt3() { + final var clientRole = ClientRole.PUBLISHER; + return BaseGenericMqttPublishingClient.ofMqtt3AsyncClient( + HiveMqttClientFactory.getMqtt3Client(hiveMqttClientProperties, + publishingClientIdFactory.getMqttClientIdentifier(), + !forcefullyDisableLastWill, + clientRole + ).toAsync(), + clientRole + ); + } + + private GenericMqttClient getGenericMqttClientForMqtt5() { + final BaseGenericMqttSubscribingClient subscribingClient; + final BaseGenericMqttPublishingClient publishingClient; + if (mqttSpecificConfig.separatePublisherClient()) { + + // Create separate HiveMQ MQTT client instance for subscribing client and publishing client. + subscribingClient = getSubscribingClientForMqtt5(); + publishingClient = getPublishingClientForMqtt5(); + } else { + + // Re-use same HiveMQ MQTT client instance for subscribing client and publishing client. + final var clientRole = ClientRole.CONSUMER_PUBLISHER; + final var mqtt5Client = HiveMqttClientFactory.getMqtt5Client(hiveMqttClientProperties, + subscribingClientIdFactory.getMqttClientIdentifier(), + !forcefullyDisableLastWill, + clientRole); + subscribingClient = BaseGenericMqttSubscribingClient.ofMqtt5RxClient(mqtt5Client.toRx(), clientRole); + publishingClient = + BaseGenericMqttPublishingClient.ofMqtt5AsyncClient(mqtt5Client.toAsync(), clientRole); + } + return DefaultGenericMqttClient.newInstance(subscribingClient, publishingClient, hiveMqttClientProperties); + } + + private BaseGenericMqttSubscribingClient getSubscribingClientForMqtt5() { + final var clientRole = ClientRole.CONSUMER; + return BaseGenericMqttSubscribingClient.ofMqtt5RxClient( + HiveMqttClientFactory.getMqtt5Client(hiveMqttClientProperties, + subscribingClientIdFactory.getMqttClientIdentifier(), + false, + clientRole + ).toRx(), + clientRole + ); + } + + private BaseGenericMqttPublishingClient getPublishingClientForMqtt5( + ) { + final var clientRole = ClientRole.PUBLISHER; + return BaseGenericMqttPublishingClient.ofMqtt5AsyncClient( + HiveMqttClientFactory.getMqtt5Client(hiveMqttClientProperties, + publishingClientIdFactory.getMqttClientIdentifier(), + !forcefullyDisableLastWill, + clientRole + ).toAsync(), + clientRole + ); + } + + } } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientProperties.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientProperties.java index f9933428f6..b2c58d6fa7 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientProperties.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientProperties.java @@ -46,7 +46,7 @@ private HiveMqttClientProperties(final Builder builder) { mqttConnection = builder.mqttConnection; connectivityConfig = builder.connectivityConfig; mqttConfig = connectivityConfig.getConnectionConfig().getMqttConfig(); - mqttSpecificConfig = MqttSpecificConfig.fromConnection(mqttConnection, mqttConfig); + mqttSpecificConfig = builder.mqttSpecificConfig; sshTunnelStateSupplier = builder.sshTunnelStateSupplier; connectionLogger = builder.connectionLogger; mqttClientConnectedListener = builder.mqttClientConnectedListener; @@ -118,6 +118,7 @@ public ConnectivityConfig getConnectivityConfig() { private static final class Builder implements HiveMqttClientPropertiesStepBuilder.MqttConnectionStep, HiveMqttClientPropertiesStepBuilder.ConnectivityConfigStep, + HiveMqttClientPropertiesStepBuilder.MqttSpecificConfigStep, HiveMqttClientPropertiesStepBuilder.SshTunnelStateSupplierStep, HiveMqttClientPropertiesStepBuilder.ConnectionLoggerStep, HiveMqttClientPropertiesStepBuilder.ActorUuidStep, @@ -125,6 +126,7 @@ private static final class Builder implements HiveMqttClientPropertiesStepBuilde private Connection mqttConnection; private ConnectivityConfig connectivityConfig; + private MqttSpecificConfig mqttSpecificConfig; private Supplier sshTunnelStateSupplier; private ConnectionLogger connectionLogger; private GenericMqttClientConnectedListener mqttClientConnectedListener; @@ -134,6 +136,7 @@ private static final class Builder implements HiveMqttClientPropertiesStepBuilde private Builder() { mqttConnection = null; connectivityConfig = null; + mqttSpecificConfig = null; sshTunnelStateSupplier = null; connectionLogger = null; mqttClientConnectedListener = (context, clientRole) -> {/* Do nothing.*/}; @@ -151,13 +154,21 @@ public HiveMqttClientPropertiesStepBuilder.ConnectivityConfigStep withMqttConnec } @Override - public HiveMqttClientPropertiesStepBuilder.SshTunnelStateSupplierStep withConnectivityConfig( + public HiveMqttClientPropertiesStepBuilder.MqttSpecificConfigStep withConnectivityConfig( final ConnectivityConfig connectivityConfig ) { this.connectivityConfig = checkNotNull(connectivityConfig, "connectivityConfig"); return this; } + @Override + public HiveMqttClientPropertiesStepBuilder.SshTunnelStateSupplierStep withMqttSpecificConfig( + final MqttSpecificConfig mqttSpecificConfig + ) { + this.mqttSpecificConfig = checkNotNull(mqttSpecificConfig, "mqttSpecificConfig"); + return this; + } + @Override public HiveMqttClientPropertiesStepBuilder.ConnectionLoggerStep withSshTunnelStateSupplier( final Supplier sshTunnelStateSupplier diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientPropertiesStepBuilder.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientPropertiesStepBuilder.java index 8c44a51d7a..d7f3df0b7e 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientPropertiesStepBuilder.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientPropertiesStepBuilder.java @@ -19,6 +19,7 @@ import org.eclipse.ditto.connectivity.model.ConnectionType; import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig; import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.MqttSpecificConfig; import org.eclipse.ditto.connectivity.service.messaging.tunnel.SshTunnelState; /** @@ -57,7 +58,23 @@ interface ConnectivityConfigStep { * @return the next builder step. * @throws NullPointerException if {@code connectivityConfig} is {@code null}. */ - SshTunnelStateSupplierStep withConnectivityConfig(ConnectivityConfig connectivityConfig); + MqttSpecificConfigStep withConnectivityConfig(ConnectivityConfig connectivityConfig); + + } + + /** + * Builder step for setting the {@link MqttSpecificConfig}. + */ + interface MqttSpecificConfigStep { + + /** + * Sets the MQTT specific config to this builder. + * + * @param mqttSpecificConfig the specific MQTT config to be set. + * @return the next builder step. + * @throws NullPointerException if {@code mqttSpecificConfig} is {@code null}. + */ + SshTunnelStateSupplierStep withMqttSpecificConfig(MqttSpecificConfig mqttSpecificConfig); } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/consuming/MqttClientActorControl.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/consuming/MqttClientActorControl.java deleted file mode 100644 index 7f35cffe86..0000000000 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/consuming/MqttClientActorControl.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (c) 2022 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0 - * - * SPDX-License-Identifier: EPL-2.0 - */ -package org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.consuming; - -/** - * Messages that enable MQTT consumer actors for controlling their MQTT client actor. - */ -public enum MqttClientActorControl { - - /** - * Message to trigger consumer client restart via its parent MQTT client actor. - */ - RECONNECT_CONSUMER_CLIENT, - -} diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/consuming/MqttConsumerActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/consuming/MqttConsumerActor.java index c0591ba84c..0b1a3d1091 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/consuming/MqttConsumerActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/consuming/MqttConsumerActor.java @@ -311,7 +311,7 @@ private void rejectIncomingMessage(final boolean shouldRedeliver, private void reconnectConsumerClientForRedelivery() { final var context = getContext(); final var parent = context.getParent(); - parent.tell(MqttClientActorControl.RECONNECT_CONSUMER_CLIENT, getSelf()); + parent.tell(ReconnectConsumerClient.of(mqttSpecificConfig.getReconnectForDeliveryDelayNg()), getSelf()); } @Override diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/consuming/ReconnectConsumerClient.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/consuming/ReconnectConsumerClient.java new file mode 100644 index 0000000000..7c0e1d7b22 --- /dev/null +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/consuming/ReconnectConsumerClient.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2022 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.consuming; + +import java.util.Objects; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; + +import org.eclipse.ditto.base.model.common.ConditionChecker; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.ReconnectDelay; + +/** + * Message that triggers reconnection of the consumer client after a particular delay. + */ +@Immutable +public final class ReconnectConsumerClient { + + private final ReconnectDelay reconnectDelay; + + private ReconnectConsumerClient(final ReconnectDelay reconnectDelay) { + this.reconnectDelay = reconnectDelay; + } + + /** + * Returns an instance of {@code ReconnectConsumerClient} with the specified {@code ReconnectDelay} argument. + * + * @param reconnectDelay the delay how long to wait before reconnecting a consumer client for redelivery. + * @return the instance. + * @throws NullPointerException if {@code reconnectDelay} is {@code null}. + */ + static ReconnectConsumerClient of(final ReconnectDelay reconnectDelay) { + return new ReconnectConsumerClient(ConditionChecker.checkNotNull(reconnectDelay, "reconnectDelay")); + } + + /** + * Returns the delay how long to wait before reconnecting a consumer client for redelivery. + * + * @return the delay for reconnecting the consumer client. + */ + public ReconnectDelay getReconnectDelay() { + return reconnectDelay; + } + + @Override + public boolean equals(@Nullable final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final var that = (ReconnectConsumerClient) o; + return Objects.equals(reconnectDelay, that.reconnectDelay); + } + + @Override + public int hashCode() { + return Objects.hash(reconnectDelay); + } + + @Override + public String toString() { + return getClass().getSimpleName() + " [" + + "reconnectDelay=" + reconnectDelay + + "]"; + } + +} diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/MqttSpecificConfigTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/MqttSpecificConfigTest.java index 35e5f82781..6ed84bfa2d 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/MqttSpecificConfigTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/MqttSpecificConfigTest.java @@ -71,7 +71,8 @@ public void parseMqttSpecificConfig() throws IllegalKeepAliveIntervalSecondsExce assertThat(specificConfig.separatePublisherClient()).isFalse(); assertThat(specificConfig.getMqttClientId()).contains("consumer-client-id"); assertThat(specificConfig.getMqttPublisherId()).contains("publisher-client-id"); - assertThat(specificConfig.getReconnectForDeliveryDelay()).isEqualTo(Duration.ofMinutes(4L)); + assertThat(specificConfig.getReconnectForDeliveryDelayNg()) + .isEqualTo(ReconnectDelay.ofOrLowerBoundary(Duration.ofMinutes(4L))); assertThat(specificConfig.getKeepAliveIntervalOrDefault()) .isEqualTo(KeepAliveInterval.of(Duration.ofSeconds(30L))); @@ -90,7 +91,8 @@ public void defaultConfig() throws IllegalKeepAliveIntervalSecondsException { assertThat(specificConfig.separatePublisherClient()).isFalse(); assertThat(specificConfig.getMqttClientId()).isEmpty(); assertThat(specificConfig.getMqttPublisherId()).isEmpty(); - assertThat(specificConfig.getReconnectForDeliveryDelay()).isEqualTo(Duration.ofSeconds(2L)); + assertThat(specificConfig.getReconnectForDeliveryDelayNg()) + .isEqualTo(ReconnectDelay.ofOrLowerBoundary(Duration.ofSeconds(2L))); assertThat(specificConfig.getKeepAliveIntervalOrDefault()).isEqualTo(KeepAliveInterval.defaultKeepAlive()); assertThat(specificConfig.getMqttLastWillTopic()).isEmpty(); assertThat(specificConfig.getLastWillQosOrThrow()).isEqualTo(MqttSpecificConfig.DEFAULT_LAST_WILL_QOS); diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/ReconnectDelayTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/ReconnectDelayTest.java new file mode 100644 index 0000000000..73b5014310 --- /dev/null +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/ReconnectDelayTest.java @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2022 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.connectivity.service.messaging.mqtt; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNullPointerException; +import static org.mutabilitydetector.unittesting.MutabilityAssert.assertInstancesOf; +import static org.mutabilitydetector.unittesting.MutabilityMatchers.areImmutable; + +import java.time.Duration; + +import org.junit.Test; + +import nl.jqno.equalsverifier.EqualsVerifier; + +/** + * Unit test for {@link ReconnectDelay}. + */ +public final class ReconnectDelayTest { + + @Test + public void assertImmutability() { + assertInstancesOf(ReconnectDelay.class, areImmutable()); + } + + @Test + public void testHashCodeAndEquals() { + EqualsVerifier.forClass(ReconnectDelay.class) + .usingGetClass() + .verify(); + } + + @Test + public void ofOrLowerBoundaryWithNullDurationThrowsException() { + assertThatNullPointerException() + .isThrownBy(() -> ReconnectDelay.ofOrLowerBoundary(null)) + .withMessage("The duration must not be null!") + .withNoCause(); + } + + @Test + public void ofOrLowerBoundaryReturnsReconnectDelayWithLowerBoundIfSpecifiedArgumentIsBelow() { + final var reconnectDelay = ReconnectDelay.ofOrLowerBoundary(Duration.ofMillis(999L)); + + assertThat(reconnectDelay.getDuration()).isEqualTo(ReconnectDelay.LOWER_BOUNDARY); + } + + @Test + public void ofOrLowerBoundaryReturnsReconnectDelayWithSpecifiedArgument() { + final var duration = Duration.ofMillis(1_001L); + final var reconnectDelay = ReconnectDelay.ofOrLowerBoundary(duration); + + assertThat(reconnectDelay.getDuration()).isEqualTo(duration); + } + + @Test + public void compareToWorksAsExpected() { + final var reconnectDelayOneSecond = ReconnectDelay.ofOrLowerBoundary(Duration.ofSeconds(1L)); + final var reconnectDelayTwoSeconds = ReconnectDelay.ofOrLowerBoundary(Duration.ofSeconds(2L)); + + assertThat(reconnectDelayOneSecond).isEqualByComparingTo(reconnectDelayOneSecond); + assertThat(reconnectDelayOneSecond).isLessThan(reconnectDelayTwoSeconds); + assertThat(reconnectDelayTwoSeconds).isGreaterThan(reconnectDelayOneSecond); + } + + @Test + public void toStringReturnsStringRepresentationOfWrappedDuration() { + final var duration = Duration.ofMillis(1_001L); + final var reconnectDelay = ReconnectDelay.ofOrLowerBoundary(duration); + + assertThat(reconnectDelay).hasToString(duration.toString()); + } + +} \ No newline at end of file diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/ConnectionTesterTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/ConnectionTesterTest.java index 8cf683835f..a551cb00a5 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/ConnectionTesterTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/ConnectionTesterTest.java @@ -34,21 +34,22 @@ import org.eclipse.ditto.connectivity.service.messaging.ChildActorNanny; import org.eclipse.ditto.connectivity.service.messaging.ConnectivityStatusResolver; import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.MqttSpecificConfig; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.AllSubscriptionsFailedException; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.GenericMqttClient; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.GenericMqttClientFactory; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.HiveMqttClientProperties; -import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.NoMqttConnectionException; -import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.connect.GenericMqttConnect; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.MqttClientConnectException; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.MqttSubscribeException; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.NoMqttConnectionException; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.SomeSubscriptionsFailedException; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.SubscriptionStatus; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.consuming.MqttConsumerActor; -import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.publishing.MqttPublisherActor; -import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.AllSubscriptionsFailedException; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.connect.GenericMqttConnect; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.message.subscribe.GenericMqttSubAckStatus; -import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.MqttSubscribeException; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.publishing.MqttPublisherActor; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.subscribing.MqttSubscriber; -import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.SomeSubscriptionsFailedException; import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.subscribing.SubscribeResult; -import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.client.SubscriptionStatus; import org.eclipse.ditto.connectivity.service.messaging.tunnel.SshTunnelState; import org.eclipse.ditto.internal.utils.akka.ActorSystemResource; import org.junit.Before; @@ -56,6 +57,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.MockedStatic; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; @@ -85,8 +87,9 @@ public final class ConnectionTesterTest { @Rule public final TestNameCorrelationId testNameCorrelationId = TestNameCorrelationId.newInstance(); + @Mock private static MockedStatic genericMqttClientFactory; + @Mock private GenericMqttClient genericMqttClient; - @Mock private GenericMqttClientFactory genericMqttClientFactory; @Mock private Connection connection; @Mock private ConnectivityConfig connectivityConfig; @Mock private ConnectionLogger connectionLogger; @@ -110,6 +113,7 @@ public void before() throws NoMqttConnectionException { hiveMqttClientProperties = HiveMqttClientProperties.builder() .withMqttConnection(connection) .withConnectivityConfig(connectivityConfig) + .withMqttSpecificConfig(MqttSpecificConfig.fromConnection(connection, mqttConfig)) .withSshTunnelStateSupplier(SshTunnelState::disabled) .withConnectionLogger(connectionLogger) .withActorUuid(UUID.randomUUID()) @@ -118,31 +122,16 @@ public void before() throws NoMqttConnectionException { Mockito.when(genericMqttClient.connect(Mockito.any(GenericMqttConnect.class))) .thenReturn(CompletableFuture.completedFuture(null)); - Mockito.when(genericMqttClientFactory.getGenericMqttClientForConnectionTesting(Mockito.any())) + genericMqttClientFactory.when(() -> GenericMqttClientFactory.getGenericMqttClientForConnectionTesting(Mockito.any())) .thenReturn(genericMqttClient); childActorNanny = ChildActorNanny.newInstance(actorRefFactory, Mockito.mock(LoggingAdapter.class)); } - @Test - public void buildInstanceWithNullGenericMqttClientFactoryThrowsException() { - assertThatNullPointerException() - .isThrownBy(() -> ConnectionTester.builder() - .withHiveMqttClientProperties(hiveMqttClientProperties) - .withInboundMappingSink(inboundMappingSink) - .withConnectivityStatusResolver(connectivityStatusResolver) - .withChildActorNanny(childActorNanny) - .withActorSystemProvider(actorSystemResource.getActorSystem()) - .build()) - .withMessage("The genericMqttClientFactory must not be null!") - .withNoCause(); - } - @Test public void buildInstanceWithNullHiveMqttClientPropertiesThrowsException() { assertThatNullPointerException() .isThrownBy(() -> ConnectionTester.builder() - .withGenericMqttClientFactory(genericMqttClientFactory) .withInboundMappingSink(inboundMappingSink) .withConnectivityStatusResolver(connectivityStatusResolver) .withChildActorNanny(childActorNanny) @@ -156,7 +145,6 @@ public void buildInstanceWithNullHiveMqttClientPropertiesThrowsException() { public void buildInstanceWithNullInboundMappingSinkThrowsException() { assertThatNullPointerException() .isThrownBy(() -> ConnectionTester.builder() - .withGenericMqttClientFactory(genericMqttClientFactory) .withHiveMqttClientProperties(hiveMqttClientProperties) .withConnectivityStatusResolver(connectivityStatusResolver) .withChildActorNanny(childActorNanny) @@ -170,7 +158,6 @@ public void buildInstanceWithNullInboundMappingSinkThrowsException() { public void buildInstanceWithNullConnectivityStatusResolverThrowsException() { assertThatNullPointerException() .isThrownBy(() -> ConnectionTester.builder() - .withGenericMqttClientFactory(genericMqttClientFactory) .withHiveMqttClientProperties(hiveMqttClientProperties) .withInboundMappingSink(inboundMappingSink) .withChildActorNanny(childActorNanny) @@ -184,7 +171,6 @@ public void buildInstanceWithNullConnectivityStatusResolverThrowsException() { public void buildInstanceWithNullChildActorNannyThrowsException() { assertThatNullPointerException() .isThrownBy(() -> ConnectionTester.builder() - .withGenericMqttClientFactory(genericMqttClientFactory) .withHiveMqttClientProperties(hiveMqttClientProperties) .withInboundMappingSink(inboundMappingSink) .withConnectivityStatusResolver(connectivityStatusResolver) @@ -198,7 +184,6 @@ public void buildInstanceWithNullChildActorNannyThrowsException() { public void buildInstanceWithNullSystemProviderThrowsException() { assertThatNullPointerException() .isThrownBy(() -> ConnectionTester.builder() - .withGenericMqttClientFactory(genericMqttClientFactory) .withHiveMqttClientProperties(hiveMqttClientProperties) .withInboundMappingSink(inboundMappingSink) .withConnectivityStatusResolver(connectivityStatusResolver) @@ -211,10 +196,9 @@ public void buildInstanceWithNullSystemProviderThrowsException() { @Test public void testConnectionWhenGettingGenericMqttClientFailsReturnsFailureStatus() { final var error = new IllegalArgumentException("Some argument is invalid."); - Mockito.when(genericMqttClientFactory.getGenericMqttClientForConnectionTesting(Mockito.any())).thenThrow(error); + Mockito.when(GenericMqttClientFactory.getGenericMqttClientForConnectionTesting(Mockito.any())).thenThrow(error); final var underTest = ConnectionTester.builder() - .withGenericMqttClientFactory(genericMqttClientFactory) .withHiveMqttClientProperties(hiveMqttClientProperties) .withInboundMappingSink(inboundMappingSink) .withConnectivityStatusResolver(connectivityStatusResolver) @@ -251,7 +235,6 @@ public void testConnectionWhenConnectingGenericMqttClientFailsReturnsFailureStat private ConnectionTester getDefaultConnectionTester() { return ConnectionTester.builder() - .withGenericMqttClientFactory(genericMqttClientFactory) .withHiveMqttClientProperties(hiveMqttClientProperties) .withInboundMappingSink(inboundMappingSink) .withConnectivityStatusResolver(connectivityStatusResolver) diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/GenericMqttClientActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/GenericMqttClientActorTest.java index 27f1bf5c61..2cc1bac985 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/GenericMqttClientActorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/GenericMqttClientActorTest.java @@ -75,6 +75,7 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; +import org.mockito.MockedStatic; import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; @@ -87,13 +88,18 @@ import akka.actor.Props; import akka.actor.Status; import akka.http.javadsl.model.Uri; +import akka.testkit.TestActorRef; import akka.testkit.javadsl.TestKit; import io.reactivex.Flowable; import io.reactivex.Single; /** - * Unit test for {@link GenericMqttClientActor} + * Unit test for {@link GenericMqttClientActor}. */ +// It is crucial to use `TestActorRef` for the GenericMqttClientActor. +// This ensures that the actor runs in the same thread as the tests. +// The same thread is necessary because otherwise Mockito's static mocking +// of `GenericMqttClientFactory` would not work. @RunWith(MockitoJUnitRunner.class) public final class GenericMqttClientActorTest extends AbstractBaseClientActorTest { @@ -119,20 +125,16 @@ public final class GenericMqttClientActorTest extends AbstractBaseClientActorTes .topics(Topic.TWIN_EVENTS) .build(); - private static Supplier mqttConnectionTypeSupplier; - private static final Status.Success CONNECTED_SUCCESS = new Status.Success(BaseClientState.CONNECTED); private static final Status.Success DISCONNECTED_SUCCESS = new Status.Success(BaseClientState.DISCONNECTED); - @Rule - public final ActorSystemResource actorSystemResource = ActorSystemResource.newInstance(TestConstants.CONFIG); - @Rule - public final TestNameCorrelationId testNameCorrelationId = TestNameCorrelationId.newInstance(); + private static Supplier mqttConnectionTypeSupplier; + @Mock private static MockedStatic genericMqttClientFactory; - @Mock - private GenericMqttClient genericMqttClient; - @Mock - private GenericMqttClientFactory genericMqttClientFactory; + @Rule public final ActorSystemResource actorSystemResource = ActorSystemResource.newInstance(TestConstants.CONFIG); + @Rule public final TestNameCorrelationId testNameCorrelationId = TestNameCorrelationId.newInstance(); + + @Mock private GenericMqttClient genericMqttClient; private TestKit proxyActor; private TestKit connectionActor; @@ -174,9 +176,9 @@ private void enableGenericMqttClientMethodStubbing() { } private void enableGenericMqttClientFactoryMethodStubbing() { - Mockito.when(genericMqttClientFactory.getProductiveGenericMqttClient(Mockito.any())) + genericMqttClientFactory.when(() -> GenericMqttClientFactory.getProductiveGenericMqttClient(Mockito.any())) .thenReturn(genericMqttClient); - Mockito.when(genericMqttClientFactory.getGenericMqttClientForConnectionTesting(Mockito.any())) + genericMqttClientFactory.when(() -> GenericMqttClientFactory.getGenericMqttClientForConnectionTesting(Mockito.any())) .thenReturn(genericMqttClient); } @@ -201,8 +203,7 @@ protected Props createClientActor(final ActorRef proxyActor, final Connection co proxyActor, connectionActor.getRef(), DittoHeaders.empty(), - ConfigFactory.empty(), - genericMqttClientFactory); + ConfigFactory.empty()); } @Override @@ -212,13 +213,15 @@ protected ActorSystem getActorSystem() { @Test public void openAndCloseConnection() { - final var testKit = actorSystemResource.newTestKit(); + final var underTest = TestActorRef.apply(createClientActor(proxyActor.getRef(), + ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) + .connectionStatus(ConnectivityStatus.CLOSED) + .specificConfig(Map.of("separatePublisherClient", "false")) + .build()), + actorSystemResource.getActorSystem()); + final var dittoHeaders = getDittoHeadersWithCorrelationId(); - final var underTest = testKit.childActorOf(createClientActor(proxyActor.getRef(), - ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) - .connectionStatus(ConnectivityStatus.CLOSED) - .specificConfig(Map.of("separatePublisherClient", "false")) - .build())); + final var testKit = actorSystemResource.newTestKit(); underTest.tell(OpenConnection.of(CONNECTION_ID, dittoHeaders), testKit.getRef()); @@ -235,10 +238,14 @@ private DittoHeaders getDittoHeadersWithCorrelationId() { @Test public void testConnectionIsSuccessful() { + final var connection = ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) + .connectionStatus(ConnectivityStatus.CLOSED) + .build(); final var testKit = actorSystemResource.newTestKit(); - final var connection = getConnection(false); - final var underTest = - testKit.watch(testKit.childActorOf(createClientActor(proxyActor.getRef(), connection))); + final var underTest = testKit.watch(TestActorRef.apply( + createClientActor(proxyActor.getRef(), connection), + actorSystemResource.getActorSystem() + )); underTest.tell(TestConnection.of(connection, getDittoHeadersWithCorrelationId()), testKit.getRef()); @@ -252,10 +259,14 @@ public void testConnectionIsSuccessful() { public void testConnectionFails() { final var mqttClientConnectException = new MqttClientConnectException("Failed to connect.", null); Mockito.when(genericMqttClient.connect(Mockito.any())).thenThrow(mqttClientConnectException); + final var connection = ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) + .connectionStatus(ConnectivityStatus.CLOSED) + .build(); final var testKit = actorSystemResource.newTestKit(); - final var connection = getConnection(false); - final var underTest = - testKit.watch(testKit.childActorOf(createClientActor(proxyActor.getRef(), connection))); + final var underTest = testKit.watch(TestActorRef.apply( + createClientActor(proxyActor.getRef(), connection), + actorSystemResource.getActorSystem() + )); underTest.tell(TestConnection.of(connection, getDittoHeadersWithCorrelationId()), testKit.getRef()); @@ -274,13 +285,14 @@ public void testConnectionFails() { public void subscribeFails() { final var mqttSubscribeException = new MqttSubscribeException("Quisquam omnis in quia hic et libero.", null); Mockito.when(genericMqttClient.subscribe(Mockito.any())).thenReturn(Single.error(mqttSubscribeException)); + final var underTest = TestActorRef.apply( + createClientActor(proxyActor.getRef(), + ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) + .connectionStatus(ConnectivityStatus.CLOSED) + .build()), + actorSystemResource.getActorSystem() + ); final var testKit = actorSystemResource.newTestKit(); - final var underTest = testKit.childActorOf(createClientActor( - proxyActor.getRef(), - ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) - .connectionStatus(ConnectivityStatus.CLOSED) - .build() - )); underTest.tell(OpenConnection.of(CONNECTION_ID, getDittoHeadersWithCorrelationId()), testKit.getRef()); @@ -292,22 +304,25 @@ public void subscribeFails() { @Test public void consumeFromTopicAndRetrieveConnectionMetrics() { - final var testKit = actorSystemResource.newTestKit(); enableSubscribingAndConsumingMethodStubbing(getMqttPublish(SOURCE_ADDRESS, getSerializedModifyThingCommand())); - final var underTest = testKit.childActorOf(createClientActor( - proxyActor.getRef(), - ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) - .sources(List.of(ConnectivityModelFactory.newSourceBuilder() - .authorizationContext(TestConstants.Authorization.AUTHORIZATION_CONTEXT) - .index(2) - .consumerCount(1) - .address("topic1") - .address("topic2") - .qos(1) - .build())) - .build()) + final var underTest = TestActorRef.apply( + createClientActor( + proxyActor.getRef(), + ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) + .sources(List.of(ConnectivityModelFactory.newSourceBuilder() + .authorizationContext(TestConstants.Authorization.AUTHORIZATION_CONTEXT) + .index(2) + .consumerCount(1) + .address("topic1") + .address("topic2") + .qos(1) + .build())) + .build() + ), + actorSystemResource.getActorSystem() ); final var dittoHeadersWithCorrelationId = getDittoHeadersWithCorrelationId(); + final var testKit = actorSystemResource.newTestKit(); testKit.expectNoMessage(); final var modifyThing = proxyActor.expectMsgClass(ModifyThing.class); @@ -365,7 +380,6 @@ private void enableSubscribingAndConsumingMethodStubbing(final GenericMqttPublis @Test public void consumeFromTopicWithSourceHeaderMapping() { enableSubscribingAndConsumingMethodStubbing(getMqttPublish(SOURCE_ADDRESS, getSerializedModifyThingCommand())); - final var testKit = actorSystemResource.newTestKit(); final var connection = getConnection(false); final var headerMapping = ConnectivityModelFactory.newHeaderMapping(Map.of( MqttHeader.MQTT_TOPIC.getName(), getHeaderPlaceholder(MqttHeader.MQTT_TOPIC.getName()), @@ -375,16 +389,18 @@ public void consumeFromTopicWithSourceHeaderMapping() { "custom.qos", getHeaderPlaceholder(MqttHeader.MQTT_QOS.getName()), "custom.retain", getHeaderPlaceholder(MqttHeader.MQTT_RETAIN.getName()) )); - testKit.childActorOf(createClientActor( - proxyActor.getRef(), - ConnectivityModelFactory.newConnectionBuilder(connection) - .setSources(connection.getSources() - .stream() - .map(ConnectivityModelFactory::newSourceBuilder) - .map(sourceBuilder -> sourceBuilder.headerMapping(headerMapping)) - .map(SourceBuilder::build) - .collect(Collectors.toList())) - .build()) + TestActorRef.apply( + createClientActor( + proxyActor.getRef(), + ConnectivityModelFactory.newConnectionBuilder(connection) + .setSources(connection.getSources() + .stream() + .map(ConnectivityModelFactory::newSourceBuilder) + .map(sourceBuilder -> sourceBuilder.headerMapping(headerMapping)) + .map(SourceBuilder::build) + .collect(Collectors.toList())) + .build()), + actorSystemResource.getActorSystem() ); final var modifyThing = proxyActor.expectMsgClass(ModifyThing.class); @@ -407,22 +423,23 @@ private static String getHeaderPlaceholder(final String headerName) { public void consumeFromTopicWithIdEnforcement() { enableSubscribingAndConsumingMethodStubbing(getMqttPublish("eclipse/ditto/thing", getSerializedModifyThingCommand())); - final var testKit = actorSystemResource.newTestKit(); - testKit.childActorOf(createClientActor( - proxyActor.getRef(), - ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) - .setSources(List.of(ConnectivityModelFactory.newSourceBuilder() - .addresses(Set.of("eclipse/+/+")) - .authorizationContext(TestConstants.Authorization.AUTHORIZATION_CONTEXT) - .consumerCount(1) - .enforcement(ConnectivityModelFactory.newSourceAddressEnforcement( - "eclipse/{{ thing:namespace }}/{{ thing:name }}" - )) - .replyTarget(ReplyTarget.newBuilder().address("{{ header:reply-to }}").build()) - .qos(1) - .build())) - .build() - )); + TestActorRef.apply( + createClientActor( + proxyActor.getRef(), + ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) + .setSources(List.of(ConnectivityModelFactory.newSourceBuilder() + .addresses(Set.of("eclipse/+/+")) + .authorizationContext(TestConstants.Authorization.AUTHORIZATION_CONTEXT) + .consumerCount(1) + .enforcement(ConnectivityModelFactory.newSourceAddressEnforcement( + "eclipse/{{ thing:namespace }}/{{ thing:name }}" + )) + .replyTarget(ReplyTarget.newBuilder().address("{{ header:reply-to }}").build()) + .qos(1) + .build())) + .build() + ), + actorSystemResource.getActorSystem()); proxyActor.expectMsgClass(ModifyThing.class); } @@ -431,21 +448,23 @@ public void consumeFromTopicWithIdEnforcement() { public void consumeFromTopicWithIdEnforcementExpectErrorResponse() { enableSubscribingAndConsumingMethodStubbing(getMqttPublish("eclipse/invalid/address", getSerializedModifyThingCommand())); - final var testKit = actorSystemResource.newTestKit(); - testKit.childActorOf(createClientActor( - proxyActor.getRef(), - ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) - .setSources(List.of(ConnectivityModelFactory.newSourceBuilder() - .addresses(Set.of("eclipse/+/+")) - .authorizationContext(TestConstants.Authorization.AUTHORIZATION_CONTEXT) - .consumerCount(1) - .enforcement(ConnectivityModelFactory.newSourceAddressEnforcement( - "eclipse/{{ thing:namespace }}/{{ thing:name }}" - )) - .replyTarget(ReplyTarget.newBuilder().address("{{ header:reply-to }}").build()) - .qos(1) - .build())) - .build()) + TestActorRef.apply( + createClientActor( + proxyActor.getRef(), + ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) + .setSources(List.of(ConnectivityModelFactory.newSourceBuilder() + .addresses(Set.of("eclipse/+/+")) + .authorizationContext(TestConstants.Authorization.AUTHORIZATION_CONTEXT) + .consumerCount(1) + .enforcement(ConnectivityModelFactory.newSourceAddressEnforcement( + "eclipse/{{ thing:namespace }}/{{ thing:name }}" + )) + .replyTarget(ReplyTarget.newBuilder().address("{{ header:reply-to }}").build()) + .qos(1) + .build())) + .build() + ), + actorSystemResource.getActorSystem() ); final var genericMqttPublish = proxyActor.expectMsgClass(GenericMqttPublish.class); @@ -483,41 +502,43 @@ public void consumeMultipleSources() { genericMqttPublishesForRelevantTopics.stream() ).toArray(GenericMqttPublish[]::new) ); - final var testKit = actorSystemResource.newTestKit(); final var headerMapping = ConnectivityModelFactory.newHeaderMapping(Map.of( MqttHeader.MQTT_TOPIC.getName(), getHeaderPlaceholder(MqttHeader.MQTT_TOPIC.getName()), "custom.topic", getHeaderPlaceholder(MqttHeader.MQTT_TOPIC.getName()) )); - testKit.childActorOf(createClientActor( - proxyActor.getRef(), - ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) - .sources(List.of( - ConnectivityModelFactory.newSourceBuilder() - .authorizationContext(TestConstants.Authorization.AUTHORIZATION_CONTEXT) - .index(1) - .consumerCount(3) - .addresses(Set.of(topicA1)) - .headerMapping(headerMapping) - .qos(1) - .build(), - ConnectivityModelFactory.newSourceBuilder() - .authorizationContext(TestConstants.Authorization.AUTHORIZATION_CONTEXT) - .index(2) - .consumerCount(3) - .addresses(Set.of(topicB1, topicB2)) - .headerMapping(headerMapping) - .qos(1) - .build(), - ConnectivityModelFactory.newSourceBuilder() - .authorizationContext(TestConstants.Authorization.AUTHORIZATION_CONTEXT) - .index(3) - .consumerCount(1) - .addresses(Set.of(topicC1, topicC2, topicC3)) - .headerMapping(headerMapping) - .qos(1) - .build() - )) - .build()) + TestActorRef.apply( + createClientActor( + proxyActor.getRef(), + ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) + .sources(List.of( + ConnectivityModelFactory.newSourceBuilder() + .authorizationContext(TestConstants.Authorization.AUTHORIZATION_CONTEXT) + .index(1) + .consumerCount(3) + .addresses(Set.of(topicA1)) + .headerMapping(headerMapping) + .qos(1) + .build(), + ConnectivityModelFactory.newSourceBuilder() + .authorizationContext(TestConstants.Authorization.AUTHORIZATION_CONTEXT) + .index(2) + .consumerCount(3) + .addresses(Set.of(topicB1, topicB2)) + .headerMapping(headerMapping) + .qos(1) + .build(), + ConnectivityModelFactory.newSourceBuilder() + .authorizationContext(TestConstants.Authorization.AUTHORIZATION_CONTEXT) + .index(3) + .consumerCount(1) + .addresses(Set.of(topicC1, topicC2, topicC3)) + .headerMapping(headerMapping) + .qos(1) + .build() + )) + .build() + ), + actorSystemResource.getActorSystem() ); final var receivedTopicsCounts = IntStream.range(0, genericMqttPublishesForRelevantTopics.size()) @@ -532,9 +553,10 @@ public void consumeMultipleSources() { @Test public void reconnectAndConsumeFromTopic() { enableSubscribingAndConsumingMethodStubbing(getMqttPublish(SOURCE_ADDRESS, getSerializedModifyThingCommand())); - final var testKit = actorSystemResource.newTestKit(); - final var underTest = testKit.childActorOf(createClientActor(proxyActor.getRef(), getConnection(false))); + final var underTest = TestActorRef.apply(createClientActor(proxyActor.getRef(), getConnection(false)), + actorSystemResource.getActorSystem()); final var dittoHeadersWithCorrelationId = getDittoHeadersWithCorrelationId(); + final var testKit = actorSystemResource.newTestKit(); proxyActor.expectMsgClass(ModifyThing.class); @@ -556,16 +578,19 @@ public void reconnectAndConsumeFromTopic() { @Test public void publishToTopic() { - final var testKit = actorSystemResource.newTestKit(); - final var dittoHeadersWithCorrelationId = getDittoHeadersWithCorrelationId(); final var authorizationContext = MQTT_TARGET.getAuthorizationContext(); final var thingModifiedEvent = TestConstants.thingModified(authorizationContext.getAuthorizationSubjects()); - final var underTest = testKit.childActorOf(createClientActor( - proxyActor.getRef(), - ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) - .connectionStatus(ConnectivityStatus.CLOSED) - .build() - )); + final var underTest = TestActorRef.apply( + createClientActor( + proxyActor.getRef(), + ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) + .connectionStatus(ConnectivityStatus.CLOSED) + .build() + ), + actorSystemResource.getActorSystem() + ); + final var dittoHeadersWithCorrelationId = getDittoHeadersWithCorrelationId(); + final var testKit = actorSystemResource.newTestKit(); underTest.tell(OpenConnection.of(CONNECTION_ID, dittoHeadersWithCorrelationId), testKit.getRef()); @@ -585,16 +610,19 @@ public void publishToTopic() { @Test public void publishToReplyTarget() { - final var testKit = actorSystemResource.newTestKit(); + final var underTest = TestActorRef.apply( + createClientActor( + proxyActor.getRef(), + ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) + .connectionStatus(ConnectivityStatus.CLOSED) + .setSources(TestConstants.Sources.SOURCES_WITH_AUTH_CONTEXT) + .build() + ), + actorSystemResource.getActorSystem() + ); final var dittoHeadersWithCorrelationId = getDittoHeadersWithCorrelationId(); + final var testKit = actorSystemResource.newTestKit(); final var thingId = ThingId.generateRandom(); - final var underTest = testKit.childActorOf(createClientActor( - proxyActor.getRef(), - ConnectivityModelFactory.newConnectionBuilder(getConnection(false)) - .connectionStatus(ConnectivityStatus.CLOSED) - .setSources(TestConstants.Sources.SOURCES_WITH_AUTH_CONTEXT) - .build() - )); underTest.tell(OpenConnection.of(CONNECTION_ID, dittoHeadersWithCorrelationId), testKit.getRef()); diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/DefaultGenericMqttClientTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/DefaultGenericMqttClientTest.java index e30f5919ff..fed6517f22 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/DefaultGenericMqttClientTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/DefaultGenericMqttClientTest.java @@ -270,6 +270,60 @@ public void disconnectFailsIfDisconnectOnPublishingClientFails() { .isEqualTo(illegalStateException); } + @Test + public void disconnectClientRoleWithNullThrowsException() { + final var underTest = + DefaultGenericMqttClient.newInstance(subscribingClient, publishingClient, hiveMqttClientProperties); + + Assertions.assertThatNullPointerException() + .isThrownBy(() -> underTest.disconnectClientRole(null)) + .withMessage("The clientRole must not be null!") + .withNoCause(); + } + + @Test + public void disconnectClientRoleWithConsumerRoleDisconnectsOnlySubscribingClient() { + Mockito.when(subscribingClient.disconnect()).thenReturn(CompletableFuture.completedFuture(null)); + final var underTest = + DefaultGenericMqttClient.newInstance(subscribingClient, publishingClient, hiveMqttClientProperties); + + final var disconnectFuture = underTest.disconnectClientRole(ClientRole.CONSUMER); + + assertThat(disconnectFuture).succeedsWithin(Duration.ofSeconds(1L)); + + Mockito.verify(subscribingClient).disconnect(); + Mockito.verify(publishingClient, Mockito.never()).disconnect(); + } + + @Test + public void disconnectClientRoleWithPublisherRoleDisconnectsOnlyPublishingClient() { + Mockito.when(publishingClient.disconnect()).thenReturn(CompletableFuture.completedFuture(null)); + final var underTest = + DefaultGenericMqttClient.newInstance(subscribingClient, publishingClient, hiveMqttClientProperties); + + final var disconnectFuture = underTest.disconnectClientRole(ClientRole.PUBLISHER); + + assertThat(disconnectFuture).succeedsWithin(Duration.ofSeconds(1L)); + + Mockito.verify(subscribingClient, Mockito.never()).disconnect(); + Mockito.verify(publishingClient).disconnect(); + } + + @Test + public void disconnectClientRoleWithConsumerPublisherRoleDisconnectsPublishingClientAndSubscribingClient() { + Mockito.when(subscribingClient.disconnect()).thenReturn(CompletableFuture.completedFuture(null)); + Mockito.when(publishingClient.disconnect()).thenReturn(CompletableFuture.completedFuture(null)); + final var underTest = + DefaultGenericMqttClient.newInstance(subscribingClient, publishingClient, hiveMqttClientProperties); + + final var disconnectFuture = underTest.disconnectClientRole(ClientRole.CONSUMER_PUBLISHER); + + assertThat(disconnectFuture).succeedsWithin(Duration.ofSeconds(1L)); + + Mockito.verify(subscribingClient).disconnect(); + Mockito.verify(publishingClient).disconnect(); + } + @Test public void subscribeCallsSubscribeOnSubscribingClient() { final var genericMqttSubscribe = GenericMqttSubscribe.of( diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientFactoryTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientFactoryTest.java index c503cad4e1..44e4957452 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientFactoryTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientFactoryTest.java @@ -36,6 +36,7 @@ import org.eclipse.ditto.connectivity.service.config.MqttConfig; import org.eclipse.ditto.connectivity.service.messaging.internal.ssl.DittoTrustManagerFactory; import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.MqttSpecificConfig; import org.eclipse.ditto.connectivity.service.messaging.tunnel.SshTunnelState; import org.junit.Before; import org.junit.Rule; @@ -69,6 +70,7 @@ public final class HiveMqttClientFactoryTest { public final JUnitSoftAssertions softly = new JUnitSoftAssertions(); @Mock private Connection mqttConnection; + @Mock private MqttConfig mqttConfig; @Mock private ConnectivityConfig connectivityConfig; @Mock private Supplier sshTunnelStateSupplier; @Mock private ConnectionLogger connectionLogger; @@ -83,8 +85,8 @@ public void before() { Mockito.when(mqttConnection.getUsername()).thenReturn(Optional.of(USERNAME)); Mockito.when(mqttConnection.getPassword()).thenReturn(Optional.of(PASSWORD)); - final var mqttConfig = Mockito.mock(MqttConfig.class); Mockito.when(mqttConfig.getEventLoopThreads()).thenReturn(EVENT_LOOP_THREAD_NUMBER); + final var connectionConfig = Mockito.mock(ConnectionConfig.class); Mockito.when(connectionConfig.getMqttConfig()).thenReturn(mqttConfig); Mockito.when(connectivityConfig.getConnectionConfig()).thenReturn(connectionConfig); @@ -136,6 +138,7 @@ public void getMqtt3ClientWithoutLastWillWithoutSslReturnsExpected() throws NoMq final var hiveMqttClientProperties = HiveMqttClientProperties.builder() .withMqttConnection(mqttConnection) .withConnectivityConfig(connectivityConfig) + .withMqttSpecificConfig(MqttSpecificConfig.fromConnection(mqttConnection, mqttConfig)) .withSshTunnelStateSupplier(sshTunnelStateSupplier) .withConnectionLogger(connectionLogger) .withActorUuid(ACTOR_UUID) @@ -199,6 +202,7 @@ public void getMqtt3ClientWithLastWillWithoutSslReturnsExpected() throws NoMqttC final var hiveMqttClientProperties = HiveMqttClientProperties.builder() .withMqttConnection(mqttConnection) .withConnectivityConfig(connectivityConfig) + .withMqttSpecificConfig(MqttSpecificConfig.fromConnection(mqttConnection, mqttConfig)) .withSshTunnelStateSupplier(sshTunnelStateSupplier) .withConnectionLogger(connectionLogger) .withActorUuid(ACTOR_UUID) @@ -236,6 +240,7 @@ public void getMqtt3ClientWithoutLastWillWithSslReturnsExpected() throws NoMqttC final var hiveMqttClientProperties = HiveMqttClientProperties.builder() .withMqttConnection(mqttConnection) .withConnectivityConfig(connectivityConfig) + .withMqttSpecificConfig(MqttSpecificConfig.fromConnection(mqttConnection, mqttConfig)) .withSshTunnelStateSupplier(sshTunnelStateSupplier) .withConnectionLogger(connectionLogger) .withActorUuid(ACTOR_UUID) @@ -279,6 +284,7 @@ public void getMqtt5ClientWithoutLastWillWithoutSslReturnsExpected() throws NoMq final var hiveMqttClientProperties = HiveMqttClientProperties.builder() .withMqttConnection(mqttConnection) .withConnectivityConfig(connectivityConfig) + .withMqttSpecificConfig(MqttSpecificConfig.fromConnection(mqttConnection, mqttConfig)) .withSshTunnelStateSupplier(sshTunnelStateSupplier) .withConnectionLogger(connectionLogger) .withActorUuid(ACTOR_UUID) @@ -342,6 +348,7 @@ public void getMqtt5ClientWithLastWillWithoutSslReturnsExpected() throws NoMqttC final var hiveMqttClientProperties = HiveMqttClientProperties.builder() .withMqttConnection(mqttConnection) .withConnectivityConfig(connectivityConfig) + .withMqttSpecificConfig(MqttSpecificConfig.fromConnection(mqttConnection, mqttConfig)) .withSshTunnelStateSupplier(sshTunnelStateSupplier) .withConnectionLogger(connectionLogger) .withActorUuid(ACTOR_UUID) @@ -379,6 +386,7 @@ public void getMqtt5ClientWithoutLastWillWithSslReturnsExpected() throws NoMqttC final var hiveMqttClientProperties = HiveMqttClientProperties.builder() .withMqttConnection(mqttConnection) .withConnectivityConfig(connectivityConfig) + .withMqttSpecificConfig(MqttSpecificConfig.fromConnection(mqttConnection, mqttConfig)) .withSshTunnelStateSupplier(sshTunnelStateSupplier) .withConnectionLogger(connectionLogger) .withActorUuid(ACTOR_UUID) diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientPropertiesTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientPropertiesTest.java index 19c3a9b1b8..e5eed3106d 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientPropertiesTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/HiveMqttClientPropertiesTest.java @@ -45,6 +45,7 @@ public final class HiveMqttClientPropertiesTest { private static final UUID ACTOR_UUID = UUID.randomUUID(); @Mock private Connection mqttConnection; + @Mock private MqttConfig mqttConfig; @Mock private ConnectivityConfig connectivityConfig; @Mock private Supplier sshTunnelStateSupplier; @Mock private ConnectionLogger connectionLogger; @@ -96,6 +97,7 @@ public void setNullSshTunnelStateSupplierThrowsException() { .isThrownBy(() -> HiveMqttClientProperties.builder() .withMqttConnection(mqttConnection) .withConnectivityConfig(connectivityConfig) + .withMqttSpecificConfig(MqttSpecificConfig.fromConnection(mqttConnection, mqttConfig)) .withSshTunnelStateSupplier(null)) .withMessage("The sshTunnelStateSupplier must not be null!") .withNoCause(); @@ -107,6 +109,7 @@ public void setNullConnectionLoggerThrowsException() { .isThrownBy(() -> HiveMqttClientProperties.builder() .withMqttConnection(mqttConnection) .withConnectivityConfig(connectivityConfig) + .withMqttSpecificConfig(MqttSpecificConfig.fromConnection(mqttConnection, mqttConfig)) .withSshTunnelStateSupplier(sshTunnelStateSupplier) .withConnectionLogger(null)) .withMessage("The connectionLogger must not be null!") @@ -119,6 +122,7 @@ public void setNullActorUuidThrowsException() { .isThrownBy(() -> HiveMqttClientProperties.builder() .withMqttConnection(mqttConnection) .withConnectivityConfig(connectivityConfig) + .withMqttSpecificConfig(MqttSpecificConfig.fromConnection(mqttConnection, mqttConfig)) .withSshTunnelStateSupplier(sshTunnelStateSupplier) .withConnectionLogger(connectionLogger) .withActorUuid(null)) @@ -132,6 +136,7 @@ public void setNullClientConnectedListenerThrowsException() { .isThrownBy(() -> HiveMqttClientProperties.builder() .withMqttConnection(mqttConnection) .withConnectivityConfig(connectivityConfig) + .withMqttSpecificConfig(MqttSpecificConfig.fromConnection(mqttConnection, mqttConfig)) .withSshTunnelStateSupplier(sshTunnelStateSupplier) .withConnectionLogger(connectionLogger) .withActorUuid(ACTOR_UUID) @@ -146,6 +151,7 @@ public void setNullClientDisconnectedListenerThrowsException() { .isThrownBy(() -> HiveMqttClientProperties.builder() .withMqttConnection(mqttConnection) .withConnectivityConfig(connectivityConfig) + .withMqttSpecificConfig(MqttSpecificConfig.fromConnection(mqttConnection, mqttConfig)) .withSshTunnelStateSupplier(sshTunnelStateSupplier) .withConnectionLogger(connectionLogger) .withActorUuid(ACTOR_UUID) @@ -168,6 +174,7 @@ public void buildReturnsExpectedInstance() throws NoMqttConnectionException { final var underTest = HiveMqttClientProperties.builder() .withMqttConnection(mqttConnection) .withConnectivityConfig(connectivityConfig) + .withMqttSpecificConfig(MqttSpecificConfig.fromConnection(mqttConnection, mqttConfig)) .withSshTunnelStateSupplier(sshTunnelStateSupplier) .withConnectionLogger(connectionLogger) .withActorUuid(ACTOR_UUID) diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/MqttPublishingClientIdentifierFactoryTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/MqttPublishingClientIdentifierFactoryTest.java index be8faa170a..2172cc82df 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/MqttPublishingClientIdentifierFactoryTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/MqttPublishingClientIdentifierFactoryTest.java @@ -26,6 +26,7 @@ import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig; import org.eclipse.ditto.connectivity.service.config.MqttConfig; import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.MqttSpecificConfig; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -84,6 +85,7 @@ private HiveMqttClientProperties getHiveMqttClientProperties() throws NoMqttConn return HiveMqttClientProperties.builder() .withMqttConnection(connection) .withConnectivityConfig(connectivityConfig) + .withMqttSpecificConfig(MqttSpecificConfig.fromConnection(connection, mqttConfig)) .withSshTunnelStateSupplier(() -> null) .withConnectionLogger(connectionLogger) .withActorUuid(ACTOR_UUID) diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/MqttSubscribingClientIdentifierFactoryTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/MqttSubscribingClientIdentifierFactoryTest.java index 6bf2614281..a588582f61 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/MqttSubscribingClientIdentifierFactoryTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/client/MqttSubscribingClientIdentifierFactoryTest.java @@ -26,6 +26,7 @@ import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig; import org.eclipse.ditto.connectivity.service.config.MqttConfig; import org.eclipse.ditto.connectivity.service.messaging.monitoring.logs.ConnectionLogger; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.MqttSpecificConfig; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -84,6 +85,7 @@ private HiveMqttClientProperties getHiveMqttClientProperties() throws NoMqttConn return HiveMqttClientProperties.builder() .withMqttConnection(connection) .withConnectivityConfig(connectivityConfig) + .withMqttSpecificConfig(MqttSpecificConfig.fromConnection(connection, mqttConfig)) .withSshTunnelStateSupplier(() -> null) .withConnectionLogger(connectionLogger) .withActorUuid(ACTOR_UUID) diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/consuming/MqttConsumerActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/consuming/MqttConsumerActorTest.java index 44ad67e0e0..d519093fd3 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/consuming/MqttConsumerActorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/consuming/MqttConsumerActorTest.java @@ -456,7 +456,7 @@ public void reconnectConsumerClientForRedeliveryIfInboundMessageIsRejected() { ActorRef.noSender() ); - fakeMqttClientActor.expectMsg(MqttClientActorControl.RECONNECT_CONSUMER_CLIENT); + fakeMqttClientActor.expectMsgClass(ReconnectConsumerClient.class); underTest.tell(GracefulStop.INSTANCE, ActorRef.noSender()); diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/consuming/ReconnectConsumerClientTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/consuming/ReconnectConsumerClientTest.java new file mode 100644 index 0000000000..c146c314e2 --- /dev/null +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/consuming/ReconnectConsumerClientTest.java @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2022 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + * + * SPDX-License-Identifier: EPL-2.0 + */ +package org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.consuming; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mutabilitydetector.unittesting.MutabilityAssert.assertInstancesOf; +import static org.mutabilitydetector.unittesting.MutabilityMatchers.areImmutable; + +import org.assertj.core.api.Assertions; +import org.eclipse.ditto.connectivity.service.messaging.mqtt.ReconnectDelay; +import org.junit.Test; +import org.mockito.Mockito; + +import nl.jqno.equalsverifier.EqualsVerifier; + +/** + * Unit test for {@link ReconnectConsumerClient}. + */ +public final class ReconnectConsumerClientTest { + + @Test + public void assertImmutability() { + assertInstancesOf(ReconnectConsumerClient.class, areImmutable()); + } + + @Test + public void testHashCodeAndEquals() { + EqualsVerifier.forClass(ReconnectConsumerClient.class) + .usingGetClass() + .verify(); + } + + @Test + public void ofWithNullReconnectDelayThrowsException() { + Assertions.assertThatNullPointerException() + .isThrownBy(() -> ReconnectConsumerClient.of(null)) + .withMessage("The reconnectDelay must not be null!") + .withNoCause(); + } + + @Test + public void getReconnectDelayReturnsExpected() { + final var reconnectDelay = Mockito.mock(ReconnectDelay.class); + final var reconnectConsumerClient = ReconnectConsumerClient.of(reconnectDelay); + + assertThat(reconnectConsumerClient.getReconnectDelay()).isEqualTo(reconnectDelay); + } + +} \ No newline at end of file