Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add limit to not store activations for a limitted namespace. #4234

Merged
merged 3 commits into from Jan 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -34,7 +34,26 @@ case class UserContext(user: Identity, request: HttpRequest = HttpRequest())
trait ActivationStore {

/**
* Stores an activation.
* Checks if an activation should be stored in database and stores it.
*
* @param activation activation to store
* @param context user and request context
* @param transid transaction ID for request
* @param notifier cache change notifier
* @return Future containing DocInfo related to stored activation
*/
def storeAfterCheck(activation: WhiskActivation, context: UserContext)(
implicit transid: TransactionId,
notifier: Option[CacheChangeNotification]): Future[DocInfo] = {
if (context.user.limits.storeActivations.getOrElse(true)) {
store(activation, context)
} else {
Future.successful(DocInfo(activation.docid))
}
}

/**
* Stores an activation in the database.
*
* @param activation activation to store
* @param context user and request context
Expand Down
Expand Up @@ -34,10 +34,11 @@ import scala.util.Try
case class UserLimits(invocationsPerMinute: Option[Int] = None,
concurrentInvocations: Option[Int] = None,
firesPerMinute: Option[Int] = None,
allowedKinds: Option[Set[String]] = None)
allowedKinds: Option[Set[String]] = None,
storeActivations: Option[Boolean] = None)

object UserLimits extends DefaultJsonProtocol {
implicit val serdes = jsonFormat4(UserLimits.apply)
implicit val serdes = jsonFormat5(UserLimits.apply)
}

protected[core] case class Namespace(name: EntityName, uuid: UUID)
Expand Down
Expand Up @@ -168,8 +168,9 @@ trait WhiskTriggersApi extends WhiskCollectionAPI {
triggerActivation
}
.map { activation =>
activationStore.store(activation, context)
activationStore.storeAfterCheck(activation, context)
}

respondWithActivationIdHeader(triggerActivationId) {
complete(Accepted, triggerActivationId.toJsObject)
}
Expand Down
Expand Up @@ -569,7 +569,7 @@ protected[actions] trait PrimitiveActions {
}
}

activationStore.store(activation, context)(transid, notifier = None)
activationStore.storeAfterCheck(activation, context)(transid, notifier = None)

activation
}
Expand Down
Expand Up @@ -175,7 +175,7 @@ protected[actions] trait SequenceActions {
case Failure(t) => logging.warn(this, s"activation event was not sent: $t")
}
}
activationStore.store(seqActivation, context)(transid, notifier = None)
activationStore.storeAfterCheck(seqActivation, context)(transid, notifier = None)

// This should never happen; in this case, there is no activation record created or stored:
// should there be?
Expand Down
Expand Up @@ -174,7 +174,7 @@ class InvokerReactive(
/** Stores an activation in the database. */
private val store = (tid: TransactionId, activation: WhiskActivation, context: UserContext) => {
implicit val transid: TransactionId = tid
activationStore.store(activation, context)(tid, notifier = None)
activationStore.storeAfterCheck(activation, context)(tid, notifier = None)
}

/** Creates a ContainerProxy Actor when being called. */
Expand Down
28 changes: 26 additions & 2 deletions tests/src/test/scala/limits/ThrottleTests.scala
Expand Up @@ -317,7 +317,7 @@ class NamespaceSpecificThrottleTests
}

sanitizeNamespaces(
Seq("zeroSubject", "zeroConcSubject", "oneSubject", "oneSequenceSubject"),
Seq("zeroSubject", "zeroConcSubject", "oneSubject", "oneSequenceSubject", "activationDisabled"),
expectedExitCode = DONTCARE_EXIT)

// Create a subject with rate limits == 0
Expand Down Expand Up @@ -346,8 +346,12 @@ class NamespaceSpecificThrottleTests
val oneSequenceProps = getAdditionalTestSubject("oneSequenceSubject")
wskadmin.cli(Seq("limits", "set", oneSequenceProps.namespace, "--invocationsPerMinute", "1", "--firesPerMinute", "1"))

// Create a subject where storing of activations in activationstore is disabled.
val activationDisabled = getAdditionalTestSubject("activationDisabled")
wskadmin.cli(Seq("limits", "set", activationDisabled.namespace, "--storeActivations", "false"))

