From bbd71b380fa3769002ebe0593976b72ad2ecd0aa Mon Sep 17 00:00:00 2001 From: James Graham Date: Fri, 1 Jun 2018 22:24:51 +0100 Subject: [PATCH 1/2] Added consumer to output of browse operation --- .../CombinedJmsAndJmxArtemisConnector.java | 4 +++- .../connector/JmxArtemisConnector.java | 9 ++++++- .../manager/connector/MessageData.java | 6 ++++- .../uk/gov/justice/output/ConsolePrinter.java | 3 ++- .../artemis/manager/ArtemisManagerIT.java | 16 +++++++------ .../CombinedJmsAndJmxArtemisConnectorIT.java | 24 ++++++++++--------- .../connector/JmxArtemisConnectorIT.java | 22 +++++++++-------- .../artemis/manager/util/JmsTestUtil.java | 12 ++++++---- 8 files changed, 60 insertions(+), 36 deletions(-) diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnector.java b/src/main/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnector.java index 800e513..4586a5e 100644 --- a/src/main/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnector.java +++ b/src/main/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnector.java @@ -39,6 +39,7 @@ public class CombinedJmsAndJmxArtemisConnector implements ArtemisConnector { private static final String ID_PREFIX = "ID:"; private static final String BLANK = ""; private static final String UNSUPPORTED_MESSAGE_CONTENT = "{\"error\": \"Unsupported message content\"}"; + private static final String JMS_CONSUMER = "_AMQ_ORIG_QUEUE"; final OutputPrinter outputPrinter = new ConsolePrinter(); @@ -96,6 +97,7 @@ public List messagesOf(final String host, final String port, final final String jmsMessageID = message.getJMSMessageID().replaceFirst(ID_PREFIX, BLANK); final String originalDestination = message.getStringProperty(JMS_ORIGINAL_DESTINATION); final String text; + final String consumer = message.getStringProperty(JMS_CONSUMER); if (message instanceof TextMessage) { final TextMessage textMessage = (TextMessage) message; @@ -104,7 +106,7 @@ public List messagesOf(final String host, final String port, final text = UNSUPPORTED_MESSAGE_CONTENT; } - messages.add(new MessageData(jmsMessageID, originalDestination, text)); + messages.add(new MessageData(jmsMessageID, originalDestination, text, consumer)); } return messages; diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnector.java b/src/main/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnector.java index bbebb3d..2254b74 100644 --- a/src/main/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnector.java +++ b/src/main/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnector.java @@ -28,6 +28,8 @@ public class JmxArtemisConnector implements ArtemisConnector { private static final String JMS_MESSAGE_ID = "JMSMessageID"; private static final String ORIGINAL_DESTINATION = "OriginalDestination"; private static final String TEXT = "Text"; + private static final String PROPERTIES_TEXT = "PropertiesText"; + private static final String CONSUMER = "_AMQ_ORIG_QUEUE"; final OutputPrinter outputPrinter = new ConsolePrinter(); @@ -35,7 +37,12 @@ public class JmxArtemisConnector implements ArtemisConnector { public List messagesOf(final String host, final String port, final String brokerName, final String destinationName) throws Exception { final CompositeData[] browseResult = queueControlOf(host, port, brokerName, destinationName).browse(); return stream(browseResult) - .map(cd -> new MessageData(String.valueOf(cd.get(JMS_MESSAGE_ID)).replaceFirst("ID:", ""), String.valueOf(cd.get(ORIGINAL_DESTINATION)), String.valueOf(cd.get(TEXT)))) + .map(cd -> new MessageData( + String.valueOf(cd.get(JMS_MESSAGE_ID)).replaceFirst("ID:", ""), + String.valueOf(cd.get(ORIGINAL_DESTINATION)), + String.valueOf(cd.get(TEXT)), + stream(String.valueOf(cd.get(PROPERTIES_TEXT)).split(",")) + .filter(e -> e.contains(CONSUMER)).findFirst().get().replaceFirst(CONSUMER + "=", "").trim())) .collect(toList()); } diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/MessageData.java b/src/main/java/uk/gov/justice/artemis/manager/connector/MessageData.java index 90375b2..16ad856 100644 --- a/src/main/java/uk/gov/justice/artemis/manager/connector/MessageData.java +++ b/src/main/java/uk/gov/justice/artemis/manager/connector/MessageData.java @@ -11,10 +11,12 @@ public class MessageData { private String msgId; private String originalDestination; private JsonObject msgContent; + private String consumer; - public MessageData(final String msgId, final String originalDestination, final String msgText) { + public MessageData(final String msgId, final String originalDestination, final String msgText, final String consumer) { this.msgId = msgId; this.originalDestination = originalDestination; + this.consumer = consumer; try(final JsonReader jsonReader = Json.createReader(new StringReader(String.valueOf(msgText)))) { this.msgContent = jsonReader.readObject(); @@ -32,4 +34,6 @@ public String getOriginalDestination() { public JsonObject getMsgContent() { return msgContent; } + + public String getConsumer() { return consumer; } } diff --git a/src/main/java/uk/gov/justice/output/ConsolePrinter.java b/src/main/java/uk/gov/justice/output/ConsolePrinter.java index 9c04b0e..e762614 100644 --- a/src/main/java/uk/gov/justice/output/ConsolePrinter.java +++ b/src/main/java/uk/gov/justice/output/ConsolePrinter.java @@ -41,7 +41,8 @@ private String jsonStringOf(final List messageData) { jsonResponse .add(Json.createObjectBuilder().add("msgId", String.valueOf(md.getMsgId())) .add("originalDestination", String.valueOf(md.getOriginalDestination())) - .add("msgContent", md.getMsgContent())); + .add("msgContent", md.getMsgContent()) + .add("consumer",md.getConsumer())); } return jsonResponse.build().toString(); diff --git a/src/test/java/uk/gov/justice/artemis/manager/ArtemisManagerIT.java b/src/test/java/uk/gov/justice/artemis/manager/ArtemisManagerIT.java index f10ff0a..76c04f9 100644 --- a/src/test/java/uk/gov/justice/artemis/manager/ArtemisManagerIT.java +++ b/src/test/java/uk/gov/justice/artemis/manager/ArtemisManagerIT.java @@ -46,8 +46,8 @@ public static void afterClass() throws JMSException { public void shouldBrowseMessagesInDLQ() throws Exception { cleanQueue(DLQ); - putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name\",\"id\":\"c97c5b7b-abc3-49d4-96a9-bcd83aa4ea12\"}}", "jms.queue.abracadabra"); - putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\",\"id\":\"c97c5b7b-abc3-49d4-96a9-bcd83aa4ea13\"}}", "jms.queue.hocuspocus"); + putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name\",\"id\":\"c97c5b7b-abc3-49d4-96a9-bcd83aa4ea12\"}}", "consumer1", "jms.queue.abracadabra"); + putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\",\"id\":\"c97c5b7b-abc3-49d4-96a9-bcd83aa4ea13\"}}", "consumer2", "jms.queue.hocuspocus"); final Output output = execute(COMMAND_LINE_BROWSE); assertThat(output.errorOutput, isEmptyString()); @@ -59,11 +59,13 @@ public void shouldBrowseMessagesInDLQ() throws Exception { assertThat(standardOutput, hasJsonPath("$[0].originalDestination", equalTo("jms.queue.abracadabra"))); assertThat(standardOutput, hasJsonPath("$[0].msgContent._metadata.name", equalTo("some.name"))); assertThat(standardOutput, hasJsonPath("$[0].msgContent._metadata.id", equalTo("c97c5b7b-abc3-49d4-96a9-bcd83aa4ea12"))); + assertThat(standardOutput, hasJsonPath("$[0].consumer", equalTo("consumer1"))); assertThat(standardOutput, hasJsonPath("$[1].msgId")); assertThat(standardOutput, hasJsonPath("$[1].originalDestination", equalTo("jms.queue.hocuspocus"))); assertThat(standardOutput, hasJsonPath("$[1].msgContent._metadata.name", equalTo("some.other.name"))); assertThat(standardOutput, hasJsonPath("$[1].msgContent._metadata.id", equalTo("c97c5b7b-abc3-49d4-96a9-bcd83aa4ea13"))); + assertThat(standardOutput, hasJsonPath("$[1].consumer", equalTo("consumer2"))); } @@ -119,7 +121,7 @@ public void shouldReturnInfoIfMessageNotound() throws IOException { public void shouldRemoveMultipleMessagesReadingIdsFromSystemInput() throws Exception { if (notWindows()) { setDefaultDLQMessages(); - putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name2\"}}", "jms.queue.abracadabra2"); + putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name2\"}}", "consumer1", "jms.queue.abracadabra2"); final String messageData = standardOutputOf(COMMAND_LINE_BROWSE); @@ -211,8 +213,8 @@ public void shouldReprocessLargeMessageOntoOriginalQueue() throws Exception { consumerOf("abracadabra"); consumerOf("hocuspocus"); - putInQueue(DLQ, createLargeMessage(4024L), "jms.queue.abracadabra"); - putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\"}}", "jms.queue.hocuspocus"); + putInQueue(DLQ, createLargeMessage(4024L), "consumer1", "jms.queue.abracadabra"); + putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\"}}", "consumer2", "jms.queue.hocuspocus"); assertDLQHasSizeOf(2); @@ -233,8 +235,8 @@ public void shouldReprocessLargeMessageOntoOriginalQueue() throws Exception { private void setDefaultDLQMessages() throws JMSException, IOException { cleanQueue(DLQ); - putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name\"}}", "jms.queue.abracadabra"); - putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\"}}", "jms.queue.hocuspocus"); + putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name\"}}", "consumer1", "jms.queue.abracadabra"); + putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\"}}", "consumer2", "jms.queue.hocuspocus"); assertDLQHasSizeOf(2); } diff --git a/src/test/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnectorIT.java b/src/test/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnectorIT.java index c3369bc..872caea 100644 --- a/src/test/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnectorIT.java +++ b/src/test/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnectorIT.java @@ -40,18 +40,20 @@ public void shouldReturnMessagesFromQueue() throws Exception { cleanQueue(queue); - putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1"); - putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2"); + putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1", "origQueueO1"); + putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2", "origQueueO2"); final List messageData = combinedArtemisConnector.messagesOf("localhost", "61616", "0.0.0.0", queue); assertThat(messageData, hasSize(2)); assertThat(messageData.get(0).getMsgId(), not(nullValue())); assertThat(messageData.get(0).getOriginalDestination(), is("origQueueO1")); assertThat(messageData.get(0).getMsgContent().getString("key1"), is("value123")); + assertThat(messageData.get(0).getConsumer(), is("consumer1")); assertThat(messageData.get(1).getMsgId(), not(nullValue())); assertThat(messageData.get(1).getOriginalDestination(), is("origQueueO2")); assertThat(messageData.get(1).getMsgContent().getString("key1"), is("valueBB")); + assertThat(messageData.get(1).getConsumer(), is("consumer2")); } @Test @@ -62,7 +64,7 @@ public void shouldReturnUnsupportedMessageTextForByteMessage() throws Exception final ByteArrayInputStream messageInput = new ByteArrayInputStream("{\"key1\":\"value123\"}".getBytes()); - putInQueue(queue, messageInput, "origQueueO1"); + putInQueue(queue, messageInput, "consumer1", "origQueueO1"); final List messageData = combinedArtemisConnector.messagesOf("localhost", "61616", "0.0.0.0", queue); assertThat(messageData, hasSize(1)); @@ -77,9 +79,9 @@ public void shouldRemoveMessagesFromQueue() throws Exception { cleanQueue(queue); - putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1"); - putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2"); - putInQueue(queue, "{\"key1\":\"valueCC\"}", "origQueueO3"); + putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1", "origQueueO1"); + putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2", "origQueueO2"); + putInQueue(queue, "{\"key1\":\"valueCC\"}", "consumer3", "origQueueO3"); final List messageData = combinedArtemisConnector.messagesOf("localhost", "61616", "0.0.0.0", queue); assertThat(messageData, hasSize(3)); @@ -98,8 +100,8 @@ public void shouldIgnoreMessagesNotInTheQueue() throws Exception { cleanQueue(queue); - putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1"); - putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2"); + putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1", "origQueueO1"); + putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2", "origQueueO2"); final List messageData = combinedArtemisConnector.messagesOf("localhost", "61616", "0.0.0.0", queue); assertThat(messageData, hasSize(2)); @@ -118,9 +120,9 @@ public void shouldReturnNumberOfDeletedMessages() throws Exception { cleanQueue(queue); - putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1"); - putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2"); - putInQueue(queue, "{\"key1\":\"valueCC\"}", "origQueueO3"); + putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1", "origQueueO1"); + putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2", "origQueueO2"); + putInQueue(queue, "{\"key1\":\"valueCC\"}", "consumer3", "origQueueO3"); final List messageData = combinedArtemisConnector.messagesOf("localhost", "61616", "0.0.0.0", queue); diff --git a/src/test/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnectorIT.java b/src/test/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnectorIT.java index b36b28f..ba31960 100644 --- a/src/test/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnectorIT.java +++ b/src/test/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnectorIT.java @@ -40,18 +40,20 @@ public void shouldReturnMessagesFromQueue() throws Exception { cleanQueue(queue); - putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1"); - putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2"); + putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1","origQueueO1"); + putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2", "origQueueO2"); final List messageData = jmxArtemisConnector.messagesOf("localhost", "3000", "0.0.0.0", queue); assertThat(messageData, hasSize(2)); assertThat(messageData.get(0).getMsgId(), not(nullValue())); assertThat(messageData.get(0).getOriginalDestination(), is("origQueueO1")); assertThat(messageData.get(0).getMsgContent().getString("key1"), is("value123")); + assertThat(messageData.get(0).getConsumer(), is("consumer1")); assertThat(messageData.get(1).getMsgId(), not(nullValue())); assertThat(messageData.get(1).getOriginalDestination(), is("origQueueO2")); assertThat(messageData.get(1).getMsgContent().getString("key1"), is("valueBB")); + assertThat(messageData.get(1).getConsumer(), is("consumer2")); } @Test @@ -60,9 +62,9 @@ public void shouldRemoveMessagesFromQueue() throws Exception { cleanQueue(queue); - putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1"); - putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2"); - putInQueue(queue, "{\"key1\":\"valueCC\"}", "origQueueO3"); + putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1", "origQueueO1"); + putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2","origQueueO2"); + putInQueue(queue, "{\"key1\":\"valueCC\"}", "consumer3","origQueueO3"); final List messageData = jmxArtemisConnector.messagesOf("localhost", "3000", "0.0.0.0", queue); assertThat(messageData, hasSize(3)); @@ -81,8 +83,8 @@ public void shouldIgnoreMessagesNotInTheQueue() throws Exception { cleanQueue(queue); - putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1"); - putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2"); + putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1","origQueueO1"); + putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2","origQueueO2"); final List messageData = jmxArtemisConnector.messagesOf("localhost", "3000", "0.0.0.0", queue); assertThat(messageData, hasSize(2)); @@ -101,9 +103,9 @@ public void shouldReturnNumberOfDeletedMessages() throws Exception { cleanQueue(queue); - putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1"); - putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2"); - putInQueue(queue, "{\"key1\":\"valueCC\"}", "origQueueO3"); + putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1","origQueueO1"); + putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2","origQueueO2"); + putInQueue(queue, "{\"key1\":\"valueCC\"}", "consumer3","origQueueO3"); final List messageData = jmxArtemisConnector.messagesOf("localhost", "3000", "0.0.0.0", queue); diff --git a/src/test/java/uk/gov/justice/artemis/manager/util/JmsTestUtil.java b/src/test/java/uk/gov/justice/artemis/manager/util/JmsTestUtil.java index 37855a3..36cb415 100644 --- a/src/test/java/uk/gov/justice/artemis/manager/util/JmsTestUtil.java +++ b/src/test/java/uk/gov/justice/artemis/manager/util/JmsTestUtil.java @@ -22,28 +22,32 @@ public class JmsTestUtil { private static final QueueConnectionFactory JMS_CF = new ActiveMQQueueConnectionFactory("tcp://localhost:61616"); + private static final String ORIGINAL_DESTINATION = "_AMQ_ORIG_ADDRESS"; + private static final String CONSUMER = "_AMQ_ORIG_QUEUE"; private static QueueConnection JMS_CONNECTION; private static QueueSession JMS_SESSION; private static Map QUEUES = new HashMap<>(); private static Map CONSUMERS = new HashMap<>(); private static Map PRODUCERS = new HashMap<>(); - public static void putInQueue(final String queueName, final String msgText, final String... origAddress) throws JMSException { + public static void putInQueue(final String queueName, final String msgText, final String consumer, final String... origAddress) throws JMSException { TextMessage message = JMS_SESSION.createTextMessage(msgText); if (origAddress.length > 0) { - message.setStringProperty("_AMQ_ORIG_ADDRESS", origAddress[0]); + message.setStringProperty(ORIGINAL_DESTINATION, origAddress[0]); } + message.setStringProperty(CONSUMER, consumer); producerOf(queueName).send(message); } - public static void putInQueue(final String queueName, final InputStream messageInput, final String... origAddress) throws JMSException { + public static void putInQueue(final String queueName, final InputStream messageInput, final String consumer, final String... origAddress) throws JMSException { final Message message = JMS_SESSION.createBytesMessage(); message.setObjectProperty("JMS_AMQ_InputStream", messageInput); if (origAddress.length > 0) { - message.setStringProperty("_AMQ_ORIG_ADDRESS", origAddress[0]); + message.setStringProperty(ORIGINAL_DESTINATION, origAddress[0]); } + message.setStringProperty(CONSUMER, consumer); producerOf(queueName).send(message); } From 06a09e3414e387fddecdcdc71e1a8ec44f590cf5 Mon Sep 17 00:00:00 2001 From: James Graham Date: Fri, 29 Jun 2018 16:58:19 +0100 Subject: [PATCH 2/2] Updated changelog --- CHANGELOG.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fc62a17..d880c48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,11 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to ## [Unreleased] +## [2.1.0] - 2018-06-29 + +###Added +- Output of consumer name so that it's easy to see where the DLQ message was going to + ## [2.0.0] - 2017-0-15 ### Fixed