Skip to content

Commit

Permalink
Scheduler that manually advances time (#
Browse files Browse the repository at this point in the history
First version for review. More tests, docs and Java examples still TBD.
  • Loading branch information
raboof committed Jan 4, 2018
1 parent bd2a3de commit bbedcb2
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package akka.actor.typed

import scala.concurrent.duration._

import akka.actor.typed.scaladsl.Actor
import akka.testkit.typed.TestKit
import akka.testkit.typed.scaladsl.{ ManualTime, TestProbe }
import org.scalatest.WordSpecLike

class ManualTimerSpec extends TestKit() with ManualTime with WordSpecLike {
//#manual-scheduling-simple
"A timer" must {
"schedule non-repeated ticks" in {
case object Tick
case object Tock

val probe = TestProbe[Tock.type]()
val behv = Actor.withTimers[Tick.type] { timer
timer.startSingleTimer("T", Tick, 10.millis)
Actor.immutable { (ctx, Tick)
probe.ref ! Tock
Actor.same
}
}

val ref = spawn(behv)

scheduler.timePasses(9.millis)
probe.expectNoMsg(Duration.Zero)

scheduler.timePasses(2.millis)
probe.expectMsg(Tock)
probe.expectNoMsg(Duration.Zero)
}
}
//#manual-scheduling-simple
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,15 @@ object AskPattern {
* val f: Future[Reply] = target ? (Request("hello", _))
* }}}
*/
def ?[U](f: ActorRef[U] T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] =
def ?[U](f: ActorRef[U] T)(implicit timeout: Timeout, scheduler: Scheduler): Future[U] = {
// We do not currently use the implicit scheduler, but want to require it
// because it might be needed when we move to a 'native' typed runtime, see #24219
ref match {
case a: adapt.ActorRefAdapter[_] askUntyped(ref, a.untyped, timeout, f)
case a: adapt.ActorSystemAdapter[_] askUntyped(ref, a.untyped.guardian, timeout, f)
case a throw new IllegalStateException("Only expect actor references to be ActorRefAdapter or ActorSystemAdapter until native system is implemented: " + a.getClass)
}
}
}

