Skip to content

Commit

Permalink
ARTEMIS-2003 - Add bridge metrics
Browse files Browse the repository at this point in the history
This commit adds support for tracking metrics for bridges for both
normal bridges and bridges that are part of a cluster. The two
statistics added in this commit are messages pending acknowledgement
and messages acknowledged but more can be added later.
  • Loading branch information
cshannon authored and clebertsuconic committed Aug 3, 2018
1 parent f980c34 commit e629ac4
Show file tree
Hide file tree
Showing 18 changed files with 445 additions and 10 deletions.
Expand Up @@ -106,4 +106,33 @@ public interface BridgeControl extends ActiveMQComponentControl {
*/
@Attribute(desc = "whether this bridge is using high availability")
boolean isHA();

/**
* The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but
* is waiting acknowledgement from the other broker. This is a cumulative total and the number of outstanding
* pending messages can be computed by subtracting messagesAcknowledged from messagesPendingAcknowledgement.
*
*/
@Attribute(desc = "The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the remote broker.")
long getMessagesPendingAcknowledgement();

/**
* The messagesAcknowledged counter is the number of messages actually received by the remote broker.
* This is a cumulative total and the number of outstanding pending messages can be computed by subtracting
* messagesAcknowledged from messagesPendingAcknowledgement.
*
*/
@Attribute(desc = "The messagesAcknowledged counter is the number of messages actually received by the remote broker.")
long getMessagesAcknowledged();

/**
* The bridge metrics for this bridge
*
* The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the other broker.
* The messagesAcknowledged counter is the number of messages actually received by the remote broker.
*
*/
@Attribute(desc = "The metrics for this bridge. The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the remote broker. The messagesAcknowledged counter is the number of messages actually received by the remote broker.")
Map<String, Object> getMetrics();

}
Expand Up @@ -96,4 +96,52 @@ public interface ClusterConnectionControl extends ActiveMQComponentControl {
*/
@Attribute(desc = "map of the nodes connected to this cluster connection (keys are node IDs, values are the addresses used to connect to the nodes)")
Map<String, String> getNodes() throws Exception;

/**
* The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has
* forwarded a message and is waiting acknowledgement from the other broker. (aggregate over all bridges)
*
* This is a cumulative total and the number of outstanding pending messages for the cluster connection
* can be computed by subtracting messagesAcknowledged from messagesPendingAcknowledgement.
*
*/
@Attribute(desc = "The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has forwarded a message and is waiting acknowledgement from the other broker. (aggregate over all bridges)")
long getMessagesPendingAcknowledgement();

/**
* The messagesAcknowledged counter is the number of messages actually received by a remote broker for all
* bridges in this cluster connection
*
* This is a cumulative total and the number of outstanding pending messages for the cluster connection
* can be computed by subtracting messagesAcknowledged from messagesPendingAcknowledgement.
*
*/
@Attribute(desc = "The messagesAcknowledged counter is the number of messages actually received by a remote broker for all bridges in this cluster connection")
long getMessagesAcknowledged();

/**
* The current metrics for this cluster connection (aggregate over all bridges to other nodes)
*
* The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has
* forwarded a message and is waiting acknowledgement from the other broker.
*
* The messagesAcknowledged counter is the number of messages actually received by a remote broker for all
* bridges in this cluster connection
*
* @return
*/
@Attribute(desc = "The metrics for this cluster connection. The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has forwarded a message and is waiting acknowledgement from the other broker. The messagesAcknowledged counter is the number of messages actually received by a remote broker for all bridges in this cluster connection")
Map<String, Object> getMetrics();

/**
* The bridge metrics for the given node in the cluster connection
*
* The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the other broker.
* The messagesAcknowledged counter is the number of messages actually received by the remote broker for this bridge.
*
* @throws Exception
*/
@Attribute(desc = "The metrics for the bridge by nodeId. The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the other broker. The messagesAcknowledged counter is the number of messages actually received by the remote broker for this bridge.")
Map<String, Object> getBridgeMetrics(String nodeId) throws Exception;

}
Expand Up @@ -16,11 +16,12 @@
*/
package org.apache.activemq.artemis.core.management.impl;

import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import java.util.List;
import java.util.Map;

import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;

import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.management.BridgeControl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
Expand Down Expand Up @@ -228,6 +229,36 @@ protected MBeanAttributeInfo[] fillMBeanAttributeInfo() {
return MBeanInfoHelper.getMBeanAttributesInfo(BridgeControl.class);
}

@Override
public long getMessagesPendingAcknowledgement() {
clearIO();
try {
return bridge.getMetrics().getMessagesPendingAcknowledgement();
} finally {
blockOnIO();
}
}

@Override
public long getMessagesAcknowledged() {
clearIO();
try {
return bridge.getMetrics().getMessagesAcknowledged();
} finally {
blockOnIO();
}
}

@Override
public Map<String, Object> getMetrics() {
clearIO();
try {
return bridge.getMetrics().convertToMap();
} finally {
blockOnIO();
}
}

// Public --------------------------------------------------------

// Package protected ---------------------------------------------
Expand Down
Expand Up @@ -16,16 +16,18 @@
*/
package org.apache.activemq.artemis.core.management.impl;

import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import java.util.List;
import java.util.Map;

import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;

import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;

