Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: belaban/JGroups
base: e316b51
...
head fork: belaban/JGroups
compare: 8f0a71b
Checking mergeability… Don't worry, you can still create the pull request.
  • 2 commits
  • 1 file changed
  • 0 commit comments
  • 1 contributor
Showing with 39 additions and 32 deletions.
  1. +39 −32 src/org/jgroups/protocols/RATE_LIMITER.java
View
71 src/org/jgroups/protocols/RATE_LIMITER.java
@@ -6,8 +6,8 @@
import org.jgroups.stack.Protocol;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
/**
@@ -29,22 +29,47 @@
/** Keeps track of the number of bytes sent in the current time period */
@GuardedBy("lock")
- @ManagedAttribute
+ @ManagedAttribute(description="Number of bytes sent in the current time period. Reset after every time period.")
protected long num_bytes_sent=0L;
@GuardedBy("lock")
- protected long end_of_current_period=0L;
+ protected long end_of_current_period=0L; // ns
protected final Lock lock=new ReentrantLock();
- protected final Condition block=lock.newCondition();
@ManagedAttribute
protected int num_blockings=0;
- @ManagedAttribute
- protected long total_block_time=0L;
+ protected long total_block_time=0L; // ns
+
+ protected volatile boolean running=true;
+
+
+ @ManagedAttribute(description="Total block time in milliseconds")
+ public long getTotalBlockTime() {
+ return TimeUnit.MILLISECONDS.convert(total_block_time, TimeUnit.NANOSECONDS);
+ }
+
+ @ManagedAttribute(description="Average block time in ms (total block time / number of blockings)")
+ public double getAverageBlockTime() {
+ long block_time_ms=getTotalBlockTime();
+ return num_blockings == 0? 0.0 : block_time_ms / (double)num_blockings;
+ }
+
+ public void resetStats() {
+ super.resetStats();
+ num_blockings=0; total_block_time=0;
+ }
+ public void start() throws Exception {
+ super.start();
+ running=true;
+ }
+ public void stop() {
+ running=false;
+ super.stop();
+ }
public Object down(Event evt) {
if(evt.getType() == Event.MSG) {
@@ -59,25 +84,22 @@ public Object down(Event evt) {
max_bytes=len;
}
- while(true) {
+ while(running) {
boolean size_exceeded=num_bytes_sent + len >= max_bytes,
- time_exceeded=System.currentTimeMillis() > end_of_current_period;
+ time_exceeded=System.nanoTime() >= end_of_current_period;
if(!size_exceeded && !time_exceeded)
break;
if(time_exceeded) {
- reset();
+ num_bytes_sent=0L;
+ end_of_current_period=System.nanoTime() + TimeUnit.NANOSECONDS.convert(time_period, TimeUnit.MILLISECONDS);
}
else { // size exceeded
- long block_time=end_of_current_period - System.currentTimeMillis();
+ long block_time=end_of_current_period - System.nanoTime();
if(block_time > 0) {
- try {
- block.await(block_time, TimeUnit.MILLISECONDS);
- num_blockings++;
- total_block_time+=block_time;
- }
- catch(InterruptedException e) {
- }
+ LockSupport.parkNanos(block_time);
+ num_blockings++;
+ total_block_time+=block_time;
}
}
}
@@ -100,20 +122,5 @@ public void init() throws Exception {
throw new IllegalArgumentException("time_period needs to be positive");
}
- public void stop() {
- super.stop();
- lock.lock();
- try {
- reset();
- }
- finally {
- lock.unlock();
- }
- }
- protected void reset() {
- num_bytes_sent=0L;
- end_of_current_period=System.currentTimeMillis() + time_period;
- block.signalAll();
- }
}

No commit comments for this range

Something went wrong with that request. Please try again.