Skip to content

Commit

Permalink
simplify HWT logic and make better use of inaccuracy, see #2354
Browse files Browse the repository at this point in the history
- use actual time after wake-up to determine runnable tasks, not
  precalculated and aimed-for deadline
  • Loading branch information
rkuhn authored and viktorklang committed Aug 6, 2012
1 parent 10ed24d commit cb4e353
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public class HashedWheelTimer implements Timer {
final ReusableIterator<HashedWheelTimeout>[] iterators;
final int mask;
final ReadWriteLock lock = new ReentrantReadWriteLock();
final boolean isWindows = System.getProperty("os.name", "").toLowerCase().indexOf("win") >= 0;
volatile int wheelCursor;
private LoggingAdapter logger;

Expand Down Expand Up @@ -265,23 +266,18 @@ public Timeout newTimeout(TimerTask task, Duration delay) {
}

void scheduleTimeout(HashedWheelTimeout timeout, long delay) {
// delay must be equal to or greater than tickDuration so that the
// worker thread never misses the timeout.
if (delay < tickDuration) {
delay = tickDuration;
}

// Prepare the required parameters to schedule the timeout object.
final long lastRoundDelay = delay % roundDuration;
final long lastTickDelay = delay % tickDuration;
final long relativeIndex = lastRoundDelay / tickDuration + (lastTickDelay != 0? 1 : 0);
final long remainingRounds = delay / roundDuration - (delay % roundDuration == 0? 1 : 0);
long relativeIndex = (delay + tickDuration - 1) / tickDuration;
if (relativeIndex == 0) {
relativeIndex = 1;
}
final long remainingRounds = relativeIndex / wheel.length;

// Add the timeout to the wheel.
lock.readLock().lock();
try {
if (shutdown) throw new IllegalStateException("cannot enqueue after shutdown");
int stopIndex = (int) (wheelCursor + relativeIndex & mask);
int stopIndex = (int) ((wheelCursor + relativeIndex) & mask);
timeout.stopIndex = stopIndex;
timeout.remainingRounds = remainingRounds;
wheel[stopIndex].add(timeout);
Expand Down Expand Up @@ -387,30 +383,35 @@ private void notifyExpiredTimeouts(
}

private long waitForNextTick() {
long deadline = startTime + tickDuration * tick;
final long deadline = startTime + tickDuration * tick;

for (;;) {
final long currentTime = System.nanoTime();
final long sleepTime = (tickDuration * tick - (currentTime - startTime));

if (sleepTime <= 0) {
break;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

if (sleepTimeMs <= 0) {
tick += 1;
return currentTime;
}

// Check if we run on windows, as if thats the case we will need
// to round the sleepTime as workaround for a bug that only affect
// the JVM if it runs on windows.
//
// See https://github.com/netty/netty/issues/356
if (isWindows) {
sleepTimeMs = (sleepTimeMs / 10) * 10;
}

try {
long milliSeconds = TimeUnit.NANOSECONDS.toMillis(sleepTime);
int nanoSeconds = (int) (sleepTime - (milliSeconds * 1000000));
Thread.sleep(milliSeconds, nanoSeconds);
Thread.sleep(sleepTimeMs);
} catch (InterruptedException e) {
if (shutdown()) {
return -1;
}
}
}

// Increase the tick.
tick ++;
return deadline;
}
}

Expand Down
2 changes: 1 addition & 1 deletion akka-actor/src/main/scala/akka/dispatch/Future.scala
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ object Futures {

/**
* Java API.
Reduces the results of the supplied futures and binary function.
* Reduces the results of the supplied futures and binary function.
*/
def reduce[T <: AnyRef, R >: T](futures: JIterable[Future[T]], fun: akka.japi.Function2[R, T, R], executor: ExecutionContext): Future[R] =
Future.reduce[T, R](scala.collection.JavaConversions.iterableAsScalaIterable(futures))(fun.apply)(executor)
Expand Down

0 comments on commit cb4e353

Please sign in to comment.