diff --git a/CHANGELOG.md b/CHANGELOG.md
index 80b0e0d..582db1c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,10 +5,15 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to
## [Unreleased]
+## [3.5.0] - 2019-06-20
+
+### Added
+- Command _`deduplicate topic messages`_ that finds all Topic duplicate messages and removes the duplicates from the DLQ then re-sends them to the DLQ, creating new JMSMessageIds for each message
+
## [3.4.0] - 2019-06-04
### Added
-- Command _`remove all duplicates`_ that finds all not Topic duplicate messages and removes the duplicates from the DLQ
+- Command _`remove all duplicates`_ that finds all non Topic duplicate messages and removes the duplicates from the DLQ
## [3.3.0] - 2019-05-09
diff --git a/README.md b/README.md
index a67eed2..9237e97 100644
--- a/README.md
+++ b/README.md
@@ -72,6 +72,18 @@ Only messages that have the same JMSMessageId and Consumer (_AMQ_ORIG_QUEUE) wil
`java -jar artemis-manager.jar removeallduplicates @artemis.config`
+## Deduplicate Topic Messages from DLQ
+
+* Deduplicate topic messages
+
+This checks the DLQ to find any duplicate Topic messages and will remove the duplicate messages from the DLQ. Then it re-sends the duplicates to the DLQ, so that a unique JMSMessageId is created for each message. The messages can then be replayed or removed individually.
+
+Only messages that have the same JMSMessageId and different Consumer (_AMQ_ORIG_QUEUE) will be removed.
+
+**Note: Browse uses JMS to connect to the Artemis broker.**
+
+`java -jar artemis-manager.jar deduplicatetopicmessages @artemis.config`
+
## Reprocess Message from DLQ
* Reprocess message by id
diff --git a/pom.xml b/pom.xml
index 9358a62..a12c112 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,6 +10,9 @@
1.7.1
+ artemis-manager
+ 3.4.1-SNAPSHOT
+
artemis-manager
2.0.0
@@ -18,13 +21,11 @@
1.7.10
1.3
1.48
- 1.16.2
+ 1.19.0
4.5
+ 2.2.0
- artemis-manager
- 3.4.1-SNAPSHOT
-
${cpp.scm.connection}
${cpp.scm.developerConnection}
@@ -36,7 +37,7 @@
uk.gov.justice
maven-common-bom
- 1.28.0
+ ${common-bom.version}
import
pom
diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/ArtemisConnector.java b/src/main/java/uk/gov/justice/artemis/manager/connector/ArtemisConnector.java
index 4bd102c..e0aed8f 100644
--- a/src/main/java/uk/gov/justice/artemis/manager/connector/ArtemisConnector.java
+++ b/src/main/java/uk/gov/justice/artemis/manager/connector/ArtemisConnector.java
@@ -14,6 +14,8 @@ public interface ArtemisConnector {
List removeAllDuplicates(final String destinationName);
+ List deduplicateTopicMessages(final String destinationName);
+
long reprocess(final String destinationName, final Iterator msgIds);
int reprocessAll(final String destinationName);
diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnector.java b/src/main/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnector.java
index 7445097..7973f33 100644
--- a/src/main/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnector.java
+++ b/src/main/java/uk/gov/justice/artemis/manager/connector/CombinedJmsAndJmxArtemisConnector.java
@@ -12,6 +12,7 @@
import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessageFinder;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessageRemover;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.JmsMessageUtil;
+import uk.gov.justice.artemis.manager.connector.combined.duplicate.TopicDuplicateMessageFinder;
import uk.gov.justice.artemis.manager.connector.jms.JmsManagement;
import uk.gov.justice.artemis.manager.connector.jms.JmsProcessor;
import uk.gov.justice.artemis.manager.connector.jmx.JmxManagement;
@@ -48,6 +49,7 @@ public class CombinedJmsAndJmxArtemisConnector implements ArtemisConnector {
private final JmsMessageUtil jmsMessageUtil = new JmsMessageUtil();
private final CombinedManagement combinedManagement = new CombinedManagement(
new DuplicateMessageFinder(jmsMessageUtil),
+ new TopicDuplicateMessageFinder(jmsMessageUtil),
new DuplicateMessageRemover(),
new AddedMessageFinder(jmsMessageUtil));
@@ -114,6 +116,17 @@ public List removeAllDuplicates(final String destinationName) {
combinedManagement.removeAllDuplicates());
}
+ @Override
+ public List deduplicateTopicMessages(final String destinationName) {
+ return combinedProcessor.process(
+ jmsFactory,
+ jmxServiceUrls,
+ jmxEnvironment,
+ objectNameBuilder,
+ destinationName,
+ combinedManagement.deduplicateTopicMessages());
+ }
+
@Override
public long reprocess(final String destinationName, final Iterator msgIds) {
return jmxProcessor
diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnector.java b/src/main/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnector.java
index 38248c7..8607092 100644
--- a/src/main/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnector.java
+++ b/src/main/java/uk/gov/justice/artemis/manager/connector/JmxArtemisConnector.java
@@ -77,6 +77,11 @@ public List removeAllDuplicates(final String destinationName) {
throw new UnsupportedOperationException("removeAllDuplicates is not supported by the JmxArtemisConnector");
}
+ @Override
+ public List deduplicateTopicMessages(final String destinationName) {
+ throw new UnsupportedOperationException("deduplicateTopicMessages is not supported by the JmxArtemisConnector");
+ }
+
@Override
public long reprocess(final String destinationName, final Iterator msgIds) {
return jmxProcessor.processQueueControl(
diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/combined/CombinedManagement.java b/src/main/java/uk/gov/justice/artemis/manager/connector/combined/CombinedManagement.java
index c22b548..754c4b3 100644
--- a/src/main/java/uk/gov/justice/artemis/manager/connector/combined/CombinedManagement.java
+++ b/src/main/java/uk/gov/justice/artemis/manager/connector/combined/CombinedManagement.java
@@ -1,38 +1,56 @@
package uk.gov.justice.artemis.manager.connector.combined;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.AddedMessageFinder;
+import uk.gov.justice.artemis.manager.connector.combined.duplicate.BrowsedMessages;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessageFinder;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessageRemover;
-import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessages;
+import uk.gov.justice.artemis.manager.connector.combined.duplicate.TopicDuplicateMessageFinder;
import java.util.List;
public class CombinedManagement {
private final DuplicateMessageFinder duplicateMessageFinder;
+ private final TopicDuplicateMessageFinder topicDuplicateMessageFinder;
private final DuplicateMessageRemover duplicateMessageRemover;
private final AddedMessageFinder addedMessageFinder;
public CombinedManagement(final DuplicateMessageFinder duplicateMessageFinder,
+ final TopicDuplicateMessageFinder topicDuplicateMessageFinder,
final DuplicateMessageRemover duplicateMessageRemover,
final AddedMessageFinder addedMessageFinder) {
this.duplicateMessageFinder = duplicateMessageFinder;
+ this.topicDuplicateMessageFinder = topicDuplicateMessageFinder;
this.duplicateMessageRemover = duplicateMessageRemover;
this.addedMessageFinder = addedMessageFinder;
}
public CombinedManagementFunction> removeAllDuplicates() {
- return (queueBrowser, queueSender, jmsQueueControl) ->
- {
- final DuplicateMessages duplicateMessages = duplicateMessageFinder.findDuplicateMessages(queueBrowser);
+ return (queueBrowser, queueSender, jmsQueueControl) -> {
- duplicateMessageRemover.removeDuplicatesOnly(
+ final BrowsedMessages browsedMessages = duplicateMessageFinder.findDuplicateMessages(queueBrowser);
+
+ duplicateMessageRemover.removeAndResendDuplicateMessages(
+ queueSender,
+ jmsQueueControl,
+ browsedMessages);
+
+ return addedMessageFinder.findAddedMessages(browsedMessages, queueBrowser);
+ };
+ }
+
+ public CombinedManagementFunction> deduplicateTopicMessages() {
+ return (queueBrowser, queueSender, jmsQueueControl) -> {
+
+ final BrowsedMessages topicBrowsedMessages = topicDuplicateMessageFinder.findTopicDuplicateMessages(queueBrowser);
+
+ duplicateMessageRemover.removeAndResendDuplicateMessages(
queueSender,
jmsQueueControl,
- duplicateMessages);
+ topicBrowsedMessages);
- return addedMessageFinder.findAddedMessages(duplicateMessages, queueBrowser);
+ return addedMessageFinder.findAddedMessages(topicBrowsedMessages, queueBrowser);
};
}
}
diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/AddedMessageFinder.java b/src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/AddedMessageFinder.java
index a1a03e5..22b7799 100644
--- a/src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/AddedMessageFinder.java
+++ b/src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/AddedMessageFinder.java
@@ -20,9 +20,9 @@ public AddedMessageFinder(final JmsMessageUtil jmsMessageUtil) {
}
@SuppressWarnings("unchecked")
- public List findAddedMessages(final DuplicateMessages duplicateMessages, final QueueBrowser queueBrowser) {
+ public List findAddedMessages(final BrowsedMessages browsedMessages, final QueueBrowser queueBrowser) {
- final Map messageCache = duplicateMessages.getMessageCache();
+ final Map messageCache = browsedMessages.getMessageCache();
final List addedMessageIds = new ArrayList<>();
try {
diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/DuplicateMessages.java b/src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/BrowsedMessages.java
similarity index 57%
rename from src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/DuplicateMessages.java
rename to src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/BrowsedMessages.java
index 8bc0106..661d3ac 100644
--- a/src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/DuplicateMessages.java
+++ b/src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/BrowsedMessages.java
@@ -1,21 +1,22 @@
package uk.gov.justice.artemis.manager.connector.combined.duplicate;
+import java.util.List;
import java.util.Map;
import javax.jms.Message;
-public class DuplicateMessages {
+public class BrowsedMessages {
- private final Map duplicateMessages;
+ private final Map> duplicateMessages;
private final Map messageCache;
- public DuplicateMessages(final Map duplicateMessages,
- final Map messageCache) {
+ public BrowsedMessages(final Map> duplicateMessages, final Map messageCache) {
+
this.duplicateMessages = duplicateMessages;
this.messageCache = messageCache;
}
- public Map getDuplicateMessages() {
+ public Map> getDuplicateMessages() {
return duplicateMessages;
}
diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/DuplicateMessageFinder.java b/src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/DuplicateMessageFinder.java
index d10b86c..ab3b2cc 100644
--- a/src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/DuplicateMessageFinder.java
+++ b/src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/DuplicateMessageFinder.java
@@ -2,8 +2,10 @@
import uk.gov.justice.artemis.manager.connector.combined.CombinedManagementFunctionException;
+import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import javax.jms.JMSException;
@@ -19,9 +21,9 @@ public DuplicateMessageFinder(final JmsMessageUtil jmsMessageUtil) {
}
@SuppressWarnings("unchecked")
- public DuplicateMessages findDuplicateMessages(final QueueBrowser queueBrowser) {
+ public BrowsedMessages findDuplicateMessages(final QueueBrowser queueBrowser) {
- final Map duplicateMessages = new HashMap<>();
+ final Map> duplicateMessages = new HashMap<>();
final Map messageCache = new HashMap<>();
try {
@@ -40,7 +42,9 @@ public DuplicateMessages findDuplicateMessages(final QueueBrowser queueBrowser)
final boolean isNotDuplicateTopicMessage = duplicateConsumer.equals(originalConsumer);
if (isNotDuplicateTopicMessage) {
- duplicateMessages.put(jmsMessageID, originalMessage);
+ duplicateMessages
+ .computeIfAbsent(jmsMessageID, key -> new ArrayList<>())
+ .add(originalMessage);
}
} else {
@@ -48,7 +52,7 @@ public DuplicateMessages findDuplicateMessages(final QueueBrowser queueBrowser)
}
}
- return new DuplicateMessages(duplicateMessages, messageCache);
+ return new BrowsedMessages(duplicateMessages, messageCache);
} catch (final JMSException exception) {
throw new CombinedManagementFunctionException("Failed to browse messages on queue.", exception);
diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/DuplicateMessageRemover.java b/src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/DuplicateMessageRemover.java
index 6ad57db..0f6b1d2 100644
--- a/src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/DuplicateMessageRemover.java
+++ b/src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/DuplicateMessageRemover.java
@@ -4,6 +4,7 @@
import uk.gov.justice.artemis.manager.connector.combined.CombinedManagementFunctionException;
+import java.util.List;
import java.util.Map;
import javax.jms.JMSException;
@@ -14,11 +15,11 @@
public class DuplicateMessageRemover {
- public void removeDuplicatesOnly(final QueueSender queueSender,
- final JMSQueueControl jmsQueueControl,
- final DuplicateMessages duplicateMessages) {
+ public void removeAndResendDuplicateMessages(final QueueSender queueSender,
+ final JMSQueueControl jmsQueueControl,
+ final BrowsedMessages browsedMessages) {
- final Map duplicateMessagesMap = duplicateMessages.getDuplicateMessages();
+ final Map> duplicateMessagesMap = browsedMessages.getDuplicateMessages();
duplicateMessagesMap.keySet()
.forEach(jmsMessageId -> {
@@ -28,14 +29,26 @@ public void removeDuplicatesOnly(final QueueSender queueSender,
final String filter = format("JMSMessageID = 'ID:%s'", jmsMessageId);
jmsQueueControl.removeMessages(filter);
- queueSender.send(duplicateMessagesMap.get(jmsMessageId));
- } catch (final JMSException exception) {
- throw new CombinedManagementFunctionException(format("Failed to add message back onto queue, all messages have been deleted for JMSMessageID: %s", jmsMessageId), exception);
+ final List messages = duplicateMessagesMap.get(jmsMessageId);
+
+ sendMessages(queueSender, jmsMessageId, messages);
+ } catch (final CombinedManagementFunctionException exception) {
+ throw exception;
} catch (final Exception exception) {
throw new CombinedManagementFunctionException(format("Failed to remove duplicates for JMSMessageID: %s", jmsMessageId), exception);
}
});
}
+
+ private void sendMessages(final QueueSender queueSender, final String jmsMessageId, final List messages) {
+ messages.forEach(message -> {
+ try {
+ queueSender.send(message);
+ } catch (final JMSException exception) {
+ throw new CombinedManagementFunctionException(format("Failed to add message back onto queue, all messages have been deleted for JMSMessageID: %s", jmsMessageId), exception);
+ }
+ });
+ }
}
diff --git a/src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/TopicDuplicateMessageFinder.java b/src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/TopicDuplicateMessageFinder.java
new file mode 100644
index 0000000..bcad565
--- /dev/null
+++ b/src/main/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/TopicDuplicateMessageFinder.java
@@ -0,0 +1,70 @@
+package uk.gov.justice.artemis.manager.connector.combined.duplicate;
+
+import uk.gov.justice.artemis.manager.connector.combined.CombinedManagementFunctionException;
+
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.QueueBrowser;
+
+public class TopicDuplicateMessageFinder {
+
+ private final JmsMessageUtil jmsMessageUtil;
+
+ public TopicDuplicateMessageFinder(final JmsMessageUtil jmsMessageUtil) {
+ this.jmsMessageUtil = jmsMessageUtil;
+ }
+
+ @SuppressWarnings("unchecked")
+ public BrowsedMessages findTopicDuplicateMessages(final QueueBrowser queueBrowser) {
+
+ final Map> duplicateMessages = new HashMap<>();
+ final Map messageCache = new HashMap<>();
+
+ try {
+ final Enumeration browserEnumeration = queueBrowser.getEnumeration();
+
+ while (browserEnumeration.hasMoreElements()) {
+
+ final Message message = browserEnumeration.nextElement();
+ final String jmsMessageID = jmsMessageUtil.getJmsMessageIdFrom(message);
+
+ if (messageCache.containsKey(jmsMessageID)) {
+
+ final Message originalMessage = messageCache.get(jmsMessageID);
+ final String originalConsumer = jmsMessageUtil.getConsumerFrom(originalMessage);
+ final String duplicateConsumer = jmsMessageUtil.getConsumerFrom(message);
+ final boolean isDuplicateTopicMessage = !duplicateConsumer.equals(originalConsumer);
+
+ if (isDuplicateTopicMessage) {
+
+ duplicateMessages
+ .computeIfAbsent(jmsMessageID, key -> createNewMessageListWith(originalMessage))
+ .add(message);
+ } else {
+ messageCache.put(jmsMessageID, message);
+ }
+
+ } else {
+ messageCache.put(jmsMessageID, message);
+ }
+ }
+
+ return new BrowsedMessages(duplicateMessages, messageCache);
+
+ } catch (final JMSException exception) {
+ throw new CombinedManagementFunctionException("Failed to browse messages on queue.", exception);
+ }
+ }
+
+ private List createNewMessageListWith(final Message originalMessage) {
+ final ArrayList messages = new ArrayList<>();
+ messages.add(originalMessage);
+ return messages;
+ }
+}
diff --git a/src/main/java/uk/gov/justice/framework/tools/command/DeduplicateTopicMessages.java b/src/main/java/uk/gov/justice/framework/tools/command/DeduplicateTopicMessages.java
new file mode 100644
index 0000000..bdad919
--- /dev/null
+++ b/src/main/java/uk/gov/justice/framework/tools/command/DeduplicateTopicMessages.java
@@ -0,0 +1,20 @@
+package uk.gov.justice.framework.tools.command;
+
+import uk.gov.justice.framework.tools.common.command.ShellCommand;
+
+import java.util.List;
+
+public class DeduplicateTopicMessages extends AbstractArtemisCommand implements ShellCommand {
+
+ @Override
+ public void run(final String[] strings) {
+ try {
+ super.setup();
+
+ final List messageIds = artemisConnector.deduplicateTopicMessages("DLQ");
+ outputPrinter.writeStringCollection(messageIds);
+ } catch (final Exception exception) {
+ outputPrinter.writeStackTrace(exception);
+ }
+ }
+}
diff --git a/src/test/java/uk/gov/justice/artemis/manager/ArtemisManagerIT.java b/src/test/java/uk/gov/justice/artemis/manager/ArtemisManagerIT.java
index facd145..1ae7239 100644
--- a/src/test/java/uk/gov/justice/artemis/manager/ArtemisManagerIT.java
+++ b/src/test/java/uk/gov/justice/artemis/manager/ArtemisManagerIT.java
@@ -6,6 +6,7 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.isEmptyString;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
import static uk.gov.justice.artemis.manager.util.JmsTestUtil.cleanQueue;
import static uk.gov.justice.artemis.manager.util.JmsTestUtil.closeJmsConnection;
import static uk.gov.justice.artemis.manager.util.JmsTestUtil.consumerOf;
@@ -40,6 +41,7 @@ public class ArtemisManagerIT {
private static final String COMMAND_LINE_REPROCESS = "env -u _JAVA_OPTIONS java -jar target/artemis-manager.jar reprocess -brokerName 0.0.0.0 -jmxUrl service:jmx:rmi:///jndi/rmi://localhost:3000/jmxrmi -jmsUrl tcp://localhost:61616?clientID=artemis-manager";
private static final String COMMAND_LINE_REMOVE = "env -u _JAVA_OPTIONS java -jar target/artemis-manager.jar remove -brokerName 0.0.0.0 -jmxUrl service:jmx:rmi:///jndi/rmi://localhost:3000/jmxrmi -jmsUrl tcp://localhost:61616?clientID=artemis-manager";
private static final String COMMAND_LINE_REMOVE_ALL_DUPLICATES = "env -u _JAVA_OPTIONS java -jar target/artemis-manager.jar removeallduplicates -brokerName 0.0.0.0 -jmxUrl service:jmx:rmi:///jndi/rmi://localhost:3000/jmxrmi -jmsUrl tcp://localhost:61616?clientID=artemis-manager";
+ private static final String COMMAND_LINE_DEDUPLICATE_TOPIC_MESSAGES = "env -u _JAVA_OPTIONS java -jar target/artemis-manager.jar deduplicatetopicmessages -brokerName 0.0.0.0 -jmxUrl service:jmx:rmi:///jndi/rmi://localhost:3000/jmxrmi -jmsUrl tcp://localhost:61616?clientID=artemis-manager";
@BeforeClass
public static void beforeClass() throws JMSException {
@@ -340,6 +342,54 @@ public void shouldReprocessLargeMessageOntoOriginalQueue() throws Exception {
}
}
+ @Test
+ public void shouldDeduplicateTopicMessagesAndReturnListOfMessageIds() throws Exception {
+ if (notWindows()) {
+
+ final UUID jmsMessageId_1 = UUIDGenerator.getInstance().generateUUID();
+ final UUID jmsMessageId_2 = UUIDGenerator.getInstance().generateUUID();
+ final UUID jmsMessageId_3 = UUIDGenerator.getInstance().generateUUID();
+ final String messageText_1 = "{\"key1\":\"value1\"}";
+ final String messageText_2 = "{\"key1\":\"value2\"}";
+ final String consumer_1 = "consumer1";
+ final String consumer_2 = "consumer2";
+ final String consumer_3 = "consumer3";
+ final String originalQueue_1 = "origQueueO1";
+ final String originalQueue_2 = "origQueueO2";
+
+ cleanQueue(DLQ);
+
+ putInQueueWithMessageId(DLQ, jmsMessageId_1, messageText_1, consumer_1, originalQueue_1);
+ putInQueueWithMessageId(DLQ, jmsMessageId_1, messageText_1, consumer_1, originalQueue_1);
+ putInQueueWithMessageId(DLQ, jmsMessageId_1, messageText_1, consumer_1, originalQueue_1);
+ putInQueueWithMessageId(DLQ, jmsMessageId_2, messageText_1, consumer_1, originalQueue_1);
+ putInQueueWithMessageId(DLQ, jmsMessageId_3, messageText_2, consumer_1, originalQueue_2);
+ putInQueueWithMessageId(DLQ, jmsMessageId_3, messageText_2, consumer_2, originalQueue_2);
+ putInQueueWithMessageId(DLQ, jmsMessageId_3, messageText_2, consumer_3, originalQueue_2);
+
+ final Output output = execute(COMMAND_LINE_DEDUPLICATE_TOPIC_MESSAGES);
+ assertThat(output.errorOutput, isEmptyString());
+
+ final String standardOutput = output.standardOutput();
+
+ final JsonReader reader = Json.createReader(new StringReader(standardOutput));
+ final JsonArray jsonArray = reader.readArray();
+
+ assertThat(jsonArray.size(), is(3));
+ assertDLQHasSizeOf(7);
+
+ jsonArray.forEach(jsonValue -> {
+ try {
+ execute(COMMAND_LINE_REMOVE + " -msgId " + jsonValue.toString());
+ } catch (final IOException e) {
+ fail("Failed to remove: " + jsonValue.toString());
+ }
+ });
+
+ assertDLQHasSizeOf(4);
+ }
+ }
+
private void setDefaultDLQMessages() throws JMSException, IOException {
cleanQueue(DLQ);
@@ -363,11 +413,6 @@ private String standardOutputOf(final String cmd) throws IOException {
return execute(cmd).standardOutput();
}
- private String errorOutputOf(final String cmd) throws IOException {
- return execute(cmd).errorOutput();
- }
-
-
private Output execute(final String[] cmd) throws IOException {
Runtime runtime = Runtime.getRuntime();
final Process process = runtime.exec(cmd);
diff --git a/src/test/java/uk/gov/justice/artemis/manager/connector/DeduplicateTopicMessagesIT.java b/src/test/java/uk/gov/justice/artemis/manager/connector/DeduplicateTopicMessagesIT.java
new file mode 100644
index 0000000..8ceed94
--- /dev/null
+++ b/src/test/java/uk/gov/justice/artemis/manager/connector/DeduplicateTopicMessagesIT.java
@@ -0,0 +1,126 @@
+package uk.gov.justice.artemis.manager.connector;
+
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static uk.gov.justice.artemis.manager.util.JmsTestUtil.cleanQueue;
+import static uk.gov.justice.artemis.manager.util.JmsTestUtil.closeJmsConnection;
+import static uk.gov.justice.artemis.manager.util.JmsTestUtil.openJmsConnection;
+import static uk.gov.justice.artemis.manager.util.JmsTestUtil.putInQueueWithMessageId;
+
+import java.net.MalformedURLException;
+import java.util.List;
+
+import javax.jms.JMSException;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.activemq.artemis.utils.UUID;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+//to run this test from IDE start artemis first by executing ./target/server0/bin/artemis run
+public class DeduplicateTopicMessagesIT {
+
+ private ArtemisConnector combinedArtemisConnector;
+
+ @BeforeClass
+ public static void beforeClass() throws JMSException {
+ openJmsConnection();
+ }
+
+ @AfterClass
+ public static void afterClass() throws JMSException {
+ closeJmsConnection();
+ }
+
+ @Before
+ public void setUp() throws MalformedURLException {
+ this.combinedArtemisConnector = new CombinedJmsAndJmxArtemisConnector();
+ this.combinedArtemisConnector.setParameters(
+ ImmutableList.of("service:jmx:rmi://localhost:3000/jndi/rmi://localhost:3000/jmxrmi"),
+ "0.0.0.0",
+ null,
+ null,
+ "tcp://localhost:61616?clientID=artemis-manager",
+ null,
+ null
+ );
+ }
+
+ @Test
+ public void shouldRemoveMessageDuplicates() throws Exception {
+
+ final UUIDGenerator uuidGenerator = UUIDGenerator.getInstance();
+
+ final String queue = "DLQ";
+
+ final UUID jmsMessageId_1 = uuidGenerator.generateUUID();
+ final UUID jmsMessageId_2 = uuidGenerator.generateUUID();
+
+ final String messageText = "{\"key1\":\"value123\"}";
+ final String consumer_1 = "consumer1";
+ final String consumer_2 = "consumer2";
+ final String consumer_3 = "consumer3";
+ final String originalQueue_1 = "origQueueO1";
+
+ cleanQueue(queue);
+
+ putInQueueWithMessageId(queue, jmsMessageId_1, messageText, consumer_1, originalQueue_1);
+ putInQueueWithMessageId(queue, jmsMessageId_1, messageText, consumer_2, originalQueue_1);
+ putInQueueWithMessageId(queue, jmsMessageId_1, messageText, consumer_3, originalQueue_1);
+ putInQueueWithMessageId(queue, jmsMessageId_2, messageText, consumer_1, originalQueue_1);
+
+ final List messageData_1 = combinedArtemisConnector.messagesOf(queue);
+
+ assertThat(messageData_1, hasSize(4));
+ assertThat(messageData_1.get(0).getMsgId(), is(jmsMessageId_1.toString()));
+ assertThat(messageData_1.get(0).getOriginalDestination(), is(originalQueue_1));
+ assertThat(messageData_1.get(0).getMsgContent().getString("key1"), is("value123"));
+ assertThat(messageData_1.get(0).getConsumer(), is(consumer_1));
+
+ assertThat(messageData_1.get(1).getMsgId(), is(jmsMessageId_1.toString()));
+ assertThat(messageData_1.get(1).getOriginalDestination(), is(originalQueue_1));
+ assertThat(messageData_1.get(1).getMsgContent().getString("key1"), is("value123"));
+ assertThat(messageData_1.get(1).getConsumer(), is(consumer_2));
+
+ assertThat(messageData_1.get(2).getMsgId(), is(jmsMessageId_1.toString()));
+ assertThat(messageData_1.get(2).getOriginalDestination(), is(originalQueue_1));
+ assertThat(messageData_1.get(2).getMsgContent().getString("key1"), is("value123"));
+ assertThat(messageData_1.get(2).getConsumer(), is(consumer_3));
+
+ assertThat(messageData_1.get(3).getMsgId(), is(jmsMessageId_2.toString()));
+ assertThat(messageData_1.get(3).getOriginalDestination(), is(originalQueue_1));
+ assertThat(messageData_1.get(3).getMsgContent().getString("key1"), is("value123"));
+ assertThat(messageData_1.get(3).getConsumer(), is(consumer_1));
+
+ final List listOfMessageIds = combinedArtemisConnector.deduplicateTopicMessages(queue);
+
+ assertThat(listOfMessageIds.size(), is(3));
+
+ final List messageData_2 = combinedArtemisConnector.messagesOf(queue);
+
+ assertThat(messageData_2, hasSize(4));
+ assertThat(messageData_2.get(0).getMsgId(), is(jmsMessageId_2.toString()));
+ assertThat(messageData_2.get(0).getOriginalDestination(), is(originalQueue_1));
+ assertThat(messageData_2.get(0).getMsgContent().getString("key1"), is("value123"));
+ assertThat(messageData_2.get(0).getConsumer(), is(consumer_1));
+
+ assertThat(messageData_2.get(1).getMsgId(), is(listOfMessageIds.get(0)));
+ assertThat(messageData_2.get(1).getOriginalDestination(), is(originalQueue_1));
+ assertThat(messageData_2.get(1).getMsgContent().getString("key1"), is("value123"));
+ assertThat(messageData_2.get(1).getConsumer(), is(consumer_1));
+
+ assertThat(messageData_2.get(2).getMsgId(), is(listOfMessageIds.get(1)));
+ assertThat(messageData_2.get(2).getOriginalDestination(), is(originalQueue_1));
+ assertThat(messageData_2.get(2).getMsgContent().getString("key1"), is("value123"));
+ assertThat(messageData_2.get(2).getConsumer(), is(consumer_2));
+
+ assertThat(messageData_2.get(3).getMsgId(), is(listOfMessageIds.get(2)));
+ assertThat(messageData_2.get(3).getOriginalDestination(), is(originalQueue_1));
+ assertThat(messageData_2.get(3).getMsgContent().getString("key1"), is("value123"));
+ assertThat(messageData_2.get(3).getConsumer(), is(consumer_3));
+ }
+}
diff --git a/src/test/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/AddedMessageFinderTest.java b/src/test/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/AddedMessageFinderTest.java
index 16b6ffa..46194c2 100644
--- a/src/test/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/AddedMessageFinderTest.java
+++ b/src/test/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/AddedMessageFinderTest.java
@@ -65,7 +65,7 @@ public void shouldFindAddedMessages() throws Exception {
when(jmsMessageUtil.getJmsMessageIdFrom(message_4)).thenReturn(messageId_4);
final List addedMessageIds = addedMessageFinder.findAddedMessages(
- new DuplicateMessages(null, messageCache),
+ new BrowsedMessages(null, messageCache),
queueBrowser);
assertThat(addedMessageIds.size(), is(2));
@@ -81,7 +81,7 @@ public void shouldThrowCombinedManagementFunctionExceptionIfJMSExceptionIsThrown
when(queueBrowser.getEnumeration()).thenThrow(jmsException);
try {
- addedMessageFinder.findAddedMessages(mock(DuplicateMessages.class), queueBrowser);
+ addedMessageFinder.findAddedMessages(mock(BrowsedMessages.class), queueBrowser);
fail();
} catch (final CombinedManagementFunctionException exception) {
Assert.assertThat(exception.getMessage(), is("Failed to browse messages on queue."));
diff --git a/src/test/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/DuplicateMessageFinderTest.java b/src/test/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/DuplicateMessageFinderTest.java
index 0724017..d2d153c 100644
--- a/src/test/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/DuplicateMessageFinderTest.java
+++ b/src/test/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/DuplicateMessageFinderTest.java
@@ -3,6 +3,7 @@
import static java.util.Arrays.asList;
import static java.util.Collections.emptyEnumeration;
import static java.util.Collections.enumeration;
+import static java.util.Collections.singletonList;
import static java.util.UUID.randomUUID;
import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.CoreMatchers.is;
@@ -12,13 +13,11 @@
import static org.mockito.Mockito.when;
import uk.gov.justice.artemis.manager.connector.combined.CombinedManagementFunctionException;
-import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessageFinder;
-import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessages;
-import uk.gov.justice.artemis.manager.connector.combined.duplicate.JmsMessageUtil;
import java.util.Collection;
import java.util.Enumeration;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import javax.jms.JMSException;
@@ -40,6 +39,7 @@ public class DuplicateMessageFinderTest {
@InjectMocks
private DuplicateMessageFinder duplicateMessageFinder;
+ @SuppressWarnings("unchecked")
@Test
public void shouldReturnSingleInstanceOfDuplicatedMessages() throws Exception {
@@ -72,15 +72,15 @@ public void shouldReturnSingleInstanceOfDuplicatedMessages() throws Exception {
when(queueBrowser.getEnumeration()).thenReturn(messageEnumeration);
- final DuplicateMessages duplicateMessages = duplicateMessageFinder.findDuplicateMessages(queueBrowser);
+ final BrowsedMessages browsedMessages = duplicateMessageFinder.findDuplicateMessages(queueBrowser);
- final Map duplicateMessagesMap = duplicateMessages.getDuplicateMessages();
+ final Map> duplicateMessagesMap = browsedMessages.getDuplicateMessages();
assertThat(duplicateMessagesMap.size(), is(2));
- assertThat(duplicateMessages.getMessageCache().size(), is(3));
+ assertThat(browsedMessages.getMessageCache().size(), is(3));
- final Collection values = duplicateMessagesMap.values();
- assertThat(values, hasItems(message_1, message_5));
+ final Collection> values = duplicateMessagesMap.values();
+ assertThat(values, hasItems(singletonList(message_1), singletonList(message_5)));
}
@Test
@@ -115,15 +115,15 @@ public void shouldNotReturnTopicDuplicateMessages() throws Exception {
when(queueBrowser.getEnumeration()).thenReturn(messageEnumeration);
- final DuplicateMessages duplicateMessages = duplicateMessageFinder.findDuplicateMessages(queueBrowser);
+ final BrowsedMessages browsedMessages = duplicateMessageFinder.findDuplicateMessages(queueBrowser);
- final Map duplicateMessagesMap = duplicateMessages.getDuplicateMessages();
+ final Map> duplicateMessagesMap = browsedMessages.getDuplicateMessages();
assertThat(duplicateMessagesMap.size(), is(1));
- final Iterator messageIterator = duplicateMessagesMap.values().iterator();
- assertThat(messageIterator.next(), is(message_1));
+ final Iterator> messageIterator = duplicateMessagesMap.values().iterator();
+ assertThat(messageIterator.next().get(0), is(message_1));
- assertThat(duplicateMessages.getMessageCache().size(), is(3));
+ assertThat(browsedMessages.getMessageCache().size(), is(3));
}
@Test
@@ -149,10 +149,10 @@ public void shouldReturnEmptyDuplicatesIfNoDuplicates() throws Exception {
when(queueBrowser.getEnumeration()).thenReturn(messageEnumeration);
- final DuplicateMessages duplicateMessages = duplicateMessageFinder.findDuplicateMessages(queueBrowser);
+ final BrowsedMessages browsedMessages = duplicateMessageFinder.findDuplicateMessages(queueBrowser);
- assertThat(duplicateMessages.getDuplicateMessages().isEmpty(), is(true));
- assertThat(duplicateMessages.getMessageCache().size(), is(3));
+ assertThat(browsedMessages.getDuplicateMessages().isEmpty(), is(true));
+ assertThat(browsedMessages.getMessageCache().size(), is(3));
}
@Test
@@ -163,10 +163,10 @@ public void shouldReturnEmptyIfNoMessages() throws Exception {
when(queueBrowser.getEnumeration()).thenReturn(messageEnumeration);
- final DuplicateMessages duplicateMessages = duplicateMessageFinder.findDuplicateMessages(queueBrowser);
+ final BrowsedMessages browsedMessages = duplicateMessageFinder.findDuplicateMessages(queueBrowser);
- assertThat(duplicateMessages.getDuplicateMessages().isEmpty(), is(true));
- assertThat(duplicateMessages.getMessageCache().isEmpty(), is(true));
+ assertThat(browsedMessages.getDuplicateMessages().isEmpty(), is(true));
+ assertThat(browsedMessages.getMessageCache().isEmpty(), is(true));
}
@Test
diff --git a/src/test/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/DuplicateMessageRemoverTest.java b/src/test/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/DuplicateMessageRemoverTest.java
index c436738..3161a99 100644
--- a/src/test/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/DuplicateMessageRemoverTest.java
+++ b/src/test/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/DuplicateMessageRemoverTest.java
@@ -1,6 +1,7 @@
package uk.gov.justice.artemis.manager.connector.combined.duplicate;
import static java.lang.String.format;
+import static java.util.Collections.singletonList;
import static java.util.UUID.randomUUID;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
@@ -11,10 +12,9 @@
import static org.mockito.Mockito.when;
import uk.gov.justice.artemis.manager.connector.combined.CombinedManagementFunctionException;
-import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessageRemover;
-import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessages;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import javax.jms.JMSException;
@@ -35,12 +35,14 @@ public class DuplicateMessageRemoverTest {
@InjectMocks
private DuplicateMessageRemover duplicateMessageRemover;
+ @SuppressWarnings("unchecked")
@Test
public void shouldRemoveDuplicateMessagesOnly() throws Exception {
final QueueSender queueSender = mock(QueueSender.class);
final JMSQueueControl jmsQueueControl = mock(JMSQueueControl.class);
- final Map duplicateMessages = new HashMap<>();
+ final Map> duplicateMessages = new HashMap<>();
+ final Map messageCache = mock(Map.class);
final Message message_1 = mock(Message.class);
final Message message_2 = mock(Message.class);
@@ -48,10 +50,10 @@ public void shouldRemoveDuplicateMessagesOnly() throws Exception {
final String messageId_1 = randomUUID().toString();
final String messageId_2 = randomUUID().toString();
- duplicateMessages.put(messageId_1, message_1);
- duplicateMessages.put(messageId_2, message_2);
+ duplicateMessages.put(messageId_1, singletonList(message_1));
+ duplicateMessages.put(messageId_2, singletonList(message_2));
- duplicateMessageRemover.removeDuplicatesOnly(queueSender, jmsQueueControl, new DuplicateMessages(duplicateMessages, null));
+ duplicateMessageRemover.removeAndResendDuplicateMessages(queueSender, jmsQueueControl, new BrowsedMessages(duplicateMessages, messageCache));
verify(jmsQueueControl).removeMessages(format(JMS_MESSAGE_ID_FORMAT, messageId_1));
verify(jmsQueueControl).removeMessages(format(JMS_MESSAGE_ID_FORMAT, messageId_2));
@@ -59,23 +61,25 @@ public void shouldRemoveDuplicateMessagesOnly() throws Exception {
verify(queueSender).send(message_2);
}
+ @SuppressWarnings("unchecked")
@Test
public void shouldThrowCombinedManagementFunctionExceptionIfExceptionIsThrown() throws Exception {
final QueueSender queueSender = mock(QueueSender.class);
final JMSQueueControl jmsQueueControl = mock(JMSQueueControl.class);
final Exception exception = mock(Exception.class);
- final Map duplicateMessages = new HashMap<>();
+ final Map> duplicateMessages = new HashMap<>();
+ final Map messageCache = mock(Map.class);
final Message message = mock(Message.class);
final String messageId = randomUUID().toString();
- duplicateMessages.put(messageId, message);
+ duplicateMessages.put(messageId, singletonList(message));
when(jmsQueueControl.removeMessages(format(JMS_MESSAGE_ID_FORMAT, messageId))).thenThrow(exception);
try {
- duplicateMessageRemover.removeDuplicatesOnly(queueSender, jmsQueueControl, new DuplicateMessages(duplicateMessages, null));
+ duplicateMessageRemover.removeAndResendDuplicateMessages(queueSender, jmsQueueControl, new BrowsedMessages(duplicateMessages, messageCache));
fail();
} catch (CombinedManagementFunctionException e) {
assertThat(e.getMessage(), is("Failed to remove duplicates for JMSMessageID: " + messageId));
@@ -83,24 +87,26 @@ public void shouldThrowCombinedManagementFunctionExceptionIfExceptionIsThrown()
}
}
+ @SuppressWarnings("unchecked")
@Test
public void shouldThrowCombinedManagementFunctionExceptionIfJMSExceptionIsThrown() throws Exception {
final QueueSender queueSender = mock(QueueSender.class);
final JMSQueueControl jmsQueueControl = mock(JMSQueueControl.class);
final Exception jmsException = mock(JMSException.class);
- final Map duplicateMessages = new HashMap<>();
+ final Map> duplicateMessages = new HashMap<>();
+ final Map messageCache = mock(Map.class);
final Message message = mock(Message.class);
final String messageId = randomUUID().toString();
- duplicateMessages.put(messageId, message);
+ duplicateMessages.put(messageId, singletonList(message));
when(jmsQueueControl.removeMessages(format(JMS_MESSAGE_ID_FORMAT, messageId))).thenReturn(2);
doThrow(jmsException).when(queueSender).send(message);
try {
- duplicateMessageRemover.removeDuplicatesOnly(queueSender, jmsQueueControl, new DuplicateMessages(duplicateMessages, null));
+ duplicateMessageRemover.removeAndResendDuplicateMessages(queueSender, jmsQueueControl, new BrowsedMessages(duplicateMessages, messageCache));
fail();
} catch (CombinedManagementFunctionException e) {
assertThat(e.getMessage(), is("Failed to add message back onto queue, all messages have been deleted for JMSMessageID: " + messageId));
diff --git a/src/test/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/TopicDuplicateMessageFinderTest.java b/src/test/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/TopicDuplicateMessageFinderTest.java
new file mode 100644
index 0000000..1c6e580
--- /dev/null
+++ b/src/test/java/uk/gov/justice/artemis/manager/connector/combined/duplicate/TopicDuplicateMessageFinderTest.java
@@ -0,0 +1,147 @@
+package uk.gov.justice.artemis.manager.connector.combined.duplicate;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyEnumeration;
+import static java.util.Collections.enumeration;
+import static java.util.UUID.randomUUID;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import uk.gov.justice.artemis.manager.connector.combined.CombinedManagementFunctionException;
+
+import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.QueueBrowser;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TopicDuplicateMessageFinderTest {
+
+ @Mock
+ private JmsMessageUtil jmsMessageUtil;
+
+ @InjectMocks
+ private TopicDuplicateMessageFinder topicDuplicateMessageFinder;
+
+ @Test
+ public void shouldReturnTopicDuplicateMessages() throws Exception {
+
+ final QueueBrowser queueBrowser = mock(QueueBrowser.class);
+ final Message message_1 = mock(Message.class);
+ final Message message_2 = mock(Message.class);
+ final Message message_3 = mock(Message.class);
+ final Message message_4 = mock(Message.class);
+ final Message message_5 = mock(Message.class);
+ final Message message_6 = mock(Message.class);
+ final Message message_7 = mock(Message.class);
+
+ final String jmsMessageId_1 = randomUUID().toString();
+ final String jmsMessageId_2 = randomUUID().toString();
+ final String jmsMessageId_3 = randomUUID().toString();
+
+ when(jmsMessageUtil.getJmsMessageIdFrom(message_1)).thenReturn(jmsMessageId_1);
+ when(jmsMessageUtil.getConsumerFrom(message_1)).thenReturn("artemis-manager.command_handler");
+ when(jmsMessageUtil.getJmsMessageIdFrom(message_2)).thenReturn(jmsMessageId_1);
+ when(jmsMessageUtil.getConsumerFrom(message_2)).thenReturn("artemis-manager.command_handler");
+ when(jmsMessageUtil.getJmsMessageIdFrom(message_3)).thenReturn(jmsMessageId_1);
+ when(jmsMessageUtil.getConsumerFrom(message_3)).thenReturn("artemis-manager.command_handler");
+ when(jmsMessageUtil.getJmsMessageIdFrom(message_4)).thenReturn(jmsMessageId_2);
+ when(jmsMessageUtil.getConsumerFrom(message_4)).thenReturn("artemis-manager.command_handler");
+ when(jmsMessageUtil.getJmsMessageIdFrom(message_5)).thenReturn(jmsMessageId_3);
+ when(jmsMessageUtil.getConsumerFrom(message_5)).thenReturn("artemis-manager.event_listener");
+ when(jmsMessageUtil.getJmsMessageIdFrom(message_6)).thenReturn(jmsMessageId_3);
+ when(jmsMessageUtil.getConsumerFrom(message_6)).thenReturn("artemis-manager.event_processor");
+ when(jmsMessageUtil.getJmsMessageIdFrom(message_7)).thenReturn(jmsMessageId_3);
+ when(jmsMessageUtil.getConsumerFrom(message_7)).thenReturn("artemis-manager.event_indexer");
+
+ final Enumeration messageEnumeration = enumeration(asList(message_1, message_2, message_3, message_4, message_5, message_6, message_7));
+
+ when(queueBrowser.getEnumeration()).thenReturn(messageEnumeration);
+
+ final BrowsedMessages topicBrowsedMessages = topicDuplicateMessageFinder.findTopicDuplicateMessages(queueBrowser);
+
+ final Map> duplicateMessagesMap = topicBrowsedMessages.getDuplicateMessages();
+ assertThat(duplicateMessagesMap.size(), is(1));
+
+ final Iterator> messageIterator = duplicateMessagesMap.values().iterator();
+ final List messageList = messageIterator.next();
+ assertThat(messageList.size(), is(3));
+ assertThat(messageList, hasItems(message_5, message_6, message_7));
+
+ assertThat(topicBrowsedMessages.getMessageCache().size(), is(3));
+ }
+
+ @Test
+ public void shouldReturnEmptyDuplicatesIfNoDuplicates() throws Exception {
+
+ final QueueBrowser queueBrowser = mock(QueueBrowser.class);
+ final Message message_1 = mock(Message.class);
+ final Message message_2 = mock(Message.class);
+ final Message message_3 = mock(Message.class);
+
+ final String jmsMessageId_1 = randomUUID().toString();
+ final String jmsMessageId_2 = randomUUID().toString();
+ final String jmsMessageId_3 = randomUUID().toString();
+
+ when(jmsMessageUtil.getJmsMessageIdFrom(message_1)).thenReturn(jmsMessageId_1);
+ when(jmsMessageUtil.getConsumerFrom(message_1)).thenReturn("artemis-manager.command_handler");
+ when(jmsMessageUtil.getJmsMessageIdFrom(message_2)).thenReturn(jmsMessageId_2);
+ when(jmsMessageUtil.getConsumerFrom(message_2)).thenReturn("artemis-manager.command_handler");
+ when(jmsMessageUtil.getJmsMessageIdFrom(message_3)).thenReturn(jmsMessageId_3);
+ when(jmsMessageUtil.getConsumerFrom(message_3)).thenReturn("artemis-manager.command_handler");
+
+ final Enumeration messageEnumeration = enumeration(asList(message_1, message_2, message_3));
+
+ when(queueBrowser.getEnumeration()).thenReturn(messageEnumeration);
+
+ final BrowsedMessages browsedMessages = topicDuplicateMessageFinder.findTopicDuplicateMessages(queueBrowser);
+
+ assertThat(browsedMessages.getDuplicateMessages().isEmpty(), is(true));
+ assertThat(browsedMessages.getMessageCache().size(), is(3));
+ }
+
+ @Test
+ public void shouldReturnEmptyIfNoMessages() throws Exception {
+
+ final QueueBrowser queueBrowser = mock(QueueBrowser.class);
+ final Enumeration messageEnumeration = emptyEnumeration();
+
+ when(queueBrowser.getEnumeration()).thenReturn(messageEnumeration);
+
+ final BrowsedMessages browsedMessages = topicDuplicateMessageFinder.findTopicDuplicateMessages(queueBrowser);
+
+ assertThat(browsedMessages.getDuplicateMessages().isEmpty(), is(true));
+ assertThat(browsedMessages.getMessageCache().isEmpty(), is(true));
+ }
+
+ @Test
+ public void shouldThrowCombinedManagementFunctionExceptionIfJMSExceptionIsThrown() throws Exception {
+
+ final QueueBrowser queueBrowser = mock(QueueBrowser.class);
+ final JMSException jmsException = mock(JMSException.class);
+
+ when(queueBrowser.getEnumeration()).thenThrow(jmsException);
+
+ try {
+ topicDuplicateMessageFinder.findTopicDuplicateMessages(queueBrowser);
+ fail();
+ } catch (final CombinedManagementFunctionException exception) {
+ assertThat(exception.getMessage(), is("Failed to browse messages on queue."));
+ assertThat(exception.getCause(), is(jmsException));
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/uk/gov/justice/framework/tools/command/DeduplicateTopicMessagesTest.java b/src/test/java/uk/gov/justice/framework/tools/command/DeduplicateTopicMessagesTest.java
new file mode 100644
index 0000000..5dfcebe
--- /dev/null
+++ b/src/test/java/uk/gov/justice/framework/tools/command/DeduplicateTopicMessagesTest.java
@@ -0,0 +1,67 @@
+package uk.gov.justice.framework.tools.command;
+
+import static java.util.Collections.singletonList;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import uk.gov.justice.artemis.manager.connector.ArtemisConnector;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DeduplicateTopicMessagesTest {
+
+ private PrintStream originalerr;
+ private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+
+ @Mock
+ private ArtemisConnector artemisConnector;
+
+ @InjectMocks
+ private DeduplicateTopicMessages deduplicateTopicMessages;
+
+ @Before
+ public void setUpStreams() {
+ originalerr = System.err;
+ System.setErr(new PrintStream(outContent));
+ }
+
+ @After
+ public void cleanUpStreams() {
+ System.setOut(originalerr);
+ }
+
+ @Test
+ public void shouldInvokeConnector() throws Exception {
+ deduplicateTopicMessages.jmxURLs = singletonList("service:jmx:rmi:///jndi/rmi://localhost:3000/jmxrmi");
+ deduplicateTopicMessages.jmsURL = "tcp://localhost:61616";
+ deduplicateTopicMessages.brokerName = "brokerabc";
+
+ deduplicateTopicMessages.run(null);
+ verify(artemisConnector).deduplicateTopicMessages("DLQ");
+ }
+
+ @Test
+ public void shouldOutputException() {
+ deduplicateTopicMessages.jmxURLs = singletonList("service:jmx:rmi:///jndi/rmi://localhost:3000/jmxrmi");
+ deduplicateTopicMessages.jmsURL = "tcp://localhost:61616";
+ deduplicateTopicMessages.brokerName = "brokerabc";
+
+ when(artemisConnector.deduplicateTopicMessages("DLQ")).thenThrow(new RuntimeException("Test exception"));
+
+ deduplicateTopicMessages.run(null);
+
+ assertThat(outContent.toString(), containsString("java.lang.RuntimeException: Test exception"));
+ }
+}
\ No newline at end of file