Skip to content

Commit

Permalink
fixed a bug where an additional source status is shown when a mqtt so…
Browse files Browse the repository at this point in the history
…urce has two or more addresses;

add method to determine the number of consumers in BaseClientActor and RetrieveConnectionStatusAggregatorActor because number of consumers is calculated differently for mqtt sources;

Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Oct 22, 2021
1 parent 66317d2 commit d727672
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1407,11 +1407,9 @@ private FSM.State<BaseClientState, BaseClientData> retrieveConnectionStatus(fina
" Forwarding to consumers and publishers.", command.getEntityId(),
sender);

// only one PublisherActor is started for all targets (if targets are present)
final int numberOfProducers = connection.getTargets().isEmpty() ? 0 : 1;
final int numberOfConsumers = connection.getSources()
.stream()
.mapToInt(source -> source.getConsumerCount() * source.getAddresses().size())
.sum();
final int numberOfConsumers = determineNumberOfConsumers();
int expectedNumberOfChildren = numberOfProducers + numberOfConsumers;
if (getSshTunnelState().isEnabled()) {
expectedNumberOfChildren++;
Expand Down Expand Up @@ -1470,6 +1468,18 @@ private FSM.State<BaseClientState, BaseClientData> retrieveConnectionStatus(fina
return stay();
}

/**
* Determines the number of consumers.
*
* @return the number of consumers.
*/
protected int determineNumberOfConsumers() {
return connection.getSources()
.stream()
.mapToInt(source -> source.getConsumerCount() * source.getAddresses().size())
.sum();
}

private void retrieveAddressStatusFromChildren(final RetrieveConnectionStatus command, final ActorRef sender,
final List<ActorRef> childrenToAsk) {
childrenToAsk.forEach(child -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@
*/
package org.eclipse.ditto.connectivity.service.messaging.kafka;

import static org.eclipse.ditto.connectivity.api.EnforcementFactoryFactory.newEnforcementFilterFactory;
import static org.eclipse.ditto.placeholders.PlaceholderFactory.newHeadersPlaceholder;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.CompletionException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionType;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
import org.eclipse.ditto.connectivity.model.ResourceStatus;
import org.eclipse.ditto.connectivity.model.SshTunnel;
Expand Down Expand Up @@ -75,19 +76,16 @@ private RetrieveConnectionStatusAggregatorActor(final Connection connection,
configuredClientCount = connection.getClientCount();
// one response per client actor
expectedResponses.put(ResourceStatus.ResourceType.CLIENT, configuredClientCount);

if (ConnectivityStatus.OPEN.equals(connection.getConnectionStatus())) {
// one response per source/target
expectedResponses.put(ResourceStatus.ResourceType.TARGET,
connection.getTargets()
.stream()
.mapToInt(target -> configuredClientCount)
.sum());
expectedResponses.put(ResourceStatus.ResourceType.SOURCE,
connection.getSources()
.stream()
.mapToInt(source -> configuredClientCount * source.getConsumerCount() *
source.getAddresses().size())
.sum());
expectedResponses.put(ResourceStatus.ResourceType.SOURCE, determineSourceCount(connection));

if (connection.getSshTunnel().map(SshTunnel::isEnabled).orElse(false)) {
expectedResponses.put(ResourceStatus.ResourceType.SSH_TUNNEL, configuredClientCount);
}
Expand Down Expand Up @@ -240,4 +238,23 @@ private static ConnectivityStatus calculateOverallLiveStatus(final ConnectivityS
private void stopSelf() {
getContext().stop(getSelf());
}

private int determineSourceCount(final Connection connection) {
final int sourceCount;
if(connection.getConnectionType().equals(ConnectionType.MQTT)) {
// for mqtt only one consumer actor for all addresses of a source is started.
sourceCount = connection.getSources()
.stream()
.mapToInt(source -> configuredClientCount * source.getConsumerCount())
.sum();
} else {
sourceCount = connection.getSources()
.stream()
.mapToInt(source -> configuredClientCount * source.getConsumerCount() * source.getAddresses().size())
.sum();
}

return sourceCount;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,13 @@ private String distinguishClientIdIfNecessary(final String configuredClientId) {
}
}

@Override
protected int determineNumberOfConsumers() {
return connection.getSources()
.stream()
.mapToInt(Source::getConsumerCount)
.sum();
}

static class MqttClientConnected extends AbstractWithOrigin implements ClientConnected {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ ActorRef startPublisherActor(final Connection connection, final Mqtt3AsyncClient
final Props publisherActorProps =
HiveMqtt3PublisherActor.props(connection, client, isDryRun(), getDefaultClientId(),
connectivityStatusResolver);

return startChildActorConflictFree(HiveMqtt3PublisherActor.NAME, publisherActorProps);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,9 +470,8 @@ protected void processPingCommand(final PingCommand ping) {
}

private void askSelfForRetrieveConnectionStatus(@Nullable final CharSequence correlationId) {
final var retrieveConnectionStatus = RetrieveConnectionStatus.of(entityId, DittoHeaders.newBuilder()
.correlationId(correlationId)
.build());
final var retrieveConnectionStatus = RetrieveConnectionStatus.of(entityId,
DittoHeaders.newBuilder().correlationId(correlationId).build());
Patterns.ask(getSelf(), retrieveConnectionStatus, SELF_RETRIEVE_CONNECTION_STATUS_TIMEOUT)
.whenComplete((response, throwable) -> {
if (response instanceof RetrieveConnectionStatusResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,7 @@ public void sendsConnectionOpenedAnnouncementAfterReconnect() {

andConnectionFails(dummyClientActor, getRef());
// not expecting a closed announcement after connection failure, since it's not possible to send a message
// if a connecting is failed and thus not connected
// if connecting is failed and thus not connected

andConnectionSuccessful(dummyClientActor, getRef());

Expand Down

0 comments on commit d727672

Please sign in to comment.