Skip to content

Commit

Permalink
refactored publish package to be functional. added Chain of Responsib…
Browse files Browse the repository at this point in the history
…ility pattern implementation in commons.util.Chain
  • Loading branch information
dmrolfs committed Oct 15, 2014
1 parent 7d8de68 commit 9afdbb3
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 76 deletions.
16 changes: 0 additions & 16 deletions akka/src/main/scala/peds/akka/publish/EventPublisher.scala

This file was deleted.

23 changes: 0 additions & 23 deletions akka/src/main/scala/peds/akka/publish/LocalPublisher.scala

This file was deleted.

40 changes: 24 additions & 16 deletions akka/src/main/scala/peds/akka/publish/ReliablePublisher.scala
Original file line number Diff line number Diff line change
@@ -1,13 +1,35 @@
package peds.akka.publish

import akka.actor.{ ActorLogging, ActorPath }
import akka.actor.{ ActorContext, ActorLogging, ActorPath }
import akka.persistence.{ AtLeastOnceDelivery, PersistentActor }
import akka.persistence.AtLeastOnceDelivery.{ UnconfirmedDelivery, UnconfirmedWarning }
import peds.akka.envelope._


object ReliablePublisher {
sealed trait Protocol
case class ReliableMessage( deliveryId: Long, message: Envelope ) extends Protocol
case class Confirm( deliveryId: Long ) extends Protocol

case class RedeliveryFailedException( message: Any ) extends RuntimeException
}

trait ReliablePublisher extends EventPublisher { outer: PersistentActor with AtLeastOnceDelivery with Enveloping =>
def destination: ActorPath
import ReliablePublisher._
import peds.commons.util.Chain._

def reliablePublisher( destination: ActorPath )( implicit context: ActorContext ): Publisher = {
( event: Envelope ) => {
deliver(
destination,
deliveryId => {
log info s"ReliablePublisher.publish.DELIVER: deliveryId=${deliveryId}; dest=${destination}; event=${event}"
ReliableMessage( deliveryId, event )
}
)
Left( event )
}
}

//DMR don't want to override default b/h here. instead, concrete class can
// override val redeliverInterval: FiniteDuration = 30.seconds
Expand All @@ -18,20 +40,6 @@ trait ReliablePublisher extends EventPublisher { outer: PersistentActor with AtL
// val listener = context.actorOf( RedeliverFailureListener.props )
// }

override def publish(
event: Any
)(
implicit workId: WorkId = WorkId(),
messageNumber: MessageNumber = MessageNumber( -1 )
): Unit = {
deliver(
destination,
deliveryId => {
log info s"ReliablePublisher.publish.DELIVER: deliveryId=${deliveryId}; dest=${destination}; event=${event}"
ReliableMessage( deliveryId, event )
}
)
}

override def around( r: Receive ): Receive = {
case Confirm( deliveryId ) => trace.block( "ReliablePublisher.around(Confirm)" ) {
Expand Down
4 changes: 2 additions & 2 deletions akka/src/main/scala/peds/akka/publish/ReliableReceiver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import peds.akka.ActorStack

trait ReliableReceiver extends ActorStack { outer: ActorLogging =>
override def around( r: Receive ): Receive = LoggingReceive {
case ReliableMessage( deliveryId, message ) => {
case ReliablePublisher.ReliableMessage( deliveryId, message ) => {
log info s"ReliableReceiver.ReliableMessage(deliveryId=${deliveryId}, message=${message})"
super.around( r )( message )
sender() ! Confirm( deliveryId ) //DMR: send confirmation after hanlding receive? or before?
sender() ! ReliablePublisher.Confirm( deliveryId ) //DMR: send confirmation after hanlding receive? or before?
}

case msg => super.around( r )( msg )
Expand Down
14 changes: 0 additions & 14 deletions akka/src/main/scala/peds/akka/publish/SilentPublisher.scala

This file was deleted.

31 changes: 26 additions & 5 deletions akka/src/main/scala/peds/akka/publish/package.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,35 @@
package peds.akka

import scala.util.Try
import akka.actor.{ ActorContext, ActorLogging }
import com.typesafe.scalalogging.LazyLogging
import shapeless.syntax.typeable._
import peds.commons.log.Trace
import peds.akka.envelope.Envelope
import peds.commons.util.Chain


package object publish {
package object publish extends LazyLogging {

sealed trait ReliableProtocol
case class ReliableMessage( deliveryId: Long, message: Envelope ) extends ReliableProtocol
case class Confirm( deliveryId: Long ) extends ReliableProtocol
private[this] val trace = Trace( "peds.akka.publish", logger )

type Publisher = Chain.Link[Envelope, Unit]

case class RedeliveryFailedException( message: Any ) extends RuntimeException

trait EventPublisher extends ActorStack with ActorLogging {
def publish: Publisher = local
}


def local( implicit context: ActorContext ): Publisher = ( event: Envelope ) => trace.block( s"publish.local($event)" ) {
val target = context.system.eventStream
event.cast[target.Event] foreach { e =>
logger info s"local stream publishing event:${e} on target:${target}"
//DMR: somehow need to update envelope per EnvelopeSending.update
target publish e
}
Left( event )
}

val silent: Publisher = ( event: Envelope ) => trace.block( s"publish.silent($event)" ) { Left( event ) }
}
25 changes: 25 additions & 0 deletions commons/src/main/scala/peds/commons/util/Chain.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package peds.commons.util


object Chain {
type Link[A, B] = A => Either[A, B]
implicit def func2Chain[A, B]( f: Link[A, B] ): Chain[A, B] = new Chain[A, B]( f )
}


/**
* Implements a functional version of the Chain of Responsibility pattern. The Chain of Responsibility pattern is a GoF design
* pattern consisting of a source of command objects and a series of processing objects. Each processing object contains logic
* that defines the types of command objects that it can handle; the rest are passed to the next processing object in the chain.
*
* In this Scala version, processing steps are defined as functions, which the Chain links together via either the chainWith() or
* +> operations. The return value is defined as an Either[A, B]:
* - Left[A] to direct processing by the next handler
* - Right[B] to direct that processing must stop with the preceeding result.
*/
class Chain[A, B]( f: Chain.Link[A, B] ) {

def chainWith( next: => Chain.Link[A, B] ): Chain.Link[A, B] = f( _ ).left.flatMap( next ) // call next handler if not yet processed

def +>( next: => Chain.Link[A, B] ): Chain.Link[A, B] = chainWith( next )
}

0 comments on commit 9afdbb3

Please sign in to comment.