Skip to content

Commit

Permalink
Optional actor names for processors, channels and journals
Browse files Browse the repository at this point in the history
  • Loading branch information
krasserm committed Nov 12, 2012
1 parent 6d71397 commit 6e1ce20
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,15 @@ class EventsourcingExtension(system: ActorSystem) extends Extension {
*
* @param props processor configuration object.
* @return a processor ref.
* @throws InvalidActorNameException if `props.name` is defined and already
* in use in the underlying actor system.
*
* @see [[org.eligosource.eventsourced.core.ProcessorProps]]
*/
def processorOf(props: ProcessorProps): ActorRef = {
registerProcessor(props.id, props.processor)
props.processor
def processorOf(props: ProcessorProps)(implicit actorRefFactory: ActorRefFactory): ActorRef = {
val processor = props.createProcessor()
registerProcessor(props.id, processor)
processor
}

/**
Expand All @@ -146,7 +149,12 @@ class EventsourcingExtension(system: ActorSystem) extends Extension {
* blocking.
*
* @param props actor ref configuration object.
* @param name optional processor name.
* @param actorRefFactory [[org.eligosource.eventsourced.core.Eventsourced]]
* ref factory.
* @return a processor ref.
* @throws InvalidActorNameException if `name` is defined and already in use
* in the underlying actor system.
*/
def processorOf(props: Props, name: Option[String] = None)(implicit actorRefFactory: ActorRefFactory): ActorRef = {
implicit val duration = 5 seconds
Expand All @@ -169,6 +177,8 @@ class EventsourcingExtension(system: ActorSystem) extends Extension {
* @param props channel configuration object.
* @param actorRefFactory [[org.eligosource.eventsourced.core.Channel]] ref factory.
* @return a channel ref.
* @throws InvalidActorNameException if `name` is defined and already in use
* in the underlying actor system.
*
* @see [[org.eligosource.eventsourced.core.DefaultChannelProps]]
* [[org.eligosource.eventsourced.core.ReliableChannelProps]]
Expand Down Expand Up @@ -278,23 +288,28 @@ object EventsourcingExtension extends ExtensionId[EventsourcingExtension] with E

/**
* [[org.eligosource.eventsourced.core.Eventsourced]] processor configuration object.
*
* @param id processor id.
* @param processor processor ref.
*/
case class ProcessorProps(id: Int, processor: ActorRef)
case class ProcessorProps(
/** Processor id. */
id: Int,
/** Processor factory. */
processorFactory: Int => Actor with Eventsourced,
/** Optional processor name. */
name: Option[String] = None) {

object ProcessorProps {
/**
* Creates a processor configuration object from a processor id and processor factory.
* Creates a processor with the settings defined by this configuration object.
*
* @param id processor id.
* @param processorFactory [[org.eligosource.eventsourced.core.Eventsourced]] processor factory.
* @param actorRefFactory [[org.eligosource.eventsourced.core.Eventsourced]] processor ref factory.
* @return processor configuration object containing the created processor ref.
* @param actorRefFactory [[org.eligosource.eventsourced.core.Eventsourced]]
* ref factory.
* @return a processor ref.
* @throws InvalidActorNameException if `name` is defined and already in use
* in the underlying actor system.
*/
def apply(id: Int, processorFactory: Int => Actor with Eventsourced)(implicit actorRefFactory: ActorRefFactory): ProcessorProps = {
new ProcessorProps(id, actorRefFactory.actorOf(Props(processorFactory(id))))
def createProcessor()(implicit actorRefFactory: ActorRefFactory): ActorRef = {
if (name.isDefined)
actorRefFactory.actorOf(Props(processorFactory(id)), name.get) else
actorRefFactory.actorOf(Props(processorFactory(id)))
}
}

Expand All @@ -315,6 +330,8 @@ trait ChannelProps {
* @param journal journal that is used by the channel.
* @param actorRefFactory [[org.eligosource.eventsourced.core.Channel]] ref factory.
* @return a channel ref.
* @throws InvalidActorNameException if `name` is defined and already in use
* in the underlying actor system.
*/
def createChannel(journal: ActorRef)(implicit actorRefFactory: ActorRefFactory): ActorRef
}
Expand All @@ -338,7 +355,9 @@ case class DefaultChannelProps(
* settings defined by this configuration object.
*/
def createChannel(journal: ActorRef)(implicit actorRefFactory: ActorRefFactory) = {
actorRefFactory.actorOf(Props(new DefaultChannel(id, journal, destination)))
if (name.isDefined)
actorRefFactory.actorOf(Props(new DefaultChannel(id, journal, destination)), name.get) else
actorRefFactory.actorOf(Props(new DefaultChannel(id, journal, destination)))
}
}

Expand Down Expand Up @@ -380,6 +399,8 @@ case class ReliableChannelProps(
* settings defined by this configuration object.
*/
def createChannel(journal: ActorRef)(implicit actorRefFactory: ActorRefFactory) = {
actorRefFactory.actorOf(Props(new ReliableChannel(id, journal, destination, policy)))
if (name.isDefined)
actorRefFactory.actorOf(Props(new ReliableChannel(id, journal, destination, policy)), name.get) else
actorRefFactory.actorOf(Props(new ReliableChannel(id, journal, destination, policy)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ private [eventsourced] class InmemJournal extends Actor {
* Creates an in-memory journal for testing purposes.
*/
object InmemJournal {
def apply()(implicit system: ActorSystem): ActorRef =
system.actorOf(Props(new InmemJournal))
def apply(name: Option[String] = None)(implicit system: ActorSystem): ActorRef =
if (name.isDefined)
system.actorOf(Props(new InmemJournal), name.get) else
system.actorOf(Props(new InmemJournal))
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,12 @@ object JournalioJournal {
* (with optional lower bound)
*
* @param dir journal directory
* @param name optional name of the journal actor in the underlying actor system.
* @throws InvalidActorNameException if `name` is defined and already in use
* in the underlying actor system.
*/
def apply(dir: File)(implicit system: ActorSystem): ActorRef =
system.actorOf(Props(new JournalioJournal(dir)))
def apply(dir: File, name: Option[String] = None)(implicit system: ActorSystem): ActorRef =
if (name.isDefined)
system.actorOf(Props(new JournalioJournal(dir)), name.get) else
system.actorOf(Props(new JournalioJournal(dir)))
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,15 @@ object LeveldbJournal {
*
* - deletion of old entries requires full scan
*
* @param dir journal directory
* @param dir journal directory.
* @param name optional name of the journal actor in the underlying actor system.
* @throws InvalidActorNameException if `name` is defined and already in use
* in the underlying actor system.
*/
def processorStructured(dir: File)(implicit system: ActorSystem): ActorRef =
system.actorOf(Props(new LeveldbJournalPS(dir)))
def processorStructured(dir: File, name: Option[String] = None)(implicit system: ActorSystem): ActorRef =
if (name.isDefined)
system.actorOf(Props(new LeveldbJournalPS(dir)), name.get) else
system.actorOf(Props(new LeveldbJournalPS(dir)))

/**
* Creates a [[http://code.google.com/p/leveldb/ LevelDB]] based journal that
Expand All @@ -57,15 +62,24 @@ object LeveldbJournal {
* (with optional lower bound)
*
* @param dir journal directory
* @param name optional name of the journal actor in the underlying actor system.
* @throws InvalidActorNameException if `name` is defined and already in use
* in the underlying actor system.
*/
def sequenceStructured(dir: File)(implicit system: ActorSystem): ActorRef =
system.actorOf(Props(new LeveldbJournalSS(dir)))
def sequenceStructured(dir: File, name: Option[String] = None)(implicit system: ActorSystem): ActorRef =
if (name.isDefined)
system.actorOf(Props(new LeveldbJournalSS(dir)), name.get) else
system.actorOf(Props(new LeveldbJournalSS(dir)))

/**
* Creates a LevelDB based journal that organizes entries primarily based on processor id.
*
* @param dir journal directory
* @param name optional name of the journal actor in the underlying actor system.
* @throws InvalidActorNameException if `name` is defined and already in use
* in the underlying actor system.
* @see `LeveldbJournal.processorStructured`
*/
def apply(dir: File)(implicit system: ActorSystem) =
processorStructured(dir)
def apply(dir: File, name: Option[String] = None)(implicit system: ActorSystem) =
processorStructured(dir, name)
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ object AggregatorExample {

def configure(): ActorRef = {
val processor = extension.processorOf(Props(new Aggregator with Emitter with Confirm with Eventsourced { val id = 1 } ))
extension.channelOf(DefaultChannelProps(1, processor).withName("self"))
extension.channelOf(ReliableChannelProps(2, destination).withName("dest"))
extension.channelOf(DefaultChannelProps(1, processor))
extension.channelOf(ReliableChannelProps(2, destination))
processor
}
}
Expand All @@ -98,14 +98,14 @@ object AggregatorExample {
// count number of InputAggregated receivced
inputAggregatedCounter = inputAggregatedCounter + 1
// emit InputAggregated event to destination with sender message id containing the counted aggregations
emitter("dest") send { msg => msg.copy(senderMessageId = Some("aggregated-%d" format inputAggregatedCounter)) }
emitter(2) send { msg => msg.copy(senderMessageId = Some("aggregated-%d" format inputAggregatedCounter)) }
// reply to initial sender that message has been aggregated
sender ! "aggregated %d messages of %s".format(inputs.size, category)
}
case InputAvailable(category, input) => inputs = inputs.get(category) match {
case Some(List(i2, i1)) => {
// emit InputAggregated event to self when 3 events of same category exist
emitter("self") forwardEvent InputAggregated(category, List(i1, i2, input))
emitter(1) forwardEvent InputAggregated(category, List(i1, i2, input))
inputs - category
}
case Some(is) => inputs + (category -> (input :: is))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,12 @@ class NodeActor(selfAddress: Address) extends Actor {
var doorCreatedByMe = false

def receive = {
case state: CurrentClusterState => {
case state: CurrentClusterState =>
state.leader.foreach(setupDoor)
}
case LeaderChanged(Some(leaderAddress)) => {
case LeaderChanged(Some(leaderAddress)) =>
setupDoor(leaderAddress)
}
case cmd: String => {
case cmd: String =>
door foreach { _ forward Message(cmd) }
}
}

def setupDoor(leaderAddress: Address) {
Expand Down Expand Up @@ -102,7 +99,7 @@ object Node {

object Journal extends App {
implicit val system = ActorSystem("journal", ConfigFactory.load("journal"))
val journal = system.actorOf(Props(new JournalioJournal(new File("target/cluster"))), "journal")
val journal = JournalioJournal(new File("target/cluster"), Some("journal"))
val destination = system.actorOf(Props(new Destination with Receiver with Confirm), "destination")

class Destination extends Actor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ object FsmExample {
val destination = system.actorOf(Props(new Destination(queue) with Receiver with Confirm))

def configure(): ActorRef = {
extension.channelOf(DefaultChannelProps(1, destination).withName("destination"))
extension.channelOf(DefaultChannelProps(1, destination))
extension.processorOf(Props(new Door with Emitter with Eventsourced { val id = 1 } ))
}
}
Expand Down Expand Up @@ -98,7 +98,7 @@ object FsmExample {
}
}

def emit(event: Any) = emitter("destination") forwardEvent event
def emit(event: Any) = emitter(1) forwardEvent event
}

class Destination(queue: java.util.Queue[Any]) extends Actor {
Expand Down

0 comments on commit 6e1ce20

Please sign in to comment.