Skip to content

Commit

Permalink
Update sequence impl to tune controller memory consumption (apache#2387)
Browse files Browse the repository at this point in the history
- switch to scheduleOnce+weakrefs for timeout handling in SequenceActions
- switch SequenceAccounting to store array of ActivationId rather than array of String -- cheaper in memory
-  use better (non-dragging) impl of withTimeout
- use a getAndSet(null) pattern to avoid two copies of responses being alive simultaneously
- refactor top level sequence scheduler to eliminate promises
  • Loading branch information
starpit authored and rabbah committed Jun 20, 2017
1 parent ff26225 commit 768cc48
Showing 1 changed file with 48 additions and 45 deletions.
93 changes: 48 additions & 45 deletions tests/src/test/scala/system/basic/WskSequenceTests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class WskSequenceTests

behavior of "Wsk Sequence"

it should "invoke a blocking sequence action and invoke the updated sequence with normal payload and payload with error field" in withAssetCleaner(wskprops) {
it should "invoke a sequence with normal payload and payload with error field" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val name = "sequence"
val actions = Seq("split", "sort", "head", "cat")
Expand Down Expand Up @@ -109,7 +109,7 @@ class WskSequenceTests
// result of sequence should be identical to previous invocation above
val payload = Map("error" -> JsString("irrelevant error string"), "payload" -> args.mkString("\n").toJson)
val thirdrun = wsk.action.invoke(name, payload)
withActivation(wsk.activation, thirdrun, totalWait = 2 *allowedActionDuration) {
withActivation(wsk.activation, thirdrun, totalWait = 2 * allowedActionDuration) {
activation =>
checkSequenceLogsAndAnnotations(activation, 2) // 2 activations in this sequence
val result = activation.response.result.get
Expand All @@ -118,14 +118,57 @@ class WskSequenceTests
}
}

it should "invoke a sequence with an enclosing sequence action" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val inner_name = "inner_sequence"
val outer_name = "outer_sequence"
val inner_actions = Seq("sort", "head")
val actions = Seq("split") ++ inner_actions ++ Seq("cat")
// create atomic actions
for (actionName <- actions) {
val file = TestUtils.getTestActionFilename(s"$actionName.js")
assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
action.create(name = actionName, artifact = Some(file), timeout = Some(allowedActionDuration))
}
}

// create inner sequence
assetHelper.withCleaner(wsk.action, inner_name) {
val inner_sequence = inner_actions.mkString(",")
(action, _) => action.create(inner_name, Some(inner_sequence), kind = Some("sequence"))
}

// create outer sequence
assetHelper.withCleaner(wsk.action, outer_name) {
val outer_sequence = Seq("split", "inner_sequence", "cat").mkString(",")
(action, _) => action.create(outer_name, Some(outer_sequence), kind = Some("sequence"))
}

val now = "it is now " + new Date()
val args = Array("what time is it?", now)
val run = wsk.action.invoke(outer_name, Map("payload" -> args.mkString("\n").toJson))
withActivation(wsk.activation, run, totalWait = 4 * allowedActionDuration) {
activation =>
checkSequenceLogsAndAnnotations(activation, 3) // 3 activations in this sequence
activation.cause shouldBe None // topmost sequence
val result = activation.response.result.get
result.fields.get("payload") shouldBe defined
result.fields.get("length") should not be defined
result.fields.get("lines") shouldBe Some(JsArray(Vector(now.toJson)))
}
}

/**
* s -> echo, x, echo
* x -> echo
*
* update x -> <limit-1> echo -- should work
* run s -> should stop after <limit> echo
*
* This confirms that a dynamic check on the sequence length holds within the system limit.
* This is different from creating a long sequence up front which will report a length error at create time.
*/
it should "create a sequence, run it, update one of the atomic actions to a sequence and stop executing the outer sequence when limit reached" in withAssetCleaner(wskprops) {
it should "replace atomic component in a sequence that is too long and report invoke error" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val xName = "xSequence"
val sName = "sSequence"
Expand Down Expand Up @@ -176,52 +219,11 @@ class WskSequenceTests
withActivation(wsk.activation, getInnerSeq, totalWait = allowedActionDuration) {
innerSeqActivation =>
innerSeqActivation.logs.get.size shouldBe (limit - 1)
innerSeqActivation.cause shouldBe defined
innerSeqActivation.cause.get shouldBe (activation.activationId)
innerSeqActivation.cause shouldBe Some(activation.activationId)
}
}
}

it should "invoke a blocking sequence action with an enclosing sequence action" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val inner_name = "inner_sequence"
val outer_name = "outer_sequence"
val inner_actions = Seq("sort", "head")
val actions = Seq("split") ++ inner_actions ++ Seq("cat")
// create atomic actions
for (actionName <- actions) {
val file = TestUtils.getTestActionFilename(s"$actionName.js")
assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
action.create(name = actionName, artifact = Some(file), timeout = Some(allowedActionDuration))
}
}

// create inner sequence
assetHelper.withCleaner(wsk.action, inner_name) {
val inner_sequence = inner_actions.mkString(",")
(action, _) => action.create(inner_name, Some(inner_sequence), kind = Some("sequence"))
}

// create outer sequence
assetHelper.withCleaner(wsk.action, outer_name) {
val outer_sequence = Seq("split", "inner_sequence", "cat").mkString(",")
(action, _) => action.create(outer_name, Some(outer_sequence), kind = Some("sequence"))
}

val now = "it is now " + new Date()
val args = Array("what time is it?", now)
val run = wsk.action.invoke(outer_name, Map("payload" -> args.mkString("\n").toJson))
withActivation(wsk.activation, run, totalWait = 4 * allowedActionDuration) {
activation =>
checkSequenceLogsAndAnnotations(activation, 3) // 3 activations in this sequence
activation.cause shouldBe None // topmost sequence
val result = activation.response.result.get
result.fields.get("payload") shouldBe defined
result.fields.get("length") should not be defined
result.fields.get("lines") shouldBe Some(JsArray(Vector(now.toJson)))
}
}

it should "create and run a sequence in a package with parameters" in withAssetCleaner(wskprops) {
(wp, assetHelper) =>
val sName = "sSequence"
Expand Down Expand Up @@ -294,6 +296,7 @@ class WskSequenceTests
// action params trump package params
checkLogsAtomicAction(0, run, new Regex(String.format(".*key0: value0.*key1a: value1a.*key1b: value2b.*key2a: value2a.*payload: %s", now)))
}

/**
* s -> apperror, echo
* only apperror should run
Expand Down

0 comments on commit 768cc48

Please sign in to comment.