Skip to content

Commit

Permalink
Let typed Stateful optionally handle signals also, #22293
Browse files Browse the repository at this point in the history
* and thereby no need for MessageOrSignal
  • Loading branch information
patriknw committed Mar 16, 2017
1 parent b2b4f64 commit 06d95ce
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 95 deletions.
6 changes: 3 additions & 3 deletions akka-docs/rst/scala/code/docs/akka/typed/IntroSpec.scala
Expand Up @@ -114,7 +114,8 @@ class IntroSpec extends TypedSpec {

//#chatroom-main
val main: Behavior[akka.NotUsed] =
SignalOrMessage(
Stateful(
behavior = (_, _) Unhandled,
signal = { (ctx, sig)
sig match {
case PreStart
Expand All @@ -128,8 +129,7 @@ class IntroSpec extends TypedSpec {
case _
Unhandled
}
},
mesg = (_, _) Unhandled)
})

val system = ActorSystem("ChatRoomDemo", main)
Await.result(system.whenTerminated, 1.second)
Expand Down
2 changes: 1 addition & 1 deletion akka-docs/rst/scala/typed.rst
Expand Up @@ -248,7 +248,7 @@ Actor will perform its job on its own accord, we do not need to send messages
from the outside, so we declare it to be of type ``NotUsed``. Actors receive not
only external messages, they also are notified of certain system events,
so-called Signals. In order to get access to those we choose to implement this
particular one using the :class:`SignalOrMessage` behavior decorator. The
particular one using the :class:`Stateful` behavior decorator. The
provided ``signal`` function will be invoked for signals (subclasses of :class:`Signal`)
or the ``mesg`` function for user messages.

Expand Down
30 changes: 16 additions & 14 deletions akka-typed/src/main/java/akka/typed/javadsl/Actor.java
Expand Up @@ -52,11 +52,11 @@ public Behavior<T> message(akka.typed.ActorContext<T> ctx, T msg) throws Excepti
}

