Skip to content

Commit

Permalink
Merge ce5335b into 326924b
Browse files Browse the repository at this point in the history
  • Loading branch information
mapingo committed Jun 20, 2019
2 parents 326924b + ce5335b commit 6c6948b
Show file tree
Hide file tree
Showing 20 changed files with 624 additions and 69 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to

## [Unreleased]

## [3.5.0] - 2019-06-20

### Added
- Command _`deduplicate topic messages`_ that finds all Topic duplicate messages and removes the duplicates from the DLQ then re-sends them to the DLQ, creating new JMSMessageIds for each message

## [3.4.0] - 2019-06-04

### Added
- Command _`remove all duplicates`_ that finds all not Topic duplicate messages and removes the duplicates from the DLQ
- Command _`remove all duplicates`_ that finds all non Topic duplicate messages and removes the duplicates from the DLQ

## [3.3.0] - 2019-05-09

Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ Only messages that have the same JMSMessageId and Consumer (_AMQ_ORIG_QUEUE) wil

`java -jar artemis-manager.jar removeallduplicates @artemis.config`

## Deduplicate Topic Messages from DLQ

* Deduplicate topic messages

This checks the DLQ to find any duplicate Topic messages and will remove the duplicate messages from the DLQ. Then it re-sends the duplicates to the DLQ, so that a unique JMSMessageId is created for each message. The messages can then be replayed or removed individually.

Only messages that have the same JMSMessageId and different Consumer (_AMQ_ORIG_QUEUE) will be removed.

**Note: Browse uses JMS to connect to the Artemis broker.**

`java -jar artemis-manager.jar deduplicatetopicmessages @artemis.config`

## Reprocess Message from DLQ

* Reprocess message by id
Expand Down
11 changes: 6 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
<version>1.7.1</version>
</parent>

<artifactId>artemis-manager</artifactId>
<version>3.4.1-SNAPSHOT</version>

<properties>
<cpp.repo.name>artemis-manager</cpp.repo.name>
<framework-tools-command.version>2.0.0</framework-tools-command.version>
Expand All @@ -18,13 +21,11 @@
<slf4j-version>1.7.10</slf4j-version>
<throwing-function.version>1.3</throwing-function.version>
<jcommander.version>1.48</jcommander.version>
<utilities.version>1.16.2</utilities.version>
<utilities.version>1.19.0</utilities.version>
<opencsv.version>4.5</opencsv.version>
<common-bom.version>2.2.0</common-bom.version>
</properties>

<artifactId>artemis-manager</artifactId>
<version>3.4.1-SNAPSHOT</version>

