Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Clean shutdown of ReliableProxySpec, see #2846 #993

Merged
merged 1 commit into from

5 participants

@patriknw
Owner
  • Solved by adding missing Dequeue in throttler
  • Some boy scouting
@viktorklang viktorklang commented on the diff
...vm/scala/akka/contrib/pattern/ReliableProxySpec.scala
@@ -199,8 +202,8 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
expectN(50)
}
}
-
- enterBarrier("test4a")
@viktorklang Owner

wow, nice

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@drewhk drewhk commented on the diff
.../main/scala/akka/remote/testconductor/Extension.scala
@@ -36,8 +36,8 @@ object TestConductor extends ExtensionKey[TestConductorExt] {
* to be a [[akka.remote.RemoteActorRefProvider]].
*
* To use ``blackhole``, ``passThrough``, and ``throttle`` you must activate the
- * `TestConductorTranport` by specifying `testTransport(on = true)` in your
- * MultiNodeConfig.
+ * failure injector and throttler transport adapters by specifying `testTransport(on = true)`
+ * in your MultiNodeConfig.
@drewhk Owner
drewhk added a note

Thanks for the doc changes!

@patriknw Owner
patriknw added a note

np, but I forgot the rst doc, I'll change there as well

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
...akka/remote/transport/ThrottlerTransportAdapter.scala
((5 lines not shown))
stay()
case Event(InboundPayload(p), _) ⇒
forwardOrDelay(p)
stay()
case Event(Dequeue, _) ⇒
- if (!queue.isEmpty) {
+ if (queue.nonEmpty) {
@drewhk Owner
drewhk added a note

Yay, nice :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@drewhk
Owner

LGTM. I will add the inbound flushing.

...akka/remote/transport/ThrottlerTransportAdapter.scala
@@ -323,19 +323,20 @@ private[transport] class ThrottledAssociation(
case Event(mode: ThrottleMode, _) ⇒
inboundThrottleMode = mode
if (inboundThrottleMode == Blackhole) queue = Queue.empty[ByteString]
+ self ! Dequeue
@rkuhn Owner
rkuhn added a note

While it fixes the current issue, I don’t think it is fully correct: this introduces a second Dequeue “token” flying around, which will then consume messages too fast. I think the delayed Dequeue sending should be done with an FSM named timer, which is then canceled right here before doing the direct thing (in case of PassThrough only; if you want to be really correct you could also cancel and reschedule for throttling mode to get immediate effect of changed bandwidth).

@drewhk Owner
drewhk added a note
@rkuhn Owner
rkuhn added a note

[sixties advertisement voice] … and only FSM timers are non-racy! FSM timers: when you need non-racy cancellation.

@patriknw Owner
patriknw added a note

sounds good, I'll take a stab at it

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@patriknw
Owner

I changed the scheduling in the throttler as discussed, please review, I might have misunderstood something

...akka/remote/transport/ThrottlerTransportAdapter.scala
((6 lines not shown))
val needed = (if (tokens > capacity) 1 else tokens) - tokensGenerated(currentTime)
- TimeUnit.SECONDS.toNanos((needed / tokensPerSecond).toLong)
+ (needed / tokensPerSecond).toLong.seconds
@rkuhn Owner
rkuhn added a note

hmm, why not use millisecond resolution? seems a bit strange to me that 999ms should be rounded down

@patriknw Owner
patriknw added a note

yes, I also thought about that rounding, but in the end I trusted the existing code, do we need rounding at all? double.seconds?

@drewhk Owner
drewhk added a note

You can get rid of the rounding, it would improve the code.

@patriknw Owner
patriknw added a note

ok

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
...akka/remote/transport/ThrottlerTransportAdapter.scala
@@ -388,6 +389,12 @@ private[transport] class ThrottledAssociation(
}
}
+ def scheduleDequeue(delay: FiniteDuration): Unit = inboundThrottleMode match {
+ case Unthrottled ⇒ self ! Dequeue
+ case Blackhole ⇒ // Do nothing
+ case _ ⇒ setTimer(DequeueTimerName, Dequeue, delay, repeat = false)
@rkuhn Owner
rkuhn added a note

how about doing self ! Dequeue in case of zero delay?

@drewhk Owner
drewhk added a note

I would say < 0.01 or so is better than zero.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@rkuhn
Owner

yes, your changes look like what I had in mind; the two comments are more to the pre-existing state of the code, @drewhk can you comment?

@drewhk
Owner

I had no time to review this fully, I will continue tomorrow.

@akka-ci
Owner

Started jenkins job akka-pr-validator at https://jenkins.akka.io/job/akka-pr-validator/267/

@akka-ci
Owner

jenkins job akka-pr-validator: Success - https://jenkins.akka.io/job/akka-pr-validator/267/

...akka/remote/transport/ThrottlerTransportAdapter.scala
@@ -252,6 +253,8 @@ private[transport] class ThrottledAssociation(
extends Actor with LoggingFSM[ThrottlerState, ThrottlerData] {
import context.dispatcher
+ val DequeueTimerName = "dequeue"
@drewhk Owner
drewhk added a note

Shouldn't this be in a companion object?

@patriknw Owner
patriknw added a note

yes

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@drewhk
Owner

LGTM

...akka/remote/transport/ThrottlerTransportAdapter.scala
((5 lines not shown))
override def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, true)
- override def timeToAvailable(currentTime: Long, tokens: Int): Long = 1L
+ override def timeToAvailable(currentTime: Long, tokens: Int): FiniteDuration = TimeToAvailable
@patriknw Owner
patriknw added a note

@drewhk what is the reason for using 1.nanos here?
I would like to change it to Duration.Zero

@drewhk Owner
drewhk added a note

How will timers behave if they are given a zero delay?

@patriknw Owner
patriknw added a note

that should be fine, but I would like to change to

def scheduleDequeue(delay: FiniteDuration): Unit = inboundThrottleMode match {
case Blackhole ⇒ // Do nothing
case _ if delay <= Duration.Zero ⇒ self ! Dequeue
case _ ⇒ setTimer(DequeueTimerName, Dequeue, delay, repeat = false)
}

1.nanos feels like a magic number that we don't really need

@viktorklang Owner

Good idea @patriknw

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@patriknw
Owner

Alright, I have made the suggested adjustments and will merge after PLS REBUILD ALL

@rkuhn
Owner
...akka/remote/transport/ThrottlerTransportAdapter.scala
((15 lines not shown))
val (payload, newqueue) = queue.dequeue
upstreamListener notify InboundPayload(payload)
queue = newqueue
inboundThrottleMode = inboundThrottleMode.tryConsumeTokens(System.nanoTime(), payload.length)._1
- if (inboundThrottleMode == Unthrottled && !queue.isEmpty) self ! Dequeue
- else if (!queue.isEmpty) {
- context.system.scheduler.scheduleOnce(
- inboundThrottleMode.timeToAvailable(System.nanoTime(), queue.head.length).nanos, self, Dequeue)
- }
+ if (queue.nonEmpty)
+ scheduleDequeue(inboundThrottleMode.timeToAvailable(System.nanoTime(), queue.head.length))
@viktorklang Owner

nanoTime may return negative values, is this accounted for in timeToAvailable?

@patriknw Owner
patriknw added a note

it's only used for producing a duration, i.e. diff between two nanoTime (btw that is the only thing nanoTime can be used for)

@viktorklang Owner

perhaps it is the name of "queue" that is a bit non-descript, what is it queueing?

@viktorklang Owner

The signature for timeToAvailable:

def timeToAvailable(currentTime: Long, tokens: Int): Long

Doesn't say in which format the currentTime is, is it currentTimeMillis (similar name) or "currentNanoTime" which would hint at nanoTime

@patriknw Owner
patriknw added a note

I agree, I will change

@patriknw Owner
patriknw added a note

regarding the queue it is holding the throttled (delayed, not sent yet) messages

@viktorklang Owner

Can we rename it so that the name has something to do with the contents?

@patriknw Owner
patriknw added a note

ok, fixed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@patriknw patriknw Clean shutdown of ReliableProxySpec, see #2846
* Solved by adding missing Dequeue in throttler
* Changed to FSM timers in throttler
* Some boy scouting
48c6374
@patriknw patriknw merged commit 27acfcf into master
@patriknw patriknw deleted the wip-2846-stop-ReliableProxySpec-patriknw branch
@akka-ci
Owner

Started jenkins job akka-pr-validator at https://jenkins.akka.io/job/akka-pr-validator/276/

@akka-ci
Owner

jenkins job akka-pr-validator: Success - https://jenkins.akka.io/job/akka-pr-validator/276/

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jan 4, 2013
  1. @patriknw

    Clean shutdown of ReliableProxySpec, see #2846

    patriknw authored
    * Solved by adding missing Dequeue in throttler
    * Changed to FSM timers in throttler
    * Some boy scouting
This page is out of date. Refresh to see the latest.
View
45 akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ReliableProxySpec.scala
@@ -37,8 +37,9 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
override def afterEach {
runOn(local) {
- testConductor.throttle(local, remote, Direction.Both, -1).await
+ testConductor.passThrough(local, remote, Direction.Both).await
}
+ enterBarrier("after-each")
}
@volatile var target: ActorRef = system.deadLetters
@@ -46,8 +47,8 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
def expectState(s: State) = expectMsg(FSM.CurrentState(proxy, s))
def expectTransition(s1: State, s2: State) = expectMsg(FSM.Transition(proxy, s1, s2))
-
- def sendN(n: Int) = (1 to n) foreach (proxy ! _)
+
+ def sendN(n: Int) = (1 to n) foreach (proxy ! _)
def expectN(n: Int) = (1 to n) foreach { n expectMsg(n); lastSender must be === target }
"A ReliableProxy" must {
@@ -82,6 +83,8 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
runOn(remote) {
expectMsg("hello")
}
+
+ enterBarrier("initialize-done")
}
"forward messages in sequence" in {
@@ -95,9 +98,9 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
expectN(100)
}
}
-
+
enterBarrier("test1a")
-
+
runOn(local) {
sendN(100)
expectTransition(Idle, Active)
@@ -108,7 +111,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
expectN(100)
}
}
-
+
enterBarrier("test1b")
}
@@ -121,17 +124,17 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
expectNoMsg
}
}
-
+
enterBarrier("test2a")
-
+
runOn(remote) {
expectNoMsg(0 seconds)
}
-
+
enterBarrier("test2b")
-
+
runOn(local) {
- testConductor.throttle(local, remote, Direction.Send, -1).await
+ testConductor.passThrough(local, remote, Direction.Send).await
within(5 seconds) { expectTransition(Active, Idle) }
}
runOn(remote) {
@@ -139,7 +142,7 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
expectN(100)
}
}
-
+
enterBarrier("test2c")
}
@@ -157,14 +160,14 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
expectN(100)
}
}
-
+
enterBarrier("test3a")
-
+
runOn(local) {
- testConductor.throttle(local, remote, Direction.Receive, -1).await
+ testConductor.passThrough(local, remote, Direction.Receive).await
within(5 seconds) { expectTransition(Active, Idle) }
}
-
+
enterBarrier("test3b")
}
@@ -182,11 +185,11 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
expectN(50)
}
}
-
+
enterBarrier("test4a")
-
+
runOn(local) {
- testConductor.throttle(local, remote, Direction.Send, rateMBit = -1).await
+ testConductor.passThrough(local, remote, Direction.Send).await
testConductor.throttle(local, remote, Direction.Receive, rateMBit = 0.1).await
sendN(50)
within(5 seconds) {
@@ -199,8 +202,8 @@ class ReliableProxySpec extends MultiNodeSpec(ReliableProxySpec) with STMultiNod
expectN(50)
}
}
-
- enterBarrier("test4a")
@viktorklang Owner

