diff --git a/src/org/jgroups/protocols/AlternatingBundler.java b/src/org/jgroups/protocols/AlternatingBundler.java index b6e4f3d07c..098405a444 100644 --- a/src/org/jgroups/protocols/AlternatingBundler.java +++ b/src/org/jgroups/protocols/AlternatingBundler.java @@ -74,8 +74,6 @@ protected void _sendBundledMessages() { else { avg_batch_size.add(target_list.size()); sendMessageList(target_dest, target_list.get(0).getSrc(), target_list); - if(transport.statsEnabled()) - transport.getMessageStats().incrNumBatchesSent(1); } } finally { diff --git a/src/org/jgroups/protocols/BaseBundler.java b/src/org/jgroups/protocols/BaseBundler.java index a0870254a0..fed61a089e 100644 --- a/src/org/jgroups/protocols/BaseBundler.java +++ b/src/org/jgroups/protocols/BaseBundler.java @@ -109,8 +109,6 @@ public int getQueueSize() { else { Address dst=entry.getKey(); sendMessageList(dst, list.get(0).getSrc(), list); - if(transport.statsEnabled()) - transport.getMessageStats().incrNumBatchesSent(1); } list.clear(); } @@ -125,8 +123,7 @@ protected void sendSingleMessage(final Message msg) { try { Util.writeMessage(msg, output, dest == null); transport.doSend(output.buffer(), 0, output.position(), dest); - if(transport.statsEnabled()) - transport.getMessageStats().incrNumSingleMsgsSent(1); + transport.getMessageStats().incrNumSingleMsgsSent(); } catch(Throwable e) { log.trace(Util.getMessage("SendFailure"), @@ -140,6 +137,7 @@ protected void sendMessageList(final Address dest, final Address src, final List try { Util.writeMessageList(dest, src, transport.cluster_name.chars(), list, output, dest == null, transport.getId()); transport.doSend(output.buffer(), 0, output.position(), dest); + transport.getMessageStats().incrNumBatchesSent(); } catch(Throwable e) { log.trace(Util.getMessage("FailureSendingMsgBundle"), transport.getAddress(), e); diff --git a/src/org/jgroups/protocols/MsgStats.java b/src/org/jgroups/protocols/MsgStats.java index 148d5070b5..353cfa9c72 100644 --- a/src/org/jgroups/protocols/MsgStats.java +++ b/src/org/jgroups/protocols/MsgStats.java @@ -1,128 +1,179 @@ package org.jgroups.protocols; +import org.jgroups.Address; +import org.jgroups.Message; import org.jgroups.annotations.ManagedAttribute; -import org.jgroups.conf.AttributeType; +import org.jgroups.annotations.Property; +import org.jgroups.util.AverageMinMax; +import org.jgroups.util.MessageBatch; import org.jgroups.util.Util; import java.lang.reflect.Field; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; import java.util.stream.Stream; +import static org.jgroups.conf.AttributeType.BYTES; +import static org.jgroups.conf.AttributeType.SCALAR; + /** - * Class which has all the stats about received/sent messages etc (in TP) + * Class which has all the stats about received/sent messages etc. (in TP) * @author Bela Ban * @since 4.0 */ public class MsgStats { - protected final AtomicLong num_msgs_sent=new AtomicLong(); - protected final AtomicLong num_msgs_received=new AtomicLong(); + @Property(description="Whether metrics should be logged") + protected boolean enabled=true; + + @ManagedAttribute(description="Number of unicast messages sent",type=SCALAR) + protected final LongAdder num_ucasts_sent=new LongAdder(); + @ManagedAttribute(description="Number of unicast messages received",type=SCALAR) + protected final LongAdder num_ucasts_received=new LongAdder(); + + @ManagedAttribute(description="Number of multicast messages sent",type=SCALAR) + protected final LongAdder num_mcasts_sent=new LongAdder(); + @ManagedAttribute(description="Total number of multicast messages received",type=SCALAR) + protected final LongAdder num_mcasts_received=new LongAdder(); + + @ManagedAttribute(description="Number of single messages sent (by the bundler)",type=SCALAR) + protected final LongAdder num_single_msgs_sent=new LongAdder(); + @ManagedAttribute(description="Number of single messages received (by the transport)",type=SCALAR) + protected final LongAdder num_single_msgs_received=new LongAdder(); + + @ManagedAttribute(description="Number of batches sent (by the bundler)",type=SCALAR) + protected final LongAdder num_batches_sent=new LongAdder(); + @ManagedAttribute(description="Number of batches received (by the transport)",type=SCALAR) + protected final LongAdder num_batches_received=new LongAdder(); - protected final AtomicLong num_ucasts_sent=new AtomicLong(); - protected final AtomicLong num_mcasts_sent=new AtomicLong(); + /** The average number of messages in a received {@link MessageBatch} */ + protected final AverageMinMax avg_batch_size=new AverageMinMax(); - protected final AtomicLong num_ucasts_received=new AtomicLong(); - protected final AtomicLong num_mcasts_received=new AtomicLong(); + @ManagedAttribute(description="Number of messages rejected by the thread pool (because it was full)",type=SCALAR) + protected final LongAdder num_rejected_msgs=new LongAdder(); - protected final AtomicLong num_bytes_sent=new AtomicLong(); - protected final AtomicLong num_bytes_received=new AtomicLong(); + @ManagedAttribute(description="Number of multicast bytes sent",type=BYTES) + protected final LongAdder num_mcast_bytes_sent=new LongAdder(); + @ManagedAttribute(description="Number of multicast bytes received",type=BYTES) + protected final LongAdder num_mcast_bytes_received=new LongAdder(); - protected final AtomicLong num_ucast_bytes_sent=new AtomicLong(); - protected final AtomicLong num_mcast_bytes_sent=new AtomicLong(); + @ManagedAttribute(description="Number of unicast bytes sent",type=BYTES) + protected final LongAdder num_ucast_bytes_sent=new LongAdder(); + @ManagedAttribute(description="Number of unicast bytes received",type=BYTES) + protected final LongAdder num_ucast_bytes_received=new LongAdder(); - protected final AtomicLong num_ucast_bytes_received=new AtomicLong(); - protected final AtomicLong num_mcast_bytes_received=new AtomicLong(); + public boolean enabled() {return enabled;} + public MsgStats enable(boolean b) {enabled=b; return this;} - protected final AtomicLong num_oob_msgs_received=new AtomicLong(); + @ManagedAttribute(description="Number of messages sent (mcasts and ucasts sent)",type=SCALAR) + public long getNumMsgsSent() {return num_mcasts_sent.sum() + num_ucasts_sent.sum();} - protected final AtomicLong num_single_msgs_sent=new AtomicLong(); - protected final AtomicLong num_batches_sent=new AtomicLong(); - protected final AtomicLong num_batches_received=new AtomicLong(); + @ManagedAttribute(description="Number of messages received (mcasts and ucasts received)",type=SCALAR) + public long getNumMsgsReceived() {return num_mcasts_received.sum() + num_ucasts_received.sum();} - protected final AtomicInteger num_rejected_msgs=new AtomicInteger(0); + @ManagedAttribute(description="Returns the average batch size of received batches") + public String getAvgBatchSize() {return avg_batch_size.toString();} + public AverageMinMax avgBatchSize() {return avg_batch_size;} - @ManagedAttribute(description="Number of messages sent",type=AttributeType.SCALAR) - public long getNumMsgsSent() {return num_msgs_sent.get();} - public MsgStats incrNumMsgsSent(int d) {num_msgs_sent.addAndGet(d); return this;} + @ManagedAttribute(description="Total number of bytes sent (unicast + multicast bytes)",type=BYTES) + public long getNumBytesSent() {return num_mcast_bytes_sent.sum() + num_ucast_bytes_sent.sum();} - @ManagedAttribute(description="Number of unicast messages sent",type=AttributeType.SCALAR) - public long getNumUcastMsgsSent() {return num_ucasts_sent.get();} - public MsgStats incrNumUcastMsgsSent(int d) {num_ucasts_sent.addAndGet(d); return this;} + @ManagedAttribute(description="Total number of bytes received (unicast + multicast)",type=BYTES) + public long getNumBytesReceived() {return num_mcast_bytes_received.sum() + num_ucast_bytes_received.sum();} - @ManagedAttribute(description="Number of multicast messages sent",type=AttributeType.SCALAR) - public long getNumMcastMsgsSent() {return num_mcasts_sent.get();} - public MsgStats incrNumMcastMsgsSent(int d) {num_mcasts_sent.addAndGet(d); return this;} + public long getNumUcastsSent() {return num_ucasts_sent.sum();} - @ManagedAttribute(description="Number of unicast messages received",type=AttributeType.SCALAR) - public long getNumUcastMsgsReceived() {return num_ucasts_received.get();} - public MsgStats incrNumUcastMsgsReceived(int d) {num_ucasts_received.addAndGet(d); return this;} + public long getNumMcastsSent() {return num_mcasts_sent.sum();} - @ManagedAttribute(description="Number of multicast messages received",type=AttributeType.SCALAR) - public long getNumMcastMsgsReceived() {return num_mcasts_received.get();} - public MsgStats incrNumMcastMsgsReceived(int d) {num_mcasts_received.addAndGet(d); return this;} + public long getNumUcastsReceived() {return num_ucasts_received.sum();} - @ManagedAttribute(description="Number of regular messages received",type=AttributeType.SCALAR) - public long getNumMsgsReceived() {return num_msgs_received.get();} - public MsgStats incrNumMsgsReceived(int d) {num_msgs_received.addAndGet(d); return this;} + public long getNumMcastsReceived() {return num_mcasts_received.sum();} - @ManagedAttribute(description="Number of OOB messages received. This value is included in num_msgs_received." - ,type=AttributeType.SCALAR) - public long getNumOOBMsgsReceived() {return num_oob_msgs_received.get();} - public MsgStats incrNumOOBMsgsReceived(int d) {num_oob_msgs_received.addAndGet(d); return this;} + public long getNumSingleMsgsSent() {return num_single_msgs_sent.sum();} + public MsgStats incrNumSingleMsgsSent() {num_single_msgs_sent.increment(); return this;} - @ManagedAttribute(description="Number of single messages sent",type=AttributeType.SCALAR) - public long getNumSingleMsgsSent() {return num_single_msgs_sent.get();} - public MsgStats incrNumSingleMsgsSent(int d) {num_single_msgs_sent.addAndGet(d); return this;} + public long getNumBatchesSent() {return num_batches_sent.sum();} + public MsgStats incrNumBatchesSent() {num_batches_sent.increment(); return this;} + public MsgStats incrNumBatchesSent(int n) {num_batches_sent.add(n); return this;} - @ManagedAttribute(description="Number of message batches sent",type=AttributeType.SCALAR) - public long getNumBatchesSent() {return num_batches_sent.get();} - public MsgStats incrNumBatchesSent(int d) {num_batches_sent.addAndGet(d); return this;} + public long getNumBatchesReceived() {return num_batches_received.sum();} - @ManagedAttribute(description="Number of message batches received",type=AttributeType.SCALAR) - public long getNumBatchesReceived() {return num_batches_received.get();} - public MsgStats incrNumBatchesReceived(int d) {num_batches_received.addAndGet(d); return this;} + public long getNumUcastBytesSent() {return num_ucast_bytes_sent.sum();} - @ManagedAttribute(description="Number of bytes sent",type=AttributeType.BYTES) - public long getNumBytesSent() {return num_bytes_sent.get();} - public MsgStats incrNumBytesSent(int d) {num_bytes_sent.addAndGet(d); return this;} + public long getNumMcastBytesSent() {return num_mcast_bytes_sent.sum();} - @ManagedAttribute(description="Number of unicast bytes sent",type=AttributeType.BYTES) - public long getNumUcastBytesSent() {return num_ucast_bytes_sent.get();} - public MsgStats incrNumUcastBytesSent(int d) {num_ucast_bytes_sent.addAndGet(d); return this;} + public long getNumUcastBytesReceived() {return num_ucast_bytes_received.sum();} - @ManagedAttribute(description="Number of multicast bytes sent",type=AttributeType.BYTES) - public long getNumMcastBytesSent() {return num_mcast_bytes_sent.get();} - public MsgStats incrNumMcastBytesSent(int d) {num_mcast_bytes_sent.addAndGet(d); return this;} + public long getNumMcastBytesReceived() {return num_mcast_bytes_received.sum();} - @ManagedAttribute(description="Number of bytes received",type=AttributeType.BYTES) - public long getNumBytesReceived() {return num_bytes_received.get();} - public MsgStats incrNumBytesReceived(int d) {num_bytes_received.addAndGet(d); return this;} + public long getNumRejectedMsgs() {return num_rejected_msgs.sum();} + public MsgStats incrNumRejectedMsgs() {num_rejected_msgs.increment(); return this;} - @ManagedAttribute(description="Number of unicast bytes received",type=AttributeType.BYTES) - public long getNumUcastBytesReceived() {return num_ucast_bytes_received.get();} - public MsgStats incrNumUcastBytesReceived(int d) {num_ucast_bytes_received.addAndGet(d); return this;} - @ManagedAttribute(description="Number of multicast bytes received",type=AttributeType.BYTES) - public long getNumMcastBytesReceived() {return num_mcast_bytes_received.get();} - public MsgStats incrNumMcastBytesReceived(int d) {num_mcast_bytes_received.addAndGet(d); return this;} + public MsgStats sent(Address dest, int length) { + if(!enabled) + return this; + if(dest == null) // multicast + return add(num_mcasts_sent, 1, num_mcast_bytes_sent, length); + return add(num_ucasts_sent, 1, num_ucast_bytes_sent, length); + } - @ManagedAttribute(description="Number of dropped messages that were rejected by the thread pool" - ,type=AttributeType.SCALAR) - public int getNumRejectedMsgs() {return num_rejected_msgs.get();} - public MsgStats incrNumRejectedMsgs(int d) {num_rejected_msgs.addAndGet(d); return this;} + public MsgStats sent(Message msg) { + return (msg == null || !enabled)? this : sent(msg.dest(), msg.getLength()); + } + + public MsgStats received(Address dest, int length) { + if(!enabled) + return this; + if(dest == null) + return add(num_mcasts_received, 1, num_mcast_bytes_received, length); + return add(num_ucasts_received, 1, num_ucast_bytes_received, length); + } + + public MsgStats received(Message msg) { + if(msg == null || !enabled) + return this; + num_single_msgs_received.increment(); + return received(msg.dest(), msg.getLength()); + } + + public MsgStats received(MessageBatch batch) { + if(batch == null || !enabled) + return this; + num_batches_received.increment(); + int num_msgs=batch.size(); + int length=batch.length(); + avg_batch_size.add(num_msgs); + if(batch.dest() == null) + return add(num_mcasts_received, num_msgs, num_mcast_bytes_received, length); + return add(num_ucasts_received, num_msgs, num_ucast_bytes_received, length); + } public MsgStats reset() { - Stream.of(num_msgs_sent, num_msgs_received, num_single_msgs_sent, num_oob_msgs_received, - num_batches_sent, num_batches_received, num_bytes_sent,num_bytes_received) - .forEach(al -> al.set(0)); - Stream.of(num_rejected_msgs).forEach(ai -> ai.set(0)); + Stream.of(num_ucasts_sent, num_ucasts_received, + num_mcasts_sent, num_mcasts_received, + num_single_msgs_sent, num_single_msgs_received, + num_batches_sent, num_batches_received, + num_rejected_msgs, + num_mcast_bytes_sent, num_mcast_bytes_received, + num_ucast_bytes_sent, num_ucast_bytes_received) + .forEach(LongAdder::reset); + avg_batch_size.clear(); return this; } + @Override public String toString() { + return toString(false); + } + + public String toString(boolean details) { StringBuilder sb=new StringBuilder(); + if(!details) { + sb.append(String.format("%,d sent (%s) %,d received (%s)", getNumMsgsSent(), Util.printBytes(getNumBytesSent()), + getNumMsgsReceived(), Util.printBytes(getNumBytesReceived()))); + return sb.toString(); + } Field[] fields=MsgStats.class.getDeclaredFields(); for(Field field: fields) { try { @@ -135,4 +186,11 @@ public String toString() { } return sb.toString(); } + + protected MsgStats add(LongAdder msgs, int num_msgs, LongAdder bytes, int length) { + msgs.add(num_msgs); + if(length > 0) + bytes.add(length); + return this; + } } diff --git a/src/org/jgroups/protocols/NoBundler.java b/src/org/jgroups/protocols/NoBundler.java index 5b92ae062b..fa429cd95c 100644 --- a/src/org/jgroups/protocols/NoBundler.java +++ b/src/org/jgroups/protocols/NoBundler.java @@ -44,8 +44,7 @@ protected void sendSingleMessage(final Message msg, final ByteArrayDataOutputStr output.position(0); Util.writeMessage(msg, output, dest == null); transport.doSend(output.buffer(), 0, output.position(), dest); - if(transport.statsEnabled()) - transport.getMessageStats().incrNumSingleMsgsSent(1); + transport.getMessageStats().incrNumSingleMsgsSent(); } } \ No newline at end of file diff --git a/src/org/jgroups/protocols/PerDestinationBundler.java b/src/org/jgroups/protocols/PerDestinationBundler.java index 3bc7bbecec..6439b67353 100644 --- a/src/org/jgroups/protocols/PerDestinationBundler.java +++ b/src/org/jgroups/protocols/PerDestinationBundler.java @@ -232,8 +232,7 @@ protected void sendSingleMessage(final Address dest, final Message msg) { output.position(0); Util.writeMessage(msg, output, dest == null); transport.doSend(output.buffer(), 0, output.position(), dest); - if(transport.statsEnabled()) - transport.getMessageStats().incrNumSingleMsgsSent(1); + transport.getMessageStats().incrNumSingleMsgsSent(); num_single_msgs_sent.increment(); } catch(Throwable e) { @@ -252,8 +251,7 @@ protected void sendMessageList(final Address dest, final Address src, final Fast Util.writeMessageList(dest, src, transport.cluster_name.chars(), list, output, dest == null, transport.getId()); transport.doSend(output.buffer(), 0, output.position(), dest); - if(transport.statsEnabled()) - transport.getMessageStats().incrNumBatchesSent(1); + transport.getMessageStats().incrNumBatchesSent(); num_batches_sent.increment(); } catch(Throwable e) { diff --git a/src/org/jgroups/protocols/RingBufferBundler.java b/src/org/jgroups/protocols/RingBufferBundler.java index e9121f831f..972155c2df 100644 --- a/src/org/jgroups/protocols/RingBufferBundler.java +++ b/src/org/jgroups/protocols/RingBufferBundler.java @@ -121,8 +121,7 @@ public void sendBundledMessages(final Message[] buf, final int read_index, final output.position(current_pos); } transport.doSend(output.buffer(), 0, output.position(), dest); - if(transport.statsEnabled()) - transport.getMessageStats().incrNumBatchesSent(num_msgs); + transport.getMessageStats().incrNumBatchesSent(num_msgs); } catch(Exception ex) { log.trace("failed to send message(s) to %s: %s", dest == null? "group" : dest, ex.getMessage()); diff --git a/src/org/jgroups/protocols/RingBufferBundlerLockless.java b/src/org/jgroups/protocols/RingBufferBundlerLockless.java index d6c3a278c1..df06a695fc 100644 --- a/src/org/jgroups/protocols/RingBufferBundlerLockless.java +++ b/src/org/jgroups/protocols/RingBufferBundlerLockless.java @@ -182,8 +182,7 @@ protected int sendBundledMessages(final Message[] buf, final int read_index, int output.position(current_pos); } transport.doSend(output.buffer(), 0, output.position(), dest); - if(transport.statsEnabled()) - transport.getMessageStats().incrNumBatchesSent(num_msgs); + transport.getMessageStats().incrNumBatchesSent(num_msgs); } catch(Exception ex) { log.trace("failed to send message(s)", ex); diff --git a/src/org/jgroups/protocols/RingBufferBundlerLockless2.java b/src/org/jgroups/protocols/RingBufferBundlerLockless2.java index 185b6f2de8..78f55ab107 100644 --- a/src/org/jgroups/protocols/RingBufferBundlerLockless2.java +++ b/src/org/jgroups/protocols/RingBufferBundlerLockless2.java @@ -183,8 +183,7 @@ protected int sendBundledMessages(final Message[] buf, final int read_index, fin output.position(current_pos); } transport.doSend(output.buffer(), 0, output.position(), dest); - if(transport.statsEnabled()) - transport.getMessageStats().incrNumBatchesSent(num_msgs); + transport.getMessageStats().incrNumBatchesSent(num_msgs); } catch(Exception ex) { log.trace("failed to send message(s)", ex); diff --git a/src/org/jgroups/protocols/STATS.java b/src/org/jgroups/protocols/STATS.java index f281f81172..ac3b29f25b 100644 --- a/src/org/jgroups/protocols/STATS.java +++ b/src/org/jgroups/protocols/STATS.java @@ -17,13 +17,11 @@ */ @MBean(description="Protocol which exposes various statistics such as sent messages, number of bytes received etc") public class STATS extends Protocol { - protected static final short UP=1; - protected static final short DOWN=2; protected static final Address NULL_DEST=Global.NULL_ADDRESS; /** Global stats */ @Component - protected final MsgStats mstats=new MsgStats(); + protected final MsgStats mstats=new MsgStats(); /** Maintains stats per target destination */ protected final ConcurrentMap sent=new ConcurrentHashMap<>(); @@ -39,7 +37,16 @@ public void resetStats() { received.clear(); } + public Object down(Event evt) { + if(evt.getType() == Event.VIEW_CHANGE) + handleViewChange(evt.getArg()); + return down_prot.down(evt); + } + public Object down(Message msg) { + sent(msg); + return down_prot.down(msg); + } public Object up(Event evt) { if(evt.getType() == Event.VIEW_CHANGE) @@ -48,26 +55,15 @@ public Object up(Event evt) { } public Object up(Message msg) { - updateStats(msg.dest(), msg.src(), 1, msg.getLength(), UP); + received(msg); return up_prot.up(msg); } public void up(MessageBatch batch) { - updateStats(batch.dest(), batch.sender(), batch.size(), batch.length(), UP); + received(batch); up_prot.up(batch); } - public Object down(Event evt) { - if(evt.getType() == Event.VIEW_CHANGE) - handleViewChange(evt.getArg()); - return down_prot.down(evt); - } - - public Object down(Message msg) { - updateStats(msg.dest(), msg.src(), 1, msg.getLength(), DOWN); - return down_prot.down(msg); - } - @ManagedOperation public String printStats() { Object key, val; @@ -91,7 +87,7 @@ public String printStats() { return sb.toString(); } - private void handleViewChange(View view) { + protected void handleViewChange(View view) { List
members=view.getMembers(); Set
tmp=new LinkedHashSet<>(members); tmp.add(null); // for null destination (= mcast) @@ -99,62 +95,32 @@ private void handleViewChange(View view) { received.keySet().retainAll(tmp); } - protected void updateStats(Address dest, Address src, int num_msgs, int num_bytes, short direction) { - boolean mcast=dest == null; - - if(direction == UP) { // received - mstats.incrNumMsgsReceived(num_msgs); - mstats.incrNumBytesReceived(num_bytes); - if(mcast) { - mstats.incrNumMcastMsgsReceived(num_msgs); - mstats.incrNumMcastBytesReceived(num_bytes); - } - else { - mstats.incrNumUcastMsgsReceived(num_msgs); - mstats.incrNumUcastBytesReceived(num_bytes); - } - } - else { // sent - mstats.incrNumMsgsSent(num_msgs); - mstats.incrNumBytesSent(num_bytes); - if(mcast) { - mstats.incrNumMcastMsgsSent(num_msgs); - mstats.incrNumMcastBytesSent(num_bytes); - } - else { - mstats.incrNumUcastMsgsSent(num_msgs); - mstats.incrNumUcastBytesSent(num_bytes); - } - } - Address key=direction == UP? src : dest; + protected void sent(Message msg) { + mstats.sent(msg); + + Address key=msg.dest(); if(key == null) key=NULL_DEST; - Map map=direction == UP? received : sent; - MsgStats entry=map.computeIfAbsent(key, k -> new MsgStats()); - if(direction == UP) { - entry.incrNumMsgsSent(num_msgs); - entry.incrNumBytesSent(num_bytes); - if(mcast) { - entry.incrNumMcastMsgsSent(num_msgs); - entry.incrNumMcastBytesSent(num_bytes); - } - else { - entry.incrNumUcastMsgsSent(num_msgs); - entry.incrNumUcastBytesSent(num_bytes); - } - } - else { - entry.incrNumMsgsReceived(num_msgs); - entry.incrNumBytesReceived(num_bytes); - if(mcast) { - entry.incrNumMcastMsgsReceived(num_msgs); - entry.incrNumMcastBytesReceived(num_bytes); - } - else { - entry.incrNumUcastMsgsReceived(num_msgs); - entry.incrNumUcastBytesReceived(num_bytes); - } - } + MsgStats entry=((Map)sent).computeIfAbsent(key, k -> new MsgStats()); + entry.sent(msg); + } + + protected void received(Message msg) { + mstats.received(msg); + + Address key=msg.src(); + if(key == null) key=NULL_DEST; + MsgStats entry=((Map)received).computeIfAbsent(key, k -> new MsgStats()); + entry.received(msg); + } + + protected void received(MessageBatch batch) { + mstats.received(batch); + + Address key=batch.sender(); + if(key == null) key=NULL_DEST; + MsgStats entry=((Map)received).computeIfAbsent(key, k -> new MsgStats()); + entry.received(batch); } diff --git a/src/org/jgroups/protocols/TP.java b/src/org/jgroups/protocols/TP.java index 0fd79d308e..6f2c9159a1 100644 --- a/src/org/jgroups/protocols/TP.java +++ b/src/org/jgroups/protocols/TP.java @@ -254,14 +254,6 @@ public boolean isLogicalAddressCacheReaperRunning() { return logical_addr_cache_reaper != null && !logical_addr_cache_reaper.isDone(); } - @ManagedAttribute(description="Returns the average batch size of received batches") - public String getAvgBatchSize() { - return avg_batch_size.toString(); - } - - public AverageMinMax avgBatchSize() {return avg_batch_size;} - - @Property(name="level", description="Sets the level") public T setLevel(String level) { T retval=super.setLevel(level); @@ -296,7 +288,7 @@ public void setMessageProcessingPolicy(String policy) { /* --------------------------------------------- JMX ---------------------------------------------- */ @Component(name="msg_stats") - protected final MsgStats msg_stats=new MsgStats(); + protected final MsgStats msg_stats=new MsgStats().enable(stats); /** The name of the group to which this member is connected. With a shared transport, the channel name is @@ -421,9 +413,6 @@ public T enableBlockingTimerTasks(boolean flag) { protected Future logical_addr_cache_reaper; - /** The average number of messages in a received {@link MessageBatch} */ - protected final AverageMinMax avg_batch_size=new AverageMinMax(); - protected static final LazyRemovalCache.Printable> print_function= (logical_addr, entry) -> { StringBuilder sb=new StringBuilder(); @@ -454,7 +443,13 @@ protected TP() { public MsgStats getMessageStats() {return msg_stats;} - /** Whether or not hardware multicasting is supported */ + @Override + public void enableStats(boolean flag) { + super.enableStats(flag); + msg_stats.enable(flag); + } + + /** Whether hardware multicasting is supported */ public abstract boolean supportsMulticasting(); public boolean isMulticastCapable() {return supportsMulticasting();} @@ -481,7 +476,6 @@ public T setAddress(Address addr) { public void resetStats() { msg_stats.reset(); - avg_batch_size.clear(); msg_processing_policy.reset(); if(local_transport != null) local_transport.resetStats(); @@ -1024,7 +1018,7 @@ public Object down(Event evt) { /** A message needs to be sent to a single member or all members */ public Object down(Message msg) { if(header != null) - msg.putHeader(this.id, header); // added patch by Roland Kurmann (March 20 2003) + msg.putHeader(this.id, header); // added patch by Roland Kurmann (March 20, 2003) setSourceAddress(msg); // very important !! listToBuffer() will fail with a null src address !! @@ -1117,7 +1111,7 @@ protected void loopback(Message msg, final boolean multicast) { final Message copy=loopback_copy? msg.copy(true, true) : msg; if(is_trace) log.trace("%s: looping back message %s, headers are %s", local_addr, copy, copy.printHeaders()); - + msg_stats.received(msg); if(!loopback_separate_thread) { passMessageUp(copy, null, false, multicast, false); return; @@ -1129,8 +1123,10 @@ protected void loopback(Message msg, final boolean multicast) { protected void _send(Message msg, Address dest) { try { Bundler tmp_bundler=bundler; - if(tmp_bundler != null) + if(tmp_bundler != null) { tmp_bundler.send(msg); + msg_stats.sent(msg); + } } catch(InterruptedIOException ignored) { } @@ -1273,6 +1269,9 @@ protected void handleMessageBatch(DataInput in, boolean multicast, MessageFactor final MessageBatch[] batches=Util.readMessageBatch(in, multicast, factory); final MessageBatch regular=batches[0], oob=batches[1]; + // we need to update the stats *before* processing the batches: protocols can remove msgs from the batch + if(oob != null) msg_stats.received(oob); + if(regular != null) msg_stats.received(regular); processBatch(oob, true); processBatch(regular,false); } @@ -1293,6 +1292,7 @@ protected void handleSingleMessage(DataInput in, boolean multicast) { boolean oob=msg.isFlagSet(Message.Flag.OOB); msg_processing_policy.process(msg, oob); + msg_stats.received(msg); } catch(Throwable t) { log.error(String.format(Util.getMessage("IncomingMsgFailure"), local_addr), t); @@ -1331,10 +1331,6 @@ protected boolean versionMatch(short version, Address sender) { public void doSend(byte[] buf, int offset, int length, Address dest) throws Exception { - if(stats) { - msg_stats.incrNumMsgsSent(1); - msg_stats.incrNumBytesSent(length); - } if(dest != null) sendTo(dest, buf, offset, length); else diff --git a/src/org/jgroups/util/SubmitToThreadPool.java b/src/org/jgroups/util/SubmitToThreadPool.java index 0c7763a8a3..f2bd916152 100644 --- a/src/org/jgroups/util/SubmitToThreadPool.java +++ b/src/org/jgroups/util/SubmitToThreadPool.java @@ -3,7 +3,6 @@ import org.jgroups.Address; import org.jgroups.Message; import org.jgroups.logging.Log; -import org.jgroups.protocols.MsgStats; import org.jgroups.protocols.TP; import org.jgroups.protocols.TpHeader; import org.jgroups.stack.MessageProcessingPolicy; @@ -60,8 +59,6 @@ protected boolean removeAndDispatchNonBundledMessages(MessageBatch oob_batch) { Message msg=it.next(); if(msg.isFlagSet(Message.Flag.DONT_BUNDLE) && msg.isFlagSet(Message.Flag.OOB)) { it.remove(); - if(tp.statsEnabled()) - tp.getMessageStats().incrNumOOBMsgsReceived(1); tp.getThreadPool().execute(new SingleMessageHandlerWithClusterName(msg, cname)); removed=true; } @@ -94,15 +91,6 @@ public void run() { Address dest=msg.getDest(); boolean multicast=dest == null; try { - if(tp.statsEnabled()) { - MsgStats msg_stats=tp.getMessageStats(); - boolean oob=msg.isFlagSet(Message.Flag.OOB); - if(oob) - msg_stats.incrNumOOBMsgsReceived(1); - else - msg_stats.incrNumMsgsReceived(1); - msg_stats.incrNumBytesReceived(msg.getLength()); - } byte[] cname=getClusterName(); tp.passMessageUp(msg, cname, true, multicast, true); } @@ -142,18 +130,6 @@ public BatchHandler(final MessageBatch batch) { public void run() { if(batch == null || (!batch.multicast() && tp.unicastDestMismatch(batch.dest()))) return; - if(tp.statsEnabled()) { - int batch_size=batch.size(); - MsgStats msg_stats=tp.getMessageStats(); - boolean oob=batch.getMode() == MessageBatch.Mode.OOB; - if(oob) - msg_stats.incrNumOOBMsgsReceived(batch_size); - else - msg_stats.incrNumMsgsReceived(batch_size); - msg_stats.incrNumBatchesReceived(1); - msg_stats.incrNumBytesReceived(batch.length()); - tp.avgBatchSize().add(batch_size); - } passBatchUp(); } diff --git a/src/org/jgroups/util/ThreadPool.java b/src/org/jgroups/util/ThreadPool.java index c4f6a9bfb6..a7624e90b8 100644 --- a/src/org/jgroups/util/ThreadPool.java +++ b/src/org/jgroups/util/ThreadPool.java @@ -209,7 +209,7 @@ public boolean execute(Runnable task) { return true; } catch(RejectedExecutionException ex) { - tp.getMessageStats().incrNumRejectedMsgs(1); + tp.getMessageStats().incrNumRejectedMsgs(); // https://issues.redhat.com/browse/JGRP-2403 if(thread_dumps.incrementAndGet() == thread_dumps_threshold) { String thread_dump=Util.dumpThreads(); @@ -236,7 +236,7 @@ public boolean execute(Runnable task) { } catch(Throwable t) { tp.getLog().error("failure submitting task to thread pool", t); - tp.getMessageStats().incrNumRejectedMsgs(1); + tp.getMessageStats().incrNumRejectedMsgs(); return false; } } diff --git a/tests/junit-functional/org/jgroups/tests/MsgStatsTest.java b/tests/junit-functional/org/jgroups/tests/MsgStatsTest.java new file mode 100644 index 0000000000..0553a4d974 --- /dev/null +++ b/tests/junit-functional/org/jgroups/tests/MsgStatsTest.java @@ -0,0 +1,121 @@ +package org.jgroups.tests; + +import org.jgroups.*; +import org.jgroups.protocols.MsgStats; +import org.jgroups.util.AverageMinMax; +import org.jgroups.util.MessageBatch; +import org.jgroups.util.Util; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.concurrent.atomic.LongAdder; +import java.util.stream.Stream; + +/** + * Tests {@link org.jgroups.protocols.MsgStats} + * @author Bela Ban + * @since 5.3.5 + */ +@Test(groups= Global.FUNCTIONAL,singleThreaded=true) +public class MsgStatsTest { + protected JChannel a, b; + protected MyReceiver ra, rb; + protected MsgStats stats_a, stats_b; + protected static final int NUM_MSGS=1000, SIZE=1000, TOTAL_BYTES=NUM_MSGS*SIZE; + protected static final String CLUSTER=MsgStatsTest.class.getSimpleName(); + + @BeforeMethod + protected void setup() throws Exception { + a=new JChannel(Util.getTestStack()).name("A").connect(CLUSTER).setReceiver(ra=new MyReceiver()); + b=new JChannel(Util.getTestStack()).name("B").connect(CLUSTER).setReceiver(rb=new MyReceiver()); + Util.waitUntilAllChannelsHaveSameView(5000, 100, a,b); + stats_a=a.stack().getTransport().getMessageStats(); + stats_b=b.stack().getTransport().getMessageStats(); + } + + @AfterMethod + protected void destroy() { + Util.close(b,a); + } + + public void testStatsMcasts() throws Exception { + byte[] payload=new byte[SIZE]; + for(int i=0; i < NUM_MSGS; i++) { + Message msg=new BytesMessage(null, payload); + a.send(msg); + } + Util.waitUntil(2000, 100, () -> Stream.of(ra,rb).map(MyReceiver::mcasts).allMatch(n -> n >= NUM_MSGS)); + assertTrue(NUM_MSGS, TOTAL_BYTES, NUM_MSGS, TOTAL_BYTES, stats_a); + assertTrue(0, 0, NUM_MSGS, TOTAL_BYTES, stats_b); + assert stats_a.getNumMcastsSent() >= NUM_MSGS; + assert stats_a.getNumMcastBytesSent() >= TOTAL_BYTES; + assert stats_b.getNumMcastsReceived() >= NUM_MSGS; + assert stats_b.getNumMcastBytesReceived() >= TOTAL_BYTES; + assert stats_b.getNumBatchesReceived() > 0; + AverageMinMax avg=stats_b.avgBatchSize(); + assert avg.getAverage() > 0; + assert stats_a.getNumSingleMsgsSent() + stats_a.getNumBatchesSent() > 0; + } + + public void testStatsUcasts() throws Exception { + byte[] payload=new byte[SIZE]; + final Address dest=b.getAddress(); + for(int i=0; i < NUM_MSGS; i++) { + Message msg=new BytesMessage(dest, payload); + a.send(msg); + } + Util.waitUntil(2000, 100, () -> Stream.of(rb).map(MyReceiver::ucasts).allMatch(n -> n >= NUM_MSGS)); + assertTrue(NUM_MSGS, TOTAL_BYTES, 0, 0, stats_a); + assertTrue(0, 0, NUM_MSGS, TOTAL_BYTES, stats_b); + assert stats_a.getNumUcastsSent() >= NUM_MSGS; + assert stats_a.getNumUcastBytesSent() >= TOTAL_BYTES; + assert stats_b.getNumUcastsReceived() >= NUM_MSGS; + assert stats_b.getNumUcastBytesReceived() >= TOTAL_BYTES; + assert stats_b.getNumBatchesReceived() > 0; + AverageMinMax avg=stats_b.avgBatchSize(); + assert avg.getAverage() > 0; + assert stats_a.getNumSingleMsgsSent() + stats_a.getNumBatchesSent() > 0; + } + + + protected static void assertTrue(int sent, int sent_bytes, int received, int received_bytes, MsgStats... s) { + for(MsgStats ms: s) { + assert sent <= 0 || ms.getNumMsgsSent() >= sent + : String.format("sent msgs expected: %d, actual: %d", sent, ms.getNumBytesSent()); + assert sent_bytes <= 0 || ms.getNumBytesSent() >= sent_bytes + : String.format("sent bytes expected: %d, actual: %d", sent_bytes, ms.getNumBytesSent()); + assert received <= 0 || ms.getNumMsgsReceived() >= received + : String.format("num received expected: %d, actual: %d", received, ms.getNumMsgsReceived()); + assert received_bytes <= 0 || ms.getNumBytesReceived() >= received_bytes + : String.format("msgs bytes expected: %d, actual: %d", received_bytes, ms.getNumBytesReceived()); + } + } + + protected static class MyReceiver implements Receiver { + protected final LongAdder num_ucasts=new LongAdder(), num_mcasts=new LongAdder(); + protected final LongAdder mcast_bytes=new LongAdder(), ucast_bytes=new LongAdder(); + + public int ucasts() {return (int)num_ucasts.sum();} + public int mcasts() {return (int)num_mcasts.sum();} + + @Override + public void receive(Message msg) { + (msg.dest() == null? num_mcasts : num_ucasts).increment(); + (msg.dest() == null? mcast_bytes : ucast_bytes).add(msg.getLength()); + } + + @Override + public void receive(MessageBatch batch) { + (batch.dest() == null? num_mcasts : num_ucasts).add(batch.size()); + (batch.dest() == null? mcast_bytes : ucast_bytes).add(batch.length()); + } + + public void reset() {Stream.of(num_mcasts, num_ucasts, mcast_bytes, ucast_bytes).forEach(LongAdder::reset);} + + @Override + public String toString() { + return String.format("%,d received (%s)", ucasts()+mcasts(), Util.printBytes(mcast_bytes.sum() + ucast_bytes.sum())); + } + } +} diff --git a/tests/junit/org/jgroups/tests/LoopbackTest.java b/tests/junit/org/jgroups/tests/LoopbackTest.java index 99db04ffc6..f48aee3097 100644 --- a/tests/junit/org/jgroups/tests/LoopbackTest.java +++ b/tests/junit/org/jgroups/tests/LoopbackTest.java @@ -76,7 +76,7 @@ protected void sendMessagesWithLoopback(boolean unicast) throws Exception { if(unicast) assert num_msgs_sent_after < NUM/10; else - assert num_msgs_sent_after <= NUM; // max of NUM single messages; probably some batches were sent + assert num_msgs_sent_after >= NUM; // max of NUM single messages; probably some batches were sent try { // wait for all messages to be received promise.getResultWithTimeout(TIMEOUT) ;