Skip to content

Commit

Permalink
add unit test for AmqpConsumerActor backoff and resource status update
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed Sep 3, 2021
1 parent b3e8631 commit 46bdbd1
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ private void prepareResponseHandler(final AcknowledgeableMessage acknowledgeable

protected void resetResourceStatus() {
resourceStatus = ConnectivityModelFactory.newSourceStatus(getInstanceIdentifier(),
ConnectivityStatus.OPEN, sourceAddress, "Started at " + Instant.now());
ConnectivityStatus.OPEN, sourceAddress, "Consumer started.", Instant.now());
}

protected ResourceStatus getCurrentSourceStatus() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
import static org.eclipse.ditto.connectivity.service.messaging.TestConstants.header;
import static org.eclipse.ditto.json.assertions.DittoJsonAssertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -30,19 +33,23 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import javax.annotation.Nullable;
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;

import org.apache.qpid.jms.JmsAcknowledgeCallback;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.message.JmsMessageSupport;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.apache.qpid.jms.provider.amqp.message.AmqpJmsTextMessageFacade;
import org.apache.qpid.jms.provider.exceptions.ProviderSecurityException;
import org.apache.qpid.proton.amqp.Symbol;
import org.awaitility.Awaitility;
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.acks.FilteredAcknowledgementRequest;
import org.eclipse.ditto.base.model.common.ResponseType;
Expand All @@ -52,9 +59,11 @@
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.ConnectivityStatus;
import org.eclipse.ditto.connectivity.model.MappingContext;
import org.eclipse.ditto.connectivity.model.PayloadMapping;
import org.eclipse.ditto.connectivity.model.ReplyTarget;
import org.eclipse.ditto.connectivity.model.ResourceStatus;
import org.eclipse.ditto.connectivity.model.Source;
import org.eclipse.ditto.connectivity.service.config.ConnectivityConfig;
import org.eclipse.ditto.connectivity.service.mapping.ConnectionContext;
Expand All @@ -67,13 +76,17 @@
import org.eclipse.ditto.connectivity.service.messaging.InboundMappingProcessor;
import org.eclipse.ditto.connectivity.service.messaging.InboundMappingSink;
import org.eclipse.ditto.connectivity.service.messaging.TestConstants;
import org.eclipse.ditto.connectivity.service.messaging.amqp.status.ConsumerClosedStatusReport;
import org.eclipse.ditto.connectivity.service.messaging.internal.ConnectionFailure;
import org.eclipse.ditto.connectivity.service.messaging.internal.RetrieveAddressStatus;
import org.eclipse.ditto.internal.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
import org.eclipse.ditto.things.model.signals.commands.modify.ModifyAttribute;
import org.eclipse.ditto.things.model.signals.commands.modify.ModifyFeatureProperty;
import org.eclipse.ditto.things.model.signals.commands.modify.ModifyThing;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.mockito.Mockito;

Expand Down Expand Up @@ -101,9 +114,16 @@ public final class AmqpConsumerActorTest extends AbstractConsumerActorWithAcknow
@Override
protected Props getConsumerActorProps(final Sink<Object, NotUsed> inboundMappingSink,
final PayloadMapping payloadMapping) {
final MessageConsumer messageConsumer = mock(MessageConsumer.class);
return getConsumerActorProps(mock(MessageConsumer.class), TestProbe.apply(actorSystem).testActor(),
CONNECTION_ID.toString(), inboundMappingSink, payloadMapping
);
}

private Props getConsumerActorProps(final MessageConsumer messageConsumer,
final ActorRef jmsActor, final String address, final Sink<Object, NotUsed> inboundMappingSink,
final PayloadMapping payloadMapping) {
final ConsumerData mockConsumerData =
consumerData(CONNECTION_ID.toString(), messageConsumer, ConnectivityModelFactory.newSourceBuilder()
consumerData(address, messageConsumer, ConnectivityModelFactory.newSourceBuilder()
.authorizationContext(TestConstants.Authorization.AUTHORIZATION_CONTEXT)
.enforcement(ENFORCEMENT)
.headerMapping(TestConstants.HEADER_MAPPING)
Expand All @@ -114,7 +134,7 @@ protected Props getConsumerActorProps(final Sink<Object, NotUsed> inboundMapping
.build())
.build());
return AmqpConsumerActor.props(CONNECTION, mockConsumerData, inboundMappingSink,
TestProbe.apply(actorSystem).testActor(), mock(ConnectivityStatusResolver.class));
jmsActor, mock(ConnectivityStatusResolver.class));
}

