Skip to content

Commit

Permalink
Use latest code if action's revision is mismatched
Browse files Browse the repository at this point in the history
  • Loading branch information
upgle committed Aug 28, 2020
1 parent e255126 commit f2e25f1
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 56 deletions.
Expand Up @@ -25,6 +25,8 @@ case class DocumentConflictException(message: String) extends ArtifactStoreExcep

case class DocumentTypeMismatchException(message: String) extends ArtifactStoreException(message)

case class DocumentRevisionMismatchException(message: String) extends ArtifactStoreException(message)

case class DocumentUnreadable(message: String) extends ArtifactStoreException(message)

case class GetException(message: String) extends ArtifactStoreException(message)
Expand Down
Expand Up @@ -74,7 +74,10 @@ private[database] object StoreUtils {
val deserialized = asFormat.asInstanceOf[A]

val responseRev = js.fields("_rev").convertTo[String]
assert(doc.rev.rev == null || doc.rev.rev == responseRev, "Returned revision should match original argument")
if (doc.rev.rev != null && doc.rev.rev != responseRev) {
throw DocumentRevisionMismatchException(
s"Returned revision should match original argument ${doc.rev.rev} ${responseRev}")
}
// FIXME remove mutability from appropriate classes now that it is no longer required by GSON.
deserialized.asInstanceOf[WhiskDocument].revision(DocRevision(responseRev))
}
Expand Down
Expand Up @@ -164,6 +164,61 @@ class InvokerReactive(
private val pool =
actorSystem.actorOf(ContainerPool.props(childFactory, poolConfig, activationFeed, prewarmingConfigs))

def handleActivationMessage(msg: ActivationMessage)(implicit transid: TransactionId): Future[Unit] = {
val namespace = msg.action.path
val name = msg.action.name
val actionid = FullyQualifiedEntityName(namespace, name).toDocId.asDocInfo(msg.revision)
val subject = msg.user.subject

logging.debug(this, s"${actionid.id} $subject ${msg.activationId}")

// caching is enabled since actions have revision id and an updated
// action will not hit in the cache due to change in the revision id;
// if the doc revision is missing, then bypass cache
if (actionid.rev == DocRevision.empty) logging.warn(this, s"revision was not provided for ${actionid.id}")

WhiskAction
.get(entityStore, actionid.id, actionid.rev, fromCache = actionid.rev != DocRevision.empty)
.flatMap(action => {
action.toExecutableWhiskAction match {
case Some(executable) =>
pool ! Run(executable, msg)
Future.successful(())
case None =>
logging.error(this, s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}")
Future.failed(new IllegalStateException("non-executable action reached the invoker"))
}
})
.recoverWith {
case DocumentRevisionMismatchException(_) =>
// if revision is mismatched, the action may have been updated,
// so try again with the latest code
handleActivationMessage(msg.copy(revision = DocRevision.empty))
case t =>
val response = t match {
case _: NoDocumentException =>
ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking)
case _: DocumentTypeMismatchException | _: DocumentUnreadable =>
ActivationResponse.whiskError(Messages.actionMismatchWhileInvoking)
case _ =>
ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking)
}
activationFeed ! MessageFeed.Processed

val activation = generateFallbackActivation(msg, response)
ack(
msg.transid,
activation,
msg.blocking,
msg.rootControllerIndex,
msg.user.namespace.uuid,
CombinedCompletionAndResultMessage(transid, activation, instance))

store(msg.transid, activation, msg.blocking, UserContext(msg.user))
Future.successful(())
}
}

/** Is called when an ActivationMessage is read from Kafka */
def processActivationMessage(bytes: Array[Byte]): Future[Unit] = {
Future(ActivationMessage.parse(new String(bytes, StandardCharsets.UTF_8)))
Expand All @@ -179,58 +234,7 @@ class InvokerReactive(

if (!namespaceBlacklist.isBlacklisted(msg.user)) {
val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, logLevel = InfoLevel)
val namespace = msg.action.path
val name = msg.action.name
val actionid = FullyQualifiedEntityName(namespace, name).toDocId.asDocInfo(msg.revision)
val subject = msg.user.subject

logging.debug(this, s"${actionid.id} $subject ${msg.activationId}")

// caching is enabled since actions have revision id and an updated
// action will not hit in the cache due to change in the revision id;
// if the doc revision is missing, then bypass cache
if (actionid.rev == DocRevision.empty) logging.warn(this, s"revision was not provided for ${actionid.id}")

WhiskAction
.get(entityStore, actionid.id, actionid.rev, fromCache = actionid.rev != DocRevision.empty)
.flatMap { action =>
action.toExecutableWhiskAction match {
case Some(executable) =>
pool ! Run(executable, msg)
Future.successful(())
case None =>
logging.error(this, s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}")
Future.failed(new IllegalStateException("non-executable action reached the invoker"))
}
}
.recoverWith {
case t =>
// If the action cannot be found, the user has concurrently deleted it,
// making this an application error. All other errors are considered system
// errors and should cause the invoker to be considered unhealthy.
val response = t match {
case _: NoDocumentException =>
ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking)
case _: DocumentTypeMismatchException | _: DocumentUnreadable =>
ActivationResponse.whiskError(Messages.actionMismatchWhileInvoking)
case _ =>
ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking)
}

activationFeed ! MessageFeed.Processed

val activation = generateFallbackActivation(msg, response)
ack(
msg.transid,
activation,
msg.blocking,
msg.rootControllerIndex,
msg.user.namespace.uuid,
CombinedCompletionAndResultMessage(transid, activation, instance))

store(msg.transid, activation, msg.blocking, UserContext(msg.user))
Future.successful(())
}
handleActivationMessage(msg)
} else {
// Iff the current namespace is blacklisted, an active-ack is only produced to keep the loadbalancer protocol
// Due to the protective nature of the blacklist, a database entry is not written.
Expand Down
Expand Up @@ -20,7 +20,12 @@ package org.apache.openwhisk.core.database.test.behavior
import java.time.Instant

import org.apache.openwhisk.common.TransactionId
import org.apache.openwhisk.core.database.{DocumentConflictException, DocumentProvider, NoDocumentException}
import org.apache.openwhisk.core.database.{
DocumentConflictException,
DocumentProvider,
DocumentRevisionMismatchException,
NoDocumentException
}
import org.apache.openwhisk.core.entity._

trait ArtifactStoreCRUDBehaviors extends ArtifactStoreBehaviorBase {
Expand Down Expand Up @@ -171,15 +176,15 @@ trait ArtifactStoreCRUDBehaviors extends ArtifactStoreBehaviorBase {
activationFromDb shouldBe activation
}

it should "throws NoDocumentException when document revision does not match" in {
it should "throws DocumentRevisionMismatchException when document revision does not match" in {
implicit val tid: TransactionId = transid()
val auth = newAuth()
val doc = put(authStore, auth)

val auth2 = getWhiskAuth(doc).copy(namespaces = Set(wskNS("foo1"))).revision[WhiskAuth](doc.rev)
val doc2 = put(authStore, auth2)

authStore.get[WhiskAuth](doc).failed.futureValue.getCause shouldBe a[AssertionError]
authStore.get[WhiskAuth](doc).failed.futureValue shouldBe a[DocumentRevisionMismatchException]

val authFromGet = getWhiskAuth(doc2)
authFromGet shouldBe auth2
Expand Down

0 comments on commit f2e25f1

Please sign in to comment.