Akka Flow hangs when making multiple http requests via connection pool #57

Closed
akka-ci opened this issue Sep 8, 2016 · 23 comments
Labels
1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted t:client Issues related to the HTTP Client


akka-ci commented Sep 8, 2016

Issue by unoexperto
Thursday May 05, 2016 at 23:01 GMT
Originally opened as akka/akka#20460


I'm using Akka 2.4.4 and trying to move from Apache HttpAsyncClient (unsuccessfully).

Below is a simplified version of the code that I use in my project.

The problem is that it hangs if I send more than 1-3 requests to the flow. So far, after 6 hours of debugging, I couldn't even locate the problem. I don't see exceptions, error logs, or events in the Decider. NOTHING :)

I tried reducing the connection-timeout setting to 1s, thinking that maybe it's waiting for a response from the server, but it didn't help.

What am I doing wrong?

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.Referer
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.Supervision.Decider
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorAttributes, Supervision}
import com.typesafe.config.ConfigFactory

import scala.collection.immutable.{Seq => imSeq}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.Try

object Main {

  implicit val system = ActorSystem("root")
  implicit val executor = system.dispatcher
  val config = ConfigFactory.load()

  private val baseDomain = "www.google.com"
  private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config))

  private val decider: Decider = {
    case ex =>
      ex.printStackTrace()
      Supervision.Stop
  }

  private def sendMultipleRequests[T](items: Seq[(HttpRequest, T)]): Future[Seq[(Try[HttpResponse], T)]] =

    Source.fromIterator(() => items.toIterator)
      .via(poolClientFlow)
      .log("Logger")(log = myAdapter)
      .recoverWith {
        case ex =>
          println(ex)
          null
      }
      .withAttributes(ActorAttributes.supervisionStrategy(decider))
      .runWith(Sink.seq)
      .map { v =>
        println(s"Got ${v.length} responses in Flow")
        v.asInstanceOf[Seq[(Try[HttpResponse], T)]]
      }

  def main(args: Array[String]) {

    val headers = imSeq(Referer("https://www.google.com/"))
    val reqPair = HttpRequest(uri = "/intl/en/policies/privacy").withHeaders(headers) -> "some req ID"
    val requests = List.fill(10)(reqPair)
    val qwe = sendMultipleRequests(requests).map { case responses =>
      println(s"Got ${responses.length} responses")

      system.terminate()
    }

    Await.ready(system.whenTerminated, Duration.Inf)
  }
}

Also, what's up with proxy support? It doesn't seem to work for me either.

@akka-ci akka-ci added this to the 2.4.x milestone Sep 8, 2016

akka-ci commented Sep 8, 2016

Comment by ktoso
Thursday May 05, 2016 at 23:05 GMT


You MUST consume the response.entity.dataBytes. Currently you've opened connections and are not reading from them, so back-pressure kicks in and the connections are left to die from an idle timeout within a few seconds.
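A minimal sketch of what consuming the entity can look like for one element coming out of the pool flow. It assumes an implicit Materializer is in scope; the names `DrainExample` and `drain` are hypothetical and do not come from the reproducer.

```scala
import akka.Done
import akka.http.scaladsl.model.HttpResponse
import akka.stream.Materializer
import akka.stream.scaladsl.Sink

import scala.concurrent.Future
import scala.util.{Failure, Success, Try}

object DrainExample {
  // Reads and discards the response body so the pooled connection can be reused.
  // A failed request carries no entity, so there is nothing to drain.
  def drain(result: Try[HttpResponse])(implicit mat: Materializer): Future[Done] =
    result match {
      case Success(response) => response.entity.dataBytes.runWith(Sink.ignore)
      case Failure(_)        => Future.successful(Done)
    }
}
```

In a pooled flow this could be called from the sink for every `(Try[HttpResponse], T)` pair whose body is not needed; when the body is needed, it has to be read instead of discarded.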

There is no proxy support yet – akka/akka#16853


akka-ci commented Sep 8, 2016

Comment by unoexperto
Thursday May 05, 2016 at 23:12 GMT


Sorry, I excluded the part where I consume the response. Here is how it looks:

private def parseResponse[TR](response: Future[(Try[HttpResponse], RequestContext)], redirectCount: Int = 0)(implicit unmarshaller: FromEntityUnmarshaller[TR]): Future[TR] =
  response.flatMap { case (tryResp, reqContext) =>

    tryResp match {
      case Success(res) =>
        res.status match {
          case OK =>
            unmarshaller(res.entity).recoverWith {
              case ex =>
                Unmarshal(res.entity).to[String].flatMap { body =>
                  Future.failed(new IOException(s"Failed to unmarshal with ${ex.getMessage} and response body is\n $body"))
                }
            }
          case Found =>
            res.header[Location] match {
              case Some(value) =>
                if (redirectCount > 1)
                  Future.failed(throw new RuntimeException(s"Possible redirect loop? Redirect count is $redirectCount. Location is ${value.uri.toString()}"))
                else {
                  val newCookies = res.headers.filter(_.isInstanceOf[`Set-Cookie`]).map { v =>
                    val cookie = v.asInstanceOf[`Set-Cookie`].cookie
                    HttpCookiePair.apply(cookie.name, cookie.value)
                  }
                  parseResponse(createRequest(value.uri.toRelative, reqContext, imSeq(Cookie(newCookies))), redirectCount + 1)(unmarshaller)
                }

              case None =>
                Future.failed(new IOException(s"Got HTTP 302 response but Location header is missing"))
            }
          case _ =>
            Unmarshal(res.entity).to[String].flatMap { body =>
              Future.failed(new IOException(s"The response status is ${res.status} and response body is $body"))
            }
        }
      case Failure(ex) =>
        Future.failed(ex)
    }
  }

But it never gets called in Future[(Try[HttpResponse], T)].map{ }. I guess backpressure should kick in after the number of responses exceeds the request limit, right?

And nothing dies due to a timeout. I left the code running for ~4 hours.


akka-ci commented Sep 8, 2016

Comment by unoexperto
Friday May 06, 2016 at 09:45 GMT


Any ideas what I'm doing incorrectly, Konrad? Would you like me to create a separate project with this code?

Thanks!


akka-ci commented Sep 8, 2016

Comment by ktoso
Friday May 06, 2016 at 09:46 GMT


I'll look into it, but not right now; please be patient for a while. I have some other urgent things on my hands.


akka-ci commented Sep 8, 2016

Comment by ktoso
Friday May 06, 2016 at 09:47 GMT


A self-contained reproducer example app would help a lot to get into this quicker, thanks


akka-ci commented Sep 8, 2016

Comment by unoexperto
Friday May 06, 2016 at 11:21 GMT


Here we go
https://github.com/cppexpert/akka_flow_freezing
I added you to collaborators just in case.

I've changed the code a little bit. Instead of using Sink.seq I switched to Source.queue. It seems more natural in terms of usage.

And this is the output I get

[DEBUG] [05/06/2016 14:17:17.831] [main] [EventStream(akka://root)] logger log1-Logging$DefaultLogger started
[DEBUG] [05/06/2016 14:17:17.832] [main] [EventStream(akka://root)] Default Loggers started
[DEBUG] [05/06/2016 14:17:18.105] [main] [AkkaSSLConfig(akka://root)] Initializing AkkaSSLConfig extension...
[DEBUG] [05/06/2016 14:17:18.107] [main] [AkkaSSLConfig(akka://root)] buildHostnameVerifier: created hostname verifier: com.typesafe.sslconfig.ssl.DefaultHostnameVerifier@1ac85b0c
[DEBUG] [05/06/2016 14:17:18.406] [root-akka.actor.default-dispatcher-5] [akka://root/user/pool-master/PoolInterfaceActor-0] (Re-)starting host connection pool to www.zzzz.com:80
Request enqueued 2
Request enqueued 3
Request enqueued 1
Request enqueued 4
Request enqueued 5
Request enqueued 6
Request enqueued 7
Request enqueued 8
Request enqueued 9
[DEBUG] [05/06/2016 14:17:18.653] [root-akka.actor.default-dispatcher-2] [akka://root/user/SlotProcessor-1] become unconnected, from subscriber pending
[DEBUG] [05/06/2016 14:17:18.653] [root-akka.actor.default-dispatcher-9] [akka://root/user/SlotProcessor-2] become unconnected, from subscriber pending
[DEBUG] [05/06/2016 14:17:18.653] [root-akka.actor.default-dispatcher-8] [akka://root/user/SlotProcessor-0] become unconnected, from subscriber pending
[DEBUG] [05/06/2016 14:17:18.653] [root-akka.actor.default-dispatcher-7] [akka://root/user/SlotProcessor-3] become unconnected, from subscriber pending
[DEBUG] [05/06/2016 14:17:18.708] [root-akka.actor.default-dispatcher-15] [akka://root/system/IO-TCP/selectors/$a/0] Attempting connection to [www.zzz.com/172.227.100.160:80]
[DEBUG] [05/06/2016 14:17:18.712] [root-akka.actor.default-dispatcher-12] [akka://root/system/IO-TCP/selectors/$a/2] Attempting connection to [www.zzz.com/172.227.100.160:80]
[DEBUG] [05/06/2016 14:17:18.713] [root-akka.actor.default-dispatcher-16] [akka://root/system/IO-TCP/selectors/$a/1] Attempting connection to [www.zzz.com/172.227.100.160:80]
[DEBUG] [05/06/2016 14:17:18.713] [root-akka.actor.default-dispatcher-12] [akka://root/system/IO-TCP/selectors/$a/3] Attempting connection to [www.zzz.com/172.227.100.160:80]
[DEBUG] [05/06/2016 14:17:18.758] [root-akka.actor.default-dispatcher-7] [akka://root/system/IO-TCP/selectors/$a/0] Connection established to [www.zzz.com/172.227.100.160:80]
[DEBUG] [05/06/2016 14:17:18.758] [root-akka.actor.default-dispatcher-21] [akka://root/system/IO-TCP/selectors/$a/2] Connection established to [www.zzz.com/172.227.100.160:80]
[DEBUG] [05/06/2016 14:17:18.762] [root-akka.actor.default-dispatcher-23] [akka://root/system/IO-TCP/selectors/$a/3] Connection established to [www.zzz.com/172.227.100.160:80]
[DEBUG] [05/06/2016 14:17:18.762] [root-akka.actor.default-dispatcher-25] [akka://root/system/IO-TCP/selectors/$a/1] Connection established to [www.zzz.com/172.227.100.160:80]
Response was received 2
Response was received 3
Response was received 1
Response was received 4


akka-ci commented Sep 8, 2016

Comment by lolski
Sunday May 08, 2016 at 07:24 GMT


Let me have a quick look, maybe I'll be able to find something


akka-ci commented Sep 8, 2016

Comment by lolski
Sunday May 08, 2016 at 10:53 GMT


@CPPExpert in your git example, you have not consumed response.entity.dataBytes. If you modify your function as below, it will work

private val poolClientFlow = initialize()
private val queue = Source.queue[(HttpRequest, (Any, Promise[(Try[HttpResponse], Any)]))](1000, OverflowStrategy.backpressure)
  .via(poolClientFlow)
  .toMat(Sink.foreach({
    case (triedResp, (value: Any, p: Promise[(Try[HttpResponse], Any)])) =>
      println(s"Response was received ${value.toString}")
      val x = triedResp.get.entity.dataBytes.toMat(Sink.seq)(Keep.right).run() // consume dataBytes, which is also a source
      x map { e => p.success(triedResp -> value); println(e) }
      p.success(triedResp -> value)
    case _ =>
      throw new RuntimeException()
  }))(Keep.left)
  .run


akka-ci commented Sep 8, 2016

Comment by unoexperto
Sunday May 08, 2016 at 11:19 GMT


I've updated git with your code and also switched to an HTTPS connection to avoid the HTTP 302 response.

It's still not working, although the behavior has changed.

Could you please explain why you consume bytes in toMat if I'm supposed to consume data in parseResponse, where the response is unmarshalled using FromEntityUnmarshaller? Doesn't it mean that one or the other will fail because there are no bytes left in the response stream?

Here is the output I get with the new version of the code.

[DEBUG] [05/08/2016 14:11:18.555] [main] [EventStream(akka://root)] logger log1-Logging$DefaultLogger started
[DEBUG] [05/08/2016 14:11:18.556] [main] [EventStream(akka://root)] Default Loggers started
[DEBUG] [05/08/2016 14:11:18.755] [main] [AkkaSSLConfig(akka://root)] Initializing AkkaSSLConfig extension...
[DEBUG] [05/08/2016 14:11:18.757] [main] [AkkaSSLConfig(akka://root)] buildHostnameVerifier: created hostname verifier: com.typesafe.sslconfig.ssl.DefaultHostnameVerifier@548e76f1
[DEBUG] [05/08/2016 14:11:19.006] [root-akka.actor.default-dispatcher-4] [akka://root/user/pool-master/PoolInterfaceActor-0] (Re-)starting host connection pool to www.zerobin.net:443
Request enqueued 2
Request enqueued 1
Request enqueued 3
Request enqueued 5
Request enqueued 4
Request enqueued 6
Request enqueued 7
Request enqueued 8
Request enqueued 10
Request enqueued 9
[DEBUG] [05/08/2016 14:11:19.208] [root-akka.actor.default-dispatcher-10] [akka://root/user/SlotProcessor-0] become unconnected, from subscriber pending
[DEBUG] [05/08/2016 14:11:19.208] [root-akka.actor.default-dispatcher-15] [akka://root/user/SlotProcessor-2] become unconnected, from subscriber pending
[DEBUG] [05/08/2016 14:11:19.208] [root-akka.actor.default-dispatcher-9] [akka://root/user/SlotProcessor-1] become unconnected, from subscriber pending
[DEBUG] [05/08/2016 14:11:19.210] [root-akka.actor.default-dispatcher-3] [akka://root/user/SlotProcessor-3] become unconnected, from subscriber pending
[DEBUG] [05/08/2016 14:11:19.250] [root-akka.actor.default-dispatcher-12] [akka://root/system/IO-TCP/selectors/$a/1] Attempting connection to [www.zerobin.net/104.28.31.164:443]
[DEBUG] [05/08/2016 14:11:19.250] [root-akka.actor.default-dispatcher-7] [akka://root/system/IO-TCP/selectors/$a/0] Attempting connection to [www.zerobin.net/104.28.31.164:443]
[DEBUG] [05/08/2016 14:11:19.250] [root-akka.actor.default-dispatcher-13] [akka://root/system/IO-TCP/selectors/$a/3] Attempting connection to [www.zerobin.net/104.28.31.164:443]
[DEBUG] [05/08/2016 14:11:19.250] [root-akka.actor.default-dispatcher-4] [akka://root/system/IO-TCP/selectors/$a/2] Attempting connection to [www.zerobin.net/104.28.31.164:443]
[DEBUG] [05/08/2016 14:11:19.275] [root-akka.actor.default-dispatcher-7] [akka://root/system/IO-TCP/selectors/$a/2] Connection established to [www.zerobin.net/104.28.31.164:443]
[DEBUG] [05/08/2016 14:11:19.275] [root-akka.actor.default-dispatcher-12] [akka://root/system/IO-TCP/selectors/$a/0] Connection established to [www.zerobin.net/104.28.31.164:443]
[DEBUG] [05/08/2016 14:11:19.276] [root-akka.actor.default-dispatcher-4] [akka://root/system/IO-TCP/selectors/$a/1] Connection established to [www.zerobin.net/104.28.31.164:443]
[DEBUG] [05/08/2016 14:11:19.276] [root-akka.actor.default-dispatcher-24] [akka://root/system/IO-TCP/selectors/$a/3] Connection established to [www.zerobin.net/104.28.31.164:443]
[DEBUG] [05/08/2016 14:11:19.406] [root-akka.actor.default-dispatcher-20] [com.typesafe.sslconfig.ssl.DefaultHostnameVerifier] verify: hostname = www.zerobin.net, sessionId (base64) = QSsq+F7N9c5p/FQjecfSMSJUKmUK3PPvDjcXGF/2sVk=
[DEBUG] [05/08/2016 14:11:19.406] [root-akka.actor.default-dispatcher-11] [com.typesafe.sslconfig.ssl.DefaultHostnameVerifier] verify: hostname = www.zerobin.net, sessionId (base64) = ilTf02/PUDP4QzLfOkpHkeno0xNKd/YtzS+le9CqoPc=
[DEBUG] [05/08/2016 14:11:19.406] [root-akka.actor.default-dispatcher-3] [com.typesafe.sslconfig.ssl.DefaultHostnameVerifier] verify: hostname = www.zerobin.net, sessionId (base64) = g46Ogf5rF8JuhizuDg5jov4malxaFLlr0wTMVzRvHvc=
[DEBUG] [05/08/2016 14:11:19.406] [root-akka.actor.default-dispatcher-11] [com.typesafe.sslconfig.ssl.DefaultHostnameVerifier] verify: returning true
[DEBUG] [05/08/2016 14:11:19.406] [root-akka.actor.default-dispatcher-20] [com.typesafe.sslconfig.ssl.DefaultHostnameVerifier] verify: returning true
[DEBUG] [05/08/2016 14:11:19.407] [root-akka.actor.default-dispatcher-13] [com.typesafe.sslconfig.ssl.DefaultHostnameVerifier] verify: hostname = www.zerobin.net, sessionId (base64) = ZrfJp3AE3QytMET/WPb+XyRAPiEAPZPnaICVgEW+HbY=
[DEBUG] [05/08/2016 14:11:19.407] [root-akka.actor.default-dispatcher-13] [com.typesafe.sslconfig.ssl.DefaultHostnameVerifier] verify: returning true
[DEBUG] [05/08/2016 14:11:19.407] [root-akka.actor.default-dispatcher-3] [com.typesafe.sslconfig.ssl.DefaultHostnameVerifier] verify: returning true
[WARN] [05/08/2016 14:11:19.897] [root-akka.actor.default-dispatcher-13] [akka.actor.ActorSystemImpl(root)] Illegal response header: Illegal 'strict-transport-security' header: Invalid input 'p', expected WSP, CRLF or 'i' (line 1, column 19): max-age=15552000; preload
                  ^
[WARN] [05/08/2016 14:11:19.897] [root-akka.actor.default-dispatcher-24] [akka.actor.ActorSystemImpl(root)] Illegal response header: Illegal 'strict-transport-security' header: Invalid input 'p', expected WSP, CRLF or 'i' (line 1, column 19): max-age=15552000; preload
                  ^
[WARN] [05/08/2016 14:11:19.897] [root-akka.actor.default-dispatcher-15] [akka.actor.ActorSystemImpl(root)] Illegal response header: Illegal 'strict-transport-security' header: Invalid input 'p', expected WSP, CRLF or 'i' (line 1, column 19): max-age=15552000; preload
                  ^
Response was received 1
Response was received 3
Received 1 bytes for ID 1
Response was received 4
Received 1 bytes for ID 3
Received 1 bytes for ID 4
[WARN] [05/08/2016 14:11:19.948] [root-akka.actor.default-dispatcher-5] [akka.actor.ActorSystemImpl(root)] Illegal response header: Illegal 'strict-transport-security' header: Invalid input 'p', expected WSP, CRLF or 'i' (line 1, column 19): max-age=15552000; preload
                  ^
Response was received 2
Received 1 bytes for ID 2
Response was received 6
Received 1 bytes for ID 6
Response was received 5
Received 1 bytes for ID 5
Response was received 9
Response was received 10
Received 1 bytes for ID 9
Received 1 bytes for ID 10
Response was received 7
Response was received 8
Received 1 bytes for ID 7
Received 1 bytes for ID 8
Got 10 responses
Parsing items ID 1
Parsing items ID 2
Parsing items ID 3
Parsing items ID 4
Parsing items ID 5
Parsing items ID 6
Parsing items ID 7
Parsing items ID 8
Parsing items ID 9
Parsing items ID 10
[ERROR] [05/08/2016 14:11:20.467] [root-akka.actor.default-dispatcher-21] [akka://root/user/StreamSupervisor-0/flow-26-0-unknown-operation] Error in stage [akka.http.impl.util.StreamUtils$$anon$2@490bf2d9]: Promise already completed.
java.lang.IllegalStateException: Promise already completed.
    at scala.concurrent.Promise$class.complete(Promise.scala:55)
    at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)

There are several weird things here:

  1. It never reaches the point in testCall2() where it prints out Got ${newsHtml.toString.length} characters in html for index $index. Thus my code doesn't parse the response.
  2. map in val queue = Source.queue prints that only 1 byte is received. Does it mean somebody has already consumed the response?
  3. I get a Promise already completed exception for each request, although I don't see p.success being called more than once.
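On the third point, the snippet quoted earlier in this thread calls p.success both inside `x map { ... }` and again on the following line; whether the repository code does the same is not shown here. The sketch below only illustrates the standard-library behaviour behind the message. The names `PromiseOnce`, `completeTwice` and `tryTwice` are hypothetical.

```scala
import scala.concurrent.Promise
import scala.util.Try

object PromiseOnce {
  // Completing the same Promise twice with `success` throws
  // java.lang.IllegalStateException("Promise already completed.").
  def completeTwice(): Try[Unit] = Try {
    val p = Promise[Int]()
    p.success(1)
    p.success(2) // second completion throws
    ()
  }

  // `trySuccess` returns false instead of throwing when the promise
  // was already completed by an earlier call.
  def tryTwice(): (Boolean, Boolean) = {
    val p = Promise[Int]()
    (p.trySuccess(1), p.trySuccess(2))
  }
}
```

Using `trySuccess` only hides the symptom; completing the promise in exactly one place is usually the cleaner fix.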


akka-ci commented Sep 8, 2016

Comment by lolski
Sunday May 08, 2016 at 15:23 GMT


@CPPExpert you are right, I oversimplified the example and commented out the parsing part completely, which is why it worked for me. The workaround for the issue seems to be to convert the entity to its strict counterpart: val en = Await.result(res.entity.toStrict(10 seconds), 10 seconds).

Since I am not a collaborator, I have created a PR (https://github.com/cppexpert/akka_flow_freezing/pull/1) with the workaround to your repo. Please let us know if it works. If it does, then we have to check why the unmarshaller does not work properly on a non-strict HttpResponse.

cc: @ktoso


akka-ci commented Sep 8, 2016

Comment by unoexperto
Sunday May 08, 2016 at 15:45 GMT


Thank you for your help, @lolski. I'm afraid having blocking code is not a solution for me. Let's see what more "reactive" way the maintainers will offer.

Another workaround I see is to change Source.queue definition to

  private type RequestParamType = (Try[HttpResponse] => Try[Any], Any, Promise[(Try[Any], Any)])
  private val queue = Source.queue[(HttpRequest, RequestParamType)](1000, OverflowStrategy.backpressure)
    .via(poolClientFlow)
    .toMat(Sink.foreach({
      case (triedResp, (parser : (Try[HttpResponse] => Try[Any]), value : Any, p : Promise[(Try[Any], Any)]) ) =>
        println(s"Response was received ${value.toString}")
        p.success(parser(triedResp) -> value)
      case _ =>
        throw new RuntimeException()
    }))(Keep.left)
    .run

but my god, it's even uglier than what I have with Apache HttpClient. Passing a promise AND a non-typesafe converter for HttpResponse looks awful.


akka-ci commented Sep 8, 2016

Comment by lolski
Sunday May 08, 2016 at 16:19 GMT


@CPPExpert It doesn't mean we have to block using Await.result. That was just an example. toStrict returns a Future, so this will work too:

val enFut = res.entity.toStrict(10 seconds)
enFut map { ... }
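A slightly fuller sketch of the non-blocking variant, assuming an implicit Materializer and ExecutionContext are in scope. The names `StrictBody` and `bodyAsString` are hypothetical, and the 10-second limit simply mirrors the value used above.

```scala
import akka.http.scaladsl.model.{HttpEntity, ResponseEntity}
import akka.stream.Materializer

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

object StrictBody {
  // Buffers the whole entity in memory without blocking a thread,
  // then decodes the collected bytes as UTF-8 text.
  def bodyAsString(entity: ResponseEntity)(
      implicit mat: Materializer, ec: ExecutionContext): Future[String] =
    entity
      .toStrict(10.seconds)
      .map((strict: HttpEntity.Strict) => strict.data.utf8String)
}
```

Because a strict entity holds its bytes in memory, it can be read more than once, which is not generally true of a streamed entity. The trade-off is that the full body is buffered, so this fits small responses best.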

@ktoso could this be a bug with the unmarshalling process? I can investigate deeper if you think it is

@ktoso ktoso added 1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted and removed t:http labels Sep 8, 2016
@ktoso ktoso removed this from the 2.4.x milestone Sep 12, 2016
@jrudolph jrudolph added t:client Issues related to the HTTP Client and removed t:client Issues related to the HTTP Client t:http:client labels Nov 2, 2016
@SercanKaraoglu

Hello @akka-ci, what is the current status of this issue? I am also having trouble sending multiple requests.
It seems I could not consume the HTTP response entity either. Isn't this consuming the dataBytes?

entity.dataBytes.runFold(ByteString.empty)(_ ++ _).map(_.utf8String).foreach(println)

As a side note, toStrict didn't work either.

@jlprat
Member

jlprat commented Feb 24, 2017

Hi Sercan,
FYI, @akka-ci is just a bot 😉
Regarding the issue, it's open to be picked up. If you think you can help, PRs are welcome!

@SercanKaraoglu

@jlprat how can I take a quick look at the issue? Do you already have an active topic or PR on GitHub or Google Groups?

@jlprat
Member

jlprat commented Feb 24, 2017

All known information for this issue is written down here.
As far as I can see, there are no PRs for this issue (they would appear in here).
If you want to collaborate with a PR, you are more than welcome, just head to the CONTRIBUTING file where all the information on how to contribute is explained.
If you have specific questions on how to tackle this PR there is a gitter room for it.

@SercanKaraoglu

SercanKaraoglu commented Feb 24, 2017

Here is my reproducer code, if that helps:

    implicit val system = ActorSystem("http-client")
    implicit val actorMaterializer = ActorMaterializer()
    implicit val executionContext = system.dispatcher
    val httpClient = Http(system).outgoingConnection(host = "someurl.com")

    Source(List("/firstQuery","/secondQuery"))
      .map(queryString => HttpRequest(uri = Uri(queryString)))
      .via(httpClient)
      .runWith(Sink.head)
      .flatMap { _.entity.dataBytes.runFold(ByteString.empty)(_ ++ _) }
      .map(_.utf8String)
      .foreach {println}

@SercanKaraoglu

By the way, this one solves the multiple-request problem. @jlprat, can you please check whether this is the right way of doing multiple requests?

    implicit val system = ActorSystem("definition-client")
    implicit val actorMaterializer = ActorMaterializer()
    implicit val executionContext = system.dispatcher
    val pipeline =
      Source.queue[(HttpRequest, String)](100, OverflowStrategy.backpressure)
        .via(Http().cachedHostConnectionPool("someurl.com"))
        .toMat(Sink.foreach { p =>
          println(p._2)
          p._1.get.entity.dataBytes.runFold(ByteString.empty)(_ ++ _).map(_.utf8String).foreach(println)
        })(Keep.left)
        .run()

    Source(List("firstQuery","secondQuery")).runForeach { reqUri =>
      pipeline.offer((HttpRequest(uri = Uri(reqUri)), reqUri))
    }

@jlprat
Member

jlprat commented Feb 24, 2017 via email

@jrudolph
Member

Hi @SercanKaraoglu, please use Http.singleRequest in the usual case to run multiple requests.

@jrudolph
Member

Closing this issue. The recommendation is to use Http.singleRequest and to make sure to read the entity data. This usually fixes almost all of the problems. We will continue to improve the client connection pool to give better debug logging about what's going on and maybe introduce measures to "eject" requests / connections from the pool which are taking too long to make sure the pool will make progress eventually.

Feel free to reopen if there's still a problem with the latest version of Akka Http.
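A minimal sketch of that recommendation, assuming an implicit ActorSystem and Materializer are provided by the caller. The names `SingleRequestExample`, `readBody` and `fetch` are hypothetical; only `Http().singleRequest` and `entity.dataBytes` come from the discussion above.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.Materializer
import akka.util.ByteString

import scala.concurrent.{ExecutionContext, Future}

object SingleRequestExample {
  // Reads the complete response body. Consuming dataBytes to the end is
  // the step that lets the pool reuse the underlying connection.
  def readBody(response: HttpResponse)(
      implicit mat: Materializer, ec: ExecutionContext): Future[String] =
    response.entity.dataBytes
      .runFold(ByteString.empty)(_ ++ _)
      .map(_.utf8String)

  // Sends one request through the shared pool and always reads the entity.
  def fetch(uri: String)(
      implicit system: ActorSystem, mat: Materializer): Future[String] = {
    import system.dispatcher
    Http().singleRequest(HttpRequest(uri = uri)).flatMap(r => readBody(r))
  }
}
```

Several requests can then be issued with something like `Future.traverse(uris)(u => SingleRequestExample.fetch(u))`, bearing in mind that this starts all of them at once and the pool's own limits still apply.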

@jrudolph jrudolph added this to the invalid milestone Feb 27, 2017
@ymeymann

Hmm, quick reaction to closing the issue: maybe I am missing something, but I thought that the point of having a connection pool is to optimize the use-case of multiple requests...? If the pool cannot be safely used for multiple requests, what are the use-cases for which it is recommended? We issue thousands of requests partitioned across about a dozen different web-services. Performance testing initially indicated that pooling the connections was useful, so we were using it for all services, until problems started popping up, and we backed down to using singleRequest in the sites that were problematic.
What is really the root cause of the problem, is it slow processing of the returned entities? We already make sure that entities are consumed, but that does not solve the problem. I am asking, because I would like to figure out if wrapping the code with our own utility can solve the issue or is it really something inherent in the streaming, and we should just abandon the use of pooled connections altogether? Thanks in advance.

@jrudolph
Member

@ymeymann I'll try to answer your questions one by one:

I thought that the point of having a connection pool is to optimize the use-case of multiple requests...?

Yes, definitely, and as you have seen it helps a lot because TCP connection setup and getting into steady state is a process that takes several seconds (depending on RTT).

If the pool cannot be safely used for multiple requests, what are the use-cases for which it is recommended?

Can you explain what you mean by that? As far as we know there are no bugs in the pool anymore that would leak connections over time or make it slow. However, if you don't consume response entities in all cases, some connections might be stuck until the timeout kicks in.

The difference from other client tools is that akka-http will only open a limited number of connections to a host at all times (which is good practice), while other clients often just open another connection when all others are busy. That approach somewhat spreads the risk that problems on a single connection will have a severe impact on the whole pool, but on the other hand it is bad for performance and can overload servers.

So, what we currently require you to do is to consume the response entity or otherwise the performance of the pool might be bad (because some connections might be stuck until some timeout kicks in). We are thinking about improving the situation for the common error situations in #906.

until problems started popping up

What kind of problems?

we backed down to using singleRequest in the sites that were problematic.

What did you use before? singleRequest is using the same pool as cachedHostConnectionPool or superPool.

What is really the root cause of the problem, is it slow processing of the returned entities?

Which problem exactly? If there are problems we surely want to know about them ;)

Btw. it's perfectly ok to keep commenting on these closed tickets. I decided to close a few bugs for which it is hard to say if these are indeed bugs or something else or for which we cannot be sure if they are outdated or not. If it turns out that this ticket is one that many people are subscribed to, we could also keep it open to update everyone on potential solutions.


6 participants