Skip to content

Commit

Permalink
stabilize connection live status for amqp 1.0 connections
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed Oct 15, 2021
1 parent e67d7d0 commit b98226d
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -174,7 +173,7 @@ public abstract class BaseClientActor extends AbstractFSMWithStash<BaseClientSta

private static final String DITTO_STATE_TIMEOUT_TIMER = "dittoStateTimeout";
private static final int SOCKET_CHECK_TIMEOUT_MS = 2000;
private static final String CLOSED_BECAUSE_OF_UNKNOWN_FAILURE_MISCONFIGURATION_STATUS_IN_CLIENT =
private static final String CLOSED_BECAUSE_OF_UNKNOWN_FAILURE_MISCONFIGURATION_STATUS_IN_CLIENT =
"Closed because of unknown/failure/misconfiguration status in client.";
/**
* Common logger for all sub-classes of BaseClientActor as its MDC already contains the connection ID.
Expand Down Expand Up @@ -297,7 +296,7 @@ public void preStart() throws Exception {
// start with UNKNOWN state but send self OpenConnection because client actors are never created closed
final BaseClientData startingData =
BaseClientData.BaseClientDataBuilder.from(connection.getId(), connection, ConnectivityStatus.UNKNOWN,
ConnectivityStatus.OPEN, "initialized", Instant.now())
ConnectivityStatus.OPEN, "initialized", Instant.now())
.build();
startWith(UNKNOWN, startingData);

Expand Down Expand Up @@ -759,6 +758,7 @@ protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectingS
.event(CloseConnectionAndShutdown.class, this::closeConnectionAndShutdown)
.event(SshTunnelActor.TunnelStarted.class, this::tunnelStarted)
.eventEquals(Control.CONNECT_AFTER_TUNNEL_ESTABLISHED, this::connectAfterTunnelStarted)
.eventEquals(Control.GOTO_CONNECTED_AFTER_INITIALIZATION, this::gotoConnectedAfterInitialization)
.event(SshTunnelActor.TunnelClosed.class, this::tunnelClosed)
.event(OpenConnection.class, this::openConnectionInConnectingState);
}
Expand Down Expand Up @@ -1230,20 +1230,29 @@ protected CompletionStage<InitializationResult> startPublisherAndConsumerActors(

private State<BaseClientState, BaseClientData> handleInitializationResult(
final InitializationResult initializationResult, final BaseClientData data) {

if (initializationResult.isSuccess()) {
logger.debug("Initialization of consumers, publisher and subscriptions successful, going to CONNECTED.");
connectionLogger.success("Connection successful.");
data.getSessionSenders().forEach(origin -> origin.first().tell(new Status.Success(CONNECTED), getSelf()));
return goTo(CONNECTED).using(data.resetSession()
.resetFailureCount()
.setConnectionStatus(ConnectivityStatus.OPEN)
.setConnectionStatusDetails("Connected at " + Instant.now())
);
getSelf().tell(Control.GOTO_CONNECTED_AFTER_INITIALIZATION, ActorRef.noSender());
} else {
logger.info("Initialization of consumers, publisher and subscriptions failed: {}. Staying in CONNECTING " +
"state to continue with connection recovery after backoff.", initializationResult.getFailure());
getSelf().tell(initializationResult.getFailure(), ActorRef.noSender());
}
return stay();
}

private State<BaseClientState, BaseClientData> gotoConnectedAfterInitialization(final Control message,
final BaseClientData data) {
if (data.getFailureCount() == 0) {
logger.info("Initialization of consumers, publisher and subscriptions successful, going to CONNECTED.");
connectionLogger.success("Connection successful.");
data.getSessionSenders().forEach(origin -> origin.first().tell(new Status.Success(CONNECTED), getSelf()));
return goTo(CONNECTED).using(data.resetSession()
.resetFailureCount()
.setConnectionStatus(ConnectivityStatus.OPEN)
.setConnectionStatusDetails("Connected at " + Instant.now()));
} else {
logger.info("Initialization of consumers, publisher and subscriptions successful, but failures were " +
"received meanwhile. Staying in CONNECTING state to continue with connection recovery after backoff.");
return stay();
}
}
Expand Down Expand Up @@ -1278,22 +1287,22 @@ protected CompletionStage<Status.Status> startConsumerActors(@Nullable final Cli
private State<BaseClientState, BaseClientData> clientDisconnected(final ClientDisconnected event,
final BaseClientData data) {

connectionLogger.success("Disconnected successfully.");
connectionLogger.success("Disconnected successfully.");

cleanupResourcesForConnection();
tellTunnelActor(SshTunnelActor.TunnelControl.STOP_TUNNEL);
data.getSessionSenders()
.forEach(sender -> sender.first().tell(new Status.Success(DISCONNECTED), getSelf()));
cleanupResourcesForConnection();
tellTunnelActor(SshTunnelActor.TunnelControl.STOP_TUNNEL);
data.getSessionSenders()
.forEach(sender -> sender.first().tell(new Status.Success(DISCONNECTED), getSelf()));

final BaseClientData nextStateData = data.resetSession()
.setConnectionStatus(ConnectivityStatus.CLOSED)
.setConnectionStatusDetails("Disconnected at " + Instant.now());
final BaseClientData nextStateData = data.resetSession()
.setConnectionStatus(ConnectivityStatus.CLOSED)
.setConnectionStatusDetails("Disconnected at " + Instant.now());

if (event.shutdownAfterDisconnected()) {
return stop(Normal(), nextStateData);
} else {
return goTo(DISCONNECTED).using(nextStateData);
}
if (event.shutdownAfterDisconnected()) {
return stop(Normal(), nextStateData);
} else {
return goTo(DISCONNECTED).using(nextStateData);
}
}

private void tellTunnelActor(final SshTunnelActor.TunnelControl control) {
Expand All @@ -1313,7 +1322,7 @@ private State<BaseClientState, BaseClientData> connectingConnectionFailed(final
data.getSessionSenders().forEach(sender ->
sender.first().tell(getStatusToReport(event.getFailure(), sender.second()), getSelf()));

return backoffAfterFailure(event, data);
return backoffAfterFailure(event, data.increaseFailureCount());
}

private State<BaseClientState, BaseClientData> connectedConnectionFailed(final ConnectionFailure event,
Expand All @@ -1340,13 +1349,13 @@ private State<BaseClientState, BaseClientData> backoffAfterFailure(final Connect
dittoProtocolSub.removeSubscriber(getSelf());
if (ConnectivityStatus.OPEN.equals(data.getDesiredConnectionStatus())) {
if (reconnectTimeoutStrategy.canReconnect()) {
if (data.getFailureCount() > 0) {
if (data.getFailureCount() > 1) {
connectionLogger.failure(
"Reconnection attempt <{0}> failed due to: {1}. Reconnect after backoff was " +
"Received {0} subsequent failures during backoff: {1}. Reconnect after backoff was " +
"already triggered.", data.getFailureCount(), event.getFailureDescription());
logger.info("Reconnection attempt <{}> failed: {}. Reconnect was already triggered.",
logger.info("Received {} subsequent failures during backoff: {}. Reconnect was already triggered.",
data.getFailureCount(), event);
return stay().using(data.increaseFailureCount());
return stay();
} else {
final Duration nextBackoff = reconnectTimeoutStrategy.getNextBackoff();
final var errorMessage =
Expand All @@ -1356,7 +1365,6 @@ private State<BaseClientState, BaseClientData> backoffAfterFailure(final Connect
logger.info("Connection failed: {}. Reconnect after: {}. Resolved status: {}. " +
"Going to 'CONNECTING'", event, nextBackoff, resolvedStatus);
return goToConnecting(nextBackoff).using(data.resetSession()
.increaseFailureCount()
.setConnectionStatus(resolvedStatus)
.setConnectionStatusDetails(event.getFailureDescription())
);
Expand All @@ -1373,6 +1381,7 @@ private State<BaseClientState, BaseClientData> backoffAfterFailure(final Connect
// stay in INITIALIZED state until re-opened manually
return goTo(INITIALIZED)
.using(data.resetSession()
.resetFailureCount()
.setConnectionStatus(connectivityStatusResolver.resolve(event))
.setConnectionStatusDetails(event.getFailureDescription()
+ " Reached maximum retries after backing off after failure and thus will " +
Expand Down Expand Up @@ -2250,7 +2259,8 @@ public String toString() {
private enum Control {
INIT_COMPLETE,
REFRESH_CLIENT_ACTOR_REFS,
CONNECT_AFTER_TUNNEL_ESTABLISHED
CONNECT_AFTER_TUNNEL_ESTABLISHED,
GOTO_CONNECTED_AFTER_INITIALIZATION
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;

import javax.annotation.Nullable;
Expand All @@ -43,7 +44,6 @@
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionConfigurationInvalidException;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
import org.eclipse.ditto.connectivity.model.ResourceStatus;
import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionFailedException;
import org.eclipse.ditto.connectivity.model.signals.commands.modify.TestConnection;
import org.eclipse.ditto.connectivity.service.config.Amqp10Config;
Expand Down Expand Up @@ -423,15 +423,19 @@ protected ActorRef getPublisherActor() {
return amqpPublisherActor;
}

private CompletionStage<Object> startCommandConsumers(final List<ConsumerData> consumers, final ActorRef jmsActor) {
private CompletionStage<Done> startCommandConsumers(final List<ConsumerData> consumers, final ActorRef jmsActor) {
if (isConsuming()) {
stopCommandConsumers();
final CompletionStage<Object> completionStage = consumers.stream()
// wait a fraction of the configured timeout before asking to allow the consumer to stabilize
final CompletionStage<Done> identity =
new CompletableFuture<Done>().completeOnTimeout(Done.getInstance(),
initialConsumerResourceStatusAskTimeout.toMillis() / 2, TimeUnit.MILLISECONDS);
final CompletionStage<Done> completionStage = consumers.stream()
.map(consumer -> startCommandConsumer(consumer, getInboundMappingSink(), jmsActor))
.map(this::retrieveAddressStatusFromConsumerActor)
.reduce(CompletableFuture.completedStage(Done.getInstance()),
// not interested in the actual result, just if it failed or not
(stage, reply) -> stage.thenCompose(unused -> reply));
.map(ref -> Patterns.ask(ref, RetrieveAddressStatus.getInstance(),
initialConsumerResourceStatusAskTimeout).thenApply(result -> Done.getInstance()))
.reduce(identity, (done, reply) -> done.thenCompose(result -> done))
.exceptionally(t -> Done.getInstance());
connectionLogger.success("Subscriptions {0} initialized successfully.", consumers);
logger.info("Subscribed Connection <{}> to sources: {}", connectionId(), consumers);
return completionStage;
Expand All @@ -441,25 +445,6 @@ private CompletionStage<Object> startCommandConsumers(final List<ConsumerData> c
}
}

private CompletionStage<Object> retrieveAddressStatusFromConsumerActor(final ActorRef ref) {
return Patterns.ask(ref, RetrieveAddressStatus.getInstance(), initialConsumerResourceStatusAskTimeout)
.thenApply(reply -> {
if (reply instanceof ResourceStatus) {
final ResourceStatus resourceStatus = (ResourceStatus) reply;
// if status of the consumer actors is not OPEN after initialization, we must fail the stage
// with an exception, otherwise the client actor wil go to CONNECTED state, despite the
// failure that occurred in the consumer
if (resourceStatus.getStatus() != ConnectivityStatus.OPEN) {
final String msg = String.format("Resource status of consumer is not OPEN, but %s: %s",
resourceStatus.getStatus(),
resourceStatus.getStatusDetails().orElse("(no status details provided)"));
throw new IllegalStateException(msg);
}
}
return reply;
});
}

private ActorRef startCommandConsumer(final ConsumerData consumer, final Sink<Object, NotUsed> inboundMappingSink,
final ActorRef jmsActor) {
final String namePrefix = consumer.getActorNamePrefix();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.eclipse.ditto.connectivity.api.ExternalMessageFactory;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
import org.eclipse.ditto.connectivity.model.Enforcement;
import org.eclipse.ditto.connectivity.model.EnforcementFilterFactory;
import org.eclipse.ditto.connectivity.model.ResourceStatus;
Expand Down Expand Up @@ -122,6 +123,10 @@ private AmqpConsumerActor(final Connection connection, final ConsumerData consum
this.jmsActor = checkNotNull(jmsActor, "jmsActor");
jmsActorAskTimeout = connectionConfig.getClientActorAskTimeout();

// the amqp consumer is OPEN (ready to handle messages) after setMessageListener() was called successfully
handleAddressStatus(ConnectivityModelFactory.newSourceStatus(InstanceIdentifierSupplier.getInstance().get(),
ConnectivityStatus.UNKNOWN, sourceAddress, "Consumer is being initialized.", Instant.now()));

messageRateLimiter = initMessageRateLimiter(amqp10Config);
backOffActor = getContext().actorOf(BackOffActor.props(amqp10Config.getBackOffConfig()));

Expand Down Expand Up @@ -232,6 +237,7 @@ private void initMessageConsumer() {
if (messageConsumer != null) {
messageConsumer.setMessageListener(this);
consumerData = consumerData.withMessageConsumer(messageConsumer);
resetResourceStatus();
}
} catch (final Exception e) {
final ResourceStatus resourceStatus =
Expand Down Expand Up @@ -317,7 +323,6 @@ private void messageConsumerCreated(final CreateMessageConsumerResponse response
destroyMessageConsumer();
messageConsumer = response.messageConsumer;
initMessageConsumer();
resetResourceStatus();
} else {
// got an orphaned message consumer! this is an error.
logger.error("RESOURCE_LEAK! Got created MessageConsumer <{}> for <{}>, while I have <{}> for <{}>",
Expand All @@ -330,6 +335,12 @@ private void messageConsumerFailed(final Status.Failure failure) {
final ConnectionFailure connectionFailed = ConnectionFailure.of(getSelf(), failure.cause(),
"Failed to recreate closed message consumer");
getContext().getParent().tell(connectionFailed, getSelf());
final ResourceStatus addressStatus =
ConnectivityModelFactory.newStatusUpdate(InstanceIdentifierSupplier.getInstance().get(),
connectivityStatusResolver.resolve(failure.cause()), sourceAddress,
"Failed to recreate closed message consumer.",
Instant.now());
handleAddressStatus(addressStatus);
}

private void handleJmsMessage(final JmsMessage message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,21 @@ public void reconnectsInConnectingStateIfFailureResponseReceived() {

@Test
public void reconnectsInConnectingStateAfterBackoffWhenMultipleFailuresAreReceived() {
reconnectsAfterBackoffWhenMultipleFailuresReceived(false);
// expect reconnects after 100ms + 200ms + 400ms + 400ms = 1100ms backoff in total
final long expectedTotalBackoffMs = 1100L;
reconnectsAfterBackoffWhenMultipleFailuresReceived(false, expectedTotalBackoffMs);
}

@Test
public void reconnectsFromConnectedStateAfterBackoffWhenMultipleFailuresAreReceived() {
reconnectsAfterBackoffWhenMultipleFailuresReceived(true);
// expect reconnects after 200ms + 400ms + 400ms + 400ms = 1400ms backoff in total
// because we transition from CONNECTED -> CONNECTING which already adds 100ms backoff
final long expectedTotalBackoffMs = 1400L;
reconnectsAfterBackoffWhenMultipleFailuresReceived(true, expectedTotalBackoffMs);
}

private void reconnectsAfterBackoffWhenMultipleFailuresReceived(final boolean initialConnectionSucceeds) {
private void reconnectsAfterBackoffWhenMultipleFailuresReceived(final boolean initialConnectionSucceeds,
final long expectedTotalBackoffMs) {
new TestKit(actorSystem) {{
final ConnectionId randomConnectionId = TestConstants.createRandomConnectionId();
final Connection connection =
Expand Down Expand Up @@ -173,10 +179,8 @@ private void reconnectsAfterBackoffWhenMultipleFailuresReceived(final boolean in
// verify that doConnectClient is called after correct backoff
thenExpectConnectClientCalledAfterTimeout(i + 2, connectivityConfig.getClientConfig().getMaxBackoff());
}
// expecting 4 invocations of doConnectClient within 100ms + 200ms + 400ms + 400ms = 1100ms backoff in total
final long totalBackoffDurationMs = System.currentTimeMillis() - start;
final long expectedTotalBackoffMs = 1100L;
final long tolerancePerBackoffMs = 50L; // allow 50ms tolerance per backoff until connectClient is called
final long tolerancePerBackoffMs = 100L; // allow 100ms tolerance per backoff until connectClient is called
assertThat(totalBackoffDurationMs).isGreaterThan(expectedTotalBackoffMs);
assertThat(totalBackoffDurationMs).isLessThan(
expectedTotalBackoffMs + (nrOfBackoffs * tolerancePerBackoffMs));
Expand Down

0 comments on commit b98226d

Please sign in to comment.