Skip to content
This repository has been archived by the owner on May 26, 2020. It is now read-only.

Commit

Permalink
SCTP-279: Enabling ability to browse and delete messages across multi…
Browse files Browse the repository at this point in the history
…ple artemis brokers. System property ARTEMIS_URI takes a comma separated list of broker host values.
  • Loading branch information
Mahesh Subramanian committed May 14, 2019
1 parent 371cdee commit 0a282fd
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 99 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package uk.gov.justice.services.test.utils.core.messaging;

import static java.util.stream.Collectors.toCollection;
import static uk.gov.justice.services.test.utils.core.messaging.QueueUriProvider.artemisQueueUri;

import java.io.StringReader;
Expand All @@ -18,116 +19,123 @@
import javax.json.JsonObject;
import javax.json.JsonReader;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.activemq.artemis.jms.client.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;


/**
* Utility class that allows to browse and clean
* messages in dead letter queue
*
* Usage: DeadLetterQueueBrowser dlqBrowser = new DeadLetterQueueBrowser();
* dlqBrowser.browse() will return a list of {@link String} in the dlq
* dlqBrowser.removeMessages() will clean dlq
* Utility class that allows to browse and clean messages in dead letter queue
* <p>
* Usage: DeadLetterQueueBrowser dlqBrowser = new DeadLetterQueueBrowser(); dlqBrowser.browse() will
* return a list of {@link String} in the dlq dlqBrowser.removeMessages() will clean dlq
* dlqBrowser.close() will release resources
*
* Note:It has been observed there is sometimes a delay by the time
* the message lands in dlq. Setting a delay of few milliseconds
* generally resolves this.
* <p>
* Note:It has been observed there is sometimes a delay by the time the message lands in dlq.
* Setting a delay of few milliseconds generally resolves this.
*
* @author gopal
*
*/
public class DeadLetterQueueBrowser implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(DeadLetterQueueBrowser.class);

private static final String DLQ_QUEUE_URI = artemisQueueUri();
private static final String dlqName = "DLQ";
private static final List<String> DLQ_QUEUE_URIS = artemisQueueUri();
private static final String DLQ_NAME = "DLQ";

private Session session;
private javax.jms.Queue dlqQueue;
private final JmsSessionFactory jmsSessionFactory;
private List<Session> sessions;
private List<JmsSessionFactory> jmsSessionFactories;
private Queue dlqQueue;
private final ConsumerClient consumerClient;

public DeadLetterQueueBrowser() {
jmsSessionFactory = new JmsSessionFactory();
consumerClient = new ConsumerClient();
jmsSessionFactories = Lists.newArrayList();
sessions = Lists.newArrayList();
initialise();
}