@Override
Expand Down Expand Up @@ -178,7 +198,7 @@ public void plainStringMappingTest() {
final MappingContext mappingContext = ConnectivityModelFactory.newMappingContextBuilder(
"JavaScript",
JavaScriptMessageMapperFactory.createJavaScriptMessageMapperConfigurationBuilder(
"plainStringMapping", Collections.emptyMap())
"plainStringMapping", Collections.emptyMap())
.incomingScript(TestConstants.Mapping.INCOMING_MAPPING_SCRIPT)
.outgoingScript(TestConstants.Mapping.OUTGOING_MAPPING_SCRIPT)
.build()
Expand Down Expand Up @@ -215,7 +235,7 @@ public void plainStringMappingMultipleTest() {
final MappingContext mappingContext = ConnectivityModelFactory.newMappingContextBuilder(
"JavaScript",
JavaScriptMessageMapperFactory.createJavaScriptMessageMapperConfigurationBuilder(
"plainStringMultiMapping", Collections.emptyMap())
"plainStringMultiMapping", Collections.emptyMap())
.incomingScript(TestConstants.Mapping.INCOMING_MAPPING_SCRIPT
// return array instead of single result object
.replace("return msg;", "return [msg, msg, msg];"))
Expand Down Expand Up @@ -375,6 +395,93 @@ public void closedMessageConsumerFailConnection() throws JMSException {
}};
}

@Test
public void closedMessageConsumerIsRecreatedAfterBackoffAndResourceStatusIsUpdated() throws JMSException {
new TestKit(actorSystem) {
{
final TestProbe proxyActor = TestProbe.apply(actorSystem);
final TestProbe clientActor = TestProbe.apply(actorSystem);
final TestProbe jmsActor = TestProbe.apply(actorSystem);
final String address = "source";
final String errorMessage = "link disallowed";
final String consumerStarted = "Consumer started.";

final var messageListener = new AtomicReference<MessageListener>();
final var messageConsumer = prepareMessageConsumer(messageListener);
final Sink<Object, NotUsed> inboundMappingSink =
setupInboundMappingSink(clientActor.ref(), proxyActor.ref());

final ActorRef underTest = actorSystem.actorOf(getConsumerActorProps(messageConsumer, jmsActor.ref(),
address, inboundMappingSink, ConnectivityModelFactory.emptyPayloadMapping()));

verifyResourceStatus(underTest, consumerStarted, ConnectivityStatus.OPEN);

Awaitility.await().untilAtomic(messageListener, Matchers.notNullValue());

// verify message is processed via the initial message consumer
Awaitility.await().untilAtomic(messageListener, Matchers.notNullValue())
.onMessage(getInboundMessage(TestConstants.modifyThing(),
TestConstants.header("device_id", TestConstants.Things.THING_ID)));
final ModifyThing modifyThing = proxyActor.expectMsgClass(ModifyThing.class);
assertThat((CharSequence) modifyThing.getEntityId()).isEqualTo(TestConstants.Things.THING_ID);

final long start = System.currentTimeMillis();
// signal closed consumer to consumer actor
underTest.tell(ConsumerClosedStatusReport.get(messageConsumer,
new ProviderSecurityException(errorMessage)), getRef());
verify(messageConsumer).close();

// verify the actual state ist reflected in the source resource status
verifyResourceStatus(underTest, errorMessage, ConnectivityStatus.MISCONFIGURED);

// the jms actor must receive a CreateMessageConsumer message and provide a new message consumer
final AmqpConsumerActor.CreateMessageConsumer createMessageConsumer =
jmsActor.expectMsgClass(AmqpConsumerActor.CreateMessageConsumer.class);
final long stop = System.currentTimeMillis();
assertThat(createMessageConsumer.getConsumerData().getAddress()).isEqualTo(address);

final var newMessageListener = new AtomicReference<MessageListener>();
final var newMessageConsumer = prepareMessageConsumer(newMessageListener);
jmsActor.reply(createMessageConsumer.toResponse(newMessageConsumer));

// verify that the new message consumer is created after some backoff
// default backoff is 1 second
assertThat(stop - start).isGreaterThan(Duration.ofSeconds(1).toMillis());

verifyResourceStatus(underTest, consumerStarted, ConnectivityStatus.OPEN);

// verify that a message is processed via the new message consumer
Awaitility.await().untilAtomic(newMessageListener, Matchers.notNullValue())
.onMessage(getInboundMessage(TestConstants.modifyThing(),
TestConstants.header("device_id", TestConstants.Things.THING_ID)));
final ModifyThing modifyThingViaNewConsumer = proxyActor.expectMsgClass(ModifyThing.class);
assertThat((CharSequence) modifyThingViaNewConsumer.getEntityId()).isEqualTo(
TestConstants.Things.THING_ID);

}

private void verifyResourceStatus(final ActorRef underTest, final String statusMessage,
final ConnectivityStatus expected) {
underTest.tell(RetrieveAddressStatus.getInstance(), getRef());
final var status = expectMsgClass(ResourceStatus.class);
assertThat((Object) status.getStatus()).isEqualTo(expected);
assertThat(status.getStatusDetails())
.hasValueSatisfying(details -> assertThat(details).contains(statusMessage));
}

private MessageConsumer prepareMessageConsumer(final AtomicReference<MessageListener> ref)
throws JMSException {
final MessageConsumer messageConsumer = mock(MessageConsumer.class);
doAnswer(invocation -> {
ref.set(invocation.getArgument(0));
return null;
}).when(messageConsumer).setMessageListener(any(MessageListener.class));
return messageConsumer;
}
};
}


private ConsumerData consumerData(final String address, final MessageConsumer messageConsumer,
final Source source) {
final var connectionContext =
Expand Down

0 comments on commit 46bdbd1

Please sign in to comment.