From 7f399a43e561765a23fb377e7747ca4c8c397ea6 Mon Sep 17 00:00:00 2001 From: Nick Mitchell Date: Tue, 20 Jun 2017 12:16:36 -0400 Subject: [PATCH] Update sequence impl to tune controller memory consumption (#2387) - switch to scheduleOnce+weakrefs for timeout handling in SequenceActions - switch SequenceAccounting to store array of ActivationId rather than array of String -- cheaper in memory - use better (non-dragging) impl of withTimeout - use a getAndSet(null) pattern to avoid two copies of responses being alive simultaneously - refactor top level sequence scheduler to eliminate promises --- .../whisk/core/entity/ActivationId.scala | 2 +- .../main/scala/whisk/http/ErrorResponse.scala | 3 +- .../whisk/utils/ExecutionContextFactory.scala | 18 +- .../controller/actions/SequenceActions.scala | 498 ++++++++++-------- .../scala/system/basic/WskSequenceTests.scala | 93 ++-- .../test/SequenceAccountingTests.scala | 141 +++++ .../test/ExecutionContextFactoryTests.scala | 43 ++ 7 files changed, 531 insertions(+), 267 deletions(-) create mode 100644 tests/src/test/scala/whisk/core/controller/actions/test/SequenceAccountingTests.scala create mode 100644 tests/src/test/scala/whisk/utils/test/ExecutionContextFactoryTests.scala diff --git a/common/scala/src/main/scala/whisk/core/entity/ActivationId.scala b/common/scala/src/main/scala/whisk/core/entity/ActivationId.scala index 374320cb310..4a71681e846 100644 --- a/common/scala/src/main/scala/whisk/core/entity/ActivationId.scala +++ b/common/scala/src/main/scala/whisk/core/entity/ActivationId.scala @@ -39,7 +39,7 @@ import whisk.http.Messages * * @param id the activation id, required not null */ -protected[core] class ActivationId private (private val id: java.util.UUID) extends AnyVal { +protected[whisk] class ActivationId private (private val id: java.util.UUID) extends AnyVal { def asString = toString override def toString = id.toString.replaceAll("-", "") def toJsObject = JsObject("activationId" -> toString.toJson) diff --git a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala index d89d749fa29..547618d6329 100644 --- a/common/scala/src/main/scala/whisk/http/ErrorResponse.scala +++ b/common/scala/src/main/scala/whisk/http/ErrorResponse.scala @@ -35,6 +35,7 @@ import whisk.common.TransactionId import whisk.core.entity.SizeError import whisk.core.entity.ByteSize import whisk.core.entity.Exec +import whisk.core.entity.ActivationId object Messages { /** Standard message for reporting resource conflicts. */ @@ -95,7 +96,7 @@ object Messages { val notAllowedOnBinding = "Operation not permitted on package binding." /** Error messages for sequence activations. */ - val sequenceRetrieveActivationTimeout = "Timeout reached when retrieving activation for sequence component." + def sequenceRetrieveActivationTimeout(id: ActivationId) = s"Timeout reached when retrieving activation $id for sequence component." val sequenceActivationFailure = "Sequence failed." /** Error messages for bad requests where parameters do not conform. */ diff --git a/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala b/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala index bdbd00a42a2..5aa465c64b5 100644 --- a/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala +++ b/common/scala/src/main/scala/whisk/utils/ExecutionContextFactory.scala @@ -22,16 +22,32 @@ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration.FiniteDuration +import scala.util.Try import akka.actor.ActorSystem import akka.pattern.{ after => expire } object ExecutionContextFactory { + // Future.firstCompletedOf has a memory drag bug + // https://stackoverflow.com/questions/36420697/about-future-firstcompletedof-and-garbage-collect-mechanism + def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = { + val p = Promise[T]() + val pref = new java.util.concurrent.atomic.AtomicReference(p) + val completeFirst: Try[T] => Unit = { result: Try[T] => + val promise = pref.getAndSet(null) + if (promise != null) { + promise.tryComplete(result) + } + } + futures foreach { _ onComplete completeFirst } + p.future + } + implicit class FutureExtensions[T](f: Future[T]) { def withTimeout(timeout: FiniteDuration, msg: => Throwable)(implicit system: ActorSystem): Future[T] = { implicit val ec = system.dispatcher - Future firstCompletedOf Seq(f, expire(timeout, system.scheduler)(Future.failed(msg))) + firstCompletedOf(Seq(f, expire(timeout, system.scheduler)(Future.failed(msg)))) } } diff --git a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala index 4c4ccbe6572..27dac99b1a9 100644 --- a/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala +++ b/core/controller/src/main/scala/whisk/core/controller/actions/SequenceActions.scala @@ -18,16 +18,15 @@ package whisk.core.controller.actions import java.time.Clock import java.time.Instant +import java.util.concurrent.atomic.AtomicReference -import scala.Left -import scala.Right +import scala.collection._ import scala.concurrent.ExecutionContext import scala.concurrent.Future -import scala.concurrent.Promise import scala.concurrent.duration._ +import scala.language.postfixOps import scala.util.Failure import scala.util.Success -import scala.util.Try import akka.actor.ActorSystem import spray.json._ @@ -98,71 +97,59 @@ protected[actions] trait SequenceActions { // create new activation id that corresponds to the sequence val seqActivationId = activationIdFactory.make() logging.info(this, s"invoking sequence $action topmost $topmost activationid '$seqActivationId'") + val start = Instant.now(Clock.systemUTC()) - val seqActivationPromise = Promise[Option[WhiskActivation]] - // the cause for the component activations is the current sequence - val futureWskActivations = invokeSequenceComponents(user, action, seqActivationId, payload, components, cause = Some(seqActivationId), atomicActionsCount) - val futureSeqResult = Future.sequence(futureWskActivations) - val response: Future[(ActivationId, Option[WhiskActivation], Int)] = - if (topmost) { // need to deal with blocking and closing connection - if (blocking) { - val timeout = maxWaitForBlockingActivation + blockingInvokeGrace - val futureSeqResultTimeout = futureSeqResult withTimeout (timeout, new BlockingInvokeTimeout(seqActivationId)) - // if the future fails with a timeout, the failure is dealt with at the caller level - futureSeqResultTimeout map { wskActivationTuples => - val wskActivationEithers = wskActivationTuples.map(_._1) - // the execution of the sequence was successful, return the result - val end = Instant.now(Clock.systemUTC()) - val seqActivation = Some(makeSequenceActivation(user, action, seqActivationId, wskActivationEithers, topmost, cause, start, end)) - val atomicActionCnt = wskActivationTuples.last._2 - (seqActivationId, seqActivation, atomicActionCnt) - } andThen { - case Success((_, seqActivation, _)) => seqActivationPromise.success(seqActivation) - case Failure(t) => seqActivationPromise.success(None) - } - } else { - // non-blocking sequence execution, return activation id - Future.successful((seqActivationId, None, 0)) andThen { - case _ => seqActivationPromise.success(None) - } - } + val futureSeqResult = { + completeSequenceActivation( + seqActivationId, + // the cause for the component activations is the current sequence + invokeSequenceComponents(user, action, seqActivationId, payload, components, cause = Some(seqActivationId), atomicActionsCount), + user, action, topmost, start, cause) + } + + if (topmost) { // need to deal with blocking and closing connection + if (blocking) { + logging.info(this, s"invoke sequence blocking topmost!") + val timeout = maxWaitForBlockingActivation + blockingInvokeGrace + // if the future fails with a timeout, the failure is dealt with at the caller level + futureSeqResult.withTimeout(timeout, new BlockingInvokeTimeout(seqActivationId)) } else { - // not topmost, no need to worry about terminating incoming request - futureSeqResult map { wskActivationTuples => - val wskActivationEithers = wskActivationTuples.map(_._1) - // all activations are successful, the result of the sequence is the result of the last activation - val end = Instant.now(Clock.systemUTC()) - val seqActivation = Some(makeSequenceActivation(user, action, seqActivationId, wskActivationEithers, topmost, cause, start, end)) - val atomicActionCnt = wskActivationTuples.last._2 - (seqActivationId, seqActivation, atomicActionCnt) - } andThen { - case Success((_, seqActivation, _)) => seqActivationPromise.success(seqActivation) - case Failure(t) => seqActivationPromise.success(None) - } + // non-blocking sequence execution, return activation id + Future.successful((seqActivationId, None, 0)) } + } else { + // not topmost, no need to worry about terminating incoming request + // Note: the future for the sequence result recovers from all throwable failures + futureSeqResult + } + } - // store result of sequence execution - // if seqActivation is defined, use it; otherwise create it (e.g., for non-blocking activations) - // the execution can reach here without a seqActivation due to non-blocking activations OR blocking activations that reach the blocking invoke timeout - // futureSeqResult should always be successful, if failed, there is an error - futureSeqResult flatMap { tuples => seqActivationPromise.future map { (tuples, _) } } onComplete { - case Success((wskActivationTuples, seqActivation)) => - // all activations were successful - val activation = seqActivation getOrElse { - val wskActivationEithers = wskActivationTuples.map(_._1) - val end = Instant.now(Clock.systemUTC()) - // the response of the sequence is the response of the very last activation - makeSequenceActivation(user, action, seqActivationId, wskActivationEithers, topmost, cause, start, end) - } - storeSequenceActivation(activation) - case Failure(t: Throwable) => - // consider this whisk error - // TODO shall we attempt storing the activation if it exists or even inspect the futures? - // this should be a pretty serious whisk errror if it gets here + /** + * Creates an activation for the sequence and writes it back to the datastore. + */ + private def completeSequenceActivation( + seqActivationId: ActivationId, + futureSeqResult: Future[SequenceAccounting], + user: Identity, + action: WhiskAction, + topmost: Boolean, + start: Instant, + cause: Option[ActivationId])( + implicit transid: TransactionId): Future[(ActivationId, Some[WhiskActivation], Int)] = { + // not topmost, no need to worry about terminating incoming request + // Note: the future for the sequence result recovers from all throwable failures + futureSeqResult.map { accounting => + // sequence terminated, the result of the sequence is the result of the last completed activation + val end = Instant.now(Clock.systemUTC()) + val seqActivation = makeSequenceActivation(user, action, seqActivationId, accounting, topmost, cause, start, end) + (seqActivationId, Some(seqActivation), accounting.atomicActionCnt) + }.andThen { + case Success((_, Some(seqActivation), _)) => storeSequenceActivation(seqActivation) + case Failure(t) => + // This should never happen; in this case, there is no activation record created or stored: + // should there be? logging.error(this, s"Sequence activation failed: ${t.getMessage}") } - - response } /** @@ -172,7 +159,7 @@ protected[actions] trait SequenceActions { logging.info(this, s"recording activation '${activation.activationId}'") WhiskActivation.put(activationStore, activation) onComplete { case Success(id) => logging.info(this, s"recorded activation") - case Failure(t) => logging.error(this, s"failed to record activation") + case Failure(t) => logging.error(this, s"failed to record activation ${activation.activationId} with error ${t.getLocalizedMessage}") } } @@ -183,49 +170,20 @@ protected[actions] trait SequenceActions { user: Identity, action: WhiskAction, activationId: ActivationId, - wskActivationEithers: Vector[Either[ActivationResponse, WhiskActivation]], + accounting: SequenceAccounting, topmost: Boolean, cause: Option[ActivationId], start: Instant, end: Instant): WhiskActivation = { - // extract all successful activations from the vector of activation eithers - // the vector is either all rights, all lefts, or some rights followed by some lefts (no interleaving) - val (right, left) = wskActivationEithers.span(_.isRight) - val wskActivations = right.map(_.right.get) - - // the activation response is either the first left if it exists or the response of the last successful activation - val activationResponse = if (left.length == 0) { - wskActivations.last.response - } else { - left.head.left.get - } - - // compose logs - val logs = ActivationLogs(wskActivations map { - activation => activation.activationId.toString - }) - - // compute duration - val duration = (wskActivations map { activation => - activation.duration getOrElse { - logging.error(this, s"duration for $activation is not defined") - activation.end.toEpochMilli - activation.start.toEpochMilli - } - }).sum - // compute max memory - val maxMemory = Try { - val memoryLimits = wskActivations map { activation => - val limits = ActionLimits.serdes.read(activation.annotations.get("limits").get) - limits.memory.megabytes - } - memoryLimits.max.MB - } - - val sequenceLimits = maxMemory map { - mb => ActionLimits(action.limits.timeout, MemoryLimit(mb), action.limits.logs) - } + val sequenceLimits = accounting.maxMemory map { + maxMemoryAcrossActionsInSequence => + Parameters("limits", ActionLimits( + action.limits.timeout, + MemoryLimit(maxMemoryAcrossActionsInSequence MB), + action.limits.logs).toJson) + } getOrElse (Parameters()) // set causedBy if not topmost sequence val causedBy = if (!topmost) { @@ -243,16 +201,16 @@ protected[actions] trait SequenceActions { start = start, end = end, cause = if (topmost) None else cause, // propagate the cause for inner sequences, but undefined for topmost - response = activationResponse, - logs = logs, + response = accounting.previousResponse.getAndSet(null), // getAndSet(null) drops reference to the activation result + logs = accounting.finalLogs, version = action.version, publish = false, annotations = Parameters("topmost", JsBoolean(topmost)) ++ Parameters("path", action.fullyQualifiedName(false).toString) ++ Parameters("kind", "sequence") ++ causedBy ++ - sequenceLimits.map(l => Parameters("limits", l.toJson)).getOrElse(Parameters()), - duration = Some(duration)) + sequenceLimits, + duration = Some(accounting.duration)) } /** @@ -264,96 +222,62 @@ protected[actions] trait SequenceActions { * @param user the user invoking the sequence * @param seqAction the sequence invoked * @param seqActivationId the id of the sequence - * @param payload the payload passed to the first component in the sequence + * @param inputPayload the payload passed to the first component in the sequence * @param components the components in the sequence * @param cause the activation id of the sequence that lead to invoking this sequence or None if this sequence is topmost * @param atomicActionCnt the dynamic atomic action count observed so far since the start of the execution of the topmost sequence - * @return a vector of successful futures; each element contains a tuple with - * 1. an either with activation(right) or activation response in case of error (left) - * 2. the dynamic atomic action count after executing the components + * @return a future which resolves with the accounting for a sequence, including the last result, duration, and activation ids */ private def invokeSequenceComponents( user: Identity, seqAction: WhiskAction, seqActivationId: ActivationId, - payload: Option[JsObject], + inputPayload: Option[JsObject], components: Vector[FullyQualifiedEntityName], cause: Option[ActivationId], atomicActionCnt: Int)( - implicit transid: TransactionId): Vector[Future[(Either[ActivationResponse, WhiskActivation], Int)]] = { - logging.info(this, s"invoke sequence $seqAction ($seqActivationId) with components $components") - - // first retrieve the information/entities on all actions - // do not wait to successfully retrieve all the actions before starting the execution - // start execution of the first action while potentially still retrieving entities - // Note: the execution starts even if one of the futures retrieving an entity may fail - // first components need to be resolved given any package bindings and the params need to be merged - // NOTE: OLD-STYLE sequences may have default namespace in the names of the components, resolve default namespace first - val resolvedFutureActions = resolveDefaultNamespace(components, user) map { c => WhiskAction.resolveActionAndMergeParameters(entityStore, c) } - - // "scan" the wskActions to execute them in blocking fashion - // use scanLeft instead of foldLeft as we need the intermediate results - // TODO: double-check the package param policy - // env are the parameters for the package that the sequence is in; throw them away, not used in the sequence execution - // create a "fake" WhiskActivation to hold the payload of the sequence to init the scanLeft - val fakeStart = Instant.now() - val fakeEnd = Instant.now() - val fakeResponse = ActivationResponse.payloadPlaceholder(payload) - - // NOTE: the init value is a fake (unused) activation to bootstrap the invocations of actions - val initFakeWhiskActivation: Future[(Either[ActivationResponse, WhiskActivation], Int, Boolean)] = Future successful { - // use boolean in tuple to indicate first/incoming payload - (Right(WhiskActivation(seqAction.namespace, seqAction.name, user.subject, seqActivationId, fakeStart, fakeEnd, response = fakeResponse, duration = None)), atomicActionCnt, true) + implicit transid: TransactionId): Future[SequenceAccounting] = { + + // For each action in the sequence, fetch any of its associated parameters (including package or binding). + // We do this for all of the actions in the sequence even though it may be short circuited. This is to + // hide the latency of the fetches from the datastore and the parameter merging that has to occur. It + // may be desirable in the future to selectively speculate over a smaller number of components rather than + // the entire sequence. + // + // This action/parameter resolution is done in futures; the execution starts as soon as the first component + // is resolved. + val resolvedFutureActions = resolveDefaultNamespace(components, user) map { + c => WhiskAction.resolveActionAndMergeParameters(entityStore, c) + } + + // this holds the initial value of the accounting structure, including the input boxed as an ActivationResponse + val initialAccounting = Future.successful { + SequenceAccounting(atomicActionCnt, ActivationResponse.payloadPlaceholder(inputPayload)) } - // seqComponentWskActivationFutures contains a fake activation on the first position in the vector; the rest of the vector is the result of each component execution/activation - val seqComponentWskActivationFutures = resolvedFutureActions.scanLeft(initFakeWhiskActivation) { - (futureActivationAtomicCntTuple, futureAction) => - futureAction flatMap { - action => - futureActivationAtomicCntTuple flatMap { - case (activationEither, atomicActionCount, first) => - activationEither match { - case Right(activation) => - val payload = activation.response.result.map(_.asJsObject) - // first check conditions on payload that may lead to interrupting the execution of the sequence - val payloadContent = payload getOrElse JsObject.empty - val errorFields = payloadContent.getFields(ActivationResponse.ERROR_FIELD) - // short-circuit the execution of the sequence iff the payload contains an error field and is the result of an action return, not the initial payload - val errorShortcircuit = !errorFields.isEmpty && !first - if (!errorShortcircuit) { - // second check the atomic action count for sequence action limit) - if (atomicActionCount >= actionSequenceLimit) { - val activationResponse = ActivationResponse.applicationError(s"$sequenceIsTooLong") - Future.successful(Left(activationResponse), atomicActionCount, false) // dynamic action count and first don't really matter anymore - } else { - val compResultFuture : Future[(Either[ActivationResponse, WhiskActivation], Int)] = invokeSeqOneComponent(user, action, payload, cause, atomicActionCount) - compResultFuture map { - activationDynamicCountPair => (activationDynamicCountPair._1, activationDynamicCountPair._2, false) // it's not first payload anymore - } - } - } else { - // there is an error field, terminate sequence early - // propagate the activation response - Future.successful(Left(activation.response), atomicActionCount, false) // dynamic action count and first don't really matter anymore - } - case Left(activationResponse) => - // the sequence is interrupted, no more processing - Future.successful(Left(activationResponse), 0, false) // dynamic action count and first do not matter from now on - } + // execute the actions in sequential blocking fashion + resolvedFutureActions.foldLeft(initialAccounting) { + (accountingFuture, futureAction) => + accountingFuture.flatMap { accounting => + if (accounting.atomicActionCnt < actionSequenceLimit) { + invokeNextAction(user, futureAction, accounting, cause).flatMap { accounting => + if (!accounting.shortcircuit) { + Future.successful(accounting) + } else { + // this is to short circuit the fold + Future.failed(FailedSequenceActivation(accounting)) // terminates the fold + } } - } recover { - // check any failure here and generate an activation response such that this method always returns a vector of successful futures - case t: Throwable => - // consider this failure a whisk error - val activationResponse = ActivationResponse.whiskError(sequenceActivationFailure) - (Left(activationResponse), 0, false) + } else { + val updatedAccount = accounting.fail(ActivationResponse.applicationError(sequenceIsTooLong), None) + Future.failed(FailedSequenceActivation(updatedAccount)) // terminates the fold + } } - } - // drop the first future which contains the init value from scanLeft and project the first two fields from the tuples - // the third one was used to treat error property differently for first action vs the rest of the actions in the sequence (not useful past this point) - seqComponentWskActivationFutures.drop(1) map { - tupleFuture => tupleFuture map { tuple => (tuple._1, tuple._2) } + }.recoverWith { + // turn the failed accounting back to success; this is the only possible failure + // since all throwables are recovered with a failed accounting instance and this is + // in turned boxed to FailedSequenceActivation + case FailedSequenceActivation(accounting) => Future.successful(accounting) } } @@ -365,55 +289,191 @@ protected[actions] trait SequenceActions { * * The method distinguishes between invoking a sequence or an atomic action. * @param user the user executing the sequence - * @param action the action to be invoked - * @param payload the payload for the action - * @param cause the activation id of the first sequence containing this action - * @param atomicActionCount the number of activations - * @return future with the result of the invocation and the dynamic atomic action count so far + * @param futureAction the future which fetches the action to be invoked from the db + * @param accounting the state of the sequence activation, contains the dynamic activation count, logs and payload for the next action + * @param cause the activation id of the first sequence containing this activations + * @return a future which resolves with updated accounting for a sequence, including the last result, duration, and activation ids */ - private def invokeSeqOneComponent(user: Identity, action: WhiskAction, payload: Option[JsObject], cause: Option[ActivationId], atomicActionCount: Int)( - implicit transid: TransactionId): Future[(Either[ActivationResponse, WhiskActivation], Int)] = { - // invoke the action by calling the right method depending on whether it's an atomic action or a sequence - // the tuple contains activationId, wskActivation, atomicActionCount (up till this point in execution) - val futureWhiskActivationTuple = action.exec match { - case SequenceExec(components) => - // invoke a sequence - logging.info(this, s"sequence invoking an enclosed sequence $action") - // call invokeSequence to invoke the inner sequence - // true for blocking; false for topmost - invokeSequence(user, action, payload, blocking = true, topmost = false, components, cause, atomicActionCount) map { - case (activationId, wskActivation, seqAtomicActionCnt) => - (activationId, wskActivation, seqAtomicActionCnt + atomicActionCount) - } - case _ => - // this is an invoke for an atomic action - logging.info(this, s"sequence invoking an enclosed atomic action $action") - val timeout = action.limits.timeout.duration + blockingInvokeGrace - invokeSingleAction(user, action, payload, timeout, blocking = true, cause) map { - case (activationId, wskActivation) => (activationId, wskActivation, atomicActionCount + 1) - } - } + private def invokeNextAction( + user: Identity, + futureAction: Future[WhiskAction], + accounting: SequenceAccounting, + cause: Option[ActivationId])( + implicit transid: TransactionId): Future[SequenceAccounting] = { + futureAction.flatMap { action => + // the previous response becomes input for the next action in the sequence; + // the accounting no longer needs to hold a reference to it once the action is + // invoked, so previousResponse.getAndSet(null) drops the reference at this point + // which prevents dragging the previous response for the lifetime of the next activation + val inputPayload = accounting.previousResponse.getAndSet(null).result.map(_.asJsObject) + + // invoke the action by calling the right method depending on whether it's an atomic action or a sequence + val futureWhiskActivationTuple = action.exec match { + case SequenceExec(components) => + logging.info(this, s"sequence invoking an enclosed sequence $action") + // call invokeSequence to invoke the inner sequence + invokeSequence(user, action, inputPayload, blocking = true, topmost = false, components, cause, accounting.atomicActionCnt) + case _ => + // this is an invoke for an atomic action + logging.info(this, s"sequence invoking an enclosed atomic action $action") + val timeout = action.limits.timeout.duration + blockingInvokeGrace + invokeSingleAction(user, action, inputPayload, timeout, blocking = true, cause) map { + case (activationId, wskActivation) => (activationId, wskActivation, accounting.atomicActionCnt + 1) + } + } - futureWhiskActivationTuple map { - case (activationId, wskActivation, atomicActionCountSoFar) => - // the activation is None only if the activation could not be retrieved either from active ack or from db - wskActivation match { - case Some(activation) => (Right(activation), atomicActionCountSoFar) - case None => { - val activationResponse = ActivationResponse.whiskError(s"$sequenceRetrieveActivationTimeout Activation id '$activationId'.") - (Left(activationResponse), atomicActionCountSoFar) // dynamic count doesn't matter, sequence will be interrupted + futureWhiskActivationTuple.map { + case (activationId, wskActivation, atomicActionCountSoFar) => + wskActivation.map { + activation => accounting.maybe(activation, atomicActionCountSoFar, actionSequenceLimit) + }.getOrElse { + // the wskActivation is None only if the result could not be retrieved in time either from active ack or from db + logging.error(this, s"component activation timedout for $activationId") + val activationResponse = ActivationResponse.whiskError(sequenceRetrieveActivationTimeout(activationId)) + accounting.fail(activationResponse, Some(activationId)) } - } + }.recover { + // check any failure here and generate an activation response to encapsulate + // the failure mode; consider this failure a whisk error + case t: Throwable => + logging.error(this, s"component activation failed: $t") + accounting.fail(ActivationResponse.whiskError(sequenceActivationFailure), None) + } } } /** Replaces default namespaces in a vector of components from a sequence with appropriate namespace. */ private def resolveDefaultNamespace(components: Vector[FullyQualifiedEntityName], user: Identity): Vector[FullyQualifiedEntityName] = { - // if components are part of the default namespace, they contain `_`; replace it! - val resolvedComponents = components map { c => FullyQualifiedEntityName(c.path.resolveNamespace(user.namespace), c.name) } - resolvedComponents + // resolve any namespaces that may appears as "_" (the default namespace) + components.map(c => FullyQualifiedEntityName(c.path.resolveNamespace(user.namespace), c.name)) } /** Max atomic action count allowed for sequences */ private lazy val actionSequenceLimit = whiskConfig.actionSequenceLimit.toInt } + +/** + * Cumulative accounting of what happened during the execution of a sequence. + * + * @param atomicActionCnt the current count of non-sequence (c.f. atomic) actions already invoked + * @param previousResponse a reference to the previous activation result which will be nulled out + * when no longer needed (see previousResponse.getAndSet(null) below) + * @param logs a mutable buffer that is appended with new activation ids as the sequence unfolds + * @param duration the "user" time so far executing the sequence (sum of durations for + * all actions invoked so far which is different from the total time spent executing the sequence) + * @param maxMemory the maximum memory annotation observed so far for the + * components (needed to annotate the sequence with GB-s) + * @param shortcircuit when true, stops the execution of the next component in the sequence + */ +protected[actions] case class SequenceAccounting( + atomicActionCnt: Int, + previousResponse: AtomicReference[ActivationResponse], + logs: mutable.Buffer[ActivationId], + duration: Long = 0, + maxMemory: Option[Int] = None, + shortcircuit: Boolean = false) { + + /** @return the ActivationLogs data structure for this sequence invocation */ + def finalLogs = ActivationLogs(logs.map(id => id.asString).toVector) + + /** The previous activation was successful. */ + private def success(activation: WhiskActivation, newCnt: Int, shortcircuit: Boolean = false) = { + previousResponse.set(null) + SequenceAccounting( + prev = this, + newCnt = newCnt, + shortcircuit = shortcircuit, + incrDuration = activation.duration, + newResponse = activation.response, + newActivationId = activation.activationId, + newMemoryLimit = activation.annotations.get("limits") map { + limitsAnnotation => // we have a limits annotation + limitsAnnotation.asJsObject.getFields("memory") match { + case Seq(JsNumber(memory)) => Some(memory.toInt) // we have a numerical "memory" field in the "limits" annotation + } + } getOrElse { None }) + } + + /** The previous activation failed (this is used when there is no activation record or an internal error. */ + def fail(failureResponse: ActivationResponse, activationId: Option[ActivationId]) = { + require(!failureResponse.isSuccess) + logs.appendAll(activationId) + copy(previousResponse = new AtomicReference(failureResponse), shortcircuit = true) + } + + /** Determines whether the previous activation succeeded or failed. */ + def maybe(activation: WhiskActivation, newCnt: Int, maxSequenceCnt: Int) = { + // check conditions on payload that may lead to interrupting the execution of the sequence + // short-circuit the execution of the sequence iff the payload contains an error field + // and is the result of an action return, not the initial payload + val outputPayload = activation.response.result.map(_.asJsObject) + val payloadContent = outputPayload getOrElse JsObject.empty + val errorField = payloadContent.fields.get(ActivationResponse.ERROR_FIELD) + val withinSeqLimit = newCnt <= maxSequenceCnt + + if (withinSeqLimit && errorField.isEmpty) { + // all good with this action invocation + success(activation, newCnt) + } else { + val nextActivation = if (!withinSeqLimit) { + // no error in the activation but the dynamic count of actions exceeds the threshold + // this is here as defensive code; the activation should not occur if its takes the + // count above its limit + val newResponse = ActivationResponse.applicationError(sequenceIsTooLong) + activation.copy(response = newResponse) + } else { + assert(errorField.isDefined) + activation + } + + // there is an error field in the activation response. here, we treat this like success, + // in the sense of tallying up the accounting fields, but terminate the sequence early + success(nextActivation, newCnt, shortcircuit = true) + } + } +} + +/** + * Three constructors for SequenceAccounting: + * - one for successful invocation of an action in the sequence, + * - one for failed invocation, and + * - one to initialize things + */ +protected[actions] object SequenceAccounting { + + def maxMemory(prevMemoryLimit: Option[Int], newMemoryLimit: Option[Int]): Option[Int] = { + (prevMemoryLimit ++ newMemoryLimit).reduceOption(Math.max) + } + + // constructor for successful invocations, or error'ing ones (where shortcircuit = true) + def apply( + prev: SequenceAccounting, + newCnt: Int, + incrDuration: Option[Long], + newResponse: ActivationResponse, + newActivationId: ActivationId, + newMemoryLimit: Option[Int], + shortcircuit: Boolean): SequenceAccounting = { + + // compute the new max memory + val newMaxMemory = maxMemory(prev.maxMemory, newMemoryLimit) + + // append log entry + prev.logs += newActivationId + + SequenceAccounting( + atomicActionCnt = newCnt, + previousResponse = new AtomicReference(newResponse), + logs = prev.logs, + duration = incrDuration map { prev.duration + _ } getOrElse { prev.duration }, + maxMemory = newMaxMemory, + shortcircuit = shortcircuit) + } + + // constructor for initial payload + def apply(atomicActionCnt: Int, initialPayload: ActivationResponse): SequenceAccounting = { + SequenceAccounting(atomicActionCnt, new AtomicReference(initialPayload), mutable.Buffer.empty) + } +} + +protected[actions] case class FailedSequenceActivation(accounting: SequenceAccounting) extends Throwable diff --git a/tests/src/test/scala/system/basic/WskSequenceTests.scala b/tests/src/test/scala/system/basic/WskSequenceTests.scala index e0dfdefd406..06f5dea1b9f 100644 --- a/tests/src/test/scala/system/basic/WskSequenceTests.scala +++ b/tests/src/test/scala/system/basic/WskSequenceTests.scala @@ -60,7 +60,7 @@ class WskSequenceTests behavior of "Wsk Sequence" - it should "invoke a blocking sequence action and invoke the updated sequence with normal payload and payload with error field" in withAssetCleaner(wskprops) { + it should "invoke a sequence with normal payload and payload with error field" in withAssetCleaner(wskprops) { (wp, assetHelper) => val name = "sequence" val actions = Seq("split", "sort", "head", "cat") @@ -109,7 +109,7 @@ class WskSequenceTests // result of sequence should be identical to previous invocation above val payload = Map("error" -> JsString("irrelevant error string"), "payload" -> args.mkString("\n").toJson) val thirdrun = wsk.action.invoke(name, payload) - withActivation(wsk.activation, thirdrun, totalWait = 2 *allowedActionDuration) { + withActivation(wsk.activation, thirdrun, totalWait = 2 * allowedActionDuration) { activation => checkSequenceLogsAndAnnotations(activation, 2) // 2 activations in this sequence val result = activation.response.result.get @@ -118,14 +118,57 @@ class WskSequenceTests } } + it should "invoke a sequence with an enclosing sequence action" in withAssetCleaner(wskprops) { + (wp, assetHelper) => + val inner_name = "inner_sequence" + val outer_name = "outer_sequence" + val inner_actions = Seq("sort", "head") + val actions = Seq("split") ++ inner_actions ++ Seq("cat") + // create atomic actions + for (actionName <- actions) { + val file = TestUtils.getTestActionFilename(s"$actionName.js") + assetHelper.withCleaner(wsk.action, actionName) { (action, _) => + action.create(name = actionName, artifact = Some(file), timeout = Some(allowedActionDuration)) + } + } + + // create inner sequence + assetHelper.withCleaner(wsk.action, inner_name) { + val inner_sequence = inner_actions.mkString(",") + (action, _) => action.create(inner_name, Some(inner_sequence), kind = Some("sequence")) + } + + // create outer sequence + assetHelper.withCleaner(wsk.action, outer_name) { + val outer_sequence = Seq("split", "inner_sequence", "cat").mkString(",") + (action, _) => action.create(outer_name, Some(outer_sequence), kind = Some("sequence")) + } + + val now = "it is now " + new Date() + val args = Array("what time is it?", now) + val run = wsk.action.invoke(outer_name, Map("payload" -> args.mkString("\n").toJson)) + withActivation(wsk.activation, run, totalWait = 4 * allowedActionDuration) { + activation => + checkSequenceLogsAndAnnotations(activation, 3) // 3 activations in this sequence + activation.cause shouldBe None // topmost sequence + val result = activation.response.result.get + result.fields.get("payload") shouldBe defined + result.fields.get("length") should not be defined + result.fields.get("lines") shouldBe Some(JsArray(Vector(now.toJson))) + } + } + /** * s -> echo, x, echo * x -> echo * * update x -> echo -- should work * run s -> should stop after echo + * + * This confirms that a dynamic check on the sequence length holds within the system limit. + * This is different from creating a long sequence up front which will report a length error at create time. */ - it should "create a sequence, run it, update one of the atomic actions to a sequence and stop executing the outer sequence when limit reached" in withAssetCleaner(wskprops) { + it should "replace atomic component in a sequence that is too long and report invoke error" in withAssetCleaner(wskprops) { (wp, assetHelper) => val xName = "xSequence" val sName = "sSequence" @@ -176,52 +219,11 @@ class WskSequenceTests withActivation(wsk.activation, getInnerSeq, totalWait = allowedActionDuration) { innerSeqActivation => innerSeqActivation.logs.get.size shouldBe (limit - 1) - innerSeqActivation.cause shouldBe defined - innerSeqActivation.cause.get shouldBe (activation.activationId) + innerSeqActivation.cause shouldBe Some(activation.activationId) } } } - it should "invoke a blocking sequence action with an enclosing sequence action" in withAssetCleaner(wskprops) { - (wp, assetHelper) => - val inner_name = "inner_sequence" - val outer_name = "outer_sequence" - val inner_actions = Seq("sort", "head") - val actions = Seq("split") ++ inner_actions ++ Seq("cat") - // create atomic actions - for (actionName <- actions) { - val file = TestUtils.getTestActionFilename(s"$actionName.js") - assetHelper.withCleaner(wsk.action, actionName) { (action, _) => - action.create(name = actionName, artifact = Some(file), timeout = Some(allowedActionDuration)) - } - } - - // create inner sequence - assetHelper.withCleaner(wsk.action, inner_name) { - val inner_sequence = inner_actions.mkString(",") - (action, _) => action.create(inner_name, Some(inner_sequence), kind = Some("sequence")) - } - - // create outer sequence - assetHelper.withCleaner(wsk.action, outer_name) { - val outer_sequence = Seq("split", "inner_sequence", "cat").mkString(",") - (action, _) => action.create(outer_name, Some(outer_sequence), kind = Some("sequence")) - } - - val now = "it is now " + new Date() - val args = Array("what time is it?", now) - val run = wsk.action.invoke(outer_name, Map("payload" -> args.mkString("\n").toJson)) - withActivation(wsk.activation, run, totalWait = 4 * allowedActionDuration) { - activation => - checkSequenceLogsAndAnnotations(activation, 3) // 3 activations in this sequence - activation.cause shouldBe None // topmost sequence - val result = activation.response.result.get - result.fields.get("payload") shouldBe defined - result.fields.get("length") should not be defined - result.fields.get("lines") shouldBe Some(JsArray(Vector(now.toJson))) - } - } - it should "create and run a sequence in a package with parameters" in withAssetCleaner(wskprops) { (wp, assetHelper) => val sName = "sSequence" @@ -294,6 +296,7 @@ class WskSequenceTests // action params trump package params checkLogsAtomicAction(0, run, new Regex(String.format(".*key0: value0.*key1a: value1a.*key1b: value2b.*key2a: value2a.*payload: %s", now))) } + /** * s -> apperror, echo * only apperror should run diff --git a/tests/src/test/scala/whisk/core/controller/actions/test/SequenceAccountingTests.scala b/tests/src/test/scala/whisk/core/controller/actions/test/SequenceAccountingTests.scala new file mode 100644 index 00000000000..d3e591889c1 --- /dev/null +++ b/tests/src/test/scala/whisk/core/controller/actions/test/SequenceAccountingTests.scala @@ -0,0 +1,141 @@ +/* + * Copyright 2015-2016 IBM Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package whisk.core.controller.actions.test + +import java.time.Instant + +import scala.concurrent.duration.DurationInt + +import org.junit.runner.RunWith +import org.scalatest.FlatSpec +import org.scalatest.Matchers +import org.scalatest.junit.JUnitRunner + +import common.WskActorSystem +import spray.json._ +import whisk.core.controller.actions.SequenceAccounting +import whisk.core.entity._ +import whisk.core.entity.ActivationResponse +import whisk.core.entity.size.SizeInt +import whisk.http.Messages + +@RunWith(classOf[JUnitRunner]) +class SequenceAccountingTests extends FlatSpec with Matchers with WskActorSystem { + + behavior of "sequence accounting" + + val okRes1 = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1)))) + val okRes2 = ActivationResponse.success(Some(JsObject("res" -> JsNumber(2)))) + val failedRes = ActivationResponse.applicationError(JsNumber(3)) + + val okActivation = WhiskActivation( + namespace = EntityPath("ns"), + name = EntityName("a"), + Subject(), + activationId = ActivationId(), + start = Instant.now(), + end = Instant.now(), + response = okRes2, + annotations = Parameters("limits", ActionLimits( + TimeLimit(1.second), + MemoryLimit(128.MB), + LogLimit(1.MB)).toJson), + duration = Some(123)) + + val notOkActivation = WhiskActivation( + namespace = EntityPath("ns"), + name = EntityName("a"), + Subject(), + activationId = ActivationId(), + start = Instant.now(), + end = Instant.now(), + response = failedRes, + annotations = Parameters("limits", ActionLimits( + TimeLimit(11.second), + MemoryLimit(256.MB), + LogLimit(2.MB)).toJson), + duration = Some(234)) + + it should "create initial accounting object" in { + val s = SequenceAccounting(2, okRes1) + s.atomicActionCnt shouldBe 2 + s.previousResponse.get shouldBe okRes1 + s.logs shouldBe empty + s.duration shouldBe 0 + s.maxMemory shouldBe None + s.shortcircuit shouldBe false + } + + it should "resolve maybe to success and update accounting object" in { + val p = SequenceAccounting(2, okRes1) + val n1 = p.maybe(okActivation, 3, 5) + n1.atomicActionCnt shouldBe 3 + n1.previousResponse.get shouldBe okRes2 + n1.logs.length shouldBe 1 + n1.logs(0) shouldBe okActivation.activationId + n1.duration shouldBe 123 + n1.maxMemory shouldBe Some(128) + n1.shortcircuit shouldBe false + } + + it should "resolve maybe and enable short circuit" in { + val p = SequenceAccounting(2, okRes1) + val n1 = p.maybe(okActivation, 3, 5) + val n2 = n1.maybe(notOkActivation, 4, 5) + n2.atomicActionCnt shouldBe 4 + n2.previousResponse.get shouldBe failedRes + n2.logs.length shouldBe 2 + n2.logs(0) shouldBe okActivation.activationId + n2.logs(1) shouldBe notOkActivation.activationId + n2.duration shouldBe (123 + 234) + n2.maxMemory shouldBe Some(256) + n2.shortcircuit shouldBe true + } + + it should "record an activation that exceeds allowed limit but also short circuit" in { + val p = SequenceAccounting(2, okRes1) + val n = p.maybe(okActivation, 3, 2) + n.atomicActionCnt shouldBe 3 + n.previousResponse.get shouldBe ActivationResponse.applicationError(Messages.sequenceIsTooLong) + n.logs.length shouldBe 1 + n.logs(0) shouldBe okActivation.activationId + n.duration shouldBe 123 + n.maxMemory shouldBe Some(128) + n.shortcircuit shouldBe true + } + + it should "set failed response and short circuit on failure" in { + val p = SequenceAccounting(2, okRes1) + val n = p.maybe(okActivation, 3, 3) + val f = n.fail(failedRes, None) + f.atomicActionCnt shouldBe 3 + f.previousResponse.get shouldBe failedRes + f.logs.length shouldBe 1 + f.logs(0) shouldBe okActivation.activationId + f.duration shouldBe 123 + f.maxMemory shouldBe Some(128) + f.shortcircuit shouldBe true + } + + it should "resolve max memory" in { + SequenceAccounting.maxMemory(None, None) shouldBe None + SequenceAccounting.maxMemory(None, Some(1)) shouldBe Some(1) + SequenceAccounting.maxMemory(Some(1), None) shouldBe Some(1) + SequenceAccounting.maxMemory(Some(1), Some(2)) shouldBe Some(2) + SequenceAccounting.maxMemory(Some(2), Some(1)) shouldBe Some(2) + SequenceAccounting.maxMemory(Some(2), Some(2)) shouldBe Some(2) + } +} diff --git a/tests/src/test/scala/whisk/utils/test/ExecutionContextFactoryTests.scala b/tests/src/test/scala/whisk/utils/test/ExecutionContextFactoryTests.scala new file mode 100644 index 00000000000..b530d3af302 --- /dev/null +++ b/tests/src/test/scala/whisk/utils/test/ExecutionContextFactoryTests.scala @@ -0,0 +1,43 @@ +/* + * Copyright 2015-2016 IBM Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package whisk.utils.test + +import scala.concurrent.Await +import scala.concurrent.Future +import scala.concurrent.duration.DurationInt + +import org.junit.runner.RunWith +import org.scalatest.FlatSpec +import org.scalatest.Matchers +import org.scalatest.junit.JUnitRunner + +import common.WskActorSystem +import whisk.utils.ExecutionContextFactory.FutureExtensions + +@RunWith(classOf[JUnitRunner]) +class ExecutionContextFactoryTests extends FlatSpec with Matchers with WskActorSystem { + + behavior of "future extensions" + + it should "take first to complete" in { + val f1 = Future.successful({}).withTimeout(500.millis, new Throwable("error")) + Await.result(f1, 1.second) shouldBe ({}) + + val failure = new Throwable("error") + val f2 = Future { Thread.sleep(1.second.toMillis) }.withTimeout(500.millis, failure) + a[Throwable] shouldBe thrownBy { Await.result(f2, 1.seconds) } + } +}