Skip to content

Commit

Permalink
updated per refactored functional peds.publish package
Browse files Browse the repository at this point in the history
  • Loading branch information
dmrolfs committed Oct 15, 2014
1 parent 14a1713 commit 572ceb5
Show file tree
Hide file tree
Showing 11 changed files with 74 additions and 56 deletions.
2 changes: 1 addition & 1 deletion examples/src/main/scala/blog/AuthorListingModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ object AuthorListingModule extends LazyLogging {

object AuthorListing {
import peds.akka.envelope.Envelope
import peds.akka.publish.ReliableMessage
import peds.akka.publish.ReliablePublisher.ReliableMessage

def props: Props = Props[AuthorListing]

Expand Down
13 changes: 11 additions & 2 deletions examples/src/main/scala/blog/post/PostModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import akka.contrib.pattern.ClusterSharding
import akka.persistence.AtLeastOnceDelivery
import akka.event.LoggingReceive
import demesne._
import peds.akka.publish.{EventPublisher, ReliablePublisher}
import peds.akka.envelope.Envelope
import peds.akka.publish.{Publisher, EventPublisher, ReliablePublisher}
import peds.commons.identifier._
import peds.commons.log.Trace
import sample.blog.author.AuthorListingModule
Expand Down Expand Up @@ -64,9 +65,17 @@ object PostModule extends AggregateRootModuleCompanion { module =>

object Post {
def props( meta: AggregateRootType, authorListing: ActorRef ): Props = {
import peds.akka.publish._

Props(
new Post( meta ) with ReliablePublisher with AtLeastOnceDelivery {
override def destination: ActorPath = authorListing.path
import peds.commons.util.Chain._
override def publish: Publisher = local +> filter +> reliablePublisher( authorListing.path )

val filter: Publisher = {
case e @ Envelope( _: PostPublished, _ ) => Left( e )
case _ => Right( () )
}
}
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ object ConferenceModule extends AggregateRootModuleCompanion { module =>

object Conference {
def props( meta: AggregateRootType, conferenceContext: ActorRef ): Props = {
Props( new Conference( meta, conferenceContext ) with LocalPublisher )
Props( new Conference( meta, conferenceContext ) with EventPublisher )
}

class ConferenceCreateException( cause: Throwable )
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
package contoso.conference.registration

import scala.concurrent.duration._
import scala.util.Random
import akka.actor._
import akka.contrib.pattern.ClusterSharding
import akka.event.LoggingReceive
import com.typesafe.config.ConfigFactory
import squants._
import com.github.nscala_time.time.{ Imports => joda }
import com.github.nscala_time.time.Imports._
import demesne._
import peds.akka.envelope._
import peds.akka.publish.{ EventPublisher, LocalPublisher }
import com.github.nscala_time.time.{Imports => joda}
import com.typesafe.config.ConfigFactory
import contoso.conference.ConferenceModule
import contoso.registration.{OrderLine, SeatQuantity}
import demesne._
import peds.akka.envelope._
import peds.akka.publish.EventPublisher
import squants._

import scala.util.Random


trait OrderModule extends AggregateRootModule {
import OrderModule.trace
import contoso.conference.registration.OrderModule.trace

abstract override def start( ctx: Map[Symbol, Any] ): Unit = trace.block( "start" ) {
super.start( ctx )
Expand All @@ -35,7 +35,7 @@ object OrderModule extends AggregateRootModuleCompanion { module =>
.getConfig( "contoso.conference.registration" )
.withFallback( ConfigFactory.parseString( fallback ) )

import java.util.concurrent.{ TimeUnit => TU }
import java.util.concurrent.{TimeUnit => TU}
val reservationAutoExpiration: joda.Period = joda.Period.millis(
config.getDuration( "reservation-auto-expiration", TU.MILLISECONDS ).toInt
)
Expand Down Expand Up @@ -211,7 +211,7 @@ object OrderModule extends AggregateRootModuleCompanion { module =>

object Order {
def props( meta: AggregateRootType, pricingRetriever: ActorRef ): Props = {
Props( new Order( meta, pricingRetriever ) with LocalPublisher )
Props( new Order( meta, pricingRetriever ) with EventPublisher )
}
}

Expand All @@ -221,8 +221,6 @@ object OrderModule extends AggregateRootModuleCompanion { module =>
override val trace = Trace( "Order", log )

override var state: OrderState = _

import context.dispatcher
var expirationMessager: Cancellable = _

override def transitionFor( state: OrderState ): Transition = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,17 @@
package contoso.conference.registration

import akka.actor.Actor.Receive
import akka.actor._
import akka.contrib.pattern.ClusterSharding
import com.github.nscala_time.time.{ Imports => joda }
import com.github.nscala_time.time.Imports._
import com.typesafe.config.ConfigFactory
import com.github.nscala_time.time.{Imports => joda}
import contoso.conference.ConferenceModule
import contoso.conference.payments.PaymentSourceModule
import contoso.conference.payments.PaymentSourceModule.PaymentCompleted
import contoso.conference.registration.OrderModule._
import contoso.conference.registration.SeatsAvailabilityModule.{CancelSeatReservation, CommitSeatReservation,
MakeSeatReservation, SeatsReserved}
import contoso.conference.registration.SeatsAvailabilityModule.{CancelSeatReservation, CommitSeatReservation, MakeSeatReservation, SeatsReserved}
import contoso.registration.SeatQuantity
import demesne._
import peds.akka.envelope.trace
import peds.akka.publish.{EventPublisher, LocalPublisher}
import peds.akka.envelope._
import peds.commons.identifier.ShortUUID
import peds.akka.publish.EventPublisher
import peds.commons.log.Trace

import scala.concurrent.duration._
Expand All @@ -30,7 +24,7 @@ import scala.concurrent.duration._
* Created by damonrolfs on 9/11/14.
*/
trait RegistrationSagaModule extends SagaModule {
import RegistrationSagaModule.trace
import contoso.conference.registration.RegistrationSagaModule.trace

abstract override def start( moduleContext: Map[Symbol, Any] ): Unit = trace.block( "start" ) {
super.start( moduleContext )
Expand Down Expand Up @@ -122,7 +116,7 @@ object RegistrationSagaModule extends SagaModuleCompanion { module =>

object RegistrationSaga {
def props( meta: AggregateRootType, orderType: AggregateRootType, availabilityType: AggregateRootType ): Props = {
Props( new RegistrationSaga( meta, orderType, availabilityType ) with LocalPublisher )
Props( new RegistrationSaga( meta, orderType, availabilityType ) with EventPublisher )
}

//DMR: det where to locate this b/h; e.g., pull-req into nscala-time, peds?
Expand All @@ -136,8 +130,6 @@ object RegistrationSagaModule extends SagaModuleCompanion { module =>
) extends Saga[RegistrationSagaState] {
outer: EventPublisher =>

import RegistrationSaga._

override val trace = Trace( "RegistrationSaga", log )

override var state: RegistrationSagaState = _
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package contoso.conference.registration

import scala.annotation.tailrec
import akka.actor.{ ActorSystem, Props }
import akka.actor.{ActorSystem, Props}
import akka.event.LoggingReceive
import peds.commons.log.Trace
import peds.akka.publish.{ EventPublisher, LocalPublisher }
import demesne._
import contoso.conference.SeatType
import contoso.registration.{SeatQuantity, PersonalInfo}
import contoso.registration.{PersonalInfo, SeatQuantity}
import demesne._
import peds.akka.publish.EventPublisher
import peds.commons.log.Trace

import scala.annotation.tailrec


trait SeatAssignmentsModule extends AggregateRootModule {
import SeatAssignmentsModule.trace
import contoso.conference.registration.SeatAssignmentsModule.trace

abstract override def start( ctx: Map[Symbol, Any] ): Unit = trace.block( "start" ) {
super.start( ctx )
Expand Down Expand Up @@ -142,7 +143,7 @@ object SeatAssignmentsModule extends AggregateRootModuleCompanion { module =>


object SeatAssignments {
def props( meta: AggregateRootType ): Props = Props( new SeatAssignments( meta ) with LocalPublisher )
def props( meta: AggregateRootType ): Props = Props( new SeatAssignments( meta ) with EventPublisher )
}

class SeatAssignments( override val meta: AggregateRootType ) extends AggregateRoot[SeatAssignmentsState] {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package contoso.conference.registration

import akka.actor.{Props, ActorSystem}
import akka.actor.{ActorSystem, Props}
import akka.event.LoggingReceive
import contoso.conference.{SeatType, ConferenceModule}
import contoso.conference.{ConferenceModule, SeatType}
import contoso.registration.SeatQuantity
import demesne._
import peds.akka.publish.{EventPublisher, LocalPublisher}
import peds.akka.publish.EventPublisher
import peds.commons.log.Trace
import squants.{Each, Dimensionless}
import squants.{Dimensionless, Each}


/**
Expand All @@ -17,7 +17,7 @@ import squants.{Each, Dimensionless}
* for the same conference at the same time.
*/
trait SeatsAvailabilityModule extends AggregateRootModule {
import SeatsAvailabilityModule.trace
import contoso.conference.registration.SeatsAvailabilityModule.trace

abstract override def start( ctx: Map[Symbol, Any] ): Unit = trace.block( "start" ) {
super.start( ctx )
Expand Down Expand Up @@ -172,14 +172,12 @@ object SeatsAvailabilityModule extends AggregateRootModuleCompanion { module =>


object SeatsAvailability {
def props( meta: AggregateRootType ): Props = Props( new SeatsAvailability( meta ) with LocalPublisher )
def props( meta: AggregateRootType ): Props = Props( new SeatsAvailability( meta ) with EventPublisher )
}

class SeatsAvailability( override val meta: AggregateRootType ) extends AggregateRoot[SeatsAvailabilityState] {
outer: EventPublisher =>

import SeatsAvailability._

override val trace = Trace( "SeatsAvailability", log )

override var state: SeatsAvailabilityState = _
Expand Down
2 changes: 1 addition & 1 deletion examples/src/test/scala/blog/AuthorListingModuleSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import demesne._
import demesne.testkit._
import org.scalatest.{Outcome, Tag}
import peds.akka.envelope._
import peds.akka.publish.ReliableMessage
import peds.akka.publish.ReliablePublisher.ReliableMessage
import peds.commons.identifier.{ShortUUID, TaggedID}
import peds.commons.log.Trace
import sample.blog.author.AuthorListingModule.{GetPosts, Posts}
Expand Down
35 changes: 27 additions & 8 deletions examples/src/test/scala/blog/post/PostModuleSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import akka.testkit.TestProbe
import demesne._
import demesne.testkit.AggregateRootSpec
import peds.akka.envelope.{Envelope, MessageNumber, WorkId}
import peds.akka.publish.ReliableMessage
import peds.akka.publish.ReliablePublisher.ReliableMessage
import peds.commons.log.Trace

import scala.concurrent.duration._
Expand Down Expand Up @@ -38,18 +38,24 @@ class PostModuleSpec extends AggregateRootSpec[PostModuleSpec] {
"add content" in { fixture: Fixture =>
import fixture._

system.eventStream.subscribe( probe.ref, classOf[ReliableMessage] )
system.eventStream.subscribe( probe.ref, classOf[Envelope] )

val id = PostModule.nextId
val content = PostContent( author = "Damon", title = "Add Content", body = "add body content" )
val post = PostModule aggregateOf id
post ! AddPost( id, content )
probe.expectMsgPF( max = 1.second, hint = "post added" ) { //DMR: Is this sensitive to total num of tests executed?
case ReliableMessage( _, Envelope( payload: PostAdded, _ ) ) => payload.content mustBe content
case Envelope( payload: PostAdded, _ ) => payload.content mustBe content
}
}

"not respond before added" in { fixture: Fixture =>
import fixture._

system.eventStream.subscribe( probe.ref, classOf[ReliableMessage] )
system.eventStream.subscribe( probe.ref, classOf[Envelope] )

val id = PostModule.nextId
val post = PostModule aggregateOf id
post ! ChangeBody( id, "dummy content" )
Expand All @@ -60,6 +66,9 @@ class PostModuleSpec extends AggregateRootSpec[PostModuleSpec] {
"not respond to incomplete content" in { fixture: Fixture =>
import fixture._

system.eventStream.subscribe( probe.ref, classOf[ReliableMessage] )
system.eventStream.subscribe( probe.ref, classOf[Envelope] )

val id = PostModule.nextId
val post = PostModule aggregateOf id
post ! AddPost( id, PostContent( author = "Damon", title = "", body = "no title" ) )
Expand Down Expand Up @@ -159,20 +168,30 @@ class PostModuleSpec extends AggregateRootSpec[PostModuleSpec] {
val id = PostModule.nextId
val content = PostContent( author = "Damon", title = "Test Add", body = "testing happy path" )

system.eventStream.subscribe( probe.ref, classOf[ReliableMessage] )
system.eventStream.subscribe( probe.ref, classOf[Envelope] )

PostModule.aggregateOf( id ) ! AddPost( id, content )
PostModule.aggregateOf( id ) ! ChangeBody( id, "new content" )
PostModule.aggregateOf( id ) ! Publish( id )

probe.expectMsgPF() {
case ReliableMessage( 1, Envelope( payload: PostAdded, _) ) => payload.content mustBe content
probe.expectMsgPF( hint = "post-added" ) {
case Envelope( payload: PostAdded, _ ) => payload.content mustBe content
}

probe.expectMsgPF() {
case ReliableMessage( 2, Envelope( payload: BodyChanged, _) ) => payload.body mustBe "new content"
probe.expectMsgPF( hint = "body-changed" ) {
case Envelope( payload: BodyChanged, _ ) => payload.body mustBe "new content"
}

probe.expectMsgPF( hint = "post-published local" ) {
case Envelope( PostPublished( pid, _, title ), _ ) => {
pid mustBe id
title mustBe "Test Add"
}
}

probe.expectMsgPF() {
case ReliableMessage( 3, Envelope( PostPublished( pid, _, title ), _) ) => {
probe.expectMsgPF( hint = "post-published reliable" ) {
case ReliableMessage( 1, Envelope( PostPublished( pid, _, title ), _) ) => {
pid mustBe id
title mustBe "Test Add"
}
Expand Down
1 change: 1 addition & 0 deletions src/main/scala/demesne/AggregateRoot.scala
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ with ActorLogging {
case ex => log info s"${self.path.name} will not transition state for ${ex.getClass.safeSimpleName}"
}


override def receiveRecover: Receive = {
case offer: SnapshotOffer => { state = acceptSnapshot( offer ) }
case event => { state = accept( event ) }
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/demesne/AggregateRootType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package demesne
import akka.actor.{ActorRef, Props}
import akka.contrib.pattern.ShardRegion
import peds.akka.envelope.Envelope
import peds.akka.publish.ReliableMessage
import peds.akka.publish.ReliablePublisher.ReliableMessage
import peds.commons.util._

import scala.concurrent.duration._
Expand Down

0 comments on commit 572ceb5

Please sign in to comment.