Skip to content

Commit

Permalink
Merge 62110c2 into a0f21dd
Browse files Browse the repository at this point in the history
  • Loading branch information
mapingo committed Aug 15, 2018
2 parents a0f21dd + 62110c2 commit 5253b78
Show file tree
Hide file tree
Showing 15 changed files with 361 additions and 184 deletions.
Original file line number Diff line number Diff line change
@@ -1,122 +1,37 @@
package uk.gov.justice.artemis.manager.connector;

import static java.lang.String.format;
import uk.gov.justice.artemis.manager.connector.jms.JmsManagement;
import uk.gov.justice.artemis.manager.connector.jms.JmsProcessor;
import uk.gov.justice.artemis.manager.connector.jmx.JmxManagement;
import uk.gov.justice.artemis.manager.connector.jmx.JmxProcessor;
import uk.gov.justice.output.ConsolePrinter;

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.remote.JMXConnector;

import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.management.JMSQueueControl;
import org.apache.activemq.artemis.jms.client.ActiveMQQueueConnectionFactory;

/**
* reprocess, remove and messagesOf were re-implemented in JMS due to issues with large messages over JMX.
*
* reprocess, remove and messagesOf were re-implemented in JMS due to issues with large messages
* over JMX.
*/
public class CombinedJmsAndJmxArtemisConnector extends JmxArtemisConnector {

private static final String JMS_URL = "tcp://%s:%s";
private static final String JMS_ORIGINAL_DESTINATION = "_AMQ_ORIG_ADDRESS";
private static final String ID_PREFIX = "ID:";
private static final String BLANK = "";
private static final String UNSUPPORTED_MESSAGE_CONTENT = "{\"error\": \"Unsupported message content\"}";

private Function<JMSQueueControl, Function<Iterator<String>, Long>> removeMessages = queueControl -> msgIds -> {
long removedMessages = 0L;

while (msgIds.hasNext()) {
try {
queueControl.removeMessage(format("ID:%s", msgIds.next()));
removedMessages++;
} catch (final Exception exception) {
outputPrinter.writeException(exception);
}
}

return removedMessages;
};

private Function<JMSQueueControl, Function<Iterator<String>, Long>> reprocessMessages = queueControl -> msgIds -> {
long reprocessedMessages = 0L;

while (msgIds.hasNext()) {
try {
final String nextId = msgIds.next();
if (queueControl.retryMessage(format("ID:%s", nextId))) {
reprocessedMessages++;
} else {
outputPrinter.writeException(new RuntimeException(format("Skipped retrying of message id %s as it does not exist", nextId)));
}
} catch (final Exception exception) {
outputPrinter.writeException(exception);
}
}
return reprocessedMessages;
};

private final JmxProcessor jmxProcessor = new JmxProcessor();
private final JmxManagement jmxManagement = new JmxManagement(new ConsolePrinter());
private final JmsProcessor jmsProcessor = new JmsProcessor();
private final JmsManagement jmsManagement = new JmsManagement();

@Override
public List<MessageData> messagesOf(final String host, final String port, final String brokerName, final String destinationName) throws Exception {

final Queue queue = ActiveMQJMSClient.createQueue(destinationName);

try (final ActiveMQQueueConnectionFactory connectionFactory = new ActiveMQQueueConnectionFactory(format(JMS_URL, host, port));
final QueueConnection queueConnection = connectionFactory.createQueueConnection();
final QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
final QueueBrowser queueBrowser = queueSession.createBrowser(queue)) {

final Enumeration browserEnumeration = queueBrowser.getEnumeration();

final ArrayList<MessageData> messages = new ArrayList<>();

while (browserEnumeration.hasMoreElements()) {
final Message message = (Message) browserEnumeration.nextElement();

final String jmsMessageID = message.getJMSMessageID().replaceFirst(ID_PREFIX, BLANK);
final String originalDestination = message.getStringProperty(JMS_ORIGINAL_DESTINATION);
final String text;

if (message instanceof TextMessage) {
final TextMessage textMessage = (TextMessage) message;
text = textMessage.getText();
} else {
text = UNSUPPORTED_MESSAGE_CONTENT;
}

messages.add(new MessageData(jmsMessageID, originalDestination, text));
}

return messages;
}
return jmsProcessor.process(host, port, destinationName, jmsManagement.browseMessages());
}

@Override
public long remove(final String host, final String port, final String brokerName, final String destinationName, final Iterator<String> msgIds) throws Exception {
return processJmxFunction(host, port, brokerName, destinationName, msgIds, removeMessages);
return jmxProcessor.processQueueControl(host, port, brokerName, destinationName, jmxManagement.removeMessages(msgIds));
}

@Override
public long reprocess(final String host, final String port, final String brokerName, final String destinationName, final Iterator<String> msgIds) throws Exception {
return processJmxFunction(host, port, brokerName, destinationName, msgIds, reprocessMessages);
}

private long processJmxFunction(final String host, final String port, final String brokerName, final String destinationName, final Iterator<String> msgIds, final Function<JMSQueueControl, Function<Iterator<String>, Long>> processMessages) throws Exception {
try (final JMXConnector connector = getJMXConnector(host, port)) {
final JMSQueueControl queueControl = queueControlOf(connector, brokerName, destinationName);

return processMessages.apply(queueControl).apply(msgIds);
}
return jmxProcessor.processQueueControl(host, port, brokerName, destinationName, jmxManagement.reprocessMessages(msgIds));
}
}
Original file line number Diff line number Diff line change
@@ -1,113 +1,41 @@
package uk.gov.justice.artemis.manager.connector;

