diff --git a/CHANGELOG.md b/CHANGELOG.md index 9d6c3a8..1f2d6d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,9 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to ## [Unreleased] +###Added +- Output of consumer name so that it's easy to see where the DLQ message was going to + ## [3.1.0] - 2018-10-30 ### Added diff --git a/pom.xml b/pom.xml index 3ea4d9b..ece64de 100644 --- a/pom.xml +++ b/pom.xml @@ -9,12 +9,13 @@ maven-parent-pom 1.7.1 + + artemis-manager 2.0.0 ${project.build.directory}/apache-artemis-${artemis.version} 1.5.6 1.7.10 - artemis-manager 1.3 1.48 diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnector.java b/src/main/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnector.java index 321cab2..7fe60bc 100644 --- a/src/main/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnector.java +++ b/src/main/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnector.java @@ -39,6 +39,29 @@ public class CombinedJmsAndJmxArtemisConnector implements ArtemisConnector { private final JmsProcessor jmsProcessor = new JmsProcessor(); private final JmsManagement jmsManagement = new JmsManagement(); +// private static final String JMS_URL = "tcp://%s:%s"; +// private static final String JMS_ORIGINAL_DESTINATION = "_AMQ_ORIG_ADDRESS"; +// private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi"; +// private static final String ID_PREFIX = "ID:"; +// private static final String BLANK = ""; +// private static final String UNSUPPORTED_MESSAGE_CONTENT = "{\"error\": \"Unsupported message content\"}"; +// private static final String JMS_CONSUMER = "_AMQ_ORIG_QUEUE"; +// +// final OutputPrinter outputPrinter = new ConsolePrinter(); +// +// private Function, Long>> removeMessages = queueControl -> msgIds -> { +// long removedMessages = 0L; +// +// while (msgIds.hasNext()) { +// try { +// queueControl.removeMessage(format("ID:%s", msgIds.next())); +// removedMessages++; +// } catch (final Exception exception) { +// outputPrinter.writeException(exception); +// } +// } +//>>>>>>> Stashed changes + private List jmxServiceUrls; private Map jmxEnvironment; private ObjectNameBuilder objectNameBuilder; @@ -93,7 +116,14 @@ public long reprocess(final String destinationName, final Iterator msgId destinationName, jmxManagement.reprocessMessages(msgIds)).mapToLong(Long::longValue).sum(); } - +// +//<<<<<<< Updated upstream +//======= +// final String jmsMessageID = message.getJMSMessageID().replaceFirst(ID_PREFIX, BLANK); +// final String originalDestination = message.getStringProperty(JMS_ORIGINAL_DESTINATION); +// final String text; +// final String consumer = message.getStringProperty(JMS_CONSUMER); +//>>>>>>> Stashed changes @Override public int reprocessAll(final String destinationName) { @@ -105,6 +135,11 @@ public int reprocessAll(final String destinationName) { jmxManagement.reprocessAllMessages()).mapToInt(Integer::intValue).sum(); } +//<<<<<<< Updated upstream +//======= +// messages.add(new MessageData(jmsMessageID, originalDestination, text, consumer)); +// } +//>>>>>>> Stashed changes @Override public List queueNames() { diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnector.java b/src/main/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnector.java index 0b1e30f..1f2de32 100644 --- a/src/main/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnector.java +++ b/src/main/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnector.java @@ -31,23 +31,23 @@ public class JmxArtemisConnector implements ArtemisConnector { private final JmxManagement jmxManagement = new JmxManagement(new ConsolePrinter()); private List jmxServiceUrls; - private Map jmxEnvironment = new HashMap<>(); + private Map jmxEnvironment = new HashMap<>(); private ObjectNameBuilder objectNameBuilder; @Override public void setParameters(final List jmxUrls, - final String brokerName, - final String jmxUsername, - final String jmxPassword, - final String jmsUrl, - final String jmsUsername, - final String jmsPassword) { + final String brokerName, + final String jmxUsername, + final String jmxPassword, + final String jmsUrl, + final String jmsUsername, + final String jmsPassword) { this.jmxServiceUrls = jmxProcessor.processJmxUrls(jmxUrls); this.objectNameBuilder = jmxProcessor.getObjectNameBuilder(brokerName); if ((jmxUsername != null) && (jmxPassword != null)) { this.jmxEnvironment = new HashMap<>(); - this.jmxEnvironment.put(JMXConnector.CREDENTIALS, new String[]{ jmxUsername, jmxPassword }); + this.jmxEnvironment.put(JMXConnector.CREDENTIALS, new String[]{jmxUsername, jmxPassword}); } else { this.jmxEnvironment = emptyMap(); } @@ -56,10 +56,10 @@ public void setParameters(final List jmxUrls, @Override public List messagesOf(final String destinationName) { return jmxProcessor.processQueueControl(this.jmxServiceUrls, - this.jmxEnvironment, - this.objectNameBuilder, - destinationName, - jmxManagement.browseMessages()).flatMap( + this.jmxEnvironment, + this.objectNameBuilder, + destinationName, + jmxManagement.browseMessages()).flatMap( Collection::stream).collect(toList()); } @@ -95,10 +95,10 @@ public int reprocessAll(final String destinationName) { @Override public List queueNames() { return jmxProcessor.processServerControl( - this.jmxServiceUrls, - this.jmxEnvironment, - this.objectNameBuilder, - JMSServerControl::getQueueNames).flatMap( + this.jmxServiceUrls, + this.jmxEnvironment, + this.objectNameBuilder, + JMSServerControl::getQueueNames).flatMap( Arrays::stream).sorted(). distinct().collect(toList()); } @@ -107,34 +107,34 @@ public List queueNames() { @Override public Map queueMessageCount(final Collection queueNames) { return jmxProcessor.processQueues(this.jmxServiceUrls, - this.jmxEnvironment, - this.objectNameBuilder, - queueNames, - unchecked(DestinationControl::getMessageCount)).flatMap( + this.jmxEnvironment, + this.objectNameBuilder, + queueNames, + unchecked(DestinationControl::getMessageCount)).flatMap( m -> m.entrySet().stream()).collect( - groupingBy(Entry::getKey, - summingLong(Entry::getValue))); + groupingBy(Entry::getKey, + summingLong(Entry::getValue))); } @Override public List topicNames() { return jmxProcessor.processServerControl(this.jmxServiceUrls, - this.jmxEnvironment, - this.objectNameBuilder, - JMSServerControl::getTopicNames).flatMap( + this.jmxEnvironment, + this.objectNameBuilder, + JMSServerControl::getTopicNames).flatMap( Arrays::stream).sorted(). - distinct().collect(toList()); + distinct().collect(toList()); } @Override public Map topicMessageCount(final Collection topicNames) { return jmxProcessor.processTopics(this.jmxServiceUrls, - this.jmxEnvironment, - this.objectNameBuilder, - topicNames, - unchecked(DestinationControl::getMessageCount)).flatMap( + this.jmxEnvironment, + this.objectNameBuilder, + topicNames, + unchecked(DestinationControl::getMessageCount)).flatMap( m -> m.entrySet().stream()).collect( - groupingBy(Entry::getKey, - summingLong(Entry::getValue))); + groupingBy(Entry::getKey, + summingLong(Entry::getValue))); } } \ No newline at end of file diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/MessageData.java b/src/main/java/uk/gov/justice/artemis/manager/connector/MessageData.java index 90375b2..16ad856 100644 --- a/src/main/java/uk/gov/justice/artemis/manager/connector/MessageData.java +++ b/src/main/java/uk/gov/justice/artemis/manager/connector/MessageData.java @@ -11,10 +11,12 @@ public class MessageData { private String msgId; private String originalDestination; private JsonObject msgContent; + private String consumer; - public MessageData(final String msgId, final String originalDestination, final String msgText) { + public MessageData(final String msgId, final String originalDestination, final String msgText, final String consumer) { this.msgId = msgId; this.originalDestination = originalDestination; + this.consumer = consumer; try(final JsonReader jsonReader = Json.createReader(new StringReader(String.valueOf(msgText)))) { this.msgContent = jsonReader.readObject(); @@ -32,4 +34,6 @@ public String getOriginalDestination() { public JsonObject getMsgContent() { return msgContent; } + + public String getConsumer() { return consumer; } } diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/jms/JmsManagement.java b/src/main/java/uk/gov/justice/artemis/manager/connector/jms/JmsManagement.java index 1d7a706..cfc47c5 100644 --- a/src/main/java/uk/gov/justice/artemis/manager/connector/jms/JmsManagement.java +++ b/src/main/java/uk/gov/justice/artemis/manager/connector/jms/JmsManagement.java @@ -1,5 +1,7 @@ package uk.gov.justice.artemis.manager.connector.jms; +import static java.util.Arrays.stream; + import uk.gov.justice.artemis.manager.connector.MessageData; import java.util.ArrayList; @@ -16,6 +18,7 @@ public class JmsManagement { private static final String ID_PREFIX = "ID:"; private static final String BLANK = ""; private static final String UNSUPPORTED_MESSAGE_CONTENT = "{\"error\": \"Unsupported message content\"}"; + private static final String CONSUMER = "_AMQ_ORIG_QUEUE"; public JmsManagementFunction> browseMessages() { return queueBrowser -> { @@ -29,6 +32,7 @@ public JmsManagementFunction> browseMessages() { final String jmsMessageID = message.getJMSMessageID().replaceFirst(ID_PREFIX, BLANK); final String originalDestination = message.getStringProperty(JMS_ORIGINAL_DESTINATION); + final String consumer = message.getStringProperty(CONSUMER); final String text; if (message instanceof TextMessage) { @@ -38,7 +42,7 @@ public JmsManagementFunction> browseMessages() { text = UNSUPPORTED_MESSAGE_CONTENT; } - messages.add(new MessageData(jmsMessageID, originalDestination, text)); + messages.add(new MessageData(jmsMessageID, originalDestination, text, consumer)); } return messages; diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/jmx/JmxManagement.java b/src/main/java/uk/gov/justice/artemis/manager/connector/jmx/JmxManagement.java index 7f8d2a6..616d534 100644 --- a/src/main/java/uk/gov/justice/artemis/manager/connector/jmx/JmxManagement.java +++ b/src/main/java/uk/gov/justice/artemis/manager/connector/jmx/JmxManagement.java @@ -17,6 +17,8 @@ public class JmxManagement { private static final String JMS_MESSAGE_ID = "JMSMessageID"; private static final String ORIGINAL_DESTINATION = "OriginalDestination"; private static final String TEXT = "Text"; + private static final String PROPERTIES_TEXT = "PropertiesText"; + private static final String CONSUMER = "_AMQ_ORIG_QUEUE"; private final OutputPrinter outputPrinter; public JmxManagement(final OutputPrinter outputPrinter) { @@ -33,8 +35,10 @@ public JmxManagementFunction> browseMessages() { final String jmsMessageId = String.valueOf(message.get(JMS_MESSAGE_ID)).replaceFirst("ID:", ""); final String originalDestination = String.valueOf(message.get(ORIGINAL_DESTINATION)); final String text = String.valueOf(message.get(TEXT)); + final String consumer = stream(String.valueOf(message.get(PROPERTIES_TEXT)).split(",")) + .filter(e -> e.contains(CONSUMER)).findFirst().get().replaceFirst(CONSUMER + "=", "").trim(); - return new MessageData(jmsMessageId, originalDestination, text); + return new MessageData(jmsMessageId, originalDestination, text, consumer); }) .collect(toList()); diff --git a/src/main/java/uk/gov/justice/output/ConsolePrinter.java b/src/main/java/uk/gov/justice/output/ConsolePrinter.java index ca882be..0aaf627 100644 --- a/src/main/java/uk/gov/justice/output/ConsolePrinter.java +++ b/src/main/java/uk/gov/justice/output/ConsolePrinter.java @@ -39,7 +39,7 @@ private JsonObject toJsonObject(Map.Entry entry, String valueName) public void writeMap(final Map map, String valueName) { final JsonArrayBuilder arrayBuilder = Json.createArrayBuilder(); map.entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach( - entry -> arrayBuilder.add(this.toJsonObject(entry, valueName)) + entry -> arrayBuilder.add(this.toJsonObject(entry, valueName)) ); System.out.println(arrayBuilder.build().toString()); } @@ -62,11 +62,15 @@ public void writeException(final Throwable throwable) { private String jsonStringOf(final List messageData) { final JsonArrayBuilder jsonResponse = Json.createArrayBuilder(); for (MessageData md : messageData) { - jsonResponse - .add(Json.createObjectBuilder().add("msgId", String.valueOf(md.getMsgId())) - .add("originalDestination", String.valueOf(md.getOriginalDestination())) - .add("msgContent", md.getMsgContent())); + JsonObjectBuilder builder = Json.createObjectBuilder().add("msgId", String.valueOf(md.getMsgId())) + .add("originalDestination", String.valueOf(md.getOriginalDestination())) + .add("msgContent", md.getMsgContent()); + if (md.getConsumer() != null) { + builder.add("consumer", md.getConsumer()); + } + + jsonResponse.add(builder.build()); } return jsonResponse.build().toString(); } diff --git a/src/test/java/uk/gov/justice/artemis/manager/ArtemisManagerIT.java b/src/test/java/uk/gov/justice/artemis/manager/ArtemisManagerIT.java index cb8e338..cb7053f 100644 --- a/src/test/java/uk/gov/justice/artemis/manager/ArtemisManagerIT.java +++ b/src/test/java/uk/gov/justice/artemis/manager/ArtemisManagerIT.java @@ -45,8 +45,8 @@ public static void afterClass() throws JMSException { public void shouldBrowseMessagesInDLQ() throws Exception { cleanQueue(DLQ); - putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name\",\"id\":\"c97c5b7b-abc3-49d4-96a9-bcd83aa4ea12\"}}", "jms.queue.abracadabra"); - putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\",\"id\":\"c97c5b7b-abc3-49d4-96a9-bcd83aa4ea13\"}}", "jms.queue.hocuspocus"); + putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name\",\"id\":\"c97c5b7b-abc3-49d4-96a9-bcd83aa4ea12\"}}", "consumer1", "jms.queue.abracadabra"); + putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\",\"id\":\"c97c5b7b-abc3-49d4-96a9-bcd83aa4ea13\"}}", "consumer2", "jms.queue.hocuspocus"); final Output output = execute(COMMAND_LINE_BROWSE); assertThat(output.errorOutput, isEmptyString()); @@ -58,11 +58,13 @@ public void shouldBrowseMessagesInDLQ() throws Exception { assertThat(standardOutput, hasJsonPath("$[0].originalDestination", equalTo("jms.queue.abracadabra"))); assertThat(standardOutput, hasJsonPath("$[0].msgContent._metadata.name", equalTo("some.name"))); assertThat(standardOutput, hasJsonPath("$[0].msgContent._metadata.id", equalTo("c97c5b7b-abc3-49d4-96a9-bcd83aa4ea12"))); + assertThat(standardOutput, hasJsonPath("$[0].consumer", equalTo("consumer1"))); assertThat(standardOutput, hasJsonPath("$[1].msgId")); assertThat(standardOutput, hasJsonPath("$[1].originalDestination", equalTo("jms.queue.hocuspocus"))); assertThat(standardOutput, hasJsonPath("$[1].msgContent._metadata.name", equalTo("some.other.name"))); assertThat(standardOutput, hasJsonPath("$[1].msgContent._metadata.id", equalTo("c97c5b7b-abc3-49d4-96a9-bcd83aa4ea13"))); + assertThat(standardOutput, hasJsonPath("$[1].consumer", equalTo("consumer2"))); } @Test @@ -96,7 +98,7 @@ public void shouldReturnInfoIfMessageNotound() throws IOException { public void shouldRemoveMultipleMessagesReadingIdsFromSystemInput() throws Exception { if (notWindows()) { setDefaultDLQMessages(); - putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name2\"}}", "jms.queue.abracadabra2"); + putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name2\"}}", "consumer1", "jms.queue.abracadabra2"); final String messageData = standardOutputOf(COMMAND_LINE_BROWSE); @@ -188,8 +190,8 @@ public void shouldReprocessLargeMessageOntoOriginalQueue() throws Exception { consumerOf("abracadabra"); consumerOf("hocuspocus"); - putInQueue(DLQ, createLargeMessage(4024L), "jms.queue.abracadabra"); - putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\"}}", "jms.queue.hocuspocus"); + putInQueue(DLQ, createLargeMessage(4024L), "consumer1", "jms.queue.abracadabra"); + putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\"}}", "consumer2", "jms.queue.hocuspocus"); assertDLQHasSizeOf(2); @@ -210,8 +212,8 @@ public void shouldReprocessLargeMessageOntoOriginalQueue() throws Exception { private void setDefaultDLQMessages() throws JMSException, IOException { cleanQueue(DLQ); - putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name\"}}", "jms.queue.abracadabra"); - putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\"}}", "jms.queue.hocuspocus"); + putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name\"}}", "consumer1", "jms.queue.abracadabra"); + putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\"}}", "consumer2", "jms.queue.hocuspocus"); assertDLQHasSizeOf(2); } diff --git a/src/test/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnectorIT.java b/src/test/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnectorIT.java index 8d949c0..0bec918 100644 --- a/src/test/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnectorIT.java +++ b/src/test/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnectorIT.java @@ -64,18 +64,20 @@ public void shouldReturnMessagesFromQueue() throws Exception { cleanQueue(queue); - putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1"); - putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2"); + putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1", "origQueueO1"); + putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2", "origQueueO2"); final List messageData = combinedArtemisConnector.messagesOf(queue); assertThat(messageData, hasSize(2)); assertThat(messageData.get(0).getMsgId(), not(nullValue())); assertThat(messageData.get(0).getOriginalDestination(), is("origQueueO1")); assertThat(messageData.get(0).getMsgContent().getString("key1"), is("value123")); + assertThat(messageData.get(0).getConsumer(), is("consumer1")); assertThat(messageData.get(1).getMsgId(), not(nullValue())); assertThat(messageData.get(1).getOriginalDestination(), is("origQueueO2")); assertThat(messageData.get(1).getMsgContent().getString("key1"), is("valueBB")); + assertThat(messageData.get(1).getConsumer(), is("consumer2")); } @Test @@ -86,7 +88,7 @@ public void shouldReturnUnsupportedMessageTextForByteMessage() throws Exception final ByteArrayInputStream messageInput = new ByteArrayInputStream("{\"key1\":\"value123\"}".getBytes()); - putInQueue(queue, messageInput, "origQueueO1"); + putInQueue(queue, messageInput, "consumer1", "origQueueO1"); final List messageData = combinedArtemisConnector.messagesOf(queue); assertThat(messageData, hasSize(1)); @@ -101,9 +103,9 @@ public void shouldRemoveMessagesFromQueue() throws Exception { cleanQueue(queue); - putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1"); - putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2"); - putInQueue(queue, "{\"key1\":\"valueCC\"}", "origQueueO3"); + putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1", "origQueueO1"); + putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2", "origQueueO2"); + putInQueue(queue, "{\"key1\":\"valueCC\"}", "consumer3", "origQueueO3"); final List messageData = combinedArtemisConnector.messagesOf(queue); assertThat(messageData, hasSize(3)); @@ -122,8 +124,8 @@ public void shouldIgnoreMessagesNotInTheQueue() throws Exception { cleanQueue(queue); - putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1"); - putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2"); + putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1", "origQueueO1"); + putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2", "origQueueO2"); final List messageData = combinedArtemisConnector.messagesOf(queue); assertThat(messageData, hasSize(2)); @@ -142,9 +144,9 @@ public void shouldReturnNumberOfDeletedMessages() throws Exception { cleanQueue(queue); - putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1"); - putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2"); - putInQueue(queue, "{\"key1\":\"valueCC\"}", "origQueueO3"); + putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1", "origQueueO1"); + putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2", "origQueueO2"); + putInQueue(queue, "{\"key1\":\"valueCC\"}", "consumer3", "origQueueO3"); final List messageData = combinedArtemisConnector.messagesOf(queue); @@ -214,8 +216,8 @@ public void shouldReturnMessagesQueueCount() throws Exception { cleanQueueWithNewConsumer("origQueueO1"); cleanQueueWithNewConsumer("origQueueO2"); - putInQueue("origQueueO1", "{\"key1\":\"value123\"}"); - putInQueue("origQueueO2", "{\"key1\":\"valueBB\"}"); + putInQueue("origQueueO1", "defaultConsumer", "{\"key1\":\"value123\"}"); + putInQueue("origQueueO2", "defaultConsumer", "{\"key1\":\"valueBB\"}"); final Map messageCounts = combinedArtemisConnector.queueMessageCount(asList("origQueueO1", "origQueueO2")); assertThat(messageCounts.keySet(), hasSize(2)); diff --git a/src/test/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnectorIT.java b/src/test/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnectorIT.java index e88b137..57606e5 100644 --- a/src/test/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnectorIT.java +++ b/src/test/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnectorIT.java @@ -64,18 +64,20 @@ public void shouldReturnMessagesFromQueue() throws Exception { cleanQueue(queue); - putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1"); - putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2"); + putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1","origQueueO1"); + putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2", "origQueueO2"); final List messageData = jmxArtemisConnector.messagesOf(queue); assertThat(messageData, hasSize(2)); assertThat(messageData.get(0).getMsgId(), not(nullValue())); assertThat(messageData.get(0).getOriginalDestination(), is("origQueueO1")); assertThat(messageData.get(0).getMsgContent().getString("key1"), is("value123")); + assertThat(messageData.get(0).getConsumer(), is("consumer1")); assertThat(messageData.get(1).getMsgId(), not(nullValue())); assertThat(messageData.get(1).getOriginalDestination(), is("origQueueO2")); assertThat(messageData.get(1).getMsgContent().getString("key1"), is("valueBB")); + assertThat(messageData.get(1).getConsumer(), is("consumer2")); } @Test @@ -84,9 +86,9 @@ public void shouldRemoveMessagesFromQueue() throws Exception { cleanQueue(queue); - putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1"); - putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2"); - putInQueue(queue, "{\"key1\":\"valueCC\"}", "origQueueO3"); + putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1", "origQueueO1"); + putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2","origQueueO2"); + putInQueue(queue, "{\"key1\":\"valueCC\"}", "consumer3","origQueueO3"); final List messageData = jmxArtemisConnector.messagesOf(queue); assertThat(messageData, hasSize(3)); @@ -105,8 +107,8 @@ public void shouldIgnoreMessagesNotInTheQueue() throws Exception { cleanQueue(queue); - putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1"); - putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2"); + putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1","origQueueO1"); + putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2","origQueueO2"); final List messageData = jmxArtemisConnector.messagesOf(queue); assertThat(messageData, hasSize(2)); @@ -125,9 +127,9 @@ public void shouldReturnNumberOfDeletedMessages() throws Exception { cleanQueue(queue); - putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1"); - putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2"); - putInQueue(queue, "{\"key1\":\"valueCC\"}", "origQueueO3"); + putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1","origQueueO1"); + putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2","origQueueO2"); + putInQueue(queue, "{\"key1\":\"valueCC\"}", "consumer3","origQueueO3"); final List messageData = jmxArtemisConnector.messagesOf(queue); diff --git a/src/test/java/uk/gov/justice/artemis/manager/util/JmsTestUtil.java b/src/test/java/uk/gov/justice/artemis/manager/util/JmsTestUtil.java index 6434169..49010f3 100644 --- a/src/test/java/uk/gov/justice/artemis/manager/util/JmsTestUtil.java +++ b/src/test/java/uk/gov/justice/artemis/manager/util/JmsTestUtil.java @@ -25,6 +25,9 @@ public class JmsTestUtil { private static final ConnectionFactory JMS_CF = new ActiveMQConnectionFactory("tcp://localhost:61616?clientID=artemis-manager"); private static Connection JMS_CONNECTION; private static Session JMS_SESSION; + + private static final String ORIGINAL_DESTINATION = "_AMQ_ORIG_ADDRESS"; + private static final String CONSUMER = "_AMQ_ORIG_QUEUE"; private static Map QUEUES = new HashMap<>(); private static Map CONSUMERS = new HashMap<>(); private static Map PRODUCERS = new HashMap<>(); @@ -32,22 +35,24 @@ public class JmsTestUtil { private static Map SUBSCRIBERS = new HashMap<>(); private static Map PUBLISHERS = new HashMap<>(); - public static void putInQueue(final String queueName, final String msgText, final String... origAddress) throws JMSException { + public static void putInQueue(final String queueName, final String msgText, final String consumer, final String... origAddress) throws JMSException { TextMessage message = JMS_SESSION.createTextMessage(msgText); if (origAddress.length > 0) { - message.setStringProperty("_AMQ_ORIG_ADDRESS", origAddress[0]); + message.setStringProperty(ORIGINAL_DESTINATION, origAddress[0]); } + message.setStringProperty(CONSUMER, consumer); producerOf(queueName).send(message); } - public static void putInQueue(final String queueName, final InputStream messageInput, final String... origAddress) throws JMSException { + public static void putInQueue(final String queueName, final InputStream messageInput, final String consumer, final String... origAddress) throws JMSException { final Message message = JMS_SESSION.createBytesMessage(); message.setObjectProperty("JMS_AMQ_InputStream", messageInput); if (origAddress.length > 0) { - message.setStringProperty("_AMQ_ORIG_ADDRESS", origAddress[0]); + message.setStringProperty(ORIGINAL_DESTINATION, origAddress[0]); } + message.setStringProperty(CONSUMER, consumer); producerOf(queueName).send(message); } @@ -201,6 +206,8 @@ public static void closeJmsConnection() throws JMSException { public static void openJmsConnection() throws JMSException { JMS_CONNECTION = JMS_CF.createConnection(); JMS_SESSION = JMS_CONNECTION.createSession(false, Session.AUTO_ACKNOWLEDGE); +// JMS_SESSION = JMS_CONNECTION.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); +// JMS_CONNECTION = JMS_CF.createQueueConnection(); } private static Queue queueOf(final String queueName) {