From 212e3cd3fc7a2bd4902b0280272503b398dfe6ef Mon Sep 17 00:00:00 2001 From: Bela Ban Date: Fri, 29 Apr 2016 15:54:44 +0200 Subject: [PATCH] RoundTrip now does unicasts --- tests/other/org/jgroups/tests/RoundTrip.java | 164 ++++++++++++++----- 1 file changed, 119 insertions(+), 45 deletions(-) diff --git a/tests/other/org/jgroups/tests/RoundTrip.java b/tests/other/org/jgroups/tests/RoundTrip.java index b3dd6e301ee..7d3c484a80a 100644 --- a/tests/other/org/jgroups/tests/RoundTrip.java +++ b/tests/other/org/jgroups/tests/RoundTrip.java @@ -1,10 +1,14 @@ package org.jgroups.tests; import org.jgroups.*; -import org.jgroups.util.Average; +import org.jgroups.util.AverageMinMax; +import org.jgroups.util.Bits; import org.jgroups.util.Promise; import org.jgroups.util.Util; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + /** * Class that measure RTT for multicast messages between 2 cluster members. See {@link RpcDispatcherSpeedTest} for * RPCs @@ -12,10 +16,12 @@ */ public class RoundTrip extends ReceiverAdapter { protected JChannel channel; - protected final Promise promise=new Promise<>(); protected int num_msgs=5000; - protected int msg_size=1000; - protected boolean oob, dont_bundle; + protected int num_senders=1; // number of sender threads + protected boolean oob=true, dont_bundle, details; + protected static final byte REQ=0, RSP=1; + protected Sender[] senders; + protected void start(String props, String name) throws Exception { @@ -34,17 +40,28 @@ protected void start(String props, String name) throws Exception { * @param msg */ public void receive(Message msg) { - if(msg.getLength() > 0) { // request: send unicast response - Message rsp=new Message(msg.src()); - try { - channel.send(rsp); - } - catch(Exception e) { - e.printStackTrace(); - } + byte[] req_buf=msg.getRawBuffer(); + byte type=req_buf[0]; + short id=Bits.readShort(req_buf, 1); + switch(type) { + case REQ: + byte[] rsp_buf=new byte[Global.BYTE_SIZE + Global.SHORT_SIZE]; + rsp_buf[0]=RSP; + Bits.writeShort(id, rsp_buf, 1); + Message rsp=new Message(msg.src(), rsp_buf); + try { + channel.send(rsp); + } + catch(Exception e) { + e.printStackTrace(); + } + break; + case RSP: + senders[id].promise.setResult(true); // notify the sender of the response + break; + default: + throw new IllegalArgumentException("first byte needs to be either REQ or RSP but not " + req_buf[0]); } - else - promise.setResult(true); } public void viewAccepted(View view) { @@ -54,8 +71,9 @@ public void viewAccepted(View view) { protected void loop() { boolean looping=true; while(looping) { - int c=Util.keyPress(String.format("[1] send [2] num_msgs (%d) [3] msg_size (%d) [o] oob (%b) [b] dont_bundle (%b)[x] exit\n", - num_msgs, msg_size, oob, dont_bundle)); + int c=Util.keyPress(String.format("[1] send [2] num_msgs (%d) [3] senders (%d)\n" + + "[o] oob (%b) [b] dont_bundle (%b) [d] details (%b) [x] exit\n", + num_msgs, num_senders, oob, dont_bundle, details)); try { switch(c) { case '1': @@ -65,11 +83,7 @@ protected void loop() { num_msgs=Util.readIntFromStdin("num_msgs: "); break; case '3': - int tmp=Util.readIntFromStdin("msg_size: "); - if(tmp <= 0) - System.err.printf("msg_size of %d is invalid\n", tmp); - else - msg_size=tmp; + num_senders=Util.readIntFromStdin("num_senders: "); break; case 'o': oob=!oob; @@ -77,6 +91,9 @@ protected void loop() { case 'b': dont_bundle=!dont_bundle; break; + case 'd': + details=!details; + break; case 'x': looping=false; break; @@ -96,34 +113,91 @@ protected void sendRequests() throws Exception { } Address target=Util.pickNext(view.getMembers(), channel.getAddress()); - byte[] buf=new byte[msg_size]; - Average avg=new Average(); - long min=Long.MAX_VALUE, max=0; - int print=num_msgs/10; - System.out.printf("-- sending %d requests to %s (size=%d)\n", num_msgs, target, msg_size); - - for(int i=0; i < num_msgs; i++) { - Message req=new Message(target, buf); - if(oob) - req.setFlag(Message.Flag.OOB); - if(dont_bundle) - req.setFlag(Message.Flag.DONT_BUNDLE); - promise.reset(false); - long start=System.nanoTime(); - channel.send(req); - if(i > 0 && i % print == 0) - System.out.print("."); - promise.getResult(0); - long time_ns=System.nanoTime()-start; - avg.add(time_ns); - min=Math.min(min, time_ns); - max=Math.max(max, time_ns); + final CountDownLatch latch=new CountDownLatch(num_senders +1); + final AtomicInteger sent_msgs=new AtomicInteger(0); + senders=new Sender[num_senders]; + for(int i=0; i < num_senders; i++) { + senders[i]=new Sender((short)i, latch, sent_msgs, target); + senders[i].start(); + } + + long start=System.nanoTime(); + latch.countDown(); // start all sender threads + for(Sender sender: senders) + sender.join(); + long total_time=System.nanoTime()-start; + double msgs_sec=num_msgs / (total_time / 1_000_000_000.0); + + AverageMinMax avg=null; + if(details) + System.out.println(""); + for(Sender sender: senders) { + if(details) + System.out.printf("%d: %s\n", sender.id, print(sender.avg)); + if(avg == null) + avg=sender.avg; + else + avg.merge(sender.avg); } - System.out.println(""); - System.out.printf("\nround-trip = min/avg/max: %.2f / %.2f / %.2f us\n\n", min/1000.0, avg.getAverage() / 1000.0, max/1000.0); + + System.out.printf(Util.bold("\n\nreqs/sec = %.2f, " + + "round-trip = min/avg/max: %.2f / %.2f / %.2f us\n\n"), + msgs_sec, avg.min()/1000.0, avg.average() / 1000.0, avg.max()/1000.0); + } + + protected static String print(AverageMinMax avg) { + return String.format("round-trip min/avg/max = %.2f / %.2f / %.2f us", + avg.min() / 1000.0, avg.average() / 1000.0, avg.max() / 1000.0); } + protected class Sender extends Thread { + protected final short id; + protected final byte[] req_buf=new byte[Global.BYTE_SIZE + Global.SHORT_SIZE]; // request and id + protected final CountDownLatch latch; + protected final AtomicInteger sent_msgs; // current number of messages; senders stop if sent_msgs >= num_msgs + protected final Promise promise=new Promise<>(); // for the sender thread to wait for the response + protected final int print; + protected final Address target; + protected final AverageMinMax avg=new AverageMinMax(); // in ns + + public Sender(short id, CountDownLatch latch, AtomicInteger sent_msgs, Address target) { + this.id=id; + this.latch=latch; + this.sent_msgs=sent_msgs; + this.target=target; + req_buf[0]=REQ; + Bits.writeShort(id, req_buf, 1); // writes id at buf_reqs[1-2] + print=num_msgs / 10; + } + + public void run() { + for(;;) { + int num=sent_msgs.incrementAndGet(); + if(num >= num_msgs) + break; + if(num > 0 && num % print == 0) + System.out.printf("."); + promise.reset(false); + Message req=new Message(target, req_buf); + if(oob) + req.setFlag(Message.Flag.OOB); + if(dont_bundle) + req.setFlag(Message.Flag.DONT_BUNDLE); + try { + long start=System.nanoTime(); + channel.send(req); + promise.getResult(0); + long time=System.nanoTime()-start; + avg.add(time); + } + catch(Exception e) { + e.printStackTrace(); + } + } + } + } + public static void main(String[] args) throws Exception { String props=null, name=null;