import static java.lang.String.format;
import static java.util.Arrays.stream;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;
import static javax.management.MBeanServerInvocationHandler.newProxyInstance;
import static javax.management.remote.JMXConnectorFactory.connect;
import static org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration.getDefaultJmxDomain;
import uk.gov.justice.artemis.manager.connector.jmx.JmxManagement;
import uk.gov.justice.artemis.manager.connector.jmx.JmxProcessor;
import uk.gov.justice.output.ConsolePrinter;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXServiceURL;

import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.jms.management.JMSQueueControl;
import org.apache.activemq.artemis.api.jms.management.JMSServerControl;

import uk.gov.justice.output.ConsolePrinter;
import uk.gov.justice.output.OutputPrinter;

public class JmxArtemisConnector implements ArtemisConnector {

final protected OutputPrinter outputPrinter = new ConsolePrinter();

private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi";
private static final String JMS_MESSAGE_ID = "JMSMessageID";
private static final String ORIGINAL_DESTINATION = "OriginalDestination";
private static final String TEXT = "Text";
private final JmxProcessor jmxProcessor = new JmxProcessor();
private final JmxManagement jmxManagement = new JmxManagement(new ConsolePrinter());

@Override
public List<MessageData> messagesOf(final String host, final String port, final String brokerName, final String destinationName) throws Exception {
try (final JMXConnector connector = getJMXConnector(host, port)) {
final CompositeData[] browseResult = queueControlOf(connector, brokerName, destinationName).browse();
return stream(browseResult)
.map(cd -> new MessageData(String.valueOf(cd.get(JMS_MESSAGE_ID)).replaceFirst("ID:", ""), String.valueOf(cd.get(ORIGINAL_DESTINATION)), String.valueOf(cd.get(TEXT))))
.collect(toList());
}

return jmxProcessor.processQueueControl(host, port, brokerName, destinationName, jmxManagement.browseMessages());
}

@Override
public long remove(final String host, final String port, final String brokerName, final String destinationName, final Iterator<String> msgIds) throws Exception {
try (final JMXConnector connector = getJMXConnector(host, port)) {
final JMSQueueControl queueControl = queueControlOf(connector, brokerName, destinationName);
long removedMessages = 0;
while (msgIds.hasNext()) {
try {
queueControl.removeMessage(format("ID:%s", msgIds.next()));
removedMessages++;
} catch (final IllegalArgumentException exception) {
outputPrinter.writeException(exception);
}
}
return removedMessages;
}
return jmxProcessor.processQueueControl(host, port, brokerName, destinationName, jmxManagement.removeMessages(msgIds));
}

@Override
public long reprocess(final String host, final String port, final String brokerName, final String destinationName, final Iterator<String> msgIds) throws Exception {
try (final JMXConnector connector = getJMXConnector(host, port)) {
final JMSQueueControl queueControl = queueControlOf(connector, brokerName, destinationName);
long reprocessedMessages = 0;
while (msgIds.hasNext()) {
try {
final String nextId = msgIds.next();
if (queueControl.retryMessage(format("ID:%s", nextId))) {
reprocessedMessages++;
} else {
outputPrinter.writeException(new RuntimeException(format("Skipped retrying of message id %s as it does not exist", nextId)));
}
} catch (final IllegalArgumentException exception) {
outputPrinter.writeException(exception);
}
}
return reprocessedMessages;
}
return jmxProcessor.processQueueControl(host, port, brokerName, destinationName, jmxManagement.reprocessMessages(msgIds));
}

@Override
public String[] queueNames(final String host, final String port, final String brokerName) throws Exception {
try (final JMXConnector connector = getJMXConnector(host, port)) {
final JMSServerControl serverControl = serverControlOf(connector, brokerName);
return serverControl.getQueueNames();
}
return jmxProcessor.processServerControl(host, port, brokerName, JMSServerControl::getQueueNames);
}

@Override
public String[] topicNames(final String host, final String port, final String brokerName) throws Exception {
try (final JMXConnector connector = getJMXConnector(host, port)) {
final JMSServerControl serverControl = serverControlOf(connector, brokerName);
return serverControl.getTopicNames();
}
}

protected JMXConnector getJMXConnector(final String host, final String port) throws IOException {
return connect(new JMXServiceURL(format(JMX_URL, host, port)), emptyMap());
}

protected JMSServerControl serverControlOf(final JMXConnector connector, final String brokerName) throws Exception {
final ObjectName on = ObjectNameBuilder.create(getDefaultJmxDomain(), brokerName, true).getJMSServerObjectName();
return newProxyInstance(connector.getMBeanServerConnection(), on, JMSServerControl.class, false);
}

protected JMSQueueControl queueControlOf(final JMXConnector connector, final String brokerName, final String destinationName) throws Exception {
final ObjectName on = ObjectNameBuilder.create(getDefaultJmxDomain(), brokerName, true).getJMSQueueObjectName(destinationName);
return newProxyInstance(connector.getMBeanServerConnection(), on, JMSQueueControl.class, false);
return jmxProcessor.processServerControl(host, port, brokerName, JMSServerControl::getTopicNames);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package uk.gov.justice.artemis.manager.connector.jms;

import uk.gov.justice.artemis.manager.connector.MessageData;

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

public class JmsManagement {

private static final String JMS_ORIGINAL_DESTINATION = "_AMQ_ORIG_ADDRESS";
private static final String ID_PREFIX = "ID:";
private static final String BLANK = "";
private static final String UNSUPPORTED_MESSAGE_CONTENT = "{\"error\": \"Unsupported message content\"}";

public JmsManagementFunction<List<MessageData>> browseMessages() {
return queueBrowser -> {
try {
final Enumeration browserEnumeration = queueBrowser.getEnumeration();

final ArrayList<MessageData> messages = new ArrayList<>();

while (browserEnumeration.hasMoreElements()) {
final Message message = (Message) browserEnumeration.nextElement();

final String jmsMessageID = message.getJMSMessageID().replaceFirst(ID_PREFIX, BLANK);
final String originalDestination = message.getStringProperty(JMS_ORIGINAL_DESTINATION);
final String text;

if (message instanceof TextMessage) {
final TextMessage textMessage = (TextMessage) message;
text = textMessage.getText();
} else {
text = UNSUPPORTED_MESSAGE_CONTENT;
}

messages.add(new MessageData(jmsMessageID, originalDestination, text));
}

return messages;
} catch (final JMSException exception) {
throw new JmsManagementFunctionFailedException("JMS Browse messages failed.", exception);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package uk.gov.justice.artemis.manager.connector.jms;

import javax.jms.QueueBrowser;

@FunctionalInterface
public interface JmsManagementFunction<T> {

T apply(final QueueBrowser queueBrowser);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package uk.gov.justice.artemis.manager.connector.jms;

public class JmsManagementFunctionFailedException extends RuntimeException {

public JmsManagementFunctionFailedException(final String message, final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package uk.gov.justice.artemis.manager.connector.jms;

import static java.lang.String.format;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.jms.client.ActiveMQQueueConnectionFactory;

public class JmsProcessor {

private static final String JMS_URL = "tcp://%s:%s";

public <T> T process(final String host,
final String port,
final String destinationName,
final JmsManagementFunction<T> jmsManagementFunction) throws JMSException {

final Queue queue = ActiveMQJMSClient.createQueue(destinationName);

try (final ActiveMQQueueConnectionFactory connectionFactory = new ActiveMQQueueConnectionFactory(format(JMS_URL, host, port));
final QueueConnection queueConnection = connectionFactory.createQueueConnection();
final QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
final QueueBrowser queueBrowser = queueSession.createBrowser(queue)) {

return jmsManagementFunction.apply(queueBrowser);
}
}
}
Loading

0 comments on commit 5253b78

Please sign in to comment.