Permalink
Browse files

Implemented HashedWheelTimer as the default scheduling mechanism in A…

…kka. Fixes #1291
  • Loading branch information...
1 parent b2d548b commit 896c906d03b8096d31c78f125d4279f6903749c5 @henrikengstrom henrikengstrom committed Nov 9, 2011
Showing with 3,318 additions and 132 deletions.
  1. +2 −0 .history
  2. +17 −15 akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala
  3. +88 −0 akka-actor/src/main/java/org/jboss/netty/akka/logging/AbstractInternalLogger.java
  4. +42 −0 akka-actor/src/main/java/org/jboss/netty/akka/logging/InternalLogLevel.java
  5. +102 −0 akka-actor/src/main/java/org/jboss/netty/akka/logging/InternalLogger.java
  6. +155 −0 akka-actor/src/main/java/org/jboss/netty/akka/logging/InternalLoggerFactory.java
  7. +93 −0 akka-actor/src/main/java/org/jboss/netty/akka/logging/JdkLogger.java
  8. +38 −0 akka-actor/src/main/java/org/jboss/netty/akka/logging/JdkLoggerFactory.java
  9. +64 −0 akka-actor/src/main/java/org/jboss/netty/akka/util/DebugUtil.java
  10. +555 −0 akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java
  11. +74 −0 akka-actor/src/main/java/org/jboss/netty/akka/util/MapBackedSet.java
  12. +59 −0 akka-actor/src/main/java/org/jboss/netty/akka/util/ThreadNameDeterminer.java
  13. +131 −0 akka-actor/src/main/java/org/jboss/netty/akka/util/ThreadRenamingRunnable.java
  14. +56 −0 akka-actor/src/main/java/org/jboss/netty/akka/util/Timeout.java
  15. +54 −0 akka-actor/src/main/java/org/jboss/netty/akka/util/Timer.java
  16. +37 −0 akka-actor/src/main/java/org/jboss/netty/akka/util/TimerTask.java
  17. +1,418 −0 akka-actor/src/main/java/org/jboss/netty/akka/util/internal/ConcurrentIdentityHashMap.java
  18. +27 −0 akka-actor/src/main/java/org/jboss/netty/akka/util/internal/ReusableIterator.java
  19. +63 −0 akka-actor/src/main/java/org/jboss/netty/akka/util/internal/SharedResourceMisuseDetector.java
  20. +85 −0 akka-actor/src/main/java/org/jboss/netty/akka/util/internal/StackTraceSimplifier.java
  21. +89 −0 akka-actor/src/main/java/org/jboss/netty/akka/util/internal/SystemPropertyUtil.java
  22. +9 −4 akka-actor/src/main/scala/akka/AkkaApplication.scala
  23. +3 −5 akka-actor/src/main/scala/akka/actor/ActorCell.scala
  24. +6 −7 akka-actor/src/main/scala/akka/actor/FSM.scala
  25. +48 −98 akka-actor/src/main/scala/akka/actor/Scheduler.scala
  26. +2 −2 akka-actor/src/main/scala/akka/dispatch/Future.scala
  27. +1 −1 akka-remote/src/main/scala/akka/remote/Gossiper.scala
