Skip to content

Commit

Permalink
Add additional comments to clarify the ordering of result and complet…
Browse files Browse the repository at this point in the history
…ion messages.

Adds a type alias for the active ack messages, and document the interface.
  • Loading branch information
rabbah committed Nov 22, 2018
1 parent 1fd36bb commit 5e43072
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import akka.actor.{FSM, Props, Stash}
import akka.event.Logging.InfoLevel
import akka.pattern.pipe
import pureconfig.loadConfigOrThrow

import scala.collection.immutable
import spray.json.DefaultJsonProtocol._
import spray.json._
Expand All @@ -35,6 +36,7 @@ import org.apache.openwhisk.core.database.UserContext
import org.apache.openwhisk.core.entity.ExecManifest.ImageName
import org.apache.openwhisk.core.entity._
import org.apache.openwhisk.core.entity.size._
import org.apache.openwhisk.core.invoker.InvokerReactive.ActiveAck
import org.apache.openwhisk.http.Messages

import scala.concurrent.Future
Expand Down Expand Up @@ -127,7 +129,7 @@ case object RunCompleted
*/
class ContainerProxy(
factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any],
sendActiveAck: ActiveAck,
storeActivation: (TransactionId, WhiskActivation, UserContext) => Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
instance: InvokerInstanceId,
Expand Down Expand Up @@ -496,11 +498,17 @@ class ContainerProxy(
ActivationResponse.whiskError(Messages.abnormalRun))
}

// Sending active ack. Entirely asynchronous and not waited upon.
// Sending active ack is an asynchronous operation. The result is forward as soon as possible
// for blocking activations so that dependent activations can be scheduled. The completion
// message which frees a loadbalancer slot is sent after the active ack future completes
// to ensure proper ordering.
val sendResult = if (job.msg.blocking) {
activation.map(
sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, false))
} else Future.successful(())
} else {
// For non-blocking request, do not forward the result.
Future.successful(())
}

val context = UserContext(job.msg.user)

Expand Down Expand Up @@ -530,7 +538,8 @@ class ContainerProxy(
activationWithLogs
.map(_.fold(_.activation, identity))
.foreach { activation =>
// Sending the completionMessage to the controller asynchronously. But not before result message is sent.
// Sending the completionMessage to the controller after the active ack ensure proper ordering (result
// is received before the completion message for blocking invokes).
sendResult.onComplete(
_ =>
sendActiveAck(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object InvokerReactive {

/**
* An method for sending Active Acknowledgements (aka "active ack") messages to the load balancer. These messages
* are either completion messages for an activation to indicate a resource slot is free, or result-forwarding
* messages for continuations (e.g., sequences and conductor actions).
*
* @param TransactionId the transaction id for the activation
* @param WhiskActivaiton is the activation result
* @param Boolean is true iff the activation was a blocking request
* @param ControllerInstanceId the originating controller/loadbalancer id
* @param UUID is the UUID for the namespace owning the activation
* @param Boolean is true this is resource free message and false if this is a result forwarding message
*/
type ActiveAck = (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any]
}

class InvokerReactive(
config: WhiskConfig,
instance: InvokerInstanceId,
Expand Down Expand Up @@ -115,12 +132,12 @@ class InvokerReactive(
})

/** Sends an active-ack. */
private val ack = (tid: TransactionId,
activationResult: WhiskActivation,
blockingInvoke: Boolean,
controllerInstance: ControllerInstanceId,
userId: UUID,
isSlotFree: Boolean) => {
private val ack: InvokerReactive.ActiveAck = (tid: TransactionId,
activationResult: WhiskActivation,
blockingInvoke: Boolean,
controllerInstance: ControllerInstanceId,
userId: UUID,
isSlotFree: Boolean) => {
implicit val transid: TransactionId = tid

def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {
Expand Down

0 comments on commit 5e43072

Please sign in to comment.