Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add deduplicate topic messages function #23

Merged
merged 1 commit into from
Jun 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to

## [Unreleased]

## [3.5.0] - 2019-06-20

### Added
- Command _`deduplicate topic messages`_ that finds all Topic duplicate messages and removes the duplicates from the DLQ then re-sends them to the DLQ, creating new JMSMessageIds for each message

## [3.4.0] - 2019-06-04

### Added
- Command _`remove all duplicates`_ that finds all not Topic duplicate messages and removes the duplicates from the DLQ
- Command _`remove all duplicates`_ that finds all non Topic duplicate messages and removes the duplicates from the DLQ

## [3.3.0] - 2019-05-09

Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,18 @@ Only messages that have the same JMSMessageId and Consumer (_AMQ_ORIG_QUEUE) wil

`java -jar artemis-manager.jar removeallduplicates @artemis.config`

## Deduplicate Topic Messages from DLQ

* Deduplicate topic messages

This checks the DLQ to find any duplicate Topic messages and will remove the duplicate messages from the DLQ. Then it re-sends the duplicates to the DLQ, so that a unique JMSMessageId is created for each message. The messages can then be replayed or removed individually.

Only messages that have the same JMSMessageId and different Consumer (_AMQ_ORIG_QUEUE) will be removed.

**Note: Browse uses JMS to connect to the Artemis broker.**

`java -jar artemis-manager.jar deduplicatetopicmessages @artemis.config`

## Reprocess Message from DLQ

* Reprocess message by id
Expand Down
11 changes: 6 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
<version>1.7.1</version>
</parent>

<artifactId>artemis-manager</artifactId>
<version>3.4.1-SNAPSHOT</version>

<properties>
<cpp.repo.name>artemis-manager</cpp.repo.name>
<framework-tools-command.version>2.0.0</framework-tools-command.version>
Expand All @@ -18,13 +21,11 @@
<slf4j-version>1.7.10</slf4j-version>
<throwing-function.version>1.3</throwing-function.version>
<jcommander.version>1.48</jcommander.version>
<utilities.version>1.16.2</utilities.version>
<utilities.version>1.19.0</utilities.version>
<opencsv.version>4.5</opencsv.version>
<common-bom.version>2.2.0</common-bom.version>
</properties>

<artifactId>artemis-manager</artifactId>
<version>3.4.1-SNAPSHOT</version>

