Skip to content

Commit

Permalink
Send active-ack after log collection for nonblocking activations.
Browse files Browse the repository at this point in the history
Until now, the active-ack was sent before the logs of a container were collected. If a customer writes a lot of logs, or if log collection is slow for some other reason, the invoker already receives new activations, which then queue up.
This PR changes the behavior to send the active-ack for non-blocking activations only after log collection has finished. For blocking activations, there are now two active-acks: one with the response for the user, and one to free up the space in the bookkeeping of the load balancer.
  • Loading branch information
cbickel committed Sep 25, 2018
1 parent a6ae673 commit 9b495bd
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ object ActivationMessage extends DefaultJsonProtocol {
*/
case class CompletionMessage(override val transid: TransactionId,
response: Either[ActivationId, WhiskActivation],
invoker: InvokerInstanceId)
invoker: InvokerInstanceId,
isLogCollectionFinished: Boolean = true)
extends Message {

override def serialize: String = {
Expand Down Expand Up @@ -106,7 +107,7 @@ object CompletionMessage extends DefaultJsonProtocol {
}

def parse(msg: String): Try[CompletionMessage] = Try(serdes.read(msg.parseJson))
private val serdes = jsonFormat3(CompletionMessage.apply)
private val serdes = jsonFormat4(CompletionMessage.apply)
}

case class PingMessage(instance: InvokerInstanceId) extends Message {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,12 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
activations.getOrElseUpdate(
msg.activationId, {
val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
processCompletion(Left(msg.activationId), msg.transid, forced = true, invoker = instance)
processCompletion(
Left(msg.activationId),
msg.transid,
forced = true,
invoker = instance,
isLogCollectionFinished = true)
}

// please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
Expand Down Expand Up @@ -352,7 +357,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
val raw = new String(bytes, StandardCharsets.UTF_8)
CompletionMessage.parse(raw) match {
case Success(m: CompletionMessage) =>
processCompletion(m.response, m.transid, forced = false, invoker = m.invoker)
processCompletion(m.response, m.transid, forced = false, invoker = m.invoker, m.isLogCollectionFinished)
activationFeed ! MessageFeed.Processed

case Failure(t) =>
Expand All @@ -365,7 +370,8 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
private def processCompletion(response: Either[ActivationId, WhiskActivation],
tid: TransactionId,
forced: Boolean,
invoker: InvokerInstanceId): Unit = {
invoker: InvokerInstanceId,
isLogCollectionFinished: Boolean): Unit = {
val aid = response.fold(l => l, r => r.activationId)

val invocationResult = if (forced) {
Expand All @@ -381,40 +387,47 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
}
}

activations.remove(aid) match {
case Some(entry) =>
totalActivations.decrement()
totalActivationMemory.add(entry.memory.toMB * (-1))
activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release(entry.memory.toMB.toInt))

if (!forced) {
entry.timeoutHandler.cancel()
entry.promise.trySuccess(response)
} else {
entry.promise.tryFailure(new Throwable("no active ack received"))
}
if (isLogCollectionFinished) {
activations.remove(aid) match {
case Some(entry) =>
totalActivations.decrement()
totalActivationMemory.add(entry.memory.toMB * (-1))
activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement())
schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release(entry.memory.toMB.toInt))

if (!forced) {
entry.timeoutHandler.cancel()
entry.promise.trySuccess(response)
} else {
entry.promise.tryFailure(new Throwable("no active ack received"))
}

logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid)
// Active acks that are received here are strictly from user actions - health actions are not part of
// the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
case None if tid == TransactionId.invokerHealth =>
// Health actions do not have an ActivationEntry as they are written on the message bus directly. Their result
// is important to pass to the invokerPool because they are used to determine if the invoker can be considered
// healthy again.
logging.info(this, s"received active ack for health action on $invoker")(tid)
invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
case None if !forced =>
// Received an active-ack that has already been taken out of the state because of a timeout (forced active-ack).
// The result is ignored because a timeout has already been reported to the invokerPool per the force.
logging.debug(this, s"received active ack for '$aid' which has no entry")(tid)
case None =>
// The entry has already been removed by an active ack. This part of the code is reached by the timeout and can
// happen if active-ack and timeout happen roughly at the same time (the timeout was triggered before the active
// ack canceled the timer). As the active ack is already processed we don't have to do anything here.
logging.debug(this, s"forced active ack for '$aid' which has no entry")(tid)
logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid)
// Active acks that are received here are strictly from user actions - health actions are not part of
// the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
case None if tid == TransactionId.invokerHealth =>
// Health actions do not have an ActivationEntry as they are written on the message bus directly. Their result
// is important to pass to the invokerPool because they are used to determine if the invoker can be considered
// healthy again.
logging.info(this, s"received active ack for health action on $invoker")(tid)
invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
case None if !forced =>
// Received an active-ack that has already been taken out of the state because of a timeout (forced active-ack).
// The result is ignored because a timeout has already been reported to the invokerPool per the force.
logging.debug(this, s"received active ack for '$aid' which has no entry")(tid)
case None =>
// The entry has already been removed by an active ack. This part of the code is reached by the timeout and can
// happen if active-ack and timeout happen roughly at the same time (the timeout was triggered before the active
// ack canceled the timer). As the active ack is already processed we don't have to do anything here.
logging.debug(this, s"forced active ack for '$aid' which has no entry")(tid)
}
} else {
// If the other active ack was faster, there won't be an entry in activations-map and nothing will happen.
activations.get(aid).foreach(_.promise.trySuccess(response))
logging.info(this, s"receiving active ack for $aid but logs are still getting collected")(tid)
}

}

