From d727672d2d867eac02e53f935ef1c6135fdf2d78 Mon Sep 17 00:00:00 2001 From: Stefan Maute Date: Fri, 22 Oct 2021 11:39:37 +0200 Subject: [PATCH] fixed a bug where an additional source status is shown when a mqtt source has two or more addresses; add method to determine the number of consumers in BaseClientActor and RetrieveConnectionStatusAggregatorActor because number of consumers is calculated differently for mqtt sources; Signed-off-by: Stefan Maute --- .../service/messaging/BaseClientActor.java | 18 +++++++++--- .../messaging/kafka/KafkaConsumerActor.java | 3 -- ...trieveConnectionStatusAggregatorActor.java | 29 +++++++++++++++---- .../mqtt/hivemq/AbstractMqttClientActor.java | 7 +++++ .../mqtt/hivemq/HiveMqtt3ClientActor.java | 1 + .../ConnectionPersistenceActor.java | 5 ++-- .../messaging/BaseClientActorTest.java | 2 +- 7 files changed, 48 insertions(+), 17 deletions(-) diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java index 78bd817fda..2f33da5abd 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java @@ -1407,11 +1407,9 @@ private FSM.State retrieveConnectionStatus(fina " Forwarding to consumers and publishers.", command.getEntityId(), sender); + // only one PublisherActor is started for all targets (if targets are present) final int numberOfProducers = connection.getTargets().isEmpty() ? 0 : 1; - final int numberOfConsumers = connection.getSources() - .stream() - .mapToInt(source -> source.getConsumerCount() * source.getAddresses().size()) - .sum(); + final int numberOfConsumers = determineNumberOfConsumers(); int expectedNumberOfChildren = numberOfProducers + numberOfConsumers; if (getSshTunnelState().isEnabled()) { expectedNumberOfChildren++; @@ -1470,6 +1468,18 @@ private FSM.State retrieveConnectionStatus(fina return stay(); } + /** + * Determines the number of consumers. + * + * @return the number of consumers. + */ + protected int determineNumberOfConsumers() { + return connection.getSources() + .stream() + .mapToInt(source -> source.getConsumerCount() * source.getAddresses().size()) + .sum(); + } + private void retrieveAddressStatusFromChildren(final RetrieveConnectionStatus command, final ActorRef sender, final List childrenToAsk) { childrenToAsk.forEach(child -> { diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerActor.java index 6fd8cd2af3..e8dc9b986b 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/kafka/KafkaConsumerActor.java @@ -12,9 +12,6 @@ */ package org.eclipse.ditto.connectivity.service.messaging.kafka; -import static org.eclipse.ditto.connectivity.api.EnforcementFactoryFactory.newEnforcementFilterFactory; -import static org.eclipse.ditto.placeholders.PlaceholderFactory.newHeadersPlaceholder; - import java.time.Duration; import java.time.Instant; import java.util.concurrent.CompletionException; diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/RetrieveConnectionStatusAggregatorActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/RetrieveConnectionStatusAggregatorActor.java index 32c15e84fb..403cb97b72 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/RetrieveConnectionStatusAggregatorActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/monitoring/metrics/RetrieveConnectionStatusAggregatorActor.java @@ -24,6 +24,7 @@ import org.eclipse.ditto.base.model.headers.DittoHeaders; import org.eclipse.ditto.connectivity.model.Connection; +import org.eclipse.ditto.connectivity.model.ConnectionType; import org.eclipse.ditto.connectivity.model.ConnectivityStatus; import org.eclipse.ditto.connectivity.model.ResourceStatus; import org.eclipse.ditto.connectivity.model.SshTunnel; @@ -75,6 +76,7 @@ private RetrieveConnectionStatusAggregatorActor(final Connection connection, configuredClientCount = connection.getClientCount(); // one response per client actor expectedResponses.put(ResourceStatus.ResourceType.CLIENT, configuredClientCount); + if (ConnectivityStatus.OPEN.equals(connection.getConnectionStatus())) { // one response per source/target expectedResponses.put(ResourceStatus.ResourceType.TARGET, @@ -82,12 +84,8 @@ private RetrieveConnectionStatusAggregatorActor(final Connection connection, .stream() .mapToInt(target -> configuredClientCount) .sum()); - expectedResponses.put(ResourceStatus.ResourceType.SOURCE, - connection.getSources() - .stream() - .mapToInt(source -> configuredClientCount * source.getConsumerCount() * - source.getAddresses().size()) - .sum()); + expectedResponses.put(ResourceStatus.ResourceType.SOURCE, determineSourceCount(connection)); + if (connection.getSshTunnel().map(SshTunnel::isEnabled).orElse(false)) { expectedResponses.put(ResourceStatus.ResourceType.SSH_TUNNEL, configuredClientCount); } @@ -240,4 +238,23 @@ private static ConnectivityStatus calculateOverallLiveStatus(final ConnectivityS private void stopSelf() { getContext().stop(getSelf()); } + + private int determineSourceCount(final Connection connection) { + final int sourceCount; + if(connection.getConnectionType().equals(ConnectionType.MQTT)) { + // for mqtt only one consumer actor for all addresses of a source is started. + sourceCount = connection.getSources() + .stream() + .mapToInt(source -> configuredClientCount * source.getConsumerCount()) + .sum(); + } else { + sourceCount = connection.getSources() + .stream() + .mapToInt(source -> configuredClientCount * source.getConsumerCount() * source.getAddresses().size()) + .sum(); + } + + return sourceCount; + } + } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/AbstractMqttClientActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/AbstractMqttClientActor.java index ff9efe0a42..c6ad4347ab 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/AbstractMqttClientActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/AbstractMqttClientActor.java @@ -591,6 +591,13 @@ private String distinguishClientIdIfNecessary(final String configuredClientId) { } } + @Override + protected int determineNumberOfConsumers() { + return connection.getSources() + .stream() + .mapToInt(Source::getConsumerCount) + .sum(); + } static class MqttClientConnected extends AbstractWithOrigin implements ClientConnected { diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/HiveMqtt3ClientActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/HiveMqtt3ClientActor.java index 104e5c5c04..10411e877e 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/HiveMqtt3ClientActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/mqtt/hivemq/HiveMqtt3ClientActor.java @@ -124,6 +124,7 @@ ActorRef startPublisherActor(final Connection connection, final Mqtt3AsyncClient final Props publisherActorProps = HiveMqtt3PublisherActor.props(connection, client, isDryRun(), getDefaultClientId(), connectivityStatusResolver); + return startChildActorConflictFree(HiveMqtt3PublisherActor.NAME, publisherActorProps); } diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java index a904e63a98..cf987bb25d 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/persistence/ConnectionPersistenceActor.java @@ -470,9 +470,8 @@ protected void processPingCommand(final PingCommand ping) { } private void askSelfForRetrieveConnectionStatus(@Nullable final CharSequence correlationId) { - final var retrieveConnectionStatus = RetrieveConnectionStatus.of(entityId, DittoHeaders.newBuilder() - .correlationId(correlationId) - .build()); + final var retrieveConnectionStatus = RetrieveConnectionStatus.of(entityId, + DittoHeaders.newBuilder().correlationId(correlationId).build()); Patterns.ask(getSelf(), retrieveConnectionStatus, SELF_RETRIEVE_CONNECTION_STATUS_TIMEOUT) .whenComplete((response, throwable) -> { if (response instanceof RetrieveConnectionStatusResponse) { diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActorTest.java index f7c449c592..d15eb291fb 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActorTest.java @@ -499,7 +499,7 @@ public void sendsConnectionOpenedAnnouncementAfterReconnect() { andConnectionFails(dummyClientActor, getRef()); // not expecting a closed announcement after connection failure, since it's not possible to send a message - // if a connecting is failed and thus not connected + // if connecting is failed and thus not connected andConnectionSuccessful(dummyClientActor, getRef());