diff --git a/README.md b/README.md index aa76018..8371eaa 100644 --- a/README.md +++ b/README.md @@ -15,3 +15,11 @@ The setup as written by [Leonard Ehrenfried](https://leonard.io/blog/2017/01/an- Release command = sbt +publishSigned This will release for 2.12. Via oss.sonatype.org you need to 'Close' the uploaded packages and thereafter 'Release' them. + +# Release notes + +# 0.1.1 +Improvements for TestableAggregateRoot. When a command cannot be handled a CommandHandlingException is thrown. +When a command should not be handled by this aggregate root an IllegalCommandException is thrown + + diff --git a/bounded-test/src/main/scala/io/cafienne/bounded/test/TestableAggregateRoot.scala b/bounded-test/src/main/scala/io/cafienne/bounded/test/TestableAggregateRoot.scala index b25badd..6cfc94b 100644 --- a/bounded-test/src/main/scala/io/cafienne/bounded/test/TestableAggregateRoot.scala +++ b/bounded-test/src/main/scala/io/cafienne/bounded/test/TestableAggregateRoot.scala @@ -6,8 +6,9 @@ package io.cafienne.bounded.test import java.util.concurrent.atomic.AtomicInteger +import io.cafienne.bounded.test.TestableAggregateRoot.{CommandHandlingException, IllegalCommandException} + import scala.reflect.ClassTag -//import scala.reflect.runtime.universe import akka.actor._ import akka.pattern.ask import akka.testkit.TestProbe @@ -38,6 +39,9 @@ import scala.concurrent.duration.Duration */ object TestableAggregateRoot { + case class CommandHandlingException(msg: String) extends Exception(msg) + case class IllegalCommandException(msg: String) extends Exception(msg) + /** Construct a test for a specific aggregate root that has a specific initial state * * @param id The aggregate root ID that is used for testing @@ -90,37 +94,27 @@ class TestableAggregateRoot[A <: AggregateRootActor[B], B <: AggregateState[B]: ctag: reflect.ClassTag[A] ) { - import TestableAggregateRoot.testId - final val arTestId = testId(id) - - private val storeEventsActor = - system.actorOf(Props(classOf[CreateEventsInStoreActor], arTestId), "create-events-actor") + implicit val duration: Duration = timeout.duration private var handledEvents: List[DomainEvent] = List.empty - implicit val duration: Duration = timeout.duration - - private val testProbe = TestProbe() - testProbe watch storeEventsActor - - evt foreach { event => - testProbe.send(storeEventsActor, event) - testProbe.expectMsgAllConformingOf(classOf[DomainEvent]) - } - storeEventsActor ! PoisonPill - testProbe.expectTerminated(storeEventsActor) - - private var aggregateRootActor: Option[ActorRef] = None - -// private def aggregateRootCreator(): AggregateRootCreator = { -// val runtimeMirror = universe.runtimeMirror(getClass.getClassLoader) -// val module = runtimeMirror.staticModule(ctag.runtimeClass.getCanonicalName) -// val obj = runtimeMirror.reflectModule(module) -// obj.instance.asInstanceOf[AggregateRootCreator] -// } + import TestableAggregateRoot.testId + final val arTestId = testId(id) - private def createActor(id: AggregateRootId) = { - handledEvents = List.empty - system.actorOf(creator.props(arTestId), s"test-aggregate-$arTestId") + if (evt != null && evt.nonEmpty) storeEvents(evt) + // Start the Aggregate Root and replay to initial state + private val aggregateRootActor: ActorRef = system.actorOf(creator.props(arTestId), s"test-aggregate-$arTestId") + + private def storeEvents(evt: Seq[DomainEvent]): Unit = { + val storeEventsActor = system.actorOf(Props(classOf[CreateEventsInStoreActor], arTestId), "create-events-actor") + val testProbe = TestProbe() + + testProbe watch storeEventsActor + evt foreach { event => + testProbe.send(storeEventsActor, event) + testProbe.expectMsgAllConformingOf(classOf[DomainEvent]) + } + storeEventsActor ! PoisonPill + testProbe.expectTerminated(storeEventsActor) } /** @@ -131,21 +125,23 @@ class TestableAggregateRoot[A <: AggregateRootActor[B], B <: AggregateState[B]: * @return This initialized TestableAggregateRoot that processed the command. */ def when(command: DomainCommand): TestableAggregateRoot[A, B] = { - if (command.aggregateRootId != id) - throw new IllegalArgumentException( + if (command.aggregateRootId != id) { + throw IllegalCommandException( s"Command for Aggregate Root ${command.aggregateRootId} cannot be handled by this aggregate root with id $id" ) - aggregateRootActor = aggregateRootActor.fold(Some(createActor(arTestId)))(r => Some(r)) - val aggregateRootProbe = TestProbe() - aggregateRootProbe watch aggregateRootActor.get + } - aggregateRootProbe.send(aggregateRootActor.get, command) + val aggregateRootProbe = TestProbe() + aggregateRootProbe watch aggregateRootActor + aggregateRootProbe.send(aggregateRootActor, command) val events = aggregateRootProbe - .expectMsgPF[Seq[DomainEvent]](duration, "reply with events") { - case Ok(events) => events + .expectMsgPF[Any](duration, "reply with events") { + case Ko(x) => throw CommandHandlingException(s"Command Handling failed with Ko $x") + case Ok(events: Seq[DomainEvent]) if events.isInstanceOf[Seq[DomainEvent]] => events.toList } + .asInstanceOf[List[DomainEvent]] handledEvents ++= events this @@ -158,10 +154,7 @@ class TestableAggregateRoot[A <: AggregateRootActor[B], B <: AggregateState[B]: * * @return Future with the AggregateState as defined for this Aggregate Root. */ - def currentState: Future[Option[B]] = - aggregateRootActor.fold(Future.failed[Option[B]](new IllegalStateException("")))( - actor => (actor ? GetState).mapTo[Option[B]] - ) + def currentState: Future[Option[B]] = (aggregateRootActor ? GetState).mapTo[Option[B]] /** * Give the events that are created by the command that was given to the aggregate root by when. diff --git a/bounded-test/src/test/scala/io/cafienne/bounded/test/SpecConfig.scala b/bounded-test/src/test/scala/io/cafienne/bounded/test/SpecConfig.scala new file mode 100644 index 0000000..ed6e151 --- /dev/null +++ b/bounded-test/src/test/scala/io/cafienne/bounded/test/SpecConfig.scala @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2016-2018 Cafienne B.V. + */ + +package io.cafienne.bounded.test + +import com.typesafe.config.ConfigFactory + +object SpecConfig { + + /* + PLEASE NOTE: + Currently the https://github.com/dnvriend/akka-persistence-inmemory is NOT working for Aggregate Root tests + because it is not possible to use a separate instance writing the events that should be in the event store + before you actually create the aggregate root (should replay those stored events) to check execution of a new + command. + A new configuration that uses the akka bundled inmem storage is added to create a working situation. + */ + val testConfig = ConfigFactory.parseString( + """ + | akka { + | loglevel = "DEBUG" + | stdout-loglevel = "DEBUG" + | loggers = ["akka.testkit.TestEventListener"] + | actor { + | default-dispatcher { + | executor = "fork-join-executor" + | fork-join-executor { + | parallelism-min = 8 + | parallelism-factor = 2.0 + | parallelism-max = 8 + | } + | } + | serialize-creators = off + | serialize-messages = off + | } + | persistence { + | publish-confirmations = on + | publish-plugin-commands = on + | journal { + | plugin = "inmemory-journal" + | } + | snapshot-store.plugin = "inmemory-snapshot-store" + | } + | test { + | single-expect-default = 10s + | timefactor = 1 + | } + | } + | inmemory-read-journal { + | refresh-interval = "10ms" + | max-buffer-size = "1000" + | } + | + | bounded.eventmaterializers.publish = true + | + | bounded.eventmaterializers.offsetstore { + | type = "inmemory" + | } + """.stripMargin + ) + +} diff --git a/bounded-test/src/test/scala/io/cafienne/bounded/test/TestAggregateRoot.scala b/bounded-test/src/test/scala/io/cafienne/bounded/test/TestAggregateRoot.scala new file mode 100644 index 0000000..868f1c8 --- /dev/null +++ b/bounded-test/src/test/scala/io/cafienne/bounded/test/TestAggregateRoot.scala @@ -0,0 +1,88 @@ +/* + * Copyright (C) 2016-2018 Cafienne B.V. + */ + +package io.cafienne.bounded.test + +import akka.actor.{ActorSystem, Props} +import io.cafienne.bounded.{BuildInfo, RuntimeInfo} +import io.cafienne.bounded.aggregate._ +import io.cafienne.bounded.test.DomainProtocol.StateUpdated +import io.cafienne.bounded.test.TestAggregateRoot.TestAggregateRootState + +import scala.collection.immutable.Seq + +object DomainProtocol { + case class TestAggregateRootId(id: String) extends AggregateRootId { + override def idAsString: String = id + } + + case class CreateInitialState(metaData: CommandMetaData, aggregateRootId: AggregateRootId, state: String) + extends DomainCommand + case class InitialStateCreated(metaData: MetaData, id: AggregateRootId, state: String) extends DomainEvent + + case class UpdateState(metaData: CommandMetaData, aggregateRootId: AggregateRootId, state: String) + extends DomainCommand + case class StateUpdated(metaData: MetaData, id: AggregateRootId, state: String) extends DomainEvent + + case class InvalidCommand(msg: String) extends HandlingFailure + case class InvalidState(msg: String) extends HandlingFailure +} + +class TestAggregateRoot(aggregateRootId: AggregateRootId, buildInfo: BuildInfo, runtimeInfo: RuntimeInfo) + extends AggregateRootActor[TestAggregateRootState] { + import DomainProtocol._ + + implicit val bi = buildInfo + implicit val ri = runtimeInfo + + override def aggregateId: AggregateRootId = aggregateRootId + + override def handleCommand(command: DomainCommand, aggregateState: Option[TestAggregateRootState]): Reply = { + command match { + case CreateInitialState(metaData, aggregateRootId, state) => + Ok(Seq[DomainEvent](InitialStateCreated(MetaData.fromCommand(metaData), aggregateRootId, state))) + case UpdateState(metaData, aggregateRootId, state) => + if (aggregateState.isDefined && aggregateState.get.state.equals("new")) { + Ok(Seq(StateUpdated(MetaData.fromCommand(metaData), aggregateRootId, state))) + } else { + Ko(InvalidState(s"The current state $aggregateState does not allow an update to $state")) + } + case other => Ko(new UnexpectedCommand(other)) + } + } + + override def newState(evt: DomainEvent): Option[TestAggregateRootState] = { + evt match { + case InitialStateCreated(metaData, id, state) => Some(TestAggregateRootState(state)) + case _ => + log.error("Event {} is not valid to create a new TestAggregateRootState") + throw new IllegalArgumentException(s"Event $evt is not valid to create a new TestAggregateRootState") + } + } +} + +object TestAggregateRoot { + + case class TestAggregateRootState(state: String) extends AggregateState[TestAggregateRootState] { + override def update(event: DomainEvent): Option[TestAggregateRootState] = { + event match { + case evt: StateUpdated => + Some(this.copy(state = evt.state)) + case other => throw new IllegalArgumentException(s"Cannot update state based on event $other") + } + } + } + + val aggregateRootTag = "ar-test" +} + +class TestAggregateRootCreator(system: ActorSystem)(implicit buildInfo: BuildInfo, runtimeInfo: RuntimeInfo) + extends AggregateRootCreator { + + override def props(aggregateRootId: AggregateRootId): Props = { + system.log.debug("Returning new Props for {}", aggregateRootId) + Props(classOf[TestAggregateRoot], aggregateRootId, buildInfo, runtimeInfo) + } + +} diff --git a/bounded-test/src/test/scala/io/cafienne/bounded/test/TestableAggregateRootSpec.scala b/bounded-test/src/test/scala/io/cafienne/bounded/test/TestableAggregateRootSpec.scala new file mode 100644 index 0000000..d9c164c --- /dev/null +++ b/bounded-test/src/test/scala/io/cafienne/bounded/test/TestableAggregateRootSpec.scala @@ -0,0 +1,127 @@ +/* + * Copyright (C) 2016-2018 Cafienne B.V. + */ + +package io.cafienne.bounded.test + +import java.time.ZonedDateTime + +import akka.actor.ActorSystem +import akka.event.{Logging, LoggingAdapter} +import akka.testkit.TestKit +import akka.util.Timeout +import io.cafienne.bounded.{BuildInfo, RuntimeInfo} +import io.cafienne.bounded.aggregate.{CommandMetaData, MetaData} +import io.cafienne.bounded.test.TestableAggregateRoot.{CommandHandlingException, IllegalCommandException} +import io.cafienne.bounded.test.DomainProtocol._ +import io.cafienne.bounded.test.TestAggregateRoot.TestAggregateRootState +import org.scalatest.{AsyncWordSpec, BeforeAndAfterAll, Matchers} +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.duration._ + +class TestableAggregateRootSpec extends AsyncWordSpec with Matchers with ScalaFutures with BeforeAndAfterAll { + + //Setup required supporting classes + implicit val timeout = Timeout(10.seconds) + implicit val system = ActorSystem("TestSystem", SpecConfig.testConfig) + implicit val logger: LoggingAdapter = Logging(system, getClass) + implicit val buildInfo = BuildInfo("spec", "1.0") + implicit val runtimeInfo = RuntimeInfo("current") + + val testAggregateRootCreator = new TestAggregateRootCreator(system) + + val currentMeta = MetaData(ZonedDateTime.parse("2018-01-01T17:43:00+01:00"), None, None, buildInfo, runtimeInfo) + + "The testable aggregate root" must { + + "create without state in given" in { + val testAggregateRootId1 = TestAggregateRootId("1") + + val ar = TestableAggregateRoot + .given[TestAggregateRoot, TestAggregateRootState](testAggregateRootCreator, testAggregateRootId1) + + ar.currentState map { state => + assert(state.isEmpty) + } + } + + "create initial state in given" in { + val testAggregateRootId1 = TestAggregateRootId("2") + val targetState = TestAggregateRootState("new") + + val ar = TestableAggregateRoot + .given[TestAggregateRoot, TestAggregateRootState]( + testAggregateRootCreator, + testAggregateRootId1, + InitialStateCreated(currentMeta, testAggregateRootId1, "new") + ) + + ar.currentState map { state => + assert(state.isDefined, s"There is no defined state but expected $targetState") + assert(state.get == targetState) + } + } + + "handle the command to OK in when" in { + val testAggregateRootId1 = TestAggregateRootId("3") + val commandMetaData = CommandMetaData(currentMeta.timestamp, None) + val updateStateCommand = UpdateState(commandMetaData, testAggregateRootId1, "updated") + val targetState = TestAggregateRootState("updated") + + val ar = TestableAggregateRoot + .given[TestAggregateRoot, TestAggregateRootState]( + testAggregateRootCreator, + testAggregateRootId1, + InitialStateCreated(currentMeta, testAggregateRootId1, "new") + ) + .when(updateStateCommand) + + ar.events should contain(StateUpdated(MetaData.fromCommand(commandMetaData), testAggregateRootId1, "updated")) + + ar.currentState map { state => + assert(state.isDefined, s"There is no defined state but expected $targetState") + assert(state.get == targetState) + } + } + + "handle the CommandHandlingException to KO in when" in { + val testAggregateRootId1 = TestAggregateRootId("3") + val commandMetaData = CommandMetaData(currentMeta.timestamp, None) + val updateStateCommand = UpdateState(commandMetaData, testAggregateRootId1, "updated") + + an[CommandHandlingException] should be thrownBy { + TestableAggregateRoot + .given[TestAggregateRoot, TestAggregateRootState]( + testAggregateRootCreator, + testAggregateRootId1, + InitialStateCreated(currentMeta, testAggregateRootId1, "wronginitial") + ) + .when(updateStateCommand) + } + } + + "handle the IllegalCommandException to KO in when" in { + val testAggregateRootId1 = TestAggregateRootId("4") + val testAggregateRootIdWrongForCommand = TestAggregateRootId("5") + val commandMetaData = CommandMetaData(currentMeta.timestamp, None) + val updateStateCommand = UpdateState(commandMetaData, testAggregateRootIdWrongForCommand, "updated") + + an[IllegalCommandException] should be thrownBy { + TestableAggregateRoot + .given[TestAggregateRoot, TestAggregateRootState]( + testAggregateRootCreator, + testAggregateRootId1, + InitialStateCreated(currentMeta, testAggregateRootId1, "new") + ) + .when(updateStateCommand) + } + } + + } + + override protected def afterAll(): Unit = { + TestKit.shutdownActorSystem(system, 30.seconds, verifySystemShutdown = true) + } + +} diff --git a/build.sbt b/build.sbt index da2a6d3..4697a05 100644 --- a/build.sbt +++ b/build.sbt @@ -1,7 +1,7 @@ lazy val basicSettings = { - val currentScalaVersion = "2.12.3" - val scala211Version = "2.11.11" + val currentScalaVersion = "2.12.6" + val scala211Version = "2.11.12" Seq( organization := "io.cafienne.bounded", diff --git a/project/build.properties b/project/build.properties index 31334bb..f59579f 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.1.1 +sbt.version=1.2.0 diff --git a/project/plugins.sbt b/project/plugins.sbt index f76464e..7858eb8 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,5 +1,5 @@ -addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.6") -addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.4.0") +addSbtPlugin("com.github.gseitz" % "sbt-release" % "1.0.9") +addSbtPlugin("com.geirsson" % "sbt-scalafmt" % "1.5.1") addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.5.1") addSbtPlugin("org.scalastyle" % "scalastyle-sbt-plugin" % "1.0.0") addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "2.0") diff --git a/version.sbt b/version.sbt index 49e6b96..13818e9 100644 --- a/version.sbt +++ b/version.sbt @@ -1,2 +1,2 @@ -version in ThisBuild := "0.1.0" +version in ThisBuild := "0.1.1"