Skip to content

Commit

Permalink
- Fixed pickTargets() in UUPerf (would pick A,A in {A})
Browse files Browse the repository at this point in the history
- Code to prevent NPE in RpcDispatcher is options is null
- Stats for MessageDispatcher (https://issues.jboss.org/browse/JGRP-1456)
  • Loading branch information
belaban committed Apr 20, 2012
1 parent 7b8645f commit ffa98d8
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 16 deletions.
64 changes: 61 additions & 3 deletions src/org/jgroups/blocks/MessageDispatcher.java
Expand Up @@ -6,13 +6,15 @@
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
import org.jgroups.protocols.TP;
import org.jgroups.stack.DiagnosticsHandler;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.StateTransferInfo;
import org.jgroups.util.*;

import java.io.*;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;


/**
Expand Down Expand Up @@ -45,8 +47,14 @@ public class MessageDispatcher implements RequestHandler, ChannelListener {
protected Address local_addr=null;
protected final Log log=LogFactory.getLog(getClass());
protected boolean hardware_multicast_supported=false;

protected final AtomicInteger sync_unicasts=new AtomicInteger(0);
protected final AtomicInteger async_unicasts=new AtomicInteger(0);
protected final AtomicInteger sync_multicasts=new AtomicInteger(0);
protected final AtomicInteger async_multicasts=new AtomicInteger(0);
protected final AtomicInteger sync_anycasts=new AtomicInteger(0);
protected final AtomicInteger async_anycasts=new AtomicInteger(0);
protected final Set<ChannelListener> channel_listeners=new CopyOnWriteArraySet<ChannelListener>();
protected final DiagnosticsHandler.ProbeHandler probe_handler=new MyProbeHandler();


public MessageDispatcher() {
Expand Down Expand Up @@ -110,9 +118,8 @@ public void removeChannelListener(ChannelListener l) {


public void start() {
if(corr == null) {
if(corr == null)
corr=createRequestCorrelator(prot_adapter, this, local_addr);
}
correlatorStarted();
corr.start();

Expand All @@ -125,6 +132,7 @@ public void start() {
}
TP transport=channel.getProtocolStack().getTransport();
hardware_multicast_supported=transport.supportsMulticasting();
transport.registerProbeHandler(probe_handler);
}
}

Expand All @@ -143,6 +151,7 @@ public void stop() {

if(channel instanceof JChannel) {
TP transport=channel.getProtocolStack().getTransport();
transport.unregisterProbeHandler(probe_handler);
corr.unregisterProbeHandler(transport);
}
}
Expand Down Expand Up @@ -295,6 +304,18 @@ protected <T> GroupRequest<T> cast(final Collection<Address> dests, Message msg,
return null;
}

if(options != null) {
boolean async=options.getMode() == ResponseMode.GET_NONE;
if(options.getAnycasting()) {
if(async) async_anycasts.incrementAndGet();
else sync_anycasts.incrementAndGet();
}
else {
if(async) async_multicasts.incrementAndGet();
else sync_multicasts.incrementAndGet();
}
}

GroupRequest<T> req=new GroupRequest<T>(msg, corr, real_dests, options);
if(options != null) {
req.setResponseFilter(options.getRspFilter());
Expand Down Expand Up @@ -335,6 +356,10 @@ public <T> T sendMessage(Message msg, RequestOptions opts) throws Exception {
msg.setFlag(opts.getFlags());
if(opts.getScope() > 0)
msg.setScope(opts.getScope());
if(opts.getMode() == ResponseMode.GET_NONE)
async_unicasts.incrementAndGet();
else
sync_unicasts.incrementAndGet();
}

UnicastRequest<T> req=new UnicastRequest<T>(msg, corr, dest, opts);
Expand Down Expand Up @@ -382,6 +407,10 @@ public <T> NotifyingFuture<T> sendMessageWithFuture(Message msg, RequestOptions
msg.setFlag(options.getFlags());
if(options.getScope() > 0)
msg.setScope(options.getScope());
if(options.getMode() == ResponseMode.GET_NONE)
async_unicasts.incrementAndGet();
else
sync_unicasts.incrementAndGet();
}

UnicastRequest<T> req=new UnicastRequest<T>(msg, corr, dest, options);
Expand Down Expand Up @@ -446,6 +475,35 @@ public void channelClosed(Channel channel) {
/* ----------------------------------------------------------------------- */


class MyProbeHandler implements DiagnosticsHandler.ProbeHandler {

public Map<String,String> handleProbe(String... keys) {
Map<String,String> retval=new HashMap<String,String>();
for(String key: keys) {
if("rpcs".equals(key)) {
retval.put("sync unicast RPCs", sync_unicasts.toString());
retval.put("sync multicast RPCs", sync_multicasts.toString());
retval.put("async unicast RPCs", async_unicasts.toString());
retval.put("async multicast RPCs", async_multicasts.toString());
retval.put("sync anycast RPCs", sync_anycasts.toString());
retval.put("async anycast RPCs", async_anycasts.toString());
}
if("rpcs-reset".equals(key)) {
sync_unicasts.set(0);
sync_multicasts.set(0);
async_unicasts.set(0);
async_multicasts.set(0);
sync_anycasts.set(0);
async_anycasts.set(0);
}
}
return retval;
}

public String[] supportedKeys() {
return new String[]{"rpcs", "rpcs-reset"};
}
}


class ProtocolAdapter extends Protocol implements UpHandler {
Expand Down
32 changes: 20 additions & 12 deletions src/org/jgroups/blocks/RpcDispatcher.java
Expand Up @@ -157,9 +157,11 @@ public <T> RspList<T> callRemoteMethods(Collection<Address> dests,
else
msg.setBuffer((byte[])buf);

msg.setFlag(options.getFlags());
if(options.getScope() > 0)
msg.setScope(options.getScope());
if(options != null) {
msg.setFlag(options.getFlags());
if(options.getScope() > 0)
msg.setScope(options.getScope());
}

RspList<T> retval=super.castMessage(dests, msg, options);
if(log.isTraceEnabled()) log.trace("responses: " + retval);
Expand Down Expand Up @@ -198,9 +200,11 @@ public <T> NotifyingFuture<RspList<T>> callRemoteMethodsWithFuture(Collection<Ad
msg.setBuffer((Buffer)buf);
else
msg.setBuffer((byte[])buf);
msg.setFlag(options.getFlags());
if(options.getScope() > 0)
msg.setScope(options.getScope());
if(options != null) {
msg.setFlag(options.getFlags());
if(options.getScope() > 0)
msg.setScope(options.getScope());
}

NotifyingFuture<RspList<T>> retval=super.castMessageWithFuture(dests, msg, options);
if(log.isTraceEnabled()) log.trace("responses: " + retval);
Expand Down Expand Up @@ -243,9 +247,11 @@ public <T> T callRemoteMethod(Address dest, MethodCall call, RequestOptions opti
msg.setBuffer((Buffer)buf);
else
msg.setBuffer((byte[])buf);
msg.setFlag(options.getFlags());
if(options.getScope() > 0)
msg.setScope(options.getScope());
if(options != null) {
msg.setFlag(options.getFlags());
if(options.getScope() > 0)
msg.setScope(options.getScope());
}

T retval=(T)super.sendMessage(msg, options);
if(log.isTraceEnabled()) log.trace("retval: " + retval);
Expand All @@ -272,9 +278,11 @@ public <T> NotifyingFuture<T> callRemoteMethodWithFuture(Address dest, MethodCal
msg.setBuffer((Buffer)buf);
else
msg.setBuffer((byte[])buf);
msg.setFlag(options.getFlags());
if(options.getScope() > 0)
msg.setScope(options.getScope());
if(options != null) {
msg.setFlag(options.getFlags());
if(options.getScope() > 0)
msg.setScope(options.getScope());
}
return super.sendMessageWithFuture(msg, options);
}

Expand Down
4 changes: 3 additions & 1 deletion tests/perf/org/jgroups/tests/perf/UPerf.java
Expand Up @@ -507,7 +507,9 @@ private Collection<Address> pickAnycastTargets() {
int index=dests.indexOf(local_addr);
for(int i=index + 1; i < index + 1 + anycast_count; i++) {
int new_index=i % dests.size();
anycast_targets.add(dests.get(new_index));
Address tmp=dests.get(new_index);
if(!anycast_targets.contains(tmp))
anycast_targets.add(tmp);
}
return anycast_targets;
}
Expand Down

0 comments on commit ffa98d8

Please sign in to comment.