
superpool leaks connections on graph failure #20902

Closed
stejskal opened this issue Jul 6, 2016 · 7 comments


stejskal commented Jul 6, 2016

Opened based on discussion in the akka-user forum:
https://groups.google.com/forum/#!topic/akka-user/MJJADiJZOrY

Scala version: 2.11.8
Akka version: 2.4.6

I have a graph that leaks HTTP connections from an Akka HTTP superPool when the graph fails. The general shape of the graph is:

A ~> B ~> C ~> D ~> E ~> F

The stage actions are:

- A emits `HttpRequest`s
- B is the superPool (created by `Http().superPool()`)
- C does some validation on the `HttpResponse` but leaves the `ResponseEntity` alone
- D uses `flatMapConcat` to emit the `ResponseEntity` `ByteString` sources
- E collects the `ResponseEntity` `ByteString`s into a single chunk
- F further processes the `ByteString`

This all works fine when the graph completes successfully. However, if stage F fails and thereby fails the graph, one connection is never released, and eventually a materialization of the graph stalls forever, waiting for stage B to accept a request.

What I believe is happening is that when stage F fails, the `ResponseEntity` source in stage C has never been run to a sink, so the connection it occupies is not released.

Is there a way to drain that `ResponseEntity` source on graph failure? Or alternatively, is there a better way to push a series of `HttpRequest`s through a superPool that is aware of the entity body and drains it on failure?
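One workaround sketch, using only APIs that already appear in the example below and assuming Akka 2.4.x: consume or drain the entity in the stage immediately after the pool, so the pooled connection is freed before any later stage gets a chance to fail. Failures are surfaced as `Try` values instead of failing the stream at that point. (This does not cover a stream failure arriving while `toStrict` itself is still in flight, and whether it fully releases the connection on 2.4.6 is exactly what this ticket is about.)

```scala
import akka.NotUsed
import akka.http.scaladsl.model._
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Sink}
import akka.util.ByteString

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

object DrainEarly {
  // Immediately after the pool stage: either strictify the body (which
  // consumes it) or explicitly drain it, so the connection is returned
  // to the pool on both the success and the failure path.
  def drainingFlow(implicit mat: Materializer, ec: ExecutionContext)
      : Flow[(Try[HttpResponse], NotUsed), Try[ByteString], NotUsed] =
    Flow[(Try[HttpResponse], NotUsed)].mapAsync(1) {
      case (Success(response), _) if response.status == StatusCodes.OK =>
        response.entity.toStrict(2.seconds).map(strict => Success(strict.data))
      case (Success(response), _) =>
        // Non-200: drain the body so the pooled connection is released,
        // then surface the failure as a value instead of failing the stream.
        response.entity.dataBytes.runWith(Sink.ignore)
        Future.successful(Failure(new Exception("non 200 status")))
      case (Failure(t), _) =>
        Future.successful(Failure(t))
    }
}
```

Downstream stages then operate on `Try[ByteString]`, so a failure in stage F can no longer leave an unconsumed entity sitting in the stream.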

Here is a reproducible example of this problem.

import java.util.concurrent.Executors

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.util.ByteString

import scala.collection.immutable.Seq
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

object Example
{
  implicit val system = ActorSystem("example")
  implicit val ec = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
  implicit val mat = ActorMaterializer()

  var requestCount = 0
  val pool = Http().superPool[NotUsed]()
  val requestSource = Source.repeat((HttpRequest(uri = "https://www.google.com/images/branding/googlelogo/2x/googlelogo_color_272x92dp.png"), NotUsed))
  // validates the response; drains and drops non-200 responses
  val responseValidationFlow = Flow[(Try[HttpResponse], NotUsed)].collect
  {
    case (Success(response), _) =>
      if (response.status == StatusCodes.OK) Success(response.entity)
      else
      {
        response.entity.dataBytes.runWith(Sink.ignore)
        Failure(new Exception("non 200 status"))
      }
  }.collect
  {
    case Success(entity) => entity
  }

  val extractBodyFlowWithStrict = Flow[ResponseEntity].mapAsync(1)(entity => entity.toStrict(2 seconds).map(_.data))

  // fails every second request to simulate a failure in stage F
  val failableFlow = Flow[ByteString].map
  {
    imageBytes =>
      requestCount += 1
      if (requestCount % 2 == 0) throw new Exception("i blew up")
      println(imageBytes)
      imageBytes
  }

  val sink: Sink[ByteString, Future[Seq[ByteString]]] = Sink.seq[ByteString]

  val runnableGraph = requestSource
    .via(pool)
    .via(responseValidationFlow)
    .via(extractBodyFlowWithStrict)
    .via(failableFlow)
    .toMat(sink)(Keep.right)

  def main(args: Array[String]): Unit =
  {
    materialize
  }

  def materialize: Unit =
  {
    println("starting graph")
    runnableGraph.run().recover
    {
      case t =>
        println(s"graph failed ${t.getMessage}")
        materialize
    }
  }
}
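A way to make the stall reproduce faster (a sketch; assumes the default host-connection-pool settings): shrink the pool to a single connection, so one leaked connection stalls the very next request instead of only after several failures.

```hocon
# application.conf (sketch): with a one-connection pool, a single leaked
# connection should stall the next materialization immediately
akka.http.host-connection-pool.max-connections = 1
```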

thanks,
Spencer


patriknw commented Jul 7, 2016

It's probably not related but I just want to mention that I fixed one leak in PoolInterfaceActor #20844 which will be released in 2.4.8

@patriknw patriknw added the t:http label Jul 7, 2016

ktoso commented Jul 7, 2016

Thanks for reporting, let's validate after 2.4.8

@ktoso ktoso added the 1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted label Jul 7, 2016

ktoso commented Jul 7, 2016

Could you in the meantime try on 2.4.7?

@ktoso ktoso added the bug label Jul 7, 2016
@ktoso ktoso modified the milestones: 2.4.8, 2.4.9 Jul 7, 2016

stejskal commented Jul 7, 2016

I tested on 2.4.7 and unfortunately the problem remains.


ktoso commented Jul 7, 2016

Thanks for confirming, I hope to have time to look into it.
In the meantime, any help from the community on debugging this would be appreciated.

@ktoso ktoso modified the milestones: 2.4.9-RC1, 2.4.9, 2.4.9-RC2 Aug 2, 2016
@johanandren johanandren modified the milestones: 2.4.9, 2.4.10 Aug 19, 2016

derjust commented Sep 1, 2016

Sounds very much like the issue I reported here: #21304
For the debugging: it looks like the DeathWatch of the TCP connection itself (sometimes) remains alive.

@2m 2m modified the milestones: 2.4.10, 2.4.11 Sep 7, 2016
@ktoso ktoso removed the 1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted label Sep 8, 2016
@johanandren

Ticket moved to http://github.com/akka/akka-http

Rationale for the move discussed here: akka/akka-meta#27

If you are interested in contributing or tracking this issue, please comment in akka-http instead from now on :-)
