Skip to content

Commit

Permalink
Merge 06a09e3 into c26f368
Browse files Browse the repository at this point in the history
  • Loading branch information
jgraham0325 committed Jun 29, 2018
2 parents c26f368 + 06a09e3 commit d215b87
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 36 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to

## [Unreleased]

## [2.1.0] - 2018-06-29

###Added
- Output of consumer name so that it's easy to see where the DLQ message was going to

## [2.0.0] - 2017-0-15

### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class CombinedJmsAndJmxArtemisConnector implements ArtemisConnector {
private static final String ID_PREFIX = "ID:";
private static final String BLANK = "";
private static final String UNSUPPORTED_MESSAGE_CONTENT = "{\"error\": \"Unsupported message content\"}";
private static final String JMS_CONSUMER = "_AMQ_ORIG_QUEUE";

final OutputPrinter outputPrinter = new ConsolePrinter();

Expand Down Expand Up @@ -96,6 +97,7 @@ public List<MessageData> messagesOf(final String host, final String port, final
final String jmsMessageID = message.getJMSMessageID().replaceFirst(ID_PREFIX, BLANK);
final String originalDestination = message.getStringProperty(JMS_ORIGINAL_DESTINATION);
final String text;
final String consumer = message.getStringProperty(JMS_CONSUMER);

if (message instanceof TextMessage) {
final TextMessage textMessage = (TextMessage) message;
Expand All @@ -104,7 +106,7 @@ public List<MessageData> messagesOf(final String host, final String port, final
text = UNSUPPORTED_MESSAGE_CONTENT;
}

messages.add(new MessageData(jmsMessageID, originalDestination, text));
messages.add(new MessageData(jmsMessageID, originalDestination, text, consumer));
}

return messages;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,21 @@ public class JmxArtemisConnector implements ArtemisConnector {
private static final String JMS_MESSAGE_ID = "JMSMessageID";
private static final String ORIGINAL_DESTINATION = "OriginalDestination";
private static final String TEXT = "Text";
private static final String PROPERTIES_TEXT = "PropertiesText";
private static final String CONSUMER = "_AMQ_ORIG_QUEUE";

final OutputPrinter outputPrinter = new ConsolePrinter();

@Override
public List<MessageData> messagesOf(final String host, final String port, final String brokerName, final String destinationName) throws Exception {
final CompositeData[] browseResult = queueControlOf(host, port, brokerName, destinationName).browse();
return stream(browseResult)
.map(cd -> new MessageData(String.valueOf(cd.get(JMS_MESSAGE_ID)).replaceFirst("ID:", ""), String.valueOf(cd.get(ORIGINAL_DESTINATION)), String.valueOf(cd.get(TEXT))))
.map(cd -> new MessageData(
String.valueOf(cd.get(JMS_MESSAGE_ID)).replaceFirst("ID:", ""),
String.valueOf(cd.get(ORIGINAL_DESTINATION)),
String.valueOf(cd.get(TEXT)),
stream(String.valueOf(cd.get(PROPERTIES_TEXT)).split(","))
.filter(e -> e.contains(CONSUMER)).findFirst().get().replaceFirst(CONSUMER + "=", "").trim()))
.collect(toList());

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ public class MessageData {
private String msgId;
private String originalDestination;
private JsonObject msgContent;
private String consumer;

public MessageData(final String msgId, final String originalDestination, final String msgText) {
public MessageData(final String msgId, final String originalDestination, final String msgText, final String consumer) {
this.msgId = msgId;
this.originalDestination = originalDestination;
this.consumer = consumer;

try(final JsonReader jsonReader = Json.createReader(new StringReader(String.valueOf(msgText)))) {
this.msgContent = jsonReader.readObject();
Expand All @@ -32,4 +34,6 @@ public String getOriginalDestination() {
public JsonObject getMsgContent() {
return msgContent;
}

public String getConsumer() { return consumer; }
}
3 changes: 2 additions & 1 deletion src/main/java/uk/gov/justice/output/ConsolePrinter.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ private String jsonStringOf(final List<MessageData> messageData) {
jsonResponse
.add(Json.createObjectBuilder().add("msgId", String.valueOf(md.getMsgId()))
.add("originalDestination", String.valueOf(md.getOriginalDestination()))
.add("msgContent", md.getMsgContent()));
.add("msgContent", md.getMsgContent())
.add("consumer",md.getConsumer()));

}
return jsonResponse.build().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public static void afterClass() throws JMSException {
public void shouldBrowseMessagesInDLQ() throws Exception {
cleanQueue(DLQ);

putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name\",\"id\":\"c97c5b7b-abc3-49d4-96a9-bcd83aa4ea12\"}}", "jms.queue.abracadabra");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\",\"id\":\"c97c5b7b-abc3-49d4-96a9-bcd83aa4ea13\"}}", "jms.queue.hocuspocus");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name\",\"id\":\"c97c5b7b-abc3-49d4-96a9-bcd83aa4ea12\"}}", "consumer1", "jms.queue.abracadabra");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\",\"id\":\"c97c5b7b-abc3-49d4-96a9-bcd83aa4ea13\"}}", "consumer2", "jms.queue.hocuspocus");

final Output output = execute(COMMAND_LINE_BROWSE);
assertThat(output.errorOutput, isEmptyString());
Expand All @@ -59,11 +59,13 @@ public void shouldBrowseMessagesInDLQ() throws Exception {
assertThat(standardOutput, hasJsonPath("$[0].originalDestination", equalTo("jms.queue.abracadabra")));
assertThat(standardOutput, hasJsonPath("$[0].msgContent._metadata.name", equalTo("some.name")));
assertThat(standardOutput, hasJsonPath("$[0].msgContent._metadata.id", equalTo("c97c5b7b-abc3-49d4-96a9-bcd83aa4ea12")));
assertThat(standardOutput, hasJsonPath("$[0].consumer", equalTo("consumer1")));

assertThat(standardOutput, hasJsonPath("$[1].msgId"));
assertThat(standardOutput, hasJsonPath("$[1].originalDestination", equalTo("jms.queue.hocuspocus")));
assertThat(standardOutput, hasJsonPath("$[1].msgContent._metadata.name", equalTo("some.other.name")));
assertThat(standardOutput, hasJsonPath("$[1].msgContent._metadata.id", equalTo("c97c5b7b-abc3-49d4-96a9-bcd83aa4ea13")));
assertThat(standardOutput, hasJsonPath("$[1].consumer", equalTo("consumer2")));

}

Expand Down Expand Up @@ -119,7 +121,7 @@ public void shouldReturnInfoIfMessageNotound() throws IOException {
public void shouldRemoveMultipleMessagesReadingIdsFromSystemInput() throws Exception {
if (notWindows()) {
setDefaultDLQMessages();
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name2\"}}", "jms.queue.abracadabra2");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name2\"}}", "consumer1", "jms.queue.abracadabra2");


final String messageData = standardOutputOf(COMMAND_LINE_BROWSE);
Expand Down Expand Up @@ -211,8 +213,8 @@ public void shouldReprocessLargeMessageOntoOriginalQueue() throws Exception {
consumerOf("abracadabra");
consumerOf("hocuspocus");

putInQueue(DLQ, createLargeMessage(4024L), "jms.queue.abracadabra");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\"}}", "jms.queue.hocuspocus");
putInQueue(DLQ, createLargeMessage(4024L), "consumer1", "jms.queue.abracadabra");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\"}}", "consumer2", "jms.queue.hocuspocus");

assertDLQHasSizeOf(2);

Expand All @@ -233,8 +235,8 @@ public void shouldReprocessLargeMessageOntoOriginalQueue() throws Exception {
private void setDefaultDLQMessages() throws JMSException, IOException {
cleanQueue(DLQ);

putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name\"}}", "jms.queue.abracadabra");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\"}}", "jms.queue.hocuspocus");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name\"}}", "consumer1", "jms.queue.abracadabra");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\"}}", "consumer2", "jms.queue.hocuspocus");

assertDLQHasSizeOf(2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,20 @@ public void shouldReturnMessagesFromQueue() throws Exception {

cleanQueue(queue);

putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2");
putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2", "origQueueO2");

final List<MessageData> messageData = combinedArtemisConnector.messagesOf("localhost", "61616", "0.0.0.0", queue);
assertThat(messageData, hasSize(2));
assertThat(messageData.get(0).getMsgId(), not(nullValue()));
assertThat(messageData.get(0).getOriginalDestination(), is("origQueueO1"));
assertThat(messageData.get(0).getMsgContent().getString("key1"), is("value123"));
assertThat(messageData.get(0).getConsumer(), is("consumer1"));

assertThat(messageData.get(1).getMsgId(), not(nullValue()));
assertThat(messageData.get(1).getOriginalDestination(), is("origQueueO2"));
assertThat(messageData.get(1).getMsgContent().getString("key1"), is("valueBB"));
assertThat(messageData.get(1).getConsumer(), is("consumer2"));
}

@Test
Expand All @@ -62,7 +64,7 @@ public void shouldReturnUnsupportedMessageTextForByteMessage() throws Exception

final ByteArrayInputStream messageInput = new ByteArrayInputStream("{\"key1\":\"value123\"}".getBytes());

putInQueue(queue, messageInput, "origQueueO1");
putInQueue(queue, messageInput, "consumer1", "origQueueO1");

final List<MessageData> messageData = combinedArtemisConnector.messagesOf("localhost", "61616", "0.0.0.0", queue);
assertThat(messageData, hasSize(1));
Expand All @@ -77,9 +79,9 @@ public void shouldRemoveMessagesFromQueue() throws Exception {

cleanQueue(queue);

putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2");
putInQueue(queue, "{\"key1\":\"valueCC\"}", "origQueueO3");
putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2", "origQueueO2");
putInQueue(queue, "{\"key1\":\"valueCC\"}", "consumer3", "origQueueO3");

final List<MessageData> messageData = combinedArtemisConnector.messagesOf("localhost", "61616", "0.0.0.0", queue);
assertThat(messageData, hasSize(3));
Expand All @@ -98,8 +100,8 @@ public void shouldIgnoreMessagesNotInTheQueue() throws Exception {

cleanQueue(queue);

putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2");
putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2", "origQueueO2");

final List<MessageData> messageData = combinedArtemisConnector.messagesOf("localhost", "61616", "0.0.0.0", queue);
assertThat(messageData, hasSize(2));
Expand All @@ -118,9 +120,9 @@ public void shouldReturnNumberOfDeletedMessages() throws Exception {

cleanQueue(queue);

putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2");
putInQueue(queue, "{\"key1\":\"valueCC\"}", "origQueueO3");
putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2", "origQueueO2");
putInQueue(queue, "{\"key1\":\"valueCC\"}", "consumer3", "origQueueO3");

final List<MessageData> messageData = combinedArtemisConnector.messagesOf("localhost", "61616", "0.0.0.0", queue);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,20 @@ public void shouldReturnMessagesFromQueue() throws Exception {

cleanQueue(queue);

putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2");
putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1","origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2", "origQueueO2");

final List<MessageData> messageData = jmxArtemisConnector.messagesOf("localhost", "3000", "0.0.0.0", queue);
assertThat(messageData, hasSize(2));
assertThat(messageData.get(0).getMsgId(), not(nullValue()));
assertThat(messageData.get(0).getOriginalDestination(), is("origQueueO1"));
assertThat(messageData.get(0).getMsgContent().getString("key1"), is("value123"));
assertThat(messageData.get(0).getConsumer(), is("consumer1"));

assertThat(messageData.get(1).getMsgId(), not(nullValue()));
assertThat(messageData.get(1).getOriginalDestination(), is("origQueueO2"));
assertThat(messageData.get(1).getMsgContent().getString("key1"), is("valueBB"));
assertThat(messageData.get(1).getConsumer(), is("consumer2"));
}

@Test
Expand All @@ -60,9 +62,9 @@ public void shouldRemoveMessagesFromQueue() throws Exception {

cleanQueue(queue);

putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2");
putInQueue(queue, "{\"key1\":\"valueCC\"}", "origQueueO3");
putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2","origQueueO2");
putInQueue(queue, "{\"key1\":\"valueCC\"}", "consumer3","origQueueO3");

final List<MessageData> messageData = jmxArtemisConnector.messagesOf("localhost", "3000", "0.0.0.0", queue);
assertThat(messageData, hasSize(3));
Expand All @@ -81,8 +83,8 @@ public void shouldIgnoreMessagesNotInTheQueue() throws Exception {

cleanQueue(queue);

putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2");
putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1","origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2","origQueueO2");

final List<MessageData> messageData = jmxArtemisConnector.messagesOf("localhost", "3000", "0.0.0.0", queue);
assertThat(messageData, hasSize(2));
Expand All @@ -101,9 +103,9 @@ public void shouldReturnNumberOfDeletedMessages() throws Exception {

cleanQueue(queue);

putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2");
putInQueue(queue, "{\"key1\":\"valueCC\"}", "origQueueO3");
putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1","origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2","origQueueO2");
putInQueue(queue, "{\"key1\":\"valueCC\"}", "consumer3","origQueueO3");

final List<MessageData> messageData = jmxArtemisConnector.messagesOf("localhost", "3000", "0.0.0.0", queue);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,32 @@
public class JmsTestUtil {

private static final QueueConnectionFactory JMS_CF = new ActiveMQQueueConnectionFactory("tcp://localhost:61616");
private static final String ORIGINAL_DESTINATION = "_AMQ_ORIG_ADDRESS";
private static final String CONSUMER = "_AMQ_ORIG_QUEUE";
private static QueueConnection JMS_CONNECTION;
private static QueueSession JMS_SESSION;
private static Map<String, Queue> QUEUES = new HashMap<>();
private static Map<String, MessageConsumer> CONSUMERS = new HashMap<>();
private static Map<String, MessageProducer> PRODUCERS = new HashMap<>();

public static void putInQueue(final String queueName, final String msgText, final String... origAddress) throws JMSException {
public static void putInQueue(final String queueName, final String msgText, final String consumer, final String... origAddress) throws JMSException {
TextMessage message = JMS_SESSION.createTextMessage(msgText);
if (origAddress.length > 0) {
message.setStringProperty("_AMQ_ORIG_ADDRESS", origAddress[0]);
message.setStringProperty(ORIGINAL_DESTINATION, origAddress[0]);
}
message.setStringProperty(CONSUMER, consumer);
producerOf(queueName).send(message);
}

public static void putInQueue(final String queueName, final InputStream messageInput, final String... origAddress) throws JMSException {
public static void putInQueue(final String queueName, final InputStream messageInput, final String consumer, final String... origAddress) throws JMSException {
final Message message = JMS_SESSION.createBytesMessage();

message.setObjectProperty("JMS_AMQ_InputStream", messageInput);

if (origAddress.length > 0) {
message.setStringProperty("_AMQ_ORIG_ADDRESS", origAddress[0]);
message.setStringProperty(ORIGINAL_DESTINATION, origAddress[0]);
}
message.setStringProperty(CONSUMER, consumer);

producerOf(queueName).send(message);
}
Expand Down

0 comments on commit d215b87

Please sign in to comment.