Skip to content

Commit

Permalink
TP-973 Add consumer to browse messages output
Browse files Browse the repository at this point in the history
  • Loading branch information
Matthew Rich committed Feb 11, 2019
1 parent e900c44 commit f956063
Show file tree
Hide file tree
Showing 12 changed files with 106 additions and 78 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to

## [Unreleased]

###Added
- Output of consumer name so that it's easy to see where the DLQ message was going to

## [3.1.0] - 2018-10-30

### Added
Expand Down
3 changes: 2 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
<artifactId>maven-parent-pom</artifactId>
<version>1.7.1</version>
</parent>

<properties>
<cpp.repo.name>artemis-manager</cpp.repo.name>
<framework-tools-command.version>2.0.0</framework-tools-command.version>
<artemis.home>${project.build.directory}/apache-artemis-${artemis.version}</artemis.home>
<artemis.version>1.5.6</artemis.version>
<slf4j-version>1.7.10</slf4j-version>
<cpp.repo.name>artemis-manager</cpp.repo.name>
<throwing-function.version>1.3</throwing-function.version>
<jcommander.version>1.48</jcommander.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ public long reprocess(final String destinationName, final Iterator<String> msgId
jmxManagement.reprocessMessages(msgIds)).mapToLong(Long::longValue).sum();
}


