Skip to content

Commit

Permalink
Use stop supervision for PropsAdapter, #28035
Browse files Browse the repository at this point in the history
* scaladsl.PropsAdapter and javadsl.Adapter.props should use stop supervision,
  otherwise it will restart (from classic) by default
* PropsAdapter is used by Cluster Singleton and Sharding
* also typed.ActorSystem.systemActorOf
* removed default param value in internal.PropsAdapter to make the
  decision of rethrowTypedFailure more explicit
 * rethrowTypedFailure=true is used for the child faild propagation to
   parent and only makes sense when the parent is Typed
  • Loading branch information
patriknw committed Nov 18, 2019
1 parent 472dac4 commit b64eb0b
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,26 @@ class ClassicSupervisingTypedSpec extends WordSpecLike with LogCapturing with Be
probe.expectNoMessage()
expectTerminated(underTest.toClassic)
}

"default to stop for supervision of systemActorOf" in {
val probe = TestProbe()
val underTest = classicSystem.toTyped.systemActorOf(ProbedBehavior.behavior(probe.ref), "s1")
watch(underTest.toClassic)
underTest ! "throw"
probe.expectMsg(PostStop)
probe.expectNoMessage()
expectTerminated(underTest.toClassic)
}

"default to stop for PropsAdapter" in {
val probe = TestProbe()
val underTest = classicSystem.actorOf(PropsAdapter(ProbedBehavior.behavior(probe.ref)))
watch(underTest)
underTest ! "throw"
probe.expectMsg(PostStop)
probe.expectNoMessage()
expectTerminated(underTest)
}
}