<scm>
<connection>${cpp.scm.connection}</connection>
<developerConnection>${cpp.scm.developerConnection}</developerConnection>
Expand All @@ -36,7 +37,7 @@
<dependency>
<groupId>uk.gov.justice</groupId>
<artifactId>maven-common-bom</artifactId>
<version>1.28.0</version>
<version>${common-bom.version}</version>
<scope>import</scope>
<type>pom</type>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public interface ArtemisConnector {

List<String> removeAllDuplicates(final String destinationName);

List<String> deduplicateTopicMessages(final String destinationName);

long reprocess(final String destinationName, final Iterator<String> msgIds);

int reprocessAll(final String destinationName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessageFinder;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessageRemover;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.JmsMessageUtil;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.TopicDuplicateMessageFinder;
import uk.gov.justice.artemis.manager.connector.jms.JmsManagement;
import uk.gov.justice.artemis.manager.connector.jms.JmsProcessor;
import uk.gov.justice.artemis.manager.connector.jmx.JmxManagement;
Expand Down Expand Up @@ -48,6 +49,7 @@ public class CombinedJmsAndJmxArtemisConnector implements ArtemisConnector {
private final JmsMessageUtil jmsMessageUtil = new JmsMessageUtil();
private final CombinedManagement combinedManagement = new CombinedManagement(
new DuplicateMessageFinder(jmsMessageUtil),
new TopicDuplicateMessageFinder(jmsMessageUtil),
new DuplicateMessageRemover(),
new AddedMessageFinder(jmsMessageUtil));

Expand Down Expand Up @@ -114,6 +116,17 @@ public List<String> removeAllDuplicates(final String destinationName) {
combinedManagement.removeAllDuplicates());
}

@Override
public List<String> deduplicateTopicMessages(final String destinationName) {
return combinedProcessor.process(
jmsFactory,
jmxServiceUrls,
jmxEnvironment,
objectNameBuilder,
destinationName,
combinedManagement.deduplicateTopicMessages());
}

@Override
public long reprocess(final String destinationName, final Iterator<String> msgIds) {
return jmxProcessor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public List<String> removeAllDuplicates(final String destinationName) {
throw new UnsupportedOperationException("removeAllDuplicates is not supported by the JmxArtemisConnector");
}

@Override
public List<String> deduplicateTopicMessages(final String destinationName) {
throw new UnsupportedOperationException("deduplicateTopicMessages is not supported by the JmxArtemisConnector");
}

@Override
public long reprocess(final String destinationName, final Iterator<String> msgIds) {
return jmxProcessor.processQueueControl(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,56 @@
package uk.gov.justice.artemis.manager.connector.combined;

import uk.gov.justice.artemis.manager.connector.combined.duplicate.AddedMessageFinder;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.BrowsedMessages;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessageFinder;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessageRemover;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessages;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.TopicDuplicateMessageFinder;

import java.util.List;

public class CombinedManagement {

private final DuplicateMessageFinder duplicateMessageFinder;
private final TopicDuplicateMessageFinder topicDuplicateMessageFinder;
private final DuplicateMessageRemover duplicateMessageRemover;
private final AddedMessageFinder addedMessageFinder;

public CombinedManagement(final DuplicateMessageFinder duplicateMessageFinder,
final TopicDuplicateMessageFinder topicDuplicateMessageFinder,
final DuplicateMessageRemover duplicateMessageRemover,
final AddedMessageFinder addedMessageFinder) {
this.duplicateMessageFinder = duplicateMessageFinder;
this.topicDuplicateMessageFinder = topicDuplicateMessageFinder;
this.duplicateMessageRemover = duplicateMessageRemover;
this.addedMessageFinder = addedMessageFinder;
}

public CombinedManagementFunction<List<String>> removeAllDuplicates() {

return (queueBrowser, queueSender, jmsQueueControl) ->
{
final DuplicateMessages duplicateMessages = duplicateMessageFinder.findDuplicateMessages(queueBrowser);
return (queueBrowser, queueSender, jmsQueueControl) -> {

duplicateMessageRemover.removeDuplicatesOnly(
final BrowsedMessages browsedMessages = duplicateMessageFinder.findDuplicateMessages(queueBrowser);

duplicateMessageRemover.removeAndResendDuplicateMessages(
queueSender,
jmsQueueControl,
browsedMessages);

return addedMessageFinder.findAddedMessages(browsedMessages, queueBrowser);
};
}

public CombinedManagementFunction<List<String>> deduplicateTopicMessages() {
return (queueBrowser, queueSender, jmsQueueControl) -> {

final BrowsedMessages topicBrowsedMessages = topicDuplicateMessageFinder.findTopicDuplicateMessages(queueBrowser);

duplicateMessageRemover.removeAndResendDuplicateMessages(
queueSender,
jmsQueueControl,
duplicateMessages);
topicBrowsedMessages);

return addedMessageFinder.findAddedMessages(duplicateMessages, queueBrowser);
return addedMessageFinder.findAddedMessages(topicBrowsedMessages, queueBrowser);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ public AddedMessageFinder(final JmsMessageUtil jmsMessageUtil) {
}

@SuppressWarnings("unchecked")
public List<String> findAddedMessages(final DuplicateMessages duplicateMessages, final QueueBrowser queueBrowser) {
public List<String> findAddedMessages(final BrowsedMessages browsedMessages, final QueueBrowser queueBrowser) {

final Map<String, Message> messageCache = duplicateMessages.getMessageCache();
final Map<String, Message> messageCache = browsedMessages.getMessageCache();
final List<String> addedMessageIds = new ArrayList<>();

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
package uk.gov.justice.artemis.manager.connector.combined.duplicate;

import java.util.List;
import java.util.Map;

import javax.jms.Message;

public class DuplicateMessages {
public class BrowsedMessages {

private final Map<String, Message> duplicateMessages;
private final Map<String, List<Message>> duplicateMessages;
private final Map<String, Message> messageCache;

public DuplicateMessages(final Map<String, Message> duplicateMessages,
final Map<String, Message> messageCache) {
public BrowsedMessages(final Map<String, List<Message>> duplicateMessages, final Map<String, Message> messageCache) {

this.duplicateMessages = duplicateMessages;
this.messageCache = messageCache;
}

public Map<String, Message> getDuplicateMessages() {
public Map<String, List<Message>> getDuplicateMessages() {
return duplicateMessages;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import uk.gov.justice.artemis.manager.connector.combined.CombinedManagementFunctionException;

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.jms.JMSException;
Expand All @@ -19,9 +21,9 @@ public DuplicateMessageFinder(final JmsMessageUtil jmsMessageUtil) {
}

@SuppressWarnings("unchecked")
public DuplicateMessages findDuplicateMessages(final QueueBrowser queueBrowser) {
public BrowsedMessages findDuplicateMessages(final QueueBrowser queueBrowser) {

final Map<String, Message> duplicateMessages = new HashMap<>();
final Map<String, List<Message>> duplicateMessages = new HashMap<>();
final Map<String, Message> messageCache = new HashMap<>();

try {
Expand All @@ -40,15 +42,17 @@ public DuplicateMessages findDuplicateMessages(final QueueBrowser queueBrowser)
final boolean isNotDuplicateTopicMessage = duplicateConsumer.equals(originalConsumer);

if (isNotDuplicateTopicMessage) {
duplicateMessages.put(jmsMessageID, originalMessage);
duplicateMessages
.computeIfAbsent(jmsMessageID, key -> new ArrayList<>())
.add(originalMessage);
}

} else {
messageCache.put(jmsMessageID, message);
}
}

return new DuplicateMessages(duplicateMessages, messageCache);
return new BrowsedMessages(duplicateMessages, messageCache);

} catch (final JMSException exception) {
throw new CombinedManagementFunctionException("Failed to browse messages on queue.", exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import uk.gov.justice.artemis.manager.connector.combined.CombinedManagementFunctionException;

import java.util.List;
import java.util.Map;

import javax.jms.JMSException;
Expand All @@ -14,11 +15,11 @@

public class DuplicateMessageRemover {

public void removeDuplicatesOnly(final QueueSender queueSender,
final JMSQueueControl jmsQueueControl,
final DuplicateMessages duplicateMessages) {
public void removeAndResendDuplicateMessages(final QueueSender queueSender,
final JMSQueueControl jmsQueueControl,
final BrowsedMessages browsedMessages) {

final Map<String, Message> duplicateMessagesMap = duplicateMessages.getDuplicateMessages();
final Map<String, List<Message>> duplicateMessagesMap = browsedMessages.getDuplicateMessages();

duplicateMessagesMap.keySet()
.forEach(jmsMessageId -> {
Expand All @@ -28,14 +29,26 @@ public void removeDuplicatesOnly(final QueueSender queueSender,
final String filter = format("JMSMessageID = 'ID:%s'", jmsMessageId);

jmsQueueControl.removeMessages(filter);
queueSender.send(duplicateMessagesMap.get(jmsMessageId));

} catch (final JMSException exception) {
throw new CombinedManagementFunctionException(format("Failed to add message back onto queue, all messages have been deleted for JMSMessageID: %s", jmsMessageId), exception);
final List<Message> messages = duplicateMessagesMap.get(jmsMessageId);

sendMessages(queueSender, jmsMessageId, messages);
} catch (final CombinedManagementFunctionException exception) {
throw exception;
} catch (final Exception exception) {
throw new CombinedManagementFunctionException(format("Failed to remove duplicates for JMSMessageID: %s", jmsMessageId), exception);
}

});
}

private void sendMessages(final QueueSender queueSender, final String jmsMessageId, final List<Message> messages) {
messages.forEach(message -> {
try {
queueSender.send(message);
} catch (final JMSException exception) {
throw new CombinedManagementFunctionException(format("Failed to add message back onto queue, all messages have been deleted for JMSMessageID: %s", jmsMessageId), exception);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package uk.gov.justice.artemis.manager.connector.combined.duplicate;

import uk.gov.justice.artemis.manager.connector.combined.CombinedManagementFunctionException;

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.QueueBrowser;

public class TopicDuplicateMessageFinder {

private final JmsMessageUtil jmsMessageUtil;

public TopicDuplicateMessageFinder(final JmsMessageUtil jmsMessageUtil) {
this.jmsMessageUtil = jmsMessageUtil;
}

@SuppressWarnings("unchecked")
public BrowsedMessages findTopicDuplicateMessages(final QueueBrowser queueBrowser) {

final Map<String, List<Message>> duplicateMessages = new HashMap<>();
final Map<String, Message> messageCache = new HashMap<>();

try {
final Enumeration<Message> browserEnumeration = queueBrowser.getEnumeration();

while (browserEnumeration.hasMoreElements()) {

final Message message = browserEnumeration.nextElement();
final String jmsMessageID = jmsMessageUtil.getJmsMessageIdFrom(message);

if (messageCache.containsKey(jmsMessageID)) {

final Message originalMessage = messageCache.get(jmsMessageID);
final String originalConsumer = jmsMessageUtil.getConsumerFrom(originalMessage);
final String duplicateConsumer = jmsMessageUtil.getConsumerFrom(message);
final boolean isDuplicateTopicMessage = !duplicateConsumer.equals(originalConsumer);

if (isDuplicateTopicMessage) {

duplicateMessages
.computeIfAbsent(jmsMessageID, key -> createNewMessageListWith(originalMessage))
.add(message);
} else {
messageCache.put(jmsMessageID, message);
}

} else {
messageCache.put(jmsMessageID, message);
}
}

return new BrowsedMessages(duplicateMessages, messageCache);

} catch (final JMSException exception) {
throw new CombinedManagementFunctionException("Failed to browse messages on queue.", exception);
}
}

private List<Message> createNewMessageListWith(final Message originalMessage) {
final ArrayList<Message> messages = new ArrayList<>();
messages.add(originalMessage);
return messages;
}
}
Loading

0 comments on commit 6c6948b

Please sign in to comment.