Skip to content

Commit

Permalink
Added method to set RequestCorrelator in MessageDispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Jun 22, 2016
1 parent 6664a14 commit 71d1f25
Showing 1 changed file with 12 additions and 43 deletions.
55 changes: 12 additions & 43 deletions src/org/jgroups/blocks/MessageDispatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import org.jgroups.logging.LogFactory;
import org.jgroups.protocols.TP;
import org.jgroups.protocols.relay.SiteAddress;
import org.jgroups.stack.DiagnosticsHandler;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.StateTransferInfo;
import org.jgroups.util.*;
Expand Down Expand Up @@ -51,7 +50,6 @@ public class MessageDispatcher implements AsyncRequestHandler, ChannelListener,
protected boolean hardware_multicast_supported=false;
protected final Set<ChannelListener> channel_listeners=new CopyOnWriteArraySet<>();
protected final RpcStats rpc_stats=new RpcStats(false);
protected final DiagnosticsHandler.ProbeHandler probe_handler=new MyProbeHandler();


public MessageDispatcher() {
Expand Down Expand Up @@ -104,6 +102,18 @@ public UpHandler getProtocolAdapter() {
return prot_adapter;
}

public RequestCorrelator correlator() {return corr;}

public <T extends MessageDispatcher> T correlator(RequestCorrelator c) {
if(c == null)
return (T)this;
stop();
this.corr=c;
corr.asyncDispatching(this.async_dispatching).wrapExceptions(this.wrap_exceptions);
start();
return (T)this;
}



/**
Expand Down Expand Up @@ -632,47 +642,6 @@ protected Object handleUpEvent(Event evt) throws Exception {
}


protected class MyProbeHandler implements DiagnosticsHandler.ProbeHandler {

@Override
public Map<String,String> handleProbe(String... keys) {
Map<String,String> retval=new HashMap<>(16);
for(String key: keys) {
switch(key) {
case "rpcs":
String channel_name=channel != null? channel.getClusterName() : "";
retval.put(channel_name + ": sync unicast RPCs", String.valueOf(rpc_stats.unicasts(true)));
retval.put(channel_name + ": sync multicast RPCs", String.valueOf(rpc_stats.multicasts(true)));
retval.put(channel_name + ": async unicast RPCs", String.valueOf(rpc_stats.unicasts(false)));
retval.put(channel_name + ": async multicast RPCs", String.valueOf(rpc_stats.multicasts(false)));
retval.put(channel_name + ": sync anycast RPCs", String.valueOf(rpc_stats.anycasts(true)));
retval.put(channel_name + ": async anycast RPCs", String.valueOf(rpc_stats.anycasts(false)));
break;
case "rpcs-reset":
rpc_stats.reset();
break;
case "rpcs-enable-details":
rpc_stats.extendedStats(true);
break;
case "rpcs-disable-details":
rpc_stats.extendedStats(false);
break;
case "rpcs-details":
if(!rpc_stats.extendedStats())
retval.put(key, "<details not enabled: use rpc-enable-details to enable>");
else
retval.put(key, rpc_stats.printOrderByDest());
break;
}
}
return retval;
}

@Override
public String[] supportedKeys() {
return new String[]{"rpcs", "rpcs-reset", "rpcs-enable-details", "rpcs-disable-details", "rpcs-details"};
}
}


class ProtocolAdapter extends Protocol implements UpHandler {
Expand Down

0 comments on commit 71d1f25

Please sign in to comment.