A simple event sourcing library written in Scala.

What is Event Sourcing?

Event Sourcing (E.S.) is the idea of persisting the immutable events that happened to a domain object instead of persisting its current state.

You can still save the latest state to a database and read it back, but you can also replay the events of the stream at any time to rebuild the state of your domain object.


There are a few problems with E.S., such as concurrency and side effects while replaying your events. Styx helps you build your application using E.S. while avoiding these common problems. ;)



The following example is based on a simple bank account domain:

The state

/**
 * State of a bank account, keyed by its aggregation id.
 *
 * NOTE(review): the events and tests in this example build the state as
 * `BankAccount(revision, aggregationId)` (two arguments) — confirm the real
 * constructor signature against the library sources.
 */
case class BankAccount(aggregationId: AggregationId) extends DynamicData with State

The event handler

/**
 * Provides the implicit event handler / event fetcher for `BankAccount` events.
 */
object BankAccountEventHander {
  // You can always change the event store implementation, for instance:
  // implicit val eventHandler: EventHandler[BankAccount] = new CassandraEventHandlerFetcher[BankAccount]
  implicit val eventHandler: EventHandler[BankAccount] with EventFetcher[BankAccount] =
    MongoDBEventHandlerFetcher(MongoD.collection, mapper, converter)
}

The events

/**
 * Event recording that a bank account was created.
 *
 * @param revision  revision number, also passed to the `Event` base class
 * @param eventDate date of the event; defaults to the moment of construction
 */
case class BankAccountCreated(override val revision: Long, override val eventDate: Date = new Date()) extends Event[BankAccount](revision) {
  /** Builds the new account state: zero balance, "ACTIVE" status and the owner carried by this event. */
  def applyTo(account: BankAccount): BankAccount = {
    val newAccount = BankAccount(revision, account.aggregationId)
    newAccount.balance = 0
    newAccount.status = "ACTIVE"
    newAccount.owner = this.owner
    newAccount
  }

  /** Valid only while the state has no status yet, i.e. the account was not created before. */
  override def canApply(state: BankAccount): Event.Valid = validation(state.status == null, "this account is already created")
}
/**
 * Event recording a deposit into a bank account.
 *
 * @param revision  revision number, also passed to the `Event` base class
 * @param eventDate date of the event; defaults to the moment of construction
 */
case class DepositPerformed(override val revision: Long, override val eventDate: Date = new Date()) extends Event[BankAccount](revision) {
  /** Copies the current state into a new revision and adds this event's `amount` to the balance. */
  def applyTo(account: BankAccount): BankAccount = {
    val newAccount = BankAccount(revision, account.aggregationId)
    account.copyTo(newAccount)
    newAccount.balance = account.balance[Int] + this.amount[Int]
    newAccount
  }
}
/**
 * Event recording a change of the account owner.
 *
 * @param revision  revision number, also passed to the `Event` base class
 * @param eventDate date of the event; defaults to the moment of construction
 */
case class OwnerChanged(override val revision: Long, override val eventDate: Date = new Date()) extends Event[BankAccount](revision) {
  /** Copies the current state into a new revision and replaces the owner with this event's `newOwner`. */
  def applyTo(account: BankAccount): BankAccount = {
    val newAccount = BankAccount(revision, account.aggregationId)
    account.copyTo(newAccount)
    newAccount.owner = this.newOwner
    newAccount
  }
}
/**
 * Event recording a withdrawal from a bank account.
 *
 * @param revision  revision number, also passed to the `Event` base class
 * @param eventDate date of the event; defaults to the moment of construction
 */
case class WithdrawalPerformed(override val revision: Long, override val eventDate: Date = new Date()) extends Event[BankAccount](revision) {
  /** Copies the current state into a new revision and subtracts this event's `amount` from the balance. */
  def applyTo(account: BankAccount): BankAccount = {
    val newAccount = BankAccount(revision, account.aggregationId)
    account.copyTo(newAccount)
    newAccount.balance = account.balance[Int] - this.amount[Int]
    newAccount
  }

  /** Valid only when the withdrawal does not push the balance below zero. */
  override def canApply(state: BankAccount): Event.Valid = validation((state.balance[Int] - this.amount[Int]) >= 0,
    s"the account cannot have a balance lower than zero. current balance: ${state.balance[Int]}, withdrawal amount: ${this.amount[Int]}")
}
/**
 * Event recording that a bank account was closed.
 *
 * @param revision  revision number, also passed to the `Event` base class
 * @param eventDate date of the event; defaults to the moment of construction
 */
