Skip to content

Commit

Permalink
Refactor for better reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
mapingo committed Sep 19, 2017
1 parent c26f368 commit 39d9c36
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 166 deletions.
Original file line number Diff line number Diff line change
@@ -1,133 +1,36 @@
package uk.gov.justice.artemis.manager.connector;

import static java.lang.String.format;
import static java.util.Collections.emptyMap;
import static javax.management.MBeanServerInvocationHandler.newProxyInstance;
import static javax.management.remote.JMXConnectorFactory.connect;
import static org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration.getDefaultJmxDomain;

import uk.gov.justice.artemis.manager.connector.jms.JmsManagement;
import uk.gov.justice.artemis.manager.connector.jms.JmsProcessor;
import uk.gov.justice.artemis.manager.connector.jmx.JmxManagement;
import uk.gov.justice.artemis.manager.connector.jmx.JmxProcessor;
import uk.gov.justice.output.ConsolePrinter;
import uk.gov.justice.output.OutputPrinter;

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXServiceURL;

import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.management.JMSQueueControl;
import org.apache.activemq.artemis.jms.client.ActiveMQQueueConnectionFactory;

public class CombinedJmsAndJmxArtemisConnector implements ArtemisConnector {

private static final String JMS_URL = "tcp://%s:%s";
private static final String JMS_ORIGINAL_DESTINATION = "_AMQ_ORIG_ADDRESS";
private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi";
private static final String ID_PREFIX = "ID:";
private static final String BLANK = "";
private static final String UNSUPPORTED_MESSAGE_CONTENT = "{\"error\": \"Unsupported message content\"}";
private final JmxProcessor jmxProcessor = new JmxProcessor();
private final JmxManagement jmxManagement = new JmxManagement();
private final JmsProcessor jmsProcessor = new JmsProcessor();
private final JmsManagement jmsManagement = new JmsManagement();

final OutputPrinter outputPrinter = new ConsolePrinter();

private Function<JMSQueueControl, Function<Iterator<String>, Long>> removeMessages = queueControl -> msgIds -> {
long removedMessages = 0L;

while (msgIds.hasNext()) {
try {
queueControl.removeMessage(format("ID:%s", msgIds.next()));
removedMessages++;
} catch (final Exception exception) {
outputPrinter.writeException(exception);
}
}

return removedMessages;
};

private Function<JMSQueueControl, Function<Iterator<String>, Long>> reprocessMessages = queueControl -> msgIds -> {
long reprocessedMessages = 0L;

while (msgIds.hasNext()) {
try {
final String nextId = msgIds.next();
if (queueControl.retryMessage(format("ID:%s", nextId))) {
reprocessedMessages++;
} else {
outputPrinter.writeException(new RuntimeException(format("Skipped retrying of message id %s as it does not exist", nextId)));
}
} catch (final Exception exception) {
outputPrinter.writeException(exception);
}
}
return reprocessedMessages;
};


@Override
public List<MessageData> messagesOf(final String host, final String port, final String brokerName, final String destinationName) throws Exception {

final Queue queue = ActiveMQJMSClient.createQueue(destinationName);

try (final ActiveMQQueueConnectionFactory connectionFactory = new ActiveMQQueueConnectionFactory(format(JMS_URL, host, port));
final QueueConnection queueConnection = connectionFactory.createQueueConnection();
final QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
final QueueBrowser queueBrowser = queueSession.createBrowser(queue)) {

final Enumeration browserEnumeration = queueBrowser.getEnumeration();

final ArrayList<MessageData> messages = new ArrayList<>();

while (browserEnumeration.hasMoreElements()) {
final Message message = (Message) browserEnumeration.nextElement();

final String jmsMessageID = message.getJMSMessageID().replaceFirst(ID_PREFIX, BLANK);
final String originalDestination = message.getStringProperty(JMS_ORIGINAL_DESTINATION);
final String text;

if (message instanceof TextMessage) {
final TextMessage textMessage = (TextMessage) message;
text = textMessage.getText();
} else {
text = UNSUPPORTED_MESSAGE_CONTENT;
}

messages.add(new MessageData(jmsMessageID, originalDestination, text));
}

return messages;
}
return jmsProcessor.process(host, port, destinationName, jmsManagement.browseMessages());
}

@Override
public long remove(final String host, final String port, final String brokerName, final String destinationName, final Iterator<String> msgIds) throws Exception {
return processJmxFunction(host, port, brokerName, destinationName, msgIds, removeMessages);
return jmxProcessor.process(host, port, brokerName, destinationName, jmxManagement.removeMessages(msgIds, outputPrinter));
}

@Override
public long reprocess(final String host, final String port, final String brokerName, final String destinationName, final Iterator<String> msgIds) throws Exception {
return processJmxFunction(host, port, brokerName, destinationName, msgIds, reprocessMessages);
}

private long processJmxFunction(final String host, final String port, final String brokerName, final String destinationName, final Iterator<String> msgIds, final Function<JMSQueueControl, Function<Iterator<String>, Long>> processMessages) throws Exception {
final ObjectName on = ObjectNameBuilder.create(getDefaultJmxDomain(), brokerName, true).getJMSQueueObjectName(destinationName);

try (final JMXConnector connector = connect(new JMXServiceURL(format(JMX_URL, host, port)), emptyMap())) {
final JMSQueueControl queueControl = newProxyInstance(connector.getMBeanServerConnection(), on, JMSQueueControl.class, false);

return processMessages.apply(queueControl).apply(msgIds);
}
return jmxProcessor.process(host, port, brokerName, destinationName, jmxManagement.reprocessMessages(msgIds, outputPrinter));
}
}
Original file line number Diff line number Diff line change
@@ -1,82 +1,32 @@
package uk.gov.justice.artemis.manager.connector;

