Skip to content
Browse files

started implementation of message sending

  • Loading branch information...
1 parent 79c6060 commit 49d981e5dd2f90f3b6a2e00dca1d77a92224a20d @belaban committed Dec 9, 2011
Showing with 120 additions and 65 deletions.
  1. +120 −65 tests/perf/org/jgroups/tests/perf/MPerf.java
View
185 tests/perf/org/jgroups/tests/perf/MPerf.java
@@ -15,8 +15,8 @@
import java.lang.reflect.Field;
import java.text.NumberFormat;
import java.util.*;
-import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Dynamic tool to measure multicast performance of JGroups; every member sends N messages and we measure how long it
@@ -32,28 +32,27 @@
protected Address local_addr=null;
protected int num_msgs=1000 * 1000;
- protected int warmup=200000;
protected int msg_size=1000;
protected int num_threads=1;
protected int log_interval=num_msgs / 10; // log every 10%
/** Maintains stats per sender, will be sent to perf originator when all messages have been received */
- protected final Map<Address,MemberInfo> senders=Util.createConcurrentMap();
+ protected final Map<Address,Result> received_msgs=Util.createConcurrentMap();
protected final List<Address> members=new ArrayList<Address>();
protected final Log log=LogFactory.getLog(getClass());
protected boolean looping=true;
/** Map<Object, MemberInfo>. A hashmap of senders, each value is the 'senders' hashmap */
protected final Map<Object,MemberInfo> results=Util.createConcurrentMap();
protected FileWriter output;
- protected static final NumberFormat f=NumberFormat.getNumberInstance();
+ protected static final NumberFormat format=NumberFormat.getNumberInstance();
protected static final short ID=ClassConfigurator.getProtocolId(MPerf.class);
static {
- f.setGroupingUsed(false);
- f.setMaximumFractionDigits(2);
+ format.setGroupingUsed(false);
+ format.setMaximumFractionDigits(2);
}
@@ -88,22 +87,24 @@ public void start(String props, String name) throws Exception {
// send a CONFIG_REQ to the current coordinator, so we can get the current config
Address coord=channel.getView().getMembers().get(0);
if(coord != null && !local_addr.equals(coord))
- sendConfigRequest(coord);
+ send(coord,null,MPerfHeader.CONFIG_REQ,true);
}
protected void loop() {
int c;
final String INPUT="[1] Send [2] View\n" +
- "[3] Set num msgs (%d) [4] Set warmup (%d) [5] Set msg size (%s) [6] Set threads (%d)\n" +
+ "[3] Set num msgs (%d) [4] Set msg size (%s) [5] Set threads (%d)\n" +
"[x] Exit this [X] Exit all ";
while(looping) {
try {
- c=Util.keyPress(String.format(INPUT, num_msgs,warmup, Util.printBytes(msg_size), num_threads));
+ c=Util.keyPress(String.format(INPUT, num_msgs, Util.printBytes(msg_size), num_threads));
switch(c) {
case '1':
+ send(null, null, MPerfHeader.CLEAR_RESULTS, true); // clear all results (from prev runs) first
+ send(null, null, MPerfHeader.START_SENDING, false);
break;
case '2':
System.out.println("view: " + channel.getView() + " (local address=" + channel.getAddress() + ")");
@@ -112,19 +113,16 @@ protected void loop() {
configChange("num_msgs");
break;
case '4':
- configChange("warmup");
- break;
- case '5':
configChange("msg_size");
break;
- case '6':
+ case '5':
configChange("num_threads");
break;
case 'x':
looping=false;
break;
case 'X':
- sendExit();
+ send(null,null,MPerfHeader.EXIT,false);
break;
}
}
@@ -137,30 +135,19 @@ protected void loop() {
protected void configChange(String name) throws Exception {
int tmp=Util.readIntFromStdin(name + ": ");
+ if(tmp < 1)
+ throw new IllegalArgumentException("illegal value");
ConfigChange change=new ConfigChange(name, tmp);
- Message msg=new Message(null, null, change);
- msg.setFlag(Message.Flag.RSVP);
- msg.putHeader(ID,new MPerfHeader(MPerfHeader.CONFIG_CHANGE));
- channel.send(msg);
- }
-
- protected void sendExit() throws Exception {
- Message msg=new Message();
- msg.putHeader(ID, new MPerfHeader(MPerfHeader.EXIT));
- channel.send(msg);
+ send(null,change,MPerfHeader.CONFIG_CHANGE,true);
}
- // Send a CONFIG_REQ to the current coordinator
- protected void sendConfigRequest(Address coord) throws Exception {
- Message msg=new Message(coord);
- msg.setFlag(Message.Flag.RSVP);
- msg.putHeader(ID, new MPerfHeader(MPerfHeader.CONFIG_REQ));
- channel.send(msg);
- }
- protected void sendConfigurationResponse(Address target, Configuration cfg) throws Exception {
- Message msg=new Message(target, null, cfg);
- msg.putHeader(ID, new MPerfHeader(MPerfHeader.CONFIG_RSP));
+ protected void send(Address target, Object payload, byte header, boolean rsvp) throws Exception {
+ Message msg=new Message(target, null, payload);
+ if(rsvp)
+ msg.setFlag(Message.Flag.RSVP);
+ if(header > 0)
+ msg.putHeader(ID, new MPerfHeader(header));
channel.send(msg);
}
@@ -203,6 +190,16 @@ public void receive(Message msg) {
MPerfHeader hdr=(MPerfHeader)msg.getHeader(ID);
switch(hdr.type) {
case MPerfHeader.DATA:
+ // System.out.println("<< received " + Util.printBytes(msg.getLength()) + " from " + msg.getSrc());
+ break;
+
+ case MPerfHeader.START_SENDING:
+ sendMessages();
+ break;
+
+ case MPerfHeader.CLEAR_RESULTS:
+ for(Result result: received_msgs.values())
+ result.reset();
break;
case MPerfHeader.CONFIG_CHANGE:
@@ -252,10 +249,9 @@ protected void handleConfigChange(ConfigChange config_change) {
protected void handleConfigRequest(Address sender) throws Exception {
Configuration cfg=new Configuration();
cfg.addChange("num_msgs", num_msgs);
- cfg.addChange("warmup", warmup);
cfg.addChange("msg_size", msg_size);
cfg.addChange("num_threads", num_threads);
- sendConfigurationResponse(sender, cfg);
+ send(sender,cfg,MPerfHeader.CONFIG_RSP,false);
}
protected void handleConfigResponse(Configuration cfg) {
@@ -269,38 +265,59 @@ public void viewAccepted(View view) {
System.out.println("** " + view);
}
+ protected void sendMessages() {
+ final AtomicInteger num_msgs_sent=new AtomicInteger(0); // all threads will increment this
+ final Sender[] senders=new Sender[num_threads];
+ final CyclicBarrier barrier=new CyclicBarrier(num_threads +1);
+ final byte[] payload=new byte[msg_size];
+
+ for(int i=0; i < num_threads; i++) {
+ senders[i]=new Sender(barrier, num_msgs_sent, payload);
+ senders[i].setName("sender-" + i);
+ senders[i].start();
+ }
+ try {
+ System.out.println("-- sending " + num_msgs + " msgs");
+ barrier.await();
+ }
+ catch(Exception e) {
+ System.err.println("failed triggering send threads: " + e);
+ }
+
+ // for(Sender sender: senders) {try {sender.join();} catch(InterruptedException e) {}}
+
+
+
+ }
+
protected class Sender extends Thread {
- CyclicBarrier barrier;
- byte[] payload;
- long total_msgs=0;
- long thread_interval;
+ protected final CyclicBarrier barrier;
+ protected final AtomicInteger num_msgs_sent;
+ protected final byte[] payload;
- Sender(CyclicBarrier barrier, byte[] payload, long thread_interval) {
+ protected Sender(CyclicBarrier barrier, AtomicInteger num_msgs_sent, byte[] payload) {
this.barrier=barrier;
+ this.num_msgs_sent=num_msgs_sent;
this.payload=payload;
- this.thread_interval=thread_interval;
}
public void run() {
try {
barrier.await();
}
- catch(InterruptedException e) {
+ catch(Exception e) {
e.printStackTrace();
return;
}
- catch(BrokenBarrierException e) {
- e.printStackTrace();
- return;
- }
- System.out.println("-- [" + getName() + "] sending " + num_msgs + " msgs");
- for(int i=0; i < num_msgs; i++) {
+
+ for(;;) {
try {
- channel.send(null, payload);
- total_msgs++;
- if(total_msgs % log_interval == 0) {
- System.out.println("++ sent " + total_msgs + " [" + getName() + "]");
- }
+ send(null, payload, MPerfHeader.DATA, false);
+ int tmp=num_msgs_sent.incrementAndGet();
+ if(tmp > num_msgs)
+ break;
+ if(tmp % log_interval == 0)
+ System.out.println("++ sent " + tmp + " [" + getName() + "]");
}
catch(Exception e) {
}
@@ -336,7 +353,7 @@ private void dumpResults(Map<Object,MemberInfo> final_results) {
combined_msgs_sec=tmp / final_results.size();
combined_tp=(long)combined_msgs_sec * msg_size;
- sb.append("\ncombined: ").append(f.format(combined_msgs_sec)).
+ sb.append("\ncombined: ").append(format.format(combined_msgs_sec)).
append(" msgs/sec averaged over all receivers (throughput=" + Util.printBytes(combined_tp) + "/sec)\n");
System.out.println(sb.toString());
output(sb.toString());
@@ -380,7 +397,7 @@ private String dumpStats(long received_msgs) {
msgs_sec=322649;
throughput_sec=msgs_sec * msg_size;
- sb.append(f.format(msgs_sec)).append(' ').append(f.format(throughput_sec)).append(' ');
+ sb.append(format.format(msgs_sec)).append(' ').append(format.format(throughput_sec)).append(' ');
sb.append(Runtime.getRuntime().freeMemory() / 1000.0).append(' ');
@@ -491,16 +508,54 @@ public void readFrom(DataInput in) throws Exception {
}
}
+ protected static class Result implements Streamable {
+ protected long start=0;
+ protected long stop=0; // done when > 0
+ protected long num_msgs_received=0;
+ protected long num_bytes_received=0;
+
+ public void reset() {
+ start=stop=num_msgs_received=num_bytes_received=0;
+ }
+
+ public boolean isDone() {return stop > 0;}
+
+ public int size() {
+ return Util.size(start) + Util.size(stop) + Util.size(num_msgs_received) + Util.size(num_bytes_received);
+ }
+
+ public void add(long bytes) {
+ num_bytes_received+=bytes;
+ num_msgs_received++;
+ }
+
+ public void writeTo(DataOutput out) throws Exception {
+ Util.writeLong(start, out);
+ Util.writeLong(stop, out);
+ Util.writeLong(num_msgs_received, out);
+ Util.writeLong(num_bytes_received, out);
+ }
+
+ public void readFrom(DataInput in) throws Exception {
+ start=Util.readLong(in);
+ stop=Util.readLong(in);
+ num_msgs_received=Util.readLong(in);
+ num_bytes_received=Util.readLong(in);
+ }
+ }
+
protected static class MPerfHeader extends Header {
- protected static final byte DATA = 1;
- protected static final byte START_SENDING = 2;
- protected static final byte SENDING_DONE = 3;
- protected static final byte RESULTS_REQ = 4;
- protected static final byte RESULTS_RSP = 5;
- protected static final byte CONFIG_CHANGE = 6;
- protected static final byte CONFIG_REQ = 7;
- protected static final byte CONFIG_RSP = 8;
- protected static final byte EXIT = 9;
+ protected static final byte DATA = 1;
+ protected static final byte START_SENDING = 2;
+ protected static final byte SENDING_DONE = 3;
+ protected static final byte RESULTS_REQ = 4;
+ protected static final byte RESULTS_RSP = 5;
+ protected static final byte CLEAR_RESULTS = 6;
+ protected static final byte CONFIG_CHANGE = 7;
+ protected static final byte CONFIG_REQ = 8;
+ protected static final byte CONFIG_RSP = 9;
+ protected static final byte EXIT = 10;
+
protected byte type;

0 comments on commit 49d981e

Please sign in to comment.
Something went wrong with that request. Please try again.