Skip to content

Commit

Permalink
fixed ConnectionActorTest and added a new test cases
Browse files Browse the repository at this point in the history
* in AmqpClientActorTest which ensure that the target address with placeholders is replaced correctly

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch-si.com>
  • Loading branch information
thjaeckle committed Jan 30, 2019
1 parent e844805 commit bd2de62
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 29 deletions.
Expand Up @@ -495,9 +495,9 @@ public void testThingEventIsForwardedToFilteredTarget() {
final Connection connection = TestConstants.createConnection(connectionId, actorSystem,
TestConstants.Targets.TARGET_WITH_PLACEHOLDER);

// expect that address is still with placeholders (as replacement was moved to MessageMappingProcessorActor
final Target expectedTarget = ConnectivityModelFactory.newTarget(TestConstants.Targets.TARGET_WITH_PLACEHOLDER,
"target:" + TestConstants.Things.NAMESPACE + "/" +
TestConstants.Things.ID, null);
TestConstants.Targets.TARGET_WITH_PLACEHOLDER.getAddress(), null);

final CreateConnection createConnection = CreateConnection.of(connection, DittoHeaders.empty());
final Set<String> valid = Collections.singleton(TestConstants.Authorization.SUBJECT_ID);
Expand Down
Expand Up @@ -170,8 +170,8 @@ public static class Targets {

private static final HeaderMapping HEADER_MAPPING = null;

static final Target TARGET_WITH_PLACEHOLDER =
newTarget("target:{{ thing:namespace }}/{{thing:name}}", Authorization.AUTHORIZATION_CONTEXT, HEADER_MAPPING,
public static final Target TARGET_WITH_PLACEHOLDER =
newTarget("target:{{ thing:namespace }}/{{thing:name}}@{{ topic:channel }}", Authorization.AUTHORIZATION_CONTEXT, HEADER_MAPPING,
null, Topic.TWIN_EVENTS);
static final Target TWIN_TARGET =
newTarget("twinEventExchange/twinEventRoutingKey", Authorization.AUTHORIZATION_CONTEXT, HEADER_MAPPING,
Expand Down
Expand Up @@ -18,7 +18,6 @@
import static org.eclipse.ditto.services.connectivity.messaging.TestConstants.createRandomConnectionId;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.timeout;
Expand All @@ -33,6 +32,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.IntStream;
import java.util.stream.Stream;

Expand All @@ -45,11 +45,11 @@
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.JmsConnectionListener;
import org.apache.qpid.jms.JmsQueue;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.message.JmsTextMessage;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.apache.qpid.jms.provider.amqp.message.AmqpJmsTextMessageFacade;
Expand All @@ -66,6 +66,7 @@
import org.eclipse.ditto.model.connectivity.ResourceStatus;
import org.eclipse.ditto.model.connectivity.Source;
import org.eclipse.ditto.model.connectivity.Topic;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.services.connectivity.messaging.BaseClientState;
import org.eclipse.ditto.services.connectivity.messaging.TestConstants;
import org.eclipse.ditto.services.connectivity.messaging.TestConstants.Authorization;
Expand Down Expand Up @@ -111,7 +112,6 @@ public class AmqpClientActorTest extends WithMockServers {
private static final JMSException JMS_EXCEPTION = new JMSException("FAIL");
private static final URI DUMMY = URI.create("amqp://test:1234");


@SuppressWarnings("NullableProblems") private static ActorSystem actorSystem;

private static final String connectionId = TestConstants.createRandomConnectionId();
Expand All @@ -129,12 +129,7 @@ public class AmqpClientActorTest extends WithMockServers {
private final MessageConsumer mockConsumer = Mockito.mock(MessageConsumer.class);
@Mock
private final MessageProducer mockProducer = Mockito.mock(MessageProducer.class);
@Mock
private final TextMessage mockTextMessage = Mockito.mock(TextMessage.class);
@Mock
private final TextMessage mockReplyMessage = Mockito.mock(TextMessage.class);
@Mock
private final TextMessage mockErrorMessage = Mockito.mock(TextMessage.class);

private ArgumentCaptor<JmsConnectionListener> listenerArgumentCaptor;

@BeforeClass
Expand All @@ -157,16 +152,20 @@ public void init() throws JMSException {
doNothing().when(mockConnection).addConnectionListener(listenerArgumentCaptor.capture());

when(mockSession.createConsumer(any(JmsQueue.class))).thenReturn(mockConsumer);
when(mockSession.createProducer(any(Destination.class))).thenReturn(mockProducer);
when(mockSession.createTextMessage(anyString())).thenAnswer(invocation -> {
final String message = invocation.getArgument(0);
if (message.contains("ditto/thing/things/twin/errors")) {
return mockErrorMessage;
} else if (message.contains("\"status\":2")) {
return mockReplyMessage;
} else {
return mockTextMessage;
}
when(mockSession.createProducer(any(Destination.class))).thenAnswer((Answer<MessageProducer>) destinationInv -> {
final Destination destination = destinationInv.getArgument(0);

when(mockSession.createTextMessage(anyString())).thenAnswer((Answer<JmsMessage>) textMsgInv -> {
final String textMsg = textMsgInv.getArgument(0);
final AmqpJmsTextMessageFacade facade = new AmqpJmsTextMessageFacade();
facade.initialize(Mockito.mock(AmqpConnection.class));
final JmsTextMessage jmsTextMessage = new JmsTextMessage(facade);
jmsTextMessage.setText(textMsg);
jmsTextMessage.setJMSDestination(destination);
return jmsTextMessage;
});

return mockProducer;
});
}

Expand Down Expand Up @@ -489,20 +488,23 @@ private void testConsumeMessageAndExpectForwardToConciergeForwarder(final Connec
public void testConsumeMessageAndExpectForwardToConciergeForwarderAndReceiveResponse() throws JMSException {
testConsumeMessageAndExpectForwardToConciergeForwarderAndReceiveResponse(
(id, headers) -> ModifyThingResponse.modified(id, DittoHeaders.of(headers)),
mockReplyMessage);
"replies",
message -> message.contains("\"status\":2"));
}

@Test
public void testConsumeMessageAndExpectForwardToConciergeForwarderAndReceiveError() throws JMSException {
testConsumeMessageAndExpectForwardToConciergeForwarderAndReceiveResponse(
(id, headers) -> ThingErrorResponse.of(id,
ThingNotModifiableException.newBuilder(id).dittoHeaders(headers).build()),
mockErrorMessage);
"replies",
message -> message.contains("ditto/thing/things/twin/errors"));
}

private void testConsumeMessageAndExpectForwardToConciergeForwarderAndReceiveResponse(
final BiFunction<String, DittoHeaders, CommandResponse> responseSupplier,
final TextMessage expectedJmsResponse) throws JMSException {
final String expectedAddress,
final Predicate<String> messageTextPredicate) throws JMSException {
new TestKit(actorSystem) {{
final Props props =
AmqpClientActor.propsForTests(connection, connectionStatus, getRef(), (ac, el) -> mockConnection);
Expand All @@ -523,12 +525,58 @@ private void testConsumeMessageAndExpectForwardToConciergeForwarderAndReceiveRes

getLastSender().tell(responseSupplier.apply(command.getId(), command.getDittoHeaders()), getRef());

verify(mockProducer, timeout(2000)).send(same(expectedJmsResponse), any(CompletionListener.class));
final ArgumentCaptor<JmsMessage> messageCaptor = ArgumentCaptor.forClass(JmsMessage.class);
verify(mockProducer, timeout(2000)).send(messageCaptor.capture(), any(CompletionListener.class));

final Message message = messageCaptor.getValue();
assertThat(message).isNotNull();
assertThat(message.getJMSDestination()).isEqualTo(new JmsQueue(expectedAddress));
assertThat(messageTextPredicate).accepts(message.getBody(String.class));
}};
}

@Test
public void testTargetAddressPlaceholderReplacement() throws JMSException {
final Connection connection =
TestConstants.createConnection(connectionId, actorSystem,
TestConstants.Targets.TARGET_WITH_PLACEHOLDER);

// target Placeholder: target:{{ thing:namespace }}/{{thing:name}}@{{ topic:channel }}
final String expectedAddress =
"target:" + TestConstants.Things.NAMESPACE + "/" + TestConstants.Things.ID + "@" + TopicPath.Channel.TWIN.getName();

new TestKit(actorSystem) {{
final Props props =
AmqpClientActor.propsForTests(connection, connectionStatus, getRef(), (ac, el) -> mockConnection);
final ActorRef amqpClientActor = actorSystem.actorOf(props);

amqpClientActor.tell(OpenConnection.of(connectionId, DittoHeaders.empty()), getRef());
expectMsg(CONNECTED_SUCCESS);

final ThingModifiedEvent thingModifiedEvent = TestConstants.thingModified(singletonList(""));

final OutboundSignal outboundSignal = OutboundSignalFactory.newOutboundSignal(thingModifiedEvent,
singleton(ConnectivityModelFactory.newTarget(
TestConstants.Targets.TARGET_WITH_PLACEHOLDER.getAddress(),
Authorization.AUTHORIZATION_CONTEXT,
null, null, Topic.TWIN_EVENTS))
);

amqpClientActor.tell(outboundSignal, getRef());

final ArgumentCaptor<JmsMessage> messageCaptor = ArgumentCaptor.forClass(JmsMessage.class);
verify(mockProducer, timeout(2000)).send(messageCaptor.capture(), any(CompletionListener.class));

final Message message = messageCaptor.getValue();
assertThat(message).isNotNull();
assertThat(message.getJMSDestination()).isEqualTo(new JmsQueue(expectedAddress));
}};
}

@Test
public void testReceiveThingEventAndExpectForwardToJMSProducer() throws JMSException {
final String expectedAddress = "target";

new TestKit(actorSystem) {{
final Props props =
AmqpClientActor.propsForTests(connection, connectionStatus, getRef(), (ac, el) -> mockConnection);
Expand All @@ -539,12 +587,21 @@ public void testReceiveThingEventAndExpectForwardToJMSProducer() throws JMSExcep

final ThingModifiedEvent thingModifiedEvent = TestConstants.thingModified(singletonList(""));
final OutboundSignal outboundSignal = OutboundSignalFactory.newOutboundSignal(thingModifiedEvent,
singleton(ConnectivityModelFactory.newTarget("target", Authorization.AUTHORIZATION_CONTEXT, null,
singleton(ConnectivityModelFactory.newTarget(expectedAddress, Authorization.AUTHORIZATION_CONTEXT, null,
null, Topic.TWIN_EVENTS)));

amqpClientActor.tell(outboundSignal, getRef());

verify(mockProducer, timeout(2000)).send(same(mockTextMessage), any(CompletionListener.class));
final ArgumentCaptor<JmsMessage> messageCaptor = ArgumentCaptor.forClass(JmsMessage.class);
verify(mockProducer, timeout(2000)).send(messageCaptor.capture(), any(CompletionListener.class));

final Message message = messageCaptor.getValue();
assertThat(message).isNotNull();
assertThat(message.getJMSDestination()).isEqualTo(new JmsQueue(expectedAddress));
assertThat(message.getBody(String.class)).contains(
TestConstants.Things.NAMESPACE + "/" + TestConstants.Things.ID + "/" +
TopicPath.Group.THINGS.getName() + "/" + TopicPath.Channel.TWIN.getName() + "/" +
TopicPath.Criterion.EVENTS.getName() + "/" + TopicPath.Action.MODIFIED.getName());
}};
}

Expand Down

0 comments on commit bd2de62

Please sign in to comment.