Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added consumer to output #7

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to

## [Unreleased]

## [2.1.0] - 2018-06-29

###Added
- Output of consumer name so that it's easy to see where the DLQ message was going to

## [2.0.0] - 2017-0-15

### Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class CombinedJmsAndJmxArtemisConnector implements ArtemisConnector {
private static final String ID_PREFIX = "ID:";
private static final String BLANK = "";
private static final String UNSUPPORTED_MESSAGE_CONTENT = "{\"error\": \"Unsupported message content\"}";
private static final String JMS_CONSUMER = "_AMQ_ORIG_QUEUE";

final OutputPrinter outputPrinter = new ConsolePrinter();

Expand Down Expand Up @@ -96,6 +97,7 @@ public List<MessageData> messagesOf(final String host, final String port, final
final String jmsMessageID = message.getJMSMessageID().replaceFirst(ID_PREFIX, BLANK);
final String originalDestination = message.getStringProperty(JMS_ORIGINAL_DESTINATION);
final String text;
final String consumer = message.getStringProperty(JMS_CONSUMER);

if (message instanceof TextMessage) {
final TextMessage textMessage = (TextMessage) message;
Expand All @@ -104,7 +106,7 @@ public List<MessageData> messagesOf(final String host, final String port, final
text = UNSUPPORTED_MESSAGE_CONTENT;
}

messages.add(new MessageData(jmsMessageID, originalDestination, text));
messages.add(new MessageData(jmsMessageID, originalDestination, text, consumer));
}

return messages;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,21 @@ public class JmxArtemisConnector implements ArtemisConnector {
private static final String JMS_MESSAGE_ID = "JMSMessageID";
private static final String ORIGINAL_DESTINATION = "OriginalDestination";
private static final String TEXT = "Text";
private static final String PROPERTIES_TEXT = "PropertiesText";
private static final String CONSUMER = "_AMQ_ORIG_QUEUE";

final OutputPrinter outputPrinter = new ConsolePrinter();

@Override
public List<MessageData> messagesOf(final String host, final String port, final String brokerName, final String destinationName) throws Exception {
final CompositeData[] browseResult = queueControlOf(host, port, brokerName, destinationName).browse();
return stream(browseResult)
.map(cd -> new MessageData(String.valueOf(cd.get(JMS_MESSAGE_ID)).replaceFirst("ID:", ""), String.valueOf(cd.get(ORIGINAL_DESTINATION)), String.valueOf(cd.get(TEXT))))
.map(cd -> new MessageData(
String.valueOf(cd.get(JMS_MESSAGE_ID)).replaceFirst("ID:", ""),
String.valueOf(cd.get(ORIGINAL_DESTINATION)),
String.valueOf(cd.get(TEXT)),
stream(String.valueOf(cd.get(PROPERTIES_TEXT)).split(","))
.filter(e -> e.contains(CONSUMER)).findFirst().get().replaceFirst(CONSUMER + "=", "").trim()))
.collect(toList());

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ public class MessageData {
private String msgId;
private String originalDestination;
private JsonObject msgContent;
private String consumer;

public MessageData(final String msgId, final String originalDestination, final String msgText) {
public MessageData(final String msgId, final String originalDestination, final String msgText, final String consumer) {
this.msgId = msgId;
this.originalDestination = originalDestination;
this.consumer = consumer;

try(final JsonReader jsonReader = Json.createReader(new StringReader(String.valueOf(msgText)))) {
this.msgContent = jsonReader.readObject();
Expand All @@ -32,4 +34,6 @@ public String getOriginalDestination() {
public JsonObject getMsgContent() {
return msgContent;
}

public String getConsumer() { return consumer; }
}
3 changes: 2 additions & 1 deletion src/main/java/uk/gov/justice/output/ConsolePrinter.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ private String jsonStringOf(final List<MessageData> messageData) {
jsonResponse
.add(Json.createObjectBuilder().add("msgId", String.valueOf(md.getMsgId()))
.add("originalDestination", String.valueOf(md.getOriginalDestination()))
.add("msgContent", md.getMsgContent()));
.add("msgContent", md.getMsgContent())
.add("consumer",md.getConsumer()));

}
return jsonResponse.build().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public static void afterClass() throws JMSException {
public void shouldBrowseMessagesInDLQ() throws Exception {
cleanQueue(DLQ);

putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name\",\"id\":\"c97c5b7b-abc3-49d4-96a9-bcd83aa4ea12\"}}", "jms.queue.abracadabra");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\",\"id\":\"c97c5b7b-abc3-49d4-96a9-bcd83aa4ea13\"}}", "jms.queue.hocuspocus");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name\",\"id\":\"c97c5b7b-abc3-49d4-96a9-bcd83aa4ea12\"}}", "consumer1", "jms.queue.abracadabra");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\",\"id\":\"c97c5b7b-abc3-49d4-96a9-bcd83aa4ea13\"}}", "consumer2", "jms.queue.hocuspocus");

final Output output = execute(COMMAND_LINE_BROWSE);
assertThat(output.errorOutput, isEmptyString());
Expand All @@ -59,11 +59,13 @@ public void shouldBrowseMessagesInDLQ() throws Exception {
assertThat(standardOutput, hasJsonPath("$[0].originalDestination", equalTo("jms.queue.abracadabra")));
assertThat(standardOutput, hasJsonPath("$[0].msgContent._metadata.name", equalTo("some.name")));
assertThat(standardOutput, hasJsonPath("$[0].msgContent._metadata.id", equalTo("c97c5b7b-abc3-49d4-96a9-bcd83aa4ea12")));
assertThat(standardOutput, hasJsonPath("$[0].consumer", equalTo("consumer1")));

assertThat(standardOutput, hasJsonPath("$[1].msgId"));
assertThat(standardOutput, hasJsonPath("$[1].originalDestination", equalTo("jms.queue.hocuspocus")));
assertThat(standardOutput, hasJsonPath("$[1].msgContent._metadata.name", equalTo("some.other.name")));
assertThat(standardOutput, hasJsonPath("$[1].msgContent._metadata.id", equalTo("c97c5b7b-abc3-49d4-96a9-bcd83aa4ea13")));
assertThat(standardOutput, hasJsonPath("$[1].consumer", equalTo("consumer2")));

}

Expand Down Expand Up @@ -119,7 +121,7 @@ public void shouldReturnInfoIfMessageNotound() throws IOException {
public void shouldRemoveMultipleMessagesReadingIdsFromSystemInput() throws Exception {
if (notWindows()) {
setDefaultDLQMessages();
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name2\"}}", "jms.queue.abracadabra2");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name2\"}}", "consumer1", "jms.queue.abracadabra2");


final String messageData = standardOutputOf(COMMAND_LINE_BROWSE);
Expand Down Expand Up @@ -211,8 +213,8 @@ public void shouldReprocessLargeMessageOntoOriginalQueue() throws Exception {
consumerOf("abracadabra");
consumerOf("hocuspocus");

putInQueue(DLQ, createLargeMessage(4024L), "jms.queue.abracadabra");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\"}}", "jms.queue.hocuspocus");
putInQueue(DLQ, createLargeMessage(4024L), "consumer1", "jms.queue.abracadabra");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\"}}", "consumer2", "jms.queue.hocuspocus");

assertDLQHasSizeOf(2);

Expand All @@ -233,8 +235,8 @@ public void shouldReprocessLargeMessageOntoOriginalQueue() throws Exception {
private void setDefaultDLQMessages() throws JMSException, IOException {
cleanQueue(DLQ);

putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name\"}}", "jms.queue.abracadabra");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\"}}", "jms.queue.hocuspocus");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name\"}}", "consumer1", "jms.queue.abracadabra");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\"}}", "consumer2", "jms.queue.hocuspocus");

assertDLQHasSizeOf(2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,20 @@ public void shouldReturnMessagesFromQueue() throws Exception {

cleanQueue(queue);

putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2");
putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2", "origQueueO2");

final List<MessageData> messageData = combinedArtemisConnector.messagesOf("localhost", "61616", "0.0.0.0", queue);
assertThat(messageData, hasSize(2));
assertThat(messageData.get(0).getMsgId(), not(nullValue()));
assertThat(messageData.get(0).getOriginalDestination(), is("origQueueO1"));
assertThat(messageData.get(0).getMsgContent().getString("key1"), is("value123"));
assertThat(messageData.get(0).getConsumer(), is("consumer1"));

assertThat(messageData.get(1).getMsgId(), not(nullValue()));
assertThat(messageData.get(1).getOriginalDestination(), is("origQueueO2"));
assertThat(messageData.get(1).getMsgContent().getString("key1"), is("valueBB"));
assertThat(messageData.get(1).getConsumer(), is("consumer2"));
}

@Test
Expand All @@ -62,7 +64,7 @@ public void shouldReturnUnsupportedMessageTextForByteMessage() throws Exception

final ByteArrayInputStream messageInput = new ByteArrayInputStream("{\"key1\":\"value123\"}".getBytes());

putInQueue(queue, messageInput, "origQueueO1");
putInQueue(queue, messageInput, "consumer1", "origQueueO1");

final List<MessageData> messageData = combinedArtemisConnector.messagesOf("localhost", "61616", "0.0.0.0", queue);
assertThat(messageData, hasSize(1));
Expand All @@ -77,9 +79,9 @@ public void shouldRemoveMessagesFromQueue() throws Exception {

cleanQueue(queue);

putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2");
putInQueue(queue, "{\"key1\":\"valueCC\"}", "origQueueO3");
putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2", "origQueueO2");
putInQueue(queue, "{\"key1\":\"valueCC\"}", "consumer3", "origQueueO3");

final List<MessageData> messageData = combinedArtemisConnector.messagesOf("localhost", "61616", "0.0.0.0", queue);
assertThat(messageData, hasSize(3));
Expand All @@ -98,8 +100,8 @@ public void shouldIgnoreMessagesNotInTheQueue() throws Exception {

cleanQueue(queue);

putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2");
putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2", "origQueueO2");

final List<MessageData> messageData = combinedArtemisConnector.messagesOf("localhost", "61616", "0.0.0.0", queue);
assertThat(messageData, hasSize(2));
Expand All @@ -118,9 +120,9 @@ public void shouldReturnNumberOfDeletedMessages() throws Exception {

cleanQueue(queue);

putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2");
putInQueue(queue, "{\"key1\":\"valueCC\"}", "origQueueO3");
putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2", "origQueueO2");
putInQueue(queue, "{\"key1\":\"valueCC\"}", "consumer3", "origQueueO3");

final List<MessageData> messageData = combinedArtemisConnector.messagesOf("localhost", "61616", "0.0.0.0", queue);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,20 @@ public void shouldReturnMessagesFromQueue() throws Exception {

cleanQueue(queue);

putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2");
putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1","origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2", "origQueueO2");

final List<MessageData> messageData = jmxArtemisConnector.messagesOf("localhost", "3000", "0.0.0.0", queue);
assertThat(messageData, hasSize(2));
assertThat(messageData.get(0).getMsgId(), not(nullValue()));
assertThat(messageData.get(0).getOriginalDestination(), is("origQueueO1"));
assertThat(messageData.get(0).getMsgContent().getString("key1"), is("value123"));
assertThat(messageData.get(0).getConsumer(), is("consumer1"));

assertThat(messageData.get(1).getMsgId(), not(nullValue()));
assertThat(messageData.get(1).getOriginalDestination(), is("origQueueO2"));
assertThat(messageData.get(1).getMsgContent().getString("key1"), is("valueBB"));
assertThat(messageData.get(1).getConsumer(), is("consumer2"));
}

@Test
Expand All @@ -60,9 +62,9 @@ public void shouldRemoveMessagesFromQueue() throws Exception {

cleanQueue(queue);

putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2");
putInQueue(queue, "{\"key1\":\"valueCC\"}", "origQueueO3");
putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2","origQueueO2");
putInQueue(queue, "{\"key1\":\"valueCC\"}", "consumer3","origQueueO3");

final List<MessageData> messageData = jmxArtemisConnector.messagesOf("localhost", "3000", "0.0.0.0", queue);
assertThat(messageData, hasSize(3));
Expand All @@ -81,8 +83,8 @@ public void shouldIgnoreMessagesNotInTheQueue() throws Exception {

cleanQueue(queue);

putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2");
putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1","origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2","origQueueO2");

final List<MessageData> messageData = jmxArtemisConnector.messagesOf("localhost", "3000", "0.0.0.0", queue);
assertThat(messageData, hasSize(2));
Expand All @@ -101,9 +103,9 @@ public void shouldReturnNumberOfDeletedMessages() throws Exception {

cleanQueue(queue);

putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2");
putInQueue(queue, "{\"key1\":\"valueCC\"}", "origQueueO3");
putInQueue(queue, "{\"key1\":\"value123\"}", "consumer1","origQueueO1");
putInQueue(queue, "{\"key1\":\"valueBB\"}", "consumer2","origQueueO2");
putInQueue(queue, "{\"key1\":\"valueCC\"}", "consumer3","origQueueO3");

final List<MessageData> messageData = jmxArtemisConnector.messagesOf("localhost", "3000", "0.0.0.0", queue);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,32 @@
public class JmsTestUtil {

private static final QueueConnectionFactory JMS_CF = new ActiveMQQueueConnectionFactory("tcp://localhost:61616");
private static final String ORIGINAL_DESTINATION = "_AMQ_ORIG_ADDRESS";
private static final String CONSUMER = "_AMQ_ORIG_QUEUE";
private static QueueConnection JMS_CONNECTION;
private static QueueSession JMS_SESSION;
private static Map<String, Queue> QUEUES = new HashMap<>();
private static Map<String, MessageConsumer> CONSUMERS = new HashMap<>();
private static Map<String, MessageProducer> PRODUCERS = new HashMap<>();

public static void putInQueue(final String queueName, final String msgText, final String... origAddress) throws JMSException {
public static void putInQueue(final String queueName, final String msgText, final String consumer, final String... origAddress) throws JMSException {
TextMessage message = JMS_SESSION.createTextMessage(msgText);
if (origAddress.length > 0) {
message.setStringProperty("_AMQ_ORIG_ADDRESS", origAddress[0]);
message.setStringProperty(ORIGINAL_DESTINATION, origAddress[0]);
}
message.setStringProperty(CONSUMER, consumer);
producerOf(queueName).send(message);
}

public static void putInQueue(final String queueName, final InputStream messageInput, final String... origAddress) throws JMSException {
public static void putInQueue(final String queueName, final InputStream messageInput, final String consumer, final String... origAddress) throws JMSException {
final Message message = JMS_SESSION.createBytesMessage();

message.setObjectProperty("JMS_AMQ_InputStream", messageInput);

if (origAddress.length > 0) {
message.setStringProperty("_AMQ_ORIG_ADDRESS", origAddress[0]);
message.setStringProperty(ORIGINAL_DESTINATION, origAddress[0]);
}
message.setStringProperty(CONSUMER, consumer);

producerOf(queueName).send(message);
}
Expand Down