Skip to content

Commit

Permalink
fixed concurrency issues uncovered in tests. Core issue was reliance …
Browse files Browse the repository at this point in the history
…on vars in aggregate root module companion.
  • Loading branch information
dmrolfs committed Oct 20, 2014
1 parent 2aebaf5 commit 439fc1d
Show file tree
Hide file tree
Showing 23 changed files with 208 additions and 151 deletions.
18 changes: 9 additions & 9 deletions examples/src/main/scala/blog/BlogApp.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package sample.blog

import akka.contrib.pattern.ClusterSharding

import scala.concurrent.duration._
import akka.actor._
import akka.contrib.pattern.ClusterSharding
import akka.pattern.ask
import akka.persistence.journal.leveldb.{ SharedLeveldbJournal, SharedLeveldbStore }
import akka.persistence.journal.leveldb.{SharedLeveldbJournal, SharedLeveldbStore}
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import demesne.DomainModel
import author.AuthorListingModule
import post.PostModule
import sample.blog.author.AuthorListingModule
import sample.blog.post.PostModule

import scala.concurrent.duration._


object BlogApp {
Expand All @@ -33,18 +33,18 @@ object BlogApp {
)

val makeAuthorListing: () => ActorRef = () => { ClusterSharding(clusterSystem).shardRegion(AuthorListingModule.shardName) }

val model = DomainModel.register( "blog" )( clusterSystem )
val context: Map[Symbol, Any] = Map(
demesne.SystemKey -> clusterSystem,
demesne.ModelKey -> DomainModel()( clusterSystem ),
demesne.ModelKey -> model,
demesne.FactoryKey -> demesne.factory.clusteredFactory,
'authorListing -> makeAuthorListing
)

registry.start( context )

// if ( port != 2551 && port != 2552 ) clusterSystem.actorOf( Bot.props( model ), "bot" )
if ( port != 2551 && port != 2552 ) clusterSystem.actorOf( Props[Bot], "bot" )
if ( port != 2551 && port != 2552 ) clusterSystem.actorOf( Bot.props( model ), "bot" )
}

