NoSuchElementException from cachedHostConnectionPool #103

Closed
akka-ci opened this issue Sep 8, 2016 · 7 comments
Labels: 1 - triaged (Tickets that are safe to pick up for contributing in terms of likeliness of being accepted), t:client (Issues related to the HTTP Client)

akka-ci commented Sep 8, 2016

Issue by ktoso
Tuesday Jun 23, 2015 at 16:42 GMT
Originally opened as akka/akka#17816


Promoting a question from akka-user here as a ticket as I'm not sure this is what we expect to happen...

Quoting Chris Baxter:

For a single request, I can handle this by adding takeWithin to my stream processing like so:

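  import scala.concurrent.duration._ // provides `5.seconds`

  // Assumes an implicit ActorSystem and materializer are in scope.
  val poolClientFlow = Http().cachedHostConnectionPool[Int]("localhost", 9200)
  val responseFuture: Future[(Try[HttpResponse], Int)] =
    Source.single(HttpRequest(uri = "/foo/bar/16") -> 42)
      .via(poolClientFlow)
      .takeWithin(5.seconds) // complete the stream after 5 s, response or not
      .runWith(Sink.head)    // fails with NoSuchElementException on an empty stream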

In this case, the Future would be failed with a NoSuchElementException which I suppose I can interpret to mean the timeout occurred. Is there a better way to do this? What happens to the underlying connection from the pool in this case? If it was really and truly hung (as opposed to just being a little late with the response) I would want it closed. Is this kind of stuff possible?
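One way to make the "empty stream means timeout" interpretation explicit is to translate the exception at the Future level. The sketch below uses only the Scala standard library; `TimeoutMapping` and `emptyAsTimeout` are hypothetical names, not part of akka-http, and the sketch assumes that an empty stream can only be caused by `takeWithin` firing, which may not hold in every setup.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.FiniteDuration

object TimeoutMapping {
  // Hypothetical helper (not an akka-http API): re-labels the
  // NoSuchElementException that Sink.head reports for an empty stream
  // as a TimeoutException and leaves every other outcome untouched.
  def emptyAsTimeout[T](result: Future[T], limit: FiniteDuration)(
      implicit ec: ExecutionContext): Future[T] =
    result.recoverWith {
      case _: NoSuchElementException =>
        Future.failed(new TimeoutException(s"no response within $limit"))
    }
}
```

With the snippet above this would be used as `TimeoutMapping.emptyAsTimeout(responseFuture, 5.seconds)`. It only changes how the failure is reported; it does nothing about the underlying pooled connection.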

// cc @sirthias @jrudolph

@akka-ci akka-ci added this to the http-backlog milestone Sep 8, 2016

akka-ci commented Sep 8, 2016

Comment by sirthias
Tuesday Jun 23, 2015 at 19:11 GMT


Using takeWithin as a substitute for the missing "native" request timeouts is a nice idea and works "on the outside" (i.e. with regard to the user-facing stream). For the low-level Http().outgoingConnection(...)-level API it also works "on the inside", i.e. it terminates the connection upon a timeout, as expected.

However, for the high-level APIs that are based on connection pools it doesn't work "on the inside" as you expect it to, because the stream cancellation that is triggered on the user-level response stream and propagates upwards doesn't actually reach the connections in the pool. This is intentional, since the connections in the pool are resources that are shared among several pool client streams. Just because you are no longer interested in the response doesn't mean that someone else, whose request might have been scheduled on the same connection (if pipelining is enabled), no longer expects a response either.
Currently the outside (user-level) streams are completely decoupled from the connection pool internals.
Basically, by using the higher-level (pool-based) APIs the user streams attach and detach to and from pools. All that the cancellation triggered by the takeWithin does is detach. The connection will not be torn down. For this to work we will need a timeout feature as requested by #17346.
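The difference between detaching and tearing down can be illustrated without akka at all. The following is only an analogy in plain Scala, not a model of the pool implementation: `DetachSketch`, `inFlight`, and the latch are hypothetical stand-ins for a request that is in flight on a shared connection and a consumer that stops waiting for it.

```scala
import java.util.concurrent.{CountDownLatch, TimeoutException}
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object DetachSketch {
  // Returns (consumerTimedOut, workStillCompleted).
  def run(): (Boolean, Boolean) = {
    val serverAnswers = new CountDownLatch(1)

    // Stand-in for a request in flight on a shared, pooled connection.
    val inFlight: Future[String] = Future {
      serverAnswers.await() // the "server" is slow to respond
      "response"
    }

    // The consumer waits only briefly and then gives up (detaches).
    val consumerTimedOut =
      Try(Await.result(inFlight, 50.millis)).failed.toOption
        .exists(_.isInstanceOf[TimeoutException])

    // Giving up cancelled nothing: once the "server" answers,
    // the in-flight work still runs to completion.
    serverAnswers.countDown()
    val workStillCompleted = Await.result(inFlight, 2.seconds) == "response"

    (consumerTimedOut, workStillCompleted)
  }
}
```

The consumer's timeout only affects the consumer's view. Stopping the work itself needs a separate mechanism on the inside, which is what the requested timeout feature would provide for pooled connections.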


akka-ci commented Sep 8, 2016

Comment by cmbaxter
Thursday Oct 15, 2015 at 15:11 GMT


@sirthias, I wanted to test things out using the same approach I outlined in the code sample, but with a single connection via Http().outgoingConnection instead of a pool. I set up a dummy service that always waits 10 seconds before responding, so that my takeWithin case would always result in a failure. I turned on trace logging at the io.tcp layer and then ran my test. I was expecting to see debug output indicating that the connection was closed immediately after the takeWithin fails (5 seconds), but I did not see this. I only saw it after 10 seconds, when the service being called actually responds. Is this just missing logging, or was the connection still open? I want the connection to be closed when the timeout is hit, so how do I make sure that happens?


akka-ci commented Sep 8, 2016

Comment by sirthias
Tuesday Oct 27, 2015 at 09:48 GMT


@cmbaxter I see. My reading is that cancellation of the response (reading) stream might not currently propagate to the request (writing) stream on the client side of akka-http, because the underlying TCP layer has no concept of "stop reading" and therefore simply ignores this cancellation rather than propagating it. After all, on the TCP level you might still want to write even after having announced that you won't read anymore. For an HTTP client, however, this doesn't make sense, and we should actually propagate cancellation of the response stream over to the request upstream.
I'll open a new ticket for this.


akka-ci commented Sep 8, 2016

Comment by sirthias
Tuesday Oct 27, 2015 at 09:57 GMT


Actually, I just checked and we have a test for this type of cancellation propagation from the response to the request side: https://github.com/akka/akka/blob/release-2.3-dev/akka-http-core/src/test/scala/akka/http/impl/engine/client/ClientCancellationSpec.scala

So, @cmbaxter, would you be able to supply us with a small snippet of code (e.g. a gist) demonstrating the problem?


akka-ci commented Sep 8, 2016

Comment by cmbaxter
Tuesday Oct 27, 2015 at 13:25 GMT


@sirthias, I can't say for sure that there is something wrong. I just expected to see in the trace logging that the connection was closed as soon as the timeout condition was hit and I don't. Here is the sample client side code:

  val sysCfg = ConfigFactory.parseString("""
    akka.io.tcp.trace-logging = on
    akka.loglevel = DEBUG
  """)

  implicit val system = ActorSystem("test", sysCfg)
  val log = akka.event.Logging(system, "TimeoutTestHarness")
  implicit val mater = ActorMaterializer(ActorMaterializerSettings(system))
  val flow = Http().outgoingConnection("localhost", 8082)
  val request = HttpRequest(HttpMethods.GET, "http://localhost:8082/api/notification/push/foo")

  val resp = 
    Source.
      single(request).
      via(flow).
      takeWithin(5 seconds).
      runWith(Sink.head)

  import system.dispatcher
  resp.onComplete{
    case tr => 
      log.info("Got response {}", tr)
  }

And then the simple little route that takes longer than the allowed 5 second timeout:

pathPrefix("api" / "notification" / "push"){  
  path("foo"){
    val fut = Future{
      Thread.sleep(10000)
      "bar"
    }
    onComplete(fut){
      case tr => complete(StatusCodes.OK)
    }
  }
}

Here is the log output from the client side:

[DEBUG] [10/27/2015 09:18:44.710] [main] [EventStream(akka://test)] logger log1-Logging$DefaultLogger started
[DEBUG] [10/27/2015 09:18:44.710] [main] [EventStream(akka://test)] Default Loggers started
[DEBUG] [10/27/2015 09:18:45.530] [test-akka.actor.default-dispatcher-2] [akka://test/system/IO-TCP/selectors/$a] Executing [WorkerForCommand(Connect(localhost/127.0.0.1:8082,None,List(),Some(10 seconds),true),Actor[akka://test/system/IO-TCP-STREAM/client-1-localhost%2F127.0.0.1%3A8082#-837821763],)]
[DEBUG] [10/27/2015 09:18:45.541] [test-akka.actor.default-dispatcher-8] [akka://test/system/IO-TCP/selectors/$a/0] Attempting connection to [localhost/127.0.0.1:8082]
[DEBUG] [10/27/2015 09:18:45.543] [test-akka.actor.default-dispatcher-8] [akka://test/system/IO-TCP/selectors/$a/0] Connection established to [localhost/127.0.0.1:8082]
[DEBUG] [10/27/2015 09:18:45.546] [test-akka.actor.default-dispatcher-8] [akka://test/system/IO-TCP/selectors/$a/0] [Actor[akka://test/system/IO-TCP-STREAM/client-1-localhost%2F127.0.0.1%3A8082#-837821763]] registered as connection handler
[DEBUG] [10/27/2015 09:18:45.563] [test-akka.actor.default-dispatcher-8] [akka://test/system/IO-TCP/selectors/$a/0] Wrote [116] bytes to channel
[INFO] [10/27/2015 09:18:50.545] [test-akka.actor.default-dispatcher-8] [TimeoutTestHarness(akka://test)] Got response Failure(java.util.NoSuchElementException: empty stream)
[DEBUG] [10/27/2015 09:18:55.647] [test-akka.actor.default-dispatcher-7] [akka://test/system/IO-TCP/selectors/$a/0] Read [142] bytes.
[DEBUG] [10/27/2015 09:18:55.680] [test-akka.actor.default-dispatcher-4] [akka://test/system/IO-TCP/selectors/$a/0] Got Close command, closing connection.
[DEBUG] [10/27/2015 09:19:00.683] [test-akka.actor.default-dispatcher-7] [akka://test/user/$a/flow-3-0-publisherSource-prefixAndTail] Cancelling akka.stream.impl.MultiStreamOutputProcessor$SubstreamOutput@38208582 (after: 5000 ms)

This is a trivial example because the call does eventually return and the connection does eventually close. But say the call hung and never returned. If I keep opening connections that don't close I will eventually run into issues with open file handles. Let me know what you think.


akka-ci commented Sep 8, 2016

Comment by nkvoll
Tuesday Mar 01, 2016 at 17:29 GMT


I'm slightly confused as to the state of this right now. The linked issue (akka/akka#17346) is closed, but as far as I can see, the timeout feature is still only available on the server side?

I tried fiddling around with the APIs, but I couldn't find a way to handle request timeouts other than to wait for the idle-timeout to kick in, and cancelling the sink doesn't actually close the associated connection, which leaves dangling resources until the idle-timeout is reached.
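For reference, the idle timeouts mentioned here are configuration settings. The fragment below is a sketch that assumes an akka-http version whose reference.conf defines these two keys; the 10 s values are arbitrary illustrations, not recommendations. Shortening them only bounds how long an idle connection lingers and does not act as a per-request timeout.

```hocon
akka.http {
  # Idle timeout for client connections, e.g. those opened via outgoingConnection
  client.idle-timeout = 10 s

  # How long a host connection pool without activity is kept before it shuts down
  host-connection-pool.idle-timeout = 10 s
}
```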

Have I missed out on something obvious and/or what would be the proper issue to subscribe to?

@ktoso ktoso added 1 - triaged Tickets that are safe to pick up for contributing in terms of likeliness of being accepted and removed t:http labels Sep 8, 2016
@jrudolph jrudolph added t:client Issues related to the HTTP Client and removed t:client Issues related to the HTTP Client t:http:client labels Nov 2, 2016
@jrudolph (Member) commented

I'm closing this for now. There seem to be multiple issues discussed in this ticket, but it's pretty unclear whether one or all of them are still current. Please open new tickets if something similar can still be observed with the latest version.
