diff --git a/test-utils-core/src/main/java/uk/gov/justice/services/test/utils/core/messaging/DeadLetterQueueBrowser.java b/test-utils-core/src/main/java/uk/gov/justice/services/test/utils/core/messaging/DeadLetterQueueBrowser.java index 74e2d77..c856840 100644 --- a/test-utils-core/src/main/java/uk/gov/justice/services/test/utils/core/messaging/DeadLetterQueueBrowser.java +++ b/test-utils-core/src/main/java/uk/gov/justice/services/test/utils/core/messaging/DeadLetterQueueBrowser.java @@ -1,5 +1,6 @@ package uk.gov.justice.services.test.utils.core.messaging; +import static java.util.stream.Collectors.toCollection; import static uk.gov.justice.services.test.utils.core.messaging.QueueUriProvider.artemisQueueUri; import java.io.StringReader; @@ -18,65 +19,66 @@ import javax.json.JsonObject; import javax.json.JsonReader; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; import org.apache.activemq.artemis.jms.client.ActiveMQQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.annotations.VisibleForTesting; - /** - * Utility class that allows to browse and clean - * messages in dead letter queue - * - * Usage: DeadLetterQueueBrowser dlqBrowser = new DeadLetterQueueBrowser(); - * dlqBrowser.browse() will return a list of {@link String} in the dlq - * dlqBrowser.removeMessages() will clean dlq + * Utility class that allows to browse and clean messages in dead letter queue + *

+ * Usage: DeadLetterQueueBrowser dlqBrowser = new DeadLetterQueueBrowser(); dlqBrowser.browse() will + * return a list of {@link String} in the dlq dlqBrowser.removeMessages() will clean dlq * dlqBrowser.close() will release resources - * - * Note:It has been observed there is sometimes a delay by the time - * the message lands in dlq. Setting a delay of few milliseconds - * generally resolves this. + *

+ * Note:It has been observed there is sometimes a delay by the time the message lands in dlq. + * Setting a delay of few milliseconds generally resolves this. * * @author gopal - * */ public class DeadLetterQueueBrowser implements AutoCloseable { private static final Logger LOGGER = LoggerFactory.getLogger(DeadLetterQueueBrowser.class); - private static final String DLQ_QUEUE_URI = artemisQueueUri(); - private static final String dlqName = "DLQ"; + private static final List DLQ_QUEUE_URIS = artemisQueueUri(); + private static final String DLQ_NAME = "DLQ"; - private Session session; - private javax.jms.Queue dlqQueue; - private final JmsSessionFactory jmsSessionFactory; + private List sessions; + private List jmsSessionFactories; + private Queue dlqQueue; private final ConsumerClient consumerClient; public DeadLetterQueueBrowser() { - jmsSessionFactory = new JmsSessionFactory(); consumerClient = new ConsumerClient(); + jmsSessionFactories = Lists.newArrayList(); + sessions = Lists.newArrayList(); initialise(); } @VisibleForTesting - DeadLetterQueueBrowser(final Queue dlqQueue, final Session session, - final JmsSessionFactory jmsSessionFactory, final ConsumerClient consumerClient) { + DeadLetterQueueBrowser(final Queue dlqQueue, final List sessions, + final List jmsSessionFactories, final ConsumerClient consumerClient) { super(); - this.session = session; + this.sessions = sessions; + this.jmsSessionFactories = jmsSessionFactories; this.dlqQueue = dlqQueue; - this.jmsSessionFactory = jmsSessionFactory; this.consumerClient = consumerClient; } private void initialise() { try { - LOGGER.info("Artemis URI: {}", DLQ_QUEUE_URI); - session = jmsSessionFactory.session(DLQ_QUEUE_URI); - dlqQueue = new ActiveMQQueue(dlqName); + DLQ_QUEUE_URIS.forEach(u -> { + LOGGER.info("Setting up session for uri: " + u); + final JmsSessionFactory jmsSessionFactory = new JmsSessionFactory(); + sessions.add(jmsSessionFactory.session(u)); + jmsSessionFactories.add(jmsSessionFactory); + }); + dlqQueue = new ActiveMQQueue(DLQ_NAME); } catch (Exception e) { close(); - final String message = "Failed to start dlq message consumer for " + "queue: '" + dlqName + "', " - + "queueUri: '" + DLQ_QUEUE_URI + " "; + final String message = "Failed to start dlq message consumer for " + "queue: '" + DLQ_NAME + "', " + + "queueUris: '" + DLQ_QUEUE_URIS + " "; LOGGER.error("Fatal error initialising Artemis {} ", message); throw new MessageConsumerException(message, e); } @@ -84,50 +86,56 @@ private void initialise() { /** * allows browsing messages in dlq + * * @return list of {@link JsonObject} */ public List browseAsJson() { - return browse().stream().map(s->convert(s)).collect(Collectors.toCollection(ArrayList::new)); + return browse().stream().map(this::convert).collect(toCollection(ArrayList::new)); } /** * allows browsing messages in dlq + * * @return list of {@link String} */ public List browse() { - try (QueueBrowser dlqBrowser = session.createBrowser(dlqQueue);) { - final List messages = new ArrayList<>(); - final Enumeration enumeration = dlqBrowser.getEnumeration(); - while (enumeration.hasMoreElements()) { - String message = ((TextMessage) enumeration.nextElement()).getText(); - messages.add(message); + final List messages = new ArrayList<>(); + for (Session session : sessions) { + try (QueueBrowser dlqBrowser = session.createBrowser(dlqQueue);) { + final Enumeration enumeration = dlqBrowser.getEnumeration(); + + while (enumeration.hasMoreElements()) { + final String message = ((TextMessage) enumeration.nextElement()).getText(); + messages.add(message); + } + + + } catch (JMSException e) { + final String message = "Fatal error getting messages from DLQ"; + LOGGER.error(message); + throw new MessageConsumerException(message, e); } - return messages; - } catch (JMSException e) { - String message = "Fatal error getting messges from DLQ"; - LOGGER.error(message); - throw new MessageConsumerException(message, e); } + LOGGER.info("Total number of messages across {} brokers is {}", sessions.size(), messages.size()); + return messages; } /** * removes messages from dlq */ public void removeMessages() { - try (MessageConsumer messageConsumer = session.createConsumer(dlqQueue)) { - cleanQueue(messageConsumer); - } catch (JMSException e) { - String message = "Fatal error cleaning messges from DLQ"; - LOGGER.error(message); - throw new MessageConsumerException(message, e); + for (Session session : sessions) { + try (MessageConsumer messageConsumer = session.createConsumer(dlqQueue)) { + consumerClient.cleanQueue(messageConsumer); + } catch (JMSException e) { + final String message = "Fatal error cleaning messges from DLQ"; + LOGGER.error(message); + throw new MessageConsumerException(message, e); + } } } - private void cleanQueue(MessageConsumer messageConsumer) { - consumerClient.cleanQueue(messageConsumer); - } - private JsonObject convert(final String source) { try (final JsonReader reader = Json.createReader(new StringReader(source))) { return reader.readObject(); @@ -138,7 +146,8 @@ private JsonObject convert(final String source) { * clean up resources */ public void close() { - jmsSessionFactory.close(); + for (JmsSessionFactory jmsSessionFactory : jmsSessionFactories) { + jmsSessionFactory.close(); + } } - } diff --git a/test-utils-core/src/main/java/uk/gov/justice/services/test/utils/core/messaging/JmsSessionFactory.java b/test-utils-core/src/main/java/uk/gov/justice/services/test/utils/core/messaging/JmsSessionFactory.java index dcd91be..df238a3 100644 --- a/test-utils-core/src/main/java/uk/gov/justice/services/test/utils/core/messaging/JmsSessionFactory.java +++ b/test-utils-core/src/main/java/uk/gov/justice/services/test/utils/core/messaging/JmsSessionFactory.java @@ -36,7 +36,9 @@ private void doClose(final AutoCloseable closeable) { try { closeable.close(); } catch (final Exception ignored) { + // do nothing } + } } diff --git a/test-utils-core/src/main/java/uk/gov/justice/services/test/utils/core/messaging/QueueUriProvider.java b/test-utils-core/src/main/java/uk/gov/justice/services/test/utils/core/messaging/QueueUriProvider.java index 5e40228..526990b 100644 --- a/test-utils-core/src/main/java/uk/gov/justice/services/test/utils/core/messaging/QueueUriProvider.java +++ b/test-utils-core/src/main/java/uk/gov/justice/services/test/utils/core/messaging/QueueUriProvider.java @@ -1,33 +1,41 @@ package uk.gov.justice.services.test.utils.core.messaging; +import static com.google.common.base.Splitter.on; +import static com.google.common.collect.Lists.newArrayList; import static java.lang.String.format; import static uk.gov.justice.services.test.utils.common.host.TestHostProvider.getArtemisHost; import static uk.gov.justice.services.test.utils.common.host.TestHostProvider.getHost; +import java.util.List; + import org.apache.commons.lang3.StringUtils; public class QueueUriProvider { private static final String BASE_URI_PATTERN = "tcp://%s:61616"; + + /** + * Takes a comma separated list of broker hosts + */ private static final String ARTEMIS_URI = "ARTEMIS_URI"; public String getQueueUri() { return format(BASE_URI_PATTERN, getHost()); } - public String getArtemisQueueUri() { + public List getArtemisQueueUri() { final String artemisUri = System.getProperty(ARTEMIS_URI); if (StringUtils.isNotBlank(artemisUri)) { - return artemisUri; + return on(",").splitToList(artemisUri); } - return format(BASE_URI_PATTERN, getArtemisHost()); + return newArrayList(format(BASE_URI_PATTERN, getArtemisHost())); } public static String queueUri() { return new QueueUriProvider().getQueueUri(); } - public static String artemisQueueUri(){ + public static List artemisQueueUri() { return new QueueUriProvider().getArtemisQueueUri(); } } diff --git a/test-utils-core/src/test/java/uk/gov/justice/services/test/utils/core/messaging/DeadLetterQueueBrowserTest.java b/test-utils-core/src/test/java/uk/gov/justice/services/test/utils/core/messaging/DeadLetterQueueBrowserTest.java index 1cc811c..c55f885 100644 --- a/test-utils-core/src/test/java/uk/gov/justice/services/test/utils/core/messaging/DeadLetterQueueBrowserTest.java +++ b/test-utils-core/src/test/java/uk/gov/justice/services/test/utils/core/messaging/DeadLetterQueueBrowserTest.java @@ -1,32 +1,34 @@ package uk.gov.justice.services.test.utils.core.messaging; -import static org.hamcrest.CoreMatchers.is; -import static org.hamcrest.collection.IsIterableContainingInOrder.contains; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.Arrays; -import java.util.List; -import java.util.Vector; +import org.hamcrest.collection.IsMapContaining; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; import javax.jms.JMSException; import javax.jms.MessageConsumer; +import javax.jms.Queue; import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; import javax.json.JsonObject; +import java.util.Arrays; +import java.util.List; +import java.util.Vector; -import org.hamcrest.collection.IsMapContaining; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.mockito.InjectMocks; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.runners.MockitoJUnitRunner; +import static com.google.common.collect.Lists.newArrayList; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.collection.IsIterableContainingInOrder.contains; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) public class DeadLetterQueueBrowserTest { @@ -35,7 +37,7 @@ public class DeadLetterQueueBrowserTest { private Session session; @Mock - private javax.jms.Queue dlqQueue; + private Queue dlqQueue; @Mock private MessageConsumer dlqMessageConsumer; @@ -49,16 +51,20 @@ public class DeadLetterQueueBrowserTest { @Mock private JmsSessionFactory jmsSessionFactory; - @InjectMocks private DeadLetterQueueBrowser deadLetterQueueBrowser; @Rule public ExpectedException thrown = ExpectedException.none(); + @Before + public void setup() { + deadLetterQueueBrowser = new DeadLetterQueueBrowser(dlqQueue, newArrayList(session), newArrayList(jmsSessionFactory), consumerClient); + } + @Test public void shouldRemoveMessages() throws JMSException { - when(session.createConsumer(Mockito.any(javax.jms.Queue.class))).thenReturn(dlqMessageConsumer); + when(session.createConsumer(any(Queue.class))).thenReturn(dlqMessageConsumer); deadLetterQueueBrowser.removeMessages(); @@ -66,12 +72,12 @@ public void shouldRemoveMessages() throws JMSException { } @Test - public void shouldbeAbleToBrowseAsJson() throws JMSException { - TextMessage textMessage_1 = mock(TextMessage.class); - when(textMessage_1.getText()).thenReturn("{\"urn1\": \"urn1\"}", "{\"urn2\": \"urn2\"}"); - Vector textmessages = new Vector<>(Arrays.asList(textMessage_1, textMessage_1)); + public void shouldBeAbleToBrowseAsJson() throws JMSException { + TextMessage textMessage = mock(TextMessage.class); + when(textMessage.getText()).thenReturn("{\"urn1\": \"urn1\"}", "{\"urn2\": \"urn2\"}"); + Vector textMessages = new Vector<>(Arrays.asList(textMessage, textMessage)); when(session.createBrowser(dlqQueue)).thenReturn(dlqBrowser); - when(dlqBrowser.getEnumeration()).thenReturn(textmessages.elements()); + when(dlqBrowser.getEnumeration()).thenReturn(textMessages.elements()); List result = deadLetterQueueBrowser.browseAsJson(); @@ -80,12 +86,12 @@ public void shouldbeAbleToBrowseAsJson() throws JMSException { } @Test - public void shouldbeAbleToBrowse() throws JMSException { - TextMessage textMessage_1 = mock(TextMessage.class); - when(textMessage_1.getText()).thenReturn("abc", "def"); - Vector textmessages = new Vector<>(Arrays.asList(textMessage_1, textMessage_1)); + public void shouldBeAbleToBrowse() throws JMSException { + TextMessage textMessage = mock(TextMessage.class); + when(textMessage.getText()).thenReturn("abc", "def"); + Vector textMessages = new Vector<>(Arrays.asList(textMessage, textMessage)); when(session.createBrowser(dlqQueue)).thenReturn(dlqBrowser); - when(dlqBrowser.getEnumeration()).thenReturn(textmessages.elements()); + when(dlqBrowser.getEnumeration()).thenReturn(textMessages.elements()); List result = deadLetterQueueBrowser.browse(); @@ -95,9 +101,9 @@ public void shouldbeAbleToBrowse() throws JMSException { @Test public void shouldReturnEmptyListWhenNoElementsInDlq() throws JMSException { - Vector textmessages = new Vector<>(); + Vector textMessages = new Vector<>(); when(session.createBrowser(dlqQueue)).thenReturn(dlqBrowser); - when(dlqBrowser.getEnumeration()).thenReturn(textmessages.elements()); + when(dlqBrowser.getEnumeration()).thenReturn(textMessages.elements()); List result = deadLetterQueueBrowser.browse(); @@ -107,7 +113,7 @@ public void shouldReturnEmptyListWhenNoElementsInDlq() throws JMSException { @Test public void shouldThrowExceptionWhenBrowsing() throws JMSException { thrown.expect(MessageConsumerException.class); - thrown.expectMessage("Fatal error getting messges from DLQ"); + thrown.expectMessage("Fatal error getting messages from DLQ"); when(session.createBrowser(dlqQueue)).thenReturn(dlqBrowser); when(dlqBrowser.getEnumeration()).thenThrow(JMSException.class); @@ -118,8 +124,8 @@ public void shouldThrowExceptionWhenBrowsing() throws JMSException { public void shouldThrowExceptionWhenCleaningQueue() throws JMSException { thrown.expect(MessageConsumerException.class); thrown.expectMessage("Fatal error cleaning messges from DLQ"); - when(session.createConsumer(Mockito.any(javax.jms.Queue.class))).thenReturn(dlqMessageConsumer); - Mockito.doThrow(JMSException.class).when(consumerClient).cleanQueue(dlqMessageConsumer); + when(session.createConsumer(any(Queue.class))).thenReturn(dlqMessageConsumer); + doThrow(JMSException.class).when(consumerClient).cleanQueue(dlqMessageConsumer); deadLetterQueueBrowser.removeMessages(); } @@ -127,6 +133,7 @@ public void shouldThrowExceptionWhenCleaningQueue() throws JMSException { @Test public void shouldClose() { deadLetterQueueBrowser.close(); + verify(jmsSessionFactory).close(); } diff --git a/test-utils-core/src/test/java/uk/gov/justice/services/test/utils/core/messaging/QueueUriProviderTest.java b/test-utils-core/src/test/java/uk/gov/justice/services/test/utils/core/messaging/QueueUriProviderTest.java index 5b5b253..08cec02 100644 --- a/test-utils-core/src/test/java/uk/gov/justice/services/test/utils/core/messaging/QueueUriProviderTest.java +++ b/test-utils-core/src/test/java/uk/gov/justice/services/test/utils/core/messaging/QueueUriProviderTest.java @@ -1,5 +1,7 @@ package uk.gov.justice.services.test.utils.core.messaging; +import static com.google.common.collect.Lists.newArrayList; +import static org.codehaus.groovy.runtime.InvokerHelper.asList; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; import static uk.gov.justice.services.test.utils.common.host.TestHostProvider.INTEGRATION_HOST_KEY; @@ -7,6 +9,7 @@ import static uk.gov.justice.services.test.utils.core.messaging.QueueUriProvider.queueUri; import static uk.gov.justice.services.test.utils.core.messaging.QueueUriProvider.artemisQueueUri; +import com.google.common.collect.Lists; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -56,8 +59,8 @@ public void shouldGetLocalhostUriByDefaultForArtemis() { final String localhostUri = "tcp://localhost:61616"; - assertThat(queueUriProvider.getArtemisQueueUri(), is(localhostUri)); - assertThat(artemisQueueUri(), is(localhostUri)); + assertThat(queueUriProvider.getArtemisQueueUri(), is(asList(localhostUri))); + assertThat(artemisQueueUri(), is(asList(localhostUri))); } @Test @@ -66,8 +69,20 @@ public void shouldGetArtemisUriFromSystemProperty() { final String userDefinedUri = "tcp://myServer:61616?debug=true"; System.setProperty(ARTEMIS_URI, userDefinedUri); - assertThat(queueUriProvider.getArtemisQueueUri(), is(userDefinedUri)); - assertThat(artemisQueueUri(), is(userDefinedUri)); + assertThat(queueUriProvider.getArtemisQueueUri(), is(asList(userDefinedUri))); + assertThat(artemisQueueUri(), is(asList(userDefinedUri))); + } + + @Test + public void shouldGetArtemisUriFromSystemPropertyAfterSplittingTheEntries() { + + final String userDefinedUri1 = "tcp://myServer2:61616?debug=true"; + final String userDefinedUri2 = "tcp://myServer2:61616?debug=true"; + final String userDefinedUri = userDefinedUri1 + ","+ userDefinedUri2; + System.setProperty(ARTEMIS_URI, userDefinedUri); + + assertThat(queueUriProvider.getArtemisQueueUri(), is(newArrayList(userDefinedUri1, userDefinedUri2))); + assertThat(artemisQueueUri(), is(newArrayList(userDefinedUri1, userDefinedUri2))); } @Test @@ -77,7 +92,7 @@ public void shouldGetTheRemoteUriForArtemisIfTheSystemPropertyIsSet() { final String remoteUri = "tcp://my.host.com:61616"; - assertThat(queueUriProvider.getArtemisQueueUri(), is(remoteUri)); - assertThat(artemisQueueUri(), is(remoteUri)); + assertThat(queueUriProvider.getArtemisQueueUri(), is(asList(remoteUri))); + assertThat(artemisQueueUri(), is(asList(remoteUri))); } }