Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -342,108 +342,108 @@ public Object execute(ActionContext context) throws Exception {
}

private int doTransfer(ActionContext context) throws Exception {
ConnectionFactoryClosable sourceConnectionFactory = createConnectionFactory("source", sourceProtocol, sourceURL, sourceUser, sourcePassword, sourceClientID);
Connection sourceConnection = sourceConnectionFactory.createConnection();

Session sourceSession = sourceConnection.createSession(Session.SESSION_TRANSACTED);
Destination sourceDestination = createDestination("source", sourceSession, sourceQueue, sourceTopic);
MessageConsumer consumer = null;
if (sourceDestination instanceof Queue) {
if (filter != null) {
consumer = sourceSession.createConsumer(sourceDestination, filter);
} else {
consumer = sourceSession.createConsumer(sourceDestination);
}
} else if (sourceDestination instanceof Topic topic) {
try (ConnectionFactoryClosable sourceConnectionFactory = createConnectionFactory("source", sourceProtocol, sourceURL, sourceUser, sourcePassword, sourceClientID);
Connection sourceConnection = sourceConnectionFactory.createConnection();
Session sourceSession = sourceConnection.createSession(Session.SESSION_TRANSACTED)) {

if (durableConsumer != null) {
if (filter != null) {
consumer = sourceSession.createDurableConsumer(topic, durableConsumer);
} else {
consumer = sourceSession.createDurableConsumer(topic, durableConsumer, filter, noLocal);
}
} else if (sharedDurableSubscription != null) {
Destination sourceDestination = createDestination("source", sourceSession, sourceQueue, sourceTopic);
MessageConsumer consumer = null;
if (sourceDestination instanceof Queue) {
if (filter != null) {
consumer = sourceSession.createSharedDurableConsumer(topic, sharedDurableSubscription, filter);
consumer = sourceSession.createConsumer(sourceDestination, filter);
} else {
consumer = sourceSession.createSharedDurableConsumer(topic, sharedDurableSubscription);
consumer = sourceSession.createConsumer(sourceDestination);
}
} else if (sharedSubscription != null) {
if (filter != null) {
consumer = sourceSession.createSharedConsumer(topic, sharedSubscription, filter);
} else if (sourceDestination instanceof Topic topic) {

if (durableConsumer != null) {
if (filter != null) {
consumer = sourceSession.createDurableConsumer(topic, durableConsumer);
} else {
consumer = sourceSession.createDurableConsumer(topic, durableConsumer, filter, noLocal);
}
} else if (sharedDurableSubscription != null) {
if (filter != null) {
consumer = sourceSession.createSharedDurableConsumer(topic, sharedDurableSubscription, filter);
} else {
consumer = sourceSession.createSharedDurableConsumer(topic, sharedDurableSubscription);
}
} else if (sharedSubscription != null) {
if (filter != null) {
consumer = sourceSession.createSharedConsumer(topic, sharedSubscription, filter);
} else {
consumer = sourceSession.createSharedConsumer(topic, sharedSubscription);
}
} else {
consumer = sourceSession.createSharedConsumer(topic, sharedSubscription);
throw new IllegalArgumentException("you must specify either --durable-consumer, --shared-durable-subscription or --shared-subscription with a JMS topic");
}
} else {
throw new IllegalArgumentException("you must specify either --durable-consumer, --shared-durable-subscription or --shared-subscription with a JMS topic");
}
}

ConnectionFactoryClosable targetConnectionFactory = createConnectionFactory("target", targetProtocol, targetURL, targetUser, targetPassword, null);
Connection targetConnection = targetConnectionFactory.createConnection();
Session targetSession = targetConnection.createSession(Session.SESSION_TRANSACTED);
Destination targetDestination = createDestination("target", targetSession, targetQueue, targetTopic);
MessageProducer producer = targetSession.createProducer(targetDestination);

if (sourceURL.equals(targetURL) && sourceDestination.equals(targetDestination)) {
context.out.println("You cannot transfer between " + sourceURL + "/" + sourceDestination + " and " + targetURL + "/" + targetDestination + ".\n" + "That would create an infinite recursion.");
throw new IllegalArgumentException("cannot use " + sourceDestination + " == " + targetDestination);
}

sourceConnection.start();
int pending = 0, total = 0;
while (total < messageCount) {

Message receivedMessage;
if (receiveTimeout < 0) {
receivedMessage = consumer.receive();
} else if (receiveTimeout == 0) {
receivedMessage = consumer.receiveNoWait();
} else {
receivedMessage = consumer.receive(receiveTimeout);
}

if (receivedMessage == null) {
if (isVerbose()) {
context.out.println("could not receive any more messages");
}
break;
}
producer.send(receivedMessage);
pending++;
total++;

if (isVerbose()) {
context.out.println("Received message " + total + " with " + pending + " messages pending to be commited");
}
if (pending > commitInterval) {
context.out.println("Transferred " + pending + " messages of " + total);
pending = 0;
targetSession.commit();
if (!isCopy()) {
sourceSession.commit();
try (MessageConsumer sourceConsumer = consumer;
ConnectionFactoryClosable targetConnectionFactory = createConnectionFactory("target", targetProtocol, targetURL, targetUser, targetPassword, null);
Connection targetConnection = targetConnectionFactory.createConnection();
Session targetSession = targetConnection.createSession(Session.SESSION_TRANSACTED)) {

Destination targetDestination = createDestination("target", targetSession, targetQueue, targetTopic);

try (MessageProducer producer = targetSession.createProducer(targetDestination)) {

if (sourceURL.equals(targetURL) && sourceDestination.equals(targetDestination)) {
context.out.println("You cannot transfer between " + sourceURL + "/" + sourceDestination + " and " + targetURL + "/" + targetDestination + ".\n" + "That would create an infinite recursion.");
throw new IllegalArgumentException("cannot use " + sourceDestination + " == " + targetDestination);
}

sourceConnection.start();
int pending = 0, total = 0;
while (total < messageCount) {

Message receivedMessage;
if (receiveTimeout < 0) {
receivedMessage = sourceConsumer.receive();
} else if (receiveTimeout == 0) {
receivedMessage = sourceConsumer.receiveNoWait();
} else {
receivedMessage = sourceConsumer.receive(receiveTimeout);
}

if (receivedMessage == null) {
if (isVerbose()) {
context.out.println("could not receive any more messages");
}
break;
}
producer.send(receivedMessage);
pending++;
total++;

if (isVerbose()) {
context.out.println("Received message " + total + " with " + pending + " messages pending to be commited");
}
if (pending > commitInterval) {
context.out.println("Transferred " + pending + " messages of " + total);
pending = 0;
targetSession.commit();
if (!isCopy()) {
sourceSession.commit();
}
}
}

context.out.println("Transferred a total of " + total + " messages");

if (pending != 0) {
targetSession.commit();
if (isCopy()) {
sourceSession.rollback();
} else {
sourceSession.commit();
}
}

return total;
}
}
}

context.out.println("Transferred a total of " + total + " messages");

if (pending != 0) {
targetSession.commit();
if (isCopy()) {
sourceSession.rollback();
} else {
sourceSession.commit();
}
}

sourceConnection.close();
sourceConnectionFactory.close();

targetConnection.close();
targetConnectionFactory.close();

return total;
}

Destination createDestination(String role, Session session, String queue, String topic) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.netty.channel.EventLoop;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.messages.ConnectionProtocol;
import org.apache.activemq.artemis.cli.factory.ConnectionFactoryClosable;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
Expand Down Expand Up @@ -98,7 +99,6 @@ public class PerfClientCommand extends PerfCommand {
protected void onExecuteBenchmark(final ConnectionFactory producerConnectionFactory, final Destination[] jmsDestinations, final ActionContext context) throws Exception {
final ConnectionProtocol listenerProtocol = Objects.requireNonNullElse(this.consumerProtocol, protocol);
final String listenerUrl = Objects.requireNonNullElse(this.consumerUrl, brokerURL);
final ConnectionFactory consumerConnectionFactory = createConnectionFactory(listenerUrl, user, password, null, listenerProtocol);
if (consumerConnections == 0) {
if (sharedSubscription > 0) {
if (getClientID() == null) {
Expand All @@ -122,17 +122,18 @@ protected void onExecuteBenchmark(final ConnectionFactory producerConnectionFact
boolean warmingUp = warmup != 0;
final LiveStatistics statistics;
final StringBuilder skratchBuffer = new StringBuilder();
try (MessageListenerBenchmark consumerBenchmark = new MessageListenerBenchmarkBuilder()
.setClientID(getClientID())
.setDestinations(consumerProtocol != null ? lookupDestinations(consumerConnectionFactory) : jmsDestinations)
.setFactory(consumerConnectionFactory)
.setTransacted(transaction)
.setConsumers(consumersPerDestination)
.setConnections(consumerConnections)
.setTimeProvider(() -> TimeUnit.NANOSECONDS.toMicros(System.nanoTime()))
.setCanDelayMessageCount(true)
.setSharedSubscription(sharedSubscription)
.setDurableSubscription(durableSubscription)
try (ConnectionFactoryClosable consumerConnectionFactory = createConnectionFactory(listenerUrl, user, password, null, listenerProtocol);
MessageListenerBenchmark consumerBenchmark = new MessageListenerBenchmarkBuilder()
Comment thread
gemmellr marked this conversation as resolved.
.setClientID(getClientID())
.setDestinations(consumerProtocol != null ? lookupDestinations(consumerConnectionFactory) : jmsDestinations)
.setFactory(consumerConnectionFactory)
.setTransacted(transaction)
.setConsumers(consumersPerDestination)
.setConnections(consumerConnections)
.setTimeProvider(() -> TimeUnit.NANOSECONDS.toMicros(System.nanoTime()))
.setCanDelayMessageCount(true)
.setSharedSubscription(sharedSubscription)
.setDurableSubscription(durableSubscription)
.createMessageListenerBenchmark()) {

final DefaultEventLoopGroup eventLoopGroup = new DefaultEventLoopGroup(threads) {
Expand Down