View
@@ -1,2 +1,4 @@
update
reload
+projects
+exit
@@ -4,21 +4,21 @@ import org.scalatest.BeforeAndAfterEach
import akka.testkit.TestEvent._
import akka.testkit.EventFilter
import org.multiverse.api.latches.StandardLatch
-import java.util.concurrent.{ ScheduledFuture, ConcurrentLinkedQueue, CountDownLatch, TimeUnit }
+import java.util.concurrent.{ ConcurrentLinkedQueue, CountDownLatch, TimeUnit }
import akka.testkit.AkkaSpec
+import org.jboss.netty.akka.util.{ Timeout TimeOut }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
- private val futures = new ConcurrentLinkedQueue[ScheduledFuture[AnyRef]]()
+ private val timeouts = new ConcurrentLinkedQueue[TimeOut]()
- def collectFuture(f: ScheduledFuture[AnyRef]): ScheduledFuture[AnyRef] = {
- val future = f
- futures.add(future)
- future
+ def collectTimeout(t: TimeOut): TimeOut = {
+ timeouts.add(t)
+ t
}
override def afterEach {
- while (futures.peek() ne null) { Option(futures.poll()).foreach(_.cancel(true)) }
+ while (timeouts.peek() ne null) { Option(timeouts.poll()).foreach(_.cancel()) }
}
"A Scheduler" must {
@@ -30,14 +30,14 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
def receive = { case Tick countDownLatch.countDown() }
})
// run every 50 millisec
- collectFuture(app.scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS))
+ collectTimeout(app.scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS))
// after max 1 second it should be executed at least the 3 times already
assert(countDownLatch.await(1, TimeUnit.SECONDS))
val countDownLatch2 = new CountDownLatch(3)
- collectFuture(app.scheduler.schedule(() countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS))
+ collectTimeout(app.scheduler.schedule(() countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS))
// after max 1 second it should be executed at least the 3 times already
assert(countDownLatch2.await(2, TimeUnit.SECONDS))
@@ -49,9 +49,10 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
val tickActor = actorOf(new Actor {
def receive = { case Tick countDownLatch.countDown() }
})
+
// run every 50 millisec
- collectFuture(app.scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS))
- collectFuture(app.scheduler.scheduleOnce(() countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS))
+ collectTimeout(app.scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS))
+ collectTimeout(app.scheduler.scheduleOnce(() countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS))
// after 1 second the wait should fail
assert(countDownLatch.await(2, TimeUnit.SECONDS) == false)
@@ -87,9 +88,10 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
})
(1 to 10).foreach { i
- val future = collectFuture(app.scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS))
- future.cancel(true)
+ val timeout = collectTimeout(app.scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS))
+ timeout.cancel()
}
+
assert(ticks.await(3, TimeUnit.SECONDS) == false) //No counting down should've been made
}
@@ -114,9 +116,9 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
})
val actor = (supervisor ? props).as[ActorRef].get
- collectFuture(app.scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS))
+ collectTimeout(app.scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS))
// appx 2 pings before crash
- collectFuture(app.scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS))
+ collectTimeout(app.scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS))
assert(restartLatch.tryAwait(2, TimeUnit.SECONDS))
// should be enough time for the ping countdown to recover and reach 6 pings
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *
+ * Red Hat licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jboss.netty.akka.logging;
+
+/**
+ * A skeletal implementation of {@link InternalLogger}. This class implements
+ * all methods that have a {@link InternalLogLevel} parameter by default to call
+ * specific logger methods such as {@link #info(String)} or {@link #isInfoEnabled()}.
+ *
+ * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
+ * @author <a href="http://gleamynode.net/">Trustin Lee</a>
+ * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $
+ */
+public abstract class AbstractInternalLogger implements InternalLogger {
+
+ /**
+ * Creates a new instance.
+ */
+ protected AbstractInternalLogger() {
+ super();
+ }
+
+ public boolean isEnabled(InternalLogLevel level) {
+ switch (level) {
+ case DEBUG:
+ return isDebugEnabled();
+ case INFO:
+ return isInfoEnabled();
+ case WARN:
+ return isWarnEnabled();
+ case ERROR:
+ return isErrorEnabled();
+ default:
+ throw new Error();
+ }
+ }
+
+ public void log(InternalLogLevel level, String msg, Throwable cause) {
+ switch (level) {
+ case DEBUG:
+ debug(msg, cause);
+ break;
+ case INFO:
+ info(msg, cause);
+ break;
+ case WARN:
+ warn(msg, cause);
+ break;
+ case ERROR:
+ error(msg, cause);
+ break;
+ default:
+ throw new Error();
+ }
+ }
+
+ public void log(InternalLogLevel level, String msg) {
+ switch (level) {
+ case DEBUG:
+ debug(msg);
+ break;
+ case INFO:
+ info(msg);
+ break;
+ case WARN:
+ warn(msg);
+ break;
+ case ERROR:
+ error(msg);
+ break;
+ default:
+ throw new Error();
+ }
+ }
+}
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *
+ * Red Hat licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jboss.netty.akka.logging;
+
+/**
+ * The log level that {@link InternalLogger} can log at.
+ *
+ * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
+ * @author <a href="http://gleamynode.net/">Trustin Lee</a>
+ * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $
+ */
+public enum InternalLogLevel {
+ /**
+ * 'DEBUG' log level.
+ */
+ DEBUG,
+ /**
+ * 'INFO' log level.
+ */
+ INFO,
+ /**
+ * 'WARN' log level.
+ */
+ WARN,
+ /**
+ * 'ERROR' log level.
+ */
+ ERROR;
+}
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ *
+ * Red Hat licenses this file to you under the Apache License, version 2.0
+ * (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jboss.netty.akka.logging;
+
+/**
+ * <em>Internal-use-only</em> logger used by Netty. <strong>DO NOT</strong>
+ * access this class outside of Netty.
+ *
+ * @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
+ * @author <a href="http://gleamynode.net/">Trustin Lee</a>
+ *
+ * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $
+ */
+public interface InternalLogger {
+ /**
+ * Returns {@code true} if a DEBUG level message is logged.
+ */
+ boolean isDebugEnabled();
+
+ /**
+ * Returns {@code true} if an INFO level message is logged.
+ */
+ boolean isInfoEnabled();
+
+ /**
+ * Returns {@code true} if a WARN level message is logged.
+ */
+ boolean isWarnEnabled();
+
+ /**
+ * Returns {@code true} if an ERROR level message is logged.
+ */
+ boolean isErrorEnabled();
+
+ /**
+ * Returns {@code true} if the specified log level message is logged.
+ */
+ boolean isEnabled(InternalLogLevel level);
+
+ /**
+ * Logs a DEBUG level message.
+ */
+ void debug(String msg);
+
+ /**
+ * Logs a DEBUG level message.
+ */
+ void debug(String msg, Throwable cause);
+
+ /**
+ * Logs an INFO level message.
+ */
+ void info(String msg);
+
+ /**
+ * Logs an INFO level message.
+ */
+ void info(String msg, Throwable cause);
+
+ /**
+ * Logs a WARN level message.
+ */
+ void warn(String msg);
+
+ /**
+ * Logs a WARN level message.
+ */
+ void warn(String msg, Throwable cause);
+
+ /**
+ * Logs an ERROR level message.
+ */
+ void error(String msg);
+
+ /**
+ * Logs an ERROR level message.
+ */
+ void error(String msg, Throwable cause);
+
+ /**
+ * Logs a message.
+ */
+ void log(InternalLogLevel level, String msg);
+
+ /**
+ * Logs a message.
+ */
+ void log(InternalLogLevel level, String msg, Throwable cause);
+}
Oops, something went wrong.

0 comments on commit 896c906

Please sign in to comment.