Skip to content
Permalink
Browse files

add some unit tests

Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch-si.com>
  • Loading branch information
dguggemos committed Jul 1, 2019
1 parent febeca2 commit 2c36d22b3c98a69c6ada8809dfde1d1236af34fe
@@ -168,9 +168,16 @@ public void postStop() {
@Override
protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inConnectedState() {
return super.inConnectedState()
.event(ConnectionFailure.class, this::handleConnectionFailureWhenConnected)
.event(JmsSessionRecovered.class, this::handleSessionRecovered);
}

private State<BaseClientState, BaseClientData> handleConnectionFailureWhenConnected(final ConnectionFailure failure, final BaseClientData data) {
return goTo(BaseClientState.UNKNOWN)
.using(data.setConnectionStatus(ConnectivityStatus.FAILED)
.setConnectionStatusDetails(failure.getFailureDescription()));
}

@Override
protected FSMStateFunctionBuilder<BaseClientState, BaseClientData> inAnyState() {
return super.inAnyState()
@@ -493,9 +500,9 @@ private void recoverSession(@Nullable final Session session) {
private FSM.State<BaseClientState, BaseClientData> handleSessionRecovered(
final JmsSessionRecovered sessionRecovered,
final BaseClientData currentData) {
jmsSession = sessionRecovered.session;
jmsSession = sessionRecovered.getSession();
consumers.clear();
consumers.addAll(sessionRecovered.consumerList);
consumers.addAll(sessionRecovered.getConsumerList());
// note: start order is important (publisher -> mapping -> consumer actor)
startAmqpPublisherActor();
startMessageMappingProcessor(connection().getMappingContext().orElse(null));
@@ -598,6 +605,14 @@ public void onException(final JMSException exception) {
this.session = session;
this.consumerList = consumerList;
}

Session getSession() {
return session;
}

List<ConsumerData> getConsumerList() {
return consumerList;
}
}

/**
@@ -138,7 +138,7 @@ private void handleRecoverSession(final AmqpClientActor.JmsRecoverSession recove
if (recoverSession.getConnection().isPresent()) {
final JmsConnection jmsConnection = recoverSession.getConnection().map(c -> (JmsConnection)c).get();
try {
final JmsSession session = createSession(jmsConnection);
final Session session = createSession(jmsConnection);
final List<ConsumerData> consumers = createConsumers(session);
final AmqpClientActor.JmsSessionRecovered r = new AmqpClientActor.JmsSessionRecovered(origin, session, consumers);
sender.tell(r, self);
@@ -151,7 +151,9 @@ private void handleRecoverSession(final AmqpClientActor.JmsRecoverSession recove
log.error("Unexpected error: {}", e.getMessage());
}
} else {
// TODO do what??
log.info("Recovering session failed, no connection available.");
sender.tell(new ImmutableConnectionFailure(origin,null,
"Session recovery failed, no connection available."), self);
}
}

@@ -203,12 +205,12 @@ private void startConnection(final JmsConnection jmsConnection) {
});
}

private JmsSession createSession(final JmsConnection jmsConnection) {
private Session createSession(final JmsConnection jmsConnection) {
return safelyExecuteJmsOperation(jmsConnection, "create session",
() -> ((JmsSession) jmsConnection.createSession(Session.CLIENT_ACKNOWLEDGE)));
() -> (jmsConnection.createSession(Session.CLIENT_ACKNOWLEDGE)));
}

private <T> T safelyExecuteJmsOperation(final JmsConnection jmsConnection,
private <T> T safelyExecuteJmsOperation(@Nullable final JmsConnection jmsConnection,
final String task, final ThrowingSupplier<T> jmsOperation) {
try {
return jmsOperation.get();
@@ -291,18 +293,20 @@ private ConnectionFailedException buildConnectionFailedException(final Map<Strin
.build();
}

private void terminateConnection(final javax.jms.Connection jmsConnection) {
try {
jmsConnection.stop();
} catch (final JMSException e) {
log.debug("Stopping connection <{}> failed, probably it was already stopped: {}",
this.connection.getId(), e.getMessage());
}
try {
jmsConnection.close();
} catch (final JMSException e) {
log.debug("Closing connection <{}> failed, probably it was already closed: {}",
this.connection.getId(), e.getMessage());
private void terminateConnection(@Nullable final javax.jms.Connection jmsConnection) {
if (jmsConnection != null) {
try {
jmsConnection.stop();
} catch (final JMSException e) {
log.debug("Stopping connection <{}> failed, probably it was already stopped: {}",
this.connection.getId(), e.getMessage());
}
try {
jmsConnection.close();
} catch (final JMSException e) {
log.debug("Closing connection <{}> failed, probably it was already closed: {}",
this.connection.getId(), e.getMessage());
}
}
}

@@ -62,6 +62,7 @@
import org.eclipse.ditto.model.messages.Message;
import org.eclipse.ditto.model.messages.MessageDirection;
import org.eclipse.ditto.model.messages.MessageHeaders;
import org.eclipse.ditto.model.things.Attributes;
import org.eclipse.ditto.model.things.Thing;
import org.eclipse.ditto.protocoladapter.Adaptable;
import org.eclipse.ditto.protocoladapter.DittoProtocolAdapter;
@@ -500,8 +501,12 @@ public Receive createReceive() {
}

public static ThingModifiedEvent thingModified(final Collection<String> readSubjects) {
return thingModified(readSubjects, Attributes.newBuilder().build());
}

public static ThingModifiedEvent thingModified(final Collection<String> readSubjects, final Attributes attributes) {
final DittoHeaders dittoHeaders = DittoHeaders.newBuilder().readSubjects(readSubjects).build();
return ThingModified.of(Things.THING, 1, dittoHeaders);
return ThingModified.of(Things.THING.toBuilder().setAttributes(attributes).build(), 1, dittoHeaders);
}

public static MessageCommand sendThingMessage(final Collection<String> readSubjects) {
@@ -23,12 +23,15 @@
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;

import java.net.URI;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -69,7 +72,9 @@
import org.eclipse.ditto.model.connectivity.ConnectivityStatus;
import org.eclipse.ditto.model.connectivity.ResourceStatus;
import org.eclipse.ditto.model.connectivity.Source;
import org.eclipse.ditto.model.connectivity.Target;
import org.eclipse.ditto.model.connectivity.Topic;
import org.eclipse.ditto.model.things.Attributes;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.services.connectivity.messaging.BaseClientState;
import org.eclipse.ditto.services.connectivity.messaging.TestConstants;
@@ -97,9 +102,12 @@
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
@@ -111,6 +119,7 @@
@RunWith(MockitoJUnitRunner.class)
public final class AmqpClientActorTest extends WithMockServers {

private static final Logger LOGGER = LoggerFactory.getLogger(AmqpClientActorTest.class);
private static final Status.Success CONNECTED_SUCCESS = new Status.Success(BaseClientState.CONNECTED);
private static final Status.Success DISCONNECTED_SUCCESS = new Status.Success(BaseClientState.DISCONNECTED);
private static final JMSException JMS_EXCEPTION = new JMSException("FAIL");
@@ -121,7 +130,6 @@

@SuppressWarnings("NullableProblems") private static ActorSystem actorSystem;
private static Connection connection;

private final ConnectivityStatus connectionStatus = ConnectivityStatus.OPEN;

@Mock
@@ -131,8 +139,7 @@
private final Session mockSession = Mockito.mock(Session.class);
@Mock
private final JmsMessageConsumer mockConsumer = Mockito.mock(JmsMessageConsumer.class);
@Mock
private final MessageProducer mockProducer = Mockito.mock(MessageProducer.class);
private MessageProducer mockProducer;

private ArgumentCaptor<JmsConnectionListener> listenerArgumentCaptor;

@@ -150,27 +157,35 @@ public static void tearDown() {

@Before
public void init() throws JMSException {
mockProducer = Mockito.mock(MessageProducer.class);

when(mockConnection.createSession(Session.CLIENT_ACKNOWLEDGE)).thenReturn(mockSession);

listenerArgumentCaptor = ArgumentCaptor.forClass(JmsConnectionListener.class);
doNothing().when(mockConnection).addConnectionListener(listenerArgumentCaptor.capture());

when(mockSession.createConsumer(any(JmsQueue.class))).thenReturn(mockConsumer);
when(mockSession.createProducer(any(Destination.class))).thenAnswer((Answer<MessageProducer>) destinationInv -> {
final Destination destination = destinationInv.getArgument(0);
prepareCreateProducer(mockSession, mockProducer);
}

when(mockSession.createTextMessage(anyString())).thenAnswer((Answer<JmsMessage>) textMsgInv -> {
final String textMsg = textMsgInv.getArgument(0);
final AmqpJmsTextMessageFacade facade = new AmqpJmsTextMessageFacade();
facade.initialize(Mockito.mock(AmqpConnection.class));
final JmsTextMessage jmsTextMessage = new JmsTextMessage(facade);
jmsTextMessage.setText(textMsg);
jmsTextMessage.setJMSDestination(destination);
return jmsTextMessage;
});
private void prepareCreateProducer(final Session mockSession, final MessageProducer mockProducer)
throws JMSException {
when(mockSession.createProducer(any(Destination.class))).thenAnswer(
(Answer<MessageProducer>) destinationInv -> {
final Destination destination = destinationInv.getArgument(0);

when(mockSession.createTextMessage(anyString())).thenAnswer((Answer<JmsMessage>) textMsgInv -> {
final String textMsg = textMsgInv.getArgument(0);
final AmqpJmsTextMessageFacade facade = new AmqpJmsTextMessageFacade();
facade.initialize(Mockito.mock(AmqpConnection.class));
final JmsTextMessage jmsTextMessage = new JmsTextMessage(facade);
jmsTextMessage.setText(textMsg);
jmsTextMessage.setJMSDestination(destination);
return jmsTextMessage;
});

return mockProducer;
});
return mockProducer;
});
}

@Test
@@ -205,7 +220,7 @@ public void testExceptionDuringJMSConnectionCreation() {
new TestKit(actorSystem) {{
final Props props =
AmqpClientActor.propsForTests(connection, connectionStatus, getRef(),
(ac, el) -> {
(theConnection, exceptionListener) -> {
throw JMS_EXCEPTION;
});
final ActorRef connectionActor = actorSystem.actorOf(props);
@@ -425,6 +440,58 @@ public void testCloseConnectionFails() throws JMSException {
}};
}


@Test
public void testCloseSessionWhenConnectedExpectRecreateSession() throws JMSException, InterruptedException {
final String expectedAddress = "target";
final Target target = ConnectivityModelFactory.newTarget(expectedAddress, Authorization.AUTHORIZATION_CONTEXT,
null, null, Topic.TWIN_EVENTS);
new TestKit(actorSystem) {{
final Session newSession = Mockito.mock(Session.class, withSettings().name("recoveredSession"));
final MessageProducer recoveredProducer = Mockito.mock(MessageProducer.class);
final MessageConsumer recoveredConsumer = Mockito.mock(MessageConsumer.class);

when(mockConnection.createSession(Session.CLIENT_ACKNOWLEDGE))
.thenReturn(mockSession)
.thenReturn(newSession);

when(newSession.createConsumer(any(JmsQueue.class))).thenReturn(recoveredConsumer);
prepareCreateProducer(newSession, recoveredProducer);

final Props props =
AmqpClientActor.propsForTests(connection, connectionStatus, getRef(), (ac, el) -> mockConnection);
final ActorRef amqpClientActor = actorSystem.actorOf(props);

// connect
amqpClientActor.tell(OpenConnection.of(CONNECTION_ID, DittoHeaders.empty()), getRef());
expectMsg(CONNECTED_SUCCESS);

// capture connection listener to close session later
final ArgumentCaptor<JmsConnectionListener> captor = ArgumentCaptor.forClass(JmsConnectionListener.class);
verify(mockConnection).addConnectionListener(captor.capture());
final JmsConnectionListener jmsConnectionListener = captor.getValue();

// verify everything is setup correctly by publishing an event
sendThingEventAndExpectPublish(amqpClientActor, target, mockProducer);
// verify message is consumed and forwarded to concierge
consumeMockMessage(mockConsumer);
expectMsgClass(Command.class);

// now close session
jmsConnectionListener.onSessionClosed(mockSession, new JMSException("Session closed for unknown reason"));
verify(mockConnection, timeout(2000).times(2)).createSession(Session.CLIENT_ACKNOWLEDGE);

// close is called on old session
verify(mockSession).close();

// verify publishing an event works with new session/producer
sendThingEventAndExpectPublish(amqpClientActor, target, recoveredProducer);

consumeMockMessage(recoveredConsumer);
expectMsgClass(Command.class);
}};
}

@Test
public void testConsumeMessageAndExpectForwardToConciergeForwarder() throws JMSException {
testConsumeMessageAndExpectForwardToConciergeForwarder(connection, 1,
@@ -618,34 +685,18 @@ public void testTargetAddressPlaceholderReplacement() throws JMSException {
@Test
public void testReceiveThingEventAndExpectForwardToJMSProducer() throws JMSException {
final String expectedAddress = "target";
final Target target = ConnectivityModelFactory.newTarget(expectedAddress, Authorization.AUTHORIZATION_CONTEXT,
null, null, Topic.TWIN_EVENTS);

new TestKit(actorSystem) {{
final Props props =
AmqpClientActor.propsForTests(connection, connectionStatus, getRef(),
(ac, el) -> mockConnection);
final Props props = AmqpClientActor.propsForTests(connection, connectionStatus, getRef(),
(ac, el) -> mockConnection);
final ActorRef amqpClientActor = actorSystem.actorOf(props);

amqpClientActor.tell(OpenConnection.of(CONNECTION_ID, DittoHeaders.empty()), getRef());
expectMsg(CONNECTED_SUCCESS);

final ThingModifiedEvent thingModifiedEvent = TestConstants.thingModified(singletonList(""));
final OutboundSignal outboundSignal = OutboundSignalFactory.newOutboundSignal(thingModifiedEvent,
singletonList(
ConnectivityModelFactory.newTarget(expectedAddress, Authorization.AUTHORIZATION_CONTEXT,
null, null, Topic.TWIN_EVENTS)));

amqpClientActor.tell(outboundSignal, getRef());

final ArgumentCaptor<JmsMessage> messageCaptor = ArgumentCaptor.forClass(JmsMessage.class);
verify(mockProducer, timeout(2000)).send(messageCaptor.capture(), any(CompletionListener.class));

final Message message = messageCaptor.getValue();
assertThat(message).isNotNull();
assertThat(message.getJMSDestination()).isEqualTo(new JmsQueue(expectedAddress));
assertThat(message.getBody(String.class)).contains(
TestConstants.Things.NAMESPACE + "/" + TestConstants.Things.ID + "/" +
TopicPath.Group.THINGS.getName() + "/" + TopicPath.Channel.TWIN.getName() + "/" +
TopicPath.Criterion.EVENTS.getName() + "/" + TopicPath.Action.MODIFIED.getName());
sendThingEventAndExpectPublish(amqpClientActor, target, AmqpClientActorTest.this.mockProducer);
}};
}

@@ -698,8 +749,7 @@ private boolean isExpectedMessage(final Object o, final String... expectedSource

@Test
public void testRetrieveConnectionStatus() throws JMSException {
new TestKit(actorSystem)
{{
new TestKit(actorSystem) {{
final String sourceWithSpecialCharacters =
IntStream.range(32, 255).mapToObj(i -> (char) i)
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
@@ -781,4 +831,30 @@ private static Message mockMessage() throws JMSException {
return result;
}

private void consumeMockMessage(final MessageConsumer mockConsumer) throws JMSException {
final ArgumentCaptor<MessageListener> listener = ArgumentCaptor.forClass(MessageListener.class);
verify(mockConsumer, timeout(1000).atLeast(1)).setMessageListener(listener.capture());
listener.getValue().onMessage(mockMessage());
}

private void sendThingEventAndExpectPublish(final ActorRef amqpClientActor, final Target target,
final MessageProducer mockProducer) throws JMSException {
final String uuid = UUID.randomUUID().toString();
final ThingModifiedEvent thingModifiedEvent = TestConstants.thingModified(singletonList(""),
Attributes.newBuilder().set("uuid", uuid).build());
final OutboundSignal outboundSignal =
OutboundSignalFactory.newOutboundSignal(thingModifiedEvent, singletonList(target));
amqpClientActor.tell(outboundSignal, ActorRef.noSender());
final ArgumentCaptor<JmsMessage> messageCaptor = ArgumentCaptor.forClass(JmsMessage.class);
verify(mockProducer, timeout(2000).times(1)).send(messageCaptor.capture(), any(CompletionListener.class));

final Message message = messageCaptor.getValue();
assertThat(message).isNotNull();
assertThat(message.getJMSDestination()).isEqualTo(new JmsQueue(target.getAddress()));
assertThat(message.getBody(String.class)).contains(uuid);
assertThat(message.getBody(String.class)).contains(
TestConstants.Things.NAMESPACE + "/" + TestConstants.Things.ID + "/" +
TopicPath.Group.THINGS.getName() + "/" + TopicPath.Channel.TWIN.getName() + "/" +
TopicPath.Criterion.EVENTS.getName() + "/" + TopicPath.Action.MODIFIED.getName());
}
}

0 comments on commit 2c36d22

Please sign in to comment.
You can’t perform that action at this time.