From 0f581ffa2753979e1c5cea3467b1349fbeecdf65 Mon Sep 17 00:00:00 2001 From: Stefan Maute Date: Mon, 25 Oct 2021 08:06:33 +0200 Subject: [PATCH] align sourceStatus presentation for status "unknown/failure/misconfiguration" with status open for mqtt connection; don't split up source addresses for mqtt sources; Signed-off-by: Stefan Maute --- .../service/messaging/BaseClientActor.java | 18 ++++++++++++++---- .../mqtt/hivemq/AbstractMqttClientActor.java | 14 ++++++++++++++ 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java index 2f33da5abd..74a4672128 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java @@ -42,6 +42,7 @@ import java.util.function.Predicate; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.Stream; import java.util.stream.StreamSupport; import javax.annotation.Nullable; @@ -1426,9 +1427,7 @@ private FSM.State retrieveConnectionStatus(fina .info("Responding early with static 'CLOSED' ResourceStatus for all sub-sources and " + "-targets and SSH tunnel, because some children could not be started, due to a " + "live status <{}> in the client actor.", clientConnectionStatus); - connection.getSources().stream() - .map(Source::getAddresses) - .flatMap(Collection::stream) + getSourceAddresses() .map(sourceAddress -> ConnectivityModelFactory.newSourceStatus(getInstanceIdentifier(), ConnectivityStatus.CLOSED, sourceAddress, @@ -1469,7 +1468,7 @@ private FSM.State retrieveConnectionStatus(fina } /** - * Determines the number of consumers. + * Determine the number of consumers. * * @return the number of consumers. */ @@ -1480,6 +1479,17 @@ protected int determineNumberOfConsumers() { .sum(); } + /** + * Get the source addresses as stream of strings. + * + * @return the stream of source addresses. + */ + protected Stream getSourceAddresses() { + return connection.getSources().stream() + .map(Source::getAddresses) + .flatMap(Collection::stream); + } + private void retrieveAddressStatusFromChildren(final RetrieveConnectionStatus command, final ActorRef sender, final List childrenToAsk) { childrenToAsk.forEach(child -> { diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/AbstractMqttClientActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/AbstractMqttClientActor.java index c6ad4347ab..dc598df0e3 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/AbstractMqttClientActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/AbstractMqttClientActor.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import java.util.stream.Stream; import javax.annotation.Nullable; @@ -591,6 +592,9 @@ private String distinguishClientIdIfNecessary(final String configuredClientId) { } } + /* + * For MQTT connections only one Consumer Actor for all addresses is started. + */ @Override protected int determineNumberOfConsumers() { return connection.getSources() @@ -599,6 +603,16 @@ protected int determineNumberOfConsumers() { .sum(); } + /* + * For MQTT connections only one Consumer Actor for all addresses is started. + */ + @Override + protected Stream getSourceAddresses() { + return connection.getSources().stream() + .map(Source::getAddresses) + .map(sourceAddresses -> String.join(";", sourceAddresses)); + } + static class MqttClientConnected extends AbstractWithOrigin implements ClientConnected { MqttClientConnected(@Nullable final ActorRef origin) {