diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala index f4c33005f57..641fed6187b 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/containerpool/ContainerProxy.scala @@ -24,6 +24,7 @@ import akka.actor.{FSM, Props, Stash} import akka.event.Logging.InfoLevel import akka.pattern.pipe import pureconfig.loadConfigOrThrow + import scala.collection.immutable import spray.json.DefaultJsonProtocol._ import spray.json._ @@ -35,6 +36,7 @@ import org.apache.openwhisk.core.database.UserContext import org.apache.openwhisk.core.entity.ExecManifest.ImageName import org.apache.openwhisk.core.entity._ import org.apache.openwhisk.core.entity.size._ +import org.apache.openwhisk.core.invoker.InvokerReactive.ActiveAck import org.apache.openwhisk.http.Messages import scala.concurrent.Future @@ -127,7 +129,7 @@ case object RunCompleted */ class ContainerProxy( factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container], - sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any], + sendActiveAck: ActiveAck, storeActivation: (TransactionId, WhiskActivation, UserContext) => Future[Any], collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs], instance: InvokerInstanceId, @@ -496,11 +498,17 @@ class ContainerProxy( ActivationResponse.whiskError(Messages.abnormalRun)) } - // Sending active ack. Entirely asynchronous and not waited upon. + // Sending active ack is an asynchronous operation. The result is forward as soon as possible + // for blocking activations so that dependent activations can be scheduled. The completion + // message which frees a loadbalancer slot is sent after the active ack future completes + // to ensure proper ordering. val sendResult = if (job.msg.blocking) { activation.map( sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, false)) - } else Future.successful(()) + } else { + // For non-blocking request, do not forward the result. + Future.successful(()) + } val context = UserContext(job.msg.user) @@ -530,7 +538,8 @@ class ContainerProxy( activationWithLogs .map(_.fold(_.activation, identity)) .foreach { activation => - // Sending the completionMessage to the controller asynchronously. But not before result message is sent. + // Sending the completionMessage to the controller after the active ack ensure proper ordering (result + // is received before the completion message for blocking invokes). sendResult.onComplete( _ => sendActiveAck( diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala index da868438757..ab3051044bd 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala @@ -43,6 +43,23 @@ import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} +object InvokerReactive { + + /** + * An method for sending Active Acknowledgements (aka "active ack") messages to the load balancer. These messages + * are either completion messages for an activation to indicate a resource slot is free, or result-forwarding + * messages for continuations (e.g., sequences and conductor actions). + * + * @param TransactionId the transaction id for the activation + * @param WhiskActivaiton is the activation result + * @param Boolean is true iff the activation was a blocking request + * @param ControllerInstanceId the originating controller/loadbalancer id + * @param UUID is the UUID for the namespace owning the activation + * @param Boolean is true this is resource free message and false if this is a result forwarding message + */ + type ActiveAck = (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any] +} + class InvokerReactive( config: WhiskConfig, instance: InvokerInstanceId, @@ -115,12 +132,12 @@ class InvokerReactive( }) /** Sends an active-ack. */ - private val ack = (tid: TransactionId, - activationResult: WhiskActivation, - blockingInvoke: Boolean, - controllerInstance: ControllerInstanceId, - userId: UUID, - isSlotFree: Boolean) => { + private val ack: InvokerReactive.ActiveAck = (tid: TransactionId, + activationResult: WhiskActivation, + blockingInvoke: Boolean, + controllerInstance: ControllerInstanceId, + userId: UUID, + isSlotFree: Boolean) => { implicit val transid: TransactionId = tid def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {