Skip to content

Commit

Permalink
ARTEMIS-3309 Add a parameter to limit messages to move or transfer
Browse files Browse the repository at this point in the history
  • Loading branch information
brusdev committed May 20, 2021
1 parent d8d4419 commit 8002182
Show file tree
Hide file tree
Showing 11 changed files with 416 additions and 11 deletions.
Expand Up @@ -105,10 +105,207 @@ public class Transfer extends InputAbstract {
@Option(name = "--target-topic", description = "Destination to be used. It can be prefixed with queue:// or topic:// and can be an FQQN in the form of <address>::<queue>. (Default: queue://TEST)")
String targetTopic;

boolean isCopy() {
@Option(name = "--max-count", description = "The maximum number of messages to transfer.")
int maxCount = Integer.MAX_VALUE;

public String getSourceURL() {
return sourceURL;
}

public Transfer setSourceURL(String sourceURL) {
this.sourceURL = sourceURL;
return this;
}

public String getSourceUser() {
return sourceUser;
}

public Transfer setSourceUser(String sourceUser) {
this.sourceUser = sourceUser;
return this;
}

public String getSourcePassword() {
return sourcePassword;
}

public Transfer setSourcePassword(String sourcePassword) {
this.sourcePassword = sourcePassword;
return this;
}

public String getTargetURL() {
return targetURL;
}

public Transfer setTargetURL(String targetURL) {
this.targetURL = targetURL;
return this;
}

public String getTargetUser() {
return targetUser;
}

public Transfer setTargetUser(String targetUser) {
this.targetUser = targetUser;
return this;
}

public String getTargetPassword() {
return targetPassword;
}

public Transfer setTargetPassword(String targetPassword) {
this.targetPassword = targetPassword;
return this;
}

public int getReceiveTimeout() {
return receiveTimeout;
}

public Transfer setReceiveTimeout(int receiveTimeout) {
this.receiveTimeout = receiveTimeout;
return this;
}

public String getSourceClientID() {
return sourceClientID;
}

public Transfer setSourceClientID(String sourceClientID) {
this.sourceClientID = sourceClientID;
return this;
}

public String getSourceProtocol() {
return sourceProtocol;
}

public Transfer setSourceProtocol(String sourceProtocol) {
this.sourceProtocol = sourceProtocol;
return this;
}

public String getSourceQueue() {
return sourceQueue;
}

public Transfer setSourceQueue(String sourceQueue) {
this.sourceQueue = sourceQueue;
return this;
}

public String getSharedDurableSubscription() {
return sharedDurableSubscription;
}

public Transfer setSharedDurableSubscription(String sharedDurableSubscription) {
this.sharedDurableSubscription = sharedDurableSubscription;
return this;
}

public String getSharedSubscription() {
return sharedSubscription;
}

public Transfer setSharedSubscription(String sharedSubscription) {
this.sharedSubscription = sharedSubscription;
return this;
}

public String getDurableConsumer() {
return durableConsumer;
}

public Transfer setDurableConsumer(String durableConsumer) {
this.durableConsumer = durableConsumer;
return this;
}

public boolean isNoLocal() {
return noLocal;
}

public Transfer setNoLocal(boolean noLocal) {
this.noLocal = noLocal;
return this;
}

public String getSourceTopic() {
return sourceTopic;
}

public Transfer setSourceTopic(String sourceTopic) {
this.sourceTopic = sourceTopic;
return this;
}

public String getFilter() {
return filter;
}

public Transfer setFilter(String filter) {
this.filter = filter;
return this;
}

public String getTargetProtocol() {
return targetProtocol;
}

public Transfer setTargetProtocol(String targetProtocol) {
this.targetProtocol = targetProtocol;
return this;
}

public int getCommitInterval() {
return commitInterval;
}

public Transfer setCommitInterval(int commitInterval) {
this.commitInterval = commitInterval;
return this;
}

public boolean isCopy() {
return copy;
}

public Transfer setCopy(boolean copy) {
this.copy = copy;
return this;
}

public String getTargetQueue() {
return targetQueue;
}

public Transfer setTargetQueue(String targetQueue) {
this.targetQueue = targetQueue;
return this;
}

public String getTargetTopic() {
return targetTopic;
}

public Transfer setTargetTopic(String targetTopic) {
this.targetTopic = targetTopic;
return this;
}

public int getMaxCount() {
return maxCount;
}

public Transfer setMaxCount(int maxCount) {
this.maxCount = maxCount;
return this;
}

@SuppressWarnings("StringEquality")
@Override
public Object execute(ActionContext context) throws Exception {
Expand Down Expand Up @@ -183,7 +380,7 @@ public Object execute(ActionContext context) throws Exception {

sourceConnection.start();
int pending = 0, total = 0;
while (true) {
while (total < maxCount) {

Message receivedMessage;
if (receiveTimeout < 0) {
Expand Down Expand Up @@ -231,7 +428,7 @@ public Object execute(ActionContext context) throws Exception {
sourceConnection.close();
targetConnection.close();

return null;
return total;
}

Destination createDestination(String role, Session session, String queue, String topic) throws Exception {
Expand Down
Expand Up @@ -37,6 +37,7 @@
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.File;
import java.util.ArrayList;
Expand Down Expand Up @@ -142,18 +143,30 @@ protected Session createSession(Connection connection) throws JMSException {
}

protected List<Message> consumeMessages(Session session, String address, int noMessages, boolean fqqn) throws Exception {
List<Message> messages = new ArrayList<>();
Destination destination = fqqn ? session.createQueue(address) : getDestination(address);
MessageConsumer consumer = session.createConsumer(destination);

List<Message> messages = new ArrayList<>();
for (int i = 0; i < noMessages; i++) {
Message m = consumer.receive(1000);
assertNotNull(m);
messages.add(m);
try (MessageConsumer consumer = session.createConsumer(destination)) {
for (int i = 0; i < noMessages; i++) {
Message m = consumer.receive(1000);
assertNotNull(m);
messages.add(m);
}
}

return messages;
}

protected void produceMessages(Session session, String address, int noMessages, boolean fqqn) throws Exception {
Destination destination = fqqn ? session.createQueue(address) : getDestination(address);

try (MessageProducer producer = session.createProducer(destination)) {
for (int i = 0; i < noMessages; i++) {
producer.send(session.createTextMessage("test message: " + i));
}
}
}

Destination getDestination(String queueName) {
return ActiveMQDestination.createDestination("queue://" + queueName, ActiveMQDestination.TYPE.QUEUE);
}
Expand Down
@@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.cli.test;

import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.cli.commands.messages.Transfer;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.management.ManagementContext;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import javax.jms.Connection;
import javax.jms.Session;

public class TransferTest extends CliTestBase {
private Connection connection;
private ActiveMQConnectionFactory cf;
private ActiveMQServer server;
private static final int TEST_MESSAGE_COUNT = 10;

@Before
@Override
public void setup() throws Exception {
setupAuth();
super.setup();
server = ((Pair<ManagementContext, ActiveMQServer>)startServer()).getB();
cf = getConnectionFactory(61616);
connection = cf.createConnection("admin", "admin");
}

@After
@Override
public void tearDown() throws Exception {
closeConnection(cf, connection);
super.tearDown();
}

@Test
public void testTransferMessages() throws Exception {
testTransferMessages(TEST_MESSAGE_COUNT, 0);
}

@Test
public void testTransferMessagesWithMaxCount() throws Exception {
testTransferMessages(TEST_MESSAGE_COUNT, 5);
}

private void testTransferMessages(int messageCount, int maxCount) throws Exception {
String sourceQueueName = "SOURCE_QUEUE";
String targetQueueName = "TARGET_QUEUE";

Session session = createSession(connection);
produceMessages(session, sourceQueueName, messageCount, false);

Queue sourceQueue = server.locateQueue(sourceQueueName);

Assert.assertEquals(messageCount, sourceQueue.getMessageCount());

Transfer transfer = new Transfer()
.setSourceUser("admin")
.setSourcePassword("admin")
.setSourceQueue(sourceQueueName)
.setTargetUser("admin")
.setTargetPassword("admin")
.setTargetQueue(targetQueueName);

if (maxCount > 0) {
transfer.setMaxCount(maxCount);

Assert.assertEquals(maxCount, transfer.execute(new TestActionContext()));

Queue targetQueue = server.locateQueue(targetQueueName);
Assert.assertEquals(messageCount - maxCount, sourceQueue.getMessageCount());
Assert.assertEquals(maxCount, targetQueue.getMessageCount());
} else {
Assert.assertEquals(messageCount, transfer.execute(new TestActionContext()));

Queue targetQueue = server.locateQueue(targetQueueName);
Assert.assertEquals(0, sourceQueue.getMessageCount());
Assert.assertEquals(messageCount, targetQueue.getMessageCount());
}
}
}
Expand Up @@ -523,6 +523,13 @@ int moveMessages(@Parameter(name = "flushLimit", desc = "Limit to flush transact
@Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName,
@Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates) throws Exception;

@Operation(desc = "Move the messages corresponding to the given filter (and returns the number of moved messages)", impact = MBeanOperationInfo.ACTION)
int moveMessages(@Parameter(name = "flushLimit", desc = "Limit to flush transactions during the operation to avoid OutOfMemory") int flushLimit,
@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter,
@Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName,
@Parameter(name = "rejectDuplicates", desc = "Reject messages identified as duplicate by the duplicate message") boolean rejectDuplicates,
@Parameter(name = "maxCount", desc = "The maximum number of messages to move.") int maxCount) throws Exception;

/**
* Sends the message corresponding to the specified message ID to this queue's dead letter address.
*
Expand Down

0 comments on commit 8002182

Please sign in to comment.