Skip to content

Commit

Permalink
Free up slot in Loadbalancer after log-collection is finished.
Browse files Browse the repository at this point in the history
Co-authored-by: Sugandha Agrawal <agrawals@de.ibm.com>
  • Loading branch information
cbickel and Sugandha Agrawal committed Oct 19, 2018
1 parent 0d6c017 commit 997ccd1
Show file tree
Hide file tree
Showing 7 changed files with 248 additions and 129 deletions.
73 changes: 63 additions & 10 deletions common/scala/src/main/scala/whisk/core/connector/Message.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,51 @@ object ActivationMessage extends DefaultJsonProtocol {
}

/**
* When adding fields, the serdes of the companion object must be updated also.
* The whisk activation field will have its logs stripped.
* Message that is sent from the invoker to the controller after action is completed or after slot is free again for
* new actions.
*/
abstract class AcknowledegmentMessage() extends Message {
override val transid: TransactionId
override def serialize: String = {
AcknowledegmentMessage.serdes.write(this).compactPrint
}
}

/**
* This message is sent from the invoker to the controller, after the slot of an invoker that has been used by the
* current action, is free again (after log collection)
*/
case class CompletionMessage(override val transid: TransactionId,
response: Either[ActivationId, WhiskActivation],
activationId: ActivationId,
isSystemError: Boolean,
invoker: InvokerInstanceId)
extends Message {
extends AcknowledegmentMessage() {

override def serialize: String = {
CompletionMessage.serdes.write(this).compactPrint
override def toString = {
activationId.asString
}
}

object CompletionMessage extends DefaultJsonProtocol {
def parse(msg: String): Try[CompletionMessage] = Try(serdes.read(msg.parseJson))
implicit val serdes = jsonFormat4(CompletionMessage.apply)
}

/**
* That message will be sent from the invoker to the controller after action completion if the user wants to have
* the result immediately (blocking activation).
* When adding fields, the serdes of the companion object must be updated also.
* The whisk activation field will have its logs stripped.
*/
case class ResultMessage(override val transid: TransactionId, response: Either[ActivationId, WhiskActivation])
extends AcknowledegmentMessage() {

override def toString = {
response.fold(l => l, r => r.activationId).asString
}
}

object CompletionMessage extends DefaultJsonProtocol {
object ResultMessage extends DefaultJsonProtocol {
implicit def eitherResponse =
new JsonFormat[Either[ActivationId, WhiskActivation]] {
def write(either: Either[ActivationId, WhiskActivation]) = either match {
Expand All @@ -101,12 +128,38 @@ object CompletionMessage extends DefaultJsonProtocol {
// per the ActivationId's serializer, it is guaranteed to be a String even if it only consists of digits
case _: JsString => Left(value.convertTo[ActivationId])
case _: JsObject => Right(value.convertTo[WhiskActivation])
case _ => deserializationError("could not read CompletionMessage")
case _ => deserializationError("could not read ResultMessage")
}
}

def parse(msg: String): Try[CompletionMessage] = Try(serdes.read(msg.parseJson))
private val serdes = jsonFormat3(CompletionMessage.apply)
def parse(msg: String): Try[ResultMessage] = Try(serdes.read(msg.parseJson))
implicit val serdes = jsonFormat2(ResultMessage.apply)
}

object AcknowledegmentMessage extends DefaultJsonProtocol {
def parse(msg: String): Try[AcknowledegmentMessage] = {
Try(serdes.read(msg.parseJson))
}

implicit val serdes = new RootJsonFormat[AcknowledegmentMessage] {
override def write(obj: AcknowledegmentMessage): JsValue = {
obj match {
case c: CompletionMessage => c.toJson
case r: ResultMessage => r.toJson
}
}

override def read(json: JsValue): AcknowledegmentMessage = {
json.asJsObject
// The field invoker is only part of the CompletionMessage. If this field is part of the JSON, we try to convert
// it to a CompletionMessage. Otherwise to a ResultMessage.
// If both conversions fail, an error will be thrown that needs to be handled.
.getFields("invoker")
.headOption
.map(_ => json.convertTo[CompletionMessage])
.getOrElse(json.convertTo[ResultMessage])
}
}
}

case class PingMessage(instance: InvokerInstanceId) extends Message {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,12 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con

// Install a timeout handler for the catastrophic case where an active ack is not received at all
// (because say an invoker is down completely, or the connection to the message bus is disrupted) or when
// the active ack is significantly delayed (possibly dues to long queues but the subject should not be penalized);
// the completion ack is significantly delayed (possibly dues to long queues but the subject should not be penalized);
// in this case, if the activation handler is still registered, remove it and update the books.
activations.getOrElseUpdate(
msg.activationId, {
val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
processCompletion(Left(msg.activationId), msg.transid, forced = true, invoker = instance)
processCompletion(msg.activationId, msg.transid, forced = true, isSystemError = false, invoker = instance)
}

// please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
Expand Down Expand Up @@ -344,36 +344,61 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
activeAckConsumer,
maxActiveAcksPerPoll,
activeAckPollDuration,
processActiveAck)
processAcknowledgement)
})

/** 4. Get the active-ack message and parse it */
private def processActiveAck(bytes: Array[Byte]): Future[Unit] = Future {
/** 4. Get the acknowledgement message and parse it */
private def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = Future {
val raw = new String(bytes, StandardCharsets.UTF_8)
CompletionMessage.parse(raw) match {
AcknowledegmentMessage.parse(raw) match {
case Success(m: CompletionMessage) =>
processCompletion(m.response, m.transid, forced = false, invoker = m.invoker)
processCompletion(
m.activationId,
m.transid,
forced = false,
isSystemError = m.isSystemError,
invoker = m.invoker)
activationFeed ! MessageFeed.Processed

case Success(m: ResultMessage) =>
processResult(m.response, m.transid)
activationFeed ! MessageFeed.Processed

case Failure(t) =>
activationFeed ! MessageFeed.Processed
logging.error(this, s"failed processing message: $raw with $t")
logging.error(this, s"failed processing message: $raw")

case _ =>
activationFeed ! MessageFeed.Processed
logging.error(this, s"Unexpected Acknowledgment message received by loadbalancer: $raw")
}
}

/** 5. Process the result ack and return it to the user */
private def processResult(response: Either[ActivationId, WhiskActivation], tid: TransactionId): Unit = {
val aid = response.fold(l => l, r => r.activationId)

// Resolve the promise to send the result back to the user
// The activation will be removed from `activations`-map later, when we receive the completion message, because the
// slot of the invoker is not yet free for new activations.
activations.get(aid).map { entry =>
entry.promise.trySuccess(response)
}
logging.info(this, s"received result ack for '$aid'")(tid)
}

/** 5. Process the active-ack and update the state accordingly */
private def processCompletion(response: Either[ActivationId, WhiskActivation],
/** Process the completion ack and update the state */
private def processCompletion(aid: ActivationId,
tid: TransactionId,
forced: Boolean,
isSystemError: Boolean,
invoker: InvokerInstanceId): Unit = {
val aid = response.fold(l => l, r => r.activationId)

val invocationResult = if (forced) {
InvocationFinishedResult.Timeout
} else {
// If the response contains a system error, report that, otherwise report Success
// Left generally is considered a Success, since that could be a message not fitting into Kafka
val isSystemError = response.fold(_ => false, _.response.isWhiskError)
if (isSystemError) {
InvocationFinishedResult.SystemError
} else {
Expand All @@ -390,30 +415,34 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con

if (!forced) {
entry.timeoutHandler.cancel()
entry.promise.trySuccess(response)
// If the action was blocking and the Resultmessage has been received before nothing will happen here.
// If the action was blocking and the ResultMessage is still missing, we pass the ActivationId. With this Id,
// the controller will get the result out of the database.
// If the action was non-blocking, we will close the promise here.
entry.promise.trySuccess(Left(aid))
} else {
entry.promise.tryFailure(new Throwable("no active ack received"))
entry.promise.tryFailure(new Throwable("no completion ack received"))
}

logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid)
logging.info(this, s"${if (!forced) "received" else "forced"} completion ack for '$aid'")(tid)
// Active acks that are received here are strictly from user actions - health actions are not part of
// the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
case None if tid == TransactionId.invokerHealth =>
// Health actions do not have an ActivationEntry as they are written on the message bus directly. Their result
// is important to pass to the invokerPool because they are used to determine if the invoker can be considered
// healthy again.
logging.info(this, s"received active ack for health action on $invoker")(tid)
logging.info(this, s"received completion ack for health action on $invoker")(tid)
invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
case None if !forced =>
// Received an active-ack that has already been taken out of the state because of a timeout (forced active-ack).
// The result is ignored because a timeout has already been reported to the invokerPool per the force.
logging.debug(this, s"received active ack for '$aid' which has no entry")(tid)
logging.debug(this, s"received completion ack for '$aid' which has no entry")(tid)
case None =>
// The entry has already been removed by an active ack. This part of the code is reached by the timeout and can
// happen if active-ack and timeout happen roughly at the same time (the timeout was triggered before the active
// ack canceled the timer). As the active ack is already processed we don't have to do anything here.
logging.debug(this, s"forced active ack for '$aid' which has no entry")(tid)
logging.debug(this, s"forced completion ack for '$aid' which has no entry")(tid)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ case object RescheduleJob // job is sent back to parent and could not be process
*/
class ContainerProxy(
factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any],
sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any],
storeActivation: (TransactionId, WhiskActivation, UserContext) => Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
instance: InvokerInstanceId,
Expand Down Expand Up @@ -168,7 +168,8 @@ class ContainerProxy(
activation,
job.msg.blocking,
job.msg.rootControllerIndex,
job.msg.user.namespace.uuid)
job.msg.user.namespace.uuid,
true)
storeActivation(transid, activation, context)
}
.flatMap { container =>
Expand Down Expand Up @@ -390,8 +391,10 @@ class ContainerProxy(
}

// Sending active ack. Entirely asynchronous and not waited upon.
activation.foreach(
sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid))
if (job.msg.blocking) {
activation.foreach(
sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, false))
}

val context = UserContext(job.msg.user)

Expand All @@ -418,8 +421,14 @@ class ContainerProxy(
}
}

// Storing the record. Entirely asynchronous and not waited upon.
activationWithLogs.map(_.fold(_.activation, identity)).foreach(storeActivation(tid, _, context))
activationWithLogs
.map(_.fold(_.activation, identity))
.foreach { activation =>
// Sending the completionMessage to the controller asynchronously.
sendActiveAck(tid, activation, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, true)
// Storing the record. Entirely asynchronous and not waited upon.
storeActivation(tid, activation, context)
}

// Disambiguate activation errors and transform the Either into a failed/successful Future respectively.
activationWithLogs.flatMap {
Expand All @@ -436,7 +445,7 @@ final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, paus
object ContainerProxy {
def props(
factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any],
ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any],
store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
instance: InvokerInstanceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,19 @@ class InvokerReactive(
activationResult: WhiskActivation,
blockingInvoke: Boolean,
controllerInstance: ControllerInstanceId,
userId: UUID) => {
userId: UUID,
isSlotFree: Boolean) => {
implicit val transid: TransactionId = tid

def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {
val msg = CompletionMessage(transid, res, instance)
val msg = if (isSlotFree) {
val aid = res.fold(identity, _.activationId)
val isWhiskSystemError = res.fold(_ => false, _.response.isWhiskError)
CompletionMessage(transid, aid, isWhiskSystemError, instance)
} else {
ResultMessage(transid, res)
}

producer.send(topic = "completed" + controllerInstance.asString, msg).andThen {
case Success(_) =>
logging.info(
Expand Down Expand Up @@ -223,7 +231,7 @@ class InvokerReactive(
val context = UserContext(msg.user)
val activation = generateFallbackActivation(msg, response)
activationFeed ! MessageFeed.Processed
ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid)
ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid, true)
store(msg.transid, activation, context)
Future.successful(())
}
Expand All @@ -233,7 +241,7 @@ class InvokerReactive(
activationFeed ! MessageFeed.Processed
val activation =
generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted))
ack(msg.transid, activation, false, msg.rootControllerIndex, msg.user.namespace.uuid)
ack(msg.transid, activation, false, msg.rootControllerIndex, msg.user.namespace.uuid, true)
logging.warn(this, s"namespace ${msg.user.namespace.name} was blocked in invoker.")
Future.successful(())
}
Expand Down
Loading

0 comments on commit 997ccd1

Please sign in to comment.