From 1a58e9aa232b5f23533842a955724ebb0895999c Mon Sep 17 00:00:00 2001 From: Carl Pulley <106966370+carlpulley-da@users.noreply.github.com> Date: Mon, 3 Apr 2023 17:21:12 +0100 Subject: [PATCH] Refactor trigger rule simulation lib (#16645) --- triggers/tests/BUILD.bazel | 39 ++- .../lf/engine/trigger/CatGenerators.scala | 227 ++++++++++++++++++ .../trigger/TriggerRuleSimulationLib.scala | 221 ++++++++++------- .../TriggerRuleSimulationLibTest.scala | 202 +--------------- 4 files changed, 404 insertions(+), 285 deletions(-) create mode 100644 triggers/tests/src/test/scala/com/digitalasset/daml/lf/engine/trigger/CatGenerators.scala diff --git a/triggers/tests/BUILD.bazel b/triggers/tests/BUILD.bazel index 19582b0b4896..e04ffeb967fe 100644 --- a/triggers/tests/BUILD.bazel +++ b/triggers/tests/BUILD.bazel @@ -83,13 +83,11 @@ EOF da_scala_library( name = "test-utils", srcs = [ - "src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRuleSimulationLib.scala", "src/test/scala/com/digitalasset/daml/lf/engine/trigger/test/AbstractFuncTests.scala", "src/test/scala/com/digitalasset/daml/lf/engine/trigger/test/AbstractTriggerTest.scala", ], scala_deps = [ "@maven//:com_typesafe_akka_akka_stream", - "@maven//:org_scalacheck_scalacheck", "@maven//:org_scalactic_scalactic", "@maven//:org_scalatest_scalatest_core", "@maven//:org_scalatest_scalatest_matchers_core", @@ -104,7 +102,6 @@ da_scala_library( "//daml-lf/interpreter", "//daml-lf/language", "//daml-lf/transaction", - "//daml-script/converter", "//language-support/scala/bindings", "//language-support/scala/bindings-akka", "//ledger-api/rs-grpc-bridge", @@ -116,13 +113,9 @@ da_scala_library( "//ledger/participant-integration-api", "//ledger/sandbox-on-x", "//ledger/sandbox-on-x:sandbox-on-x-test-lib", - "//libs-scala/contextualized-logging", "//libs-scala/ledger-resources", - "//libs-scala/logging-entries", "//libs-scala/ports", "//libs-scala/resources", - "//libs-scala/scala-utils", - "//observability/tracing", "//test-common", "//triggers/runner:trigger-runner-lib", "@maven//:org_scalatest_scalatest_compatible", @@ -246,6 +239,37 @@ da_scala_library( for suffix in [("-" + lf_version) if lf_version else ""] ] +da_scala_library( + name = "trigger-simulation-lib", + srcs = [ + "src/test/scala/com/digitalasset/daml/lf/engine/trigger/CatGenerators.scala", + "src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRuleSimulationLib.scala", + ], + scala_deps = [ + "@maven//:com_typesafe_akka_akka_stream", + "@maven//:org_scalacheck_scalacheck", + "@maven//:org_scalactic_scalactic", + "@maven//:org_scalatest_scalatest_core", + "@maven//:org_scalaz_scalaz_core", + ], + deps = [ + "//daml-lf/data", + "//daml-lf/interpreter", + "//daml-lf/transaction", + "//daml-script/converter", + "//language-support/scala/bindings", + "//language-support/scala/bindings-akka", + "//ledger/ledger-api-common", + "//ledger/participant-integration-api", + "//libs-scala/contextualized-logging", + "//libs-scala/logging-entries", + "//libs-scala/scala-utils", + "//observability/tracing", + "//triggers/runner:trigger-runner-lib", + "@maven//:org_scalatest_scalatest_compatible", + ], +) + da_scala_test_suite( name = "trigger-simulation-lib-tests", timeout = "long", @@ -264,6 +288,7 @@ da_scala_test_suite( ], deps = [ ":test-utils", + ":trigger-simulation-lib", "//bazel_tools/runfiles:scala_runfiles", "//daml-lf/archive:daml_lf_archive_reader", "//daml-lf/data", diff --git a/triggers/tests/src/test/scala/com/digitalasset/daml/lf/engine/trigger/CatGenerators.scala b/triggers/tests/src/test/scala/com/digitalasset/daml/lf/engine/trigger/CatGenerators.scala new file mode 100644 index 000000000000..18ba86123c8e --- /dev/null +++ b/triggers/tests/src/test/scala/com/digitalasset/daml/lf/engine/trigger/CatGenerators.scala @@ -0,0 +1,227 @@ +// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 + +package com.daml.lf.engine.trigger + +import akka.NotUsed +import akka.stream.scaladsl.Source +import com.daml.ledger.api.v1.command_submission_service.SubmitRequest +import com.daml.ledger.api.v1.commands.Command.{Command => ApiCommand} +import com.daml.ledger.api.v1.completion.Completion +import com.daml.ledger.api.v1.event.{ArchivedEvent, CreatedEvent, Event} +import com.daml.ledger.api.v1.transaction.Transaction +import com.daml.ledger.api.v1.{value => LedgerApi} +import com.daml.lf.crypto +import com.daml.lf.data.{Bytes, Ref} +import com.daml.lf.data.Ref.{Identifier, PackageId, QualifiedName} +import com.daml.lf.speedy.{Command, SValue} +import com.daml.lf.speedy.SValue.{SContractId, SInt64, SParty} +import com.daml.lf.value.Value.ContractId +import com.daml.script.converter.Converter.record +import com.google.rpc.status.Status +import io.grpc.Status.Code +import io.grpc.Status.Code.OK +import org.scalacheck.Gen + +import java.util.UUID +import scala.concurrent.ExecutionContext + +trait CatGenerators { + + protected def packageId: PackageId + + val partyGen: Gen[String] = Gen.const("alice") + + def toContractId(s: String): ContractId = + ContractId.V1.assertBuild(crypto.Hash.hashPrivateKey(s), Bytes.assertFromString("00")) + + def create(template: String, owner: String, i: Long): CreatedEvent = + CreatedEvent( + contractId = toContractId(s"$template:$i").coid, + templateId = Some(LedgerApi.Identifier(packageId, "Cats", template)), + createArguments = Some( + LedgerApi.Record(fields = + Seq( + LedgerApi.RecordField("owner", Some(LedgerApi.Value().withParty(owner))), + template match { + case "TestControl" => + LedgerApi.RecordField("size", Some(LedgerApi.Value().withInt64(i))) + case _ => + LedgerApi.RecordField("isin", Some(LedgerApi.Value().withInt64(i))) + }, + ) + ) + ), + ) + + def archive(template: String, owner: String, i: Long): ArchivedEvent = + ArchivedEvent( + contractId = toContractId(s"$template:$i").coid, + templateId = Some(LedgerApi.Identifier(packageId, "Cats", template)), + witnessParties = Seq(owner), + ) + + def createCat(owner: String, i: Long): CreatedEvent = create("Cat", owner, i) + + def archivedCat(owner: String, i: Long): ArchivedEvent = archive("Cat", owner, i) + + def createFood(owner: String, i: Long): CreatedEvent = create("Food", owner, i) + + def archivedFood(owner: String, i: Long): ArchivedEvent = archive("Food", owner, i) + + def createCommandGen(maxNumOfCats: Long): Gen[Command.Create] = + for { + party <- partyGen + templateId <- Gen.frequency( + 1 -> Identifier(packageId, QualifiedName.assertFromString("Cats:Cat")), + 1 -> Identifier(packageId, QualifiedName.assertFromString("Cats:Food")), + ) + n <- Gen.choose(0, maxNumOfCats) + arguments = record( + templateId, + "owner" -> SParty(Ref.Party.assertFromString(party)), + "isin" -> SInt64(n), + ) + } yield Command.Create(templateId, arguments) + + def exerciseCommandGen(maxNumOfCats: Long): Gen[Command.ExerciseTemplate] = + for { + n <- Gen.choose(0L, maxNumOfCats) + templateId = Identifier(packageId, QualifiedName.assertFromString("Cats:Cat")) + contractId = SContractId(toContractId(s"Cat:$n")) + foodCid = SContractId(toContractId(s"Food:$n")) + choiceId <- Gen.const("Feed") + argument = record(templateId, "foodCid" -> foodCid) + } yield Command.ExerciseTemplate( + templateId, + contractId, + Ref.ChoiceName.assertFromString(choiceId), + argument, + ) + + def commandGen(maxNumOfCats: Long): Gen[Command] = Gen.frequency( + 1 -> createCommandGen(maxNumOfCats), + 9 -> exerciseCommandGen(maxNumOfCats), + ) + + def transactionGen( + owner: String, + numOfCats: Long = 0, + amountOfFood: Long = 0, + catsKilled: Long = 0, + foodEaten: Long = 0, + knownCmdId: String = UUID.randomUUID().toString, + ): Gen[Transaction] = + for { + id <- Gen.numStr + cmdId <- Gen.frequency( + 1 -> Gen.const(""), + 9 -> Gen.frequency( + 1 -> Gen.uuid.map(_.toString), + 9 -> Gen.const(knownCmdId), + ), + ) + cats = (0L to numOfCats) + .map(i => createCat(owner, i)) + .map(event => Event(Event.Event.Created(event))) + deadCats = (0L to catsKilled) + .map(i => archivedCat(owner, i)) + .map(event => Event(Event.Event.Archived(event))) + food = (0L to amountOfFood) + .map(i => createFood(owner, i)) + .map(event => Event(Event.Event.Created(event))) + eatenFood = (0L to foodEaten) + .map(i => archivedFood(owner, i)) + .map(event => Event(Event.Event.Archived(event))) + } yield Transaction( + transactionId = id, + commandId = cmdId, + events = cats ++ food ++ deadCats ++ eatenFood, + ) + + def successfulCompletionGen( + owner: String, + knownCmdId: String = UUID.randomUUID().toString, + ): Gen[Completion] = + for { + id <- Gen.numStr + cmdId <- Gen.frequency( + 1 -> Gen.const(""), + 9 -> Gen.frequency( + 1 -> Gen.uuid.map(_.toString), + 9 -> Gen.const(knownCmdId), + ), + ) + } yield Completion( + commandId = cmdId, + status = Some(Status(OK.value(), "")), + transactionId = id, + actAs = Seq(owner), + ) + + def failingCompletionGen( + owner: String, + knownCmdId: String = UUID.randomUUID().toString, + ): Gen[Completion] = + for { + id <- Gen.numStr + cmdId <- Gen.frequency( + 1 -> Gen.const(""), + 9 -> Gen.frequency( + 1 -> Gen.uuid.map(_.toString), + 9 -> Gen.const(knownCmdId), + ), + ) + code <- Gen.choose(1, Code.values().length) + } yield Completion( + commandId = cmdId, + status = Some(Status(code, "simulated-failure")), + transactionId = id, + actAs = Seq(owner), + ) +} + +trait CatTriggerResourceUsageTestGenerators extends CatGenerators { + + def acsGen(owner: String, size: Long): Seq[CreatedEvent] = + acsGen(owner, size, size) + + def acsGen(owner: String, numOfCats: Long, amountOfFood: Long): Seq[CreatedEvent] = + (0L to numOfCats).map(i => createCat(owner, i)) ++ (0L to amountOfFood).map(i => + createFood(owner, i) + ) + + def monotonicACS(owner: String, sizeGen: Iterator[Long]): Iterator[Seq[CreatedEvent]] = + sizeGen.map(acsGen(owner, _)) + + def triggerIterator(simulator: TriggerRuleSimulationLib, startState: SValue)(implicit + ec: ExecutionContext + ): Source[(Seq[SubmitRequest], TriggerRuleMetrics.RuleMetrics, SValue), NotUsed] = { + Source + .repeat(TriggerMsg.Heartbeat) + .scanAsync[(Seq[SubmitRequest], Option[TriggerRuleMetrics.RuleMetrics], SValue)]( + (Seq.empty, None, startState) + ) { case ((_, _, state), msg) => + for { + (submissions, metrics, nextState) <- simulator.updateStateLambda(state, msg) + } yield (submissions, Some(metrics), nextState) + } + .collect { case (submissions, Some(metrics), state) => (submissions, metrics, state) } + } + + implicit class TriggerRuleSimulationHelpers( + source: Source[(Seq[SubmitRequest], TriggerRuleMetrics.RuleMetrics, SValue), NotUsed] + ) { + def findDuplicateCommandRequests: Source[Set[ApiCommand], NotUsed] = { + source + .scan[(Set[ApiCommand], Set[ApiCommand])]((Set.empty, Set.empty)) { + case ((observations, _), (submissions, _, _)) => + val newObservations = submissions.flatMap(_.getCommands.commands.map(_.command)).toSet + val newDuplicates = newObservations.intersect(observations) + + (observations ++ newObservations, newDuplicates) + } + .map(_._2) + } + } +} diff --git a/triggers/tests/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRuleSimulationLib.scala b/triggers/tests/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRuleSimulationLib.scala index 637a39ba61f3..baf38b1791b4 100644 --- a/triggers/tests/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRuleSimulationLib.scala +++ b/triggers/tests/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRuleSimulationLib.scala @@ -3,7 +3,6 @@ package com.daml.lf.engine.trigger -import akka.actor.ActorSystem import akka.stream.{FlowShape, KillSwitches, Materializer, RestartSettings, SourceShape} import akka.stream.scaladsl.{Flow, GraphDSL, Keep, RestartSource, Sink, Source} import com.daml.ledger.api.refinements.ApiTypes.{ApplicationId, Party} @@ -26,6 +25,10 @@ import com.daml.scalautil.Statement.discard import com.daml.script.converter.Converter.Implicits._ import com.daml.util.Ctx import org.scalacheck.Gen +import org.scalatest.Assertion +import org.scalatest.Assertions.succeed +import scalaz.Tag +import scalaz.syntax.tag._ import java.util.UUID import scala.collection.mutable @@ -42,8 +45,8 @@ import scala.util.Try ) private class TriggerRuleMetrics { - private[this] var metricCountData = mutable.Map.empty[Set[UUID], mutable.Map[String, Long]] - private[this] var metricTimingData = + private[this] val metricCountData = mutable.Map.empty[Set[UUID], mutable.Map[String, Long]] + private[this] val metricTimingData = mutable.Map.empty[Set[UUID], mutable.Map[String, FiniteDuration]] def addLogEntry(msg: String, context: TriggerLogContext): Unit = { @@ -61,11 +64,6 @@ private class TriggerRuleMetrics { addStepIteratorDelayMean(context) } - def clearMetrics(): Unit = { - metricCountData = mutable.Map.empty - metricTimingData = mutable.Map.empty - } - def getMetrics( retries: Int = 5 )(implicit materializer: Materializer): Future[TriggerRuleMetrics.RuleMetrics] = { @@ -705,7 +703,7 @@ object TriggerRuleMetrics { ) } -final class TriggerRuleSimulationLib private ( +final class TriggerRuleSimulationLib private[trigger] ( compiledPackages: PureCompiledPackages, triggerId: Identifier, triggerConfig: TriggerRunnerConfig, @@ -713,25 +711,27 @@ final class TriggerRuleSimulationLib private ( timeProviderType: TimeProviderType, applicationId: ApplicationId, triggerParties: TriggerParties, -) { - - private[this] val ruleMetrics = new TriggerRuleMetrics - private[this] val logObserver: (String, TriggerLogContext) => Unit = { (msg, context) => - ruleMetrics.addLogEntry(msg, context) - } +)(implicit materializer: Materializer) { private[this] implicit val logger: ContextualizedLogger = ContextualizedLogger.get(classOf[Runner]) - private[this] implicit val materializer: Materializer = Materializer( - ActorSystem("TriggerRuleSimulator") - ) private[this] implicit val executionContext: ExecutionContext = materializer.executionContext private[this] implicit val loggingContext: LoggingContextOf[Trigger] = LoggingContextOf.newLoggingContext(LoggingContextOf.label[Trigger])(identity) - private[this] implicit val triggerContext: TriggerLogContext = - TriggerLogContext.newRootSpanWithCallback("trigger", logObserver)(identity) - private val trigger = Trigger.fromIdentifier(compiledPackages, triggerId).toOption.get + private[trigger] val trigger = Trigger + .fromIdentifier(compiledPackages, triggerId)( + TriggerLogContext.newRootSpan( + "trigger", + "id" -> triggerId.toString, + "applicationId" -> applicationId.unwrap, + "definition" -> triggerId.toString, + "readAs" -> Tag.unsubst(triggerParties.readAs), + "actAs" -> Tag.unwrap(triggerParties.actAs), + )(identity) + ) + .toOption + .get // We only perform rule simulation for recent high level triggers require(trigger.defn.level == Trigger.Level.High) @@ -740,51 +740,66 @@ final class TriggerRuleSimulationLib private ( require(triggerConfig.hardLimit.allowTriggerTimeouts) require(triggerConfig.hardLimit.allowInFlightCommandOverflows) - private[this] val runner = Runner( - compiledPackages, - trigger, - triggerConfig, - client, - timeProviderType, - applicationId, - triggerParties, - ) + class SimulationContext { + private[trigger] val ruleMetrics = new TriggerRuleMetrics + private[this] val logObserver: (String, TriggerLogContext) => Unit = { (msg, context) => + ruleMetrics.addLogEntry(msg, context) + } + + private[trigger] implicit val triggerContext: TriggerLogContext = + TriggerLogContext.newRootSpanWithCallback( + "trigger", + logObserver, + "id" -> triggerId.toString, + "applicationId" -> applicationId.unwrap, + "definition" -> triggerId.toString, + "readAs" -> Tag.unsubst(triggerParties.readAs), + "actAs" -> Tag.unwrap(triggerParties.actAs), + )(identity) + + private[trigger] val runner = Runner( + compiledPackages, + trigger, + triggerConfig, + client, + timeProviderType, + applicationId, + triggerParties, + ) + } @SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements")) def initialStateLambda( acs: Seq[CreatedEvent] ): Future[(Seq[SubmitRequest], TriggerRuleMetrics.RuleMetrics, SValue)] = { - ruleMetrics.clearMetrics() - - triggerContext.childSpan("simulation") { implicit triggerContext: TriggerLogContext => - triggerContext.childSpan("initialStateLambda") { implicit triggerContext: TriggerLogContext => - def initStateGraph = GraphDSL.createGraph(Sink.last[SValue]) { - implicit gb => saveLastState => - import GraphDSL.Implicits._ - - val clientTime: Timestamp = - Timestamp.assertFromInstant( - Runner.getTimeProvider(RunnerConfig.DefaultTimeProviderType).getCurrentTime - ) - val killSwitch = KillSwitches.shared("init-state-simulation") - val initialState = gb add runner.runInitialState(clientTime, killSwitch)(acs) - val submissions = gb add Flow[TriggerContext[SubmitRequest]] - - initialState.finalState ~> saveLastState - initialState.elemsOut ~> submissions - - new SourceShape(submissions.out) + val context = new SimulationContext + val initStateGraph = context.triggerContext.childSpan("simulation") { + implicit triggerContext: TriggerLogContext => + triggerContext.childSpan("initialStateLambda") { + implicit triggerContext: TriggerLogContext => + GraphDSL.createGraph(Sink.last[SValue]) { implicit gb => saveLastState => + import GraphDSL.Implicits._ + + val clientTime: Timestamp = + Timestamp.assertFromInstant( + Runner.getTimeProvider(RunnerConfig.DefaultTimeProviderType).getCurrentTime + ) + val killSwitch = KillSwitches.shared("init-state-simulation") + val initialState = gb add context.runner.runInitialState(clientTime, killSwitch)(acs) + val submissions = gb add Flow[TriggerContext[SubmitRequest]] + + initialState.finalState ~> saveLastState + initialState.elemsOut ~> submissions + + new SourceShape(submissions.out) + } } + } + val initStateSimulation = Source.fromGraph(initStateGraph) + val (initState, submissions) = initStateSimulation.toMat(Sink.seq)(Keep.both).run() - def initStateSimulation = Source.fromGraph(initStateGraph) - - val submissions = initStateSimulation.runWith(Sink.seq) - val initState = initStateSimulation.toMat(Sink.ignore)(Keep.left).run() - - submissions.map(_.map(_.value)).zip(initState).flatMap { case (requests, state) => - ruleMetrics.getMetrics().map((requests, _, state)) - } - } + submissions.map(_.map(_.value)).zip(initState).flatMap { case (requests, state) => + context.ruleMetrics.getMetrics().map((requests, _, state)) } } @@ -793,22 +808,24 @@ final class TriggerRuleSimulationLib private ( state: SValue, message: TriggerMsg, ): Future[(Seq[SubmitRequest], TriggerRuleMetrics.RuleMetrics, SValue)] = { - ruleMetrics.clearMetrics() - - triggerContext.childSpan("simulation") { implicit triggerContext: TriggerLogContext => - triggerContext.childSpan("updateStateLambda") { implicit triggerContext: TriggerLogContext => - def updateStateGraph = GraphDSL.createGraph(Sink.last[SValue]) { - implicit gb => saveLastState => - import GraphDSL.Implicits._ - - val lambdaKillSwitch = KillSwitches.shared("update-state-simulation") - val msgIn = gb add TriggerContextualFlow[TriggerMsg] - val encodeMsg = - gb add runner.encodeMsgs.map(ctx => ctx.copy(value = SList(FrontStack(ctx.value)))) - val stateOut = gb add Source.single(state) - val rule = gb add runner.runRuleOnMsgs(lambdaKillSwitch) - val killSwitch = gb add lambdaKillSwitch.flow[TriggerContext[SValue]] - val submissions = gb add Flow[TriggerContext[SubmitRequest]] + val context = new SimulationContext + val updateStateGraph = context.triggerContext.childSpan("simulation") { + implicit triggerContext: TriggerLogContext => + triggerContext.childSpan("updateStateLambda") { + implicit triggerContext: TriggerLogContext => + GraphDSL.createGraph(Sink.last[SValue]) { implicit gb => saveLastState => + import GraphDSL.Implicits._ + + val lambdaKillSwitch = KillSwitches.shared("update-state-simulation") + val msgIn = gb add TriggerContextualFlow[TriggerMsg] + val encodeMsg = + gb add context.runner.encodeMsgs.map(ctx => + ctx.copy(value = SList(FrontStack(ctx.value))) + ) + val stateOut = gb add Source.single(state) + val rule = gb add context.runner.runRuleOnMsgs(lambdaKillSwitch) + val killSwitch = gb add lambdaKillSwitch.flow[TriggerContext[SValue]] + val submissions = gb add Flow[TriggerContext[SubmitRequest]] // format: off stateOut ~> rule.initState @@ -817,20 +834,17 @@ final class TriggerRuleSimulationLib private ( rule.finalStates ~> saveLastState // format: on - new FlowShape(msgIn.in, submissions.out) + new FlowShape(msgIn.in, submissions.out) + } } + } + val updateStateSimulation = Source + .single(Ctx(context.triggerContext, message)) + .viaMat(Flow.fromGraph(updateStateGraph))(Keep.right) + val (nextState, submissions) = updateStateSimulation.toMat(Sink.seq)(Keep.both).run() - def updateStateSimulation = Source - .single(Ctx(triggerContext, message)) - .viaMat(Flow.fromGraph(updateStateGraph))(Keep.right) - - val submissions = updateStateSimulation.runWith(Sink.seq) - val nextState = updateStateSimulation.toMat(Sink.ignore)(Keep.left).run() - - submissions.map(_.map(_.value)).zip(nextState).flatMap { case (requests, state) => - ruleMetrics.getMetrics().map((requests, _, state)) - } - } + submissions.map(_.map(_.value)).zip(nextState).flatMap { case (requests, state) => + context.ruleMetrics.getMetrics().map((requests, _, state)) } } } @@ -856,6 +870,37 @@ object TriggerRuleSimulationLib { } yield (acs, userState, inFlightCmds, msg) } + def forAll[T](gen: Gen[T], sampleSize: Int = 100, parallelism: Int = 1)( + test: T => Future[Assertion] + )(implicit materializer: Materializer): Future[Assertion] = { + Source(0 to sampleSize) + .map(_ => gen.sample) + .collect { case Some(data) => data } + .mapAsync(parallelism) { data => + test(data) + } + .takeWhile(_ == succeed, inclusive = true) + .runWith(Sink.last) + } + + def forAll[T](gen: Iterator[T], parallelism: Int)( + test: T => Future[Assertion] + )(implicit materializer: Materializer): Future[Assertion] = { + Source + .fromIterator(() => gen) + .mapAsync(parallelism = parallelism) { data => + test(data) + } + .takeWhile(_ == succeed, inclusive = true) + .runWith(Sink.last) + } + + def forAll[T](gen: Iterator[T])( + test: T => Future[Assertion] + )(implicit materializer: Materializer): Future[Assertion] = { + forAll[T](gen, 1)(test) + } + def getSimulator( client: LedgerClient, name: QualifiedName, @@ -866,7 +911,7 @@ object TriggerRuleSimulationLib { triggerConfig: TriggerRunnerConfig, actAs: String, readAs: Set[String] = Set.empty, - ): (TriggerDefinition, TriggerRuleSimulationLib) = { + )(implicit materializer: Materializer): (TriggerDefinition, TriggerRuleSimulationLib) = { val triggerId = Identifier(packageId, name) val simulator = new TriggerRuleSimulationLib( compiledPackages, diff --git a/triggers/tests/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRuleSimulationLibTest.scala b/triggers/tests/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRuleSimulationLibTest.scala index b32298c7ef31..d769f4ba8be7 100644 --- a/triggers/tests/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRuleSimulationLibTest.scala +++ b/triggers/tests/src/test/scala/com/digitalasset/daml/lf/engine/trigger/TriggerRuleSimulationLibTest.scala @@ -3,18 +3,12 @@ package com.daml.lf.engine.trigger -import akka.stream.scaladsl.{Sink, Source} -import com.daml.ledger.api.refinements.ApiTypes.Party +import com.daml.ledger.api.refinements.ApiTypes.{ApplicationId, Party} import com.daml.ledger.api.testing.utils.SuiteResourceManagementAroundAll -import com.daml.ledger.api.v1.completion.Completion -import com.daml.ledger.api.v1.event.{ArchivedEvent, CreatedEvent, Event} -import com.daml.ledger.api.v1.transaction.Transaction -import com.daml.ledger.api.v1.{value => LedgerApi} +import com.daml.ledger.api.v1.event.CreatedEvent import com.daml.ledger.runner.common.Config import com.daml.ledger.sandbox.SandboxOnXForTest.{ApiServerConfig, ParticipantId, singleParticipant} -import com.daml.lf.crypto -import com.daml.lf.data.{Bytes, Ref} -import com.daml.lf.data.Ref.{Identifier, PackageId, QualifiedName} +import com.daml.lf.data.Ref.QualifiedName import com.daml.lf.engine.trigger.Runner.{ numberOfActiveContracts, numberOfInFlightCommands, @@ -22,21 +16,14 @@ import com.daml.lf.engine.trigger.Runner.{ numberOfPendingContracts, } import com.daml.lf.engine.trigger.test.AbstractTriggerTest -import com.daml.lf.speedy.SValue.{SContractId, SInt64, SParty} -import com.daml.lf.speedy.{Command, SValue} -import com.daml.lf.value.Value.ContractId +import com.daml.lf.speedy.SValue import com.daml.platform.services.time.TimeProviderType -import com.daml.script.converter.Converter.record -import com.google.rpc.status.Status -import io.grpc.Status.Code -import io.grpc.Status.Code.OK import org.scalacheck.Gen -import org.scalatest.{Assertion, Inside, TryValues} +import org.scalatest.{Inside, TryValues} import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AsyncWordSpec import java.util.UUID -import scala.concurrent.Future class TriggerRuleSimulationLibTest extends AsyncWordSpec @@ -45,7 +32,7 @@ class TriggerRuleSimulationLibTest with Inside with SuiteResourceManagementAroundAll with TryValues - with CatGenerators { + with TriggerRuleSimulationLibTestGenerators { import TriggerRuleSimulationLib._ @@ -63,20 +50,6 @@ class TriggerRuleSimulationLibTest .copy(allowTriggerTimeouts = true, allowInFlightCommandOverflows = true) ) - def forAll[T](gen: Gen[T], sampleSize: Int = 100, parallelism: Int = 1)( - test: T => Future[Assertion] - ): Future[Assertion] = { - // TODO: ????: use results (e.g. submissions and ACS/inflight changes) of simulator runs to infer additional events - Source(0 to sampleSize) - .map(_ => gen.sample) - .collect { case Some(data) => data } - .mapAsync(parallelism) { data => - test(data) - } - .takeWhile(_ == succeed, inclusive = true) - .runWith(Sink.last) - } - "Trigger rule simulation" should { "correctly log metrics for initState lambda" in { for { @@ -86,7 +59,7 @@ class TriggerRuleSimulationLibTest client, QualifiedName.assertFromString("Cats:feedingTrigger"), packageId, - applicationId, + ApplicationId("metric-logging-init-state"), compiledPackages, config.participants(ParticipantId).apiServer.timeProviderType, triggerRunnerConfiguration, @@ -135,7 +108,7 @@ class TriggerRuleSimulationLibTest client, QualifiedName.assertFromString("Cats:feedingTrigger"), packageId, - applicationId, + ApplicationId("metric-logging-update-state"), compiledPackages, config.participants(ParticipantId).apiServer.timeProviderType, triggerRunnerConfiguration, @@ -276,163 +249,12 @@ class TriggerRuleSimulationLibTest } } -trait CatGenerators { - import TriggerRuleSimulationLib.{CommandsInFlight, TriggerExperiment} - - protected def packageId: PackageId - - def toContractId(s: String): ContractId = - ContractId.V1.assertBuild(crypto.Hash.hashPrivateKey(s), Bytes.assertFromString("00")) - - def create(template: String, owner: String, i: Long): CreatedEvent = - CreatedEvent( - contractId = toContractId(s"$template:$i").coid, - templateId = Some(LedgerApi.Identifier(packageId, "Cats", template)), - createArguments = Some( - LedgerApi.Record(fields = - Seq( - LedgerApi.RecordField("owner", Some(LedgerApi.Value().withParty(owner))), - template match { - case "TestControl" => - LedgerApi.RecordField("size", Some(LedgerApi.Value().withInt64(i))) - case _ => - LedgerApi.RecordField("isin", Some(LedgerApi.Value().withInt64(i))) - }, - ) - ) - ), - ) - - def archive(template: String, owner: String, i: Long): ArchivedEvent = - ArchivedEvent( - contractId = toContractId(s"$template:$i").coid, - templateId = Some(LedgerApi.Identifier(packageId, "Cats", template)), - witnessParties = Seq(owner), - ) - - def createCat(owner: String, i: Long): CreatedEvent = create("Cat", owner, i) - - def archivedCat(owner: String, i: Long): ArchivedEvent = archive("Cat", owner, i) - - def createFood(owner: String, i: Long): CreatedEvent = create("Food", owner, i) - - def archivedFood(owner: String, i: Long): ArchivedEvent = archive("Food", owner, i) - - def createCommandGen: Gen[Command.Create] = - for { - party <- partyGen - templateId <- Gen.frequency( - 1 -> Identifier(packageId, QualifiedName.assertFromString("Cats:Cat")), - 1 -> Identifier(packageId, QualifiedName.assertFromString("Cats:Food")), - ) - n <- Gen.choose(0, maxNumOfCats) - arguments = record( - templateId, - "owner" -> SParty(Ref.Party.assertFromString(party)), - "isin" -> SInt64(n), - ) - } yield Command.Create(templateId, arguments) - - def exerciseCommandGen: Gen[Command.ExerciseTemplate] = - for { - n <- Gen.choose(0L, maxNumOfCats) - templateId = Identifier(packageId, QualifiedName.assertFromString("Cats:Cat")) - contractId = SContractId(toContractId(s"Cat:$n")) - foodCid = SContractId(toContractId(s"Food:$n")) - choiceId <- Gen.const("Feed") - argument = record(templateId, "foodCid" -> foodCid) - } yield Command.ExerciseTemplate( - templateId, - contractId, - Ref.ChoiceName.assertFromString(choiceId), - argument, - ) - - def commandGen: Gen[Command] = Gen.frequency( - 1 -> createCommandGen, - 9 -> exerciseCommandGen, - ) - - def transactionGen( - owner: String, - numOfCats: Long = 0, - amountOfFood: Long = 0, - catsKilled: Long = 0, - foodEaten: Long = 0, - knownCmdId: String = UUID.randomUUID().toString, - ): Gen[Transaction] = - for { - id <- Gen.numStr - cmdId <- Gen.frequency( - 1 -> Gen.const(""), - 9 -> Gen.frequency( - 1 -> Gen.uuid.map(_.toString), - 9 -> Gen.const(knownCmdId), - ), - ) - cats = (0L to numOfCats) - .map(i => createCat(owner, i)) - .map(event => Event(Event.Event.Created(event))) - deadCats = (0L to catsKilled) - .map(i => archivedCat(owner, i)) - .map(event => Event(Event.Event.Archived(event))) - food = (0L to amountOfFood) - .map(i => createFood(owner, i)) - .map(event => Event(Event.Event.Created(event))) - eatenFood = (0L to foodEaten) - .map(i => archivedFood(owner, i)) - .map(event => Event(Event.Event.Archived(event))) - } yield Transaction( - transactionId = id, - commandId = cmdId, - events = cats ++ food ++ deadCats ++ eatenFood, - ) - - def successfulCompletionGen( - owner: String, - knownCmdId: String = UUID.randomUUID().toString, - ): Gen[Completion] = - for { - id <- Gen.numStr - cmdId <- Gen.frequency( - 1 -> Gen.const(""), - 9 -> Gen.frequency( - 1 -> Gen.uuid.map(_.toString), - 9 -> Gen.const(knownCmdId), - ), - ) - } yield Completion( - commandId = cmdId, - status = Some(Status(OK.value(), "")), - transactionId = id, - actAs = Seq(owner), - ) +trait TriggerRuleSimulationLibTestGenerators extends CatGenerators { - def failingCompletionGen( - owner: String, - knownCmdId: String = UUID.randomUUID().toString, - ): Gen[Completion] = - for { - id <- Gen.numStr - cmdId <- Gen.frequency( - 1 -> Gen.const(""), - 9 -> Gen.frequency( - 1 -> Gen.uuid.map(_.toString), - 9 -> Gen.const(knownCmdId), - ), - ) - code <- Gen.choose(1, Code.values().length) - } yield Completion( - commandId = cmdId, - status = Some(Status(code, "simulated-failure")), - transactionId = id, - actAs = Seq(owner), - ) + import TriggerRuleSimulationLib.{CommandsInFlight, TriggerExperiment} private val maxNumOfCats = 10L - private val partyGen: Gen[String] = Gen.const("alice") - private val userStateGen: Gen[SValue] = Gen.choose(0L, maxNumOfCats).map(SValue.SInt64) private val basicTesting = TriggerExperiment( @@ -470,7 +292,7 @@ trait CatGenerators { private val transactionInFlightTesting = { val cmdId = UUID.randomUUID().toString val inFlightCmdGen = for { - cmds <- Gen.listOfN(maxNumOfCats.toInt, commandGen) + cmds <- Gen.listOfN(maxNumOfCats.toInt, commandGen(maxNumOfCats)) } yield (cmdId, cmds) TriggerExperiment( @@ -517,7 +339,7 @@ trait CatGenerators { private val completionInFlightTesting = { val cmdId = UUID.randomUUID().toString val inFlightCmdGen = for { - cmds <- Gen.listOfN(maxNumOfCats.toInt, commandGen) + cmds <- Gen.listOfN(maxNumOfCats.toInt, commandGen(maxNumOfCats)) } yield (cmdId, cmds) TriggerExperiment(