@VisibleForTesting
DeadLetterQueueBrowser(final Queue dlqQueue, final Session session,
final JmsSessionFactory jmsSessionFactory, final ConsumerClient consumerClient) {
DeadLetterQueueBrowser(final Queue dlqQueue, final List<Session> sessions,
final List<JmsSessionFactory> jmsSessionFactories, final ConsumerClient consumerClient) {
super();
this.session = session;
this.sessions = sessions;
this.jmsSessionFactories = jmsSessionFactories;
this.dlqQueue = dlqQueue;
this.jmsSessionFactory = jmsSessionFactory;
this.consumerClient = consumerClient;
}

private void initialise() {
try {
LOGGER.info("Artemis URI: {}", DLQ_QUEUE_URI);
session = jmsSessionFactory.session(DLQ_QUEUE_URI);
dlqQueue = new ActiveMQQueue(dlqName);
DLQ_QUEUE_URIS.forEach(u -> {
LOGGER.info("Setting up session for uri: " + u);
final JmsSessionFactory jmsSessionFactory = new JmsSessionFactory();
sessions.add(jmsSessionFactory.session(u));
jmsSessionFactories.add(jmsSessionFactory);
});
dlqQueue = new ActiveMQQueue(DLQ_NAME);
} catch (Exception e) {
close();
final String message = "Failed to start dlq message consumer for " + "queue: '" + dlqName + "', "
+ "queueUri: '" + DLQ_QUEUE_URI + " ";
final String message = "Failed to start dlq message consumer for " + "queue: '" + DLQ_NAME + "', "
+ "queueUris: '" + DLQ_QUEUE_URIS + " ";
LOGGER.error("Fatal error initialising Artemis {} ", message);
throw new MessageConsumerException(message, e);
}
}

/**
* allows browsing messages in dlq
*
* @return list of {@link JsonObject}
*/
public List<JsonObject> browseAsJson() {
return browse().stream().map(s->convert(s)).collect(Collectors.toCollection(ArrayList<JsonObject>::new));
return browse().stream().map(this::convert).collect(toCollection(ArrayList::new));
}

/**
* allows browsing messages in dlq
*
* @return list of {@link String}
*/
public List<String> browse() {
try (QueueBrowser dlqBrowser = session.createBrowser(dlqQueue);) {
final List<String> messages = new ArrayList<>();
final Enumeration enumeration = dlqBrowser.getEnumeration();

while (enumeration.hasMoreElements()) {
String message = ((TextMessage) enumeration.nextElement()).getText();
messages.add(message);
final List<String> messages = new ArrayList<>();
for (Session session : sessions) {
try (QueueBrowser dlqBrowser = session.createBrowser(dlqQueue);) {
final Enumeration enumeration = dlqBrowser.getEnumeration();

while (enumeration.hasMoreElements()) {
final String message = ((TextMessage) enumeration.nextElement()).getText();
messages.add(message);
}


} catch (JMSException e) {
final String message = "Fatal error getting messages from DLQ";
LOGGER.error(message);
throw new MessageConsumerException(message, e);
}
return messages;
} catch (JMSException e) {
String message = "Fatal error getting messges from DLQ";
LOGGER.error(message);
throw new MessageConsumerException(message, e);
}
LOGGER.info("Total number of messages across {} brokers is {}", sessions.size(), messages.size());
return messages;
}

/**
* removes messages from dlq
*/
public void removeMessages() {
try (MessageConsumer messageConsumer = session.createConsumer(dlqQueue)) {
cleanQueue(messageConsumer);
} catch (JMSException e) {
String message = "Fatal error cleaning messges from DLQ";
LOGGER.error(message);
throw new MessageConsumerException(message, e);
for (Session session : sessions) {
try (MessageConsumer messageConsumer = session.createConsumer(dlqQueue)) {
consumerClient.cleanQueue(messageConsumer);
} catch (JMSException e) {
final String message = "Fatal error cleaning messges from DLQ";
LOGGER.error(message);
throw new MessageConsumerException(message, e);
}
}
}

private void cleanQueue(MessageConsumer messageConsumer) {
consumerClient.cleanQueue(messageConsumer);
}

private JsonObject convert(final String source) {
try (final JsonReader reader = Json.createReader(new StringReader(source))) {
return reader.readObject();
Expand All @@ -138,7 +146,8 @@ private JsonObject convert(final String source) {
* clean up resources
*/
public void close() {
jmsSessionFactory.close();
for (JmsSessionFactory jmsSessionFactory : jmsSessionFactories) {
jmsSessionFactory.close();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ private void doClose(final AutoCloseable closeable) {
try {
closeable.close();
} catch (final Exception ignored) {
// do nothing
}

}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,33 +1,41 @@
package uk.gov.justice.services.test.utils.core.messaging;

import static com.google.common.base.Splitter.on;
import static com.google.common.collect.Lists.newArrayList;
import static java.lang.String.format;
import static uk.gov.justice.services.test.utils.common.host.TestHostProvider.getArtemisHost;
import static uk.gov.justice.services.test.utils.common.host.TestHostProvider.getHost;

import java.util.List;

import org.apache.commons.lang3.StringUtils;

public class QueueUriProvider {

private static final String BASE_URI_PATTERN = "tcp://%s:61616";

/**
* Takes a comma separated list of broker hosts
*/
private static final String ARTEMIS_URI = "ARTEMIS_URI";

public String getQueueUri() {
return format(BASE_URI_PATTERN, getHost());
}

public String getArtemisQueueUri() {
public List<String> getArtemisQueueUri() {
final String artemisUri = System.getProperty(ARTEMIS_URI);
if (StringUtils.isNotBlank(artemisUri)) {
return artemisUri;
return on(",").splitToList(artemisUri);
}
return format(BASE_URI_PATTERN, getArtemisHost());
return newArrayList(format(BASE_URI_PATTERN, getArtemisHost()));
}

public static String queueUri() {
return new QueueUriProvider().getQueueUri();
}

public static String artemisQueueUri(){
public static List<String> artemisQueueUri() {
return new QueueUriProvider().getArtemisQueueUri();
}
}
Loading

0 comments on commit 0a282fd

Please sign in to comment.