override def afterAll() = {
sanitizeNamespaces(Seq(zeroProps, zeroConcProps, oneProps, oneSequenceProps).map(_.namespace))
sanitizeNamespaces(Seq(zeroProps, zeroConcProps, oneProps, oneSequenceProps, activationDisabled).map(_.namespace))
}

behavior of "Namespace-specific throttles"
Expand Down Expand Up @@ -463,4 +467,24 @@ class NamespaceSpecificThrottleTests
include(prefix(tooManyConcurrentRequests(0, 0))) and include("allowed: 0")
}
}

it should "not store an activation if disabled for this namespace" in withAssetCleaner(activationDisabled) {
(wp, assetHelper) =>
implicit val props = wp
val actionName = "activationDisabled"

assetHelper.withCleaner(wsk.action, actionName) { (action, _) =>
action.create(actionName, defaultAction)
}

val runResult = wsk.action.invoke(actionName)
val activationId = wsk.activation.extractActivationId(runResult)
withClue(s"did not find an activation id in '$runResult'") {
activationId shouldBe a[Some[_]]
}

val activation = wsk.activation.waitForActivation(activationId.get)

activation shouldBe 'Left
}
}
Expand Up @@ -229,6 +229,21 @@ class WskAdminTests extends TestHelpers with WskActorSystem with Matchers with B
wskadmin.cli(Seq("limits", "delete", subject)).stdout should include("Limits deleted")
}
}

it should "disable saving of activations in ActivationsStore" in {
val subject = Subject().asString
try {
// set limit
wskadmin.cli(Seq("limits", "set", subject, "--storeActivations", "false"))
// check correctly set
val lines = wskadmin.cli(Seq("limits", "get", subject)).stdout.lines.toSeq
lines should have size 1
lines(0) shouldBe "storeActivations = False"
} finally {
wskadmin.cli(Seq("limits", "delete", subject)).stdout should include("Limits deleted")
}
}

it should "adjust whitelist for namespace" in {
val subject = Subject().asString
try {
Expand Down
Expand Up @@ -105,7 +105,7 @@ protected trait ControllerTestCommon

def storeActivation(activation: WhiskActivation, context: UserContext)(implicit transid: TransactionId,
timeout: Duration = 10 seconds): DocInfo = {
val docFuture = activationStore.store(activation, context)
val docFuture = activationStore.storeAfterCheck(activation, context)
val doc = Await.result(docFuture, timeout)
assert(doc != null)
doc
Expand Down
Expand Up @@ -52,14 +52,17 @@ class LimitsCommandTests extends FlatSpec with WhiskAdminCliTestBase {
"--allowedKinds",
"nodejs:6",
"blackbox",
"--storeActivations",
"false",
ns) shouldBe CommandMessages.limitsSuccessfullySet(ns)

val limits = limitsStore.get[LimitEntity](DocInfo(LimitsCommand.limitIdOf(EntityName(ns)))).futureValue
limits.limits shouldBe UserLimits(
invocationsPerMinute = Some(3),
firesPerMinute = Some(7),
concurrentInvocations = Some(11),
allowedKinds = Some(Set("nodejs:6", "blackbox")))
allowedKinds = Some(Set("nodejs:6", "blackbox")),
storeActivations = Some(false))

resultOk("limits", "set", "--invocationsPerMinute", "13", ns) shouldBe CommandMessages.limitsSuccessfullyUpdated(ns)

Expand Down
4 changes: 4 additions & 0 deletions tools/admin/README-NEXT.md
Expand Up @@ -128,6 +128,10 @@ Limits successfully set for "space1"
# set limits on allowedKinds
$ wskadmin-next limits set --allowedKinds nodejs:6 python space1
Limits successfully set for "space1"

# set limits to disable saving of activations in activationstore
$ wskadmin-next limits set space1 --storeActivations false
Limits successfully set for "space1"
```

Note that limits apply to a namespace and will survive even if all users that share a namespace are deleted. You must manually delete them.
Expand Down
4 changes: 4 additions & 0 deletions tools/admin/README.md
Expand Up @@ -82,6 +82,10 @@ Limits successfully set for "space1"
# set limits on allowedKinds
$ wskadmin limits set space1 --allowedKinds nodejs:6 python
Limits successfully set for "space1"

# set limits to disable saving of activations in activationstore
$ wskadmin limits set space1 --storeActivations false
Limits successfully set for "space1"
```

Note that limits apply to a namespace and will survive even if all users that share a namespace are deleted. You must manually delete them.
Expand Down
Expand Up @@ -80,6 +80,13 @@ class LimitsCommand extends Subcommand("limits") with WhiskCommand {
name = "allowedKinds",
noshort = true,
default = None)
val storeActivations =
opt[String](
descr = "enable or disable storing of activations to datastore for this namespace",
argName = "STOREACTIVATIONS",
name = "storeActivations",
noshort = true,
default = None)

lazy val limits: LimitEntity =
new LimitEntity(
Expand All @@ -88,7 +95,8 @@ class LimitsCommand extends Subcommand("limits") with WhiskCommand {
invocationsPerMinute.toOption,
concurrentInvocations.toOption,
firesPerMinute.toOption,
allowedKinds.toOption.map(_.toSet)))
allowedKinds.toOption.map(_.toSet),
storeActivations.toOption.map(_.toBoolean)))
}
addSubcommand(set)

