Skip to content

Commit

Permalink
Changed TestableAggregateRoot to be explicit on commandhandling failu…
Browse files Browse the repository at this point in the history
…res (#44)

Changed TestableAggregateRoot to be explicit on commandhandling failures
Ensured TestableAggregateRoot works properly based on a starting 'given' (the actor was created in when, that is now part of given that is actually class constructor work)
Added tests to cover the functionality of TestableAggregateRoot
  • Loading branch information
olger committed Aug 31, 2018
1 parent 1a75a83 commit 93d3543
Show file tree
Hide file tree
Showing 9 changed files with 326 additions and 47 deletions.
8 changes: 8 additions & 0 deletions README.md
Expand Up @@ -15,3 +15,11 @@ The setup as written by [Leonard Ehrenfried](https://leonard.io/blog/2017/01/an-

Release command = sbt +publishSigned
This will release for 2.12. Via oss.sonatype.org you need to 'Close' the uploaded packages and thereafter 'Release' them.

# Release notes

# 0.1.1
Improvements for TestableAggregateRoot. When a command cannot be handled a CommandHandlingException is thrown.
When a command should not be handled by this aggregate root an IllegalCommandException is thrown


Expand Up @@ -6,8 +6,9 @@ package io.cafienne.bounded.test

import java.util.concurrent.atomic.AtomicInteger

import io.cafienne.bounded.test.TestableAggregateRoot.{CommandHandlingException, IllegalCommandException}

import scala.reflect.ClassTag
//import scala.reflect.runtime.universe
import akka.actor._
import akka.pattern.ask
import akka.testkit.TestProbe
Expand Down Expand Up @@ -38,6 +39,9 @@ import scala.concurrent.duration.Duration
*/
object TestableAggregateRoot {

case class CommandHandlingException(msg: String) extends Exception(msg)
case class IllegalCommandException(msg: String) extends Exception(msg)

/** Construct a test for a specific aggregate root that has a specific initial state
*
* @param id The aggregate root ID that is used for testing
Expand Down Expand Up @@ -90,37 +94,27 @@ class TestableAggregateRoot[A <: AggregateRootActor[B], B <: AggregateState[B]:
ctag: reflect.ClassTag[A]
) {

import TestableAggregateRoot.testId
final val arTestId = testId(id)

private val storeEventsActor =
system.actorOf(Props(classOf[CreateEventsInStoreActor], arTestId), "create-events-actor")
implicit val duration: Duration = timeout.duration
private var handledEvents: List[DomainEvent] = List.empty

implicit val duration: Duration = timeout.duration

private val testProbe = TestProbe()
testProbe watch storeEventsActor

evt foreach { event =>
testProbe.send(storeEventsActor, event)
testProbe.expectMsgAllConformingOf(classOf[DomainEvent])
}
storeEventsActor ! PoisonPill
testProbe.expectTerminated(storeEventsActor)

private var aggregateRootActor: Option[ActorRef] = None

// private def aggregateRootCreator(): AggregateRootCreator = {
// val runtimeMirror = universe.runtimeMirror(getClass.getClassLoader)
// val module = runtimeMirror.staticModule(ctag.runtimeClass.getCanonicalName)
// val obj = runtimeMirror.reflectModule(module)
// obj.instance.asInstanceOf[AggregateRootCreator]
// }
import TestableAggregateRoot.testId
final val arTestId = testId(id)

private def createActor(id: AggregateRootId) = {
handledEvents = List.empty
system.actorOf(creator.props(arTestId), s"test-aggregate-$arTestId")
if (evt != null && evt.nonEmpty) storeEvents(evt)
// Start the Aggregate Root and replay to initial state
private val aggregateRootActor: ActorRef = system.actorOf(creator.props(arTestId), s"test-aggregate-$arTestId")

private def storeEvents(evt: Seq[DomainEvent]): Unit = {
val storeEventsActor = system.actorOf(Props(classOf[CreateEventsInStoreActor], arTestId), "create-events-actor")
val testProbe = TestProbe()

testProbe watch storeEventsActor
evt foreach { event =>
testProbe.send(storeEventsActor, event)
testProbe.expectMsgAllConformingOf(classOf[DomainEvent])
}
storeEventsActor ! PoisonPill
testProbe.expectTerminated(storeEventsActor)
}

/**
Expand All @@ -131,21 +125,23 @@ class TestableAggregateRoot[A <: AggregateRootActor[B], B <: AggregateState[B]:
* @return This initialized TestableAggregateRoot that processed the command.
*/
def when(command: DomainCommand): TestableAggregateRoot[A, B] = {
if (command.aggregateRootId != id)
throw new IllegalArgumentException(
if (command.aggregateRootId != id) {
throw IllegalCommandException(
s"Command for Aggregate Root ${command.aggregateRootId} cannot be handled by this aggregate root with id $id"
)
aggregateRootActor = aggregateRootActor.fold(Some(createActor(arTestId)))(r => Some(r))
val aggregateRootProbe = TestProbe()
aggregateRootProbe watch aggregateRootActor.get
}

aggregateRootProbe.send(aggregateRootActor.get, command)
val aggregateRootProbe = TestProbe()
aggregateRootProbe watch aggregateRootActor
aggregateRootProbe.send(aggregateRootActor, command)

val events =
aggregateRootProbe
.expectMsgPF[Seq[DomainEvent]](duration, "reply with events") {
case Ok(events) => events
.expectMsgPF[Any](duration, "reply with events") {
case Ko(x) => throw CommandHandlingException(s"Command Handling failed with Ko $x")
case Ok(events: Seq[DomainEvent]) if events.isInstanceOf[Seq[DomainEvent]] => events.toList
}
.asInstanceOf[List[DomainEvent]]

handledEvents ++= events
this
Expand All @@ -158,10 +154,7 @@ class TestableAggregateRoot[A <: AggregateRootActor[B], B <: AggregateState[B]:
*
* @return Future with the AggregateState as defined for this Aggregate Root.
*/
def currentState: Future[Option[B]] =
aggregateRootActor.fold(Future.failed[Option[B]](new IllegalStateException("")))(
actor => (actor ? GetState).mapTo[Option[B]]
)
def currentState: Future[Option[B]] = (aggregateRootActor ? GetState).mapTo[Option[B]]

/**
* Give the events that are created by the command that was given to the aggregate root by when.
Expand Down
@@ -0,0 +1,63 @@
/*
* Copyright (C) 2016-2018 Cafienne B.V. <https://www.cafienne.io/bounded>
*/

package io.cafienne.bounded.test

import com.typesafe.config.ConfigFactory

object SpecConfig {

/*
PLEASE NOTE:
Currently the https://github.com/dnvriend/akka-persistence-inmemory is NOT working for Aggregate Root tests
because it is not possible to use a separate instance writing the events that should be in the event store
before you actually create the aggregate root (should replay those stored events) to check execution of a new
command.
A new configuration that uses the akka bundled inmem storage is added to create a working situation.
*/
val testConfig = ConfigFactory.parseString(
"""
| akka {
| loglevel = "DEBUG"
| stdout-loglevel = "DEBUG"
| loggers = ["akka.testkit.TestEventListener"]
| actor {
| default-dispatcher {
| executor = "fork-join-executor"
| fork-join-executor {
| parallelism-min = 8
| parallelism-factor = 2.0
| parallelism-max = 8
| }
| }
| serialize-creators = off
| serialize-messages = off
| }
| persistence {
| publish-confirmations = on
| publish-plugin-commands = on
| journal {
| plugin = "inmemory-journal"
| }
| snapshot-store.plugin = "inmemory-snapshot-store"
| }
| test {
| single-expect-default = 10s
| timefactor = 1
| }
| }
| inmemory-read-journal {
| refresh-interval = "10ms"
| max-buffer-size = "1000"
| }
|
| bounded.eventmaterializers.publish = true
|
| bounded.eventmaterializers.offsetstore {
| type = "inmemory"
| }
""".stripMargin
)

}
@@ -0,0 +1,88 @@
/*
* Copyright (C) 2016-2018 Cafienne B.V. <https://www.cafienne.io/bounded>
*/

package io.cafienne.bounded.test

import akka.actor.{ActorSystem, Props}
import io.cafienne.bounded.{BuildInfo, RuntimeInfo}
import io.cafienne.bounded.aggregate._
import io.cafienne.bounded.test.DomainProtocol.StateUpdated
import io.cafienne.bounded.test.TestAggregateRoot.TestAggregateRootState

import scala.collection.immutable.Seq

object DomainProtocol {
case class TestAggregateRootId(id: String) extends AggregateRootId {
override def idAsString: String = id
}

case class CreateInitialState(metaData: CommandMetaData, aggregateRootId: AggregateRootId, state: String)
extends DomainCommand
case class InitialStateCreated(metaData: MetaData, id: AggregateRootId, state: String) extends DomainEvent

case class UpdateState(metaData: CommandMetaData, aggregateRootId: AggregateRootId, state: String)
extends DomainCommand
case class StateUpdated(metaData: MetaData, id: AggregateRootId, state: String) extends DomainEvent

case class InvalidCommand(msg: String) extends HandlingFailure
case class InvalidState(msg: String) extends HandlingFailure
}

class TestAggregateRoot(aggregateRootId: AggregateRootId, buildInfo: BuildInfo, runtimeInfo: RuntimeInfo)
extends AggregateRootActor[TestAggregateRootState] {
import DomainProtocol._

implicit val bi = buildInfo
implicit val ri = runtimeInfo

override def aggregateId: AggregateRootId = aggregateRootId

override def handleCommand(command: DomainCommand, aggregateState: Option[TestAggregateRootState]): Reply = {
command match {
case CreateInitialState(metaData, aggregateRootId, state) =>
Ok(Seq[DomainEvent](InitialStateCreated(MetaData.fromCommand(metaData), aggregateRootId, state)))
case UpdateState(metaData, aggregateRootId, state) =>
if (aggregateState.isDefined && aggregateState.get.state.equals("new")) {
Ok(Seq(StateUpdated(MetaData.fromCommand(metaData), aggregateRootId, state)))
} else {
Ko(InvalidState(s"The current state $aggregateState does not allow an update to $state"))
}
case other => Ko(new UnexpectedCommand(other))
}
}

override def newState(evt: DomainEvent): Option[TestAggregateRootState] = {
evt match {
case InitialStateCreated(metaData, id, state) => Some(TestAggregateRootState(state))
case _ =>
log.error("Event {} is not valid to create a new TestAggregateRootState")
throw new IllegalArgumentException(s"Event $evt is not valid to create a new TestAggregateRootState")
}
}
}

object TestAggregateRoot {

case class TestAggregateRootState(state: String) extends AggregateState[TestAggregateRootState] {
override def update(event: DomainEvent): Option[TestAggregateRootState] = {
event match {
case evt: StateUpdated =>
Some(this.copy(state = evt.state))
case other => throw new IllegalArgumentException(s"Cannot update state based on event $other")
}
}
}

val aggregateRootTag = "ar-test"
}

class TestAggregateRootCreator(system: ActorSystem)(implicit buildInfo: BuildInfo, runtimeInfo: RuntimeInfo)
extends AggregateRootCreator {

override def props(aggregateRootId: AggregateRootId): Props = {
system.log.debug("Returning new Props for {}", aggregateRootId)
Props(classOf[TestAggregateRoot], aggregateRootId, buildInfo, runtimeInfo)
}

}

0 comments on commit 93d3543

Please sign in to comment.