Skip to content

Commit

Permalink
- Implemented AlternatingBundler and RemoveQueueBundler (https://issu…
Browse files Browse the repository at this point in the history
…es.jboss.org/browse/JGRP-2171)

- Added drainTo() methods to RingBuffer
- Attributes in JmxAdditionalObjects can now be set, too
  • Loading branch information
belaban committed Jun 13, 2017
1 parent 95488fb commit 05e5bb8
Show file tree
Hide file tree
Showing 8 changed files with 655 additions and 63 deletions.
8 changes: 5 additions & 3 deletions src/org/jgroups/JChannelProbeHandler.java
Expand Up @@ -221,12 +221,12 @@ protected void handleJmx(Map<String, String> map, String input) {
}
else {
// try to find a setter for X, e.g. x(type-of-x) or setX(type-of-x)
ResourceDMBean.Accessor setter=ResourceDMBean.findSetter(target, attrname); // Util.getSetter(prot.getClass(), attrname);
ResourceDMBean.Accessor setter=ResourceDMBean.findSetter(target, attrname);
if(setter == null && target instanceof AdditionalJmxObjects) {
Object[] objs=((AdditionalJmxObjects)target).getJmxObjects();
if(objs != null && objs.length > 0) {
for(Object o: objs) {
setter=o != null? ResourceDMBean.findSetter(target, attrname) : null;
setter=o != null? ResourceDMBean.findSetter(o, attrname) : null;
if(setter!= null)
break;
}
Expand All @@ -246,8 +246,10 @@ protected void handleJmx(Map<String, String> map, String input) {
log.error("unable to invoke %s() on %s: %s", setter, protocol_name, e);
}
}
else
else {
log.warn(Util.getMessage("FieldNotFound"), attrname, protocol_name);
setter=new ResourceDMBean.NoopAccessor();
}
}

it.remove();
Expand Down
8 changes: 5 additions & 3 deletions src/org/jgroups/jmx/ResourceDMBean.java
Expand Up @@ -260,13 +260,15 @@ else if (method.isAnnotationPresent(ManagedOperation.class) || expose_all){
}
}

/** Provides field-based getter and/or setters for all attributes in atts if not present */
/** Provides field-based getter and/or setters for all attributes in attrs if not present */
protected void fixFields(Object instance) {
for(AttributeEntry attr: atts.values()) {
if(attr.getter == null)
attr.getter=findGetter(instance, attr.name);
if(attr.setter == null)
attr.setter=findSetter(instance, attr.name);
if(attr.setter == null)
attr.setter=new NoopAccessor();
}
}

Expand Down Expand Up @@ -385,7 +387,7 @@ public static Accessor findSetter(Object target, String attr_name) {
if(field != null)
return new FieldAccessor(field, target);

return new NoopAccessor();
return null;
}

/** Returns a string with the first letter being lowercase */
Expand Down Expand Up @@ -557,7 +559,7 @@ public Object invoke(Object new_val) throws Exception {
}


