Skip to content

Commit

Permalink
ARTEMIS-1961 track routed and unrouted messages sent to an address
Browse files Browse the repository at this point in the history
  • Loading branch information
jbertram authored and clebertsuconic committed Sep 25, 2018
1 parent a9916ad commit 9c62531
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 14 deletions.
Expand Up @@ -104,6 +104,18 @@ public interface AddressControl {
@Attribute(desc = "number of messages added to all the queues for this address")
long getMessageCount();

/**
* Returns the number of messages routed to one or more bindings
*/
@Attribute(desc = "number of messages routed to one or more bindings")
long getRoutedMessageCount();

/**
* Returns the number of messages not routed to any bindings
*/
@Attribute(desc = "number of messages not routed to any bindings")
long getUnRoutedMessageCount();


/**
* @param headers the message headers and properties to set. Can only
Expand Down
Expand Up @@ -267,6 +267,16 @@ public long getMessageCount() {
return getMessageCount(DurabilityType.ALL);
}

@Override
public long getRoutedMessageCount() {
return addressInfo.getRoutedMessageCount();
}

@Override
public long getUnRoutedMessageCount() {
return addressInfo.getUnRoutedMessageCount();
}


@Override
public String sendMessage(final Map<String, String> headers,
Expand Down
Expand Up @@ -842,11 +842,11 @@ public RoutingStatus route(final Message message,
throw new IllegalStateException("Message cannot be routed more than once");
}

setPagingStore(context.getAddress(message), message);
final SimpleString address = context.getAddress(message);

AtomicBoolean startedTX = new AtomicBoolean(false);
setPagingStore(address, message);

final SimpleString address = context.getAddress(message);
AtomicBoolean startedTX = new AtomicBoolean(false);

applyExpiryDelay(message, address);

Expand All @@ -856,23 +856,24 @@ public RoutingStatus route(final Message message,

message.cleanupInternalProperties();

Bindings bindings = addressManager.getBindingsForRoutingAddress(context.getAddress(message));
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);

AddressInfo addressInfo = addressManager.getAddressInfo(address);

// TODO auto-create queues here?
// first check for the auto-queue creation thing
if (bindings == null) {
// There is no queue with this address, we will check if it needs to be created
// if (queueCreator.create(address)) {
// TODO: this is not working!!!!
// reassign bindings if it was created
// bindings = addressManager.getBindingsForRoutingAddress(address);
// }
}
if (bindingMove != null) {
bindingMove.route(message, context);
if (addressInfo != null) {
addressInfo.incrementRoutedMessageCount();
}
} else if (bindings != null) {
bindings.route(message, context);
if (addressInfo != null) {
addressInfo.incrementRoutedMessageCount();
}
} else {
if (addressInfo != null) {
addressInfo.incrementUnRoutedMessageCount();
}
// this is a debug and not warn because this could be a regular scenario on publish-subscribe queues (or topic subscriptions on JMS)
if (logger.isDebugEnabled()) {
logger.debug("Couldn't find any bindings for address=" + address + " on message=" + message);
Expand Down
Expand Up @@ -22,6 +22,7 @@

import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

public class AddressInfo {

Expand All @@ -36,6 +37,14 @@ public class AddressInfo {

private boolean internal = false;

private volatile long routedMessageCount = 0;

private static final AtomicLongFieldUpdater<AddressInfo> routedMessageCountUpdater = AtomicLongFieldUpdater.newUpdater(AddressInfo.class, "routedMessageCount");

private volatile long unRoutedMessageCount = 0;

private static final AtomicLongFieldUpdater<AddressInfo> unRoutedMessageCountUpdater = AtomicLongFieldUpdater.newUpdater(AddressInfo.class, "unRoutedMessageCount");

public AddressInfo(SimpleString name) {
this(name, EnumSet.noneOf(RoutingType.class));
}
Expand Down Expand Up @@ -155,4 +164,20 @@ public AddressInfo getAddressAndRoutingType(Map<SimpleString, RoutingType> prefi
return this;
}

public long incrementRoutedMessageCount() {
return routedMessageCountUpdater.incrementAndGet(this);
}

public long incrementUnRoutedMessageCount() {
return unRoutedMessageCountUpdater.incrementAndGet(this);
}

public long getRoutedMessageCount() {
return routedMessageCountUpdater.get(this);
}

public long getUnRoutedMessageCount() {
return unRoutedMessageCountUpdater.get(this);
}

}
Expand Up @@ -340,6 +340,36 @@ public void testGetMessageCount() throws Exception {
assertTrue(Wait.waitFor(() -> addressControl.getMessageCount() == 2, 2000, 100));
}

@Test
public void testGetRoutedMessageCounts() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
session.createAddress(address, RoutingType.ANYCAST, false);

AddressControl addressControl = createManagementControl(address);
assertEquals(0, addressControl.getMessageCount());

ClientProducer producer = session.createProducer(address.toString());
producer.send(session.createMessage(false));
assertTrue(Wait.waitFor(() -> addressControl.getRoutedMessageCount() == 0, 2000, 100));
assertTrue(Wait.waitFor(() -> addressControl.getUnRoutedMessageCount() == 1, 2000, 100));

session.createQueue(address, RoutingType.ANYCAST, address);
producer.send(session.createMessage(false));
assertTrue(Wait.waitFor(() -> addressControl.getRoutedMessageCount() == 1, 2000, 100));
assertTrue(Wait.waitFor(() -> addressControl.getUnRoutedMessageCount() == 1, 2000, 100));

session.createQueue(address, RoutingType.ANYCAST, address.concat('2'));
producer.send(session.createMessage(false));
assertTrue(Wait.waitFor(() -> addressControl.getRoutedMessageCount() == 2, 2000, 100));
assertTrue(Wait.waitFor(() -> addressControl.getUnRoutedMessageCount() == 1, 2000, 100));

session.deleteQueue(address);
session.deleteQueue(address.concat('2'));
producer.send(session.createMessage(false));
assertTrue(Wait.waitFor(() -> addressControl.getRoutedMessageCount() == 2, 2000, 100));
assertTrue(Wait.waitFor(() -> addressControl.getUnRoutedMessageCount() == 2, 2000, 100));
}

@Test
public void testSendMessage() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
Expand Down
Expand Up @@ -103,6 +103,16 @@ public long getMessageCount() {
return (long) proxy.retrieveAttributeValue("messageCount");
}

@Override
public long getRoutedMessageCount() {
return (long) proxy.retrieveAttributeValue("routedMessageCount");
}

@Override
public long getUnRoutedMessageCount() {
return (long) proxy.retrieveAttributeValue("unRoutedMessageCount");
}

@Override
public String sendMessage(Map<String, String> headers,
int type,
Expand Down

0 comments on commit 9c62531

Please sign in to comment.