Exception in Akka 2.5.22: Stream supervisor must be a local actor #26714

Closed
davidcorcoran opened this issue Apr 12, 2019 · 1 comment

@davidcorcoran

commented Apr 12, 2019

Hi. I'm trying to upgrade from Akka 2.5.19 to 2.5.22 and I'm hitting an intermittent failure in this snippet of my code:

    implicit val mat = ActorMaterializer()
    val source: Source[Event, ActorRef] = Source.actorRef[Event](10000, OverflowStrategy.fail)
    val (actorRef: ActorRef, publisher: Publisher[Event]) =
      source.toMat(Sink.asPublisher(false))(Keep.both).run()

The exception is thrown on the .run() call. It happens about 1 in every 10 calls and is this:

java.lang.IllegalStateException: Stream supervisor must be a local actor, was [akka.actor.RepointableActorRef]
	at akka.stream.stage.GraphStageLogic$StageActor.cell(GraphStage.scala:221)
	at akka.stream.stage.GraphStageLogic$StageActor.<init>(GraphStage.scala:225)
	at akka.stream.stage.GraphStageLogic.getEagerStageActor(GraphStage.scala:1235)
	at akka.stream.impl.ActorRefSource$$anon$1.<init>(ActorRefSource.scala:56)
	at akka.stream.impl.ActorRefSource.createLogicAndMaterializedValue(ActorRefSource.scala:41)
	at akka.stream.impl.GraphStageIsland.materializeAtomic(PhasedFusingActorMaterializer.scala:674)
	at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:491)
	at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:450)
	at akka.stream.impl.PhasedFusingActorMaterializer.materialize(PhasedFusingActorMaterializer.scala:443)
	at akka.stream.scaladsl.RunnableGraph.run(Flow.scala:620)

I think it might be a timing issue, because it eventually succeeds if I change the code to retry in a loop (horrible hack):

    implicit val mat = ActorMaterializer()
    val source: Source[Event, ActorRef] = Source.actorRef[Event](10000, OverflowStrategy.fail)
//    val (actorRef: ActorRef, publisher: Publisher[Event]) =
//      source.toMat(Sink.asPublisher(false))(Keep.both).run()

    var actorRef: ActorRef = null
    var publisher: Publisher[Event] = null
    while (actorRef == null) {
      try {
        val (a, b) = source.toMat(Sink.asPublisher(false))(Keep.both).run()
        actorRef = a
        publisher = b
      } catch {
        case e: IllegalStateException =>
          println(s"!!!!!!! $e")
      }
    }

Again it will work most of the time without hitting the exception, but about 1 in 10 times it will print out the exception, possibly a few times, and then continue on fine.
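The retry loop above never gives up, so a failure that isn't the startup race would spin forever. A minimal sketch of a bounded variant follows, assuming the race clears within a few short pauses. `BoundedRetry`, `retryOnIllegalState`, `maxAttempts` and `delayMillis` are hypothetical names introduced for illustration, not part of Akka.

```scala
object BoundedRetry {
  /** Runs `attempt`, retrying only on IllegalStateException.
    * Hypothetical helper: gives up after `maxAttempts` tries and rethrows the last failure.
    */
  def retryOnIllegalState[T](maxAttempts: Int, delayMillis: Long)(attempt: => T): T = {
    require(maxAttempts >= 1, "maxAttempts must be at least 1")
    var remaining = maxAttempts
    var result: Option[T] = None
    while (result.isEmpty) {
      try {
        result = Some(attempt)
      } catch {
        // Only swallow the exception while attempts remain; otherwise it propagates.
        case _: IllegalStateException if remaining > 1 =>
          remaining -= 1
          Thread.sleep(delayMillis)
      }
    }
    result.get
  }
}

// Possible use with the snippet from this report (not compiled here, needs Akka on the classpath):
//   val (actorRef, publisher) = BoundedRetry.retryOnIllegalState(maxAttempts = 5, delayMillis = 50) {
//     source.toMat(Sink.asPublisher(false))(Keep.both).run()
//   }
```

This is still a hack around the race, but it surfaces the exception instead of looping if the failure turns out to be permanent.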

As a guess, I think the code which is throwing finally hits this case: case ref: RepointableActorRef if ref.isStarted, where ref.isStarted is true, and then continues.

In the failing case unknown (where the exception is thrown), unknown is, as you can see, a RepointableActorRef, and unknown.underlying is an UnstartedCell.
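To make the suspected fall-through concrete, here is a toy model of a guarded match like the one quoted above. It is only a sketch: `Ref`, `LocalRef`, `RepointableRef`, `point` and `cellOf` are hypothetical stand-ins written for this illustration, not Akka classes or Akka's actual code.

```scala
// Toy model only: hypothetical stand-ins, not Akka types.
sealed trait Ref
final case class LocalRef(name: String) extends Ref
final class RepointableRef(val name: String) extends Ref {
  @volatile private var started = false
  def isStarted: Boolean = started
  // Simulates the moment the ref finishes starting.
  def point(): Unit = { started = true }
}

object SupervisorMatch {
  def cellOf(supervisor: Ref): String = supervisor match {
    case ref: LocalRef => s"cell of ${ref.name}"
    // The guard fails while the ref is still unstarted ...
    case ref: RepointableRef if ref.isStarted => s"cell of ${ref.name}"
    // ... so an unstarted RepointableRef lands in the catch-all and throws.
    case unknown =>
      throw new IllegalStateException(
        s"Stream supervisor must be a local actor, was [${unknown.getClass.getSimpleName}]")
  }
}
```

In this model the same reference throws before `point()` is called and succeeds afterwards, which would match a failure that goes away on retry.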

I've tried to create a standalone program but can't get it to fail. So I guess maybe it only happens under load or some other circumstance I'm not replicating in my standalone case.

I've never seen this issue in Akka 2.5.19.

Thanks,

David

@patriknw

Member

commented Apr 14, 2019

Thanks for reporting. It's a regression in Source.actorRef caused by the new implementation in 2.5.22 (the old implementation was using deprecated features). We'll fix it for the next patch release. It's a race condition that is triggered when the materializer isn't fully started. An ugly workaround is to add a delay after starting the materializer:

    implicit val mat = ActorMaterializer()
    Thread.sleep(100)

@patriknw patriknw self-assigned this Apr 14, 2019

patriknw added a commit that referenced this issue Apr 14, 2019

Fix startup race condition in ActorRefSource, #26714
* fallback to sending message if materializer.supervisor RepointableActorRef
  is not started
* not nice to use Await, but should be rare and that is also used in
  ActorMaterializer.actorOf for similar thing
* also harden test of CompletionStrategy.Immediately, which
  failed if Thread.sleep(100) in RepointableActorRef.point

@patriknw patriknw added this to the 2.5.23 milestone Apr 15, 2019

johanandren added a commit that referenced this issue Apr 15, 2019

Fix startup race condition in ActorRefSource, #26714
* fallback to sending message if materializer.supervisor RepointableActorRef
  is not started
* not nice to use Await, but should be rare and that is also used in
  ActorMaterializer.actorOf for similar thing
* also harden test of CompletionStrategy.Immediately, which
  failed if Thread.sleep(100) in RepointableActorRef.point