<scm>
<connection>${cpp.scm.connection}</connection>
<developerConnection>${cpp.scm.developerConnection}</developerConnection>
Expand All @@ -36,7 +37,7 @@
<dependency>
<groupId>uk.gov.justice</groupId>
<artifactId>maven-common-bom</artifactId>
<version>1.28.0</version>
<version>${common-bom.version}</version>
<scope>import</scope>
<type>pom</type>
</dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public interface ArtemisConnector {

List<String> removeAllDuplicates(final String destinationName);

List<String> deduplicateTopicMessages(final String destinationName);

long reprocess(final String destinationName, final Iterator<String> msgIds);

int reprocessAll(final String destinationName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessageFinder;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessageRemover;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.JmsMessageUtil;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.TopicDuplicateMessageFinder;
import uk.gov.justice.artemis.manager.connector.jms.JmsManagement;
import uk.gov.justice.artemis.manager.connector.jms.JmsProcessor;
import uk.gov.justice.artemis.manager.connector.jmx.JmxManagement;
Expand Down Expand Up @@ -48,6 +49,7 @@ public class CombinedJmsAndJmxArtemisConnector implements ArtemisConnector {
private final JmsMessageUtil jmsMessageUtil = new JmsMessageUtil();
private final CombinedManagement combinedManagement = new CombinedManagement(
new DuplicateMessageFinder(jmsMessageUtil),
new TopicDuplicateMessageFinder(jmsMessageUtil),
new DuplicateMessageRemover(),
new AddedMessageFinder(jmsMessageUtil));

Expand Down Expand Up @@ -114,6 +116,17 @@ public List<String> removeAllDuplicates(final String destinationName) {
combinedManagement.removeAllDuplicates());
}

@Override
public List<String> deduplicateTopicMessages(final String destinationName) {
return combinedProcessor.process(
jmsFactory,
jmxServiceUrls,
jmxEnvironment,
objectNameBuilder,
destinationName,
combinedManagement.deduplicateTopicMessages());
}

@Override
public long reprocess(final String destinationName, final Iterator<String> msgIds) {
return jmxProcessor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public List<String> removeAllDuplicates(final String destinationName) {
throw new UnsupportedOperationException("removeAllDuplicates is not supported by the JmxArtemisConnector");
}

@Override
public List<String> deduplicateTopicMessages(final String destinationName) {
throw new UnsupportedOperationException("deduplicateTopicMessages is not supported by the JmxArtemisConnector");
}

@Override
public long reprocess(final String destinationName, final Iterator<String> msgIds) {
return jmxProcessor.processQueueControl(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,38 +1,56 @@
package uk.gov.justice.artemis.manager.connector.combined;

import uk.gov.justice.artemis.manager.connector.combined.duplicate.AddedMessageFinder;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.BrowsedMessages;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessageFinder;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessageRemover;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessages;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.TopicDuplicateMessageFinder;

import java.util.List;

public class CombinedManagement {

private final DuplicateMessageFinder duplicateMessageFinder;
private final TopicDuplicateMessageFinder topicDuplicateMessageFinder;
private final DuplicateMessageRemover duplicateMessageRemover;
private final AddedMessageFinder addedMessageFinder;

public CombinedManagement(final DuplicateMessageFinder duplicateMessageFinder,
final TopicDuplicateMessageFinder topicDuplicateMessageFinder,
final DuplicateMessageRemover duplicateMessageRemover,
final AddedMessageFinder addedMessageFinder) {
this.duplicateMessageFinder = duplicateMessageFinder;
this.topicDuplicateMessageFinder = topicDuplicateMessageFinder;
this.duplicateMessageRemover = duplicateMessageRemover;
this.addedMessageFinder = addedMessageFinder;
}

public CombinedManagementFunction<List<String>> removeAllDuplicates() {

return (queueBrowser, queueSender, jmsQueueControl) ->
{
final DuplicateMessages duplicateMessages = duplicateMessageFinder.findDuplicateMessages(queueBrowser);
return (queueBrowser, queueSender, jmsQueueControl) -> {

duplicateMessageRemover.removeDuplicatesOnly(
final BrowsedMessages browsedMessages = duplicateMessageFinder.findDuplicateMessages(queueBrowser);

duplicateMessageRemover.removeAndResendDuplicateMessages(
queueSender,
jmsQueueControl,
browsedMessages);

return addedMessageFinder.findAddedMessages(browsedMessages, queueBrowser);
};
}

public CombinedManagementFunction<List<String>> deduplicateTopicMessages() {
return (queueBrowser, queueSender, jmsQueueControl) -> {

final BrowsedMessages topicBrowsedMessages = topicDuplicateMessageFinder.findTopicDuplicateMessages(queueBrowser);

duplicateMessageRemover.removeAndResendDuplicateMessages(
queueSender,
jmsQueueControl,
duplicateMessages);
topicBrowsedMessages);

return addedMessageFinder.findAddedMessages(duplicateMessages, queueBrowser);
return addedMessageFinder.findAddedMessages(topicBrowsedMessages, queueBrowser);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ public AddedMessageFinder(final JmsMessageUtil jmsMessageUtil) {
}

@SuppressWarnings("unchecked")
public List<String> findAddedMessages(final DuplicateMessages duplicateMessages, final QueueBrowser queueBrowser) {
public List<String> findAddedMessages(final BrowsedMessages browsedMessages, final QueueBrowser queueBrowser) {

final Map<String, Message> messageCache = duplicateMessages.getMessageCache();
final Map<String, Message> messageCache = browsedMessages.getMessageCache();
final List<String> addedMessageIds = new ArrayList<>();

try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
package uk.gov.justice.artemis.manager.connector.combined.duplicate;

import java.util.List;
import java.util.Map;

import javax.jms.Message;

public class DuplicateMessages {
public class BrowsedMessages {

private final Map<String, Message> duplicateMessages;
private final Map<String, List<Message>> duplicateMessages;
private final Map<String, Message> messageCache;

public DuplicateMessages(final Map<String, Message> duplicateMessages,
final Map<String, Message> messageCache) {
public BrowsedMessages(final Map<String, List<Message>> duplicateMessages, final Map<String, Message> messageCache) {

this.duplicateMessages = duplicateMessages;
this.messageCache = messageCache;
}

public Map<String, Message> getDuplicateMessages() {
public Map<String, List<Message>> getDuplicateMessages() {
return duplicateMessages;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import uk.gov.justice.artemis.manager.connector.combined.CombinedManagementFunctionException;

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.jms.JMSException;
Expand All @@ -19,9 +21,9 @@ public DuplicateMessageFinder(final JmsMessageUtil jmsMessageUtil) {
}

@SuppressWarnings("unchecked")
public DuplicateMessages findDuplicateMessages(final QueueBrowser queueBrowser) {
public BrowsedMessages findDuplicateMessages(final QueueBrowser queueBrowser) {

final Map<String, Message> duplicateMessages = new HashMap<>();
final Map<String, List<Message>> duplicateMessages = new HashMap<>();
final Map<String, Message> messageCache = new HashMap<>();

try {
Expand All @@ -40,15 +42,17 @@ public DuplicateMessages findDuplicateMessages(final QueueBrowser queueBrowser)
final boolean isNotDuplicateTopicMessage = duplicateConsumer.equals(originalConsumer);

if (isNotDuplicateTopicMessage) {
duplicateMessages.put(jmsMessageID, originalMessage);
duplicateMessages
.computeIfAbsent(jmsMessageID, key -> new ArrayList<>())
.add(originalMessage);
}

} else {
messageCache.put(jmsMessageID, message);
}
}

return new DuplicateMessages(duplicateMessages, messageCache);
return new BrowsedMessages(duplicateMessages, messageCache);

} catch (final JMSException exception) {
throw new CombinedManagementFunctionException("Failed to browse messages on queue.", exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import uk.gov.justice.artemis.manager.connector.combined.CombinedManagementFunctionException;

import java.util.List;
import java.util.Map;

import javax.jms.JMSException;
Expand All @@ -14,11 +15,11 @@

public class DuplicateMessageRemover {

public void removeDuplicatesOnly(final QueueSender queueSender,
final JMSQueueControl jmsQueueControl,
final DuplicateMessages duplicateMessages) {
public void removeAndResendDuplicateMessages(final QueueSender queueSender,
final JMSQueueControl jmsQueueControl,
final BrowsedMessages browsedMessages) {

final Map<String, Message> duplicateMessagesMap = duplicateMessages.getDuplicateMessages();
final Map<String, List<Message>> duplicateMessagesMap = browsedMessages.getDuplicateMessages();

duplicateMessagesMap.keySet()
.forEach(jmsMessageId -> {
Expand All @@ -28,14 +29,26 @@ public void removeDuplicatesOnly(final QueueSender queueSender,
final String filter = format("JMSMessageID = 'ID:%s'", jmsMessageId);

jmsQueueControl.removeMessages(filter);
queueSender.send(duplicateMessagesMap.get(jmsMessageId));

} catch (final JMSException exception) {
throw new CombinedManagementFunctionException(format("Failed to add message back onto queue, all messages have been deleted for JMSMessageID: %s", jmsMessageId), exception);
final List<Message> messages = duplicateMessagesMap.get(jmsMessageId);

sendMessages(queueSender, jmsMessageId, messages);
} catch (final CombinedManagementFunctionException exception) {
throw exception;
} catch (final Exception exception) {
throw new CombinedManagementFunctionException(format("Failed to remove duplicates for JMSMessageID: %s", jmsMessageId), exception);
}

});
}

private void sendMessages(final QueueSender queueSender, final String jmsMessageId, final List<Message> messages) {
messages.forEach(message -> {
try {
queueSender.send(message);
} catch (final JMSException exception) {
throw new CombinedManagementFunctionException(format("Failed to add message back onto queue, all messages have been deleted for JMSMessageID: %s", jmsMessageId), exception);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package uk.gov.justice.artemis.manager.connector.combined.duplicate;

import uk.gov.justice.artemis.manager.connector.combined.CombinedManagementFunctionException;

import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.QueueBrowser;

public class TopicDuplicateMessageFinder {

private final JmsMessageUtil jmsMessageUtil;

public TopicDuplicateMessageFinder(final JmsMessageUtil jmsMessageUtil) {
this.jmsMessageUtil = jmsMessageUtil;
}

@SuppressWarnings("unchecked")
public BrowsedMessages findTopicDuplicateMessages(final QueueBrowser queueBrowser) {

final Map<String, List<Message>> duplicateMessages = new HashMap<>();
final Map<String, Message> messageCache = new HashMap<>();

try {
final Enumeration<Message> browserEnumeration = queueBrowser.getEnumeration();

while (browserEnumeration.hasMoreElements()) {

final Message message = browserEnumeration.nextElement();
final String jmsMessageID = jmsMessageUtil.getJmsMessageIdFrom(message);

if (messageCache.containsKey(jmsMessageID)) {

final Message originalMessage = messageCache.get(jmsMessageID);
final String originalConsumer = jmsMessageUtil.getConsumerFrom(originalMessage);
final String duplicateConsumer = jmsMessageUtil.getConsumerFrom(message);
final boolean isDuplicateTopicMessage = !duplicateConsumer.equals(originalConsumer);

if (isDuplicateTopicMessage) {

duplicateMessages
.computeIfAbsent(jmsMessageID, key -> createNewMessageListWith(originalMessage))
.add(message);
} else {
messageCache.put(jmsMessageID, message);
}

} else {
messageCache.put(jmsMessageID, message);
}
}

return new BrowsedMessages(duplicateMessages, messageCache);

} catch (final JMSException exception) {
throw new CombinedManagementFunctionException("Failed to browse messages on queue.", exception);
}
}

private List<Message> createNewMessageListWith(final Message originalMessage) {
final ArrayList<Message> messages = new ArrayList<>();
messages.add(originalMessage);
return messages;
}
}
Loading