diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala index ecc0bc60263..f94ae34f7dc 100644 --- a/common/scala/src/main/scala/whisk/core/connector/Message.scala +++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala @@ -77,7 +77,8 @@ object ActivationMessage extends DefaultJsonProtocol { */ case class CompletionMessage(override val transid: TransactionId, response: Either[ActivationId, WhiskActivation], - invoker: InvokerInstanceId) + invoker: InvokerInstanceId, + isLogCollectionFinished: Boolean = true) extends Message { override def serialize: String = { @@ -106,7 +107,7 @@ object CompletionMessage extends DefaultJsonProtocol { } def parse(msg: String): Try[CompletionMessage] = Try(serdes.read(msg.parseJson)) - private val serdes = jsonFormat3(CompletionMessage.apply) + private val serdes = jsonFormat4(CompletionMessage.apply) } case class PingMessage(instance: InvokerInstanceId) extends Message { diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala index 4adeb2a1223..4d6402312b3 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala @@ -284,7 +284,12 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con activations.getOrElseUpdate( msg.activationId, { val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) { - processCompletion(Left(msg.activationId), msg.transid, forced = true, invoker = instance) + processCompletion( + Left(msg.activationId), + msg.transid, + forced = true, + invoker = instance, + isLogCollectionFinished = true) } // please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. 
Success @@ -352,7 +357,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con val raw = new String(bytes, StandardCharsets.UTF_8) CompletionMessage.parse(raw) match { case Success(m: CompletionMessage) => - processCompletion(m.response, m.transid, forced = false, invoker = m.invoker) + processCompletion(m.response, m.transid, forced = false, invoker = m.invoker, m.isLogCollectionFinished) activationFeed ! MessageFeed.Processed case Failure(t) => @@ -365,7 +370,8 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con private def processCompletion(response: Either[ActivationId, WhiskActivation], tid: TransactionId, forced: Boolean, - invoker: InvokerInstanceId): Unit = { + invoker: InvokerInstanceId, + isLogCollectionFinished: Boolean): Unit = { val aid = response.fold(l => l, r => r.activationId) val invocationResult = if (forced) { @@ -381,40 +387,47 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con } } - activations.remove(aid) match { - case Some(entry) => - totalActivations.decrement() - totalActivationMemory.add(entry.memory.toMB * (-1)) - activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement()) - schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release(entry.memory.toMB.toInt)) - - if (!forced) { - entry.timeoutHandler.cancel() - entry.promise.trySuccess(response) - } else { - entry.promise.tryFailure(new Throwable("no active ack received")) - } + if (isLogCollectionFinished) { + activations.remove(aid) match { + case Some(entry) => + totalActivations.decrement() + totalActivationMemory.add(entry.memory.toMB * (-1)) + activationsPerNamespace.get(entry.namespaceId).foreach(_.decrement()) + schedulingState.invokerSlots.lift(invoker.toInt).foreach(_.release(entry.memory.toMB.toInt)) + + if (!forced) { + entry.timeoutHandler.cancel() + entry.promise.trySuccess(response) + } else { + entry.promise.tryFailure(new Throwable("no active ack received")) + 
} - logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid) - // Active acks that are received here are strictly from user actions - health actions are not part of - // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion. - invokerPool ! InvocationFinishedMessage(invoker, invocationResult) - case None if tid == TransactionId.invokerHealth => - // Health actions do not have an ActivationEntry as they are written on the message bus directly. Their result - // is important to pass to the invokerPool because they are used to determine if the invoker can be considered - // healthy again. - logging.info(this, s"received active ack for health action on $invoker")(tid) - invokerPool ! InvocationFinishedMessage(invoker, invocationResult) - case None if !forced => - // Received an active-ack that has already been taken out of the state because of a timeout (forced active-ack). - // The result is ignored because a timeout has already been reported to the invokerPool per the force. - logging.debug(this, s"received active ack for '$aid' which has no entry")(tid) - case None => - // The entry has already been removed by an active ack. This part of the code is reached by the timeout and can - // happen if active-ack and timeout happen roughly at the same time (the timeout was triggered before the active - // ack canceled the timer). As the active ack is already processed we don't have to do anything here. - logging.debug(this, s"forced active ack for '$aid' which has no entry")(tid) + logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid) + // Active acks that are received here are strictly from user actions - health actions are not part of + // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion. + invokerPool ! 
InvocationFinishedMessage(invoker, invocationResult) + case None if tid == TransactionId.invokerHealth => + // Health actions do not have an ActivationEntry as they are written on the message bus directly. Their result + // is important to pass to the invokerPool because they are used to determine if the invoker can be considered + // healthy again. + logging.info(this, s"received active ack for health action on $invoker")(tid) + invokerPool ! InvocationFinishedMessage(invoker, invocationResult) + case None if !forced => + // Received an active-ack that has already been taken out of the state because of a timeout (forced active-ack). + // The result is ignored because a timeout has already been reported to the invokerPool per the force. + logging.debug(this, s"received active ack for '$aid' which has no entry")(tid) + case None => + // The entry has already been removed by an active ack. This part of the code is reached by the timeout and can + // happen if active-ack and timeout happen roughly at the same time (the timeout was triggered before the active + // ack canceled the timer). As the active ack is already processed we don't have to do anything here. + logging.debug(this, s"forced active ack for '$aid' which has no entry")(tid) + } + } else { + // If the other active ack was faster, there won't be an entry in activations-map and nothing will happen. 
+ activations.get(aid).foreach(_.promise.trySuccess(response)) + logging.info(this, s"receiving active ack for $aid but logs are still getting collected")(tid) } + } private val invokerPool = { diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala index b34ce587d4d..098515bbed4 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala @@ -96,7 +96,7 @@ case object RescheduleJob // job is sent back to parent and could not be process */ class ContainerProxy( factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container], - sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any], + sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any], storeActivation: (TransactionId, WhiskActivation, UserContext) => Future[Any], collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs], instance: InvokerInstanceId, @@ -168,7 +168,8 @@ class ContainerProxy( activation, job.msg.blocking, job.msg.rootControllerIndex, - job.msg.user.namespace.uuid) + job.msg.user.namespace.uuid, + true) storeActivation(transid, activation, context) } .flatMap { container => @@ -390,8 +391,12 @@ class ContainerProxy( } // Sending active ack. Entirely asynchronous and not waited upon. - activation.foreach( - sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid)) + // Send the active ack only if the activation was blocking. In this case, mark it as not completed yet, as the logs + // have to be collected, before the space on the invoker can get freed up. 
+ if (job.msg.blocking) { + activation.foreach( + sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, false)) + } val context = UserContext(job.msg.user) @@ -418,6 +423,13 @@ class ContainerProxy( } } + // Sending active ack. Entirely asynchronous and not waited upon. + // If the activation was blocking, the loadbalancer will be notified here, that the space on the invoker is free + // again, as all logs are collected. + activationWithLogs + .map(_.fold(_.activation, identity)) + .foreach(sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, true)) + // Storing the record. Entirely asynchronous and not waited upon. activationWithLogs.map(_.fold(_.activation, identity)).foreach(storeActivation(tid, _, context)) @@ -436,7 +448,7 @@ final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, paus object ContainerProxy { def props( factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container], - ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any], + ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any], store: (TransactionId, WhiskActivation, UserContext) => Future[Any], collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs], instance: InvokerInstanceId, diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala index 652fefcaafe..a662cd308da 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala @@ -117,11 +117,12 @@ class InvokerReactive( activationResult: WhiskActivation, blockingInvoke: Boolean, controllerInstance: ControllerInstanceId, - userId: UUID) => { + userId: UUID, + isLogCollectionFinished: 
Boolean) => { implicit val transid: TransactionId = tid def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = { - val msg = CompletionMessage(transid, res, instance) + val msg = CompletionMessage(transid, res, instance, isLogCollectionFinished) producer.send(topic = "completed" + controllerInstance.asString, msg).andThen { case Success(_) => logging.info( @@ -233,7 +234,7 @@ class InvokerReactive( val context = UserContext(msg.user) val activation = generateFallbackActivation(msg, response) activationFeed ! MessageFeed.Processed - ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid) + ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid, true) store(msg.transid, activation, context) Future.successful(()) } @@ -243,7 +244,7 @@ class InvokerReactive( activationFeed ! MessageFeed.Processed val activation = generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted)) - ack(msg.transid, activation, false, msg.rootControllerIndex, msg.user.namespace.uuid) + ack(msg.transid, activation, false, msg.rootControllerIndex, msg.user.namespace.uuid, true) logging.warn(this, s"namespace ${msg.user.namespace.name} was blocked in invoker.") Future.successful(()) } diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala index 904daebd053..274b6a9f214 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala @@ -146,7 +146,7 @@ class ContainerProxyTests /** Creates an inspectable version of the ack method, which records all calls in a buffer */ def createAcker(a: ExecutableWhiskAction = action) = LoggedFunction { - (_: TransactionId, activation: WhiskActivation, _: Boolean, _: ControllerInstanceId, _: UUID) => + (_: 
TransactionId, activation: WhiskActivation, _: Boolean, _: ControllerInstanceId, _: UUID, _: Boolean) => activation.annotations.get("limits") shouldBe Some(a.limits.toJson) activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson) activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson)