Skip to content

Commit

Permalink
Merge branch 'master' into feature/simplify-connection-config
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed Oct 22, 2021
2 parents afe733f + 66317d2 commit f1ccd79
Show file tree
Hide file tree
Showing 16 changed files with 282 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,7 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectingS
.event(CloseConnectionAndShutdown.class, this::closeConnectionAndShutdown)
.event(SshTunnelActor.TunnelStarted.class, this::tunnelStarted)
.eventEquals(Control.CONNECT_AFTER_TUNNEL_ESTABLISHED, this::connectAfterTunnelStarted)
.eventEquals(Control.GOTO_CONNECTED_AFTER_INITIALIZATION, this::gotoConnectedAfterInitialization)
.event(SshTunnelActor.TunnelClosed.class, this::tunnelClosed)
.event(OpenConnection.class, this::openConnectionInConnectingState);
}
Expand Down Expand Up @@ -1162,20 +1163,29 @@ protected CompletionStage<InitializationResult> startPublisherAndConsumerActors(

/**
 * Reacts to the outcome of initializing consumers, publisher and subscriptions.
 * <p>
 * On success, {@code Control.GOTO_CONNECTED_AFTER_INITIALIZATION} is sent to this actor itself; on failure, the
 * failure carried by the result is sent to this actor itself. In both cases the FSM stays in its current state and
 * the follow-up message decides what happens next.
 *
 * @param initializationResult the result of the initialization.
 * @param data the current client data (not read by this method).
 * @return the unchanged current state.
 */
private State<BaseClientState, BaseClientData> handleInitializationResult(
        final InitializationResult initializationResult, final BaseClientData data) {

    if (!initializationResult.isSuccess()) {
        logger.info("Initialization of consumers, publisher and subscriptions failed: {}. Staying in CONNECTING " +
                "state to continue with connection recovery after backoff.", initializationResult.getFailure());
        // forward the failure to ourselves instead of handling it right here
        getSelf().tell(initializationResult.getFailure(), ActorRef.noSender());
        return stay();
    }

    logger.debug("Initialization of consumers, publisher and subscriptions successful, going to CONNECTED.");
    // the transition itself is performed by the handler of this control message
    getSelf().tell(Control.GOTO_CONNECTED_AFTER_INITIALIZATION, ActorRef.noSender());
    return stay();
}

private State<BaseClientState, BaseClientData> gotoConnectedAfterInitialization(final Control message,
final BaseClientData data) {
if (data.getFailureCount() == 0) {
logger.info("Initialization of consumers, publisher and subscriptions successful, going to CONNECTED.");
connectionLogger.success("Connection successful.");
data.getSessionSenders().forEach(origin -> origin.first().tell(new Status.Success(CONNECTED), getSelf()));
return goTo(CONNECTED).using(data.resetSession()
.resetFailureCount()
.setConnectionStatus(ConnectivityStatus.OPEN)
.setConnectionStatusDetails("Connected at " + Instant.now())
);
.setConnectionStatusDetails("Connected at " + Instant.now()));
} else {
logger.info("Initialization of consumers, publisher and subscriptions failed: {}. Staying in CONNECTING " +
"state to continue with connection recovery after backoff.", initializationResult.getFailure());
getSelf().tell(initializationResult.getFailure(), ActorRef.noSender());
logger.info("Initialization of consumers, publisher and subscriptions successful, but failures were " +
"received meanwhile. Staying in CONNECTING state to continue with connection recovery after backoff.");
return stay();
}
}
Expand Down Expand Up @@ -1245,7 +1255,7 @@ private State<BaseClientState, BaseClientData> connectingConnectionFailed(final
data.getSessionSenders().forEach(sender ->
sender.first().tell(getStatusToReport(event.getFailure(), sender.second()), getSelf()));

return backoffAfterFailure(event, data);
return backoffAfterFailure(event, data.increaseFailureCount());
}

