Skip to content

Commit

Permalink
ARTEMIS-4557 expose producer window size in clusterconnection JMX
Browse files Browse the repository at this point in the history
  • Loading branch information
andytaylor authored and jbertram committed Jan 9, 2024
1 parent 85b2f4b commit 43166e2
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,4 +144,7 @@ public interface ClusterConnectionControl extends ActiveMQComponentControl {
@Attribute(desc = "The metrics for the bridge by nodeId. The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the other broker. The messagesAcknowledged counter is the number of messages actually received by the remote broker for this bridge.")
Map<String, Object> getBridgeMetrics(String nodeId) throws Exception;

@Attribute(desc = "The Producer Window Size used by the Cluster Connection")
long getProducerWindowSize();

}
Original file line number Diff line number Diff line change
Expand Up @@ -321,5 +321,18 @@ public Map<String, Object> getBridgeMetrics(String nodeId) {

}

@Override
public long getProducerWindowSize() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getTopology(this.clusterConnection);
}
clearIO();
try {
return clusterConnection.getProducerWindowSize();
} finally {
blockOnIO();
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ void nodeAnnounced(long eventUID,

long getCallTimeout();

long getProducerWindowSize();

Bridge[] getBridges();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,11 @@ public long getCallTimeout() {
return callTimeout;
}

@Override
public long getProducerWindowSize() {
return producerWindowSize;
}

@Override
public Bridge[] getBridges() {
synchronized (recordsGuard) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public void testAttributes1() throws Exception {
Assert.assertEquals(clusterConnectionConfig1.isDuplicateDetection(), clusterConnectionControl.isDuplicateDetection());
Assert.assertEquals(clusterConnectionConfig1.getMessageLoadBalancingType().toString(), clusterConnectionControl.getMessageLoadBalancingType());
Assert.assertEquals(clusterConnectionConfig1.getMaxHops(), clusterConnectionControl.getMaxHops());
Assert.assertEquals(clusterConnectionConfig1.getProducerWindowSize(), clusterConnectionControl.getProducerWindowSize());
Assert.assertEquals(0L, clusterConnectionControl.getMessagesPendingAcknowledgement());
Assert.assertEquals(0L, clusterConnectionControl.getMessagesAcknowledged());
Map<String, Object> clusterMetrics = clusterConnectionControl.getMetrics();
Expand Down Expand Up @@ -112,6 +113,7 @@ public void testAttributes2() throws Exception {
Assert.assertEquals(clusterConnectionConfig2.isDuplicateDetection(), clusterConnectionControl.isDuplicateDetection());
Assert.assertEquals(clusterConnectionConfig2.getMessageLoadBalancingType().toString(), clusterConnectionControl.getMessageLoadBalancingType());
Assert.assertEquals(clusterConnectionConfig2.getMaxHops(), clusterConnectionControl.getMaxHops());
Assert.assertEquals(clusterConnectionConfig2.getProducerWindowSize(), clusterConnectionControl.getProducerWindowSize());

Object[] connectorPairs = clusterConnectionControl.getStaticConnectors();
Assert.assertEquals(0, connectorPairs.length);
Expand Down Expand Up @@ -199,9 +201,9 @@ public void setUp() throws Exception {

Configuration conf_1 = createBasicConfig().addAcceptorConfiguration(acceptorConfig).addQueueConfiguration(queueConfig);

clusterConnectionConfig1 = new ClusterConnectionConfiguration().setName(RandomUtil.randomString()).setAddress(queueConfig.getAddress().toString()).setConnectorName(connectorConfig.getName()).setRetryInterval(RandomUtil.randomPositiveLong()).setDuplicateDetection(RandomUtil.randomBoolean()).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).setMaxHops(RandomUtil.randomPositiveInt()).setConfirmationWindowSize(RandomUtil.randomPositiveInt()).setMessageLoadBalancingType(MessageLoadBalancingType.ON_DEMAND).setStaticConnectors(connectors).setCallTimeout(500).setCallFailoverTimeout(500);
clusterConnectionConfig1 = new ClusterConnectionConfiguration().setName(RandomUtil.randomString()).setAddress(queueConfig.getAddress().toString()).setConnectorName(connectorConfig.getName()).setRetryInterval(RandomUtil.randomPositiveLong()).setDuplicateDetection(RandomUtil.randomBoolean()).setMessageLoadBalancingType(MessageLoadBalancingType.STRICT).setMaxHops(RandomUtil.randomPositiveInt()).setConfirmationWindowSize(RandomUtil.randomPositiveInt()).setMessageLoadBalancingType(MessageLoadBalancingType.ON_DEMAND).setStaticConnectors(connectors).setCallTimeout(500).setCallFailoverTimeout(500).setProducerWindowSize(1234);

clusterConnectionConfig2 = new ClusterConnectionConfiguration().setName(RandomUtil.randomString()).setAddress(queueConfig.getAddress().toString()).setConnectorName(connectorConfig.getName()).setRetryInterval(RandomUtil.randomPositiveLong()).setDuplicateDetection(RandomUtil.randomBoolean()).setMessageLoadBalancingType(MessageLoadBalancingType.OFF).setMaxHops(RandomUtil.randomPositiveInt()).setConfirmationWindowSize(RandomUtil.randomPositiveInt()).setMessageLoadBalancingType(MessageLoadBalancingType.ON_DEMAND).setDiscoveryGroupName(discoveryGroupName).setCallTimeout(500).setCallFailoverTimeout(500);
clusterConnectionConfig2 = new ClusterConnectionConfiguration().setName(RandomUtil.randomString()).setAddress(queueConfig.getAddress().toString()).setConnectorName(connectorConfig.getName()).setRetryInterval(RandomUtil.randomPositiveLong()).setDuplicateDetection(RandomUtil.randomBoolean()).setMessageLoadBalancingType(MessageLoadBalancingType.OFF).setMaxHops(RandomUtil.randomPositiveInt()).setConfirmationWindowSize(RandomUtil.randomPositiveInt()).setMessageLoadBalancingType(MessageLoadBalancingType.ON_DEMAND).setDiscoveryGroupName(discoveryGroupName).setCallTimeout(500).setCallFailoverTimeout(500).setProducerWindowSize(1234);

Configuration conf_0 = createBasicConfig().addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName())).addConnectorConfiguration(connectorConfig.getName(), connectorConfig).addClusterConfiguration(clusterConnectionConfig1).addClusterConfiguration(clusterConnectionConfig2).addDiscoveryGroupConfiguration(discoveryGroupName, discoveryGroupConfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ public void stop() throws Exception {
proxy.invokeOperation("stop");
}

@Override
public long getProducerWindowSize() {
return (Long) proxy.retrieveAttributeValue("producerWindowSize", Long.class);
}
};
}

Expand Down

0 comments on commit 43166e2

Please sign in to comment.