Skip to content

Commit

Permalink
Added TP.bundlerStats() (https://issues.jboss.org/browse/JGRP-2173)
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jun 6, 2017
1 parent 5a345bc commit f751d2a
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 6 deletions.
10 changes: 10 additions & 0 deletions src/org/jgroups/protocols/Bundler.java
Expand Up @@ -3,6 +3,8 @@
import org.jgroups.Message; import org.jgroups.Message;
import org.jgroups.View; import org.jgroups.View;


import java.util.Map;

/** /**
* Pluggable way to collect messages and send them as batches * Pluggable way to collect messages and send them as batches
* @author Bela Ban * @author Bela Ban
Expand All @@ -23,4 +25,12 @@ default void viewChange(View view) {}


/** The number of unsent messages in the bundler */ /** The number of unsent messages in the bundler */
int size(); int size();

/**
* Returns stats about the bundler itself.
* @return Stats, may be null
*/
default Map<String,Object> getStats() {return null;}

default void resetStats() {}
} }
9 changes: 9 additions & 0 deletions src/org/jgroups/protocols/TP.java
Expand Up @@ -925,6 +925,15 @@ public void start() throws Exception {
setInAllThreadFactories(cluster_name != null? cluster_name.toString() : null, local_addr, thread_naming_pattern); setInAllThreadFactories(cluster_name != null? cluster_name.toString() : null, local_addr, thread_naming_pattern);
} }


@ManagedOperation(description="Returns stats about the current bundler")
public String bundlerStats() {
Map<String,Object> tmp=bundler.getStats();
return tmp != null? tmp.toString() : "n/a";
}

@ManagedOperation(description="Resets stats of the current bundler")
public void bundlerStatsReset() {bundler.resetStats();}

@ManagedOperation(description="Creates and sets a new bundler. Type has to be either a bundler_type or the fully " + @ManagedOperation(description="Creates and sets a new bundler. Type has to be either a bundler_type or the fully " +
"qualified classname of a Bundler impl. Stops the current bundler (if running)") "qualified classname of a Bundler impl. Stops the current bundler (if running)")
public void bundler(String type) { public void bundler(String type) {
Expand Down
44 changes: 38 additions & 6 deletions src/org/jgroups/protocols/TransferQueueBundler.java
Expand Up @@ -2,9 +2,12 @@




import org.jgroups.Message; import org.jgroups.Message;
import org.jgroups.util.AverageMinMax;


import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;


Expand All @@ -17,6 +20,9 @@ public class TransferQueueBundler extends BaseBundler implements Runnable {
protected List<Message> remove_queue; protected List<Message> remove_queue;
protected volatile Thread bundler_thread; protected volatile Thread bundler_thread;
protected volatile boolean running=true; protected volatile boolean running=true;
protected int num_sends_because_full_queue;
protected int num_sends_because_no_msgs;
protected final AverageMinMax fill_count=new AverageMinMax(); // avg number of bytes when a batch is sent
protected static final String THREAD_NAME="TQ-Bundler"; protected static final String THREAD_NAME="TQ-Bundler";


public TransferQueueBundler() { public TransferQueueBundler() {
Expand All @@ -37,10 +43,27 @@ public TransferQueueBundler(int capacity) {
public int removeQueueSize() {return remove_queue.size();} public int removeQueueSize() {return remove_queue.size();}
public TransferQueueBundler removeQueueSize(int size) {this.remove_queue=new ArrayList<>(size); return this;} public TransferQueueBundler removeQueueSize(int size) {this.remove_queue=new ArrayList<>(size); return this;}


public void init(TP transport) { @Override
super.init(transport); public Map<String,Object> getStats() {
Map<String,Object> retval=super.getStats();
if(retval == null)
retval=new HashMap<>(3);
retval.put("sends_because_full", num_sends_because_full_queue);
retval.put("sends_because_no_msgs", num_sends_because_no_msgs);
retval.put("avg_fill_count", fill_count);
return retval;
}

@Override
public void resetStats() {
num_sends_because_full_queue=num_sends_because_no_msgs=0;
fill_count.clear();
}

public void init(TP tp) {
super.init(tp);
if(queue == null) if(queue == null)
queue=new ArrayBlockingQueue<>(assertPositive(transport.getBundlerCapacity(), "bundler capacity cannot be " + transport.getBundlerCapacity())); queue=new ArrayBlockingQueue<>(assertPositive(tp.getBundlerCapacity(), "bundler capacity cannot be " + tp.getBundlerCapacity()));
} }


public synchronized void start() { public synchronized void start() {
Expand Down Expand Up @@ -81,8 +104,11 @@ public void run() {
if((msg=queue.take()) == null) if((msg=queue.take()) == null)
continue; continue;
long size=msg.size(); long size=msg.size();
if(count + size >= transport.getMaxBundleSize()) if(count + size >= transport.getMaxBundleSize()) {
num_sends_because_full_queue++;
fill_count.add(count);
_sendBundledMessages(); _sendBundledMessages();
}
_addMessage(msg, size); _addMessage(msg, size);
while(true) { while(true) {
remove_queue.clear(); remove_queue.clear();
Expand All @@ -92,13 +118,19 @@ public void run() {
for(int i=0; i < remove_queue.size(); i++) { for(int i=0; i < remove_queue.size(); i++) {
msg=remove_queue.get(i); msg=remove_queue.get(i);
size=msg.size(); size=msg.size();
if(count + size >= transport.getMaxBundleSize()) if(count + size >= transport.getMaxBundleSize()) {
num_sends_because_full_queue++;
fill_count.add(count);
_sendBundledMessages(); _sendBundledMessages();
}
_addMessage(msg, size); _addMessage(msg, size);
} }
} }
if(count > 0) if(count > 0) {
num_sends_because_no_msgs++;
fill_count.add(count);
_sendBundledMessages(); _sendBundledMessages();
}
} }
catch(Throwable t) { catch(Throwable t) {
} }
Expand Down

0 comments on commit f751d2a

Please sign in to comment.