Skip to content

Commit

Permalink
Update sequence impl to tune controller memory consumption (#2387)
Browse files Browse the repository at this point in the history
- switch to scheduleOnce+weakrefs for timeout handling in SequenceActions
- switch SequenceAccounting to store array of ActivationId rather than array of String -- cheaper in memory
-  use better (non-dragging) impl of withTimeout
- use a getAndSet(null) pattern to avoid two copies of responses being alive simultaneously
- refactor top level sequence scheduler to eliminate promises
  • Loading branch information
starpit authored and rabbah committed Jun 20, 2017
1 parent dad7243 commit 7f399a4
Show file tree
Hide file tree
Showing 7 changed files with 531 additions and 267 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import whisk.http.Messages
*
* @param id the activation id, required not null
*/
protected[core] class ActivationId private (private val id: java.util.UUID) extends AnyVal {
protected[whisk] class ActivationId private (private val id: java.util.UUID) extends AnyVal {
def asString = toString
override def toString = id.toString.replaceAll("-", "")
def toJsObject = JsObject("activationId" -> toString.toJson)
Expand Down
3 changes: 2 additions & 1 deletion common/scala/src/main/scala/whisk/http/ErrorResponse.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import whisk.common.TransactionId
import whisk.core.entity.SizeError
import whisk.core.entity.ByteSize
import whisk.core.entity.Exec
import whisk.core.entity.ActivationId

object Messages {
/** Standard message for reporting resource conflicts. */
Expand Down Expand Up @@ -95,7 +96,7 @@ object Messages {
val notAllowedOnBinding = "Operation not permitted on package binding."

/** Error messages for sequence activations. */
val sequenceRetrieveActivationTimeout = "Timeout reached when retrieving activation for sequence component."
def sequenceRetrieveActivationTimeout(id: ActivationId) = s"Timeout reached when retrieving activation $id for sequence component."
val sequenceActivationFailure = "Sequence failed."

/** Error messages for bad requests where parameters do not conform. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,32 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.concurrent.duration.FiniteDuration
import scala.util.Try

import akka.actor.ActorSystem
import akka.pattern.{ after => expire }

object ExecutionContextFactory {

// Future.firstCompletedOf has a memory drag bug
// https://stackoverflow.com/questions/36420697/about-future-firstcompletedof-and-garbage-collect-mechanism
def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
val p = Promise[T]()
val pref = new java.util.concurrent.atomic.AtomicReference(p)
val completeFirst: Try[T] => Unit = { result: Try[T] =>
val promise = pref.getAndSet(null)
if (promise != null) {
promise.tryComplete(result)
}
}
futures foreach { _ onComplete completeFirst }
p.future
}

implicit class FutureExtensions[T](f: Future[T]) {
def withTimeout(timeout: FiniteDuration, msg: => Throwable)(implicit system: ActorSystem): Future[T] = {
implicit val ec = system.dispatcher
Future firstCompletedOf Seq(f, expire(timeout, system.scheduler)(Future.failed(msg)))
firstCompletedOf(Seq(f, expire(timeout, system.scheduler)(Future.failed(msg))))
}
}

Expand Down

Large diffs are not rendered by default.

93 changes: 48 additions & 45 deletions tests/src/test/scala/system/basic/WskSequenceTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class WskSequenceTests

behavior of "Wsk Sequence"

it should "invoke a blocking sequence action and invoke the updated sequence with normal payload and payload with error field" in withAssetCleaner(wskprops) {
it should "invoke a sequence with normal payload and payload with error field" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val name = "sequence"
val actions = Seq("split", "sort", "head", "cat")
Expand Down Expand Up @@ -109,7 +109,7 @@ class WskSequenceTests
// result of sequence should be identical to previous invocation above
val payload = Map("error" -> JsString("irrelevant error string"), "payload" -> args.mkString("\n").toJson)
val thirdrun = wsk.action.invoke(name, payload)
withActivation(wsk.activation, thirdrun, totalWait = 2 *allowedActionDuration) {
withActivation(wsk.activation, thirdrun, totalWait = 2 * allowedActionDuration) {
activation =>
checkSequenceLogsAndAnnotations(activation, 2) // 2 activations in this sequence
val result = activation.response.result.get
Expand All @@ -118,14 +118,57 @@ class WskSequenceTests
}
}

it should "invoke a sequence with an enclosing sequence action" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val inner_name = "inner_sequence"
val outer_name = "outer_sequence"
val inner_actions = Seq("sort", "head")
val actions = Seq("split") ++ inner_actions ++ Seq("cat")
// create atomic actions
for (actionName <- actions) {
val file = TestUtils.getTestActionFilename(s"$actionName.js")
assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
action.create(name = actionName, artifact = Some(file), timeout = Some(allowedActionDuration))
}
}

// create inner sequence
assetHelper.withCleaner(wsk.action, inner_name) {
val inner_sequence = inner_actions.mkString(",")
(action, _) => action.create(inner_name, Some(inner_sequence), kind = Some("sequence"))
}

// create outer sequence
assetHelper.withCleaner(wsk.action, outer_name) {
val outer_sequence = Seq("split", "inner_sequence", "cat").mkString(",")
(action, _) => action.create(outer_name, Some(outer_sequence), kind = Some("sequence"))
}

val now = "it is now " + new Date()
val args = Array("what time is it?", now)
val run = wsk.action.invoke(outer_name, Map("payload" -> args.mkString("\n").toJson))
withActivation(wsk.activation, run, totalWait = 4 * allowedActionDuration) {
activation =>
checkSequenceLogsAndAnnotations(activation, 3) // 3 activations in this sequence
activation.cause shouldBe None // topmost sequence
val result = activation.response.result.get
result.fields.get("payload") shouldBe defined
result.fields.get("length") should not be defined
result.fields.get("lines") shouldBe Some(JsArray(Vector(now.toJson)))
}
}

/**
* s -> echo, x, echo
* x -> echo
*
* update x -> <limit-1> echo -- should work
* run s -> should stop after <limit> echo
*
* This confirms that a dynamic check on the sequence length holds within the system limit.
* This is different from creating a long sequence up front which will report a length error at create time.
*/
it should "create a sequence, run it, update one of the atomic actions to a sequence and stop executing the outer sequence when limit reached" in withAssetCleaner(wskprops) {
it should "replace atomic component in a sequence that is too long and report invoke error" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val xName = "xSequence"
val sName = "sSequence"
Expand Down Expand Up @@ -176,52 +219,11 @@ class WskSequenceTests
withActivation(wsk.activation, getInnerSeq, totalWait = allowedActionDuration) {
innerSeqActivation =>
innerSeqActivation.logs.get.size shouldBe (limit - 1)
innerSeqActivation.cause shouldBe defined
innerSeqActivation.cause.get shouldBe (activation.activationId)
innerSeqActivation.cause shouldBe Some(activation.activationId)
}
}
}

it should "invoke a blocking sequence action with an enclosing sequence action" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val inner_name = "inner_sequence"
val outer_name = "outer_sequence"
val inner_actions = Seq("sort", "head")
val actions = Seq("split") ++ inner_actions ++ Seq("cat")
// create atomic actions
for (actionName <- actions) {
val file = TestUtils.getTestActionFilename(s"$actionName.js")
assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
action.create(name = actionName, artifact = Some(file), timeout = Some(allowedActionDuration))
}
}

// create inner sequence
assetHelper.withCleaner(wsk.action, inner_name) {
val inner_sequence = inner_actions.mkString(",")
(action, _) => action.create(inner_name, Some(inner_sequence), kind = Some("sequence"))
}

// create outer sequence
assetHelper.withCleaner(wsk.action, outer_name) {
val outer_sequence = Seq("split", "inner_sequence", "cat").mkString(",")
(action, _) => action.create(outer_name, Some(outer_sequence), kind = Some("sequence"))
}

val now = "it is now " + new Date()
val args = Array("what time is it?", now)
val run = wsk.action.invoke(outer_name, Map("payload" -> args.mkString("\n").toJson))
withActivation(wsk.activation, run, totalWait = 4 * allowedActionDuration) {
activation =>
checkSequenceLogsAndAnnotations(activation, 3) // 3 activations in this sequence
activation.cause shouldBe None // topmost sequence
val result = activation.response.result.get
result.fields.get("payload") shouldBe defined
result.fields.get("length") should not be defined
result.fields.get("lines") shouldBe Some(JsArray(Vector(now.toJson)))
}
}

it should "create and run a sequence in a package with parameters" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val sName = "sSequence"
Expand Down Expand Up @@ -294,6 +296,7 @@ class WskSequenceTests
// action params trump package params
checkLogsAtomicAction(0, run, new Regex(String.format(".*key0: value0.*key1a: value1a.*key1b: value2b.*key2a: value2a.*payload: %s", now)))
}

/**
* s -> apperror, echo
* only apperror should run
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
* Copyright 2015-2016 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package whisk.core.controller.actions.test

import java.time.Instant

import scala.concurrent.duration.DurationInt

import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner

import common.WskActorSystem
import spray.json._
import whisk.core.controller.actions.SequenceAccounting
import whisk.core.entity._
import whisk.core.entity.ActivationResponse
import whisk.core.entity.size.SizeInt
import whisk.http.Messages

@RunWith(classOf[JUnitRunner])
class SequenceAccountingTests extends FlatSpec with Matchers with WskActorSystem {

behavior of "sequence accounting"

val okRes1 = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1))))
val okRes2 = ActivationResponse.success(Some(JsObject("res" -> JsNumber(2))))
val failedRes = ActivationResponse.applicationError(JsNumber(3))

val okActivation = WhiskActivation(
namespace = EntityPath("ns"),
name = EntityName("a"),
Subject(),
activationId = ActivationId(),
start = Instant.now(),
end = Instant.now(),
response = okRes2,
annotations = Parameters("limits", ActionLimits(
TimeLimit(1.second),
MemoryLimit(128.MB),
LogLimit(1.MB)).toJson),
duration = Some(123))

val notOkActivation = WhiskActivation(
namespace = EntityPath("ns"),
name = EntityName("a"),
Subject(),
activationId = ActivationId(),
start = Instant.now(),
end = Instant.now(),
response = failedRes,
annotations = Parameters("limits", ActionLimits(
TimeLimit(11.second),
MemoryLimit(256.MB),
LogLimit(2.MB)).toJson),
duration = Some(234))

it should "create initial accounting object" in {
val s = SequenceAccounting(2, okRes1)
s.atomicActionCnt shouldBe 2
s.previousResponse.get shouldBe okRes1
s.logs shouldBe empty
s.duration shouldBe 0
s.maxMemory shouldBe None
s.shortcircuit shouldBe false
}

it should "resolve maybe to success and update accounting object" in {
val p = SequenceAccounting(2, okRes1)
val n1 = p.maybe(okActivation, 3, 5)
n1.atomicActionCnt shouldBe 3
n1.previousResponse.get shouldBe okRes2
n1.logs.length shouldBe 1
n1.logs(0) shouldBe okActivation.activationId
n1.duration shouldBe 123
n1.maxMemory shouldBe Some(128)
n1.shortcircuit shouldBe false
}

it should "resolve maybe and enable short circuit" in {
val p = SequenceAccounting(2, okRes1)
val n1 = p.maybe(okActivation, 3, 5)
val n2 = n1.maybe(notOkActivation, 4, 5)
n2.atomicActionCnt shouldBe 4
n2.previousResponse.get shouldBe failedRes
n2.logs.length shouldBe 2
n2.logs(0) shouldBe okActivation.activationId
n2.logs(1) shouldBe notOkActivation.activationId
n2.duration shouldBe (123 + 234)
n2.maxMemory shouldBe Some(256)
n2.shortcircuit shouldBe true
}

it should "record an activation that exceeds allowed limit but also short circuit" in {
val p = SequenceAccounting(2, okRes1)
val n = p.maybe(okActivation, 3, 2)
n.atomicActionCnt shouldBe 3
n.previousResponse.get shouldBe ActivationResponse.applicationError(Messages.sequenceIsTooLong)
n.logs.length shouldBe 1
n.logs(0) shouldBe okActivation.activationId
n.duration shouldBe 123
n.maxMemory shouldBe Some(128)
n.shortcircuit shouldBe true
}

it should "set failed response and short circuit on failure" in {
val p = SequenceAccounting(2, okRes1)
val n = p.maybe(okActivation, 3, 3)
val f = n.fail(failedRes, None)
f.atomicActionCnt shouldBe 3
f.previousResponse.get shouldBe failedRes
f.logs.length shouldBe 1
f.logs(0) shouldBe okActivation.activationId
f.duration shouldBe 123
f.maxMemory shouldBe Some(128)
f.shortcircuit shouldBe true
}

it should "resolve max memory" in {
SequenceAccounting.maxMemory(None, None) shouldBe None
SequenceAccounting.maxMemory(None, Some(1)) shouldBe Some(1)
SequenceAccounting.maxMemory(Some(1), None) shouldBe Some(1)
SequenceAccounting.maxMemory(Some(1), Some(2)) shouldBe Some(2)
SequenceAccounting.maxMemory(Some(2), Some(1)) shouldBe Some(2)
SequenceAccounting.maxMemory(Some(2), Some(2)) shouldBe Some(2)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2015-2016 IBM Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package whisk.utils.test

import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration.DurationInt

import org.junit.runner.RunWith
import org.scalatest.FlatSpec
import org.scalatest.Matchers
import org.scalatest.junit.JUnitRunner

import common.WskActorSystem
import whisk.utils.ExecutionContextFactory.FutureExtensions

@RunWith(classOf[JUnitRunner])
class ExecutionContextFactoryTests extends FlatSpec with Matchers with WskActorSystem {

behavior of "future extensions"

it should "take first to complete" in {
val f1 = Future.successful({}).withTimeout(500.millis, new Throwable("error"))
Await.result(f1, 1.second) shouldBe ({})

val failure = new Throwable("error")
val f2 = Future { Thread.sleep(1.second.toMillis) }.withTimeout(500.millis, failure)
a[Throwable] shouldBe thrownBy { Await.result(f2, 1.seconds) }
}
}

0 comments on commit 7f399a4

Please sign in to comment.