Skip to content

Commit

Permalink
When exceeding limit, replace response and make sure to charge for ac…
Browse files Browse the repository at this point in the history
…tivation.
  • Loading branch information
rabbah committed Jun 19, 2017
1 parent 408ddf8 commit 62d65d1
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -388,9 +388,9 @@ protected[actions] case class SequenceAccounting(
newCnt = newCnt,
shortcircuit = shortcircuit,
incrDuration = activation.duration,
previousResponse = activation.response,
previousActivationId = activation.activationId,
previousMemoryLimit = activation.annotations.get("limits") map {
newResponse = activation.response,
newActivationId = activation.activationId,
newMemoryLimit = activation.annotations.get("limits") map {
limitsAnnotation => // we have a limits annotation
limitsAnnotation.asJsObject.getFields("memory") match {
case Seq(JsNumber(memory)) => Some(memory.toInt) // we have a numerical "memory" field in the "limits" annotation
Expand All @@ -400,6 +400,7 @@ protected[actions] case class SequenceAccounting(

/** the previous activation failed */
def fail(failureResponse: ActivationResponse) = {
require(!failureResponse.isSuccess)
copy(previousResponse = new AtomicReference(failureResponse), shortcircuit = true)
}

Expand All @@ -413,17 +414,24 @@ protected[actions] case class SequenceAccounting(
val outputPayload = activation.response.result.map(_.asJsObject)
val payloadContent = outputPayload getOrElse JsObject.empty
val errorField = payloadContent.fields.get(ActivationResponse.ERROR_FIELD)
val withinSeqLimit = newCnt <= maxSequenceCnt

if (newCnt > maxSequenceCnt) {
// oops, the dynamic count of actions exceeds the threshold
fail(ActivationResponse.applicationError(sequenceIsTooLong))
} else if (errorField.isEmpty) {
// all good with this action invocation!
if (withinSeqLimit && errorField.isEmpty) {
// all good with this action invocation
success(activation, newCnt)
} else {
val nextActivation = if (!withinSeqLimit) {
// no error in the activation but the dynamic count of actions exceeds the threshold
val newResponse = ActivationResponse.applicationError(sequenceIsTooLong)
activation.copy(response = newResponse)
} else {
assert(errorField.isDefined)
activation
}

// there is an error field in the activation response. here, we treat this like success,
// in the sense of tallying up the accounting fields, but terminate the sequence early
success(activation, newCnt, shortcircuit = true)
success(nextActivation, newCnt, shortcircuit = true)
}
case Left(response) =>
// utter failure somewhere downstream
Expand All @@ -439,31 +447,34 @@ protected[actions] case class SequenceAccounting(
* - one to initialize things
*/
protected[actions] object SequenceAccounting {

def maxMemory(prevMemoryLimit: Option[Int], newMemoryLimit: Option[Int]): Option[Int] = {
prevMemoryLimit.map { prevMax =>
newMemoryLimit
.map(currentMax => Some(Math.max(prevMax, currentMax)))
.getOrElse(prevMemoryLimit)
}.getOrElse(newMemoryLimit)
}

// constructor for successful invocations, or error'ing ones (where shortcircuit = true)
def apply(
prev: SequenceAccounting,
newCnt: Int,
incrDuration: Option[Long],
previousResponse: ActivationResponse,
previousActivationId: ActivationId,
previousMemoryLimit: Option[Int],
newResponse: ActivationResponse,
newActivationId: ActivationId,
newMemoryLimit: Option[Int],
shortcircuit: Boolean): SequenceAccounting = {

// compute the new max memory
val newMaxMemory = prev.maxMemory map {
currentMax => // currentMax is Some
previousMemoryLimit map {
previousLimit => // previousMemoryLimit is Some
Some(Math.max(currentMax, previousLimit)) // so take the max of them
} getOrElse { prev.maxMemory } // currentMax is Some, previousMemoryLimit is None
} getOrElse { previousMemoryLimit } // currentMax is None
val newMaxMemory = maxMemory(prev.maxMemory, newMemoryLimit)

// append log entry
prev.logs += previousActivationId
prev.logs += newActivationId

SequenceAccounting(
atomicActionCnt = newCnt,
previousResponse = new AtomicReference(previousResponse),
previousResponse = new AtomicReference(newResponse),
logs = prev.logs,
duration = incrDuration map { prev.duration + _ } getOrElse { prev.duration },
maxMemory = newMaxMemory,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright 2015-2016 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package whisk.core.controller.actions.test

import java.time.Instant

import scala.concurrent.duration.DurationInt

import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner

import common.WskActorSystem
import spray.json._
import whisk.core.controller.actions.SequenceAccounting
import whisk.core.entity._
import whisk.core.entity.ActivationResponse
import whisk.core.entity.size.SizeInt
import whisk.http.Messages

@RunWith(classOf[JUnitRunner])
class SequenceAccountingTests extends FlatSpec with Matchers with WskActorSystem {

behavior of "sequence accounting"

val okRes1 = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1))))
val okRes2 = ActivationResponse.success(Some(JsObject("res" -> JsNumber(2))))
val failedRes = ActivationResponse.applicationError(JsNumber(3))

val okActivation = WhiskActivation(
namespace = EntityPath("ns"),
name = EntityName("a"),
Subject(),
activationId = ActivationId(),
start = Instant.now(),
end = Instant.now(),
response = okRes2,
annotations = Parameters("limits", ActionLimits(
TimeLimit(1.second),
MemoryLimit(128.MB),
LogLimit(1.MB)).toJson),
duration = Some(123))

val notOkActivation = WhiskActivation(
namespace = EntityPath("ns"),
name = EntityName("a"),
Subject(),
activationId = ActivationId(),
start = Instant.now(),
end = Instant.now(),
response = failedRes,
annotations = Parameters("limits", ActionLimits(
TimeLimit(11.second),
MemoryLimit(256.MB),
LogLimit(2.MB)).toJson),
duration = Some(234))

it should "create initial accounting object" in {
val s = SequenceAccounting(2, okRes1)
s.atomicActionCnt shouldBe 2
s.previousResponse.get shouldBe okRes1
s.logs shouldBe empty
s.duration shouldBe 0
s.maxMemory shouldBe None
s.shortcircuit shouldBe false
}

it should "resolve maybe to success and update accounting object" in {
val p = SequenceAccounting(2, okRes1)
val n1 = p.maybe(Right(okActivation), 3, 5)
n1.atomicActionCnt shouldBe 3
n1.previousResponse.get shouldBe okRes2
n1.logs.length shouldBe 1
n1.logs(0) shouldBe okActivation.activationId
n1.duration shouldBe 123
n1.maxMemory shouldBe Some(128)
n1.shortcircuit shouldBe false
}

it should "resolve maybe and enable short circuit" in {
val p = SequenceAccounting(2, okRes1)
val n1 = p.maybe(Right(okActivation), 3, 5)
val n2 = n1.maybe(Right(notOkActivation), 4, 5)
n2.atomicActionCnt shouldBe 4
n2.previousResponse.get shouldBe failedRes
n2.logs.length shouldBe 2
n2.logs(0) shouldBe okActivation.activationId
n2.logs(1) shouldBe notOkActivation.activationId
n2.duration shouldBe (123 + 234)
n2.maxMemory shouldBe Some(256)
n2.shortcircuit shouldBe true
}

it should "record an activation that exceeds allowed limit but also short circuit" in {
val p = SequenceAccounting(2, okRes1)
val n = p.maybe(Right(okActivation), 3, 2)
n.atomicActionCnt shouldBe 3
n.previousResponse.get shouldBe ActivationResponse.applicationError(Messages.sequenceIsTooLong)
n.logs.length shouldBe 1
n.logs(0) shouldBe okActivation.activationId
n.duration shouldBe 123
n.maxMemory shouldBe Some(128)
n.shortcircuit shouldBe true
}

it should "set failed response and short circuit on failure" in {
val p = SequenceAccounting(2, okRes1)
val n = p.maybe(Right(okActivation), 3, 3)
val f = n.fail(failedRes)
f.atomicActionCnt shouldBe 3
f.previousResponse.get shouldBe failedRes
f.logs.length shouldBe 1
f.logs(0) shouldBe okActivation.activationId
f.duration shouldBe 123
f.maxMemory shouldBe Some(128)
f.shortcircuit shouldBe true
}

it should "resolve max memory" in {
SequenceAccounting.maxMemory(None, None) shouldBe None
SequenceAccounting.maxMemory(None, Some(1)) shouldBe Some(1)
SequenceAccounting.maxMemory(Some(1), None) shouldBe Some(1)
SequenceAccounting.maxMemory(Some(1), Some(2)) shouldBe Some(2)
SequenceAccounting.maxMemory(Some(2), Some(1)) shouldBe Some(2)
SequenceAccounting.maxMemory(Some(2), Some(2)) shouldBe Some(2)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2015-2016 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package whisk.utils.test

import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner

import common.WskActorSystem
import whisk.utils.ExecutionContextFactory.FutureExtensions

@RunWith(classOf[JUnitRunner])
class ExecutionContextFactoryTests extends FlatSpec with Matchers with WskActorSystem {

behavior of "future extensions"

it should "take first to complete" in {
val f1 = Future.successful({}).withTimeout(500.millis, new Throwable("error"))
Await.result(f1, 1.second) shouldBe ({})

val failure = new Throwable("error")
val f2 = Future { Thread.sleep(1.second.toMillis) }.withTimeout(500.millis, failure)
a[Throwable] shouldBe thrownBy { Await.result(f2, 1.seconds) }
}
}

0 comments on commit 62d65d1

Please sign in to comment.