Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor to remove repeated code #5

Merged
merged 1 commit into from
Aug 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,122 +1,37 @@
package uk.gov.justice.artemis.manager.connector;

import static java.lang.String.format;
import uk.gov.justice.artemis.manager.connector.jms.JmsManagement;
import uk.gov.justice.artemis.manager.connector.jms.JmsProcessor;
import uk.gov.justice.artemis.manager.connector.jmx.JmxManagement;
import uk.gov.justice.artemis.manager.connector.jmx.JmxProcessor;
import uk.gov.justice.output.ConsolePrinter;

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.remote.JMXConnector;

import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.management.JMSQueueControl;
import org.apache.activemq.artemis.jms.client.ActiveMQQueueConnectionFactory;

/**
* reprocess, remove and messagesOf were re-implemented in JMS due to issues with large messages over JMX.
*
* reprocess, remove and messagesOf were re-implemented in JMS due to issues with large messages
* over JMX.
*/
public class CombinedJmsAndJmxArtemisConnector extends JmxArtemisConnector {

private static final String JMS_URL = "tcp://%s:%s";
private static final String JMS_ORIGINAL_DESTINATION = "_AMQ_ORIG_ADDRESS";
private static final String ID_PREFIX = "ID:";
private static final String BLANK = "";
private static final String UNSUPPORTED_MESSAGE_CONTENT = "{\"error\": \"Unsupported message content\"}";

private Function<JMSQueueControl, Function<Iterator<String>, Long>> removeMessages = queueControl -> msgIds -> {
long removedMessages = 0L;

while (msgIds.hasNext()) {
try {
queueControl.removeMessage(format("ID:%s", msgIds.next()));
removedMessages++;
} catch (final Exception exception) {
outputPrinter.writeException(exception);
}
}

return removedMessages;
};

private Function<JMSQueueControl, Function<Iterator<String>, Long>> reprocessMessages = queueControl -> msgIds -> {
long reprocessedMessages = 0L;

while (msgIds.hasNext()) {
try {
final String nextId = msgIds.next();
if (queueControl.retryMessage(format("ID:%s", nextId))) {
reprocessedMessages++;
} else {
outputPrinter.writeException(new RuntimeException(format("Skipped retrying of message id %s as it does not exist", nextId)));
}
} catch (final Exception exception) {
outputPrinter.writeException(exception);
}
}
return reprocessedMessages;
};

private final JmxProcessor jmxProcessor = new JmxProcessor();
private final JmxManagement jmxManagement = new JmxManagement(new ConsolePrinter());
private final JmsProcessor jmsProcessor = new JmsProcessor();
private final JmsManagement jmsManagement = new JmsManagement();

@Override
public List<MessageData> messagesOf(final String host, final String port, final String brokerName, final String destinationName) throws Exception {

final Queue queue = ActiveMQJMSClient.createQueue(destinationName);

try (final ActiveMQQueueConnectionFactory connectionFactory = new ActiveMQQueueConnectionFactory(format(JMS_URL, host, port));
final QueueConnection queueConnection = connectionFactory.createQueueConnection();
final QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
final QueueBrowser queueBrowser = queueSession.createBrowser(queue)) {

final Enumeration browserEnumeration = queueBrowser.getEnumeration();

final ArrayList<MessageData> messages = new ArrayList<>();

while (browserEnumeration.hasMoreElements()) {
final Message message = (Message) browserEnumeration.nextElement();

final String jmsMessageID = message.getJMSMessageID().replaceFirst(ID_PREFIX, BLANK);
final String originalDestination = message.getStringProperty(JMS_ORIGINAL_DESTINATION);
final String text;

if (message instanceof TextMessage) {
final TextMessage textMessage = (TextMessage) message;
text = textMessage.getText();
} else {
text = UNSUPPORTED_MESSAGE_CONTENT;
}

messages.add(new MessageData(jmsMessageID, originalDestination, text));
}

return messages;
}
return jmsProcessor.process(host, port, destinationName, jmsManagement.browseMessages());
}

@Override
public long remove(final String host, final String port, final String brokerName, final String destinationName, final Iterator<String> msgIds) throws Exception {
return processJmxFunction(host, port, brokerName, destinationName, msgIds, removeMessages);
return jmxProcessor.processQueueControl(host, port, brokerName, destinationName, jmxManagement.removeMessages(msgIds));
}

@Override
public long reprocess(final String host, final String port, final String brokerName, final String destinationName, final Iterator<String> msgIds) throws Exception {
return processJmxFunction(host, port, brokerName, destinationName, msgIds, reprocessMessages);
}

private long processJmxFunction(final String host, final String port, final String brokerName, final String destinationName, final Iterator<String> msgIds, final Function<JMSQueueControl, Function<Iterator<String>, Long>> processMessages) throws Exception {
try (final JMXConnector connector = getJMXConnector(host, port)) {
final JMSQueueControl queueControl = queueControlOf(connector, brokerName, destinationName);

return processMessages.apply(queueControl).apply(msgIds);
}
return jmxProcessor.processQueueControl(host, port, brokerName, destinationName, jmxManagement.reprocessMessages(msgIds));
}
}
Original file line number Diff line number Diff line change
@@ -1,113 +1,41 @@
package uk.gov.justice.artemis.manager.connector;

