Skip to content

Commit

Permalink
Merge a690adc into e3f95c1
Browse files Browse the repository at this point in the history
  • Loading branch information
mapingo committed Jun 4, 2019
2 parents e3f95c1 + a690adc commit af01579
Show file tree
Hide file tree
Showing 38 changed files with 1,302 additions and 146 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ on [Keep a CHANGELOG](http://keepachangelog.com/). This project adheres to

## [Unreleased]

## [3.4.0] - 2019-06-04

### Added
- Command _`remove all duplicates`_ that finds all not Topic duplicate messages and removes the duplicates from the DLQ

## [3.3.0] - 2019-05-09

### Added
Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,18 @@ In the examples below it is assumed a configuration file of artemis.config has b

`echo msgId1 msgId2 | java -jar artemis-manager.jar remove`

## Remove All Duplicate Messages from DLQ

* Remove all duplicate messages

This checks the DLQ to find any duplicate messages and will remove the duplicate messages from the DLQ leaving a single message on the DLQ.

Only messages that have the same JMSMessageId and Consumer (_AMQ_ORIG_QUEUE) will be removed. Thus any duplicate Topic messages which should go to different consumers will not be deleted.

**Note: Browse uses JMS to connect to the Artemis broker.**

`java -jar artemis-manager.jar removeallduplicates @artemis.config`

## Reprocess Message from DLQ

* Reprocess message by id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ public interface ArtemisConnector {

long remove(final String destinationName, final Iterator<String> msgIds);

List<String> removeAllDuplicates(final String destinationName);

long reprocess(final String destinationName, final Iterator<String> msgIds);

int reprocessAll(final String destinationName);
Expand All @@ -25,10 +27,10 @@ public interface ArtemisConnector {
Map<String, Long> topicMessageCount(final Collection<String> queueNames);

void setParameters(final List<String> jmxUrls,
final String brokerName,
final String jmxUsername,
final String jmxPassword,
final String jmsUrl,
final String jmsUsername,
final String jmsPassword) throws MalformedURLException;
final String brokerName,
final String jmxUsername,
final String jmxPassword,
final String jmsUrl,
final String jmsUsername,
final String jmsPassword) throws MalformedURLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
import static java.util.stream.Collectors.toList;
import static pl.touk.throwing.ThrowingFunction.unchecked;

import uk.gov.justice.artemis.manager.connector.combined.CombinedManagement;
import uk.gov.justice.artemis.manager.connector.combined.CombinedProcessor;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.AddedMessageFinder;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessageFinder;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessageRemover;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.JmsMessageUtil;
import uk.gov.justice.artemis.manager.connector.jms.JmsManagement;
import uk.gov.justice.artemis.manager.connector.jms.JmsProcessor;
import uk.gov.justice.artemis.manager.connector.jmx.JmxManagement;
Expand Down Expand Up @@ -38,27 +44,33 @@ public class CombinedJmsAndJmxArtemisConnector implements ArtemisConnector {
private final JmxManagement jmxManagement = new JmxManagement(new ConsolePrinter());
private final JmsProcessor jmsProcessor = new JmsProcessor();
private final JmsManagement jmsManagement = new JmsManagement();
private final CombinedProcessor combinedProcessor = new CombinedProcessor();
private final JmsMessageUtil jmsMessageUtil = new JmsMessageUtil();
private final CombinedManagement combinedManagement = new CombinedManagement(
new DuplicateMessageFinder(jmsMessageUtil),
new DuplicateMessageRemover(),
new AddedMessageFinder(jmsMessageUtil));

private List<JMXServiceURL> jmxServiceUrls;
private Map<String,String[]> jmxEnvironment;
private Map<String, String[]> jmxEnvironment;
private ObjectNameBuilder objectNameBuilder;

private ActiveMQJMSConnectionFactory jmsFactory;

@Override
public void setParameters(final List<String> jmxUrls,
final String brokerName,
final String jmxUsername,
final String jmxPassword,
final String jmsUrl,
final String jmsUsername,
final String jmsPassword) {
final String brokerName,
final String jmxUsername,
final String jmxPassword,
final String jmsUrl,
final String jmsUsername,
final String jmsPassword) {
this.jmxServiceUrls = jmxProcessor.processJmxUrls(jmxUrls);
this.objectNameBuilder = jmxProcessor.getObjectNameBuilder(brokerName);

if ((jmxUsername != null) && (jmxPassword != null)) {
this.jmxEnvironment = new HashMap<>();
this.jmxEnvironment.put(JMXConnector.CREDENTIALS, new String[]{ jmxUsername, jmxPassword });
this.jmxEnvironment.put(JMXConnector.CREDENTIALS, new String[]{jmxUsername, jmxPassword});
} else {
this.jmxEnvironment = emptyMap();
}
Expand All @@ -72,80 +84,113 @@ public void setParameters(final List<String> jmxUrls,

@Override
public List<MessageData> messagesOf(final String destinationName) {
return jmsProcessor.process(this.jmsFactory, destinationName, jmsManagement.browseMessages());
return jmsProcessor.process(
jmsFactory,
destinationName,
jmsManagement.browseMessages());
}

@Override
public long remove(final String destinationName, final Iterator<String> msgIds) {
return jmxProcessor.processQueueControl(this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
return jmxProcessor
.processQueueControl(
jmxServiceUrls,
jmxEnvironment,
objectNameBuilder,
destinationName,
jmxManagement.removeMessages(msgIds))
.mapToLong(Long::longValue)
.sum();
}

@Override
public List<String> removeAllDuplicates(final String destinationName) {
return combinedProcessor.process(
jmsFactory,
jmxServiceUrls,
jmxEnvironment,
objectNameBuilder,
destinationName,
jmxManagement.removeMessages(msgIds)).mapToLong(Long::longValue).sum();
combinedManagement.removeAllDuplicates());
}

@Override
public long reprocess(final String destinationName, final Iterator<String> msgIds) {
return jmxProcessor.processQueueControl(
this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
destinationName,
jmxManagement.reprocessMessages(msgIds)).mapToLong(Long::longValue).sum();
return jmxProcessor
.processQueueControl(
jmxServiceUrls,
jmxEnvironment,
objectNameBuilder,
destinationName,
jmxManagement.reprocessMessages(msgIds))
.mapToLong(Long::longValue)
.sum();
}

@Override
public int reprocessAll(final String destinationName) {
return jmxProcessor.processQueueControl(
this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
destinationName,
jmxManagement.reprocessAllMessages()).mapToInt(Integer::intValue).sum();
return jmxProcessor
.processQueueControl(
this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
destinationName,
jmxManagement.reprocessAllMessages())
.mapToInt(Integer::intValue)
.sum();
}

@Override
public List<String> queueNames() {
return jmxProcessor.processServerControl(
this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
JMSServerControl::getQueueNames).flatMap(
Arrays::stream).sorted().
distinct().collect(toList());
return jmxProcessor
.processServerControl(
this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
JMSServerControl::getQueueNames)
.flatMap(Arrays::stream)
.sorted()
.distinct()
.collect(toList());
}

@Override
public Map<String, Long> queueMessageCount(final Collection<String> queueNames) {
return jmxProcessor.processQueues(this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
queueNames,
unchecked(DestinationControl::getMessageCount)).flatMap(
m -> m.entrySet().stream()).collect(
groupingBy(Entry::getKey,
summingLong(Entry::getValue)));
return jmxProcessor
.processQueues(
jmxServiceUrls,
jmxEnvironment,
objectNameBuilder,
queueNames,
unchecked(DestinationControl::getMessageCount))
.flatMap(m -> m.entrySet().stream())
.collect(groupingBy(Entry::getKey, summingLong(Entry::getValue)));
}

@Override
public List<String> topicNames() {
return jmxProcessor.processServerControl(this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
JMSServerControl::getTopicNames).flatMap(
Arrays::stream).sorted().
distinct().collect(toList());
return jmxProcessor
.processServerControl(
jmxServiceUrls,
jmxEnvironment,
objectNameBuilder,
JMSServerControl::getTopicNames)
.flatMap(Arrays::stream)
.sorted()
.distinct()
.collect(toList());
}

@Override
public Map<String, Long> topicMessageCount(final Collection<String> topicNames) {
return jmxProcessor.processTopics(this.jmxServiceUrls,
this.jmxEnvironment,
this.objectNameBuilder,
topicNames,
unchecked(DestinationControl::getMessageCount)).flatMap(
m -> m.entrySet().stream()).collect(
groupingBy(Entry::getKey,
summingLong(Entry::getValue)));
return jmxProcessor
.processTopics(
jmxServiceUrls,
jmxEnvironment,
objectNameBuilder,
topicNames,
unchecked(DestinationControl::getMessageCount))
.flatMap(m -> m.entrySet().stream())
.collect(groupingBy(Entry::getKey, summingLong(Entry::getValue)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ public long remove(final String destinationName, final Iterator<String> msgIds)
jmxManagement.removeMessages(msgIds)).mapToLong(Long::longValue).sum();
}

@Override
public List<String> removeAllDuplicates(final String destinationName) {
throw new UnsupportedOperationException("removeAllDuplicates is not supported by the JmxArtemisConnector");
}

@Override
public long reprocess(final String destinationName, final Iterator<String> msgIds) {
return jmxProcessor.processQueueControl(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package uk.gov.justice.artemis.manager.connector.combined;

import uk.gov.justice.artemis.manager.connector.combined.duplicate.AddedMessageFinder;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessageFinder;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessageRemover;
import uk.gov.justice.artemis.manager.connector.combined.duplicate.DuplicateMessages;

import java.util.List;

public class CombinedManagement {

private final DuplicateMessageFinder duplicateMessageFinder;
private final DuplicateMessageRemover duplicateMessageRemover;
private final AddedMessageFinder addedMessageFinder;

public CombinedManagement(final DuplicateMessageFinder duplicateMessageFinder,
final DuplicateMessageRemover duplicateMessageRemover,
final AddedMessageFinder addedMessageFinder) {
this.duplicateMessageFinder = duplicateMessageFinder;
this.duplicateMessageRemover = duplicateMessageRemover;
this.addedMessageFinder = addedMessageFinder;
}

public CombinedManagementFunction<List<String>> removeAllDuplicates() {

return (queueBrowser, queueSender, jmsQueueControl) ->
{
final DuplicateMessages duplicateMessages = duplicateMessageFinder.findDuplicateMessages(queueBrowser);

duplicateMessageRemover.removeDuplicatesOnly(
queueSender,
jmsQueueControl,
duplicateMessages);

return addedMessageFinder.findAddedMessages(duplicateMessages, queueBrowser);
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package uk.gov.justice.artemis.manager.connector.combined;

import javax.jms.QueueBrowser;
import javax.jms.QueueSender;

import org.apache.activemq.artemis.api.jms.management.JMSQueueControl;

public interface CombinedManagementFunction<T> {

T apply(final QueueBrowser queueBrowser, final QueueSender queueSender, final JMSQueueControl jmsQueueControl);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package uk.gov.justice.artemis.manager.connector.combined;

public class CombinedManagementFunctionException extends RuntimeException {

public CombinedManagementFunctionException(final String message, final Throwable cause) {
super(message, cause);
}
}
Loading

0 comments on commit af01579

Please sign in to comment.