Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send active-ack after log collection for nonblocking activations. #4041

Merged
merged 1 commit into from
Oct 31, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 63 additions & 10 deletions common/scala/src/main/scala/whisk/core/connector/Message.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,51 @@ object ActivationMessage extends DefaultJsonProtocol {
}

/**
* When adding fields, the serdes of the companion object must be updated also.
* The whisk activation field will have its logs stripped.
* Message that is sent from the invoker to the controller after action is completed or after slot is free again for
* new actions.
*/
abstract class AcknowledegmentMessage() extends Message {
override val transid: TransactionId
override def serialize: String = {
AcknowledegmentMessage.serdes.write(this).compactPrint
}
}

/**
* This message is sent from the invoker to the controller, after the slot of an invoker that has been used by the
* current action, is free again (after log collection)
*/
case class CompletionMessage(override val transid: TransactionId,
response: Either[ActivationId, WhiskActivation],
activationId: ActivationId,
isSystemError: Boolean,
invoker: InvokerInstanceId)
extends Message {
extends AcknowledegmentMessage() {

override def serialize: String = {
CompletionMessage.serdes.write(this).compactPrint
override def toString = {
activationId.asString
}
}

object CompletionMessage extends DefaultJsonProtocol {
def parse(msg: String): Try[CompletionMessage] = Try(serdes.read(msg.parseJson))
implicit val serdes = jsonFormat4(CompletionMessage.apply)
}

/**
* That message will be sent from the invoker to the controller after action completion if the user wants to have
* the result immediately (blocking activation).
* When adding fields, the serdes of the companion object must be updated also.
* The whisk activation field will have its logs stripped.
*/
case class ResultMessage(override val transid: TransactionId, response: Either[ActivationId, WhiskActivation])
extends AcknowledegmentMessage() {

override def toString = {
response.fold(l => l, r => r.activationId).asString
}
}

object CompletionMessage extends DefaultJsonProtocol {
object ResultMessage extends DefaultJsonProtocol {
implicit def eitherResponse =
new JsonFormat[Either[ActivationId, WhiskActivation]] {
def write(either: Either[ActivationId, WhiskActivation]) = either match {
Expand All @@ -101,12 +128,38 @@ object CompletionMessage extends DefaultJsonProtocol {
// per the ActivationId's serializer, it is guaranteed to be a String even if it only consists of digits
case _: JsString => Left(value.convertTo[ActivationId])
case _: JsObject => Right(value.convertTo[WhiskActivation])
case _ => deserializationError("could not read CompletionMessage")
case _ => deserializationError("could not read ResultMessage")
}
}

def parse(msg: String): Try[CompletionMessage] = Try(serdes.read(msg.parseJson))
private val serdes = jsonFormat3(CompletionMessage.apply)
def parse(msg: String): Try[ResultMessage] = Try(serdes.read(msg.parseJson))
implicit val serdes = jsonFormat2(ResultMessage.apply)
}

object AcknowledegmentMessage extends DefaultJsonProtocol {
def parse(msg: String): Try[AcknowledegmentMessage] = {
Try(serdes.read(msg.parseJson))
}

implicit val serdes = new RootJsonFormat[AcknowledegmentMessage] {
override def write(obj: AcknowledegmentMessage): JsValue = {
obj match {
case c: CompletionMessage => c.toJson
case r: ResultMessage => r.toJson
}
}

override def read(json: JsValue): AcknowledegmentMessage = {
json.asJsObject
// The field invoker is only part of the CompletionMessage. If this field is part of the JSON, we try to convert
// it to a CompletionMessage. Otherwise to a ResultMessage.
// If both conversions fail, an error will be thrown that needs to be handled.
.getFields("invoker")
.headOption
.map(_ => json.convertTo[CompletionMessage])
.getOrElse(json.convertTo[ResultMessage])
}
}
}

case class PingMessage(instance: InvokerInstanceId) extends Message {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,12 +279,12 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con

// Install a timeout handler for the catastrophic case where an active ack is not received at all
// (because say an invoker is down completely, or the connection to the message bus is disrupted) or when
// the active ack is significantly delayed (possibly dues to long queues but the subject should not be penalized);
// the completion ack is significantly delayed (possibly dues to long queues but the subject should not be penalized);
// in this case, if the activation handler is still registered, remove it and update the books.
activations.getOrElseUpdate(
msg.activationId, {
val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) {
processCompletion(Left(msg.activationId), msg.transid, forced = true, invoker = instance)
processCompletion(msg.activationId, msg.transid, forced = true, isSystemError = false, invoker = instance)
}

// please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
Expand Down Expand Up @@ -344,36 +344,61 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con
activeAckConsumer,
maxActiveAcksPerPoll,
activeAckPollDuration,
processActiveAck)
processAcknowledgement)
})

/** 4. Get the active-ack message and parse it */
private def processActiveAck(bytes: Array[Byte]): Future[Unit] = Future {
/** 4. Get the acknowledgement message and parse it */
private def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = Future {
val raw = new String(bytes, StandardCharsets.UTF_8)
CompletionMessage.parse(raw) match {
AcknowledegmentMessage.parse(raw) match {
case Success(m: CompletionMessage) =>
processCompletion(m.response, m.transid, forced = false, invoker = m.invoker)
processCompletion(
m.activationId,
m.transid,
forced = false,
isSystemError = m.isSystemError,
invoker = m.invoker)
activationFeed ! MessageFeed.Processed

case Success(m: ResultMessage) =>
processResult(m.response, m.transid)
activationFeed ! MessageFeed.Processed

case Failure(t) =>
activationFeed ! MessageFeed.Processed
logging.error(this, s"failed processing message: $raw with $t")
logging.error(this, s"failed processing message: $raw")

case _ =>
activationFeed ! MessageFeed.Processed
logging.error(this, s"Unexpected Acknowledgment message received by loadbalancer: $raw")
}
}

/** 5. Process the result ack and return it to the user */
private def processResult(response: Either[ActivationId, WhiskActivation], tid: TransactionId): Unit = {
val aid = response.fold(l => l, r => r.activationId)

// Resolve the promise to send the result back to the user
// The activation will be removed from `activations`-map later, when we receive the completion message, because the
// slot of the invoker is not yet free for new activations.
activations.get(aid).map { entry =>
entry.promise.trySuccess(response)
}
logging.info(this, s"received result ack for '$aid'")(tid)
}

/** 5. Process the active-ack and update the state accordingly */
private def processCompletion(response: Either[ActivationId, WhiskActivation],
/** Process the completion ack and update the state */
private def processCompletion(aid: ActivationId,
tid: TransactionId,
forced: Boolean,
isSystemError: Boolean,
invoker: InvokerInstanceId): Unit = {
val aid = response.fold(l => l, r => r.activationId)

val invocationResult = if (forced) {
InvocationFinishedResult.Timeout
} else {
// If the response contains a system error, report that, otherwise report Success
// Left generally is considered a Success, since that could be a message not fitting into Kafka
val isSystemError = response.fold(_ => false, _.response.isWhiskError)
if (isSystemError) {
InvocationFinishedResult.SystemError
} else {
Expand All @@ -390,30 +415,34 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con

if (!forced) {
entry.timeoutHandler.cancel()
entry.promise.trySuccess(response)
// If the action was blocking and the Resultmessage has been received before nothing will happen here.
// If the action was blocking and the ResultMessage is still missing, we pass the ActivationId. With this Id,
// the controller will get the result out of the database.
// If the action was non-blocking, we will close the promise here.
entry.promise.trySuccess(Left(aid))
} else {
entry.promise.tryFailure(new Throwable("no active ack received"))
entry.promise.tryFailure(new Throwable("no completion ack received"))
}

logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid)
logging.info(this, s"${if (!forced) "received" else "forced"} completion ack for '$aid'")(tid)
// Active acks that are received here are strictly from user actions - health actions are not part of
// the load balancer's activation map. Inform the invoker pool supervisor of the user action completion.
invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
case None if tid == TransactionId.invokerHealth =>
// Health actions do not have an ActivationEntry as they are written on the message bus directly. Their result
// is important to pass to the invokerPool because they are used to determine if the invoker can be considered
// healthy again.
logging.info(this, s"received active ack for health action on $invoker")(tid)
logging.info(this, s"received completion ack for health action on $invoker")(tid)
invokerPool ! InvocationFinishedMessage(invoker, invocationResult)
case None if !forced =>
// Received an active-ack that has already been taken out of the state because of a timeout (forced active-ack).
// The result is ignored because a timeout has already been reported to the invokerPool per the force.
logging.debug(this, s"received active ack for '$aid' which has no entry")(tid)
logging.debug(this, s"received completion ack for '$aid' which has no entry")(tid)
case None =>
// The entry has already been removed by an active ack. This part of the code is reached by the timeout and can
// happen if active-ack and timeout happen roughly at the same time (the timeout was triggered before the active
// ack canceled the timer). As the active ack is already processed we don't have to do anything here.
logging.debug(this, s"forced active ack for '$aid' which has no entry")(tid)
logging.debug(this, s"forced completion ack for '$aid' which has no entry")(tid)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ case object RescheduleJob // job is sent back to parent and could not be process
*/
class ContainerProxy(
factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any],
sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any],
storeActivation: (TransactionId, WhiskActivation, UserContext) => Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
instance: InvokerInstanceId,
Expand Down Expand Up @@ -168,7 +168,8 @@ class ContainerProxy(
activation,
job.msg.blocking,
job.msg.rootControllerIndex,
job.msg.user.namespace.uuid)
job.msg.user.namespace.uuid,
true)
storeActivation(transid, activation, context)
}
.flatMap { container =>
Expand Down Expand Up @@ -390,8 +391,10 @@ class ContainerProxy(
}

// Sending active ack. Entirely asynchronous and not waited upon.
activation.foreach(
sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid))
if (job.msg.blocking) {
activation.foreach(
sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, false))
}

val context = UserContext(job.msg.user)

Expand All @@ -418,8 +421,14 @@ class ContainerProxy(
}
}

// Storing the record. Entirely asynchronous and not waited upon.
activationWithLogs.map(_.fold(_.activation, identity)).foreach(storeActivation(tid, _, context))
activationWithLogs
.map(_.fold(_.activation, identity))
.foreach { activation =>
// Sending the completionMessage to the controller asynchronously.
sendActiveAck(tid, activation, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, true)
// Storing the record. Entirely asynchronous and not waited upon.
storeActivation(tid, activation, context)
}

// Disambiguate activation errors and transform the Either into a failed/successful Future respectively.
activationWithLogs.flatMap {
Expand All @@ -436,7 +445,7 @@ final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, paus
object ContainerProxy {
def props(
factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container],
ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any],
ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any],
store: (TransactionId, WhiskActivation, UserContext) => Future[Any],
collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs],
instance: InvokerInstanceId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,19 @@ class InvokerReactive(
activationResult: WhiskActivation,
blockingInvoke: Boolean,
controllerInstance: ControllerInstanceId,
userId: UUID) => {
userId: UUID,
isSlotFree: Boolean) => {
implicit val transid: TransactionId = tid

def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = {
val msg = CompletionMessage(transid, res, instance)
val msg = if (isSlotFree) {
val aid = res.fold(identity, _.activationId)
val isWhiskSystemError = res.fold(_ => false, _.response.isWhiskError)
CompletionMessage(transid, aid, isWhiskSystemError, instance)
} else {
ResultMessage(transid, res)
}

producer.send(topic = "completed" + controllerInstance.asString, msg).andThen {
case Success(_) =>
logging.info(
Expand All @@ -130,7 +138,8 @@ class InvokerReactive(
}
}

if (UserEvents.enabled) {
// UserMetrics are sent, when the slot is free again. This ensures, that all metrics are sent.
if (UserEvents.enabled && isSlotFree) {
EventMessage.from(activationResult, s"invoker${instance.instance}", userId) match {
case Success(msg) => UserEvents.send(producer, msg)
case Failure(t) => logging.error(this, s"activation event was not sent: $t")
Expand Down Expand Up @@ -223,7 +232,7 @@ class InvokerReactive(
val context = UserContext(msg.user)
val activation = generateFallbackActivation(msg, response)
activationFeed ! MessageFeed.Processed
ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid)
ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid, true)
store(msg.transid, activation, context)
Future.successful(())
}
Expand All @@ -233,7 +242,7 @@ class InvokerReactive(
activationFeed ! MessageFeed.Processed
val activation =
generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted))
ack(msg.transid, activation, false, msg.rootControllerIndex, msg.user.namespace.uuid)
ack(msg.transid, activation, false, msg.rootControllerIndex, msg.user.namespace.uuid, true)
logging.warn(this, s"namespace ${msg.user.namespace.name} was blocked in invoker.")
Future.successful(())
}
Expand Down
Loading