Expand Down Expand Up @@ -147,7 +155,8 @@ class LimitsCommand extends Subcommand("limits") with WhiskCommand {
l.concurrentInvocations.map(ci => s"concurrentInvocations = $ci"),
l.invocationsPerMinute.map(i => s"invocationsPerMinute = $i"),
l.firesPerMinute.map(i => s"firesPerMinute = $i"),
l.allowedKinds.map(k => s"allowedKinds = ${k.mkString(", ")}")).flatten.mkString(Properties.lineSeparator)
l.allowedKinds.map(k => s"allowedKinds = ${k.mkString(", ")}"),
l.storeActivations.map(sa => s"storeActivations = $sa")).flatten.mkString(Properties.lineSeparator)
Right(msg)
}
.recover {
Expand Down
13 changes: 11 additions & 2 deletions tools/admin/wskadmin
Expand Up @@ -86,6 +86,14 @@ def main():
exitCode = 1
sys.exit(exitCode)

def str_to_bool(value):
if value.lower() in ("yes", "true"):
return True
elif value.lower() in ("no", "false"):
return False
else:
raise argparse.ArgumentTypeError("%s is not a valid boolean." % value)

def parseArgs():
parser = argparse.ArgumentParser(description='OpenWhisk admin command line tool')
parser.add_argument('-v', '--verbose', help='verbose output', action='store_true')
Expand Down Expand Up @@ -132,6 +140,7 @@ def parseArgs():
subcmd.add_argument('--firesPerMinute', help='trigger fires per minute allowed', type=int)
subcmd.add_argument('--concurrentInvocations', help='concurrent invocations allowed for this namespace', type=int)
subcmd.add_argument('--allowedKinds', help='list of runtime kinds allowed in this namespace', nargs='+', type=str)
subcmd.add_argument('--storeActivations', help='enable or disable storing of activations to datastore for this namespace', default=None, type=str_to_bool)

subcmd = subparser.add_parser('get', help='get limits for a given namespace (if none exist, system defaults apply)')
subcmd.add_argument('namespace', help='the namespace to get limits for')
Expand Down Expand Up @@ -517,7 +526,7 @@ def setLimitsCmd(args, props):
(dbDoc, res) = getDocumentFromDb(props, quote_plus(docId), args.verbose)
doc = dbDoc or {'_id': docId}

limits = ['invocationsPerMinute', 'firesPerMinute', 'concurrentInvocations', 'allowedKinds']
limits = ['invocationsPerMinute', 'firesPerMinute', 'concurrentInvocations', 'allowedKinds', 'storeActivations']
for limit in limits:
givenLimit = argsDict.get(limit)
toSet = givenLimit if givenLimit != None else doc.get(limit)
Expand All @@ -536,7 +545,7 @@ def getLimitsCmd(args, props):
(dbDoc, res) = getDocumentFromDb(props, quote_plus(docId), args.verbose)

if dbDoc is not None:
limits = ['invocationsPerMinute', 'firesPerMinute', 'concurrentInvocations', 'allowedKinds']
limits = ['invocationsPerMinute', 'firesPerMinute', 'concurrentInvocations', 'allowedKinds', 'storeActivations']
for limit in limits:
givenLimit = dbDoc.get(limit)
if givenLimit != None:
Expand Down