wow, nice

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+
+ enterBarrier("test4b")
}
}
View
4 akka-docs/rst/dev/multi-node-testing.rst
@@ -207,8 +207,8 @@ surprising ways.
* Don't issue a shutdown of the first node. The first node is the controller and if it shuts down your test will break.
- * To be able to use ``blackhole``, ``passThrough``, and ``throttle`` you must activate the ``TestConductorTranport``
- by specifying ``testTransport(on = true)`` in your MultiNodeConfig.
+ * To be able to use ``blackhole``, ``passThrough``, and ``throttle`` you must activate the failure injector and
+ throttler transport adapters by specifying ``testTransport(on = true)`` in your MultiNodeConfig.
* Throttling, shutdown and other failure injections can only be done from the first node, which again is the controller.
View
26 akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala
@@ -94,8 +94,8 @@ trait Conductor { this: TestConductorExt ⇒
* increased latency.
*
* ====Note====
- * To use this feature you must activate the `TestConductorTranport`
- * by specifying `testTransport(on = true)` in your MultiNodeConfig.
+ * To use this feature you must activate the failure injector and throttler
+ * transport adapters by specifying `testTransport(on = true)` in your MultiNodeConfig.
*
* @param node is the symbolic name of the node which is to be affected
* @param target is the symbolic name of the other node to which connectivity shall be throttled
@@ -115,18 +115,15 @@ trait Conductor { this: TestConductorExt ⇒
* Socket.
*
* ====Note====
- * To use this feature you must activate the `TestConductorTranport`
- * by specifying `testTransport(on = true)` in your MultiNodeConfig.
+ * To use this feature you must activate the failure injector and throttler
+ * transport adapters by specifying `testTransport(on = true)` in your MultiNodeConfig.
*
* @param node is the symbolic name of the node which is to be affected
* @param target is the symbolic name of the other node to which connectivity shall be impeded
* @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both`
*/
- def blackhole(node: RoleName, target: RoleName, direction: Direction): Future[Done] = {
- import Settings.QueryTimeout
- requireTestConductorTranport()
- controller ? Throttle(node, target, direction, 0f) mapTo classTag[Done]
- }
+ def blackhole(node: RoleName, target: RoleName, direction: Direction): Future[Done] =
+ throttle(node, target, direction, 0f)
private def requireTestConductorTranport(): Unit = if (!transport.defaultAddress.protocol.contains(".gremlin.trttl."))
throw new ConfigurationException("To use this feature you must activate the failure injector adapters " +
@@ -137,18 +134,15 @@ trait Conductor { this: TestConductorExt ⇒
* sending and/or receiving.
*
* ====Note====
- * To use this feature you must activate the `TestConductorTranport`
- * by specifying `testTransport(on = true)` in your MultiNodeConfig.
+ * To use this feature you must activate the failure injector and throttler
+ * transport adapters by specifying `testTransport(on = true)` in your MultiNodeConfig.
*
* @param node is the symbolic name of the node which is to be affected
* @param target is the symbolic name of the other node to which connectivity shall be impeded
* @param direction can be either `Direction.Send`, `Direction.Receive` or `Direction.Both`
*/
- def passThrough(node: RoleName, target: RoleName, direction: Direction): Future[Done] = {
- import Settings.QueryTimeout
- requireTestConductorTranport()
- controller ? Throttle(node, target, direction, -1f) mapTo classTag[Done]
- }
+ def passThrough(node: RoleName, target: RoleName, direction: Direction): Future[Done] =
+ throttle(node, target, direction, -1f)
/**
* Tell the remote support to shutdown the connection to the given remote
View
4 akka-remote-tests/src/main/scala/akka/remote/testconductor/Extension.scala
@@ -36,8 +36,8 @@ object TestConductor extends ExtensionKey[TestConductorExt] {
* to be a [[akka.remote.RemoteActorRefProvider]].
*
* To use ``blackhole``, ``passThrough``, and ``throttle`` you must activate the
- * `TestConductorTranport` by specifying `testTransport(on = true)` in your
- * MultiNodeConfig.
+ * failure injector and throttler transport adapters by specifying `testTransport(on = true)`
+ * in your MultiNodeConfig.
@drewhk Owner
drewhk added a note

Thanks for the doc changes!

@patriknw Owner
patriknw added a note

np, but I forgot the rst doc, I'll change there as well

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
*
*/
class TestConductorExt(val system: ExtendedActorSystem) extends Extension with Conductor with Player {
View
4 akka-remote-tests/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala
@@ -87,8 +87,8 @@ abstract class MultiNodeConfig {
/**
* To be able to use `blackhole`, `passThrough`, and `throttle` you must
- * activate the TestConductorTranport by specifying
- * `testTransport(on = true)` in your MultiNodeConfig.
+ * activate the failure injector and throttler transport adapters by
+ * specifying `testTransport(on = true)` in your MultiNodeConfig.
*/
def testTransport(on: Boolean): Unit = _testTransport = on
View
85 akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala
@@ -59,43 +59,42 @@ object ThrottlerTransportAdapter {
case class SetThrottle(address: Address, direction: Direction, mode: ThrottleMode)
sealed trait ThrottleMode {
- def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean)
- def timeToAvailable(currentTime: Long, tokens: Int): Long
+ def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean)
+ def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration
}
- case class TokenBucket(capacity: Int, tokensPerSecond: Double, lastSend: Long, availableTokens: Int)
+ case class TokenBucket(capacity: Int, tokensPerSecond: Double, nanoTimeOfLastSend: Long, availableTokens: Int)
extends ThrottleMode {
- private def isAvailable(timeOfSend: Long, tokens: Int): Boolean = if ((tokens > capacity && availableTokens > 0)) {
+ private def isAvailable(nanoTimeOfSend: Long, tokens: Int): Boolean = if ((tokens > capacity && availableTokens > 0)) {
true // Allow messages larger than capacity through, it will be recorded as negative tokens
- } else min((availableTokens + tokensGenerated(timeOfSend)), capacity) >= tokens
+ } else min((availableTokens + tokensGenerated(nanoTimeOfSend)), capacity) >= tokens
- override def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = {
- if (isAvailable(timeOfSend, tokens))
+ override def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = {
+ if (isAvailable(nanoTimeOfSend, tokens))
(this.copy(
- lastSend = timeOfSend,
- availableTokens = min(availableTokens - tokens + tokensGenerated(timeOfSend), capacity)), true)
+ nanoTimeOfLastSend = nanoTimeOfSend,
+ availableTokens = min(availableTokens - tokens + tokensGenerated(nanoTimeOfSend), capacity)), true)
else (this, false)
}
- override def timeToAvailable(currentTime: Long, tokens: Int): Long = {
- val needed = (if (tokens > capacity) 1 else tokens) - tokensGenerated(currentTime)
- TimeUnit.SECONDS.toNanos((needed / tokensPerSecond).toLong)
+ override def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration = {
+ val needed = (if (tokens > capacity) 1 else tokens) - tokensGenerated(currentNanoTime)
+ (needed / tokensPerSecond).seconds
}
- private def tokensGenerated(timeOfSend: Long): Int =
- (TimeUnit.NANOSECONDS.toMillis(timeOfSend - lastSend) * tokensPerSecond / 1000.0).toInt
+ private def tokensGenerated(nanoTimeOfSend: Long): Int =
+ (TimeUnit.NANOSECONDS.toMillis(nanoTimeOfSend - nanoTimeOfLastSend) * tokensPerSecond / 1000.0).toInt
}
case object Unthrottled extends ThrottleMode {
-
- override def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, true)
- override def timeToAvailable(currentTime: Long, tokens: Int): Long = 1L
+ override def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, true)
+ override def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration = Duration.Zero
}
case object Blackhole extends ThrottleMode {
- override def tryConsumeTokens(timeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, false)
- override def timeToAvailable(currentTime: Long, tokens: Int): Long = 0L
+ override def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = (this, false)
+ override def timeToAvailable(currentNanoTime: Long, tokens: Int): FiniteDuration = Duration.Zero
}
}
@@ -215,6 +214,8 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
}
object ThrottledAssociation {
+ private final val DequeueTimerName = "dequeue"
+
case object Dequeue
sealed trait ThrottlerState
@@ -253,7 +254,7 @@ private[transport] class ThrottledAssociation(
import context.dispatcher
var inboundThrottleMode: ThrottleMode = _
- var queue = Queue.empty[ByteString]
+ var throttledMessages = Queue.empty[ByteString]
var upstreamListener: HandleEventListener = _
override def postStop(): Unit = originalHandle.disassociate()
@@ -272,7 +273,7 @@ private[transport] class ThrottledAssociation(
when(WaitOrigin) {
case Event(InboundPayload(p), ExposedHandle(exposedHandle))
- queue = queue enqueue p
+ throttledMessages = throttledMessages enqueue p
peekOrigin(p) match {
case Some(origin)
manager ! Checkin(origin, exposedHandle)
@@ -283,12 +284,12 @@ private[transport] class ThrottledAssociation(
when(WaitMode) {
case Event(InboundPayload(p), _)
- queue = queue enqueue p
+ throttledMessages = throttledMessages enqueue p
stay()
case Event(mode: ThrottleMode, ExposedHandle(exposedHandle))
inboundThrottleMode = mode
if (inboundThrottleMode == Blackhole) {
- queue = Queue.empty[ByteString]
+ throttledMessages = Queue.empty[ByteString]
exposedHandle.disassociate()
stop()
} else {
@@ -300,7 +301,7 @@ private[transport] class ThrottledAssociation(
when(WaitUpstreamListener) {
case Event(InboundPayload(p), _)
- queue = queue enqueue p
+ throttledMessages = throttledMessages enqueue p
stay()
case Event(listener: HandleEventListener, _)
upstreamListener = listener
@@ -315,30 +316,30 @@ private[transport] class ThrottledAssociation(
self ! Dequeue
goto(Throttling)
case Event(InboundPayload(p), _)
- queue = queue enqueue p
+ throttledMessages = throttledMessages enqueue p
stay()
}
when(Throttling) {
case Event(mode: ThrottleMode, _)
inboundThrottleMode = mode
- if (inboundThrottleMode == Blackhole) queue = Queue.empty[ByteString]
+ if (inboundThrottleMode == Blackhole) throttledMessages = Queue.empty[ByteString]
+ cancelTimer(DequeueTimerName)
+ if (throttledMessages.nonEmpty)
+ scheduleDequeue(inboundThrottleMode.timeToAvailable(System.nanoTime(), throttledMessages.head.length))
stay()
case Event(InboundPayload(p), _)
forwardOrDelay(p)
stay()
case Event(Dequeue, _)
- if (!queue.isEmpty) {
- val (payload, newqueue) = queue.dequeue
+ if (throttledMessages.nonEmpty) {
+ val (payload, newqueue) = throttledMessages.dequeue
upstreamListener notify InboundPayload(payload)
- queue = newqueue
+ throttledMessages = newqueue
inboundThrottleMode = inboundThrottleMode.tryConsumeTokens(System.nanoTime(), payload.length)._1
- if (inboundThrottleMode == Unthrottled && !queue.isEmpty) self ! Dequeue
- else if (!queue.isEmpty) {
- context.system.scheduler.scheduleOnce(
- inboundThrottleMode.timeToAvailable(System.nanoTime(), queue.head.length).nanos, self, Dequeue)
- }
+ if (throttledMessages.nonEmpty)
+ scheduleDequeue(inboundThrottleMode.timeToAvailable(System.nanoTime(), throttledMessages.head.length))
}
stay()
@@ -370,24 +371,28 @@ private[transport] class ThrottledAssociation(
if (inboundThrottleMode == Blackhole) {
// Do nothing
} else {
- if (queue.isEmpty) {
+ if (throttledMessages.isEmpty) {
val tokens = payload.length
val (newbucket, success) = inboundThrottleMode.tryConsumeTokens(System.nanoTime(), tokens)
if (success) {
inboundThrottleMode = newbucket
upstreamListener notify InboundPayload(payload)
} else {
- queue = queue.enqueue(payload)
-
- context.system.scheduler.scheduleOnce(
- inboundThrottleMode.timeToAvailable(System.nanoTime(), tokens).nanos, self, Dequeue)
+ throttledMessages = throttledMessages.enqueue(payload)
+ scheduleDequeue(inboundThrottleMode.timeToAvailable(System.nanoTime(), tokens))
}
} else {
- queue = queue.enqueue(payload)
+ throttledMessages = throttledMessages.enqueue(payload)
}
}
}
+ def scheduleDequeue(delay: FiniteDuration): Unit = inboundThrottleMode match {
+ case Blackhole // Do nothing
+ case _ if delay <= Duration.Zero self ! Dequeue
+ case _ setTimer(DequeueTimerName, Dequeue, delay, repeat = false)
+ }
+
}
private[transport] case class ThrottlerHandle(_wrappedHandle: AssociationHandle, throttlerActor: ActorRef)
View
40 akka-remote/src/test/scala/akka/remote/transport/ThrottleModeSpec.scala
@@ -18,77 +18,77 @@ class ThrottleModeSpec extends AkkaSpec {
}
"in tokenbucket mode allow consuming tokens up to capacity" in {
- val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, lastSend = 0L, availableTokens = 100)
- val (bucket1, success1) = bucket.tryConsumeTokens(timeOfSend = 0L, 10)
+ val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, nanoTimeOfLastSend = 0L, availableTokens = 100)
+ val (bucket1, success1) = bucket.tryConsumeTokens(nanoTimeOfSend = 0L, 10)
bucket1 must be(TokenBucket(100, 100, 0, 90))
success1 must be(true)
- val (bucket2, success2) = bucket1.tryConsumeTokens(timeOfSend = 0L, 40)
+ val (bucket2, success2) = bucket1.tryConsumeTokens(nanoTimeOfSend = 0L, 40)
bucket2 must be(TokenBucket(100, 100, 0, 50))
success2 must be(true)
- val (bucket3, success3) = bucket2.tryConsumeTokens(timeOfSend = 0L, 50)
+ val (bucket3, success3) = bucket2.tryConsumeTokens(nanoTimeOfSend = 0L, 50)
bucket3 must be(TokenBucket(100, 100, 0, 0))
success3 must be(true)
- val (bucket4, success4) = bucket3.tryConsumeTokens(timeOfSend = 0, 1)
+ val (bucket4, success4) = bucket3.tryConsumeTokens(nanoTimeOfSend = 0, 1)
bucket4 must be(TokenBucket(100, 100, 0, 0))
success4 must be(false)
}
"accurately replenish tokens" in {
- val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, lastSend = 0L, availableTokens = 0)
- val (bucket1, success1) = bucket.tryConsumeTokens(timeOfSend = 0L, 0)
+ val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, nanoTimeOfLastSend = 0L, availableTokens = 0)
+ val (bucket1, success1) = bucket.tryConsumeTokens(nanoTimeOfSend = 0L, 0)
bucket1 must be(TokenBucket(100, 100, 0, 0))
success1 must be(true)
- val (bucket2, success2) = bucket1.tryConsumeTokens(timeOfSend = halfSecond, 0)
+ val (bucket2, success2) = bucket1.tryConsumeTokens(nanoTimeOfSend = halfSecond, 0)
bucket2 must be(TokenBucket(100, 100, halfSecond, 50))
success2 must be(true)
- val (bucket3, success3) = bucket2.tryConsumeTokens(timeOfSend = 2 * halfSecond, 0)
+ val (bucket3, success3) = bucket2.tryConsumeTokens(nanoTimeOfSend = 2 * halfSecond, 0)
bucket3 must be(TokenBucket(100, 100, 2 * halfSecond, 100))
success3 must be(true)
- val (bucket4, success4) = bucket3.tryConsumeTokens(timeOfSend = 3 * halfSecond, 0)
+ val (bucket4, success4) = bucket3.tryConsumeTokens(nanoTimeOfSend = 3 * halfSecond, 0)
bucket4 must be(TokenBucket(100, 100, 3 * halfSecond, 100))
success4 must be(true)
}
"accurately interleave replenish and consume" in {
- val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, lastSend = 0L, availableTokens = 20)
- val (bucket1, success1) = bucket.tryConsumeTokens(timeOfSend = 0L, 10)
+ val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, nanoTimeOfLastSend = 0L, availableTokens = 20)
+ val (bucket1, success1) = bucket.tryConsumeTokens(nanoTimeOfSend = 0L, 10)
bucket1 must be(TokenBucket(100, 100, 0, 10))
success1 must be(true)
- val (bucket2, success2) = bucket1.tryConsumeTokens(timeOfSend = halfSecond, 60)
+ val (bucket2, success2) = bucket1.tryConsumeTokens(nanoTimeOfSend = halfSecond, 60)
bucket2 must be(TokenBucket(100, 100, halfSecond, 0))
success2 must be(true)
- val (bucket3, success3) = bucket2.tryConsumeTokens(timeOfSend = 2 * halfSecond, 40)
+ val (bucket3, success3) = bucket2.tryConsumeTokens(nanoTimeOfSend = 2 * halfSecond, 40)
bucket3 must be(TokenBucket(100, 100, 2 * halfSecond, 10))
success3 must be(true)
- val (bucket4, success4) = bucket3.tryConsumeTokens(timeOfSend = 3 * halfSecond, 70)
+ val (bucket4, success4) = bucket3.tryConsumeTokens(nanoTimeOfSend = 3 * halfSecond, 70)
bucket4 must be(TokenBucket(100, 100, 2 * halfSecond, 10))
success4 must be(false)
}
"allow oversized packets through by loaning" in {
- val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, lastSend = 0L, availableTokens = 20)
- val (bucket1, success1) = bucket.tryConsumeTokens(timeOfSend = 0L, 30)
+ val bucket = TokenBucket(capacity = 100, tokensPerSecond = 100, nanoTimeOfLastSend = 0L, availableTokens = 20)
+ val (bucket1, success1) = bucket.tryConsumeTokens(nanoTimeOfSend = 0L, 30)
bucket1 must be(TokenBucket(100, 100, 0, 20))
success1 must be(false)
- val (bucket2, success2) = bucket1.tryConsumeTokens(timeOfSend = halfSecond, 110)
+ val (bucket2, success2) = bucket1.tryConsumeTokens(nanoTimeOfSend = halfSecond, 110)
bucket2 must be(TokenBucket(100, 100, halfSecond, -40))
success2 must be(true)
- val (bucket3, success3) = bucket2.tryConsumeTokens(timeOfSend = 2 * halfSecond, 20)
+ val (bucket3, success3) = bucket2.tryConsumeTokens(nanoTimeOfSend = 2 * halfSecond, 20)
bucket3 must be(TokenBucket(100, 100, halfSecond, -40))
success3 must be(false)
- val (bucket4, success4) = bucket3.tryConsumeTokens(timeOfSend = 3 * halfSecond, 20)
+ val (bucket4, success4) = bucket3.tryConsumeTokens(nanoTimeOfSend = 3 * halfSecond, 20)
bucket4 must be(TokenBucket(100, 100, 3 * halfSecond, 40))
success4 must be(true)
}
Something went wrong with that request. Please try again.