Skip to content

Commit

Permalink
StatisticsBrokerPlugin: Add feat: request destination firstMessageTim…
Browse files Browse the repository at this point in the history
…estamp

Adding a feature (STATS_FIRST_MESSAGE_TIMESTAMP) to the
StatisticsBrokerPlugin's destination-statistics for getting the
timestamp of the first message in the destination(s) being requested: If
you on the query-message set the property
StatisticsBroker.STATS_FIRST_MESSAGE_TIMESTAMP to anything (e.g. boolean
true), a long value "firstMessageTimestamp" will be added to the
statistics reply message(s). Since the reply message has JMSTimestamp
set, which is the broker's now-timestamp, you may also on the query side
calculate the age of the first message in milliseconds. The key name was
chosen since that is the name of the corresponding feature in Artemis.

This extension of the existing feature is implemented to be as
non-intrusive as possible, adding very little runtime cost if not
requested. It also seems like the runtime cost for enabling this
feature, thus finding and adding the firstMessageTimestamp, is small.

While at it, also slightly improving an existing feature
(STATS_DENOTE_END_LIST) where a reply to a destination query can be
"null terminated": After sending the relevant replies, the
StatisticsBroker also sends an empty message. This feature is relevant
if the query is a wildcard query, thus returning multiple messages: The
empty message denotes the end of the replies. However, to activate this
feature, a somewhat complicated query destination had to be constructed.
Adopting the solution for the other StatisticsBroker feature where you
may reset the broker statistics by adding a property to the query
message, this null-termination feature now /also/ checks for the
presence of this query modifier STATS_DENOTE_END_LIST as a property.
(This property based solution was thus also adopted for the present
'firstMessageTimestamp' solution, as it was found much more intuitive).

Added tests for both the STATS_FIRST_MESSAGE_TIMESTAMP query modifier,
and the improved STATS_DENOTE_END_LIST property-based query modifier.

Had to make the Topic.doBrowse(List browseList, int max) public - the
corresponding method for Queue was already public.

Made the evaluation of whether this is a StatisticsBroker-relevant
message a microscopic bit more performant (exiting faster if not
relevant): To the initial test of whether the message is relevant, which
only checked for replyTo being set, a check for 'destination.
startsWith("ActiveMQ.Statistics")' was added. Only if so, the rest of
the evaluations kick in. Also using 'string.startsWith(..)' instead of
the verbose 'string.regionMatches(..)'.

Removed an unused import on PartitionBrokerTest.java, as IntelliJ
complained about not finding it.
  • Loading branch information
stolsvik committed Jan 25, 2022
1 parent d544153 commit 9167a79
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 13 deletions.
Expand Up @@ -654,7 +654,7 @@ public Message[] browse() {
return result.toArray(new Message[result.size()]);
}