protected static class NoopAccessor implements Accessor {
public static class NoopAccessor implements Accessor {
public Object invoke(Object new_val) throws Exception {return null;}

public String toString() {return "NoopAccessor";}
Expand Down
120 changes: 120 additions & 0 deletions src/org/jgroups/protocols/AlternatingBundler.java
@@ -0,0 +1,120 @@
package org.jgroups.protocols;

import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.annotations.Experimental;
import org.jgroups.stack.DiagnosticsHandler;
import org.jgroups.util.AverageMinMax;
import org.jgroups.util.Util;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Bundler implementation which sends message batches (or single messages) as soon as the target destination changes
* (or max_bundler_size would be exceeded).<br/>
* Messages are removed from the main queue one by one and processed as follows:<br/>
* A B B C C A causes the following sends: A -> {CC} -> {BB} -> A<br/>
* Note that <em>null</em> is also a valid destination (send-to-all).<br/>
* JIRA: https://issues.jboss.org/browse/JGRP-2171
* @author Bela Ban
* @since 4.0.4
*/
@Experimental
public class AlternatingBundler extends TransferQueueBundler implements DiagnosticsHandler.ProbeHandler {
protected Address target_dest; // the current destination
protected final List<Message> target_list=new ArrayList<>(); // msgs for the current dest; flushed on dest change
protected final AverageMinMax avg_batch_size=new AverageMinMax();


public synchronized void start() {
super.start();
transport.registerProbeHandler(this);
}

public synchronized void stop() {
transport.unregisterProbeHandler(this);
super.stop();
}

public void run() {
while(running) {
Message msg=null;
try {
if((msg=queue.take()) == null) // block until first message is available
continue;
long size=msg.size();
if(count + size >= transport.getMaxBundleSize()) {
num_sends_because_full_queue++;
fill_count.add(count);
_sendBundledMessages();
}

for(;;) {
Address dest=msg.dest();
if(!Util.match(dest, target_dest) || count + size >= transport.getMaxBundleSize())
_sendBundledMessages();
_addMessage(msg, size);
msg=queue.poll();
if(msg == null)
break;
size=msg.size();
}
_sendBundledMessages();
}
catch(Throwable t) {
}
}
}

public Map<String,String> handleProbe(String... keys) {
Map<String,String> map=new HashMap<>();
for(String key: keys) {
switch(key) {
case "ab.avg_batch_size":
map.put(key, avg_batch_size.toString());
break;
case "ab.avg_batch_size.reset":
avg_batch_size.clear();
break;
}
}
return map;
}

public String[] supportedKeys() {
return new String[]{"ab.avg_batch_size", "ab.avg_batch_size.reset"};
}

protected void _sendBundledMessages() {
try {
if(target_list.isEmpty())
return;
output.position(0);
if(target_list.size() == 1) {
sendSingleMessage(target_list.get(0));
// avg_batch_size.add(1);
}
else {
avg_batch_size.add(target_list.size());
sendMessageList(target_dest, target_list.get(0).getSrc(), target_list);
if(transport.statsEnabled())
transport.incrBatchesSent(1);
}
}
finally {
target_list.clear();
count=0;
}
}

protected void _addMessage(Message msg, long size) {
target_dest=msg.dest();
target_list.add(msg);
count+=size;
}


}
119 changes: 119 additions & 0 deletions src/org/jgroups/protocols/RemoveQueueBundler.java
@@ -0,0 +1,119 @@
package org.jgroups.protocols;

import org.jgroups.Address;
import org.jgroups.Message;
import org.jgroups.annotations.Experimental;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.util.AverageMinMax;
import org.jgroups.util.DefaultThreadFactory;
import org.jgroups.util.RingBuffer;
import org.jgroups.util.Runner;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Bundler implementation which sends message batches (or single messages) as soon as the remove queue is full
* (or max_bundler_size would be exceeded).<br/>
* Messages are removed from the main queue and processed as follows (assuming they all fit into the remove queue):<br/>
* A B B C C A causes the following sends: {AA} -> {CC} -> {BB}<br/>
* Note that <em>null</em> is also a valid destination (send-to-all).<br/>
* Contrary to {@link TransferQueueBundler}, this bundler uses a {@link RingBuffer} rather than an ArrayBlockingQueue
* and the size of the remove queue is fixed. TransferQueueBundler increases the size of the remove queue
* dynamically, which leads to higher latency if the remove queue grows too much.
* <br/>
* JIRA: https://issues.jboss.org/browse/JGRP-2171
* @author Bela Ban
* @since 4.0.4
*/
@Experimental
public class RemoveQueueBundler extends BaseBundler {
protected RingBuffer<Message> rb;
protected Runner runner;
protected Message[] remove_queue;
protected final AverageMinMax avg_batch_size=new AverageMinMax();
protected int queue_size=1024;

@ManagedAttribute(description="Remove queue size")
public int rqbRemoveQueueSize() {return remove_queue.length;}

@ManagedAttribute(description="Sets the size of the remove queue; creates a new remove queue")
public void rqbRemoveQueueSize(int size) {
if(size == queue_size) return;
queue_size=size;
remove_queue=new Message[queue_size];
}

@ManagedAttribute(description="Average batch length")
public String rqbAvgBatchSize() {return avg_batch_size.toString();}

@ManagedAttribute(description="Current number of messages (to be sent) in the ring buffer")
public int rqbRingBufferSize() {return rb.size();}

public Map<String,Object> getStats() {
Map<String,Object> map=new HashMap<>();
map.put("avg-batch-size", avg_batch_size.toString());
map.put("ring-buffer-size", rb.size());
map.put("remove-queue-size", queue_size);
return map;
}

public void resetStats() {
avg_batch_size.clear();
}

public void init(TP transport) {
super.init(transport);
rb=new RingBuffer(Message.class, transport.getBundlerCapacity());
remove_queue=new Message[queue_size];
runner=new Runner(new DefaultThreadFactory("aqb", true, true), "runner", this::run, null);
}

public synchronized void start() {
super.start();
runner.start();
}

public synchronized void stop() {
runner.stop();
super.stop();
}

public void send(Message msg) throws Exception {
rb.put(msg);
}

public void run() {
try {
int drained=rb.drainToBlocking(remove_queue);
if(drained == 1) {
output.position(0);
sendSingleMessage(remove_queue[0]);
return;
}

for(int i=0; i < drained; i++) {
Message msg=remove_queue[i];
long size=msg.size();
if(count + size >= transport.getMaxBundleSize())
sendBundledMessages();
addMessage(msg, msg.size());
}
sendBundledMessages();
}
catch(Throwable t) {
}
}

public int size() {
return rb.size();
}

protected void sendMessageList(Address dest, Address src, List<Message> list) {
super.sendMessageList(dest, src, list);
avg_batch_size.add(list.size());
}


}
8 changes: 7 additions & 1 deletion src/org/jgroups/protocols/TP.java
Expand Up @@ -327,7 +327,7 @@ public TP setThreadPoolKeepAliveTime(long time) {
public long getThreadPoolKeepAliveTime() {return thread_pool_keep_alive_time;}

public Object[] getJmxObjects() {
return new Object[]{msg_stats, msg_processing_policy};
return new Object[]{msg_stats, msg_processing_policy, bundler};
}

public <T extends Protocol> T setLevel(String level) {
Expand Down Expand Up @@ -1127,6 +1127,12 @@ protected Bundler createBundler(String type) {
case "async-no-bundler":
case "anb":
return new AsyncNoBundler();
case "ab":
case "alternating-bundler":
return new AlternatingBundler();
case "rqb": case "rq":
case "remove-queue-bundler": case "remove-queue":
return new RemoveQueueBundler();
}

try {
Expand Down

0 comments on commit 05e5bb8

Please sign in to comment.