private State<BaseClientState, BaseClientData> connectedConnectionFailed(final ConnectionFailure event,
Expand All @@ -1272,13 +1282,13 @@ private State<BaseClientState, BaseClientData> backoffAfterFailure(final Connect
dittoProtocolSub.removeSubscriber(getSelf());
if (ConnectivityStatus.OPEN.equals(data.getDesiredConnectionStatus())) {
if (reconnectTimeoutStrategy.canReconnect()) {
if (data.getFailureCount() > 0) {
if (data.getFailureCount() > 1) {
connectionLogger.failure(
"Reconnection attempt <{0}> failed due to: {1}. Reconnect after backoff was " +
"Received {0} subsequent failures during backoff: {1}. Reconnect after backoff was " +
"already triggered.", data.getFailureCount(), event.getFailureDescription());
logger.info("Reconnection attempt <{}> failed: {}. Reconnect was already triggered.",
logger.info("Received {} subsequent failures during backoff: {}. Reconnect was already triggered.",
data.getFailureCount(), event);
return stay().using(data.increaseFailureCount());
return stay();
} else {
final Duration nextBackoff = reconnectTimeoutStrategy.getNextBackoff();
final var errorMessage =
Expand All @@ -1288,7 +1298,6 @@ private State<BaseClientState, BaseClientData> backoffAfterFailure(final Connect
logger.info("Connection failed: {}. Reconnect after: {}. Resolved status: {}. " +
"Going to 'CONNECTING'", event, nextBackoff, resolvedStatus);
return goToConnecting(nextBackoff).using(data.resetSession()
.increaseFailureCount()
.setConnectionStatus(resolvedStatus)
.setConnectionStatusDetails(event.getFailureDescription())
);
Expand All @@ -1305,6 +1314,7 @@ private State<BaseClientState, BaseClientData> backoffAfterFailure(final Connect
// stay in INITIALIZED state until re-opened manually
return goTo(INITIALIZED)
.using(data.resetSession()
.resetFailureCount()
.setConnectionStatus(connectivityStatusResolver.resolve(event))
.setConnectionStatusDetails(event.getFailureDescription()
+ " Reached maximum retries after backing off after failure and thus will " +
Expand Down Expand Up @@ -2150,7 +2160,8 @@ public String toString() {
private enum Control {
INIT_COMPLETE,
REFRESH_CLIENT_ACTOR_REFS,
CONNECT_AFTER_TUNNEL_ESTABLISHED
CONNECT_AFTER_TUNNEL_ESTABLISHED,
GOTO_CONNECTED_AFTER_INITIALIZATION
}

private static final Object SEND_DISCONNECT_ANNOUNCEMENT = new Object();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import javax.annotation.Nullable;
Expand All @@ -43,7 +44,6 @@
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionConfigurationInvalidException;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
import org.eclipse.ditto.connectivity.model.ResourceStatus;
import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionFailedException;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.TestConnection;
import org.eclipse.ditto.connectivity.service.config.Amqp10Config;
Expand Down Expand Up @@ -396,43 +396,27 @@ protected ActorRef getPublisherActor() {
return amqpPublisherActor;
}

private CompletionStage<Object> startCommandConsumers(final List<ConsumerData> consumers, final ActorRef jmsActor) {
private CompletionStage<Done> startCommandConsumers(final List<ConsumerData> consumers, final ActorRef jmsActor) {
if (isConsuming()) {
stopCommandConsumers();
// wait a fraction of the configured timeout before asking to allow the consumer to stabilize
final CompletionStage<Object> identity =
new CompletableFuture<>().completeOnTimeout(Done.getInstance(),
initialConsumerResourceStatusAskTimeout.toMillis() / 2, TimeUnit.MILLISECONDS);
final CompletionStage<Object> completionStage = consumers.stream()
.map(consumer -> startCommandConsumer(consumer, getInboundMappingSink(), jmsActor))
.map(this::retrieveAddressStatusFromConsumerActor)
.reduce(CompletableFuture.completedStage(Done.getInstance()),
// not interested in the actual result, just if it failed or not
(stage, reply) -> stage.thenCompose(unused -> reply));
.map(ref -> identity.thenCompose(done -> Patterns.ask(ref, RetrieveAddressStatus.getInstance(),
initialConsumerResourceStatusAskTimeout)))
.reduce(identity, (done, reply) -> done.thenCombine(reply, (x, y) -> x));
connectionLogger.success("Subscriptions {0} initialized successfully.", consumers);
logger.info("Subscribed Connection <{}> to sources: {}", connectionId(), consumers);
return completionStage;
return completionStage.thenApply(object -> Done.getInstance()).exceptionally(t -> Done.getInstance());
} else {
logger.debug("Not starting consumers, no sources were configured.");
return CompletableFuture.completedStage(Done.getInstance());
}
}

/**
 * Asks the given actor for its address status and validates the reply.
 * <p>
 * A reply that is a {@code ResourceStatus} with a status other than {@code ConnectivityStatus.OPEN} fails the
 * returned stage with an {@link IllegalStateException}. Any other reply is passed through unchanged.
 *
 * @param ref the actor to ask.
 * @return a stage holding the reply, or failed if the reported status is not OPEN.
 */
private CompletionStage<Object> retrieveAddressStatusFromConsumerActor(final ActorRef ref) {
    return Patterns.ask(ref, RetrieveAddressStatus.getInstance(), initialConsumerResourceStatusAskTimeout)
            .thenApply(reply -> {
                if (!(reply instanceof ResourceStatus)) {
                    return reply;
                }
                final ResourceStatus consumerStatus = (ResourceStatus) reply;
                if (consumerStatus.getStatus() == ConnectivityStatus.OPEN) {
                    return reply;
                }
                // A consumer that is not OPEN after initialization must fail the stage with an exception,
                // otherwise the client actor will go to CONNECTED state despite the failure that occurred
                // in the consumer.
                throw new IllegalStateException(
                        String.format("Resource status of consumer is not OPEN, but %s: %s",
                                consumerStatus.getStatus(),
                                consumerStatus.getStatusDetails().orElse("(no status details provided)")));
            });
}

private ActorRef startCommandConsumer(final ConsumerData consumer, final Sink<Object, NotUsed> inboundMappingSink,
final ActorRef jmsActor) {
final String namePrefix = consumer.getActorNamePrefix();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.eclipse.ditto.connectivity.api.ExternalMessageFactory;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
import org.eclipse.ditto.connectivity.model.Enforcement;
import org.eclipse.ditto.connectivity.model.EnforcementFilterFactory;
import org.eclipse.ditto.connectivity.model.ResourceStatus;
Expand Down Expand Up @@ -117,6 +118,10 @@ private AmqpConsumerActor(final Connection connection, final ConsumerData consum
this.jmsActor = checkNotNull(jmsActor, "jmsActor");
jmsActorAskTimeout = connectionConfig.getClientActorAskTimeout();

// the amqp consumer is OPEN (ready to handle messages) after setMessageListener() was called successfully
handleAddressStatus(ConnectivityModelFactory.newSourceStatus(InstanceIdentifierSupplier.getInstance().get(),
ConnectivityStatus.UNKNOWN, sourceAddress, "Consumer is being initialized.", Instant.now()));

messageRateLimiter = initMessageRateLimiter(amqp10Config);
backOffActor = getContext().actorOf(BackOffActor.props(amqp10Config.getBackOffConfig()));

Expand Down Expand Up @@ -222,6 +227,7 @@ private void initMessageConsumer() {
if (messageConsumer != null) {
messageConsumer.setMessageListener(this);
consumerData = consumerData.withMessageConsumer(messageConsumer);
resetResourceStatus();
}
} catch (final Exception e) {
final ResourceStatus resourceStatus =
Expand Down Expand Up @@ -308,7 +314,6 @@ private void messageConsumerCreated(final CreateMessageConsumerResponse response
destroyMessageConsumer();
messageConsumer = response.messageConsumer;
initMessageConsumer();
resetResourceStatus();
} else {
// got an orphaned message consumer! this is an error.
logger.error("RESOURCE_LEAK! Got created MessageConsumer <{}> for <{}>, while I have <{}> for <{}>",
Expand All @@ -321,6 +326,12 @@ private void messageConsumerFailed(final Status.Failure failure) {
final ConnectionFailure connectionFailed = ConnectionFailure.of(getSelf(), failure.cause(),
"Failed to recreate closed message consumer");
getContext().getParent().tell(connectionFailed, getSelf());
final ResourceStatus addressStatus =
ConnectivityModelFactory.newStatusUpdate(InstanceIdentifierSupplier.getInstance().get(),
connectivityStatusResolver.resolve(failure.cause()), sourceAddress,
"Failed to recreate closed message consumer.",
Instant.now());
handleAddressStatus(addressStatus);
}

private void handleJmsMessage(final JmsMessage message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,21 @@ public void reconnectsInConnectingStateIfFailureResponseReceived() {

@Test
public void reconnectsInConnectingStateAfterBackoffWhenMultipleFailuresAreReceived() {
reconnectsAfterBackoffWhenMultipleFailuresReceived(false);
// expect reconnects after 100ms + 200ms + 400ms + 400ms = 1100ms backoff in total
final long expectedTotalBackoffMs = 1100L;
reconnectsAfterBackoffWhenMultipleFailuresReceived(false, expectedTotalBackoffMs);
}

@Test
public void reconnectsFromConnectedStateAfterBackoffWhenMultipleFailuresAreReceived() {
reconnectsAfterBackoffWhenMultipleFailuresReceived(true);
// expect reconnects after 200ms + 400ms + 400ms + 400ms = 1400ms backoff in total
// because we transition from CONNECTED -> CONNECTING which already adds 100ms backoff
final long expectedTotalBackoffMs = 1400L;
reconnectsAfterBackoffWhenMultipleFailuresReceived(true, expectedTotalBackoffMs);
}

private void reconnectsAfterBackoffWhenMultipleFailuresReceived(final boolean initialConnectionSucceeds) {
private void reconnectsAfterBackoffWhenMultipleFailuresReceived(final boolean initialConnectionSucceeds,
final long expectedTotalBackoffMs) {
new TestKit(actorSystem) {{
final ConnectionId randomConnectionId = TestConstants.createRandomConnectionId();
final Connection connection =
Expand Down Expand Up @@ -175,10 +181,8 @@ private void reconnectsAfterBackoffWhenMultipleFailuresReceived(final boolean in
// verify that doConnectClient is called after correct backoff
thenExpectConnectClientCalledAfterTimeout(i + 2, connectivityConfig.getClientConfig().getMaxBackoff());
}
// expecting 4 invocations of doConnectClient within 100ms + 200ms + 400ms + 400ms = 1100ms backoff in total
final long totalBackoffDurationMs = System.currentTimeMillis() - start;
final long expectedTotalBackoffMs = 1100L;
final long tolerancePerBackoffMs = 50L; // allow 50ms tolerance per backoff until connectClient is called
final long tolerancePerBackoffMs = 100L; // allow 100ms tolerance per backoff until connectClient is called
assertThat(totalBackoffDurationMs).isGreaterThan(expectedTotalBackoffMs);
assertThat(totalBackoffDurationMs).isLessThan(
expectedTotalBackoffMs + (nrOfBackoffs * tolerancePerBackoffMs));
Expand Down
Loading

0 comments on commit f1ccd79

Please sign in to comment.