private void doBrowse(final List<Message> browseList, final int max) {
public void doBrowse(final List<Message> browseList, final int max) {
try {
if (topicStore != null) {
final List<Message> toExpire = new ArrayList<Message>();
Expand Down
Expand Up @@ -18,6 +18,8 @@

import java.io.File;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import javax.jms.JMSException;
Expand All @@ -33,7 +35,9 @@
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.Message;
Expand All @@ -54,11 +58,17 @@
*/
public class StatisticsBroker extends BrokerFilter {
private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class);
static final String STATS_PREFIX = "ActiveMQ.Statistics";

static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker";
static final String STATS_BROKER_RESET_HEADER = "ActiveMQ.Statistics.Broker.Reset";
static final String STATS_SUBSCRIPTION_PREFIX = "ActiveMQ.Statistics.Subscription";
static final String STATS_DENOTE_END_LIST = STATS_DESTINATION_PREFIX + ".List.End.With.Null";

// Query-message properties controlling features of Destination-query replies:
static final String STATS_DENOTE_END_LIST = "ActiveMQ.Statistics.Destination.List.End.With.Null";
static final String STATS_FIRST_MESSAGE_TIMESTAMP = "ActiveMQ.Statistics.Destination.Include.First.Message.Timestamp";

private static final IdGenerator ID_GENERATOR = new IdGenerator();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
protected final ProducerId advisoryProducerId = new ProducerId();
Expand All @@ -85,26 +95,27 @@ public StatisticsBroker(Broker next) {
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
ActiveMQDestination msgDest = messageSend.getDestination();
ActiveMQDestination replyTo = messageSend.getReplyTo();
if (replyTo != null) {
if ((replyTo != null) && (msgDest.getPhysicalName().startsWith(STATS_PREFIX))) {
String physicalName = msgDest.getPhysicalName();
boolean destStats = physicalName.regionMatches(true, 0, STATS_DESTINATION_PREFIX, 0,
STATS_DESTINATION_PREFIX.length());
boolean brokerStats = physicalName.regionMatches(true, 0, STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX
.length());
boolean subStats = physicalName.regionMatches(true, 0, STATS_SUBSCRIPTION_PREFIX, 0, STATS_SUBSCRIPTION_PREFIX
.length());
boolean destStats = physicalName.startsWith(STATS_DESTINATION_PREFIX);
boolean brokerStats = physicalName.startsWith(STATS_BROKER_PREFIX);
boolean subStats = physicalName.startsWith(STATS_SUBSCRIPTION_PREFIX);
BrokerService brokerService = getBrokerService();
RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
if (destStats) {
String destinationName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length());
String destinationName = physicalName.substring(STATS_DESTINATION_PREFIX.length());
if (destinationName.startsWith(".")) {
destinationName = destinationName.substring(1);
}
String destinationQuery = destinationName.replace(STATS_DENOTE_END_LIST,"");
boolean endListMessage = !destinationName.equals(destinationQuery);
boolean endListMessage = !destinationName.equals(destinationQuery)
|| messageSend.getProperties().containsKey(STATS_DENOTE_END_LIST);
ActiveMQDestination queryDestination = ActiveMQDestination.createDestination(destinationQuery,msgDest.getDestinationType());
Set<Destination> destinations = getDestinations(queryDestination);

boolean includeFirstMessageTimestamp = messageSend.getProperties().containsKey(STATS_FIRST_MESSAGE_TIMESTAMP);
List<Message> tempFirstMessage = includeFirstMessageTimestamp ? new ArrayList<>(1) : null;

for (Destination dest : destinations) {
DestinationStatistics stats = dest.getDestinationStatistics();
if (stats != null) {
Expand All @@ -129,6 +140,21 @@ public void send(ProducerBrokerExchange producerExchange, Message messageSend) t
statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
statsMessage.setLong("producerCount", stats.getProducers().getCount());
if (includeFirstMessageTimestamp) {
if (dest instanceof Queue) {
((Queue) dest).doBrowse(tempFirstMessage, 1);
}
else if (dest instanceof Topic) {
((Topic) dest).doBrowse(tempFirstMessage, 1);
}
if (!tempFirstMessage.isEmpty()) {
Message message = tempFirstMessage.get(0);
// NOTICE: Client-side, you may get the broker "now" Timestamp by msg.getJMSTimestamp()
// This allows for calculating age.
statsMessage.setLong("firstMessageTimestamp", message.getBrokerInTime());
tempFirstMessage.clear();
}
}
statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
}
Expand Down
Expand Up @@ -17,7 +17,6 @@
package org.apache.activemq.partition;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
Expand Down
Expand Up @@ -17,7 +17,6 @@
package org.apache.activemq.plugin;

import java.net.URI;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MapMessage;
Expand Down Expand Up @@ -153,6 +152,76 @@ public void testDestinationStatsWithDot() throws Exception{
*/
}

public void testDestinationStatsWithNullTermination() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue replyTo = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(replyTo);
Queue testQueue = session.createQueue("Test.Queue");
MessageProducer producer = session.createProducer(null);
Queue query = session.createQueue(StatisticsBroker.STATS_DESTINATION_PREFIX + "." + testQueue.getQueueName());
Message msg = session.createMessage();
// Instruct to terminate query reply with a null-message
msg.setBooleanProperty(StatisticsBroker.STATS_DENOTE_END_LIST, true);

producer.send(testQueue, msg);

msg.setJMSReplyTo(replyTo);
producer.send(query, msg);
MapMessage reply = (MapMessage) consumer.receive(10 * 1000);
assertNotNull(reply);
assertTrue(reply.getMapNames().hasMoreElements());
assertEquals(1, reply.getLong("size"));
assertTrue(reply.getJMSTimestamp() > 0);
assertEquals(Message.DEFAULT_PRIORITY, reply.getJMSPriority());

/*
for (Enumeration e = reply.getMapNames(); e.hasMoreElements();) {
String name = e.nextElement().toString();
System.err.println(name+"="+reply.getObject(name));
}
*/

// Assert that we got a null-termination
MapMessage nullReply = (MapMessage) consumer.receive(10 * 1000);
assertNotNull(nullReply);
// No props in null-message
assertFalse(nullReply.getMapNames().hasMoreElements());
assertTrue(nullReply.getJMSTimestamp() > 0);
assertEquals(Message.DEFAULT_PRIORITY, nullReply.getJMSPriority());
}

public void testDestinationStatsWithFirstMessageTimestamp() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue replyTo = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(replyTo);
Queue testQueue = session.createQueue("Test.Queue");
MessageProducer producer = session.createProducer(null);
Queue query = session.createQueue(StatisticsBroker.STATS_DESTINATION_PREFIX + "." + testQueue.getQueueName());
Message msg = session.createMessage();
// Instruct to include timestamp of first message in the queue
msg.setBooleanProperty(StatisticsBroker.STATS_FIRST_MESSAGE_TIMESTAMP, true);

producer.send(testQueue, msg);

msg.setJMSReplyTo(replyTo);
producer.send(query, msg);
MapMessage reply = (MapMessage) consumer.receive(10 * 1000);
assertNotNull(reply);
assertTrue(reply.getMapNames().hasMoreElements());
assertEquals(1, reply.getLong("size"));
assertTrue(reply.getJMSTimestamp() > 0);
// Assert that we got the brokerInTime for the first message in queue as value of key "firstMessageTimestamp"
assertTrue(System.currentTimeMillis() >= reply.getLong("firstMessageTimestamp"));
assertEquals(Message.DEFAULT_PRIORITY, reply.getJMSPriority());

/*
for (Enumeration e = reply.getMapNames(); e.hasMoreElements();) {
String name = e.nextElement().toString();
System.err.println(name+"="+reply.getObject(name));
}
*/
}

@SuppressWarnings("unused")
public void testSubscriptionStats() throws Exception{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Expand Down

0 comments on commit 9167a79

Please sign in to comment.