Use latest code if action's revision is mismatched #4954

Merged: 2 commits, merged on Oct 5, 2020
@@ -25,6 +25,8 @@ case class DocumentConflictException(message: String) extends ArtifactStoreExcep

case class DocumentTypeMismatchException(message: String) extends ArtifactStoreException(message)

case class DocumentRevisionMismatchException(message: String) extends ArtifactStoreException(message)

case class DocumentUnreadable(message: String) extends ArtifactStoreException(message)

case class GetException(message: String) extends ArtifactStoreException(message)
@@ -74,7 +74,10 @@ private[database] object StoreUtils {
val deserialized = asFormat.asInstanceOf[A]

val responseRev = js.fields("_rev").convertTo[String]
-assert(doc.rev.rev == null || doc.rev.rev == responseRev, "Returned revision should match original argument")
+if (doc.rev.rev != null && doc.rev.rev != responseRev) {
+throw DocumentRevisionMismatchException(
+s"Returned revision should match original argument ${doc.rev.rev} ${responseRev}")
+}
// FIXME remove mutability from appropriate classes now that it is no longer required by GSON.
deserialized.asInstanceOf[WhiskDocument].revision(DocRevision(responseRev))
}
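
The hunk above swaps an `assert` for a dedicated exception type. As a minimal, self-contained sketch of why that helps callers (the names `RevisionMismatch`, `checkRevision`, and `describe` are hypothetical stand-ins, not the OpenWhisk API), a typed failure can be matched explicitly, which an `AssertionError` does not lend itself to:

```scala
import scala.util.{Failure, Success, Try}

object RevisionCheckSketch {
  // Hypothetical stand-in for DocumentRevisionMismatchException.
  final case class RevisionMismatch(message: String) extends Exception(message)

  // Same shape as the new check: a null requested revision means "accept any revision".
  def checkRevision(requested: String, returned: String): Try[String] =
    if (requested != null && requested != returned)
      Failure(RevisionMismatch(s"Returned revision should match original argument $requested $returned"))
    else Success(returned)

  // Callers can distinguish the mismatch from every other failure.
  def describe(result: Try[String]): String = result match {
    case Success(rev)                 => s"ok:$rev"
    case Failure(_: RevisionMismatch) => "mismatch"
    case Failure(_)                   => "other"
  }
}
```

In this sketch, a caller that sees `"mismatch"` can choose to retry without a revision, while any other failure keeps its existing handling.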
@@ -164,6 +164,61 @@ class InvokerReactive(
private val pool =
actorSystem.actorOf(ContainerPool.props(childFactory, poolConfig, activationFeed, prewarmingConfigs))

def handleActivationMessage(msg: ActivationMessage)(implicit transid: TransactionId): Future[Unit] = {
val namespace = msg.action.path
val name = msg.action.name
val actionid = FullyQualifiedEntityName(namespace, name).toDocId.asDocInfo(msg.revision)
val subject = msg.user.subject

logging.debug(this, s"${actionid.id} $subject ${msg.activationId}")

// caching is enabled since actions have revision id and an updated
// action will not hit in the cache due to change in the revision id;
// if the doc revision is missing, then bypass cache
if (actionid.rev == DocRevision.empty) logging.warn(this, s"revision was not provided for ${actionid.id}")

WhiskAction
.get(entityStore, actionid.id, actionid.rev, fromCache = actionid.rev != DocRevision.empty)
.flatMap(action => {
action.toExecutableWhiskAction match {
case Some(executable) =>
pool ! Run(executable, msg)
Future.successful(())
case None =>
logging.error(this, s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}")
Future.failed(new IllegalStateException("non-executable action reached the invoker"))
}
})
.recoverWith {
case DocumentRevisionMismatchException(_) =>
// if revision is mismatched, the action may have been updated,
// so try again with the latest code
handleActivationMessage(msg.copy(revision = DocRevision.empty))

style95 (Member), Aug 28, 2020:

This may be controversial, but I think the system should take care of this.
This change handles the case where the underlying actions are updated while a sequence action is being invoked.
If we treat the sequence as just a coordinator for its underlying actions, it is fine for it to always invoke the latest code. Once a sequence action is defined with some actions, we don't need to update the sequence whenever those actions are updated. In other words, the sequence does not care about the versions of its actions; it only captures their relationship and execution order.
So I think it is reasonable to always invoke the latest code.

Regarding the implementation: this is great, but can we differentiate the sequence case from the others?
I feel there could be some side effects.

For example, a mismatch could also happen if an activation sent to Kafka arrives late at the invoker.
In that case the activation was intended for the old code, but with this change the latest code would be invoked.

I have not looked into the code deeply yet, but could the controller set up subsequent activations with the latest code while invoking a sequence action?

upgle (Member, Author), Aug 28, 2020:

Thank you for the review; I understand your concern.

> For example, if any activation sent to Kafka arrives at the invoker side late, it could happen.
> In such a case the activation is intended for the old codes but the latest code will be invoked with this change.

OpenWhisk currently has no versioning feature, so I don't see a problem with always serving the latest code when the DB cannot return the older revision: action developers are responsible for compatibility between old and new action code.

What do you think?

Member:

I'm not quite sure about this.
It would be great to hear other reviewers' opinions as well.
From one point of view, this could be a semantic change, because activations that are supposed to be rejected would instead be invoked successfully
(assuming the code is backward compatible).

How about sending an email to the dev list to get more people to look at this PR?

style95 (Member), Sep 29, 2020:

Can we at least log a warning stating that the system is falling back to the latest code because of the revision mismatch?
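
A rough, self-contained sketch of such a fallback with a warning is shown below. Everything in it is an assumption made for illustration: `Msg`, `RevisionMismatch`, `fetch`, and the `warn` callback are hypothetical stand-ins rather than the invoker's real types or logging API. It retries at most once, because the retry carries no revision and so cannot mismatch again.

```scala
import scala.concurrent.{ExecutionContext, Future}

object FallbackSketch {
  // Hypothetical stand-ins; not the OpenWhisk types.
  final case class RevisionMismatch(message: String) extends Exception(message)
  final case class Msg(action: String, revision: Option[String])

  // Hypothetical fetch: fails when a specific revision is requested but is no longer current.
  def fetch(msg: Msg, current: String): Future[String] =
    msg.revision match {
      case Some(rev) if rev != current => Future.failed(RevisionMismatch(s"$rev != $current"))
      case _                           => Future.successful(s"${msg.action}@$current")
    }

  // On a mismatch, emit a warning and retry once without a revision (i.e. "latest").
  def handle(msg: Msg, current: String, warn: String => Unit)(
    implicit ec: ExecutionContext): Future[String] =
    fetch(msg, current).recoverWith {
      case RevisionMismatch(_) if msg.revision.isDefined =>
        warn(s"revision mismatch for ${msg.action}; falling back to the latest code")
        handle(msg.copy(revision = None), current, warn)
    }
}
```

Passing the logger in as a plain function keeps the sketch independent of any logging library; in the real invoker the warning would presumably go through its existing `logging` instance.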

Member:

Since you're also working on versioning, I think this change should be considered in that broader context.

If the sequence or composition references actions by a specific version, then invoking an alternate version should be an error. If the sequence uses "latest", then this change is acceptable.

Was this change intended strictly to address compositions?
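
To make the distinction above concrete, here is a small sketch of the two reference styles. It is purely hypothetical: OpenWhisk had no versioning feature at the time of this discussion, and `VersionRef`, `Latest`, `Pinned`, and `resolve` are names invented for this illustration.

```scala
object VersionRefSketch {
  // Hypothetical model of how a sequence might refer to a component action.
  sealed trait VersionRef
  case object Latest extends VersionRef
  final case class Pinned(revision: String) extends VersionRef

  // Returns the revision to run, or an error when a pinned revision is no longer current.
  def resolve(ref: VersionRef, current: String): Either[String, String] = ref match {
    case Latest                        => Right(current)
    case Pinned(rev) if rev == current => Right(rev)
    case Pinned(rev)                   => Left(s"pinned revision $rev is not available (current is $current)")
  }
}
```

Under this model, the fallback in this PR corresponds to treating every reference as `Latest`; a pinned reference would instead surface the mismatch as an error.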

Member:

Yes.
I think that needs to be considered in the action versioning feature.

CC: @jiangpengcheng

case t =>
val response = t match {
case _: NoDocumentException =>
ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking)
case _: DocumentTypeMismatchException | _: DocumentUnreadable =>
ActivationResponse.whiskError(Messages.actionMismatchWhileInvoking)
case _ =>
ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking)
}
activationFeed ! MessageFeed.Processed

val activation = generateFallbackActivation(msg, response)
ack(
msg.transid,
activation,
msg.blocking,
msg.rootControllerIndex,
msg.user.namespace.uuid,
CombinedCompletionAndResultMessage(transid, activation, instance))

store(msg.transid, activation, msg.blocking, UserContext(msg.user))
Future.successful(())
}
}

/** Is called when an ActivationMessage is read from Kafka */
def processActivationMessage(bytes: Array[Byte]): Future[Unit] = {
Future(ActivationMessage.parse(new String(bytes, StandardCharsets.UTF_8)))
@@ -179,58 +234,7 @@ class InvokerReactive(

if (!namespaceBlacklist.isBlacklisted(msg.user)) {
val start = transid.started(this, LoggingMarkers.INVOKER_ACTIVATION, logLevel = InfoLevel)
-val namespace = msg.action.path
-val name = msg.action.name
-val actionid = FullyQualifiedEntityName(namespace, name).toDocId.asDocInfo(msg.revision)
-val subject = msg.user.subject
-
-logging.debug(this, s"${actionid.id} $subject ${msg.activationId}")
-
-// caching is enabled since actions have revision id and an updated
-// action will not hit in the cache due to change in the revision id;
-// if the doc revision is missing, then bypass cache
-if (actionid.rev == DocRevision.empty) logging.warn(this, s"revision was not provided for ${actionid.id}")
-
-WhiskAction
-.get(entityStore, actionid.id, actionid.rev, fromCache = actionid.rev != DocRevision.empty)
-.flatMap { action =>
-action.toExecutableWhiskAction match {
-case Some(executable) =>
-pool ! Run(executable, msg)
-Future.successful(())
-case None =>
-logging.error(this, s"non-executable action reached the invoker ${action.fullyQualifiedName(false)}")
-Future.failed(new IllegalStateException("non-executable action reached the invoker"))
-}
-}
-.recoverWith {
-case t =>
-// If the action cannot be found, the user has concurrently deleted it,

Member:

This comment is still valid - why was it removed?

-// making this an application error. All other errors are considered system
-// errors and should cause the invoker to be considered unhealthy.
-val response = t match {
-case _: NoDocumentException =>
-ActivationResponse.applicationError(Messages.actionRemovedWhileInvoking)
-case _: DocumentTypeMismatchException | _: DocumentUnreadable =>
-ActivationResponse.whiskError(Messages.actionMismatchWhileInvoking)
-case _ =>
-ActivationResponse.whiskError(Messages.actionFetchErrorWhileInvoking)
-}
-
-activationFeed ! MessageFeed.Processed
-
-val activation = generateFallbackActivation(msg, response)
-ack(
-msg.transid,
-activation,
-msg.blocking,
-msg.rootControllerIndex,
-msg.user.namespace.uuid,
-CombinedCompletionAndResultMessage(transid, activation, instance))
-
-store(msg.transid, activation, msg.blocking, UserContext(msg.user))
-Future.successful(())
-}
+handleActivationMessage(msg)
} else {
// Iff the current namespace is blacklisted, an active-ack is only produced to keep the loadbalancer protocol
// Due to the protective nature of the blacklist, a database entry is not written.
@@ -20,7 +20,12 @@ package org.apache.openwhisk.core.database.test.behavior
import java.time.Instant

import org.apache.openwhisk.common.TransactionId
-import org.apache.openwhisk.core.database.{DocumentConflictException, DocumentProvider, NoDocumentException}
+import org.apache.openwhisk.core.database.{
+DocumentConflictException,
+DocumentProvider,
+DocumentRevisionMismatchException,
+NoDocumentException
+}
import org.apache.openwhisk.core.entity._

trait ArtifactStoreCRUDBehaviors extends ArtifactStoreBehaviorBase {
@@ -171,15 +176,15 @@ trait ArtifactStoreCRUDBehaviors extends ArtifactStoreBehaviorBase {
activationFromDb shouldBe activation
}

-it should "throws NoDocumentException when document revision does not match" in {
+it should "throws DocumentRevisionMismatchException when document revision does not match" in {
implicit val tid: TransactionId = transid()
val auth = newAuth()
val doc = put(authStore, auth)

val auth2 = getWhiskAuth(doc).copy(namespaces = Set(wskNS("foo1"))).revision[WhiskAuth](doc.rev)
val doc2 = put(authStore, auth2)

-authStore.get[WhiskAuth](doc).failed.futureValue.getCause shouldBe a[AssertionError]
+authStore.get[WhiskAuth](doc).failed.futureValue shouldBe a[DocumentRevisionMismatchException]

val authFromGet = getWhiskAuth(doc2)
authFromGet shouldBe auth2
40 changes: 40 additions & 0 deletions tests/src/test/scala/system/basic/WskSequenceTests.scala
@@ -501,6 +501,46 @@ class WskSequenceTests extends TestHelpers with WskTestHelpers with StreamLoggin
checkEchoSeqRuleResult(newRun, seqName, JsObject(newPayload))
}

it should "run a sub-action even if it is updated while the sequence action is running" in withAssetCleaner(wskprops) {

Member (Author):

I've added a test case; it currently fails on the master branch.

(wp, assetHelper) =>
val seqName = "sequence"
val sleep = "sleep"
val echo = "echo"
val slowInvokeDuration = 5.seconds

// create echo action
val echoFile = TestUtils.getTestActionFilename(s"$echo.js")
assetHelper.withCleaner(wsk.action, echo) { (action, actionName) =>
action.create(name = actionName, artifact = Some(echoFile), timeout = Some(allowedActionDuration))
}
// create sleep action
val sleepFile = TestUtils.getTestActionFilename(s"$sleep.js")
assetHelper.withCleaner(wsk.action, sleep) { (action, actionName) =>
action.create(
name = sleep,
artifact = Some(sleepFile),
parameters = Map("sleepTimeInMs" -> slowInvokeDuration.toMillis.toJson),
timeout = Some(allowedActionDuration))
}

// create sequence
assetHelper.withCleaner(wsk.action, seqName) { (action, seqName) =>
action.create(seqName, Some(s"$sleep,$echo"), kind = Some("sequence"))
}
val run = wsk.action.invoke(seqName)

// update the sub-action before the sequence action invokes it
wsk.action.create(name = echo, artifact = None, annotations = Map("a" -> JsString("A")), update = true)
wsk.action.invoke(echo)

wsk.action.create(name = echo, artifact = None, annotations = Map("b" -> JsString("B")), update = true)
wsk.action.invoke(echo)

withActivation(wsk.activation, run, totalWait = 2 * allowedActionDuration) { activation =>
activation.response.status shouldBe "success"
}
}

/**
* checks the result of an echo sequence connected to a trigger through a rule
* @param triggerFireRun the run result of firing the trigger