diff --git a/src/org/jgroups/blocks/MessageDispatcher.java b/src/org/jgroups/blocks/MessageDispatcher.java index f6f26dba408..cd9eb4646c5 100644 --- a/src/org/jgroups/blocks/MessageDispatcher.java +++ b/src/org/jgroups/blocks/MessageDispatcher.java @@ -7,7 +7,6 @@ import org.jgroups.logging.LogFactory; import org.jgroups.protocols.TP; import org.jgroups.protocols.relay.SiteAddress; -import org.jgroups.stack.DiagnosticsHandler; import org.jgroups.stack.Protocol; import org.jgroups.stack.StateTransferInfo; import org.jgroups.util.*; @@ -51,7 +50,6 @@ public class MessageDispatcher implements AsyncRequestHandler, ChannelListener, protected boolean hardware_multicast_supported=false; protected final Set channel_listeners=new CopyOnWriteArraySet<>(); protected final RpcStats rpc_stats=new RpcStats(false); - protected final DiagnosticsHandler.ProbeHandler probe_handler=new MyProbeHandler(); public MessageDispatcher() { @@ -104,6 +102,18 @@ public UpHandler getProtocolAdapter() { return prot_adapter; } + public RequestCorrelator correlator() {return corr;} + + public T correlator(RequestCorrelator c) { + if(c == null) + return (T)this; + stop(); + this.corr=c; + corr.asyncDispatching(this.async_dispatching).wrapExceptions(this.wrap_exceptions); + start(); + return (T)this; + } + /** @@ -632,47 +642,6 @@ protected Object handleUpEvent(Event evt) throws Exception { } - protected class MyProbeHandler implements DiagnosticsHandler.ProbeHandler { - - @Override - public Map handleProbe(String... keys) { - Map retval=new HashMap<>(16); - for(String key: keys) { - switch(key) { - case "rpcs": - String channel_name=channel != null? channel.getClusterName() : ""; - retval.put(channel_name + ": sync unicast RPCs", String.valueOf(rpc_stats.unicasts(true))); - retval.put(channel_name + ": sync multicast RPCs", String.valueOf(rpc_stats.multicasts(true))); - retval.put(channel_name + ": async unicast RPCs", String.valueOf(rpc_stats.unicasts(false))); - retval.put(channel_name + ": async multicast RPCs", String.valueOf(rpc_stats.multicasts(false))); - retval.put(channel_name + ": sync anycast RPCs", String.valueOf(rpc_stats.anycasts(true))); - retval.put(channel_name + ": async anycast RPCs", String.valueOf(rpc_stats.anycasts(false))); - break; - case "rpcs-reset": - rpc_stats.reset(); - break; - case "rpcs-enable-details": - rpc_stats.extendedStats(true); - break; - case "rpcs-disable-details": - rpc_stats.extendedStats(false); - break; - case "rpcs-details": - if(!rpc_stats.extendedStats()) - retval.put(key, "
"); - else - retval.put(key, rpc_stats.printOrderByDest()); - break; - } - } - return retval; - } - - @Override - public String[] supportedKeys() { - return new String[]{"rpcs", "rpcs-reset", "rpcs-enable-details", "rpcs-disable-details", "rpcs-details"}; - } - } class ProtocolAdapter extends Protocol implements UpHandler {