Skip to content

Commit

Permalink
fix AmqpPublisherActor
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch-si.com>
  • Loading branch information
dguggemos committed Mar 1, 2018
1 parent d898d61 commit 34da389
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 22 deletions.
Expand Up @@ -94,10 +94,10 @@ public Receive createReceive() {
log.debug("Response dropped, missing replyTo address.");
}
})
.match(ExternalMessage.class, ExternalMessage::isCommandResponse, event -> {
.match(ExternalMessage.class, ExternalMessage::isEvent, event -> {
final String correlationId = event.getHeaders().get(CORRELATION_ID.getKey());
LogUtil.enhanceLogWithCorrelationId(log, correlationId);
log.debug("Received command response {} ", event);
log.debug("Received thing event {} ", event);
amqpConnection.getEventTarget().ifPresent(target -> sendMessage(target, event));
})
.matchAny(m -> {
Expand Down Expand Up @@ -136,7 +136,7 @@ private Message toJmsMessage(final ExternalMessage externalMessage) throws JMSEx
@Nullable
private MessageProducer getProducer(final String target) {
return Optional.of(target)
.filter(String::isEmpty)
.filter(s -> !s.isEmpty())
.map(t -> producerMap.computeIfAbsent(target, this::createMessageProducer)).orElse(null);
}

Expand All @@ -146,7 +146,7 @@ private MessageProducer createMessageProducer(final String target) {
log.debug("Creating AMQP Producer for '{}'", target);
try {
return session.createProducer(destination);
} catch (JMSException e) {
} catch (final JMSException e) {
log.warning("Could not create producer for {}.", target);
return null;
}
Expand Down
Expand Up @@ -19,6 +19,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand All @@ -28,6 +29,7 @@
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
Expand All @@ -53,7 +55,6 @@
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -94,6 +95,8 @@ public class AmqpClientActorTest {
private MessageConsumer mockConsumer = Mockito.mock(MessageConsumer.class);
@Mock
private MessageProducer mockProducer = Mockito.mock(MessageProducer.class);
@Mock
private TextMessage mockTextMessage = Mockito.mock(TextMessage.class);

@BeforeClass
public static void setUp() {
Expand All @@ -110,7 +113,8 @@ public static void tearDown() {
public void init() throws JMSException {
when(mockConnection.createSession(false, Session.AUTO_ACKNOWLEDGE)).thenReturn(mockSession);
when(mockSession.createConsumer(any(JmsQueue.class))).thenReturn(mockConsumer);
when(mockSession.createProducer(any(JmsQueue.class))).thenReturn(mockProducer);
when(mockSession.createProducer(any(Destination.class))).thenReturn(mockProducer);
when(mockSession.createTextMessage(anyString())).thenReturn(mockTextMessage);
}

@Test
Expand Down Expand Up @@ -264,7 +268,7 @@ public void testCloseConnectionFails() throws JMSException {
}

@Test
public void testInitAfterTimeout() {
public void testInitializeClientActorAfterTimeout() {
new TestKit(actorSystem) {{
final String pubSubTargetPath = getRef().path().toStringWithoutAddress();
final Props props =
Expand All @@ -286,7 +290,6 @@ public void testInitAfterTimeout() {
}

@Test
@Ignore("currently failing")
public void testConsumeMessageAndExpectForwardToProxyActor() throws JMSException {
new TestKit(actorSystem) {{
final ActorRef mediator = DistributedPubSub.get(actorSystem).mediator();
Expand All @@ -313,12 +316,8 @@ public void testConsumeMessageAndExpectForwardToProxyActor() throws JMSException
}

@Test
@Ignore("currently failing")
public void testReceiveThingEventAndExpectForwardToAmqpClient() throws JMSException {
public void testReceiveThingEventAndExpectForwardToJMSProducer() throws JMSException {
new TestKit(actorSystem) {{
final ActorRef mediator = DistributedPubSub.get(actorSystem).mediator();
mediator.tell(new DistributedPubSubMediator.Put(getRef()), getRef());

final String pubSubTargetPath = getRef().path().toStringWithoutAddress();
final Props props =
AmqpClientActor.props(connectionId, getRef(), amqpConnection, pubSubTargetPath,
Expand All @@ -328,17 +327,10 @@ public void testReceiveThingEventAndExpectForwardToAmqpClient() throws JMSExcept
amqpClientActor.tell(CreateConnection.of(amqpConnection, DittoHeaders.empty()), getRef());
expectMsg(CONNECTED_SUCCESS);

amqpClientActor.tell(TestConstants.thingModified(singletonList("")), getRef());

//TODO find better way, this is ugly
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}

verify(mockProducer).send(any(Message.class));
amqpClientActor.tell(TestConstants.thingModified(singletonList("")), getRef());

verify(mockProducer, timeout(500)).send(mockTextMessage);
}};
}

Expand Down

0 comments on commit 34da389

Please sign in to comment.