def startSharedJournal( system: ActorSystem, startStore: Boolean, path: ActorPath ): Unit = {
Expand Down
8 changes: 4 additions & 4 deletions examples/src/main/scala/blog/Bot.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sample.blog

import akka.actor.{Actor, ActorLogging}
import akka.actor.{Actor, ActorLogging, Props}
import akka.cluster.Cluster
import akka.contrib.pattern.ClusterSharding
import akka.event.LoggingReceive
Expand All @@ -14,22 +14,22 @@ import scala.concurrent.duration._


object Bot {
// def props( model: DomainModel ): Props = Props( new Bot( model ) )
def props( model: DomainModel ): Props = Props( new Bot( model ) )

private case object Tick
}

class Bot extends Actor with ActorLogging {
class Bot( model: DomainModel ) extends Actor with ActorLogging {
val trace = Trace[Bot]

import context.dispatcher
import sample.blog.Bot._
val tickTask = context.system.scheduler.schedule( 3.seconds, 2.seconds, self, Tick )

val model = DomainModel()( context.system )
// val model =
// val postRegion = ClusterSharding( context.system ).shardRegion( PostModule.shardName )
def postRegion( id: ShortUUID ): AggregateRootRef = trace.block( s"postRegion( $id) " ) {
implicit val system = context.system
val result = model.aggregateOf( PostModule.aggregateRootType, id )
// log warning s"post AR = ${result}"
result
Expand Down
24 changes: 12 additions & 12 deletions examples/src/main/scala/blog/post/PostModule.scala
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package sample.blog.post

import akka.actor.{ActorPath, ActorRef, ActorSystem, Props}
import akka.contrib.pattern.ClusterSharding
import akka.persistence.AtLeastOnceDelivery
import akka.actor.{ActorRef, Props}
import akka.event.LoggingReceive
import akka.persistence.AtLeastOnceDelivery
import demesne._
import peds.akka.envelope.Envelope
import peds.akka.publish.{Publisher, EventPublisher, ReliablePublisher}
import peds.akka.publish.EventPublisher
import peds.commons.identifier._
import peds.commons.log.Trace
import sample.blog.author.AuthorListingModule
import shapeless._


Expand All @@ -25,18 +23,20 @@ trait PostModule extends AggregateRootModule {
object PostModule extends AggregateRootModuleCompanion { module =>
override val trace = Trace[PostModule.type]

var makeAuthorListing: () => ActorRef = _
override def initialize( context: Map[Symbol, Any] ): Unit = trace.block( "initialize" ) {
super.initialize( context )
require( context.contains( 'authorListing ), "must start PostModule with author listing factory" )
makeAuthorListing = context( 'authorListing ).asInstanceOf[() => ActorRef]
}

override val aggregateIdTag: Symbol = 'post


override def aggregateRootType( implicit system: ActorSystem = this.system ): AggregateRootType = {
override val aggregateRootType: AggregateRootType = {
new AggregateRootType {
override val name: String = module.shardName

override def aggregateRootProps: Props = trace.block( "aggregateRootProps" ) {
val authorListing = context( 'authorListing ).asInstanceOf[() => ActorRef]
Post.props( this, authorListing() )
}

override def aggregateRootProps( implicit model: DomainModel ): Props = Post.props( this, makeAuthorListing() )
override val toString: String = shardName + "AggregateRootType"
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/src/main/scala/blog/post/events.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import demesne.EventLike

sealed trait Event extends EventLike {
override type ID = PostModule.ID
override val sourceTypeName: Option[String] = Option( PostModule.aggregateRootType.name )
// override val sourceTypeName: Option[String] = Option( PostModule.aggregateRootType.name )
}

case class PostAdded( override val sourceId: PostAdded#TID, content: PostContent ) extends Event
Expand Down
34 changes: 20 additions & 14 deletions examples/src/main/scala/contoso/conference/ConferenceModule.scala
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
package contoso.conference

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{ Failure, Success }
import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.actor.{ActorRef, Props}
import akka.event.LoggingReceive
import shapeless._
import com.github.nscala_time.time.{ Imports => joda }
import com.github.nscala_time.time.{Imports => joda}
import demesne._
import peds.commons.log.Trace
import peds.commons.identifier._
import peds.akka.AskRetry._
import peds.akka.publish._
import contoso._
import peds.commons.log.Trace
import shapeless._

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{Failure, Success}


trait ConferenceModule extends AggregateRootModule {
import ConferenceModule.trace
import contoso.conference.ConferenceModule.trace

abstract override def start( ctx: Map[Symbol, Any] ): Unit = trace.block( "start" ) {
super.start( ctx )
Expand All @@ -28,12 +27,19 @@ object ConferenceModule extends AggregateRootModuleCompanion { module =>
//DMR move these into common AggregateModuleCompanion trait
val trace = Trace[ConferenceModule.type]

var conferenceContext: ActorRef = _
override def initialize( context: Map[Symbol, Any] ): Unit = trace.block( "initialize" ) {
super.initialize( context )
require( context.contains( 'ConferenceContext ), "must start ConferenceModule with ConferenceContext" )
conferenceContext = context( 'ConferenceContext ).asInstanceOf[ActorRef]
}

override val aggregateIdTag: Symbol = 'conference

override def aggregateRootType( implicit system: ActorSystem = this.system ): AggregateRootType = {
override val aggregateRootType: AggregateRootType = {
new AggregateRootType {
override val name: String = module.shardName
override val aggregateRootProps: Props = Conference.props( this, context( 'ConferenceContext ).asInstanceOf[ActorRef] )
override def aggregateRootProps( implicit model: DomainModel ): Props = Conference.props( this, conferenceContext )
override val toString: String = shardName + "AggregateRootType"
}
}
Expand Down Expand Up @@ -66,7 +72,7 @@ object ConferenceModule extends AggregateRootModuleCompanion { module =>

sealed trait Event extends ConferenceProtocol with EventLike {
override type ID = module.ID
override val sourceTypeName: Option[String] = Option( module.aggregateRootType.name )
// override val sourceTypeName: Option[String] = Option( module.aggregateRootType.name )
}

//Conference/Conference.Contracts/ConferenceCreated.cs
Expand Down Expand Up @@ -157,7 +163,7 @@ object ConferenceModule extends AggregateRootModuleCompanion { module =>

class Conference( override val meta: AggregateRootType, conferenceContext: ActorRef ) extends AggregateRoot[ConferenceState] {
outer: EventPublisher =>
import Conference._
import contoso.conference.ConferenceModule.Conference._

override val trace = Trace( "Conference", log )

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ object PaymentSourceModule { module =>

sealed trait Event extends EventLike {
override type ID = module.ID
override val sourceTypeName: Option[String] = Some( "PaymentSource" )
// override val sourceTypeName: Option[String] = Some( "PaymentSource" )
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ object OrderModule extends AggregateRootModuleCompanion { module =>

override val aggregateIdTag: Symbol = 'order

override def aggregateRootType( implicit system: ActorSystem = this.system ): AggregateRootType = {
override val aggregateRootType: AggregateRootType = {
new AggregateRootType {
override val name: String = module.shardName
override def aggregateRootProps: Props = {
override def aggregateRootProps( implicit model: DomainModel ): Props = {
Order.props(
this,
ClusterSharding( system ).shardRegion( PricingRetriever.shardName )
ClusterSharding( model.system ).shardRegion( PricingRetriever.shardName )
)
}
override val toString: String = shardName + "AggregateRootType"
Expand Down Expand Up @@ -119,7 +119,7 @@ object OrderModule extends AggregateRootModuleCompanion { module =>

sealed trait Event extends EventLike {
override type ID = module.ID
override val sourceTypeName: Option[String] = Option( module.aggregateRootType.name )
// override val sourceTypeName: Option[String] = Option( module.aggregateRootType.name )
}

// Registration.Contracts/Events/OrderPlaced.cs
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package contoso.conference.registration

import scala.concurrent.duration._
import akka.actor.{ Actor, ActorLogging, ActorRef, Props }
import akka.actor.{Actor, ActorLogging, ActorRef, Props}
import akka.event.LoggingReceive
import com.typesafe.config.ConfigFactory
import squants.market._
import demesne.{ AggregateRootType, DomainModel }
import contoso.conference.ConferenceModule
import contoso.registration.{SeatOrderLine, OrderLine, SeatQuantity}
import contoso.registration.{OrderLine, SeatOrderLine, SeatQuantity}
import demesne.DomainModel
import squants.market._

import scala.concurrent.duration._


object PricingRetriever {
Expand All @@ -24,7 +25,7 @@ object PricingRetriever {

case object ConferencePublishedSeatTypesTimeout extends PricingMessage

lazy val conferenceRootType: AggregateRootType = ConferenceModule.aggregateRootType
// lazy val conferenceRootType: AggregateRootType = ConferenceModule.aggregateRootType

val fallback = "conference-timeout = 250ms"
val config = ConfigFactory.load
Expand Down Expand Up @@ -73,13 +74,13 @@ object PricingRetriever {


class PricingRetriever( model: DomainModel ) extends Actor with ActorLogging {
import PricingRetriever._
import contoso.conference.registration.PricingRetriever._

override def receive: Receive = LoggingReceive {
case CalculateTotal( conferenceId, seatItems ) => {
val originalSender = sender
val handler = context.actorOf( CalculationHandler.props( seatItems, originalSender ) )
val conference = model.aggregateOf( conferenceRootType, conferenceId )
val conference = model.aggregateOf( ConferenceModule.aggregateRootType, conferenceId )
conference.tell( ConferenceModule.GetPublishedSeatTypes, handler )
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,19 @@ object RegistrationSagaModule extends SagaModuleCompanion { module =>
val trace = Trace[RegistrationSagaModule.type]
override val aggregateIdTag: Symbol = 'registration

override def aggregateRootType( implicit system: ActorSystem = this.system ): AggregateRootType = {
override val aggregateRootType: AggregateRootType = {
new AggregateRootType {
override def name: String = module.shardName
override def aggregateRootProps: Props = {

override def aggregateRootProps( implicit model: DomainModel ): Props = {
RegistrationSaga.props(
meta = this,
model = model,
orderType = OrderModule.aggregateRootType,
availabilityType = SeatsAvailabilityModule.aggregateRootType
)
}

override val toString: String = shardName + "AggregateRootType"
}
}
Expand All @@ -60,7 +63,7 @@ object RegistrationSagaModule extends SagaModuleCompanion { module =>

sealed trait Event extends EventLike {
override type ID = module.ID
override val sourceTypeName: Option[String] = Option( module.aggregateRootType.name )
// override val sourceTypeName: Option[String] = Option( module.aggregateRootType.name )
}

case class RegistrationProcessExpired( override val sourceId: RegistrationProcessExpired#TID ) extends Event
Expand Down Expand Up @@ -115,8 +118,13 @@ object RegistrationSagaModule extends SagaModuleCompanion { module =>
}

object RegistrationSaga {
def props( meta: AggregateRootType, orderType: AggregateRootType, availabilityType: AggregateRootType ): Props = {
Props( new RegistrationSaga( meta, orderType, availabilityType ) with EventPublisher )
def props(
meta: AggregateRootType,
model: DomainModel,
orderType: AggregateRootType,
availabilityType: AggregateRootType
): Props = {
Props( new RegistrationSaga( meta, model, orderType, availabilityType ) with EventPublisher )
}

//DMR: det where to locate this b/h; e.g., pull-req into nscala-time, peds?
Expand All @@ -125,6 +133,7 @@ object RegistrationSagaModule extends SagaModuleCompanion { module =>

class RegistrationSaga(
override val meta: AggregateRootType,
model: DomainModel,
orderType: AggregateRootType,
seatsAvailabilityType: AggregateRootType
) extends Saga[RegistrationSagaState] {
Expand All @@ -151,10 +160,10 @@ object RegistrationSagaModule extends SagaModuleCompanion { module =>
}


def order( id: Option[OrderModule.TID] ): AggregateRootRef = OrderModule aggregateOf id
def order( id: Option[OrderModule.TID] ): AggregateRootRef = OrderModule.aggregateOf( id )( model )

def seatsAvailability( id: Option[SeatsAvailabilityModule.TID] ): AggregateRootRef = {
SeatsAvailabilityModule aggregateOf id
SeatsAvailabilityModule.aggregateOf( id )( model )
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package contoso.conference.registration

import akka.actor.{ActorSystem, Props}
import akka.actor.Props
import akka.event.LoggingReceive
import contoso.conference.SeatType
import contoso.registration.{PersonalInfo, SeatQuantity}
Expand All @@ -27,10 +27,10 @@ object SeatAssignmentsModule extends AggregateRootModuleCompanion { module =>

override val aggregateIdTag: Symbol = 'seatsAssignment

override def aggregateRootType( implicit system: ActorSystem = this.system ): AggregateRootType = {
override val aggregateRootType: AggregateRootType = {
new AggregateRootType {
override val name: String = module.shardName
override def aggregateRootProps: Props = SeatAssignments.props( this )
override def aggregateRootProps( implicit model: DomainModel ): Props = SeatAssignments.props( this )
override val toString: String = shardName + "AggregateRootType"
}
}
Expand Down Expand Up @@ -72,7 +72,7 @@ object SeatAssignmentsModule extends AggregateRootModuleCompanion { module =>

sealed trait Event extends EventLike {
override type ID = module.ID
override val sourceTypeName: Option[String] = Option( module.aggregateRootType.name )
// override val sourceTypeName: Option[String] = Option( module.aggregateRootType.name )
}

// Registration.Contracts/Events/SeatAssigned.cs
Expand Down

0 comments on commit 439fc1d

Please sign in to comment.