An actor restarts when an unhandled exception exists in typed `ClusterSingleton` and `ClusterSharding`. #28035

Closed
taeguk opened this issue Oct 18, 2019 · 6 comments
Labels: bug

@taeguk (Contributor) commented Oct 18, 2019

In Akka Typed, an actor should be stopped by default when it throws an unhandled exception. In my experiment, however, it is restarted, even when SupervisorStrategy.stop is set explicitly.
The problem only occurs with typed ClusterSingleton and ClusterSharding. An actor created normally with ActorContext::spawn() stops as expected, and with untyped Akka everything works correctly.

My environment is:

  • akka version : 2.5.25
  • akka management version : 1.0.3

This is my experiment code:

import akka.actor.typed._
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.typed._
import akka.management.scaladsl.AkkaManagement
import akka.management.cluster.bootstrap.ClusterBootstrap

object AkkaTypedTest {
  object Guardian {
    def testBehavior: Behavior[String] = Behaviors.setup[String] { context =>
      println("@@@@ started!!!")

      Behaviors.receiveMessage[String] { str =>
        println(s"@@@@ Received: $str")
        throw new Exception(s"**** Test exception: $str")
      }.receiveSignal {
        case (_, PreRestart) =>
          println("@@@@ PreRestart received")
          Behaviors.same

        case (_, PostStop) =>
          println("@@@@ PostStop received")
          Behaviors.same
      }
    }

    def behavior: Behavior[Unit] = Behaviors.setup { context =>
      val testActor = context.spawn(
        Behaviors.supervise(testBehavior).onFailure(SupervisorStrategy.stop),
        "test-behavior"
      )

      // This code stops the actor (OKAY)
      testActor ! "hello"

      Behaviors.receiveMessage { _ => Behaviors.same }
    }
  }

  def testBehavior: Behavior[String] = Behaviors.setup[String] { context =>
    println("**** started!!!")

    Behaviors.receiveMessage[String] { str =>
      println(s"**** Received: $str")
      throw new Exception(s"**** Test exception: $str")
    }.receiveSignal {
      case (_, PreRestart) =>
        println("**** PreRestart received")
        Behaviors.same

      case (_, PostStop) =>
        println("**** PostStop received")
        Behaviors.same
    }
  }

  def main(args: Array[String]): Unit = {
    val system = ActorSystem(Guardian.behavior, name = "actor-system")

    AkkaManagement(system.toUntyped).start()
    ClusterBootstrap(system.toUntyped).start()

    val singleton = ClusterSingleton(system)

    val testActor = singleton.init(
      SingletonActor(
        Behaviors.supervise(testBehavior).onFailure(SupervisorStrategy.stop),
        name = "test-behavior"
      )
    )

    // This code restarts the actor (BUG?!)
    testActor ! "hello"

  }
}

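import akka.actor.ActorSystem
import akka.actor.{ Actor, SupervisorStrategy }
import akka.actor.{ PoisonPill, Props }
import akka.cluster.singleton.{ ClusterSingletonManager, ClusterSingletonManagerSettings, ClusterSingletonProxy, ClusterSingletonProxySettings }
// Needed by main() below
import akka.management.scaladsl.AkkaManagement
import akka.management.cluster.bootstrap.ClusterBootstrap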

object AkkaUnTypedTest {
  class SupervisorActor(childProps: Props, override val supervisorStrategy: SupervisorStrategy) extends Actor {
    val child = context.actorOf(childProps, "supervised-child")

    def receive = {
      case msg => child.forward(msg)
    }
  }

  class TestActor extends Actor {
    override def preStart(): Unit = println("**** preStart")
    override def postStop(): Unit = println("**** postStop")

    override def receive: Receive = {
      case str: String =>
        println(s"**** Received: $str")
        throw new Exception(s"**** Test exception: $str")

      case _ =>
        println("Unknown message")
    }
  }

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("actor-system")

    AkkaManagement(system).start()
    ClusterBootstrap(system).start()

    system.actorOf(
      ClusterSingletonManager.props(
        singletonProps = Props(classOf[SupervisorActor], Props[TestActor], SupervisorStrategy.stoppingStrategy),
        terminationMessage = PoisonPill.getInstance,
        settings = ClusterSingletonManagerSettings(system)
      ),
      name = "testActor"
    )

    val testActorProxy = system.actorOf(
      ClusterSingletonProxy.props(
        singletonManagerPath = "/user/testActor",
        settings = ClusterSingletonProxySettings(system)),
      name = "testActorProxy"
    )

    // This code stops the actor (OKAY)
    testActorProxy ! "hello"
  }
}

The screenshot below shows the problem: the log contains `**** PreRestart received`.
[image: screenshot of the log output]

@patriknw (Member) commented Nov 4, 2019

@taeguk Thanks for reporting, and sorry that the notification of this fell between the cracks for a while.

I have tried with latest (2.6.0-RC2) and there it is working as expected:

[info] @@@@ started!!!
[info] @@@@ Received: hello
[info] 2019-11-04 11:56:57,341 ERROR akka.actor.typed.Behavior$ akka://actor-system/user/test-behavior  - Supervisor StopSupervisor saw failure: **** Test exception: hello
[info] java.lang.Exception: **** Test exception: hello
[info] 	at issue.AkkaTypedTest$Guardian$.$anonfun$testBehavior$2(AkkaTypedTest.scala:21)
[info] 	at akka.actor.typed.internal.BehaviorImpl$ReceiveMessageBehavior.receive(BehaviorImpl.scala:150)
[info] 	at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
[info] 	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
[info] 	at akka.actor.typed.internal.InterceptorImpl$$anon$2.apply(InterceptorImpl.scala:55)
[info] 	at akka.actor.typed.internal.SimpleSupervisor.aroundReceive(Supervision.scala:123)
[info] 	at akka.actor.typed.internal.InterceptorImpl.receive(InterceptorImpl.scala:83)
[info] 	at akka.actor.typed.Behavior$.interpret(Behavior.scala:274)
[info] 	at akka.actor.typed.Behavior$.interpretMessage(Behavior.scala:230)
[info] 	at akka.actor.typed.internal.adapter.ActorAdapter.handleMessage(ActorAdapter.scala:126)
[info] 	at akka.actor.typed.internal.adapter.ActorAdapter.aroundReceive(ActorAdapter.scala:106)
[info] 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:573)
[info] 	at akka.actor.ActorCell.invoke(ActorCell.scala:543)
[info] 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:269)
[info] 	at akka.dispatch.Mailbox.run(Mailbox.scala:230)
[info] 	at akka.dispatch.Mailbox.exec(Mailbox.scala:242)
[info] 	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
[info] 	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
[info] 	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
[info] 	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
[info] 	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)
[info] @@@@ PostStop received
@patriknw patriknw closed this Nov 4, 2019

@taeguk (Contributor, Author) commented Nov 7, 2019

@patriknw Excuse me, I tested with the latest versions (akka: 2.6.0, akka-management: 1.0.4), but it still doesn't work as expected.
The logs you posted are from the normal case, which already worked in the previous Akka version.
The problem occurs with typed ClusterSingleton and ClusterSharding. Please look at the logs prefixed with ****, not @@@@.

@patriknw (Member) commented Nov 7, 2019

Ok, I'll take another look

@patriknw patriknw reopened this Nov 7, 2019

@patriknw (Member) commented Nov 7, 2019

There is indeed something wrong here. Thanks for reporting and pointing out the problem, @taeguk

Expected behavior is documented as: When combining classic and typed actors the default supervision is based on the default behavior of the child, for example if a classic actor creates a typed child, its default supervision will be to stop. If a typed actor creates a classic child, its default supervision will be to restart.

ClusterSingleton is maybe implemented as a Classic parent starting a Typed child, but that is not what we see here. The rethrowTypedFailure is set to true in the ActorAdapter for this case. Have to write some tests and look into the details.
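
As a reading aid, the documented rule quoted above can be restated as a small lookup. This is only a toy model in plain Scala and does not use or mirror any Akka API: `ActorKind`, `Directive`, and `DefaultSupervision.defaultDirective` are hypothetical names invented for this sketch. It encodes just the two cases the documentation gives, where the default follows the kind of the child.

```scala
// Toy model only: these types are hypothetical and are not part of Akka.
sealed trait ActorKind
case object Classic extends ActorKind
case object Typed extends ActorKind

sealed trait Directive
case object Stop extends Directive
case object Restart extends Directive

object DefaultSupervision {
  // Per the documented rule, the default depends on the kind of the child,
  // not on the kind of the parent that created it.
  def defaultDirective(parent: ActorKind, child: ActorKind): Directive =
    child match {
      case Typed   => Stop    // typed child: stop by default
      case Classic => Restart // classic child: restart by default
    }

  def main(args: Array[String]): Unit = {
    println(defaultDirective(parent = Classic, child = Typed))
    println(defaultDirective(parent = Typed, child = Classic))
  }
}
```

If the singleton really is a typed child started from classic code, the first case applies and a stop would be expected, which is why the restart seen in the logs looks like a bug.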

Minimized reproducer:

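import akka.actor.typed._
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.typed._
import com.typesafe.config.ConfigFactory

object AkkaTypedTest {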

  def testBehavior: Behavior[String] = Behaviors.setup[String] { _ =>
    println("**** started!!!")

    Behaviors
      .receiveMessage[String] { str =>
        println(s"**** Received: $str")
        throw new Exception(s"**** Test exception: $str")
      }
      .receiveSignal {
        case (_, PreRestart) =>
          println("**** PreRestart received")
          Behaviors.same

        case (_, PostStop) =>
          println("**** PostStop received")
          Behaviors.same
      }
  }

  def main(args: Array[String]): Unit = {
    val system =
      ActorSystem(Behaviors.empty, name = "actor-system", ConfigFactory.parseString("akka.actor.provider = cluster"))

    Cluster(system).manager ! Join(Cluster(system).selfMember.address)

    val singleton = ClusterSingleton(system)

    val testActor = singleton.init(
      SingletonActor(Behaviors.supervise(testBehavior).onFailure(SupervisorStrategy.stop), name = "test-behavior"))

    // This code restarts the actor (BUG?!)
    testActor ! "hello"
  }
}
@patriknw patriknw self-assigned this Nov 18, 2019
patriknw added a commit that referenced this issue Nov 18, 2019
* scaladsl.PropsAdapter and javadsl.Adapter.props should use stop supervision,
  otherwise it will restart (from classic) by default
* PropsAdapter is used by Cluster Singleton and Sharding
* also typed.ActorSystem.systemActorOf
* removed default param value in internal.PropsAdapter to make the
  decision of rethrowTypedFailure more explicit
 * rethrowTypedFailure=true is used for the child failed propagation to
   parent and only makes sense when the parent is Typed
patriknw added a commit that referenced this issue Nov 25, 2019
@patriknw patriknw added this to the 2.6.1 milestone Nov 25, 2019
@patriknw patriknw closed this Nov 25, 2019

@taeguk (Contributor, Author) commented Nov 26, 2019

@patriknw Thanks for your work on resolving this issue. I have a question about your PR. Does the PR require that EventSourcedBehavior be supervised with SupervisorStrategy.stop? In other words, can I no longer use a supervisor strategy other than SupervisorStrategy.stop?

@patriknw (Member) commented Nov 26, 2019

@taeguk I think your question is answered in https://doc.akka.io/docs/akka/current/typed/persistence.html

Did you see anything in my PR that changed that part? Let me know in that case because that is not intended to change.

navaro1 added a commit to navaro1/akka that referenced this issue Dec 17, 2019