From ef5535321915123223d0a547b341a03842928f99 Mon Sep 17 00:00:00 2001 From: Seonghyun Oh Date: Mon, 5 Oct 2020 12:32:17 +0900 Subject: [PATCH] Use latest code if action's revision is mismatched (#4954) * Use latest code if action's revision is mismatched * Add test casae --- .../database/ArtifactStoreExceptions.scala | 2 + .../openwhisk/core/database/StoreUtils.scala | 5 +- .../core/invoker/InvokerReactive.scala | 108 +++++++++--------- .../behavior/ArtifactStoreCRUDBehaviors.scala | 11 +- .../scala/system/basic/WskSequenceTests.scala | 40 +++++++ 5 files changed, 110 insertions(+), 56 deletions(-) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactStoreExceptions.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactStoreExceptions.scala index 701ff8405be..f7738b9ccb5 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactStoreExceptions.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/ArtifactStoreExceptions.scala @@ -25,6 +25,8 @@ case class DocumentConflictException(message: String) extends ArtifactStoreExcep case class DocumentTypeMismatchException(message: String) extends ArtifactStoreException(message) +case class DocumentRevisionMismatchException(message: String) extends ArtifactStoreException(message) + case class DocumentUnreadable(message: String) extends ArtifactStoreException(message) case class GetException(message: String) extends ArtifactStoreException(message) diff --git a/common/scala/src/main/scala/org/apache/openwhisk/core/database/StoreUtils.scala b/common/scala/src/main/scala/org/apache/openwhisk/core/database/StoreUtils.scala index e57b67358db..6a0c5df5770 100644 --- a/common/scala/src/main/scala/org/apache/openwhisk/core/database/StoreUtils.scala +++ b/common/scala/src/main/scala/org/apache/openwhisk/core/database/StoreUtils.scala @@ -74,7 +74,10 @@ private[database] object StoreUtils { val deserialized = asFormat.asInstanceOf[A] val responseRev = js.fields("_rev").convertTo[String] - assert(doc.rev.rev == null || doc.rev.rev == responseRev, "Returned revision should match original argument") + if (doc.rev.rev != null && doc.rev.rev != responseRev) { + throw DocumentRevisionMismatchException( + s"Returned revision should match original argument ${doc.rev.rev} ${responseRev}") + } // FIXME remove mutability from appropriate classes now that it is no longer required by GSON. deserialized.asInstanceOf[WhiskDocument].revision(DocRevision(responseRev)) } diff --git a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala index d8d71d1c654..75d9bd01ab5 100644 --- a/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala +++ b/core/invoker/src/main/scala/org/apache/openwhisk/core/invoker/InvokerReactive.scala @@ -164,6 +164,61 @@ class InvokerReactive( private val pool = actorSystem.actorOf(ContainerPool.props(childFactory, poolConfig, activationFeed, prewarmingConfigs)) + def handleActivationMessage(msg: ActivationMessage)(implicit transid: TransactionId): Future[Unit] = { + val namespace = msg.action.path + val name = msg.action.name + val actionid = FullyQualifiedEntityName(namespace, name).toDocId.asDocInfo(msg.revision) + val subject = msg.user.subject + + logging.debug(this, s"${actionid.id} $subject ${msg.activationId}") + + // caching is enabled since actions have revision id and an updated + // action will not hit in the cache due to change in the revision id; + // if the doc revision is missing, then bypass cache + if (actionid.rev == DocRevision.empty) logging.warn(this, s"revision was not provided for ${actionid.id}") + + WhiskAction + .get(entityStore, actionid.id, actionid.rev, fromCache = actionid.rev != DocRevision.empty) + .flatMap(action => { + action.toExecutableWhiskAction match { + case Some(executable) => + pool ! Run(executable, msg) + Future.successful(()) + case None => + logging.error(this, s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}") + Future.failed(new IllegalStateException("non-executable action reached the invoker")) + } + }) + .recoverWith { + case DocumentRevisionMismatchException(_) => + // if revision is mismatched, the action may have been updated, + // so try again with the latest code + handleActivationMessage(msg.copy(revision = DocRevision.empty)) + case t => + val response = t match { + case _: NoDocumentException => + ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking) + case _: DocumentTypeMismatchException | _: DocumentUnreadable => + ActivationResponse.whiskError(Messages.actionMismatchWhileInvoking) + case _ => + ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking) + } + activationFeed ! MessageFeed.Processed + + val activation = generateFallbackActivation(msg, response) + ack( + msg.transid, + activation, + msg.blocking, + msg.rootControllerIndex, + msg.user.namespace.uuid, + CombinedCompletionAndResultMessage(transid, activation, instance)) + + store(msg.transid, activation, msg.blocking, UserContext(msg.user)) + Future.successful(()) + } + } + /** Is called when an ActivationMessage is read from Kafka */ def processActivationMessage(bytes: Array[Byte]): Future[Unit] = { Future(ActivationMessage.parse(new String(bytes, StandardCharsets.UTF_8))) @@ -179,58 +234,7 @@ class InvokerReactive( if (!namespaceBlacklist.isBlacklisted(msg.user)) { val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, logLevel = InfoLevel) - val namespace = msg.action.path - val name = msg.action.name - val actionid = FullyQualifiedEntityName(namespace, name).toDocId.asDocInfo(msg.revision) - val subject = msg.user.subject - - logging.debug(this, s"${actionid.id} $subject ${msg.activationId}") - - // caching is enabled since actions have revision id and an updated - // action will not hit in the cache due to change in the revision id; - // if the doc revision is missing, then bypass cache - if (actionid.rev == DocRevision.empty) logging.warn(this, s"revision was not provided for ${actionid.id}") - - WhiskAction - .get(entityStore, actionid.id, actionid.rev, fromCache = actionid.rev != DocRevision.empty) - .flatMap { action => - action.toExecutableWhiskAction match { - case Some(executable) => - pool ! Run(executable, msg) - Future.successful(()) - case None => - logging.error(this, s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}") - Future.failed(new IllegalStateException("non-executable action reached the invoker")) - } - } - .recoverWith { - case t => - // If the action cannot be found, the user has concurrently deleted it, - // making this an application error. All other errors are considered system - // errors and should cause the invoker to be considered unhealthy. - val response = t match { - case _: NoDocumentException => - ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking) - case _: DocumentTypeMismatchException | _: DocumentUnreadable => - ActivationResponse.whiskError(Messages.actionMismatchWhileInvoking) - case _ => - ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking) - } - - activationFeed ! MessageFeed.Processed - - val activation = generateFallbackActivation(msg, response) - ack( - msg.transid, - activation, - msg.blocking, - msg.rootControllerIndex, - msg.user.namespace.uuid, - CombinedCompletionAndResultMessage(transid, activation, instance)) - - store(msg.transid, activation, msg.blocking, UserContext(msg.user)) - Future.successful(()) - } + handleActivationMessage(msg) } else { // Iff the current namespace is blacklisted, an active-ack is only produced to keep the loadbalancer protocol // Due to the protective nature of the blacklist, a database entry is not written. diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreCRUDBehaviors.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreCRUDBehaviors.scala index bbd232bbc97..2a4388a1f9b 100644 --- a/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreCRUDBehaviors.scala +++ b/tests/src/test/scala/org/apache/openwhisk/core/database/test/behavior/ArtifactStoreCRUDBehaviors.scala @@ -20,7 +20,12 @@ package org.apache.openwhisk.core.database.test.behavior import java.time.Instant import org.apache.openwhisk.common.TransactionId -import org.apache.openwhisk.core.database.{DocumentConflictException, DocumentProvider, NoDocumentException} +import org.apache.openwhisk.core.database.{ + DocumentConflictException, + DocumentProvider, + DocumentRevisionMismatchException, + NoDocumentException +} import org.apache.openwhisk.core.entity._ trait ArtifactStoreCRUDBehaviors extends ArtifactStoreBehaviorBase { @@ -171,7 +176,7 @@ trait ArtifactStoreCRUDBehaviors extends ArtifactStoreBehaviorBase { activationFromDb shouldBe activation } - it should "throws NoDocumentException when document revision does not match" in { + it should "throws DocumentRevisionMismatchException when document revision does not match" in { implicit val tid: TransactionId = transid() val auth = newAuth() val doc = put(authStore, auth) @@ -179,7 +184,7 @@ trait ArtifactStoreCRUDBehaviors extends ArtifactStoreBehaviorBase { val auth2 = getWhiskAuth(doc).copy(namespaces = Set(wskNS("foo1"))).revision[WhiskAuth](doc.rev) val doc2 = put(authStore, auth2) - authStore.get[WhiskAuth](doc).failed.futureValue.getCause shouldBe a[AssertionError] + authStore.get[WhiskAuth](doc).failed.futureValue shouldBe a[DocumentRevisionMismatchException] val authFromGet = getWhiskAuth(doc2) authFromGet shouldBe auth2 diff --git a/tests/src/test/scala/system/basic/WskSequenceTests.scala b/tests/src/test/scala/system/basic/WskSequenceTests.scala index 11ddd8fc9d6..1cf265f47b3 100644 --- a/tests/src/test/scala/system/basic/WskSequenceTests.scala +++ b/tests/src/test/scala/system/basic/WskSequenceTests.scala @@ -501,6 +501,46 @@ class WskSequenceTests extends TestHelpers with WskTestHelpers with StreamLoggin checkEchoSeqRuleResult(newRun, seqName, JsObject(newPayload)) } + it should "run a sub-action even if it is updated while the sequence action is running" in withAssetCleaner(wskprops) { + (wp, assetHelper) => + val seqName = "sequence" + val sleep = "sleep" + val echo = "echo" + val slowInvokeDuration = 5.seconds + + // create echo action + val echoFile = TestUtils.getTestActionFilename(s"$echo.js") + assetHelper.withCleaner(wsk.action, echo) { (action, actionName) => + action.create(name = actionName, artifact = Some(echoFile), timeout = Some(allowedActionDuration)) + } + // create sleep action + val sleepFile = TestUtils.getTestActionFilename(s"$sleep.js") + assetHelper.withCleaner(wsk.action, sleep) { (action, actionName) => + action.create( + name = sleep, + artifact = Some(sleepFile), + parameters = Map("sleepTimeInMs" -> slowInvokeDuration.toMillis.toJson), + timeout = Some(allowedActionDuration)) + } + + // create sequence + assetHelper.withCleaner(wsk.action, seqName) { (action, seqName) => + action.create(seqName, Some(s"$sleep,$echo"), kind = Some("sequence")) + } + val run = wsk.action.invoke(seqName) + + // update the sub-action before the sequence action invokes it + wsk.action.create(name = echo, artifact = None, annotations = Map("a" -> JsString("A")), update = true) + wsk.action.invoke(echo) + + wsk.action.create(name = echo, artifact = None, annotations = Map("b" -> JsString("B")), update = true) + wsk.action.invoke(echo) + + withActivation(wsk.activation, run, totalWait = 2 * allowedActionDuration) { activation => + activation.response.status shouldBe "success" + } + } + /** * checks the result of an echo sequence connected to a trigger through a rule * @param triggerFireRun the run result of firing the trigger