Permalink
Browse files

All invoice service methods implemented

  • Loading branch information...
krasserm committed Aug 10, 2012
1 parent 87a17bd commit c320b54fb878f95e7a0b4bcab2f251c339dfdd57
View
@@ -1,9 +1,7 @@
Info
----
This project re-implements [eventsourcing-example](https://github.com/krasserm/eventsourcing-example) based on the Eligosource [Eventsourced](https://github.com/eligosource/eventsourced) library. *This is currently work in progress. At the moment, only invoice creation and reading works. Stay tuned for more to come soon ...*
This project re-implements [eventsourcing-example](https://github.com/krasserm/eventsourcing-example) based on the Eligosource [Eventsourced](https://github.com/eligosource/eventsourced) library.
Run
---
@@ -51,6 +51,22 @@ sealed abstract class Invoice {
}
object Invoice {
val invalidVersionMessage = "invoice %s: expected version %s doesn't match current version %s"
def invalidVersion(invoiceId: String, expected: Long, current: Long) =
DomainError(invalidVersionMessage format (invoiceId, expected, current))
def requireVersion[T <: Invoice](invoice: T, expectedVersion: Option[Long]): DomainValidation[T] = {
val id = invoice.id
val version = invoice.version
expectedVersion match {
case Some(expected) if (version != expected) => invalidVersion(id, expected, version).fail
case Some(expected) if (version == expected) => invoice.success
case None => invoice.success
}
}
def create(id: String): DomainValidation[DraftInvoice] = DraftInvoice(id, version = 0L).success
}
@@ -155,4 +171,8 @@ case class InvoiceSent(invoiceId: String, invoice: Invoice, to: InvoiceAddress)
case class InvoicePaid(invoiceId: String)
// Commands
case class CreateInvoice(invoiceId: String)
case class CreateInvoice(invoiceId: String)
case class AddInvoiceItem(invoiceId: String, expectedVersion: Option[Long], invoiceItem: InvoiceItem)
case class SetInvoiceDiscount(invoiceId: String, expectedVersion: Option[Long], discount: BigDecimal)
case class SendInvoiceTo(invoiceId: String, expectedVersion: Option[Long], to: InvoiceAddress)
case class PayInvoice(invoiceId: String, expectedVersion: Option[Long], amount: BigDecimal)
@@ -29,34 +29,7 @@ import org.eligosource.eventsourced.example.domain._
import scalaz._
import Scalaz._
class InvoiceProcessor(invoicesRef: Ref[Map[String, Invoice]], outputChannels: Map[String, ActorRef]) extends Actor {
def receive = {
case msg: Message => msg.event match {
case CreateInvoice(id) => updateInvoicesAndReply(createInvoice(id), msg.sender)
}
}
def createInvoice(invoiceId: String): DomainValidation[DraftInvoice] = {
readInvoices.get(invoiceId) match {
case Some(invoice) => DomainError("invoice %s: already exists" format invoiceId).fail
case None => Invoice.create(invoiceId)
}
}
private def updateInvoicesAndReply(validation: DomainValidation[Invoice], sender: Option[ActorRef]) {
validation.foreach(updateInvoices)
sender.foreach(_ ! validation)
}
private def updateInvoices(invoice: Invoice) =
invoicesRef.single.transform(invoices => invoices + (invoice.id -> invoice))
private def readInvoices =
invoicesRef.single.get
}
class InvoiceService(invoicesRef: Ref[Map[String, Invoice]], invoiceComponent: Component) {
import InvoiceService._
//
// Consistent reads
@@ -80,22 +53,115 @@ class InvoiceService(invoicesRef: Ref[Map[String, Invoice]], invoiceComponent: C
invoiceComponent.inputProducer ? CreateInvoice(invoiceId) map(_.asInstanceOf[DomainValidation[DraftInvoice]])
def addInvoiceItem(invoiceId: String, expectedVersion: Option[Long], invoiceItem: InvoiceItem): Future[DomainValidation[DraftInvoice]] =
throw new UnsupportedOperationException("coming soon")
invoiceComponent.inputProducer ? AddInvoiceItem(invoiceId, expectedVersion, invoiceItem) map(_.asInstanceOf[DomainValidation[DraftInvoice]])
def setInvoiceDiscount(invoiceId: String, expectedVersion: Option[Long], discount: BigDecimal): Future[DomainValidation[DraftInvoice]] =
throw new UnsupportedOperationException("coming soon")
invoiceComponent.inputProducer ? SetInvoiceDiscount(invoiceId, expectedVersion, discount) map(_.asInstanceOf[DomainValidation[DraftInvoice]])
def sendInvoiceTo(invoiceId: String, expectedVersion: Option[Long], to: InvoiceAddress): Future[DomainValidation[SentInvoice]] =
throw new UnsupportedOperationException("coming soon")
invoiceComponent.inputProducer ? SendInvoiceTo(invoiceId, expectedVersion, to) map(_.asInstanceOf[DomainValidation[SentInvoice]])
def payInvoice(invoiceId: String, expectedVersion: Option[Long], amount: BigDecimal): Future[DomainValidation[PaidInvoice]] =
throw new UnsupportedOperationException("coming soon")
invoiceComponent.inputProducer ? PayInvoice(invoiceId, expectedVersion, amount) map(_.asInstanceOf[DomainValidation[PaidInvoice]])
}
object InvoiceService {
// -------------------------------------------------------------------------------------------------------------
// InvoiceProcessor is single writer to invoicesRef, so we can have reads and writes in separate transactions
// -------------------------------------------------------------------------------------------------------------
class InvoiceProcessor(invoicesRef: Ref[Map[String, Invoice]], outputChannels: Map[String, ActorRef]) extends Actor {
import InvoiceProcessor._
val listeners = outputChannels("listeners")
def receive = {
case msg: Message => msg.event match {
case CreateInvoice(invoiceId) =>
process(createInvoice(invoiceId), msg.sender) { invoice =>
listeners ! msg.copy(event = InvoiceCreated(invoiceId))
}
case AddInvoiceItem(invoiceId, expectedVersion, invoiceItem) =>
process(addInvoiceItem(invoiceId, expectedVersion, invoiceItem), msg.sender) { invoice =>
listeners ! msg.copy(event = InvoiceItemAdded(invoiceId, invoiceItem))
}
case SetInvoiceDiscount(invoiceId, expectedVersion, discount) =>
process(setInvoiceDiscount(invoiceId, expectedVersion, discount), msg.sender) { invoice =>
listeners ! msg.copy(event = InvoiceDiscountSet(invoiceId, discount))
}
case SendInvoiceTo(invoiceId, expectedVersion, to) =>
process(sendInvoiceTo(invoiceId, expectedVersion, to), msg.sender) { invoice =>
listeners ! msg.copy(event = InvoiceSent(invoiceId, invoice, to))
}
case PayInvoice(invoiceId, expectedVersion, amount) =>
process(payInvoice(invoiceId, expectedVersion, amount), msg.sender) { invoice =>
listeners ! msg.copy(event = InvoicePaid(invoiceId))
}
}
}
def process(validation: DomainValidation[Invoice], sender: Option[ActorRef])(onSuccess: Invoice => Unit) = {
validation.foreach { invoice =>
updateInvoices(invoice)
onSuccess(invoice)
}
sender.foreach { sender =>
sender ! validation
}
}
def createInvoice(invoiceId: String): DomainValidation[DraftInvoice] = {
readInvoices.get(invoiceId) match {
case Some(invoice) => DomainError("invoice %s: already exists" format invoiceId).fail
case None => Invoice.create(invoiceId)
}
}
def addInvoiceItem(invoiceId: String, expectedVersion: Option[Long], invoiceItem: InvoiceItem): DomainValidation[DraftInvoice] =
updateDraftInvoice(invoiceId, expectedVersion) { invoice => invoice.addItem(invoiceItem) }
def setInvoiceDiscount(invoiceId: String, expectedVersion: Option[Long], discount: BigDecimal): DomainValidation[DraftInvoice] =
updateDraftInvoice(invoiceId, expectedVersion) { invoice => invoice.setDiscount(discount) }
def sendInvoiceTo(invoiceId: String, expectedVersion: Option[Long], to: InvoiceAddress): DomainValidation[SentInvoice] =
updateDraftInvoice(invoiceId, expectedVersion) { invoice => invoice.sendTo(to) }
def payInvoice(invoiceId: String, expectedVersion: Option[Long], amount: BigDecimal): DomainValidation[PaidInvoice] =
updateInvoice(invoiceId, expectedVersion) { invoice =>
invoice match {
case invoice: SentInvoice => invoice.pay(amount)
case invoice: Invoice => notSentError(invoiceId).fail
}
}
def updateInvoice[B <: Invoice](invoiceId: String, expectedVersion: Option[Long])(f: Invoice => DomainValidation[B]): DomainValidation[B] =
readInvoices.get(invoiceId) match {
case None => DomainError("invoice %s: does not exist" format invoiceId).fail
case Some(invoice) => for {
current <- Invoice.requireVersion(invoice, expectedVersion)
updated <- f(invoice)
} yield updated
}
def updateDraftInvoice[B <: Invoice](invoiceId: String, expectedVersion: Option[Long])(f: DraftInvoice => DomainValidation[B]): DomainValidation[B] =
updateInvoice(invoiceId, expectedVersion) { invoice =>
invoice match {
case invoice: DraftInvoice => f(invoice)
case invoice: Invoice => notDraftError(invoiceId).fail
}
}
private def updateInvoices(invoice: Invoice) =
invoicesRef.single.transform(invoices => invoices + (invoice.id -> invoice))
private def readInvoices =
invoicesRef.single.get
}
object InvoiceProcessor {
private[service] def notDraftError(invoiceId: String) =
DomainError("invoice %s: not a draft invoice" format invoiceId)
private[service] def notSentError(invoiceId: String) =
DomainError("invoice %s: not a sent invoice" format invoiceId)
}
}
@@ -7,19 +7,20 @@ import akka.actor.Actor
import org.eligosource.eventsourced.core._
import org.eligosource.eventsourced.example.domain._
class StatisticsService(statisticsRef: Ref[Map[String, Int]]) {
def statistics = statisticsRef.single.get
}
class StatisticsProcessor(statisticsRef: Ref[Map[String, Int]]) extends Actor {
def receive = {
case msg: Message => msg.event match {
case msg: Message => msg.event match {
case InvoiceItemAdded(id, _) => statisticsRef.single.transform { statistics =>
statistics.get(id) match {
case Some(count) => statistics + (id -> (count + 1))
case None => statistics + (id -> 1)
}
}
case _ =>
}
}
}
class StatisticsService(statisticsRef: Ref[Map[String, Int]]) {
def statistics = statisticsRef.single.get
}

0 comments on commit c320b54

Please sign in to comment.