Skip to content

Commit

Permalink
Merge b64c639 into 074e13e
Browse files Browse the repository at this point in the history
  • Loading branch information
bdellegrazie committed Aug 20, 2018
2 parents 074e13e + b64c639 commit ebcabff
Show file tree
Hide file tree
Showing 16 changed files with 323 additions and 24 deletions.
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<artemis.version>1.5.6</artemis.version>
<slf4j-version>1.7.10</slf4j-version>
<cpp.repo.name>artemis-manager</cpp.repo.name>
<throwing-function.version>1.3</throwing-function.version>
</properties>

<artifactId>artemis-manager</artifactId>
Expand Down Expand Up @@ -60,6 +61,11 @@
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
<dependency>
<groupId>pl.touk</groupId>
<artifactId>throwing-function</artifactId>
<version>${throwing-function.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.util.Iterator;
import java.util.List;
import java.util.Map;

public interface ArtemisConnector {

Expand All @@ -14,4 +15,8 @@ public interface ArtemisConnector {
String[] queueNames(final String host, final String port, final String brokerName) throws Exception;

String[] topicNames(final String host, final String port, final String brokerName) throws Exception;

Map<String, Long> queueMessageCount(final String host, final String port, final String brokerName, final String[] queueNames) throws Exception;

Map<String, Long> topicMessageCount(final String host, final String port, final String brokerName, final String[] topicNames) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
package uk.gov.justice.artemis.manager.connector;

import uk.gov.justice.artemis.manager.connector.jms.JmsManagement;
import uk.gov.justice.artemis.manager.connector.jms.JmsProcessor;
import uk.gov.justice.artemis.manager.connector.jmx.JmxManagement;
import uk.gov.justice.artemis.manager.connector.jmx.JmxProcessor;
import uk.gov.justice.output.ConsolePrinter;
import static pl.touk.throwing.ThrowingFunction.unchecked;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.activemq.artemis.api.jms.management.DestinationControl;
import org.apache.activemq.artemis.api.jms.management.JMSServerControl;

import uk.gov.justice.artemis.manager.connector.jms.JmsManagement;
import uk.gov.justice.artemis.manager.connector.jms.JmsProcessor;
import uk.gov.justice.artemis.manager.connector.jmx.JmxManagement;
import uk.gov.justice.artemis.manager.connector.jmx.JmxProcessor;
import uk.gov.justice.output.ConsolePrinter;

/**
* reprocess, remove and messagesOf were re-implemented in JMS due to issues with large messages
* over JMX.
Expand Down Expand Up @@ -42,8 +46,18 @@ public String[] queueNames(final String host, final String port, final String br
return jmxProcessor.processServerControl(host, port, brokerName, JMSServerControl::getQueueNames);
}

@Override
public Map<String, Long> queueMessageCount(final String host, final String port, final String brokerName, final String[] queueNames) throws Exception {
return jmxProcessor.processQueues(host, port, brokerName, queueNames, unchecked(DestinationControl::getMessageCount));
}

@Override
public String[] topicNames(final String host, final String port, final String brokerName) throws Exception {
return jmxProcessor.processServerControl(host, port, brokerName, JMSServerControl::getTopicNames);
}

@Override
public Map<String, Long> topicMessageCount(final String host, final String port, final String brokerName, final String[] topicNames) throws Exception {
return jmxProcessor.processTopics(host, port, brokerName, topicNames, unchecked(DestinationControl::getMessageCount));
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package uk.gov.justice.artemis.manager.connector;

import uk.gov.justice.artemis.manager.connector.jmx.JmxManagement;
import uk.gov.justice.artemis.manager.connector.jmx.JmxProcessor;
import uk.gov.justice.output.ConsolePrinter;
import static pl.touk.throwing.ThrowingFunction.unchecked;

import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.activemq.artemis.api.jms.management.DestinationControl;
import org.apache.activemq.artemis.api.jms.management.JMSServerControl;

import uk.gov.justice.artemis.manager.connector.jmx.JmxManagement;
import uk.gov.justice.artemis.manager.connector.jmx.JmxProcessor;
import uk.gov.justice.output.ConsolePrinter;

public class JmxArtemisConnector implements ArtemisConnector {

private final JmxProcessor jmxProcessor = new JmxProcessor();
Expand All @@ -34,8 +38,18 @@ public String[] queueNames(final String host, final String port, final String br
return jmxProcessor.processServerControl(host, port, brokerName, JMSServerControl::getQueueNames);
}

@Override
public Map<String, Long> queueMessageCount(final String host, final String port, final String brokerName, final String[] queueNames) throws Exception {
return jmxProcessor.processQueues(host, port, brokerName, queueNames, unchecked(DestinationControl::getMessageCount));
}

@Override
public String[] topicNames(final String host, final String port, final String brokerName) throws Exception {
return jmxProcessor.processServerControl(host, port, brokerName, JMSServerControl::getTopicNames);
}

@Override
public Map<String, Long> topicMessageCount(final String host, final String port, final String brokerName, final String[] topicNames) throws Exception {
return jmxProcessor.processTopics(host, port, brokerName, topicNames, unchecked(DestinationControl::getMessageCount));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@

import static java.lang.String.format;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toMap;
import static javax.management.MBeanServerInvocationHandler.newProxyInstance;
import static javax.management.remote.JMXConnectorFactory.connect;
import static org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration.getDefaultJmxDomain;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.function.Function;

import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
Expand All @@ -15,6 +19,7 @@
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.jms.management.JMSQueueControl;
import org.apache.activemq.artemis.api.jms.management.JMSServerControl;
import org.apache.activemq.artemis.api.jms.management.TopicControl;

public class JmxProcessor {

Expand All @@ -36,12 +41,44 @@ public <T> T processQueueControl(final String host,
public <T> T processServerControl(final String host,
final String port,
final String brokerName,
final JmxServerControlFunction<T> jmxServerControlFunction) throws Exception {
final Function<JMSServerControl, T> fn) throws Exception {

try (final JMXConnector connector = getJMXConnector(host, port)) {
final JMSServerControl serverControl = serverControlOf(connector, brokerName);

return jmxServerControlFunction.apply(serverControl);
return fn.apply(serverControl);
}
}

public <T> Map<String, T> processQueues(final String host,
final String port,
final String brokerName,
final String[] destinations,
final Function<JMSQueueControl, T> fn) throws Exception {
try (final JMXConnector connector = getJMXConnector(host, port)) {
return Arrays.stream(destinations).collect(toMap(Function.identity(), destination -> {
try {
return processQueueControl(connector, brokerName, destination, fn);
} catch (Exception e) {
throw new RuntimeException(e);
}
}));
}
}

public <T> Map<String, T> processTopics(final String host,
final String port,
final String brokerName,
final String[] destinations,
final Function<TopicControl, T> fn) throws Exception {
try (final JMXConnector connector = getJMXConnector(host, port)) {
return Arrays.stream(destinations).collect(toMap(Function.identity(), destination -> {
try {
return processTopicControl(connector, brokerName, destination, fn);
} catch (Exception e) {
throw new RuntimeException(e);
}
}));
}
}

Expand All @@ -54,8 +91,29 @@ private JMSQueueControl queueControlOf(final JMXConnector connector, final Strin
return newProxyInstance(connector.getMBeanServerConnection(), on, JMSQueueControl.class, false);
}

private TopicControl topicControlOf(final JMXConnector connector, final String brokerName, final String destinationName) throws Exception {
final ObjectName on = ObjectNameBuilder.create(getDefaultJmxDomain(), brokerName, true).getJMSTopicObjectName(destinationName);
return newProxyInstance(connector.getMBeanServerConnection(), on, TopicControl.class, false);
}

private JMSServerControl serverControlOf(final JMXConnector connector, final String brokerName) throws Exception {
final ObjectName on = ObjectNameBuilder.create(getDefaultJmxDomain(), brokerName, true).getJMSServerObjectName();
return newProxyInstance(connector.getMBeanServerConnection(), on, JMSServerControl.class, false);
}
}

private <T> T processQueueControl(final JMXConnector connector,
final String brokerName,
final String destination,
final Function<JMSQueueControl, T> fn) throws Exception {
final JMSQueueControl queueControl = queueControlOf(connector, brokerName, destination);
return fn.apply(queueControl);
}

private <T> T processTopicControl(final JMXConnector connector,
final String brokerName,
final String destination,
final Function<TopicControl, T> fn) throws Exception {
final TopicControl topicControl = topicControlOf(connector, brokerName, destination);
return fn.apply(topicControl);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
public class ListQueues extends AbstractArtemisCommand implements ShellCommand {

@Override
public void run(String[] args) {
public void run(final String[] args) {

try {
final String[] queues = artemisConnector.queueNames(host, port, brokerName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
public class ListTopics extends AbstractArtemisCommand implements ShellCommand {

@Override
public void run(String[] args) {
public void run(final String[] args) {

try {
final String[] queues = artemisConnector.topicNames(host, port, brokerName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package uk.gov.justice.framework.tools.command;

import java.util.Map;

import uk.gov.justice.framework.tools.common.command.ShellCommand;

public class QueueMessageCounts extends AbstractArtemisCommand implements ShellCommand {

@Override
public void run(final String[] args) {

try {
final String[] queues = artemisConnector.queueNames(host, port, brokerName);
final Map<String, Long> counts = artemisConnector.queueMessageCount(host, port, brokerName, queues);
outputPrinter.writeMap(counts, "messageCount");
} catch (final Exception exception) {
outputPrinter.writeStackTrace(exception);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package uk.gov.justice.framework.tools.command;

import java.util.Map;

import uk.gov.justice.framework.tools.common.command.ShellCommand;

public class TopicMessageCounts extends AbstractArtemisCommand implements ShellCommand {

@Override
public void run(final String[] args) {

try {
final String[] topics = artemisConnector.topicNames(host, port, brokerName);
final Map<String, Long> counts = artemisConnector.topicMessageCount(host, port, brokerName, topics);
outputPrinter.writeMap(counts, "messageCount");
} catch (final Exception exception) {
outputPrinter.writeStackTrace(exception);
}
}

}
18 changes: 18 additions & 0 deletions src/main/java/uk/gov/justice/output/ConsolePrinter.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
import uk.gov.justice.artemis.manager.connector.MessageData;

import java.util.List;
import java.util.Map;

import javax.json.Json;
import javax.json.JsonArrayBuilder;
import javax.json.JsonObject;
import javax.json.JsonObjectBuilder;

public class ConsolePrinter implements OutputPrinter {
Expand All @@ -25,6 +27,22 @@ public void writeStringArray(final String[] items) {
System.out.println(jsonStringOf(items));
}

private JsonObject toJsonObject(Map.Entry<String, Long> entry, String valueName) {
final JsonObjectBuilder builder = Json.createObjectBuilder();
builder.add("name", entry.getKey());
builder.add(valueName, entry.getValue());
return builder.build();
}

@Override
public void writeMap(final Map<String, Long> map, String valueName) {
final JsonArrayBuilder arrayBuilder = Json.createArrayBuilder();
map.entrySet().stream().sorted(Map.Entry.comparingByKey()).forEach(
entry -> arrayBuilder.add(this.toJsonObject(entry, valueName))
);
System.out.println(arrayBuilder.build().toString());
}

@Override
public void writeMessages(final List<MessageData> messageData) {
System.out.println(jsonStringOf(messageData));
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/uk/gov/justice/output/OutputPrinter.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import uk.gov.justice.artemis.manager.connector.MessageData;

import java.util.List;
import java.util.Map;

public interface OutputPrinter {

Expand All @@ -25,6 +26,11 @@ public interface OutputPrinter {
*/
public void writeStringArray(final String[] items);

/**
* Writes a list of Strings to the output in JSON Array style
* @param items - the list of items to be written
*/
public void writeMap(Map<String, Long> map, String valueName);
/**
* Writes a list of {@link MessageData} content to the output.
* @param messageData - the list of data to be written.
Expand Down
Loading

0 comments on commit ebcabff

Please sign in to comment.