Skip to content

Commit

Permalink
Merge fc3f3b4 into 3a57991
Browse files Browse the repository at this point in the history
  • Loading branch information
bdellegrazie committed Sep 6, 2018
2 parents 3a57991 + fc3f3b4 commit 236a575
Show file tree
Hide file tree
Showing 34 changed files with 750 additions and 330 deletions.
52 changes: 35 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,50 +2,68 @@

[![Build Status](https://travis-ci.org/CJSCommonPlatform/artemis-manager.svg?branch=master)](https://travis-ci.org/CJSCommonPlatform/artemis-manager) [![Coverage Status](https://coveralls.io/repos/github/CJSCommonPlatform/artemis-manager/badge.svg?branch=master)](https://coveralls.io/github/CJSCommonPlatform/artemis-manager?branch=master)

## Configuration

## Browse DLQ
Configuration can be supplied in a config file and passed to the application via a '@<config_file_path>' option.

* -jmxUrl: The full JMX url, can be used multiple times for clusters. (default: service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi)
* -brokerName: Name of the broker (default: "default")
* -jmxUsername: User for JMX (default none)
* -jmxPassword: Password for JMX (default none)
* -jmsUrl: The JMS url, you should add the clientID as above. You can also add sslEnabled=true to get SSL capability (default: tcp://localhost:61616?clientID=artemis-manager)
* -jmsUsername: User for JMS (default none)
* -jmsPassword: Password for JMS (default none)

A complicated example configuration file might look like:

**Note: Browse uses JMS tcp port to connect to the Artemis broker.**
Assuming two brokers on 192.168.0.10 and 192.168.0.11, with JMX on 1098 and OpenWire on 61616 (but no security)

_java -jar artemis-manager.jar browse -host localhost -port 61616 -brokerName default_
```
-jmxUrl service:jmx:rmi:///jndi/rmi://192.168.0.10:1098/jmxrmi
-jmxUrl service:jmx:rmi:///jndi/rmi://192.168.0.11:1098/jmxrmi
-jmsUrl tcp://(192.168.0.10:61616,192.168.0.11:61616)?clientID=artemis-manager&sslEnabled=true
```

* port - JMS tcp port
* brokerName - name of the broker configured in the broker.xml, use _default_ if no specific broker configuration provided
In the examples below it is assumed a configuration file of artemis.config has been created like that shown above

## Browse DLQ

**Note: Browse uses JMS to connect to the Artemis broker.**

`java -jar artemis-manager.jar browse @artemis.config`

## Remove Message from DLQ

* Remove message by id

**Note: Remove uses JMX port to connect to the Artemis broker.**
**Note: Remove uses JMX to connect to the Artemis broker.**

_java -jar artemis-manager.jar remove -host localhost -port 3000 -brokerName default -msgId 12d8e63e-c842-11e6-986d-00e1000074d2_

* port - JMX port
* brokerName - name of the broker configured in the broker.xml, use _default_ if no specific broker configuration provided
`java -jar artemis-manager.jar remove @artemis.config -msgId 12d8e63e-c842-11e6-986d-00e1000074d2`

* Remove multiple messages (provide list of message ids on input)

_echo msgId1 msgId2 | java -jar artemis-manager.jar remove -host localhost -port 3000 -brokerName default_
`echo msgId1 msgId2 | java -jar artemis-manager.jar remove`

## Reprocess Message from DLQ

* Reprocess message by id

**Note: Reprocess uses JMX port to connect to the Artemis broker.**

_java -jar artemis-manager.jar reprocess -host localhost -port 3000 -brokerName default -msgId 12d8e63e-c842-11e6-986d-00e1000074d2_

* port - JMX port
* brokerName - name of the broker configured in the broker.xml, use _default_ if no specific broker configuration provided
`java -jar artemis-manager.jar reprocess @artemis.config -msgId 12d8e63e-c842-11e6-986d-00e1000074d2`

* Reprocess multiple messages (provide list of message ids on input)

_echo msgId1 msgId2 | java -jar artemis-manager.jar reprocess -host localhost -port 3000 -brokerName default_
`echo msgId1 msgId2 | java -jar artemis-manager.jar reprocess @artemis.config`

## Chaining Commands

* Chaining commands

This will remove all messages from DLQ that have been originally sent to the queue abracadabra

_java -jar target/artemis-manager.jar browse -host localhost -port 3000 -brokerName default | jgrep originalDestination=jms.queue.abracadabra -s msgId | java -jar target/artemis-manager.jar remove -host localhost -port 3000 -brokerName default_
```
java -jar target/artemis-manager.jar browse @artemis.config |\
jgrep originalDestination=jms.queue.abracadabra -s msgId |\
java -jar target/artemis-manager.jar remove @artemis.config
```
4 changes: 3 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,10 @@
<configuration>
<ignore>false</ignore>
<home>${artemis.home}</home>
<javaOptions>-Dcom.sun.management.jmxremote \
<javaOptions>-Dcom.sun.management.jmxremote=true \
-Dcom.sun.management.jmxremote.rmi.port=3000 \
-Dcom.sun.management.jmxremote.port=3000 \
-Djava.rmi.server.hostname=localhost \
-Dcom.sun.management.jmxremote.ssl=false \
-Dcom.sun.management.jmxremote.authenticate=false
</javaOptions>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
package uk.gov.justice.artemis.manager.connector;

import java.net.MalformedURLException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public interface ArtemisConnector {

List<MessageData> messagesOf(final String host, final String port, final String brokerName, final String destinationName) throws Exception;
List<MessageData> messagesOf(final String destinationName);

long remove(final String host, final String port, final String brokerName, final String destinationName, final Iterator<String> msgIds) throws Exception;
long remove(final String destinationName, final Iterator<String> msgIds);

long reprocess(final String host, final String port, final String brokerName, final String destinationName, final Iterator<String> msgIds) throws Exception;
long reprocess(final String destinationName, final Iterator<String> msgIds);

String[] queueNames(final String host, final String port, final String brokerName) throws Exception;
List<String> queueNames();

String[] topicNames(final String host, final String port, final String brokerName) throws Exception;
List<String> topicNames();

Map<String, Long> queueMessageCount(final String host, final String port, final String brokerName, final String[] queueNames) throws Exception;
Map<String, Long> queueMessageCount(final Collection<String> queueNames);

Map<String, Long> topicMessageCount(final String host, final String port, final String brokerName, final String[] topicNames) throws Exception;
Map<String, Long> topicMessageCount(final Collection<String> queueNames);

void setParameters(final List<String> jmxUrls,
final String brokerName,
final String jmxUsername,
final String jmxPassword,
final String jmsUrl,
final String jmsUsername,
final String jmsPassword) throws MalformedURLException;
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,32 @@
package uk.gov.justice.artemis.manager.connector;

import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.summingLong;
import static java.util.stream.Collectors.toList;
import static pl.touk.throwing.ThrowingFunction.unchecked;

import uk.gov.justice.artemis.manager.connector.jms.JmsManagement;
import uk.gov.justice.artemis.manager.connector.jms.JmsProcessor;
import uk.gov.justice.artemis.manager.connector.jmx.JmxManagement;
import uk.gov.justice.artemis.manager.connector.jmx.JmxProcessor;
import uk.gov.justice.output.ConsolePrinter;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import javax.management.remote.JMXConnector;
import javax.management.remote.JMXServiceURL;

import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.jms.management.DestinationControl;
import org.apache.activemq.artemis.api.jms.management.JMSServerControl;

import uk.gov.justice.artemis.manager.connector.jms.JmsManagement;
import uk.gov.justice.artemis.manager.connector.jms.JmsProcessor;
import uk.gov.justice.artemis.manager.connector.jmx.JmxManagement;
import uk.gov.justice.artemis.manager.connector.jmx.JmxProcessor;
import uk.gov.justice.output.ConsolePrinter;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;

/**
* reprocess, remove and messagesOf were re-implemented in JMS due to issues with large messages
Expand All @@ -26,38 +39,104 @@ public class CombinedJmsAndJmxArtemisConnector implements ArtemisConnector {
private final JmsProcessor jmsProcessor = new JmsProcessor();
private final JmsManagement jmsManagement = new JmsManagement();

private List<JMXServiceURL> jmxServiceUrls;
private Map<String,String[]> jmxEnvironment;
private ObjectNameBuilder objectNameBuilder;

private ActiveMQJMSConnectionFactory jmsFactory;

@Override
public void setParameters(final List<String> jmxUrls,
final String brokerName,
final String jmxUsername,
final String jmxPassword,
final String jmsUrl,
final String jmsUsername,
final String jmsPassword) {
this.jmxServiceUrls = jmxProcessor.processJmxUrls(jmxUrls);
this.objectNameBuilder = jmxProcessor.getObjectNameBuilder(brokerName);

if ((jmxUsername != null) && (jmxPassword != null)) {
this.jmxEnvironment = new HashMap<>();
this.jmxEnvironment.put(JMXConnector.CREDENTIALS, new String[]{ jmxUsername, jmxPassword });
} else {
this.jmxEnvironment = emptyMap();
}

if ((jmsUsername != null) && (jmsPassword != null)) {
this.jmsFactory = new ActiveMQJMSConnectionFactory(jmsUrl, jmsUsername, jmsPassword);
} else {
this.jmsFactory = new ActiveMQJMSConnectionFactory(jmsUrl);
}
}

@Override
public List<MessageData> messagesOf(final String host, final String port, final String brokerName, final String destinationName) throws Exception {
return jmsProcessor.process(host, port, destinationName, jmsManagement.browseMessages());
public List<MessageData> messagesOf(final String destinationName) {
return jmsProcessor.process(this.jmsFactory, destinationName, jmsManagement.browseMessages());
}

@Override
public long remove(final String host, final String port, final String brokerName, final String destinationName, final Iterator<String> msgIds) throws Exception {
return jmxProcessor.processQueueControl(host, port, brokerName, destinationName, jmxManagement.removeMessages(msgIds));
public long remove(final String destinationName, final Iterator<String> msgIds) {
return jmxProcessor.processQueueControl(this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
destinationName,
jmxManagement.removeMessages(msgIds)).mapToLong(Long::longValue).sum();
}

@Override
public long reprocess(final String host, final String port, final String brokerName, final String destinationName, final Iterator<String> msgIds) throws Exception {
return jmxProcessor.processQueueControl(host, port, brokerName, destinationName, jmxManagement.reprocessMessages(msgIds));
public long reprocess(final String destinationName, final Iterator<String> msgIds) {
return jmxProcessor.processQueueControl(
this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
destinationName,
jmxManagement.reprocessMessages(msgIds)).mapToLong(Long::longValue).sum();
}

@Override
public String[] queueNames(final String host, final String port, final String brokerName) throws Exception {
return jmxProcessor.processServerControl(host, port, brokerName, JMSServerControl::getQueueNames);
public List<String> queueNames() {
return jmxProcessor.processServerControl(
this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
JMSServerControl::getQueueNames).flatMap(
Arrays::stream).sorted().
distinct().collect(toList());
}


@Override
public Map<String, Long> queueMessageCount(final String host, final String port, final String brokerName, final String[] queueNames) throws Exception {
return jmxProcessor.processQueues(host, port, brokerName, queueNames, unchecked(DestinationControl::getMessageCount));
public Map<String, Long> queueMessageCount(final Collection<String> queueNames) {
return jmxProcessor.processQueues(this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
queueNames,
unchecked(DestinationControl::getMessageCount)).flatMap(
m -> m.entrySet().stream()).collect(
groupingBy(Entry::getKey,
summingLong(Entry::getValue)));
}

@Override
public String[] topicNames(final String host, final String port, final String brokerName) throws Exception {
return jmxProcessor.processServerControl(host, port, brokerName, JMSServerControl::getTopicNames);
public List<String> topicNames() {
return jmxProcessor.processServerControl(this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
JMSServerControl::getTopicNames).flatMap(
Arrays::stream).sorted().
distinct().collect(toList());
}

@Override
public Map<String, Long> topicMessageCount(final String host, final String port, final String brokerName, final String[] topicNames) throws Exception {
return jmxProcessor.processTopics(host, port, brokerName, topicNames, unchecked(DestinationControl::getMessageCount));
public Map<String, Long> topicMessageCount(final Collection<String> topicNames) {
return jmxProcessor.processTopics(this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
topicNames,
unchecked(DestinationControl::getMessageCount)).flatMap(
m -> m.entrySet().stream()).collect(
groupingBy(Entry::getKey,
summingLong(Entry::getValue)));
}
}
Loading

0 comments on commit 236a575

Please sign in to comment.