public class ClusterConnectionControlImpl extends AbstractControl implements ClusterConnectionControl {

Expand Down Expand Up @@ -223,6 +225,48 @@ protected MBeanAttributeInfo[] fillMBeanAttributeInfo() {
return MBeanInfoHelper.getMBeanAttributesInfo(ClusterConnectionControl.class);
}

@Override
public long getMessagesPendingAcknowledgement() {
clearIO();
try {
return clusterConnection.getMetrics().getMessagesPendingAcknowledgement();
} finally {
blockOnIO();
}
}

@Override
public long getMessagesAcknowledged() {
clearIO();
try {
return clusterConnection.getMetrics().getMessagesAcknowledged();
} finally {
blockOnIO();
}
}

@Override
public Map<String, Object> getMetrics() {
clearIO();
try {
return clusterConnection.getMetrics().convertToMap();
} finally {
blockOnIO();
}
}

@Override
public Map<String, Object> getBridgeMetrics(String nodeId) {
clearIO();
try {
final BridgeMetrics bridgeMetrics = clusterConnection.getBridgeMetrics(nodeId);
return bridgeMetrics != null ? bridgeMetrics.convertToMap() : null;
} finally {
blockOnIO();
}

}

// Public --------------------------------------------------------

// Package protected ---------------------------------------------
Expand Down
Expand Up @@ -20,6 +20,7 @@
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.core.server.management.NotificationService;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;

Expand Down Expand Up @@ -52,4 +53,6 @@ public interface Bridge extends Consumer, ActiveMQComponent {
void disconnect();

boolean isConnected();

BridgeMetrics getMetrics();
}
Expand Up @@ -25,6 +25,8 @@
import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;

public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyListener {

Expand Down Expand Up @@ -80,4 +82,18 @@ void nodeAnnounced(long eventUID,

long getCallTimeout();

/**
* The metric for this cluster connection
*
* @return
*/
ClusterConnectionMetrics getMetrics();

/**
* Returns the BridgeMetrics for the bridge to the given node if exists
*
* @param nodeId
* @return
*/
BridgeMetrics getBridgeMetrics(String nodeId);
}
Expand Up @@ -55,10 +55,10 @@
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationService;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
Expand Down Expand Up @@ -159,6 +159,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled

private ActiveMQServer server;

private final BridgeMetrics metrics = new BridgeMetrics();

public BridgeImpl(final ServerLocatorInternal serverLocator,
final int initialConnectAttempts,
final int reconnectAttempts,
Expand Down Expand Up @@ -518,6 +520,7 @@ public void sendAcknowledged(final Message message) {
}
ref.getQueue().acknowledge(ref);
pendingAcks.countDown();
metrics.incrementMessagesAcknowledged();
} else {
if (logger.isTraceEnabled()) {
logger.trace("BridgeImpl::sendAcknowledged bridge " + this + " could not find reference for message " + message);
Expand Down Expand Up @@ -611,13 +614,21 @@ public HandleStatus handle(final MessageReference ref) throws Exception {
pendingAcks.countUp();

try {
final HandleStatus status;
if (message.isLargeMessage()) {
deliveringLargeMessage = true;
deliverLargeMessage(dest, ref, (LargeServerMessage) message);
return HandleStatus.HANDLED;
status = HandleStatus.HANDLED;
} else {
return deliverStandardMessage(dest, ref, message);
status = deliverStandardMessage(dest, ref, message);
}

//Only increment messages pending acknowledgement if handled by bridge
if (status == HandleStatus.HANDLED) {
metrics.incrementMessagesPendingAcknowledgement();
}

return status;
} catch (Exception e) {
// If an exception happened, we must count down immediately
pendingAcks.countDown();
Expand Down Expand Up @@ -770,6 +781,11 @@ public TopologyMember getTargetNodeFromTopology() {
return this.targetNode;
}

@Override
public BridgeMetrics getMetrics() {
return this.metrics;
}

@Override
public String toString() {
return this.getClass().getSimpleName() + "@" +
Expand Down
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.cluster.impl;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

public class BridgeMetrics {

public static final String MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY = "messagesPendingAcknowledgement";
public static final String MESSAGES_ACKNOWLEDGED_KEY = "messagesAcknowledged";

private static final AtomicLongFieldUpdater<BridgeMetrics> MESSAGES_PENDING_ACKNOWLEDGEMENT_UPDATER =
AtomicLongFieldUpdater.newUpdater(BridgeMetrics.class, MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY);

private static final AtomicLongFieldUpdater<BridgeMetrics> MESSAGES_ACKNOWLEDGED_UPDATER =
AtomicLongFieldUpdater.newUpdater(BridgeMetrics.class, MESSAGES_ACKNOWLEDGED_KEY);

private volatile long messagesPendingAcknowledgement;
private volatile long messagesAcknowledged;

public void incrementMessagesPendingAcknowledgement() {
MESSAGES_PENDING_ACKNOWLEDGEMENT_UPDATER.incrementAndGet(this);
}

public void incrementMessagesAcknowledged() {
MESSAGES_ACKNOWLEDGED_UPDATER.incrementAndGet(this);
}

/**
* @return the messagesPendingAcknowledgement
*/
public long getMessagesPendingAcknowledgement() {
return messagesPendingAcknowledgement;
}

/**
* @return the messagesAcknowledged
*/
public long getMessagesAcknowledged() {
return messagesAcknowledged;
}

/**
* @return New map containing the Bridge metrics
*/
public Map<String, Object> convertToMap() {
final Map<String, Object> metrics = new HashMap<>();
metrics.put(MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY, messagesPendingAcknowledgement);
metrics.put(MESSAGES_ACKNOWLEDGED_KEY, messagesAcknowledged);

return metrics;
}
}

0 comments on commit e629ac4

Please sign in to comment.