Skip to content

Commit

Permalink
fix retrieving connection metrics, improve exception handling
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch-si.com>
  • Loading branch information
dguggemos committed Jul 12, 2018
1 parent 0ca4c32 commit 6160ff6
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 19 deletions.
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -586,7 +587,11 @@ protected final CompletableFuture<Pair<String, AddressMetric>> retrieveAddressMe
}
}).toCompletableFuture();
} else {
log.warning("Consumer actor child <{}> was not found", childActorName);
log.warning("Consumer actor child <{}> was not found in child actors <{}>", childActorName,
StreamSupport
.stream(getContext().getChildren().spliterator(), false)
.map(ref -> ref.path().name())
.collect(Collectors.toList()));
return CompletableFuture.completedFuture(Pair.create(addressIdentifier,
ConnectivityModelFactory.newAddressMetric(
ConnectionStatus.FAILED,
Expand Down
Expand Up @@ -27,7 +27,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import javax.annotation.Nullable;
import javax.jms.ExceptionListener;
Expand Down Expand Up @@ -89,8 +88,7 @@ public final class AmqpClientActor extends BaseClientActor implements ExceptionL
* This constructor is called via reflection by the static method propsForTest.
*/
private AmqpClientActor(final Connection connection, final ConnectionStatus connectionStatus,
final JmsConnectionFactory jmsConnectionFactory,
final ActorRef conciergeForwarder) {
final JmsConnectionFactory jmsConnectionFactory, final ActorRef conciergeForwarder) {
super(connection, connectionStatus, conciergeForwarder);
this.jmsConnectionFactory = jmsConnectionFactory;
connectionListener = new ConnectionListener();
Expand All @@ -100,6 +98,7 @@ private AmqpClientActor(final Connection connection, final ConnectionStatus conn
/*
* This constructor is called via reflection by the static method props(Connection, ActorRef).
*/
@SuppressWarnings("unused") // called via reflection
private AmqpClientActor(final Connection connection, final ConnectionStatus connectionStatus,
final ActorRef conciergeForwarder) {
this(connection, connectionStatus, ConnectionBasedJmsConnectionFactory.getInstance(), conciergeForwarder);
Expand All @@ -126,7 +125,7 @@ public static Props props(final Connection connection, final ActorRef conciergeF
* @param jmsConnectionFactory the JMS connection factory.
* @return the Akka configuration Props object.
*/
public static Props propsForTests(final Connection connection, final ConnectionStatus connectionStatus,
static Props propsForTests(final Connection connection, final ConnectionStatus connectionStatus,
final ActorRef conciergeForwarder, final JmsConnectionFactory jmsConnectionFactory) {
return Props.create(AmqpClientActor.class, validateConnection(connection), connectionStatus,
jmsConnectionFactory, conciergeForwarder);
Expand Down Expand Up @@ -196,23 +195,23 @@ protected void doDisconnectClient(final Connection connection, @Nullable final A

@Override
protected Map<String, AddressMetric> getSourceConnectionStatus(final Source source) {

try {
return collectAsList(source.getAddresses().stream()
.flatMap(sourceAddress -> IntStream.range(0, source.getConsumerCount())
.mapToObj(idx -> {
final String addressWithIndex = sourceAddress + "-" + idx;
final String actorName =
escapeActorName(AmqpConsumerActor.ACTOR_NAME_PREFIX + addressWithIndex);
return retrieveAddressMetric(addressWithIndex, actorName);
})
).collect(Collectors.toList()))
.thenApply((entries) ->
entries.stream().collect(Collectors.toMap(Pair::first, Pair::second)))
return collectAsList(consumers.stream()
.map(consumerData -> retrieveAddressMetric(consumerData.getAddressWithIndex(),
consumerData.getActorName()))
.collect(Collectors.toList())).thenApply(
entries -> entries.stream().collect(Collectors.toMap(Pair::first, Pair::second)))
.get(RETRIEVE_METRICS_TIMEOUT, TimeUnit.SECONDS);
} catch (final InterruptedException | ExecutionException | TimeoutException e) {
} catch (final InterruptedException e) {
log.error(e, "Aggregating ConnectionStatus for sources was interrupted: {}", e.getMessage());
Thread.currentThread().interrupt();
return Collections.emptyMap();
} catch (final ExecutionException e) {
log.error(e, "Error while aggregating sources ConnectionStatus: {}", e.getMessage());
return Collections.emptyMap();
} catch (final TimeoutException e) {
log.error(e, "Aggregating ConnectionStatus for sources timed out: {}", e.getMessage());
return Collections.emptyMap();
}
}

Expand All @@ -227,9 +226,16 @@ protected Map<String, AddressMetric> getTargetConnectionStatus(final Target targ
TimeUnit.SECONDS);
targetStatus.put(targetEntry.first(), targetEntry.second());
return targetStatus;
} catch (final InterruptedException | ExecutionException | TimeoutException e) {
} catch (final InterruptedException e) {
log.error(e, "Aggregating ConnectionStatus for targets was interrupted: {}", e.getMessage());
Thread.currentThread().interrupt();
return Collections.emptyMap();
} catch (final ExecutionException e) {
log.error(e, "Error while aggregating target ConnectionStatus: {}", e.getMessage());
return Collections.emptyMap();
} catch (final TimeoutException e) {
log.error(e, "Aggregating ConnectionStatus for targets timed out: {}", e.getMessage());
return Collections.emptyMap();
}
}

Expand Down
Expand Up @@ -43,6 +43,10 @@ String getAddress() {
return address;
}

String getAddressWithIndex() {
return addressWithIndex;
}

MessageConsumer getMessageConsumer() {
return messageConsumer;
}
Expand Down

0 comments on commit 6160ff6

Please sign in to comment.