Thread loops infinitely consuming CPU with non-zero akka.http.host-connection-pool.min-connections setting #1391
Comments
Thanks for filing the issue, @Discipe. I guess one solution would be to define and enforce a minimum amount of time after a failed connection attempt that originated from a non-zero min-connections setting. Actual incoming requests should still trigger reconnection attempts.
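To make the idea concrete, here is a minimal sketch of such a delay, written as a plain Scala class with no Akka dependency. Every name in it (`ReconnectGate`, `ConnectReason`, `MinConnections`, `IncomingRequest`, `minDelay`) is hypothetical and chosen only for illustration; this is not how the pool is implemented, just one way the rule could be expressed.

```scala
import scala.concurrent.duration._

// Hypothetical names throughout; none of this is akka-http API.
sealed trait ConnectReason
case object MinConnections extends ConnectReason  // the pool is pre-warming a slot
case object IncomingRequest extends ConnectReason // a real request needs a connection

final class ReconnectGate(minDelay: FiniteDuration) {
  // Timestamp (in nanoseconds) of the most recent failed attempt, if any.
  private var lastFailureNanos: Option[Long] = None

  def recordFailure(nowNanos: Long): Unit =
    lastFailureNanos = Some(nowNanos)

  def recordSuccess(): Unit =
    lastFailureNanos = None

  // Real requests may always connect. Attempts driven only by min-connections
  // must wait until minDelay has elapsed since the last failure.
  def mayConnect(reason: ConnectReason, nowNanos: Long): Boolean =
    reason match {
      case IncomingRequest => true
      case MinConnections =>
        lastFailureNanos.forall(t => nowNanos - t >= minDelay.toNanos)
    }
}
```

With a gate like this, a slot that fails to connect while pre-warming would stay idle for at least `minDelay` instead of retrying immediately, while a request arriving in the meantime would still trigger a fresh attempt.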
Also reported by a customer.
Using the new client pool, I guess?
The problem here is that we resolve the host name "only incidentally", i.e. we don't really control the resolution of the hostname in akka-http but just pass the unresolved hostname/address to the streams and akka-io TCP layers. I see two potential solutions:
IMO we should go with the second option, because we'll have to set up a custom state in the pool anyway to model the timespan during which the hostname is resolved. Also, this prepares us to implement the support for actively monitoring DNS entries to reconnect persistent connections in slots when DNS entries change over time (#1226).
Yes, latest version. Here is my reproducer:

    package akka.http.scaladsl.server

    import akka.actor._
    import akka.http.scaladsl.Http
    import akka.http.scaladsl.model.{ HttpRequest, HttpResponse, Uri }
    import akka.stream.scaladsl.{ Flow, Sink, Source }
    import akka.stream.{ ActorMaterializer, OverflowStrategy }
    import com.typesafe.config.{ Config, ConfigFactory }

    import scala.concurrent.Future
    import scala.io.StdIn
    import scala.util.{ Failure, Success, Try }

    object ConnectionTestApp {
      val testConf: Config = ConfigFactory.parseString("""
        akka.loglevel = debug
        akka.log-dead-letters = off
        akka {
          http {
            host-connection-pool {
              max-connections = 64
              min-connections = 32
            }
          }
        }
        """)

      implicit val system = ActorSystem("ConnectionTest", testConf)
      import system.dispatcher
      implicit val materializer = ActorMaterializer()

      val clientFlow = Http().superPool[Int]()

      val sourceActor = {
        // Our superPool expects (HttpRequest, Int) as input
        val source = Source.actorRef[(HttpRequest, Int)](10000, OverflowStrategy.dropNew).buffer(20000, OverflowStrategy.fail)
        val sink = Sink.foreach[(Try[HttpResponse], Int)] {
          case (resp, id) ⇒ handleResponse(resp, id)
        }
        source.via(clientFlow).to(sink).run()
      }

      def sendPoolFlow(uri: Uri, id: Int): Unit = {
        sourceActor ! ((buildRequest(uri), id))
      }

      def sendPoolFuture(uri: Uri, id: Int): Unit = {
        val responseFuture: Future[HttpResponse] =
          Http().singleRequest(buildRequest(uri))
        responseFuture.onComplete(r ⇒ handleResponse(r, id))
      }

      def sendSingle(uri: Uri, id: Int): Unit = {
        val connectionFlow: Flow[HttpRequest, HttpResponse, Future[Http.OutgoingConnection]] =
          Http().outgoingConnection(uri.authority.host.address, uri.effectivePort)
        val responseFuture: Future[HttpResponse] =
          Source.single(buildRequest(uri))
            .via(connectionFlow)
            .runWith(Sink.head)
        responseFuture.onComplete(r ⇒ handleResponse(r, id))
      }

      private def buildRequest(uri: Uri): HttpRequest =
        HttpRequest(uri = uri)

      private def handleResponse(httpResp: Try[HttpResponse], id: Int): Unit = {
        httpResp match {
          case Success(httpRes) ⇒
            println(s"$id: OK (${httpRes.status.intValue})")
            httpRes.entity.dataBytes.runWith(Sink.ignore)
          case Failure(ex) ⇒
            println(s"$id: $ex")
        }
      }

      def main(args: Array[String]): Unit = {
        for (i ← 1 to 40) {
          val u = s"http://abcdef:6666/test/$i"
          println("u =>" + u)
          sendPoolFlow(Uri(u), i)
          //sendPoolFuture(uri, i)
          //sendSingle(Uri(u), i)
        }
        StdIn.readLine()
        println("===================== \n\n" + system.asInstanceOf[ActorSystemImpl].printTree + "\n\n========================")
        StdIn.readLine()
        system.terminate()
      }
    }
We already have a todo in akka-http/akka-http-core/src/main/scala/akka/http/impl/engine/client/pool/SlotState.scala (lines 122 to 124 in eb7150a).
I created #2208, which is slightly related.
This issue was created as a follow-up to a two-month-old thread in Google Groups: https://groups.google.com/forum/#!topic/akka-user/062Op2fyD-E
Sorry for the absence of a minimal example; there is only a text description here.
How to reproduce:
- Set akka.http.host-connection-pool.min-connections to a non-zero value.
As a result, one of the threads/actors starts to consume all available CPU by processing some inner-state actor messages.