case class BankAccountClosed(override val revision: Long, override val eventDate: Date = new Date()) extends Event[BankAccount](revision) {
  /** Copies the current state into a new revision, stores the close reason and sets the status to "CLOSED". */
  def applyTo(account: BankAccount): BankAccount = {
    val newAccount = BankAccount(revision, account.aggregationId)
    account.copyTo(newAccount)
    newAccount.closeReason = this.closeReason
    newAccount.status = "CLOSED"
    newAccount
  }

  /** Valid only when the account is not already in the "CLOSED" status. */
  override def canApply(state: BankAccount): Event.Valid = validation(!state.status.equals("CLOSED"), "this account is already closed")
}

The commands

/**
 * Command that produces a `BankAccountCreated` event from a request.
 *
 * @param executionContext execution context on which the event is built
 */
class CreateAccountCommand(implicit override val executionContext: ExecutionContext) extends Command[Request, BankAccount] {
  /** Builds the creation event at the next revision, carrying the owner from the request. */
  override def event: EventProduce = (request) => (state) => {
    Future {
      val event = BankAccountCreated(state.lastEventVersion + 1)
      event.owner = request.owner
      event
    }
  }

  /** Nothing extra to execute: completes immediately with unit. */
  override def execute: ExecutionProduce = (request) => (state) => Future.successful(())
}
/**
 * Command that produces a `DepositPerformed` event from a request.
 *
 * @param executionContext execution context on which the event is built
 */
class DepositCommand(implicit override val executionContext: ExecutionContext) extends Command[Request, BankAccount] {
  /** Builds the deposit event at the next revision, carrying the amount from the request. */
  override def event: EventProduce = (request) => (state) => {
    Future {
      val event = DepositPerformed(state.lastEventVersion + 1)
      event.amount = request.amount
      event
    }
  }

  /** Nothing extra to execute: completes immediately with unit. */
  override def execute: ExecutionProduce = (request) => (state) => Future.successful(())
}
/**
 * Command that produces an `OwnerChanged` event from a request.
 *
 * @param executionContext execution context on which the event is built
 */
class ChangeOwnerCommand(implicit override val executionContext: ExecutionContext) extends Command[Request, BankAccount] {
  /** Nothing extra to execute: completes immediately with unit. */
  override def execute: ExecutionProduce = (request) => (state) => Future.successful(())

  /** Builds the owner-change event at the next revision, carrying the new owner from the request. */
  override def event: EventProduce = (request) => (state) => Future {
    val event = OwnerChanged(state.lastEventVersion + 1)
    event.newOwner = request.newOwner
    event
  }
}
/**
 * Command that produces a `WithdrawalPerformed` event from a request.
 *
 * @param executionContext execution context on which the event is built
 */
class WithdrawalCommand(implicit override val executionContext: ExecutionContext) extends Command[Request, BankAccount] {
  /** Builds the withdrawal event at the next revision, carrying the amount from the request. */
  override def event: EventProduce = (request) => (state) => {
    Future {
      val event = WithdrawalPerformed(state.lastEventVersion + 1)
      event.amount = request.amount
      event
    }
  }

  /** Nothing extra to execute: completes immediately with unit. */
  override def execute: ExecutionProduce = (request) => (state) => Future.successful(())
}
/**
 * Command that produces a `BankAccountClosed` event from a request.
 *
 * @param executionContext execution context on which the event is built
 */
class CloseCommand(implicit override val executionContext: ExecutionContext) extends Command[Request, BankAccount] {
  /** Builds the close event at the next revision, carrying the close reason from the request. */
  override def event: EventProduce = (request) => (state) => {
    Future {
      val event = BankAccountClosed(state.lastEventVersion + 1)
      event.closeReason = request.reason
      event
    }
  }

