Skip to content

Commit

Permalink
Code optimizations in RATE_LIMITER and throughput test
Browse files Browse the repository at this point in the history
  • Loading branch information
belaban committed May 11, 2012
1 parent 3bccdf3 commit 4c04903
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 24 deletions.
33 changes: 13 additions & 20 deletions src/org/jgroups/protocols/RATE_LIMITER.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ public class RATE_LIMITER extends Protocol {


@Property(description="Max number of bytes to be sent in time_period ms. Blocks the sender if exceeded until a new " + @Property(description="Max number of bytes to be sent in time_period ms. Blocks the sender if exceeded until a new " +
"time period has started") "time period has started")
protected long max_bytes=500000; protected long max_bytes=300000;


@Property(description="Number of milliseconds during which max_bytes bytes can be sent") @Property(description="Number of milliseconds during which max_bytes bytes can be sent")
protected long time_period=1000L; protected long time_period=10L;


protected long time_period_ns=TimeUnit.NANOSECONDS.convert(time_period, TimeUnit.MILLISECONDS); protected long time_period_ns;




/** Keeps track of the number of bytes sent in the current time period */ /** Keeps track of the number of bytes sent in the current time period */
Expand Down Expand Up @@ -86,6 +86,7 @@ public void init() throws Exception {
super.init(); super.init();
if(time_period <= 0) if(time_period <= 0)
throw new IllegalArgumentException("time_period needs to be positive"); throw new IllegalArgumentException("time_period needs to be positive");
time_period_ns=TimeUnit.NANOSECONDS.convert(time_period, TimeUnit.MILLISECONDS);
} }


public void start() throws Exception { public void start() throws Exception {
Expand Down Expand Up @@ -115,25 +116,17 @@ public Object down(Event evt) {
max_bytes=len; max_bytes=len;
} }


while(running) { if(num_bytes_sent_in_period + len > max_bytes) { // size exceeded
long current_time=System.nanoTime(); long current_time=System.nanoTime();
boolean size_exceeded=num_bytes_sent_in_period + len > max_bytes, if(current_time < end_of_current_period) {
time_exceeded=current_time >= end_of_current_period; long block_time=end_of_current_period - current_time;
if(!size_exceeded && !time_exceeded) LockSupport.parkNanos(block_time);
break; num_blockings++;

total_block_time+=block_time;
if(time_exceeded) { current_time=end_of_current_period; // more or less, avoid having to call nanoTime() again
num_bytes_sent_in_period=0L;
end_of_current_period=current_time + time_period_ns;
}
else { // size exceeded
long block_time=current_time - end_of_current_period;
if(block_time > 0) {
LockSupport.parkNanos(block_time);
num_blockings++;
total_block_time+=block_time;
}
} }
end_of_current_period=current_time + time_period_ns; // start a new time period
num_bytes_sent_in_period=0;
} }
} }
finally { finally {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport; import java.util.concurrent.locks.LockSupport;




Expand All @@ -26,7 +25,7 @@
public class RATE_LIMITER_Test { public class RATE_LIMITER_Test {


public void testThroughput() throws Exception { public void testThroughput() throws Exception {
byte[] buffer=new byte[1]; byte[] buffer=new byte[1000];
RATE_LIMITER limiter=create(10, 100000); RATE_LIMITER limiter=create(10, 100000);
long target_throughput=10000000; // 10MB/s long target_throughput=10000000; // 10MB/s
final CountDownLatch latch=new CountDownLatch(1); final CountDownLatch latch=new CountDownLatch(1);
Expand Down Expand Up @@ -58,9 +57,9 @@ public void testThroughput() throws Exception {


// 50% deviation below is ok: e.g. max_bytes=100000, and sending 50001 bytes ! // 50% deviation below is ok: e.g. max_bytes=100000, and sending 50001 bytes !
// In the real setup, there a warning if the system is not configured for max_bytes to be a multiple of frag_size // In the real setup, there a warning if the system is not configured for max_bytes to be a multiple of frag_size
long min_value=(long)(target_throughput * 0.45); long min_value=(long)(target_throughput * 0.45), max_value=(long)(target_throughput * 1.1);
for(long value: list) { for(long value: list) {
assert value >= min_value && value <= target_throughput; assert value >= min_value && value <= max_value;
} }
} }


Expand Down

0 comments on commit 4c04903

Please sign in to comment.