Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
This reverts commit dbb3a90.

The org.apache.activemq.artemis.core.server.Queue#getRate method is for
slow-consumer detection and is designed for internal use only.

Furthermore, it's too opaque to be trusted by a remote user as it only
returns the number of message added to the queue since *the last time
it was called*. The problem here is that the user calling it doesn't
know when it was invoked last. Therefore, they could be getting the
rate of messages added for the last 5 minutes or the last 5
milliseconds. This can lead to inconsistent and misleading results.

There are three main ways for users to track rates of message
production and consumption:

 1. Use a metrics plugin. This is the most feature-rich and flexible
way to track broker metrics, although it requires tools (e.g.
Prometheus) to store the metrics and display them (e.g. Grafana).

 2. Invoke the getMessageCount() and getMessagesAdded() management
methods and store the returned values along with the time they were
retrieved. A time-series database is a great tool for this job. This is
exactly what tools like Prometheus do. That data can then be used to
create informative graphs, etc. using tools like Grafana. Of course, one
can skip all the tools and just do some simple math to calculate rates
based on the last time the counts were retrieved.

 3. Use the broker's message counters. Message counters are the broker's
simple way of providing historical information about the queue. They
provide similar results to the previous solutions, but with less
flexibility since they only track data while the broker is up and
there's not really any good options for graphing.
  • Loading branch information
jbertram authored and clebertsuconic committed Sep 23, 2020
1 parent ddef389 commit 246bf08
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2299,14 +2299,6 @@ static void createCoreSession(Object source, Object... args) {
@Message(id = 601267, value = "User {0} is creating a core session on target resource {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void createCoreSession(String user, Object source, Object... args);

static void getProducedRate(Object source) {
LOGGER.getProducedRate(getCaller(), source);
}

@LogMessage(level = Logger.Level.INFO)
@Message(id = 601268, value = "User {0} is getting produced message rate on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
void getProducedRate(String user, Object source, Object... args);

static void getAcknowledgeAttempts(Object source) {
LOGGER.getMessagesAcknowledged(getCaller(), source);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,12 +105,6 @@ public interface QueueControl {
@Attribute(desc = MESSAGE_COUNT_DESCRIPTION)
long getMessageCount();

/**
* Returns the rate of writing messages to the queue.
*/
@Attribute(desc = "rate of writing messages to the queue currently (based on default window function)")
float getProducedRate();

/**
* Returns the persistent size of all messages currently in this queue. The persistent size of a message
* is the amount of space the message would take up on disk which is used to track how much data there
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,17 +270,6 @@ public long getMessageCount() {
}
}

@Override
public float getProducedRate() {
if (AuditLogger.isEnabled()) {
AuditLogger.getProducedRate(queue);
}
checkStarted();

// This is an attribute, no need to blockOnIO
return queue.getRate();
}

@Override
public long getPersistentSize() {
if (AuditLogger.isEnabled()) {
Expand Down Expand Up @@ -920,7 +909,7 @@ public long countMessages(final String filterStr) throws Exception {
AuditLogger.countMessages(queue, filterStr);
}

Long value = internalCountMessages(filterStr, null).get(null);
Long value = intenalCountMessages(filterStr, null).get(null);
return value == null ? 0 : value;
}

Expand All @@ -930,10 +919,10 @@ public String countMessages(final String filterStr, final String groupByProperty
AuditLogger.countMessages(queue, filterStr, groupByProperty);
}

return JsonUtil.toJsonObject(internalCountMessages(filterStr, groupByProperty)).toString();
return JsonUtil.toJsonObject(intenalCountMessages(filterStr, groupByProperty)).toString();
}

private Map<String, Long> internalCountMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
private Map<String, Long> intenalCountMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
checkStarted();

clearIO();
Expand Down Expand Up @@ -968,7 +957,7 @@ public long countDeliveringMessages(final String filterStr) throws Exception {
AuditLogger.countDeliveringMessages(queue, filterStr);
}

Long value = internalCountDeliveryMessages(filterStr, null).get(null);
Long value = intenalCountDeliveryMessages(filterStr, null).get(null);
return value == null ? 0 : value;
}

Expand All @@ -978,10 +967,10 @@ public String countDeliveringMessages(final String filterStr, final String group
AuditLogger.countDeliveringMessages(queue, filterStr, groupByProperty);
}

return JsonUtil.toJsonObject(internalCountDeliveryMessages(filterStr, groupByProperty)).toString();
return JsonUtil.toJsonObject(intenalCountDeliveryMessages(filterStr, groupByProperty)).toString();
}

private Map<String, Long> internalCountDeliveryMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
private Map<String, Long> intenalCountDeliveryMessages(final String filterStr, final String groupByPropertyStr) throws Exception {
checkStarted();

clearIO();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,11 +259,6 @@ public long getMessageCount() {
return (Long) proxy.retrieveAttributeValue("messageCount", Long.class);
}

@Override
public float getProducedRate() {
return (Long) proxy.retrieveAttributeValue("producedRate", Long.class);
}

@Override
public long getMessagesAdded() {
return (Integer) proxy.retrieveAttributeValue("messagesAdded", Integer.class);
Expand Down

0 comments on commit 246bf08

Please sign in to comment.