Skip to content

Commit

Permalink
This closes #4120
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram committed Aug 25, 2022
2 parents 38ee7ba + 63966dc commit 73e57e5
Show file tree
Hide file tree
Showing 12 changed files with 211 additions and 7 deletions.
Expand Up @@ -2976,4 +2976,12 @@ static void removeConnector(Object source, Object... args) {
@LogMessage(level = Logger.Level.INFO)
@Message(id = 601763, value = "User {0} is remove a connector on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void removeConnector(String user, Object source, Object... args);

static void deliverScheduledMessage(Object source, Object... args) {
BASE_LOGGER.deliverScheduledMessage(getCaller(), source, arrayToString(args));
}

@LogMessage(level = Logger.Level.INFO)
@Message(id = 601764, value = "User {0} is calling deliverScheduledMessage on queue: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void deliverScheduledMessage(String user, Object source, Object... args);
}
Expand Up @@ -770,4 +770,16 @@ CompositeData[] browse(@Parameter(name = "page", desc = "Current page") int page
*/
@Attribute(desc = "return how many messages are stuck in prepared transactions")
int getPreparedTransactionMessageCount();

/**
* Deliver the scheduled messages which match the filter
*/
@Operation(desc = "Immediately deliver the scheduled messages which match the filter", impact = MBeanOperationInfo.ACTION)
void deliverScheduledMessages(@Parameter(name = "filter", desc = "filter to match messages to deliver") String filter) throws Exception;

/**
* Deliver the scheduled message with the specified message ID
*/
@Operation(desc = "Immediately deliver the scheduled message with the specified message ID", impact = MBeanOperationInfo.ACTION)
void deliverScheduledMessage(@Parameter(name = "messageID", desc = "ID of the message to deliver") long messageId) throws Exception;
}
Expand Up @@ -1962,8 +1962,35 @@ public int getPreparedTransactionMessageCount() {
}
}

@Override
public void deliverScheduledMessages(String filter) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.deliverScheduledMessage(queue, filter);
}
checkStarted();

clearIO();
try {
queue.deliverScheduledMessages(filter);
} finally {
blockOnIO();
}
}

@Override
public void deliverScheduledMessage(long messageId) throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.deliverScheduledMessage(queue, messageId);
}
checkStarted();

clearIO();
try {
queue.deliverScheduledMessage(messageId);
} finally {
blockOnIO();
}
}