private val invokerPool = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ case object RescheduleJob // job is sent back to parent and could not be process
*/
class ContainerProxy(
factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any],
sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any],
storeActivation: (TransactionId, WhiskActivation, UserContext) => Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
instance: InvokerInstanceId,
Expand Down Expand Up @@ -168,7 +168,8 @@ class ContainerProxy(
activation,
job.msg.blocking,
job.msg.rootControllerIndex,
job.msg.user.namespace.uuid)
job.msg.user.namespace.uuid,
true)
storeActivation(transid, activation, context)
}
.flatMap { container =>
Expand Down Expand Up @@ -390,8 +391,12 @@ class ContainerProxy(
}

// Sending active ack. Entirely asynchronous and not waited upon.
activation.foreach(
sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid))
// Send the active ack here only if the activation was blocking. In this case, mark it as not completed yet, because
// the logs still have to be collected before the space on the invoker can be freed up.
if (job.msg.blocking) {
activation.foreach(
sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, false))
}

val context = UserContext(job.msg.user)

Expand All @@ -418,6 +423,13 @@ class ContainerProxy(
}
}

// Sending active ack. Entirely asynchronous and not waited upon.
// If the activation was blocking, the load balancer is notified here that the space on the invoker is free
// again, because all logs have been collected.
activationWithLogs
.map(_.fold(_.activation, identity))
.foreach(sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, true))

// Storing the record. Entirely asynchronous and not waited upon.
activationWithLogs.map(_.fold(_.activation, identity)).foreach(storeActivation(tid, _, context))

Expand All @@ -436,7 +448,7 @@ final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, paus
object ContainerProxy {
def props(
factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any],
ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any],
store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
instance: InvokerInstanceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,12 @@ class InvokerReactive(
activationResult: WhiskActivation,
blockingInvoke: Boolean,
controllerInstance: ControllerInstanceId,
userId: UUID) => {
userId: UUID,
isLogCollectionFinished: Boolean) => {
implicit val transid: TransactionId = tid

def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {
val msg = CompletionMessage(transid, res, instance)
val msg = CompletionMessage(transid, res, instance, isLogCollectionFinished)
producer.send(topic = "completed" + controllerInstance.asString, msg).andThen {
case Success(_) =>
logging.info(
Expand Down Expand Up @@ -233,7 +234,7 @@ class InvokerReactive(
val context = UserContext(msg.user)
val activation = generateFallbackActivation(msg, response)
activationFeed ! MessageFeed.Processed
ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid)
ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid, true)
store(msg.transid, activation, context)
Future.successful(())
}
Expand All @@ -243,7 +244,7 @@ class InvokerReactive(
activationFeed ! MessageFeed.Processed
val activation =
generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted))
ack(msg.transid, activation, false, msg.rootControllerIndex, msg.user.namespace.uuid)
ack(msg.transid, activation, false, msg.rootControllerIndex, msg.user.namespace.uuid, true)
logging.warn(this, s"namespace ${msg.user.namespace.name} was blocked in invoker.")
Future.successful(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class ContainerProxyTests

/** Creates an inspectable version of the ack method, which records all calls in a buffer */
def createAcker(a: ExecutableWhiskAction = action) = LoggedFunction {
(_: TransactionId, activation: WhiskActivation, _: Boolean, _: ControllerInstanceId, _: UUID) =>
(_: TransactionId, activation: WhiskActivation, _: Boolean, _: ControllerInstanceId, _: UUID, _: Boolean) =>
activation.annotations.get("limits") shouldBe Some(a.limits.toJson)
activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson)
activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson)
Expand Down

0 comments on commit 9b495bd

Please sign in to comment.