diff --git a/common/scala/src/main/scala/whisk/core/connector/Message.scala b/common/scala/src/main/scala/whisk/core/connector/Message.scala index ed677b112da..ebfb59b192f 100644 --- a/common/scala/src/main/scala/whisk/core/connector/Message.scala +++ b/common/scala/src/main/scala/whisk/core/connector/Message.scala @@ -72,24 +72,51 @@ object ActivationMessage extends DefaultJsonProtocol { } /** - * When adding fields, the serdes of the companion object must be updated also. - * The whisk activation field will have its logs stripped. + * Message that is sent from the invoker to the controller after action is completed or after slot is free again for + * new actions. + */ +abstract class AcknowledegmentMessage() extends Message { + override val transid: TransactionId + override def serialize: String = { + AcknowledegmentMessage.serdes.write(this).compactPrint + } +} + +/** + * This message is sent from the invoker to the controller, after the slot of an invoker that has been used by the + * current action, is free again (after log collection) */ case class CompletionMessage(override val transid: TransactionId, - response: Either[ActivationId, WhiskActivation], + activationId: ActivationId, + isSystemError: Boolean, invoker: InvokerInstanceId) - extends Message { + extends AcknowledegmentMessage() { - override def serialize: String = { - CompletionMessage.serdes.write(this).compactPrint + override def toString = { + activationId.asString } +} + +object CompletionMessage extends DefaultJsonProtocol { + def parse(msg: String): Try[CompletionMessage] = Try(serdes.read(msg.parseJson)) + implicit val serdes = jsonFormat4(CompletionMessage.apply) +} + +/** + * That message will be sent from the invoker to the controller after action completion if the user wants to have + * the result immediately (blocking activation). + * When adding fields, the serdes of the companion object must be updated also. + * The whisk activation field will have its logs stripped. + */ +case class ResultMessage(override val transid: TransactionId, response: Either[ActivationId, WhiskActivation]) + extends AcknowledegmentMessage() { override def toString = { response.fold(l => l, r => r.activationId).asString } } -object CompletionMessage extends DefaultJsonProtocol { +object ResultMessage extends DefaultJsonProtocol { implicit def eitherResponse = new JsonFormat[Either[ActivationId, WhiskActivation]] { def write(either: Either[ActivationId, WhiskActivation]) = either match { @@ -101,12 +128,38 @@ object CompletionMessage extends DefaultJsonProtocol { // per the ActivationId's serializer, it is guaranteed to be a String even if it only consists of digits case _: JsString => Left(value.convertTo[ActivationId]) case _: JsObject => Right(value.convertTo[WhiskActivation]) - case _ => deserializationError("could not read CompletionMessage") + case _ => deserializationError("could not read ResultMessage") } } - def parse(msg: String): Try[CompletionMessage] = Try(serdes.read(msg.parseJson)) - private val serdes = jsonFormat3(CompletionMessage.apply) + def parse(msg: String): Try[ResultMessage] = Try(serdes.read(msg.parseJson)) + implicit val serdes = jsonFormat2(ResultMessage.apply) +} + +object AcknowledegmentMessage extends DefaultJsonProtocol { + def parse(msg: String): Try[AcknowledegmentMessage] = { + Try(serdes.read(msg.parseJson)) + } + + implicit val serdes = new RootJsonFormat[AcknowledegmentMessage] { + override def write(obj: AcknowledegmentMessage): JsValue = { + obj match { + case c: CompletionMessage => c.toJson + case r: ResultMessage => r.toJson + } + } + + override def read(json: JsValue): AcknowledegmentMessage = { + json.asJsObject + // The field invoker is only part of the CompletionMessage. If this field is part of the JSON, we try to convert + // it to a CompletionMessage. Otherwise to a ResultMessage. + // If both conversions fail, an error will be thrown that needs to be handled. + .getFields("invoker") + .headOption + .map(_ => json.convertTo[CompletionMessage]) + .getOrElse(json.convertTo[ResultMessage]) + } + } } case class PingMessage(instance: InvokerInstanceId) extends Message { diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala index 66f66a2fe1a..1769904c723 100644 --- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala +++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala @@ -279,12 +279,12 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con // Install a timeout handler for the catastrophic case where an active ack is not received at all // (because say an invoker is down completely, or the connection to the message bus is disrupted) or when - // the active ack is significantly delayed (possibly dues to long queues but the subject should not be penalized); + // the completion ack is significantly delayed (possibly dues to long queues but the subject should not be penalized); // in this case, if the activation handler is still registered, remove it and update the books. activations.getOrElseUpdate( msg.activationId, { val timeoutHandler = actorSystem.scheduler.scheduleOnce(timeout) { - processCompletion(Left(msg.activationId), msg.transid, forced = true, invoker = instance) + processCompletion(msg.activationId, msg.transid, forced = true, isSystemError = false, invoker = instance) } // please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success @@ -344,36 +344,61 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con activeAckConsumer, maxActiveAcksPerPoll, activeAckPollDuration, - processActiveAck) + processAcknowledgement) }) - /** 4. Get the active-ack message and parse it */ - private def processActiveAck(bytes: Array[Byte]): Future[Unit] = Future { + /** 4. Get the acknowledgement message and parse it */ + private def processAcknowledgement(bytes: Array[Byte]): Future[Unit] = Future { val raw = new String(bytes, StandardCharsets.UTF_8) - CompletionMessage.parse(raw) match { + AcknowledegmentMessage.parse(raw) match { case Success(m: CompletionMessage) => - processCompletion(m.response, m.transid, forced = false, invoker = m.invoker) + processCompletion( + m.activationId, + m.transid, + forced = false, + isSystemError = m.isSystemError, + invoker = m.invoker) + activationFeed ! MessageFeed.Processed + + case Success(m: ResultMessage) => + processResult(m.response, m.transid) activationFeed ! MessageFeed.Processed case Failure(t) => activationFeed ! MessageFeed.Processed - logging.error(this, s"failed processing message: $raw with $t") + logging.error(this, s"failed processing message: $raw") + + case _ => + activationFeed ! MessageFeed.Processed + logging.error(this, s"Unexpected Acknowledgment message received by loadbalancer: $raw") + } + } + + /** 5. Process the result ack and return it to the user */ + private def processResult(response: Either[ActivationId, WhiskActivation], tid: TransactionId): Unit = { + val aid = response.fold(l => l, r => r.activationId) + + // Resolve the promise to send the result back to the user + // The activation will be removed from `activations`-map later, when we receive the completion message, because the + // slot of the invoker is not yet free for new activations. + activations.get(aid).map { entry => + entry.promise.trySuccess(response) } + logging.info(this, s"received result ack for '$aid'")(tid) } - /** 5. Process the active-ack and update the state accordingly */ - private def processCompletion(response: Either[ActivationId, WhiskActivation], + /** Process the completion ack and update the state */ + private def processCompletion(aid: ActivationId, tid: TransactionId, forced: Boolean, + isSystemError: Boolean, invoker: InvokerInstanceId): Unit = { - val aid = response.fold(l => l, r => r.activationId) val invocationResult = if (forced) { InvocationFinishedResult.Timeout } else { // If the response contains a system error, report that, otherwise report Success // Left generally is considered a Success, since that could be a message not fitting into Kafka - val isSystemError = response.fold(_ => false, _.response.isWhiskError) if (isSystemError) { InvocationFinishedResult.SystemError } else { @@ -390,12 +415,16 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con if (!forced) { entry.timeoutHandler.cancel() - entry.promise.trySuccess(response) + // If the action was blocking and the Resultmessage has been received before nothing will happen here. + // If the action was blocking and the ResultMessage is still missing, we pass the ActivationId. With this Id, + // the controller will get the result out of the database. + // If the action was non-blocking, we will close the promise here. + entry.promise.trySuccess(Left(aid)) } else { - entry.promise.tryFailure(new Throwable("no active ack received")) + entry.promise.tryFailure(new Throwable("no completion ack received")) } - logging.info(this, s"${if (!forced) "received" else "forced"} active ack for '$aid'")(tid) + logging.info(this, s"${if (!forced) "received" else "forced"} completion ack for '$aid'")(tid) // Active acks that are received here are strictly from user actions - health actions are not part of // the load balancer's activation map. Inform the invoker pool supervisor of the user action completion. invokerPool ! InvocationFinishedMessage(invoker, invocationResult) @@ -403,17 +432,17 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Con // Health actions do not have an ActivationEntry as they are written on the message bus directly. Their result // is important to pass to the invokerPool because they are used to determine if the invoker can be considered // healthy again. - logging.info(this, s"received active ack for health action on $invoker")(tid) + logging.info(this, s"received completion ack for health action on $invoker")(tid) invokerPool ! InvocationFinishedMessage(invoker, invocationResult) case None if !forced => // Received an active-ack that has already been taken out of the state because of a timeout (forced active-ack). // The result is ignored because a timeout has already been reported to the invokerPool per the force. - logging.debug(this, s"received active ack for '$aid' which has no entry")(tid) + logging.debug(this, s"received completion ack for '$aid' which has no entry")(tid) case None => // The entry has already been removed by an active ack. This part of the code is reached by the timeout and can // happen if active-ack and timeout happen roughly at the same time (the timeout was triggered before the active // ack canceled the timer). As the active ack is already processed we don't have to do anything here. - logging.debug(this, s"forced active ack for '$aid' which has no entry")(tid) + logging.debug(this, s"forced completion ack for '$aid' which has no entry")(tid) } } diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala index b34ce587d4d..57e5c4fc5ea 100644 --- a/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala +++ b/core/invoker/src/main/scala/whisk/core/containerpool/ContainerProxy.scala @@ -96,7 +96,7 @@ case object RescheduleJob // job is sent back to parent and could not be process */ class ContainerProxy( factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container], - sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any], + sendActiveAck: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any], storeActivation: (TransactionId, WhiskActivation, UserContext) => Future[Any], collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs], instance: InvokerInstanceId, @@ -168,7 +168,8 @@ class ContainerProxy( activation, job.msg.blocking, job.msg.rootControllerIndex, - job.msg.user.namespace.uuid) + job.msg.user.namespace.uuid, + true) storeActivation(transid, activation, context) } .flatMap { container => @@ -390,8 +391,10 @@ class ContainerProxy( } // Sending active ack. Entirely asynchronous and not waited upon. - activation.foreach( - sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid)) + if (job.msg.blocking) { + activation.foreach( + sendActiveAck(tid, _, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, false)) + } val context = UserContext(job.msg.user) @@ -418,8 +421,14 @@ class ContainerProxy( } } - // Storing the record. Entirely asynchronous and not waited upon. - activationWithLogs.map(_.fold(_.activation, identity)).foreach(storeActivation(tid, _, context)) + activationWithLogs + .map(_.fold(_.activation, identity)) + .foreach { activation => + // Sending the completionMessage to the controller asynchronously. + sendActiveAck(tid, activation, job.msg.blocking, job.msg.rootControllerIndex, job.msg.user.namespace.uuid, true) + // Storing the record. Entirely asynchronous and not waited upon. + storeActivation(tid, activation, context) + } // Disambiguate activation errors and transform the Either into a failed/successful Future respectively. activationWithLogs.flatMap { @@ -436,7 +445,7 @@ final case class ContainerProxyTimeoutConfig(idleContainer: FiniteDuration, paus object ContainerProxy { def props( factory: (TransactionId, String, ImageName, Boolean, ByteSize, Int) => Future[Container], - ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID) => Future[Any], + ack: (TransactionId, WhiskActivation, Boolean, ControllerInstanceId, UUID, Boolean) => Future[Any], store: (TransactionId, WhiskActivation, UserContext) => Future[Any], collectLogs: (TransactionId, Identity, WhiskActivation, Container, ExecutableWhiskAction) => Future[ActivationLogs], instance: InvokerInstanceId, diff --git a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala index 28b028984d2..324a8555c0a 100644 --- a/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/whisk/core/invoker/InvokerReactive.scala @@ -117,11 +117,19 @@ class InvokerReactive( activationResult: WhiskActivation, blockingInvoke: Boolean, controllerInstance: ControllerInstanceId, - userId: UUID) => { + userId: UUID, + isSlotFree: Boolean) => { implicit val transid: TransactionId = tid def send(res: Either[ActivationId, WhiskActivation], recovery: Boolean = false) = { - val msg = CompletionMessage(transid, res, instance) + val msg = if (isSlotFree) { + val aid = res.fold(identity, _.activationId) + val isWhiskSystemError = res.fold(_ => false, _.response.isWhiskError) + CompletionMessage(transid, aid, isWhiskSystemError, instance) + } else { + ResultMessage(transid, res) + } + producer.send(topic = "completed" + controllerInstance.asString, msg).andThen { case Success(_) => logging.info( @@ -223,7 +231,7 @@ class InvokerReactive( val context = UserContext(msg.user) val activation = generateFallbackActivation(msg, response) activationFeed ! MessageFeed.Processed - ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid) + ack(msg.transid, activation, msg.blocking, msg.rootControllerIndex, msg.user.namespace.uuid, true) store(msg.transid, activation, context) Future.successful(()) } @@ -233,7 +241,7 @@ class InvokerReactive( activationFeed ! MessageFeed.Processed val activation = generateFallbackActivation(msg, ActivationResponse.applicationError(Messages.namespacesBlacklisted)) - ack(msg.transid, activation, false, msg.rootControllerIndex, msg.user.namespace.uuid) + ack(msg.transid, activation, false, msg.rootControllerIndex, msg.user.namespace.uuid, true) logging.warn(this, s"namespace ${msg.user.namespace.name} was blocked in invoker.") Future.successful(()) } diff --git a/tests/src/test/scala/whisk/core/connector/tests/AcknowledgementMessageTests.scala b/tests/src/test/scala/whisk/core/connector/tests/AcknowledgementMessageTests.scala new file mode 100644 index 00000000000..c82fc664388 --- /dev/null +++ b/tests/src/test/scala/whisk/core/connector/tests/AcknowledgementMessageTests.scala @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package whisk.core.connector.tests + +import java.time.Instant + +import org.junit.runner.RunWith +import org.scalatest.{FlatSpec, Matchers} +import org.scalatest.junit.JUnitRunner +import spray.json._ +import whisk.common.TransactionId +import whisk.core.connector.{AcknowledegmentMessage, CompletionMessage, ResultMessage} +import whisk.core.entity._ +import whisk.core.entity.size.SizeInt + +import scala.concurrent.duration.DurationInt +import scala.util.Success + +/** + * Unit tests for the AcknowledgementMessageTests object. + */ +@RunWith(classOf[JUnitRunner]) +class AcknowledgementMessageTests extends FlatSpec with Matchers { + + behavior of "result message" + + val defaultUserMemory: ByteSize = 1024.MB + val activation = WhiskActivation( + namespace = EntityPath("ns"), + name = EntityName("a"), + Subject(), + activationId = ActivationId.generate(), + start = Instant.now(), + end = Instant.now(), + response = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1)))), + annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), MemoryLimit(128.MB), LogLimit(1.MB)).toJson), + duration = Some(123)) + + it should "serialize a left result message" in { + val m = ResultMessage(TransactionId.testing, Left(ActivationId.generate())) + m.serialize shouldBe JsObject("transid" -> m.transid.toJson, "response" -> m.response.left.get.toJson).compactPrint + } + + it should "serialize a right result message" in { + val m = + ResultMessage(TransactionId.testing, Right(activation)) + m.serialize shouldBe JsObject("transid" -> m.transid.toJson, "response" -> m.response.right.get.toJson).compactPrint + } + + it should "deserialize a left result message" in { + val m = ResultMessage(TransactionId.testing, Left(ActivationId.generate())) + ResultMessage.parse(m.serialize) shouldBe Success(m) + } + + it should "deserialize a right result message" in { + val m = + ResultMessage(TransactionId.testing, Right(activation)) + ResultMessage.parse(m.serialize) shouldBe Success(m) + } + + behavior of "acknowledgement message" + + it should "serialize a Completion message" in { + val c = CompletionMessage( + TransactionId.testing, + ActivationId.generate(), + false, + InvokerInstanceId(0, userMemory = defaultUserMemory)) + val m: AcknowledegmentMessage = c + m.serialize shouldBe c.toJson.compactPrint + } + + it should "serialize a Result message" in { + val r = ResultMessage(TransactionId.testing, Left(ActivationId.generate())) + val m: AcknowledegmentMessage = r + m.serialize shouldBe r.toJson.compactPrint + } + + it should "deserialize a Completion message" in { + val c = CompletionMessage( + TransactionId.testing, + ActivationId.generate(), + false, + InvokerInstanceId(0, userMemory = defaultUserMemory)) + val m: AcknowledegmentMessage = c + AcknowledegmentMessage.parse(m.serialize) shouldBe Success(c) + } + + it should "deserialize a Result message" in { + val r = ResultMessage(TransactionId.testing, Left(ActivationId.generate())) + val m: AcknowledegmentMessage = r + AcknowledegmentMessage.parse(m.serialize) shouldBe Success(r) + } +} diff --git a/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala b/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala deleted file mode 100644 index ed1ac37199a..00000000000 --- a/tests/src/test/scala/whisk/core/connector/tests/CompletionMessageTests.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package whisk.core.connector.tests - -import java.time.Instant - -import scala.util.Success -import scala.concurrent.duration.DurationInt - -import org.junit.runner.RunWith -import org.scalatest.FlatSpec -import org.scalatest.Matchers -import org.scalatest.junit.JUnitRunner - -import spray.json._ -import whisk.common.TransactionId -import whisk.core.connector.CompletionMessage -import whisk.core.entity._ -import whisk.core.entity.size.SizeInt - -/** - * Unit tests for the CompletionMessage object. - */ -@RunWith(classOf[JUnitRunner]) -class CompletionMessageTests extends FlatSpec with Matchers { - - behavior of "completion message" - - val defaultUserMemory: ByteSize = 1024.MB - val activation = WhiskActivation( - namespace = EntityPath("ns"), - name = EntityName("a"), - Subject(), - activationId = ActivationId.generate(), - start = Instant.now(), - end = Instant.now(), - response = ActivationResponse.success(Some(JsObject("res" -> JsNumber(1)))), - annotations = Parameters("limits", ActionLimits(TimeLimit(1.second), MemoryLimit(128.MB), LogLimit(1.MB)).toJson), - duration = Some(123)) - - it should "serialize a left completion message" in { - val m = CompletionMessage( - TransactionId.testing, - Left(ActivationId.generate()), - InvokerInstanceId(0, userMemory = defaultUserMemory)) - m.serialize shouldBe JsObject( - "transid" -> m.transid.toJson, - "response" -> m.response.left.get.toJson, - "invoker" -> m.invoker.toJson).compactPrint - } - - it should "serialize a right completion message" in { - val m = - CompletionMessage(TransactionId.testing, Right(activation), InvokerInstanceId(0, userMemory = defaultUserMemory)) - m.serialize shouldBe JsObject( - "transid" -> m.transid.toJson, - "response" -> m.response.right.get.toJson, - "invoker" -> m.invoker.toJson).compactPrint - } - - it should "deserialize a left completion message" in { - val m = CompletionMessage( - TransactionId.testing, - Left(ActivationId.generate()), - InvokerInstanceId(0, userMemory = defaultUserMemory)) - CompletionMessage.parse(m.serialize) shouldBe Success(m) - } - - it should "deserialize a right completion message" in { - val m = - CompletionMessage(TransactionId.testing, Right(activation), InvokerInstanceId(0, userMemory = defaultUserMemory)) - CompletionMessage.parse(m.serialize) shouldBe Success(m) - } -} diff --git a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala index 920d5358b64..24ef264ca69 100644 --- a/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala +++ b/tests/src/test/scala/whisk/core/containerpool/test/ContainerProxyTests.scala @@ -147,7 +147,7 @@ class ContainerProxyTests /** Creates an inspectable version of the ack method, which records all calls in a buffer */ def createAcker(a: ExecutableWhiskAction = action) = LoggedFunction { - (_: TransactionId, activation: WhiskActivation, _: Boolean, _: ControllerInstanceId, _: UUID) => + (_: TransactionId, activation: WhiskActivation, _: Boolean, _: ControllerInstanceId, _: UUID, _: Boolean) => activation.annotations.get("limits") shouldBe Some(a.limits.toJson) activation.annotations.get("path") shouldBe Some(a.fullyQualifiedName(false).toString.toJson) activation.annotations.get("kind") shouldBe Some(a.exec.kind.toJson)