private void checkStarted() {
if (!server.getPostOffice().isStarted()) {
Expand Down
Expand Up @@ -504,6 +504,16 @@ default void forEach(java.util.function.Consumer<MessageReference> consumer) {
*/
void deliverScheduledMessages() throws ActiveMQException;

/**
* cancels scheduled messages which match the filter and send them to the head of the queue.
*/
void deliverScheduledMessages(String filter) throws ActiveMQException;

/**
* cancels scheduled message with the corresponding message ID and sends it to the head of the queue.
*/
void deliverScheduledMessage(long messageId) throws ActiveMQException;

void postAcknowledge(MessageReference ref, AckReason reason);

void postAcknowledge(MessageReference ref, AckReason reason, boolean delivering);
Expand Down
Expand Up @@ -17,9 +17,9 @@
package org.apache.activemq.artemis.core.server;

import java.util.List;
import java.util.function.Predicate;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.transaction.Transaction;

public interface ScheduledDeliveryHandler {
Expand All @@ -36,7 +36,7 @@ public interface ScheduledDeliveryHandler {

List<MessageReference> getScheduledReferences();

List<MessageReference> cancel(Filter filter) throws ActiveMQException;
List<MessageReference> cancel(Predicate<MessageReference> predicate) throws ActiveMQException;

MessageReference removeReferenceWithID(long id) throws Exception;

Expand Down
Expand Up @@ -2035,7 +2035,21 @@ public void incrementMesssagesAdded() {

@Override
public void deliverScheduledMessages() throws ActiveMQException {
List<MessageReference> scheduledMessages = scheduledDeliveryHandler.cancel(null);
internalDeliverScheduleMessages(scheduledDeliveryHandler.cancel(ref -> true));
}

@Override
public void deliverScheduledMessages(String filterString) throws ActiveMQException {
final Filter filter = filterString == null || filterString.length() == 0 ? null : FilterImpl.createFilter(filterString);
internalDeliverScheduleMessages(scheduledDeliveryHandler.cancel(ref -> filter == null ? true : filter.match(ref.getMessage())));
}

@Override
public void deliverScheduledMessage(long messageId) throws ActiveMQException {
internalDeliverScheduleMessages(scheduledDeliveryHandler.cancel(ref -> ref.getMessageID() == messageId));
}

private void internalDeliverScheduleMessages(List<MessageReference> scheduledMessages) {
if (scheduledMessages != null && scheduledMessages.size() > 0) {
for (MessageReference ref : scheduledMessages) {
ref.getMessage().setScheduledDeliveryTime(ref.getScheduledDeliveryTime());
Expand Down Expand Up @@ -2170,7 +2184,7 @@ private int iterQueue(final int flushLimit,
txCount = 0;
}

List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(filter1);
List<MessageReference> cancelled = scheduledDeliveryHandler.cancel(ref -> filter1.match(ref.getMessage()));
for (MessageReference messageReference : cancelled) {
messageAction.actMessage(tx, messageReference);
count++;
Expand Down
Expand Up @@ -27,9 +27,9 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ScheduledDeliveryHandler;
Expand Down Expand Up @@ -117,15 +117,15 @@ public List<MessageReference> getScheduledReferences() {
}

@Override
public List<MessageReference> cancel(final Filter filter) throws ActiveMQException {
public List<MessageReference> cancel(Predicate<MessageReference> predicate) throws ActiveMQException {
List<MessageReference> refs = new ArrayList<>();

synchronized (scheduledReferences) {
Iterator<RefScheduled> iter = scheduledReferences.iterator();

while (iter.hasNext()) {
MessageReference ref = iter.next().getRef();
if (filter == null || filter.match(ref.getMessage())) {
if (predicate.test(ref)) {
iter.remove();
refs.add(ref);
metrics.decrementMetrics(ref);
Expand Down
Expand Up @@ -895,6 +895,16 @@ public void deliverScheduledMessages() throws ActiveMQException {

}

@Override
public void deliverScheduledMessages(String filter) throws ActiveMQException {

}

@Override
public void deliverScheduledMessage(long messageId) throws ActiveMQException {

}

@Override
public void postAcknowledge(MessageReference ref, AckReason reason) {

Expand Down
Expand Up @@ -1623,6 +1623,16 @@ public void deliverScheduledMessages() {

}

@Override
public void deliverScheduledMessages(String filter) throws ActiveMQException {

}

@Override
public void deliverScheduledMessage(long messageId) throws ActiveMQException {

}

@Override
public void route(Message message, RoutingContext context) throws Exception {

Expand Down
Expand Up @@ -58,6 +58,24 @@ public int getPreparedTransactionMessageCount() {
}
}

@Override
public void deliverScheduledMessages(String filter) throws Exception {
try {
proxy.invokeOperation("deliverScheduledMessages", filter);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}

@Override
public void deliverScheduledMessage(long messageId) throws Exception {
try {
proxy.invokeOperation("deliverScheduledMessage", messageId);
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}

@Override
public void resetAllGroups() {
try {
Expand Down
Expand Up @@ -30,12 +30,15 @@
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -505,6 +508,87 @@ public void testManyMessagesSameTime() throws Exception {
session.close();
}

@Test
public void testManagementDeliveryById() throws Exception {

ClientSessionFactory sessionFactory = createSessionFactory(locator);
ClientSession session = sessionFactory.createSession(false, false, false);
session.createQueue(new QueueConfiguration(atestq));
ClientProducer producer = session.createProducer(atestq);
long time = System.currentTimeMillis();
time += 999_999_999;

ClientMessage messageToSend = session.createMessage(true);
messageToSend.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
producer.send(messageToSend);

session.commit();

session.start();
ClientConsumer consumer = session.createConsumer(atestq);
ClientMessage message = consumer.receive(500);
assertNull(message);

QueueControl queueControl = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + atestq);
queueControl.deliverScheduledMessage((long) queueControl.listScheduledMessages()[0].get("messageID"));

message = consumer.receive(500);
assertNotNull(message);
message.acknowledge();

session.commit();

Assert.assertNull(consumer.receiveImmediate());

session.close();
}

@Test
public void testManagementDeliveryByFilter() throws Exception {
final String propertyValue = RandomUtil.randomString();
final String propertyName = "X" + RandomUtil.randomString().replace("-","");
ClientSessionFactory sessionFactory = createSessionFactory(locator);
ClientSession session = sessionFactory.createSession(false, false, false);
session.createQueue(new QueueConfiguration(atestq));
ClientProducer producer = session.createProducer(atestq);
long time = System.currentTimeMillis();
time += 999_999_999;

ClientMessage messageToSend = session.createMessage(true);
messageToSend.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
messageToSend.putStringProperty(propertyName, propertyValue);
producer.send(messageToSend);

messageToSend = session.createMessage(true);
messageToSend.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
messageToSend.putStringProperty(propertyName, propertyValue);
producer.send(messageToSend);

session.commit();

session.start();
ClientConsumer consumer = session.createConsumer(atestq);
ClientMessage message = consumer.receive(500);
assertNull(message);

QueueControl queueControl = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + atestq);
queueControl.deliverScheduledMessages(propertyName + " = '" + propertyValue + "'");

message = consumer.receive(500);
assertNotNull(message);
message.acknowledge();

message = consumer.receive(500);
assertNotNull(message);
message.acknowledge();

session.commit();

Assert.assertNull(consumer.receiveImmediate());

session.close();
}

public void testScheduledAndNormalMessagesDeliveredCorrectly(final boolean recover) throws Exception {

ClientSessionFactory sessionFactory = createSessionFactory(locator);
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.Executor;

import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.RoutingType;
Expand Down Expand Up @@ -678,6 +679,16 @@ public void deliverScheduledMessages() {

}

@Override
public void deliverScheduledMessages(String filter) throws ActiveMQException {

}

@Override
public void deliverScheduledMessage(long messageId) throws ActiveMQException {

}

@Override
public SimpleString getName() {
return name;
Expand Down

0 comments on commit 73e57e5

Please sign in to comment.