import static java.lang.String.format;
import static java.util.Arrays.stream;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;
import static javax.management.MBeanServerInvocationHandler.newProxyInstance;
import static javax.management.remote.JMXConnectorFactory.connect;
import static org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration.getDefaultJmxDomain;

import uk.gov.justice.artemis.manager.connector.jmx.JmxManagement;
import uk.gov.justice.artemis.manager.connector.jmx.JmxProcessor;
import uk.gov.justice.output.ConsolePrinter;
import uk.gov.justice.output.OutputPrinter;

import java.util.Iterator;
import java.util.List;

import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXServiceURL;

import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.jms.management.JMSQueueControl;

public class JmxArtemisConnector implements ArtemisConnector {

private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi";
private static final String JMS_MESSAGE_ID = "JMSMessageID";
private static final String ORIGINAL_DESTINATION = "OriginalDestination";
private static final String TEXT = "Text";

final OutputPrinter outputPrinter = new ConsolePrinter();

private final JmxProcessor jmxProcessor = new JmxProcessor();
private final JmxManagement jmxManagement = new JmxManagement();

@Override
public List<MessageData> messagesOf(final String host, final String port, final String brokerName, final String destinationName) throws Exception {
final CompositeData[] browseResult = queueControlOf(host, port, brokerName, destinationName).browse();
return stream(browseResult)
.map(cd -> new MessageData(String.valueOf(cd.get(JMS_MESSAGE_ID)).replaceFirst("ID:", ""), String.valueOf(cd.get(ORIGINAL_DESTINATION)), String.valueOf(cd.get(TEXT))))
.collect(toList());

return jmxProcessor.process(host, port, brokerName, destinationName, jmxManagement.browseMessages());
}

@Override
public long remove(final String host, final String port, final String brokerName, final String destinationName, final Iterator<String> msgIds) throws Exception {
final JMSQueueControl queueControl = queueControlOf(host, port, brokerName, destinationName);
long removedMessages = 0;
while (msgIds.hasNext()) {
try {
queueControl.removeMessage(format("ID:%s", msgIds.next()));
removedMessages++;
} catch (final IllegalArgumentException exception) {
outputPrinter.writeException(exception);
}
}
return removedMessages;
return jmxProcessor.process(host, port, brokerName, destinationName, jmxManagement.removeMessages(msgIds, outputPrinter));
}

@Override
public long reprocess(final String host, final String port, final String brokerName, final String destinationName, final Iterator<String> msgIds) throws Exception {
final JMSQueueControl queueControl = queueControlOf(host, port, brokerName, destinationName);
long reprocessedMessages = 0;
while (msgIds.hasNext()) {
try {
final String nextId = msgIds.next();
if (queueControl.retryMessage(format("ID:%s", nextId))) {
reprocessedMessages++;
} else {
outputPrinter.writeException(new RuntimeException(format("Skipped retrying of message id %s as it does not exist", nextId)));
}
} catch (final IllegalArgumentException exception) {
outputPrinter.writeException(exception);
}
}
return reprocessedMessages;
}

private JMSQueueControl queueControlOf(final String host, final String port, final String brokerName, final String destinationName) throws Exception {
final ObjectName on = ObjectNameBuilder.create(getDefaultJmxDomain(), brokerName, true).getJMSQueueObjectName(destinationName);
final JMXConnector connector = connect(new JMXServiceURL(format(JMX_URL, host, port)), emptyMap());
return newProxyInstance(connector.getMBeanServerConnection(), on, JMSQueueControl.class, false);
return jmxProcessor.process(host, port, brokerName, destinationName, jmxManagement.reprocessMessages(msgIds, outputPrinter));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package uk.gov.justice.artemis.manager.connector.jms;

import uk.gov.justice.artemis.manager.connector.MessageData;

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

public class JmsManagement {

private static final String JMS_ORIGINAL_DESTINATION = "_AMQ_ORIG_ADDRESS";
private static final String ID_PREFIX = "ID:";
private static final String BLANK = "";
private static final String UNSUPPORTED_MESSAGE_CONTENT = "{\"error\": \"Unsupported message content\"}";

public JmsManagementFunction<List<MessageData>> browseMessages() {
return queueBrowser -> {
try {
final Enumeration browserEnumeration = queueBrowser.getEnumeration();

final ArrayList<MessageData> messages = new ArrayList<>();

while (browserEnumeration.hasMoreElements()) {
final Message message = (Message) browserEnumeration.nextElement();

final String jmsMessageID = message.getJMSMessageID().replaceFirst(ID_PREFIX, BLANK);
final String originalDestination = message.getStringProperty(JMS_ORIGINAL_DESTINATION);
final String text;

if (message instanceof TextMessage) {
final TextMessage textMessage = (TextMessage) message;
text = textMessage.getText();
} else {
text = UNSUPPORTED_MESSAGE_CONTENT;
}

messages.add(new MessageData(jmsMessageID, originalDestination, text));
}

return messages;
} catch (final JMSException exception) {
throw new RuntimeException(exception);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package uk.gov.justice.artemis.manager.connector.jms;

import javax.jms.QueueBrowser;

@FunctionalInterface
public interface JmsManagementFunction<T> {

T apply(final QueueBrowser queueBrowser);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package uk.gov.justice.artemis.manager.connector.jms;

import static java.lang.String.format;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.jms.client.ActiveMQQueueConnectionFactory;

public class JmsProcessor {

private static final String JMS_URL = "tcp://%s:%s";

public <T> T process(final String host,
final String port,
final String destinationName,
final JmsManagementFunction<T> jmsManagementFunction) throws JMSException {

final Queue queue = ActiveMQJMSClient.createQueue(destinationName);

try (final ActiveMQQueueConnectionFactory connectionFactory = new ActiveMQQueueConnectionFactory(format(JMS_URL, host, port));
final QueueConnection queueConnection = connectionFactory.createQueueConnection();
final QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
final QueueBrowser queueBrowser = queueSession.createBrowser(queue)) {

return jmsManagementFunction.apply(queueBrowser);
}
}
}
Loading

0 comments on commit 39d9c36

Please sign in to comment.