private final class PromiseRef[U](target: ActorRef[_], untyped: InternalActorRef, timeout: Timeout) {
Expand Down
11 changes: 11 additions & 0 deletions akka-docs/src/main/paradox/testing-typed.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,3 +183,14 @@ Scala
Java
: @@snip [BasicAsyncTestingTest.java]($akka$/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/testing/async/BasicAsyncTestingTest.java) { #test-spawn-anonymous }

### Controlling the scheduler

It can be hard to reliably unit test specific scenario's when your actor relies on timing:
especially when running many tests in parallel it can be hard to get the timing just right.
Making such tests more reliable by using generous timeouts make the tests take a long time to run.

For such situations, we provide a scheduler where you can manually, explicitly advance the clock.

// TODO java example
Scala
: @@snip [ManualTimerSpec.scala]($akka$/akka-actor-typed-tests/src/test/scala/akka/actor/typed/ManualTimerSpec.scala) { #manual-scheduling-simple }
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package akka.testkit.typed

import java.util.concurrent.ThreadFactory

import akka.event.LoggingAdapter
import com.typesafe.config.Config

import scala.concurrent.duration.FiniteDuration

class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory) extends akka.testkit.ExplicitlyTriggeredScheduler(config, log, tf) {
def timePasses(amount: FiniteDuration) = {
val newTime = currentTime.get + amount.toMillis
executeTasks(newTime)
currentTime.set(newTime)
}
}
16 changes: 13 additions & 3 deletions akka-testkit-typed/src/main/scala/akka/testkit/typed/TestKit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import akka.actor.typed.{ ActorRef, ActorSystem, Behavior }
import akka.annotation.ApiMayChange
import akka.testkit.typed.TestKit._
import akka.util.Timeout
import com.typesafe.config.Config
import com.typesafe.config.{ Config, ConfigFactory }

import scala.concurrent.duration._
import scala.concurrent.{ Await, TimeoutException }
Expand Down Expand Up @@ -64,12 +64,16 @@ class TestKit(name: String, config: Option[Config]) extends TestKitBase {
def this(name: String) = this(name, None)
def this(config: Config) = this(TestKit.getCallerName(classOf[TestKit]), Some(config))
def this(name: String, config: Config) = this(name, Some(config))

import TestKit._
implicit val system = ActorSystem(testKitGuardian, name, config = config)
implicit val system = ActorSystem(testKitGuardian, name, config = Some(config match {
case None mixedInConfig.withFallback(ConfigFactory.load())
case Some(explicitConfiguration) explicitConfiguration.withFallback(mixedInConfig)
}))
}

@ApiMayChange
trait TestKitBase {
trait TestKitBase extends TestKitMixin {
def system: ActorSystem[TestKitCommand]
implicit def testkitSettings = TestKitSettings(system)
implicit def scheduler = system.scheduler
Expand Down Expand Up @@ -102,3 +106,9 @@ trait TestKitBase {
def systemActor[T](behaviour: Behavior[T]): ActorRef[T] =
Await.result(system.systemActorOf(behaviour, childName.next()), timeoutDuration)
}

@ApiMayChange
trait TestKitMixin {
// Can be overridden by traits extending TestKitMixin to contribute to the configuration
def mixedInConfig: Config = ConfigFactory.empty()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package akka.testkit.typed.scaladsl

import com.typesafe.config.{ Config, ConfigFactory }

import akka.testkit.typed._

trait ManualTime extends TestKitMixin { self: TestKit
override def mixedInConfig: Config =
ConfigFactory
.parseString("""akka.scheduler.implementation = "akka.testkit.typed.ExplicitlyTriggeredScheduler"""")
.withFallback(super.mixedInConfig)

override val scheduler: ExplicitlyTriggeredScheduler = self.system.scheduler.asInstanceOf[ExplicitlyTriggeredScheduler]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package akka.testkit

import java.util.concurrent.ThreadFactory
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.{ Duration, FiniteDuration }

import com.typesafe.config.Config

import akka.actor.{ ActorSystem, Cancellable, Scheduler }
import akka.event.LoggingAdapter

/**
* For testing: scheduler that does not look at the clock, but must be progressed manually by calling `timePasses`.
*
* This is not entirely realistic: jobs will be executed on the test thread instead of using the `ExecutionContext`, but does
* allow for faster and less timing-sensitive specs..
*/
class ExplicitlyTriggeredScheduler(config: Config, log: LoggingAdapter, tf: ThreadFactory) extends Scheduler {

case class Item(time: Long, interval: Option[FiniteDuration], runnable: Runnable)

val currentTime = new AtomicLong()
val scheduled = new ConcurrentHashMap[Item, Unit]()

override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
schedule(initialDelay, Some(interval), runnable)

override def scheduleOnce(delay: FiniteDuration, runnable: Runnable)(implicit executor: ExecutionContext): Cancellable =
schedule(delay, None, runnable)

def timePasses(amount: FiniteDuration)(implicit system: ActorSystem) = {
// TODO double-check if we really want/need dilation here
val newTime = currentTime.get + amount.dilated.toMillis
executeTasks(newTime)
currentTime.set(newTime)
}

@tailrec
private[testkit] final def executeTasks(runTo: Long): Unit = {
scheduled
.keySet
.asScala
.filter(_.time <= runTo)
.toList
.sortBy(_.time)
.headOption match {
case Some(task)
currentTime.set(task.time)
task.runnable.run()
scheduled.remove(task)
task.interval.foreach(i scheduled.put(task.copy(time = task.time + i.toMillis), ()))

// running the runnable might have scheduled new events
executeTasks(runTo)
case _ // Done
}
}

private def schedule(initialDelay: FiniteDuration, interval: Option[FiniteDuration], runnable: Runnable)(implicit executor: ExecutionContext): Cancellable = {
val item = Item(currentTime.get + initialDelay.toMillis, interval, runnable)
scheduled.put(item, ())

if (initialDelay == Duration.Zero)
executeTasks(currentTime.get)

new Cancellable {
var cancelled = false

override def cancel(): Boolean = {
val before = scheduled.size
scheduled.remove(item)
cancelled = true
before > scheduled.size
}

override def isCancelled: Boolean = cancelled
}
}

override def maxFrequency: Double = 42
}

0 comments on commit bbedcb2

Please sign in to comment.