From b98226d60fc3fa2224f868c3c59cd454cbb3258c Mon Sep 17 00:00:00 2001 From: Dominik Guggemos Date: Mon, 4 Oct 2021 09:24:20 +0200 Subject: [PATCH] stabilize connection live status for amqp 1.0 connections Signed-off-by: Dominik Guggemos --- .../service/messaging/BaseClientActor.java | 74 +++++++++++-------- .../messaging/amqp/AmqpClientActor.java | 37 +++------- .../messaging/amqp/AmqpConsumerActor.java | 13 +++- .../messaging/BaseClientActorTest.java | 16 ++-- .../messaging/amqp/AmqpClientActorTest.java | 45 ++++++++++- .../service/src/test/resources/test.conf | 4 + 6 files changed, 122 insertions(+), 67 deletions(-) diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java index 39c4b9cd68..78bd817fda 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActor.java @@ -40,7 +40,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Predicate; -import java.util.function.Supplier; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.StreamSupport; @@ -174,7 +173,7 @@ public abstract class BaseClientActor extends AbstractFSMWithStash inConnectingS .event(CloseConnectionAndShutdown.class, this::closeConnectionAndShutdown) .event(SshTunnelActor.TunnelStarted.class, this::tunnelStarted) .eventEquals(Control.CONNECT_AFTER_TUNNEL_ESTABLISHED, this::connectAfterTunnelStarted) + .eventEquals(Control.GOTO_CONNECTED_AFTER_INITIALIZATION, this::gotoConnectedAfterInitialization) .event(SshTunnelActor.TunnelClosed.class, this::tunnelClosed) .event(OpenConnection.class, this::openConnectionInConnectingState); } @@ -1230,20 +1230,29 @@ protected CompletionStage startPublisherAndConsumerActors( private State handleInitializationResult( final InitializationResult initializationResult, final BaseClientData data) { - if (initializationResult.isSuccess()) { - logger.debug("Initialization of consumers, publisher and subscriptions successful, going to CONNECTED."); - connectionLogger.success("Connection successful."); - data.getSessionSenders().forEach(origin -> origin.first().tell(new Status.Success(CONNECTED), getSelf())); - return goTo(CONNECTED).using(data.resetSession() - .resetFailureCount() - .setConnectionStatus(ConnectivityStatus.OPEN) - .setConnectionStatusDetails("Connected at " + Instant.now()) - ); + getSelf().tell(Control.GOTO_CONNECTED_AFTER_INITIALIZATION, ActorRef.noSender()); } else { logger.info("Initialization of consumers, publisher and subscriptions failed: {}. Staying in CONNECTING " + "state to continue with connection recovery after backoff.", initializationResult.getFailure()); getSelf().tell(initializationResult.getFailure(), ActorRef.noSender()); + } + return stay(); + } + + private State gotoConnectedAfterInitialization(final Control message, + final BaseClientData data) { + if (data.getFailureCount() == 0) { + logger.info("Initialization of consumers, publisher and subscriptions successful, going to CONNECTED."); + connectionLogger.success("Connection successful."); + data.getSessionSenders().forEach(origin -> origin.first().tell(new Status.Success(CONNECTED), getSelf())); + return goTo(CONNECTED).using(data.resetSession() + .resetFailureCount() + .setConnectionStatus(ConnectivityStatus.OPEN) + .setConnectionStatusDetails("Connected at " + Instant.now())); + } else { + logger.info("Initialization of consumers, publisher and subscriptions successful, but failures were " + + "received meanwhile. Staying in CONNECTING state to continue with connection recovery after backoff."); return stay(); } } @@ -1278,22 +1287,22 @@ protected CompletionStage startConsumerActors(@Nullable final Cli private State clientDisconnected(final ClientDisconnected event, final BaseClientData data) { - connectionLogger.success("Disconnected successfully."); + connectionLogger.success("Disconnected successfully."); - cleanupResourcesForConnection(); - tellTunnelActor(SshTunnelActor.TunnelControl.STOP_TUNNEL); - data.getSessionSenders() - .forEach(sender -> sender.first().tell(new Status.Success(DISCONNECTED), getSelf())); + cleanupResourcesForConnection(); + tellTunnelActor(SshTunnelActor.TunnelControl.STOP_TUNNEL); + data.getSessionSenders() + .forEach(sender -> sender.first().tell(new Status.Success(DISCONNECTED), getSelf())); - final BaseClientData nextStateData = data.resetSession() - .setConnectionStatus(ConnectivityStatus.CLOSED) - .setConnectionStatusDetails("Disconnected at " + Instant.now()); + final BaseClientData nextStateData = data.resetSession() + .setConnectionStatus(ConnectivityStatus.CLOSED) + .setConnectionStatusDetails("Disconnected at " + Instant.now()); - if (event.shutdownAfterDisconnected()) { - return stop(Normal(), nextStateData); - } else { - return goTo(DISCONNECTED).using(nextStateData); - } + if (event.shutdownAfterDisconnected()) { + return stop(Normal(), nextStateData); + } else { + return goTo(DISCONNECTED).using(nextStateData); + } } private void tellTunnelActor(final SshTunnelActor.TunnelControl control) { @@ -1313,7 +1322,7 @@ private State connectingConnectionFailed(final data.getSessionSenders().forEach(sender -> sender.first().tell(getStatusToReport(event.getFailure(), sender.second()), getSelf())); - return backoffAfterFailure(event, data); + return backoffAfterFailure(event, data.increaseFailureCount()); } private State connectedConnectionFailed(final ConnectionFailure event, @@ -1340,13 +1349,13 @@ private State backoffAfterFailure(final Connect dittoProtocolSub.removeSubscriber(getSelf()); if (ConnectivityStatus.OPEN.equals(data.getDesiredConnectionStatus())) { if (reconnectTimeoutStrategy.canReconnect()) { - if (data.getFailureCount() > 0) { + if (data.getFailureCount() > 1) { connectionLogger.failure( - "Reconnection attempt <{0}> failed due to: {1}. Reconnect after backoff was " + + "Received {0} subsequent failures during backoff: {1}. Reconnect after backoff was " + "already triggered.", data.getFailureCount(), event.getFailureDescription()); - logger.info("Reconnection attempt <{}> failed: {}. Reconnect was already triggered.", + logger.info("Received {} subsequent failures during backoff: {}. Reconnect was already triggered.", data.getFailureCount(), event); - return stay().using(data.increaseFailureCount()); + return stay(); } else { final Duration nextBackoff = reconnectTimeoutStrategy.getNextBackoff(); final var errorMessage = @@ -1356,7 +1365,6 @@ private State backoffAfterFailure(final Connect logger.info("Connection failed: {}. Reconnect after: {}. Resolved status: {}. " + "Going to 'CONNECTING'", event, nextBackoff, resolvedStatus); return goToConnecting(nextBackoff).using(data.resetSession() - .increaseFailureCount() .setConnectionStatus(resolvedStatus) .setConnectionStatusDetails(event.getFailureDescription()) ); @@ -1373,6 +1381,7 @@ private State backoffAfterFailure(final Connect // stay in INITIALIZED state until re-opened manually return goTo(INITIALIZED) .using(data.resetSession() + .resetFailureCount() .setConnectionStatus(connectivityStatusResolver.resolve(event)) .setConnectionStatusDetails(event.getFailureDescription() + " Reached maximum retries after backing off after failure and thus will " + @@ -2250,7 +2259,8 @@ public String toString() { private enum Control { INIT_COMPLETE, REFRESH_CLIENT_ACTOR_REFS, - CONNECT_AFTER_TUNNEL_ESTABLISHED + CONNECT_AFTER_TUNNEL_ESTABLISHED, + GOTO_CONNECTED_AFTER_INITIALIZATION } /** diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpClientActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpClientActor.java index e9de12a10c..466c3d2cfb 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpClientActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpClientActor.java @@ -23,6 +23,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; import javax.annotation.Nullable; @@ -43,7 +44,6 @@ import org.eclipse.ditto.connectivity.model.Connection; import org.eclipse.ditto.connectivity.model.ConnectionConfigurationInvalidException; import org.eclipse.ditto.connectivity.model.ConnectivityStatus; -import org.eclipse.ditto.connectivity.model.ResourceStatus; import org.eclipse.ditto.connectivity.model.signals.commands.exceptions.ConnectionFailedException; import org.eclipse.ditto.connectivity.model.signals.commands.modify.TestConnection; import org.eclipse.ditto.connectivity.service.config.Amqp10Config; @@ -423,15 +423,19 @@ protected ActorRef getPublisherActor() { return amqpPublisherActor; } - private CompletionStage startCommandConsumers(final List consumers, final ActorRef jmsActor) { + private CompletionStage startCommandConsumers(final List consumers, final ActorRef jmsActor) { if (isConsuming()) { stopCommandConsumers(); - final CompletionStage completionStage = consumers.stream() + // wait a fraction of the configured timeout before asking to allow the consumer to stabilize + final CompletionStage identity = + new CompletableFuture().completeOnTimeout(Done.getInstance(), + initialConsumerResourceStatusAskTimeout.toMillis() / 2, TimeUnit.MILLISECONDS); + final CompletionStage completionStage = consumers.stream() .map(consumer -> startCommandConsumer(consumer, getInboundMappingSink(), jmsActor)) - .map(this::retrieveAddressStatusFromConsumerActor) - .reduce(CompletableFuture.completedStage(Done.getInstance()), - // not interested in the actual result, just if it failed or not - (stage, reply) -> stage.thenCompose(unused -> reply)); + .map(ref -> Patterns.ask(ref, RetrieveAddressStatus.getInstance(), + initialConsumerResourceStatusAskTimeout).thenApply(result -> Done.getInstance())) + .reduce(identity, (done, reply) -> done.thenCompose(result -> done)) + .exceptionally(t -> Done.getInstance()); connectionLogger.success("Subscriptions {0} initialized successfully.", consumers); logger.info("Subscribed Connection <{}> to sources: {}", connectionId(), consumers); return completionStage; @@ -441,25 +445,6 @@ private CompletionStage startCommandConsumers(final List c } } - private CompletionStage retrieveAddressStatusFromConsumerActor(final ActorRef ref) { - return Patterns.ask(ref, RetrieveAddressStatus.getInstance(), initialConsumerResourceStatusAskTimeout) - .thenApply(reply -> { - if (reply instanceof ResourceStatus) { - final ResourceStatus resourceStatus = (ResourceStatus) reply; - // if status of the consumer actors is not OPEN after initialization, we must fail the stage - // with an exception, otherwise the client actor wil go to CONNECTED state, despite the - // failure that occurred in the consumer - if (resourceStatus.getStatus() != ConnectivityStatus.OPEN) { - final String msg = String.format("Resource status of consumer is not OPEN, but %s: %s", - resourceStatus.getStatus(), - resourceStatus.getStatusDetails().orElse("(no status details provided)")); - throw new IllegalStateException(msg); - } - } - return reply; - }); - } - private ActorRef startCommandConsumer(final ConsumerData consumer, final Sink inboundMappingSink, final ActorRef jmsActor) { final String namePrefix = consumer.getActorNamePrefix(); diff --git a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActor.java b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActor.java index bce0232b72..5350bcc28f 100644 --- a/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActor.java +++ b/connectivity/service/src/main/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpConsumerActor.java @@ -49,6 +49,7 @@ import org.eclipse.ditto.connectivity.api.ExternalMessageFactory; import org.eclipse.ditto.connectivity.model.Connection; import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory; +import org.eclipse.ditto.connectivity.model.ConnectivityStatus; import org.eclipse.ditto.connectivity.model.Enforcement; import org.eclipse.ditto.connectivity.model.EnforcementFilterFactory; import org.eclipse.ditto.connectivity.model.ResourceStatus; @@ -122,6 +123,10 @@ private AmqpConsumerActor(final Connection connection, final ConsumerData consum this.jmsActor = checkNotNull(jmsActor, "jmsActor"); jmsActorAskTimeout = connectionConfig.getClientActorAskTimeout(); + // the amqp consumer is OPEN (ready to handle messages) after setMessageListener() was called successfully + handleAddressStatus(ConnectivityModelFactory.newSourceStatus(InstanceIdentifierSupplier.getInstance().get(), + ConnectivityStatus.UNKNOWN, sourceAddress, "Consumer is being initialized.", Instant.now())); + messageRateLimiter = initMessageRateLimiter(amqp10Config); backOffActor = getContext().actorOf(BackOffActor.props(amqp10Config.getBackOffConfig())); @@ -232,6 +237,7 @@ private void initMessageConsumer() { if (messageConsumer != null) { messageConsumer.setMessageListener(this); consumerData = consumerData.withMessageConsumer(messageConsumer); + resetResourceStatus(); } } catch (final Exception e) { final ResourceStatus resourceStatus = @@ -317,7 +323,6 @@ private void messageConsumerCreated(final CreateMessageConsumerResponse response destroyMessageConsumer(); messageConsumer = response.messageConsumer; initMessageConsumer(); - resetResourceStatus(); } else { // got an orphaned message consumer! this is an error. logger.error("RESOURCE_LEAK! Got created MessageConsumer <{}> for <{}>, while I have <{}> for <{}>", @@ -330,6 +335,12 @@ private void messageConsumerFailed(final Status.Failure failure) { final ConnectionFailure connectionFailed = ConnectionFailure.of(getSelf(), failure.cause(), "Failed to recreate closed message consumer"); getContext().getParent().tell(connectionFailed, getSelf()); + final ResourceStatus addressStatus = + ConnectivityModelFactory.newStatusUpdate(InstanceIdentifierSupplier.getInstance().get(), + connectivityStatusResolver.resolve(failure.cause()), sourceAddress, + "Failed to recreate closed message consumer.", + Instant.now()); + handleAddressStatus(addressStatus); } private void handleJmsMessage(final JmsMessage message) { diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActorTest.java index d3356344cc..f7c449c592 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/BaseClientActorTest.java @@ -133,15 +133,21 @@ public void reconnectsInConnectingStateIfFailureResponseReceived() { @Test public void reconnectsInConnectingStateAfterBackoffWhenMultipleFailuresAreReceived() { - reconnectsAfterBackoffWhenMultipleFailuresReceived(false); + // expect reconnects after 100ms + 200ms + 400ms + 400ms = 1100ms backoff in total + final long expectedTotalBackoffMs = 1100L; + reconnectsAfterBackoffWhenMultipleFailuresReceived(false, expectedTotalBackoffMs); } @Test public void reconnectsFromConnectedStateAfterBackoffWhenMultipleFailuresAreReceived() { - reconnectsAfterBackoffWhenMultipleFailuresReceived(true); + // expect reconnects after 200ms + 400ms + 400ms + 400ms = 1400ms backoff in total + // because we transition from CONNECTED -> CONNECTING which already adds 100ms backoff + final long expectedTotalBackoffMs = 1400L; + reconnectsAfterBackoffWhenMultipleFailuresReceived(true, expectedTotalBackoffMs); } - private void reconnectsAfterBackoffWhenMultipleFailuresReceived(final boolean initialConnectionSucceeds) { + private void reconnectsAfterBackoffWhenMultipleFailuresReceived(final boolean initialConnectionSucceeds, + final long expectedTotalBackoffMs) { new TestKit(actorSystem) {{ final ConnectionId randomConnectionId = TestConstants.createRandomConnectionId(); final Connection connection = @@ -173,10 +179,8 @@ private void reconnectsAfterBackoffWhenMultipleFailuresReceived(final boolean in // verify that doConnectClient is called after correct backoff thenExpectConnectClientCalledAfterTimeout(i + 2, connectivityConfig.getClientConfig().getMaxBackoff()); } - // expecting 4 invocations of doConnectClient within 100ms + 200ms + 400ms + 400ms = 1100ms backoff in total final long totalBackoffDurationMs = System.currentTimeMillis() - start; - final long expectedTotalBackoffMs = 1100L; - final long tolerancePerBackoffMs = 50L; // allow 50ms tolerance per backoff until connectClient is called + final long tolerancePerBackoffMs = 100L; // allow 100ms tolerance per backoff until connectClient is called assertThat(totalBackoffDurationMs).isGreaterThan(expectedTotalBackoffMs); assertThat(totalBackoffDurationMs).isLessThan( expectedTotalBackoffMs + (nrOfBackoffs * tolerancePerBackoffMs)); diff --git a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpClientActorTest.java b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpClientActorTest.java index 81877b277f..0e6392cfda 100644 --- a/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpClientActorTest.java +++ b/connectivity/service/src/test/java/org/eclipse/ditto/connectivity/service/messaging/amqp/AmqpClientActorTest.java @@ -43,6 +43,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Predicate; @@ -53,6 +54,7 @@ import javax.annotation.Nullable; import javax.jms.CompletionListener; import javax.jms.Destination; +import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -70,6 +72,7 @@ import org.apache.qpid.jms.message.JmsTextMessage; import org.apache.qpid.jms.provider.amqp.AmqpConnection; import org.apache.qpid.jms.provider.amqp.message.AmqpJmsTextMessageFacade; +import org.apache.qpid.jms.provider.exceptions.ProviderSecurityException; import org.apache.qpid.proton.amqp.Symbol; import org.assertj.core.api.ThrowableAssert; import org.awaitility.Awaitility; @@ -217,7 +220,7 @@ public void invalidSpecificOptionsThrowConnectionConfigurationInvalidException() specificOptions.put("failover.unknown.option", "100"); specificOptions.put("failover.nested.amqp.vhost", "ditto"); final Connection connection = ConnectivityModelFactory.newConnectionBuilder(createRandomConnectionId(), - ConnectionType.AMQP_10, ConnectivityStatus.OPEN, TestConstants.getUriOfNewMockServer()) + ConnectionType.AMQP_10, ConnectivityStatus.OPEN, TestConstants.getUriOfNewMockServer()) .specificConfig(specificOptions) .sources(singletonList( ConnectivityModelFactory.newSourceBuilder() @@ -454,6 +457,43 @@ public void testCreateConsumerFails() throws JMSException { }}; } + @Test + public void testSetMessageListenerOnConsumerFails() throws JMSException { + new TestKit(actorSystem) {{ + + // ProviderSecurityException resolves to MISCONFIGURED state + final IllegalStateException jmsEx = new IllegalStateException("not allowed"); + jmsEx.initCause(new ProviderSecurityException("disallowed by local policy")); + + doThrow(jmsEx).when(mockConsumer).setMessageListener(any()); + final Props props = + AmqpClientActor.propsForTest(connection, getRef(), getRef(), (c, e, l, i) -> mockConnection, + actorSystem); + final ActorRef amqpClientActor = actorSystem.actorOf(props); + + amqpClientActor.tell(OpenConnection.of(CONNECTION_ID, DittoHeaders.empty()), getRef()); + expectMsgClass(Status.Failure.class); + + final AtomicInteger count = new AtomicInteger(20); + Awaitility.await() + .pollInterval(Duration.ofMillis(100)) + .atMost(Duration.ofSeconds(5)).until(() -> { + amqpClientActor.tell(RetrieveConnectionStatus.of(CONNECTION_ID, DittoHeaders.empty()), getRef()); + fishForMessage(Duration.ofSeconds(1), "client status", o -> { + if (o instanceof ResourceStatus) { + final ResourceStatus resourceStatus = (ResourceStatus) o; + if (resourceStatus.getResourceType() == ResourceStatus.ResourceType.CLIENT) { + assertThat((Object) resourceStatus.getStatus()).isEqualTo(ConnectivityStatus.MISCONFIGURED); + return true; + } + } + return false; + }); + return count.decrementAndGet() == 0; + }); + }}; + } + @Test public void testCloseConnectionFails() throws JMSException { new TestKit(actorSystem) {{ @@ -593,7 +633,8 @@ private void testConsumeMessageAndExpectForwardToProxyActor(final Connection con for (int i = 0; i < consumers; i++) { final Command command = expectMsgClass(Command.class); assertThat(command).isInstanceOf(SignalWithEntityId.class); - assertThat((CharSequence) ((SignalWithEntityId) command).getEntityId()).isEqualTo(TestConstants.Things.THING_ID); + assertThat((CharSequence) ((SignalWithEntityId) command).getEntityId()).isEqualTo( + TestConstants.Things.THING_ID); assertThat(command.getDittoHeaders().getCorrelationId()).contains(TestConstants.CORRELATION_ID); commandConsumer.accept(command); } diff --git a/connectivity/service/src/test/resources/test.conf b/connectivity/service/src/test/resources/test.conf index 3bb71c17d5..e12a8a8404 100644 --- a/connectivity/service/src/test/resources/test.conf +++ b/connectivity/service/src/test/resources/test.conf @@ -37,6 +37,10 @@ ditto { } connectivity { default-config-provider = true + + user-indicated-errors = [ + {exceptionName: "org.apache.qpid.jms.provider.exceptions.ProviderSecurityException", messagePattern: ".*"} + ] connection { // allow localhost in unit tests blocked-hostnames = ""