
FanoutPublisherSink leaks actors on upstream failure #25634

Closed
edwinchoi opened this issue Sep 18, 2018 · 1 comment

@edwinchoi

The FanoutPublisherSink implementation doesn't terminate its FanoutProcessorImpl actor on upstream failure, leaving any elements buffered in FanoutOutputs strongly reachable from the StreamSupervisor instance.

I've tested this against v2.5.16, the latest release.

At the bottom of FanoutProcessorImpl.fail, a comment indicates that the actor will be stopped after flushing... but if you look at FanoutOutputs.error you can see that the failure path never calls afterShutdown, which is how the actor is terminated on normal completion.
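
To illustrate the asymmetry, here is a rough sketch of the two shutdown paths (not the actual akka.stream.impl code; FanoutShutdownSketch is just an illustrative name, and the bodies reflect my reading of the behaviour, with afterShutdown standing in for whatever stops the actor):

// Rough sketch of the two shutdown paths described above, NOT the real
// akka.stream.impl sources.
trait FanoutShutdownSketch {
  def afterShutdown(): Unit // completion path: stops the FanoutProcessorImpl actor

  def complete(): Unit = {
    // ...flush any remaining buffered elements to subscribers...
    afterShutdown() // normal completion: the actor stops, buffers become unreachable
  }

  def error(e: Throwable): Unit = {
    // ...signal onError to subscribers...
    // afterShutdown() is never called on this path, so the actor (and anything
    // it still buffers) stays reachable from the StreamSupervisor -- the leak.
  }
}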

Example to reproduce (Source.preMaterialize() runs the source into Sink.asPublisher(fanout = true), which as far as I can tell is backed by FanoutPublisherSink):

// needs to be in akka package for access to StreamSupervisor
package akka.stream

import java.util.concurrent.CountDownLatch

import akka.NotUsed
import akka.actor._
import akka.stream.scaladsl._
import akka.stream.impl.StreamSupervisor

private case object Go
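// MyActor materializes the source returned by sourceF numRepeat times, then asks the
// StreamSupervisor for its children to count how many stream actors are still alive.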
class MyActor(latch: CountDownLatch, numRepeat: Int, sourceF: () => Source[String, NotUsed]) extends Actor {
  implicit lazy val mat = ActorMaterializer()

  private var count = 0

  override def preStart(): Unit = self ! Go
  override def postStop(): Unit = latch.countDown()
  override def receive: Receive = {
    case Go if count < numRepeat =>
      count += 1
      val (_, source) = sourceF().preMaterialize()
      source.runForeach(_ => ()).onComplete(_ => self ! Go)(context.dispatcher)
    case Go =>
      mat.supervisor ! StreamSupervisor.GetChildren
    case StreamSupervisor.Children(xs) =>
      // this should report 0
      println("size=" + xs.size)
      context.stop(self)
  }
}

object ActorLeakTest extends App {
  val system = ActorSystem()
  val bigString = scala.util.Random.nextString(1000 * 1000) * 50

  locally {
    println("ABORT")
    val latch = new CountDownLatch(1)
    system.actorOf(Props(new MyActor(latch, 1000,
      () =>
        // if you uncomment this line, then there will be 1000x 50MB strings on the heap at the end of the test
        //Source.single((bigString + "0").dropRight(1)) ++
        Source.failed(new RuntimeException()))))
    latch.await()
  }

  locally {
    println("SHUTDOWN")
    val latch = new CountDownLatch(1)
    system.actorOf(Props(new MyActor(latch, 1000, () => Source.single("hello"))))
    latch.await()
  }

  system.terminate()
}
@patriknw (Member)

Doesn't sound good. Would you be able to create a PR fixing the issue, @edwinchoi?

@patriknw added the 1 - triaged, help wanted, bug and t:stream labels on Sep 18, 2018
@johanandren self-assigned this on Oct 15, 2018
@johanandren added the 3 - in progress label and removed the 1 - triaged label on Oct 15, 2018
johanandren added a commit to johanandren/akka that referenced this issue Oct 15, 2018
johanandren added a commit to johanandren/akka that referenced this issue Oct 30, 2018
johanandren added a commit to johanandren/akka that referenced this issue Mar 20, 2019
@johanandren added this to the 2.5.22 milestone on Mar 27, 2019