override protected def afterAll(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class PropsAdapterSpec extends WordSpec with Matchers {
"PropsAdapter" should {
"default to akka.dispatch.SingleConsumerOnlyUnboundedMailbox" in {
val props: Props = Props.empty
val pa: actor.Props = PropsAdapter(() => Behaviors.empty, props)
val pa: actor.Props = PropsAdapter(() => Behaviors.empty, props, rethrowTypedFailure = false)
pa.mailbox shouldEqual "akka.actor.typed.default-mailbox"
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# #28035 default to stop supervision when spawning via PropsAdapter

ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.adapter.PropsAdapter.apply$default$2")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.adapter.PropsAdapter.apply$default$3")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.adapter.PropsAdapter.apply$default$3")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.typed.internal.adapter.PropsAdapter.apply$default$2")
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,8 @@ object ActorSystem {
appConfig,
cl,
executionContext,
Some(PropsAdapter[Any](() => GuardianStartupBehavior(guardianBehavior), guardianProps)),
Some(
PropsAdapter[Any](() => GuardianStartupBehavior(guardianBehavior), guardianProps, rethrowTypedFailure = false)),
setup)
system.start()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import akka.actor.typed.Dispatchers
import akka.actor.typed.Props
import akka.actor.typed.Scheduler
import akka.actor.typed.Settings
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.internal.ActorRefImpl
import akka.actor.typed.internal.ExtensionsImpl
import akka.actor.typed.internal.InternalRecipientRef
import akka.actor.typed.internal.PropsImpl.DispatcherDefault
import akka.actor.typed.internal.PropsImpl.DispatcherFromConfig
import akka.actor.typed.internal.PropsImpl.DispatcherSameAsParent
import akka.actor.typed.internal.SystemMessage
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.InternalApi
import akka.{ actor => classic }
import org.slf4j.{ Logger, LoggerFactory }
Expand Down Expand Up @@ -107,7 +109,12 @@ import org.slf4j.{ Logger, LoggerFactory }
FutureConverters.toJava(whenTerminated)

override def systemActorOf[U](behavior: Behavior[U], name: String, props: Props): ActorRef[U] = {
val ref = system.systemActorOf(PropsAdapter(() => behavior, props), name)
val ref = system.systemActorOf(
PropsAdapter(
() => Behaviors.supervise(behavior).onFailure(SupervisorStrategy.stop),
props,
rethrowTypedFailure = false),
name)
ActorRefAdapter(ref)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,16 @@ import akka.dispatch.Mailboxes
* INTERNAL API
*/
@InternalApi private[akka] object PropsAdapter {
def apply[T](
behavior: () => Behavior[T],
deploy: Props = Props.empty,
rethrowTypedFailure: Boolean = true): akka.actor.Props = {
val props = akka.actor.Props(new ActorAdapter(behavior(), rethrowTypedFailure))

val dispatcherProps = (deploy.firstOrElse[DispatcherSelector](DispatcherDefault.empty) match {
case _: DispatcherDefault => props
case DispatcherFromConfig(name, _) => props.withDispatcher(name)
case _: DispatcherSameAsParent => props.withDispatcher(Deploy.DispatcherSameAsParent)
def apply[T](behavior: () => Behavior[T], props: Props, rethrowTypedFailure: Boolean): akka.actor.Props = {
val classicProps = akka.actor.Props(new ActorAdapter(behavior(), rethrowTypedFailure))

val dispatcherProps = (props.firstOrElse[DispatcherSelector](DispatcherDefault.empty) match {
case _: DispatcherDefault => classicProps
case DispatcherFromConfig(name, _) => classicProps.withDispatcher(name)
case _: DispatcherSameAsParent => classicProps.withDispatcher(Deploy.DispatcherSameAsParent)
}).withDeploy(Deploy.local) // disallow remote deployment for typed actors

val mailboxProps = deploy.firstOrElse[MailboxSelector](MailboxSelector.default()) match {
val mailboxProps = props.firstOrElse[MailboxSelector](MailboxSelector.default()) match {
case _: DefaultMailboxSelector => dispatcherProps
case BoundedMailboxSelector(capacity, _) =>
// specific support in classic Mailboxes
Expand All @@ -41,7 +38,7 @@ import akka.dispatch.Mailboxes

val localDeploy = mailboxProps.withDeploy(Deploy.local) // disallow remote deployment for typed actors

val tags = deploy.firstOrElse[ActorTags](ActorTagsImpl.empty).tags
val tags = props.firstOrElse[ActorTags](ActorTagsImpl.empty).tags
if (tags.isEmpty) localDeploy
else localDeploy.withActorTags(tags)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.ActorSystem
import akka.actor.typed.Scheduler
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.internal.adapter.ActorContextAdapter
import akka.japi.Creator

Expand Down Expand Up @@ -143,7 +144,10 @@ object Adapter {
* example of that.
*/
def props[T](behavior: Creator[Behavior[T]], deploy: Props): akka.actor.Props =
akka.actor.typed.internal.adapter.PropsAdapter(() => behavior.create(), deploy)
akka.actor.typed.internal.adapter.PropsAdapter(
() => Behaviors.supervise(behavior.create()).onFailure(SupervisorStrategy.stop),
deploy,
rethrowTypedFailure = false)

/**
* Wrap [[akka.actor.typed.Behavior]] in a classic [[akka.actor.Props]], i.e. when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package akka.actor.typed.scaladsl.adapter

import akka.actor.typed.Behavior
import akka.actor.typed.Props
import akka.actor.typed.SupervisorStrategy
import akka.actor.typed.scaladsl.Behaviors

/**
* Wrap [[akka.actor.typed.Behavior]] in a classic [[akka.actor.Props]], i.e. when
Expand All @@ -18,5 +20,8 @@ import akka.actor.typed.Props
*/
object PropsAdapter {
def apply[T](behavior: => Behavior[T], deploy: Props = Props.empty): akka.actor.Props =
akka.actor.typed.internal.adapter.PropsAdapter(() => behavior, deploy)
akka.actor.typed.internal.adapter.PropsAdapter(
() => Behaviors.supervise(behavior).onFailure(SupervisorStrategy.stop),
deploy,
rethrowTypedFailure = false)
}

0 comments on commit b64eb0b

Please sign in to comment.