import static java.lang.String.format;
import static java.util.Arrays.stream;
import static java.util.Collections.emptyMap;
import static java.util.stream.Collectors.toList;
import static javax.management.MBeanServerInvocationHandler.newProxyInstance;
import static javax.management.remote.JMXConnectorFactory.connect;
import static org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration.getDefaultJmxDomain;
import uk.gov.justice.artemis.manager.connector.jmx.JmxManagement;
import uk.gov.justice.artemis.manager.connector.jmx.JmxProcessor;
import uk.gov.justice.output.ConsolePrinter;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXServiceURL;

import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.jms.management.JMSQueueControl;
import org.apache.activemq.artemis.api.jms.management.JMSServerControl;

import uk.gov.justice.output.ConsolePrinter;
import uk.gov.justice.output.OutputPrinter;

public class JmxArtemisConnector implements ArtemisConnector {

final protected OutputPrinter outputPrinter = new ConsolePrinter();

private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi";
private static final String JMS_MESSAGE_ID = "JMSMessageID";
private static final String ORIGINAL_DESTINATION = "OriginalDestination";
private static final String TEXT = "Text";
private final JmxProcessor jmxProcessor = new JmxProcessor();
private final JmxManagement jmxManagement = new JmxManagement(new ConsolePrinter());

@Override
public List<MessageData> messagesOf(final String host, final String port, final String brokerName, final String destinationName) throws Exception {
try (final JMXConnector connector = getJMXConnector(host, port)) {
final CompositeData[] browseResult = queueControlOf(connector, brokerName, destinationName).browse();
return stream(browseResult)
.map(cd -> new MessageData(String.valueOf(cd.get(JMS_MESSAGE_ID)).replaceFirst("ID:", ""), String.valueOf(cd.get(ORIGINAL_DESTINATION)), String.valueOf(cd.get(TEXT))))
.collect(toList());
}

return jmxProcessor.processQueueControl(host, port, brokerName, destinationName, jmxManagement.browseMessages());
}

@Override
public long remove(final String host, final String port, final String brokerName, final String destinationName, final Iterator<String> msgIds) throws Exception {
try (final JMXConnector connector = getJMXConnector(host, port)) {
final JMSQueueControl queueControl = queueControlOf(connector, brokerName, destinationName);
long removedMessages = 0;
while (msgIds.hasNext()) {
try {
queueControl.removeMessage(format("ID:%s", msgIds.next()));
removedMessages++;
} catch (final IllegalArgumentException exception) {
outputPrinter.writeException(exception);
}
}
return removedMessages;
}
return jmxProcessor.processQueueControl(host, port, brokerName, destinationName, jmxManagement.removeMessages(msgIds));
}

@Override
public long reprocess(final String host, final String port, final String brokerName, final String destinationName, final Iterator<String> msgIds) throws Exception {
try (final JMXConnector connector = getJMXConnector(host, port)) {
final JMSQueueControl queueControl = queueControlOf(connector, brokerName, destinationName);
long reprocessedMessages = 0;
while (msgIds.hasNext()) {
try {
final String nextId = msgIds.next();
if (queueControl.retryMessage(format("ID:%s", nextId))) {
reprocessedMessages++;
} else {
outputPrinter.writeException(new RuntimeException(format("Skipped retrying of message id %s as it does not exist", nextId)));
}
} catch (final IllegalArgumentException exception) {
outputPrinter.writeException(exception);
}
}
return reprocessedMessages;
}
return jmxProcessor.processQueueControl(host, port, brokerName, destinationName, jmxManagement.reprocessMessages(msgIds));
}

@Override
public String[] queueNames(final String host, final String port, final String brokerName) throws Exception {
try (final JMXConnector connector = getJMXConnector(host, port)) {
final JMSServerControl serverControl = serverControlOf(connector, brokerName);
return serverControl.getQueueNames();
}
return jmxProcessor.processServerControl(host, port, brokerName, JMSServerControl::getQueueNames);
}

@Override
public String[] topicNames(final String host, final String port, final String brokerName) throws Exception {
try (final JMXConnector connector = getJMXConnector(host, port)) {
final JMSServerControl serverControl = serverControlOf(connector, brokerName);
return serverControl.getTopicNames();
}
}

protected JMXConnector getJMXConnector(final String host, final String port) throws IOException {
return connect(new JMXServiceURL(format(JMX_URL, host, port)), emptyMap());
}

protected JMSServerControl serverControlOf(final JMXConnector connector, final String brokerName) throws Exception {
final ObjectName on = ObjectNameBuilder.create(getDefaultJmxDomain(), brokerName, true).getJMSServerObjectName();
return newProxyInstance(connector.getMBeanServerConnection(), on, JMSServerControl.class, false);
}

protected JMSQueueControl queueControlOf(final JMXConnector connector, final String brokerName, final String destinationName) throws Exception {
final ObjectName on = ObjectNameBuilder.create(getDefaultJmxDomain(), brokerName, true).getJMSQueueObjectName(destinationName);
return newProxyInstance(connector.getMBeanServerConnection(), on, JMSQueueControl.class, false);
return jmxProcessor.processServerControl(host, port, brokerName, JMSServerControl::getTopicNames);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package uk.gov.justice.artemis.manager.connector.jms;

import uk.gov.justice.artemis.manager.connector.MessageData;

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;

public class JmsManagement {

private static final String JMS_ORIGINAL_DESTINATION = "_AMQ_ORIG_ADDRESS";
private static final String ID_PREFIX = "ID:";
private static final String BLANK = "";
private static final String UNSUPPORTED_MESSAGE_CONTENT = "{\"error\": \"Unsupported message content\"}";

public JmsManagementFunction<List<MessageData>> browseMessages() {
return queueBrowser -> {
try {
final Enumeration browserEnumeration = queueBrowser.getEnumeration();

final ArrayList<MessageData> messages = new ArrayList<>();

while (browserEnumeration.hasMoreElements()) {
final Message message = (Message) browserEnumeration.nextElement();

final String jmsMessageID = message.getJMSMessageID().replaceFirst(ID_PREFIX, BLANK);
final String originalDestination = message.getStringProperty(JMS_ORIGINAL_DESTINATION);
final String text;

if (message instanceof TextMessage) {
final TextMessage textMessage = (TextMessage) message;
text = textMessage.getText();
} else {
text = UNSUPPORTED_MESSAGE_CONTENT;
}

messages.add(new MessageData(jmsMessageID, originalDestination, text));
}

return messages;
} catch (final JMSException exception) {
throw new JmsManagementFunctionFailedException("JMS Browse messages failed.", exception);
}
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package uk.gov.justice.artemis.manager.connector.jms;

import javax.jms.QueueBrowser;

@FunctionalInterface
public interface JmsManagementFunction<T> {

T apply(final QueueBrowser queueBrowser);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package uk.gov.justice.artemis.manager.connector.jms;

public class JmsManagementFunctionFailedException extends RuntimeException {

public JmsManagementFunctionFailedException(final String message, final Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package uk.gov.justice.artemis.manager.connector.jms;

import static java.lang.String.format;

import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;

import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.jms.client.ActiveMQQueueConnectionFactory;

public class JmsProcessor {

private static final String JMS_URL = "tcp://%s:%s";

public <T> T process(final String host,
final String port,
final String destinationName,
final JmsManagementFunction<T> jmsManagementFunction) throws JMSException {

final Queue queue = ActiveMQJMSClient.createQueue(destinationName);

try (final ActiveMQQueueConnectionFactory connectionFactory = new ActiveMQQueueConnectionFactory(format(JMS_URL, host, port));
final QueueConnection queueConnection = connectionFactory.createQueueConnection();
final QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
final QueueBrowser queueBrowser = queueSession.createBrowser(queue)) {

return jmsManagementFunction.apply(queueBrowser);
}
}
}
Loading