Permalink
Browse files

Added stats to RELAY2 (https://issues.jboss.org/browse/JGRP-1547)

  • Loading branch information...
1 parent fd08bf7 commit 836461046cf60a2284c81b8064aaeac8577c10a8 Bela Ban committed Dec 21, 2012
@@ -19,6 +19,8 @@
import java.io.DataOutput;
import java.io.InputStream;
import java.util.*;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
/**
*
@@ -73,7 +75,7 @@
@Property(description="If true, logs a warning if the FORWARD_TO_COORD protocol is not found. This property might " +
"get deprecated soon")
- protected boolean warn_when_ftc_missing=true;
+ protected boolean warn_when_ftc_missing=false;
/* --------------------------------------------- Fields ------------------------------------------------ */
@@ -103,6 +105,29 @@
// protocol IDs above RELAY2
protected short[] prots_above;
+ /** Number of messages forwarded to the local SiteMaster */
+ protected final AtomicLong forward_to_site_master=new AtomicLong(0);
+
+ protected final AtomicLong forward_sm_time=new AtomicLong(0);
+
+ /** Number of messages relayed by the local SiteMaster to a remote SiteMaster */
+ protected final AtomicLong relayed=new AtomicLong(0);
+
+ /** Total time spent relaying messages from the local SiteMaster to remote SiteMasters (in ns) */
+ protected final AtomicLong relayed_time=new AtomicLong(0);
+
+ /** Number of messages (received from a remote SiteMaster and) delivered by the local SiteMaster to a local node */
+ protected final AtomicLong forward_to_local_mbr=new AtomicLong(0);
+
+ protected final AtomicLong forward_to_local_mbr_time=new AtomicLong(0);
+
+ /** Number of messages delivered locally, e.g. received and delivered to self */
+ protected final AtomicLong local_deliveries=new AtomicLong(0);
+
+ /** Total time (in ns) for received messages that are delivered locally; converted to ms by getTimeDeliveringLocally() */
+ protected final AtomicLong local_delivery_time=new AtomicLong(0);
+
+
// Fluent configuration
public RELAY2 site(String site_name) {site=site_name; return this;}
@@ -126,10 +151,70 @@
public int forwardQueueMaxSize() {return fwd_queue_max_size;}
public long siteDownTimeout() {return site_down_timeout;}
public boolean asyncRelayCreation() {return async_relay_creation;}
-
-
public Address getLocalAddress() {return local_addr;}
public TimeScheduler getTimer() {return timer;}
+ public void incrementRelayed() {relayed.incrementAndGet();}
+ public void addToRelayedTime(long delta) {relayed_time.addAndGet(delta);}
+
+ @ManagedAttribute(description="Number of messages forwarded to the local SiteMaster")
+ public long getNumForwardedToSiteMaster() {return forward_to_site_master.get();}
+
+ @ManagedAttribute(description="The total time (in ms) spent forwarding messages to the local SiteMaster")
+ public long getTimeForwardingToSM() {return TimeUnit.MILLISECONDS.convert(forward_sm_time.get(),TimeUnit.NANOSECONDS);}
+
+ @ManagedAttribute(description="The average number of messages / s for forwarding messages to the local SiteMaster")
+ public long getAvgMsgsForwardingToSM() {return getTimeForwardingToSM() > 0?
+ (long)(getNumForwardedToSiteMaster() / (getTimeForwardingToSM()/1000.0)) : 0;}
+
+
+
+ @ManagedAttribute(description="Number of messages sent by this SiteMaster to a remote SiteMaster")
+ public long getNumRelayed() {return relayed.get();}
+
+ @ManagedAttribute(description="The total time (ms) spent relaying messages from this SiteMaster to remote SiteMasters")
+ public long getTimeRelaying() {return TimeUnit.MILLISECONDS.convert(relayed_time.get(), TimeUnit.NANOSECONDS);}
+
+ @ManagedAttribute(description="The average number of messages / s for relaying messages from this SiteMaster to remote SiteMasters")
+ public long getAvgMsgsRelaying() {return getTimeRelaying() > 0? (long)(getNumRelayed() / (getTimeRelaying()/1000.0)) : 0;}
+
+
+
+ @ManagedAttribute(description="Number of messages (received from a remote Sitemaster and) delivered " +
+ "by this SiteMaster to a local node")
+ public long getNumForwardedToLocalMbr() {return forward_to_local_mbr.get();}
+
+ @ManagedAttribute(description="The total time (in ms) spent forwarding messages to a member in the same site")
+ public long getTimeForwardingToLocalMbr() {return TimeUnit.MILLISECONDS.convert(forward_to_local_mbr_time.get(),TimeUnit.NANOSECONDS);}
+
+ @ManagedAttribute(description="The average number of messages / s for forwarding messages to a member in the same site")
+ public long getAvgMsgsForwardingToLocalMbr() {return getTimeForwardingToLocalMbr() > 0?
+ (long)(getNumForwardedToLocalMbr() / (getTimeForwardingToLocalMbr()/1000.0)) : 0;}
+
+
+
+
+ @ManagedAttribute(description="Number of messages delivered locally, e.g. received and delivered to self")
+ public long getNumLocalDeliveries() {return local_deliveries.get();}
+
+ @ManagedAttribute(description="The total time (ms) spent delivering received messages locally")
+ public long getTimeDeliveringLocally() {return TimeUnit.MILLISECONDS.convert(local_delivery_time.get(),TimeUnit.NANOSECONDS);}
+
+ @ManagedAttribute(description="The average number of messages / s for delivering received messages locally")
+ public long getAvgMsgsDeliveringLocally() {return getTimeDeliveringLocally() > 0?
+ (long)(getNumLocalDeliveries() / (getTimeDeliveringLocally()/1000.0)) : 0;}
+
+
+ public void resetStats() {
+ super.resetStats();
+ forward_to_site_master.set(0);
+ forward_sm_time.set(0);
+ relayed.set(0);
+ relayed_time.set(0);
+ forward_to_local_mbr.set(0);
+ forward_to_local_mbr_time.set(0);
+ local_deliveries.set(0);
+ local_delivery_time.set(0);
+ }
public View getBridgeView(String cluster_name) {
Relayer tmp=relayer;
@@ -283,16 +368,27 @@ public Object down(Event evt) {
if(target.getSite() == site_id ) {
if(local_addr.equals(target) || (target instanceof SiteMaster && is_coord)) {
// we cannot simply pass msg down, as the transport doesn't know how to send a message to a (e.g.) SiteMaster
+ long start=stats? System.nanoTime() : 0;
forwardTo(local_addr, target, sender, msg, false);
+ if(stats) {
+ local_delivery_time.addAndGet(System.nanoTime() - start);
+ local_deliveries.incrementAndGet();
+ }
}
else
deliverLocally(target, sender, msg);
return null;
}
// forward to the coordinator unless we're the coord (then route the message directly)
- if(!is_coord)
- forwardTo(coord, target, sender, msg, true);
+ if(!is_coord) {
+ long start=stats? System.nanoTime() : 0;
+ forwardTo(coord,target,sender,msg,true);
+ if(stats) {
+ forward_sm_time.addAndGet(System.nanoTime() - start);
+ forward_to_site_master.incrementAndGet();
+ }
+ }
else
route(target, sender, msg);
return null;
@@ -381,7 +477,8 @@ protected void handleMessage(Relay2Header hdr, Message msg) {
/**
* Routes the message to the target destination, used by a site master (coordinator)
* @param dest The destination address; its site ID selects the route to use
- * @param buf
+ * @param sender the address of the sender
+ * @param msg The message
*/
protected void route(SiteAddress dest, SiteAddress sender, Message msg) {
short target_site=dest.getSite();
@@ -402,8 +499,9 @@ protected void route(SiteAddress dest, SiteAddress sender, Message msg) {
Relayer.Route route=tmp.getRoute(target_site);
if(route == null)
log.error(local_addr + ": no route to " + SiteUUID.getSiteName(target_site) + ": dropping message");
- else
+ else {
route.send(target_site, dest, sender, msg);
+ }
}
@@ -472,7 +570,12 @@ protected void deliverLocally(SiteAddress dest, SiteAddress sender, Message msg)
if(log.isTraceEnabled())
log.trace(local_addr + ": delivering message to " + dest + " in local cluster");
+ long start=stats? System.nanoTime() : 0;
forwardTo(local_dest, dest, sender, msg, send_to_coord);
+ if(stats) {
+ forward_to_local_mbr_time.addAndGet(System.nanoTime() - start);
+ forward_to_local_mbr.incrementAndGet();
+ }
}
@@ -481,7 +584,12 @@ protected void deliver(Address dest, Address sender, final Message msg) {
Message copy=copy(msg).dest(dest).src(sender);
if(log.isTraceEnabled())
log.trace(local_addr + ": delivering message from " + sender);
+ long start=stats? System.nanoTime() : 0;
up_prot.up(new Event(Event.MSG, copy));
+ if(stats) {
+ local_delivery_time.addAndGet(System.nanoTime() - start);
+ local_deliveries.incrementAndGet();
+ }
}
catch(Exception e) {
log.error("failed unmarshalling message", e);
@@ -32,6 +32,8 @@
* instance needs to be created */
protected volatile boolean done;
+ protected boolean stats;
+
// Used to store messages for a site with status UNKNOWN. Messages will be flushed when the status changes to UP, or
// a SITE-UNREACHABLE message will be sent to each member *once* when the status changes to DOWN
protected final ConcurrentMap<Short,BlockingQueue<Message>> fwd_queue=new ConcurrentHashMap<Short,BlockingQueue<Message>>();
@@ -45,6 +47,7 @@
public Relayer(RELAY2 relay, Log log, int num_routes) {
this.relay=relay;
+ stats=relay.statsEnabled();
this.log=log;
init(num_routes);
}
@@ -241,9 +244,14 @@ public void send(short target_site, Address final_destination, Address original_
// at this point status is RUNNING
if(log.isTraceEnabled())
log.trace("routing message to " + final_destination + " via " + site_master);
+ long start=stats? System.nanoTime() : 0;
try {
Message copy=createMessage(site_master, final_destination, original_sender, msg);
bridge.send(copy);
+ if(stats) {
+ relay.addToRelayedTime(System.nanoTime() - start);
+ relay.incrementRelayed();
+ }
}
catch(Exception e) {
log.error("failure relaying message", e);
@@ -30,7 +30,7 @@
private JChannel channel;
private Address local_addr;
private RpcDispatcher disp;
- static final String groupname="UTestRpc";
+ private String groupname="UTestRpc";
private boolean sync=false, oob=false, anycasting=false;
private int num_threads=1;
private int num_msgs=50000, msg_size=1000, print=num_msgs / 10;
@@ -56,7 +56,9 @@
}
- public void init(String props, String name) throws Exception {
+ public void init(String props, String name, String cluster_name) throws Exception {
+ if(cluster_name != null)
+ groupname=cluster_name;
channel=new JChannel(props);
if(name != null)
channel.setName(name);
@@ -173,7 +175,7 @@ void invokeRpcs() throws Throwable {
(anycasting? anycast_mbrs : destination) + ", sync=" + sync + ", oob=" + oob + ", anycasting=" + anycasting);
// The first call needs to be synchronous with OOB !
- RequestOptions options=new RequestOptions(ResponseMode.GET_ALL, 0, anycasting, null);
+ RequestOptions options=new RequestOptions(ResponseMode.GET_ALL, 15000, anycasting, null);
if(sync) options.setFlags(Message.DONT_BUNDLE);
if(oob) options.setFlags(Message.OOB);
@@ -385,6 +387,7 @@ public Object objectFromBuffer(byte[] buffer, int offset, int length) throws Exc
public static void main(String[] args) {
String props=null;
String name=null;
+ String cluster_name=null;
for(int i=0; i < args.length; i++) {
@@ -396,14 +399,18 @@ public static void main(String[] args) {
name=args[++i];
continue;
}
+ // Exact match required: endsWith() would also accept any suffix of "-cluster"
+ // (e.g. "r", "ster" or the empty string) as this option and swallow the next argument
+ if("-cluster".equals(args[i])) {
+ cluster_name=args[++i];
+ continue;
+ }
help();
return;
}
UnicastTestRpc test=null;
try {
test=new UnicastTestRpc();
- test.init(props, name);
+ test.init(props, name, cluster_name);
test.eventLoop();
}
catch(Throwable ex) {
@@ -414,7 +421,7 @@ public static void main(String[] args) {
}
static void help() {
- System.out.println("UnicastTestRpc [-help] [-props <props>] [-name name]");
+ System.out.println("UnicastTestRpc [-help] [-props <props>] [-name name] [-cluster name]");
}

0 comments on commit 8364610

Please sign in to comment.