From bd2de622800146644f02bf43af6456806c1fd5a7 Mon Sep 17 00:00:00 2001 From: Thomas Jaeckle Date: Wed, 30 Jan 2019 11:46:15 +0100 Subject: [PATCH] fixed ConnectionActorTest and added a new test cases * in AmqpClientActorTest which ensure that the target address with placeholders is replaced correctly Signed-off-by: Thomas Jaeckle --- .../messaging/ConnectionActorTest.java | 4 +- .../connectivity/messaging/TestConstants.java | 4 +- .../messaging/amqp/AmqpClientActorTest.java | 107 ++++++++++++++---- 3 files changed, 86 insertions(+), 29 deletions(-) diff --git a/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionActorTest.java b/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionActorTest.java index be6695ccd8..148221c024 100644 --- a/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionActorTest.java +++ b/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/ConnectionActorTest.java @@ -495,9 +495,9 @@ public void testThingEventIsForwardedToFilteredTarget() { final Connection connection = TestConstants.createConnection(connectionId, actorSystem, TestConstants.Targets.TARGET_WITH_PLACEHOLDER); + // expect that address is still with placeholders (as replacement was moved to MessageMappingProcessorActor final Target expectedTarget = ConnectivityModelFactory.newTarget(TestConstants.Targets.TARGET_WITH_PLACEHOLDER, - "target:" + TestConstants.Things.NAMESPACE + "/" + - TestConstants.Things.ID, null); + TestConstants.Targets.TARGET_WITH_PLACEHOLDER.getAddress(), null); final CreateConnection createConnection = CreateConnection.of(connection, DittoHeaders.empty()); final Set valid = Collections.singleton(TestConstants.Authorization.SUBJECT_ID); diff --git a/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/TestConstants.java b/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/TestConstants.java index 0c2f0bc4b1..dc71d44501 100644 --- a/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/TestConstants.java +++ b/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/TestConstants.java @@ -170,8 +170,8 @@ public static class Targets { private static final HeaderMapping HEADER_MAPPING = null; - static final Target TARGET_WITH_PLACEHOLDER = - newTarget("target:{{ thing:namespace }}/{{thing:name}}", Authorization.AUTHORIZATION_CONTEXT, HEADER_MAPPING, + public static final Target TARGET_WITH_PLACEHOLDER = + newTarget("target:{{ thing:namespace }}/{{thing:name}}@{{ topic:channel }}", Authorization.AUTHORIZATION_CONTEXT, HEADER_MAPPING, null, Topic.TWIN_EVENTS); static final Target TWIN_TARGET = newTarget("twinEventExchange/twinEventRoutingKey", Authorization.AUTHORIZATION_CONTEXT, HEADER_MAPPING, diff --git a/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActorTest.java b/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActorTest.java index fe25c1be21..f4480ed502 100644 --- a/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActorTest.java +++ b/services/connectivity/messaging/src/test/java/org/eclipse/ditto/services/connectivity/messaging/amqp/AmqpClientActorTest.java @@ -18,7 +18,6 @@ import static org.eclipse.ditto.services.connectivity.messaging.TestConstants.createRandomConnectionId; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.timeout; @@ -33,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.Predicate; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -45,11 +45,11 @@ import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; -import javax.jms.TextMessage; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnectionListener; import org.apache.qpid.jms.JmsQueue; +import org.apache.qpid.jms.message.JmsMessage; import org.apache.qpid.jms.message.JmsTextMessage; import org.apache.qpid.jms.provider.amqp.AmqpConnection; import org.apache.qpid.jms.provider.amqp.message.AmqpJmsTextMessageFacade; @@ -66,6 +66,7 @@ import org.eclipse.ditto.model.connectivity.ResourceStatus; import org.eclipse.ditto.model.connectivity.Source; import org.eclipse.ditto.model.connectivity.Topic; +import org.eclipse.ditto.protocoladapter.TopicPath; import org.eclipse.ditto.services.connectivity.messaging.BaseClientState; import org.eclipse.ditto.services.connectivity.messaging.TestConstants; import org.eclipse.ditto.services.connectivity.messaging.TestConstants.Authorization; @@ -111,7 +112,6 @@ public class AmqpClientActorTest extends WithMockServers { private static final JMSException JMS_EXCEPTION = new JMSException("FAIL"); private static final URI DUMMY = URI.create("amqp://test:1234"); - @SuppressWarnings("NullableProblems") private static ActorSystem actorSystem; private static final String connectionId = TestConstants.createRandomConnectionId(); @@ -129,12 +129,7 @@ public class AmqpClientActorTest extends WithMockServers { private final MessageConsumer mockConsumer = Mockito.mock(MessageConsumer.class); @Mock private final MessageProducer mockProducer = Mockito.mock(MessageProducer.class); - @Mock - private final TextMessage mockTextMessage = Mockito.mock(TextMessage.class); - @Mock - private final TextMessage mockReplyMessage = Mockito.mock(TextMessage.class); - @Mock - private final TextMessage mockErrorMessage = Mockito.mock(TextMessage.class); + private ArgumentCaptor listenerArgumentCaptor; @BeforeClass @@ -157,16 +152,20 @@ public void init() throws JMSException { doNothing().when(mockConnection).addConnectionListener(listenerArgumentCaptor.capture()); when(mockSession.createConsumer(any(JmsQueue.class))).thenReturn(mockConsumer); - when(mockSession.createProducer(any(Destination.class))).thenReturn(mockProducer); - when(mockSession.createTextMessage(anyString())).thenAnswer(invocation -> { - final String message = invocation.getArgument(0); - if (message.contains("ditto/thing/things/twin/errors")) { - return mockErrorMessage; - } else if (message.contains("\"status\":2")) { - return mockReplyMessage; - } else { - return mockTextMessage; - } + when(mockSession.createProducer(any(Destination.class))).thenAnswer((Answer) destinationInv -> { + final Destination destination = destinationInv.getArgument(0); + + when(mockSession.createTextMessage(anyString())).thenAnswer((Answer) textMsgInv -> { + final String textMsg = textMsgInv.getArgument(0); + final AmqpJmsTextMessageFacade facade = new AmqpJmsTextMessageFacade(); + facade.initialize(Mockito.mock(AmqpConnection.class)); + final JmsTextMessage jmsTextMessage = new JmsTextMessage(facade); + jmsTextMessage.setText(textMsg); + jmsTextMessage.setJMSDestination(destination); + return jmsTextMessage; + }); + + return mockProducer; }); } @@ -489,7 +488,8 @@ private void testConsumeMessageAndExpectForwardToConciergeForwarder(final Connec public void testConsumeMessageAndExpectForwardToConciergeForwarderAndReceiveResponse() throws JMSException { testConsumeMessageAndExpectForwardToConciergeForwarderAndReceiveResponse( (id, headers) -> ModifyThingResponse.modified(id, DittoHeaders.of(headers)), - mockReplyMessage); + "replies", + message -> message.contains("\"status\":2")); } @Test @@ -497,12 +497,14 @@ public void testConsumeMessageAndExpectForwardToConciergeForwarderAndReceiveErro testConsumeMessageAndExpectForwardToConciergeForwarderAndReceiveResponse( (id, headers) -> ThingErrorResponse.of(id, ThingNotModifiableException.newBuilder(id).dittoHeaders(headers).build()), - mockErrorMessage); + "replies", + message -> message.contains("ditto/thing/things/twin/errors")); } private void testConsumeMessageAndExpectForwardToConciergeForwarderAndReceiveResponse( final BiFunction responseSupplier, - final TextMessage expectedJmsResponse) throws JMSException { + final String expectedAddress, + final Predicate messageTextPredicate) throws JMSException { new TestKit(actorSystem) {{ final Props props = AmqpClientActor.propsForTests(connection, connectionStatus, getRef(), (ac, el) -> mockConnection); @@ -523,12 +525,58 @@ private void testConsumeMessageAndExpectForwardToConciergeForwarderAndReceiveRes getLastSender().tell(responseSupplier.apply(command.getId(), command.getDittoHeaders()), getRef()); - verify(mockProducer, timeout(2000)).send(same(expectedJmsResponse), any(CompletionListener.class)); + final ArgumentCaptor messageCaptor = ArgumentCaptor.forClass(JmsMessage.class); + verify(mockProducer, timeout(2000)).send(messageCaptor.capture(), any(CompletionListener.class)); + + final Message message = messageCaptor.getValue(); + assertThat(message).isNotNull(); + assertThat(message.getJMSDestination()).isEqualTo(new JmsQueue(expectedAddress)); + assertThat(messageTextPredicate).accepts(message.getBody(String.class)); + }}; + } + + @Test + public void testTargetAddressPlaceholderReplacement() throws JMSException { + final Connection connection = + TestConstants.createConnection(connectionId, actorSystem, + TestConstants.Targets.TARGET_WITH_PLACEHOLDER); + + // target Placeholder: target:{{ thing:namespace }}/{{thing:name}}@{{ topic:channel }} + final String expectedAddress = + "target:" + TestConstants.Things.NAMESPACE + "/" + TestConstants.Things.ID + "@" + TopicPath.Channel.TWIN.getName(); + + new TestKit(actorSystem) {{ + final Props props = + AmqpClientActor.propsForTests(connection, connectionStatus, getRef(), (ac, el) -> mockConnection); + final ActorRef amqpClientActor = actorSystem.actorOf(props); + + amqpClientActor.tell(OpenConnection.of(connectionId, DittoHeaders.empty()), getRef()); + expectMsg(CONNECTED_SUCCESS); + + final ThingModifiedEvent thingModifiedEvent = TestConstants.thingModified(singletonList("")); + + final OutboundSignal outboundSignal = OutboundSignalFactory.newOutboundSignal(thingModifiedEvent, + singleton(ConnectivityModelFactory.newTarget( + TestConstants.Targets.TARGET_WITH_PLACEHOLDER.getAddress(), + Authorization.AUTHORIZATION_CONTEXT, + null, null, Topic.TWIN_EVENTS)) + ); + + amqpClientActor.tell(outboundSignal, getRef()); + + final ArgumentCaptor messageCaptor = ArgumentCaptor.forClass(JmsMessage.class); + verify(mockProducer, timeout(2000)).send(messageCaptor.capture(), any(CompletionListener.class)); + + final Message message = messageCaptor.getValue(); + assertThat(message).isNotNull(); + assertThat(message.getJMSDestination()).isEqualTo(new JmsQueue(expectedAddress)); }}; } @Test public void testReceiveThingEventAndExpectForwardToJMSProducer() throws JMSException { + final String expectedAddress = "target"; + new TestKit(actorSystem) {{ final Props props = AmqpClientActor.propsForTests(connection, connectionStatus, getRef(), (ac, el) -> mockConnection); @@ -539,12 +587,21 @@ public void testReceiveThingEventAndExpectForwardToJMSProducer() throws JMSExcep final ThingModifiedEvent thingModifiedEvent = TestConstants.thingModified(singletonList("")); final OutboundSignal outboundSignal = OutboundSignalFactory.newOutboundSignal(thingModifiedEvent, - singleton(ConnectivityModelFactory.newTarget("target", Authorization.AUTHORIZATION_CONTEXT, null, + singleton(ConnectivityModelFactory.newTarget(expectedAddress, Authorization.AUTHORIZATION_CONTEXT, null, null, Topic.TWIN_EVENTS))); amqpClientActor.tell(outboundSignal, getRef()); - verify(mockProducer, timeout(2000)).send(same(mockTextMessage), any(CompletionListener.class)); + final ArgumentCaptor messageCaptor = ArgumentCaptor.forClass(JmsMessage.class); + verify(mockProducer, timeout(2000)).send(messageCaptor.capture(), any(CompletionListener.class)); + + final Message message = messageCaptor.getValue(); + assertThat(message).isNotNull(); + assertThat(message.getJMSDestination()).isEqualTo(new JmsQueue(expectedAddress)); + assertThat(message.getBody(String.class)).contains( + TestConstants.Things.NAMESPACE + "/" + TestConstants.Things.ID + "/" + + TopicPath.Group.THINGS.getName() + "/" + TopicPath.Channel.TWIN.getName() + "/" + + TopicPath.Criterion.EVENTS.getName() + "/" + TopicPath.Action.MODIFIED.getName()); }}; }