private static class Stateful<T> extends Behavior<T> {
final Function2<ActorContext<T>, Signal, Behavior<T>> signal;
final Function2<ActorContext<T>, T, Behavior<T>> message;
final Function2<ActorContext<T>, Signal, Behavior<T>> signal;

public Stateful(Function2<ActorContext<T>, Signal, Behavior<T>> signal,
Function2<ActorContext<T>, T, Behavior<T>> message) {
public Stateful(Function2<ActorContext<T>, T, Behavior<T>> message,
Function2<ActorContext<T>, Signal, Behavior<T>> signal) {
this.signal = signal;
this.message = message;
}
Expand Down Expand Up @@ -164,23 +164,21 @@ private static <T> Procedure2<ActorContext<T>, Signal> doNothing() {
* {@link ActorContext} that allows access to the system, spawning and watching
* other actors, etc.
*
* In either case—signal or message—the next behavior must be returned. If no
* change is desired, use {@link #same}.
* This constructor is called stateful because processing the next message
* results in a new behavior that can potentially be different from this one.
* If no change is desired, use {@link #same}.
*
* @param signal
* the function that describes how this actor reacts to the given
* signal
* @param message
* the function that describes how this actor reacts to the next
* message
* @return the behavior
*/
static public <T> Behavior<T> signalOrMessage(Function2<ActorContext<T>, Signal, Behavior<T>> signal,
Function2<ActorContext<T>, T, Behavior<T>> message) {
return new Stateful<T>(signal, message);
static public <T> Behavior<T> stateful(Function2<ActorContext<T>, T, Behavior<T>> message) {
return new Stateful<T>(message, unhandledFun());
}

/**
*
* Construct an actor behavior that can react to incoming messages but not to
* lifecycle signals. After spawning this actor from another actor (or as the
* guardian of an {@link akka.typed.ActorSystem}) it will be executed within an
Expand All @@ -194,10 +192,14 @@ static public <T> Behavior<T> signalOrMessage(Function2<ActorContext<T>, Signal,
* @param message
* the function that describes how this actor reacts to the next
* message
* @param signal
* the function that describes how this actor reacts to the given
* signal
* @return the behavior
*/
static public <T> Behavior<T> stateful(Function2<ActorContext<T>, T, Behavior<T>> message) {
return new Stateful<T>(unhandledFun(), message);
static public <T> Behavior<T> stateful(Function2<ActorContext<T>, T, Behavior<T>> message,
Function2<ActorContext<T>, Signal, Behavior<T>> signal) {
return new Stateful<T>(message, signal);
}

/**
Expand Down
8 changes: 8 additions & 0 deletions akka-typed/src/main/scala/akka/typed/Behavior.scala
Expand Up @@ -3,6 +3,8 @@
*/
package akka.typed

import akka.annotation.InternalApi

/**
* The behavior of an actor defines how it reacts to the messages that it
* receives. The message may either be of the type that the Actor declares
Expand Down Expand Up @@ -93,6 +95,12 @@ object Behavior {
override def toString = "Unhandled"
}

/**
* INTERNAL API
*/
@InternalApi private[akka] val unhandledSignal: (ActorContext[Nothing], Signal) Behavior[Nothing] =
(_, _) unhandledBehavior

/**
* INTERNAL API.
*/
Expand Down
25 changes: 5 additions & 20 deletions akka-typed/src/main/scala/akka/typed/scaladsl/Actor.scala
Expand Up @@ -262,24 +262,6 @@ object Actor {
*/
def Ignore[T]: Behavior[T] = ignoreBehavior.asInstanceOf[Behavior[T]]

/**
* Construct an actor behavior that can react both to lifecycle signals and
* incoming messages. After spawning this actor from another actor (or as the
* guardian of an [[akka.typed.ActorSystem]]) it will be executed within an
* [[ActorContext]] that allows access to the system, spawning and watching
* other actors, etc.
*
* In either case—signal or message—the next behavior must be returned. If no
* change is desired, use `Actor.same()`.
*/
final case class SignalOrMessage[T](
signal: (ActorContext[T], Signal) Behavior[T],
mesg: (ActorContext[T], T) Behavior[T]) extends Behavior[T] {
override def management(ctx: AC[T], msg: Signal): Behavior[T] = signal(ctx, msg)
override def message(ctx: AC[T], msg: T): Behavior[T] = mesg(ctx, msg)
override def toString = s"SignalOrMessage(${LineNumbers(signal)},${LineNumbers(mesg)})"
}

/**
* Construct an actor behavior that can react to incoming messages but not to
* lifecycle signals. After spawning this actor from another actor (or as the
Expand All @@ -290,8 +272,11 @@ object Actor {
* This constructor is called stateful because processing the next message
* results in a new behavior that can potentially be different from this one.
*/
final case class Stateful[T](behavior: (ActorContext[T], T) Behavior[T]) extends Behavior[T] {
override def management(ctx: AC[T], msg: Signal): Behavior[T] = Unhandled
final case class Stateful[T](
behavior: (ActorContext[T], T) Behavior[T],
signal: (ActorContext[T], Signal) Behavior[T] = Behavior.unhandledSignal.asInstanceOf[(ActorContext[T], Signal) Behavior[T]])
extends Behavior[T] {
override def management(ctx: AC[T], msg: Signal): Behavior[T] = signal(ctx, msg)
override def message(ctx: AC[T], msg: T) = behavior(ctx, msg)
override def toString = s"Stateful(${LineNumbers(behavior)})"
}
Expand Down
Expand Up @@ -26,7 +26,7 @@ public MyMsgB(String greeting) {
}
}

Behavior<MyMsg> actor1 = signalOrMessage((ctx, signal) -> same(), (ctx, msg) -> stopped());
Behavior<MyMsg> actor1 = stateful((ctx, msg) -> stopped(), (ctx, signal) -> same());
Behavior<MyMsg> actor2 = stateful((ctx, msg) -> unhandled());
Behavior<MyMsg> actor3 = stateless((ctx, msg) -> {});
Behavior<MyMsg> actor4 = empty();
Expand Down
16 changes: 7 additions & 9 deletions akka-typed/src/test/scala/akka/typed/ActorContextSpec.scala
Expand Up @@ -5,7 +5,7 @@ import scala.concurrent.Future
import com.typesafe.config.ConfigFactory
import akka.actor.DeadLetterSuppression
import akka.typed.scaladsl.Actor
import akka.typed.scaladsl.Actor.SignalOrMessage
import akka.typed.scaladsl.Actor.Stateful

object ActorContextSpec {

Expand Down Expand Up @@ -74,8 +74,7 @@ object ActorContextSpec {
final case class Adapter(a: ActorRef[Command]) extends Event

def subject(monitor: ActorRef[Monitor]): Behavior[Command] =
Actor.SignalOrMessage(
(ctx, signal) { monitor ! GotSignal(signal); Actor.Same },
Actor.Stateful(
(ctx, message) message match {
case ReceiveTimeout
monitor ! GotReceiveTimeout
Expand Down Expand Up @@ -142,20 +141,19 @@ object ActorContextSpec {
}
case BecomeCareless(replyTo)
replyTo ! BecameCareless
Actor.SignalOrMessage(
Actor.Stateful(
(ctx, message) Actor.Unhandled,
(ctx, signal) signal match {
case Terminated(_) Actor.Unhandled
case sig
monitor ! GotSignal(sig)
Actor.Same
},
(ctx, message) Actor.Unhandled
)
})
case GetAdapter(replyTo, name)
replyTo ! Adapter(ctx.spawnAdapter(identity, name))
Actor.Same
}
)
},
(ctx, signal) { monitor ! GotSignal(signal); Actor.Same })

def oldSubject(monitor: ActorRef[Monitor]): Behavior[Command] = {
import ScalaDSL._
Expand Down
96 changes: 49 additions & 47 deletions akka-typed/src/test/scala/akka/typed/BehaviorSpec.scala
Expand Up @@ -538,37 +538,39 @@ class BehaviorSpec extends TypedSpec {
object `A Static Behavior (native)` extends StaticBehavior with NativeSystem
object `A Static Behavior (adapted)` extends StaticBehavior with AdaptedSystem

trait SignalOrMessageScalaBehavior extends Messages with BecomeWithLifecycle with Stoppable {
trait StatefulWithSignalScalaBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) null
def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] =
SActor.SignalOrMessage((ctx, sig) {
monitor ! GotSignal(sig)
SActor.Same
}, (ctx, msg) msg match {
case GetSelf
monitor ! Self(ctx.self)
SActor.Same
case Miss
monitor ! Missed
SActor.Unhandled
case Ignore
monitor ! Ignored
SActor.Same
case Ping
monitor ! Pong
behv(monitor, state)
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.Stateful(
(ctx, msg) msg match {
case GetSelf
monitor ! Self(ctx.self)
SActor.Same
case Miss
monitor ! Missed
SActor.Unhandled
case Ignore
monitor ! Ignored
SActor.Same
case Ping
monitor ! Pong
behv(monitor, state)
case Swap
monitor ! Swapped
behv(monitor, state.next)
case GetState()
monitor ! state
SActor.Same
case Stop SActor.Stopped
case _: AuxPing SActor.Unhandled
},
(ctx, sig) {
monitor ! GotSignal(sig)
SActor.Same
case Stop SActor.Stopped
case _: AuxPing SActor.Unhandled
})
})
}
object `A SignalOrMessage Behavior (scala,native)` extends SignalOrMessageScalaBehavior with NativeSystem
object `A SignalOrMessage Behavior (scala,adapted)` extends SignalOrMessageScalaBehavior with AdaptedSystem
object `A StatefulWithSignal Behavior (scala,native)` extends StatefulWithSignalScalaBehavior with NativeSystem
object `A StatefulWithSignal Behavior (scala,adapted)` extends StatefulWithSignalScalaBehavior with AdaptedSystem

trait StatefulScalaBehavior extends Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
Expand Down Expand Up @@ -619,7 +621,7 @@ class BehaviorSpec extends TypedSpec {
object `A Stateless Behavior (scala,native)` extends StatelessScalaBehavior with NativeSystem
object `A Stateless Behavior (scala,adapted)` extends StatelessScalaBehavior with AdaptedSystem

trait WidenedScalaBehavior extends SignalOrMessageScalaBehavior with Reuse with Siphon {
trait WidenedScalaBehavior extends StatefulWithSignalScalaBehavior with Reuse with Siphon {
import SActor.BehaviorDecorators

override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
Expand All @@ -630,7 +632,7 @@ class BehaviorSpec extends TypedSpec {
object `A widened Behavior (scala,native)` extends WidenedScalaBehavior with NativeSystem
object `A widened Behavior (scala,adapted)` extends WidenedScalaBehavior with AdaptedSystem

trait DeferredScalaBehavior extends SignalOrMessageScalaBehavior {
trait DeferredScalaBehavior extends StatefulWithSignalScalaBehavior {
override type Aux = Inbox[PreStart]

override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
Expand All @@ -650,20 +652,19 @@ class BehaviorSpec extends TypedSpec {
object `A deferred Behavior (scala,native)` extends DeferredScalaBehavior with NativeSystem
object `A deferred Behavior (scala,adapted)` extends DeferredScalaBehavior with AdaptedSystem

trait TapScalaBehavior extends SignalOrMessageScalaBehavior with Reuse with SignalSiphon {
trait TapScalaBehavior extends StatefulWithSignalScalaBehavior with Reuse with SignalSiphon {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Either[Signal, Command]]("tapListener")
(SActor.Tap(
(_, sig) inbox.ref ! Left(sig),
(_, msg) inbox.ref ! Right(msg),
super.behavior(monitor)._1
), inbox)
super.behavior(monitor)._1), inbox)
}
}
object `A tap Behavior (scala,native)` extends TapScalaBehavior with NativeSystem
object `A tap Behavior (scala,adapted)` extends TapScalaBehavior with AdaptedSystem

trait RestarterScalaBehavior extends SignalOrMessageScalaBehavior with Reuse {
trait RestarterScalaBehavior extends StatefulWithSignalScalaBehavior with Reuse {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
SActor.Restarter[Exception]().wrap(super.behavior(monitor)._1) null
}
Expand Down Expand Up @@ -703,13 +704,11 @@ class BehaviorSpec extends TypedSpec {
override def apply(in: JActorContext[Command]) = f(in)
}

trait SignalOrMessageJavaBehavior extends Messages with BecomeWithLifecycle with Stoppable {
trait StatefulWithSignalJavaBehavior extends Messages with BecomeWithLifecycle with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor) null
def behv(monitor: ActorRef[Event], state: State = StateA): Behavior[Command] =
JActor.signalOrMessage(fs((ctx, sig) {
monitor ! GotSignal(sig)
SActor.Same
}), fc((ctx, msg) msg match {
JActor.stateful(
fc((ctx, msg) msg match {
case GetSelf
monitor ! Self(ctx.getSelf)
SActor.Same
Expand All @@ -730,10 +729,14 @@ class BehaviorSpec extends TypedSpec {
SActor.Same
case Stop SActor.Stopped
case _: AuxPing SActor.Unhandled
}))
}),
fs((ctx, sig) {
monitor ! GotSignal(sig)
SActor.Same
}))
}
object `A SignalOrMessage Behavior (java,native)` extends SignalOrMessageJavaBehavior with NativeSystem
object `A SignalOrMessage Behavior (java,adapted)` extends SignalOrMessageJavaBehavior with AdaptedSystem
object `A StatefulWithSignal Behavior (java,native)` extends StatefulWithSignalJavaBehavior with NativeSystem
object `A StatefulWithSignal Behavior (java,adapted)` extends StatefulWithSignalJavaBehavior with AdaptedSystem

trait StatefulJavaBehavior extends Messages with Become with Stoppable {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = behv(monitor, StateA) null
Expand Down Expand Up @@ -786,7 +789,7 @@ class BehaviorSpec extends TypedSpec {
object `A Stateless Behavior (java,native)` extends StatelessJavaBehavior with NativeSystem
object `A Stateless Behavior (java,adapted)` extends StatelessJavaBehavior with AdaptedSystem

trait WidenedJavaBehavior extends SignalOrMessageJavaBehavior with Reuse with Siphon {
trait WidenedJavaBehavior extends StatefulWithSignalJavaBehavior with Reuse with Siphon {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Command]("widenedListener")
JActor.widened(super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x { inbox.ref ! x; x })))) inbox
Expand All @@ -795,7 +798,7 @@ class BehaviorSpec extends TypedSpec {
object `A widened Behavior (java,native)` extends WidenedJavaBehavior with NativeSystem
object `A widened Behavior (java,adapted)` extends WidenedJavaBehavior with AdaptedSystem

trait DeferredJavaBehavior extends SignalOrMessageJavaBehavior {
trait DeferredJavaBehavior extends StatefulWithSignalJavaBehavior {
override type Aux = Inbox[PreStart]

override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
Expand All @@ -815,20 +818,19 @@ class BehaviorSpec extends TypedSpec {
object `A deferred Behavior (java,native)` extends DeferredJavaBehavior with NativeSystem
object `A deferred Behavior (java,adapted)` extends DeferredJavaBehavior with AdaptedSystem

trait TapJavaBehavior extends SignalOrMessageJavaBehavior with Reuse with SignalSiphon {
trait TapJavaBehavior extends StatefulWithSignalJavaBehavior with Reuse with SignalSiphon {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = Inbox[Either[Signal, Command]]("tapListener")
(JActor.tap(
ps((_, sig) inbox.ref ! Left(sig)),
pc((_, msg) inbox.ref ! Right(msg)),
super.behavior(monitor)._1
), inbox)
super.behavior(monitor)._1), inbox)
}
}
object `A tap Behavior (java,native)` extends TapJavaBehavior with NativeSystem
object `A tap Behavior (java,adapted)` extends TapJavaBehavior with AdaptedSystem

trait RestarterJavaBehavior extends SignalOrMessageJavaBehavior with Reuse {
trait RestarterJavaBehavior extends StatefulWithSignalJavaBehavior with Reuse {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
JActor.restarter(classOf[Exception], JActor.OnFailure.RESTART, super.behavior(monitor)._1) null
}
Expand Down

0 comments on commit 06d95ce

Please sign in to comment.