From 3a8c01b7487ad3773b1c3035bc6e593886bf78b1 Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Mon, 29 Apr 2024 13:33:08 +0200 Subject: [PATCH] - Added Metrics to extract metrics for micrometer (https://issues.redhat.com/browse/JGRP-2796) - ResourceDMBean now accepts filters (JGRP-2796) - Added AttributeType.SCALAR to selected attributes --- src/org/jgroups/JChannelProbeHandler.java | 17 +- src/org/jgroups/jmx/ResourceDMBean.java | 83 ++++--- src/org/jgroups/protocols/BARRIER.java | 2 +- src/org/jgroups/protocols/Discovery.java | 2 +- src/org/jgroups/protocols/FD_SOCK2.java | 2 +- src/org/jgroups/protocols/MERGE3.java | 2 +- src/org/jgroups/protocols/MsgStats.java | 6 +- src/org/jgroups/protocols/RED.java | 15 +- src/org/jgroups/protocols/TP.java | 44 ++-- .../protocols/TransferQueueBundler.java | 8 +- src/org/jgroups/protocols/UNICAST3.java | 77 +++---- src/org/jgroups/protocols/pbcast/STABLE.java | 42 ++-- .../protocols/pbcast/STATE_TRANSFER.java | 20 +- src/org/jgroups/util/ExtractMetrics.java | 94 -------- src/org/jgroups/util/Metrics.java | 205 ++++++++++++++++++ src/org/jgroups/util/ThreadPool.java | 8 +- .../org/jgroups/tests/MsgStatsTest.java | 4 +- 17 files changed, 387 insertions(+), 244 deletions(-) delete mode 100644 src/org/jgroups/util/ExtractMetrics.java create mode 100644 src/org/jgroups/util/Metrics.java diff --git a/src/org/jgroups/JChannelProbeHandler.java b/src/org/jgroups/JChannelProbeHandler.java index bcf3ef4051b..5568f28e23b 100644 --- a/src/org/jgroups/JChannelProbeHandler.java +++ b/src/org/jgroups/JChannelProbeHandler.java @@ -5,6 +5,7 @@ import org.jgroups.logging.LogFactory; import org.jgroups.stack.DiagnosticsHandler; import org.jgroups.stack.Protocol; +import org.jgroups.util.Metrics; import org.jgroups.util.Util; import java.lang.management.ManagementFactory; @@ -55,6 +56,20 @@ public Map handleProbe(String... keys) { } continue; } + if(key.startsWith("metrics")) { + Map>> m=Metrics.extract(ch, Metrics.IS_NUMBER); + Map>> metrics=Metrics.convert(m); + StringBuilder sb=new StringBuilder(); + for(Map.Entry>> e: metrics.entrySet()) { + sb.append(String.format("%s:\n", e.getKey())); + for(Map.Entry> e2: e.getValue().entrySet()) { + Metrics.Entry val=e2.getValue(); + sb.append(String.format(" %s: %s\n", e2.getKey(), e2.getValue().supplier().get())); + } + } + map.put(key, sb.toString()); + continue; + } if(key.startsWith("threads")) { ThreadMXBean bean=ManagementFactory.getThreadMXBean(); boolean cpu_supported=bean.isThreadCpuTimeSupported(); @@ -178,7 +193,7 @@ else if(val.startsWith("wtime")) } public String[] supportedKeys() { - return new String[]{"reset-stats", "jmx", "op=[]", "ops", + return new String[]{"reset-stats", "jmx", "op=[]", "ops", "metrics", "threads[=[=]]", "enable-cpu", "enable-contention", "disable-cpu", "disable-contention"}; } diff --git a/src/org/jgroups/jmx/ResourceDMBean.java b/src/org/jgroups/jmx/ResourceDMBean.java index 977ddaa2012..4762a671fcf 100755 --- a/src/org/jgroups/jmx/ResourceDMBean.java +++ b/src/org/jgroups/jmx/ResourceDMBean.java @@ -7,6 +7,7 @@ import org.jgroups.conf.AttributeType; import org.jgroups.logging.Log; import org.jgroups.logging.LogFactory; +import org.jgroups.util.Average; import org.jgroups.util.Util; import javax.management.*; @@ -43,7 +44,7 @@ public class ResourceDMBean implements DynamicMBean { protected final boolean expose_all; protected final Log log=LogFactory.getLog(ResourceDMBean.class); protected final Object obj; - protected List objs; + protected List components; protected final MBeanAttributeInfo[] attrInfo; protected final MBeanOperationInfo[] opInfo; protected final HashMap atts=new HashMap<>(); @@ -54,27 +55,31 @@ public class ResourceDMBean implements DynamicMBean { protected static final Predicate FILTER=obj -> obj.isAnnotationPresent(ManagedAttribute.class) || (obj.isAnnotationPresent(Property.class) && obj.getAnnotation(Property.class).exposeAsManagedAttribute()); - public ResourceDMBean(Object instance) { + this(instance, null); + } + + public ResourceDMBean(Object instance, Predicate filter) { if(instance == null) throw new NullPointerException("Cannot make an MBean wrapper for null instance"); this.obj=instance; Class c=obj.getClass(); expose_all=c.isAnnotationPresent(MBean.class) && c.getAnnotation(MBean.class).exposeAll(); - findFields(instance); - findMethods(instance); + findFields(instance, filter, null); + findMethods(instance, filter, null); fixFields(instance); List objects=Util.getComponents(instance); if(objects != null) { for(Object inst: objects) { if(inst != null) { - if(objs == null) - objs=new ArrayList<>(); - objs.add(inst); - findFields(inst); - findMethods(inst); + if(components == null) + components=new ArrayList<>(); + String prefix=Util.methodNameToAttributeName(inst.getClass().getSimpleName()); + components.add(inst); + findFields(inst, filter, prefix); + findMethods(inst, filter, prefix); fixFields(inst); } } @@ -150,8 +155,8 @@ public Object invoke(String name, Object[] args, String[] sig) throws MBeanExcep for(int i=0;i < classes.length;i++) classes[i]=getClassForName(sig[i]); Method method=null; - if(objs != null) { - for(Object o: objs) { + if(components != null) { + for(Object o: components) { try { method=o.getClass().getMethod(name, classes); } @@ -273,6 +278,10 @@ public static boolean isFractional(Class cl) { return cl.equals(float.class) || cl.equals(Float.class) || cl.equals(double.class) || cl.equals(Double.class); } + public static boolean isNumber(Class cl) { + return isNumeric(cl) || isFractional(cl) || Number.class.isAssignableFrom(cl) || Average.class.isAssignableFrom(cl); + } + protected static AttributeType getType(AccessibleObject ao) { Property prop=ao.getAnnotation(Property.class); @@ -306,15 +315,18 @@ protected static Class getClassForName(String name) throws ClassNotFoundExcep throw new ClassNotFoundException("Class " + name + " cannot be found"); } - protected void findMethods(Object instance) { + protected void findMethods(Object instance, Predicate filter, String prefix) { // find all methods but don't include methods from Object class List methods = new ArrayList<>(Arrays.asList(instance.getClass().getMethods())); methods.removeAll(OBJECT_METHODS); for(Method method: methods) { + // does method have @ManagedAttribute annotation? if(method.isAnnotationPresent(ManagedAttribute.class) || method.isAnnotationPresent(Property.class)) { - exposeManagedAttribute(method, instance); + if(filter != null && !filter.test(method)) + continue; + exposeManagedAttribute(method, instance, prefix); } //or @ManagedOperation else if (method.isAnnotationPresent(ManagedOperation.class) || expose_all){ @@ -338,7 +350,7 @@ protected void fixFields(Object instance) { - protected void exposeManagedAttribute(Method method, Object instance) { + protected void exposeManagedAttribute(Method method, Object instance, String prefix) { String methodName=method.getName(); ManagedAttribute attr_annotation=method.getAnnotation(ManagedAttribute.class); Property prop=method.getAnnotation(Property.class); @@ -367,7 +379,7 @@ protected void exposeManagedAttribute(Method method, Object instance) { } String descr=attr_annotation != null ? attr_annotation.description() : prop != null? prop.description() : null; - AttributeEntry attr=atts.get(attr_name); + AttributeEntry attr=atts.get(prefix(prefix,attr_name)); if(attr != null) { if(isSetMethod(method)) { if(attr.setter != null) { @@ -389,17 +401,19 @@ protected void exposeManagedAttribute(Method method, Object instance) { else { // create a new entry in atts boolean is_setter=isSetMethod(method); String type=is_setter? method.getParameterTypes()[0].getCanonicalName() : method.getReturnType().getCanonicalName(); - MBeanAttributeInfo info=new MBeanAttributeInfo(attr_name, type, descr, true, writable, methodName.startsWith("is")); - AttributeEntry entry=new AttributeEntry(Util.methodNameToAttributeName(methodName), info); + MBeanAttributeInfo info=new MBeanAttributeInfo(prefix(attr_name, prefix), type, descr, true, writable, methodName.startsWith("is")); + AttributeEntry entry=new AttributeEntry(method, Util.methodNameToAttributeName(methodName), info); if(is_setter) entry.setter(new MethodAccessor(method, instance)); else entry.getter(new MethodAccessor(method, instance)); - atts.put(attr_name, entry); + atts.put(prefix(attr_name, prefix), entry); } } - + protected static String prefix(String s, String prefix) { + return prefix == null? s : prefix + "." + s; + } /** Finds an accessor for an attribute. Tries to find getAttrName(), isAttrName(), attrName() methods. If not * found, tries to use reflection to get the value of attr_name. If still not found, creates a NullAccessor. */ @@ -416,7 +430,6 @@ protected static Accessor findGetter(Object target, String attr_name) { Field field=Util.getField(clazz, attr_name); if(field != null) return new FieldAccessor(field, target); - return new NoopAccessor(); } @@ -461,24 +474,28 @@ protected static String toLowerCase(String input) { } - protected void findFields(Object instance) { + protected void findFields(Object instance, Predicate filter, String prefix) { // traverse class hierarchy and find all annotated fields for(Class clazz=instance.getClass(); clazz != null && clazz != Object.class; clazz=clazz.getSuperclass()) { - Field[] fields=clazz.getDeclaredFields(); for(Field field: fields) { - ManagedAttribute attr=field.getAnnotation(ManagedAttribute.class); + ManagedAttribute annotation=field.getAnnotation(ManagedAttribute.class); Property prop=field.getAnnotation(Property.class); boolean expose_prop=prop != null && prop.exposeAsManagedAttribute(); - boolean expose=attr != null || expose_prop; + boolean expose=annotation != null || expose_prop; if(expose) { - String fieldName=attr != null? attr.name() : (prop != null? prop.name() : null); + if(filter != null && !filter.test(field)) + continue; + + String fieldName=annotation != null? annotation.name() : (prop != null? prop.name() : null); if(fieldName != null && fieldName.trim().isEmpty()) fieldName=field.getName(); + if(prefix != null) + fieldName=prefix + "." + fieldName; - String descr=attr != null? attr.description() : prop.description(); - boolean writable=attr != null? attr.writable() : prop.writable(); + String descr=annotation != null? annotation.description() : prop.description(); + boolean writable=annotation != null? annotation.writable() : prop.writable(); MBeanAttributeInfo info=new MBeanAttributeInfo(fieldName, field.getType().getCanonicalName(), @@ -487,7 +504,7 @@ protected void findFields(Object instance) { !Modifier.isFinal(field.getModifiers()) && writable, false); - atts.put(fieldName, new AttributeEntry(field.getName(), info)); + atts.put(fieldName, new AttributeEntry(field, field.getName(), info)); } } } @@ -531,7 +548,7 @@ protected boolean setNamedAttribute(Attribute attribute) { public static class AttributeEntry { - + protected final AccessibleObject type; // method of field /** The name of the field or method. Can be different from the key in atts when name in @Property or * @ManagedAttribute was used */ protected final String name; @@ -539,17 +556,19 @@ public static class AttributeEntry { protected Accessor getter; protected Accessor setter; - public AttributeEntry(String name, MBeanAttributeInfo info) { - this(name, info, null, null); + public AttributeEntry(AccessibleObject type, String name, MBeanAttributeInfo info) { + this(type, name, info, null, null); } - public AttributeEntry(String name, MBeanAttributeInfo info, Accessor getter, Accessor setter) { + public AttributeEntry(AccessibleObject type, String name, MBeanAttributeInfo info, Accessor getter, Accessor setter) { + this.type=type; this.name=name; this.info=info; this.getter=getter; this.setter=setter; } + public AccessibleObject type() {return type;} public String name() {return name;} public MBeanAttributeInfo info() {return info;} public Accessor getter() {return getter;} diff --git a/src/org/jgroups/protocols/BARRIER.java b/src/org/jgroups/protocols/BARRIER.java index 69798c49181..88b2b074a91 100644 --- a/src/org/jgroups/protocols/BARRIER.java +++ b/src/org/jgroups/protocols/BARRIER.java @@ -86,7 +86,7 @@ public int getNumberOfInFlightThreads() { return in_flight_threads.size(); } - @ManagedAttribute + @ManagedAttribute(description="Number of threads in flight",type=AttributeType.SCALAR) public int getInFlightThreadsCount() { return getNumberOfInFlightThreads(); } diff --git a/src/org/jgroups/protocols/Discovery.java b/src/org/jgroups/protocols/Discovery.java index 16164966c33..12f50d2dea5 100644 --- a/src/org/jgroups/protocols/Discovery.java +++ b/src/org/jgroups/protocols/Discovery.java @@ -87,7 +87,7 @@ public abstract class Discovery extends Protocol { /* --------------------------------------------- JMX ------------------------------------------------------ */ - @ManagedAttribute(description="Total number of discovery requests sent ") + @ManagedAttribute(description="Total number of discovery requests sent",type=AttributeType.SCALAR) protected int num_discovery_requests; /* --------------------------------------------- Fields ------------------------------------------------------ */ diff --git a/src/org/jgroups/protocols/FD_SOCK2.java b/src/org/jgroups/protocols/FD_SOCK2.java index 78c9993fa0d..63cfc21e203 100644 --- a/src/org/jgroups/protocols/FD_SOCK2.java +++ b/src/org/jgroups/protocols/FD_SOCK2.java @@ -75,7 +75,7 @@ public class FD_SOCK2 extends Protocol implements Receiver, ConnectionListener, @Property(description="SO_LINGER in seconds. Default of -1 disables it") protected int linger=-1; // SO_LINGER (number of seconds, -1 disables it) - @ManagedAttribute(description="Number of suspect events emitted") + @ManagedAttribute(description="Number of suspect events emitted",type=AttributeType.SCALAR) protected int num_suspect_events; @ManagedAttribute(description="True when this member is leaving the cluster, set to false when joining") diff --git a/src/org/jgroups/protocols/MERGE3.java b/src/org/jgroups/protocols/MERGE3.java index d9ff61656b3..5838c04a4a8 100644 --- a/src/org/jgroups/protocols/MERGE3.java +++ b/src/org/jgroups/protocols/MERGE3.java @@ -89,7 +89,7 @@ public class MERGE3 extends Protocol { @ManagedAttribute(description="Whether or not the current member is the coordinator") protected volatile boolean is_coord; - @ManagedAttribute(description="Number of times a MERGE event was sent up the stack") + @ManagedAttribute(description="Number of times a MERGE event was sent up the stack",type=AttributeType.SCALAR) protected int num_merge_events; diff --git a/src/org/jgroups/protocols/MsgStats.java b/src/org/jgroups/protocols/MsgStats.java index ee4217f0d4e..e2c7e091d3d 100644 --- a/src/org/jgroups/protocols/MsgStats.java +++ b/src/org/jgroups/protocols/MsgStats.java @@ -45,6 +45,7 @@ public class MsgStats { protected final LongAdder num_batches_received=new LongAdder(); /** The average number of messages in a received {@link MessageBatch} */ + @ManagedAttribute(description="Returns the average batch size of received batches") protected final AverageMinMax avg_batch_size=new AverageMinMax(); @ManagedAttribute(description="Number of multicast bytes sent",type=BYTES) @@ -66,10 +67,7 @@ public class MsgStats { @ManagedAttribute(description="Number of messages received (mcasts and ucasts received)",type=SCALAR) public long getNumMsgsReceived() {return num_mcasts_received.sum() + num_ucasts_received.sum();} - @ManagedAttribute(description="Returns the average batch size of received batches") - public String getAvgBatchSize() {return avg_batch_size.toString();} - - public AverageMinMax avgBatchSize() {return avg_batch_size;} + public AverageMinMax getAvgBatchSize() {return avg_batch_size;} @ManagedAttribute(description="Total number of bytes sent (unicast + multicast bytes)",type=BYTES) public long getNumBytesSent() {return num_mcast_bytes_sent.sum() + num_ucast_bytes_sent.sum();} diff --git a/src/org/jgroups/protocols/RED.java b/src/org/jgroups/protocols/RED.java index f86af1de55a..1929e7df61b 100644 --- a/src/org/jgroups/protocols/RED.java +++ b/src/org/jgroups/protocols/RED.java @@ -48,7 +48,9 @@ public class RED extends Protocol { "longer to reflect the current queue size.") protected double weight_factor=1; + @ManagedAttribute(description="The number of dropped messages",type=AttributeType.SCALAR) protected final LongAdder dropped_msgs=new LongAdder(); // dropped messages + @ManagedAttribute(description="Total number of messages processed",type=AttributeType.SCALAR) protected final LongAdder total_msgs=new LongAdder(); // total messages looked at protected Bundler bundler; @@ -60,17 +62,10 @@ public class RED extends Protocol { public boolean isEnabled() {return enabled;} public RED setEnabled(boolean e) {enabled=e; return this;} public double getMinThreshold() {return min_threshold;} - - - - @ManagedAttribute(description="The number of dropped messages",type=AttributeType.SCALAR) - public long getDroppedMessages() {return dropped_msgs.sum();} - - @ManagedAttribute(description="Total number of messages processed",type=AttributeType.SCALAR) - public long getTotalMessages() {return total_msgs.sum();} - + public long getDroppedMessages() {return dropped_msgs.sum();} + public long getTotalMessages() {return total_msgs.sum();} @ManagedAttribute(description="Percentage of all messages that were dropped") - public double getDropRate() {return dropped_msgs.sum() / (double)total_msgs.sum();} + public double getDropRate() {return dropped_msgs.sum() / (double)total_msgs.sum();} public void start() throws Exception { diff --git a/src/org/jgroups/protocols/TP.java b/src/org/jgroups/protocols/TP.java index f3c657cd2e0..963d372a2b5 100644 --- a/src/org/jgroups/protocols/TP.java +++ b/src/org/jgroups/protocols/TP.java @@ -28,6 +28,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; +import static org.jgroups.conf.AttributeType.SCALAR; + /** * Generic transport - specific implementations should extend this abstract class. @@ -186,55 +188,55 @@ public String getBundlerClass() { public MessageFactory getMessageFactory() {return msg_factory;} public T setMessageFactory(MessageFactory m) {msg_factory=m; return (T)this;} - public InetAddress getBindAddr() {return bind_addr;} + public InetAddress getBindAddr() {return bind_addr;} public T setBindAddr(InetAddress b) {this.bind_addr=b; return (T)this;} - public InetAddress getExternalAddr() {return external_addr;} + public InetAddress getExternalAddr() {return external_addr;} public T setExternalAddr(InetAddress e) {this.external_addr=e; return (T)this;} - public int getExternalPort() {return external_port;} + public int getExternalPort() {return external_port;} public T setExternalPort(int e) {this.external_port=e; return (T)this;} - public boolean isTrace() {return is_trace;} + public boolean isTrace() {return is_trace;} public T isTrace(boolean i) {this.is_trace=i; return (T)this;} - public boolean receiveOnAllInterfaces() {return receive_on_all_interfaces;} + public boolean receiveOnAllInterfaces() {return receive_on_all_interfaces;} public T receiveOnAllInterfaces(boolean r) {this.receive_on_all_interfaces=r; return (T)this;} - public int getLogicalAddrCacheMaxSize() {return logical_addr_cache_max_size;} + public int getLogicalAddrCacheMaxSize() {return logical_addr_cache_max_size;} public T setLogicalAddrCacheMaxSize(int l) {this.logical_addr_cache_max_size=l; return (T)this;} - public long getLogicalAddrCacheExpiration() {return logical_addr_cache_expiration;} + public long getLogicalAddrCacheExpiration() {return logical_addr_cache_expiration;} public T setLogicalAddrCacheExpiration(long l) {this.logical_addr_cache_expiration=l; return (T)this;} - public long getLogicalAddrCacheReaperInterval() {return logical_addr_cache_reaper_interval;} + public long getLogicalAddrCacheReaperInterval() {return logical_addr_cache_reaper_interval;} public T setLogicalAddrCacheReaperInterval(long l) {this.logical_addr_cache_reaper_interval=l; return (T)this;} - public boolean loopbackCopy() {return loopback_copy;} + public boolean loopbackCopy() {return loopback_copy;} public T loopbackCopy(boolean l) {this.loopback_copy=l; return (T)this;} - public boolean loopbackSeparateThread() {return loopback_separate_thread;} + public boolean loopbackSeparateThread() {return loopback_separate_thread;} public T loopbackSeparateThread(boolean l) {this.loopback_separate_thread=l; return (T)this;} public boolean useVirtualThreads() {return use_virtual_threads;} public T useVirtualThreads(boolean b) {use_virtual_threads=b; return (T)this;} - public long getTimeServiceInterval() {return time_service_interval;} + public long getTimeServiceInterval() {return time_service_interval;} public T setTimeServiceInterval(long t) {this.time_service_interval=t; return (T)this;} - public boolean logDiscardMsgs() {return log_discard_msgs;} + public boolean logDiscardMsgs() {return log_discard_msgs;} public T logDiscardMsgs(boolean l) {this.log_discard_msgs=l; return (T)this;} - public boolean logDiscardMsgsVersion() {return log_discard_msgs_version;} + public boolean logDiscardMsgsVersion() {return log_discard_msgs_version;} public T logDiscardMsgsVersion(boolean l) {this.log_discard_msgs_version=l; return (T)this;} - public long getWhoHasCacheTimeout() {return who_has_cache_timeout;} + public long getWhoHasCacheTimeout() {return who_has_cache_timeout;} public T setWhoHasCacheTimeout(long w) {this.who_has_cache_timeout=w; return (T)this;} - public long getSuppressTimeDifferentVersionWarnings() {return suppress_time_different_version_warnings;} + public long getSuppressTimeDifferentVersionWarnings() {return suppress_time_different_version_warnings;} public T setSuppressTimeDifferentVersionWarnings(long s) {this.suppress_time_different_version_warnings=s; return (T)this;} - public long getSuppressTimeDifferentClusterWarnings() {return suppress_time_different_cluster_warnings;} + public long getSuppressTimeDifferentClusterWarnings() {return suppress_time_different_cluster_warnings;} public T setSuppressTimeDifferentClusterWarnings(long s) {this.suppress_time_different_cluster_warnings=s; return (T)this;} public String getMsgFactoryClass() {return msg_factory_class;} @@ -313,12 +315,12 @@ public String getClusterName() { public AsciiString getClusterNameAscii() {return cluster_name;} - @ManagedAttribute(description="Number of messages from members in a different cluster") + @ManagedAttribute(description="Number of messages from members in a different cluster",type=SCALAR) public int getDifferentClusterMessages() { return suppress_log_different_cluster != null? suppress_log_different_cluster.getCache().size() : 0; } - @ManagedAttribute(description="Number of messages from members with a different JGroups version") + @ManagedAttribute(description="Number of messages from members with a different JGroups version",type=SCALAR) public int getDifferentVersionMessages() { return suppress_log_different_version != null? suppress_log_different_version.getCache().size() : 0; } @@ -628,7 +630,7 @@ public SocketFactory getSocketFactory() { - @ManagedAttribute(name="timer_tasks",description="Number of timer tasks queued up for execution") + @ManagedAttribute(name="timer_tasks",description="Number of timer tasks queued up for execution",type=SCALAR) public int getNumTimerTasks() {return timer.size();} @ManagedOperation public String dumpTimerTasks() {return timer.dumpTimerTasks();} @@ -636,10 +638,10 @@ public SocketFactory getSocketFactory() { @ManagedOperation(description="Purges cancelled tasks from the timer queue") public void removeCancelledTimerTasks() {timer.removeCancelledTasks();} - @ManagedAttribute(description="Number of threads currently in the pool") + @ManagedAttribute(description="Number of threads currently in the pool",type=SCALAR) public int getTimerThreads() {return timer.getCurrentThreads();} - @ManagedAttribute(description="Returns the number of live threads in the JVM") + @ManagedAttribute(description="Returns the number of live threads in the JVM",type=SCALAR) public static int getNumThreads() {return ManagementFactory.getThreadMXBean().getThreadCount();} public T setLogDiscardMessages(boolean flag) {log_discard_msgs=flag; return (T)this;} diff --git a/src/org/jgroups/protocols/TransferQueueBundler.java b/src/org/jgroups/protocols/TransferQueueBundler.java index 28a7f269fb0..f8b6e17239e 100644 --- a/src/org/jgroups/protocols/TransferQueueBundler.java +++ b/src/org/jgroups/protocols/TransferQueueBundler.java @@ -28,13 +28,13 @@ public class TransferQueueBundler extends BaseBundler implements Runnable { protected boolean drop_when_full=true; protected volatile boolean running=true; - @ManagedAttribute(description="Number of times a message was sent because the queue was full", type= SCALAR) + @ManagedAttribute(description="Number of times a message was sent because the queue was full", type=SCALAR) protected long num_sends_because_full_queue; @ManagedAttribute(description="Number of times a message was sent because there was no message available in the queue", - type= SCALAR) + type=SCALAR) protected long num_sends_because_no_msgs; - @ManagedAttribute(description="Number of dropped messages (when drop_when_full is true)") + @ManagedAttribute(description="Number of dropped messages (when drop_when_full is true)",type=SCALAR) protected long num_drops_on_full_queue; @ManagedAttribute(description="Average fill size of the queue (in bytes)") @@ -137,7 +137,7 @@ public void run() { } } catch(Throwable t) { - log.warn("%s: failed sending message: %s", transport.addr(), t); + log.trace("%s: failed sending message: %s", transport.addr(), t); } } } diff --git a/src/org/jgroups/protocols/UNICAST3.java b/src/org/jgroups/protocols/UNICAST3.java index 77b350fbba0..daa94726b93 100644 --- a/src/org/jgroups/protocols/UNICAST3.java +++ b/src/org/jgroups/protocols/UNICAST3.java @@ -26,6 +26,7 @@ import static org.jgroups.Message.Flag.*; import static org.jgroups.Message.TransientFlag.*; +import static org.jgroups.conf.AttributeType.SCALAR; import static org.jgroups.protocols.UnicastHeader3.DATA; @@ -100,21 +101,27 @@ public class UNICAST3 extends Protocol implements AgeOutCache.Handler
{ /* --------------------------------------------- JMX ---------------------------------------------- */ - - protected long num_msgs_sent=0, num_msgs_received=0; - protected long num_acks_sent=0, num_acks_received=0, num_xmits=0; - - @ManagedAttribute(description="Number of retransmit requests received",type=AttributeType.SCALAR) + @ManagedAttribute(description="Number of message sent",type=SCALAR) + protected final LongAdder num_msgs_sent=new LongAdder(); + @ManagedAttribute(description="Number of message received",type=SCALAR) + protected final LongAdder num_msgs_received=new LongAdder(); + @ManagedAttribute(description="Number of acks sent",type=SCALAR) + protected final LongAdder num_acks_sent=new LongAdder(); + @ManagedAttribute(description="Number of acks received",type=SCALAR) + protected final LongAdder num_acks_received=new LongAdder(); + @ManagedAttribute(description="Number of retransmitted messages",type=SCALAR) + protected final LongAdder num_xmits=new LongAdder(); + + @ManagedAttribute(description="Number of retransmit requests received",type=SCALAR) protected final LongAdder xmit_reqs_received=new LongAdder(); - @ManagedAttribute(description="Number of retransmit requests sent",type=AttributeType.SCALAR) + @ManagedAttribute(description="Number of retransmit requests sent",type=SCALAR) protected final LongAdder xmit_reqs_sent=new LongAdder(); - @ManagedAttribute(description="Number of retransmit responses sent",type=AttributeType.SCALAR) + @ManagedAttribute(description="Number of retransmit responses sent",type=SCALAR) protected final LongAdder xmit_rsps_sent=new LongAdder(); - @ManagedAttribute(description="Number of unicast messages to self looped back up",type=AttributeType.SCALAR) - public long getNumLoopbacks() {return loopbed_back_msgs.sum();} + public long getNumLoopbacks() {return num_loopbacks.sum();} @ManagedAttribute(description="Average batch size of messages delivered to the application") protected final AverageMinMax avg_delivery_batch_size=new AverageMinMax(); @@ -160,7 +167,8 @@ public class UNICAST3 extends Protocol implements AgeOutCache.Handler
{ /** Keep track of when a SEND_FIRST_SEQNO message was sent to a given sender */ protected ExpiryCache
last_sync_sent; - protected final LongAdder loopbed_back_msgs=new LongAdder(); + @ManagedAttribute(description="Number of unicast messages to self looped back up",type=SCALAR) + protected final LongAdder num_loopbacks=new LongAdder(); // Queues messages until a {@link ReceiverEntry} has been created. Queued messages are then removed from // the cache and added to the ReceiverEntry @@ -259,16 +267,12 @@ public String printConnections() { return sb.toString(); } - @ManagedAttribute(type=AttributeType.SCALAR) - public long getNumMessagesSent() {return num_msgs_sent;} - @ManagedAttribute(type=AttributeType.SCALAR) - public long getNumMessagesReceived() {return num_msgs_received;} - @ManagedAttribute(type=AttributeType.SCALAR) - public long getNumAcksSent() {return num_acks_sent;} - @ManagedAttribute(type=AttributeType.SCALAR) - public long getNumAcksReceived() {return num_acks_received;} - @ManagedAttribute(type=AttributeType.SCALAR) - public long getNumXmits() {return num_xmits;} + + public long getNumMessagesSent() {return num_msgs_sent.sum();} + public long getNumMessagesReceived() {return num_msgs_received.sum();} + public long getNumAcksSent() {return num_acks_sent.sum();} + public long getNumAcksReceived() {return num_acks_received.sum();} + public long getNumXmits() {return num_xmits.sum();} public long getMaxRetransmitTime() {return max_retransmit_time;} @Property(description="Max number of milliseconds we try to retransmit a message to any given member. After that, " + @@ -305,42 +309,42 @@ public boolean hasSendConnectionTo(Address dest) { } /** The number of messages in all Entry.sent_msgs tables (haven't received an ACK yet) */ - @ManagedAttribute(type=AttributeType.SCALAR) + @ManagedAttribute(type= SCALAR) public int getNumUnackedMessages() { return accumulate(Table::size, send_table.values()); } - @ManagedAttribute(description="Total number of undelivered messages in all receive windows",type=AttributeType.SCALAR) + @ManagedAttribute(description="Total number of undelivered messages in all receive windows",type=SCALAR) public int getXmitTableUndeliveredMessages() { return accumulate(Table::size, recv_table.values()); } - @ManagedAttribute(description="Total number of missing messages in all receive windows",type=AttributeType.SCALAR) + @ManagedAttribute(description="Total number of missing messages in all receive windows",type=SCALAR) public int getXmitTableMissingMessages() { return accumulate(Table::getNumMissing, recv_table.values()); } - @ManagedAttribute(description="Total number of deliverable messages in all receive windows",type=AttributeType.SCALAR) + @ManagedAttribute(description="Total number of deliverable messages in all receive windows",type=SCALAR) public int getXmitTableDeliverableMessages() { return accumulate(Table::getNumDeliverable, recv_table.values()); } - @ManagedAttribute(description="Number of compactions in all (receive and send) windows") + @ManagedAttribute(description="Number of compactions in all (receive and send) windows",type=SCALAR) public int getXmitTableNumCompactions() { return accumulate(Table::getNumCompactions, recv_table.values(), send_table.values()); } - @ManagedAttribute(description="Number of moves in all (receive and send) windows") + @ManagedAttribute(description="Number of moves in all (receive and send) windows",type=SCALAR) public int getXmitTableNumMoves() { return accumulate(Table::getNumMoves, recv_table.values(), send_table.values()); } - @ManagedAttribute(description="Number of resizes in all (receive and send) windows") + @ManagedAttribute(description="Number of resizes in all (receive and send) windows",type=SCALAR) public int getXmitTableNumResizes() { return accumulate(Table::getNumResizes, recv_table.values(), send_table.values()); } - @ManagedAttribute(description="Number of purges in all (receive and send) windows") + @ManagedAttribute(description="Number of purges in all (receive and send) windows",type=SCALAR) public int getXmitTableNumPurges() { return accumulate(Table::getNumPurges, recv_table.values(), send_table.values()); } @@ -369,10 +373,9 @@ public String printSendWindowMessages() { public void resetStats() { - num_msgs_sent=num_msgs_received=num_acks_sent=num_acks_received=num_xmits=0; avg_delivery_batch_size.clear(); - Stream.of(xmit_reqs_received, xmit_reqs_sent, xmit_rsps_sent).forEach(LongAdder::reset); - loopbed_back_msgs.reset(); + Stream.of(num_msgs_sent, num_msgs_received, num_acks_sent, num_acks_received, num_xmits, xmit_reqs_received, + xmit_reqs_sent, xmit_rsps_sent, num_loopbacks).forEach(LongAdder::reset); } @@ -676,7 +679,7 @@ public Object down(Message msg) { if(loopback && Objects.equals(local_addr, dst)) {// https://issues.redhat.com/browse/JGRP-2547 if(msg.isFlagSet(DONT_LOOPBACK)) return null; - loopbed_back_msgs.increment(); + num_loopbacks.increment(); return up_prot.up(msg); } @@ -714,7 +717,7 @@ public Object down(Message msg) { log.trace(sb); } - num_msgs_sent++; + num_msgs_sent.increment(); return down_prot.down(msg); } @@ -818,7 +821,7 @@ protected void retransmit(Message msg) { log.trace("%s --> %s: resending(#%d)", local_addr, msg.getDest(), seqno); } resend(msg); - num_xmits++; + num_xmits.increment(); } /** @@ -1064,7 +1067,7 @@ protected void handleAckReceived(Address sender, long seqno, short conn_id, int if(win != null && entry.updateLastTimestamp(timestamp)) { // win.forEach(win.getLow(), seqno, null); win.purge(seqno, true); // removes all messages <= seqno (forced purge) - num_acks_received++; + num_acks_received.increment(); } } @@ -1198,7 +1201,7 @@ protected void sendAck(Address dst, long seqno, short conn_id, Address real_dest log.trace("%s --> %s: ACK(#%d)", local_addr, dst, seqno); try { down_prot.down(ack); - num_acks_sent++; + num_acks_sent.increment(); } catch(Throwable t) { log.error(Util.getMessage("FailedSendingAck"), local_addr, seqno, dst, t); @@ -1416,7 +1419,7 @@ protected void update(Entry entry, int num_received) { entry.update(); if(entry.state() == State.CLOSING) entry.state(State.OPEN); - num_msgs_received+=num_received; + num_msgs_received.add(num_received); } /** Compares 2 timestamps, handles numeric overflow */ diff --git a/src/org/jgroups/protocols/pbcast/STABLE.java b/src/org/jgroups/protocols/pbcast/STABLE.java index 159999a1abd..7f6cfabbf28 100644 --- a/src/org/jgroups/protocols/pbcast/STABLE.java +++ b/src/org/jgroups/protocols/pbcast/STABLE.java @@ -17,13 +17,16 @@ import java.util.Objects; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; +import java.util.stream.Stream; import static org.jgroups.Message.Flag.*; import static org.jgroups.Message.TransientFlag.DONT_BLOCK; import static org.jgroups.Message.TransientFlag.DONT_LOOPBACK; +import static org.jgroups.conf.AttributeType.SCALAR; /** @@ -55,16 +58,20 @@ public class STABLE extends Protocol { * be broadcast and{@code num_bytes_received} reset to 0 . If this is > 0, then ideally {@code stability_delay} * should be set to a low number as well */ - @Property(description="Maximum number of bytes received in all messages before sending a STABLE message is triggered", + @Property(description="Maximum number of bytes received in all messages before sending a STABLE message", type=AttributeType.BYTES) protected long max_bytes=2000000; /* --------------------------------------------- JMX ---------------------------------------------- */ - protected int num_stable_msgs_sent; - protected int num_stable_msgs_received; - protected int num_stability_msgs_sent; - protected int num_stability_msgs_received; + @ManagedAttribute(description="Number of stable messages sent",type= SCALAR) + protected final LongAdder num_stable_msgs_sent=new LongAdder(); + @ManagedAttribute(description="Number of stable messages received",type= SCALAR) + protected final LongAdder num_stable_msgs_received=new LongAdder(); + @ManagedAttribute(description="Number of stability messages sent",type= SCALAR) + protected final LongAdder num_stability_msgs_sent=new LongAdder(); + @ManagedAttribute(description="Number of stability messages received",type= SCALAR) + protected final LongAdder num_stability_msgs_received=new LongAdder(); /* --------------------------------------------- Fields ------------------------------------------------------ */ @@ -121,14 +128,6 @@ public STABLE() { // @ManagedAttribute(name="bytes_received") public long getBytes() {return num_bytes_received;} - @ManagedAttribute(type=AttributeType.SCALAR) - public int getStableSent() {return num_stable_msgs_sent;} - @ManagedAttribute(type=AttributeType.SCALAR) - public int getStableReceived() {return num_stable_msgs_received;} - @ManagedAttribute(type=AttributeType.SCALAR) - public int getStabilitySent() {return num_stability_msgs_sent;} - @ManagedAttribute(type=AttributeType.SCALAR) - public int getStabilityReceived() {return num_stability_msgs_received;} @ManagedAttribute(description="The number of votes for the current digest") public int getNumVotes() {return votes != null? votes.cardinality() : 0;} @@ -162,7 +161,8 @@ public String printVotes() { public void resetStats() { super.resetStats(); - num_stability_msgs_received=num_stability_msgs_sent=num_stable_msgs_sent=num_stable_msgs_received=0; + Stream.of(num_stability_msgs_received,num_stability_msgs_sent,num_stable_msgs_sent,num_stable_msgs_received) + .forEach(LongAdder::reset); } @@ -488,7 +488,7 @@ protected void handleStableMessage(final Digest d, final Address sender, final V int rank=getRank(sender, view); if(rank < 0 || votes.get(rank)) // already received gossip from sender; discard it return; - num_stable_msgs_received++; + num_stable_msgs_received.increment(); updateLocalDigest(d, sender); if(addVote(rank)) { // votes from all members have been received stable_digest=digest; // no need to copy, as digest (although mutable) is reassigned below @@ -544,7 +544,7 @@ protected void handleStabilityMessage(final Digest stable_digest, final Address return; } log.trace("%s: received stability msg from %s: %s", local_addr, sender, printDigest(stable_digest)); - num_stability_msgs_received++; + num_stability_msgs_received.increment(); resetDigest(); } finally { @@ -576,7 +576,7 @@ protected void sendStableMessage(boolean send_in_background) { // don't send a STABLE message to self when coord, but instead update the digest directly if(is_coord) { log.trace("%s: updating the local digest with a stable message (coordinator): %s", local_addr, d); - num_stable_msgs_sent++; + num_stable_msgs_sent.increment(); handleStableMessage(d, local_addr, current_view.getViewId()); return; } @@ -586,14 +586,14 @@ protected void sendStableMessage(boolean send_in_background) { .putHeader(this.id, new StableHeader(StableHeader.STABLE_GOSSIP, current_view.getViewId())); try { if(!send_in_background) { - num_stable_msgs_sent++; + num_stable_msgs_sent.increment(); down_prot.down(msg); return; } Runnable r=new Runnable() { public void run() { down_prot.down(msg); - num_stable_msgs_sent++; + num_stable_msgs_sent.increment(); } public String toString() { @@ -628,8 +628,8 @@ protected void sendStabilityMessage(Digest d, final ViewId view_id) { .setFlag(OOB, NO_RELIABILITY, NO_RELAY, NO_FC).setFlag(DONT_LOOPBACK,DONT_BLOCK) .putHeader(id, new StableHeader(StableHeader.STABILITY, view_id)); log.trace("%s: sending stability msg %s", local_addr, printDigest(d)); - num_stability_msgs_sent++; - num_stability_msgs_received++; // since we don't receive this message + num_stability_msgs_sent.increment(); + num_stability_msgs_received.increment(); // since we don't receive this message down_prot.down(msg); } catch(Exception e) { diff --git a/src/org/jgroups/protocols/pbcast/STATE_TRANSFER.java b/src/org/jgroups/protocols/pbcast/STATE_TRANSFER.java index 514bcbb91d7..0ada5220566 100644 --- a/src/org/jgroups/protocols/pbcast/STATE_TRANSFER.java +++ b/src/org/jgroups/protocols/pbcast/STATE_TRANSFER.java @@ -4,7 +4,6 @@ import org.jgroups.annotations.MBean; import org.jgroups.annotations.ManagedAttribute; import org.jgroups.annotations.ManagedOperation; -import org.jgroups.conf.AttributeType; import org.jgroups.stack.Protocol; import org.jgroups.stack.StateTransferInfo; import org.jgroups.util.Digest; @@ -20,6 +19,9 @@ import java.util.concurrent.atomic.LongAdder; import java.util.function.Supplier; +import static org.jgroups.conf.AttributeType.BYTES; +import static org.jgroups.conf.AttributeType.SCALAR; + /** * STATE_TRANSFER protocol based on byte array transfer. A state request is sent * to a chosen member (coordinator if null). That member makes a copy D of its @@ -33,9 +35,12 @@ @MBean(description="State transfer protocol based on byte array transfer") public class STATE_TRANSFER extends Protocol implements ProcessingQueue.Handler
{ protected long start, stop; // to measure state transfer time + @ManagedAttribute(description="The number of state requests",type=SCALAR) protected final LongAdder num_state_reqs=new LongAdder(); + @ManagedAttribute(description="The number of bytes sent (total state)",type=BYTES) protected final LongAdder num_bytes_sent=new LongAdder(); - protected double avg_state_size=0; + @ManagedAttribute(description="The average state size (in bytes)",type=BYTES) + protected double avg_state_size; protected volatile View view; protected final List
members=new ArrayList<>(); @@ -46,12 +51,9 @@ public class STATE_TRANSFER extends Protocol implements ProcessingQueue.Handler< protected volatile boolean waiting_for_state_response=false; protected boolean flushProtocolInStack=false; - - @ManagedAttribute public long getNumberOfStateRequests() {return num_state_reqs.sum();} - @ManagedAttribute(type=AttributeType.BYTES) - public long getNumberOfStateBytesSent() {return num_bytes_sent.sum();} - @ManagedAttribute(type=AttributeType.BYTES) - public double getAverageStateSize() {return avg_state_size;} + public long getNumberOfStateRequests() {return num_state_reqs.sum();} + public long getNumberOfStateBytesSent() {return num_bytes_sent.sum();} + public double getAverageStateSize() {return avg_state_size;} public List requiredDownServices() { return Arrays.asList(Event.GET_DIGEST, Event.OVERWRITE_DIGEST); @@ -64,8 +66,6 @@ public void resetStats() { avg_state_size=0; } - public void init() throws Exception {} - public void start() throws Exception { Map map=new HashMap<>(); map.put("state_transfer", Boolean.TRUE); diff --git a/src/org/jgroups/util/ExtractMetrics.java b/src/org/jgroups/util/ExtractMetrics.java deleted file mode 100644 index 2cebf320639..00000000000 --- a/src/org/jgroups/util/ExtractMetrics.java +++ /dev/null @@ -1,94 +0,0 @@ -package org.jgroups.util; - -import org.jgroups.JChannel; -import org.jgroups.jmx.ResourceDMBean; -import org.jgroups.stack.Protocol; - -import javax.management.MBeanAttributeInfo; -import java.util.HashMap; -import java.util.Map; -import java.util.function.Supplier; - -/** - * Extracts all attributes and methods annotated with {@link org.jgroups.annotations.ManagedAttribute} and returns them - * as a map of names associated with [getter-method/description tuples]. E.g. for an attribute called foo, a method - * foo() or getFoo() is searched for. - * @author Bela Ban - * @since 5.4, 5.3.6 - */ -public class ExtractMetrics { - protected JChannel ch; - - public static class Entry { - protected final String description; - protected final Supplier method; - - protected Entry(String description, Supplier method) { - this.description=description; - this.method=method; - } - - public String getDescription() { - return description; - } - - public Supplier getMethod() { - return method; - } - - @Override - public String toString() { - return String.format(" %s [%s]", method.get(), description); - } - } - - public static Map> extract(JChannel ch) { - Map> map=new HashMap<>(); - for(Protocol p: ch.stack().getProtocols()) { - map.put(p.getName(), extract(p)); - } - return map; - } - - - public static Map extract(Protocol p) { - Map map=new HashMap<>(); - - ResourceDMBean dm=new ResourceDMBean(p); - dm.forAllAttributes((k,v) -> { - MBeanAttributeInfo info=v.info(); - String descr=info != null? info.getDescription() : "n/a"; - Supplier getter=() -> { - try { - return v.getter() != null? v.getter().invoke(null) : null; - } - catch(Exception e) { - System.err.printf("failed getting value for %s\n", k); - return null; - } - }; - map.put(k, new Entry(descr, getter)); - }); - - return map; - } - - protected void start() throws Exception { - ch=new JChannel().connect("bla").name("X"); - Map> map=extract(ch); - for(Map.Entry> e: map.entrySet()) { - System.out.printf("\n%s:\n---------------\n", e.getKey()); - for(Map.Entry e2: e.getValue().entrySet()) { - System.out.printf(" %s: %s\n", e2.getKey(), e2.getValue()); - } - } - Util.close(ch); - } - - public static void main(String[] args) throws Throwable { - new ExtractMetrics().start(); - - } -} - - diff --git a/src/org/jgroups/util/Metrics.java b/src/org/jgroups/util/Metrics.java new file mode 100644 index 00000000000..54276e1e16e --- /dev/null +++ b/src/org/jgroups/util/Metrics.java @@ -0,0 +1,205 @@ +package org.jgroups.util; + +import org.jgroups.JChannel; +import org.jgroups.annotations.ManagedAttribute; +import org.jgroups.annotations.Property; +import org.jgroups.jmx.ResourceDMBean; +import org.jgroups.stack.Protocol; + +import javax.management.MBeanAttributeInfo; +import java.lang.reflect.AccessibleObject; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.TreeMap; +import java.util.function.Predicate; +import java.util.function.Supplier; + +import static org.jgroups.conf.AttributeType.BYTES; +import static org.jgroups.conf.AttributeType.SCALAR; + +/** + * Extracts all attributes and methods annotated with {@link org.jgroups.annotations.ManagedAttribute} and returns them + * as a map of names associated with [getter-method/description tuples]. E.g. for an attribute called foo, a method + * foo() or getFoo() is searched for. + * @author Bela Ban + * @since 5.4, 5.3.6 + */ +public class Metrics { + protected JChannel ch; + public static final Predicate IS_NUMBER=obj -> { + if(obj instanceof Field) + return isNumberAndScalar(obj, ((Field)obj).getType()); + if(obj instanceof Method) { + Method m=(Method)obj; + return isNumberAndScalar(obj, m.getReturnType()); + } + return false; + }; + + + public static class Entry { + protected final AccessibleObject type; // Field or Method + protected final String description; + protected final Supplier supplier; + + protected Entry(AccessibleObject type, String description, Supplier method) { + this.type=type; + this.description=description; + this.supplier=method; + } + + public AccessibleObject type() {return type;} + public String description() {return description;} + public Supplier supplier() {return supplier;} + + @Override + public String toString() { + return String.format(" %s [%s]", supplier.get(), description); + } + } + + public static Map>> extract(JChannel ch) { + return extract(ch, null); + } + + public static Map>> extract(JChannel ch, Predicate filter) { + Map>> map=new LinkedHashMap<>(); + for(Protocol p: ch.stack().getProtocols()) + map.put(p.getName(), extract(p, filter)); + return map; + } + + public static Map> extract(Protocol p) { + return extract(p, null); + } + + public static Map> extract(Protocol p, Predicate filter) { + Map> map=new TreeMap<>(); + ResourceDMBean dm=new ResourceDMBean(p, filter); + dm.forAllAttributes((k,v) -> { + MBeanAttributeInfo info=v.info(); + String descr=info != null? info.getDescription() : "n/a"; + Supplier getter=() -> { + try { + return v.getter() != null? v.getter().invoke(null) : null; + } + catch(Exception e) { + System.err.printf("failed getting value for %s\n", k); + return null; + } + }; + map.put(k, new Entry<>(v.type(), descr, getter)); + }); + + return map; + } + + protected void start(boolean numeric) throws Exception { + ch=new JChannel().connect("bla").name("X"); + Map>> m=extract(ch, numeric? IS_NUMBER : null); + if(numeric) { + Map>> map=convert(m); + print(map); + } + else + print(m); + Util.close(ch); + } + + protected static void print(Map>> map) { + for(Map.Entry>> e: map.entrySet()) { + System.out.printf("\n%s:\n---------------\n", e.getKey()); + for(Map.Entry> e2: e.getValue().entrySet()) { + Entry entry=e2.getValue(); + Supplier s=entry.supplier(); + if(s != null) + System.out.printf(" %s: %s\n", e2.getKey(), s.get()); + } + } + } + + public static Map>> convert(Map>> m) { + Map>> retval=new LinkedHashMap<>(); + for(Map.Entry>> entry: m.entrySet()) { + Map> m1=entry.getValue(); + Map> m2=convertProtocol(m1); + retval.put(entry.getKey(), m2); + } + return retval; + } + + public static Map> convertProtocol(Map> m) { + Map> retval=new TreeMap<>(); + for(Map.Entry> e: m.entrySet()) { + Entry en=e.getValue(); + AccessibleObject type=en.type(); + Class cl=(type instanceof Field)? ((Field)type).getType() : ((Method)type).getReturnType(); + if(Number.class.isAssignableFrom(cl)) { + Entry tmp=new Entry<>(en.type(), en.description(), () -> (Number)en.supplier().get()); + retval.put(e.getKey(), tmp); + continue; + } + if(int.class.isAssignableFrom(cl)) { + Entry tmp=new Entry<>(en.type(), en.description(), () -> (Integer)en.supplier().get()); + retval.put(e.getKey(), tmp); + continue; + } + if(long.class.isAssignableFrom(cl)) { + Entry tmp=new Entry<>(en.type(), en.description(), () -> (Long)en.supplier().get()); + retval.put(e.getKey(), tmp); + continue; + } + if(double.class.isAssignableFrom(cl)) { + Entry tmp=new Entry<>(en.type(), en.description(), () -> (Double)en.supplier().get()); + retval.put(e.getKey(), tmp); + continue; + } + if(float.class.isAssignableFrom(cl)) { + Entry tmp=new Entry<>(en.type(), en.description(), () -> (Float)en.supplier().get()); + retval.put(e.getKey(), tmp); + continue; + } + if(AverageMinMax.class.isAssignableFrom(cl)) { + Entry tmp=new Entry<>(en.type(), en.description(), () -> ((AverageMinMax)en.supplier().get()).min()); + retval.put(e.getKey() + ".min", tmp); + tmp=new Entry<>(en.type(), en.description(), () -> ((AverageMinMax)en.supplier().get()).average()); + retval.put(e.getKey(), tmp); + tmp=new Entry<>(en.type(), en.description(), () -> ((AverageMinMax)en.supplier().get()).max()); + retval.put(e.getKey() + ".max", tmp); + continue; + } + if(Average.class.isAssignableFrom(cl)) { + Entry tmp=new Entry<>(en.type(), en.description(), () -> ((Average)en.supplier()).average()); + retval.put(e.getKey(), tmp); + } + } + return retval; + } + + protected static boolean isNumberAndScalar(AccessibleObject obj, Class cl) { + if(cl.equals(float.class) || cl.equals(Float.class) || cl.equals(double.class) || cl.equals(Double.class) + || Average.class.isAssignableFrom(cl)) + return true; + boolean is_number=cl.equals(int.class) || cl.equals(Integer.class) || cl.equals(long.class) || cl.equals(Long.class) + || Number.class.isAssignableFrom(cl); + return is_number && isScalar(obj); + } + + protected static boolean isScalar(AccessibleObject obj) { + ManagedAttribute annotation=obj.getAnnotation(ManagedAttribute.class); + if(annotation != null && (annotation.type() == SCALAR || annotation.type() == BYTES)) + return true; + Property prop=obj.getAnnotation(Property.class); + return prop != null && (prop.type() == SCALAR || prop.type() == BYTES); + } + + public static void main(String[] args) throws Throwable { + boolean numeric=args.length > 0 && Boolean.parseBoolean(args[0]); + new Metrics().start(numeric); + + } +} + + diff --git a/src/org/jgroups/util/ThreadPool.java b/src/org/jgroups/util/ThreadPool.java index 5d0668a7c13..9752ea0f78e 100644 --- a/src/org/jgroups/util/ThreadPool.java +++ b/src/org/jgroups/util/ThreadPool.java @@ -165,13 +165,13 @@ public ThreadPool setThreadDumpsThreshold(int t) { public boolean useVirtualThreads() {return use_virtual_threads;} public ThreadPool useVirtualThreads(boolean b) {use_virtual_threads=b; return this;} - @ManagedAttribute(description="Number of thread dumps") + @ManagedAttribute(description="Number of thread dumps",type=SCALAR) public int getNumberOfThreadDumps() {return thread_dumps.get();} @ManagedOperation(description="Resets the thread_dumps counter") public void resetThreadDumps() {thread_dumps.set(0);} - @ManagedAttribute(description="Current number of threads in the thread pool") + @ManagedAttribute(description="Current number of threads in the thread pool",type=SCALAR) public int getThreadPoolSize() { if(thread_pool instanceof ThreadPoolExecutor) return ((ThreadPoolExecutor)thread_pool).getPoolSize(); @@ -179,14 +179,14 @@ public int getThreadPoolSize() { } - @ManagedAttribute(description="Current number of active threads in the thread pool") + @ManagedAttribute(description="Current number of active threads in the thread pool",type=SCALAR) public int getThreadPoolSizeActive() { if(thread_pool instanceof ThreadPoolExecutor) return ((ThreadPoolExecutor)thread_pool).getActiveCount(); return 0; } - @ManagedAttribute(description="Largest number of threads in the thread pool") + @ManagedAttribute(description="Largest number of threads in the thread pool",type=SCALAR) public int getLargestSize() { if(thread_pool instanceof ThreadPoolExecutor) return ((ThreadPoolExecutor)thread_pool).getLargestPoolSize(); diff --git a/tests/junit-functional/org/jgroups/tests/MsgStatsTest.java b/tests/junit-functional/org/jgroups/tests/MsgStatsTest.java index 0553a4d974d..6b9ff12774b 100644 --- a/tests/junit-functional/org/jgroups/tests/MsgStatsTest.java +++ b/tests/junit-functional/org/jgroups/tests/MsgStatsTest.java @@ -53,7 +53,7 @@ public void testStatsMcasts() throws Exception { assert stats_b.getNumMcastsReceived() >= NUM_MSGS; assert stats_b.getNumMcastBytesReceived() >= TOTAL_BYTES; assert stats_b.getNumBatchesReceived() > 0; - AverageMinMax avg=stats_b.avgBatchSize(); + AverageMinMax avg=stats_b.getAvgBatchSize(); assert avg.getAverage() > 0; assert stats_a.getNumSingleMsgsSent() + stats_a.getNumBatchesSent() > 0; } @@ -73,7 +73,7 @@ public void testStatsUcasts() throws Exception { assert stats_b.getNumUcastsReceived() >= NUM_MSGS; assert stats_b.getNumUcastBytesReceived() >= TOTAL_BYTES; assert stats_b.getNumBatchesReceived() > 0; - AverageMinMax avg=stats_b.avgBatchSize(); + AverageMinMax avg=stats_b.getAvgBatchSize(); assert avg.getAverage() > 0; assert stats_a.getNumSingleMsgsSent() + stats_a.getNumBatchesSent() > 0; }