@Override
public int reprocessAll(final String destinationName) {
return jmxProcessor.processQueueControl(
Expand All @@ -105,7 +104,6 @@ public int reprocessAll(final String destinationName) {
jmxManagement.reprocessAllMessages()).mapToInt(Integer::intValue).sum();
}


@Override
public List<String> queueNames() {
return jmxProcessor.processServerControl(
Expand All @@ -117,7 +115,6 @@ public List<String> queueNames() {
distinct().collect(toList());
}


@Override
public Map<String, Long> queueMessageCount(final Collection<String> queueNames) {
return jmxProcessor.processQueues(this.jmxServiceUrls,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,23 @@ public class JmxArtemisConnector implements ArtemisConnector {
private final JmxManagement jmxManagement = new JmxManagement(new ConsolePrinter());

private List<JMXServiceURL> jmxServiceUrls;
private Map<String,String[]> jmxEnvironment = new HashMap<>();
private Map<String, String[]> jmxEnvironment = new HashMap<>();
private ObjectNameBuilder objectNameBuilder;

@Override
public void setParameters(final List<String> jmxUrls,
final String brokerName,
final String jmxUsername,
final String jmxPassword,
final String jmsUrl,
final String jmsUsername,
final String jmsPassword) {
final String brokerName,
final String jmxUsername,
final String jmxPassword,
final String jmsUrl,
final String jmsUsername,
final String jmsPassword) {
this.jmxServiceUrls = jmxProcessor.processJmxUrls(jmxUrls);
this.objectNameBuilder = jmxProcessor.getObjectNameBuilder(brokerName);

if ((jmxUsername != null) && (jmxPassword != null)) {
this.jmxEnvironment = new HashMap<>();
this.jmxEnvironment.put(JMXConnector.CREDENTIALS, new String[]{ jmxUsername, jmxPassword });
this.jmxEnvironment.put(JMXConnector.CREDENTIALS, new String[]{jmxUsername, jmxPassword});
} else {
this.jmxEnvironment = emptyMap();
}
Expand All @@ -56,10 +56,10 @@ public void setParameters(final List<String> jmxUrls,
@Override
public List<MessageData> messagesOf(final String destinationName) {
return jmxProcessor.processQueueControl(this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
destinationName,
jmxManagement.browseMessages()).flatMap(
this.jmxEnvironment,
this.objectNameBuilder,
destinationName,
jmxManagement.browseMessages()).flatMap(
Collection::stream).collect(toList());
}

Expand Down Expand Up @@ -95,10 +95,10 @@ public int reprocessAll(final String destinationName) {
@Override
public List<String> queueNames() {
return jmxProcessor.processServerControl(
this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
JMSServerControl::getQueueNames).flatMap(
this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
JMSServerControl::getQueueNames).flatMap(
Arrays::stream).sorted().
distinct().collect(toList());
}
Expand All @@ -107,34 +107,34 @@ public List<String> queueNames() {
@Override
public Map<String, Long> queueMessageCount(final Collection<String> queueNames) {
return jmxProcessor.processQueues(this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
queueNames,
unchecked(DestinationControl::getMessageCount)).flatMap(
this.jmxEnvironment,
this.objectNameBuilder,
queueNames,
unchecked(DestinationControl::getMessageCount)).flatMap(
m -> m.entrySet().stream()).collect(
groupingBy(Entry::getKey,
summingLong(Entry::getValue)));
groupingBy(Entry::getKey,
summingLong(Entry::getValue)));
}

@Override
public List<String> topicNames() {
return jmxProcessor.processServerControl(this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
JMSServerControl::getTopicNames).flatMap(
this.jmxEnvironment,
this.objectNameBuilder,
JMSServerControl::getTopicNames).flatMap(
Arrays::stream).sorted().
distinct().collect(toList());
distinct().collect(toList());
}

@Override
public Map<String, Long> topicMessageCount(final Collection<String> topicNames) {
return jmxProcessor.processTopics(this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
topicNames,
unchecked(DestinationControl::getMessageCount)).flatMap(
this.jmxEnvironment,
this.objectNameBuilder,
topicNames,
unchecked(DestinationControl::getMessageCount)).flatMap(
m -> m.entrySet().stream()).collect(
groupingBy(Entry::getKey,
summingLong(Entry::getValue)));
groupingBy(Entry::getKey,
summingLong(Entry::getValue)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ public class MessageData {
private String msgId;
private String originalDestination;
private JsonObject msgContent;
private String consumer;

public MessageData(final String msgId, final String originalDestination, final String msgText) {
public MessageData(final String msgId, final String originalDestination, final String msgText, final String consumer) {
this.msgId = msgId;
this.originalDestination = originalDestination;
this.consumer = consumer;

try(final JsonReader jsonReader = Json.createReader(new StringReader(String.valueOf(msgText)))) {
this.msgContent = jsonReader.readObject();
Expand All @@ -32,4 +34,6 @@ public String getOriginalDestination() {
public JsonObject getMsgContent() {
return msgContent;
}

public String getConsumer() { return consumer; }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package uk.gov.justice.artemis.manager.connector.jms;

import static java.util.Arrays.stream;

import uk.gov.justice.artemis.manager.connector.MessageData;

import java.util.ArrayList;
Expand All @@ -16,6 +18,7 @@ public class JmsManagement {
private static final String ID_PREFIX = "ID:";
private static final String BLANK = "";
private static final String UNSUPPORTED_MESSAGE_CONTENT = "{\"error\": \"Unsupported message content\"}";
private static final String CONSUMER = "_AMQ_ORIG_QUEUE";

public JmsManagementFunction<List<MessageData>> browseMessages() {
return queueBrowser -> {
Expand All @@ -29,6 +32,7 @@ public JmsManagementFunction<List<MessageData>> browseMessages() {

final String jmsMessageID = message.getJMSMessageID().replaceFirst(ID_PREFIX, BLANK);
final String originalDestination = message.getStringProperty(JMS_ORIGINAL_DESTINATION);
final String consumer = message.getStringProperty(CONSUMER);
final String text;

if (message instanceof TextMessage) {
Expand All @@ -38,7 +42,7 @@ public JmsManagementFunction<List<MessageData>> browseMessages() {
text = UNSUPPORTED_MESSAGE_CONTENT;
}

messages.add(new MessageData(jmsMessageID, originalDestination, text));
messages.add(new MessageData(jmsMessageID, originalDestination, text, consumer));
}

return messages;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public class JmxManagement {
private static final String JMS_MESSAGE_ID = "JMSMessageID";
private static final String ORIGINAL_DESTINATION = "OriginalDestination";
private static final String TEXT = "Text";
private static final String PROPERTIES_TEXT = "PropertiesText";
private static final String CONSUMER = "_AMQ_ORIG_QUEUE";
private final OutputPrinter outputPrinter;

public JmxManagement(final OutputPrinter outputPrinter) {
Expand All @@ -33,8 +35,10 @@ public JmxManagementFunction<List<MessageData>> browseMessages() {
final String jmsMessageId = String.valueOf(message.get(JMS_MESSAGE_ID)).replaceFirst("ID:", "");
final String originalDestination = String.valueOf(message.get(ORIGINAL_DESTINATION));
final String text = String.valueOf(message.get(TEXT));
final String consumer = stream(String.valueOf(message.get(PROPERTIES_TEXT)).split(","))
.filter(e -> e.contains(CONSUMER)).findFirst().get().replaceFirst(CONSUMER + "=", "").trim();

return new MessageData(jmsMessageId, originalDestination, text);
return new MessageData(jmsMessageId, originalDestination, text, consumer);
})
.collect(toList());

Expand Down
14 changes: 9 additions & 5 deletions src/main/java/uk/gov/justice/output/ConsolePrinter.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ private JsonObject toJsonObject(Map.Entry<String, Long> entry, String valueName)
public void writeMap(final Map<String, Long> map, String valueName) {
final JsonArrayBuilder arrayBuilder = Json.createArrayBuilder();
map.entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(
entry -> arrayBuilder.add(this.toJsonObject(entry, valueName))
entry -> arrayBuilder.add(this.toJsonObject(entry, valueName))
);
System.out.println(arrayBuilder.build().toString());
}
Expand All @@ -62,11 +62,15 @@ public void writeException(final Throwable throwable) {
private String jsonStringOf(final List<MessageData> messageData) {
final JsonArrayBuilder jsonResponse = Json.createArrayBuilder();
for (MessageData md : messageData) {
jsonResponse
.add(Json.createObjectBuilder().add("msgId", String.valueOf(md.getMsgId()))
.add("originalDestination", String.valueOf(md.getOriginalDestination()))
.add("msgContent", md.getMsgContent()));
JsonObjectBuilder builder = Json.createObjectBuilder().add("msgId", String.valueOf(md.getMsgId()))
.add("originalDestination", String.valueOf(md.getOriginalDestination()))
.add("msgContent", md.getMsgContent());

if (md.getConsumer() != null) {
builder.add("consumer", md.getConsumer());
}

jsonResponse.add(builder.build());
}
return jsonResponse.build().toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public static void afterClass() throws JMSException {
public void shouldBrowseMessagesInDLQ() throws Exception {
cleanQueue(DLQ);

putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name\",\"id\":\"c97c5b7b-abc3-49d4-96a9-bcd83aa4ea12\"}}", "jms.queue.abracadabra");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\",\"id\":\"c97c5b7b-abc3-49d4-96a9-bcd83aa4ea13\"}}", "jms.queue.hocuspocus");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name\",\"id\":\"c97c5b7b-abc3-49d4-96a9-bcd83aa4ea12\"}}", "consumer1", "jms.queue.abracadabra");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\",\"id\":\"c97c5b7b-abc3-49d4-96a9-bcd83aa4ea13\"}}", "consumer2", "jms.queue.hocuspocus");

final Output output = execute(COMMAND_LINE_BROWSE);
assertThat(output.errorOutput, isEmptyString());
Expand All @@ -58,11 +58,13 @@ public void shouldBrowseMessagesInDLQ() throws Exception {
assertThat(standardOutput, hasJsonPath("$[0].originalDestination", equalTo("jms.queue.abracadabra")));
assertThat(standardOutput, hasJsonPath("$[0].msgContent._metadata.name", equalTo("some.name")));
assertThat(standardOutput, hasJsonPath("$[0].msgContent._metadata.id", equalTo("c97c5b7b-abc3-49d4-96a9-bcd83aa4ea12")));
assertThat(standardOutput, hasJsonPath("$[0].consumer", equalTo("consumer1")));

assertThat(standardOutput, hasJsonPath("$[1].msgId"));
assertThat(standardOutput, hasJsonPath("$[1].originalDestination", equalTo("jms.queue.hocuspocus")));
assertThat(standardOutput, hasJsonPath("$[1].msgContent._metadata.name", equalTo("some.other.name")));
assertThat(standardOutput, hasJsonPath("$[1].msgContent._metadata.id", equalTo("c97c5b7b-abc3-49d4-96a9-bcd83aa4ea13")));
assertThat(standardOutput, hasJsonPath("$[1].consumer", equalTo("consumer2")));
}

@Test
Expand Down Expand Up @@ -96,7 +98,7 @@ public void shouldReturnInfoIfMessageNotound() throws IOException {
public void shouldRemoveMultipleMessagesReadingIdsFromSystemInput() throws Exception {
if (notWindows()) {
setDefaultDLQMessages();
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name2\"}}", "jms.queue.abracadabra2");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name2\"}}", "consumer1", "jms.queue.abracadabra2");


final String messageData = standardOutputOf(COMMAND_LINE_BROWSE);
Expand Down Expand Up @@ -188,8 +190,8 @@ public void shouldReprocessLargeMessageOntoOriginalQueue() throws Exception {
consumerOf("abracadabra");
consumerOf("hocuspocus");

putInQueue(DLQ, createLargeMessage(4024L), "jms.queue.abracadabra");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\"}}", "jms.queue.hocuspocus");
putInQueue(DLQ, createLargeMessage(4024L), "consumer1", "jms.queue.abracadabra");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\"}}", "consumer2", "jms.queue.hocuspocus");

assertDLQHasSizeOf(2);

Expand All @@ -210,8 +212,8 @@ public void shouldReprocessLargeMessageOntoOriginalQueue() throws Exception {
private void setDefaultDLQMessages() throws JMSException, IOException {
cleanQueue(DLQ);

putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name\"}}", "jms.queue.abracadabra");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\"}}", "jms.queue.hocuspocus");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.name\"}}", "consumer1", "jms.queue.abracadabra");
putInQueue(DLQ, "{\"_metadata\":{\"name\":\"some.other.name\"}}", "consumer2", "jms.queue.hocuspocus");

assertDLQHasSizeOf(2);
}
Expand Down
Loading

0 comments on commit f956063

Please sign in to comment.