Permalink
Browse files

implement more precise bandwith throttling

- will keep track of theoretical packet boundaries and send on timer
  tick or send request according to actual time
- will split packets if calculated release time is >100ms into the
  future (configurable) to simulate proper trickling
  • Loading branch information...
1 parent c68df06 commit a351e6ad9fb9ca24d69a20da9f6bb2028f901a91 @rkuhn rkuhn committed May 5, 2012
@@ -165,6 +165,11 @@ akka {
# Timeout for interrogation of TestConductor’s Controller actor
query-timeout = 5s
+ # Threshold for packet size in time unit above which the failure injector will
+ # split the packet and deliver in smaller portions; do not give value smaller
+ # than HashedWheelTimer resolution (would not make sense)
+ packet-split-threshold = 100ms
+
# Default port to start the conductor on; 0 means <auto>
port = 0
@@ -24,6 +24,7 @@ class TestConductorExt(val system: ExtendedActorSystem) extends Extension with C
implicit val BarrierTimeout = Timeout(Duration(config.getMilliseconds("akka.testconductor.barrier-timeout"), MILLISECONDS))
implicit val QueryTimeout = Timeout(Duration(config.getMilliseconds("akka.testconductor.query-timeout"), MILLISECONDS))
+ val PacketSplitThreshold = Duration(config.getMilliseconds("akka.testconductor.packet-split-threshold"), MILLISECONDS)
val name = config.getString("akka.testconductor.name")
val host = config.getString("akka.testconductor.host")
@@ -23,6 +23,13 @@ import akka.actor.Props
import akka.actor.ActorRef
import akka.event.Logging
import org.jboss.netty.channel.SimpleChannelHandler
+import scala.annotation.tailrec
+import akka.util.Duration
+import akka.actor.LoggingFSM
+import org.jboss.netty.channel.Channels
+import org.jboss.netty.channel.ChannelFuture
+import org.jboss.netty.channel.ChannelFutureListener
+import org.jboss.netty.channel.ChannelFuture
case class FailureInjector(sender: ActorRef, receiver: ActorRef) {
def refs(dir: Direction) = dir match {
@@ -42,8 +49,10 @@ class NetworkFailureInjector(system: ActorSystem) extends SimpleChannelHandler {
val log = Logging(system, "FailureInjector")
// everything goes via these Throttle actors to enable easy steering
- private val sender = system.actorOf(Props(new Throttle(_.sendDownstream(_))))
- private val receiver = system.actorOf(Props(new Throttle(_.sendUpstream(_))))
+ private val sender = system.actorOf(Props(new Throttle(Direction.Send)))
+ private val receiver = system.actorOf(Props(new Throttle(Direction.Receive)))
+
+ private val packetSplitThreshold = TestConductor(system).Settings.PacketSplitThreshold
/*
* State, Data and Messages for the internal Throttle actor
@@ -53,80 +62,136 @@ class NetworkFailureInjector(system: ActorSystem) extends SimpleChannelHandler {
private case object Throttle extends State
private case object Blackhole extends State
- private case class Data(ctx: ChannelHandlerContext, rateMBit: Float, queue: Queue[MessageEvent])
+ private case class Data(lastSent: Long, rateMBit: Float, queue: Queue[Send])
- private case class Send(ctx: ChannelHandlerContext, msg: MessageEvent)
+ private case class Send(ctx: ChannelHandlerContext, future: Option[ChannelFuture], msg: AnyRef)
private case class SetContext(ctx: ChannelHandlerContext)
private case object Tick
- private class Throttle(send: (ChannelHandlerContext, MessageEvent) Unit) extends Actor with FSM[State, Data] {
+ private class Throttle(dir: Direction) extends Actor with LoggingFSM[State, Data] {
import FSM._
- startWith(PassThrough, Data(null, -1, Queue()))
+ var channelContext: ChannelHandlerContext = _
+
+ startWith(PassThrough, Data(0, -1, Queue()))
when(PassThrough) {
- case Event(Send(ctx, msg), d)
+ case Event(s @ Send(_, _, msg), _)
log.debug("sending msg (PassThrough): {}", msg)
- send(ctx, msg)
+ send(s)
stay
}
when(Throttle) {
- case Event(Send(ctx, msg), d)
- if (!timerActive_?("send")) {
- setTimer("send", Tick, (size(msg) / d.rateMBit) microseconds, false)
- }
- stay using d.copy(ctx = ctx, queue = d.queue.enqueue(msg))
- case Event(Tick, d)
- val (msg, queue) = d.queue.dequeue
- log.debug("sending msg (Tick, {}/{} left): {}", d.queue.size, queue.size, msg)
- send(d.ctx, msg)
- if (queue.nonEmpty) {
- val time = (size(queue.head) / d.rateMBit).microseconds
- log.debug("scheduling next Tick in {}", time)
- setTimer("send", Tick, time, false)
- }
- stay using d.copy(queue = queue)
+ case Event(s: Send, d @ Data(_, _, Queue()))
+ stay using sendThrottled(d.copy(lastSent = System.nanoTime, queue = Queue(s)))
+ case Event(s: Send, data)
+ stay using sendThrottled(data.copy(queue = data.queue.enqueue(s)))
+ case Event(Tick, data)
+ stay using sendThrottled(data)
}
onTransition {
case Throttle -> PassThrough
- stateData.queue foreach { msg
- log.debug("sending msg (Transition): {}")
- send(stateData.ctx, msg)
+ for (s stateData.queue) {
+ log.debug("sending msg (Transition): {}", s.msg)
+ send(s)
}
cancelTimer("send")
case Throttle -> Blackhole
cancelTimer("send")
}
when(Blackhole) {
- case Event(Send(_, msg), _)
+ case Event(Send(_, _, msg), _)
log.debug("dropping msg {}", msg)
stay
}
whenUnhandled {
- case Event(SetContext(ctx), d) stay using d.copy(ctx = ctx)
case Event(NetworkFailureInjector.SetRate(rate), d)
sender ! "ok"
if (rate > 0) {
- goto(Throttle) using d.copy(rateMBit = rate, queue = Queue())
+ goto(Throttle) using d.copy(lastSent = System.nanoTime, rateMBit = rate, queue = Queue())
} else if (rate == 0) {
goto(Blackhole)
} else {
goto(PassThrough)
}
+ case Event(SetContext(ctx), _) channelContext = ctx; stay
case Event(NetworkFailureInjector.Disconnect(abort), Data(ctx, _, _))
sender ! "ok"
// TODO implement abort
- ctx.getChannel.disconnect()
+ channelContext.getChannel.disconnect()
stay
}
initialize
- private def size(msg: MessageEvent) = msg.getMessage() match {
+ private def sendThrottled(d: Data): Data = {
+ val (data, toSend, toTick) = schedule(d)
+ for (s toSend) {
+ log.debug("sending msg (Tick): {}", s.msg)
+ send(s)
+ }
+ for (time toTick) {
+ log.debug("scheduling next Tick in {}", time)
+ setTimer("send", Tick, time, false)
+ }
+ data
+ }
+
+ private def send(s: Send): Unit = dir match {
+ case Direction.Send Channels.write(s.ctx, s.future getOrElse Channels.future(s.ctx.getChannel), s.msg)
+ case Direction.Receive Channels.fireMessageReceived(s.ctx, s.msg)
+ case _
+ }
+
+ private def schedule(d: Data): (Data, Seq[Send], Option[Duration]) = {
+ val now = System.nanoTime
+ @tailrec def rec(d: Data, toSend: Seq[Send]): (Data, Seq[Send], Option[Duration]) = {
+ if (d.queue.isEmpty) (d, toSend, None)
+ else {
+ val timeForPacket = d.lastSent + (1000 * size(d.queue.head.msg) / d.rateMBit).toLong
+ if (timeForPacket <= now) rec(Data(timeForPacket, d.rateMBit, d.queue.tail), toSend :+ d.queue.head)
+ else {
+ val deadline = now + packetSplitThreshold.toNanos
+ if (timeForPacket <= deadline) (d, toSend, Some((timeForPacket - now).nanos))
+ else {
+ val micros = (deadline - d.lastSent) / 1000
+ val (s1, s2) = split(d.queue.head, (micros * d.rateMBit / 8).toInt)
+ (d.copy(queue = s1 +: s2 +: d.queue.tail), toSend, Some(packetSplitThreshold))
+ }
+ }
+ }
+ }
+ rec(d, Seq())
+ }
+
+ private def split(s: Send, bytes: Int): (Send, Send) = {
+ s.msg match {
+ case buf: ChannelBuffer
+ val f = s.future map { f
+ val newF = Channels.future(s.ctx.getChannel)
+ newF.addListener(new ChannelFutureListener {
+ def operationComplete(future: ChannelFuture) {
+ if (future.isCancelled) f.cancel()
+ else future.getCause match {
+ case null
+ case thr f.setFailure(thr)
+ }
+ }
+ })
+ newF
+ }
+ val b = buf.slice()
+ b.writerIndex(b.readerIndex + bytes)
+ buf.readerIndex(buf.readerIndex + bytes)
+ (Send(s.ctx, f, b), Send(s.ctx, s.future, buf))
+ }
+ }
+
+ private def size(msg: AnyRef) = msg match {
case b: ChannelBuffer b.readableBytes() * 8
case _ throw new UnsupportedOperationException("NetworkFailureInjector only supports ChannelBuffer messages")
}
@@ -136,7 +201,7 @@ class NetworkFailureInjector(system: ActorSystem) extends SimpleChannelHandler {
override def messageReceived(ctx: ChannelHandlerContext, msg: MessageEvent) {
log.debug("upstream(queued): {}", msg)
- receiver ! Send(ctx, msg)
+ receiver ! Send(ctx, Option(msg.getFuture), msg.getMessage)
}
override def channelConnected(ctx: ChannelHandlerContext, state: ChannelStateEvent) {
@@ -166,7 +231,7 @@ class NetworkFailureInjector(system: ActorSystem) extends SimpleChannelHandler {
override def writeRequested(ctx: ChannelHandlerContext, msg: MessageEvent) {
log.debug("downstream(queued): {}", msg)
- sender ! Send(ctx, msg)
+ sender ! Send(ctx, Option(msg.getFuture), msg.getMessage)
}
}
@@ -62,15 +62,15 @@ class TestConductorMultiJvmNode1 extends AkkaRemoteSpec(TestConductorMultiJvmSpe
"throttling" in {
expectMsg("start")
- tc.throttle("node1", "node0", Direction.Send, 0.016).await
+ tc.throttle("node1", "node0", Direction.Send, 0.01).await
tc.enter("throttled_send")
within(1 second, 2 seconds) {
receiveN(10) must be(0 to 9)
}
tc.enter("throttled_send2")
tc.throttle("node1", "node0", Direction.Send, -1).await
- tc.throttle("node1", "node0", Direction.Receive, 0.016).await
+ tc.throttle("node1", "node0", Direction.Receive, 0.01).await
tc.enter("throttled_recv")
receiveN(10, 500 millis) must be(10 to 19)
tc.enter("throttled_recv2")
@@ -69,15 +69,15 @@ class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor {
* <pre>
* class Test extends TestKit(ActorSystem()) {
* try {
- *
+ *
* val test = system.actorOf(Props[SomeActor]
*
* within (1 second) {
* test ! SomeWork
* expectMsg(Result1) // bounded to 1 second
* expectMsg(Result2) // bounded to the remainder of the 1 second
* }
- *
+ *
* } finally {
* system.shutdown()
* }
@@ -86,7 +86,7 @@ class TestActor(queue: BlockingDeque[TestActor.Message]) extends Actor {
*
* Beware of two points:
*
- * - the ActorSystem passed into the constructor needs to be shutdown,
+ * - the ActorSystem passed into the constructor needs to be shutdown,
* otherwise thread pools and memory will be leaked
* - this trait is not thread-safe (only one actor with one queue, one stack
* of `within` blocks); it is expected that the code is executed from a

0 comments on commit a351e6a

Please sign in to comment.