Permalink
Browse files

#292 - Added scheduleOne and re-created unit tests

  • Loading branch information...
1 parent 21dc177 commit fe22944fe9be2c25b18ced5536a37cfcbf14d0ef @momania momania committed Jul 1, 2010
Showing with 47 additions and 23 deletions.
  1. +20 −4 akka-core/src/main/scala/actor/Scheduler.scala
  2. +27 −19 akka-core/src/test/scala/SchedulerSpec.scala
@@ -15,10 +15,9 @@
*/
package se.scalablesolutions.akka.actor
+import _root_.scala.collection.JavaConversions
import java.util.concurrent._
-import se.scalablesolutions.akka.config.ScalaConfig._
-import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
import se.scalablesolutions.akka.util.Logging
object Scheduler {
@@ -35,6 +34,7 @@ object Scheduler {
val future = service.scheduleAtFixedRate(
new Runnable { def run = receiver ! message },
initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
+ createAndStoreScheduleActorForFuture(future)
val scheduler = actorOf(new ScheduleActor(future)).start
schedulers.put(scheduler, scheduler)
scheduler
@@ -43,6 +43,22 @@ object Scheduler {
}
}
+ def scheduleOnce(receiver: ActorRef, message: AnyRef, delay: Long, timeUnit: TimeUnit): ActorRef = {
+ try {
+ val future = service.schedule(
+ new Runnable { def run = receiver ! message }, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
+ createAndStoreScheduleActorForFuture(future)
+ } catch {
+ case e => throw SchedulerException(message + " could not be scheduled on " + receiver, e)
+ }
+ }
+
+ private def createAndStoreScheduleActorForFuture(future: ScheduledFuture[AnyRef]): ActorRef = {
+ val scheduler = actorOf(new ScheduleActor(future)).start
+ schedulers.put(scheduler, scheduler)
+ scheduler
+ }
+
def unschedule(scheduleActor: ActorRef) = {
scheduleActor ! UnSchedule
schedulers.remove(scheduleActor)
@@ -65,7 +81,7 @@ private class ScheduleActor(future: ScheduledFuture[AnyRef]) extends Actor with
def receive = {
case Scheduler.UnSchedule =>
future.cancel(true)
- exit
+ self.stop
}
}
@@ -79,4 +95,4 @@ private object SchedulerThreadFactory extends ThreadFactory {
thread.setDaemon(true)
thread
}
-}
+}
@@ -1,30 +1,38 @@
package se.scalablesolutions.akka.actor
-import java.util.concurrent.TimeUnit
-
import org.scalatest.junit.JUnitSuite
-import org.junit.Test
-
import Actor._
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+import org.junit.{After, Test}
class SchedulerSpec extends JUnitSuite {
+
+ @Test def schedulerShouldScheduleMoreThanOnce = {
+
+ case object Tick
+ val countDownLatch = new CountDownLatch(3)
+ val tickActor = actor {
+ case Tick => countDownLatch.countDown
+ }
+ // run every 50 millisec
+ Scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS)
+
+ // after max 1 second it should be executed at least the 3 times already
+ assert(countDownLatch.await(1, TimeUnit.SECONDS))
+ }
- @Test def schedulerShouldSchedule = {
-/*
- var count = 0
+ @Test def schedulerShouldScheduleOnce = {
case object Tick
- val actor = new Actor() {
- def receive = {
- case Tick => count += 1
- }}
- actor.start
- Thread.sleep(1000)
- Scheduler.schedule(actor, Tick, 0L, 1L, TimeUnit.SECONDS)
- Thread.sleep(5000)
- Scheduler.stop
- assert(count > 0)
+ val countDownLatch = new CountDownLatch(2)
+ val tickActor = actor {
+ case Tick => countDownLatch.countDown
+ }
+ // run every 50 millisec
+ Scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)
-*/
- assert(true)
+ // after 1 second the wait should fail
+ assert(countDownLatch.await(1, TimeUnit.SECONDS) == false)
+ // should still be 1 left
+ assert(countDownLatch.getCount == 1)
}
}

0 comments on commit fe22944

Please sign in to comment.