  /** Nothing extra to execute: completes immediately with unit. */
  override def execute: ExecutionProduce = (request) => (state) => Future.successful(())
}

Commands as functions

/**
 * Exposes each bank-account command as an `ExecutionRequest` value, so it can be
 * called like a function: `deposit(request)(state)`.
 */
object BankAccountCommands {
  // Explicit type on the implicit (same as the previously inferred one) keeps implicit resolution predictable.
  implicit val ec: scala.concurrent.ExecutionContextExecutor =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

  val createAccount: ExecutionRequest[Request, BankAccount] = new CreateAccountCommand
  val withdrawal: ExecutionRequest[Request, BankAccount] = new WithdrawalCommand
  val deposit: ExecutionRequest[Request, BankAccount] = new DepositCommand
  val changeOwner: ExecutionRequest[Request, BankAccount] = new ChangeOwnerCommand
  val close: ExecutionRequest[Request, BankAccount] = new CloseCommand
}


// Feature spec that drives the bank-account commands and reads events back through `eventHandler`.
// NOTE(review): closing braces appear to be missing throughout this snippet in this copy — restore from the original sources.
class EventSourcingTest extends FeatureSpec with Matchers {
   feature("Creating an account") {
     // Deposits 20 and then withdraws 10 three times: the chain is expected to fail
     // with InvalidExecutionException before the account gets closed.
     scenario("withdrawing more money than the balance has should throw an exception") {
       implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3))
       // The aggregation id is created once, outside the loop, so all 500 iterations use the same id.
       val aggregationId = UUID.randomUUID().toString
       val result = List.range(0, 500).map { i =>
         val eventualBankAccount = for {
           account <- createAccount(Request("owner" -> "John Doe", "id" -> 123))(BankAccount(0, aggregationId))
           account <- deposit(Request("amount" -> 20))(account)
           account <- changeOwner(Request("newOwner" -> "Jane Doe"))(account)
           account <- withdrawal(Request("amount" -> 10))(account)
           account <- withdrawal(Request("amount" -> 10))(account)
           account <- withdrawal(Request("amount" -> 10))(account)
           account <- close(Request("reason" -> "Unavailable address"))(account)
         } yield account
         an[InvalidExecutionException] should be thrownBy Await.result(eventualBankAccount, 1000 millis)
         val eventualSeq = Await.result(eventHandler.get(aggregationId), 1 minute)
         // NOTE(review): the next line is garbled in this copy (the right-hand expression is truncated);
         // presumably it replays `eventualSeq` onto BankAccount(0, aggregationId) — TODO restore from the original sources.
         val state =, aggregationId))
         state.balance[Int] shouldBe 0
         state.status[String] shouldNot be("CLOSED")
     // Runs a full create/deposit/change-owner/withdraw/close chain 20 times, each with its own aggregation id,
     // and compares the resulting state with `result`.
     scenario("assert that replaying restores the actual state of the BankAccount object") {
       implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(20))
       val result = List.range(0, 20).map { i =>
         val aggregationId = UUID.randomUUID().toString
         val eventualBankAccount = for {
           account <- createAccount(Request("owner" -> "John Doe", "id" -> 123))(BankAccount(0, aggregationId))
           account <- deposit(Request("amount" -> 20))(account)
           account <- changeOwner(Request("newOwner" -> "Jane Doe"))(account)
           account <- withdrawal(Request("amount" -> 10))(account)
           account <- close(Request("reason" -> "Unavailable address"))(account)
         } yield account
         // NOTE(review): Future.andThen completes with the ORIGINAL future's result and discards the callback's
         // return value, so `result` carries the same BankAccount as `eventualBankAccount`, not the state produced
         // by `play`. The comparison below may therefore be trivially true — confirm whether flatMap was intended.
         val result: Future[BankAccount] = eventualBankAccount.andThen {
           case Success(state) => eventHandler.get(aggregationId).play(BankAccount(0, aggregationId))
         eventualBankAccount -> result
       result.foreach(f => {
         val (eventualActualState, eventualPlayedState) = f
         val playedState = Await.result(eventualPlayedState, 60 minutes)
         val actualState = Await.result(eventualActualState, 60 minutes)
         playedState shouldBe actualState