Skip to content

Commit

Permalink
Add capability to determine topic names
Browse files Browse the repository at this point in the history
  • Loading branch information
Brett Delle Grazie committed Aug 15, 2018
1 parent 473ed73 commit 8dda4db
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 30 deletions.
11 changes: 7 additions & 4 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
<parent>
<groupId>uk.gov.justice</groupId>
<artifactId>maven-parent-pom</artifactId>
<version>1.6.0</version>
<version>1.7.1</version>
</parent>
<properties>
<framework-tools-command.version>2.0.0</framework-tools-command.version>
<artemis.version>1.5.5</artemis.version>
<artemis.home>${project.build.directory}/apache-artemis-${artemis.version}</artemis.home>
<artemis.version>1.5.6</artemis.version>
<slf4j-version>1.7.10</slf4j-version>
<cpp.repo.name>artemis-manager</cpp.repo.name>
</properties>
Expand All @@ -31,7 +31,7 @@
<dependency>
<groupId>uk.gov.justice</groupId>
<artifactId>maven-common-bom</artifactId>
<version>1.17.0</version>
<version>1.28.0</version>
<scope>import</scope>
<type>pom</type>
</dependency>
Expand All @@ -51,7 +51,10 @@
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-jms-client</artifactId>
<version>${artemis.version}</version>
</dependency>
<dependency>
<groupId>org.jboss.logging</groupId>
<artifactId>jboss-logging</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@ public interface ArtemisConnector {
long reprocess(final String host, final String port, final String brokerName, final String destinationName, final Iterator<String> msgIds) throws Exception;

String[] queueNames(final String host, final String port, final String brokerName) throws Exception;

String[] topicNames(final String host, final String port, final String brokerName) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@
import static javax.management.remote.JMXConnectorFactory.connect;
import static org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration.getDefaultJmxDomain;

import uk.gov.justice.output.ConsolePrinter;
import uk.gov.justice.output.OutputPrinter;

import java.io.IOException;
import java.net.MalformedURLException;
import java.util.Iterator;
import java.util.List;

Expand All @@ -25,6 +21,9 @@
import org.apache.activemq.artemis.api.jms.management.JMSQueueControl;
import org.apache.activemq.artemis.api.jms.management.JMSServerControl;

import uk.gov.justice.output.ConsolePrinter;
import uk.gov.justice.output.OutputPrinter;

public class JmxArtemisConnector implements ArtemisConnector {

final protected OutputPrinter outputPrinter = new ConsolePrinter();
Expand Down Expand Up @@ -86,11 +85,18 @@ public long reprocess(final String host, final String port, final String brokerN
@Override
public String[] queueNames(final String host, final String port, final String brokerName) throws Exception {
try (final JMXConnector connector = getJMXConnector(host, port)) {
JMSServerControl serverControl = serverControlOf(connector, brokerName);
final JMSServerControl serverControl = serverControlOf(connector, brokerName);
return serverControl.getQueueNames();
}
}

public String[] topicNames(final String host, final String port, final String brokerName) throws Exception {
try (final JMXConnector connector = getJMXConnector(host, port)) {
final JMSServerControl serverControl = serverControlOf(connector, brokerName);
return serverControl.getTopicNames();
}
}

protected JMXConnector getJMXConnector(final String host, final String port) throws IOException {
return connect(new JMXServiceURL(format(JMX_URL, host, port)), emptyMap());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package uk.gov.justice.framework.tools.command;

import uk.gov.justice.framework.tools.common.command.ShellCommand;

public class ListTopics extends AbstractArtemisCommand implements ShellCommand {

@Override
public void run(String[] args) {

try {
final String[] queues = artemisConnector.topicNames(host, port, brokerName);
outputPrinter.writeStringArray(queues);
} catch (final Exception exception) {
outputPrinter.writeStackTrace(exception);
}
}

}
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package uk.gov.justice.artemis.manager.connector;

import static java.util.Arrays.asList;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThat;
import static uk.gov.justice.artemis.manager.util.JmsTestUtil.cleanQueue;
import static uk.gov.justice.artemis.manager.util.JmsTestUtil.cleanTopic;
import static uk.gov.justice.artemis.manager.util.JmsTestUtil.closeJmsConnection;
import static uk.gov.justice.artemis.manager.util.JmsTestUtil.openJmsConnection;
import static uk.gov.justice.artemis.manager.util.JmsTestUtil.putInQueue;
Expand Down Expand Up @@ -118,4 +119,15 @@ public void shouldReturnListOfQueues() throws Exception {
final String[] queueNames = jmxArtemisConnector.queueNames("localhost", "3000", "0.0.0.0");
assertThat(queueNames, arrayContainingInAnyOrder(new String[] {"DLQ", "ExpiryQueue"}));
}

@Test
public void shouldReturnListOfTopics() throws Exception {
final String topic = "testTopic";

cleanTopic(topic, "testSubscription");

final String[] topicNames = jmxArtemisConnector.topicNames("localhost", "3000", "0.0.0.0");

assertThat(topicNames, arrayContainingInAnyOrder(new String[] {"testTopic"}));
}
}
83 changes: 70 additions & 13 deletions src/test/java/uk/gov/justice/artemis/manager/util/JmsTestUtil.java
Original file line number Diff line number Diff line change
@@ -1,32 +1,36 @@
package uk.gov.justice.artemis.manager.util;

import static org.apache.activemq.artemis.api.jms.ActiveMQJMSClient.createQueue;
import static org.apache.activemq.artemis.api.jms.ActiveMQJMSClient.createTopic;

import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;

import org.apache.activemq.artemis.jms.client.ActiveMQQueueConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

public class JmsTestUtil {

private static final QueueConnectionFactory JMS_CF = new ActiveMQQueueConnectionFactory("tcp://localhost:61616");
private static QueueConnection JMS_CONNECTION;
private static QueueSession JMS_SESSION;
private static final ConnectionFactory JMS_CF = new ActiveMQConnectionFactory("tcp://localhost:61616?clientID=1234");
private static Connection JMS_CONNECTION;
private static Session JMS_SESSION;
private static Map<String, Queue> QUEUES = new HashMap<>();
private static Map<String, MessageConsumer> CONSUMERS = new HashMap<>();
private static Map<String, MessageProducer> PRODUCERS = new HashMap<>();
private static Map<String, Topic> TOPICS = new HashMap<>();
private static Map<String, TopicSubscriber> SUBSCRIBERS = new HashMap<>();
private static Map<String, MessageProducer> PUBLISHERS = new HashMap<>();

public static void putInQueue(final String queueName, final String msgText, final String... origAddress) throws JMSException {
TextMessage message = JMS_SESSION.createTextMessage(msgText);
Expand All @@ -48,11 +52,19 @@ public static void putInQueue(final String queueName, final InputStream messageI
producerOf(queueName).send(message);
}

public static void putOnTopic(final String topicName, final String msgText, final String... origAddress) throws JMSException {
TextMessage message = JMS_SESSION.createTextMessage(msgText);
if (origAddress.length > 0) {
message.setStringProperty("_AMQ_ORIG_ADDRESS", origAddress[0]);
}
publisherOf(topicName).send(message);
}

/**
* Returns the number of messages that were removed from the queue.
*
* @param queueName - the name of the queue that is to be cleaned
* @return the number of cleaned messagesß
* @return the number of cleaned messages
*/
public static int cleanQueue(final String queueName) throws JMSException {
JMS_CONNECTION.start();
Expand All @@ -66,6 +78,24 @@ public static int cleanQueue(final String queueName) throws JMSException {
return cleanedMessage;
}

/**
* Returns the number of messages that were received from the topic.
*
* @param topicName - the name of the topic that is to be cleaned
* @return the number of cleaned messages
*/
public static int cleanTopic(final String topicName, final String name) throws JMSException {
JMS_CONNECTION.start();
final TopicSubscriber subscriber = subscriberOf(topicName, name);

int cleanedMessage = 0;
while (subscriber.receiveNoWait() != null) {
cleanedMessage++;
}
JMS_CONNECTION.stop();
return cleanedMessage;
}

public static MessageConsumer consumerOf(final String queueName) throws JMSException {
return CONSUMERS.computeIfAbsent(queueName, name -> {
try {
Expand All @@ -86,23 +116,50 @@ private static MessageProducer producerOf(final String queueName) throws JMSExce
});
}

private static MessageProducer publisherOf(final String topicName) throws JMSException {
return PUBLISHERS.computeIfAbsent(topicName, name -> {
try {
return JMS_SESSION.createProducer(topicOf(name));
} catch (JMSException e) {
throw new RuntimeException(e);
}
});
}

private static TopicSubscriber subscriberOf(final String topicName, final String subscriptionName) throws JMSException {
return SUBSCRIBERS.computeIfAbsent(topicName, name -> {
try {
return JMS_SESSION.createDurableSubscriber(topicOf(name), subscriptionName);
} catch (JMSException e) {
throw new RuntimeException(e);
}
});
}

public static void closeJmsConnection() throws JMSException {
SUBSCRIBERS.clear();
PUBLISHERS.clear();

CONSUMERS.clear();
PRODUCERS.clear();

TOPICS.clear();
QUEUES.clear();

JMS_CONNECTION.close();
}

public static void openJmsConnection() throws JMSException {
JMS_CONNECTION = JMS_CF.createQueueConnection();
JMS_SESSION = JMS_CONNECTION.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);

JMS_CF.createConnection();
JMS_CONNECTION = JMS_CF.createConnection();
JMS_SESSION = JMS_CONNECTION.createSession(false, Session.AUTO_ACKNOWLEDGE);
}

private static Queue queueOf(final String queueName) {
return QUEUES.computeIfAbsent(queueName, name -> createQueue(queueName));
}


private static Topic topicOf(final String topicName) {
return TOPICS.computeIfAbsent(topicName, name -> createTopic(topicName));
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
package uk.gov.justice.framework.tools.command;

import static org.junit.Assert.*;
import static org.mockito.Mockito.verify;

import uk.gov.justice.artemis.manager.connector.ArtemisConnector;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;

import uk.gov.justice.artemis.manager.connector.ArtemisConnector;


@RunWith(MockitoJUnitRunner.class)
public class BrowseTest {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package uk.gov.justice.framework.tools.command;

import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.verify;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

import uk.gov.justice.artemis.manager.connector.ArtemisConnector;


@RunWith(MockitoJUnitRunner.class)
public class ListTopicsTest {

@Mock
ArtemisConnector artemisConnector;

@InjectMocks
ListTopics listTopicsCommand;

@Test
public void shouldInvokeConnector() throws Exception {
listTopicsCommand.brokerName = "brokerabc";
listTopicsCommand.host = "some.host";
listTopicsCommand.port = "1212";
when(artemisConnector.topicNames(anyString(), anyString(), anyString())).thenReturn(new String[] {"testTopic" });

listTopicsCommand.run(null);
verify(artemisConnector).topicNames("some.host", "1212", "brokerabc");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import uk.gov.justice.artemis.manager.connector.ArtemisConnector;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
Expand All @@ -18,7 +16,6 @@

import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
Expand All @@ -27,6 +24,8 @@
import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner;

import uk.gov.justice.artemis.manager.connector.ArtemisConnector;


@RunWith(MockitoJUnitRunner.class)
public class RemoveTest {
Expand Down

0 comments on commit 8dda4db

Please sign in to comment.