Skip to content

Commit

Permalink
RoundTrip now does unicasts
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed Apr 29, 2016
1 parent 68e4904 commit 212e3cd
Showing 1 changed file with 119 additions and 45 deletions.
164 changes: 119 additions & 45 deletions tests/other/org/jgroups/tests/RoundTrip.java
@@ -1,21 +1,27 @@
package org.jgroups.tests;

import org.jgroups.*;
import org.jgroups.util.Average;
import org.jgroups.util.AverageMinMax;
import org.jgroups.util.Bits;
import org.jgroups.util.Promise;
import org.jgroups.util.Util;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Class that measure RTT for multicast messages between 2 cluster members. See {@link RpcDispatcherSpeedTest} for
* RPCs
* @author Bela Ban
*/
public class RoundTrip extends ReceiverAdapter {
protected JChannel channel;
protected final Promise<Boolean> promise=new Promise<>();
protected int num_msgs=5000;
protected int msg_size=1000;
protected boolean oob, dont_bundle;
protected int num_senders=1; // number of sender threads
protected boolean oob=true, dont_bundle, details;
protected static final byte REQ=0, RSP=1;
protected Sender[] senders;



protected void start(String props, String name) throws Exception {
Expand All @@ -34,17 +40,28 @@ protected void start(String props, String name) throws Exception {
* @param msg
*/
public void receive(Message msg) {
if(msg.getLength() > 0) { // request: send unicast response
Message rsp=new Message(msg.src());
try {
channel.send(rsp);
}
catch(Exception e) {
e.printStackTrace();
}
byte[] req_buf=msg.getRawBuffer();
byte type=req_buf[0];
short id=Bits.readShort(req_buf, 1);
switch(type) {
case REQ:
byte[] rsp_buf=new byte[Global.BYTE_SIZE + Global.SHORT_SIZE];
rsp_buf[0]=RSP;
Bits.writeShort(id, rsp_buf, 1);
Message rsp=new Message(msg.src(), rsp_buf);
try {
channel.send(rsp);
}
catch(Exception e) {
e.printStackTrace();
}
break;
case RSP:
senders[id].promise.setResult(true); // notify the sender of the response
break;
default:
throw new IllegalArgumentException("first byte needs to be either REQ or RSP but not " + req_buf[0]);
}
else
promise.setResult(true);
}

public void viewAccepted(View view) {
Expand All @@ -54,8 +71,9 @@ public void viewAccepted(View view) {
protected void loop() {
boolean looping=true;
while(looping) {
int c=Util.keyPress(String.format("[1] send [2] num_msgs (%d) [3] msg_size (%d) [o] oob (%b) [b] dont_bundle (%b)[x] exit\n",
num_msgs, msg_size, oob, dont_bundle));
int c=Util.keyPress(String.format("[1] send [2] num_msgs (%d) [3] senders (%d)\n" +
"[o] oob (%b) [b] dont_bundle (%b) [d] details (%b) [x] exit\n",
num_msgs, num_senders, oob, dont_bundle, details));
try {
switch(c) {
case '1':
Expand All @@ -65,18 +83,17 @@ protected void loop() {
num_msgs=Util.readIntFromStdin("num_msgs: ");
break;
case '3':
int tmp=Util.readIntFromStdin("msg_size: ");
if(tmp <= 0)
System.err.printf("msg_size of %d is invalid\n", tmp);
else
msg_size=tmp;
num_senders=Util.readIntFromStdin("num_senders: ");
break;
case 'o':
oob=!oob;
break;
case 'b':
dont_bundle=!dont_bundle;
break;
case 'd':
details=!details;
break;
case 'x':
looping=false;
break;
Expand All @@ -96,34 +113,91 @@ protected void sendRequests() throws Exception {
}

Address target=Util.pickNext(view.getMembers(), channel.getAddress());
byte[] buf=new byte[msg_size];
Average avg=new Average();
long min=Long.MAX_VALUE, max=0;
int print=num_msgs/10;
System.out.printf("-- sending %d requests to %s (size=%d)\n", num_msgs, target, msg_size);

for(int i=0; i < num_msgs; i++) {
Message req=new Message(target, buf);
if(oob)
req.setFlag(Message.Flag.OOB);
if(dont_bundle)
req.setFlag(Message.Flag.DONT_BUNDLE);
promise.reset(false);
long start=System.nanoTime();
channel.send(req);
if(i > 0 && i % print == 0)
System.out.print(".");
promise.getResult(0);
long time_ns=System.nanoTime()-start;
avg.add(time_ns);
min=Math.min(min, time_ns);
max=Math.max(max, time_ns);
final CountDownLatch latch=new CountDownLatch(num_senders +1);
final AtomicInteger sent_msgs=new AtomicInteger(0);
senders=new Sender[num_senders];
for(int i=0; i < num_senders; i++) {
senders[i]=new Sender((short)i, latch, sent_msgs, target);
senders[i].start();
}

long start=System.nanoTime();
latch.countDown(); // start all sender threads
for(Sender sender: senders)
sender.join();
long total_time=System.nanoTime()-start;
double msgs_sec=num_msgs / (total_time / 1_000_000_000.0);

AverageMinMax avg=null;
if(details)
System.out.println("");
for(Sender sender: senders) {
if(details)
System.out.printf("%d: %s\n", sender.id, print(sender.avg));
if(avg == null)
avg=sender.avg;
else
avg.merge(sender.avg);
}
System.out.println("");
System.out.printf("\nround-trip = min/avg/max: %.2f / %.2f / %.2f us\n\n", min/1000.0, avg.getAverage() / 1000.0, max/1000.0);

System.out.printf(Util.bold("\n\nreqs/sec = %.2f, " +
"round-trip = min/avg/max: %.2f / %.2f / %.2f us\n\n"),
msgs_sec, avg.min()/1000.0, avg.average() / 1000.0, avg.max()/1000.0);
}

protected static String print(AverageMinMax avg) {
return String.format("round-trip min/avg/max = %.2f / %.2f / %.2f us",
avg.min() / 1000.0, avg.average() / 1000.0, avg.max() / 1000.0);
}


protected class Sender extends Thread {
protected final short id;
protected final byte[] req_buf=new byte[Global.BYTE_SIZE + Global.SHORT_SIZE]; // request and id
protected final CountDownLatch latch;
protected final AtomicInteger sent_msgs; // current number of messages; senders stop if sent_msgs >= num_msgs
protected final Promise<Boolean> promise=new Promise<>(); // for the sender thread to wait for the response
protected final int print;
protected final Address target;
protected final AverageMinMax avg=new AverageMinMax(); // in ns

public Sender(short id, CountDownLatch latch, AtomicInteger sent_msgs, Address target) {
this.id=id;
this.latch=latch;
this.sent_msgs=sent_msgs;
this.target=target;
req_buf[0]=REQ;
Bits.writeShort(id, req_buf, 1); // writes id at buf_reqs[1-2]
print=num_msgs / 10;
}

public void run() {
for(;;) {
int num=sent_msgs.incrementAndGet();
if(num >= num_msgs)
break;
if(num > 0 && num % print == 0)
System.out.printf(".");
promise.reset(false);
Message req=new Message(target, req_buf);
if(oob)
req.setFlag(Message.Flag.OOB);
if(dont_bundle)
req.setFlag(Message.Flag.DONT_BUNDLE);
try {
long start=System.nanoTime();
channel.send(req);
promise.getResult(0);
long time=System.nanoTime()-start;
avg.add(time);
}
catch(Exception e) {
e.printStackTrace();
}
}
}
}


public static void main(String[] args) throws Exception {
String props=null, name=null;
Expand Down

0 comments on commit 212e3cd

Please sign in to comment.