Skip to content
Browse files

Added throughput test with multiple threads

  • Loading branch information...
1 parent 4c04903 commit 4602a9ad9462e440cc090764db1545e791b1e4eb @belaban committed May 11, 2012
Showing with 52 additions and 11 deletions.
  1. +52 −11 tests/junit-functional/org/jgroups/protocols/RATE_LIMITER_Test.java
View
63 tests/junit-functional/org/jgroups/protocols/RATE_LIMITER_Test.java
@@ -23,9 +23,17 @@
*/
@Test(groups=Global.TIME_SENSITIVE, sequential=true)
public class RATE_LIMITER_Test {
+ final byte[] buffer=new byte[1];
- public void testThroughput() throws Exception {
- byte[] buffer=new byte[1000];
+ public void testThroughputSingleThreaded() throws Exception {
+ _testThroughput(1);
+ }
+
+ public void testThroughputMultiThreaded() throws Exception {
+ _testThroughput(10);
+ }
+
+ protected void _testThroughput(int num_threads) throws Exception {
RATE_LIMITER limiter=create(10, 100000);
long target_throughput=10000000; // 10MB/s
final CountDownLatch latch=new CountDownLatch(1);
@@ -36,14 +44,7 @@ public void testThroughput() throws Exception {
latch.countDown();
System.out.println("Measuring throughput for 10 seconds:");
- long target=TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS) + System.nanoTime();
- for(;;) {
- Message msg=new Message(false);
- msg.setBuffer(buffer);
- limiter.down(new Event(Event.MSG,msg));
- if(System.nanoTime() > target)
- break;
- }
+ sendMessages(limiter, TimeUnit.MILLISECONDS.convert(10, TimeUnit.SECONDS), num_threads);
throughput.stop();
List<Long> list=new ArrayList<Long>(throughput.getMeasurements());
@@ -64,6 +65,25 @@ public void testThroughput() throws Exception {
}
+ protected void sendMessages(RATE_LIMITER limiter, long duration, int num_threads) {
+ System.out.println("Measuring throughput for 10 seconds (" + num_threads + " threads):");
+ long target_time=TimeUnit.NANOSECONDS.convert(duration, TimeUnit.MILLISECONDS) + System.nanoTime();
+ Sender[] senders=new Sender[num_threads];
+ for(int i=0; i < senders.length; i++) {
+ senders[i]=new Sender(limiter, target_time);
+ senders[i].start();
+ }
+ for(Sender sender: senders) {
+ try {
+ sender.join();
+ }
+ catch(InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+
protected RATE_LIMITER create(long time_period, long max_bytes) throws Exception {
RATE_LIMITER prot=new RATE_LIMITER();
prot.setTimePeriod(time_period);
@@ -74,6 +94,27 @@ protected RATE_LIMITER create(long time_period, long max_bytes) throws Exception
}
+ protected class Sender extends Thread {
+ protected final RATE_LIMITER limiter;
+ protected final long target_time;
+
+ public Sender(RATE_LIMITER limiter, long target_time) {
+ this.limiter=limiter;
+ this.target_time=target_time;
+ }
+
+ public void run() {
+ for(;;) {
+ Message msg=new Message(false);
+ msg.setBuffer(buffer);
+ limiter.down(new Event(Event.MSG,msg));
+ if(System.nanoTime() > target_time)
+ break;
+ }
+ }
+ }
+
+
protected static class Throughput extends Protocol implements Runnable {
protected final AtomicInteger bytes_in_period=new AtomicInteger(0);
protected final Collection<Long> measurements=new ConcurrentLinkedQueue<Long>(); // measurement taken every second
@@ -143,6 +184,6 @@ public void run() {
public static void main(String[] args) throws Exception {
RATE_LIMITER_Test test=new RATE_LIMITER_Test();
- test.testThroughput();
+ test.testThroughputMultiThreaded();
}
}

0 comments on commit 4602a9a

Please sign in to comment.
Something went wrong with that request. Please try again.