Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

- Fixed pickTargets() in UUPerf (would pick A,A in {A})

- Code to prevent NPE in RpcDispatcher is options is null
- Stats for MessageDispatcher (https://issues.jboss.org/browse/JGRP-1456)
  • Loading branch information...
commit 60342bb055b1033a7ba5ceb24564612ba00f1d5f 1 parent f480200
@belaban authored
View
65 src/org/jgroups/blocks/MessageDispatcher.java
@@ -6,6 +6,7 @@
import org.jgroups.logging.Log;
import org.jgroups.logging.LogFactory;
import org.jgroups.protocols.TP;
+import org.jgroups.stack.DiagnosticsHandler;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.StateTransferInfo;
import org.jgroups.util.*;
@@ -13,6 +14,7 @@
import java.io.*;
import java.util.*;
import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -45,8 +47,14 @@
protected Address local_addr=null;
protected final Log log=LogFactory.getLog(getClass());
protected boolean hardware_multicast_supported=false;
-
+ protected final AtomicInteger sync_unicasts=new AtomicInteger(0);
+ protected final AtomicInteger async_unicasts=new AtomicInteger(0);
+ protected final AtomicInteger sync_multicasts=new AtomicInteger(0);
+ protected final AtomicInteger async_multicasts=new AtomicInteger(0);
+ protected final AtomicInteger sync_anycasts=new AtomicInteger(0);
+ protected final AtomicInteger async_anycasts=new AtomicInteger(0);
protected final Set<ChannelListener> channel_listeners=new CopyOnWriteArraySet<ChannelListener>();
+ protected final DiagnosticsHandler.ProbeHandler probe_handler=new MyProbeHandler();
public MessageDispatcher() {
@@ -83,7 +91,6 @@ public UpHandler getProtocolAdapter() {
-
/**
* If this dispatcher is using a user-provided PullPushAdapter, then need to set the members from the adapter
* initially since viewChange has most likely already been called in PullPushAdapter.
@@ -111,9 +118,8 @@ public void removeChannelListener(ChannelListener l) {
public void start() {
- if(corr == null) {
+ if(corr == null)
corr=createRequestCorrelator(prot_adapter, this, local_addr);
- }
correlatorStarted();
corr.start();
@@ -126,6 +132,7 @@ public void start() {
}
TP transport=channel.getProtocolStack().getTransport();
hardware_multicast_supported=transport.supportsMulticasting();
+ transport.registerProbeHandler(probe_handler);
}
}
@@ -144,6 +151,7 @@ public void stop() {
if(channel instanceof JChannel) {
TP transport=channel.getProtocolStack().getTransport();
+ transport.unregisterProbeHandler(probe_handler);
corr.unregisterProbeHandler(transport);
}
}
@@ -296,6 +304,18 @@ else if (canReplace) {
return null;
}
+ if(options != null) {
+ boolean async=options.getMode() == ResponseMode.GET_NONE;
+ if(options.getAnycasting()) {
+ if(async) async_anycasts.incrementAndGet();
+ else sync_anycasts.incrementAndGet();
+ }
+ else {
+ if(async) async_multicasts.incrementAndGet();
+ else sync_multicasts.incrementAndGet();
+ }
+ }
+
GroupRequest<T> req=new GroupRequest<T>(msg, corr, real_dests, options);
if(options != null) {
req.setResponseFilter(options.getRspFilter());
@@ -336,6 +356,10 @@ public void done(long req_id) {
msg.setFlag(opts.getFlags());
if(opts.getScope() > 0)
msg.setScope(opts.getScope());
+ if(opts.getMode() == ResponseMode.GET_NONE)
+ async_unicasts.incrementAndGet();
+ else
+ sync_unicasts.incrementAndGet();
}
UnicastRequest<T> req=new UnicastRequest<T>(msg, corr, dest, opts);
@@ -383,6 +407,10 @@ public void done(long req_id) {
msg.setFlag(options.getFlags());
if(options.getScope() > 0)
msg.setScope(options.getScope());
+ if(options.getMode() == ResponseMode.GET_NONE)
+ async_unicasts.incrementAndGet();
+ else
+ sync_unicasts.incrementAndGet();
}
UnicastRequest<T> req=new UnicastRequest<T>(msg, corr, dest, options);
@@ -447,6 +475,35 @@ public void channelClosed(Channel channel) {
/* ----------------------------------------------------------------------- */
+ class MyProbeHandler implements DiagnosticsHandler.ProbeHandler {
+
+ public Map<String,String> handleProbe(String... keys) {
+ Map<String,String> retval=new HashMap<String,String>();
+ for(String key: keys) {
+ if("rpcs".equals(key)) {
+ retval.put("sync unicast RPCs", sync_unicasts.toString());
+ retval.put("sync multicast RPCs", sync_multicasts.toString());
+ retval.put("async unicast RPCs", async_unicasts.toString());
+ retval.put("async multicast RPCs", async_multicasts.toString());
+ retval.put("sync anycast RPCs", sync_anycasts.toString());
+ retval.put("async anycast RPCs", async_anycasts.toString());
+ }
+ if("rpcs-reset".equals(key)) {
+ sync_unicasts.set(0);
+ sync_multicasts.set(0);
+ async_unicasts.set(0);
+ async_multicasts.set(0);
+ sync_anycasts.set(0);
+ async_anycasts.set(0);
+ }
+ }
+ return retval;
+ }
+
+ public String[] supportedKeys() {
+ return new String[]{"rpcs", "rpcs-reset"};
+ }
+ }
class ProtocolAdapter extends Protocol implements UpHandler {
View
34 src/org/jgroups/blocks/RpcDispatcher.java
@@ -157,9 +157,11 @@ public void setMethodLookup(MethodLookup method_lookup) {
else
msg.setBuffer((byte[])buf);
- msg.setFlag(options.getFlags());
- if(options.getScope() > 0)
- msg.setScope(options.getScope());
+ if(options != null) {
+ msg.setFlag(options.getFlags());
+ if(options.getScope() > 0)
+ msg.setScope(options.getScope());
+ }
RspList<T> retval=super.castMessage(dests, msg, options);
if(log.isTraceEnabled()) log.trace("responses: " + retval);
@@ -198,10 +200,12 @@ public void setMethodLookup(MethodLookup method_lookup) {
msg.setBuffer((Buffer)buf);
else
msg.setBuffer((byte[])buf);
- msg.setFlag(options.getFlags());
- if(options.getScope() > 0)
- msg.setScope(options.getScope());
-
+ if(options != null) {
+ msg.setFlag(options.getFlags());
+ if(options.getScope() > 0)
+ msg.setScope(options.getScope());
+ }
+
NotifyingFuture<RspList<T>> retval=super.castMessageWithFuture(dests, msg, options);
if(log.isTraceEnabled()) log.trace("responses: " + retval);
return retval;
@@ -243,9 +247,11 @@ public void setMethodLookup(MethodLookup method_lookup) {
msg.setBuffer((Buffer)buf);
else
msg.setBuffer((byte[])buf);
- msg.setFlag(options.getFlags());
- if(options.getScope() > 0)
- msg.setScope(options.getScope());
+ if(options != null) {
+ msg.setFlag(options.getFlags());
+ if(options.getScope() > 0)
+ msg.setScope(options.getScope());
+ }
T retval=(T)super.sendMessage(msg, options);
if(log.isTraceEnabled()) log.trace("retval: " + retval);
@@ -272,9 +278,11 @@ public void setMethodLookup(MethodLookup method_lookup) {
msg.setBuffer((Buffer)buf);
else
msg.setBuffer((byte[])buf);
- msg.setFlag(options.getFlags());
- if(options.getScope() > 0)
- msg.setScope(options.getScope());
+ if(options != null) {
+ msg.setFlag(options.getFlags());
+ if(options.getScope() > 0)
+ msg.setScope(options.getScope());
+ }
return super.sendMessageWithFuture(msg, options);
}
View
4 tests/other/org/jgroups/tests/UnicastTestRpcDist.java
@@ -487,7 +487,9 @@ private Address pickTarget() {
int index=dests.indexOf(local_addr);
for(int i=index + 1; i < index + 1 + anycast_count; i++) {
int new_index=i % dests.size();
- anycast_targets.add(dests.get(new_index));
+ Address tmp = dests.get(new_index);
+ if(!anycast_targets.contains(tmp))
+ anycast_targets.add(tmp);
}
return anycast_targets;
}
Please sign in to comment.
Something went wrong with that request. Please try again.