Skip to content

Commit

Permalink
Review: Delay AMQP consumer status retrieval.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Oct 18, 2021
1 parent b98226d commit 4ad1ec0
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,10 @@ public void postStop() {

@Override
protected Set<Pattern> getExcludedAddressReportingChildNamePatterns() {
final Set<Pattern> excludedChildNamePatterns = new HashSet<>(super.getExcludedAddressReportingChildNamePatterns());
excludedChildNamePatterns.add(Pattern.compile(Pattern.quote(JMSConnectionHandlingActor.ACTOR_NAME_PREFIX) + ".*"));
final Set<Pattern> excludedChildNamePatterns =
new HashSet<>(super.getExcludedAddressReportingChildNamePatterns());
excludedChildNamePatterns.add(
Pattern.compile(Pattern.quote(JMSConnectionHandlingActor.ACTOR_NAME_PREFIX) + ".*"));
return excludedChildNamePatterns;
}

Expand Down Expand Up @@ -427,18 +429,17 @@ private CompletionStage<Done> startCommandConsumers(final List<ConsumerData> con
if (isConsuming()) {
stopCommandConsumers();
// wait a fraction of the configured timeout before asking to allow the consumer to stabilize
final CompletionStage<Done> identity =
new CompletableFuture<Done>().completeOnTimeout(Done.getInstance(),
final CompletionStage<Object> identity =
new CompletableFuture<>().completeOnTimeout(Done.getInstance(),
initialConsumerResourceStatusAskTimeout.toMillis() / 2, TimeUnit.MILLISECONDS);
final CompletionStage<Done> completionStage = consumers.stream()
final CompletionStage<Object> completionStage = consumers.stream()
.map(consumer -> startCommandConsumer(consumer, getInboundMappingSink(), jmsActor))
.map(ref -> Patterns.ask(ref, RetrieveAddressStatus.getInstance(),
initialConsumerResourceStatusAskTimeout).thenApply(result -> Done.getInstance()))
.reduce(identity, (done, reply) -> done.thenCompose(result -> done))
.exceptionally(t -> Done.getInstance());
.map(ref -> identity.thenCompose(done -> Patterns.ask(ref, RetrieveAddressStatus.getInstance(),
initialConsumerResourceStatusAskTimeout)))
.reduce(identity, (done, reply) -> done.thenCombine(reply, (x, y) -> x));
connectionLogger.success("Subscriptions {0} initialized successfully.", consumers);
logger.info("Subscribed Connection <{}> to sources: {}", connectionId(), consumers);
return completionStage;
return completionStage.thenApply(object -> Done.getInstance()).exceptionally(t -> Done.getInstance());
} else {
logger.debug("Not starting consumers, no sources were configured.");
return CompletableFuture.completedStage(Done.getInstance());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,6 @@ public final class AmqpClientActorTest extends AbstractBaseClientActorTest {
private static final JMSException JMS_EXCEPTION = new JMSException("FAIL");
private static final URI DUMMY = URI.create("amqp://test:1234");
private static final ConnectionId CONNECTION_ID = TestConstants.createRandomConnectionId();
private static final ConnectionFailedException SESSION_EXCEPTION =
ConnectionFailedException.newBuilder(CONNECTION_ID).build();

private ActorSystem actorSystem;
private Connection connection;
Expand Down

0 comments on commit 4ad1ec0

Please sign in to comment.