diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnector.java b/src/main/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnector.java index a3442a4..d14522e 100644 --- a/src/main/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnector.java +++ b/src/main/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnector.java @@ -1,122 +1,37 @@ package uk.gov.justice.artemis.manager.connector; -import static java.lang.String.format; +import uk.gov.justice.artemis.manager.connector.jms.JmsManagement; +import uk.gov.justice.artemis.manager.connector.jms.JmsProcessor; +import uk.gov.justice.artemis.manager.connector.jmx.JmxManagement; +import uk.gov.justice.artemis.manager.connector.jmx.JmxProcessor; +import uk.gov.justice.output.ConsolePrinter; -import java.util.ArrayList; -import java.util.Enumeration; import java.util.Iterator; import java.util.List; -import java.util.function.Function; - -import javax.jms.Message; -import javax.jms.Queue; -import javax.jms.QueueBrowser; -import javax.jms.QueueConnection; -import javax.jms.QueueSession; -import javax.jms.Session; -import javax.jms.TextMessage; -import javax.management.remote.JMXConnector; - -import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; -import org.apache.activemq.artemis.api.jms.management.JMSQueueControl; -import org.apache.activemq.artemis.jms.client.ActiveMQQueueConnectionFactory; /** - * reprocess, remove and messagesOf were re-implemented in JMS due to issues with large messages over JMX. - * + * reprocess, remove and messagesOf were re-implemented in JMS due to issues with large messages + * over JMX. */ public class CombinedJmsAndJmxArtemisConnector extends JmxArtemisConnector { - private static final String JMS_URL = "tcp://%s:%s"; - private static final String JMS_ORIGINAL_DESTINATION = "_AMQ_ORIG_ADDRESS"; - private static final String ID_PREFIX = "ID:"; - private static final String BLANK = ""; - private static final String UNSUPPORTED_MESSAGE_CONTENT = "{\"error\": \"Unsupported message content\"}"; - - private Function, Long>> removeMessages = queueControl -> msgIds -> { - long removedMessages = 0L; - - while (msgIds.hasNext()) { - try { - queueControl.removeMessage(format("ID:%s", msgIds.next())); - removedMessages++; - } catch (final Exception exception) { - outputPrinter.writeException(exception); - } - } - - return removedMessages; - }; - - private Function, Long>> reprocessMessages = queueControl -> msgIds -> { - long reprocessedMessages = 0L; - - while (msgIds.hasNext()) { - try { - final String nextId = msgIds.next(); - if (queueControl.retryMessage(format("ID:%s", nextId))) { - reprocessedMessages++; - } else { - outputPrinter.writeException(new RuntimeException(format("Skipped retrying of message id %s as it does not exist", nextId))); - } - } catch (final Exception exception) { - outputPrinter.writeException(exception); - } - } - return reprocessedMessages; - }; - + private final JmxProcessor jmxProcessor = new JmxProcessor(); + private final JmxManagement jmxManagement = new JmxManagement(new ConsolePrinter()); + private final JmsProcessor jmsProcessor = new JmsProcessor(); + private final JmsManagement jmsManagement = new JmsManagement(); @Override public List messagesOf(final String host, final String port, final String brokerName, final String destinationName) throws Exception { - - final Queue queue = ActiveMQJMSClient.createQueue(destinationName); - - try (final ActiveMQQueueConnectionFactory connectionFactory = new ActiveMQQueueConnectionFactory(format(JMS_URL, host, port)); - final QueueConnection queueConnection = connectionFactory.createQueueConnection(); - final QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); - final QueueBrowser queueBrowser = queueSession.createBrowser(queue)) { - - final Enumeration browserEnumeration = queueBrowser.getEnumeration(); - - final ArrayList messages = new ArrayList<>(); - - while (browserEnumeration.hasMoreElements()) { - final Message message = (Message) browserEnumeration.nextElement(); - - final String jmsMessageID = message.getJMSMessageID().replaceFirst(ID_PREFIX, BLANK); - final String originalDestination = message.getStringProperty(JMS_ORIGINAL_DESTINATION); - final String text; - - if (message instanceof TextMessage) { - final TextMessage textMessage = (TextMessage) message; - text = textMessage.getText(); - } else { - text = UNSUPPORTED_MESSAGE_CONTENT; - } - - messages.add(new MessageData(jmsMessageID, originalDestination, text)); - } - - return messages; - } + return jmsProcessor.process(host, port, destinationName, jmsManagement.browseMessages()); } @Override public long remove(final String host, final String port, final String brokerName, final String destinationName, final Iterator msgIds) throws Exception { - return processJmxFunction(host, port, brokerName, destinationName, msgIds, removeMessages); + return jmxProcessor.processQueueControl(host, port, brokerName, destinationName, jmxManagement.removeMessages(msgIds)); } @Override public long reprocess(final String host, final String port, final String brokerName, final String destinationName, final Iterator msgIds) throws Exception { - return processJmxFunction(host, port, brokerName, destinationName, msgIds, reprocessMessages); - } - - private long processJmxFunction(final String host, final String port, final String brokerName, final String destinationName, final Iterator msgIds, final Function, Long>> processMessages) throws Exception { - try (final JMXConnector connector = getJMXConnector(host, port)) { - final JMSQueueControl queueControl = queueControlOf(connector, brokerName, destinationName); - - return processMessages.apply(queueControl).apply(msgIds); - } + return jmxProcessor.processQueueControl(host, port, brokerName, destinationName, jmxManagement.reprocessMessages(msgIds)); } } \ No newline at end of file diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnector.java b/src/main/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnector.java index 2a0f2ed..a23d00f 100644 --- a/src/main/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnector.java +++ b/src/main/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnector.java @@ -1,113 +1,41 @@ package uk.gov.justice.artemis.manager.connector; -import static java.lang.String.format; -import static java.util.Arrays.stream; -import static java.util.Collections.emptyMap; -import static java.util.stream.Collectors.toList; -import static javax.management.MBeanServerInvocationHandler.newProxyInstance; -import static javax.management.remote.JMXConnectorFactory.connect; -import static org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration.getDefaultJmxDomain; +import uk.gov.justice.artemis.manager.connector.jmx.JmxManagement; +import uk.gov.justice.artemis.manager.connector.jmx.JmxProcessor; +import uk.gov.justice.output.ConsolePrinter; -import java.io.IOException; import java.util.Iterator; import java.util.List; -import javax.management.ObjectName; -import javax.management.openmbean.CompositeData; -import javax.management.remote.JMXConnector; -import javax.management.remote.JMXServiceURL; - -import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; -import org.apache.activemq.artemis.api.jms.management.JMSQueueControl; import org.apache.activemq.artemis.api.jms.management.JMSServerControl; -import uk.gov.justice.output.ConsolePrinter; -import uk.gov.justice.output.OutputPrinter; - public class JmxArtemisConnector implements ArtemisConnector { - final protected OutputPrinter outputPrinter = new ConsolePrinter(); - - private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi"; - private static final String JMS_MESSAGE_ID = "JMSMessageID"; - private static final String ORIGINAL_DESTINATION = "OriginalDestination"; - private static final String TEXT = "Text"; + private final JmxProcessor jmxProcessor = new JmxProcessor(); + private final JmxManagement jmxManagement = new JmxManagement(new ConsolePrinter()); @Override public List messagesOf(final String host, final String port, final String brokerName, final String destinationName) throws Exception { - try (final JMXConnector connector = getJMXConnector(host, port)) { - final CompositeData[] browseResult = queueControlOf(connector, brokerName, destinationName).browse(); - return stream(browseResult) - .map(cd -> new MessageData(String.valueOf(cd.get(JMS_MESSAGE_ID)).replaceFirst("ID:", ""), String.valueOf(cd.get(ORIGINAL_DESTINATION)), String.valueOf(cd.get(TEXT)))) - .collect(toList()); - } - + return jmxProcessor.processQueueControl(host, port, brokerName, destinationName, jmxManagement.browseMessages()); } @Override public long remove(final String host, final String port, final String brokerName, final String destinationName, final Iterator msgIds) throws Exception { - try (final JMXConnector connector = getJMXConnector(host, port)) { - final JMSQueueControl queueControl = queueControlOf(connector, brokerName, destinationName); - long removedMessages = 0; - while (msgIds.hasNext()) { - try { - queueControl.removeMessage(format("ID:%s", msgIds.next())); - removedMessages++; - } catch (final IllegalArgumentException exception) { - outputPrinter.writeException(exception); - } - } - return removedMessages; - } + return jmxProcessor.processQueueControl(host, port, brokerName, destinationName, jmxManagement.removeMessages(msgIds)); } @Override public long reprocess(final String host, final String port, final String brokerName, final String destinationName, final Iterator msgIds) throws Exception { - try (final JMXConnector connector = getJMXConnector(host, port)) { - final JMSQueueControl queueControl = queueControlOf(connector, brokerName, destinationName); - long reprocessedMessages = 0; - while (msgIds.hasNext()) { - try { - final String nextId = msgIds.next(); - if (queueControl.retryMessage(format("ID:%s", nextId))) { - reprocessedMessages++; - } else { - outputPrinter.writeException(new RuntimeException(format("Skipped retrying of message id %s as it does not exist", nextId))); - } - } catch (final IllegalArgumentException exception) { - outputPrinter.writeException(exception); - } - } - return reprocessedMessages; - } + return jmxProcessor.processQueueControl(host, port, brokerName, destinationName, jmxManagement.reprocessMessages(msgIds)); } @Override public String[] queueNames(final String host, final String port, final String brokerName) throws Exception { - try (final JMXConnector connector = getJMXConnector(host, port)) { - final JMSServerControl serverControl = serverControlOf(connector, brokerName); - return serverControl.getQueueNames(); - } + return jmxProcessor.processServerControl(host, port, brokerName, JMSServerControl::getQueueNames); } + @Override public String[] topicNames(final String host, final String port, final String brokerName) throws Exception { - try (final JMXConnector connector = getJMXConnector(host, port)) { - final JMSServerControl serverControl = serverControlOf(connector, brokerName); - return serverControl.getTopicNames(); - } - } - - protected JMXConnector getJMXConnector(final String host, final String port) throws IOException { - return connect(new JMXServiceURL(format(JMX_URL, host, port)), emptyMap()); - } - - protected JMSServerControl serverControlOf(final JMXConnector connector, final String brokerName) throws Exception { - final ObjectName on = ObjectNameBuilder.create(getDefaultJmxDomain(), brokerName, true).getJMSServerObjectName(); - return newProxyInstance(connector.getMBeanServerConnection(), on, JMSServerControl.class, false); - } - - protected JMSQueueControl queueControlOf(final JMXConnector connector, final String brokerName, final String destinationName) throws Exception { - final ObjectName on = ObjectNameBuilder.create(getDefaultJmxDomain(), brokerName, true).getJMSQueueObjectName(destinationName); - return newProxyInstance(connector.getMBeanServerConnection(), on, JMSQueueControl.class, false); + return jmxProcessor.processServerControl(host, port, brokerName, JMSServerControl::getTopicNames); } -} \ No newline at end of file +} diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/jms/JmsManagement.java b/src/main/java/uk/gov/justice/artemis/manager/connector/jms/JmsManagement.java new file mode 100644 index 0000000..d7f59e0 --- /dev/null +++ b/src/main/java/uk/gov/justice/artemis/manager/connector/jms/JmsManagement.java @@ -0,0 +1,50 @@ +package uk.gov.justice.artemis.manager.connector.jms; + +import uk.gov.justice.artemis.manager.connector.MessageData; + +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.List; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.TextMessage; + +public class JmsManagement { + + private static final String JMS_ORIGINAL_DESTINATION = "_AMQ_ORIG_ADDRESS"; + private static final String ID_PREFIX = "ID:"; + private static final String BLANK = ""; + private static final String UNSUPPORTED_MESSAGE_CONTENT = "{\"error\": \"Unsupported message content\"}"; + + public JmsManagementFunction> browseMessages() { + return queueBrowser -> { + try { + final Enumeration browserEnumeration = queueBrowser.getEnumeration(); + + final ArrayList messages = new ArrayList<>(); + + while (browserEnumeration.hasMoreElements()) { + final Message message = (Message) browserEnumeration.nextElement(); + + final String jmsMessageID = message.getJMSMessageID().replaceFirst(ID_PREFIX, BLANK); + final String originalDestination = message.getStringProperty(JMS_ORIGINAL_DESTINATION); + final String text; + + if (message instanceof TextMessage) { + final TextMessage textMessage = (TextMessage) message; + text = textMessage.getText(); + } else { + text = UNSUPPORTED_MESSAGE_CONTENT; + } + + messages.add(new MessageData(jmsMessageID, originalDestination, text)); + } + + return messages; + } catch (final JMSException exception) { + throw new JmsManagementFunctionFailedException("JMS Browse messages failed.", exception); + } + }; + } +} diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/jms/JmsManagementFunction.java b/src/main/java/uk/gov/justice/artemis/manager/connector/jms/JmsManagementFunction.java new file mode 100644 index 0000000..2f0c53d --- /dev/null +++ b/src/main/java/uk/gov/justice/artemis/manager/connector/jms/JmsManagementFunction.java @@ -0,0 +1,9 @@ +package uk.gov.justice.artemis.manager.connector.jms; + +import javax.jms.QueueBrowser; + +@FunctionalInterface +public interface JmsManagementFunction { + + T apply(final QueueBrowser queueBrowser); +} diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/jms/JmsManagementFunctionFailedException.java b/src/main/java/uk/gov/justice/artemis/manager/connector/jms/JmsManagementFunctionFailedException.java new file mode 100644 index 0000000..9fb0588 --- /dev/null +++ b/src/main/java/uk/gov/justice/artemis/manager/connector/jms/JmsManagementFunctionFailedException.java @@ -0,0 +1,8 @@ +package uk.gov.justice.artemis.manager.connector.jms; + +public class JmsManagementFunctionFailedException extends RuntimeException { + + public JmsManagementFunctionFailedException(final String message, final Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/jms/JmsProcessor.java b/src/main/java/uk/gov/justice/artemis/manager/connector/jms/JmsProcessor.java new file mode 100644 index 0000000..7bac419 --- /dev/null +++ b/src/main/java/uk/gov/justice/artemis/manager/connector/jms/JmsProcessor.java @@ -0,0 +1,34 @@ +package uk.gov.justice.artemis.manager.connector.jms; + +import static java.lang.String.format; + +import javax.jms.JMSException; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.Session; + +import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; +import org.apache.activemq.artemis.jms.client.ActiveMQQueueConnectionFactory; + +public class JmsProcessor { + + private static final String JMS_URL = "tcp://%s:%s"; + + public T process(final String host, + final String port, + final String destinationName, + final JmsManagementFunction jmsManagementFunction) throws JMSException { + + final Queue queue = ActiveMQJMSClient.createQueue(destinationName); + + try (final ActiveMQQueueConnectionFactory connectionFactory = new ActiveMQQueueConnectionFactory(format(JMS_URL, host, port)); + final QueueConnection queueConnection = connectionFactory.createQueueConnection(); + final QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); + final QueueBrowser queueBrowser = queueSession.createBrowser(queue)) { + + return jmsManagementFunction.apply(queueBrowser); + } + } +} diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/jmx/JmxManagement.java b/src/main/java/uk/gov/justice/artemis/manager/connector/jmx/JmxManagement.java new file mode 100644 index 0000000..a4d564b --- /dev/null +++ b/src/main/java/uk/gov/justice/artemis/manager/connector/jmx/JmxManagement.java @@ -0,0 +1,84 @@ +package uk.gov.justice.artemis.manager.connector.jmx; + +import static java.lang.String.format; +import static java.util.Arrays.stream; +import static java.util.stream.Collectors.toList; + +import uk.gov.justice.artemis.manager.connector.MessageData; +import uk.gov.justice.output.OutputPrinter; + +import java.util.Iterator; +import java.util.List; + +import javax.management.openmbean.CompositeData; + +public class JmxManagement { + + private static final String JMS_MESSAGE_ID = "JMSMessageID"; + private static final String ORIGINAL_DESTINATION = "OriginalDestination"; + private static final String TEXT = "Text"; + private final OutputPrinter outputPrinter; + + public JmxManagement(final OutputPrinter outputPrinter) { + this.outputPrinter = outputPrinter; + } + + public JmxManagementFunction> browseMessages() { + return queueControl -> { + try { + final CompositeData[] browseResult = queueControl.browse(); + + return stream(browseResult) + .map(message -> { + final String jmsMessageId = String.valueOf(message.get(JMS_MESSAGE_ID)).replaceFirst("ID:", ""); + final String originalDestination = String.valueOf(message.get(ORIGINAL_DESTINATION)); + final String text = String.valueOf(message.get(TEXT)); + + return new MessageData(jmsMessageId, originalDestination, text); + }) + .collect(toList()); + + } catch (final Exception exception) { + throw new JmxManagementFunctionFailedException("JMX Browse messages failed.", exception); + } + }; + } + + public JmxManagementFunction removeMessages(final Iterator msgIds) { + return queueControl -> { + long removedMessages = 0; + + while (msgIds.hasNext()) { + try { + queueControl.removeMessage(format("ID:%s", msgIds.next())); + removedMessages++; + } catch (final Exception exception) { + outputPrinter.writeException(exception); + } + } + + return removedMessages; + }; + } + + public JmxManagementFunction reprocessMessages(final Iterator msgIds) { + return queueControl -> { + long reprocessedMessages = 0; + + while (msgIds.hasNext()) { + try { + final String nextId = msgIds.next(); + if (queueControl.retryMessage(format("ID:%s", nextId))) { + reprocessedMessages++; + } else { + outputPrinter.writeException(new RuntimeException(format("Skipped retrying of message id %s as it does not exist", nextId))); + } + } catch (final Exception exception) { + outputPrinter.writeException(exception); + } + } + + return reprocessedMessages; + }; + } +} diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/jmx/JmxManagementFunction.java b/src/main/java/uk/gov/justice/artemis/manager/connector/jmx/JmxManagementFunction.java new file mode 100644 index 0000000..d68980a --- /dev/null +++ b/src/main/java/uk/gov/justice/artemis/manager/connector/jmx/JmxManagementFunction.java @@ -0,0 +1,9 @@ +package uk.gov.justice.artemis.manager.connector.jmx; + +import org.apache.activemq.artemis.api.jms.management.JMSQueueControl; + +@FunctionalInterface +public interface JmxManagementFunction { + + T apply(final JMSQueueControl queueControl); +} diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/jmx/JmxManagementFunctionFailedException.java b/src/main/java/uk/gov/justice/artemis/manager/connector/jmx/JmxManagementFunctionFailedException.java new file mode 100644 index 0000000..5795a4c --- /dev/null +++ b/src/main/java/uk/gov/justice/artemis/manager/connector/jmx/JmxManagementFunctionFailedException.java @@ -0,0 +1,8 @@ +package uk.gov.justice.artemis.manager.connector.jmx; + +public class JmxManagementFunctionFailedException extends RuntimeException { + + public JmxManagementFunctionFailedException(final String message, final Throwable cause) { + super(message, cause); + } +} diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/jmx/JmxProcessor.java b/src/main/java/uk/gov/justice/artemis/manager/connector/jmx/JmxProcessor.java new file mode 100644 index 0000000..887e3d4 --- /dev/null +++ b/src/main/java/uk/gov/justice/artemis/manager/connector/jmx/JmxProcessor.java @@ -0,0 +1,61 @@ +package uk.gov.justice.artemis.manager.connector.jmx; + +import static java.lang.String.format; +import static java.util.Collections.emptyMap; +import static javax.management.MBeanServerInvocationHandler.newProxyInstance; +import static javax.management.remote.JMXConnectorFactory.connect; +import static org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration.getDefaultJmxDomain; + +import java.io.IOException; + +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXServiceURL; + +import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder; +import org.apache.activemq.artemis.api.jms.management.JMSQueueControl; +import org.apache.activemq.artemis.api.jms.management.JMSServerControl; + +public class JmxProcessor { + + private static final String JMX_URL = "service:jmx:rmi:///jndi/rmi://%s:%s/jmxrmi"; + + public T processQueueControl(final String host, + final String port, + final String brokerName, + final String destinationName, + final JmxManagementFunction jmxManagementFunction) throws Exception { + + try (final JMXConnector connector = getJMXConnector(host, port)) { + final JMSQueueControl queueControl = queueControlOf(connector, brokerName, destinationName); + + return jmxManagementFunction.apply(queueControl); + } + } + + public T processServerControl(final String host, + final String port, + final String brokerName, + final JmxServerControlFunction jmxServerControlFunction) throws Exception { + + try (final JMXConnector connector = getJMXConnector(host, port)) { + final JMSServerControl serverControl = serverControlOf(connector, brokerName); + + return jmxServerControlFunction.apply(serverControl); + } + } + + private JMXConnector getJMXConnector(final String host, final String port) throws IOException { + return connect(new JMXServiceURL(format(JMX_URL, host, port)), emptyMap()); + } + + private JMSQueueControl queueControlOf(final JMXConnector connector, final String brokerName, final String destinationName) throws Exception { + final ObjectName on = ObjectNameBuilder.create(getDefaultJmxDomain(), brokerName, true).getJMSQueueObjectName(destinationName); + return newProxyInstance(connector.getMBeanServerConnection(), on, JMSQueueControl.class, false); + } + + private JMSServerControl serverControlOf(final JMXConnector connector, final String brokerName) throws Exception { + final ObjectName on = ObjectNameBuilder.create(getDefaultJmxDomain(), brokerName, true).getJMSServerObjectName(); + return newProxyInstance(connector.getMBeanServerConnection(), on, JMSServerControl.class, false); + } +} diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/jmx/JmxServerControlFunction.java b/src/main/java/uk/gov/justice/artemis/manager/connector/jmx/JmxServerControlFunction.java new file mode 100644 index 0000000..fb80328 --- /dev/null +++ b/src/main/java/uk/gov/justice/artemis/manager/connector/jmx/JmxServerControlFunction.java @@ -0,0 +1,9 @@ +package uk.gov.justice.artemis.manager.connector.jmx; + +import org.apache.activemq.artemis.api.jms.management.JMSServerControl; + +@FunctionalInterface +public interface JmxServerControlFunction { + + T apply(final JMSServerControl jmsServerControl); +} diff --git a/src/test/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnectorIT.java b/src/test/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnectorIT.java index c3369bc..4e51a47 100644 --- a/src/test/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnectorIT.java +++ b/src/test/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnectorIT.java @@ -1,6 +1,7 @@ package uk.gov.justice.artemis.manager.connector; import static java.util.Arrays.asList; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; @@ -128,4 +129,23 @@ public void shouldReturnNumberOfDeletedMessages() throws Exception { assertThat(removedMessages, is(2L)); } + + @Test + public void shouldReprocessMessageOntoOriginalQueue() throws Exception { + final String queue = "DLQ"; + + cleanQueue(queue); + + putInQueue(queue, "{\"key1\":\"value123\"}", "origQueueO1"); + putInQueue(queue, "{\"key1\":\"valueBB\"}", "origQueueO2"); + + final List messageData = combinedArtemisConnector.messagesOf("localhost", "61616", "0.0.0.0", queue); + + final long reprocessedMessages = combinedArtemisConnector.reprocess("localhost", "3000", "0.0.0.0", queue, asList(messageData.get(0).getMsgId(), messageData.get(1).getMsgId()).iterator()); + + final List messageDataAfter = combinedArtemisConnector.messagesOf("localhost", "61616", "0.0.0.0", queue); + + assertThat(reprocessedMessages, is(2L)); + assertThat(messageDataAfter, is(empty())); + } } diff --git a/src/test/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnectorIT.java b/src/test/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnectorIT.java index 80ffc4f..1e43799 100644 --- a/src/test/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnectorIT.java +++ b/src/test/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnectorIT.java @@ -130,4 +130,4 @@ public void shouldReturnListOfTopics() throws Exception { assertThat(topicNames, arrayContainingInAnyOrder(new String[] {"testTopic"})); } -} \ No newline at end of file +} diff --git a/src/test/java/uk/gov/justice/artemis/manager/connector/jms/JmsManagementFunctionFailedExceptionTest.java b/src/test/java/uk/gov/justice/artemis/manager/connector/jms/JmsManagementFunctionFailedExceptionTest.java new file mode 100644 index 0000000..5085451 --- /dev/null +++ b/src/test/java/uk/gov/justice/artemis/manager/connector/jms/JmsManagementFunctionFailedExceptionTest.java @@ -0,0 +1,21 @@ +package uk.gov.justice.artemis.manager.connector.jms; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; + +import org.junit.Test; + +public class JmsManagementFunctionFailedExceptionTest { + + @Test + public void shouldConstructExceptionWithMessageAndCause() throws Exception { + final String message = "message"; + final Exception cause = mock(Exception.class); + + final JmsManagementFunctionFailedException exception = new JmsManagementFunctionFailedException(message, cause); + + assertThat(exception.getMessage(), is(message)); + assertThat(exception.getCause(), is(cause)); + } +} diff --git a/src/test/java/uk/gov/justice/artemis/manager/connector/jmx/JmxManagementFunctionFailedExceptionTest.java b/src/test/java/uk/gov/justice/artemis/manager/connector/jmx/JmxManagementFunctionFailedExceptionTest.java new file mode 100644 index 0000000..937554b --- /dev/null +++ b/src/test/java/uk/gov/justice/artemis/manager/connector/jmx/JmxManagementFunctionFailedExceptionTest.java @@ -0,0 +1,21 @@ +package uk.gov.justice.artemis.manager.connector.jmx; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; + +import org.junit.Test; + +public class JmxManagementFunctionFailedExceptionTest { + + @Test + public void shouldConstructExceptionWithMessageAndCause() throws Exception { + final String message = "message"; + final Exception cause = mock(Exception.class); + + final JmxManagementFunctionFailedException exception = new JmxManagementFunctionFailedException(message, cause); + + assertThat(exception.getMessage(), is(message)); + assertThat(exception.getCause(), is(cause)); + } +}