Permalink
Browse files

Payment process added

  • Loading branch information...
krasserm committed Aug 11, 2012
1 parent 3765839 commit c35d91e1bfeb395272099906fffe2c1c6d8b337e
@@ -170,9 +170,13 @@ case class InvoiceDiscountSet(invoiceId: String, discount: BigDecimal)
case class InvoiceSent(invoiceId: String, invoice: Invoice, to: InvoiceAddress)
case class InvoicePaid(invoiceId: String)
case class InvoicePaymentRequested(invoiceId: String, amount: BigDecimal, to: InvoiceAddress)
case class InvoicePaymentReceived(invoiceId: String, amount: BigDecimal)
// Commands
case class CreateInvoice(invoiceId: String)
case class AddInvoiceItem(invoiceId: String, expectedVersion: Option[Long], invoiceItem: InvoiceItem)
case class SetInvoiceDiscount(invoiceId: String, expectedVersion: Option[Long], discount: BigDecimal)
case class SendInvoiceTo(invoiceId: String, expectedVersion: Option[Long], to: InvoiceAddress)
case class PayInvoice(invoiceId: String, expectedVersion: Option[Long], amount: BigDecimal)
@@ -23,16 +23,24 @@ object Appserver {
val invoicesRef = Ref(Map.empty[String, Invoice])
val statisticsRef = Ref(Map.empty[String, Int])
val invoiceComponent = Component(1, journal)
val listenersComponent = Component(2, journal)
.setProcessor(outputChannels => system.actorOf(Props(new StatisticsProcessor(statisticsRef))))
val invoiceComponent = Component(1, journal)
val invoiceService = new InvoiceService(invoicesRef, invoiceComponent)
val statisticsService = new StatisticsService(statisticsRef)
val paymentService = system.actorOf(Props(new PaymentService(invoiceComponent)))
listenersComponent
.addDefaultOutputChannelToActor("payment", paymentService)
.setProcessors(outputChannels => List(
system.actorOf(Props(new StatisticsProcessor(statisticsRef))),
system.actorOf(Props(new PaymentProcess(outputChannels)))))
invoiceComponent
.addDefaultOutputChannelToComponent("listeners", listenersComponent)
.setProcessor(outputChannels => system.actorOf(Props(new InvoiceProcessor(invoicesRef, outputChannels))))
Composite.init(invoiceComponent)
val invoiceService = new InvoiceService(invoicesRef, invoiceComponent)
val statisticsService = new StatisticsService(statisticsRef)
}
}
@@ -60,9 +60,6 @@ class InvoiceService(invoicesRef: Ref[Map[String, Invoice]], invoiceComponent: C
def sendInvoiceTo(invoiceId: String, expectedVersion: Option[Long], to: InvoiceAddress): Future[DomainValidation[SentInvoice]] =
invoiceComponent.inputProducer ? SendInvoiceTo(invoiceId, expectedVersion, to) map(_.asInstanceOf[DomainValidation[SentInvoice]])
def payInvoice(invoiceId: String, expectedVersion: Option[Long], amount: BigDecimal): Future[DomainValidation[PaidInvoice]] =
invoiceComponent.inputProducer ? PayInvoice(invoiceId, expectedVersion, amount) map(_.asInstanceOf[DomainValidation[PaidInvoice]])
}
// -------------------------------------------------------------------------------------------------------------
@@ -92,8 +89,8 @@ class InvoiceProcessor(invoicesRef: Ref[Map[String, Invoice]], outputChannels: M
process(sendInvoiceTo(invoiceId, expectedVersion, to), msg.sender) { invoice =>
listeners ! msg.copy(event = InvoiceSent(invoiceId, invoice, to))
}
case PayInvoice(invoiceId, expectedVersion, amount) =>
process(payInvoice(invoiceId, expectedVersion, amount), msg.sender) { invoice =>
case InvoicePaymentReceived(invoiceId, amount) =>
process(payInvoice(invoiceId, None, amount), None) { invoice =>
listeners ! msg.copy(event = InvoicePaid(invoiceId))
}
@@ -0,0 +1,47 @@
package org.eligosource.eventsourced.example.service
import akka.actor._
import akka.dispatch.Future
import org.eligosource.eventsourced.core._
import org.eligosource.eventsourced.example.domain._
/**
* Payment service that interacts with customer to request payment (mocked).
*/
class PaymentService(invoiceComponent: Component) extends Actor {
implicit val executor = context.system.dispatcher
def receive = {
case msg: Message => msg.event match {
case InvoicePaymentRequested(invoiceId, amount, to) => {
// don't use a replay channel but acknowledge immediately
sender ! Ack
// because payments may take several days to arrive ...
Future { invoiceComponent.inputChannel ! Message(InvoicePaymentReceived(invoiceId, amount)) }
}
}
}
}
/**
* (Long-running) payment process.
*/
class PaymentProcess(outputChannels: Map[String, ActorRef]) extends Actor {
var pendingPayments = Map.empty[String, InvoicePaymentRequested]
def receive = {
case msg: Message => msg.event match {
case InvoiceSent(invoiceId, invoice, to) => {
val outEvent = InvoicePaymentRequested(invoiceId, invoice.getTotal, to)
outputChannels("payment") ! msg.copy(event = outEvent)
pendingPayments = pendingPayments + (invoiceId -> outEvent)
}
case InvoicePaid(invoiceId) => {
pendingPayments = pendingPayments - invoiceId
}
case _ =>
}
}
}

0 comments on commit c35d91e

Please sign in to comment.